-
Notifications
You must be signed in to change notification settings - Fork 38
feat: SSE broadcaster for agent events #435
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,183 @@ | ||
| # -*- encoding: utf-8 -*- | ||
| """Generic signed agent event streaming helpers for KERIA. | ||
|
|
||
| This module owns the reusable transport contract between one KERIA agent and | ||
| its connected edge clients. Topic modules may publish events here, but topic | ||
| modules must not own SSE framing, subscriber fan-out, or KERI ``rpy`` envelope | ||
| signing. | ||
| """ | ||
|
|
||
| import json | ||
| import time | ||
| from collections import deque | ||
|
|
||
| import falcon | ||
| from hio.base import doing | ||
| from keri.core import eventing | ||
|
|
||
| from .. import log_name, ogler | ||
|
|
||
| logger = ogler.getLogger(log_name) | ||
|
|
||
|
|
||
| class SseBroadcaster: | ||
| """In-memory per-agent SSE broadcaster with independent subscriber queues.""" | ||
|
|
||
| def __init__(self): | ||
| self.subscribers = {} | ||
| self._index = 0 | ||
|
|
||
| def subscribe(self): | ||
| self._index += 1 | ||
| sid = str(self._index) | ||
| queue = deque() | ||
| self.subscribers[sid] = queue | ||
| return SseEventIterable(self, sid, queue) | ||
|
|
||
| def unsubscribe(self, sid: str): | ||
| self.subscribers.pop(sid, None) | ||
|
|
||
| def publish(self, event: str, data: dict, event_id: str): | ||
| payload = json.dumps(data).encode("utf-8") | ||
| frame = { | ||
| "id": event_id, | ||
| "event": event, | ||
| "data": payload, | ||
| } | ||
| for queue in list(self.subscribers.values()): | ||
| queue.append(frame) | ||
|
|
||
|
|
||
| class SseEventIterable: | ||
| """SSE iterable modeled after KERIpy signaling without shared draining.""" | ||
|
|
||
| TimeoutSSE = 300 # seconds | ||
|
|
||
| def __init__(self, broadcaster: SseBroadcaster, sid: str, queue, retry=5000): | ||
| self.broadcaster = broadcaster | ||
| self.sid = sid | ||
| self.queue = queue | ||
| self.retry = retry | ||
| self.start = None | ||
| self.end = None | ||
|
|
||
| def __iter__(self): | ||
| self.start = self.end = time.perf_counter() | ||
| return self | ||
|
|
||
| def __next__(self): | ||
| if self.end - self.start >= self.TimeoutSSE: | ||
| self.broadcaster.unsubscribe(self.sid) | ||
| raise StopIteration | ||
|
|
||
| if self.start == self.end: | ||
| self.end = time.perf_counter() | ||
| return bytes(f"retry: {self.retry}\n\n".encode("utf-8")) | ||
|
|
||
| data = bytearray() | ||
| while self.queue: | ||
| event = self.queue.popleft() | ||
| data.extend( | ||
| bytearray( | ||
| "id: {}\nretry: {}\nevent: {}\ndata: ".format( | ||
| event["id"], self.retry, event["event"] | ||
| ).encode("utf-8") | ||
| ) | ||
| ) | ||
| data.extend(event["data"]) | ||
| data.extend(b"\n\n") | ||
|
|
||
| self.end = time.perf_counter() | ||
| return bytes(data) | ||
|
|
||
|
|
||
| class SseBroadcasterDoer(doing.Doer): | ||
| """Drain generic agent signal cues into the Agent-owned broadcaster.""" | ||
|
|
||
| def __init__(self, agent, cues=None, broadcaster=None, tock=0.0): | ||
| self.agent = agent | ||
| if cues is None: | ||
| raise ValueError("cues is required") | ||
| if broadcaster is None: | ||
| raise ValueError("broadcaster is required") | ||
| self.cues = cues | ||
| self.broadcaster = broadcaster | ||
| super().__init__(tock=tock) | ||
|
|
||
| def recur(self, tyme=None, tock=0.0, **opts): | ||
| while self.cues: | ||
| cue = self.cues.popleft() | ||
| try: | ||
| self.broadcaster.publish( | ||
| event=cue["event"], | ||
| data=signedReplyEnvelope( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Possibly could sign the whole payload rather than the data |
||
| self.agent, | ||
| route=cue["route"], | ||
| payload=cue["payload"], | ||
| ), | ||
| event_id=cue["event_id"], | ||
| ) | ||
| except Exception: # pragma: no cover - defensive transient logging | ||
| logger.exception("failed to publish SSE signal cue %s", cue) | ||
|
|
||
| return False | ||
|
|
||
|
|
||
| def enqueueSignedReplyCue(cues, event: str, route: str, payload: dict, event_id: str): | ||
| """Queue one agent-signed reply for live SSE publication.""" | ||
| cues.append( | ||
| { | ||
| "event": event, | ||
| "route": route, | ||
| "payload": payload, | ||
| "event_id": event_id, | ||
| } | ||
| ) | ||
|
|
||
|
|
||
| def signedReplyEnvelope(agent, route: str, payload: dict) -> dict: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This probably requires a discussion on a dev call, but does it need to be a reply envelope? Maybe a SPAC message or something makes the most sense, even if unencrypted for now. |
||
| """Create a KERI ``rpy`` envelope signed by the KERIA agent AID.""" | ||
| data = dict(payload) | ||
| data.setdefault("agent", agent.agentHab.pre) | ||
| rserder = eventing.reply(route=route, data=data) | ||
| sigs = agent.agentHab.sign(ser=rserder.raw) | ||
| return {"rpy": rserder.ked, "sigs": [siger.qb64 for siger in sigs]} | ||
|
|
||
|
|
||
| def loadEnds(app): | ||
| """Register generic signed agent event streaming routes.""" | ||
| app.add_route("/signals/stream", SignalsStreamEnd()) | ||
|
|
||
|
|
||
| class SignalsStreamEnd: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I presume the 2 use cases are notifications and long running operation completions. So will this endpoint be phased out in favour of those later? (OK by me)
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Really, we need to be able to replay using the "Last-Event-ID" header when a client reconnects, so I think it might be pragmatic to build this against notifications, as something that is DB backed |
||
| """Signed admin SSE endpoint for agent-to-edge-controller events.""" | ||
|
|
||
| def on_get(self, req, rep): | ||
| """Signal stream GET endpoint | ||
|
|
||
| Parameters: | ||
| req: falcon.Request HTTP request | ||
| rep: falcon.Response HTTP response | ||
| --- | ||
| summary: Open authenticated agent signal stream | ||
| description: | | ||
| Opens an authenticated Server-Sent Events stream for live agent signals. | ||
| The stream sends an initial retry frame and later event frames whose | ||
| data is a KERI rpy envelope signed by the connected agent. | ||
| tags: | ||
| - Signals | ||
| responses: | ||
| 200: | ||
| description: Server-Sent Events stream for generic agent signals. | ||
| content: | ||
| text/event-stream: | ||
| schema: | ||
| type: string | ||
| description: SSE frames with id, retry, event, and JSON data fields. | ||
| """ | ||
| agent = req.context.agent | ||
| rep.status = falcon.HTTP_200 | ||
| rep.content_type = "text/event-stream" | ||
| rep.set_header("Cache-Control", "no-cache") | ||
| rep.set_header("connection", "close") | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is meant to be long lived, we shouldn't have this header |
||
| rep.stream = agent.sseBroadcaster.subscribe() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| # -*- encoding: utf-8 -*- | ||
| """ | ||
| Generic agent SSE streaming tests. | ||
| """ | ||
|
|
||
| import json | ||
| from types import SimpleNamespace | ||
|
|
||
| import falcon | ||
| import pytest | ||
| from hio.help import decking | ||
| from keri.core import indexing, serdering | ||
|
|
||
| from keria.app import streaming | ||
|
|
||
|
|
||
| def test_sse_broadcaster_uses_independent_subscriber_queues(): | ||
| broadcaster = streaming.SseBroadcaster() | ||
| left = iter(broadcaster.subscribe()) | ||
| right = iter(broadcaster.subscribe()) | ||
|
|
||
| assert next(left) == b"retry: 5000\n\n" | ||
| assert next(right) == b"retry: 5000\n\n" | ||
|
|
||
| broadcaster.publish(event="topic", data={"value": 1}, event_id="event-1") | ||
|
|
||
| left_frame = next(left).decode("utf-8") | ||
| right_frame = next(right).decode("utf-8") | ||
| assert "id: event-1" in left_frame | ||
| assert "event: topic" in left_frame | ||
| assert json.loads(left_frame.split("data: ", 1)[1].split("\n\n", 1)[0]) == { | ||
| "value": 1 | ||
| } | ||
| assert right_frame == left_frame | ||
|
|
||
|
|
||
| def test_signed_reply_envelope_is_signed_by_agent(helpers): | ||
| with helpers.openKeria() as (_agency, agent, _app, _client): | ||
| envelope = streaming.signedReplyEnvelope( | ||
| agent, "/test/route", {"subject": "value"} | ||
| ) | ||
|
|
||
| rserder = serdering.SerderKERI(sad=envelope["rpy"]) | ||
| siger = indexing.Siger(qb64=envelope["sigs"][0]) | ||
| assert rserder.ked["r"] == "/test/route" | ||
| assert rserder.ked["a"]["agent"] == agent.agentHab.pre | ||
| assert rserder.ked["a"]["subject"] == "value" | ||
| assert agent.agentHab.kever.verfers[0].verify(sig=siger.raw, ser=rserder.raw) | ||
|
|
||
|
|
||
| def test_sse_broadcaster_doer_drains_queued_signed_reply(helpers): | ||
| with helpers.openKeria() as (_agency, agent, _app, _client): | ||
| doer = agent.sseBroadcasterDoer | ||
| left = iter(agent.sseBroadcaster.subscribe()) | ||
| right = iter(agent.sseBroadcaster.subscribe()) | ||
| assert next(left) == b"retry: 5000\n\n" | ||
| assert next(right) == b"retry: 5000\n\n" | ||
|
|
||
| streaming.enqueueSignedReplyCue( | ||
| agent.signalCues, | ||
| event="topic", | ||
| route="/test/route", | ||
| payload={"subject": "value"}, | ||
| event_id="event-1", | ||
| ) | ||
|
|
||
| assert next(left) == b"" | ||
|
|
||
| doer.recur(0) | ||
| left_frame = next(left).decode("utf-8") | ||
| right_frame = next(right).decode("utf-8") | ||
| assert "id: event-1" in left_frame | ||
| assert "event: topic" in left_frame | ||
| assert right_frame == left_frame | ||
|
|
||
| envelope = json.loads(left_frame.split("data: ", 1)[1].split("\n\n", 1)[0]) | ||
| rserder = serdering.SerderKERI(sad=envelope["rpy"]) | ||
| siger = indexing.Siger(qb64=envelope["sigs"][0]) | ||
| assert rserder.ked["r"] == "/test/route" | ||
| assert rserder.ked["a"]["agent"] == agent.agentHab.pre | ||
| assert rserder.ked["a"]["subject"] == "value" | ||
| assert agent.agentHab.kever.verfers[0].verify(sig=siger.raw, ser=rserder.raw) | ||
|
|
||
|
|
||
| def test_agent_wires_sse_broadcaster_doer_without_topic_config(helpers): | ||
| with helpers.openKeria() as (_agency, agent, _app, _client): | ||
| assert isinstance(agent.sseBroadcaster, streaming.SseBroadcaster) | ||
| assert isinstance(agent.sseBroadcasterDoer, streaming.SseBroadcasterDoer) | ||
| assert agent.sseBroadcasterDoer in agent.doers | ||
| assert agent.sseBroadcasterDoer.cues is agent.signalCues | ||
| assert agent.sseBroadcasterDoer.broadcaster is agent.sseBroadcaster | ||
|
|
||
|
|
||
| def test_sse_broadcaster_doer_requires_explicit_cues_and_broadcaster(): | ||
| with pytest.raises(ValueError, match="cues is required"): | ||
| streaming.SseBroadcasterDoer( | ||
| SimpleNamespace(), broadcaster=streaming.SseBroadcaster() | ||
| ) | ||
|
|
||
| with pytest.raises(ValueError, match="broadcaster is required"): | ||
| streaming.SseBroadcasterDoer(SimpleNamespace(), cues=decking.Deck()) | ||
|
|
||
|
|
||
| def test_signals_stream_endpoint_returns_sse_stream(helpers): | ||
| with helpers.openKeria() as (_agency, agent, _app, _client): | ||
| req = SimpleNamespace(context=SimpleNamespace(agent=agent)) | ||
| headers = {} | ||
| rep = SimpleNamespace( | ||
| status=None, | ||
| content_type=None, | ||
| stream=None, | ||
| set_header=lambda name, value: headers.__setitem__(name, value), | ||
| ) | ||
|
|
||
| streaming.SignalsStreamEnd().on_get(req, rep) | ||
|
|
||
| assert rep.status == falcon.HTTP_200 | ||
| assert rep.content_type == "text/event-stream" | ||
| assert headers["Cache-Control"] == "no-cache" | ||
| assert headers["connection"] == "close" | ||
| assert rep.stream.broadcaster is agent.sseBroadcaster | ||
| assert next(iter(rep.stream)) == b"retry: 5000\n\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not an SSE expert but this might be short, and we may want to consider heartbeats to keep the connection open