Publish Subscribe with vFabric RabbitMQ and Spring Integration using Spring AMQP

For people in a hurry: get the latest code and the steps to run the demo.

Introduction: Spring AMQP and vFabric RabbitMQ

vFabric RabbitMQ is a new offering from VMware that supports the AMQP standard. AMQP has been around for a long time. At a high level, AMQP is another standard for sending and receiving messages between applications, similar in purpose to JMS. The main difference is that AMQP is a wire-level protocol, whereas JMS is a Java API standard. There is a neutral blog comparing AMQP and JMS. As per this blog, AMQP is popular among Python, RoR and C++ developers. STOMP and OpenWire are other wire-level messaging protocols in the same space; they are alternatives to AMQP, not implementations of it. In the Spring world, we have the Spring AMQP abstraction, and its first implementation targets vFabric RabbitMQ. There is also a good introduction to Spring AMQP with vFabric RabbitMQ.

At a high level, RabbitMQ does not have a concept of Topics the way JMS does. Instead, it has Queues and Exchanges: publishers send messages to an Exchange, and the Exchange routes them to the Queues bound to it. Exchanges come in different types, such as Fanout and Topic. Refer to the routing topologies used by RabbitMQ. There is also a good article from VMware about how vFabric RabbitMQ is used for a very large messaging implementation, and another on understanding AMQP, the protocol used by RabbitMQ.

Run Publish Subscribe with Spring Integration

In this example, we have built two Spring Integration components. The Publisher component publishes messages to a RabbitMQ Exchange, which fans each message out to multiple Queues. The Subscriber component listens on a Queue and processes the messages. There are different ways to do Publish/Subscribe messaging in vFabric RabbitMQ; there is a good discussion of this on Stack Overflow. There is also a good article on how Publisher/Subscriber works in RabbitMQ.
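To make the fanout behaviour concrete, here is a toy in-memory model in plain Java. It is only a sketch of the routing idea: it does not use the RabbitMQ client or Spring AMQP, and the class `ToyFanoutExchange` and its `bind` and `publish` methods are hypothetical names invented for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/**
 * Toy in-memory model of a fanout exchange.
 * NOT the RabbitMQ or Spring AMQP API; names are illustrative only.
 */
class ToyFanoutExchange {
    // Queues currently bound to this exchange.
    private final List<Queue<String>> boundQueues = new ArrayList<>();

    /** Binds a queue so that it receives a copy of every published message. */
    public void bind(Queue<String> queue) {
        boundQueues.add(queue);
    }

    /** A fanout exchange ignores routing keys: every bound queue gets the message. */
    public void publish(String message) {
        for (Queue<String> queue : boundQueues) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        Queue<String> queue1 = new ArrayDeque<>();
        Queue<String> queue2 = new ArrayDeque<>();

        ToyFanoutExchange exchange = new ToyFanoutExchange();
        exchange.bind(queue1);
        exchange.bind(queue2);

        // One publish results in one copy per bound queue.
        exchange.publish("<order id=\"1\"/>");

        System.out.println("queue1=" + queue1.size() + " queue2=" + queue2.size());
        // prints "queue1=1 queue2=1"
    }
}
```

The point of the model is that the publisher only knows about the exchange. Which queues exist, and how many subscribers read from them, is decided by the bindings.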

In the example, we have used Spring Integration configuration to define the publisher as below:

<!-- A reference to the org.springframework.amqp.rabbit.connection.ConnectionFactory -->
<rabbit:connection-factory id="connectionFactory"/>

<!-- Creates a org.springframework.amqp.rabbit.core.RabbitTemplate for access to the broker -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>

<!-- Creates a org.springframework.amqp.rabbit.core.RabbitAdmin to manage exchanges, queues and bindings -->
<rabbit:admin connection-factory="connectionFactory"/>

<!-- Queues -->
<rabbit:queue id="queue1" name="queue.1.name"/>
<rabbit:queue id="queue2" name="queue.2.name"/>

<!-- Exchanges -->
<rabbit:fanout-exchange name="fanout.exchange.1.name">
    <rabbit:bindings>
        <rabbit:binding queue="queue1"/>
        <rabbit:binding queue="queue2"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<int:publish-subscribe-channel id="fanoutChannel" />

<int-amqp:outbound-channel-adapter channel="fanoutChannel"
    amqp-template="amqpTemplate" exchange-name="fanout.exchange.1.name"/>

In our demo, start vFabric RabbitMQ and run the test as below. The publisher will put the message in both queues.


mvn -Dtest=com.goSmarter.amqp.PublisherSubscriberTest test

If you open the vFabric RabbitMQ management console, it will display that there is a message in the queue.

The Subscriber Spring Integration configuration looks as below:

<!-- A reference to the org.springframework.amqp.rabbit.connection.ConnectionFactory -->
<rabbit:connection-factory id="connectionFactory" />

<!-- Creates a org.springframework.amqp.rabbit.core.RabbitTemplate for access to the broker -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />

<!-- Creates a org.springframework.amqp.rabbit.core.RabbitAdmin to manage exchanges, queues and bindings -->
<rabbit:admin connection-factory="connectionFactory" />

<int:channel id="inbound-channel" />

<int-amqp:inbound-channel-adapter
    channel="inbound-channel" queue-names="queue.1.name"
    connection-factory="connectionFactory" />

<int:service-activator input-channel="inbound-channel"
    id="serviceActivator" ref="serviceActivatorBean" method="logXml" />

<bean id="serviceActivatorBean"
    class="com.goSmarter.amqp.service.Subscriber1ServiceActivator">
</bean>
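The configuration above refers to a `logXml` method on `Subscriber1ServiceActivator`, whose body is not shown here. As a rough sketch only, assuming the message payload arrives as a `String` containing XML, such a class could be a plain Java object like the one below. The method body, the parameter type and the "Received message" prefix are assumptions made for illustration, and the `com.goSmarter.amqp.service` package declaration is left out so the snippet compiles on its own.

```java
/**
 * Hypothetical sketch of the service activator bean referenced in the
 * configuration. In the real project it would live in the package
 * com.goSmarter.amqp.service; the body below is an assumption.
 */
class Subscriber1ServiceActivator {

    /**
     * Invoked for each message arriving on the inbound channel.
     * Assumes the payload is a String of XML and prints it to the console.
     */
    public void logXml(String xml) {
        System.out.println("Received message: " + xml);
    }

    // Small stand-alone check that does not need a broker or Spring.
    public static void main(String[] args) {
        new Subscriber1ServiceActivator().logXml("<order id=\"1\"/>");
    }
}
```

Because the method returns `void`, nothing is sent onwards after the message is logged, which matches a subscriber that only prints what it receives.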

Now, when you run the Subscriber Tomcat instance within STS using “Run on Server”, it will pick up the message from the Queue and print it to the console.

Conclusion

In this blog, I have demonstrated a simple fanout-based Publisher/Subscriber, in which a Publisher publishes a message and a Subscriber receives it and prints it. This should give you a decent starting prototype for Spring AMQP. There are also a few Spring AMQP samples, including a Stocks example.

I hope this blog helped you. Please leave me feedback.
