How to use HBase sink with Flume
I wanted to figure out how to use HBase as a target for Flume, so I created this sample configuration, which reads events from netcat and writes them to HBase.
- The first step is to create a test table in HBase with CF1 as the column family. Every time Flume gets an event, it will write it to the test table in the CF1 column family
create 'test','CF1'
- Create a Flume configuration file like the sketch shown after these steps. I am using the HBase sink with SimpleHbaseEventSerializer as the event serializer. Note that I am assuming this is an unsecured cluster (Sandbox); if you have a secured cluster, you should follow the steps mentioned in Configure a Secure HBase Sink
- Start the Flume server with the following command
bin/flume-ng agent --conf conf --conf-file conf/netcat-hbase.properties --name agent1 -Dflume.root.logger=DEBUG,console
- Now open a netcat client on port 44444 and send some messages to Flume
- If you query the HBase test table, you should see the messages that were published to netcat
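Here is a minimal sketch of what conf/netcat-hbase.properties could look like, assuming a netcat source on port 44444 feeding the HBase sink with the table and column family created above (the component names netcat1, hbase1 and memory1 are just placeholders I chose):
# Netcat source feeding an HBase sink through a memory channel
agent1.sources = netcat1
agent1.sinks = hbase1
agent1.channels = memory1
# Listen for netcat/telnet connections on port 44444
agent1.sources.netcat1.type = netcat
agent1.sources.netcat1.bind = localhost
agent1.sources.netcat1.port = 44444
# Write every event into the 'test' table under column family CF1
agent1.sinks.hbase1.type = hbase
agent1.sinks.hbase1.table = test
agent1.sinks.hbase1.columnFamily = CF1
agent1.sinks.hbase1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
# Use a channel which buffers events in memory
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100
# Bind the source and sink to the channel
agent1.sources.netcat1.channels = memory1
agent1.sinks.hbase1.channel = memory1
To try it end to end, send a message from a second terminal and then scan the table from the HBase shell:
nc localhost 44444
hello hbase
scan 'test'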
Flume to Spark Streaming - Pull model
In this post I will demonstrate how to stream data from Flume into Spark using Spark Streaming. When it comes to streaming data from Flume to Spark, you have 2 options.
- Push model: Spark listens on a particular port for Avro events, and Flume connects to that port and publishes events
- Pull model: you use a special Spark sink in Flume that keeps collecting published data, and Spark pulls that data at a certain frequency
- First download spark-streaming-flume-sink_2.10-1.6.0.jar and copy it to the flume/lib directory
- Next create a Flume configuration like the sketch after these steps. As you can see, Flume is listening for netcat events on port 44444, and it takes every event and replicates it to both the logger and the Spark sink. The Spark sink listens on port 9999 for the Spark program to connect
- This is how your Spark driver will look; see the sketches after these steps. The Spark Flume listener gets events in Avro format, so you will have to call event.getBody().array() to get the event body
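A minimal sketch of the Flume configuration could look like this. One assumption on my part: replicating each event to two sinks requires one channel per sink, so I use two memory channels (all component names are placeholders):
# Netcat source; the replicating selector copies each event into both channels
agent1.sources = netcat1
agent1.channels = memory1 memory2
agent1.sinks = logger1 spark1
agent1.sources.netcat1.type = netcat
agent1.sources.netcat1.bind = localhost
agent1.sources.netcat1.port = 44444
agent1.sources.netcat1.selector.type = replicating
agent1.sources.netcat1.channels = memory1 memory2
# Logger sink for debugging
agent1.sinks.logger1.type = logger
agent1.sinks.logger1.channel = memory1
# Spark sink that buffers events until the Spark driver pulls them
agent1.sinks.spark1.type = org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.spark1.hostname = localhost
agent1.sinks.spark1.port = 9999
agent1.sinks.spark1.channel = memory2
# Two in-memory channels, one per sink
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100
agent1.channels.memory2.type = memory
agent1.channels.memory2.capacity = 1000
agent1.channels.memory2.transactionCapacity = 100
And a minimal sketch of the Spark driver in Scala, using FlumeUtils.createPollingStream from the spark-streaming-flume artifact (the object and app names are mine):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePullExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumePullExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    // Pull events from the Spark sink that Flume runs on port 9999
    val stream = FlumeUtils.createPollingStream(ssc, "localhost", 9999)
    // Each record is a SparkFlumeEvent wrapping an Avro event;
    // getBody returns a ByteBuffer, so call array() to get the raw bytes
    stream.map(e => new String(e.event.getBody.array())).print()
    ssc.start()
    ssc.awaitTermination()
  }
}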
Configuring Flume to use Twitter as Source
I wanted to figure out how to configure Twitter as a source for Flume, so I tried these steps
- First go to the Twitter Application Management page and configure an application. This should give you a consumerKey, consumerSecret, accessToken and accessTokenSecret
- Next create twitterflume.properties, which looks like this. You should create a source of type org.apache.flume.source.twitter.TwitterSource and use the 4 values you got in the last step to configure access to Twitter
# Describe/configure the source
agent1.sources = twitter1
agent1.sinks = logger1
agent1.channels = memory1
agent1.sources.twitter1.type = org.apache.flume.source.twitter.TwitterSource
agent1.sources.twitter1.consumerKey = <consumerKey>
agent1.sources.twitter1.consumerSecret = <consumerSecret>
agent1.sources.twitter1.accessToken = <accessToken>
agent1.sources.twitter1.accessTokenSecret = <accessTokenSecret>
agent1.sources.twitter1.keywords = bigdata, hadoop
agent1.sources.twitter1.maxBatchSize = 10
agent1.sources.twitter1.maxBatchDurationMillis = 200
# Describe the sink
agent1.sinks.logger1.type = logger
# Use a channel which buffers events in memory
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100
# Bind the source and sink to the channel
agent1.sources.twitter1.channels = memory1
agent1.sinks.logger1.channel = memory1
- The last step is to run the Flume agent; you should see Twitter messages being dumped to the console
bin/flume-ng agent --conf conf --conf-file conf/twitterflume.properties --name agent1 -Dflume.root.logger=DEBUG,console
One problem I ran into: when I tried the date command on my sandbox, I got a date that was 3 days in the past. So I restarted the VM, and after the restart the date command gave me the accurate time and the following error went away
[Twitter Stream consumer-1[Establishing connection]] ERROR org.apache.flume.source.twitter.TwitterSource (TwitterSource.java:331) - Exception while streaming tweets
stream.twitter.com
Relevant discussions can be found on the Internet at:
http://www.google.co.jp/search?q=d0031b0b or
http://www.google.co.jp/search?q=1db75522
TwitterException{exceptionCode=[d0031b0b-1db75522 db667dea-99334ae4], statusCode=-1, message=null, code=-1, retryAfter=-1, rateLimitStatus=null, version=3.0.3}
at twitter4j.internal.http.HttpClientImpl.request(HttpClientImpl.java:192)
at twitter4j.internal.http.HttpClientWrapper.request(HttpClientWrapper.java:61)
at twitter4j.internal.http.HttpClientWrapper.get(HttpClientWrapper.java:89)
at twitter4j.TwitterStreamImpl.getSampleStream(TwitterStreamImpl.java:176)
at twitter4j.TwitterStreamImpl$4.getStream(TwitterStreamImpl.java:164)
at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:462)
Caused by: java.net.UnknownHostException: stream.twitter.com
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:637)
at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
at sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:264)
at sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:367)
at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:191)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:933)
at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:177)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1301)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:468)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
at twitter4j.internal.http.HttpResponseImpl.<init>(HttpResponseImpl.java:34)
at twitter4j.internal.http.HttpClientImpl.request(HttpClientImpl.java:156)
Using Syslog as source in Flume
I wanted to figure out how to use Flume for receiving syslog messages, so I tried 2 different configurations: one using a syslog server on a TCP port and the other on a UDP port.
This is the Flume configuration for listening on a UDP port
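A minimal sketch of what flumesyslogudp.properties could look like, assuming the agent listens on port 5140 (the standard syslog port 514 requires root) and prints events through a logger sink:
agent1.sources = syslog1
agent1.sinks = logger1
agent1.channels = memory1
# Syslog UDP source; port 5140 avoids needing root for port 514
agent1.sources.syslog1.type = syslogudp
agent1.sources.syslog1.host = 0.0.0.0
agent1.sources.syslog1.port = 5140
# Logger sink prints received events on the console
agent1.sinks.logger1.type = logger
# Use a channel which buffers events in memory
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100
# Bind the source and sink to the channel
agent1.sources.syslog1.channels = memory1
agent1.sinks.logger1.channel = memory1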
Copy the flumesyslogudp.properties file into the conf directory of your Flume server and use the following command to start the Flume server
bin/flume-ng agent --conf conf --conf-file conf/flumesyslogudp.properties --name agent1 -Dflume.root.logger=DEBUG,console
Or you can configure Flume to listen on a TCP port. The only difference is that the source type is syslogtcp instead of syslogudp
bin/flume-ng agent --conf conf --conf-file conf/flumesyslogtcp.properties --name agent1 -Dflume.root.logger=DEBUG,console
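To test either configuration, you can send a sample message with the util-linux logger command (assuming the agent listens on port 5140; -d sends over UDP and -T over TCP, and older logger versions may not support these flags):
logger -n localhost -P 5140 -d "hello syslog"
The message should show up on the Flume console.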
Flume Hello World tutorial
I have been using Flume for some time now and really like it. This is a simple Hello World tutorial that I thought would be helpful if you want to get started with Flume. It walks you through setting up Flume so that it listens for messages on port 44444 and prints every message it gets to the console. Follow these steps
- First create a sampleflume.properties file on your machine like this. Your Flume configuration file must have at least 3 elements: a source, a channel and a sink
# example.conf: A single-node Flume configuration
# Name the components on this agent
agent1.sources = netcat1
agent1.sinks = logger1
agent1.channels = memory1
# Describe/configure the source
agent1.sources.netcat1.type = netcat
agent1.sources.netcat1.bind = localhost
agent1.sources.netcat1.port = 44444
# Describe the sink
agent1.sinks.logger1.type = logger
# Use a channel which buffers events in memory
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100
# Bind the source and sink to the channel
agent1.sources.netcat1.channels = memory1
agent1.sinks.logger1.channel = memory1
- netcat1: the netcat source defines how Flume listens for messages. In this case the netcat type means it will listen on a port that you can connect to using either netcat or telnet
- memory1: the memory channel defines how Flume stores messages it has received before they are consumed by the sink. In this case I am telling it to keep the messages in memory
- logger1: the logger sink is for testing; it just prints the messages on the console
- Once your configuration file is ready, you can start a Flume agent by executing the following command. You will see Flume printing messages on the console while it is starting
flume-ng agent --conf conf --conf-file sampleflume.properties --name agent1 -Dflume.root.logger=DEBUG,console
- Once the server is started, you can connect to it using nc or telnet and send messages to it, as in the sample session after these steps. Whatever messages you send will be printed to the console
- Once you send messages using the nc command, look at the server console and you should see the messages that you sent
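For example, a quick test session from a second terminal could look like this (the netcat source acknowledges each line with OK):
nc localhost 44444
Hello World
OK
On the Flume console, the logger sink prints each event with its body shown as hex bytes plus a readable preview.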
Configuring Flume to write avro events into HDFS
Recently I wanted to figure out how to configure Flume so that it can listen for Avro events and dump every event it gets into HDFS. In order to do that I built this simple Flume configuration
# example.conf: A single-node Flume configuration
# Name the components on this agent
agent1.sources = avro
agent1.sinks = hdfs1
agent1.channels = memory1
# Describe/configure the source
agent1.sources.avro.type = avro
agent1.sources.avro.bind = localhost
agent1.sources.avro.port = 41414
agent1.sources.avro.selector.type = replicating
agent1.sources.avro.channels = memory1
# Describe the sink
agent1.sinks.hdfs1.type = hdfs
agent1.sinks.hdfs1.hdfs.path = /tmp/flume/events
# Roll the current file every 60 seconds
agent1.sinks.hdfs1.hdfs.rollInterval = 60
# A rollSize of 0 disables rolling based on file size
agent1.sinks.hdfs1.hdfs.rollSize = 0
agent1.sinks.hdfs1.hdfs.batchSize = 100
# Note: the serializer property is not prefixed with hdfs.
agent1.sinks.hdfs1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
agent1.sinks.hdfs1.hdfs.fileType = DataStream
agent1.sinks.hdfs1.channel = memory1
# Use a channel which buffers events in memory
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100
In this configuration I have an Avro source listening on the local machine at port 41414; once it gets an event, it writes it to HDFS in the /tmp/flume/events directory.
Once this file is saved on the local machine as hellohdfsavro.conf, I can start the Flume agent using the following command
flume-ng agent --conf conf --conf-file hellohdfsavro.conf --name agent1 -Dflume.root.logger=DEBUG,console
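To send a test event to the Avro source, you can use the avro-client mode that ships with flume-ng (the input file /tmp/sample.log is a placeholder; note that the AvroEventSerializer expects events that already carry an Avro schema header, so for plain text input the built-in avro_event serializer may be a better fit):
bin/flume-ng avro-client --conf conf -H localhost -p 41414 -F /tmp/sample.log
You can then verify the output files with hdfs dfs -ls /tmp/flume/events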