Configure Flume to use IBM MQ as a JMS Source

Recently I had a requirement to read XML documents stored as messages in IBM MQ and post them into Hadoop. I decided to use Apache Flume with the Flume JMS Source and the Flume HDFS Sink. The following steps describe my setup. Please note that I am not a WebSphere MQ expert, so there might be a better/easier way to achieve this.
  1. First I installed the WebSphere MQ Client on my Windows machine.
  2. Next I created a simple jms.config file like this in the c:\temp folder of my Windows box:
    
    INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactory
    PROVIDER_URL=file:/C:/temp/jmsbinding
    
  3. The next step is to run JMSAdmin.bat c:\temp\jms.config. It opens up a console; type the following command into it, changing the values to match your configuration:
    
    DEF CF(myConnectionFactory) QMGR(myQueueManager) HOSTNAME(myHostName) PORT(1426) CHANNEL(myChannelName) TRANSPORT(CLIENT)
    
    Once you execute this command it generates a .bindings file in C:/temp/jmsbinding (the folder configured as the value of PROVIDER_URL).
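    If the queue you want to read is not already bound in JNDI, you may also need to define it in the same JMSAdmin console before the .bindings file is complete. This is an assumption about your setup, and the names below are placeholders for illustration, not values from my configuration:

    DEF Q(myQueue) QUEUE(MY.MQ.QUEUE.NAME) QMGR(myQueueManager)
    END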
  4. The next step for me was to copy the C:/temp/jmsbinding/.bindings file to the /etc/flume/conf folder on my Linux box, which has Flume running on it.
  5. In addition to the bindings file, I also needed the MQ client jar files. I started by copying jms.jar from C:\Program Files (x86)\IBM\WebSphere MQ\java\lib to the /usr/hdp/current/flume-server/lib/ folder of my Hadoop installation, but I kept getting ClassNotFoundExceptions, and to resolve them I ended up copying the following jars from my MQ Client into Flume:
    
    jms.jar
    fscontext.jar
    jndi.jar
    providerutil.jar
    com.ibm.mq.jar
    com.ibm.mqjms.jar
    com.ibm.mq.pcf.jar
    connector.jar
    dhbcore.jar
    com.ibm.mq.jmqi.jar
    com.ibm.mq.headers.jar
    
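    Exactly which jars are needed can vary by MQ Client version, so rather than chasing ClassNotFoundExceptions one at a time, a small check script can report everything that is still missing before you start the agent. This is a sketch; the Flume lib path is from my HDP installation and may differ on yours:

    ```shell
    #!/bin/sh
    # Report which of the WebSphere MQ client jars are missing from a lib dir.
    check_mq_jars() {
      libdir="$1"
      missing=0
      for jar in jms.jar fscontext.jar jndi.jar providerutil.jar \
                 com.ibm.mq.jar com.ibm.mqjms.jar com.ibm.mq.pcf.jar \
                 connector.jar dhbcore.jar com.ibm.mq.jmqi.jar \
                 com.ibm.mq.headers.jar; do
        if [ ! -f "$libdir/$jar" ]; then
          echo "missing: $jar"
          missing=1
        fi
      done
      return $missing
    }

    # Path from my HDP install -- adjust to your Flume installation.
    check_mq_jars "/usr/hdp/current/flume-server/lib" || \
      echo "copy the missing jars from your MQ Client's java/lib folder"
    ```

    Running this before restarting Flume saves a restart cycle per missing jar.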
  6. Once the Flume MQ setup was in place, the last step was to create a Flume configuration that points to your bindings file and your MQ server, like this:
    
    # Flume agent config
    # list the sources, channels, and sinks for the agent
    ggflume.sources = jms
    ggflume.channels = memory
    ggflume.sinks = hadoop
    
    ggflume.sources.jms.channels=memory
    ggflume.sinks.hadoop.channel=memory
    
    ggflume.sources.jms.type = jms
    ggflume.sources.jms.providerURL = file:///etc/flume/conf
    ggflume.sources.jms.initialContextFactory = com.sun.jndi.fscontext.RefFSContextFactory
    ggflume.sources.jms.destinationType=QUEUE
    ggflume.sources.jms.destinationName=<queueName>
    ggflume.sources.jms.connectionFactory=myConnectionFactory
    ggflume.sources.jms.batchSize=1
    
    ggflume.channels.memory.type = memory
    ggflume.channels.memory.capacity = 1000
    ggflume.channels.memory.transactionCapacity = 100
    
    ggflume.sinks.hadoop.type=hdfs
    ggflume.sinks.hadoop.hdfs.path=/data/mq/xml
    ggflume.sinks.hadoop.hdfs.filePrefix=sample
    
    
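    One thing to be aware of: with only hdfs.path and hdfs.filePrefix set, the HDFS sink writes SequenceFiles and rolls files on its defaults. If you want the XML payloads stored as plain text and rolled on a time interval instead, you can additionally set something like the following; these are standard Flume HDFS sink properties, not values from my original setup:

    ggflume.sinks.hadoop.hdfs.fileType = DataStream
    ggflume.sinks.hadoop.hdfs.writeFormat = Text
    ggflume.sinks.hadoop.hdfs.rollInterval = 300
    ggflume.sinks.hadoop.hdfs.rollCount = 0
    ggflume.sinks.hadoop.hdfs.rollSize = 0

    Setting rollCount and rollSize to 0 disables event-count and size based rolling, so files roll purely on the 300-second interval.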
  7. Now start the Flume agent by executing the following command:
    
    flume-ng agent --conf conf --conf-file mqflume.conf --name ggflume -Dflume.root.logger=DEBUG,console
    
Now you should see the existing messages from MQ being dumped into HDFS.
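To confirm the flow end to end, you can list and inspect what the sink wrote. The path and prefix below assume the hdfs.path and hdfs.filePrefix values from the configuration above; note that files Flume is still writing carry a .tmp suffix until they roll:

    hdfs dfs -ls /data/mq/xml
    hdfs dfs -cat /data/mq/xml/sample.*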

Comments:

  1. In this instance, is the Flume agent pulling data from a local JMS queue?

  2. Hi Sunil,

    I have a question on the connectionFactory configuration for the JMS source. You gave it as myConnectionFactory; can you please elaborate on it?

  3. Hi Sunil,

    We tried following the steps you mentioned in a Unix environment, but we get an error while running the command using the JMSAdmin tool.

    While troubleshooting we tried the scenarios below:

    1. Running all the commands in one go throws an "unable to bind" error.
    2. Trying to run a single clause such as TRANSPORT(CLIENT) throws an "unknown command" error.

    Please help.

  4. I want to transfer a file using JMS MQ and get the same file in the Flume HDFS sink; please help with the configuration.