Wednesday, October 10, 2012

Mule 3.3.1 Using Until-Successful To Skip Retries

Introduction

Mule 3.x brings with it a new feature called Until-Successful to loop through some flow tasks where you can specify the number of retries, time between retries and an action to be performed upon retry exhaustion. It also allows you specify a failure condition in addition to the default failure conditions (network issues etc).

However, it is pretty rigid in that the configuration does not let you break out of retries on demand. In other words, your code inside the Until-successful flow succeeded BUT based on a response code (from your web service call) or any other condition, you just need to stop retrying and go straight to the failure condition or simulate the failure condition.

Your requirement is that: You need to try calling a web service from your Mule Flow.

Case 1 If service is unavailable (timeout, 500 etc), retry for a specified number of times and if all retries fail, write the original payload to a JMS message queue.

Case 2 If service is available, the call was successful BUT the web service response indicates that processing failed but operation could be tried again (a recoverable exception on the web service end), then retry the call.

Case 3 If service is available, the call was successful BUT the web service response indicates that the processing  failed (non-recoverable exception on the web service end - meaning no matter how many times you retry, processing will fail), then skip out of retries but still write the original payload to a JMS message queue.

The Most Obvious Solution (does not work)

If you dig deep into mule code, you will see the way mule decides the number of times to retry is by comparing the current retry number against the maxRetries configuration value. If current retry number exceeds the maxRetries, then the assumption is that it has tried to call the client web service 'x' number of times but has failed and so, will now go ahead and execute the failure flow.

Also, if you print out the Mule message, you will see that the current retry number data is in an invocation scoped variable called: process.attempt.count. You would be forgiven if you think, in Case 3 above, you can update this to a value higher than maxRetries and the Until-Successful code would see that and exit the loop and execute the failure condition. But alas no! Before entering the loop, Mule stores the original message in an in memory storage structure and reads that to try it every time. Thus, any invocation property you modify within the until-successful code will not be available because the failed message is discarded and the original message is read from memory for each retry.

The Not So Obvious Solution (and it works!!):

As you can see, first the message received is stored in a session variable for later use. Follow the <!-- LEARN:... tag for notes on what's happening in the flow.
The key takeaway is that you push all processing to a sub-flow of the Until-Successful loop. Within there, you handle your recoverable and non-recoverable exception conditions.
If you wish to simulate failure in any case, you can throw an exception using Groovy Script transformer which tells the Until-Successful in the main loop that processing failed and the message has to be retried.
In the example below, in case of an non-recoverable exception, the JMS message is written and a success is simulated in the sub-flow so that Until-Successful gets out of the loop assuming successful execution. For the recoverable exception, a failure is indicated by settng a message invocation property that is part of the failure expression check.

1:  <vm:endpoint exchange-pattern="one-way" path="unprocessedMessageStore" name="unprocessedMessageStore" doc:name="VM"/>  
2:  <flow name="myService" doc:name="myService">  
3:       <http:inbound-endpoint exchange-pattern="one-way" host="localhost" port="63012" path="my/service/path/v1/operation" doc:name="myService"/>  
4:       <object-to-string-transformer doc:name="Object to String"/>  
5:       <message-properties-transformer doc:name="Message Properties">  
6:              <add-message-property key="srcPayload" value="#[payload:]"/>  
7:       </message-properties-transformer>  
8:       <set-session-variable value="#[payload:]" variableName="srcPayload" doc:name="Session Variable" />  
9:  <!-- LEARN: This is a Web Service hosted from within Mule which receives the request from a client -->  
10:       <cxf:jaxws-service serviceClass="com.mycomp.member.IMemberManager" doc:name="SOAP Service"/>  
11:       <processor ref="mdcLogging"></processor>  
12:           <processor-chain doc:name="Processor Chain">  
13:              <message-properties-transformer scope="invocation" doc:name="Message Properties">  
14:              <add-message-property key="MessageHeader" value="#[groovy:payload.find{it instanceof javax.xml.ws.Holder}.value]"/>  
15:                  <add-message-property key="MyRequest" value="#[groovy:payload.find{it instanceof com.mycompany.MyRequest}]"/>  
16:                  <delete-message-property key=".*"/>  
17:              </message-properties-transformer>  
18:              <choice doc:name="Choice">  
19:                  <when expression="#[groovy:validationService.isValidValue(payload[1].data.value)]">  
20:                  <processor-chain>  
21:  <!-- LEARN: This is a custom request transformer -->  
22:            <custom-transformer class="com.mycompany.transformer.MyRequestTransformer" doc:name="Java"/>  
23:                      <choice>  
24:  <!-- LEARN: This verifies that the request is valid -->  
25:              <when evaluator="groovy" expression="message.getInvocationProperty('requestValid') == 'Y'">  
26:  <!-- LEARN: This is the until successful with a failure expression that checks that the invocation property indicates processing failure -->  
27:                <until-successful objectStore-ref="objectStore" maxRetries="5"   
28:                              secondsBetweenRetries="10" failureExpression="groovy:message.getInvocationProperty('callAction') == '500'"  
29:                              deadLetterQueue-ref="unprocessedMessageStore" doc:name="Until Successful">  
30:  <!-- LEARN: The key is to move all processing to a sub-flow where you can catch recoverable and unrecoverable exceptions. -->  
31:                  <processor-chain>  
32:                                      <flow-ref name="mySubFlow"/>  
33:                                  </processor-chain>  
34:                              </until-successful>  
35:                          </when>  
36:                          <otherwise>  
37:                              <flow-ref name="errorHandlerBadInput" doc:name="errorHandlerBadInput"/>  
38:                          </otherwise>  
39:                      </choice>  
40:                  </processor-chain>  
41:                  </when>  
42:              </choice>  
43:          </processor-chain>  
44:      <catch-exception-strategy doc:name="Catch Exception Strategy">  
45:          <flow-ref name="jmsAuditDirectWrite" doc:name="Log Unprocessed Message to Queue"/>  
46:    </catch-exception-strategy>  
47:  </flow>  
48:    
49:    <sub-flow name="myWebServiceClient" doc:name="myWebServiceClient">  
50:          <cxf:jaxws-client operation="MyRequest" serviceClass="com.my.client.v1.IMemberManager" doc:name="Webservice Client">  
51:              <cxf:outInterceptors/>  
52:          </cxf:jaxws-client>  
53:          <http:outbound-endpoint exchange-pattern="request-response" address="http://..." doc:name="myClientRequest"/>  
54:      </sub-flow>  
55:      <sub-flow name="jmsAuditDirectWrite" doc:name="jmsAuditDirectWrite">  
56:      <expression-transformer>  
57:      <return-argument expression="session:srcPayload" evaluator="header" />  
58:      </expression-transformer>  
59:      <jms:outbound-endpoint queue="esb.unprocessed.request.err" connector-ref="jmsEsbXAConnector" doc:name="JMS"/>  
60:      </sub-flow>  
61:    
62:      <sub-flow name="mySubFlow">  
63:          <processor-chain>  
64:  <!-- LEARN: Call the client web service. -->  
65:              <flow-ref name="myWebServiceClient" doc:name="Invoke WebService"/>  
66:  <!-- LEARN: This is the custom transformer of the response ( See code snippet below) that brings back with it a status code (which acts like a non-recoverable exception or a recoverable exception exception as the case may be. -->  
67:         <custom-transformer class="com.mycompany.transformer.MyResponseTransformer" doc:name="Java"/>  
68:         <choice>  
69:  <!-- LEARN: This is the condition that says this is a non-recoverable exception and hence, write to a JMS queue and skip the loop. -->  
70:           <when evaluator="groovy" expression="message.getInvocationProperty('skip') == 'Y'">  
71:                      <flow-ref name="jmsAuditDirectWrite" doc:name="Log Unprocessed Message to Queue"/>  
72:                      <message-properties-transformer scope="invocation" doc:name="Message Properties">  
73:  <!-- LEARN: This is the property that fails the failure expression on the Until-Successful loop and hence until successful loop processing stops -->  
74:                          <add-message-property key="callAction" value="202Y"/>  
75:                      </message-properties-transformer>  
76:                  </when>  
77:                  <when evaluator="groovy" expression="message.getInvocationProperty('skip') == '202Y'">  
78:  <!-- LEARN: This is the condition that says this message was processed successfully and hence skip the loop. -->  
79:                      <message-properties-transformer scope="invocation" doc:name="Message Properties">  
80:  <!-- LEARN: This is the property that fails the failure expression on the Until-Successful loop and hence until successful loop processing stops -->  
81:              <add-message-property key="callAction" value="202Y"/>  
82:                      </message-properties-transformer>  
83:                  </when>  
84:              </choice>  
85:          </processor-chain>  
86:      </sub-flow>  
87:    


The Custom Response Transformer

Within com.mycompany.transformer.MyResponseTransformer, this happens:
// jsr is the unmarshalled response returned from the web service I called.
// it carries a status code. A non-zero value is a failure in processing.
// the resendFlag indicates whether it was a recoverable exception (resend=true)
// or a non-recoverable exception (resend=false)

1:  returnValue.setMessage(jsr.getMessage());  
2:  returnValue.setStatus(jsr.getStatus());  
3:  if (jsr.getStatus().codeValue() == 0) {  
4:    message.setInvocationProperty("callAction", "202");  
5:      logger.info("Client SUCCESSFULLY PROCESSED with status code + message : " + returnValue.getStatus() + ";;;;" +returnValue.getMessage());  
6:      message.setInvocationProperty("skip", "202Y");  
7:  } else {  
8:    if (!jsr.isResend()) {  
9:          message.setInvocationProperty("callAction", "500");  
10:          logger.info("Client FAILED PROCESSING with status code + message : " + returnValue.getStatus() + ";;;;" +returnValue.getMessage());  
11:          message.setInvocationProperty("skip", "Y"); //do not resend; store;  
12:  } else {  
13:          message.setInvocationProperty("callAction", "500");  
14:          logger.info("Client requested a RESEND of the request with status code + message : " + returnValue.getStatus() + ";;;;" +returnValue.getMessage());  
15:          message.setInvocationProperty("skip", "N");  
16:          }  
17:      }  
18:  }   
19:    

2 comments:

Unknown said...

I have a scenario wherein I am pinging HTTP endpoint and getting a JSON response. I am checking that response if the "Completed", I am moving ahead.
I have tried -
But this is giving an error.

AJN said...

This is exactly what I was looking for! Very well explained. Thank you.