Handling Dynamic JSON Schemas in Apache Spark: A Step-by-Step Guide Using Scala

Prasad Khode
3 min read · Aug 21, 2024

In the world of big data, working with JSON data is a common task. However, handling JSON schemas that may vary or are not predefined can be challenging, especially when working with large datasets in Apache Spark. In this tutorial, I’ll guide you through the process of dynamically inferring and handling JSON schemas using Scala in a Spark environment.

Why Dynamic JSON Handling?

When processing JSON data in Spark, the schema can either be predefined or inferred. In scenarios where the schema is not known upfront or may vary, dynamically inferring the schema becomes crucial. This approach allows for greater flexibility and reduces the risk of errors related to schema mismatches.
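To make the risk concrete, here is a minimal sketch (using field names from the sample data in Step 1) of what goes wrong with a hardcoded schema: from_json silently drops any field the schema doesn't mention, so new fields in the data are lost without any error.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// A hardcoded schema only knows the fields you listed at write time
val fixedSchema = ArrayType(StructType(Seq(
  StructField("address_category", StringType),
  StructField("pincode", StringType)
)))

// Fields absent from the schema (e.g. "enrichMode", "address_line_2")
// are silently dropped -- parsing still "succeeds", so the loss is invisible
// df.withColumn("address", from_json(col("address"), fixedSchema))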

Step 1: Setting Up the DataFrame

Let’s start by creating a sample DataFrame. Suppose you have a DataFrame with an id and an address column, where address contains a JSON array as a string:


import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._ // needed for toDF on a local Seq

// New sample data with varied JSON structures
val data = Seq(
  ("a1", """[{"address_category":"Home","address_line_1":"123 Main St","pincode":"10001","state_code":"NY","address_type":"Residential","residence_code":"RC1"}]"""),
  ("b2", """[{"address_category":"Work","address_line_1":"456 Elm St","pincode":"20002","state_code":"CA","address_type":"Office","residence_code":"RC2"},{"address_category":"Secondary Home","address_line_1":"789 Oak St","pincode":"30003","state_code":"TX","address_type":"Secondary","residence_code":"RC3"}]"""),
  ("c3", """[{"address_category":"Permanent","address_line_1":"321 Pine St","pincode":"40004","state_code":"FL","address_type":"Primary","enrichMode":"R","residence_code":"RC4"},{"address_category":"Vacation Home","address_line_1":"654 Cedar St","pincode":"50005","state_code":"CO","address_type":"Vacation","enrichMode":"R","residence_code":"RC5"}]"""),
  ("d4", """[{"address_category":"Billing","address_line_1":"987 Birch St","pincode":"60006","state_code":"NV","address_type":"Billing","enrichMode":"M","residence_code":"RC6"},{"address_category":"Shipping","address_line_1":"321 Spruce St","address_line_2":"Apt 5","pincode":"70007","state_code":"AZ","address_type":"Shipping","enrichMode":"M","residence_code":"RC7"},{"address_category":"Temporary","address_line_1":"123 Willow St","pincode":"80008","state_code":"WA","address_type":"Temporary","enrichMode":"M","residence_code":"RC8"}]"""),
  ("e5", """[{"address_category":"Office","address_line_1":"999 Maple St","address_line_2":"Suite 100","pincode":"90009","state_code":"IL","address_type":"Office","residence_code":"RC9"}]""")
)

val df = data.toDF("id", "address")
df.show(false)
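At this point both columns are still plain strings; a quick printSchema confirms that nothing is structured yet:

df.printSchema()
// root
//  |-- id: string (nullable = true)
//  |-- address: string (nullable = true)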

Step 2: Inferring the JSON Schema Dynamically

One of the strengths of Spark is its ability to infer schemas on the fly. To handle the JSON data dynamically, we first need to infer the schema from the JSON string:

// Infer the element schema by scanning all JSON strings in the address column
val jsonSchema = spark.read.json(df.select("address").as[String]).schema

// Print the inferred schema for verification
println(jsonSchema.prettyJson)

In this step, Spark reads every JSON string in the address column and infers a schema that merges the fields seen across all rows; fields such as enrichMode and address_line_2 appear in only some records but still make it into the schema. Because each string holds a top-level JSON array, the inferred schema describes the array's element struct, which is why we wrap it in ArrayType in the next step before parsing and exploding the data.
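For the sample data above, jsonSchema.treeString should print something like this (every value is inferred as a nullable string, and the fields are sorted alphabetically):

println(jsonSchema.treeString)
// root
//  |-- address_category: string (nullable = true)
//  |-- address_line_1: string (nullable = true)
//  |-- address_line_2: string (nullable = true)
//  |-- address_type: string (nullable = true)
//  |-- enrichMode: string (nullable = true)
//  |-- pincode: string (nullable = true)
//  |-- residence_code: string (nullable = true)
//  |-- state_code: string (nullable = true)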

Step 3: Parsing the JSON Column

With the inferred schema in hand, we can now parse the JSON column. This process converts the JSON string into a structured column:

// Parse the JSON column using the inferred element schema, wrapped in
// ArrayType because each string holds an array of address objects
val dfParsed = df.withColumn("address", from_json(col("address"), ArrayType(jsonSchema)))
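One caveat: in the default PERMISSIVE mode, from_json returns null for any string it cannot parse against the schema, so it is worth a quick sanity check to surface bad records rather than lose them silently:

// Rows whose JSON failed to parse end up with a null address
val unparsed = dfParsed.filter(col("address").isNull)
println(s"Unparseable rows: ${unparsed.count()}") // 0 for this sample data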

Step 4: Exploding and Flattening the JSON Data

After parsing, the address column is now an array of structs. To work with this data effectively, we need to explode this array into individual rows and flatten the nested fields:

// Explode the array of JSON objects into separate rows
val dfExploded = dfParsed.withColumn("address", explode(col("address")))

// Select and flatten the individual fields
val dfFinal = dfExploded.select(
  col("id"),
  col("address.*") // selects all fields from the exploded address struct
)

// Show the final DataFrame
dfFinal.show(false)

Result / Output:

+---+----------------+--------------+--------------+------------+----------+-------+--------------+----------+
|id |address_category|address_line_1|address_line_2|address_type|enrichMode|pincode|residence_code|state_code|
+---+----------------+--------------+--------------+------------+----------+-------+--------------+----------+
|a1 |Home            |123 Main St   |NULL          |Residential |NULL      |10001  |RC1           |NY        |
|b2 |Work            |456 Elm St    |NULL          |Office      |NULL      |20002  |RC2           |CA        |
|b2 |Secondary Home  |789 Oak St    |NULL          |Secondary   |NULL      |30003  |RC3           |TX        |
|c3 |Permanent       |321 Pine St   |NULL          |Primary     |R         |40004  |RC4           |FL        |
|c3 |Vacation Home   |654 Cedar St  |NULL          |Vacation    |R         |50005  |RC5           |CO        |
|d4 |Billing         |987 Birch St  |NULL          |Billing     |M         |60006  |RC6           |NV        |
|d4 |Shipping        |321 Spruce St |Apt 5         |Shipping    |M         |70007  |RC7           |AZ        |
|d4 |Temporary       |123 Willow St |NULL          |Temporary   |M         |80008  |RC8           |WA        |
|e5 |Office          |999 Maple St  |Suite 100     |Office      |NULL      |90009  |RC9           |IL        |
+---+----------------+--------------+--------------+------------+----------+-------+--------------+----------+
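Note that explode drops rows whose array is null or empty, so an id with no parseable addresses disappears from the result. If those ids should be kept, explode_outer emits a single row with null fields instead:

// explode_outer keeps ids whose address array is null or empty
val dfExplodedOuter = dfParsed.withColumn("address", explode_outer(col("address")))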

Final Thoughts

Handling JSON data with unknown or dynamic schemas is a common requirement when processing large datasets. By dynamically inferring the schema and using Spark’s powerful functions, you can efficiently manage and process JSON data in your Scala-based Spark applications.

The approach outlined here ensures that you can handle any variations in your JSON data without hardcoding schemas, making your Spark applications more robust and adaptable to changing data structures.
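To close, here is a hypothetical helper (not part of the original walkthrough) that packages the infer, parse, explode, and flatten steps into a single reusable function for any string column holding a JSON array:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.ArrayType

// Sketch of a reusable wrapper around the steps above; adjust to taste
def flattenJsonArrayColumn(df: DataFrame, jsonCol: String)(implicit spark: SparkSession): DataFrame = {
  import spark.implicits._
  // Infer the element schema from every JSON string in the column
  val elementSchema = spark.read.json(df.select(jsonCol).as[String]).schema
  // Keep all other columns, then flatten the exploded struct
  val otherCols = df.columns.filterNot(_ == jsonCol).map(col)
  df.withColumn(jsonCol, explode(from_json(col(jsonCol), ArrayType(elementSchema))))
    .select(otherCols :+ col(s"$jsonCol.*"): _*)
}

// Usage, assuming the df from Step 1:
// val dfFinal = flattenJsonArrayColumn(df, "address")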
