Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 37 additions & 21 deletions controller/imager/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ def _start_acquisition(self, latest_message: dict[str, typing.Any]) -> None:
self._active_routine = ImageAcquisitionRoutine(
stopflow.Routine(output_path, acquisition_settings, self._pump, self._camera.camera),
self._mqtt,
metadata,
)
self._active_routine.start()

Expand Down Expand Up @@ -276,8 +277,9 @@ def _initialize_acquisition_directory(
) -> typing.Optional[str]:
"""Make the directory where images will be saved for the current image-acquisition routine.

This also saves the metadata to a `metadata.json` file and initializes a file integrity log in
the directory.
This also initializes a file integrity log in the directory. The metadata is not written
here — it is finalized at the end of acquisition so that `nb_frame` reflects the actual
number of frames captured (which may be less than planned if the run is interrupted).

Args:
base_path: directory under which a subdirectory tree will be created for the image
Expand Down Expand Up @@ -311,13 +313,7 @@ def _initialize_acquisition_directory(
raise ValueError("Chosen id are already in use!")

os.makedirs(acq_dir_path)
loguru.logger.info("Saving metadata...")
metadata_filepath = os.path.join(acq_dir_path, "metadata.json")
with open(metadata_filepath, "w", encoding="utf-8") as metadata_file:
json.dump(metadata, metadata_file, indent=4)
loguru.logger.debug(f"Saved metadata to {metadata_file}: {metadata}")
integrity.create_integrity_file(acq_dir_path)
integrity.append_to_integrity_file(metadata_filepath)
return acq_dir_path


Expand All @@ -327,35 +323,41 @@ class ImageAcquisitionRoutine(threading.Thread):
# TODO(ethanjli): instead of taking an arg of type mqtt.MQTT_CLIENT, just take an arg of
# whatever `mqtt_client.client`'s type is supposed to be. Or maybe we should just initialize
# our own MQTT client in here?
def __init__(self, routine: stopflow.Routine, mqtt_client: mqtt.MQTT_Client) -> None:
def __init__(
self,
routine: stopflow.Routine,
mqtt_client: mqtt.MQTT_Client,
metadata: dict[str, typing.Any],
) -> None:
"""Initialize the thread.

Args:
routine: the image-acquisition routine to run.
mqtt_client: an MQTT client which will be used to broadcast updates.
metadata: the acquisition metadata to write to `metadata.json` once the routine
finishes. `nb_frame` is overwritten with the actual number of frames captured.
"""
super().__init__()
self._routine = routine
self._mqtt_client = mqtt_client.client
self._metadata = metadata

def run(self) -> None:
"""Run a stop-flow image-acquisition routine until completion or interruption."""
self._mqtt_client.publish("status/imager", '{"status":"Started"}')
final_status: str = ""
while True:
if (result := self._routine.run_step()) is None:
if self._routine.interrupted:
loguru.logger.debug("Image-acquisition routine was interrupted!")
self._mqtt_client.publish("status/imager", '{"status":"Interrupted"}')
final_status = '{"status":"Interrupted"}'
break
loguru.logger.debug("Image-acquisition routine ran to completion!")
self._mqtt_client.publish(
"status/imager",
json.dumps(
{
"status": "Done",
"path": self._routine.output_path,
}
),
final_status = json.dumps(
{
"status": "Done",
"path": self._routine.output_path,
}
)
break

Expand All @@ -364,10 +366,9 @@ def run(self) -> None:
try:
integrity.append_to_integrity_file(path)
except FileNotFoundError:
self._mqtt_client.publish(
"status/imager",
final_status = (
f'{{"status":"Image {index + 1}/{self._routine.settings.total_images} '
+ 'WAS NOT CAPTURED! STOPPING THE PROCESS!"}}',
+ 'WAS NOT CAPTURED! STOPPING THE PROCESS!"}}'
)
break

Expand All @@ -389,6 +390,21 @@ def run(self) -> None:
),
)

# Finalize metadata.json with the actual frame count before announcing the final
# status, so that downstream consumers (e.g. the segmenter) cannot observe the
# acquisition as finished without a metadata file on disk.
self._metadata["nb_frame"] = self._routine.progress
metadata_filepath = os.path.join(self._routine.output_path, "metadata.json")
with open(metadata_filepath, "w", encoding="utf-8") as metadata_file:
json.dump(self._metadata, metadata_file, indent=4)
integrity.append_to_integrity_file(metadata_filepath)
loguru.logger.debug(
f"Saved metadata to {metadata_filepath} with nb_frame={self._routine.progress}"
)

if final_status:
self._mqtt_client.publish("status/imager", final_status)

def pause(self) -> None:
"""Pause the acquisition between steps."""
self._routine.pause()
Expand Down
6 changes: 6 additions & 0 deletions controller/imager/stopflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,9 @@ def paused(self) -> bool:
def interrupted(self) -> bool:
"""Check whether the routine was manually interrupted."""
return self._interrupted.is_set()

@property
def progress(self) -> int:
"""The number of images actually captured so far."""
with self._progress_lock:
return self._progress
Loading