Using Apache Oozie for automating a streaming map-reduce job

In the WordCount MapReduce program using Hadoop streaming and python entry I talked about how to create a streaming map-reduce job using Python. I wanted to figure out how to automate that program using an Oozie workflow, so I followed these steps
  1. The first step was to create a folder called streaming on my local machine and copy mapper.py and reducer.py into the streaming folder. I also created placeholders for job.properties and workflow.xml
  2. Next I created a job.properties file like this (a sketch of job.properties and workflow.xml appears after this list). This job.properties is quite similar to the job.properties for a Java MapReduce job; the only difference is that you must set oozie.use.system.libpath=true. By default the streaming-related jars are not included in the classpath, so unless you set that value to true you will get the following error
    
    2014-07-23 06:15:13,170 WARN org.apache.hadoop.mapred.Child: Error running child
    java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.streaming.Pi
    peMapRunner not found
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1649)
     at org.apache.hadoop.mapred.JobConf.getMapRunnerClass(JobConf.java:1010)
     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:413)
     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
     at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
     at java.security.AccessController.doPrivileged(Native Method)
     at javax.security.auth.Subject.doAs(Subject.java:396)
     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
     at org.apache.hadoop.mapred.Child.main(Child.java:262)
    Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.streaming.PipeMapRunner not f
    ound
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1617)
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1641)
     ... 8 more
    Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.streaming.PipeMapRunner not found
     at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1523)
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1615)
     ... 9 more
    2014-07-23 06:15:13,175 INFO org.apache.hadoop.mapred.Task: Runnning cleanup for the task
    
  3. The next step in the process is to create a workflow.xml file like this (also sketched after this list). Make sure to add the <file>mapper.py#mapper.py</file> element in the workflow.xml, which takes care of shipping mapper.py and reducer.py to the cluster and creating symbolic links to these two files.
  4. Upload the streaming folder with all your changes to HDFS by executing the following command
    
    hdfs dfs -put streaming streaming
    
  5. You can trigger the Oozie workflow by executing the following command
    
    oozie job -oozie http://localhost:11000/oozie -config streaming/job.properties -run
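
Since the original job.properties and workflow.xml listings did not make it into this page, here is a hedged sketch of what the two files described in steps 2 and 3 might look like. Only oozie.use.system.libpath=true and the <file> elements come from the steps above; the host names, ports, paths and the workflow/action names are assumptions that you would adjust for your cluster.

    # job.properties (sketch)
    nameNode=hdfs://localhost:8020
    jobTracker=localhost:8021
    oozie.use.system.libpath=true
    oozie.wf.application.path=${nameNode}/user/${user.name}/streaming

    <!-- workflow.xml (sketch) -->
    <workflow-app xmlns="uri:oozie:workflow:0.2" name="streaming-wordcount-wf">
        <start to="streaming-node"/>
        <action name="streaming-node">
            <map-reduce>
                <job-tracker>${jobTracker}</job-tracker>
                <name-node>${nameNode}</name-node>
                <streaming>
                    <mapper>python mapper.py</mapper>
                    <reducer>python reducer.py</reducer>
                </streaming>
                <configuration>
                    <property>
                        <name>mapred.input.dir</name>
                        <value>/user/${wf:user()}/input</value>
                    </property>
                    <property>
                        <name>mapred.output.dir</name>
                        <value>/user/${wf:user()}/streaming/output</value>
                    </property>
                </configuration>
                <file>mapper.py#mapper.py</file>
                <file>reducer.py#reducer.py</file>
            </map-reduce>
            <ok to="end"/>
            <error to="fail"/>
        </action>
        <fail name="fail">
            <message>Streaming job failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
        </fail>
        <end name="end"/>
    </workflow-app>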
    

Using Apache Oozie to execute MapReduce jobs

I wanted to learn how to automate a MapReduce job using Oozie, so I decided to create an Oozie workflow to invoke the WordCount (HelloWorld) MapReduce program. I had to follow these steps
  1. First thing that I did was to download the WordCount program source code by executing
    
    git clone https://github.com/sdpatil/HadoopWordCount3
    
    This program has a Maven script for building an executable jar, so I used the mvn clean package command to build the Hadoop jar.
  2. After that I tried executing the program manually by using the following command
    
    hadoop jar target/HadoopWordCount.jar sorttest.txt output/wordcount
    
  3. Now, in order to use an Oozie workflow, you will have to create a particular folder structure on your machine
    
    wordcount
       -- job.properties
       -- workflow.xml
       -- lib
             -- HadoopWordCount.jar  
    
  4. In the wordcount folder create a job.properties file like this (a sketch appears after this list). This file lets you pass parameters to your Oozie workflow. The values of nameNode and jobTracker represent the name node and job tracker locations. In my case I am using the Cloudera VM with a single node, so both these properties point to localhost. The value of oozie.wf.application.path is the HDFS path where you uploaded the wordcount folder created in step 3
  5. Next define your Apache Oozie workflow.xml file like this (also sketched after this list). In my case the workflow has a single step, which is to execute the MapReduce job. I am passing the following configuration properties to the map-reduce action:
    • mapred.mapper.new-api & mapred.reducer.new-api: Set these properties to true if you're using the new MapReduce API based on the org.apache.hadoop.mapreduce.* classes
    • mapreduce.map.class: The fully qualified name of your mapper class
    • mapreduce.reduce.class: The fully qualified name of your reducer class
    • mapred.output.key.class: Fully qualified name of the output key class. This is the same as the parameter to job.setOutputKeyClass() in your driver class
    • mapred.output.value.class: Fully qualified name of the output value class. This is the same as the parameter to job.setOutputValueClass() in your driver class
    • mapred.input.dir: Location of your input file. In my case I have sorttext.txt in the hdfs://localhost/user/cloudera directory
    • mapred.output.dir: Location of the output that will get generated. In my case I want the output to go to the hdfs://localhost/user/cloudera/output/wordcount directory
  6. Once your Oozie workflow is ready, upload the wordcount folder to HDFS by executing the following command
    
    hdfs dfs -put wordcount wordcount
    
  7. Now run your Oozie workflow by executing the following command from your wordcount directory
    
    oozie job -oozie http://localhost:11000/oozie -config job.properties -run
    
    If it runs successfully you should see the output generated in the hdfs://localhost/user/cloudera/output/wordcount directory
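
The job.properties and workflow.xml listings referenced in steps 4 and 5 did not come through, so here is a hedged reconstruction based on the property names listed above. The mapper and reducer class names (com.example.*), the Text/IntWritable key and value classes, and the host names and ports are assumptions; check them against your own WordCount driver class.

    # job.properties (sketch)
    nameNode=hdfs://localhost:8020
    jobTracker=localhost:8021
    queueName=default
    oozie.wf.application.path=${nameNode}/user/cloudera/wordcount

    <!-- workflow.xml (sketch) -->
    <workflow-app xmlns="uri:oozie:workflow:0.2" name="wordcount-wf">
        <start to="wordcount-node"/>
        <action name="wordcount-node">
            <map-reduce>
                <job-tracker>${jobTracker}</job-tracker>
                <name-node>${nameNode}</name-node>
                <configuration>
                    <property>
                        <name>mapred.mapper.new-api</name>
                        <value>true</value>
                    </property>
                    <property>
                        <name>mapred.reducer.new-api</name>
                        <value>true</value>
                    </property>
                    <property>
                        <name>mapreduce.map.class</name>
                        <value>com.example.WordCountMapper</value>
                    </property>
                    <property>
                        <name>mapreduce.reduce.class</name>
                        <value>com.example.WordCountReducer</value>
                    </property>
                    <property>
                        <name>mapred.output.key.class</name>
                        <value>org.apache.hadoop.io.Text</value>
                    </property>
                    <property>
                        <name>mapred.output.value.class</name>
                        <value>org.apache.hadoop.io.IntWritable</value>
                    </property>
                    <property>
                        <name>mapred.input.dir</name>
                        <value>/user/cloudera/sorttext.txt</value>
                    </property>
                    <property>
                        <name>mapred.output.dir</name>
                        <value>/user/cloudera/output/wordcount</value>
                    </property>
                </configuration>
            </map-reduce>
            <ok to="end"/>
            <error to="fail"/>
        </action>
        <fail name="fail">
            <message>WordCount failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
        </fail>
        <end name="end"/>
    </workflow-app>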

Enabling Oozie console on Cloudera VM 4.4.0 and executing examples

I am trying to learn about Apache Oozie, so I wanted to figure out how to use it in the Cloudera 4.4.0 VM. When you go to the Oozie web console it shows a message saying that the console is disabled. In order to enable the console I had to follow these steps
  1. Go to your Cloudera Manager; there I went to the Oozie configuration screen and checked the Enable Oozie Server Web Console option like this. As you can see in the description, it says to install ExtJS 2.2 in /usr/lib/oozie/libext
  2. Next I went to the /usr/lib/oozie/libext directory and executed the following command to download ext-2.2.zip.
    
    wget 'http://extjs.com/deploy/ext-2.2.zip'
    
    Since I am using CDH 4.4 I had to execute unzip ext-2.2.zip to extract the ext-2.2.zip
  3. The last step was to restart the Oozie service, and then I could see the Oozie web console

Executing Oozie examples

After the Oozie console was enabled I wanted to execute an Oozie example to test my installation, so I followed these steps
  1. First thing for me was to find the oozie-examples.tar.gz file on my VM
    
    find / -name oozie-examples.tar.gz
    
    I found it under the /usr/share/doc/oozie-3.3.2+92/ directory, so I untarred it using tar xvf oozie-examples.tar.gz
  2. Then I had to make a change in job.properties, changing the values of nameNode and jobTracker from localhost to localhost.localdomain, to get rid of the Error: E0901 : E0901: Namenode [localhost:8020] not allowed, not in Oozies whitelist error.
    
    nameNode=hdfs://localhost.localdomain:8020
    jobTracker=localhost.localdomain:8021
    queueName=default
    examplesRoot=examples
    
    oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/map-reduce
    outputDir=map-reduce
    
  3. After making the changes in job.properties I uploaded the examples folder to HDFS using the following command
    
    hdfs dfs -put examples examples
    
  4. The last step in the process was to actually run the MapReduce job in Oozie by executing the following command
    
    oozie job -oozie http://localhost:11000/oozie -config examples/apps/map-reduce/job.properties -run
    
  5. Once the job was started I could see the progress using the Oozie web console like this
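
Besides the web console, you can also check the status from the command line with the Oozie CLI; for example (replace <job-id> with the workflow id printed by the -run command):

    oozie job -oozie http://localhost:11000/oozie -info <job-id>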

Where are the MapReduce logs when you're using the YARN framework

For the last couple of months I have been using the YARN framework for running my MapReduce jobs. Normally using YARN is transparent, so I did not have to do anything different other than change my mapred-site.xml file to set the value of mapreduce.framework.name to yarn, as sketched below. But YARN does affect how the logs and job history get stored. For example, if you're using the traditional MapReduce framework you can go to http://localhost:50030 to look at the job and task history and also access the logs generated by the MapReduce framework. In the case of YARN you will have to go to http://localhost:8088/cluster, which takes you to the Resource Manager home page like this; there you should see a list of applications, and you can click on the name of an application to get more details
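
Here is that mapred-site.xml change, a minimal sketch with just the one property mentioned above:

    <configuration>
        <property>
            <name>mapreduce.framework.name</name>
            <value>yarn</value>
        </property>
    </configuration>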
When you try to look at the logs for an application, it takes you to the NodeManager home page like this
Since I am working on a single node cluster, I like to go to the Hadoop log directory; there, under the userlogs directory, I can see a log folder for each application. The application folder is subdivided into container folders, one for the mapper task, one for the reducer task and one for the driver task, and each of the container folders has one file each for stdout, stderr and syslog that contains more output. If you have any System.out.println() in your mapper or reducer class, you should find the appropriate container folder, and the stdout file in that container should have the output that you generated using System.out.println()

Using counters in a MapReduce program

While developing MapReduce jobs you might want to keep counters for some conditions that you find. For example, in the MapReduce job that uses GeoIP to count the number of requests from a particular city, I wanted to check how many requests came from the US and India vs. other countries. Also, there are cases when you try to find the location of an IP address and, if the IP is not in the GeoIP database, it throws an error; I wanted to see how many IPs are not found in the DB. In order to do that I changed ApacheLogMapper.java like this. I had to make the following changes in ApacheLogMapper.java (a sketch appears after this list)
  1. Declare Enum: On line 24 I had to create an enum called GEO, declaring 4 different counters in it: one for ERROR, one for IPs in the US, one for IPs in India and one for everything else
  2. Count countries: On line 67 I check if the location of the IP is in the USA; if yes, I increase the counter for the USA by 1 using context.getCounter(GEO.US).increment(1)
  3. Counting errors: Whenever the GeoIP API is not able to find the IP in the GeoIP database it throws GeoIp2Exception; I am catching that exception and using it as an opportunity to increment the ERROR count by 1 using context.getCounter(GEO.ERROR).increment(1);
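
Here is a hedged sketch of what the counter-related pieces of ApacheLogMapper.java look like. The GEO enum and the context.getCounter(...).increment(1) calls come from the steps above; the helper methods and the rest of the class are assumptions.

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import com.maxmind.geoip2.exception.GeoIp2Exception;

    public class ApacheLogMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // One counter per condition we want to track
        public enum GEO { ERROR, US, INDIA, OTHER }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String ip = extractIp(value.toString()); // assumed helper that parses the log line
            try {
                String country = lookupCountry(ip); // assumed helper around the GeoIP DatabaseReader
                if ("United States".equals(country)) {
                    context.getCounter(GEO.US).increment(1);
                } else if ("India".equals(country)) {
                    context.getCounter(GEO.INDIA).increment(1);
                } else {
                    context.getCounter(GEO.OTHER).increment(1);
                }
                // ... the existing logic that emits (city, 1) pairs goes here ...
            } catch (GeoIp2Exception e) {
                // The IP was not found in the GeoIP database
                context.getCounter(GEO.ERROR).increment(1);
            }
        }

        // Assumed helpers, not from the original post
        private String extractIp(String logLine) {
            return logLine.split(" ")[0]; // first field of a combined-format log line
        }

        private String lookupCountry(String ip) throws IOException, GeoIp2Exception {
            // in the real mapper this wraps com.maxmind.geoip2.DatabaseReader, e.g.
            // reader.city(InetAddress.getByName(ip)).getCountry().getName()
            return null; // placeholder for the sketch
        }
    }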
After executing the MapReduce program I could see the different counters calculated by Hadoop in the console output like this

Using DistributedCache with a MapReduce job

In the Using third party jars and files in your MapReduce application (Distributed cache) entry I blogged about how to use the distributed cache in Hadoop using command line options. But you also have the option of using the DistributedCache API programmatically; the key change is to call job.addCacheFile() in your MapReduce driver class. You will have to use the following steps
  1. In order to use a file with the DistributedCache API, it has to be available on either an hdfs:// or http:// URL that is accessible to all the cluster members. So the first step was to upload the file that you are interested in into HDFS; in my case I used the following command to copy the GeoLite2-City.mmdb file to HDFS.
    
    hdfs dfs -copyFromLocal GeoLite2-City.mmdb /GeoLite2-City.mmdb
    
  2. The next step is to change the driver class and add the job.addCacheFile(new URI("hdfs://localhost:9000/GeoLite2-City.mmdb#GeoLite2-City.mmdb")); call. This call takes the HDFS URL of the file that you just uploaded to HDFS and passes it to the DistributedCache class. The #GeoLite2-City.mmdb fragment is used here to tell Hadoop that it should create a symbolic link to this file
  3. Now in your mapper class you can read GeoLite2-City.mmdb using the normal File API (a sketch of both steps appears below)
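
A hedged sketch of steps 2 and 3: only the job.addCacheFile() call with the hdfs://...#GeoLite2-City.mmdb URI comes from the text above; the class name and the surrounding job setup are assumptions.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class ApacheLogMRDriver {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "apache-log-geoip");
            job.setJarByClass(ApacheLogMRDriver.class);

            // Step 2: register the file uploaded to HDFS; the #fragment tells Hadoop to
            // create a symbolic link named GeoLite2-City.mmdb in each task's working directory
            job.addCacheFile(new URI("hdfs://localhost:9000/GeoLite2-City.mmdb#GeoLite2-City.mmdb"));

            // ... the usual mapper/reducer/input/output configuration goes here ...
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    // Step 3: inside the mapper's setup() method the symlinked file can be opened like
    // any local file, for example:
    //   File database = new File("GeoLite2-City.mmdb");
    //   DatabaseReader reader = new DatabaseReader.Builder(database).build();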
When you use the distributed cache, Hadoop first copies the file specified in the DistributedCache API to the machine executing the task. You can view it by looking at the MapReduce temp directory like this.

Killing a badly behaving MapReduce job

I was working on building this MapReduce program, and after submitting it I realized that I had made a mistake and it was taking a really long time to complete, so I decided to kill it. These are the steps that I followed. First I executed the mapred job -list command to get a list of the jobs that were in progress; the output of the list command gives you the job id. Then you can use the mapred job -kill job_1405432500430_0001 command to kill the job that you're interested in
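
In short, the two commands look like this:

    mapred job -list
    mapred job -kill job_1405432500430_0001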
You can confirm that the MapReduce job was actually killed by using the web console like this

Configure LogStash to read Apache HTTP Server logs and add GeoIP information to them

LogStash is a tool that you can use for managing your logs. The basic idea is that you configure logstash to read the log file, it enhances the log records, and then it writes those records to ElasticSearch. Then you can use Kibana to view your log files. I wanted to figure out where my web traffic is coming from, so I configured the LogStash server to read the HTTP server log, then used its geoip capability to find the location of each request based on its IP and store it in ElasticSearch. This is what my logstash configuration looks like (a sketch follows the startup command below); before starting this I downloaded the GeoCity database from MaxMind and configured LogStash to use it. Next I started the ElasticSearch server on my local machine to collect logs and used the following command to start the logstash server

java -jar logstash-1.3.2-flatjar.jar agent -f httpaccess.conf
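
The original httpaccess.conf listing did not come through; a rough sketch of a configuration along the lines described above might look like this (the log path, type name, database path and elasticsearch host are assumptions):

    input {
      file {
        path => "/var/log/httpd/access_log"           # assumed path to the Apache access log
        type => "apache-access"
      }
    }
    filter {
      grok {
        match => ["message", "%{COMBINEDAPACHELOG}"]  # parse the combined log format
      }
      geoip {
        source => "clientip"                          # field produced by the grok pattern
        database => "/opt/logstash/GeoLiteCity.dat"   # assumed path to the MaxMind GeoCity database
      }
    }
    output {
      elasticsearch { host => "localhost" }
    }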
Once the logstash server was started I could see how it was parsing logs and posting them to ElasticSearch. For example, for the following log statement

129.143.71.36 - - [31/Aug/2011:08:35:17 -0700] "GET /favicon.ico HTTP/1.1" 200 3935 "-" "Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.10 (KHTML, like Gecko) Chrome/8.0.552.224 Safari/534.10"
I could see logstash converting it into the following JSON before posting it to ElasticSearch

Using third party jars and files in your MapReduce application (Distributed cache)

If you want to use a third party jar in your MapReduce program you have two options: one is to create a single jar with all dependencies, and the other is to use the Hadoop distributed cache option. I wanted to play around with both these options, so I built this simple application in which I read the standard Apache HTTP Server log and parse it to read the IP address the request is coming from. Then I use the IP address and invoke a GeoIP lookup to find out what city and country the request came from. Once I have a city, I use it to count the number of requests that came from that particular city. You can download the source code for this project from here. This is how the mapper class for my map-reduce application looks; in the setup() method of my class I am creating an object of com.maxmind.geoip2.DatabaseReader. I am passing GeoLite2-City.mmdb, the GeoIP database file, to my reader. In order for this program to work, my MapReduce program needs access to com.maxmind.geoip2.geoip2 along with its dependencies, and it also needs access to GeoLite2-City.mmdb.

Creating a single jar with all dependencies

In order for this method to work, add the following plugin in your Maven build file like this (a hedged sketch follows the command below); in this case the jar will get created with the main class set to com.spnotes.hadoop.logs.ApacheLogMRDriver. Now you can build a single jar file by executing the mvn clean compile assembly:single command. Copy GeoLite2-City.mmdb to the Hadoop cluster (you can package it inside the jar, but that might not be a good idea). Now you can execute the following command to execute this job on the cluster

hadoop jar ApacheLogsMR-jar-with-dependencies.jar -files GeoLite2-City.mmdb  /apache.log /output/logs/apachelogs
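
For reference, a hedged sketch of the maven-assembly-plugin configuration described above; the mainClass comes from the text, the rest is the plugin's standard jar-with-dependencies setup:

    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <archive>
          <manifest>
            <mainClass>com.spnotes.hadoop.logs.ApacheLogMRDriver</mainClass>
          </manifest>
        </archive>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
    </plugin>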

Using Hadoop distributed cache

The second option you have is to create a jar only for your map reduce program and pass the dependencies to Hadoop using the distributed cache. In this option, you should make sure that your driver class extends Configured like this (a sketch follows the command below). Copy the dependency jars as well as the GeoLite2-City.mmdb file onto the same machine that has the Hadoop cluster. Then execute the following command to pass both the jars and the file to Hadoop.

hadoop jar ApacheLogsMR.jar -libjars geoip2-0.7.1.jar,maxmind-db-0.3.2.jar,jackson-databind-2.2.3.jar,jackson-core-2.2.3.jar,jackson-annotations-2.2.3.jar  -files GeoLite2-City.mmdb  /apache.log /output/logs/apachelogs
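
A hedged sketch of a driver set up so that the -libjars and -files options are honored; only "extends Configured" comes from the text above, while the Tool/ToolRunner boilerplate and class details are assumptions:

    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class ApacheLogMRDriver extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
            // getConf() already has the -libjars/-files generic options applied by ToolRunner
            Job job = Job.getInstance(getConf(), "apache-log-geoip");
            job.setJarByClass(ApacheLogMRDriver.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // ... set mapper, reducer and key/value classes here ...
            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new ApacheLogMRDriver(), args));
        }
    }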

How reading and writing of files in HDFS works

Read Path
  1. The client program starts with the Hadoop library jar and a copy of the cluster configuration data, which specifies the location of the name node.
  2. The client begins by contacting the name node, indicating the file it wants to read.
  3. The name node validates the client's identity, either by simply trusting the client or by using an authentication protocol such as Kerberos.
  4. The client's identity is verified against the owner and permissions of the file.
  5. The name node responds to the client with the first block ID and the list of data nodes on which a copy of the block can be found, sorted by their distance to the client. Distance to the client is measured according to Hadoop's rack topology.
  6. With the block IDs and data node hostnames, the client can now contact the most appropriate data node directly and read the block data it needs. This process repeats until all the blocks in the file have been read or the client closes the file stream.
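
A minimal sketch of what the read path looks like from the client side; the FileSystem API hides the name node and data node interaction described above (the path is an assumption):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsReadExample {
        public static void main(String[] args) throws Exception {
            // picks up core-site.xml/hdfs-site.xml, which point at the name node
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            try (FSDataInputStream in = fs.open(new Path("/user/cloudera/sorttext.txt"));
                 BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line); // blocks are fetched from data nodes as we read
                }
            }
        }
    }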
Write Path
  1. The client makes a request to open a file for writing using the Hadoop FileSystem APIs.
  2. A request is sent to the name node to create the file metadata, if the user has the necessary permission to do so. However, the file initially has no associated blocks.
  3. The name node responds to the client indicating that the request was successful and that it should start writing data.
  4. The client library sends a request to the name node asking for the set of data nodes to which the data should be written, and it gets a list back from the name node.
  5. The client makes a connection to the first data node, which in turn makes a connection to the second, and the second data node makes a connection to the third.
  6. The client starts writing data to the first data node; the first data node writes the data to disk as well as to the stream pointing to the second data node. The second data node writes the data to disk and writes it to the connection pointing to the third data node, and so on.
  7. Once the client is finished writing, it closes the stream, which flushes the remaining data and writes it to disk.
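
And the corresponding client-side write, again a minimal sketch; the library sets up the data node pipeline described above (the path is an assumption):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsWriteExample {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            try (FSDataOutputStream out = fs.create(new Path("/user/cloudera/hello.txt"))) {
                out.writeBytes("hello hdfs\n"); // data flows through the data node pipeline
            } // close() flushes the remaining data and completes the file at the name node
        }
    }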