Saving a complex object in ElasticSearch as output of a MapReduce program

In Using ElasticSearch to store output of MapReduce program I built a sample MapReduce program that writes its output to ElasticSearch. This time I wanted to figure out how to write the output of a MapReduce job into ElasticSearch so that it creates a complex JSON structure with an embedded object and a date type. I decided to create this sample program, which reads a .csv file and generates contact records in ElasticSearch that look like this:
{
  "firstName": "Sachin",
  "lastName": "Tendulkar",
  "dateOfBirth": "1973-04-24",
  "address": [
    {
      "addressLine1": "1 Main Street",
      "city": "Mumbai",
      "country": "India"
    }
  ]
}
You can download the sample code for this project from here. I followed these steps to create the program.
  1. The first thing I did was create the hadoop/contact index in ElasticSearch using the following mapping. You can create the index by making an HTTP POST call to http://localhost:9200/hadoop (see the sketch after the mapping).
    {
      "hadoop": {
        "mappings": {
          "contact": {
            "properties": {
              "address": {
                "properties": {
                  "addressLine1": {
                    "type": "string"
                  },
                  "city": {
                    "type": "string"
                  },
                  "country": {
                    "type": "string"
                  }
                }
              },
              "dateOfBirth": {
                "type": "date",
                "format": "dateOptionalTime"
              },
              "firstName": {
                "type": "string"
              },
              "lastName": {
                "type": "string"
              }
            }
          }
        }
      }
    }
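    Note that the mapping JSON above is in the format ElasticSearch returns from GET http://localhost:9200/hadoop/_mapping; when creating the index, the request body only carries the mappings object itself. As a rough sketch (the CreateContactIndex class is my own illustration, not part of the sample project), the index can be created from Java with plain HttpURLConnection like this:

    package com.spnotes.eshadoop;

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    // Creates the hadoop index with the contact mapping shown above.
    // The create-index request body wraps just the "mappings" object
    // (no outer "hadoop" key, which only appears in the GET _mapping response).
    public class CreateContactIndex {

        public static void main(String[] args) throws Exception {
            String body =
                  "{ \"mappings\": { \"contact\": { \"properties\": {"
                + "\"firstName\": { \"type\": \"string\" },"
                + "\"lastName\": { \"type\": \"string\" },"
                + "\"dateOfBirth\": { \"type\": \"date\", \"format\": \"dateOptionalTime\" },"
                + "\"address\": { \"properties\": {"
                + "\"addressLine1\": { \"type\": \"string\" },"
                + "\"city\": { \"type\": \"string\" },"
                + "\"country\": { \"type\": \"string\" }"
                + "} }"
                + "} } } }";

            // PUT /hadoop creates the index together with its mapping
            HttpURLConnection conn =
                (HttpURLConnection) new URL("http://localhost:9200/hadoop").openConnection();
            conn.setRequestMethod("PUT");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            OutputStream out = conn.getOutputStream();
            out.write(body.getBytes("UTF-8"));
            out.close();
            System.out.println("Create index response code: " + conn.getResponseCode());
            conn.disconnect();
        }
    }

    A curl call against the same URL with the same body works just as well.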
  2. Once the index was created, I created a Contact.java class, which looks like this:
    package com.spnotes.eshadoop;

    import java.util.Date;
    import java.util.List;

    import org.codehaus.jackson.map.annotate.JsonSerialize;

    public class Contact {

        private String firstName;
        private String lastName;
        private Date dateOfBirth;
        private List<Address> address;

        public Contact(String firstName, String lastName, Date dateOfBirth,
                List<Address> address) {
            super();
            this.firstName = firstName;
            this.lastName = lastName;
            this.dateOfBirth = dateOfBirth;
            this.address = address;
        }

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }

        public String getLastName() {
            return lastName;
        }

        public void setLastName(String lastName) {
            this.lastName = lastName;
        }

        @JsonSerialize(using = CustomDateSerializer.class)
        public Date getDateOfBirth() {
            return dateOfBirth;
        }

        public void setDateOfBirth(Date dateOfBirth) {
            this.dateOfBirth = dateOfBirth;
        }

        public List<Address> getAddress() {
            return address;
        }

        public void setAddress(List<Address> address) {
            this.address = address;
        }

        @Override
        public String toString() {
            return "Contact [firstName=" + firstName + ", lastName=" + lastName
                    + ", dateOfBirth=" + dateOfBirth + ", address=" + address + "]";
        }
    }
    As you can see, this is a simple POJO that holds a list of Address objects, which is another POJO. It also uses CustomDateSerializer as a custom date serializer (both classes are sketched below).
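    Neither Address.java nor CustomDateSerializer.java is reproduced in this post, so the two sketches below show one plausible shape for them, based only on how Contact.java and the mapper use them; in particular, the serializer is assumed to write dates as yyyy-MM-dd so that the generated JSON matches the dateOptionalTime format declared in the mapping.

    package com.spnotes.eshadoop;

    // Simple POJO for the nested "address" object; Jackson serializes it via the getters.
    public class Address {

        private String addressLine1;
        private String city;
        private String country;

        public Address(String addressLine1, String city, String country) {
            this.addressLine1 = addressLine1;
            this.city = city;
            this.country = country;
        }

        public String getAddressLine1() {
            return addressLine1;
        }

        public String getCity() {
            return city;
        }

        public String getCountry() {
            return country;
        }
    }

    And the date serializer:

    package com.spnotes.eshadoop;

    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    import org.codehaus.jackson.JsonGenerator;
    import org.codehaus.jackson.map.JsonSerializer;
    import org.codehaus.jackson.map.SerializerProvider;

    // Writes java.util.Date values as "yyyy-MM-dd" strings so that the generated
    // JSON is accepted by the dateOptionalTime format of the dateOfBirth field.
    public class CustomDateSerializer extends JsonSerializer<Date> {

        @Override
        public void serialize(Date value, JsonGenerator jgen, SerializerProvider provider)
                throws IOException {
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
            jgen.writeString(formatter.format(value));
        }
    }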
  3. The next step was to create ContactImportDriver.java, a class that contains both the Mapper and the MapReduce driver.
    • ContactImportMapper: This mapper gets one line of contact.txt at a time, splits it on commas, and uses the values to build a Contact object. In the map() method I convert the Contact object into JSON using the Jackson ObjectMapper and set that JSON as the value of a Text object. I then write NullWritable as the key and the contact JSON as the value, which is what gets inserted into ES.
    • ContactImportDriver: This class is the same as any other MapReduce driver, with a few exceptions. In run() I call job.getConfiguration().set("es.input.json", "yes");, which tells ElasticSearch Hadoop to treat the output values as ready-made JSON documents. I also set the output format to EsOutputFormat with job.setOutputFormatClass(EsOutputFormat.class); so that the ElasticSearch Hadoop framework takes control of persisting the output.
    package com.spnotes.eshadoop;

    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Date;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.codehaus.jackson.map.ObjectMapper;
    import org.elasticsearch.hadoop.mr.EsOutputFormat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ContactImportDriver extends Configured implements Tool {

        Logger logger = LoggerFactory.getLogger(ContactImportDriver.class);

        public static class ContactImportMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

            Logger logger = LoggerFactory.getLogger(ContactImportMapper.class);
            private ObjectMapper jsonMapper;
            private SimpleDateFormat dateFormatter = new SimpleDateFormat("dd-MMM-yyyy");
            private Text contactText;

            @Override
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, NullWritable, Text>.Context context)
                    throws IOException, InterruptedException {
                logger.debug("Entering ContactImportMapper.map()");
                try {
                    // Each input line is one comma-separated contact record
                    String contactContent = value.toString();
                    System.out.println("Value of contactContent " + contactContent);
                    String[] contactDataList = contactContent.split(",");
                    System.out.println("ContactDataList " + Arrays.toString(contactDataList)
                            + " length -> " + contactDataList.length);
                    if (contactDataList.length == 6) {
                        String firstName = contactDataList[0];
                        String lastName = contactDataList[1];
                        Date dateOfBirth = dateFormatter.parse(contactDataList[2]);
                        String addressLine1 = contactDataList[3];
                        String city = contactDataList[4];
                        String country = contactDataList[5];

                        Address address = new Address(addressLine1, city, country);
                        List<Address> addressList = new ArrayList<Address>();
                        addressList.add(address);
                        Contact contact = new Contact(firstName, lastName, dateOfBirth, addressList);

                        // Convert the Contact object into JSON and emit it as the value;
                        // es.input.json=yes tells ES-Hadoop the value is already JSON
                        String contactJSON = jsonMapper.writeValueAsString(contact);
                        contactText.set(contactJSON);
                        context.write(NullWritable.get(), contactText);
                    }
                } catch (ParseException e) {
                    // Skip records whose date of birth cannot be parsed
                    e.printStackTrace();
                }
                logger.debug("Exiting ContactImportMapper.map()");
            }

            @Override
            protected void setup(Mapper<LongWritable, Text, NullWritable, Text>.Context context)
                    throws IOException, InterruptedException {
                jsonMapper = new ObjectMapper();
                contactText = new Text();
            }
        }

        public int run(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.printf("Usage: %s [generic options] <input> <output>\n",
                        getClass().getSimpleName());
                ToolRunner.printGenericCommandUsage(System.err);
                return -1;
            }
            Job job = new Job();
            job.setJarByClass(ContactImportDriver.class);
            job.setJobName("ContactImporter");
            // Mapper output values are already JSON documents
            job.getConfiguration().set("es.input.json", "yes");
            logger.info("Input path " + args[0]);
            logger.info("Output path " + args[1]);
            FileInputFormat.addInputPath(job, new Path(args[0]));

            Configuration configuration = job.getConfiguration();
            configuration.set("es.nodes", "localhost:9200");
            configuration.set("es.resource", args[1]);

            job.setOutputFormatClass(EsOutputFormat.class);
            job.setNumReduceTasks(0);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            job.setMapperClass(ContactImportMapper.class);

            int returnValue = job.waitForCompletion(true) ? 0 : 1;
            System.out.println("job.isSuccessful " + job.isSuccessful());
            return returnValue;
        }

        public static void main(final String[] args) throws Exception {
            int exitCode = ToolRunner.run(new ContactImportDriver(), args);
            System.exit(exitCode);
        }
    }
  4. Once the program was built, I passed 2 parameters to it: the first is the full path of the contact.txt file and the second is the ElasticSearch index and type in hadoop/contact format. This is what the content of the contact.txt file looks like:
    
    Sachin,Tendulkar,24-Apr-1973,1 Main Street,Mumbai,India 
    Rahul,Dravid,01-Jan-1973,1 Main Street,Bangalore,India
    Anil,Kumble,17-Oct-1970,1 Main Street,Bangalore,India
    Virat,Kohali,05-Nov-1988,1 Main Street,Delhi,India
    Mahendra Singh,Dhoni,07-Jul-1981,1 Main Street,Ranchi,India
    Saurav,Ganguli,08-Jul-1982,1 Main Street,Kolkata,India
    
After executing the code, I ran the following query to find all the players with a date of birth between 1970 and 1974 (a small Java helper for sending it over HTTP follows the query):
{
  "query": {
    "filtered": {
      "query": {
        "match_all": {}
      },
      "filter": {
        "numeric_range": {
          "dateOfBirth": {
            "from": "1970-01-01",
            "to": "1974-02-01"
          }
        }
      }
    }
  }
}
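The query can be sent to the _search endpoint of the hadoop/contact type with any HTTP client; as a minimal sketch (the SearchContactsByDob class is my own illustration, not part of the sample project), it can also be run from Java like this:

package com.spnotes.eshadoop;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

// Posts the date-of-birth range query to hadoop/contact/_search and prints the raw response.
public class SearchContactsByDob {

    public static void main(String[] args) throws Exception {
        String query =
              "{ \"query\": { \"filtered\": {"
            + "\"query\": { \"match_all\": {} },"
            + "\"filter\": { \"numeric_range\": {"
            + "\"dateOfBirth\": { \"from\": \"1970-01-01\", \"to\": \"1974-02-01\" }"
            + "} }"
            + "} } }";

        URL url = new URL("http://localhost:9200/hadoop/contact/_search");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        OutputStream out = conn.getOutputStream();
        out.write(query.getBytes("UTF-8"));
        out.close();

        // Print the search response as returned by ElasticSearch
        BufferedReader reader =
            new BufferedReader(new InputStreamReader(conn.getInputStream(), "UTF-8"));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
        reader.close();
        conn.disconnect();
    }
}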
This is what my sample output looks like:

5 comments:

Unknown said...

How can I give the _id value from the file as a serial no in this program?

Unknown said...

But this MapReduce program generates duplicate entries. My file has 180 fields and 1 crore records. Can I trust a MapReduce program to insert data into ElasticSearch?

Sunil Patil said...

@featherSoft Api: You might want to use a reducer and move the code for generating output into the reducer. For example, say the mapper reads 3 lines:
A,B,C
B,D,E
A,B,C

You want unique records based on the first column, A, so your mapper can publish A as the key and (A,B,C) as the value; your reducer will then get (A, [(A,B,C), (A,B,C)]). Now you can combine all the values for A and, at the end, insert one record into ES.

The reason for using a reducer is that it makes sure all the records with a key equal to A finally get consolidated on a single machine.
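A rough sketch of that idea (the class and field names here are made up for illustration, not taken from the sample project):

package com.spnotes.eshadoop;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class DedupExample {

    // Emits the first column of each line as the key and the whole line as the value.
    public static class DedupMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text outKey = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String firstColumn = line.split(",")[0];
            outKey.set(firstColumn);
            context.write(outKey, value);
        }
    }

    // All lines sharing the same first column arrive in a single reduce() call;
    // keep one of them (or merge them) and emit a single record for ES.
    public static class DedupReducer extends Reducer<Text, Text, NullWritable, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Text first = values.iterator().next();
            // Here you would build the JSON document from "first" (or from all the values)
            // the same way ContactImportMapper does, then write it once.
            context.write(NullWritable.get(), first);
        }
    }
}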

Sunil Patil said...

@Unknown If your document has a particular field that you would like to use as the id, then you can set that using

conf.set("es.mapping.id", "fieldname");

Ex. In this case I can configure firstName to be the _id for the record, and make sure that no 2 players have the same first name, by setting

conf.set("es.mapping.id", "firstName");

Anonymous said...

Hi Sunil
I ran your code downloaded from GitHub, but I am getting this error:
"java.lang.StringIndexOutOfBoundsException: String index out of range: -1". I can't understand why it is coming. Can you please help me with this?