diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index 9fed1b5..8a240c8 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -31,11 +31,13 @@ jobs: uses: actions/setup-python@v6 with: python-version: "3.12" + cache: "pip" + cache-dependency-path: requirements-cd.txt - name: Install dependencies run: | python -m pip install --upgrade pip - pip install db-dtypes duckdb google-cloud-bigquery google-cloud-bigquery-storage pandas pyarrow requests + pip install -r requirements-cd.txt - name: Authorize Google Cloud uses: google-github-actions/auth@v3 @@ -48,7 +50,8 @@ jobs: run: | python scripts/python/idc_index_data_manager.py \ --generate-parquet \ - --output-dir release_artifacts + --output-dir release_artifacts \ + --cache-bucket idc-index-data-cache env: GCP_PROJECT: ${{ env.GCP_PROJECT }} diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 282d178..60d4caf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,10 +38,18 @@ jobs: with: extra_args: --hook-stage manual --all-files + - name: Cache nox pylint virtualenv + uses: actions/cache@v4 + with: + path: .nox/pylint + key: + nox-pylint-${{ runner.os }}-${{ hashFiles('pyproject.toml', + 'noxfile.py') }} + - name: Run PyLint run: | echo "::add-matcher::$GITHUB_WORKSPACE/.github/matchers/pylint.json" - pipx run nox -s pylint + pipx run nox -s pylint -r checks: name: Check Python ${{ matrix.python-version }} on ${{ matrix.runs-on }} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 553d7ca..8baf460 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -58,6 +58,7 @@ repos: - pytest - pandas-stubs - google-cloud-bigquery + - google-cloud-storage - repo: https://github.com/codespell-project/codespell rev: "v2.4.2" diff --git a/assets/version_metadata_index.sql b/assets/version_metadata_index.sql new file mode 100644 index 0000000..c6b722f --- /dev/null +++ b/assets/version_metadata_index.sql @@ -0,0 +1,20 @@ +# 
@staticmethod
def _compute_file_hash(file_path: str) -> str:
    """Return the hexadecimal SHA-256 digest of a file's contents.

    The file is read in fixed-size chunks so memory use stays bounded
    regardless of the file size; the digest is identical to hashing the
    whole content at once.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The SHA-256 digest of the file's bytes as a hex string.
    """
    digest = hashlib.sha256()
    with Path(file_path).open("rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()


def _try_restore_from_cache(
    self,
    storage_client,
    cache_bucket: str,
    sql_hash: str,
    output_basename: str,
    output_dir: Path,
) -> bool:
    """Download cached artifacts from GCS. Returns True on cache hit.

    A cache entry consists of three objects stored under
    ``cache/<sql_hash>/<output_basename>`` with the suffixes ``.parquet``,
    ``_schema.json`` and ``.sql``. The entry counts as a hit only when all
    three objects exist.

    The restore is all-or-nothing: every object is first downloaded to a
    ``.part`` file next to its destination and the files are moved into
    place only after all downloads succeeded. If a download raises, the
    staged files are removed and the exception propagates, so no truncated
    or mismatched artifact set is left in ``output_dir``.

    Args:
        storage_client: Object exposing ``bucket(name)``; the returned bucket
            must expose ``blob(name)`` whose result provides ``exists()`` and
            ``download_to_filename(path)``.
        cache_bucket: Name of the bucket holding the cache.
        sql_hash: Cache key (hash of the SQL file content).
        output_basename: Base name of the artifacts, without suffix.
        output_dir: Existing directory that receives the restored files.

    Returns:
        True when all artifacts were restored, False on a cache miss.

    Raises:
        Exception: Whatever the storage client raises while checking for or
            downloading the objects.
    """
    bucket = storage_client.bucket(cache_bucket)
    cache_prefix = f"cache/{sql_hash}/{output_basename}"
    suffixes = (".parquet", "_schema.json", ".sql")
    blobs = {s: bucket.blob(f"{cache_prefix}{s}") for s in suffixes}
    if not all(blob.exists() for blob in blobs.values()):
        logger.info("Cache miss for %s (hash %.8s)", output_basename, sql_hash)
        return False
    logger.info(
        "Cache hit for %s (hash %.8s), restoring from GCS",
        output_basename,
        sql_hash,
    )

    target_dir = Path(output_dir)
    staged = []
    try:
        for suffix, blob in blobs.items():
            final_path = target_dir / f"{output_basename}{suffix}"
            staging_path = target_dir / f"{output_basename}{suffix}.part"
            # Register before downloading so a partially written file is
            # cleaned up as well.
            staged.append((staging_path, final_path))
            blob.download_to_filename(str(staging_path))
    except Exception:
        for staging_path, _ in staged:
            staging_path.unlink(missing_ok=True)
        raise

    # Every download succeeded: promote the staged files to their final names.
    for staging_path, final_path in staged:
        staging_path.replace(final_path)
    return True


def _upload_to_cache(
    self,
    storage_client,
    cache_bucket: str,
    sql_hash: str,
    output_basename: str,
    output_dir: Path,
) -> None:
    """Upload generated artifacts to GCS cache.

    Each of ``<output_basename>.parquet``, ``<output_basename>_schema.json``
    and ``<output_basename>.sql`` that exists in ``output_dir`` is uploaded
    to ``cache/<sql_hash>/<output_basename><suffix>``. Success is logged only
    when the complete set was uploaded; otherwise a warning reports how many
    artifacts were found, because an incomplete entry can never satisfy a
    later cache lookup that requires all three objects.

    Args:
        storage_client: Object exposing ``bucket(name)``; the returned bucket
            must expose ``blob(name)`` whose result provides
            ``upload_from_filename(path)``.
        cache_bucket: Name of the bucket holding the cache.
        sql_hash: Cache key (hash of the SQL file content).
        output_basename: Base name of the artifacts, without suffix.
        output_dir: Directory containing the generated files.

    Raises:
        Exception: Whatever the storage client raises while uploading.
    """
    bucket = storage_client.bucket(cache_bucket)
    cache_prefix = f"cache/{sql_hash}/{output_basename}"
    suffixes = (".parquet", "_schema.json", ".sql")
    source_dir = Path(output_dir)

    uploaded = 0
    for suffix in suffixes:
        local_path = source_dir / f"{output_basename}{suffix}"
        if not local_path.exists():
            continue
        bucket.blob(f"{cache_prefix}{suffix}").upload_from_filename(str(local_path))
        uploaded += 1

    if uploaded == len(suffixes):
        logger.info("Cached %s in GCS (hash %.8s)", output_basename, sql_hash)
    else:
        logger.warning(
            "Incomplete cache entry for %s (hash %.8s): uploaded %d of %d artifacts",
            output_basename,
            sql_hash,
            uploaded,
            len(suffixes),
        )
""" scripts_dir = Path(__file__).parent.parent @@ -398,6 +452,12 @@ def generate_index_data_files( if output_dir: output_dir.mkdir(parents=True, exist_ok=True) + storage_client = None + if gcs_cache_bucket and generate_parquet and output_dir: + from google.cloud import storage # noqa: PLC0415 + + storage_client = storage.Client(project=self.project_id) + # Process SQL files from both directories sql_directories = [sql_dir, assets_dir] @@ -409,6 +469,27 @@ def generate_index_data_files( for file_name in Path.iterdir(directory): if str(file_name).endswith(".sql"): file_path = Path(directory) / file_name + + sql_hash = None + if storage_client and gcs_cache_bucket and output_dir: + sql_hash = self._compute_file_hash(str(file_path)) + output_basename = file_path.stem + try: + if self._try_restore_from_cache( + storage_client, + gcs_cache_bucket, + sql_hash, + output_basename, + output_dir, + ): + continue + except Exception: + logger.warning( + "Cache restore failed for %s, falling back to BigQuery", + output_basename, + exc_info=True, + ) + index_df, output_basename, schema, sql_query = ( self.execute_sql_query(str(file_path)) ) @@ -451,6 +532,27 @@ def generate_index_data_files( # Save SQL query to file self.save_sql_query(sql_query, output_basename, output_dir) + if ( + storage_client + and gcs_cache_bucket + and sql_hash + and output_dir + ): + try: + self._upload_to_cache( + storage_client, + gcs_cache_bucket, + sql_hash, + output_basename, + output_dir, + ) + except Exception: + logger.warning( + "Cache upload failed for %s", + output_basename, + exc_info=True, + ) + def retrieve_latest_idc_release_version(self) -> int: """ Retrieves the latest IDC release version. 
@@ -495,6 +597,11 @@ def retrieve_latest_idc_release_version(self) -> int: default=None, help="Output directory for generated files (default: current directory)", ) + parser.add_argument( + "--cache-bucket", + default=None, + help="GCS bucket name for caching generated parquet files (cache key = SHA256 of SQL file)", + ) parser.add_argument( "--retrieve-latest-idc-release-version", action="store_true", @@ -513,6 +620,7 @@ def retrieve_latest_idc_release_version(self) -> int: generate_compressed_csv=args.generate_csv_archive, generate_parquet=args.generate_parquet, output_dir=args.output_dir, + gcs_cache_bucket=args.cache_bucket, ) elif args.retrieve_latest_idc_release_version: logging.basicConfig(level=logging.ERROR, force=True) diff --git a/scripts/sql/analysis_results_index.sql b/scripts/sql/analysis_results_index.sql index 5db481a..69dafa4 100644 --- a/scripts/sql/analysis_results_index.sql +++ b/scripts/sql/analysis_results_index.sql @@ -26,7 +26,7 @@ SELECT modalities, # description: # timestamp of the last update to the analysis results collection - SAFE_CAST(Updated AS STRING) AS Updated, + SAFE_CAST(Updated AS STRING) AS updated, # description: # license URL for the analysis results collection license_url, @@ -38,7 +38,7 @@ SELECT license_short_name, # description: # detailed description of the analysis results collection - Description, + description, # description: # citation for the analysis results collection that should be used for acknowledgment citation diff --git a/scripts/sql/idc_index.sql b/scripts/sql/idc_index.sql index 72bc564..8c2458a 100644 --- a/scripts/sql/idc_index.sql +++ b/scripts/sql/idc_index.sql @@ -2,30 +2,38 @@ # This is the main metadata table provided by idc-index. Each row corresponds to a DICOM series, and contains # attributes at the collection, patient, study, and series levels. The table also contains download-related # attributes, such as the AWS S3 bucket and URL to download the series. 
+WITH aux_series AS ( + SELECT + SeriesInstanceUID, + MIN(series_init_idc_version) AS series_init_idc_version, + MAX(series_revised_idc_version) AS series_revised_idc_version + FROM `bigquery-public-data.idc_v24.auxiliary_metadata` + GROUP BY SeriesInstanceUID +) SELECT # collection level attributes # description: # short string with the identifier of the collection the series belongs to - ANY_VALUE(collection_id) AS collection_id, + ANY_VALUE(dicom_all.collection_id) AS collection_id, # description: # this string is not empty if the specific series is # part of an analysis results collection; analysis results can be added to a # given collection over time - ANY_VALUE(analysis_result_id) AS analysis_result_id, + ANY_VALUE(dicom_all.analysis_result_id) AS analysis_result_id, # description: # identifier of the patient within the collection (DICOM attribute) - ANY_VALUE(PatientID) AS PatientID, + ANY_VALUE(dicom_all.PatientID) AS PatientID, # description: # unique identifier of the DICOM series (DICOM attribute) - SeriesInstanceUID, + dicom_all.SeriesInstanceUID, # description: # unique identifier of the DICOM study (DICOM attribute) - ANY_VALUE(StudyInstanceUID) AS StudyInstanceUID, + ANY_VALUE(dicom_all.StudyInstanceUID) AS StudyInstanceUID, # description: # Digital Object Identifier of the dataset that contains the given # series; follow this DOI to learn more about the activity that produced # this series - ANY_VALUE(source_DOI) AS source_DOI, + ANY_VALUE(dicom_all.source_DOI) AS source_DOI, # patient level attributes: # description: # age of the subject at the time of imaging (DICOM attribute) @@ -147,28 +155,38 @@ SELECT COUNT(dicom_all.SOPInstanceUID) AS instanceCount, # description: # short name of the license that applies to this series - ANY_VALUE(license_short_name) as license_short_name, + ANY_VALUE(dicom_all.license_short_name) as license_short_name, + # description: + # IDC data release version number when this series first appeared in IDC (integer, 
e.g., 1 for v1) + ANY_VALUE(aux.series_init_idc_version) AS series_init_idc_version, + # description: + # IDC data release version number when this series was most recently revised in IDC (integer, e.g., 24 for v24) + ANY_VALUE(aux.series_revised_idc_version) AS series_revised_idc_version, # download related attributes # description: # name of the AWS S3 bucket that contains the series - ANY_VALUE(aws_bucket) AS aws_bucket, + ANY_VALUE(dicom_all.aws_bucket) AS aws_bucket, # description: # unique identifier of the series within the IDC - ANY_VALUE(crdc_series_uuid) AS crdc_series_uuid, + ANY_VALUE(dicom_all.crdc_series_uuid) AS crdc_series_uuid, # series_aws_url will be phased out in favor of constructing URL from bucket+UUID # description: # public AWS S3 URL to download the series in bulk (each instance is a separate file) - ANY_VALUE(CONCAT(series_aws_url,"*")) AS series_aws_url, + ANY_VALUE(CONCAT(dicom_all.series_aws_url,"*")) AS series_aws_url, # description: # total size of the series in megabytes - SUM(SAFE_CAST(instance_size AS float64))/1000000. AS series_size_MB, + SUM(SAFE_CAST(dicom_all.instance_size AS float64))/1000000. 
AS series_size_MB, FROM `bigquery-public-data.idc_v24.dicom_all` AS dicom_all LEFT JOIN `bigquery-public-data.idc_v24.dicom_metadata_curated` AS dicom_curated ON dicom_all.SOPInstanceUID = dicom_curated.SOPInstanceUID +LEFT JOIN + aux_series AS aux +ON + dicom_all.SeriesInstanceUID = aux.SeriesInstanceUID GROUP BY - SeriesInstanceUID + dicom_all.SeriesInstanceUID ORDER BY collection_id, PatientID, StudyInstanceUID, SeriesInstanceUID diff --git a/src/idc_index_data/__init__.py b/src/idc_index_data/__init__.py index 25226ac..9c90bdd 100644 --- a/src/idc_index_data/__init__.py +++ b/src/idc_index_data/__init__.py @@ -79,7 +79,7 @@ def _load_text(path: Path | None) -> str | None: "idc_index_data/prior_versions_index.parquet" ) -# Build unified metadata dictionary for all 7 indices +# Build unified metadata dictionary for all indices _ALL_INDICES = [ "idc_index", "prior_versions_index", @@ -94,6 +94,7 @@ def _load_text(path: Path | None) -> str | None: "contrast_index", "volume_geometry_index", "rtstruct_index", + "version_metadata_index", ] INDEX_METADATA: dict[str, dict[str, Path | dict[str, object] | str | None]] = {}