diff --git a/ferry/database/generate_changelog.py b/ferry/database/generate_changelog.py
index 4172b1c90..dfe79d19c 100644
--- a/ferry/database/generate_changelog.py
+++ b/ferry/database/generate_changelog.py
@@ -397,6 +397,14 @@ def print_courses_diff(
     # Process changed courses with indexed lookups
     course_updates = ""
     course_id_to_changes: dict[int, dict[str, tuple[Any, Any]]] = {}
+    for _, course in diff["courses"]["added_rows"].iterrows():
+        course_id = cast(int, course["course_id"])
+        course_id_to_changes[course_id] = {}
+        for column in courses_new_indexed.columns:
+            if column in computed_columns["courses"]:
+                continue
+            new_val = courses_new_indexed.loc[course_id, column]
+            course_id_to_changes[course_id][column] = (None, new_val)
     for _, course in diff["courses"]["changed_rows"].iterrows():
         course_id = cast(int, course["course_id"])
         if course_id not in course_id_to_changes:
diff --git a/ferry/database/models.py b/ferry/database/models.py
index b35c0baa4..417eace12 100644
--- a/ferry/database/models.py
+++ b/ferry/database/models.py
@@ -305,6 +305,12 @@ class Course(BaseModel):
         comment="[computed] Whether last enrollment offering is with same professor as current.",
     )
 
+    last_sync_diff = Column(
+        JSONB,
+        comment="Last sync: non-computed courses columns as {field: {old, new}}",
+        nullable=True,
+    )
+
 
 class Listing(BaseModel):
     """
diff --git a/ferry/database/sync_db_courses.py b/ferry/database/sync_db_courses.py
index 78ec66c11..c07808672 100644
--- a/ferry/database/sync_db_courses.py
+++ b/ferry/database/sync_db_courses.py
@@ -5,7 +5,7 @@
 import logging
 from typing import Any, Dict, Union
 
-from sqlalchemy import MetaData, text, inspect, Connection
+from sqlalchemy import MetaData, text, inspect, Connection, bindparam
 from psycopg2.extensions import register_adapter, AsIs
 
 from ferry.database import Database
@@ -39,6 +39,58 @@ def safe_isna(value) -> bool:
 }
 
 
+def _jsonable_scalar(v: Any) -> Any:
+    if v is None:
+        return None
+    if isinstance(v, (np.integer, np.int64, np.int32)):
+        return int(v)
+    if isinstance(v, (np.floating, np.float64, np.float32)):
+        if np.isnan(v):
+            return None
+        return float(v)
+    if isinstance(v, np.bool_):
+        return bool(v)
+    if isinstance(v, pd.Timestamp):
+        return v.isoformat()
+    if isinstance(v, (bytes, bytearray)):
+        return v.decode("utf-8", errors="replace")
+    if isinstance(v, np.ndarray):
+        return _jsonable_scalar(v.tolist())
+    if isinstance(v, (list, tuple)):
+        return [_jsonable_scalar(x) for x in v]
+    if isinstance(v, dict):
+        return {str(k): _jsonable_scalar(x) for k, x in v.items()}
+    try:
+        if pd.isna(v):
+            return None
+    except (TypeError, ValueError):
+        pass
+    return v
+
+
+def course_scalar_last_sync_diffs(
+    diff: dict[str, DiffRecord],
+    tables_old: dict[str, pd.DataFrame],
+    tables: dict[str, pd.DataFrame],
+) -> dict[int, dict[str, dict[str, Any]]]:
+    co = tables_old["courses"].set_index("course_id")
+    cn = tables["courses"].set_index("course_id")
+    out: dict[int, dict[str, dict[str, Any]]] = {}
+    for _, row in diff["courses"]["changed_rows"].iterrows():
+        cid = int(row["course_id"])
+        fields: dict[str, dict[str, Any]] = {}
+        for col in row["columns_changed"]:
+            if col in computed_columns["courses"]:
+                continue
+            fields[col] = {
+                "old": _jsonable_scalar(co.loc[cid, col]),
+                "new": _jsonable_scalar(cn.loc[cid, col]),
+            }
+        if fields:
+            out[cid] = fields
+    return out
+
+
 def get_tables_from_db(database_connect_string: str) -> dict[str, pd.DataFrame]:
     db = Database(database_connect_string)
     db_meta = MetaData()
@@ -47,7 +99,8 @@ def get_tables_from_db(database_connect_string: str) -> dict[str, pd.DataFrame]:
 
     return {
         table_name: pd.read_sql_table(table_name, con=conn).drop(
-            columns=["time_added", "last_updated"], errors="ignore"
+            columns=["time_added", "last_updated", "last_sync_diff"],
+            errors="ignore",
         )
         for table_name in primary_keys.keys()
     }
@@ -701,6 +754,8 @@
 
     diff = generate_diff(tables_old_for_diff, tables_for_diff)
 
+    course_sync_diff_by_id = course_scalar_last_sync_diffs(diff, tables_old, tables)
+
     print_diff(diff, tables_old, tables, data_dir / "change_log")
 
     inspector = inspect(db.Engine)
@@ -726,6 +781,10 @@
             )
         )
 
+        course_col_names = {c["name"] for c in inspector.get_columns("courses")}
+        if "last_sync_diff" not in course_col_names:
+            conn.execute(text("ALTER TABLE courses ADD COLUMN last_sync_diff JSONB"))
+
         # Process tables in dependency order (buildings before locations)
         location_mapping = {}
         for table_name in tables_order_add:
@@ -812,6 +871,31 @@
             commit_deletions(table_name, deleted_rows, conn)
             print("\033[F", end="")
 
+        changed_course_ids = list(course_sync_diff_by_id.keys())
+        if changed_course_ids:
+            clear_stmt = text(
+                "UPDATE courses SET last_sync_diff = NULL WHERE last_sync_diff IS NOT NULL AND course_id NOT IN :ids"
+            ).bindparams(bindparam("ids", expanding=True))
+            conn.execute(clear_stmt, {"ids": changed_course_ids})
+        else:
+            conn.execute(
+                text(
+                    "UPDATE courses SET last_sync_diff = NULL WHERE last_sync_diff IS NOT NULL"
+                )
+            )
+
+        upd = text(
+            "UPDATE courses SET last_sync_diff = CAST(:payload AS jsonb) WHERE course_id = :course_id"
+        )
+        for course_id, payload in course_sync_diff_by_id.items():
+            conn.execute(
+                upd,
+                {
+                    "course_id": int(course_id),
+                    "payload": ujson.dumps(payload),
+                },
+            )
+
     # Print row counts for each table.
     print("\n[Table Statistics]")
     with db.Engine.begin() as conn: