Each contact is stored in Elasticsearch as a JSON document that looks like this:

    {
      "firstName": "Sachin",
      "lastName": "Tendulkar",
      "dateOfBirth": "1973-04-24",
      "address": [
        {
          "addressLine1": "1 Main Street",
          "city": "Mumbai",
          "country": "India"
        }
      ]
    }
The first thing I did was create the hadoop/contact index in Elasticsearch using the following mapping. You can create the index by making an HTTP POST call to http://localhost:9200/hadoop:
    {
      "hadoop": {
        "mappings": {
          "contact": {
            "properties": {
              "address": {
                "properties": {
                  "addressLine1": { "type": "string" },
                  "city": { "type": "string" },
                  "country": { "type": "string" }
                }
              },
              "dateOfBirth": { "type": "date", "format": "dateOptionalTime" },
              "firstName": { "type": "string" },
              "lastName": { "type": "string" }
            }
          }
        }
      }
    }
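The post doesn't show the exact HTTP request; as a rough sketch (not from the original post), the call could be made from Java like this, assuming the mapping is saved locally as mapping.json. Note that the JSON above is in the GET _mapping response form; depending on the Elasticsearch version, the index-creation body may need to be just the inner { "mappings": ... } object.

    package com.spnotes.eshadoop;

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    // Hypothetical helper, not part of the original post: creates the hadoop
    // index by POSTing the mapping JSON to a local Elasticsearch node
    public class CreateIndex {
        public static void main(String[] args) throws Exception {
            byte[] mapping = Files.readAllBytes(Paths.get("mapping.json"));
            URL url = new URL("http://localhost:9200/hadoop");
            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("POST");
            connection.setDoOutput(true);
            connection.setRequestProperty("Content-Type", "application/json");
            try (OutputStream out = connection.getOutputStream()) {
                out.write(mapping);
            }
            // 200 means the index and its mapping were created
            System.out.println("HTTP " + connection.getResponseCode());
        }
    }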
Once the index was created, I created a Contact.java object, which looks like this:

    package com.spnotes.eshadoop;

    import java.util.Date;
    import java.util.List;

    import org.codehaus.jackson.map.annotate.JsonSerialize;

    public class Contact {
        private String firstName;
        private String lastName;
        private Date dateOfBirth;
        private List<Address> address;

        public Contact(String firstName, String lastName, Date dateOfBirth,
                List<Address> address) {
            super();
            this.firstName = firstName;
            this.lastName = lastName;
            this.dateOfBirth = dateOfBirth;
            this.address = address;
        }

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }

        public String getLastName() {
            return lastName;
        }

        public void setLastName(String lastName) {
            this.lastName = lastName;
        }

        // dateOfBirth is serialized with CustomDateSerializer so the JSON
        // matches the date format declared in the index mapping
        @JsonSerialize(using = CustomDateSerializer.class)
        public Date getDateOfBirth() {
            return dateOfBirth;
        }

        public void setDateOfBirth(Date dateOfBirth) {
            this.dateOfBirth = dateOfBirth;
        }

        public List<Address> getAddress() {
            return address;
        }

        public void setAddress(List<Address> address) {
            this.address = address;
        }

        @Override
        public String toString() {
            return "Contact [firstName=" + firstName + ", lastName=" + lastName
                    + ", dateOfBirth=" + dateOfBirth + ", address=" + address + "]";
        }
    }

It uses CustomDateSerializer as a custom date serializer for the dateOfBirth property.
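The CustomDateSerializer listing isn't reproduced in this post. A minimal sketch of what such a Jackson 1.x serializer could look like is below; the output format here is an assumption, chosen to match the yyyy-MM-dd dates in the sample document:

    package com.spnotes.eshadoop;

    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    import org.codehaus.jackson.JsonGenerator;
    import org.codehaus.jackson.map.JsonSerializer;
    import org.codehaus.jackson.map.SerializerProvider;

    // Sketch only: writes Date values as yyyy-MM-dd strings, which the
    // dateOptionalTime format in the index mapping accepts
    public class CustomDateSerializer extends JsonSerializer<Date> {

        @Override
        public void serialize(Date value, JsonGenerator jgen,
                SerializerProvider provider) throws IOException {
            // SimpleDateFormat is not thread-safe, so create one per call
            jgen.writeString(new SimpleDateFormat("yyyy-MM-dd").format(value));
        }
    }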
The next step was to create ContactImportDriver.java, a class that contains both the Mapper and the MapReduce driver.
- ContactImportMapper: This is a mapper class that gets one line of contact.txt at a time, splits it on commas, and uses the values to create a Contact object. In the map() method I convert the Contact object into JSON using the Jackson ObjectMapper and set the JSON as the value of a Text object. I then write NullWritable as the key and the contact JSON as the value, which is what gets inserted into ES.
- ContactImportDriver: This class is the same as any other MapReduce driver, with a few exceptions. I call job.getConfiguration().set("es.input.json", "yes");, which tells the Elasticsearch Hadoop output format to treat the output value as JSON that can be indexed as-is. I also set the output format to EsOutputFormat with job.setOutputFormatClass(EsOutputFormat.class); so that the Elasticsearch Hadoop framework takes control of persisting the output.
    package com.spnotes.eshadoop;

    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Date;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.codehaus.jackson.map.ObjectMapper;
    import org.elasticsearch.hadoop.mr.EsOutputFormat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ContactImportDriver extends Configured implements Tool {

        Logger logger = LoggerFactory.getLogger(ContactImportDriver.class);

        public static class ContactImportMapper
                extends Mapper<LongWritable, Text, NullWritable, Text> {

            Logger logger = LoggerFactory.getLogger(ContactImportMapper.class);
            private ObjectMapper jsonMapper;
            private SimpleDateFormat dateFormatter = new SimpleDateFormat("dd-MMM-yyyy");
            private Text contactText;

            @Override
            protected void setup(
                    Mapper<LongWritable, Text, NullWritable, Text>.Context context)
                    throws IOException, InterruptedException {
                jsonMapper = new ObjectMapper();
                contactText = new Text();
            }

            @Override
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, NullWritable, Text>.Context context)
                    throws IOException, InterruptedException {
                logger.debug("Entering ContactImportMapper.map()");
                try {
                    // Each input line is one comma-separated contact record
                    String contactContent = value.toString();
                    String[] contactDataList = contactContent.split(",");
                    logger.debug("ContactDataList " + Arrays.toString(contactDataList)
                            + " length -> " + contactDataList.length);
                    if (contactDataList.length == 6) {
                        String firstName = contactDataList[0];
                        String lastName = contactDataList[1];
                        Date dateOfBirth = dateFormatter.parse(contactDataList[2]);
                        String addressLine1 = contactDataList[3];
                        String city = contactDataList[4];
                        String country = contactDataList[5];
                        Address address = new Address(addressLine1, city, country);
                        List<Address> addressList = new ArrayList<Address>();
                        addressList.add(address);
                        Contact contact = new Contact(firstName, lastName,
                                dateOfBirth, addressList);
                        // Convert the Contact object to JSON and emit it as the
                        // value; the key is irrelevant, so NullWritable is used
                        String contactJSON = jsonMapper.writeValueAsString(contact);
                        contactText.set(contactJSON);
                        context.write(NullWritable.get(), contactText);
                    }
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                logger.debug("Exiting ContactImportMapper.map()");
            }
        }

        public int run(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.printf("Usage: %s [generic options] <input> <output>\n",
                        getClass().getSimpleName());
                ToolRunner.printGenericCommandUsage(System.err);
                return -1;
            }
            Job job = new Job();
            job.setJarByClass(ContactImportDriver.class);
            job.setJobName("ContactImporter");
            logger.info("Input path " + args[0]);
            logger.info("Output path " + args[1]);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            Configuration configuration = job.getConfiguration();
            // Tell es-hadoop that the mapper output value is already JSON
            configuration.set("es.input.json", "yes");
            configuration.set("es.nodes", "localhost:9200");
            // args[1] is the target index/type, e.g. hadoop/contact
            configuration.set("es.resource", args[1]);
            // Let es-hadoop persist the output directly into Elasticsearch
            job.setOutputFormatClass(EsOutputFormat.class);
            job.setNumReduceTasks(0);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            job.setMapperClass(ContactImportMapper.class);
            int returnValue = job.waitForCompletion(true) ? 0 : 1;
            System.out.println("job.isSuccessful " + job.isSuccessful());
            return returnValue;
        }

        public static void main(final String[] args) throws Exception {
            int exitCode = ToolRunner.run(new ContactImportDriver(), args);
            System.exit(exitCode);
        }
    }
Once the program was built, I passed two parameters to it: the first is the full path of the contact.txt file, and the second is the Elasticsearch index and type in hadoop/contact format. This is how the content of the contact.txt file looks:

    Sachin,Tendulkar,24-Apr-1973,1 Main Street,Mumbai,India
    Rahul,Dravid,01-Jan-1973,1 Main Street,Bangalore,India
    Anil,Kumble,17-Oct-1970,1 Main Street,Bangalore,India
    Virat,Kohali,05-Nov-1988,1 Main Street,Delhi,India
    Mahendra Singh,Dhoni,07-Jul-1981,1 Main Street,Ranchi,India
    Saurav Ganguli,08-Jul-1982,1 Main Street,Kolkata,India
Once the data was imported, I could verify it by querying Elasticsearch. For example, the following query returns contacts born between 1970-01-01 and 1974-02-01:

    {
      "query": {
        "filtered": {
          "query": {
            "match_all": {}
          },
          "filter": {
            "numeric_range": {
              "dateOfBirth": {
                "from": "1970-01-01",
                "to": "1974-02-01"
              }
            }
          }
        }
      }
    }
5 comments:
How can I give the _id value from the file as a serial number in this program?
But this MapReduce program generates duplicate entries. My file has 180 fields and 1 crore (10 million) records. Can I trust a MapReduce program to insert data into Elasticsearch?
@featherSoft Api: You might want to use a reducer and move the code for generating output into the reducer. For example, say the mapper reads 3 lines:

    A,B,C
    B,D,E
    A,B,C

If you want unique records based on the first column, your mapper can emit A as the key and (A,B,C) as the value, so your reducer will get (A, [(A,B,C), (A,B,C)]). Now you can combine all the values for A and, at the end, insert one record into ES. The reason for adding a reduce phase is that it guarantees all the records with a key equal to A get consolidated on a single machine.
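As a rough sketch of that suggestion (hypothetical code, not from the original project): the mapper would emit the dedup key together with the full line, and a reducer like this would write only one value per key:

    import java.io.IOException;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Hypothetical reducer: all lines that share the same dedup key (for example
    // the first CSV column) arrive at this reducer together, so emitting just the
    // first value per key removes duplicates before they reach Elasticsearch
    public class DedupReducer extends Reducer<Text, Text, NullWritable, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(NullWritable.get(), value);
                break; // keep only the first record for this key
            }
        }
    }

The driver would then need a non-zero number of reduce tasks, Text as the map output key class, and the EsOutputFormat wiring left unchanged.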
@Unknown: If your document has a particular field that you would like to use as the _id, you can set it using conf.set("es.mapping.id", "fieldname");. For example, in this case I could configure firstName to be the _id of each record, which would also make sure that no two players end up as separate documents with the same first name, by setting conf.set("es.mapping.id", "firstName");
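In the driver above, that setting would sit alongside the other es.* configuration in run(); for instance:

    // In ContactImportDriver.run(), next to the other es.* settings:
    Configuration configuration = job.getConfiguration();
    configuration.set("es.input.json", "yes");
    // Use the firstName field of the JSON document as the Elasticsearch _id,
    // so a record with an existing first name overwrites the earlier document
    // instead of creating a duplicate
    configuration.set("es.mapping.id", "firstName");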
Hi Sunil, I ran your code downloaded from GitHub, but I am getting this error: "java.lang.StringIndexOutOfBoundsException: String index out of range: -1". I can't understand why it is coming; can you please help me with this?