{
"first":"Sachin",
"last":"Tendulkar",
"address":{
"line1":"1 main street",
"city":"mumbai",
"state":"ms",
"zip":"12345"
}
}
There are a lot of options when it comes to parsing JSON, but I decided to use the Jackson parser as I am comfortable with it. So add the Jackson dependencies to the pom.xml:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-core</artifactId>
  <version>2.6.4</version>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-annotations</artifactId>
  <version>2.6.4</version>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.6.4</version>
</dependency>
inputFile
which is the path to the input file that contains the JSON, and the other is outputFile
which is the path to the output folder where the output should be saved.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.spnotes.spark | |
import com.fasterxml.jackson.annotation.JsonProperty | |
import com.fasterxml.jackson.core.JsonParseException | |
import com.fasterxml.jackson.databind.ObjectMapper | |
import com.typesafe.scalalogging.Logger | |
import org.apache.spark.{SparkContext, SparkConf} | |
import org.slf4j.LoggerFactory | |
/**
 * Created by sunilpatil on 12/31/15.
 *
 * Mutable bean that Jackson fills in from a single JSON record.
 * Every field starts out as null and is only assigned when the mapper sets it.
 */
class Person {
  @JsonProperty var first: String = _
  @JsonProperty var last: String = _
  @JsonProperty var address: Address = _

  /** Renders all three fields; an unset field shows up as the text "null". */
  override def toString: String =
    "Person(first=" + first + ", last=" + last + ", address=" + address + ")"
}
/**
 * Mutable bean holding the postal address nested inside a [[Person]] record.
 * Every field starts out as null and is only assigned when the mapper sets it.
 */
class Address {
  @JsonProperty var line1: String = _
  @JsonProperty var line2: String = _
  @JsonProperty var city: String = _
  @JsonProperty var state: String = _
  @JsonProperty var zip: String = _

  /** Renders all five fields; an unset field shows up as the text "null". */
  override def toString: String =
    "Address(line1=" + line1 + ", line2=" + line2 + ", city=" + city +
      ", state=" + state + ", zip=" + zip + ")"
}
/**
 * Spark job that reads a text file holding one JSON document per line, parses
 * each line into a [[Person]], keeps the people whose address city is "mumbai"
 * and writes them to an output folder as text.
 *
 * Lines that cannot be turned into a [[Person]] are skipped and counted in an
 * accumulator; the count is printed once the output has been written.
 */
object JSONFileReaderWriter {
  import scala.util.control.NonFatal

  val logger: Logger = Logger(LoggerFactory.getLogger("JSONFileReaderWriter"))
  val mapper: ObjectMapper = new ObjectMapper()

  /**
   * Entry point.
   *
   * @param argv exactly two values: the path of the input file containing the
   *             JSON records, and the path of the output folder
   */
  def main(argv: Array[String]): Unit = {
    if (argv.length != 2) {
      println("Please provide 2 parameters <inputfile> <outputfile>")
      System.exit(1)
    }
    val inputFile = argv(0)
    val outputFile = argv(1)
    logger.debug(s"Read json from $inputFile and write to $outputFile")

    // Fall back to a single local thread only when no master was supplied
    // from outside (e.g. by spark-submit), instead of always overriding it.
    val sparkConf = new SparkConf()
      .setAppName("JSONFileReaderWriter")
      .setIfMissing("spark.master", "local[1]")
    val sparkContext = new SparkContext(sparkConf)

    try {
      val errorRecords = sparkContext.accumulator(0)

      val results = sparkContext.textFile(inputFile).flatMap { record =>
        try {
          // readValue may hand back null (e.g. for the JSON literal `null`);
          // treat that as a bad record rather than passing null downstream.
          val parsed = Option(mapper.readValue(record, classOf[Person]))
          if (parsed.isEmpty) {
            errorRecords += 1
          }
          parsed
        } catch {
          case NonFatal(_) =>
            errorRecords += 1
            None
        }
      }.filter { person =>
        // A record without an address (or without a city) is simply not a
        // match; it must not blow up the whole job with a NullPointerException.
        Option(person.address).exists(address => "mumbai" == address.city)
      }

      results.saveAsTextFile(outputFile)

      // The accumulator is only meaningful on the driver after the action ran.
      println("Number of bad records " + errorRecords.value)
    } finally {
      // Always release the context, even when the job fails.
      sparkContext.stop()
    }
  }
}
errorRecords
by one
2015-12-31 07:52:44 ERROR TaskSetManager:75 - Task 0 in stage 0.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): com.fasterxml.jackson.databind.JsonMappingException: Unexpected end-of-input: was expecting closing quote for a string value
at [Source: {"first":"VVS","last":"Laxman" , "address": {"line1":"1 main street", "city":"Hyderabad","state":"AN","zip":"121212}}; line: 1, column: 235] (through reference chain: com.spnotes.spark.Person["address"]->com.spnotes.spark.Address["zip"])
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:210)
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:177)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.wrapAndThrow(BeanDeserializerBase.java:1474)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:260)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:125)
at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:520)
at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:101)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:258)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:125)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3736)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
at com.spnotes.spark.NetcatStreamClient$$anonfun$4.apply(NetcatStreamClient.scala:74)
at com.spnotes.spark.NetcatStreamClient$$anonfun$4.apply(NetcatStreamClient.scala:72)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:413)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input: was expecting closing quote for a string value
at [Source: {"first":"VVS","last":"Laxman" , "address": {"line1":"1 main street", "city":"Hyderabad","state":"AN","zip":"121212}}; line: 1, column: 235]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:470)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._finishString2(ReaderBasedJsonParser.java:1760)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._finishString(ReaderBasedJsonParser.java:1747)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.getText(ReaderBasedJsonParser.java:233)
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:32)
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:11)
at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:520)
at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:101)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:258)
... 23 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Process finished with exit code 1
5 comments:
Hi, have you tried XML processing the same way we are doing it for JSON? Could you please provide a sample? Consider that I am reading data in XML form from Kafka, and I will consume and process it in Spark (since it is streaming, I will not keep it in any physical path). Thank you.
It was really a nice article and i was really impressed by reading this Big data hadoop online Training India
Thanks for info....
Website development in Bangalore
Very nice blog post. Thanks for sharing such helpful blog post. Keep posting in future also.
Web Development Company in Bangalore
Website Development Company in Bangalore
Post a Comment