This is the multi-page printable view of this section. Click here to print.
Subscriptions
- 1: Real-time subscription
- 2: Checkpoints
- 3: Subscription base
- 4: Consume pipe
- 5: Event handlers
- 6: Diagnostics
- 7: EventStoreDB
- 7.1: All stream subscription
- 7.2: Stream subscription
- 7.3: Persistent subscription
- 8: Google PubSub
- 9: RabbitMQ
1 - Real-time subscription
Real-time subscriptions are essential when building a successful event-sourced system. They support the following sequence of operations:
- Domain model emits new event
- The event is stored to the event store
- The command is now handled, and the transaction is complete
- All the read models get the new event from the store and project it if necessary
Most of the time, subscriptions are used for two purposes:
- Deliver events to reporting models
- Emit integration events
Subscriptions can subscribe from any position of a stream, but normally you’d subscribe from the beginning of the stream, which allows you to process all the historical events. For integration purposes, however, you would usually subscribe from now, as emitting historical events to other systems might produce undesired side effects.
One subscription can serve multiple event handlers. Normally, you would identify a group of event handlers, which need to be triggered together, and group them within one subscription. This way you avoid situations when, for example, two real models get updated at different speed and show confusing information if those read models are shown on the same screen.
Example (real life horror story)
Imagine an event stream, which contains positions of a car as well as the car state changes, like Parked
, Moving
or Idle
. The system also has two independent subscriptions serve two read model projections - one is the last know car location, and the other one is the car state. Those subscriptions will, with great probability, come out of sync, simply because position events are much more frequent than state updates. When using both of those read models on the map, you easily get a situation when the car pointer is moving, whilst the car is shown as parked.
By combining those two projections in one subscription, they could be both behind with updates, but the user experience will be much better as it will never be confusing. We could also say that those two projections belong to the same projections group.
Subscriptions need to maintain their own checkpoints, so when the service that host a subscription restarts, it will start receiving events from the last known position in the stream.
Most often, you’d want to subscribe to the global event stream, so you can build read models, which compose information from different aggregates. Eventuous offers the All stream subscription for this use case. In some cases you’d need to subscribe to a regular stream using the stream subscription.
In Eventuous, subscriptions are specific to event store implementation. In addition, Eventuous has subscriptions to message brokers like Google PubSub and RabbitMQ for integration purposes.
The wrong way
One of the common mistakes people make when building an event-sourced application is using an event store which is not capable of handling realtime subscriptions. It forces developers to engage some sort of message bus to deliver new events to subscribers. There are quite a few issues with that approach, but the most obvious one is a two-phase commit.
When using two distinct pieces of infrastructure in one transaction, you risk one of those operations to fail. Let’s use the following example code, which is very common:
await _repository.Save(newEvents);
await _bus.Publish(newEvents);
If the second operation fails, the command side of the application would remain consistent. However, any read models, which projects those events, will not be updated. So, essentially, the reporting model will become inconsistent against the transactional model. The worst part is that the reporting model will never recover from the failure.
As mentioned, there are multiple issues of using a message bus as transport to deliver events to reporting models, but we won’t be covering them on this page.
The easiest way to solve the issue is to use a database which supports realtime subscriptions to event streams out of the box. That’s why we use EventStoreDB as the primary event store implementation.
The right way
It is not hard to avoid the two-phase commits and ensure to publish domain events reliably if you use a proper event store. The aim here would be to achieve the following architecture:

Consistent event flow
Why is this flow “consistent”? It’s because the command handling process always finishes with an append operation towards event store (unless the command fails). There’s no explicit Publish
or Produce
call that happens after the event is persisted. A proper event store should have a capability for to subscribe for getting all the new events when they appear in the store. As the Append
operation of the event store is transactional, such a subscription will get the event only once, if it is capable to acknowledge the event correctly back to the subscription.
Subscriptions have many purposes. Most often, you would use a subscription to project domain events to your query models (read models). You can also use subscriptions to transform and publish integration events (see Gateway).
Subscriptions can also be used for integration purposes in combination with Producers.
Implementation
Eventuous implement subscriptions for different kinds of infrastructure (transport). As each transport is different, the way to configure them can be different, as well as the way subscriptions work. Read carefully the guidelines for each transport to understand what transport to use for your use case.
2 - Checkpoints
When you subscribe to an event store, you need to decide what events you want to receive. A proper event store would allow you to subscribe to any event stream, or to a global stream (All stream), which contains all the events from the store, ordered by the time they were appended. Event-oriented brokers that support persistence as ordered event logs also support subscriptions, normally called consumers, as it’s the publish-subscribe broker terminology.
The subscription decides at what stream position it wants to start receiving events. If you want to process all the historical events, you’d subscribe from the beginning of the stream. If you want to only receive real-time events, you need to subscribe from now.
What’s the checkpoint?
As the subscription receives and processes events, it moves further along the stream it subscribed to. Every event the subscription receives and processes has a position in the subscribed stream. This position can be used as a checkpoint of the subscription. If the application that hosts the subscription eventually shuts down, you’d want the subscription to resubscribe from the position, which was processed last, plus one. That’s how you achieve exactly one event handling. Therefore, the subscription needs to take care about storing its checkpoint somewhere, so the last known position can be retrieved from the checkpoint store and used to resubscribe.
Some log-based brokers also use the term offset to describe the checkpoint concept.
Subscriptions, which are managed by the server, don’t require storing checkpoints on the client side. For example, EventStoreDB persistent subscriptions and Google PubSub subscriptions don’t require a client-side checkpoint store. Some subscriptions, like RabbitMQ subscription, don’t have this concept at all, as RabbitMQ doesn’t keep consumed messages, neither ACKed nor NACKed.
Checkpoint store
Eventuous provides an abstraction, which allows subscriptions to store checkpoints reliably. You can decide to store it in a file or in a database. You can also decide if you want to store a checkpoint after processing each event, or only flush it now and then. Periodical checkpoint flush decreases the pressure on the infrastructure behind the checkpoint store, but also requires you to make your subscription idempotent. It’s usually hard or impossible for integration since you can rarely check if you published an event to a broker or not. However, it can work for read model projections.
π± Keep the checkpoint safe
Abstraction
The checkpoint store interface is simple, it only has two functions:
interface ICheckpointStore {
ValueTask<Checkpoint> GetLastCheckpoint(
string checkpointId,
CancellationToken cancellationToken
);
ValueTask<Checkpoint> StoreCheckpoint(
Checkpoint checkpoint,
CancellationToken cancellationToken
);
}
The Checkpoint
record is a simple record, which aims to represent a stream position in any kind of event store:
record Checkpoint(string Id, ulong? Position);
Available stores
If a supported projection type in an Eventuous package for projections (Eventuous.Projections.*
) requires a checkpoint store, you can find its implementation in that package. For example, the Eventuous.Projections.MongoDB
package has a checkpoint store implementation for MongoDB.
If you register subscriptions in the DI container, you also need to register the checkpoint store:
builder.Services.AddSingleton<IMongoDatabase>(Mongo.ConfigureMongo());
builder.Services.AddCheckpointStore<MongoCheckpointStore>();
The MongoDB checkpoint store will create a collection called checkpoint
where it will keep one document per subscription.
In addition to that, Eventuous has two implementations in the core subscriptions package:
MeasuredCheckpointStore
: creates a trace for all the IO operations, wraps an existing storeNoOpCheckpointStore
: does nothing, used in Eventuous tests
The measured store is used by default if Eventuous diagnostics aren’t disabled, and you use the AddCheckpointStore
container registration extension.
Checkpoint commit handler
In addition to checkpoint store, Eventuous has a more advanced way to work with checkpoints. It doesn’t load or store checkpoints by itself, for that purpose it uses the provided checkpoint store. However, the commit handler is able to receive a stream of unordered checkpoints, reorder them, detect possible gaps, and only store the checkpoint that is the latest before the gap.
For subscriptions that support delayed consume (see Partitioning filter) and require a checkpoint store, you must use the commit handler. All such subscription types provided by Eventuous use the checkpoint commit handler.
Unless you create your own subscription with such requirements, you don’t need to know the internals of the commit handler. However, you would benefit to know the consequences of delayed event processing with supported subscriptions.
When events get partitioned by the filter, several consumer instances process events in parallel. As a result, each partition will get checkpoints with gaps. When partitioned consumers process events, they run at different speed. Each event inside DelayedConsumeContext
is explicitly acknowledged, and when it happens, the checkpoint gets to the commit handler queue. The commit handler then is able to accumulate checkpoints, detect gaps in the sequence, and only store the latest checkpoint in a gap-less sequence.

Gap in the commit handler queue
As we talk about gaps, you might face a situation when the commit handler has a list of uncommitted checkpoints with gaps, and the application stops. When this happens, some events were already processed, whilst checkpoints for those events remain in-flight. When the application restarts, it loads the checkpoint that points to some position in the stream that is earlier than positions of already processed events. Because of that, some events will be processed by event handlers again. Therefore, you need to make sure that your event handlers are idempotent, so when the same events are processed again, the result of the processing won’t create any undesired side effects.
3 - Subscription base
The base abstract class for subscriptions is the IMessageSubscription
interface, but all the available subscriptions are based on the EventSubscription
base class, which is a generic abstract class where its type parameter is the subscription options type. All the provided subscription options types inherit from the SubscriptionOptions
base class.
The SubscriptionOptions
base class has three properties:
SubscriptionId
: a unique identifier for the subscriptionThrowOnError
: a boolean indicating whether the subscription should throw an exception if an error occurs. When the subscription throws, it either NACKs the message, or stops the subscription depending on the implementation.EventSerializer
: an instance of theIEventSerializer
interface, which is used to serialize and deserialize events. If not provided, the default serializer is used.
Each provided subscription options type has more options, which depend on the subscription implementation details.
To host a subscription and manage its lifecycle, Eventuous has a hosted service called SubscriptionHostedService
. Each registered subscription gets its own hosted service, so that each subscription can be managed independently. When using Eventuous subscription registration extensions for the DI container, the hosted service is registered automatically.
You’d normally use the DI container to register subscriptions with all the necessary handlers (described below).
Event handlers
As mentioned on the Concept page, one subscription might serve multiple event handlers, such as projections. It is especially relevant to keep a group of projections in sync, so they don’t produce inconsistent read models.
Each subscription service gets a list of event handlers. An event handler must implement the IEventHandler
interface, which has two members:
public interface IEventHandler {
string DiagnosticName { get; }
ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext context);
}
The HandleEvent
function will be called by the subscription service for each event it receives. The event is already deserialized. The function also gets the event position in the stream. It might be used in projections to set some property of the read model. Using this property in queries will tell you if the projection is up-to-date.
π» Event handler failures
ThrowIfError
property of SubscriptionOptions
to false
.
The diagnostic name of the handler is used to distinguish logs in traces coming from a subscription per individual handler.
Normally Eventuous uses either the BaseEventHandler
abstract base class. For specific implementations, you’d either use a built-in handler provided by a projection (like MongoDB projection), or the EventHandler
abstract base class.
Consume context
The subscription will invoke all its event handlers at once for each event received. Instead of just getting the event, the handler will get an instance of the message context (IMessageConsumeContext
interface). The context contains the payload (event or other message) in its Message
property, which has the type object?
. It’s possible to handle each event type differently by using pattern matching:
public ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext ctx) {
return ctx.Message switch {
V1.RoomBooked => ...
_ => EventHandlingStatus.Ignored
};
}
However, it’s easier and more explicit to use pre-optimised base handlers. For read model projections you can use [Projections] handlers, described separately. For integration purposes you might want to use the Gateway. For more generic needs, Eventuous offers the EventHandler
base class. It allows specifying typed handlers for each of the event types that the handler processes:
public class MyHandler : EventHandler {
public MyHandler(SmsService smsService) {
On<RoomBooked>(async ctx => await smsService.Send($"Room {ctx.Message.RoomId} booked!"));
}
}
The typed handler will get an instance of MessageConsumeContext<T>
where T
is the message type. There, you can access the message using the Message
property without casting it.
A subscription will invoke a single consumer and give it an instance of the consume context. The consumer’s job is to handle the message by invoking all the registered handlers. By default, Eventuous uses the DefaultConsumer
, unless specified otherwise when registering the subscription. The IMessageConsumeContext
interface has functions to acknowledge (ACK), not acknowledge (NACK), or ignore the message. The default consumer ACKs the message when all the handlers processed the message without failures, and at least one handler didn’t ignore the message. It NACKs the message if any handler returned an error or crashed. Finally, it will ignore the message if all the handlers ignored it. How the message handling result is processed is unknown to the consumer as this behaviour is transport-specific. Each subscription type has its own way to process the message handling result.
Handling result
The handler needs to return the handling status. It’s preferred to return the error status EventHandlingStatus.Failure
instead of throwing an exception. When using the EventHandler
base class, if the event handling function throws an exception, the handler will return the failure status and not float the exception back to the subscription.
The status is important for diagnostic purposes. For example, you don’t want to trace event handlers for events that are ignored. That’s why when you don’t want to process the event, you need to return EventHandlingStatus.Ignored
. The EventHandler
base class will do it automatically if it gets an event that has no registered handler.
When the event is handled successfully (neither failed nor ignored), the handler needs to return EventHandlingStatus.Success
. Again, the EventHandler
base class will do it automatically if the registered handler doesn’t throw.
The subscription will acknowledge the event only if all of its handlers don’t fail. How subscriptions handle failures depends on the transport type.
Registration
As mentioned before, you’d normally register subscriptions using the DI extensions provided by Eventuous:
builder.Services.AddSubscription<StreamSubscription, StreamSubscriptionOptions>(
"PaymentIntegration",
builder => builder
.Configure(x => x.StreamName = PaymentsIntegrationHandler.Stream)
.AddEventHandler<PaymentsIntegrationHandler>()
);
The AddSubscription
extension needs two generic arguments: subscription implementation and its options. Every implementation has its own options as the options configure the particular subscription transport.
The first parameter for AddSubscription
is the subscription name. It must be unique within the application scope. Eventuous uses the subscription name to separate one subscription from another, along with their options and other things. The subscription name is also used in diagnostics as a tag.
Then, you need to specify how the subscription builds. There are two most used functions in the builder:
Configure
: allows to change the subscription optionsAddEventHandler<T>
: adds an event handler to the subscription
You can add several handlers to the subscription, and they will always “move” together throw the events stream or topic. If any of the handlers fail, the subscription might fail, so it’s “all or nothing” strategy.
Eventuous uses the consume pipe, where it’s possible to add filters (similar to MassTransit). You won’t need to think about it in most of the cases, but you can read mode in the Pipes and filters section.
Subscription drops
A subscription could drop for different reasons. For example, it fails to pass the keep alive ping to the server due to a transient network failure, or it gets overloaded.
The subscription service handles such drops and issues a resubscribe request, unless the application is shutting down, so the drop is deliberate.
This feature makes the subscription service resilient to transient failures, so it will recover from drops and continue processing events, when possible.
You can configure the subscription to ignore failures and continue by setting ThrowIfError
property of SubscriptionOptions
to false
.
4 - Consume pipe
Pipes and filters
When a subscription gets a message from the transport, it will create a consume context with the message itself, and some contextual information like message id, message type, subscription id, etc. Then, it will pass it over to the consume pipe. A pipe is a set of filters, which are executed sequentially, like middlewares. Each filter can do something with the context, and then calls the next filter. The process continues until there are no filters left. By that time, the message is considered consumed, and the pipe finishes.
Eventuous consume pipe can be customised by adding filters to it, but normally you’d use the default pipe, which can be visualized like this:

Default consume pipeline
Some subscriptions conditionally or unconditionally add filters to the pipe, when they detect the default pipe. For example, EventStoreDB catch-up subscription might add the concurrent filter and partitioning filter, and RabbitMQ subscription uses the concurrent filter.
Available filters
Eventuous has two interfaces for filters, but all the provided filters are based on ConsumeFilter
and ConsumeFilter<T>
base classes. There, <T>
is the context type, as some filters can only process certain context types. The validation of filter vs context type matching is done when the pipeline is constructed, so it won’t fail at runtime.
Out of the box, you can find the following filters:
ConsumerFilter
: (mandatory) the final filter in the pipe, which hosts the consumerMessageFilter
: (optional) allows filtering out messages based on context data (message type, meta, payload, etc.)TracingFilter
: (optional) traces the event handling process, added by default when diagnostics aren’t disabledConcurrentFilter
: (optional) allows splitting message processing from the subscriptionPartitioningFilter
: (optional) allows parallel message processing with partitions
Consumer filter
The consumer filter holds an instance of the message consumer. By default, Eventuous uses the DefaultConsumer
, and it doesn’t require any configuration. You can override that using the UseConsumer<T>()
function of the subscription builder when you register a subscription.
The default consumer Acks the message when all the handlers processed the message without failures, and at least one handler didn’t ignore the message. It Nacks the message if any handler returned an error or crashed. Finally, it will ignore the message if all the handlers ignored it. How the message handling result is processed is unknown to the consumer as this behaviour is transport-specific. Each subscription type has its own way to process the message handling result.
When building a custom consumer, you’d need to include similar event handling result logic into it.
The consumer filter is a mandatory filter that should be the final filter in the pipe. When you register a subscription, the consumer filter is added to the pipe by default.
Message filter
The message filter can be used to prevent some messages from going down the pipe by filtering those messages out. When constructed, it needs a single constructor argument, which is a filtering function. The function gets an instance of IMessageContext
, so it can use all of its available properties to filter messages. When the message is filtered out, the filter will mark it as ignored.
Example:
builder.Services.AddSubscription<StreamSubscription, StreamSubscriptionOptions>(
"nonFooSubscription",
cfg => cfg.AddConsumeFilterFirst(new MessageFilter(x => !x.MessageType.StartWith("foo")))
);
Such a subscription will ignore all the events that have the type starting with “foo”.
Tracing filter
The tracing filter gets added automatically when Eventuous diagnostics is enabled (or, not disabled, as it’s enabled by default). Read more about Eventuous tracing in the Diagnostics section.
Concurrent filter
When using the concurrent filter, the pipe gets separated in two parts: before the concurrent filter and after it. The filter itself creates a channel of a given size (100 by default), where it puts all the messages in order. Then, the pipeline returns to the subscription as if the message was already consumed. A parallel process fetches messages from the channel continuously, and sends them to the second part of the pipe where messages actually gets consumed.
Because of such a split, the result returned to the subscription doesn’t contain the actual handling result. Therefore, the concurrent filter can only be used in combination with DelayedAckConsumeContext
. It’s the responsibility of a subscription to create such a context type, so only some subscription types support using the concurrent filter. This context type has two additional functions to Ack (Acknowledge
function) or Nack (Fail
function) each message. The subscription provides specific implementations of those functions. For example, an EventStoreDB catch-up subscription would commit the checkpoint when the event is acknowledged. The filter calls these functions itself, based on the result returned by the second part of the pipe (delayed consume).

Pipe with concurrent filter
The concurrent filter is useful in combination with partitioning filter, but it can also be used for other purposes, like batched event processing.
Subscriptions that support delayed consume add this filter by default when necessary (based on the subscription configuration). Check the particular transport subscription implementation to learn more.
Partitioning filter
Sometimes the subscription workload gets too high for it to cope with the number of delivered messages, and process them fast enough one by one. In this case, the partitioning filter can be useful as it allows handling messages in parallel. Most of the time you’d want the messages to be delivered in order within some partition. For example, it is often important to process messages from a single aggregate in order. To support that, you can use the stream name (which contains the aggregate id) as the partition key.

Pipe with concurrent and partitioning filters
You can use two options for configuring the filter:
- Provide a function that calculates the partition hash
- Provide a function that returns the partition key
- Provide nothing
Using the second option is easier as you don’t need to calculate the hash, it’s enough to return something like stream id. In that case, the filter will use MurmurHash3
to calculate the hash for a given partition key.
The third option will use the default partition key function that uses the message stream as partition key.
All those options require the number of partitions provided as an argument.
As the partitioning filter processes messages in multiple threads, it cannot provide the global processing order. The order of processing is guaranteed within a partition. Due to that fact, it needs to be able to Ack or Nack messages individually, without maintaining global order. Therefore, it only works with DelayedAckConsumeContext
, and it’s often used with the concurrent filter for best performance. As you can imagine, partitioning can only be used for subscriptions that support delayed consume.
Subscriptions that support delayed consume add this filter by default when necessary (based on the subscription configuration) together with the concurrent filter. Check the particular transport subscription implementation to learn more.
5 - Event handlers
Event handlers are at the end of the subscription event processing pipeline. That’s the place where something actually happens with the event.
6 - Diagnostics
Out of the box, Eventuous provides metrics and traces for monitoring your subscriptions.
Also, study the Diagnostics section for more information.
Mind the gap
When using subscriptions for read model projections, you enter to the world of CQRS and asynchronous messaging. After completing a transaction in the domain model, one or more events are added to the event store. Then, a subscription picks it up and calls a projection. Although in most cases you’ll see only a few milliseconds delay between these operations, if your projection becomes slow (for example, it uses a slow database), the processing time will increase.
The easiest way to detect such situations is to observe the gap between the last event in the stream, which the subscription listens to, and the event, which is currently being processed. We call it the subscription gap.
βοΈ Alerting for the gap
The gap is measured by subscriptions that implement the IMeasuredSubscription
interface:
public interface IMeasuredSubscription {
GetSubscriptionGap GetMeasure();
}
Subscription gaps are collected as metrics. Read more about Eventuous subscription metrics below.
Subscription metrics
Eventuous collects two types of metrics for subscriptions:
- Subscription gap in time and count
- Subscription performance
For the subscription gap, Eventuous collects the time and count of the gap between the last event in the stream, which the subscription listens to, and the event, which is currently being processed.
Keep in mind that due to the limitation of EventStoreDB, the gap event count is not accurate when subscribing to $all
stream. It’s because the $all
stream position is the commit position of the event in the global log, not the position of the event in the stream. Unfortunately, as per today, it is impossible to translate the commit position gap to the number of events.
Here is an example of the subscription gap metric exported to Prometheus:
# HELP eventuous_subscription_gap_count_events Gap between the last processed event and the stream tail
# TYPE eventuous_subscription_gap_count_events gauge
eventuous_subscription_gap_count_events{subscription_id="BookingsProjections"} 1098 1649081749067
eventuous_subscription_gap_count_events{subscription_id="PaymentIntegration"} 0 1649081749067
In addition to the subscription gap metric, Eventuous also collects metrics for the subscription performance as a histogram. The metric name is eventuous_subscription_duration
. It measures the duration of handling a single event. As handling different event types might trigger different kinds of processing, the histogram is tagged with the message type (message_type
tag).
All the subscription metrics are tagged by the subscription id (subscription_id
tag). It allows you to plot a graph of the subscription metrics for different subscriptions.
Here is an example of the subscription duration metrics exported as a Prometheus metric for the subscription with id BookingsProjections
:
# HELP eventuous_subscription_duration_ms Processing duration, milliseconds
# TYPE eventuous_subscription_duration_ms histogram
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="0"} 0 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="5"} 0 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="10"} 0 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="25"} 0 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="50"} 0 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="75"} 1 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="100"} 1 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="250"} 1 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="500"} 1 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="1000"} 1 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="+Inf"} 1 1649081749067
eventuous_subscription_duration_ms_sum{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections"} 50.429 1649081749067
eventuous_subscription_duration_ms_count{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections"} 1 1649081749067
Subscription tracing
Each part of the subscription event processing pipeline is traced. Each event is wrapped into a consume context by the subscription, then passed to the consumer. The consumer then calls all the event handlers.
As all Eventuous producers and event store(s) are instrumented, they propagate the trace context to the subscription. The subscription then propagates the trace context to the consumer, and the consumer then propagates the trace context to the event handlers, so all these operations are traced.
Here’s an example of the trace context propagated to the subscription visualised by Zipkin:

Subscription trace
As you can see, event handlers for the BookingProjection
subscription are projections, and trigger updates in MongoDB, which are also traced because the sample application is instrumented by MongoDB tracing library.
As mentioned before, Eventuous ensures that the tracing context is propagated also remotely, so you get the full trace even if the subscription is running in a different process.
Health checks
The subscription service class also implements the IHealthCheck
interface. Therefore, it can be used for ASP.NET Core health monitoring.
When using Eventuous dependency injection extensions, each registered subscription will also register its health checks.
However, you need to register the global Eventuous subscriptions health check by adding one line to the ASP.NET Core health checks registration:
builder.Services
.AddHealthChecks()
.AddSubscriptionsHealthCheck("subscriptions", HealthStatus.Unhealthy, new []{"tag"});
7 - EventStoreDB
EventStoreDB natively supports real-time subscriptions, which will also deliver historical events when you don’t specify the starting position. It makes the product a perfect candidate to support event-sourced systems, as you won’t need to invent things like CDC- or pull-based subscriptions.
Eventuous supports all subscription kinds provided by EventStoreDB:
7.1 - All stream subscription
Subscribing to all events in the store is extremely valuable. This way, you can build comprehensive read models, which consolidate information from multiple aggregates. You can also use such a subscription for integration purposes, to convert and publish integration events.
All stream subscription
For registering a subscription to $all
stream, use `AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions> as shown below:
builder.Services.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
"BookingsProjections",
builder => builder
.AddEventHandler<BookingStateProjection>()
.AddEventHandler<MyBookingsProjection>()
);
Subscription options for AllStreamSubscription
are defined in AllStreamSubscriptionOptions
class.
Option | Description |
---|---|
SubscriptionId |
Unique subscription identifier. |
ThrowOnError |
If true , an exception will be thrown if the subscription fails, otherwise the subscription continues to run. Default is false . |
EventSerilizer |
Serializer for events, if null the default serializer will be used. |
MetadataSerilizer |
Serializer for metadata, if null the default serializer will be used. |
Credentials |
EventStoreDB user credentials. If not specified, the credentials specified in the EventStoreClientSettings will be used. |
ResolveLinkTos |
If true , the subscription will automatically resolve the event link to the event that caused the event. Default is false . |
ConcurrencyLimit |
Maximum number of events to be processed in parallel. Default is 1 . |
EventFilter |
Filter for events, if null , the subscription will filter out system events. |
CheckpointInterval |
Interval between checkpoints when event filter is used. Default is 10 events. This interval tells the subscription to report the current checkpoint when the subscription doesn’t receive any events for this interval because all the events were filtered out. |
Checkpoint store
AllStreamSubscription
is a catch-up subscription that is fully managed on the client side (your application), so you need to manage the checkpoint. You can register the checkpoint store using AddCheckpointStore<T>
, but in that case it will be used for all subscriptions in the application. It might be that your app has multiple subscriptions, and you want to use different checkpoint stores for each of them. In that case, you can register the checkpoint store for each subscription using UseCheckpointStore<T>
extension of the subscription builder
builder.Services.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
"BookingsProjections",
builder => builder
.UseCheckpointStore<MongoCheckpointStore>()
.AddEventHandler<BookingStateProjection>()
.AddEventHandler<MyBookingsProjection>()
);
Concurrent event handlers
As any catch-up subscription, subscription to $all
runs sequentially, processing events one by one. In many cases that’s enough, but sometimes you might want to speed it up, and allow parallel processing of events. To do that, you need to set the ConcurrencyLimit
subscription option property to a value that is equal to the number of events being processed in parallel. In addition, you need to tell the subscription how to distribute events into partitions. That is needed as you rarely can tolerate processing events in a completely random order, so you can partition events using some key, and distribute them to different partitions.
Here is an example of using AllStreamSubscription
with ConcurrencyLimit
and partitioning by stream name:
var partitionCount = 2;
builder.Services.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
"BookingsProjections",
builder => builder
.Configure(cfg => cfg.ConcurrencyLimit = partitionCount)
.AddEventHandler<BookingStateProjection>()
.AddEventHandler<MyBookingsProjection>()
.WithPartitioningByStream(partitionCount)
);
You can build your own partitioning strategy by implementing the GetPartitionKey
function:
public delegate string GetPartitionKey(IMessageConsumeContext context);
and then using it in the WithPartitioning
extension:
builder => builder
.Configure(cfg => cfg.ConcurrencyLimit = partitionCount)
... // add handlers
.WithPartitioning(partitionCount, MyPartitionFunction)
7.2 - Stream subscription
Although subscribing to $all
using AllStreamSubscription
is the most efficient way to create, for example, read models using all events in the event store, it is also possible to subscribe to a single stream.
For example, you can subscribe to the $ce-Booking
stream to project all events for all the aggregates of type Booking
, and create some representation of the state of the aggregate in a queryable store.
Another scenario is to subscribe to an integration stream, when you use EventStoreDB as a backend for a messaging system.
For that purpose you can use the StreamSubscription
class.
Single stream subscription
For registering a subscription to a single stream, use `AddSubscription<StreamSubscription, StreamSubscriptionOptions> as shown below:
builder.Services.AddSubscription<StreamSubscription, StreamSubscriptionOptions>(
"BookingsStateProjections",
builder => builder
.Configure(cfg => {
cfg.StreamName = "$ce-Booking";
cfg.ResolveLinkTos = true;
)
.AddEventHandler<BookingStateProjection>()
);
Subscription options for StreamSubscription
are defined in StreamSubscriptionOptions
class.
Option | Description |
---|---|
SubscriptionId |
Unique subscription identifier. |
StreamName |
Name of the stream to subscribe to. |
ThrowOnError |
If true , an exception will be thrown if the subscription fails, otherwise the subscription continues to run. Default is false . |
EventSerilizer |
Serializer for events, if null the default serializer will be used. |
MetadataSerilizer |
Serializer for metadata, if null the default serializer will be used. |
Credentials |
EventStoreDB user credentials. If not specified, the credentials specified in the EventStoreClientSettings will be used. |
ResolveLinkTos |
If true , the subscription will automatically resolve the event link to the event that caused the event. Default is false . |
IgnoreSystemEvents |
Set to true to ignore system events. Default is true . |
ConcurrencyLimit |
Maximum number of events to be processed in parallel. Default is 1 . |
ResolveLinkTos
option to true
to resolve the link to the original event that is linked to the link event.
Checkpoint store
StreamSubscription
is a catch-up subscription that is fully managed on the client side (your application), so you need to manage the checkpoint. You can register the checkpoint store using AddCheckpointStore<T>
, but in that case it will be used for all subscriptions in the application. It might be that your app has multiple subscriptions, and you want to use different checkpoint stores for each of them. In that case, you can register the checkpoint store for each subscription using UseCheckpointStore<T>
extension of the subscription builder
builder.Services.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
"BookingsProjections",
builder => builder
.Configure(cfg => {
cfg.StreamName = "$ce-Booking";
cfg.ResolveLinkTos = true;
)
.UseCheckpointStore<MongoCheckpointStore>()
.AddEventHandler<BookingStateProjection>()
);
Concurrent event handlers
The single stream subscription is identical to the $all
stream subscription in terms of the event handlers execution. By default, all the events are processed one-by-one, but you can use the ConcurrencyLimit
option to process multiple events in parallel.
You can use the stream name partitioner when subscribing to a category ($ce
) stream. In that case events for a single aggregate instance will always be processed sequentially, but events for different aggregate instances can be processed in parallel.
Read more about concurrent event processing on the all stream subscription page.
7.3 - Persistent subscription
βοΈ Ordered events
8 - Google PubSub
WIP
9 - RabbitMQ
[WIP]