Request/Reply messaging using QueueSender

I wanted to try out Request/Reply messaging, so I developed this simple MessagePublisher class, which sends a message to a Queue using a QueueRequestor and then blocks until the response comes back.

package com.webspherenotes.jms;

import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueRequestor;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;

/**
 * Command-line client that demonstrates request/reply messaging.
 *
 * <p>It looks up the "QueueCF" connection factory and the "LoanRequestQ"
 * queue through JNDI, sends one text message through a
 * {@link QueueRequestor}, waits for the reply and prints its text.
 */
public class MessagePublisher {

  /**
   * Sends a single request to "LoanRequestQ" and prints the reply.
   *
   * <p>Any failure is printed to standard output; the connection is closed
   * whether or not the exchange succeeded.
   *
   * @param argv command-line arguments (not used)
   */
  public static void main(String[] argv){
    // Declared outside the try block so the finally block can release it.
    QueueConnection queueConnection = null;
    try {
      InitialContext context = new InitialContext();

      QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");

      queueConnection = queueConnectionFactory.createQueueConnection();
      QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

      Queue queue = (Queue)context.lookup("LoanRequestQ");
      queueConnection.start();

      QueueRequestor queueRequestor = new QueueRequestor(queueSession, queue);

      TextMessage textMessage = queueSession.createTextMessage();
      textMessage.setText("This is sample message");
      System.out.println("Before calling queueRequestor.request()");

      // request() does not return until a reply has been received.
      TextMessage responseMessage = (TextMessage)queueRequestor.request(textMessage);
      System.out.println("After calling queueRequestor.request()");

      System.out.println("Response message is " + responseMessage.getText());
    } catch (Exception e) {
      e.printStackTrace(System.out);
    } finally {
      // Close here rather than at the end of the try block so a failed
      // lookup, request or cast does not leak the connection.
      if (queueConnection != null) {
        try {
          queueConnection.close();
        } catch (Exception e) {
          e.printStackTrace(System.out);
        }
      }
    }

  }
}
This is what MessageConsumer.java looks like. It implements the MessageListener interface to receive messages asynchronously. In the onMessage() method, after reading the message, it creates a QueueSender for the temporary reply queue and sends the response to that queue.

package com.webspherenotes.jms;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * Listener that answers every text message arriving on "LoanRequestQ".
 *
 * <p>The constructor connects and registers this object as the queue's
 * {@link MessageListener}; {@link #onMessage(Message)} then sends a reply to
 * the queue named in each request's JMSReplyTo header.
 */
public class MessageConsumer implements MessageListener {
  QueueConnection queueConnection;
  QueueSession queueSession;

  /**
   * Looks up "QueueCF" and "LoanRequestQ" through JNDI, registers this object
   * as the listener on the queue and starts the connection.
   *
   * <p>Lookup and JMS failures are printed rather than thrown, so after a
   * failure {@code queueConnection} and {@code queueSession} may still be
   * {@code null}.
   */
  public MessageConsumer() {
    try {
      InitialContext context = new InitialContext();
      QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) context
          .lookup("QueueCF");
      queueConnection = queueConnectionFactory.createQueueConnection();
      queueSession = queueConnection.createQueueSession(false,
          Session.AUTO_ACKNOWLEDGE);
      Queue queue = (Queue) context.lookup("LoanRequestQ");
      QueueReceiver queueReceiver = queueSession.createReceiver(queue);
      queueReceiver.setMessageListener(this);
      // Started last, after the listener has been registered.
      queueConnection.start();
    } catch (NamingException e) {
      e.printStackTrace();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }

  /**
   * Prints the incoming text and sends "This is reply for ..." to the
   * request's JMSReplyTo queue.
   *
   * <p>Messages that are not {@link TextMessage}s, or that carry no reply
   * queue, are reported and skipped instead of raising an unchecked exception
   * out of the listener.
   *
   * @param message the message delivered by the JMS provider
   */
  @Override
  public void onMessage(Message message) {
    QueueSender replyToSender = null;
    try {
      if (!(message instanceof TextMessage)) {
        System.out.println("Ignoring message that is not a TextMessage: "
            + message);
        return;
      }
      TextMessage textMessage = (TextMessage) message;
      System.out.println("Inside MessageConsumer.onMessage "
          + textMessage.getText());

      // Without a reply queue there is nowhere to send the answer; a sender
      // created for a null queue would fail on send(Message).
      if (!(textMessage.getJMSReplyTo() instanceof Queue)) {
        System.out.println("Ignoring message without a reply queue, JMSReplyTo is "
            + textMessage.getJMSReplyTo());
        return;
      }
      Queue replyTo = (Queue) textMessage.getJMSReplyTo();

      replyToSender = queueSession.createSender(replyTo);
      System.out.println("Value of JMSReplyTo "
          + textMessage.getJMSReplyTo());
      TextMessage replyMessage = queueSession.createTextMessage();
      System.out.println("After creating replyMessage");
      replyMessage.setText("This is reply for " + textMessage.getText());
      // Lets a requester match this reply to the request it sent.
      replyMessage.setJMSCorrelationID(textMessage.getJMSMessageID());
      System.out.println("Before calling send()");
      replyToSender.send(replyMessage);
      System.out.println("After calling send()");

    } catch (JMSException e) {
      e.printStackTrace(System.out);
    } finally {
      // A new sender is created per message, so release it each time.
      if (replyToSender != null) {
        try {
          replyToSender.close();
        } catch (JMSException e) {
          e.printStackTrace(System.out);
        }
      }
    }
  }

  /**
   * Starts a consumer and keeps it running until the user presses enter.
   *
   * @param argv command-line arguments (not used)
   */
  public static void main(String[] argv) {
    try {
      MessageConsumer messageConsumer = new MessageConsumer();
      BufferedReader stdin = new BufferedReader(new InputStreamReader(
          System.in));
      System.out.println("Press enter to quit application");
      stdin.readLine();
      // The constructor only prints failures, so the connection may be null.
      if (messageConsumer.queueConnection != null) {
        messageConsumer.queueConnection.close();
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

}

No comments: