Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions gateway/sds_gateway/api_methods/helpers/temporal_filtering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import re

from django.db.models import QuerySet

from opensearchpy.exceptions import NotFoundError as OpenSearchNotFoundError
from sds_gateway.api_methods.models import CaptureType, Capture, File, DRF_RF_FILENAME_REGEX_STR
from sds_gateway.api_methods.utils.opensearch_client import get_opensearch_client
from sds_gateway.api_methods.utils.relationship_utils import get_capture_files
from loguru import logger as log

# Digital RF spec: rf@SECONDS.MILLISECONDS.h5 (e.g. rf@1396379502.000.h5)
# https://github.com/MITHaystack/digital_rf
DRF_RF_FILENAME_PATTERN = re.compile(
r"^rf@(\d+)\.(\d+)\.h5$",
re.IGNORECASE,
)


def drf_rf_filename_from_ms(ms: int) -> str:
"""Format ms as DRF rf data filename (canonical for range queries)."""
return f"rf@{ms // 1000}.{ms % 1000:03d}.h5"


def drf_rf_filename_to_ms(file_name: str) -> int | None:
"""
Parse DRF rf data filename to milliseconds.
Handles rf@SECONDS.MILLISECONDS.h5; fractional part padded to 3 digits.
"""
name = file_name.strip()
match = DRF_RF_FILENAME_PATTERN.match(name)
if not match:
return None
try:
seconds = int(match.group(1))
frac = match.group(2).ljust(3, "0")[:3]
return seconds * 1000 + int(frac)
except (ValueError, TypeError):
return None


def _catch_capture_type_error(capture_type: CaptureType) -> None:
    """Reject any capture type other than DigitalRF.

    Raises:
        ValueError: when ``capture_type`` is not ``CaptureType.DigitalRF``.
    """
    if capture_type == CaptureType.DigitalRF:
        return
    msg = "Only DigitalRF captures are supported for temporal filtering."
    log.error(msg)
    raise ValueError(msg)


def get_capture_bounds(capture_type: CaptureType, capture_uuid: str) -> tuple[int, int]:
    """Fetch a capture's (start_time, end_time) bounds from OpenSearch.

    Looks the capture up by UUID in the ``captures-<type>`` index and reads
    ``search_props.start_time`` / ``search_props.end_time`` (defaulting to 0
    when absent).

    Raises:
        ValueError: if the type is not DigitalRF or the document is missing.
    """
    _catch_capture_type_error(capture_type)

    index = f"captures-{capture_type}"
    # Same message for both failure modes: the client raising NotFound, or a
    # response that reports found=False.
    not_found_msg = f"Capture {capture_uuid} not found in OpenSearch index {index}"

    try:
        doc = get_opensearch_client().get(index=index, id=capture_uuid)
    except OpenSearchNotFoundError as exc:
        raise ValueError(not_found_msg) from exc

    if not doc.get("found"):
        raise ValueError(not_found_msg)

    search_props = doc.get("_source", {}).get("search_props", {})
    return search_props.get("start_time", 0), search_props.get("end_time", 0)


def get_file_cadence(capture_type: CaptureType, capture: Capture) -> int:
    """Return the average per-file cadence in milliseconds.

    The OpenSearch bounds are in seconds; the capture duration is spread
    evenly over the number of DRF data files. Returns 0 when the capture has
    no data files, and never less than 1 ms otherwise.
    """
    _catch_capture_type_error(capture_type)

    bounds = get_capture_bounds(capture_type, str(capture.uuid))
    file_count = capture.get_drf_data_files_stats()["total_count"]
    if not file_count:
        return 0

    span_ms = (bounds[1] - bounds[0]) * 1000
    return max(1, int(span_ms / file_count))


def filter_capture_data_files_selection_bounds(
    capture_type: CaptureType,
    capture: Capture,
    start_time: int,  # relative ms from start of capture (from UI)
    end_time: int,  # relative ms from start of capture (from UI)
) -> QuerySet[File]:
    """Restrict a capture's DRF data files to a relative time window.

    The UI supplies bounds in milliseconds relative to the capture start;
    these are shifted by the capture's epoch start (from OpenSearch) and
    rendered as canonical filenames so the window becomes an inclusive
    filename range.
    """
    _catch_capture_type_error(capture_type)

    epoch_start_sec, _ = get_capture_bounds(capture_type, str(capture.uuid))
    base_ms = epoch_start_sec * 1000

    # Canonical zero-padded filenames sort lexicographically in time order,
    # so an inclusive name range is an inclusive time range.
    lower_name = drf_rf_filename_from_ms(base_ms + start_time)
    upper_name = drf_rf_filename_from_ms(base_ms + end_time)

    return (
        capture.get_drf_data_files_queryset()
        .filter(name__range=(lower_name, upper_name))
        .order_by("name")
    )

def get_capture_files_with_temporal_filter(
    capture_type: CaptureType,
    capture: Capture,
    start_time: int | None = None,  # milliseconds since start of capture
    end_time: int | None = None,
) -> QuerySet[File]:
    """Return a capture's files, optionally restricted to a time window.

    Non-data files are always included; only the DRF ``rf@*.h5`` data files
    are narrowed to ``[start_time, end_time]`` (milliseconds relative to the
    capture start). If either bound is None, all files are returned without
    filtering.

    Raises:
        ValueError: if ``capture_type`` is not DigitalRF.

    NOTE(review): ``data_files`` arrives with ``order_by("name")`` applied by
    ``filter_capture_data_files_selection_bounds``; some database backends
    reject ORDER BY inside the subqueries of a compound (UNION) statement, and
    ``union()`` results only support a limited set of further queryset
    operations — confirm this combination works on the target backend.
    """
    _catch_capture_type_error(capture_type)

    if start_time is None or end_time is None:
        log.warning("Start or end time is None, returning all capture files without temporal filtering")
        return get_capture_files(capture)

    # get non-data files
    non_data_files = get_capture_files(capture).exclude(name__regex=DRF_RF_FILENAME_REGEX_STR)

    # get data files with temporal filtering
    data_files = filter_capture_data_files_selection_bounds(
        capture_type, capture, start_time, end_time
    )

    # return all files
    return non_data_files.union(data_files)
31 changes: 31 additions & 0 deletions gateway/sds_gateway/api_methods/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from blake3 import blake3 as Blake3 # noqa: N812
from django.conf import settings
from django.db import models
from django.db.models import Sum
from django.db.models import Count
from django.db.models import ProtectedError
from django.db.models import QuerySet
from django.db.models.signals import post_save
Expand All @@ -27,6 +29,8 @@

log = logging.getLogger(__name__)

DRF_RF_FILENAME_REGEX_STR = r"^rf@\d+\.\d+\.h5$"


class KeywordNameField(models.CharField):
"""
Expand Down Expand Up @@ -419,6 +423,33 @@ def get_capture(self) -> dict[str, Any]:
"owner": self.owner,
}


def get_drf_data_files_queryset(self) -> QuerySet[File]:
    """DRF data files (rf@*.h5) for this capture (M2M + FK).

    Returns an empty queryset (with a warning) for non-DigitalRF captures.
    """
    if self.capture_type == CaptureType.DigitalRF:
        # Local import avoids circular import (relationship_utils imports Capture).
        from sds_gateway.api_methods.utils.relationship_utils import get_capture_files

        linked_files = get_capture_files(self, include_deleted=False)
        return linked_files.filter(name__regex=DRF_RF_FILENAME_REGEX_STR)

    log.warning("Capture %s is not a DigitalRF capture", self.uuid)
    return File.objects.none()

def get_drf_data_files_stats(self) -> dict[str, int]:
    """Count + total size in one query; cached per instance. File PK is ``uuid`` — use ``pk``."""
    # The cache is only ever populated with a dict, so a None sentinel is a
    # safe "not computed yet" marker.
    cached = getattr(self, "_drf_data_files_stats_cache", None)
    if cached is not None:
        return cached

    totals = self.get_drf_data_files_queryset().aggregate(
        total_count=Count("pk"),
        total_size=Sum("size"),
    )
    stats = {
        "total_count": totals["total_count"] or 0,
        "total_size": int(totals["total_size"] or 0),
    }
    self._drf_data_files_stats_cache = stats
    return stats

def get_opensearch_frequency_metadata(self) -> dict[str, Any]:
"""
Query OpenSearch for frequency metadata for this specific capture.
Expand Down
Loading
Loading