The Python SDK targets CPython 3.9+ and mirrors the ergonomics of the Node and Rust clients. It opens a persistent TCP control connection to the EventDBX control socket, performs a Noise XX handshake by default, and exchanges Cap’n Proto messages for aggregate reads/writes.

Feature highlights

  • Synchronous TCP client that speaks the same Cap’n Proto control protocol as eventdbxjs.
  • Noise XX (+ PSK) transport enabled by default; opt out with no_noise=True / EVENTDBX_NO_NOISE=1 for test labs.
  • Mutation helpers for create, apply, patch, archive, and restore, plus per-call publish target routing.
  • Read helpers for list, events, get, select, and verify with pagination and sorting.
  • Snapshot and schema/tenant admin helpers (create_snapshot, list_snapshots, tenant_schema_publish, tenant_assign, quotas, etc.).

Install

python -m pip install --upgrade pip
python -m pip install eventdbx
pycapnp depends on Cap’n Proto system libraries. Install them first (brew install capnp on macOS or apt-get install capnproto libcapnp-dev on Debian/Ubuntu) before running pip install.

Connect and configure

import json
import os

from eventdbx import EventDBXClient, PublishTarget, RetryOptions

with EventDBXClient(
    token=os.getenv("EVENTDBX_TOKEN", "control_token"),
    tenant_id=os.getenv("EVENTDBX_TENANT_ID", "tenant-123"),
    host=os.getenv("EVENTDBX_HOST", "127.0.0.1"),
    port=int(os.getenv("EVENTDBX_PORT", "6363")),
    verbose=True,  # set False to return boolean acknowledgements
    no_noise=False,  # set True or EVENTDBX_NO_NOISE=1 to skip Noise in test labs
    retry=RetryOptions(attempts=3, initial_delay_ms=100, max_delay_ms=1_000),
) as client:
    created = client.create(
        aggregate_type="orders",
        aggregate_id="ord_123",
        event_type="created",
        payload_json=json.dumps({"total": 42.15}),
        publish_targets=[PublishTarget(plugin="webhook", mode="async")],
    )
EventDBXClient performs the handshake during construction and is a context manager; exiting the with block closes the socket. payload_json and metadata_json accept serialized JSON strings, not dicts. When verbose=True (the default), mutation methods return the stored JSON blobs; set verbose=False to receive boolean acknowledgements instead.
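
Because payload_json and metadata_json take strings, it can help to centralize serialization in one place. The following is a minimal sketch using only the standard library; to_json_arg is a hypothetical helper, not part of the SDK, and the currency field is illustrative data.

```python
import json
from typing import Any, Mapping, Optional


def to_json_arg(value: Optional[Mapping[str, Any]]) -> Optional[str]:
    """Serialize a mapping to a compact JSON string, passing None through.

    Hypothetical helper (not part of eventdbx): it only prepares the string
    that the payload_json / metadata_json parameters expect.
    """
    if value is None:
        return None
    # sort_keys makes the output deterministic, which helps when diffing logs.
    return json.dumps(value, separators=(",", ":"), sort_keys=True)


payload_json = to_json_arg({"total": 42.15, "currency": "USD"})
metadata_json = to_json_arg(None)
print(payload_json)
```

The resulting strings can then be passed as payload_json=... and metadata_json=... to the mutation methods shown above.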

Runtime configuration

Variable             Default      Description
EVENTDBX_HOST        127.0.0.1    Hostname or IP address of the control socket.
EVENTDBX_PORT        6363         TCP port for the control plane.
EVENTDBX_TOKEN       (empty)      Control token; required when constructing the client.
EVENTDBX_TENANT_ID   (empty)      Tenant identifier included in the initial hello.
EVENTDBX_NO_NOISE    false        Set 1/true to request plaintext transport (testing only).
Pass these values from your environment into EventDBXClient as shown above. Only EVENTDBX_NO_NOISE is read automatically when no_noise is omitted.
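
One way to keep this wiring in a single place is a small function that collects the variables into keyword arguments. This is a sketch under the defaults listed above; client_kwargs_from_env is a hypothetical name, and failing fast on a missing token or tenant is a design choice of this example rather than documented SDK behavior.

```python
import os
from typing import Any, Dict, Mapping


def client_kwargs_from_env(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Collect EventDBXClient keyword arguments from environment variables.

    Hypothetical helper: the variable names and defaults follow the table
    above; the ValueError on missing credentials is this sketch's choice.
    """
    token = env.get("EVENTDBX_TOKEN", "")
    tenant_id = env.get("EVENTDBX_TENANT_ID", "")
    if not token or not tenant_id:
        raise ValueError("EVENTDBX_TOKEN and EVENTDBX_TENANT_ID must be set")
    return {
        "token": token,
        "tenant_id": tenant_id,
        "host": env.get("EVENTDBX_HOST", "127.0.0.1"),
        "port": int(env.get("EVENTDBX_PORT", "6363")),
        # no_noise is left out on purpose so EVENTDBX_NO_NOISE is read automatically.
    }
```

With this in place, the client could be constructed as EventDBXClient(**client_kwargs_from_env()).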

Manage authentication and tenancy

Tokens and tenant identifiers are required when constructing the client. Create separate client instances per tenant or per control token. The same socket is reused for all calls until the context exits or close() is invoked.

Retry configuration

Transport-level failures (socket resets, Cap’n Proto decode errors, etc.) can be retried automatically. Retries are disabled by default (attempts = 1); opt in by passing retry:
from eventdbx import EventDBXClient, RetryOptions

client = EventDBXClient(
    token="control_token",
    tenant_id="tenant-123",
    retry=RetryOptions(
        attempts=4,          # initial try + 3 retries
        initial_delay_ms=100,
        max_delay_ms=2_000,
    ),
)
Only transport errors are retried; logical API errors surface immediately.
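
To get a feel for how attempts, initial_delay_ms, and max_delay_ms interact, the sketch below computes a delay schedule. It assumes the delay doubles after each failed attempt and is capped at max_delay_ms; the SDK's actual growth curve (and any jitter) is not stated here, so treat the numbers as illustrative only. backoff_delays_ms is a hypothetical function.

```python
from typing import List


def backoff_delays_ms(attempts: int, initial_delay_ms: int, max_delay_ms: int) -> List[int]:
    """Return the wait before each retry, ASSUMING capped exponential doubling.

    The doubling factor is an assumption of this sketch, not documented
    SDK behavior. attempts counts the initial try, so there are
    attempts - 1 delays.
    """
    delays: List[int] = []
    delay = initial_delay_ms
    for _ in range(max(attempts - 1, 0)):  # one delay before each retry
        delays.append(min(delay, max_delay_ms))
        delay *= 2
    return delays


print(backoff_delays_ms(attempts=4, initial_delay_ms=100, max_delay_ms=2_000))
```

Under these assumptions, attempts=1 yields an empty schedule, which matches the "retries disabled" default described above.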

Publish targets

Select which plugins receive a write, override payload mode, and bump priority per call:
from eventdbx import EventDBXClient, PublishTarget

with EventDBXClient(token="<token>", tenant_id="<tenant>") as client:
    client.apply(
        aggregate_type="invoice",
        aggregate_id="inv-1",
        event_type="invoice_paid",
        payload_json="{}",
        publish_targets=[
            PublishTarget(plugin="analytics-engine", mode="event-only"),
            PublishTarget(plugin="fraud-worker", mode="all", priority="high"),
        ],
    )
Omit publish_targets to fan out to every enabled plugin using their configured payload mode.

Write aggregates and events

import json

client.create(
    aggregate_type="person",
    aggregate_id="p-110",
    event_type="person_registered",
    payload_json=json.dumps({"first_name": "Jaya", "last_name": "Singh", "email": "[email protected]"}),
    metadata_json=json.dumps({"@actor": "svc-directory", "note": "created by [email protected]"}),
)

client.apply(
    aggregate_type="person",
    aggregate_id="p-110",
    event_type="person_email_updated",
    payload_json=json.dumps({"email": "[email protected]"}),
)

client.patch(
    aggregate_type="person",
    aggregate_id="p-110",
    event_type="person_registered",
    patches=[{"op": "replace", "path": "/first_name", "value": "Jayah"}],
)

client.archive(aggregate_type="person", aggregate_id="p-110", note="customer request")
client.restore(aggregate_type="person", aggregate_id="p-110")
create seeds a snapshot and first event atomically, apply appends events, patch issues RFC 6902 operations against historical payloads, and archive/restore toggle write access while preserving history.
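
To illustrate what an RFC 6902 replace operation does to a payload, here is a minimal local sketch. It handles only the replace op on nested objects (no arrays, no whole-document replacement) and is not how the server applies patches; apply_replace_ops is a hypothetical helper for previewing a patch before sending it.

```python
import copy
from typing import Any, Dict, List


def apply_replace_ops(document: Dict[str, Any], patches: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Apply RFC 6902 "replace" operations to a copy of a nested dict.

    Hypothetical preview helper: supports objects only, not arrays.
    """
    result = copy.deepcopy(document)
    for patch in patches:
        if patch["op"] != "replace":
            raise NotImplementedError(f"unsupported op: {patch['op']}")
        # JSON Pointer (RFC 6901): split on "/", then unescape ~1 -> "/" and ~0 -> "~".
        tokens = [t.replace("~1", "/").replace("~0", "~") for t in patch["path"].split("/")[1:]]
        if not tokens:
            raise ValueError("whole-document replace is not supported in this sketch")
        target = result
        for token in tokens[:-1]:
            target = target[token]
        # "replace" requires the target member to exist already.
        if tokens[-1] not in target:
            raise KeyError(f"replace target does not exist: {patch['path']}")
        target[tokens[-1]] = patch["value"]
    return result


before = {"first_name": "Jaya", "last_name": "Singh"}
after = apply_replace_ops(before, [{"op": "replace", "path": "/first_name", "value": "Jayah"}])
print(after)
```

The input dict is left untouched, so the before and after payloads can be compared side by side.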

Read aggregates and events

latest = client.get(aggregate_type="person", aggregate_id="p-110")

projection = client.select(
    aggregate_type="person",
    aggregate_id="p-110",
    fields=["payload.email", "metadata.@actor"],
)

first_page = client.list(
    take=25,
    filter_expr="person.archived = false AND person.last_name LIKE 'S%'",
    sort="aggregate_id",
)

if first_page.has_next_cursor:
    next_page = client.list(cursor=first_page.next_cursor)

history = client.events(aggregate_type="person", aggregate_id="p-110", take=100)
merkle_root = client.verify(aggregate_type="person", aggregate_id="p-110")
Use next_cursor to resume pagination. list returns aggregate snapshots; events returns full envelopes.
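
The cursor handling above can be wrapped in a generator so callers iterate over pages without tracking cursors themselves. This is a sketch: iter_pages is a hypothetical helper, it relies only on the has_next_cursor and next_cursor attributes shown above, and it takes a callable so the SDK call stays outside the loop logic.

```python
from typing import Any, Callable, Iterator, Optional


def iter_pages(fetch_page: Callable[[Optional[str]], Any]) -> Iterator[Any]:
    """Yield pages until a page reports no further cursor.

    fetch_page receives None for the first page and the previous page's
    next_cursor afterwards. Hypothetical helper, not part of eventdbx.
    """
    cursor: Optional[str] = None
    while True:
        page = fetch_page(cursor)
        yield page
        if not page.has_next_cursor:
            break
        cursor = page.next_cursor
```

A possible call site, assuming client.list accepts cursor=None on the first request (an assumption of this sketch), is: for page in iter_pages(lambda cursor: client.list(take=25, cursor=cursor)).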

Filters, sorting, and pagination

Filters use the same SQL-like grammar as the server (field = value AND other_field > 10). Sort fields accept names such as aggregate_id, aggregate_type, version, created_at, updated_at, and archived. You can pass a string ("created_at:desc,aggregate_id:asc") or a list of AggregateSortOption objects. When sorting by timestamps, shorthand cursors (ts:<aggregate_type>:<aggregate_id>) can be used with cursor.
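
The string form of the sort argument can be assembled from (field, direction) pairs to avoid hand-formatting mistakes. The sketch below only produces the "field:direction" comma-separated format quoted above; build_sort is a hypothetical helper, and field names are deliberately not validated because the list of accepted names above may not be exhaustive.

```python
from typing import Iterable, Tuple


def build_sort(fields: Iterable[Tuple[str, str]]) -> str:
    """Join (field, direction) pairs into a "field:dir,field:dir" sort string.

    Hypothetical helper: only the direction is validated here.
    """
    parts = []
    for name, direction in fields:
        if direction not in ("asc", "desc"):
            raise ValueError(f"direction must be 'asc' or 'desc', got {direction!r}")
        parts.append(f"{name}:{direction}")
    return ",".join(parts)


sort = build_sort([("created_at", "desc"), ("aggregate_id", "asc")])
print(sort)
```

The result can be passed as the sort argument of list in place of a hand-written string.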

Snapshots and admin helpers

  • Snapshots: create_snapshot, list_snapshots, and get_snapshot handle point-in-time copies.
  • Schema & tenant admin: list_schemas / replace_schemas, tenant_schema_publish, tenant_assign / tenant_unassign, and quota helpers wrap control-plane maintenance flows.

Noise transport

Noise XX with a token-derived PSK is enabled by default. Disable it only for controlled testing by passing no_noise=True (or setting EVENTDBX_NO_NOISE=1). Production deployments should keep Noise on.

Development & testing

python3 -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"
pytest
If pycapnp fails to build, install the Cap’n Proto toolchain first and retry the pip install.