How to use HBase sink with Flume

I wanted to figure out how to use HBase as a target for Flume, so I created this sample configuration, which reads events from netcat and writes them to HBase.
  1. The first step is to create a test table in HBase with CF1 as the column family. Every time Flume receives an event, it will write it to the test table in the CF1 column family
    
    create 'test','CF1'
    
  2. Create a Flume configuration file that looks like this. I am using the HBase sink with SimpleHbaseEventSerializer as the event serializer. Note that I am assuming this is an unsecured cluster (Sandbox); if you have a secured cluster, you should follow the steps mentioned in Configure a Secure HBase Sink
  3. Start the Flume agent with the following command
    
    bin/flume-ng agent --conf conf --conf-file conf/netcat-hbase.properties --name agent1 -Dflume.root.logger=DEBUG,console
    
  4. Now open a netcat client on port 44444 and send some messages to Flume
  5. If you query the HBase test table, you should see the messages that were published to netcat
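The steps above can be sketched as a configuration along these lines. The agent, source, and channel names here are my own choices, and the file is a minimal sketch rather than the exact configuration from the post; adjust host, port, and table names to your setup:

```properties
# netcat-hbase.properties -- sketch of a netcat -> HBase flow
agent1.sources = netcat-source
agent1.channels = memory-channel
agent1.sinks = hbase-sink

agent1.sources.netcat-source.type = netcat
agent1.sources.netcat-source.bind = localhost
agent1.sources.netcat-source.port = 44444
agent1.sources.netcat-source.channels = memory-channel

agent1.channels.memory-channel.type = memory

agent1.sinks.hbase-sink.type = hbase
agent1.sinks.hbase-sink.table = test
agent1.sinks.hbase-sink.columnFamily = CF1
agent1.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
agent1.sinks.hbase-sink.channel = memory-channel
```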

Reading the content of a file into a String in Scala

One of the common requirements is to read the content of a file into a String. You might want to read the content of a config file at a particular path at runtime, but during testing you would want to read the content of a file on the classpath. I built this simple class that has the following two methods
  1. getFilePathContent(): This method takes the full path of a file and reads its content into a String
  2. getResourceContent(): This method takes the relative path of a file already available on the classpath and reads its content into a String
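The two methods can be sketched with scala.io.Source. The method names follow the description above; the object name and implementation details are my assumptions:

```scala
import scala.io.Source

object FileUtils {
  // getFilePathContent(): read the file at an absolute path into a String
  def getFilePathContent(path: String): String = {
    val source = Source.fromFile(path)
    try source.mkString finally source.close()
  }

  // getResourceContent(): read a file already on the classpath into a String
  def getResourceContent(relativePath: String): String = {
    val stream = getClass.getResourceAsStream(relativePath)
    try Source.fromInputStream(stream).mkString finally stream.close()
  }
}
```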

Hello Apache Tika

Apache Tika is a nice framework that lets you extract the content of a file. For example, you can extract the content of a PDF, Word document, or Excel file as a String. It also lets you extract metadata about the file, such as when it was created, the author, etc. I built this sample application to play with Tika. You can try it by giving it the full path of the file whose content you want to extract.
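A minimal sketch of such an application, assuming the Tika jars are on the classpath (the object name is mine, and this is not the exact code from the post):

```scala
import java.io.{File, FileInputStream}
import org.apache.tika.Tika
import org.apache.tika.metadata.Metadata

object HelloTika {
  def main(args: Array[String]): Unit = {
    val file = new File(args(0))
    val tika = new Tika()

    // Extract the body of the document (PDF, Word, Excel, ...) as a String
    println(tika.parseToString(file))

    // Extract metadata; parse() fills the Metadata object as a side effect
    val metadata = new Metadata()
    val stream = new FileInputStream(file)
    try tika.parse(stream, metadata) finally stream.close()
    metadata.names().foreach(name => println(s"$name = ${metadata.get(name)}"))
  }
}
```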

Flume to Spark Streaming - Pull model

In this post I will demonstrate how to stream data from Flume into Spark using Spark Streaming. When it comes to streaming data from Flume to Spark, you have two options.
  1. Push model: Spark listens on a particular port for Avro events, and Flume connects to that port and publishes events
  2. Pull model: you use a special Spark sink in Flume that keeps collecting published data, and Spark pulls that data at a certain frequency
I built this simple configuration in which I could send events to Flume over netcat; Flume takes those events and sends them to Spark as well as printing them to the console.
  • First, download spark-streaming-flume-sink_2.10-1.6.0.jar and copy it to the flume/lib directory
  • Next, create a Flume configuration that looks like this. As you can see, Flume is listening for netcat events on port 44444, and it takes every event and replicates it to both the logger and the Spark sink. The Spark sink listens on port 9999 for the Spark program to connect
  • This is how your Spark driver will look. The Spark Flume listener gets events in Avro format, so you will have to call event.getBody().array() to get the event body.
Once your Spark and Flume agents are started, open netcat on port 44444 and send messages; those messages should appear in your Spark console.
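The pull-model driver can be sketched as follows, using FlumeUtils.createPollingStream from the spark-streaming-flume artifact. The object name, host name, and batch interval are my assumptions, not the exact code from the post:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePullDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumePullDriver")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Pull events from the Spark sink that Flume exposes on port 9999
    val stream = FlumeUtils.createPollingStream(ssc, "localhost", 9999, StorageLevel.MEMORY_ONLY)

    // Events arrive wrapped in Avro, so decode the body explicitly
    stream.map(event => new String(event.event.getBody.array())).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```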

Setting up a local repository for Maven

A couple of days ago I was working with my colleague on setting up a cluster in AWS for a Spark lab. One problem we ran into is that every time you start a Spark build, it downloads a bunch of dependencies (in our case around 200 MB, mostly because of the complexity of our dependencies). We figured that if every student had to download all the dependencies, it would take a lot of time and cost money in network bandwidth. The way we ended up solving this issue was to first run the Maven build for the first user, say user01. Once that build worked, we copied the /user/user01/.m2/repository folder to the /opt/mavenrepo directory. Then, every time another user ran the Maven build, they pointed to the existing directory on that machine and used the dependencies that were already downloaded.

mvn package -Dmaven.repo.local=/opt/mavenrepo/repository
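The whole workflow can be sketched as below. The paths follow the post; whether you need sudo depends on your machine setup:

```shell
# run the build once as the first user to populate the local repository
mvn package

# publish that user's cache to a shared location on the machine
sudo cp -r /user/user01/.m2/repository /opt/mavenrepo/

# every subsequent user points Maven at the shared cache
mvn package -Dmaven.repo.local=/opt/mavenrepo/repository
```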

Monitoring HDFS directory for new files using Spark Streaming

I wanted to build a simple Spark Streaming application that monitors a particular directory in HDFS and, whenever a new file shows up, prints its content to the console. I built this HDFSFileStream.scala. In this program, after creating a StreamingContext, I call sparkStreamingContext.textFileStream(<directoryName>) on it. Once a new file appears in the directory, fileRDD.count() returns more than 0, and then I invoke processNewFile(). The processNewFile() method takes an RDD[String], iterates through the file content, and prints it to the console. Next, start the program by executing the following command:

bin/spark-submit ~/HelloSparkStreaming-1.0-SNAPSHOT-jar-with-dependencies.jar /user/mapr/stream 3

Once streaming has started, it monitors the /user/mapr/stream directory for new content. I copied in a file with a few lines in it and got the following output, which is the content of the file.
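The structure described above can be sketched like this. It follows the description in the post, but the exact code of the original HDFSFileStream.scala may differ:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HDFSFileStream {
  // Print every line of a newly arrived file to the console
  def processNewFile(fileRDD: RDD[String]): Unit =
    fileRDD.collect().foreach(println)

  def main(args: Array[String]): Unit = {
    val Array(directory, batchSeconds) = args
    val conf = new SparkConf().setAppName("HDFSFileStream")
    val sparkStreamingContext = new StreamingContext(conf, Seconds(batchSeconds.toLong))

    // textFileStream() emits one RDD per batch with the lines of any new files
    val fileStream = sparkStreamingContext.textFileStream(directory)
    fileStream.foreachRDD { fileRDD =>
      if (fileRDD.count() > 0) processNewFile(fileRDD)
    }

    sparkStreamingContext.start()
    sparkStreamingContext.awaitTermination()
  }
}
```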

Problem with Scala version mismatch in a Spark application

I was developing a Spark program on my machine and it worked fine. But when I tried to deploy it on Spark running in my Hadoop sandbox, I started getting this error

java.lang.NoSuchMethodError: scala.runtime.IntRef.create(I)Lscala/runtime/IntRef;
 at com.spnotes.enrich.CSVFieldEnricher.enrich(CSVFieldEnricher.scala:31)
 at com.spnotes.PMDriver$$anonfun$1$$anonfun$apply$2.apply(PMDriver.scala:59)
 at com.spnotes.PMDriver$$anonfun$1$$anonfun$apply$2.apply(PMDriver.scala:58)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
 at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
 at com.spnotes.PMDriver$$anonfun$1.apply(PMDriver.scala:58)
 at com.spnotes.PMDriver$$anonfun$1.apply(PMDriver.scala:56)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1469)
 at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1006)
 at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1006)
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
16/01/05 13:03:53 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.NoSuchMethodError: scala.runtime.IntRef.create(I)Lscala/runtime/IntRef;
 at com.spnotes.enrich.CSVFieldEnricher.enrich(CSVFieldEnricher.scala:31)
 at com.spnotes.PMDriver$$anonfun$1$$anonfun$apply$2.apply(PMDriver.scala:59)
 at com.spnotes.PMDriver$$anonfun$1$$anonfun$apply$2.apply(PMDriver.scala:58)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
 at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
 at com.spnotes.PMDriver$$anonfun$1.apply(PMDriver.scala:58)
 at com.spnotes.PMDriver$$anonfun$1.apply(PMDriver.scala:56)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1469)
 at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1006)
 at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1006)
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
So it seems the problem is that you need to compile your code with the same version of Scala as the one used by Spark. In my case I was using Scala 2.11 to compile my code, while Spark 1.3.1 uses Scala 2.10.4. So I changed the build file, rebuilt the code, and deployed it. That fixed the issue.
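If you build with sbt, the fix amounts to pinning scalaVersion to the Scala release your Spark build uses. A minimal sketch, assuming an sbt build (the post does not show the actual build file):

```scala
// build.sbt -- compile with the same Scala version as the Spark 1.3.1 cluster
scalaVersion := "2.10.4"

// %% appends the Scala binary version (2.10) to the artifact name,
// so the Spark dependency stays consistent with the compiler
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"
```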

Spark error class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package

I had a Spark program that worked both from the IDE and when I built a .jar file and deployed it in Spark. But it suddenly stopped working in the IDE; whenever I tried executing it in the IDE, I got the following error

16/01/05 14:34:50 INFO SparkEnv: Registering OutputCommitCoordinator
Exception in thread "main" java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package
 at java.lang.ClassLoader.checkCerts(ClassLoader.java:895)
 at java.lang.ClassLoader.preDefineClass(ClassLoader.java:665)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:758)
 at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
 at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
 at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:136)
 at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:129)
 at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:98)
 at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:101)
 at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:92)
 at org.apache.spark.ui.WebUI.attachPage(WebUI.scala:78)
 at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:62)
 at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:62)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at org.apache.spark.ui.WebUI.attachTab(WebUI.scala:62)
 at org.apache.spark.ui.SparkUI.initialize(SparkUI.scala:50)
 at org.apache.spark.ui.SparkUI.<init>(SparkUI.scala:61)
 at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:151)
 at org.apache.spark.ui.SparkUI$.createLiveUI(SparkUI.scala:106)
 at org.apache.spark.SparkContext.<init>(SparkContext.scala:300)
 at com.mapr.QS.PMDriver$.main(PMDriver.scala:32)
 at com.mapr.QS.PMDriver.main(PMDriver.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:497)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

Process finished with exit code 1
So the problem, it seems, was that the order of javax.servlet:servlet-api.jar on the classpath was wrong. I opened the project settings and moved the dependency jar to the end of the list, and it started working. This is the screenshot of the IntelliJ settings.
This is a screenshot of how to achieve the same thing in Eclipse.
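If you hit the same clash in a Maven build rather than in the IDE, an alternative approach (not the one used in the post, which fixes the IDE classpath order) is to exclude the conflicting servlet-api artifact from the dependency that drags it in. A hedged sketch, where the Hadoop artifact and version are assumptions:

```xml
<!-- sketch: keep the signed servlet-api jar off the classpath -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.7.0</version>
  <exclusions>
    <exclusion>
      <groupId>javax.servlet</groupId>
      <artifactId>servlet-api</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```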

How to use Hadoop's InputFormat and OutputFormat in Spark

One of the things that I like about Spark is that it allows you to use your MapReduce-based InputFormats and OutputFormats for reading and writing data. To try this out, I built the InputFormatOutputDriver class, which uses KeyValueTextInputFormat for reading a file, then uses that input to perform a word count, and finally uses TextOutputFormat for storing the output. Most of the code is similar to the WordCount program built using Apache Spark in Java, with the difference that this is written in Scala, plus the following two lines. When you want to use the Hadoop API for reading data, you should use the sparkContext.newAPIHadoopFile() method; I am using the version of the method that takes four parameters. The first is the path of the input file; the second is the InputFormat class you want to use (I want to read the file as key-value pairs, so I am using KeyValueTextInputFormat); the next two are the types of the key and the value, which are Text for both in my example. Spark reads the file into a PairRDD[Text,Text]; since I am only interested in the content of the file, I iterate through the keys and convert them from Text to String

val lines = sparkContext.newAPIHadoopFile(inputFile,classOf[KeyValueTextInputFormat], 
classOf[Text],classOf[Text]).keys.map(lineText => lineText.toString)
Once I have an RDD[String], I can perform the word count with it. Once the results are ready, I call wordCountRDD.saveAsNewAPIHadoopFile() to store the data in Hadoop using TextOutputFormat.

wordCountRDD.saveAsNewAPIHadoopFile(outputFile,classOf[Text],classOf[IntWritable],
classOf[TextOutputFormat[Text,IntWritable]])
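The word-count step that connects the two snippets above can be sketched as follows. The variable names match the surrounding snippets, but this is my reconstruction, not the exact original code:

```scala
import org.apache.hadoop.io.{IntWritable, Text}

// Classic word count: split lines into words, emit (word, 1), sum the counts,
// then convert back to Writable types so TextOutputFormat can store them
val wordCountRDD = lines
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .map { case (word, count) => (new Text(word), new IntWritable(count)) }
```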

How to use ZooInspector

ZooKeeper ships with a ZooInspector GUI that you can use for inspecting your zNode structure. You can use it with these steps
  1. First, go to the ZooInspector directory (I am assuming that you already have ZooKeeper on your machine; if not, download it from the ZooKeeper home page)
    
    cd <ZOOKEEPER_HOME>/contrib/ZooInspector
    
  2. You can start ZooInspector with the following command, which makes sure that the necessary jars are on the classpath
    
    java -cp ../../lib/*:lib/*:zookeeper-3.4.7-ZooInspector.jar:../../zookeeper-3.4.7.jar org.apache.zookeeper.inspector.ZooInspector
    
  3. Once ZooInspector has started, enter the URL of the ZooKeeper server that you want to inspect
  4. ZooInspector will show you the zNode hierarchy on that server