In the "WordCount program written using Spark framework written in python language" entry I talked about how to create a WordCount program using Apache Spark in Python. Now I wanted to try building the same program in Java, so I followed these steps. You can download the full code from here.
This is what the Java class looks like. It has 4 anonymous classes:
- file.flatMap(new FlatMapFunction() {}): This class gets called for every line in the input file. It splits the line and returns the list of words in it.
- words.filter(new Function() {}): This class gets called for each word and checks whether it is actually a word or just a blank space. My input file has some blank spaces, so I used this filter (part of the reason was to try out filters) to remove the words that are just blank spaces.
- words.mapToPair(new PairFunction() {}): This class takes each word as input and converts it into a Tuple2 with the word as key and 1 as value.
- wordToCountMap.reduceByKey(new Function2() {}): This class gets called with the running total and a count as input, and the only thing it does is add the current count to the running total.
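To make the data flow of those four steps concrete, here is a minimal sketch that runs them on an in-memory list with plain `java.util.stream` instead of Spark. It is not the Spark class from this post: `WordCountSketch` and `countWords` are hypothetical names, and the stream calls only stand in for `flatMap`, `filter`, `mapToPair` and `reduceByKey`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class WordCountSketch {

    // Hypothetical helper: applies the same four steps as the Spark job,
    // but to a small in-memory list of lines (no Spark involved).
    public static Map<String, Integer> countWords(List<String> lines) {
        return lines.stream()
                // Step 1 (flatMap): split every line into words.
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // Step 2 (filter): drop entries that are only blank space.
                .filter(word -> !word.trim().isEmpty())
                // Step 3 (mapToPair): pair every word with the count 1.
                .map(word -> new SimpleEntry<String, Integer>(word, 1))
                // Step 4 (reduceByKey): add each count to the running total of its word.
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        Integer::sum,
                        TreeMap::new));
    }

    public static void main(String[] args) {
        // The empty line and the double space both produce blank "words".
        List<String> lines = Arrays.asList("to be or not", "", "to  be");
        System.out.println(countWords(lines)); // prints {be=2, not=1, or=1, to=2}
    }
}
```

The sample input contains an empty line and a double space on purpose, so the filter step has something to remove.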
This is what the Maven build script for my program looks like. I had to fight through a couple of issues to get the Maven build working. The first one is that, by default, when you add the org.apache.spark.spark-core_2.10 dependency it pulls in the Hadoop 1.0.4 HDFS jars, and since I have a Hadoop 2.4 server I got the error shown below. You get this error even if you add the Hadoop 2.4 HDFS jars yourself, because then you have both Hadoop 2.4 and Hadoop 1.0.4 jars on the classpath. So you have to ask Maven to exclude the Hadoop client included by org.apache.spark.spark-core_2.10.
14/08/25 12:16:50 WARN snappy.LoadSnappy: Snappy native library not loaded
Exception in thread "main" org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4
at org.apache.hadoop.ipc.Client.call(Client.java:1070)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.FilteredRDD.getPartitions(FilteredRDD.scala:29)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:59)
at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:385)
at com.spnotes.spark.WordCount2.main(WordCount2.java:62)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
Once that part was done, I ran into another issue because 2 different versions of Jetty were getting included:
Exception in thread "main" java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package
So I had to add an exclusion for the Jetty-related jars to the org.apache.hadoop.hadoop-common dependency.
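As a rough sketch of what the two exclusions could look like in the project's pom.xml: the `${spark.version}` and `${hadoop.version}` properties are placeholders that are not taken from this post and would have to be defined in `<properties>`, and the list of Jetty and servlet artifacts is an assumption. Running `mvn dependency:tree` shows which artifacts are actually pulled in on your setup.

```xml
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <!-- Placeholder property, not from the post. -->
    <version>${spark.version}</version>
    <exclusions>
      <!-- Keep Spark from bringing in its default Hadoop 1.0.4 client. -->
      <exclusion>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <!-- Placeholder property; should match the Hadoop 2.4 server. -->
    <version>${hadoop.version}</version>
    <exclusions>
      <!-- Assumed artifact list; verify against mvn dependency:tree. -->
      <exclusion>
        <groupId>org.mortbay.jetty</groupId>
        <artifactId>jetty</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.mortbay.jetty</groupId>
        <artifactId>jetty-util</artifactId>
      </exclusion>
      <exclusion>
        <groupId>javax.servlet</groupId>
        <artifactId>servlet-api</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
</dependencies>
```

Exclusions are declared per dependency, so each one sits inside the `<dependency>` element that would otherwise pull the unwanted jar in transitively.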