Version: 0.14

Aggregate stream

Concept

So far, we figured out that an Aggregate is the transaction and consistency boundary within the domain model.

The aggregate store (an application-level abstraction) uses an event store (infrastructure) to store events in streams. Each aggregate instance has its own stream, so the event store must be capable of reading events from, and writing events to, the correct stream.

When appending events to a stream, the append operation for a single stream must be transactional to ensure that the stream is consistent. Eventuous handles commands using the command service, and one command handler is the unit of work. All the events generated by the aggregate instance during the unit of work are appended to the stream as the final step in the command handling process.

Stream name

By default, Eventuous uses the AggregateType.Name combined with the aggregate id as the stream name. For example, the Booking aggregate with id 1 has the stream name Booking-1. That's what StreamName.For<Booking>(1) returns.
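To make the default convention concrete, here is a minimal standalone sketch that does not use Eventuous itself. The `DefaultStreamName` helper is hypothetical; it only mimics the documented format of aggregate type name, dash, aggregate id.

```csharp
using System;

// Hypothetical helper (not part of Eventuous) that mimics the documented
// default convention: "<aggregate type name>-<aggregate id>".
static string DefaultStreamName(string aggregateTypeName, string id)
    => $"{aggregateTypeName}-{id}";

var streamName = DefaultStreamName("Booking", "1");
Console.WriteLine(streamName); // prints "Booking-1"
```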

However, you might want to have more fine-grained control over the stream name. For example, you might want to include the tenant id in the stream name. It's possible to override the default convention by configuring the stream name mapping. The stream map contains a mapping between the aggregate identity type (derived from AggregateId) and the stream name generation function. Therefore, any additional property of the aggregate identity type can be used to generate the stream name.

For example, the following identity type for the Booking aggregate carries the tenant id in addition to the aggregate id:

BookingId.cs
public record BookingId : AggregateId {
    public BookingId(string id, string tenantId) : base(id) {
        TenantId = tenantId;
    }

    public string TenantId { get; }
}

Create a StreamNameMap, register the stream name function for BookingId, and add the map to the container:

Program.cs
var streamNameMap = new StreamNameMap();
streamNameMap.Register<BookingId>(
    // ':' separates the tenant id from the aggregate id; a Guid identifier already contains '-'.
    id => new StreamName($"Booking-{id.TenantId}:{id.Value}")
);
builder.Services.AddSingleton(streamNameMap);
builder.Services.AddCommandService<BookingService, Booking>();

Then, use the registered StreamNameMap in the CommandService:

BookingService.cs
public class BookingService : CommandService<Booking, BookingState, BookingId> {
    public BookingService(IAggregateStore store, StreamNameMap streamNameMap)
        : base(store, streamNameMap: streamNameMap) {
        // command handlers registered here
    }
}

In your projections, you can retrieve the Id and TenantId from the StreamName exposed by the IMessageConsumeContext<out T>:

BookingStateProjection.cs
static UpdateDefinition<BookingDocument> HandleRoomBooked(
    IMessageConsumeContext<V1.RoomBooked> ctx,
    UpdateDefinitionBuilder<BookingDocument> update
) {
    var evt = ctx.Message;

    // Get TenantId and Id; the order must match the (TenantId, Id) tuple
    // returned by ExtractMultiTenantIds
    var (tenantId, id) = ctx.Stream.ExtractMultiTenantIds();

    return update
        .SetOnInsert(x => x.Id, id)
        .SetOnInsert(x => x.TenantId, tenantId)
        .Set(x => x.GuestId, evt.GuestId)
        .Set(x => x.RoomId, evt.RoomId)
        .Set(x => x.CheckInDate, evt.CheckInDate)
        .Set(x => x.CheckOutDate, evt.CheckOutDate)
        .Set(x => x.BookingPrice, evt.BookingPrice)
        .Set(x => x.Outstanding, evt.OutstandingAmount);
}

The snippet above uses the following extension method to extract the Id and TenantId from the StreamName:

StreamNameExtensions.cs
public static class StreamNameExtensions
{
    /// <summary>
    /// Splits the stream id of a multi-tenant StreamName into its tenant id and aggregate id parts.
    /// </summary>
    /// <param name="stream">The stream name.</param>
    /// <param name="separator">The separator used for splitting. Default is ':'.</param>
    /// <returns>A tuple with the TenantId and Id properties, in that order.</returns>
    /// <exception cref="InvalidStreamName">When the stream id can't be split into exactly two sections.</exception>
    public static (string TenantId, string Id) ExtractMultiTenantIds(this StreamName stream, char separator = ':')
    {
        string streamId = stream.GetId();
        var streamIdParts = streamId.Split(separator);

        if (streamIdParts.Length != 2)
        {
            throw new InvalidStreamName(streamId);
        }

        // The stream id has the form "{TenantId}{separator}{Id}".
        return (streamIdParts[0], streamIdParts[1]);
    }
}
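
The round trip between composing and splitting such a stream name can be checked in isolation. The sketch below is standalone and does not reference Eventuous: `ToStreamName` and `SplitStreamName` are hypothetical stand-ins for the registered mapping function and the extension method, and the sketch assumes that the stream id is everything after the first dash in the stream name. It also shows why ':' is a convenient separator when the aggregate id is a Guid.

```csharp
using System;

// Hypothetical stand-in for the mapping function registered in the StreamNameMap.
static string ToStreamName(string tenantId, string id) => $"Booking-{tenantId}:{id}";

// Hypothetical stand-in for ExtractMultiTenantIds, working on plain strings.
static (string TenantId, string Id) SplitStreamName(string streamName, char separator = ':')
{
    // Assumption: the stream id is the part after the first dash.
    var streamId = streamName[(streamName.IndexOf('-') + 1)..];
    var parts = streamId.Split(separator);

    if (parts.Length != 2)
    {
        throw new ArgumentException($"Invalid stream name: {streamName}");
    }

    return (parts[0], parts[1]);
}

// A Guid-formatted id contains dashes, but no colons.
var guid = "3f2504e0-4f89-11d3-9a0c-0305e82c3301";
var name = ToStreamName("tenant1", guid);
var (tenantId, id) = SplitStreamName(name);

Console.WriteLine(name);     // Booking-tenant1:3f2504e0-4f89-11d3-9a0c-0305e82c3301
Console.WriteLine(tenantId); // tenant1
Console.WriteLine(id);       // 3f2504e0-4f89-11d3-9a0c-0305e82c3301
```

Note that the split fails as soon as the tenant id or the aggregate id itself contains the separator, so pick a character that cannot occur in either value.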