The ActiveMQ broker allows message level security, that means you can ask ActiveMQ to call your business logic before consuming every message, and as a result get control on which message can be consumed by which
MessageListener
. I wanted to try that, so i created this
SimpleMessagePolicy
.java class which checks if the message body contains Secrete message for
clientId
com.webspherenotes.secret
text, if yes it checks if the consumers's caller Id is
com.webspherenotes.secret
if yes then only it will allow the consumer to consume message. You can download the sample code for
SampleMessagePolicy
from
here
First i had to create
SampleMessagePolicy
class that implements
MessageAuthorizationPolicy
interface.The
isAllowedToConsume()
method of your class gets before a consumer is trying to consume every message. Create this class in separate java project, compile that project and copy the .jar file in the activemq/lib directory
package com.webspherenotes.jms;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.security.MessageAuthorizationPolicy;
public class SampleMessagePolicy implements MessageAuthorizationPolicy {
@Override
public boolean isAllowedToConsume(ConnectionContext messageContext,
Message message) {
try {
System.out
.println("Inside SampleMessagePolicy.isAllowedToConsume() ");
System.out.println("Client Id " + messageContext.getClientId());
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Text message is " + textMessage.getText());
String messageStr = textMessage.getText();
if (messageStr
.equals("Secrete message for clientId com.webspherenotes.secret")) {
System.out
.println("Secret message received check the clientId");
if (messageContext.getClientId().equals(
"com.webspherenotes.secret")) {
System.out
.println("Got request from com.webspherenotes.secret, for secret message returning message");
return true;
} else {
System.out
.println("Got request from some other client, for secret message returning, hidding message");
return false;
}
} else {
System.out
.println("Non secret message received, returning message");
return true;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
return true;
}
}
Next configure the ActiveMQ so that it will use
SampleMessagePolicy
as
MessageAuthorizationPolicy
like this
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
http://activemq.apache.org/camel/schema/spring
http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost" dataDirectory="${activemq.base}/data">
<messageAuthorizationPolicy>
<bean class="com.webspherenotes.jms.SampleMessagePolicy"
xmlns="http://www.springframework.org/schema/beans" />
</messageAuthorizationPolicy>
<!-- The transport connectors ActiveMQ will listen to -->
<transportConnectors>
<transportConnector name="openwire"
uri="tcp://localhost:61616" />
</transportConnectors>
</broker>
</beans>
After configuring the ActiveMQ restart the server for your changes to take effect. Then create
MessageConsumer.java
like this, in this file set the
ClientId
to
com.webspherenotes.secret
so that it will receive the secrete message.
package com.webspherenotes.jms;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class MessageConsumer implements MessageListener{
QueueConnection queueConnection;
QueueSession queueSession;
public MessageConsumer() {
try {
InitialContext context = new InitialContext();
QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) context
.lookup("QueueCF");
queueConnection = queueConnectionFactory.createQueueConnection("consumer","password");
queueConnection.setClientID("com.webspherenotes.secret");
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue) context.lookup("SampleQ");
QueueReceiver queueReceiver = queueSession.createReceiver(queue);
queueReceiver.setMessageListener(this);
queueConnection.start();
} catch (NamingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
}
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("Inside MessageConsumer.onMessage "
+ textMessage.getText());
} catch (JMSException e) {
e.printStackTrace(System.out);
}
}
public static void main(String[] argv) {
try {
MessageConsumer messageConsumer = new MessageConsumer();
BufferedReader stdin = new BufferedReader(new InputStreamReader(
System.in));
System.out.println("Press enter to quit application");
stdin.readLine();
messageConsumer.queueConnection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
No comments:
Post a Comment