Subscriptions

Real-time event processing using subscriptions

1 - Real-time subscription

The concept of real-time subscriptions

Real-time subscriptions are essential when building a successful event-sourced system. They support the following sequence of operations:

  • Domain model emits new event
  • The event is stored to the event store
  • The command is now handled, and the transaction is complete
  • All the read models get the new event from the store and project it if necessary

Most of the time, subscriptions are used for two purposes:

  1. Deliver events to reporting models
  2. Emit integration events

Subscriptions can subscribe from any position of a stream, but normally you’d subscribe from the beginning of the stream, which allows you to process all the historical events. For integration purposes, however, you would usually subscribe from now, as emitting historical events to other systems might produce undesired side effects.

One subscription can serve multiple event handlers. Normally, you would identify a group of event handlers that need to be triggered together, and group them within one subscription. This way you avoid situations when, for example, two read models get updated at different speeds and show confusing information if those read models are shown on the same screen.

Subscriptions need to maintain their own checkpoints, so when the service that hosts a subscription restarts, it will start receiving events from the last known position in the stream.

Most often, you’d want to subscribe to the global event stream, so you can build read models, which compose information from different aggregates. Eventuous offers the All stream subscription for this use case. In some cases you’d need to subscribe to a regular stream using the stream subscription.

In Eventuous, subscriptions are specific to event store implementation. In addition, Eventuous has subscriptions to message brokers like Google PubSub and RabbitMQ for integration purposes.

The wrong way

One of the common mistakes people make when building an event-sourced application is using an event store which is not capable of handling realtime subscriptions. It forces developers to engage some sort of message bus to deliver new events to subscribers. There are quite a few issues with that approach, but the most obvious one is a two-phase commit.

When using two distinct pieces of infrastructure in one transaction, you risk one of those operations failing. Let’s use the following example code, which is very common:

await _repository.Save(newEvents);
await _bus.Publish(newEvents);

If the second operation fails, the command side of the application would remain consistent. However, any read models that project those events will not be updated. So, essentially, the reporting model will become inconsistent with the transactional model. The worst part is that the reporting model will never recover from the failure.

As mentioned, there are multiple issues with using a message bus as transport to deliver events to reporting models, but we won’t be covering them on this page.

The easiest way to solve the issue is to use a database which supports realtime subscriptions to event streams out of the box. That’s why we use EventStoreDB as the primary event store implementation.

The right way

It is not hard to avoid two-phase commits and publish domain events reliably if you use a proper event store. The aim here would be to achieve the following architecture:

Consistent event flow

Why is this flow “consistent”? It’s because the command handling process always finishes with an append operation to the event store (unless the command fails). There’s no explicit Publish or Produce call that happens after the event is persisted. A proper event store should let you subscribe to receive all the new events as they appear in the store. As the Append operation of the event store is transactional, such a subscription will get the event only once, provided the event is acknowledged correctly back to the subscription.

Subscriptions have many purposes. Most often, you would use a subscription to project domain events to your query models (read models). You can also use subscriptions to transform and publish integration events (see Gateway).

Subscriptions can also be used for integration purposes in combination with Producers.

Implementation

Eventuous implements subscriptions for different kinds of infrastructure (transport). As each transport is different, the way to configure them can be different, as well as the way subscriptions work. Carefully read the guidelines for each transport to understand which transport to use for your use case.

2 - Checkpoints

What’s a checkpoint and why you need to store it

When you subscribe to an event store, you need to decide what events you want to receive. A proper event store would allow you to subscribe to any event stream, or to a global stream (All stream), which contains all the events from the store, ordered by the time they were appended. Event-oriented brokers that support persistence as ordered event logs also support subscriptions, normally called consumers, as it’s the publish-subscribe broker terminology.

The subscription decides at what stream position it wants to start receiving events. If you want to process all the historical events, you’d subscribe from the beginning of the stream. If you want to only receive real-time events, you need to subscribe from now.

What’s the checkpoint?

As the subscription receives and processes events, it moves further along the stream it subscribed to. Every event the subscription receives and processes has a position in the subscribed stream. This position can be used as a checkpoint of the subscription. If the application that hosts the subscription shuts down, you’d want the subscription to resubscribe from the last processed position plus one. That’s how you achieve exactly-once event handling. Therefore, the subscription needs to take care of storing its checkpoint somewhere, so the last known position can be retrieved from the checkpoint store and used to resubscribe.
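The resubscribe rule can be sketched in a few lines. This is a minimal illustration, not Eventuous code: the array stands in for a stream, and the Subscription class and its Run method are hypothetical names. The sketch assumes the checkpoint is stored after every processed event.

```csharp
using System;
using System.Collections.Generic;

var stream = new[] { "RoomBooked", "PaymentRecorded", "RoomBooked", "BookingCancelled" };
var handled = new List<string>();

// First run: no checkpoint yet, so start from the beginning and stop after two events.
ulong? checkpoint = Subscription.Run(stream, null, 2, handled);
// After a restart: resume from the stored position plus one.
checkpoint = Subscription.Run(stream, checkpoint, int.MaxValue, handled);
Console.WriteLine($"handled {handled.Count} events, checkpoint {checkpoint}");

// Hypothetical stand-in for a subscription; not part of Eventuous.
static class Subscription {
    // Processes up to maxEvents events and returns the position of the last processed event.
    public static ulong? Run(string[] stream, ulong? checkpoint, int maxEvents, List<string> handled) {
        var start = checkpoint is { } last ? last + 1 : 0UL;
        for (var position = start; position < (ulong)stream.Length && maxEvents-- > 0; position++) {
            handled.Add(stream[position]);
            checkpoint = position; // the checkpoint moves after each processed event
        }
        return checkpoint;
    }
}
```

Because the second run starts at the stored position plus one, no event is handled twice and none is skipped.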

Some log-based brokers also use the term offset to describe the checkpoint concept.

Subscriptions, which are managed by the server, don’t require storing checkpoints on the client side. For example, EventStoreDB persistent subscriptions and Google PubSub subscriptions don’t require a client-side checkpoint store. Some subscriptions, like RabbitMQ subscription, don’t have this concept at all, as RabbitMQ doesn’t keep consumed messages, neither ACKed nor NACKed.

Checkpoint store

Eventuous provides an abstraction that allows subscriptions to store checkpoints reliably. You can decide to store the checkpoint in a file or in a database. You can also decide if you want to store a checkpoint after processing each event, or only flush it now and then. Periodic checkpoint flushing decreases the pressure on the infrastructure behind the checkpoint store, but also requires you to make your subscription idempotent, because events processed after the last flush will be delivered again after a restart. Idempotency is usually hard or impossible to achieve for integration, since you can rarely check if you published an event to a broker or not. However, it can work for read model projections.

Abstraction

The checkpoint store interface is simple and has only two functions:

interface ICheckpointStore {
    ValueTask<Checkpoint> GetLastCheckpoint(
        string checkpointId,
        CancellationToken cancellationToken
    );

    ValueTask<Checkpoint> StoreCheckpoint(
        Checkpoint checkpoint,
        CancellationToken cancellationToken
    );
}

The Checkpoint record is a simple record, which aims to represent a stream position in any kind of event store:

record Checkpoint(string Id, ulong? Position);
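To make the contract more concrete, here is a minimal in-memory sketch of a checkpoint store. It redeclares ICheckpointStore and Checkpoint as shown above so that the snippet is self-contained. InMemoryCheckpointStore is a hypothetical name and not part of Eventuous, and returning a checkpoint with a null position for an unknown id is an assumption made here to mean "start from the beginning".

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Usage: store a position, then read it back as a restarted subscription would.
var store = new InMemoryCheckpointStore();
await store.StoreCheckpoint(new Checkpoint("BookingsProjections", 42UL), CancellationToken.None);
var lastKnown = await store.GetLastCheckpoint("BookingsProjections", CancellationToken.None);
Console.WriteLine($"Last stored position: {lastKnown.Position}");

// Redeclared from the text above to keep the sketch self-contained.
record Checkpoint(string Id, ulong? Position);

interface ICheckpointStore {
    ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken);
    ValueTask<Checkpoint> StoreCheckpoint(Checkpoint checkpoint, CancellationToken cancellationToken);
}

// Hypothetical in-memory implementation, useful only for illustration or tests.
class InMemoryCheckpointStore : ICheckpointStore {
    readonly ConcurrentDictionary<string, Checkpoint> _checkpoints = new();

    public ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) {
        // Assumption: an unknown id yields a checkpoint without a position.
        var checkpoint = _checkpoints.TryGetValue(checkpointId, out var found)
            ? found
            : new Checkpoint(checkpointId, null);
        return new ValueTask<Checkpoint>(checkpoint);
    }

    public ValueTask<Checkpoint> StoreCheckpoint(Checkpoint checkpoint, CancellationToken cancellationToken) {
        // One entry per checkpoint id, overwritten on every store.
        _checkpoints[checkpoint.Id] = checkpoint;
        return new ValueTask<Checkpoint>(checkpoint);
    }
}
```

A store like this loses its state on restart, so it does not give the recovery behaviour described above; a persistent store is needed for that.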

Available stores

When an Eventuous package provides projections that require a checkpoint store, you can find the checkpoint store implementation in that same package. For example, the Eventuous.MongoDB package has a checkpoint store implementation for MongoDB.

If you register subscriptions in the DI container, you also need to register the checkpoint store:

builder.Services.AddSingleton<IMongoDatabase>(Mongo.ConfigureMongo());
builder.Services.AddCheckpointStore<MongoCheckpointStore>();

In case you have multiple subscriptions in one service, and you project to different databases (for example, MongoDB and PostgreSQL), you need to specify the checkpoint store for each subscription. In this case, you don’t need to register the checkpoint store globally in the DI container, but use the UseCheckpointStore method when building your subscription:

services.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
    "BookingsProjections",
    builder => builder
        .Configure(cfg => cfg.ConcurrencyLimit = 2)
        .UseCheckpointStore<MongoCheckpointStore>()
        .AddEventHandler<BookingStateProjection>()
        .AddEventHandler<MyBookingsProjection>()
        .WithPartitioningByStream(2)
);

MongoDB

The MongoDB checkpoint store will create a collection called checkpoint where it will keep one document per subscription.

Each checkpoint document contains the checkpoint id, which is the subscription id. Therefore, you only get one checkpoint collection per database.

Elasticsearch

The Elasticsearch checkpoint store will create and use the checkpoint index, and the document id there would be the subscription id.

PostgreSQL

The Postgres checkpoint store will create and use the checkpoints table, and the row id there would be the subscription id. Here is the script used to create that table:

create table if not exists __schema__.checkpoints (
    id varchar primary key, 
    position bigint null 
);

Other stores

In addition to that, Eventuous has two implementations in the core subscriptions package:

  • MeasuredCheckpointStore: creates a trace for all the IO operations, wraps an existing store
  • NoOpCheckpointStore: does nothing, used in Eventuous tests

The measured store is used by default if Eventuous diagnostics aren’t disabled, and you use the AddCheckpointStore container registration extension.

Checkpoint commit handler

In addition to the checkpoint store, Eventuous has a more advanced way to work with checkpoints: the checkpoint commit handler. The commit handler doesn’t load or store checkpoints by itself; it uses the provided checkpoint store for that purpose. However, it can receive a stream of unordered checkpoints, reorder them, detect possible gaps, and only store the checkpoint that is the latest before the gap.

For subscriptions that support delayed consume (see Partitioning filter) and require a checkpoint store, you must use the commit handler. All such subscription types provided by Eventuous use the checkpoint commit handler.

Unless you create your own subscription with such requirements, you don’t need to know the internals of the commit handler. However, you would benefit from knowing the consequences of delayed event processing with supported subscriptions.

When events get partitioned by the filter, several consumer instances process events in parallel. As a result, each partition will get checkpoints with gaps. When partitioned consumers process events, they run at different speeds. Each event inside DelayedConsumeContext is explicitly acknowledged, and when that happens, the checkpoint gets to the commit handler queue. The commit handler is then able to accumulate checkpoints, detect gaps in the sequence, and only store the latest checkpoint in a gap-less sequence.
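The gap detection can be sketched as follows. This is an illustration of the idea only, not the Eventuous commit handler: Ack and GaplessCheckpointTracker are hypothetical names, and the sketch assumes every delivered event carries a contiguous delivery sequence number that starts at zero.

```csharp
using System;
using System.Collections.Generic;

var tracker = new GaplessCheckpointTracker();
// Acknowledgements arrive out of order because partitions run at different speeds.
var acks = new[] { new Ack(0, 100), new Ack(2, 102), new Ack(3, 103), new Ack(1, 101) };
foreach (var ack in acks) {
    var commit = tracker.Acknowledge(ack);
    Console.WriteLine($"ack sequence {ack.Sequence}: commit {commit?.ToString() ?? "nothing"}");
}

// Hypothetical shape: Sequence is the delivery order, Position is the stream position.
record struct Ack(ulong Sequence, ulong Position);

class GaplessCheckpointTracker {
    readonly Dictionary<ulong, ulong> _pending = new(); // sequence -> position
    ulong _nextSequence;                                // first sequence that is not yet committed

    // Returns the position that is now safe to store, or null while a gap blocks progress.
    public ulong? Acknowledge(Ack ack) {
        _pending[ack.Sequence] = ack.Position;
        ulong? safe = null;
        // Walk forward while the sequence has no gap.
        while (_pending.TryGetValue(_nextSequence, out var position)) {
            _pending.Remove(_nextSequence);
            safe = position;
            _nextSequence++;
        }
        return safe;
    }
}
```

In this run, positions 102 and 103 are held back until the acknowledgement for sequence 1 arrives, and only then is position 103 reported as safe to store.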

Gap in the commit handler queue

As we talk about gaps, you might face a situation when the commit handler has a list of uncommitted checkpoints with gaps, and the application stops. When this happens, some events were already processed, whilst checkpoints for those events remain in-flight. When the application restarts, it loads the checkpoint that points to some position in the stream that is earlier than positions of already processed events. Because of that, some events will be processed by event handlers again. Therefore, you need to make sure that your event handlers are idempotent, so when the same events are processed again, the result of the processing won’t create any undesired side effects.
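One common way to make a projection idempotent is to keep the last projected position next to the data it produced, and to skip events at or before that position. The sketch below assumes that events of one stream are processed in order (as within a partition) and that positions grow within a stream. BookingPaymentsProjection and its members are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;

var projection = new BookingPaymentsProjection();
projection.Apply("Booking-1", position: 10, amount: 50m);
projection.Apply("Booking-1", position: 11, amount: 25m);
// After a restart the loaded checkpoint is earlier than position 11, so the event arrives again.
var appliedAgain = projection.Apply("Booking-1", position: 11, amount: 25m);
var total = projection.TotalPaid("Booking-1");
Console.WriteLine($"applied again: {appliedAgain}, total paid: {total}");

class BookingPaymentsProjection {
    // Hypothetical read model: one document per stream with the last projected position.
    class Document {
        public decimal Paid;
        public ulong Position;
    }

    readonly Dictionary<string, Document> _documents = new();

    // Returns false when the event was already projected and is therefore skipped.
    public bool Apply(string stream, ulong position, decimal amount) {
        if (_documents.TryGetValue(stream, out var doc)) {
            if (position <= doc.Position) return false;
        } else {
            doc = new Document();
            _documents[stream] = doc;
        }
        doc.Paid += amount;
        doc.Position = position;
        return true;
    }

    public decimal TotalPaid(string stream) => _documents[stream].Paid;
}
```

The redelivered event does not change the total. In a real database the position check and the update would need to happen atomically, which this in-memory sketch does not address.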

3 - Subscription base

How Eventuous subscriptions work

The base abstraction for subscriptions is the IMessageSubscription interface, but all the available subscriptions are based on the EventSubscription base class, a generic abstract class whose type parameter is the subscription options type. All the provided subscription options types inherit from the SubscriptionOptions base class.

The SubscriptionOptions base class has three properties:

  • SubscriptionId: a unique identifier for the subscription
  • ThrowOnError: a boolean indicating whether the subscription should throw an exception if an error occurs. When the subscription throws, it either NACKs the message, or stops the subscription depending on the implementation.
  • EventSerializer: an instance of the IEventSerializer interface, which is used to serialize and deserialize events. If not provided, the default serializer is used.

Each provided subscription options type has more options, which depend on the subscription implementation details.

To host a subscription and manage its lifecycle, Eventuous has a hosted service called SubscriptionHostedService. Each registered subscription gets its own hosted service, so that each subscription can be managed independently. When using Eventuous subscription registration extensions for the DI container, the hosted service is registered automatically.

You’d normally use the DI container to register subscriptions with all the necessary handlers (described below).

Event handlers

As mentioned on the Concept page, one subscription might serve multiple event handlers, such as projections. It is especially relevant to keep a group of projections in sync, so they don’t produce inconsistent read models.

Each subscription service gets a list of event handlers. An event handler must implement the IEventHandler interface, which has two members:

public interface IEventHandler {
    string DiagnosticName { get; }
    ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext context);
}

The HandleEvent function will be called by the subscription service for each event it receives. The event is already deserialized. The function also gets the event position in the stream. It might be used in projections to set some property of the read model. Using this property in queries will tell you if the projection is up-to-date.

The diagnostic name of the handler is used to distinguish logs in traces coming from a subscription per individual handler.

Normally, Eventuous handlers are based on the BaseEventHandler abstract base class. For specific implementations, you’d either use a built-in handler provided by a projection (like MongoDB projection), or the EventHandler abstract base class.

Consume context

The subscription will invoke all its event handlers at once for each event received. Instead of just getting the event, the handler will get an instance of the message context (IMessageConsumeContext interface). The context contains the payload (event or other message) in its Message property, which has the type object?. It’s possible to handle each event type differently by using pattern matching:

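public ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext ctx) {
    // Dispatch on the runtime type of the message payload
    return ctx.Message switch {
        V1.RoomBooked => ...,
        // A bare enum value doesn't convert to ValueTask<T>, so wrap it
        _ => ValueTask.FromResult(EventHandlingStatus.Ignored)
    };
}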

However, it’s easier and more explicit to use pre-optimised base handlers. For read model projections you can use Projections handlers, described separately. For integration purposes you might want to use the Gateway. For more generic needs, Eventuous offers the EventHandler base class. It allows specifying typed handlers for each of the event types that the handler processes:

public class MyHandler : EventHandler {
    public MyHandler(SmsService smsService) {
        On<RoomBooked>(async ctx => await smsService.Send($"Room {ctx.Message.RoomId} booked!"));
    } 
}

The typed handler will get an instance of MessageConsumeContext<T> where T is the message type. There, you can access the message using the Message property without casting it.

A subscription will invoke a single consumer and give it an instance of the consume context. The consumer’s job is to handle the message by invoking all the registered handlers. By default, Eventuous uses the DefaultConsumer, unless specified otherwise when registering the subscription. The IMessageConsumeContext interface has functions to acknowledge (ACK), not acknowledge (NACK), or ignore the message. The default consumer ACKs the message when all the handlers processed the message without failures, and at least one handler didn’t ignore the message. It NACKs the message if any handler returned an error or crashed. Finally, it will ignore the message if all the handlers ignored it. How the message handling result is processed is unknown to the consumer as this behaviour is transport-specific. Each subscription type has its own way to process the message handling result.

Handling result

The handler needs to return the handling status. It’s preferred to return the error status EventHandlingStatus.Failure instead of throwing an exception. When using the EventHandler base class, if the event handling function throws an exception, the handler will return the failure status and not propagate the exception back to the subscription.

The status is important for diagnostic purposes. For example, you don’t want to trace event handlers for events that are ignored. That’s why when you don’t want to process the event, you need to return EventHandlingStatus.Ignored. The EventHandler base class will do it automatically if it gets an event that has no registered handler.

When the event is handled successfully (neither failed nor ignored), the handler needs to return EventHandlingStatus.Success. Again, the EventHandler base class will do it automatically if the registered handler doesn’t throw.

The subscription will acknowledge the event only if none of its handlers fail. How subscriptions handle failures depends on the transport type.

Registration

As mentioned before, you’d normally register subscriptions using the DI extensions provided by Eventuous. This example uses the EventStoreDB subscription.

builder.Services.AddSubscription<StreamSubscription, StreamSubscriptionOptions>(
    "PaymentIntegration",
    builder => builder
        .Configure(x => x.StreamName = PaymentsIntegrationHandler.Stream)
        .AddEventHandler<PaymentsIntegrationHandler>()
);

The AddSubscription extension needs two generic arguments: subscription implementation and its options. Every implementation has its own options as the options configure the particular subscription transport.

The first parameter for AddSubscription is the subscription name. It must be unique within the application scope. Eventuous uses the subscription name to separate one subscription from another, along with their options and other things. The subscription name is also used in diagnostics as a tag.

Then, you need to specify how the subscription is built. The two most used builder functions are:

  • Configure: allows changing the subscription options
  • AddEventHandler<T>: adds an event handler to the subscription

You can add several handlers to the subscription, and they will always “move” together through the event stream or topic. If any of the handlers fail, the subscription might fail, so it’s an “all or nothing” strategy.

Eventuous uses the consume pipe, where it’s possible to add filters (similar to MassTransit). You won’t need to think about it in most cases, but you can read more in the Pipes and filters section.

Subscription drops

A subscription could drop for different reasons. For example, it fails to pass the keep alive ping to the server due to a transient network failure, or it gets overloaded.

The subscription service handles such drops and issues a resubscribe request, unless the application is shutting down and the drop is therefore deliberate.

This feature makes the subscription service resilient to transient failures, so it will recover from drops and continue processing events, when possible.

You can configure the subscription to ignore failures and continue by setting the ThrowOnError property of SubscriptionOptions to false.

4 - Consume pipe

Subscription consume pipes and filters

Pipes and filters

When a subscription gets a message from the transport, it will create a consume context with the message itself, and some contextual information like message id, message type, subscription id, etc. Then, it will pass it over to the consume pipe. A pipe is a set of filters, which are executed sequentially, like middlewares. Each filter can do something with the context, and then calls the next filter. The process continues until there are no filters left. By that time, the message is considered consumed, and the pipe finishes.
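The middleware-style chaining can be sketched as below. This is a generic illustration of pipes and filters, not the Eventuous pipe: Context, Next, Filter and Pipe are hypothetical names, and the three filters only mimic tracing, message filtering and consuming.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();
var pipe = new Pipe()
    // Wraps everything that follows, like a tracing filter would.
    .Add(async (ctx, next) => { log.Add("tracing:start"); await next(ctx); log.Add("tracing:end"); })
    // Stops the pipe early for messages that should be filtered out.
    .Add((ctx, next) => ctx.MessageType.StartsWith("foo") ? ValueTask.CompletedTask : next(ctx))
    // Final filter: this is where the message gets consumed.
    .Add((ctx, _) => { log.Add($"consumed:{ctx.MessageType}"); return ValueTask.CompletedTask; });

await pipe.Send(new Context("RoomBooked", new object()));
Console.WriteLine(string.Join(", ", log));

// Hypothetical consume context with just enough data for the sketch.
record Context(string MessageType, object Message);

delegate ValueTask Next(Context ctx);
delegate ValueTask Filter(Context ctx, Next next);

class Pipe {
    readonly List<Filter> _filters = new();

    public Pipe Add(Filter filter) {
        _filters.Add(filter);
        return this;
    }

    public ValueTask Send(Context ctx) {
        // Build the chain from the last filter to the first, so each filter receives its successor.
        Next next = _ => ValueTask.CompletedTask;
        for (var i = _filters.Count - 1; i >= 0; i--) {
            var filter = _filters[i];
            var inner = next;
            next = c => filter(c, inner);
        }
        return next(ctx);
    }
}
```

Each filter decides whether to call the next one, which is what lets a filter short-circuit the pipe or wrap the rest of it.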

The Eventuous consume pipe can be customised by adding filters to it, but normally you’d use the default pipe, which can be visualised like this:

Default consume pipeline

Some subscriptions conditionally or unconditionally add filters to the pipe, when they detect the default pipe. For example, EventStoreDB catch-up subscription might add the concurrent filter and partitioning filter, and RabbitMQ subscription uses the concurrent filter.

Available filters

Eventuous has two interfaces for filters, but all the provided filters are based on ConsumeFilter and ConsumeFilter<T> base classes. There, <T> is the context type, as some filters can only process certain context types. The validation of filter vs context type matching is done when the pipeline is constructed, so it won’t fail at runtime.

Out of the box, you can find the following filters:

  • ConsumerFilter: (mandatory) the final filter in the pipe, which hosts the consumer
  • MessageFilter: (optional) allows filtering out messages based on context data (message type, meta, payload, etc.)
  • TracingFilter: (optional) traces the event handling process, added by default when diagnostics aren’t disabled
  • ConcurrentFilter: (optional) allows splitting message processing from the subscription
  • PartitioningFilter: (optional) allows parallel message processing with partitions

Consumer filter

The consumer filter holds an instance of the message consumer. By default, Eventuous uses the DefaultConsumer, and it doesn’t require any configuration. You can override that using the UseConsumer<T>() function of the subscription builder when you register a subscription.

The default consumer Acks the message when all the handlers processed the message without failures, and at least one handler didn’t ignore the message. It Nacks the message if any handler returned an error or crashed. Finally, it will ignore the message if all the handlers ignored it. How the message handling result is processed is unknown to the consumer as this behaviour is transport-specific. Each subscription type has its own way to process the message handling result.

When building a custom consumer, you’d need to include similar event handling result logic into it.
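The result logic described above can be sketched as a small function. The enums and the ResultAggregator class are hypothetical and defined here only for the example; they are not the Eventuous types, and a real consumer would call the acknowledge, fail, or ignore functions of the consume context instead of returning a value.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var results = new[] { HandlingStatus.Success, HandlingStatus.Ignored };
Console.WriteLine(ResultAggregator.Decide(results));

// Hypothetical stand-ins for the handler status and the consumer decision.
enum HandlingStatus { Ignored, Success, Failure }
enum Outcome { Ack, Nack, Ignore }

static class ResultAggregator {
    public static Outcome Decide(IReadOnlyCollection<HandlingStatus> results) {
        // Any failed handler means the message is not acknowledged.
        if (results.Any(r => r == HandlingStatus.Failure)) return Outcome.Nack;
        // No failures and at least one handler did real work.
        if (results.Any(r => r == HandlingStatus.Success)) return Outcome.Ack;
        // Every handler ignored the message.
        return Outcome.Ignore;
    }
}
```

Checking for failures first matters: a message that one handler processed and another failed on must still be reported as failed.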

The consumer filter is a mandatory filter that should be the final filter in the pipe. When you register a subscription, the consumer filter is added to the pipe by default.

Message filter

The message filter can be used to prevent some messages from going down the pipe by filtering those messages out. When constructed, it needs a single constructor argument, which is a filtering function. The function gets an instance of IMessageContext, so it can use all of its available properties to filter messages. When the message is filtered out, the filter will mark it as ignored.

Example:

builder.Services.AddSubscription<StreamSubscription, StreamSubscriptionOptions>(
    "nonFooSubscription",
    cfg => cfg.AddConsumeFilterFirst(new MessageFilter(x => !x.MessageType.StartsWith("foo")))
);

Such a subscription will ignore all the events that have the type starting with “foo”.

Tracing filter

The tracing filter gets added automatically when Eventuous diagnostics is enabled (or, not disabled, as it’s enabled by default). Read more about Eventuous tracing in the Diagnostics section.

Concurrent filter

When using the concurrent filter, the pipe gets separated into two parts: before the concurrent filter and after it. The filter itself creates a channel of a given size (100 by default), where it puts all the messages in order. Then, the pipeline returns to the subscription as if the message was already consumed. A parallel process fetches messages from the channel continuously, and sends them to the second part of the pipe where messages actually get consumed.

Because of such a split, the result returned to the subscription doesn’t contain the actual handling result. Therefore, the concurrent filter can only be used in combination with DelayedAckConsumeContext. It’s the responsibility of a subscription to create such a context type, so only some subscription types support using the concurrent filter. This context type has two additional functions to Ack (Acknowledge function) or Nack (Fail function) each message. The subscription provides specific implementations of those functions. For example, an EventStoreDB catch-up subscription would commit the checkpoint when the event is acknowledged. The filter calls these functions itself, based on the result returned by the second part of the pipe (delayed consume).

Pipe with concurrent filter

The concurrent filter is useful in combination with partitioning filter, but it can also be used for other purposes, like batched event processing.

Subscriptions that support delayed consume add this filter by default when necessary (based on the subscription configuration). Check the particular transport subscription implementation to learn more.
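The split between the two parts of the pipe can be sketched with a bounded channel from System.Threading.Channels. This is an illustration under assumptions, not the Eventuous filter: Delayed<T> is a hypothetical stand-in for a context that carries its own acknowledge and fail callbacks, and ConcurrentStage<T> is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var acked = new List<int>();
var stage = new ConcurrentStage<int>(100, async message => {
    await Task.Delay(1); // simulate slow handling in the second part of the pipe
    if (message == 3) throw new InvalidOperationException("handler failed");
});

for (var i = 0; i < 5; i++) {
    var n = i;
    // Enqueue completes once the message is in the channel, not when it is handled.
    await stage.Enqueue(new Delayed<int>(n, () => acked.Add(n), _ => Console.WriteLine($"nack {n}")));
}
await stage.Complete();
Console.WriteLine($"acked: {string.Join(",", acked)}");

// Hypothetical context that is acknowledged or failed after delayed handling.
record Delayed<T>(T Message, Action Acknowledge, Action<Exception> Fail);

class ConcurrentStage<T> {
    readonly Channel<Delayed<T>> _channel;
    readonly Task _worker;

    public ConcurrentStage(int capacity, Func<T, ValueTask> secondPart) {
        _channel = Channel.CreateBounded<Delayed<T>>(capacity);
        // A background loop reads messages in order and runs the second part of the pipe.
        _worker = Task.Run(async () => {
            await foreach (var item in _channel.Reader.ReadAllAsync()) {
                try {
                    await secondPart(item.Message);
                    item.Acknowledge();
                } catch (Exception e) {
                    item.Fail(e);
                }
            }
        });
    }

    public ValueTask Enqueue(Delayed<T> item) => _channel.Writer.WriteAsync(item);

    // Stops accepting messages and waits until the queued ones are handled.
    public Task Complete() {
        _channel.Writer.Complete();
        return _worker;
    }
}
```

The bounded capacity provides back-pressure: when the channel is full, Enqueue waits, so a slow second part eventually slows the producer down instead of growing memory without limit.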

Partitioning filter

Sometimes the subscription workload gets too high for it to cope with the number of delivered messages, and process them fast enough one by one. In this case, the partitioning filter can be useful as it allows handling messages in parallel. Most of the time you’d want the messages to be delivered in order within some partition. For example, it is often important to process messages from a single aggregate in order. To support that, you can use the stream name (which contains the aggregate id) as the partition key.

Pipe with concurrent and partitioning filters

You can use three options for configuring the filter:

  • Provide a function that calculates the partition hash
  • Provide a function that returns the partition key
  • Provide nothing

Using the second option is easier as you don’t need to calculate the hash; it’s enough to return something like the stream id. In that case, the filter will use MurmurHash3 to calculate the hash for a given partition key.

The third option will use the default partition key function that uses the message stream as partition key.

All those options require the number of partitions provided as an argument.
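The mapping from a partition key to a partition can be sketched as a stable hash taken modulo the number of partitions. The sketch uses FNV-1a only because it fits in a few lines; it is a stand-in for the MurmurHash3 function mentioned above, and Partitioner and GetPartition are hypothetical names.

```csharp
using System;
using System.Text;

foreach (var stream in new[] { "Booking-1", "Booking-2", "Booking-1" })
    Console.WriteLine($"{stream} -> partition {Partitioner.GetPartition(stream, 2)}");

static class Partitioner {
    // FNV-1a, used here as a simple stable hash; not the hash Eventuous uses.
    static uint Hash(string key) {
        unchecked {
            var hash = 2166136261u;
            foreach (var b in Encoding.UTF8.GetBytes(key)) {
                hash ^= b;
                hash *= 16777619u;
            }
            return hash;
        }
    }

    // The same key always lands in the same partition, which keeps its messages in order.
    public static int GetPartition(string partitionKey, int partitionCount)
        => (int)(Hash(partitionKey) % (uint)partitionCount);
}
```

A stable hash matters here: string.GetHashCode is randomised per process in modern .NET, so it would not be a good choice if partition assignment should be reproducible across restarts.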

As the partitioning filter processes messages in multiple threads, it cannot provide the global processing order. The order of processing is guaranteed within a partition. Due to that fact, it needs to be able to Ack or Nack messages individually, without maintaining global order. Therefore, it only works with DelayedAckConsumeContext, and it’s often used with the concurrent filter for best performance. As you can imagine, partitioning can only be used for subscriptions that support delayed consume.

Subscriptions that support delayed consume add this filter by default when necessary (based on the subscription configuration) together with the concurrent filter. Check the particular transport subscription implementation to learn more.

5 - Event handlers

The last bit of the subscription process

Event handlers are at the end of the subscription event processing pipeline. That’s the place where something actually happens with the event.

6 - Diagnostics

Monitoring subscriptions

Out of the box, Eventuous provides metrics and traces for monitoring your subscriptions.

Also, study the Diagnostics section for more information.

Mind the gap

When using subscriptions for read model projections, you enter the world of CQRS and asynchronous messaging. After completing a transaction in the domain model, one or more events are added to the event store. Then, a subscription picks them up and calls a projection. Although in most cases you’ll see only a few milliseconds delay between these operations, if your projection becomes slow (for example, it uses a slow database), the processing time will increase.

The easiest way to detect such situations is to observe the gap between the last event in the stream, which the subscription listens to, and the event, which is currently being processed. We call it the subscription gap.

The gap is measured by subscriptions that implement the IMeasuredSubscription interface:

public interface IMeasuredSubscription {
    GetSubscriptionGap GetMeasure();
}

Subscription gaps are collected as metrics. Read more about Eventuous subscription metrics below.

Subscription metrics

Eventuous collects two types of metrics for subscriptions:

  • Subscription gap in time and count
  • Subscription performance

For the subscription gap, Eventuous collects the time and count of the gap between the last event in the stream, which the subscription listens to, and the event, which is currently being processed.

Keep in mind that due to a limitation of EventStoreDB, the gap event count is not accurate when subscribing to the $all stream. It’s because the $all stream position is the commit position of the event in the global log, not the position of the event in the stream. Unfortunately, as of today, it is impossible to translate the commit position gap to the number of events.

Here is an example of the subscription gap metric exported to Prometheus:

# HELP eventuous_subscription_gap_count_events Gap between the last processed event and the stream tail
# TYPE eventuous_subscription_gap_count_events gauge
eventuous_subscription_gap_count_events{subscription_id="BookingsProjections"} 1098 1649081749067
eventuous_subscription_gap_count_events{subscription_id="PaymentIntegration"} 0 1649081749067

In addition to the subscription gap metric, Eventuous also collects metrics for the subscription performance as a histogram. The metric name is eventuous_subscription_duration. It measures the duration of handling a single event. As handling different event types might trigger different kinds of processing, the histogram is tagged with the message type (message_type tag).

All the subscription metrics are tagged by the subscription id (subscription_id tag). It allows you to plot a graph of the subscription metrics for different subscriptions.

Here is an example of the subscription duration metrics exported as a Prometheus metric for the subscription with id BookingsProjections:

# HELP eventuous_subscription_duration_ms Processing duration, milliseconds
# TYPE eventuous_subscription_duration_ms histogram
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="0"} 0 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="5"} 0 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="10"} 0 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="25"} 0 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="50"} 0 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="75"} 1 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="100"} 1 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="250"} 1 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="500"} 1 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="1000"} 1 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="+Inf"} 1 1649081749067
eventuous_subscription_duration_ms_sum{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections"} 50.429 1649081749067
eventuous_subscription_duration_ms_count{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections"} 1 1649081749067

Subscription tracing

Each part of the subscription event processing pipeline is traced. Each event is wrapped into a consume context by the subscription, then passed to the consumer. The consumer then calls all the event handlers.

graph LR;
    Subscription --> Consumer
    Consumer --> B1[Handler 1]
    Consumer --> B2[Handler 2]
    Consumer --> B3[Handler 3]

As all Eventuous producers and event store(s) are instrumented, they propagate the trace context to the subscription. The subscription then propagates the trace context to the consumer, and the consumer then propagates the trace context to the event handlers, so all these operations are traced.

Here’s an example of the trace context propagated to the subscription visualised by Zipkin:

Subscription trace

As you can see, event handlers for the BookingProjection subscription are projections, and they trigger updates in MongoDB, which are also traced because the sample application is instrumented with the MongoDB tracing library.

As mentioned before, Eventuous ensures that the tracing context is propagated also remotely, so you get the full trace even if the subscription is running in a different process.

Health checks

The subscription service class also implements the IHealthCheck interface. Therefore, it can be used for ASP.NET Core health monitoring.

When using Eventuous dependency injection extensions, each registered subscription will also register its health checks.

However, you need to register the global Eventuous subscriptions health check by adding one line to the ASP.NET Core health checks registration:

builder.Services
  .AddHealthChecks()
  .AddSubscriptionsHealthCheck("subscriptions", HealthStatus.Unhealthy, new []{"tag"});