Using Local Queueing

The Wolverine.IMessageBus service is registered automatically when you call the IHostBuilder.UseWolverine() extension. With it you can invoke message handlers inline, enqueue messages to local, in-process queues, or schedule message execution within the system. Every known message handler in a Wolverine application can be used from IMessageBus without any additional explicit configuration.
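
As a rough sketch of those three modes, assume a hypothetical ApproveInvoice message and handler (both invented here for illustration) and an application with no other routing rules, so that published messages land on a local queue:

```cs
using System;
using System.Threading.Tasks;
using Wolverine;

// Hypothetical message type, used only for illustration
public record ApproveInvoice(Guid InvoiceId);

// Hypothetical handler, discovered through Wolverine's naming conventions
public static class ApproveInvoiceHandler
{
    public static void Handle(ApproveInvoice message)
        => Console.WriteLine($"Approving invoice {message.InvoiceId}");
}

public class InvoiceApprovalService
{
    private readonly IMessageBus _bus;

    public InvoiceApprovalService(IMessageBus bus) => _bus = bus;

    public async Task ApproveThreeWays(Guid invoiceId)
    {
        var message = new ApproveInvoice(invoiceId);

        // 1. Invoke the handler inline and wait for it to complete
        await _bus.InvokeAsync(message);

        // 2. Enqueue the message without waiting for the handler to run
        await _bus.PublishAsync(message);

        // 3. Schedule the message to be executed five minutes from now
        await _bus.ScheduleAsync(message, TimeSpan.FromMinutes(5));
    }
}
```

In a real application you would normally pick one of the three calls for a given message rather than using all of them together.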

Publishing Messages Locally

The queueing is built on objects from the TPL Dataflow library, so you have a fair amount of control over parallelization and even some back pressure. These local queues can be used directly, or as a transport that accepts messages sent through IMessageBus.SendAsync() or IMessageBus.PublishAsync() using the application's message routing rules.

This feature is useful for asynchronous processing in web applications or really any kind of application where you need some parallelization or concurrency.

Some things to know about the local queues:

  • Local worker queues can be durable, meaning that the enqueued messages are persisted first so that they aren't lost if the application is shut down before they're processed. More on that below.
  • You can use any number of named local queues, and they don't even have to be declared upfront (though you may want to be careful with that)
  • Local worker queues utilize Wolverine's error handling policies to selectively handle any detected exceptions from the message handlers.
  • You can control the priority and parallelization of each individual local queue
  • Message types can be routed to particular queues, but by default Wolverine routes each message type to its own local queue, named after the message type
  • Cascading messages can be used with the local queues
  • The local queues can be used like any other message transport and be the target of routing rules
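
To illustrate the error handling point above, here is a minimal sketch of exception policies that would also apply to messages running in local queues. The exception types and the retry count are arbitrary choices for this example, not recommendations:

```cs
using System;
using Microsoft.Extensions.Hosting;
using Wolverine;
using Wolverine.ErrorHandling;

using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        // Retry a message that fails with a timeout, up to three attempts
        opts.Policies.OnException<TimeoutException>().RetryTimes(3);

        // Do not retry this failure; move the message to the error queue
        opts.Policies.OnException<InvalidOperationException>().MoveToErrorQueue();
    }).StartAsync();
```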

Explicitly Publish to a Specific Local Queue

If you want to enqueue a message locally to a specific worker queue, you can use this syntax:

cs
public ValueTask EnqueueToQueue(IMessageContext bus)
{
    var @event = new InvoiceCreated
    {
        Time = DateTimeOffset.Now,
        Purchaser = "Guy Fieri",
        Amount = 112.34,
        Item = "Cookbook"
    };

    // Put this message in a local worker
    // queue named 'highpriority'
    return bus.EndpointFor("highpriority").SendAsync(@event);
}

Scheduling Local Execution

TIP

If scheduled commands need to survive service restarts, you'll need to enable message persistence within Wolverine.

The "scheduled execution" feature can be used with local execution within the same application. See Scheduled Messages for more information. Use the IMessageBus.ScheduleAsync() extension methods like this:

cs
public async Task ScheduleLocally(IMessageContext bus, Guid invoiceId)
{
    var message = new ValidateInvoiceIsNotLate
    {
        InvoiceId = invoiceId
    };

    // Schedule the message to be processed in a certain amount
    // of time
    await bus.ScheduleAsync(message, 30.Days());

    // Schedule the message to be processed at a certain time
    await bus.ScheduleAsync(message, DateTimeOffset.Now.AddDays(30));
}

Explicit Local Message Routing

In the absence of any kind of routing rules, any message enqueued with IMessageBus.PublishAsync() will just be handled by a local queue with the message type name. To override that choice on a message type by message type basis, you can use the [LocalQueue] attribute on a message type:

cs
[LocalQueue("important")]
public class ImportanceMessage;

Otherwise, you can take advantage of Wolverine's message routing rules like this:

cs
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        // Publish Message2 messages to the "important"
        // local queue
        opts.PublishMessage<Message2>()
            .ToLocalQueue("important");
    }).StartAsync();

Routing rules and [LocalQueue] attributes are also honored for cascading messages, meaning that any message handled inside a Wolverine system can publish cascading messages to the local worker queues.

See message routing rules for more information.
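
A minimal sketch of a cascading message that follows a [LocalQueue] rule might look like the following. The message types, handler names, and the "notifications" queue name are all hypothetical and exist only for this example:

```cs
using System;
using Wolverine.Attributes;

// Hypothetical incoming message
public record OrderPlaced(Guid OrderId);

// Hypothetical cascaded message, routed to the "notifications" local queue
[LocalQueue("notifications")]
public record SendOrderConfirmation(Guid OrderId);

public static class OrderPlacedHandler
{
    // The returned object is published by Wolverine as a cascading message
    public static SendOrderConfirmation Handle(OrderPlaced message)
        => new SendOrderConfirmation(message.OrderId);
}

public static class SendOrderConfirmationHandler
{
    // Handles the cascaded message once it reaches its local queue
    public static void Handle(SendOrderConfirmation message)
        => Console.WriteLine($"Sending confirmation for order {message.OrderId}");
}
```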

Conventional Local Messaging

Conventional local message routing is applied to every message type handled by the system that does not have some kind of explicit message type routing rule. You can override how message types map to local queues with this syntax:

cs
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        // Out of the box, this uses a separate local queue
        // for each message based on the message type name
        opts.Policies.ConfigureConventionalLocalRouting()

            // Or you can customize the usage of queues
            // per message type
            .Named(type => type.Namespace)

            // Optionally configure the local queues
            .CustomizeQueues((type, listener) => { listener.Sequential(); });
    }).StartAsync();

Disable Conventional Local Routing

Sometimes you'll want to disable the conventional routing to local queues, especially if you want to evenly distribute work across active nodes in an application. To do so, use this syntax:

cs
public static async Task disable_queue_routing()
{
    using var host = await Host.CreateDefaultBuilder()
        .UseWolverine(opts =>
        {
            // This will disable the conventional local queue
            // routing that would take precedence over other conventional
            // routing
            opts.Policies.DisableConventionalLocalRouting();

            // Other routing conventions. Rabbit MQ? SQS?
        }).StartAsync();
}

Configuring Local Queues

WARNING

The current default is for local queues to allow for parallel processing with the maximum number of parallel threads set at the number of processors for the current machine. Likewise, the queues are unordered by default.

You can configure durability or parallelization rules for individual queues, or apply a conventional configuration across queues, with this usage:

cs
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        // Explicit configuration for the local queue
        // by the message type it handles:
        opts.LocalQueueFor<Message1>()
            .UseDurableInbox()
            .Sequential();

        // Explicit configuration by queue name
        opts.LocalQueue("one")
            .Sequential();

        opts.LocalQueue("two")
            .MaximumParallelMessages(10)
            .UseDurableInbox();

        // Apply configuration options to all local queues,
        // but explicit changes to specific local queues take precedence
        opts.Policies.AllLocalQueues(x => x.UseDurableInbox());
    }).StartAsync();

Using IConfigureLocalQueue to Configure Local Queues (3.7)

INFO

This feature was added in reaction to the newer "sticky" handler to local queue usage, but it's perfectly usable for message types that are happily handled without any "sticky" handler configuration.

The advent of "sticky handlers", or the separated handler mode for better Wolverine usage in modular monoliths, admittedly made it a little harder to fine-tune the local queue behavior for different message types or message handlers without understanding the Wolverine naming conventions. To lean more on the type system, Wolverine introduced the IConfigureLocalQueue interface, which declares a static abstract Configure method and can be implemented on any handler type to configure the local queue where that handler would run:

cs
/// <summary>
/// Helps mark a handler to configure the local queue that its messages
/// would be routed to. It's probably only useful to use this with "sticky" handlers
/// that run on an isolated local queue
/// </summary>
public interface IConfigureLocalQueue
{
    static abstract void Configure(LocalQueueConfiguration configuration);
}

TIP

A static class cannot implement an interface, so even if all your message handler methods are static, the handler type itself cannot be static. Just a .NET quirk.

To use this, just implement that interface on any message handler type:

cs
public class MultipleMessage1Handler : IConfigureLocalQueue
{
    public static void Handle(MultipleMessage message)
    {
        
    }

    // This method is configuring the local queue that executes this
    // handler to be strictly ordered
    public static void Configure(LocalQueueConfiguration configuration)
    {
        configuration.Sequential();
    }
}

Durable Local Messages

The local worker queues can optionally be designated as "durable," meaning that local messages are persisted until they are successfully processed. This guarantees that a message will still be processed if the running application faults or is shut down prematurely (assuming that other nodes are running or the application is restarted later, of course).

Here is an example of configuring a local queue to be durable:

cs
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        // Make the default local queue durable
        opts.DefaultLocalQueue.UseDurableInbox();

        // Or do just this by name
        opts.LocalQueue("important")
            .UseDurableInbox();
    }).StartAsync();

See Durable Inbox and Outbox Messaging for more information.
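
Durable queues need a message store to persist to. As a hedged sketch, assuming the PostgreSQL persistence add-on (the WolverineFx.Postgresql package) is installed, and using a placeholder connection string that is not taken from this documentation:

```cs
using Microsoft.Extensions.Hosting;
using Wolverine;
using Wolverine.Postgresql;

// Placeholder connection string; substitute your own database settings
var connectionString = "Host=localhost;Database=app;Username=app;Password=secret";

using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        // Assumption: PostgreSQL-backed message persistence from the add-on package
        opts.PersistMessagesWithPostgresql(connectionString);

        // With a message store in place, this queue can persist its messages
        opts.LocalQueue("important")
            .UseDurableInbox();
    }).StartAsync();
```

Other message stores are configured in a similar way; check the durability documentation for the exact options available in your version.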

Configuring Parallelization and Execution Properties

The queues are built on top of the TPL Dataflow library, so it's pretty easy to configure parallelization (how many concurrent messages could be handled by a queue). Here's an example of how to establish this:

cs
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        // Force a local queue to be
        // strictly first in, first out
        // with no more than a single
        // thread handling messages enqueued
        // here

        // Use this option if message ordering is
        // important
        opts.LocalQueue("one")
            .Sequential();

        // Specify the maximum number of parallel threads
        opts.LocalQueue("two")
            .MaximumParallelMessages(5);

        // Or just edit the ActionBlock options directly
        opts.LocalQueue("three")
            .ConfigureExecution(options =>
            {
                options.MaxDegreeOfParallelism = 5;
                options.BoundedCapacity = 1000;
            });

        // And finally, this enrolls a queue into the persistent inbox
        // so that messages can happily be retained and processed
        // after the service is restarted
        opts.LocalQueue("four").UseDurableInbox();
    }).StartAsync();

Local Queues as a Messaging Transport

WARNING

The local transport is used underneath the covers by Wolverine for retrying locally enqueued messages or scheduled messages that may have initially failed.

In the sample Wolverine configuration shown below:

cs
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        // Publish Message2 messages to the "important"
        // local queue
        opts.PublishMessage<Message2>()
            .ToLocalQueue("important");
    }).StartAsync();

Calling IMessageBus.SendAsync(new Message2()) would publish the message to the local "important" queue.
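
For completeness, here is a small sketch of the sending and handling side. The Message2 class below is a stand-in for the type used in the configuration, and the handler and sender names are hypothetical:

```cs
using System;
using System.Threading.Tasks;
using Wolverine;

// Stand-in for the Message2 type referenced by the routing rule
public class Message2
{
}

public static class Message2Handler
{
    // With the routing rule in place, this runs from the "important" local queue
    public static void Handle(Message2 message)
        => Console.WriteLine("Handled Message2");
}

public static class Message2Sender
{
    // Sends through the routing rules, which resolve to the local queue
    public static ValueTask Send(IMessageBus bus)
        => bus.SendAsync(new Message2());
}
```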