The JavaScript SDK (eventdbxjs) is a native Node-API addon (via napi-rs) that speaks directly to the EventDBX control socket over TCP/Cap’n Proto. It targets Node.js services, serverless API routes, and any runtime that exposes standard TCP primitives.
Feature highlights
- JSON (de)serialisation for aggregate snapshots and event envelopes.
- RFC 6902 JSON Patch helpers baked into client.patch.
- Async/await-friendly API surface with Promise-based helpers.
- Automatic retries with configurable exponential backoff.
- Per-call overrides for tokens, metadata, patch operations, and archived filters.
- Cross-platform builds covering macOS, Linux, Windows, and WASI targets.
Install
npm install eventdbxjs
# or
pnpm add eventdbxjs
Requires Node.js 18+ or any runtime that exposes WHATWG fetch, Web Crypto APIs, and raw TCP access to port 6363.
import { createClient } from "eventdbxjs";
const client = createClient({
ip: process.env.EVENTDBX_HOST ?? "127.0.0.1",
port: Number(process.env.EVENTDBX_PORT) || 6363,
token: process.env.EVENTDBX_TOKEN,
tenantId: process.env.EVENTDBX_TENANT_ID ?? "default",
verbose: false,
retry: {
attempts: 3,
initialDelayMs: 100,
maxDelayMs: 1_000,
},
});
await client.connect();
createClient falls back to environment variables when options are omitted. Set EVENTDBX_HOST, EVENTDBX_PORT, EVENTDBX_TOKEN, and EVENTDBX_TENANT_ID (for multi-tenant deployments) to avoid hardcoding credentials in code. The retry option enables exponential backoff without recreating the client.
Connection lifecycle tips
- Connect once per process and reuse the underlying TCP socket to avoid repeated TLS/token handshakes.
- await client.disconnect() during graceful shutdown so the server releases cursors quickly.
- await client.isConnected() lets you guard reuse when hot reloading in development.
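The shutdown tip above can be sketched as a one-shot hook. This is illustrative, not part of the SDK: Disconnectable is a local stand-in for the one client method used, and makeShutdownHook is our own helper name.

```typescript
// Minimal shape of the client method used below (assumption: matches
// the DbxClient.disconnect signature described in this guide).
interface Disconnectable {
  disconnect(): Promise<void>;
}

// Build a one-shot shutdown hook; repeated calls are no-ops so the same
// hook can be registered for several signals. Returns the handler so
// callers (and tests) can invoke it directly.
function makeShutdownHook(client: Disconnectable, log: (msg: string) => void) {
  let closed = false;
  return async function shutdown(): Promise<void> {
    if (closed) return;
    closed = true;
    await client.disconnect();
    log("eventdbx connection closed");
  };
}
```

Typical wiring in a Node.js service: `process.once("SIGTERM", hook)` and `process.once("SIGINT", hook)`.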
Serverless or Edge entry points
import { createClient, type DbxClient } from "eventdbxjs";
let cached: DbxClient | null = null;
async function getClient() {
if (!cached) {
cached = createClient({
ip: process.env.EVENTDBX_HOST,
port: Number(process.env.EVENTDBX_PORT) || 6363,
token: process.env.EVENTDBX_TOKEN,
tenantId: process.env.EVENTDBX_TENANT_ID,
});
}
if (!(await cached.isConnected())) {
await cached.connect();
}
return cached;
}
export default async function handler(request: Request) {
const client = await getClient();
const aggregate = await client.get(
"person",
new URL(request.url).searchParams.get("id")!
);
return Response.json(aggregate);
}
Module-level caches keep the connection alive across invocations in serverless/Edge runtimes that support global scope reuse (Next.js, Remix, Bun’s serverless runtime, or any environment that exposes TCP primitives).
Runtime configuration
| Variable | Default | Description |
|---|---|---|
| EVENTDBX_HOST | 127.0.0.1 | Hostname or IP address of the control socket. |
| EVENTDBX_PORT | 6363 | TCP port that hosts the control plane. |
| EVENTDBX_TOKEN | empty | Authentication token that the client forwards on connect. |
| EVENTDBX_TENANT_ID | empty | Tenant identifier included in the handshake. |
Set these variables or pass explicit overrides through createClient. Multi-tenant deployments always require tenantId (or EVENTDBX_TENANT_ID) so the server routes control requests correctly.
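The fallback order described here (explicit option, then environment variable, then default) can be sketched as a pure function. resolveConfig is illustrative; the SDK performs the equivalent resolution internally.

```typescript
// Illustrative resolution of createClient options against the
// environment variables in the table above. Field names mirror
// ClientOptions; resolveConfig itself is not part of the SDK.
interface ResolvedConfig {
  ip: string;
  port: number;
  token?: string;
  tenantId?: string;
}

function resolveConfig(
  options: { ip?: string; port?: number; token?: string; tenantId?: string },
  env: Record<string, string | undefined>
): ResolvedConfig {
  return {
    ip: options.ip ?? env.EVENTDBX_HOST ?? "127.0.0.1",
    port: options.port ?? (env.EVENTDBX_PORT ? Number(env.EVENTDBX_PORT) : 6363),
    token: options.token ?? env.EVENTDBX_TOKEN,
    tenantId: options.tenantId ?? env.EVENTDBX_TENANT_ID,
  };
}
```

Passing `env` explicitly (rather than reading process.env inside) keeps the resolution testable.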
Manage authentication and tenancy
Tokens and tenant identifiers can be set globally or per call. Every method that issues a request to the server accepts an optional token, so you can scope permissions to an end-user session without recreating the client.
const sessionToken = request.headers.get("x-eventdbx-token") ?? undefined;
await client.apply("person", "p-110", "person_email_updated", {
payload: { email: request.body.email },
token: sessionToken,
metadata: {
"@actor": request.headers.get("x-actor"),
},
note: `Changed by ${request.headers.get("x-actor")}`,
});
Use this pattern when a single process talks to multiple tenants or when privileged background jobs share infrastructure with user-triggered API routes.
Retry configuration
Transport-level failures (socket resets, Cap’n Proto decode errors, etc.) can be retried automatically. Retries are disabled by default (attempts = 1), so opt in by setting the retry object:
const client = createClient({
token: process.env.EVENTDBX_TOKEN,
retry: {
attempts: 4, // total tries = initial attempt + (attempts - 1) retries
initialDelayMs: 100,
maxDelayMs: 2_000,
},
});
Backoff doubles per retry until maxDelayMs is hit. Logical server errors (validation failures, authorization errors, etc.) still bubble to your application immediately.
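The backoff shape described above (doubling per retry, capped at maxDelayMs) can be made concrete with a small schedule function. The RetryOptions field names match the SDK; backoffSchedule itself is an illustration of the documented behavior, not SDK code.

```typescript
// Compute the sequence of retry delays implied by a RetryOptions-shaped
// config: one delay per retry, doubling each time, capped at maxDelayMs.
// The first attempt runs immediately and has no delay.
function backoffSchedule(opts: {
  attempts: number;
  initialDelayMs: number;
  maxDelayMs: number;
}): number[] {
  const delays: number[] = [];
  let delay = opts.initialDelayMs;
  for (let retry = 1; retry < opts.attempts; retry++) {
    delays.push(Math.min(delay, opts.maxDelayMs));
    delay *= 2;
  }
  return delays;
}
```

For the config in the snippet above (attempts: 4, initialDelayMs: 100, maxDelayMs: 2000) this yields delays of 100, 200, and 400 ms before the three retries.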
Write aggregates and events
// create – register a new aggregate + first event in one call
await client.create("person", "p-110", "person_registered", {
payload: {
first_name: "Jaya",
last_name: "Singh",
email: "[email protected]",
},
metadata: {
"@actor": "svc-directory",
note: "person created by [email protected]",
},
});
// apply – append events to an existing aggregate
await client.apply("person", "p-110", "person_email_updated", {
payload: { email: "[email protected]" },
});
// patch – apply RFC 6902 JSON patches to historical payloads
await client.patch("person", "p-110", "person_registered", [
{ op: "replace", path: "/first_name", value: "Jayah" },
]);
// archive – archive an aggregate, move to secondary index
await client.archive("person", "p-110", {
comment: "big beautiful comment",
});
// restore – restore an aggregate, move to primary index
await client.restore("person", "p-110");
create seeds a snapshot and first event atomically. apply appends new events. patch issues RFC 6902 operations against the requested historical payload, and archive/restore toggle write access while preserving history.
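To make concrete what client.patch sends, here is a local interpreter for the single "replace" op used in the example above. The real patching happens server-side; applyReplace is a sketch that handles object paths only (no array indices or the RFC 6902 "-" token).

```typescript
// Apply one RFC 6902 "replace" operation to a plain JSON object,
// returning a new document and leaving the input untouched.
type Json = { [key: string]: any };

function applyReplace(doc: Json, path: string, value: unknown): Json {
  // RFC 6901 pointer: "/first_name" → ["first_name"], unescaping
  // ~1 → "/" and ~0 → "~" in each token.
  const keys = path
    .split("/")
    .slice(1)
    .map((k) => k.replace(/~1/g, "/").replace(/~0/g, "~"));
  const copy = structuredClone(doc);
  let target: Json = copy;
  for (const key of keys.slice(0, -1)) target = target[key];
  target[keys[keys.length - 1]] = value;
  return copy;
}
```
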
Read aggregates and events
// get – fetch the latest aggregate snapshot and metadata
type Person = { first_name: string; last_name: string; email: string };
const state: Person = await client.get("person", "p-110");
// list – paginate aggregates with filters, sort, and cursor helpers
const { items: people, nextCursor } = await client.list("person", {
take: 50,
cursor: "a:person:xxx",
filter: "person.status = true AND person.last_name = 'thach'",
});
// select – project fields without returning the rest of the payload
const summary = await client.select("person", "p-110", [
"first_name",
"last_name",
]);
// events – pull the full envelope stream
const { items: history } = await client.events("person", "p-110");
Use get when you need the entire snapshot, select for sparse projections, list to traverse many aggregates, and events for CDC-style replay. All read methods share the same pagination helpers and filter grammar, returning a PageResult whose items contain either aggregates or event envelopes.
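The sparse projection that select performs can be pictured locally. The server does this remotely before the payload crosses the wire; pickFields is purely illustrative.

```typescript
// Project a subset of fields from a snapshot-shaped object, mirroring
// what client.select returns for the requested field list.
function pickFields<T extends Record<string, unknown>>(
  snapshot: T,
  fields: (keyof T)[]
): Partial<T> {
  const out: Partial<T> = {};
  for (const f of fields) {
    if (f in snapshot) out[f] = snapshot[f];
  }
  return out;
}
```
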
Filters, sorting, and pagination
Filters use the same SQL-like shorthand as the EventDBX server (field = value AND other_field > 10), and sort fields accept names such as aggregateType, aggregateId, version, merkleRoot, and archived. Each entry in PageResult.items is the aggregate snapshot or event payload returned by the control socket.
const firstPage = await client.list("person", {
take: 25,
includeArchived: false,
filter: "person.archived = false AND person.last_name LIKE 'S%'",
sort: [{ field: "aggregateId" }, { field: "version", descending: true }],
});
if (firstPage.nextCursor) {
const nextPage = await client.list("person", {
cursor: firstPage.nextCursor,
});
// Resume until nextCursor is undefined
}
includeArchived, archivedOnly, and request-scoped token overrides are part of the PageOptions passed to list and events. Always feed the returned nextCursor back into the next call when you need to resume pagination after hitting server-side page limits.
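The resume-until-done pattern above generalizes to a small drain loop over the PageResult shape. The page source below is a stand-in for client.list; in real code the callback would forward the cursor into a client.list call.

```typescript
// PageResult as described in this guide, parameterized over the item type.
interface PageResult<T> {
  items: T[];
  nextCursor?: string;
}

// Fetch pages until nextCursor comes back undefined, collecting items.
async function drain<T>(
  list: (cursor?: string) => Promise<PageResult<T>>
): Promise<T[]> {
  const all: T[] = [];
  let cursor: string | undefined;
  do {
    const page = await list(cursor);
    all.push(...page.items);
    cursor = page.nextCursor;
  } while (cursor !== undefined);
  return all;
}
```

Beware that draining very large aggregate sets into memory defeats the purpose of pagination; prefer processing each page as it arrives when the result set is unbounded.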
When you sort aggregates by created_at or updated_at, you can use shorthand timestamp cursors with the same control socket used by eventdbxjs: pass cursor: "ts:<aggregate_type>:<aggregate_id>" alongside the timestamp sort and the server expands it into the full token (ts:<field>:<order>:<scope>:<timestamp_ms>:...) for you.
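A tiny helper keeps the shorthand format from being hand-assembled in several places. timestampCursor is our own name for it; the string format is the one quoted above.

```typescript
// Build the shorthand timestamp cursor ("ts:<aggregate_type>:<aggregate_id>")
// that the server expands into a full timestamp cursor token.
function timestampCursor(aggregateType: string, aggregateId: string): string {
  return `ts:${aggregateType}:${aggregateId}`;
}
```
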
TypeScript surface area
The SDK exports helper types so you can strongly type payloads, retries, and pagination in your application.
/* eslint-disable */
export declare class DbxClient {
constructor(options?: ClientOptions | undefined | null);
/** Establish a TCP connection to the EventDBX control socket. */
connect(): Promise<void>;
/** Close the underlying socket, if connected. */
disconnect(): Promise<void>;
/** Returns `true` if a socket connection is currently held. */
isConnected(): Promise<boolean>;
/** Returns the configured endpoint (host/port). */
get endpoint(): ClientEndpoint;
/** List aggregates, optionally restricting to a specific aggregate type. */
list(
aggregateType?: string | undefined | null,
options?: PageOptions | undefined | null
): Promise<PageResult>;
/** Fetch a single aggregate snapshot. */
get(aggregateType: string, aggregateId: string): Promise<any | null>;
/** Select a subset of fields from an aggregate snapshot. */
select(
aggregateType: string,
aggregateId: string,
fields: Array<string>
): Promise<any | null>;
/** List events for an aggregate. */
events(
aggregateType: string,
aggregateId: string,
options?: PageOptions | undefined | null
): Promise<PageResult>;
/** Append a new event with an arbitrary JSON payload. */
apply(
aggregateType: string,
aggregateId: string,
eventType: string,
options?: AppendOptions | undefined | null
): Promise<any>;
/** Create an aggregate and emit its initial event. */
create(
aggregateType: string,
aggregateId: string,
eventType: string,
options?: CreateAggregateOptions | undefined | null
): Promise<any>;
/** Archive an aggregate. */
archive(
aggregateType: string,
aggregateId: string,
options?: SetArchiveOptions | undefined | null
): Promise<any>;
/** Restore an archived aggregate. */
restore(
aggregateType: string,
aggregateId: string,
options?: SetArchiveOptions | undefined | null
): Promise<any>;
/** Apply a JSON Patch to the aggregate. Returns the updated snapshot. */
patch(
aggregateType: string,
aggregateId: string,
eventType: string,
operations: JsonPatch[],
options?: PatchOptions | undefined | null
): Promise<any>;
/** Create a snapshot for the provided aggregate. */
createSnapshot(
aggregateType: string,
aggregateId: string,
options?: CreateSnapshotOptions | undefined | null
): Promise<any>;
/** List snapshots with optional aggregate filters. */
listSnapshots(
options?: ListSnapshotsOptions | undefined | null
): Promise<Array<any>>;
/** Fetch a snapshot by identifier. */
getSnapshot(
snapshotId: bigint | number,
options?: GetSnapshotOptions | undefined | null
): Promise<any | null>;
}
export interface AppendOptions {
payload?: JsonValue;
metadata?: JsonValue;
note?: string;
token?: string;
publishTargets?: Array<PublishTargetOptions>;
}
export interface ClientEndpoint {
ip: string;
port: number;
}
export interface ClientOptions {
ip?: string;
port?: number;
token?: string;
tenantId?: string;
verbose?: boolean;
retry?: RetryOptions;
}
export interface CreateAggregateOptions {
token?: string;
payload?: JsonValue;
metadata?: JsonValue;
note?: string;
publishTargets?: Array<PublishTargetOptions>;
}
export declare function createClient(
options?: ClientOptions | undefined | null
): DbxClient;
export interface CreateSnapshotOptions {
token?: string;
comment?: string;
}
export interface GetSnapshotOptions {
token?: string;
}
export type JsonPatch =
| JsonPatchAddReplaceTest
| JsonPatchRemove
| JsonPatchMoveCopy;
export interface JsonPatchAddReplaceTest {
op: "add" | "replace" | "test";
path: string;
value: JsonValue;
}
export interface JsonPatchMoveCopy {
op: "move" | "copy";
from: string;
path: string;
}
export interface JsonPatchRemove {
op: "remove";
path: string;
}
export type JsonValue = any;
export interface ListSnapshotsOptions {
aggregateType?: string;
aggregateId?: string;
version?: bigint | number;
token?: string;
}
export interface PageOptions {
cursor?: string;
take?: number;
includeArchived?: boolean;
archivedOnly?: boolean;
token?: string;
filter?: string;
sort?: string;
}
export interface PageResult {
items: Array<JsonValue>;
nextCursor?: string;
}
export interface PatchOptions {
metadata?: JsonValue;
note?: string;
token?: string;
publishTargets?: Array<PublishTargetOptions>;
}
export declare const enum PayloadMode {
All = "all",
EventOnly = "event-only",
StateOnly = "state-only",
SchemaOnly = "schema-only",
EventAndSchema = "event-and-schema",
ExtensionsOnly = "extensions-only",
}
export declare const enum Priority {
High = "high",
Normal = "normal",
Low = "low",
}
export interface PublishTargetOptions {
plugin: string;
mode?: PayloadMode;
priority?: Priority;
}
export interface RetryOptions {
attempts?: number;
initialDelayMs?: number;
maxDelayMs?: number;
}
export interface SetArchiveOptions {
token?: string;
note?: string;
comment?: string;
}
The generated index.d.ts file still types aggregate and event payloads as any, but the runtime values follow the aggregate snapshot and event envelope shapes described throughout this guide, so you can layer your own domain-specific interfaces on top.
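One way to layer those domain interfaces is a thin generic wrapper with a runtime guard. DbxLike is a local stand-in for the one DbxClient method used here; getTyped and the guard convention are our own, not part of the SDK.

```typescript
// Minimal shape of the client method used below.
interface DbxLike {
  get(aggregateType: string, aggregateId: string): Promise<any | null>;
}

// Fetch a snapshot and narrow it to a caller-supplied type, failing
// loudly if the payload does not match the expected shape.
async function getTyped<T>(
  client: DbxLike,
  aggregateType: string,
  aggregateId: string,
  guard: (value: unknown) => value is T
): Promise<T | null> {
  const snapshot = await client.get(aggregateType, aggregateId);
  if (snapshot === null) return null;
  if (!guard(snapshot)) {
    throw new Error(`unexpected ${aggregateType} snapshot shape`);
  }
  return snapshot;
}
```

A schema-validation library can supply the guard if hand-written predicates become unwieldy.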