How to read and write JSON files with Spark

I wanted to build a Spark program that would read text file where every line in the file was a Complex JSON object like this. I wanted to parse the file and filter out few records and write output back as file. You can download the project from GitHub

      "line1":"1 main street",
There are lot of options when it comes to parsing JSON, but i decide to use Jackscon parser as i am comfortable with it. So add the Jackson dependencies in the pom.xml Next i did create Spark Driver program like this, it takes 2 parameters. One is inputFile which is path to input file that contains JSON and other is outputFile which is path to output Folder where the output should be saved. In my code i did define 2 classes Person and Address which are simple beans with json related annotations. Most of the Spark code is similar to what it would look like for dealing with say comma separate file or simple text file. Only difference is line 52 to 60, where i am calling the ObjectMapper.readValue() method to convert the line of input into object of Person class. Now if the JSON is invalid it throws exception like you can see at the bottom of the post. So to deal with it i am catching exception and every time there is exception i am increment errorRecords by one

2015-12-31 07:52:44 ERROR TaskSetManager:75 - Task 0 in stage 0.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): com.fasterxml.jackson.databind.JsonMappingException: Unexpected end-of-input: was expecting closing quote for a string value
 at [Source: {"first":"VVS","last":"Laxman" , "address": {"line1":"1 main street", "city":"Hyderabad","state":"AN","zip":"121212}}; line: 1, column: 235] (through reference chain: com.spnotes.spark.Person["address"]->com.spnotes.spark.Address["zip"])
 at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(
 at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(
 at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.wrapAndThrow(
 at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(
 at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(
 at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(
 at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(
 at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(
 at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(
 at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(
 at com.fasterxml.jackson.databind.ObjectMapper.readValue(
 at com.spnotes.spark.NetcatStreamClient$$anonfun$4.apply(NetcatStreamClient.scala:74)
 at com.spnotes.spark.NetcatStreamClient$$anonfun$4.apply(NetcatStreamClient.scala:72)
 at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:413)
 at scala.collection.Iterator$class.foreach(Iterator.scala:742)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.executor.Executor$
 at java.util.concurrent.ThreadPoolExecutor.runWorker(
 at java.util.concurrent.ThreadPoolExecutor$
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input: was expecting closing quote for a string value
 at [Source: {"first":"VVS","last":"Laxman" , "address": {"line1":"1 main street", "city":"Hyderabad","state":"AN","zip":"121212}}; line: 1, column: 235]
 at com.fasterxml.jackson.core.JsonParser._constructError(
 at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(
 at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(
 at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._finishString2(
 at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._finishString(
 at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.getText(
 at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(
 at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(
 at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(
 at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(
 at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(
 ... 23 more

Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at scala.Option.foreach(Option.scala:257)
 at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
 at org.apache.spark.util.EventLoop$$anon$

Process finished with exit code 1


Khaja Asmath said...
This comment has been removed by the author.
Jegan Blog said...

Hi, Have you tried xml proessing the way how we are doing for json? Could you please provide the sample. Consider that i am reading the data which is in xml form from kafka and i ll consume and process it in spark. ( since its a streaming i ll not keep it in any physical path) Thank you.

sangi yadav said...

I really appreciate information shared above. It’s of great help. If someone want to learn Online (Virtual) instructor lead live training in APACHE SPARK , kindly contact us
MaxMunus Offer World Class Virtual Instructor led training On APACHE SPARK . We have industry expert trainer. We provide Training Material and Software Support. MaxMunus has successfully conducted 100000+ trainings in India, USA, UK, Australlia, Switzerland, Qatar, Saudi Arabia, Bangladesh, Bahrain and UAE etc.
For Demo Contact us.
Saurabh Srivastava
Skype id: saurabhmaxmunus
Ph:+91 8553576305 / 080 - 41103383

Vikas Chaudhary said...

Battery Mantra is Authorized exide car battery dealer in Noida and Greater Noida. We are providing our service in Indirapuram, Delhi, Ashok Nagar.

Exide Battery Dealer in Noida
Battery Dealer in Noida
Authorized Battery Dealer in Noida
Car Battery Dealer in Noida
Car Battery Dealer
Exide Battery Dealer

EG MEDI said... is online medical store pharmacy in laxmi nagar Delhi. You can Order prescription/OTC medicines online. Cash on Delivery available. Free Home Delivery

Online Pharmacy in Delhi
Buy Online medicine in Delhi
Online Pharmacy in laxmi nagar
Buy Online medicine in laxmi nagar
Onine Medical Store in Delhi
Online Medical store in laxmi nagar
Online medicine store in delhi
online medicine store in laxmi nagar
Purchase Medicine Online
Online Pharmacy India
Online Medical Store

Teju Teju said...

It was really a nice article and i was really impressed by reading this Big data hadoop online Training India