diff --git a/scripts/us_census/pep/us_pep_sex/process.py b/scripts/us_census/pep/us_pep_sex/process.py index 31bc30ef49..8eda40aa4d 100644 --- a/scripts/us_census/pep/us_pep_sex/process.py +++ b/scripts/us_census/pep/us_pep_sex/process.py @@ -42,9 +42,9 @@ _INPUT_FILE_PATH = os.path.join(_MODULE_DIR, 'input_files') _INPUT_URL_JSON = "input_url.json" _FILES_TO_DOWNLOAD = None -_GCS_OUTPUT_PERSISTENT_PATH = os.path.join( - _MODULE_DIR, 'gcs_output/us_pep_sex_source_files') -_GCS_BASE_DIR = os.path.join(_MODULE_DIR, 'gcs_output') +_GCS_FOLDER_PERSISTENT_PATH = os.path.join( + _MODULE_DIR, 'gcs_folder/us_pep_sex_source_files') +_TTL_DAYS = 30 # Time-to-live for downloaded files in days sys.path.insert(1, os.path.join(_MODULE_DIR, '../../../../')) # pylint: disable=wrong-import-position @@ -57,7 +57,7 @@ _FLAGS = flags.FLAGS default_input_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), - "gcs_output/us_pep_sex_source_files") + "gcs_folder/us_pep_sex_source_files") flags.DEFINE_string("input_path", default_input_path, "Import Data File's List") _MCF_TEMPLATE = ("Node: dcid:{pv1}\n" @@ -1184,6 +1184,24 @@ def add_future_year_urls(): # Loop through years in reverse order from 2030 to 2023 for future_year in range(2030, 2022, -1): # From 2030 to 2023 + # We check the National CSV first. If it's 404, the whole year is skipped. + gatekeeper_url = urls_to_scan[0].format(YEAR=future_year) + try: + # Use a short 5-second timeout for the check + response = requests.head(gatekeeper_url, + allow_redirects=True, + timeout=5) + if response.status_code != 200: + logging.info( + f"Skipping year {future_year}: National file not found (status code: {response.status_code})." + ) + continue + except requests.exceptions.RequestException as e: + logging.warning( + f"Skipping year {future_year} due to an error checking the gatekeeper URL: {e}" + ) + continue + YEAR = future_year # Loop through URLs for url in urls_to_scan: @@ -1230,6 +1248,20 @@ def add_future_year_urls(): f"URL is not accessible {url_to_check} due to {e}") +def cleanup(): + """Delete all old files in the gcs_folder to prevent cache bloat.""" + if os.path.exists(_GCS_FOLDER_PERSISTENT_PATH): + for file_name in os.listdir(_GCS_FOLDER_PERSISTENT_PATH): + file_path = os.path.join(_GCS_FOLDER_PERSISTENT_PATH, file_name) + if os.path.isfile(file_path): + file_age = (time.time() - os.path.getmtime(file_path)) / (24 * + 3600) + # Delete ANY file older than the TTL + if file_age > _TTL_DAYS: + logging.info(f"Cleaning up old file: {file_name}") + os.remove(file_path) + + @retry(tries=3, delay=2, backoff=2, @@ -1242,8 +1274,11 @@ def download_files(): global _FILES_TO_DOWNLOAD session = requests.session() - #Get set of already downloaded files - downloaded_files = set(os.listdir(_GCS_OUTPUT_PERSISTENT_PATH)) + # Ensure the directory exists (it shouldn't "expect" it to be there) + os.makedirs(_GCS_FOLDER_PERSISTENT_PATH, exist_ok=True) + + # Get set of already downloaded files + downloaded_files = set(os.listdir(_GCS_FOLDER_PERSISTENT_PATH)) for file_to_download in _FILES_TO_DOWNLOAD: file_name_to_save = None @@ -1255,6 +1290,18 @@ def download_files(): else: file_name_to_save = url.split('/')[-1] + # Skip if file already exists in cache (cleanup() already removed stale files) + if file_name_to_save in downloaded_files: + file_path = os.path.join(_GCS_FOLDER_PERSISTENT_PATH, + file_name_to_save) + logging.info( + f"Skipping download, using cached file: {file_name_to_save}") + + # Make sure to copy the cached file to the input directory! + shutil.copy(file_path, + os.path.join(_INPUT_FILE_PATH, file_name_to_save)) + continue + headers = {'User-Agent': 'Mozilla/5.0'} try: with session.get(url, stream=True, timeout=120, @@ -1263,45 +1310,38 @@ def download_files(): content_type = response.headers.get('Content-Type', '') - # Skip if file already exists - if file_name_to_save in downloaded_files: - logging.info( - f"Skipping already downloaded file: {file_name_to_save}" + # Minimal fix: Log error and continue to skip HTML pages + if 'html' in content_type.lower(): + logging.error( + f"Server returned HTML error page for URL: {url}. Skipping." ) continue - if 'html' in content_type.lower(): - logging.fatal( - f"Server returned HTML error page for URL: {url}") - else: - if response.status_code == 200: - with tempfile.NamedTemporaryFile( - delete=False) as tmp_file: - # Stream the response into a temp file - for chunk in response.iter_content(chunk_size=8192): - if chunk: - tmp_file.write(chunk) - tmp_file_path = tmp_file.name - - # Copy to local destination - shutil.copy( - tmp_file_path, - os.path.join(_INPUT_FILE_PATH, file_name_to_save)) - - # Copy to gcs destination - shutil.copy( - tmp_file_path, - os.path.join(_GCS_OUTPUT_PERSISTENT_PATH, - file_name_to_save)) - - # Optionally delete the temp file - os.remove(tmp_file_path) - file_to_download['is_downloaded'] = True - logging.info(f"Downloaded file: {url}") + + if response.status_code == 200: + with tempfile.NamedTemporaryFile(delete=False) as tmp_file: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + tmp_file.write(chunk) + tmp_file_path = tmp_file.name + + # Copy to local destination + shutil.copy( + tmp_file_path, + os.path.join(_INPUT_FILE_PATH, file_name_to_save)) + + # Move to gcs destination (optimized from shutil.copy + os.remove) + shutil.move( + tmp_file_path, + os.path.join(_GCS_FOLDER_PERSISTENT_PATH, + file_name_to_save)) + + file_to_download['is_downloaded'] = True + logging.info(f"Downloaded file: {url}") except Exception as e: file_to_download['is_downloaded'] = False logging.error(f"Error downloading {url}: {e}") - raise # re-raise to trigger @retry + raise time.sleep(1) return True @@ -1325,13 +1365,16 @@ def main(_): os.makedirs(data_file_path, exist_ok=True) if not (os.path.exists(_INPUT_FILE_PATH)): os.makedirs(_INPUT_FILE_PATH, exist_ok=True) - if not (os.path.exists(_GCS_OUTPUT_PERSISTENT_PATH)): - os.makedirs(_GCS_OUTPUT_PERSISTENT_PATH, exist_ok=True) + if not (os.path.exists(_GCS_FOLDER_PERSISTENT_PATH)): + os.makedirs(_GCS_FOLDER_PERSISTENT_PATH, exist_ok=True) cleaned_csv_path = data_file_path + os.sep + csv_name mcf_path = data_file_path + os.sep + mcf_name tmcf_path = data_file_path + os.sep + tmcf_name + # Perform cleanup of old files + cleanup() + download_status = True if mode == "" or mode == "download": # Get the config path from the flags @@ -1350,11 +1393,7 @@ def main(_): mcf_path, tmcf_path) loader.process() - # Only delete if it's a subdirectory of gcs_output, and not gcs_output itself - if os.path.exists(_GCS_OUTPUT_PERSISTENT_PATH) and os.path.commonpath([_GCS_OUTPUT_PERSISTENT_PATH, _GCS_BASE_DIR]) == _GCS_BASE_DIR \ - and os.path.abspath(_GCS_OUTPUT_PERSISTENT_PATH) != os.path.abspath(_GCS_BASE_DIR): - shutil.rmtree(_GCS_OUTPUT_PERSISTENT_PATH) - logging.info(f"Deleted folder: {_GCS_OUTPUT_PERSISTENT_PATH}") + # The persistent folder is intentionally kept to allow for TTL caching. except Exception as e: logging.fatal(f"The processing is failed due to the error: {e}")