Saving a complex object in ElasticSearch as the output of a MapReduce program

In the "Using ElasticSearch to store output of MapReduce program" post I built a sample MapReduce program that writes its output to ElasticSearch. Next I wanted to figure out how to write the output of a MapReduce job into ElasticSearch so that it creates a complex JSON structure that has embedded objects and uses a date type. So I created this sample program, which reads a .csv file and generates contact records in ElasticSearch. The sample code for this project is available for download. I followed these steps to create the program.
  1. The first thing I did was create the hadoop/contact index in ElasticSearch along with its mapping. You can create the index by making an HTTP POST call to http://localhost:9200/hadoop
  2. Once the index was created I wrote Contact.java. It is a simple POJO that holds a list of Address objects (Address is another POJO), and it uses CustomDateSerializer as its custom date serializer.
  3. The next step was to create CustomDateSerializer.java, which is a class that contains both the mapper and the MapReduce driver classes.
    • ContactImportMapper: This mapper class gets one line from contact.txt at a time, splits it on commas, and uses the values to create a Contact object. On line 67 of the mapper class I convert the Contact object into JSON using Jackson and set the JSON as the value of a Text. Then on line 70 I write NullWritable as the key and the Contact JSON as the value, which gets inserted into ES.
    • ContactImportDriver: This class is the same as any other MapReduce driver, with a few exceptions. On line 101 I call job.getConfiguration().set("es.input.json", "yes");, which tells the ElasticSearch output to treat the output as JSON. Then on line 109 I set the output format class to EsOutputFormat so that the ElasticSearch Hadoop framework can take control of persisting the output: job.setOutputFormatClass(EsOutputFormat.class);
  4. Once the program was built I passed 2 parameters to it: the first is the full path of the contact.txt file and the second is the ElasticSearch index and type in hadoop/contact format. This is what the content of the contact.txt file looks like:
    
    Sachin,Tendulkar,24-Apr-1973,1 Main Street,Mumbai,India 
    Rahul,Dravid,01-Jan-1973,1 Main Street,Bangalore,India
    Anil,Kumble,17-Oct-1970,1 Main Street,Bangalore,India
    Virat,Kohali,05-Nov-1988,1 Main Street,Delhi,India
    Mahendra Singh,Dhoni,07-Jul-1981,1 Main Street,Ranchi,India
    Saurav,Ganguli,08-Jul-1982,1 Main Street,Kolkata,India
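
The per-line work that ContactImportMapper does (split on commas, build a contact with an embedded address, serialize the date, emit JSON) can be sketched without Hadoop or Jackson. This is a minimal, dependency-free illustration and not the code from the project: it builds the JSON by hand where the real mapper uses Jackson, and the field names other than firstName (lastName, dateOfBirth, address, street, city, country) as well as the yyyy-MM-dd output date format are assumptions.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical stand-in for the body of the mapper's map() method.
final class ContactLineToJson {
    // Date format used in contact.txt, e.g. 24-Apr-1973.
    private static final DateTimeFormatter INPUT =
            DateTimeFormatter.ofPattern("dd-MMM-yyyy", Locale.ENGLISH);

    // Escape backslashes and double quotes so values stay valid JSON strings.
    private static String esc(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Turn one CSV line into a JSON document with an embedded address list.
    static String toJson(String line) {
        String[] f = line.trim().split(",");
        if (f.length != 6) {
            // Reject malformed rows instead of indexing half-filled contacts.
            throw new IllegalArgumentException(
                    "Expected 6 fields but got " + f.length + ": " + line);
        }
        // Assumed output format: ISO yyyy-MM-dd, which LocalDate.toString() produces.
        String dob = LocalDate.parse(f[2].trim(), INPUT).toString();
        return "{\"firstName\":\"" + esc(f[0].trim()) + "\","
                + "\"lastName\":\"" + esc(f[1].trim()) + "\","
                + "\"dateOfBirth\":\"" + dob + "\","
                + "\"address\":[{\"street\":\"" + esc(f[3].trim()) + "\","
                + "\"city\":\"" + esc(f[4].trim()) + "\","
                + "\"country\":\"" + esc(f[5].trim()) + "\"}]}";
    }

    public static void main(String[] args) {
        System.out.println(
                toJson("Sachin,Tendulkar,24-Apr-1973,1 Main Street,Mumbai,India"));
    }
}
```

In the actual job, the resulting string would be wrapped in a Text and written with NullWritable as the key, as described in step 3.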
    
After executing the code I ran a query to find all the players with a date of birth between 1970 and 1974.
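
A date range query of this kind could look like the sketch below. It is only an illustration under some assumptions: the field name dateOfBirth, the yyyy-MM-dd date format, and the _search URL built from the hadoop/contact index and type are guesses and may differ from the actual mapping. The class and method names are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class DateRangeQuerySketch {
    // Build a range query body for the given field and inclusive bounds.
    static String rangeQuery(String field, String from, String to) {
        return "{\"query\":{\"range\":{\"" + field + "\":"
                + "{\"gte\":\"" + from + "\",\"lte\":\"" + to + "\"}}}}";
    }

    // POST the query body to the given _search URL and return the raw response.
    static String search(String url, String body) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Assumed field name and date format; adjust to the real mapping.
        String body = rangeQuery("dateOfBirth", "1970-01-01", "1974-12-31");
        System.out.println(body);
        // With ElasticSearch running locally, the query could be sent like this:
        // System.out.println(search("http://localhost:9200/hadoop/contact/_search", body));
    }
}
```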

6 comments:

Unknown said...

How can I use a value from the file, such as a serial number, as the _id in this program?

featherSoft Api said...

But this MapReduce program generates duplicate entries. My file has 180 fields and 1 crore records. Can I trust a MapReduce program to insert data into ElasticSearch?

Sunil Patil said...

@featherSoft Api: You might want to use a reducer and move the code for generating output to the reducer. For example, say the mapper reads 3 lines:
A,B,C
B,D,E
A,B,C

You want unique records based on the first column, so your mapper can publish A as the key and (A,B,C) as the value; your reducer will then get (A, [(A,B,C), (A,B,C)]). Now you can combine all the values for A and at the end insert one record into ES.

The reason for using a reducer is that it makes sure all the records with key equal to A finally get consolidated on a single machine.
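
The idea can be simulated in plain Java without Hadoop. This is only a sketch of the grouping logic: the class and method names are hypothetical, the map stands in for the shuffle that Hadoop performs between the map and reduce phases, and keeping the first value per key is just one possible way to combine the values.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DedupByKeySketch {
    // "Map" and "shuffle": emit (first column, whole line) and group by key.
    static Map<String, List<String>> mapAndGroup(List<String> lines) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (String line : lines) {
            String key = line.split(",", 2)[0];
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(line);
        }
        return grouped;
    }

    // "Reduce": each key arrives once with all of its values, so emit one record per key.
    static List<String> reduce(Map<String, List<String>> grouped) {
        List<String> out = new ArrayList<>();
        for (List<String> values : grouped.values()) {
            out.add(values.get(0)); // keep the first value, drop the duplicates
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("A,B,C", "B,D,E", "A,B,C");
        for (String record : reduce(mapAndGroup(lines))) {
            System.out.println(record); // one line per unique key
        }
    }
}
```

In a real job the reducer, not the mapper, would then write the single surviving record to ES.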

Sunil Patil said...

@Unknown: If your document has a particular field that you would like to use as the id, then you can set that using

conf.set("es.mapping.id", "fieldname");

For example, in this case I can configure firstName to be the _id for the record, and make sure that no 2 players have the same first name, by setting

conf.set("es.mapping.id", "firstName");

Anonymous said...

Hi Sunil,
I ran your code downloaded from GitHub, but I am getting this error:
"java.lang.StringIndexOutOfBoundsException: String index out of range: -1". I can't understand why it is occurring. Can you please help me with this?
