I wanted to try out this Request/Reply messaging so i developed this simple MessagePublisher class, which sends a message to Queue using QueueSender and then block control for the response to come back.
package com.webspherenotes.jms;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueRequestor;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
public class MessagePublisher {
public static void main(String[] argv){
try {
InitialContext context = new InitialContext();
QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");
QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue)context.lookup("LoanRequestQ");
queueConnection.start();
QueueSender queueSender = queueSession.createSender(queue);
QueueRequestor queueRequestor = new QueueRequestor(queueSession, queue);
TextMessage textMessage = queueSession.createTextMessage();
textMessage.setText("This is sample message");
//queueSender.send(textMessage);
System.out.println("Before calling queueSender.request()");
TextMessage responseMessage = (TextMessage)queueRequestor.request(textMessage);
System.out.println("After calling queueSender.request()");
System.out.println("Response message is " + responseMessage.getText());
queueConnection.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace(System.out);
}
}
}
This is how the MessageConsumer.java looks like it implements the MessageListener interface to receive the message asynchronously. In the onMessage() method, after reading the Message it creates a QueueSender to the temporary queue for sending the message and sends response to that queue.
package com.webspherenotes.jms;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class MessageConsumer implements MessageListener {
QueueConnection queueConnection;
QueueSession queueSession;
public MessageConsumer() {
try {
InitialContext context = new InitialContext();
QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) context
.lookup("QueueCF");
queueConnection = queueConnectionFactory.createQueueConnection();
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue) context.lookup("LoanRequestQ");
QueueReceiver queueReceiver = queueSession.createReceiver(queue);
queueReceiver.setMessageListener(this);
queueConnection.start();
} catch (NamingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
}
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("Inside MessageConsumer.onMessage "
+ textMessage.getText());
Queue replyTo = (Queue) textMessage.getJMSReplyTo();
QueueSender replyToSender = queueSession.createSender(replyTo);
System.out.println("Value of JMSReplyTo "
+ textMessage.getJMSReplyTo());
TextMessage replyMessage = queueSession.createTextMessage();
System.out.println("After creating replyMessage");
replyMessage.setText("This is reply for " + textMessage.getText());
System.out.println("Before calling send()");
replyToSender.send(replyMessage);
System.out.println("After calling send()");
} catch (JMSException e) {
e.printStackTrace(System.out);
}
}
public static void main(String[] argv) {
try {
MessageConsumer messageConsumer = new MessageConsumer();
BufferedReader stdin = new BufferedReader(new InputStreamReader(
System.in));
System.out.println("Press enter to quit application");
stdin.readLine();
messageConsumer.queueConnection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
No comments:
Post a Comment