Using WebHDFS as input and output for a MapReduce program

In the WordCount (HelloWorld) MapReduce program entry I talked about how to create a simple WordCount MapReduce program, and in the WebHDFS REST API entry I blogged about how to configure a WebHDFS endpoint for your Hadoop installation. I wanted to combine the two, so that my MapReduce program reads its input over WebHDFS and writes its output back to HDFS over WebHDFS. First I changed the program arguments to use webhdfs:// URLs for both the input and the output (note that these URLs point at the NameNode's HTTP port, 50070 by default, not the RPC port):

hadoop jar WordCount.jar webhdfs://172.16.225.192:50070/test/startupdemo.txt webhdfs://172.16.225.192:50070/output
When I tried to run the program I got an org.apache.hadoop.security.AccessControlException. The job was picking up the login user name from my machine (I run Hadoop on a VM, and my Eclipse IDE with the MapReduce program directly on my machine) and running the MapReduce job as that user, and HDFS does not allow that user (gpzpati in the trace below) to create any files in HDFS:

14/05/09 10:10:18 WARN mapred.LocalJobRunner: job_local_0001
org.apache.hadoop.security.AccessControlException: Permission denied: user=gpzpati, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
 at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:90)
 at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:57)
 at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:280)
 at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.run(WebHdfsFileSystem.java:427)
 at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.mkdirs(WebHdfsFileSystem.java:469)
 at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1731)
 at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:82)
 at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:236)
Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=gpzpati, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
 at org.apache.hadoop.hdfs.web.JsonUtil.toRemoteException(JsonUtil.java:167)
 at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:279)
 ... 5 more
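The inode="/":hdfs:supergroup:drwxr-xr-x part of the message explains the failure: the HDFS root directory is owned by user hdfs and group supergroup, and only the owner has write permission, so the job cannot create the /output directory as my local user. You can confirm the ownership and permissions with:

hadoop fs -ls /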
So I changed the main method of my program to wrap the job submission in a UserGroupInformation.doAs() call; in that call I override the name of the user running the MapReduce job to hdfs. Now it works fine.
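Here is a minimal sketch of what that wrapper looks like. The WordCountMapper and WordCountReducer class names and the rest of the job setup are placeholders standing in for the code from the original WordCount post, so adjust them to match your program:

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;

public class WordCount {
  public static void main(final String[] args) throws Exception {
    // Submit the job as the hdfs user so the WebHDFS calls pass the HDFS permission checks
    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs");
    ugi.doAs(new PrivilegedExceptionAction<Void>() {
      public Void run() throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "wordcount");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCountMapper.class);   // placeholder: your mapper class
        job.setReducerClass(WordCountReducer.class); // placeholder: your reducer class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Both arguments are the webhdfs:// URLs passed on the command line
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
        return null;
      }
    });
  }
}

With this in place the same hadoop jar command from above runs without the AccessControlException.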

Note: in order for this program to work, your NameNode and DataNodes must have host names that are resolvable from the machine running the job. When the MapReduce job makes the initial OPEN request to webhdfs://172.16.225.192:50070/test/startupdemo.txt, the NameNode sends back a redirect whose URL points to a DataNode, like this:

$curl -i 'http://172.16.225.192:50070/webhdfs/v1/test/startupdemo.txt?op=OPEN'
HTTP/1.1 307 TEMPORARY_REDIRECT
Cache-Control: no-cache
Expires: Tue, 06 May 2014 21:44:56 GMT
Date: Tue, 06 May 2014 21:44:56 GMT
Pragma: no-cache
Expires: Tue, 06 May 2014 21:44:56 GMT
Date: Tue, 06 May 2014 21:44:56 GMT
Pragma: no-cache
Location: http://ubuntu:50075/webhdfs/v1/test/startupdemo.txt?op=OPEN&namenoderpcaddress=localhost:9000&offset=0
Content-Type: application/octet-stream
Content-Length: 0
Server: Jetty(6.1.26)
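If you follow that redirect (for example with curl -L), the request is re-sent to the DataNode host named in the Location header, ubuntu here, so that name has to resolve from wherever the client runs:

$ curl -L 'http://172.16.225.192:50070/webhdfs/v1/test/startupdemo.txt?op=OPEN'

If the name resolves, this prints the contents of the file.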
Now if your DataNode host name, which is ubuntu in my case, is not directly resolvable, the job fails with an error like the one below. You can fix this by mapping the name ubuntu to the right IP address in your /etc/hosts file (see the example after the stack trace).

14/05/09 10:16:34 WARN mapred.LocalJobRunner: job_local_0001
java.net.UnknownHostException: ubuntu
 at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:223)
 at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:431)
 at java.net.Socket.connect(Socket.java:527)
 at java.net.Socket.connect(Socket.java:476)
 at sun.net.NetworkClient.doConnect(NetworkClient.java:163)
 at sun.net.www.http.HttpClient.openServer(HttpClient.java:424)
 at sun.net.www.http.HttpClient.openServer(HttpClient.java:538)
 at sun.net.www.http.HttpClient.<init>(HttpClient.java:214)
 at sun.net.www.http.HttpClient.New(HttpClient.java:300)
 at sun.net.www.http.HttpClient.New(HttpClient.java:319)
 at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:987)
 at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:923)
 at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:841)
 at sun.net.www.protocol.http.HttpURLConnection.followRedirect(HttpURLConnection.java:2156)
 at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1390)
 at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:379)
 at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:264)
 at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.access$000(WebHdfsFileSystem.java:106)
 at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$OffsetUrlInputStream.checkResponseCode(WebHdfsFileSystem.java:688)
 at org.apache.hadoop.hdfs.ByteRangeInputStream.openInputStream(ByteRangeInputStream.java:121)
 at org.apache.hadoop.hdfs.ByteRangeInputStream.getInputStream(ByteRangeInputStream.java:103)
 at org.apache.hadoop.hdfs.ByteRangeInputStream.read(ByteRangeInputStream.java:158)
 at java.io.DataInputStream.read(DataInputStream.java:83)
 at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:114)
 at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:458)
 at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:76)
 at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:85)
 at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:139)
 at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:645)
 at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
 at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:263)
14/05/09 10:16:35 INFO mapred.JobClient:  map 0% reduce 0%
14/05/09 10:16:35 INFO mapred.JobClient: Job complete: job_local_0001
14/05/09 10:16:35 INFO mapred.JobClient: Counters: 0
job.isSuccessful false
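In my setup the single Hadoop VM at 172.16.225.192 runs both the NameNode and the DataNode, so adding a mapping like this to /etc/hosts on the machine running Eclipse fixed the lookup (substitute your own DataNode's IP and host name):

172.16.225.192 ubuntu

After that the redirect to http://ubuntu:50075/... resolves and the job runs to completion.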
