Creating Oozie workflow for mapreduce job that uses distributed cache
In the Using third party jars and files in your MapReduce application (Distributed cache) entry I blogged about how to create a MapReduce job that uses the distributed cache for both the required jar files and the data files it needs. I wanted to figure out how to automate this MapReduce job using Apache Oozie, so I followed these steps
- First I created an apachelog directory, and in it I created a job.properties file like this
- Then I created a workflow.xml file that looks like this (hedged sketches of the job.properties and workflow.xml appear after these steps). One thing to notice is the <file>GeoLite.mmdb#GeoLite2-City.mmdb</file> element: I have the file GeoLite.mmdb on disk, but I want to refer to it as GeoLite2-City.mmdb in my program, so the file element takes care of creating the symlink.
- Then I copied all the required jar files into the lib folder; this is how my directory structure looks
- I used the following command to copy the apachelog directory, which has everything my Oozie job needs, to HDFS
hdfs dfs -put apachelog apachelog
- The last step is to invoke the Oozie job by executing the following command
oozie job -oozie http://localhost:11000/oozie -config job.properties -run
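For reference, here is a minimal sketch of what this job.properties and workflow.xml might look like. These are not the actual files from my project: the host names, action name, and application path are placeholder assumptions for a single-node setup, but the <file> element shows the GeoLite.mmdb#GeoLite2-City.mmdb symlink trick described above.
nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
oozie.wf.application.path=${nameNode}/user/${user.name}/apachelog
<workflow-app xmlns="uri:oozie:workflow:0.2" name="apachelog-wf">
    <start to="apachelog-node"/>
    <action name="apachelog-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <!-- mapper/reducer classes and mapred.input.dir/mapred.output.dir properties
                     go here, exactly as for any other map-reduce action -->
            </configuration>
            <!-- GeoLite.mmdb sits in the workflow directory on HDFS; the fragment after #
                 becomes the symlink name that the mapper opens -->
            <file>GeoLite.mmdb#GeoLite2-City.mmdb</file>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>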
Using Apache Oozie for automating streaming map-reduce job
In the WordCount MapReduce program using Hadoop streaming and python entry I talked about how to create a streaming map-reduce job using Python. I wanted to figure out how to automate that program using an Oozie workflow, so I followed these steps
- The first step was to create a folder called streaming on my local machine and copy mapper.py and reducer.py into the streaming folder; I also created placeholders for job.properties and workflow.xml
- Next I created a job.properties file like this (a hedged sketch appears after these steps)
Now this job.properties is quite similar to the job.properties for the Java MapReduce job; the only difference is that you must set oozie.use.system.libpath=true. By default the streaming-related jars are not included in the classpath, so unless you set that value to true you will get the following error
2014-07-23 06:15:13,170 WARN org.apache.hadoop.mapred.Child: Error running child
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.streaming.PipeMapRunner not found
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1649)
    at org.apache.hadoop.mapred.JobConf.getMapRunnerClass(JobConf.java:1010)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:413)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.hadoop.mapred.Child.main(Child.java:262)
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.streaming.PipeMapRunner not found
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1617)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1641)
    ... 8 more
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.streaming.PipeMapRunner not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1523)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1615)
    ... 9 more
2014-07-23 06:15:13,175 INFO org.apache.hadoop.mapred.Task: Runnning cleanup for the task
- The next step in the process is to create the workflow.xml file like this (a hedged sketch appears after these steps). Make sure to add the <file>mapper.py#mapper.py</file> element (and a matching one for reducer.py) in the workflow.xml; it takes care of shipping mapper.py and reducer.py with the job and creating symbolic links to these two files.
- Upload the streaming folder with all your changes to HDFS by executing the following command
hdfs dfs -put streaming streaming
- You can trigger the Oozie workflow by executing the following command
oozie job -oozie http://localhost:11000/oozie -config streaming/job.properties -run
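For reference, here are hedged sketches of the streaming job.properties and workflow.xml described in the steps above. The host names, HDFS paths, and input/output directories are placeholders for a single-node setup, not necessarily the exact values I used.
nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/streaming
<workflow-app xmlns="uri:oozie:workflow:0.2" name="streaming-wordcount-wf">
    <start to="streaming-node"/>
    <action name="streaming-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <streaming>
                <mapper>python mapper.py</mapper>
                <reducer>python reducer.py</reducer>
            </streaming>
            <configuration>
                <!-- placeholder input/output locations -->
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/cloudera/streaming/input</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/cloudera/output/streaming</value>
                </property>
            </configuration>
            <!-- ship the scripts with the job and create symlinks named mapper.py and reducer.py -->
            <file>mapper.py#mapper.py</file>
            <file>reducer.py#reducer.py</file>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Streaming job failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>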
Using Apache Oozie to execute MapReduce jobs
I wanted to learn how to automate a MapReduce job using Oozie, so I decided to create an Oozie workflow to invoke the WordCount (HelloWorld) MapReduce program. I had to follow these steps
- The first thing I did was download the WordCount program source code by executing
git clone https://github.com/sdpatil/HadoopWordCount3
This program has a Maven script for building an executable jar, so I used the
mvn clean package
command to build the Hadoop jar.
- After that I tried executing the program manually by using the following command
hadoop jar target/HadoopWordCount.jar sorttest.txt output/wordcount
- Now, in order to use an Oozie workflow, you will have to create a particular folder structure on your machine
wordcount
  job.properties
  workflow.xml
  lib
    HadoopWordCount.jar
- In the wordcount folder, create a job.properties file like this (a hedged sketch appears after these steps). This file lets you pass parameters to your Oozie workflow. The values of nameNode and jobTracker represent the name node and job tracker locations; in my case I am using the Cloudera VM with a single node, so both of these properties point to localhost. The value of oozie.wf.application.path is the HDFS path to which you upload the wordcount folder created in the previous step.
- Next, define your Apache Oozie workflow.xml file like this (a hedged sketch appears after these steps). In my case the workflow has a single step, which is to execute the MapReduce job. I am setting the following properties:
- mapred.mapper.new-api & mapred.reducer.new-api: Set these properties to true if you are using the new MapReduce API based on the org.apache.hadoop.mapreduce.* classes
- mapreduce.map.class: The fully qualified name of your mapper class
- mapreduce.reduce.class: The fully qualified name of your reducer class
- mapred.output.key.class: The fully qualified name of the output key class. This is the same as the parameter to job.setOutputKeyClass() in your driver class
- mapred.output.value.class: The fully qualified name of the output value class. This is the same as the parameter to job.setOutputValueClass() in your driver class
- mapred.input.dir: The location of your input file; in my case I have sorttext.txt in the hdfs://localhost/user/cloudera directory
- mapred.output.dir: The location of the output that will get generated; in my case I want the output to go to the hdfs://localhost/user/cloudera/output/wordcount directory
- Once your Oozie workflow is ready, upload the wordcount folder to HDFS by executing the following command
hdfs dfs -put oozie wordcount
- Now run your Oozie workflow by executing the following command from your wordcount directory
oozie job -oozie http://localhost:11000/oozie -config job.properties -run
- If it runs successfully you should see output generated in the hdfs://localhost/user/cloudera/output/wordcount directory
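Here are hedged sketches of the job.properties and workflow.xml described above. The mapper and reducer class names are placeholders (use the actual classes from HadoopWordCount.jar); the other values follow the single-node Cloudera VM setup mentioned in the steps.
nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
oozie.wf.application.path=${nameNode}/user/cloudera/wordcount
<workflow-app xmlns="uri:oozie:workflow:0.2" name="wordcount-wf">
    <start to="wordcount-node"/>
    <action name="wordcount-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <!-- placeholder class names; substitute the mapper/reducer from HadoopWordCount.jar -->
                <property>
                    <name>mapreduce.map.class</name>
                    <value>com.example.WordCountMapper</value>
                </property>
                <property>
                    <name>mapreduce.reduce.class</name>
                    <value>com.example.WordCountReducer</value>
                </property>
                <property>
                    <name>mapred.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapred.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/cloudera/sorttext.txt</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/cloudera/output/wordcount</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>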
Enabling Oozie console on Cloudera VM 4.4.0 and executing examples
I am trying to learn about Apache Oozie, so I wanted to figure out how to use it in the Cloudera 4.4.0 VM. When you go to the Oozie web console it shows a message saying that the console is disabled. In order to enable the console I had to follow these steps
- Go to Cloudera Manager; in it I went to the Oozie configuration screen and checked the Enable Oozie Server Web Console option, like this. As you can see in the description, it says to install ExtJS 2.2 in /usr/lib/oozie/libext
- Next I went to the /usr/lib/oozie/libext directory and executed the following command to download ext-2.2.zip. Since I am using CDH 4.4 I had to execute
wget 'http://extjs.com/deploy/ext-2.2.zip'
and then
unzip ext-2.2.zip
to unzip the ext-2.2.zip
- The last step was to restart the Oozie service, and now I could see the Oozie web console
- The first thing for me was to find the oozie-examples.tar.gz file on my VM
find / -name oozie-examples.tar.gz
I found it under the /usr/share/doc/oozie-3.3.2+92/ directory, so I untarred it using
tar xvf oozie-examples.tar.gz
- Then I had to make a change in job.properties, changing the values of nameNode and jobTracker from localhost to localhost.localdomain to get rid of the
Error: E0901 : E0901: Namenode [localhost:8020] not allowed, not in Oozies whitelist
error. The resulting job.properties looks like this
nameNode=hdfs://localhost.localdomain:8020
jobTracker=localhost.localdomain:8021
queueName=default
examplesRoot=examples
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/map-reduce
outputDir=map-reduce
- After making the changes in job.properties I uploaded the examples folder to HDFS using the following command
hdfs dfs -put examples examples
- The last step in the process was to actually run the MapReduce job in Oozie by executing the following command
oozie job -oozie http://localhost:11000/oozie -config examples/apps/map-reduce/job.properties -run
- Once the job was started I could see its progress using the Oozie web console like this
Where are MapReduce logs when you're using the YARN framework
For the last couple of months I have been using the YARN framework for running my MapReduce jobs. Normally using YARN is transparent, so I did not have to do anything different other than changing my mapred-site.xml file to set the value of mapreduce.framework.name to yarn, like this.
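The relevant mapred-site.xml entry is just this one property; a minimal sketch of it looks like this (the rest of the file is unchanged):
<property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
</property>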
But YARN affects how the logs and job history get stored. For example, if you are using the traditional MapReduce framework you can go to http://localhost:50030 to look at the job and task history and also access the logs generated by the MapReduce framework. In the case of YARN you will have to go to http://localhost:8088/cluster, which takes you to the Resource Manager home page like this; there you should see the list of applications, and you can click on the name of an application to get more details.
When you try to look at the logs for an application, it takes you to the NodeManager home page like this
Since I am working on a single-node cluster, I like to go to the Hadoop log directory; there, under the userlogs directory, I can see a log folder for each application. Each application folder is subdivided into container folders, one for each mapper task, one for each reducer task, and one for the driver, and each container folder has one file each for stdout, stderr, and syslog that contain more output. If you have any System.out.println() calls in your mapper or reducer class, find the appropriate container folder; the stdout file in that container should have the output that you generated using System.out.println()
Using counters in MapReduce program
While developing MapReduce jobs you might want to keep counters for some conditions that you find. For example, in the MapReduce job that uses GeoIP to count the number of requests from a particular city, I wanted to check how many requests came from the US and India versus other countries. Also, there are cases when you try to find the location of an IP address and, if the IP is not in the GeoIP database, it throws an error; I wanted to see how many IPs are not found in the database.
In order to do that I changed ApacheLogMapper.java like this
I had to make the following changes in ApacheLogMapper.java (a hedged sketch appears after this list)
- Declare the enum: On line 24 I had to create an enum called GEO, declaring 4 different counters in it: one for errors, one for IPs in the US, one for IPs in India, and one for everything else
- Count countries: On line 67 I check if the location of the IP is in the USA; if yes, I increase the counter for the US by 1 using context.getCounter(GEO.US).increment(1)
- Count errors: Whenever the GeoIP API is not able to find the IP in the GeoIP database it throws a GeoIp2Exception; I catch that exception and use it as an opportunity to increment the ERROR count by 1 using context.getCounter(GEO.ERROR).increment(1);
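The actual ApacheLogMapper.java is not reproduced here, but the sketch below illustrates the counter pattern the three bullets describe. The GEO enum and the counter calls come from the post; the log parsing, the class name, and the GeoIP reader setup around them are simplified assumptions, and the GeoIP2 calls assume a DatabaseReader API along the lines of recent MaxMind library versions.
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.GeoIp2Exception;
import com.maxmind.geoip2.model.CityResponse;

public class ApacheLogMapperSketch extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Enum-backed counters: each constant shows up as a separate counter in the job output
    public enum GEO { ERROR, US, INDIA, OTHER }

    private static final IntWritable ONE = new IntWritable(1);
    private DatabaseReader reader;

    @Override
    protected void setup(Context context) throws IOException {
        // GeoLite2-City.mmdb is expected next to the task, e.g. via a distributed cache symlink
        reader = new DatabaseReader.Builder(new File("GeoLite2-City.mmdb")).build();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String ip = value.toString().split(" ")[0]; // first field of a combined log line
        try {
            CityResponse response = reader.city(InetAddress.getByName(ip));
            String country = response.getCountry().getIsoCode();
            if ("US".equals(country)) {
                context.getCounter(GEO.US).increment(1);
            } else if ("IN".equals(country)) {
                context.getCounter(GEO.INDIA).increment(1);
            } else {
                context.getCounter(GEO.OTHER).increment(1);
            }
            context.write(new Text(response.getCity().getName()), ONE);
        } catch (GeoIp2Exception e) {
            // IP not found in the GeoIP database: count it instead of failing the task
            context.getCounter(GEO.ERROR).increment(1);
        }
    }
}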
Using DistributedCache with MapReduce job
In the Using third party jars and files in your MapReduce application (Distributed cache) entry I blogged about how to use the distributed cache in Hadoop using the command line option. But you also have the option of using the DistributedCache API. You will have to use the following steps to use the distributed cache programmatically. In order to use it, first change your MapReduce driver class to add a job.addCacheFile() call
- In order to use a file with the DistributedCache API, it has to be available on an hdfs:// or http:// URL that is accessible to all the cluster members. So the first step was to upload the file you are interested in into HDFS; in my case I used the following command to copy the GeoLite2-City.mmdb file to HDFS
hdfs dfs -copyFromLocal GeoLite2-City.mmdb /GeoLite2-City.mmdb
- The next step is to change the driver class and add a
job.addCacheFile(new URI("hdfs://localhost:9000/GeoLite2-City.mmdb#GeoLite2-City.mmdb"));
call. This call takes the HDFS URL of the file that you just uploaded and passes it to the distributed cache. The #GeoLite2-City.mmdb fragment is used to tell Hadoop that it should create a symbolic link to this file under that name
- Now in your mapper class you can read GeoLite2-City.mmdb using the normal File API (see the sketch below)
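As a hedged illustration of these steps (not the actual project code), the driver sketch below shows where the addCacheFile() call goes; the class and job names are placeholders.
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CacheFileDriverSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "apache-log-geo");
        // ... set mapper/reducer/input/output as usual ...

        // The #GeoLite2-City.mmdb fragment makes Hadoop create a symlink with that name
        // in each task's working directory.
        job.addCacheFile(new URI("hdfs://localhost:9000/GeoLite2-City.mmdb#GeoLite2-City.mmdb"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
// In the mapper's setup() the file can then be opened with the normal File API, e.g.
//     File database = new File("GeoLite2-City.mmdb");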
Killing a badly behaving MapReduce job
I was working on building a MapReduce program, and after submitting it I realized that I had made a mistake and it was taking a really long time to complete. So I decided to kill it. These are the steps that I followed
First I executed the
mapred job -list
command to get the list of jobs that were in progress. The output of the list command gives you the job id.
Then you can use the
mapred job -kill job_1405432500430_0001
command to kill the job that you are interested in.
You can confirm that the MapReduce job was actually killed by using the web console like this
Configure LogStash to read Apache HTTP Server logs and add GeoIP information to them
LogStash is a tool that you can use for managing your logs. The basic idea is that you configure LogStash to read a log file, it enhances the log records, and then it writes those records to ElasticSearch; you can then use Kibana to view your logs.
I wanted to figure out where my web traffic is coming from, so I configured the LogStash server to read the HTTP server log, then used its geoip capability to find the location of each request based on its IP address and store it in ElasticSearch.
This is how my LogStash configuration looks (a hedged sketch appears below); before starting I downloaded the GeoCity database from MaxMind and configured LogStash to use it.
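The httpaccess.conf itself is not reproduced here, but a minimal sketch of such a configuration (LogStash 1.3.x config format, with placeholder file paths) might look like this:
input {
  file {
    # placeholder path to the Apache HTTP Server access log
    path => "/var/log/httpd/access_log"
  }
}
filter {
  grok {
    # parse the combined Apache log format into fields such as clientip, response, agent
    match => [ "message", "%{COMBINEDAPACHELOG}" ]
  }
  geoip {
    # look up the client ip in the MaxMind GeoCity database downloaded earlier
    source => "clientip"
    database => "/path/to/GeoLiteCity.dat"
  }
}
output {
  elasticsearch {
    host => "localhost"
  }
}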
Next I started the ElasticSearch server on my local machine to collect the logs and used the following command to start the LogStash server
java -jar logstash-1.3.2-flatjar.jar agent -f httpaccess.conf
Once the LogStash server was started, I could see how it was parsing the logs and posting them to ElasticSearch. For example, for the following log statement
129.143.71.36 - - [31/Aug/2011:08:35:17 -0700] "GET /favicon.ico HTTP/1.1" 200 3935 "-" "Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.10 (KHTML, like Gecko) Chrome/8.0.552.224 Safari/534.10"
I could see LogStash converting it into the following JSON before posting it to ElasticSearch
Using third party jars and files in your MapReduce application (Distributed cache)
If you want to use a third party jar in your MapReduce program you have two options: one is to create a single jar with all dependencies, and the other is to use the Hadoop distributed cache option.
I wanted to play around with both of these options, so I built this simple application in which I read a standard Apache HTTP Server log and parse it to get the IP address each request comes from. Then I use the IP address to invoke a GeoIP lookup to find out what city and country the request came from. Once I have a city, I use it to count the number of requests that came from that particular city. You can download the source code for this project from here. This is how the mapper class for my map-reduce application looks: in the setup() method of my class I create an object of com.maxmind.geoip2.DatabaseReader, passing it GeoLite2-City.mmdb, the GeoIP database file.
In order for this program to work, my MapReduce program needs access to com.maxmind.geoip2.geoip2 along with its dependencies, and it also needs access to GeoLite2-City.mmdb.
Creating single jar with all dependencies
In order for this method to work, add the following plugin to your Maven build file like this (a hedged sketch appears below); in this case the jar will get created with the main class set to com.spnotes.hadoop.logs.ApacheLogMRDriver
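This is a sketch of a typical maven-assembly-plugin configuration for this purpose, not necessarily the project's exact build section:
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <archive>
            <manifest>
                <!-- the driver class becomes the jar's Main-Class -->
                <mainClass>com.spnotes.hadoop.logs.ApacheLogMRDriver</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <!-- produces the *-jar-with-dependencies.jar used in the command below -->
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>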
Now you can build a single jar file by executing the
mvn clean compile assembly:single
command. Copy GeoLite2-City.mmdb to the Hadoop cluster (you could package it inside the jar, but that might not be a good idea). Now you can execute the following command to run this job on the cluster
hadoop jar ApacheLogsMR-jar-with-dependencies.jar -files GeoLite2-City.mmdb /apache.log /output/logs/apachelogs
Using Hadoop distributed cache
The second option is to create a jar containing only your map-reduce program and pass the dependencies to Hadoop using the distributed cache. With this option, you should make sure that your driver class extends Configured, like this (a hedged sketch appears below)
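This sketch uses placeholder class names and omits the mapper/reducer wiring; extending Configured and running through ToolRunner is what makes the generic -libjars and -files options work.
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ApacheLogMRDriverSketch extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // getConf() already contains whatever -libjars/-files put there
        Job job = Job.getInstance(getConf(), "apache-log-geo");
        job.setJarByClass(ApacheLogMRDriverSketch.class);
        // ... job.setMapperClass(...), job.setReducerClass(...), output key/value classes ...
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner parses the generic options (-libjars, -files) before calling run()
        System.exit(ToolRunner.run(new ApacheLogMRDriverSketch(), args));
    }
}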
Copy the dependency jars as well as the GeoLite2-City.mmdb file onto the same machine that hosts the Hadoop cluster. Then execute the following command to pass both the jars and the file to Hadoop.
hadoop jar ApacheLogsMR.jar -libjars geoip2-0.7.1.jar,maxmind-db-0.3.2.jar,jackson-databind-2.2.3.jar,jackson-core-2.2.3.jar,jackson-annotations-2.2.3.jar -files GeoLite2-City.mmdb /apache.log /output/logs/apachelogs
How reading and writing of files in HDFS works
Read Path
- The client program starts with the Hadoop library jar and a copy of the cluster configuration data, which specifies the location of the name node.
- The client begins by contacting the name node, indicating the file it wants to read.
- The name node validates the client's identity, either by simply trusting the client or by using an authentication protocol such as Kerberos.
- The client's identity is verified against the owner and permissions of the file.
- The name node responds to the client with the first block ID and the list of data nodes on which a copy of the block can be found, sorted by their distance to the client. Distance to the client is measured according to Hadoop's rack topology.
- With the block IDs and datanode hostnames, the client can now contact the most appropriate datanode directly and read the block data it needs. This process repeats until all the blocks in the file have been read or the client closes the file stream.
Write Path
- The client makes a request to open a file for writing using the Hadoop FileSystem APIs.
- A request is sent to the name node to create the file metadata, if the user has the necessary permission to do so. The new file initially has no associated blocks.
- The name node responds to the client indicating that the request was successful and that it should start writing data.
- The client library sends a request to the name node asking for the set of datanodes to which the data should be written, and it gets a list back from the name node.
- The client makes a connection to the first data node, which in turn makes a connection to the second, and the second datanode makes a connection to the third.
- The client starts writing data to the first data node; the first data node writes the data to disk as well as to the stream pointing to the second data node. The second data node writes the data to disk and writes it to the connection pointing to the third data node, and so on.
- Once the client is finished writing, it closes the stream, which flushes the remaining data and writes it to disk.
HDFS Java Client
Hadoop provides a Java API that you can use to perform some of the commonly used file operations, such as reading a file, creating a new file, appending to the end of an existing file, or searching for files. I wanted to try these common operations out, so I built this HelloHDFS project, which you can download from here.
This is the main class; it takes command line arguments for the operation name and the file path and performs the operation (a hedged sketch appears below).
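The real HelloHDFS main class is in the linked project; the sketch below (showing only the read operation, with an assumed argument order) illustrates the core FileSystem calls it is built around.
import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HelloHDFSSketch {
    public static void main(String[] args) throws Exception {
        String operation = args[0];        // e.g. "read"
        Path path = new Path(args[1]);     // relative or fully qualified hdfs:// path

        Configuration conf = new Configuration();
        // FileSystem.get() picks the right filesystem implementation for the path (HDFS here)
        FileSystem fs = FileSystem.get(path.toUri(), conf);

        if ("read".equals(operation)) {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            }
        }
    }
}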
After downloading the source code for the project you can use the following Maven command to build a single jar that also contains all the dependencies.
mvn clean compile assembly:single
Once your jar is ready you can use it like this for reading a file from HDFS with a relative path
java -jar target/HelloHDFS-jar-with-dependencies.jar read aesop.txt
or with a fully qualified path
java -jar target/HelloHDFS-jar-with-dependencies.jar read hdfs://localhost/user/user/aesop.txt
HDFS Daemons
An HDFS cluster has two types of nodes operating in a master-worker pattern
- NameNode: Manages the filesystem's directory structure and metadata for all the files. This information is persisted on local disk in the form of 2 files
- fsimage: This is the master copy of the metadata for the file system.
- edits: This file stores the changes (delta/modifications) made to the metadata. In newer versions of Hadoop (I am using 2.4) there are multiple edits files (segmented by transaction IDs) that get created, which store the changes made to the metadata.
- Secondary NameNode: The job of the secondary namenode is to merge the copy of the fsimage and edits files for the primary NameNode. The basic issue is that it is very CPU intensive to take the fsimage and apply all the edits to it, so that work is delegated to the secondary namenode. The secondary namenode downloads the edits file from the primary, applies/merges it with the fsimage, and then sends the result back to the primary.
- DataNode: This is the workhorse daemon that is responsible for storing and retrieving blocks of data. This daemon is also responsible for maintaining the block report (the list of blocks that are stored on that datanode). It sends a heartbeat to the NameNode at regular intervals and also sends the block report periodically (about once an hour).
Each of these daemons can be started individually with start <daemontype>, e.g. start namenode, or you can start all of them using start-dfs
How to control replication for a file stored in HDFS
In Hadoop you can control how many replicas of a file get created. I wanted to try that out, so I tried the different options.
The first option is to set the replication factor in hdfs-site.xml; the settings in hdfs-site.xml apply to all files (global settings)
- dfs.replication: Default block replication. The actual number of replications can be specified when the file is created; the default is used if replication is not specified at create time.
- dfs.replication.max: Maximal block replication.
- dfs.namenode.replication.min: Minimal block replication.
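For example, a hedged hdfs-site.xml snippet that sets the default replication factor to 3 would look like this:
<property>
    <name>dfs.replication</name>
    <value>3</value>
</property>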
The second option is to change the replication factor of a file that is already stored in HDFS by using the hdfs dfs -setrep command like this.
hdfs dfs -setrep 5 /user/user/aesop.txt
Then I could verify the effect of the replication setting like this
The last option is to set the replication factor programmatically
Setting replication factor for a file programmatically
I wanted to figure out how to set/change the replication factor for a file stored in HDFS, so I built this sample program, which has 3 methods (a hedged sketch of the setReplication() part appears after this list)
- read(): This method takes a file path as input and prints its content to the system console
- write(): This method takes a file path as input and opens it for writing. It takes user input and writes it to the HDFS file
- setReplication(): This method takes a file path and a replication factor as input and sets the replication factor for the given file path
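This is not the project's actual setReplication() method, just a minimal sketch of how it might be done with the FileSystem API:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SetReplicationSketch {
    public static void setReplication(String file, short replicationFactor) throws Exception {
        Path path = new Path(file);
        FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
        // Changes the replication factor for an existing file; the namenode schedules
        // the extra copies (or removals) in the background.
        boolean scheduled = fs.setReplication(path, replicationFactor);
        System.out.println("Replication change scheduled: " + scheduled);
    }
}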
java -jar target/HelloHDFS-jar-with-dependencies.jar read <hdfsfile_path>
For example, in my case I have aesop.txt at hdfs://localhost/user/user/aesop.txt, so I can print it by using the following command
java -jar target/HelloHDFS-jar-with-dependencies.jar read aesop.txt