Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions software/control/core/multi_point_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from squid.abc import CameraFrame

if TYPE_CHECKING:
from control.core.qc import FOVMetrics, PolicyDecision
from control.slack_notifier import TimepointStats, AcquisitionStats


Expand Down Expand Up @@ -125,3 +126,6 @@ class MultiPointControllerFunctions:
# Zarr frame written callback - called when subprocess completes writing a frame
# Args: (fov, time_point, z_index, channel_name, region_idx)
signal_zarr_frame_written: Callable[[int, int, int, str, int], None] = lambda *a, **kw: None
# QC callbacks
signal_qc_metrics_updated: Callable[["FOVMetrics"], None] = lambda *a, **kw: None
signal_qc_policy_decision: Callable[["PolicyDecision"], None] = lambda *a, **kw: None
93 changes: 90 additions & 3 deletions software/control/core/multi_point_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
ensure_plate_resolution_in_well_resolutions,
)
from control.core.backpressure import BackpressureController, BackpressureValues
from control.core.qc import QCConfig, QCJob, QCPolicy, QCPolicyConfig, QCResult, TimepointMetricsStore
from squid.config import CameraPixelFormat

# Module-level logger for static methods
Expand Down Expand Up @@ -86,6 +87,8 @@ def __init__(
slack_notifier=None,
prewarmed_job_runner: Optional[JobRunner] = None,
prewarmed_bp_values: Optional["BackpressureValues"] = None,
qc_config: Optional[QCConfig] = None,
qc_policy_config: Optional[QCPolicyConfig] = None,
):
self._log = squid.logging.get_logger(__class__.__name__)
self._timing = utils.TimingManager("MultiPointWorker Timer Manager")
Expand Down Expand Up @@ -161,6 +164,14 @@ def __init__(
self.num_fovs = 0
self.total_scans = 0
self._last_time_point_z_pos = {}
self._qc_config = qc_config or QCConfig()
self._qc_policy_config = qc_policy_config or QCPolicyConfig()
if self._qc_policy_config.enabled and not self._qc_config.enabled:
self._log.warning("QC policy is enabled but QC metrics collection is disabled — policy checks will not run")
self._qc_policy = QCPolicy(self._qc_policy_config) if self._qc_policy_config.enabled else None
self._metrics_store: Optional[TimepointMetricsStore] = None
self._qc_pause_event = threading.Event()
self._qc_pause_event.set() # starts unpaused
self.scan_region_fov_coords_mm = (
acquisition_parameters.scan_position_information.scan_region_fov_coords_mm.copy()
)
Expand Down Expand Up @@ -264,7 +275,7 @@ def __init__(
# For now, use 1 runner per job class. There's no real reason/rationale behind this, though. The runners
# can all run any job type. But 1 per is a reasonable arbitrary arrangement while we don't have a lot
# of job types. If we have a lot of custom jobs, this could cause problems via resource hogging.
self._job_runners: List[Tuple[Type[Job], JobRunner]] = []
self._job_runners: List[Tuple[Type[Job], Optional[JobRunner]]] = []
self._log.info(f"Acquisition.USE_MULTIPROCESSING = {Acquisition.USE_MULTIPROCESSING}")

# Get the current log file path to share with subprocess workers
Expand Down Expand Up @@ -370,6 +381,10 @@ def __init__(
# Subprocess starts warming up in background - don't block here

self._job_runners.append((job_class, job_runner))

if self._qc_config.enabled:
self._job_runners.append((QCJob, None))
Comment on lines +385 to +386
Comment on lines +385 to +386

self._abort_on_failed_job = abort_on_failed_jobs
self._first_job_dispatched = False # Track if we've waited for subprocess warmup

Expand Down Expand Up @@ -644,6 +659,8 @@ def run_single_time_point(self):
self._timepoint_fov_count = 0
self._laser_af_successes = 0
self._laser_af_failures = 0
if self._qc_config.enabled:
self._metrics_store = TimepointMetricsStore(timepoint_index=self.time_point)
self.microcontroller.enable_joystick(False)

self._log.debug("multipoint acquisition - time point " + str(self.time_point + 1))
Expand All @@ -663,6 +680,26 @@ def run_single_time_point(self):
with self._timing.get_timer("run_coordinate_acquisition"):
self.run_coordinate_acquisition(current_path)

# QC policy check
if self._qc_policy is not None and self._qc_policy_config.check_after_timepoint:
if self._metrics_store is not None:
try:
decision = self._qc_policy.check_timepoint(self._metrics_store)
self.callbacks.signal_qc_policy_decision(decision)
Comment on lines +683 to +688
if decision.should_pause:
self._log.info(
f"QC policy flagged {len(decision.flagged_fovs)} FOVs — "
f"pausing acquisition. Call resume_from_qc_pause() to continue."
)
self._qc_pause_event.clear()
# Block until resumed or aborted
while not self._qc_pause_event.is_set():
if self.abort_requested_fn():
break
self._qc_pause_event.wait(timeout=0.5)
except Exception as e:
self._log.error(f"QC policy evaluation failed for timepoint {self.time_point}: {e}")

# Save plate view for this timepoint
if self._generate_downsampled_views and self._downsampled_view_manager is not None:
# Wait for pending downsampled view jobs to complete
Expand All @@ -678,6 +715,14 @@ def run_single_time_point(self):
# finished region scan
self.coordinates_pd.to_csv(os.path.join(current_path, "coordinates.csv"), index=False, header=True)

# Save QC metrics
if self._qc_config.enabled and self._metrics_store is not None:
qc_csv_path = os.path.join(current_path, "qc_metrics.csv")
try:
self._metrics_store.save(qc_csv_path)
except OSError as e:
self._log.error(f"Failed to save QC metrics to {qc_csv_path}: {e}")

# Send Slack timepoint notification via callback (allows main thread to capture screenshot)
if self._slack_notifier is not None:
try:
Expand Down Expand Up @@ -805,6 +850,23 @@ def _summarize_runner_outputs(self, drain_all: bool = False) -> SummarizeResult:

return SummarizeResult(none_failed=none_failed, had_results=had_results)

def resume_from_qc_pause(self) -> None:
    """Release the QC pause gate so a paused acquisition can continue.

    Intended to be invoked by the UI after the QC policy paused the
    acquisition loop; setting the event unblocks the waiting worker.
    """
    self._log.info("Resuming acquisition from QC pause")
    self._qc_pause_event.set()

def _handle_qc_result(self, qc_result: QCResult) -> None:
    """Record the metrics from a completed QC job and notify listeners.

    Metrics are stored and emitted even when the job reported an error,
    since the positional data they carry remains valid on partial failure.
    """
    metrics = qc_result.metrics
    err = qc_result.error
    if err:
        self._log.error(
            f"QC metric calculation failed for region={metrics.fov_id.region_id} "
            f"fov={metrics.fov_id.fov_index}: {err}"
        )
    # Store unconditionally: positional data is valid even on partial failure.
    if self._metrics_store is not None:
        self._metrics_store.add(metrics)
    self.callbacks.signal_qc_metrics_updated(metrics)

def _summarize_job_result(self, job_result: JobResult) -> bool:
"""
Prints a summary, then returns True if the result was successful or False otherwise.
Expand Down Expand Up @@ -833,6 +895,9 @@ def _summarize_job_result(self, job_result: JobResult) -> bool:
elif isinstance(job_result.result, ZarrWriteResult):
r = job_result.result
self.callbacks.signal_zarr_frame_written(r.fov, r.time_point, r.z_index, r.channel_name, r.region_idx)
# Handle QCResult - store metrics and emit signal
elif isinstance(job_result.result, QCResult):
self._handle_qc_result(job_result.result)
return True

def _handle_downsampled_view_result(self, result: DownsampledViewResult) -> None:
Expand Down Expand Up @@ -888,9 +953,31 @@ def _create_job(self, job_class: Type[Job], info: CaptureInfo, image: np.ndarray
"""
if job_class == DownsampledViewJob:
return self._create_downsampled_view_job(info, image)
elif job_class == QCJob:
return self._create_qc_job(info, image)
else:
return job_class(capture_info=info, capture_image=JobImage(image_array=image))

def _create_qc_job(self, info: CaptureInfo, image: np.ndarray) -> Optional[QCJob]:
    """Build a QCJob for this capture, or None if it is not the QC frame.

    Only the configured channel and z-slice is used for QC; skipping all
    other frames avoids overwriting metrics with non-canonical data.
    """
    cfg = self._qc_config
    is_qc_frame = info.z_index == cfg.qc_z_index and info.configuration_idx == cfg.qc_channel_index
    if not is_qc_frame:
        return None

    previous_z = None
    if cfg.calculate_z_diff_from_last_timepoint and self.time_point > 0:
        fov_key = (info.region_id, info.fov)
        if fov_key in self._last_time_point_z_pos:
            # Stored positions are in mm; QC expects um.
            previous_z = self._last_time_point_z_pos[fov_key] * 1000

    return QCJob(
        capture_info=info,
        capture_image=JobImage(image_array=image),
        qc_config=cfg,
        previous_timepoint_z=previous_z,
    )

def _create_downsampled_view_job(self, info: CaptureInfo, image: np.ndarray) -> Optional[DownsampledViewJob]:
"""Create a DownsampledViewJob for the given capture.

Expand Down Expand Up @@ -1424,9 +1511,9 @@ def _image_callback(self, camera_frame: CameraFrame):
return
else:
try:
# NOTE(imo): We don't have any way of people using results, so for now just
# grab and ignore it.
result = job.run()
if isinstance(result, QCResult):
self._handle_qc_result(result)
except Exception:
self._log.exception("Failed to execute job, abandoning acquisition!")
self.request_abort_fn()
Expand Down
Loading
Loading