How to use Hadoop's InputFormat and OutputFormat in Spark

One of the things that I like about Spark is that it lets you reuse your MapReduce-based InputFormats and OutputFormats for reading and writing data. To try this out I built the InputOutputFormatDriver class, which uses KeyValueTextInputFormat to read a file, performs a word count on that input, and finally uses TextOutputFormat to store the output:
package com.spnotes.spark

import com.typesafe.scalalogging.Logger
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

/**
 * Created by sunilpatil on 1/4/16.
 */
object InputOutputFormatDriver {
  val logger = Logger(LoggerFactory.getLogger("InputOutputFormatDriver"))

  def main(args: Array[String]) {
    logger.debug("Entering InputOutputFormatDriver.main")
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n",
        getClass.getSimpleName)
      System.exit(-1)
    }
    val inputFile = args(0)
    val outputFile = args(1)
    logger.debug(s"Read input from $inputFile and write to $outputFile")

    val sparkConf = new SparkConf().setMaster("local[1]").setAppName("InputOutputFormatDriver")
    val sparkContext = new SparkContext(sparkConf)

    // Use a Hadoop InputFormat (KeyValueTextInputFormat) to read the input file
    val lines = sparkContext.newAPIHadoopFile(inputFile, classOf[KeyValueTextInputFormat],
      classOf[Text], classOf[Text])
    val wordCountRDD = lines.keys.map(lineText => lineText.toString)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // Use a Hadoop OutputFormat (TextOutputFormat) to write Text keys and IntWritable values
    wordCountRDD.saveAsNewAPIHadoopFile(outputFile, classOf[Text], classOf[IntWritable],
      classOf[TextOutputFormat[Text, IntWritable]])
    logger.debug("Exiting InputOutputFormatDriver.main")
  }
}
As you can see, most of the code is similar to a WordCount program built using Apache Spark in Java, except that this one is written in Scala and contains the following two pieces. When you want to use the Hadoop API for reading data you should use the sparkContext.newAPIHadoopFile() method; I am using the version of the method that takes four parameters. The first is the path of the input file, the second is the InputFormat class you want to use (I want to read the file as key-value pairs, so I am using KeyValueTextInputFormat), and the last two are the types of the key and the value, which are both Text in my example. Spark reads the file into a PairRDD[Text,Text]; since I am only interested in the content of the file, I iterate through the keys and convert them from Text to String:

val lines = sparkContext.newAPIHadoopFile(inputFile, classOf[KeyValueTextInputFormat],
  classOf[Text], classOf[Text]).keys.map(lineText => lineText.toString)
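A note on how KeyValueTextInputFormat interprets each line: everything before the first tab character becomes the key and everything after it becomes the value (the separator is configurable via the mapreduce.input.keyvaluelinerecordreader.key.value.separator property); a line with no tab ends up entirely in the key with an empty value. A minimal pure-Scala sketch of that per-line split, just to illustrate the behaviour outside Spark:

```scala
// Sketch of KeyValueTextInputFormat's line splitting: key is everything
// before the first separator, value is everything after; no separator
// means the whole line is the key and the value is empty.
def splitKeyValue(line: String, separator: Char = '\t'): (String, String) = {
  val idx = line.indexOf(separator)
  if (idx < 0) (line, "") else (line.substring(0, idx), line.substring(idx + 1))
}
```

This is why the example above only uses lines.keys: for ordinary text files with no tabs, the whole line lands in the key.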
Once I have an RDD[String] I can perform the word count on it. When the results are ready I call wordCountRDD.saveAsNewAPIHadoopFile() to store the data in Hadoop using TextOutputFormat:

wordCountRDD.saveAsNewAPIHadoopFile(outputFile, classOf[Text], classOf[IntWritable],
  classOf[TextOutputFormat[Text, IntWritable]])
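The RDD transformation chain in the middle maps one-to-one onto plain Scala collections, which is handy for testing the logic without a Spark cluster. Here is a Spark-free sketch of the same pipeline, with groupBy plus a sum standing in for reduceByKey:

```scala
// Local, Spark-free version of the word-count pipeline: the same
// flatMap/map steps as the RDD chain, with groupBy + sum playing
// the role of reduceByKey.
def wordCount(lines: Seq[String]): Map[String, Int] =
  lines.flatMap(_.split(" "))
    .map(word => (word, 1))
    .groupBy(_._1)
    .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
```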

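On the output side, TextOutputFormat writes each record as the key's toString, a separator (a tab by default, configurable via mapreduce.output.textoutputformat.separator), and the value's toString. A small sketch of that record layout, so you know what to expect in the part-r-* files:

```scala
// Sketch of one TextOutputFormat record: key, separator (tab by
// default), then value, as produced for each (word, count) pair.
def formatRecord(key: String, value: Int, separator: String = "\t"): String =
  s"$key$separator$value"
```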