In the
WordCount program built using Apache Spark in Java, I built a simple Spark program that takes the name of a file as input, reads the file, and performs a word count on it.
Now, Spark also has a concept of Spark Streaming, which allows you to read input as a stream of real-time events instead of a one-time load of an input file. The API for transforming the data stays largely the same, because in both cases Spark exposes the input as RDDs. In the case of Spark Streaming, it collects all the incoming data for a certain duration (microBatchTime) and then exposes each such micro-batch as an RDD.
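For example, here is a minimal sketch (assuming the lines DStream created in the program below) that shows each micro-batch arriving as a plain RDD:

lines.foreachRDD { (rdd, time) =>
  // Each micro-batch is an ordinary RDD, so all the usual RDD operations apply here
  println(s"Batch at $time contains ${rdd.count()} records")
}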
I built this simple NetcatStreamClient streaming application that listens for incoming data on a netcat port; once it has data, it performs a word count on it and prints the result to the console. You can download the full source code from
GitHub
package com.spnotes.spark
import com.typesafe.scalalogging.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.slf4j.LoggerFactory
object NetcatStreamClient {
  val logger = Logger(LoggerFactory.getLogger("NetcatStreamClient"))

  def main(argv: Array[String]): Unit = {
    logger.debug("Entering NetcatStreamClient.main")
    if (argv.length != 3) {
      println("Usage: NetcatStreamClient <hostname> <port> <microBatchTime>")
      System.exit(1)
    }
    val hostName = argv(0)
    val port = argv(1).toInt
    val microBatchTime = argv(2).toInt
    logger.debug(s"Listening on $hostName at $port, batching records every $microBatchTime seconds")

    // Create Spark configuration
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    // Create StreamingContext with microBatchTime, which specifies how long Spark should collect data per batch
    val sparkStreamingContext = new StreamingContext(sparkConf, Seconds(microBatchTime))
    // Start listening for data on the given host and port
    val lines = sparkStreamingContext.socketTextStream(hostName, port)
    // Logic for implementing word count on each input batch
    lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()
    // lines.count() returns a DStream[Long] (one count per batch), not a number,
    // so it cannot be concatenated into a log message; print the per-batch count instead
    lines.count().print()
    // Start the stream so that data starts flowing; you must define the transformation logic before calling start()
    sparkStreamingContext.start()
    sparkStreamingContext.awaitTermination()
    logger.debug("Exiting NetcatStreamClient.main")
  }
}
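Note that the master is set to local[2]. When you run a Spark Streaming receiver locally, you need at least two threads: one to run the socket receiver and at least one to process the received data, so local[1] would leave no thread for the word count itself.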
Once your project is built using
mvn clean compile assembly:single
, the first thing you should do is execute the following command to start netcat listening on localhost at port 9999:
nc -l 9999
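(The exact flags depend on your netcat variant: GNU netcat may require nc -l -p 9999, and with OpenBSD netcat you can add -k, as in nc -lk 9999, to keep the listener open across reconnects.)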
Next, execute the following command to start the Spark driver. It takes 3 parameters: the host and the port where netcat is listening, and the batch duration in seconds:
bin/spark-submit ~/HelloSparkStreaming-1.0-SNAPSHOT-jar-with-dependencies.jar localhost 9999 5
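Now type a few lines of text into the netcat terminal. Every 5 seconds (the batch duration passed above), the driver should print the (word, count) pairs computed for that batch, followed by the per-batch line count.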