- The first thing I did was create the hadoop/contact index in ElasticSearch using the following mapping. You can create the index by making an HTTP POST call to http://localhost:9200/hadoop
- Once the index was created, I created a Contact.java class, which looks like this.
As you can see, this is a simple POJO that has a list of Address objects; Address is another POJO. It also uses CustomDateSerializer as a custom date serializer.
- The next step was to create the class that has both the Mapper and the MapReduce driver class.
- ContactImportMapper: This is the mapper class. It gets one line from contact.txt at a time, splits it on commas, and uses the values to create a Contact object. On line 67 of the mapper class, I convert the Contact object into JSON using Jackson and set the JSON as the value of a Text object. Then on line 70, I write NullWritable as the key and the Contact JSON as the value, which gets inserted into ES.
- ContactImportDriver: This class is the same as any other MapReduce driver, with a few exceptions. On line 101, I call job.getConfiguration().set("es.input.json", "yes"), which tells the ElasticSearch output to treat the output as JSON. Then on line 109, I set the output format class to EsOutputFormat by calling job.setOutputFormatClass(EsOutputFormat.class), so that the ElasticSearch Hadoop framework can take control of persisting the output.
- Once the program was built, I passed two parameters to it: the first is the full path of the contact.txt file, and the second is the ElasticSearch index and type in hadoop/contact format. This is how the content of the contact.txt file looks:
Sachin,Tendulkar,24-Apr-1973,1 Main Street,Mumbai,India
Rahul,Dravid,01-Jan-1973,1 Main Street,Bangalore,India
Anil,Kumble,17-Oct-1970,1 Main Street,Bangalore,India
Virat,Kohali,05-Nov-1988,1 Main Street,Delhi,India
Mahendra Singh,Dhoni,07-Jul-1981,1 Main Street,Ranchi,India
Saurav Ganguli,08-Jul-1982,1 Main Street,Kolkata,India
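The index-creation step described above (a POST to http://localhost:9200/hadoop) can be sketched with the JDK's own HTTP client (Java 11+). This is a minimal sketch under stated assumptions: the mapping body is hypothetical, since the post's actual mapping is not reproduced here; it only pins the format of an assumed dateOfBirth field and leaves everything else, including the embedded address objects, to dynamic mapping; and the "contact" level is a mapping type, so this layout only fits older ElasticSearch versions that still had types. Recent ElasticSearch releases document index creation as a PUT request, so the method may need to change there.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateContactIndex {

    // Hypothetical mapping (not the one from the post): only the assumed
    // dateOfBirth field is declared; other fields use dynamic mapping.
    // "contact" is a mapping type, so this only applies to typed ES versions.
    static final String MAPPING =
            "{\"mappings\":{\"contact\":{\"properties\":{"
            + "\"dateOfBirth\":{\"type\":\"date\",\"format\":\"yyyy-MM-dd\"}"
            + "}}}}";

    // Builds the POST request for the given index; nothing is sent here.
    static HttpRequest buildRequest(String baseUrl, String index) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + index))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(MAPPING))
                .build();
    }

    // Sends the request to a locally running ElasticSearch node.
    public static void main(String[] args) throws IOException, InterruptedException {
        HttpRequest request = buildRequest("http://localhost:9200", "hadoop");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```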
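The Contact/Address structure and the mapper's line-to-JSON conversion described in the steps above can be sketched in plain Java. This is not the post's code: it uses java.time in place of Jackson's CustomDateSerializer, builds the JSON by hand instead of calling Jackson, and the field names (firstName, lastName, dateOfBirth, addresses) and the yyyy-MM-dd output date format are assumptions. In a real mapper, the resulting JSON string would become the Text value written next to the NullWritable key.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class ContactLineParser {

    // Embedded object: a contact holds a list of these (field names are assumptions).
    static class Address {
        final String street;
        final String city;
        final String country;

        Address(String street, String city, String country) {
            this.street = street;
            this.city = city;
            this.country = country;
        }
    }

    static class Contact {
        final String firstName;
        final String lastName;
        final LocalDate dateOfBirth;
        final List<Address> addresses = new ArrayList<>();

        Contact(String firstName, String lastName, LocalDate dateOfBirth) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.dateOfBirth = dateOfBirth;
        }
    }

    // Date format used in contact.txt, e.g. 24-Apr-1973.
    static final DateTimeFormatter INPUT_DATE =
            DateTimeFormatter.ofPattern("dd-MMM-yyyy", Locale.ENGLISH);
    // Stand-in for CustomDateSerializer: assumed to emit yyyy-MM-dd.
    static final DateTimeFormatter OUTPUT_DATE = DateTimeFormatter.ISO_LOCAL_DATE;

    // firstName,lastName,dateOfBirth,street,city,country -> Contact.
    // Returns null when the line does not have exactly six fields.
    static Contact parse(String line) {
        String[] f = line.split(",", -1); // -1 keeps trailing empty fields
        if (f.length != 6) {
            return null;
        }
        Contact c = new Contact(f[0].trim(), f[1].trim(),
                LocalDate.parse(f[2].trim(), INPUT_DATE));
        c.addresses.add(new Address(f[3].trim(), f[4].trim(), f[5].trim()));
        return c;
    }

    // Minimal JSON string quoting: escapes only backslashes and double quotes.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Hand-written stand-in for Jackson's ObjectMapper.writeValueAsString(contact).
    static String toJson(Contact c) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"firstName\":").append(quote(c.firstName))
          .append(",\"lastName\":").append(quote(c.lastName))
          .append(",\"dateOfBirth\":").append(quote(c.dateOfBirth.format(OUTPUT_DATE)))
          .append(",\"addresses\":[");
        for (int i = 0; i < c.addresses.size(); i++) {
            Address a = c.addresses.get(i);
            if (i > 0) {
                sb.append(',');
            }
            sb.append("{\"street\":").append(quote(a.street))
              .append(",\"city\":").append(quote(a.city))
              .append(",\"country\":").append(quote(a.country))
              .append('}');
        }
        return sb.append("]}").toString();
    }

    public static void main(String[] args) {
        String[] lines = {
            "Sachin,Tendulkar,24-Apr-1973,1 Main Street,Mumbai,India",
            "Saurav Ganguli,08-Jul-1982,1 Main Street,Kolkata,India"
        };
        for (String line : lines) {
            Contact c = parse(line);
            System.out.println(c == null ? "skipped: " + line : toJson(c));
        }
    }
}
```

The last row of contact.txt as listed above has only five fields (there is no comma between Saurav and Ganguli), so this sketch skips it instead of indexing past the end of the split array.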
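The two program arguments and the driver's ElasticSearch settings from the steps above can be sketched without Hadoop on the classpath. Only es.input.json comes from the post; passing the index/type argument to elasticsearch-hadoop as es.resource is an assumption about how the driver uses it, and the class name ContactImportArgs is hypothetical. In a real driver, these entries would be copied onto job.getConfiguration(), and the input path would be handed to the job's input format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ContactImportArgs {

    final String inputPath;
    final Map<String, String> esSettings = new LinkedHashMap<>();

    // Validates the two program arguments and collects the ES settings
    // a driver would put on the job configuration.
    ContactImportArgs(String[] args) {
        if (args.length != 2) {
            throw new IllegalArgumentException(
                    "usage: <full path of contact.txt> <index/type, e.g. hadoop/contact>");
        }
        String[] indexAndType = args[1].split("/");
        if (indexAndType.length != 2 || indexAndType[0].isEmpty() || indexAndType[1].isEmpty()) {
            throw new IllegalArgumentException("expected index/type, got: " + args[1]);
        }
        inputPath = args[0];
        // Assumption: the index/type argument becomes elasticsearch-hadoop's "es.resource".
        esSettings.put("es.resource", args[1]);
        // From the post: the mapper already emits JSON documents.
        esSettings.put("es.input.json", "yes");
    }

    public static void main(String[] args) {
        // Hypothetical input path, for illustration only.
        ContactImportArgs parsed = new ContactImportArgs(
                new String[] {"/tmp/contact.txt", "hadoop/contact"});
        System.out.println(parsed.inputPath + " -> " + parsed.esSettings);
    }
}
```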
Saving complex objects in ElasticSearch as output of a MapReduce program
In the "Using ElasticSearch to store output of MapReduce program" post, I built a sample MapReduce program that writes its output to ElasticSearch. I wanted to figure out how to write the output of a MapReduce job into ElasticSearch so that it creates a complex JSON structure with an embedded object and a date type. So I created this sample program, which reads a .csv file and generates contact records in ElasticSearch that look like this. You can download the sample code for this project from here.
I followed these steps to create the program:
How can I set the _id value from the file as a serial number in this program?
But this MapReduce program generates duplicate entries. My file has 180 fields and 1 crore (10 million) records. Can I trust a MapReduce program to insert data into ElasticSearch?
@featherSoft Api: You might want to use a reducer and move the code that generates the output into it. For example, say the mapper reads these 3 lines:
A,B,C
B,D,E
A,B,C
You want unique records based on the first column, A, so your mapper can publish A as the key and (A,B,C) as the value. Your reducer will then get (A, [(A,B,C), (A,B,C)]). Now you can combine all the values for A and, at the end, insert one record into ES.
The reason for using a reducer is that it makes sure all the records with the key A finally get consolidated in a single reducer call.
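The reducer-based de-duplication described in this reply can be simulated in plain Java, which makes the grouping visible without a Hadoop cluster. The sketch (class and method names are hypothetical) keys each line by its first column, groups the values the way Hadoop's shuffle does before calling the reducer, and keeps one record per key; a real reducer could instead merge the grouped values field by field before writing a single document to ES.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DedupByKey {

    // Map + shuffle: emit (first column, whole line) and group values by key.
    static Map<String, List<String>> mapAndShuffle(List<String> lines) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (String line : lines) {
            String key = line.split(",", 2)[0];
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(line);
        }
        return grouped;
    }

    // Reduce: one output record per key (here simply the first value seen).
    static List<String> reduce(Map<String, List<String>> grouped) {
        List<String> out = new ArrayList<>();
        for (List<String> values : grouped.values()) {
            out.add(values.get(0));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> grouped = mapAndShuffle(List.of("A,B,C", "B,D,E", "A,B,C"));
        System.out.println(grouped);          // {A=[A,B,C, A,B,C], B=[B,D,E]}
        System.out.println(reduce(grouped));  // [A,B,C, B,D,E]
    }
}
```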
@Unknown If your document has a particular field that you would like to use as the id, you can set it using
conf.set("es.mapping.id", "fieldname");
For example, in this case I can make firstName the _id of each record, so that no two players end up with the same first name, by setting
conf.set("es.mapping.id", "firstName");
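The effect of es.mapping.id can be illustrated with a small in-memory sketch (hypothetical names; no ElasticSearch involved). It assumes the default index write operation, where a document whose _id already exists replaces the earlier one, so records sharing a firstName collapse into a single document. Keep in mind that this would also merge two different players who happen to share a first name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MappingIdSketch {

    // Simulates indexing with the _id taken from idField: a document whose
    // id already exists replaces the earlier document with that id.
    static Map<String, Map<String, String>> indexById(List<Map<String, String>> docs, String idField) {
        Map<String, Map<String, String>> index = new LinkedHashMap<>();
        for (Map<String, String> doc : docs) {
            String id = doc.get(idField);
            if (id == null) {
                throw new IllegalArgumentException("missing id field " + idField + " in " + doc);
            }
            index.put(id, doc);
        }
        return index;
    }

    public static void main(String[] args) {
        List<Map<String, String>> docs = List.of(
                Map.of("firstName", "Rahul", "lastName", "Dravid"),
                Map.of("firstName", "Anil", "lastName", "Kumble"),
                Map.of("firstName", "Rahul", "lastName", "Dravid"));
        System.out.println(indexById(docs, "firstName").size()); // 2
    }
}
```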
Hi Sunil,
I ran your code downloaded from GitHub, but I am getting this error: "java.lang.StringIndexOutOfBoundsException: String index out of range: -1". I can't understand why it is happening. Can you please help me with this?