I wanted to build a Spark program that would read text file where every line in the file was a Complex JSON object like this. I wanted to parse the file and filter out few records and write output back as file. You can download the project from
GitHub
{
"first":"Sachin",
"last":"Tendulkar",
"address":{
"line1":"1 main street",
"city":"mumbai",
"state":"ms",
"zip":"12345"
}
}
There are lot of options when it comes to parsing JSON, but i decide to use
Jackscon parser as i am comfortable with it. So add the Jackson dependencies in the pom.xml
Next i did create Spark Driver program like this, it takes 2 parameters. One is
inputFile
which is path to input file that contains JSON and other is
outputFile
which is path to output Folder where the output should be saved.
In my code i did define 2 classes Person and Address which are simple beans with json related annotations. Most of the Spark code is similar to what it would look like for dealing with say comma separate file or simple text file. Only difference is line 52 to 60, where i am calling the ObjectMapper.readValue() method to convert the line of input into object of Person class. Now if the JSON is invalid it throws exception like you can see at the bottom of the post. So to deal with it i am catching exception and every time there is exception i am increment
errorRecords
by one
2015-12-31 07:52:44 ERROR TaskSetManager:75 - Task 0 in stage 0.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): com.fasterxml.jackson.databind.JsonMappingException: Unexpected end-of-input: was expecting closing quote for a string value
at [Source: {"first":"VVS","last":"Laxman" , "address": {"line1":"1 main street", "city":"Hyderabad","state":"AN","zip":"121212}}; line: 1, column: 235] (through reference chain: com.spnotes.spark.Person["address"]->com.spnotes.spark.Address["zip"])
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:210)
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:177)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.wrapAndThrow(BeanDeserializerBase.java:1474)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:260)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:125)
at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:520)
at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:101)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:258)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:125)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3736)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
at com.spnotes.spark.NetcatStreamClient$$anonfun$4.apply(NetcatStreamClient.scala:74)
at com.spnotes.spark.NetcatStreamClient$$anonfun$4.apply(NetcatStreamClient.scala:72)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:413)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input: was expecting closing quote for a string value
at [Source: {"first":"VVS","last":"Laxman" , "address": {"line1":"1 main street", "city":"Hyderabad","state":"AN","zip":"121212}}; line: 1, column: 235]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:470)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._finishString2(ReaderBasedJsonParser.java:1760)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._finishString(ReaderBasedJsonParser.java:1747)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.getText(ReaderBasedJsonParser.java:233)
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:32)
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:11)
at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:520)
at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:101)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:258)
... 23 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Process finished with exit code 1