Wednesday, October 10, 2012

An Intro to MongoDB

Introduction
 Glossary of a few terms you need to know as you read on:
 Sharding (or horizontal partitioning) is a database design principle whereby the contents of a database table are split across physical locations by rows rather than by columns. Each partition forms part of a shard. Multiple shards together provide the complete data set, but the shards are split logically to ensure faster reads and writes.
Database replication is the process of copying data from a primary database to one or more redundant databases to improve reliability, fault tolerance, or accessibility. Typically, data is copied to the backup location immediately upon write so that it is available for recovery and/or as a read-only resource.
MongoDB
MongoDB is a document-oriented database management system. It adheres to the NoSQL core concepts of being distributed, schema-free and highly performant. Built from the ground up for high read and write throughput, it stores data as documents in a distributed, sharded environment, all while running on commodity servers.
MongoDB is especially suited for web applications that require a rich domain model. Since a document can intuitively represent a rich hierarchical model, one can do away with the complicated structures used to manage relationships (i.e., multi-table joins). A document can contain all the data related to a domain object without having to join to other data structures, resulting in faster reads and writes and more intuitive data management. MongoDB supports database replication with automatic failover. It is also designed for horizontal scaling, which I explain later in the post.
The Data Model
Since MongoDB is a document-oriented database, data is stored in the form of hierarchical documents whose attributes can be easily read by human and machine alike. The best way to illustrate this is with an example. Listing A shows a sample document that represents an employee in an organization.
Listing A

{ _id: ObjectId('5ddfaf7c7649b7e79d638d65'),
  firstName: 'Mike',
  lastName: 'Smith',
  department: 'Finance',
  skills: ['SAP Finance', 'CFP', 'CPA'],
  salary: 100000,
  image: { url: 'companyserver/location/msmith.jpg' }
}

The “employee” document is simply a JSON document, that is, a set of key/value pairs. Keys are the names of the fields, and values are either scalar values or, in turn, a nested document, an array of values, or an array of documents. If you had to represent this in a relational model, you would end up with multiple tables for the entities, join tables, and referential integrity constraints. To extract the data, you would find yourself writing a complex SQL query that joins these flat, one-dimensional tables. A document, by contrast, represents the employee as a whole, with all of its data self-contained, which makes it more intuitive to work with.
Because you always operate on a document as a whole, you get atomic operations on the document.
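As an illustration, here is a minimal sketch using the MongoDB Java driver that applies a raise and adds a skill to a single employee document in one atomic update. The database name ("hr") and the connection details are placeholders, not something prescribed by MongoDB.

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class AtomicUpdateExample {
  public static void main(String[] args) throws Exception {
    // Connect to a local MongoDB instance; host, port and the "hr"
    // database name are placeholders for this sketch.
    Mongo mongo = new Mongo("localhost", 27017);
    DBCollection employees = mongo.getDB("hr").getCollection("employees");

    // Match the employee document by name.
    DBObject query = new BasicDBObject("firstName", "Mike").append("lastName", "Smith");

    // Increment the salary and add a skill in a single update.
    // Both modifiers touch the same document, so the change is atomic.
    DBObject update = new BasicDBObject("$inc", new BasicDBObject("salary", 5000))
        .append("$push", new BasicDBObject("skills", "MongoDB"));

    employees.update(query, update);
    mongo.close();
  }
}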
Querying and Indexes
The code to query these documents also ends up being intuitive to read. For instance, to find all employees, or to find a particular employee, the code would look like this:
Listing B:

db.employees.find();
db.employees.find({'firstName': 'Mike', 'lastName': 'Smith'});

Now, if you spend a few more minutes looking at this document, you will think of various other ways your clients would need this data – all employees by department, employees making more than $90,000, employees with particular skills, and so on. You would also rightfully wonder whether such queries would be fast. They can be, because MongoDB allows you to define secondary indexes on the document. Add the following indexes and these queries run quite efficiently.
Listing C:

db.employees.ensureIndex({lastName: 1});
db.employees.ensureIndex({skills: 1});
db.employees.ensureIndex({salary: 1});
db.employees.find({'skills': 'CFP'});
db.employees.find({'salary': {'$gt': 90000}});
 
Database Replication
One of the biggest advantages of NoSQL databases, and MongoDB in particular, is database replication. MongoDB’s offering is called a replica set: a set of MongoDB database servers that operate seamlessly as a unit, offering one primary server for writes (and potentially reads) and at least one, but typically more than one, redundant failover server. The failover servers are referred to as secondaries and can be used to scale data reads. The data in the secondary servers is kept in sync by the replication process in near real time, and in case of a failure in the primary, one of the secondaries automatically takes over as primary while the remaining servers continue as secondaries. The choice of which server becomes primary can be decided by a configurable arbitration process. Eventually, when the former primary comes back online and rejoins the cluster, it does so as a secondary.
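To give a flavor of how an application sees a replica set, here is a minimal sketch using the MongoDB Java driver. It connects with a seed list of members and optionally routes reads to secondaries; the hostnames are placeholders, and the read-preference call assumes a driver version that supports ReadPreference.

import java.util.Arrays;

import com.mongodb.Mongo;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;

public class ReplicaSetConnection {
  public static void main(String[] args) throws Exception {
    // Seed list of replica set members (hostnames are placeholders); the
    // driver discovers the current primary and fails over automatically.
    Mongo mongo = new Mongo(Arrays.asList(
        new ServerAddress("db1.example.com", 27017),
        new ServerAddress("db2.example.com", 27017),
        new ServerAddress("db3.example.com", 27017)));

    // Optionally route reads to secondaries to scale read traffic;
    // writes always go to the primary.
    mongo.setReadPreference(ReadPreference.secondaryPreferred());

    System.out.println("Connected to replica set");
    mongo.close();
  }
}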

Scaling
If you have been involved in building and running any enterprise application or large-scale web application, it is very likely that you have faced scenarios where performance degraded for your end users over time. With traditional architectures involving application servers and a large database server, you may have taken the approach of beefing up the hardware on those servers, also known as vertical scaling. The drawback of such a solution is that there is only a finite amount of headroom in your application and database servers, beyond which further upgrades stop being cost-effective. You will also notice a glaring weakness: your database server is a single point of failure. If the server fails, so does your application ecosystem. Granted, you may have a backup, but loading the backup data and bringing the server back online may not be a trivial effort.
Horizontal scaling (sharding) involves scaling out: instead of concentrating on improving one server, you focus on distributing data across multiple smaller servers. You continue with your existing hardware profile, perhaps smaller commodity servers, add more of these servers into your ecosystem and distribute your data across this farm of servers. In this architecture, the entire set of servers together is your database, but each server in the farm contains only a slice of the data. In such a scenario, a failure of one server means you lose access to only a small chunk of data while retaining the rest and keeping it available to your applications. To extend this further and couple it with what you learned about replication in the previous section, if each shard is itself a replica set, you are almost guaranteed the smallest possible downtime in case of a catastrophic failure of a particular server.
MongoDB was designed to operate as a distributed database with support for auto-sharding, which automatically manages the distribution of data across nodes. Adding a shard is easy, and failover is handled by MongoDB’s sharding system. If you ensure that each shard operates as a replica set, as discussed earlier, you eliminate single points of failure. MongoDB’s sharded cluster management also ensures that developers need not worry about the location of data; they address the sharded cluster as if it were a single MongoDB server.
Each shard in MongoDB consists of one or more servers running a mongod process. A collection is sharded based on a shard key, which is defined much like an index. MongoDB's sharding is order-preserving: data that is adjacent by shard key tends to be on the same server. The MongoDB documentation provides more detail on MongoDB's sharding policy.
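As a rough sketch of what enabling sharding looks like from the Java driver, the snippet below issues the enableSharding and shardCollection admin commands against a mongos router. The "hr" database, the host name and the choice of department as the shard key are illustrative assumptions, not recommendations.

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.Mongo;

public class ShardingSetup {
  public static void main(String[] args) throws Exception {
    // Connect to a mongos query router (host and port are placeholders).
    Mongo mongo = new Mongo("mongos1.example.com", 27017);
    DB admin = mongo.getDB("admin");

    // Enable sharding for the database, then shard the employees
    // collection using department as the shard key.
    admin.command(new BasicDBObject("enableSharding", "hr"));
    admin.command(new BasicDBObject("shardCollection", "hr.employees")
        .append("key", new BasicDBObject("department", 1)));

    mongo.close();
  }
}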
Data Integrity
MongoDB offers a feature called journaling, which addresses data integrity by recording every database write operation in a running incremental log in addition to writing to the database. In case of a catastrophic failure, this journal is used to replay the write operations and bring the database back to the state it was in just prior to the crash. An alternative to journaling is to run each server as part of a replica set, but journaling is always recommended.
What do you give up?
RDBMS have long ruled the database world for everything from simple persistence to complex analytical database engines. It is inevitable that NoSQL databases are compared against RDBMS features. Here are a few things of note:
By using MongoDB, you give up ACID transactions that span multiple documents. While updating a single document is atomic in MongoDB, writing to two separate documents (tables in RDBMS terms) is not a single transaction: one write could succeed while the other fails. You have to design for this with a separate strategy.
There is no concept equivalent to table joins and foreign key constraints in MongoDB.
By the nature of a document-oriented model without joins and key constraints, you will end up with a more denormalized model in MongoDB than you would expect with a comparable RDBMS model.
You have to have a certain tolerance for some (but rare) loss of data because of in-memory manipulation of data in MongoDB as opposed to physical disk persistence in RDBMS databases.
By using MongoDB, you will give up some of the powerful SQL Querying constructs supported by RDBMS databases.
Summary
I have presented an overview of MongoDB, one of the most popular NoSQL databases around today. MongoDB is a strong and viable candidate for any company to consider as an alternative to relational databases or to other NoSQL databases. It is designed to be scalable, distributed, easy to administer and to run on commodity servers.
However, it is important to know that a NoSQL database may not be the right option for every use case and further, even if you have a strong case for a NoSQL database, MongoDB may still not be the right choice for you.
Consider the following as you determine a fit for your usage scenarios:
  1. MongoDB is document-oriented and, within each document, uses a key/value notation to store data. This format supports the rich domains usually found in web applications, analytics, and logging.
  2. The MongoDB document can act as a primary storage for your domain. The rich data model allows you to define secondary indexes for faster queries.
  3. MongoDB is schema-less. This allows you to model a domain whose attributes are expected to vary over time.
  4. Auto-sharding lets you operate your database as a distributed database and if your domain has naturally shardable attributes, then MongoDB is the best choice.
  5. MongoDB’s scaling features allow you to support a large database and still meet the speed and availability requirements typically expected of web applications.
  6. With fewer DBA activities to worry about, developers control the data format, which makes development and enhancement of existing features smoother, all resulting in faster delivery.
  7. Community and Commercial support for MongoDB is significant. 10gen is the company behind MongoDB and seems committed to its success. There is a significant developer community excited about the product and its future.
  8. MongoDB has been deployed in many production environments around the world and has been proven to be a success. Craigslist (Craigslist uses MongoDB to archive billions of records), SAP (SAP uses MongoDB as a core component of SAP’s platform-as-a-service (PaaS) offering), Foursquare (to store venues and user check-ins) are some of the companies using MongoDB in production.
For documentation, instructions on installing and tutorials, visit www.mongodb.org.

Neo4j - A Graph Database

Introduction

This blog looks at Neo4j, a graph database.
Glossary of a few terms you need to know as you read on:
Sharding (or horizontal partitioning) is a database design principle whereby the contents of a database table are split across physical locations by rows rather than by columns. Each partition forms part of a shard. Multiple shards together provide the complete data set, but the shards are split logically to ensure faster reads and writes.
Database replication is the process of copying data from a primary database to one or more redundant databases to improve reliability, fault tolerance, or accessibility. Typically, data is copied to the backup location immediately upon write so that it is available for recovery and/or as a read-only resource.

Neo4j

Neo4j is a graph-oriented NoSQL database. This is distinct from the other three categories – key-value, big table and document databases – which are all described as aggregate databases. What this means is that these databases define their basic unit of persistence (the domain) as a chunk of data that holds meaning when read and written as a chunk. According to Martin Fowler (http://martinfowler.com/bliki/AggregateOrientedDatabase.html), an aggregate is “a fundamental unit of storage which is a rich structure of closely related data: for key-value stores it's the value, for document stores it's the document, and for column-family stores it's the column family. In DDD terms, this group of data is an aggregate.” These databases are very effective when you query them with the aggregates as your dimension data and the attributes as your facts. But when you wish to change your query mode to use one or more of your attributes as dimensions, you run into severe performance issues, and hence you revert to writing MapReduce functions.
Enter Graph Databases. Conceptually, graph databases are built using Nodes and Relationships. Each of those may contain properties. Thus, there isn’t an aggregate of your domain objects but a set of Nodes connected via Relationships. Neo4j and InfiniteGraph are two examples of Graph Databases.

The Data Model

The moment one hears about Nodes and Relationships, one instinctively thinks of RDBMS concepts. This is a mistake. You have to shift your thinking to Nodes and Relationships, not entities and their relationships. With a graph database, the domain model you produce translates directly into a graph. There is no further normalization of your model into tables, relationship tables and so on.
Neo4j, as an example of a graph database, is a schema-less database. It allows bottom-up data model design and is fully ACID compliant. It is written in Java and is available as a standalone database server or as an embedded database.

In a typical domain diagram, circular objects represent Nodes and arrows represent Relationships. There can be multiple relationships between Nodes, and such a diagram, straight out of a design session for the application, translates into your Neo4j database design without further work to create tables, columns, keys and so on.
Nodes have properties, which are basically key/value pairs. By writing your code in Java, you can take advantage of strong typing for your attributes. Every Relationship must have a Start Node and an End Node (which can be the same node). Relationships can also have properties, just like Nodes.

Creating Nodes and Relationships:

 import org.neo4j.graphdb.DynamicRelationshipType;
 import org.neo4j.graphdb.GraphDatabaseService;
 import org.neo4j.graphdb.Node;
 import org.neo4j.graphdb.Transaction;
 import org.neo4j.graphdb.factory.GraphDatabaseFactory;

 GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase("/sample/mydb");
 Transaction tx = db.beginTx();
 try {
   // Create two nodes and give each a name property.
   Node personA = db.createNode();
   personA.setProperty("name", "Person A");
   Node projectCRM = db.createNode();
   projectCRM.setProperty("name", "CRM Project");
   // Connect them: personA LEADS projectCRM.
   personA.createRelationshipTo(projectCRM, DynamicRelationshipType.withName("LEADS"));
   tx.success();
 } finally {
   tx.finish();
 }

Querying and Indexes

Querying for data in Neo4j is typically programmatic; you can use the Java API that ships with the distribution. A graph is, by design, naturally indexed (traversals follow relationships directly), so creating explicit indexes is usually limited to certain app-specific needs: typically to quickly locate a starting set of nodes, say the most frequently queried nodes or a logical grouping of nodes. So, as a best practice, index only what you need and not every node or relationship. (http://docs.neo4j.org/chunked/milestone/indexing.html)
Lucene is the default indexing implementation for Neo4j. This code snippet enhances the previous snippet by including code to add indexes.

 import org.neo4j.graphdb.DynamicRelationshipType;
 import org.neo4j.graphdb.GraphDatabaseService;
 import org.neo4j.graphdb.Node;
 import org.neo4j.graphdb.Relationship;
 import org.neo4j.graphdb.Transaction;
 import org.neo4j.graphdb.factory.GraphDatabaseFactory;
 import org.neo4j.graphdb.index.Index;

 GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase("/sample/mydb");
 Transaction tx = db.beginTx();
 try {
   Node personA = db.createNode();
   personA.setProperty("name", "Person A");
   Node projectCRM = db.createNode();
   projectCRM.setProperty("name", "CRM Project");
   personA.createRelationshipTo(projectCRM, DynamicRelationshipType.withName("LEADS"));
   // Obtain (or create) named indexes for nodes and relationships.
   Index<Node> projects = db.index().forNodes("projects");
   Index<Relationship> projectTeamMembers = db.index().forRelationships("projectTeamMembers");
   // Index the project node by its name property.
   projects.add(projectCRM, "name", projectCRM.getProperty("name"));
   tx.success();
 } finally {
   tx.finish();
 }


Indexes are usually used to retrieve a reference to the set of nodes you are interested in; the real work begins after you have the set of Nodes (or Relationships) you looked for.
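For example, here is a minimal sketch of an index lookup using the Java API, assuming the "projects" index populated in the snippet above:

 import org.neo4j.graphdb.GraphDatabaseService;
 import org.neo4j.graphdb.Node;
 import org.neo4j.graphdb.index.Index;
 import org.neo4j.graphdb.index.IndexHits;

 public class IndexLookup {
   // Returns the single project node with the given name, or null if none.
   public static Node findProject(GraphDatabaseService db, String projectName) {
     Index<Node> projects = db.index().forNodes("projects");
     IndexHits<Node> hits = projects.get("name", projectName);
     try {
       return hits.getSingle();
     } finally {
       hits.close();
     }
   }
 }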
For querying data, in addition to the Query API, you have the option of using the Traversal API, of which there are two kinds: the simple Traversal API and the enhanced Traversal API. Both are under active development, though the simple version has been around longer and is more proven. You can also use the REST interface to query data.
Another popular alternative is the Cypher query language (http://docs.neo4j.org/chunked/stable/cypher-query-lang.html). It is a declarative language that can be compared to SQL in its structure: a pattern-matching language that walks the graph, using the indexes already created as starting points. Cypher is a powerful tool in your arsenal when working with Neo4j.
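As a rough sketch, and assuming the nodes, LEADS relationship and "projects" index created in the earlier snippets, a Cypher query can be executed from Java through the ExecutionEngine shipped with Neo4j:

 import java.util.Map;

 import org.neo4j.cypher.javacompat.ExecutionEngine;
 import org.neo4j.cypher.javacompat.ExecutionResult;
 import org.neo4j.graphdb.GraphDatabaseService;

 public class CypherExample {
   public static void printProjectLeads(GraphDatabaseService db) {
     ExecutionEngine engine = new ExecutionEngine(db);

     // Look up the project through the "projects" index, then follow the
     // incoming LEADS relationships to find the people who lead it.
     ExecutionResult result = engine.execute(
         "START p=node:projects(name = 'CRM Project') " +
         "MATCH p<-[:LEADS]-person " +
         "RETURN person.name");

     for (Map<String, Object> row : result) {
       System.out.println("Lead: " + row.get("person.name"));
     }
   }
 }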
There are other tools available at http://tinkerpop.com/ that you will find very useful.

Database Replication and High Availability

Neo4j, by its concept and design as a graph database, is not built for sharding. A typical production configuration is a master-slave design with one master and more than one slave server. Data is replicated across the servers using log shipping, giving eventual consistency: writes to the master are fast and immediate, and the slaves catch up based on a configurable polling schedule. Writing to a slave, on the other hand, means the data is synchronously written to the master before the commit, and the other slaves catch up as designed. For disaster recovery, you can configure some slaves purely as standby replicas that simply hold the latest data in case disaster strikes the master; one of those standing slaves can then be elected master and your apps continue working without missing a beat.

Scaling

By design, graphs are hard to scale. Nodes can have relationships that span multiple servers in your database farm, and load balancing can leave many relationships crossing instances. These are very expensive to traverse, since network hops are many orders of magnitude slower than in-memory traversals. To scale as far as possible, Neo4j uses Java NIO and in-memory caching, and expects a large amount of RAM it can use to cache data. The cache itself can be sharded across instances to keep it warm with the most frequently used data, and when combined with sticky sessions this ensures a user gets maximum performance. You can also shard your domain data using application-specific algorithms.

Summary

This is only a brief introduction to graph databases with a focus on Neo4j. The topic is vast and there is much to learn about this unique NoSQL technology. Neo4j is a wonderful alternative not only to RDBMS but also to other NoSQL databases due to its intuitive design.
However, it is important to know that a NoSQL database may not be the right option for every use case and further, even if you have a strong case for a NoSQL database, Neo4j may still not be the right choice for you.
Consider the following as you determine a fit for your usage scenarios:
  1. Neo4j offers a powerful data model, especially for connected data.
  2. By its nature, data traversal is very fast compared to an RDBMS.
  3. Graph databases are great for recommendations, ratings, web analytics and social apps.
  4. Compared to other NoSQL databases, Neo4j supports ACID transactions.
  5. It is under active development with a company behind it - http://www.neotechnology.com
  6. A community edition is available and is free.
  7. However, HA relies on a master-slave configuration, which limits certain capabilities – sharding is not available out of the box. You have to be creative in design and deployment to achieve desired (or close to desired) results.
  8. Scaling is the same - you have to be creative in design and deployment to achieve desired (or close to desired) results.
  9. Write scaling is harder than read scaling, and even though your domain may fit the connected data model, it still may not fit graph database technology due to the non-functional requirements of your applications.
For documentation, instructions on installing and tutorials, visit www.neo4j.org.

A brief intro to Riak NoSQL Database

Introduction
Glossary of a few terms you need to know as you read on: Sharding (or horizontal partitioning) is a database design principle whereby the contents of a database table are split across physical locations by rows rather than by columns. Each partition forms part of a shard. Multiple shards together provide the complete data set, but the shards are split logically to ensure faster reads and writes.
Database replication is the process of copying data from a primary database to one or more redundant databases to improve reliability, fault tolerance, or accessibility. Typically, data is copied to the backup location immediately upon write so that it is available for recovery and/or as a read-only resource.
Riak
Riak is a key-value database management system. It is one of the databases that fall under the NoSQL umbrella. Inspired by Amazon’s Dynamo, it is designed to be horizontally scalable and distributed, providing fast reads and writes while remaining highly available through software and hardware failures.
Riak stores data as key-value pairs, with the value being data encoded however the application demands. Keys are organized into buckets, which I will explain in more detail later in the post. Operations on data use HTTP verbs: PUT, GET and DELETE. In addition to your own data, the value portion of the key-value pair stores metadata, both out of the box and custom. Another powerful feature of Riak is the ability to manage relationships via links, which are HTTP REST-style links that are easy to decipher by machine and human alike.
Riak is especially suited to cases where high availability is of primary importance: typically logging, analytics data storage, and aggregation and analytics-specific queries. As a database built from the ground up, it is specifically designed for high read and write throughput, providing seamless failover and strong uptime guarantees, all while running on commodity servers.
Riak is available as open-source or as an Enterprise version with additional features and technical support from Basho, the company behind Riak.
The Data Model
Riak is a key-value database. The key is a unique identifier. The value is an opaque value that can be custom-encoded as the application requires. Associated with the key, along with the value, is metadata about the data; you can also add custom metadata to supplement the metadata Riak provides. Keys are always associated with a bucket, making the bucket/key combination the unique reference to any stored data.
A bucket/key pair identifies a Riak object. A Riak object can also store links to other bucket/key pairs, providing the relationships often desired in databases. Because access is via HTTP REST-style addressing, the metadata is accessible via HTTP headers when Riak objects are retrieved.
Listing A
An “employees” bucket may contain multiple employee keys. Each employee key has a value representing that employee’s data.
Each employee is addressable as: employees/emp1001 where emp1001 is the employee id, a value that is unique to each employee.
The key emp1001 can now hold data and metadata as shown:

 emp1001.content_type = 'application/json'
 emp1001.data = {name: 'Mike Smith', dept: 'FIN', title: 'Manager'}

The default storage back-end for Riak is Bitcask. Riak uses a pluggable API to address the storage back-end and thus can seamlessly address other supported back-ends as needed.
Querying and Secondary Indexes
Querying options in Riak include three techniques: performing HTTP GET operations, writing MapReduce functions, and full-text search or custom (secondary) indexes on the data.
Riak allows clients to query for data using HTTP GET operations. An R value can be set for each fetch, where R signifies the minimum number of nodes that must successfully respond for the read to be considered successful. Riak can also return objects based on links stored on the object.
Riak supports MapReduce operations to execute complex queries. A MapReduce job comprises a series of functions, with the result of one function serving as input to the next, until the final result is served back to the client.
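As a rough sketch of what submitting a MapReduce job over Riak's HTTP interface can look like from Java, the snippet below posts a job against the employees bucket that maps each stored JSON value with the built-in Riak.mapValuesJson function. The endpoint, bucket and phases are illustrative assumptions.

 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;

 public class RiakMapReduce {
   public static void main(String[] args) throws Exception {
     // A MapReduce job over the employees bucket with a single map phase
     // that returns each object's JSON value.
     String job = "{\"inputs\":\"employees\","
         + "\"query\":[{\"map\":{\"language\":\"javascript\","
         + "\"name\":\"Riak.mapValuesJson\",\"keep\":true}}]}";

     HttpURLConnection conn = (HttpURLConnection)
         new URL("http://127.0.0.1:8098/mapred").openConnection();
     conn.setRequestMethod("POST");
     conn.setRequestProperty("Content-Type", "application/json");
     conn.setDoOutput(true);

     OutputStream out = conn.getOutputStream();
     out.write(job.getBytes("UTF-8"));
     out.close();

     // Print the JSON results returned by the job.
     BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
     String line;
     while ((line = reader.readLine()) != null) {
       System.out.println(line);
     }
     reader.close();
     conn.disconnect();
   }
 }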
A feature introduced in the recent Riak 1.0 release supports secondary indexes for efficient querying. The goal of this feature is to provide simple indexing options on data buckets at write time.
For example, when creating an employee record, you can create it within the "employees" bucket with a key that is a unique employee id, add data elements to it (name, title) and, finally, add index metadata for fields that are expected to be used in querying (skills, salary).

 curl -XPUT -d"{name: 'Mike Smith', dept:'FIN',title:'Manager'}" -H"x-riak-index-skills_bin:accounting"\  
 -H"x-riak-index-salary_int:100000" http://127.0.0.1:8098/riak/employees/employee200  


To then query for employees with "accounting" skills, you would perform the following operation:
 
 curl http://127.0.0.1:8098/buckets/employees/index/skills_bin/accounting  

To then query for employees whose salaries range between 50000 and 150000, you would perform the following operation:

 curl http://127.0.0.1:8098/buckets/employees/index/salary_int/50000/150000  

NOTE: In the employee record above, there are two indexed fields in addition to the basic data element containing name, department and title: skills and salary. To add a secondary index, the field name must carry a suffix of "_bin" for binary fields or "_int" for integer fields.
Of course, if you decided that you should also be able to search your employee data by title, you could add another index, as shown below, in addition to keeping the title as part of your basic data.

 curl -XPUT -d"{name: 'Mike Smith', dept:'FIN',title:'Manager'}" -H"x-riak-index-skills_bin:accounting"\
-H"x-riak-index-salary_int:100000"-H"x-riak-index-title_bin:Manager" http://127.0.0.1:8098/riak/employees/employee200  


Database Clustering and Replication
Riak runs as a cluster by default. Each physical Riak server is called a node. Each node has multiple "virtual nodes" within it. In a node, the bucket/key pairs are stored on a ring-like configuration where the ring is segmented into the number of "virtual nodes" running within it. Each "virtual node" is responsible for a segment. A cluster of nodes does not have a designated Master node. All nodes are equal and can serve any request from a client. As I mentioned earlier, Riak stores copies of data across the nodes in the cluster. The number of copies stored is configurable and is consistent across nodes in a cluster. Riak promises seamless addition and removal of nodes to and from a cluster.
Riak aims for eventual consistency while focusing on complete availability. Eventual consistency means that your data will be eventually consistent on all nodes over time. The latency is not expected to be more than a few hundred milliseconds and thus is usually acceptable. Nodes focus on being up and available for reads and writes and after that, they replicate data to other nodes.
Data Integrity
Riak is masterless, and hence read and write operations can be fulfilled by any node in a cluster. Being a distributed system with multiple copies of data across nodes, Riak has to provide some form of conflict resolution and write safety. For every update, Riak accepts a W value: the minimum number of nodes in the cluster that must report a successful update before the operation is deemed successful. With this safety net, an application can write to a Riak cluster even if one or more nodes are down. This number, coupled with another configurable number that indicates how many copies of a value to store, can be used to tune the cluster.
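To illustrate, here is a minimal sketch in Java that stores an employee record over the HTTP interface and asks for a write quorum of two nodes via the w query parameter; the host, bucket and key are illustrative.

 import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;

 public class RiakWriteQuorum {
   public static void main(String[] args) throws Exception {
     // PUT the employee value, requiring at least two nodes (w=2) to
     // acknowledge the write before it is reported as successful.
     URL url = new URL("http://127.0.0.1:8098/riak/employees/employee200?w=2");
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     conn.setRequestMethod("PUT");
     conn.setRequestProperty("Content-Type", "application/json");
     conn.setDoOutput(true);

     String data = "{name: 'Mike Smith', dept:'FIN', title:'Manager'}";
     OutputStream out = conn.getOutputStream();
     out.write(data.getBytes("UTF-8"));
     out.close();

     // A 2xx status indicates the write met the requested quorum.
     System.out.println("HTTP status: " + conn.getResponseCode());
     conn.disconnect();
   }
 }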
Riak also offers configurable conflict handling when it encounters data updated by separate sources. You can configure it so that the most recent update wins and is propagated to all nodes, or have it present the differences to the client application for resolution through a custom algorithm or manual intervention.
What do you give up?
RDBMS have long ruled the database world for everything from simple persistence to complex analytical database engines. It is inevitable that NoSQL databases are compared against RDBMS features. Here are a few things of note:
By using Riak, you give up the notion of 100% consistent data across a cluster. Since Riak works on the principle of eventual consistency, you will encounter situations where retrieved data is stale, and your application has to be designed to handle such scenarios.
There is no concept equivalent to table joins and foreign key constraints in Riak.
Because content is not typed, querying is not as efficient as in an RDBMS.
With the data model being a key-value pair, you lose the concept of a rich domain with strongly typed data.
You have to have a certain tolerance for latency and conflict resolution for update operations.
By using Riak, you will give up some of the powerful SQL Querying constructs supported by RDBMS databases.
On the bright side, you give up complex administration of RDBMS, the expensive hardware and the potential single point of failure inherent to RDBMS solutions.
Summary
This was a brief introduction to Riak, a popular key-value NoSQL database designed and built for high-availability applications that can accept eventual consistency. With the latency for data consistency being at most a few hundred milliseconds, it is a palatable alternative.
However, it is important to know that a NoSQL database may not be the right option for every use case. And further, even if you have a strong case for a NoSQL database, Riak may still not be the right choice for you.
Consider the following as you determine a fit for your usage scenarios:
  1. Riak is a key-value storage database.
  2. Riak can act as primary storage for your web apps but also excels as an ancillary database for your applications to store user session data for web applications, data for web services etc.
  3. It acts as a scalable low-latency database for mobile apps where application availability for data entry is critical no matter the situation.
  4. Riak is schema-less and light-weight. This allows you to model a domain whose attributes are expected to vary over time.
  5. Riak’s high availability feature is a good fit for systems that need to be up for writes under any and all circumstances and eventual consistency is acceptable. In such cases, the merge conflicts are typically easily resolved through the automatic algorithms in Riak so that the databases can self-repair conflicts.
  6. Riak’s scaling features allows you to support a large database and still meet speed and availability specifications typically required of web applications.
  7. With fewer DBA activities to worry about, developers control the data format, which makes development and enhancement of existing features smoother, all resulting in faster delivery.
  8. Being a key-value store, Riak is not best suited for ad-hoc querying, since the data structure in the value is not clearly defined. As a result, data-analytics queries run directly against Riak may not perform as well as one might expect.
I have presented an overview of a powerful NoSQL option – Riak, which is a strong and viable candidate for any company to consider as an alternative to relational databases or to other NoSQL databases. It is designed to be scalable, distributed, easy to administer and to run on commodity servers.
For documentation, instructions on installing and tutorials, visit www.basho.com.

Mule ESB Reliability with Exception Strategy

Mule 3.x brings with it some nice exception strategy features that you can use to provide reliable message delivery and retry service calls.

Your requirement is this: you need to call a web service from your Mule flow.

Case 1: If the service is unavailable (timeout, 500, etc.), retry forever (or a specified number of times; if all retries fail, write the original payload to a JMS message queue).

Case 2: If the service is available and the call was successful BUT the web service response indicates that processing failed and the operation could be tried again (a recoverable exception on the web service end), then retry the call.

Case 3: If the service is available and the call was successful BUT the web service response indicates that processing failed (a non-recoverable exception on the web service end, meaning that no matter how many times you retry, processing will fail), then skip out of the retries but still write the original payload to a JMS message queue.

Using JMS, JMS Transactions and Exception Strategies

You break your implementation into two separate main flows.
Flow 1, which you see below, receives the request and places it in a JMS queue, transactionally.

1:    <flow name="myServiceV2" doc:name="myServiceV2">  
2:      <http:inbound-endpoint exchange-pattern="one-way" host="localhost" port="63012" path="my/service/v2/operation" doc:name="myService"/>  
3:      <object-to-string-transformer doc:name="Object to String"/>  
4:      <message-properties-transformer doc:name="Message Properties">  
5:        <add-message-property key="srcPayload" value="#[payload:]"/>  
6:      </message-properties-transformer>  
7:      <set-session-variable value="#[payload:]" variableName="srcPayload" doc:name="Session Variable" />  
8:      <logger message="Request SRC message is #[payload:]" level="INFO" doc:name="Logger"/>  
9:      <vm:outbound-endpoint exchange-pattern="one-way" path="unprocessedOriginalMessageStoreV2" responseTimeout="20000" mimeType="text/plain" doc:name="Dispatch to audit"/>  
10:      </flow>  
11:        
12:    <flow name="unprocessedOriginalMessageStoreV2" doc:name="unprocessedOriginalMessageStoreV2">  
13:      <vm:inbound-endpoint exchange-pattern="one-way" path="unprocessedOriginalMessageStoreV2" doc:name="VM">  
14:      </vm:inbound-endpoint>  
15:        <object-to-string-transformer />  
16:      <expression-transformer doc:name="Expression">  
17:        <return-argument evaluator="header" expression="session:srcPayload"/>  
18:      </expression-transformer>  
19:      <jms:outbound-endpoint queue="esb.unprocessed.original.message" connector-ref="jmsPalEsbXAConnector" doc:name="JMS">  
20:          <jms:transaction action="ALWAYS_BEGIN"/>  
21:      </jms:outbound-endpoint>  
22:    </flow>  
23:    


Next, write another flow that reads from the message queue and processes the message. Note the transactions, the exception handling strategy and the rollback of the message.

1:    <flow name="processMyRequests" doc:name="processMyRequests">  
2:      <jms:inbound-endpoint queue="esb.unprocessed.original.message" connector-ref="jmsPalEsbXAConnector" doc:name=" processor">  
3:        <xa-transaction action="ALWAYS_BEGIN" timeout="60000"/>  
4:      </jms:inbound-endpoint>  
5:      <vm:outbound-endpoint exchange-pattern="one-way" path="jmsTask" responseTimeout="20000" mimeType="text/plain" doc:name="Dispatch to processing">  
6:        <xa-transaction action="ALWAYS_JOIN" timeout="60000"/>  
7:      </vm:outbound-endpoint>  
8:    </flow>  
9:      
10:    <flow name="processMyRequestJMS" processingStrategy="synchronous">  
11:      <vm:inbound-endpoint exchange-pattern="one-way" path="jmsTask" doc:name="VM">  
12:        <xa-transaction action="BEGIN_OR_JOIN" timeout="60000"/>  
13:      </vm:inbound-endpoint>  
14:      <set-session-variable value="#[payload:]" variableName="srcPayload" doc:name="Session Variable" />  
15:      <cxf:jaxws-service serviceClass="com.mycompany.operations.v1.IMemberManager" doc:name="SOAP Service"/>  
16:      <set-session-variable value="N" variableName="skipRetry" doc:name="Variable" />  
17:      <processor-chain doc:name="Processor Chain">  
18:        <message-properties-transformer scope="invocation" doc:name="Message Properties">  
19:          <add-message-property key="MessageHeader" value="#[groovy:payload.find{it instanceof javax.xml.ws.Holder}.value]"/>  
20:          <add-message-property key="MyRequest" value="#[groovy:payload.find{it instanceof com.mycompany.member.MyRequest}]"/>  
21:          <delete-message-property key=".*"/>  
22:        </message-properties-transformer>  
23:        <set-session-variable variableName="skipRetry" value="N" doc:name="Variable"/>  
24:        <choice doc:name="Choice">  
25:          <when expression="#[groovy:validationService.isValidValue(payload[1].data.value)]">  
26:            <processor-chain>  
27:              <custom-transformer class="com.mycompany.transformer.MyRequestTransformer" doc:name="Java"/>  
28:              <choice doc:name="Choice">  
29:                <when expression="groovy:message.getInvocationProperty('requestValid') == 'Y'">  
30:                  <processor-chain>  
31:  <!-- LEARN: Call the client webservice -->  
32:                    <flow-ref name="webServiceClientV2" doc:name="Invoke WebService"/>  
33:  <!-- LEARN: This is the custom transformer that reads the response on successful call of client and sets the skip property -->  
34:  <!-- LEARN: 'skip' is set to 'D' (for Done) if the call was successful; -->  
35:  <!-- LEARN: 'skip' is set to 'Y' if the call was successful but the response indicated the exception was non-recoverable (so no retries, but write the original msg to the JMS queue); -->  
36:  <!-- LEARN: 'skip' is set to 'N' if the call was successful but the response indicated the exception was recoverable (so retry); -->  
37:                    <custom-transformer class="com.mycompany.transformer.MyResponseTransformer" doc:name="Java"/>  
38:                                      <choice>  
39:                                          <when evaluator="groovy" expression="message.getInvocationProperty('skip') != 'D'">  
40:  <!-- LEARN: When 'skip' is NOT 'D' then set the value of skip to a session or flow variable and throw a dummy exception-->   
41:                                              <set-session-variable value="#[groovy:message.getInvocationProperty('skip')]" variableName="skipRetry" doc:name="Variable" />  
42:                                              <scripting:transformer doc:name="Create Dummy Exception">  
43:                                            <scripting:script engine="groovy">  
44:                                              <scripting:text><![CDATA[throw new java.lang.Exception('IGNORE THIS EXCEPTION');]]></scripting:text>  
45:                                            </scripting:script>  
46:                                          </scripting:transformer>  
47:                            </when>  
48:                                      </choice>  
49:                  </processor-chain>  
50:                </when>  
51:                <otherwise>  
52:                  <processor-chain>  
53:                    <flow-ref name="errorHandlerBadInput" doc:name="errorHandlerBadInput"/>  
54:                  </processor-chain>  
55:                </otherwise>  
56:              </choice>  
57:            </processor-chain>  
58:          </when>  
59:          <otherwise>  
60:            <processor-chain>  
61:              <flow-ref name="defaultErrorHandlerV2" doc:name="defaultErrorHandlerV2"/>  
62:            </processor-chain>  
63:          </otherwise>  
64:        </choice>  
65:      </processor-chain>  
66:      <choice-exception-strategy doc:name="Choice Exception Strategy">  
67:  <!-- LEARN: When dummy exception thrown, check the flow or session variable 'skipRetry' ; If do not skip, then rollback JMS transaction to re queue the JMS message -->  
68:         <rollback-exception-strategy when="#[groovy:message.getProperty('skipRetry', org.mule.api.transport.PropertyScope.SESSION) == 'N' || exception.causedBy(java.net.UnknownHostException) || exception.causedBy(java.net.ConnectException)]" maxRedeliveryAttempts="2">  
69:            <on-redelivery-attempts-exceeded>  
70:              <logger message="STRATEGY ROLLBACK WITH LIMITED RETRIES #[payload:]" level="INFO" doc:name="Logger"/>  
71:              <flow-ref name="jmsAuditDirectWriteV2" doc:name="Log Unprocessed Message to Original Queue"/>  
72:            </on-redelivery-attempts-exceeded>  
73:        </rollback-exception-strategy>  
74:    
75:  <!-- LEARN: When dummy exception thrown, check the flow or session variable 'skipRetry' ; If do not skip, then place the original message back on main processing queue for reprocessing -->  
76:  <!-- LEARN: If you uncomment out this forever strategy AND comment out the above ROLLBACK strategy, you get retry forever -->  
77:  <!--      <catch-exception-strategy when="#[groovy:message.getProperty('skipRetry', org.mule.api.transport.PropertyScope.SESSION) == 'N' || exception.causedBy(java.net.UnknownHostException) || exception.causedBy(java.net.ConnectException)]">  
78:          <logger message="STRATEGY RETRY FOREVER #[payload:]" level="INFO" doc:name="Logger"/>  
79:              <flow-ref name="unprocessedOriginalMessageStoreV2" doc:name="Log Unprocessed Message to Original Queue"/>  
80:        </catch-exception-strategy>  
81:  -->  
82:        <catch-exception-strategy when="#[groovy:message.getProperty('skipRetry', org.mule.api.transport.PropertyScope.SESSION) == 'Y']" doc:name="Write to Error Queue">  
83:          <logger message="STRATEGY WRITE TO ERROR QUEUE; SKIP RETRY; #[payload:]" level="INFO" doc:name="Logger"/>  
84:          <flow-ref name="jmsAuditDirectWriteV2" doc:name="Log Unprocessed Message to Error Queue"/>  
85:        </catch-exception-strategy>  
86:        <catch-exception-strategy doc:name="Write to Error Queue">  
87:          <logger message="STRATEGY DEFAULT WRITE TO ERROR QUEUE; #[payload:]" level="INFO" doc:name="Logger"/>  
88:          <flow-ref name="jmsAuditDirectWriteV2" doc:name="Log Unprocessed Message to Error Queue"/>  
89:        </catch-exception-strategy>  
90:      </choice-exception-strategy>  
91:    </flow>  
92:    <sub-flow name="webServiceClientV2" doc:name="webServiceClientV2">  
93:      <logger message="client 0 : #[payload:]" level="ERROR" doc:name="Log Error"/>  
94:      <cxf:jaxws-client operation="MyRequest" serviceClass="com.mycompany.operations.IMemberManager" doc:name="Webservice Client">  
95:        <cxf:outInterceptors/>  
96:      </cxf:jaxws-client>  
97:      <http:outbound-endpoint exchange-pattern="request-response" address="http://..." doc:name="request"/>  
98:    </sub-flow>  
99:    <sub-flow name="jmsAuditDirectWriteV2" doc:name="jmsAuditDirectWriteV2">  
100:      <expression-transformer doc:name="Expression">  
101:        <return-argument evaluator="header" expression="session:srcPayload"/>  
102:      </expression-transformer>  
103:      <jms:outbound-endpoint queue="esb.unprocessed.request.err" connector-ref="jmsPalEsbXAConnector" doc:name="JMS"/>  
104:    </sub-flow>  
105:    <flow name="unprocessedOriginalMessageStoreV2" doc:name="unprocessedOriginalMessageStoreV2">  
106:      <expression-transformer doc:name="Expression">  
107:        <return-argument evaluator="header" expression="session:srcPayload"/>  
108:      </expression-transformer>  
109:        <object-to-string-transformer />  
110:      <logger message="JMS message is #[payload:]" level="INFO" doc:name="Logger"/>  
111:      <jms:outbound-endpoint queue="esb.unprocessed.original.message" connector-ref="jmsPalEsbXAConnector" doc:name="JMS">  
112:      </jms:outbound-endpoint>  
113:    </flow>  
114:  </mule>  
115:    


The Custom Response Transformer

Within com.mycompany.transformer.MyResponseTransformer, this happens:
// jsr is the unmarshalled response returned from the web service I called.
// It carries a status code. A non-zero value is a failure in processing.
// The resendFlag indicates whether it was a recoverable exception (resend=true)
// or a non-recoverable exception (resend=false)

 
1:  returnValue.setMessage(jsr.getMessage());  
2:  returnValue.setStatus(jsr.getStatus());  
3:  if (jsr.getStatus().codeValue() == 0) {  
4:      logger.info("Client SUCCESSFULLY PROCESSED with status code + message : " + returnValue.getStatus() + ";;;;" +returnValue.getMessage());  
5:      message.setInvocationProperty("skip", "D");  
6:  } else {  
7:    if (!jsr.isResend()) {  
8:          logger.info("Client FAILED PROCESSING with status code + message : " + returnValue.getStatus() + ";;;;" +returnValue.getMessage());  
9:          message.setInvocationProperty("skip", "Y"); //do not resend; store;  
10:  } else {  
11:          logger.info("Client requested a RESEND of the request with status code + message : " + returnValue.getStatus() + ";;;;" +returnValue.getMessage());  
12:          message.setInvocationProperty("skip", "N");  
13:          }  
14:      }  
15:  }   




Mule 3.3.1 Using Until-Successful To Skip Retries

Introduction

Mule 3.x brings with it a feature called Until-Successful that loops through some flow tasks, letting you specify the number of retries, the time between retries and an action to be performed when retries are exhausted. It also allows you to specify a failure condition in addition to the default failure conditions (network issues, etc.).

However, it is pretty rigid in that the configuration does not let you break out of the retries on demand. In other words, the code inside the Until-Successful flow may have succeeded, BUT based on a response code (from your web service call) or some other condition, you need to stop retrying and go straight to the failure action, or simulate the failure condition.

Your requirement is this: you need to call a web service from your Mule flow.

Case 1: If the service is unavailable (timeout, 500, etc.), retry a specified number of times, and if all retries fail, write the original payload to a JMS message queue.

Case 2: If the service is available and the call was successful BUT the web service response indicates that processing failed and the operation could be tried again (a recoverable exception on the web service end), then retry the call.

Case 3: If the service is available and the call was successful BUT the web service response indicates that processing failed (a non-recoverable exception on the web service end, meaning that no matter how many times you retry, processing will fail), then skip out of the retries but still write the original payload to a JMS message queue.

The Most Obvious Solution (does not work)

If you dig into the Mule code, you will see that the way Mule decides how many times to retry is by comparing the current retry number against the maxRetries configuration value. If the current retry number exceeds maxRetries, the assumption is that Mule has tried to call the client web service that many times and failed, so it goes ahead and executes the failure flow.

Also, if you print out the Mule message, you will see that the current retry number lives in an invocation-scoped variable called process.attempt.count. You would be forgiven for thinking that, in Case 3 above, you could update this to a value higher than maxRetries, and the Until-Successful code would see that, exit the loop and execute the failure condition. But alas, no! Before entering the loop, Mule stores the original message in an in-memory store and re-reads it for every attempt. Thus, any invocation property you modify within the Until-Successful code will not be available, because the failed message is discarded and the original message is read from memory for each retry.

The Not So Obvious Solution (and it works!!):

As you can see in the configuration below, the received message is first stored in a session variable for later use. Follow the <!-- LEARN: ... --> comments for notes on what's happening in the flow.
The key takeaway is that you push all processing into a sub-flow called from the Until-Successful loop. Within it, you handle your recoverable and non-recoverable exception conditions.
If you wish to simulate failure in any case, you can throw an exception using a Groovy script transformer, which tells the Until-Successful in the main flow that processing failed and the message has to be retried.
In the example below, in the case of a non-recoverable exception, the JMS message is written and a success is simulated in the sub-flow, so that Until-Successful exits the loop assuming successful execution. For a recoverable exception, a failure is indicated by setting a message invocation property that is part of the failure expression check.

1:  <vm:endpoint exchange-pattern="one-way" path="unprocessedMessageStore" name="unprocessedMessageStore" doc:name="VM"/>  
2:  <flow name="myService" doc:name="myService">  
3:       <http:inbound-endpoint exchange-pattern="one-way" host="localhost" port="63012" path="my/service/path/v1/operation" doc:name="myService"/>  
4:       <object-to-string-transformer doc:name="Object to String"/>  
5:       <message-properties-transformer doc:name="Message Properties">  
6:              <add-message-property key="srcPayload" value="#[payload:]"/>  
7:       </message-properties-transformer>  
8:       <set-session-variable value="#[payload:]" variableName="srcPayload" doc:name="Session Variable" />  
9:  <!-- LEARN: This is a Web Service hosted from within Mule which receives the request from a client -->  
10:       <cxf:jaxws-service serviceClass="com.mycomp.member.IMemberManager" doc:name="SOAP Service"/>  
11:       <processor ref="mdcLogging"></processor>  
12:           <processor-chain doc:name="Processor Chain">  
13:              <message-properties-transformer scope="invocation" doc:name="Message Properties">  
14:              <add-message-property key="MessageHeader" value="#[groovy:payload.find{it instanceof javax.xml.ws.Holder}.value]"/>  
15:                  <add-message-property key="MyRequest" value="#[groovy:payload.find{it instanceof com.mycompany.MyRequest}]"/>  
16:                  <delete-message-property key=".*"/>  
17:              </message-properties-transformer>  
18:              <choice doc:name="Choice">  
19:                  <when expression="#[groovy:validationService.isValidValue(payload[1].data.value)]">  
20:                  <processor-chain>  
21:  <!-- LEARN: This is a custom request transformer -->  
22:            <custom-transformer class="com.mycompany.transformer.MyRequestTransformer" doc:name="Java"/>  
23:                      <choice>  
24:  <!-- LEARN: This verifies that the request is valid -->  
25:              <when evaluator="groovy" expression="message.getInvocationProperty('requestValid') == 'Y'">  
26:  <!-- LEARN: This is the until successful with a failure expression that checks that the invocation property indicates processing failure -->  
27:                <until-successful objectStore-ref="objectStore" maxRetries="5"   
28:                              secondsBetweenRetries="10" failureExpression="groovy:message.getInvocationProperty('callAction') == '500'"  
29:                              deadLetterQueue-ref="unprocessedMessageStore" doc:name="Until Successful">  
30:  <!-- LEARN: The key is to move all processing to a sub-flow where you can catch recoverable and unrecoverable exceptions. -->  
31:                  <processor-chain>  
32:                                      <flow-ref name="mySubFlow"/>  
33:                                  </processor-chain>  
34:                              </until-successful>  
35:                          </when>  
36:                          <otherwise>  
37:                              <flow-ref name="errorHandlerBadInput" doc:name="errorHandlerBadInput"/>  
38:                          </otherwise>  
39:                      </choice>  
40:                  </processor-chain>  
41:                  </when>  
42:              </choice>  
43:          </processor-chain>  
44:      <catch-exception-strategy doc:name="Catch Exception Strategy">  
45:          <flow-ref name="jmsAuditDirectWrite" doc:name="Log Unprocessed Message to Queue"/>  
46:    </catch-exception-strategy>  
47:  </flow>  
48:    
49:    <sub-flow name="myWebServiceClient" doc:name="myWebServiceClient">  
50:          <cxf:jaxws-client operation="MyRequest" serviceClass="com.my.client.v1.IMemberManager" doc:name="Webservice Client">  
51:              <cxf:outInterceptors/>  
52:          </cxf:jaxws-client>  
53:          <http:outbound-endpoint exchange-pattern="request-response" address="http://..." doc:name="myClientRequest"/>  
54:      </sub-flow>  
55:      <sub-flow name="jmsAuditDirectWrite" doc:name="jmsAuditDirectWrite">  
56:      <expression-transformer>  
57:      <return-argument expression="session:srcPayload" evaluator="header" />  
58:      </expression-transformer>  
59:      <jms:outbound-endpoint queue="esb.unprocessed.request.err" connector-ref="jmsEsbXAConnector" doc:name="JMS"/>  
60:      </sub-flow>  
61:    
62:      <sub-flow name="mySubFlow">  
63:          <processor-chain>  
64:  <!-- LEARN: Call the client web service. -->  
65:              <flow-ref name="myWebServiceClient" doc:name="Invoke WebService"/>  
66:  <!-- LEARN: This is the custom transformer of the response (see the code snippet below) that brings back a status code, which acts like a non-recoverable or recoverable exception as the case may be. -->  
67:         <custom-transformer class="com.mycompany.transformer.MyResponseTransformer" doc:name="Java"/>  
68:         <choice>  
69:  <!-- LEARN: This is the condition that says this is a non-recoverable exception and hence, write to a JMS queue and skip the loop. -->  
70:           <when evaluator="groovy" expression="message.getInvocationProperty('skip') == 'Y'">  
71:                      <flow-ref name="jmsAuditDirectWrite" doc:name="Log Unprocessed Message to Queue"/>  
72:                      <message-properties-transformer scope="invocation" doc:name="Message Properties">  
73:  <!-- LEARN: This is the property that fails the failure expression on the Until-Successful loop and hence until successful loop processing stops -->  
74:                          <add-message-property key="callAction" value="202Y"/>  
75:                      </message-properties-transformer>  
76:                  </when>  
77:                  <when evaluator="groovy" expression="message.getInvocationProperty('skip') == '202Y'">  
78:  <!-- LEARN: This is the condition that says this message was processed successfully and hence skip the loop. -->  
79:                      <message-properties-transformer scope="invocation" doc:name="Message Properties">  
80:  <!-- LEARN: This is the property that fails the failure expression on the Until-Successful loop and hence until successful loop processing stops -->  
81:              <add-message-property key="callAction" value="202Y"/>  
82:                      </message-properties-transformer>  
83:                  </when>  
84:              </choice>  
85:          </processor-chain>  
86:      </sub-flow>  
87:    


The Custom Response Transformer

Within com.mycompany.transformer.MyResponseTransformer, this happens:
// jsr is the unmarshalled response returned from the web service I called.
// it carries a status code. A non-zero value is a failure in processing.
// the resendFlag indicates whether it was a recoverable exception (resend=true)
// or a non-recoverable exception (resend=false)

returnValue.setMessage(jsr.getMessage());
returnValue.setStatus(jsr.getStatus());
if (jsr.getStatus().codeValue() == 0) {
    message.setInvocationProperty("callAction", "202");
    logger.info("Client SUCCESSFULLY PROCESSED with status code + message : " + returnValue.getStatus() + ";;;;" + returnValue.getMessage());
    message.setInvocationProperty("skip", "202Y");
} else {
    if (!jsr.isResend()) {
        message.setInvocationProperty("callAction", "500");
        logger.info("Client FAILED PROCESSING with status code + message : " + returnValue.getStatus() + ";;;;" + returnValue.getMessage());
        message.setInvocationProperty("skip", "Y"); // do not resend; store
    } else {
        message.setInvocationProperty("callAction", "500");
        logger.info("Client requested a RESEND of the request with status code + message : " + returnValue.getStatus() + ";;;;" + returnValue.getMessage());
        message.setInvocationProperty("skip", "N");
    }
}
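For context, here is a minimal sketch of how this logic might sit inside a Mule 3 custom transformer. The class name, the flag values and the setInvocationProperty calls come from the snippet above; the response and result types (MyServiceResponse, MyTransformerResult) and the payload handling are placeholders of mine, so treat this as an illustration rather than the actual implementation.

package com.mycompany.transformer;

import org.apache.log4j.Logger;
import org.mule.api.MuleMessage;
import org.mule.api.transformer.TransformerException;
import org.mule.transformer.AbstractMessageTransformer;

public class MyResponseTransformer extends AbstractMessageTransformer {

    private static final Logger logger = Logger.getLogger(MyResponseTransformer.class);

    @Override
    public Object transformMessage(MuleMessage message, String outputEncoding) throws TransformerException {
        // Placeholder types: the real response/result classes are not shown in this post.
        MyServiceResponse jsr = (MyServiceResponse) message.getPayload();
        MyTransformerResult returnValue = new MyTransformerResult();

        returnValue.setMessage(jsr.getMessage());
        returnValue.setStatus(jsr.getStatus());

        if (jsr.getStatus().codeValue() == 0) {
            // Success: mark the message so the until-successful loop stops retrying.
            message.setInvocationProperty("callAction", "202");
            message.setInvocationProperty("skip", "202Y");
        } else if (!jsr.isResend()) {
            // Non-recoverable failure: flag the message for the error queue; do not resend.
            message.setInvocationProperty("callAction", "500");
            message.setInvocationProperty("skip", "Y");
        } else {
            // Recoverable failure: leave the flag so the until-successful loop retries the call.
            message.setInvocationProperty("callAction", "500");
            message.setInvocationProperty("skip", "N");
        }
        return returnValue;
    }
}

The object returned from transformMessage becomes the new payload, while the invocation properties drive the choice router and the until-successful failure expression shown in the flow above.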

Tuesday, March 6, 2012

Logfile Aggregation and Analytics

One of the more important concerns of any organization that runs any kind of a public website is to be able to troubleshoot the applications and websites they operate. A typical web application runs in the popular architecture of one or more web servers, a load balancer, a cluster of application servers and a database layer. Additionally, there may be other enterprise-level elements like a message broker, a web services layer, an ESB etc.
A common feature of this architecture is that most of the components produce operational log files. Some are out-of-the-box logs, while others are configured and written by each application. Another common feature is the operations group/production support teams striving to support these applications by parsing these log files to troubleshoot issues reported by end users, monitoring apps and internal test teams.
A typical strategy (which really is a lack of strategy) is for a person or a team to download log files from different locations into a central location (or perhaps the logs are already on a specific log file server) and running grep/find/vi commands to search for incidents based on a given timestamp and/or an error message expected to appear in the log file. Many of you reading this will probably be empathizing with this (because you have done the same in the past) while simultaneously shaking your heads pitying those poor souls (because you feel their pain). What then follows as the next logical step is for someone in your team to finally take the initiative to write a utility to search through files and display some sort of a report on a web page with some limited search capabilities. At one point, you will get tired of maintaining this search app and start looking for alternatives in the world at-large. A client of ours was at this point when they asked us to come in and assist in defining their log management strategy. The following blog is a high-level review of three options and a more detailed review of one of those options.
We considered three options – not out of any particular bias against the many others out there; it just so happened that we had references for the ones we chose. We will first present two commercial solutions that operate on different principles and, being commercial, have all the relevant documentation and instructions on their websites.
  • Splunk
  • Loggly
  • Logstash + Graylog2

Splunk

Splunk (www.splunk.com) is a commercial product available for free to download and install. As of this writing, Splunk is free up to 500MB indexed per day; visit their website for the latest pricing and licensing details. Splunk has evolved over the years and now offers a rich feature set with an RIA interface for managing your Splunk instance as well as performing analytics and queries. The UI is impressive, letting you administer the instance on the one hand and manage and run queries on your indexed data on the other. Splunk stores incoming data as compressed files. This data is then indexed and stored alongside the raw compressed data for fast and easy querying. Splunk ages your data in 4 levels – hot (current), warm (recent), cold (older than recent) and frozen (typically very old and a candidate to be removed from the index). You can also customize maximum storage sizes and aging schedules, as well as choose to move frozen data to backup rather than delete it, as Splunk would do by default.
Being a commercial product, you can expect support and an expert knowledge base behind the product offerings. There are also out-of-the-box recipes to quickly set up and manage your instance. Splunk can run in a clustered environment for high availability and performance. Additionally, there is a concept of Splunk Apps targeted at common products that are candidates for monitoring – Apache web servers, F5 devices, etc. The query language is custom but reasonably intuitive, and familiar if you have used an indexing engine before.
In my test, I saw the following storage compression ratios:
  • 14 MB - compressed to 1MB raw and 5.7 MB indexed
  • 55 MB - compressed to 5.7 MB raw and 15 MB indexed
Splunk offers a range of products in addition to Log Management – Network Management, Security and Compliance, IT Operations etc.

Loggly

Loggly (www.loggly.com) is another commercial product, offering a cloud-based solution for log management. There is no software to install and manage since it is all hosted by Loggly; you configure your applications/infrastructure to forward logs to Loggly’s servers using one of their prescribed methods – TCP, HTTP, file upload, etc. At the time of this writing, Loggly is free up to 200MB of indexed data per day with a 7-day retention policy. Retention matters here because you upload all data to Loggly and that becomes the basis for your queries and searches. Loggly offers a maximum retention period of 90 days, after which you can use Loggly's archive feature to store logs in your own S3 bucket. Loggly offers secure logging via TLS/HTTPS for all your data uploads. A rich web interface allows you to manage/administer your account and run queries. The query language is custom but reasonably intuitive, and familiar if you have used an indexing engine before.
When you sign up, you set up a sub-domain with loggly.com that becomes your log management site. You can log in and administer “your site” by creating users, input methods, etc. Because there is no software to install, there is no out-of-the-box collection agent; you will need to proactively send data to Loggly using one of their prescribed input methods. Loggly also lets you embed a JavaScript snippet on your web application pages to send data to Loggly directly from the application layer.

Logstash + Graylog2

The third option is perhaps the most interesting yet. We look at two open-source/free applications; each is a complete solution in itself, but they end up being better when combined to take advantage of their particular strengths.
Logstash (www.logstash.net) and Graylog2 (www.graylog2.org) are two products that play well together. Logstash can be your “backend” to monitor and collect data from different sources and Graylog2 can be the presentation layer with a richer UI than what Logstash provides.
Logstash is easy to install and run. It works on a simple Input-Process-Output concept that is configured in a *.conf file. The input section specifies which files/sources to read. The process section – formally called “filter” – defines how the incoming data is massaged by applying regular expressions, rules, etc. Finally, the output section defines where the data should end up. By default, the current version of Logstash uses the ElasticSearch engine to index data. You can either run an embedded version of ElasticSearch or point the output to an external instance, which could be a robust clustered instance. One of the output options is to direct data to Graylog2 running as a separate process. You do this to take advantage of the richer UI features of Graylog2, which are clearly better than Logstash’s.
Logstash has a variety of ways to read data – TCP, files, syslogs, etc. If you choose to simply point Logstash at your log files to read and process, be mindful that this translates into a “tail -0” on each file, so data starts feeding into Logstash from the point in time at which your monitoring begins; prior data is not read. There are ways to get around this, but you have to do so manually. You can also run Logstash as a network of file readers (Logstash instances that only read and forward data) on different physical machines, all forwarding data to a single indexing Logstash instance that processes and indexes the data for searching, or forwards it on to Graylog2.
Graylog2 is another log management tool, and Logstash interfaces with it seamlessly. Logstash can output data as GELF (Graylog2 Extended Log Format) to be consumed by Graylog2. For those who wish to aggregate and search application logs written by Java apps using log4j, there is an easy way to write GELF data using the gelf4j jar. You would define a gelf4j appender (https://github.com/pstehlik/gelf4j) in your log4j properties as shown below and, voila, you have GELF data. GELF is a reader-friendly format that captures log data as JSON, making it wonderfully easy to index and search. Your Logstash forwarders will then be operating on GELF log data, and your process (filter) layer should account for that. If you choose to write only with traditional log4j appenders, you would instead use Logstash’s feature to convert to GELF before forwarding to Graylog2.

log4j.appender.GELF=com.pstehlik.groovy.gelf4j.appender.Gelf4JAppender
log4j.appender.GELF.graylogServerHost=my.gelf.host
log4j.appender.GELF.host=www13
log4j.appender.GELF.facility=local1
# use the GELF appender. set the level to INFO.
log4j.category.com.my.class.where.i.DoLogging=INFO, GELF
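
With the appender in place, application code keeps logging through log4j as usual; nothing GELF-specific shows up in the code. A minimal sketch is below; the class and method are made up for illustration, and the only thing that matters is that the logger name falls under the log4j.category entry configured above.

import org.apache.log4j.Logger;

public class DoLogging {

    // The logger name must fall under the category configured above
    // (com.my.class.where.i.DoLogging) for the GELF appender to receive these events.
    private static final Logger logger = Logger.getLogger("com.my.class.where.i.DoLogging");

    public void processOrder(String orderId) {
        logger.info("Processing order " + orderId);
        try {
            // ... business logic ...
        } catch (Exception e) {
            logger.error("Order processing failed for " + orderId, e);
        }
    }
}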

Putting it all together:
  • I chose to use a combination of Logstash and Graylog2. With this choice, you will also end up using MongoDB and ElasticSearch to support your log aggregation. I did not use a gelf4j appender.
  • I chose to monitor files in a given location – in this case, the directory where the log files were being written.
  • I chose to monitor logs from a Java App running on Tomcat v6 and Apache Web Server.
Install Mongo DB
Make sure the mongod server is running, then start the mongo shell and do the following:

mongo
use admin
db.addUser('admin', 'grayloguser-password')
db.auth('admin', 'grayloguser-password')
use graylog2
db.addUser('grayloguser', 'grayloguser-password')
db.auth('grayloguser', 'grayloguser-password')

Install ElasticSearch
Here, for my initial research, I had the option of using the embedded ElasticSearch that comes with the Logstash monolithic jar. I chose the embedded server with default settings, so I did not need a separate install; ElasticSearch is started automatically during Logstash startup on the default port. (If you later switch to an external instance, only the output section of the Logstash conf file needs to change; see the note after the conf file below.)
Install Graylog2
After I followed the instructions and copied the sample configuration file, I had to edit the “/etc/graylog2.conf” file. *Note: You may need sudo access.
I updated the MongoDB section with the Graylog2 database name, user id, password and host. I set the host to 127.0.0.1 instead of localhost on my MacBook Pro. These MongoDB settings should match the parameters you used when you configured MongoDB in the “Install Mongo DB” step above. Also, once you install the web interface, you will need to update the *.yml files in its “config” directory (specifically mongoid.yml for the MongoDB properties).
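For reference, the MongoDB portion of my graylog2.conf ended up looking roughly like the snippet below. The property names are the ones used by the 0.9.x-era graylog2-server and may differ in your version, and the values are placeholders for the credentials you created earlier.

# MongoDB settings in /etc/graylog2.conf (values are placeholders)
mongodb_useauth = true
mongodb_user = grayloguser
mongodb_password = grayloguser-password
mongodb_host = 127.0.0.1
mongodb_port = 27017
mongodb_database = graylog2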
Download Logstash Monolithic Jar
Since I know the input will be files, I need a way to read that input and transform it a bit so it is genuinely useful for search and display. Enter Logstash. I downloaded the monolithic jar file from www.logstash.net. (Note: While there, do make some time to watch the Logstash presentation by the author of Logstash.) Once the jar was downloaded, I created a mylogstash.conf file in a text editor. You can read more about this on the Logstash website. My conf file looks like this:

input {
  file {
    type => "log4j-ml-55"
    path => "/workarea/app40_CPU10_Logs/app40_CPU10_server8.log"
  }
}

filter {
  multiline {
    type => "log4j-ml-55"
    pattern => "^\[20"
    negate => true
    what => "previous"
  }
  mutate {
    type => "log4j-ml-55"
    replace => ["@source_host", "log4j-ml-55"]
  }
}

output {
  stdout {}
  elasticsearch { embedded => true }
  gelf {
    facility => "logstash-gelf"
    host => "127.0.0.1"
    sender => "app40_CPU10_server8"
  }
}

The filter is specific to the situation in my log file. Since I am collecting logs from a Java app, I need to make sure stack traces are treated as a single incident/entry; the pattern in the multiline filter tells Logstash that if a line does not start with the characters “[20”, it should be treated as part of the previous line and appended to it. The mutate filter is interesting. Because my log files come from a centralized logging server, the remote host would point to that server; what I really need as the source host is the original host that generated the file, and that is precisely what the mutate filter accomplishes.
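As mentioned in the ElasticSearch step, you can also point the output at an external ElasticSearch instance instead of the embedded one. In that case only the output section changes, roughly along these lines; the option names below are what the 1.1.x-era plugin used, so check them against your Logstash version:

output {
  elasticsearch {
    host => "es01.mycompany.com"   # placeholder hostname of the external ElasticSearch node
    cluster => "logstash-cluster"  # must match the cluster name configured in ElasticSearch
  }
}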
Start it all up
Make sure MongoDB is up. If using a separate ElasticSearch instance, start it up. If using the embedded ElasticSearch, start Logstash with the following command.
java -jar logstash-1.1.0beta8-monolithic.jar agent -f mylogstash.conf -- web --backend elasticsearch:///?local

Once Logstash is up and running, start up the Graylog2 server and then the Graylog2 web client, in that order. Visit www.graylog2.org for details on how to start the server and web client.
You are all set. As your application and web logs get written, the Logstash and Graylog2 pipeline will route them through to the Graylog2 web interface, where you can run searches and analytics.
By default, the Graylog2 web interface is available at http://127.0.0.1:3000.

Conclusion

Log monitoring is a critical tool in your troubleshooting arsenal. Depending on the information you capture in your log files, it can also be a strategic tool for your company to perform analytics and searches, which in turn enables you to be a more customer-focused organization. Because the quality of your analytics depends on the input data, it is a good idea to set up your monitoring using free/open-source software like Logstash+Graylog2 and then refine your application code to fine-tune your logging input. This is not a feature that is fully functional on day 1 of going live; you will have to spend time tuning your entire setup to arrive at optimal input data and output analytics.
Some things to consider:
  • Treat this as a project with a definite charter and expected outcomes
  • Involve business and IT ops to define the analytics output desired
  • Involve your development teams to ensure that the data captured in the log files will support the analytics requirements
  • Ensure buy-in from teams for continuous improvements
  • Prototype with free/open-source tools initially.
This will definitely include involving your business and IT stakeholders to define requirements upfront for analytics. Also, your development teams must dedicate some time to design and implement an effective logging technique to capture the required data in the required format. With a little effort upfront and commitment to continuous improvement, you will get to the Promised Land of log aggregation and analytics.