From b2a4ca47109d93b161dc33e0f209d67f90ce02a1 Mon Sep 17 00:00:00 2001 From: Nivedita Singh Date: Mon, 16 Feb 2026 10:18:39 +0000 Subject: [PATCH 1/8] fixed process.py --- scripts/us_census/pep/us_pep_sex/process.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/scripts/us_census/pep/us_pep_sex/process.py b/scripts/us_census/pep/us_pep_sex/process.py index 31bc30ef49..6c69fc88b9 100644 --- a/scripts/us_census/pep/us_pep_sex/process.py +++ b/scripts/us_census/pep/us_pep_sex/process.py @@ -1184,6 +1184,18 @@ def add_future_year_urls(): # Loop through years in reverse order from 2030 to 2023 for future_year in range(2030, 2022, -1): # From 2030 to 2023 + # We check the National CSV first. If it's 404, the whole year is skipped. + gatekeeper_url = urls_to_scan[0].format(YEAR=future_year) + try: + # Use a short 5-second timeout for the check + response = requests.head(gatekeeper_url, allow_redirects=True, timeout=5) + if response.status_code != 200: + logging.info(f"Skipping year {future_year}: National file not found (404).") + continue + except Exception: + continue + # --- NEW GATEKEEPER LOGIC END --- + YEAR = future_year # Loop through URLs for url in urls_to_scan: From a061ef2de2ad9cf97a11255914ed5fe0bdbb0ec5 Mon Sep 17 00:00:00 2001 From: Nivedita Singh Date: Mon, 16 Feb 2026 12:06:40 +0000 Subject: [PATCH 2/8] fixed process.py --- scripts/us_census/pep/us_pep_sex/process.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/scripts/us_census/pep/us_pep_sex/process.py b/scripts/us_census/pep/us_pep_sex/process.py index 6c69fc88b9..7cb84953bb 100644 --- a/scripts/us_census/pep/us_pep_sex/process.py +++ b/scripts/us_census/pep/us_pep_sex/process.py @@ -1188,10 +1188,14 @@ def add_future_year_urls(): gatekeeper_url = urls_to_scan[0].format(YEAR=future_year) try: # Use a short 5-second timeout for the check - response = requests.head(gatekeeper_url, allow_redirects=True, timeout=5) + response = requests.head(gatekeeper_url, + 
allow_redirects=True, + timeout=5) if response.status_code != 200: - logging.info(f"Skipping year {future_year}: National file not found (404).") - continue + logging.info( + f"Skipping year {future_year}: National file not found (404)." + ) + continue except Exception: continue # --- NEW GATEKEEPER LOGIC END --- From 8edf119e55b8f7f7e42060f894ed230f4f2e880b Mon Sep 17 00:00:00 2001 From: Nivedita Singh Date: Mon, 16 Feb 2026 12:36:50 +0000 Subject: [PATCH 3/8] fixed process.py --- scripts/us_census/pep/us_pep_sex/process.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/scripts/us_census/pep/us_pep_sex/process.py b/scripts/us_census/pep/us_pep_sex/process.py index 7cb84953bb..207acbda67 100644 --- a/scripts/us_census/pep/us_pep_sex/process.py +++ b/scripts/us_census/pep/us_pep_sex/process.py @@ -1192,11 +1192,10 @@ def add_future_year_urls(): allow_redirects=True, timeout=5) if response.status_code != 200: - logging.info( - f"Skipping year {future_year}: National file not found (404)." 
- ) + logging.info(f"Skipping year {future_year}: National file not found (status code: {response.status_code}).") continue - except Exception: + except requests.exceptions.RequestException as e: + logging.warning(f"Skipping year {future_year} due to an error checking the gatekeeper URL: {e}") continue # --- NEW GATEKEEPER LOGIC END --- From cc9af8a599130b7d83090d3d266db28ebb49a582 Mon Sep 17 00:00:00 2001 From: Nivedita Singh Date: Mon, 16 Feb 2026 12:59:57 +0000 Subject: [PATCH 4/8] fixed process.py --- scripts/us_census/pep/us_pep_sex/process.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/scripts/us_census/pep/us_pep_sex/process.py b/scripts/us_census/pep/us_pep_sex/process.py index 207acbda67..1d65478b53 100644 --- a/scripts/us_census/pep/us_pep_sex/process.py +++ b/scripts/us_census/pep/us_pep_sex/process.py @@ -1192,10 +1192,14 @@ def add_future_year_urls(): allow_redirects=True, timeout=5) if response.status_code != 200: - logging.info(f"Skipping year {future_year}: National file not found (status code: {response.status_code}).") + logging.info( + f"Skipping year {future_year}: National file not found (status code: {response.status_code})." 
+ ) continue except requests.exceptions.RequestException as e: - logging.warning(f"Skipping year {future_year} due to an error checking the gatekeeper URL: {e}") + logging.warning( + f"Skipping year {future_year} due to an error checking the gatekeeper URL: {e}" + ) continue # --- NEW GATEKEEPER LOGIC END --- From 6d88082b4b4100545f1d9f7cd37a8b499af96b99 Mon Sep 17 00:00:00 2001 From: Nivedita Singh Date: Thu, 2 Apr 2026 13:05:48 +0000 Subject: [PATCH 5/8] added logic to skip file if server returns html --- scripts/us_census/pep/us_pep_sex/process.py | 69 ++++++++++----------- 1 file changed, 34 insertions(+), 35 deletions(-) diff --git a/scripts/us_census/pep/us_pep_sex/process.py b/scripts/us_census/pep/us_pep_sex/process.py index 1d65478b53..c07fd28406 100644 --- a/scripts/us_census/pep/us_pep_sex/process.py +++ b/scripts/us_census/pep/us_pep_sex/process.py @@ -1201,7 +1201,6 @@ def add_future_year_urls(): f"Skipping year {future_year} due to an error checking the gatekeeper URL: {e}" ) continue - # --- NEW GATEKEEPER LOGIC END --- YEAR = future_year # Loop through URLs @@ -1261,7 +1260,7 @@ def download_files(): global _FILES_TO_DOWNLOAD session = requests.session() - #Get set of already downloaded files + # Get set of already downloaded files downloaded_files = set(os.listdir(_GCS_OUTPUT_PERSISTENT_PATH)) for file_to_download in _FILES_TO_DOWNLOAD: @@ -1274,6 +1273,12 @@ def download_files(): else: file_name_to_save = url.split('/')[-1] + # Skip if file already exists (Moved up for efficiency) + if file_name_to_save in downloaded_files: + logging.info( + f"Skipping already downloaded file: {file_name_to_save}") + continue + headers = {'User-Agent': 'Mozilla/5.0'} try: with session.get(url, stream=True, timeout=120, @@ -1282,45 +1287,39 @@ def download_files(): content_type = response.headers.get('Content-Type', '') - # Skip if file already exists - if file_name_to_save in downloaded_files: - logging.info( - f"Skipping already downloaded file: 
{file_name_to_save}" + # Minimal fix: Log error and continue to skip HTML pages + if 'html' in content_type.lower(): + logging.error( + f"Server returned HTML error page for URL: {url}. Skipping." ) continue - if 'html' in content_type.lower(): - logging.fatal( - f"Server returned HTML error page for URL: {url}") - else: - if response.status_code == 200: - with tempfile.NamedTemporaryFile( - delete=False) as tmp_file: - # Stream the response into a temp file - for chunk in response.iter_content(chunk_size=8192): - if chunk: - tmp_file.write(chunk) - tmp_file_path = tmp_file.name - - # Copy to local destination - shutil.copy( - tmp_file_path, - os.path.join(_INPUT_FILE_PATH, file_name_to_save)) - - # Copy to gcs destination - shutil.copy( - tmp_file_path, - os.path.join(_GCS_OUTPUT_PERSISTENT_PATH, - file_name_to_save)) - - # Optionally delete the temp file - os.remove(tmp_file_path) - file_to_download['is_downloaded'] = True - logging.info(f"Downloaded file: {url}") + + if response.status_code == 200: + with tempfile.NamedTemporaryFile(delete=False) as tmp_file: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + tmp_file.write(chunk) + tmp_file_path = tmp_file.name + + # Copy to local destination + shutil.copy( + tmp_file_path, + os.path.join(_INPUT_FILE_PATH, file_name_to_save)) + + # Copy to gcs destination + shutil.copy( + tmp_file_path, + os.path.join(_GCS_OUTPUT_PERSISTENT_PATH, + file_name_to_save)) + + os.remove(tmp_file_path) + file_to_download['is_downloaded'] = True + logging.info(f"Downloaded file: {url}") except Exception as e: file_to_download['is_downloaded'] = False logging.error(f"Error downloading {url}: {e}") - raise # re-raise to trigger @retry + raise time.sleep(1) return True From fe35dc1f0c9e58e282752b2e50784e74f6490345 Mon Sep 17 00:00:00 2001 From: Nivedita Singh Date: Fri, 17 Apr 2026 09:45:22 +0000 Subject: [PATCH 6/8] resolved comments --- scripts/us_census/pep/us_pep_sex/process.py | 29 +++++++++++++-------- 1 file
changed, 18 insertions(+), 11 deletions(-) diff --git a/scripts/us_census/pep/us_pep_sex/process.py b/scripts/us_census/pep/us_pep_sex/process.py index c07fd28406..dcb4d4f005 100644 --- a/scripts/us_census/pep/us_pep_sex/process.py +++ b/scripts/us_census/pep/us_pep_sex/process.py @@ -45,6 +45,7 @@ _GCS_OUTPUT_PERSISTENT_PATH = os.path.join( _MODULE_DIR, 'gcs_output/us_pep_sex_source_files') _GCS_BASE_DIR = os.path.join(_MODULE_DIR, 'gcs_output') +_TTL_DAYS = 30 # Time-to-live for downloaded files in days sys.path.insert(1, os.path.join(_MODULE_DIR, '../../../../')) # pylint: disable=wrong-import-position @@ -1260,6 +1261,9 @@ def download_files(): global _FILES_TO_DOWNLOAD session = requests.session() + # Ensure the directory exists (it shouldn't "expect" it to be there) + os.makedirs(_GCS_OUTPUT_PERSISTENT_PATH, exist_ok=True) + # Get set of already downloaded files downloaded_files = set(os.listdir(_GCS_OUTPUT_PERSISTENT_PATH)) @@ -1273,11 +1277,19 @@ def download_files(): else: file_name_to_save = url.split('/')[-1] - # Skip if file already exists (Moved up for efficiency) + # Skip if file already exists and is not stale (TTL) if file_name_to_save in downloaded_files: + file_path = os.path.join(_GCS_OUTPUT_PERSISTENT_PATH, + file_name_to_save) + file_age = (time.time() - os.path.getmtime(file_path)) / (24 * 3600) + if file_age < _TTL_DAYS: + logging.info( + f"Skipping already downloaded file (age: {file_age:.1f} days): {file_name_to_save}" + ) + continue logging.info( - f"Skipping already downloaded file: {file_name_to_save}") - continue + f"File {file_name_to_save} is stale (age: {file_age:.1f} days). Re-downloading..." 
+ ) headers = {'User-Agent': 'Mozilla/5.0'} try: @@ -1306,13 +1318,12 @@ def download_files(): tmp_file_path, os.path.join(_INPUT_FILE_PATH, file_name_to_save)) - # Copy to gcs destination - shutil.copy( + # Move to gcs destination (optimized from shutil.copy + os.remove) + shutil.move( tmp_file_path, os.path.join(_GCS_OUTPUT_PERSISTENT_PATH, file_name_to_save)) - os.remove(tmp_file_path) file_to_download['is_downloaded'] = True logging.info(f"Downloaded file: {url}") @@ -1368,11 +1379,7 @@ def main(_): mcf_path, tmcf_path) loader.process() - # Only delete if it's a subdirectory of gcs_output, and not gcs_output itself - if os.path.exists(_GCS_OUTPUT_PERSISTENT_PATH) and os.path.commonpath([_GCS_OUTPUT_PERSISTENT_PATH, _GCS_BASE_DIR]) == _GCS_BASE_DIR \ - and os.path.abspath(_GCS_OUTPUT_PERSISTENT_PATH) != os.path.abspath(_GCS_BASE_DIR): - shutil.rmtree(_GCS_OUTPUT_PERSISTENT_PATH) - logging.info(f"Deleted folder: {_GCS_OUTPUT_PERSISTENT_PATH}") + # The persistent folder is intentionally kept to allow for TTL caching. 
except Exception as e: logging.fatal(f"The processing is failed due to the error: {e}") From 7208f1752edc9043dcb8586dc664126faab35c60 Mon Sep 17 00:00:00 2001 From: Nivedita Singh Date: Fri, 17 Apr 2026 16:47:24 +0000 Subject: [PATCH 7/8] fixed comments --- scripts/us_census/pep/us_pep_sex/process.py | 52 ++++++++++++--------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/scripts/us_census/pep/us_pep_sex/process.py b/scripts/us_census/pep/us_pep_sex/process.py index dcb4d4f005..f39cae43c9 100644 --- a/scripts/us_census/pep/us_pep_sex/process.py +++ b/scripts/us_census/pep/us_pep_sex/process.py @@ -42,9 +42,8 @@ _INPUT_FILE_PATH = os.path.join(_MODULE_DIR, 'input_files') _INPUT_URL_JSON = "input_url.json" _FILES_TO_DOWNLOAD = None -_GCS_OUTPUT_PERSISTENT_PATH = os.path.join( - _MODULE_DIR, 'gcs_output/us_pep_sex_source_files') -_GCS_BASE_DIR = os.path.join(_MODULE_DIR, 'gcs_output') +_GCS_FOLDER_PERSISTENT_PATH = os.path.join( + _MODULE_DIR, 'gcs_folder/us_pep_sex_source_files') _TTL_DAYS = 30 # Time-to-live for downloaded files in days sys.path.insert(1, os.path.join(_MODULE_DIR, '../../../../')) @@ -58,7 +57,7 @@ _FLAGS = flags.FLAGS default_input_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), - "gcs_output/us_pep_sex_source_files") + "gcs_folder/us_pep_sex_source_files") flags.DEFINE_string("input_path", default_input_path, "Import Data File's List") _MCF_TEMPLATE = ("Node: dcid:{pv1}\n" @@ -1249,6 +1248,19 @@ def add_future_year_urls(): f"URL is not accessible {url_to_check} due to {e}") +def cleanup(): + """Delete all old files in the gcs_folder to prevent cache bloat.""" + if os.path.exists(_GCS_FOLDER_PERSISTENT_PATH): + for file_name in os.listdir(_GCS_FOLDER_PERSISTENT_PATH): + file_path = os.path.join(_GCS_FOLDER_PERSISTENT_PATH, file_name) + if os.path.isfile(file_path): + file_age = (time.time() - os.path.getmtime(file_path)) / (24 * 3600) + # Delete ANY file older than the TTL + if file_age > _TTL_DAYS: + 
logging.info(f"Cleaning up old file: {file_name}") + os.remove(file_path) + + @retry(tries=3, delay=2, backoff=2, @@ -1262,10 +1274,10 @@ def download_files(): session = requests.session() # Ensure the directory exists (it shouldn't "expect" it to be there) - os.makedirs(_GCS_OUTPUT_PERSISTENT_PATH, exist_ok=True) + os.makedirs(_GCS_FOLDER_PERSISTENT_PATH, exist_ok=True) # Get set of already downloaded files - downloaded_files = set(os.listdir(_GCS_OUTPUT_PERSISTENT_PATH)) + downloaded_files = set(os.listdir(_GCS_FOLDER_PERSISTENT_PATH)) for file_to_download in _FILES_TO_DOWNLOAD: file_name_to_save = None @@ -1277,19 +1289,14 @@ def download_files(): else: file_name_to_save = url.split('/')[-1] - # Skip if file already exists and is not stale (TTL) + # Skip if file already exists in cache (cleanup() already removed stale files) if file_name_to_save in downloaded_files: - file_path = os.path.join(_GCS_OUTPUT_PERSISTENT_PATH, - file_name_to_save) - file_age = (time.time() - os.path.getmtime(file_path)) / (24 * 3600) - if file_age < _TTL_DAYS: - logging.info( - f"Skipping already downloaded file (age: {file_age:.1f} days): {file_name_to_save}" - ) - continue - logging.info( - f"File {file_name_to_save} is stale (age: {file_age:.1f} days). Re-downloading..." - ) + file_path = os.path.join(_GCS_FOLDER_PERSISTENT_PATH, file_name_to_save) + logging.info(f"Skipping download, using cached file: {file_name_to_save}") + + # Make sure to copy the cached file to the input directory! 
+ shutil.copy(file_path, os.path.join(_INPUT_FILE_PATH, file_name_to_save)) + continue headers = {'User-Agent': 'Mozilla/5.0'} try: @@ -1321,7 +1328,7 @@ def download_files(): # Move to gcs destination (optimized from shutil.copy + os.remove) shutil.move( tmp_file_path, - os.path.join(_GCS_OUTPUT_PERSISTENT_PATH, + os.path.join(_GCS_FOLDER_PERSISTENT_PATH, file_name_to_save)) file_to_download['is_downloaded'] = True @@ -1354,13 +1361,16 @@ def main(_): os.makedirs(data_file_path, exist_ok=True) if not (os.path.exists(_INPUT_FILE_PATH)): os.makedirs(_INPUT_FILE_PATH, exist_ok=True) - if not (os.path.exists(_GCS_OUTPUT_PERSISTENT_PATH)): - os.makedirs(_GCS_OUTPUT_PERSISTENT_PATH, exist_ok=True) + if not (os.path.exists(_GCS_FOLDER_PERSISTENT_PATH)): + os.makedirs(_GCS_FOLDER_PERSISTENT_PATH, exist_ok=True) cleaned_csv_path = data_file_path + os.sep + csv_name mcf_path = data_file_path + os.sep + mcf_name tmcf_path = data_file_path + os.sep + tmcf_name + # Perform cleanup of old files + cleanup() + download_status = True if mode == "" or mode == "download": # Get the config path from the flags From 6af1735dd8bb57b2c51d23c7c19ebed6d7ad6e2a Mon Sep 17 00:00:00 2001 From: Nivedita Singh Date: Fri, 17 Apr 2026 17:16:34 +0000 Subject: [PATCH 8/8] fixed comments --- scripts/us_census/pep/us_pep_sex/process.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/scripts/us_census/pep/us_pep_sex/process.py b/scripts/us_census/pep/us_pep_sex/process.py index f39cae43c9..8eda40aa4d 100644 --- a/scripts/us_census/pep/us_pep_sex/process.py +++ b/scripts/us_census/pep/us_pep_sex/process.py @@ -1254,7 +1254,8 @@ def cleanup(): for file_name in os.listdir(_GCS_FOLDER_PERSISTENT_PATH): file_path = os.path.join(_GCS_FOLDER_PERSISTENT_PATH, file_name) if os.path.isfile(file_path): - file_age = (time.time() - os.path.getmtime(file_path)) / (24 * 3600) + file_age = (time.time() - os.path.getmtime(file_path)) / (24 * + 3600) # Delete ANY file older than the TTL 
if file_age > _TTL_DAYS: logging.info(f"Cleaning up old file: {file_name}") @@ -1291,11 +1292,14 @@ def download_files(): # Skip if file already exists in cache (cleanup() already removed stale files) if file_name_to_save in downloaded_files: - file_path = os.path.join(_GCS_FOLDER_PERSISTENT_PATH, file_name_to_save) - logging.info(f"Skipping download, using cached file: {file_name_to_save}") + file_path = os.path.join(_GCS_FOLDER_PERSISTENT_PATH, + file_name_to_save) + logging.info( + f"Skipping download, using cached file: {file_name_to_save}") # Make sure to copy the cached file to the input directory! - shutil.copy(file_path, os.path.join(_INPUT_FILE_PATH, file_name_to_save)) + shutil.copy(file_path, + os.path.join(_INPUT_FILE_PATH, file_name_to_save)) continue headers = {'User-Agent': 'Mozilla/5.0'}