Showing posts with label mule failure expression. Show all posts
Showing posts with label mule failure expression. Show all posts

Wednesday, October 10, 2012

Mule ESB Reliability with Exception Strategy

Mule 3.x brings with it some nice exception strategy handling features that you can use to provide reliable message delivery and retrying service calls.

Your requirement is that: You need to try calling a web service from your Mule Flow.

Case 1 If service is unavailable (timeout, 500 etc), retry forever (or a specified number of times and if all retries fail, write the original payload to a JMS message queue).

Case 2 If service is available, the call was successful BUT the web service response indicates that processing failed but operation could be tried again (a recoverable exception on the web service end), then retry the call.

Case 3 If service is available, the call was successful BUT the web service response indicates that the processing  failed (non-recoverable exception on the web service end - meaning no matter how many times you retry, processing will fail), then skip out of retries but still write the original payload to a JMS message queue.

Using JMS, JMS Transactions and Exception Strategies

You break your implementation into two separate main flows.
Flow 1 which you see below receives the request and places it in a JMS queue, transactionally.

1:    <flow name="myServiceV2" doc:name="myServiceV2">  
2:      <http:inbound-endpoint exchange-pattern="one-way" host="localhost" port="63012" path="my/service/v2/operation" doc:name="myService"/>  
3:      <object-to-string-transformer doc:name="Object to String"/>  
4:      <message-properties-transformer doc:name="Message Properties">  
5:        <add-message-property key="srcPayload" value="#[payload:]"/>  
6:      </message-properties-transformer>  
7:      <set-session-variable value="#[payload:]" variableName="srcPayload" doc:name="Session Variable" />  
8:      <logger message="Request SRC message is #[payload:]" level="INFO" doc:name="Logger"/>  
9:      <vm:outbound-endpoint exchange-pattern="one-way" path="unprocessedOriginalMessageStoreV2" responseTimeout="20000" mimeType="text/plain" doc:name="Dispatch to audit"/>  
10:      </flow>  
11:        
12:    <flow name="unprocessedOriginalMessageStoreV2" doc:name="unprocessedOriginalMessageStoreV2">  
13:      <vm:inbound-endpoint exchange-pattern="one-way" path="unprocessedOriginalMessageStoreV2" doc:name="VM">  
14:      </vm:inbound-endpoint>  
15:        <object-to-string-transformer />  
16:      <expression-transformer doc:name="Expression">  
17:        <return-argument evaluator="header" expression="session:srcPayload"/>  
18:      </expression-transformer>  
19:      <jms:outbound-endpoint queue="esb.unprocessed.original.message" connector-ref="jmsPalEsbXAConnector" doc:name="JMS">  
20:          <jms:transaction action="ALWAYS_BEGIN"/>  
21:      </jms:outbound-endpoint>  
22:    </flow>  
23:    


Next, write another flow that reads from the message queue and processes the message. Note the transactions, the exception handling strategy and the rollback of the message.

1:    <flow name="processMyRequests" doc:name="processMyRequests">  
2:      <jms:inbound-endpoint queue="esb.unprocessed.original.message" connector-ref="jmsPalEsbXAConnector" doc:name=" processor">  
3:        <xa-transaction action="ALWAYS_BEGIN" timeout="60000"/>  
4:      </jms:inbound-endpoint>  
5:      <vm:outbound-endpoint exchange-pattern="one-way" path="jmsTask" responseTimeout="20000" mimeType="text/plain" doc:name="Dispatch to processing">  
6:        <xa-transaction action="ALWAYS_JOIN" timeout="60000"/>  
7:      </vm:outbound-endpoint>  
8:    </flow>  
9:      
10:    <flow name="processMyRequestJMS" processingStrategy="synchronous">  
11:      <vm:inbound-endpoint exchange-pattern="one-way" path="jmsTask" doc:name="VM">  
12:        <xa-transaction action="BEGIN_OR_JOIN" timeout="60000"/>  
13:      </vm:inbound-endpoint>  
14:      <set-session-variable value="#[payload:]" variableName="srcPayload" doc:name="Session Variable" />  
15:      <cxf:jaxws-service serviceClass="com.mycompany.operations.v1.IMemberManager" doc:name="SOAP Service"/>  
16:      <set-session-variable value="N" variableName="skipRetry" doc:name="Variable" />  
17:      <processor-chain doc:name="Processor Chain">  
18:        <message-properties-transformer scope="invocation" doc:name="Message Properties">  
19:          <add-message-property key="MessageHeader" value="#[groovy:payload.find{it instanceof javax.xml.ws.Holder}.value]"/>  
20:          <add-message-property key="MyRequest" value="#[groovy:payload.find{it instanceof com.mycompany.member.MyRequest}]"/>  
21:          <delete-message-property key=".*"/>  
22:        </message-properties-transformer>  
23:        <set-session-variable variableName="skipRetry" value="N" doc:name="Variable"/>  
24:        <choice doc:name="Choice">  
25:          <when expression="#[groovy:validationService.isValidValue(payload[1].data.value)]">  
26:            <processor-chain>  
27:              <custom-transformer class="com.mycompany.transformer.MyRequestTransformer" doc:name="Java"/>  
28:              <choice doc:name="Choice">  
29:                <when expression="groovy:message.getInvocationProperty('requestValid') == 'Y'">  
30:                  <processor-chain>  
31:  <!-- LEARN: Call the client webservice -->  
32:                    <flow-ref name="webServiceClientV2" doc:name="Invoke WebService"/>  
33:  <!-- LEARN: This is the custom transformer that reads the response on successful call of client and sets the skip property -->  
34:  <!-- LEARN: 'skip' is set to 'D' (for Done) if call was successfull; -->  
35:  <!-- LEARN: 'skip' is set to 'Y' if call was successfull but response indicated exception was non-recoverable; (so no retries but write to original msg to JMS Queue)-->  
36:  <!-- LEARN: 'skip' is set to 'N' if call was successfull but response indicated exception was recoverable; (so retry); -->  
37:                    <custom-transformer class="com.mycompany.transformer.MyResponseTransformer" doc:name="Java"/>  
38:                                      <choice>  
39:                                          <when evaluator="groovy" expression="message.getInvocationProperty('skip') != 'D'">  
40:  <!-- LEARN: When 'skip' is NOT 'D' then set the value of skip to a session or flow variable and throw a dummy exception-->   
41:                                              <set-session-variable value="#[groovy:message.getInvocationProperty('skip')]" variableName="skipRetry" doc:name="Variable" />  
42:                                              <scripting:transformer doc:name="Create Dummy Exception">  
43:                                            <scripting:script engine="groovy">  
44:                                              <scripting:text><![CDATA[throw new java.lang.Exception('IGNORE THIS EXCEPTION');]]></scripting:text>  
45:                                            </scripting:script>  
46:                                          </scripting:transformer>  
47:                            </when>  
48:                                      </choice>  
49:                  </processor-chain>  
50:                </when>  
51:                <otherwise>  
52:                  <processor-chain>  
53:                    <flow-ref name="errorHandlerBadInput" doc:name="errorHandlerBadInput"/>  
54:                  </processor-chain>  
55:                </otherwise>  
56:              </choice>  
57:            </processor-chain>  
58:          </when>  
59:          <otherwise>  
60:            <processor-chain>  
61:              <flow-ref name="defaultErrorHandlerV2" doc:name="defaultErrorHandlerV2"/>  
62:            </processor-chain>  
63:          </otherwise>  
64:        </choice>  
65:      </processor-chain>  
66:      <choice-exception-strategy doc:name="Choice Exception Strategy">  
67:  <!-- LEARN: When dummy exception thrown, check the flow or session variable 'skipRetry' ; If do not skip, then rollback JMS transaction to re queue the JMS message -->  
68:         <rollback-exception-strategy when="#[groovy:message.getProperty('skipRetry', org.mule.api.transport.PropertyScope.SESSION) == 'N' || exception.causedBy(java.net.UnknownHostException) || exception.causedBy(java.net.ConnectException)]" maxRedeliveryAttempts="2">  
69:            <on-redelivery-attempts-exceeded>  
70:              <logger message="STRATEGY ROLLBACK WITH LIMITED RETRIES #[payload:]" level="INFO" doc:name="Logger"/>  
71:              <flow-ref name="jmsAuditDirectWriteV2" doc:name="Log Unprocessed Message to Original Queue"/>  
72:            </on-redelivery-attempts-exceeded>  
73:        </rollback-exception-strategy>  
74:    
75:  <!-- LEARN: When dummy exception thrown, check the flow or session variable 'skipRetry' ; If do not skip, then place the original message back on main processing queue for reprocessing -->  
76:  <!-- LEARN: If you uncomment out this forever strategy AND comment out the above ROLLBACK strategy, you get retry forever -->  
77:  <!--      <catch-exception-strategy when="#[groovy:message.getProperty('skipRetry', org.mule.api.transport.PropertyScope.SESSION) == 'N' || exception.causedBy(java.net.UnknownHostException) || exception.causedBy(java.net.ConnectException)]">  
78:          <logger message="STRATEGY RETRY FOREVER #[payload:]" level="INFO" doc:name="Logger"/>  
79:              <flow-ref name="unprocessedOriginalMessageStoreV2" doc:name="Log Unprocessed Message to Original Queue"/>  
80:        </catch-exception-strategy>  
81:  -->  
82:        <catch-exception-strategy when="#[groovy:message.getProperty('skipRetry', org.mule.api.transport.PropertyScope.SESSION) == 'Y']" doc:name="Write to Error Queue">  
83:          <logger message="STRATEGY WRITE TO ERROR QUEUE; SKIP RETRY; #[payload:]" level="INFO" doc:name="Logger"/>  
84:          <flow-ref name="jmsAuditDirectWriteV2" doc:name="Log Unprocessed Message to Error Queue"/>  
85:        </catch-exception-strategy>  
86:        <catch-exception-strategy doc:name="Write to Error Queue">  
87:          <logger message="STRATEGY DEFAULT WRITE TO ERROR QUEUE; #[payload:]" level="INFO" doc:name="Logger"/>  
88:          <flow-ref name="jmsAuditDirectWriteV2" doc:name="Log Unprocessed Message to Error Queue"/>  
89:        </catch-exception-strategy>  
90:      </choice-exception-strategy>  
91:    </flow>  
92:    <sub-flow name="webServiceClientV2" doc:name="webServiceClientV2">  
93:      <logger message="client 0 : #[payload:]" level="ERROR" doc:name="Log Error"/>  
94:      <cxf:jaxws-client operation="MyRequest" serviceClass="com.mycompany.operations.IMemberManager" doc:name="Webservice Client">  
95:        <cxf:outInterceptors/>  
96:      </cxf:jaxws-client>  
97:      <http:outbound-endpoint exchange-pattern="request-response" address="http://..." doc:name="request"/>  
98:    </sub-flow>  
99:    <sub-flow name="jmsAuditDirectWriteV2" doc:name="jmsAuditDirectWriteV2">  
100:      <expression-transformer doc:name="Expression">  
101:        <return-argument evaluator="header" expression="session:srcPayload"/>  
102:      </expression-transformer>  
103:      <jms:outbound-endpoint queue="esb.unprocessed.request.err" connector-ref="jmsPalEsbXAConnector" doc:name="JMS"/>  
104:    </sub-flow>  
105:    <flow name="unprocessedOriginalMessageStoreV2" doc:name="unprocessedOriginalMessageStoreV2">  
106:      <expression-transformer doc:name="Expression">  
107:        <return-argument evaluator="header" expression="session:srcPayload"/>  
108:      </expression-transformer>  
109:        <object-to-string-transformer />  
110:      <logger message="JMS message is #[payload:]" level="INFO" doc:name="Logger"/>  
111:      <jms:outbound-endpoint queue="esb.unprocessed.original.message" connector-ref="jmsPalEsbXAConnector" doc:name="JMS">  
112:      </jms:outbound-endpoint>  
113:    </flow>  
114:  </mule>  
115:    


The Custom Response Transformer

Within com.mycompany.transformer.MyResponseTransformer, this happens:
// jsr is the unmarshalled response returned from the web service I called.
// It carries a status code. A non-zero value is a failure in processing.
// The resendFlag indicates whether it was a recoverable exception (resend=true)
// or a non-recoverable exception (resend=false)

 
1:  returnValue.setMessage(jsr.getMessage());  
2:  returnValue.setStatus(jsr.getStatus());  
3:  if (jsr.getStatus().codeValue() == 0) {  
4:      logger.info("Client SUCCESSFULLY PROCESSED with status code + message : " + returnValue.getStatus() + ";;;;" +returnValue.getMessage());  
5:      message.setInvocationProperty("skip", "D");  
6:  } else {  
7:    if (!jsr.isResend()) {  
8:          logger.info("Client FAILED PROCESSING with status code + message : " + returnValue.getStatus() + ";;;;" +returnValue.getMessage());  
9:          message.setInvocationProperty("skip", "Y"); //do not resend; store;  
10:  } else {  
11:          logger.info("Client requested a RESEND of the request with status code + message : " + returnValue.getStatus() + ";;;;" +returnValue.getMessage());  
12:          message.setInvocationProperty("skip", "N");  
13:          }  
14:      }  
15:  }   




Mule 3.3.1 Using Until-Successful To Skip Retries

Introduction

Mule 3.x brings with it a new feature called Until-Successful to loop through some flow tasks where you can specify the number of retries, time between retries and an action to be performed upon retry exhaustion. It also allows you specify a failure condition in addition to the default failure conditions (network issues etc).

However, it is pretty rigid in that the configuration does not let you break out of retries on demand. In other words, your code inside the Until-successful flow succeeded BUT based on a response code (from your web service call) or any other condition, you just need to stop retrying and go straight to the failure condition or simulate the failure condition.

Your requirement is that: You need to try calling a web service from your Mule Flow.

Case 1 If service is unavailable (timeout, 500 etc), retry for a specified number of times and if all retries fail, write the original payload to a JMS message queue.

Case 2 If service is available, the call was successful BUT the web service response indicates that processing failed but operation could be tried again (a recoverable exception on the web service end), then retry the call.

Case 3 If service is available, the call was successful BUT the web service response indicates that the processing  failed (non-recoverable exception on the web service end - meaning no matter how many times you retry, processing will fail), then skip out of retries but still write the original payload to a JMS message queue.

The Most Obvious Solution (does not work)

If you dig deep into mule code, you will see the way mule decides the number of times to retry is by comparing the current retry number against the maxRetries configuration value. If current retry number exceeds the maxRetries, then the assumption is that it has tried to call the client web service 'x' number of times but has failed and so, will now go ahead and execute the failure flow.

Also, if you print out the Mule message, you will see that the current retry number data is in an invocation scoped variable called: process.attempt.count. You would be forgiven if you think, in Case 3 above, you can update this to a value higher than maxRetries and the Until-Successful code would see that and exit the loop and execute the failure condition. But alas no! Before entering the loop, Mule stores the original message in an in memory storage structure and reads that to try it every time. Thus, any invocation property you modify within the until-successful code will not be available because the failed message is discarded and the original message is read from memory for each retry.

The Not So Obvious Solution (and it works!!):

As you can see, first the message received is stored in a session variable for later use. Follow the <!-- LEARN:... tag for notes on what's happening in the flow.
The key takeaway is that you push all processing to a sub-flow of the Until-Successful loop. Within there, you handle your recoverable and non-recoverable exception conditions.
If you wish to simulate failure in any case, you can throw an exception using Groovy Script transformer which tells the Until-Successful in the main loop that processing failed and the message has to be retried.
In the example below, in case of an non-recoverable exception, the JMS message is written and a success is simulated in the sub-flow so that Until-Successful gets out of the loop assuming successful execution. For the recoverable exception, a failure is indicated by settng a message invocation property that is part of the failure expression check.

1:  <vm:endpoint exchange-pattern="one-way" path="unprocessedMessageStore" name="unprocessedMessageStore" doc:name="VM"/>  
2:  <flow name="myService" doc:name="myService">  
3:       <http:inbound-endpoint exchange-pattern="one-way" host="localhost" port="63012" path="my/service/path/v1/operation" doc:name="myService"/>  
4:       <object-to-string-transformer doc:name="Object to String"/>  
5:       <message-properties-transformer doc:name="Message Properties">  
6:              <add-message-property key="srcPayload" value="#[payload:]"/>  
7:       </message-properties-transformer>  
8:       <set-session-variable value="#[payload:]" variableName="srcPayload" doc:name="Session Variable" />  
9:  <!-- LEARN: This is a Web Service hosted from within Mule which receives the request from a client -->  
10:       <cxf:jaxws-service serviceClass="com.mycomp.member.IMemberManager" doc:name="SOAP Service"/>  
11:       <processor ref="mdcLogging"></processor>  
12:           <processor-chain doc:name="Processor Chain">  
13:              <message-properties-transformer scope="invocation" doc:name="Message Properties">  
14:              <add-message-property key="MessageHeader" value="#[groovy:payload.find{it instanceof javax.xml.ws.Holder}.value]"/>  
15:                  <add-message-property key="MyRequest" value="#[groovy:payload.find{it instanceof com.mycompany.MyRequest}]"/>  
16:                  <delete-message-property key=".*"/>  
17:              </message-properties-transformer>  
18:              <choice doc:name="Choice">  
19:                  <when expression="#[groovy:validationService.isValidValue(payload[1].data.value)]">  
20:                  <processor-chain>  
21:  <!-- LEARN: This is a custom request transformer -->  
22:            <custom-transformer class="com.mycompany.transformer.MyRequestTransformer" doc:name="Java"/>  
23:                      <choice>  
24:  <!-- LEARN: This verifies that the request is valid -->  
25:              <when evaluator="groovy" expression="message.getInvocationProperty('requestValid') == 'Y'">  
26:  <!-- LEARN: This is the until successful with a failure expression that checks that the invocation property indicates processing failure -->  
27:                <until-successful objectStore-ref="objectStore" maxRetries="5"   
28:                              secondsBetweenRetries="10" failureExpression="groovy:message.getInvocationProperty('callAction') == '500'"  
29:                              deadLetterQueue-ref="unprocessedMessageStore" doc:name="Until Successful">  
30:  <!-- LEARN: The key is to move all processing to a sub-flow where you can catch recoverable and unrecoverable exceptions. -->  
31:                  <processor-chain>  
32:                                      <flow-ref name="mySubFlow"/>  
33:                                  </processor-chain>  
34:                              </until-successful>  
35:                          </when>  
36:                          <otherwise>  
37:                              <flow-ref name="errorHandlerBadInput" doc:name="errorHandlerBadInput"/>  
38:                          </otherwise>  
39:                      </choice>  
40:                  </processor-chain>  
41:                  </when>  
42:              </choice>  
43:          </processor-chain>  
44:      <catch-exception-strategy doc:name="Catch Exception Strategy">  
45:          <flow-ref name="jmsAuditDirectWrite" doc:name="Log Unprocessed Message to Queue"/>  
46:    </catch-exception-strategy>  
47:  </flow>  
48:    
49:    <sub-flow name="myWebServiceClient" doc:name="myWebServiceClient">  
50:          <cxf:jaxws-client operation="MyRequest" serviceClass="com.my.client.v1.IMemberManager" doc:name="Webservice Client">  
51:              <cxf:outInterceptors/>  
52:          </cxf:jaxws-client>  
53:          <http:outbound-endpoint exchange-pattern="request-response" address="http://..." doc:name="myClientRequest"/>  
54:      </sub-flow>  
55:      <sub-flow name="jmsAuditDirectWrite" doc:name="jmsAuditDirectWrite">  
56:      <expression-transformer>  
57:      <return-argument expression="session:srcPayload" evaluator="header" />  
58:      </expression-transformer>  
59:      <jms:outbound-endpoint queue="esb.unprocessed.request.err" connector-ref="jmsEsbXAConnector" doc:name="JMS"/>  
60:      </sub-flow>  
61:    
62:      <sub-flow name="mySubFlow">  
63:          <processor-chain>  
64:  <!-- LEARN: Call the client web service. -->  
65:              <flow-ref name="myWebServiceClient" doc:name="Invoke WebService"/>  
66:  <!-- LEARN: This is the custom transformer of the response ( See code snippet below) that brings back with it a status code (which acts like a non-recoverable exception or a recoverable exception exception as the case may be. -->  
67:         <custom-transformer class="com.mycompany.transformer.MyResponseTransformer" doc:name="Java"/>  
68:         <choice>  
69:  <!-- LEARN: This is the condition that says this is a non-recoverable exception and hence, write to a JMS queue and skip the loop. -->  
70:           <when evaluator="groovy" expression="message.getInvocationProperty('skip') == 'Y'">  
71:                      <flow-ref name="jmsAuditDirectWrite" doc:name="Log Unprocessed Message to Queue"/>  
72:                      <message-properties-transformer scope="invocation" doc:name="Message Properties">  
73:  <!-- LEARN: This is the property that fails the failure expression on the Until-Successful loop and hence until successful loop processing stops -->  
74:                          <add-message-property key="callAction" value="202Y"/>  
75:                      </message-properties-transformer>  
76:                  </when>  
77:                  <when evaluator="groovy" expression="message.getInvocationProperty('skip') == '202Y'">  
78:  <!-- LEARN: This is the condition that says this message was processed successfully and hence skip the loop. -->  
79:                      <message-properties-transformer scope="invocation" doc:name="Message Properties">  
80:  <!-- LEARN: This is the property that fails the failure expression on the Until-Successful loop and hence until successful loop processing stops -->  
81:              <add-message-property key="callAction" value="202Y"/>  
82:                      </message-properties-transformer>  
83:                  </when>  
84:              </choice>  
85:          </processor-chain>  
86:      </sub-flow>  
87:    


The Custom Response Transformer

Within com.mycompany.transformer.MyResponseTransformer, this happens:
// jsr is the unmarshalled response returned from the web service I called.
// it carries a status code. A non-zero value is a failure in processing.
// the resendFlag indicates whether it was a recoverable exception (resend=true)
// or a non-recoverable exception (resend=false)

1:  returnValue.setMessage(jsr.getMessage());  
2:  returnValue.setStatus(jsr.getStatus());  
3:  if (jsr.getStatus().codeValue() == 0) {  
4:    message.setInvocationProperty("callAction", "202");  
5:      logger.info("Client SUCCESSFULLY PROCESSED with status code + message : " + returnValue.getStatus() + ";;;;" +returnValue.getMessage());  
6:      message.setInvocationProperty("skip", "202Y");  
7:  } else {  
8:    if (!jsr.isResend()) {  
9:          message.setInvocationProperty("callAction", "500");  
10:          logger.info("Client FAILED PROCESSING with status code + message : " + returnValue.getStatus() + ";;;;" +returnValue.getMessage());  
11:          message.setInvocationProperty("skip", "Y"); //do not resend; store;  
12:  } else {  
13:          message.setInvocationProperty("callAction", "500");  
14:          logger.info("Client requested a RESEND of the request with status code + message : " + returnValue.getStatus() + ";;;;" +returnValue.getMessage());  
15:          message.setInvocationProperty("skip", "N");  
16:          }  
17:      }  
18:  }   
19: