Writing data from Spark to ElasticSearch

ElasticSearch for Apache Hadoop project has introduced a way to directly write to ElasticSearch without going through Elastic Search OutputFormat. I wanted to try that out so i built simple application that saves output of word count into Elastic Search, you can download this project from github First thing that i had to do was to build maven pom.xml that includes org.elasticsearch.elasticsearch-hadoop version 5.0 jar. I could not find it in the regular maven repository so i had to include elasticsearch repository in my pom.xml Then this is how my Spark program looks like, the main part is line 42 where i create Map of all the properties that i need for saving this RDD into ElasticSearch and then line 43, where i am calling wordCountJson.saveToEs(esMap), which actually takes care of writing data into elasticsearch

How to use KafkaLog4jAppender for sending Log4j logs to kafka

Apache Kafka has a KafkaLog4jAppender that you can use for redirecting your Log4j log to Kafka topic. I wanted to try it out so i used following steps, you can download sample project from here First i created a simple standalone java program that use Log4j like this. As you can see this is like any other normal Java program that uses Log4j. Then in the log4j.properties file i added line 12 to 17 for using KafkaLog4jAppender, on line 13, value of brokerList property points to the Kafka server and line 14 value of topic points to the Kafka topic name to which logs should go. Now before running this program make sure that you actually have topic named kafkalogger, if not you can create using this command

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkalogger
You can verify if you have topic named kafkalogger by executing following command

bin/kafka-topics.sh --list --zookeeper localhost:2181
Also you can run kafka console consumer that reads messages from Kafka and prints them to console, using following command

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkalogger
Now when you run your java program you should see messages on console like this

WordCount program using Spark DataFrame

I wanted to figure out how to write Word Count Program using Spark DataFrame API, so i followed these steps. Import org.apache.spark.sql.functions._, it includes UDF's that i need to use import org.apache.spark.sql.functions._ Create a data frame by reading README.md. When you read the file, spark will create a data frame with single column value, the content of the value column would be the line in the file

val df = sqlContext.read.text("README.md")
df.show(10,truncate=false)
Next split each of the line into words using split function. This will create a new DataFrame with words column, each words column would have array of words for that line

val wordsDF = df.select(split(df("value")," ").alias("words"))
wordsDF.show(10,truncate=false)
Next use explode transformation to convert the words array into a dataframe with word column. This is equivalent of using flatMap() method on RDD

val wordDF = wordsDF.select(explode(wordsDF("words")).alias("word"))
wordsDF.show(10,truncate=false)
Now you have data frame with each line containing single word in the file. So group the data frame based on word and count the occurrence of each word

val wordCountDF = wordDF.groupBy("word").count
wordCountDF.show(truncate=false)
This is the code you need if you want to figure out 20 top most words in the file

wordCountDF.orderBy(desc("count")).show(truncate=false)

How to use built in spark UDF's

In the i talked about how to create a custom UDF in scala for spark. But before you do that always check Spark UDF's that are available with Spark already. I have this sample Spark data frame with list of users I wanted to sort the list of users in descending order of age so i used following 2 lines, first is to import functions that are available with Spark already and then i used desc function to order age in descending order

import org.apache.spark.sql.functions._
display(userDF.orderBy(desc("age")))
Now if i wanted to sort the data frame records using age in ascending order

display(userDF.orderBy(asc("age")))
This is sample of how to use the sum() function

userDF.select(sum("age")).show

How to use built in spark UDF's

In the i talked about how to create a custom UDF in scala for spark. But before you do that always check Spark UDF's that are available with Spark already. I have this sample Spark data frame with list of users I wanted to sort the list of users in descending order of age so i used following 2 lines, first is to import functions that are available with Spark already and then i used desc function to order age in descending order

import org.apache.spark.sql.functions._
display(userDF.orderBy(desc("age")))
Now if i wanted to sort the data frame records using age in ascending order

display(userDF.orderBy(asc("age")))
This is sample of how to use the sum() function

userDF.select(sum("age")).show

How to define Scala UDF in Spark

Scala UDF Sample Notebook

How to access Hive table from Spark in MapR sandbox

I was trying to figure out how to query a hive table from spark in MapR 5.1 sandbox . So i started spark-shell and tried to query the sample_08 table and i got error saying no such table exists

scala> val sample08 = sqlContext.sql("select * from sample_08")
org.apache.spark.sql.AnalysisException: no such table sample_08; line 1 pos 14
 at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:260)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:268)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:264)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
 at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:279)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:264)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:254)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
 at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
 at scala.collection.immutable.List.foldLeft(List.scala:84)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
 at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:932)
 at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:932)
 at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:930)
 at org.apache.spark.sql.DataFrame.(DataFrame.scala:132)
 at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
 at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:741)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:19)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:24)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:26)
 at $iwC$$iwC$$iwC$$iwC$$iwC.(:28)
 at $iwC$$iwC$$iwC$$iwC.(:30)
 at $iwC$$iwC$$iwC.(:32)
 at $iwC$$iwC.(:34)
 at $iwC.(:36)
 at (:38)
 at .(:42)
 at .()
 at .(:7)
 at .()
 at $print()
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
 at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
 at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
 at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
 at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
 at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
 at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
 at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
 at org.apache.spark.repl.Main$.main(Main.scala:31)
 at org.apache.spark.repl.Main.main(Main.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
 at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
When i checked the <SPARK_HOME>/conf directory i noticed that hive-site.xml was missing so i searched for hive-site.xml on the cluster i found 2 hive-site.xml but the /opt/mapr/hive/hive-1.2/conf/hive-site.xml had hive.metastore.uris property pointing to thrift://localhost:9083, so i copied it in the hive-site.xml and restarted the shell. When i execute the same query i can see the results.

scala> val sample08 = sqlContext.sql("select * from sample_08")
sample08: org.apache.spark.sql.DataFrame = [code: string, description: string, total_emp: int, salary: int]

How to use custom delimiter character while reading file in Spark

I wanted to figure out how to get spark to read text file and break it based on custom delimiter instead of '\n'. These are my notes on how to do that The Spark Input/Output is based on Mapreduce's InputFormat and OutputFormat. For example when you call SparkContext.textFile() it actually uses TextInputFormat for reading the file. Advantage of this approach is that you do everything that TextInputFormat does. For example by default when you use TextInputFormat to read file it will break the file into records based on \n character. But sometimes you might want to read the file using some other logic. Example i wanted to parse a book based on sentences instead of \n characters, so i looked into TextInputFormat code and i noticed that it takes textinputformat.record.delimiter configuration property that i could set with value equal to '.' and the TextInputFormat returns sentences instead of lines. This sample code shows how to do that Only change in this code is sparkContext.hadoopConfiguration.set("textinputformat.record.delimiter",".") that is setting up hadoop configuration property. When i used this code to parse 2city10.txt i noticed that it has 16104 lines of text but 6554 sentences.

Difference between reduce() and fold() method on Spark RDD

When you can call fold() method on the RDD it returns a different result than you normally expect, so i wanted to figure out how fold() method actually works so i built this simple application First thing that i do in the application is create a simple RDD with 8 values from 1 to 8 and divide it into 3 partitions sparkContext.parallelize(List(1,2,3,4,5,6,7,8),3). Then i am calling input.mapPartitions(mapPartition) to iterate through all the partitions in the RDD and printing records in them one by one. This shows that the RDD has 3 partitions and 1 and 2 are in first partitions 3,4,5 are in second partions and record 6,7,8 are in the third partitions. Then next step is to call input.reduce((x,y)=> add(x,y))) method that will invoke add reduce function on the RDD, as you can see the output. The reduce function simply starts calling add method first for first 2 records then it starts calling it with running count for rest of the elements in the RDD The last part is fold() method which i am calling with initial value of 10. As you can see from the output of fold() method, it first takes 10 as initial value and adds all the elements in single partitions to it. But then it also takes running counts across the RDDs adds 10 to it sums them up. Because of this, the result of fold() = (initial value * num of partitions +1) + sum of reduce

********************** mapPartitions *******************
[Stage 0:>(0 + 0) / 3]2016-02-17 10:22:13 DEBUG HelloSparkPartitions:63 - Inside mapPartition 
1
2
2016-02-17 10:22:13 DEBUG HelloSparkPartitions:63 - Inside mapPartition 
3
4
5
2016-02-17 10:22:13 DEBUG HelloSparkPartitions:63 - Inside mapPartition 
6
7
8
********************** reduce *******************
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 1, 2
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 3, 4
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 7, 5
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 3, 12
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 6, 7
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 13, 8
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 15, 21
input.reduce 36
********************** fold *******************
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 1
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 11, 2
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 13
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 3
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 13, 4
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 17, 5
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 23, 22
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 6
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 16, 7
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 23, 8
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 45, 31
input.fold 76

How to parse fillable PDF form in Java

I wanted to figure out how to parse a fillable PDF form in Java, so that i could do some processing on it. So i built this sample PDFFormParsingPOC project that uses Apache PDFBox library. This is simple java class that i built, in which i read the PDF file first and then parse it into PDDocument. Then i can get all the fields in the PDF form by calling PDDocument.getDocumentCatalog().getAcroForm().getFields() and start iterating through it. For every field that i find, first i try to figure out what is the type of the field and then use it to print the field with its name and value to console You can download the Apache PDFBox project and execute it by passing fully qualified name of the fillable PDF form and it will print out field name value pairs to console. If you dont have a pdf form already you can download Sample Fillable PDF Form