HelloWorld - Apache Storm Word Counter program

I wanted to learn Apache Storm, so I started by creating this simple Hello World-style word counter program. The basic idea is that the program takes a .txt file as input and passes it to LineReaderSpout.java, which reads the file one line at a time and passes each line to Storm for further processing. Storm passes each line to WordSpitterBolt.java, which splits the line into words and passes them back to Storm for further processing. The last part is WordCounterBolt.java, which takes each word and maintains a HashMap of words and their frequency counts. At the end, WordCounterBolt.java prints all the words to the console. You can download the project from here.
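To make the data flow concrete, here is a minimal plain-Java sketch with no Storm dependency. It only mirrors what each component does, not how Storm schedules or distributes them. The class and method names are hypothetical, and splitting on whitespace is an assumption about how WordSpitterBolt breaks up a line. It needs Java 9 or later for List.of.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the LineReaderSpout -> WordSpitterBolt -> WordCounterBolt flow.
// No Storm classes are used; each method stands in for one component's role.
class WordCountPipelineSketch {

    // Role of WordSpitterBolt.execute(): split one line into words (whitespace split is assumed).
    static List<String> splitLine(String line) {
        List<String> words = new ArrayList<>();
        for (String w : line.trim().split("\\s+")) {
            if (!w.isEmpty()) { // a blank line splits into [""], so skip empty tokens
                words.add(w);
            }
        }
        return words;
    }

    // Role of WordCounterBolt.execute(): add one word to the running counts.
    static void countWord(Map<String, Integer> counts, String word) {
        counts.merge(word, 1, Integer::sum);
    }

    // Role of LineReaderSpout plus the topology wiring: push every line through both steps.
    static Map<String, Integer> run(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted so the printout is stable
        for (String line : lines) {
            for (String word : splitLine(line)) {
                countWord(counts, word);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("the quick fox", "the lazy dog");
        // Role of WordCounterBolt.cleanup(): print every word with its count.
        run(lines).forEach((word, count) -> System.out.println(word + " : " + count));
    }
}
```

In the real topology the three roles run as separate spout and bolt instances and exchange tuples through Storm; the sketch collapses them into nested loops only to show what happens to each line.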
  • First, download the Apache Storm binaries from storm-project.net. In my case I downloaded storm-0.9.0.1. Extract the contents into a local directory.
  • Create a HelloStorm project in Eclipse and add all the .jar files in the <STORM_HOME> directory, as well as all the jars from the <STORM_HOME>/lib directory. It should look like this
  • Now create LineReaderSpout.java, which implements the IRichSpout interface, like this. The LineReaderSpout code has three important methods:
    1. open(): This method is called at startup and gives you context information. Here you read the value of the inputFile configuration variable and open that file.
    2. nextTuple(): This method lets you pass one tuple at a time to Storm for processing. In this method I read one line from the file and emit it as a tuple.
    3. declareOutputFields(): This method declares that LineReaderSpout emits a line tuple.
  • Next create WordSpitterBolt.java, which implements the IRichBolt interface, like this:
    1. prepare(): This method is similar to the open() method in LineReaderSpout. It lets you initialize your code and gives you access to the OutputCollector object for passing output back to Storm.
    2. declareOutputFields(): This method is similar to the declareOutputFields() method in LineReaderSpout. It declares that this bolt returns a word tuple for further processing.
    3. execute(): This is the method where you implement the business logic of your bolt. In this case I split the input line into words and pass them back to Storm for further processing.
  • Next create WordCounterBolt.java, which implements the IRichBolt interface, like this. It has four important methods:
    1. prepare(): In this method I create a HashMap that maps each word to its frequency count.
    2. declareOutputFields(): This method is empty because we don't want to emit any tuples for further processing.
    3. execute(): This method builds and maintains the HashMap that counts the frequency of each word.
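    4. cleanup(): This method is called at the end, and we use it to print all the words with their frequencies. Storm only guarantees that cleanup() runs in local mode; on a real cluster, worker processes can be killed without it being called.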
  • HelloStorm.java is where everything gets tied together: it specifies how the spouts and bolts are connected to each other.
  • Now execute HelloStorm.java and pass it the name of the text file whose words you want to count.
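The spout lifecycle described in the steps above (open() called once, then nextTuple() called over and over, one line per call) can be sketched in plain Java without Storm on the classpath. LineSpoutSketch and its method shapes are hypothetical stand-ins, not Storm's IRichSpout signatures, and the completed flag is one assumed way to handle the end of the file: once the reader returns null, nextTuple() simply returns without emitting, because Storm keeps calling it. Storm's ISpout documentation also suggests sleeping briefly when there is nothing to emit, which this sketch leaves out.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch (no Storm) of the spout contract: open() once, then repeated
// nextTuple() calls that each emit at most one line. Hypothetical shapes, not Storm's API.
class LineSpoutSketch {
    private BufferedReader reader;
    private boolean completed = false;

    // Stand-in for open(): the real spout would open the file named by the inputFile config value.
    void open(BufferedReader reader) {
        this.reader = reader;
    }

    // Stand-in for nextTuple(): emit one line, or nothing once the input is exhausted.
    void nextTuple(Consumer<String> emit) {
        if (completed) {
            return; // nothing left; the framework will keep polling, so return quickly
        }
        try {
            String line = reader.readLine();
            if (line == null) {
                completed = true; // end of file reached
                reader.close();
            } else {
                emit.accept(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean isCompleted() {
        return completed;
    }

    public static void main(String[] args) {
        LineSpoutSketch spout = new LineSpoutSketch();
        spout.open(new BufferedReader(new StringReader("first line\nsecond line")));
        List<String> emitted = new ArrayList<>();
        // Simulate the framework polling the spout more times than there are lines.
        for (int i = 0; i < 5; i++) {
            spout.nextTuple(emitted::add);
        }
        System.out.println(emitted); // prints [first line, second line]
    }
}
```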

23 comments:

Daniele said...

This was very useful, thank you!

Sidhartha Ray said...

Hi Sunil,

Thank you very much for the post.
It's my first Storm example, and I did it with your help.

Keep it up, man.
Again, thank you very much!

Anonymous said...

Thanks for doing this Sunil.

I just got it going on my Mac by doing the following:
1. brew install storm
2. File > Import > General > Existing Projects into Workspace
3. Configure as Maven Project
4. In Properties > Java Build Path > Libraries, delete all those external jars
5. Add the Maven dependency:
org.apache.storm:storm-core:0.9.2-incubating (groupId:artifactId:version)

Saba said...

Hi Sunil,

Thank you very much for the post.

I am facing one problem here: I am not able to see the word count process in the Storm UI. Is there anything I need to change in the code you posted?

Anonymous said...

Hi there,

Thanks for the code, although I'm not sure it will work. I think it may double count, as you are using shuffleGrouping instead of fieldsGrouping, which means the same word can be counted in different threads erroneously. Also, once you have counted the words, surely you would want to output them to check that the count is correct; I can't see where you do that in the program. Happy to discuss if I'm incorrect.
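The grouping concern in this comment can be illustrated without Storm. The sketch below is a simplified simulation, not Storm's routing code: round-robin assignment stands in for shuffleGrouping and hash-mod assignment on the word stands in for fieldsGrouping, with a hypothetical number of counter tasks. It only matters when the counter bolt runs with a parallelism above one. In that case shuffle-style routing splits one word's count across tasks (each task holds a partial count rather than a doubled one), while fields-style routing keeps every occurrence of a word in a single task. In Storm itself the declaration would be a fieldsGrouping call with new Fields("word"), where the field name "word" is an assumption based on the word tuple described in the post.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified simulation (no Storm) of routing words to n counter tasks.
// Round-robin stands in for shuffleGrouping; hash-mod stands in for fieldsGrouping.
class GroupingSketch {

    static List<Map<String, Integer>> newTasks(int n) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            tasks.add(new HashMap<>()); // each map is one counter task's private HashMap
        }
        return tasks;
    }

    // Shuffle-like: spread tuples evenly across tasks, ignoring their content.
    static List<Map<String, Integer>> shuffleLike(List<String> words, int n) {
        List<Map<String, Integer>> tasks = newTasks(n);
        for (int i = 0; i < words.size(); i++) {
            tasks.get(i % n).merge(words.get(i), 1, Integer::sum);
        }
        return tasks;
    }

    // Fields-like: the same word always hashes to the same task.
    static List<Map<String, Integer>> fieldsLike(List<String> words, int n) {
        List<Map<String, Integer>> tasks = newTasks(n);
        for (String w : words) {
            tasks.get(Math.floorMod(w.hashCode(), n)).merge(w, 1, Integer::sum);
        }
        return tasks;
    }

    // Number of tasks holding a (possibly partial) count for the given word.
    static long tasksHolding(List<Map<String, Integer>> tasks, String word) {
        return tasks.stream().filter(t -> t.containsKey(word)).count();
    }

    public static void main(String[] args) {
        List<String> words = List.of("the", "the", "fox", "the");
        System.out.println("shuffle-like: " + shuffleLike(words, 2));
        System.out.println("fields-like:  " + fieldsLike(words, 2));
    }
}
```

With two tasks, the shuffle-like run leaves "the" in both tasks (1 in one, 2 in the other), while the fields-like run puts all three occurrences in one task.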

Anonymous said...

Great work, Sunil!

nageswara rao said...

Hi Sunil,

Thank you very much for the post.

Good job!

thanks

Harshawardhan Kulkarni said...

How are you executing this program? I am a newbie to Storm. Can anyone just post the command to execute it?
Thanks

Anonymous said...

This should probably work as a normal Java program in local mode; otherwise, create a jar and execute it in a Storm pseudo-distributed cluster.

Harshawardhan Kulkarni said...

Thanks, now I am able to execute it, but I am not able to see the Storm UI because the ports are disabled on my AWS instance.

Anonymous said...

Was this run on Windows?
Someone passed me this link and said a Storm topology could be submitted with the above code on Windows. Is this true?
Don't you need Nimbus and a supervisor at a bare minimum?
Thanks

syed jameer said...

hi,

Can anyone tell me how to pass the input file to this hello world example?

syed jameer said...

hello,

Can anyone help me with how to pass the text file to this program?

I am getting a "worker died" error.

Samba Siva Reddy said...

Hi Sunil,

I am getting Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0. Please suggest a fix.

Retina SIRI said...

Hi Sunil,
I'm trying to read a big file (860.6 MB) with this program, but I got an error like this:

458119 [Thread-4-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
458120 [Thread-4-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 26341ms for sessionid 0x1512b84729d000b, closing socket connection and attempting reconnect
Java Result: 20


What can I do?

Sushant said...

Thanks a ton!!
Please post some more examples like this, as the Apache documentation is not very clear on its own.

Ankita said...

Hey, thanks Sunil for this post, it helped us.

I am getting "Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0
at com.spnotes.storm.HelloStorm.main(HelloStorm.java:16)" while running the HelloStorm.java file in Eclipse.

Will you help me solve this error, or explain how to pass the input file to this program?

Anonymous said...

Hi sunil

I am having an issue installing/running Storm. Can I get some help?
gothmic@gmail.com

Jimmy Tekli said...

Hello, I tested the topology locally and it worked great! I was trying to submit it to a remote cluster: I replaced cluster.submitTopology() with StormSubmitter.submitTopology() and changed several things in the configuration, but the problem is that I don't know where to store the text file that I want the spout to read the data from. Do you have any idea about this?
Thanks in advance

Anonymous said...

Hi !
Thanks Sunil for this basic yet very helpful project. I see that many newbies like me visit this site for reference and have been asking how to run this program.

Steps to run this program
mvn compile
mvn package - this will create a jar of the project.

1. Navigate to your storm bin folder:
cd /Users/nav/programming/apache-storm-1.0.1/bin

2. Start nimbus
./storm nimbus

3. Start supervisor
./storm supervisor

4. Start the ui program
./storm ui

5. Navigate to the storm bin folder again
cd /Users/nav/programming/apache-storm-1.0.1/bin

6. Submit your jar file to Storm: ./storm jar /Users/nav/myworkspace/StormTrial/build/libs/HelloStorm2-0.0.1-SNAPSHOT.jar com.spnotes.storm.HelloStorm test.txt

The above command follows this pattern. If you want to pass command-line arguments to your program, add them at the end:
stormExecutable jarOption pathToYourJarFile theClassContainingYourMainMethod commandLineArguments

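If you run the above program as-is, it might not show up on the Storm UI. So what I did was add this little snippet to the main method of HelloStorm.java:

    // args[0] = input file, args[1] = optional topology name.
    // Check args.length first: reading args[1] when only one argument
    // was passed throws ArrayIndexOutOfBoundsException.
    if (args.length > 1 && args[1].length() > 0) {
        // Cluster mode: the topology appears in the Storm UI under the name args[1]
        config.setNumWorkers(3);
        StormSubmitter.submitTopology(args[1], config, builder.createTopology());
    }
    else {
        // Local mode: run in-process for 10 seconds, then shut down
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("HelloStorm", config, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }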

Then do the following:
1. mvn compile
2. mvn package
3. Submit your jar file to Storm: ./storm jar /Users/nav/....../HelloStorm2-0.0.1-SNAPSHOT.jar com.spnotes.storm.HelloStorm test.txt demo

Observe that in the 3rd step above you pass two command-line arguments. The first is the input text file required by the program. The second is the name of the topology that appears in the UI.

Hope this helps :)

Anonymous said...

In reference to the above comment -
3. Submit your jar file to Storm: ./storm jar /Users/nav/....../HelloStorm2-0.0.1-SNAPSHOT.jar com.spnotes.storm.HelloStorm test.txt demo
args[0] = test.txt
args[1] = demo. "demo" is what appears as the topology name in the Storm UI.
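The argument handling described in the last two comments can be sketched in plain Java. LaunchArgsSketch is a hypothetical helper, not part of the original project: it checks args.length before reading args[0] or args[1], which is what avoids the ArrayIndexOutOfBoundsException: 0 reported above when HelloStorm is started without an input file. Instead of calling LocalCluster or StormSubmitter, it returns a short description of the launch mode, so it runs without Storm on the classpath.

```java
// Hypothetical plain-Java sketch of HelloStorm's argument handling (no Storm classes).
// args[0] = input text file (required), args[1] = topology name (optional).
class LaunchArgsSketch {

    // Decide how to launch; a real main() would build and submit the topology instead.
    static String plan(String[] args) {
        if (args.length < 1) {
            // Fail with a usage message instead of ArrayIndexOutOfBoundsException: 0
            throw new IllegalArgumentException("usage: HelloStorm <inputFile> [topologyName]");
        }
        String inputFile = args[0];
        if (args.length > 1 && !args[1].isEmpty()) {
            // Cluster mode: would call StormSubmitter.submitTopology(args[1], ...)
            return "cluster:" + args[1] + ":" + inputFile;
        }
        // Local mode: would run a LocalCluster under the fixed name "HelloStorm"
        return "local:HelloStorm:" + inputFile;
    }

    public static void main(String[] args) {
        try {
            System.out.println(plan(args));
        } catch (IllegalArgumentException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
    }
}
```

Run with test.txt alone it reports local mode; run with test.txt demo it reports cluster mode under the name demo, matching the two invocations described in the comments.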

Anonymous said...

So, I have a question as follows:

If I do not use Maven or Eclipse and just try to compile, I cannot resolve anything along the lines of ...
import backtype.storm.....

I presume a classpath is required, but I looked at the jars with jar tf and could not find anything.

So the question seems pretty simple, but I don't get it ...

Any advice?

kinjal gada said...

To the above question, for the import backtype.storm..... error:
Install the Maven dependency org.apache.storm:storm-core:0.9.2-incubating