Monday, March 25, 2013

Mule 3.3 Start and Stop Flows Programmatically - A Retry Strategy for your Messages

Mule 3.3 brings with it some updates for on-demand execution of flows. This post shows how to combine Mule with ActiveMQ (or a messaging tool of your choice) to provide reliability and fault tolerance through retries.
The retry is on demand and programmatically controlled: a trigger flow starts and stops the retry flow from code.

The requirement: when a call to an external service fails or results in a business exception, you need to store the original request in an error queue and be able to retry those requests on demand.

Start and Stop Flows Programmatically On-demand


You break your implementation into four main flows. Flows three and four are the meat of this blog.
The first flow reads an incoming message from a JMS queue and places it in an intermediate queue.
The second flow transactionally processes the intermediate queue and writes to a *.retry queue where appropriate.
The third flow is the retry flow: it wakes up when told to, sweeps the *.retry queue once to process all retry messages, and is shut down until the next time it is needed.
The fourth flow is the trigger for the retry process: it listens for a specific message on a queue and programmatically starts and stops the third flow.
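
The flow listings in this post reference four JMS connectors by name (myOutboundConnector, myThrottledInConnector, myRetryInConnector and myThrottledInConnectorInfiniteRetry) without showing their definitions. The following is only a sketch of what they might look like with ActiveMQ. The broker URL, the consumer counts, the redelivery setting and the reconnect frequency are all assumptions, not values taken from the original setup.

```xml
<!-- ASSUMPTION: a local ActiveMQ broker on the default port. -->
<jms:activemq-connector name="myOutboundConnector" specification="1.1" brokerURL="tcp://localhost:61616" doc:name="Active MQ"/>

<!-- ASSUMPTION: "throttled" is taken to mean a single consumer per queue. -->
<jms:activemq-connector name="myThrottledInConnector" specification="1.1" brokerURL="tcp://localhost:61616" numberOfConsumers="1" doc:name="Active MQ"/>

<jms:activemq-connector name="myRetryInConnector" specification="1.1" brokerURL="tcp://localhost:61616" numberOfConsumers="1" doc:name="Active MQ"/>

<!-- ASSUMPTION: maxRedelivery="-1" accepts rolled-back messages any number of times,
     and reconnect-forever keeps retrying the broker connection every 5 seconds. -->
<jms:activemq-connector name="myThrottledInConnectorInfiniteRetry" specification="1.1" brokerURL="tcp://localhost:61616" numberOfConsumers="1" maxRedelivery="-1" doc:name="Active MQ">
  <reconnect-forever frequency="5000"/>
</jms:activemq-connector>
```

Using a separate connector for the transactional flow keeps its redelivery behaviour from affecting the other inbound endpoints.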

To get to the point, I will document Flows 3 and 4 first. If you are interested, you can scroll down to view the first two flows to understand how the message got into the retry queue in the first place.

Flow 3: the flow that retries the request. The initial state of the flow is "stopped".


   <flow name="retry.masterupdate.request.to.client.jms.flow" initialState="stopped" processingStrategy="synchronous">  
     <jms:inbound-endpoint exchange-pattern="request-response" queue="my.intermediate.queue.masterupdate.retry" mimeType="text/plain" connector-ref="myRetryInConnector" doc:name="JMS"/>  
     <object-to-string-transformer doc:name="Object to String"/>  
     <set-variable variableName="initialMessageReceived" value="#[payload:]" doc:name="Variable"/>  
 <!-- PLACE into the original queue to be reprocessed. See Flow 1 -->  
     <jms:outbound-endpoint queue="my.masterupdate" connector-ref="myOutboundConnector" doc:name="JMS"/>  
     <choice-exception-strategy doc:name="retryEndpoint.exception.strategy">  
       <catch-exception-strategy when="true" doc:name="Catch Exception Strategy">  
         <choice doc:name="Choice">  
           <when expression="#[flowVars['initialMessageReceived'] != null]">  
             <set-payload value="#[flowVars['initialMessageReceived']]" doc:name="Set Payload"/>  
           </when>  
         </choice>  
 <!-- PLACE back into the retry queue to be reprocessed the next time around. -->  
         <jms:outbound-endpoint queue="my.intermediate.queue.masterupdate.retry" connector-ref="myOutboundConnector" doc:name="JMS"/>  
       </catch-exception-strategy>  
     </choice-exception-strategy>  
   </flow>  

Flow 4: the flow that initiates the retry based on a trigger, which is a message in a specific queue.

   <flow name="initiate.retry" processingStrategy="synchronous">  
     <jms:inbound-endpoint exchange-pattern="request-response" queue="masterupdate.retry.trigger" mimeType="text/plain" connector-ref="myThrottledInConnector" doc:name="JMS"/>  
     <object-to-string-transformer doc:name="Object to String"/>  
     <choice doc:name="Choice">  
 <!-- THIS is the trigger queue with a specific message that controls which data to retry. -->  
       <when expression="#[groovy: payload.toUpperCase().startsWith('MASTER_UPDATE_RETRY')]">  
         <processor-chain>  
           <scripting:component doc:name="Start flow to retry">  
             <scripting:script engine="Groovy">  
               <scripting:text><![CDATA[((org.mule.construct.Flow)(muleContext.getRegistry().lookupFlowConstruct('retry.masterupdate.request.to.client.jms.flow'))).start();]]></scripting:text>  
             </scripting:script>  
           </scripting:component>  
 <!-- STOP retry flow after completion. -->  
           <flow-ref name="stop.retry.flows" doc:name="Flow Reference"/>  
         </processor-chain>  
       </when>  
     </choice>  
   </flow>  
   <sub-flow name="stop.retry.flows" doc:name="stop.retry.flows">  
     <scripting:component doc:name="Stop Flow">  
       <scripting:script engine="Groovy">  
         <scripting:text><![CDATA[((org.mule.construct.Flow)(muleContext.getRegistry().lookupFlowConstruct('retry.masterupdate.request.to.client.jms.flow'))).stop();]]></scripting:text>  
       </scripting:script>  
     </scripting:component>  
   </sub-flow>  
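
To kick off a retry run, something has to place a message whose text starts with MASTER_UPDATE_RETRY on the masterupdate.retry.trigger queue. You can do that from any JMS client or from the broker's admin console. As a sketch, the same thing can be exposed as a small Mule flow behind an HTTP call. The flow name, host, port and path here are hypothetical; only the queue name, the connector name and the trigger text come from the flows above.

```xml
<!-- Hypothetical helper flow: an HTTP request to http://localhost:8081/retry drops the trigger message. -->
<flow name="send.retry.trigger" processingStrategy="synchronous">
  <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" path="retry" doc:name="HTTP"/>
  <!-- The text must start with MASTER_UPDATE_RETRY to match the choice in initiate.retry. -->
  <set-payload value="MASTER_UPDATE_RETRY" doc:name="Set Payload"/>
  <jms:outbound-endpoint exchange-pattern="one-way" queue="masterupdate.retry.trigger" connector-ref="myOutboundConnector" doc:name="JMS"/>
</flow>
```

Because the choice in initiate.retry upper-cases the payload and only checks the prefix, you could append a suffix to the trigger text later to select different retry queues.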


Flow 1, shown below, receives the request and places it in an intermediate JMS queue after some basic validations. If there is an error, the request goes to an error queue.


   <flow name="my.jms.flow" processingStrategy="synchronous">  
     <jms:inbound-endpoint exchange-pattern="request-response" queue="my.masterupdate" mimeType="text/xml" connector-ref="myThrottledInConnector" doc:name="JMS">  
     </jms:inbound-endpoint>  
     <set-variable variableName="initialMessageReceived" value="#[payload:]" doc:name="Variable"/>  
     <logger message="Incoming message from Master - #[payload:]" level="INFO" doc:name="Logger"/>  
     <processor-chain>  
       <set-payload value="#[flowVars['initialMessageReceived']]" doc:name="Set Payload"/>  
       <object-to-string-transformer />  
       <custom-transformer class="com.esb.transformers.MyCustomTransformer" doc:name="Java"/>  
       <jms:outbound-endpoint exchange-pattern="one-way" queue="my.intermediate.queue.masterupdate" connector-ref="myOutboundConnector" doc:name="JMS">  
       </jms:outbound-endpoint>  
     </processor-chain>  
     <choice-exception-strategy doc:name="my.jms.exception.strategy">  
       <catch-exception-strategy when="#[exception.causedBy(com.esb.exceptions.ValidationException)]" doc:name="Catch Exception Strategy">  
         <object-to-string-transformer doc:name="Object to String"/>  
         <logger message="My Validation Exception." level="ERROR" doc:name="Logger"/>  
         <logger level="ERROR" doc:name="Logger"/>  
         <choice>  
           <when expression="#[flowVars['initialMessageReceived'] != null]">  
             <set-payload value="#[flowVars['initialMessageReceived']]" doc:name="Set Payload"/>  
           </when>  
         </choice>  
         <message-properties-transformer scope="outbound" doc:name="Message Properties">  
           <add-message-property key="BIZ_ERR" value="My Validation Exception."/>  
         </message-properties-transformer>  
         <jms:outbound-endpoint exchange-pattern="one-way" queue="my.masterupdate.err" connector-ref="myOutboundConnector" doc:name="JMS">  
         </jms:outbound-endpoint>  
       </catch-exception-strategy>  
       <catch-exception-strategy when="true" doc:name="Catch Exception Strategy">  
         <choice>  
           <when expression="#[flowVars['initialMessageReceived'] != null]">  
             <set-payload value="#[flowVars['initialMessageReceived']]" doc:name="Set Payload"/>  
           </when>  
         </choice>  
         <logger message="Unhandled exception while calling. #[exception.message] for Payload: #[payload:]" level="ERROR" doc:name="Logger"/>  
         <logger level="ERROR" doc:name="Logger"/>  
         <message-properties-transformer scope="outbound" doc:name="Message Properties">  
           <add-message-property key="BIZ_ERR" value="#[exception.message]" />  
         </message-properties-transformer>  
         <jms:outbound-endpoint exchange-pattern="one-way" queue="my.masterupdate.err" connector-ref="myOutboundConnector" doc:name="JMS">  
         </jms:outbound-endpoint>  
       </catch-exception-strategy>  
     </choice-exception-strategy>  
   </flow>  

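Flow 2 transactionally reads the intermediate queue and processes the message. On network errors it rolls back the transaction so that the same message is retried; on business or unknown exceptions it writes the message to a *.retry queue.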


   <flow name="my.intermediate.queue.masterupdate" processingStrategy="synchronous" >  
     <jms:inbound-endpoint exchange-pattern="request-response" queue="my.intermediate.queue.masterupdate" mimeType="text/xml" connector-ref="myThrottledInConnectorInfiniteRetry" doc:name="JMS">  
       <ee:multi-transaction action="ALWAYS_BEGIN"/>  
     </jms:inbound-endpoint>  
     <set-variable variableName="initialMessageReceivedCP" value="#[payload:]" doc:name="Variable"/>  
     <message-properties-transformer scope="outbound" doc:name="Message Properties">  
       <add-message-property key="Content-Type" value="application/xml; charset=UTF8"/>  
     </message-properties-transformer>  
     <http:outbound-endpoint exchange-pattern="request-response" address="http://my.service.call" mimeType="application/xml" contentType="application/xml" responseTimeout="120000" doc:name="HTTP"/>  
     <object-to-string-transformer />  
     <logger message="RETURN message - #[payload:]" level="INFO" doc:name="Logger"/>  
     <choice>  
       <when expression="#[groovy: message.getInboundProperty('http.status') != null &amp;&amp; message.getInboundProperty('http.status').startsWith('400')]">  
         <logger level="DEBUG" message="Returned http status code 400."/>  
 <!-- THROW a business exception here. -->  
 <!-- THIS is a business exception. Can be retried later. -->  
       </when>  
       <when expression="#[groovy: message.getInboundProperty('http.status') != null &amp;&amp; message.getInboundProperty('http.status').startsWith('2')]">  
         <logger level="TRACE" message=" "/>  
 <!-- THIS is a SUCCESS. -->  
       </when>  
       <otherwise>  
 <!-- THIS is a Transaction ROLLBACK with a retry due to network issues with reaching the end point. This is not the same as retrying a business exception.  
 We do not want to use this feature for business exception retries because the same message will be rolled back and retried instead of moving on to the next message.  
 We believe that the business exception is perhaps a data clean up issue where the message can be retried later.  
 -->  
         <logger message="Unable to reach endpoint...HTTP Status #[groovy: message.getInboundProperty('http.status')]" level="DEBUG" doc:name="Logger"/>  
         <scripting:component doc:name="Throw Exception">  
           <scripting:script engine="Groovy">  
             <scripting:text><![CDATA[throw new java.net.ConnectException("Unable to reach end point. Retrying...");]]></scripting:text>  
           </scripting:script>  
         </scripting:component>  
       </otherwise>        
     </choice>  
     <choice-exception-strategy doc:name="my.exception.strategy">  
       <rollback-exception-strategy when="#[exception.causedBy(org.mule.api.routing.filter.FilterUnacceptedException) || exception.causedBy(java.net.UnknownHostException) || exception.causedBy(java.net.ConnectException) || exception.causedBy(java.net.SocketTimeoutException) || exception.causedBy(java.net.SocketException)]" doc:name="Catch Exception Strategy">  
         <logger message="Network/Connection exception while calling endpoint. Payload: #[payload:]. RETRYING..." level="ERROR" doc:name="Logger"/>  
         <set-payload value="#[flowVars['initialMessageReceivedCP']]" doc:name="Set Payload"/>  
       </rollback-exception-strategy>  
       <catch-exception-strategy when="#[exception.causedBy(com.esb.MyBusinessException)]" doc:name="Catch Exception Strategy">  
         <object-to-string-transformer doc:name="Object to String"/>  
         <logger message="My business exception." level="ERROR" doc:name="Logger"/>  
         <set-payload value="#[flowVars['initialMessageReceivedCP']]" doc:name="Set Payload"/>  
         <message-properties-transformer scope="outbound" doc:name="Message Properties">  
           <add-message-property key="BIZ_ERR" value="My business exception text"/>  
         </message-properties-transformer>  
 <!-- THIS is a business exception. Can be retried later. -->  
         <jms:outbound-endpoint exchange-pattern="one-way" queue="my.intermediate.queue.masterupdate.retry" connector-ref="myOutboundConnector" doc:name="JMS">  
         </jms:outbound-endpoint>  
       </catch-exception-strategy>  
       <catch-exception-strategy when="true" doc:name="Catch Exception Strategy">  
         <choice>  
           <when expression="#[flowVars['initialMessageReceivedCP'] != null]">  
             <set-payload value="#[flowVars['initialMessageReceivedCP']]" doc:name="Set Payload"/>  
           </when>  
         </choice>  
 <!-- THIS is an UNKNOWN exception. Can be retried later. -->  
          <logger message="Unhandled exception while calling end point. Payload: #[payload:]" level="ERROR" doc:name="Logger"/>  
         <jms:outbound-endpoint exchange-pattern="one-way" queue="my.intermediate.queue.masterupdate.retry" connector-ref="myOutboundConnector" doc:name="JMS">  
         </jms:outbound-endpoint>  
       </catch-exception-strategy>  
     </choice-exception-strategy>  
   </flow>
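
Flow 2 leaves the HTTP 400 branch as a placeholder comment ("THROW a business exception here"). One way to fill it, mirroring the Groovy component that the otherwise branch already uses, is sketched here. It assumes that com.esb.MyBusinessException has a constructor taking a String message, which this post does not show, and the message text is made up.

```xml
<when expression="#[groovy: message.getInboundProperty('http.status') != null &amp;&amp; message.getInboundProperty('http.status').startsWith('400')]">
  <logger level="DEBUG" message="Returned http status code 400."/>
  <!-- ASSUMPTION: MyBusinessException(String) exists. The exception is picked up by the
       catch-exception-strategy that matches exception.causedBy(com.esb.MyBusinessException). -->
  <scripting:component doc:name="Throw Business Exception">
    <scripting:script engine="Groovy">
      <scripting:text><![CDATA[throw new com.esb.MyBusinessException("Service returned HTTP 400.");]]></scripting:text>
    </scripting:script>
  </scripting:component>
</when>
```

With this in place, a 400 response sends the original request to my.intermediate.queue.masterupdate.retry instead of rolling back, so the flow moves on to the next message.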