From 95ae45c2707d7a8d5a3025d9d1004f4ea46377c8 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Mon, 6 Apr 2026 00:13:01 -0400 Subject: [PATCH 1/3] Close leaked CPS HDF stores --- .../changed/fix-cps-hdf5-file-handles.md | 1 + policyengine_us_data/datasets/cps/cps.py | 80 +++++++++---------- tests/unit/datasets/test_cps_file_handles.py | 67 ++++++++++++++++ 3 files changed, 108 insertions(+), 40 deletions(-) create mode 100644 changelog.d/changed/fix-cps-hdf5-file-handles.md create mode 100644 tests/unit/datasets/test_cps_file_handles.py diff --git a/changelog.d/changed/fix-cps-hdf5-file-handles.md b/changelog.d/changed/fix-cps-hdf5-file-handles.md new file mode 100644 index 000000000..b508c49ac --- /dev/null +++ b/changelog.d/changed/fix-cps-hdf5-file-handles.md @@ -0,0 +1 @@ +Close raw CPS HDF stores after previous-year income and auto-loan preprocessing so CPS builds do not leave `census_cps_2021.h5` and `census_cps_2022.h5` open at process shutdown. diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index e227760ae..72ef1f903 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -106,13 +106,13 @@ def generate(self): "For future years, use PolicyEngine's uprating at simulation time." ) - raw_data = self.raw_cps(require=True).load() cps = {} ENTITIES = ("person", "tax_unit", "family", "spm_unit", "household") - person, tax_unit, family, spm_unit, household = [ - raw_data[entity] for entity in ENTITIES - ] + with self.raw_cps(require=True).load() as raw_data: + person, tax_unit, family, spm_unit, household = [ + raw_data[entity] for entity in ENTITIES + ] logging.info("Adding ID variables") add_id_variables(cps, person, tax_unit, family, spm_unit, household) @@ -146,7 +146,6 @@ def generate(self): add_auto_loan_interest_and_net_worth(self, cps) logging.info("Added all variables") - raw_data.close() self.save_dataset(cps) logging.info("Adding takeup") add_takeup(self) @@ -911,39 +910,40 @@ def add_previous_year_income(self, cps: h5py.File) -> None: ) return - cps_current_year_data = self.raw_cps(require=True).load() - cps_previous_year_data = self.previous_year_raw_cps(require=True).load() - cps_previous_year = cps_previous_year_data.person.set_index( - cps_previous_year_data.person.PERIDNUM - ) - cps_current_year = cps_current_year_data.person.set_index( - cps_current_year_data.person.PERIDNUM - ) + with self.raw_cps(require=True).load() as cps_current_year_data, self.previous_year_raw_cps( + require=True + ).load() as cps_previous_year_data: + cps_previous_year = cps_previous_year_data.person.set_index( + cps_previous_year_data.person.PERIDNUM + ) + cps_current_year = cps_current_year_data.person.set_index( + cps_current_year_data.person.PERIDNUM + ) - previous_year_data = cps_previous_year[ - ["WSAL_VAL", "SEMP_VAL", "I_ERNVAL", "I_SEVAL"] - ].rename( - { - "WSAL_VAL": "employment_income_last_year", - "SEMP_VAL": "self_employment_income_last_year", - }, - axis=1, - ) + previous_year_data = cps_previous_year[ + ["WSAL_VAL", "SEMP_VAL", "I_ERNVAL", "I_SEVAL"] + ].rename( + { + "WSAL_VAL": "employment_income_last_year", + "SEMP_VAL": "self_employment_income_last_year", + }, + axis=1, + ) - previous_year_data = previous_year_data[ - (previous_year_data.I_ERNVAL == 0) & (previous_year_data.I_SEVAL == 0) - ] + previous_year_data = previous_year_data[ + (previous_year_data.I_ERNVAL == 0) & (previous_year_data.I_SEVAL == 0) + ] - previous_year_data.drop(["I_ERNVAL", "I_SEVAL"], axis=1, inplace=True) + previous_year_data.drop(["I_ERNVAL", "I_SEVAL"], axis=1, inplace=True) - joined_data = cps_current_year.join(previous_year_data)[ - [ - "employment_income_last_year", - "self_employment_income_last_year", - "I_ERNVAL", - "I_SEVAL", + joined_data = cps_current_year.join(previous_year_data)[ + [ + "employment_income_last_year", + "self_employment_income_last_year", + "I_ERNVAL", + "I_SEVAL", + ] ] - ] joined_data["previous_year_income_available"] = ( ~joined_data.employment_income_last_year.isna() & ~joined_data.self_employment_income_last_year.isna() @@ -1870,13 +1870,12 @@ def add_tips(self, cps: h5py.File): # Get is_married from raw CPS data (A_MARITL codes: 1,2 = married) # Note: is_married in policyengine-us is Family-level, but we need # person-level for imputation models - raw_data = self.raw_cps(require=True).load() - raw_person = raw_data["person"] - cps["is_married"] = raw_person.A_MARITL.isin([1, 2]).values - cps["is_tipped_occupation"] = derive_is_tipped_occupation( - derive_treasury_tipped_occupation_code(raw_person.PEIOOCC) - ) - raw_data.close() + with self.raw_cps(require=True).load() as raw_data: + raw_person = raw_data["person"] + cps["is_married"] = raw_person.A_MARITL.isin([1, 2]).values + cps["is_tipped_occupation"] = derive_is_tipped_occupation( + derive_treasury_tipped_occupation_code(raw_person.PEIOOCC) + ) cps["is_under_18"] = cps.age < 18 cps["is_under_6"] = cps.age < 6 @@ -2261,6 +2260,7 @@ def determine_reference_person(group): # Add is_married variable for household heads based on raw person data reference_persons = person_data[mask] receiver_data["is_married"] = reference_persons.A_MARITL.isin([1, 2]).values + raw_data.close() # Impute auto loan balance from the SCF from policyengine_us_data.datasets.scf.scf import SCF_2022 diff --git a/tests/unit/datasets/test_cps_file_handles.py b/tests/unit/datasets/test_cps_file_handles.py new file mode 100644 index 000000000..e59516bb3 --- /dev/null +++ b/tests/unit/datasets/test_cps_file_handles.py @@ -0,0 +1,67 @@ +from types import SimpleNamespace + +import numpy as np +import pandas as pd + +from policyengine_us_data.datasets.cps.cps import add_previous_year_income + + +class _FakeStore: + def __init__(self, person: pd.DataFrame): + self.person = person + self.closed = False + + def close(self): + self.closed = True + + +class _FakeDataset: + store: _FakeStore | None = None + + def __init__(self, require: bool = False): + assert require is True + + def load(self): + assert self.store is not None + return self.store + + +def test_add_previous_year_income_closes_raw_cps_handles(): + current_person = pd.DataFrame( + { + "PERIDNUM": [10, 20], + "I_ERNVAL": [0, 0], + "I_SEVAL": [0, 0], + } + ) + previous_person = pd.DataFrame( + { + "PERIDNUM": [10, 20], + "WSAL_VAL": [1_000, 2_000], + "SEMP_VAL": [100, 200], + "I_ERNVAL": [0, 0], + "I_SEVAL": [0, 0], + } + ) + + current_store = _FakeStore(current_person) + previous_store = _FakeStore(previous_person) + + current_dataset = type("CurrentDataset", (_FakeDataset,), {"store": current_store}) + previous_dataset = type( + "PreviousDataset", (_FakeDataset,), {"store": previous_store} + ) + + holder = SimpleNamespace( + raw_cps=current_dataset, + previous_year_raw_cps=previous_dataset, + ) + cps = {} + + add_previous_year_income(holder, cps) + + np.testing.assert_array_equal(cps["employment_income_last_year"], [1000, 2000]) + np.testing.assert_array_equal(cps["self_employment_income_last_year"], [100, 200]) + np.testing.assert_array_equal(cps["previous_year_income_available"], [True, True]) + assert current_store.closed is True + assert previous_store.closed is True From 84ba5c03c2c81ca4526c334e6ec670b1371393f9 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Mon, 6 Apr 2026 22:36:58 -0400 Subject: [PATCH 2/3] Use context managers for CPS raw stores --- policyengine_us_data/datasets/cps/cps.py | 223 +++++++++---------- tests/unit/datasets/test_cps_file_handles.py | 7 + 2 files changed, 118 insertions(+), 112 deletions(-) diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 72ef1f903..c421f0795 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -910,9 +910,10 @@ def add_previous_year_income(self, cps: h5py.File) -> None: ) return - with self.raw_cps(require=True).load() as cps_current_year_data, self.previous_year_raw_cps( - require=True - ).load() as cps_previous_year_data: + with ( + self.raw_cps(require=True).load() as cps_current_year_data, + self.previous_year_raw_cps(require=True).load() as cps_previous_year_data, + ): cps_previous_year = cps_previous_year_data.person.set_index( cps_previous_year_data.person.PERIDNUM ) @@ -2035,51 +2036,50 @@ def add_auto_loan_interest_and_net_worth(self, cps: h5py.File) -> None: cps_data = self.load_dataset() # Access raw CPS for additional variables - raw_data_instance = self.raw_cps(require=True) - raw_data = raw_data_instance.load() - person_data = raw_data.person - - # Preprocess the CPS for imputation - lengths = {k: len(v) for k, v in cps_data.items()} - var_len = cps_data["person_household_id"].shape[0] - vars_of_interest = [name for name, ln in lengths.items() if ln == var_len] - agg_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest}) - agg_data["interest_dividend_income"] = np.sum( - [ - agg_data["taxable_interest_income"], - agg_data["tax_exempt_interest_income"], - agg_data["qualified_dividend_income"], - agg_data["non_qualified_dividend_income"], - ], - axis=0, - ) - agg_data["social_security_pension_income"] = np.sum( - [ - agg_data["tax_exempt_private_pension_income"], - agg_data["taxable_private_pension_income"], - agg_data["social_security_retirement"], - ], - axis=0, - ) - - agg = ( - agg_data.groupby("person_household_id")[ + with self.raw_cps(require=True).load() as raw_data: + person_data = raw_data.person + + # Preprocess the CPS for imputation + lengths = {k: len(v) for k, v in cps_data.items()} + var_len = cps_data["person_household_id"].shape[0] + vars_of_interest = [name for name, ln in lengths.items() if ln == var_len] + agg_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest}) + agg_data["interest_dividend_income"] = np.sum( + [ + agg_data["taxable_interest_income"], + agg_data["tax_exempt_interest_income"], + agg_data["qualified_dividend_income"], + agg_data["non_qualified_dividend_income"], + ], + axis=0, + ) + agg_data["social_security_pension_income"] = np.sum( [ - "employment_income", - "interest_dividend_income", - "social_security_pension_income", + agg_data["tax_exempt_private_pension_income"], + agg_data["taxable_private_pension_income"], + agg_data["social_security_retirement"], + ], + axis=0, + ) + + agg = ( + agg_data.groupby("person_household_id")[ + [ + "employment_income", + "interest_dividend_income", + "social_security_pension_income", + ] ] - ] - .sum() - .rename( - columns={ - "employment_income": "household_employment_income", - "interest_dividend_income": "household_interest_dividend_income", - "social_security_pension_income": "household_social_security_pension_income", - } + .sum() + .rename( + columns={ + "employment_income": "household_employment_income", + "interest_dividend_income": "household_interest_dividend_income", + "social_security_pension_income": "household_social_security_pension_income", + } + ) + .reset_index() ) - .reset_index() - ) def create_scf_reference_person_mask(cps_data, raw_person_data): """ @@ -2189,78 +2189,77 @@ def determine_reference_person(group): return all_persons_data["is_scf_reference_person"].values - mask = create_scf_reference_person_mask(cps_data, person_data) - mask_len = mask.shape[0] - - cps_data = { - var: data[mask] if data.shape[0] == mask_len else data - for var, data in cps_data.items() - } - - CPS_RACE_MAPPING = { - 1: 1, # White only -> WHITE - 2: 2, # Black only -> BLACK/AFRICAN-AMERICAN - 3: 5, # American Indian, Alaskan Native only -> OTHER - 4: 4, # Asian only -> ASIAN - 5: 5, # Hawaiian/Pacific Islander only -> OTHER - 6: 5, # White-Black -> OTHER - 7: 5, # White-AI -> OTHER - 8: 5, # White-Asian -> OTHER - 9: 3, # White-HP -> HISPANIC - 10: 5, # Black-AI -> OTHER - 11: 5, # Black-Asian -> OTHER - 12: 3, # Black-HP -> HISPANIC - 13: 5, # AI-Asian -> OTHER - 14: 5, # AI-HP -> OTHER - 15: 3, # Asian-HP -> HISPANIC - 16: 5, # White-Black-AI -> OTHER - 17: 5, # White-Black-Asian -> OTHER - 18: 5, # White-Black-HP -> OTHER - 19: 5, # White-AI-Asian -> OTHER - 20: 5, # White-AI-HP -> OTHER - 21: 5, # White-Asian-HP -> OTHER - 22: 5, # Black-AI-Asian -> OTHER - 23: 5, # White-Black-AI-Asian -> OTHER - 24: 5, # White-AI-Asian-HP -> OTHER - 25: 5, # Other 3 race comb. -> OTHER - 26: 5, # Other 4 or 5 race comb. -> OTHER - } + mask = create_scf_reference_person_mask(cps_data, person_data) + mask_len = mask.shape[0] - # Apply the mapping to recode the race values - cps_data["cps_race"] = np.vectorize(CPS_RACE_MAPPING.get)(cps_data["cps_race"]) + cps_data = { + var: data[mask] if data.shape[0] == mask_len else data + for var, data in cps_data.items() + } - lengths = {k: len(v) for k, v in cps_data.items()} - var_len = cps_data["person_household_id"].shape[0] - vars_of_interest = [name for name, ln in lengths.items() if ln == var_len] - receiver_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest}) + CPS_RACE_MAPPING = { + 1: 1, # White only -> WHITE + 2: 2, # Black only -> BLACK/AFRICAN-AMERICAN + 3: 5, # American Indian, Alaskan Native only -> OTHER + 4: 4, # Asian only -> ASIAN + 5: 5, # Hawaiian/Pacific Islander only -> OTHER + 6: 5, # White-Black -> OTHER + 7: 5, # White-AI -> OTHER + 8: 5, # White-Asian -> OTHER + 9: 3, # White-HP -> HISPANIC + 10: 5, # Black-AI -> OTHER + 11: 5, # Black-Asian -> OTHER + 12: 3, # Black-HP -> HISPANIC + 13: 5, # AI-Asian -> OTHER + 14: 5, # AI-HP -> OTHER + 15: 3, # Asian-HP -> HISPANIC + 16: 5, # White-Black-AI -> OTHER + 17: 5, # White-Black-Asian -> OTHER + 18: 5, # White-Black-HP -> OTHER + 19: 5, # White-AI-Asian -> OTHER + 20: 5, # White-AI-HP -> OTHER + 21: 5, # White-Asian-HP -> OTHER + 22: 5, # Black-AI-Asian -> OTHER + 23: 5, # White-Black-AI-Asian -> OTHER + 24: 5, # White-AI-Asian-HP -> OTHER + 25: 5, # Other 3 race comb. -> OTHER + 26: 5, # Other 4 or 5 race comb. -> OTHER + } - receiver_data = receiver_data.merge( - agg[ - [ - "person_household_id", - "household_employment_income", - "household_interest_dividend_income", - "household_social_security_pension_income", - ] - ], - on="person_household_id", - how="left", - ) - receiver_data.drop("employment_income", axis=1, inplace=True) + # Apply the mapping to recode the race values + cps_data["cps_race"] = np.vectorize(CPS_RACE_MAPPING.get)(cps_data["cps_race"]) + + lengths = {k: len(v) for k, v in cps_data.items()} + var_len = cps_data["person_household_id"].shape[0] + vars_of_interest = [name for name, ln in lengths.items() if ln == var_len] + receiver_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest}) + + receiver_data = receiver_data.merge( + agg[ + [ + "person_household_id", + "household_employment_income", + "household_interest_dividend_income", + "household_social_security_pension_income", + ] + ], + on="person_household_id", + how="left", + ) + receiver_data.drop("employment_income", axis=1, inplace=True) - receiver_data.rename( - columns={ - "household_employment_income": "employment_income", - "household_interest_dividend_income": "interest_dividend_income", - "household_social_security_pension_income": "social_security_pension_income", - }, - inplace=True, - ) + receiver_data.rename( + columns={ + "household_employment_income": "employment_income", + "household_interest_dividend_income": "interest_dividend_income", + "household_social_security_pension_income": "social_security_pension_income", + }, + inplace=True, + ) - # Add is_married variable for household heads based on raw person data - reference_persons = person_data[mask] - receiver_data["is_married"] = reference_persons.A_MARITL.isin([1, 2]).values - raw_data.close() + # Add is_married variable for household heads based on raw person data + reference_persons = person_data[mask] + receiver_data["is_married"] = reference_persons.A_MARITL.isin([1, 2]).values # Impute auto loan balance from the SCF from policyengine_us_data.datasets.scf.scf import SCF_2022 diff --git a/tests/unit/datasets/test_cps_file_handles.py b/tests/unit/datasets/test_cps_file_handles.py index e59516bb3..84074b366 100644 --- a/tests/unit/datasets/test_cps_file_handles.py +++ b/tests/unit/datasets/test_cps_file_handles.py @@ -11,6 +11,13 @@ def __init__(self, person: pd.DataFrame): self.person = person self.closed = False + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + self.close() + return False + def close(self): self.closed = True From 8af99b7bf354fe64d24a143a31b6b2ea113a450a Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Thu, 9 Apr 2026 12:52:34 -0400 Subject: [PATCH 3/3] Use current changelog fragment naming --- ...ps-hdf5-file-handles.md => fix-cps-hdf5-file-handles.fixed.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog.d/{changed/fix-cps-hdf5-file-handles.md => fix-cps-hdf5-file-handles.fixed.md} (100%) diff --git a/changelog.d/changed/fix-cps-hdf5-file-handles.md b/changelog.d/fix-cps-hdf5-file-handles.fixed.md similarity index 100% rename from changelog.d/changed/fix-cps-hdf5-file-handles.md rename to changelog.d/fix-cps-hdf5-file-handles.fixed.md