Eventuous Connectors
Eventuous Connector allows subscribing to events from a source subscription and producing derived events to another system (a sink), as well as executing reduce-like operations that work as projections.
Right now, the only source that is built-in to the Connector application is the EventStoreDB source. You can, however, build a custom connector using another Eventuous subscription type, or your own subscription implementation.
Eventuous Connector is a pre-built, ready-to-use application. You can find its source code, which includes the code for all the provided sinks, in the GitHub repository.
1 - Connector concept
What is Eventuous Connector and how to use it
Eventuous connectors allow subscribing to events from EventStoreDB (the source) and producing derived events to another system (the sink). Some sinks also support reduce-like operations that work as projections.
The Connector is a combination of a real-time subscription (currently only to the $all stream) and a sink. A sink can be configured to produce (producer mode) or to project (projector mode) events. Some sinks support both modes, some only support one.
Currently, the Connector ships with the following sinks, each described in its own section below: MongoDB, Elasticsearch, and SQL Server.
Because the Connector uses all the features of Eventuous, it can execute both produce and project operations in parallel using the partitioning filter. The only partition key supported right now is the stream name. In addition, each sink is fully instrumented for observability with traces and metrics.
Connector modes
Below, you can find a description of the different modes that are supported by the connectors.
Producer mode
In producer mode, the connector subscribes to the source and produces events to the sink. What events look like on the other side depends on the sink implementation.
The producer mode is most useful for re-publishing domain events to a broker, or for use as an archive or backup. For example, the Elasticsearch sink can be used in combination with the archive event store to keep archived events in a cheaper persistence tier, whilst keeping the EventStoreDB database size contained.
When used in combination with a message bus sink, the connector can publish integration events for other services to consume. However, the connector currently lacks transformation capabilities, so we don’t recommend using it this way when deployed as-is. You can still build a custom connector for that purpose.
Projector mode
The projector mode allows implementing projections using stacks other than .NET. Essentially, the projector mode implements the MapReduce pattern: using a projector, you can reduce events to a single piece of state identified by a unique key.
Because the reduce function requires a custom implementation, Eventuous Connector calls custom code that receives events and sends the reduce operations back. That custom code can be completely stateless and can therefore execute in parallel, although the Connector maintains ordered event processing.
At the moment, the only way for the connector to execute such custom code is via an external gRPC service with bidirectional streaming. The connector sends events to the gRPC service, and the service sends the reduce operations back. Each projector sink implementation requires its own set of reduce functions. In this model, the external service implements a gRPC server, and the connector connects to it as a client.
Using a gRPC service for reduce functions
A custom gRPC service can be built in any language or stack that supports gRPC. You’d normally deploy it as a sidecar for the Connector pod in Kubernetes, or as a serverless workload.
For example, a MongoDB projector expects to get operations like InsertOne or UpdateOne back from the gRPC service, while the SQL Server projector uses a single operation that returns an arbitrary SQL statement based on event data.
Example: SQL Server projector sidecar
The Connector’s role here is to maintain the subscription to EventStoreDB, send events to the gRPC service, receive the reduce operation back, execute it, and maintain the checkpoint. By convention, each sink uses its own database for checkpointing.
As a result, it’s possible to build a stack-agnostic stateless projector and use the Connector to do the heavy lifting. Each sink also provides observability instrumentation for the database client library it uses.
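For illustration, the SQL Server case can be sketched as a stateless reduce function that maps an event to a SQL statement. The event type, payload shape, and table schema below are hypothetical, and the sketch leaves out the actual gRPC contract used by the Connector:

```python
import json
from typing import Optional


def reduce_to_sql(event_type: str, payload: bytes) -> Optional[str]:
    # Hypothetical reduce function for a SQL Server projector sidecar:
    # map a domain event to a SQL statement. The event name and table
    # schema are illustrative, not part of the Connector contract.
    data = json.loads(payload)
    if event_type == "RoomBooked":
        return (
            "INSERT INTO Bookings (BookingId, RoomId) "
            f"VALUES ('{data['bookingId']}', '{data['roomId']}')"
        )
    # Returning no statement means this projector ignores the event
    return None


sql = reduce_to_sql("RoomBooked", b'{"bookingId": "b-1", "roomId": "101"}')
```

Because the function holds no state of its own, the sidecar stays trivially scalable; all ordering and checkpointing concerns remain with the Connector.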
Refer to the specific connector documentation page for more information.
2 - Configuration and deployment
How to configure and deploy Eventuous Connector
Eventuous Connector with the EventStoreDB source needs to be hosted as a continuously running service because it must maintain a real-time gRPC subscription. When you deploy a projector, it can run as a sidecar of the connector or as a standalone service. It could also be deployed as a serverless workload if the serverless platform supports gRPC streaming.
3 - EventStoreDB to MongoDB
Project events from EventStoreDB to MongoDB
WIP
4 - EventStoreDB to Elastic
Replicate and project events from EventStoreDB to Elasticsearch
An event-sourced system that stores domain events in EventStoreDB could benefit a lot from replicating these events to a search engine, such as Elasticsearch.
When all the events are available in Elasticsearch, the system can be queried for events, and when events properly convey information about business operations, you can discover quite a lot using tools like Kibana.
This connector allows you to replicate events to Elasticsearch without any knowledge of your system internals, such as event contracts or event types. The natural limitation of this approach is that events must be stored in EventStoreDB in JSON format.
Configuration
The connector is configured using the config.yaml file. You can find an example of this file in the repository.
The config file consists of three sections:
- connector - the connector configuration
- source - configuration for the EventStoreDB source
- target - configuration for the Elasticsearch target
Connector configuration
The connector section requires one parameter: the connector id. This connector also uses the connector id as the checkpoint id.
connector:
connectorId: "esdb-esdb-elastic-connector"
diagnostics:
enabled: true
If you run multiple instances of the connector, you should use different connector ids.
If you want to enable diagnostics, you need to configure the diagnostics section.
Source configuration
The source configuration is used to connect to EventStoreDB, as well as to configure the subscription. At the moment, the connector unconditionally subscribes to the $all stream.
The following configuration parameters are supported:
- connectionString - EventStoreDB connection string using the gRPC protocol, for example esdb://localhost:2113?tls=false
- concurrencyLimit - the subscription concurrency limit; the default value is 1
source:
connectionString: "esdb://localhost:2113?tls=false"
concurrencyLimit: 1
When the subscription concurrency limit is higher than 1, the subscription partitions events between multiple Elasticsearch producer instances. As those producers run in parallel, this increases the overall throughput.
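The partitioning behaviour can be sketched as follows: the partition key (the stream name) is hashed to pick one of the producer instances, so events from the same stream keep their relative order while different streams are processed in parallel. This is a simplified illustration, not the Connector's actual implementation:

```python
import hashlib


def partition_for(stream_name: str, partition_count: int) -> int:
    # Stable hash of the stream name: a given stream always maps to the
    # same partition, preserving per-stream ordering while spreading
    # different streams across parallel producers.
    digest = hashlib.md5(stream_name.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % partition_count


# With concurrencyLimit: 4, every event of "Booking-123" goes to one producer
p1 = partition_for("Booking-123", 4)
p2 = partition_for("Booking-123", 4)
```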
Target configuration
The target configuration is used to connect to Elasticsearch, as well as create the necessary elements in Elasticsearch (index template, index rollover policy, and data stream).
The following configuration parameters are supported:
- connectionString - Elasticsearch connection string; do not use it when cloudId is specified
- cloudId - Elasticsearch cloud id; use it when connectionString is not specified
- apiKey - Elasticsearch API key
- dataStream - the index configuration section
  - indexName - the index name (data stream name)
  - template - the template section
    - templateName - the template name
    - numberOfShards - the number of shards for the data stream, default is 1
    - numberOfReplicas - the number of replicas for the data stream, default is 1
  - lifecycle - the lifecycle section
    - policyName - the rollover policy name
    - tiers - the rollover policy tiers; see the structure of a tier section below
The tier section is used to configure the rollover policy tiers. The tier name must match an available tier in your Elasticsearch cluster.
- tier - the tier name (hot, warm, cold, etc.)
- minAge - the minimum age of the data stream (for example, 10d for 10 days)
- priority - the priority of the tier (0 is the lowest priority)
- rollover - the rollover policy section
  - maxAge - the maximum index age
  - maxSize - the maximum index size
  - maxDocs - the maximum number of index documents
- forceMerge - the force merge policy section
  - maxNumSegments - the maximum number of segments
- readOnly - whether the tier becomes read-only
- delete - whether the tier gets deleted
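Putting the target parameters together, a complete target section could look like this. All values are illustrative, and the exact nesting should be verified against the sample config.yaml in the repository:

```yaml
target:
  connectionString: "http://localhost:9200"
  apiKey: "<elastic-api-key>"
  dataStream:
    indexName: "eventlog"
    template:
      templateName: "eventlog-template"
      numberOfShards: 1
      numberOfReplicas: 1
    lifecycle:
      policyName: "eventlog-policy"
      tiers:
        - tier: "hot"
          priority: 100
          rollover:
            maxAge: "1d"
            maxSize: "1gb"
        - tier: "warm"
          minAge: "10d"
          priority: 50
          forceMerge:
            maxNumSegments: 1
          readOnly: true
        - tier: "cold"
          minAge: "30d"
          priority: 0
          delete: true
```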
Diagnostics configuration
The connector is fully instrumented with traces and metrics. The following configuration parameters are supported:
- enabled - if diagnostics are enabled
- tracing - the tracing configuration
  - enabled - if tracing is enabled
  - exporters - the tracing exporters (zipkin, jaeger, otlp)
- metrics - the metrics configuration
  - enabled - if metrics are enabled
  - exporters - the metrics exporters (prometheus, otlp)
Example:
connector:
connectorId: "esdb-esdb-elastic-connector"
diagnostics:
tracing:
enabled: true
exporters: [zipkin]
metrics:
enabled: true
exporters: [prometheus]
traceSamplerProbability: 0
There’s currently no way to configure the exporters (endpoints, etc.) in the config file, but they honour the usual OpenTelemetry environment variables.
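For example, the standard OpenTelemetry exporter environment variables can point the exporters at your collector or Zipkin instance; the endpoint values below are illustrative:

```shell
# Standard OpenTelemetry exporter environment variables
# (endpoints are examples; use your own collector/Zipkin addresses)
export OTEL_EXPORTER_ZIPKIN_ENDPOINT="http://zipkin:9411/api/v2/spans"
export OTEL_EXPORTER_OTLP_ENDPOINT="http://otel-collector:4317"
```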
Data in Elasticsearch
The connector uses an Elasticsearch data stream to store events. Documents in a data stream are immutable, which makes it a good fit for storing events.
Based on the configuration, the connector will create the following elements in Elasticsearch:
- Index template
- Data stream
- Index rollover policy
You can optimise the rollover policy to keep the index size optimal, as well as move older events to a cheaper storage tier.
Events are replicated to Elasticsearch in the following format:
- messageId - the unique identifier of the event
- messageType - the type of the event
- streamPosition - the event position in the original stream
- stream - the original stream name
- globalPosition - the position of the event in the global ($all) stream
- message - the event payload
- metadata - flattened event metadata
- @timestamp - the timestamp of the event
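A replicated event could therefore look like this as an Elasticsearch document. The event type, payload, and metadata values are illustrative; only the field names come from the format above:

```json
{
  "messageId": "8d7f2c1a-4b6e-4f10-9c2d-3e5a7b1c9d0e",
  "messageType": "RoomBooked",
  "streamPosition": 5,
  "stream": "Booking-123",
  "globalPosition": 1024,
  "message": { "bookingId": "123", "roomId": "101" },
  "metadata": { "correlationId": "7f3e9a2b-1111-2222-3333-444455556666" },
  "@timestamp": "2022-01-01T12:00:00.000Z"
}
```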
5 - EventStoreDB to SQL Server
Project events from EventStoreDB to Microsoft SQL Server or Azure SQL
WIP
6 - Custom connector
When you need to connect to a sink that is not supported yet, you can build a custom connector.
WIP