
How to use HBase sink with Flume

I wanted to figure out how to use HBase as a target for Flume, so I created this sample configuration, which reads events from netcat and writes them to HBase.
  1. The first step is to create a test table in HBase with CF1 as the column family. Every time Flume gets an event, it will write it to the CF1 column family of the test table in HBase
    
    create 'test','CF1'
    
  2. Create a Flume configuration file like the one below. I am using the HBase sink with SimpleHbaseEventSerializer as the event serializer. Note that I am assuming this is an unsecured cluster (Sandbox); if you have a secured cluster, you should follow the steps mentioned in Configure a Secure HBase Sink
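    A minimal sketch of what netcat-hbase.properties might contain is shown below; the agent and component names are my assumptions, but the port, table, and column family match the rest of this post:
    
    # netcat-hbase.properties: netcat source feeding an HBase sink
    agent1.sources = netcat1
    agent1.sinks = hbase1
    agent1.channels = memory1
    
    # Netcat source listening on port 44444
    agent1.sources.netcat1.type = netcat
    agent1.sources.netcat1.bind = localhost
    agent1.sources.netcat1.port = 44444
    
    # HBase sink writing every event to the CF1 column family of the test table
    agent1.sinks.hbase1.type = hbase
    agent1.sinks.hbase1.table = test
    agent1.sinks.hbase1.columnFamily = CF1
    agent1.sinks.hbase1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
    
    # Memory channel buffering events between the source and the sink
    agent1.channels.memory1.type = memory
    agent1.channels.memory1.capacity = 1000
    agent1.channels.memory1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    agent1.sources.netcat1.channels = memory1
    agent1.sinks.hbase1.channel = memory1
    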
  3. Start the Flume server with the following command
    
    bin/flume-ng agent --conf conf --conf-file conf/netcat-hbase.properties --name agent1 -Dflume.root.logger=DEBUG,console
    
  4. Now open the netcat client on port 44444 and send some messages to Flume
  5. If you query the HBase test table, you should see the messages that were published to netcat
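For example, assuming nc and the HBase shell are available on the same host, the test might look like this (the message text is arbitrary):

echo "hello hbase" | nc localhost 44444

and then, in the HBase shell:

scan 'test'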

Flume to Spark Streaming - Pull model

In this post I will demonstrate how to stream data from Flume into Spark using Spark Streaming. When it comes to streaming data from Flume to Spark, you have two options.
  1. Push model: Spark listens on a particular port for Avro events, and Flume connects to that port and publishes events
  2. Pull model: you use a special Spark sink in Flume that keeps collecting published events, and Spark pulls them at a certain frequency
I built this simple configuration in which I could send events to Flume over netcat; Flume would take those events and send them to Spark as well as print them to the console.
  • First download spark-streaming-flume-sink_2.10-1.6.0.jar and copy it to the flume/lib directory
  • Next create a Flume configuration like the one sketched after this list. As you can see, Flume is listening for netcat events on port 44444, and it is replicating every event to both the logger and the Spark sink. The Spark sink listens on port 9999 for the Spark program to connect
  • The second sketch after this list shows what your Spark driver might look like. The Spark Flume listener gets events in Avro format, so you will have to call event.getBody().array() to get the event body
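A minimal sketch of such a configuration; the ports match this post, while the agent and component names are my assumptions. Note that replicating to two sinks requires two channels:

# netcat source replicated to a logger sink and a Spark sink
agent1.sources = netcat1
agent1.sinks = logger1 spark1
agent1.channels = memory1 memory2

# Netcat source on port 44444, replicating every event to both channels
agent1.sources.netcat1.type = netcat
agent1.sources.netcat1.bind = localhost
agent1.sources.netcat1.port = 44444
agent1.sources.netcat1.selector.type = replicating

# Logger sink prints events to the console
agent1.sinks.logger1.type = logger

# Spark sink buffers events until the Spark program pulls them on port 9999
agent1.sinks.spark1.type = org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.spark1.hostname = localhost
agent1.sinks.spark1.port = 9999

agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100
agent1.channels.memory2.type = memory
agent1.channels.memory2.capacity = 1000
agent1.channels.memory2.transactionCapacity = 100

# Bind the source to both channels, one per sink
agent1.sources.netcat1.channels = memory1 memory2
agent1.sinks.logger1.channel = memory1
agent1.sinks.spark1.channel = memory2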
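And a sketch of the Spark driver, assuming Spark 1.6 with the spark-streaming-flume dependency on the classpath; the object name and batch interval are arbitrary choices:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePullExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumePullExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Pull events from the Spark sink that Flume exposes on port 9999
    val flumeStream = FlumeUtils.createPollingStream(ssc, "localhost", 9999)

    // Each record wraps an Avro event; getBody() returns a ByteBuffer,
    // so convert it to a String before printing
    flumeStream.map(e => new String(e.event.getBody.array())).print()

    ssc.start()
    ssc.awaitTermination()
  }
}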
Once your Spark program and Flume agent are started, open netcat on port 44444 and send messages; those messages should appear in your Spark console.

Configuring Flume to use Twitter as Source

I wanted to figure out how to configure Twitter as a source for Flume, so I tried these steps
  1. First go to the Twitter Application Management page and configure an application. This should give you a consumerKey, consumerSecret, accessToken and accessTokenSecret
  2. Next create twitterflume.properties, which looks like this. You should create a source of type org.apache.flume.source.twitter.TwitterSource and use the four values you got in the last step to configure access to Twitter
    
    agent1.sources = twitter1
    agent1.sinks = logger1
    agent1.channels = memory1
    
    
    agent1.sources.twitter1.type = org.apache.flume.source.twitter.TwitterSource
    agent1.sources.twitter1.consumerKey = <consumerKey>
    agent1.sources.twitter1.consumerSecret = <consumerSecret>
    agent1.sources.twitter1.accessToken = <accessToken>
    agent1.sources.twitter1.accessTokenSecret = <accessTokenSecret>
    agent1.sources.twitter1.keywords = bigdata, hadoop
    agent1.sources.twitter1.maxBatchSize = 10
    agent1.sources.twitter1.maxBatchDurationMillis = 200
    
    
    # Describe the sink
    agent1.sinks.logger1.type = logger
    
    # Use a channel which buffers events in memory
    agent1.channels.memory1.type = memory
    agent1.channels.memory1.capacity = 1000
    agent1.channels.memory1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    agent1.sources.twitter1.channels = memory1
    agent1.sinks.logger1.channel = memory1
    
  3. The last step is to run the Flume agent; you should see Twitter messages being dumped to the console
    
    bin/flume-ng agent --conf conf --conf-file conf/twitterflume.properties --name agent1 -Dflume.root.logger=DEBUG,console
    
Note: When I tried this in the Hadoop Sandbox I started getting the following authentication error. It seems the problem occurs when your VM time is in the past; when I executed the date command on my sandbox, it gave me a date that was 3 days in the past. After I restarted the VM, the date command gave the accurate time and the following error went away.

[Twitter Stream consumer-1[Establishing connection]] ERROR org.apache.flume.source.twitter.TwitterSource (TwitterSource.java:331) - Exception while streaming tweets
stream.twitter.com
Relevant discussions can be found on the Internet at:
    http://www.google.co.jp/search?q=d0031b0b or
    http://www.google.co.jp/search?q=1db75522
TwitterException{exceptionCode=[d0031b0b-1db75522 db667dea-99334ae4], statusCode=-1, message=null, code=-1, retryAfter=-1, rateLimitStatus=null, version=3.0.3}
    at twitter4j.internal.http.HttpClientImpl.request(HttpClientImpl.java:192)
    at twitter4j.internal.http.HttpClientWrapper.request(HttpClientWrapper.java:61)
    at twitter4j.internal.http.HttpClientWrapper.get(HttpClientWrapper.java:89)
    at twitter4j.TwitterStreamImpl.getSampleStream(TwitterStreamImpl.java:176)
    at twitter4j.TwitterStreamImpl$4.getStream(TwitterStreamImpl.java:164)
    at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:462)
Caused by: java.net.UnknownHostException: stream.twitter.com
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:637)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
    at sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:264)
    at sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:367)
    at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:191)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:933)
    at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:177)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1301)
    at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:468)
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
    at twitter4j.internal.http.HttpResponseImpl.<init>(HttpResponseImpl.java:34)
    at twitter4j.internal.http.HttpClientImpl.request(HttpClientImpl.java:156)

Using Syslog as source in Flume

I wanted to figure out how to use Flume for receiving syslog messages, so I tried two different configurations: one using a syslog server on a TCP port and the other on a UDP port. This is the Flume configuration for listening on a UDP port.
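The original file is no longer embedded here, so below is a minimal sketch; the agent and component names follow the rest of this post, and port 5140 is my assumption:

# flumesyslogudp.properties: syslog UDP source feeding a logger sink
agent1.sources = syslog1
agent1.sinks = logger1
agent1.channels = memory1

# Listen for syslog messages as UDP datagrams (port 5140 is an assumption)
agent1.sources.syslog1.type = syslogudp
agent1.sources.syslog1.host = 0.0.0.0
agent1.sources.syslog1.port = 5140

# Print every received event to the console
agent1.sinks.logger1.type = logger

# Buffer events in memory between the source and the sink
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.syslog1.channels = memory1
agent1.sinks.logger1.channel = memory1

Copy the flumesyslogudp.properties file into the conf directory of your Flume server and use the following command to start the Flume agent: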

bin/flume-ng agent --conf conf --conf-file conf/flumesyslogudp.properties --name agent1 
-Dflume.root.logger=DEBUG,console
Or you can configure Flume to listen on a TCP port; the only difference is that the source type is syslogtcp instead of syslogudp.

bin/flume-ng agent --conf conf --conf-file conf/flumesyslogtcp.properties --name agent1 
-Dflume.root.logger=DEBUG,console
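To send test messages you can use the logger command (this assumes the util-linux version of logger and the port 5140 from the sketch above):

# send a test message over UDP
logger -n localhost -P 5140 -d "hello syslog over udp"

# send a test message over TCP
logger -n localhost -P 5140 -T "hello syslog over tcp"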

Flume Hello World tutorial

I have been using Flume for some time now and really like it. This is a simple Hello World tutorial that I thought would be helpful if you want to get started with Flume. It will walk you through setting up a Flume agent that listens for messages on port 44444 and prints every message it gets to the console. Follow these steps
  1. First create a sampleflume.properties file on your machine like this
    
    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    agent1.sources = netcat1
    agent1.sinks = logger1
    agent1.channels = memory1
    
    # Describe/configure the source
    agent1.sources.netcat1.type = netcat
    agent1.sources.netcat1.bind = localhost
    agent1.sources.netcat1.port = 44444
    
    # Describe the sink
    agent1.sinks.logger1.type = logger
    
    
    # Use a channel which buffers events in memory
    agent1.channels.memory1.type = memory
    agent1.channels.memory1.capacity = 1000
    agent1.channels.memory1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    agent1.sources.netcat1.channels = memory1
    agent1.sinks.logger1.channel = memory1
    
    Your Flume configuration file must have at least three elements: a source, a channel, and a sink
    • netcat1: The source defines how Flume listens for messages. In this case the netcat type means it will listen on a port that you can connect to using either netcat or telnet
    • memory1: The channel defines how Flume stores messages it has received before they are consumed by the sink. In this case I am keeping the messages in memory
    • logger1: The logger sink is for testing; it just prints the messages to the console
  2. Once your configuration file is ready you can start a Flume agent by executing the following command
    
    flume-ng agent --conf conf --conf-file sampleflume.properties  --name agent1 -Dflume.root.logger=DEBUG,console
    
    You will see Flume printing messages on the console while it is starting
  3. Once the agent is started you can connect to it using nc or telnet and send messages to it, as shown below. Whatever messages you send will be printed to the console
  4. Once you send messages using the nc command, look at the agent console and you should see the messages that you sent
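A sample session might look like this (the netcat source acknowledges every event with OK by default):

$ nc localhost 44444
Hello World
OK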

Configuring Flume to write avro events into HDFS

Recently I wanted to figure out how to configure Flume so that it listens for Avro events and dumps every event it gets into HDFS. In order to do that I built this simple Flume configuration

# example.conf: A single-node Flume configuration

# Name the components on this agent
agent1.sources = avro
agent1.sinks = hdfs1
agent1.channels = memory1

# Describe/configure the source
agent1.sources.avro.type = avro
agent1.sources.avro.bind = localhost
agent1.sources.avro.port = 41414
agent1.sources.avro.selector.type = replicating
agent1.sources.avro.channels = memory1

# Describe the sink
agent1.sinks.hdfs1.type = hdfs
agent1.sinks.hdfs1.hdfs.path = /tmp/flume/events
# Roll the file every 60 seconds; rollSize = 0 disables size-based rolling
agent1.sinks.hdfs1.hdfs.rollInterval = 60
agent1.sinks.hdfs1.hdfs.rollSize = 0
agent1.sinks.hdfs1.hdfs.batchSize = 100
agent1.sinks.hdfs1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
agent1.sinks.hdfs1.hdfs.fileType = DataStream
agent1.sinks.hdfs1.channel = memory1

# Use a channel which buffers events in memory
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100
Here I have an Avro source listening on the local machine at port 41414; once it gets an event, it writes it into HDFS under the /tmp/flume/events directory. Once this file is saved on the local machine as hellohdfsavro.conf, I can start the Flume agent using the following command

flume-ng agent --conf conf --conf-file hellohdfsavro.conf  --name agent1 -Dflume.root.logger=DEBUG,console
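To test it, you can use the avro-client that ships with Flume to send a file as Avro events; the file path here is just an example. Note that the AvroEventSerializer expects the Avro schema to be supplied in the event headers (flume.avro.schema.literal or flume.avro.schema.url), so the producer has to set one of those, for example via the avro-client -R/--headerFile option:

bin/flume-ng avro-client -H localhost -p 41414 -F /tmp/sample.txt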