|
| 1 | +"""Framework-managed cursor pagination for list responses. |
| 2 | +
|
| 3 | +When an adopter sets ``auto_paginate=True`` on :class:`DecisioningCapabilities`, |
| 4 | +the handler intercepts ``get_products``, calls the adopter's implementation to |
| 5 | +retrieve the full product set, and slices the result to the requested page. |
| 6 | +
|
| 7 | +.. warning:: **Adoption ceiling.** ``auto_paginate=True`` requires the adopter's |
| 8 | + ``get_products`` to return the *complete* unfiltered product set on every call — |
| 9 | + the framework slices it. This pattern works well for in-memory and small-catalog |
| 10 | + sellers (≲10 k products). Adopters with DB-backed catalogs at production scale |
| 11 | + MUST handle cursor logic natively and leave ``auto_paginate=False`` (the default). |
| 12 | + Returning a 100 k-product list only to have the framework discard 99 950 rows is |
| 13 | + a silent production latency and memory spike that only manifests at scale. |
| 14 | +
|
| 15 | +The cursor is an **HMAC-SHA-256-signed** JSON envelope:: |
| 16 | +
|
| 17 | + base64url( JSON({ "p": JSON({"v":1,"o":<offset>,"qh":<filter-hash>}), "s":<hex-sig> }) ) |
| 18 | +
|
| 19 | +The ``qh`` field is a SHA-256 fingerprint of the request parameters (excluding |
| 20 | +``pagination`` itself), so if a buyer changes filters between pages the framework |
| 21 | +returns ``INVALID_REQUEST`` with ``field="pagination.cursor"`` rather than silently |
| 22 | +serving the wrong page slice. |
| 23 | +
|
| 24 | +**Secret management.** The HMAC key defaults to a per-process random secret (stable |
| 25 | +within the process, different across restarts). Stale cursors from a prior process |
| 26 | +return ``INVALID_REQUEST`` — this is the correct behaviour for stateless sellers. |
| 27 | +Horizontally-scaled or stateless sellers that need cursors to survive process restarts |
| 28 | +MUST set ``ADCP_PAGINATION_SECRET`` in the environment. |
| 29 | +""" |
| 30 | + |
| 31 | +from __future__ import annotations |
| 32 | + |
| 33 | +import base64 |
| 34 | +import hashlib |
| 35 | +import hmac |
| 36 | +import json |
| 37 | +import logging |
| 38 | +import os |
| 39 | +from typing import Any |
| 40 | + |
| 41 | +from adcp.decisioning.types import AdcpError |
| 42 | + |
| 43 | +logger = logging.getLogger(__name__) |
| 44 | + |
| 45 | +_CURSOR_VERSION = 1 |
| 46 | + |
| 47 | +# Per-process fallback secret. Generated once at import time; stable within a |
| 48 | +# process, different across restarts. Stale cursors from prior processes fail |
| 49 | +# the HMAC check and return INVALID_REQUEST — correct for stateless sellers. |
| 50 | +_PROCESS_SECRET: bytes = os.urandom(32) |
| 51 | + |
| 52 | + |
| 53 | +def _secret() -> bytes: |
| 54 | + """Return the active HMAC secret.""" |
| 55 | + env = os.environ.get("ADCP_PAGINATION_SECRET") |
| 56 | + return env.encode() if env else _PROCESS_SECRET |
| 57 | + |
| 58 | + |
| 59 | +def _sign(payload: bytes, secret: bytes) -> str: |
| 60 | + return hmac.new(secret, payload, digestmod="sha256").hexdigest() |
| 61 | + |
| 62 | + |
| 63 | +def _encode_cursor(offset: int, query_hash: str, secret: bytes | None = None) -> str: |
| 64 | + """Return an opaque, HMAC-signed cursor string for the given page offset.""" |
| 65 | + payload = json.dumps( |
| 66 | + {"v": _CURSOR_VERSION, "o": offset, "qh": query_hash}, separators=(",", ":") |
| 67 | + ).encode() |
| 68 | + sig = _sign(payload, secret if secret is not None else _secret()) |
| 69 | + envelope = json.dumps( |
| 70 | + {"p": payload.decode(), "s": sig}, separators=(",", ":") |
| 71 | + ).encode() |
| 72 | + return base64.urlsafe_b64encode(envelope).rstrip(b"=").decode() |
| 73 | + |
| 74 | + |
| 75 | +def _decode_cursor(cursor: str, expected_query_hash: str, secret: bytes | None = None) -> int: |
| 76 | + """Decode *cursor* and return the page offset. |
| 77 | +
|
| 78 | + :raises AdcpError: ``INVALID_REQUEST`` (``field="pagination.cursor"``, |
| 79 | + ``recovery="correctable"``) when the cursor is malformed, has an invalid |
| 80 | + HMAC signature, or embeds a query-hash that doesn't match |
| 81 | + *expected_query_hash* (filters changed between pages). |
| 82 | + """ |
| 83 | + _bad = AdcpError( |
| 84 | + "INVALID_REQUEST", |
| 85 | + message=( |
| 86 | + "pagination.cursor is malformed or expired. Omit the cursor to restart from page 1." |
| 87 | + ), |
| 88 | + field="pagination.cursor", |
| 89 | + recovery="correctable", |
| 90 | + ) |
| 91 | + try: |
| 92 | + # Restore stripped padding before decoding. |
| 93 | + padded = cursor + "=" * ((4 - len(cursor) % 4) % 4) |
| 94 | + raw = base64.urlsafe_b64decode(padded.encode()) |
| 95 | + envelope = json.loads(raw) |
| 96 | + payload_str: str = envelope["p"] |
| 97 | + sig: str = envelope["s"] |
| 98 | + except Exception: |
| 99 | + raise _bad from None # suppress decode-error chain; internal detail |
| 100 | + |
| 101 | + payload_bytes = payload_str.encode() |
| 102 | + expected_sig = _sign(payload_bytes, secret if secret is not None else _secret()) |
| 103 | + if not hmac.compare_digest(sig, expected_sig): |
| 104 | + raise _bad from None |
| 105 | + |
| 106 | + try: |
| 107 | + inner = json.loads(payload_bytes) |
| 108 | + offset: int = int(inner["o"]) |
| 109 | + qh: str = inner["qh"] |
| 110 | + except Exception: |
| 111 | + raise _bad from None |
| 112 | + |
| 113 | + if not hmac.compare_digest(qh, expected_query_hash): |
| 114 | + raise AdcpError( |
| 115 | + "INVALID_REQUEST", |
| 116 | + message=( |
| 117 | + "pagination.cursor is stale: request filters changed since this cursor was " |
| 118 | + "issued. Omit the cursor to restart pagination with the new filter values." |
| 119 | + ), |
| 120 | + field="pagination.cursor", |
| 121 | + recovery="correctable", |
| 122 | + suggestion="Omit the cursor to start from the first page with the updated filters.", |
| 123 | + ) |
| 124 | + |
| 125 | + return offset |
| 126 | + |
| 127 | + |
| 128 | +def _query_hash(params: Any) -> str: |
| 129 | + """Compute a stable fingerprint of request params, excluding ``pagination``. |
| 130 | +
|
| 131 | + Detects filter drift between pages. ``pagination`` is excluded so |
| 132 | + successive page requests with different cursors still match the same hash. |
| 133 | + """ |
| 134 | + try: |
| 135 | + if hasattr(params, "model_dump"): |
| 136 | + d: Any = params.model_dump(mode="json", exclude={"pagination"}) |
| 137 | + else: |
| 138 | + d = {k: v for k, v in dict(params).items() if k != "pagination"} |
| 139 | + canonical = json.dumps(d, sort_keys=True, separators=(",", ":"), default=str) |
| 140 | + except Exception: |
| 141 | + canonical = repr(params) |
| 142 | + return hashlib.sha256(canonical.encode()).hexdigest()[:32] |
| 143 | + |
| 144 | + |
| 145 | +def apply_framework_pagination( |
| 146 | + response: Any, |
| 147 | + pagination: Any, |
| 148 | + query_hash: str, |
| 149 | + secret: bytes | None = None, |
| 150 | +) -> Any: |
| 151 | + """Slice a full-list ``GetProductsResponse`` to the requested page. |
| 152 | +
|
| 153 | + Called by the handler post-adapter when ``auto_paginate=True`` on |
| 154 | + :class:`~adcp.decisioning.platform.DecisioningCapabilities`. |
| 155 | +
|
| 156 | + Short-circuits (returns *response* unchanged) when: |
| 157 | +
|
| 158 | + * ``response.pagination`` is already populated — the adopter handled |
| 159 | + pagination natively; the framework must not overwrite it. |
| 160 | + * ``response.products`` is absent — unexpected shape; pass through and |
| 161 | + let wire validation surface the issue. |
| 162 | +
|
| 163 | + Clamps ``max_results`` to ``[1, 100]`` before slicing. |
| 164 | +
|
| 165 | + :param response: The adopter's full-list ``GetProductsResponse``. |
| 166 | + :param pagination: The wire ``Pagination`` request object |
| 167 | + (``max_results``, ``cursor``). |
| 168 | + :param query_hash: Filter fingerprint from :func:`_query_hash` on the |
| 169 | + original request. Used to anchor the cursor. |
| 170 | + :param secret: HMAC key override for testing and direct use. ``None`` uses |
| 171 | + :func:`_secret` (reads ``ADCP_PAGINATION_SECRET`` env var, falls back |
| 172 | + to the per-process random secret). The handler always passes ``None`` |
| 173 | + — configure production secrets via the env var. |
| 174 | + :returns: A new ``GetProductsResponse`` with ``products`` sliced to the |
| 175 | + page and ``pagination`` populated, or the original *response* if the |
| 176 | + short-circuit fired. |
| 177 | + """ |
| 178 | + # Short-circuit: adopter already populated pagination. Also short-circuits |
| 179 | + # when _invoke_platform_method returned a TaskHandoff projection dict |
| 180 | + # ({"task_id": ..., "status": "submitted"}) — dicts have no .products |
| 181 | + # attribute so the products-None branch below fires and passes through. |
| 182 | + if getattr(response, "pagination", None) is not None: |
| 183 | + return response |
| 184 | + |
| 185 | + products = getattr(response, "products", None) |
| 186 | + if products is None: |
| 187 | + return response |
| 188 | + |
| 189 | + # Decode cursor or start at offset 0. |
| 190 | + cursor_str = getattr(pagination, "cursor", None) |
| 191 | + if cursor_str: |
| 192 | + offset = _decode_cursor(cursor_str, query_hash, secret) |
| 193 | + else: |
| 194 | + offset = 0 |
| 195 | + |
| 196 | + # Clamp max_results to wire schema bounds [1, 100]. |
| 197 | + raw_max = getattr(pagination, "max_results", None) |
| 198 | + max_results = max(1, min(100, raw_max if raw_max is not None else 50)) |
| 199 | + |
| 200 | + full_list = list(products) |
| 201 | + page = full_list[offset : offset + max_results] |
| 202 | + has_more = (offset + max_results) < len(full_list) |
| 203 | + next_cursor = ( |
| 204 | + _encode_cursor(offset + max_results, query_hash, secret) if has_more else None |
| 205 | + ) |
| 206 | + |
| 207 | + # Deferred: adcp.types imports adcp.decisioning.types; top-level import is circular. |
| 208 | + from adcp.types import PaginationResponse |
| 209 | + |
| 210 | + new_pagination = PaginationResponse(has_more=has_more, cursor=next_cursor) |
| 211 | + return response.model_copy(update={"products": page, "pagination": new_pagination}) |
| 212 | + |
| 213 | + |
| 214 | +__all__ = ["apply_framework_pagination"] |
0 commit comments