Skip to main content
The Go client opens a framed TCP session to the control plane, performs the Noise NNpsk0 handshake, and exchanges Cap’n Proto control messages. It mirrors the verb set used by the other SDKs and returns raw JSON strings so you can hydrate your own structs.

Feature highlights

  • Create/apply/patch/archive helpers with optional publish targets and metadata.
  • Aggregate reads for get, select, list, events, and verify with cursors, filters, and sorting.
  • Snapshot helpers (create, list, get) plus admin verbs for schemas and tenants.
  • Noise transport on by default; disable only for trusted lab sockets.
  • Retries with backoff and verbose mutation responses by default.

Install

go get github.com/eventdbx/eventdbx-go
Modules are tagged with semver (v0.x.y). Pin a specific tag in go.mod for reproducible builds.

Connect and configure

package main

import (
    "log"
    "os"
    "strconv"

    eventdbx "github.com/eventdbx/eventdbx-go"
)

func envOr(key, fallback string) string {
    if v := os.Getenv(key); v != "" {
        return v
    }
    return fallback
}

func envInt(key string, fallback int) int {
    if v := os.Getenv(key); v != "" {
        if n, err := strconv.Atoi(v); err == nil {
            return n
        }
    }
    return fallback
}

func mustEnv(key string) string {
    if v := os.Getenv(key); v != "" {
        return v
    }
    log.Fatalf("%s is required", key)
    return ""
}

func main() {
    cli, err := eventdbx.NewClient(eventdbx.Config{
        Host:     envOr("EVENTDBX_HOST", eventdbx.DefaultHost),
        Port:     envInt("EVENTDBX_PORT", eventdbx.DefaultPort),
        Token:    mustEnv("EVENTDBX_TOKEN"),
        TenantID: envOr("EVENTDBX_TENANT_ID", "default"),
        UseNoise: eventdbx.Bool(true),
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()
}
NewClient performs the handshake and returns a connected client. Close it during graceful shutdown to release the socket.

Runtime configuration

VariableDefaultDescription
EVENTDBX_HOST127.0.0.1Hostname or IP address of the control socket.
EVENTDBX_PORT6363TCP port for the control plane.
EVENTDBX_TOKENemptyControl token sent during the handshake; required.
EVENTDBX_TENANT_IDdefaultTenant identifier included in the hello message.
EVENTDBX_NO_NOISEunset/falseWhen set to 1, disables Noise for plaintext lab sockets.

Write aggregates and events

note := "customer sign-up"
meta := `{"@actor":"svc-directory"}`
_, err := cli.Create("person", "p-110", "person_registered", `{
  "first_name":"Rafi",
  "last_name":"Stone",
  "email":"[email protected]"
}`, &eventdbx.ApplyOptions{
  Note:         &note,
  MetadataJSON: &meta,
})
if err != nil {
    log.Fatal(err)
}

_, _ = cli.Apply("person", "p-110", "person_email_updated", `{"email":"[email protected]"}`, nil)

_, _ = cli.Patch("person", "p-110", "person_registered", `[{"op":"replace","path":"/first_name","value":"Rafiq"}]`, nil)

_, _ = cli.Archive("person", "p-110", nil)
_, _ = cli.Restore("person", "p-110", nil)
Publish targets can be passed via ApplyOptions.PublishTargets or PatchOptions.PublishTargets to route events to specific plugins.

Read aggregates and events

latest, _ := cli.Get("person", "p-110")
if latest.Found {
    log.Println("aggregate json", latest.AggregateJSON)
}

projection, _ := cli.Select("person", "p-110", []string{"payload.first_name", "payload.email"})

take := uint64(25)
filter := "person.archived = false"
page, _ := cli.List(&eventdbx.ListAggregatesOptions{
    Take:   &take,
    Filter: &filter,
    SortOptions: []eventdbx.AggregateSortOption{
        {Field: eventdbx.AggregateSortCreatedAt, Descending: true},
    },
})

eventsTake := uint64(50)
events, _ := cli.Events("person", "p-110", &eventdbx.ListEventsOptions{Take: &eventsTake})
merkle, _ := cli.Verify("person", "p-110")
List and Events return JSON strings plus cursor metadata (HasNextCursor, NextCursor). Feed NextCursor into subsequent calls to resume paging.

Filters, sorting, and pagination

Filters use the same SQL-like grammar as the control plane (field = value AND other_field > 10). Sort fields accept aggregate_type, aggregate_id, created_at, updated_at, and archived. Provide a raw string via Sort ("created_at:desc,aggregate_id:asc") or structured options via SortOptions.

Snapshots and admin helpers

comment := "pre-migration"
_, _ = cli.CreateSnapshot("person", "p-110", &eventdbx.CreateSnapshotOptions{Comment: &comment})

aggType := "person"
snaps, _ := cli.ListSnapshots(&eventdbx.ListSnapshotsOptions{AggregateType: &aggType})
snap, _ := cli.GetSnapshot(42)

schemas, _ := cli.ListSchemas()
_, _ = cli.ReplaceSchemas(`{"person":{"version":1}}`)

assign, _ := cli.TenantAssign("tenant-123", "shard-a")
quota, _ := cli.TenantQuotaSet("tenant-123", 1024)
_, _ = cli.TenantSchemaPublish("default", &eventdbx.TenantSchemaPublishOptions{Activate: true, Reload: true})
These helpers cover point-in-time snapshots plus schema and tenant management flows (assign/unassign, quotas, reloads, and schema publication).

Noise transport

Noise NNpsk0 is enabled by default and derives a PSK from the control token. Disable only for trusted lab sockets by setting EVENTDBX_NO_NOISE=1 or UseNoise: eventdbx.Bool(false) in Config.

Testing

  • Unit tests run with go test ./....
  • Integration tests expect a running control plane; set EVENTDBX_INTEGRATION=1 along with EVENTDBX_TOKEN, EVENTDBX_TENANT_ID, and optionally EVENTDBX_HOST/EVENTDBX_PORT before running go test.
The Go client methods are synchronous and accept no context parameter; handle timeouts via Config.ReadTimeout and Config.ConnectTimeout.