sparkStreamingContext.textFileStream(<directoryName>) on it. Once a new file appears in the directory, fileRDD.count() returns more than 0, and I invoke processNewFile(). The processNewFile() method takes an RDD[String], iterates through the file's content, and prints it to the console.
package com.spnotes.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{Logging, SparkConf}

object HDFSFileStream extends Logging {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      println("Please provide 2 parameters <directoryToMonitor> <microBatchTime>")
      System.exit(1)
    }
    val directoryToMonitor = args(0)
    val microBatchTime = args(1).toInt

    val sparkConf = new SparkConf().setAppName("HDFSFileStream")
    val sparkStreamingContext = new StreamingContext(sparkConf, Seconds(microBatchTime))
    logInfo("Value of microBatchTime " + microBatchTime)
    logInfo("DirectoryToMonitor " + directoryToMonitor)

    // Create a DStream that watches the given directory for new files
    val directoryStream = sparkStreamingContext.textFileStream(directoryToMonitor)
    logInfo("After starting directoryStream")

    // For every micro-batch, process the RDD only if a new file contributed data
    directoryStream.foreachRDD { fileRdd =>
      if (fileRdd.count() != 0)
        processNewFile(fileRdd)
    }

    sparkStreamingContext.start()
    sparkStreamingContext.awaitTermination()
    logInfo("Exiting HDFSFileStream.main")
  }

  // Print every line of the newly detected file to the console
  def processNewFile(fileRDD: RDD[String]): Unit = {
    logDebug("Entering processNewFile")
    fileRDD.foreach { line =>
      println(line)
    }
    logDebug("Exiting processNewFile")
  }
}
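One note on the foreachRDD guard above: fileRdd.count() runs a full Spark job over the batch just to find out whether it is empty. On Spark 1.3 and later, RDD.isEmpty() expresses the same check more cheaply; a minimal sketch:

directoryStream.foreachRDD { fileRdd =>
  // isEmpty() only needs to find one element, instead of counting them all
  if (!fileRdd.isEmpty())
    processNewFile(fileRdd)
}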
bin/spark-submit ~/HelloSparkStreaming-1.0-SNAPSHOT-jar-with-dependencies.jar /user/mapr/stream 3
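This invocation assumes the jar's manifest declares the main class (the maven-assembly-plugin can be configured to add a Main-Class entry). If your jar does not set it, you can name the class explicitly with --class; a hypothetical equivalent:

bin/spark-submit --class com.spnotes.spark.HDFSFileStream ~/HelloSparkStreaming-1.0-SNAPSHOT-jar-with-dependencies.jar /user/mapr/stream 3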
Once the streaming job starts, it monitors the /user/mapr/stream directory for new content. I copied in a file with a few lines in it, and the job printed the file's content to the console.
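To trigger the stream yourself, copy a file into the monitored directory, for example (sample.txt here is a hypothetical local file):

hadoop fs -put sample.txt /user/mapr/stream/

Note that textFileStream only picks up files that newly appear in the directory, so files should be written elsewhere first and then atomically moved or renamed into it.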
Comments:
Hello Sunil,
Thanks for the blog. I need to implement similar code for a component of mine, and I had a couple of questions:
1. What kind of load will this approach place on the HDFS NameNode? Will it constantly ping the NameNode to get the latest files in the monitored directories, and could that cause adverse performance issues on the cluster?
2. Is there a push model instead of this pull model, so that the code can be notified of the existence of a new file in HDFS rather than querying HDFS ourselves?
Thanks!
Basanth
Hi Sunil,
I would like to refer to HDFSFileStream.scala. I have the following requirement: I need to monitor a few directories in our cluster and create a report every month covering
1. the size of the directories in the previous month and the current month, and
2. new folders that get added in the current month.
Can I get the above code for PySpark?