The .NET SDK targets .NET 8+, opens a persistent TCP connection to the EventDBX control socket, and speaks Cap’n Proto over Noise NNpsk0 by default. It returns JSON payloads so you can hydrate POCOs with System.Text.Json or forward envelopes directly to workers.
Feature highlights
- Async-first API surface with Task-returning methods and IAsyncDisposable cleanup.
- Create/apply/patch/archive helpers plus per-call publish target routing to plugins.
- Aggregate reads for get, select, list, events, and verify with filters, cursors, and sorting.
- Snapshot helpers (create, list, get) and control-plane admin for schemas and tenants.
- Noise transport enabled by default; opt out with UseNoise = false for lab sockets.
- Per-call token overrides; tenant selection happens during the initial handshake.
Install
dotnet add package EventDbx.Client
Targets .NET 8.0. The package is cross-platform and works on Windows, Linux, and macOS.
using EventDbx;
var options = new EventDbxClientOptions
{
Host = Environment.GetEnvironmentVariable("EVENTDBX_HOST") ?? "127.0.0.1",
Port = int.TryParse(Environment.GetEnvironmentVariable("EVENTDBX_PORT"), out var port) ? port : 6363,
Token = Environment.GetEnvironmentVariable("EVENTDBX_TOKEN") ?? throw new InvalidOperationException("EVENTDBX_TOKEN is required"),
TenantId = Environment.GetEnvironmentVariable("EVENTDBX_TENANT_ID") ?? "default",
RequestTimeout = TimeSpan.FromSeconds(10),
ConnectTimeout = TimeSpan.FromSeconds(5),
UseNoise = true, // set false when connecting to plaintext lab sockets
};
await using var client = await EventDbxClient.ConnectAsync(options);
ConnectAsync performs the handshake and returns a connected client. Dispose it during graceful shutdown to release the socket.
Runtime configuration
| Variable | Default | Description |
|---|---|---|
| EVENTDBX_HOST | 127.0.0.1 | Hostname or IP address of the control socket. |
| EVENTDBX_PORT | 6363 | TCP port for the control plane. |
| EVENTDBX_TOKEN | empty | Control token sent during the handshake; required. |
| EVENTDBX_TENANT_ID | default | Tenant identifier included in the hello message. |
Pass these into EventDbxClientOptions; the client does not read environment variables automatically.
Manage authentication and tenancy
Override tokens per call to scope permissions. Tenant IDs are fixed at connect time; spin up a separate client per tenant.
var sessionToken = httpContext.Request.Headers["x-eventdbx-token"].FirstOrDefault();
var updated = await client.AppendEventAsync(new AppendEventParams
{
AggregateType = "person",
AggregateId = "p-110",
EventType = "person_email_updated",
Payload = new { email = "jaya.singh@example.com" },
Metadata = new Dictionary<string, object?> { ["@actor"] = httpContext.User.Identity?.Name },
Note = "Changed by profile service",
PublishTargets = new[] { new PublishTarget(plugin: "webhook", mode: "async") },
Token = string.IsNullOrWhiteSpace(sessionToken) ? null : sessionToken,
});
If no override is provided, the client reuses the token from EventDbxClientOptions.
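Because tenant IDs are fixed at connect time, a multi-tenant service needs some way to keep one connected client per tenant. The following is a minimal sketch of such a cache, not part of the SDK: `TenantClientPool<TClient>` and its `connect` factory are hypothetical names, and the pool is generic so it makes no assumptions about the client type beyond `IAsyncDisposable`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper (not an SDK type): caches one connected client per tenant ID.
public sealed class TenantClientPool<TClient> : IAsyncDisposable
    where TClient : IAsyncDisposable
{
    private readonly ConcurrentDictionary<string, Lazy<Task<TClient>>> _clients = new();
    private readonly Func<string, Task<TClient>> _connect;

    // connect: caller-supplied factory that opens a client for the given tenant ID.
    public TenantClientPool(Func<string, Task<TClient>> connect) =>
        _connect = connect ?? throw new ArgumentNullException(nameof(connect));

    // Lazy<Task<T>> makes the factory run at most once per tenant,
    // even when several callers request the same tenant concurrently.
    public Task<TClient> GetAsync(string tenantId) =>
        _clients.GetOrAdd(tenantId, id => new Lazy<Task<TClient>>(() => _connect(id))).Value;

    // Disposes every client that finished connecting successfully.
    public async ValueTask DisposeAsync()
    {
        foreach (var entry in _clients.Values)
        {
            if (entry.IsValueCreated && entry.Value.IsCompletedSuccessfully)
            {
                await entry.Value.Result.DisposeAsync();
            }
        }
        _clients.Clear();
    }
}
```

In practice the factory would wrap EventDbxClient.ConnectAsync with an options object whose TenantId is set to the requested tenant. Note that this sketch also caches a failed connection attempt; a production version would likely evict faulted entries so the next call can retry.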
Write aggregates and events
await client.CreateAggregateAsync(new CreateAggregateParams
{
AggregateType = "person",
AggregateId = "p-110",
EventType = "person_registered",
Payload = new { first_name = "Jaya", last_name = "Singh", email = "jaya@example.com" },
Metadata = new Dictionary<string, object?> { ["@actor"] = "svc-directory" },
});
await client.AppendEventAsync(new AppendEventParams
{
AggregateType = "person",
AggregateId = "p-110",
EventType = "person_email_updated",
Payload = new { email = "jaya.singh@example.com" },
});
await client.PatchEventAsync(new PatchEventParams
{
AggregateType = "person",
AggregateId = "p-110",
EventType = "person_registered",
Patch = new[] { new { op = "replace", path = "/first_name", value = "Jayah" } },
});
await client.SetArchiveAsync(new SetArchiveParams
{
AggregateType = "person",
AggregateId = "p-110",
Archived = true,
Note = "customer request",
});
// restore by setting Archived = false
await client.SetArchiveAsync(new SetArchiveParams
{
AggregateType = "person",
AggregateId = "p-110",
Archived = false,
});
Payloads, metadata, and patches are serialized with System.Text.Json; anonymous types or POCOs both work.
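To see what an anonymous type and a POCO look like on the wire, the sketch below serializes both with default System.Text.Json settings. It assumes the SDK uses default serializer options, which this page does not confirm; `PersonRegistered` and `PayloadPreview` are hypothetical names used only for illustration.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical POCO: with default options System.Text.Json keeps PascalCase
// property names, so the attributes pin the snake_case wire names explicitly.
public sealed record PersonRegistered(
    [property: JsonPropertyName("first_name")] string FirstName,
    [property: JsonPropertyName("last_name")] string LastName);

public static class PayloadPreview
{
    // Serializes using the runtime type so anonymous types and POCOs both expand fully.
    public static string ToJson(object payload) =>
        JsonSerializer.Serialize(payload, payload.GetType());
}

// PayloadPreview.ToJson(new { first_name = "Jaya", last_name = "Singh" })
//   -> {"first_name":"Jaya","last_name":"Singh"}
// PayloadPreview.ToJson(new PersonRegistered("Jaya", "Singh"))
//   -> {"first_name":"Jaya","last_name":"Singh"}
```

Without the JsonPropertyName attributes the record would serialize as FirstName and LastName, which would not match the snake_case field names used in the examples above.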
Read aggregates and events
using System.Text.Json;
var latest = await client.GetAggregateAsync("person", "p-110");
if (latest.Found)
{
using var doc = JsonDocument.Parse(latest.AggregateJson!);
var email = doc.RootElement.GetProperty("payload").GetProperty("email").GetString();
}
var projection = await client.SelectAggregateAsync(new SelectAggregateParams
{
AggregateType = "person",
AggregateId = "p-110",
Fields = new[] { "payload.first_name", "payload.email" },
});
var firstPage = await client.ListAggregatesAsync(new ListAggregatesOptions
{
Take = 25,
Filter = "person.archived = false AND person.last_name LIKE 'S%'",
Sort = new[] { new AggregateSortOption(AggregateSortField.CreatedAt, descending: true) },
});
if (firstPage.HasNextCursor)
{
var next = await client.ListAggregatesAsync(new ListAggregatesOptions { Cursor = firstPage.NextCursor });
}
var history = await client.ListEventsAsync("person", "p-110", new ListEventsOptions { Take = 100 });
var merkle = await client.VerifyAggregateAsync("person", "p-110");
ListAggregatesAsync and ListEventsAsync return JSON strings and cursor metadata; feed NextCursor into subsequent calls to resume paging.
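Since reads come back as JSON text, a small helper can turn the payload into a typed object. This is a minimal sketch that assumes the aggregate JSON carries its state under a top-level "payload" object, as the GetAggregateAsync example above suggests; `PersonState` and `AggregateHydrator` are hypothetical names, not SDK types.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical POCO mirroring the sample person payload.
public sealed record PersonState(
    [property: JsonPropertyName("first_name")] string? FirstName,
    [property: JsonPropertyName("last_name")] string? LastName,
    [property: JsonPropertyName("email")] string? Email);

public static class AggregateHydrator
{
    // Returns the "payload" object deserialized as T, or null when the
    // document has no object-valued "payload" property.
    public static T? HydratePayload<T>(string aggregateJson) where T : class
    {
        using var doc = JsonDocument.Parse(aggregateJson);
        var root = doc.RootElement;
        if (root.ValueKind != JsonValueKind.Object ||
            !root.TryGetProperty("payload", out var payload) ||
            payload.ValueKind != JsonValueKind.Object)
        {
            return null;
        }
        return payload.Deserialize<T>();
    }
}
```

With this in place, `AggregateHydrator.HydratePayload<PersonState>(latest.AggregateJson!)` would yield a typed object whose missing fields are simply null.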
Filters, sorting, and pagination
Filters use the same SQL-like grammar as the control plane (field = value AND other_field > 10). Sort fields accept names such as aggregate_type, aggregate_id, created_at, updated_at, and archived. You can pass a raw string via SortText ("created_at:desc,aggregate_id:asc") or a strongly typed list of AggregateSortOption. Timestamp cursors (ts:<aggregate_type>:<aggregate_id>) pass through unchanged when used with timestamp sorts.
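When sort keys are assembled dynamically, the raw SortText form can be built from field and direction pairs. The helper below is a hypothetical sketch (`SortTextBuilder` is not an SDK type) that produces the `field:direction` comma-separated format quoted above; it only checks that field names are structurally safe and does not validate them against the server's list of sortable fields.

```csharp
using System;
using System.Linq;

public static class SortTextBuilder
{
    // Builds a string such as "created_at:desc,aggregate_id:asc".
    public static string Build(params (string Field, bool Descending)[] keys)
    {
        foreach (var (field, _) in keys)
        {
            // Reject names that would break the "field:direction,..." syntax.
            if (string.IsNullOrWhiteSpace(field) || field.Contains(',') || field.Contains(':'))
            {
                throw new ArgumentException($"Invalid sort field '{field}'.", nameof(keys));
            }
        }
        return string.Join(",", keys.Select(k => $"{k.Field}:{(k.Descending ? "desc" : "asc")}"));
    }
}

// SortTextBuilder.Build(("created_at", true), ("aggregate_id", false))
//   -> "created_at:desc,aggregate_id:asc"
```

The strongly typed AggregateSortOption list remains the safer choice when the sort order is known at compile time; a string builder like this is mainly useful when sort keys arrive from user input or configuration.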
Snapshots and admin helpers
await client.CreateSnapshotAsync(new CreateSnapshotParams
{
AggregateType = "person",
AggregateId = "p-110",
Comment = "pre-migration snapshot",
});
var snapshots = await client.ListSnapshotsAsync(new ListSnapshotsOptions { AggregateType = "person" });
var snapshot = await client.GetSnapshotAsync(new GetSnapshotParams { SnapshotId = 42 });
var schemasJson = await client.ListSchemasAsync();
var replaced = await client.ReplaceSchemasAsync(new ReplaceSchemasParams { Schemas = new { person = new { version = 1 } } });
var publish = await client.TenantSchemaPublishAsync(new TenantSchemaPublishParams
{
TenantId = "default",
Activate = true,
Reload = true,
});
var assigned = await client.TenantAssignAsync(new TenantAssignParams { TenantId = "tenant-123", ShardId = "shard-a" });
var quota = await client.TenantQuotaSetAsync(new TenantQuotaSetParams { TenantId = "tenant-123", MaxStorageMb = 1024 });
These helpers cover point-in-time snapshots plus schema and tenant management flows (assign/unassign, quotas, reloads, and schema publication).
Noise transport
Noise NNpsk0 is enabled by default and derives a PSK from the control token. Only disable it for trusted lab sockets by setting UseNoise = false in EventDbxClientOptions.