Spark program to read data from RDBMS

I wanted to figure out how to connect to an RDBMS from Spark and extract data, so I followed these steps. You can download this project from GitHub.
First I created an Address table in my local MySQL database like this:

CREATE TABLE `address` (
  `addressid` int(11) NOT NULL AUTO_INCREMENT,
  `contactid` int(11) DEFAULT NULL,
  `line1` varchar(300) NOT NULL,
  `city` varchar(50) NOT NULL,
  `state` varchar(50) NOT NULL,
  `zip` varchar(50) NOT NULL,
  `lastmodified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`addressid`),
  KEY `contactid` (`contactid`),
  CONSTRAINT `address_ibfk_1` FOREIGN KEY (`contactid`) REFERENCES `CONTACT` (`contactid`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
Then I added 5 sample records to the address table. When I query the address table on my local machine this is what I get.
After that I created a Spark Scala project that has mysql-connector-java as one of the dependencies. The last step was to create a simple Spark program. My program has 4 main sections:
  1. First is Address, a case class with the same schema as the Address table, minus the lastmodified field.
  2. Next is this call to create a JdbcRDD, which says query everything from address with addressid between 1 and 5: new JdbcRDD(sparkContext, getConnection, "select * from address limit ?,?", 0, 5, 1, convertToAddress)
  3. Then I defined the getConnection() method, which creates a JDBC connection to my database and returns it.
  4. Last is the convertToAddress() method, which knows how to take a ResultSet and convert it into an Address object.
When I run this program in the IDE this is the output I get.
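The getConnection() and convertToAddress() pieces boil down to plain JDBC. Here is a minimal sketch of those two sections; the column names match the table above, while the JDBC URL, user, and password are placeholders you would replace with your own:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

public class AddressJdbc {

    // Mirrors the Address table, minus the lastmodified column.
    public static class Address {
        public final int addressId;
        public final int contactId;
        public final String line1, city, state, zip;

        public Address(int addressId, int contactId, String line1,
                       String city, String state, String zip) {
            this.addressId = addressId;
            this.contactId = contactId;
            this.line1 = line1;
            this.city = city;
            this.state = state;
            this.zip = zip;
        }
    }

    // JdbcRDD calls this to open a connection for each partition.
    // URL and credentials below are placeholders.
    public static Connection getConnection() throws SQLException {
        return DriverManager.getConnection(
                "jdbc:mysql://localhost/test", "user", "password");
    }

    // JdbcRDD calls this once per row to map the ResultSet to an Address.
    public static Address convertToAddress(ResultSet rs) throws SQLException {
        return new Address(rs.getInt("addressid"), rs.getInt("contactid"),
                rs.getString("line1"), rs.getString("city"),
                rs.getString("state"), rs.getString("zip"));
    }
}
```
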

How to implement cache (LRU Cache) using LinkedHashMap in java

Recently I wanted to implement a simple least recently used (LRU) cache in one of my applications. But my use case was simple enough that, instead of going for something like Ehcache, I decided to build it on my own using java.util.LinkedHashMap.
As you can see from the code it's very simple. All you have to do is extend java.util.LinkedHashMap and override its protected removeEldestEntry() method so that it checks whether the size of the map is greater than the size you specified when creating the map; if yes, it removes the eldest entry.
Now the question is: when the map is full, which entry will it remove? You have 2 options:
  1. Eldest: If you just want to remove the first entry that you inserted into the map when adding a new entry, then in your constructor you can use super(cacheSize, 0.75f);, so LinkedHashMap won't keep track of when a particular entry was accessed.
  2. Least recently used (LRU): But if you want to make sure that the entry that was least recently used gets removed, then call super(cacheSize, 0.75f, true); from the constructor of your LRUCache, so that LinkedHashMap keeps track of when each entry was accessed and removes the least recently used entry.
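The LRU variant described above can be sketched like this; the class and field names are my own:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * A minimal LRU cache built on LinkedHashMap. Passing true as the third
 * constructor argument switches the map to access order, so the "eldest"
 * entry is the least recently used one.
 */
public class LRUCache<K, V> extends LinkedHashMap<K, V> {
    private final int cacheSize;

    public LRUCache(int cacheSize) {
        // 0.75f is the default load factor; true enables access order (LRU).
        super(cacheSize, 0.75f, true);
        this.cacheSize = cacheSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Evict the eldest entry once the map grows beyond cacheSize.
        return size() > cacheSize;
    }
}
```

For example, with a cache of size 2, after put("a"), put("b"), get("a"), put("c"), the entry "b" is evicted because "a" was accessed more recently.
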

Spark Streaming Kafka 10 API Word Count application Scala

In the Spark Kafka Streaming Java program Word Count using Kafka 0.10 API blog entry I talked about how to create a simple Java program that uses Spark Streaming's Kafka 10 API. This blog entry does the same thing but using Scala. You can download the complete application from GitHub.
You can run this sample by first downloading Kafka 0.10.* from the Apache Kafka website, then creating and starting a test topic and sending messages to it by following the Kafka quick start document.

Spark Kafka Streaming Java program Word Count using Kafka 0.10 API

The Kafka API went through a lot of changes starting with Kafka 0.9, and the Spark Kafka Streaming API was also changed to better support Kafka 0.9. I wanted to try that out, so I built this simple Word Count application using the Kafka 0.10 API. You can download the complete application from GitHub.
You can run this sample by first downloading Kafka 0.10.* from the Apache Kafka website, then creating and starting a test topic and sending messages to it by following the Kafka quick start document. The first thing I did was to include the Kafka 0.10 API dependencies in the Spark project; as you can see I am using Spark version 2.1. Then I created a SparkKafka10.java file that looks like this. Please take a look at the comments inside the code for what I am doing. Now if you create a test topic and send messages to it, you should be able to see the word count on the console.
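Stripped of the Kafka and Spark plumbing, the word-count transformation itself (split each message into words, then sum a count per word, as flatMap plus reduceByKey do in the streaming job) can be sketched as plain Java:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCount {
    // Splits each line into words and sums a count per word, mirroring
    // the flatMap + reduceByKey steps in the streaming application.
    public static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("hello kafka", "hello spark")));
    }
}
```
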

How to use ElasticSearch as storage from Hive in cloudera

In the Using ElasticSearch as external data store with apache hive entry I talked about how you can create a table in Hive so that the actual data is stored in ElasticSearch. The problem with that approach was that I had to pass the full path to elasticsearch-hadoop-hive-<eshadoopversion>.jar as a parameter every time:

hive -hiveconf hive.aux.jars.path=/opt/elastic/elasticsearch-hadoop-2.4.3/dist/elasticsearch-hadoop-hive-2.4.3.jar;
Another option for doing the same thing is to open a Hive session and then run the following command first:

ADD JAR /opt/elastic/elasticsearch-hadoop-2.4.3/dist/elasticsearch-hadoop-hive-2.4.3.jar;
The problem with both of these approaches is that you have to keep telling Hive the full path to the ElasticSearch jar every single time. Instead you can take care of this issue by copying elasticsearch-hadoop-hive-<eshadoopversion>.jar into the same directory on every node that runs Hive. In my case I copied it to the /usr/lib/hive/lib directory by executing the following command:

sudo cp /opt/elastic/elasticsearch-hadoop-2.4.3/dist/elasticsearch-hadoop-hive-2.4.3.jar /usr/lib/hive/lib/.
Then set the Hive Auxiliary JARs Directory (the hive.aux.jars.path property) to the /usr/lib/hive/lib directory like this.
Then restart the Hive service, and now you should be able to access any ElasticSearch-backed table without adding the elasticsearch-hadoop jar explicitly.
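If you are not using the Cloudera Manager UI, the equivalent setting can be sketched directly in hive-site.xml. Note this is an assumption about your setup: outside of Cloudera Manager, hive.aux.jars.path classically takes a comma-separated list of jar paths rather than a bare directory, so the fragment below points at the jar itself:

```xml
<property>
  <name>hive.aux.jars.path</name>
  <value>file:///usr/lib/hive/lib/elasticsearch-hadoop-hive-2.4.3.jar</value>
</property>
```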

Installing ElasticSearch on existing docker container

I was using a Cloudera Quickstart Docker image for an experiment and wanted to install ElasticSearch on it, but I had trouble accessing it from my host. I found a workaround by following these steps:
  • First I installed ElasticSearch by downloading version 2.4.3 and unzipping it into the /opt/elastic folder.
  • Then I started ElasticSearch by executing bin/elasticsearch, and it started OK. When I ran curl -XGET "http://localhost:9200/" from inside the Docker container I was able to access ElasticSearch, but when I tried to access it from my host machine I could not, even though I was able to access other services running on my Docker image. So first I ran netstat on my Docker image to see why I was able to access other services but not ElasticSearch (and also to make sure ElasticSearch was actually listening on port 9200), and I got output like this.
  • Looking at the port mapping I could see that the other services were bound to 0.0.0.0 but ElasticSearch was only bound to 127.0.0.1, so I opened <ELASTICSEARCH_HOME>/config/elasticsearch.yml and added two more lines to it like this:
    
    http.host: 0.0.0.0
    transport.host: 127.0.0.1
    
  • Then I restarted the ElasticSearch server and checked netstat again; I could see this mapping, and when I tried accessing ElasticSearch from the host machine it worked.

Sending and Receiving JSON messages in Kafka

Some time back I wrote a couple of articles for JavaWorld about Kafka, Big data messaging with Kafka, Part 1 and Big data messaging with Kafka, Part 2, where you can find a basic producer and consumer for Kafka along with some basic samples.
I wanted to figure out how to pass JSON messages using Kafka. It turns out Kafka Connect provides a simple JSON serializer, org.apache.kafka.connect.json.JsonSerializer, and deserializer, org.apache.kafka.connect.json.JsonDeserializer, that use the Jackson JSON parser. I wanted to figure out how to use them, so I built the following sample:
  • First I created a Contact object, which is a simple POJO that has 3 fields: contactId, firstName and lastName. Take a look at the main() method, in which I create a simple Contact object, convert it to JSON, and write it to the console.
  • Next I created Producer.java, which reads values in CSV format like 1,Sunil,Patil from the command line and parses them into a Contact object first. Then I convert the Contact object into a JsonNode and pass it as the value to Kafka; the JsonSerializer converts the JsonNode into a byte[].
    The producer code is mostly the same as the one required for passing Strings, with the difference that on line 35 I create an object of com.fasterxml.jackson.databind.ObjectMapper, and then on line 41 I convert the Contact object into a JsonNode by calling objectMapper.valueToTree(contact).
  • Since I am using org.apache.kafka.connect.json.JsonSerializer on the producer, I have to use org.apache.kafka.connect.json.JsonDeserializer on the consumer. While creating the KafkaConsumer object I declare that I will get a String key and a JsonNode as the value. Then once I get messages from Kafka I call mapper.treeToValue(jsonNode, Contact.class) to read each message and convert it back to a Contact object.
Now you can run the producer and consumer with the same topic name and it should work.
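For reference, the Contact POJO can be as simple as the sketch below. The field names come from the description above; the real sample uses Jackson's ObjectMapper for the JSON conversion, so the hand-rolled toJson() here is just a stand-in to keep the sketch self-contained:

```java
public class Contact {
    private final int contactId;
    private final String firstName;
    private final String lastName;

    public Contact(int contactId, String firstName, String lastName) {
        this.contactId = contactId;
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public int getContactId() { return contactId; }
    public String getFirstName() { return firstName; }
    public String getLastName() { return lastName; }

    // Stand-in for objectMapper.valueToTree(contact); the real code
    // uses Jackson to produce a JsonNode.
    public String toJson() {
        return String.format(
            "{\"contactId\":%d,\"firstName\":\"%s\",\"lastName\":\"%s\"}",
            contactId, firstName, lastName);
    }

    // Parses the CSV input the Producer reads, e.g. "1,Sunil,Patil".
    public static Contact fromCsv(String csv) {
        String[] parts = csv.split(",");
        return new Contact(Integer.parseInt(parts[0].trim()),
                parts[1].trim(), parts[2].trim());
    }
}
```
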

Importing data from RDBMS into Hive using Sqoop and oozie (hive-import)

In the How to run Sqoop command from oozie entry I talked about how you can use Oozie and Sqoop to import data into HDFS. I wanted to change it to use Sqoop's hive-import option, which in addition to importing data into HDFS also creates a Hive table on top of the data. These are the steps that I followed:
  • First I changed the workflow.xml to take out the as-avrodatafile option and added the hive-import option, then re-ran the workflow, which looks like this. When I did that the Oozie workflow failed with the following error:
    
    7936 [uber-SubtaskRunner] WARN  org.apache.sqoop.mapreduce.JobBase  - SQOOP_HOME is unset. May not be able to find all job dependencies.
    9202 [uber-SubtaskRunner] DEBUG org.apache.sqoop.mapreduce.db.DBConfiguration  - Fetching password from job credentials store
    9207 [uber-SubtaskRunner] INFO  org.apache.sqoop.mapreduce.db.DBInputFormat  - Using read commited transaction isolation
    9210 [uber-SubtaskRunner] DEBUG org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat  - Creating input split with lower bound '1=1' and upper bound '1=1'
    25643 [uber-SubtaskRunner] INFO  org.apache.sqoop.mapreduce.ImportJobBase  - Transferred 931.1768 KB in 17.6994 seconds (52.6107 KB/sec)
    25649 [uber-SubtaskRunner] INFO  org.apache.sqoop.mapreduce.ImportJobBase  - Retrieved 12435 records.
    25649 [uber-SubtaskRunner] DEBUG org.apache.sqoop.hive.HiveImport  - Hive.inputTable: customers
    25650 [uber-SubtaskRunner] DEBUG org.apache.sqoop.hive.HiveImport  - Hive.outputTable: customers
    25653 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Execute getColumnInfoRawQuery : SELECT t.* FROM `customers` AS t LIMIT 1
    25653 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - No connection paramenters specified. Using regular API for making connection.
    25658 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Using fetchSize for next query: -2147483648
    25658 [uber-SubtaskRunner] INFO  org.apache.sqoop.manager.SqlManager  - Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
    25659 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_id of type [4, 11, 0]
    25659 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_fname of type [12, 45, 0]
    25659 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_lname of type [12, 45, 0]
    25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_email of type [12, 45, 0]
    25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_password of type [12, 45, 0]
    25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_street of type [12, 255, 0]
    25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_city of type [12, 45, 0]
    25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_state of type [12, 45, 0]
    25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_zipcode of type [12, 45, 0]
    25663 [uber-SubtaskRunner] DEBUG org.apache.sqoop.hive.TableDefWriter  - Create statement: CREATE TABLE IF NOT EXISTS `customers` ( `customer_id` INT, `customer_fname` STRING, `customer_lname` STRING, `customer_email` STRING, `customer_password` STRING, `customer_street` STRING, `customer_city` STRING, `customer_state` STRING, `customer_zipcode` STRING) COMMENT 'Imported by sqoop on 2016/12/22 21:18:39' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS TEXTFILE
    25664 [uber-SubtaskRunner] DEBUG org.apache.sqoop.hive.TableDefWriter  - Load statement: LOAD DATA INPATH 'hdfs://quickstart.cloudera:8020/user/cloudera/customers' INTO TABLE `customers`
    25667 [uber-SubtaskRunner] INFO  org.apache.sqoop.hive.HiveImport  - Loading uploaded data into Hive
    25680 [uber-SubtaskRunner] DEBUG org.apache.sqoop.hive.HiveImport  - Using in-process Hive instance.
    25683 [uber-SubtaskRunner] DEBUG org.apache.sqoop.util.SubprocessSecurityManager  - Installing subprocess security manager
    Intercepting System.exit(1)
    
    <<< Invocation of Main class completed <<<
    
    Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SqoopMain], exit code [1]
    
    Oozie Launcher failed, finishing Hadoop job gracefully
    
    Oozie Launcher, uploading action data to HDFS sequence file: hdfs://quickstart.cloudera:8020/user/cloudera/oozie-oozi/0000007-161222163830473-oozie-oozi-W/sqoop-52c0--sqoop/action-data.seq
    
    Oozie Launcher ends
    
    
  • As you can see from the log, the Sqoop job was able to import data into HDFS in the /user/cloudera/customers directory, and I could actually see the data in the directory. But when Sqoop tried to create the table in Hive it failed, and the table did not get created. This is the log statement that I am referring to: CREATE TABLE IF NOT EXISTS `customers` ( `customer_id` INT, `customer_fname` STRING, `customer_lname` STRING, `customer_email` STRING, `customer_password` STRING, `customer_street` STRING, `customer_city` STRING, `customer_state` STRING, `customer_zipcode` STRING) COMMENT 'Imported by sqoop on 2016/12/22 21:18:39' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS TEXTFILE
  • So it seems the problem is that Sqoop needs hive-site.xml so that it knows how to talk to the Hive service. To fix that, I first searched my sandbox to figure out where hive-site.xml is located, and then uploaded it to HDFS:
    
    sudo find / -name hive-site.xml
    hdfs dfs -put /etc/hive/conf.dist/hive-site.xml
    
  • After that I went back to the workflow.xml and modified it to look like this.
Now when I ran the Oozie workflow it was successful and I could query the customer data.
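The workflow.xml change described above amounts to shipping the uploaded hive-site.xml with the sqoop action via a <file> element. A hedged sketch of what such an action might look like follows; the connection string, paths, action-schema version, and transition names here are assumptions, so adjust them to your own workflow:

```xml
<action name="sqoop-import">
  <sqoop xmlns="uri:oozie:sqoop-action:0.2">
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
    <command>import --connect jdbc:mysql://localhost/test --username root --password cloudera --table customers --hive-import</command>
    <!-- Ship hive-site.xml with the action so Sqoop can reach the Hive metastore -->
    <file>/user/cloudera/hive-site.xml#hive-site.xml</file>
  </sqoop>
  <ok to="end"/>
  <error to="kill"/>
</action>
```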

How to run Sqoop command from oozie

In the Importing data from Sqoop into Hive External Table with Avro encoding updated entry I blogged about how you can use Sqoop to import data from an RDBMS into Hadoop. I wanted to test whether I could use Oozie to invoke a Sqoop command, and I followed these steps to do that.
  1. First I executed this command from the command line on my Hadoop cluster, to make sure that I could actually run Sqoop without any problem:
    
    sqoop import --connect jdbc:mysql://localhost/test \
    --username root \
    --password cloudera \
    --table CUSTOMER \
    --as-avrodatafile
    
  2. Once the sqoop command executed successfully, I went back and deleted the CUSTOMER directory from HDFS, to make sure that I could re-import the data, using the following command:
    
    hdfs dfs -rm -R CUSTOMER
    
  3. Next I went to Hue to create an Oozie workflow with the single sqoop command that I had executed before.
    If you're not using the Hue console, you can create the workflow.xml manually like this. Also make sure to create a job.properties file like this. Take a look at Enabling Oozie console on Cloudera VM 4.4.0 and executing examples for information on how to run an Oozie job from the command line.
  4. Next, when I ran the Oozie workflow, the job failed with the following error, which indicates that Oozie does not have the MySQL JDBC driver:
    
    java.lang.RuntimeException: Could not load db driver class: com.mysql.jdbc.Driver
     at org.apache.sqoop.manager.SqlManager.makeConnection(SqlManager.java:875)
     at org.apache.sqoop.manager.GenericJdbcManager.getConnection(GenericJdbcManager.java:52)
     at org.apache.sqoop.manager.SqlManager.execute(SqlManager.java:763)
     at org.apache.sqoop.manager.SqlManager.execute(SqlManager.java:786)
     at org.apache.sqoop.manager.SqlManager.getColumnInfoForRawQuery(SqlManager.java:289)
     at org.apache.sqoop.manager.SqlManager.getColumnTypesForRawQuery(SqlManager.java:260)
     at org.apache.sqoop.manager.SqlManager.getColumnTypes(SqlManager.java:246)
     at org.apache.sqoop.manager.ConnManager.getColumnTypes(ConnManager.java:327)
     at org.apache.sqoop.orm.ClassWriter.getColumnTypes(ClassWriter.java:1846)
     at org.apache.sqoop.orm.ClassWriter.generate(ClassWriter.java:1646)
     at org.apache.sqoop.tool.CodeGenTool.generateORM(CodeGenTool.java:107)
     at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:478)
     at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:605)
     at org.apache.sqoop.Sqoop.run(Sqoop.java:143)
     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
     at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:179)
     at org.apache.sqoop.Sqoop.runTool(Sqoop.java:218)
     at org.apache.sqoop.Sqoop.runTool(Sqoop.java:227)
     at org.apache.sqoop.Sqoop.main(Sqoop.java:236)
     at org.apache.oozie.action.hadoop.SqoopMain.runSqoopJob(SqoopMain.java:197)
     at org.apache.oozie.action.hadoop.SqoopMain.run(SqoopMain.java:177)
     at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:49)
    
  5. So the first thing I did was to check whether the MySQL driver is in the Oozie shared lib, by executing the following commands:
    
    export OOZIE_URL=http://localhost:11000/oozie
    oozie admin -shareliblist sqoop
    
    I noticed that mysql-connector-java.jar was not in the list of shared libs for Oozie + Sqoop.
  6. The next step was to find mysql-connector-java.jar in my sandbox, which I did like this:
    
    sudo find / -name "mysql*"
    
    I found mysql-connector-java.jar on my local machine at /var/lib/sqoop/mysql-connector-java.jar.
  7. I wanted to update the Oozie shared lib to include the MySQL driver jar, so I executed the following command to figure out the directory where the Oozie Sqoop shared lib is:
    
    oozie admin -sharelibupdate
    
    From this output i got HDFS directory location for Oozie shared lib which is /user/oozie/share/lib/lib_20160406022812
  8. Then I used the following two commands to first copy the DB driver into the Oozie shared lib and then make sure it is accessible to other users:
    
    hdfs dfs -copyFromLocal /var/lib/sqoop/mysql-connector-java.jar /user/oozie/share/lib/sqoop/.
    hdfs dfs -chmod 777 /user/oozie/share/lib/sqoop/mysql-connector-java.jar
  9. The last step was to let Oozie know that it should reload the shared lib, which I did by executing the following two commands:
    
    oozie admin -sharelibupdate
    oozie admin -shareliblist sqoop | grep mysql
    
    The second command queries Oozie for the current list of shared jars, and I could see mysql-connector-java.jar listed in it like this.
When I re-executed the Oozie job, this time it ran successfully.
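For reference, a minimal job.properties for a workflow like this might look like the sketch below. The hostnames and application path are assumptions for a Cloudera quickstart setup; the important line is oozie.use.system.libpath=true, which is what makes the shared lib, including the MySQL driver added above, available to the sqoop action:

```properties
nameNode=hdfs://quickstart.cloudera:8020
jobTracker=localhost:8032
# Make the Oozie shared lib (and the MySQL driver in it) visible to actions
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/cloudera/sqoop-workflow
```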

Importing data from Sqoop into Hive External Table with Avro encoding updated

In the Importing data from Sqoop into Hive External Table with Avro encoding entry I described how you can import a table from an RDBMS into Hive using Sqoop in Avro format. In that blog I went through a few steps to get the .avsc file, but I realized there is an easier way to do it, following these steps:
  1. First execute the sqoop import command like this. Make sure that you pass --outdir schema as a parameter to the sqoop import command; that generates CUSTOMER.avsc and CUSTOMER.java in the schema directory on your local machine.
    
    sqoop import --connect jdbc:mysql://localhost/test \
    --username root \
    --password cloudera \
    --table CUSTOMER \
    --as-avrodatafile \
    --outdir schema
    
  2. You can verify that the CUSTOMER.avsc file got created as expected by executing ls -ltrA schema.
  3. Next create a schema directory in HDFS by executing the hdfs dfs -mkdir command like this:
    
    hdfs dfs -mkdir /user/cloudera/schema
    
  4. Copy CUSTOMER.avsc from your local schema directory to the schema directory in HDFS by executing the following command:
    
    hdfs dfs -copyFromLocal schema/CUSTOMER.avsc /user/cloudera/schema/.
    
  5. The last step is to create a Hive table with CUSTOMER.avsc as the schema, using the following command:
    
    CREATE EXTERNAL TABLE CUSTOMER
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION '/user/cloudera/CUSTOMER'
    TBLPROPERTIES ('avro.schema.url'='/user/cloudera/schema/CUSTOMER.avsc');
    
Now if you go to Hive and execute the "SELECT * FROM CUSTOMER;" query, you should see 1 record in it like this.