In Using ElasticSearch for storing output of Pig Script, I built a sample that stores the output of a Pig script in ElasticSearch. I wanted to try the reverse, using an index/search result in ElasticSearch as input to a Pig script, so I built this sample.
After storing the data, create a Pig script like this.
In this script the first two lines make the ElasticSearch Hadoop jars available to Pig. The DEFINE statement creates an alias for org.elasticsearch.hadoop.pig.EsStorage, giving it the simpler, user-friendly name ES. The fourth line tells Pig to load the content of the pig/cricket index from the local ElasticSearch instance into relation A, and the last line dumps the content of A.
REGISTER /user/root/elasticsearch-hadoop-2.0.0.RC1/dist/elasticsearch-hadoop-2.0.0.RC1.jar
REGISTER /user/root/elasticsearch-hadoop-2.0.0.RC1/dist/elasticsearch-hadoop-pig-2.0.0.RC1.jar
DEFINE ES org.elasticsearch.hadoop.pig.EsStorage;
A = LOAD 'pig/cricket' USING ES;
DUMP A;
After I executed the script, the DUMP statement printed the tuples from the index as output.
Note: Before I got it to work, I was using v = LOAD 'pig/cricket' USING org.elasticsearch.pig.EsStorage to load the content from ElasticSearch, and it kept throwing the following error. I eventually realized I was using the wrong package name:
grunt> v = LOAD 'pig/cricket' USING org.elasticsearch.pig.EsStorage;
2014-05-14 15:56:48,873 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve org.elasticsearch.pig.EsStorage using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
Details at logfile: /root/pig_1400106825043.log
I wanted to learn how to use ElasticSearch for storing the output of a Pig script. So I created a simple text file that has the names of cricket players, their role in the team, and their email id, and then used a Pig script to load that text file into ElasticSearch. I used the following steps.
First I created a cricket.txt file that contains the cricketers' information.
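For reference, a file with the tab-separated layout the script below expects (first name, last name, skill, email) looks like this; the names here are just made-up examples:
Sachin	Tendulkar	Batsman	sachin@example.com
Rahul	Dravid	Batsman	rahul@example.com
Anil	Kumble	Bowler	anil@example.com
MS	Dhoni	WicketKeeper	dhoni@example.com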
After that I downloaded the ElasticSearch Hadoop zip and expanded it on my local machine. I then uploaded the whole elasticsearch-hadoop-2.0.0.RC1 directory to HDFS so that it is available on all the cluster nodes; the commands are sketched below.
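Roughly, the commands look like this (assuming the standard download URL for the 2.0.0.RC1 release; the HDFS target matches the REGISTER paths in the script below):
wget http://download.elasticsearch.org/hadoop/elasticsearch-hadoop-2.0.0.RC1.zip
unzip elasticsearch-hadoop-2.0.0.RC1.zip
hadoop fs -put elasticsearch-hadoop-2.0.0.RC1 /user/root/elasticsearch-hadoop-2.0.0.RC1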
Then I created this cricketes.pig script, which first registers the ElasticSearch related jar files with Pig, then loads the content of cricket.txt into the cricket relation, and finally stores that content into the pig/cricket index on localhost.
/*
Register the elasticsearch hadoop related jar files
*/
REGISTER /user/root/elasticsearch-hadoop-2.0.0.RC1/dist/elasticsearch-hadoop-2.0.0.RC1.jar
REGISTER /user/root/elasticsearch-hadoop-2.0.0.RC1/dist/elasticsearch-hadoop-pig-2.0.0.RC1.jar
-- Load the content of /user/root/cricket.txt into Pig
cricket = LOAD '/user/root/cricket.txt' AS (fname:chararray, lname:chararray, skill:chararray, email:chararray);
DUMP cricket;
-- Store the content of the cricket relation into the pig/cricket index on the local ElasticSearch instance
STORE cricket INTO 'pig/cricket' USING org.elasticsearch.hadoop.pig.EsStorage;
After running the Pig script, I verified that the pig/cricket index in ElasticSearch contains the content of the text file.
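You can check the index yourself with a query like this:
curl 'http://localhost:9200/pig/cricket/_search?pretty=true'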
ElasticSearch Hadoop has a feature that lets you define a Hive table that actually points to an index in ElasticSearch. I wanted to learn how to use this feature, so I followed these steps.
First I created the contact/contact index and type in ElasticSearch and inserted 4 records into it.
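The records can be inserted with commands along these lines; the field names and values here are just illustrative:
curl -XPUT 'http://localhost:9200/contact/contact/1' -d '{"firstName":"John","lastName":"Doe","email":"john@example.com"}'
curl -XPUT 'http://localhost:9200/contact/contact/2' -d '{"firstName":"Jane","lastName":"Doe","email":"jane@example.com"}'
(two more PUTs with ids 3 and 4 follow the same pattern)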
Next I downloaded the ElasticSearch Hadoop zip file on my Hadoop VM.
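Assuming the standard download location for the 2.0.0.RC1 release, the command is something like:
wget http://download.elasticsearch.org/hadoop/elasticsearch-hadoop-2.0.0.RC1.zip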
I expanded elasticsearch-hadoop-2.0.0.RC1.zip in the /root directory.
Next I started the Hive console with the following command; note how I had to add elasticsearch-hadoop-2.0.0.RC1.jar to hive.aux.jars.path:
hive -hiveconf hive.aux.jars.path=/root/elasticsearch-hadoop-2.0.0.RC1/dist/elasticsearch-hadoop-2.0.0.RC1.jar
Next I defined an artists table in Hive that points to the contact index on the ElasticSearch server.
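With the elasticsearch-hadoop 2.0.0.RC1 Hive storage handler, the table definition looks roughly like this; the column names are an assumption and should match the fields of the contact documents:
CREATE EXTERNAL TABLE artists (
  firstName STRING,
  lastName STRING,
  email STRING)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'contact/contact');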
I wanted to use ElasticSearch for storing the output of a MapReduce program, so I modified the WordCount (HelloWorld) MapReduce program so that it stores its output in ElasticSearch instead of a text file. You can download the complete project from here.
First, change the Maven build script to declare a dependency on elasticsearch-hadoop-mr. I had to try a few combinations before this worked (watch out for Jackson mapper version mismatches).
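The relevant dependency for the 2.0.0.RC1 release looks like this; depending on your Hadoop distribution you may also need to align the org.codehaus.jackson versions it pulls in, which is the mismatch mentioned above:
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop-mr</artifactId>
  <version>2.0.0.RC1</version>
</dependency>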
Next, change your MapReduce driver class to use EsOutputFormat as the output format. You will have to set the es.nodes property to the host and port of the ElasticSearch server that you want to use for storing the output. The es.resource property points to the index and type name in ElasticSearch where the output should be stored. In my case ElasticSearch is running on the local machine.
public int run(String[] args) throws Exception {
    if (args.length != 2) {
        System.err.printf("Usage: %s [generic options] <input> <output>\n",
                getClass().getSimpleName());
        ToolRunner.printGenericCommandUsage(System.err);
        return -1;
    }

    Job job = new Job();
    job.setJarByClass(WordCount.class);
    job.setJobName("WordCounter");
    logger.info("Input path " + args[0]);
    logger.info("Output path " + args[1]);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Configuration for using ElasticSearch as the OutputFormat
    Configuration configuration = job.getConfiguration();
    configuration.set("es.nodes", "localhost:9200");
    configuration.set("es.resource", "hadoop/wordcount2");
    job.setOutputFormatClass(EsOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(MapWritable.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);

    int returnValue = job.waitForCompletion(true) ? 0 : 1;
    System.out.println("job.isSuccessful " + job.isSuccessful());
    return returnValue;
}
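Because the driver declares Text/MapWritable as the output key/value classes, the reducer has to emit one MapWritable per word, which EsOutputFormat then indexes as a JSON document in hadoop/wordcount2. A minimal sketch of such a reducer looks like this; the "word" and "count" field names are just my choice, and the WordCountReducer in the downloadable project may differ:
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, MapWritable> {

    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        // Sum the partial counts emitted by the mappers for this word
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get();
        }
        // Build the document that EsOutputFormat will write to ElasticSearch
        MapWritable document = new MapWritable();
        document.put(new Text("word"), new Text(word));
        document.put(new Text("count"), new IntWritable(sum));
        context.write(word, document);
    }
}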
As the last step before starting the MapReduce program, I started the ElasticSearch 1.1 server on my local machine.
After running the program, I searched the wordcount2 index and found the word-count documents in it.
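You can query the index with something like the following; the actual documents of course depend on your input file:
curl 'http://localhost:9200/hadoop/wordcount2/_search?pretty=true'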
In the WordCount (HelloWorld) MapReduce program blog I talked about how to create a simple WordCount MapReduce program. Then in the WebHDFS REST API entry I blogged about how to configure a WebHDFS endpoint for your Hadoop installation. I wanted to combine both of those things so that my MapReduce program reads its input using WebHDFS and writes its output back to HDFS using WebHDFS.
First I changed the program arguments to use webhdfs:// URLs for both the input and the output of the MapReduce program.
hadoop jar WordCount.jar webhdfs://172.16.225.192:50070/test/startupdemo.txt webhdfs://172.16.225.192:50070/output
When I tried to run this program I got an org.apache.hadoop.security.AccessControlException, because Hadoop was taking the login user name on my machine (I run Hadoop on a VM and my Eclipse IDE with the MapReduce program directly on my machine) and using it to run the MapReduce program, and HDFS does not allow that local user to create any files in HDFS.
14/05/09 10:10:18 WARN mapred.LocalJobRunner: job_local_0001
org.apache.hadoop.security.AccessControlException: Permission denied: user=gpzpati, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:90)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:57)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:280)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.run(WebHdfsFileSystem.java:427)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.mkdirs(WebHdfsFileSystem.java:469)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1731)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:82)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:236)
Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=gpzpati, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
at org.apache.hadoop.hdfs.web.JsonUtil.toRemoteException(JsonUtil.java:167)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:279)
... 5 more
So I changed the main method of my program to wrap the job submission in a UserGroupInformation.doAs() call. In that call I override the name of the user used for running the MapReduce job to hdfs. Now it works OK.
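A minimal sketch of that main() method looks like this, assuming the WordCount driver implements Tool as in the earlier WordCount post:
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ToolRunner;

public static void main(final String[] args) throws Exception {
    // Run the job as the "hdfs" user instead of the local login user
    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs");
    ugi.doAs(new PrivilegedExceptionAction<Integer>() {
        public Integer run() throws Exception {
            return ToolRunner.run(new WordCount(), args);
        }
    });
}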
Note: In order for this program to work, your NameNode and DataNodes should have hostnames that are resolvable over the network from the machine running the job. When you run the MapReduce program, it first makes an OPEN request to the webhdfs://172.16.225.192:50070/test/startupdemo.txt URL, and the NameNode sends back a redirect response with a URL pointing to a DataNode.
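The exact redirect depends on your cluster, but with the default DataNode HTTP port it looks roughly like this:
HTTP/1.1 307 TEMPORARY_REDIRECT
Location: http://ubuntu:50075/webhdfs/v1/test/startupdemo.txt?op=OPEN&offset=0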
Now if your DataNode hostname, which is ubuntu in my case, is not directly resolvable, the job will fail with an error like the one below. You can fix this issue by mapping the name ubuntu to the right IP in your /etc/hosts file (see the example entry after the stack trace).
14/05/09 10:16:34 WARN mapred.LocalJobRunner: job_local_0001
java.net.UnknownHostException: ubuntu
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:223)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:431)
at java.net.Socket.connect(Socket.java:527)
at java.net.Socket.connect(Socket.java:476)
at sun.net.NetworkClient.doConnect(NetworkClient.java:163)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:424)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:538)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:214)
at sun.net.www.http.HttpClient.New(HttpClient.java:300)
at sun.net.www.http.HttpClient.New(HttpClient.java:319)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:987)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:923)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:841)
at sun.net.www.protocol.http.HttpURLConnection.followRedirect(HttpURLConnection.java:2156)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1390)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:379)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:264)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.access$000(WebHdfsFileSystem.java:106)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$OffsetUrlInputStream.checkResponseCode(WebHdfsFileSystem.java:688)
at org.apache.hadoop.hdfs.ByteRangeInputStream.openInputStream(ByteRangeInputStream.java:121)
at org.apache.hadoop.hdfs.ByteRangeInputStream.getInputStream(ByteRangeInputStream.java:103)
at org.apache.hadoop.hdfs.ByteRangeInputStream.read(ByteRangeInputStream.java:158)
at java.io.DataInputStream.read(DataInputStream.java:83)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:114)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:458)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:76)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:85)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:139)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:645)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:263)
14/05/09 10:16:35 INFO mapred.JobClient: map 0% reduce 0%
14/05/09 10:16:35 INFO mapred.JobClient: Job complete: job_local_0001
14/05/09 10:16:35 INFO mapred.JobClient: Counters: 0
job.isSuccessful false
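In my case the /etc/hosts entry on the machine running the job looks like this, assuming the DataNode runs on the same VM as the NameNode:
172.16.225.192   ubuntu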
Hadoop provides an HTTP REST API, WebHDFS, that exposes access to HDFS. WebHDFS provides the ability to read and write files in HDFS and supports all of the HDFS file system operations. It also provides security using Kerberos (SPNEGO) and Hadoop delegation tokens for authentication. You can find more information about it here.
I wanted to try the WebHDFS API, so I followed these steps. First I changed the hdfs-site.xml file to set the value of the dfs.webhdfs.enabled property to true.
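The relevant part of hdfs-site.xml looks like this:
<property>
  <name>dfs.webhdfs.enabled</name>
  <value>true</value>
</property>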