From 8b12ea310c3f67c1d7e5e27e1ca2e031b7eb497c Mon Sep 17 00:00:00 2001 From: Kent Bull Date: Wed, 6 May 2026 15:57:06 -0600 Subject: [PATCH] feat: SSE broadcaster for agent events Signed-off-by: Kent Bull --- docs/keria_app.rst | 8 +- src/keria/app/agenting.py | 21 ++++- src/keria/app/streaming.py | 183 ++++++++++++++++++++++++++++++++++++ tests/app/test_streaming.py | 122 ++++++++++++++++++++++++ 4 files changed, 332 insertions(+), 2 deletions(-) create mode 100644 src/keria/app/streaming.py create mode 100644 tests/app/test_streaming.py diff --git a/docs/keria_app.rst b/docs/keria_app.rst index 6427eb55..7b9791bb 100644 --- a/docs/keria_app.rst +++ b/docs/keria_app.rst @@ -103,6 +103,12 @@ keria.app.notifying .. automodule:: keria.app.notifying :members: +keria.app.streaming +------------------- + +.. automodule:: keria.app.streaming + :members: + keria.app.presenting -------------------- @@ -113,4 +119,4 @@ keria.app.specing ----------------- .. automodule:: keria.app.specing - :members: \ No newline at end of file + :members: diff --git a/src/keria/app/agenting.py b/src/keria/app/agenting.py index a05b19f3..35a64564 100644 --- a/src/keria/app/agenting.py +++ b/src/keria/app/agenting.py @@ -58,7 +58,15 @@ from keria.utils.openapi import dataclassFromFielddom -from . import aiding, notifying, indirecting, credentialing, ipexing, delegating +from . import ( + aiding, + notifying, + indirecting, + credentialing, + ipexing, + delegating, + streaming, +) from . import grouping as keriagrouping from .serving import GracefulShutdownDoer from .. import log_name, ogler, set_log_level @@ -629,6 +637,7 @@ def __init__(self, hby, rgy, agentHab, agency, caid, **opts): grants (Deck): IPEX grant messages. admits (Deck): IPEX admit messages. submits (Deck): KEL messages to be resubmitted to witnesses to obtain receipts of. + signalCues (Deck): Generic signed SSE signal cues for connected edge clients. """ self.agency = agency self.caid = caid @@ -659,6 +668,8 @@ def __init__(self, hby, rgy, agentHab, agency, caid, **opts): self.grants = decking.Deck() self.admits = decking.Deck() self.submits = decking.Deck() + self.signalCues = decking.Deck() + self.sseBroadcaster = streaming.SseBroadcaster() receiptor = agenting.Receiptor(hby=hby) self.witq = agenting.WitnessInquisitor(hby=self.hby) @@ -707,6 +718,12 @@ def __init__(self, hby, rgy, agentHab, agency, caid, **opts): verifier=self.verifier, notifier=self.notifier, ) + self.sseBroadcasterDoer = streaming.SseBroadcasterDoer( + agent=self, + cues=self.signalCues, + broadcaster=self.sseBroadcaster, + tock=self.tocks.get("sseBroadcaster", 0.0), + ) self.seeker = basing.Seeker( name=hby.name, @@ -839,6 +856,7 @@ def __init__(self, hby, rgy, agentHab, agency, caid, **opts): queries=self.queries, tock=self.tocks.get("exchangecue", 0.0), ), + self.sseBroadcasterDoer, self.submitter, ] ) @@ -962,6 +980,7 @@ def createAdminServerDoer(config: KERIAServerConfig, agency: Agency): keriaexchanging.loadEnds(app=adminApp) ipexing.loadEnds(app=adminApp) + streaming.loadEnds(app=adminApp) adminServer = createHttpServer( config.adminPort, adminApp, config.keyPath, config.certPath, config.caFilePath diff --git a/src/keria/app/streaming.py b/src/keria/app/streaming.py new file mode 100644 index 00000000..212aace5 --- /dev/null +++ b/src/keria/app/streaming.py @@ -0,0 +1,183 @@ +# -*- encoding: utf-8 -*- +"""Generic signed agent event streaming helpers for KERIA. + +This module owns the reusable transport contract between one KERIA agent and +its connected edge clients. Topic modules may publish events here, but topic +modules must not own SSE framing, subscriber fan-out, or KERI ``rpy`` envelope +signing. +""" + +import json +import time +from collections import deque + +import falcon +from hio.base import doing +from keri.core import eventing + +from .. import log_name, ogler + +logger = ogler.getLogger(log_name) + + +class SseBroadcaster: + """In-memory per-agent SSE broadcaster with independent subscriber queues.""" + + def __init__(self): + self.subscribers = {} + self._index = 0 + + def subscribe(self): + self._index += 1 + sid = str(self._index) + queue = deque() + self.subscribers[sid] = queue + return SseEventIterable(self, sid, queue) + + def unsubscribe(self, sid: str): + self.subscribers.pop(sid, None) + + def publish(self, event: str, data: dict, event_id: str): + payload = json.dumps(data).encode("utf-8") + frame = { + "id": event_id, + "event": event, + "data": payload, + } + for queue in list(self.subscribers.values()): + queue.append(frame) + + +class SseEventIterable: + """SSE iterable modeled after KERIpy signaling without shared draining.""" + + TimeoutSSE = 300 # seconds + + def __init__(self, broadcaster: SseBroadcaster, sid: str, queue, retry=5000): + self.broadcaster = broadcaster + self.sid = sid + self.queue = queue + self.retry = retry + self.start = None + self.end = None + + def __iter__(self): + self.start = self.end = time.perf_counter() + return self + + def __next__(self): + if self.end - self.start >= self.TimeoutSSE: + self.broadcaster.unsubscribe(self.sid) + raise StopIteration + + if self.start == self.end: + self.end = time.perf_counter() + return bytes(f"retry: {self.retry}\n\n".encode("utf-8")) + + data = bytearray() + while self.queue: + event = self.queue.popleft() + data.extend( + bytearray( + "id: {}\nretry: {}\nevent: {}\ndata: ".format( + event["id"], self.retry, event["event"] + ).encode("utf-8") + ) + ) + data.extend(event["data"]) + data.extend(b"\n\n") + + self.end = time.perf_counter() + return bytes(data) + + +class SseBroadcasterDoer(doing.Doer): + """Drain generic agent signal cues into the Agent-owned broadcaster.""" + + def __init__(self, agent, cues=None, broadcaster=None, tock=0.0): + self.agent = agent + if cues is None: + raise ValueError("cues is required") + if broadcaster is None: + raise ValueError("broadcaster is required") + self.cues = cues + self.broadcaster = broadcaster + super().__init__(tock=tock) + + def recur(self, tyme=None, tock=0.0, **opts): + while self.cues: + cue = self.cues.popleft() + try: + self.broadcaster.publish( + event=cue["event"], + data=signedReplyEnvelope( + self.agent, + route=cue["route"], + payload=cue["payload"], + ), + event_id=cue["event_id"], + ) + except Exception: # pragma: no cover - defensive transient logging + logger.exception("failed to publish SSE signal cue %s", cue) + + return False + + +def enqueueSignedReplyCue(cues, event: str, route: str, payload: dict, event_id: str): + """Queue one agent-signed reply for live SSE publication.""" + cues.append( + { + "event": event, + "route": route, + "payload": payload, + "event_id": event_id, + } + ) + + +def signedReplyEnvelope(agent, route: str, payload: dict) -> dict: + """Create a KERI ``rpy`` envelope signed by the KERIA agent AID.""" + data = dict(payload) + data.setdefault("agent", agent.agentHab.pre) + rserder = eventing.reply(route=route, data=data) + sigs = agent.agentHab.sign(ser=rserder.raw) + return {"rpy": rserder.ked, "sigs": [siger.qb64 for siger in sigs]} + + +def loadEnds(app): + """Register generic signed agent event streaming routes.""" + app.add_route("/signals/stream", SignalsStreamEnd()) + + +class SignalsStreamEnd: + """Signed admin SSE endpoint for agent-to-edge-controller events.""" + + def on_get(self, req, rep): + """Signal stream GET endpoint + + Parameters: + req: falcon.Request HTTP request + rep: falcon.Response HTTP response + --- + summary: Open authenticated agent signal stream + description: | + Opens an authenticated Server-Sent Events stream for live agent signals. + The stream sends an initial retry frame and later event frames whose + data is a KERI rpy envelope signed by the connected agent. + tags: + - Signals + responses: + 200: + description: Server-Sent Events stream for generic agent signals. + content: + text/event-stream: + schema: + type: string + description: SSE frames with id, retry, event, and JSON data fields. + """ + agent = req.context.agent + rep.status = falcon.HTTP_200 + rep.content_type = "text/event-stream" + rep.set_header("Cache-Control", "no-cache") + rep.set_header("connection", "close") + rep.stream = agent.sseBroadcaster.subscribe() diff --git a/tests/app/test_streaming.py b/tests/app/test_streaming.py new file mode 100644 index 00000000..4fc5775f --- /dev/null +++ b/tests/app/test_streaming.py @@ -0,0 +1,122 @@ +# -*- encoding: utf-8 -*- +""" +Generic agent SSE streaming tests. +""" + +import json +from types import SimpleNamespace + +import falcon +import pytest +from hio.help import decking +from keri.core import indexing, serdering + +from keria.app import streaming + + +def test_sse_broadcaster_uses_independent_subscriber_queues(): + broadcaster = streaming.SseBroadcaster() + left = iter(broadcaster.subscribe()) + right = iter(broadcaster.subscribe()) + + assert next(left) == b"retry: 5000\n\n" + assert next(right) == b"retry: 5000\n\n" + + broadcaster.publish(event="topic", data={"value": 1}, event_id="event-1") + + left_frame = next(left).decode("utf-8") + right_frame = next(right).decode("utf-8") + assert "id: event-1" in left_frame + assert "event: topic" in left_frame + assert json.loads(left_frame.split("data: ", 1)[1].split("\n\n", 1)[0]) == { + "value": 1 + } + assert right_frame == left_frame + + +def test_signed_reply_envelope_is_signed_by_agent(helpers): + with helpers.openKeria() as (_agency, agent, _app, _client): + envelope = streaming.signedReplyEnvelope( + agent, "/test/route", {"subject": "value"} + ) + + rserder = serdering.SerderKERI(sad=envelope["rpy"]) + siger = indexing.Siger(qb64=envelope["sigs"][0]) + assert rserder.ked["r"] == "/test/route" + assert rserder.ked["a"]["agent"] == agent.agentHab.pre + assert rserder.ked["a"]["subject"] == "value" + assert agent.agentHab.kever.verfers[0].verify(sig=siger.raw, ser=rserder.raw) + + +def test_sse_broadcaster_doer_drains_queued_signed_reply(helpers): + with helpers.openKeria() as (_agency, agent, _app, _client): + doer = agent.sseBroadcasterDoer + left = iter(agent.sseBroadcaster.subscribe()) + right = iter(agent.sseBroadcaster.subscribe()) + assert next(left) == b"retry: 5000\n\n" + assert next(right) == b"retry: 5000\n\n" + + streaming.enqueueSignedReplyCue( + agent.signalCues, + event="topic", + route="/test/route", + payload={"subject": "value"}, + event_id="event-1", + ) + + assert next(left) == b"" + + doer.recur(0) + left_frame = next(left).decode("utf-8") + right_frame = next(right).decode("utf-8") + assert "id: event-1" in left_frame + assert "event: topic" in left_frame + assert right_frame == left_frame + + envelope = json.loads(left_frame.split("data: ", 1)[1].split("\n\n", 1)[0]) + rserder = serdering.SerderKERI(sad=envelope["rpy"]) + siger = indexing.Siger(qb64=envelope["sigs"][0]) + assert rserder.ked["r"] == "/test/route" + assert rserder.ked["a"]["agent"] == agent.agentHab.pre + assert rserder.ked["a"]["subject"] == "value" + assert agent.agentHab.kever.verfers[0].verify(sig=siger.raw, ser=rserder.raw) + + +def test_agent_wires_sse_broadcaster_doer_without_topic_config(helpers): + with helpers.openKeria() as (_agency, agent, _app, _client): + assert isinstance(agent.sseBroadcaster, streaming.SseBroadcaster) + assert isinstance(agent.sseBroadcasterDoer, streaming.SseBroadcasterDoer) + assert agent.sseBroadcasterDoer in agent.doers + assert agent.sseBroadcasterDoer.cues is agent.signalCues + assert agent.sseBroadcasterDoer.broadcaster is agent.sseBroadcaster + + +def test_sse_broadcaster_doer_requires_explicit_cues_and_broadcaster(): + with pytest.raises(ValueError, match="cues is required"): + streaming.SseBroadcasterDoer( + SimpleNamespace(), broadcaster=streaming.SseBroadcaster() + ) + + with pytest.raises(ValueError, match="broadcaster is required"): + streaming.SseBroadcasterDoer(SimpleNamespace(), cues=decking.Deck()) + + +def test_signals_stream_endpoint_returns_sse_stream(helpers): + with helpers.openKeria() as (_agency, agent, _app, _client): + req = SimpleNamespace(context=SimpleNamespace(agent=agent)) + headers = {} + rep = SimpleNamespace( + status=None, + content_type=None, + stream=None, + set_header=lambda name, value: headers.__setitem__(name, value), + ) + + streaming.SignalsStreamEnd().on_get(req, rep) + + assert rep.status == falcon.HTTP_200 + assert rep.content_type == "text/event-stream" + assert headers["Cache-Control"] == "no-cache" + assert headers["connection"] == "close" + assert rep.stream.broadcaster is agent.sseBroadcaster + assert next(iter(rep.stream)) == b"retry: 5000\n\n"