Usually when I want to convert a JSON file to a CSV I'll write a simple script in PHP. Lately I've been playing more with Apache Spark and wanted to try converting a 600MB JSON file to a CSV using a three-node cluster I have set up. The JSON file itself contains a nested structure, so it took a little fiddling to get it right, but overall I'm impressed with the speed of the execution.
So I decided to take the JSON data and put it on HDFS (the Hadoop Distributed File System). My setup consists of 3 RHEL 7 boxes running Spark and Hadoop in cluster mode.
So I uploaded a JSON file containing a bunch of keyword data to my home folder (/home/tegan), then ran the following to move it to HDFS.
dzdo -s
hdfs dfs -mkdir /keywordData
hdfs dfs -put /home/tegan/keywordData.json /keywordData
# verify the folder shows up
hdfs dfs -ls /
# verify the file shows up
hdfs dfs -ls /keywordData
Ok, then I decided to fire up a spark-shell and write some Scala to convert the JSON to CSV:
val json_path = "hdfs:///keywordData/keywordData.json"
val df = spark.read.json(json_path)

df.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("keywordData.csv")
Executing that in the spark-shell caused a Java heap space error:
17/02/07 16:00:33 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
I figured this would be an issue with such a large JSON file. Luckily, Spark's ability to distribute a job across a cluster can solve my memory problem: instead of running everything on one server, I can spread the work over my worker nodes by compiling the Scala code into a JAR and submitting it to the cluster.
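Side note: executor resources can also be declared on the SparkConf that the packaged job builds below. Here's a minimal sketch, with arbitrary example values rather than anything tuned for my cluster:
import org.apache.spark.SparkConf

// arbitrary example values, not settings tuned for my cluster
val sparkConf = new SparkConf()
  .setAppName("Keyword Data")
  .set("spark.executor.memory", "4g")  // heap available to each executor on the worker nodes
  .set("spark.executor.cores", "2")    // cores each executor may use
// spark.driver.memory usually has to be passed as a spark-submit/spark-shell flag instead,
// since the driver JVM is already running by the time this code executes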
To set up the structure of the Scala project I did the following:
mkdir -p /home/tegan/KeywordData/src/main/scala/com/tegan/spark/keyworddata/
cd /home/tegan/KeywordData/src/main/scala/com/tegan/spark/keyworddata/
Then in the folder I created a KeywordData.scala file with the following in it:
vi /home/tegan/KeywordData/src/main/scala/com/tegan/spark/keyworddata/KeywordData.scala
package com.tegan.spark.keyworddata

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SparkSession

object KeywordData {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Keyword Data")
    val sc = new SparkContext(sparkConf)
    val sparkSession = SparkSession.builder.getOrCreate()
    import sparkSession.implicits._

    val json_path = "hdfs://hadoop-master:9000/keywordData/keywordData.json"
    val df = sparkSession.read.json(json_path)

    df.write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save("hdfs://hadoop-master:9000/keywordData/keywordData.csv")
  }
}
To compile the Scala above into a JAR I'm using sbt, which requires a build file. This is created by switching to /home/tegan/KeywordData/ and creating a KeywordData.sbt file.
cd /home/tegan/KeywordData
vi KeywordData.sbt
name := "KeywordData"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.2",
  "org.apache.spark" %% "spark-sql" % "2.0.2"
)
Then compile the Scala project to a JAR with:
sbt package
When you're ready to submit the job to Spark, issue:
/opt/spark/bin/spark-submit \
  --class com.tegan.spark.keyworddata.KeywordData \
  --master spark://spark-master:7077 \
  --deploy-mode client \
  /home/tegan/KeywordData/target/scala-2.11/keyworddata_2.11-1.0.jar
It starts to run for a bit then I get this error:
Exception in thread "main" java.lang.UnsupportedOperationException: CSV data source does not support array
I'm guessing it is because the JSON data contains a nested format. Here is an example of what it looks like:
{
  "keyword_data": [{
    "value": "some keyword term here",
    "type": "Keyword Phrase",
    "data_points": [{
      "name": "Sessions",
      "value": 173628
    }, {
      "name": "Users",
      "value": 158454
    }, {
      "name": "Views",
      "value": 221868
    }]
  }, {
    "value": "another keyword term here",
    "type": "Keyword Phrase",
    "data_points": [{
      "name": "Sessions",
      "value": 32432
    }, {
      "name": "Users",
      "value": 2333
    }, {
      "name": "Views",
      "value": 3332111
    }]
  }]
}
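Loading that into a DataFrame from the spark-shell and calling printSchema() makes the problem clearer: Spark infers keyword_data as an array of structs with another array (data_points) nested inside, and the CSV writer can't serialize array columns. Roughly (the sample above is pretty-printed here for readability; the schema assumes the usual one-record-per-line layout Spark 2.0's JSON reader expects):
val df = spark.read.json("hdfs://hadoop-master:9000/keywordData/keywordData.json")
df.printSchema()
// root
//  |-- keyword_data: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- data_points: array (nullable = true)
//  |    |    |    |-- element: struct (containsNull = true)
//  |    |    |    |    |-- name: string (nullable = true)
//  |    |    |    |    |-- value: long (nullable = true)
//  |    |    |-- type: string (nullable = true)
//  |    |    |-- value: string (nullable = true)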
So after a few alterations to the Scala code, exploding both nested arrays and pulling the struct fields out into flat columns, it looks like this:
package com.tegan.spark.keyworddata

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

object KeywordData {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Keyword Data")
    val sc = new SparkContext(sparkConf)
    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._

    val json_path = "hdfs://hadoop-master:9000/keywordData/keywordData.json"
    val df = spark.read.json(json_path)

    // explode the keyword_data array so each keyword term becomes its own row
    val xp_df_1 = df.withColumn("term_flat", explode(df("keyword_data")))
    val xp_df_2 = xp_df_1.drop("keyword_data")

    // explode the nested data_points array so each metric becomes its own row
    val xp_df_3 = xp_df_2.withColumn("data_point", explode(xp_df_2("term_flat.data_points")))

    // pull the nested struct fields up into flat, CSV-friendly columns
    val xp_df_4 = xp_df_3
      .withColumn("keyword", xp_df_3("term_flat.value"))
      .withColumn("type", xp_df_3("term_flat.type"))
      .withColumn("metric_name", xp_df_3("data_point.name"))
      .withColumn("metric_value", xp_df_3("data_point.value"))

    // drop the remaining struct columns; the CSV writer can't serialize them
    val final_df = xp_df_4.drop("term_flat").drop("data_point")

    final_df.write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save("hdfs://hadoop-master:9000/keywordData/keywordData.csv")
  }
}
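For the sample JSON above, the flattened DataFrame ends up with one row per keyword/metric pair, roughly like this (an illustration based on the explode and withColumn steps, not pasted job output):
final_df.show(truncate = false)
// +-------------------------+--------------+-----------+------------+
// |keyword                  |type          |metric_name|metric_value|
// +-------------------------+--------------+-----------+------------+
// |some keyword term here   |Keyword Phrase|Sessions   |173628      |
// |some keyword term here   |Keyword Phrase|Users      |158454      |
// |some keyword term here   |Keyword Phrase|Views      |221868      |
// |another keyword term here|Keyword Phrase|Sessions   |32432       |
// |another keyword term here|Keyword Phrase|Users      |2333        |
// |another keyword term here|Keyword Phrase|Views      |3332111     |
// +-------------------------+--------------+-----------+------------+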
Recompile the new Scala code to a JAR using "sbt package", then submit it again and it should run, placing the final results in the HDFS location specified in the save() call above.
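One note on the output: Spark saves the CSV as a directory of part files under that path rather than a single keywordData.csv file, so a quick sanity check is to read the directory back from a spark-shell. A minimal sketch:
// read the CSV directory Spark just wrote and spot-check a few rows
val check = spark.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("hdfs://hadoop-master:9000/keywordData/keywordData.csv")
check.printSchema()
check.show(5)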
The more I play with data, building ETL pipelines, working on marketing data applications, and working hand in hand with sales data, the more value I see in developing a good understanding of tools like Apache Spark, Airbnb's Airflow, and the Elastic Stack. More to come!