Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion docs/keria_app.rst
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ keria.app.notifying
.. automodule:: keria.app.notifying
:members:

keria.app.streaming
-------------------

.. automodule:: keria.app.streaming
:members:

keria.app.presenting
--------------------

Expand All @@ -113,4 +119,4 @@ keria.app.specing
-----------------

.. automodule:: keria.app.specing
:members:
:members:
21 changes: 20 additions & 1 deletion src/keria/app/agenting.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,15 @@

from keria.utils.openapi import dataclassFromFielddom

from . import aiding, notifying, indirecting, credentialing, ipexing, delegating
from . import (
aiding,
notifying,
indirecting,
credentialing,
ipexing,
delegating,
streaming,
)
from . import grouping as keriagrouping
from .serving import GracefulShutdownDoer
from .. import log_name, ogler, set_log_level
Expand Down Expand Up @@ -629,6 +637,7 @@ def __init__(self, hby, rgy, agentHab, agency, caid, **opts):
grants (Deck): IPEX grant messages.
admits (Deck): IPEX admit messages.
submits (Deck): KEL messages to be resubmitted to witnesses to obtain receipts of.
signalCues (Deck): Generic signed SSE signal cues for connected edge clients.
"""
self.agency = agency
self.caid = caid
Expand Down Expand Up @@ -659,6 +668,8 @@ def __init__(self, hby, rgy, agentHab, agency, caid, **opts):
self.grants = decking.Deck()
self.admits = decking.Deck()
self.submits = decking.Deck()
self.signalCues = decking.Deck()
self.sseBroadcaster = streaming.SseBroadcaster()

receiptor = agenting.Receiptor(hby=hby)
self.witq = agenting.WitnessInquisitor(hby=self.hby)
Expand Down Expand Up @@ -707,6 +718,12 @@ def __init__(self, hby, rgy, agentHab, agency, caid, **opts):
verifier=self.verifier,
notifier=self.notifier,
)
self.sseBroadcasterDoer = streaming.SseBroadcasterDoer(
agent=self,
cues=self.signalCues,
broadcaster=self.sseBroadcaster,
tock=self.tocks.get("sseBroadcaster", 0.0),
)

self.seeker = basing.Seeker(
name=hby.name,
Expand Down Expand Up @@ -839,6 +856,7 @@ def __init__(self, hby, rgy, agentHab, agency, caid, **opts):
queries=self.queries,
tock=self.tocks.get("exchangecue", 0.0),
),
self.sseBroadcasterDoer,
self.submitter,
]
)
Expand Down Expand Up @@ -962,6 +980,7 @@ def createAdminServerDoer(config: KERIAServerConfig, agency: Agency):

keriaexchanging.loadEnds(app=adminApp)
ipexing.loadEnds(app=adminApp)
streaming.loadEnds(app=adminApp)

adminServer = createHttpServer(
config.adminPort, adminApp, config.keyPath, config.certPath, config.caFilePath
Expand Down
183 changes: 183 additions & 0 deletions src/keria/app/streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# -*- encoding: utf-8 -*-
"""Generic signed agent event streaming helpers for KERIA.

This module owns the reusable transport contract between one KERIA agent and
its connected edge clients. Topic modules may publish events here, but topic
modules must not own SSE framing, subscriber fan-out, or KERI ``rpy`` envelope
signing.
"""

import json
import time
from collections import deque

import falcon
from hio.base import doing
from keri.core import eventing

from .. import log_name, ogler

logger = ogler.getLogger(log_name)


class SseBroadcaster:
"""In-memory per-agent SSE broadcaster with independent subscriber queues."""

def __init__(self):
self.subscribers = {}
self._index = 0

def subscribe(self):
self._index += 1
sid = str(self._index)
queue = deque()
self.subscribers[sid] = queue
return SseEventIterable(self, sid, queue)

def unsubscribe(self, sid: str):
self.subscribers.pop(sid, None)

def publish(self, event: str, data: dict, event_id: str):
payload = json.dumps(data).encode("utf-8")
frame = {
"id": event_id,
"event": event,
"data": payload,
}
for queue in list(self.subscribers.values()):
queue.append(frame)


class SseEventIterable:
"""SSE iterable modeled after KERIpy signaling without shared draining."""

TimeoutSSE = 300 # seconds
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not an SSE expert but this might be short, and we may want to consider heartbeats to keep the connection open


def __init__(self, broadcaster: SseBroadcaster, sid: str, queue, retry=5000):
self.broadcaster = broadcaster
self.sid = sid
self.queue = queue
self.retry = retry
self.start = None
self.end = None

def __iter__(self):
self.start = self.end = time.perf_counter()
return self

def __next__(self):
if self.end - self.start >= self.TimeoutSSE:
self.broadcaster.unsubscribe(self.sid)
raise StopIteration

if self.start == self.end:
self.end = time.perf_counter()
return bytes(f"retry: {self.retry}\n\n".encode("utf-8"))

data = bytearray()
while self.queue:
event = self.queue.popleft()
data.extend(
bytearray(
"id: {}\nretry: {}\nevent: {}\ndata: ".format(
event["id"], self.retry, event["event"]
).encode("utf-8")
)
)
data.extend(event["data"])
data.extend(b"\n\n")

self.end = time.perf_counter()
return bytes(data)


class SseBroadcasterDoer(doing.Doer):
"""Drain generic agent signal cues into the Agent-owned broadcaster."""

def __init__(self, agent, cues=None, broadcaster=None, tock=0.0):
self.agent = agent
if cues is None:
raise ValueError("cues is required")
if broadcaster is None:
raise ValueError("broadcaster is required")
self.cues = cues
self.broadcaster = broadcaster
super().__init__(tock=tock)

def recur(self, tyme=None, tock=0.0, **opts):
while self.cues:
cue = self.cues.popleft()
try:
self.broadcaster.publish(
event=cue["event"],
data=signedReplyEnvelope(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly could sign the whole payload rather than the data

self.agent,
route=cue["route"],
payload=cue["payload"],
),
event_id=cue["event_id"],
)
except Exception: # pragma: no cover - defensive transient logging
logger.exception("failed to publish SSE signal cue %s", cue)

return False


def enqueueSignedReplyCue(cues, event: str, route: str, payload: dict, event_id: str):
"""Queue one agent-signed reply for live SSE publication."""
cues.append(
{
"event": event,
"route": route,
"payload": payload,
"event_id": event_id,
}
)


def signedReplyEnvelope(agent, route: str, payload: dict) -> dict:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably requires a discussion on a dev call, but does it need to be a reply envelope? Maybe a SPAC message or something makes the most sense, even if unencrypted for now.

"""Create a KERI ``rpy`` envelope signed by the KERIA agent AID."""
data = dict(payload)
data.setdefault("agent", agent.agentHab.pre)
rserder = eventing.reply(route=route, data=data)
sigs = agent.agentHab.sign(ser=rserder.raw)
return {"rpy": rserder.ked, "sigs": [siger.qb64 for siger in sigs]}


def loadEnds(app):
"""Register generic signed agent event streaming routes."""
app.add_route("/signals/stream", SignalsStreamEnd())


class SignalsStreamEnd:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume the 2 use cases are notifications and long running operation completions. So will this endpoint be phased out in favour of those later? (OK by me)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really, we need to be able to replay using the "Last-Event-ID" header when a client reconnects, so I think it might be pragmatic to build this against notifications, as something that is DB backed

"""Signed admin SSE endpoint for agent-to-edge-controller events."""

def on_get(self, req, rep):
"""Signal stream GET endpoint

Parameters:
req: falcon.Request HTTP request
rep: falcon.Response HTTP response
---
summary: Open authenticated agent signal stream
description: |
Opens an authenticated Server-Sent Events stream for live agent signals.
The stream sends an initial retry frame and later event frames whose
data is a KERI rpy envelope signed by the connected agent.
tags:
- Signals
responses:
200:
description: Server-Sent Events stream for generic agent signals.
content:
text/event-stream:
schema:
type: string
description: SSE frames with id, retry, event, and JSON data fields.
"""
agent = req.context.agent
rep.status = falcon.HTTP_200
rep.content_type = "text/event-stream"
rep.set_header("Cache-Control", "no-cache")
rep.set_header("connection", "close")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is meant to be long lived, we shouldn't have this header

rep.stream = agent.sseBroadcaster.subscribe()
122 changes: 122 additions & 0 deletions tests/app/test_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# -*- encoding: utf-8 -*-
"""
Generic agent SSE streaming tests.
"""

import json
from types import SimpleNamespace

import falcon
import pytest
from hio.help import decking
from keri.core import indexing, serdering

from keria.app import streaming


def test_sse_broadcaster_uses_independent_subscriber_queues():
broadcaster = streaming.SseBroadcaster()
left = iter(broadcaster.subscribe())
right = iter(broadcaster.subscribe())

assert next(left) == b"retry: 5000\n\n"
assert next(right) == b"retry: 5000\n\n"

broadcaster.publish(event="topic", data={"value": 1}, event_id="event-1")

left_frame = next(left).decode("utf-8")
right_frame = next(right).decode("utf-8")
assert "id: event-1" in left_frame
assert "event: topic" in left_frame
assert json.loads(left_frame.split("data: ", 1)[1].split("\n\n", 1)[0]) == {
"value": 1
}
assert right_frame == left_frame


def test_signed_reply_envelope_is_signed_by_agent(helpers):
with helpers.openKeria() as (_agency, agent, _app, _client):
envelope = streaming.signedReplyEnvelope(
agent, "/test/route", {"subject": "value"}
)

rserder = serdering.SerderKERI(sad=envelope["rpy"])
siger = indexing.Siger(qb64=envelope["sigs"][0])
assert rserder.ked["r"] == "/test/route"
assert rserder.ked["a"]["agent"] == agent.agentHab.pre
assert rserder.ked["a"]["subject"] == "value"
assert agent.agentHab.kever.verfers[0].verify(sig=siger.raw, ser=rserder.raw)


def test_sse_broadcaster_doer_drains_queued_signed_reply(helpers):
with helpers.openKeria() as (_agency, agent, _app, _client):
doer = agent.sseBroadcasterDoer
left = iter(agent.sseBroadcaster.subscribe())
right = iter(agent.sseBroadcaster.subscribe())
assert next(left) == b"retry: 5000\n\n"
assert next(right) == b"retry: 5000\n\n"

streaming.enqueueSignedReplyCue(
agent.signalCues,
event="topic",
route="/test/route",
payload={"subject": "value"},
event_id="event-1",
)

assert next(left) == b""

doer.recur(0)
left_frame = next(left).decode("utf-8")
right_frame = next(right).decode("utf-8")
assert "id: event-1" in left_frame
assert "event: topic" in left_frame
assert right_frame == left_frame

envelope = json.loads(left_frame.split("data: ", 1)[1].split("\n\n", 1)[0])
rserder = serdering.SerderKERI(sad=envelope["rpy"])
siger = indexing.Siger(qb64=envelope["sigs"][0])
assert rserder.ked["r"] == "/test/route"
assert rserder.ked["a"]["agent"] == agent.agentHab.pre
assert rserder.ked["a"]["subject"] == "value"
assert agent.agentHab.kever.verfers[0].verify(sig=siger.raw, ser=rserder.raw)


def test_agent_wires_sse_broadcaster_doer_without_topic_config(helpers):
with helpers.openKeria() as (_agency, agent, _app, _client):
assert isinstance(agent.sseBroadcaster, streaming.SseBroadcaster)
assert isinstance(agent.sseBroadcasterDoer, streaming.SseBroadcasterDoer)
assert agent.sseBroadcasterDoer in agent.doers
assert agent.sseBroadcasterDoer.cues is agent.signalCues
assert agent.sseBroadcasterDoer.broadcaster is agent.sseBroadcaster


def test_sse_broadcaster_doer_requires_explicit_cues_and_broadcaster():
with pytest.raises(ValueError, match="cues is required"):
streaming.SseBroadcasterDoer(
SimpleNamespace(), broadcaster=streaming.SseBroadcaster()
)

with pytest.raises(ValueError, match="broadcaster is required"):
streaming.SseBroadcasterDoer(SimpleNamespace(), cues=decking.Deck())


def test_signals_stream_endpoint_returns_sse_stream(helpers):
with helpers.openKeria() as (_agency, agent, _app, _client):
req = SimpleNamespace(context=SimpleNamespace(agent=agent))
headers = {}
rep = SimpleNamespace(
status=None,
content_type=None,
stream=None,
set_header=lambda name, value: headers.__setitem__(name, value),
)

streaming.SignalsStreamEnd().on_get(req, rep)

assert rep.status == falcon.HTTP_200
assert rep.content_type == "text/event-stream"
assert headers["Cache-Control"] == "no-cache"
assert headers["connection"] == "close"
assert rep.stream.broadcaster is agent.sseBroadcaster
assert next(iter(rep.stream)) == b"retry: 5000\n\n"
Loading