Configuring Flume to write Avro events into HDFS

Recently I wanted to figure out how to configure Flume so that it listens for Avro events and, whenever it receives one, dumps it into HDFS. To do that I built this simple Flume configuration:

# example.conf: A single-node Flume configuration

# Name the components on this agent
agent1.sources = avro
agent1.sinks = hdfs1
agent1.channels = memory1

# Describe/configure the source
agent1.sources.avro.type = avro
agent1.sources.avro.bind = localhost
agent1.sources.avro.port = 41414
agent1.sources.avro.selector.type = replicating
agent1.sources.avro.channels = memory1

# Describe the sink
agent1.sinks.hdfs1.type = hdfs
agent1.sinks.hdfs1.hdfs.path=/tmp/flume/events
agent1.sinks.hdfs1.hdfs.rollInterval=60
# Do not roll files based on size (0 disables size-based rolling)
agent1.sinks.hdfs1.hdfs.rollSize=0
agent1.sinks.hdfs1.hdfs.batchSize=100
agent1.sinks.hdfs1.serializer=org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
agent1.sinks.hdfs1.hdfs.fileType = DataStream
agent1.sinks.hdfs1.channel = memory1

# Use a channel which buffers events in memory
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100
In this configuration the Avro source listens on the local machine at port 41414; whenever it receives an event, it writes it into HDFS under the /tmp/flume/events directory. Once this file is saved on the local machine as hellohdfsavro.conf, I can start the Flume agent using the following command

flume-ng agent --conf conf --conf-file hellohdfsavro.conf  --name agent1 -Dflume.root.logger=DEBUG,console
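To verify the setup end to end, you can push a test event to the Avro source. The sketch below uses Flume's Java RpcClient API (it assumes the flume-ng-sdk jar is on the classpath); the host and port match the source configuration above, and the class name and event body are just an illustrative test harness.

    import java.nio.charset.StandardCharsets;

    import org.apache.flume.Event;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;

    public class AvroTestClient {
        public static void main(String[] args) throws Exception {
            // Connect to the Avro source configured above (localhost:41414)
            RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 41414);
            try {
                // Build a simple event and send it; it should end up under /tmp/flume/events
                Event event = EventBuilder.withBody("hello hdfs".getBytes(StandardCharsets.UTF_8));
                client.append(event);
            } finally {
                client.close();
            }
        }
    }

Alternatively, the flume-ng avro-client command can be used to send the contents of a file to the same host and port.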

Configure Flume to use IBM MQ as JMS Source

Recently I had a requirement to read XML documents stored as messages in IBM MQ and post them into Hadoop. I decided to use Apache Flume with the Flume JMS source and the Flume HDFS sink. The setup took the following steps. Please note that I am not a WebSphere MQ expert, so there might be a better/easier way to achieve this.
  1. First I had to install the WebSphere MQ Client on my Windows machine
  2. Next I created a simple jms.config file like this in the c:\temp folder of my Windows box
    
    INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactory
    PROVIDER_URL=file:/C:/temp/jmsbinding
    
  3. The next step is to run JMSAdmin.bat c:\temp\jms.config, which opens up a console. Type the following command in it, changing the values to match your own configuration
    
    DEF CF(myConnectionFactory) QMGR(myQueueManager) HOSTNAME(myHostName) PORT(1426) CHANNEL(myChannelName) TRANSPORT(CLIENT)
    
    Once you execute this command it will generate a .bindings file in C:/temp/jmsbinding (the folder configured as the value of PROVIDER_URL)
  4. The next step for me was to copy the C:/temp/jmsbinding/.bindings file to the /etc/flume/conf folder on my Linux box, which has Flume running on it.
  5. In addition to the bindings file I also needed the MQ client jar files. I started by copying jms.jar from C:\Program Files (x86)\IBM\WebSphere MQ\java\lib to the /usr/hdp/current/flume-server/lib/ folder in my Hadoop installation, but I kept getting ClassNotFoundExceptions, and to deal with that I ended up copying the following jars from my MQ Client into Flume
    
    jms.jar
    fscontext.jar
    jndi.jar
    providerutil.jar
    com.ibm.mq.jar
    com.ibm.mqjms.jar
    com.ibm.mq.pcf.jar
    connector.jar
    dhbcore.jar
    com.ibm.mq.jmqi.jar
    com.ibm.mq.headers.jar
    
  6. Once the Flume MQ setup was in place, the last step was to create a Flume configuration that points to your bindings file and your MQ server, like this
    
    # Flume agent config
    # List the sources, channels, and sinks for the agent
    ggflume.sources = jms
    ggflume.channels = memory
    ggflume.sinks = hadoop
    
    ggflume.sources.jms.channels=memory
    ggflume.sinks.hadoop.channel=memory
    
    ggflume.sources.jms.type = jms
    ggflume.sources.jms.providerURL = file:///etc/flume/conf
    ggflume.sources.jms.initialContextFactory = com.sun.jndi.fscontext.RefFSContextFactory
    ggflume.sources.jms.destinationType=QUEUE
    ggflume.sources.jms.destinationName=<queueName>
    ggflume.sources.jms.connectionFactory=myConnectionFactory
    ggflume.sources.jms.batchSize=1
    
    ggflume.channels.memory.type = memory
    ggflume.channels.memory.capacity = 1000
    ggflume.channels.memory.transactionCapacity = 100
    
    ggflume.sinks.hadoop.type=hdfs
    ggflume.sinks.hadoop.hdfs.path=/data/mq/xml
    ggflume.sinks.hadoop.hdfs.filePrefix=sample
    
    
  7. Now start the Flume agent by executing the following command: flume-ng agent --conf conf --conf-file mqflume.conf --name ggflume -Dflume.root.logger=DEBUG,console
Now you should see the existing messages from MQ being dumped into HDFS. If the queue is empty, you can put a test message on it with a small JMS client like the sketch below.
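For testing, a minimal standalone JMS producer can put an XML document onto the queue through the same .bindings file that the Flume JMS source uses. This is only a sketch: the JNDI name myConnectionFactory comes from the JMSAdmin command above, while the queue name and the XML payload are placeholders you would replace with your own, and the program needs the same MQ client jars on its classpath.

    import java.util.Hashtable;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.naming.Context;
    import javax.naming.InitialContext;

    public class MqTestProducer {
        public static void main(String[] args) throws Exception {
            // Use the same file-system JNDI context that the Flume JMS source uses
            // (the folder containing the .bindings file)
            Hashtable<String, String> env = new Hashtable<>();
            env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.fscontext.RefFSContextFactory");
            env.put(Context.PROVIDER_URL, "file:///etc/flume/conf");
            Context ctx = new InitialContext(env);

            // myConnectionFactory was registered with JMSAdmin; the queue name below
            // is a placeholder and must match the queue Flume reads from
            ConnectionFactory factory = (ConnectionFactory) ctx.lookup("myConnectionFactory");
            Connection connection = factory.createConnection();
            try {
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Queue queue = session.createQueue("MY.TEST.QUEUE");
                MessageProducer producer = session.createProducer(queue);

                // Send a small XML document; Flume should pick it up and write it to /data/mq/xml
                TextMessage message = session.createTextMessage("<order><id>1</id></order>");
                producer.send(message);
            } finally {
                connection.close();
            }
        }
    }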