From e9224065a94c6683506e1b4faae282a1da5aad66 Mon Sep 17 00:00:00 2001 From: reybahl Date: Tue, 10 Mar 2026 22:08:51 -0400 Subject: [PATCH 1/2] fix: use schema-preserving empty dataframes in changelog junction lookups --- ferry/database/generate_changelog.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/ferry/database/generate_changelog.py b/ferry/database/generate_changelog.py index 5fda40e0f..4172b1c90 100644 --- a/ferry/database/generate_changelog.py +++ b/ferry/database/generate_changelog.py @@ -254,12 +254,13 @@ def register_junction_change( junction_old_grouped: dict[int, pd.DataFrame], junction_new_grouped: dict[int, pd.DataFrame], junction_name: str, + empty_old: pd.DataFrame, + empty_new: pd.DataFrame, ): if course_id not in course_id_to_changes: course_id_to_changes[course_id] = {} - # Use pre-grouped data instead of filtering - old_group = junction_old_grouped.get(course_id, pd.DataFrame()) - new_group = junction_new_grouped.get(course_id, pd.DataFrame()) + old_group = junction_old_grouped.get(course_id, empty_old) + new_group = junction_new_grouped.get(course_id, empty_new) course_id_to_changes[course_id][junction_name] = (old_group, new_group) @@ -285,6 +286,10 @@ def register_junction_changes( for course_id, group in junction_new.groupby("course_id") } + # Schema-preserving empty DataFrames for missing course_id lookups + empty_old = junction_old.iloc[:0] + empty_new = junction_new.iloc[:0] + # Pre-index courses for O(1) lookups instead of O(n) filtering courses_old_indexed = set(tables_old["courses"]["course_id"].values) courses_new_indexed = set(tables_new["courses"]["course_id"].values) @@ -295,7 +300,7 @@ def register_junction_changes( continue register_junction_change( course_id_to_changes, course_id, junction_old_grouped, - junction_new_grouped, junction_name + junction_new_grouped, junction_name, empty_old, empty_new ) for course_id in junction_diff["deleted_rows"]["course_id"].values: # Course itself was 
removed @@ -303,7 +308,7 @@ def register_junction_changes( continue register_junction_change( course_id_to_changes, course_id, junction_old_grouped, - junction_new_grouped, junction_name + junction_new_grouped, junction_name, empty_old, empty_new ) for row in junction_diff["changed_rows"].itertuples(): course_id = row.course_id @@ -314,7 +319,7 @@ def register_junction_changes( continue register_junction_change( course_id_to_changes, course_id, junction_old_grouped, - junction_new_grouped, junction_name + junction_new_grouped, junction_name, empty_old, empty_new ) From c4b4f5a14b50684716ba291258a9d9182e459444 Mon Sep 17 00:00:00 2001 From: reybahl Date: Tue, 10 Mar 2026 22:50:23 -0400 Subject: [PATCH 2/2] feat: diff optimizations --- ferry/database/sync_db_courses.py | 62 ++++++++++++++++++------------- 1 file changed, 36 insertions(+), 26 deletions(-) diff --git a/ferry/database/sync_db_courses.py b/ferry/database/sync_db_courses.py index 9411a6e83..f407ebb4f 100644 --- a/ferry/database/sync_db_courses.py +++ b/ferry/database/sync_db_courses.py @@ -73,15 +73,20 @@ def generate_diff( old_df = old_df.set_index(pk) new_df = new_df.set_index(pk) - deleted_rows = old_df[~old_df.index.isin(new_df.index)] - added_rows = new_df[~new_df.index.isin(old_df.index)] + # Use set for O(1) index lookups instead of O(n) scan + new_index_set = set(new_df.index) + old_index_set = set(old_df.index) + + deleted_rows = old_df[~old_df.index.isin(new_index_set)] + added_rows = new_df[~new_df.index.isin(old_index_set)] # Must sort index in order to compare dataframes cell-wise + shared_index = old_index_set & new_index_set shared_rows_old = ( - old_df[old_df.index.isin(new_df.index)].sort_index().sort_index(axis=1) + old_df[old_df.index.isin(shared_index)].sort_index().sort_index(axis=1) ) shared_rows_new = ( - new_df[new_df.index.isin(old_df.index)].sort_index().sort_index(axis=1) + new_df[new_df.index.isin(shared_index)].sort_index().sort_index(axis=1) ) if (shared_rows_old.index != 
shared_rows_new.index).any(): print(shared_rows_old.index) @@ -96,32 +101,37 @@ def generate_diff( raise ValueError( f"Column mismatch in table {table_name}. Run with --rewrite once to fix." ) - # Do not allow type changes unless one of them is NA - old_types = shared_rows_old.map(type) - new_types = shared_rows_new.map(type) - different_types = ~( - (old_types == new_types) | shared_rows_old.isna() | shared_rows_new.isna() + + # Hash-based change detection: only do O(rows*cols) cell comparison for rows + # where hashes differ. When most rows are unchanged (typical for incremental + # sync), this reduces complexity from O(n*m) to O(n) + O(changed*cols). + old_hashes = pd.util.hash_pandas_object(shared_rows_old, index=False) + new_hashes = pd.util.hash_pandas_object(shared_rows_new, index=False) + unchanged_mask = (old_hashes == new_hashes).reindex(shared_rows_old.index) + + unequal_mask = pd.DataFrame( + False, index=shared_rows_old.index, columns=shared_rows_old.columns ) - if different_types.any().any(): - row, col = list(zip(*different_types.values.nonzero()))[0] - print( - f"Type mismatch in {table_name} at ({row}, {col}) (column {shared_rows_old.columns[col]})" + possibly_changed = ~unchanged_mask + if possibly_changed.any(): + # Only compare cells for rows that might have changed (hash differed) + subset_old = shared_rows_old.loc[possibly_changed] + subset_new = shared_rows_new.loc[possibly_changed] + subset_unequal = ~( + (subset_old == subset_new) + | (subset_old.isna() & subset_new.isna()) ) - print(f"Old type: {old_types.iat[row, col]}") - print(f"New type: {new_types.iat[row, col]}") - print(f"Old value: {shared_rows_old.iat[row, col]}") - print(f"New value: {shared_rows_new.iat[row, col]}") - raise TypeError("Type mismatch") - unequal_mask = ~( - (shared_rows_old == shared_rows_new) - | (shared_rows_old.isna() & shared_rows_new.isna()) - ) + unequal_mask.loc[possibly_changed] = subset_unequal - changed_rows = 
shared_rows_new[unequal_mask.any(axis=1)].copy() + changed_row_mask = unequal_mask.any(axis=1) + changed_rows = shared_rows_new[changed_row_mask].copy() if len(changed_rows) > 0: - changed_rows["columns_changed"] = unequal_mask[ - unequal_mask.any(axis=1) - ].apply(lambda row: shared_rows_new.columns[row].tolist(), axis=1) + # Avoid slow DataFrame.apply(axis=1): index the numpy boolean array row by row + cols = shared_rows_new.columns + unequal_arr = unequal_mask.loc[changed_row_mask].values + changed_rows["columns_changed"] = [ + cols[unequal_arr[i]].tolist() for i in range(len(changed_rows)) + ] else: changed_rows["columns_changed"] = pd.Series(dtype=object)