For people in a hurry, refer to the following steps to run the demo.
Introduction
Combining vFabric RabbitMQ and Spring Integration, we can create loosely coupled, enterprise-class, message-based workflows using Spring AMQP. Refer to the AMQP Backed Message Channels section of the Spring Integration documentation.
In this demo, we will design a Spring Integration workflow in which a message is published to p2p-pollable-channel. A bridge forwards it to pub-sub-channel, a publish-subscribe channel, which passes it to 2 different service activators for further processing. I will also demonstrate that both of these message channels listen to external applications on their own: when an external application publishes a message to the underlying queue or exchange, processing continues from that point in the workflow.
A typical use case for this in enterprise IT is building robust workflows: if a message is not processed at a particular step, we can massage the data and resume processing from that step. Since AMQP is a wire-level protocol, the external application can be written in any language or platform that has an AMQP client; once it publishes a message to the vFabric RabbitMQ queue, the Spring Integration flow is triggered.
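To make the shape of the flow concrete before looking at the real configuration, here is a minimal in-memory sketch in plain Java. It is not Spring Integration or RabbitMQ code: the class `FlowModel` and its methods are hypothetical names invented for illustration, and a `BlockingQueue` merely stands in for the broker-backed queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Simplified in-memory stand-in for the flow; names are hypothetical.
class FlowModel {
    // Plays the role of p2p-pollable-channel: messages wait here until polled.
    final BlockingQueue<String> p2pChannel = new LinkedBlockingQueue<>();
    // Plays the role of pub-sub-channel: every subscriber sees every message.
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Entry point for anything that publishes straight to the pub-sub stage,
    // for example an external application joining the workflow midway.
    void publish(String payload) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(payload);
        }
    }

    // Plays the role of the bridge: drain the queue and fan each message out.
    void pollOnce() {
        String payload;
        while ((payload = p2pChannel.poll()) != null) {
            publish(payload);
        }
    }
}
```

In this model, adding a payload to `p2pChannel` and calling `pollOnce()` corresponds to the normal path, while calling `publish(...)` directly corresponds to an external application entering the workflow at the publish-subscribe stage. Either way, both subscribers receive the payload.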
AMQP Backed Message Channel in Spring Integration flow
As always on my blog, following TDD, I will write the test first, as below,
public class PublisherSubscriberTest {
    // "channel" (the message channel under test) and the streamToString helper
    // are declared elsewhere in the test class and are omitted from this snippet.
    @Test
    public void testIntegration() throws IOException {
        // Load the sample payload from the test classpath.
        String request = streamToString(getClass().getResourceAsStream(
                "/data/payload.xml"));
        Message<String> message = MessageBuilder.withPayload(request)
                .build();
        channel.send(message);
        // assert various messages
    }
}
The Spring Integration configuration for the flow looks as below,
<int:bridge input-channel="p2p-pollable-channel" output-channel="pub-sub-channel" />
<int:service-activator input-channel="pub-sub-channel" id="serviceActivator1" ref="serviceActivator1Bean" method="logXml" />
<int:service-activator input-channel="pub-sub-channel" id="serviceActivator2" ref="serviceActivator2Bean" method="logXml" />
<bean id="serviceActivator1Bean" class="com.goSmarter.amqp.service.Subscriber1ServiceActivator" />
<bean id="serviceActivator2Bean" class="com.goSmarter.amqp.service.Subscriber2ServiceActivator" />
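The service activator beans referenced above are plain Java objects. The following is only a sketch of what such a class might look like: the class and method names come from the configuration, but the `String` parameter, the `void` return type, and the `received` list (added so the behaviour can be checked) are assumptions, not the original implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for com.goSmarter.amqp.service.Subscriber1ServiceActivator.
class Subscriber1ServiceActivator {
    // Assumption: keep what was received so a test can inspect it.
    private final List<String> received = new ArrayList<>();

    // The method name matches method="logXml" in the configuration.
    // Taking the payload as a String and returning nothing is an assumption.
    public void logXml(String xml) {
        received.add(xml);
        System.out.println("Subscriber 1 received: " + xml);
    }

    public List<String> getReceived() {
        return received;
    }
}
```

Because the method returns nothing, a service activator written this way would end the flow at that point rather than produce a reply message.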
The test channel definition looks as below,
<int:poller default="true" fixed-rate="1000" />
<int:channel id="p2p-pollable-channel" />
<int:publish-subscribe-channel id="pub-sub-channel" />
The actual channel definition, used as part of web.xml, is as below,
<!-- A reference to the org.springframework.amqp.rabbit.connection.ConnectionFactory -->
<rabbit:connection-factory id="connectionFactory" />
<!-- Creates a org.springframework.amqp.rabbit.core.RabbitAdmin to manage exchanges, queues and bindings -->
<rabbit:admin connection-factory="connectionFactory" />
<int-amqp:channel id="p2p-pollable-channel" connection-factory="connectionFactory" />
<int-amqp:publish-subscribe-channel id="pub-sub-channel" connection-factory="connectionFactory" />
When you run this application in STS with “Run on Server” for the first time, you will notice in the vFabric RabbitMQ admin console that there is one queue, p2p-pollable-channel, and one exchange, si.fanout.pub-sub-channel. Note that the Spring Integration framework prepended “si.fanout.” to pub-sub-channel. To test that the flow works end to end, you can post a message to either the queue or the exchange, and the workflow will continue from that point onwards.
If you are using AMQP-backed channels, you can replay a payload on any channel to continue the workflow from there; this is useful for error handling and payload retry.
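The replay idea can be sketched in plain Java as follows. This is an in-memory model only, not Spring or RabbitMQ code: `ReplayModel`, its fields, and the rule that a valid payload starts with `<` are all hypothetical, chosen just to show a failed payload being repaired and put back on the same queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.UnaryOperator;

// In-memory model of replaying a payload into a queue-backed step; names are hypothetical.
class ReplayModel {
    final Deque<String> stepQueue = new ArrayDeque<>();  // stands in for an AMQP-backed channel
    final List<String> failed = new ArrayList<>();       // payloads the step rejected
    final List<String> processed = new ArrayList<>();    // payloads the step accepted

    // Process everything waiting on the queue.
    // Hypothetical rule: the step only accepts payloads that look like XML.
    void drain() {
        String payload;
        while ((payload = stepQueue.poll()) != null) {
            if (payload.startsWith("<")) {
                processed.add(payload);
            } else {
                failed.add(payload);
            }
        }
    }

    // Repair each failed payload and put it back on the same queue, so the
    // workflow resumes at this step instead of restarting from the beginning.
    void replayFailed(UnaryOperator<String> repair) {
        List<String> toReplay = new ArrayList<>(failed);
        failed.clear();
        for (String payload : toReplay) {
            stepQueue.add(repair.apply(payload));
        }
        drain();
    }
}
```

With a real broker, the repaired payload would be published to the queue or exchange behind the channel rather than added to a local `Deque`, but the principle is the same: only the failing step is re-entered.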
Conclusion
In this sample, we have demonstrated that with a single component backed by AMQP, we can build a loosely coupled workflow. Depending on the context, there are 2 different ways of building a publish-subscribe workflow: one is as described above, and the other is to have one publisher component with several subscriber components.
I hope this blog helped you.
