Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ jobs:
uses: actions/setup-python@v6
with:
python-version: "3.12"
cache: "pip"
cache-dependency-path: requirements-cd.txt

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install db-dtypes duckdb google-cloud-bigquery google-cloud-bigquery-storage pandas pyarrow requests
pip install -r requirements-cd.txt

- name: Authorize Google Cloud
uses: google-github-actions/auth@v3
Expand All @@ -48,7 +50,8 @@ jobs:
run: |
python scripts/python/idc_index_data_manager.py \
--generate-parquet \
--output-dir release_artifacts
--output-dir release_artifacts \
--cache-bucket idc-index-data-cache
env:
GCP_PROJECT: ${{ env.GCP_PROJECT }}

Expand Down
10 changes: 9 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,18 @@ jobs:
with:
extra_args: --hook-stage manual --all-files

- name: Cache nox pylint virtualenv
uses: actions/cache@v4
with:
path: .nox/pylint
key:
nox-pylint-${{ runner.os }}-${{ hashFiles('pyproject.toml',
'noxfile.py') }}

- name: Run PyLint
run: |
echo "::add-matcher::$GITHUB_WORKSPACE/.github/matchers/pylint.json"
pipx run nox -s pylint
pipx run nox -s pylint -r

checks:
name: Check Python ${{ matrix.python-version }} on ${{ matrix.runs-on }}
Expand Down
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ repos:
- pytest
- pandas-stubs
- google-cloud-bigquery
- google-cloud-storage

- repo: https://github.com/codespell-project/codespell
rev: "v2.4.2"
Expand Down
20 changes: 20 additions & 0 deletions assets/version_metadata_index.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# table-description:
# This table contains metadata about each IDC data release version. Each row
# corresponds to one IDC version and captures when that version was created.
# This index can be used to correlate data in other indexes (which include
# idc_version columns) with the corresponding release timestamps.

SELECT
# description:
# IDC version number identifying the data release
idc_version,

# description:
# timestamp when this IDC version was created
version_timestamp

FROM
-- The dataset name pins this query to the idc_v24 dataset.
-- NOTE(review): presumably this must be bumped together with the other
-- index queries when a new IDC release is adopted; confirm.
`bigquery-public-data.idc_v24.version_metadata`

-- Rows are returned sorted by ascending idc_version.
ORDER BY
idc_version
8 changes: 8 additions & 0 deletions requirements-cd.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
db-dtypes
duckdb
google-cloud-bigquery
google-cloud-bigquery-storage
google-cloud-storage
pandas
pyarrow
requests
108 changes: 108 additions & 0 deletions scripts/python/idc_index_data_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# mypy: ignore-errors
from __future__ import annotations

import hashlib
import json
import logging
import os
Expand Down Expand Up @@ -241,6 +243,55 @@ def _extract_column_name(column_def: str) -> str | None:

return None

@staticmethod
def _compute_file_hash(file_path: str) -> str:
    """Return the SHA-256 digest of a file's raw bytes as a hex string.

    The file is read in binary mode, so the digest depends only on the
    exact bytes on disk.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The hexadecimal SHA-256 digest of the file content.
    """
    digest = hashlib.sha256()
    digest.update(Path(file_path).read_bytes())
    return digest.hexdigest()

def _try_restore_from_cache(
    self,
    storage_client,
    cache_bucket: str,
    sql_hash: str,
    output_basename: str,
    output_dir: Path,
) -> bool:
    """Restore a cached set of artifacts from GCS into ``output_dir``.

    A cache entry is stored under ``cache/<sql_hash>/<output_basename>`` and
    is made of three objects: ``.parquet``, ``_schema.json`` and ``.sql``.
    The entry only counts as a hit when all three objects exist.

    Args:
        storage_client: Object exposing ``bucket(name)``; the caller in this
            file passes a ``google.cloud.storage.Client``.
        cache_bucket: Name of the bucket that holds the cache.
        sql_hash: Hash used as the cache key (directory component).
        output_basename: Base file name shared by the three artifacts.
        output_dir: Local directory the artifacts are downloaded into.

    Returns:
        True when every artifact was downloaded, False on a cache miss.

    Errors raised by the storage client are not handled here and propagate
    to the caller.
    """
    gcs_bucket = storage_client.bucket(cache_bucket)
    object_prefix = f"cache/{sql_hash}/{output_basename}"

    cached = {}
    for ext in (".parquet", "_schema.json", ".sql"):
        cached[ext] = gcs_bucket.blob(f"{object_prefix}{ext}")

    # any() stops probing at the first missing object, so no further
    # existence checks are issued once the entry is known to be incomplete.
    if any(not candidate.exists() for candidate in cached.values()):
        logger.info("Cache miss for %s (hash %.8s)", output_basename, sql_hash)
        return False

    logger.info(
        "Cache hit for %s (hash %.8s), restoring from GCS",
        output_basename,
        sql_hash,
    )
    for ext in cached:
        destination = output_dir / f"{output_basename}{ext}"
        cached[ext].download_to_filename(str(destination))
    return True

def _upload_to_cache(
    self,
    storage_client,
    cache_bucket: str,
    sql_hash: str,
    output_basename: str,
    output_dir: Path,
) -> None:
    """Store the locally generated artifacts in the GCS cache.

    Each of ``<output_basename>.parquet``, ``<output_basename>_schema.json``
    and ``<output_basename>.sql`` found in ``output_dir`` is uploaded to
    ``cache/<sql_hash>/<output_basename><suffix>`` in ``cache_bucket``.

    Args:
        storage_client: Object exposing ``bucket(name)``; the caller in this
            file passes a ``google.cloud.storage.Client``.
        cache_bucket: Name of the bucket that holds the cache.
        sql_hash: Hash used as the cache key (directory component).
        output_basename: Base file name shared by the artifacts.
        output_dir: Local directory containing the generated artifacts.

    Errors raised by the storage client are not handled here and propagate
    to the caller.
    """
    gcs_bucket = storage_client.bucket(cache_bucket)
    object_prefix = f"cache/{sql_hash}/{output_basename}"

    for ext in (".parquet", "_schema.json", ".sql"):
        artifact = output_dir / f"{output_basename}{ext}"
        if not artifact.exists():
            # Artifacts that were not produced locally are simply skipped.
            continue
        target = gcs_bucket.blob(f"{object_prefix}{ext}")
        target.upload_from_filename(str(artifact))

    logger.info("Cached %s in GCS (hash %.8s)", output_basename, sql_hash)

def execute_sql_query(
self, file_path: str
) -> tuple[pd.DataFrame, str, list[bigquery.SchemaField], str]:
Expand Down Expand Up @@ -375,6 +426,7 @@ def generate_index_data_files(
generate_compressed_csv: bool = True,
generate_parquet: bool = False,
output_dir: Path | None = None,
gcs_cache_bucket: str | None = None,
) -> None:
"""
Generates index-data files locally by executing queries against
Expand All @@ -389,6 +441,8 @@ def generate_index_data_files(
generate_compressed_csv: Whether to generate compressed CSV files
generate_parquet: Whether to generate Parquet files
output_dir: Optional directory path for the output files
gcs_cache_bucket: GCS bucket name for caching generated parquet files.
Cache key is the SHA256 hash of the SQL file content.
"""

scripts_dir = Path(__file__).parent.parent
Expand All @@ -398,6 +452,12 @@ def generate_index_data_files(
if output_dir:
output_dir.mkdir(parents=True, exist_ok=True)

storage_client = None
if gcs_cache_bucket and generate_parquet and output_dir:
from google.cloud import storage # noqa: PLC0415

storage_client = storage.Client(project=self.project_id)

# Process SQL files from both directories
sql_directories = [sql_dir, assets_dir]

Expand All @@ -409,6 +469,27 @@ def generate_index_data_files(
for file_name in Path.iterdir(directory):
if str(file_name).endswith(".sql"):
file_path = Path(directory) / file_name

sql_hash = None
if storage_client and gcs_cache_bucket and output_dir:
sql_hash = self._compute_file_hash(str(file_path))
output_basename = file_path.stem
try:
if self._try_restore_from_cache(
storage_client,
gcs_cache_bucket,
sql_hash,
output_basename,
output_dir,
):
continue
except Exception:
logger.warning(
"Cache restore failed for %s, falling back to BigQuery",
output_basename,
exc_info=True,
)

index_df, output_basename, schema, sql_query = (
self.execute_sql_query(str(file_path))
)
Expand Down Expand Up @@ -451,6 +532,27 @@ def generate_index_data_files(
# Save SQL query to file
self.save_sql_query(sql_query, output_basename, output_dir)

if (
storage_client
and gcs_cache_bucket
and sql_hash
and output_dir
):
try:
self._upload_to_cache(
storage_client,
gcs_cache_bucket,
sql_hash,
output_basename,
output_dir,
)
except Exception:
logger.warning(
"Cache upload failed for %s",
output_basename,
exc_info=True,
)

def retrieve_latest_idc_release_version(self) -> int:
"""
Retrieves the latest IDC release version.
Expand Down Expand Up @@ -495,6 +597,11 @@ def retrieve_latest_idc_release_version(self) -> int:
default=None,
help="Output directory for generated files (default: current directory)",
)
parser.add_argument(
"--cache-bucket",
default=None,
help="GCS bucket name for caching generated parquet files (cache key = SHA256 of SQL file)",
)
parser.add_argument(
"--retrieve-latest-idc-release-version",
action="store_true",
Expand All @@ -513,6 +620,7 @@ def retrieve_latest_idc_release_version(self) -> int:
generate_compressed_csv=args.generate_csv_archive,
generate_parquet=args.generate_parquet,
output_dir=args.output_dir,
gcs_cache_bucket=args.cache_bucket,
)
elif args.retrieve_latest_idc_release_version:
logging.basicConfig(level=logging.ERROR, force=True)
Expand Down
4 changes: 2 additions & 2 deletions scripts/sql/analysis_results_index.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ SELECT
modalities,
# description:
# timestamp of the last update to the analysis results collection
SAFE_CAST(Updated AS STRING) AS Updated,
SAFE_CAST(Updated AS STRING) AS updated,
# description:
# license URL for the analysis results collection
license_url,
Expand All @@ -38,7 +38,7 @@ SELECT
license_short_name,
# description:
# detailed description of the analysis results collection
Description,
description,
# description:
# citation for the analysis results collection that should be used for acknowledgment
citation
Expand Down
42 changes: 30 additions & 12 deletions scripts/sql/idc_index.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,38 @@
# This is the main metadata table provided by idc-index. Each row corresponds to a DICOM series, and contains
# attributes at the collection, patient, study, and series levels. The table also contains download-related
# attributes, such as the AWS S3 bucket and URL to download the series.
WITH aux_series AS (
SELECT
SeriesInstanceUID,
MIN(series_init_idc_version) AS series_init_idc_version,
MAX(series_revised_idc_version) AS series_revised_idc_version
FROM `bigquery-public-data.idc_v24.auxiliary_metadata`
GROUP BY SeriesInstanceUID
)
SELECT
# collection level attributes
# description:
# short string with the identifier of the collection the series belongs to
ANY_VALUE(collection_id) AS collection_id,
ANY_VALUE(dicom_all.collection_id) AS collection_id,
# description:
# this string is not empty if the specific series is
# part of an analysis results collection; analysis results can be added to a
# given collection over time
ANY_VALUE(analysis_result_id) AS analysis_result_id,
ANY_VALUE(dicom_all.analysis_result_id) AS analysis_result_id,
# description:
# identifier of the patient within the collection (DICOM attribute)
ANY_VALUE(PatientID) AS PatientID,
ANY_VALUE(dicom_all.PatientID) AS PatientID,
# description:
# unique identifier of the DICOM series (DICOM attribute)
SeriesInstanceUID,
dicom_all.SeriesInstanceUID,
# description:
# unique identifier of the DICOM study (DICOM attribute)
ANY_VALUE(StudyInstanceUID) AS StudyInstanceUID,
ANY_VALUE(dicom_all.StudyInstanceUID) AS StudyInstanceUID,
# description:
# Digital Object Identifier of the dataset that contains the given
# series; follow this DOI to learn more about the activity that produced
# this series
ANY_VALUE(source_DOI) AS source_DOI,
ANY_VALUE(dicom_all.source_DOI) AS source_DOI,
# patient level attributes:
# description:
# age of the subject at the time of imaging (DICOM attribute)
Expand Down Expand Up @@ -147,28 +155,38 @@ SELECT
COUNT(dicom_all.SOPInstanceUID) AS instanceCount,
# description:
# short name of the license that applies to this series
ANY_VALUE(license_short_name) as license_short_name,
ANY_VALUE(dicom_all.license_short_name) as license_short_name,
# description:
# IDC data release version number when this series first appeared in IDC (integer, e.g., 1 for v1)
ANY_VALUE(aux.series_init_idc_version) AS series_init_idc_version,
# description:
# IDC data release version number when this series was most recently revised in IDC (integer, e.g., 24 for v24)
ANY_VALUE(aux.series_revised_idc_version) AS series_revised_idc_version,
# download related attributes
# description:
# name of the AWS S3 bucket that contains the series
ANY_VALUE(aws_bucket) AS aws_bucket,
ANY_VALUE(dicom_all.aws_bucket) AS aws_bucket,
# description:
# unique identifier of the series within the IDC
ANY_VALUE(crdc_series_uuid) AS crdc_series_uuid,
ANY_VALUE(dicom_all.crdc_series_uuid) AS crdc_series_uuid,
# series_aws_url will be phased out in favor of constructing URL from bucket+UUID
# description:
# public AWS S3 URL to download the series in bulk (each instance is a separate file)
ANY_VALUE(CONCAT(series_aws_url,"*")) AS series_aws_url,
ANY_VALUE(CONCAT(dicom_all.series_aws_url,"*")) AS series_aws_url,
# description:
# total size of the series in megabytes
SUM(SAFE_CAST(instance_size AS float64))/1000000. AS series_size_MB,
SUM(SAFE_CAST(dicom_all.instance_size AS float64))/1000000. AS series_size_MB,
FROM
`bigquery-public-data.idc_v24.dicom_all` AS dicom_all
LEFT JOIN
`bigquery-public-data.idc_v24.dicom_metadata_curated` AS dicom_curated
ON
dicom_all.SOPInstanceUID = dicom_curated.SOPInstanceUID
LEFT JOIN
aux_series AS aux
ON
dicom_all.SeriesInstanceUID = aux.SeriesInstanceUID
GROUP BY
SeriesInstanceUID
dicom_all.SeriesInstanceUID
ORDER BY
collection_id, PatientID, StudyInstanceUID, SeriesInstanceUID
3 changes: 2 additions & 1 deletion src/idc_index_data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def _load_text(path: Path | None) -> str | None:
"idc_index_data/prior_versions_index.parquet"
)

# Build unified metadata dictionary for all 7 indices
# Build unified metadata dictionary for all indices
_ALL_INDICES = [
"idc_index",
"prior_versions_index",
Expand All @@ -94,6 +94,7 @@ def _load_text(path: Path | None) -> str | None:
"contrast_index",
"volume_geometry_index",
"rtstruct_index",
"version_metadata_index",
]

INDEX_METADATA: dict[str, dict[str, Path | dict[str, object] | str | None]] = {}
Expand Down
Loading