In the How to either publish or consume message using STOMP in ActiveMQ entry i talked about how to use STOMP protocol for both publishing and consuming messages. I wanted to check if i can mix JMS + STOMP together, so i changed the sample application to use JMS API for publishing a message and use the STOMP for consuming messages, this works because no matter which API you use for communicating with the broker the message gets stored in the same messaging provider same this while consuming messages. You can download the sample code from here First configure your ActiveMQ so that it supports both JMS and STOMP connectors like this

<transportConnectors>
 <transportConnector name="openwire" uri="tcp://localhost:61616?trace=true"/>
 <transportConnector name="stomp" uri="stomp://localhost:61613?trace=true"/>
</transportConnectors>
Then create a jndi.properties file that defines the ConnectionFactory and Queue in the JNDI context for your application like this

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
java.naming.security.principal=system
java.naming.security.credentials=manager
connectionFactoryNames = QueueCF
queue.stomptest=stomptest
This jndi.properties file makes sure that the objects of ConnectionFactory and Queue and bound in the JNDI context so that your java code remains JMS compliant with no reference to ActiveMQ. Then create OpenWireMessagePublisher.java like this

package com.webspherenotes.stomp;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class OpenWireMessagePublisher {
  public static void main(String[] argv){
    try {
      InitialContext context = new InitialContext();
      QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");
      QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
      QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
      Queue queue = (Queue)context.lookup("stomptest");
      TextMessage textMessage = queueSession.createTextMessage();
      textMessage.setText("This is sample message for stomp queue");
      QueueSender queueSender = queueSession.createSender(queue);
      queueSender.send(textMessage);
      queueConnection.close();
    } catch (NamingException e) {
      e.printStackTrace();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}
The OpenWireMessagePublisher.java is using JMS API to publish a message to stomptest queue. Now lets create a StompMessageConsumer.java like this

package com.webspherenotes.stomp;
import java.io.IOException;
import java.net.UnknownHostException;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;
import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;

public class StompMessageConsumer {
  public static void main(String[] args) {
    try {
      StompConnection connection = new StompConnection();
      connection.open("localhost", 61613);
      connection.connect("system", "manager");
      connection.subscribe("/queue/stomptest", Subscribe.AckModeValues.CLIENT);
      connection.begin("tx2");
      StompFrame message = connection.receive();
      System.out.println(message.getBody());
      connection.ack(message, "tx2");
      connection.commit("tx2");
      connection.disconnect();
    } catch (UnknownHostException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
The StompConsumer code does not care about if the message was published using JMS or STOMP it remains same.

No comments:

Post a Comment