Request/reply messaging using TopicRequestor

In the post "Request/Reply messaging using QueueSender" I tried out the QueueRequestor approach for request/reply messaging. Similarly, I wanted to try out TopicRequestor, so I built this sample code. First create a TopicRequestor and use it to send the message; control blocks in the topicRequestor.request() method until it gets a response. Even if there is more than one consumer, request() returns on the first response and the remaining responses are ignored.

package com.webspherenotes.jms;

import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicRequestor;
import javax.jms.TopicSession;
import javax.naming.InitialContext;

/**
 * Sample client that performs a blocking request/reply exchange over a JMS
 * topic using {@link TopicRequestor}.
 *
 * <p>The connection factory is looked up in JNDI under {@code "TopicCF"} and
 * the request topic under {@code "topic1"}.
 */
public class SampleTopicRequestor {

  /**
   * Publishes one text request to {@code topic1}, blocks until a reply
   * arrives and prints the reply text to standard output.
   *
   * @param args command line arguments (not used)
   * @throws Exception if a JNDI lookup or any JMS operation fails
   */
  public static void main(String[] args) throws Exception {
    InitialContext context = new InitialContext();
    try {
      TopicConnectionFactory topicConnectionFactory =
          (TopicConnectionFactory) context.lookup("TopicCF");

      TopicConnection topicConnection = topicConnectionFactory.createTopicConnection();
      try {
        TopicSession topicSession =
            topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

        Topic topic = (Topic) context.lookup("topic1");

        TextMessage textMessage = topicSession.createTextMessage();
        textMessage.setText("Hello how are you doing?");

        // No explicit TopicPublisher is created here: the request is sent
        // through the TopicRequestor itself.
        TopicRequestor topicRequestor = new TopicRequestor(topicSession, topic);
        topicConnection.start();

        // request() blocks until a reply is received.
        TextMessage replyMessage = (TextMessage) topicRequestor.request(textMessage);
        System.out.println("Topic response " + replyMessage.getText());
      } finally {
        // Close the connection even when a lookup, send or receive fails,
        // so provider resources are not leaked on the error path.
        topicConnection.close();
      }
    } finally {
      context.close();
    }
  }

}

On the consumer side, the value of the JMSReplyTo header is a temporary topic, but I wanted to try out the generic Destination and MessageProducer interfaces for sending the reply.

package com.webspherenotes.jms;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * Sample consumer that subscribes to {@code topic1} and answers every text
 * message it receives by sending a reply to the message's
 * {@code JMSReplyTo} destination.
 *
 * <p>The reply is sent through the generic {@link Destination} and
 * {@link MessageProducer} interfaces rather than the topic-specific ones.
 */
public class SampleTopicConsumer implements MessageListener {

  /** Connection to the provider; {@code null} if setup in the constructor failed. */
  TopicConnection topicConnection;

  /** Session used both for receiving requests and for sending replies. */
  TopicSession topicSession;

  /**
   * Looks up the connection factory ({@code "TopicCF"}) and the topic
   * ({@code "topic1"}) in JNDI, registers this object as the listener and
   * starts message delivery.
   *
   * <p>Setup failures are reported on standard error. In that case any
   * partially opened connection is closed and {@link #topicConnection} is
   * left {@code null}.
   */
  public SampleTopicConsumer() {
    InitialContext context = null;
    try {
      context = new InitialContext();

      TopicConnectionFactory topicConnectionFactory =
          (TopicConnectionFactory) context.lookup("TopicCF");

      topicConnection = topicConnectionFactory.createTopicConnection();

      topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

      Topic topic = (Topic) context.lookup("topic1");

      TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);

      topicSubscriber.setMessageListener(this);
      topicConnection.start();
    } catch (NamingException e) {
      e.printStackTrace();
      releaseConnection();
    } catch (JMSException e) {
      e.printStackTrace();
      releaseConnection();
    } finally {
      closeContext(context);
    }
  }

  /** Closes the connection, if one was opened, and clears both JMS fields. */
  private void releaseConnection() {
    if (topicConnection != null) {
      try {
        topicConnection.close();
      } catch (JMSException e) {
        e.printStackTrace();
      }
    }
    topicConnection = null;
    topicSession = null;
  }

  /** Closes the given JNDI context, reporting (but not propagating) failures. */
  private static void closeContext(InitialContext context) {
    if (context != null) {
      try {
        context.close();
      } catch (NamingException e) {
        e.printStackTrace();
      }
    }
  }

  /**
   * Prints the text of the incoming message and sends a text reply to its
   * {@code JMSReplyTo} destination.
   *
   * <p>Messages that are not {@link TextMessage}s, or that carry no reply
   * destination, are reported on standard error and otherwise ignored so
   * that no runtime exception escapes the listener.
   *
   * @param message the message delivered by the provider
   */
  @Override
  public void onMessage(Message message) {
    if (!(message instanceof TextMessage)) {
      System.err.println("Ignoring non-text message: " + message);
      return;
    }
    TextMessage textMessage = (TextMessage) message;
    try {
      System.out.println("The message is " + textMessage.getText());

      Destination replyToDestination = textMessage.getJMSReplyTo();
      if (replyToDestination == null) {
        System.err.println("Message has no JMSReplyTo destination; no reply sent");
        return;
      }

      MessageProducer messageProducer = topicSession.createProducer(replyToDestination);
      try {
        TextMessage replyMessage = topicSession.createTextMessage();
        replyMessage.setText("Reply message to " + textMessage.getText());
        // Link the reply to the request so a requester can match them up.
        replyMessage.setJMSCorrelationID(textMessage.getJMSMessageID());
        messageProducer.send(replyMessage);
      } finally {
        // A producer is created per request; close it so producers do not
        // accumulate for the lifetime of the session.
        messageProducer.close();
      }
      System.out.println(replyToDestination);

    } catch (JMSException e) {
      e.printStackTrace();
    }
  }

  /**
   * Starts the consumer and keeps it running until the user presses Enter.
   *
   * @param args command line arguments (not used)
   * @throws Exception if reading standard input or closing the connection fails
   */
  public static void main(String[] args) throws Exception {
    SampleTopicConsumer sampleTopicConsumer = new SampleTopicConsumer();
    if (sampleTopicConsumer.topicConnection == null) {
      // Setup failed and was already reported by the constructor.
      System.err.println("Consumer could not be started");
      return;
    }
    try {
      BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in));
      System.out.println("Press enter to quit application");
      stdin.readLine();
    } finally {
      sampleTopicConsumer.topicConnection.close();
    }
  }

}