Showing posts with label activemq. Show all posts
Showing posts with label activemq. Show all posts
In the How to either publish or consume message using STOMP in ActiveMQ entry I talked about how to use the STOMP protocol for both publishing and consuming messages. I wanted to check if I can mix JMS and STOMP together, so I changed the sample application to use the JMS API for publishing a message and STOMP for consuming it. This works because no matter which API you use for communicating with the broker, the message gets stored in the same messaging provider, and the same is true while consuming messages. You can download the sample code from here. First configure your ActiveMQ so that it supports both the OpenWire (JMS) and STOMP connectors like this

<transportConnectors>
 <transportConnector name="openwire" uri="tcp://localhost:61616?trace=true"/>
 <transportConnector name="stomp" uri="stomp://localhost:61613?trace=true"/>
</transportConnectors>
Then create a jndi.properties file that defines the ConnectionFactory and Queue in the JNDI context for your application like this

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
java.naming.security.principal=system
java.naming.security.credentials=manager
connectionFactoryNames = QueueCF
queue.stomptest=stomptest
This jndi.properties file makes sure that the ConnectionFactory and Queue objects are bound in the JNDI context, so that your Java code remains JMS compliant with no reference to ActiveMQ. Then create OpenWireMessagePublisher.java like this

package com.webspherenotes.stomp;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * Publishes one text message to the {@code stomptest} queue through the JMS API.
 *
 * <p>The connection factory ({@code QueueCF}) and the queue ({@code stomptest}) are
 * looked up by name from the default JNDI {@link InitialContext}.
 */
public class OpenWireMessagePublisher {
  /**
   * Looks up the JMS objects, sends a single {@link TextMessage} and closes the connection.
   *
   * @param argv command line arguments (not used)
   */
  public static void main(String[] argv){
    QueueConnection queueConnection = null;
    try {
      InitialContext context = new InitialContext();
      QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");
      queueConnection = queueConnectionFactory.createQueueConnection();
      QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
      Queue queue = (Queue)context.lookup("stomptest");
      TextMessage textMessage = queueSession.createTextMessage();
      textMessage.setText("This is sample message for stomp queue");
      QueueSender queueSender = queueSession.createSender(queue);
      queueSender.send(textMessage);
    } catch (NamingException e) {
      e.printStackTrace();
    } catch (JMSException e) {
      e.printStackTrace();
    } finally {
      // Close in finally so the connection is released even when the lookup or send fails.
      if (queueConnection != null) {
        try {
          queueConnection.close();
        } catch (JMSException e) {
          e.printStackTrace();
        }
      }
    }
  }
}
The OpenWireMessagePublisher.java is using JMS API to publish a message to stomptest queue. Now lets create a StompMessageConsumer.java like this

package com.webspherenotes.stomp;
import java.io.IOException;
import java.net.UnknownHostException;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;
import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;

/**
 * Receives one message from {@code /queue/stomptest} over STOMP and prints its body.
 */
public class StompMessageConsumer {
  /**
   * Connects to the STOMP connector on localhost:61613, subscribes in client-ack mode,
   * then receives, acknowledges and commits a single message inside transaction "tx2".
   *
   * @param args command line arguments (not used)
   */
  public static void main(String[] args) {
    final String transactionId = "tx2";
    try {
      StompConnection stomp = new StompConnection();
      stomp.open("localhost", 61613);
      stomp.connect("system", "manager");
      stomp.subscribe("/queue/stomptest", Subscribe.AckModeValues.CLIENT);

      stomp.begin(transactionId);
      StompFrame frame = stomp.receive();
      String body = frame.getBody();
      System.out.println(body);
      stomp.ack(frame, transactionId);
      stomp.commit(transactionId);

      stomp.disconnect();
    } catch (Exception failure) {
      // UnknownHostException and IOException are both Exceptions and were all handled
      // the same way, so a single handler covers every case.
      failure.printStackTrace();
    }
  }
}
The StompMessageConsumer code does not care whether the message was published using JMS or STOMP; it remains the same.

How to either publish or consume message using STOMP in ActiveMQ

The ActiveMQ server supports Simple Text-oriented Messaging Protocol(STOMP) that we can use for communicating between client and server. I wanted to try out this feature so i built a sample application that has two classes one is StompMessagePublisher.java that publishes a message using STOMP and StompMessageConsumer that consumes a message. You can download the sample application from here By default the STOMP connector is disabled in the ActiveMQ so first thing that you would have to do is enable it. First open <activemq_installroot>\conf\activemq.xml file and find the transportConnectors element by default it will have only one transportConnector child element with name equal to openwire add stomp element in it like this

<transportConnectors>
 <transportConnector name="openwire" uri="tcp://localhost:61616?trace=true"/>

 <transportConnector name="stomp" uri="stomp://localhost:61613?trace=true"/>

</transportConnectors>
Then create a StompMessagePublisher.java like this

package com.webspherenotes.stomp;

import java.io.IOException;
import java.net.UnknownHostException;

import org.apache.activemq.transport.stomp.StompConnection;
/**
 * Sends one text message to {@code /queue/stomptest} over STOMP.
 */
public class StompMessagePublisher {

  /**
   * Connects to the STOMP connector on localhost:61613 and sends a single message
   * inside transaction "tx1".
   *
   * @param args command line arguments (not used)
   */
  public static void main(String[] args) {
    final String transactionId = "tx1";
    try {
      StompConnection stomp = new StompConnection();
      stomp.open("localhost", 61613);
      stomp.connect("system", "manager");

      stomp.begin(transactionId);
      stomp.send("/queue/stomptest", "This is test message 1");
      stomp.commit(transactionId);

      stomp.disconnect();
    } catch (Exception failure) {
      // UnknownHostException and IOException are both Exceptions and were all handled
      // the same way, so a single handler covers every case.
      failure.printStackTrace();
    }
  }
}
In this class we first open a StompConnection to localhost:61613; after establishing the connection we send a message to the stomptest queue inside a transaction. Next create the StompMessageConsumer.java class, which looks like this

package com.webspherenotes.stomp;

import java.io.IOException;
import java.net.UnknownHostException;

import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;
import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;

/**
 * Receives one message from {@code /queue/stomptest} over STOMP and prints its body.
 */
public class StompMessageConsumer {

  /**
   * Connects to the STOMP connector on localhost:61613, subscribes in client-ack mode,
   * then receives, acknowledges and commits a single message inside transaction "tx2".
   *
   * @param args command line arguments (not used)
   */
  public static void main(String[] args) {
    final String transactionId = "tx2";
    try {
      StompConnection stomp = new StompConnection();
      stomp.open("localhost", 61613);
      stomp.connect("system", "manager");
      stomp.subscribe("/queue/stomptest", Subscribe.AckModeValues.CLIENT);

      stomp.begin(transactionId);
      StompFrame frame = stomp.receive();
      String body = frame.getBody();
      System.out.println(body);
      stomp.ack(frame, transactionId);
      stomp.commit(transactionId);

      stomp.disconnect();
    } catch (Exception failure) {
      // UnknownHostException and IOException are both Exceptions and were all handled
      // the same way, so a single handler covers every case.
      failure.printStackTrace();
    }
  }
}
In the StompMessageConsumer.java, first i am establishing STOMP connection to the localhost and then subscribing to stomptest queue, after subscribing to the queue we have to call the receive() method on the connection to receive the message from the queue.

How to use ActiveMQ in Jetty

I wanted to figure out how to use Apache ActiveMQ in a web application that is running in Jetty. I also wanted to use the Maven Jetty Plugin, so I built this sample application which contains a servlet (MessagePublishingServlet); when I make a GET request to the servlet it takes the value of the message query parameter and publishes it as a TextMessage to a Queue. You can download the source code for the sample application from here. The first thing that I did is create a pom.xml file that looks like this

<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
  http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.webspherenotes.jms</groupId>
  <artifactId>HelloJettyActiveMQ</artifactId>
  <version>1.0</version>
  <packaging>war</packaging>
  <name>HelloJettyActiveMQ</name>
  <description>Sample app to demonstrate how to use 
  ActiveMQ in Jetty</description>
  <dependencies>
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>servlet-api</artifactId>
      <version>2.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-core</artifactId>
      <version>5.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.5.11</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>3.0.3.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.apache.xbean</groupId>
      <artifactId>xbean-spring</artifactId>
      <version>3.9</version>
    </dependency>
  </dependencies>
  <build>
    <finalName>HelloJettyActiveMQ</finalName>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.5</source>
          <target>1.5</target>
        </configuration>
      </plugin>
   
      <plugin>
        <groupId>org.mortbay.jetty</groupId>
        <artifactId>jetty-maven-plugin</artifactId>
        <version>7.2.2.v20101205</version>
        <configuration>
          <scanIntervalSeconds>10</scanIntervalSeconds>
          <webAppConfig>
            <jettyEnvXml>${basedir}/src/main/resources/jetty-env.xml</jettyEnvXml>
          </webAppConfig>
        </configuration>
      </plugin>
   
    </plugins>
  </build>
</project>
I am using version 7.2.2 of Jetty server in the jetty-maven-plugin, also note that i configured a jetty-env.xml file which defines the JMS resources in the JNDI context. This is how my jetty-env.xml file looks like

<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Mort Bay Consulting//DTD Configure//EN" 
"http://jetty.mortbay.org/configure.dtd">
<Configure id='jms-webapp-wac' class="org.eclipse.jetty.webapp.WebAppContext">
  <New id="connectionFactory" class="org.eclipse.jetty.plus.jndi.Resource">
    <Arg>
      <Ref id='jms-webapp-wac' />
    </Arg>
    <Arg>jms/ConnectionFactory</Arg>
    <Arg>
      <New class="org.apache.activemq.ActiveMQConnectionFactory">
        <Arg>tcp://localhost:61616</Arg>
      </New>
    </Arg>
  </New>
  <New id="fooQueue" class="org.eclipse.jetty.plus.jndi.Resource">
    <Arg>jms/FooQueue</Arg>
    <Arg>
      <New class="org.apache.activemq.command.ActiveMQQueue">
        <Arg>FOO.QUEUE</Arg>
      </New>
    </Arg>
  </New>
</Configure>
The jetty-env.xml file defines 2 resources: one is the ActiveMQConnectionFactory and the second is the ActiveMQQueue. After that I declared the messaging related resources in web.xml, so my web.xml file looks like this

<?xml version="1.0" encoding="UTF-8"?>
<web-app version="2.4" xmlns="http://java.sun.com/xml/ns/j2ee"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee 
         http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd">  
  <display-name>HelloEmbeddedServer</display-name>
  <servlet>
    <servlet-name>MessagePublishingServlet</servlet-name>
    <servlet-class>com.webspherenotes.jms.MessagePublishingServlet</servlet-class>
    <load-on-startup>1</load-on-startup>
  </servlet>

  <servlet-mapping>
    <servlet-name>MessagePublishingServlet</servlet-name>
    <url-pattern>/MessagePublishingServlet/*</url-pattern>
  </servlet-mapping>
  
  
  <resource-ref>
    <description>JMS Connection</description>
    <res-ref-name>jms/ConnectionFactory</res-ref-name>
    <res-type>javax.jms.ConnectionFactory</res-type>
    <res-auth>Container</res-auth>
    <res-sharing-scope>Shareable</res-sharing-scope>
  </resource-ref>
  
  <message-destination-ref>
    <message-destination-ref-name>jms/FooQueue</message-destination-ref-name>
    <message-destination-type>javax.jms.Queue</message-destination-type>
    <message-destination-usage>Produces</message-destination-usage>
    <message-destination-link>jms/FooQueue</message-destination-link>
  </message-destination-ref>
 
 
</web-app>
This is how my MessagePublishingServlet.java looks like

package com.webspherenotes.jms;

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Servlet that publishes the value of the {@code message} query parameter as a JMS
 * {@link TextMessage}.
 *
 * <p>One JMS {@link Connection} is created in {@link #init()} and shared by all requests;
 * each request uses its own short-lived {@link Session}. The connection is released in
 * {@link #destroy()}.
 */
public class MessagePublishingServlet extends HttpServlet{
  Logger logger = LoggerFactory.getLogger(MessagePublishingServlet.class);
  Connection connection;
  Queue queue;


  /**
   * Looks up the connection factory and the queue in JNDI and opens the shared connection.
   *
   * @throws ServletException if the JMS resources cannot be looked up or the connection
   *         cannot be created; without them the servlet cannot serve any request
   */
  @Override
  public void init() throws ServletException {
    logger.debug("Entering MessagePublishingServlet.init()");
    try {
      InitialContext context = new InitialContext();
      ConnectionFactory connectionFactory = (ConnectionFactory)context.lookup("java:comp/env/jms/ConnectionFactory");
      logger.debug("Connection Factory " + connectionFactory);
      connection = connectionFactory.createConnection();
      queue =(Queue) context.lookup("jms/FooQueue");
      logger.debug("After looking up the queue " + queue);
    } catch (Exception e) {
      logger.error("Error occured in MessagePublishingServlet.init() " + e.getMessage(),e);
      // Do not keep a half-initialised servlet around: release what was opened and
      // report the failure to the container instead of failing later in doGet().
      closeConnection();
      throw new ServletException("Unable to initialise JMS resources for MessagePublishingServlet", e);
    }
    logger.debug("Exiting MessagePublishingServlet.init()");

  }

  /**
   * Closes the shared JMS connection when the servlet is taken out of service.
   */
  @Override
  public void destroy() {
    closeConnection();
    super.destroy();
  }

  /**
   * Writes a greeting to the response and sends the {@code message} request parameter
   * to the queue as a {@link TextMessage}. JMS failures are logged, not propagated.
   *
   * @param req the request; its {@code message} parameter becomes the message text
   * @param resp the response the greeting is written to
   * @throws ServletException declared by the servlet contract
   * @throws IOException if the response cannot be written
   */
  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
      throws ServletException, IOException {
    logger.debug("Entering MessagePublishingServlet.doGet()");
    resp.setContentType("text/html");
    resp.getWriter().println("Hello from MessagePublishingServlet.doGet()");
    Session session = null;
    try {
      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      TextMessage textMessage = session.createTextMessage();
      textMessage.setText(req.getParameter("message"));
      MessageProducer queueSender = session.createProducer(queue);
      queueSender.send(textMessage);
    } catch (JMSException e) {
      logger.error("Error occured in MessagePublishingServlet.doGet() " + e.getMessage(),e);

    } finally {
      // A session is created per request, so it must be closed per request;
      // closing the session also closes the producer created from it.
      if (session != null) {
        try {
          session.close();
        } catch (JMSException e) {
          logger.error("Error occured in MessagePublishingServlet.doGet() " + e.getMessage(),e);
        }
      }
    }
    logger.debug("Exiting MessagePublishingServlet.doGet()");
  }

  /** Closes the shared connection if it is open; close failures are logged only. */
  private void closeConnection() {
    if (connection == null) {
      return;
    }
    try {
      connection.close();
    } catch (JMSException e) {
      logger.error("Error occured while closing the JMS connection " + e.getMessage(),e);
    } finally {
      connection = null;
    }
  }

}
The init() method looks up the JMS objects from the InitialContext, in the doGet() method i am reading the value of message query parameter and using it to send a TextMessage.

Maven build file(pom.xml) for Spring Active MQ JMS application

In the Using amq namespace for building Spring JMS application for ActiveMQ entry i built a sample Spring Active MQ JMS application, this is the maven pom.xml file for it.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.webspherenotes.jms</groupId>
  <artifactId>HelloSpringActiveMQ</artifactId>
  <version>1.0</version>
  <name>HelloSpringActiveMQ</name>
  <description>Sample Spring ActiveMQ JMS application</description>
  <dependencies>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-core</artifactId>
      <version>5.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.5.11</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>3.0.3.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.apache.xbean</groupId>
      <artifactId>xbean-spring</artifactId>
      <version>3.9</version>
    </dependency>
  </dependencies>
</project>
Once my pom.xml is ready i can use following commands to run MessagePublisher.java and MessageReceiver.java
  1. mvn exec:java -Dexec.mainClass=com.webspherenotes.jms.MessagePublisher
  2. mvn exec:java -Dexec.mainClass=com.webspherenotes.jms.MessageReceiver

Using amq namespace for building Spring JMS application for ActiveMQ

Using amq namespace makes developing Spring application for ActiveMQ very easy, i wanted to try that so i built this sample application, This is how my applicationContext.xml file looks like

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
  xmlns:jms="http://www.springframework.org/schema/jms"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xsi:schemaLocation="http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

  <amq:connectionFactory id="connectionFactory"
    brokerURL="tcp://localhost:61616" />

  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestinationName" value="queue1" />
  </bean>

</beans>
As you can see i have only two beans one for ConnectionFactory and other for JmsTemplate. This is how my MessagePublisher.java looks like

package com.webspherenotes.jms;

import java.util.Date;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

/**
 * Sends one timestamped text message through the {@code jmsTemplate} bean defined in
 * {@code applicationContext.xml}.
 */
public class MessagePublisher {
  /**
   * Loads the Spring context, fetches the JmsTemplate and sends a single message to
   * the template's default destination.
   *
   * @param args command line arguments (not used)
   * @throws Exception if the context cannot be loaded or the send fails
   */
  public static void main(String[] args)throws Exception {
    ApplicationContext springContext =
        new ClassPathXmlApplicationContext("applicationContext.xml");
    JmsTemplate template = (JmsTemplate) springContext.getBean("jmsTemplate");
    template.send(new TimestampedTextMessageCreator());
  }

  /** Builds a text message whose body ends with the current date. */
  private static final class TimestampedTextMessageCreator implements MessageCreator {
    public Message createMessage(Session session) throws JMSException {
      TextMessage outgoing = session.createTextMessage();
      outgoing.setText("This message is sent using MessageCreator" + new Date());
      return outgoing;
    }
  }
}
This is how my MessageReceiver class looks like

package com.webspherenotes.jms;

import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

/**
 * Synchronously receives one text message through the {@code jmsTemplate} bean defined
 * in {@code applicationContext.xml} and prints it.
 */
public class MessageReceiver {
  /**
   * Loads the Spring context, receives a single message from the template's default
   * destination and prints its text.
   *
   * @param args command line arguments (not used)
   * @throws Exception if the context cannot be loaded or the receive fails
   */
  public static void main(String[] args)throws Exception {
    ApplicationContext springContext =
        new ClassPathXmlApplicationContext("applicationContext.xml");
    JmsTemplate template = (JmsTemplate) springContext.getBean("jmsTemplate");

    TextMessage received = (TextMessage) template.receive();
    String text = received.getText();
    System.out.println("Message received " + text);
  }
}
This is sample of how to wait for message synchronously.

Embed ActiveMQ using Java Code

You can use embedded ActiveMQ which means you can create object of BrokerService and then use java code to configure it instead of regular approach of using activemq.xml file, i wanted to try that so i did create this sample application which you can download from here First create HelloEmbeddedBrokerService class like this, in this class i am just creating object of BrokerService, after that you can call methods to configure it and once your done call brokerService.start() to start the broker.

package com.webspherenotes.jms;

import org.apache.activemq.broker.BrokerService;

/**
 * Starts an ActiveMQ broker that is configured in Java code rather than through
 * an activemq.xml file.
 */
public class HelloEmbeddedBrokerService {

  /**
   * Creates a {@link BrokerService}, adds a TCP connector on localhost:61616 and starts it.
   *
   * @param args command line arguments (not used)
   */
  public static void main(String[] args) {
    final String connectorUri = "tcp://localhost:61616";
    try {
      BrokerService broker = new BrokerService();
      broker.addConnector(connectorUri);
      broker.start();
    } catch (Exception startupFailure) {
      startupFailure.printStackTrace();
    }
  }
}
Then use the jndi.properties file like this to configure JNDI context for the JMS client code

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
java.naming.security.principal=system
java.naming.security.credentials=manager
connectionFactoryNames = QueueCF
queue.SampleQ = jms.SampleQ
This is how the code for the message publisher would look like, as you can see your client code does not care how the message broker is started.

package com.webspherenotes.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Sends one text message to the queue bound as {@code SampleQ} in JNDI, using the
 * connection factory bound as {@code QueueCF}.
 */
public class HelloActiveMQPublisher {

  /**
   * Looks up the JMS objects, sends a single {@link TextMessage} and closes the connection.
   *
   * @param args command line arguments (not used)
   * @throws Exception if a JNDI lookup or any JMS operation fails
   */
  public static void main(String[] args) throws Exception{
    InitialContext context = new InitialContext();
    ConnectionFactory connectionFactory = (ConnectionFactory)context.lookup("QueueCF");

    Connection connection = connectionFactory.createConnection();
    try {
      connection.start();

      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

      Queue queue = (Queue)context.lookup("SampleQ");

      MessageProducer messageProducer = session.createProducer(queue);

      TextMessage textMessage = session.createTextMessage();
      textMessage.setText("Lets see if i can send messages using Embedded Broker");

      messageProducer.send(textMessage);
    } finally {
      // Release the connection even when the lookup or the send throws.
      connection.close();
    }
  }

}

Using MessageAuthorizationPolicy in ActiveMQ

The ActiveMQ broker allows message level security, which means you can ask ActiveMQ to call your business logic before every message is consumed, and as a result get control over which message can be consumed by which MessageListener. I wanted to try that, so I created this SampleMessagePolicy.java class, which checks if the message body equals the text "Secrete message for clientId com.webspherenotes.secret"; if yes, it checks if the consumer's client id is com.webspherenotes.secret, and only then will it allow the consumer to consume the message. You can download the sample code for SampleMessagePolicy from here. First I had to create the SampleMessagePolicy class that implements the MessageAuthorizationPolicy interface. The isAllowedToConsume() method of your class gets called before a consumer consumes each message. Create this class in a separate Java project, compile that project and copy the .jar file into the activemq/lib directory

package com.webspherenotes.jms;

import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.security.MessageAuthorizationPolicy;

/**
 * Message-level authorization policy that hides one specific "secret" text message
 * from every consumer except the one whose client id is {@code com.webspherenotes.secret}.
 * All other messages are allowed through.
 */
public class SampleMessagePolicy implements MessageAuthorizationPolicy {

  /** Exact body text that marks a message as secret. */
  private static final String SECRET_MESSAGE_TEXT =
      "Secrete message for clientId com.webspherenotes.secret";

  /** The only client id that may consume the secret message. */
  private static final String SECRET_CLIENT_ID = "com.webspherenotes.secret";

  /**
   * Decides whether the consumer behind {@code messageContext} may receive {@code message}.
   *
   * @param messageContext connection context of the consumer; its client id is checked
   * @param message the message about to be dispatched
   * @return {@code false} only when the message is the secret text message and the
   *         consumer's client id is not the secret client id; {@code true} otherwise,
   *         including when the message text cannot be read
   */
  @Override
  public boolean isAllowedToConsume(ConnectionContext messageContext,
      Message message) {
    try {
      System.out
          .println("Inside SampleMessagePolicy.isAllowedToConsume() ");
      String clientId = messageContext.getClientId();
      System.out.println("Client Id " + clientId);
      if (message instanceof TextMessage) {
        String messageStr = ((TextMessage) message).getText();
        System.out.println("Text message is " + messageStr);

        // Constant-first equals: a text message with a null body must not
        // throw a NullPointerException inside the broker.
        if (!SECRET_MESSAGE_TEXT.equals(messageStr)) {
          System.out
              .println("Non secret message received, returning message");
          return true;
        }

        System.out
            .println("Secret message received check the clientId");
        // Constant-first equals: a null client id is simply "some other client".
        if (SECRET_CLIENT_ID.equals(clientId)) {
          System.out
              .println("Got request from com.webspherenotes.secret, for secret message returning message");
          return true;
        }
        System.out
            .println("Got request from some other client, for secret message returning, hidding message");
        return false;
      }

    } catch (JMSException e) {
      // NOTE(review): the message is still allowed when its text cannot be read
      // (fail-open); confirm this is acceptable for a security policy.
      e.printStackTrace();
    }

    return true;
  }

}
Next configure the ActiveMQ so that it will use SampleMessagePolicy as MessageAuthorizationPolicy like this

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans 
  http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core 
  http://activemq.apache.org/schema/core/activemq-core.xsd
  http://activemq.apache.org/camel/schema/spring 
  http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

  <!-- Allows us to use system properties as variables in this configuration file -->
  <bean
    class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />

  <broker xmlns="http://activemq.apache.org/schema/core"
    brokerName="localhost" dataDirectory="${activemq.base}/data">


        <messageAuthorizationPolicy>
            <bean class="com.webspherenotes.jms.SampleMessagePolicy"
                xmlns="http://www.springframework.org/schema/beans" />
        </messageAuthorizationPolicy>

    <!-- The transport connectors ActiveMQ will listen to -->
    <transportConnectors>
      <transportConnector name="openwire"
        uri="tcp://localhost:61616" />
    </transportConnectors>

  </broker>

</beans>
After configuring the ActiveMQ restart the server for your changes to take effect. Then create MessageConsumer.java like this, in this file set the ClientId to com.webspherenotes.secret so that it will receive the secrete message.

package com.webspherenotes.jms;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * Asynchronous queue consumer that connects with client id
 * {@code com.webspherenotes.secret} and prints every text message it receives
 * from the queue bound as {@code SampleQ}.
 */
public class MessageConsumer implements MessageListener{

  QueueConnection queueConnection;
  QueueSession queueSession;

  /**
   * Looks up the JMS objects, sets the client id, registers this object as the
   * listener and starts the connection. Lookup and JMS failures are printed, in which
   * case {@code queueConnection} may still be {@code null}.
   */
  public MessageConsumer() {
    try {
      InitialContext context = new InitialContext();
      QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) context
          .lookup("QueueCF");
      queueConnection = queueConnectionFactory.createQueueConnection("consumer","password");

      queueConnection.setClientID("com.webspherenotes.secret");
      queueSession = queueConnection.createQueueSession(false,
          Session.AUTO_ACKNOWLEDGE);
      Queue queue = (Queue) context.lookup("SampleQ");
      QueueReceiver queueReceiver = queueSession.createReceiver(queue);
      queueReceiver.setMessageListener(this);
      queueConnection.start();
    } catch (NamingException e) {
      e.printStackTrace();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }

  /**
   * Prints the text of a received message. Messages that are not text messages are
   * reported and skipped instead of failing with a ClassCastException.
   *
   * @param message the message delivered by the JMS provider
   */
  @Override
  public void onMessage(Message message) {
    if (!(message instanceof TextMessage)) {
      System.out.println("Inside MessageConsumer.onMessage ignoring non-text message "
          + message);
      return;
    }
    try {
      TextMessage textMessage = (TextMessage) message;
      System.out.println("Inside MessageConsumer.onMessage "
          + textMessage.getText());
    } catch (JMSException e) {
      e.printStackTrace(System.out);
    }
  }

  /**
   * Starts the consumer and keeps it running until the user presses enter.
   *
   * @param argv command line arguments (not used)
   */
  public static void main(String[] argv) {
    try {
      MessageConsumer messageConsumer = new MessageConsumer();
      try {
        BufferedReader stdin = new BufferedReader(new InputStreamReader(
            System.in));
        System.out.println("Press enter to quit application");
        stdin.readLine();
      } finally {
        // The constructor prints its own failures, so the connection may never have
        // been created; only close what actually exists.
        if (messageConsumer.queueConnection != null) {
          messageConsumer.queueConnection.close();
        }
      }

    } catch (Exception e) {
      e.printStackTrace();
    }
  }

}

Using ActiveMQConnectionFactory for creating connection factory

When developing a messaging application for ActiveMQ, say for standalone client, normally we create a jndi.properties file like this in the source folder.

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
java.naming.security.principal=system
java.naming.security.credentials=manager
connectionFactoryNames = QueueCF
queue.ActiveMQ = jms.ActiveMQ
After that we can look up both QueueCF and ActiveMQ from inside the code by looking them up in the InitialContext like this

// Create the default JNDI context (presumably configured by the jndi.properties shown above).
InitialContext context = new InitialContext();
  
// "QueueCF" is the name listed under connectionFactoryNames in jndi.properties.
QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");

// Open a connection and a non-transacted, auto-acknowledge session on it.
QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

// "ActiveMQ" is the JNDI name declared by the queue.ActiveMQ entry in jndi.properties.
Queue queue = (Queue)context.lookup("ActiveMQ");

Using the jndi.properties option makes your code JMS compliant, but you will have to define the queues and connection factories before hand, with ActiveMQ we have another option which is to use ActiveMQConnectionFactory like this, in this example first we create object of ActiveMQConnectionFactory by passing URL to the broker then we use session.createQueue("ActiveMQ") to create the Queue, It allows us to create Queues dynamically at run time.

package com.webspherenotes.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Sends one text message to the {@code ActiveMQ} queue, creating the connection factory
 * directly from the broker URL and the queue through the session.
 */
public class HelloActiveMQPublisher {
  /**
   * Connects to the broker at tcp://localhost:61616, sends a single {@link TextMessage}
   * and closes the connection.
   *
   * @param args command line arguments (not used)
   * @throws Exception if any JMS operation fails
   */
  public static void main(String[] args) throws Exception{

      String brokerURL = "tcp://localhost:61616";
      ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);

      Connection connection = connectionFactory.createConnection();
      try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("ActiveMQ");
        MessageProducer messageProducer = session.createProducer(queue);
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText("This is a message for dynamically create message q");
        messageProducer.send(textMessage);
      } finally {
        // Release the connection even when session creation or the send throws.
        connection.close();
      }
  }
}

package com.webspherenotes.jms;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Asynchronously consumes text messages from the queue named "ActiveMQ" and
 * prints their content until the user presses enter.
 */
public class HelloActiveMQConsumer implements MessageListener{

  /**
   * Connects to the broker at tcp://localhost:61616, registers an instance of
   * this class as listener on the "ActiveMQ" queue and waits for a line on
   * standard input before shutting down.
   *
   * @param args not used
   * @throws Exception if the JMS setup fails or standard input cannot be read
   */
  public static void main(String[] args) throws Exception{
    String brokerURL = "tcp://localhost:61616";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection connection = connectionFactory.createConnection();
    try {
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      Queue queue = session.createQueue("ActiveMQ");
      MessageConsumer messageConsumer = session.createConsumer(queue);
      messageConsumer.setMessageListener(new HelloActiveMQConsumer());
      // Start delivery only after the listener is registered, so the setup is
      // complete before the first message can arrive.
      connection.start();
      BufferedReader stdin = new BufferedReader(new InputStreamReader(
            System.in));
      System.out.println("Press enter to quit application");
      stdin.readLine();
    } finally {
      // Release the connection even if setup or reading from stdin failed.
      connection.close();
    }
  }

  /**
   * Prints the text of each received message.
   *
   * @param message the delivered message; anything that is not a
   *                {@link TextMessage} is reported and skipped
   */
  @Override
  public void onMessage(Message message) {
    if (!(message instanceof TextMessage)) {
      // Avoid a ClassCastException escaping from the listener.
      System.out.println("Ignoring message that is not a TextMessage: " + message);
      return;
    }
    try {
      TextMessage textMessage =(TextMessage)message;
      System.out.println("Thie message is " + textMessage.getText());
    } catch (JMSException e) {
      // Reading the message body failed; report it and keep listening.
      e.printStackTrace();
    }
  }
}

Request/reply messaging using TopicRequestor

In the Request/Reply messaging using QueueSender entry, I tried out the QueueRequestor approach for request/reply messaging. Similarly, I wanted to try out the TopicRequestor, so I built this sample code. First create a TopicRequestor and use it for sending the message; control blocks at the topicRequestor.request() method until it gets a response. Even if there is more than one consumer, the request() method returns on the first response, and the rest of the responses are ignored.

package com.webspherenotes.jms;

import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicRequestor;
import javax.jms.TopicSession;
import javax.naming.InitialContext;

/**
 * Request/reply client for a topic: publishes one text message through a
 * {@link TopicRequestor} and prints the reply it receives.
 */
public class SampleTopicRequestor {

  /**
   * Looks up the "TopicCF" connection factory and the "topic1" topic in JNDI,
   * sends a request and prints the text of the reply.
   *
   * @param args not used
   * @throws Exception if a JNDI lookup or any JMS call fails
   */
  public static void main(String[] args)throws Exception{
    InitialContext context = new InitialContext();
    
    TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory)context.lookup("TopicCF");
    
    TopicConnection topicConnection = topicConnectionFactory.createTopicConnection();
    try {
      TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
      
      Topic topic = (Topic)context.lookup("topic1");
      
      TextMessage textMessage = topicSession.createTextMessage();
      textMessage.setText("Hello how are you doing?");
      
      // The requestor does the publishing itself, so no separate
      // TopicPublisher is needed here.
      TopicRequestor topicRequestor = new TopicRequestor(topicSession, topic);
      topicConnection.start();
      
      // Blocks here until a reply arrives.
      TextMessage replyMessage =(TextMessage) topicRequestor.request(textMessage);
      System.out.println("Topic response " + replyMessage.getText());
    } finally {
      // Release the connection even when the request fails.
      topicConnection.close();
    }
  }

}

On the consumer side the value of the JMSReplyTo header would be a temporary topic, but I wanted to try out the generic Destination and MessageProducer interfaces for sending the reply message.

package com.webspherenotes.jms;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * Subscribes to the "topic1" topic and answers every text message by sending
 * a reply to the destination named in the message's JMSReplyTo header.
 */
public class SampleTopicConsumer implements MessageListener {
  
  /** Connection to the broker; stays null when setup in the constructor failed. */
  TopicConnection topicConnection;
  /** Session used both for receiving requests and for sending replies. */
  TopicSession topicSession;

  /**
   * Looks up the "TopicCF" connection factory and the "topic1" topic in JNDI,
   * registers this object as listener and starts the connection. Failures
   * are printed, not rethrown.
   */
  public SampleTopicConsumer() {
    try {
      InitialContext context = new InitialContext();

      TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) context
          .lookup("TopicCF");

       topicConnection = topicConnectionFactory
          .createTopicConnection();

      topicSession = topicConnection.createTopicSession(false,
          Session.AUTO_ACKNOWLEDGE);

      Topic topic = (Topic) context.lookup("topic1");
      
      TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);
      
      topicSubscriber.setMessageListener(this);
      topicConnection.start();
    } catch (NamingException e) {
      // JNDI lookup of the connection factory or topic failed.
      e.printStackTrace();
    } catch (JMSException e) {
      // Creating the connection, session or subscriber failed.
      e.printStackTrace();
    }
  }
  
  

  /**
   * Prints the received text and sends a reply to the JMSReplyTo destination.
   *
   * @param message the delivered message; messages that are not
   *                {@link TextMessage}s or that carry no JMSReplyTo header
   *                are reported and not answered
   */
  @Override
  public void onMessage(Message message) {
    if (!(message instanceof TextMessage)) {
      // Avoid a ClassCastException escaping from the listener.
      System.out.println("Ignoring message that is not a TextMessage: " + message);
      return;
    }
    MessageProducer messageProducer = null;
    try {
      TextMessage textMessage =(TextMessage)message;
      System.out.println("The message is " + textMessage.getText());

      Destination replyToDestination = textMessage.getJMSReplyTo();
      if (replyToDestination == null) {
        // Without a reply destination there is nowhere to send the answer.
        System.out.println("Message has no JMSReplyTo destination, no reply sent");
        return;
      }
      messageProducer = topicSession.createProducer(replyToDestination);
      
      TextMessage replyMessage = topicSession.createTextMessage();
      replyMessage.setText("Reply message to " + textMessage.getText());
      messageProducer.send(replyMessage);
      System.out.println(replyToDestination);

    } catch (JMSException e) {
      // Reading the request or sending the reply failed; keep listening.
      e.printStackTrace();
    } finally {
      // A producer is created per message, so it has to be released per
      // message as well.
      if (messageProducer != null) {
        try {
          messageProducer.close();
        } catch (JMSException e) {
          e.printStackTrace();
        }
      }
    }
    
  }



  /**
   * Starts the consumer and keeps it running until a line is read from
   * standard input.
   *
   * @param args not used
   * @throws Exception if reading standard input or closing the connection fails
   */
  public static void main(String[] args) throws Exception {
    SampleTopicConsumer sampleTopicConsumer = new SampleTopicConsumer();
    try {
      BufferedReader stdin = new BufferedReader(new InputStreamReader(
          System.in));
      System.out.println("Press enter to quit application");
      stdin.readLine();
    } finally {
      // The connection is null when the constructor could not create it.
      if (sampleTopicConsumer.topicConnection != null) {
        sampleTopicConsumer.topicConnection.close();
      }
    }
  }

}

Request/Reply messaging using QueueSender

I wanted to try out Request/Reply messaging, so I developed this simple MessagePublisher class, which sends a message to a Queue using a QueueRequestor and then blocks until the response comes back.

package com.webspherenotes.jms;

import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueRequestor;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;

/**
 * Request/reply client for a queue: sends one text message through a
 * {@link QueueRequestor} and prints the reply it receives.
 */
public class MessagePublisher {

  /**
   * Looks up the "QueueCF" connection factory and the "LoanRequestQ" queue in
   * JNDI, sends a request and prints the text of the reply. Any failure is
   * printed to standard output.
   *
   * @param argv not used
   */
  public static void main(String[] argv){
    try {
      InitialContext context = new InitialContext();
      
      QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");
      
      QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
      try {
        QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        
        Queue queue = (Queue)context.lookup("LoanRequestQ");
        queueConnection.start();
        
        // The requestor does the sending itself, so no separate QueueSender
        // is needed here.
        QueueRequestor queueRequestor = new QueueRequestor(queueSession, queue);
        
        TextMessage textMessage = queueSession.createTextMessage();
        textMessage.setText("This is sample message");
        System.out.println("Before calling queueSender.request()");

        // Blocks here until a reply arrives.
        TextMessage responseMessage = (TextMessage)queueRequestor.request(textMessage);
        System.out.println("After calling queueSender.request()");

        System.out.println("Response message is " + responseMessage.getText());
      } finally {
        // Release the connection even when the request fails.
        queueConnection.close();
      }
    } catch (Exception e) {
      // Sample code: report the failure on standard output and exit.
      e.printStackTrace(System.out);
    }
    
  }
}
This is how MessageConsumer.java looks. It implements the MessageListener interface to receive messages asynchronously. In the onMessage() method, after reading the message, it creates a QueueSender for the temporary queue named in the JMSReplyTo header and sends the response to that queue.

package com.webspherenotes.jms;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * Listens on the "LoanRequestQ" queue and answers every text message by
 * sending a reply to the queue named in the message's JMSReplyTo header.
 */
public class MessageConsumer implements MessageListener {
  /** Connection to the broker; stays null when setup in the constructor failed. */
  QueueConnection queueConnection;
  /** Session used both for receiving requests and for sending replies. */
  QueueSession queueSession;

  /**
   * Looks up the "QueueCF" connection factory and the "LoanRequestQ" queue in
   * JNDI, registers this object as listener and starts the connection.
   * Failures are printed, not rethrown.
   */
  public MessageConsumer() {
    try {
      InitialContext context = new InitialContext();
      QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) context
          .lookup("QueueCF");
      queueConnection = queueConnectionFactory.createQueueConnection();
      queueSession = queueConnection.createQueueSession(false,
          Session.AUTO_ACKNOWLEDGE);
      Queue queue = (Queue) context.lookup("LoanRequestQ");
      QueueReceiver queueReceiver = queueSession.createReceiver(queue);
      queueReceiver.setMessageListener(this);
      queueConnection.start();
    } catch (NamingException e) {
      // JNDI lookup of the connection factory or queue failed.
      e.printStackTrace();
    } catch (JMSException e) {
      // Creating the connection, session or receiver failed.
      e.printStackTrace();
    }
  }

  /**
   * Prints the received text and sends a reply to the JMSReplyTo queue.
   *
   * @param message the delivered message; messages that are not
   *                {@link TextMessage}s or whose JMSReplyTo header is not a
   *                {@link Queue} are reported and not answered
   */
  @Override
  public void onMessage(Message message) {
    if (!(message instanceof TextMessage)) {
      // Avoid a ClassCastException escaping from the listener.
      System.out.println("Ignoring message that is not a TextMessage: " + message);
      return;
    }
    QueueSender replyToSender = null;
    try {
      TextMessage textMessage = (TextMessage) message;
      System.out.println("Inside MessageConsumer.onMessage "
          + textMessage.getText());
      if (!(textMessage.getJMSReplyTo() instanceof Queue)) {
        // Covers both a missing header (null) and a non-queue destination.
        System.out.println("Message has no JMSReplyTo queue, no reply sent");
        return;
      }
      Queue replyTo = (Queue) textMessage.getJMSReplyTo();

      replyToSender = queueSession.createSender(replyTo);
      System.out.println("Value of JMSReplyTo "
          + textMessage.getJMSReplyTo());
      TextMessage replyMessage = queueSession.createTextMessage();
      System.out.println("After creating replyMessage");
      replyMessage.setText("This is reply for " + textMessage.getText());
      System.out.println("Before calling send()");
      replyToSender.send(replyMessage);
      System.out.println("After calling send()");

    } catch (JMSException e) {
      // Reading the request or sending the reply failed; keep listening.
      e.printStackTrace(System.out);
    } finally {
      // A sender is created per message, so it has to be released per
      // message as well.
      if (replyToSender != null) {
        try {
          replyToSender.close();
        } catch (JMSException e) {
          e.printStackTrace(System.out);
        }
      }
    }
  }

  /**
   * Starts the consumer and keeps it running until a line is read from
   * standard input. Any failure is printed.
   *
   * @param argv not used
   */
  public static void main(String[] argv) {
    try {
      MessageConsumer messageConsumer = new MessageConsumer();
      try {
        BufferedReader stdin = new BufferedReader(new InputStreamReader(
            System.in));
        System.out.println("Press enter to quit application");
        stdin.readLine();
      } finally {
        // The connection is null when the constructor could not create it.
        if (messageConsumer.queueConnection != null) {
          messageConsumer.queueConnection.close();
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

}