- file.flatMap(new FlatMapFunction<String, String>() {}): This class gets called for every line in the input file; it splits the line and returns the list of words in that line.
- words.filter(new Function<String, Boolean>() {}): This class gets each word and checks whether it is an actual word or just blank space. My input file has some blank spaces, so I used this filter (partly as an excuse to try out filters) to remove the entries that are just blank spaces.
- words.mapToPair(new PairFunction<String, String, Integer>() {}): This class takes each word and converts it into a Tuple2 with the word as the key and 1 as the value.
- wordToCountMap.reduceByKey(new Function2<Integer, Integer, Integer>() {}): This class gets called with the running total and the count for a word, and all it does is add the current count to the running total.
package com.spnotes.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Created by user on 8/24/14.
 */
public class WordCount2 {
    public static void main(String[] argv) {
        if (argv.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
                    WordCount2.class.getSimpleName());
            return;
        }
        String inputPath = argv[0];
        String outputPath = argv[1];
        System.out.printf("Starting WordCount program with %s as input %s as output\n", inputPath, outputPath);

        SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> file = sc.textFile(inputPath);
        JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });
        words = words.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                System.out.println("Inside filter words ->" + s);
                return s.trim().length() != 0;
            }
        });
        JavaPairRDD<String, Integer> wordToCountMap = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        JavaPairRDD<String, Integer> wordCounts = wordToCountMap.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer first, Integer second) throws Exception {
                return first + second;
            }
        });
        wordCounts.saveAsTextFile(outputPath);
    }
}
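For comparison, with Java 8 the same pipeline can be written much more compactly using lambdas instead of anonymous inner classes. This is just a minimal sketch, assuming Java 8 and the Spark 1.x Java API (where flatMap expects an Iterable; later Spark versions expect an Iterator), and WordCountLambda is only an illustrative class name:

package com.spnotes.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCountLambda {
    public static void main(String[] argv) {
        SparkConf conf = new SparkConf().setAppName("WordCountLambda").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> file = sc.textFile(argv[0]);
        // Split each line into words, drop blank tokens, pair each word with 1,
        // then sum the counts per word.
        JavaPairRDD<String, Integer> wordCounts = file
                .flatMap(line -> Arrays.asList(line.split(" ")))
                .filter(word -> word.trim().length() > 0)
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((first, second) -> first + second);
        wordCounts.saveAsTextFile(argv[1]);
    }
}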
The org.apache.spark:spark-core_2.10 dependency pulls in HDFS 1.0.4 jars. In my case the server runs Hadoop 2.4, so I got the following error. You get this error even if you include the HDFS-related jars for your version, because now you suddenly have both Hadoop 2.4 and Hadoop 1.0.4 jars on the classpath, so you have to ask Maven to exclude the hadoop-client pulled in by org.apache.spark:spark-core_2.10.
14/08/25 12:16:50 WARN snappy.LoadSnappy: Snappy native library not loaded
Exception in thread "main" org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4
at org.apache.hadoop.ipc.Client.call(Client.java:1070)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.FilteredRDD.getPartitions(FilteredRDD.scala:29)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:59)
at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:385)
at com.spnotes.spark.WordCount2.main(WordCount2.java:62)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
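If you want to confirm where the old hadoop-client is coming from before adding the exclusion, the Maven dependency tree shows it. Something like the following should work from the project directory (a quick check; the exact output depends on your own dependency tree):

mvn dependency:tree -Dincludes=org.apache.hadoop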
Once that part was done, I ran into another issue because two different versions of Jetty were getting included:
Exception in thread "main" java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package
so I had to add an exclusion for the Jetty-related jars in the org.apache.hadoop:hadoop-common dependency.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>WordCountSpark</groupId>
    <artifactId>WordCountSpark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <hadoop.version>2.4.0</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.eclipse.jetty</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-app</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
</project>
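Since the program hard-codes setMaster("local"), it can be run directly from the IDE once the pom above is in place. From the command line, something along these lines should also work, a sketch using the exec-maven-plugin, where the HDFS input and output paths are placeholders for your own locations:

mvn compile exec:java -Dexec.mainClass=com.spnotes.spark.WordCount2 \
    -Dexec.args="hdfs://localhost:9000/input/wordcount.txt hdfs://localhost:9000/output/wordcount"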