Slowly, slowly I am seeing some light at the end of the tunnel designing the FABRIQ. It’s a very challenging project and although I am having a lot of fun, it’s really much harder work than I initially thought.

Today I’d like to share some details with you on how I am employing the lightweight transaction coordinator “WorkSet” that Steve Swartz and I wrote during this year’s Scalable Applications tour inside the FABRIQ.

The obvious problem with one-way pipeline processing (and a problem with the composition of independent cross-cutting concerns in general) is that failure management is pretty difficult. Once one of the pipeline components fails, other components may already have done work that is no longer valid because processing failed further down the pipeline. The simplest example of that is, of course, logging. If you log a message as the first stage of a pipeline and a subsequent stage fails, do you want the log entry to remain where it is? The answer is: it depends. You might need to capture the message before it is processed by stages further down the pipeline, but you can only flag the entry as success or failure once processing is complete, or you may want to discard the log entry altogether on failure.

Before I go into details, I’ll clarify some of the terminology I am using:

·         A message handler is an object that typically implements a Process() method and a property Next pointing to the handler that immediately follows it in a chain of handlers.

·         A pipeline hosts a chain of message handlers and has a “head” and a “tail” message handler which link the pipeline with that chain of handlers. The pipeline is itself a message handler, so that pipelines can be nested inside pipelines. The FabriqPipeline is a concrete implementation of such a pipeline that has, amongst other things, support for the mechanism described here.

·         A message is an object representing a SOAP message and has a collection of headers, a body (as an XmlReader) and a transient collection of message properties that are only valid as long as the message is in memory.

·         A work set is a lightweight, in-memory 2PC transaction that provides only the “atomicity” and “consistency” properties out of the well-known “ACID” transaction property set. “Durability” is not a goal here and “isolation” is more or less guaranteed, because messages are not shared resources. If external resources are touched, isolation needs to be guaranteed by the enlisted workers. A worker is a lightweight resource manager that can enlist into a work set and provides Prepare/Commit/Abort entry points.
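
To make the last two terms a bit more concrete, here is a minimal sketch of how a worker contract and a plain in-memory coordinator could fit together. This is not the actual WorkSet code. The IWorker shape is inferred from the FabriqWorker class shown further down, the meaning of the vote parameter (the vote accumulated so far) is an assumption, and SketchWorkSet, Complete() and RecordingWorker are hypothetical names made up for this illustration.

```csharp
using System;
using System.Collections.Generic;

// Assumed shape of the worker contract, inferred from FabriqWorker.
public interface IWorker
{
    // 'vote' is assumed to carry the outcome voted so far; return false to veto.
    bool Prepare(bool vote);
    void Commit();
    void Abort();
}

// Hypothetical in-memory coordinator: one atomic outcome, no durability.
public class SketchWorkSet
{
    private readonly List<IWorker> workers = new List<IWorker>();

    public void Register(IWorker worker)
    {
        workers.Add(worker);
    }

    // Phase 1 collects the votes, phase 2 tells every worker the outcome.
    public bool Complete()
    {
        bool vote = true;
        foreach (IWorker worker in workers)
        {
            vote = worker.Prepare(vote) && vote;
        }
        foreach (IWorker worker in workers)
        {
            if (vote) worker.Commit(); else worker.Abort();
        }
        return vote;
    }
}

// Hypothetical worker that only records what it was asked to do.
public class RecordingWorker : IWorker
{
    private readonly bool canPrepare;
    public string Outcome = "none";

    public RecordingWorker(bool canPrepare)
    {
        this.canPrepare = canPrepare;
    }

    public bool Prepare(bool vote) { return vote && canPrepare; }
    public void Commit() { Outcome = "commit"; }
    public void Abort() { Outcome = "abort"; }
}
```

If every registered worker votes yes, all of them see Commit(); a single veto means all of them see Abort(). That is all the atomicity the work set is meant to provide.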

Whenever a new message arrives at a FabriqPipeline, a new work set is created that governs the fault management for processing the respective message. The work set is associated with the message by creating a “@WorkSet” property on the message that references the WorkSet object. The pipeline itself maintains no immediate reference to the work set – it is message-bound.

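public class FabriqWorker : IWorker
{
    // Per-message transaction state lives here, not on the shared handler.
    private Message msg;
    private FabriqMessageHandler handler;

    public FabriqWorker(Message msg, FabriqMessageHandler handler)
    {
        // "this." is required because the parameters shadow the fields.
        this.msg = msg;
        this.handler = handler;
    }

    // Each 2PC callback is delegated to the handler, together with the message.
    bool IWorker.Prepare(bool vote)
    {
        return handler.Prepare(vote, msg);
    }

    void IWorker.Abort()
    {
        handler.Abort(msg);
    }

    void IWorker.Commit()
    {
        handler.Commit(msg);
    }
}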

The FabriqPipeline does not enlist any workers into the work set directly. Instead, message handlers enlist their workers into the work set as the message flows through the pipeline. A “worker” is an implementation of the IWorker interface that can be enlisted into a work set as a participant. Because the pipeline instance and all message handler instances must be reusable and capable of processing several messages concurrently, the worker is not implemented on the handler itself. Instead, workers are implemented as a separate helper class (FabriqWorker). Instances of this worker class are enlisted into the message’s work set. The worker instance gets a reference to the message it deals with and to the handler that enlisted it into the work set; when the worker is called during the phases of the two-phase commit protocol, it calls the message handler’s implementation of Prepare/Abort/Commit.

This way, we can have one “all in one place” implementation of all message handling on the message handler while keeping the transaction-dependent state in a very lightweight object. The entire (likely complex) pipeline and its handlers can therefore be shared by many concurrent transactions, because no part of the pipeline depends on message or transaction state.

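public abstract class FabriqMessageHandler :
   IMessageHandler
{
   // Next handler in the chain; null at the end of the chain.
   IMessageHandler next = null;

   public FabriqMessageHandler()
   {
   }

   public virtual IMessageHandler Next
   {
      get { return next; }
      set { next = value; }
   }

   // Non-transactional entry point: run the optional preprocessing step,
   // then enlist a worker if the message carries a work set.
   bool IMessageHandler.Process(Message msg)
   {
      bool result = this.Preprocess(msg);
      WorkSet workSet = msg.Properties["@WorkSet"] as WorkSet;
      if ( workSet != null )
      {
          workSet.Register(new FabriqWorker( msg, this ) );
      }
      return result;
   }

   // Hands the message to the next handler; returns false if there is none.
   protected bool Forward( Message msg )
   {
      if ( next != null )
      {
         return next.Process( msg );
      }
      else
      {
         return false;
      }
   }

   // Optional hook that runs before enlistment; the default does nothing.
   public virtual bool Preprocess( Message msg )
   {
       return false;
   }

   // Phase 1: derived handlers do their (tentative) work here and vote.
   public abstract bool Prepare( bool vote, Message msg );

   // Phase 2: make the prepared work final. Default: nothing to do.
   public virtual void Commit( Message msg )
   {
   }

   // Phase 2: undo or discard the prepared work. Default: nothing to do.
   public virtual void Abort( Message msg )
   {
   }
}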

When a message flows into the pipeline, all a transactional message handler does when its Process() method is called is enlist its worker and return. A handler that is not transactional must never fail (such things exist); it can ignore the whole work set story and simply forward the message to the Next handler. So, in fact, a transactional message handler never forwards the message from the (non-transactional) Process() method.
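
Going back to the logging example from the beginning, a transactional log handler could hold its entry back until the outcome is known. The following is a sketch only: the Message stand-in, the LogHandler class and the “@PendingLog” property name are made up for this example, and a real handler would derive from FabriqMessageHandler instead of standing alone.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the FABRIQ message: just a body and the transient property bag.
public class Message
{
    public readonly Dictionary<string, object> Properties =
        new Dictionary<string, object>();
    public string Body = "";
}

// Hypothetical handler that mirrors the Prepare/Commit/Abort entry points.
public class LogHandler
{
    // Shared, external resource: it may only change on Commit().
    public readonly List<string> Log = new List<string>();

    // Tentative work: the entry is staged on the message, not on the handler,
    // so one handler instance can serve many concurrent messages.
    public bool Prepare(bool vote, Message msg)
    {
        if (!vote) return false;
        msg.Properties["@PendingLog"] = "received: " + msg.Body;
        return true;
    }

    // Final work: move the staged entry into the shared log.
    public void Commit(Message msg)
    {
        object entry;
        if (msg.Properties.TryGetValue("@PendingLog", out entry))
        {
            lock (Log) { Log.Add((string)entry); }
            msg.Properties.Remove("@PendingLog");
        }
    }

    // Nothing external was touched yet, so aborting only drops the staged entry.
    public void Abort(Message msg)
    {
        msg.Properties.Remove("@PendingLog");
    }
}
```

Whether a real log handler should discard the entry on abort or write it with a failure flag is exactly the “it depends” from above; the sketch picks the discard variant.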

One problem that the dependencies between message handlers create is that it may be impossible to forward a message to the next message handler in the chain before the message is processed; at least you can’t make a Prepare==true promise for the transaction outcome until you’ve done most of the work on the message and have verified that all resulting work will very likely succeed. Messages may even be transformed into new messages or split into multiple messages inside the pipeline, so that you can’t do anything meaningful until you are at least preparing.

The resulting contradiction is that a transaction participant cannot perform all work resulting from a message before it is asked to commit, but the message handlers that follow in the sequence may not have received the resulting message until then and may not even be enlisted into the transaction.

To resolve this problem, the FABRIQ pipeline’s transaction management is governed by some special transaction handling rules that are more liberal than those of traditional transaction coordinators.

·         During the first (prepare) phase of the 2-phase commit protocol, workers may still enlist into the transaction. This allows a message handler to forward messages to a not-yet-enlisted message handler during the prepare phase. When the currently preparing handler forwards one or more messages to a subsequent handler, the workers that handler enlists are appended to the list of workers in the work set and are asked to prepare their work once the current handler is done preparing. We call this “rolling enlistment” during prepare.

·         Inside the pipeline, messages are considered to be transient data. Therefore, they may be manipulated and passed on during the Prepare phase, independent of the overall transaction outcome. The tail of the transaction controller pipeline (which is the outermost pipeline object) always enlists a worker into the transaction that will only forward messages to outside parties on Commit() and therefore takes care of hiding the transaction work to guarantee isolation.

·         Changes to any resources external to the message (so, anything that is not contained in message properties or message headers) must be guarded by the transaction workers. This means that all usual rules about guarding intermediate transaction state and transaction resources apply: The ability to make changes must be verified by tentative actions during Prepare() and changes may only be finally performed in Commit(). In case the external resources do not permit tentative actions, the Abort() method must take the necessary steps to undo actions performed during Prepare().
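
The first of these rules, rolling enlistment, can be sketched as follows. Again, this is an illustration under assumptions and not the WorkSet implementation: RollingWorkSet, Complete() and Stage are hypothetical names, and the forwarding between handlers is reduced to one stage registering the next one while it prepares.

```csharp
using System;
using System.Collections.Generic;

// Assumed worker contract, as inferred from FabriqWorker.
public interface IWorker
{
    bool Prepare(bool vote);
    void Commit();
    void Abort();
}

// Hypothetical work set that tolerates enlistment during the prepare phase.
public class RollingWorkSet
{
    private readonly List<IWorker> workers = new List<IWorker>();

    public void Register(IWorker worker)
    {
        workers.Add(worker);
    }

    public bool Complete()
    {
        bool vote = true;
        // Index loop on purpose: Prepare() may call Register(), which appends
        // to the list. A foreach would throw on that modification.
        for (int i = 0; i < workers.Count; i++)
        {
            vote = workers[i].Prepare(vote) && vote;
        }
        foreach (IWorker worker in workers)
        {
            if (vote) worker.Commit(); else worker.Abort();
        }
        return vote;
    }
}

// Hypothetical stage: while preparing, it "forwards" to the next stage,
// which gets enlisted only at that point.
public class Stage : IWorker
{
    private readonly string name;
    private readonly RollingWorkSet workSet;
    private readonly Stage next;
    private readonly List<string> trace;

    public Stage(string name, RollingWorkSet workSet, Stage next, List<string> trace)
    {
        this.name = name;
        this.workSet = workSet;
        this.next = next;
        this.trace = trace;
    }

    public bool Prepare(bool vote)
    {
        trace.Add("prepare " + name);
        if (next != null) workSet.Register(next); // rolling enlistment
        return vote;
    }

    public void Commit() { trace.Add("commit " + name); }
    public void Abort() { trace.Add("abort " + name); }
}
```

Only the head stage is registered up front. The tail stage joins while the head is preparing, is prepared right after it, and takes part in the same commit or abort.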

Whenever new messages get created during processing, there are two options. The message properties (which hold the reference to the work set and, hence, to the current transaction) may be propagated into the newly created message, which causes the processing of that message to be enlisted in the transaction. Alternatively, the new message can get a new work set or none at all, so that its further processing is separate from the ongoing transaction. That’s what we do for failure messages.
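
A small sketch of the two options, assuming the property bag behaves like a dictionary. The Message stand-in and the MessageFactory helper with its two methods are hypothetical; only the “@WorkSet” property name comes from the code above.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the FABRIQ message: only the transient property bag.
public class Message
{
    public readonly Dictionary<string, object> Properties =
        new Dictionary<string, object>();
}

// Hypothetical helper for creating messages inside the pipeline.
public static class MessageFactory
{
    // The child stays inside the ongoing transaction: the property bag,
    // including the "@WorkSet" reference, is copied over.
    public static Message CreateEnlisted(Message parent)
    {
        Message child = new Message();
        foreach (KeyValuePair<string, object> property in parent.Properties)
        {
            child.Properties[property.Key] = property.Value;
        }
        return child;
    }

    // The child is detached: without "@WorkSet", handlers have nothing to
    // enlist into, so its processing is independent of the parent's outcome.
    public static Message CreateDetached(Message parent)
    {
        Message child = CreateEnlisted(parent);
        child.Properties.Remove("@WorkSet");
        return child;
    }
}
```

A failure message would be created the detached way, since it has to go out precisely when the original transaction aborts.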

During prepare, participants can log failure information to a message property called “@FaultInfo” that contains a collection of FaultInfo objects. If message processing fails, this information is logged and, if possible, relayed in a SOAP fault message to the message sender’s WS-Addressing wsa:FaultTo, wsa:ReplyTo or wsa:From destination (in that order of preference).
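
The order of preference for the fault destination boils down to a simple fallback chain. In this sketch, AddressingHeaders and FaultRouting are hypothetical types, and the addresses are plain strings rather than full endpoint references.

```csharp
using System;

// Hypothetical, flattened view of the WS-Addressing headers of a message.
public class AddressingHeaders
{
    public string FaultTo;
    public string ReplyTo;
    public string From;
}

public static class FaultRouting
{
    // Returns the first address that is present, in the order
    // FaultTo, ReplyTo, From. Returns null if the fault cannot be relayed.
    public static string SelectFaultDestination(AddressingHeaders headers)
    {
        if (!string.IsNullOrEmpty(headers.FaultTo)) return headers.FaultTo;
        if (!string.IsNullOrEmpty(headers.ReplyTo)) return headers.ReplyTo;
        if (!string.IsNullOrEmpty(headers.From)) return headers.From;
        return null;
    }
}
```

A null result corresponds to the “if possible” above: the failure is still logged, there is just nobody to send the fault to.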

For integration with “real” transactions, the entire work set may act as a DTC resource manager. If that’s so, the 2PC management is done by DTC and the work set acts as an aggregating proxy for the workers toward DTC. It collects its vote from its own enlistments and forwards the Commit/Abort to its enlistments.
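
The aggregation idea can be shown without touching any DTC API. In the sketch below the outer coordinator is simply whoever calls Prepare/Commit/Abort on the work set, and the work set looks like a single participant to it. AggregatingWorkSet and CountingWorker are hypothetical names, the IWorker shape is the one inferred from FabriqWorker, and the actual DTC enlistment plumbing is deliberately left out.

```csharp
using System;
using System.Collections.Generic;

// Assumed worker contract, as inferred from FabriqWorker.
public interface IWorker
{
    bool Prepare(bool vote);
    void Commit();
    void Abort();
}

// Hypothetical work set that is itself one participant of an outer coordinator.
public class AggregatingWorkSet : IWorker
{
    private readonly List<IWorker> enlistments = new List<IWorker>();

    public void Register(IWorker worker)
    {
        enlistments.Add(worker);
    }

    // The outer coordinator asks for one vote: the AND of all inner votes.
    public bool Prepare(bool vote)
    {
        for (int i = 0; i < enlistments.Count; i++)
        {
            vote = enlistments[i].Prepare(vote) && vote;
        }
        return vote;
    }

    // The outcome decided by the outer coordinator is passed on unchanged.
    public void Commit()
    {
        foreach (IWorker worker in enlistments) worker.Commit();
    }

    public void Abort()
    {
        foreach (IWorker worker in enlistments) worker.Abort();
    }
}

// Hypothetical worker that counts the calls it receives.
public class CountingWorker : IWorker
{
    private readonly bool canPrepare;
    public int Commits;
    public int Aborts;

    public CountingWorker(bool canPrepare)
    {
        this.canPrepare = canPrepare;
    }

    public bool Prepare(bool vote) { return vote && canPrepare; }
    public void Commit() { Commits++; }
    public void Abort() { Aborts++; }
}
```

The difference to the standalone case is only who makes the decision: the work set reports its aggregated vote and then waits for the outer coordinator to call Commit() or Abort().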