Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions packages/backend/app/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,30 @@ CREATE TABLE IF NOT EXISTS audit_logs (
action VARCHAR(100) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS webhooks (
id SERIAL PRIMARY KEY,
user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
url VARCHAR(500) NOT NULL,
secret VARCHAR(128) NOT NULL,
events VARCHAR(1000) NOT NULL DEFAULT '*',
active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_webhooks_user ON webhooks(user_id, active);

CREATE TABLE IF NOT EXISTS webhook_deliveries (
id SERIAL PRIMARY KEY,
webhook_id INT NOT NULL REFERENCES webhooks(id) ON DELETE CASCADE,
event_type VARCHAR(100) NOT NULL,
payload TEXT NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
attempts INT NOT NULL DEFAULT 0,
last_status_code INT,
last_error VARCHAR(500),
next_attempt_at TIMESTAMP NOT NULL DEFAULT NOW(),
delivered_at TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_status_next
ON webhook_deliveries(status, next_attempt_at);
36 changes: 36 additions & 0 deletions packages/backend/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,39 @@ class AuditLog(db.Model):
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True)
action = db.Column(db.String(100), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)


class Webhook(db.Model):
__tablename__ = "webhooks"
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
url = db.Column(db.String(500), nullable=False)
secret = db.Column(db.String(128), nullable=False)
events = db.Column(db.String(1000), nullable=False, default="*")
active = db.Column(db.Boolean, default=True, nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)


class WebhookDeliveryStatus(str, Enum):
PENDING = "PENDING"
SUCCESS = "SUCCESS"
FAILED = "FAILED"


class WebhookDelivery(db.Model):
__tablename__ = "webhook_deliveries"
id = db.Column(db.Integer, primary_key=True)
webhook_id = db.Column(
db.Integer, db.ForeignKey("webhooks.id", ondelete="CASCADE"), nullable=False
)
event_type = db.Column(db.String(100), nullable=False)
payload = db.Column(db.Text, nullable=False)
status = db.Column(
db.String(20), default=WebhookDeliveryStatus.PENDING.value, nullable=False
)
attempts = db.Column(db.Integer, default=0, nullable=False)
last_status_code = db.Column(db.Integer, nullable=True)
last_error = db.Column(db.String(500), nullable=True)
next_attempt_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
delivered_at = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
2 changes: 2 additions & 0 deletions packages/backend/app/routes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .categories import bp as categories_bp
from .docs import bp as docs_bp
from .dashboard import bp as dashboard_bp
from .webhooks import bp as webhooks_bp


def register_routes(app: Flask):
Expand All @@ -18,3 +19,4 @@ def register_routes(app: Flask):
app.register_blueprint(categories_bp, url_prefix="/categories")
app.register_blueprint(docs_bp, url_prefix="/docs")
app.register_blueprint(dashboard_bp, url_prefix="/dashboard")
app.register_blueprint(webhooks_bp, url_prefix="/webhooks")
25 changes: 25 additions & 0 deletions packages/backend/app/routes/bills.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from ..extensions import db
from ..models import Bill, BillCadence, User
from ..services.cache import cache_delete_patterns
from ..services.webhooks import emit_event
import logging

bp = Blueprint("bills", __name__)
Expand Down Expand Up @@ -62,6 +63,18 @@ def create_bill():
cache_delete_patterns(
[f"user:{uid}:upcoming_bills*", f"user:{uid}:dashboard_summary:*"]
)
emit_event(
uid,
"bill.created",
{
"id": b.id,
"name": b.name,
"amount": float(b.amount),
"currency": b.currency,
"next_due_date": b.next_due_date.isoformat(),
"cadence": b.cadence.value,
},
)
return jsonify(id=b.id), 201


Expand All @@ -88,4 +101,16 @@ def mark_paid(bill_id: int):
logger.info(
"Marked bill paid id=%s user=%s next_due_date=%s", b.id, uid, b.next_due_date
)
emit_event(
uid,
"bill.paid",
{
"id": b.id,
"name": b.name,
"amount": float(b.amount),
"currency": b.currency,
"next_due_date": b.next_due_date.isoformat(),
"active": b.active,
},
)
return jsonify(message="updated")
5 changes: 5 additions & 0 deletions packages/backend/app/routes/expenses.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from ..models import Expense, RecurringCadence, RecurringExpense, User
from ..services.cache import cache_delete_patterns, monthly_summary_key
from ..services import expense_import
from ..services.webhooks import emit_event
import logging

bp = Blueprint("expenses", __name__)
Expand Down Expand Up @@ -84,6 +85,7 @@ def create_expense():
f"insights:{uid}:*",
]
)
emit_event(uid, "expense.created", _expense_to_dict(e))
return jsonify(_expense_to_dict(e)), 201


Expand Down Expand Up @@ -231,6 +233,7 @@ def update_expense(expense_id: int):
e.spent_at = date.fromisoformat(raw_date)
db.session.commit()
_invalidate_expense_cache(uid, e.spent_at.isoformat())
emit_event(uid, "expense.updated", _expense_to_dict(e))
return jsonify(_expense_to_dict(e))


Expand All @@ -242,9 +245,11 @@ def delete_expense(expense_id: int):
if not e or e.user_id != uid:
return jsonify(error="not found"), 404
spent_at = e.spent_at.isoformat()
deleted_payload = {"id": e.id, "date": spent_at}
db.session.delete(e)
db.session.commit()
_invalidate_expense_cache(uid, spent_at)
emit_event(uid, "expense.deleted", deleted_payload)
return jsonify(message="deleted")


Expand Down
11 changes: 11 additions & 0 deletions packages/backend/app/routes/reminders.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from ..models import Bill, Reminder
from ..observability import track_reminder_event
from ..services.reminders import send_reminder
from ..services.webhooks import emit_event
import logging

bp = Blueprint("reminders", __name__)
Expand Down Expand Up @@ -51,6 +52,16 @@ def create_reminder():
db.session.commit()
logger.info("Created reminder id=%s user=%s", r.id, uid)
track_reminder_event(event="created", channel=r.channel)
emit_event(
uid,
"reminder.created",
{
"id": r.id,
"message": r.message,
"send_at": r.send_at.isoformat(),
"channel": r.channel,
},
)
return jsonify(id=r.id), 201


Expand Down
160 changes: 160 additions & 0 deletions packages/backend/app/routes/webhooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
from flask import Blueprint, jsonify, request
from flask_jwt_extended import jwt_required, get_jwt_identity

from ..extensions import db
from ..models import Webhook, WebhookDelivery
from ..services import webhooks as webhook_service

bp = Blueprint("webhooks", __name__)


def _hook_to_dict(hook: Webhook, include_secret: bool = False) -> dict:
out = {
"id": hook.id,
"url": hook.url,
"events": [e.strip() for e in hook.events.split(",") if e.strip()],
"active": hook.active,
"created_at": hook.created_at.isoformat(),
}
if include_secret:
out["secret"] = hook.secret
return out


def _delivery_to_dict(d: WebhookDelivery) -> dict:
return {
"id": d.id,
"webhook_id": d.webhook_id,
"event_type": d.event_type,
"status": d.status,
"attempts": d.attempts,
"last_status_code": d.last_status_code,
"last_error": d.last_error,
"next_attempt_at": d.next_attempt_at.isoformat() if d.next_attempt_at else None,
"delivered_at": d.delivered_at.isoformat() if d.delivered_at else None,
"created_at": d.created_at.isoformat(),
}


def _normalize_events(raw) -> str | None:
if raw is None:
return "*"
if isinstance(raw, str):
items = [s.strip() for s in raw.split(",") if s.strip()]
elif isinstance(raw, list):
items = [str(s).strip() for s in raw if str(s).strip()]
else:
return None
if not items:
return "*"
for item in items:
if item == "*" or item.endswith(".*"):
continue
if item not in webhook_service.EVENT_TYPES:
return None
return ",".join(items)


@bp.get("")
@jwt_required()
def list_webhooks():
uid = int(get_jwt_identity())
items = (
db.session.query(Webhook)
.filter_by(user_id=uid)
.order_by(Webhook.created_at.desc())
.all()
)
return jsonify([_hook_to_dict(h) for h in items])


@bp.post("")
@jwt_required()
def create_webhook():
uid = int(get_jwt_identity())
data = request.get_json() or {}
url = (data.get("url") or "").strip()
if not url or not (url.startswith("http://") or url.startswith("https://")):
return jsonify(error="valid url required"), 400
events = _normalize_events(data.get("events"))
if events is None:
return jsonify(error="invalid events; see /webhooks/events"), 400
secret = (data.get("secret") or "").strip() or webhook_service.generate_secret()
if len(secret) > 128:
return jsonify(error="secret too long"), 400
hook = Webhook(
user_id=uid,
url=url,
secret=secret,
events=events,
active=bool(data.get("active", True)),
)
db.session.add(hook)
db.session.commit()
return jsonify(_hook_to_dict(hook, include_secret=True)), 201


@bp.patch("/<int:webhook_id>")
@jwt_required()
def update_webhook(webhook_id: int):
uid = int(get_jwt_identity())
hook = db.session.get(Webhook, webhook_id)
if not hook or hook.user_id != uid:
return jsonify(error="not found"), 404
data = request.get_json() or {}
if "url" in data:
url = (data.get("url") or "").strip()
if not url or not (url.startswith("http://") or url.startswith("https://")):
return jsonify(error="valid url required"), 400
hook.url = url
if "events" in data:
events = _normalize_events(data.get("events"))
if events is None:
return jsonify(error="invalid events"), 400
hook.events = events
if "active" in data:
hook.active = bool(data.get("active"))
db.session.commit()
return jsonify(_hook_to_dict(hook))


@bp.delete("/<int:webhook_id>")
@jwt_required()
def delete_webhook(webhook_id: int):
uid = int(get_jwt_identity())
hook = db.session.get(Webhook, webhook_id)
if not hook or hook.user_id != uid:
return jsonify(error="not found"), 404
db.session.delete(hook)
db.session.commit()
return jsonify(message="deleted")


@bp.get("/events")
def list_event_types():
return jsonify(events=list(webhook_service.EVENT_TYPES))


@bp.get("/<int:webhook_id>/deliveries")
@jwt_required()
def list_deliveries(webhook_id: int):
uid = int(get_jwt_identity())
hook = db.session.get(Webhook, webhook_id)
if not hook or hook.user_id != uid:
return jsonify(error="not found"), 404
items = (
db.session.query(WebhookDelivery)
.filter_by(webhook_id=hook.id)
.order_by(WebhookDelivery.created_at.desc())
.limit(100)
.all()
)
return jsonify([_delivery_to_dict(d) for d in items])


@bp.post("/run")
@jwt_required()
def run_pending():
"""Process pending deliveries whose retry window has elapsed."""
result = webhook_service.process_pending()
return jsonify(result)
Loading