The Python SDK targets CPython 3.9+ and mirrors the ergonomics of the Node and Rust clients. It opens a persistent TCP control connection to the EventDBX control socket, performs a Noise XX handshake by default, and exchanges Cap’n Proto messages for aggregate reads/writes.
## Feature highlights

- Synchronous TCP client that speaks the same Cap’n Proto control protocol as `eventdbxjs`.
- Noise XX (+ PSK) transport enabled by default; opt out with `no_noise=True` / `EVENTDBX_NO_NOISE=1` for test labs.
- Mutation helpers for `create`, `apply`, `patch`, `archive`, and `restore`, plus per-call publish-target routing.
- Read helpers for `list`, `events`, `get`, `select`, and `verify` with pagination and sorting.
- Snapshot and schema/tenant admin helpers (`create_snapshot`, `list_snapshots`, `tenant_schema_publish`, `tenant_assign`, quotas, etc.).
## Install

```shell
python -m pip install --upgrade pip
python -m pip install eventdbx
```

`pycapnp` depends on the Cap’n Proto system libraries. Install them first (`brew install capnp` on macOS, or `apt-get install capnproto libcapnp-dev` on Debian/Ubuntu) before running `pip install`.
```python
import json
import os

from eventdbx import EventDBXClient, PublishTarget, RetryOptions

with EventDBXClient(
    token=os.getenv("EVENTDBX_TOKEN", "control_token"),
    tenant_id=os.getenv("EVENTDBX_TENANT_ID", "tenant-123"),
    host=os.getenv("EVENTDBX_HOST", "127.0.0.1"),
    port=int(os.getenv("EVENTDBX_PORT", "6363")),
    verbose=True,    # set False to return boolean acknowledgements
    no_noise=False,  # set True or EVENTDBX_NO_NOISE=1 to skip Noise in test labs
    retry=RetryOptions(attempts=3, initial_delay_ms=100, max_delay_ms=1_000),
) as client:
    created = client.create(
        aggregate_type="orders",
        aggregate_id="ord_123",
        event_type="created",
        payload_json=json.dumps({"total": 42.15}),
        publish_targets=[PublishTarget(plugin="webhook", mode="async")],
    )
```

`EventDBXClient` performs the handshake during construction and is a context manager; exiting the block closes the socket. `payload_json` and `metadata_json` accept serialized JSON strings. When `verbose=True` (the default), mutation methods return the stored JSON blobs; set `verbose=False` to receive boolean acknowledgements.
## Runtime configuration

| Variable | Default | Description |
|---|---|---|
| `EVENTDBX_HOST` | `127.0.0.1` | Hostname or IP address of the control socket. |
| `EVENTDBX_PORT` | `6363` | TCP port for the control plane. |
| `EVENTDBX_TOKEN` | empty | Control token; required when constructing the client. |
| `EVENTDBX_TENANT_ID` | empty | Tenant identifier included in the initial hello. |
| `EVENTDBX_NO_NOISE` | `false` | Set `1`/`true` to request plaintext transport (testing only). |

Pass these values from your environment into `EventDBXClient` as shown above. Only `EVENTDBX_NO_NOISE` is read automatically when `no_noise` is omitted.
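Since the SDK reads only `EVENTDBX_NO_NOISE` on its own, the rest of the table has to be forwarded by hand. A minimal sketch of that forwarding (the helper name `client_kwargs_from_env` is ours, not part of the SDK):

```python
import os


def client_kwargs_from_env() -> dict:
    """Collect EventDBXClient keyword arguments from the environment.

    Mirrors the runtime-configuration table above; this is a local
    convenience, not an SDK function.
    """
    return {
        "token": os.getenv("EVENTDBX_TOKEN", ""),
        "tenant_id": os.getenv("EVENTDBX_TENANT_ID", ""),
        "host": os.getenv("EVENTDBX_HOST", "127.0.0.1"),
        "port": int(os.getenv("EVENTDBX_PORT", "6363")),
        "no_noise": os.getenv("EVENTDBX_NO_NOISE", "").lower() in ("1", "true"),
    }
```

With that in place, construction collapses to `EventDBXClient(**client_kwargs_from_env())`.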
## Manage authentication and tenancy

Tokens and tenant identifiers are required when constructing the client. Create separate client instances per tenant or per control token. The same socket is reused for all calls until the context exits or `close()` is invoked.
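One client per tenant can be kept in a small lazily-populated registry. This is a local sketch (the `TenantClients` class is ours, not SDK API); the factory is injectable so the pattern works with any constructor:

```python
from typing import Callable, Dict


class TenantClients:
    """Cache one client object per tenant id. Local helper, not SDK API."""

    def __init__(self, factory: Callable[[str], object]):
        self._factory = factory
        self._clients: Dict[str, object] = {}

    def get(self, tenant_id: str):
        # Reuse the existing client for this tenant, or construct one lazily.
        if tenant_id not in self._clients:
            self._clients[tenant_id] = self._factory(tenant_id)
        return self._clients[tenant_id]


# With the real SDK the factory would look like:
# pool = TenantClients(lambda t: EventDBXClient(token=TOKEN, tenant_id=t))
```

Remember to call `close()` on each cached client (or use them as context managers) when tearing the registry down.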
## Retry configuration

Transport-level failures (socket resets, Cap’n Proto decode errors, etc.) can be retried automatically. Retries are disabled by default (`attempts=1`); opt in by passing `retry`:

```python
from eventdbx import EventDBXClient, RetryOptions

client = EventDBXClient(
    token="control_token",
    tenant_id="tenant-123",
    retry=RetryOptions(
        attempts=4,  # initial try + 3 retries
        initial_delay_ms=100,
        max_delay_ms=2_000,
    ),
)
```
Only transport errors are retried; logical API errors surface immediately.
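The `initial_delay_ms`/`max_delay_ms` pair suggests an exponential backoff capped at the maximum, though the SDK's exact schedule is not documented here. Under that assumption, the delays before each retry would look like:

```python
def backoff_delays(attempts: int, initial_delay_ms: int, max_delay_ms: int) -> list:
    """Delay (ms) before each retry, assuming doubling backoff with a cap.

    The doubling schedule is an assumption inferred from the parameter
    names; the SDK's actual schedule may differ.
    """
    # attempts includes the initial try, so there are attempts - 1 retries.
    return [min(initial_delay_ms * (2 ** i), max_delay_ms)
            for i in range(attempts - 1)]
```

For example, `backoff_delays(4, 100, 2_000)` yields delays of 100, 200, and 400 ms before the three retries.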
## Publish targets

Select which plugins receive a write, override the payload mode, and bump priority per call:

```python
from eventdbx import EventDBXClient, PublishTarget

with EventDBXClient(token="<token>", tenant_id="<tenant>") as client:
    client.apply(
        aggregate_type="invoice",
        aggregate_id="inv-1",
        event_type="invoice_paid",
        payload_json="{}",
        publish_targets=[
            PublishTarget(plugin="analytics-engine", mode="event-only"),
            PublishTarget(plugin="fraud-worker", mode="all", priority="high"),
        ],
    )
```

Omit `publish_targets` to fan out to every enabled plugin using its configured payload mode.
## Write aggregates and events

```python
import json

client.create(
    aggregate_type="person",
    aggregate_id="p-110",
    event_type="person_registered",
    payload_json=json.dumps({"first_name": "Jaya", "last_name": "Singh", "email": "[email protected]"}),
    metadata_json=json.dumps({"@actor": "svc-directory", "note": "created by [email protected]"}),
)

client.apply(
    aggregate_type="person",
    aggregate_id="p-110",
    event_type="person_email_updated",
    payload_json=json.dumps({"email": "[email protected]"}),
)

client.patch(
    aggregate_type="person",
    aggregate_id="p-110",
    event_type="person_registered",
    patches=[{"op": "replace", "path": "/first_name", "value": "Jayah"}],
)

client.archive(aggregate_type="person", aggregate_id="p-110", note="customer request")
client.restore(aggregate_type="person", aggregate_id="p-110")
```

`create` seeds a snapshot and first event atomically, `apply` appends events, `patch` issues RFC 6902 operations against historical payloads, and `archive`/`restore` toggle write access while preserving history.
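To make the `patch` semantics concrete, here is what a single RFC 6902 `replace` operation does to a payload, applied locally. This is a minimal sketch for top-level paths only, not the server's implementation:

```python
def apply_replace(payload: dict, patch: dict) -> dict:
    """Apply one RFC 6902 'replace' op (top-level paths only).

    Illustrative sketch of what the server does with `patches`;
    real JSON Patch also supports nested paths and other ops.
    """
    assert patch["op"] == "replace"
    key = patch["path"].lstrip("/")
    updated = dict(payload)  # leave the original payload untouched
    updated[key] = patch["value"]
    return updated


before = {"first_name": "Jaya", "last_name": "Singh"}
after = apply_replace(before, {"op": "replace", "path": "/first_name", "value": "Jayah"})
```

The original event payload is preserved in history; the patch produces a corrected view, which is why `after` differs from `before` only at the replaced path.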
## Read aggregates and events

```python
latest = client.get(aggregate_type="person", aggregate_id="p-110")

projection = client.select(
    aggregate_type="person",
    aggregate_id="p-110",
    fields=["payload.email", "metadata.@actor"],
)

first_page = client.list(
    take=25,
    filter_expr="person.archived = false AND person.last_name LIKE 'S%'",
    sort="aggregate_id",
)
if first_page.has_next_cursor:
    next_page = client.list(cursor=first_page.next_cursor)

history = client.events(aggregate_type="person", aggregate_id="p-110", take=100)
merkle_root = client.verify(aggregate_type="person", aggregate_id="p-110")
```

Use `next_cursor` to resume pagination. `list` returns aggregate snapshots; `events` returns full envelopes.
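Cursor-following can be wrapped in a generator that drains all pages. This sketch assumes the page object exposes an `items` sequence alongside the `has_next_cursor`/`next_cursor` attributes shown above (the `items` name is an assumption, and `iter_aggregates` itself is not part of the SDK):

```python
def iter_aggregates(client, **list_kwargs):
    """Yield aggregates across all pages by following next_cursor.

    Works against any object with the `list(...)` page shape shown
    above; local helper, not SDK API. Assumes pages expose `.items`.
    """
    page = client.list(**list_kwargs)
    while True:
        yield from page.items
        if not page.has_next_cursor:
            break
        # Resume from where the previous page left off.
        page = client.list(cursor=page.next_cursor)
```

Because it is a generator, callers can stop early (e.g. `itertools.islice`) without fetching the remaining pages.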
## Filters, sorting, and pagination

Filters use the same SQL-like grammar as the server (`field = value AND other_field > 10`). Sort fields accept names such as `aggregate_id`, `aggregate_type`, `version`, `created_at`, `updated_at`, and `archived`. You can pass a string (`"created_at:desc,aggregate_id:asc"`) or a list of `AggregateSortOption` objects. When sorting by timestamps, shorthand cursors (`ts:<aggregate_type>:<aggregate_id>`) can be used with `cursor`.
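Sort strings are easy to mistype, so a small builder that validates against the documented field names can help. The helper is a local sketch, not SDK API:

```python
def sort_spec(*fields: tuple) -> str:
    """Build a sort string like 'created_at:desc,aggregate_id:asc'.

    Local convenience; the SDK also accepts AggregateSortOption objects
    directly. The allowed set mirrors the field names documented above.
    """
    allowed = {"aggregate_id", "aggregate_type", "version",
               "created_at", "updated_at", "archived"}
    parts = []
    for field, direction in fields:
        if field not in allowed:
            raise ValueError(f"unknown sort field: {field}")
        parts.append(f"{field}:{direction}")
    return ",".join(parts)
```

For example, `client.list(sort=sort_spec(("created_at", "desc"), ("aggregate_id", "asc")))` produces the same string shown above.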
## Snapshots and admin helpers

- Snapshots: `create_snapshot`, `list_snapshots`, and `get_snapshot` handle point-in-time copies.
- Schema & tenant admin: `list_schemas` / `replace_schemas`, `tenant_schema_publish`, `tenant_assign` / `tenant_unassign`, and quota helpers wrap control-plane maintenance flows.
## Noise transport

Noise XX with a token-derived PSK is enabled by default. Disable it only for controlled testing by passing `no_noise=True` (or setting `EVENTDBX_NO_NOISE`). Production deployments should keep Noise on.
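The precedence between the `no_noise` argument and the `EVENTDBX_NO_NOISE` variable can be captured in a few lines. This mirrors the documented behaviour (an explicit argument wins; the environment is consulted only when the argument is omitted) but is a local sketch, not SDK internals:

```python
import os


def resolve_no_noise(explicit=None, env=os.environ) -> bool:
    """Resolve the plaintext-transport flag.

    Local sketch of the documented precedence: an explicit no_noise
    argument wins; EVENTDBX_NO_NOISE applies only when it is omitted.
    """
    if explicit is not None:
        return bool(explicit)
    return env.get("EVENTDBX_NO_NOISE", "").lower() in ("1", "true")
```

Note that an explicit `no_noise=False` keeps Noise on even when `EVENTDBX_NO_NOISE=1` is set, which is the safe default for production code.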
## Development & testing

```shell
python3 -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"
pytest
```

If `pycapnp` fails to build, install the Cap’n Proto toolchain first and retry the `pip install`.