Tag Archives: Spring Integration

AMQP Backed Spring Integration using vFabric RabbitMQ

For people in hurry, refer the following steps to run the Demo.


Combining vFabric RabbitMQ and Spring Integration we can create loosely coupled Enterprise class message based workflow using Spring AMQP. Refer Spring Integration, AMQP Backed Message Channels documentation.

In this demo, we will design a Spring Integration workflow, where in a message is published on to a p2p-pollable-channel and a publish-subscribe-channel is listening to it, it will pickup that message and pass it to 2 different service activators to further process it. I will also demonstrate that, both these message channels are on their own, listening to a external applications and when an external applications publish a message to these queues they will process from there on in the workflow. A typical use case of this is, in an enterprise IT, we can create robust workflows, where in if a message is not processed in a particular step, we can massage the data and process it from there on. Since Spring AMQP is a wire protocol the external application can be use any network protocol to publish the message to the vFabric RabbitMQ queue and the Spring Integration flow is triggered.

AMQP Backed Message Channel in Spring Integration flow

AMQP based message channel within Spring Integration

As always in my blog, as per TDD, I will be writing the test first as below,

public class PublisherSubscriberTest {

public void testIntegration() {
try {
String request = streamToString(getClass().getResourceAsStream(
Message<String> message = MessageBuilder.withPayload(request)
//assert various messages
} catch (IOException e) {
// TODO Auto-generated catch block

The Spring Integration Configuration flow looks as below,

<int:bridge input-channel="p2p-pollable-channel"
output-channel="pub-sub-channel" />

<int:service-activator input-channel="pub-sub-channel"
id="serviceActivator1" ref="serviceActivator1Bean" method="logXml" />

<int:service-activator input-channel="pub-sub-channel"
id="serviceActivator2" ref="serviceActivator2Bean" method="logXml" />

<bean id="serviceActivator1Bean"

<bean id="serviceActivator2Bean"

Test channel definition looks as below,

<int:poller default="true" fixed-rate="1000" />
<int:channel id="p2p-pollable-channel" />
<int:publish-subscribe-channel id="pub-sub-channel" />

Actual channel definition used as a part of web.xml is as below,

<!-- A reference to the org.springframework.amqp.rabbit.connection.ConnectionFactory -->
<rabbit:connection-factory id="connectionFactory" />
<!-- Creates a org.springframework.amqp.rabbit.core.RabbitAdmin to manage
exchanges, queues and bindings -->
<rabbit:admin connection-factory="connectionFactory" />

<int-amqp:channel id="p2p-pollable-channel" connection-factory="connectionFactory" />
<int-amqp:publish-subscribe-channel id="pub-sub-channel" connection-factory="connectionFactory" />

When you run this application in STS as “Run on Server” for the 1st time, you will notice in the vFabric RabbitMQ admin console that it will have one Queue, p2p-pollable-channel and Exchange called si.fanout.pub-sub-channel. If you notice the Spring Integration framework appended “si.fanout” in front of pub-sub-channel. If you want to test if end to end is working, you can post a message on either the Queue or Exchange, it will continue the workflow from that point onwards.

If you are using AMQP backed Channels, you can reply a payload from any channels to continue the workflow, this will be useful in error handling and Payload retry.


In this sample, we have demonstrated that with a single component backed by AMQP, we can build a loosely coupled workflow. Based on the context, there can be 2 different ways of building publish subscriber workflow, one is as described above and another one is one publisher component will have several subscriber components.

I hope this blog helped you.

Publish Subscribe with vFabric RabbitMQ and Spring Integration using Spring AMQP

For people in hurry, get the latest code and steps to run the demo.

Introduction: Spring AMQP and vFabric RabbitMQ

vFabric RabbitMQ is a new offering from VMware that support AMQP standard. AMQP has been around for a long time, at a high level AMQP is yet another standard API to send and recieve messages between applications similar to JMS. The main difference being, AMQP is a wire-level protocal, JMS is a JAVA standard. There is a neutral blog comparing AMQP and JMS. As per this blog, AMQP is a standard that is popular among Python, RoR, C++ developers. Stomp and OpenWire are some of the implementation of AMQP. In the Spring world, we have Spring AMQP abstraction and vFabric RabbitMQ has the first implementation of Spring AMQP. This is a  good introduction of Spring AMQP with with vFabric RabbitMQ.

At a high level, RabbitMQ does not have a concept of Topics the way it is in JMS. It has a concept of Queues and Exchanges. The Exchanges can be of different types like Fanout, Topic. Refer to routing topologies used by RabbitMQ. There is also a good article in VMware about how vFabric RabbitMQ is used for a very large messaging implementation and Understanding AMQP, the protocol used by RabbitMQ.

Run Publish Subscribe with Spring Integration

In this example, we have built 2 Spring Integration component, Publisher component, that publishers messages to an RebbitMQ Exchange. The message is Fanout to multiple Queues. The Subscriber Spring Integration is listening to the Queue and process the messages. There are different ways of Publish/Subscribe messages in vFabric RabbitMQ, there is a good article on this in stackoverflow. There is also a good article on how Publisher/ Subscriber works in RabbitMQ.

In the example we have used Spring Integration configuration to define a subscriber as below,

<!-- A reference to the org.springframework.amqp.rabbit.connection.ConnectionFactory -->
<rabbit:connection-factory id="connectionFactory"/>

<!-- Creates a org.springframework.amqp.rabbit.core.RabbitTemplate for access to the broker -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
<!-- Creates a org.springframework.amqp.rabbit.core.RabbitAdmin    to manage exchanges, queues and bindings -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- Queues -->
<rabbit:queue id="queue1" name="queue.1.name"/>
<rabbit:queue id="queue2" name="queue.2.name"/>

<!-- Exchanges -->
<rabbit:fanout-exchange name="fanout.exchange.1.name">
<rabbit:binding queue="queue1"/>
<rabbit:binding queue="queue2"/>

<int:publish-subscribe-channel id="fanoutChannel" />

<int-amqp:outbound-channel-adapter channel="fanoutChannel"
amqp-template="amqpTemplate" exchange-name="fanout.exchange.1.name"/>

In our demo, you can start the vFabric RabbitMQ and run the test as below, the publisher will put the message in the 2 queues.

mvn -Dtest=com.goSmarter.amqp.PublisherSubscriberTest test

If you open the vFabric RabbitMQ management console, it will display that there is a message in the queue.

Subscriber Spring Integration configuration looks as below,

<!-- A reference to the org.springframework.amqp.rabbit.connection.ConnectionFactory -->
<rabbit:connection-factory id="connectionFactory" />

<!-- Creates a org.springframework.amqp.rabbit.core.RabbitTemplate for access   to the broker -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />

<!-- Creates a org.springframework.amqp.rabbit.core.RabbitAdmin to manage   exchanges, queues and bindings -->
<rabbit:admin connection-factory="connectionFactory" />
<int:channel id="inbound-channel" />

channel="inbound-channel" queue-names="queue.1.name"
connection-factory="connectionFactory" />

<int:service-activator input-channel="inbound-channel"
id="serviceActivator" ref="serviceActivatorBean" method="logXml" />

<bean id="serviceActivatorBean"

Now when you run the Subscriber Tomcat Instance within STS as “Run on Server” it will pick the message from the Queue and print it to the console.


In this blog, I have demonstrated a simple fanout based Publisher /Subscriber, where in a Publisher will publish a message and Subscriber will subscriber and print it. This will jumpstart you with a decent prototype on Spring AMQP. There are also few Spring AMQP samples on Stocks and other stuff.

I hope this blog helped you. Please leave me feedback.

Publisher, Subscriber using vFabric Spring Integration, Gemfire

As part of vFabric you get SpringSource Tool Suite (STS), vFabric tc Server, Spring AMQP to incorporate vFabric RabbitMQ messaging, and Spring Data project that simplifies access to vFabric GemFire, SQLFire and Postgres.

In the next few blogs, I will be discussing how each pieces tie in together. In this blog I will be demonstrating how Spring Integration and vFabric Gemfire participate in a typical Publisher/ Subscriber EIP pattern, where in data is picked from source system and delivered to 1 or many target systems.

For people who are in hurry, here is the code and the steps to execute.

Details: Publisher, Subscriber using vFabric Spring Integration, Gemfire

This demo is the continuation of my earlier blog on Claimcheck pattern. I will implement an end to end flow as below

Publisher Subscriber ESB using Spring Integration, Gemfire and ActiveMQ

Publisher Subscriber ESB using Spring Integration, Gemfire and ActiveMQ

For Demo purpose, on the publisher side, the message that comes in from claimcheck output is directly bridged to the jms outbound channel adapter. But in an enterprise application, in reality there can be a spring integration flow which can do an entire workflow/ transformation/ mapping, before it puts the message onto the Topic.

Again for demo purpose, on the subscriber side, I just directly took the payload output from claimcheck and passed to a console based service activator. But in an enterprise application, it can be sending this to a target system.

The technology stack I used for this is, hsqldb database, ActiveMQ JMS along with vFabric TC Server and vFabric Gemfire.

On the publisher side, the JDBC listener is configured in the Spring Integration as below,

query="select * from notification where status=2" channel="jdbc.inbound"
data-source="dataSource" update="update notification set status=10 where id in (:id)">
<int:poller fixed-rate="1000" />

and you bridge this to the claimcheck channel as below,

<!-- for convenience sake we are bridging the jdbc to directly claimcheck in reality we will be going thru a SI flow
<int:bridge input-channel="jdbc.inbound"
output-channel="common.claimcheck.in.inputChannel" />

<!-- Bridge the claimcheck output directly to JMS -->
<int:bridge input-channel="common.claimcheck.in.outputChannel"
output-channel="jms-channel.out" />

On the subscriber side, the payload is pushed to the vFabric Gemfire and a GUID is created on the vFabric Gemfire side and it is pushed to the Topic. The Subscriber component picks up the payload and prints to the console. This blog explains this more in detail.


vFabric infrastructure provides a good support for developing Integration components by the way of STS IDE, vFabric TC Server, vFabric Gemfire. I have clearly demonstrated how you can build a Publisher/subscriber components which can integrate a source system to target system.

I hope this blog helped you.

Claimcheck Pattern using Spring Integration and Gemfire


For people who are in hurry, here is the code and the steps to execute. Please also refer to my blog on Publisher/Subscriber using Spring Integration and Gemfire as the ESB.

In a Enterprise Service Bus (ESB) architecture, the “Service Bus” will become bottle neck over a period of time. In a typical ESB you write to the Bus and lot of subscribers read from the bus. Gemfire can help ESB to achieve Distributed caching, by virtue of being powerful inmemory distributed caching backbone. Claimcheck pattern is an Enterprise Integration Pattern where in a Publisher component can publish a large payload into Gemfire cache, a GUID key is generated for this payload and placed on the Topic. A subscriber in the Claimcheck pattern will be listening to the Topic for this key and get the payload from Gemfire. The Dataflow diagram is as follows,

Spring Integration Gemfire Claimcheck Pattern

Spring Integration Gemfire Claimcheck Pattern

Claimcheck Pattern using Spring Integration and Gemfire

There are lot of good example from David Turanski in Github to learn Spring Integration with Gemfire. In this prototype, I built a simple replicated region based server based on the idea from basic\replicated-cs example from David’s examples. In order to run this prototype, download and install Activemq and start ActiveMq and create a Topic called “MyTopic”. After that, run the Gemfire Replicated Server as below.

cd gemfire-server
mvn package
mvn exec:java -Dexec.mainClass="org.springframework.data.gemfire.examples.Server"

Now Maven import the claimcheck-subscriber project into STS IDE and start the tomcat instance by clicking “Run on Server” this project. The subscriber will start listening to the messages/GUID in the topic “MyTopic”

Finally publish the XML payload on to the Gemfire cache by executing below command,

cd claimcheck-publisher
mvn test

In the Tomcat server console you will see below message

[Headers={timestamp=1348935448580, id=77480754-a4d8-4bc4-a529-26c9e26a7724, jms_timestamp=1348935447237, jms_redelivered=false, jms_messageId=ID:krishna-PC-62975-1348935446968-1:1:1:1:1}]
29 Sep 2012 12:17:28,702[topicContainer-1] DEBUG: com.goSmarter.gemfire.claimcheckpattern.ServiceActivator - ### PAYLOAD ###<!--?xml version="1.0" encoding="UTF-8"?-->



In this demo, I have demonstrated the claimcheck pattern. In an enterprise ecosystem, this is a basic component of claimcheck pattern. The enhancements on top of this includes various other things including,

  • each subscribers doing partial checkout of payloads from Gemfire.
  • it also need to have a way to work in a cluster environment.
  • Implementing distribution caching

I hope this example helped you.

Spring Integration JMS Mbean configuration

Spring Integration JMS message-driven-channel-adapter has a good MBean support through  DefaultMessageListenerContainer where we can control the flow of messages to the subscribers. In this blog I will demonstrate some of these capabilities. For people in hurry, here is the code @ Github,

  • Download activemq and unzip it
  • run activemq.bat under <activemq-home>/bin folder
  • Go to browser and type http://localhost:8161/admin/queues.jsp, create a new queue “MyQueue”
  • Get the latest code from Github
  • build the war file using “mvn clean package”
  • Deploy the war file in Spring tomcat container
  • “Send to” message to MyQueue in admin console of activemq in the browser
  • You notice the message gets consumed
  • Open jconsole.exe under jdk bin directory
  • Login in to Remote process “localhost:6969”, uid: admin, password: “springsource”
  • Go to MBean tab and expand spring.application/JMSContainer/testQueueContainer/Operation, click on stop operation
  • Again “Send to” message to MyQueue in admin console of activemq in the browser, if you notice, the message is not consumed
  • Go to MBean tab and expand spring.application/JMSContainer/testQueueContainer/Operation, click on start operation
  • Again “Send to” message to MyQueue in admin console of activemq in the browser, if you notice, the message got consumed

Details: Spring Integration JMS Mbean configuration

In a typical enterprise IT production environment, you need to control the message processing by the queue subscribers. Typical scenario would be, if there is a an issue with the subscriber and we want to pause the subscriber to troubleshoot it, we need a mechanism for this as below.

Expose Lifecycle interface of DefaultMessageListenerContainer

<bean id="queue.exporter" class="org.springframework.jmx.export.MBeanExporter">
<property name="beans">
<entry key="spring.application:type=JMSContainer,name=testQueueContainer" value-ref="queueContainer" />
<property name="assembler">
<bean class="org.springframework.jmx.export.assembler.InterfaceBasedMBeanInfoAssembler">
<property name="managedInterfaces">

InterfaceBasedMBeanInfoAssembler class expose an interface of a particular class as a Mbean, in the above example it exposes Lifecycle

Managing JMS subscriber in JConsole

Managing JMS subscriber using JConsole

Managing JMS subscriber using JConsole

When you click on start operation, and click on start button, it will start the subscriber to listen to the message and when you click on stop, it will pause the subscriber.


MBean is a powerful way to expose some of the capabilities of Spring Integration’s various Channel Adapter, so that you can control them from various tools which are MBean aware, like JConsole, Hyperic . We can also expose, Log4J Mbean so that we can change the log level in the production system. More on this in my subsequent blog.

Measuring flow channel JUnit test coverage in Spring Integration Flow

Test Coverage

In any enterprise integration project there has to be a Business Workflow that needs to be developed and should have proper test coverage. In one  of our largest Spring Integration project, we had to measure how well the Business workflow has test coverage. Spring Integration (SI) is a lightweight framework that has good unit testing support.

Gary Russell one of the leading Spring Integration Architects helped us in designing this framework. The framework does the following,

  • The framework has a class, you need to inherit this class in your SI to get JUnit Test Coverage test with this class. It will wire tap all the channels and count the total channels tested in the SI flow and calculates how many channels the tests are covering and will derive the coverage ratio.
  • We also implemented a Maven plugin, where in we can set the threshold for the build to succeed based on the coverage ratio
  • This framework also consists of the sonar plugin for channel coverage  (contributed by Gary to Github) for Sonar software quality tool. Once you get the plugin, you need to build the plugin as “mvn package”. Take the jar file from the target and copy it into <Sonar Home>/extensions/plugins.
  • Start Sonar
  • In your SI component, if you do “mvn sonar:sonar” it will, push the quality metrics information into Sonar

The report in Sonar looks as below,

Spring Integration Channel Coverage

Spring Integration Channel Coverage

The codebase for the utility class AbstractChannelCoverageTests is checked in @ github . You can see the test class @ src/test/java/org/springframework/integration/test/coverage/ChannelCoverageTests.java. If you notice, in your SI testclass you need to extend this class as follows,

public class ChannelCoverageTests extends AbstractChannelCoverageTests {

void testChannel(){
//testing and asserting the flow

It will load a SI config file @ src/test/resources/org/springframework/integration/test/coverage/ChannelCoverageTests-context.xml and load all the channels and will start intercepting for any payload flow and keep count of it.

As per the above diagram, it will indicate how many channels are missed and it helps in making sure you cover those channels as well.

JUnit testing with Spring Integration and Spring WS

Spring Integration, Spring WS for Webservice is a decent framework to design and implement Webservice. More than that it has a good JUnit testing support. In this section I will be talking about how we do implement a Webservice using Test Driven Development (TDD). The working sample is here in the Github .

As a first step we need to define the XSD schema:

<?xml version="1.0" encoding="UTF-8"?> <xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:data="http://mycompany.com/it/enterprise/data/v1" xmlns:msg="http://mycompany.com/it/enterprise/msg/v1" xmlns:svc="http://mycompany.com/it/enterprise/contract/TestService/v1" targetNamespace="http://mycompany.com/it/enterprise/contract/TestService/v1" elementFormDefault="qualified" attributeFormDefault="unqualified" version="1.0">  <xsd:import namespace="http : //mycompany.com/it/enterprise/data/v1" schemaLocation="TestTransactions_v1.xsd"/>
<xsd:element name="TestServiceRequest" type="data:TestServiceRequestType"/>
<xsd:element name="TestServiceResponse" type="data:TestServiceResponseType"/>

<?xml version="1.0" encoding="UTF-8"?><xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:data="http://mycompany.com/it/enterprise/data/v1" xmlns:msg="http://mycompany.com/it/enterprise/msg/v1" targetNamespace="http://mycompany.com/it/enterprise/data/v1" elementFormDefault="qualified" attributeFormDefault="unqualified">
<xsd:complexType name="TestServiceRequestType">
<xsd:element name="Document">
<xsd:element name="Id" type="xsd:string" minOccurs="0"/>
<xsd:element name="Type" type="xsd:string" minOccurs="0"/>
<xsd:complexType name="TestServiceResponseType">
<xsd:element name="Document">
<xsd:element name="Result" type="xsd:string" minOccurs="0"/>

Writing the JUnit test as below,

public class TestIntegrationEndPointTest {

private ApplicationContext applicationContext;

private MockWebServiceClient mockClient;

public void createClient() {
mockClient = MockWebServiceClient.createClient(applicationContext);

public void testWsEndPointTest() throws Exception {
Source requestPayload = new StringSource("<?xml version=\"1.0\" encoding=\"UTF-8\"?><v1:TestServiceRequest xmlns:v1=\"http://mycompany.com/it/enterprise/contract/TestService/v1\" xmlns:v11=\"http://mycompany.com/it/enterprise/data/v1\"><v11:Document><v11:Id>101</v11:Id><v11:Type>MaterialMaster</v11:Type></v11:Document></v1:TestServiceRequest>");
Source responsePayload = new StringSource("<testServiceResponseType xmlns=\"http://mycompany.com/it/enterprise/data/v1\" xmlns:ns2=\"http://mycompany.com/it/enterprise/contract/TestService/v1\"><Document><Result>SUCCESS</Result></Document></testServiceResponseType>");


Now the test will still fail with need for Spring autowiring, the spring bean definition configuration for Spring WS looks as below,

<property name="endpointMap">
<entry key="{http://mycompany.com/it/enterprise/contract/TestService/v1}TestServiceRequest" value-ref="TestBatchEndpoint" />
<int-ws:inbound-gateway id="TestBatchEndpoint" reply-channel="test.batch.webservice.out" request-channel="test.batch.webservice.in" />
<bean id="marshaller" >
<property name="contextPath" value="com.mycompany.it.enterprise.contract.testservice.v1" />
<bean id="testServiceActivator" class="com.mycompany.it.eis.webservice.ws.TestServiceActivator"/>
<bean id="resultTransformer" class="org.springframework.integration.xml.transformer.ResultToStringTransformer"/>

Spring Integration and Spring WS flow looks as below

<int:bridge  input-channel="test.batch.webservice.in" output-channel="test.batch.webservice.unmarshalling.in"></int:bridge>
<int-xml:unmarshalling-transformer id="defaultUnmarshaller" input-channel="test.batch.webservice.unmarshalling.in" output-channel="test.batch.webservice.unmarshalling.out" unmarshaller="marshaller"/>
<int:service-activator id="test.batch.webservice.activator" input-channel="test.batch.webservice.unmarshalling.out" ref="testServiceActivator" output-channel="test.batch.webservice.marshalling.in">
<int-xml:marshalling-transformer input-channel="test.batch.webservice.marshalling.in" output-channel="test.batch.webservice.out" marshaller="marshaller" result-transformer="resultTransformer" />

IBatisTemplate looks as below,

<bean id="sqlMapClientTemplate" class="org.springframework.orm.ibatis.SqlMapClientTemplate">>
<property name="sqlMapClient" ref="sqlMapClient" />

You need to create the Java stubs from the XSL for that you need to run,

mvn jaxb2:xjc

It assumes that you have XSDs at location src/main/xsd location by default, refer this article for more details.

In the Test class, you need to add the Spring JUnit testing capabilities as follows,

"classpath:config/test-webservice-beans-config-test.xml" })
public class TestIntegrationEndPointTest {

TestServiceActivator looks somewhat as below:

public class TestServiceActivator {
private static Logger logger = Logger.getLogger(TestServiceActivator.class);

SqlMapClientTemplate ibatisTemplate;

public TestServiceResponseType processRequest(JAXBElement element)    throws Exception {
TestServiceRequestType request = (TestServiceRequestType) element.getValue();
String status = "SUCCESS";
String type = request.getDocument().getType();
String id = request.getDocument().getId();

TestObject notifyObject = new TestObject();
ibatisTemplate.insert("testInsert", notifyObject);

TestServiceResponseType response = new TestServiceResponseType();

Document doc = new Document();

logger.debug("Successfully saved request");

return response;

One hack we need to do is, we need to add @XmlRootElement to the stubs that were created as per this article

@XmlType(name = "TestServiceResponseType", propOrder = {
public class TestServiceResponseType {

@XmlType(name = "TestServiceRequestType", propOrder = {
public class TestServiceRequestType {

Once you do this, we can run the mvn test and it will work. If you notice in this example, we have tested the String WS end to end using JUnit and Spring Integraiton.

For further reading refer http://blog.springsource.org/2011/01/11/spring-web-services-2-0-released/ .