Checkpoints
When subscribing to an event store, it is important to consider which events you wish to receive. An effective event store should allow you to subscribe to individual event streams or to a global stream known as the "All stream", which contains all events in the store, organized in the order they were recorded. Many event-driven brokers that persist events as ordered logs also support subscriptions, which are often referred to as "consumers".
A subscription also determines the stream position at which it starts receiving events. To process all historical events, subscribe from the beginning of the stream. To receive only real-time events, subscribe from the current end of the stream.
What's the checkpoint?
As the subscription receives and processes events, it progresses along the subscribed stream. Each event that is received and processed has a unique position within the stream, which serves as a checkpoint for the subscription. If the application hosting the subscription shuts down, the subscription should resume from the position right after the last recorded checkpoint, that is, the position of the last processed event plus one, so that events that were already handled are not handled again. To do that, the subscription must keep track of its checkpoint, either by storing it in a dedicated checkpoint store or by using the event store's built-in functionality.
In some log-based brokers, the concept of a checkpoint is referred to as an "offset". Some event-driven brokers manage subscriptions on the server side, eliminating the need for client-side checkpoint storage. For example, persistent subscriptions in EventStoreDB and subscriptions in Google Pub/Sub do not require a client-side checkpoint store. Other subscriptions, such as those managed by RabbitMQ, do not have the concept of a checkpoint, because RabbitMQ does not retain messages once a consumer has acknowledged them.
Checkpoint store
Eventuous offers an abstraction layer that enables subscriptions to store checkpoints securely and reliably. You can choose to store the checkpoint in a file or database and determine how often to store it, either after processing each event or periodically. Although periodic checkpoint storage reduces the stress on the infrastructure supporting the checkpoint store, it requires the event handlers of the subscription to be idempotent, because events processed after the last stored checkpoint are delivered again after a restart. This can be challenging, especially in integration scenarios where it is often difficult or impossible to determine if an event has been published to the broker or not. However, this approach may work for read model projections.
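To make the trade-off of periodic storage concrete, here is a minimal sketch of a policy that tells a subscription when to store its checkpoint. The PeriodicCheckpointPolicy class and its ShouldStore method are hypothetical names used for illustration only; they are not part of Eventuous.

```csharp
using System;

// Hypothetical helper, not an Eventuous type: decides when a checkpoint
// should be written if it is stored once per batch instead of per event.
public class PeriodicCheckpointPolicy {
    readonly int _batchSize;
    int _processedSinceLastStore;

    public PeriodicCheckpointPolicy(int batchSize) {
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));
        _batchSize = batchSize;
    }

    // Call once after each processed event.
    // Returns true when the checkpoint should be stored now.
    public bool ShouldStore() {
        _processedSinceLastStore++;
        if (_processedSinceLastStore < _batchSize) return false;

        _processedSinceLastStore = 0;
        return true;
    }
}
```

With a batch size of 3, the policy asks for a store after every third event. If the application stops between two stores, up to two events that were already processed are delivered again, which is why the handlers need to be idempotent.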
It is important to keep the checkpoint safe, as its loss will result in the subscription receiving all events again, starting from the beginning of the stream. This may be intentional when creating a new read model, but it can also have unintended consequences in other scenarios.
On top of the abstraction, Eventuous provides a few implementations of the checkpoint store, which you can use out of the box. You can also implement your own checkpoint store if you need to store the checkpoint in a custom location.
Abstraction
The checkpoint store interface is simple; it has only two functions:
interface ICheckpointStore {
    ValueTask<Checkpoint> GetLastCheckpoint(
        string checkpointId,
        CancellationToken cancellationToken
    );

    ValueTask<Checkpoint> StoreCheckpoint(
        Checkpoint checkpoint,
        CancellationToken cancellationToken
    );
}
The Checkpoint record is a simple record that represents a stream position in any kind of event store:
record Checkpoint(string Id, ulong? Position);
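As an illustration of a custom implementation, the following sketch keeps checkpoints in process memory. The interface and record are repeated from the text above so that the example compiles on its own. InMemoryCheckpointStore is a hypothetical name, and returning a checkpoint without a position for an unknown id is an assumption of this sketch rather than documented Eventuous behaviour.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Shapes repeated from the text above to keep the sketch self-contained.
public record Checkpoint(string Id, ulong? Position);

public interface ICheckpointStore {
    ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken);
    ValueTask<Checkpoint> StoreCheckpoint(Checkpoint checkpoint, CancellationToken cancellationToken);
}

// Hypothetical store: checkpoints live in memory and are lost on restart.
public class InMemoryCheckpointStore : ICheckpointStore {
    readonly ConcurrentDictionary<string, Checkpoint> _checkpoints = new();

    public ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) {
        // Assumption: an unknown id yields a checkpoint without a position.
        var checkpoint = _checkpoints.TryGetValue(checkpointId, out var stored)
            ? stored
            : new Checkpoint(checkpointId, null);

        return new ValueTask<Checkpoint>(checkpoint);
    }

    public ValueTask<Checkpoint> StoreCheckpoint(Checkpoint checkpoint, CancellationToken cancellationToken) {
        // Overwrite the previous checkpoint stored under the same id.
        _checkpoints[checkpoint.Id] = checkpoint;
        return new ValueTask<Checkpoint>(checkpoint);
    }
}
```

Because the checkpoint does not survive a restart, a store like this only makes sense for tests or for subscriptions that are meant to replay the whole stream every time the application starts.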
Available stores
If an Eventuous projections package supports a projection type that requires a checkpoint store, that package also contains the checkpoint store implementation. For example, the Eventuous.MongoDB package has a checkpoint store implementation for MongoDB.
If you register subscriptions in the DI container, you also need to register the checkpoint store:
builder.Services.AddSingleton<IMongoDatabase>(Mongo.ConfigureMongo());
builder.Services.AddCheckpointStore<MongoCheckpointStore>();
In case you have multiple subscriptions in one service, and you project to different databases (for example, MongoDB and PostgreSQL), you need to specify the checkpoint store for each subscription. In this case, you don't need to register the checkpoint store globally in the DI container, but use the UseCheckpointStore method when building your subscription:
builder.Services.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
    "BookingsProjections",
    builder => builder
        .Configure(cfg => cfg.ConcurrencyLimit = 2)
        .UseCheckpointStore<MongoCheckpointStore>()
        .AddEventHandler<BookingStateProjection>()
        .AddEventHandler<MyBookingsProjection>()
        .WithPartitioningByStream(2)
);
MongoDB
The MongoDB checkpoint store will create a collection called checkpoint where it will keep one document per subscription.
Each checkpoint document contains the checkpoint id, which is the subscription id. Therefore, you only get one checkpoint collection per database.
Elasticsearch
The Elasticsearch checkpoint store will create and use the checkpoint index, where the document id is the subscription id.
PostgreSQL
The Postgres checkpoint store will create and use the checkpoints table, where the row id is the subscription id. Here is the script used to create that table:
create table if not exists __schema__.checkpoints (
    id varchar primary key,
    position bigint null
);
Other stores
In addition to that, Eventuous has two implementations in the core subscriptions package:
MeasuredCheckpointStore: creates a trace for all the IO operations, wraps an existing store
NoOpCheckpointStore: does nothing, used in Eventuous tests
The measured store is used by default if Eventuous diagnostics aren't disabled and you use the AddCheckpointStore container registration extension.
Checkpoint commit handler
In addition to the checkpoint store, Eventuous has a more advanced way to work with checkpoints: the checkpoint commit handler. It doesn't load or store checkpoints by itself; for that purpose, it uses the provided checkpoint store. However, the commit handler is able to receive a stream of unordered checkpoints, reorder them, detect possible gaps, and only store the latest checkpoint before the first gap.
For subscriptions that support delayed consume (see Partitioning filter) and require a checkpoint store, you must use the commit handler. All such subscription types provided by Eventuous use the checkpoint commit handler.
Unless you create your own subscription with such requirements, you don't need to know the internals of the commit handler. However, it helps to understand the consequences of delayed event processing with supported subscriptions.
When events get partitioned by the filter, several consumer instances process events in parallel, and they run at different speeds. As a result, each partition produces checkpoints with gaps. Each event inside DelayedConsumeContext is explicitly acknowledged, and when that happens, its checkpoint goes to the commit handler queue. The commit handler then accumulates checkpoints, detects gaps in the sequence, and only stores the latest checkpoint of a gap-less sequence.
In the illustration above, the commit queue has a gap because event 95 is still in flight. As soon as event 95 is processed, its position gets to the queue, the commit handler detects a gap-less sequence, and commits checkpoint 97.
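The gap detection described here can be sketched as follows. This is not the Eventuous commit handler: GaplessCommitTracker is a hypothetical class, and it assumes that every event carries a contiguous sequence number (94, 95, 96, and so on), which is a simplification made for this example.

```csharp
using System.Collections.Generic;

// Hypothetical helper, not the Eventuous commit handler.
// Assumes contiguous sequence numbers; real stream positions may not be contiguous.
public class GaplessCommitTracker {
    readonly SortedSet<ulong> _pending = new();
    ulong _lastCommitted;

    public GaplessCommitTracker(ulong lastCommitted) => _lastCommitted = lastCommitted;

    // Registers an acknowledged sequence number. Returns the latest number that
    // closes a gap-less run after the last commit, or null while a gap remains.
    public ulong? Acknowledge(ulong sequence) {
        // Ignore anything at or before the last committed number.
        if (sequence <= _lastCommitted) return null;

        _pending.Add(sequence);

        ulong? committable = null;

        // Advance while the smallest pending number directly follows the last commit.
        while (_pending.Count > 0 && _pending.Min == _lastCommitted + 1) {
            _lastCommitted = _pending.Min;
            _pending.Remove(_lastCommitted);
            committable = _lastCommitted;
        }

        return committable;
    }
}
```

For a tracker created with 94 as the last committed number, acknowledging 96 and then 97 returns null both times because 95 is missing. Acknowledging 95 afterwards returns 97, which matches the scenario above under the assumption that 96 and 97 were already waiting in the queue.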
Because of gaps, you might face a situation where the commit handler holds a list of uncommitted checkpoints with gaps, and the application stops. When this happens, some events were already processed, whilst the checkpoints for those events were never stored. When the application restarts, it loads a checkpoint that points to a position in the stream that is earlier than the positions of the already processed events. Because of that, some events will be processed by event handlers again. Therefore, you need to make sure that your event handlers are idempotent, so that processing the same events again doesn't create any undesired side effects.
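One way to make a projection idempotent is to remember the position of the last applied event next to the projected state and to skip anything at or before it. The sketch below uses a hypothetical BookingCounter class and assumes that events for one projected document arrive in position order; it does not use any Eventuous API.

```csharp
// Hypothetical read model, not an Eventuous type.
// Assumes events for this document arrive in position order.
public class BookingCounter {
    ulong? _lastAppliedPosition;

    public int Count { get; private set; }

    // Applies the event once. Returns false when the event is a replay
    // of something that was already applied.
    public bool Apply(ulong position) {
        if (_lastAppliedPosition.HasValue && position <= _lastAppliedPosition.Value) {
            return false;
        }

        Count++;
        _lastAppliedPosition = position;
        return true;
    }
}
```

In a real projection, the last applied position would be stored in the same document or row as the state and updated in the same write, so that a replay after a restart leaves the read model unchanged.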