How to access Hive table from Spark in MapR sandbox

I was trying to figure out how to query a Hive table from Spark in the MapR 5.1 sandbox. I started spark-shell and tried to query the sample_08 table, but I got an error saying that no such table exists:

scala> val sample08 = sqlContext.sql("select * from sample_08")
org.apache.spark.sql.AnalysisException: no such table sample_08; line 1 pos 14
 at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:260)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:268)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:264)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
 at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:279)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:264)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:254)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
 at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
 at scala.collection.immutable.List.foldLeft(List.scala:84)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
 at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:932)
 at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:932)
 at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:930)
 at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
 at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
 at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:741)
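 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:19)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
 at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
 at $iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
 at $iwC$$iwC$$iwC.<init>(<console>:32)
 at $iwC$$iwC.<init>(<console>:34)
 at $iwC.<init>(<console>:36)
 at <init>(<console>:38)
 at .<init>(<console>:42)
 at .<clinit>(<console>)
 at .<init>(<console>:7)
 at .<clinit>(<console>)
 at $print(<console>)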
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
 at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
 at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
 at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
 at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
 at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
 at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
 at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
 at org.apache.spark.repl.Main$.main(Main.scala:31)
 at org.apache.spark.repl.Main.main(Main.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
 at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
When I checked the <SPARK_HOME>/conf directory, I noticed that hive-site.xml was missing. I searched for hive-site.xml on the cluster and found two copies. The one at /opt/mapr/hive/hive-1.2/conf/hive-site.xml had the hive.metastore.uris property pointing to thrift://localhost:9083, so I copied it into <SPARK_HOME>/conf and restarted the shell. Without that file Spark does not know where the Hive metastore is, so it cannot see the tables defined in Hive. When I ran the same query again, it returned results:

scala> val sample08 = sqlContext.sql("select * from sample_08")
sample08: org.apache.spark.sql.DataFrame = [code: string, description: string, total_emp: int, salary: int]
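
A quick way to confirm that the shell is now talking to the Hive metastore is to list the tables it can see before querying. This is only a sketch to paste into spark-shell: it assumes the predefined sqlContext of a Spark 1.x shell built with Hive support, and the table name is the one from the query above.

```scala
// Paste into spark-shell, where `sqlContext` is already defined (Spark 1.x).

// Shows which context class the shell created; a Hive-enabled build
// is expected to report a HiveContext here.
println(sqlContext.getClass.getName)

// Lists the tables visible through the metastore configured in hive-site.xml.
sqlContext.tableNames().foreach(println)

// Same query as above, followed by a look at the schema and a few rows.
val sample08 = sqlContext.sql("select * from sample_08")
sample08.printSchema()
sample08.show(5)
```

If sample_08 is missing from the list, the shell is most likely still using a local metastore rather than the one named in hive.metastore.uris.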

How to use custom delimiter character while reading file in Spark

I wanted to figure out how to get Spark to read a text file and break it into records based on a custom delimiter instead of '\n'. These are my notes on how to do that.

Spark input/output is based on MapReduce's InputFormat and OutputFormat. For example, when you call SparkContext.textFile() it actually uses TextInputFormat to read the file. The advantage of this approach is that you can do everything that TextInputFormat does. By default TextInputFormat breaks the file into records at every \n character, but sometimes you want to split the file using some other logic.

In my case I wanted to parse a book into sentences instead of lines. I looked into the TextInputFormat code and noticed that it reads the textinputformat.record.delimiter configuration property. If I set it to '.', TextInputFormat returns sentences instead of lines. The only change compared to a normal Spark program is the call sparkContext.hadoopConfiguration.set("textinputformat.record.delimiter",".") which sets the Hadoop configuration property before the file is read. When I used this to parse 2city10.txt, I found that it has 16104 lines of text but 6554 sentences.
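
A minimal sketch of such a driver is shown here. The object name SentenceCount and the use of args(0) for the input path are assumptions made for illustration; the configuration call is the one described above.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver name; the master is expected to come from spark-submit.
object SentenceCount {
  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext(new SparkConf().setAppName("SentenceCount"))

    // Tell TextInputFormat to end a record at '.' instead of '\n'.
    // This must be set before textFile() is called.
    sparkContext.hadoopConfiguration.set("textinputformat.record.delimiter", ".")

    // Each element of the RDD is now the text between two '.' characters.
    val sentences = sparkContext.textFile(args(0))
    println("Number of records: " + sentences.count())

    sparkContext.stop()
  }
}
```

Note that every '.' ends a record, so abbreviations and decimal numbers are split as well; the count is an approximation of the number of sentences.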

Difference between reduce() and fold() method on Spark RDD

When you call the fold() method on an RDD it returns a different result than you might expect, so I wanted to figure out how fold() actually works and built this simple application.

The first thing the application does is create a simple RDD with 8 values from 1 to 8 and divide it into 3 partitions: sparkContext.parallelize(List(1,2,3,4,5,6,7,8),3). Then it calls input.mapPartitions(mapPartition) to iterate through all the partitions in the RDD and print the records in them one by one. This shows that the RDD has 3 partitions: 1 and 2 are in the first partition, 3, 4 and 5 are in the second partition, and 6, 7 and 8 are in the third partition.

The next step is to call input.reduce((x,y) => add(x,y)), which invokes the add function on the RDD. As you can see in the output, within each partition reduce calls add for the first 2 records and then keeps calling it with the running total and the next element. Finally it merges the per-partition totals (3, 12 and 21) into the result 36.

The last part is the fold() method, which I call with an initial value of 10. As you can see from the output, within every partition fold() starts from 10 and adds all the elements of that partition to it, giving 13, 22 and 31. Then it merges the per-partition results, again starting from the initial value of 10. Because of this, the result of fold() = initial value * (number of partitions + 1) + result of reduce, which here is 10 * (3 + 1) + 36 = 76.

********************** mapPartitions *******************
[Stage 0:>(0 + 0) / 3]2016-02-17 10:22:13 DEBUG HelloSparkPartitions:63 - Inside mapPartition 
1
2
2016-02-17 10:22:13 DEBUG HelloSparkPartitions:63 - Inside mapPartition 
3
4
5
2016-02-17 10:22:13 DEBUG HelloSparkPartitions:63 - Inside mapPartition 
6
7
8
********************** reduce *******************
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 1, 2
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 3, 4
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 7, 5
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 3, 12
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 6, 7
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 13, 8
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 15, 21
input.reduce 36
********************** fold *******************
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 1
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 11, 2
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 13
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 3
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 13, 4
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 17, 5
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 23, 22
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 6
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 16, 7
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 23, 8
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 45, 31
input.fold 76
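
The arithmetic in the log can be reproduced without a cluster. The sketch below is plain Scala, not Spark code: foldLikeRdd is a hypothetical helper that only imitates the two-level folding seen in the output, using the same partition layout that mapPartitions printed.

```scala
object FoldSimulation {
  // Imitates what the log shows for RDD.fold: fold every partition starting
  // from zero, then fold the per-partition results starting from zero again.
  def foldLikeRdd(partitions: Seq[Seq[Int]], zero: Int)(op: (Int, Int) => Int): Int = {
    val perPartition = partitions.map(_.foldLeft(zero)(op)) // 13, 22, 31 for zero = 10
    perPartition.foldLeft(zero)(op)                         // 10 + 13 + 22 + 31
  }

  // Same layout as printed by mapPartitions above.
  val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6, 7, 8))

  val reduced: Int = partitions.flatten.reduce(_ + _)
  val folded: Int = foldLikeRdd(partitions, 10)(_ + _)

  def main(args: Array[String]): Unit = {
    println("reduce = " + reduced)
    println("fold   = " + folded)
    println("zero * (partitions + 1) + reduce = " + (10 * (partitions.size + 1) + reduced))
  }
}
```

The formula only holds for addition. In general the initial value should be the neutral element of the operation (0 for a sum), in which case fold() and reduce() agree.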

How to parse fillable PDF form in Java

I wanted to figure out how to parse a fillable PDF form in Java so that I could do some processing on it, so I built this sample PDFFormParsingPOC project that uses the Apache PDFBox library.

It is a simple Java class that first reads the PDF file and parses it into a PDDocument. Then it gets all the fields in the PDF form by calling PDDocument.getDocumentCatalog().getAcroForm().getFields() and iterates through them. For every field it finds, it first figures out the type of the field and then prints the field with its name and value to the console.

You can download the sample project and execute it by passing the fully qualified name of the fillable PDF form, and it will print the field name/value pairs to the console. If you don't have a PDF form already, you can download the Sample Fillable PDF Form.
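
The same steps can be sketched in Scala, to stay consistent with the other examples here. This is not the original Java class: it assumes PDFBox 2.x on the classpath (PDDocument.load(File) and getValueAsString are 2.x calls), and the object name PdfFormFields is made up for illustration.

```scala
import java.io.File
import scala.collection.JavaConverters._
import org.apache.pdfbox.pdmodel.PDDocument

// Hypothetical name; prints type, name and value of each top-level form field.
object PdfFormFields {
  def main(args: Array[String]): Unit = {
    val document = PDDocument.load(new File(args(0)))
    try {
      // getAcroForm returns null when the PDF has no fillable form.
      Option(document.getDocumentCatalog.getAcroForm) match {
        case Some(form) =>
          for (field <- form.getFields.asScala) {
            println(field.getFieldType + " " + field.getFullyQualifiedName + " = " + field.getValueAsString)
          }
        case None =>
          println("No fillable form found in " + args(0))
      }
    } finally {
      document.close()
    }
  }
}
```

getFields only returns the top-level fields, so a form with nested field groups would need a recursive walk.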

Invoking Python from Spark Scala project

When you're developing your Spark code, you have the option of using Scala, Java or Python. In some cases you might want to mix languages. I wanted to try that out, so I built this simple Spark program that passes control to Python to perform a transformation (all it does is prepend the word "Python " to every line). The source code for the sample project is available for download.

The first thing I did was develop a simple Python script, echo.py, that reads one line at a time from standard input, prepends "Python " to the line and writes it to standard output.

In the driver most of the Spark code is the same as usual. The only difference is lines.pipe("python echo.py"), which passes every line in the RDD to python echo.py and collects the output. There is nothing specific to Python here; you could use any executable instead.

When you run this code in a cluster, copy the Python file to your machine, say into the Spark directory, and then execute:

bin/spark-submit \
    --files echo.py \
    ScalaPython-1.0-SNAPSHOT-jar-with-dependencies.jar helloworld.txt
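
For reference, a driver along these lines could look like the sketch below. The object name ScalaPython is guessed from the jar name and the input path is taken from the first argument, as in the command above; both are assumptions, not the original source.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver; the master is expected to come from spark-submit.
object ScalaPython {
  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext(new SparkConf().setAppName("ScalaPython"))

    val lines = sparkContext.textFile(args(0))

    // pipe() starts the command once per partition, writes every element of the
    // partition to its standard input (one per line) and turns every line the
    // command prints on standard output into an element of the resulting RDD.
    val piped = lines.pipe("python echo.py")

    piped.collect().foreach(println)
    sparkContext.stop()
  }
}
```

The --files option ships echo.py with the job so that the relative path in the pipe() command can be resolved where the tasks run.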

How to use HBase sink with Flume

I wanted to figure out how to use HBase as a target for Flume, so I created this sample configuration, which reads events from netcat and writes them to HBase.
  1. The first step is to create a test table in HBase with CF1 as its column family. Every time Flume gets an event, it will write it to the test table in the CF1 column family
    
    create 'test','CF1'
    
  2. Create a Flume configuration file (conf/netcat-hbase.properties in the command below) that defines a netcat source and an HBase sink with SimpleHbaseEventSerializer as the event serializer. Note that I am assuming an unsecured cluster (sandbox); if you have a secured cluster, follow the steps in Configure a Secure HBase Sink
  3. Start the Flume agent with the following command
    
    bin/flume-ng agent --conf conf --conf-file conf/netcat-hbase.properties --name agent1 -Dflume.root.logger=DEBUG,console
    
  4. Now open a netcat client on port 44444 and send some messages to Flume
  5. If you query the HBase test table, you should see the messages that were sent through netcat

Reading content of file into String in scala

A common requirement is to read the content of a file into a String. At runtime you would typically read a config file at a particular path, but during testing you would want to read a file from the classpath. I built this simple class that has the following 2 methods
  1. getFilePathContent(): This method takes the full path of a file and reads its content into a String
  2. getResourceContent(): This method takes the relative path of a file already available on the classpath and reads its content into a String
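
A minimal sketch of such a class is shown below. The two method names come from the list above; the object name FileContentReader and the choice of UTF-8 as the encoding are assumptions.

```scala
import scala.io.Source

// Hypothetical object name; the method names follow the description above.
object FileContentReader {

  // Reads a file at an absolute or relative file system path into a String.
  def getFilePathContent(path: String): String = {
    val source = Source.fromFile(path, "UTF-8")
    try source.mkString finally source.close()
  }

  // Reads a file that is available on the classpath into a String.
  def getResourceContent(resourcePath: String): String = {
    val stream = getClass.getClassLoader.getResourceAsStream(resourcePath)
    require(stream != null, "Resource not found on classpath: " + resourcePath)
    val source = Source.fromInputStream(stream, "UTF-8")
    try source.mkString finally source.close()
  }
}
```

With the class loader lookup used here the resource path has no leading slash, for example getResourceContent("config/app.conf") for a file packaged under config/.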

Hello Apache Tika

Apache Tika is a nice framework that lets you extract the content of a file. For example, you can extract the content of a PDF, Word document or Excel file as a string. It also lets you extract metadata about the file, such as when it was created and who the author is. I built this sample application to play with Tika. You can try it by giving it the full path of the file that you want to extract.
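
As a rough idea of what such an application involves, here is a sketch in Scala that uses the Tika facade class. It assumes the Tika parsers are on the classpath, and the object name HelloTika is made up; it is not the original sample.

```scala
import java.io.FileInputStream
import org.apache.tika.Tika
import org.apache.tika.metadata.Metadata

// Hypothetical name; prints the extracted text followed by the metadata.
object HelloTika {
  def main(args: Array[String]): Unit = {
    val tika = new Tika()
    val metadata = new Metadata()

    // parseToString detects the file type, extracts the text and fills in
    // the metadata object as a side effect.
    val stream = new FileInputStream(args(0))
    val text = try tika.parseToString(stream, metadata) finally stream.close()

    println(text)
    metadata.names().sorted.foreach { name =>
      println(name + " = " + metadata.get(name))
    }
  }
}
```

By default the facade truncates the extracted text after a fixed number of characters; setMaxStringLength on the Tika object changes that limit.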

Flume to Spark Streaming - Pull model

In this post I will demonstrate how to stream data from Flume into Spark using Spark Streaming. When it comes to streaming data from Flume to Spark you have 2 options.
  1. Push Model: Spark listens on a particular port for Avro events, and Flume connects to that port and publishes events
  2. Pull Model: You use a special Spark sink in Flume that keeps collecting published data, and Spark pulls that data at a certain frequency
I built a simple configuration in which I could send events to Flume over netcat, and Flume would take those events and send them to Spark as well as print them to the console.
  • First download spark-streaming-flume-sink_2.10-1.6.0.jar and copy it to the flume/lib directory
  • Next create a Flume configuration in which Flume listens for netcat events on port 44444 and replicates every event to both the logger sink and the Spark sink. The Spark sink listens on port 9999 for the Spark program to connect
  • Then write the Spark driver. The Spark Flume listener gets events in Avro format, so you have to call event.getBody().array() to get the content of the event
Once your Spark and Flume agents are started, open netcat on port 44444 and send messages. Those messages should appear in your Spark console.
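
A driver for the pull model can be sketched as follows. It assumes the spark-streaming-flume artifact is on the classpath, that the Spark sink runs on localhost port 9999 as described above, and a 5 second batch interval; the object name and the interval are illustrative choices.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Hypothetical driver name; host and port match the Spark sink described above.
object FlumePullStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumePullStream")
    val streamingContext = new StreamingContext(conf, Seconds(5))

    // Pull model: Spark connects to the Spark sink inside the Flume agent
    // and polls it for the events collected since the last batch.
    val events = FlumeUtils.createPollingStream(streamingContext, "localhost", 9999)

    // Every element wraps an Avro event; the payload is in its body buffer.
    events.map(e => new String(e.event.getBody.array(), "UTF-8")).print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
```

When running locally, the master needs at least two threads (for example local[2]), because the receiver occupies one of them.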

Setting up local repository for maven

A couple of days ago I was working with a colleague on setting up a cluster in AWS for a Spark lab. One problem we ran into is that every time you start a Spark build it downloads a bunch of dependencies (in our case around 200 MB, mostly because of the complexity of our dependencies). If every student had to download all the dependencies, it would take a lot of time and cost money for network bandwidth. We solved this by first running the Maven build for the first user, user01. Once that build worked, we copied the /user/user01/.m2/repository folder to the /opt/mavenrepo directory. Then every time another user ran the Maven build, they pointed it to that existing directory on the machine and used the dependencies that were already downloaded.

mvn package -Dmaven.repo.local=/opt/mavenrepo/repository