VendoVendo Docs
Concepts

Events

Subscribe to server-sent events for real-time connection and billing state changes.

Events require Vendo mode (VENDO_API_KEY must be set). events.subscribe() throws VendoOnlyFeature in OSS mode.

The events API delivers a live stream of connection and billing state changes over SSE. Use it to update your UI in real time or to trigger cache invalidation in your application.

Subscribing

events.subscribe() yields EventStreamMessage objects. The SSE event name is on event.type; the parsed JSON payload is on event.data.

import asyncio
from vendo import AsyncVendo

async def main():
    client = AsyncVendo()
    async with client.events.subscribe() as stream:
        async for event in stream:
            print(event.type, event.data)
            if event.type == "connection.connected":
                slug = event.data.get("slug")
                print(f"{slug} connected!")

asyncio.run(main())

Stop by break-ing the loop or cancelling the enclosing Task.

events.subscribe() returns an async iterable. The SSE event name is on event.type; the parsed JSON payload is on event.data.

import { Vendo } from "@vendodev/sdk";

const vendo = new Vendo();

for await (const event of vendo.events.subscribe()) {
  console.log(event.type, event.data);
  if (event.type === "connection.connected") {
    const { slug } = event.data as { slug: string };
    console.log(`${slug} connected!`);
  }
}

Use an AbortController to stop cleanly:

const ac = new AbortController();

for await (const event of vendo.events.subscribe({ signal: ac.signal })) {
  if (shouldStop) {
    ac.abort();
    break;
  }
}

events.subscribe() returns an AsyncThrowingStream.

import Vendo

let vendo = try Vendo()
let stream = try vendo.events.subscribe()

for try await event in stream {
    print(event.type, event.data)
    if event.type == "connection.connected" {
        print("A connection was made!")
    }
}

Stop by breaking or cancelling the enclosing Task:

let task = Task {
    for try await event in stream {
        // ...
    }
}
// Later:
task.cancel()

Event kinds

The wire format names the event in the SSE event: field (which the SDKs surface as event.type). On the server this is the same string as the discriminator kind on the VendoEvent union — so for every emitted event the value is identical. Each event's data payload also carries id, at, and kind alongside the kind-specific fields below.

event.type (SSE name)data payload fieldsWhen it fires
connection.connectedid, at, kind, slug, connection_idA connection for this tenant was created
connection.disconnectedid, at, kind, slug, connection_idA connection was removed
connection.changedid, at, kind, slug, connection_idThe connection row's non-status fields changed (display name, metadata)
connection.status_changedid, at, kind, slug, connection_id, statusStatus transitioned. status is one of connected, available, pending_setup, needs_reauth, error, revoked
billing.usage_recordedid, at, kind, provider, microsA usage event was billed against the tenant's balance
billing.balance_changedid, at, kind, remaining_microsThe wallet balance changed (top-up, settle, refund)
billing.cap_warnedid, at, kind, window, pctApproaching a daily or monthly spend cap. window is daily or monthly; pct is the percentage consumed
billing.cap_trippedid, at, kind, windowA spend cap fired and is now blocking calls until the window resets

To detect "this connection needs re-auth", listen for connection.status_changed with status === "needs_reauth". There is no standalone connection.needs_reauth kind.

Auto-reconnect

The stream reconnects automatically with exponential backoff on transient failures:

  • Initial reconnect delay: 1s, doubling each attempt
  • Maximum backoff: 30s (with ~25% jitter)
  • Loop is unbounded — both the Python and TypeScript SDKs keep reconnecting until you break out of the iterator (Python) or abort the AbortSignal (TypeScript)

There is no fail-after-N-attempts policy; if you want to surface persistent connection failures, observe the TypeScript onReconnect(attempt, err) callback (or wrap the Python iterator) and decide how many attempts to tolerate yourself.

Using events with the reconciler

The reconciler uses events.subscribe() internally to invalidate the token cache and re-run the mapping immediately when connections change, instead of waiting for the 30-second poll cycle.

On this page