How to use ElasticSearch as input for Apache Spark program

In the How to use ElasticSearch as input for MapReduce program entry i blogged about how to create a MapReduce program that reads data from ElasticSearch Index or query as input and uses it to produce some output. I wanted to build same functionality using Apache Spark, you can download the source code for the project from here Basic idea in the program is that i have a hadoop/contact index/type that contains contact records that look like this, with every contact having first name, last name and address. I want to write a program that tells me how many contacts are from particular city.

         "addressLine1":"1 Main Street",
In order to do that i build a simple class that looks like this (I did add it to my WordCount Apache Spark project that i built in WordCount program built using Apache Spark in Java ) This program is similar to any other with difference of few lines, i had to create a Hadoop Configuration object and set properties required to use ESInputFormat as InputFormat and then call sc.newAPIHadoopRDD(} to pass the newly created Hadoop Configuration object to it.

Configuration hadoopConfiguration = new Configuration();
JavaPairRDD esRDD = sc.newAPIHadoopRDD(hadoopConfiguration, EsInputFormat.class, Text.class, MapWritable.class);
The Spark framework reads the ElasticSearch index as Map in which the id of the record is key and the actual record becomes value and get passed as object of MapWritable. You have to use little bit different plumbing to read embedded values stored inside the record I configured Spark to store the output on the disk it created different part files and you can see the content of the part files like this