WordCount program built using Apache Spark in Java

In the WordCount program written using the Spark framework in Python language entry I talked about how to create a WordCount program using Apache Spark in Python. Now I wanted to try building the same program in Java, so I followed these steps. You can download the full code from here. This is how the Java class looks; it has 4 anonymous classes:
  1. file.flatMap(new FlatMapFunction() {}): This class gets called for every line in the input file; it splits the input line and returns the list of words in that line
  2. words.filter(new Function() {}): This class gets the list of words and checks whether each word is an actual word or just blank space. My input file has some blank spaces, so I used this filter (part of the reason was to try out filters) to remove the words that are just blank spaces
  3. words.mapToPair(new PairFunction() {}): This class takes each word as input and converts it into a Tuple2, with the word as key and 1 as value
  4. wordToCountMap.reduceByKey(new Function2() {}): This class gets called with the running total and the current count as input, and the only thing it does is add the current count to the running total
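Stripped of the Spark API, the four anonymous classes above boil down to split → filter → pair → sum. A plain-Java sketch of that same pipeline (class name and sample input are mine, not from the post):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java version of the Spark pipeline: flatMap (split lines into words),
// filter (drop blank "words"), mapToPair (word -> 1), reduceByKey (sum the 1s).
public class WordCountPipeline {
    public static Map<String, Integer> countWords(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {      // flatMap: line -> words
                if (word.trim().isEmpty()) continue;   // filter: skip blank spaces
                // mapToPair + reduceByKey: add this word's 1 to the running total
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords(Arrays.asList("hello world", "hello  spark")));
    }
}
```

In Spark each step runs as a distributed transformation on an RDD, but the per-record logic each anonymous class implements is exactly what the loop body above does.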
This is how the Maven build script for my program looks. I had to fight through a couple of issues to get it to build. The first: by default, when you add the org.apache.spark.spark-core_2.10 dependency, it includes HDFS 1.0.4 jars, and in my case I have a Hadoop 2.4 server, so I got the following error. You get this error even if you include the HDFS-related jars yourself, because then you have both Hadoop 2.4 and Hadoop 1.0.4 jars on the classpath, so you have to ask Maven to exclude the Hadoop client pulled in by org.apache.spark.spark-core_2.10:

14/08/25 12:16:50 WARN snappy.LoadSnappy: Snappy native library not loaded
Exception in thread "main" org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4
 at org.apache.hadoop.ipc.Client.call(Client.java:1070)
 at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
 at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
 at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
 at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
 at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
 at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:238)
 at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:203)
 at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
 at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
 at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
 at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
 at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.FilteredRDD.getPartitions(FilteredRDD.scala:29)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:59)
 at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:385)
 at com.spnotes.spark.WordCount2.main(WordCount2.java:62)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
Once that part was done, I ran into an issue caused by 2 different versions of Jetty getting included: Exception in thread "main" java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package. So I had to add a Jetty-related jar exclusion filter to org.apache.hadoop.hadoop-common.
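For reference, the two exclusions look roughly like this in pom.xml. The version numbers and the Jetty group id here are illustrative, not taken from the post; run mvn dependency:tree to see the exact artifacts your build pulls in:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.0.2</version>
  <exclusions>
    <!-- drop the Hadoop 1.x client so only the Hadoop 2.4 jars remain -->
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.4.0</version>
  <exclusions>
    <!-- keep hadoop-common's Jetty out so only Spark's copy is on the classpath -->
    <exclusion>
      <groupId>org.mortbay.jetty</groupId>
      <artifactId>servlet-api</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```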

How to use ElasticSearch as input for MapReduce program

In the Saving complex object in elasticsearch as output of MapReduce program entry I talked about how to use ElasticSearch for storing the output of a MapReduce job. In that blog I was creating Contact records that look like this in ElasticSearch:

         "addressLine1":"1 Main Street",
I wanted to figure out how to use ElasticSearch as input for a MapReduce program, so I decided to create a MapReduce program that reads the contact index and counts how many players come from each city. You can download the sample program from here. This is how my MapReduce program looks. You can run the driver program with 2 arguments, e.g. hadoop/contact file:///home/user/output/: the first is the name of the ElasticSearch index/type and the second is the output directory where the output gets written. This program has 3 main components:
  1. MRInputDriver: In the driver program you have to set the es.nodes entry to the address of your ElasticSearch installation, and the value of es.resource to the name of the ElasticSearch index/type. Then I call job.setInputFormatClass(EsInputFormat.class);, which sets EsInputFormat as the input reader; it takes care of reading the records from ElasticSearch
  
  2. MRInputMapper: The mapper class uses Object as both the key and value input types. The ElasticSearch Hadoop framework reads each record from ElasticSearch and passes the id as the key (a Text); the value is an object of the MapWritable class that represents the record stored in ElasticSearch. Once I have the value, I read the address from it, and the mapper writes the city name as the key with a value of 1
  3. MRInputReducer: The reducer is pretty simple; it gets called with the name of a city as the key and an Iterable of values, very similar to the reducer in WordCount
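The mapper/reducer logic above can be sketched without Hadoop or ElasticSearch. Here a plain Map stands in for the MapWritable record that EsInputFormat hands the mapper (class and field names are mine, not from the post):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hadoop-free sketch of the MRInputMapper/MRInputReducer pair: the "mapper"
// reads the city out of each contact record and emits (city, 1); the
// "reducer" sums the 1s per city.
public class CityCount {
    public static Map<String, Integer> countByCity(List<Map<String, String>> contacts) {
        Map<String, Integer> totals = new HashMap<>();
        for (Map<String, String> contact : contacts) {
            String city = contact.get("city");    // mapper: extract city from the record
            totals.merge(city, 1, Integer::sum);  // reducer: sum values for each key
        }
        return totals;
    }
}
```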
After running the program I could see output like this:

Bangalore 2
Delhi 1
Mumbai 1
Ranchi 1

Saving complex object in elasticsearch as output of MapReduce program

In the Using ElasticSearch to store output of MapReduce program entry I built a sample MapReduce program that writes its output to ElasticSearch. I wanted to figure out how to write the output of a MapReduce job into ElasticSearch so that it creates a complex JSON structure with an embedded object and a date type. I decided to create this sample program, which reads a .csv file and generates contact records in ElasticSearch that look like this. You can download the sample code for this project from here. I followed these steps to create the program:
  1. The first thing I did was create the hadoop/contact index in ElasticSearch using the following mapping. You can create the index by making an HTTP POST call to http://localhost:9200/hadoop
  2. Once the index was created, I created a Contact.java object that looks like this. As you can see, this is a simple POJO that has a list of Address objects, which is another POJO. It also uses CustomDateSerializer as a custom date serializer
  3. The next step was to create the class that has both the Mapper and the MapReduce driver
    • ContactImportMapper: This is a mapper class that gets one line of contact.txt at a time, splits it on commas, and uses the values to create a Contact.java object. In the mapper, on line 67, I convert the Contact object into JSON using the Jackson parser and set the JSON as the value of a Text. Then on line 70, I write NullWritable as the key and the Contact JSON as the value, which gets inserted into ES
    • ContactImportDriver: This class is the same as any other MapReduce driver, with a few exceptions. On line 101 I call job.getConfiguration().set("es.input.json", "yes");, which tells the ElasticSearch output to treat the output as JSON. Then on line 109, I set the output format to EsOutputFormat so that the ElasticSearch Hadoop framework can take control of persisting the output: job.setOutputFormatClass(EsOutputFormat.class);
  4. Once the program was built, I passed 2 parameters to it: the first is the full path of the contact.txt file and the second is the ElasticSearch index and type in hadoop/contact format. This is how the content of the contact.txt file looks:
    Sachin,Tendulkar,24-Apr-1973,1 Main Street,Mumbai,India 
    Rahul,Dravid,01-Jan-1973,1 Main Street,Bangalore,India
    Anil,Kumble,17-Oct-1970,1 Main Street,Bangalore,India
    Virat,Kohali,05-Nov-1988,1 Main Street,Delhi,India
    Mahendra Singh,Dhoni,07-Jul-1981,1 Main Street,Ranchi,India
    Saurav,Ganguli,08-Jul-1982,1 Main Street,Kolkata,India
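The mapper's core transform (one CSV line in, one contact JSON document out) can be sketched like this. The hand-rolled string building stands in for the Jackson serialization the post actually uses, and the field names are assumed from the mapping shown earlier:

```java
// Sketch of ContactImportMapper's transform: a contact.txt line ->
// a JSON document with the address embedded as a nested object.
public class ContactJson {
    public static String toJson(String csvLine) {
        // expected layout: firstName,lastName,dateOfBirth,addressLine1,city,country
        String[] f = csvLine.split(",");
        return String.format(
            "{\"firstName\":\"%s\",\"lastName\":\"%s\",\"dateOfBirth\":\"%s\","
          + "\"address\":[{\"addressLine1\":\"%s\",\"city\":\"%s\",\"country\":\"%s\"}]}",
            f[0], f[1], f[2], f[3], f[4], f[5]);
    }

    public static void main(String[] args) {
        System.out.println(toJson("Sachin,Tendulkar,24-Apr-1973,1 Main Street,Mumbai,India"));
    }
}
```

With es.input.json set to "yes", the ElasticSearch Hadoop framework indexes a string like this as-is instead of serializing the Writable value itself.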
After executing the code, I ran the following query to find all the players with a date of birth between 1970 and 1974. This is how my sample output looks:

How to create custom Combiner class with your MapReduce framework

The MapReduce framework passes control to your combiner class at the end of the map phase, so that the combiner can combine/reduce the data generated by the mappers before it gets transferred to the reducers. Sending data from the mappers to the reducers requires the data to go over the network, so combining it first cuts down the transfer. I wanted to try creating a custom combiner class, and to keep things simple I decided to add a combiner class to the WordCount (HelloWorld) MapReduce program. Basically my combiner class does the same thing as the reducer, which is to take multiple [word, 1] tuples and combine them into something like [word1, 5], [word2, 6], etc. I followed these steps:
  1. The first thing I did was create a WordCountCombiner.java class that looks the same as WordCountReducer, but I added one System.out.println() to it so that I would know when my combiner is called instead of the reducer
  2. Then I changed the driver class of my MapReduce program to add the line job.setCombinerClass(WordCountCombiner.class);
Then I executed the WordCountDriver class with 3 files as input, and I could see my combiner class getting called after the mapper for each input file, before the mapper output was written to disk and before the reducer phase started.
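The combiner's logic can be sketched without Hadoop like this. Class and method names are mine; in the real WordCountCombiner the summed count is written back through the Context rather than returned:

```java
import java.util.List;

// Sketch of the combiner: same sum-the-counts logic as the reducer, plus the
// println marker from step 1 that shows when the combiner (not the reducer) ran.
public class WordCountCombine {
    public static int combine(String word, List<Integer> counts) {
        System.out.println("combiner called for: " + word);
        int sum = 0;
        for (int c : counts) sum += c;  // fold [word,1],[word,1],... into [word,n]
        return sum;
    }
}
```

Because the combiner runs per map task, each reducer then receives one pre-summed count per word per mapper instead of a long run of 1s.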

Creating custom Partitioner class for your mapreduce program

The MapReduce framework uses an instance of the org.apache.hadoop.mapreduce.Partitioner class to figure out which map output key goes to which reducer. By default it uses org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; this class calculates a hash value for the key, divides it by the number of reducers, and uses the remainder to pick the reducer the key goes to. This implementation is pretty good, and as long as the keys generate hash codes with a uniform distribution it should serve you well. But in some exceptional cases you might want to take control of how the mapper output gets distributed to the reducers. I just wanted to figure out how this works, so I decided to change my WordCount (HelloWorld) MapReduce program to add a custom Partitioner that sends upper- and lower-case words to 2 different reducers. I followed these steps:
  1. First I created a WordCountPartitioner.java class like this. The first thing I do is check whether there are 2 reducers; if yes, I use the first letter of the key to figure out whether it starts with a lower-case letter (simply by checking it against 'a'), and if so I send it to the first reducer, otherwise to the second
  2. I had to make a few changes in the driver program to use my WordCountPartitioner:
    • job.setNumReduceTasks(2): This call asks the MapReduce framework to use 2 reducers
    • job.setPartitionerClass(WordCountPartitioner.class);: This call sets my WordCountPartitioner as the partitioner class
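The partitioning rule described in step 1 can be sketched as a plain function (the real WordCountPartitioner extends org.apache.hadoop.mapreduce.Partitioner and overrides getPartition; this standalone version is mine):

```java
// Sketch of the custom partitioner's rule: with 2 reducers, keys starting
// with a lower-case letter go to reducer 0, everything else to reducer 1.
public class CasePartition {
    public static int getPartition(String key, int numReduceTasks) {
        if (numReduceTasks < 2) return 0;  // only one reducer: nothing to split
        char first = key.charAt(0);
        return (first >= 'a' && first <= 'z') ? 0 : 1;
    }
}
```

Whatever rule you use, getPartition must always return a value in [0, numReduceTasks), or the job fails at shuffle time.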
This screenshot shows how my sample.txt got divided into 2 reducer outputs. The first 2 lines show the output with the default HashPartitioner, and the next 2 lines show the output when I used my custom Partitioner.

Python client for publishing and consuming message from Apache Kafka

In the Java Client for publishing and consuming messages from Apache Kafka entry I talked about how to create a Java client for publishing and consuming messages from Kafka. I wanted to try the same thing using Python, so I followed these steps:
  1. Follow steps 1 through 4 of the Java Client for publishing and consuming messages from Apache Kafka entry to start the Zookeeper and Kafka servers
  2. Follow these steps to install Kafka python client
    git clone https://github.com/mumrah/kafka-python
    pip install ./kafka-python
  3. Next, create a Producer.py script to publish messages to the pythontest topic. The basic concept is that you connect to the Kafka server on port localhost:9092 and then publish a message to a particular topic
  4. Now create a Consumer.py script that consumes messages from the pythontest topic and writes them to the console

Java Client for publishing and consuming messages from Apache Kafka

I wanted to learn how to publish and consume messages from Apache Kafka using a Java client, so I followed these steps.
  1. Download the Kafka binaries from Kafka download page
  2. Unzip the Kafka tar file by executing tar -xzf kafka_2.9.2-, then go to the Kafka directory by executing cd kafka_2.9.2-
  3. Next, start the Zookeeper server by executing the following command
    bin/zookeeper-server-start.sh config/zookeeper.properties
  4. Start the Kafka server by executing the following command
    bin/kafka-server-start.sh config/server.properties
  5. Now your Zookeeper and Kafka servers are ready, and you can download the source code for the sample project from here
  6. This is how a Java client that publishes messages to Kafka looks; execute it a couple of times to publish a couple of messages. The first thing you have to do while developing a producer is connect to the Kafka server; for that you set the value of the metadata.broker.list property to point to the host and port on which the Kafka server is listening (you can find the port and host name in the server.properties file that you used in step 4). Once you have a Producer object, you can use it to publish messages by creating objects of kafka.producer.KeyedMessage, passing the name of the topic and the message as arguments
  7. This is how the Java client for consuming messages from Kafka looks. Run it and it will start a thread that keeps listening to the topic and prints every message to the console. The HelloKafkaConsumer class extends Thread. In its constructor I first create a Properties object with the value of the zookeeper.connect property equal to the host and port on which the ZooKeeper server is listening, and then create an object of kafka.javaapi.consumer.ConsumerConnector
    Once the ConsumerConnector is ready, in the run() method I pass it the name of the topic I want to listen on (you can pass multiple topic names here). Every time there is a new message, I read it and print it to the console.
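The two connection settings the producer and consumer need can be summed up in a couple of properties. These values match the defaults in the config files shipped with Kafka; adjust them for your setup, and note that group.id and the serializer class are my additions, commonly required by the 0.8-era high-level API used here:

```properties
# producer side: point metadata.broker.list at the broker from server.properties
metadata.broker.list=localhost:9092
serializer.class=kafka.serializer.StringEncoder

# consumer side: the high-level consumer connects through ZooKeeper instead
zookeeper.connect=localhost:2181
group.id=test-group
```

The asymmetry is worth noticing: the producer talks to the broker directly, while this generation of consumer discovers brokers and tracks offsets via ZooKeeper.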

How MapReduce job in Yarn framework works

In the Yarn MapReduce framework, these are the main players involved in running a MapReduce application:
  1. Client: The client submits the MapReduce job. It is responsible for copying the MapReduce jars, configuration files, and distributed-cache files (jars, archives, and files) into HDFS for distribution. It is also responsible for splitting the input into pieces and saving that information into HDFS for later use
  2. Application Master: Manages the lifecycle of an application running on the cluster. When the MapReduce framework starts an application, it creates the application master on one of the worker nodes, and the application master runs for the lifecycle of the application/job. The application master negotiates with the resource manager for cluster resources, described in terms of a number of containers, each with a certain memory limit. It is also responsible for tracking application progress, such as status and counters, across the application
  3. Resource Manager: Manages the use of compute resources (resources available for running map- and reduce-related tasks) across the cluster
  4. Node Manager: Node managers run the application-specific processes in containers. The node manager ensures that an application does not use more resources than it has been allocated

Creating Oozie workflow for mapreduce job that uses distributed cache

In the Using third party jars and files in your MapReduce application (Distributed cache) entry I blogged about how to create a MapReduce job that uses the distributed cache for both the required jar files and the data files the job needs. I wanted to figure out how to automate this MapReduce job using Apache Oozie, so I followed these steps:
  1. First I created an apachelog directory, and in it I created a job.properties file like this
  2. Then I created a workflow.xml file that looks like this. One thing to notice is <file>GeoLite.mmdb#GeoLite2-City.mmdb</file>: I have the file GeoLite.mmdb on disk, but I want to refer to it as GeoLite2-City.mmdb in my program, and the file element takes care of creating that symlink
  3. Then I copied all the required jar files into the lib folder; this is how my directory structure looks
  4. I used the following command to copy the apachelog directory, which has everything my Oozie job needs, to HDFS
    hdfs dfs -put apachelog apachelog
  5. The last step is to invoke the Oozie job by executing the following command
    oozie job -oozie http://localhost:11000/oozie -config job.properties -run
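A minimal sketch of what such a workflow.xml can look like, with the symlinking file element in place. The action name and mapper class are hypothetical placeholders, not the post's actual values:

```xml
<workflow-app name="apachelog-wf" xmlns="uri:oozie:workflow:0.2">
  <start to="mr-node"/>
  <action name="mr-node">
    <map-reduce>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapred.mapper.class</name>
          <value>com.example.ApacheLogMapper</value>
        </property>
      </configuration>
      <!-- GeoLite.mmdb in HDFS, symlinked as GeoLite2-City.mmdb in the task dir -->
      <file>GeoLite.mmdb#GeoLite2-City.mmdb</file>
    </map-reduce>
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <kill name="fail">
    <message>Job failed</message>
  </kill>
  <end name="end"/>
</workflow-app>
```

The fragment after the # is only the symlink name; the part before it is the path (relative to the workflow directory) of the actual file in HDFS.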

Using Apache Oozie for automating streaming map-reduce job

In the WordCount MapReduce program using Hadoop streaming and python entry I talked about how to create a streaming MapReduce job using Python. I wanted to figure out how to automate that program using an Oozie workflow, so I followed these steps:
  1. The first step was to create a folder called streaming on my local machine and copy mapper.py and reducer.py into it; I also created placeholders for job.properties and workflow.xml
  2. Next I created a job.properties file like this. It is quite similar to the job.properties for a Java MapReduce job; the only difference is that you must set oozie.use.system.libpath=true. By default the streaming-related jars are not included in the classpath, so unless you set that value to true you will get the following error:
    2014-07-23 06:15:13,170 WARN org.apache.hadoop.mapred.Child: Error running child
    java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.streaming.PipeMapRunner not found
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1649)
     at org.apache.hadoop.mapred.JobConf.getMapRunnerClass(JobConf.java:1010)
     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:413)
     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
     at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
     at java.security.AccessController.doPrivileged(Native Method)
     at javax.security.auth.Subject.doAs(Subject.java:396)
     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
     at org.apache.hadoop.mapred.Child.main(Child.java:262)
    Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.streaming.PipeMapRunner not found
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1617)
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1641)
     ... 8 more
    Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.streaming.PipeMapRunner not found
     at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1523)
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1615)
     ... 9 more
    2014-07-23 06:15:13,175 INFO org.apache.hadoop.mapred.Task: Runnning cleanup for the task
  3. The next step in the process is to create a workflow.xml file like this. Make sure to add the <file>mapper.py#mapper.py</file> element to workflow.xml; it takes care of shipping mapper.py and reducer.py with the job and creating symbolic links to these two files
  4. Upload the streaming folder with all your changes to HDFS by executing the following command
    hdfs dfs -put streaming streaming
  5. You can trigger the Oozie workflow by executing the following command
    oozie job -oozie http://localhost:11000/oozie -config streaming/job.properties -run
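For reference, a streaming workflow.xml along the lines of step 3 can look like this. The action name is a placeholder of mine; the streaming element replaces the Java mapper/reducer classes with the Python scripts:

```xml
<workflow-app name="streaming-wf" xmlns="uri:oozie:workflow:0.2">
  <start to="streaming-node"/>
  <action name="streaming-node">
    <map-reduce>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <streaming>
        <mapper>python mapper.py</mapper>
        <reducer>python reducer.py</reducer>
      </streaming>
      <!-- ship the scripts with the job and symlink them in the task dir -->
      <file>mapper.py#mapper.py</file>
      <file>reducer.py#reducer.py</file>
    </map-reduce>
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <kill name="fail">
    <message>Streaming job failed</message>
  </kill>
  <end name="end"/>
</workflow-app>
```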