Monitoring HDFS directory for new files using Spark Streaming

I wanted to build a simple Spark Streaming application that monitors a particular directory in HDFS and, whenever a new file shows up, prints its content to the console. I built this HDFSFileStream.scala. In this program, after creating a StreamingContext, I call sparkStreamingContext.textFileStream(&lt;directoryName&gt;) on it. Once a new file appears in the directory, fileRdd.count() returns more than 0, and then I invoke processNewFile(). The processNewFile() method takes an RDD[String], iterates through the file content, and prints it to the console.
package com.spnotes.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{Logging, SparkConf}

object HDFSFileStream extends Logging {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      println("Please provide 2 parameters <directoryToMonitor> <microbatchtime>")
      System.exit(1)
    }
    val directoryToMonitor = args(0)
    val microBatchTime = args(1).toInt

    val sparkConf = new SparkConf().setAppName("HDFSFileStream")
    val sparkStreamingContext = new StreamingContext(sparkConf, Seconds(microBatchTime))
    logInfo("Value of microBatchTime " + microBatchTime)
    logInfo("DirectoryToMonitor " + directoryToMonitor)

    // Each micro-batch produces an RDD with the lines of any files that
    // appeared in the monitored directory since the last batch
    val directoryStream = sparkStreamingContext.textFileStream(directoryToMonitor)
    logInfo("After starting directoryStream")

    // Skip empty batches; only process batches that carry new data
    directoryStream.foreachRDD { fileRdd =>
      if (fileRdd.count() != 0)
        processNewFile(fileRdd)
    }

    sparkStreamingContext.start()
    sparkStreamingContext.awaitTermination()
    logInfo("Exiting HDFSFileStream.main")
  }

  // Print every line of the newly arrived file(s). Note that println runs
  // on the executors, so in cluster mode the output shows up in the
  // executor logs rather than on the driver console.
  def processNewFile(fileRDD: RDD[String]): Unit = {
    logDebug("Entering processNewFile")
    fileRDD.foreach { line =>
      println(line)
    }
    logDebug("Exiting processNewFile")
  }
}
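One small refinement worth considering: fileRdd.count() scans every line in the new files just to decide whether there is anything to process. If you are on Spark 1.3 or later, RDD.isEmpty() answers the same question by stopping at the first element it finds, so the guard could be written like this (a minimal sketch against the same directoryStream as above):

// isEmpty() returns as soon as one element is found, instead of
// counting every line the way count() does
directoryStream.foreachRDD { fileRdd =>
  if (!fileRdd.isEmpty()) {
    processNewFile(fileRdd)
  }
}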
Next, start the program by executing the following command

bin/spark-submit ~/HelloSparkStreaming-1.0-SNAPSHOT-jar-with-dependencies.jar /user/mapr/stream 3

Once the streaming application starts, it monitors the /user/mapr/stream directory for new content. I copied a file with a few lines in it into that directory and got the following output, which is the content of the file.
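One thing to watch out for: textFileStream only reliably picks up files that appear in the monitored directory atomically, so if you copy a large file straight into /user/mapr/stream, Spark may see it while it is still being written. A common workaround is to upload the file to a staging path first and then rename it into place, since a rename is atomic in HDFS. A sketch, where input.txt and /user/mapr/staging are placeholder names:

# upload to a staging directory outside the monitored path
hadoop fs -put input.txt /user/mapr/staging/input.txt
# rename into the monitored directory; rename is atomic in HDFS
hadoop fs -mv /user/mapr/staging/input.txt /user/mapr/stream/input.txt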

Comments:

Basanth said...

Hello Sunil,

Thanks for the blog. I need to implement similar code for a component of mine and had a couple of questions:

1. I was wondering what kind of load your approach will place on the HDFS NameNode. Will you be constantly polling the NameNode to get the latest files in your monitored directories? Is this going to cause any adverse performance issues on the cluster?

2. Is there a push model instead of this pull model, so that your code can be notified of the existence of a new file in HDFS rather than querying HDFS ourselves?

Thanks !
Basanth

Jay M said...

Hi Sunil

I would like to adapt HDFSFileStream.scala.
I have the following requirement:

I need to monitor a few directories in our cluster and create a monthly report covering these things:
Size of the directories in the previous month and the current month
New folders that get added in the current month
Create this report every month

Unknown said...

Can I get the above code for PySpark?
