Subscription service

Eventuous implements all subscription types as hosted services, because subscriptions need to start when the application starts, work in the background while the application is running, and shut down when the application stops.

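Eventuous has a SubscriptionService base class. Both AllStreamSubscriptionService and StreamSubscriptionService inherit from it, as it handles most of the subscription mechanics, such as selecting event handlers, resubscribing after drops, measuring the subscription gap, and reporting health.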

Service arguments

The subscription service requires the following arguments:

| Parameter name | Type | What’s it for | Required |
|---|---|---|---|
| eventStoreClient | EventStoreClient | Client for EventStoreDB | Yes |
| subscriptionName | string | Identifier to select event handlers | Yes |
| checkpointStore | ICheckpointStore | Checkpoint store | Yes |
| eventSerializer | IEventSerializer | Event serializer | Yes |
| eventHandlers | IEnumerable<IEventHandler> | List of event handlers | Yes |
| loggerFactory | ILoggerFactory? | Microsoft logging logger factory instance | No |
| measure | SubscriptionGapMeasure? | Callback to report the subscription gap | No |

Event handlers

As mentioned on the Concept page, one subscription might serve multiple event handlers, such as projections. This is especially useful for keeping a group of projections in sync, so they don’t produce inconsistent [read models]({{< ref "rm-concepts" >}}).

Each subscription service gets a list of event handlers. An event handler must implement the IEventHandler interface, which has two members:

public interface IEventHandler {
    string SubscriptionGroup { get; }
    Task HandleEvent(object evt, long? position);
}

The subscription service calls the HandleEvent method for each event it receives. The event is already deserialized. The method also gets the position of the event in the stream. A projection can store this position in a property of the read model, and queries can then use that property to tell whether the projection is up to date.

The interface also has the SubscriptionGroup property. The subscription uses it to select only the relevant event handlers, as it might receive all the handlers in the application (for example, if all the handlers are registered in the DI container). The subscription only serves the handlers whose SubscriptionGroup value matches the subscription’s own subscriptionName argument.
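A minimal sketch of an event handler and of the selection rule described above. The interface is repeated from this page so that the sample compiles on its own. RoomBooked, BookingReadModel, BookingProjection, and HandlerSelection are hypothetical names, and the filter only illustrates the matching rule; it is not taken from the Eventuous source.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Copy of the interface shown on this page, repeated so the sketch is self-contained.
public interface IEventHandler {
    string SubscriptionGroup { get; }
    Task HandleEvent(object evt, long? position);
}

// Hypothetical event and read model, for illustration only.
public record RoomBooked(string BookingId, string RoomId);

public class BookingReadModel {
    public string RoomId { get; set; } = "";
    // Position of the last event applied to this read model.
    public long? Position { get; set; }
}

// Hypothetical projection that keeps its read models in memory.
public class BookingProjection : IEventHandler {
    public Dictionary<string, BookingReadModel> Bookings { get; } = new();

    // Must equal the subscriptionName of the subscription that should serve this handler.
    public string SubscriptionGroup => "bookings";

    public Task HandleEvent(object evt, long? position) {
        if (evt is RoomBooked booked) {
            Bookings[booked.BookingId] = new BookingReadModel {
                RoomId   = booked.RoomId,
                Position = position
            };
        }
        // Events of any other type are ignored by this handler.
        return Task.CompletedTask;
    }
}

public static class HandlerSelection {
    // Assumed selection rule: keep the handlers whose group equals the subscription name.
    public static IReadOnlyList<IEventHandler> For(
        string subscriptionName, IEnumerable<IEventHandler> allHandlers
    ) => allHandlers.Where(h => h.SubscriptionGroup == subscriptionName).ToList();
}
```

With this shape, a query can compare BookingReadModel.Position with a known event position to decide whether the projection has caught up.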

Subscription drops

An EventStoreDB subscription can drop for different reasons. For example, it might fail to pass the keep-alive ping to the server due to a transient network failure, or it might get overloaded.

The subscription service handles such drops and issues a resubscribe request, unless the application is shutting down, in which case the drop is deliberate.

This feature makes the subscription service resilient to transient failures: it recovers from drops and continues processing events when possible.
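The resubscribe behaviour can be pictured as a loop. The following is a simplified sketch, not the Eventuous implementation: ResubscribingWorker, the subscribe delegate, and the retry delay are all hypothetical, and the delegate is assumed to return or throw only when the subscription ends.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of a resubscribe loop; names and retry policy are assumptions.
public class ResubscribingWorker {
    readonly Func<CancellationToken, Task> _subscribe;
    readonly TimeSpan                      _retryDelay;

    // Number of drops that were followed by a resubscribe attempt.
    public int Drops { get; private set; }

    public ResubscribingWorker(Func<CancellationToken, Task> subscribe, TimeSpan retryDelay) {
        _subscribe  = subscribe;
        _retryDelay = retryDelay;
    }

    public async Task Run(CancellationToken stopping) {
        while (!stopping.IsCancellationRequested) {
            try {
                // Assumed to return or throw only when the subscription ends.
                await _subscribe(stopping);
            }
            catch (OperationCanceledException) when (stopping.IsCancellationRequested) {
                // The application is shutting down, so the drop is deliberate.
                return;
            }
            catch (Exception) {
                // Treated as a transient failure: fall through and resubscribe.
            }

            if (stopping.IsCancellationRequested) return;

            Drops++;
            try {
                // Wait a little before resubscribing, but stop waiting on shutdown.
                await Task.Delay(_retryDelay, stopping);
            }
            catch (OperationCanceledException) {
                return;
            }
        }
    }
}
```

The cancellation token is what distinguishes a deliberate drop from a transient one in this sketch: once it is cancelled, the loop exits instead of resubscribing.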

Mind the gap

When using subscriptions for read model projections, you enter the world of CQRS and asynchronous messaging. After a transaction completes in the domain model, one or more events are added to the event store. Then, a subscription picks them up and calls a projection. Although in most cases you’ll see only a few milliseconds of delay between these operations, if your projection becomes slow (for example, it uses a slow database), the processing time will increase.

The easiest way to detect such situations is to observe the gap between the last event in the stream that the subscription listens to and the event that is currently being processed. We call it the subscription gap.

To measure the gap, supply a SubscriptionGapMeasure instance. It has a method that you can use for your metric:

public ulong GetGap(string checkpointId) => _gaps[checkpointId];

You only need a single instance of SubscriptionGapMeasure in the application, as it handles multiple subscriptions. The checkpointId has the same value as the subscriptionName. The gap is measured once per second.
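To make the idea concrete, here is a hedged sketch of how such a measure could keep one gap value per subscription. GapMeasureSketch and PutGap are hypothetical; only GetGap appears on this page. Unlike the one-line GetGap shown above, this version returns 0 for an unknown checkpointId instead of throwing.

```csharp
using System.Collections.Concurrent;

// Hypothetical stand-in for SubscriptionGapMeasure, for illustration only.
public class GapMeasureSketch {
    // One entry per subscription, keyed by checkpointId (same value as subscriptionName).
    readonly ConcurrentDictionary<string, ulong> _gaps = new();

    // Assumed to be called by each subscription roughly once per second.
    public void PutGap(string checkpointId, ulong lastStreamPosition, ulong lastProcessedPosition)
        => _gaps[checkpointId] = lastStreamPosition >= lastProcessedPosition
            ? lastStreamPosition - lastProcessedPosition
            : 0; // guard against underflow if the positions are read out of order

    // Value to expose as a metric; 0 if the subscription has not reported yet.
    public ulong GetGap(string checkpointId)
        => _gaps.TryGetValue(checkpointId, out var gap) ? gap : 0;
}
```

A metrics exporter could then poll GetGap with each subscription name and raise an alert when the value keeps growing.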

Health checks

The subscription service class also implements the IHealthCheck interface. Therefore, it can be used for ASP.NET Core health monitoring.

For example, you can register your subscription as a hosted service, and then add it to the health check configuration:

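// Register one shared instance, so the health check inspects the running subscription
services.AddSingleton<MySubscriptionService>();
services.AddHostedService(sp => sp.GetRequiredService<MySubscriptionService>());
services.AddHealthChecks().AddCheck<MySubscriptionService>("state-update");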

In addition, Eventuous provides a helper registration method for the DI container, which does both. You can also supply the health check name and tags for each subscription:

services.AddSubscription<MySubscriptionService>("state-update", new[] {"esdb"});
