One of the things that I like about Spark is that it lets you reuse your MapReduce-based InputFormat and OutputFormat classes for reading input and writing output.
To try this out I built the InputFormatOutputDriver class, which uses KeyValueTextInputFormat to read a file, then performs a word count on that input, and finally uses TextOutputFormat to store the output.
As you can see, most of the code is similar to the WordCount program built using Apache Spark in Java, the difference being that this version is written in Scala and adds the following two lines.
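Before getting to those two lines, the snippets below assume that a SparkContext is already available; a minimal setup for a standalone Scala application might look like this (the app name and configuration are my own assumptions):

import org.apache.spark.{SparkConf, SparkContext}

// Minimal SparkContext setup assumed by the snippets that follow
val sparkConf = new SparkConf().setAppName("InputFormatOutputDriver")
val sparkContext = new SparkContext(sparkConf)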
When you want to use the Hadoop API for reading data, you should use the sparkContext.newAPIHadoopFile() method. I am using the version of the method that takes four parameters: the first is the path of the input file, the second is the InputFormat class you want to use (I want to read the file as key-value pairs, so I am using KeyValueTextInputFormat), and the last two are the types of the key and the value, which are both Text in my example. Spark will read the file into a pair RDD of [Text, Text]; since I am only interested in the content of the file, I iterate through the keys and convert them from Text to String.
val lines = sparkContext.newAPIHadoopFile(inputFile, classOf[KeyValueTextInputFormat],
  classOf[Text], classOf[Text]).keys.map(lineText => lineText.toString)
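The word count over this RDD[String] is the usual pair of transformations; a minimal sketch of that step (the wordCountRDD name is my own and matches the save call below, and splitting on a single space is an assumption):

// Split lines into words, pair each word with 1, and sum the counts per word
val wordCountRDD = lines
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)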
Once I have an RDD[String] I can perform the word count with it as sketched above. Once the results are ready, I call wordCountRDD.saveAsNewAPIHadoopFile() to store the data in Hadoop using TextOutputFormat.
wordCountRDD.saveAsNewAPIHadoopFile(outputFile, classOf[Text], classOf[IntWritable],
  classOf[TextOutputFormat[Text, IntWritable]])
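For reference, a minimal self-contained version of such a driver might look like this (the object name, argument handling, and whitespace tokenization are my own assumptions):

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object InputFormatOutputDriver {
  def main(args: Array[String]): Unit = {
    // Input and output paths are assumed to come from the command line
    val inputFile = args(0)
    val outputFile = args(1)

    val sparkContext = new SparkContext(new SparkConf().setAppName("InputFormatOutputDriver"))

    // Read through the Hadoop KeyValueTextInputFormat and keep only the keys as plain strings
    val lines = sparkContext.newAPIHadoopFile(inputFile, classOf[KeyValueTextInputFormat],
      classOf[Text], classOf[Text]).keys.map(lineText => lineText.toString)

    // Word count on the RDD[String]
    val wordCountRDD = lines
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // Write the results back through the Hadoop TextOutputFormat
    wordCountRDD.saveAsNewAPIHadoopFile(outputFile, classOf[Text], classOf[IntWritable],
      classOf[TextOutputFormat[Text, IntWritable]])

    sparkContext.stop()
  }
}

With TextOutputFormat each record is written as the key and the value separated by a tab, one pair per line, in the part files under the output directory.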