Configure Flume to use IBM MQ as JMS Source

Recently i had a requirement in which i wanted to figure out how to read XML documents stored as message in IBM MQ and post them into Hadoop. I decided to use Apache Flume + Flume JMS Source + Flume HDFS Sink for this. I had to use following steps for this setup. Please note that i am not WebSphere MQ expert so there might be a better/easier way to achieve this.
  1. First i had to install WebSphere MQ Client on my windows machine
  2. Next i did create a simple jms.config like this in c:\temp folder of my windows box
    
    INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactory
    PROVIDER_URL=file:/C:/temp/jmsbinding
    
  3. Next step is to run JMSAdmin.bat c:\temp\jms.config, it opens up a console like this, type following command in it and change it to use the right configuration that you need
    
    DEF CF(myConnectionFactory) QMGR(myQueueManager) HOSTNAME(myHostName) PORT(1426) CHANNEL(myChannelName) TRANSPORT(CLIENT)
    
    Once you execute this command it will generate .bindings file in C:/temp/jmsbinding (Folder that is configured as value of PROVIDER_URL)
  4. Next step for me was to copy the C:/temp/jmsbinding/.bindings folder to /etc/flume/conf folder in my linux box which has Flume running on it.
  5. In addition to bindings file i also need the MQ client jar files. I started by copying jms.jar from C:\Program Files (x86)\IBM\WebSphere MQ\java\lib to /usr/hdp/current/flume-server/lib/ folder in my Hadoop installation, but i kept getting ClassNotFoundException and to deal with that i copied more and more jars from my MQ Client into Flume
    
    jms.jar
    fscontext.jar
    jndi.jar
    providerutil.jar
    com.ibm.mq.jar
    com.ibm.mqjms.jar
    com.ibm.mq.pcf.jar
    connector.jar
    dhbcore.jar
    com.ibm.mq.jmqi.jar
    com.ibm.mq.headers.jar
    
  6. Once the Flume MQ setup was in place, last step was to create Flume Configuration that points to your bindings file and also points to your MQ server like this
    
    # Flume agent config
    #st the sources, channels, and sinks for the agent
    ggflume.sources = jms
    ggflume.channels = memory
    ggflume.sinks = hadoop
    
    ggflume.sources.jms.channels=memory
    ggflume.sinks.hadoop.channel=memory
    
    ggflume.sources.jms.type = jms
    ggflume.sources.jms.providerURL = file:///etc/flume/conf
    ggflume.sources.jms.initialContextFactory = com.sun.jndi.fscontext.RefFSContextFactory
    ggflume.sources.jms.destinationType=QUEUE
    ggflume.sources.jms.destinationName=<channelName>
    ggflume.sources.jms.connectionFactory=myConnectionFactory
    ggflume.sources.jms.batchSize=1
    
    ggflume.channels.memory.type = memory
    ggflume.channels.memory.capacity = 1000
    ggflume.channels.memory.transactionCapacity = 100
    
    ggflume.sinks.hadoop.type=hdfs
    ggflume.sinks.hadoop.hdfs.path=/data/mq/xml
    ggflume.sinks.hadoop.hdfs.filePrefix=sample
    
    
  7. Now start flume server by executing following flume command flume-ng agent --conf conf --conf-file mqflume.conf --name ggflume -Dflume.root.logger=DEBUG,console
Now you should see the existing messages from MQ being dumped into HDFS

3 comments:

Anonymous said...

in this instance - is the flume agent pulling data from a local jms queue?

Srinivas said...

Hi Sunil,

I have a question on connectionFactory configuration for jms source , you given it as myConnectionFactory can you please elaborate it once

saket said...

Hi Sunil,

We tried following the steps mentioned by you in unix env. But getting error while running the command using JSADMIN tool.

While troubleshooting we tried the below scenarios.

1.Running the all the commands in one go then it is throwing unable to bind error.
2.When trying to run the one command like Transport(CLIENT) throwing error as unknown command.

pls help.