Showing posts with label hive. Show all posts

How to use ElasticSearch as storage from Hive in cloudera

In the Using ElasticSearch as external data store with apache hive entry I talked about how you can create a table in Hive so that the actual data is stored in ElasticSearch. The problem with that approach was that I had to pass the full path to elasticsearch-hadoop-hive-<eshadoopversion>.jar as a parameter every time.

hive -hiveconf hive.aux.jars.path=/opt/elastic/elasticsearch-hadoop-2.4.3/dist/elasticsearch-hadoop-hive-2.4.3.jar
The other option is to open a Hive session and run the following command as the first thing

ADD JAR /opt/elastic/elasticsearch-hadoop-2.4.3/dist/elasticsearch-hadoop-hive-2.4.3.jar;
The problem with both these approaches is that you have to keep telling Hive the full path to the elasticsearch jar every single time. Instead you can take care of this by copying elasticsearch-hadoop-hive-<eshadoopversion>.jar into the same directory on every node; in my case that was just my local machine, and I copied it to the /usr/lib/hive/lib directory by executing the following command

sudo cp /opt/elastic/elasticsearch-hadoop-2.4.3/dist/elasticsearch-hadoop-hive-2.4.3.jar /usr/lib/hive/lib/.
Then, in Cloudera Manager, set the value of the Hive Auxiliary JARs Directory (hive.aux.jars.path) property to the /usr/lib/hive/lib directory.
Then restart the Hive service. You should now be able to access any ElasticSearch-backed table without adding the elasticsearch-hadoop jar explicitly
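Since the copy has to be repeated on every node, a small helper can keep the step consistent. This is only a minimal sketch: the function name install_aux_jar is hypothetical, and the paths in the comments are the ones used in this post.

```shell
#!/bin/sh
# Hypothetical helper: copy the elasticsearch-hadoop Hive jar into Hive's lib
# directory and confirm that it landed there.
install_aux_jar() {
    src_jar="$1"    # e.g. /opt/elastic/elasticsearch-hadoop-2.4.3/dist/elasticsearch-hadoop-hive-2.4.3.jar
    lib_dir="$2"    # e.g. /usr/lib/hive/lib

    if [ ! -f "$src_jar" ]; then
        echo "jar not found: $src_jar" >&2
        return 1
    fi
    if [ ! -d "$lib_dir" ]; then
        echo "lib directory not found: $lib_dir" >&2
        return 1
    fi

    cp "$src_jar" "$lib_dir/" || return 1
    # Print the installed jar so the result is visible in the terminal
    ls -l "$lib_dir/$(basename "$src_jar")"
}
```

On a node you would call it with the jar path and /usr/lib/hive/lib as the two arguments, using sudo if the lib directory is owned by root.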

Importing data from RDBMS into Hive using Sqoop and oozie (hive-import)

In the How to run Sqoop command from oozie entry I talked about how you can use Oozie and Sqoop to import data into HDFS. I wanted to change it to use Sqoop's hive-import option, which in addition to importing data into HDFS also creates a Hive table on top of the data. These are the steps that I followed
  • First I changed the workflow.xml to take out the as-avrodatafile option and add the hive-import option, and then re-ran the workflow. When I did that the Oozie workflow failed with the following error
    
    7936 [uber-SubtaskRunner] WARN  org.apache.sqoop.mapreduce.JobBase  - SQOOP_HOME is unset. May not be able to find all job dependencies.
    9202 [uber-SubtaskRunner] DEBUG org.apache.sqoop.mapreduce.db.DBConfiguration  - Fetching password from job credentials store
    9207 [uber-SubtaskRunner] INFO  org.apache.sqoop.mapreduce.db.DBInputFormat  - Using read commited transaction isolation
    9210 [uber-SubtaskRunner] DEBUG org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat  - Creating input split with lower bound '1=1' and upper bound '1=1'
    25643 [uber-SubtaskRunner] INFO  org.apache.sqoop.mapreduce.ImportJobBase  - Transferred 931.1768 KB in 17.6994 seconds (52.6107 KB/sec)
    25649 [uber-SubtaskRunner] INFO  org.apache.sqoop.mapreduce.ImportJobBase  - Retrieved 12435 records.
    25649 [uber-SubtaskRunner] DEBUG org.apache.sqoop.hive.HiveImport  - Hive.inputTable: customers
    25650 [uber-SubtaskRunner] DEBUG org.apache.sqoop.hive.HiveImport  - Hive.outputTable: customers
    25653 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Execute getColumnInfoRawQuery : SELECT t.* FROM `customers` AS t LIMIT 1
    25653 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - No connection paramenters specified. Using regular API for making connection.
    25658 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Using fetchSize for next query: -2147483648
    25658 [uber-SubtaskRunner] INFO  org.apache.sqoop.manager.SqlManager  - Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
    25659 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_id of type [4, 11, 0]
    25659 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_fname of type [12, 45, 0]
    25659 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_lname of type [12, 45, 0]
    25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_email of type [12, 45, 0]
    25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_password of type [12, 45, 0]
    25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_street of type [12, 255, 0]
    25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_city of type [12, 45, 0]
    25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_state of type [12, 45, 0]
    25660 [uber-SubtaskRunner] DEBUG org.apache.sqoop.manager.SqlManager  - Found column customer_zipcode of type [12, 45, 0]
    25663 [uber-SubtaskRunner] DEBUG org.apache.sqoop.hive.TableDefWriter  - Create statement: CREATE TABLE IF NOT EXISTS `customers` ( `customer_id` INT, `customer_fname` STRING, `customer_lname` STRING, `customer_email` STRING, `customer_password` STRING, `customer_street` STRING, `customer_city` STRING, `customer_state` STRING, `customer_zipcode` STRING) COMMENT 'Imported by sqoop on 2016/12/22 21:18:39' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS TEXTFILE
    25664 [uber-SubtaskRunner] DEBUG org.apache.sqoop.hive.TableDefWriter  - Load statement: LOAD DATA INPATH 'hdfs://quickstart.cloudera:8020/user/cloudera/customers' INTO TABLE `customers`
    25667 [uber-SubtaskRunner] INFO  org.apache.sqoop.hive.HiveImport  - Loading uploaded data into Hive
    25680 [uber-SubtaskRunner] DEBUG org.apache.sqoop.hive.HiveImport  - Using in-process Hive instance.
    25683 [uber-SubtaskRunner] DEBUG org.apache.sqoop.util.SubprocessSecurityManager  - Installing subprocess security manager
    Intercepting System.exit(1)
    
    <<< Invocation of Main class completed <<<
    
    Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SqoopMain], exit code [1]
    
    Oozie Launcher failed, finishing Hadoop job gracefully
    
    Oozie Launcher, uploading action data to HDFS sequence file: hdfs://quickstart.cloudera:8020/user/cloudera/oozie-oozi/0000007-161222163830473-oozie-oozi-W/sqoop-52c0--sqoop/action-data.seq
    
    Oozie Launcher ends
    
    
  • As you can see from the log, the Sqoop job was able to import the data into HDFS in the /user/cloudera/customers directory, and I could actually see the data there. But when Sqoop tried to create the table in Hive it failed, and the table did not get created. This is the log statement that I am referring to: CREATE TABLE IF NOT EXISTS `customers` ( `customer_id` INT, `customer_fname` STRING, `customer_lname` STRING, `customer_email` STRING, `customer_password` STRING, `customer_street` STRING, `customer_city` STRING, `customer_state` STRING, `customer_zipcode` STRING) COMMENT 'Imported by sqoop on 2016/12/22 21:18:39' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS TEXTFILE
  • It seems the problem is that Sqoop needs hive-site.xml so that it knows how to talk to the Hive service. I first searched my sandbox to find where hive-site.xml is located and then uploaded it to HDFS, by executing the following commands
    
    sudo find / -name hive-site.xml
    hdfs dfs -put /etc/hive/conf.dist/hive-site.xml
    
  • After that I went back to the workflow.xml and modified it to include the uploaded hive-site.xml
Now when I ran the Oozie workflow it was successful, and I could query the customer data
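The modified workflow.xml is not reproduced in this post, so here is a sketch of what a Sqoop action with hive-import and a shipped hive-site.xml could look like. Several things in it are assumptions rather than details from my run: the JDBC URL, the retail_db database name, the user name and password, the schema versions, and the HDFS path /user/cloudera/hive-site.xml (where a put without a destination would normally land for the cloudera user).

```shell
#!/bin/sh
# Writes a sketch of workflow.xml to the current directory.
# Assumptions (not from the original post): the JDBC URL, database name,
# user name, password, schema versions and the hive-site.xml HDFS path.
# The quoted 'EOF' keeps ${jobTracker} and ${nameNode} literal so that Oozie,
# not the shell, resolves them from job.properties.
cat > workflow.xml <<'EOF'
<workflow-app xmlns="uri:oozie:workflow:0.4" name="sqoop-hive-import">
    <start to="sqoop-import"/>
    <action name="sqoop-import">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <!-- hive-import replaces the earlier as-avrodatafile option -->
            <command>import --connect jdbc:mysql://localhost/retail_db --username retail_user --password changeme --table customers -m 1 --hive-import</command>
            <!-- Ship the hive-site.xml uploaded to HDFS with the action -->
            <file>/user/cloudera/hive-site.xml#hive-site.xml</file>
        </sqoop>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Sqoop import failed: [${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
EOF
```

The file element is one way to make hive-site.xml available in the action's working directory; your workflow may need a different path or extra configuration.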

Importing data from Sqoop into Hive External Table with Avro encoding updated

In the Importing data from Sqoop into Hive External Table with Avro encoding entry I had details on how you can import a table from an RDBMS into Hive using Sqoop in Avro format. In that blog I went through a few steps to get the avsc file, but I realized there is an easier way to do it by following these steps
  1. First execute the sqoop import command like this. Make sure that you pass --outdir schema as a parameter to the sqoop import command; it makes Sqoop generate CUSTOMER.avsc and CUSTOMER.java in the schema directory on your local machine
    
    sqoop import --connect jdbc:mysql://localhost/test \
    --username root \
    --password cloudera \
    --table CUSTOMER \
    --as-avrodatafile \
    --outdir schema
    
  2. You can verify that the CUSTOMER.avsc file got created as expected by executing ls -ltrA schema
  3. Next create a schema directory in HDFS by executing the hdfs dfs -mkdir command like this
    
    hdfs dfs -mkdir /user/cloudera/schema
    
  4. Copy CUSTOMER.avsc from your local schema directory to the schema directory in HDFS by executing the following command
    
    hdfs dfs -copyFromLocal schema/CUSTOMER.avsc /user/cloudera/schema/.
    
  5. The last step is to create a Hive table with CUSTOMER.avsc as its schema using the following command
    
    CREATE EXTERNAL TABLE CUSTOMER
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION '/user/cloudera/CUSTOMER'
    TBLPROPERTIES ('avro.schema.url'='/user/cloudera/schema/CUSTOMER.avsc');
    
Now if you go to Hive and execute the "SELECT * FROM CUSTOMER;" query you should see 1 record in it

How to access Hive table from Spark in MapR sandbox

I was trying to figure out how to query a Hive table from Spark in the MapR 5.1 sandbox. So I started spark-shell and tried to query the sample_08 table, and I got an error saying no such table exists

scala> val sample08 = sqlContext.sql("select * from sample_08")
org.apache.spark.sql.AnalysisException: no such table sample_08; line 1 pos 14
 at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:260)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:268)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:264)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
 at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:279)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:264)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:254)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
 at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
 at scala.collection.immutable.List.foldLeft(List.scala:84)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
 at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:932)
 at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:932)
 at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:930)
 at org.apache.spark.sql.DataFrame.(DataFrame.scala:132)
 at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
 at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:741)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:19)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:24)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:26)
 at $iwC$$iwC$$iwC$$iwC$$iwC.(:28)
 at $iwC$$iwC$$iwC$$iwC.(:30)
 at $iwC$$iwC$$iwC.(:32)
 at $iwC$$iwC.(:34)
 at $iwC.(:36)
 at (:38)
 at .(:42)
 at .()
 at .(:7)
 at .()
 at $print()
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
 at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
 at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
 at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
 at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
 at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
 at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
 at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
 at org.apache.spark.repl.Main$.main(Main.scala:31)
 at org.apache.spark.repl.Main.main(Main.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
 at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
When I checked the <SPARK_HOME>/conf directory I noticed that hive-site.xml was missing. I searched for hive-site.xml on the cluster and found 2 copies; the one at /opt/mapr/hive/hive-1.2/conf/hive-site.xml had the hive.metastore.uris property pointing to thrift://localhost:9083, so I copied it into the <SPARK_HOME>/conf directory and restarted the shell. Now when I execute the same query I can see the results.

scala> val sample08 = sqlContext.sql("select * from sample_08")
sample08: org.apache.spark.sql.DataFrame = [code: string, description: string, total_emp: int, salary: int]
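
Because I found two hive-site.xml files, it helps to check that the one being copied really defines the metastore URI. A minimal sketch of that check follows; the function name copy_hive_site is hypothetical, and the Spark conf directory is passed in as an argument because its location differs between installs.

```shell
#!/bin/sh
# Hypothetical helper: copy a hive-site.xml into Spark's conf directory, but
# only if it actually defines hive.metastore.uris.
copy_hive_site() {
    hive_site="$1"     # e.g. /opt/mapr/hive/hive-1.2/conf/hive-site.xml
    spark_conf="$2"    # e.g. "$SPARK_HOME/conf"

    if ! grep -q 'hive.metastore.uris' "$hive_site"; then
        echo "no hive.metastore.uris in $hive_site" >&2
        return 1
    fi
    cp "$hive_site" "$spark_conf/hive-site.xml"
}
```

After the copy, spark-shell has to be restarted so that the new configuration is picked up.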

Moving data from Avro to ORC files

In the Importing data from Sqoop into Hive External Table with Avro encoding entry I blogged about how to sqoop data from an RDBMS into Hive. I wanted to take it to the next step by moving the downloaded data to an ORC table. I followed these steps to achieve that
  1. The first thing is to find out the schema of the Avro table, which you can get by executing the following statement in Hive
    
    show create table CUSTOMER;
    
    You will get output that looks something like this; it contains the schema of the table
    
    CREATE EXTERNAL TABLE `CUSTOMER`(
      `contactid` int COMMENT 'from deserializer',
      `firstname` string COMMENT 'from deserializer',
      `lastname` string COMMENT 'from deserializer',
      `email` string COMMENT 'from deserializer')
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS INPUTFORMAT
      'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION
      'hdfs://sandbox.hortonworks.com:8020/tmp/customer/data'
    TBLPROPERTIES (
      'avro.schema.url'='hdfs:///tmp/customer/schema/customer.avsc',
      'transient_lastDdlTime'='1431719666')
    
  2. Copy the schema from the last step, remove the part about row format, storage format and table properties, and replace it with the STORED AS ORC, LOCATION and TBLPROPERTIES clauses shown in this code. Execute this in Hive to create the customer table in ORC format
    
    CREATE EXTERNAL TABLE `CUSTOMER_ORC`(
      `contactid` int,
      `firstname` string,
      `lastname` string,
      `email` string)
    STORED AS ORC
    LOCATION
      'hdfs://sandbox.hortonworks.com:8020/tmp/customer/data_orc'
    TBLPROPERTIES ("orc.compress"="SNAPPY","orc.row.index.stride"="20000");
    
  3. The last step is to copy the data from the Avro table to the ORC table, which you can do with the following command
    
    insert into table CUSTOMER_ORC select * from customer;
    

Importing data from Sqoop into Hive External Table with Avro encoding

I wanted to figure out how to import the content of an RDBMS table into Hive with Avro encoding. During this process I wanted to use external Hive tables so that I have complete control over the location of the files.
Note: I have a different/easier method for doing this in Importing data from Sqoop into Hive External Table with Avro encoding updated
First I created the following table in the MySQL database, which is on the same machine as my HortonWorks Sandbox
  1. First create the CUSTOMER table like this in MySQL
    
    CREATE TABLE CUSTOMER (       
    contactid INTEGER NOT NULL ,       
    firstname VARCHAR(50),       
    lastname  VARCHAR(50),       
    email varchar(50) );
    
  2. After creating the table add a couple of records to it by executing an insert statement like this
    
    insert into customer values(1,'Sachin','Tendulark','sachin@gmail.com');
    
  3. The next step is to run a sqoop import that downloads the records of the table into HDFS at /tmp/customer/sample. In the real world you might want to download only the first 10 records or so, because you need just a few sample records to create the Avro schema
    
    sqoop import --connect jdbc:mysql://localhost/test --table CUSTOMER --username sqoop1 \
    --password sqoop -m 1 --create-hive-table \
    --hive-table CONTACT --as-avrodatafile  --target-dir /tmp/customer/sample
    
  4. Running the sqoop command dumps the records into HDFS, so first download the Avro file generated by sqoop
    
    hdfs dfs -get /tmp/customer/sample/part-m-00000.avro
    
  5. Use the avro-tools-*.jar to read the schema of the file generated by sqoop, by executing the following command
    
    java -jar avro-tools-1.7.5.jar getschema part-m-00000.avro > customer.avsc
    
    This is what the customer.avsc file looks like in my case
    
    {
      "type" : "record",
      "name" : "CUSTOMER",
      "doc" : "Sqoop import of CUSTOMER",
      "fields" : [ {
        "name" : "contactid",
        "type" : [ "int", "null" ],
        "columnName" : "contactid",
        "sqlType" : "4"
      }, {
        "name" : "firstname",
        "type" : [ "string", "null" ],
        "columnName" : "firstname",
        "sqlType" : "12"
      }, {
        "name" : "lastname",
        "type" : [ "string", "null" ],
        "columnName" : "lastname",
        "sqlType" : "12"
      }, {
        "name" : "email",
        "type" : [ "string", "null" ],
        "columnName" : "email",
        "sqlType" : "12"
      } ],
      "tableName" : "CUSTOMER"
    }
    
  6. The next step is to upload the Avro schema file that you created in the last step back to HDFS. In my case I had an HDFS folder called /tmp/customer/schema and I uploaded the Avro schema file into it
    
    hdfs dfs -put customer.avsc /tmp/customer/schema/
    
  7. Now go to Hive and execute the following command to define the external CUSTOMER Hive table with the Avro schema uploaded in the last step
    
    CREATE EXTERNAL TABLE CUSTOMER
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION '/tmp/customer/data'
    TBLPROPERTIES ('avro.schema.url'='hdfs:///tmp/customer/schema/customer.avsc');
    
  8. The last step is to run sqoop again, but this time import all the data into the external directory that the CUSTOMER Hive table points to.
    
    sqoop import --connect jdbc:mysql://localhost/test --table CUSTOMER --username sqoop1 \
    --password sqoop -m 1 --as-avrodatafile \
    --target-dir /tmp/customer/data --compression-codec snappy
    
Now if you run a select query on the CUSTOMER table you should get all the data that you see in your RDBMS

How to view the log files and job.xml generated by Hive on HortonWorks Sandbox

I was working on some Hive code and ran into problems: my query kept failing with a message like invalid character in job.xml. I tried to locate the job.xml but could not find it, and I could not find any logs on the local machine either. So I wanted to figure out how to debug this problem. I followed these steps.
  1. The first thing I did in the HDP 2.1 sandbox was to enable Ambari by clicking the Enable button
  2. The next step was to log in to Ambari by going to http://localhost:8080/ and entering admin/admin as username and password
  3. In the Ambari application I went to the YARN service screen and unchecked the yarn.log-aggregation-enable checkbox under Resource Manager. That disables log aggregation, which copies the log files from the local file system to HDFS and zips them. Disabling log aggregation keeps them on the local file system.
  4. Then in the Advanced section I changed the value of yarn.nodemanager.debug-delay-sec to 60000 seconds. The default value of this property is 0, which means that as soon as the job is done the NodeManager deletes all the intermediate files. Setting it to 60000 prevents them from being deleted for 60000 seconds (about 16.7 hours).
  5. Then I restarted all the services using Ambari for my changes to take effect
  6. After that I executed a couple of Hive queries, and now I could see my logs in the /hadoop/yarn/logs directory
  7. And I could see the job.xml files being preserved in the /hadoop/yarn/local/usercache/root/appcache folder for each application
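
To make the retention time concrete, the short sketch below converts the debug-delay value into hours and minutes and prints the two settings as a yarn-site.xml fragment. The fragment is for reference only; on the sandbox Ambari manages the real file, so I would not edit it by hand.

```shell
#!/bin/sh
# Convert the yarn.nodemanager.debug-delay-sec value into hours and minutes.
delay_sec=60000
hours=$((delay_sec / 3600))
minutes=$(( (delay_sec % 3600) / 60 ))
echo "debug-delay-sec=$delay_sec keeps files for ${hours}h ${minutes}m"

# The two settings changed through Ambari, written as a yarn-site.xml
# fragment (for reference only; Ambari manages the real file).
cat <<'EOF'
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>false</value>
</property>
<property>
  <name>yarn.nodemanager.debug-delay-sec</name>
  <value>60000</value>
</property>
EOF
```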

How to update records in Hive Table that uses ElasticSearch for storage

I had a requirement in which I wanted to update records in a Hive table. Hive is more of an append-only database and you can't update records in it (that limitation comes from the text files stored in HDFS, which is how Hive stores data by default). But if you are using Hive with ElasticSearch as storage then you can get this to work. When you use ElasticSearch as the storage mechanism, every call from Hive to insert or delete data gets forwarded to the ElasticSearch API, and ElasticSearch has the ability to update existing records. I used this to implement an updatable Hive table. For the scenario, assume you have an ElasticSearch index that stores first name, last name and email as a document. Create a new index in ES with name cricketers and type player by making a curl call like this.

curl -XPOST "http://localhost:9200/cricketers/player/1" -d'
{
    "id": "1",
    "fname": "First",
    "lname": "Cricketer",
    "email": "first.cricketer@gmail.com"
}'
This call first creates an index named cricketers in ES and inserts one document into it, with id equal to 1. The next step is to define an external table in Hive that uses org.elasticsearch.hadoop.hive.EsStorageHandler as its storage handler and points to the cricketers/player index that you created in the last step. Another important setting is 'es.mapping.id'='id', which says to use the value of the id column as the primary key/id in ElasticSearch.

create external table cricketers_es(id String, fname String, lname String, email String) stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler' 
TBLPROPERTIES('es.resource'='cricketers/player', 'es.index.auto.create'='false', 'es.mapping.id'='id');
Once the table is created you can check the records in it by executing select * from cricketers_es. You should see the 1 record that is in the index.
Since Hive does not have the concept of an update statement, you have to create a Hive table that holds only the records you want to insert or update (the delta), and then use this delta table to update the cricketers_es table. To do that, first create a text file that holds the delta records. In my case I created this simple cricketers.txt file and uploaded it to HDFS in the /user/hue folder

1,sachin,tendulkar,sachin.tendulakar@bcci.com
2,Rahul,Dravid,rahul.dravid@bcci.com
After that create a Hive table called cricketers_stage, which will hold the delta records, by executing the following statement

create table cricketers_stage(id String, fname String, lname String, email String) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
Now use the following Hive statement to load your delta records into cricketers_stage.

LOAD DATA INPATH '/user/hue/cricketers.txt' INTO TABLE default.cricketers_stage;
The next step is to set the ElasticSearch write mode to upsert by setting the following property in the Hive console.

SET es.write.operation=upsert;
The last step is to execute the following statement in Hive, which takes the content of cricketers_stage and inserts those records into the cricketers_es table.

insert into cricketers_es select * from cricketers_stage;
Now if you run select * from cricketers_es you should see 2 records: the first record is updated, and the record with id 2 is a new insert.

1,sachin,tendulkar,sachin.tendulakar@bcci.com
2,Rahul,Dravid,rahul.dravid@bcci.com
You can also verify the records in ElasticSearch by executing the following curl command

curl -XPOST "http://localhost:9200/cricketers/player/_search?pretty=true" -d'
{
  "query": {
      "match_all": {}
  }
}'
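
The Hive statements from this entry can also be collected into one script so the upsert is repeatable. The sketch below only writes that script; the file name upsert_cricketers.hql and the IF NOT EXISTS clause are my additions here, and it assumes the cricketers_es table and /user/hue/cricketers.txt already exist as described above.

```shell
#!/bin/sh
# Writes the upsert steps into one HiveQL script (file name is an assumption).
cat > upsert_cricketers.hql <<'EOF'
-- Staging table for the delta records
CREATE TABLE IF NOT EXISTS cricketers_stage(id STRING, fname STRING, lname STRING, email STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

-- Move the delta file from HDFS into the staging table
LOAD DATA INPATH '/user/hue/cricketers.txt' INTO TABLE cricketers_stage;

-- Ask elasticsearch-hadoop to upsert instead of index
SET es.write.operation=upsert;

-- Push the delta into the Elasticsearch-backed table
INSERT INTO TABLE cricketers_es SELECT * FROM cricketers_stage;
EOF
```

You would then run it with hive -f upsert_cricketers.hql, with the elasticsearch-hadoop jar available to Hive.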

Connecting to HDP 2.2 Hive using JDBC driver

In http://wpcertification.blogspot.com/2014/04/connecting-to-hive-using-jdbc-client.html I blogged about how to connect to Hive using a JDBC client. I wanted to figure out how to connect to Hive 0.13 in Hortonworks Sandbox 2.2, so I followed these steps. You can download the complete source code for this project from here.
First I created an App.java class. When connecting to HDP 2.2 I had to use HiveServer2, and I followed the instructions in the HDP documents. I used a Maven build file that includes the Hive, Hadoop and Log4j jars; the log4j jar lets me enable logging. Once my code was complete I tried connecting to Hive and got the following error around access control.

java.sql.SQLException: Error while compiling statement: FAILED: HiveAccessControlException Permission denied. 
Principal [name=root, type=USER] does not have following privileges on Object [type=TABLE_OR_VIEW, name=default.sample_07] : [SELECT]
 at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:121)
 at org.apache.hive.jdbc.Utils.verifySuccessWithInfo(Utils.java:109)
 at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:231)
 at org.apache.hive.jdbc.HiveStatement.executeQuery(HiveStatement.java:355)
 at com.spnotes.hive.App.main(App.java:24)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:601)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

I had to use the following command in the hive console to give the root user access to query the sample_07 table.

hive> grant select on table sample_07 to user root;

Importing data from RDBMS into Hive using create-hive-table of sqoop

In the Importing data from RDBMS into Hive entry I blogged about how to import data from an RDBMS into Hive using Sqoop. In that case the import command took care of both creating the table in Hive based on the RDBMS table and importing the data from the RDBMS into Hive. But Sqoop can also be used to get data that is already stored in an HDFS text file into Hive. I wanted to try that out, so I created the contact table in Hive as a separate step and then used the contact table that I had imported as a text file into HDFS as input
  1. First I used the sqoop import command to import the content of the Contact table into HDFS as a text file. By default sqoop uses , for separating columns and newline for separating records
    
    sqoop import --connect jdbc:mysql://macos/test --table contact -m 1
    
    After the import is done I can see the content of the text file by executing hdfs dfs -cat contact/part-m-00000
  2. After that you can use sqoop to create a table in Hive based on the schema of the CONTACT table in the RDBMS, by executing the following command
    
    sqoop create-hive-table --connect jdbc:mysql://macos/test --table contact --fields-terminated-by ','
    
  3. The last step is to use Hive to load the content of the contact text file into the contact table, by executing the following command.
    
    LOAD DATA INPATH 'contact' into table contact;
    

Importing data from RDBMS into Hive using sqoop

In the Importing data from RDBMS into Hadoop entry I blogged about how to import the content of an RDBMS into a Hadoop text file using Sqoop. But it is more common to import the content of an RDBMS into Hive. I wanted to try that out, so I decided to import the content of the Contact table that I created in the Importing data from RDBMS into Hadoop entry into a Contact table in Hive on my local machine. I followed these steps
  1. First take a look at the content of the Contact table in my local MySQL (SELECT * from CONTACT)
  2. The next step is to use the sqoop import command like this
    
    sqoop import --connect jdbc:mysql://macos/test --table contact -m 1 --hive-import
    
    As you will notice, this command is the same as the sqoop import command that I used in the last blog entry to import the content of the RDBMS into a text file; the only difference is that I had to add the --hive-import switch
  3. This command takes care of first creating the Contact table in Hive and then importing the content of the CONTACT table from the RDBMS into the CONTACT table in Hive. Now I can see the content of the Contact table in Hive

Using elasticsearch as external data store with apache hive

ElasticSearch has a feature with which you can configure a Hive table that actually points to an index in ElasticSearch. I wanted to learn how to use this feature, so I followed these steps
  1. First I created the contact/contact index and type in ElasticSearch and inserted 4 records into it
  2. Next I downloaded the ElasticSearch Hadoop zip file on my Hadoop VM by executing the following command
    
    wget http://download.elasticsearch.org/hadoop/elasticsearch-hadoop-2.0.0.RC1.zip
    
    I expanded elasticsearch-hadoop-2.0.0.RC1.zip in the /root directory
  3. Next I had to start the hive console by executing the following command; take a look at how I had to add elasticsearch-hadoop-2.0.0.RC1.jar to hive.aux.jars.path
    
    hive -hiveconf hive.aux.jars.path=/root/elasticsearch-hadoop-2.0.0.RC1/dist/elasticsearch-hadoop-2.0.0.RC1.jar
    
  4. Next I defined the artists table in Hive that points to the contact index in the ElasticSearch server like this
    
    CREATE EXTERNAL TABLE artists (
    fname STRING,
    lname STRING,
    email STRING)
    STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
    TBLPROPERTIES('es.resource' = 'contact/contact',
                  'es.index.auto.create' = 'false') ;
    
  5. Once the table was configured I could query it like any normal Hive table

Connecting to Hive using Beeline client

I like the Hive Beeline client better than the default Hive command line interface. On my Cloudera VM 4.4.0-1 I can connect with the Beeline client by executing the following command: beeline -u jdbc:hive://

Connecting to Hive using JDBC client

I wanted to try out connecting to Hive using a JDBC driver, so I followed these steps. You can download the Maven project from this location
  1. First start the HiveServer by executing the hive --service hiveserver command
  2. Make sure that Hive is running by executing netstat -tnlp | grep 10000
  3. Now use the following Java code for connecting to Hive and executing a simple select query