tag:blogger.com,1999:blog-22027664632519031592024-03-18T11:03:21.674-07:00Sunil's NotesMy notes for things that I am learning aboutSunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.comBlogger1145125tag:blogger.com,1999:blog-2202766463251903159.post-13270454679463808062017-11-08T13:34:00.000-08:002017-11-08T13:34:25.497-08:00How to drain/delete/expire existing messages in KafkaSometimes you might have a bad record in a Kafka topic that you want to delete. Kafka does not provide a direct option to delete a specific record. The only way to delete records is to expire them. You can achieve this by setting the data retention to, say, 1 second, which expires all the old messages.
You can follow these steps
<ol>
<li>
First check the topic to find out the value of the <code>retention.ms</code> config parameter for the topic
<pre><code>
kafka_2.11-0.11.0.1 spatil$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name my-topic --entity-type topics
Configs for topic 'my-topic' are retention.ms=86400000
</code></pre>
</li>
<li>
Change the value of <code>retention.ms</code> to 1, which means all messages older than 1 ms will be expired
<pre><code>
kafka_2.11-0.11.0.1 spatil$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name my-topic --entity-type topics --add-config retention.ms=1
Completed Updating config for entity: topic 'my-topic'
</code></pre>
</li>
<li>Wait for a few seconds and monitor the logs of the Kafka server to make sure that messages from the topic have been deleted </li>
<li>
Now change the value of <code>retention.ms</code> back to its original value, which was 86400000 (1 day)
<pre><code>
kafka_2.11-0.11.0.1 spatil$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name my-topic --entity-type topics --add-config retention.ms=86400000
Completed Updating config for entity: topic 'my-topic'.
</code></pre>
</li>
<li>
You can verify that your changes are saved by running this command
<pre><code>
kafka_2.11-0.11.0.1 spatil$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name my-topic --entity-type topics
Configs for topic 'my-topic' are retention.ms=86400000
</code></pre>
</li>
</ol>Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com259tag:blogger.com,1999:blog-2202766463251903159.post-30470051087975949922017-11-03T15:19:00.000-07:002017-11-03T15:19:31.085-07:00Kafka how to reset number of partitions in a topic I wanted to figure out how to change the number of partitions of a topic in Kafka, and I followed these steps
<ol>
<li>
I created a sample topic called my-topic with a single partition
<pre><code>
spatil$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic my-topic --replication-factor 1 --partitions 1
Created topic "my-topic".
</code></pre>
</li>
<li>
I used the describe command to verify that my topic actually has a single partition
<pre><code>
spatil$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic
Topic:my-topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
</code></pre>
</li>
<li>
Then I executed the alter command on my topic and changed the number of partitions from 1 to 3
<pre><code>
spatil$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
</code></pre>
</li>
<li>
I executed the describe command on my topic to verify that it actually has 3 partitions
<pre><code>
spatil$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic
Topic:my-topic PartitionCount:3 ReplicationFactor:1 Configs:
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: my-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: my-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
</code></pre>
</li>
</ol>Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com37tag:blogger.com,1999:blog-2202766463251903159.post-25429075085790343972017-11-03T14:48:00.000-07:002017-11-03T16:25:44.127-07:00How to reset consumer group offsetFirst run the describe command on the consumer group to check the current LAG; it is zero in this case
<pre><code>
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group user.kafkaconsumer
</code></pre>
Now run the following command to preview what the new offset will be if you reset
<pre><code>
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group user.kafkaconsumer --reset-offsets --to-earliest --all-topics
</code></pre>
Shut down the consumer so that you can reset the offsets and then restart it (offsets can only be reset while the consumer group is inactive)
Now reset the offset so that the group goes back to the first message
<pre><code>
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group user.kafkaconsumer --reset-offsets --to-earliest --all-topics <b>--execute</b>
</code></pre>
Go back and verify that the consumer offset actually went back by executing the following command
<pre><code>
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group user.kafkaconsumer
</code></pre>
You should be able to see the offset set back to 22000, which is the offset of the first message in Kafka.Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com26tag:blogger.com,1999:blog-2202766463251903159.post-35576516926600052432017-01-11T21:48:00.001-08:002017-01-11T22:09:53.418-08:00Spark program to read data from RDBMSI wanted to figure out how to connect to an RDBMS from Spark and extract data, so I followed these steps. You can download this project from <a href='https://github.com/sdpatil/JDBCSpark'>github</a> <br/>
First I created an Address table in my local MySQL like this
<pre><code>
CREATE TABLE `address` (
`addressid` int(11) NOT NULL AUTO_INCREMENT,
`contactid` int(11) DEFAULT NULL,
`line1` varchar(300) NOT NULL,
`city` varchar(50) NOT NULL,
`state` varchar(50) NOT NULL,
`zip` varchar(50) NOT NULL,
`lastmodified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`addressid`),
KEY `contactid` (`contactid`),
CONSTRAINT `address_ibfk_1` FOREIGN KEY (`contactid`) REFERENCES `CONTACT` (`contactid`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
</code></pre>
Then I added 5 sample records to the address table. When I query the address table on my local machine this is what I get <br/>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEj3DmNJ0IffiH7pVuOCed6RFr8Z9iTC6DLTct7qph2GYJYHMcGvzy7G6B_0cl-8NrD5xMBpeQFqsg-8sI1y98XUXohwXdryNC_x92l0fpPvWJWEuyl-M9sGVIhlxXKK452vtTb86a2TY6T3/s1600/Screen+Shot+2017-01-11+at+9.41.10+PM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEj3DmNJ0IffiH7pVuOCed6RFr8Z9iTC6DLTct7qph2GYJYHMcGvzy7G6B_0cl-8NrD5xMBpeQFqsg-8sI1y98XUXohwXdryNC_x92l0fpPvWJWEuyl-M9sGVIhlxXKK452vtTb86a2TY6T3/s640/Screen+Shot+2017-01-11+at+9.41.10+PM.png" width="640" height="414" /></a></div>
After that I created a Spark Scala project that has mysql-connector-java as one of the dependencies
<script src="https://gist.github.com/sdpatil/e4f86253cd29ffe079d106c8b33aa922.js"></script>
The last step was to create a simple Spark program like this,
<script src="https://gist.github.com/sdpatil/08cc8cb3cbd8f8518a20ef2f44e38926.js"></script>
My program has 4 main sections
<ol>
<li>First is the Address case class, with the same schema as the Address table but without the lastmodified field </li>
<li>Next is the call that creates the JdbcRDD; the lower bound 0 and the upper bound 5 are bound to the two <code>?</code> placeholders of the limit clause, and with 1 partition the query fetches the first 5 rows of the address table.
<pre><code>
new JdbcRDD(sparkContext, getConnection,
"select * from address limit ?,?",
0, 5, 1, convertToAddress)
</code></pre>
</li>
<li>Then I defined the getConnection() method, which creates a JDBC connection to my database and returns it</li>
<li> Last is the convertToAddress() method, which knows how to take a ResultSet and convert it into an Address object</li>
</ol>
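For reference, below is a minimal, self-contained Scala sketch of how these pieces fit together. The connection URL, credentials and field names are assumptions based on the table above; the actual code is in the gist.
<pre><code>
import java.sql.{Connection, DriverManager, ResultSet}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.JdbcRDD

// Case class matching the address table, without the lastmodified column
case class Address(addressId: Int, contactId: Int, line1: String,
                   city: String, state: String, zip: String)

object JDBCSparkSketch {
  // Opens a JDBC connection to the local MySQL database (URL and credentials are assumptions)
  def getConnection(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "password")
  }

  // Maps the current row of the ResultSet to an Address object
  def convertToAddress(rs: ResultSet): Address =
    Address(rs.getInt("addressid"), rs.getInt("contactid"), rs.getString("line1"),
      rs.getString("city"), rs.getString("state"), rs.getString("zip"))

  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext(new SparkConf().setAppName("JDBCSpark").setMaster("local[*]"))
    // The lower/upper bounds (0 and 5) are bound to the two ? placeholders of "limit ?,?"
    val addressRDD = new JdbcRDD(sparkContext, getConnection _,
      "select * from address limit ?,?", 0, 5, 1, convertToAddress _)
    addressRDD.collect().foreach(println)
    sparkContext.stop()
  }
}
</code></pre>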
When I run this program in the IDE this is the output I get
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEigA2ph-TAgfF4q_mTfC_Obt4l9d99rxSS6Zrwgk126mHwjJARgdoVbEDCbNZOX92A2Mf_Rz5hDZSuGKCI8aKtQ-2aEObgKmdt1IQpvZpj4jpAMn6hZdXwPjf1nvHe24Lcmbup2d7u68cWD/s1600/Screen+Shot+2017-01-11+at+9.44.55+PM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEigA2ph-TAgfF4q_mTfC_Obt4l9d99rxSS6Zrwgk126mHwjJARgdoVbEDCbNZOX92A2Mf_Rz5hDZSuGKCI8aKtQ-2aEObgKmdt1IQpvZpj4jpAMn6hZdXwPjf1nvHe24Lcmbup2d7u68cWD/s640/Screen+Shot+2017-01-11+at+9.44.55+PM.png" width="640" height="407" /></a></div>Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com185tag:blogger.com,1999:blog-2202766463251903159.post-33814389473380537252017-01-11T21:08:00.001-08:002017-01-11T21:08:52.995-08:00How to implement cache (LRU Cache) using LinkedHashMap in javaRecently i wanted to implement a simple <a href='http://mcicpc.cs.atu.edu/archives/2012/mcpc2012/lru/lru.html'>Least recently used (LRU)</a> cache in one my applications. But my use case is simple enough that instead of going for something ehcache i decided to build it on own by using <a href='https://docs.oracle.com/javase/7/docs/api/java/util/LinkedHashMap.html'><code>java.util.LinkedHashMap</code></a> <br/>
As you can see from the code it's very simple. All you have to do is extend <code>java.util.LinkedHashMap</code> and override its protected <code>removeEldestEntry()</code> method so that it checks if the size of the map is greater than the size you specified while creating the map; if yes, the eldest entry is removed <br/>
<script src="https://gist.github.com/sdpatil/a72bd191efcb3d71535c585567863ab5.js"></script>
Now the question is, when the Map is full, which entry will it remove? You have 2 options
<ol>
<li>Eldest: If you just want to remove the entry that was inserted first when adding a new entry, then in your constructor you could use <code> super(cacheSize, 0.75f);</code>, so LinkedHashMap won't keep track of when a particular entry was accessed.</li>
<li>Least recently used (LRU): But if you want to make sure that the entry that was least recently used is removed, then call <code>super(cacheSize, 0.75f, true);</code> from the constructor of your LRUCache so that LinkedHashMap keeps track of when each entry was accessed and removes the least recently used entry</li>
</ol>Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com27tag:blogger.com,1999:blog-2202766463251903159.post-65462203213800018712017-01-11T16:30:00.001-08:002017-01-11T16:30:32.016-08:00Spark Streaming Kafka 10 API Word Count application Scala In the <a href='http://wpcertification.blogspot.com/2017/01/spark-kafka-streaming-java-program-word.html'>Spark Kafka Streaming Java program Word Count using Kafka 0.10 API </a> blog entry I talked about how to create a simple Java program that uses Spark Streaming's Kafka 10 API. This blog entry does the same thing but using Scala. You can download the complete application from <a href='https://github.com/sdpatil/KafkaSpark10'>github</a> <br/>
You can run this sample by first downloading Kafka 0.10.* from <a href='http://kafka.apache.org/'>Apache Kafka WebSite</a>, then you can create and start a test topic and send messages to it by following this <a href='http://kafka.apache.org/quickstart'>Kafka Quick start</a> document <br/>
<script src="https://gist.github.com/sdpatil/2a5031ab0ce0fb12857fc9c0e236db5d.js"></script>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgl6kIjXS43rIWTb-IunHuK3kw-fsGm0Wc4ORsBtii0W8_2VjVuRdmsLKGxrKk3NpWkLgcq3NVYfyp8UlFzpDeU92MiVJLWemeTq_EqQKBilgIiNfgbzsMTCoTNF7VgdUt6XyX5TPQaMtvZ/s1600/Screen+Shot+2017-01-11+at+4.19.21+PM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgl6kIjXS43rIWTb-IunHuK3kw-fsGm0Wc4ORsBtii0W8_2VjVuRdmsLKGxrKk3NpWkLgcq3NVYfyp8UlFzpDeU92MiVJLWemeTq_EqQKBilgIiNfgbzsMTCoTNF7VgdUt6XyX5TPQaMtvZ/s640/Screen+Shot+2017-01-11+at+4.19.21+PM.png" width="640" height="240" /></a></div>Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com19tag:blogger.com,1999:blog-2202766463251903159.post-2986174027678777842017-01-11T15:39:00.000-08:002017-01-11T16:30:59.133-08:00Spark Kafka Streaming Java program Word Count using Kafka 0.10 APIKafka API went through a lot of changes starting Kafka 0.9. Spark Kafka Streaming API also was changed to better support Kafka 0.9. i wanted to try that out so i built this simple Word Count application using Kafka 0.10 API. This blog entry does the same thing but using Scala. You can download the complete application from <a href='https://github.com/sdpatil/KafkaSpark10'>github</a> <br/>
You can run this sample by first downloading Kafka 0.10.* from <a href='http://kafka.apache.org/'>Apache Kafka WebSite</a>, then you can create and start a test topic and send messages to it by following this <a href='http://kafka.apache.org/quickstart'>Kafka Quick start</a> document
The first thing I did was to include the Kafka 0.10 API dependencies for the Spark project. As you can see I am using Spark version 2.1
<script src="https://gist.github.com/sdpatil/251997df661b2c632121052ef5cdeb2e.js"></script>
Then I created a SparkKafka10.java file that looks like this. Please take a look at the comments inside the code for what I am doing.
Now if you create the test topic and send messages to it, you should be able to see the word count on the console
<script src="https://gist.github.com/sdpatil/0ce18ccd7aeb857eb8c851d4e4bf0ac7.js"></script>Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com26tag:blogger.com,1999:blog-2202766463251903159.post-16668421496150246522016-12-31T20:07:00.000-08:002016-12-31T20:07:29.766-08:00How to use ElasticSearch as storage from Hive in cloudera .In the <a href='http://wpcertification.blogspot.com/2014/05/using-elasticsearch-as-external-data.html'> Using ElasticSearch as external data store with apache hive</a> entry i talked about how you can create a table in Hive so that actual data is stored in ElasticSearch. Problem with that approach was that i had to pass the full path to <code>elasticsearch-hadoop-hive-<eshadoopversion>.jar</code> as parameter every time.
<pre><code>
hive -hiveconf hive.aux.jars.path=/opt/elastic/elasticsearch-hadoop-2.4.3/dist/elasticsearch-hadoop-hive-2.4.3.jar;
</code></pre>
Another option for doing the same thing is to open a Hive session and then run the following command as the first thing
<pre><code>
ADD JAR /opt/elastic/elasticsearch-hadoop-2.4.3/dist/elasticsearch-hadoop-hive-2.4.3.jar;
</code></pre>
The problem with both these approaches is that you have to keep letting Hive know the full path to the elasticsearch jars every single time. Instead you can take care of this issue by copying <code>elasticsearch-hadoop-hive-<eshadoopversion>.jar</code> into the same directory on every node. In my case I copied it to the <code>/usr/lib/hive/lib</code> directory by executing the following command
<pre><code>
sudo cp /opt/elastic/elasticsearch-hadoop-2.4.3/dist/elasticsearch-hadoop-hive-2.4.3.jar /usr/lib/hive/lib/.
</code></pre>
Then set the value of the Hive Auxiliary JARs Directory property <code>hive.aux.jars.path</code> to the <code>/usr/lib/hive/lib</code> directory like this.
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhhUvAXeDdm9-xzK0vpC_tsx2yE0AgfWWxAds9fVm09sMSKg-qZZK9ua8IvP_pgvAgqiALfWTKOn8UWc9EEsLP_g8trGh0kyFx6H27wTgU_bc-AXt9r9MezikEuVWxYd2mEtGFNm56moQqn/s1600/Screenshot+from+2016-12-31+19-37-21.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhhUvAXeDdm9-xzK0vpC_tsx2yE0AgfWWxAds9fVm09sMSKg-qZZK9ua8IvP_pgvAgqiALfWTKOn8UWc9EEsLP_g8trGh0kyFx6H27wTgU_bc-AXt9r9MezikEuVWxYd2mEtGFNm56moQqn/s640/Screenshot+from+2016-12-31+19-37-21.png" width="640" height="364" /></a></div>
Then restart the Hive service, and now you should be able to access any ElasticSearch-backed table without adding the elasticsearch-hadoop jar explicitly
Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com3tag:blogger.com,1999:blog-2202766463251903159.post-86616741627534187662016-12-31T14:34:00.001-08:002016-12-31T14:56:03.113-08:00Installing ElasticSearch on existing docker containerI was using a <a href='https://www.cloudera.com/documentation/enterprise/5-6-x/topics/quickstart_docker_container.html'>Cloudera Quickstart docker image</a> for one experiment and wanted to install ElasticSearch on it, but I had trouble accessing it from my host. I found a workaround by following these steps
<ul>
<li> First I installed ElasticSearch by downloading ElasticSearch version 2.4.3 and unzipping it in the /opt/elastic folder</li>
<li>Then I started ElasticSearch by executing <code>/bin/elasticsearch</code>, and it started OK. When I ran
<code>curl -XGET "http://localhost:9200/"</code> from inside the docker image I was able to access ElasticSearch, but when I tried to access it from my host machine I could not, whereas I was able to access other services running on my docker image. So first I ran netstat on my docker image to see why I was able to access other services but not ElasticSearch (and also to make sure that ElasticSearch was actually listening on port 9200), and I got output like this
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjWhgJf7B1Zqf3DL0F5SuXrgGSdEGbFy2boycYZEH5yEGUo7h_zWLpXnxR-OBP3ONAXYVGghd62SxpaoBFoBF2-UKrgf4S9iSjmHXWYooIOkLGYiSlokIjkpHG35hgX1UCuzWdsrdyJLAdl/s1600/Screenshot+from+2016-12-31+11-50-51.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjWhgJf7B1Zqf3DL0F5SuXrgGSdEGbFy2boycYZEH5yEGUo7h_zWLpXnxR-OBP3ONAXYVGghd62SxpaoBFoBF2-UKrgf4S9iSjmHXWYooIOkLGYiSlokIjkpHG35hgX1UCuzWdsrdyJLAdl/s640/Screenshot+from+2016-12-31+11-50-51.png" width="640" height="431" /></a></div>
</li>
<li>Looking at the port mapping I could see that other services were mapped to 0.0.0.0 but elasticsearch was only mapped to 127.0.0.1, so I opened <ELASTICSEARCH_HOME>/config/elasticsearch.yml and added two more lines to it like this
<pre><code>
http.host: 0.0.0.0
transport.host: 127.0.0.1
</code></pre>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgrQqkjXXGsmvKaGjj1vW_N02LCq4KqDEd4OeKLeaKvSXACpM_MuwRK2q-K0KykTlE4KOIEW3nuy5pvSEqY2iy4czNFUZcWI2TOFJfa-ZzA7FyyGiBLvn0OywPsj2Zz8_xjr0MJjnf9kV61/s1600/elasticymlchanges.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgrQqkjXXGsmvKaGjj1vW_N02LCq4KqDEd4OeKLeaKvSXACpM_MuwRK2q-K0KykTlE4KOIEW3nuy5pvSEqY2iy4czNFUZcWI2TOFJfa-ZzA7FyyGiBLvn0OywPsj2Zz8_xjr0MJjnf9kV61/s640/elasticymlchanges.png" width="640" height="431" /></a></div>
</li>
<li>Then I restarted the elasticsearch server, checked netstat again and could see this mapping, and when I tried accessing elasticsearch from the host machine it worked
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjf_gg4k-1vdgv074gGpY791ePHOC4EAt9xHMOmxjf2EciMNRkP4X7bwTKxTdFqRVFujdIvriqjumOUM9GQo2mWDyIJEsFjA_oJ_dT6txM2H-iNirgKwpl7_7fxW4j7HuNGj5OKyVkfqBDX/s1600/Screenshot+from+2016-12-31+14-33-09.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjf_gg4k-1vdgv074gGpY791ePHOC4EAt9xHMOmxjf2EciMNRkP4X7bwTKxTdFqRVFujdIvriqjumOUM9GQo2mWDyIJEsFjA_oJ_dT6txM2H-iNirgKwpl7_7fxW4j7HuNGj5OKyVkfqBDX/s640/Screenshot+from+2016-12-31+14-33-09.png" width="640" height="431" /></a></div>
</li>
</ul>Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com8tag:blogger.com,1999:blog-2202766463251903159.post-79630053325911897812016-12-25T20:14:00.001-08:002016-12-25T20:14:43.148-08:00Sending and Receiving JSON messages in KafkaSome time back I wrote a couple of articles for Java World about Kafka, <a href='http://www.javaworld.com/article/3060078/big-data/big-data-messaging-with-kafka-part-1.html'>Big data messaging with Kafka, Part 1</a> and <a href='http://www.javaworld.com/article/3066873/big-data/big-data-messaging-with-kafka-part-2.html'>Big data messaging with Kafka, Part 2</a>, where you can find a basic Producer and Consumer for Kafka along with some basic samples. <br/>
I wanted to figure out how to pass JSON messages using Kafka. It looks like Kafka Connect provides a simple JSON Serializer <code>org.apache.kafka.connect.json.JsonSerializer</code> and Deserializer <code>org.apache.kafka.connect.json.JsonDeserializer</code> that use the Jackson JSON parser. I wanted to figure out how to use them, so I built the following sample
<ul>
<li> First I created a Contact object, which is a simple Pojo that has 3 fields <code>contactId, firstName and lastName</code>. Take a look at the main() method, in which I create a simple Contact object and then convert it to JSON and write it to the console.
<script src="https://gist.github.com/sdpatil/3db909176fc8e675d5be03d3e8b1576a.js"></script>
</li>
<li>
Next I created Producer.java, which reads values in CSV format like <code>1,Sunil,Patil</code> from the command line and parses them into a Contact object first. Then I convert the Contact object into a <code>JsonNode</code> and pass it as the value to Kafka; the <code>JsonSerializer</code> converts the <code>JsonNode</code> into a byte[] (a condensed Scala sketch of this producer side appears after this list) <br/>
The producer code is mostly the same as the one required for passing a String, with the difference that on line 35 I am creating an object of <code>com.fasterxml.jackson.databind.ObjectMapper</code> and then on line 41 I am converting the Contact object into a JsonNode by calling <code>objectMapper.valueToTree(contact)</code>
<script src="https://gist.github.com/sdpatil/f803ff60dd0e9c5f8664fcd5fbb60242.js"></script>
</li>
<li>
Since I am using <code>org.apache.kafka.connect.json.JsonSerializer</code> on the producer I have to use <code>org.apache.kafka.connect.json.JsonDeserializer</code> on the Consumer. While creating the <code>KafkaConsumer</code> object I declare that I will get a String key and a JsonNode as the value. Then once I get messages from Kafka I call <code>mapper.treeToValue(jsonNode,Contact.class)</code> to read the message and convert it back to a Contact object.
<script src="https://gist.github.com/sdpatil/5faadf78373f0cee312274c25399e367.js"></script>
</li>
</ul>
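For reference, here is a condensed Scala sketch of the producer side. The broker address, topic name and the hand-built JsonNode are assumptions made to keep the example self-contained; the original Java producer builds the JsonNode from the Contact object with <code>objectMapper.valueToTree(contact)</code>.
<pre><code>
import java.util.Properties
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.JsonNodeFactory
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object JsonProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumption: local broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.connect.json.JsonSerializer")

    // Build the JSON payload by hand; the original post builds it with objectMapper.valueToTree(contact)
    val contactJson: JsonNode = JsonNodeFactory.instance.objectNode()
      .put("contactId", 1)
      .put("firstName", "Sunil")
      .put("lastName", "Patil")

    val producer = new KafkaProducer[String, JsonNode](props)
    producer.send(new ProducerRecord[String, JsonNode]("contact-topic", "1", contactJson))
    producer.close()
  }
}
</code></pre>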
Now you can run the producer and consumer with the same topic name and it should work
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh_tDY63WK3C7w2brGWL8HYCB4XxcZ9FgXJqTE7AVhCwiABmW277_F82CC1wSJzxB3trfe893LXXumhA0N2hDiSUaNlWBX9DhtwBF9R-HkiQaB_XMbponHOwWZKaKaRc30ZvR-qS9jVooVY/s1600/Screen+Shot+2016-12-25+at+8.13.46+PM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh_tDY63WK3C7w2brGWL8HYCB4XxcZ9FgXJqTE7AVhCwiABmW277_F82CC1wSJzxB3trfe893LXXumhA0N2hDiSUaNlWBX9DhtwBF9R-HkiQaB_XMbponHOwWZKaKaRc30ZvR-qS9jVooVY/s640/Screen+Shot+2016-12-25+at+8.13.46+PM.png" width="640" height="632" /></a></div>Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com108tag:blogger.com,1999:blog-2202766463251903159.post-20208324078886877332016-12-22T16:18:00.001-08:002016-12-22T16:18:31.205-08:00Importing data from RDBMS into Hive using Sqoop and oozie (hive-import)In the <a href='http://wpcertification.blogspot.com/2016/12/how-to-run-sqoop-command-from-oozie.html'>How to run Sqoop command from oozie</a> entry i talked about how you can use Oozie and Sqoop to import data into HDFS. I wanted to change it to use sqoop's hive-import option, which in addition to importing data into HDFS also creats Hive table on top of the data. These are the steps that i followed
<ul>
<li>
First I changed the workflow.xml to take out the <code>as-avrodatafile</code> option and add the <code>hive-import</code> option, and I re-ran the workflow, which looks like this
<script src="https://gist.github.com/sdpatil/a168f1fe51ccde711d77b0492cfea79b.js"></script>
When I did that the oozie workflow failed with the following error
<pre><code>
7936 [uber-SubtaskRunner] WARN org.apache.sqoop.mapreduce.JobBase - SQOOP_HOME is unset. May not be able to find all job dependencies.
9202 [uber-SubtaskRunner] DEBUG org.apache.sqoop.mapreduce.db.DBConfiguration - Fetching password from job credentials store
9207 [uber-SubtaskRunner] INFO org.apache.sqoop.mapreduce.db.DBInputFormat - Using read commited transaction isolation
9210 [uber-SubtaskRunner] DEBUG org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat - Creating input split with lower bound '1=1' and upper bound '1=1'
25643 [uber-SubtaskRunner] INFO org.apache.sqoop.mapreduce.ImportJobBase - Transferred 931.1768 KB in 17.6994 seconds (52.6107 KB/sec)
25649 [uber-SubtaskRunner] INFO org.apache.sqoop.mapreduce.ImportJobBase - Retrieved 12435 records.
25649 [uber-SubtaskRunner] DEBUG org.apache.sqoop.hive.HiveImport - Hive.inputTable: customers
25650 [uber-SubtaskRunner] DEBUG org.apache.sqoop.hive.HiveImport - Hive.outputTable: customers
25653 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager - Execute getColumnInfoRawQuery : SELECT t.* FROM `customers` AS t LIMIT 1
25653 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager - No connection paramenters specified. Using regular API for making connection.
25658 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager - Using fetchSize for next query: -2147483648
25658 [uber-SubtaskRunner] INFO org.apache.sqoop.manager.SqlManager - Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
25659 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager - Found column customer_id of type [4, 11, 0]
25659 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager - Found column customer_fname of type [12, 45, 0]
25659 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager - Found column customer_lname of type [12, 45, 0]
25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager - Found column customer_email of type [12, 45, 0]
25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager - Found column customer_password of type [12, 45, 0]
25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager - Found column customer_street of type [12, 255, 0]
25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager - Found column customer_city of type [12, 45, 0]
25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager - Found column customer_state of type [12, 45, 0]
25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager - Found column customer_zipcode of type [12, 45, 0]
25663 [uber-SubtaskRunner] DEBUG org.apache.sqoop.hive.TableDefWriter - Create statement: CREATE TABLE IF NOT EXISTS `customers` ( `customer_id` INT, `customer_fname` STRING, `customer_lname` STRING, `customer_email` STRING, `customer_password` STRING, `customer_street` STRING, `customer_city` STRING, `customer_state` STRING, `customer_zipcode` STRING) COMMENT 'Imported by sqoop on 2016/12/22 21:18:39' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS TEXTFILE
25664 [uber-SubtaskRunner] DEBUG org.apache.sqoop.hive.TableDefWriter - Load statement: LOAD DATA INPATH 'hdfs://quickstart.cloudera:8020/user/cloudera/customers' INTO TABLE `customers`
25667 [uber-SubtaskRunner] INFO org.apache.sqoop.hive.HiveImport - Loading uploaded data into Hive
25680 [uber-SubtaskRunner] DEBUG org.apache.sqoop.hive.HiveImport - Using in-process Hive instance.
25683 [uber-SubtaskRunner] DEBUG org.apache.sqoop.util.SubprocessSecurityManager - Installing subprocess security manager
Intercepting System.exit(1)
<b>
<<< Invocation of Main class completed <<<
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SqoopMain], exit code [1]
Oozie Launcher failed, finishing Hadoop job gracefully
Oozie Launcher, uploading action data to HDFS sequence file: hdfs://quickstart.cloudera:8020/user/cloudera/oozie-oozi/0000007-161222163830473-oozie-oozi-W/sqoop-52c0--sqoop/action-data.seq
Oozie Launcher ends</b>
</code></pre>
</li>
<li>
As you can see from the log, the Sqoop job was able to import data into HDFS in the <code>/user/cloudera/customers</code> directory and I could actually see the data in the directory. But when Sqoop tried to create the table in Hive it failed and the table did not get created; this is the log statement that I am referring to
<code>
CREATE TABLE IF NOT EXISTS `customers` ( `customer_id` INT, `customer_fname` STRING, `customer_lname` STRING, `customer_email` STRING, `customer_password` STRING, `customer_street` STRING, `customer_city` STRING, `customer_state` STRING, `customer_zipcode` STRING) COMMENT 'Imported by sqoop on 2016/12/22 21:18:39' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS TEXTFILE
</code>
</li>
<li> So it seems the problem is that Sqoop needs hive-site.xml so that it knows how to talk to the Hive service. For that I first searched my sandbox to figure out where hive-site.xml is located, then I executed the following commands to find hive-site.xml and upload it to HDFS
<pre><code>
sudo find / -name hive-site.xml
hdfs dfs -put /etc/hive/conf.dist/hive-site.xml
</code></pre>
</li>
<li>
After that I went back to the workflow.xml and modified it to look like this
<script src="https://gist.github.com/sdpatil/85083439daca44f036936a8038164691.js"></script>
</li>
</ul>
Now when I ran the oozie workflow it was successful and I could query the customer data Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com5tag:blogger.com,1999:blog-2202766463251903159.post-72393493946473657932016-12-22T12:31:00.001-08:002016-12-31T20:12:07.270-08:00How to run Sqoop command from oozieIn the <a href='http://wpcertification.blogspot.com/2016/12/importing-data-from-sqoop-into-hive.html'>Importing data from Sqoop into Hive External Table with Avro encoding updated</a> entry I blogged about how you can use sqoop to import data from an RDBMS into Hadoop. I wanted to test if I could use Oozie for invoking a Sqoop command, and I followed these steps for doing that.
<ol>
<li>
First I tried executing this command from my command line on the Hadoop cluster to make sure that I could actually run sqoop without any problem
<pre><code>
sqoop import --connect jdbc:mysql://localhost/test
--username root
--password cloudera
--table CUSTOMER
--as-avrodatafile
</code></pre>
</li>
<li>
Once the sqoop command was successfully executed I went back and deleted the CUSTOMER directory from HDFS to make sure that I could re-import the data using the following command
<pre><code>
hdfs dfs -rm -R CUSTOMER
</code></pre>
</li>
<li>
Next I went to Hue to create an oozie workflow with the single sqoop command that I had executed before
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhYIYw0QCjGtVidGNtptHyxpey_SomwM5F-3CreNNyW7I9Fotyq46iQfp0w5yI17JdgYA67hDo9v4FfKMA7ZUJgs1k4tzkfpKwXsnosEnePXwtwYmUCvKwOm1ydcjMTbm-cgrwiNnFpUft2/s1600/Screenshot+from+2016-12-22+12-13-49.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhYIYw0QCjGtVidGNtptHyxpey_SomwM5F-3CreNNyW7I9Fotyq46iQfp0w5yI17JdgYA67hDo9v4FfKMA7ZUJgs1k4tzkfpKwXsnosEnePXwtwYmUCvKwOm1ydcjMTbm-cgrwiNnFpUft2/s640/Screenshot+from+2016-12-22+12-13-49.png" width="640" height="366" /></a></div>
But if you're not using the Hue console you can create the workflow.xml manually like this
<script src="https://gist.github.com/sdpatil/2ec0af268952fa4a323af8308d00bd38.js"></script>
Also make sure to create job.properties file like this
<script src="https://gist.github.com/sdpatil/d65d41511ae95d0b8754d16cce3393d6.js"></script>
Take a look at <a href='http://wpcertification.blogspot.com/2014/07/enabling-oozie-console-on-cloudera-vm.html'>Enabling Oozie console on Cloudera VM 4.4.0 and executing examples</a> for information on how to run oozie job from command line
</li>
<li> Next when I ran the Oozie workflow, the job failed with the following error, which indicates that Oozie does not have the MySQL JDBC driver.
<pre><code>
java.lang.RuntimeException: Could not load db driver class: com.mysql.jdbc.Driver
at org.apache.sqoop.manager.SqlManager.makeConnection(SqlManager.java:875)
at org.apache.sqoop.manager.GenericJdbcManager.getConnection(GenericJdbcManager.java:52)
at org.apache.sqoop.manager.SqlManager.execute(SqlManager.java:763)
at org.apache.sqoop.manager.SqlManager.execute(SqlManager.java:786)
at org.apache.sqoop.manager.SqlManager.getColumnInfoForRawQuery(SqlManager.java:289)
at org.apache.sqoop.manager.SqlManager.getColumnTypesForRawQuery(SqlManager.java:260)
at org.apache.sqoop.manager.SqlManager.getColumnTypes(SqlManager.java:246)
at org.apache.sqoop.manager.ConnManager.getColumnTypes(ConnManager.java:327)
at org.apache.sqoop.orm.ClassWriter.getColumnTypes(ClassWriter.java:1846)
at org.apache.sqoop.orm.ClassWriter.generate(ClassWriter.java:1646)
at org.apache.sqoop.tool.CodeGenTool.generateORM(CodeGenTool.java:107)
at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:478)
at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:605)
at org.apache.sqoop.Sqoop.run(Sqoop.java:143)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:179)
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:218)
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:227)
at org.apache.sqoop.Sqoop.main(Sqoop.java:236)
at org.apache.oozie.action.hadoop.SqoopMain.runSqoopJob(SqoopMain.java:197)
at org.apache.oozie.action.hadoop.SqoopMain.run(SqoopMain.java:177)
at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:49)
</code></pre>
</li>
<li>So the first thing I did was to check if the mysql driver is there in the oozie shared lib by executing the following commands
<pre><code>
export OOZIE_URL=http://localhost:11000/oozie
oozie admin -shareliblist sqoop
</code></pre>
I noticed that the mysql-connector-java.jar was not there in the list of shared libs for Oozie + sqoop
</li>
<li> The next step was to find the mysql-connector-java.jar in my sandbox, which I could do by finding it like this
<pre><code>
sudo find / -name mysql*
</code></pre>
I found mysql-connector-java.jar on my local machine at <code>/var/lib/sqoop/mysql-connector-java.jar </code>
</li>
<li>I wanted to update the Oozie shared lib to include the mysql driver jar, so I executed the following command to figure out the directory where the oozie sqoop shared lib is
<pre><code>
oozie admin -sharelibupdate
</code></pre>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiinppvS5TS503bUjIGmaQ7lpHVPu-v8NQx57G4_P7jlHl1qsqPV-1TPnzrem3VbnKMa1Y2KM19Jks_4rJOv7IwTcNX-33HvKYuymyEncdPNPdxb13YJebjQmuH04un6mtDJYNZh-eoo8Ch/s1600/Screenshot+from+2016-12-22+12-22-54.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiinppvS5TS503bUjIGmaQ7lpHVPu-v8NQx57G4_P7jlHl1qsqPV-1TPnzrem3VbnKMa1Y2KM19Jks_4rJOv7IwTcNX-33HvKYuymyEncdPNPdxb13YJebjQmuH04un6mtDJYNZh-eoo8Ch/s400/Screenshot+from+2016-12-22+12-22-54.png" width="400" height="57" /></a></div>
From this output I got the HDFS directory location for the Oozie shared lib, which is <code>/user/oozie/share/lib/lib_20160406022812</code>
</li>
<li> Then I used the following two commands to first copy the db driver into the oozie shared lib and then make sure it is accessible to other users
<pre><code>
hdfs dfs -copyFromLocal /var/lib/sqoop/mysql-connector-java.jar /user/oozie/share/lib/sqoop/.
hdfs dfs -chmod 777 /user/oozie/share/lib/sqoop/mysql-connector-java.jar
</code></pre>
</li>
<li>Now the last step was to let Oozie know that it should reload the sharedlib, and I did that by executing the following two commands
<pre><code>
oozie admin -sharelibupdate
oozie admin -shareliblist sqoop | grep mysql*
</code></pre>
The second command queries oozie to get the current list of shared jars, and I could see mysql-connector-java.jar listed in it like this
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjGNLZ_waPv_kQ9U_cZhvPmB3iyBLgVCo1WSix2xEBG0w1Tj7HtGoIkxAfx187F9CEqEDMgxD5in2ac0EZJ4rRk_ehu6fBhS5MDes2P7YaVx_Yz1kadhMi7LDPFTaVvgEW4mZUrXo27NIxS/s1600/Screenshot+from+2016-12-22+12-26-36.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjGNLZ_waPv_kQ9U_cZhvPmB3iyBLgVCo1WSix2xEBG0w1Tj7HtGoIkxAfx187F9CEqEDMgxD5in2ac0EZJ4rRk_ehu6fBhS5MDes2P7YaVx_Yz1kadhMi7LDPFTaVvgEW4mZUrXo27NIxS/s400/Screenshot+from+2016-12-22+12-26-36.png" width="400" height="82" /></a></div>
</li>
</ol>
When I re-executed the oozie job again, this time it ran successfully.
Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com11tag:blogger.com,1999:blog-2202766463251903159.post-15918722831082033332016-12-22T10:06:00.003-08:002016-12-22T10:06:53.935-08:00 Importing data from Sqoop into Hive External Table with Avro encoding updatedIn the <a href='http://wpcertification.blogspot.com/2015/05/importing-data-from-sqoop-into-hive.html'> Importing data from Sqoop into Hive External Table with Avro encoding</a> entry I had details on how you can import a table from an RDBMS into Hive using Sqoop in Avro format. In that blog I went through a few steps to get the avsc file, but I realized there is an easier way to do it by following these steps
<ol>
<li>
First execute the <code>sqoop import</code> command like this; make sure that you pass <code>--outdir schema</code> as a parameter to the sqoop import command. What that does is generate the CUSTOMER.avsc and CUSTOMER.java files in the schema directory on your local machine
<pre><code>
sqoop import --connect jdbc:mysql://localhost/test
--username root
--password cloudera
--table CUSTOMER
--as-avrodatafile
<b>--outdir schema</b>
</code></pre>
</li>
<li>
You can verify that the CUSTOMER.avsc file got created as you expected by executing <code>ls -ltrA schema</code>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh7V1Djt1RPTSZnJo3eYABokbz86r49BrkjZa-zRqOD915UnW-U4swmy2VQQffIZLAeB4an9SsYTMvatKtdbIvOixpzLfyF3HVWP5NqYDVPgmqF-P4WzQvxGbJWfneICfUnSigW-yPCY_rb/s1600/Screenshot+from+2016-12-22+10-02-46.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh7V1Djt1RPTSZnJo3eYABokbz86r49BrkjZa-zRqOD915UnW-U4swmy2VQQffIZLAeB4an9SsYTMvatKtdbIvOixpzLfyF3HVWP5NqYDVPgmqF-P4WzQvxGbJWfneICfUnSigW-yPCY_rb/s400/Screenshot+from+2016-12-22+10-02-46.png" width="400" height="254" /></a></div>
</li>
<li>
Next create a schema directory in HDFS by executing the hdfs mkdir command like this
<pre><code>
hdfs dfs -mkdir /user/cloudera/schema
</code></pre>
</li>
<li>
Copy CUSTOMER.avsc from your local schema directory to the schema directory in HDFS by executing the following command
<pre><code>
hdfs dfs -copyFromLocal schema/CUSTOMER.avsc /user/cloudera/schema/.
</code></pre>
</li>
<li>
The last step is to create a Hive table with CUSTOMER.avsc as the schema, using the following command
<pre><code>
CREATE EXTERNAL TABLE CUSTOMER
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/user/cloudera/CUSTOMER'
TBLPROPERTIES ('avro.schema.url'='/user/cloudera/schema/CUSTOMER.avsc');
</code></pre>
</li>
</ol>
Now if you go to Hive and execute the "SELECT * FROM CUSTOMER;" query then you should see 1 record in it like this
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEijqokkQK9bffQ7tmTYA7EIJ895xAjM8HSaMVaadFUG5AwTJ8EY8HXwAW7VnwFSumMgA0idaXXdkVdIJhk6gBW_X_0n40i9vWEk2QEN1gIUXLtDl8lviqeaJlpNk1zQic70TjJLNQFpiVv0/s1600/Screenshot+from+2016-12-22+09-52-08.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEijqokkQK9bffQ7tmTYA7EIJ895xAjM8HSaMVaadFUG5AwTJ8EY8HXwAW7VnwFSumMgA0idaXXdkVdIJhk6gBW_X_0n40i9vWEk2QEN1gIUXLtDl8lviqeaJlpNk1zQic70TjJLNQFpiVv0/s400/Screenshot+from+2016-12-22+09-52-08.png" width="400" height="254" /></a></div>Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com27tag:blogger.com,1999:blog-2202766463251903159.post-38815711966311982922016-11-23T14:27:00.000-08:002016-11-23T14:30:26.416-08:00Writing data from Spark to ElasticSearch<a href="https://www.elastic.co/guide/en/elasticsearch/hadoop/current/index.html">ElasticSearch for Apache Hadoop</a> project has introduced a way to directly write to ElasticSearch without going through <a href='http://wpcertification.blogspot.com/2014/05/using-elasticsearch-to-store-output-of.html'>Elastic Search OutputFormat</a>. I wanted to try that out so i built simple application that saves output of word count into Elastic Search, you can download this project from <a href='https://github.com/sdpatil/HelloElasticWriter'>github</a>
The first thing that I had to do was to build a maven pom.xml that includes the <code>org.elasticsearch.elasticsearch-hadoop</code> version 5.0 jar. I could not find it in the regular maven repository so I had to include the elasticsearch repository in my pom.xml
<script src="https://gist.github.com/sdpatil/6cc5ba0c9315f5e844c49e30e58c25ab.js"></script>
Then this is how my Spark program looks; the main part is line 42, where I create a Map of all the properties that I need for saving this RDD into ElasticSearch, and then line 43, where I call <code>wordCountJson.saveToEs(esMap)</code>, which actually takes care of writing the data into ElasticSearch
<script src="https://gist.github.com/sdpatil/dce158529204ee22ffac9b3792a946fa.js"></script>
Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com0tag:blogger.com,1999:blog-2202766463251903159.post-77796088233172699622016-11-22T13:24:00.001-08:002016-11-22T15:36:21.882-08:00How to use KafkaLog4jAppender for sending Log4j logs to kafkaApache Kafka has a KafkaLog4jAppender that you can use for redirecting your Log4j log to a Kafka topic. I wanted to try it out so I used the following steps; you can download the sample project from <a href='https://github.com/sdpatil/HelloKafkaLogger'>here</a>
First I created a simple standalone Java program that uses Log4j like this. As you can see this is like any other normal Java program that uses Log4j.
<script src="https://gist.github.com/sdpatil/dc7807b6eaa3813ebb99ab98ea29e184.js"></script>
Then in the log4j.properties file I added lines 12 to 17 for using KafkaLog4jAppender; on line 13 the value of the brokerList property points to the Kafka server, and on line 14 the value of topic points to the Kafka topic name to which the logs should go.
<script src="https://gist.github.com/sdpatil/815b25b6ddbe151514a0233d54fc54b9.js"></script>
Now before running this program make sure that you actually have a topic named kafkalogger; if not, you can create it using this command
<pre><code>
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkalogger
</code></pre>
You can verify that you have a topic named kafkalogger by executing the following command
<pre><code>
bin/kafka-topics.sh --list --zookeeper localhost:2181
</code></pre>
You can also run the kafka console consumer, which reads messages from Kafka and prints them to the console, using the following command
<pre><code>
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkalogger
</code></pre>
Now when you run your Java program you should see messages on the console like this
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEj8-nJ2TFegsAFtTCd72H6HiD46eXibTP6n7YX9hkyg2xLDFiD4v0Lvq1t6CFiQx6-BfuoFzgtcOMSyZ3P_OzwZ0VzNhWPxxSyfs9hhPEREoqjcLObv-OdSvY97ILw7_Kn61BeSCCn93MVh/s1600/Screen+Shot+2016-11-22+at+1.23.50+PM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEj8-nJ2TFegsAFtTCd72H6HiD46eXibTP6n7YX9hkyg2xLDFiD4v0Lvq1t6CFiQx6-BfuoFzgtcOMSyZ3P_OzwZ0VzNhWPxxSyfs9hhPEREoqjcLObv-OdSvY97ILw7_Kn61BeSCCn93MVh/s640/Screen+Shot+2016-11-22+at+1.23.50+PM.png" width="640" height="452" /></a></div>
Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com15tag:blogger.com,1999:blog-2202766463251903159.post-25797196923279016612016-07-04T11:47:00.000-07:002016-07-04T11:47:08.060-07:00WordCount program using Spark DataFrameI wanted to figure out how to write Word Count Program using Spark DataFrame API, so i followed these steps.
Import <code>org.apache.spark.sql.functions._</code>; it includes the built-in functions that I need to use
<code>
import org.apache.spark.sql.functions._
</code>
Create a data frame by reading README.md. When you read the file, Spark will create a data frame with a single column named value; the content of the value column is the corresponding line in the file
<pre><code>
val df = sqlContext.read.text("README.md")
df.show(10,truncate=false)
</code></pre>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjWHu04n9g6oLsQMuxsgHYr0ncy4AGoCDBw5EFmgW-Da2raEniOd_yx1npS8qBXSs7RUpUP3lxHhtp7EZ5dLAFGODf3Rq2zYBhW3dhtft4C8nqQ7Ape6WSsGgf95SJfvMYmdr61ZNrn1F_h/s1600/Screen+Shot+2016-07-04+at+11.26.25+AM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjWHu04n9g6oLsQMuxsgHYr0ncy4AGoCDBw5EFmgW-Da2raEniOd_yx1npS8qBXSs7RUpUP3lxHhtp7EZ5dLAFGODf3Rq2zYBhW3dhtft4C8nqQ7Ape6WSsGgf95SJfvMYmdr61ZNrn1F_h/s400/Screen+Shot+2016-07-04+at+11.26.25+AM.png" width="400" height="250" /></a></div>
Next split each line into words using the split function. This will create a new DataFrame with a words column; each row's words column holds the array of words for that line
<pre><code>
val wordsDF = df.select(split(df("value")," ").alias("words"))
wordsDF.show(10,truncate=false)
</code></pre>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgx2nWsKX3sCBbMQcFvqYxDWjw1UVeCiJo3OSxQ4jBdKOlw1d0E6ivL7Z5d31NeMRL-91DGLy5k1JVK3rNVnoLMBf6MbIzK0tQ9JVQWQr1vpnYLo7BU6h7ZSJ33H4AKW95dvDnxCGpkSNo2/s1600/Screen+Shot+2016-07-04+at+11.28.01+AM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgx2nWsKX3sCBbMQcFvqYxDWjw1UVeCiJo3OSxQ4jBdKOlw1d0E6ivL7Z5d31NeMRL-91DGLy5k1JVK3rNVnoLMBf6MbIzK0tQ9JVQWQr1vpnYLo7BU6h7ZSJ33H4AKW95dvDnxCGpkSNo2/s400/Screen+Shot+2016-07-04+at+11.28.01+AM.png" width="400" height="216" /></a></div>
Next use the explode function to convert the words array into a dataframe with a word column. This is the equivalent of using the flatMap() method on an RDD
<pre><code>
val wordDF = wordsDF.select(explode(wordsDF("words")).alias("word"))
wordDF.show(10,truncate=false)
</code></pre>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgOsV7qT7lUYzodW763wrO9nkPeBgVmA4ANpAOatP-DJs0QNIjzhHm_bherjHH-Quxtspjf53hXf0nUoVgz9U0q6rOaicrKH0_JLSdQczs-iemwcOpX5Qbtom3YrIJqPcoNK5cmGNIJ96gB/s1600/Screen+Shot+2016-07-04+at+11.29.53+AM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgOsV7qT7lUYzodW763wrO9nkPeBgVmA4ANpAOatP-DJs0QNIjzhHm_bherjHH-Quxtspjf53hXf0nUoVgz9U0q6rOaicrKH0_JLSdQczs-iemwcOpX5Qbtom3YrIJqPcoNK5cmGNIJ96gB/s400/Screen+Shot+2016-07-04+at+11.29.53+AM.png" width="372" height="400" /></a></div>
Now you have a data frame in which each row contains a single word from the file. So group the data frame by word and count the occurrences of each word
<pre><code>
val wordCountDF = wordDF.groupBy("word").count
wordCountDF.show(truncate=false)
</code></pre>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh9tTRhLayW7AGKaaWEONybeVNbs5rg64Ac9AbwL7J5EgGxQdv8CUgZ2L7DKw35e_zVRVG7RwQU1ogBd_DrEHZRQ8RmG1VjdZX3vgYNxDWY77BeKssCWAu-iQ_6UDKw1UNZKvKh81W8ST1Y/s1600/Screen+Shot+2016-07-04+at+11.34.06+AM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh9tTRhLayW7AGKaaWEONybeVNbs5rg64Ac9AbwL7J5EgGxQdv8CUgZ2L7DKw35e_zVRVG7RwQU1ogBd_DrEHZRQ8RmG1VjdZX3vgYNxDWY77BeKssCWAu-iQ_6UDKw1UNZKvKh81W8ST1Y/s400/Screen+Shot+2016-07-04+at+11.34.06+AM.png" width="400" height="366" /></a></div>
This is the code you need if you want to figure out the 20 most frequent words in the file (show displays 20 rows by default)
<pre><code>
wordCountDF.orderBy(desc("count")).show(truncate=false)
</code></pre>
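Putting the steps above together, the same word count can also be written as a single chain; this is just a condensed sketch of the commands already shown
<pre><code>
import org.apache.spark.sql.functions._

val top20 = sqlContext.read.text("README.md")
  .select(explode(split(col("value"), " ")).alias("word"))
  .groupBy("word").count()
  .orderBy(desc("count"))
top20.show(20, truncate = false)
</code></pre>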
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhjQ2tu61eu63YdikFR9RqMCpkK41CF-q6aHkmKWnayXz1Rx3PqniXTWZWt_hQYUMEOhrTDQ8oNFY3BS89ukKrg3kQ0gjZwt9fwepY1LhGEhz2zupOEOasFBJsbV9GlZHF9VR_FXri8lEQr/s1600/Screen+Shot+2016-07-04+at+11.35.26+AM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhjQ2tu61eu63YdikFR9RqMCpkK41CF-q6aHkmKWnayXz1Rx3PqniXTWZWt_hQYUMEOhrTDQ8oNFY3BS89ukKrg3kQ0gjZwt9fwepY1LhGEhz2zupOEOasFBJsbV9GlZHF9VR_FXri8lEQr/s400/Screen+Shot+2016-07-04+at+11.35.26+AM.png" width="400" height="383" /></a></div>Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com12tag:blogger.com,1999:blog-2202766463251903159.post-50095622736748239412016-07-03T11:09:00.001-07:002016-11-23T14:31:54.903-08:00How to use built in spark UDF's In the <a href=''></a> i talked about how to create a custom UDF in scala for spark. But before you do that always check <a href='https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html'>Spark UDF's</a> that are available with Spark already.
I have this sample Spark data frame with a list of users
<script src="https://gist.github.com/sdpatil/26cfc62d0ec75b9269da7138695f44e2.js"></script>
I wanted to sort the list of users in descending order of age, so I used the following 2 lines: first I import the functions that are available with Spark already, and then I use the <code>desc</code> function to order by age in descending order
<pre><code>
import org.apache.spark.sql.functions._
display(userDF.orderBy(desc("age")))
</code></pre>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgwJsrNmosOBxhZweuHjPtRklETDn56Stgg0zcpnCF7E_MmTkE2MRiabHm5_7KuC0oF-WXGcQQg_YLNnhA2ic_483_1GE-zBE4GHdoO2fevE0Q4z7lReaAFvq3pU95wqfTbSDhZtVFuf2zM/s1600/Screen+Shot+2016-07-03+at+11.00.12+AM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgwJsrNmosOBxhZweuHjPtRklETDn56Stgg0zcpnCF7E_MmTkE2MRiabHm5_7KuC0oF-WXGcQQg_YLNnhA2ic_483_1GE-zBE4GHdoO2fevE0Q4z7lReaAFvq3pU95wqfTbSDhZtVFuf2zM/s400/Screen+Shot+2016-07-03+at+11.00.12+AM.png" width="400" height="109" /></a></div>
Now if I wanted to sort the data frame records by age in ascending order
<pre><code>
display(userDF.orderBy(asc("age")))
</code></pre>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhDLR5K6oK9vBQFgeKPFuiXVjkciFFxW5UUK9KKwb04eah3wQhd5j8mNx2lNQouRTMsAH_Sny3cOZeCAZMEAuAZbxlJz4s6qA7PemN6BeBmyU1-E5b5_q8Lq8anes8GgWyQeVrwCng4O4L9/s1600/Screen+Shot+2016-07-03+at+11.00.01+AM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhDLR5K6oK9vBQFgeKPFuiXVjkciFFxW5UUK9KKwb04eah3wQhd5j8mNx2lNQouRTMsAH_Sny3cOZeCAZMEAuAZbxlJz4s6qA7PemN6BeBmyU1-E5b5_q8Lq8anes8GgWyQeVrwCng4O4L9/s400/Screen+Shot+2016-07-03+at+11.00.01+AM.png" width="400" height="93" /></a></div>
This is a sample of how to use the <code>sum()</code> function
<pre><code>
userDF.select(sum("age")).show
</code></pre>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgdrpPd-FRZj07cy-vQUQl3MrXy2fZzUVJt35FnpZxw-1c-575dOWRnGPlVOcKCBfVhd9H3WpW7_xG5zOEKOwSOJyVJ7cKHHgA7eYJYD2vaUsAxPEyn9opDE5Buz-LVoWFloOyuJpqUFjBK/s1600/Screen+Shot+2016-07-03+at+10.59.38+AM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgdrpPd-FRZj07cy-vQUQl3MrXy2fZzUVJt35FnpZxw-1c-575dOWRnGPlVOcKCBfVhd9H3WpW7_xG5zOEKOwSOJyVJ7cKHHgA7eYJYD2vaUsAxPEyn9opDE5Buz-LVoWFloOyuJpqUFjBK/s400/Screen+Shot+2016-07-03+at+10.59.38+AM.png" width="400" height="281" /></a></div>Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com1tag:blogger.com,1999:blog-2202766463251903159.post-70993154121040132592016-07-03T11:09:00.000-07:002016-08-16T10:02:49.805-07:00How to use built in spark UDF's In the <a href=''></a> i talked about how to create a custom UDF in scala for spark. But before you do that always check <a href='https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html'>Spark UDF's</a> that are available with Spark already.
I have this sample Spark data frame with list of users
<script src="https://gist.github.com/sdpatil/26cfc62d0ec75b9269da7138695f44e2.js"></script>
I wanted to sort the list of users in descending order of age so i used following 2 lines, first is to import functions that are available with Spark already and then i used <code>desc</code> function to order age in descending order
<pre><code>
import org.apache.spark.sql.functions._
display(userDF.orderBy(desc("age")))
</code></pre>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgwJsrNmosOBxhZweuHjPtRklETDn56Stgg0zcpnCF7E_MmTkE2MRiabHm5_7KuC0oF-WXGcQQg_YLNnhA2ic_483_1GE-zBE4GHdoO2fevE0Q4z7lReaAFvq3pU95wqfTbSDhZtVFuf2zM/s1600/Screen+Shot+2016-07-03+at+11.00.12+AM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgwJsrNmosOBxhZweuHjPtRklETDn56Stgg0zcpnCF7E_MmTkE2MRiabHm5_7KuC0oF-WXGcQQg_YLNnhA2ic_483_1GE-zBE4GHdoO2fevE0Q4z7lReaAFvq3pU95wqfTbSDhZtVFuf2zM/s400/Screen+Shot+2016-07-03+at+11.00.12+AM.png" width="400" height="109" /></a></div>
Now if i wanted to sort the data frame records using age in ascending order
<pre><code>
display(userDF.orderBy(asc("age")))
</code></pre>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhDLR5K6oK9vBQFgeKPFuiXVjkciFFxW5UUK9KKwb04eah3wQhd5j8mNx2lNQouRTMsAH_Sny3cOZeCAZMEAuAZbxlJz4s6qA7PemN6BeBmyU1-E5b5_q8Lq8anes8GgWyQeVrwCng4O4L9/s1600/Screen+Shot+2016-07-03+at+11.00.01+AM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhDLR5K6oK9vBQFgeKPFuiXVjkciFFxW5UUK9KKwb04eah3wQhd5j8mNx2lNQouRTMsAH_Sny3cOZeCAZMEAuAZbxlJz4s6qA7PemN6BeBmyU1-E5b5_q8Lq8anes8GgWyQeVrwCng4O4L9/s400/Screen+Shot+2016-07-03+at+11.00.01+AM.png" width="400" height="93" /></a></div>
This is sample of how to use the <code>sum()</code> function
<pre><code>
userDF.select(sum("age")).show
</code></pre>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgdrpPd-FRZj07cy-vQUQl3MrXy2fZzUVJt35FnpZxw-1c-575dOWRnGPlVOcKCBfVhd9H3WpW7_xG5zOEKOwSOJyVJ7cKHHgA7eYJYD2vaUsAxPEyn9opDE5Buz-LVoWFloOyuJpqUFjBK/s1600/Screen+Shot+2016-07-03+at+10.59.38+AM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgdrpPd-FRZj07cy-vQUQl3MrXy2fZzUVJt35FnpZxw-1c-575dOWRnGPlVOcKCBfVhd9H3WpW7_xG5zOEKOwSOJyVJ7cKHHgA7eYJYD2vaUsAxPEyn9opDE5Buz-LVoWFloOyuJpqUFjBK/s400/Screen+Shot+2016-07-03+at+10.59.38+AM.png" width="400" height="281" /></a></div>Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com1tag:blogger.com,1999:blog-2202766463251903159.post-82829210217517692492016-07-03T10:42:00.001-07:002016-07-03T16:07:42.366-07:00How to define Scala UDF in Spark<script src="https://gist.github.com/sdpatil/dc1afb05599127c92c5d5959ea83c9f5.js"></script>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEj23cOOnzRX4eqFhtwsnn1OjvB6_kkdFQ40s9-vhsHY625Y8taCsgTId4ArtuTpcaH6pP56yPipNoEpPWNiCEl0ADxMSsay8t7q2CvhZp6bNnaxBrgVsLwJ69w3X8FC0ucM-6yjVWLfzuvX/s1600/Screen+Shot+2016-07-03+at+10.41.19+AM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEj23cOOnzRX4eqFhtwsnn1OjvB6_kkdFQ40s9-vhsHY625Y8taCsgTId4ArtuTpcaH6pP56yPipNoEpPWNiCEl0ADxMSsay8t7q2CvhZp6bNnaxBrgVsLwJ69w3X8FC0ucM-6yjVWLfzuvX/s400/Screen+Shot+2016-07-03+at+10.41.19+AM.png" width="400" height="201" /></a></div>
<a href="https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/4872615757169678/4007576048898212/5867227349852569/latest.html">Scala UDF Sample Notebook</a>Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com1tag:blogger.com,1999:blog-2202766463251903159.post-34200971251848351542016-04-19T08:24:00.001-07:002016-04-19T08:24:12.460-07:00How to access Hive table from Spark in MapR sandboxI was trying to figure out how to query a Hive table from Spark in the <a href="https://www.mapr.com/products/mapr-sandbox-hadoop">MapR 5.1 sandbox</a>. So I started spark-shell and tried to query the <code>sample_08</code> table, and I got an error saying no such table exists
<pre><code>
scala> val sample08 = sqlContext.sql("select * from sample_08")
org.apache.spark.sql.AnalysisException: no such table sample_08; line 1 pos 14
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:260)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:268)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:264)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:254)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:932)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:932)
at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:930)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:741)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:19)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC.<init>(<console>:32)
at $iwC$$iwC.<init>(<console>:34)
at $iwC.<init>(<console>:36)
at <init>(<console>:38)
at .<init>(<console>:42)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
</code></pre>
When I checked the <SPARK_HOME>/conf directory I noticed that hive-site.xml was missing, so I searched for hive-site.xml on the cluster. I found two copies, and <code>/opt/mapr/hive/hive-1.2/conf/hive-site.xml</code> had the <code>hive.metastore.uris</code> property pointing to <code>thrift://localhost:9083</code>, so I copied that file into <SPARK_HOME>/conf and restarted the shell. After that, executing the same query returned results.
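For reference, the property that matters here is just the metastore URI; a minimal sketch of that part of hive-site.xml (the value <code>thrift://localhost:9083</code> is the sandbox setting, use whatever your cluster points to):
<pre><code>
<configuration>
  <!-- tells Spark's HiveContext where the Hive metastore service is running -->
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
  </property>
</configuration>
</code></pre>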
<pre><code>
scala> val sample08 = sqlContext.sql("select * from sample_08")
sample08: org.apache.spark.sql.DataFrame = [code: string, description: string, total_emp: int, salary: int]
</code></pre>Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com2tag:blogger.com,1999:blog-2202766463251903159.post-19982253561335888402016-04-18T08:43:00.000-07:002016-04-18T08:43:44.184-07:00How to use custom delimiter character while reading file in SparkI wanted to figure out how to get Spark to read a text file and break it into records based on a custom delimiter instead of '\n'. These are my notes on how to do that.
Spark's input/output is based on MapReduce's InputFormat and OutputFormat. For example, when you call <code>SparkContext.textFile()</code> it actually uses <code>TextInputFormat</code> to read the file, so you get everything TextInputFormat can do. By default TextInputFormat breaks the file into records at the \n character, but sometimes you want to split records using some other logic. I wanted to parse a book into sentences instead of lines, so I looked into the TextInputFormat code and noticed that it honors a <code>textinputformat.record.delimiter</code> configuration property; setting it to '.' makes TextInputFormat return sentences instead of lines. This sample code shows how to do that
<script src="https://gist.github.com/sdpatil/91d709a674d096d578b1cf79dc3cfb23.js"></script>
The only change in this code is <code>sparkContext.hadoopConfiguration.set("textinputformat.record.delimiter", ".")</code>, which sets that Hadoop configuration property.
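In case the gist doesn't render, this is roughly what the whole thing looks like end to end (a sketch, not the exact gist code; the app name and hard-coded file name are illustrative):
<pre><code>
import org.apache.spark.{SparkConf, SparkContext}

object SentenceCount {
  def main(args: Array[String]): Unit = {
    // local[*] just makes it easy to run this sketch on a laptop
    val sparkContext = new SparkContext(new SparkConf().setAppName("SentenceCount").setMaster("local[*]"))

    // tell TextInputFormat to split records on '.' instead of '\n'
    sparkContext.hadoopConfiguration.set("textinputformat.record.delimiter", ".")

    // textFile() now returns one RDD element per sentence
    val sentences = sparkContext.textFile("2city10.txt")
    println("number of sentences: " + sentences.count())
  }
}
</code></pre>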
When I used this code to parse <a href='http://www.textfiles.com/etext/FICTION/2city10.txt'>2city10.txt</a> I noticed that it has 16104 lines of text but 6554 sentences.Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com2tag:blogger.com,1999:blog-2202766463251903159.post-51191748193448061852016-02-17T11:37:00.001-08:002016-02-17T11:37:29.948-08:00Difference between reduce() and fold() method on Spark RDDWhen you call the fold() method on an RDD it returns a different result than you might expect, so I wanted to figure out how fold() actually works, and I built this simple application
<script src="https://gist.github.com/sdpatil/6ed116b003351f2c0709.js"></script>
The first thing I do in the application is create a simple RDD with 8 values from 1 to 8, divided into 3 partitions: <code>sparkContext.parallelize(List(1,2,3,4,5,6,7,8),3)</code>. Then I call <code>input.mapPartitions(mapPartition)</code> to iterate through all the partitions in the RDD and print their records one by one. This shows that the RDD has 3 partitions: 1 and 2 are in the first partition, 3, 4 and 5 are in the second, and 6, 7 and 8 are in the third.
The next step is to call <code>input.reduce((x,y) => add(x,y))</code>, which invokes the <code>add</code> function as the reduce function on the RDD; you can see the result in the output below. The reduce starts by calling the add method on the first two records of a partition, keeps calling it with the running total for the rest of the elements, and finally combines the partition results.
The last part is the fold() method, which I am calling with an initial value of 10. As you can see from the output, fold() first takes 10 as the initial value and adds all the elements of each partition to it. It then combines the per-partition results, again starting from 10. Because of this, the result of fold() = initial value * (number of partitions + 1) + result of reduce.
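Stripped of the logging, the experiment boils down to this sketch, which produces the same two numbers (36 and 76):
<pre><code>
import org.apache.spark.{SparkConf, SparkContext}

object FoldVsReduce {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("FoldVsReduce").setMaster("local[*]"))

    // 8 values spread across 3 partitions: (1,2) (3,4,5) (6,7,8)
    val input = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8), 3)

    // reduce: simply the sum of all the elements
    println("reduce: " + input.reduce(_ + _)) // 36

    // fold: the initial value 10 is used once per partition and once more
    // when the partition results are combined, so 36 + 10 * (3 + 1) = 76
    println("fold:   " + input.fold(10)(_ + _)) // 76
  }
}
</code></pre>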
<pre><code>
********************** mapPartitions *******************
[Stage 0:>(0 + 0) / 3]2016-02-17 10:22:13 DEBUG HelloSparkPartitions:63 - Inside mapPartition
1
2
2016-02-17 10:22:13 DEBUG HelloSparkPartitions:63 - Inside mapPartition
3
4
5
2016-02-17 10:22:13 DEBUG HelloSparkPartitions:63 - Inside mapPartition
6
7
8
********************** reduce *******************
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 1, 2
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 3, 4
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 7, 5
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 3, 12
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 6, 7
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 13, 8
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 15, 21
input.reduce 36
********************** fold *******************
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 1
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 11, 2
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 13
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 3
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 13, 4
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 17, 5
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 23, 22
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 6
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 16, 7
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 23, 8
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 45, 31
input.fold 76
</code></pre>Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com3tag:blogger.com,1999:blog-2202766463251903159.post-22905229747225372422016-02-17T10:16:00.000-08:002016-02-17T10:16:14.317-08:00How to parse fillable PDF form in JavaI wanted to figure out how to parse a fillable PDF form in Java so that I could do some processing on it, so I built this sample <a href='https://github.com/sdpatil/PDFParsingPOC'>PDFFormParsingPOC</a> project that uses the <a href='https://pdfbox.apache.org/'>Apache PDFBox</a> library.
This is the simple Java class that I built. It first reads the PDF file and parses it into a PDDocument. Then it gets all the fields in the PDF form by calling <code>PDDocument.getDocumentCatalog().getAcroForm().getFields()</code> and iterates through them. For every field it finds, it first figures out the type of the field and then prints the field name and value to the console
<script src="https://gist.github.com/sdpatil/48415dc8f8bf95d7c289.js"></script>
You can download the project and execute it by passing the fully qualified path of a fillable PDF form, and it will print the field name/value pairs to the console; it uses the <a href='https://pdfbox.apache.org/'>Apache PDFBox</a> library under the hood. If you don't have a PDF form already, you can download this <a href='http://foersom.com/net/HowTo/data/OoPdfFormExample.pdf'>Sample Fillable PDF Form</a>
Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com0tag:blogger.com,1999:blog-2202766463251903159.post-53617442452181419662016-02-12T16:56:00.001-08:002016-02-12T16:56:40.058-08:00Invoking Python from Spark Scala projectWhen you're developing your Spark code, you have the option of using either Scala, Java or Python. In some cases you might want to mix those languages. I wanted to try that out, so I built this simple Spark program that passes control to Python for a transformation (all it does is prepend the word "python " to every line). You can download the source code for the sample project from <a href='https://github.com/sdpatil/ScalaPython'>here</a>
The first thing I did was develop this simple Python script, which reads one line at a time from standard input, prepends "Python " to the line, and writes it back to standard output
<script src="https://gist.github.com/sdpatil/0fc42af584b4f3e4a5c2.js"></script>
This is what the driver looks like. Most of the Spark code is the same; the only difference is <code>lines.pipe("python echo.py")</code>, which pipes every line in the RDD to <code>python echo.py</code> and collects the output. There is nothing Python-specific here; you could use any executable.
<script src="https://gist.github.com/sdpatil/97608ce3421eb7cd96c3.js"></script>
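In case the gist doesn't render, a minimal sketch of such a driver looks roughly like this (not the exact gist code; here the input path comes from the first command-line argument):
<pre><code>
import org.apache.spark.{SparkConf, SparkContext}

object PipeExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PipeExample"))

    // read the input file passed on the command line, e.g. helloworld.txt
    val lines = sc.textFile(args(0))

    // hand every line to the external script over stdin and collect its stdout
    val piped = lines.pipe("python echo.py")

    piped.collect().foreach(println)
  }
}
</code></pre>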
When you run this code on a cluster, copy the Python file onto the machine you submit from (say, into the Spark directory), and then execute
<pre><code>
bin/spark-submit \
  --files echo.py \
  ScalaPython-1.0-SNAPSHOT-jar-with-dependencies.jar helloworld.txt
</code></pre>Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com3tag:blogger.com,1999:blog-2202766463251903159.post-59888511192425635222016-02-01T08:02:00.000-08:002016-02-01T08:02:37.339-08:00How to use HBase sink with FlumeI wanted to figure out how to use HBase as a target for Flume, so I created this sample configuration, which reads events from netcat and writes them to HBase.
<ol>
<li>The first step is to create a test table in HBase with CF1 as the column family. Every time Flume gets an event it will write it to the test table in the CF1 column family
<pre><code>
create 'test','CF1'
</code></pre>
</li>
<li>
Create a Flume configuration file that looks like this. I am using the HBase sink with <code>SimpleHbaseEventSerializer</code> as the event serializer (a rough sketch of such a configuration is included after this list for reference). Note that I am assuming an unsecured cluster (sandbox); if you have a secured cluster you should follow the steps in <a href='http://doc.mapr.com/display/MapR/Configure+a+Secure+HBase+Sink'>Configure a Secure HBase Sink</a>
<script src="https://gist.github.com/sdpatil/ca94bc9d3f5e0c81ab1a.js"></script>
</li>
<li> Start the Flume agent with the following command
<pre><code>
bin/flume-ng agent --conf conf --conf-file conf/netcat-hbase.properties --name agent1 -Dflume.root.logger=DEBUG,console
</code></pre>
</li>
<li> Now open a netcat client on port 44444 and send some messages to Flume
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEga_12I0PGorkaKn7icYNlV9cxT_N5eclEu3fl9S1VE7NNGuUd_vlXHJnJbQHw0eClC8TnZdzcbK60BoyRsBbtSym9JbhalFm0dqM5nOJyP4R_wiMxAXn6NRi3Oj8O5dhFfS90ntV-cPgzY/s1600/Screen+Shot+2016-02-01+at+7.51.55+AM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEga_12I0PGorkaKn7icYNlV9cxT_N5eclEu3fl9S1VE7NNGuUd_vlXHJnJbQHw0eClC8TnZdzcbK60BoyRsBbtSym9JbhalFm0dqM5nOJyP4R_wiMxAXn6NRi3Oj8O5dhFfS90ntV-cPgzY/s400/Screen+Shot+2016-02-01+at+7.51.55+AM.png" /></a></div>
</li>
<li>If you query the HBase test table, you should see the messages that were sent to netcat
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh8VHwIAE-ALQw72nsowQlod4lMD-sgxd8jG1qeIjMPp8EY1wVErgbh7Eo9aCfhm-_T2plXzzKf9p2LTY9cPRc4Ti5qTLEv1R5u2wQNT7WCrqTioLouT2kgRYnWct-_nC1bl9U1m8ZPcomc/s1600/Screen+Shot+2016-02-01+at+7.51.43+AM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh8VHwIAE-ALQw72nsowQlod4lMD-sgxd8jG1qeIjMPp8EY1wVErgbh7Eo9aCfhm-_T2plXzzKf9p2LTY9cPRc4Ti5qTLEv1R5u2wQNT7WCrqTioLouT2kgRYnWct-_nC1bl9U1m8ZPcomc/s400/Screen+Shot+2016-02-01+at+7.51.43+AM.png" /></a></div>
</li>
</ol>Sunil Patilhttp://www.blogger.com/profile/16075929903536310688noreply@blogger.com2
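For reference (step 2 above), a netcat-to-HBase Flume configuration along these lines might look roughly like this sketch; the agent name matches the <code>--name agent1</code> used to start Flume and the table and column family match the ones created in step 1, while the source, channel and sink names are illustrative.
<pre><code>
# netcat source listening on port 44444
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sink1

agent1.sources.src1.type = netcat
agent1.sources.src1.bind = localhost
agent1.sources.src1.port = 44444
agent1.sources.src1.channels = ch1

# in-memory channel between the source and the sink
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000

# HBase sink writing into table 'test', column family 'CF1'
agent1.sinks.sink1.type = hbase
agent1.sinks.sink1.table = test
agent1.sinks.sink1.columnFamily = CF1
agent1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
agent1.sinks.sink1.channel = ch1
</code></pre>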