Using Kafka
WARNING
The Kafka transport does not really support the "Requeue" error handling policy in Wolverine. "Requeue" in this case becomes effectively an inline "Retry"
Installing
To use Kafka as a messaging transport with Wolverine, first install the Wolverine.Kafka library via nuget to your project. Behind the scenes, this package uses the Confluent.Kafka client library managed library for accessing Kafka brokers.
dotnet add WolverineFx.KafkaThe configuration in `ConfigureConsumer()` for each topic completely overwrites any previous configurationTo connect to Kafka, use this syntax:
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseKafka("localhost:9092")
// See https://github.com/confluentinc/confluent-kafka-dotnet for the exact options here
.ConfigureClient(client =>
{
// configure both producers and consumers
})
.ConfigureConsumers(consumer =>
{
// configure only consumers
})
.ConfigureProducers(producer =>
{
// configure only producers
})
.ConfigureProducerBuilders(builder =>
{
// there are some options that are only exposed
// on the ProducerBuilder
})
.ConfigureConsumerBuilders(builder =>
{
// there are some Kafka client options that
// are only exposed from the builder
})
.ConfigureAdminClientBuilders(builder =>
{
// configure admin client builders
});
// Just publish all messages to Kafka topics
// based on the message type (or message attributes)
// This will get fancier in the near future
opts.PublishAllMessages().ToKafkaTopics();
// Or explicitly make subscription rules
opts.PublishMessage<ColorMessage>()
.ToKafkaTopic("colors")
// Override the producer configuration for just this topic
.ConfigureProducer(config =>
{
config.BatchSize = 100;
config.EnableGaplessGuarantee = true;
config.EnableIdempotence = true;
});
// Listen to topics
opts.ListenToKafkaTopic("red")
.ProcessInline()
// Override the consumer configuration for only this
// topic
// This is NOT combinatorial with the ConfigureConsumers() call above
// and completely replaces the parent configuration
.ConfigureConsumer(config =>
{
// This will also set the Envelope.GroupId for any
// received messages at this topic
config.GroupId = "foo";
config.BootstrapServers = "localhost:9092";
// Other configuration
});
opts.ListenToKafkaTopic("green")
.BufferedInMemory();
// This will direct Wolverine to try to ensure that all
// referenced Kafka topics exist at application start up
// time
opts.Services.AddResourceSetupOnStartup();
}).StartAsync();The various Configure*****() methods provide quick access to the full API of the Confluent Kafka library for security and fine tuning the Kafka topic behavior.
Publishing by Partition Key
To publish messages with Kafka using a designated partition key, use the DeliveryOptions to designate a partition like so:
public static ValueTask publish_by_partition_key(IMessageBus bus)
{
return bus.PublishAsync(new Message1(), new DeliveryOptions { PartitionKey = "one" });
}Interoperability
TIP
Also see the more generic Wolverine Guide on Interoperability
It's a complex world out there, and it's more than likely you'll need your Wolverine application to interact with system that aren't also Wolverine applications. At this time, it's possible to send or receive raw JSON through Kafka and Wolverine by using the options shown below in test harness code:
_receiver = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
//opts.EnableAutomaticFailureAcks = false;
opts.UseKafka("localhost:9092").AutoProvision();
opts.ListenToKafkaTopic("json")
// You do have to tell Wolverine what the message type
// is that you'll receive here so that it can deserialize the
// incoming data
.ReceiveRawJson<ColorMessage>();
opts.Services.AddResourceSetupOnStartup();
opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "kafka");
opts.Services.AddResourceSetupOnStartup();
opts.Policies.UseDurableInboxOnAllListeners();
}).StartAsync();
_sender = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseKafka("localhost:9092").AutoProvision();
opts.Policies.DisableConventionalLocalRouting();
opts.Services.AddResourceSetupOnStartup();
opts.PublishAllMessages().ToKafkaTopic("json")
// Just publish the outgoing information as pure JSON
// and no other Wolverine metadata
.PublishRawJson(new JsonSerializerOptions());
}).StartAsync();Instrumentation & Diagnostics 3.13
When receiving messages through Kafka and Wolverine, there are some useful elements of Kafka metadata on the Wolverine Envelope you can use for instrumentation or diagnostics as shown in this sample middleware:
public static class KafkaInstrumentation
{
// Just showing what data elements are available to use for
// extra instrumentation when listening to Kafka topics
public static void Before(Envelope envelope, ILogger logger)
{
logger.LogDebug("Received message from Kafka topic {TopicName} with Offset={Offset} and GroupId={GroupId}",
envelope.TopicName, envelope.Offset, envelope.GroupId);
}
}Connecting to Multiple Brokers 4.7
Wolverine supports interacting with multiple Kafka brokers within one application like this:
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseKafka("localhost:9092");
opts.AddNamedKafkaBroker(new BrokerName("americas"), "americas-kafka:9092");
opts.AddNamedKafkaBroker(new BrokerName("emea"), "emea-kafka:9092");
// Just publish all messages to Kafka topics
// based on the message type (or message attributes)
// This will get fancier in the near future
opts.PublishAllMessages().ToKafkaTopicsOnNamedBroker(new BrokerName("americas"));
// Or explicitly make subscription rules
opts.PublishMessage<ColorMessage>()
.ToKafkaTopicOnNamedBroker(new BrokerName("emea"), "colors");
// Listen to topics
opts.ListenToKafkaTopicOnNamedBroker(new BrokerName("americas"), "red");
// Other configuration
}).StartAsync();Note that the Uri scheme within Wolverine for any endpoints from a "named" Kafka broker is the name that you supply for the broker. So in the example above, you might see Uri values for emea://colors or americas://red.

