Using Pulsar 3.0

INFO

Fun fact: the Pulsar transport was the very first messaging broker transport supported by Jasper/Wolverine, but for whatever reason it wasn't officially released until Wolverine 3.0.

Installing

To use Apache Pulsar as a messaging transport with Wolverine, first install the WolverineFx.Pulsar library via NuGet to your project. Behind the scenes, this package uses the DotPulsar managed client library for accessing Pulsar brokers.

bash
dotnet add package WolverineFx.Pulsar

To connect to Pulsar and configure senders and listeners, use this syntax:

cs
var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
    opts.UsePulsar(c =>
    {
        var pulsarUri = builder.Configuration.GetValue<Uri>("pulsar");
        c.ServiceUrl(pulsarUri);
        
        // Any other configuration you want to apply to your
        // Pulsar client
    });

    // Publish messages to a particular Pulsar topic
    opts.PublishMessage<Message1>()
        .ToPulsarTopic("persistent://public/default/one")
        
        // And all the normal Wolverine options...
        .SendInline();

    // Listen for incoming messages from a Pulsar topic
    opts.ListenToPulsarTopic("persistent://public/default/two")
        .SubscriptionName("two")
        .SubscriptionType(SubscriptionType.Exclusive)
        
        // And all the normal Wolverine options...
        .Sequential();

    // Listen for incoming messages from a Pulsar topic with a shared subscription and using RETRY and DLQ queues
    opts.ListenToPulsarTopic("persistent://public/default/three")
        .WithSharedSubscriptionType()
        .DeadLetterQueueing(new DeadLetterTopic(DeadLetterTopicMode.Native))
        .RetryLetterQueueing(new RetryLetterTopic([TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(5)]))
        .Sequential();
});
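The `pulsar` configuration key read in the sample above has to resolve to a broker service URL. Here is a minimal sketch of supplying it, assuming a local standalone broker on Pulsar's default binary port 6650. The in-memory configuration source is only a stand-in for whatever source your application really uses (appsettings.json, environment variables, and so on).

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder();

// Assumption: a local standalone Pulsar broker listening on the default port.
// Replace this in-memory source with your real configuration source.
builder.Configuration.AddInMemoryCollection(new Dictionary<string, string?>
{
    ["pulsar"] = "pulsar://localhost:6650"
});

// Same lookup as in the Wolverine sample: the string value is converted to a Uri
var pulsarUri = builder.Configuration.GetValue<Uri>("pulsar");

if (pulsarUri is null)
{
    throw new InvalidOperationException("Missing the 'pulsar' configuration value");
}
```

Failing fast on a missing value is a design choice in this sketch; it surfaces a misconfiguration at startup rather than at the first send or receive.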


The topic name format is set by Pulsar itself, and you can learn more about its format in Pulsar Topics.
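The topic names used above follow Pulsar's `{persistent|non-persistent}://tenant/namespace/topic` layout. As an illustration only, the sketch below builds such a name from its parts. `PulsarTopicName` is a hypothetical helper written for this page and is not part of Wolverine or DotPulsar.

```csharp
using System;

// Hypothetical helper, not part of Wolverine or DotPulsar
public static class PulsarTopicName
{
    // Builds a fully qualified topic name such as persistent://public/default/one
    public static string Build(string tenant, string ns, string topic, bool persistent = true)
    {
        Require(tenant, nameof(tenant));
        Require(ns, nameof(ns));
        Require(topic, nameof(topic));

        var domain = persistent ? "persistent" : "non-persistent";
        return $"{domain}://{tenant}/{ns}/{topic}";
    }

    // Rejects null, empty, or whitespace-only name segments
    private static void Require(string value, string name)
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new ArgumentException("A topic name segment cannot be empty", name);
        }
    }
}
```

With this helper, `PulsarTopicName.Build("public", "default", "one")` yields the same `persistent://public/default/one` string that is passed to `ToPulsarTopic()` in the sample above.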

INFO

Depending on demand, the Pulsar transport will be enhanced to support conventional routing topologies and more advanced topic routing later.

Read Only Subscriptions 3.13

As part of Wolverine's "Requeue" error handling action, the Pulsar transport tries to quietly create a matching sender for each Pulsar topic it's listening to. Great, but that will blow up if your application only has receive-only permissions to Pulsar. In this case, you probably want to disable Pulsar requeue actions altogether with this setting:

cs
var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
    opts.UsePulsar(c =>
    {
        var pulsarUri = builder.Configuration.GetValue<Uri>("pulsar");
        c.ServiceUrl(pulsarUri);
    });

    // Listen for incoming messages from a Pulsar topic
    opts.ListenToPulsarTopic("persistent://public/default/two")
        .SubscriptionName("two")
        .SubscriptionType(SubscriptionType.Exclusive)
        
        // Disable the requeue for this topic
        .DisableRequeue()
        
        // And all the normal Wolverine options...
        .Sequential();

    // Disable requeue for all Pulsar endpoints
    opts.DisablePulsarRequeue();
});


If your application has receive-only access to a subscription and no permission to publish to Pulsar, you cannot use the Wolverine "Requeue" error handling policy.
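With requeue disabled, failures need some other handling policy. The sketch below pairs the disable switch with an in-process retry, which does not need to send anything back to the broker. It assumes Wolverine's general error handling API (`opts.Policies.OnException<T>()` and `RetryWithCooldown()`) and the namespaces in the `using` lines. The exception type and delays are arbitrary choices for illustration.

```csharp
using System;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Wolverine;
using Wolverine.ErrorHandling; // assumed namespace for the error handling policies
using Wolverine.Pulsar;

var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
    opts.UsePulsar(c =>
    {
        var pulsarUri = builder.Configuration.GetValue<Uri>("pulsar");
        c.ServiceUrl(pulsarUri);
    });

    // No requeue senders are created for any Pulsar endpoint
    opts.DisablePulsarRequeue();

    // Illustrative choice: retry transient timeouts inside the receiving
    // process with short pauses instead of requeueing to the topic
    opts.Policies.OnException<TimeoutException>()
        .RetryWithCooldown(
            TimeSpan.FromMilliseconds(50),
            TimeSpan.FromMilliseconds(100),
            TimeSpan.FromMilliseconds(250));
});
```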

Subscription behavior when closing connection

By default, the Pulsar transport automatically closes the subscription when an endpoint is stopped. If the subscription was created for you and should be kept after the application shuts down, you can change this behavior.

cs
var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
    opts.UsePulsar(c =>
    {
        var pulsarUri = builder.Configuration.GetValue<Uri>("pulsar");
        c.ServiceUrl(pulsarUri);
    });

    // Disable unsubscribe on close for all Pulsar endpoints
    opts.UnsubscribePulsarOnClose(PulsarUnsubscribeOnClose.Disabled);
});


Global Partitioning

Pulsar topics can be used as the external transport for global partitioned messaging. This creates a set of sharded Pulsar topics with companion local queues for sequential processing across a multi-node cluster.

Use UseShardedPulsarTopics() within a GlobalPartitioned() configuration:

cs
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.UsePulsar();

        opts.MessagePartitioning.ByMessage<IMyMessage>(x => x.GroupId);

        opts.MessagePartitioning.GlobalPartitioned(topology =>
        {
            // Creates 4 sharded Pulsar topics named "orders1" through "orders4"
            // with matching companion local queues for sequential processing
            topology.UseShardedPulsarTopics("orders", 4);
            topology.MessagesImplementing<IMyMessage>();
        });
    }).StartAsync();

This creates Pulsar topics named orders1 through orders4 with companion local queues global-orders1 through global-orders4. Messages are routed to the correct shard based on their group id, and Wolverine handles the coordination between nodes automatically.
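To make the idea of routing by group id concrete, here is a self-contained sketch that maps a group id to one of the `orders1` through `orders4` names with a stable hash. This is an assumption for illustration only: the hashing Wolverine uses internally is not described here, and `ShardSketch` is a hypothetical class. The property it demonstrates is that the same group id always resolves to the same shard, which is what makes sequential processing per group possible.

```csharp
using System;
using System.Text;

// Hypothetical illustration, not Wolverine's actual routing code
public static class ShardSketch
{
    // Maps a group id to a shard name like "orders3" using a 32-bit FNV-1a hash.
    // string.GetHashCode() is avoided because it differs between processes.
    public static string TopicFor(string baseName, int shardCount, string groupId)
    {
        if (shardCount < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(shardCount));
        }

        uint hash = 2166136261;
        unchecked
        {
            foreach (var b in Encoding.UTF8.GetBytes(groupId))
            {
                hash ^= b;
                hash *= 16777619;
            }
        }

        // Shard names in the text above are 1-based
        var slot = (int)(hash % (uint)shardCount) + 1;
        return $"{baseName}{slot}";
    }
}
```

Calling `ShardSketch.TopicFor("orders", 4, groupId)` on any node returns the same shard name for a given `groupId`, so all messages for that group end up on one topic.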

Interoperability

TIP

Also see the more generic Wolverine Guide on Interoperability

Pulsar interoperability is done through the IPulsarEnvelopeMapper interface.
