Skip to content

Add WebSocket API for real-time subscriptions #17

@kitplummer

Description

@kitplummer

Summary

Add a WebSocket endpoint alongside the Connect RPC API for real-time document change subscriptions. This makes peat-node accessible from browsers, CLI tools, and lightweight integrations without any RPC client library.

Motivation

The Subscribe RPC is already server-streaming, but consuming it requires a Connect/gRPC client. A plain WebSocket endpoint streaming JSON would enable:

  • Browser-based fleet dashboards with real-time updates (no gRPC-Web complexity)
  • Simple CLI monitoring tools (websocat ws://localhost:50051/ws/subscribe)
  • Language-agnostic integrations that can open a WebSocket but don't have a Connect RPC library
  • SSE (Server-Sent Events) as an alternative for HTTP/1.1-only environments

Proposed Design

WebSocket endpoint

GET /ws/subscribe?collections=platforms,commands
Upgrade: websocket

After upgrade, the server pushes JSON messages matching the DocumentChange proto schema:

{"collection":"platforms","docId":"alpha-agent","changeType":"CHANGE_TYPE_UPSERT","jsonData":"{...}"}
{"collection":"commands","docId":"cmd-1","changeType":"CHANGE_TYPE_UPSERT","jsonData":"{...}"}

Implementation

Since we're already using Axum (via connect-rust), adding WebSocket support is straightforward:

use axum::{extract::ws::WebSocket, routing::get};

let app = axum::Router::new()
    .route("/ws/subscribe", get(ws_subscribe_handler))
    .fallback_service(connect_router.into_axum_service());

The handler would:

  1. Accept query param collections (comma-separated, empty = all)
  2. Upgrade to WebSocket
  3. Subscribe to the node's broadcast channel (same as the RPC Subscribe)
  4. Stream JSON-serialized DocumentChange messages
  5. Optionally accept client messages for dynamic filter changes

Optional: Server-Sent Events (SSE)

For environments where WebSocket isn't available (some proxies, HTTP/1.1 only):

GET /sse/subscribe?collections=platforms
Accept: text/event-stream

Axum supports SSE via axum::response::sse::Sse.

References

  • src/service.rs — existing subscribe() RPC implementation (broadcast channel → stream)
  • src/main.rs — Axum router setup (add WebSocket route alongside Connect RPC)
  • connect-rust Axum integration already in place

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions