Wednesday, October 10, 2012

Mule ESB Reliability with Exception Strategy

Mule 3.x introduces exception strategies that you can use to provide reliable message delivery and to retry service calls.

Suppose your requirement is to call a web service from a Mule flow and to handle the following three cases.

Case 1: If the service is unavailable (timeout, HTTP 500, etc.), retry forever, or retry a specified number of times and, if all retries fail, write the original payload to a JMS message queue.

Case 2: If the service is available and the call succeeded, BUT the web service response indicates that processing failed and the operation can be tried again (a recoverable exception on the web service end), retry the call.

Case 3: If the service is available and the call succeeded, BUT the web service response indicates that processing failed with a non-recoverable exception on the web service end (no matter how many times you retry, processing will fail), skip the retries but still write the original payload to a JMS message queue.
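
The three cases can be sketched outside Mule as a plain Java retry loop. This only illustrates the requirement and is not the Mule implementation: CallOutcome, RetryRequirements, process and the in-memory errorQueue are hypothetical names standing in for the web service result and the JMS queue, and a fixed maxAttempts is assumed in place of retrying forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Possible results of one web service call, matching the three cases above. */
enum CallOutcome { SUCCESS, UNAVAILABLE, RECOVERABLE_FAILURE, NON_RECOVERABLE_FAILURE }

/** Hypothetical model of the requirement; not part of the Mule configuration. */
final class RetryRequirements {

    /** Stand-in for the JMS queue that keeps payloads which could not be processed. */
    final List<String> errorQueue = new ArrayList<>();

    /**
     * Calls the service until it succeeds, fails permanently, or maxAttempts is used up.
     * Returns the number of calls that were made.
     */
    int process(String payload, Function<String, CallOutcome> service, int maxAttempts) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            CallOutcome outcome = service.apply(payload);
            if (outcome == CallOutcome.SUCCESS) {
                return attempts;            // processed, nothing to store
            }
            if (outcome == CallOutcome.NON_RECOVERABLE_FAILURE) {
                errorQueue.add(payload);    // Case 3: no retry, keep the original payload
                return attempts;
            }
            // Case 1 (UNAVAILABLE) and Case 2 (RECOVERABLE_FAILURE): loop and try again
        }
        errorQueue.add(payload);            // all retries failed, keep the original payload
        return attempts;
    }
}
```

Passing a lambda that always returns CallOutcome.UNAVAILABLE with maxAttempts set to 3 makes three calls and then stores the payload in errorQueue, which is the behaviour the limited-retry variant of Case 1 asks for.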

Using JMS, JMS Transactions and Exception Strategies

Break the implementation into two main parts.
Flow 1, shown below, receives the request and places it on a JMS queue transactionally.

    <flow name="myServiceV2" doc:name="myServiceV2">
      <http:inbound-endpoint exchange-pattern="one-way" host="localhost" port="63012" path="my/service/v2/operation" doc:name="myService"/>
      <object-to-string-transformer doc:name="Object to String"/>
      <message-properties-transformer doc:name="Message Properties">
        <add-message-property key="srcPayload" value="#[payload:]"/>
      </message-properties-transformer>
      <set-session-variable value="#[payload:]" variableName="srcPayload" doc:name="Session Variable" />
      <logger message="Request SRC message is #[payload:]" level="INFO" doc:name="Logger"/>
      <vm:outbound-endpoint exchange-pattern="one-way" path="unprocessedOriginalMessageStoreV2" responseTimeout="20000" mimeType="text/plain" doc:name="Dispatch to audit"/>
    </flow>

    <flow name="unprocessedOriginalMessageStoreV2" doc:name="unprocessedOriginalMessageStoreV2">
      <vm:inbound-endpoint exchange-pattern="one-way" path="unprocessedOriginalMessageStoreV2" doc:name="VM">
      </vm:inbound-endpoint>
      <object-to-string-transformer />
      <expression-transformer doc:name="Expression">
        <return-argument evaluator="header" expression="session:srcPayload"/>
      </expression-transformer>
      <jms:outbound-endpoint queue="esb.unprocessed.original.message" connector-ref="jmsPalEsbXAConnector" doc:name="JMS">
        <jms:transaction action="ALWAYS_BEGIN"/>
      </jms:outbound-endpoint>
    </flow>


Next, write another flow that reads from the message queue and processes the message. Note the transactions, the exception handling strategy and the rollback of the message.

    <flow name="processMyRequests" doc:name="processMyRequests">
      <jms:inbound-endpoint queue="esb.unprocessed.original.message" connector-ref="jmsPalEsbXAConnector" doc:name=" processor">
        <xa-transaction action="ALWAYS_BEGIN" timeout="60000"/>
      </jms:inbound-endpoint>
      <vm:outbound-endpoint exchange-pattern="one-way" path="jmsTask" responseTimeout="20000" mimeType="text/plain" doc:name="Dispatch to processing">
        <xa-transaction action="ALWAYS_JOIN" timeout="60000"/>
      </vm:outbound-endpoint>
    </flow>

    <flow name="processMyRequestJMS" processingStrategy="synchronous">
      <vm:inbound-endpoint exchange-pattern="one-way" path="jmsTask" doc:name="VM">
        <xa-transaction action="BEGIN_OR_JOIN" timeout="60000"/>
      </vm:inbound-endpoint>
      <set-session-variable value="#[payload:]" variableName="srcPayload" doc:name="Session Variable" />
      <cxf:jaxws-service serviceClass="com.mycompany.operations.v1.IMemberManager" doc:name="SOAP Service"/>
      <set-session-variable value="N" variableName="skipRetry" doc:name="Variable" />
      <processor-chain doc:name="Processor Chain">
        <message-properties-transformer scope="invocation" doc:name="Message Properties">
          <add-message-property key="MessageHeader" value="#[groovy:payload.find{it instanceof javax.xml.ws.Holder}.value]"/>
          <add-message-property key="MyRequest" value="#[groovy:payload.find{it instanceof com.mycompany.member.MyRequest}]"/>
          <delete-message-property key=".*"/>
        </message-properties-transformer>
        <set-session-variable variableName="skipRetry" value="N" doc:name="Variable"/>
        <choice doc:name="Choice">
          <when expression="#[groovy:validationService.isValidValue(payload[1].data.value)]">
            <processor-chain>
              <custom-transformer class="com.mycompany.transformer.MyRequestTransformer" doc:name="Java"/>
              <choice doc:name="Choice">
                <when expression="groovy:message.getInvocationProperty('requestValid') == 'Y'">
                  <processor-chain>
                    <!-- LEARN: Call the client web service -->
                    <flow-ref name="webServiceClientV2" doc:name="Invoke WebService"/>
                    <!-- LEARN: This custom transformer reads the response after a successful call to the client and sets the 'skip' property -->
                    <!-- LEARN: 'skip' is set to 'D' (for Done) if the call was successful -->
                    <!-- LEARN: 'skip' is set to 'Y' if the call was successful but the response indicated a non-recoverable exception (so no retries, but write the original message to the JMS queue) -->
                    <!-- LEARN: 'skip' is set to 'N' if the call was successful but the response indicated a recoverable exception (so retry) -->
                    <custom-transformer class="com.mycompany.transformer.MyResponseTransformer" doc:name="Java"/>
                    <choice>
                      <when evaluator="groovy" expression="message.getInvocationProperty('skip') != 'D'">
                        <!-- LEARN: When 'skip' is NOT 'D', copy the value of 'skip' to a session or flow variable and throw a dummy exception -->
                        <set-session-variable value="#[groovy:message.getInvocationProperty('skip')]" variableName="skipRetry" doc:name="Variable" />
                        <scripting:transformer doc:name="Create Dummy Exception">
                          <scripting:script engine="groovy">
                            <scripting:text><![CDATA[throw new java.lang.Exception('IGNORE THIS EXCEPTION');]]></scripting:text>
                          </scripting:script>
                        </scripting:transformer>
                      </when>
                    </choice>
                  </processor-chain>
                </when>
                <otherwise>
                  <processor-chain>
                    <flow-ref name="errorHandlerBadInput" doc:name="errorHandlerBadInput"/>
                  </processor-chain>
                </otherwise>
              </choice>
            </processor-chain>
          </when>
          <otherwise>
            <processor-chain>
              <flow-ref name="defaultErrorHandlerV2" doc:name="defaultErrorHandlerV2"/>
            </processor-chain>
          </otherwise>
        </choice>
      </processor-chain>
      <choice-exception-strategy doc:name="Choice Exception Strategy">
        <!-- LEARN: When the dummy exception is thrown, check the flow or session variable 'skipRetry'. If retries are not to be skipped, roll back the JMS transaction to requeue the JMS message -->
        <rollback-exception-strategy when="#[groovy:message.getProperty('skipRetry', org.mule.api.transport.PropertyScope.SESSION) == 'N' || exception.causedBy(java.net.UnknownHostException) || exception.causedBy(java.net.ConnectException)]" maxRedeliveryAttempts="2">
          <on-redelivery-attempts-exceeded>
            <logger message="STRATEGY ROLLBACK WITH LIMITED RETRIES #[payload:]" level="INFO" doc:name="Logger"/>
            <flow-ref name="jmsAuditDirectWriteV2" doc:name="Log Unprocessed Message to Original Queue"/>
          </on-redelivery-attempts-exceeded>
        </rollback-exception-strategy>

        <!-- LEARN: When the dummy exception is thrown, check the flow or session variable 'skipRetry'. If retries are not to be skipped, place the original message back on the main processing queue for reprocessing -->
        <!-- LEARN: If you uncomment this forever strategy AND comment out the ROLLBACK strategy above, you get retry forever -->
        <!--
        <catch-exception-strategy when="#[groovy:message.getProperty('skipRetry', org.mule.api.transport.PropertyScope.SESSION) == 'N' || exception.causedBy(java.net.UnknownHostException) || exception.causedBy(java.net.ConnectException)]">
          <logger message="STRATEGY RETRY FOREVER #[payload:]" level="INFO" doc:name="Logger"/>
          <flow-ref name="unprocessedOriginalMessageStoreV2" doc:name="Log Unprocessed Message to Original Queue"/>
        </catch-exception-strategy>
        -->
        <catch-exception-strategy when="#[groovy:message.getProperty('skipRetry', org.mule.api.transport.PropertyScope.SESSION) == 'Y']" doc:name="Write to Error Queue">
          <logger message="STRATEGY WRITE TO ERROR QUEUE; SKIP RETRY; #[payload:]" level="INFO" doc:name="Logger"/>
          <flow-ref name="jmsAuditDirectWriteV2" doc:name="Log Unprocessed Message to Error Queue"/>
        </catch-exception-strategy>
        <catch-exception-strategy doc:name="Write to Error Queue">
          <logger message="STRATEGY DEFAULT WRITE TO ERROR QUEUE; #[payload:]" level="INFO" doc:name="Logger"/>
          <flow-ref name="jmsAuditDirectWriteV2" doc:name="Log Unprocessed Message to Error Queue"/>
        </catch-exception-strategy>
      </choice-exception-strategy>
    </flow>
    <sub-flow name="webServiceClientV2" doc:name="webServiceClientV2">
      <logger message="client 0 : #[payload:]" level="ERROR" doc:name="Log Error"/>
      <cxf:jaxws-client operation="MyRequest" serviceClass="com.mycompany.operations.IMemberManager" doc:name="Webservice Client">
        <cxf:outInterceptors/>
      </cxf:jaxws-client>
      <http:outbound-endpoint exchange-pattern="request-response" address="http://..." doc:name="request"/>
    </sub-flow>
    <sub-flow name="jmsAuditDirectWriteV2" doc:name="jmsAuditDirectWriteV2">
      <expression-transformer doc:name="Expression">
        <return-argument evaluator="header" expression="session:srcPayload"/>
      </expression-transformer>
      <jms:outbound-endpoint queue="esb.unprocessed.request.err" connector-ref="jmsPalEsbXAConnector" doc:name="JMS"/>
    </sub-flow>
    <!-- This flow shares its name with the one in the first listing. If both listings live in the same application, keep only one definition. -->
    <flow name="unprocessedOriginalMessageStoreV2" doc:name="unprocessedOriginalMessageStoreV2">
      <expression-transformer doc:name="Expression">
        <return-argument evaluator="header" expression="session:srcPayload"/>
      </expression-transformer>
      <object-to-string-transformer />
      <logger message="JMS message is #[payload:]" level="INFO" doc:name="Logger"/>
      <jms:outbound-endpoint queue="esb.unprocessed.original.message" connector-ref="jmsPalEsbXAConnector" doc:name="JMS">
      </jms:outbound-endpoint>
    </flow>
  </mule>
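
To make the branch selection in the choice exception strategy easier to follow, here is a rough model in plain Java. It assumes the strategies are tried in the order they are declared and that the first one whose condition matches handles the exception. Strategy, StrategySelector, select and causedBy are illustrative names, and causedBy only approximates Mule's exception.causedBy(...) by walking the cause chain.

```java
import java.net.ConnectException;
import java.net.UnknownHostException;

/** The three active branches of the choice exception strategy, in declaration order. */
enum Strategy { ROLLBACK_AND_REDELIVER, SKIP_RETRY_WRITE_TO_ERROR_QUEUE, DEFAULT_WRITE_TO_ERROR_QUEUE }

/** Hypothetical model of the 'when' conditions; it does not use any Mule classes. */
final class StrategySelector {

    /** True if the error, or any exception in its cause chain, is an instance of the given type. */
    static boolean causedBy(Throwable error, Class<? extends Throwable> type) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    /** Returns the first branch whose condition matches, mirroring the order in the listing. */
    static Strategy select(String skipRetry, Throwable error) {
        // Branch 1: retry requested, or the service could not be reached
        if ("N".equals(skipRetry)
                || causedBy(error, UnknownHostException.class)
                || causedBy(error, ConnectException.class)) {
            return Strategy.ROLLBACK_AND_REDELIVER;
        }
        // Branch 2: the response said the failure is non-recoverable
        if ("Y".equals(skipRetry)) {
            return Strategy.SKIP_RETRY_WRITE_TO_ERROR_QUEUE;
        }
        // Branch 3: the catch strategy without a 'when' condition handles everything else
        return Strategy.DEFAULT_WRITE_TO_ERROR_QUEUE;
    }
}
```

Because the flow sets skipRetry to 'N' before the web service is called, any exception raised before MyResponseTransformer changes the value takes the rollback branch in this model, not only connection failures.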


The Custom Response Transformer

The following happens inside com.mycompany.transformer.MyResponseTransformer:

    // jsr is the unmarshalled response returned by the web service.
    // It carries a status code; a non-zero value means processing failed.
    // The resend flag indicates whether the failure is recoverable (resend == true)
    // or non-recoverable (resend == false).
    returnValue.setMessage(jsr.getMessage());
    returnValue.setStatus(jsr.getStatus());
    if (jsr.getStatus().codeValue() == 0) {
        logger.info("Client SUCCESSFULLY PROCESSED with status code + message : " + returnValue.getStatus() + ";;;;" + returnValue.getMessage());
        message.setInvocationProperty("skip", "D"); // done
    } else {
        if (!jsr.isResend()) {
            logger.info("Client FAILED PROCESSING with status code + message : " + returnValue.getStatus() + ";;;;" + returnValue.getMessage());
            message.setInvocationProperty("skip", "Y"); // do not resend; store
        } else {
            logger.info("Client requested a RESEND of the request with status code + message : " + returnValue.getStatus() + ";;;;" + returnValue.getMessage());
            message.setInvocationProperty("skip", "N"); // resend
        }
    }
    } // end of the enclosing method (not shown)
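
The mapping from the response to the 'skip' value is small enough to isolate as a pure function, which makes it easy to unit test without Mule. The sketch below assumes the status code is available as an int and the resend flag as a boolean. SkipFlag and fromResponse are hypothetical names and are not part of the original transformer.

```java
/** Hypothetical helper that reproduces the 'skip' decision of the response transformer. */
final class SkipFlag {
    static final String DONE = "D";       // call processed successfully
    static final String NO_RETRY = "Y";   // non-recoverable failure: skip retries, store the message
    static final String RETRY = "N";      // recoverable failure: do not skip retries

    private SkipFlag() {
    }

    /** A status code of 0 means success; otherwise the resend flag decides whether to retry. */
    static String fromResponse(int statusCode, boolean resend) {
        if (statusCode == 0) {
            return DONE;
        }
        return resend ? RETRY : NO_RETRY;
    }
}
```

Inside the transformer, the three setInvocationProperty calls could then collapse into a single call that passes SkipFlag.fromResponse(...) as the value, assuming the response exposes the status code and resend flag as shown in the fragment above.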



