Framework designers usually make sure there is a way to decouple infrastructure code from business code. Usually this comes in the form of hooks that you can use to inject custom code. The benefit is twofold: you don’t duplicate code and you don’t break the Single Responsibility Principle. If you’re using ASP.NET Core/MVC/WebApi, then you have ActionFilters. NServiceBus has its own mechanisms that you can use. In this blog post we’ll discuss two ways of hooking into the message processing pipeline that are useful for managing units of work in NServiceBus: IManageUnitsOfWork and the NServiceBus pipeline. There are many use cases where these prove useful. Here are some examples:
- Manage the unit of work (e.g. the session in NHibernate or RavenDB)
- Logging & Monitoring
- Error Handling
- Authorization
You definitely don’t want to add these concerns to your handlers. Here are two approaches that can help you to keep the handlers clean of infrastructural concerns.
Note: I’ll be using NServiceBus version 5 for the code samples.
IManageUnitsOfWork
The first method is to implement the IManageUnitsOfWork interface. This interface was introduced in NServiceBus 3.0 and allows you to wrap all the handlers in a piece of code. As the name says, managing the unit of work for data stores that support it is a good use case for this interface. For example, you can use this to commit the NHibernate transaction or call SaveChanges on a RavenDB session. If you want to find out more about how to do this, Andreas Öhlund blogged about how to implement a RavenDB Unit of Work for NServiceBus.
The interface is quite simple, with just two methods, Begin and End:
    /// <summary>
    /// Interface used by NServiceBus to manage units of work as a part of the
    /// message processing pipeline.
    /// </summary>
    public interface IManageUnitsOfWork
    {
        /// <summary>
        /// Called before all message handlers and modules
        /// </summary>
        void Begin();

        /// <summary>
        /// Called after all message handlers and modules, if an error has occurred the exception will be passed
        /// </summary>
        void End(Exception ex = null);
    }
The Begin method is called before the first handler. The End method is called after the last handler, passing the exception if one was thrown while processing the message. Here is a basic example that just logs some information:
    public class LoggerUnitOfWork : IManageUnitsOfWork
    {
        private static readonly ILog logger = LogManager.GetLogger(typeof(LoggerUnitOfWork));

        public void Begin()
        {
            logger.Info("Started processing a message.");
        }

        public void End(Exception ex = null)
        {
            if (ex != null)
            {
                logger.Error("Finished processing with errors.", ex);
            }
            else
            {
                logger.Info("Successfully processed the message.");
            }
        }
    }
The only thing left to do is to register the implementation into the dependency injection container:
    configuration.RegisterComponents(c => c.ConfigureComponent<LoggerUnitOfWork>(DependencyLifecycle.InstancePerCall));
This is an easy way to run code before the first handler and after the last handler in the message processing pipeline. As a small note, although you can define multiple implementations of IManageUnitsOfWork, you cannot order them. If you need ordering, then you can opt for the second option, described in the next section.
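To make the commit-on-success use case mentioned earlier a bit more concrete, here is a minimal sketch of a transactional unit of work. It is not taken from NServiceBus or from any data store: ISessionLike, RecordingSession and TransactionalUnitOfWork are hypothetical names I made up for illustration, and IManageUnitsOfWork is redeclared locally (with the same shape as the interface above) only so that the sample compiles on its own. In a real endpoint you would implement the NServiceBus interface and wrap an actual NHibernate or RavenDB session.

```csharp
using System;
using System.Collections.Generic;

// Stand-in with the same shape as the NServiceBus interface shown above,
// redeclared here only so the sketch compiles without the NServiceBus package.
public interface IManageUnitsOfWork
{
    void Begin();
    void End(Exception ex = null);
}

// Hypothetical session abstraction (an assumption for this sketch); a real one
// would wrap an NHibernate or RavenDB session.
public interface ISessionLike : IDisposable
{
    void Commit();
    void Rollback();
}

// Fake session that only records which calls were made, for demonstration.
public class RecordingSession : ISessionLike
{
    public readonly List<string> Calls = new List<string>();

    public void Commit() { Calls.Add("Commit"); }
    public void Rollback() { Calls.Add("Rollback"); }
    public void Dispose() { Calls.Add("Dispose"); }
}

public class TransactionalUnitOfWork : IManageUnitsOfWork
{
    private readonly Func<ISessionLike> openSession;
    private ISessionLike session;

    public TransactionalUnitOfWork(Func<ISessionLike> openSession)
    {
        this.openSession = openSession;
    }

    // Runs before the first handler: open the session for this message.
    public void Begin()
    {
        session = openSession();
    }

    // Runs after the last handler: commit if no exception was passed,
    // roll back otherwise, and always dispose the session.
    public void End(Exception ex = null)
    {
        if (session == null)
        {
            return; // Begin never ran, or it failed before opening a session
        }

        try
        {
            if (ex == null)
            {
                session.Commit();
            }
            else
            {
                session.Rollback();
            }
        }
        finally
        {
            session.Dispose();
            session = null;
        }
    }
}
```

With a RecordingSession, calling Begin() followed by End() records Commit and then Dispose, while End(new Exception()) records Rollback and then Dispose. How the handlers get hold of the same session (typically through the container) is left out of this sketch.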
NServiceBus Processing Pipeline
NServiceBus 5.0 introduced a new feature that allows you to customize the processing pipeline. This is a better and more powerful abstraction that you can use to hook into the pipeline and add/replace processing steps. As the name says, this is an implementation of the pipeline pattern (which is similar to Chain of Responsibility). This means that you have behaviors wrapping other behaviors, with each behavior doing just one thing. There are two pipelines, one for incoming messages and one for outgoing messages. Several NServiceBus features are built using behaviors (e.g. DataBus and Second Level Retries). As you might guess, the main building block is the IBehavior interface:
    /// <summary>
    /// This is the base interface to implement to create a behavior that can be registered in a pipeline.
    /// </summary>
    /// <typeparam name="TContext">The context that this behavior should receive.</typeparam>
    public interface IBehavior<in TContext> where TContext : BehaviorContext
    {
        /// <summary>
        /// Called when the behavior is executed.
        /// </summary>
        /// <param name="context">The current context.</param>
        /// <param name="next">The next <see cref="T:NServiceBus.Pipeline.IBehavior`1"/> in the chain to execute.</param>
        void Invoke(TContext context, Action next);
    }
The Invoke method has two parameters:
- Context, used for passing data from parent behaviors to child behaviors and for registering dependencies in the IoC container. The context parameter is generic and must inherit from BehaviorContext. There are two subtypes that you can use, depending on the type of the pipeline: IncomingContext and OutgoingContext.
- Next, which is the next action in the pipeline.
Creating a behavior
So let’s go over a simple example of a logging behavior.
    public class LoggerBehavior : IBehavior<IncomingContext>
    {
        private static readonly ILog logger = LogManager.GetLogger(typeof(LoggerBehavior));

        public void Invoke(IncomingContext context, Action next)
        {
            try
            {
                logger.Info("Entered the behavior.");
                next();
                logger.Info("Exited the behavior.");
            }
            catch (Exception ex)
            {
                logger.Error("Finished processing with errors.", ex);
                // Rethrow so the message is still treated as failed; swallowing the
                // exception here would make it look successfully processed.
                throw;
            }
        }
    }
The behavior implements the IBehavior<IncomingContext> interface, which means this is a behavior for the incoming message pipeline. What’s important here is the call to next(), which triggers the next behavior in the pipeline. We can insert code before, after, or around it (which means you can use using statements). We also have the option of not calling next, which means we’ll stop the execution of the next behaviors in the pipeline.
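To show how behaviors wrap each other and what happens when one of them doesn't call next, here is a small self-contained sketch of the pipeline pattern. It does not use NServiceBus at all: ISimpleBehavior, TracingBehavior, ShortCircuitBehavior and MiniPipeline are hypothetical names made up for this example, and the "context" is just a list of strings used as a trace. The real pipeline is more elaborate, so treat this only as an illustration of the idea.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for IBehavior<TContext>; the context is a plain list
// that each behavior appends to, so we can see the execution order.
public interface ISimpleBehavior
{
    void Invoke(List<string> trace, Action next);
}

// Runs code before and after the rest of the chain.
public class TracingBehavior : ISimpleBehavior
{
    private readonly string name;

    public TracingBehavior(string name)
    {
        this.name = name;
    }

    public void Invoke(List<string> trace, Action next)
    {
        trace.Add(name + ": before");
        next();
        trace.Add(name + ": after");
    }
}

// Never calls next, so nothing after it in the chain gets to run.
public class ShortCircuitBehavior : ISimpleBehavior
{
    public void Invoke(List<string> trace, Action next)
    {
        trace.Add("ShortCircuit: skipping the rest");
    }
}

public static class MiniPipeline
{
    // Runs the behaviors in order and returns the recorded trace.
    public static List<string> Run(params ISimpleBehavior[] behaviors)
    {
        var trace = new List<string>();
        InvokeAt(0, behaviors, trace);
        return trace;
    }

    // Each behavior receives a "next" delegate that invokes the behavior after it.
    private static void InvokeAt(int index, ISimpleBehavior[] behaviors, List<string> trace)
    {
        if (index == behaviors.Length)
        {
            return; // end of the chain
        }

        behaviors[index].Invoke(trace, () => InvokeAt(index + 1, behaviors, trace));
    }
}
```

Running MiniPipeline.Run(new TracingBehavior("Outer"), new TracingBehavior("Inner")) yields "Outer: before", "Inner: before", "Inner: after", "Outer: after", which is the nesting you get from behaviors wrapping behaviors. Putting a ShortCircuitBehavior between the two yields "Outer: before", "ShortCircuit: skipping the rest", "Outer: after": the inner behavior never runs, but the outer one still gets to finish.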
Registering the behavior
The next step is to register the behavior in the pipeline. First, we need to create a step for the new behavior, so we can define its execution order in the pipeline:
    public class LoggerStepRegistration : RegisterStep
    {
        public LoggerStepRegistration()
            : base("LoggerStep", typeof(LoggerBehavior), "Logger Step describes how you can register a new step")
        {
            InsertBefore(WellKnownStep.LoadHandlers);
            // InsertBeforeIfExists(WellKnownStep.InvokeSaga);
            // InsertAfter(WellKnownStep.ExecuteUnitOfWork);
            // InsertAfterIfExists(WellKnownStep.InvokeSaga);
        }
    }
We call the base constructor of the RegisterStep class, passing in the step Id, the behavior type and a short description for the step. We also have the option of specifying the execution order for the step by inserting it before or after a well-known step. It’s good to know that not all steps are necessarily part of the pipeline. For example, if we don’t use Sagas, then the InvokeSaga step won’t exist. For these cases, you can use InsertAfterIfExists or InsertBeforeIfExists. Also, if you’d like your behavior to register a dependency in the IoC container, then you should insert your step before LoadHandlers, as in the example above. This ensures that the dependency gets injected into the handlers.
Now that you have defined the step, you need to register it in the pipeline:
    public class LoggerStepInitialization : INeedInitialization
    {
        public void Customize(BusConfiguration cfg)
        {
            cfg.Pipeline.Register<LoggerStepRegistration>();
        }
    }
And that’s it! Your new behavior will run in the incoming message execution pipeline, before the LoadHandlers step.
Conclusion
As you can see, creating and registering a new behavior is more involved and requires a bit more code than implementing IManageUnitsOfWork. But this is a small price to pay if you require more flexibility. Although both options have their use cases, anything you can do with IManageUnitsOfWork can also be done with a custom behavior.
If you’d like to find out more about these topics, you can check out the official documentation or David Boike’s great book Learning NServiceBus.