Eventuous Connectors

Eventuous Connector allows subscribing to events from a source subscription and producing derived events to another system (a sink), as well as executing reduce-like operations that work as projections.

Right now, the only source built into the Connector application is the EventStoreDB source. You can, however, build a custom connector using another Eventuous subscription type, or your own subscription implementation.

1 - Connector concept

What is Eventuous Connector and how to use it

Eventuous connectors allow subscribing to events from EventStoreDB (source) and produce derived events to another system (sink). Some sinks also support reduce-like operations that work as projections.

The Connector is a combination of a real-time subscription (currently only to $all stream) and a sink. A sink can be configured to produce (producer mode) or to project (projector mode) events. Some sinks support both modes, some only support one.

Currently, the following sinks are supported:

  • Elasticsearch: producer, projector
  • MongoDB: projector
  • MS SQL Server or Azure SQL: projector

Connector modes

Below, you can find a description of the different modes that are supported by the connectors.

Producer mode

In producer mode, the connector subscribes to the source and produces events to the sink. How events look on the other side depends on the sink implementation.

The producer mode is most useful for the purpose of re-publishing domain events to a broker, or to be used as an archive or backup. For example, the Elasticsearch sink can be used in combination with the archive event store to keep archived events in a cheaper persistence tier, whilst keeping the EventStoreDB database size contained.

When used in combination with a message bus sink, the connector can publish integration events for other services to consume. However, the connector currently lacks transformation capabilities, so we don’t recommend using it this way when it is deployed as-is; you can build a custom connector for that purpose instead.

Projector mode

The projector mode allows you to implement projections using stacks other than .NET. Essentially, the projector mode implements the MapReduce pattern: using a projector, you reduce events to a single piece of state identified by a unique key.

Because the reduce function requires a custom implementation, Eventuous Connector calls custom code that receives events and sends the reduce operation back. That custom code can be completely stateless and can therefore execute in parallel, while the Connector maintains ordered event processing.

At this moment, the only way the connector can execute such custom code is by calling an external gRPC service with bidirectional streaming. The connector sends events to the gRPC service, and the service sends the reduce operation back. Each projector sink implementation requires its own set of reduce operations. With this model, the external service implements a gRPC server, and the connector connects to it as a client.

[Diagram: Using a gRPC service for reduce functions]

A custom gRPC service can be built in any language or stack that supports gRPC. You’d normally deploy it as a sidecar for the Connector pod in Kubernetes or a serverless workload.

For example, the MongoDB projector expects to get operations like InsertOne or UpdateOne back from the gRPC service, while the SQL Server projector uses a single operation that returns an arbitrary SQL statement based on event data.

[Diagram: Example of a SQL Server projector sidecar]

The Connector’s role here is to maintain the subscription to EventStoreDB, send events to the gRPC service, receive the reduce operation back, execute the response, and maintain the checkpoint. By convention, each sink uses its own database for checkpointing.

As a result, it’s possible to build a stack-agnostic stateless projector and use the Connector to do the heavy lifting. Each sink also provides observability instrumentation for the database client library it uses.
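The loop described above (subscribe, call the sidecar, execute the result, store the checkpoint) can be sketched as follows. This is a minimal illustration, not the real implementation; `subscription`, `sidecar`, `executor`, and `checkpoint_store` are hypothetical stand-ins for the actual components:

```python
# Sketch of the Connector's projector processing loop.
# All component names here are hypothetical illustrations.
def run_projector(subscription, sidecar, executor, checkpoint_store):
    for event in subscription:                       # ordered events from $all
        response = sidecar.project(event)            # gRPC round-trip to the sidecar
        if response["operation"] != "ignore":        # the Ignore response skips the event
            executor.execute(response["operation"])  # apply the reduce result to the sink
        checkpoint_store.store(event["globalPosition"])  # commit progress
```

Note that the checkpoint is stored even for ignored events, so the subscription never reprocesses them after a restart.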

Refer to the specific connector documentation page for more information.

2 - Configuration and deployment

How to configure and deploy Eventuous Connector

Eventuous Connector with the EventStoreDB source needs to be hosted as a continuously running service because it must maintain a real-time gRPC subscription. When you deploy a projector, it can run as a sidecar for the connector or as a standalone service. It may also be possible to deploy it as a serverless workload if the serverless platform supports gRPC streaming.

Deployment

You can use the Connector container image available on Docker Hub in Kubernetes, Docker Compose, or a serverless environment like Google Cloud Run or Amazon Fargate. If you want to use the Connector in projector mode, we recommend deploying your gRPC server as a sidecar for the connector.

When running the Connector in a serverless environment, set both the minimum and maximum container count to one. Remember that the Connector should run continuously; if you run more than one instance simultaneously, you risk unexpected side effects.

More information will be available soon.

Configuration

To configure the Connector, you need to have a config.yaml file in the same directory as the Connector. The file consists of three sections:

  • connector: This section contains the configuration for the Connector.
  • source: This section contains the configuration for the EventStoreDB source database.
  • target: This section contains the configuration for the target database or broker.

The target configuration is specific to the target type. Find out more on the specific connector page in the left navigation menu.

Connector configuration

Here’s an example of a connector section in the configuration file:

connector:
  connectorId: "esdb-elastic-connector"
  connectorAssembly: "Eventuous.Connector.EsdbElastic"
  diagnostics:
    tracing:
      enabled: true
      exporters: [zipkin]
    metrics:
      enabled: true
      exporters: [prometheus]
    traceSamplerProbability: 0

The connector id is used as the Eventuous subscription id. If the target is using a checkpoint store, this value will also be used as the checkpoint id. When running multiple connectors with different targets, make sure that each connector has its own unique id.

The assembly name is needed so the Connector can load the specific target implementation. All target assemblies are included in the container by default, but only the specified assembly is loaded at runtime.

Diagnostics configuration

The connector is fully instrumented with traces and metrics. The following configuration parameters are supported:

  • enabled - if diagnostics are enabled
  • tracing - the tracing configuration
    • enabled - if tracing is enabled
    • exporters - the tracing exporters (zipkin, jaeger, otlp)
  • metrics - the metrics configuration
    • enabled - if metrics are enabled
    • exporters - the metrics exporters (prometheus, otlp)

Metrics exporters

  • otlp: Exports metrics using OpenTelemetry protocol. You need to configure the exporter using environment variables as described in the documentation.
  • prometheus: Adds the Prometheus metrics endpoint at /metrics path.

Trace exporters

  • otlp: Exports traces using the OpenTelemetry protocol. You need to configure the exporter using environment variables as described in the documentation.
  • zipkin: Exports traces to Zipkin.
  • jaeger: Exports traces to Jaeger.

Source configuration

The source configuration is used to connect to EventStoreDB, as well as to configure the subscription. At the moment, the connector unconditionally subscribes to the $all stream.

The following configuration parameters are supported:

  • connectionString - EventStoreDB connection string using the gRPC protocol. For example: esdb://localhost:2113?tls=false
  • concurrencyLimit - the subscription concurrency limit. The default value is 1.

source:
  connectionString: "esdb://localhost:2113?tls=false"
  concurrencyLimit: 1

When the subscription concurrency limit is higher than 1, the subscription will partition events between multiple Elasticsearch producer instances. As those producers will run in parallel, it will increase the overall throughput.
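The point of partitioning is to allow parallelism without losing per-stream ordering. A minimal sketch of how such partitioning typically works; using the stream name as the partition key is an assumption for illustration, not the Connector's documented behavior:

```python
import zlib

# Sketch: route events to one of N parallel consumers. The partition
# key (stream name) is an illustrative assumption; a stable hash keeps
# all events of one stream on the same partition, preserving per-stream
# order even when consumers run in parallel.
def partition_for(stream_name: str, concurrency_limit: int) -> int:
    return zlib.crc32(stream_name.encode()) % concurrency_limit
```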

3 - Projectors

Connector in Projector mode

3.1 - gRPC sidecar

Aggregate events to state using gRPC sidecar

As described in the Connector concept section, you can project events to state-oriented databases like MS SQL Server or MongoDB using the Connector running in Projector mode.

As the Connector doesn’t know how to project different event types to a document or a table, you need a supporting application that does that kind of work. The Connector itself is responsible for projecting events to whatever target it supports, maintaining the checkpoint, partitioning projections if necessary, and so on.

Here’s the diagram from the Concept page:

[Diagram: Using a gRPC service for reduce functions]

The sidecar application must implement a gRPC server with bidirectional streaming, running in a separate process, and must be accessible to the Connector via HTTP/2.

Each target needs to support its own response types. See the target documentation for more details.

The Connector sends requests to the sidecar as described in the following Proto file:

syntax = "proto3";

package projection;
import "google/protobuf/any.proto";
import "google/protobuf/struct.proto";

service Projection {
  rpc Project (stream ProjectionRequest) returns (stream ProjectionResponse);
}

message ProjectionRequest {
  string eventType = 1;
  string eventId = 2;
  string stream = 3;
  google.protobuf.Struct eventPayload = 4;
  map<string, string> metadata = 5;
}

message ProjectionResponse {
  string eventId = 1;
  google.protobuf.Any operation = 2;
  map<string, string> metadata = 3;
}

message Ignore {}

The sidecar receives the ProjectionRequest message, which contains the necessary details about the event to project. It then needs to respond with a ProjectionResponse message whose operation property contains one of the supported response types, which are target-specific. You must pass the event id back in the response so the Connector can match requests and responses and project events in the correct order.

The Ignore message is the only response message common to all targets. Respond with the Ignore message if you need to skip some events so they won’t get projected.
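Whatever stack you choose, the core of a sidecar is a function from an incoming event to a target-specific operation (or an ignore marker). Here is a sketch of that decision logic, independent of the gRPC plumbing; the event type, SQL statement, and payload fields are hypothetical, and plain dictionaries stand in for the protobuf messages:

```python
# Sketch of a sidecar's reduce function: map an incoming event to a
# target-specific operation or ignore it. The event type name, the SQL
# statement, and the payload fields are hypothetical examples.
def project(request: dict) -> dict:
    event_type = request["eventType"]
    payload = request["eventPayload"]
    if event_type == "BookingConfirmed":
        operation = {
            "sql": "UPDATE Bookings SET Confirmed = 1 WHERE BookingId = @id",
            "parameters": {"id": payload["bookingId"]},
        }
    else:
        operation = "ignore"  # corresponds to the Ignore response message
    # Echo the event id back so the Connector can correlate responses
    # with requests and keep the processing order.
    return {"eventId": request["eventId"], "operation": operation}
```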

More details

Check the documentation for available targets to see how to implement a sidecar for each of them.

4 - Targets

Available targets for the Connector

Currently, the following targets are supported:

  • MongoDB
  • Elasticsearch
  • MS SQL Server or Azure SQL

4.1 - MongoDB

Project events from EventStoreDB to MongoDB

The MongoDB target only supports the projector mode for now.

You need to run a gRPC server accessible by the Connector to make the MongoDB target work.

Projector sidecar

You can create a projector gRPC server using any language or stack. The server must support bidirectional streaming. Below, you can find MongoDB-specific response types:

syntax = "proto3";

package projection;

import "google/protobuf/struct.proto";

message InsertOne {
  google.protobuf.Struct document = 1;
}

message UpdateOne {
  google.protobuf.Struct filter = 1;
  google.protobuf.Struct update = 2;
}

message DeleteOne {
  google.protobuf.Struct filter = 1;
}
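For example, a sidecar handling a hypothetical RoomBooked event could respond with an UpdateOne operation shaped like the message above. In this sketch, plain dictionaries stand in for the google.protobuf.Struct fields, and the event payload fields are made up:

```python
# Sketch: build an UpdateOne response body for a hypothetical
# RoomBooked event. Dicts stand in for google.protobuf.Struct values;
# field names in the payload are illustrative.
def update_one_for_booking(event_payload: dict) -> dict:
    return {
        "updateOne": {
            # Match the document by the booking id...
            "filter": {"_id": event_payload["bookingId"]},
            # ...and set the projected fields from the event payload.
            "update": {"$set": {
                "roomId": event_payload["roomId"],
                "checkIn": event_payload["checkIn"],
            }},
        }
    }
```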

Configuration

There are two sections to configure in the Connector configuration: target and grpc. The target section specifies the MongoDB configuration, and the grpc section contains the sidecar URL.

For the MongoDB target, you need to configure the following parameters:

  • connectionString: The connection string to the MongoDB instance.
  • database: The name of the database to use.
  • collection: The name of the collection to use.

You can only project to one collection in one database using a single Connector instance.

Here’s the sample configuration for this connector:

connector:
  connectorId: "esdb-mongo-connector"
  connectorAssembly: "Eventuous.Connector.EsdbMongo"
  diagnostics:
    tracing:
      enabled: true
      exporters: [zipkin]
    metrics:
      enabled: true
      exporters: [prometheus]
    traceSamplerProbability: 0
source:
  connectionString: "esdb://localhost:2113?tls=false"
  concurrencyLimit: 1
target:
  connectionString: "mongodb://mongoadmin:[email protected]:27017"
  database: test
  collection: bookings
grpc:
  uri: "http://localhost:9091"
  credentials: "insecure"

Samples

We have a few samples for this target:

4.2 - Elasticsearch

Replicate and project events from EventStoreDB to Elasticsearch

An event-sourced system that stores domain events in EventStoreDB could benefit a lot from replicating these events to a search engine, such as Elasticsearch. When all the events are available in Elasticsearch, the system can be queried for events, and when events properly convey information about business operations, you can discover quite a lot using tools like Kibana.

This connector allows you to replicate events to Elasticsearch without any knowledge of your system’s internals, such as event contracts or event types. The natural limitation of this approach is that events must be stored in EventStoreDB in JSON format.

Configuration

The target configuration is used to connect to Elasticsearch, as well as create the necessary elements in Elasticsearch (index template, index rollover policy, and data stream).

The first step is to specify the target assembly in the connector section of the configuration file.

connector:
  connectorId: "esdb-elastic-connector"
  connectorAssembly: "Eventuous.Connector.EsdbElastic"

The target configuration has the following parameters:

  • connectionString - Elasticsearch connection string, should not be used when the cloudId is specified
  • connectorMode - producer or projector
  • cloudId - Elasticsearch cloud id, should be used when the connectionString is not specified
  • apiKey - Elasticsearch API key
  • dataStream - the index configuration section
    • indexName - the index name (data stream name)
    • template - the template section
      • templateName - the template name
      • numberOfShards - the number of shards for the data stream, default is 1
      • numberOfReplicas - the number of replicas for the data stream, default is 1
    • lifecycle - the lifecycle section
      • policyName - the rollover policy name
      • tiers - the rollover policy tiers, see the structure of a tier section below

The tier section is used to configure the rollover policy tiers. The tier name must match the available tier in your Elasticsearch cluster.

  • tier - the tier name (hot, warm, cold, etc)
  • minAge - the minimum age of the data stream (for example 10d for 10 days)
  • priority - the priority of the tier (0 is the lowest priority)
  • rollover - the rollover policy section
    • maxAge - the maximum index age
    • maxSize - the maximum index size
    • maxDocs - the maximum index documents
  • forceMerge - the force merge policy section
    • maxNumSegments - the maximum number of segments
  • readOnly - if the tier will be read only
  • delete - if the tier will be deleted
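Putting these parameters together, a target section could look like the following sketch. All values here are illustrative, not defaults:

```yaml
target:
  connectorMode: producer
  connectionString: "http://localhost:9200"
  apiKey: "<elastic-api-key>"
  dataStream:
    indexName: "eventlog"
    template:
      templateName: "eventlog-template"
      numberOfShards: 1
      numberOfReplicas: 1
    lifecycle:
      policyName: "eventlog-policy"
      tiers:
        - tier: "hot"
          minAge: "0d"
          priority: 100
          rollover:
            maxAge: "30d"
            maxSize: "10gb"
        - tier: "cold"
          minAge: "60d"
          priority: 0
          readOnly: true
```

Remember that connectionString and cloudId are mutually exclusive; use cloudId with apiKey when connecting to Elastic Cloud.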

Producer mode

When running in Producer mode, the Connector will replicate events from EventStoreDB to Elasticsearch using data streams. Documents in the data stream are immutable, which is a good choice for storing events.

Based on the configuration, the connector will create the following elements in Elasticsearch:

  • Index template
  • Data stream
  • Index rollover policy

You can optimize the rollover policy to keep the index size optimal, as well as move older events to a cheaper storage tier.

Events are replicated to Elasticsearch in the following format:

  • messageId - the unique identifier of the event
  • messageType - the type of the event
  • streamPosition - event position in the original stream
  • stream - original stream name
  • globalPosition - position of the event in the global stream ($all)
  • message - the event payload
  • metadata - flattened event metadata
  • @timestamp - the timestamp of the event
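As an illustration, a single replicated event might look like this as an Elasticsearch document (all values are made up):

```json
{
  "messageId": "6f9619ff-8b86-d011-b42d-00cf4fc964ff",
  "messageType": "RoomBooked",
  "streamPosition": 5,
  "stream": "Booking-42",
  "globalPosition": 1284,
  "message": { "bookingId": "42", "roomId": "101" },
  "metadata": { "correlationId": "abc123" },
  "@timestamp": "2022-11-01T10:15:00Z"
}
```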

There’s no need for a sidecar to run the Connector in Producer mode.

Projector mode

WIP

4.3 - MS SQL Server

Project events from EventStoreDB to Microsoft SQL Server or Azure SQL

WIP

5 - Custom connector

When you need to connect to a sink that is not supported yet, you can build a custom connector.

WIP