Reading content of ElasticSearch index into Pig Script

In the "Using ElasticSearch for storing output of Pig Script" entry, I built a sample for storing the output of a Pig script in ElasticSearch. I wanted to try the reverse, using an ElasticSearch index (or search result) as the input to a Pig script, so I built this sample:
  1. First, follow step 3 of "Using ElasticSearch for storing output of Pig Script" to download the ElasticSearch Hadoop jars and upload them to HDFS.
  2. Then create a Pig script like the one below. The first two lines make the ElasticSearch Hadoop jars available to Pig. The DEFINE statement creates an alias for org.elasticsearch.hadoop.pig.EsStorage, giving it the short, user-friendly name ES. The LOAD statement tells Pig to load the content of the pig/cricket index (index pig, type cricket) from the local ElasticSearch instance into the relation A, and the last line dumps the content of A.
    
    REGISTER /user/root/elasticsearch-hadoop-2.0.0.RC1/dist/elasticsearch-hadoop-2.0.0.RC1.jar
    REGISTER /user/root/elasticsearch-hadoop-2.0.0.RC1/dist/elasticsearch-hadoop-pig-2.0.0.RC1.jar
    
    DEFINE ES org.elasticsearch.hadoop.pig.EsStorage;
    A = LOAD 'pig/cricket' USING ES;
    DUMP A;
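To see what `LOAD 'pig/cricket'` points at, it can help to spell out the REST endpoint by hand. The resource string has the form `<index>/<type>`, and when es.nodes and es.port are not set, es-hadoop connects to localhost:9200. The Java sketch below is a hypothetical helper (the class and method names are my own, not part of es-hadoop) that turns the resource into the `_search` URL you could open with curl to look at the same documents Pig reads.

```java
/**
 * Hypothetical helper (not part of es-hadoop): shows which ElasticSearch REST
 * endpoint an es-hadoop resource string such as "pig/cricket" refers to.
 */
final class EsResourceUrl {

    // Where es-hadoop connects when es.nodes / es.port are not configured.
    static final String DEFAULT_HOST = "localhost";
    static final int DEFAULT_PORT = 9200;

    /** Splits "<index>/<type>" into its two parts and rejects anything else. */
    static String[] parseResource(String resource) {
        String[] parts = resource.split("/");
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException("expected <index>/<type>, got: " + resource);
        }
        return parts;
    }

    /** Builds the _search URL for the documents stored under the resource. */
    static String searchUrl(String host, int port, String resource) {
        String[] parts = parseResource(resource);
        return "http://" + host + ":" + port + "/" + parts[0] + "/" + parts[1] + "/_search";
    }

    public static void main(String[] args) {
        // Prints http://localhost:9200/pig/cricket/_search
        System.out.println(searchUrl(DEFAULT_HOST, DEFAULT_PORT, "pig/cricket"));
    }
}
```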
    
After I executed the script, I could see output like this:
Note: Before I got it to work, I was using `v = LOAD 'pig/cricket' USING org.elasticsearch.pig.EsStorage` to load the content of ES, and it kept throwing the following error. I eventually realized that I was using the wrong package name; the class is org.elasticsearch.hadoop.pig.EsStorage.

grunt> v = LOAD 'pig/cricket' USING org.elasticsearch.pig.EsStorage;
2014-05-14 15:56:48,873 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve org.elasticsearch.pig.EsStorage using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
Details at logfile: /root/pig_1400106825043.log
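The error message explains what went wrong: Pig tried the class name with each prefix in its import list (the first prefix is empty, so the fully qualified name is tried as-is), and none of the candidates matched a class on the classpath. The following Java sketch is a simplified, hypothetical model of that lookup, with a hard-coded set standing in for the classes the registered jars provide; it is not Pig's actual code.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
 * Simplified model of Pig's name lookup: prepend each import prefix
 * (the first one is empty) and return the first candidate that exists.
 */
final class PigNameResolution {

    // The import list printed in the ERROR 1070 message.
    static final List<String> IMPORTS = List.of(
            "", "java.lang.", "org.apache.pig.builtin.", "org.apache.pig.impl.builtin.");

    static Optional<String> resolve(String name, Set<String> available) {
        for (String prefix : IMPORTS) {
            String candidate = prefix + name;
            if (available.contains(candidate)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty(); // Pig reports "Could not resolve ..." at this point
    }

    public static void main(String[] args) {
        // Stand-in for the classpath: the class the es-hadoop Pig jar provides.
        Set<String> classpath = Set.of("org.elasticsearch.hadoop.pig.EsStorage");
        System.out.println(resolve("org.elasticsearch.pig.EsStorage", classpath));        // Optional.empty
        System.out.println(resolve("org.elasticsearch.hadoop.pig.EsStorage", classpath)); // Optional[org.elasticsearch.hadoop.pig.EsStorage]
    }
}
```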

Using ElasticSearch for storing output of Pig Script

I wanted to learn how to use ElasticSearch for storing the output of a Pig script, so I created a simple text file that contains the names of cricket players, their role in the team, and their email ids. Then I used a Pig script to load the text file into ElasticSearch. I followed these steps:
  1. First I created a cricket.txt file with the cricketers' information, like this (the LOAD statement in step 4 uses Pig's default loader, PigStorage, which splits fields on tabs, so the four fields on each line should be tab-separated):
    
    Virat Kohli batsman virat@bcci.com
    MahendraSingh Dhoni batsman mahendra@bcci.com
    Shikhar Dhawan batsman shikhar@bcci.com
    
  2. The next step was to upload the cricket.txt file to the HDFS /user/root directory:
    
    hdfs dfs -copyFromLocal cricket.txt /user/root/cricket.txt
    
  3. After that, I downloaded the ElasticSearch Hadoop zip and expanded it locally. Then I uploaded the whole elasticsearch-hadoop-2.0.0.RC1 directory to HDFS so that it is available from every node in the cluster:
    
    hdfs dfs -copyFromLocal elasticsearch-hadoop-2.0.0.RC1/ /user/root/
    
  4. Then I created this cricketes.pig script. It first registers the ElasticSearch-related jar files with Pig, then loads the content of the cricket.txt file into the cricket relation, and finally stores that content in the pig/cricket index on localhost:
    
    
    /*
    Register the elasticsearch hadoop related jar files
    */
    
    REGISTER /user/root/elasticsearch-hadoop-2.0.0.RC1/dist/elasticsearch-hadoop-2.0.0.RC1.jar
    REGISTER /user/root/elasticsearch-hadoop-2.0.0.RC1/dist/elasticsearch-hadoop-pig-2.0.0.RC1.jar
    
    -- Load the content of /user/root/cricket.txt into Pig (the default PigStorage loader splits each line on tabs)
    cricket = LOAD '/user/root/cricket.txt' AS( fname:chararray, lname:chararray, skill: chararray, email: chararray);
    DUMP cricket;
    -- Store the content of the cricket relation in the local ElasticSearch instance, in the pig/cricket index
    STORE cricket into 'pig/cricket' USING org.elasticsearch.hadoop.pig.EsStorage;
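Roughly speaking, each line of cricket.txt becomes one ElasticSearch document: PigStorage splits the line on tabs into the four schema fields, and EsStorage indexes the tuple as JSON using the schema's field names. The Java sketch below illustrates that mapping for a single line. The class is hypothetical, it does only minimal JSON escaping, and unlike Pig it simply rejects lines that do not have exactly four fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustration only: one tab-separated cricket.txt line -> one JSON document. */
final class CricketRecord {

    // Field names from the Pig schema in cricketes.pig.
    static final String[] FIELDS = {"fname", "lname", "skill", "email"};

    /** Splits a tab-separated line into field name -> value, preserving schema order. */
    static Map<String, String> parse(String line) {
        String[] values = line.split("\t", -1);
        if (values.length != FIELDS.length) {
            throw new IllegalArgumentException("expected " + FIELDS.length + " fields, got " + values.length);
        }
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < FIELDS.length; i++) {
            record.put(FIELDS[i], values[i]);
        }
        return record;
    }

    /** Renders the record as a flat JSON object. */
    static String toJson(Map<String, String> record) {
        StringBuilder json = new StringBuilder("{");
        for (Map.Entry<String, String> e : record.entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            json.append('"').append(escape(e.getKey())).append("\":\"")
                .append(escape(e.getValue())).append('"');
        }
        return json.append('}').toString();
    }

    // Escapes only backslashes and double quotes, which is enough for this data.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        System.out.println(toJson(parse("Virat\tKohli\tbatsman\tvirat@bcci.com")));
    }
}
```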
    
After running the Pig script, I verified the content of the pig/cricket index in ES, and I could see the content of the text file like this:

Using ElasticSearch as an external data store with Apache Hive

ElasticSearch has a feature that lets you define a Hive table that actually points to an index in ElasticSearch. I wanted to learn how to use this feature, so I followed these steps:
  1. First I created the contact/contact index and type in ElasticSearch and inserted 4 records into it, like this:
  2. Next I downloaded the ElasticSearch Hadoop zip file onto my Hadoop VM by executing the following command:
    
    wget http://download.elasticsearch.org/hadoop/elasticsearch-hadoop-2.0.0.RC1.zip
    
    I expanded elasticsearch-hadoop-2.0.0.RC1.zip in the /root directory.
  3. Next I started the Hive console by executing the following command. Note how I had to add elasticsearch-hadoop-2.0.0.RC1.jar to hive.aux.jars.path:
    
    hive -hiveconf hive.aux.jars.path=/root/elasticsearch-hadoop-2.0.0.RC1/dist/elasticsearch-hadoop-2.0.0.RC1.jar
    
  4. Next I defined an artists table in Hive that points to the contact index in the ElasticSearch server, like this:
    
    CREATE EXTERNAL TABLE artists (
    fname STRING,
    lname STRING,
    email STRING)
    STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
    TBLPROPERTIES('es.resource' = 'contact/contact',
                  'es.index.auto.create' = 'false') ;
    
  5. Once the table was configured, I could query it like any normal Hive table, like this:
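Step 1 showed the four contact records only as a screenshot. One way to insert such records yourself is ElasticSearch's _bulk API, whose body is newline-delimited JSON: an action line naming the index and type, followed by the document source, each terminated by a newline. The Java sketch below builds that body for contact/contact; the class and the two records are placeholders, not the author's data. By default es-hadoop matches the Hive columns fname, lname and email to document fields with the same names, so the documents use those field names.

```java
import java.util.List;

/** Hypothetical helper: builds an ElasticSearch _bulk body for contact documents. */
final class ContactBulkBody {

    /** Placeholder contact shape matching the Hive columns fname, lname, email. */
    static final class Contact {
        final String fname;
        final String lname;
        final String email;

        Contact(String fname, String lname, String email) {
            this.fname = fname;
            this.lname = lname;
            this.email = email;
        }
    }

    /** Values are assumed to contain no quotes, backslashes or newlines (no JSON escaping). */
    static String bulkBody(String index, String type, List<Contact> contacts) {
        StringBuilder body = new StringBuilder();
        for (Contact c : contacts) {
            // Action line: which index/type the following document goes into
            body.append("{\"index\":{\"_index\":\"").append(index)
                .append("\",\"_type\":\"").append(type).append("\"}}\n");
            // Source line: the document itself, with field names matching the Hive columns
            body.append("{\"fname\":\"").append(c.fname)
                .append("\",\"lname\":\"").append(c.lname)
                .append("\",\"email\":\"").append(c.email).append("\"}\n");
        }
        return body.toString();
    }

    public static void main(String[] args) {
        // Placeholder records, not the four records from the post
        List<Contact> contacts = List.of(
                new Contact("Jane", "Doe", "jane@example.com"),
                new Contact("John", "Roe", "john@example.com"));
        System.out.print(bulkBody("contact", "contact", contacts));
    }
}
```

The printed body could then be sent with something like `curl -XPOST 'http://localhost:9200/_bulk' --data-binary @body.json` (the file name is hypothetical); `--data-binary` is used because it preserves the newlines the bulk format depends on.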

Using ElasticSearch to store output of MapReduce program

I wanted to use ElasticSearch for storing the output of a MapReduce program, so I modified the WordCount(HelloWorld) MapReduce program to store its output in ElasticSearch instead of a text file. You can download the complete project from here.
  1. First, change the Maven build script to declare a dependency on elasticsearch-hadoop-mr, like this. I had to try out a few combinations before this worked (watch out for a Jackson mapper version mismatch):
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.spnotes.hadoop.es</groupId>
        <artifactId>MRElasticSearch</artifactId>
        <version>1.0-SNAPSHOT</version>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <hadoop.version>2.0.0-cdh4.0.0</hadoop.version>
            <scp.user>cloudera</scp.user>
            <scp.password>cloudera</scp.password>
            <scp.host>172.16.225.176</scp.host>
            <scp.dirCopyTo>/home/cloudera/test</scp.dirCopyTo>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce</artifactId>
                <version>2.4.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client</artifactId>
                <version>2.4.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.4.0</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch-hadoop-mr</artifactId>
                <version>2.0.0.RC1</version>
            </dependency>
        </dependencies>
        <repositories>
            <repository>
                <id>Apache Repository</id>
                <url>https://repository.apache.org/content/repositories</url>
            </repository>
            <repository>
                <id>cloudera</id>
                <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
            </repository>
        </repositories>
    </project>
  2. Next, change your MapReduce driver class to use EsOutputFormat as the output format. Set the es.nodes property to the host and port of the ElasticSearch server that should store the output, and es.resource to the index and type name (index/type) where the output should be stored. In my case ElasticSearch is running on the local machine.
    
    public int run(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.printf("Usage: %s [generic options] <input> <output>\n",
                        getClass().getSimpleName());
                ToolRunner.printGenericCommandUsage(System.err);
                return -1;
            }
    
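            // getInstance(getConf()) keeps the configuration (and generic options) passed in by ToolRunner
            Job job = Job.getInstance(getConf());
    
            job.setJarByClass(WordCount.class);
            job.setJobName("WordCounter");
            logger.info("Input path " + args[0]);
            logger.info("Output path " + args[1]);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // EsOutputFormat writes to ElasticSearch, so the word counts do not end up under this path
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            //Configuration for using ElasticSearch as OutputFormat
            Configuration configuration = job.getConfiguration();
            configuration.set("es.nodes","localhost:9200");       // host:port of the ElasticSearch server
            configuration.set("es.resource","hadoop/wordcount2"); // <index>/<type> that receives the documents
            // es-hadoop's documentation recommends disabling speculative execution so duplicate task attempts don't index twice
            job.setSpeculativeExecution(false);
            job.setOutputFormatClass(EsOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(MapWritable.class);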
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
    
            int returnValue = job.waitForCompletion(true) ? 0:1;
            System.out.println("job.isSuccessful " + job.isSuccessful());
            return returnValue;
        }
    
  3. As the last step before starting the MapReduce program, I started an ElasticSearch 1.1 server on my local machine.
  4. After running the program, when I searched the hadoop/wordcount2 index/type, I found results like this:
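The reducer itself is not shown in this post. Because the driver sets MapWritable as the output value class, each reduce call presumably emits one small document per word for EsOutputFormat to index into hadoop/wordcount2. The pure-Java sketch below mimics that output shape without Hadoop on the classpath; the field names word and count are assumptions, and a comment shows how the same document could be built as a MapWritable inside a real reducer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Pure-Java sketch of the word count output shape: one document per word.
 * In a Hadoop reducer, the equivalent document could be built as:
 *   MapWritable doc = new MapWritable();
 *   doc.put(new Text("word"), key);
 *   doc.put(new Text("count"), new IntWritable(sum));
 *   context.write(key, doc);
 * The field names "word" and "count" are assumptions.
 */
final class WordCountDocs {

    static List<Map<String, Object>> toDocuments(List<String> lines) {
        // Map + reduce in one step: count each whitespace-separated token.
        Map<String, Integer> counts = new TreeMap<>(); // sorted, like reducer input keys
        for (String line : lines) {
            for (String token : line.split("\\s+")) {
                if (!token.isEmpty()) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        // One document per word, as a reducer would emit per key.
        List<Map<String, Object>> docs = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            Map<String, Object> doc = new LinkedHashMap<>();
            doc.put("word", e.getKey());
            doc.put("count", e.getValue());
            docs.add(doc);
        }
        return docs;
    }

    public static void main(String[] args) {
        System.out.println(toDocuments(List.of("to be or not to be")));
    }
}
```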

Using WebHDFS as input and output for MapReduce program

In the WordCount(HelloWorld) MapReduce program entry I talked about how to create a simple WordCount MapReduce program, and in the WebHDFS REST API entry I blogged about how to configure the WebHDFS endpoint for your Hadoop installation. I wanted to combine the two, so that my MapReduce program reads its input and writes its output back to HDFS using WebHDFS. First I changed the program arguments to use webhdfs URLs for both the input and the output:

hadoop jar WordCount.jar webhdfs://172.16.225.192:50070/test/startupdemo.txt webhdfs://172.16.225.192:50070/output
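A webhdfs:// URI is shorthand for the REST endpoint: webhdfs://<host>:<port>/<path> corresponds to http://<host>:<port>/webhdfs/v1/<path>?op=<OPERATION>, the same URL format used with curl later in this post. The Java sketch below performs that translation; it is a hypothetical helper and, to keep it short, requires an explicit port.

```java
import java.net.URI;

/** Hypothetical helper: maps a webhdfs:// URI to the matching WebHDFS REST URL. */
final class WebHdfsUrls {

    static String restUrl(String webhdfsUri, String op) {
        URI uri = URI.create(webhdfsUri);
        if (!"webhdfs".equals(uri.getScheme())) {
            throw new IllegalArgumentException("not a webhdfs URI: " + webhdfsUri);
        }
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("this sketch needs an explicit host:port: " + webhdfsUri);
        }
        String path = (uri.getRawPath() == null || uri.getRawPath().isEmpty()) ? "/" : uri.getRawPath();
        // REST endpoints live under /webhdfs/v1 on the NameNode's HTTP port
        return "http://" + uri.getHost() + ":" + uri.getPort() + "/webhdfs/v1" + path + "?op=" + op;
    }

    public static void main(String[] args) {
        System.out.println(restUrl("webhdfs://172.16.225.192:50070/test/startupdemo.txt", "OPEN"));
        System.out.println(restUrl("webhdfs://172.16.225.192:50070/output", "MKDIRS"));
    }
}
```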
When I tried to run this program, I got an org.apache.hadoop.security.AccessControlException. The job was running as the login user on my machine (I run Hadoop on a VM, and Eclipse with the MapReduce program directly on my machine), gpzpati in the log below, and HDFS does not allow that user to create files under the root directory (/ is owned by hdfs:supergroup with drwxr-xr-x permissions).

14/05/09 10:10:18 WARN mapred.LocalJobRunner: job_local_0001
org.apache.hadoop.security.AccessControlException: Permission denied: user=gpzpati, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
 at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:90)
 at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:57)
 at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:280)
 at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.run(WebHdfsFileSystem.java:427)
 at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.mkdirs(WebHdfsFileSystem.java:469)
 at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1731)
 at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:82)
 at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:236)
Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=gpzpati, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
 at org.apache.hadoop.hdfs.web.JsonUtil.toRemoteException(JsonUtil.java:167)
 at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:279)
 ... 5 more
So I changed the main method of my program to wrap the call in UserGroupInformation.doAs(). Inside that call, the MapReduce job runs as the user hdfs instead of my login user. Now it works.
package com.spnotes.hadoop;

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCountDriver extends Configured implements Tool {
    Logger logger = LoggerFactory.getLogger(WordCountDriver.class);

    public static void main(final String[] args) throws Exception {
        // Tell Hadoop's simple authentication to use "hdfs" as the login user
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        // Run the whole job as the remote user "hdfs", which is allowed to write to "/"
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs");
        int exitCode = ugi.doAs(new PrivilegedExceptionAction<Integer>() {
            public Integer run() throws Exception {
                Configuration configuration = new Configuration();
                configuration.set("hadoop.job.ugi", "hdfs");
                // Pass the configuration on so the job actually uses it
                return ToolRunner.run(configuration, new WordCountDriver(), args);
            }
        });
        System.exit(exitCode);
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        // Build the job from the configuration handed over by ToolRunner
        Job job = Job.getInstance(getConf());
        job.setJarByClass(WordCountDriver.class);
        job.setJobName("WordCounter");
        logger.info("Input path " + args[0]);
        logger.info("Output path " + args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        int returnValue = job.waitForCompletion(true) ? 0 : 1;
        System.out.println("job.isSuccessful " + job.isSuccessful());
        return returnValue;
    }
}
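Why does doAs() help? As far as I understand the WebHDFS client, when Hadoop security is off it identifies the caller with a user.name query parameter taken from the current Hadoop user, so running the job as hdfs changes that parameter. The Java sketch below is a hypothetical helper that prints the two MKDIRS requests (an HTTP PUT in the WebHDFS REST API) corresponding to the failing and the working runs, as curl commands you could try by hand; the /output path matches the command line above.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: the WebHDFS MKDIRS request behind FileOutputCommitter's mkdirs(). */
final class WebHdfsMkdirs {

    // MKDIRS is an HTTP PUT in the WebHDFS REST API.
    static final String METHOD = "PUT";

    /** With security off, user.name tells the NameNode which user is calling. */
    static String mkdirsUrl(String host, int port, String path, String user) {
        return "http://" + host + ":" + port + "/webhdfs/v1" + path
                + "?op=MKDIRS&user.name=" + URLEncoder.encode(user, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String asLoginUser = mkdirsUrl("172.16.225.192", 50070, "/output", "gpzpati");
        String asHdfs = mkdirsUrl("172.16.225.192", 50070, "/output", "hdfs");
        // Expected to fail with AccessControlException: gpzpati cannot write to "/"
        System.out.println("curl -i -X " + METHOD + " \"" + asLoginUser + "\"");
        // Expected to succeed: hdfs owns "/"
        System.out.println("curl -i -X " + METHOD + " \"" + asHdfs + "\"");
    }
}
```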
Note: For this program to work, your NameNode and DataNodes must have hostnames that can be resolved from the machine running the job. When the MapReduce job runs, it first makes an OPEN request to the webhdfs://172.16.225.192:50070/test/startupdemo.txt URL, and the NameNode sends back a redirect response with a URL pointing to a DataNode, like this:

$curl -i 'http://172.16.225.192:50070/webhdfs/v1/test/startupdemo.txt?op=OPEN'
HTTP/1.1 307 TEMPORARY_REDIRECT
Cache-Control: no-cache
Expires: Tue, 06 May 2014 21:44:56 GMT
Date: Tue, 06 May 2014 21:44:56 GMT
Pragma: no-cache
Expires: Tue, 06 May 2014 21:44:56 GMT
Date: Tue, 06 May 2014 21:44:56 GMT
Pragma: no-cache
Location: http://ubuntu:50075/webhdfs/v1/test/startupdemo.txt?op=OPEN&namenoderpcaddress=localhost:9000&offset=0
Content-Type: application/octet-stream
Content-Length: 0
Server: Jetty(6.1.26)
If the DataNode hostname (ubuntu in my case) is not directly addressable from your machine, the job fails with an error like the one below. You can fix this by mapping the name ubuntu to the right IP address in your /etc/hosts file.

14/05/09 10:16:34 WARN mapred.LocalJobRunner: job_local_0001
java.net.UnknownHostException: ubuntu
 at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:223)
 at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:431)
 at java.net.Socket.connect(Socket.java:527)
 at java.net.Socket.connect(Socket.java:476)
 at sun.net.NetworkClient.doConnect(NetworkClient.java:163)
 at sun.net.www.http.HttpClient.openServer(HttpClient.java:424)
 at sun.net.www.http.HttpClient.openServer(HttpClient.java:538)
 at sun.net.www.http.HttpClient.<init>(HttpClient.java:214)
 at sun.net.www.http.HttpClient.New(HttpClient.java:300)
 at sun.net.www.http.HttpClient.New(HttpClient.java:319)
 at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:987)
 at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:923)
 at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:841)
 at sun.net.www.protocol.http.HttpURLConnection.followRedirect(HttpURLConnection.java:2156)
 at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1390)
 at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:379)
 at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:264)
 at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.access$000(WebHdfsFileSystem.java:106)
 at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$OffsetUrlInputStream.checkResponseCode(WebHdfsFileSystem.java:688)
 at org.apache.hadoop.hdfs.ByteRangeInputStream.openInputStream(ByteRangeInputStream.java:121)
 at org.apache.hadoop.hdfs.ByteRangeInputStream.getInputStream(ByteRangeInputStream.java:103)
 at org.apache.hadoop.hdfs.ByteRangeInputStream.read(ByteRangeInputStream.java:158)
 at java.io.DataInputStream.read(DataInputStream.java:83)
 at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:114)
 at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:458)
 at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:76)
 at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:85)
 at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:139)
 at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:645)
 at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
 at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:263)
14/05/09 10:16:35 INFO mapred.JobClient:  map 0% reduce 0%
14/05/09 10:16:35 INFO mapred.JobClient: Job complete: job_local_0001
14/05/09 10:16:35 INFO mapred.JobClient: Counters: 0
job.isSuccessful false
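To find out which hostname the client must be able to resolve, look at the Location header of the 307 redirect. The Java sketch below is a hypothetical helper that extracts the DataNode host and port (and any query parameter) from that header; its main method then tries to resolve the host with InetAddress.getByName, which throws the same exception type, java.net.UnknownHostException, when the name is unknown.

```java
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;

/** Hypothetical helper: inspects the Location header of a WebHDFS 307 redirect. */
final class RedirectTarget {

    /** Returns "host:port" of the DataNode named in the Location header. */
    static String dataNodeAddress(String locationHeader) {
        URI uri = URI.create(locationHeader.trim());
        return uri.getHost() + ":" + uri.getPort();
    }

    /** Returns one query parameter (no URL-decoding), or null if it is absent. */
    static String queryParam(String locationHeader, String name) {
        String query = URI.create(locationHeader.trim()).getRawQuery();
        if (query == null) {
            return null;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            if (key.equals(name)) {
                return eq < 0 ? "" : pair.substring(eq + 1);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String location = "http://ubuntu:50075/webhdfs/v1/test/startupdemo.txt"
                + "?op=OPEN&namenoderpcaddress=localhost:9000&offset=0";
        String host = URI.create(location).getHost();
        System.out.println("DataNode: " + dataNodeAddress(location));
        try {
            System.out.println(host + " resolves to " + InetAddress.getByName(host).getHostAddress());
        } catch (UnknownHostException e) {
            System.out.println(host + " does not resolve; map it to the DataNode's IP in /etc/hosts");
        }
    }
}
```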

WebHDFS REST API

Hadoop provides an HTTP REST API, WebHDFS, that exposes access to HDFS. WebHDFS lets you read and write files in HDFS and supports the complete FileSystem interface for HDFS. It also provides security, using Kerberos (SPNEGO) and Hadoop delegation tokens for authentication. You can find more information about it here. I wanted to try the WebHDFS API, so first I changed the hdfs-site.xml file to set the dfs.webhdfs.enabled property to true:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:/var/data/hadoop/hdfs/nn</value>
    </property>
    <property>
        <name>dfs.checkpoint.dir</name>
        <value>file:/var/data/hadoop/hdfs/snn</value>
    </property>
    <property>
        <name>dfs.checkpoints.edits.dir</name>
        <value>file:/var/data/hadoop/hdfs/snn</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:/var/data/hadoop/hdfs/dn</value>
    </property>
    <property>
        <name>dfs.webhdfs.enabled</name>
        <value>true</value>
    </property>
</configuration>
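If WebHDFS does not come up, it is worth checking that the property actually made it into the file the NameNode reads. The Java sketch below is a hypothetical helper, using only the JDK's DOM parser, that reads the value of a property from hdfs-site.xml content. It only reports what the file says; a property missing from the file falls back to Hadoop's built-in default, which this sketch does not know about.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

/** Hypothetical helper: reads <property> values from Hadoop *-site.xml content. */
final class HdfsSiteCheck {

    /** Returns the <value> of the named <property>, or null if the file does not set it. */
    static String propertyValue(String xml, String name) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("could not parse configuration XML", e);
        }
        NodeList props = doc.getElementsByTagName("property");
        for (int i = 0; i < props.getLength(); i++) {
            Element prop = (Element) props.item(i);
            NodeList names = prop.getElementsByTagName("name");
            NodeList values = prop.getElementsByTagName("value");
            if (names.getLength() > 0 && values.getLength() > 0
                    && name.equals(names.item(0).getTextContent().trim())) {
                return values.item(0).getTextContent().trim();
            }
        }
        return null;
    }

    /** True only if the file explicitly sets dfs.webhdfs.enabled to true. */
    static boolean webHdfsEnabled(String xml) {
        return Boolean.parseBoolean(propertyValue(xml, "dfs.webhdfs.enabled"));
    }
}
```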
Then I restarted my Hadoop server, and while the HDFS NameNode was starting I looked at the logs to verify that WebHDFS had started.
After restarting the server, I used curl to test a couple of WebHDFS REST API calls, like this: