diff --git a/packages/backend/app/db/schema.sql b/packages/backend/app/db/schema.sql index 410189def..ff29d2dde 100644 --- a/packages/backend/app/db/schema.sql +++ b/packages/backend/app/db/schema.sql @@ -123,3 +123,30 @@ CREATE TABLE IF NOT EXISTS audit_logs ( action VARCHAR(100) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW() ); + +CREATE TABLE IF NOT EXISTS webhooks ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + url VARCHAR(500) NOT NULL, + secret VARCHAR(128) NOT NULL, + events VARCHAR(1000) NOT NULL DEFAULT '*', + active BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_webhooks_user ON webhooks(user_id, active); + +CREATE TABLE IF NOT EXISTS webhook_deliveries ( + id SERIAL PRIMARY KEY, + webhook_id INT NOT NULL REFERENCES webhooks(id) ON DELETE CASCADE, + event_type VARCHAR(100) NOT NULL, + payload TEXT NOT NULL, + status VARCHAR(20) NOT NULL DEFAULT 'PENDING', + attempts INT NOT NULL DEFAULT 0, + last_status_code INT, + last_error VARCHAR(500), + next_attempt_at TIMESTAMP NOT NULL DEFAULT NOW(), + delivered_at TIMESTAMP, + created_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_status_next + ON webhook_deliveries(status, next_attempt_at); diff --git a/packages/backend/app/models.py b/packages/backend/app/models.py index 64d448104..6f10235bb 100644 --- a/packages/backend/app/models.py +++ b/packages/backend/app/models.py @@ -133,3 +133,39 @@ class AuditLog(db.Model): user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True) action = db.Column(db.String(100), nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + +class Webhook(db.Model): + __tablename__ = "webhooks" + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False) + url = db.Column(db.String(500), nullable=False) + secret = db.Column(db.String(128), nullable=False) + events = db.Column(db.String(1000), nullable=False, default="*") + active = db.Column(db.Boolean, default=True, nullable=False) + created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + +class WebhookDeliveryStatus(str, Enum): + PENDING = "PENDING" + SUCCESS = "SUCCESS" + FAILED = "FAILED" + + +class WebhookDelivery(db.Model): + __tablename__ = "webhook_deliveries" + id = db.Column(db.Integer, primary_key=True) + webhook_id = db.Column( + db.Integer, db.ForeignKey("webhooks.id", ondelete="CASCADE"), nullable=False + ) + event_type = db.Column(db.String(100), nullable=False) + payload = db.Column(db.Text, nullable=False) + status = db.Column( + db.String(20), default=WebhookDeliveryStatus.PENDING.value, nullable=False + ) + attempts = db.Column(db.Integer, default=0, nullable=False) + last_status_code = db.Column(db.Integer, nullable=True) + last_error = db.Column(db.String(500), nullable=True) + next_attempt_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + delivered_at = db.Column(db.DateTime, nullable=True) + created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) diff --git a/packages/backend/app/routes/__init__.py b/packages/backend/app/routes/__init__.py index f13b0f897..75b957756 100644 --- a/packages/backend/app/routes/__init__.py +++ b/packages/backend/app/routes/__init__.py @@ -7,6 +7,7 @@ from .categories import bp as categories_bp from .docs import bp as docs_bp from .dashboard import bp as dashboard_bp +from .webhooks import bp as webhooks_bp def register_routes(app: Flask): @@ -18,3 +19,4 @@ def register_routes(app: Flask): app.register_blueprint(categories_bp, url_prefix="/categories") app.register_blueprint(docs_bp, url_prefix="/docs") app.register_blueprint(dashboard_bp, url_prefix="/dashboard") + app.register_blueprint(webhooks_bp, url_prefix="/webhooks") diff --git a/packages/backend/app/routes/bills.py b/packages/backend/app/routes/bills.py index f557e90d4..c39a196e1 100644 --- a/packages/backend/app/routes/bills.py +++ b/packages/backend/app/routes/bills.py @@ -4,6 +4,7 @@ from ..extensions import db from ..models import Bill, BillCadence, User from ..services.cache import cache_delete_patterns +from ..services.webhooks import emit_event import logging bp = Blueprint("bills", __name__) @@ -62,6 +63,18 @@ def create_bill(): cache_delete_patterns( [f"user:{uid}:upcoming_bills*", f"user:{uid}:dashboard_summary:*"] ) + emit_event( + uid, + "bill.created", + { + "id": b.id, + "name": b.name, + "amount": float(b.amount), + "currency": b.currency, + "next_due_date": b.next_due_date.isoformat(), + "cadence": b.cadence.value, + }, + ) return jsonify(id=b.id), 201 @@ -88,4 +101,16 @@ def mark_paid(bill_id: int): logger.info( "Marked bill paid id=%s user=%s next_due_date=%s", b.id, uid, b.next_due_date ) + emit_event( + uid, + "bill.paid", + { + "id": b.id, + "name": b.name, + "amount": float(b.amount), + "currency": b.currency, + "next_due_date": b.next_due_date.isoformat(), + "active": b.active, + }, + ) return jsonify(message="updated") diff --git a/packages/backend/app/routes/expenses.py b/packages/backend/app/routes/expenses.py index 1376d46f5..df6356d1d 100644 --- a/packages/backend/app/routes/expenses.py +++ b/packages/backend/app/routes/expenses.py @@ -8,6 +8,7 @@ from ..models import Expense, RecurringCadence, RecurringExpense, User from ..services.cache import cache_delete_patterns, monthly_summary_key from ..services import expense_import +from ..services.webhooks import emit_event import logging bp = Blueprint("expenses", __name__) @@ -84,6 +85,7 @@ def create_expense(): f"insights:{uid}:*", ] ) + emit_event(uid, "expense.created", _expense_to_dict(e)) return jsonify(_expense_to_dict(e)), 201 @@ -231,6 +233,7 @@ def update_expense(expense_id: int): e.spent_at = date.fromisoformat(raw_date) db.session.commit() _invalidate_expense_cache(uid, e.spent_at.isoformat()) + emit_event(uid, "expense.updated", _expense_to_dict(e)) return jsonify(_expense_to_dict(e)) @@ -242,9 +245,11 @@ def delete_expense(expense_id: int): if not e or e.user_id != uid: return jsonify(error="not found"), 404 spent_at = e.spent_at.isoformat() + deleted_payload = {"id": e.id, "date": spent_at} db.session.delete(e) db.session.commit() _invalidate_expense_cache(uid, spent_at) + emit_event(uid, "expense.deleted", deleted_payload) return jsonify(message="deleted") diff --git a/packages/backend/app/routes/reminders.py b/packages/backend/app/routes/reminders.py index 9ed7ea50e..283ae5372 100644 --- a/packages/backend/app/routes/reminders.py +++ b/packages/backend/app/routes/reminders.py @@ -5,6 +5,7 @@ from ..models import Bill, Reminder from ..observability import track_reminder_event from ..services.reminders import send_reminder +from ..services.webhooks import emit_event import logging bp = Blueprint("reminders", __name__) @@ -51,6 +52,16 @@ def create_reminder(): db.session.commit() logger.info("Created reminder id=%s user=%s", r.id, uid) track_reminder_event(event="created", channel=r.channel) + emit_event( + uid, + "reminder.created", + { + "id": r.id, + "message": r.message, + "send_at": r.send_at.isoformat(), + "channel": r.channel, + }, + ) return jsonify(id=r.id), 201 diff --git a/packages/backend/app/routes/webhooks.py b/packages/backend/app/routes/webhooks.py new file mode 100644 index 000000000..538f082b3 --- /dev/null +++ b/packages/backend/app/routes/webhooks.py @@ -0,0 +1,160 @@ +from flask import Blueprint, jsonify, request +from flask_jwt_extended import jwt_required, get_jwt_identity + +from ..extensions import db +from ..models import Webhook, WebhookDelivery +from ..services import webhooks as webhook_service + +bp = Blueprint("webhooks", __name__) + + +def _hook_to_dict(hook: Webhook, include_secret: bool = False) -> dict: + out = { + "id": hook.id, + "url": hook.url, + "events": [e.strip() for e in hook.events.split(",") if e.strip()], + "active": hook.active, + "created_at": hook.created_at.isoformat(), + } + if include_secret: + out["secret"] = hook.secret + return out + + +def _delivery_to_dict(d: WebhookDelivery) -> dict: + return { + "id": d.id, + "webhook_id": d.webhook_id, + "event_type": d.event_type, + "status": d.status, + "attempts": d.attempts, + "last_status_code": d.last_status_code, + "last_error": d.last_error, + "next_attempt_at": d.next_attempt_at.isoformat() if d.next_attempt_at else None, + "delivered_at": d.delivered_at.isoformat() if d.delivered_at else None, + "created_at": d.created_at.isoformat(), + } + + +def _normalize_events(raw) -> str | None: + if raw is None: + return "*" + if isinstance(raw, str): + items = [s.strip() for s in raw.split(",") if s.strip()] + elif isinstance(raw, list): + items = [str(s).strip() for s in raw if str(s).strip()] + else: + return None + if not items: + return "*" + for item in items: + if item == "*" or item.endswith(".*"): + continue + if item not in webhook_service.EVENT_TYPES: + return None + return ",".join(items) + + +@bp.get("") +@jwt_required() +def list_webhooks(): + uid = int(get_jwt_identity()) + items = ( + db.session.query(Webhook) + .filter_by(user_id=uid) + .order_by(Webhook.created_at.desc()) + .all() + ) + return jsonify([_hook_to_dict(h) for h in items]) + + +@bp.post("") +@jwt_required() +def create_webhook(): + uid = int(get_jwt_identity()) + data = request.get_json() or {} + url = (data.get("url") or "").strip() + if not url or not (url.startswith("http://") or url.startswith("https://")): + return jsonify(error="valid url required"), 400 + events = _normalize_events(data.get("events")) + if events is None: + return jsonify(error="invalid events; see /webhooks/events"), 400 + secret = (data.get("secret") or "").strip() or webhook_service.generate_secret() + if len(secret) > 128: + return jsonify(error="secret too long"), 400 + hook = Webhook( + user_id=uid, + url=url, + secret=secret, + events=events, + active=bool(data.get("active", True)), + ) + db.session.add(hook) + db.session.commit() + return jsonify(_hook_to_dict(hook, include_secret=True)), 201 + + +@bp.patch("/") +@jwt_required() +def update_webhook(webhook_id: int): + uid = int(get_jwt_identity()) + hook = db.session.get(Webhook, webhook_id) + if not hook or hook.user_id != uid: + return jsonify(error="not found"), 404 + data = request.get_json() or {} + if "url" in data: + url = (data.get("url") or "").strip() + if not url or not (url.startswith("http://") or url.startswith("https://")): + return jsonify(error="valid url required"), 400 + hook.url = url + if "events" in data: + events = _normalize_events(data.get("events")) + if events is None: + return jsonify(error="invalid events"), 400 + hook.events = events + if "active" in data: + hook.active = bool(data.get("active")) + db.session.commit() + return jsonify(_hook_to_dict(hook)) + + +@bp.delete("/") +@jwt_required() +def delete_webhook(webhook_id: int): + uid = int(get_jwt_identity()) + hook = db.session.get(Webhook, webhook_id) + if not hook or hook.user_id != uid: + return jsonify(error="not found"), 404 + db.session.delete(hook) + db.session.commit() + return jsonify(message="deleted") + + +@bp.get("/events") +def list_event_types(): + return jsonify(events=list(webhook_service.EVENT_TYPES)) + + +@bp.get("//deliveries") +@jwt_required() +def list_deliveries(webhook_id: int): + uid = int(get_jwt_identity()) + hook = db.session.get(Webhook, webhook_id) + if not hook or hook.user_id != uid: + return jsonify(error="not found"), 404 + items = ( + db.session.query(WebhookDelivery) + .filter_by(webhook_id=hook.id) + .order_by(WebhookDelivery.created_at.desc()) + .limit(100) + .all() + ) + return jsonify([_delivery_to_dict(d) for d in items]) + + +@bp.post("/run") +@jwt_required() +def run_pending(): + """Process pending deliveries whose retry window has elapsed.""" + result = webhook_service.process_pending() + return jsonify(result) diff --git a/packages/backend/app/services/webhooks.py b/packages/backend/app/services/webhooks.py new file mode 100644 index 000000000..e7dc408e4 --- /dev/null +++ b/packages/backend/app/services/webhooks.py @@ -0,0 +1,197 @@ +"""Webhook event system: signed delivery, retry, failure handling. + +Events emitted by the backend (see docs/webhooks.md): + expense.created, expense.updated, expense.deleted, + bill.created, bill.paid, + reminder.created. + +Subscribers register a target URL plus a secret. Each delivery is signed +with HMAC-SHA256 over the raw JSON body and sent in the +``X-FinMind-Signature`` header (``sha256=``). Failed attempts are +retried with exponential backoff up to ``MAX_ATTEMPTS`` and then marked +``FAILED``. +""" + +from __future__ import annotations + +import hashlib +import hmac +import json +import logging +import secrets +import uuid +from datetime import datetime, timedelta +from typing import Any, Iterable + +import requests + +from ..extensions import db +from ..models import Webhook, WebhookDelivery, WebhookDeliveryStatus + +logger = logging.getLogger("finmind.webhooks") + +EVENT_TYPES: tuple[str, ...] = ( + "expense.created", + "expense.updated", + "expense.deleted", + "bill.created", + "bill.paid", + "reminder.created", +) + +MAX_ATTEMPTS = 5 +REQUEST_TIMEOUT_SECONDS = 5 +SIGNATURE_HEADER = "X-FinMind-Signature" +EVENT_HEADER = "X-FinMind-Event" +DELIVERY_HEADER = "X-FinMind-Delivery" +TIMESTAMP_HEADER = "X-FinMind-Timestamp" + + +def generate_secret() -> str: + return secrets.token_hex(32) + + +def sign_payload(secret: str, body: bytes) -> str: + digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest() + return f"sha256={digest}" + + +def verify_signature(secret: str, body: bytes, signature: str) -> bool: + return hmac.compare_digest(sign_payload(secret, body), signature or "") + + +def _matches(subscription: str, event_type: str) -> bool: + subs = [s.strip() for s in (subscription or "").split(",") if s.strip()] + if not subs or "*" in subs: + return True + if event_type in subs: + return True + namespace = event_type.split(".", 1)[0] + ".*" + return namespace in subs + + +def _backoff(attempts: int) -> timedelta: + # 30s, 1m, 5m, 15m, 30m + schedule = [30, 60, 300, 900, 1800] + idx = min(max(attempts - 1, 0), len(schedule) - 1) + return timedelta(seconds=schedule[idx]) + + +def emit_event(user_id: int, event_type: str, data: dict[str, Any]) -> int: + """Queue webhook deliveries for matching subscribers and try once. + + Returns the number of deliveries created. Failures are silently retried + later via :func:`process_pending`; emission never raises. + """ + try: + hooks: Iterable[Webhook] = ( + db.session.query(Webhook) + .filter_by(user_id=user_id, active=True) + .all() + ) + except Exception: + logger.exception("webhook lookup failed user=%s event=%s", user_id, event_type) + return 0 + + created = 0 + for hook in hooks: + if not _matches(hook.events, event_type): + continue + envelope = { + "id": str(uuid.uuid4()), + "type": event_type, + "created_at": datetime.utcnow().isoformat() + "Z", + "data": data, + } + delivery = WebhookDelivery( + webhook_id=hook.id, + event_type=event_type, + payload=json.dumps(envelope, separators=(",", ":"), default=str), + status=WebhookDeliveryStatus.PENDING.value, + next_attempt_at=datetime.utcnow(), + ) + db.session.add(delivery) + db.session.flush() + _attempt_delivery(hook, delivery) + created += 1 + if created: + db.session.commit() + return created + + +def _attempt_delivery(hook: Webhook, delivery: WebhookDelivery) -> bool: + body = delivery.payload.encode("utf-8") + headers = { + "Content-Type": "application/json", + EVENT_HEADER: delivery.event_type, + DELIVERY_HEADER: str(delivery.id), + TIMESTAMP_HEADER: str(int(datetime.utcnow().timestamp())), + SIGNATURE_HEADER: sign_payload(hook.secret, body), + } + delivery.attempts = (delivery.attempts or 0) + 1 + try: + response = requests.post( + hook.url, data=body, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS + ) + delivery.last_status_code = response.status_code + if 200 <= response.status_code < 300: + delivery.status = WebhookDeliveryStatus.SUCCESS.value + delivery.delivered_at = datetime.utcnow() + delivery.last_error = None + logger.info( + "webhook delivered id=%s url=%s event=%s status=%s", + delivery.id, hook.url, delivery.event_type, response.status_code, + ) + return True + delivery.last_error = f"HTTP {response.status_code}" + except requests.RequestException as exc: + delivery.last_status_code = None + delivery.last_error = str(exc)[:500] + except Exception as exc: # pragma: no cover + delivery.last_status_code = None + delivery.last_error = f"unexpected: {exc}"[:500] + + if delivery.attempts >= MAX_ATTEMPTS: + delivery.status = WebhookDeliveryStatus.FAILED.value + logger.warning( + "webhook permanently failed id=%s url=%s event=%s error=%s", + delivery.id, hook.url, delivery.event_type, delivery.last_error, + ) + else: + delivery.status = WebhookDeliveryStatus.PENDING.value + delivery.next_attempt_at = datetime.utcnow() + _backoff(delivery.attempts) + logger.info( + "webhook retry scheduled id=%s next=%s attempts=%s", + delivery.id, delivery.next_attempt_at, delivery.attempts, + ) + return False + + +def process_pending(limit: int = 50) -> dict[str, int]: + """Retry deliveries whose ``next_attempt_at`` has elapsed.""" + now = datetime.utcnow() + pending = ( + db.session.query(WebhookDelivery) + .filter( + WebhookDelivery.status == WebhookDeliveryStatus.PENDING.value, + WebhookDelivery.next_attempt_at <= now, + ) + .order_by(WebhookDelivery.next_attempt_at) + .limit(limit) + .all() + ) + delivered = 0 + failed = 0 + for delivery in pending: + hook = db.session.get(Webhook, delivery.webhook_id) + if not hook or not hook.active: + delivery.status = WebhookDeliveryStatus.FAILED.value + delivery.last_error = "webhook removed or inactive" + failed += 1 + continue + if _attempt_delivery(hook, delivery): + delivered += 1 + elif delivery.status == WebhookDeliveryStatus.FAILED.value: + failed += 1 + db.session.commit() + return {"processed": len(pending), "delivered": delivered, "failed": failed} diff --git a/packages/backend/docs/webhooks.md b/packages/backend/docs/webhooks.md new file mode 100644 index 000000000..6b77211bd --- /dev/null +++ b/packages/backend/docs/webhooks.md @@ -0,0 +1,108 @@ +# Webhook Event System + +FinMind can notify external systems whenever key events happen in a user's +account (e.g. expense created, bill paid). Each delivery is signed with the +subscriber's secret so receivers can verify authenticity. + +## Managing webhooks + +All endpoints below require a JWT access token (`Authorization: Bearer ...`). + +| Method | Path | Description | +| ------ | --------------------------------------- | ---------------------------------------------------- | +| GET | `/webhooks` | List the caller's webhooks. | +| POST | `/webhooks` | Register a webhook. Returns the secret once. | +| PATCH | `/webhooks/{id}` | Update url / events / active flag. | +| DELETE | `/webhooks/{id}` | Remove a webhook (cascades its delivery history). | +| GET | `/webhooks/events` | Public list of supported event types. | +| GET | `/webhooks/{id}/deliveries` | Last 100 delivery attempts for a webhook. | +| POST | `/webhooks/run` | Process pending retries whose backoff has elapsed. | + +### Create example + +```http +POST /webhooks +Content-Type: application/json +Authorization: Bearer + +{ + "url": "https://example.com/finmind-hook", + "events": ["expense.created", "bill.paid"] +} +``` + +Response (`201`): + +```json +{ + "id": 12, + "url": "https://example.com/finmind-hook", + "events": ["expense.created", "bill.paid"], + "active": true, + "secret": "8d1c... (store this; it is not returned again)", + "created_at": "2025-01-01T00:00:00" +} +``` + +`events` accepts either a list or a comma-separated string. Use `"*"` (the +default) to receive every event, or a `namespace.*` shorthand such as +`"expense.*"`. + +## Event types + +| Event | When | +| ------------------ | ------------------------------------------------- | +| `expense.created` | A new expense is created (manual or import). | +| `expense.updated` | An expense is modified. | +| `expense.deleted` | An expense is deleted. | +| `bill.created` | A new bill is created. | +| `bill.paid` | A bill is marked paid (next due date advanced). | +| `reminder.created` | A reminder is created. | + +## Delivery format + +Each delivery is an HTTP `POST` with a JSON body: + +```json +{ + "id": "f7b6...-uuid", + "type": "expense.created", + "created_at": "2025-01-01T12:00:00Z", + "data": { "id": 99, "amount": 12.50, "currency": "USD", "...": "..." } +} +``` + +Headers: + +- `Content-Type: application/json` +- `X-FinMind-Event` — event type (e.g. `expense.created`) +- `X-FinMind-Delivery` — unique delivery id +- `X-FinMind-Timestamp` — Unix seconds when the request was sent +- `X-FinMind-Signature` — `sha256=` HMAC-SHA256 of the raw body keyed + with the webhook secret + +### Verifying signatures (Python) + +```python +import hmac, hashlib + +def verify(secret: str, body: bytes, header: str) -> bool: + digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() + return hmac.compare_digest(f"sha256={digest}", header or "") +``` + +Receivers should respond with a `2xx` status within 5 seconds. Any other +response (or a network error) is treated as a failure and retried. + +## Retry & failure handling + +- Each delivery is attempted up to **5 times**. +- Backoff schedule between attempts: **30s, 1m, 5m, 15m, 30m**. +- Retries fire when an authenticated client (or a scheduled job) calls + `POST /webhooks/run`; that endpoint picks up any delivery whose + `next_attempt_at` has elapsed. +- After the final failed attempt the delivery is marked `FAILED` and is + visible via `GET /webhooks/{id}/deliveries` along with the last status + code and error. +- Deliveries always carry the same `id`, so receivers can dedupe retries + idempotently. diff --git a/packages/backend/tests/test_webhooks.py b/packages/backend/tests/test_webhooks.py new file mode 100644 index 000000000..20a281427 --- /dev/null +++ b/packages/backend/tests/test_webhooks.py @@ -0,0 +1,239 @@ +import hashlib +import hmac +import json +from datetime import date, datetime, timedelta +from unittest.mock import MagicMock, patch + +import pytest +import requests + +from app.extensions import db +from app.models import Webhook, WebhookDelivery, WebhookDeliveryStatus +from app.services import webhooks as webhook_service + + +@pytest.fixture() +def webhook_url(): + return "https://example.test/hook" + + +def _ok_response(status: int = 200): + response = MagicMock() + response.status_code = status + return response + + +def test_event_types_listed_publicly(client): + r = client.get("/webhooks/events") + assert r.status_code == 200 + events = r.get_json()["events"] + assert "expense.created" in events + assert "bill.paid" in events + + +def test_register_webhook_returns_secret_then_lists_without_it(client, auth_header, webhook_url): + r = client.post( + "/webhooks", + json={"url": webhook_url, "events": ["expense.created"]}, + headers=auth_header, + ) + assert r.status_code == 201 + body = r.get_json() + assert body["secret"] + assert body["events"] == ["expense.created"] + + r = client.get("/webhooks", headers=auth_header) + assert r.status_code == 200 + items = r.get_json() + assert len(items) == 1 + assert "secret" not in items[0] + + +def test_register_rejects_invalid_url_and_events(client, auth_header): + r = client.post( + "/webhooks", json={"url": "ftp://nope"}, headers=auth_header + ) + assert r.status_code == 400 + + r = client.post( + "/webhooks", + json={"url": "https://example.test/hook", "events": ["bogus.event"]}, + headers=auth_header, + ) + assert r.status_code == 400 + + +def test_signature_can_be_verified_by_receiver(client, auth_header, app_fixture, webhook_url): + r = client.post( + "/webhooks", json={"url": webhook_url}, headers=auth_header + ) + secret = r.get_json()["secret"] + + captured: dict = {} + + def fake_post(url, data=None, headers=None, timeout=None): + captured["url"] = url + captured["data"] = data + captured["headers"] = headers + return _ok_response() + + with patch.object(webhook_service.requests, "post", side_effect=fake_post): + r = client.post( + "/expenses", + json={"amount": 10.0, "description": "coffee", "date": date.today().isoformat()}, + headers=auth_header, + ) + assert r.status_code == 201 + + body = captured["data"] + sig = captured["headers"][webhook_service.SIGNATURE_HEADER] + expected = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() + assert sig == expected + assert captured["headers"][webhook_service.EVENT_HEADER] == "expense.created" + envelope = json.loads(body.decode()) + assert envelope["type"] == "expense.created" + assert envelope["data"]["description"] == "coffee" + + with app_fixture.app_context(): + deliveries = db.session.query(WebhookDelivery).all() + assert len(deliveries) == 1 + assert deliveries[0].status == WebhookDeliveryStatus.SUCCESS.value + assert deliveries[0].attempts == 1 + + +def test_failed_delivery_is_retried_then_marked_failed(client, auth_header, app_fixture, webhook_url): + client.post("/webhooks", json={"url": webhook_url}, headers=auth_header) + + # First attempt fails on emission. + with patch.object( + webhook_service.requests, "post", + side_effect=requests.ConnectionError("boom"), + ): + client.post( + "/expenses", + json={"amount": 1.0, "description": "x", "date": date.today().isoformat()}, + headers=auth_header, + ) + + with app_fixture.app_context(): + delivery = db.session.query(WebhookDelivery).one() + assert delivery.status == WebhookDeliveryStatus.PENDING.value + assert delivery.attempts == 1 + assert delivery.last_error + # Force the retry window to elapse so /run picks it up. + delivery.next_attempt_at = datetime.utcnow() - timedelta(seconds=1) + db.session.commit() + delivery_id = delivery.id + + # Drive enough retries to exhaust MAX_ATTEMPTS. + with patch.object( + webhook_service.requests, "post", + side_effect=requests.ConnectionError("still down"), + ): + for _ in range(webhook_service.MAX_ATTEMPTS): + with app_fixture.app_context(): + d = db.session.get(WebhookDelivery, delivery_id) + if d.status != WebhookDeliveryStatus.PENDING.value: + break + d.next_attempt_at = datetime.utcnow() - timedelta(seconds=1) + db.session.commit() + r = client.post("/webhooks/run", headers=auth_header) + assert r.status_code == 200 + + with app_fixture.app_context(): + delivery = db.session.get(WebhookDelivery, delivery_id) + assert delivery.status == WebhookDeliveryStatus.FAILED.value + assert delivery.attempts == webhook_service.MAX_ATTEMPTS + + +def test_retry_eventually_succeeds(client, auth_header, app_fixture, webhook_url): + client.post("/webhooks", json={"url": webhook_url}, headers=auth_header) + + with patch.object( + webhook_service.requests, "post", + side_effect=requests.ConnectionError("temporary"), + ): + client.post( + "/expenses", + json={"amount": 1.0, "description": "x", "date": date.today().isoformat()}, + headers=auth_header, + ) + + with app_fixture.app_context(): + delivery = db.session.query(WebhookDelivery).one() + delivery.next_attempt_at = datetime.utcnow() - timedelta(seconds=1) + db.session.commit() + + with patch.object(webhook_service.requests, "post", return_value=_ok_response()): + r = client.post("/webhooks/run", headers=auth_header) + assert r.status_code == 200 + assert r.get_json()["delivered"] == 1 + + with app_fixture.app_context(): + delivery = db.session.query(WebhookDelivery).one() + assert delivery.status == WebhookDeliveryStatus.SUCCESS.value + + +def test_event_filter_skips_unsubscribed_events(client, auth_header, webhook_url): + client.post( + "/webhooks", + json={"url": webhook_url, "events": ["bill.paid"]}, + headers=auth_header, + ) + with patch.object(webhook_service.requests, "post", return_value=_ok_response()) as mock_post: + r = client.post( + "/expenses", + json={"amount": 5.0, "description": "y", "date": date.today().isoformat()}, + headers=auth_header, + ) + assert r.status_code == 201 + mock_post.assert_not_called() + + +def test_inactive_webhook_does_not_fire(client, auth_header, webhook_url): + r = client.post("/webhooks", json={"url": webhook_url}, headers=auth_header) + hook_id = r.get_json()["id"] + r = client.patch( + f"/webhooks/{hook_id}", json={"active": False}, headers=auth_header + ) + assert r.status_code == 200 + + with patch.object(webhook_service.requests, "post", return_value=_ok_response()) as mock_post: + client.post( + "/expenses", + json={"amount": 5.0, "description": "z", "date": date.today().isoformat()}, + headers=auth_header, + ) + mock_post.assert_not_called() + + +def test_delete_webhook_removes_it(client, auth_header, webhook_url): + r = client.post("/webhooks", json={"url": webhook_url}, headers=auth_header) + hook_id = r.get_json()["id"] + r = client.delete(f"/webhooks/{hook_id}", headers=auth_header) + assert r.status_code == 200 + r = client.get("/webhooks", headers=auth_header) + assert r.get_json() == [] + + +def test_user_isolation(client, auth_header, app_fixture, webhook_url): + # User A creates a webhook + client.post("/webhooks", json={"url": webhook_url}, headers=auth_header) + + # Register a second user + email, password = "other@example.com", "password123" + r = client.post("/auth/register", json={"email": email, "password": password}) + assert r.status_code in (200, 201) + r = client.post("/auth/login", json={"email": email, "password": password}) + other_header = {"Authorization": f"Bearer {r.get_json()['access_token']}"} + + r = client.get("/webhooks", headers=other_header) + assert r.get_json() == [] + + +def test_verify_signature_helper(): + body = b'{"hello":"world"}' + secret = "topsecret" + sig = webhook_service.sign_payload(secret, body) + assert webhook_service.verify_signature(secret, body, sig) is True + assert webhook_service.verify_signature(secret, body, "sha256=bad") is False