Configuring Flume to write avro events into HDFS

Recently I wanted to figure out how to configure Flume so that it listens for Avro events and, whenever it receives one, writes it to HDFS. In order to do that I built this simple Flume configuration:

# example.conf: A single-node Flume configuration

# Name the components on this agent
agent1.sources = avro
agent1.sinks = hdfs1
agent1.channels = memory1

# Describe/configure the source
agent1.sources.avro.type = avro
agent1.sources.avro.bind = localhost
agent1.sources.avro.port = 41414
agent1.sources.avro.selector.type = replicating
agent1.sources.avro.channels = memory1

# Describe the sink
agent1.sinks.hdfs1.type = hdfs
agent1.sinks.hdfs1.hdfs.path=/tmp/flume/events
agent1.sinks.hdfs1.hdfs.rollInterval=60
# rollSize=0 disables size-based rolling, so files roll based on the rollInterval above
agent1.sinks.hdfs1.hdfs.rollSize=0
agent1.sinks.hdfs1.hdfs.batchSize=100
agent1.sinks.hdfs1.hdfs.serializer=org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
agent1.sinks.hdfs1.hdfs.fileType = DataStream
agent1.sinks.hdfs1.channel = memory1

# Use a channel which buffers events in memory
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100
In this configuration an Avro source listens on the local machine at port 41414, and every event it receives is written to HDFS under the /tmp/flume/events directory. Once this file is saved on the local machine as hellohdfsavro.conf, I can start the Flume agent using the following command:

flume-ng agent --conf conf --conf-file hellohdfsavro.conf  --name agent1 -Dflume.root.logger=DEBUG,console
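To verify the pipeline end to end, the stock Flume Avro client can be used to send a local file to the source as Avro events (a quick test; the file path /tmp/test.log is just an example):

flume-ng avro-client --conf conf --host localhost --port 41414 --filename /tmp/test.log

The events should show up under /tmp/flume/events in HDFS once the sink rolls the current file.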

Configure Flume to use IBM MQ as JMS Source

Recently I had a requirement to read XML documents stored as messages in IBM MQ and post them into Hadoop. I decided to use Apache Flume with the Flume JMS source and the Flume HDFS sink. I followed the steps below for this setup. Please note that I am not a WebSphere MQ expert, so there might be a better/easier way to achieve this.
  1. First I had to install the WebSphere MQ Client on my Windows machine.
  2. Next I created a simple jms.config like this in the c:\temp folder of my Windows box:
    
    INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactory
    PROVIDER_URL=file:/C:/temp/jmsbinding
    
  3. The next step is to run JMSAdmin.bat c:\temp\jms.config. It opens up a console; type the following command into it, adjusting it to the configuration that you need:
    
    DEF CF(myConnectionFactory) QMGR(myQueueManager) HOSTNAME(myHostName) PORT(1426) CHANNEL(myChannelName) TRANSPORT(CLIENT)
    
    Once you execute this command it generates a .bindings file in C:/temp/jmsbinding (the folder configured as the value of PROVIDER_URL).
  4. The next step was to copy the C:/temp/jmsbinding/.bindings file to the /etc/flume/conf folder on my Linux box, which has Flume running on it.
  5. In addition to the bindings file, I also needed the MQ client jar files. I started by copying jms.jar from C:\Program Files (x86)\IBM\WebSphere MQ\java\lib to the /usr/hdp/current/flume-server/lib/ folder in my Hadoop installation, but I kept getting ClassNotFoundException errors, and to deal with that I ended up copying the following jars from my MQ Client into Flume:
    
    jms.jar
    fscontext.jar
    jndi.jar
    providerutil.jar
    com.ibm.mq.jar
    com.ibm.mqjms.jar
    com.ibm.mq.pcf.jar
    connector.jar
    dhbcore.jar
    com.ibm.mq.jmqi.jar
    com.ibm.mq.headers.jar
    
  6. Once the Flume MQ setup was in place, the last step was to create a Flume configuration that points to your bindings file and to your MQ server, like this:
    
    # Flume agent config
    # List the sources, channels, and sinks for the agent
    ggflume.sources = jms
    ggflume.channels = memory
    ggflume.sinks = hadoop
    
    ggflume.sources.jms.channels=memory
    ggflume.sinks.hadoop.channel=memory
    
    ggflume.sources.jms.type = jms
    ggflume.sources.jms.providerURL = file:///etc/flume/conf
    ggflume.sources.jms.initialContextFactory = com.sun.jndi.fscontext.RefFSContextFactory
    ggflume.sources.jms.destinationType=QUEUE
    ggflume.sources.jms.destinationName=<channelName>
    ggflume.sources.jms.connectionFactory=myConnectionFactory
    ggflume.sources.jms.batchSize=1
    
    ggflume.channels.memory.type = memory
    ggflume.channels.memory.capacity = 1000
    ggflume.channels.memory.transactionCapacity = 100
    
    ggflume.sinks.hadoop.type=hdfs
    ggflume.sinks.hadoop.hdfs.path=/data/mq/xml
    ggflume.sinks.hadoop.hdfs.filePrefix=sample
    
    
  7. Now start the Flume agent by executing the following command: flume-ng agent --conf conf --conf-file mqflume.conf --name ggflume -Dflume.root.logger=DEBUG,console
You should now see the existing messages from MQ being dumped into HDFS.
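Before wiring MQ into Flume, it can help to verify the .bindings file and connection factory with a small standalone JMS program. The following is a minimal sketch, not part of the original setup; it assumes the factory name myConnectionFactory from step 3, the /etc/flume/conf location from step 4, and the MQ client jars on the classpath.

import java.util.Hashtable;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;

public class VerifyMqBindings {
    public static void main(String[] args) throws Exception {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY,
                "com.sun.jndi.fscontext.RefFSContextFactory");
        // Directory that contains the .bindings file copied in step 4
        env.put(Context.PROVIDER_URL, "file:///etc/flume/conf");

        Context ctx = new InitialContext(env);
        ConnectionFactory cf = (ConnectionFactory) ctx.lookup("myConnectionFactory");

        // If this succeeds, the same bindings and jars should work for the Flume JMS source
        Connection connection = cf.createConnection();
        connection.start();
        System.out.println("Connected to MQ via JNDI bindings");
        connection.close();
        ctx.close();
    }
}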

Moving data from Avro to ORC files

In Importing data from Sqoop into Hive External Table with Avro encoding I blogged about how to sqoop data from an RDBMS into Hive. I wanted to take it to the next step by moving the downloaded data into an ORC table. I followed these steps to achieve that:
  1. The first thing is to find out the schema of the Avro table, which you can get by executing the following statement in Hive:
    
    show create table CUSTOMER;
    
    You will get output that looks something like this; it contains the schema of the table:
    
    CREATE EXTERNAL TABLE `CUSTOMER`(
      `contactid` int COMMENT 'from deserializer',
      `firstname` string COMMENT 'from deserializer',
      `lastname` string COMMENT 'from deserializer',
      `email` string COMMENT 'from deserializer')
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS INPUTFORMAT
      'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION
      'hdfs://sandbox.hortonworks.com:8020/tmp/customer/data'
    TBLPROPERTIES (
      'avro.schema.url'='hdfs:///tmp/customer/schema/customer.avsc',
      'transient_lastDdlTime'='1431719666')
    
  2. Copy the schema from the last step, remove the Avro SerDe, input/output format, and table properties, and replace them with the ORC storage clause as shown below. Execute this in Hive to create the customer table in ORC format:
    
    CREATE EXTERNAL TABLE `CUSTOMER_ORC`(
      `contactid` int,
      `firstname` string,
      `lastname` string,
      `email` string)
    STORED AS ORC
    LOCATION
      'hdfs://sandbox.hortonworks.com:8020/tmp/customer/data_orc'
    TBLPROPERTIES ("orc.compress"="SNAPPY","orc.row.index.stride"="20000");
    
  3. The last step is to copy the data from the Avro table to the ORC table, which you can do with the following statement:
    
    insert into table CUSTOMER_ORC select * from customer;
    
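A quick way to confirm the copy (just a sanity check) is to compare row counts between the two tables in Hive:

select count(*) from CUSTOMER;
select count(*) from CUSTOMER_ORC;

Both queries should return the same number of rows.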

Importing data from Sqoop into Hive External Table with Avro encoding

I wanted to figure out how to import the contents of an RDBMS table into Hive with Avro encoding. During this process I wanted to use external Hive tables so that I have complete control over the location of the files. I used a MySQL database running on the same machine as my Hortonworks Sandbox and followed these steps:
  1. First create the CUSTOMER table like this in MySQL:
    
    CREATE TABLE CUSTOMER (       
    contactid INTEGER NOT NULL ,       
    firstname VARCHAR(50),       
    lastname  VARCHAR(50),       
    email varchar(50) );
    
  2. After creating the table, add a couple of records to it by executing insert statements such as insert into customer values(1,'Sachin','Tendulark','sachin@gmail.com');
  3. The next step is to run a Sqoop import that downloads the table's records into HDFS at /tmp/customer/sample. In the real world you might want to download only the first 10 records or so, because you only need a few sample records to create the Avro schema:
    
    sqoop import --connect jdbc:mysql://localhost/test --table CUSTOMER --username sqoop1 \
    --password sqoop -m 1 --create-hive-table \
    --hive-table CONTACT --as-avrodatafile --target-dir /tmp/customer/sample
    
  4. The Sqoop command dumps the records into HDFS, so first download the Avro file that Sqoop generated:
    
    hdfs dfs -get /tmp/customer/sample/part-m-00000.avro
    
  5. Use avro-tools-*.jar to read the schema of the file generated by Sqoop, by executing the following command:
    
    java -jar avro-tools-1.7.5.jar getschema part-m-00000.avro > customer.avsc
    
    This is what the customer.avsc file looks like in my case:
    
    {
      "type" : "record",
      "name" : "CUSTOMER",
      "doc" : "Sqoop import of CUSTOMER",
      "fields" : [ {
        "name" : "contactid",
        "type" : [ "int", "null" ],
        "columnName" : "contactid",
        "sqlType" : "4"
      }, {
        "name" : "firstname",
        "type" : [ "string", "null" ],
        "columnName" : "firstname",
        "sqlType" : "12"
      }, {
        "name" : "lastname",
        "type" : [ "string", "null" ],
        "columnName" : "lastname",
        "sqlType" : "12"
      }, {
        "name" : "email",
        "type" : [ "string", "null" ],
        "columnName" : "email",
        "sqlType" : "12"
      } ],
      "tableName" : "CUSTOMER"
    }
    
  6. The next step is to upload the Avro schema file that you created in the last step back to HDFS. In my case I had an HDFS folder called /tmp/customer/schema and I uploaded the schema file into it:
    
    hdfs dfs -put customer.avsc /tmp/customer/schema/
    
  7. Now go to Hive and execute the following command to define the external CUSTOMER Hive table with the Avro schema from the last step:
    
    CREATE EXTERNAL TABLE CUSTOMER
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION '/tmp/customer/data'
    TBLPROPERTIES ('avro.schema.url'='hdfs:///tmp/customer/schema/customer.avsc');
    
  8. The last step is to run Sqoop again, this time downloading all of the data into the external directory that the CUSTOMER Hive table points to:
    
    sqoop import --connect jdbc:mysql://localhost/test --table CUSTOMER --username sqoop1 \
    --password sqoop -m 1 --as-avrodatafile \
    --target-dir /tmp/customer/data --compression-codec snappy
    
Now if you run a select query on the CUSTOMER table, you should see all the data that is in your RDBMS.
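For example, a quick check from the Hive console:

select * from CUSTOMER limit 10;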

Running oozie job on Hortonworks Sandbox

In Enabling Oozie console on Cloudera VM 4.4.0 and executing examples I blogged about how to run an Oozie job in the Cloudera sandbox. This process is a little easier in the Hortonworks 2.2 sandbox. I started with a brand new HDP 2.2 image and tried running the Oozie example on it by executing:

oozie job -oozie http://localhost:11000/oozie -config examples/apps/map-reduce/job.properties -run
But when I ran it I got the following error:

Error: E0501 : E0501: Could not perform authorization operation, Call From sandbox.hortonworks.com/10.0.2.15 to localhost:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
So I looked in /var/log/oozie/oozie.log and saw the following error:

2015-05-01 20:34:39,195  WARN V1JobsServlet:546 - SERVER[sandbox.hortonworks.com] USER[root] GROUP[-] TOKEN[-] APP[-] JOB[-] ACTION[-] URL[POST http://sandbox.hortonworks.com:11000/oozie/v2/jobs?action=start] error[E0501], E0501: Could not perform authorization operation, Call From sandbox.hortonworks.com/10.0.2.15 to localhost:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
org.apache.oozie.servlet.XServletException: E0501: Could not perform authorization operation, Call From sandbox.hortonworks.com/10.0.2.15 to localhost:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
 at org.apache.oozie.servlet.BaseJobServlet.checkAuthorizationForApp(BaseJobServlet.java:240)
 at org.apache.oozie.servlet.BaseJobsServlet.doPost(BaseJobsServlet.java:96)
 at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
 at org.apache.oozie.servlet.JsonRestServlet.service(JsonRestServlet.java:287)
 at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
 at org.apache.oozie.servlet.AuthFilter$2.doFilter(AuthFilter.java:143)
 at org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:572)
 at org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:542)
 at org.apache.oozie.servlet.AuthFilter.doFilter(AuthFilter.java:148)
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:235)
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
 at org.apache.oozie.servlet.HostnameFilter.doFilter(HostnameFilter.java:84)
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:235)
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
 at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233)
 at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191)
 at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
 at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
 at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
 at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:293)
 at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:861)
 at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:606)
 at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:489)
 at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.oozie.service.AuthorizationException: E0501: Could not perform authorization operation, Call From sandbox.hortonworks.com/10.0.2.15 to localhost:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
 at org.apache.oozie.service.AuthorizationService.authorizeForApp(AuthorizationService.java:399)
 at org.apache.oozie.servlet.BaseJobServlet.checkAuthorizationForApp(BaseJobServlet.java:229)
 ... 25 more
Caused by: java.net.ConnectException: Call From sandbox.hortonworks.com/10.0.2.15 to localhost:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:791)
 at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:731)
 at org.apache.hadoop.ipc.Client.call(Client.java:1472)
 at org.apache.hadoop.ipc.Client.call(Client.java:1399)
 at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
 at com.sun.proxy.$Proxy29.getFileInfo(Unknown Source)
 at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:752)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
 at com.sun.proxy.$Proxy30.getFileInfo(Unknown Source)
 at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1988)
 at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118)
 at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
 at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
 at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
 at org.apache.oozie.service.AuthorizationService.authorizeForApp(AuthorizationService.java:371)
 ... 26 more
Caused by: java.net.ConnectException: Connection refused
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
 at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
 at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
 at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:494)
 at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:607)
 at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:705)
 at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:368)
 at org.apache.hadoop.ipc.Client.getConnection(Client.java:1521)
 at org.apache.hadoop.ipc.Client.call(Client.java:1438)
 ... 44 more
In order to solve this I had to edit examples/apps/map-reduce/job.properties and replace localhost with sandbox.hortonworks.com:


nameNode=hdfs://sandbox.hortonworks.com:8020
jobTracker=sandbox.hortonworks.com:8032

queueName=default
examplesRoot=examples

oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/map-reduce
outputDir=map-reduce
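With job.properties fixed, the example can be resubmitted with the same command as before and its progress checked with the Oozie CLI (the job id below is a placeholder for the id printed by the -run command):

oozie job -oozie http://localhost:11000/oozie -config examples/apps/map-reduce/job.properties -run
oozie job -oozie http://localhost:11000/oozie -info <job-id>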

Exporting data from Hive table to RDBMS

In Importing data from RDBMS into Hadoop using sqoop I blogged about how to import data from an RDBMS into Hive. Now I wanted to figure out how to export data from Hive back to an RDBMS. Sqoop has an export feature that lets you export data from a Hadoop directory (CSV files in a directory) to an RDBMS. To try it out, I first created a simple contact_hive table and populated it with some data, then used Sqoop to export the contents of contact_hive into the CONTACT table in MySQL. I followed these steps; if you already have a populated Hive table you can skip the first 5 steps and go to step 6.
  1. Create a contacthive.csv file containing simple data with 4 comma-separated columns:
    
    1,MahendraSingh,Dhoni,mahendra@bcci.com
    2,Virat,Kohali,virat@bcci.com
    5,Sachin,Tendulkar,sachin@bcci.com
    
  2. Upload the contacthive.csv that you created in the last step to the /tmp folder in HDFS using the following command:
    
    hdfs dfs -put contacthive.csv /tmp
    
  3. Define a contact_hive table with 4 columns (contactId, firstName, lastName and email) by executing this command in the Hive console:
    
    CREATE TABLE contact_hive(contactId Int, firstName String, lastName String, email String) row format delimited fields terminated by "," stored as textfile;
    
  4. Populate the contact_hive table that you created in the last step with the data from the contacthive.csv file created in step 1, by executing this command in the Hive console:
    
    LOAD DATA INPATH  "/tmp/contacthive.csv" OVERWRITE INTO TABLE contact_hive;
    
  5. Since I am using a Hive-managed table, the load moves the contacthive.csv file into the Hive warehouse directory, which in the case of Hortonworks is /apps/hive/warehouse. You can verify that by executing the following command against HDFS:
    
    hdfs dfs -ls /apps/hive/warehouse/contact_hive
    
  6. Before you export data into the RDBMS, you have to create the target table in MySQL. Use the following command to create the CONTACT table in MySQL:
    
    
    CREATE TABLE CONTACT (
          contactid INTEGER NOT NULL ,
          firstname VARCHAR(50),
          lastname  VARCHAR(50),
          email varchar(50)
    );
    
  7. Now the last step is to execute the Sqoop export command that exports the data from the Hive/HDFS directory to the database:
    
    sqoop export --connect jdbc:mysql://localhost/test --table CONTACT --export-dir /apps/hive/warehouse/contact_hive
    
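To confirm the export worked, a quick query on the MySQL side should show the three rows from contacthive.csv:

select * from CONTACT;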

ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve

I was trying out the Pig UDF samples from Hadoop: The Definitive Guide. Every time I tried executing the com.hadoopbook.pig.IsGoodQuality UDF like this, I got an ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve com.hadoop.pig.IsGoodQuality using imports error:

grunt> filtered_records = filter records by temp != 9999 and com.hadoop.pig.IsGoodQuality(quality);

2014-05-17 15:39:10,445 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve com.hadoop.pig.IsGoodQuality using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
Details at logfile: /usr/lib/hue/pig_1400366300986.log
The way to fix that problem is to pass -Dpig.additional.jars=pig-examples.jar while starting Pig:

pig -Dpig.additional.jars=pig-examples.jar
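Alternatively (an equivalent approach, not the one used above), the jar can be registered from inside the Grunt shell before the UDF is called:

grunt> REGISTER pig-examples.jar;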

Create ElasticSearch cluster on single machine

I wanted to figure out how to create a multi-node ElasticSearch cluster on a single machine, so I followed these steps:
  1. First I downloaded the elasticsearch zip file.
  2. Created 3 folders in /temp: node1, node2 and node3.
  3. Unzipped elasticsearch.zip into each one of these folders.
  4. Then I opened node1/elasticsearch-1.4.2/config/elasticsearch.yml and changed the value of cluster.name to sunilscluster and node.name to first. I did the same in node2 and node3, setting node.name to second and third respectively (see the snippet after this list).
  5. The next step was to install Marvel on each of the nodes by executing bin/plugin -i elasticsearch/marvel/latest
  6. Then I went into each of the directories and started elasticsearch. As the nodes came up I could see messages in the logs indicating that the nodes were able to discover each other.
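For reference, the per-node settings from step 4 end up looking like this in node1's elasticsearch.yml (node2 and node3 differ only in node.name, which is set to second and third):

cluster.name: sunilscluster
node.name: first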
Once all 3 nodes were started I could see them in Marvel.
Note: there might be a more efficient way of doing this in which the binaries are shared. I tried creating 3 different elasticsearch.yml files and starting the elasticsearch server 3 times using elasticsearch -Des.config=/node1/elasticsearch.yml, elasticsearch -Des.config=/node2/elasticsearch.yml, and so on, but that did not work.

Using NodeClient for connecting to ElasticSearch server

There are 2 options for connecting to an ElasticSearch server from a Java program:
  1. TransportClient
  2. NodeClient
As per elasticsearch.org, you should use TransportClient when you want short-lived connections, but NodeClient when you want a few long-lived connections. If you are creating a web application that talks to ElasticSearch, you are better off creating only one client object during application startup and destroying it during application shutdown. The basic org.elasticsearch.client.Client object is thread safe, so you can call it from multiple threads of the web application. I was trying to figure out how to use NodeClient and ran into a few issues, but now I have my sample working; these are my notes from the development process. First I downloaded ElasticSearch 1.1.1 onto my local machine and ran it as a standalone server (I did not change anything in the configuration), then I used the NodeClient code to connect to it, and it worked. In my local ElasticSearch log I could see log lines indicating when my client connected to and disconnected from the cluster.
At this point I was not connected to my office network. But as soon as I connected to my office network (which has a few other elasticsearch servers running), the same code started failing with an org.elasticsearch.cluster.block.ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];[SERVICE_UNAVAILABLE/2/no master] exception:

4618 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
7621 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
10623 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
13625 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
16626 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
19628 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
22630 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
25632 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
28634 [elasticsearch[Tiboro][generic][T#1]] DEBUG org.elasticsearch.discovery.zen  - [Tiboro] filtered ping responses: (filter_client[true], filter_data[false]) {none}
31611 [main] WARN  org.elasticsearch.discovery  - [Tiboro] waited for 30s and no initial state was set by the discovery
31611 [main] INFO  org.elasticsearch.discovery  - [Tiboro] elasticsearch/tuC0zIEOSMaxL6reqCYjPA
31611 [main] DEBUG org.elasticsearch.gateway  - [Tiboro] can't wait on start for (possibly) reading state from gateway, will do it asynchronously
31617 [main] INFO  org.elasticsearch.http  - [Tiboro] bound_address {inet[/0:0:0:0:0:0:0:0:9201]}, publish_address {inet[/192.168.1.10:9201]}
31621 [elasticsearch[Tiboro][clusterService#updateTask][T#1]] DEBUG org.elasticsearch.cluster.service  - [Tiboro] processing [updating local node id]: execute
31621 [elasticsearch[Tiboro][clusterService#updateTask][T#1]] DEBUG org.elasticsearch.cluster.service  - [Tiboro] cluster state updated, version [0], source [updating local node id]
31622 [elasticsearch[Tiboro][clusterService#updateTask][T#1]] DEBUG org.elasticsearch.cluster.service  - [Tiboro] set local cluster state to version 0
31622 [elasticsearch[Tiboro][clusterService#updateTask][T#1]] DEBUG org.elasticsearch.cluster.service  - [Tiboro] processing [updating local node id]: done applying updated cluster_state (version: 0)
31622 [main] INFO  org.elasticsearch.node  - [Tiboro] started
Value of client org.elasticsearch.client.node.NodeClient@16888fd4
Caught exception
31630 [main] INFO  org.elasticsearch.node  - [Tiboro] stopping ...
org.elasticsearch.cluster.block.ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];[SERVICE_UNAVAILABLE/2/no master];
	at org.elasticsearch.cluster.block.ClusterBlocks.globalBlockedException(ClusterBlocks.java:138)
	at org.elasticsearch.cluster.block.ClusterBlocks.globalBlockedRaiseException(ClusterBlocks.java:128)
	at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.<init>(TransportSearchTypeAction.java:107)
	at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction$AsyncAction.<init>(TransportSearchQueryThenFetchAction.java:68)
	at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction$AsyncAction.<init>(TransportSearchQueryThenFetchAction.java:62)
	at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction.doExecute(TransportSearchQueryThenFetchAction.java:59)
	at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction.doExecute(TransportSearchQueryThenFetchAction.java:49)
	at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:63)
	at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:108)
	at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:43)
	at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:63)
	at org.elasticsearch.client.node.NodeClient.execute(NodeClient.java:92)
	at org.elasticsearch.client.support.AbstractClient.search(AbstractClient.java:212)
	at org.elasticsearch.action.search.SearchRequestBuilder.doExecute(SearchRequestBuilder.java:1043)
	at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:85)
	at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:59)
	at HelloESClient.getNodeClient(HelloESClient.java:34)
	at HelloESClient.main(HelloESClient.java:15)
31639 [main] INFO  org.elasticsearch.node  - [Tiboro] stopped
Exiting HelloESClient.main()
The problem was that my client was trying to discover all the different ES servers on my office network, and that was causing the failure. So I had to change the code to introduce a Settings object, in which I set discovery.zen.ping.multicast.enabled to false and discovery.zen.ping.unicast.hosts to localhost; note that I did not change anything in the elasticsearch configuration, it kept running as usual. After making these changes, my client would only talk to the elasticsearch instance on my local machine. Next I wanted to figure out how this works with a cluster, so I stopped my elasticsearch server and changed elasticsearch.yml to the following, using sunilscluster as the cluster name instead of the default elasticsearch, and restarted ES:

cluster.name: sunilscluster
node.name: deves11
In order for my NodeClient to work I had to change it to set clusterName("sunilscluster") as well. Once this was done I tried connecting to the ElasticSearch server in our development cluster, which is on a separate network with master nodes masternode1, masternode2 and masternode3. For that, all I had to do was change the value of discovery.zen.ping.unicast.hosts to masternode1.dev.com,masternode2.dev.com,masternode3.dev.com, but I got the following error. The problem is that when you use NodeClient it tries to talk to the ES servers over UDP, and a lot of times those ports are blocked. The only solution is to either open the UDP port or run your client on a machine where the UDP port can be accessed.

2014-12-30 10:14:09 DEBUG zen:104 - [nodeclient1] filtered ping responses: (filter_client[true], filter_data[false])
	--> target [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}], master [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}]
2014-12-30 10:14:09 DEBUG netty:104 - [nodeclient1] connected to node [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}]
2014-12-30 10:14:09 INFO  zen:114 - [nodeclient1] failed to send join request to master [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}], reason [org.elasticsearch.transport.RemoteTransportException: [masternode1][inet[/172.30.0.0:9300]][discovery/zen/join]; org.elasticsearch.transport.ConnectTransportException: [nodeclient1][inet[/172.29.8.36:9300]] connect_timeout[30s]; java.net.ConnectException: Connection refused: /172.29.8.36:9300]
2014-12-30 10:14:09 DEBUG netty:104 - [nodeclient1] connected to node [[#zen_unicast_1#][VDD7S0ITAA024][inet[masternode1.dev.com/172.30.0.0:9300]]]
2014-12-30 10:14:12 DEBUG netty:104 - [nodeclient1] disconnected from [[#zen_unicast_1#][VDD7S0ITAA024][inet[masternode1.dev.com/172.30.0.0:9300]]]
2014-12-30 10:14:12 DEBUG zen:104 - [nodeclient1] filtered ping responses: (filter_client[true], filter_data[false])
	--> target [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}], master [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}]
2014-12-30 10:14:12 INFO  zen:114 - [nodeclient1] failed to send join request to master [[masternode1][1u63mFF0SrG7J3UQ3rtLkQ][n01sml101][inet[/172.30.0.0:9300]]{data=false, master=true}], reason [org.elasticsearch.transport.RemoteTransportException: [masternode1][inet[/172.30.0.0:9300]][discovery/zen/join]; org.elasticsearch.transport.ConnectTransportException: [nodeclient1][inet[/172.29.8.36:9300]] connect_timeout[30s]; java.net.ConnectException: Connection refused: /172.29.8.36:9300]
2014-12-30 10:14:12 DEBUG netty:104 - [nodeclient1] connected to node [[#zen_unicast_1#][VDD7S0ITAA024][inet[masternode1.dev.com/172.30.0.0:9300]]]
2014-12-30 10:14:15 DEBUG netty:104 - [nodeclient1] disconnected from [[#zen_unicast_1#][VDD7S0ITAA024][inet[masternode1.dev.com/172.30.0.0:9300]]]
2014-12-30 10:14:15 DEBUG zen:104 - [nodeclient1] filtered ping responses: (filter_client[true], filter_data[false])
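For reference, a minimal sketch of the NodeClient setup described above might look like the following. It is written against the ElasticSearch 1.x API; the cluster name and unicast hosts are the ones used in this post, and the class name is illustrative rather than the original HelloESClient code.

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;

public class NodeClientSketch {
    public static void main(String[] args) {
        // Build a client-only node: it joins the cluster but holds no data
        Node node = NodeBuilder.nodeBuilder()
                .clusterName("sunilscluster")
                .client(true)
                .settings(ImmutableSettings.settingsBuilder()
                        .put("discovery.zen.ping.multicast.enabled", false)
                        .put("discovery.zen.ping.unicast.hosts", "localhost"))
                .node();
        Client client = node.client();
        try {
            // Use the client here, e.g. client.prepareSearch().execute().actionGet()
            System.out.println("Connected, client = " + client);
        } finally {
            // Closing the node shuts down its discovery and thread pools
            node.close();
        }
    }
}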

How to use ElasticSearch from a Web Application (how to fix the java.lang.OutOfMemoryError: PermGen space error in Tomcat, with an ElasticSearch client as the use case)

Recently I was working on a memory-leak issue in a web application that talks to ElasticSearch. The problem was that when I deployed/undeployed the application a few times without restarting the server, Tomcat ran out of memory and I got an Exception in thread "http-bio-8080-exec-30" java.lang.OutOfMemoryError: PermGen space error. I wanted to figure out what was going on, so first I created a simple web application that uses the ElasticSearch client and provides a JAX-RS API to proxy calls to ElasticSearch. My sample ESClient.java takes care of establishing the connection with ElasticSearch using TransportClient, and I could use a REST client to make a POST call for search.
When I looked at the application in VisualVM, I could see that elasticsearch had created a thread pool of about 17 threads connected to the ElasticSearch server.
When I stopped my application to update it, these threads created by ElasticSearch did not get closed, so the classes loaded by the web application could not be garbage collected, and I saw warning messages like this on the console: [Dec 23, 2014 8:09:55 PM] [org.apache.catalina.loader.WebappClassLoader clearReferencesThreads] [SEVERE]: The web application [/ESClientWeb] appears to have started a thread named [elasticsearch[Blink][[timer]]] but has failed to stop it. This is very likely to create a memory leak. I see a bunch of these messages in the console.
When I used the Find Leaks functionality in the Apache Tomcat manager console, my application showed up multiple times, once for every update.
After a couple of redeploys, Tomcat runs out of memory and throws the Exception in thread "http-bio-8080-exec-30" java.lang.OutOfMemoryError: PermGen space error.
In order to solve this problem, all I had to do was create a ServletContextListener that creates the org.elasticsearch.client.transport.TransportClient object during application startup and closes it when the application stops, along with a couple of corresponding changes in ESClient.java. After making these changes I could see the thread pool being destroyed during application shutdown, and as a result the application classes are garbage collected and there is no more OutOfMemoryError.
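A minimal sketch of such a listener is shown below, assuming the ElasticSearch 1.x TransportClient API; the host, port, cluster name and attribute name are illustrative, and this is not the original post's code.

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

@WebListener
public class ESClientContextListener implements ServletContextListener {

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        // Create one shared, thread-safe client when the application starts
        Client client = new TransportClient(
                ImmutableSettings.settingsBuilder()
                        .put("cluster.name", "sunilscluster")
                        .build())
                .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));
        sce.getServletContext().setAttribute("esClient", client);
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        // Close the client on shutdown so its thread pool does not leak across redeploys
        Client client = (Client) sce.getServletContext().getAttribute("esClient");
        if (client != null) {
            client.close();
        }
    }
}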