Eventuous Connectors

Eventuous Connectors subscribe to events from a source subscription and produce derived events to another system (a sink). They can also execute reduce-like operations that work as projections.

Right now, the only source built into the Connector application is the EventStoreDB source. You can, however, build a custom connector using another Eventuous subscription type or your own subscription implementation.

1 - Connector concept

What is Eventuous Connector and how to use it

Eventuous connectors subscribe to events from EventStoreDB (the source) and produce derived events to another system (the sink). Some sinks also support reduce-like operations that work as projections.

The Connector is a combination of a real-time subscription (currently only to the $all stream) and a sink. A sink can be configured to produce events (producer mode) or to project them (projector mode). Some sinks support both modes, others support only one.

Currently, the following sinks are supported:

Sink                         | Supported modes
-----------------------------|--------------------
Elasticsearch                | producer, projector
MongoDB                      | projector
MS SQL Server or Azure SQL   | projector

Connector modes

Below, you can find a description of the different modes that are supported by the connectors.

Producer mode

In producer mode, the connector subscribes to the source and produces events to the sink. What the events look like on the other side depends on the sink implementation.

The producer mode is most useful for re-publishing domain events to a broker, or for archiving or backing up events. For example, the Elasticsearch sink can be used in combination with the archive event store to keep archived events in a cheaper persistence tier, whilst keeping the EventStoreDB database size contained.

When used in combination with a message bus sink, the connector can publish integration events for other services to consume. Right now, however, the connector lacks transformation capabilities, so we don’t recommend using it this way when the connector is deployed as-is. You can still build a custom connector for that purpose.

Projector mode

The projector mode lets you implement projections using stacks other than .NET. In essence, the projector mode implements the MapReduce pattern: using a projector, you can reduce events to a single piece of state identified by a unique key.

Because the reduce function requires a custom implementation, Eventuous Connector needs to call custom code that receives events and sends the reduce function back. That custom code can be completely stateless and can therefore execute in parallel, although the Connector will maintain ordered event processing.

At the moment, the only way the connector can execute such custom code is through an external gRPC service with bidirectional streaming. The connector sends events to the gRPC service, and the gRPC service sends the reduce function back. Each projector sink implementation requires its own set of reduce functions. With this model, the external gRPC service implements a gRPC server, and the connector connects to it as a client.

Using a gRPC service for reduce functions

A custom gRPC service can be built in any language or stack that supports gRPC. You’d normally deploy it as a sidecar for the Connector pod in Kubernetes or a serverless workload.

For example, a MongoDB projector will expect to get operations like IndexOne or UpdateOne back from the gRPC service, while the SQL Server projector uses a single operation that returns an arbitrary SQL statement based on event data.

Example: SQL Server projector sidecar

The Connector's role here is to maintain the subscription to EventStoreDB, send events to the gRPC service, receive the reduce function back, execute the response, and maintain the checkpoint. By convention, each sink uses its own database for checkpointing.

As a result, it’s possible to build a stack-agnostic stateless projector and use the Connector to do the heavy lifting. Each sink also provides observability instrumentation for the database client library it uses.

Refer to the specific connector documentation page for more information.

2 - Configuration and deployment

How to configure and deploy Eventuous Connector

Eventuous Connector with the EventStoreDB source needs to be hosted as a continuously running service because it must maintain a real-time gRPC subscription. When you deploy a projector, it can be deployed as a sidecar for the connector or as a standalone service. It may also be possible to deploy it as a serverless workload if the serverless solution supports gRPC streaming.

3 - EventStoreDB to MongoDB

Project events from EventStoreDB to MongoDB

WIP

4 - EventStoreDB to Elastic

Replicate and project events from EventStoreDB to Elasticsearch

An event-sourced system that stores domain events in EventStoreDB could benefit a lot from replicating these events to a search engine, such as Elasticsearch. When all the events are available in Elasticsearch, the system can be queried for events, and when events properly convey information about business operations, you can discover quite a lot using tools like Kibana.

This connector allows you to replicate events to Elasticsearch without any knowledge of your system's internals, such as event contracts or event types. The natural limitation of this approach is that events must be stored in EventStoreDB in JSON format.

Configuration

The connector is configured using the config.yaml file. You can find an example of this file in the repository.

The config file consists of three sections:

  • connector - the connector configuration
  • source - configuration for the EventStoreDB source
  • target - configuration for the Elasticsearch target

Connector configuration

The connector section requires one parameter: the connector id. This connector also uses the connector id as the checkpoint id.

connector:
  connectorId: "esdb-esdb-elastic-connector"
  diagnostics:
    enabled: true

If you run multiple instances of the connector, you should use different connector ids.

If you want to enable diagnostics, you need to configure the diagnostics section.

Source configuration

The source configuration is used to connect to EventStoreDB, as well as to configure the subscription. At the moment, the connector will unconditionally subscribe to the $all stream.

The following configuration parameters are supported:

  • connectionString - EventStoreDB connection string using gRPC protocol. For example: esdb://localhost:2113?tls=false
  • concurrencyLimit - the subscription concurrency limit. The default value is 1.

source:
  connectionString: "esdb://localhost:2113?tls=false"
  concurrencyLimit: 1

When the subscription concurrency limit is higher than 1, the subscription will partition events between multiple Elasticsearch producer instances. As those producers will run in parallel, it will increase the overall throughput.
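
As a minimal sketch of the partitioned setup described above, the source section could raise the limit as follows. The value 4 is only an illustration, not a recommendation; the right number depends on your workload and cluster.

```yaml
source:
  connectionString: "esdb://localhost:2113?tls=false"
  # Illustrative value: events are partitioned across up to four
  # Elasticsearch producer instances running in parallel.
  concurrencyLimit: 4
```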

Target configuration

The target configuration is used to connect to Elasticsearch, as well as create the necessary elements in Elasticsearch (index template, index rollover policy, and data stream).

The following configuration parameters are supported:

  • connectionString - Elasticsearch connection string, should not be used when the cloudId is specified
  • cloudId - Elasticsearch cloud id, should be used when the connectionString is not specified
  • apiKey - Elasticsearch API key
  • dataStream - the index configuration section
    • indexName - the index name (data stream name)
    • template - the template section
      • templateName - the template name
      • numberOfShards - the number of shards for the data stream, default is 1
      • numberOfReplicas - the number of replicas for the data stream, default is 1
    • lifecycle - the lifecycle section
      • policyName - the rollover policy name
      • tiers - the rollover policy tiers, see the structure of a tier section below

The tier section is used to configure the rollover policy tiers. The tier name must match a tier that is available in your Elasticsearch cluster.

  • tier - the tier name (hot, warm, cold, etc)
  • minAge - the minimum age of the data stream (for example 10d for 10 days)
  • priority - the priority of the tier (0 is the lowest priority)
  • rollover - the rollover policy section
    • maxAge - the maximum index age
    • maxSize - the maximum index size
    • maxDocs - the maximum number of documents in the index
  • forceMerge - the force merge policy section
    • maxNumSegments - the maximum number of segments
  • readOnly - whether the tier will be read-only
  • delete - whether the tier will be deleted
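
To show how the parameters listed above might fit together, here is a hedged sketch of a target section. Only the key names come from the parameter lists. Every value (URL, names, ages, sizes, priorities) is a placeholder. Writing tiers as a YAML list and quoting ages and sizes as strings are assumptions, so check the example config.yaml in the repository for the authoritative layout.

```yaml
target:
  # Use either connectionString or cloudId, not both.
  connectionString: "http://localhost:9200"   # placeholder URL
  apiKey: "<your-api-key>"                    # placeholder
  dataStream:
    indexName: "eventlog"                     # placeholder data stream name
    template:
      templateName: "eventlog-template"       # placeholder
      numberOfShards: 1
      numberOfReplicas: 1
    lifecycle:
      policyName: "eventlog-policy"           # placeholder
      tiers:                                  # assumed to be a list of tier sections
        - tier: hot
          minAge: "1d"
          priority: 100
          rollover:
            maxAge: "1d"
            maxSize: "100mb"
        - tier: warm
          minAge: "10d"
          priority: 50
          forceMerge:
            maxNumSegments: 1
          readOnly: true
```

In this sketch, new events land in the hot tier and roll over daily or at 100 MB, while older indices move to a warm tier where they are merged and made read-only.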

Diagnostics configuration

The connector is fully instrumented with traces and metrics. The following configuration parameters are supported:

  • enabled - if diagnostics are enabled
  • tracing - the tracing configuration
    • enabled - if tracing is enabled
    • exporters - the tracing exporters (zipkin, jaeger, otlp)
  • metrics - the metrics configuration
    • enabled - if metrics are enabled
    • exporters - the metrics exporters (prometheus, otlp)

Example:

connector:
  connectorId: "esdb-esdb-elastic-connector"
  diagnostics:
    tracing:
      enabled: true
      exporters: [zipkin]
    metrics:
      enabled: true
      exporters: [prometheus]
    traceSamplerProbability: 0

Currently, there’s no way to configure the exporters (endpoints, etc.) in the config file, but they accept the usual environment variables.

Data in Elasticsearch

The connector uses an Elasticsearch data stream to store events. Documents in the data stream are immutable, which is a good fit for storing events.

Based on the configuration, the connector will create the following elements in Elasticsearch:

  • Index template
  • Data stream
  • Index rollover policy

You can optimise the rollover policy to keep the index size optimal, as well as move older events to a cheaper storage tier.

Events are replicated to Elasticsearch in the following format:

  • messageId - the unique identifier of the event
  • messageType - the type of the event
  • streamPosition - event position in the original stream
  • stream - original stream name
  • globalPosition - position of the event in the global stream ($all)
  • message - the event payload
  • metadata - flattened event metadata
  • @timestamp - the timestamp of the event

5 - EventStoreDB to SQL Server

Project events from EventStoreDB to Microsoft SQL Server or Azure SQL

WIP

6 - Custom connector

When you need to connect to a sink that is not supported yet, you can build a custom connector.

WIP