Saving complex object in elasticsearch as output of MapReduce program

In the Using ElasticSearch to store output of MapReduce program i built a sample MapReduce program that writes output to the ElasticSearch. I wanted to figure out how to write output of MapReduce job into elasticseach so that it creates complex JSON structure which has embedded object and uses date type. I decided to create this sample program which reads .csv file and generates contact records in elasticsearch, which look like this . You can download sample code for this project from here I followed these steps for creating the program.
  1. First thing i did was create hadoop/contact index in ElasticSearch by using following mapping. You can create the index by making HTTP POST call to http://localhost:9200/hadoop
  2. Once the index was created i created a Contact.java object which looks like this As you can see this is a simple POJO which has list of Address objects which is another pojo. It also uses CustomDateSerializer as custom date serializer
  3. Next step was to create CustomDateSerializer.java which is a class that has both the Mapper and the MapReducer driver class.
    • ContactImportMapper: This is a mapper class that gets one line from Contact.txt at a time and it splits it based on comma and then uses the values to create Contact.java object. In the mapper class on line 67, i am converting the Contact object into JSON using Jackson parser and then setting the JSON as value of Text. Then on line 70, i am writing NullWriter as key and Contact JSON as value, which gets inserted into ES
    • ContactImportDriver: This class is same as any other MapReduce driver with few exceptions, On line 101 i am calling job.getConfiguration().set("es.input.json", "yes");, which tells ElasticSearch output to treat the output as JSON. Then on line 109, i am setting OutputFormatter to EsOutputFormat so that ElasticSearch Hadoop framework can take control of persisting output. job.setOutputFormatClass(EsOutputFormat.class);
  4. Once the program was built i am passing 2 parameters to program first is the full path of contact.txt file and second argument is the ElasticSearch index and type in hadoop/contact format. This is how the content of contact.txt file looks like
    
    Sachin,Tendulkar,24-Apr-1973,1 Main Street,Mumbai,India 
    Rahul,Dravid,01-Jan-1973,1 Main Street,Bangalore,India
    Anil,Kumble,17-Oct-1970,1 Main Street,Bangalore,India
    Virat,Kohali,05-Nov-1988,1 Main Street,Delhi,India
    Mahendra Singh,Dhoni,07-Jul-1981,1 Main Street,Ranchi,India
    Saurav Ganguli,08-Jul-1982,1 Main Street,Kolkata,India
    
After executing the code i did execute following query to find out all the players, with date of birth between 1970-1974 like this This is how my sample output looks like

5 comments:

  1. How can give _id value from the file as a serial no in this Program ?

    ReplyDelete
  2. But this map reduce program generates duplicate entries. My file has 180 fields and 1 crore records. Can I trust map reduce program to insert data into ElasticSearch ?

    ReplyDelete
  3. @featherSoft Api: You might want to use a reducer and move code for generating output to reducer. For example mapper read 3 lines
    A,B,C
    B,D,E
    A,B,C

    You want unique records based on first column A, so your mapper can publish A as key and (A,B,C) as value your reducer will get (A, (A,B,C), (A,B,C)). Now you can combine all the values for A and then at the end insert one record in ES.

    The reason for reducing is it will make sure that all the records with Key equal to A finally get consolidated to single machine

    ReplyDelete
  4. @Unknown If your document has a particular field that you would like to use as id then you can set that using

    conf.set("es.mapping.id", "fieldname");

    Ex. In this case i can configure firstName to be the _id for record and make sure that no 2 players have same first name by setting

    conf.set("es.mapping.id", "firstName");

    ReplyDelete
  5. Hi Sunil
    I ran the code of yours downloaded from GitHub....but I am getting this error
    "java.lang.StringIndexOutOfBoundsException: String index out of range: -1".....I cant understand why it is coming.....can you please help me with this?

    ReplyDelete