Apache Kafka
Producer
Eventuous supports producing messages to Apache Kafka topics. Currently, there is no schema registry support in the Eventuous Kafka producer: messages are serialized as JSON by the default serializer, or in any format by a custom serializer. The serialized payload is then sent to the Kafka topic as a byte array. No magic byte is prepended to the message, so you can use any serializer you want.
Configuration
The producer configuration is provided in the KafkaProducerOptions record. It has a single property, ProducerConfig, which is the ProducerConfig class from the Confluent.Kafka library.
For example, you can configure and start the producer like this:
var options = new KafkaProducerOptions(
new ProducerConfig { BootstrapServers = "localhost:9092" }
);
await using var producer = new KafkaBasicProducer(options);
await producer.StartAsync(default);
await producer.Produce(new StreamName(topicName), events, new Metadata());
You can also add the producer to the dependency injection container:
var options = new KafkaProducerOptions(
new ProducerConfig { BootstrapServers = "localhost:9092" }
);
builder.Services.AddSingleton(options);
builder.Services.AddProducer<KafkaBasicProducer>();
Because the Kafka producer requires lifecycle management, it must also be registered as a hosted service. When using the AddProducer extension, this registration happens automatically.
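For reference, here is a minimal sketch of resolving the registered producer from the container and producing with it. It assumes that AddProducer exposes the producer via the IEventProducer interface; check the Eventuous registration extensions for the exact contract.
// Assumption: AddProducer<KafkaBasicProducer>() exposes the producer as IEventProducer
var app = builder.Build();
var producer = app.Services.GetRequiredService<IEventProducer>();
await producer.Produce(new StreamName(topicName), events, new Metadata());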
::: tip The producer does not create topics in Kafka. You need to create topics manually before using the producer. :::
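For example, a topic can be created upfront with the admin client from the same Confluent.Kafka package. The topic name, partition count, and replication factor below are illustrative.
using Confluent.Kafka;
using Confluent.Kafka.Admin;

// Create the topic before starting the producer; adjust partitions and replication to your cluster
using var adminClient = new AdminClientBuilder(
    new AdminClientConfig { BootstrapServers = "localhost:9092" }
).Build();
await adminClient.CreateTopicsAsync(new[] {
    new TopicSpecification { Name = topicName, NumPartitions = 3, ReplicationFactor = 1 }
});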
Producing messages
There are two ways to produce messages to Kafka: with or without partitioning.
The partition key is provided in KafkaProduceOptions, which is an optional parameter of the Produce method. If the partition key is not provided, the message is sent to a random partition.
// Produce without partitioning
await producer.Produce(new StreamName(topicName), events, new Metadata());
// Produce with partitioning
await producer.Produce(new StreamName(topicName), events, new Metadata(), new("MyKey"));
The events parameter is a list of ProducedMessage records. Each record contains the message payload, metadata, and message id.
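As an illustration, the list could be built like this. The OrderPlaced event type is hypothetical, and the ProducedMessage constructor arguments shown are an assumption; check the record definition for the precise signature.
// Hypothetical domain event used only for this example
record OrderPlaced(string OrderId);

var events = new[] {
    new ProducedMessage(new OrderPlaced("order-1"), new Metadata())
};
await producer.Produce(new StreamName(topicName), events, new Metadata());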
The producer supports publishing with immediate and delayed acknowledgement. Which mode is used for a message from the provided list is determined by the presence of the ProducedMessage.OnAck property. If the property is not set, the message is produced with immediate acknowledgement. If it is set, the message is produced with delayed acknowledgement, and the OnAck function is called when the producer receives an acknowledgement from Kafka. If the broker doesn't acknowledge the message, the producer calls the OnNack function if it is set in the ProducedMessage record.
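The sketch below illustrates delayed acknowledgement. The exact OnAck and OnNack delegate signatures are defined by the ProducedMessage record, so the callback shapes here are assumptions and may need to be adjusted.
// Assumed callback shapes, for illustration only; the real delegates may
// carry different parameters (for example, the failure reason or exception)
var message = new ProducedMessage(new OrderPlaced("order-1"), new Metadata()) {
    OnAck  = _ => { Console.WriteLine("Broker acknowledged the message"); return ValueTask.CompletedTask; },
    OnNack = (_, error, exception) => { Console.WriteLine($"Delivery failed: {error}"); return ValueTask.CompletedTask; }
};
await producer.Produce(new StreamName(topicName), new[] { message }, new Metadata());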
For transmitting message details downstream, the producer sets two default message headers:
- message-type: the type of the message as a string, usually coming from the type mapper
- content-type: the content type of the message, set to application/json when using the default serializer
Additional headers are set by the user in the Metadata record.
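On the consuming side, these headers can be read with the regular Confluent.Kafka consumer API. The sketch below assumes the header values are UTF-8 encoded strings; the message value itself is left as raw bytes.
using var consumer = new ConsumerBuilder<Ignore, byte[]>(
    new ConsumerConfig { BootstrapServers = "localhost:9092", GroupId = "sample-group" }
).Build();
consumer.Subscribe(topicName);

// Read the default headers set by the producer from a consumed message
var result = consumer.Consume();
if (result.Message.Headers.TryGetLastBytes("message-type", out var typeBytes)) {
    var messageType = System.Text.Encoding.UTF8.GetString(typeBytes);
    Console.WriteLine($"Received a message of type {messageType}");
}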
Tracing
The Kafka producer creates a single span for the whole batch of messages, so every message gets span id and trace id headers set to the same values across the batch.
When the batch only contains one message, the producer will also set the message type span tag.