Using ElasticSearch to store output of MapReduce program

I wanted to use use ElasticSearch for storing the output of MapReduce program. So i modified the WordCount(HelloWorld) MapReduce program so that it stores output in ElasticSearch instead of Text File. You can download the complete project from here
  1. First change the maven build script to declare dependency on elasticsearch-hadoop-mr like this, I had to try out few combination before this worked (Watch out for jackson mapper version mismatch)
  2. Next change your MapReduce Driver class, to use EsOutputFormat as output format. You will have to set value of es.nodes property to set the host and port of elastic search server that you want to use for storing output. THe value of es.resource points to the Index and type name of elastic search where output should be stored. In my case ElasticSearch is running on local machine.
    
    public int run(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.printf("Usage: %s [generic options] <input> <output>\n",
                        getClass().getSimpleName());
                ToolRunner.printGenericCommandUsage(System.err);
                return -1;
            }
    
            Job job = new Job();
    
            job.setJarByClass(WordCount.class);
            job.setJobName("WordCounter");
            logger.info("Input path " + args[0]);
            logger.info("Oupput path " + args[1]);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            //Configuration for using ElasticSearch as OutputFormat
            Configuration configuration = job.getConfiguration();
            configuration.set("es.nodes","localhost:9200");
            configuration.set("es.resource","hadoop/wordcount2");
            job.setOutputFormatClass(EsOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(MapWritable.class); 
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
    
            int returnValue = job.waitForCompletion(true) ? 0:1;
            System.out.println("job.isSuccessful " + job.isSuccessful());
            return returnValue;
        }
    
  3. I had to start ElasticSearch 1.1 server on my local machine as last step before starting MapReduce program
  4. After running the program when i search wordcount2 index i found results like this

1 comment:

  1. Thank you for sharing article with us. It is reaaly awesome. Keep posting such information.
    Custom website design Phoenix

    ReplyDelete