46 changes: 34 additions & 12 deletions api/README.md
@@ -8,11 +8,13 @@ Import as `temporal_model.api`. Depends on `temporal-model-core`.
## Endpoints

- `GET /health` — readiness + loaded model name/version + API code version.
- `POST /predict` — body `{ "frames": ["<s3-key>", ...], "bucket": "<name>",
"roi_xyxyn": [x_min, y_min, x_max, y_max] }`
(ordered S3 keys; `bucket` optional, falls back to `S3_BUCKET`;
`roi_xyxyn` optional normalized region of interest — tubes with no real
detection intersecting it are dropped before scoring);
- `POST /predict` — body `{ "frames": [...], "source": "s3" | "local",
"bucket": "<name>", "roi_xyxyn": [x_min, y_min, x_max, y_max] }`
(ordered frames; `source` optional, falls back to `FRAME_SOURCE` — with
`s3`, frames are S3 keys and `bucket` optionally overrides `S3_BUCKET`;
with `local`, frames are relative paths under `FRAMES_ROOT` and `bucket`
is invalid; `roi_xyxyn` optional normalized region of interest — tubes
with no real detection intersecting it are dropped before scoring);
returns `{ is_smoke, probability, version }` (`probability` = max kept-tube
calibrated probability, `null` if uncalibrated).
`version` is `{api, model}` — the code release (== the Docker image tag,
@@ -23,7 +25,8 @@ Import as `temporal_model.api`. Depends on `temporal-model-core`.
top-level `trigger_frame_index` (`null` if nothing crossed) — with
`verbose=true` it also fills `details.decision.trigger_tube_id` and
per-tube `details.tubes[].first_crossing_frame`. See
`docs/specs/2026-06-02-api-service-design.md` for the full contract.
`docs/specs/2026-06-02-api-service-design.md` and
`docs/specs/2026-06-11-api-local-frames-design.md` for the full contract.
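
As a rough illustration of the two request shapes described above, the sketch below assembles an S3 body and a local body using only the standard library. The `build_predict_body` helper, the frame names, and the bucket name are hypothetical; only the field names (`frames`, `source`, `bucket`, `roi_xyxyn`) and the "no `bucket` with `local`" rule come from the contract.

```python
import json


def build_predict_body(frames, source=None, bucket=None, roi_xyxyn=None):
    """Hypothetical client-side helper: assemble a /predict JSON body.

    Optional fields are omitted when None so the server applies its own
    defaults (FRAME_SOURCE, S3_BUCKET).
    """
    if source == "local" and bucket is not None:
        # The server rejects this combination with 400 invalid_request,
        # so fail early on the client side.
        raise ValueError("bucket is not valid with local frames")
    body = {"frames": list(frames)}
    if source is not None:
        body["source"] = source
    if bucket is not None:
        body["bucket"] = bucket
    if roi_xyxyn is not None:
        body["roi_xyxyn"] = list(roi_xyxyn)
    return body


# S3 request: frames are keys; bucket overrides the server default.
s3_body = build_predict_body(
    ["cam12/0001.jpg", "cam12/0002.jpg"],
    source="s3",
    bucket="example-bucket",
)

# Local request: frames are paths relative to the server's FRAMES_ROOT.
local_body = build_predict_body(
    ["cam12/0001.jpg", "cam12/0002.jpg"],
    source="local",
    roi_xyxyn=[0.1, 0.2, 0.9, 0.8],
)

print(json.dumps(s3_body))
print(json.dumps(local_body))
```

Omitting `source` entirely leaves the choice to the server's `FRAME_SOURCE`, which is usually what a client wants when it talks to a single deployment.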

## Run

@@ -40,12 +43,31 @@ refuses to start if the file is missing. (`docker compose up --build` directly
will fail at the `COPY` step without it.)

Configuration via env vars (prefix `TEMPORAL_API_`): `MODEL_PATH`, `DEVICE`,
`CALIBRATOR_THRESHOLD`, `TOKEN`, `S3_BUCKET`, `S3_REGION`, `S3_ENDPOINT_URL`
(empty = real AWS; set for OVH or MinIO), `HOST`, `PORT`. AWS/OVH/MinIO credentials come from
the standard boto3 chain (env vars / IAM role). `S3_BUCKET` is an optional
default; a request may override it per call with its `bucket` field (needed for
alert-api stacks whose per-org bucket names are not known ahead of time). A
request with neither is rejected with `400 invalid_request`.
`CALIBRATOR_THRESHOLD`, `TOKEN`, `FRAME_SOURCE`, `FRAMES_ROOT`, `S3_BUCKET`,
`S3_REGION`, `S3_ENDPOINT_URL` (empty = real AWS; set for OVH or MinIO),
`HOST`, `PORT`. AWS/OVH/MinIO credentials come from the standard boto3 chain
(env vars / IAM role). `S3_BUCKET` is an optional default; a request may
override it per call with its `bucket` field (needed for alert-api stacks
whose per-org bucket names are not known ahead of time). A request with
neither is rejected with `400 invalid_request`.

`FRAME_SOURCE` (default `s3`) selects where `/predict` frames come from when
a request omits its optional `source` field. With `local` (an edge box whose
frames sit on a shared volume), `frames` are relative paths resolved under
`FRAMES_ROOT`; `FRAMES_ROOT` is settings-only by design — a request-supplied
root would let callers probe arbitrary server paths — and absolute paths or
`..` segments are rejected with `400 invalid_request`. A missing file is the
same `404 frame_not_found` as a missing S3 key, and local requests skip the
S3 download entirely (frames are read in place). `FRAME_SOURCE=local` without
`FRAMES_ROOT` fails at startup; a root that is not a directory at request
time (typo, unmounted volume) is a distinct `400`, not a per-frame 404.

Two invariants for local frame producers: frames are read in place at predict
time, so publish them atomically (write to a temp name, then rename) — a
frame mid-write can fail the request or score a truncated image. And when
the detection cache is enabled, frame basenames (stems) must stay globally
unique across cameras and time — the same invariant S3 keys already carry
(the cache is keyed by stem; see `detection_cache.py`).
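
The atomic-publish invariant above can be sketched for a producer as follows. This is a minimal example under stated assumptions, not the project's code: `publish_frame_atomically` is a hypothetical helper, and it assumes the producer and the frames root share one filesystem so the final rename is atomic.

```python
import os
import tempfile
from pathlib import Path


def publish_frame_atomically(data: bytes, dest: Path) -> None:
    """Write ``data`` to ``dest`` so a reader never sees a partial file.

    The temp file is created in the destination directory so the final
    rename stays on one filesystem.
    """
    dest.parent.mkdir(parents=True, exist_ok=True)
    # Note: mkstemp creates the file with mode 0600; adjust permissions if
    # the API process runs as a different user than the producer.
    fd, tmp_name = tempfile.mkstemp(
        dir=dest.parent, prefix=".tmp-", suffix=dest.suffix
    )
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes are on disk first
        os.replace(tmp_name, dest)  # the rename publishes the frame
    except BaseException:
        # Remove the temp file if anything failed before the rename.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

A camera loop would call `publish_frame_atomically(jpeg_bytes, root / "cam12" / "0001.jpg")` and only then send the `/predict` request that names the frame.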

`CALIBRATOR_THRESHOLD` (a probability in `[0, 1]`) overrides the packaged
calibrator decision threshold for every prediction; out-of-range values fail
53 changes: 40 additions & 13 deletions api/src/temporal_model/api/app.py
@@ -3,7 +3,7 @@
import json
import logging
import tempfile
from contextlib import asynccontextmanager
from contextlib import ExitStack, asynccontextmanager
from pathlib import Path

from fastapi import Depends, FastAPI, Request
@@ -16,6 +16,7 @@

from .auth import require_token
from .errors import ApiError, InferenceError, InvalidRequest, ModelNotLoaded
from .local import resolve_frames
from .model_runner import ModelRunner
from .s3 import fetch_frames, make_s3_client
from .schemas import PredictRequest, PredictResponse, to_response
@@ -132,29 +133,55 @@ async def predict(
verbose: bool = False,
compute_trigger: bool = False,
) -> PredictResponse:
bucket = body.bucket or settings.s3_bucket
if not bucket:
raise InvalidRequest(
"no S3 bucket: set request 'bucket' or TEMPORAL_API_S3_BUCKET"
)
source = body.source or settings.frame_source
if source == "local":
if body.bucket is not None:
raise InvalidRequest("bucket is not valid with local frames")
if not settings.frames_root:
raise InvalidRequest(
"local frames not enabled: set TEMPORAL_API_FRAMES_ROOT"
)
else:
bucket = body.bucket or settings.s3_bucket
if not bucket:
raise InvalidRequest(
"no S3 bucket: set request 'bucket' or TEMPORAL_API_S3_BUCKET"
)

runner = getattr(request.app.state, "runner", None)
if runner is None:
raise ModelNotLoaded("model is not loaded")
s3_client = request.app.state.s3_client

with tempfile.TemporaryDirectory() as tmp:
with ExitStack() as stack:
try:
# Timer carries the model device so GPU/MPS stages are synced for
# honest timing; on the CPU serving target this is a no-op.
timer = StageTimer(settings.device) if settings.profile else None
profile: dict | None = {} if settings.profile else None

with stage_ctx(timer, "s3_fetch"):
# fetch_frames is blocking boto3 I/O — run it off the event loop.
paths = await run_in_threadpool(
fetch_frames, s3_client, bucket, body.frames, Path(tmp)
)
if source == "local":
# Local frames are read in place — no temp dir, no copy.
with stage_ctx(timer, "local_resolve"):
# resolve_frames stats every frame — keep it off the event
# loop like the S3 fetch (the root may be a slow shared
# volume).
paths = await run_in_threadpool(
resolve_frames, Path(settings.frames_root), body.frames
)
else:
# The temp dir must outlive runner.predict — frames are read
# during inference.
tmp = stack.enter_context(tempfile.TemporaryDirectory())
with stage_ctx(timer, "s3_fetch"):
# fetch_frames is blocking boto3 I/O — run it off the
# event loop.
paths = await run_in_threadpool(
fetch_frames,
request.app.state.s3_client,
bucket,
body.frames,
Path(tmp),
)

out = await runner.predict(
paths,
46 changes: 46 additions & 0 deletions api/src/temporal_model/api/local.py
@@ -0,0 +1,46 @@
"""Local-filesystem frame resolution.

Resolves request frame paths (relative identifiers) under the
server-configured ``frames_root`` and returns them in request order. Unlike
the S3 path, nothing is copied: the returned paths point at the real files.
The root comes from settings only — never from the request — so a request
cannot reference paths outside it (see
docs/specs/2026-06-11-api-local-frames-design.md, decision 2).
"""

from pathlib import Path

from .errors import FrameNotFound, InvalidRequest


def resolve_frames(root: Path, frames: list[str]) -> list[Path]:
"""Resolve ``frames`` under ``root``, in request order.

Rejects absolute paths and ``..`` segments outright, and anything whose
resolved path (symlinks followed) lands outside ``root``. A missing file
raises :class:`FrameNotFound` — the same error a missing S3 key maps to.
Error messages echo the request string, never the resolved server path.
"""
root = root.resolve()
if not root.is_dir():
# A typo'd or unmounted root would otherwise surface as a 404 per
# frame, indistinguishable from genuinely missing frames (the local
# analog of fetch_frames mapping NoSuchBucket to a distinct error).
raise InvalidRequest(
"frames root is not a directory: check TEMPORAL_API_FRAMES_ROOT"
)
paths: list[Path] = []
for frame in frames:
rel = Path(frame)
# Empty/"." have no parts and would resolve to the root itself.
if not rel.parts or rel.is_absolute() or ".." in rel.parts:
raise InvalidRequest(
f"frame must be a non-empty relative path without '..': {frame!r}"
)
resolved = (root / rel).resolve()
if not resolved.is_relative_to(root):
raise InvalidRequest(f"frame escapes the frames root: {frame!r}")
if not resolved.is_file():
raise FrameNotFound(f"frame not found: {frame}")
paths.append(resolved)
return paths
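
To make the checks in `resolve_frames` concrete, the sketch below re-implements one loop iteration as a standalone function and runs it against a temporary directory. `classify_frame` and its string return values are illustrative stand-ins, not part of this PR; the real function raises `InvalidRequest` or `FrameNotFound` instead of returning labels.

```python
import tempfile
from pathlib import Path


def classify_frame(root: Path, frame: str) -> str:
    """Illustrative stand-in for one loop iteration of resolve_frames."""
    root = root.resolve()
    rel = Path(frame)
    # "" and "." have no parts and would resolve to the root itself.
    if not rel.parts or rel.is_absolute() or ".." in rel.parts:
        return "invalid"
    resolved = (root / rel).resolve()
    # Catches symlinks that point outside the root (Python 3.9+).
    if not resolved.is_relative_to(root):
        return "escapes_root"
    if not resolved.is_file():
        return "not_found"
    return "ok"


with tempfile.TemporaryDirectory() as tmp:
    frames_root = Path(tmp)
    (frames_root / "cam12").mkdir()
    (frames_root / "cam12" / "0001.jpg").write_bytes(b"\xff\xd8\xff\xe0")

    candidates = [
        "cam12/0001.jpg",
        "cam12/missing.jpg",
        "../etc/passwd",
        "/etc/passwd",
        "",
        ".",
    ]
    results = {frame: classify_frame(frames_root, frame) for frame in candidates}

for frame, outcome in results.items():
    print(f"{frame!r}: {outcome}")
```

The lexical `..` check alone is not sufficient: a symlink inside the root can point anywhere, which is why the resolved path is compared against the resolved root as a second step.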
10 changes: 8 additions & 2 deletions api/src/temporal_model/api/schemas.py
@@ -23,6 +23,10 @@

class PredictRequest(BaseModel):
frames: list[str]
# Where `frames` live: "s3" (keys in a bucket) or "local" (relative paths
# under the server's frames_root). None → the server's configured default
# (settings.frame_source).
source: Literal["s3", "local"] | None = None
# Optional per-request S3 bucket. Falls back to settings.s3_bucket when
# omitted (alert-api stacks use per-org dynamic bucket names that no single
# setting can cover).
@@ -38,10 +42,12 @@ class PredictRequest(BaseModel):
@classmethod
def _validate_frames(cls, v: list[str]) -> list[str]:
if not v:
raise ValueError("frames must contain at least one S3 key")
raise ValueError("frames must contain at least one entry")
for key in v:
if "://" in key:
raise ValueError(f"frame key must be a bare S3 key, not a URL: {key!r}")
raise ValueError(
f"frame must be a bare key or relative path, not a URL: {key!r}"
)
return v

@field_validator("bucket")
26 changes: 25 additions & 1 deletion api/src/temporal_model/api/settings.py
@@ -1,6 +1,8 @@
"""Runtime configuration for the API, read from ``TEMPORAL_API_*`` env vars."""

from pydantic import Field, field_validator
from typing import Literal

from pydantic import Field, field_validator, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict


@@ -41,9 +43,31 @@ class Settings(BaseSettings):
s3_region: str | None = None
s3_endpoint_url: str | None = None

# Where /predict frames come from when a request omits its `source` field:
# "s3" downloads keys from a bucket; "local" resolves relative paths under
# `frames_root` (see docs/specs/2026-06-11-api-local-frames-design.md).
frame_source: Literal["s3", "local"] = "s3"

# Root directory for local frames. Required when serving local frames.
# Settings-only by design — a request-supplied root would let callers
# probe arbitrary server paths.
frames_root: str = ""

host: str = "0.0.0.0"
port: int = 8000

@model_validator(mode="after")
def _require_frames_root_for_local(self) -> "Settings":
# A local-default server without a root would 400 on every request;
# fail at boot like other server-level misconfig. (A per-request
# `source: "local"` override on an s3-default server is still checked
# in the route — it cannot be known at startup.)
if self.frame_source == "local" and not self.frames_root:
raise ValueError(
"TEMPORAL_API_FRAME_SOURCE=local requires TEMPORAL_API_FRAMES_ROOT"
)
return self

@field_validator("api_version")
@classmethod
def _empty_api_version_is_none(cls, v: str | None) -> str | None:
96 changes: 96 additions & 0 deletions api/tests/test_app.py
@@ -66,12 +66,14 @@ def __init__(self, output=None, error=None):
self._output = output
self._error = error
self.roi = None
self.paths = None
self.compute_trigger = None

async def predict(
self, paths, *, roi=None, timer=None, profile=None, compute_trigger=False
):
self.roi = roi
self.paths = paths
self.compute_trigger = compute_trigger
if self._error:
raise self._error
@@ -461,3 +463,97 @@ def test_predict_invalid_roi_is_400(client):
body = r.json()
assert body["code"] == "invalid_request"
assert "roi_xyxyn" in body["detail"]


@pytest.fixture
def local_client(monkeypatch, tmp_path):
# An edge-box style deployment: frame_source=local, frames on a shared
# volume (tmp_path), no S3 involved.
monkeypatch.setattr(settings, "frame_source", "local")
monkeypatch.setattr(settings, "frames_root", str(tmp_path))
for key in KEYS:
p = tmp_path / key
p.parent.mkdir(parents=True, exist_ok=True)
p.write_bytes(b"\xff\xd8\xff\xe0jpeg")
with TestClient(app) as c:
c.app.state.runner = FakeRunner(output=_smoke_output())
yield c


def test_predict_local_default_source(local_client, tmp_path):
# `source` omitted follows settings.frame_source="local"; the runner gets
# the real files under the root, in request order — no copy.
r = local_client.post("/predict", json={"frames": KEYS})
assert r.status_code == 200
assert r.json()["is_smoke"] is True
runner = local_client.app.state.runner
assert runner.paths == [(tmp_path / k).resolve() for k in KEYS]


def test_predict_local_explicit_source(local_client):
r = local_client.post("/predict", json={"frames": KEYS, "source": "local"})
assert r.status_code == 200


def test_predict_local_rejects_bucket(local_client):
r = local_client.post("/predict", json={"frames": KEYS, "bucket": "some-bucket"})
assert r.status_code == 400
assert r.json()["code"] == "invalid_request"
assert "bucket" in r.json()["detail"]


def test_predict_local_without_root_400(local_client, monkeypatch):
monkeypatch.setattr(settings, "frames_root", "")
r = local_client.post("/predict", json={"frames": KEYS})
assert r.status_code == 400
assert r.json()["code"] == "invalid_request"
assert "TEMPORAL_API_FRAMES_ROOT" in r.json()["detail"]


def test_predict_local_no_root_400_takes_precedence_over_model(
local_client, monkeypatch
):
# Mirrors test_predict_no_bucket_400_takes_precedence_over_model: the
# prerequisite check runs before the model-loaded check.
monkeypatch.setattr(settings, "frames_root", "")
local_client.app.state.runner = None
r = local_client.post("/predict", json={"frames": KEYS})
assert r.status_code == 400
assert r.json()["code"] == "invalid_request"


def test_predict_local_missing_frame_404(local_client):
r = local_client.post("/predict", json={"frames": ["cam12/missing.jpg"]})
assert r.status_code == 404
assert r.json()["code"] == "frame_not_found"


def test_predict_local_traversal_400(local_client):
r = local_client.post("/predict", json={"frames": ["../etc/passwd"]})
assert r.status_code == 400
assert r.json()["code"] == "invalid_request"


def test_predict_explicit_s3_same_as_omitted(client):
# On an s3-default server, explicit source="s3" behaves identically to
# omitting it.
r = client.post("/predict", json={"frames": KEYS, "source": "s3"})
assert r.status_code == 200
assert r.json()["is_smoke"] is True


def test_predict_source_local_on_s3_server_needs_root(client):
# The s3-mode `client` fixture has no frames_root configured: an explicit
# local request is a clear 400, not a confusing fallback.
r = client.post("/predict", json={"frames": KEYS, "source": "local"})
assert r.status_code == 400
assert "TEMPORAL_API_FRAMES_ROOT" in r.json()["detail"]


def test_predict_local_profiling_stage(local_client, monkeypatch):
monkeypatch.setattr(settings, "profile", True)
r = local_client.post("/predict?verbose=true", json={"frames": KEYS})
assert r.status_code == 200
prof = r.json()["details"]["profiling"]
assert "local_resolve" in prof["stages_ms"]
assert "s3_fetch" not in prof["stages_ms"]