
Durable Messaging

INFO

A major goal of Wolverine 4.0 is to bring the EF Core integration capabilities (including multi-tenancy support) up to parity with the current Marten integration, to add event sourcing support for SQL Server, and to provide at least envelope storage integration with CosmosDb.

Wolverine can integrate with several database engines and persistence tools for:

  • Durable messaging through the transactional inbox and outbox pattern
  • Transactional middleware to simplify your application code
  • Saga persistence
  • Durable, scheduled message handling
  • Durable & replayable dead letter queueing
  • Node and agent assignment persistence that Wolverine needs for its agent assignment (virtual actor) capability

Transactional Inbox/Outbox

See the blog post Transactional Outbox/Inbox with Wolverine and why you care for more context.

One of Wolverine's most important features is durable message persistence using your application's database for reliable "store and forward" queueing with all possible Wolverine transport options, including the lightweight TCP transport and external transports like the Rabbit MQ transport.
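
For orientation, the sketch below shows what opting an external subscriber into durable storage looks like with the Rabbit MQ transport. This is only a sketch: it assumes the WolverineFx.RabbitMQ package, a broker on localhost, and the "account-updates" queue name is a placeholder. Durable endpoint configuration is covered in detail further down.

cs
// A sketch only: assumes the WolverineFx.RabbitMQ package and a broker on localhost;
// the queue name is a placeholder for illustration
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.UseRabbitMq(rabbit => { rabbit.HostName = "localhost"; })
            // Let Wolverine build any missing queues or exchanges at startup
            .AutoProvision();

        opts.PublishMessage<AccountUpdated>()
            .ToRabbitQueue("account-updates")

            // Persist outgoing messages in the application database until
            // the broker confirms they were received
            .UseDurableOutbox();
    }).StartAsync();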

It's a chaotic world out there when high volume systems need to interact with other systems. Your system may fail, other systems may be down, and there are network hiccups and occasional failures -- yet you still need your systems to reach a consistent state without messages getting lost en route.

Consider this sample message handler from Wolverine's AppWithMiddleware sample project:

cs
[Transactional]
public static async Task Handle(
    DebitAccount command,
    Account account,
    IDocumentSession session,
    IMessageContext messaging)
{
    account.Balance -= command.Amount;

    // This just marks the account as changed, but
    // doesn't actually commit changes to the database
    // yet. That matters for the outbox, as explained below
    session.Store(account);

    // Conditionally trigger other, cascading messages
    if (account.Balance > 0 && account.Balance < account.MinimumThreshold)
    {
        await messaging.SendAsync(new LowBalanceDetected(account.Id));
    }
    else if (account.Balance < 0)
    {
        await messaging.SendAsync(new AccountOverdrawn(account.Id), new DeliveryOptions{DeliverWithin = 1.Hours()});

        // Give the customer 10 days to deal with the overdrawn account
        await messaging.ScheduleAsync(new EnforceAccountOverdrawnDeadline(account.Id), 10.Days());
    }

    // "messaging" is a Wolverine IMessageContext or IMessageBus service
    // Do the deliver within rule on individual messages
    await messaging.SendAsync(new AccountUpdated(account.Id, account.Balance),
        new DeliveryOptions { DeliverWithin = 5.Seconds() });
}


The handler code above is committing changes to an Account in the underlying database and potentially sending out additional messages based on the state of the Account. If you're experienced with asynchronous messaging systems and you hear me say that Wolverine does not support any kind of two-phase commit between the database and message brokers, you're probably already concerned about some potential problems in the code above:

  • Maybe the database changes fail, but there are “ghost” messages already queued that pertain to data changes that never actually happened
  • Maybe the messages actually manage to get through to their downstream handlers and are applied erroneously because the related database changes have not yet been applied. That’s a race condition that absolutely happens if you’re not careful (ask me how I know 😦 )
  • Maybe the database changes succeed, but the messages fail to be sent because of a network hiccup or who knows what problem happens with the message broker

What you need is a guarantee that the outgoing messages and the database changes succeed or fail together, and that the new messages are not actually published until the database transaction succeeds. To that end, Wolverine relies on message persistence within your application database as its implementation of the Transactional Outbox pattern. Using the "outbox" pattern avoids the need for problematic and slow distributed transactions while still maintaining eventual consistency between database changes and the outgoing messages that are part of the logical transaction. Wolverine's implementation of the outbox pattern also includes a separate message relay process that sends the persisted outgoing messages in background processes (by marshalling the outgoing message envelopes through TPL Dataflow queues, if you're curious).
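
Outside of message handlers, for example in an ASP.NET Core endpoint or another service, you can opt into the same guarantee by enrolling an outbox explicitly. The following is only a sketch assuming the Marten integration, an injected IMartenOutbox, and an AccountId property on DebitAccount; see the Marten-specific documentation for the exact setup.

cs
// A sketch only: assumes the WolverineFx.Marten integration is configured and that
// IDocumentSession and IMartenOutbox are resolved from the container. The
// DebitAccount.AccountId property is assumed here for illustration.
public static class DebitAccountEndpoint
{
    public static async Task Post(
        DebitAccount command,
        IDocumentSession session,
        IMartenOutbox outbox)
    {
        // Tie any outgoing messages to this Marten session's transaction
        outbox.Enroll(session);

        var account = await session.LoadAsync<Account>(command.AccountId);
        account!.Balance -= command.Amount;
        session.Store(account);

        // This message is held in the outbox and only persisted and relayed
        // once SaveChangesAsync() successfully commits the transaction
        await outbox.SendAsync(new AccountUpdated(account.Id, account.Balance));

        await session.SaveChangesAsync();
    }
}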

If any node of a Wolverine system that uses durable messaging goes down before all of its messages are processed, the persisted messages will be loaded from storage and processed when the system is restarted. Wolverine does this through its DurabilityAgent, which runs within your application as part of the IHostedService runtime that the UseWolverine() extension method registers automatically.

TIP

At the moment, Wolverine only supports Postgresql, Sql Server, and RavenDb as the underlying database and either Marten or Entity Framework Core as the application persistence framework.

There are three things you need to enable for the transactional outbox (and inbox for incoming messages):

  1. Set up message storage in your application, and manage the storage schema objects -- don't worry though, Wolverine comes with a lot of tooling to help you with that
  2. Enroll outgoing subscriber or listener endpoints in the durable storage at configuration time
  3. Enable Wolverine's transactional middleware or utilize one of Wolverine's outbox publishing services

The last item varies a little bit between the Marten integration and the EF Core integration, so see the specific documentation on each for more details.
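
To make the first and third items concrete, here is a minimal sketch that registers Sql Server-backed message storage and applies the transactional middleware globally. It assumes the WolverineFx.SqlServer package, and the connection string is a placeholder; the Marten and Postgresql setups follow the same shape and are covered in their own documentation.

cs
// A sketch only: assumes the WolverineFx.SqlServer package; the connection
// string is a placeholder for illustration
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        // 1. Set up durable message storage in the application database
        opts.PersistMessagesWithSqlServer("your sql server connection string");

        // 3. Apply Wolverine's transactional middleware to all handlers that
        //    work against the registered persistence tooling
        opts.Policies.AutoApplyTransactions();
    }).StartAsync();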

Using the Outbox for Outgoing Messages

TIP

It might be valuable to leave some endpoints as "buffered" or "inline" for message types that have limited lifetimes. See the blog post Ephemeral Messages with Wolverine for an example of this.

To make the Wolverine outbox feature persist messages in the durable message storage, you need to explicitly configure the outgoing subscriber endpoints (Rabbit MQ queues or exchanges/bindings, Azure Service Bus queues, TCP ports, etc.) to be durable.

That can be done either on specific endpoints, as in this sample:

cs
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.PublishAllMessages().ToPort(5555)

            // This option makes just this one outgoing subscriber use
            // durable message storage
            .UseDurableOutbox();
    }).StartAsync();


Or globally through a built-in policy:

cs
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        // This forces every outgoing subscriber to use durable
        // messaging
        opts.Policies.UseDurableOutboxOnAllSendingEndpoints();
    }).StartAsync();

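Conversely, and per the tip above, endpoints that only carry short-lived messages can be left out of the outbox. The sketch below contrasts a buffered, in-memory subscriber with a durable one; the StatusPing message type and port numbers are hypothetical placeholders.

cs
// A sketch only: StatusPing is a hypothetical, short-lived message type and
// the port numbers are arbitrary
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        // Short-lived status messages can stay in memory only
        opts.PublishMessage<StatusPing>().ToPort(5580)
            .BufferedInMemory();

        // Business-critical messages go through the durable outbox
        opts.PublishMessage<AccountUpdated>().ToPort(5581)
            .UseDurableOutbox();
    }).StartAsync();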

Using the Inbox for Incoming Messages

On the incoming side, external transport endpoint listeners can be enrolled in Wolverine's transactional inbox mechanics, where received messages are immediately persisted to the durable message storage and tracked there until each message is successfully processed, expires, is discarded due to error conditions, or is moved to dead letter storage.

To enroll individual listening endpoints or all listening endpoints in the Wolverine inbox mechanics, use one of these options:

cs
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.ListenAtPort(5555)

            // Make specific endpoints be enrolled
            // in the durable inbox
            .UseDurableInbox();

        // Make every single listener endpoint use
        // durable message storage
        opts.Policies.UseDurableInboxOnAllListeners();
    }).StartAsync();


Local Queues

When you mark a local queue as durable, you're telling Wolverine to store every message published to that queue in the backing message database until it is successfully processed. Doing so lets even local queues guarantee eventual delivery if the node where the message was published fails before the message is processed.

To configure durability on individual local queues, or to set it by convention, consider these possible usages:

cs
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.Policies.UseDurableLocalQueues();

        // or

        opts.LocalQueue("important").UseDurableInbox();

        // or conventionally, make the local queues for messages in a certain namespace
        // be durable
        opts.Policies.ConfigureConventionalLocalRouting().CustomizeQueues((type, queue) =>
        {
            if (type.IsInNamespace("MyApp.Commands.Durable"))
            {
                queue.UseDurableInbox();
            }
        });
    }).StartAsync();

