ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve

I was trying out the Pig UDF samples from Hadoop: The Definitive Guide. Every time I tried executing the com.hadoopbook.pig.IsGoodQuality UDF like this, I got an ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve com.hadoop.pig.IsGoodQuality using imports: error

grunt> filtered_records = filter records by temp != 9999 and com.hadoop.pig.IsGoodQuality(quality);

2014-05-17 15:39:10,445 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve com.hadoop.pig.IsGoodQuality using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
Details at logfile: /usr/lib/hue/pig_1400366300986.log
The way to fix this problem is to pass -Dpig.additional.jars=pig-examples.jar when starting Pig:

pig -Dpig.additional.jars=pig-examples.jar
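For reference, the UDF itself is a Java FilterFunc along these lines; this is a sketch reconstructed from the book's example, and the exact set of "good" quality codes is from memory:

package com.hadoopbook.pig;

import java.io.IOException;
import org.apache.pig.FilterFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;

// Filter UDF: returns true when the quality code is one of the "good" readings
public class IsGoodQuality extends FilterFunc {
    @Override
    public Boolean exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0) {
            return false;
        }
        try {
            Object object = tuple.get(0);
            if (object == null) {
                return false;
            }
            int i = (Integer) object;
            return i == 0 || i == 1 || i == 4 || i == 5 || i == 9;
        } catch (ExecException e) {
            throw new IOException(e);
        }
    }
}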

Create an ElasticSearch cluster on a single machine

I wanted to figure out how to create a multi-node ElasticSearch cluster on a single machine, so I followed these steps:
  1. First I downloaded the elasticsearch zip file.
  2. Created 3 folders in /temp named node1, node2 and node3.
  3. Unzipped elasticsearch.zip into each one of these folders.
  4. Then I opened node1/elasticsearch-1.4.2/config/elasticsearch.yml and changed the value of cluster.name to sunilscluster and the value of node.name to first. I did the same in node2 and node3, but set the value of node.name to second and third respectively (see the snippet below).
  5. The next step was to install Marvel on each of the nodes by executing bin/plugin -i elasticsearch/marvel/latest.
  6. Then I went into each of the directories and started elasticsearch. As the nodes came up I could see messages in the log indicating that the nodes were able to discover each other.
Once all 3 nodes were started I could see them in Marvel.
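For reference, node1's elasticsearch.yml ended up with these two values (node2 and node3 differ only in node.name):

cluster.name: sunilscluster
node.name: first
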
Note: There might be a more efficient way of doing this in which the binaries are shared. I tried creating 3 different elasticsearch.yml files and running the elasticsearch server 3 times using elasticsearch -Des.config=/node1/elasticsearch.yml, elasticsearch -Des.config=/node2/elasticsearch.yml, etc., but that did not work.

Using NodeClient for connecting to ElasticSearch server

There are 2 options for connecting to an ElasticSearch server from a Java program:
  1. TransportClient
  2. NodeClient
As per ElasticSearch.org, you should use TransportClient when you want short-lived connections, but NodeClient when you want a few long-lived connections. If you're creating a web application that talks to ElasticSearch, you are better off creating only one connection object during startup of the application and destroying it during shutdown. The basic org.elasticsearch.client.Client object is thread safe, so you can call it from multiple threads of the web application. I was trying to figure out how to use NodeClient and ran into a few issues, but now I have my sample working; these are my notes from the development process.

First I downloaded ElasticSearch version 1.1.1 onto my local machine and ran it as a standalone server (I did not change anything in the configuration), then used the code below to connect to it, and it worked. In my local ElasticSearch log I could see lines indicating when my client connected to and disconnected from the cluster.
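The post originally embedded the code; this is a minimal sketch of that first version, assuming the ES 1.x NodeBuilder API (the commented line marks where the actual requests went):

import org.elasticsearch.client.Client;
import org.elasticsearch.node.Node;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;

public class HelloESClient {
    public static void main(String[] args) {
        // Join the cluster as a client-only node (holds no data)
        Node node = nodeBuilder().client(true).node();
        Client client = node.client();
        System.out.println("Value of client " + client);
        // ... issue index/search requests through client here ...
        node.close(); // shows up as a disconnect in the server log
        System.out.println("Exiting HelloESClient.main()");
    }
}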
At this point I was not connected to my office network. But as soon as I connected to my office network (which has a few other elasticsearch servers running), the same code started failing with an org.elasticsearch.cluster.block.ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];[SERVICE_UNAVAILABLE/2/no master] exception:

4618 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
7621 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
10623 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
13625 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
16626 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
19628 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
22630 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
25632 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
28634 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
31611 [main] WARN  org.elasticsearch.discovery  - [Tiboro] waited for 30s and no initial state was set by the discovery
31611 [main] INFO  org.elasticsearch.discovery  - [Tiboro] elasticsearch/tuC0zIEOSMaxL6reqCYjPA
31611 [main] DEBUG org.elasticsearch.gateway  - [Tiboro] can't wait on start for (possibly) reading state from gateway, will do it asynchronously
31617 [main] INFO  org.elasticsearch.http  - [Tiboro] bound_address {inet[/0:0:0:0:0:0:0:0:9201]}, publish_address {inet[/192.168.1.10:9201]}
31621 [elasticsearch[Tiboro][clusterService#updateTask][T#1]] DEBUG org.elasticsearch.cluster.service  - [Tiboro] processing [updating local node id]: execute
31621 [elasticsearch[Tiboro][clusterService#updateTask][T#1]] DEBUG org.elasticsearch.cluster.service  - [Tiboro] cluster state updated, version [0], source [updating local node id]
31622 [elasticsearch[Tiboro][clusterService#updateTask][T#1]] DEBUG org.elasticsearch.cluster.service  - [Tiboro] set local cluster state to version 0
31622 [elasticsearch[Tiboro][clusterService#updateTask][T#1]] DEBUG org.elasticsearch.cluster.service  - [Tiboro] processing [updating local node id]: done applying updated cluster_state (version: 0)
31622 [main] INFO  org.elasticsearch.node  - [Tiboro] started
Value of client org.elasticsearch.client.node.NodeClient@16888fd4
Caught exception
31630 [main] INFO  org.elasticsearch.node  - [Tiboro] stopping ...
org.elasticsearch.cluster.block.ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];[SERVICE_UNAVAILABLE/2/no master];
	at org.elasticsearch.cluster.block.ClusterBlocks.globalBlockedException(ClusterBlocks.java:138)
	at org.elasticsearch.cluster.block.ClusterBlocks.globalBlockedRaiseException(ClusterBlocks.java:128)
	at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.<init>(TransportSearchTypeAction.java:107)
	at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction$AsyncAction.<init>(TransportSearchQueryThenFetchAction.java:68)
	at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction$AsyncAction.<init>(TransportSearchQueryThenFetchAction.java:62)
	at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction.doExecute(TransportSearchQueryThenFetchAction.java:59)
	at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction.doExecute(TransportSearchQueryThenFetchAction.java:49)
	at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:63)
	at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:108)
	at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:43)
	at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:63)
	at org.elasticsearch.client.node.NodeClient.execute(NodeClient.java:92)
	at org.elasticsearch.client.support.AbstractClient.search(AbstractClient.java:212)
	at org.elasticsearch.action.search.SearchRequestBuilder.doExecute(SearchRequestBuilder.java:1043)
	at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:85)
	at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:59)
	at HelloESClient.getNodeClient(HelloESClient.java:34)
	at HelloESClient.main(HelloESClient.java:15)
31639 [main] INFO  org.elasticsearch.node  - [Tiboro] stopped
Exiting HelloESClient.main()
The problem was that my client was trying to discover all the different ES servers on my office network, and that was causing the failure. So I had to change the code to introduce a settings object, in which I set discovery.zen.ping.multicast.enabled to false and discovery.zen.ping.unicast.hosts to localhost; note that I did not change anything in the elasticsearch configuration, it kept running as usual. After making these changes I used the following code to connect to the elasticsearch server on my local machine. This client is saying "only talk to elasticsearch on my local machine".
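A sketch of that change, using the two discovery settings from above (ES 1.x ImmutableSettings/NodeBuilder API):

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.node.Node;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;

public class HelloESClient {
    public static void main(String[] args) {
        ImmutableSettings.Builder settings = ImmutableSettings.settingsBuilder()
                .put("discovery.zen.ping.multicast.enabled", false)    // don't discover over multicast
                .put("discovery.zen.ping.unicast.hosts", "localhost"); // only ping my own machine
        Node node = nodeBuilder().settings(settings).client(true).node();
        Client client = node.client();
        // ... use client ...
        node.close();
    }
}

After that I wanted to figure out how this would work in the case of a cluster, so I stopped my elasticsearch server. I changed elasticsearch.yml to the following: instead of the default cluster name of elasticsearch, I used sunilscluster as the cluster name, and restarted my ES.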

cluster.name: sunilscluster
node.name: deves11
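In order for my NodeClient to work against the renamed cluster, I also had to set clusterName("sunilscluster") on the client side, roughly like this (same imports as the previous sketch):

Node node = nodeBuilder()
        .clusterName("sunilscluster") // must match cluster.name in elasticsearch.yml
        .settings(settings)           // same discovery settings as before
        .client(true)
        .node();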
Once this was done, I tried connecting to the ElasticSearch server in the development cluster, which was on a separate network with master nodes masternode1, masternode2 and masternode3. For that, all I had to do was change the value of discovery.zen.ping.unicast.hosts to masternode1.dev.com,masternode2.dev.com,masternode3.dev.com, but I got the following error. The problem is that when you use NodeClient, your client joins the cluster as a node, so the cluster nodes must be able to connect back to it on the discovery/transport ports (you can see the "Connection refused" back to my client on port 9300 in the log below), and a lot of the time those ports are blocked. The only solution is to either open those ports or run your client on a machine where they can be accessed; take a look at this for more information.

2014-12-30 10:14:09 DEBUG zen:104 - [nodeclient1] filtered ping responses: (filter_client[true], filter_data[false])
	--> target [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}], master [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}]
2014-12-30 10:14:09 DEBUG netty:104 - [nodeclient1] connected to node [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}]
2014-12-30 10:14:09 INFO  zen:114 - [nodeclient1] failed to send join request to master [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}], reason [org.elasticsearch.transport.RemoteTransportException: [masternode1][inet[/172.30.0.0:9300]][discovery/zen/join]; org.elasticsearch.transport.ConnectTransportException: [nodeclient1][inet[/172.29.8.36:9300]] connect_timeout[30s]; java.net.ConnectException: Connection refused: /172.29.8.36:9300]
2014-12-30 10:14:09 DEBUG netty:104 - [nodeclient1] connected to node [[#zen_unicast_1#][VDD7S0ITAA024][inet[masternode1.dev.com/172.30.0.0:9300]]]
2014-12-30 10:14:12 DEBUG netty:104 - [nodeclient1] disconnected from [[#zen_unicast_1#][VDD7S0ITAA024][inet[masternode1.dev.com/172.30.0.0:9300]]]
2014-12-30 10:14:12 DEBUG zen:104 - [nodeclient1] filtered ping responses: (filter_client[true], filter_data[false])
	--> target [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}], master [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}]
2014-12-30 10:14:12 INFO  zen:114 - [nodeclient1] failed to send join request to master [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}], reason [org.elasticsearch.transport.RemoteTransportException: [masternode1][inet[/172.30.0.0:9300]][discovery/zen/join]; org.elasticsearch.transport.ConnectTransportException: [nodeclient1][inet[/172.29.8.36:9300]] connect_timeout[30s]; java.net.ConnectException: Connection refused: /172.29.8.36:9300]
2014-12-30 10:14:12 DEBUG netty:104 - [nodeclient1] connected to node [[#zen_unicast_1#][VDD7S0ITAA024][inet[masternode1.dev.com/172.30.0.0:9300]]]
2014-12-30 10:14:15 DEBUG netty:104 - [nodeclient1] disconnected from [[#zen_unicast_1#][VDD7S0ITAA024][inet[masternode1.dev.com/172.30.0.0:9300]]]
2014-12-30 10:14:15 DEBUG zen:104 - [nodeclient1] filtered ping responses: (filter_client[true], filter_data[false])

How to use ElasticSearch from a Web Application (How to fix java.lang.OutOfMemoryError: PermGen space error in Tomcat, with an ElasticSearch client as the use case)

Recently I was working on a memory-leak issue in a web application that talks to ElasticSearch. The problem was that when I deployed/undeployed the application a few times without restarting the server, Tomcat ran out of memory and I got an Exception in thread "http-bio-8080-exec-30" java.lang.OutOfMemoryError: PermGen space error. I wanted to figure out what was going on, so first I created a simple web application that uses the ElasticSearch client and provides a JAX-RS API to proxy calls to ElasticSearch. My sample ESClient.java takes care of establishing the connection with ElasticSearch using TransportClient, and I could use a REST client to make a POST call for search against the JAX-RS endpoint.
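The embedded source is gone, but the connection part of ESClient.java looked roughly like this sketch (TransportClient on the default transport port 9300; the localhost address and cluster name are assumptions):

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class ESClient {
    private static Client client;

    // Lazily create one shared TransportClient for the whole application
    public static synchronized Client getClient() {
        if (client == null) {
            Settings settings = ImmutableSettings.settingsBuilder()
                    .put("cluster.name", "elasticsearch") // assumption: default cluster name
                    .build();
            client = new TransportClient(settings)
                    .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));
        }
        return client;
    }

    // Close the client and its thread pool
    public static synchronized void close() {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}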
Now when I used VisualVM to look at the application, I could see that elasticsearch created a thread pool of about 17 threads connected to ElasticSearch.
When I stopped my application to update it, these threads created by ElasticSearch did not get closed, and because of that the classes loaded by the web application could not be garbage collected, and I saw warning messages like the following on the console: [Dec 23, 2014 8:09:55 PM] [org.apache.catalina.loader.WebappClassLoader clearReferencesThreads] [SEVERE]: The web application [/ESClientWeb] appears to have started a thread named [elasticsearch[Blink][[timer]]] but has failed to stop it. This is very likely to create a memory leak. I saw a bunch of these messages in the console.
When I used the Find leaks functionality in the Apache Tomcat manager console, my application name showed up multiple times, once for every update.
After a couple of redeploys Tomcat ran out of memory and threw Exception in thread "http-bio-8080-exec-30" java.lang.OutOfMemoryError: PermGen space.
In order to solve this problem, all I had to do was create a ServletContextListener that creates the org.elasticsearch.client.transport.TransportClient object during application startup and closes it when the application stops, plus a couple of corresponding changes in ESClient.java. After making these changes I could see the thread pool getting destroyed during application shutdown, and as a result the application classes get garbage collected and there are no more OutOfMemoryErrors.
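A sketch of such a listener, building on the hypothetical ESClient above (Servlet 3.0 @WebListener; it can also be registered in web.xml instead):

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;

@WebListener
public class ESClientContextListener implements ServletContextListener {
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        ESClient.getClient(); // create the TransportClient once at startup
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        ESClient.close(); // close it on undeploy so its threads stop and classes can be unloaded
    }
}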

Debugging a Tomcat memory leak using Java VisualVM

Recently I was trying to debug a memory leak in a web application deployed in Apache Tomcat, and I wanted to figure out how the JVM memory was doing. I decided to use Java VisualVM, and it turns out it is pretty easy to use and very powerful.
  1. I had Apache Tomcat running on my machine, so I first tested that the application was running OK.
  2. The next step was to execute the jvisualvm command, which is part of the JDK.
  3. It brought up Java VisualVM; it shows every Java application running on your machine, including Tomcat.
  4. Then I double-clicked on the Tomcat process and it opened the details. Note that I did not have to set any JVM flags; VisualVM could attach directly.
  5. The Monitor tab gives you high-level information about the VM, like how it is doing on memory and threads, etc. It also has a Heap Dump button; clicking it generates a heap dump. By default the heap dump goes to a /var/folders/... folder, so right-click on the heap dump node, click Save As, and save it somewhere else on your disk. Then you can use Eclipse MAT to analyze the dump.

How to view the log files and job.xml generated by Hive on HortonWorks Sandbox

I was working on some Hive code and ran into problems: my query kept failing with a message like invalid character in job.xml. I tried to locate the job.xml but could not find it, and similarly I could not find any logs on the local machine. So I wanted to figure out how to debug this problem, and I followed these steps.
  1. The first thing I did in the HDP 2.1 sandbox was enable Ambari by clicking on the Enable button.
  2. The next step was to log into Ambari by going to http://localhost:8080/ and entering admin/admin as the username and password.
  3. In the Ambari application I went to the YARN service screen and unchecked the Resource Manager > yarn.log-aggregation-enable checkbox. Log aggregation copies the log files from the local filesystem to HDFS and zips them; disabling it keeps the logs on the local filesystem.
  4. Then, in the Advanced section, I changed the value of yarn.nodemanager.debug-delay-sec to 60000. By default the value of this property is 0, which means that as soon as the job is done the NodeManager deletes all the intermediate files; setting it to 60000 seconds prevents them from being deleted for that long.
  5. Then I restarted all the services using Ambari for my changes to take effect.
  6. After that I executed a couple of Hive queries, and I could now see my logs in the /hadoop/yarn/logs directory.
  7. And I could see the job.xml files being preserved in the /hadoop/yarn/local/usercache/root/appcache folder for the application.

How to update records in a Hive table that uses ElasticSearch for storage

I had a requirement to update records in a Hive table. Hive is more of an append-only database and you can't update records in it (that limitation comes from the text files stored in HDFS, which is how Hive stores data by default). But if you're using Hive with ElasticSearch as storage, you can get this to work: when you use ElasticSearch as the storage mechanism, every call from Hive to insert or delete data gets forwarded to the ElasticSearch API, and ElasticSearch has the ability to update existing records. I used this to implement an updatable Hive table. So the scenario is: let's assume you have an elasticsearch index that stores first name, last name and email as a document. For that, create a new index in ES with name cricketers and type player by making a curl call like this:

curl -XPOST "http://localhost:9200/cricketers/player/1" -d'
{
    "id": "1",
    "fname": "First",
    "lname": "Cricketer",
    "email": "first.cricketer@gmail.com"
}'
This call will create an index named cricketers in ES and insert one document into it, with id equal to 1. The next step is to define an external table in Hive that uses org.elasticsearch.hadoop.hive.EsStorageHandler as the StorageHandler and points to the cricketers/player index you created in the last step. The other important setting is 'es.mapping.id'='id', which says to use the value of the id column as the primary key/id in elasticsearch.

create external table cricketers_es(id String, fname String, lname String, email String) stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler' 
TBLPROPERTIES('es.resource'='cricketers/player', 'es.index.auto.create'='false', 'es.mapping.id'='id')
Once the table is created you can check the records in it by executing the select * from cricketers_es command. You should now see the 1 record that is in the index.
Since Hive does not have the concept of an update statement, you will have to create a Hive table that holds just the records you want to insert/update (only the delta), and then use this delta table to update the cricketers_es table. To do that, first create a text file that holds the delta records. In my case I created this simple cricketers.txt file and uploaded it to HDFS in the /user/hue folder:

1,sachin,tendulkar,sachin.tendulakar@bcci.com
2,Rahul,Dravid,rahul.dravid@bcci.com
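Assuming the file starts out on the sandbox's local disk, a command along these lines uploads it (the HDFS path is from the post):

hadoop fs -put cricketers.txt /user/hue/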
After that, create a Hive table called cricketers_stage, which will be used for holding the delta records, by executing the following statement:

create table cricketers_stage(id String, fname String, lname String, email String) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
Now use the following Hive statement to load your delta records into cricketers_stage:

LOAD DATA INPATH '/user/hue/cricketers.txt' INTO TABLE `default.cricketers_stage`
The next step is to set the ElasticSearch write mode to upsert by setting the following property in the Hive console:

SET es.write.operation=upsert;
The last step is to execute the following statement in Hive, which will take the content of cricketers_stage and upsert those records into the cricketers_es table.

insert into cricketers_es select * from cricketers_stage
Now if you run select * from cricketers_es you should see 2 records: your first record is updated, and the record with id 2 is a new insert.

1,sachin,tendulkar,sachin.tendulakar@bcci.com
2,Rahul,Dravid,rahul.dravid@bcci.com
You can also verify the records in elasticsearch by executing the following curl command:

curl -XPOST "http://localhost:9200/cricketers/player/_search?pretty=true" -d'
{
  "query": {
      "match_all": {}
  }
}'

Connecting to HDP 2.2 Hive using JDBC driver

In http://wpcertification.blogspot.com/2014/04/connecting-to-hive-using-jdbc-client.html I blogged about how to connect to Hive using a JDBC client. I wanted to figure out how to connect to Hive 0.13 in Hortonworks Sandbox 2.2, so I followed these steps. You can download the complete source code for this project from here. First I created an App.java class; when connecting to HDP 2.2 I had to use HiveServer2, following the instructions in the HDP documentation. I used a Maven build file that includes the Hive, Hadoop and Log4j jars; the log4j jar lets me enable logging.
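The embedded gist is gone; this is a rough sketch of what App.java does, assuming the standard HiveServer2 JDBC driver and the sandbox's default port 10000:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class App {
    public static void main(String[] args) throws Exception {
        // HiveServer2 JDBC driver from the hive-jdbc jar
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // assumption: HiveServer2 on the sandbox's default port 10000, user root
        Connection connection = DriverManager.getConnection(
                "jdbc:hive2://127.0.0.1:10000/default", "root", "");
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery("select * from sample_07 limit 10");
        while (resultSet.next()) {
            System.out.println(resultSet.getString(1) + "\t" + resultSet.getString(2));
        }
        connection.close();
    }
}

Once my code was complete I tried connecting to Hive and got the following error around access control.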

java.sql.SQLException: Error while compiling statement: FAILED: HiveAccessControlException Permission denied. 
Principal [name=root, type=USER] does not have following privileges on Object [type=TABLE_OR_VIEW, name=default.sample_07] : [SELECT]
 at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:121)
 at org.apache.hive.jdbc.Utils.verifySuccessWithInfo(Utils.java:109)
 at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:231)
 at org.apache.hive.jdbc.HiveStatement.executeQuery(HiveStatement.java:355)
 at com.spnotes.hive.App.main(App.java:24)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:601)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

I had to use the following command in the Hive console to give the root user access to query the sample_07 table.

hive> grant select on table sample_07 to user root;

Fixing "WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!" problem

I use the HortonWorks Sandbox for learning and trying out Hadoop features. The way the Hortonworks Sandbox works is that you're supposed to download the VM and then ssh into it using ssh root@127.0.0.1 -p 2222

to connect to it and play with it. Initially I had the HDP 2.1 sandbox, but when I downloaded HDP 2.2 and tried to connect to it, I got a WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED! error. Basically HDP 2.1 and HDP 2.2 have different SSH host keys, but you're referring to both using the same IP, 127.0.0.1, and your machine is saying "hey, something is wrong: the IP is the same but the SSH key does not match".
In order to solve this problem, all I did was open the /Users/test/.ssh/known_hosts file in a text editor, search for 127.0.0.1, remove the line that started with this IP, and save the known_hosts file.
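Alternatively (an assumption worth verifying on your setup), ssh-keygen -R "[127.0.0.1]:2222" should remove the same stale entry, since for a non-default port the known_hosts entry is keyed as [host]:port.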
Now when I rerun the

ssh root@127.0.0.1 -p 2222

command, I am able to connect to the HDP 2.2 image fine.

How to use Apache Storm Trident API

Some time back I blogged about the HelloWorld - Apache Storm Word Counter program, which demonstrates how to build a word count program using Apache Storm. One problem with that project was that it was not a Maven project; instead I had a screenshot of all the jars you would have to include in the program. So I changed it to use Apache Maven as the build framework; you can download the source code. In addition to the normal API, Storm also provides the Trident API, which lets us build much more compact code. I wanted to try that out, so I built this simple word count program using the Trident API. With the Trident API you start by creating a TridentTopology object. You still need LineReaderSpout, which takes a file path as input and reads and emits one line of the file at a time, but you don't need WordSpitterBolt and WordCounterBolt; instead you can use compact code like this:

TridentTopology topology = new TridentTopology();
topology.newStream("spout", lineReaderSpout)
        .each(new Fields("line"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .aggregate(new Fields("word"), new Count(), new Fields("count"))
        .each(new Fields("word", "count"), new Debug());
The each(new Fields("line"), new Split(), new Fields("word")) line takes the line emitted by LineReaderSpout and uses the built-in storm.trident.operation.builtin.Split function to split the line into words, emitting each word as a tuple. The groupBy(new Fields("word")) line takes the tuples and groups them by word. The aggregate(new Fields("word"), new Count(), new Fields("count")) line takes care of aggregating the words and counting them, using the storm.trident.operation.builtin.Count class; at this point you have tuples of the form {word, count}. The last part, .each(new Fields("word","count"), new Debug());, takes care of printing each {word, count} tuple. The Trident API provides a set of built-in classes that make developing a word count type of program very easy, but you could have created your own versions of Split and Count and the code would still look significantly more compact.
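For completeness, here is a hedged sketch of how such a topology gets wired up and submitted to a local cluster for testing; Config and LocalCluster are the standard Storm 0.9.x classes, and LineReaderSpout is the spout from the post:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.tuple.Fields;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.builtin.Split;

public class TridentWordCount {
    public static void main(String[] args) throws Exception {
        TridentTopology topology = new TridentTopology();
        topology.newStream("spout", new LineReaderSpout(args[0])) // spout from the post
                .each(new Fields("line"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .aggregate(new Fields("word"), new Count(), new Fields("count"))
                .each(new Fields("word", "count"), new Debug());

        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        // build() converts the TridentTopology into a plain StormTopology
        cluster.submitTopology("trident-wordcount", conf, topology.build());
    }
}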