{
  "lastName": "Tendulkar",
  "address": [
    {
      "country": "India\t",
      "addressLine1": "1 Main Street",
      "city": "Mumbai"
    }
  ],
  "firstName": "Sachin",
  "dateOfBirth": "1973-04-24"
}
I wanted to figure out how to use ElasticSearch as input for a MapReduce program, so I decided to create a MapReduce program that reads the contact index and reports how many players come from each city. You can download the sample program from here.
This is how my MapReduce program looks. You can run the driver program with 2 arguments, e.g. hadoop/contact file:///home/user/output/
The first argument is the name of the ElasticSearch index/type and the second is the output directory where the output will get written.
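Note: to compile and run this you need the elasticsearch-hadoop jar on the classpath in addition to the usual Hadoop dependencies. A minimal Maven sketch (the version number is an assumption; use the release matching your ElasticSearch install):

```xml
<!-- elasticsearch-hadoop supplies EsInputFormat and the Writable helpers used below.
     The version here is an assumption; pick the one matching your cluster. -->
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop</artifactId>
  <version>2.0.0</version>
</dependency>
```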
package com.spnotes.eshadoop;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.WritableArrayWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MRInputDriver extends Configured implements Tool {

    Logger logger = LoggerFactory.getLogger(MRInputDriver.class);

    private static class MRInputMapper extends Mapper<Object, Object, Text, IntWritable> {
        Logger logger = LoggerFactory.getLogger(MRInputMapper.class);
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(Object key, Object value,
                Mapper<Object, Object, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            logger.debug("Entering MRInputMapper.map()");
            // EsInputFormat passes the document id as the key and the document as a MapWritable
            Text documentId = (Text) key;
            MapWritable valueMap = (MapWritable) value;
            // "address" is an array in the document, so it arrives as a WritableArrayWritable
            WritableArrayWritable address = (WritableArrayWritable) valueMap.get(new Text("address"));
            MapWritable addressMap = (MapWritable) address.get()[0];
            Text city = (Text) addressMap.get(new Text("city"));
            context.write(city, ONE);
            logger.debug("Exiting MRInputMapper.map()");
        }
    }

    private static class MRInputReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        Logger logger = LoggerFactory.getLogger(MRInputReducer.class);

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            logger.debug("Entering MRInputReducer.reduce()");
            int sum = 0;
            Iterator<IntWritable> valuesIt = values.iterator();
            while (valuesIt.hasNext()) {
                sum = sum + valuesIt.next().get();
            }
            logger.debug(key + " -> " + sum);
            context.write(key, new IntWritable(sum));
            logger.debug("Exiting MRInputReducer.reduce()");
        }
    }

    public int run(String[] args) throws Exception {
        logger.debug("Entering MRInputDriver.run()");
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        // Use the Tool's Configuration so generic command-line options are honored
        Job job = Job.getInstance(getConf());
        job.setJarByClass(MRInputDriver.class);
        job.setJobName("ContactImporter");
        logger.info("Input path " + args[0]);
        logger.info("Output path " + args[1]);
        // FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        Configuration configuration = job.getConfiguration();
        // es.nodes points at the ElasticSearch installation, es.resource at the index/type
        configuration.set("es.nodes", "localhost:9200");
        configuration.set("es.resource", args[0]);
        job.setInputFormatClass(EsInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(MRInputMapper.class);
        job.setReducerClass(MRInputReducer.class);

        int returnValue = job.waitForCompletion(true) ? 0 : 1;
        System.out.println("job.isSuccessful " + job.isSuccessful());
        logger.debug("Exiting MRInputDriver.run()");
        return returnValue;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MRInputDriver(), args);
        System.exit(exitCode);
    }
}
- MRInputDriver: In the driver program you have to set the es.nodes property to the address of your ElasticSearch installation, and the value of es.resource is the name of the ElasticSearch index/type. Then I am calling job.setInputFormatClass(EsInputFormat.class), which sets EsInputFormat as the input reader; it takes care of reading the records from ElasticSearch.
- MRInputMapper: The Mapper class declares Object as both the key and the value type. The ElasticSearch Hadoop framework reads each record from ElasticSearch and calls the mapper with the document id as the key (a Text) and the document content as the value, an object of the MapWritable class that represents the record stored in ElasticSearch. Once I have the value, I read the address from it, and the mapper writes the city name as the key with a value of 1.
- MRInputReducer: The reducer is pretty simple; it gets called with the name of a city as the key and an Iterable of the values for that city, very similar to the reducer in WordCount.
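To make the shape of the mapper's input concrete, here is a plain-Java sketch of the same traversal and tally, using java.util collections in place of the Hadoop/ElasticSearch Writable types (the documents and helper names are made up for illustration):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CityCountSketch {

    /** Extracts address[0].city from a contact document, mirroring the mapper's lookups. */
    static String extractCity(Map<String, Object> doc) {
        @SuppressWarnings("unchecked")
        List<Map<String, Object>> address = (List<Map<String, Object>>) doc.get("address");
        return (String) address.get(0).get("city");
    }

    /** WordCount-style tally: what the reducer does with the mapper's (city, 1) pairs. */
    static Map<String, Integer> countCities(List<Map<String, Object>> docs) {
        Map<String, Integer> counts = new HashMap<>();
        for (Map<String, Object> doc : docs) {
            counts.merge(extractCity(doc), 1, Integer::sum);
        }
        return counts;
    }

    /** Builds a made-up contact document shaped like the JSON sample above. */
    static Map<String, Object> contact(String city) {
        Map<String, Object> entry = new HashMap<>();
        entry.put("city", city);
        Map<String, Object> doc = new HashMap<>();
        doc.put("address", Arrays.asList(entry));
        return doc;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> docs =
            Arrays.asList(contact("Mumbai"), contact("Bangalore"), contact("Bangalore"));
        System.out.println(countCities(docs).get("Bangalore")); // prints 2
    }
}
```

The cast-and-index chain in extractCity mirrors valueMap.get("address"), then the first array element, then get("city") in the real mapper; countCities is the same summation the reducer performs.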
Running the program against my contact index produced the following output:
Bangalore 2
Delhi 1
Mumbai 1
Ranchi 1
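If you only want a subset of the index to reach the mapper, elasticsearch-hadoop also honors an es.query configuration property alongside es.nodes and es.resource. A sketch of what that would look like inside run() (the query string itself is just an example):

```java
// Sketch only: limits EsInputFormat's input to matching documents.
// "es.query" is an elasticsearch-hadoop setting; the URI query below
// (contacts whose city is Mumbai) is just an example.
configuration.set("es.query", "?q=address.city:Mumbai");
```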