How to either publish or consume message using STOMP in ActiveMQ

The ActiveMQ server supports Simple Text-oriented Messaging Protocol(STOMP) that we can use for communicating between client and server. I wanted to try out this feature so i built a sample application that has two classes one is StompMessagePublisher.java that publishes a message using STOMP and StompMessageConsumer that consumes a message. You can download the sample application from here By default the STOMP connector is disabled in the ActiveMQ so first thing that you would have to do is enable it. First open <activemq_installroot>\conf\activemq.xml file and find the transportConnectors element by default it will have only one transportConnector child element with name equal to openwire add stomp element in it like this

<transportConnectors>
 <transportConnector name="openwire" uri="tcp://localhost:61616?trace=true"/>

 <transportConnector name="stomp" uri="stomp://localhost:61613?trace=true"/>

</transportConnectors>
Then create a StompMessagePublisher.java like this

package com.webspherenotes.stomp;

import java.io.IOException;
import java.net.UnknownHostException;

import org.apache.activemq.transport.stomp.StompConnection;
public class StompMessagePublisher {

  public static void main(String[] args) {
    try {
      StompConnection connection = new StompConnection();
      connection.open("localhost", 61613);
      connection.connect("system", "manager");
       connection.begin("tx1");
      connection.send("/queue/stomptest", "This is test message 1");
      connection.commit("tx1");
      connection.disconnect();
    } catch (UnknownHostException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
In this class first we are opening a StompConnection with localhost:61613, after establishing connection we are sending message to stomptest queue in a transaction Next create StompMessageConsumer.java class which looks like this

package com.webspherenotes.stomp;

import java.io.IOException;
import java.net.UnknownHostException;

import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;
import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;

public class StompMessageConsumer {

  public static void main(String[] args) {
    try {
      StompConnection connection = new StompConnection();
      connection.open("localhost", 61613);
      connection.connect("system", "manager");
      connection.subscribe("/queue/stomptest", Subscribe.AckModeValues.CLIENT);
      connection.begin("tx2");
      StompFrame message = connection.receive();
      System.out.println(message.getBody());
      connection.ack(message, "tx2");
      connection.commit("tx2");
      connection.disconnect();
    } catch (UnknownHostException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
In the StompMessageConsumer.java, first i am establishing STOMP connection to the localhost and then subscribing to stomptest queue, after subscribing to the queue we have to call the receive() method on the connection to receive the message from the queue.

4 comments:

  1. java.net.SocketTimeoutException: Read timed out

    ReplyDelete
  2. java.net.SocketTimeoutException: Read timed out

    ReplyDelete
  3. same issue :java.net.SocketTimeoutException: Read timed out

    ReplyDelete
  4. Thank you! This was exactly the example I was looking for!

    ReplyDelete