How to view the log files and job.xml generated by Hive on the Hortonworks Sandbox

I was working on some Hive code and ran into a problem: my query kept failing with a message like "invalid character in job.xml". I tried to locate the job.xml but could not find it, and I could not find any logs on the local machine either. To figure out how to debug this problem, I followed these steps.
  1. The first thing I did in the HDP 2.1 sandbox was enable Ambari by clicking the Enable button.
  2. The next step was to log into Ambari at http://localhost:8080/ using admin/admin as the username and password.
  3. In Ambari I went to the YARN service screen and unchecked the Resource Manager -> yarn.log-aggregation-enable checkbox. Log aggregation copies the log files from the local filesystem to HDFS and zips them; disabling it keeps the logs on the local filesystem.
  4. Then, in the Advanced section, I changed the value of yarn.nodemanager.debug-delay-sec to 60000. By default this property is 0, which means the NodeManager deletes all the intermediate files as soon as the job is done. Setting it to 60000 keeps those files around for 60000 seconds after the job finishes.
  5. Then I restarted all the services from Ambari for my changes to take effect.
  6. After that I executed a couple of Hive queries, and I could see my logs in the /hadoop/yarn/logs directory.
  7. I could also see the job.xml files being preserved in the /hadoop/yarn/local/usercache/root/appcache folder for the application.

How to update records in Hive Table that uses ElasticSearch for storage

I had a requirement where I wanted to update records in a Hive table. Hive is mostly an append-only database and you can't update records in it (that limitation comes from the text files stored in HDFS, which is how Hive stores data by default). But if you use Hive with ElasticSearch as storage, you can make updates work. When ElasticSearch is the storage mechanism, every call from Hive to insert or delete data gets forwarded to the ElasticSearch API, and ElasticSearch has the ability to update existing records. I used this to implement an updatable Hive table. The scenario: assume you have an ElasticSearch index that stores first name, last name, and email as a document. Create a new index named cricketers with type player by making a curl call like this.

curl -XPOST "http://localhost:9200/cricketers/player/1" -d'
{
    id: "1",
    fname: "First",
    lname: "Cricketer",
    email: "first.cricketer@gmail.com"
}'
This call creates an index named cricketers in ES and inserts one document into it with id 1. The next step is to define an external table in Hive that uses org.elasticsearch.hadoop.hive.EsStorageHandler as the storage handler and points to the cricketers/player index you created in the last step. The important setting is 'es.mapping.id'='id', which says to use the value of the id column as the primary key/id in ElasticSearch.

create external table cricketers_es(id String, fname String, lname String, email String) stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler' 
TBLPROPERTIES('es.resource'='cricketers/player', 'es.index.auto.create'='false', 'es.mapping.id'='id')
Once the table is created you can check its records by executing select * from cricketers_es. You should see the 1 record that is in the index.
Since Hive does not have an update statement, you will have to create a Hive table that holds only the records you want to insert/update (the delta) and then use this delta table to update the cricketers_es table. To do that, first create a text file that holds the delta records. In my case I created this simple cricketers.txt file and uploaded it to HDFS under the /user/hue folder:

1,sachin,tendulkar,sachin.tendulakar@bcci.com
2,Rahul,Dravid,rahul.dravid@bcci.com
After that, create a Hive table called cricketers_stage to hold the delta records by executing the following statement:

create table cricketers_stage(id String, fname String, lname String, email String) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
Now use the following Hive statement to load the delta records into cricketers_stage:

LOAD DATA INPATH '/user/hue/cricketers.txt' INTO TABLE `default.cricketers_stage`
The next step is to set the ElasticSearch write mode to upsert by setting the following property in the Hive console:

SET es.write.operation=upsert;
The last step is to execute the following statement in Hive, which takes the contents of cricketers_stage and inserts/updates those records in the cricketers_es table.

insert into table cricketers_es select * from cricketers_stage;
Now if you run select * from cricketers_es you should see 2 records: the first record has been updated and the record with id 2 is a new insert.

1,sachin,tendulkar,sachin.tendulakar@bcci.com
2,Rahul,Dravid,rahul.dravid@bcci.com
You can also verify the records in ElasticSearch by executing the following curl command:

curl -XPOST "http://localhost:9200/cricketers/player/_search?pretty=true" -d'
{
  "query": {
      "match_all": {}
  }
}'

Connecting to HDP 2.2 Hive using JDBC driver

In http://wpcertification.blogspot.com/2014/04/connecting-to-hive-using-jdbc-client.html I blogged about how to connect to Apache Hive using a JDBC client. I wanted to figure out how to connect to Hive 0.13 on the Hortonworks Sandbox 2.2, so I followed these steps. You can download the complete source code for this project from here. First I created an App.java class; when connecting to HDP 2.2 I had to use HiveServer2, and I followed the instructions in the HDP documentation. I used a Maven build file that includes the Hive, Hadoop, and log4j jars (the log4j jar lets me enable logging). A minimal sketch of such a client appears below, followed by the access control error I got when I first tried connecting to Hive.
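The sketch is only illustrative: the JDBC URL, credentials, and query are assumptions based on the default sandbox setup, not the exact App.java from the project.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class App {
    public static void main(String[] args) throws Exception {
        // HiveServer2 uses the hive2 JDBC scheme; 10000 is the default port on the sandbox
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection connection = DriverManager.getConnection(
                "jdbc:hive2://127.0.0.1:10000/default", "root", "");
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery("select * from sample_07 limit 10");
        while (resultSet.next()) {
            System.out.println(resultSet.getString(1) + "\t" + resultSet.getString(2));
        }
        resultSet.close();
        statement.close();
        connection.close();
    }
}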

java.sql.SQLException: Error while compiling statement: FAILED: HiveAccessControlException Permission denied. 
Principal [name=root, type=USER] does not have following privileges on Object [type=TABLE_OR_VIEW, name=default.sample_07] : [SELECT]
 at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:121)
 at org.apache.hive.jdbc.Utils.verifySuccessWithInfo(Utils.java:109)
 at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:231)
 at org.apache.hive.jdbc.HiveStatement.executeQuery(HiveStatement.java:355)
 at com.spnotes.hive.App.main(App.java:24)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:601)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

I had to use the following command in the Hive console to grant the root user SELECT access on the table.

hive> grant select on table sample_07 to user root;

Fixing "WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!" problem

I use the Hortonworks Sandbox for learning and trying out Hadoop features. The way the sandbox works is that you download the VM and then ssh into it using ssh root@127.0.0.1 -p 2222

to connect to it and play with it. Initially I had the HDP 2.1 sandbox, but when I downloaded HDP 2.2 and tried to connect to it I got the WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED! error. Basically HDP 2.1 and HDP 2.2 have different SSH host keys, but you are referring to both of them by the same IP, 127.0.0.1, so your machine complains that the IP is the same but the SSH key does not match.
To solve this problem, all I did was open the /Users/test/.ssh/known_hosts file in a text editor, search for 127.0.0.1, remove the line that started with that IP, and save the known_hosts file.
Now when I rerun the ssh root@127.0.0.1 -p 2222

command, I am able to connect to the HDP 2.2 image without any problem.

How to use Apache Storm Trident API

Some time back I blogged about the HelloWorld - Apache Storm Word Counter program, which demonstrates how to build a WordCount program using Apache Storm. The problem with that project was that it was not a Maven project; instead I had a screenshot of all the jars you would have to include. So I changed it to use Apache Maven as the build framework. You can download the source code. In addition to the normal API, Storm also provides the Trident API, which lets you write much more compact code. I wanted to try that out, so I built this simple WordCount program using the Trident API. With the Trident API you start by creating a TridentTopology object; you still need a LineReaderSpout that takes a file path as input and reads and emits one line of the file at a time. The difference is that you don't need the WordSpitterBolt and WordCounterBolt; instead you can use compact code like this

 topology.newStream("spout",lineReaderSpout)
.each(new Fields("line"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.aggregate(new Fields("word"), new Count(), new Fields("count"))
.each(new Fields("word","count"), new Debug());
The each(new Fields("line"), new Split(), new Fields("word")) line takes the line emitted by the LineReaderSpout and uses the built-in storm.trident.operation.builtin.Split function to split the line into words, emitting each word as a tuple. The groupBy(new Fields("word")) line groups the tuples by word. The aggregate(new Fields("word"), new Count(), new Fields("count")) line aggregates the words and counts them using the storm.trident.operation.builtin.Count class (at this point you have tuples like {word, count}). The last part, .each(new Fields("word","count"), new Debug());, prints each tuple in word count format. The Trident API provides a set of built-in classes that make developing a WordCount type of program very easy, but you could create your own versions of Split and Count and the code would still be significantly more compact.
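For context, here is a minimal sketch of how this Trident topology could be wired up and run on a local cluster. The LineReaderSpout is assumed to be the spout from the original project (its file-path constructor is an assumption), and the class name, topology name, and local-cluster submission are illustrative rather than the exact code from the post.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.tuple.Fields;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.builtin.Split;

public class TridentWordCount {
    public static void main(String[] args) throws Exception {
        // spout from the original project; assumed to take the input file path
        LineReaderSpout lineReaderSpout = new LineReaderSpout(args[0]);

        TridentTopology topology = new TridentTopology();
        topology.newStream("spout", lineReaderSpout)
                .each(new Fields("line"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .aggregate(new Fields("word"), new Count(), new Fields("count"))
                .each(new Fields("word", "count"), new Debug());

        // run the topology in-process for testing
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("trident-wordcount", new Config(), topology.build());
    }
}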

How to use ElasticSearch as input for Apache Spark program

In the How to use ElasticSearch as input for MapReduce program entry I blogged about how to create a MapReduce program that reads data from an ElasticSearch index or query and uses it to produce some output. I wanted to build the same functionality using Apache Spark; you can download the source code for the project from here. The basic idea is that I have a hadoop/contact index/type that contains contact records that look like this, with every contact having a first name, last name, and address. I want to write a program that tells me how many contacts come from a particular city.

{
   "lastName":"Tendulkar",
   "address":[
      {
         "country":"India\t",
         "addressLine1":"1 Main Street",
         "city":"Mumbai"
      }
   ],
   "firstName":"Sachin",
   "dateOfBirth":"1973-04-24"
}
To do that I built a simple HelloESInputSpark.java class (I added it to the WordCount Apache Spark project that I built in WordCount program built using Apache Spark in Java). This program is similar to any other Spark program except for a few lines: I had to create a Hadoop Configuration object, set the properties required to use EsInputFormat as the InputFormat, and then pass the newly created Hadoop Configuration object to sc.newAPIHadoopRDD().

Configuration hadoopConfiguration = new Configuration();
hadoopConfiguration.set("es.nodes","localhost:9200");
hadoopConfiguration.set("es.resource","hadoop/contact");
JavaPairRDD<Text, MapWritable> esRDD = sc.newAPIHadoopRDD(hadoopConfiguration, EsInputFormat.class, Text.class, MapWritable.class);
Spark reads the ElasticSearch index as a map in which the id of the record is the key and the actual record becomes the value, passed to you as a MapWritable object. I configured Spark to store the output on disk, and it created several part files whose contents you can inspect. You do have to use slightly different plumbing to read the embedded values stored inside each record, as sketched below.
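Here is a sketch of that plumbing, continuing from the snippet above. It assumes the usual imports (org.apache.hadoop.io.ArrayWritable, MapWritable, Text, scala.Tuple2, and the Spark Java function classes), and the exact Writable types that es-hadoop returns for the nested address field can vary, so treat this as an illustration rather than the exact code from HelloESInputSpark.java.

// dig the city out of each MapWritable record and count contacts per city
JavaPairRDD<String, Integer> cityOnes = esRDD.mapToPair(
    new PairFunction<Tuple2<Text, MapWritable>, String, Integer>() {
        public Tuple2<String, Integer> call(Tuple2<Text, MapWritable> record) {
            MapWritable contact = record._2();
            // "address" is assumed to come back as an array-like Writable of MapWritable entries
            ArrayWritable addresses = (ArrayWritable) contact.get(new Text("address"));
            MapWritable firstAddress = (MapWritable) addresses.get()[0];
            String city = firstAddress.get(new Text("city")).toString();
            return new Tuple2<String, Integer>(city, 1);
        }
    });

JavaPairRDD<String, Integer> cityCount = cityOnes.reduceByKey(
    new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer total, Integer count) {
            return total + count;
        }
    });

cityCount.saveAsTextFile(outputPath); // outputPath is a placeholder for wherever the part files should go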

WordCount program built using Apache Spark in Java

In the WordCount program written using Spark framework in python language entry I talked about how to create a WordCount program using Apache Spark in Python. I wanted to try building the same program in Java, so I followed these steps. You can download the full code from here. This is how the Java class looks; it has 4 anonymous classes (a sketch of how they chain together follows the list):
  1. file.flatMap(new FlatMapFunction() {}): This class gets called for every line in the input file; it splits the line and returns the list of words in it.
  2. words.filter(new Function() {}): This class gets the list of words and checks whether each word is actually a word or just blank space. My input file has some blank spaces, so I used this filter (partly as an excuse to try out filters) to remove the "words" that are just blank spaces.
  3. words.mapToPair(new PairFunction() {}): This class takes each word and converts it into a Tuple2 with the word as key and 1 as value.
  4. wordToCountMap.reduceByKey(new Function2() {}): This class gets called with the running total and the count as input, and all it does is add the current count to the running total.
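Putting the four pieces together, here is a sketch of how the class could look with the Spark 1.x Java API that was current at the time. The variable names follow the list above, but the paths, app name, and exact structure are assumptions rather than the downloadable code.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class WordCount2 {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("WordCount2"));
        JavaRDD<String> file = sc.textFile(args[0]);

        // 1. split every line of the input file into words
        JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String line) {
                return Arrays.asList(line.split(" "));
            }
        });

        // 2. drop the "words" that are just blank spaces
        words = words.filter(new Function<String, Boolean>() {
            public Boolean call(String word) {
                return word.trim().length() > 0;
            }
        });

        // 3. convert each word into a (word, 1) tuple
        JavaPairRDD<String, Integer> wordToCountMap = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // 4. add the current count to the running total for each word
        JavaPairRDD<String, Integer> wordCounts = wordToCountMap.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer runningTotal, Integer count) {
                return runningTotal + count;
            }
        });

        wordCounts.saveAsTextFile(args[1]);
        sc.stop();
    }
}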
This is how the Maven build script for my program looks. I had to fight through a couple of issues to get the build working. The first one: by default, adding the org.apache.spark:spark-core_2.10 dependency pulls in Hadoop/HDFS 1.0.4 jars, and in my case I have a Hadoop 2.4 server, so I got the following error. You get this error even if you include the HDFS-related jars yourself, because then you suddenly have both Hadoop 2.4 and Hadoop 1.0.4 jars on the classpath, so you have to ask Maven to exclude the hadoop-client pulled in by org.apache.spark:spark-core_2.10.

14/08/25 12:16:50 WARN snappy.LoadSnappy: Snappy native library not loaded
Exception in thread "main" org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4
 at org.apache.hadoop.ipc.Client.call(Client.java:1070)
 at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
 at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
 at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
 at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
 at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
 at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:238)
 at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:203)
 at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
 at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
 at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
 at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
 at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.FilteredRDD.getPartitions(FilteredRDD.scala:29)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:59)
 at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:385)
 at com.spnotes.spark.WordCount2.main(WordCount2.java:62)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
 
Once that part was done, I ran into an issue because 2 different versions of Jetty were getting included (Exception in thread "main" java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package), so I had to add a Jetty-related jar exclusion filter to the org.apache.hadoop:hadoop-common dependency.

How to use ElasticSearch as input for MapReduce program

In the Saving complex object in elasticsearch as output of MapReduce program entry, I talked about how to use ElasticSearch for storing the output of a MapReduce job. In that blog I was creating Contact records in ElasticSearch that look like this:

{
   "lastName":"Tendulkar",
   "address":[
      {
         "country":"India\t",
         "addressLine1":"1 Main Street",
         "city":"Mumbai"
      }
   ],
   "firstName":"Sachin",
   "dateOfBirth":"1973-04-24"
}
I wanted to figure out how to use ElasticSearch as input for a MapReduce program, so I decided to create a MapReduce program that reads the contact index and reports how many players come from each city. You can download the sample program from here. This is how my MapReduce program looks; you run the driver program with 2 arguments, e.g. hadoop/contact file:///home/user/output/, where the first is the name of the ElasticSearch index/type and the second is the output directory where the output gets written. The program has 3 main components:
  1. MRInputDriver: In the driver program you set the es.nodes entry to point to the address of your ElasticSearch installation, and the value of es.resource is the name of the ElasticSearch index/type. Then I call job.setInputFormatClass(EsInputFormat.class);, which makes EsInputFormat the input reader; it takes care of reading the records from ElasticSearch.
  2. MRInputMapper: The Mapper class uses Object as both the key and value type. The ElasticSearch Hadoop framework reads each record from ElasticSearch and passes the id as the key (a Text); the value is a MapWritable object that represents the record stored in ElasticSearch. Once I have the value, I read the address from it, and the mapper writes the city name as the key and 1 as the value, as sketched after this list.
  3. MRInputReducer: The reducer is pretty simple; it gets called with the name of a city as the key and an Iterable of values, very similar to the reducer in WordCount.
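For illustration, here is a sketch of what the mapper could look like. The class name follows the description above, but the way the nested address field is unwrapped (an array-like Writable of MapWritable entries) is an assumption about the es-hadoop types rather than the exact downloadable code.

// assumes the usual Hadoop imports (Mapper, Text, IntWritable, MapWritable, ArrayWritable)
public class MRInputMapper extends Mapper<Object, Object, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(Object key, Object value, Context context)
            throws IOException, InterruptedException {
        // es-hadoop passes the document id as the key and the document itself as a MapWritable
        MapWritable contact = (MapWritable) value;
        ArrayWritable addresses = (ArrayWritable) contact.get(new Text("address"));
        MapWritable firstAddress = (MapWritable) addresses.get()[0];
        // emit (city, 1) so the reducer can sum up players per city
        context.write(new Text(firstAddress.get(new Text("city")).toString()), ONE);
    }
}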
After running the program I could see output like this:

Bangalore 2
Delhi 1
Mumbai 1
Ranchi 1

Saving complex object in elasticsearch as output of MapReduce program

In the Using ElasticSearch to store output of MapReduce program entry I built a sample MapReduce program that writes its output to ElasticSearch. I wanted to figure out how to write the output of a MapReduce job into ElasticSearch so that it creates a complex JSON structure with an embedded object and a date type. I decided to create this sample program, which reads a .csv file and generates contact records in ElasticSearch that look like the document shown above. You can download the sample code for this project from here. I followed these steps to create the program.
  1. The first thing I did was create the hadoop/contact index in ElasticSearch with an explicit mapping (so that dateOfBirth is stored as a date type). You can create the index by making an HTTP POST call to http://localhost:9200/hadoop.
  2. Once the index was created, I created a Contact.java object: a simple POJO that has a list of Address objects (another POJO) and uses CustomDateSerializer as a custom date serializer.
  3. The next step was to create CustomDateSerializer.java, the custom date serializer, along with the class that has both the Mapper and the MapReduce driver.
    • ContactImportMapper: This is a mapper class that gets one line of contact.txt at a time, splits it on commas, and uses the values to create a Contact.java object. In the mapper class, on line 67, I convert the Contact object into JSON using the Jackson parser and set the JSON as the value of a Text. Then on line 70 I write NullWritable as the key and the contact JSON as the value, which gets inserted into ES (a sketch of this mapper appears at the end of this post).
    • ContactImportDriver: This class is the same as any other MapReduce driver with a few exceptions. On line 101 I call job.getConfiguration().set("es.input.json", "yes");, which tells the ElasticSearch output format to treat the output as JSON. Then on line 109 I set the output format to EsOutputFormat so that the ElasticSearch Hadoop framework takes control of persisting the output: job.setOutputFormatClass(EsOutputFormat.class);
  4. Once the program was built, I passed 2 parameters to it: the first is the full path of the contact.txt file and the second is the ElasticSearch index and type in hadoop/contact format. This is what the content of the contact.txt file looks like:
    
    Sachin,Tendulkar,24-Apr-1973,1 Main Street,Mumbai,India 
    Rahul,Dravid,01-Jan-1973,1 Main Street,Bangalore,India
    Anil,Kumble,17-Oct-1970,1 Main Street,Bangalore,India
    Virat,Kohali,05-Nov-1988,1 Main Street,Delhi,India
    Mahendra Singh,Dhoni,07-Jul-1981,1 Main Street,Ranchi,India
    Saurav,Ganguli,08-Jul-1982,1 Main Street,Kolkata,India
    
After executing the code, I ran a query to find all the players with a date of birth between 1970 and 1974 and checked the sample output.
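To round out the walkthrough, here is a sketch of what ContactImportMapper could look like. The Contact/Address setters and the use of a String for dateOfBirth are assumptions for illustration; the real POJO uses a Date with CustomDateSerializer, and the exact source is in the downloadable project.

// assumes the usual Hadoop imports plus Jackson's ObjectMapper and the Contact/Address POJOs
public class ContactImportMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    private final ObjectMapper jsonMapper = new ObjectMapper();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // firstName,lastName,dateOfBirth,addressLine1,city,country
        String[] parts = value.toString().split(",");

        Address address = new Address();
        address.setAddressLine1(parts[3]);
        address.setCity(parts[4]);
        address.setCountry(parts[5]);

        Contact contact = new Contact();
        contact.setFirstName(parts[0]);
        contact.setLastName(parts[1]);
        contact.setDateOfBirth(parts[2]); // kept as a String in this sketch
        contact.setAddress(Arrays.asList(address));

        // serialize the Contact to JSON and emit it with a NullWritable key;
        // EsOutputFormat (with es.input.json=yes) indexes the JSON as-is
        context.write(NullWritable.get(), new Text(jsonMapper.writeValueAsString(contact)));
    }
}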

How to create custom Combiner class with your MapReduce framework

The MapReduce framework passes control to your combiner class at the end of the map phase so that it can combine/reduce the data generated by each mapper before that data is transferred to the reducers. Sending data from the mappers to the reducers requires the data to go over the network, so combining it early cuts down on that transfer. I wanted to try creating a custom combiner class, and to keep things simple I decided to add a combiner to the WordCount (HelloWorld) MapReduce program. My combiner class does the same thing as the reducer: it takes multiple [word, 1] tuples and combines them into something like [word1, 5], [word2, 6], etc. I followed these steps.
  1. The first thing I did was create a WordCountCombiner.java class that looks the same as WordCountReducer, but I added a System.out.println() in it so I would know when my combiner was called instead of the reducer (a sketch appears after this list).
  2. Then I changed the driver class to add the job.setCombinerClass(WordCountCombiner.class); line.
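For reference, here is a sketch of what such a combiner could look like for the standard WordCount types; it is essentially the reducer plus the println mentioned above, not necessarily the exact class from my project.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        // this message shows up in the task logs whenever the combiner runs
        System.out.println("WordCountCombiner called for word: " + word);
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get();
        }
        // emit a partial (word, sum) so less data travels over the network to the reducer
        context.write(word, new IntWritable(sum));
    }
}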
Then I executed the WordCountDriver class with 3 files as input, and I could see my combiner class getting called after the mapper for each input file, before the mapper output was written to disk and before the reducer phase started.