How to use Apache Storm Trident API

Some time back I blogged about HelloWorld - Apache Storm Word Counter program, which demonstrates how to build a WordCount program using Apache Storm. The problem with that project was that it was not a Maven project; instead I had a screenshot of all the jars you would have to include in the program. So I changed it to use Apache Maven as the build framework. You can download the source code. In addition to the normal API, Storm also provides the Trident API, which allows us to write much more compact code. I wanted to try that out, so I built this simple WordCount program using the Trident API.
package com.spnotes.storm.trident;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.tuple.Fields;
import com.spnotes.storm.LineReaderSpout;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Debug;
import storm.trident.testing.Split;
/**
* Created by Sunil Patil on 8/12/14.
*/
public class TridentWordCountDriver {
public static void main(String[] argv){
if(argv.length != 1){
System.out.println("Please provide input file path");
System.exit(-1);
}
String inputFile = argv[0];
System.out.println("TridentWordCountDriver is starting for " + inputFile);
Config config = new Config();
config.put("inputFile",inputFile);
LocalDRPC localDRPC = new LocalDRPC();
LocalCluster localCluster = new LocalCluster();
LineReaderSpout lineReaderSpout = new LineReaderSpout();
TridentTopology topology = new TridentTopology();
topology.newStream("spout",lineReaderSpout)
.each(new Fields("line"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.aggregate(new Fields("word"), new Count(), new Fields("count"))
.each(new Fields("word","count"), new Debug());
localCluster.submitTopology("WordCount",config,topology.build());
}
}
When using the Trident API you start by creating a TridentTopology object. You still need the LineReaderSpout, which takes a file path as input, reads the file, and emits one line at a time. The part that is different is that you don't need the WordSpitterBolt and WordCounterBolt; instead you can use compact code like this:

 topology.newStream("spout",lineReaderSpout)
.each(new Fields("line"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.aggregate(new Fields("word"), new Count(), new Fields("count"))
.each(new Fields("word","count"), new Debug());
The each(new Fields("line"), new Split(), new Fields("word")) line takes the line emitted by the LineReaderSpout and uses the storm.trident.testing.Split function that ships with Storm to split it into words, emitting each word as a tuple. The groupBy(new Fields("word")) line groups the tuples by word. The aggregate(new Fields("word"), new Count(), new Fields("count")) line aggregates the grouped words and counts them using the storm.trident.operation.builtin.Count class, so at this point you have tuples of the form {word, count}. The last part, .each(new Fields("word","count"), new Debug());, prints each {word, count} tuple. The Trident API provides a set of ready-made classes that make developing a WordCount-type program very easy, but you could create your own versions of Split and Count and the code would still be significantly more compact.
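For example, a hand-rolled replacement for Split is just a small Trident function. Here is a minimal sketch, assuming the standard storm.trident.operation.BaseFunction API (the built-in Split behaves essentially the same way):
package com.spnotes.storm.trident;
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
/**
 * Minimal sketch of a custom Split function: it receives the "line" field
 * and emits one tuple per word in that line.
 */
public class MySplit extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String line = tuple.getString(0);
        for (String word : line.split(" ")) {
            if (word.length() > 0) {
                collector.emit(new Values(word));
            }
        }
    }
}
You would then simply swap new Split() for new MySplit() in the topology definition.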

How to use ElasticSearch as input for Apache Spark program

In the How to use ElasticSearch as input for MapReduce program entry I blogged about how to create a MapReduce program that reads data from an ElasticSearch index or query as input and uses it to produce some output. I wanted to build the same functionality using Apache Spark; you can download the source code for the project from here. The basic idea of the program is that I have a hadoop/contact index/type that contains contact records that look like this, with every contact having a first name, last name, and address. I want to write a program that tells me how many contacts come from a particular city.

{
   "lastName":"Tendulkar",
   "address":[
      {
         "country":"India\t",
         "addressLine1":"1 Main Street",
         "city":"Mumbai"
      }
   ],
   "firstName":"Sachin",
   "dateOfBirth":"1973-04-24"
}
In order to do that I built a simple HelloESInputSpark.java class that looks like this (I added it to the WordCount Apache Spark project that I built in WordCount program built using Apache Spark in Java):
package com.spnotes.spark;
import com.twitter.chill.Externalizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import org.elasticsearch.hadoop.mr.WritableArrayWritable;
import scala.Tuple2;
/**
* Created by user on 8/25/14.
*/
public class HelloESInputSpark {
public static void main(String[] argv){
System.setProperty("hadoop.home.dir","/usr/local/hadoop");
SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");
conf.set("spark.serializer", org.apache.spark.serializer.KryoSerializer.class.getName());
JavaSparkContext sc = new JavaSparkContext(conf);
Configuration hadoopConfiguration = new Configuration();
hadoopConfiguration.set("es.nodes","localhost:9200");
hadoopConfiguration.set("es.resource","hadoop/contact");
JavaPairRDD<Text,MapWritable> esRDD = sc.newAPIHadoopRDD(hadoopConfiguration, EsInputFormat.class, Text.class, MapWritable.class);
System.out.println("Count of records founds is " + esRDD.count());
//This function will get ES record key as first parameter and the ES record as second parameter, it will return {city,1} tuple for each city in the record
JavaPairRDD<String, Integer> cityCountMap = esRDD.mapToPair(new PairFunction<Tuple2<Text, MapWritable>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Text, MapWritable> currentEntry) throws Exception {
MapWritable valueMap = currentEntry._2();
WritableArrayWritable address =(WritableArrayWritable) valueMap.get(new Text("address"));
MapWritable addressMap = (MapWritable)address.get()[0];
Text city = (Text)addressMap.get(new Text("city"));
return new Tuple2<String, Integer>(city.toString(),1);
}
});
//This is reducer which will maintain running count of city vs count
JavaPairRDD<String, Integer> cityCount = cityCountMap.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer first, Integer second) throws Exception {
return first + second;
}
});
cityCount.saveAsTextFile("file:///tmp/sparkes");
}
}
This program is similar to any other Spark program, with only a few differences: I had to create a Hadoop Configuration object, set the properties required to use EsInputFormat as the InputFormat, and then pass the newly created Hadoop Configuration object to sc.newAPIHadoopRDD().

Configuration hadoopConfiguration = new Configuration();
hadoopConfiguration.set("es.nodes","localhost:9200");
hadoopConfiguration.set("es.resource","hadoop/contact");
JavaPairRDD<Text,MapWritable> esRDD = sc.newAPIHadoopRDD(hadoopConfiguration, EsInputFormat.class, Text.class, MapWritable.class);
The Spark framework reads the ElasticSearch index as a map in which the id of the record is the key and the actual record becomes the value, passed in as a MapWritable object. You have to use slightly different plumbing to read embedded values stored inside the record. I configured Spark to store the output on disk; it created several part files, and you can see their content like this.
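For reference, saveAsTextFile on a JavaPairRDD writes each Tuple2 using its (key,value) toString form, so the part files contain lines along these lines (the counts here are illustrative, matching the sample contact data used in these posts):

(Bangalore,2)
(Delhi,1)
(Mumbai,1)
(Ranchi,1)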

WordCount program built using Apache Spark in Java

In the WordCount program written using Spark framework written in python language entry I talked about how to create a WordCount program using Apache Spark in Python. I wanted to try building the same program in Java, so I followed these steps. You can download the full code from here. This is how the Java class looks; it has 4 anonymous classes:
  1. file.flatMap(new FlatMapFunction() {}): This class gets called for every line in the input file; it splits the line and returns the list of words in it.
  2. words.filter(new Function() {}): This class gets each word and checks whether it is actually a word or just a blank space. My input file has some blank spaces, so I used this filter (partly as an excuse to try out filters) to remove entries that are just blank spaces.
  3. words.mapToPair(new PairFunction() {}): This class takes each word and converts it into a Tuple2 with the word as the key and 1 as the value.
  4. wordToCountMap.reduceByKey(new Function2() {}): This class gets called with the running total and the current count as input, and the only thing it does is add the current count to the running total.
package com.spnotes.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Created by user on 8/24/14.
*/
public class WordCount2 {
public static void main(String[] argv){
if (argv.length != 2) {
System.err.printf("Usage: %s [generic options] <input> <output>\n",
WordCount2.class.getSimpleName());
return;
}
String inputPath = argv[0];
String outputPath = argv[1];
System.out.printf("Starting WordCount program with %s as input %s as output\n", inputPath,outputPath);
SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> file = sc.textFile(inputPath);
JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String s) throws Exception {
return Arrays.asList(s.split(" "));
}
});
words = words.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
System.out.println("Inside filter words ->" +s);
if( s.trim().length() == 0)
return false;
return true;
}
});
JavaPairRDD<String, Integer> wordToCountMap = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s,1);
}
});
JavaPairRDD<String, Integer> wordCounts = wordToCountMap.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer first, Integer second) throws Exception {
return first + second;
}
});
wordCounts.saveAsTextFile(outputPath);
}
}
This is how the Maven build script for my program looks. I had to fight through a couple of issues to get it to build. The first one is that, by default, the org.apache.spark:spark-core_2.10 dependency pulls in Hadoop 1.0.4 client jars; in my case I have a Hadoop 2.4 server, so I got the following error. You get this error even if you add the Hadoop 2.4 jars yourself, because you then have both Hadoop 2.4 and Hadoop 1.0.4 jars on the classpath, so you have to ask Maven to exclude the hadoop-client dependency pulled in by spark-core_2.10.

14/08/25 12:16:50 WARN snappy.LoadSnappy: Snappy native library not loaded
Exception in thread "main" org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4
 at org.apache.hadoop.ipc.Client.call(Client.java:1070)
 at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
 at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
 at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
 at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
 at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
 at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:238)
 at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:203)
 at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
 at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
 at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
 at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
 at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.FilteredRDD.getPartitions(FilteredRDD.scala:29)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:59)
 at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:385)
 at com.spnotes.spark.WordCount2.main(WordCount2.java:62)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
 
Once that part was done, I ran into an issue because 2 different versions of Jetty were getting included (Exception in thread "main" java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package), so I had to add an exclusion filter for the Jetty jars in the org.apache.hadoop:hadoop-common dependency.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>WordCountSpark</groupId>
<artifactId>WordCountSpark</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<hadoop.version>2.4.0</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
</project>

How to use ElasticSearch as input for MapReduce program

In the Saving complex object in elasticsearch as output of MapReduce program entry, I talked about how to use ElasticSearch to store the output of a MapReduce job. In that blog I was creating Contact records that look like this in ElasticSearch:

{
   "lastName":"Tendulkar",
   "address":[
      {
         "country":"India\t",
         "addressLine1":"1 Main Street",
         "city":"Mumbai"
      }
   ],
   "firstName":"Sachin",
   "dateOfBirth":"1973-04-24"
}
I wanted to figure out how to use ElasticSearch as input for a MapReduce program, so I decided to create one that reads the contact index and reports how many players come from each city. You can download the sample program from here. This is how my MapReduce program looks. You run the driver program with 2 arguments, e.g. hadoop/contact file:///home/user/output/: the first is the name of the ElasticSearch index/type and the second is the output directory where the output will be written.
package com.spnotes.eshadoop;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.WritableArrayWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MRInputDriver extends Configured implements Tool{
Logger logger = LoggerFactory.getLogger(MRInputDriver.class);
private static class MRInputMapper extends Mapper<Object, Object, Text, IntWritable>{
Logger logger = LoggerFactory.getLogger(MRInputMapper.class);
private static final IntWritable ONE = new IntWritable(1);
@Override
protected void map(Object key, Object value,
Mapper<Object, Object, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
logger.debug("Entering MRInputDriver.map()");
Text documentId = (Text)key;
MapWritable valueMap = (MapWritable)value;
WritableArrayWritable address =(WritableArrayWritable) valueMap.get(new Text("address"));
MapWritable addressMap = (MapWritable)address.get()[0];
Text city = (Text)addressMap.get(new Text("city"));
context.write(city, ONE);
logger.debug("Exiting MRInputDriver.map()");;
}
}
private static class MRInputReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
Logger logger = LoggerFactory.getLogger(MRInputReducer.class);
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
logger.debug("Entering MRInputReducer.reduce()");
int sum = 0;
Iterator<IntWritable> valuesIt = values.iterator();
while(valuesIt.hasNext()){
sum = sum + valuesIt.next().get();
}
logger.debug(key + " -> " + sum);
context.write(key, new IntWritable(sum));
logger.debug("Exiting MRInputReducer.reduce()");;
}
}
public int run(String[] args) throws Exception {
logger.debug("Entering MRInputDriver.run()");
if (args.length != 2) {
System.err.printf("Usage: %s [generic options] <input> <output>\n",
getClass().getSimpleName());
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
Job job = new Job();
job.setJarByClass(MRInputDriver.class);
job.setJobName("ContactImporter");
logger.info("Input path " + args[0]);
logger.info("Oupput path " + args[1]);
// FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
Configuration configuration = job.getConfiguration();
configuration.set("es.nodes","localhost:9200");
configuration.set("es.resource",args[0]);
job.setInputFormatClass(EsInputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(MRInputMapper.class);
job.setReducerClass(MRInputReducer.class);
int returnValue = job.waitForCompletion(true) ? 0:1;
System.out.println("job.isSuccessful " + job.isSuccessful());
logger.debug("Exiting MRInputDriver.run()");
return returnValue;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MRInputDriver(), args);
System.exit(exitCode);
}
}
This program has 3 main components:
  1. MRInputDriver: In the driver program you set the es.nodes entry to point to the address of your ElasticSearch installation, and the value of es.resource is the name of the ElasticSearch index/type. Then I call job.setInputFormatClass(EsInputFormat.class);, which sets EsInputFormat as the input format; it takes care of reading the records from ElasticSearch.
  2. MRInputMapper: The Mapper class uses Object as both the key and the value type. The ElasticSearch Hadoop framework reads each record from ElasticSearch and passes the id as the key (a Text) and the record itself as the value, an object of the MapWritable class. Once I have the value, I read the address from it, and the mapper writes the city name as the key with a value of 1.
  3. MRInputReducer: The reducer is pretty simple; it gets called with the name of a city as the key and an Iterable of values, very much like the reducer in WordCount.
After running the program I could see output like this:

Bangalore 2
Delhi 1
Mumbai 1
Ranchi 1

Saving complex object in elasticsearch as output of MapReduce program

In the Using ElasticSearch to store output of MapReduce program entry I built a sample MapReduce program that writes its output to ElasticSearch. I wanted to figure out how to write the output of a MapReduce job into ElasticSearch so that it creates a complex JSON structure with an embedded object and a date type. I decided to create this sample program, which reads a .csv file and generates contact records in ElasticSearch that look like this:
{
"firstName": "Sachin",
"lastName": "Tendulkar",
"dateOfBirth": "1973-04-24",
"address": [
{
"addressLine1": "1 Main Street",
"city": "Mumbai",
"country": "India"
}
]
}
You can download the sample code for this project from here. I followed these steps to create the program.
  1. The first thing I did was create the hadoop/contact index in ElasticSearch using the following mapping. You can create the index by making an HTTP POST call to http://localhost:9200/hadoop
    {
    "hadoop": {
    "mappings": {
    "contact": {
    "properties": {
    "address": {
    "properties": {
    "addressLine1": {
    "type": "string"
    },
    "city": {
    "type": "string"
    },
    "country": {
    "type": "string"
    }
    }
    },
    "dateOfBirth": {
    "type": "date",
    "format": "dateOptionalTime"
    },
    "firstName": {
    "type": "string"
    },
    "lastName": {
    "type": "string"
    }
    }
    }
    }
    }
    }
  2. Once the index was created I created a Contact.java class, which looks like this
    package com.spnotes.eshadoop;
    import java.util.Date;
    import java.util.List;
    import org.codehaus.jackson.map.annotate.JsonSerialize;
    public class Contact {
    private String firstName;
    private String lastName;
    private Date dateOfBirth;
    private List<Address> address;
    public Contact(String firstName, String lastName, Date dateOfBirth,
    List<Address> address) {
    super();
    this.firstName = firstName;
    this.lastName = lastName;
    this.dateOfBirth = dateOfBirth;
    this.address = address;
    }
    public String getFirstName() {
    return firstName;
    }
    public void setFirstName(String firstName) {
    this.firstName = firstName;
    }
    public String getLastName() {
    return lastName;
    }
    public void setLastName(String lastName) {
    this.lastName = lastName;
    }
    @JsonSerialize(using=CustomDateSerializer.class)
    public Date getDateOfBirth() {
    return dateOfBirth;
    }
    public void setDateOfBirth(Date dateOfBirth) {
    this.dateOfBirth = dateOfBirth;
    }
    public List<Address> getAddress() {
    return address;
    }
    public void setAddress(List<Address> address) {
    this.address = address;
    }
    @Override
    public String toString() {
    return "Contact [firstName=" + firstName + ", lastName=" + lastName
    + ", dateOfBirth=" + dateOfBirth + ", address=" + address + "]";
    }
    }
    As you can see, this is a simple POJO with a list of Address objects (another POJO). It also uses CustomDateSerializer as a custom date serializer; a sketch of that class appears after this list.
  3. The next step was to create ContactImportDriver.java, which contains both the Mapper and the MapReduce driver class.
    • ContactImportMapper: This is a mapper class that gets one line of Contact.txt at a time, splits it on commas, and uses the values to create a Contact.java object. In the mapper I convert the Contact object into JSON using the Jackson parser and set the JSON as the value of a Text object. Then I write NullWritable as the key and the contact JSON as the value, which gets inserted into ES.
    • ContactImportDriver: This class is the same as any other MapReduce driver with a few exceptions. I call job.getConfiguration().set("es.input.json", "yes");, which tells the ElasticSearch output side to treat the output as JSON, and I set the output format to EsOutputFormat with job.setOutputFormatClass(EsOutputFormat.class); so that the ElasticSearch Hadoop framework takes control of persisting the output.
    package com.spnotes.eshadoop;
    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Date;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.codehaus.jackson.map.ObjectMapper;
    import org.elasticsearch.hadoop.mr.EsOutputFormat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    public class ContactImportDriver extends Configured implements Tool{
    Logger logger = LoggerFactory.getLogger(ContactImportDriver.class);
    public static class ContactImportMapper extends Mapper<LongWritable, Text, NullWritable, Text>{
    Logger logger = LoggerFactory.getLogger(ContactImportMapper.class);
    private ObjectMapper jsonMapper;
    private SimpleDateFormat dateFormatter =
    new SimpleDateFormat("dd-MMM-yyyy");
    private Text contactText;
    @Override
    protected void map(LongWritable key, Text value,
    Mapper<LongWritable, Text, NullWritable, Text>.Context context)
    throws IOException, InterruptedException {
    logger.debug("Entering ContactImportMapper.map()");
    try {
    String contactContent = value.toString();
    System.out.println("Value of contactContent " + contactContent);
    String[] contactDataList= contactContent.split(",");
    System.out.println("ContactDataList " + Arrays.toString(contactDataList) +" length -> " + contactDataList.length);
    SimpleDateFormat df = new SimpleDateFormat("");
    if(contactDataList.length == 6){
    String firstName = contactDataList[0];
    String lastName = contactDataList[1];
    Date dateOfBirth = dateFormatter.parse(contactDataList[2]);
    String addressLine1 = contactDataList[3];
    String city = contactDataList[4];
    String country = contactDataList[5];
    Address address = new Address(addressLine1, city, country);
    List<Address> addressList = new ArrayList<Address>();
    addressList.add(address);
    Contact contact = new Contact(firstName, lastName,
    dateOfBirth, addressList);
    String contactJSON = jsonMapper.writeValueAsString(contact);
    contactText.set(contactJSON);
    context.write(NullWritable.get(), contactText);
    }
    } catch (ParseException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    logger.debug("Exiting ContactImportMapper.map()");
    }
    @Override
    protected void setup(
    Mapper<LongWritable, Text, NullWritable, Text>.Context context)
    throws IOException, InterruptedException {
    jsonMapper = new ObjectMapper();
    contactText = new Text();
    }
    }
    public int run(String[] args) throws Exception {
    if (args.length != 2) {
    System.err.printf("Usage: %s [generic options] <input> <output>\n",
    getClass().getSimpleName());
    ToolRunner.printGenericCommandUsage(System.err);
    return -1;
    }
    Job job = new Job();
    job.setJarByClass(ContactImportDriver.class);
    job.setJobName("ContactImporter");
    job.getConfiguration().set("es.input.json", "yes");
    logger.info("Input path " + args[0]);
    logger.info("Oupput path " + args[1]);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Configuration configuration = job.getConfiguration();
    configuration.set("es.nodes","localhost:9200");
    configuration.set("es.resource",args[1]);
    job.setOutputFormatClass(EsOutputFormat.class);
    job.setNumReduceTasks(0);
    job.setMapOutputKeyClass(NullWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    job.setMapperClass(ContactImportMapper.class);
    int returnValue = job.waitForCompletion(true) ? 0:1;
    System.out.println("job.isSuccessful " + job.isSuccessful());
    return returnValue;
    }
    public static void main(final String[] args) throws Exception{
    int exitCode = ToolRunner.run(new ContactImportDriver(), args);
    System.exit(exitCode);
    }
    }
  4. Once the program was built, I passed 2 parameters to it: the first is the full path of the contact.txt file and the second is the ElasticSearch index and type in hadoop/contact format. This is how the content of the contact.txt file looks:
    
    Sachin,Tendulkar,24-Apr-1973,1 Main Street,Mumbai,India 
    Rahul,Dravid,01-Jan-1973,1 Main Street,Bangalore,India
    Anil,Kumble,17-Oct-1970,1 Main Street,Bangalore,India
    Virat,Kohali,05-Nov-1988,1 Main Street,Delhi,India
    Mahendra Singh,Dhoni,07-Jul-1981,1 Main Street,Ranchi,India
    Saurav Ganguli,08-Jul-1982,1 Main Street,Kolkata,India
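The CustomDateSerializer class referenced in step 2 is not listed above; a minimal sketch, assuming Jackson 1.x (org.codehaus.jackson) and the yyyy-MM-dd format used by the index mapping and the sample JSON, could look like this:
package com.spnotes.eshadoop;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.JsonSerializer;
import org.codehaus.jackson.map.SerializerProvider;
/**
 * Sketch: serializes java.util.Date as yyyy-MM-dd so the dateOfBirth field
 * matches the "date" mapping defined on the hadoop/contact type.
 */
public class CustomDateSerializer extends JsonSerializer<Date> {
    private static final String DATE_PATTERN = "yyyy-MM-dd";
    @Override
    public void serialize(Date value, JsonGenerator jgen, SerializerProvider provider)
            throws IOException, JsonProcessingException {
        // SimpleDateFormat is not thread safe, so create one per call in this sketch
        jgen.writeString(new SimpleDateFormat(DATE_PATTERN).format(value));
    }
}
With this serializer and the @JsonSerialize annotation on getDateOfBirth(), the dateOfBirth field is written in a form that the "date" mapping accepts.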
    
After executing the code, I ran the following query to find all the players with a date of birth between 1970 and 1974:
{
"query": {
"filtered": {
"query": {
"match_all": {}
},
"filter": {
"numeric_range": {
"dateOfBirth": {
"from": "1970-01-01",
"to": "1974-02-01"
}
}
}
}
}
}
This is how my sample output looks.

How to create custom Combiner class with your MapReduce framework

The MapReduce framework passes control to your combiner class at the end of the map phase so that it can combine/reduce the output generated by the Mappers before it gets transferred to the Reducers. Sending data from a Mapper to a Reducer means that data has to go over the network, so combining it early cuts down the transfer. I wanted to try creating a custom combiner class, and to keep things simple I decided to add one to the WordCount(HelloWorld) MapReduce program. My combiner class does the same thing as the reducer: it takes multiple [word, 1] tuples and combines them into something like [word1, 5], [word2, 6], etc. I followed these steps:
  1. The first thing I did was create a WordCountCombiner.java class that looks the same as WordCountReducer, but I added one System.out.println() to it so that I would know when my combiner is called instead of the reducer.
    package com.spnotes.hadoop;
    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
    Logger logger = LoggerFactory.getLogger(WordCountCombiner.class);
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
    Context context)
    throws IOException, InterruptedException {
    System.out.println("Entering WordCountCombiner.reduce() " );
    int sum = 0;
    Iterator<IntWritable> valuesIt = values.iterator();
    while(valuesIt.hasNext()){
    sum = sum + valuesIt.next().get();
    }
    logger.debug(key + " -> " + sum);
    context.write(key, new IntWritable(sum));
    logger.debug("Exiting WordCountCombiner.reduce()");
    }
    }
  2. Then I changed the driver class to add the job.setCombinerClass(WordCountCombiner.class); line to it.
    package com.spnotes.hadoop;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    public class WordCountDriver extends Configured implements Tool{
    Logger logger = LoggerFactory.getLogger(WordCountDriver.class);
    public static void main(String[] args) throws Exception{
    int exitCode = ToolRunner.run(new WordCountDriver(), args);
    System.exit(exitCode);
    }
    public int run(String[] args) throws Exception {
    if (args.length != 2) {
    System.err.printf("Usage: %s [generic options] <input> <output>\n",
    getClass().getSimpleName());
    ToolRunner.printGenericCommandUsage(System.err);
    return -1;
    }
    Job job = new org.apache.hadoop.mapreduce.Job();
    Configuration conf= job.getConfiguration();
    job.setJarByClass(WordCountDriver.class);
    job.setJobName("WordCounter");
    logger.info("Input path " + args[0]);
    logger.info("Oupput path " + args[1]);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    job.setCombinerClass(WordCountCombiner.class);
    int returnValue = job.waitForCompletion(true) ? 0:1;
    System.out.println("job.isSuccessful " + job.isSuccessful());
    return returnValue;
    }
    }
Then I executed the WordCountDriver class with 3 files as input, and I could see my combiner class getting called after the Mapper for each input file, before the mapper output was written to disk and before the reduce phase started.
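Besides the System.out.println(), another way to confirm that the combiner ran is to look at the job's built-in counters after it finishes. A small sketch, not part of the listing above and assuming Hadoop 2's TaskCounter enum:
import org.apache.hadoop.mapreduce.TaskCounter;

// Inside WordCountDriver.run(), after job.waitForCompletion(true) returns:
long combineIn = job.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
long combineOut = job.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
// If the combiner ran, combineIn should be non-zero and combineOut should be smaller than combineIn
System.out.println("Combine input records -> " + combineIn + ", combine output records -> " + combineOut);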

Creating custom Partitioner class for your mapreduce program

The MapReduce framework uses an instance of the org.apache.hadoop.mapreduce.Partitioner class to figure out which map output key goes to which reducer. By default it uses org.apache.hadoop.mapreduce.lib.partition.HashPartitioner, which calculates a hash value for the key, divides it by the number of reducers, and uses the remainder to pick the reducer. This implementation is good as long as the keys produce hash codes with a uniform distribution, but in some exceptional cases you might want to take control of how the Mapper output gets distributed to the Reducers. I wanted to figure out how this works, so I decided to change my WordCount(HelloWorld) MapReduce program to add a custom Partitioner that sends words starting with upper-case and lower-case letters to 2 different reducers. I followed these steps:
  1. First I created a WordCountPartitioner.java class like this
    package com.spnotes.hadoop;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    public class WordCountPartitioner extends Partitioner<Text, IntWritable>{
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
    if(numPartitions == 2){
    String partitionKey = key.toString();
    if(partitionKey.charAt(0) < 'a' )
    return 0;
    else
    return 1;
    }else if(numPartitions == 1)
    return 0;
    else{
    System.err.println("WordCountParitioner can only handle either 1 or 2 paritions");
    return 0;
    }
    }
    }
    The first thing I am doing is checking whether there are 2 reducers. If yes, I look at the first character of the key: if it is less than 'a' (which is the case for upper-case letters) the key goes to the first reducer, otherwise it goes to the second reducer.
  2. I had to make a few changes in the driver program to use my WordCountPartitioner
    package com.spnotes.hadoop;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    public class WordCountDriver extends Configured implements Tool{
    Logger logger = LoggerFactory.getLogger(WordCountDriver.class);
    public static void main(String[] args) throws Exception{
    int exitCode = ToolRunner.run(new WordCountDriver(), args);
    System.exit(exitCode);
    }
    public int run(String[] args) throws Exception {
    if (args.length != 2) {
    System.err.printf("Usage: %s [generic options] <input> <output>\n",
    getClass().getSimpleName());
    ToolRunner.printGenericCommandUsage(System.err);
    return -1;
    }
    Job job = new org.apache.hadoop.mapreduce.Job();
    Configuration conf= job.getConfiguration();
    job.setJarByClass(WordCountDriver.class);
    job.setJobName("WordCounter");
    logger.info("Input path " + args[0]);
    logger.info("Oupput path " + args[1]);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setNumReduceTasks(2);
    job.setPartitionerClass(WordCountPartitioner.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    int returnValue = job.waitForCompletion(true) ? 0:1;
    System.out.println("job.isSuccessful " + job.isSuccessful());
    return returnValue;
    }
    }
    • job.setNumReduceTasks(2): This call asks the MapReduce framework to use 2 reducers.
    • job.setPartitionerClass(WordCountPartitioner.class): This call sets my WordCountPartitioner as the partitioner class.
This screenshot shows how my sample.txt got divided into 2 reducer outputs. The first 2 lines show the output with the default HashPartitioner and the next 2 lines show the output when I used my custom Partitioner.
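As a quick sanity check of the partitioning logic, the partitioner can also be exercised directly without running a full job; a minimal sketch (the class name here is just for illustration):
package com.spnotes.hadoop;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
// Quick check of WordCountPartitioner with 2 partitions: keys starting with an
// upper-case letter (< 'a') land in partition 0, lower-case keys in partition 1.
public class WordCountPartitionerCheck {
    public static void main(String[] args) {
        WordCountPartitioner partitioner = new WordCountPartitioner();
        System.out.println(partitioner.getPartition(new Text("Apple"), new IntWritable(1), 2)); // prints 0
        System.out.println(partitioner.getPartition(new Text("apple"), new IntWritable(1), 2)); // prints 1
    }
}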

Python client for publishing and consuming message from Apache Kafka

In the Java Client for publishing and consuming messages from Apache Kafka entry I talked about how to create a Java client for publishing and consuming messages from Kafka. I wanted to try the same thing using Python, so I followed these steps.
  1. Follow steps 1 through 4 of the Java Client for publishing and consuming messages from Apache Kafka entry to start the Zookeeper and Kafka servers
  2. Follow these steps to install the kafka-python client
    
    git clone https://github.com/mumrah/kafka-python
    pip install ./kafka-python
    
  3. Next create a Producer.py Python script to publish a message to the pythontest topic. The basic concept here is that you connect to the Kafka server on localhost:9092 and then publish a message to a particular topic
    __author__ = 'user'
    from kafka.client import KafkaClient
    from kafka.producer import SimpleProducer
    from datetime import datetime
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    producer.send_messages("pythontest", "This is message sent from python client " + str(datetime.now().time()) )
  4. Now create a Consumer.py script that consumes messages from the pythontest topic and writes them to the console.
    __author__ = 'user'
    from kafka.client import KafkaClient
    from kafka.consumer import SimpleConsumer
    kafka = KafkaClient("localhost:9092")
    print("After connecting to kafka")
    consumer = SimpleConsumer(kafka, "my-group", "test")
    for message in consumer:
    print(message)

Java Client for publishing and consuming messages from Apache Kafka

I wanted to learn how to publish and consume messages from Apache Kafka using a Java client, so I followed these steps.
  1. Download the Kafka binaries from the Kafka download page
  2. Unzip the Kafka tar file by executing tar -xzf kafka_2.9.2-0.8.1.1.tgz, then go to the Kafka directory by executing cd kafka_2.9.2-0.8.1.1
  3. Next start the Zookeeper server by executing the following command
    
    bin/zookeeper-server-start.sh config/zookeeper.properties
    
  4. Start the Kafka server by executing the following command
    
    bin/kafka-server-start.sh config/server.properties
    
  5. Now your Zookeeper and Kafka servers are ready, and you can download the source code for the sample project from here
  6. This is how a Java client that publishes messages to Kafka looks; execute it a couple of times to publish a few messages
    package com.spnotes.kafka;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Properties;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    /**
    * Created by user on 8/4/14.
    */
    public class HelloKafkaProducer {
    final static String TOPIC = "javatest";
    public static void main(String[] argv){
    Properties properties = new Properties();
    properties.put("metadata.broker.list","localhost:9092");
    properties.put("serializer.class","kafka.serializer.StringEncoder");
    ProducerConfig producerConfig = new ProducerConfig(properties);
    kafka.javaapi.producer.Producer<String,String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);
    SimpleDateFormat sdf = new SimpleDateFormat();
    KeyedMessage<String, String> message =new KeyedMessage<String, String>(TOPIC,"Test message from java program " + sdf.format(new Date()));
    producer.send(message);
    producer.close();
    }
    }
    The first thing you have to do when developing a producer is connect to the Kafka server; for that you set the metadata.broker.list property to point to the host and port on which the Kafka server is listening (you can find the port and host name in the server.properties file used in step 4). Once you have a Producer object you can publish messages by creating a kafka.producer.KeyedMessage object, passing the topic name and the message as arguments.
  7. This is how the Java client that consumes messages from Kafka looks. Run it and it will start a thread that keeps listening for messages on the topic and prints each one to the console.
    package com.spnotes.kafka;
    import java.io.UnsupportedEncodingException;
    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.javaapi.message.ByteBufferMessageSet;
    import kafka.message.MessageAndOffset;
    /**
    * Created by user on 8/4/14.
    */
    public class HelloKafkaConsumer extends Thread {
    final static String clientId = "SimpleConsumerDemoClient";
    final static String TOPIC = "pythontest";
    ConsumerConnector consumerConnector;
    public static void main(String[] argv) throws UnsupportedEncodingException {
    HelloKafkaConsumer helloKafkaConsumer = new HelloKafkaConsumer();
    helloKafkaConsumer.start();
    }
    public HelloKafkaConsumer(){
    Properties properties = new Properties();
    properties.put("zookeeper.connect","localhost:2181");
    properties.put("group.id","test-group");
    ConsumerConfig consumerConfig = new ConsumerConfig(properties);
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    }
    @Override
    public void run() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(TOPIC, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while(it.hasNext())
    System.out.println(new String(it.next().message()));
    }
    private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
    for(MessageAndOffset messageAndOffset: messageSet) {
    ByteBuffer payload = messageAndOffset.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(new String(bytes, "UTF-8"));
    }
    }
    }
    The HelloKafkaConsumer class extends Thread. In its constructor I first create a Properties object with the zookeeper.connect property set to the host and port on which the ZooKeeper server is listening, and then use it to create a kafka.javaapi.consumer.ConsumerConnector.
    Once the ConsumerConnector is ready, in the run() method I pass it the name of the topic I want to listen to (you can pass multiple topic names here). Every time there is a new message I read it and print it to the console.
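    One thing to note is that the consumer loop blocks in it.hasNext() forever. If you want to stop it cleanly, you could add a small shutdown method along these lines (a sketch, not part of the listing above; the high-level consumer's ConsumerConnector exposes shutdown()):
    // Hypothetical addition to HelloKafkaConsumer: stop consuming cleanly,
    // for example from a JVM shutdown hook.
    public void shutdown() {
        if (consumerConnector != null) {
            consumerConnector.shutdown();
        }
    }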

How MapReduce job in Yarn framework works

In the YARN MapReduce framework these are the main players involved in running a MapReduce application:
  1. Client: The client submits the MapReduce job. It is responsible for copying the MapReduce jars, configuration files, and distributed cache files (jars, archives, and files) into HDFS for distribution. It is also responsible for splitting the input into pieces and saving that split information into HDFS for later use.
  2. Application Master: Manages the lifecycle of an application running on the cluster. When the framework starts an application it creates an application master on one of the worker nodes, and the application master runs for the lifetime of the application/job. It negotiates with the resource manager for cluster resources, described in terms of a number of containers, each with a certain memory limit, and it also tracks application progress, such as status and counters, across the application.
  3. Resource Manager: Manages the use of compute resources (the resources available for running map and reduce tasks) across the cluster.
  4. Node Manager: Node managers run the application-specific processes in containers. The node manager ensures that an application does not use more resources than it has been allocated.
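From the client's point of view, the switch to this YARN machinery is mostly configuration; here is a minimal sketch, assuming the standard Hadoop 2 property names and a placeholder ResourceManager address (mapper, reducer, and input/output paths omitted, see the WordCountDriver earlier in these posts):
package com.spnotes.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
// Sketch of the client's role: it only sets configuration and submits the job;
// the ResourceManager, ApplicationMaster, and NodeManagers do the rest.
public class YarnSubmitSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name", "yarn"); // run on YARN instead of the local runner
        conf.set("yarn.resourcemanager.address", "resourcemanager-host:8032"); // placeholder host
        Job job = Job.getInstance(conf, "WordCounter");
        job.setJarByClass(YarnSubmitSketch.class);
        // submit() copies the job jar, configuration, and input split information to HDFS,
        // then asks the ResourceManager to start an ApplicationMaster for this job; the
        // ApplicationMaster negotiates containers and the NodeManagers run the tasks in them.
        job.submit();
    }
}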