Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions packages/backend/app/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,17 @@ CREATE TABLE IF NOT EXISTS audit_logs (
action VARCHAR(100) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS bank_accounts (
id SERIAL PRIMARY KEY,
user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
provider VARCHAR(50) NOT NULL,
external_id VARCHAR(255) NOT NULL,
name VARCHAR(200) NOT NULL,
account_type VARCHAR(50),
currency VARCHAR(10) NOT NULL DEFAULT 'USD',
last_synced_at TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
UNIQUE(user_id, provider, external_id)
);
CREATE INDEX IF NOT EXISTS idx_bank_accounts_user ON bank_accounts(user_id);
13 changes: 13 additions & 0 deletions packages/backend/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,16 @@ class AuditLog(db.Model):
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True)
action = db.Column(db.String(100), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)


class BankAccount(db.Model):
__tablename__ = "bank_accounts"
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
provider = db.Column(db.String(50), nullable=False)
external_id = db.Column(db.String(255), nullable=False)
name = db.Column(db.String(200), nullable=False)
account_type = db.Column(db.String(50), nullable=True)
currency = db.Column(db.String(10), default="USD", nullable=False)
last_synced_at = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
2 changes: 2 additions & 0 deletions packages/backend/app/routes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .categories import bp as categories_bp
from .docs import bp as docs_bp
from .dashboard import bp as dashboard_bp
from .bank_sync import bp as bank_sync_bp


def register_routes(app: Flask):
Expand All @@ -18,3 +19,4 @@ def register_routes(app: Flask):
app.register_blueprint(categories_bp, url_prefix="/categories")
app.register_blueprint(docs_bp, url_prefix="/docs")
app.register_blueprint(dashboard_bp, url_prefix="/dashboard")
app.register_blueprint(bank_sync_bp, url_prefix="/bank-sync")
210 changes: 210 additions & 0 deletions packages/backend/app/routes/bank_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import logging
from datetime import date, datetime
from decimal import Decimal

from flask import Blueprint, jsonify, request
from flask_jwt_extended import get_jwt_identity, jwt_required

from ..extensions import db
from ..models import BankAccount, Expense, User
from ..services import bank_sync
from ..services.cache import cache_delete_patterns, monthly_summary_key

bp = Blueprint("bank_sync", __name__)
logger = logging.getLogger("finmind.bank_sync")


@bp.get("/connectors")
@jwt_required()
def list_connectors():
return jsonify(bank_sync.list_connectors())


@bp.get("/accounts")
@jwt_required()
def list_accounts():
uid = int(get_jwt_identity())
items = (
db.session.query(BankAccount)
.filter_by(user_id=uid)
.order_by(BankAccount.created_at.desc())
.all()
)
return jsonify([_account_to_dict(a) for a in items])


@bp.post("/accounts")
@jwt_required()
def link_account():
uid = int(get_jwt_identity())
data = request.get_json() or {}
provider = (data.get("provider") or "").strip().lower()
credentials = data.get("credentials") or {}
if not provider:
return jsonify(error="provider required"), 400
try:
connector = bank_sync.get_connector(provider)
except KeyError:
return jsonify(error=f"unknown provider: {provider}"), 404
try:
accounts = connector.connect(credentials)
except bank_sync.ConnectorError as exc:
return jsonify(error=str(exc)), 400

requested_id = data.get("external_id")
if requested_id:
accounts = [a for a in accounts if a.external_id == requested_id]
if not accounts:
return jsonify(error="account not available from connector"), 404

created: list[BankAccount] = []
for info in accounts:
existing = (
db.session.query(BankAccount)
.filter_by(user_id=uid, provider=provider, external_id=info.external_id)
.first()
)
if existing:
created.append(existing)
continue
account = BankAccount(
user_id=uid,
provider=provider,
external_id=info.external_id,
name=info.name,
account_type=info.account_type,
currency=info.currency,
)
db.session.add(account)
created.append(account)
db.session.commit()
logger.info(
"Linked bank accounts user=%s provider=%s count=%s",
uid,
provider,
len(created),
)
return jsonify([_account_to_dict(a) for a in created]), 201


@bp.delete("/accounts/<int:account_id>")
@jwt_required()
def unlink_account(account_id: int):
uid = int(get_jwt_identity())
account = db.session.get(BankAccount, account_id)
if not account or account.user_id != uid:
return jsonify(error="not found"), 404
db.session.delete(account)
db.session.commit()
return jsonify(message="deleted")


@bp.post("/accounts/<int:account_id>/import")
@jwt_required()
def import_account(account_id: int):
return _sync_account(account_id, mode="import")


@bp.post("/accounts/<int:account_id>/refresh")
@jwt_required()
def refresh_account(account_id: int):
return _sync_account(account_id, mode="refresh")


def _sync_account(account_id: int, *, mode: str):
uid = int(get_jwt_identity())
account = db.session.get(BankAccount, account_id)
if not account or account.user_id != uid:
return jsonify(error="not found"), 404
try:
connector = bank_sync.get_connector(account.provider)
except KeyError:
return jsonify(error=f"unknown provider: {account.provider}"), 400

since: date | None = None
if mode == "refresh" and account.last_synced_at:
since = account.last_synced_at.date()

try:
transactions = connector.fetch_transactions(
account.external_id, since=since
)
except bank_sync.ConnectorError as exc:
return jsonify(error=str(exc)), 400

user = db.session.get(User, uid)
fallback_currency = (
account.currency or (user.preferred_currency if user else "USD")
)

inserted = 0
duplicates = 0
touched_months: set[str] = set()
for tx in transactions:
if _is_duplicate_tx(uid, tx):
duplicates += 1
continue
expense = Expense(
user_id=uid,
amount=Decimal(str(tx.amount)).quantize(Decimal("0.01")),
currency=tx.currency or fallback_currency,
expense_type=(tx.expense_type or "EXPENSE").upper(),
notes=tx.description[:500],
spent_at=tx.date,
)
db.session.add(expense)
inserted += 1
touched_months.add(tx.date.strftime("%Y-%m"))

account.last_synced_at = datetime.utcnow()
db.session.commit()
for ym in touched_months:
cache_delete_patterns(
[
monthly_summary_key(uid, ym),
f"insights:{uid}:*",
f"user:{uid}:dashboard_summary:*",
]
)
logger.info(
"Bank sync %s user=%s account=%s inserted=%s duplicates=%s",
mode,
uid,
account_id,
inserted,
duplicates,
)
return jsonify(
mode=mode,
inserted=inserted,
duplicates=duplicates,
last_synced_at=account.last_synced_at.isoformat(),
)


def _is_duplicate_tx(uid: int, tx) -> bool:
amount = Decimal(str(tx.amount)).quantize(Decimal("0.01"))
return (
db.session.query(Expense)
.filter_by(
user_id=uid,
spent_at=tx.date,
amount=amount,
notes=tx.description[:500],
)
.first()
is not None
)


def _account_to_dict(a: BankAccount) -> dict:
return {
"id": a.id,
"provider": a.provider,
"external_id": a.external_id,
"name": a.name,
"account_type": a.account_type,
"currency": a.currency,
"last_synced_at": a.last_synced_at.isoformat() if a.last_synced_at else None,
"created_at": a.created_at.isoformat() if a.created_at else None,
}
41 changes: 41 additions & 0 deletions packages/backend/app/services/bank_sync/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Pluggable bank sync connector architecture.

A connector is a class that implements :class:`BankConnector` and is
registered via :func:`register_connector`. Connectors expose two operations
needed for bank integration:

- ``connect``: validate credentials and return the linkable bank accounts
- ``fetch_transactions``: pull transactions for a linked account, optionally
filtered by ``since`` for incremental refresh

Concrete connectors live in sibling modules (e.g. ``mock.py``). Importing
this package eagerly imports the bundled mock connector so it is always
available.
"""

from .base import (
BankAccountInfo,
BankConnector,
BankTransaction,
ConnectorError,
)
from .registry import (
available_connectors,
get_connector,
list_connectors,
register_connector,
)

# Eagerly import bundled connectors so they self-register.
from . import mock # noqa: F401

__all__ = [
"BankAccountInfo",
"BankConnector",
"BankTransaction",
"ConnectorError",
"available_connectors",
"get_connector",
"list_connectors",
"register_connector",
]
79 changes: 79 additions & 0 deletions packages/backend/app/services/bank_sync/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import date
from typing import Any


class ConnectorError(Exception):
"""Raised when a connector cannot complete an operation."""


@dataclass
class BankAccountInfo:
"""Account metadata returned by a connector during ``connect``."""

external_id: str
name: str
account_type: str | None = None
currency: str = "USD"
metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class BankTransaction:
"""A single transaction returned by ``fetch_transactions``.

``external_id`` should be stable across refreshes so the caller can
deduplicate. ``amount`` is always positive; ``expense_type`` is one of
``"EXPENSE"`` or ``"INCOME"``.
"""

external_id: str
date: date
amount: float
description: str
currency: str = "USD"
expense_type: str = "EXPENSE"

def to_import_row(self) -> dict[str, Any]:
return {
"external_id": self.external_id,
"date": self.date.isoformat(),
"amount": float(self.amount),
"description": self.description,
"currency": self.currency,
"expense_type": self.expense_type,
}


class BankConnector(ABC):
"""Interface every bank connector must implement.

Subclasses must define ``provider_name`` and ``display_name`` and
implement ``connect`` and ``fetch_transactions``.
"""

provider_name: str = ""
display_name: str = ""
required_credentials: tuple[str, ...] = ()

@abstractmethod
def connect(self, credentials: dict[str, Any]) -> list[BankAccountInfo]:
"""Validate credentials and return the accounts available to link."""

@abstractmethod
def fetch_transactions(
self,
external_account_id: str,
*,
since: date | None = None,
) -> list[BankTransaction]:
"""Return transactions for an account.

``since`` is an inclusive lower bound on the transaction date. When
``None`` the connector should return its full available history,
which is what ``import`` uses for the initial pull. ``refresh`` will
pass the account's ``last_synced_at`` date.
"""
Loading