I have an ElasticSearch index/type, hadoop/contact, that contains contact records that look like this, with every contact having a first name, last name and address. I want to write a program that tells me how many contacts are from a particular city.
{
"lastName":"Tendulkar",
"address":[
{
"country":"India",
"addressLine1":"1 Main Street",
"city":"Mumbai"
}
],
"firstName":"Sachin",
"dateOfBirth":"1973-04-24"
}
In order to do that I built a simple HelloESInputSpark.java class that looks like this (I added it to the WordCount Apache Spark project that I built in my "WordCount program built using Apache Spark in Java" post):
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.spnotes.spark; | |
import com.twitter.chill.Externalizer; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.io.MapWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.MRJobConfig; | |
import org.apache.spark.SparkConf; | |
import org.apache.spark.api.java.JavaPairRDD; | |
import org.apache.spark.api.java.JavaSparkContext; | |
import org.apache.spark.api.java.function.Function; | |
import org.apache.spark.api.java.function.Function2; | |
import org.apache.spark.api.java.function.PairFunction; | |
import org.elasticsearch.hadoop.mr.EsInputFormat; | |
import org.elasticsearch.hadoop.mr.EsOutputFormat; | |
import org.elasticsearch.hadoop.mr.WritableArrayWritable; | |
import scala.Tuple2; | |
/** | |
* Created by user on 8/25/14. | |
*/ | |
public class HelloESInputSpark { | |
public static void main(String[] argv){ | |
System.setProperty("hadoop.home.dir","/usr/local/hadoop"); | |
SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local"); | |
conf.set("spark.serializer", org.apache.spark.serializer.KryoSerializer.class.getName()); | |
JavaSparkContext sc = new JavaSparkContext(conf); | |
Configuration hadoopConfiguration = new Configuration(); | |
hadoopConfiguration.set("es.nodes","localhost:9200"); | |
hadoopConfiguration.set("es.resource","hadoop/contact"); | |
JavaPairRDD<Text,MapWritable> esRDD = sc.newAPIHadoopRDD(hadoopConfiguration, EsInputFormat.class, Text.class, MapWritable.class); | |
System.out.println("Count of records founds is " + esRDD.count()); | |
//This function will get ES record key as first parameter and the ES record as second parameter, it will return {city,1} tuple for each city in the record | |
JavaPairRDD<String, Integer> cityCountMap = esRDD.mapToPair(new PairFunction<Tuple2<Text, MapWritable>, String, Integer>() { | |
@Override | |
public Tuple2<String, Integer> call(Tuple2<Text, MapWritable> currentEntry) throws Exception { | |
MapWritable valueMap = currentEntry._2(); | |
WritableArrayWritable address =(WritableArrayWritable) valueMap.get(new Text("address")); | |
MapWritable addressMap = (MapWritable)address.get()[0]; | |
Text city = (Text)addressMap.get(new Text("city")); | |
return new Tuple2<String, Integer>(city.toString(),1); | |
} | |
}); | |
//This is reducer which will maintain running count of city vs count | |
JavaPairRDD<String, Integer> cityCount = cityCountMap.reduceByKey(new Function2<Integer, Integer, Integer>() { | |
@Override | |
public Integer call(Integer first, Integer second) throws Exception { | |
return first + second; | |
} | |
}); | |
cityCount.saveAsTextFile("file:///tmp/sparkes"); | |
} | |
} |
I used EsInputFormat as the InputFormat and then called sc.newAPIHadoopRDD(), passing the newly created Hadoop Configuration object to it.
Configuration hadoopConfiguration = new Configuration();
hadoopConfiguration.set("es.nodes","localhost:9200");
hadoopConfiguration.set("es.resource","hadoop/contact");
JavaPairRDD&lt;Text, MapWritable&gt; esRDD = sc.newAPIHadoopRDD(hadoopConfiguration, EsInputFormat.class, Text.class, MapWritable.class);
The Spark framework reads the ElasticSearch index as a Map in which the id of the record is the key and the actual record becomes the value, passed as an object of MapWritable. You have to use slightly different plumbing to read embedded values stored inside the record.
Since I configured Spark to store the output on disk, it created several part files; you can see the content of the part files like this:
1 comment:
Thanks for sharing your knowledge with us,keep sharingmore blog posts with us.
Thank you...
online course for big data and hadoop
best online training for big data and hadoop
Post a Comment