How to use ElasticSearch as input for Apache Spark program

In the How to use ElasticSearch as input for MapReduce program entry I blogged about how to create a MapReduce program that reads data from an ElasticSearch index or query as input and uses it to produce some output. I wanted to build the same functionality using Apache Spark; you can download the source code for the project from here. The basic idea of the program is that I have a hadoop/contact index/type that contains contact records that look like this, with every contact having a first name, last name and address. I want to write a program that tells me how many contacts are from a particular city.

{
   "lastName":"Tendulkar",
   "address":[
      {
         "country":"India\t",
         "addressLine1":"1 Main Street",
         "city":"Mumbai"
      }
   ],
   "firstName":"Sachin",
   "dateOfBirth":"1973-04-24"
}
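For completeness, here is a minimal sketch of how such a contact document could be indexed into the hadoop/contact type over the REST API, assuming ElasticSearch is running locally on port 9200 (any HTTP client will do; plain java.net is used here only to keep the example dependency-free):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class IndexContact {
    public static void main(String[] args) throws Exception {
        String json = "{\"firstName\":\"Sachin\",\"lastName\":\"Tendulkar\"," +
                "\"dateOfBirth\":\"1973-04-24\"," +
                "\"address\":[{\"addressLine1\":\"1 Main Street\"," +
                "\"city\":\"Mumbai\",\"country\":\"India\"}]}";

        // POST the document to the hadoop index / contact type, letting ES assign the id
        URL url = new URL("http://localhost:9200/hadoop/contact");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("POST");
        connection.setDoOutput(true);
        connection.setRequestProperty("Content-Type", "application/json");
        try (OutputStream out = connection.getOutputStream()) {
            out.write(json.getBytes("UTF-8"));
        }
        // 201 Created indicates the document was indexed
        System.out.println("Response code: " + connection.getResponseCode());
    }
}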
In order to do that I built a simple HelloESInputSpark.java class that looks like this (I added it to the WordCount Apache Spark project that I built in WordCount program built using Apache Spark in Java):
package com.spnotes.spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.WritableArrayWritable;
import scala.Tuple2;

/**
 * Created by user on 8/25/14.
 */
public class HelloESInputSpark {
    public static void main(String[] argv) {
        System.setProperty("hadoop.home.dir", "/usr/local/hadoop");

        SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");
        conf.set("spark.serializer", org.apache.spark.serializer.KryoSerializer.class.getName());
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Point the elasticsearch-hadoop input format at the local ES node and the hadoop/contact index/type
        Configuration hadoopConfiguration = new Configuration();
        hadoopConfiguration.set("es.nodes", "localhost:9200");
        hadoopConfiguration.set("es.resource", "hadoop/contact");

        // Each ES document becomes a (document id, document) pair
        JavaPairRDD<Text, MapWritable> esRDD =
                sc.newAPIHadoopRDD(hadoopConfiguration, EsInputFormat.class, Text.class, MapWritable.class);
        System.out.println("Count of records found is " + esRDD.count());

        // This function gets the ES record key as the first parameter and the ES record as the second parameter;
        // it returns a {city, 1} tuple for each record
        JavaPairRDD<String, Integer> cityCountMap = esRDD.mapToPair(
                new PairFunction<Tuple2<Text, MapWritable>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<Text, MapWritable> currentEntry) throws Exception {
                        MapWritable valueMap = currentEntry._2();
                        WritableArrayWritable address = (WritableArrayWritable) valueMap.get(new Text("address"));
                        MapWritable addressMap = (MapWritable) address.get()[0];
                        Text city = (Text) addressMap.get(new Text("city"));
                        return new Tuple2<String, Integer>(city.toString(), 1);
                    }
                });

        // This is the reducer, which maintains a running count per city
        JavaPairRDD<String, Integer> cityCount = cityCountMap.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer first, Integer second) throws Exception {
                        return first + second;
                    }
                });

        cityCount.saveAsTextFile("file:///tmp/sparkes");
    }
}
This program is similar to any other Spark program, with the difference of a few lines: I had to create a Hadoop Configuration object, set the properties required to use EsInputFormat as the InputFormat, and then pass the newly created Hadoop Configuration object to sc.newAPIHadoopRDD().

Configuration hadoopConfiguration = new Configuration();
hadoopConfiguration.set("es.nodes","localhost:9200");
hadoopConfiguration.set("es.resource","hadoop/contact");
JavaPairRDD<Text, MapWritable> esRDD =
        sc.newAPIHadoopRDD(hadoopConfiguration, EsInputFormat.class, Text.class, MapWritable.class);
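If you want to use a query rather than the whole index as input (as mentioned at the start of this entry), elasticsearch-hadoop also lets you set an es.query property on the same configuration object. As a rough sketch (the exact query below is just an illustration, not part of my project), something like this should limit the input to contacts in Mumbai:

// Optional: restrict the input to documents matching a query instead of the full index.
// The URI-style query here is only an example; es.query also accepts full query DSL JSON.
hadoopConfiguration.set("es.query", "?q=address.city:Mumbai");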
Spark reads the ElasticSearch index as a map in which the id of the record is the key and the actual record becomes the value, passed in as a MapWritable object. You have to use slightly different plumbing to read embedded values stored inside the record, which is what the address/city extraction in the PairFunction above does. Since I configured Spark to store the output on disk, it created multiple part files under /tmp/sparkes, each containing (city, count) tuples; a small sketch for reading them back follows.
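To check the result without firing up Spark again, you can simply read the part files back. This is a minimal sketch in plain Java, assuming the output directory from the program above (the part file names are whatever Spark generated, e.g. part-00000):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class PrintSparkOutput {
    public static void main(String[] args) throws IOException {
        // saveAsTextFile writes one text file per partition into the output directory
        try (DirectoryStream<Path> parts = Files.newDirectoryStream(Paths.get("/tmp/sparkes"), "part-*")) {
            for (Path part : parts) {
                // Each line is the toString() of a (city, count) Tuple2
                for (String line : Files.readAllLines(part, StandardCharsets.UTF_8)) {
                    System.out.println(line);
                }
            }
        }
    }
}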
