Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions app/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,20 @@

import httpx
from delego import ProposedAction
from delego.brokers import BrokerRefusal


class HttpxBroker:
"""Executes an authorised action with httpx. Holds no credentials here, but
this is exactly where you'd inject them for a real upstream service."""
this is exactly where you'd inject them for a real upstream service.

Per the ``BrokerAdapter`` execution contract (spec §4.2) it sends only the
**fingerprinted** action — method + host + path + the declared ``params`` —
and refuses (``BrokerRefusal``) if ``action.url`` carries a query string or
fragment, since through protocol 0.2 the query is outside the fingerprint and
so was never authorised. Forwarding it verbatim would let ``/orders?to=me``
and ``/orders?to=attacker`` (one fingerprint) reach different upstreams — the
confused-deputy gap the firewall exists to close."""

name = "httpx"

Expand All @@ -25,12 +34,23 @@ def __init__(self, timeout: float = 15.0) -> None:
)

def execute(self, action: ProposedAction) -> dict:
# Fail closed on a query/fragment the fingerprint never represented,
# rather than silently forwarding decision-relevant data (spec §4.2).
if action.has_query:
raise BrokerRefusal(
"broker refuses to execute: action.url carries a query string or "
"fragment outside the fingerprint (method+host+path+params), so it "
"was never authorised (spec §4.2). Put decision-relevant values in "
"params. Offending url: " + action.url
)

method = action.method.upper()
kwargs: dict = {}
# For writes, forward the declared params as the JSON body.
if method in ("POST", "PUT", "PATCH"):
kwargs["json"] = action.params
resp = self._client.request(method, action.url, **kwargs)
# Request only the fingerprinted URL (scheme+host+path); no query is sent.
resp = self._client.request(method, action.fingerprinted_url, **kwargs)
return {
"broker": self.name,
"http_status": resp.status_code,
Expand Down
81 changes: 77 additions & 4 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,50 @@
Run a **single** uvicorn worker: as of delego 0.2.1 concurrent writes are
serialised with a file lock (corruption-safe), but one writer is simplest until
delego's single-writer daemon lands.

────────────────────────────────────────────────────────────────────────────
SECURITY — APPROVAL IS A SEPARATE TRUST DOMAIN (read before deploying)
────────────────────────────────────────────────────────────────────────────
The agent-facing endpoints (``/propose``, ``/resolve``) and the *human* approval
endpoints (``/approvals/{id}/approve`` and ``/deny``) MUST NOT share one trust
boundary. The whole point of the firewall is that a compromised agent cannot get
a sensitive action executed without an out-of-band human "yes" (spec §2, §7). If
the agent can also reach ``/approve``, it simply approves its own action and the
human-in-the-loop guarantee collapses.

So the approval endpoints are gated by a bearer token (``DELEGO_APPROVAL_TOKEN``)
that the agent must never hold; ``/propose`` and ``/resolve`` are deliberately
left on the open agent surface. This is the *minimum* separation for a sample. A
real deployment SHOULD put approvals behind a distinct service / network / identity
(the human's console), not merely a second secret in the same process.

If ``DELEGO_APPROVAL_TOKEN`` is unset the approval endpoints **fail closed**
(refuse every call) rather than fall open to the agent.
"""

from __future__ import annotations

import hmac
import os
import shutil
from dataclasses import asdict
from pathlib import Path

from delego import ProposedAction, build_firewall
from delego.brokers import BrokerAdapter
from delego.brokers import BrokerAdapter, BrokerRefusal
from delego.config import Paths
from fastapi import FastAPI, HTTPException
from fastapi import Depends, FastAPI, Header, HTTPException, status
from pydantic import BaseModel, Field

from .broker import HttpxBroker

REPO_ROOT = Path(__file__).resolve().parent.parent
BUNDLED_POLICY = REPO_ROOT / "policy.yaml"

# The bearer token gating the human-approval endpoints. It lives in a separate
# trust domain from the agent: the agent never holds it (see the module note).
APPROVAL_TOKEN_ENV = "DELEGO_APPROVAL_TOKEN"


class ActionIn(BaseModel):
instruction: str = Field(..., min_length=1, description="The human ask this action serves")
Expand All @@ -44,6 +68,37 @@ class ResolveIn(ActionIn):
approval_id: str = Field(..., description="The approval id returned by /propose")


def require_approval_auth(authorization: str | None = Header(default=None)) -> None:
"""Gate the human-approval endpoints on the ``DELEGO_APPROVAL_TOKEN`` bearer.

Distinct from ``/propose`` and ``/resolve``, which are the agent's surface:
approval is a *separate trust domain* (spec §2, §7) so a compromised agent
cannot approve its own actions. Fails **closed** — if the token is not
configured, or the header is missing/wrong, the call is refused.

Comparison is constant-time (``hmac.compare_digest``) so a wrong token can't
be recovered by timing.
"""
expected = os.environ.get(APPROVAL_TOKEN_ENV)
if not expected:
# No token configured ⇒ approval cannot be authenticated ⇒ refuse,
# rather than leave approval open to the agent.
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=(
f"approval endpoint disabled: set {APPROVAL_TOKEN_ENV} to a secret "
"the agent does not hold (approval is a separate trust domain)"
),
)
scheme, _, token = (authorization or "").partition(" ")
if scheme.lower() != "bearer" or not token or not hmac.compare_digest(token, expected):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="approval requires a valid bearer token (separate trust domain from the agent)",
headers={"WWW-Authenticate": "Bearer"},
)


def create_app(broker: BrokerAdapter | None = None, home: str | os.PathLike | None = None) -> FastAPI:
"""Build the FastAPI app around a delego ``Firewall``.

Expand All @@ -63,6 +118,18 @@ def create_app(broker: BrokerAdapter | None = None, home: str | os.PathLike | No
version="0.1.0",
)

@app.exception_handler(BrokerRefusal)
def _broker_refused(_request, exc: BrokerRefusal):
# The firewall authorised the fingerprinted action, but the broker refused
# to forward decision-relevant data outside that fingerprint (a query
# string; spec §4.2). Surface it as a clear 4xx, not a 500.
from fastapi.responses import JSONResponse

return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={"detail": str(exc), "broker_refused": True},
)

def as_json(decision) -> dict:
out = asdict(decision)
out["result"] = decision.result
Expand All @@ -75,6 +142,8 @@ def root() -> dict:
"docs": "/docs",
"flow": "POST /propose -> (needs_approval) human POSTs /approvals/{id}/approve "
"-> agent POSTs /resolve; read the trail at /audit, check it at /verify",
"note": "approval endpoints require a bearer token (a separate trust "
"domain from the agent); set DELEGO_APPROVAL_TOKEN",
}

@app.get("/policy")
Expand Down Expand Up @@ -106,14 +175,18 @@ def resolve(req: ResolveIn) -> dict:
def pending() -> list[dict]:
return fw.approvals.pending()

@app.post("/approvals/{approval_id}/approve")
# --- human approval (SEPARATE TRUST DOMAIN) -------------------------------
# These two endpoints are the human's, not the agent's. They are gated by
# DELEGO_APPROVAL_TOKEN (see module note + require_approval_auth): a
# compromised agent must not be able to approve its own actions.
@app.post("/approvals/{approval_id}/approve", dependencies=[Depends(require_approval_auth)])
def approve(approval_id: str, approver: str = "api") -> dict:
rec = fw.approvals.decide(approval_id, approved=True, approver=approver)
if rec is None:
raise HTTPException(status_code=404, detail="unknown approval id")
return rec

@app.post("/approvals/{approval_id}/deny")
@app.post("/approvals/{approval_id}/deny", dependencies=[Depends(require_approval_auth)])
def deny(approval_id: str, approver: str = "api") -> dict:
rec = fw.approvals.decide(approval_id, approved=False, approver=approver)
if rec is None:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
delego>=0.2.2 # 0.2.2 fixes the amount-cap nan bypass this policy relies on
delego>=0.2.3 # 0.2.3 adds BrokerRefusal + the confused-deputy broker fix (C1) this app uses
fastapi>=0.110
uvicorn[standard]>=0.29
httpx>=0.27
13 changes: 12 additions & 1 deletion tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,25 @@

import tempfile

import pytest
from delego.brokers import NullBroker
from fastapi.testclient import TestClient

from app.main import create_app


@pytest.fixture(autouse=True)
def _approval_token(monkeypatch):
# The human-approval endpoints are a separate trust domain gated by a bearer
# (DELEGO_APPROVAL_TOKEN) — see test_approval_auth.py. Configure it so the
# end-to-end loop can approve; client() sends the matching header.
monkeypatch.setenv("DELEGO_APPROVAL_TOKEN", "test-token")


def client() -> TestClient:
return TestClient(create_app(broker=NullBroker(), home=tempfile.mkdtemp()))
c = TestClient(create_app(broker=NullBroker(), home=tempfile.mkdtemp()))
c.headers["Authorization"] = "Bearer test-token" # approval trust domain
return c


SMALL = {
Expand Down
40 changes: 40 additions & 0 deletions tests/test_approval_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""C2 regression: the human-approval endpoints are a separate trust domain.

They are gated by a ``DELEGO_APPROVAL_TOKEN`` bearer and fail closed when it is
unset, so a compromised agent that controls /propose & /resolve cannot approve
its own actions.
"""

from __future__ import annotations

import tempfile

from delego.brokers import NullBroker
from fastapi.testclient import TestClient

from app.main import create_app


def _client() -> TestClient:
return TestClient(create_app(broker=NullBroker(), home=tempfile.mkdtemp()))


def test_approval_fails_closed_when_token_unset(monkeypatch):
monkeypatch.delenv("DELEGO_APPROVAL_TOKEN", raising=False)
c = _client()
r = c.post("/approvals/any/approve")
# No token configured => approval cannot be authenticated => refuse.
assert r.status_code == 503


def test_approval_rejects_missing_bearer(monkeypatch):
monkeypatch.setenv("DELEGO_APPROVAL_TOKEN", "s3cret")
c = _client()
assert c.post("/approvals/any/approve").status_code == 401


def test_approval_rejects_wrong_bearer(monkeypatch):
monkeypatch.setenv("DELEGO_APPROVAL_TOKEN", "s3cret")
c = _client()
r = c.post("/approvals/any/approve", headers={"Authorization": "Bearer wrong"})
assert r.status_code == 401
Loading