Recently, I stumbled upon an interesting problem related to NServiceBus Saga concurrency and coarse-grained locks when using NHibernate Persistence.
With NServiceBus, concurrent access to an existing saga instance works out of the box. This is because it relies on the underlying database system. With NHibernate, it relies on optimistic concurrency, by adding a Version column to your Saga Data. But there is a catch: NHibernate doesn’t support Coarse-Grained Locks. Let’s first explore this general problem and then we’ll get back to the NServiceBus Saga.
Coarse-Grained Locks
With a coarse-grained lock you basically lock a group of entities together. If you’re familiar with DDD, then you know that an aggregate should be a consistency boundary. When I save an aggregate, I want to save the entire aggregate, to ensure it’s consistent. Why would you need this? Here’s an example:
Let’s take the already familiar example of an Order with OrderLines, Order being the aggregate root. We’re using optimistic concurrency control for each of our aggregate parts. In our domain there is an invariant that dictates that the total number of items in an order can’t be more than 10. Two commands that would increase the quantity could be processed in parallel and break the invariant. This is because in each transaction, the invariant would hold and each transaction would commit, since they update different lines. Now you’re in an inconsistent state.
If we could lock the entire aggregate (order and order lines), then the second transaction would fail and rollback, maintaining a consistent state. One way to achieve this when using optimistic concurrency is to update the root’s version whenever an aggregate part is updated.
Use Listeners to lock the root
There are some posts, stackoverflow questions and forum threads on how to update the parent’s version when a child is modified. Even though most of them are pretty old, there is no definitive answer. Ayende Rahien proposes probably the most well known solution to this problem. He discusses it in a couple of blog posts. The first touches on how you could lock the aggregate root:
session.Lock(aggregateRoot, LockMode.Force);
This basically forces a version increment for versioned entities. The questions is: how do you know when to call this? One option could be to have a coding guideline to increment the version every time you load an aggregate for updating. The problem is that you can get some nasty bugs if you forget to call this. The second post tackles this problem. The proposed solution uses NHibernate event listners and on every aggregate part update, finds the aggregate root and forces a version increment.
The blog post also notes the constraints of this approach. Two of them are inherent to DDD: the relation between aggregates and contained entities must be explicit and you access an aggregate part only by traversing from the root.
The third however is more restrictive in my view. It dictates that you need to have a direct reference from the aggregate part to the aggregate root. Since the aggregate root needs to know its parts, this basically means that we need to have bidirectional associations across the entire aggregate for this to work. I prefer to use a single traversal direction if possible. Even more, a direct reference means you need to add a reference to the aggregate root, even if it’s not the direct parent of the entity:
public interface IAggregateRoot { } public interface IAggregatePart { IAggregateRoot Root { get; } }
So it seems that there isn’t a general solution to the coarse-grained locking problem in NHibernate. You need to solve it by either:
- making it explicit in your domain logic
- update a property on the root every time you update an aggregate part
- have a special method for loading an aggregate for writes
- add extra relations in your domain to be able to traverse from the aggregate part to the aggregate root
But, the truth is, I don’t need a general solution. I need to solve the problem of NServiceBus Saga concurrency and coarse-grained locks when using NHibernate Persistence.
NServiceBus Saga concurrency and coarse-grained locks when using NHibernate Persistence
Fortunately, in my case, the problem was a bit simpler and more specific. I have a Process Manager which, when started, sends a small number of requests to some other components to do some tasks. Since these tasks might take some time, I send them all at once, so they can be processed in parallel. When I get a response back, I mark the task as completed and check if all tasks have been processed. (This is a simplified version, I actually need to send other messages and have multiple task types).
This is the saga data:
public class SagaData : IContainSagaData { public int Version { get; set; } public List<LongRunningTask> Tasks { get; set; } public Guid Id { get; set; } public string Originator { get; set; } public string OriginalMessageId { get; set; } } public class LongRunningTask { public Guid Id { get; set; } public Status Status { get; set; } public string Result { get; set; } } public enum Status { InProgress, Completed }
And here is the saga code:
public class Saga : Saga<SagaData>, IAmStartedByMessages<StartSagaCommand>, IHandleMessages<DoTaskResponse> { public void Handle(StartSagaCommand message) { Data.Tasks = new List<LongRunningTask>(); foreach (var tasks in message.Tasks) { var newTask = new LongRunningTask { Status = Status.InProgress }; Data.Tasks.Add(newTask); var doTaskRequest = new DoTaskRequest { TaskId = newTask.Id }; Bus.Send(doTaskRequest); } } public void Handle(DoTaskResponse message) { var task = Data.Tasks.Single(t => t.Id == message.TaskId); task.Status = Status.Completed; task.Result = message.Result; if (Data.Tasks.All(t => t.Status == Status.Completed)) { Bus.Send(new DoSomeOtherStuffCommand(Data)); MarkAsComplete(); } } }
The question is: what happens if we process the last two task responses concurrently? The completion condition highlighted above will be false for both messages. But since the DoTaskResponse handler doesn’t update the saga data (the parent), just the task (the child), will both transactions commit? If this is the case, then this saga instance will never send the DoSomeOtherStuffCommand.
Pessimistic locking of the saga instance
The good news is that you won’t have this problem if you use the default settings. This is because since Version 4.1.0, the NHibernate saga persister uses an additional pessimistic concurrency control. This means that it puts a lock on the saga data when it fetches it. Processing the second message will have to wait until the first transaction commits. So if you don’t adjust the lock strategy to Read, you should be OK. Thus, the additional pessimistic lock would ensure consistency, even if the optimistic lock fails to update the parent’s version on a child update.
Force a version upgrade on the Saga Data
I was still wondering if there is a simple way to get the optimistic lock control to work like I wanted. One option would be to always force a version upgrade when handling a message. This isn’t an ideal solution if you’re handling messages that don’t update the saga data. Since in my case I do modify the saga data whenever I’m processing a message, this approach is OK.
The NServiceBus framework always calls Save on the saga data class, so I hooked up in the Save event listener:
public class ForceVerionUpgradeOnSagaDataListner : ISaveOrUpdateEventListener { public void OnSaveOrUpdate(SaveOrUpdateEvent @event) { if (@event.Entity is IContainSagaData && !@event.Session.IsDirtyEntity(@event.Entity)) { @event.Session.Lock(@event.Entity, LockMode.Force); } } }
As you can see, I do this only for the parent saga data. Also, to prevent the multiple update per transaction issue that Ayende highlighted, I upgrade the version only if the saga data is not dirty. If it is dirty, it would be upgraded anyway.
Checking if an entity is dirty
IsDirtyEntity is an extension method that checks if an entity has changed after it has been loaded into the session. My NHibernate skills are not good enough to implement such a method from scratch. I’ve searched for a way to do this and, to my surprise, I didn’t find a definitive answer. The first version I found is on NHibernate’s site and does not work because of the following line:
Int32[] dirtyProps = oldState .Select((o, i) => (oldState[i] == currentState[i]) ? -1 : i) .Where(x => x >= 0) .ToArray();
If the property is a value type, this check will actually compare references to the boxed values, so it will always be false. The second version of this method is on nhforge and replaces the line above with:
Int32 [] dirtyProps = persister.FindDirty(currentState, oldState, entity, sessionImpl);
This fixes the boxing issue, but has another caveat: it won’t detect changes (add/remove) to a collection. The third version, found on stackoverflow fixes this issue:
public static bool IsDirtyEntity(this ISession session, object entity) { var sessionImplementation = session.GetSessionImplementation(); var persistenceContext = sessionImplementation.PersistenceContext; var entityPersister = sessionImplementation.GetEntityPersister(null, entity); var entityEntry = persistenceContext.GetEntry(entity); if ((entityEntry == null) && entity is INHibernateProxy) { var proxy = (INHibernateProxy)entity; var obj = sessionImplementation.PersistenceContext.Unproxy(proxy); entityEntry = sessionImplementation.PersistenceContext.GetEntry(obj); } object[] oldState = entityEntry.LoadedState; object[] currentState = entityPersister.GetPropertyValues(entity, sessionImplementation.EntityMode); int[] findDirty = entityEntry.Persister.FindDirty(currentState, oldState, entity, sessionImplementation); var hasDirtyCollection = currentState.OfType<IPersistentCollection>().Any(x => x.IsDirty); return (findDirty != null) || hasDirtyCollection; }
So this is the version I use. If you can spot any issue with it, please let me know.
Registering the listener
So the only thing left to do is to register the listener:
nhConfiguration.AppendListeners(ListenerType.Update, new[] { new ForceVerionUpgradeOnSagaDataListner() });
Now, the system will be consistent even if we don’t use the default pessimistic locking strategy and rely only on the optimistic locking control. If we process the last two responses in parallel, the first will commit (and update the version on the saga data). The second transaction will fail (with a StaleObjectStateException) and rollback. NServiceBus’s retry mechanism will ensure the message is retried and processed successfully.
As a note, there is no good reason to disable the pessimistic concurrency control. So you probably don’t need to force a version upgrade on the saga data.
Conclusion
I must admit, I assumed that NHibernate supports coarse-grained locking out of the box. I’m wondering how are people ensuring aggregate consistency when using NHibernate?
Fortunately, the problem I had was more specific, since it’s only about ensuring NServiceBus saga consistency when using NHibernate for persistence. So, in this particular case, the default pessimistic locking does its job, so no extra work is required. If you still want to update the saga data’s version on a child update, then you could force it.
Another solution would be to check for saga completion on a timeout, not when processing a message. Jimmy Bogard has a great blog post on how to reduce NServiceBus saga load by using this approach. This would actually be a better solution, if there was a big number of message. In my case, the number of tasks was limited, so I didn’t want to introduce the timeout.
If you can think of any other solution, please leave a comment. I’m definitely interested in seeing a better one!