Configure Flume to use IBM MQ as JMS Source

Recently I needed to figure out how to read XML documents stored as messages in IBM MQ and post them into Hadoop. I decided to use Apache Flume with the Flume JMS Source and the Flume HDFS Sink for this. These are the steps I used for the setup. Please note that I am not a WebSphere MQ expert, so there might be a better or easier way to achieve this.
  1. First I had to install the WebSphere MQ Client on my Windows machine.
  2. Next I created a simple jms.config like this in the c:\temp folder of my Windows box
    
    INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactory
    PROVIDER_URL=file:/C:/temp/jmsbinding
    
  3. The next step is to run JMSAdmin.bat c:\temp\jms.config. It opens a console. Type the following command in it, changing the values to match your own configuration
    
    DEF CF(myConnectionFactory) QMGR(myQueueManager) HOSTNAME(myHostName) PORT(1426) CHANNEL(myChannelName) TRANSPORT(CLIENT)
    
    Once you execute this command, it generates a .bindings file in C:/temp/jmsbinding (the folder configured as the value of PROVIDER_URL).
  4. The next step for me was to copy the C:/temp/jmsbinding/.bindings file to the /etc/flume/conf folder on my Linux box, which has Flume running on it.
  5. In addition to the bindings file I also needed the MQ client jar files. I started by copying jms.jar from C:\Program Files (x86)\IBM\WebSphere MQ\java\lib to the /usr/hdp/current/flume-server/lib/ folder in my Hadoop installation, but I kept getting ClassNotFoundException. To deal with that I ended up copying the following jars from my MQ Client into Flume
    
    jms.jar
    fscontext.jar
    jndi.jar
    providerutil.jar
    com.ibm.mq.jar
    com.ibm.mqjms.jar
    com.ibm.mq.pcf.jar
    connector.jar
    dhbcore.jar
    com.ibm.mq.jmqi.jar
    com.ibm.mq.headers.jar
    
  6. Once the Flume MQ setup was in place, the last step was to create a Flume configuration that points to the folder holding your bindings file and, through the connection factory defined in it, to your MQ server, like this
    
    # Flume agent config
    # Set the sources, channels, and sinks for the agent
    ggflume.sources = jms
    ggflume.channels = memory
    ggflume.sinks = hadoop
    
    ggflume.sources.jms.channels=memory
    ggflume.sinks.hadoop.channel=memory
    
    ggflume.sources.jms.type = jms
    ggflume.sources.jms.providerURL = file:///etc/flume/conf
    ggflume.sources.jms.initialContextFactory = com.sun.jndi.fscontext.RefFSContextFactory
    ggflume.sources.jms.destinationType=QUEUE
    # destinationName is the MQ queue to read from (not the MQ channel)
    ggflume.sources.jms.destinationName=<queueName>
    ggflume.sources.jms.connectionFactory=myConnectionFactory
    ggflume.sources.jms.batchSize=1
    
    ggflume.channels.memory.type = memory
    ggflume.channels.memory.capacity = 1000
    ggflume.channels.memory.transactionCapacity = 100
    
    ggflume.sinks.hadoop.type=hdfs
    ggflume.sinks.hadoop.hdfs.path=/data/mq/xml
    ggflume.sinks.hadoop.hdfs.filePrefix=sample
    
    
  7. Now start the Flume agent by executing the following command
    
    flume-ng agent --conf conf --conf-file mqflume.conf --name ggflume -Dflume.root.logger=DEBUG,console
    
Now you should see the existing messages from MQ being written into HDFS.
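
If the agent fails at startup, it can save some debugging to confirm that steps 4 and 5 actually landed on the Flume box. The following is a minimal sketch, not part of Flume or MQ. `preflight_check` is a hypothetical helper name, and the check assumes the .bindings file stores each JNDI entry as keys that start with the object name followed by a slash (for example myConnectionFactory/ClassName), so treat a "not found" result as a hint rather than proof.

```shell
#!/usr/bin/env bash
# Pre-flight sketch for the setup described in this post.
# preflight_check is a hypothetical helper, not part of Flume or MQ.

# Jar list copied from step 5.
MQ_JARS="jms.jar fscontext.jar jndi.jar providerutil.jar com.ibm.mq.jar
com.ibm.mqjms.jar com.ibm.mq.pcf.jar connector.jar dhbcore.jar
com.ibm.mq.jmqi.jar com.ibm.mq.headers.jar"

# Usage: preflight_check <conf_dir> <flume_lib_dir> <connection_factory_name>
# Prints one line per problem and returns non-zero if any problem was found.
preflight_check() {
  local conf_dir="$1" lib_dir="$2" cf_name="$3" problems=0 jar

  # Step 4: the .bindings file must exist in the providerURL folder.
  if [ ! -f "$conf_dir/.bindings" ]; then
    echo "missing bindings file: $conf_dir/.bindings"
    problems=$((problems + 1))
  # Assumption: JNDI entries appear as keys of the form "<name>/...".
  elif ! grep -q "^${cf_name}/" "$conf_dir/.bindings"; then
    echo "connection factory not found in bindings: $cf_name"
    problems=$((problems + 1))
  fi

  # Step 5: every MQ client jar must be present in Flume's lib folder.
  for jar in $MQ_JARS; do
    if [ ! -f "$lib_dir/$jar" ]; then
      echo "missing jar: $jar"
      problems=$((problems + 1))
    fi
  done

  [ "$problems" -eq 0 ]
}
```

With the paths used in this post, the call would be `preflight_check /etc/flume/conf /usr/hdp/current/flume-server/lib myConnectionFactory`. No output and a zero exit status mean the bindings file and all eleven jars were found.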

1 comment:

Anonymous said...

In this instance, is the Flume agent pulling data from a local JMS queue?