
How to use Apache Storm Trident API

Some time back I blogged about HelloWorld - Apache Storm Word Counter program, which demonstrates how to build a WordCount program using Apache Storm. The problem with that project was that it was not a Maven project. Instead, I had a screenshot of all the jars that you had to include in the program. So I changed it to use Apache Maven as the build framework. You can download the source code. In addition to the normal API, Storm also provides the Trident API, which lets you write much more compact code. I wanted to try that out, so I built this simple Word Count program using the Trident API. With the Trident API you start by creating a TridentTopology object. You still need the LineReaderSpout, which takes a file path as input and reads and emits the file one line at a time. What is different is that you don't need WordSpitterBolt and WordCounterBolt; instead you can write compact code like this:

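// topology is the TridentTopology; lineReaderSpout emits tuples with a single "line" field
topology.newStream("spout", lineReaderSpout)
    .each(new Fields("line"), new Split(), new Fields("word"))          // one tuple per word
    .groupBy(new Fields("word"))                                        // group tuples by word
    .aggregate(new Fields("word"), new Count(), new Fields("count"))    // count each group
    .each(new Fields("word", "count"), new Debug());                    // print {word, count}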
The each(new Fields("line"), new Split(), new Fields("word")) call takes each line emitted by the LineReaderSpout, uses the built-in storm.trident.operation.builtin.Split function to split it into words, and emits each word as a tuple. The groupBy(new Fields("word")) call groups those tuples by word. The aggregate(new Fields("word"), new Count(), new Fields("count")) call counts the tuples in each group using the storm.trident.operation.builtin.Count class, so at this point you have tuples of the form {word, count}. Note that aggregate computes these counts within each batch of tuples; to keep running totals across batches, Trident provides persistentAggregate. The last part, .each(new Fields("word","count"), new Debug()), prints each tuple in word/count format. The Trident API ships with a set of ready-made classes that make writing a WordCount-type program very easy, but even if you wrote your own versions of Split and Count, the code would still be significantly more compact.
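To see what that chain computes, here is a plain-Java sketch that uses java.util.stream instead of Trident; no Storm classes are involved. The hypothetical countBatch method plays the roles of Split, groupBy and Count for one batch of lines, and the hypothetical runningTotals method adds the per-batch results together, which approximates what persistentAggregate would maintain, since aggregate on its own counts each batch in isolation. The class name and sample lines are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical plain-Java illustration of the Trident chain; no Storm classes are used.
public class TridentChainSketch {

    // One batch: split lines into words, group by word, count each group
    // (the roles played by Split, groupBy and Count in the Trident code).
    static Map<String, Long> countBatch(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))  // one element per word
                .filter(word -> !word.isEmpty())                    // drop empty tokens
                .collect(Collectors.groupingBy(
                        Function.identity(),                        // group key: the word
                        TreeMap::new,                               // sorted for readable output
                        Collectors.counting()));                    // value: the count
    }

    // Counts each batch separately (as aggregate does) and also sums the
    // per-batch counts, approximating the totals persistentAggregate would keep.
    static Map<String, Long> runningTotals(List<List<String>> batches) {
        Map<String, Long> totals = new TreeMap<>();
        for (List<String> batch : batches) {
            Map<String, Long> perBatch = countBatch(batch);
            // Print each {word, count} pair of this batch, similar in spirit to Debug.
            perBatch.forEach((word, count) -> System.out.println(word + " " + count));
            perBatch.forEach((word, count) -> totals.merge(word, count, Long::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        List<List<String>> batches = List.of(
                List.of("the quick brown fox", "the lazy dog"),
                List.of("the end"));
        System.out.println("totals: " + runningTotals(batches));
    }
}
```

With the two sample batches in main, the word "the" is counted 2 in the first batch and 1 in the second, while its running total is 3. That difference is exactly the per-batch versus cross-batch distinction between aggregate and persistentAggregate.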

HelloWorld - Apache Storm Word Counter program

I wanted to learn Apache Storm, so I started by creating this simple Hello World-style word counter program. The basic idea is that the program takes a .txt file as input and passes it to LineReaderSpout.java, which reads the file one line at a time and passes each line to Storm for further processing. Storm passes each line to WordSpitterBolt.java, which is responsible for splitting the line into words and passing them back to Storm for further processing. The last part is WordCounterBolt.java, which takes each word and maintains a HashMap of words and their frequency counts. At the end, WordCounterBolt.java prints all the words to the console. You can download the project from here.
  • First download the Apache Storm binaries from storm-project.net. In my case I downloaded storm-0.9.0.1. Extract the contents into a local directory.
  • Create a HelloStorm project in Eclipse and add all the .jar files in the <STORM_HOME> directory, as well as all the jars from the <STORM_HOME>/lib directory.
  • Now create LineReaderSpout.java, implementing the IRichSpout interface. The LineReaderSpout code has three important methods:
    1. open(): This method is called at the start and gives you context information. Here you read the value of the inputFile configuration variable and open that file.
    2. nextTuple(): This method lets you pass one tuple at a time to Storm for processing. In this method I just read one line from the file and emit it as a tuple.
    3. declareOutputFields(): This method declares that LineReaderSpout emits tuples with a single line field.
  • Next create WordSpitterBolt.java, which implements the IRichBolt interface. Its important methods are:
    1. prepare(): This method is similar to the open() method in LineReaderSpout. It lets you initialize your code and gives you access to the OutputCollector object for passing output back to Storm.
    2. declareOutputFields(): Like declareOutputFields() in LineReaderSpout, this method declares that the bolt emits word tuples for further processing.
    3. execute(): This is where you implement the business logic of your bolt. In this case I split the input line into words and pass them back to Storm for further processing.
  • Next create WordCounterBolt.java, which also implements the IRichBolt interface. Its important methods are:
    1. prepare(): In this method I create a HashMap that maps each word to its frequency count.
    2. declareOutputFields(): This method is empty because this bolt does not emit any tuples for further processing.
    3. execute(): This method updates the HashMap, incrementing the frequency count for each word it receives.
    4. cleanup(): This method is called at the end, and I use it to print all the words with their frequencies. Storm only guarantees that cleanup() is called when a topology is killed in local mode.
  • HelloStorm.java is where everything gets tied together: it specifies how the spouts and bolts are connected to each other.
  • Now run HelloStorm.java and pass it the name of the text file whose words you want to count.
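The real classes depend on the Storm jars, so the lifecycle described in the steps above is easiest to see in a plain-Java sketch with stand-ins. LineReader, WordSplitter, WordCounter and HelloStormSketch below are hypothetical classes, not Storm's IRichSpout or IRichBolt; they only mimic the call order (open, then nextTuple until the input runs out; prepare, then execute for every word, then cleanup). As a further assumption, the input comes from a string rather than from the inputFile configuration value.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-ins that mimic the lifecycle of LineReaderSpout, WordSpitterBolt
// and WordCounterBolt. These are NOT Storm's IRichSpout/IRichBolt interfaces.
public class HelloStormSketch {

    // Like LineReaderSpout: open() prepares the input, nextTuple() emits one line per call.
    static class LineReader {
        private BufferedReader reader;

        void open(String text) {  // assumption: text instead of the inputFile config value
            reader = new BufferedReader(new StringReader(text));
        }

        // Emits the next line and returns true, or returns false once input is exhausted.
        boolean nextTuple(Consumer<String> emit) {
            try {
                String line = reader.readLine();
                if (line == null) {
                    return false;
                }
                emit.accept(line);
                return true;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Like WordSpitterBolt.execute(): split the line and emit each non-empty word.
    static class WordSplitter {
        void execute(String line, Consumer<String> emit) {
            for (String word : line.split(" ")) {
                String trimmed = word.trim();
                if (!trimmed.isEmpty()) {
                    emit.accept(trimmed);
                }
            }
        }
    }

    // Like WordCounterBolt: prepare() creates the HashMap, execute() increments a
    // word's count, cleanup() prints the final counts (HashMap order is unspecified).
    static class WordCounter {
        private Map<String, Integer> counters;

        void prepare() {
            counters = new HashMap<>();
        }

        void execute(String word) {
            counters.merge(word, 1, Integer::sum);
        }

        Map<String, Integer> cleanup() {
            counters.forEach((word, count) -> System.out.println(word + " : " + count));
            return counters;
        }
    }

    // Like HelloStorm: connect spout -> splitter -> counter and run until input ends.
    static Map<String, Integer> run(String text) {
        LineReader spout = new LineReader();
        WordSplitter splitter = new WordSplitter();
        WordCounter counter = new WordCounter();
        spout.open(text);
        counter.prepare();
        boolean more = true;
        while (more) {
            // Each call pushes one line through the splitter into the counter.
            more = spout.nextTuple(line -> splitter.execute(line, counter::execute));
        }
        return counter.cleanup();
    }

    public static void main(String[] args) {
        run("storm is fast\nstorm is simple");
    }
}
```

For the input in main, "storm" and "is" are counted twice and "fast" and "simple" once each; because the counts live in a HashMap, the print order is not fixed. In Storm itself the connections are declared in HelloStorm.java and tuples travel through the collectors rather than through direct method calls as in this sketch.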