Apache Spark lets you plug in Hadoop InputFormats and OutputFormats for reading input from and writing output to the file system. I wanted to try this, so I built the InputOutputFormatDriver class, which uses KeyValueTextInputFormat for reading a file, performs a word count on that input, and finally uses TextOutputFormat for storing the output.
package com.spnotes.spark

import com.typesafe.scalalogging.Logger
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

/**
 * Created by sunilpatil on 1/4/16.
 */
object InputOutputFormatDriver {
  val logger = Logger(LoggerFactory.getLogger("InputOutputFormatDriver"))

  def main(args: Array[String]) {
    logger.debug("Entering InputOutputFormatDriver.main")
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass.getSimpleName)
      System.exit(-1)
    }
    val inputFile = args(0)
    val outputFile = args(1)
    logger.debug(s"Read input from $inputFile and write to $outputFile")

    val sparkConf = new SparkConf().setMaster("local[1]").setAppName("InputOutputFormatDriver")
    val sparkContext = new SparkContext(sparkConf)

    // Read the input file as key-value pairs using a Hadoop InputFormat
    val lines = sparkContext.newAPIHadoopFile(inputFile, classOf[KeyValueTextInputFormat],
      classOf[Text], classOf[Text])

    // Word count on the keys (the file content)
    val wordCountRDD = lines.keys.map(lineText => lineText.toString)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // Write the results with Text keys and IntWritable values using a Hadoop OutputFormat
    wordCountRDD.saveAsNewAPIHadoopFile(outputFile, classOf[Text], classOf[IntWritable],
      classOf[TextOutputFormat[Text, IntWritable]])

    logger.debug("Exiting InputOutputFormatDriver.main")
  }
}
The first step is to read the input file, for which I am using the sparkContext.newAPIHadoopFile() method. I am using the version of the method that takes 4 parameters: the first is the path of the input file, the second is the InputFormat class you want to use (I want to read the file as key-value pairs, so I am using KeyValueTextInputFormat), and the last two are the types of the key and the value, which are both Text in my example. Spark reads the file into a PairRDD[Text,Text]; since I am only interested in the content of the file, I iterate through the keys and convert them from Text to String.
val lines = sparkContext.newAPIHadoopFile(inputFile, classOf[KeyValueTextInputFormat],
  classOf[Text], classOf[Text]).keys.map(lineText => lineText.toString)
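For comparison, here is a minimal sketch (using the same sparkContext and inputFile) of what the read would look like with plain TextInputFormat, whose keys are the byte offsets of each line (LongWritable) and whose values are the lines themselves (Text). The variable names are just for illustration.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Keys are byte offsets of each line, values are the line contents
val linesByOffset = sparkContext.newAPIHadoopFile(inputFile, classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text])
// Keep only the line text, converting Hadoop Text to String
val lineStrings = linesByOffset.values.map(_.toString)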
Once I have an RDD[String] I can perform the word count on it. When the results are ready I call wordCountRDD.saveAsNewAPIHadoopFile() to store the data in Hadoop using TextOutputFormat.
wordCountRDD.saveAsNewAPIHadoopFile(outputFile, classOf[Text], classOf[IntWritable],
  classOf[TextOutputFormat[Text, IntWritable]])
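TextOutputFormat writes each record as a tab-separated key/value line, so the output directory can be read back with KeyValueTextInputFormat. Below is a minimal sketch of that round trip; it assumes the same sparkContext and outputFile, that the save above has already completed, and the counts variable name is just for illustration.

// Read the word-count output back: each line is "word<TAB>count"
val counts = sparkContext.newAPIHadoopFile(outputFile, classOf[KeyValueTextInputFormat],
  classOf[Text], classOf[Text])
  .map { case (word, count) => (word.toString, count.toString.toInt) }
counts.take(5).foreach(println)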