How to create custom Combiner class with your MapReduce framework

The MapReduce framework passes control to your combiner class on the map side of the job so that it can combine the intermediate output generated by the mappers before that data is transferred to the reducers. Sending data from a mapper to a reducer means moving it across the network, so combining the map output first cuts down on how much data has to be shuffled. For example, if one mapper emits [hadoop, 1] three times, the combiner can collapse those records into a single [hadoop, 3] before anything leaves that node.

I wanted to try creating a custom combiner class. To keep things simple, I decided to add a combiner to my WordCount (HelloWorld) MapReduce program. My combiner does the same thing as the reducer: it takes multiple [word, 1] tuples and combines them into something like [word1, 5], [word2, 6], and so on. I followed these steps.
The first thing I did was create a WordCountCombiner.java class that looks the same as WordCountReducer, but I added one System.out.println() to it so that I would know when my combiner is called instead of the reducer.

package com.spnotes.hadoop;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    Logger logger = LoggerFactory.getLogger(WordCountCombiner.class);

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        System.out.println("Entering WordCountCombiner.reduce()");
        // Sum up the partial counts for this key, exactly like the reducer does.
        int sum = 0;
        Iterator<IntWritable> valuesIt = values.iterator();
        while (valuesIt.hasNext()) {
            sum = sum + valuesIt.next().get();
        }
        logger.debug(key + " -> " + sum);
        context.write(key, new IntWritable(sum));
        logger.debug("Exiting WordCountCombiner.reduce()");
    }
}
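Since this combiner's logic is identical to the reducer's, the separate class is really only there for that diagnostic println. If you did not need the extra log line, the existing reducer class could simply be registered for both roles. A minimal sketch of that variation, assuming the WordCountReducer from the original WordCount program:

// Sketch: the same Reducer implementation can serve as both reducer and combiner,
// because summing partial counts is associative and commutative.
job.setReducerClass(WordCountReducer.class);
job.setCombinerClass(WordCountReducer.class);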
Then I changed the driver class of my MapReduce program to add the
job.setCombinerClass(WordCountCombiner.class);
line to it.

package com.spnotes.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCountDriver extends Configured implements Tool {

    Logger logger = LoggerFactory.getLogger(WordCountDriver.class);

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WordCountDriver(), args);
        System.exit(exitCode);
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        Job job = new Job();
        Configuration conf = job.getConfiguration();
        job.setJarByClass(WordCountDriver.class);
        job.setJobName("WordCounter");

        logger.info("Input path " + args[0]);
        logger.info("Output path " + args[1]);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Register the combiner so map output is pre-aggregated before the shuffle.
        job.setCombinerClass(WordCountCombiner.class);

        int returnValue = job.waitForCompletion(true) ? 0 : 1;
        System.out.println("job.isSuccessful " + job.isSuccessful());
        return returnValue;
    }
}
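When the job runs, the "Entering WordCountCombiner.reduce()" message shows up in the task logs whenever the combiner is invoked. Another way to confirm it ran is to read the framework's combine counters after waitForCompletion() returns. This is just a sketch of what could be appended at the end of run(); it assumes a Hadoop version where org.apache.hadoop.mapreduce.TaskCounter is available:

// Sketch: read the built-in combine counters to see how much the combiner
// shrank the map output before it was shuffled to the reducers.
org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
long combineIn = counters.findCounter(
        org.apache.hadoop.mapreduce.TaskCounter.COMBINE_INPUT_RECORDS).getValue();
long combineOut = counters.findCounter(
        org.apache.hadoop.mapreduce.TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
System.out.println("Combine input records  = " + combineIn);
System.out.println("Combine output records = " + combineOut);

The same "Combine input records" and "Combine output records" counters also appear in the counter summary that Hadoop prints because waitForCompletion(true) runs in verbose mode, so you can read them straight from the console output of hadoop jar as well.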