Monday, March 25, 2013

Mule 3.3 Start and Stop Flows Programmatically - A Retry Strategy for your Messages

Mule 3.3 brings with it some updates that make on-demand execution of flows easier. This blog discusses how to combine Mule and ActiveMQ (or a messaging tool of your choice) to provide reliability and fault tolerance through retry techniques.
The retry is on demand and programmatically controlled: a trigger flow programmatically starts and stops the flow that performs the retries.

Your requirement: when a call to an external service fails or results in a business exception, you need to store the original request in an error queue and be able to retry those requests on demand.

Start and Stop Flows Programmatically On-demand


You break your implementation into four separate main flows. Flows three and four are the meat of this blog.
The first flow reads an incoming message from a JMS queue and places it in an intermediate queue.
The second flow transactionally processes the intermediate queue and writes to a *.retry queue where appropriate.
The third flow wakes up when told to, sweeps the *.retry queue once to reprocess all retry messages, and is then shut down until the next time it is needed.
The fourth flow is the trigger for the retry process: it listens for a specific message on a queue and programmatically starts and stops the third flow.

To get to the point, I will document Flows 3 and 4 first. If you are interested, you can scroll down to view the first two flows to understand how the message got into the retry queue in the first place.

Flow 3: The flow that retries the request. The initial state of the flow is "stopped".


   <flow name="retry.masterupdate.request.to.client.jms.flow" initialState="stopped" processingStrategy="synchronous">  
     <jms:inbound-endpoint exchange-pattern="request-response" queue="my.intermediate.queue.masterupdate.retry" mimeType="text/plain" connector-ref="myRetryInConnector" doc:name="JMS"/>  
     <object-to-string-transformer doc:name="Object to String"/>  
     <set-variable variableName="initialMessageReceived" value="#[payload:]" doc:name="Variable"/>  
 <!-- PLACE into the original queue to be reprocessed. See Flow 1 -->  
     <jms:outbound-endpoint queue="my.masterupdate" connector-ref="myOutboundConnector" doc:name="JMS"/>  
     <choice-exception-strategy doc:name="retryEndpoint.exception.strategy">  
       <catch-exception-strategy when="true" doc:name="Catch Exception Strategy">  
         <choice doc:name="Choice">  
           <when expression="#[flowVars['initialMessageReceived'] != null]">  
             <set-payload value="#[flowVars['initialMessageReceived']]" doc:name="Set Payload"/>  
           </when>  
         </choice>  
 <!-- PLACE back into the retry queue to be reprocessed the next time around. -->  
         <jms:outbound-endpoint queue="my.intermediate.queue.masterupdate.retry" connector-ref="myOutboundConnector" doc:name="JMS"/>  
       </catch-exception-strategy>  
     </choice-exception-strategy>  
   </flow>  

Flow 4: The flow that initiates the retry based on a trigger, which is a specific message on a trigger queue.

   <flow name="initiate.retry" processingStrategy="synchronous">  
     <jms:inbound-endpoint exchange-pattern="request-response" queue="masterupdate.retry.trigger" mimeType="text/plain" connector-ref="myThrottledInConnector" doc:name="JMS"/>  
     <object-to-string-transformer doc:name="Object to String"/>  
     <choice doc:name="Choice">  
 <!-- THIS is the trigger queue with a specific message that controls which data to retry. -->
       <when expression="#[groovy: payload.toUpperCase().startsWith('MASTER_UPDATE_RETRY')]">  
         <processor-chain>  
           <scripting:component doc:name="Start flow to retry">  
             <scripting:script engine="Groovy">  
               <scripting:text><![CDATA[((org.mule.construct.Flow)(muleContext.getRegistry().lookupFlowConstruct('retry.masterupdate.request.to.client.jms.flow'))).start();]]></scripting:text>  
             </scripting:script>  
           </scripting:component>  
 <!-- STOP retry flow after completion. -->  
           <flow-ref name="stop.retry.flows" doc:name="Flow Reference"/>  
         </processor-chain>  
       </when>  
     </choice>  
   </flow>  
   <sub-flow name="stop.retry.flows" doc:name="stop.retry.flows">  
     <scripting:component doc:name="Stop Flow">  
       <scripting:script engine="Groovy">  
         <scripting:text><![CDATA[((org.mule.construct.Flow)(muleContext.getRegistry().lookupFlowConstruct('retry.masterupdate.request.to.client.jms.flow'))).stop();]]></scripting:text>  
       </scripting:script>  
     </scripting:component>  
   </sub-flow>  
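
If you prefer plain Java to Groovy, the same start and stop calls can be made from a Java component. Below is a minimal, hypothetical sketch (it is not part of the configuration above) that assumes the Mule 3.x API and the flow name used in this post; the stopping component is the mirror image, calling stop() instead of start().

 import org.mule.api.MuleEventContext;
 import org.mule.api.lifecycle.Callable;
 import org.mule.construct.Flow;

 // Hypothetical Java equivalent of the Groovy scripts above (sketch only).
 public class RetryFlowStarter implements Callable {

     @Override
     public Object onCall(MuleEventContext eventContext) throws Exception {
         // Look up the retry flow by name and start it, just as the Groovy script does.
         Flow retryFlow = (Flow) eventContext.getMuleContext().getRegistry()
                 .lookupFlowConstruct("retry.masterupdate.request.to.client.jms.flow");
         retryFlow.start();
         return eventContext.getMessage().getPayload();
     }
 }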


Flow 1, which you see below, receives the request and places it in an intermediate JMS queue after some basic validations. If there is an error, the request goes to an error queue.


   <flow name="my.jms.flow" processingStrategy="synchronous">  
     <jms:inbound-endpoint exchange-pattern="request-response" queue="my.masterupdate" mimeType="text/xml" connector-ref="myThrottledInConnector" doc:name="JMS">  
     </jms:inbound-endpoint>  
     <set-variable variableName="initialMessageReceived" value="#[payload:]" doc:name="Variable"/>  
     <logger message="Incoming message from Master - #[payload:]" level="INFO" doc:name="Logger"/>  
     <processor-chain>  
       <set-payload value="#[flowVars['initialMessageReceived']]" doc:name="Set Payload"/>  
       <object-to-string-transformer />  
       <custom-transformer class="com.esb.transformers.MyCustomTransformer" doc:name="Java"/>  
       <jms:outbound-endpoint exchange-pattern="one-way" queue="my.intermediate.queue.masterupdate" connector-ref="myOutboundConnector" doc:name="JMS">  
       </jms:outbound-endpoint>  
     </processor-chain>  
     <choice-exception-strategy doc:name="my.jms.exception.strategy">  
       <catch-exception-strategy when="#[exception.causedBy(com.esb.exceptions.ValidationException)]" doc:name="Catch Exception Strategy">  
         <object-to-string-transformer doc:name="Object to String"/>  
         <logger message="My Validation Exception." level="ERROR" doc:name="Logger"/>  
         <logger level="ERROR" doc:name="Logger"/>  
         <choice>  
           <when expression="#[flowVars['initialMessageReceived'] != null]">  
             <set-payload value="#[flowVars['initialMessageReceived']]" doc:name="Set Payload"/>  
           </when>  
         </choice>  
         <message-properties-transformer scope="outbound" doc:name="Message Properties">  
           <add-message-property key="BIZ_ERR" value="My Validation Exception."/>  
         </message-properties-transformer>  
         <jms:outbound-endpoint exchange-pattern="one-way" queue="my.masterupdate.err" connector-ref="myOutboundConnector" doc:name="JMS">  
         </jms:outbound-endpoint>  
       </catch-exception-strategy>  
       <catch-exception-strategy when="true" doc:name="Catch Exception Strategy">  
         <choice>  
           <when expression="#[flowVars['initialMessageReceived'] != null]">  
             <set-payload value="#[flowVars['initialMessageReceived']]" doc:name="Set Payload"/>  
           </when>  
         </choice>  
         <logger message="Unhandled exception while calling. #[exception.message] for Payload: #[payload:]" level="ERROR" doc:name="Logger"/>  
         <logger level="ERROR" doc:name="Logger"/>  
         <message-properties-transformer scope="outbound" doc:name="Message Properties">  
           <add-message-property key="BIZ_ERR" value="#[exception.message]" />  
         </message-properties-transformer>  
         <jms:outbound-endpoint exchange-pattern="one-way" queue="my.masterupdate.err" connector-ref="myOutboundConnector" doc:name="JMS">  
         </jms:outbound-endpoint>  
       </catch-exception-strategy>  
     </choice-exception-strategy>  
   </flow>  

Flow 2 transactionally reads the intermediate queue and processes the message. Upon errors, it writes the message to a *.retry queue.


   <flow name="my.intermediate.queue.masterupdate" processingStrategy="synchronous" >  
     <jms:inbound-endpoint exchange-pattern="request-response" queue="my.intermediate.queue.masterupdate" mimeType="text/xml" connector-ref="myThrottledInConnectorInfiniteRetry" doc:name="JMS">  
       <ee:multi-transaction action="ALWAYS_BEGIN"/>  
     </jms:inbound-endpoint>  
     <set-variable variableName="initialMessageReceivedCP" value="#[payload:]" doc:name="Variable"/>  
     <message-properties-transformer scope="outbound" doc:name="Message Properties">  
       <add-message-property key="Content-Type" value="application/xml; charset=UTF8"/>  
     </message-properties-transformer>  
     <http:outbound-endpoint exchange-pattern="request-response" address="http://my.service.call" mimeType="application/xml" contentType="application/xml" responseTimeout="120000" doc:name="HTTP"/>  
     <object-to-string-transformer />  
     <logger message="RETURN message - #[payload:]" level="INFO" doc:name="Logger"/>  
     <choice>  
       <when expression="#[groovy: message.getInboundProperty('http.status') != null &amp;&amp; message.getInboundProperty('http.status').startsWith('400')]">  
         <logger level="DEBUG" message="Returned http status code 400."/>  
 <!-- THROW a business exception here. -->  
 <!-- THIS is a business exception. Can be retried later. -->  
       </when>  
       <when expression="#[groovy: message.getInboundProperty('http.status') != null &amp;&amp; message.getInboundProperty('http.status').startsWith('2')]">  
         <logger level="TRACE" message=" "/>  
 <!-- THIS is a SUCCESS. -->  
       </when>  
       <otherwise>  
 <!-- THIS is a Transaction ROLLBACK with a retry due to network issues with reaching the end point. This is not the same as retrying a business exception.  
 We do not want to use this feature for business exception retries because the same message will be rolled back and retried instead of moving on to the next message.  
 We believe that the business exception is perhaps a data clean up issue where the message can be retried later.  
 -->  
         <logger message="Unable to reach endpoint...HTTP Status #[groovy: message.getInboundProperty('http.status')]" level="DEBUG" doc:name="Logger"/>  
         <scripting:component doc:name="Throw Exception">  
           <scripting:script engine="Groovy">  
             <scripting:text><![CDATA[throw new java.net.ConnectException("Unable to reach end point. Retrying...");]]></scripting:text>  
           </scripting:script>  
         </scripting:component>  
       </otherwise>        
     </choice>  
     <choice-exception-strategy doc:name="my.exception.strategy">  
       <rollback-exception-strategy when="#[exception.causedBy(org.mule.api.routing.filter.FilterUnacceptedException) || exception.causedBy(java.net.UnknownHostException) || exception.causedBy(java.net.ConnectException) || exception.causedBy(java.net.SocketTimeoutException) || exception.causedBy(java.net.SocketException)]" doc:name="Catch Exception Strategy">  
         <logger message="Network/Connection exception while calling endpoint. Payload: #[payload:]. RETRYING..." level="ERROR" doc:name="Logger"/>  
         <set-payload value="#[flowVars['initialMessageReceivedCP']]" doc:name="Set Payload"/>  
       </rollback-exception-strategy>  
       <catch-exception-strategy when="#[exception.causedBy(com.esb.MyBusinessException)]" doc:name="Catch Exception Strategy">  
         <object-to-string-transformer doc:name="Object to String"/>  
         <logger message="My business exception." level="ERROR" doc:name="Logger"/>  
         <set-payload value="#[flowVars['initialMessageReceivedCP']]" doc:name="Set Payload"/>  
         <message-properties-transformer scope="outbound" doc:name="Message Properties">  
           <add-message-property key="BIZ_ERR" value="My business exception text"/>  
         </message-properties-transformer>  
 <!-- THIS is a business exception. Can be retried later. -->  
         <jms:outbound-endpoint exchange-pattern="one-way" queue="my.intermediate.queue.masterupdate.retry" connector-ref="myOutboundConnector" doc:name="JMS">  
         </jms:outbound-endpoint>  
       </catch-exception-strategy>  
       <catch-exception-strategy when="true" doc:name="Catch Exception Strategy">  
         <choice>  
           <when expression="#[flowVars['initialMessageReceivedCP'] != null]">  
             <set-payload value="#[flowVars['initialMessageReceivedCP']]" doc:name="Set Payload"/>  
           </when>  
         </choice>  
 <!-- THIS is an UNKNOWN exception. Can be retried later. -->  
         <logger message="Unhandled exception while calling end point. Payload: #[payload:]" level="ERROR"doc:name="Logger"/>  
         <jms:outbound-endpoint exchange-pattern="one-way" queue="my.intermediate.queue.masterupdate.retry" connector-ref="myOutboundConnector" doc:name="JMS">  
         </jms:outbound-endpoint>  
       </catch-exception-strategy>  
     </choice-exception-strategy>  
   </flow>  

Wednesday, October 10, 2012

An Intro to Mongo DB

Introduction
 Glossary of a few terms you need to know as you read on:
 Sharding (or horizontal partitioning) is a database design principle whereby the contents of a database table are split across physical locations by rows instead of by columns (using referential integrity). Each partition forms part of a shard. Multiple shards together provide a complete data set, but the partitioned shards are split logically to ensure faster reads and writes.
Database replication is the process of sharing data between the primary database and one or more redundant databases, to improve reliability, fault tolerance or accessibility. Typically, data is copied over to the backup location immediately upon write so that it is available for recovery and/or as a read-only resource.
Mongo DB
MongoDB is a document-oriented database management system. It adheres to the NoSQL core concepts in being distributed, schema-free and highly performant for fast reads and writes. As a database built from the ground up, it is specifically designed for high read and write throughput, storing data as documents in a distributed and sharded environment all the while running on commodity servers.
MongoDB is especially suited for Web applications that require a rich domain model. Since a document can intuitively represent a rich hierarchical model, one can do away with the complicated data structures used for managing relationships (a.k.a. multi-table joins). A document can contain all the data related to a domain object without having to join to other data structures, resulting in faster reads and writes and more intuitive data management. MongoDB supports database replication with automatic failover support. It is designed for horizontal scaling, which I explain later in the post.
The Data Model
Since MongoDB is a document-oriented database, data stored by MongoDB takes the form of a hierarchical document with attributes that can be easily read by humans and machines alike. The best way to illustrate this is with an example. Listing A shows a sample document that represents an employee in an organization.
Listing A

{ _id: ObjectID('5ddfaf7c7649b7e7hd638d65'),
firstName: 'Mike',
lastName: 'Smith',
department: 'Finance',
skills: ['SAP Finance','CFP','CPA'],
salary:100000,
image: {url:'companyserver/location/msmith.jpg'}
}

The “employee” document is simply a JSON document, which means it is a set of key/value pairs of data. Keys represent the names of the fields, and the values are either specific values or can in turn represent a nested document, an array of data, or an array of documents. If you had to represent this in a relational model, you would end up with multiple tables to represent the entities, multiple join tables and referential integrity constraints. To extract the data, you would find yourself writing a complex SQL query that joins these flat, one-dimensional tables. A document, by contrast, represents the person as a whole, with all the data self-contained, which makes it more intuitive to work with.
Because you always operate on a document as a whole, you get atomic operations on the document.
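
To make that concrete, here is a minimal sketch of an atomic in-place update using the MongoDB Java driver (2.x-era API). The host, database and collection names are assumptions for illustration only; the point is that both modifiers below are applied to the single matched document in one atomic step.

 import com.mongodb.BasicDBObject;
 import com.mongodb.DBCollection;
 import com.mongodb.MongoClient;

 public class AtomicEmployeeUpdate {
     public static void main(String[] args) throws Exception {
         // Sketch only: assumes a local mongod and the 2.x Java driver.
         MongoClient mongo = new MongoClient("localhost", 27017);
         DBCollection employees = mongo.getDB("company").getCollection("employees");

         // $set and $push modify the matched document atomically, in a single operation.
         BasicDBObject query = new BasicDBObject("firstName", "Mike").append("lastName", "Smith");
         BasicDBObject update = new BasicDBObject("$set", new BasicDBObject("salary", 110000))
                 .append("$push", new BasicDBObject("skills", "Budgeting"));
         employees.update(query, update);
         mongo.close();
     }
 }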
Querying and Indexes
The code to query data in these documents also ends up being more intuitive to read. For instance, to find all employees and to find a particular employee, the code would look like this:
Listing B:

db.employees.find();
db.employees.find({'firstName': 'Mike', 'lastName': 'Smith'});

Now, if you spend a few more minutes looking at this document, you will think of various other ways your clients might need this data: all employees by department, employees making more than $90,000, employees with particular skills, and so on. You might also rightfully wonder whether those queries would be fast. What is critical to know is that these queries can also be made very efficient, because MongoDB allows you to define secondary indexes on the document. Add the following indexes and the queries run quite efficiently.
Listing C:

db.employees.ensureIndex({lastName: 1});
db.employees.ensureIndex({skills: 1});
db.employees.ensureIndex({salary: 1});
db.employees.find({'skills': 'CFP'});
db.employees.find({'salary': {'$gt': 90000}});
 
Database Replication
One of the biggest advantages of NoSQL databases, and MongoDB in particular, is the database replication feature. MongoDB’s offering is called a replica set: a set of MongoDB database servers that operate seamlessly as a unit, offering one primary database server for writes (and potentially reads) and at least one (but typically more than one) redundant failover server. The failover servers are referred to as secondary servers and can be used for scaling data reads. The data in the secondary servers is kept in sync by the replication process in real time, and in case of a failure in the primary server, one of the secondary servers automatically takes over as primary while the remaining servers continue as secondaries. The choice of which server becomes primary is decided by a configurable arbitration process. Eventually, when the former primary comes back online and rejoins the cluster, it does so as a secondary server. This is what is known as a replica set.
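
As a rough illustration of how an application sees a replica set, here is a sketch using the MongoDB Java driver; the host names are made up, and the driver version is assumed to be one of the 2.x MongoClient releases. The client is given a seed list of members, writes always go to the primary, and the read preference routes reads to secondaries when they are available.

 import java.util.Arrays;

 import com.mongodb.BasicDBObject;
 import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.ReadPreference;
 import com.mongodb.ServerAddress;

 public class ReplicaSetReads {
     public static void main(String[] args) throws Exception {
         // Sketch only: the three hosts below are assumed members of one replica set.
         MongoClient mongo = new MongoClient(Arrays.asList(
                 new ServerAddress("db1.example.com", 27017),
                 new ServerAddress("db2.example.com", 27017),
                 new ServerAddress("db3.example.com", 27017)));

         // Route reads to secondaries when available; writes still go to the primary.
         mongo.setReadPreference(ReadPreference.secondaryPreferred());

         DBObject employee = mongo.getDB("company").getCollection("employees")
                 .findOne(new BasicDBObject("lastName", "Smith"));
         System.out.println(employee);
         mongo.close();
     }
 }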

Scaling
If you have been involved in building and running any enterprise application or large-scale web application, it is very likely that you have faced scenarios where performance has degraded for your end users over time. With existing architectures involving application servers and a large database server, you may have taken the approach of beefing up the hardware on your application and database servers, also known as vertical scaling. The drawback of such a solution is that there is only so much headroom available in your application and database servers, beyond which further upgrades stop being cost-effective. You will also notice a glaring weakness: your database server is a single point of failure. If the server fails, so does your application ecosystem. Granted, you may have a backup, but loading the backup data and bringing the server back online may not be a trivial effort.
Horizontal scaling (sharding) involves scaling out: instead of concentrating on improving one server, you focus on distributing data across multiple smaller servers. You continue with your existing hardware profile, perhaps smaller commodity servers, add more of these servers into your ecosystem and distribute your data across this farm of servers. In this architecture, the entire set of servers together is your database, but each server in the farm contains only a slice of the data. In such a scenario, a failure of one server means you lose only a small chunk of data while retaining most of it and keeping it available to your applications. To extend this further, coupled with what you learned about replication in the previous section, if each shard is a replica set you are almost guaranteed the smallest downtime in case of a catastrophic failure of a particular server.
MongoDB was designed to operate as a distributed database with support for auto-sharding, which automatically manages the distribution of data across nodes. Adding a shard is easy, and failover is handled by MongoDB’s sharding system. If you ensure that each shard operates as a replica set, as discussed earlier, you eliminate single points of failure. MongoDB’s sharded cluster management also ensures that developers need not worry about the location of data and only need to address the sharded cluster as if it were a single MongoDB server.
Each shard in MongoDB consists of one or more servers running a mongod process. A collection is sharded based on a shard key, which can be defined to use a certain pattern, similar to defining an index. MongoDB's sharding is order-preserving; adjacent data by shard key tends to be on the same server. The MongoDB documentation provides more detail on MongoDB's sharding policy.
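
For a flavor of how a collection gets sharded, here is a hedged sketch that runs the sharding admin commands through the Java driver; the mongos host, database name and shard key are illustrative assumptions, and in practice you would more likely issue the equivalent helpers from the mongo shell.

 import com.mongodb.BasicDBObject;
 import com.mongodb.DB;
 import com.mongodb.MongoClient;

 public class ShardEmployees {
     public static void main(String[] args) throws Exception {
         // Sketch only: assumes an existing sharded cluster reached through a mongos router.
         MongoClient mongo = new MongoClient("mongos.example.com", 27017);
         DB admin = mongo.getDB("admin");

         // Enable sharding for the database, then shard the collection on a chosen shard key.
         admin.command(new BasicDBObject("enableSharding", "company"));
         admin.command(new BasicDBObject("shardCollection", "company.employees")
                 .append("key", new BasicDBObject("department", 1)));
         mongo.close();
     }
 }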
Data Integrity
MongoDB offers an option called “journaling”, a feature that addresses data integrity by recording every database write operation in a running incremental log in addition to writing to the database. This journal can then be used, in case of a catastrophic failure, to replay the write operations and bring the database back to its state prior to the crash. An alternative to journaling would be to run each server as part of a replica set, but journaling is always recommended.
What do you give up?
RDBMSs have long ruled the database world for everything from simple persistence to complex analytical database engines. It is inevitable that NoSQL databases are compared against RDBMS features. Here are a few things of note:
By using MongoDB, you give up the notion of atomic transactions (ACID) across domains. While updating a single document is an atomic operation in MongoDB, writing to two separate documents (tables in RDBMS terms) is not a single transaction. One could succeed while the other fails. You have to design for this with a separate strategy.
There is no concept equivalent to table joins and foreign key constraints in MongoDB.
By nature of a document-oriented model without joins and key constraints, you will end up with a more de-normalized model with MongoDB than you would expect with a similar RDBMS model.
You have to have a certain tolerance for some (but rare) loss of data because of in-memory manipulation of data in MongoDB as opposed to physical disk persistence in RDBMS databases.
By using MongoDB, you will give up some of the powerful SQL Querying constructs supported by RDBMS databases.
Summary
I have presented an overview of MongoDB, one of the most popular NoSQL databases around today. MongoDB is a strong and viable candidate for any company to consider as an alternative to relational databases or to other NoSQL databases. It is designed to be scalable, distributed, easy to administer and to run on commodity servers.
However, it is important to know that a NoSQL database may not be the right option for every use case and further, even if you have a strong case for a NoSQL database, MongoDB may still not be the right choice for you.
Consider the following as you determine a fit for your usage scenarios:
  1. MongoDB is document oriented and within each document, maintains a key/value notation to store data. This format supports rich domains usually found in web applications, analytics, logging.
  2. The MongoDB document can act as a primary storage for your domain. The rich data model allows you to define secondary indexes for faster queries.
  3. MongoDB is schema-less. This allows you to model a domain whose attributes are expected to vary over time.
  4. Auto-sharding lets you operate your database as a distributed database and if your domain has naturally shardable attributes, then MongoDB is the best choice.
  5. MongoDB’s scaling features allows you to support a large database and still meet speed and availability specifications typically required of web applications.
  6. Without having to worry much about DBA activities, developers control the data format, which leads to smoother development and enhancement of existing features, all resulting in faster delivery.
  7. Community and Commercial support for MongoDB is significant. 10gen is the company behind MongoDB and seems committed to its success. There is a significant developer community excited about the product and its future.
  8. MongoDB has been deployed in many production environments around the world and has been proven to be a success. Craigslist (Craigslist uses MongoDB to archive billions of records), SAP (SAP uses MongoDB as a core component of SAP’s platform-as-a-service (PaaS) offering), Foursquare (to store venues and user check-ins) are some of the companies using MongoDB in production.
For documentation, instructions on installing and tutorials, visit www.mongodb.org.

Neo4j - A Graph Database

Introduction

This blog looks at Neo4j, a graph database. Glossary of a few terms you need to know as you read on:
Sharding (or horizontal partitioning) is a database design principle whereby the contents of a database table are split across physical locations by rows instead of by columns (using referential integrity). Each partition forms part of a shard. Multiple shards together provide a complete data set, but the partitioned shards are split logically to ensure faster reads and writes.
Database replication is the process of sharing data between the primary database and one or more redundant databases, to improve reliability, fault tolerance or accessibility. Typically, data is copied over to the backup location immediately upon write so that it is available for recovery and/or as a read-only resource.

Neo4j

Neo4j is a graph-oriented NoSQL database. This is distinct from the other three categories: key-value, big table and document databases. Those three categories are all described as aggregate databases, meaning these databases define their basic unit of persistence (or the domain) as a chunk of data that holds meaning when read or written as a whole. According to Martin Fowler (http://martinfowler.com/bliki/AggregateOrientedDatabase.html), an aggregate is “a fundamental unit of storage which is a rich structure of closely related data: for key-value stores it's the value, for document stores it's the document, and for column-family stores it's the column family. In DDD terms, this group of data is an aggregate.” These types of databases are very effective when you query them with the aggregates as your dimension data and the attributes as your facts. But when you wish to change your query mode to use one or more of your attributes as dimensions, you run into severe performance issues, and hence you revert to writing MapReduce functions.
Enter Graph Databases. Conceptually, graph databases are built using Nodes and Relationships. Each of those may contain properties. Thus, there isn’t an aggregate of your domain objects but a set of Nodes connected via Relationships. Neo4j and InfiniteGraph are two examples of Graph Databases.

The Data Model

The moment one hears about Nodes and Relationships, one tends to think of RDBMS concepts by instinct. This is a mistake. You have to shift your thinking to Nodes and Relationships, not entities and their relationships. With a graph database, the domain model you produce can be translated directly into a graph. There is no further normalization of your model into tables, relationship tables and so on.
Neo4j, as an example of a graph database, is a schema-less database. It allows bottom-up data model design and is fully ACID compliant. It is written in Java and is available as a standalone database server or as an embedded database.

Picture a whiteboard diagram from a design session for the application: the circular objects represent Nodes and the arrows represent Relationships, and there can be multiple relationships between Nodes. Such a diagram translates directly into your Neo4j database design without further work to create tables, columns, keys and so on.
Nodes have properties, which are basically key-value pairs. By writing your code in Java, you can take advantage of strongly typed attributes. Every Relationship must have a Start Node and an End Node (which can be the same node), and Relationships can have properties just like Nodes.

Creating Nodes and Relationships:

 GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase("/sample/mydb");
 Transaction tx = db.beginTx();
 try {
   Node personA = db.createNode();
   personA.setProperty("name", "Person A");
   Node projectCRM = db.createNode();
   projectCRM.setProperty("name", "CRM Project");
   personA.createRelationshipTo(projectCRM, DynamicRelationshipType.withName("LEADS"));
   tx.success();
 } finally {
   tx.finish();
 }

Querying and Indexes

Querying for data in Neo4j is typically programmatic: you can use the Java API available with the distribution to query for data. Because a graph is, by design, its own index structure, creating explicit indexes is usually limited to certain app-specific functionality, typically to focus on a set of nodes (say, the most frequently queried ones, or a logical grouping of nodes). So, as a best practice, index only what you need and not every node or relationship (http://docs.neo4j.org/chunked/milestone/indexing.html).
Lucene is the default indexing implementation for Neo4j. This code snippet enhances the previous snippet by including code to add indexes.

 GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase("/sample/mydb");
 Transaction tx = db.beginTx();
 try {
   Node personA = db.createNode();
   personA.setProperty("name", "Person A");
   Node projectCRM = db.createNode();
   projectCRM.setProperty("name", "CRM Project");
   personA.createRelationshipTo(projectCRM, DynamicRelationshipType.withName("LEADS"));
   Index<Node> projects = db.index().forNodes("projects");
   Index<Relationship> projectTeamMembers = db.index().forRelationships("projectTeamMembers");
   projects.add(projectCRM, "name", projectCRM.getProperty("name"));
   tx.success();
 } finally {
   tx.finish();
 }


Indexes are usually used to retrieve a reference to the set of nodes you are interested in. The real work begins after you have the set of Nodes (or Relationships) you looked for.
For querying data, in addition to the query API, you have the option of using the Traversal API, of which there are two kinds: the simple Traversal API and the enhanced Traversal API. Both are under active development, though the simple version has been out longer and is more proven. You can also use the REST interface to query data.
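
As a taste of the simple Traversal API, the snippet below walks outgoing LEADS relationships from a person node and prints the project nodes it reaches. It is a sketch only: personA is assumed to be the Node created in the earlier snippets, and the classes used are the Neo4j 1.x traversal framework.

 import org.neo4j.graphdb.Direction;
 import org.neo4j.graphdb.DynamicRelationshipType;
 import org.neo4j.graphdb.Node;
 import org.neo4j.graphdb.traversal.Evaluators;
 import org.neo4j.graphdb.traversal.TraversalDescription;
 import org.neo4j.kernel.Traversal;

 // Sketch only: personA comes from the earlier snippet.
 TraversalDescription leads = Traversal.description()
     .breadthFirst()
     .relationships(DynamicRelationshipType.withName("LEADS"), Direction.OUTGOING)
     .evaluator(Evaluators.excludeStartPosition())
     .evaluator(Evaluators.toDepth(1));

 for (Node project : leads.traverse(personA).nodes()) {
     System.out.println(project.getProperty("name"));
 }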
Another popular alternative is the Cypher Query language (http://docs.neo4j.org/chunked/stable/cypher-query-lang.html). It is a declarative language that can be compared to SQL in its structure. It is a pattern matching language that walks through the nodes using the indexes already created. Cypher is a powerful tool in your arsenal when working with Neo4j.
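
To give a feel for Cypher, here is a hedged sketch that runs a query through the embedded ExecutionEngine against the database and the "projects" index created in the earlier snippets; the query shown uses the 1.x-era Cypher syntax with a START clause, and db is assumed to be the GraphDatabaseService from before.

 import java.util.Map;

 import org.neo4j.cypher.javacompat.ExecutionEngine;
 import org.neo4j.cypher.javacompat.ExecutionResult;

 // Sketch only: db and the "projects" index come from the earlier snippets.
 ExecutionEngine engine = new ExecutionEngine(db);
 ExecutionResult result = engine.execute(
     "START project=node:projects(name = 'CRM Project') " +
     "MATCH person-[:LEADS]->project " +
     "RETURN person.name AS leader");

 for (Map<String, Object> row : result) {
     System.out.println(row.get("leader"));
 }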
There are other tools available at http://tinkerpop.com/ that you will find very useful.

Database Replication and High Availability

Neo4j is a graph database and, by concept and design, is not built for sharding. A typical production configuration has a master-slave design with more than one slave server. Data is replicated across the servers using log shipping for eventual consistency. Writes to the master are fast and immediate, and the slaves eventually become consistent based on a configurable polling schedule. Writing to a slave, on the other hand, means that the data has to be synchronously written to the master before the commit, while the other slaves catch up as usual. For disaster recovery, you can configure some slaves to act purely as standby copies whose only job is to hold the latest data until a disaster strikes the master! One of those slaves can then be elected master and your apps continue working without missing a beat.

Scaling

By design, graphs are hard to scale. Nodes can have relationships that span multiple servers in your database server farm, and load balancing can leave many relationships crossing instances. These are very expensive to traverse, since network hops are many orders of magnitude slower than in-memory traversals. To scale as far as possible, Neo4j uses Java NIO and in-memory caching, and expects a large amount of RAM that it can use to cache data. The cache itself can be sharded across instances to keep it warm with the most frequently used data, and when combined with sticky sessions this ensures a user gets maximum performance. You can also shard your domain data with application-specific algorithms.

Summary

This is only a brief introduction to graph databases with a focus on Neo4j. The topic is vast and there is much to learn about this unique NoSQL technology. Neo4j is a wonderful alternative not only to RDBMS but to other NoSQL databases due to its intuitive design.
However, it is important to know that a NoSQL database may not be the right option for every use case and further, even if you have a strong case for a NoSQL database, Neo4j may still not be the right choice for you.
Consider the following as you determine a fit for your usage scenarios:
  1. Neo4j offers a powerful data model, especially for connected data.
  2. By its nature, data traversal is very fast compared to RDBMS.
  3. Graph databases are great for generating recommendations, ratings, web analytics and social apps.
  4. Compared to other NoSQL databases, Neo4j supports ACID transactions.
  5. It is under active development with a company behind it - http://www.neotechnology.com
  6. A community edition is available and is free.
  7. However, HA relies on a master-slave configuration, which limits certain capabilities - sharding is not available out of the box. You have to be creative in design and deployment to achieve desired (or close to desired) results.
  8. Scaling is the same - you have to be creative in design and deployment to achieve desired (or close to desired) results.
  9. Write scaling is harder than read scaling, and even though your domain may fit the connected data model, it still may not fit graph database technology due to the non-functional requirements of your applications.
For documentation, instructions on installing and tutorials, visit www.neo4j.org.

A brief intro to Riak NoSQL Database

Introduction
Glossary of a few terms you need to know as you read on: Sharding (or horizontal partitioning) is a database design principle whereby the contents of a database table are split across physical locations by rows instead of by columns (using referential integrity). Each partition forms part of a shard. Multiple shards together provide a complete data set, but the partitioned shards are split logically to ensure faster reads and writes.
Database replication is the process of sharing data between the primary database and one or more redundant databases, to improve reliability, fault tolerance or accessibility. Typically, data is copied over to the backup location immediately upon write so that it is available for recovery and/or as a read-only resource.
Riak
Riak is a key-value storage database management system. It is one of the databases that fall under the NoSQL umbrella. Inspired by Amazon’s Dynamo, it is designed to be horizontally scalable, distributed, providing fast reads and writes while being highly available through software and hardware failures.
Riak stores data as key-value pairs, with the value being data that can be encoded to suit the demands of the application. Keys are organized into buckets, which I will explain in more detail later in the post. Operations on data use HTTP-verb-style commands: PUT, GET and DELETE. In addition to your own data, the value portion of the key-value pair stores metadata, both out of the box and custom. Another powerful feature of Riak is the ability to manage relationships via links, which are HTTP REST-style links that are easy for machines and humans alike to decipher and understand.
Riak is especially suited when high availability is of primary importance. This is typically for logging, analytics data storage, and when performing aggregation and analytics specific queries. As a database built from the ground up, it is specifically designed for high read and write throughput, providing seamless failover and guaranteed up-time all the while running on commodity servers.
Riak is available as open-source or as an Enterprise version with additional features and technical support from Basho, the company behind Riak.
The Data Model
Riak is a key-value storage database. The key is a unique identifier. The value is an amorphous blob that can be custom-encoded as the application requires. Associated with the key, along with the value, is metadata for the data. You can also add custom metadata to the document to supplement the Riak-provided metadata. Keys are always associated with a bucket, thus making the bucket/key combination the unique reference to any data stored.
A bucket/key pair is called a Riak object. This Riak object can also store links to other bucket/key pairs, thus providing the relationships often desired in databases. Because access is via HTTP REST-style addressing, the metadata is accessible via HTTP headers when Riak objects are retrieved.
Listing A
An “employees” bucket may contain multiple employee keys. Each employee key has a value representing that employee’s data.
Each employee is addressable as: employees/emp1001 where emp1001 is the employee id, a value that is unique to each employee.
The key emp1001 can now hold data and metadata as shown:

 emp1001.content_type = “application/json”  
 emp1001.data = {name: ‘Mike Smith’, dept:’FIN’,title:’Manager’}  
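
Because everything goes over HTTP, any HTTP client can act as a Riak client. The sketch below, which assumes a local Riak node on the default HTTP port 8098, stores the employee above with a plain java.net.HttpURLConnection and then reads it back; the bucket and key follow the employees/emp1001 addressing used in this example.

 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;

 public class RiakEmployeeClient {
     public static void main(String[] args) throws Exception {
         // Sketch only: assumes a local Riak node listening on the default HTTP port 8098.
         String employeeUrl = "http://127.0.0.1:8098/riak/employees/emp1001";

         // PUT the employee document as JSON.
         HttpURLConnection put = (HttpURLConnection) new URL(employeeUrl).openConnection();
         put.setRequestMethod("PUT");
         put.setDoOutput(true);
         put.setRequestProperty("Content-Type", "application/json");
         OutputStream body = put.getOutputStream();
         body.write("{\"name\": \"Mike Smith\", \"dept\": \"FIN\", \"title\": \"Manager\"}".getBytes("UTF-8"));
         body.close();
         System.out.println("PUT status: " + put.getResponseCode());

         // GET it back; Riak returns the stored value, with its metadata in the HTTP headers.
         HttpURLConnection get = (HttpURLConnection) new URL(employeeUrl).openConnection();
         BufferedReader reader = new BufferedReader(new InputStreamReader(get.getInputStream(), "UTF-8"));
         System.out.println("GET body: " + reader.readLine());
         reader.close();
     }
 }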

The default storage back-end for Riak is Bitcask. Riak uses a pluggable API to address the storage back-end and thus can seamlessly address other supported back-ends as needed.
Querying and Secondary Indexes
Querying options in Riak include the following three techniques: performing HTTP GET operations, writing MapReduce functions, and full-text search or custom (secondary) indexes on the data.
Riak allows clients to query for data using HTTP GET operations. An R value can be set for each fetch where R signifies the minimum number of nodes that must successfully process the query for the query operation to be completed successfully. Riak can also return objects based on links stored on the object.
Riak supports MapReduce operations to execute complex query operations. A series of functions comprise a MapReduce job with the result of one function serving as input to the next in series until the result is finally served back to the client.
A new and improved feature in the recent release of Riak v 1.0 supports secondary indexes for efficient querying. The goal of this feature is to provide simple indexing options on data buckets at data write-time.
For example, when creating an employee record, you can create it within the "employees" bucket with a key that is a unique employee id, add data elements to it (name, title) and finally add index metadata for fields that are expected to be used in querying (skills, salary).

 curl -XPUT -d"{name: 'Mike Smith', dept:'FIN',title:'Manager'}" -H"x-riak-index-skills_bin:accounting"\  
 -H"x-riak-index-salary_int:100000" http://127.0.0.1:8098/riak/employees/employee200  


To then query for employees with 'accounting' skills, you would perform the following operation:
 
 curl http://127.0.0.1:8098/buckets/employees/index/skills_bin/accounting  

To then query for employees whose salaries range between 50000 and 150000, you would perform the following operation:

 curl http://127.0.0.1:8098/buckets/employees/index/salary_int/50000/150000  

NOTE: In the employee record above, there are two fields in addition to the basic data element containing name, department and title: salary and skills. To add a secondary index, the field name must carry the suffix "_bin" for binary fields or "_int" for integer fields.
Of course, if you decided that you should be able to search your employee data by title, you could add another index as shown below, in addition to maintaining the title as part of your basic data.

 curl -XPUT -d"{name: 'Mike Smith', dept:'FIN',title:'Manager'}" -H"x-riak-index-skills_bin:accounting" \
 -H"x-riak-index-salary_int:100000" -H"x-riak-index-title_bin:Manager" http://127.0.0.1:8098/riak/employees/employee200


Database Clustering and Replication
Riak runs as a cluster by default. Each physical Riak server is called a node. Each node has multiple "virtual nodes" within it. In a node, the bucket/key pairs are stored on a ring-like configuration where the ring is segmented into the number of "virtual nodes" running within it. Each "virtual node" is responsible for a segment. A cluster of nodes does not have a designated Master node. All nodes are equal and can serve any request from a client. As I mentioned earlier, Riak stores copies of data across the nodes in the cluster. The number of copies stored is configurable and is consistent across nodes in a cluster. Riak promises seamless addition and removal of nodes to and from a cluster.
Riak aims for eventual consistency while focusing on complete availability. Eventual consistency means that your data will be eventually consistent on all nodes over time. The latency is not expected to be more than a few hundred milliseconds and thus is usually acceptable. Nodes focus on being up and available for reads and writes and after that, they replicate data to other nodes.
Data Integrity
Riak is masterless, and hence write and read operations can be fulfilled by any node in a cluster. Being a distributed system with multiple copies of data across nodes, Riak has to provide some sort of conflict resolution and write safety. For every update, Riak accepts a ‘W’ value, a number that represents the minimum number of nodes in a cluster that must report a successful update before the operation is deemed successful. With this safety net, an application can write to a Riak cluster even if one or more nodes are down. This number, coupled with another configurable number indicating how many copies of a value are stored, can be used to tune the cluster.
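
Extending the RiakEmployeeClient sketch from earlier, the per-request read and write quorums are passed as query parameters on the HTTP API; the values below are illustrative only.

 // Sketch only, reusing the HttpURLConnection approach from the earlier RiakEmployeeClient class.
 // w=2 asks that at least two replicas acknowledge a write before it is reported successful;
 // r=2 asks that at least two replicas answer before a read is returned.
 HttpURLConnection write = (HttpURLConnection)
         new URL("http://127.0.0.1:8098/riak/employees/emp1001?w=2").openConnection();
 HttpURLConnection read = (HttpURLConnection)
         new URL("http://127.0.0.1:8098/riak/employees/emp1001?r=2").openConnection();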
Riak also has a configurable option to handle conflicts when it encounters data updated by separate sources. It allows you to configure it so the most recent update wins and is propagated to all nodes or you can configure it to present the differences to the client application for resolution based on a custom algorithm or manual intervention.
What do you give up?
RDBMSs have long ruled the database world for everything from simple persistence to complex analytical database engines. It is inevitable that NoSQL databases are compared against RDBMS features. Here are a few things of note:
By using Riak, you give up the notion of data that is 100% consistent across the cluster at all times. Since Riak works on the principle of eventual consistency, you will encounter situations where retrieved data is stale, and your application has to be designed to handle such scenarios.
There is no concept equivalent to table joins and foreign key constraints in Riak.
Because content is not typed, querying is not as efficient as in an RDBMS.
With the data model being a key-value pair, you lose the concept of a rich domain with strongly typed data.
You have to have a certain tolerance for latency and conflict resolution for update operations.
By using Riak, you will give up some of the powerful SQL Querying constructs supported by RDBMS databases.
On the bright side, you give up complex administration of RDBMS, the expensive hardware and the potential single point of failure inherent to RDBMS solutions.
Summary
This was a brief introduction to Riak, a popular key-value storage NoSQL database designed and built for high-availability applications that can accept eventual consistency. With the latency for data consistency being at most a few hundred milliseconds, it is a palatable alternative.
However, it is important to know that a NoSQL database may not be the right option for every use case. And further, even if you have a strong case for a NoSQL database, Riak may still not be the right choice for you.
Consider the following as you determine a fit for your usage scenarios:
  1. Riak is a key-value storage database.
  2. Riak can act as primary storage for your web apps but also excels as an ancillary database for your applications to store user session data for web applications, data for web services etc.
  3. It acts as a scalable low-latency database for mobile apps where application availability for data entry is critical no matter the situation.
  4. Riak is schema-less and light-weight. This allows you to model a domain whose attributes are expected to vary over time.
  5. Riak’s high availability feature is a good fit for systems that need to be up for writes under any and all circumstances and eventual consistency is acceptable. In such cases, the merge conflicts are typically easily resolved through the automatic algorithms in Riak so that the databases can self-repair conflicts.
  6. Riak’s scaling features allows you to support a large database and still meet speed and availability specifications typically required of web applications.
  7. Not having to worry much about DBA activities, developers control the data format which leads to seamless development and enhancements to existing features, all resulting in faster delivery.
  8. Being a key-value store database, Riak is not best suited for ad-hoc querying, since the data structure in the value is not clearly defined. As a result, data-analytics queries run directly against Riak may not perform as well as one might expect.
I have presented an overview of a powerful NoSQL option – Riak, which is a strong and viable candidate for any company to consider as an alternative to relational databases or to other NoSQL databases. It is designed to be scalable, distributed, easy to administer and to run on commodity servers.
For documentation, instructions on installing and tutorials, visit www.basho.com.

Mule ESB Reliability with Exception Strategy

Mule 3.x brings with it some nice exception strategy handling features that you can use to provide reliable message delivery and retrying service calls.

Your requirement: you need to call a web service from your Mule flow.

Case 1: If the service is unavailable (timeout, 500, etc.), retry forever (or a specified number of times; if all retries fail, write the original payload to a JMS message queue).

Case 2: If the service is available and the call was successful BUT the web service response indicates that processing failed and the operation could be tried again (a recoverable exception on the web service end), then retry the call.

Case 3: If the service is available and the call was successful BUT the web service response indicates that processing failed with a non-recoverable exception on the web service end (meaning no matter how many times you retry, processing will fail), then skip the retries but still write the original payload to a JMS message queue.

Using JMS, JMS Transactions and Exception Strategies

You break your implementation into two separate main flows.
Flow 1, which you see below, receives the request and places it in a JMS queue, transactionally.

   <flow name="myServiceV2" doc:name="myServiceV2">
     <http:inbound-endpoint exchange-pattern="one-way" host="localhost" port="63012" path="my/service/v2/operation" doc:name="myService"/>
     <object-to-string-transformer doc:name="Object to String"/>
     <message-properties-transformer doc:name="Message Properties">
       <add-message-property key="srcPayload" value="#[payload:]"/>
     </message-properties-transformer>
     <set-session-variable value="#[payload:]" variableName="srcPayload" doc:name="Session Variable" />
     <logger message="Request SRC message is #[payload:]" level="INFO" doc:name="Logger"/>
     <vm:outbound-endpoint exchange-pattern="one-way" path="unprocessedOriginalMessageStoreV2" responseTimeout="20000" mimeType="text/plain" doc:name="Dispatch to audit"/>
   </flow>

   <flow name="unprocessedOriginalMessageStoreV2" doc:name="unprocessedOriginalMessageStoreV2">
     <vm:inbound-endpoint exchange-pattern="one-way" path="unprocessedOriginalMessageStoreV2" doc:name="VM">
     </vm:inbound-endpoint>
     <object-to-string-transformer />
     <expression-transformer doc:name="Expression">
       <return-argument evaluator="header" expression="session:srcPayload"/>
     </expression-transformer>
     <jms:outbound-endpoint queue="esb.unprocessed.original.message" connector-ref="jmsPalEsbXAConnector" doc:name="JMS">
       <jms:transaction action="ALWAYS_BEGIN"/>
     </jms:outbound-endpoint>
   </flow>


Next, write another flow that reads from the message queue and processes the message. Note the transactions, the exception handling strategy and the rollback of the message.

   <flow name="processMyRequests" doc:name="processMyRequests">
     <jms:inbound-endpoint queue="esb.unprocessed.original.message" connector-ref="jmsPalEsbXAConnector" doc:name="processor">
       <xa-transaction action="ALWAYS_BEGIN" timeout="60000"/>
     </jms:inbound-endpoint>
     <vm:outbound-endpoint exchange-pattern="one-way" path="jmsTask" responseTimeout="20000" mimeType="text/plain" doc:name="Dispatch to processing">
       <xa-transaction action="ALWAYS_JOIN" timeout="60000"/>
     </vm:outbound-endpoint>
   </flow>

   <flow name="processMyRequestJMS" processingStrategy="synchronous">
     <vm:inbound-endpoint exchange-pattern="one-way" path="jmsTask" doc:name="VM">
       <xa-transaction action="BEGIN_OR_JOIN" timeout="60000"/>
     </vm:inbound-endpoint>
     <set-session-variable value="#[payload:]" variableName="srcPayload" doc:name="Session Variable" />
     <cxf:jaxws-service serviceClass="com.mycompany.operations.v1.IMemberManager" doc:name="SOAP Service"/>
     <set-session-variable value="N" variableName="skipRetry" doc:name="Variable" />
     <processor-chain doc:name="Processor Chain">
       <message-properties-transformer scope="invocation" doc:name="Message Properties">
         <add-message-property key="MessageHeader" value="#[groovy:payload.find{it instanceof javax.xml.ws.Holder}.value]"/>
         <add-message-property key="MyRequest" value="#[groovy:payload.find{it instanceof com.mycompany.member.MyRequest}]"/>
         <delete-message-property key=".*"/>
       </message-properties-transformer>
       <set-session-variable variableName="skipRetry" value="N" doc:name="Variable"/>
       <choice doc:name="Choice">
         <when expression="#[groovy:validationService.isValidValue(payload[1].data.value)]">
           <processor-chain>
             <custom-transformer class="com.mycompany.transformer.MyRequestTransformer" doc:name="Java"/>
             <choice doc:name="Choice">
               <when expression="groovy:message.getInvocationProperty('requestValid') == 'Y'">
                 <processor-chain>
                   <!-- LEARN: Call the client web service -->
                   <flow-ref name="webServiceClientV2" doc:name="Invoke WebService"/>
                   <!-- LEARN: This is the custom transformer that reads the response on a successful call of the client and sets the skip property -->
                   <!-- LEARN: 'skip' is set to 'D' (for Done) if the call was successful -->
                   <!-- LEARN: 'skip' is set to 'Y' if the call was successful but the response indicated a non-recoverable exception (so no retries, but write the original msg to the JMS queue) -->
                   <!-- LEARN: 'skip' is set to 'N' if the call was successful but the response indicated a recoverable exception (so retry) -->
                   <custom-transformer class="com.mycompany.transformer.MyResponseTransformer" doc:name="Java"/>
                   <choice>
                     <when evaluator="groovy" expression="message.getInvocationProperty('skip') != 'D'">
                       <!-- LEARN: When 'skip' is NOT 'D', copy the value of skip into a session variable and throw a dummy exception -->
                       <set-session-variable value="#[groovy:message.getInvocationProperty('skip')]" variableName="skipRetry" doc:name="Variable" />
                       <scripting:transformer doc:name="Create Dummy Exception">
                         <scripting:script engine="groovy">
                           <scripting:text><![CDATA[throw new java.lang.Exception('IGNORE THIS EXCEPTION');]]></scripting:text>
                         </scripting:script>
                       </scripting:transformer>
                     </when>
                   </choice>
                 </processor-chain>
               </when>
               <otherwise>
                 <processor-chain>
                   <flow-ref name="errorHandlerBadInput" doc:name="errorHandlerBadInput"/>
                 </processor-chain>
               </otherwise>
             </choice>
           </processor-chain>
         </when>
         <otherwise>
           <processor-chain>
             <flow-ref name="defaultErrorHandlerV2" doc:name="defaultErrorHandlerV2"/>
           </processor-chain>
         </otherwise>
       </choice>
     </processor-chain>
     <choice-exception-strategy doc:name="Choice Exception Strategy">
       <!-- LEARN: When the dummy exception is thrown, check the flow or session variable 'skipRetry'; if not skipping, roll back the JMS transaction to re-queue the JMS message -->
       <rollback-exception-strategy when="#[groovy:message.getProperty('skipRetry', org.mule.api.transport.PropertyScope.SESSION) == 'N' || exception.causedBy(java.net.UnknownHostException) || exception.causedBy(java.net.ConnectException)]" maxRedeliveryAttempts="2">
         <on-redelivery-attempts-exceeded>
           <logger message="STRATEGY ROLLBACK WITH LIMITED RETRIES #[payload:]" level="INFO" doc:name="Logger"/>
           <flow-ref name="jmsAuditDirectWriteV2" doc:name="Log Unprocessed Message to Original Queue"/>
         </on-redelivery-attempts-exceeded>
       </rollback-exception-strategy>

       <!-- LEARN: When the dummy exception is thrown, check the flow or session variable 'skipRetry'; if not skipping, place the original message back on the main processing queue for reprocessing -->
       <!-- LEARN: If you uncomment this forever strategy AND comment out the ROLLBACK strategy above, you get retry forever -->
       <!--
       <catch-exception-strategy when="#[groovy:message.getProperty('skipRetry', org.mule.api.transport.PropertyScope.SESSION) == 'N' || exception.causedBy(java.net.UnknownHostException) || exception.causedBy(java.net.ConnectException)]">
         <logger message="STRATEGY RETRY FOREVER #[payload:]" level="INFO" doc:name="Logger"/>
         <flow-ref name="unprocessedOriginalMessageStoreV2" doc:name="Log Unprocessed Message to Original Queue"/>
       </catch-exception-strategy>
       -->
       <catch-exception-strategy when="#[groovy:message.getProperty('skipRetry', org.mule.api.transport.PropertyScope.SESSION) == 'Y']" doc:name="Write to Error Queue">
         <logger message="STRATEGY WRITE TO ERROR QUEUE; SKIP RETRY; #[payload:]" level="INFO" doc:name="Logger"/>
         <flow-ref name="jmsAuditDirectWriteV2" doc:name="Log Unprocessed Message to Error Queue"/>
       </catch-exception-strategy>
       <catch-exception-strategy doc:name="Write to Error Queue">
         <logger message="STRATEGY DEFAULT WRITE TO ERROR QUEUE; #[payload:]" level="INFO" doc:name="Logger"/>
         <flow-ref name="jmsAuditDirectWriteV2" doc:name="Log Unprocessed Message to Error Queue"/>
       </catch-exception-strategy>
     </choice-exception-strategy>
   </flow>
   <sub-flow name="webServiceClientV2" doc:name="webServiceClientV2">
     <logger message="client 0 : #[payload:]" level="ERROR" doc:name="Log Error"/>
     <cxf:jaxws-client operation="MyRequest" serviceClass="com.mycompany.operations.IMemberManager" doc:name="Webservice Client">
       <cxf:outInterceptors/>
     </cxf:jaxws-client>
     <http:outbound-endpoint exchange-pattern="request-response" address="http://..." doc:name="request"/>
   </sub-flow>
   <sub-flow name="jmsAuditDirectWriteV2" doc:name="jmsAuditDirectWriteV2">
     <expression-transformer doc:name="Expression">
       <return-argument evaluator="header" expression="session:srcPayload"/>
     </expression-transformer>
     <jms:outbound-endpoint queue="esb.unprocessed.request.err" connector-ref="jmsPalEsbXAConnector" doc:name="JMS"/>
   </sub-flow>
   <flow name="unprocessedOriginalMessageStoreV2" doc:name="unprocessedOriginalMessageStoreV2">
     <expression-transformer doc:name="Expression">
       <return-argument evaluator="header" expression="session:srcPayload"/>
     </expression-transformer>
     <object-to-string-transformer />
     <logger message="JMS message is #[payload:]" level="INFO" doc:name="Logger"/>
     <jms:outbound-endpoint queue="esb.unprocessed.original.message" connector-ref="jmsPalEsbXAConnector" doc:name="JMS">
     </jms:outbound-endpoint>
   </flow>
 </mule>


The Custom Response Transformer

Within com.mycompany.transformer.MyResponseTransformer, this happens:
// jsr is the unmarshalled response returned from the web service I called.
// It carries a status code. A non-zero value is a failure in processing.
// The resendFlag indicates whether it was a recoverable exception (resend=true)
// or a non-recoverable exception (resend=false)

 
returnValue.setMessage(jsr.getMessage());
returnValue.setStatus(jsr.getStatus());
if (jsr.getStatus().codeValue() == 0) {
    logger.info("Client SUCCESSFULLY PROCESSED with status code + message : " + returnValue.getStatus() + ";;;;" + returnValue.getMessage());
    message.setInvocationProperty("skip", "D");
} else {
    if (!jsr.isResend()) {
        logger.info("Client FAILED PROCESSING with status code + message : " + returnValue.getStatus() + ";;;;" + returnValue.getMessage());
        message.setInvocationProperty("skip", "Y"); // do not resend; store
    } else {
        logger.info("Client requested a RESEND of the request with status code + message : " + returnValue.getStatus() + ";;;;" + returnValue.getMessage());
        message.setInvocationProperty("skip", "N");
    }
}




Mule 3.3.1 Using Until-Successful To Skip Retries

Introduction

Mule 3.x brings with it a new feature called Until-Successful that lets you loop through some flow tasks, specifying the number of retries, the time between retries and an action to be performed when the retries are exhausted. It also allows you to specify a failure condition in addition to the default failure conditions (network issues, etc.).

However, it is fairly rigid in that the configuration does not let you break out of the retries on demand. In other words, the code inside the until-successful block may have executed successfully, BUT based on a response code (from your web service call) or some other condition you need to stop retrying and go straight to the failure action, or at least simulate it.
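
To make the knobs concrete, a bare-bones until-successful block looks roughly like this sketch (the object store, dead-letter endpoint, failure expression and sub-flow name are the same ones used in the full configuration further down):

  <until-successful objectStore-ref="objectStore" maxRetries="5" secondsBetweenRetries="10"
                    failureExpression="groovy:message.getInvocationProperty('callAction') == '500'"
                    deadLetterQueue-ref="unprocessedMessageStore" doc:name="Until Successful">
    <processor-chain>
      <!-- the work you want retried goes here -->
      <flow-ref name="mySubFlow"/>
    </processor-chain>
  </until-successful>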

Your requirement is that: You need to call a web service from your Mule flow.

Case 1 If the service is unavailable (timeout, 500, etc.), retry a specified number of times; if all retries fail, write the original payload to a JMS queue.

Case 2 If the service is available and the call succeeds, BUT the web service response indicates that processing failed in a way that can be tried again (a recoverable exception on the web service end), then retry the call.

Case 3 If the service is available and the call succeeds, BUT the web service response indicates that processing failed with a non-recoverable exception on the web service end (meaning no matter how many times you retry, processing will fail), then skip out of the retries but still write the original payload to a JMS queue.

The Most Obvious Solution (does not work)

If you dig deep into the Mule code, you will see that the way Mule decides how many times to retry is by comparing the current retry number against the maxRetries configuration value. If the current retry number exceeds maxRetries, the assumption is that Mule has tried to call the client web service that many times and failed, so it goes ahead and executes the failure flow.

Also, if you print out the Mule message, you will see that the current retry number lives in an invocation-scoped variable called process.attempt.count. You would be forgiven for thinking that, in Case 3 above, you could update this to a value higher than maxRetries so that the until-successful code would see it, exit the loop and execute the failure condition. But alas, no! Before entering the loop, Mule stores the original message in an in-memory store and reads it back for every attempt. Thus, any invocation property you modify inside the until-successful processors will not be available, because the failed message is discarded and the original message is read from storage for each retry.
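
For illustration, the tempting (and futile) approach would look something like the sketch below: bump the counter from inside the loop and hope until-successful notices. (The property name comes from the message dump described above; the rest of the chain is assumed.)

  <until-successful objectStore-ref="objectStore" maxRetries="5" secondsBetweenRetries="10"
                    deadLetterQueue-ref="unprocessedMessageStore" doc:name="Until Successful">
    <processor-chain>
      <flow-ref name="mySubFlow"/>
      <!-- DOES NOT WORK: the next attempt re-reads the original message from the object store,
           so this inflated counter is discarded along with the rest of the failed message -->
      <set-variable variableName="process.attempt.count" value="99" doc:name="Bump Attempt Count"/>
    </processor-chain>
  </until-successful>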

The Not So Obvious Solution (and it works!!):

In the configuration below, the incoming message is first stored in a session variable for later use. Follow the <!-- LEARN: ... --> comments for notes on what is happening in the flow.
The key takeaway is that you push all processing into a sub-flow called from the until-successful loop. Inside that sub-flow, you handle your recoverable and non-recoverable exception conditions.
If you wish to simulate a failure in any case, you can throw an exception using a Groovy script transformer (see the sketch below), which tells the until-successful in the main flow that processing failed and the message has to be retried.
In the example below, in the case of a non-recoverable exception, the JMS message is written and success is simulated in the sub-flow so that until-successful exits the loop assuming successful execution. For a recoverable exception, failure is indicated by setting a message invocation property that is part of the failure expression check.
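
As a sketch, the "simulate failure" trick is simply a Groovy scripting transformer that throws a throwaway exception (the same technique used in the flow shown earlier; the doc:name here is arbitrary):

  <scripting:transformer doc:name="Simulate Failure">
    <scripting:script engine="Groovy">
      <scripting:text><![CDATA[throw new java.lang.Exception('IGNORE THIS EXCEPTION');]]></scripting:text>
    </scripting:script>
  </scripting:transformer>

With that trick in mind, here is the full configuration: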

  <vm:endpoint exchange-pattern="one-way" path="unprocessedMessageStore" name="unprocessedMessageStore" doc:name="VM"/>
  <flow name="myService" doc:name="myService">
    <http:inbound-endpoint exchange-pattern="one-way" host="localhost" port="63012" path="my/service/path/v1/operation" doc:name="myService"/>
    <object-to-string-transformer doc:name="Object to String"/>
    <message-properties-transformer doc:name="Message Properties">
      <add-message-property key="srcPayload" value="#[payload:]"/>
    </message-properties-transformer>
    <set-session-variable value="#[payload:]" variableName="srcPayload" doc:name="Session Variable"/>
<!-- LEARN: This is a web service hosted from within Mule which receives the request from a client -->
    <cxf:jaxws-service serviceClass="com.mycomp.member.IMemberManager" doc:name="SOAP Service"/>
    <processor ref="mdcLogging"/>
    <processor-chain doc:name="Processor Chain">
      <message-properties-transformer scope="invocation" doc:name="Message Properties">
        <add-message-property key="MessageHeader" value="#[groovy:payload.find{it instanceof javax.xml.ws.Holder}.value]"/>
        <add-message-property key="MyRequest" value="#[groovy:payload.find{it instanceof com.mycompany.MyRequest}]"/>
        <delete-message-property key=".*"/>
      </message-properties-transformer>
      <choice doc:name="Choice">
        <when expression="#[groovy:validationService.isValidValue(payload[1].data.value)]">
          <processor-chain>
<!-- LEARN: This is a custom request transformer -->
            <custom-transformer class="com.mycompany.transformer.MyRequestTransformer" doc:name="Java"/>
            <choice>
<!-- LEARN: This verifies that the request is valid -->
              <when evaluator="groovy" expression="message.getInvocationProperty('requestValid') == 'Y'">
<!-- LEARN: This is the until-successful with a failure expression that checks whether the invocation property indicates a processing failure -->
                <until-successful objectStore-ref="objectStore" maxRetries="5"
                                  secondsBetweenRetries="10" failureExpression="groovy:message.getInvocationProperty('callAction') == '500'"
                                  deadLetterQueue-ref="unprocessedMessageStore" doc:name="Until Successful">
<!-- LEARN: The key is to move all processing to a sub-flow where you can catch recoverable and unrecoverable exceptions. -->
                  <processor-chain>
                    <flow-ref name="mySubFlow"/>
                  </processor-chain>
                </until-successful>
              </when>
              <otherwise>
                <flow-ref name="errorHandlerBadInput" doc:name="errorHandlerBadInput"/>
              </otherwise>
            </choice>
          </processor-chain>
        </when>
      </choice>
    </processor-chain>
    <catch-exception-strategy doc:name="Catch Exception Strategy">
      <flow-ref name="jmsAuditDirectWrite" doc:name="Log Unprocessed Message to Queue"/>
    </catch-exception-strategy>
  </flow>

  <sub-flow name="myWebServiceClient" doc:name="myWebServiceClient">
    <cxf:jaxws-client operation="MyRequest" serviceClass="com.my.client.v1.IMemberManager" doc:name="Webservice Client">
      <cxf:outInterceptors/>
    </cxf:jaxws-client>
    <http:outbound-endpoint exchange-pattern="request-response" address="http://..." doc:name="myClientRequest"/>
  </sub-flow>

  <sub-flow name="jmsAuditDirectWrite" doc:name="jmsAuditDirectWrite">
    <expression-transformer>
      <return-argument expression="session:srcPayload" evaluator="header"/>
    </expression-transformer>
    <jms:outbound-endpoint queue="esb.unprocessed.request.err" connector-ref="jmsEsbXAConnector" doc:name="JMS"/>
  </sub-flow>

  <sub-flow name="mySubFlow">
    <processor-chain>
<!-- LEARN: Call the client web service. -->
      <flow-ref name="myWebServiceClient" doc:name="Invoke WebService"/>
<!-- LEARN: This is the custom response transformer (see the code snippet below) that brings back a status code indicating either a recoverable or a non-recoverable exception, as the case may be. -->
      <custom-transformer class="com.mycompany.transformer.MyResponseTransformer" doc:name="Java"/>
      <choice>
<!-- LEARN: This condition means a non-recoverable exception, so write the original message to a JMS queue and skip the loop. -->
        <when evaluator="groovy" expression="message.getInvocationProperty('skip') == 'Y'">
          <flow-ref name="jmsAuditDirectWrite" doc:name="Log Unprocessed Message to Queue"/>
          <message-properties-transformer scope="invocation" doc:name="Message Properties">
<!-- LEARN: This property makes the failure expression on the until-successful evaluate to false, so the loop treats the attempt as successful and stops retrying -->
            <add-message-property key="callAction" value="202Y"/>
          </message-properties-transformer>
        </when>
        <when evaluator="groovy" expression="message.getInvocationProperty('skip') == '202Y'">
<!-- LEARN: This condition means the message was processed successfully, so skip the loop. -->
          <message-properties-transformer scope="invocation" doc:name="Message Properties">
<!-- LEARN: This property makes the failure expression on the until-successful evaluate to false, so the loop treats the attempt as successful and stops retrying -->
            <add-message-property key="callAction" value="202Y"/>
          </message-properties-transformer>
        </when>
      </choice>
    </processor-chain>
  </sub-flow>


The Custom Response Transformer

Within com.mycompany.transformer.MyResponseTransformer, this happens:
// jsr is the unmarshalled response returned from the web service I called.
// It carries a status code. A non-zero value is a failure in processing.
// The resendFlag indicates whether it was a recoverable exception (resend=true)
// or a non-recoverable exception (resend=false)

returnValue.setMessage(jsr.getMessage());
returnValue.setStatus(jsr.getStatus());
if (jsr.getStatus().codeValue() == 0) {
    // Success: callAction is not '500', so the failure expression is false and until-successful stops
    message.setInvocationProperty("callAction", "202");
    logger.info("Client SUCCESSFULLY PROCESSED with status code + message : " + returnValue.getStatus() + ";;;;" + returnValue.getMessage());
    message.setInvocationProperty("skip", "202Y");
} else {
    if (!jsr.isResend()) {
        // Non-recoverable failure: flag as failed for now; the sub-flow stores the message and overrides callAction so the loop stops
        message.setInvocationProperty("callAction", "500");
        logger.info("Client FAILED PROCESSING with status code + message : " + returnValue.getStatus() + ";;;;" + returnValue.getMessage());
        message.setInvocationProperty("skip", "Y"); // do not resend; store
    } else {
        // Recoverable failure: callAction '500' makes the failure expression true, so until-successful retries the call
        message.setInvocationProperty("callAction", "500");
        logger.info("Client requested a RESEND of the request with status code + message : " + returnValue.getStatus() + ";;;;" + returnValue.getMessage());
        message.setInvocationProperty("skip", "N");
    }
}