Hello Spark Streaming

In the WordCount program built using Apache Spark in Java, I built a simple Spark program that takes the name of a file as input, reads the file, and performs a word count on it. Spark also has the concept of Spark Streaming, which allows you to read input as a stream of real-time events instead of a one-time load of an input file. The API for transforming the data stays largely the same: in both cases Spark exposes the input as an RDD. In the case of Spark Streaming, Spark collects all the incoming data for a certain duration (microBatchTime) and then exposes each such micro-batch as an RDD. I built this simple NetcatStreamClient streaming application that listens for incoming data on a netcat port; once it has data, it performs a word count on it and prints the result to the console. You can download the full source code from GitHub
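
To make the micro-batch idea concrete, here is a minimal sketch, separate from the full program below, showing that each micro-batch arrives as an ordinary RDD. It assumes a StreamingContext named ssc already exists:

ssc.socketTextStream("localhost", 9999).foreachRDD { rdd =>
  // rdd holds exactly the records collected during one micro-batch interval
  println(s"Records in this micro-batch: ${rdd.count()}")
}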

package com.spnotes.spark

import com.typesafe.scalalogging.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.slf4j.LoggerFactory

object NetcatStreamClient {
  val logger = Logger(LoggerFactory.getLogger("NetcatStreamClient"))

  def main(argv: Array[String]): Unit = {
    logger.debug("Entering NetcatStreamClient.main")
    if (argv.length != 3) {
      println("Usage: NetcatStreamClient <hostname> <port> <batchDurationInSeconds>")
      System.exit(1)
    }
    val hostName = argv(0)
    val port = argv(1).toInt
    val microBatchTime = argv(2).toInt

    logger.debug(s"Listening on $hostName:$port, batching records every $microBatchTime seconds")

    //Create Spark Configuration
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

    //Create SparkStreamingContext with microBatchTime which specifies how long spark should collect data
    val sparkStreamingContext = new StreamingContext(sparkConf, Seconds(microBatchTime))
    //Start listening for data on given host and port
    val lines = sparkStreamingContext.socketTextStream(hostName, port)
    //Word count on each micro-batch: split lines into words, pair each word
    //with 1, sum the counts per word, and print the result to the console
    lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()

    //count() on a DStream returns another DStream, not a number, so print
    //the per-batch line count instead of concatenating it into a log message
    lines.count().print()

    //Start the stream so that data starts flowing; all transformation logic must be defined before calling start()
    sparkStreamingContext.start()
    sparkStreamingContext.awaitTermination()
    logger.debug("Exiting NetcatStreamClient.main")
  }
}
Once the project is built using mvn clean compile assembly:single, the first thing you should do is execute the following command to start netcat listening on localhost at port 9999:

nc -l 9999
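
Depending on your netcat variant, you may also want the -k flag so that netcat keeps listening for a new connection if the driver disconnects and reconnects (this is the form the official Spark examples use):

nc -lk 9999
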
Next, execute the following command to start the Spark driver. It takes 3 parameters: the host and port where netcat is listening, and the batch duration in seconds:

bin/spark-submit ~/HelloSparkStreaming-1.0-SNAPSHOT-jar-with-dependencies.jar localhost 9999 5
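
With both processes running, anything you type into the netcat terminal becomes the input stream. For example, typing this (purely illustrative) line:

hello spark hello streaming

should produce console output from the driver for that batch roughly like the following (the timestamp will differ, the tuple order is not guaranteed, and a second block from lines.count() follows it):

-------------------------------------------
Time: 1457186520000 ms
-------------------------------------------
(hello,2)
(spark,1)
(streaming,1)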
