WSO2 Message Broker vs. Apache Qpid – Messaging Integration Patterns

If you want to define a messaging-based integration architecture with WSO2, your main option is WSO2 Message Broker, and possibly Apache ActiveMQ as well.
In earlier versions of WSO2 ESB, the WSO2 website had official documentation on integrating WSO2 ESB with Apache ActiveMQ, and combining the two was a common pattern. That guidance has since been set aside in favor of WSO2 Message Broker.

Initially, WSO2 ESB together with Apache ActiveMQ was the de facto solution for implementing a messaging integration architecture, but now all the focus is on WSO2 Message Broker. The product strategy is to position WSO2 Message Broker for big and critical projects, for example in the following use cases:

  • Guaranteed delivery
  • Large volumes of messages
  • Persistence of many large messages
  • High availability & scalability

Well, WSO2 Message Broker is perfect for Big Data or IoT projects, but what if we want to meet the same requirements with another tool, without losing functionality? Which message broker or MOM tool should we use?

Apache ActiveMQ is a good alternative, as are JBoss HornetQ, RabbitMQ and others. Some time ago I wrote about this in the following blog post:
https://holisticsecurity.wordpress.com/2014/03/07/message-brokering-y-recoleccion-datos-big-data-wso2

So what alternative to WSO2 Message Broker offers comparable robustness, scalability and interoperability, and is lightweight, without losing functionality? The answer is Apache Qpid. Let me explain why.

I. Why is Apache Qpid a good, comparable alternative to WSO2 Message Broker?

  1. Earlier releases of WSO2 Message Broker had Apache Qpid embedded.
  2. The latest release of WSO2 Message Broker uses WSO2 Andes, an extended version of Apache Qpid, to connect to Apache Cassandra as the message store backend.
  3. WSO2 ESB covers the most common messaging EIPs (guaranteed delivery, message persistence, routing, selective channels, etc.) through two features: the MessageStore and the MessageProcessor. These features are implemented on top of JMS and are vendor-agnostic. In other words, if a message broker implements JMS, there is a good chance that it can be integrated with WSO2 ESB.
  4. Apache Qpid is a lightweight, powerful and robust message broker that offers several message store implementations:
    • In-memory message store.
    • Apache Derby store.
    • Message store based on Oracle Berkeley DB, suitable for building highly available messaging platforms.
  5. Apache Qpid is compatible with: JMS 1.1, AMQP 1.0, 0-8, 0-9 and 0-9-1.
  6. If you want to play with WSO2 Message Broker, you should do it on robust and powerful infrastructure. That is not necessary for Apache Qpid, which you can run on constrained infrastructure.
  7. Other alternatives to WSO2 Message Broker could be:
    • Apache ActiveMQ: although it is a mature project, it is big compared to Apache Qpid and its architecture is its limit.
    • RabbitMQ: it is bigger than Apache Qpid, but its architecture is based on plugins. It is faster, but it does not implement AMQP 1.0 natively; you need to install an experimental plugin (https://www.rabbitmq.com/specification.html).

Well, after this short introduction, I will explain step by step how to quickly integrate WSO2 ESB with Apache Qpid, how to enable the JMS transport for Synapse proxies and how to implement the messaging-related EIPs.

II. Installation of Apache Qpid

1.- Download Apache Qpid. At the time of writing the latest version is 0.30 (qpid-broker-0.30-bin.tar.gz); you can download it from http://qpid.apache.org/components/java-broker/index.html

2.- Unzip and run it.

 
$ ~/0dev-env/2srv/qpid-broker-0.30/bin/qpid-server
System Properties set to -Damqj.logging.level=info -DQPID_HOME=/Users/Chilcano/0dev-env/2srv/qpid-broker-0.30 -DQPID_WORK=/Users/Chilcano/0dev-env/2srv/qpid-work-std
QPID_OPTS set to -Damqj.read_write_pool_size=32 -DQPID_LOG_APPEND=
Using QPID_CLASSPATH /Users/Chilcano/0dev-env/2srv/qpid-broker-0.30/lib/*:/Users/Chilcano/0dev-env/2srv/qpid-broker-0.30/lib/plugins/*:/Users/Chilcano/0dev-env/2srv/qpid-broker-0.30/lib/opt/*
Info: QPID_JAVA_GC not set. Defaulting to JAVA_GC -XX:+UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError
Info: QPID_JAVA_MEM not set. Defaulting to JAVA_MEM -Xmx2g
log4j:WARN No appenders could be found for logger (org.apache.qpid.server.plugin.QpidServiceLoader).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[Broker] BRK-1006 : Using configuration : /Users/Chilcano/0dev-env/2srv/qpid-work-std/config.json
[Broker] BRK-1007 : Using logging configuration : /Users/Chilcano/0dev-env/2srv/qpid-broker-0.30/etc/log4j.xml
[Broker] BRK-1001 : Startup : Version: 0.30 Build: Unversioned directory
[Broker] BRK-1010 : Platform : JVM : Oracle Corporation version: 1.7.0_51-b13 OS : Mac OS X version: 10.10 arch: x86_64
[Broker] BRK-1011 : Maximum Memory : 2,077,753,344 bytes
[Broker] BRK-1002 : Starting : Listening on TCP port 5672
[Broker] MNG-1001 : Web Management Startup
[Broker] MNG-1002 : Starting : HTTP : Listening on port 8080
[Broker] MNG-1004 : Web Management Ready
[Broker] MNG-1001 : JMX Management Startup
[Broker] MNG-1002 : Starting : RMI Registry : Listening on port 8999
[Broker] MNG-1002 : Starting : JMX RMIConnectorServer : Listening on port 9099
[Broker] MNG-1004 : JMX Management Ready
[Broker] BRK-1004 : Qpid Broker Ready
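
The startup log above reports four listening ports: 5672 (AMQP), 8080 (web management), 8999 (RMI registry) and 9099 (JMX connector). As a quick sanity check, a small Python sketch like the following can probe them. The function names are mine and are not part of Apache Qpid, and the host "localhost" is an assumption; adjust it if your broker runs elsewhere.

```python
import socket

# Ports copied from the startup log; the labels are only descriptive.
QPID_PORTS = {
    "AMQP": 5672,
    "HTTP management": 8080,
    "RMI registry": 8999,
    "JMX connector": 9099,
}


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_ports(host="localhost", ports=None):
    """Map each label to True/False depending on whether its port accepts connections."""
    ports = QPID_PORTS if ports is None else ports
    return {label: port_is_open(host, port) for label, port in ports.items()}
```

Calling check_ports() while the broker is running should return True for every label; a False entry usually means the service is disabled or bound to another port.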

If you want to change the working directory, define the “QPID_WORK” variable in the ‘qpid-server’ file or as an environment variable in your operating system:

 
export QPID_WORK=$HOME/0dev-env/2srv/qpid-work-std

3.- If you want to change ports, enable services, exchanges or plugins, or replace the default configuration, you have many options. Just follow these instructions:

https://qpid.apache.org/releases/qpid-0.30/java-broker/book/Java-Broker-Configuring-And-Managing.html

4.- If you want to configure High Availability or a cluster of Apache Qpid brokers, follow the documentation below. You can configure Apache Qpid HA as Active-Passive or Active-Active with automatic failover:

http://qpid.apache.org/releases/qpid-0.30/java-broker/book/Java-Broker-High-Availability.html

III. Enabling JMS transport in WSO2 ESB for Apache Qpid

Enabling JMS Transport


1.- After installing WSO2 ESB, edit the file %WSO2ESB_HOME%/repository/conf/axis2/axis2.xml and add the JMSListener and JMSSender for Apache Qpid as follows:

 
[...]

  <!-- ================================================= -->
  <!--             Transport Ins (Listeners)             -->
  <!-- ================================================= -->

[...]

    <!-- SAP Transport Listeners -->
    <!-- <transportReceiver name="idoc" class="org.wso2.carbon.transports.sap.SAPTransportListener"/> -->
    <!-- <transportReceiver name="bapi" class="org.wso2.carbon.transports.sap.SAPTransportListener"/> -->

    <!-- **** JMS Transport LISTENER support with Apache Qpid **** -->
  <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
    <parameter name="myTopicConnectionFactory" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.qpid.jndi.PropertiesFileInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
    </parameter>

    <parameter name="myQueueConnectionFactory" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.qpid.jndi.PropertiesFileInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>

    <parameter name="default" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.qpid.jndi.PropertiesFileInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>
  </transportReceiver>

    <!-- ================================================= -->
    <!--             Transport Outs (Senders)              -->
    <!-- ================================================= -->

[...]

    <!-- SAP Transport Senders -->
    <!-- <transportSender name="idoc" class="org.wso2.carbon.transports.sap.SAPTransportSender"/> -->
    <!-- <transportSender name="bapi" class="org.wso2.carbon.transports.sap.SAPTransportSender"/> -->

  <!-- **** JMS Transport SENDER support with Apache Qpid **** -->
  <transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>


    <!-- ================================================= -->
    <!--             Global Engaged Modules                -->
    <!-- ================================================= -->

[...]
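
To double-check the edit, the connection factories declared for the JMS listener can be read back from axis2.xml. This is a minimal Python sketch using only the standard library; the helper name jms_connection_factories and the trimmed SAMPLE fragment are illustrative assumptions and not part of WSO2 ESB.

```python
import xml.etree.ElementTree as ET

# Trimmed fragment of the configuration shown above, used only as sample input.
SAMPLE = """
<axisconfig>
  <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
    <parameter name="myQueueConnectionFactory" locked="false">
      <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
      <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>
  </transportReceiver>
</axisconfig>
"""


def jms_connection_factories(axis2_xml):
    """Return {factory name: {setting: value}} for the 'jms' transportReceiver."""
    root = ET.fromstring(axis2_xml)
    factories = {}
    for receiver in root.iter("transportReceiver"):
        if receiver.get("name") != "jms":
            continue
        # Each direct child <parameter> is one named connection factory.
        for factory in receiver.findall("parameter"):
            settings = {
                item.get("name"): (item.text or "").strip()
                for item in factory.findall("parameter")
            }
            factories[factory.get("name")] = settings
    return factories
```

For the real file you would pass the contents of repository/conf/axis2/axis2.xml instead of SAMPLE and expect to see myTopicConnectionFactory, myQueueConnectionFactory and default.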

2.- Now download the Apache Qpid Java client (qpid-client-0.30-bin.tar) from http://qpid.apache.org/components/qpid-jms/index.html, unpack it, copy qpid-client-0.30.jar and qpid-common-0.30.jar to %WSO2ESB_HOME%/repository/components/lib/ and restart WSO2 ESB.
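
If you prefer to script the copy, something like this Python sketch would do. It assumes the client archive has already been unpacked; because I am not relying on the exact directory layout inside the archive, it searches the unpacked folder recursively. The function name install_client_jars and both path arguments are hypothetical.

```python
import shutil
from pathlib import Path

# The two jars named in the step above.
CLIENT_JARS = ("qpid-client-0.30.jar", "qpid-common-0.30.jar")


def install_client_jars(client_dir, esb_home):
    """Copy the Qpid client jars into <esb_home>/repository/components/lib."""
    lib_dir = Path(esb_home) / "repository" / "components" / "lib"
    copied = []
    for jar in CLIENT_JARS:
        # Search recursively, since the layout of the unpacked archive is an assumption.
        matches = sorted(Path(client_dir).rglob(jar))
        if not matches:
            raise FileNotFoundError(f"{jar} not found under {client_dir}")
        copied.append(Path(shutil.copy2(matches[0], lib_dir)))
    return copied
```

Remember that WSO2 ESB still has to be restarted afterwards so that the new jars are picked up.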

3.- Edit the JNDI file (%WSO2ESB_HOME%/repository/conf/jndi.properties) as follows:

 
# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
##connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/test?brokerlist='tcp://localhost:5672'
connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/default?brokerlist='tcp://localhost:5672'
connectionfactory.TopicConnectionFactory = amqp://admin:admin@clientID/default?brokerlist='tcp://localhost:5672'

# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.MyJNDIQueue02 = QPID_QUEUE_02

# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
##topic.MyTopic = example.MyTopic

Where:

  • ‘clientID/default’ contains the client identifier used to connect and the virtual host, respectively.
  • ‘MyJNDIQueue02’ is the JNDI name of my queue and ‘QPID_QUEUE_02’ is the physical queue name. They will be used in the next section, when creating the MessageStore and MessageProcessor.
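
To make the parts of the connection URL explicit, here is a small Python sketch that splits it into user, password, client ID, virtual host and options. It is a simplified parser written for this post, assuming the amqp://user:password@clientID/virtualhost?option='value' shape used above; it is not the parser used by the Qpid client.

```python
import re

# Simplified pattern for amqp://[user:password@]clientID/virtualhost[?options]
CONNECTION_URL = re.compile(
    r"^amqp://(?:(?P<user>[^:@/]*):(?P<password>[^@/]*)@)?"
    r"(?P<client_id>[^/?]*)/(?P<virtual_host>[^?]*)"
    r"(?:\?(?P<options>.*))?$"
)


def parse_connection_url(url):
    """Split a Qpid-style connection URL into its named parts."""
    match = CONNECTION_URL.match(url.strip())
    if match is None:
        raise ValueError(f"not a Qpid connection URL: {url!r}")
    parts = match.groupdict()
    # Options look like name='value' and are separated by '&'.
    raw_options = parts.pop("options") or ""
    parts["options"] = dict(re.findall(r"(\w+)='([^']*)'", raw_options))
    return parts
```

For the QueueConnectionFactory entry above this gives the virtual host "default" and a brokerlist option pointing at tcp://localhost:5672, which is the AMQP port reported in the broker startup log.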

4.- Create a Synapse Proxy in WSO2 ESB to send a message to Apache Qpid.

 
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="ProxyQpidSender"
       transports="http"
       statistics="disable"
       trace="disable"
       startOnLoad="true">
   <target>
      <inSequence>
         <property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
         <property name="FORCE_SC_ACCEPTED"
                   value="true"
                   scope="axis2"
                   type="STRING"/>
         <header name="Action" scope="default" value="ProxyQpidSender"/>
         <property name="SYS_DATE"
                   expression="get-property('SYSTEM_DATE')"
                   scope="default"/>
         <payloadFactory media-type="xml">
            <format>
               <MySysDate xmlns="">Date $1</MySysDate>
            </format>
            <args>
               <arg evaluator="xml" expression="$ctx:SYS_DATE"/>
            </args>
         </payloadFactory>
         <property xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope"
                   name="MSG_BODY_IN"
                   expression="//soapenv:Body"
                   scope="default"/>
         <log level="custom">
            <property name="[ProxyQpidSender] BODY SENT" expression="$ctx:MSG_BODY_IN"/>
         </log>
         <send>
            <endpoint>
              <address uri="jms:/QPID_QUEUE_01?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.apache.qpid.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=repository/conf/jndi.properties&amp;transport.jms.ConnectionFactoryType=queue"/>
            </endpoint>
         </send>
      </inSequence>
   </target>
   <description>Send a message to Apache Qpid queue (QPID_QUEUE_01)</description>
</proxy>
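
The address URI in the send mediator packs the destination and the JNDI settings into one string. The following Python sketch (standard library only, helper name parse_jms_uri is mine) pulls it apart, which I find handy for spotting typos in long JMS URIs. It accepts the URI either with plain "&" or with the "&amp;" escaping used inside the XML attribute.

```python
from urllib.parse import parse_qs, urlsplit


def parse_jms_uri(uri):
    """Return (destination, parameters) for a jms:/ endpoint URI."""
    # Inside the XML attribute the separators are written as &amp;
    parts = urlsplit(uri.replace("&amp;", "&"))
    if parts.scheme != "jms":
        raise ValueError(f"not a JMS URI: {uri!r}")
    destination = parts.path.lstrip("/")
    parameters = {key: values[0] for key, values in parse_qs(parts.query).items()}
    return destination, parameters
```

Applied to the URI above, the destination is QPID_QUEUE_01 and the parameters are the same four settings that were configured for the listener in axis2.xml.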

Now we can send messages using this proxy; on the Apache Qpid side they appear as shown:

inspecting the store messages in Apache Qpid


5.- Create a Synapse Proxy in WSO2 ESB to listen for messages from Apache Qpid.

 
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="ProxyQpidReceiver"
       transports="jms"
       statistics="disable"
       trace="disable"
       startOnLoad="true">
   <target>
      <inSequence>
         <property name="OUT_ONLY" value="true"/>
         <property xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope"
                   name="MSG_FROM_QUEUE"
                   expression="//soapenv:Body"
                   scope="default"/>
         <log level="custom">
            <property name="[ProxyQpidReceiver] MSG RECEIVED"
                      expression="$ctx:MSG_FROM_QUEUE"/>
         </log>
      </inSequence>
   </target>
   <parameter name="transport.jms.ContentType">
      <rules>
         <jmsProperty>contentType</jmsProperty>
         <default>application/soap+xml</default>
      </rules>
   </parameter>
   <parameter name="transport.jms.ConnectionFactory">myQueueConnectionFactory</parameter>
   <parameter name="transport.jms.Destination">QPID_QUEUE_01</parameter>
   <parameter name="transport.jms.ConnectionFactoryType">queue</parameter>
   <description>Message received from Apache Qpid queue (QPID_QUEUE_01)</description>
</proxy>

6.- Now we can test this configuration using SoapUI and follow the logs in the WSO2 ESB console.

 
[...]
[2014-11-27 22:13:29,000]  INFO - LogMediator [ProxyQpidSender] BODY SENT = <soap:Body xmlns:soap="http://www.w3.org/2003/05/soap-envelope"><MySysDate>Date 11/27/14 10:13 PM</MySysDate></soap:Body>
[2014-11-27 22:13:29,021]  INFO - LogMediator [ProxyQpidReceiver] BODY RECEIVED = <soap:Body xmlns:soap="http://www.w3.org/2003/05/soap-envelope"><MySysDate>Date 11/27/14 10:13 PM</MySysDate></soap:Body>
[2014-11-27 22:13:29,474] DEBUG - ServerWorker Starting a new Server Worker instance
[2014-11-27 22:13:29,476]  INFO - LogMediator [ProxyQpidSender] BODY SENT = <soap:Body xmlns:soap="http://www.w3.org/2003/05/soap-envelope"><MySysDate>Date 11/27/14 10:13 PM</MySysDate></soap:Body>
[2014-11-27 22:13:29,496]  INFO - LogMediator [ProxyQpidReceiver] BODY RECEIVED = <soap:Body xmlns:soap="http://www.w3.org/2003/05/soap-envelope"><MySysDate>Date 11/27/14 10:13 PM</MySysDate></soap:Body>
[...]
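
If you do not have SoapUI at hand, the same test can be sketched in Python with the standard library. The proxy URL is an assumption: I am guessing that the proxy is exposed at http://localhost:8310/services/ProxyQpidSender, based on the host and port of the endpoint URL used later in this post, so change it to match your ESB. The envelope is SOAP 1.2 because the proxies read the body through the http://www.w3.org/2003/05/soap-envelope namespace. The body can stay empty, since the payloadFactory mediator replaces it.

```python
import urllib.request

# Assumed proxy URL; adjust host and port to your own WSO2 ESB instance.
PROXY_URL = "http://localhost:8310/services/ProxyQpidSender"

SOAP12_ENVELOPE = (
    '<?xml version="1.0" encoding="UTF-8"?>'
    '<soapenv:Envelope xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope">'
    "<soapenv:Body/>"
    "</soapenv:Envelope>"
)


def build_request(proxy_url=PROXY_URL):
    """Build (but do not send) a SOAP 1.2 POST request for the proxy."""
    return urllib.request.Request(
        proxy_url,
        data=SOAP12_ENVELOPE.encode("utf-8"),
        headers={"Content-Type": "application/soap+xml; charset=UTF-8"},
        method="POST",
    )


def send(proxy_url=PROXY_URL, timeout=5.0):
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(build_request(proxy_url), timeout=timeout) as response:
        return response.status
```

Since the proxy sets FORCE_SC_ACCEPTED, I would expect send() to return 202, but treat that as something to verify rather than a guarantee.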

7.- Conclusions:

  • The first time you run the “ProxyQpidSender” proxy, it automatically creates the “QPID_QUEUE_01” queue in Apache Qpid.
  • If you run the “ProxyQpidSender” proxy several times to send and store messages in the “QPID_QUEUE_01” queue, they are consumed almost immediately by the “ProxyQpidReceiver” proxy, because that proxy listens permanently for messages on this queue.
    If you want to inspect the payload of the messages, I recommend disabling the “ProxyQpidReceiver” proxy in WSO2 ESB. That way you can review any aspect of a message (headers and payload) in the Apache Qpid Web Console.
  • The “ProxyQpidSender” proxy uses the HTTP transport, while the “ProxyQpidReceiver” proxy uses the JMS transport.

IV. WSO2 ESB Message Store and Message Processor for Apache Qpid

IV.1. Creating a Message Store in WSO2 ESB

1.- Create the WSO2 ESB Message Store pointing to QPID_QUEUE_02.

 
<messageStore name="MsgStoreQpid02" class="org.apache.synapse.message.store.impl.jms.JmsStore" xmlns="http://ws.apache.org/ns/synapse">
   <parameter name="java.naming.factory.initial">org.apache.qpid.jndi.PropertiesFileInitialContextFactory</parameter>
   <parameter name="java.naming.provider.url">repository/conf/jndi.properties</parameter>
   <parameter name="store.jms.destination">MyJNDIQueue02</parameter>
   <parameter name="store.jms.connection.factory">QueueConnectionFactory</parameter>
   <parameter name="store.jms.username">admin</parameter>
   <parameter name="store.jms.password">admin</parameter>
   <parameter name="store.jms.JMSSpecVersion">1.1</parameter>
</messageStore>
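
As far as I understand it, the store.jms.destination value is looked up as a JNDI name, so it should match a queue.[jndiName] entry in jndi.properties in order to end up in QPID_QUEUE_02. The Python sketch below checks that link. The helper names and the shortened sample inputs are assumptions made for illustration only.

```python
import xml.etree.ElementTree as ET

SYNAPSE = "{http://ws.apache.org/ns/synapse}"

# Shortened sample inputs taken from the configuration in this post.
JNDI = "queue.MyJNDIQueue02 = QPID_QUEUE_02\n"
STORE = (
    '<messageStore name="MsgStoreQpid02" '
    'class="org.apache.synapse.message.store.impl.jms.JmsStore" '
    'xmlns="http://ws.apache.org/ns/synapse">'
    '<parameter name="store.jms.destination">MyJNDIQueue02</parameter>'
    "</messageStore>"
)


def jndi_queues(properties_text):
    """Return {jndiName: physicalName} for every queue.* entry."""
    queues = {}
    for line in properties_text.splitlines():
        line = line.strip()
        if line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        if key.startswith("queue."):
            queues[key[len("queue."):]] = value.strip()
    return queues


def store_parameter(store_xml, name):
    """Return the text of the named <parameter> in a messageStore definition."""
    for parameter in ET.fromstring(store_xml).iter(SYNAPSE + "parameter"):
        if parameter.get("name") == name:
            return (parameter.text or "").strip()
    return None


def physical_queue(store_xml, properties_text):
    """Resolve the store destination through jndi.properties; None if unmapped."""
    jndi_name = store_parameter(store_xml, "store.jms.destination")
    return jndi_queues(properties_text).get(jndi_name)
```

A result of None means the destination name has no matching entry in jndi.properties, which is worth fixing before you start debugging the broker itself.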

2.- Create a new WSO2 ESB Proxy to send messages to the new message store ‘MsgStoreQpid02’ using the ‘Store Mediator’.

 
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="ProxyQpidSender2MsgStoreSampling"
       transports="http"
       statistics="disable"
       trace="disable"
       startOnLoad="true">
   <target>
      <inSequence>
         <property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
         <property name="FORCE_SC_ACCEPTED"
                   value="true"
                   scope="axis2"
                   type="STRING"/>
         <header name="Action" scope="default" value="ProxyQpidSender2MsgStoreSampling"/>
         <property name="SYS_DATE"
                   expression="get-property('SYSTEM_DATE')"
                   scope="default"/>
         <property name="messageType" value="text/plain" scope="transport"/>
         <property name="CONTENT_TYPE" value="application/soap+xml" scope="transport"/>
         <payloadFactory media-type="xml">
            <format>
               <MySysDate xmlns="">MsgStore - SysDate $1</MySysDate>
            </format>
            <args>
               <arg evaluator="xml" expression="$ctx:SYS_DATE"/>
            </args>
         </payloadFactory>
         <property xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope"
                   name="MSG_STORED"
                   expression="//soapenv:Body"
                   scope="default"/>
         <log level="custom">
            <property name="[ProxyQpidSender2MsgStoreSampling] MSG STORED"
                      expression="$ctx:MSG_STORED"/>
         </log>
         <store messageStore="MsgStoreQpid02"/>
      </inSequence>
   </target>
   <description>Store the message in the WSO2 ESB Message Store (MsgStoreQpid02)</description>
</proxy>

IV.2. Creating a Message Sampling Processor in WSO2 ESB

Message Sampling Processor with WSO2 ESB and Apache Qpid


1.- Create a Sequence for the Sampling Processor

 
<sequence xmlns="http://ws.apache.org/ns/synapse" name="SequenceQpidSampling">
   <property xmlns:ns="http://org.apache.synapse/xsd" xmlns:soap="http://www.w3.org/2003/05/soap-envelope" name="MSG_RECEIVED" expression="//soap:Body" scope="default"></property>
   <log level="custom">
      <property xmlns:ns="http://org.apache.synapse/xsd" name="[SequenceQpidSampling] MSG" expression="$ctx:MSG_RECEIVED"></property>
   </log>
   <drop></drop>
</sequence>

2.- Create the Message Sampling Processor

 
<messageProcessor name="MsgProcessorQpidSampling" class="org.apache.synapse.message.processor.impl.sampler.SamplingProcessor" messageStore="MsgStoreQpid02" xmlns="http://ws.apache.org/ns/synapse">
   <parameter name="interval">1000</parameter>
   <parameter name="concurrency">1</parameter>
   <parameter name="sequence">SequenceQpidSampling</parameter>
   <parameter name="cronExpression">0 0/1 * 1/1 * ? *</parameter>
   <parameter name="is.active">true</parameter>
</messageProcessor>
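
The cronExpression above uses the Quartz format (seconds, minutes, hours, day of month, month, day of week and an optional year). To make it easier to read, this Python sketch labels the fields and evaluates a deliberately small subset of the syntax: "*", "?", plain numbers and "start/step". It is not a full Quartz implementation; the day-of-week and year fields are ignored and the function names are my own.

```python
from datetime import datetime

FIELD_NAMES = ("seconds", "minutes", "hours", "day_of_month",
               "month", "day_of_week", "year")


def describe(expression):
    """Label each field of a Quartz-style cron expression."""
    fields = expression.split()
    if len(fields) not in (6, 7):
        raise ValueError("expected 6 or 7 space-separated fields")
    return dict(zip(FIELD_NAMES, fields))


def _matches(field, value):
    """Support only '*', '?', 'n' and 'start/step' (a simplification)."""
    if field in ("*", "?"):
        return True
    if "/" in field:
        start, step = (int(part) for part in field.split("/"))
        return value >= start and (value - start) % step == 0
    return value == int(field)


def fires_at(expression, moment):
    """True if the expression would fire at the given datetime (subset only)."""
    fields = describe(expression)
    return (
        _matches(fields["seconds"], moment.second)
        and _matches(fields["minutes"], moment.minute)
        and _matches(fields["hours"], moment.hour)
        and _matches(fields["day_of_month"], moment.day)
        and _matches(fields["month"], moment.month)
    )
```

With "0 0/1 * 1/1 * ? *" the processor is triggered at second 0 of every minute, every day, so fires_at(...) is True for 00:07:00 and False for 00:07:30.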

3.- Send messages to Message Store and check the logs

 
[2014-11-28 00:06:39,654] DEBUG - ServerWorker Starting a new Server Worker instance
[2014-11-28 00:06:39,657]  INFO - LogMediator [ProxyQpidSender2MsgStoreSampling] MSG STORED = <soap:Body xmlns:soap="http://www.w3.org/2003/05/soap-envelope"><MySysDate>MsgStore - SysDate 11/28/14 12:06 AM</MySysDate></soap:Body>
[2014-11-28 00:06:45,894] DEBUG - ServerWorker Starting a new Server Worker instance
[2014-11-28 00:06:45,896]  INFO - LogMediator [ProxyQpidSender2MsgStoreSampling] MSG STORED = <soap:Body xmlns:soap="http://www.w3.org/2003/05/soap-envelope"><MySysDate>MsgStore - SysDate 11/28/14 12:06 AM</MySysDate></soap:Body>
[2014-11-28 00:07:00,014]  INFO - JmsConsumer [MsgStoreQpid02-C-1] reconnected to store.
[2014-11-28 00:07:00,029]  INFO - LogMediator [SequenceQpidSampling] MSG = <soap:Body xmlns:soap="http://www.w3.org/2003/05/soap-envelope"><MySysDate>MsgStore - SysDate 11/28/14 12:06 AM</MySysDate></soap:Body>
[2014-11-28 00:08:00,016]  INFO - LogMediator [SequenceQpidSampling] MSG = <soap:Body xmlns:soap="http://www.w3.org/2003/05/soap-envelope"><MySysDate>MsgStore - SysDate 11/28/14 12:06 AM</MySysDate></soap:Body>
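
The timestamps show the sampling behaviour: a message stored at 00:06:39 is only picked up at the next scheduled run at 00:07:00. A rough way to measure that wait from the console log is sketched below in Python. The regular expression is written for the log format shown above, the sample lines are shortened copies of those log lines (payload cut off), and the helper names are mine.

```python
import re
from datetime import datetime

LOG_LINE = re.compile(
    r"^\[(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\]\s+"
    r"(?P<level>\w+) - (?P<logger>\w+) (?P<text>.*)$"
)

# Two lines from the log above, with the SOAP payload cut off.
SAMPLE = [
    "[2014-11-28 00:06:39,657]  INFO - LogMediator [ProxyQpidSender2MsgStoreSampling] MSG STORED = ...",
    "[2014-11-28 00:07:00,029]  INFO - LogMediator [SequenceQpidSampling] MSG = ...",
]


def parse_line(line):
    """Return (timestamp, level, logger, text) or None if the line does not match."""
    match = LOG_LINE.match(line.strip())
    if match is None:
        return None
    stamp = datetime.strptime(match.group("ts"), "%Y-%m-%d %H:%M:%S,%f")
    return stamp, match.group("level"), match.group("logger"), match.group("text")


def seconds_between(lines, first_marker, second_marker):
    """Seconds from the first line containing first_marker to the next with second_marker."""
    first = second = None
    for line in lines:
        parsed = parse_line(line)
        if parsed is None:
            continue
        stamp, _, _, text = parsed
        if first is None and first_marker in text:
            first = stamp
        elif first is not None and second is None and second_marker in text:
            second = stamp
    if first is None or second is None:
        return None
    return (second - first).total_seconds()
```

For the two sample lines, seconds_between(SAMPLE, "MSG STORED", "[SequenceQpidSampling]") comes out at roughly 20.4 seconds.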

IV.3. Creating a Message Forwarding Processor in WSO2 ESB

1.- Create a Message Forwarding Processor implementing the OUT-ONLY pattern

In this case, if the backend service processes the request successfully, the message processor deletes the message from the message store.

Message Forwarding Processor with OUT-ONLY implemented


First, we have to disable the previous message processor (MsgProcessorQpidSampling), just for testing purposes. We also have to create a new proxy that sends well-formed messages to MsgStoreQpid02.

1.1. Create the Synapse Proxy with OUT-ONLY property enabled.

 
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="ProxyQpidSender2MsgStoreFwdOutOnly"
       transports="http"
       statistics="disable"
       trace="disable"
       startOnLoad="true">
   <target>
      <inSequence>
         <property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
         <property name="FORCE_SC_ACCEPTED"
                   value="true"
                   scope="axis2"
                   type="STRING"/>
         <header name="Action" scope="default" value="urn:echoString"/>
         <property name="SYS_DATE"
                   expression="get-property('SYSTEM_DATE')"
                   scope="default"/>
         <property name="messageType" value="text/plain" scope="transport"/>
         <property name="CONTENT_TYPE" value="application/soap+xml" scope="transport"/>
         <payloadFactory media-type="xml">
            <format>
               <echo:echoString xmlns:echo="http://echo.services.core.carbon.wso2.org">MySysDate $1</echo:echoString>
            </format>
            <args>
               <arg evaluator="xml" expression="$ctx:SYS_DATE"/>
            </args>
         </payloadFactory>
         <property xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope"
                   name="MSG_STORED"
                   expression="//soapenv:Body"
                   scope="default"/>
         <log level="custom">
            <property name="[ProxyQpidSender2MsgStoreFwdOutOnly] MSG STORED"
                      expression="$ctx:MSG_STORED"/>
         </log>
         <store messageStore="MsgStoreQpid02"/>
      </inSequence>
   </target>
   <description>Store the message in the WSO2 ESB Message Store (MsgStoreQpid02)</description>
</proxy>

Where:
* Action is the SOAP header that identifies the operation to be executed.
* OUT_ONLY: should be set to ‘true’ for the OUT-ONLY pattern.

Also, as the endpoint I will use my custom echoPassThroughProxy, whose URL is: http://localhost:8310/services/echoPassThroughProxy

1.2. Create an Endpoint for our external backend service. In this case it is my ‘echoPassThroughProxy’, which is also created in WSO2 ESB.

 
<endpoint xmlns="http://ws.apache.org/ns/synapse" name="EndpointOfBackendSrv">
   <address uri="http://localhost:8310/services/echoPassThroughProxy"></address>
</endpoint>

1.3. Create the Message Forwarding Processor (OUT-ONLY).

 
<messageProcessor name="MsgProcessorQpidForwardingOutOnly" class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor" targetEndpoint="EndpointOfBackendSrv" messageStore="MsgStoreQpid02" xmlns="http://ws.apache.org/ns/synapse">
   <parameter name="interval">1000</parameter>
   <parameter name="client.retry.interval">1000</parameter>
   <parameter name="cronExpression">0 0/1 * 1/1 * ? *</parameter>
   <parameter name="is.active">true</parameter>
</messageProcessor>

1.4. Check the logs

 
[2014-11-27 23:25:25,003]  INFO - LogMediator [ProxyQpidSender2MsgStoreFwdOutOnly] MSG STORED = <soap:Body xmlns:soap="http://www.w3.org/2003/05/soap-envelope"><echo:echoString xmlns:echo="http://echo.services.core.carbon.wso2.org">MySysDate 11/27/14 11:25 PM</echo:echoString></soap:Body>
[2014-11-27 23:25:26,062] DEBUG - ServerWorker Starting a new Server Worker instance
[2014-11-27 23:25:26,064]  INFO - LogMediator [ProxyQpidSender2MsgStoreFwdOutOnly] MSG STORED = <soap:Body xmlns:soap="http://www.w3.org/2003/05/soap-envelope"><echo:echoString xmlns:echo="http://echo.services.core.carbon.wso2.org">MySysDate 11/27/14 11:25 PM</echo:echoString></soap:Body>
[2014-11-27 23:25:27,056] DEBUG - ServerWorker Starting a new Server Worker instance
[2014-11-27 23:25:27,058]  INFO - LogMediator [ProxyQpidSender2MsgStoreFwdOutOnly] MSG STORED = <soap:Body xmlns:soap="http://www.w3.org/2003/05/soap-envelope"><echo:echoString xmlns:echo="http://echo.services.core.carbon.wso2.org">MySysDate 11/27/14 11:25 PM</echo:echoString></soap:Body>
[2014-11-27 23:26:00,004] DEBUG - ServerWorker Starting a new Server Worker instance
[2014-11-27 23:26:00,005]  INFO - LogMediator [echoPassThroughProxy] = ----- inSeq ----- = [echoPassThroughProxy]
[2014-11-27 23:26:00,005]  INFO - LogMediator To: /services/echoPassThroughProxy, WSAction: urn:echoString, SOAPAction: urn:echoString, MessageID: urn:uuid:08a5db99-c590-4352-8ced-1ae8908814c2, Direction: request, Envelope: <?xml version="1.0" encoding="utf-8"?><soapenv:Envelope xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope"><soapenv:Body><echo:echoString xmlns:echo="http://echo.services.core.carbon.wso2.org">MySysDate 11/27/14 11:25 PM</echo:echoString></soapenv:Body></soapenv:Envelope>
[2014-11-27 23:26:00,008] DEBUG - ServerWorker Starting a new Server Worker instance
[2014-11-27 23:26:00,012]  INFO - LogMediator [echoPassThroughProxy] = ----- outSeq ----- = [echoPassThroughProxy]
[2014-11-27 23:26:00,012]  INFO - LogMediator To: http://www.w3.org/2005/08/addressing/anonymous, WSAction: , SOAPAction: , MessageID: urn:uuid:8916ed21-10a3-43ee-9d0e-6acf8a3f4de7, Direction: response, Envelope: <?xml version="1.0" encoding="utf-8"?><soapenv:Envelope xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope"><soapenv:Body><ns:echoStringResponse xmlns:ns="http://echo.services.core.carbon.wso2.org"><return xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:nil="true"></return></ns:echoStringResponse></soapenv:Body></soapenv:Envelope>

2.- Create a Message Forwarding Processor implementing the IN-OUT pattern

The only difference from the previous case is that it needs two Synapse sequences (a reply sequence and a fault sequence) to post-process the SOAP response of the backend service. The processor calls the backend in blocking mode and acts on whether the response is successful or not. If everything goes well, the message processor sends an ACK to the message store to consume (delete) the stored message and triggers the reply sequence. Otherwise it triggers the fault sequence and the message is not consumed (deleted) from the message store.

Message Forwarding Processor with IN-OUT implemented


First, disable the previous message processor, just for testing purposes.

2.1. Create the Proxy with OUT_ONLY property set to false.

 
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="ProxyQpidSender2MsgStoreFwdInOut"
       transports="http"
       statistics="disable"
       trace="disable"
       startOnLoad="true">
   <target>
      <inSequence>
         <property name="OUT_ONLY" value="false" scope="default" type="STRING"/>
         <property name="FORCE_SC_ACCEPTED"
                   value="true"
                   scope="axis2"
                   type="STRING"/>
         <header name="Action" scope="default" value="urn:echoString"/>
         <property name="SYS_DATE"
                   expression="get-property('SYSTEM_DATE')"
                   scope="default"/>
         <property name="messageType" value="text/plain" scope="transport"/>
         <property name="CONTENT_TYPE" value="application/soap+xml" scope="transport"/>
         <payloadFactory media-type="xml">
            <format>
               <echo:echoString xmlns:echo="http://echo.services.core.carbon.wso2.org">MySysDate $1</echo:echoString>
            </format>
            <args>
               <arg evaluator="xml" expression="$ctx:SYS_DATE"/>
            </args>
         </payloadFactory>
         <property xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope"
                   name="MSG_STORED"
                   expression="//soapenv:Body"
                   scope="default"/>
         <log level="custom">
            <property name="[ProxyQpidSender2MsgStoreFwdInOut] MSG STORED"
                      expression="$ctx:MSG_STORED"/>
         </log>
         <store messageStore="MsgStoreQpid02"/>
      </inSequence>
   </target>
   <description>Store the message in the WSO2 ESB Message Store (MsgStoreQpid02)</description>
</proxy>

Where:
* Action is the SOAP header that identifies the operation to be executed.
* OUT_ONLY: should be set to ‘false’ for the IN-OUT pattern, or the property can be removed.

2.2. Create the Reply Sequence and Fault Sequence

 
<sequence xmlns="http://ws.apache.org/ns/synapse" name="SequenceQpidReplyFwdInOut">
   <log level="custom">
      <property name="[SequenceQpidReplyFwdInOut]" value="---------------------- = [SequenceQpidReplyFwdInOut]"></property>
   </log>
   <log level="full"></log>
   <drop></drop>
</sequence>
 
<sequence xmlns="http://ws.apache.org/ns/synapse" name="SequenceQpidFaultFwdInOut">
   <log level="custom">
      <property name="[SequenceQpidFaultFwdInOut]" value="---- Executing default 'fault' seq ---- = [SequenceQpidFaultFwdInOut]"></property>
   </log>
   <log level="custom">
      <property xmlns:ns="http://org.apache.synapse/xsd" name="ERROR_CODE" expression="get-property('ERROR_CODE')"></property>
   </log>
   <log level="custom">
      <property xmlns:ns="http://org.apache.synapse/xsd" name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"></property>
   </log>
   <drop></drop>
</sequence>

2.3.- Adapt the Message Processor to include the new sequences for the IN-OUT pattern.

 
<messageProcessor name="MsgProcessorQpidForwardingInOut" 
                  class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor" 
                  targetEndpoint="EndpointOfBackendSrv" 
                  messageStore="MsgStoreQpid02" 
                  xmlns="http://ws.apache.org/ns/synapse">
   <parameter name="interval">1500</parameter>
   <parameter name="client.retry.interval">1000</parameter>
   <parameter name="message.processor.reply.sequence">SequenceQpidReplyFwdInOut</parameter>
   <parameter name="message.processor.fault.sequence">SequenceQpidFaultFwdInOut</parameter>
   <parameter name="cronExpression">0 0/1 * 1/1 * ? *</parameter>
   <parameter name="is.active">true</parameter>
</messageProcessor>

Where:

  • SequenceQpidReplyFwdInOut processes the Backend response message in blocking mode (with ACK).
  • SequenceQpidFaultFwdInOut is executed if there is an exception or the Backend service returns an error (HTTP_SC error); in that case no ACK is sent.
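
Because a failed delivery is not acknowledged, the message stays in MsgStoreQpid02 and the processor tries again on its next run. If you want to bound those retries, one option is the `max.delivery.attempts` parameter of the forwarding processor. The fragment below is only a sketch: it reuses the processor defined above, the value 4 is an illustrative assumption that is not part of the original configuration, and the exact behaviour once the limit is reached (as far as I know, the processor is deactivated) should be checked against the documentation of your ESB version.

```xml
<!-- Sketch only: same processor as above, with a retry limit added -->
<messageProcessor name="MsgProcessorQpidForwardingInOut"
                  class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"
                  targetEndpoint="EndpointOfBackendSrv"
                  messageStore="MsgStoreQpid02"
                  xmlns="http://ws.apache.org/ns/synapse">
   <parameter name="interval">1500</parameter>
   <parameter name="client.retry.interval">1000</parameter>
   <!-- Assumption: illustrative limit, not taken from the original setup -->
   <parameter name="max.delivery.attempts">4</parameter>
   <parameter name="message.processor.reply.sequence">SequenceQpidReplyFwdInOut</parameter>
   <parameter name="message.processor.fault.sequence">SequenceQpidFaultFwdInOut</parameter>
   <parameter name="cronExpression">0 0/1 * 1/1 * ? *</parameter>
   <parameter name="is.active">true</parameter>
</messageProcessor>
```

With a limit in place, SequenceQpidFaultFwdInOut should still log ERROR_CODE and ERROR_MESSAGE on each failed attempt, which helps to see how many retries were made before the processor stopped.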

2.4.- Check the logs.

 
[2014-11-27 23:46:26,401]  INFO - LogMediator [ProxyQpidSender2MsgStoreFwdInOut] MSG STORED = <soap:Body xmlns:soap="http://www.w3.org/2003/05/soap-envelope"><echo:echoString xmlns:echo="http://echo.services.core.carbon.wso2.org">MySysDate 11/27/14 11:46 PM</echo:echoString></soap:Body>
[2014-11-27 23:47:00,006] DEBUG - ServerWorker Starting a new Server Worker instance
[2014-11-27 23:47:00,007]  INFO - LogMediator [echoPassThroughProxy] = ----- inSeq ----- = [echoPassThroughProxy]
[2014-11-27 23:47:00,007]  INFO - LogMediator To: /services/echoPassThroughProxy, WSAction: urn:echoString, SOAPAction: urn:echoString, MessageID: urn:uuid:fad450ca-19df-4e36-888d-eed3478e936e, Direction: request, Envelope: <?xml version="1.0" encoding="utf-8"?><soapenv:Envelope xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope"><soapenv:Body><echo:echoString xmlns:echo="http://echo.services.core.carbon.wso2.org">MySysDate 11/27/14 11:46 PM</echo:echoString></soapenv:Body></soapenv:Envelope>
[2014-11-27 23:47:00,010] DEBUG - ServerWorker Starting a new Server Worker instance
[2014-11-27 23:47:00,013]  INFO - LogMediator [echoPassThroughProxy] = ----- outSeq ----- = [echoPassThroughProxy]
[2014-11-27 23:47:00,014]  INFO - LogMediator To: http://www.w3.org/2005/08/addressing/anonymous, WSAction: , SOAPAction: , MessageID: urn:uuid:b785f6d0-de3c-4941-a799-80a7cdc8c0d0, Direction: response, Envelope: <?xml version="1.0" encoding="utf-8"?><soapenv:Envelope xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope"><soapenv:Body><ns:echoStringResponse xmlns:ns="http://echo.services.core.carbon.wso2.org"><return xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:nil="true"></return></ns:echoStringResponse></soapenv:Body></soapenv:Envelope>
[2014-11-27 23:47:00,016]  INFO - LogMediator [SequenceQpidReplyFwdInOut] = ---------------------- = [SequenceQpidReplyFwdInOut]
[2014-11-27 23:47:00,017]  INFO - LogMediator To: /services/ProxyQpidSender2MsgStoreFwdInOut.ProxyQpidSender2MsgStoreFwdInOutHttpSoap12Endpoint, WSAction: urn:echoString, SOAPAction: urn:echoString, MessageID: urn:uuid:888E0D71F886F13CF71417131986402876015-666129391, Direction: request, Envelope: <?xml version="1.0" encoding="utf-8"?><soapenv:Envelope xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope"><soapenv:Body><ns:echoStringResponse xmlns:ns="http://echo.services.core.carbon.wso2.org"><return xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:nil="true"></return></ns:echoStringResponse></soapenv:Body></soapenv:Envelope>

V. Conclusions

  • Apache Qpid is a good alternative to WSO2 Message Broker: it offers robustness, scalability, messaging-based integration patterns and compatibility with the JMS and AMQP standards, all in a lightweight bundle that is easy to install.
  • WSO2 ESB integrates cleanly with Apache Qpid. There is no need to program or extend any library; the whole integration is achieved through configuration alone, without writing a single line of code.

@Chilcano
