Skip to content
131 changes: 85 additions & 46 deletions scripts/us_census/pep/us_pep_sex/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
_INPUT_FILE_PATH = os.path.join(_MODULE_DIR, 'input_files')
_INPUT_URL_JSON = "input_url.json"
_FILES_TO_DOWNLOAD = None
_GCS_OUTPUT_PERSISTENT_PATH = os.path.join(
_MODULE_DIR, 'gcs_output/us_pep_sex_source_files')
_GCS_BASE_DIR = os.path.join(_MODULE_DIR, 'gcs_output')
_GCS_FOLDER_PERSISTENT_PATH = os.path.join(
_MODULE_DIR, 'gcs_folder/us_pep_sex_source_files')
_TTL_DAYS = 30 # Time-to-live for downloaded files in days

sys.path.insert(1, os.path.join(_MODULE_DIR, '../../../../'))
# pylint: disable=wrong-import-position
Expand All @@ -57,7 +57,7 @@

_FLAGS = flags.FLAGS
default_input_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
"gcs_output/us_pep_sex_source_files")
"gcs_folder/us_pep_sex_source_files")
flags.DEFINE_string("input_path", default_input_path, "Import Data File's List")

_MCF_TEMPLATE = ("Node: dcid:{pv1}\n"
Expand Down Expand Up @@ -1184,6 +1184,24 @@ def add_future_year_urls():
# Loop through years in reverse order from 2030 to 2023
for future_year in range(2030, 2022, -1): # From 2030 to 2023

# We check the National CSV first. If it's 404, the whole year is skipped.
gatekeeper_url = urls_to_scan[0].format(YEAR=future_year)
try:
# Use a short 5-second timeout for the check
response = requests.head(gatekeeper_url,
allow_redirects=True,
timeout=5)
if response.status_code != 200:
logging.info(
f"Skipping year {future_year}: National file not found (status code: {response.status_code})."
)
continue
except requests.exceptions.RequestException as e:
logging.warning(
f"Skipping year {future_year} due to an error checking the gatekeeper URL: {e}"
)
continue
Comment thread
niveditasing marked this conversation as resolved.

YEAR = future_year
# Loop through URLs
for url in urls_to_scan:
Expand Down Expand Up @@ -1230,6 +1248,20 @@ def add_future_year_urls():
f"URL is not accessible {url_to_check} due to {e}")


def cleanup():
    """Delete files older than ``_TTL_DAYS`` from the persistent cache folder.

    Walks the top level of ``_GCS_FOLDER_PERSISTENT_PATH`` and removes any
    regular file whose modification time is more than ``_TTL_DAYS`` days old,
    preventing unbounded cache growth between pipeline runs. Deletion is
    best-effort: failing to remove one file is logged as a warning and does
    not abort cleanup of the remaining files (a raised ``OSError`` here would
    otherwise propagate out of ``main`` before any download happens).
    """
    if not os.path.exists(_GCS_FOLDER_PERSISTENT_PATH):
        return
    # Snapshot the clock once so every file is aged against the same
    # reference point, and compare in seconds to avoid a per-file division.
    now = time.time()
    ttl_seconds = _TTL_DAYS * 24 * 3600
    for file_name in os.listdir(_GCS_FOLDER_PERSISTENT_PATH):
        file_path = os.path.join(_GCS_FOLDER_PERSISTENT_PATH, file_name)
        # Only regular files are subject to TTL; subdirectories are kept.
        if not os.path.isfile(file_path):
            continue
        try:
            # Delete ANY file older than the TTL, regardless of name.
            if now - os.path.getmtime(file_path) > ttl_seconds:
                logging.info(f"Cleaning up old file: {file_name}")
                os.remove(file_path)
        except OSError as e:
            # Best-effort: a concurrently-removed or locked file must not
            # stop cleanup of the rest of the cache.
            logging.warning(f"Could not clean up {file_name}: {e}")


@retry(tries=3,
delay=2,
backoff=2,
Expand All @@ -1242,8 +1274,11 @@ def download_files():
global _FILES_TO_DOWNLOAD
session = requests.session()

#Get set of already downloaded files
downloaded_files = set(os.listdir(_GCS_OUTPUT_PERSISTENT_PATH))
# Ensure the directory exists (it shouldn't "expect" it to be there)
os.makedirs(_GCS_FOLDER_PERSISTENT_PATH, exist_ok=True)

# Get set of already downloaded files
downloaded_files = set(os.listdir(_GCS_FOLDER_PERSISTENT_PATH))

for file_to_download in _FILES_TO_DOWNLOAD:
file_name_to_save = None
Expand All @@ -1255,6 +1290,18 @@ def download_files():
else:
file_name_to_save = url.split('/')[-1]

# Skip if file already exists in cache (cleanup() already removed stale files)
if file_name_to_save in downloaded_files:
Comment thread
niveditasing marked this conversation as resolved.
file_path = os.path.join(_GCS_FOLDER_PERSISTENT_PATH,
file_name_to_save)
logging.info(
f"Skipping download, using cached file: {file_name_to_save}")

# Make sure to copy the cached file to the input directory!
shutil.copy(file_path,
os.path.join(_INPUT_FILE_PATH, file_name_to_save))
continue

headers = {'User-Agent': 'Mozilla/5.0'}
try:
with session.get(url, stream=True, timeout=120,
Expand All @@ -1263,45 +1310,38 @@ def download_files():

content_type = response.headers.get('Content-Type', '')

# Skip if file already exists
if file_name_to_save in downloaded_files:
logging.info(
f"Skipping already downloaded file: {file_name_to_save}"
# Minimal fix: Log error and continue to skip HTML pages
if 'html' in content_type.lower():
logging.error(
f"Server returned HTML error page for URL: {url}. Skipping."
)
continue
if 'html' in content_type.lower():
logging.fatal(
f"Server returned HTML error page for URL: {url}")
else:
if response.status_code == 200:
with tempfile.NamedTemporaryFile(
delete=False) as tmp_file:
# Stream the response into a temp file
for chunk in response.iter_content(chunk_size=8192):
if chunk:
tmp_file.write(chunk)
tmp_file_path = tmp_file.name

# Copy to local destination
shutil.copy(
tmp_file_path,
os.path.join(_INPUT_FILE_PATH, file_name_to_save))

# Copy to gcs destination
shutil.copy(
tmp_file_path,
os.path.join(_GCS_OUTPUT_PERSISTENT_PATH,
file_name_to_save))

# Optionally delete the temp file
os.remove(tmp_file_path)
file_to_download['is_downloaded'] = True
logging.info(f"Downloaded file: {url}")

if response.status_code == 200:
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
tmp_file.write(chunk)
tmp_file_path = tmp_file.name

# Copy to local destination
shutil.copy(
Comment thread
niveditasing marked this conversation as resolved.
tmp_file_path,
os.path.join(_INPUT_FILE_PATH, file_name_to_save))

# Move to gcs destination (optimized from shutil.copy + os.remove)
shutil.move(
tmp_file_path,
os.path.join(_GCS_FOLDER_PERSISTENT_PATH,
file_name_to_save))

file_to_download['is_downloaded'] = True
logging.info(f"Downloaded file: {url}")

except Exception as e:
file_to_download['is_downloaded'] = False
logging.error(f"Error downloading {url}: {e}")
raise # re-raise to trigger @retry
raise
time.sleep(1)

return True
Expand All @@ -1325,13 +1365,16 @@ def main(_):
os.makedirs(data_file_path, exist_ok=True)
if not (os.path.exists(_INPUT_FILE_PATH)):
os.makedirs(_INPUT_FILE_PATH, exist_ok=True)
if not (os.path.exists(_GCS_OUTPUT_PERSISTENT_PATH)):
os.makedirs(_GCS_OUTPUT_PERSISTENT_PATH, exist_ok=True)
if not (os.path.exists(_GCS_FOLDER_PERSISTENT_PATH)):
os.makedirs(_GCS_FOLDER_PERSISTENT_PATH, exist_ok=True)

cleaned_csv_path = data_file_path + os.sep + csv_name
mcf_path = data_file_path + os.sep + mcf_name
tmcf_path = data_file_path + os.sep + tmcf_name

# Perform cleanup of old files
cleanup()

download_status = True
if mode == "" or mode == "download":
# Get the config path from the flags
Expand All @@ -1350,11 +1393,7 @@ def main(_):
mcf_path, tmcf_path)
loader.process()

# Only delete if it's a subdirectory of gcs_output, and not gcs_output itself
if os.path.exists(_GCS_OUTPUT_PERSISTENT_PATH) and os.path.commonpath([_GCS_OUTPUT_PERSISTENT_PATH, _GCS_BASE_DIR]) == _GCS_BASE_DIR \
and os.path.abspath(_GCS_OUTPUT_PERSISTENT_PATH) != os.path.abspath(_GCS_BASE_DIR):
shutil.rmtree(_GCS_OUTPUT_PERSISTENT_PATH)
logging.info(f"Deleted folder: {_GCS_OUTPUT_PERSISTENT_PATH}")
# The persistent folder is intentionally kept to allow for TTL caching.
except Exception as e:
logging.fatal(f"The processing is failed due to the error: {e}")

Expand Down
Loading