When developing a messaging application for ActiveMQ, say for standalone client, normally we create a jndi.properties file like this in the source folder.
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
java.naming.security.principal=system
java.naming.security.credentials=manager
connectionFactoryNames = QueueCF
queue.ActiveMQ = jms.ActiveMQ
After that we can look up both
QueueCF
and
ActiveMQ
from inside the code by looking them up in the
InitialContext
like this
InitialContext context = new InitialContext();
QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");
QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue)context.lookup("ActiveMQ");
Using the jndi.properties option makes your code JMS compliant, but you will have to define the queues and connection factories before hand, with ActiveMQ we have another option which is to use
ActiveMQConnectionFactory
like this, in this example first we create object of
ActiveMQConnectionFactory
by passing URL to the broker then we use
session.createQueue("ActiveMQ")
to create the Queue, It allows us to create
Queues
dynamically at run time.
package com.webspherenotes.jms;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class HelloActiveMQPublisher {
public static void main(String[] args) throws Exception{
String brokerURL = "tcp://localhost:61616";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("ActiveMQ");
MessageProducer messageProducer = session.createProducer(queue);
TextMessage textMessage = session.createTextMessage();
textMessage.setText("This is a message for dynamically create message q");
messageProducer.send(textMessage);
connection.close();
}
}
package com.webspherenotes.jms;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class HelloActiveMQConsumer implements MessageListener{
public static void main(String[] args) throws Exception{
String brokerURL = "tcp://localhost:61616";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("ActiveMQ");
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new HelloActiveMQConsumer());
BufferedReader stdin = new BufferedReader(new InputStreamReader(
System.in));
System.out.println("Press enter to quit application");
stdin.readLine();
connection.close();
}
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage =(TextMessage)message;
System.out.println("Thie message is " + textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Thank you so much for the clear explanation.
ReplyDeletePlease could you advise any read-up / guidance on how to set timeout on consumer and producer in ActiveMQ. Please refer stackoverflow link for code sample - http://stackoverflow.com/questions/40217028/activemq-transport-tcp-thread-runnable-state-too-many-threads-hanging
ReplyDelete