1 - Prologue

Introduction to Eventuous and how it embraces the original idea of Event Sourcing as formulated by Greg Young.

1.1 - Introduction

What is Eventuous, and why would you want to use it to implement an event-sourced system with .NET or .NET Core?

What is Eventuous?

Eventuous is a (relatively) lightweight library, which allows building production-grade applications using the Event Sourcing pattern.

The base library has a set of abstractions, following Domain-Driven Design tactical patterns, like Aggregate.

Additional components include:

  • Aggregate persistence using EventStoreDB
  • Real-time subscriptions for EventStoreDB, RabbitMQ, and Google PubSub
  • Extensive observability, including Open Telemetry support
  • Integration with ASP.NET Core dependency injection, logging, and Web API
  • Producers for EventStoreDB, RabbitMQ, Google PubSub, and Apache Kafka
  • Read model projections for MongoDB
  • Gateway for producing events to other services (Event-Driven Architecture support)

Packages

You can find all the NuGet packages by visiting the Eventuous profile.

Package                              What’s it for
Eventuous                            The core library
Eventuous.Subscriptions              Subscriptions base library, including diagnostics and DI support
Eventuous.Subscriptions.Polly        Support for retries in event handlers using Polly
Eventuous.Producers                  Producers base library, including diagnostics and DI support
Eventuous.Diagnostics                Diagnostics base library
Eventuous.Diagnostics.OpenTelemetry  Diagnostics integration with OpenTelemetry
Eventuous.Diagnostics.Logging        Eventuous internal logs adapter for ASP.NET Core logging
Eventuous.Gateway                    Eventuous gateway for connecting subscriptions with producers
Eventuous.EventStore                 Support for EventStoreDB (event store, subscriptions, producers)
Eventuous.RabbitMq                   Support for RabbitMQ (subscriptions, producers)
Eventuous.GooglePubSub               Support for Google PubSub (subscriptions, producers)
Eventuous.Kafka                      Support for Apache Kafka (producers)
Eventuous.ElasticSearch              Support for Elasticsearch (producers, event store for archive purposes)
Eventuous.Projections.MongoDB        Projections support for MongoDB
Eventuous.AspNetCore                 DI extensions for app services, aggregate factory, etc.
Eventuous.AspNetCore.Web             HTTP API automation for app services

Go further - WIP

Read about the right way to understand how Eventuous embraces the original idea of Event Sourcing.

You can have a look at the sample project in a separate repository.

1.2 - The Right Way

Event Sourcing done right, as it was meant to be. Don’t get caught up in the misconception that Event Sourcing is the same thing as Event-Driven Architecture.

If you ever searched for a diagram that can give you an idea of what Event Sourcing is about, you can find many images out there. The only issue is that most of them are confusing or just plain wrong.

It’s a bold statement, you’d say, isn’t it? It might be, yes. Yet a lot of people who claim that Event Sourcing is hard have either never tried it or made mistakes when implementing it.

Eventuous does not claim to be the Almighty Source of Truth. Quite the opposite, we argue that the library’s code is opinionated and leans towards what was done before with this library in production with success.

Still, as people demand answers about how to do it right, we have one for you here.

The "Right Way" (according to us)

Quite a few diagrams in articles that claim to explain what Event Sourcing is and how to implement it suffer from the same issues:

  • Using some kind of bus to propagate domain events to read models
  • Bringing unnecessary components and, therefore, complexity into the picture
  • Mixing up Event-Driven Architecture (EDA) with Event Sourcing
  • Not using domain events as the domain model state

Let’s briefly go through those issues.

Event bus

Message brokers are great, really. It’s way better to integrate (micro)services using asynchronous messaging rather than RPC calls. RPC calls introduce spatial and temporal coupling, something you’d want to avoid when doing services.

Still, the integration concern is orthogonal to Event Sourcing; as much as domain events enable message-based integration, it’s not a requirement.

The Bad Bus

When propagating events to reporting models (read models, query side, whatever you call it) using a broker, you will encounter the following issues:

  • Out-of-order events. When projecting events to reporting models, you really want the events to be processed in the same order as they were produced. Not all message brokers give you such a guarantee, as they were designed for a different purpose.
  • Two-phase commit. We already mentioned this issue in this documentation. Once again, a database used to store the events and the message broker are two distinct infrastructure components. You can rarely make the single operation of persisting an event and publishing it to the broker transactional. As a result, one of these operations could fail, effectively making a part of your system inconsistent (having an invalid state). Claims that you can apply technical patterns like Outbox to mitigate the issue are valid. However, the Outbox pattern implementation is very complex and often exceeds the complexity of the essential part of the system itself.
  • Replay. This one is probably the most important. Message brokers are often used for the purpose of integration. It means that you find event consumers out there (which you cannot control) reacting to published events. Those consumers produce side effects. Unlike reporting models, integration side effects are not idempotent, as they don’t expect an event to happen multiple times. Effectively, when you want to replay all the events from your event store to rebuild a single reporting model, you will affect all the other consumers. Those include your other reporting models, which you probably don’t want to rebuild. You’ll also send historical events to consumers outside your area of control, which will start reacting to those events and, more often than not, produce undesired side effects.

Publishing events to a message broker from a subscription is an entirely legit scenario. However, do not build reporting models by consuming from a broker unless it’s an event log-based broker like Apache Kafka, Apache Pulsar or Azure Event Hubs. Prefer projecting events by subscribing directly to your event store.

Using reporting database as system state

The very nature of Event Sourcing as a persistence mechanism makes it fully consistent. You append an event to the aggregate stream, using the aggregate version as the expected version for the optimistic concurrency as a transaction. You restore the aggregate state by reading those events. It is atomic, consistent, isolated and durable - ACID, read your own writes and all that.
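The append with an expected version can be illustrated with a small in-memory sketch. This is not Eventuous code: InMemoryStreams and its members are hypothetical names, and a real event store performs the version check atomically on the server side.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory store, used only to illustrate optimistic concurrency.
public class InMemoryStreams {
    readonly Dictionary<string, List<object>> _streams = new();

    // Appends events only if the stream is at the version the caller expects.
    // An expected version of -1 means "the stream must not exist yet".
    // Returns the new stream version.
    public int Append(string stream, int expectedVersion, IReadOnlyCollection<object> events) {
        _streams.TryGetValue(stream, out var stored);
        var currentVersion = (stored?.Count ?? 0) - 1;

        if (currentVersion != expectedVersion)
            throw new InvalidOperationException(
                $"Expected version {expectedVersion}, but the stream is at {currentVersion}");

        stored ??= _streams[stream] = new List<object>();
        stored.AddRange(events);
        return stored.Count - 1;
    }

    // Returns all events of the stream in the order they were appended.
    public IReadOnlyList<object> Read(string stream)
        => _streams.TryGetValue(stream, out var stored)
            ? stored
            : (IReadOnlyList<object>)Array.Empty<object>();
}
```

If two writers load the same stream at version 0 and both try to append, only the first one succeeds; the second one fails the version check and has to reload and retry.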

Still, you find lots of articles describing that just storing events to some kind of ordered log is Event Sourcing, whilst they don’t use those events as the source of truth. Instead, they project events to another store asynchronously, outside the transaction scope, and claim it to be Event Sourcing. Make no mistake, it is not. Unlike true Event Sourcing, this method does not guarantee consistency. Using some database on the side, which is fed by events out-of-proc and outside the transaction boundary, inevitably produces a delay between the moment an event is stored and the moment the state change happens in the other database.

That other database is effectively a reporting model, but it cannot be considered consistent storage for the system state. In a genuinely event-sourced system, you can always get the fully consistent state of any object from the event store. From a projected store, you get a particular state projection, which is not necessarily up to date. It is fine for reporting models, but it is not fine for decision-making.

Event-Driven vs Event Sourcing

What we described in the two previous sections are two parts of Event-Driven Architecture (EDA). Although Event Sourcing enables you to implement EDA, those two concepts are orthogonal. You can implement an event-sourced system that doesn’t distribute events to the outside. You can also implement an event-driven system without using Event Sourcing.

1.3 - Quick start

Sample application

You can find a sample application in the .NET sample repository. It’s being updated with the latest features and improvements.

2 - Domain

Building blocks for your domain model.

2.1 - Aggregate

Aggregate: consistency boundaries

Concept

Aggregate is probably the most important tactical pattern in Domain-Driven Design. It is a building block of the domain model. An Aggregate is a model on its own, a model of a particular business object, which can be uniquely identified and thereby distinguished from any other object of the same kind.

When handling a command, you need to ensure it only changes the state of a single aggregate. An aggregate boundary is a transaction boundary, so the state transition for the aggregate needs to happen entirely or not at all.

Traditionally, DDD defines three concepts, which are related to aggregate:

  • Entity - a representation of a business object, which has an identifier
  • Aggregate Root - an entity, which might aggregate other entities and value objects
  • Aggregate - the Aggregate Root and all the things inside it

The idea of an aggregate, which holds more than one entity, seems to be derived from the technical concerns of persisting the state. You can imagine an aggregate root type called Booking (for a hotel room), which holds a collection of ExtraService entities. Each of those entities represents a single extra service ordered by the guest when they made this booking. It could be a room service late at night, a baby cot, anything else that the guest needs to order in advance. Since those extra services might also be cancelled, we need to have a way to uniquely identify each of them inside the Booking aggregate, so those are entities.

If we decide to persist the Booking state in a relational database, the natural choice would be to have one table for Booking and one table for ExtraService with a one-to-many relationship. Still, when loading the Booking state, we load the whole aggregate, so we have to read from the Booking table with an inner join on the ExtraService table.

Those entities might also have behaviour, but to reach out to an entity within an aggregate, you go through the aggregate root (Booking). For example, to cancel the baby cot service, we’d have code like this:

var booking = bookingRepository.Load(bookingId);
booking.CancelExtraService(extraServiceId);
bookingRepository.Save(booking);

In the Booking code it would expand to:

void CancelExtraService(ExtraServiceId id) {
    extraServices.RemoveAll(x => x.Id == id);
    RecalculateTotal();
}

So, we have an entity here, but it doesn’t really expose any behaviour. Even if it does, you first call the aggregate root logic, which finds the entity, and then routes the call to the entity.

In Eventuous, we consider it a burden. If you need to find the entity in the aggregate root logic, why can’t you also execute the operation logic right away? If you want to keep the entity logic separated, you can always create a module with a pure function, which takes the entity state and returns an event to the aggregate root.
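A module with such a pure function could look roughly like the sketch below. All the names here (ExtraService, ExtraServiceCancelled, ExtraServices.Cancel) are hypothetical and exist only for illustration; they are not part of Eventuous.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical entity state and event, for illustration only.
public record ExtraService(string Id, string Name, decimal Price);
public record ExtraServiceCancelled(string BookingId, string ExtraServiceId, decimal Refund);

public static class ExtraServices {
    // Pure function: takes the entity state and returns the resulting event.
    // It has no side effects; the aggregate decides what to do with the event.
    public static ExtraServiceCancelled Cancel(
        string bookingId,
        IEnumerable<ExtraService> services,
        string extraServiceId
    ) {
        var service = services.FirstOrDefault(x => x.Id == extraServiceId)
            ?? throw new InvalidOperationException(
                $"Extra service {extraServiceId} not found");

        return new ExtraServiceCancelled(bookingId, service.Id, service.Price);
    }
}
```

The aggregate would call the function with its current list of extra services and apply the returned event, so the entity logic stays testable without an aggregate instance.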

The relational database persistence concern doesn’t exist in the Event Sourcing world. Therefore, we decided not to implement concepts like Entity and AggregateRoot. Instead, we provide a single abstraction for the logical and physical transaction boundary, which is the Aggregate.

Implementation

Eventuous provides three abstract classes for the Aggregate pattern, which are all event-sourced. The reason to have three and not one is that all of them allow you to implement the pattern differently. You can choose the one you prefer.

Aggregate

The Aggregate abstract class is quite technical and provides very little out of the box.

Member        Kind                  What it’s for
Changes       Read-only collection  Events, which represent new state changes, get added here
ClearChanges  Method                Clears the changes collection
Version       Property, int         Current aggregate version, used for optimistic concurrency. Default is -1
AddChange     Method                Adds an event to the list of changes

It also has two helpful methods, which aren’t related to Event Sourcing:

  • EnsureExists - throws if Version is -1
  • EnsureDoesntExist - throws if Version is not -1

All other members are methods. You either need to implement them, or use one of the derived classes (see below).

Member  What it’s for
Load    Given the list of previously stored events, restores the aggregate state. Normally, it’s used for synchronous load, when all the stored events come from event store at once.
Fold    Applies a single state transition event to the aggregate state and increases the version. Normally, it’s used for asynchronous loads, when events come from event store one by one.
GetId   Returns the aggregate identity as string. As most databases support string identity, it’s the most generic type to support persistence.

When building an application, you’d not need to use the Aggregate abstract class as-is. You still might want to use it to implement some advanced scenarios.
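To make the members described above more tangible, here is a simplified stand-in for such a base class. It is a sketch written for this page, not the Eventuous source: SketchAggregate, its When method and the Counter example are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for the ideas described above; not the Eventuous source.
public abstract class SketchAggregate {
    readonly List<object> _changes = new();

    public IReadOnlyCollection<object> Changes => _changes;
    public int Version { get; private set; } = -1;

    public void ClearChanges() => _changes.Clear();

    // Records a new event; the version only moves when stored events are folded.
    protected void AddChange(object evt) => _changes.Add(evt);

    // Restores the state from all previously stored events at once.
    public void Load(IEnumerable<object> events) {
        foreach (var evt in events) Fold(evt);
    }

    // Applies one stored event to the state and increases the version.
    public void Fold(object evt) {
        When(evt);
        Version++;
    }

    protected abstract void When(object evt);
    public abstract string GetId();

    protected void EnsureExists() {
        if (Version == -1)
            throw new InvalidOperationException($"{GetType().Name} doesn't exist");
    }

    protected void EnsureDoesntExist() {
        if (Version != -1)
            throw new InvalidOperationException($"{GetType().Name} already exists");
    }
}

// Hypothetical aggregate: its "events" are plain integers to keep the sketch short.
public class Counter : SketchAggregate {
    public int Total { get; private set; }

    protected override void When(object evt) {
        if (evt is int increment) Total += increment;
    }

    public override string GetId() => "counter-1";
}
```

Loading three stored events into a new Counter leaves it at version 2, because the version starts at -1 and increases by one per folded event.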

Aggregate with state

Inherited from Aggregate, the Aggregate<T> adds a separate concept of the aggregate state. Traditionally, we consider state as part of the aggregate. However, state is the only part of the aggregate that mutates. We decided to separate state from the behaviour by splitting them into two distinct objects.

The aggregate state in Eventuous is immutable. When applying an event to it, we get a new state.

The stateful aggregate class implements most of the abstract members of the original Aggregate. It exposes an API, which allows you to use the stateful aggregate base class directly.

Member  Kind      What it’s for
Apply   Method    Given a domain event, applies it to the state. Replaces the current state with the new version. Adds the event to the list of changes. Returns a tuple with the previous and the current state versions.
State   Property  Returns the current aggregate state.

As we don’t know how to extract the aggregate identity from this implementation, you still need to implement the GetId function.

The Apply function is virtual, so you can override it to add some contract-based checks (ensure that the state is valid, or that the state transition was valid).

Aggregate state

We have an abstraction for the aggregate state. It might seem unnecessary, but it has a single abstract method, which you need to implement for your own state classes. As mentioned previously, we separated the aggregate behaviour from its state. Moving along, we consider event-based state transitions as part of the state handling. Therefore, the state object needs to expose an API to receive events and produce a new instance of itself (remember that the state is immutable).

To support state immutability, AggregateState is an abstract record, not a class. Therefore, it supports immutability out of the box, as well as the with syntax, which makes state transitions easier.

A record that inherits from AggregateState needs to implement a single function called When. It gets an event as an argument and returns the new state instance. There are two ways to define how events mutate the state, described below.

Using pattern matching

Using pattern matching, you can define how events mutate the state with functions that return the new AggregateState instance.

For example:

record BookingState : AggregateState<BookingState, BookingId> {
    decimal Price { get; init; }

    public override BookingState When(object @event)
        => @event switch {
            RoomBooked booked        => this with
                { Id = new BookingId(booked.BookingId), Price = booked.Price },
            BookingImported imported => this with
                { Id = new BookingId(imported.BookingId) },
            _                        => this
        };
}

Using explicit handlers

You can also use explicit event handlers, where you define one function per event, and register them in the constructor. In that case, there’s no need to override the When function.

The syntax is similar to registered command handlers for the application service:

public record BookingState : AggregateState<BookingState, BookingId> {
    public BookingState() {
        On<RoomBooked>(
            (state, booked) => state with { 
                Id = new BookingId(booked.BookingId), 
                Price = booked.Price 
            }
        );

        On<BookingImported>(
            (state, imported) => state with { 
                Id = new BookingId(imported.BookingId) 
            }
        );

        On<BookingPaymentRegistered>(
            (state, paid) => state with {
                PaymentRecords = state.PaymentRecords.Add(
                    new PaymentRecord(paid.PaymentId, paid.AmountPaid)
                ),
                AmountPaid = paid.FullPaidAmount
            }
        );
    }

    decimal Price          { get; init; }
    decimal AmountPaid     { get; init; }

    ImmutableList<PaymentRecord> PaymentRecords { get; init; } =
        ImmutableList<PaymentRecord>.Empty;
}

In the pattern-matching example, the default branch of the switch expression returns the current instance when it receives an unknown event. You might decide to throw an exception there instead.
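Throwing from the default branch could look like the following stand-alone sketch. To keep it runnable without the library, StrictBookingState does not inherit from AggregateState, and the RoomBooked record here is a simplified, hypothetical version of the event.

```csharp
using System;

// Simplified, hypothetical event used only in this sketch.
public record RoomBooked(string BookingId, decimal Price);

// Stand-alone state record; it does not inherit from AggregateState.
public record StrictBookingState {
    public string Id { get; init; } = "";
    public decimal Price { get; init; }

    public StrictBookingState When(object evt)
        => evt switch {
            RoomBooked booked => this with { Id = booked.BookingId, Price = booked.Price },
            // Fail loudly instead of silently ignoring an unknown event.
            _ => throw new ArgumentOutOfRangeException(
                nameof(evt), $"Unknown event type {evt.GetType().Name}")
        };
}
```

Whether to ignore or reject unknown events is a design choice: ignoring them tolerates events added by newer code, while throwing surfaces a missing handler early.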

Aggregate with typed identity

The last abstraction is Aggregate<T, TId>, where T is AggregateState and TId is the identity type. You can use it if you want to have a typed identity. We provide a small identity value object abstraction, which allows Eventuous to understand that it’s indeed the aggregate identity.

Aggregate identity

Use the AggregateId abstract record, which needs a string value for its constructor:

record BookingId : AggregateId {
    public BookingId(string id) : base(id) { }
}

The abstract record overrides its ToString to return the string value as-is. It also has an implicit conversion operator, which allows you to use a string value without explicitly instantiating the identity record. However, we still recommend instantiating the identity explicitly to benefit from type safety.

Aggregate state with typed identity

The aggregate with typed identity also uses the aggregate state with typed identity. It’s because the identity value is a part of the aggregate state.

A typed state base class has its identity property built-in, so you don’t need to do anything in addition. The BookingState example above uses the typed state and, therefore, is able to set the identity value when it gets it from the event.

As we know what the aggregate identity is when using aggregates with typed identity, the GetId function is implemented in the base class. Therefore, there are no more abstract methods to implement in derived classes.

Although the number of generic parameters for this version of the Aggregate base class comes to two, it is still the most useful one. It gives you type safety for the aggregate identity, and also nicely separates state from behaviour.

Example:

class Booking : Aggregate<BookingState, BookingId> {
    public void BookRoom(
        BookingId id,
        string roomId,
        StayPeriod period,
        decimal price
    ) {
        EnsureDoesntExist();
        Apply(new RoomBooked(
            id, roomId, period.CheckIn, period.CheckOut, price
        ));
    }

    public void Import(BookingId id, string roomId, StayPeriod period) {
        Apply(new BookingImported(
            id, roomId, period.CheckIn, period.CheckOut
        ));
    }
}

Aggregate factory

Eventuous needs to instantiate your aggregates when it loads them from the store. New instances are also created by the ApplicationService when handling a command that operates on a new aggregate. Normally, aggregate classes don’t have dependencies, so it is possible to instantiate one by calling its default constructor. However, you might need to have a dependency or two, like a domain service. We advise providing such dependencies when calling the aggregate function from the application service, as an argument. But it’s still possible to instruct Eventuous how to construct aggregates that don’t have a default parameterless constructor. That’s the purpose of the AggregateFactory and AggregateFactoryRegistry.

The AggregateFactory is a simple function:

public delegate T AggregateFactory<out T>() where T : Aggregate;

The registry allows you to add a custom factory for a particular aggregate type. The registry itself is a singleton, accessible via AggregateFactoryRegistry.Instance. You can register your custom factory by using the CreateAggregateUsing<T> method of the registry:

AggregateFactoryRegistry.Instance.CreateAggregateUsing(() => new Booking(availabilityService));

By default, when there’s no custom factory registered in the registry for a particular aggregate type, Eventuous will create new aggregate instances by using reflection. It will only work when the aggregate class has a parameterless constructor (it’s provided by the Aggregate base class).

It’s not a requirement to use the default factory registry singleton. Both ApplicationService and AggregateStore have an optional parameter that allows you to provide the registry as a dependency. When not provided, the default instance will be used. If you use a custom registry, you can add it to the DI container as a singleton.
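The lookup order described above (a registered factory first, reflection as the fallback) can be sketched as follows. SketchFactoryRegistry and the two example classes are hypothetical and only illustrate the idea; the actual AggregateFactoryRegistry may be implemented differently.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical registry illustrating the idea; not the Eventuous implementation.
public class SketchFactoryRegistry {
    readonly Dictionary<Type, Func<object>> _factories = new();

    // Registers a custom factory for one type.
    public void CreateUsing<T>(Func<T> factory) where T : class
        => _factories[typeof(T)] = () => factory();

    // Uses the registered factory if there is one,
    // otherwise falls back to reflection and a parameterless constructor.
    public T Create<T>() where T : class
        => _factories.TryGetValue(typeof(T), out var factory)
            ? (T)factory()
            : (T)Activator.CreateInstance(typeof(T), nonPublic: true)!;
}

// Example types for the sketch.
public class PlainAggregate { }

public class AggregateWithDependency {
    public AggregateWithDependency(string dependency) => Dependency = dependency;
    public string Dependency { get; }
}
```

Creating a PlainAggregate works without any registration, whereas AggregateWithDependency has no parameterless constructor and therefore needs a factory registered first.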

Dependency injection

The aggregate factory can inject registered dependencies into aggregates when constructing them. For this to work, you need to tell Eventuous that the aggregate needs to be constructed using the container. To do so, use the AddAggregate<T> service collection extension:

builder.Services.AddAggregate<Booking>();
builder.Services.AddAggregate<Payment>(
    sp => new Payment(sp.GetRequiredService<PaymentProcessor>(), otherService)
);

When that’s done, you also need to tell the host to use the registered factories:

app.UseAggregateFactory();

These extensions are available in the Eventuous.AspNetCore (DI extensions and IApplicationBuilder extensions) and Eventuous.AspNetCore.Web (IHost extensions).

2.2 - Domain events

Domain events: persisted behaviour

Concept

If you ever read the Blue Book, you’d notice that the Domain Event concept is not mentioned there. Still, years after the book was published, events have become popular, and domain events in particular.

Eric Evans, the author of the Blue Book, has added the definition to his Domain-Driven Design Reference. Let us start with Eric’s definition:

Model information about activity in the domain as a series of discrete events. Represent each event as a domain object. […] A domain event is a full-fledged part of the domain model, a representation of something that happened in the domain. Ignore irrelevant domain activity while making explicit the events that the domain experts want to track or be notified of, or which are associated with state changes in the other model objects.

When talking about Event Sourcing, we focus on the last bit: “making explicit the events […], which are associated with state changes.” Event Sourcing takes this definition further, and suggests:

Persist the domain object’s state as a series of domain events. Each domain event represents an explicit state transition. Applying previously recorded events to a domain object allows us to recover the current state of the object itself.

We can also cite an article from Medium (a somewhat controversial one):

In the past, the goto statement was widely used in programming languages, before the advent of procedures/functions. The goto statement simply allowed the program to jump to any part of the code during execution. This made it really hard for the developers to answer the question “how did I get to this point of execution?”. And yes, this has caused a large number of bugs. A very similar problem is happening nowadays. Only this time the difficult question is “how did I get to this state” instead of “how did I get to this point of execution”.

Event Sourcing effectively answers this question by giving you a history of all the state transitions for your domain objects, represented as domain events.

So, what is this page about? It doesn’t look like a conventional documentation page, does it? Nevertheless, let’s see what domain events look like when you build a system with Eventuous.

public static class BookingEvents {
    public record RoomBooked(
        string BookingId,
        string RoomId,
        LocalDate CheckIn,
        LocalDate CheckOut,
        decimal Price
    );

    public record BookingPaid(
        string BookingId,
        decimal AmountPaid,
        bool PaidInFull
    );

    public record BookingCancelled(string BookingId);

    public record BookingImported(
        string BookingId,
        string RoomId,
        LocalDate CheckIn,
        LocalDate CheckOut
    );
}

Oh, that’s it? A record? Yes, a record. Domain events are property bags. Their only purpose is to convey the state transition using the language of your domain. Technically, a domain event should just be an object, which can be serialised and deserialized for the purpose of persistence.

Eventuous do’s and don’ts:

  • Do make sure your domain events can be serialised to a commonly understood format, like JSON.
  • Do make domain events immutable.
  • Do implement equality by value for domain events.
  • Don’t apply things like marker interfaces (or any interfaces) to domain events.
  • Don’t use constructor logic, which can prevent domain events from deserializing.
  • Don’t use value objects in your domain events.

The last point might require some elaboration. The Value Object pattern in DDD doesn’t only require those objects to be immutable and implement equality by value. The main attribute of a value object is that it must be correct. It means that you can try instantiating a value object with invalid arguments, but it will deny them. This characteristic alone forbids value objects from being used in domain events, as events must be unconditionally deserializable. No matter what logic your current domain model has, events from the past are equally valid today. By bringing value objects to domain events you make them prone to failure when their validity rules change, which might prevent them from being deserialized. As a result, your aggregates won’t be able to restore their state from previously persisted events and nothing will work.
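The difference can be shown with a short sketch that uses System.Text.Json. Money, PriceChanged and EventSerialisation are hypothetical names for this illustration: the value object guards its invariant in the constructor, while the event carries a primitive and therefore deserialises regardless of today’s rules.

```csharp
using System;
using System.Text.Json;

// Value object: refuses invalid input, which is what you want when handling commands.
public record Money {
    public decimal Amount { get; }

    public Money(decimal amount) {
        if (amount < 0) throw new ArgumentException("Amount cannot be negative");
        Amount = amount;
    }
}

// Domain event: a property bag with primitives only, so it has no rules to violate.
public record PriceChanged(string BookingId, decimal Price);

public static class EventSerialisation {
    public static string Write(PriceChanged evt) => JsonSerializer.Serialize(evt);

    // Reads a stored event back; no validation logic can get in the way.
    public static PriceChanged Read(string json)
        => JsonSerializer.Deserialize<PriceChanged>(json)
            ?? throw new InvalidOperationException("Empty event payload");
}
```

An event stored when negative prices were still allowed can be read back as-is, whereas an event holding a Money value would depend on the rule inside the Money constructor at the time of reading.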

3 - Persistence

Event-sourced persistence abstractions and implementations.

As mentioned in the Prologue, Event Sourcing is a way to persist state. Therefore, the way to handle persistence is one of the fundamental differences of event-sourced systems, compared with state-based systems.

Read more about essential concepts of event-sourced persistence.

3.1 - Aggregate stream

Aggregate as a stream of events

Concept

So far, we figured out that an Aggregate is the transaction and consistency boundary within the domain model.

The aggregate store (application-level abstraction) uses an event store (infrastructure) to store events in streams. Each aggregate instance has its own stream, so the event store needs to be capable of reading events from, and writing events to, the correct stream.

When appending events to a stream, the append operation for a single stream must be transactional to ensure that the stream is consistent. Eventuous handles commands using the application service, and one command handler is the unit of work. All the events generated by the aggregate instance during the unit of work are appended to the stream as the final step in the command handling process.

Stream name

By default, Eventuous uses the AggregateType.Name combined with the aggregate id as the stream name. For example, the Booking aggregate with id 1 has a stream name Booking-1. That’s what StreamName.For<Booking>(1) returns.

However, you might want to have more fine-grained control over the stream name. For example, you might want to include the tenant id in the stream name. It’s possible to override the default convention by configuring the stream name mapping. The stream map contains a mapping between the aggregate identity type (derived from AggregateId) and the stream name generation function. Therefore, any additional property of the aggregate identity type can be used to generate the stream name.

For example, the following code registers a stream name mapping for the Booking aggregate:

public record BookingId : AggregateId {
    public BookingId(string id, string tenantId) : base(id) {
        TenantId = tenantId;
    }

    public string TenantId { get; }
}

public class BookingService : ApplicationService<Booking, BookingState, BookingId> {
    public BookingService(IAggregateStore store, StreamNameMap streamNameMap)
        : base(store, streamNameMap: streamNameMap) {
        // command handlers registered here
    }
}

// code of the bootstrapper
var streamNameMap = new StreamNameMap();
streamNameMap.Register<Booking, BookingState, BookingId>(
    id => $"Booking-{id.TenantId}-{id.Id}"
);
services.AddSingleton(streamNameMap);
services.AddApplicationService<BookingService>();

3.2 - Event store

Event store infrastructure

In order to isolate the core library from a particular way of storing events, Eventuous uses the IEventStore abstraction. Whilst it’s used by AggregateStore, you can also use it in a more generic way, when you need to persist or read events without having an aggregate.

The IEventStore interface inherits from the IEventReader and IEventWriter interfaces. Each of those interfaces is focused on one specific task: reading events from streams, or appending events to streams. This separation is necessary for scenarios when you only need, for example, to read events from a specific store, but not to append them. In such a case, you’d want to use the IEventReader interface only.

We have the following implementations of the event store:

  • EsdbEventStore which uses EventStoreDB - the default event store.
  • ElasticEventStore which uses Elasticsearch - it doesn’t support subscriptions, so the intended use is for archive purposes.
  • In-memory store in the test project

Primitives

Event store works with a couple of primitives, which allow wrapping infrastructure-specific structures. Those primitives are:

Record type            What it’s for
StreamReadPosition     Represents the stream revision from which the event store reads the stream forwards or backwards.
ExpectedStreamVersion  The stream version (revision), which we expect to have in the database when the event store tries to append new events. Used for optimistic concurrency.
StreamEvent            A structure, which holds the event type as a string as well as the serialised event payload and metadata.

All of those are immutable records.

Operations

Right now, we only have four operations for an event store:

Function      What’s it for
AppendEvents  Append one or more events to a given stream.
ReadEvents    Read events from a stream forwards, from a given start position.

For the parameters, you can look at the interface source code. If you use the EventStoreDB implementation we provide, you won’t need to know about the event store abstraction. It is required though if you want to implement it for your preferred database.

3.3 - Aggregate store

How aggregates are stored in an event store

Eventuous provides a single abstraction for domain object persistence, which is the AggregateStore.

The AggregateStore uses the IEventStore abstraction to be persistence-agnostic, so it can be used as-is, when you give it a proper implementation of event store.

We have only two operations in the AggregateStore:

  • Load - retrieves events from an aggregate stream and restores the aggregate state using those events.
  • Store - collects new events from an aggregate and stores those events to the aggregate stream.
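The two operations can be sketched with a tiny in-memory example. Account and SketchAggregateStore are hypothetical, heavily simplified types; the real AggregateStore works through IEventStore and has different signatures.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical aggregate: deposits are recorded as plain decimal "events".
public class Account {
    readonly List<object> _changes = new();

    public int Version { get; private set; } = -1;
    public decimal Balance { get; private set; }
    public IReadOnlyList<object> Changes => _changes;

    public void Deposit(decimal amount) {
        _changes.Add(amount);
        Balance += amount;
    }

    // Replays one stored event and increases the version.
    public void Fold(object evt) {
        if (evt is decimal amount) Balance += amount;
        Version++;
    }

    public void ClearChanges() => _changes.Clear();
}

// Hypothetical store; streams are kept in memory to keep the sketch runnable.
public class SketchAggregateStore {
    readonly Dictionary<string, List<object>> _streams = new();

    // Load: read the aggregate stream and fold every event into a fresh instance.
    public Account Load(string id) {
        var account = new Account();
        if (_streams.TryGetValue($"Account-{id}", out var events))
            foreach (var evt in events) account.Fold(evt);
        return account;
    }

    // Store: append the new events, expecting the stream to be at the loaded version.
    public void Store(string id, Account account) {
        var name = $"Account-{id}";
        if (!_streams.TryGetValue(name, out var events))
            _streams[name] = events = new List<object>();

        if (events.Count - 1 != account.Version)
            throw new InvalidOperationException("Wrong expected stream version");

        events.AddRange(account.Changes);
        account.ClearChanges();
    }
}
```

In this sketch, an instance is meant to be loaded, changed and stored once per unit of work, which mirrors the command-handling flow: load, call the aggregate, store.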

The AggregateStore constructor needs two arguments:

Our ApplicationService uses the AggregateStore in its command-handling flow.

Using EventStoreDB

Eventuous supports EventStoreDB out of the box, but only v20+ with gRPC protocol.

Using this pre-made event persistence is easy. You can register the necessary dependencies in your startup code:

var esdbSettings = EventStoreClientSettings.Create(connectionString);
builder.Services
    .AddSingleton(new EventStoreClient(esdbSettings))
    .AddAggregateStore<EsdbEventStore>();

The AddAggregateStore extension is available in the Eventuous.AspNetCore NuGet package.

Bring your own event store

You can build your own event store if you want to use a different database. If you have another implementation of the IEventStore interface, you can register the aggregate store like this:

builder.Services
    .AddSingleton<IEventStore>(new MyEventStore(...))
    .AddAggregateStore<MyEventStore>();

Multi-tier store

When you have an event-sourced system in production for a while, you might collect a lot of historical events. Some event streams might be so old that they are rarely accessed, but when using a single event store, you’d have to keep all those events in a hot store, as it’s the only one you have. This is a misuse of the hot store, as those historical events are expensive to keep there.

Eventuous has a feature to store historical events in a separate store, called the Multi-tier store. It works best when combined with a connector, which copies all the events from the hot store to the archive store in real time. Here, you find an example of using Elasticsearch as the archive store.

Archive in real-time

The connector is running continuously, and it copies all the events from the hot store to the archive store. It subscribes to the hot store using a catch-up subscription to the global log, and copies all the events from there to the archive. In this example, we use the Elastic Connector to connect the hot store to the archive store.

Replication process

As the connector replicates all the events, there’s no need to perform an explicit archive action when storing the events normally using the regular event store.

Delete events from the hot store

Then, to ensure that old events get removed from the hot store, you need to do so on the infrastructure level. For example, you can use the EventStoreDB feature to define the stream TTL (time to live), after which the events are removed from the hot store during the scavenge process.

Right now, there’s no built-in support for setting the stream metadata using Eventuous, but it is being built.

When all the bits and pieces are in place, old events will disappear from the hot store automatically. Alternatively, you can delete old streams using a scheduled job with pre-defined retention rules. The archive store will keep all the events forever, unless you delete them manually.

Aggregate store with archive

Now, we need to tell Eventuous to use the archive store. It’s done using the AggregateStore<T> class, where T is the archive event reader.

Elastic is a particularly good candidate for the archive store because it has multi-tier architecture as a native feature, and you can easily set up a cluster with warm, cold, and frozen tiers, so that you pay less to keep the historical events.

To register an aggregate store that uses EventStoreDB as the hot store and Elastic for archive, use the following code:

// register EventStoreClient
// register IElasticClient
builder.Services.AddAggregateStore<EsdbEventStore, ElasticEventStore>();

The Elastic event reader needs to have a pre-configured index. You’d need to call the following from the bootstrap code:

var indexConfig = new IndexConfig { ... };
var app = builder.Build();
await app.Services.GetRequiredService<IElasticClient>()
    .CreateIndexIfNecessary<PersistedEvent>(indexConfig);

From there, loading an aggregate looks the same from the outside, but it works differently under the hood.

Reading from two stores

Here’s the reading process described step-by-step:

  1. First, read the stream from the hot store
  2. If the stream doesn’t exist, read all the stream events from the archive store
  3. If the stream exists, check the first retrieved event number
  4. If the number is greater than zero, it means that part of the stream was truncated, so read the rest of the stream from the archive store
  5. Combine events from both stores in one collection
  6. Select only distinct events using the event version
  7. Rehydrate the aggregate using the resulting events collection

As a result, the load operation would be as fast as it would be with the hot store alone, because when the hot store returns a full stream, the aggregate store won’t fall back to the archive store. However, when the aggregate store discovers that the hot store contains an incomplete stream, it will attempt to load the historical events from the archive store. As this process is seamless for the user of the aggregate store, there’s no difference in the aggregate store signature.
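The reading steps can be sketched in a few lines. This is an illustration of the described process, not the Eventuous implementation: StoredEvent, TieredReader, and the two reader delegates are hypothetical, and the sketch assumes that a reader returns an empty list when the stream doesn’t exist.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event shape: the position of the event in its stream plus the payload.
public record StoredEvent(long Version, string EventType, byte[] Payload);

public static class TieredReader {
    // Assumption: both readers return an empty list when the stream does not exist.
    public static IReadOnlyList<StoredEvent> ReadStream(
        string stream,
        Func<string, IReadOnlyList<StoredEvent>> readHot,
        Func<string, IReadOnlyList<StoredEvent>> readArchive
    ) {
        // Step 1: read the stream from the hot store
        var hot = readHot(stream);

        // Step 2: no stream in the hot store, so take everything from the archive
        if (hot.Count == 0) return readArchive(stream);

        // Step 3: the stream starts at zero, so it is complete and no fallback is needed
        if (hot[0].Version == 0) return hot;

        // Step 4: the stream was truncated, fetch the history from the archive
        var archived = readArchive(stream);

        return archived
            .Concat(hot)                   // Step 5: combine both collections
            .GroupBy(e => e.Version)       // Step 6: keep one event per version
            .Select(g => g.First())
            .OrderBy(e => e.Version)
            .ToList();                     // Step 7 would rehydrate the aggregate from this list
    }
}
```

Note that the archive reader is never invoked when the hot store returns a stream starting at version zero, which is why the common path stays as fast as a single-store read.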

In many cases, replaying the full history is the opposite of what you want. If you add a new subscription to host a new read model, that new read model might only need to process the latest events, and processing the full history would just trigger useless database operations.

For example, if you add an Upcoming check-ins read model, it would need to process events from reservations for which the booking period hasn’t completed yet. You rarely have reservations that were made five years back. In many cases it’s just impossible, as hotels set their prices only a year or so upfront. Therefore, projecting the full history of reservations would trigger adding and removing thousands of records in the database, which is not what you want. In fact, if you keep only reservation events for the past year in the hot store, and the rest in the archive, the new read model will be rebuilt much faster, and the fact that all the historical events are archived won’t be noticeable.

3.4 - Serialisation

How events are serialized and deserialized

As described on the Domain events page, events must be (de)serializable. Eventuous doesn’t care about the serialisation format, but requires you to provide a serializer instance, which implements the IEventSerializer interface.

The serializer interface is simple:

public interface IEventSerializer {
    DeserializationResult DeserializeEvent(ReadOnlySpan<byte> data, string eventType, string contentType);

    SerializationResult SerializeEvent(object evt);
}

The serialization result contains not only the serialized object as bytes, but also the event type as string (see below), and the content type:

public record SerializationResult(string EventType, string ContentType, byte[] Payload);

Type map

For deserialization, the serializer will get the binary payload and the event type as string. Event store is unaware of your event types. It just stores the payload in a binary format to the database, along with the event type as string. It is up to you how your strong event types map to the event type string.

Therefore, we need to have a way to map strong types of the events to strings, which are used to identify those types in the database and for serialisation. For that purpose, Eventuous uses the TypeMap. It is a singleton, which is available globally. When you add new events to your domain model, remember to also add a mapping for those events. The mapping is static, so you can implement it anywhere in the application. The only requirement is that the mapping code must execute when the application starts.

For example, if you have a place where domain events are defined, you can put the mapping code there, as a static member:

static class BookingEvents {
    // events are defined here

    public static void MapBookingEvents() {
        TypeMap.AddType<RoomBooked>("RoomBooked");
        TypeMap.AddType<BookingPaid>("BookingPaid");
        TypeMap.AddType<BookingCancelled>("BookingCancelled");
        TypeMap.AddType<BookingImported>("BookingImported");
    }
}

Then, you can call this code in your Startup:

BookingEvents.MapBookingEvents();
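To show why the mapping is needed, here is a minimal sketch of a two-way type map and a JSON round trip that relies on it. It is not the Eventuous TypeMap or its default serializer: SketchTypeMap, SketchJsonSerializer, and the fields of the RoomBooked record are assumptions made for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Sample event for the sketch; its fields are assumed.
public record RoomBooked(string BookingId, string RoomId);

// Illustration only: a two-way lookup between CLR types and stable event type names.
public class SketchTypeMap {
    readonly Dictionary<string, Type> _byName = new();
    readonly Dictionary<Type, string> _byType = new();

    public void AddType<T>(string name) {
        _byName[name] = typeof(T);
        _byType[typeof(T)] = name;
    }

    // Used when writing: which name should be stored alongside the payload?
    public string GetTypeName(object evt)
        => _byType.TryGetValue(evt.GetType(), out var name)
            ? name
            : throw new InvalidOperationException($"Type {evt.GetType().Name} is not mapped");

    // Used when reading: which CLR type should the payload be deserialized to?
    public Type GetClrType(string name)
        => _byName.TryGetValue(name, out var type)
            ? type
            : throw new InvalidOperationException($"Event type name {name} is not mapped");
}

public class SketchJsonSerializer {
    readonly SketchTypeMap _typeMap;

    public SketchJsonSerializer(SketchTypeMap typeMap) => _typeMap = typeMap;

    // Produces the type name and the payload, which are stored together
    public (string EventType, byte[] Payload) Serialize(object evt)
        => (_typeMap.GetTypeName(evt), JsonSerializer.SerializeToUtf8Bytes(evt, evt.GetType()));

    // Resolves the CLR type from the stored name, then deserializes the payload
    public object Deserialize(ReadOnlySpan<byte> data, string eventType)
        => JsonSerializer.Deserialize(data, _typeMap.GetClrType(eventType));
}
```

Because the stored name is decoupled from the class name, the class can be renamed or moved to another namespace without breaking deserialization of events that are already persisted.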

Auto-registration of types

For convenience purposes, you can avoid manual mapping between type names and types by using the EventType attribute.

Annotate your events with it like this:

[EventType("V1.FullyPaid")]
public record BookingFullyPaid(string BookingId, DateTimeOffset FullyPaidAt);

Then, use the registration code in the bootstrap code:

TypeMap.RegisterKnownEventTypes();

The registration won’t work if event classes are defined in another assembly, which hasn’t been loaded yet. You can work around this limitation by specifying one or more assemblies explicitly:

TypeMap.RegisterKnownEventTypes(typeof(BookingFullyPaid).Assembly);

If you use a .NET version that supports module initializers, you can register event types in the module. For example, if the domain event classes are located in a separate project, add the file DomainModule.cs to that project with the following code:

using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using Eventuous;

namespace Bookings.Domain; 

static class DomainModule {
    [ModuleInitializer]
    [SuppressMessage("Usage", "CA2255", MessageId = "The \'ModuleInitializer\' attribute should not be used in libraries")]
    internal static void InitializeDomainModule() => TypeMap.RegisterKnownEventTypes();
}

Then, you won’t need to call the TypeMap registration in the application code at all.

Default serializer

Eventuous provides a default serializer implementation, which uses System.Text.Json.

Normally, you don’t need to register or provide the serializer instance to any of the Eventuous classes that perform serialization and deserialization work, like aggregate store and subscriptions, because they use the default serializer instance.

However, you can register the default serializer with different options, or a custom serializer instead:

services.AddSingleton<IEventSerializer>(
    new DefaultEventSerializer(
        new JsonSerializerOptions(JsonSerializerDefaults.Default)
    )
);

Alternatively, you can skip the registration and override the instance that Eventuous uses by default:

var defaultSerializer = new DefaultEventSerializer(
    new JsonSerializerOptions(JsonSerializerDefaults.Default)
);
DefaultEventSerializer.SetDefaultSerializer(defaultSerializer);

Metadata serializer

In many cases you might want to store event metadata in addition to the event payload. Normally, you’d use the same way to serialize both the event payload and its metadata, but it’s not always the case. For example, you might store your events in Protobuf, but keep metadata as JSON.

Eventuous only uses the metadata serializer when the event store implementation, or a producer can store metadata as a byte array. For example, EventStoreDB supports that, but Google PubSub doesn’t. Therefore, the event store and producer that use EventStoreDB will use the metadata serializer, but the Google PubSub producer will add metadata to events as headers, and won’t use the metadata serializer.

For the metadata serializer, the same principles apply as for the event serializer. Eventuous has a separate interface IMetadataSerializer, which has a default instance created implicitly on startup. You can register a custom metadata serializer as a singleton, or override the default one by calling the DefaultMetadataSerializer.SetDefaultSerializer function.

4 - Application

Application layer and service

The application layer sits between the system edge (different APIs), and the domain model. It is responsible for handling commands coming from the edge.

Concept

In general, the command handling flow can be described like this:

  1. The edge receives a command via its API (HTTP, gRPC, SignalR, messaging, etc).
  2. It passes the command over to the application service. As the edge is responsible for authentication and some authorisation, it can enrich commands with user credentials.
  3. The command service, which is agnostic to the API itself, handles the command and gives a response to the edge (positive or negative).
  4. The API layer then returns the response to the calling party.

Eventuous gives you a base class to implement command services in the application layer: the ApplicationService.

4.1 - Application service

Application service and unit of work

Concept

The command service itself performs the following operations when handling one command:

  1. Extract the aggregate id from the command, if necessary.
  2. Instantiate all the necessary value objects. This could effectively reject the command if value objects cannot be constructed. The command service could also load some other aggregates, or any other information, which is needed to execute the command but won’t change state.
  3. If the command expects to operate on an existing aggregate instance, this instance gets loaded from the Aggregate Store.
  4. Execute an operation on the loaded (or new) aggregate, using values from the command, and the constructed value objects.
  5. The aggregate either performs the operation and changes its state by producing new events, or rejects the operation.
  6. If the operation was successful, the service persists new events to the store. Otherwise, it returns a failure to the edge.
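The six steps can be sketched with a tiny, self-contained example. All types here (BookRoomCommand, RoomBookedEvent, SketchBooking, SketchResult, SketchCommandService) are hypothetical stand-ins rather than Eventuous classes, and a dictionary plays the role of the aggregate store.

```csharp
using System;
using System.Collections.Generic;

public record BookRoomCommand(string BookingId, string RoomId);
public record RoomBookedEvent(string BookingId, string RoomId);
public record SketchResult(bool Success, IReadOnlyList<object> NewEvents, string Error);

// Hypothetical aggregate: state is restored from events, changes are collected as new events.
public class SketchBooking {
    public List<object> Changes { get; } = new();
    public bool Booked { get; private set; }

    public void Load(IEnumerable<object> history) {
        foreach (var e in history) Apply(e);
    }

    public void BookRoom(string bookingId, string roomId) {
        // Step 5: the aggregate rejects the operation...
        if (Booked) throw new InvalidOperationException("Already booked");

        // ...or changes its state by producing a new event
        var evt = new RoomBookedEvent(bookingId, roomId);
        Apply(evt);
        Changes.Add(evt);
    }

    void Apply(object evt) {
        if (evt is RoomBookedEvent) Booked = true;
    }
}

public class SketchCommandService {
    // Stands in for the aggregate store: stream id to list of events
    readonly Dictionary<string, List<object>> _store = new();

    public SketchResult Handle(BookRoomCommand cmd) {
        try {
            var id = cmd.BookingId;                                  // Step 1: extract the id

            if (string.IsNullOrWhiteSpace(cmd.RoomId))               // Step 2: validate values
                throw new ArgumentException("Room id is required");

            var booking = new SketchBooking();                       // Step 3: load if it exists
            if (_store.TryGetValue(id, out var history)) booking.Load(history);

            booking.BookRoom(id, cmd.RoomId);                        // Steps 4 and 5: execute

            if (history == null) _store[id] = history = new List<object>();
            history.AddRange(booking.Changes);                       // Step 6: persist new events

            return new SketchResult(true, booking.Changes.ToArray(), "");
        } catch (Exception e) {
            // Step 6, failure path: report the failure to the edge instead of throwing
            return new SketchResult(false, Array.Empty<object>(), e.Message);
        }
    }
}
```

Handling the same BookRoomCommand twice returns a successful result first and a failed result with the "Already booked" message second, without an exception escaping the service.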

Application service base class

Eventuous provides a base class for you to build command services. It is a generic abstract class, which is typed to the aggregate type. You should create your own implementation of a command service for each aggregate type. As command execution is transactional, it can only operate on a single aggregate instance, and, logically, only one aggregate type.

Registering command handlers

We have three methods, which you call in your class constructor to register the command handlers:

Function What’s it for
OnNew Registers the handler, which expects no aggregate instance to exist (create, register, initialise, etc). The handler will get a new aggregate instance. If the aggregate already exists, the operation will fail with a version mismatch when the service tries to store the aggregate state.
OnExisting Registers the handler, which expects an aggregate instance to exist. You need to provide a function to extract the aggregate id from the command. The handler will get the aggregate instance loaded from the store, and will throw if there’s no aggregate to load.
OnAny Used for handlers, which can operate both on new and existing aggregate instances. The command service will try to load the aggregate, but won’t throw if the load fails, and will pass a new instance instead.

Here is an example of a command service from our test project:

public class BookingService
  : ApplicationService<Booking, BookingState, BookingId> {
    public BookingService(IAggregateStore store) : base(store) {
        OnNew<Commands.BookRoom>(
            (booking, cmd)
                => booking.BookRoom(
                    new BookingId(cmd.BookingId),
                    cmd.RoomId,
                    new StayPeriod(cmd.CheckIn, cmd.CheckOut),
                    cmd.Price,
                    cmd.BookedBy,
                    cmd.BookedAt
                )
        );

        OnAny<Commands.ImportBooking>(
            cmd => new BookingId(cmd.BookingId),
            (booking, cmd)
                => booking.Import(
                    new BookingId(cmd.BookingId),
                    cmd.RoomId,
                    new StayPeriod(cmd.CheckIn, cmd.CheckOut)
                )
        );
    }
}

You pass the command handler as a function to one of those methods. The function can be inline, like in the example, or it could be a method in the command service class.

In addition, OnAny and OnExisting need a function, which extracts the aggregate id from the command, as both of those methods will try loading the aggregate instance from the store.
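The difference between the three registration methods comes down to what the service does before the handler runs. The following sketch only illustrates that decision. ExpectedState and ExpectedStateRules are hypothetical names, not part of the library.

```csharp
using System;

public enum ExpectedState { New, Existing, Any }

public static class ExpectedStateRules {
    // Returns true when the aggregate should be loaded from the store before the handler runs.
    // Throws when an existing aggregate is required but the stream is missing.
    public static bool ShouldLoad(ExpectedState expected, bool streamExists) => expected switch {
        // OnNew: the handler gets a new instance, a conflict surfaces only when storing
        ExpectedState.New      => false,
        // OnExisting: the aggregate must be there
        ExpectedState.Existing => streamExists ? true : throw new InvalidOperationException("Aggregate not found"),
        // OnAny: load when possible, otherwise continue with a new instance
        ExpectedState.Any      => streamExists,
        _                      => throw new ArgumentOutOfRangeException(nameof(expected))
    };
}
```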

Async command handlers

If you need to get outside your process boundary when handling a command, you most probably would need to execute an asynchronous call to something like an external HTTP API or a database. For those cases you need to use async overloads:

  • OnNewAsync
  • OnExistingAsync
  • OnAnyAsync

These overloads are identical to sync functions, but the command handler function needs to return Task, so it can be awaited.

Result

The command service will return an instance of Result.

It could be an OkResult, which contains the new aggregate state and the list of new events. You use the data in the result to pass it over to the caller, if needed.

If the operation was not successful, the command service will return an instance of ErrorResult that contains the error message and the exception details.

Bootstrap

If you registered the EsdbEventStore and the AggregateStore in your Startup as described on the Aggregate store page, you can also register the application service:

services.AddApplicationService<BookingService, Booking>();

The AddApplicationService extension will register the BookingService as a singleton, both as itself and as IApplicationService<Booking>. Remember that all the DI extensions are part of the Eventuous.AspNetCore NuGet package.

When you also use AddControllers, you get the command service injected into your controllers.

You can simplify your application and avoid creating HTTP endpoints explicitly (as controllers or minimal API endpoints) if you use the command API feature.

Application HTTP API

The most common use case is to connect the application service to an HTTP API.

Read the Command API feature documentation for more details.

4.2 - Command API

Auto-generated HTTP API for command handling

Controller base

When using an application service from an HTTP controller, you’d usually inject the service as a dependency, and call its Handle method using the request body:

[Route("/booking")]
public class CommandApi : ControllerBase {
    readonly IApplicationService<Booking> _service;

    public CommandApi(IApplicationService<Booking> service) => _service = service;

    [HttpPost]
    [Route("book")]
    public async Task<ActionResult<Result>> BookRoom(
        [FromBody] BookRoom cmd,
        CancellationToken cancellationToken
    ) {
        // Pass the command to the application service and return its result as-is
        var result = await _service.Handle(cmd, cancellationToken);
        return Ok(result);
    }
}

The issue here is there’s no way to know if the command was successful or not. As the application service won’t throw an exception if the command fails, we can’t return an error via the HTTP response, unless we parse the result and return a meaningful HTTP response.

Eventuous allows you to simplify the command handling in the API controller by providing a CommandHttpApiBase<TAggregate> abstract class, which derives from ControllerBase and contains the Handle method. The class takes IApplicationService<TAggregate> as a dependency. The Handle method will call the application service, and also convert the handling result to ActionResult<Result>. Here are the rules for exception handling:

Result exception HTTP response
OptimisticConcurrencyException Conflict
AggregateNotFoundException NotFound
Any other exception BadRequest
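The rules in the table can be expressed as a small mapping function. The sketch below is not the code of CommandHttpApiBase. The two exception classes are local stand-ins named after the types in the table, ResultStatusMapper is a hypothetical name, and returning 200 when there is no exception is an assumption.

```csharp
using System;

// Local stand-ins named after the exception types in the table above.
public class OptimisticConcurrencyException : Exception { }
public class AggregateNotFoundException : Exception { }

public static class ResultStatusMapper {
    // Maps the exception found in a command result to an HTTP status code.
    public static int ToStatusCode(Exception exception) => exception switch {
        null                           => 200, // OK (assumed for a successful result)
        OptimisticConcurrencyException => 409, // Conflict
        AggregateNotFoundException     => 404, // NotFound
        _                              => 400  // BadRequest
    };
}
```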

Here is an example of a command API controller:

[Route("/booking")]
public class CommandApi : CommandHttpApiBase<Booking> {
    public CommandApi(IApplicationService<Booking> service) : base(service) { }

    [HttpPost]
    [Route("book")]
    public Task<ActionResult<Result>> BookRoom(
        [FromBody] BookRoom cmd, 
        CancellationToken cancellationToken
    ) => Handle(cmd, cancellationToken);
}

We recommend using the CommandHttpApiBase class when you want to handle commands using the HTTP API.

Generated command API

Eventuous can use your application service to generate a command API. Such an API will accept JSON models matching the application service command contracts, and pass those commands as-is to the application service. This feature removes the need to create API endpoints manually using controllers or .NET minimal API.

To use generated APIs, you need to add the Eventuous.AspNetCore.Web package.

All the auto-generated API endpoints will use the POST HTTP method.

Annotating commands

For Eventuous to understand what commands need to be exposed as API endpoints and on what routes, those commands need to be annotated by the HttpCommand attribute:

[HttpCommand(Route = "payment", Aggregate = typeof(Booking))]
public record ProcessPayment(string BookingId, float PaidAmount);

You can skip the Route property. In that case, Eventuous will use the command class name. For the example above, the generated route would be processPayment. We recommend specifying the route explicitly, as you might refactor the command class and give it a different name, and it will break your API if the route is auto-generated.

If your application has a single application service working with a single aggregate type, you don’t need to specify the aggregate type. Instead, use a different command registration method (described below).

Another way to specify the aggregate type for a group of commands is to annotate the parent class (command container):

[AggregateCommands(typeof(Booking))]
public static class BookingCommands {
    [HttpCommand(Route = "payment")]
    public record ProcessPayment(string BookingId, float PaidAmount);
}

In such case, Eventuous will treat all the commands defined inside the BookingCommands static class as commands operating on the Booking aggregate.

Also, you don’t need to specify the aggregate type in the command annotation if you use the MapAggregateCommands registration (see below).

Finally, you don’t need to annotate the command at all if you use the explicit command registration with the route parameter.

Registering commands

There are several extensions for IEndpointRouteBuilder that allow you to register HTTP endpoints for one or more commands.

Single command

The simplest way to register a single command is to make it explicitly in the bootstrap code:

var builder = WebApplication.CreateBuilder();

// Register the app service
builder.Services.AddApplicationService<BookingService, Booking>();

var app = builder.Build();

// Map the command to an API endpoint
app.MapCommand<ProcessPayment, Booking>("payment");

app.Run();

record ProcessPayment(string BookingId, float PaidAmount);

If you annotate the command with the HttpCommand attribute, and specify the route, you can avoid providing the route when registering the command:

app.MapCommand<ProcessPayment, Booking>();
...

[HttpCommand(Route = "payment")]
public record ProcessPayment(string BookingId, float PaidAmount);

Multiple commands for an aggregate

You can also register multiple commands for the same aggregate type, without a need to provide the aggregate type in the command annotation. To do that, use the extension that will create an ApplicationServiceRouteBuilder, then register commands using that builder:

app
    .MapAggregateCommands<Booking>()
    .MapCommand<ProcessPayment>()
    .MapCommand<ApplyDiscount>("discount");
    
...

// route specified in the annotation
[HttpCommand(Route = "payment")] 
public record ProcessPayment(string BookingId, float PaidAmount);

// No annotation needed
public record ApplyDiscount(string BookingId, float Discount);

Discover commands

There are two extensions that are able to scan your application for annotated commands, and register them automatically.

First, the MapDiscoveredCommands<TAggregate> extension, which assumes your application only serves commands for a single aggregate type:

app.MapDiscoveredCommands<Booking>();

...
[HttpCommand(Route = "payment")] 
record ProcessPayment(string BookingId, float PaidAmount);

For it to work, all the commands must be annotated and have the route defined in the annotation.

The second extension will discover all the annotated commands, which need to have an association with the aggregate type by using the Aggregate argument of the attribute, or by using the AggregateCommands attribute on the container class (described above):

app.MapDiscoveredCommands();

...

[HttpCommand(Route = "bookings/payment", Aggregate = typeof(Booking))] 
record ProcessPayment(string BookingId, float PaidAmount);

// PaymentCommands is declared in the V1 namespace
[AggregateCommands(typeof(Payment))]
class PaymentCommands {
    [HttpCommand(Route = "payments/register")]
    public record RegisterPayment(string PaymentId, string Provider, float Amount);
    
    [HttpCommand(Route = "payments/refund")]
    public record RefundPayment(string PaymentId);
}

Both extensions will scan the current assembly by default, but you can also provide a list of assemblies to scan as an argument:

app.MapDiscoveredCommands(typeof(V1.PaymentCommands).Assembly);

Using HttpContext data

Commands processed by the application service might include properties that aren’t provided by the API client, but are available in the HttpContext object. For example, you can think about the user that is making the request. The details about the user, and the user claims, are available in HttpContext.

You can instruct Eventuous to enrich the command before it gets sent to the application service, using the HttpContext data. In that case, you also might want to hide the command property from being exposed to the client in the OpenAPI spec.

To hide a property from being exposed to the client, use the JsonIgnore attribute:

[HttpCommand(Route = "book")]
public record BookRoom(string RoomId, string BookingId, [property: JsonIgnore] string UserId);

Then, you can use the HttpContext data in your command:

app
    .MapAggregateCommands<Booking>()
    .MapCommand<BookRoom>((cmd, ctx) => cmd with { UserId = ctx.User.Identity.Name });

When the command is mapped to the API endpoint like that, and the property is ignored, the OpenAPI specification won’t include the ignored property, and the application service will get the command populated with the user id from HttpContext.

5 - Subscriptions

Real-time event processing using subscriptions

5.1 - Real-time subscription

The concept of real-time subscriptions

Real-time subscriptions are essential when building a successful event-sourced system. They support the following sequence of operations:

  • Domain model emits new event
  • The event is stored to the event store
  • The command is now handled, and the transaction is complete
  • All the read models get the new event from the store and project it if necessary

Most of the time, subscriptions are used for two purposes:

  1. Deliver events to reporting models
  2. Emit integration events

Subscriptions can subscribe from any position of a stream, but normally you’d subscribe from the beginning of the stream, which allows you to process all the historical events. For integration purposes, however, you would usually subscribe from now, as emitting historical events to other systems might produce undesired side effects.

One subscription can serve multiple event handlers. Normally, you would identify a group of event handlers, which need to be triggered together, and group them within one subscription. This way you avoid situations when, for example, two read models get updated at different speeds and show confusing information if those read models are shown on the same screen.

Subscriptions need to maintain their own checkpoints, so when the service that hosts a subscription restarts, it will start receiving events from the last known position in the stream.

Most often, you’d want to subscribe to the global event stream, so you can build read models, which compose information from different aggregates. Eventuous offers the All stream subscription for this use case. In some cases you’d need to subscribe to a regular stream using the stream subscription.

In Eventuous, subscriptions are specific to event store implementation. In addition, Eventuous has subscriptions to message brokers like Google PubSub and RabbitMQ for integration purposes.

The wrong way

One of the common mistakes people make when building an event-sourced application is using an event store which is not capable of handling realtime subscriptions. It forces developers to engage some sort of message bus to deliver new events to subscribers. There are quite a few issues with that approach, but the most obvious one is a two-phase commit.

When using two distinct pieces of infrastructure in one transaction, you risk that one of those operations fails. Let’s use the following example code, which is very common:

await _repository.Save(newEvents);
await _bus.Publish(newEvents);

If the second operation fails, the command side of the application would remain consistent. However, any read model that projects those events will not be updated. So, essentially, the reporting model will become inconsistent with the transactional model. The worst part is that the reporting model will never recover from the failure.

As mentioned, there are multiple issues with using a message bus as transport to deliver events to reporting models, but we won’t be covering them on this page.

The easiest way to solve the issue is to use a database which supports realtime subscriptions to event streams out of the box. That’s why we use EventStoreDB as the primary event store implementation.

The right way

It is not hard to avoid two-phase commits and publish domain events reliably if you use a proper event store. The aim here would be to achieve the following architecture:

Consistent event flow

Why is this flow “consistent”? It’s because the command handling process always finishes with an append operation towards the event store (unless the command fails). There’s no explicit Publish or Produce call that happens after the event is persisted. A proper event store should let you subscribe to receive all the new events when they appear in the store. As the Append operation of the event store is transactional, such a subscription will get the event only once, if it is able to acknowledge the event correctly back to the subscription.

Subscriptions have many purposes. Most often, you would use a subscription to project domain events to your query models (read models). You can also use subscriptions to transform and publish integration events (see Gateway).

Subscriptions can also be used for integration purposes in combination with Producers.

Implementation

Eventuous implements subscriptions for different kinds of infrastructure (transport). As each transport is different, the way to configure them can be different, as well as the way subscriptions work. Read the guidelines for each transport carefully to understand what transport to use for your use case.

5.2 - Checkpoints

What’s a checkpoint and why you need to store it

When you subscribe to an event store, you need to decide what events you want to receive. A proper event store would allow you to subscribe to any event stream, or to a global stream (All stream), which contains all the events from the store, ordered by the time they were appended. Event-oriented brokers that support persistence as ordered event logs also support subscriptions, normally called consumers, as it’s the publish-subscribe broker terminology.

The subscription decides at what stream position it wants to start receiving events. If you want to process all the historical events, you’d subscribe from the beginning of the stream. If you want to only receive real-time events, you need to subscribe from now.

What’s the checkpoint?

As the subscription receives and processes events, it moves further along the stream it subscribed to. Every event the subscription receives and processes has a position in the subscribed stream. This position can be used as a checkpoint of the subscription. If the application that hosts the subscription shuts down, you’d want the subscription to resubscribe from the position that was processed last, plus one. That’s how you achieve exactly-once event handling. Therefore, the subscription needs to take care of storing its checkpoint somewhere, so the last known position can be retrieved from the checkpoint store and used to resubscribe.

Some log-based brokers also use the term offset to describe the checkpoint concept.

Subscriptions, which are managed by the server, don’t require storing checkpoints on the client side. For example, EventStoreDB persistent subscriptions and Google PubSub subscriptions don’t require a client-side checkpoint store. Some subscriptions, like RabbitMQ subscription, don’t have this concept at all, as RabbitMQ doesn’t keep consumed messages, neither ACKed nor NACKed.

Checkpoint store

Eventuous provides an abstraction, which allows subscriptions to store checkpoints reliably. You can decide to store it in a file or in a database. You can also decide if you want to store a checkpoint after processing each event, or only flush it now and then. Periodic checkpoint flushing decreases the pressure on the infrastructure behind the checkpoint store, but also requires you to make your subscription idempotent. Idempotency is usually hard or impossible to achieve for integration, since you can rarely check whether you have already published an event to a broker. However, it can work for read model projections.

Abstraction

The checkpoint store interface is simple; it only has two functions:

interface ICheckpointStore {
    ValueTask<Checkpoint> GetLastCheckpoint(
        string checkpointId,
        CancellationToken cancellationToken
    );

    ValueTask<Checkpoint> StoreCheckpoint(
        Checkpoint checkpoint,
        CancellationToken cancellationToken
    );
}

The Checkpoint record is a simple record, which aims to represent a stream position in any kind of event store:

record Checkpoint(string Id, ulong? Position);
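To make the contract more concrete, here is a minimal sketch of a store that keeps checkpoints in process memory. It is not part of Eventuous: the InMemoryCheckpointStore name is hypothetical, and the interface and record are redeclared from the snippets above only so the sketch compiles on its own. Returning a checkpoint with a null position for an unknown id is an assumption about how "nothing stored yet" could be represented.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Usage: a subscription that never stored a checkpoint gets a null position.
var store = new InMemoryCheckpointStore();
var initial = await store.GetLastCheckpoint("BookingsProjections", CancellationToken.None);
Console.WriteLine(initial.Position is null); // True

await store.StoreCheckpoint(new Checkpoint("BookingsProjections", 42UL), CancellationToken.None);
var loaded = await store.GetLastCheckpoint("BookingsProjections", CancellationToken.None);
Console.WriteLine(loaded.Position); // 42

// Redeclared from the text above so that the sketch is self-contained.
record Checkpoint(string Id, ulong? Position);

interface ICheckpointStore {
    ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken);
    ValueTask<Checkpoint> StoreCheckpoint(Checkpoint checkpoint, CancellationToken cancellationToken);
}

// Hypothetical store: checkpoints live in memory and are lost on restart.
class InMemoryCheckpointStore : ICheckpointStore {
    readonly ConcurrentDictionary<string, Checkpoint> _checkpoints = new();

    public ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) {
        // Assumption: an unknown id yields a checkpoint without a position.
        var checkpoint = _checkpoints.TryGetValue(checkpointId, out var stored)
            ? stored
            : new Checkpoint(checkpointId, null);
        return new ValueTask<Checkpoint>(checkpoint);
    }

    public ValueTask<Checkpoint> StoreCheckpoint(Checkpoint checkpoint, CancellationToken cancellationToken) {
        _checkpoints[checkpoint.Id] = checkpoint;
        return new ValueTask<Checkpoint>(checkpoint);
    }
}
```

Because such a store forgets everything when the process stops, it would only be suitable for tests or for subscriptions that should always replay from the start.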

Available stores

If a supported projection type in an Eventuous package for projections (Eventuous.Projections.*) requires a checkpoint store, you can find its implementation in that package. For example, the Eventuous.Projections.MongoDB package has a checkpoint store implementation for MongoDB.

If you register subscriptions in the DI container, you also need to register the checkpoint store:

builder.Services.AddSingleton<IMongoDatabase>(Mongo.ConfigureMongo());
builder.Services.AddCheckpointStore<MongoCheckpointStore>();

The MongoDB checkpoint store will create a collection called checkpoint where it will keep one document per subscription.

In addition to that, Eventuous has two implementations in the core subscriptions package:

  • MeasuredCheckpointStore: creates a trace for all the IO operations, wraps an existing store
  • NoOpCheckpointStore: does nothing, used in Eventuous tests

The measured store is used by default if Eventuous diagnostics aren’t disabled, and you use the AddCheckpointStore container registration extension.

Checkpoint commit handler

In addition to checkpoint store, Eventuous has a more advanced way to work with checkpoints. It doesn’t load or store checkpoints by itself, for that purpose it uses the provided checkpoint store. However, the commit handler is able to receive a stream of unordered checkpoints, reorder them, detect possible gaps, and only store the checkpoint that is the latest before the gap.

For subscriptions that support delayed consume (see Partitioning filter) and require a checkpoint store, you must use the commit handler. All such subscription types provided by Eventuous use the checkpoint commit handler.

Unless you create your own subscription with such requirements, you don’t need to know the internals of the commit handler. However, you would benefit from knowing the consequences of delayed event processing with supported subscriptions.

When events get partitioned by the filter, several consumer instances process events in parallel. As a result, each partition will get checkpoints with gaps. When partitioned consumers process events, they run at different speeds. Each event inside DelayedAckConsumeContext is explicitly acknowledged, and when it happens, the checkpoint gets to the commit handler queue. The commit handler then is able to accumulate checkpoints, detect gaps in the sequence, and only store the latest checkpoint in a gap-less sequence.
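The gap detection idea can be sketched as follows. This is not the Eventuous commit handler: the GaplessCommitTracker class is hypothetical, and it assumes that every delivered event carries a contiguous sequence number (1, 2, 3, ...) so that a missing number means an event is still in flight.

```csharp
using System;
using System.Collections.Generic;

var tracker = new GaplessCommitTracker(lastCommitted: 0UL);
// Events 1..5 were delivered; partitions acknowledge them out of order.
Console.WriteLine(tracker.Acknowledge(2UL)); // 0: event 1 is still in flight
Console.WriteLine(tracker.Acknowledge(3UL)); // 0
Console.WriteLine(tracker.Acknowledge(1UL)); // 3: 1, 2 and 3 form a gapless run
Console.WriteLine(tracker.Acknowledge(5UL)); // 3: event 4 is missing
Console.WriteLine(tracker.Acknowledge(4UL)); // 5

// Hypothetical helper, assuming contiguous sequence numbers per delivered event.
class GaplessCommitTracker {
    readonly SortedSet<ulong> _pending = new();
    ulong _lastCommitted;

    public GaplessCommitTracker(ulong lastCommitted) => _lastCommitted = lastCommitted;

    // Records an acknowledged sequence number and returns the highest
    // number that has no unacknowledged predecessors.
    public ulong Acknowledge(ulong sequence) {
        if (sequence > _lastCommitted) _pending.Add(sequence);
        // Advance while the next expected number is already acknowledged.
        while (_pending.Remove(_lastCommitted + 1UL)) _lastCommitted++;
        return _lastCommitted;
    }
}
```

Only the value returned by Acknowledge would be passed on to the checkpoint store, so a restart never skips an event that was still being processed.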

Gap in the commit handler queue

As we talk about gaps, you might face a situation when the commit handler has a list of uncommitted checkpoints with gaps, and the application stops. When this happens, some events were already processed, whilst checkpoints for those events remain in-flight. When the application restarts, it loads the checkpoint that points to some position in the stream that is earlier than positions of already processed events. Because of that, some events will be processed by event handlers again. Therefore, you need to make sure that your event handlers are idempotent, so when the same events are processed again, the result of the processing won’t create any undesired side effects.
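One possible way to get idempotent handling in a projection is to keep the position of the last applied event next to the projected state and to skip anything at or below it. The sketch below illustrates that idea with an in-memory dictionary; the RoomBookingCounter class and its members are hypothetical, and a real projection would need to write the position and the state change to the read model in one atomic update.

```csharp
using System;
using System.Collections.Generic;

var projection = new RoomBookingCounter();
projection.Apply(position: 10UL, roomId: "101");
projection.Apply(position: 11UL, roomId: "101");
// After a restart, the subscription resumes from an older checkpoint
// and delivers position 11 a second time.
projection.Apply(position: 11UL, roomId: "101");
Console.WriteLine(projection.BookingsFor("101")); // 2

// Hypothetical read model that counts bookings per room.
class RoomBookingCounter {
    readonly Dictionary<string, (ulong LastPosition, int Count)> _rooms = new();

    public void Apply(ulong position, string roomId) {
        // When the room is unknown, state is the default tuple (0, 0).
        _rooms.TryGetValue(roomId, out var state);
        var alreadyApplied = state.Count > 0 && position <= state.LastPosition;
        if (alreadyApplied) return; // replayed event, nothing to do
        _rooms[roomId] = (position, state.Count + 1);
    }

    public int BookingsFor(string roomId) =>
        _rooms.TryGetValue(roomId, out var state) ? state.Count : 0;
}
```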

5.3 - Subscription base

How Eventuous subscriptions work

The base abstraction for subscriptions is the IMessageSubscription interface, but all the available subscriptions are based on the EventSubscription base class, which is a generic abstract class where its type parameter is the subscription options type. All the provided subscription options types inherit from the SubscriptionOptions base class.

The SubscriptionOptions base class has three properties:

  • SubscriptionId: a unique identifier for the subscription
  • ThrowOnError: a boolean indicating whether the subscription should throw an exception if an error occurs. When the subscription throws, it either NACKs the message, or stops the subscription depending on the implementation.
  • EventSerializer: an instance of the IEventSerializer interface, which is used to serialize and deserialize events. If not provided, the default serializer is used.

Each provided subscription options type has more options, which depend on the subscription implementation details.

To host a subscription and manage its lifecycle, Eventuous has a hosted service called SubscriptionHostedService. Each registered subscription gets its own hosted service, so that each subscription can be managed independently. When using Eventuous subscription registration extensions for the DI container, the hosted service is registered automatically.

You’d normally use the DI container to register subscriptions with all the necessary handlers (described below).

Event handlers

As mentioned on the Concept page, one subscription might serve multiple event handlers, such as projections. It is especially relevant to keep a group of projections in sync, so they don’t produce inconsistent read models.

Each subscription service gets a list of event handlers. An event handler must implement the IEventHandler interface, which has two members:

public interface IEventHandler {
    string DiagnosticName { get; }
    ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext context);
}

The HandleEvent function will be called by the subscription service for each event it receives. The event is already deserialized. The function also gets the event position in the stream. It might be used in projections to set some property of the read model. Using this property in queries will tell you if the projection is up-to-date.

The diagnostic name of the handler is used to distinguish logs in traces coming from a subscription per individual handler.

Normally, Eventuous uses the BaseEventHandler abstract base class. For specific implementations, you’d either use a built-in handler provided by a projection (like MongoDB projection), or the EventHandler abstract base class.

Consume context

The subscription will invoke all its event handlers at once for each event received. Instead of just getting the event, the handler will get an instance of the message context (IMessageConsumeContext interface). The context contains the payload (event or other message) in its Message property, which has the type object?. It’s possible to handle each event type differently by using pattern matching:

public ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext ctx) {
    return ctx.Message switch {
        V1.RoomBooked => ...
        // Wrap the status, as the method returns ValueTask<EventHandlingStatus>
        _ => ValueTask.FromResult(EventHandlingStatus.Ignored)
    };
}

However, it’s easier and more explicit to use pre-optimised base handlers. For read model projections you can use Projections handlers, described separately. For integration purposes you might want to use the Gateway. For more generic needs, Eventuous offers the EventHandler base class. It allows specifying typed handlers for each of the event types that the handler processes:

public class MyHandler : EventHandler {
    public MyHandler(SmsService smsService) {
        On<RoomBooked>(async ctx => await smsService.Send($"Room {ctx.Message.RoomId} booked!"));
    } 
}

The typed handler will get an instance of MessageConsumeContext<T> where T is the message type. There, you can access the message using the Message property without casting it.

A subscription will invoke a single consumer and give it an instance of the consume context. The consumer’s job is to handle the message by invoking all the registered handlers. By default, Eventuous uses the DefaultConsumer, unless specified otherwise when registering the subscription. The IMessageConsumeContext interface has functions to acknowledge (ACK), not acknowledge (NACK), or ignore the message. The default consumer ACKs the message when all the handlers processed the message without failures, and at least one handler didn’t ignore the message. It NACKs the message if any handler returned an error or crashed. Finally, it will ignore the message if all the handlers ignored it. How the message handling result is processed is unknown to the consumer as this behaviour is transport-specific. Each subscription type has its own way to process the message handling result.

Handling result

The handler needs to return the handling status. It’s preferred to return the error status EventHandlingStatus.Failure instead of throwing an exception. When using the EventHandler base class, if the event handling function throws an exception, the handler will return the failure status and not propagate the exception back to the subscription.

The status is important for diagnostic purposes. For example, you don’t want to trace event handlers for events that are ignored. That’s why when you don’t want to process the event, you need to return EventHandlingStatus.Ignored. The EventHandler base class will do it automatically if it gets an event that has no registered handler.

When the event is handled successfully (neither failed nor ignored), the handler needs to return EventHandlingStatus.Success. Again, the EventHandler base class will do it automatically if the registered handler doesn’t throw.

The subscription will acknowledge the event only if none of its handlers fail. How subscriptions handle failures depends on the transport type.
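The three outcomes described above can be illustrated with a small stand-in for a typed handler base class. This is a simplified sketch, not the Eventuous EventHandler: the SketchEventHandler class, its On and HandleEvent members, and the three-value EventHandlingStatus enum declared here are assumptions made for illustration only.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var handler = new SketchEventHandler();
handler.On<RoomBooked>(e => {
    if (e.RoomId == "13") throw new InvalidOperationException("No such room");
    return ValueTask.CompletedTask;
});

Console.WriteLine(await handler.HandleEvent(new RoomBooked("101")));    // Success
Console.WriteLine(await handler.HandleEvent(new RoomBooked("13")));     // Failure
Console.WriteLine(await handler.HandleEvent(new RoomCancelled("101"))); // Ignored

record RoomBooked(string RoomId);
record RoomCancelled(string RoomId);

// Simplified stand-in for the status values named in the text.
enum EventHandlingStatus { Ignored, Success, Failure }

// Hypothetical handler base that maps outcomes to statuses.
class SketchEventHandler {
    readonly Dictionary<Type, Func<object, ValueTask>> _handlers = new();

    public void On<T>(Func<T, ValueTask> handler) where T : class =>
        _handlers[typeof(T)] = message => handler((T)message);

    public async ValueTask<EventHandlingStatus> HandleEvent(object? message) {
        // No registered handler for this message type: report it as ignored.
        if (message is null || !_handlers.TryGetValue(message.GetType(), out var handle))
            return EventHandlingStatus.Ignored;
        try {
            await handle(message);
            return EventHandlingStatus.Success;
        } catch (Exception) {
            // The exception is converted to a status instead of being rethrown.
            return EventHandlingStatus.Failure;
        }
    }
}
```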

Registration

As mentioned before, you’d normally register subscriptions using the DI extensions provided by Eventuous:

builder.Services.AddSubscription<StreamSubscription, StreamSubscriptionOptions>(
    "PaymentIntegration",
    builder => builder
        .Configure(x => x.StreamName = PaymentsIntegrationHandler.Stream)
        .AddEventHandler<PaymentsIntegrationHandler>()
);

The AddSubscription extension needs two generic arguments: subscription implementation and its options. Every implementation has its own options as the options configure the particular subscription transport.

The first parameter for AddSubscription is the subscription name. It must be unique within the application scope. Eventuous uses the subscription name to separate one subscription from another, along with their options and other things. The subscription name is also used in diagnostics as a tag.

Then, you need to specify how the subscription is built. The two most commonly used functions in the builder are:

  • Configure: allows changing the subscription options
  • AddEventHandler<T>: adds an event handler to the subscription

You can add several handlers to the subscription, and they will always “move” together through the event stream or topic. If any of the handlers fail, the subscription might fail, so it’s an “all or nothing” strategy.

Eventuous uses the consume pipe, where it’s possible to add filters (similar to MassTransit). You won’t need to think about it in most cases, but you can read more in the Pipes and filters section.

Subscription drops

A subscription could drop for different reasons. For example, it fails to pass the keep alive ping to the server due to a transient network failure, or it gets overloaded.

The subscription service handles such drops and issues a resubscribe request, unless the application is shutting down, in which case the drop is deliberate.

This feature makes the subscription service resilient to transient failures, so it will recover from drops and continue processing events, when possible.

You can configure the subscription to ignore failures and continue by setting the ThrowOnError property of SubscriptionOptions to false.

5.4 - Consume pipe

Subscription consume pipes and filters

Pipes and filters

When a subscription gets a message from the transport, it will create a consume context with the message itself, and some contextual information like message id, message type, subscription id, etc. Then, it will pass it over to the consume pipe. A pipe is a set of filters, which are executed sequentially, like middlewares. Each filter can do something with the context, and then calls the next filter. The process continues until there are no filters left. By that time, the message is considered consumed, and the pipe finishes.
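The middleware-like behaviour can be sketched in a few lines. The types below (SketchContext, SketchPipe, and the Filter and Next delegates) are hypothetical and much simpler than the Eventuous pipe; they only show how each filter receives the context and decides when to call the next one.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();
var pipe = new SketchPipe()
    // A filter that does work before and after the rest of the pipe.
    .Add(async (ctx, next) => {
        log.Add("tracing:start");
        await next(ctx);
        log.Add("tracing:end");
    })
    // The final filter consumes the message and does not call next.
    .Add((ctx, next) => {
        log.Add($"consumer:{ctx.MessageType}");
        return ValueTask.CompletedTask;
    });

await pipe.Send(new SketchContext("msg-1", "V1.RoomBooked"));
Console.WriteLine(string.Join(", ", log));
// tracing:start, consumer:V1.RoomBooked, tracing:end

// Hypothetical context carrying the contextual information.
record SketchContext(string MessageId, string MessageType);

delegate ValueTask Next(SketchContext context);
delegate ValueTask Filter(SketchContext context, Next next);

class SketchPipe {
    readonly List<Filter> _filters = new();

    public SketchPipe Add(Filter filter) {
        _filters.Add(filter);
        return this;
    }

    public ValueTask Send(SketchContext context) {
        // Build the chain from the last filter to the first one.
        Next next = _ => ValueTask.CompletedTask;
        for (var i = _filters.Count - 1; i >= 0; i--) {
            var filter = _filters[i];
            var following = next;
            next = ctx => filter(ctx, following);
        }
        return next(context);
    }
}
```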

Eventuous consume pipe can be customised by adding filters to it, but normally you’d use the default pipe, which can be visualized like this:

Default consume pipeline

Some subscriptions conditionally or unconditionally add filters to the pipe, when they detect the default pipe. For example, EventStoreDB catch-up subscription might add the concurrent filter and partitioning filter, and RabbitMQ subscription uses the concurrent filter.

Available filters

Eventuous has two interfaces for filters, but all the provided filters are based on ConsumeFilter and ConsumeFilter<T> base classes. There, <T> is the context type, as some filters can only process certain context types. The validation of filter vs context type matching is done when the pipeline is constructed, so it won’t fail at runtime.

Out of the box, you can find the following filters:

  • ConsumerFilter: (mandatory) the final filter in the pipe, which hosts the consumer
  • MessageFilter: (optional) allows filtering out messages based on context data (message type, meta, payload, etc.)
  • TracingFilter: (optional) traces the event handling process, added by default when diagnostics aren’t disabled
  • ConcurrentFilter: (optional) allows splitting message processing from the subscription
  • PartitioningFilter: (optional) allows parallel message processing with partitions

Consumer filter

The consumer filter holds an instance of the message consumer. By default, Eventuous uses the DefaultConsumer, and it doesn’t require any configuration. You can override that using the UseConsumer<T>() function of the subscription builder when you register a subscription.

The default consumer Acks the message when all the handlers processed the message without failures, and at least one handler didn’t ignore the message. It Nacks the message if any handler returned an error or crashed. Finally, it will ignore the message if all the handlers ignored it. How the message handling result is processed is unknown to the consumer as this behaviour is transport-specific. Each subscription type has its own way to process the message handling result.

When building a custom consumer, you’d need to include similar event handling result logic into it.
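As a rough illustration of that result logic, the rules described above can be written as one function over the handler results. The enums and the ConsumerDecision class are stand-ins declared for this sketch and are not the Eventuous types.

```csharp
using System;
using System.Linq;

Console.WriteLine(ConsumerDecision.Decide(EventHandlingStatus.Success, EventHandlingStatus.Ignored)); // Ack
Console.WriteLine(ConsumerDecision.Decide(EventHandlingStatus.Success, EventHandlingStatus.Failure)); // Nack
Console.WriteLine(ConsumerDecision.Decide(EventHandlingStatus.Ignored, EventHandlingStatus.Ignored)); // Ignore

// Simplified stand-ins for the statuses and outcomes named in the text.
enum EventHandlingStatus { Ignored, Success, Failure }
enum ConsumeAction { Ack, Nack, Ignore }

static class ConsumerDecision {
    public static ConsumeAction Decide(params EventHandlingStatus[] results) {
        // Any failed handler means the message is not acknowledged.
        if (results.Any(r => r == EventHandlingStatus.Failure)) return ConsumeAction.Nack;
        // The message is ignored only when every handler ignored it.
        if (results.All(r => r == EventHandlingStatus.Ignored)) return ConsumeAction.Ignore;
        // No failures and at least one handler processed the message.
        return ConsumeAction.Ack;
    }
}
```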

The consumer filter is a mandatory filter that should be the final filter in the pipe. When you register a subscription, the consumer filter is added to the pipe by default.

Message filter

The message filter can be used to prevent some messages from going down the pipe by filtering those messages out. When constructed, it needs a single constructor argument, which is a filtering function. The function gets an instance of IMessageContext, so it can use all of its available properties to filter messages. When the message is filtered out, the filter will mark it as ignored.

Example:

builder.Services.AddSubscription<StreamSubscription, StreamSubscriptionOptions>(
    "nonFooSubscription",
    cfg => cfg.AddConsumeFilterFirst(new MessageFilter(x => !x.MessageType.StartsWith("foo")))
);

Such a subscription will ignore all the events that have the type starting with “foo”.

Tracing filter

The tracing filter gets added automatically when Eventuous diagnostics is enabled (or, not disabled, as it’s enabled by default). Read more about Eventuous tracing in the Diagnostics section.

Concurrent filter

When using the concurrent filter, the pipe gets separated in two parts: before the concurrent filter and after it. The filter itself creates a channel of a given size (100 by default), where it puts all the messages in order. Then, the pipeline returns to the subscription as if the message was already consumed. A parallel process fetches messages from the channel continuously, and sends them to the second part of the pipe where messages actually get consumed.

Because of such a split, the result returned to the subscription doesn’t contain the actual handling result. Therefore, the concurrent filter can only be used in combination with DelayedAckConsumeContext. It’s the responsibility of a subscription to create such a context type, so only some subscription types support using the concurrent filter. This context type has two additional functions to Ack (Acknowledge function) or Nack (Fail function) each message. The subscription provides specific implementations of those functions. For example, an EventStoreDB catch-up subscription would commit the checkpoint when the event is acknowledged. The filter calls these functions itself, based on the result returned by the second part of the pipe (delayed consume).

Pipe with concurrent filter

The concurrent filter is useful in combination with partitioning filter, but it can also be used for other purposes, like batched event processing.
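The split between enqueueing and delayed consumption can be sketched with a bounded channel from System.Threading.Channels. This is an illustration of the idea rather than the filter's actual code: the DelayedMessage record and its Acknowledge and Fail callbacks are hypothetical stand-ins for the delayed acknowledgement context, and Task.Delay replaces the real handlers.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded channel of 100 items, mirroring the default size mentioned above.
var channel = Channel.CreateBounded<DelayedMessage>(100);
var acked = 0;

// Second part of the pipe: a background reader consumes each message
// and only then acknowledges or fails it.
var worker = Task.Run(async () => {
    await foreach (var message in channel.Reader.ReadAllAsync()) {
        try {
            await Task.Delay(1); // stands in for the real handlers
            message.Acknowledge();
        } catch (Exception e) {
            message.Fail(e);
        }
    }
});

// First part of the pipe: the subscription enqueues the message and
// returns without waiting for the handling result.
for (var i = 0; i < 10; i++) {
    var position = i;
    await channel.Writer.WriteAsync(new DelayedMessage(
        position,
        () => Interlocked.Increment(ref acked),
        e => Console.WriteLine($"Nack {position}: {e.Message}")));
}

channel.Writer.Complete();
await worker;
Console.WriteLine(acked); // 10

// Hypothetical message wrapper with explicit Ack and Nack callbacks.
record DelayedMessage(int Position, Action Acknowledge, Action<Exception> Fail);
```

The bounded capacity also provides back-pressure: when the reader falls behind by more than 100 messages, WriteAsync waits instead of letting the queue grow without limit.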

Subscriptions that support delayed consume add this filter by default when necessary (based on the subscription configuration). Check the particular transport subscription implementation to learn more.

Partitioning filter

Sometimes the subscription workload gets too high for it to cope with the number of delivered messages, and process them fast enough one by one. In this case, the partitioning filter can be useful as it allows handling messages in parallel. Most of the time you’d want the messages to be delivered in order within some partition. For example, it is often important to process messages from a single aggregate in order. To support that, you can use the stream name (which contains the aggregate id) as the partition key.

Pipe with concurrent and partitioning filters

You can use three options for configuring the filter:

  • Provide a function that calculates the partition hash
  • Provide a function that returns the partition key
  • Provide nothing

Using the second option is easier as you don’t need to calculate the hash; it’s enough to return something like the stream id. In that case, the filter will use MurmurHash3 to calculate the hash for a given partition key.

The third option will use the default partition key function that uses the message stream as partition key.

All those options require the number of partitions provided as an argument.
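The relationship between partition key, hash, and partition number can be sketched as below. Note the swap: MurmurHash3 is not part of the .NET base class library, so this sketch uses FNV-1a as a stand-in hash, and the Partitioner class is hypothetical. What matters is that the same key always maps to the same partition, which preserves ordering within that partition.

```csharp
using System;
using System.Text;

const int partitionCount = 4;
foreach (var stream in new[] { "Booking-1", "Booking-2", "Booking-1" }) {
    // The same stream name always lands in the same partition.
    Console.WriteLine($"{stream} -> partition {Partitioner.GetPartition(stream, partitionCount)}");
}

// Hypothetical helper; FNV-1a replaces MurmurHash3 for illustration only.
static class Partitioner {
    public static uint Hash(string partitionKey) {
        unchecked {
            var hash = 2166136261u;              // FNV-1a 32-bit offset basis
            foreach (var b in Encoding.UTF8.GetBytes(partitionKey)) {
                hash ^= b;
                hash *= 16777619u;               // FNV-1a 32-bit prime
            }
            return hash;
        }
    }

    // Maps the hash to a partition index in the range [0, partitionCount).
    public static int GetPartition(string partitionKey, int partitionCount) =>
        (int)(Hash(partitionKey) % (uint)partitionCount);
}
```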

As the partitioning filter processes messages in multiple threads, it cannot provide the global processing order. The order of processing is guaranteed within a partition. Due to that fact, it needs to be able to Ack or Nack messages individually, without maintaining global order. Therefore, it only works with DelayedAckConsumeContext, and it’s often used with the concurrent filter for best performance. As you can imagine, partitioning can only be used for subscriptions that support delayed consume.

Subscriptions that support delayed consume add this filter by default when necessary (based on the subscription configuration) together with the concurrent filter. Check the particular transport subscription implementation to learn more.

5.5 - Event handlers

The last bit of the subscription process

Event handlers are at the end of the subscription event processing pipeline. That’s the place where something actually happens with the event.

5.6 - Diagnostics

Monitoring subscriptions

Out of the box, Eventuous provides metrics and traces for monitoring your subscriptions.

Also, study the Diagnostics section for more information.

Mind the gap

When using subscriptions for read model projections, you enter the world of CQRS and asynchronous messaging. After completing a transaction in the domain model, one or more events are added to the event store. Then, a subscription picks them up and calls a projection. Although in most cases you’ll see only a few milliseconds delay between these operations, if your projection becomes slow (for example, it uses a slow database), the processing time will increase.

The easiest way to detect such situations is to observe the gap between the last event in the stream, which the subscription listens to, and the event, which is currently being processed. We call it the subscription gap.

The gap is measured by subscriptions that implement the IMeasuredSubscription interface:

public interface IMeasuredSubscription {
    GetSubscriptionGap GetMeasure();
}

Subscription gaps are collected as metrics. Read more about Eventuous subscription metrics below.

Subscription metrics

Eventuous collects two types of metrics for subscriptions:

  • Subscription gap in time and count
  • Subscription performance

For the subscription gap, Eventuous collects the time and count of the gap between the last event in the stream, which the subscription listens to, and the event, which is currently being processed.

Keep in mind that due to a limitation of EventStoreDB, the gap event count is not accurate when subscribing to the $all stream. It’s because the $all stream position is the commit position of the event in the global log, not the position of the event in the stream. Unfortunately, as of today, it is impossible to translate the commit position gap to the number of events.

Here is an example of the subscription gap metric exported to Prometheus:

# HELP eventuous_subscription_gap_count_events Gap between the last processed event and the stream tail
# TYPE eventuous_subscription_gap_count_events gauge
eventuous_subscription_gap_count_events{subscription_id="BookingsProjections"} 1098 1649081749067
eventuous_subscription_gap_count_events{subscription_id="PaymentIntegration"} 0 1649081749067

In addition to the subscription gap metric, Eventuous also collects metrics for the subscription performance as a histogram. The metric name is eventuous_subscription_duration. It measures the duration of handling a single event. As handling different event types might trigger different kinds of processing, the histogram is tagged with the message type (message_type tag).

All the subscription metrics are tagged by the subscription id (subscription_id tag). It allows you to plot a graph of the subscription metrics for different subscriptions.

Here is an example of the subscription duration metrics exported as a Prometheus metric for the subscription with id BookingsProjections:

# HELP eventuous_subscription_duration_ms Processing duration, milliseconds
# TYPE eventuous_subscription_duration_ms histogram
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="0"} 0 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="5"} 0 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="10"} 0 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="25"} 0 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="50"} 0 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="75"} 1 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="100"} 1 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="250"} 1 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="500"} 1 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="1000"} 1 1649081749067
eventuous_subscription_duration_ms_bucket{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections",le="+Inf"} 1 1649081749067
eventuous_subscription_duration_ms_sum{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections"} 50.429 1649081749067
eventuous_subscription_duration_ms_count{message_type="V1.RoomBooked",partition="0",subscription_id="BookingsProjections"} 1 1649081749067

Subscription tracing

Each part of the subscription event processing pipeline is traced. Each event is wrapped into a consume context by the subscription, then passed to the consumer. The consumer then calls all the event handlers.

graph LR;
    Subscription --> Consumer
    Consumer --> B1[Handler 1]
    Consumer --> B2[Handler 2]
    Consumer --> B3[Handler 3]

As all Eventuous producers and event store(s) are instrumented, they propagate the trace context to the subscription. The subscription then propagates the trace context to the consumer, and the consumer then propagates the trace context to the event handlers, so all these operations are traced.

Here’s an example of the trace context propagated to the subscription visualised by Zipkin:

Subscription trace

As you can see, event handlers for the BookingProjection subscription are projections, and trigger updates in MongoDB, which are also traced because the sample application is instrumented with the MongoDB tracing library.

As mentioned before, Eventuous ensures that the tracing context is propagated also remotely, so you get the full trace even if the subscription is running in a different process.

Health checks

The subscription service class also implements the IHealthCheck interface. Therefore, it can be used for ASP.NET Core health monitoring.

When using Eventuous dependency injection extensions, each registered subscription will also register its health checks.

However, you need to register the global Eventuous subscriptions health check by adding one line to the ASP.NET Core health checks registration:

builder.Services
  .AddHealthChecks()
  .AddSubscriptionsHealthCheck("subscriptions", HealthStatus.Unhealthy, new []{"tag"});

5.7 - EventStoreDB

Subscriptions for EventStoreDB

EventStoreDB natively supports real-time subscriptions, which will also deliver historical events when you don’t specify the starting position. It makes the product a perfect candidate to support event-sourced systems, as you won’t need to invent things like CDC- or pull-based subscriptions.

Eventuous supports all subscription kinds provided by EventStoreDB:

5.7.1 - All stream subscription

Subscribe to all events in the store

Subscribing to all events in the store is extremely valuable. This way, you can build comprehensive read models, which consolidate information from multiple aggregates. You can also use such a subscription for integration purposes, to convert and publish integration events.

All stream subscription

For registering a subscription to the $all stream, use AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions> as shown below:

builder.Services.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
    "BookingsProjections",
    builder => builder
        .AddEventHandler<BookingStateProjection>()
        .AddEventHandler<MyBookingsProjection>()
);

Subscription options for AllStreamSubscription are defined in AllStreamSubscriptionOptions class.

Option | Description
SubscriptionId | Unique subscription identifier.
ThrowOnError | If true, an exception will be thrown if the subscription fails, otherwise the subscription continues to run. Default is false.
EventSerializer | Serializer for events, if null the default serializer will be used.
MetadataSerializer | Serializer for metadata, if null the default serializer will be used.
Credentials | EventStoreDB user credentials. If not specified, the credentials specified in the EventStoreClientSettings will be used.
ResolveLinkTos | If true, the subscription will automatically resolve link events to the events they point to. Default is false.
ConcurrencyLimit | Maximum number of events to be processed in parallel. Default is 1.
EventFilter | Filter for events, if null, the subscription will filter out system events.
CheckpointInterval | Interval between checkpoints when event filter is used. Default is 10 events. This interval tells the subscription to report the current checkpoint when the subscription doesn’t receive any events for this interval because all the events were filtered out.

Checkpoint store

AllStreamSubscription is a catch-up subscription that is fully managed on the client side (your application), so you need to manage the checkpoint. You can register the checkpoint store using AddCheckpointStore<T>, but in that case it will be used for all subscriptions in the application. It might be that your app has multiple subscriptions, and you want to use different checkpoint stores for each of them. In that case, you can register the checkpoint store for each subscription using the UseCheckpointStore<T> extension of the subscription builder:

builder.Services.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
    "BookingsProjections",
    builder => builder
        .UseCheckpointStore<MongoCheckpointStore>()
        .AddEventHandler<BookingStateProjection>()
        .AddEventHandler<MyBookingsProjection>()
);

Concurrent event handlers

Like any catch-up subscription, a subscription to $all runs sequentially, processing events one by one. In many cases that’s enough, but sometimes you might want to speed it up, and allow parallel processing of events. To do that, you need to set the ConcurrencyLimit subscription option property to the number of events that should be processed in parallel. In addition, you need to tell the subscription how to distribute events into partitions. That is needed as you rarely can tolerate processing events in a completely random order, so you can partition events using some key, and distribute them to different partitions.

Here is an example of using AllStreamSubscription with ConcurrencyLimit and partitioning by stream name:

var partitionCount = 2;
builder.Services.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
    "BookingsProjections",
    builder => builder
        .Configure(cfg => cfg.ConcurrencyLimit = partitionCount)
        .AddEventHandler<BookingStateProjection>()
        .AddEventHandler<MyBookingsProjection>()
        .WithPartitioningByStream(partitionCount)
);

You can build your own partitioning strategy by implementing the GetPartitionKey function:

public delegate string GetPartitionKey(IMessageConsumeContext context);

and then using it in the WithPartitioning extension:

builder => builder
    .Configure(cfg => cfg.ConcurrencyLimit = partitionCount)
    ... // add handlers
    .WithPartitioning(partitionCount, MyPartitionFunction)
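
As an illustration, a custom partition function could look like the sketch below. It is self-contained, so IMessageConsumeContext is replaced by a stand-in interface with a single Stream property; that property, the FakeContext record, and the Partitioners.ByCategory name are assumptions made for the example, not the Eventuous API. The function partitions by category, taking the part of the stream name before the first dash.

```csharp
using System;

// Stand-in for the Eventuous context, reduced to the one member this sketch reads.
// Assumption: the real context exposes the source stream name in some form.
public interface IMessageConsumeContext {
    string Stream { get; }
}

// Delegate as shown in the documentation above.
public delegate string GetPartitionKey(IMessageConsumeContext context);

// Minimal context implementation, only used to try the function out.
public record FakeContext(string Stream) : IMessageConsumeContext;

public static class Partitioners {
    // Hypothetical strategy: use the category (the text before the first dash)
    // as the partition key, or the whole stream name if there is no dash.
    public static string ByCategory(IMessageConsumeContext context) {
        var stream = context.Stream;
        var dash   = stream.IndexOf('-');
        return dash < 0 ? stream : stream[..dash];
    }
}
```

In an application, such a function would be passed as the second argument of WithPartitioning, as in the snippet above. With this key, events of one aggregate type share a partition key, while events of different types can be processed in parallel.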

5.7.2 - Stream subscription

Subscribe to events from a single stream

Although subscribing to $all using AllStreamSubscription is the most efficient way to create, for example, read models using all events in the event store, it is also possible to subscribe to a single stream.

For example, you can subscribe to the $ce-Booking stream to project all events for all the aggregates of type Booking, and create some representation of the state of the aggregate in a queryable store.

Another scenario is to subscribe to an integration stream, when you use EventStoreDB as a backend for a messaging system.

For that purpose you can use the StreamSubscription class.

Single stream subscription

To register a subscription to a single stream, use AddSubscription<StreamSubscription, StreamSubscriptionOptions> as shown below:

builder.Services.AddSubscription<StreamSubscription, StreamSubscriptionOptions>(
    "BookingsStateProjections",
    builder => builder
        .Configure(cfg => {
            cfg.StreamName = "$ce-Booking";
            cfg.ResolveLinkTos = true;
        })
        .AddEventHandler<BookingStateProjection>()
);

Subscription options for StreamSubscription are defined in the StreamSubscriptionOptions class.

Option Description
SubscriptionId Unique subscription identifier.
StreamName Name of the stream to subscribe to.
ThrowOnError If true, an exception will be thrown if the subscription fails, otherwise the subscription continues to run. Default is false.
EventSerializer Serializer for events, if null the default serializer will be used.
MetadataSerializer Serializer for metadata, if null the default serializer will be used.
Credentials EventStoreDB user credentials. If not specified, the credentials specified in the EventStoreClientSettings will be used.
ResolveLinkTos If true, the subscription will automatically resolve link events to the original events they point to. Default is false.
IgnoreSystemEvents Set to true to ignore system events. Default is true.
ConcurrencyLimit Maximum number of events to be processed in parallel. Default is 1.

Checkpoint store

StreamSubscription is a catch-up subscription that is fully managed on the client side (your application), so you need to manage the checkpoint. You can register the checkpoint store using AddCheckpointStore<T>, but in that case it will be used for all subscriptions in the application. If your app has multiple subscriptions and you want each of them to use a different checkpoint store, register the checkpoint store per subscription using the UseCheckpointStore<T> extension of the subscription builder:

builder.Services.AddSubscription<StreamSubscription, StreamSubscriptionOptions>(
    "BookingsProjections",
    builder => builder
        .Configure(cfg => {
            cfg.StreamName = "$ce-Booking";
            cfg.ResolveLinkTos = true;
        })
        .UseCheckpointStore<MongoCheckpointStore>()
        .AddEventHandler<BookingStateProjection>()
);

Concurrent event handlers

The single stream subscription is identical to the $all stream subscription in terms of event handler execution. By default, all events are processed one by one, but you can use the ConcurrencyLimit option to process multiple events in parallel.

You can use the stream name partitioner when subscribing to a category ($ce) stream. In that case events for a single aggregate instance will always be processed sequentially, but events for different aggregate instances can be processed in parallel.

Read more about concurrent event processing on the all stream subscription page.
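
Putting these options together, a category stream subscription with concurrent handlers might be registered as follows. This is a sketch that only combines calls shown elsewhere in this documentation (Configure, AddEventHandler, WithPartitioningByStream). It assumes the same application builder and BookingStateProjection handler as the examples above, and the partition count of 4 is arbitrary.

```csharp
// Fragment: relies on the Eventuous registrations and handler types used above.
var partitionCount = 4; // arbitrary value for illustration

builder.Services.AddSubscription<StreamSubscription, StreamSubscriptionOptions>(
    "BookingsStateProjections",
    subscription => subscription
        .Configure(cfg => {
            cfg.StreamName       = "$ce-Booking";
            cfg.ResolveLinkTos   = true;
            // Allow as many events in flight as there are partitions
            cfg.ConcurrencyLimit = partitionCount;
        })
        .AddEventHandler<BookingStateProjection>()
        // Events of one stream (one Booking instance) stay in order
        .WithPartitioningByStream(partitionCount)
);
```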

5.7.3 - Persistent subscription

Subscribe to event streams using a server-managed subscription.

5.8 - Google PubSub

Subscriptions for Google PubSub

WIP

5.9 - RabbitMQ

Subscriptions for RabbitMQ

[WIP]

6 - Read models

Read (query, reporting) models

6.1 - Concept

The concept of read models

Queries in event-sourced systems

As described previously, the domain model uses events as the source of truth. These events represent individual and atomic state transitions of the system. We add events to the event store one by one, in an append-only fashion. When restoring the state of an aggregate, we read all the events from a single stream, and apply those events to the aggregate state. When all events are applied, the state is fully restored. This process takes nanoseconds to complete, so it’s not really a burden.

However, when all you have in your database is events, you can hardly query the system state for more than one object at a time. The only query that the event store supports is to get one event stream using the aggregate id. In many cases, though, we need to query using some other attribute of the aggregate state, and we expect more than one result. Many see it as a major downside of Event Sourcing, but, in fact, it’s not a big problem.

When building an event-sourced system, after some initial focus on the domain model and its behaviour, you start to work on queries that provide things like lists of objects via an API, so the UI can display them. There you need to write queries, and that’s where the idea of CQRS comes in.

CQRS (do you mean “cars”?)

The term CQRS was coined more than a decade ago by Greg Young, who also established a lot of practices of Event Sourcing as implemented by Eventuous.

The concept can be traced back in time to a separation between operational and reporting store:

[The main] database supports operational updates of the application’s state, and also various reports used for decision support and analysis. The operational needs and the reporting needs are, however, often quite different - with different requirements from a schema and different data access patterns. When this happens it’s often a wise idea to separate the reporting needs into a reporting database…

ReportingDatabase - Martin Fowler’s bliki

Greg argues that it’s not a requirement to separate two databases, but it’s a good idea to at least understand that the need for transactional updates requires a different approach compared with reporting needs. Say, you use something like EntityFramework to persist the state of your domain entities. Although it works quite well for that purpose, it’s not a good idea to use it for reporting. You’d be limited to reaching the data through EntityFramework’s DbContext, when in reality you’d want to make more direct queries, joining different tables, etc.

CQRS and Event Sourcing

In real life, CQRS in an event-sourced system means that you will have to separate the operational and the reporting stores. It’s because querying the state of a single aggregate is not the only query you’d like to do. You might want to query across multiple aggregates, or across different aggregate types. In addition, you don’t always need to return the full aggregate state, but only a subset of it.

That’s where read models come in. Read models are projections of the system state, which are built based on the query needs. Therefore, we sometimes refer to them as views, or query models. You’d normally use some other database than your event store database for storing read models, and that database needs to support rich indexing and querying.

Benefits of read models

In state-based systems you normally have access to the state of your domain object in a very optimised, normalised schema. When executing a query over a normalised database, you’d often need to build a comprehensive set of joins across multiple tables or collections, so you can get all the required information in one go. That approach is not always optimal. Let’s say you want to display a widget that shows the number of reservations made for a given hotel during the last 30 days. You’d need to run a count query across the reservations table, and then a join across the hotels table to get the hotel name.

Now imagine all the reservations made are represented as events. By projecting those events to a read model that just calculates the number of reservations made per hotel for the last 30 days, you can get the same result much more efficiently. With such a read model, there’s no need to build joins: a single query against the read model returns the required information, using the hotel id as the only query argument.

You could see this approach as a denormalisation of an operational database schema. However, it’s not the only thing that happens. When building read models, you are no longer bound to the primary key of the aggregate that emits state transitions. You can use another attribute as the primary key, or even a composite key. For example, with the number of reservations of a hotel, you could use the hotel id and the date of the reservation as the read model primary key.

The point here is that when building read models, you’d normally start designing them based on the needs of the query, not the needs of the database schema. The query needs most often come from the user interface requirements for data visualisations, which are often orthogonal to the operational needs of the domain model. Read models allow you to find a balance between operational and reporting needs without sacrificing the explicitness of the model for the richness and effectiveness of the query model.

Here are some examples of the read models that can be built for a given domain model:

  • My reservations (per guest)
  • My past stays (per guest)
  • My upcoming stays (per guest)
  • Upcoming arrivals (per hotel)
  • Cancellations for the last three months (per hotel)

Built as read models, all those queries can be run in a single query, without the need to build joins over multiple tables and potentially thousands of rows or documents.
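
To make the idea concrete, here is a minimal in-memory sketch of the reservations-per-hotel read model mentioned earlier, keyed by hotel id and date. The ReservationMade event, its properties, and the ReservationsPerDay class are hypothetical names invented for this example. A real read model would normally live in a queryable database and be updated by an event handler in a subscription.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event: the name and properties are illustrative only.
public record ReservationMade(string HotelId, DateTime MadeAt);

// In-memory read model with a composite key of (hotel id, day).
public class ReservationsPerDay {
    readonly Dictionary<(string HotelId, DateTime Day), int> _counts = new();

    // Projection step: fold a single event into the read model.
    public void Project(ReservationMade evt) {
        var key = (evt.HotelId, evt.MadeAt.Date);
        _counts[key] = _counts.GetValueOrDefault(key) + 1;
    }

    // Query: a direct lookup by the composite key, no joins involved.
    public int Count(string hotelId, DateTime day)
        => _counts.GetValueOrDefault((hotelId, day.Date));
}
```

The query side only needs the hotel id and the day, because the counting work was already done when each event was projected.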

7 - Producers

Message producers

Producers, along with Subscriptions, are the foundation of the Eventuous messaging system.

Concept

Although an event store produces events too, it is normally used via the Aggregate store. Sometimes, you just need to produce arbitrary messages, which aren’t necessarily events. For example, you can also produce commands. Still, the main purpose of a producer is to put events to an event database or a broker, so they can be consumed by a subscription.

Within Eventuous, the main purpose of producers is to support gateways, and, through gateways, enable creation of connectors.

7.1 - Implementation

How producers are implemented

Abstraction

Eventuous has two types of producers: with or without produce options. The producer without produce options is a simple producer that does not support any options, and it normally produces messages as-is. The producer with produce options gives more fine-grained control over how messages are produced. Produce options are provided per message batch.

The base interface for a producer is called IEventProducer and its only method is Produce.

Task Produce(
    StreamName                   stream,
    IEnumerable<ProducedMessage> messages,
    CancellationToken            cancellationToken = default
);

Producers with produce options must implement the IEventProducer<TProduceOptions> interface, which is derived from IEventProducer. Therefore, a producer with options can also be used as an option-less producer.

Task Produce(
    StreamName                   stream,
    IEnumerable<ProducedMessage> messages,
    TProduceOptions?             options,
    CancellationToken            cancellationToken = default
);

The IEventProducer interface also has a property ReadyNow, which indicates if the producer is ready. It is needed because gateways need to know whether the producer is ready to produce messages. In many cases, a producer needs to arrange or check some infrastructure (queue or topic) before it can produce messages. When that work is done, the producer should set the ReadyNow property to true.
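
The readiness contract can be illustrated with a small in-memory producer. The sketch below is self-contained: StreamName, ProducedMessage, and IEventProducer are simplified stand-ins that only mirror the members described above, and InMemoryProducer with its Start method is hypothetical. Rejecting messages before the producer is ready is a choice made for this example, not documented Eventuous behaviour.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Simplified stand-ins for the Eventuous types, so the sketch compiles on its own.
public record StreamName(string Value);
public record ProducedMessage(object Message);

public interface IEventProducer {
    bool ReadyNow { get; }

    Task Produce(
        StreamName                   stream,
        IEnumerable<ProducedMessage> messages,
        CancellationToken            cancellationToken = default
    );
}

// Hypothetical producer that keeps produced messages in a list.
public class InMemoryProducer : IEventProducer {
    public List<(string Stream, object Message)> Sent { get; } = new();

    public bool ReadyNow { get; private set; }

    // Stands in for startup work such as creating a queue or a topic.
    public Task Start(CancellationToken cancellationToken = default) {
        ReadyNow = true;
        return Task.CompletedTask;
    }

    public Task Produce(
        StreamName                   stream,
        IEnumerable<ProducedMessage> messages,
        CancellationToken            cancellationToken = default
    ) {
        // Example-only policy: refuse to produce until startup work is done.
        if (!ReadyNow) throw new InvalidOperationException("Producer is not ready");

        foreach (var message in messages) Sent.Add((stream.Value, message.Message));
        return Task.CompletedTask;
    }
}
```

A caller, such as a gateway, would check ReadyNow before it starts passing messages to the producer.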

Base producer

There are two abstract base classes for producers, one without options, and the other one with options.

The purpose of the base class is to enable tracing for produced messages. All producers implemented in Eventuous use the base producer class. For the purpose of tracing, the base producer class accepts ProducerTracingOptions as a parameter.

public record ProducerTracingOptions {
    public string? MessagingSystem  { get; init; }
    public string? DestinationKind  { get; init; }
    public string? ProduceOperation { get; init; }
}

These options are used to set the producer trace tags that are specific for the infrastructure. For example, the messaging system tag for RabbitMqProducer is rabbitmq.

Both base classes implement the Produce method. It is only used to enable tracing. The actual producing is done by the ProduceMessages abstract method. When implementing a new producer using the base class, you’d only need to implement the ProduceMessages method.

You can see that for producing a message, the producer gets a collection of ProducedMessage records. The record looks like this:

public record ProducedMessage {
    public object               Message     { get; }
    public Metadata?            Metadata    { get; init; }
    public Guid                 MessageId   { get; }
    public string               MessageType { get; }
    public AcknowledgeProduce?  OnAck       { get; init; }
    public ReportFailedProduce? OnNack      { get; init; }
}

The Message property is the actual message payload. Normally, producers use an IEventSerializer instance to serialize the message payload. Sometimes, producers must comply with their supporting infrastructure, and use a different way to serialize the message payload. In that case, the MessageType property can be added to the produced message body or header, so it can be deserialized by subscriptions.

Registration

Eventuous provides several extensions to IServiceCollection to register producers. You can provide a pre-made producer instance, a function to resolve the producer from the IServiceProvider, or just the producer type if its dependencies can be resolved.

For example, if you have registered the EventStoreClient instance, you can then register the EventStoreProducer like this:

builder.Services.AddEventProducer<EventStoreProducer>();

If a producer needs to do some work before it becomes ready, it should implement the IHostedService interface, so it can do all the necessary startup work in the StartAsync method. When using any of the AddEventProducer extensions, the producer will be registered as an IHostedService if the producer implements it.

Remember that producers are registered as singletons. If you need to have multiple producer instances in your application, you’d need to provide them as direct dependencies instead of registering them. It’s not often that you need multiple producer instances, unless you’re using gateways. Gateway registration extensions are able to use individual producer instances as dependencies.

8 - Gateway

Event gateway copies events between databases and brokers

An event gateway is an engine to bridge Event Sourcing with Event-Driven Architecture (EDA). When you store events to an event store, you can use an event gateway to receive stored events, transform them, and distribute downstream using different transport.

Scenarios where an event gateway is useful:

  • Publish transformed domain events as integration events using a broker
  • Scale out projections using a partitioned, event-based broker, such as Kafka, Amazon Kinesis, Google PubSub or Azure Event Hub
  • Backup or archive domain events in another event store or time-series database
  • Send events to an analytics store or service

How a gateway works

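A gateway needs three components that form a gateway event pipeline: a subscription that receives events, a transformation function or class that routes and transforms them, and a producer that delivers the result.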

8.1 - Implementation

Implementing the Gateway

Gateway is a ready-made Eventuous construct that needs other components to work properly (at least a subscription and a producer). Any subscription or producer type provided by Eventuous, as well as custom ones, can be used in a gateway.

When you implement a gateway, the only things that you need to do are:

  • Provide an optional transformation and filtering function
  • Register a gateway given one subscription and one producer

Transformation

One common scenario for a gateway is to distribute domain events to other systems via a message broker. However, it’s not a good idea to publish domain events as-is for others to consume. By doing so, you are coupling downstream consumers to your domain model. When you decide to change your domain model, and, possibly, enrich your domain events, you force developers of downstream consumers to change their code. Effectively, you either lose the ability to change your domain model, or you are coupling downstream systems to your bounded context.

That’s why we strongly suggest establishing a contract-like event schema for the outside world, and keeping it stable. That’s also one more reason not to allow other systems to subscribe to your domain (private) events directly from the event store, but to deploy a gateway and distribute your integration and notification (public) events using a message broker.

If you decide to revamp the public events schema, you can do it the same way as you’d publish a new API schema version. Using a gateway, you can produce multiple public events given one private event, so you can always produce different versions of the same event as a public event for the period of support for both versions. In short, using a clear private vs public event transformation you can treat the integration event schema as a versioned contract, and be free to evolve your domain (private) events as you wish.

Based on the producer kind (with or without options), you can perform the transformation using a function that matches the RouteAndTransform or RouteAndTransform<TProduceOptions> delegate:

delegate ValueTask<GatewayMessage[]> RouteAndTransform(IMessageConsumeContext context);
delegate ValueTask<GatewayMessage<TProduceOptions>[]> RouteAndTransform<TProduceOptions>(
    IMessageConsumeContext message
);

If you prefer to do the transformation using classes, you can implement the IGatewayTransform interface:

interface IGatewayTransform {
    ValueTask<GatewayMessage[]> RouteAndTransform(IMessageConsumeContext context);
}

interface IGatewayTransform<TProduceOptions> {
    ValueTask<GatewayMessage<TProduceOptions>[]> RouteAndTransform(
        IMessageConsumeContext context
    );
}

As you can see, both approaches require returning an array of GatewayMessage objects. The returned array could be empty if you don’t want to produce a public event for a given private event.

The GatewayMessage signatures are:

record GatewayMessage(StreamName TargetStream, object Message, Metadata? Metadata);

record GatewayMessage<TProduceOptions>(
    StreamName      TargetStream,
    object          Message,
    Metadata?       Metadata,
    TProduceOptions ProduceOptions
) : GatewayMessage(TargetStream, Message, Metadata);
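
A transformation function for a producer without options could look like the following sketch. To keep it self-contained, StreamName, Metadata, IMessageConsumeContext, and GatewayMessage are declared as simplified stand-ins (the context is assumed to expose the deserialized payload as Message). The event types, the target stream name, and the PaymentsTransform class are hypothetical and are not taken from the sample application.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Simplified stand-ins; in an application these types come from Eventuous.
public record StreamName(string Value);
public class Metadata : Dictionary<string, object?> { }
public interface IMessageConsumeContext {
    object? Message { get; } // assumption: the deserialized event payload
}
public record GatewayMessage(StreamName TargetStream, object Message, Metadata? Metadata);

// Minimal context implementation, only used to try the function out.
public record FakeContext(object? Message) : IMessageConsumeContext;

// Hypothetical private (domain) event and public (integration) event.
public record PaymentRecorded(string BookingId, decimal Amount, string Currency, string InternalNote);
public record BookingPaymentReceived(string BookingId, decimal Amount, string Currency);

public static class PaymentsTransform {
    static readonly StreamName Target = new("PaymentsIntegration");

    // Matches the RouteAndTransform delegate shape: one private event in,
    // zero or more public events out.
    public static ValueTask<GatewayMessage[]> Route(IMessageConsumeContext context) {
        var result = context.Message is PaymentRecorded evt
            // Publish only the stable, public part of the private event
            ? new[] {
                new GatewayMessage(
                    Target,
                    new BookingPaymentReceived(evt.BookingId, evt.Amount, evt.Currency),
                    null
                )
            }
            // Anything else is filtered out by returning an empty array
            : Array.Empty<GatewayMessage>();

        return new ValueTask<GatewayMessage[]>(result);
    }
}
```

Note how the public event leaves out InternalNote: the private event can gain or lose such fields without affecting downstream consumers.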

Registration

There’s no other component to implement for getting a working gateway. You need to register a gateway using one subscription, one producer, and one transformation function or class.

To register a gateway, use one of the AddGateway methods. For example, the sample application uses this gateway registration for publishing integration events to an EventStoreDB integration stream:

services
    .AddGateway<AllStreamSubscription, AllStreamSubscriptionOptions, EventStoreProducer>(
        "IntegrationSubscription",
        PaymentsGateway.Transform
    );

Here, PaymentsGateway.Transform is a transformation function that transforms private events to public events.

You can use any available subscription or producer for the gateway. If the subscription needs a checkpoint store, you can either register it globally, or provide one using the subscription configuration function for the AddGateway method. The same function can be used for any additional subscription configuration, like partitioning.

There are overloads to register a gateway using a producer with options as well. You can also provide additional functions to configure the subscription when using, for example, a specific checkpoint store. You can also register a gateway that uses the transformation class instead of a function.

All of the AddGateway overloads also have a parameter called awaitProduce of type bool that is set to true by default. It only works for producers that support delayed delivery reports, like the Kafka producer. When you set it to false, you might get better performance of the producer, but you can get undesired consequences if the producer fails for some messages, as when the produce action is retried you might get duplicate messages produced.

9 - Diagnostics

Observability of applications built with Eventuous

WIP

10 - FAQ

Frequently asked questions.

Find answers to most common questions here.

10.1 - Compatibility

Platforms and SDKs

Only .NET 5+? Why not .NET Framework 3.5?

Eventuous uses the latest features of C#, like records and advanced pattern matching. Therefore, we rely on compiler versions that support C# 9.

We also aim to support the current application hosting model that only got consistent and stable in .NET 5.

Eventuous supports .NET Core 3.1, but it’s not a priority. Some packages only support .NET 6 as they need the latest features like minimal API. Right now, Eventuous provides packages for the following runtimes:

  • .NET Core 3.1
  • .NET 5
  • .NET 6

Targets will be added when new versions get released, and removed when they go out of support.

10.2 - Compare

How is Eventuous different from others?

EventFlow

EventFlow is probably the most popular Event Sourcing and CQRS framework for .NET. It has a rich history, it’s battle-tested and ready for production use. So, why not EventFlow?

Abstractions

EventFlow is full of abstractions. There’s an interface for everything, including commands and events. In contrast, Eventuous discourages you from using interfaces for these primitives, as they are just serializable property bags. For example, a Command base class in EventFlow has 60 lines of code with three constrained generic parameters, one inheritance (ValueObject), and it implements a generic interface ICommand. In Eventuous, we recommend using records, but you can use classes too.

Codebase size

We strongly believe in code that fits in one’s head. You can read all the Eventuous core library code in half an hour and understand what it does. There is no magic there. EventFlow, on the other hand, has lots of interfaces and attributes that aim to prevent developers from doing the wrong thing. All those guide rails need code, which Eventuous doesn’t have and will never have.

Just enough

Unlike EventFlow, we don’t provide anything special for logging, DI containers, scheduled jobs, sagas, etc. There are great open-source tools that deal with these concerns, which are out of scope for this project. However, Eventuous plays well with any DI container, and it uses Microsoft logging abstractions.

10.3 - Persistence

What about repositories and other questions about persistence

Why don’t you have repositories?

The definition of the Repository pattern from bliki:

[Repository] Mediates between the domain and data mapping layers using a collection-like interface for accessing domain objects.

When using Event Sourcing, the idea of having a collection-like interface for accessing domain objects might be challenging to implement. It’s because we don’t persist the state of domain objects, but their state transitions as events instead. Therefore, having a collection-like abstraction on top of an event-sourced persistence would essentially require loading the whole event store into memory.

As Event Sourcing greatly benefits from using CQRS, the need to have a collection-like persistence abstraction also diminishes. In the command-oriented flow, you would only change the state of a single aggregate by handling one command. Therefore, you neither need a collection-like interface to load the state of a single aggregate, nor to append new events for a single aggregate to the store.