Create ElasticSearch cluster on single machine

I wanted to figure out how to create a multi-node ElasticSearch cluster on a single machine, so I followed these steps:
  1. First I downloaded the elasticsearch zip file.
  2. I created 3 folders in /temp: node1, node2 and node3.
  3. I unzipped elasticsearch.zip into each of these folders.
  4. Then I opened node1/elasticsearch-1.4.2/config/elasticsearch.yml and changed the value of cluster.name to sunilscluster and the value of node.name to first. I did the same in node2 and node3, but set node.name to second and third respectively.
  5. The next step was to install Marvel on each of the nodes by executing bin/plugin -i elasticsearch/marvel/latest
  6. Then I went to each of the directories and started elasticsearch. While the nodes were coming up I could see messages in the log indicating that the nodes were able to discover each other.
Once all 3 nodes were started I could see them in Marvel.
Note: There might be a more efficient way of doing this in which the nodes share binaries. I tried creating 3 different elasticsearch.yml files and starting the server once per file, using elasticsearch -Des.config=/node1/elasticsearch.yml, elasticsearch -Des.config=/node2/elasticsearch.yml, but that did not work.

Using NodeClient for connecting to ElasticSearch server

There are 2 options for connecting to an ElasticSearch server from a Java program:
  1. TransportClient
  2. NodeClient
As per ElasticSearch.org, you should use TransportClient when you want short-lived connections, and NodeClient when you want a few long-lived connections. If you're creating a web application that talks to ElasticSearch, you are better off creating only one connection object during application startup and destroying it during application shutdown. The basic org.elasticsearch.client.Client object is thread safe, so you can call it from multiple threads of the web application.
I was trying to figure out how to use NodeClient and ran into a few issues, but now I have my sample working, and these are my notes from the development process.
First I downloaded ElasticSearch version 1.1.1 on my local machine and ran it as a standalone server (I did not change anything in the configuration). Then I connected to it with a small NodeClient program and it worked. In my local ElasticSearch log I could see a log line when my client connected to the cluster and another when it disconnected.
At this point I was not connected to my office network. But as soon as I connected to my office network (which has a few other ElasticSearch servers running), the same code started failing with org.elasticsearch.cluster.block.ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];[SERVICE_UNAVAILABLE/2/no master]. This is the client log:

4618 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
7621 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
10623 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
13625 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
16626 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
19628 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
22630 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
25632 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
28634 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
31611 [main] WARN  org.elasticsearch.discovery  - [Tiboro] waited for 30s and no initial state was set by the discovery
31611 [main] INFO  org.elasticsearch.discovery  - [Tiboro] elasticsearch/tuC0zIEOSMaxL6reqCYjPA
31611 [main] DEBUG org.elasticsearch.gateway  - [Tiboro] can't wait on start for (possibly) reading state from gateway, will do it asynchronously
31617 [main] INFO  org.elasticsearch.http  - [Tiboro] bound_address {inet[/0:0:0:0:0:0:0:0:9201]}, publish_address {inet[/192.168.1.10:9201]}
31621 [elasticsearch[Tiboro][clusterService#updateTask][T#1]] DEBUG org.elasticsearch.cluster.service  - [Tiboro] processing [updating local node id]: execute
31621 [elasticsearch[Tiboro][clusterService#updateTask][T#1]] DEBUG org.elasticsearch.cluster.service  - [Tiboro] cluster state updated, version [0], source [updating local node id]
31622 [elasticsearch[Tiboro][clusterService#updateTask][T#1]] DEBUG org.elasticsearch.cluster.service  - [Tiboro] set local cluster state to version 0
31622 [elasticsearch[Tiboro][clusterService#updateTask][T#1]] DEBUG org.elasticsearch.cluster.service  - [Tiboro] processing [updating local node id]: done applying updated cluster_state (version: 0)
31622 [main] INFO  org.elasticsearch.node  - [Tiboro] started
Value of client org.elasticsearch.client.node.NodeClient@16888fd4
Caught exception
31630 [main] INFO  org.elasticsearch.node  - [Tiboro] stopping ...
org.elasticsearch.cluster.block.ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];[SERVICE_UNAVAILABLE/2/no master];
	at org.elasticsearch.cluster.block.ClusterBlocks.globalBlockedException(ClusterBlocks.java:138)
	at org.elasticsearch.cluster.block.ClusterBlocks.globalBlockedRaiseException(ClusterBlocks.java:128)
	at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.<init>(TransportSearchTypeAction.java:107)
	at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction$AsyncAction.<init>(TransportSearchQueryThenFetchAction.java:68)
	at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction$AsyncAction.<init>(TransportSearchQueryThenFetchAction.java:62)
	at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction.doExecute(TransportSearchQueryThenFetchAction.java:59)
	at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction.doExecute(TransportSearchQueryThenFetchAction.java:49)
	at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:63)
	at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:108)
	at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:43)
	at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:63)
	at org.elasticsearch.client.node.NodeClient.execute(NodeClient.java:92)
	at org.elasticsearch.client.support.AbstractClient.search(AbstractClient.java:212)
	at org.elasticsearch.action.search.SearchRequestBuilder.doExecute(SearchRequestBuilder.java:1043)
	at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:85)
	at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:59)
	at HelloESClient.getNodeClient(HelloESClient.java:34)
	at HelloESClient.main(HelloESClient.java:15)
31639 [main] INFO  org.elasticsearch.node  - [Tiboro] stopped
Exiting HelloESClient.main()
The problem was that my client was trying to discover all the different ES servers on my office network. So I changed the code to introduce a settings object in which I set discovery.zen.ping.multicast.enabled to false and discovery.zen.ping.unicast.hosts to localhost. Note that I did not change anything in the ElasticSearch server configuration; it was running as usual. With these settings the client only talks to the ElasticSearch server on my local machine.
After that I wanted to figure out how this works with a named cluster, so I stopped my ElasticSearch server, changed elasticsearch.yml to use sunilscluster as the cluster name instead of the default elasticsearch, and restarted ES:

cluster.name: sunilscluster
node.name: deves11
In order for my NodeClient to work I had to set clusterName("sunilscluster") on it.
Once this was done I tried connecting to the ElasticSearch servers in the development cluster, which is on a separate network and has the master nodes masternode1, masternode2 and masternode3. For that, all I had to do was change the value of discovery.zen.ping.unicast.hosts to masternode1.dev.com,masternode2.dev.com,masternode3.dev.com, but the join failed. A NodeClient joins the cluster as a node, so the cluster nodes must be able to open connections back to the client machine. As the log below shows, the client reached masternode1, but the master could not connect back to the client's transport port (172.29.8.36:9300, connection refused); such ports are often blocked between networks. The only solution is to either open the port or run your client on a machine that the cluster nodes can reach.

2014-12-30 10:14:09 DEBUG zen:104 - [nodeclient1] filtered ping responses: (filter_client[true], filter_data[false])
	--> target [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}], master [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}]
2014-12-30 10:14:09 DEBUG netty:104 - [nodeclient1] connected to node [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}]
2014-12-30 10:14:09 INFO  zen:114 - [nodeclient1] failed to send join request to master [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}], reason [org.elasticsearch.transport.RemoteTransportException: [masternode1][inet[/172.30.0.0:9300]][discovery/zen/join]; org.elasticsearch.transport.ConnectTransportException: [nodeclient1][inet[/172.29.8.36:9300]] connect_timeout[30s]; java.net.ConnectException: Connection refused: /172.29.8.36:9300]
2014-12-30 10:14:09 DEBUG netty:104 - [nodeclient1] connected to node [[#zen_unicast_1#][VDD7S0ITAA024][inet[masternode1.dev.com/172.30.0.0:9300]]]
2014-12-30 10:14:12 DEBUG netty:104 - [nodeclient1] disconnected from [[#zen_unicast_1#][VDD7S0ITAA024][inet[masternode1.dev.com/172.30.0.0:9300]]]
2014-12-30 10:14:12 DEBUG zen:104 - [nodeclient1] filtered ping responses: (filter_client[true], filter_data[false])
	--> target [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}], master [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}]
2014-12-30 10:14:12 INFO  zen:114 - [nodeclient1] failed to send join request to master [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}], reason [org.elasticsearch.transport.RemoteTransportException: [masternode1][inet[/172.30.0.0:9300]][discovery/zen/join]; org.elasticsearch.transport.ConnectTransportException: [nodeclient1][inet[/172.29.8.36:9300]] connect_timeout[30s]; java.net.ConnectException: Connection refused: /172.29.8.36:9300]
2014-12-30 10:14:12 DEBUG netty:104 - [nodeclient1] connected to node [[#zen_unicast_1#][VDD7S0ITAA024][inet[masternode1.dev.com/172.30.0.0:9300]]]
2014-12-30 10:14:15 DEBUG netty:104 - [nodeclient1] disconnected from [[#zen_unicast_1#][VDD7S0ITAA024][inet[masternode1.dev.com/172.30.0.0:9300]]]
2014-12-30 10:14:15 DEBUG zen:104 - [nodeclient1] filtered ping responses: (filter_client[true], filter_data[false])
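The three client configurations walked through above (local server, renamed cluster, remote development cluster) differ only in a handful of settings. Here is a minimal sketch that collects those settings in a plain map. The class and the unicastOnly helper are hypothetical names made up for illustration; handing the map to the actual Elasticsearch client builder is left out so that the example runs without the Elasticsearch jar.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Elasticsearch: gathers the settings the
// NodeClient needed in each scenario described in this post.
class NodeClientSettingsSketch {

    // Settings for a client that must not use multicast discovery and
    // should only ping the listed unicast hosts.
    static Map<String, String> unicastOnly(String clusterName, String... hosts) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("cluster.name", clusterName);
        settings.put("discovery.zen.ping.multicast.enabled", "false");
        settings.put("discovery.zen.ping.unicast.hosts", String.join(",", hosts));
        return settings;
    }

    public static void main(String[] args) {
        // Local server with the default cluster name.
        System.out.println(unicastOnly("elasticsearch", "localhost"));
        // Local server after renaming the cluster.
        System.out.println(unicastOnly("sunilscluster", "localhost"));
        // Development cluster with three master nodes.
        System.out.println(unicastOnly("sunilscluster",
                "masternode1.dev.com", "masternode2.dev.com", "masternode3.dev.com"));
    }
}
```

Keeping the settings in one place makes it easy to see that only the cluster name and the unicast host list change between environments.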

How to use ElasticSearch from Web Application (How to fix java.lang.OutOfMemoryError: PermGen space error in Tomcat with ElasticSearch client as use case)

Recently I was working on a memory leak in a web application that talks to ElasticSearch. When I deployed and undeployed the application a few times without restarting the server, Tomcat ran out of memory and I got Exception in thread "http-bio-8080-exec-30" java.lang.OutOfMemoryError: PermGen space. I wanted to figure out what was going on, so first I created a simple web application that uses the ElasticSearch client and provides a JAX-RS API to proxy calls to ElasticSearch. My sample ESClient.java takes care of establishing the connection with ElasticSearch using TransportClient. I could use the RESTClient to make a POST call for search.
When I looked at the application in VisualVM, I could see that elasticsearch had created a thread pool of about 17 threads connected to ElasticSearch.
When I stopped my application to update it, the threads created by ElasticSearch did not get closed, and because of that the classes loaded by the web application could not be garbage collected. I could see warning messages like the following on the console: [Dec 23, 2014 8:09:55 PM] [org.apache.catalina.loader.WebappClassLoader clearReferencesThreads] [SEVERE]: The web application [/ESClientWeb] appears to have started a thread named [elasticsearch[Blink][[timer]]] but has failed to stop it. This is very likely to create a memory leak. There was a bunch of these messages in the console.
When I used the Find Leak functionality on the Apache Tomcat console, my application name showed up multiple times, once for every update.
After a couple of redeploys, Tomcat ran out of memory and threw Exception in thread "http-bio-8080-exec-30" java.lang.OutOfMemoryError: PermGen space.
To solve this problem, all I had to do was create a ContextListener that creates the org.elasticsearch.client.transport.TransportClient object during application startup and closes it when the application stops. I also had to make a couple of changes in ESClient.java. After making these changes I could see the thread pool getting destroyed during application shutdown; as a result the application classes were garbage collected and there were no more OutOfMemoryErrors.
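The create-once, close-on-shutdown pattern can be sketched in plain Java. This is only an illustration under assumptions: PooledClient is a made-up stand-in for a client that owns background threads (as TransportClient does), and onStartup/onShutdown play the role of the two ContextListener callbacks. It is not the listener from my application and it does not use the servlet or Elasticsearch APIs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ClientLifecycleSketch {

    // Hypothetical stand-in for a client that owns a thread pool.
    static class PooledClient implements AutoCloseable {
        private final ExecutorService pool = Executors.newFixedThreadPool(4);

        PooledClient() {
            pool.execute(() -> { }); // start a worker thread, like a real client would
        }

        boolean isRunning() {
            return !pool.isShutdown();
        }

        @Override
        public void close() {
            // Without this call the worker threads outlive the web application
            // and keep its class loader reachable.
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private static PooledClient client;

    // Would be called from the listener's "context initialized" callback.
    static synchronized void onStartup() {
        if (client == null) {
            client = new PooledClient();
        }
    }

    // Shared, thread-safe access for request handling code.
    static synchronized PooledClient get() {
        return client;
    }

    // Would be called from the listener's "context destroyed" callback.
    static synchronized void onShutdown() {
        if (client != null) {
            client.close();
            client = null;
        }
    }

    public static void main(String[] args) {
        onStartup();
        PooledClient c = get();
        System.out.println("running after startup: " + c.isRunning());
        onShutdown();
        System.out.println("running after shutdown: " + c.isRunning());
    }
}
```

The important part is that the shutdown hook closes the same object the startup hook created, so no thread started by the application survives an undeploy.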

Debugging Tomcat memory leak using java VisualVM

Recently I was trying to debug a memory leak in a web application deployed in Apache Tomcat, and I wanted to see how the JVM memory was doing. For that I decided to use Java VisualVM, and it turns out that it is pretty easy to use and very powerful.
  1. I had Apache Tomcat running on my machine, so I first checked that the application was running OK.
  2. The next step was to execute the jvisualvm command, which is part of the JDK.
  3. It brought up Java VisualVM, which shows every Java application that is running on your machine, including Tomcat.
  4. Then I double-clicked on the Tomcat process, which opened its details. I didn't have to set any JVM flags; I could attach VisualVM directly.
  5. The Monitor tab gives you high level information about the VM, such as how it is doing on memory and threads. It also has a Heap Dump button; clicking it generates a heap dump. By default the heap dump goes to the /var/folders/.. folder, so right-click on the heap dump node, click Save As and save it somewhere else on your disk. Then you can use Eclipse MAT to analyze the dump.

How to view the log files and job.xml generated by Hive on HortonWorks Sandbox

I was working on some Hive code and ran into problems: my query kept failing with a message like invalid character in job.xml. I tried to locate the job.xml but could not find it, and I could not find any logs on the local machine either. So I wanted to figure out how to debug this problem, and followed these steps.
  1. The first thing I did in the HDP 2.1 sandbox was to enable Ambari by clicking on the Enable button.
  2. The next step was to log in to Ambari by going to http://localhost:8080/ and entering admin/admin as username and password.
  3. In the Ambari application I went to the YARN service screen and unchecked the Resource Manager -> yarn.log-aggregation-enable checkbox. This disables log aggregation, which copies the log files from the local file system to HDFS and zips them. Disabling log aggregation keeps them on the local file system.
  4. Then in the Advanced section I changed the value of yarn.nodemanager.debug-delay-sec to 60000. The default value of this property is 0, which means that as soon as the job is done the NodeManager deletes all the intermediate files. Setting it to 60000 keeps them from being deleted for 60000 seconds (1000 minutes, roughly 16.7 hours).
  5. Then I restarted all the services using Ambari for my changes to take effect.
  6. After that I executed a couple of Hive queries, and now I could see my logs in the /hadoop/yarn/logs directory.
  7. I could also see the job.xml files being preserved in the /hadoop/yarn/local/usercache/root/appcache folder for each application.

How to update records in Hive Table that uses ElasticSearch for storage

I had a requirement in which I wanted to update a Hive table. Hive is more of an append-only database and you can't update records in it (that limitation comes from the text files stored in HDFS, which is how Hive stores data by default). But if you're using Hive with ElasticSearch as storage, you can get this to work. When you use ElasticSearch as the storage mechanism, every call from Hive to insert or delete data gets forwarded to the ElasticSearch API, and ElasticSearch has the ability to update existing records. I used this to implement an updatable Hive table.
For the scenario, let's assume you have an ElasticSearch index that stores first name, last name and email as a document. Create a new index in ES with the name cricketers and the type player by making a curl call like this:

curl -XPOST "http://localhost:9200/cricketers/player/1" -d'
{
    "id": "1",
    "fname": "First",
    "lname": "Cricketer",
    "email": "first.cricketer@gmail.com"
}'
This call first creates an index named cricketers in ES and inserts one document into it, with id equal to 1. The next step is to define an external table in Hive that uses org.elasticsearch.hadoop.hive.EsStorageHandler as its StorageHandler and points to the cricketers/player index that you created in the last step. Another important setting is 'es.mapping.id'='id', which says to use the value of the id column as the primary key/id in ElasticSearch.

create external table cricketers_es(id String, fname String, lname String, email String) stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler' 
TBLPROPERTIES('es.resource'='cricketers/player', 'es.index.auto.create'='false', 'es.mapping.id'='id')
Once the table is created you can check the records in it by executing the select * from cricketers_es command. You should see the 1 record that is in the index.
Since Hive does not have the concept of an update statement, you have to create a Hive table that holds the records you want to insert/update (only the delta) and then use this delta table to update the cricketers_es table. To do that, first create a text file that holds the delta records. In my case I created this simple cricketers.txt file and uploaded it to HDFS in the /user/hue folder:

1,sachin,tendulkar,sachin.tendulakar@bcci.com
2,Rahul,Dravid,rahul.dravid@bcci.com
After that, create a Hive table called cricketers_stage, which will hold the delta records, by executing the following statement:

create table cricketers_stage(id String, fname String, lname String, email String) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
Now use the following Hive statement to load your delta records into cricketers_stage:

LOAD DATA INPATH '/user/hue/cricketers.txt' INTO TABLE `default.cricketers_stage`
The next step is to set the ElasticSearch write mode to upsert by setting the following property in the Hive console:

es.write.operation=upsert
The last step is to execute the following statement in Hive, which takes the content of cricketers_stage and inserts those records into the cricketers_es table:

insert into cricketers_es select * from cricketers_stage
Now if you run select * from cricketers_es you should see 2 records: your first record is updated, and the record with id 2 is a new insert.

1,sachin,tendulkar,sachin.tendulakar@bcci.com
2,Rahul,Dravid,rahul.dravid@bcci.com
You can also verify the records in ElasticSearch by executing the following curl command:

curl -XPOST "http://localhost:9200/cricketers/player/_search?pretty=true" -d'
{
  "query": {
      "match_all": {}
  }
}'
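To make the effect of upsert mode with es.mapping.id concrete, here is a minimal plain-Java sketch of the same idea: rows are keyed by their id column, a delta row with an existing id replaces the stored row, and a delta row with a new id is added. The class and method names are made up for illustration, and this is an in-memory model only, not what ElasticSearch or the Hive storage handler do internally. Since every delta row here carries all four columns, replacing the whole row gives the same result as merging fields.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class UpsertSketch {

    // Applies the delta rows to a copy of the id-keyed index.
    // row[0] plays the role of the column named in es.mapping.id.
    static Map<String, String[]> upsert(Map<String, String[]> index, List<String[]> delta) {
        Map<String, String[]> result = new LinkedHashMap<>(index);
        for (String[] row : delta) {
            result.put(row[0], row); // existing id: update, new id: insert
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String[]> index = new LinkedHashMap<>();
        index.put("1", new String[] {"1", "First", "Cricketer", "first.cricketer@gmail.com"});

        List<String[]> delta = Arrays.asList(
                new String[] {"1", "sachin", "tendulkar", "sachin.tendulakar@bcci.com"},
                new String[] {"2", "Rahul", "Dravid", "rahul.dravid@bcci.com"});

        for (String[] row : upsert(index, delta).values()) {
            System.out.println(String.join(",", row));
        }
    }
}
```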

Connecting to HDP 2.2 Hive using JDBC driver

In http://wpcertification.blogspot.com/2014/04/connecting-to-hive-using-jdbc-client.html I blogged about how to connect to Hive using a JDBC client. I wanted to figure out how to connect to Hive 0.13 in Hortonworks Sandbox 2.2, so I followed these steps. You can download the complete source code for this project from here.
First I created an App.java class. When connecting to HDP 2.2 I had to use HiveServer2, and I followed the instructions in the HDP documentation. In my Maven build file I include the Hive, Hadoop and Log4j jars; the log4j jar lets me enable logging.
Once my code was complete I tried connecting to Hive and got the following access control error:

java.sql.SQLException: Error while compiling statement: FAILED: HiveAccessControlException Permission denied. 
Principal [name=root, type=USER] does not have following privileges on Object [type=TABLE_OR_VIEW, name=default.sample_07] : [SELECT]
 at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:121)
 at org.apache.hive.jdbc.Utils.verifySuccessWithInfo(Utils.java:109)
 at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:231)
 at org.apache.hive.jdbc.HiveStatement.executeQuery(HiveStatement.java:355)
 at com.spnotes.hive.App.main(App.java:24)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:601)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

I had to use the following command in the Hive console to give the root user access to query the sample_07 table.

hive> grant select on table sample_07 to user root;

Fixing "WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!" problem

I use the Hortonworks Sandbox for learning and trying out Hadoop features. The way the Hortonworks Sandbox works is that you're supposed to download the VM and then ssh into it using

ssh root@127.0.0.1 -p 2222

Initially I had the HDP 2.1 sandbox, but when I downloaded HDP 2.2 and tried to connect to it I got the WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED! error. Basically, the HDP 2.1 and HDP 2.2 images have different SSH host keys, but you're referring to both using the same IP, 127.0.0.1, so your machine says: something is wrong, the IP is the same but the SSH key does not match.
To solve this problem, I opened the /Users/test/.ssh/known_hosts file in a text editor, searched for 127.0.0.1, removed the line that started with this IP and saved the known_hosts file.
Now when I rerun the ssh root@127.0.0.1 -p 2222 command I am able to connect to the HDP 2.2 image.
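The manual known_hosts edit can also be expressed as a small filter. This is a sketch under assumptions: the class and method names are made up, the key material in the example is a placeholder, and hashed known_hosts entries are not handled. OpenSSH records hosts on a non-default port in the form [host]:port, so the filter checks both the plain and the bracketed form.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class KnownHostsSketch {

    // Returns the known_hosts lines that do NOT belong to the given host/port.
    static List<String> dropHost(List<String> lines, String host, int port) {
        String bracketed = "[" + host + "]:" + port;
        return lines.stream().filter(line -> {
            // The first whitespace-separated field is a comma-separated host list.
            String hostField = line.trim().split("\\s+", 2)[0];
            for (String name : hostField.split(",")) {
                if (name.equals(host) || name.equals(bracketed)) {
                    return false; // stale entry for the sandbox, drop it
                }
            }
            return true;
        }).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Placeholder entries; real lines carry a full public key.
        List<String> lines = Arrays.asList(
                "[127.0.0.1]:2222 ssh-rsa PLACEHOLDERKEY1",
                "example.org ssh-rsa PLACEHOLDERKEY2");
        dropHost(lines, "127.0.0.1", 2222).forEach(System.out::println);
    }
}
```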

How to use Apache Storm Trident API

Some time back I blogged about HelloWorld - Apache Storm Word Counter program, which demonstrates how to build a WordCount program using Apache Storm. The problem with that project was that it was not a Maven project; instead I had a screenshot of all the jars that you have to include in the program. So I changed it to use Apache Maven as the build framework. You can download the source code.
In addition to the normal API, Storm also provides the Trident API, which allows us to write much more compact code. I wanted to try that out, so I built this simple word count program using the Trident API. With Trident you start by creating a TridentTopology object. You still need the LineReaderSpout, which takes a file path as input and reads and emits one line of the file at a time. The part that is different is that you don't need WordSpitterBolt and WordCounterBolt; instead you can use compact code like this:

 topology.newStream("spout",lineReaderSpout)
.each(new Fields("line"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.aggregate(new Fields("word"), new Count(), new Fields("count"))
.each(new Fields("word","count"), new Debug());
The each(new Fields("line"), new Split(), new Fields("word")) line takes the line emitted by the LineReaderSpout and uses the storm.trident.testing.Split function that ships with Storm to split the line into words, emitting each word as a tuple. The groupBy(new Fields("word")) line takes the tuples and groups them by word. The aggregate(new Fields("word"), new Count(), new Fields("count")) line aggregates the words and counts them using the storm.trident.operation.builtin.Count class (at this point you have a tuple like {word,count}). The last part is .each(new Fields("word","count"), new Debug());, which prints each tuple in word, count format. The Trident API provides a set of ready-made classes that make developing a WordCount type of program very easy, but you could have created your own versions of Split and Count and the code would still look significantly more compact.

How to use ElasticSearch as input for Apache Spark program

In the How to use ElasticSearch as input for MapReduce program entry I blogged about how to create a MapReduce program that reads data from an ElasticSearch index or query as input and uses it to produce some output. I wanted to build the same functionality using Apache Spark. You can download the source code for the project from here.
The basic idea is that I have a hadoop/contact index/type that contains contact records that look like this, with every contact having a first name, last name and address. I want to write a program that tells me how many contacts are from a particular city.

{
   "lastName":"Tendulkar",
   "address":[
      {
         "country":"India\t",
         "addressLine1":"1 Main Street",
         "city":"Mumbai"
      }
   ],
   "firstName":"Sachin",
   "dateOfBirth":"1973-04-24"
}
To do that I built a simple HelloESInputSpark.java class (I added it to the WordCount Apache Spark project that I built in WordCount program built using Apache Spark in Java). This program is similar to any other Spark program, with a difference of a few lines: I had to create a Hadoop Configuration object, set the properties required to use EsInputFormat as the InputFormat, and then call sc.newAPIHadoopRDD() and pass the newly created Hadoop Configuration object to it.

Configuration hadoopConfiguration = new Configuration();
hadoopConfiguration.set("es.nodes","localhost:9200");
hadoopConfiguration.set("es.resource","hadoop/contact");
JavaPairRDD esRDD = sc.newAPIHadoopRDD(hadoopConfiguration, EsInputFormat.class, Text.class, MapWritable.class);
The Spark framework reads the ElasticSearch index as a map in which the id of the record is the key, and the actual record becomes the value and gets passed as a MapWritable object. You have to use slightly different plumbing to read the embedded values stored inside the record. I configured Spark to store the output on disk; it created several part files, and you can look at their content to see the result.
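The "plumbing" for the embedded values comes down to walking a nested structure: each record is a map, its address field is a list, and each list entry is again a map that holds the city. Here is a minimal sketch of that traversal using plain java.util collections in place of MapWritable, so it runs without Spark or Hadoop on the classpath. The class and method names are made up for illustration and this is not the code from HelloESInputSpark.java.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CityCountSketch {

    // Counts contacts per city by reading the embedded address entries.
    static Map<String, Integer> countByCity(List<Map<String, Object>> contacts) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map<String, Object> contact : contacts) {
            Object address = contact.get("address");
            if (!(address instanceof List)) {
                continue; // record without an address array
            }
            for (Object entry : (List<?>) address) {
                if (!(entry instanceof Map)) {
                    continue;
                }
                Object city = ((Map<?, ?>) entry).get("city");
                if (city != null) {
                    counts.merge(city.toString(), 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Object> address = new HashMap<>();
        address.put("city", "Mumbai");
        address.put("addressLine1", "1 Main Street");

        Map<String, Object> contact = new HashMap<>();
        contact.put("firstName", "Sachin");
        contact.put("lastName", "Tendulkar");
        contact.put("address", Arrays.asList(address));

        System.out.println(countByCity(Arrays.asList(contact)));
    }
}
```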

WordCount program built using Apache Spark in Java

In the WordCount program written using Spark framework written in python language entry I talked about how to create a WordCount program using Apache Spark in Python. Now I wanted to try building the same program in Java, so I followed these steps. You can download the full code from here. This is how the Java class looks; it has 4 anonymous classes:
  1. file.flatMap(new FlatMapFunction() {}): This class gets called for every line in the input file; it splits the input line and returns the list of words in the line.
  2. words.filter(new Function() {}): This class gets called for each word and checks whether it is actually a word or just a blank space. My input file has some blank spaces, so I used this filter (part of the reason was to try out filters) to remove the words that are just blank spaces.
  3. words.mapToPair(new PairFunction() {}): This class takes each word as input and converts it into a Tuple2 with the word as key and 1 as value.
  4. wordToCountMap.reduceByKey(new Function2() {}): This class gets called with the running total and a count as input, and the only thing it does is add the current count to the running total.
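The four steps map one-to-one onto a plain Java streams pipeline, which can be handy for checking the logic without a Spark installation. This is a sketch of the same flatMap, filter, map-to-pair and reduce-by-key sequence using only java.util.stream; it is a swapped-in local equivalent, not the Spark code from my project, and the class name is made up.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class WordCountStepsSketch {

    static Map<String, Integer> wordCount(List<String> lines) {
        return lines.stream()
                // Step 1 (flatMap): split every line into words.
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // Step 2 (filter): drop entries that are only blank space.
                .filter(word -> !word.trim().isEmpty())
                // Step 3 (mapToPair): turn each word into a (word, 1) pair.
                .map(word -> new AbstractMap.SimpleEntry<String, Integer>(word, 1))
                // Step 4 (reduceByKey): add each count to the running total per word.
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        Integer::sum,
                        TreeMap::new));
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("hello world", "hello  spark")));
    }
}
```

The double space in the second sample line produces an empty string after the split, which is exactly the kind of entry the filter step exists to remove.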
This is how the Maven build script for my program looks. I had to fight through a couple of issues to get the Maven build working. The first one is that by default, when you add the org.apache.spark.spark-core_2.10 dependency, it includes the Hadoop 1.0.4 client jars, and since I have a Hadoop 2.4 server I got the following error. You get this error even if you include the Hadoop 2.4 jars yourself, because now you have both the Hadoop 2.4 and Hadoop 1.0.4 jars on the classpath, so you have to ask Maven to exclude the hadoop client included by org.apache.spark.spark-core_2.10.

14/08/25 12:16:50 WARN snappy.LoadSnappy: Snappy native library not loaded
Exception in thread "main" org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4
 at org.apache.hadoop.ipc.Client.call(Client.java:1070)
 at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
 at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
 at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
 at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
 at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
 at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)
 at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)
 at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
 at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
 at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
 at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
 at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.FilteredRDD.getPartitions(FilteredRDD.scala:29)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:59)
 at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:385)
 at com.spnotes.spark.WordCount2.main(WordCount2.java:62)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
 
Once that part was done, I ran into an issue because two different versions of Jetty were getting included: Exception in thread "main" java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package. To fix it, I had to add an exclusion filter for the Jetty-related jars to the org.apache.hadoop.hadoop-common dependency.

How to use ElasticSearch as input for MapReduce program

In the Saving complex object in elasticsearch as output of MapReduce program entry, I talked about how to use ElasticSearch for storing the output of a MapReduce job. In that entry I was creating Contact records that look like this in ElasticSearch

{
   "lastName":"Tendulkar",
   "address":[
      {
         "country":"India\t",
         "addressLine1":"1 Main Street",
         "city":"Mumbai"
      }
   ],
   "firstName":"Sachin",
   "dateOfBirth":"1973-04-24"
}
I wanted to figure out how to use ElasticSearch as input for a MapReduce program, so I decided to create a MapReduce program that reads the contact index and reports how many players come from each city. You can download the sample program from here. You run the driver program with 2 arguments, e.g. hadoop/contact file:///home/user/output/. The first is the name of the ElasticSearch index/type and the second is the output directory where the output will get written. This program has 3 main components
  1. MRInputDriver: In the driver program you have to set the es.nodes entry to the address of your ElasticSearch installation and es.resource to the ElasticSearch index/type name. Then I call job.setInputFormatClass(EsInputFormat.class);, which sets EsInputFormat as the input reader; it takes care of reading the records from ElasticSearch
  2. MRInputMapper: The Mapper class declares Object as both its input key and value type. The ElasticSearch Hadoop framework reads each record from ElasticSearch and passes the id as key (Text) and, as value, a MapWritable object that represents the record stored in ElasticSearch. Once I have the value, I read the address from it and the mapper writes the city name as key and 1 as value.
  3. MRInputReducer: The reducer is pretty simple: it gets called with the name of the city as key and an Iterable of values, very similar to the reducer in WordCount.
After running the program I could see output being generated like this

Bangalore 2
Delhi 1
Mumbai 1
Ranchi 1
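
The mapper and reducer logic described above can be tried out without a cluster. This is only a minimal plain-Java sketch, not the MRInputMapper/MRInputReducer code from the sample project: it assumes each ElasticSearch record arrives as a nested Map (standing in for MapWritable), and the class name CityCountSketch and its helper methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the mapper/reducer pair; plain Maps replace MapWritable.
class CityCountSketch {

    // "Map" step: pull the city out of every address of one contact record.
    @SuppressWarnings("unchecked")
    static List<String> citiesOf(Map<String, Object> contact) {
        List<String> cities = new ArrayList<>();
        Object addresses = contact.get("address");
        if (addresses instanceof List) {
            for (Object address : (List<Object>) addresses) {
                Object city = ((Map<String, Object>) address).get("city");
                if (city != null) {
                    cities.add(city.toString().trim());
                }
            }
        }
        return cities;
    }

    // "Reduce" step: add 1 for every city emitted by the map step.
    static Map<String, Integer> countByCity(List<Map<String, Object>> contacts) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map<String, Object> contact : contacts) {
            for (String city : citiesOf(contact)) {
                counts.merge(city, 1, Integer::sum);
            }
        }
        return counts;
    }

    // Builds a small contact record shaped like the JSON stored in ElasticSearch.
    static Map<String, Object> contact(String firstName, String city) {
        Map<String, Object> address = Map.of("city", city, "country", "India");
        return Map.of("firstName", firstName, "address", List.of(address));
    }

    public static void main(String[] args) {
        List<Map<String, Object>> contacts = List.of(
                contact("Sachin", "Mumbai"),
                contact("Rahul", "Bangalore"),
                contact("Anil", "Bangalore"));
        System.out.println(countByCity(contacts));
    }
}
```

In the real job the grouping by city is done by the MapReduce shuffle, so the reducer only has to sum the values it receives for one key.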

Saving complex object in elasticsearch as output of MapReduce program

In the Using ElasticSearch to store output of MapReduce program entry I built a sample MapReduce program that writes output to ElasticSearch. I wanted to figure out how to write the output of a MapReduce job into ElasticSearch so that it creates a complex JSON structure with embedded objects and a date type. I decided to create this sample program, which reads a .csv file and generates contact records in ElasticSearch. You can download the sample code for this project from here. I followed these steps for creating the program.
  1. First thing I did was to create the hadoop/contact index in ElasticSearch using the following mapping. You can create the index by making an HTTP POST call to http://localhost:9200/hadoop
  2. Once the index was created I created a Contact.java object. It is a simple POJO which has a list of Address objects, which is another POJO. It also uses CustomDateSerializer as its custom date serializer
  3. Next step was to create CustomDateSerializer.java, and then a class that has both the Mapper and the MapReduce driver class.
    • ContactImportMapper: This is a mapper class that gets one line from contact.txt at a time, splits it on commas and then uses the values to create a Contact.java object. In the mapper class on line 67, I am converting the Contact object into JSON using the Jackson parser and then setting the JSON as the value of a Text. Then on line 70, I am writing NullWritable as key and the Contact JSON as value, which gets inserted into ES
    • ContactImportDriver: This class is the same as any other MapReduce driver with a few exceptions. On line 101 I am calling job.getConfiguration().set("es.input.json", "yes");, which tells the ElasticSearch output to treat the output as JSON. Then on line 109, I am setting the output format to EsOutputFormat so that the ElasticSearch Hadoop framework can take control of persisting the output: job.setOutputFormatClass(EsOutputFormat.class);
  4. Once the program was built, I pass 2 parameters to it: the first is the full path of the contact.txt file and the second is the ElasticSearch index and type in hadoop/contact format. This is how the content of the contact.txt file looks
    
    Sachin,Tendulkar,24-Apr-1973,1 Main Street,Mumbai,India 
    Rahul,Dravid,01-Jan-1973,1 Main Street,Bangalore,India
    Anil,Kumble,17-Oct-1970,1 Main Street,Bangalore,India
    Virat,Kohali,05-Nov-1988,1 Main Street,Delhi,India
    Mahendra Singh,Dhoni,07-Jul-1981,1 Main Street,Ranchi,India
    Saurav Ganguli,08-Jul-1982,1 Main Street,Kolkata,India
    
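
The dates in contact.txt use a day-month-year pattern such as 24-Apr-1973, while the contact records store dateOfBirth in yyyy-MM-dd form (for example 1973-04-24). A minimal sketch of that conversion follows. It is not the CustomDateSerializer from the sample project (which plugs into Jackson); the class name ContactDateSketch and the method toIsoDate are hypothetical, and it assumes English month abbreviations.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical helper: converts the CSV date format into the ISO form stored in the index.
class ContactDateSketch {

    // Pattern assumed from the sample rows, e.g. 24-Apr-1973.
    private static final DateTimeFormatter CSV_FORMAT =
            DateTimeFormatter.ofPattern("dd-MMM-yyyy", Locale.ENGLISH);

    static String toIsoDate(String csvDate) {
        // LocalDate.toString() always produces yyyy-MM-dd.
        return LocalDate.parse(csvDate.trim(), CSV_FORMAT).toString();
    }

    public static void main(String[] args) {
        String line = "Sachin,Tendulkar,24-Apr-1973,1 Main Street,Mumbai,India";
        String[] fields = line.split(",");
        System.out.println(fields[0] + " " + fields[1] + " -> " + toIsoDate(fields[2]));
    }
}
```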
After executing the code I ran the following query to find all the players with a date of birth between 1970 and 1974. This is how my sample output looks

How to create custom Combiner class with your MapReduce framework

The MapReduce framework passes control to your combiner class at the end of the map phase so that it can combine/reduce the data generated by the Mappers before it gets transferred to the Reducers. Sending data from Mapper to Reducer requires that data to go over the network, so shrinking it early saves bandwidth. I wanted to try creating a custom combiner class. In order to keep things simple I decided to add a combiner class to the WordCount(HelloWorld) MapReduce program. Basically my combiner class does the same thing as the reducer, which is to take multiple [word, 1] tuples and combine them into something like [word1, 5], [word2, 6], etc. I followed these steps
  1. First thing that I did was to create a WordCountCombiner.java class that looks the same as WordCountReducer, but I added one System.out.println() to it so that I would know when my combiner is called instead of the reducer.
  2. Then I changed the driver class of my MapReduce program to add the job.setCombinerClass(WordCountCombiner.class); line to it.
Then I executed the WordCountDriver class with 3 files as input and I could see my Combiner class getting called after the Mapper class for each input file, before it wrote the mapper output to disk and before the reducer phase started.
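
To see why a combiner helps, here is a small plain-Java sketch that simulates the idea without Hadoop. It is not the WordCountCombiner from my project: the class name CombinerSketch, the method names and the two sample inputs are made up for illustration. Each "mapper" emits [word, 1] pairs, the combiner step sums them per mapper, and only the combined pairs are handed to the final reduce step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of map -> combine -> reduce for word count.
class CombinerSketch {

    // Mapper: emit one [word, 1] pair per word in the input.
    static List<Map.Entry<String, Integer>> map(String text) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : text.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(Map.entry(word, 1));
            }
        }
        return pairs;
    }

    // Shared by combiner and reducer: sum the values of each key.
    static List<Map.Entry<String, Integer>> sumByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            totals.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return new ArrayList<>(totals.entrySet());
    }

    public static void main(String[] args) {
        String[] inputFiles = {"to be or not to be", "to do or not to do"};
        List<Map.Entry<String, Integer>> shuffled = new ArrayList<>();
        int mapperPairs = 0;
        for (String file : inputFiles) {
            List<Map.Entry<String, Integer>> mapped = map(file);
            mapperPairs += mapped.size();
            shuffled.addAll(sumByKey(mapped)); // combiner runs on each mapper's output
        }
        System.out.println("pairs emitted by mappers: " + mapperPairs);
        System.out.println("pairs sent to reducer: " + shuffled.size());
        System.out.println("final counts: " + sumByKey(shuffled)); // reducer
    }
}
```

Because summing is associative and commutative, running the same logic once per mapper and once more in the reducer gives the same totals as reducing the raw pairs directly; that is what makes reusing the reducer as a combiner safe for word count.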

Creating custom Partitioner class for your mapreduce program

The MapReduce framework uses an instance of org.apache.hadoop.mapreduce.Partitioner to figure out which map output key goes to which reducer. By default it uses org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; this class calculates the hash value of the key, divides it by the number of reducers in the program and uses the remainder to pick the reducer. This implementation is pretty good, and as long as the keys generate hashCodes that give a uniform distribution it should be good. But in some exceptional cases you might want to take control of how the output of the Mapper gets distributed to the Reducers. I just wanted to figure out how this works, so I decided to change my WordCount(HelloWorld) MapReduce program to add a custom Partitioner that sends words starting with upper and lower case letters to 2 different reducers. I followed these steps
  1. First I created a WordCountPartitioner.java class like this. First thing I am doing is checking if there are 2 reducers; if yes, I use the first letter of the key to figure out if it starts with a lower case letter (simply by checking it against the letter 'a'). If yes I send it to the first reducer, if not I send it to the second reducer
  2. I had to make a few changes in the Driver program to use my WordCountPartitioner
    • job.setNumReduceTasks(2): This call asks the MapReduce framework to use 2 reducers
    • job.setPartitionerClass(WordCountPartitioner.class): This call sets my WordCountPartitioner as the partitioner class
This screenshot shows how my sample.txt got divided into 2 reducer outputs. The first 2 lines show the output with the default HashPartitioner and the second 2 lines show the output when I used my custom Partitioner
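
The two partitioning rules can be compared in plain Java. This is a sketch rather than my WordCountPartitioner: the class PartitionerSketch and its methods are hypothetical and work on String keys instead of Hadoop's Text (whose hashCode differs from String's). The hash rule uses the same arithmetic as HashPartitioner, which masks the hash code to a non-negative value and takes it modulo the number of reduce tasks. The fallback to hashing when there are not exactly 2 reducers is an assumption made for this sketch.

```java
// Hypothetical stand-alone version of the two partitioning rules.
class PartitionerSketch {

    // Same arithmetic as HashPartitioner: non-negative hash modulo reducer count.
    static int hashPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Custom rule: lower case first letter -> reducer 0, anything else -> reducer 1.
    static int casePartition(String key, int numReduceTasks) {
        if (numReduceTasks != 2 || key.isEmpty()) {
            // Assumption for this sketch: fall back to hashing in all other cases.
            return hashPartition(key, numReduceTasks);
        }
        return Character.isLowerCase(key.charAt(0)) ? 0 : 1;
    }

    public static void main(String[] args) {
        String[] words = {"apple", "Apple", "banana", "Banana"};
        for (String word : words) {
            System.out.println(word
                    + " hash=" + hashPartition(word, 2)
                    + " case=" + casePartition(word, 2));
        }
    }
}
```

With the case-based rule all lower case words land in the first reducer's output file and all other words in the second, regardless of their hash codes.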

Python client for publishing and consuming message from Apache Kafka

In the Java Client for publishing and consuming messages from Apache Kafka entry I talked about how to create a Java client for publishing and consuming messages from Kafka. I wanted to try the same thing using Python, so I followed these steps
  1. Follow steps 1 through 4 of the Java Client for publishing and consuming messages from Apache Kafka entry to start the Zookeeper and Kafka servers
  2. Follow these steps to install the Kafka Python client
    
    git clone https://github.com/mumrah/kafka-python
    pip install ./kafka-python
    
  3. Next create a Producer.py Python script to publish messages to the pythontest topic. The basic concept here is that you connect to the Kafka server on localhost:9092 and then publish a message to a particular topic
  4. Now create a Consumer.py script that consumes messages from the pythontest topic and writes them to the console.

Java Client for publishing and consuming messages from Apache Kafka

I wanted to learn how to publish and consume messages from Apache Kafka using the Java client, so I followed these steps.
  1. Download the Kafka binaries from the Kafka download page
  2. Extract the Kafka tar file by executing tar -xzf kafka_2.9.2-0.8.1.1.tgz. Then go to the Kafka directory by executing cd kafka_2.9.2-0.8.1.1
  3. Next start the Zookeeper server by executing the following command
    
    bin/zookeeper-server-start.sh config/zookeeper.properties
    
  4. Start the Kafka server by executing the following command
    
    bin/kafka-server-start.sh config/server.properties
    
  5. Now your Zookeeper and Kafka servers are ready and you can download the source code for the sample project from here
  6. This is how a Java client that publishes messages to Kafka looks; execute it a couple of times to publish a couple of messages. First thing that you have to do while developing a producer is connect to the Kafka server. For that you set the value of the metadata.broker.list property to the host and port on which the Kafka server is listening (you can find the port and host name in the server.properties that you used in step 4). Once you have the Producer object you can use it for publishing messages by creating an object of kafka.producer.KeyedMessage, passing the name of the topic and the message as arguments
  7. This is how the Java client for consuming messages from Kafka looks; run it and it will start a thread that keeps listening for messages on the topic, and every time there is a message it prints it to the console. The HelloKafkaConsumer class extends the Thread class. In the constructor of this class I first create a Properties object with the value of the zookeeper.connect property set to the host and port on which the Zookeeper server is listening, and then I create an object of kafka.javaapi.consumer.ConsumerConnector
    Once the ConsumerConnector is ready, in the run() method I pass it the name of the topic on which I want to listen (you can pass multiple topic names here). Every time there is a new message I read it and print it to the console.

How MapReduce job in Yarn framework works

In case of the Yarn MapReduce framework these are the main players involved in the process of running a MapReduce application.
  1. Client: The client submits the MapReduce job. The client is responsible for copying the MapReduce related jars, configuration files and the distributed cache related files (jars, archives and files) into HDFS for distribution. It is also responsible for splitting the input into pieces and saving that information into HDFS for later use
  2. Application Master: Manages the lifecycle of an application running on the cluster. When the MapReduce framework starts an application it creates the application master on one of the worker nodes, and the application master runs for the lifetime of the application/job. The application master negotiates with the resource manager for cluster resources, described in terms of a number of containers, each with a certain memory limit. The application master is also responsible for tracking the application's progress, such as status and counters across the application.
  3. Resource Manager: Manages the use of compute resources (resources available for running map and reduce related tasks) across the cluster
  4. Node Manager: Node managers run the application-specific processes in containers. The node manager ensures that the application does not use more resources than it has been allocated.

Creating Oozie workflow for mapreduce job that uses distributed cache

In the Using third part jars and files in your MapReduce application(Distributed cache) entry I blogged about how to create a MapReduce job that uses the distributed cache for both the required jar files and data files. I wanted to figure out how to automate this MapReduce job using Apache Oozie, so I followed these steps
  1. First I created an apachelog directory and in it I created a job.properties file like this
  2. Then I created a workflow.xml file that looks like this. One thing to notice is <file>GeoLite.mmdb#GeoLite2-City.mmdb</file>: I have the file GeoLite.mmdb on disk but I want to refer to it as GeoLite2-City.mmdb in my program, so the file element takes care of creating the symlink
  3. Then I copied all the required jar files into the lib folder; this is how my directory structure looks
  4. I used the following command to copy the apachelog directory, which has everything that my Oozie job needs, to HDFS
    
    hdfs dfs -put apachelog apachelog
    
  5. Last step is to invoke the Oozie job by executing the following command
    
    oozie job -oozie http://localhost:11000/oozie -config job.properties -run
    

Using Apache Oozie for automating streaming map-reduce job

In the WordCount MapReduce program using Hadoop streaming and python entry I talked about how to create a streaming map-reduce job using Python. I wanted to figure out how to automate that program using an Oozie workflow, so I followed these steps
  1. First step was to create a folder called streaming on my local machine and copy mapper.py and reducer.py into it. I also created placeholders for job.properties and workflow.xml
  2. Next I created a job.properties file like this. This job.properties is quite similar to the job.properties for a Java MapReduce job; the only difference is that you must set oozie.use.system.libpath=true. By default the streaming related jars are not included in the classpath, so unless you set that value to true you will get the following error
    
    2014-07-23 06:15:13,170 WARN org.apache.hadoop.mapred.Child: Error running child
    java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.streaming.PipeMapRunner not found
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1649)
     at org.apache.hadoop.mapred.JobConf.getMapRunnerClass(JobConf.java:1010)
     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:413)
     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
     at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
     at java.security.AccessController.doPrivileged(Native Method)
     at javax.security.auth.Subject.doAs(Subject.java:396)
     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
     at org.apache.hadoop.mapred.Child.main(Child.java:262)
    Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.streaming.PipeMapRunner not found
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1617)
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1641)
     ... 8 more
    Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.streaming.PipeMapRunner not found
     at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1523)
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1615)
     ... 9 more
    2014-07-23 06:15:13,175 INFO org.apache.hadoop.mapred.Task: Runnning cleanup for the task
    
  3. Next step in the process is to create a workflow.xml file like this. Make sure to add the <file>mapper.py#mapper.py</file> element to the workflow.xml, which takes care of putting mapper.py and reducer.py in the sharedlib and creating symbolic links to these two files.
  4. Upload the streaming folder with all your changes to HDFS by executing the following command
    
    hdfs dfs -put streaming streaming
    
  5. You can trigger the Oozie workflow by executing the following command
    
    oozie job -oozie http://localhost:11000/oozie -config streaming/job.properties -run
    

Using Apache Oozie to execute MapReduce jobs

I wanted to learn how to automate a MapReduce job using Oozie, so I decided to create an Oozie workflow to invoke the WordCount(HelloWorld) MapReduce program. I had to follow these steps
  1. First thing that I did was to download the WordCount program source code by executing
    
    git clone https://github.com/sdpatil/HadoopWordCount3
    
    This program has a Maven script for building an executable jar, so I used the mvn clean package command to build the Hadoop jar.
  2. After that I tried executing the program manually by using the following command
    
    hadoop jar target/HadoopWordCount.jar sorttest.txt output/wordcount
    
  3. Now in order to use an Oozie workflow you will have to create a particular folder structure on your machine
    
    wordcount
       -- job.properties
       -- workflow.xml
       -- lib
             -- HadoopWordCount.jar  
    
  4. In the wordcount folder create a job.properties file like this. This file lets you pass parameters to your Oozie workflow. The values of nameNode and jobTracker represent the name node and job tracker locations. In my case I am using the Cloudera VM with a single node, so both these properties point to localhost. The value of oozie.wf.application.path is the HDFS path where you uploaded the wordcount folder created in step 3
  5. Next define your Apache Oozie workflow.xml file like this. In my case the workflow has a single step, which is to execute the MapReduce job. I am setting the following properties
    • mapred.mapper.new-api & mapred.reducer.new-api: Set these properties to true if you are using the new MapReduce API based on org.apache.hadoop.mapreduce.* classes
    • mapreduce.map.class: The fully qualified name of your mapper class
    • mapreduce.reduce.class: The fully qualified name of your reducer class
    • mapred.output.key.class: Fully qualified name of the output key class. This is the same as the parameter to job.setOutputKeyClass() in your driver class
    • mapred.output.value.class: Fully qualified name of the output value class. This is the same as the parameter to job.setOutputValueClass() in your driver class
    • mapred.input.dir: Location of your input file; in my case I have sorttext.txt in the hdfs://localhost/user/cloudera directory
    • mapred.output.dir: Location of the output that will get generated. In my case I want the output to go to the hdfs://localhost/user/cloudera/output/wordcount directory
  6. Once your Oozie workflow is ready, upload the wordcount folder to HDFS by executing the following command
    
    hdfs dfs -put oozie wordcount
    
  7. Now run your Oozie workflow by executing the following command from your wordcount directory
    
    oozie job -oozie http://localhost:11000/oozie -config job.properties -run
    
    If it runs successfully you should see output generated in the hdfs://localhost/user/cloudera/output/wordcount directory

Enabling Oozie console on Cloudera VM 4.4.0 and executing examples

I am trying to learn about Apache Oozie, so I wanted to figure out how to use it in the Cloudera 4.4.0 VM. When you go to the Oozie web console it shows a message saying that the console is disabled. In order to enable the console I had to follow these steps
  1. In Cloudera Manager I went to the Oozie configuration screen and checked the Enable Oozie Server Web Console checkbox like this. As you can see, the description says to install ExtJS 2.2 in /usr/lib/oozie/libext
  2. Next I went to the /usr/lib/oozie/libext directory and executed the following command to download ext-2.2.zip.
    
    wget 'http://extjs.com/deploy/ext-2.2.zip'
    
    Since I am using CDH 4.4 I had to execute unzip ext-2.2.zip to extract the archive
  3. Last step was to restart the Oozie service, and now I could see the Oozie web console

Executing Oozie examples

After the Oozie console was enabled I wanted to execute an Oozie example to test out my installation, so I followed these steps
  1. First thing for me was to find the oozie-examples.tar.gz file on my VM
    
    find / -name oozie-examples.tar.gz
    
    I found it under the /usr/share/doc/oozie-3.3.2+92/ directory, so I extracted it using tar xvf oozie-examples.tar.gz
  2. Then I had to change the values of nameNode and jobTracker in job.properties from localhost to localhost.localdomain to get rid of the Error: E0901 : E0901: Namenode [localhost:8020] not allowed, not in Oozies whitelist error.
    
    nameNode=hdfs://localhost.localdomain:8020
    jobTracker=localhost.localdomain:8021
    queueName=default
    examplesRoot=examples
    
    oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/map-reduce
    outputDir=map-reduce
    
  3. After making the changes in job.properties I uploaded the examples folder to HDFS using the following command
    
    hdfs dfs -put examples examples
    
  4. The last step in the process was to actually run the MapReduce job in Oozie by executing the following command
    
    oozie job -oozie http://localhost:11000/oozie -config examples/apps/map-reduce/job.properties -run
    
  5. Once the job was started I could see the progress using the Oozie web console like this

Where are MapReduce logs when you are using yarn framework

For the last couple of months I have been using the Yarn framework for running my MapReduce jobs. Normally using Yarn is transparent, so I did not have to do anything different other than change my mapred-site.xml file to set the value of mapreduce.framework.name to yarn like this. But Yarn affects how the logs and job history get stored. For example, if you are using the traditional MapReduce framework you can go to http://localhost:50030 to look at the job and task history and also access the logs generated by the MapReduce framework. In case of Yarn you have to go to http://localhost:8088/cluster, which takes you to the Resource Manager home page like this. There you should see the list of applications; click on the name of an application to get more details
When you try to look at the logs for an application, it takes you to the node manager home page like this
Since I am working on a single node cluster I like to go to the Hadoop log directory, and there under the userlogs directory I can see log folders for each application. The application folder is subdivided into container folders, one for the mapper task, one for the reducer task and one for the driver task, and each of the container folders has stdout, stderr and syslog files that contain more output. If you have any System.out.println() in your mapper or reducer class, you should find the appropriate container folder; the stdout file in that container should have the output that you generated using System.out.println()

Using counters in MapReduce program

While developing MapReduce jobs you might want to keep counters for some conditions that you find. For example, in the MapReduce job that uses GeoIP to count the number of requests from a particular city, I want to check how many requests came from the US and India vs. other countries. Also, there are cases when you try to find the location of an IP address and, if the IP is not in the GeoIP database, it throws an error. I wanted to see how many IPs are not found in the DB. In order to do that I had to make the following changes in ApacheLogMapper.java
  1. Declare Enum: On line 24 I had to create an Enum named GEO. I am declaring 4 different counters in it: one for ERRORS, one for IPs in the US, one for IPs in India and one for everything else
  2. Count countries: On line 67 I am checking if the location of the IP is in the USA; if yes, I increase the counter for the USA by 1 using context.getCounter(GEO.US).increment(1)
  3. Counting errors: Whenever the GeoIP API is not able to find the IP in the GeoIP database it throws GeoIp2Exception. I am catching that exception and using it as an opportunity to increment the ERROR count by 1 using context.getCounter(GEO.ERROR).increment(1);
After executing the MapReduce program I could see the different counters calculated by Hadoop in the console output like this
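
The counting pattern itself is easy to show outside Hadoop. The sketch below is not ApacheLogMapper.java: GeoCounterSketch, IpNotFoundException and the in-memory lookup table are hypothetical stand-ins for context.getCounter(), GeoIp2Exception and the GeoIP database, the enum constants INDIA and OTHER are assumed names, and the IP-to-country pairs are made-up sample data.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-in for enum-based Hadoop counters.
class GeoCounterSketch {

    enum GEO { ERROR, US, INDIA, OTHER }

    // Stands in for GeoIp2Exception in this sketch.
    static class IpNotFoundException extends Exception {
        IpNotFoundException(String ip) {
            super("IP not in database: " + ip);
        }
    }

    private final Map<GEO, Long> counters = new EnumMap<>(GEO.class);
    private final Map<String, String> countryByIp;

    GeoCounterSketch(Map<String, String> countryByIp) {
        this.countryByIp = countryByIp;
    }

    private String lookupCountry(String ip) throws IpNotFoundException {
        String country = countryByIp.get(ip);
        if (country == null) {
            throw new IpNotFoundException(ip);
        }
        return country;
    }

    // Mirrors the mapper logic: bump one counter per request, ERROR on failed lookup.
    void record(String ip) {
        try {
            String country = lookupCountry(ip);
            if ("US".equals(country)) {
                increment(GEO.US);
            } else if ("IN".equals(country)) {
                increment(GEO.INDIA);
            } else {
                increment(GEO.OTHER);
            }
        } catch (IpNotFoundException e) {
            increment(GEO.ERROR);
        }
    }

    private void increment(GEO counter) {
        counters.merge(counter, 1L, Long::sum);
    }

    long get(GEO counter) {
        return counters.getOrDefault(counter, 0L);
    }

    public static void main(String[] args) {
        GeoCounterSketch sketch = new GeoCounterSketch(
                Map.of("192.0.2.1", "US", "198.51.100.7", "IN", "203.0.113.9", "DE"));
        for (String ip : new String[] {"192.0.2.1", "198.51.100.7", "203.0.113.9", "10.0.0.1"}) {
            sketch.record(ip);
        }
        for (GEO counter : GEO.values()) {
            System.out.println(counter + "=" + sketch.get(counter));
        }
    }
}
```

In the real job Hadoop aggregates the counters from all map tasks and prints the totals with the job summary, so no extra reducer is needed for them.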

Using DistributedCache with MapReduce job

In the Using third part jars and files in your MapReduce application(Distributed cache) entry I blogged about how to use the Distributed Cache in Hadoop using command line options. But you also have the option of using the DistributedCache API programmatically; the key change is a job.addCacheFile() call in your MapReduce Driver class. You will have to follow these steps
  1. In order to use a file with the DistributedCache API, it has to be available at an hdfs:// or http:// URL that is accessible to all the cluster members. So the first step is to upload the file that you are interested in into HDFS. In my case I used the following command to copy the GeoLite2-City.mmdb file to HDFS.
    
    hdfs dfs -copyFromLocal GeoLite2-City.mmdb /GeoLite2-City.mmdb
    
  2. Next step is to change the Driver class and add the job.addCacheFile(new URI("hdfs://localhost:9000/GeoLite2-City.mmdb#GeoLite2-City.mmdb")); call. This call takes the HDFS URL of the file that you just uploaded and passes it to the DistributedCache class. The #GeoLite2-City.mmdb fragment tells Hadoop that it should create a symbolic link with that name to the file
  3. Now in your Mapper class you can read GeoLite2-City.mmdb using the normal File API
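
The part after # in the cache URI is an ordinary URI fragment, so you can check what link name a given URI asks for with nothing more than java.net.URI. This is a small sketch under assumptions: the class CacheUriSketch and the method symlinkName are hypothetical, the second sample path is made up, and falling back to the file name when there is no fragment is an assumption of this sketch, not something taken from the Hadoop source.

```java
import java.net.URI;

// Hypothetical helper that shows how the "#name" part of a cache URI is read.
class CacheUriSketch {

    static String symlinkName(String cacheUri) {
        URI uri = URI.create(cacheUri);
        String fragment = uri.getFragment();
        if (fragment != null && !fragment.isEmpty()) {
            return fragment; // explicit link name after '#'
        }
        // Assumption for this sketch: without a fragment, use the file name.
        String path = uri.getPath();
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        String sameName = "hdfs://localhost:9000/GeoLite2-City.mmdb#GeoLite2-City.mmdb";
        String renamed = "hdfs://localhost:9000/data/GeoLite.mmdb#GeoLite2-City.mmdb";
        System.out.println(symlinkName(sameName));
        System.out.println(symlinkName(renamed));
    }
}
```

The second URI shows why the fragment is useful: the file can be stored under one name and still be opened in the mapper under the name the code expects.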
When you use the distributed cache, Hadoop first copies the file specified in the DistributedCache API to the machine executing the task. You can view it by looking at the MapReduce temp directory like this.

Killing bad behaving mapreduce job

I was working on building this MapReduce program, and after submitting it I realized that I had made a mistake and the job was taking a really long time to complete. So I decided to kill it. These are the steps that I followed: First I executed the mapred job -list command to get the list of jobs that were in progress. The output of the list command gives you the job id. Then you can use the mapred job -kill job_1405432500430_0001 command to kill the job that you are interested in
You can confirm that the MapReduce job was actually killed by using the web console like this

Configure LogStash to read Apache HTTP Server logs and add GeoIP information in it.

LogStash is a tool that you can use for managing your logs. The basic idea is that you configure LogStash to read a log file, it enhances the log records and then it writes those records to ElasticSearch. Then you can use Kibana to view your logs. I wanted to figure out where my web traffic is coming from, so I configured the LogStash server to read the HTTP server log, then used its geoip capability to find the location of each request based on its IP and store it in ElasticSearch. This is how my LogStash configuration looks. Before starting, I downloaded the GeoCity database from MaxMind and configured LogStash to use it. Next I started an ElasticSearch server on my local machine to collect the logs and used the following command to start the LogStash server

java -jar logstash-1.3.2-flatjar.jar agent -f httpaccess.conf
Once the LogStash server was started I could see how it was parsing logs and posting them to ElasticSearch. For example, for the following log statement

129.143.71.36 - - [31/Aug/2011:08:35:17 -0700] "GET /favicon.ico HTTP/1.1" 200 3935 "-" "Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.10 (KHTML, like Gecko) Chrome/8.0.552.224 Safari/534.10"
I could see LogStash converting it into the following JSON before posting it to ElasticSearch

Using third part jars and files in your MapReduce application(Distributed cache)

If you want to use a third party jar in your MapReduce program you have two options: one is to create a single jar with all dependencies and the other is to use the Hadoop distributed cache option. I wanted to play around with both these options, so I built this simple application in which I read a standard Apache HTTP Server log and parse it to read the IP address each request is coming from. Then I use the IP address to invoke the GeoIP lookup to find out what city and country that request came from. Once I have a city, I use it to count the number of requests that came from that particular city. You can download the source code for this project from here. This is how the mapper class for my map-reduce application looks. In the setup() method of my class I am creating an object of com.maxmind.geoip2.DatabaseReader, and I am passing GeoLite2-City.mmdb, the GeoIP database file, to my reader. In order for this program to work, my MapReduce program needs access to com.maxmind.geoip2.geoip2 along with its dependencies; it also needs access to GeoLite2-City.mmdb.
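
The first thing the mapper has to do is pull the IP address out of each log line. Here is a minimal sketch of that parsing step using only java.util.regex. It is not the parser from my project: the class ApacheLogLineSketch, its methods and the regular expression are assumptions that cover the leading fields of the common/combined Apache log format (host, identity, user, timestamp, request, status, bytes).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for the leading fields of an Apache access log line.
class ApacheLogLineSketch {

    // host ident user [timestamp] "request" status bytes
    private static final Pattern LOG_LINE = Pattern.compile(
            "^(\\S+) (\\S+) (\\S+) \\[([^\\]]+)\\] \"([^\"]*)\" (\\d{3}) (\\S+)");

    // Returns the client IP (first field), or null when the line does not match.
    static String clientIp(String logLine) {
        Matcher matcher = LOG_LINE.matcher(logLine);
        return matcher.find() ? matcher.group(1) : null;
    }

    // Returns the HTTP status code, or -1 when the line does not match.
    static int statusCode(String logLine) {
        Matcher matcher = LOG_LINE.matcher(logLine);
        return matcher.find() ? Integer.parseInt(matcher.group(6)) : -1;
    }

    public static void main(String[] args) {
        String line = "129.143.71.36 - - [31/Aug/2011:08:35:17 -0700] "
                + "\"GET /favicon.ico HTTP/1.1\" 200 3935 \"-\" "
                + "\"Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.10 "
                + "(KHTML, like Gecko) Chrome/8.0.552.224 Safari/534.10\"";
        System.out.println(clientIp(line) + " " + statusCode(line));
    }
}
```

Lines that do not match return null, which in a mapper would be a natural place to skip the record instead of failing the task.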

Creating single jar with all dependencies

In order for this method to work, add the following plugin to your Maven build file. In this case the jar will get created with com.spnotes.hadoop.logs.ApacheLogMRDriver as the main class. Now you can build a single jar file by executing the mvn clean compile assembly:single command. Copy GeoLite2-City.mmdb to the Hadoop cluster (you could package it inside the jar, but that might not be a good idea). Now you can execute the following command to run this job on the cluster

hadoop jar ApacheLogsMR-jar-with-dependencies.jar -files GeoLite2-City.mmdb  /apache.log /output/logs/apachelogs

Using Hadoop distributed cache

The second option is to create a jar only for your MapReduce program and pass the dependencies to Hadoop using the distributed cache. With this option, you should make sure that your Driver class extends Configured like this. Copy the dependency jars as well as the GeoLite2-City.mmdb file onto the same machine that has the Hadoop cluster. Then execute the following command to pass both the jars and the file to Hadoop.

hadoop jar ApacheLogsMR.jar -libjars geoip2-0.7.1.jar,maxmind-db-0.3.2.jar,jackson-databind-2.2.3.jar,jackson-core-2.2.3.jar,jackson-annotations-2.2.3.jar  -files GeoLite2-City.mmdb  /apache.log /output/logs/apachelogs

How reading and writing of files in HDFS works

Read Path
  1. The client program starts with the Hadoop library jar and a copy of the cluster configuration data, which specifies the location of the name node.
  2. The client begins by contacting the name node, indicating the file it wants to read.
  3. The name node validates the client's identity, either by simply trusting the client or by using an authentication protocol such as Kerberos.
  4. The client's identity is then checked against the owner and permissions of the file.
  5. The name node responds to the client with the first block ID and the list of data nodes on which a copy of the block can be found, sorted by their distance to the client. Distance to the client is measured according to Hadoop's rack topology.
  6. With the block IDs and data node hostnames, the client can now contact the most appropriate data node directly and read the block data it needs. This process repeats until all the blocks in the file have been read or the client closes the file stream.
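The "sorted by distance" idea in step 5 can be sketched as follows. This is a simplified illustration, not Hadoop's actual topology code: it assumes every node is identified by a hypothetical location string of the form /datacenter/rack/node, and it defines the distance between two nodes as the number of hops from each one up to their closest common ancestor. The class name RackDistanceSketch and the sample locations are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Simplified illustration only; the locations and names are hypothetical.
final class RackDistanceSketch {

    // Splits a location such as "/d1/r1/n1" into ["d1", "r1", "n1"].
    private static String[] parts(String location) {
        String trimmed = location.startsWith("/") ? location.substring(1) : location;
        return trimmed.split("/");
    }

    // Hops from each node up to the closest common ancestor, added together.
    static int distance(String a, String b) {
        String[] pa = parts(a);
        String[] pb = parts(b);
        int common = 0;
        while (common < pa.length && common < pb.length && pa[common].equals(pb[common])) {
            common++;
        }
        return (pa.length - common) + (pb.length - common);
    }

    // Returns a copy of the replica list ordered from nearest to farthest.
    static List<String> sortByDistance(String client, List<String> replicas) {
        List<String> sorted = new ArrayList<>(replicas);
        sorted.sort(Comparator.comparingInt((String r) -> distance(client, r)));
        return sorted;
    }

    public static void main(String[] args) {
        List<String> replicas = Arrays.asList("/d2/r3/n4", "/d1/r2/n3", "/d1/r1/n2");
        System.out.println(sortByDistance("/d1/r1/n1", replicas));
    }
}
```

With three-level locations this gives 0 for the same node, 2 for a node on the same rack, 4 for a different rack in the same data center, and 6 for a different data center.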
Write Path
  1. The client makes a request to open a file for writing using the Hadoop FileSystem APIs.
  2. A request is sent to the name node to create the file metadata, provided the user has the necessary permission to do so. The new file initially has no associated blocks.
  3. The name node responds to the client indicating that the request was successful and that it should start writing data.
  4. The client library sends a request to the name node asking for the set of data nodes to which the data should be written, and gets a list back from the name node.
  5. The client makes a connection to the first data node, which in turn makes a connection to the second, and the second data node makes a connection to the third.
  6. The client starts writing data to the first data node. The first data node writes the data to disk as well as to the connection to the second data node. The second data node writes the data to disk and to the connection to the third data node, and so on.
  7. Once the client has finished writing, it closes the stream, which flushes any remaining data and writes it to disk.
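The pipelining in steps 5 and 6 can be mimicked with a small in-memory model. This is only a sketch of the forwarding idea: the PipelineSketch and Node names are hypothetical, "disk" is a byte buffer, and there are no network connections, acknowledgements, or failure handling as a real write path would have.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// In-memory model of a write pipeline; all names are hypothetical.
final class PipelineSketch {

    // Stand-in for a data node: stores each packet locally, then forwards it downstream.
    static final class Node {
        final String name;
        final Node next; // null for the last node in the pipeline
        final ByteArrayOutputStream disk = new ByteArrayOutputStream();

        Node(String name, Node next) {
            this.name = name;
            this.next = next;
        }

        void receive(byte[] packet) {
            disk.write(packet, 0, packet.length); // write to local "disk"
            if (next != null) {
                next.receive(packet);             // forward to the next node
            }
        }
    }

    public static void main(String[] args) {
        // Build the chain back to front: client -> dn1 -> dn2 -> dn3.
        Node third = new Node("dn3", null);
        Node second = new Node("dn2", third);
        Node first = new Node("dn1", second);

        // The client only ever talks to the first node.
        first.receive("hello ".getBytes(StandardCharsets.UTF_8));
        first.receive("hdfs".getBytes(StandardCharsets.UTF_8));

        for (Node n = first; n != null; n = n.next) {
            String stored = new String(n.disk.toByteArray(), StandardCharsets.UTF_8);
            System.out.println(n.name + " stored: " + stored);
        }
    }
}
```

The point of the chain is that the client sends each packet once, and the replicas are produced by the data nodes passing it along.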

HDFS Java Client

Hadoop provides a Java API that you can use to perform common file operations such as reading a file, creating a new file, appending to the end of an existing file, or searching for files. I wanted to try these common operations out, so I built this HelloHDFS project, which you can download from here. This is the main class, which takes the operation name and file path as command line arguments and performs the operation. After downloading the source code for the project, you can use the following Maven command to build a single jar that also contains all the dependencies.

mvn clean compile assembly:single
Once your jar is ready, you can use it like this to read a file from HDFS with a relative path

java -jar target/HelloHDFS-jar-with-dependencies.jar read aesop.txt
or with a fully qualified path

java -jar target/HelloHDFS-jar-with-dependencies.jar read hdfs://localhost/user/user/aesop.txt
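Both commands above refer to the same file because a relative HDFS path is resolved against the working directory, which by default is the user's home directory, /user/<username>. The sketch below imitates that resolution with java.net.URI only. It is not the Hadoop Path or FileSystem code; the class name HdfsPathSketch and the qualify method are hypothetical, and the default file system URI and user name are passed in by hand rather than read from the cluster configuration.

```java
import java.net.URI;

// Plain-JDK imitation of path qualification; not Hadoop's own implementation.
final class HdfsPathSketch {

    // defaultFs: e.g. "hdfs://localhost" (assumed to have no trailing slash).
    // user: the user name whose home directory is /user/<user>.
    // path: a relative path, an absolute path, or a full hdfs:// URI.
    static URI qualify(String defaultFs, String user, String path) {
        URI home = URI.create(defaultFs + "/user/" + user + "/");
        return home.resolve(path);
    }

    public static void main(String[] args) {
        System.out.println(qualify("hdfs://localhost", "user", "aesop.txt"));
        System.out.println(qualify("hdfs://localhost", "user", "/apache.log"));
        System.out.println(qualify("hdfs://localhost", "user", "hdfs://localhost/user/user/aesop.txt"));
    }
}
```

A relative path lands under the home directory, an absolute path replaces the whole path part, and a fully qualified URI is returned unchanged.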

HDFS Daemons

An HDFS cluster has two types of nodes operating in a master-worker pattern
  • NameNode: Manages the filesystem's directory structure and the metadata for all the files. This information is persisted on local disk in the form of two files
    1. fsimage: This is the master copy of the metadata for the file system.
    2. edits: This file stores the changes (deltas) made to the metadata. In newer versions of Hadoop (I am using 2.4) multiple edits files get created, each covering a range of transactions, which together store the changes made to the metadata.
    In addition to this, the name node also holds the mapping of blocks to the data nodes where each block is stored, but that information does not get persisted to disk. Instead, data nodes send the list of blocks that they hold to the name node on startup, and the name node keeps it in memory. The name node's filesystem metadata is served entirely from RAM for fast lookup and retrieval, which places a cap on how much metadata the name node can handle.
  • Secondary namenode: The job of the secondary namenode is to merge the fsimage and edits files for the primary name node. The basic issue is that taking the fsimage and applying all the edits to it is very CPU-intensive, so that work is delegated to the secondary namenode. The secondary namenode downloads the edits file from the primary, merges it with the fsimage, and then sends the result back to the primary.
  • DataNode: This is the workhorse daemon that is responsible for storing and retrieving blocks of data. This daemon is also responsible for maintaining the block report (the list of blocks that are stored on that data node). It sends a heartbeat to the name node at a regular interval (every 3 seconds by default), and it periodically sends its block report as well (every hour by default in Hadoop 1, every 6 hours in Hadoop 2).
There are two ways to start the daemons necessary for HDFS: you can start them individually using hadoop-daemon.sh start <daemontype> (for example, hadoop-daemon.sh start namenode), or you can start all of them using start-dfs.sh
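The checkpoint work that the secondary namenode does (replaying edits on top of an fsimage to produce a new fsimage) can be illustrated with a toy model. This is a sketch under heavy simplification: the fsimage is just a sorted set of paths, an edit is either a create or a delete, and the names CheckpointSketch, Op, and Edit are hypothetical. The real files are binary and carry far more metadata.

```java
import java.util.Arrays;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Toy model of an fsimage + edits merge; all names are hypothetical.
final class CheckpointSketch {

    enum Op { CREATE, DELETE }

    // One logged change to the namespace.
    static final class Edit {
        final Op op;
        final String path;

        Edit(Op op, String path) {
            this.op = op;
            this.path = path;
        }
    }

    // Replays the edits, in order, on a copy of the fsimage and returns the new image.
    static SortedSet<String> merge(SortedSet<String> fsimage, List<Edit> edits) {
        SortedSet<String> merged = new TreeSet<>(fsimage);
        for (Edit e : edits) {
            if (e.op == Op.CREATE) {
                merged.add(e.path);
            } else {
                merged.remove(e.path);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        SortedSet<String> fsimage =
                new TreeSet<>(Arrays.asList("/user/user/aesop.txt", "/tmp/old.log"));
        List<Edit> edits = Arrays.asList(
                new Edit(Op.CREATE, "/apache.log"),
                new Edit(Op.DELETE, "/tmp/old.log"));
        System.out.println(merge(fsimage, edits));
    }
}
```

After a merge like this, the edits that were replayed are no longer needed, which is why checkpointing keeps name node restarts from having to replay a very long log.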

How to control replication for a file stored in HDFS

In Hadoop you can control how many replicas of your file get created. I wanted to try that out, so I tried different options. The first option is to set the replication factor in hdfs-site.xml; the settings in hdfs-site.xml apply to all files (global settings)
  • dfs.replication: Default block replication. The actual number of replications can be specified when the file is created. The default is used if replication is not specified in create time.
  • dfs.replication.max: Maximal block replication.
  • dfs.namenode.replication.min: Minimal block replication.
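How the three settings interact can be sketched as a small function. This is an illustration with hypothetical names (ReplicationSketch, effectiveReplication), not Hadoop code. It assumes that a request outside the range from dfs.namenode.replication.min to dfs.replication.max is rejected rather than silently adjusted, and the values used in main are the stock defaults (3, 1, and 512) as I understand them.

```java
// Illustration of how the replication settings combine; names are hypothetical.
final class ReplicationSketch {

    // requested: per-file replication given at create time, or null if not specified.
    // dfsReplication: the dfs.replication default.
    // min / max: dfs.namenode.replication.min and dfs.replication.max.
    static int effectiveReplication(Integer requested, int dfsReplication, int min, int max) {
        int value = (requested != null) ? requested : dfsReplication;
        // Assumption: out-of-range values are rejected, not clamped.
        if (value < min || value > max) {
            throw new IllegalArgumentException(
                    "replication " + value + " is outside [" + min + ", " + max + "]");
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(effectiveReplication(null, 3, 1, 512)); // falls back to the default
        System.out.println(effectiveReplication(5, 3, 1, 512));    // per-file value wins
    }
}
```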
After configuring the global replication settings, I restarted the HDFS daemons and ran hdfs fsck on one of the files to see the effect of the replication setting, and this is the output I got
You can also use the command line tools to set a particular replication factor for a file by executing the hdfs dfs -setrep command like this.

hdfs dfs -setrep 5 /user/user/aesop.txt
Then I could verify the effect of the replication settings like this
The last option is to set the replication factor programmatically

Setting replication factor for a file programmatically

I wanted to figure out how to set or change the replication factor for a file stored in HDFS, so I built this sample program, which has 3 methods
  • read(): This method takes a file path as input and prints its content to the console
  • write(): This method takes a file path as input and opens it for writing. It takes user input and writes that to the HDFS file
  • setReplication(): This method takes a file path and a replication factor as input and sets the replication factor for the given file path
Once you build the jar, you can execute it using a command like this

java -jar target/HelloHDFS-jar-with-dependencies.jar read <hdfsfile_path>
For example, in my case I have aesop.txt at hdfs://localhost/user/user/aesop.txt, so I can print it using the following command

java -jar target/HelloHDFS-jar-with-dependencies.jar read aesop.txt

Finding out the blocks used for storing file in HDFS

HDFS stores files by breaking them into blocks of 64 MB (the default block size in Hadoop 1; Hadoop 2 defaults to 128 MB, and you can change it). It stores these blocks across one or more disks or machines. Unlike a filesystem for a single disk, a file in HDFS that is smaller than a single block does not occupy a full block's worth of underlying storage. Map tasks in MapReduce operate on one block at a time (InputSplit takes care of assigning a different map task to each of the blocks). Replication is handled at the block level: HDFS replicates each block to different machines, and if one copy of a block is corrupted, or a machine where the block is stored is down, it can read the block from a different machine. For a corrupted or unreachable block, HDFS also takes care of re-replicating it to bring the replication factor back to the normal level. Some applications may choose to set a high replication factor for the blocks of a popular file to spread the read load across the cluster.
I wanted to figure out how a file gets stored in HDFS (using the /user/user/aesop.txt file as an example), so I tried these steps on my local single-node Hadoop cluster.
  1. First I looked at my hdfs-site.xml to find the value of the dfs.data.dir element (named dfs.datanode.data.dir in Hadoop 2), which points to where the data is stored. In my case it is the /home/user/data directory
    
    <configuration>
      <property>
             <name>dfs.replication</name>
             <value>1</value>
      </property>
     <property>
             <name>dfs.data.dir</name>
             <value>/home/user/data</value>
      </property> 
    </configuration>
    
  2. There is a file /user/user/aesop.txt stored in my HDFS and I wanted to see where it is stored, so I executed the hdfs fsck /user/user/aesop.txt -blocks -files command to get the list of blocks that make up this file
    It is stored in BP-1574103969-127.0.1.1-1403309876533:blk_1073742129_1306
  3. When I looked under the /home/user/data/current directory I saw BP-1574103969-127.0.1.1-1403309876533, which is the first part of the block name, and it has a bunch of subdirectories
  4. The next part was to find blk_1073742129_1306, which is the second part of the block name. I searched for it without the trailing _1306 and found a .meta file and a normal file that contains the actual content of aesop.txt. When I opened it I could see the content of the file like this
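The name juggling in steps 2 to 4 can be written down as a small parser. This is a sketch, not a Hadoop API: the class BlockNameSketch is hypothetical, and it assumes the layout I saw on my data node, where the part before the colon is the block pool directory, the data file is named blk_<id>, and the metadata file is named blk_<id>_<generation stamp>.meta.

```java
// Splits an fsck block name into the names to look for on the data node's disk.
// Hypothetical helper; the on-disk layout is an assumption and may differ by version.
final class BlockNameSketch {
    final String blockPoolId;
    final String dataFileName;
    final String metaFileName;

    private BlockNameSketch(String blockPoolId, String dataFileName, String metaFileName) {
        this.blockPoolId = blockPoolId;
        this.dataFileName = dataFileName;
        this.metaFileName = metaFileName;
    }

    // Expects input of the form <block pool id>:blk_<id>_<generation stamp>.
    static BlockNameSketch parse(String fsckName) {
        int colon = fsckName.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("missing ':' in " + fsckName);
        }
        String pool = fsckName.substring(0, colon);
        String block = fsckName.substring(colon + 1);
        int lastUnderscore = block.lastIndexOf('_');
        if (!block.startsWith("blk_") || lastUnderscore <= 3) {
            throw new IllegalArgumentException("unexpected block name " + block);
        }
        // Drop the trailing generation stamp to get the data file name.
        String dataFile = block.substring(0, lastUnderscore);
        return new BlockNameSketch(pool, dataFile, block + ".meta");
    }

    public static void main(String[] args) {
        BlockNameSketch b =
                parse("BP-1574103969-127.0.1.1-1403309876533:blk_1073742129_1306");
        System.out.println("block pool dir: " + b.blockPoolId);
        System.out.println("data file:      " + b.dataFileName);
        System.out.println("meta file:      " + b.metaFileName);
    }
}
```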
Note: I found this on my local Hadoop installation, but I haven't seen this method in the documentation, so it might not work for you. The good part is that as long as your HDFS commands are working, you don't need to worry about this.
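The block arithmetic from the start of this section (a file is cut into fixed-size blocks, and the last block only takes as much space as it needs) can be sketched like this. The class BlockSplitSketch and its method are hypothetical, and the 64 MB block size is just the value used in the text; pass whatever block size your cluster is configured with.

```java
import java.util.Arrays;

// Computes the lengths of the blocks a file of a given size would be cut into.
// Hypothetical helper for illustration only.
final class BlockSplitSketch {

    static final long MB = 1024L * 1024L;

    static long[] blockLengths(long fileSize, long blockSize) {
        if (fileSize < 0 || blockSize <= 0) {
            throw new IllegalArgumentException("fileSize must be >= 0 and blockSize > 0");
        }
        // Ceiling division: a partial last block still counts as a block.
        int count = (int) ((fileSize + blockSize - 1) / blockSize);
        long[] lengths = new long[count];
        for (int i = 0; i < count; i++) {
            // Every block is full except possibly the last one.
            lengths[i] = Math.min(blockSize, fileSize - i * blockSize);
        }
        return lengths;
    }

    public static void main(String[] args) {
        // A 150 MB file with 64 MB blocks: two full blocks and one partial block.
        System.out.println(Arrays.toString(blockLengths(150 * MB, 64 * MB)));
        // A tiny file still gets one block, but only of its own size.
        System.out.println(Arrays.toString(blockLengths(1024, 64 * MB)));
    }
}
```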