Using ActiveMQConnectionFactory for creating connection factory

When developing a messaging application for ActiveMQ, say for a standalone client, we normally create a jndi.properties file like this in the source folder.

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
java.naming.security.principal=system
java.naming.security.credentials=manager
connectionFactoryNames = QueueCF
queue.ActiveMQ = jms.ActiveMQ
After that we can obtain both QueueCF and ActiveMQ from inside the code by looking them up in the InitialContext, like this:

// With no arguments, InitialContext picks up its environment from the
// jndi.properties file found on the classpath.
InitialContext context = new InitialContext();
  
// "QueueCF" is the name listed under connectionFactoryNames in jndi.properties.
QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");

QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
// false = non-transacted session; received messages are acknowledged automatically.
QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

// "ActiveMQ" is the JNDI name declared by the queue.ActiveMQ entry in jndi.properties.
Queue queue = (Queue)context.lookup("ActiveMQ");

Using the jndi.properties option keeps your code JMS compliant, but you have to define the queues and connection factories beforehand. With ActiveMQ we have another option, which is to use ActiveMQConnectionFactory directly. In the example below we first create an ActiveMQConnectionFactory object by passing it the URL of the broker, and then we use session.createQueue("ActiveMQ") to create the Queue. This allows us to create queues dynamically at run time.

package com.webspherenotes.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Standalone example that sends one text message to the "ActiveMQ" queue.
 *
 * <p>The connection factory is built directly from the broker URL and the
 * queue is obtained with {@code Session.createQueue}, so no jndi.properties
 * file is needed.
 */
public class HelloActiveMQPublisher {
  /**
   * Connects to the broker at tcp://localhost:61616, sends a single
   * {@link TextMessage} to the "ActiveMQ" queue and closes the connection.
   *
   * @param args command-line arguments (unused)
   * @throws Exception if the connection cannot be created or the send fails
   */
  public static void main(String[] args) throws Exception{

      String brokerURL = "tcp://localhost:61616";
      ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);

      Connection connection = connectionFactory.createConnection();
      // Everything after createConnection() can throw; the finally block
      // guarantees the connection is released on every path.
      try {
        connection.start();
        // false = non-transacted session with automatic acknowledgement.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("ActiveMQ");
        MessageProducer messageProducer = session.createProducer(queue);
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText("This is a message for dynamically create message q");
        messageProducer.send(textMessage);
      } finally {
        // Closing the connection also closes its sessions and producers.
        connection.close();
      }
  }
}

package com.webspherenotes.jms;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Standalone example that listens on the "ActiveMQ" queue and prints the
 * text of every {@link TextMessage} it receives until the user presses enter.
 *
 * <p>The connection factory is built directly from the broker URL and the
 * queue is obtained with {@code Session.createQueue}, so no jndi.properties
 * file is needed.
 */
public class HelloActiveMQConsumer implements MessageListener{

  /**
   * Connects to the broker at tcp://localhost:61616, registers an instance of
   * this class as the listener for the "ActiveMQ" queue, and blocks until a
   * line is read from standard input.
   *
   * @param args command-line arguments (unused)
   * @throws Exception if the connection cannot be set up or stdin cannot be read
   */
  public static void main(String[] args) throws Exception{
    String brokerURL = "tcp://localhost:61616";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection connection = connectionFactory.createConnection();
    // Everything after createConnection() can throw; the finally block
    // guarantees the connection is released on every path.
    try {
      // false = non-transacted session with automatic acknowledgement.
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      Queue queue = session.createQueue("ActiveMQ");
      MessageConsumer messageConsumer = session.createConsumer(queue);
      messageConsumer.setMessageListener(new HelloActiveMQConsumer());
      // Start delivery only after the listener is registered, so setup is
      // complete before the first message can arrive.
      connection.start();
      BufferedReader stdin = new BufferedReader(new InputStreamReader(
            System.in));
      System.out.println("Press enter to quit application");
      stdin.readLine();
    } finally {
      // Closing the connection also closes its sessions and consumers.
      connection.close();
    }
  }

  /**
   * Prints the body of a received text message to standard output.
   * Messages of any other type are reported on standard error and skipped
   * instead of failing with a {@link ClassCastException}.
   *
   * @param message the message delivered by the JMS provider
   */
  @Override
  public void onMessage(Message message) {
    if (!(message instanceof TextMessage)) {
      System.err.println("Ignoring non-text message: " + message);
      return;
    }
    try {
      TextMessage textMessage =(TextMessage)message;
      System.out.println("Thie message is " + textMessage.getText());
    } catch (JMSException e) {
      // Reading the body failed; report it and keep the listener alive.
      e.printStackTrace();
    }
  }
}

2 comments:

Anonymous said...

Thank you so much for the clear explanation.

Anonymous said...

Please could you advise any read-up / guidance on how to set timeout on consumer and producer in ActiveMQ. Please refer stackoverflow link for code sample - http://stackoverflow.com/questions/40217028/activemq-transport-tcp-thread-runnable-state-too-many-threads-hanging