diff --git a/software/control/_def.py b/software/control/_def.py index 182a0683e..a20e482e6 100644 --- a/software/control/_def.py +++ b/software/control/_def.py @@ -1221,6 +1221,22 @@ def get_wellplate_settings(wellplate_format): XERYON_OBJECTIVE_SWITCHER_POS_2 = ["20x", "40x", "60x"] XERYON_OBJECTIVE_SWITCHER_POS_2_OFFSET_MM = 2 +# Motorized 4-position objective turret (NiMotion RS-485 stepper, Modbus-RTU) +USE_OBJECTIVE_TURRET = False +OBJECTIVE_TURRET_SERIAL_NUMBER = "" +OBJECTIVE_TURRET_SLAVE_ID = 1 +OBJECTIVE_TURRET_BAUDRATE = 115200 +# Objective name -> turret slot index (1..4). Override per machine in .ini. +OBJECTIVE_TURRET_POSITIONS = {"4x": 1, "10x": 2, "20x": 3, "40x": 4} + + +def _validate_objective_changer_flags(use_xeryon: bool, use_turret: bool) -> None: + if use_xeryon and use_turret: + raise ValueError( + "USE_XERYON and USE_OBJECTIVE_TURRET are mutually exclusive " "(set only one to True in the machine .ini)" + ) + + # fluidics RUN_FLUIDICS = False FLUIDICS_CONFIG_PATH = "./merfish_config/MERFISH_config.json" @@ -1325,6 +1341,8 @@ class SlackNotifications: myclass = locals()[classkey] populate_class_from_dict(myclass, pop_items) + _validate_objective_changer_flags(USE_XERYON, USE_OBJECTIVE_TURRET) + with open("cache/config_file_path.txt", "w") as file: file.write(config_files[0]) CACHED_CONFIG_FILE_PATH = config_files[0] @@ -1337,6 +1355,7 @@ class SlackNotifications: sys.exit(1) log.info("load machine-specific configuration") exec(open(config_files[0]).read()) + _validate_objective_changer_flags(USE_XERYON, USE_OBJECTIVE_TURRET) else: log.error("machine-specific configuration not present, the program will exit") sys.exit(1) diff --git a/software/control/gui_hcs.py b/software/control/gui_hcs.py index 07414e56e..2caa562ca 100644 --- a/software/control/gui_hcs.py +++ b/software/control/gui_hcs.py @@ -878,10 +878,7 @@ def load_widgets(self): if self.piezo: self.piezoWidget = widgets.PiezoWidget(self.piezo) - if USE_XERYON: - self.objectivesWidget = widgets.ObjectivesWidget(self.objectiveStore, self.objective_changer) - else: - self.objectivesWidget = widgets.ObjectivesWidget(self.objectiveStore) + self.objectivesWidget = widgets.ObjectivesWidget(self.objectiveStore, self.objective_changer) if self.emission_filter_wheel: self.filterControllerWidget = widgets.FilterControllerWidget( @@ -1928,6 +1925,28 @@ def openWorkflowRunner(self): self.workflowRunnerDialog.raise_() self.workflowRunnerDialog.activateWindow() + def resetObjectiveTurret(self): + """Clear faults, re-enable the motor, re-home, and rotate back to the current objective.""" + if self.objective_changer is None: + return + self.log.info("Resetting objective turret") + try: + self.objective_changer.clear_alarm() + self.objective_changer.enable() + # Re-home so the position tracker matches the physical slot: avoids short-circuiting the + # rotate in move_to_objective if the tracker is stale from a fault mid-rotation. + self.objective_changer.home() + current = self.objectiveStore.current_objective + if current: + self.objective_changer.move_to_objective(current) + except Exception as exc: + self.log.exception("Reset of objective turret failed") + QMessageBox.warning( + self, + "Reset Objective Turret", + f"Failed to reset objective turret:\n{exc}", + ) + def _get_actual_acquisition_path(self) -> str: """Get the actual acquisition path (base_path + experiment_ID with timestamp).""" if hasattr(self, "multipointController") and self.multipointController: @@ -2602,8 +2621,9 @@ def _cleanup_common(self, for_restart: bool = False): Args: for_restart: If True, wrap operations in try-except to ensure cleanup completes. - Z retraction and objective reset still run when using Xeryon - (Xeryon must be zeroed before re-init), but are skipped otherwise. + Z retraction and objective-changer teardown still run when using Xeryon + (Xeryon must be zeroed before re-init). For a turret or a manual setup, + restart skips this block (turret re-inits cleanly from any position). """ context = "restart" if for_restart else "shutdown" @@ -2707,9 +2727,8 @@ def _cleanup_common(self, for_restart: bool = False): else: raise - # Retract Z and reset objective changer on full shutdown. - # On restart, only retract Z and reset if Xeryon objective changer is present - # (Xeryon must be zeroed before re-init; Z must retract first for safety). + # Z-retract + Xeryon zero: needed on full shutdown, and on Xeryon restart + # (Xeryon must be zeroed before re-init). if not for_restart or USE_XERYON: z_retracted = False try: @@ -2726,10 +2745,22 @@ def _cleanup_common(self, for_restart: bool = False): self.objective_changer.moveToZero() except Exception: if for_restart: - self.log.exception(f"Error resetting objective changer during {context}") + self.log.exception(f"Error resetting Xeryon during {context}") else: raise + # Turret close: always release the serial port so the new process (on restart) + # can acquire it, and so the motor is de-energized on full shutdown. Independent + # of Z-retract success — the close path must run even if Z retract failed. + if USE_OBJECTIVE_TURRET and self.objective_changer: + try: + self.objective_changer.close() + except Exception: + if for_restart: + self.log.exception(f"Error closing turret during {context}") + else: + raise + if not for_restart: self.microcontroller.turn_off_all_pid() @@ -2775,7 +2806,8 @@ def _cleanup_common(self, for_restart: bool = False): raise def _cleanup_for_restart(self): - """Clean up hardware and resources for restart. Retracts Z and resets Xeryon if present.""" + """Clean up hardware and resources for restart. Retracts Z and zeros Xeryon if present + (skipped for turret or manual setups; the turret re-inits cleanly from any position).""" self._cleanup_common(for_restart=True) def closeEvent(self, event): diff --git a/software/control/microscope.py b/software/control/microscope.py index 78b8eb782..1054d2133 100644 --- a/software/control/microscope.py +++ b/software/control/microscope.py @@ -38,6 +38,14 @@ else: ObjectiveChanger2PosController = None +if control._def.USE_OBJECTIVE_TURRET: + from control.objective_turret_controller import ( + ObjectiveTurret4PosController, + ObjectiveTurret4PosControllerSimulation, + ) +else: + ObjectiveTurret4PosController = None + if control._def.RUN_FLUIDICS: from control.fluidics import Fluidics else: @@ -129,6 +137,19 @@ def build_from_global_config( if not objective_changer_simulated else ObjectiveChanger2PosController_Simulation(sn=control._def.XERYON_SERIAL_NUMBER, stage=stage) ) + elif control._def.USE_OBJECTIVE_TURRET: + turret_kwargs = dict( + serial_number=control._def.OBJECTIVE_TURRET_SERIAL_NUMBER, + slave_id=control._def.OBJECTIVE_TURRET_SLAVE_ID, + baudrate=control._def.OBJECTIVE_TURRET_BAUDRATE, + positions=control._def.OBJECTIVE_TURRET_POSITIONS, + stage=stage, + ) + objective_changer = ( + ObjectiveTurret4PosController(**turret_kwargs) + if not objective_changer_simulated + else ObjectiveTurret4PosControllerSimulation(**turret_kwargs) + ) camera_focus = None if control._def.SUPPORT_LASER_AUTOFOCUS: @@ -184,7 +205,7 @@ def __init__( nl5: Optional[NL5] = None, cellx: Optional[serial_peripherals.CellX] = None, emission_filter_wheel: Optional[AbstractFilterWheelController] = None, - objective_changer: Optional[ObjectiveChanger2PosController] = None, + objective_changer: Optional[object] = None, camera_focus: Optional[AbstractCamera] = None, fluidics: Optional[Fluidics] = None, piezo_stage: Optional[PiezoStage] = None, @@ -470,12 +491,14 @@ def _prepare_for_use(self, skip_init: bool = False): raise if self.addons.objective_changer: - self.addons.objective_changer.home() - self.addons.objective_changer.setSpeed(control._def.XERYON_SPEED) - if control._def.DEFAULT_OBJECTIVE in control._def.XERYON_OBJECTIVE_SWITCHER_POS_1: - self.addons.objective_changer.moveToPosition1(move_z=False) - elif control._def.DEFAULT_OBJECTIVE in control._def.XERYON_OBJECTIVE_SWITCHER_POS_2: - self.addons.objective_changer.moveToPosition2(move_z=False) + # Xeryon always re-homes (findIndex is fast and required). The turret skips + # homing on a software restart: the motor stays powered across close()/re-init + # and retains its position register, so a re-home would just be wasted motion. + if control._def.USE_XERYON or not skip_init: + self.addons.objective_changer.home() + if control._def.USE_XERYON: + self.addons.objective_changer.setSpeed(control._def.XERYON_SPEED) + self.addons.objective_changer.move_to_objective(control._def.DEFAULT_OBJECTIVE) def _sync_confocal_mode_from_hardware(self) -> bool: """Sync confocal mode state from spinning disk hardware. diff --git a/software/control/modbus_rtu.py b/software/control/modbus_rtu.py new file mode 100644 index 000000000..c19fcf802 --- /dev/null +++ b/software/control/modbus_rtu.py @@ -0,0 +1,519 @@ +from __future__ import annotations + +import logging +import struct +import threading +import time +from typing import Optional + +import serial + +logger = logging.getLogger(__name__) + +# CRC-16 Modbus lookup table (polynomial: 0xA001, initial: 0xFFFF) +CRC16_TABLE = [ + 0x0000, + 0xC0C1, + 0xC181, + 0x0140, + 0xC301, + 0x03C0, + 0x0280, + 0xC241, + 0xC601, + 0x06C0, + 0x0780, + 0xC741, + 0x0500, + 0xC5C1, + 0xC481, + 0x0440, + 0xCC01, + 0x0CC0, + 0x0D80, + 0xCD41, + 0x0F00, + 0xCFC1, + 0xCE81, + 0x0E40, + 0x0A00, + 0xCAC1, + 0xCB81, + 0x0B40, + 0xC901, + 0x09C0, + 0x0880, + 0xC841, + 0xD801, + 0x18C0, + 0x1980, + 0xD941, + 0x1B00, + 0xDBC1, + 0xDA81, + 0x1A40, + 0x1E00, + 0xDEC1, + 0xDF81, + 0x1F40, + 0xDD01, + 0x1DC0, + 0x1C80, + 0xDC41, + 0x1400, + 0xD4C1, + 0xD581, + 0x1540, + 0xD701, + 0x17C0, + 0x1680, + 0xD641, + 0xD201, + 0x12C0, + 0x1380, + 0xD341, + 0x1100, + 0xD1C1, + 0xD081, + 0x1040, + 0xF001, + 0x30C0, + 0x3180, + 0xF141, + 0x3300, + 0xF3C1, + 0xF281, + 0x3240, + 0x3600, + 0xF6C1, + 0xF781, + 0x3740, + 0xF501, + 0x35C0, + 0x3480, + 0xF441, + 0x3C00, + 0xFCC1, + 0xFD81, + 0x3D40, + 0xFF01, + 0x3FC0, + 0x3E80, + 0xFE41, + 0xFA01, + 0x3AC0, + 0x3B80, + 0xFB41, + 0x3900, + 0xF9C1, + 0xF881, + 0x3840, + 0x2800, + 0xE8C1, + 0xE981, + 0x2940, + 0xEB01, + 0x2BC0, + 0x2A80, + 0xEA41, + 0xEE01, + 0x2EC0, + 0x2F80, + 0xEF41, + 0x2D00, + 0xEDC1, + 0xEC81, + 0x2C40, + 0xE401, + 0x24C0, + 0x2580, + 0xE541, + 0x2700, + 0xE7C1, + 0xE681, + 0x2640, + 0x2200, + 0xE2C1, + 0xE381, + 0x2340, + 0xE101, + 0x21C0, + 0x2080, + 0xE041, + 0xA001, + 0x60C0, + 0x6180, + 0xA141, + 0x6300, + 0xA3C1, + 0xA281, + 0x6240, + 0x6600, + 0xA6C1, + 0xA781, + 0x6740, + 0xA501, + 0x65C0, + 0x6480, + 0xA441, + 0x6C00, + 0xACC1, + 0xAD81, + 0x6D40, + 0xAF01, + 0x6FC0, + 0x6E80, + 0xAE41, + 0xAA01, + 0x6AC0, + 0x6B80, + 0xAB41, + 0x6900, + 0xA9C1, + 0xA881, + 0x6840, + 0x7800, + 0xB8C1, + 0xB981, + 0x7940, + 0xBB01, + 0x7BC0, + 0x7A80, + 0xBA41, + 0xBE01, + 0x7EC0, + 0x7F80, + 0xBF41, + 0x7D00, + 0xBDC1, + 0xBC81, + 0x7C40, + 0xB401, + 0x74C0, + 0x7580, + 0xB541, + 0x7700, + 0xB7C1, + 0xB681, + 0x7640, + 0x7200, + 0xB2C1, + 0xB381, + 0x7340, + 0xB101, + 0x71C0, + 0x7080, + 0xB041, + 0x5000, + 0x90C1, + 0x9181, + 0x5140, + 0x9301, + 0x53C0, + 0x5280, + 0x9241, + 0x9601, + 0x56C0, + 0x5780, + 0x9741, + 0x5500, + 0x95C1, + 0x9481, + 0x5440, + 0x9C01, + 0x5CC0, + 0x5D80, + 0x9D41, + 0x5F00, + 0x9FC1, + 0x9E81, + 0x5E40, + 0x5A00, + 0x9AC1, + 0x9B81, + 0x5B40, + 0x9901, + 0x59C0, + 0x5880, + 0x9841, + 0x8801, + 0x48C0, + 0x4980, + 0x8941, + 0x4B00, + 0x8BC1, + 0x8A81, + 0x4A40, + 0x4E00, + 0x8EC1, + 0x8F81, + 0x4F40, + 0x8D01, + 0x4DC0, + 0x4C80, + 0x8C41, + 0x4400, + 0x84C1, + 0x8581, + 0x4540, + 0x8701, + 0x47C0, + 0x4680, + 0x8641, + 0x8201, + 0x42C0, + 0x4380, + 0x8341, + 0x4100, + 0x81C1, + 0x8081, + 0x4040, +] + +FRAME_INTERVAL = 0.003 + +# Modbus exception codes treated as transient — "slave is busy, ask again later". +# Per Modbus spec: 0x05 = ACKNOWLEDGE (slave accepted the request, still processing), +# 0x06 = SLAVE_DEVICE_BUSY (slave is engaged in a long-duration command). +TRANSIENT_EXCEPTION_CODES = {0x05, 0x06} +TRANSIENT_RETRIES = 20 +TRANSIENT_BASE_DELAY_S = 0.1 + + +def calculate_crc(data: bytes | bytearray) -> int: + crc = 0xFFFF + for byte in data: + crc = (crc >> 8) ^ CRC16_TABLE[(crc ^ byte) & 0xFF] + return crc + + +def _append_crc(data: bytes | bytearray) -> bytes: + crc = calculate_crc(data) + return bytes(data) + bytes([crc & 0xFF, (crc >> 8) & 0xFF]) + + +def _verify_crc(data: bytes | bytearray) -> bool: + if len(data) < 3: + return False + payload = data[:-2] + received_crc = data[-2] | (data[-1] << 8) + return calculate_crc(payload) == received_crc + + +def build_read_registers_frame(slave_id: int, address: int, count: int) -> bytes: + frame = struct.pack(">BBHH", slave_id, 0x03, address, count) + return _append_crc(frame) + + +def build_read_input_registers_frame(slave_id: int, address: int, count: int) -> bytes: + frame = struct.pack(">BBHH", slave_id, 0x04, address, count) + return _append_crc(frame) + + +def build_write_register_frame(slave_id: int, address: int, value: int) -> bytes: + frame = struct.pack(">BBHH", slave_id, 0x06, address, value) + return _append_crc(frame) + + +def build_write_multiple_registers_frame(slave_id: int, address: int, values: list[int]) -> bytes: + count = len(values) + byte_count = count * 2 + frame = struct.pack(">BBHHB", slave_id, 0x10, address, count, byte_count) + for v in values: + frame += struct.pack(">H", v) + return _append_crc(frame) + + +class ModbusError(Exception): + def __init__(self, message: str, slave_id: Optional[int] = None): + super().__init__(message) + self.message = message + self.slave_id = slave_id + + def __str__(self) -> str: + return self.message + + +class ModbusRTUClient: + def __init__( + self, + port: Optional[str] = None, + baudrate: int = 115200, + timeout: float = 0.5, + retries: int = 3, + ): + self._port = port + self._baudrate = baudrate + self._timeout = timeout + self._retries = retries + self._serial: Optional[serial.Serial] = None + self._lock = threading.Lock() + + @property + def is_connected(self) -> bool: + return self._serial is not None and self._serial.is_open + + def connect(self, port: Optional[str] = None, baudrate: Optional[int] = None): + if port is not None: + self._port = port + if baudrate is not None: + self._baudrate = baudrate + if self._port is None: + raise ModbusError("No serial port specified") + if self._serial is not None: + self._serial.close() + self._serial = None + try: + self._serial = serial.Serial(self._port, baudrate=self._baudrate, timeout=self._timeout) + except (serial.SerialException, OSError) as e: + raise ModbusError(str(e)) from e + logger.info(f"Modbus RTU connected: {self._port}") + + def disconnect(self): + if self._serial is not None: + try: + self._serial.close() + finally: + self._serial = None + logger.info("Modbus RTU disconnected") + + def _require_connected(self): + if not self.is_connected: + raise ModbusError("Client is not connected") + + def read_register(self, slave_id: int, address: int) -> int: + self._require_connected() + frame = build_read_registers_frame(slave_id, address, 1) + # Response: slave(1) + fc(1) + byte_count(1) + data(2) + crc(2) = 7 + response = self._send_receive(frame, expected_response_len=7) + return (response[3] << 8) | response[4] + + def read_register_32bit(self, slave_id: int, address: int, signed: bool = False) -> int: + self._require_connected() + frame = build_read_registers_frame(slave_id, address, 2) + # Response: slave(1) + fc(1) + byte_count(1) + data(4) + crc(2) = 9 + response = self._send_receive(frame, expected_response_len=9) + high = (response[3] << 8) | response[4] + low = (response[5] << 8) | response[6] + value = (high << 16) | low + if signed and value >= 0x80000000: + value -= 0x100000000 + return value + + def read_input_register(self, slave_id: int, address: int) -> int: + """Read a single 16-bit input register (FC 0x04). + + Input registers are a distinct address space from holding registers — the + same numeric address may refer to different data depending on FC. Some + devices (like the NiMotion stepper) place the status word and current + position in the input-register space, so FC 0x03 would return unrelated + holding-register data. + """ + self._require_connected() + frame = build_read_input_registers_frame(slave_id, address, 1) + response = self._send_receive(frame, expected_response_len=7) + return (response[3] << 8) | response[4] + + def read_input_register_32bit(self, slave_id: int, address: int, signed: bool = False) -> int: + """Read a 32-bit input register pair via FC 0x04 (see read_input_register).""" + self._require_connected() + frame = build_read_input_registers_frame(slave_id, address, 2) + response = self._send_receive(frame, expected_response_len=9) + high = (response[3] << 8) | response[4] + low = (response[5] << 8) | response[6] + value = (high << 16) | low + if signed and value >= 0x80000000: + value -= 0x100000000 + return value + + def write_register(self, slave_id: int, address: int, value: int): + self._require_connected() + frame = build_write_register_frame(slave_id, address, value) + # Response: slave(1) + fc(1) + address(2) + value(2) + crc(2) = 8 + self._send_receive(frame, expected_response_len=8) + + def write_register_32bit(self, slave_id: int, address: int, value: int, signed: bool = False): + self._require_connected() + if signed and value < 0: + value += 0x100000000 + high = (value >> 16) & 0xFFFF + low = value & 0xFFFF + frame = build_write_multiple_registers_frame(slave_id, address, [high, low]) + # Response: slave(1) + fc(1) + address(2) + quantity(2) + crc(2) = 8 + self._send_receive(frame, expected_response_len=8) + + def _send_receive(self, frame: bytes, expected_response_len: int) -> bytes: + with self._lock: + last_error: Optional[Exception] = None + transient_attempts = 0 + attempt = 0 + while attempt <= self._retries: + try: + self._serial.reset_input_buffer() + self._serial.write(frame) + time.sleep(FRAME_INTERVAL) + response = self._serial.read(expected_response_len) + except (serial.SerialException, OSError) as e: + last_error = ModbusError(str(e), slave_id=frame[0]) + logger.warning(f"Modbus request failed (attempt {attempt + 1}/" f"{self._retries + 1}): {e}") + if attempt < self._retries: + time.sleep(FRAME_INTERVAL * 2) + attempt += 1 + continue + + # Exception responses are 5 bytes — check before incomplete check + if len(response) >= 5 and (response[1] & 0x80) and _verify_crc(response[:5]): + exception_code = response[2] + # Transient "slave busy / acknowledge" responses: back off and retry per + # Modbus spec. Doesn't consume a normal retry slot — the slave is working, + # not failing. Capped at TRANSIENT_RETRIES total. + if exception_code in TRANSIENT_EXCEPTION_CODES and transient_attempts < TRANSIENT_RETRIES: + backoff = TRANSIENT_BASE_DELAY_S * (2 ** min(transient_attempts, 4)) + logger.debug( + "Modbus slave busy (FC=0x%02X, code=0x%02X); retry %d/%d after %.2fs", + response[1], + exception_code, + transient_attempts + 1, + TRANSIENT_RETRIES, + backoff, + ) + transient_attempts += 1 + time.sleep(backoff) + continue # redrive without incrementing `attempt` + raise ModbusError( + f"Modbus exception response: FC=0x{response[1]:02X}, " f"code=0x{exception_code:02X}", + slave_id=response[0], + ) + + if len(response) < expected_response_len: + last_error = ModbusError( + f"Incomplete response: expected {expected_response_len} " f"bytes, got {len(response)}", + slave_id=frame[0], + ) + logger.warning( + f"Modbus request failed (attempt {attempt + 1}/" f"{self._retries + 1}): {last_error}" + ) + if attempt < self._retries: + time.sleep(FRAME_INTERVAL * 2) + attempt += 1 + continue + + if not _verify_crc(response): + last_error = ModbusError("CRC verification failed", slave_id=frame[0]) + logger.warning( + f"Modbus request failed (attempt {attempt + 1}/" f"{self._retries + 1}): {last_error}" + ) + if attempt < self._retries: + time.sleep(FRAME_INTERVAL * 2) + attempt += 1 + continue + + return response + + raise last_error + + def __enter__(self) -> "ModbusRTUClient": + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.disconnect() diff --git a/software/control/objective_changer_2_pos_controller.py b/software/control/objective_changer_2_pos_controller.py index 5680b4de7..bf2709e07 100644 --- a/software/control/objective_changer_2_pos_controller.py +++ b/software/control/objective_changer_2_pos_controller.py @@ -60,6 +60,21 @@ def currentPosition(self) -> int: def setSpeed(self, value: float): self.axisX.setSpeed(value) + def move_to_objective(self, objective_name: str) -> None: + """Unified dispatcher to moveToPosition1 / moveToPosition2 based on the + per-machine XERYON_OBJECTIVE_SWITCHER_POS_1/POS_2 lists. Short-circuits + if already at the target position. Raises KeyError for unknown names.""" + from control._def import XERYON_OBJECTIVE_SWITCHER_POS_1, XERYON_OBJECTIVE_SWITCHER_POS_2 + + if objective_name in XERYON_OBJECTIVE_SWITCHER_POS_1: + if self.currentPosition() != 1: + self.moveToPosition1() + elif objective_name in XERYON_OBJECTIVE_SWITCHER_POS_2: + if self.currentPosition() != 2: + self.moveToPosition2() + else: + raise KeyError(f"Unknown objective '{objective_name}' for Xeryon 2-pos changer") + class ObjectiveChanger2PosController_Simulation: def __init__(self, sn: str, stage: Optional[squid.abc.AbstractStage] = None): @@ -97,3 +112,18 @@ def currentPosition(self) -> int: def setSpeed(self, value: float): pass + + def move_to_objective(self, objective_name: str) -> None: + """Unified dispatcher to moveToPosition1 / moveToPosition2 based on the + per-machine XERYON_OBJECTIVE_SWITCHER_POS_1/POS_2 lists. Short-circuits + if already at the target position. Raises KeyError for unknown names.""" + from control._def import XERYON_OBJECTIVE_SWITCHER_POS_1, XERYON_OBJECTIVE_SWITCHER_POS_2 + + if objective_name in XERYON_OBJECTIVE_SWITCHER_POS_1: + if self.currentPosition() != 1: + self.moveToPosition1() + elif objective_name in XERYON_OBJECTIVE_SWITCHER_POS_2: + if self.currentPosition() != 2: + self.moveToPosition2() + else: + raise KeyError(f"Unknown objective '{objective_name}' for Xeryon 2-pos changer") diff --git a/software/control/objective_turret_controller.py b/software/control/objective_turret_controller.py new file mode 100644 index 000000000..786076841 --- /dev/null +++ b/software/control/objective_turret_controller.py @@ -0,0 +1,451 @@ +"""Controller for a motorized 4-position objective turret (NiMotion RS-485 stepper). + +The real controller talks Modbus-RTU to the motor. A simulation twin mirrors +the public API for CI and offline development. +""" + +from __future__ import annotations + +import logging +import time +from typing import Optional + +from serial.tools import list_ports +from control.modbus_rtu import ModbusRTUClient + +import squid.abc + +logger = logging.getLogger(__name__) + +# Turret mechanics +GEAR_RATIO = 132 / 48 +MOTOR_STEPS_PER_REV = 200 +POSITIONS_PER_REV = 4 # 90 degrees per objective +POSITION_TOLERANCE_PULSES = 50 + +# NiMotion Modbus register map +REG_SAVE_PARAMS = 0x0008 +REG_DI_FUNCTION = 0x002C +REG_MICROSTEP = 0x001A +REG_STATUS_WORD = 0x001F +REG_CURRENT_POSITION = 0x0021 +REG_RUN_MODE = 0x0039 +REG_CONTROL_WORD = 0x0051 +REG_TARGET_POSITION = 0x0053 +REG_MAX_SPEED = 0x005B +REG_ACCEL = 0x005F +REG_DECEL = 0x0061 +REG_HOMING_OFFSET = 0x0069 +REG_HOMING_METHOD = 0x006B +REG_ZERO_RETURN = 0x0072 +REG_CLEAR_ERROR_STORAGE = 0x0073 + +# Control word values +CW_DISABLE = 0x0000 +CW_STARTUP = 0x0006 +CW_ENABLE = 0x0007 +CW_RUN_ABSOLUTE = 0x000F +CW_TRIGGER_ABSOLUTE = 0x001F +CW_CLEAR_FAULT = 0x0080 + +# Magic values +SAVE_PARAMS_MAGIC = 0x7376 +CLEAR_ERROR_STORAGE_MAGIC = 0x6C64 + +# Run modes +MODE_POSITION = 1 +MODE_HOMING = 3 + +# Status word bits +STATUS_BIT_FAULT = 1 << 3 +STATUS_BIT_RUNNING = 1 << 12 + +# Motion parameter defaults (auto-calibrated on first connect) +EXPECTED_ACCEL = 200 +EXPECTED_DECEL = 200 +EXPECTED_MAX_SPEED = 250 + +# Homing defaults (auto-calibrated on first connect) +HOMING_METHOD = 17 +HOMING_ORIGIN_OFFSET = 500 +HOMING_ZERO_RETURN = 1 +DI1_FUNCTION_NEG_LIMIT = 1 + +# Polling +POLL_INTERVAL_S = 0.05 +DEFAULT_MOVE_TIMEOUT_S = 10.0 +DEFAULT_HOME_TIMEOUT_S = 30.0 + + +def _resolve_position(objective_name: str, positions: dict) -> int: + try: + return positions[objective_name] + except KeyError: + raise KeyError(f"Unknown objective '{objective_name}'. Valid names: {sorted(positions)}") from None + + +def _find_port(serial_number: str) -> str: + matches = [p.device for p in list_ports.comports() if p.serial_number == serial_number] + if not matches: + raise ValueError(f"No serial device found with serial number: {serial_number}") + if len(matches) > 1: + logger.warning( + "Multiple devices match serial number %s: %s. Using %s.", + serial_number, + matches, + matches[0], + ) + return matches[0] + + +class ObjectiveTurret4PosControllerSimulation: + """In-memory stand-in for ObjectiveTurret4PosController. + + Mirrors the real controller's public API for tests and offline use. + Implements the Z retract/restore dance when a stage reference is provided. + """ + + def __init__( + self, + serial_number: Optional[str] = None, + slave_id: int = 1, + baudrate: int = 115200, + timeout: float = 0.5, + positions: Optional[dict] = None, + stage: Optional[squid.abc.AbstractStage] = None, + ): + from control._def import OBJECTIVE_TURRET_POSITIONS + + self._is_open = True + self._current_objective: Optional[str] = None + self._positions = dict(positions) if positions is not None else dict(OBJECTIVE_TURRET_POSITIONS) + self._stage = stage + logger.info("Simulated turret opened (sn=%s)", serial_number) + + def home(self, timeout_s: float = DEFAULT_HOME_TIMEOUT_S) -> None: + self._require_open() + self._current_objective = None + logger.info("Simulated turret homed") + + def enable(self) -> None: + """Mirror of the real controller's disable -> startup -> enable state-machine cycle.""" + self._require_open() + logger.info("Simulated turret enabled") + + def move_to_objective(self, objective_name: str, timeout_s: float = DEFAULT_MOVE_TIMEOUT_S) -> None: + self._require_open() + _resolve_position(objective_name, self._positions) + if self._current_objective == objective_name: + return + + captured_z = self._retract_z_if_possible() + self._current_objective = objective_name + self._restore_z_if_captured(captured_z) + + logger.info( + "Simulated turret moved to %s (position %d)", + objective_name, + self._positions[objective_name], + ) + + def clear_alarm(self) -> None: + self._require_open() + logger.info("Simulated turret alarm cleared") + + def close(self) -> None: + if self._is_open: + self._is_open = False + logger.info("Simulated turret closed") + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.close() + + @property + def current_objective(self) -> Optional[str]: + return self._current_objective + + @property + def is_open(self) -> bool: + return self._is_open + + def _require_open(self) -> None: + if not self._is_open: + raise RuntimeError("Turret controller is closed") + + def _retract_z_if_possible(self) -> Optional[float]: + """If stage + Z homing are usable, capture Z and move to safe retract. Return captured z, else None.""" + from control._def import HOMING_ENABLED_Z, OBJECTIVE_RETRACTED_POS_MM + + if self._stage is None or not HOMING_ENABLED_Z: + return None + z_mm = self._stage.get_pos().z_mm + self._stage.move_z_to(OBJECTIVE_RETRACTED_POS_MM) + return z_mm + + def _restore_z_if_captured(self, captured_z: Optional[float]) -> None: + if captured_z is None or self._stage is None: + return + self._stage.move_z_to(captured_z) + + +class ObjectiveTurret4PosController: + """Synchronous controller for a 4-position objective turret over Modbus-RTU.""" + + def __init__( + self, + serial_number: str, + slave_id: int = 1, + baudrate: int = 115200, + timeout: float = 0.5, + positions: Optional[dict] = None, + stage: Optional[squid.abc.AbstractStage] = None, + ) -> None: + from control._def import OBJECTIVE_TURRET_POSITIONS + + self._slave_id = slave_id + self._positions = dict(positions) if positions is not None else dict(OBJECTIVE_TURRET_POSITIONS) + self._stage = stage + self._current_objective: Optional[str] = None + self._is_open = False + + port = _find_port(serial_number) + self._modbus = ModbusRTUClient(port=port, baudrate=baudrate, timeout=timeout) + self._modbus.connect() + try: + self.clear_alarm() + + microstep_raw = self._modbus.read_register(self._slave_id, REG_MICROSTEP) + if not 0 <= microstep_raw <= 7: + raise ValueError(f"Invalid microstep register value {microstep_raw} (expected 0..7)") + self._microstep = 2**microstep_raw + self._pulses_per_position = int(MOTOR_STEPS_PER_REV * self._microstep * GEAR_RATIO / POSITIONS_PER_REV) + + changed = [self._calibrate_motion_params(), self._calibrate_homing_config()] + if any(changed): + self._save_to_eeprom() + + logger.info( + "Turret controller ready: port=%s microstep=%d pulses/position=%d calibrated=%s", + port, + self._microstep, + self._pulses_per_position, + any(changed), + ) + + self.enable() + self._is_open = True + except Exception: + self._modbus.disconnect() + raise + + def home(self, timeout_s: float = DEFAULT_HOME_TIMEOUT_S) -> None: + self._require_open() + self._write_control(CW_DISABLE) + self._write_holding(REG_RUN_MODE, MODE_HOMING) + self._write_control(CW_STARTUP) + self._write_control(CW_ENABLE) + self._write_control(CW_RUN_ABSOLUTE) + self._write_control(CW_TRIGGER_ABSOLUTE) + self._wait_until_idle(timeout_s) + self._current_objective = None + + def enable(self) -> None: + """Run the disable -> startup -> enable state-machine cycle.""" + self._write_control(CW_DISABLE) + self._write_control(CW_STARTUP) + self._write_control(CW_ENABLE) + + def move_to_objective(self, objective_name: str, timeout_s: float = DEFAULT_MOVE_TIMEOUT_S) -> None: + self._require_open() + _resolve_position(objective_name, self._positions) + if self._current_objective == objective_name: + return + + captured_z = self._retract_z_if_possible() + self._rotate_to(objective_name, timeout_s) + self._current_objective = objective_name + self._restore_z_if_captured(captured_z) + + def clear_alarm(self) -> None: + self._write_control(CW_CLEAR_FAULT) + self._write_holding(REG_CLEAR_ERROR_STORAGE, CLEAR_ERROR_STORAGE_MAGIC) + + def close(self) -> None: + if not self._is_open and not self._modbus.is_connected: + return + if self._modbus.is_connected: + try: + self._write_control(CW_DISABLE) + except Exception as exc: + logger.warning("Failed to disable motor during close: %s", exc) + self._modbus.disconnect() + self._is_open = False + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.close() + + @property + def pulses_per_position(self) -> int: + return self._pulses_per_position + + @property + def current_position_pulses(self) -> int: + return self._modbus.read_input_register_32bit(self._slave_id, REG_CURRENT_POSITION, signed=True) + + @property + def current_objective(self) -> Optional[str]: + return self._current_objective + + @property + def is_open(self) -> bool: + return self._is_open + + # --- internal helpers --- + + def _require_open(self) -> None: + if not self._is_open: + raise RuntimeError("Turret controller is closed") + + def _rotate_to(self, objective_name: str, timeout_s: float) -> None: + position_index = _resolve_position(objective_name, self._positions) + target_pulses = (position_index - 1) * self._pulses_per_position + + self._write_control(CW_DISABLE) + self._write_holding(REG_RUN_MODE, MODE_POSITION) + self._modbus.write_register_32bit(self._slave_id, REG_TARGET_POSITION, target_pulses, signed=True) + self._write_control(CW_STARTUP) + self._write_control(CW_ENABLE) + self._write_control(CW_RUN_ABSOLUTE) + self._write_control(CW_TRIGGER_ABSOLUTE) + self._wait_for_position(target_pulses, timeout_s) + + def _retract_z_if_possible(self) -> Optional[float]: + from control._def import HOMING_ENABLED_Z, OBJECTIVE_RETRACTED_POS_MM + + if self._stage is None or not HOMING_ENABLED_Z: + return None + z_mm = self._stage.get_pos().z_mm + self._stage.move_z_to(OBJECTIVE_RETRACTED_POS_MM) + return z_mm + + def _restore_z_if_captured(self, captured_z: Optional[float]) -> None: + if captured_z is None or self._stage is None: + return + self._stage.move_z_to(captured_z) + + def _calibrate_one( + self, + addr: int, + expected: int, + label: str, + *, + is_32bit: bool = False, + signed: bool = False, + mask: Optional[int] = None, + ) -> bool: + if is_32bit: + current = self._modbus.read_register_32bit(self._slave_id, addr, signed=signed) + else: + current = self._modbus.read_register(self._slave_id, addr) + desired = (current & ~mask) | expected if mask is not None else expected + if current == desired: + return False + if is_32bit: + self._modbus.write_register_32bit(self._slave_id, addr, desired, signed=signed) + else: + self._modbus.write_register(self._slave_id, addr, desired) + fmt = "0x%08X" if mask is not None else "%d" + logger.info(f"Calibrated %s: {fmt} -> {fmt}", label, current, desired) + return True + + def _calibrate_motion_params(self) -> bool: + return any( + [ + self._calibrate_one(REG_ACCEL, EXPECTED_ACCEL, "accel", is_32bit=True), + self._calibrate_one(REG_DECEL, EXPECTED_DECEL, "decel", is_32bit=True), + self._calibrate_one(REG_MAX_SPEED, EXPECTED_MAX_SPEED, "max_speed", is_32bit=True), + ] + ) + + def _calibrate_homing_config(self) -> bool: + return any( + [ + self._calibrate_one(REG_HOMING_METHOD, HOMING_METHOD, "homing_method"), + self._calibrate_one( + REG_HOMING_OFFSET, + HOMING_ORIGIN_OFFSET, + "homing_offset", + is_32bit=True, + signed=True, + ), + self._calibrate_one(REG_ZERO_RETURN, HOMING_ZERO_RETURN, "zero_return"), + self._calibrate_one( + REG_DI_FUNCTION, + DI1_FUNCTION_NEG_LIMIT, + "DI1_function", + is_32bit=True, + mask=0xF, + ), + ] + ) + + def _save_to_eeprom(self) -> None: + self._write_holding(REG_SAVE_PARAMS, SAVE_PARAMS_MAGIC) + logger.info("Saved parameters to EEPROM") + + def _write_control(self, value: int) -> None: + self._modbus.write_register(self._slave_id, REG_CONTROL_WORD, value) + + def _write_holding(self, address: int, value: int) -> None: + self._modbus.write_register(self._slave_id, address, value) + + def _read_status_word(self) -> int: + return self._modbus.read_input_register(self._slave_id, REG_STATUS_WORD) + + @staticmethod + def _check_fault(status_word: int) -> None: + if status_word & STATUS_BIT_FAULT: + raise RuntimeError(f"Motor reported fault (status word=0x{status_word:04X})") + + def _wait_until_idle(self, timeout_s: float) -> None: + deadline = time.monotonic() + timeout_s + time.sleep(POLL_INTERVAL_S) + while time.monotonic() < deadline: + status = self._read_status_word() + self._check_fault(status) + if not (status & STATUS_BIT_RUNNING): + return + time.sleep(POLL_INTERVAL_S) + raise TimeoutError(f"Motion did not finish within {timeout_s:.1f}s") + + def _wait_for_position(self, target_pulses: int, timeout_s: float) -> None: + # No leading sleep: seen_running prevents stall detection before the motor asserts RUNNING. + deadline = time.monotonic() + timeout_s + seen_running = False + last_pos: Optional[int] = None + while time.monotonic() < deadline: + status = self._read_status_word() + self._check_fault(status) + running = bool(status & STATUS_BIT_RUNNING) + last_pos = self.current_position_pulses + in_tolerance = abs(last_pos - target_pulses) <= POSITION_TOLERANCE_PULSES + + if running: + seen_running = True + if in_tolerance and not running: + return + if seen_running and not running and not in_tolerance: + raise RuntimeError( + f"Motor stopped at {last_pos} pulses, target {target_pulses} " + f"(tolerance ±{POSITION_TOLERANCE_PULSES})" + ) + time.sleep(POLL_INTERVAL_S) + raise TimeoutError( + f"Move to {target_pulses} pulses timed out after {timeout_s:.1f}s " f"(last position={last_pos})" + ) diff --git a/software/control/widgets.py b/software/control/widgets.py index 52323c74e..0f2d0718a 100644 --- a/software/control/widgets.py +++ b/software/control/widgets.py @@ -3557,11 +3557,8 @@ def init_ui(self): def on_objective_changed(self, objective_name): self.objectiveStore.set_current_objective(objective_name) - if USE_XERYON: - if objective_name in XERYON_OBJECTIVE_SWITCHER_POS_1 and self.objective_changer.currentPosition() != 1: - self.objective_changer.moveToPosition1() - elif objective_name in XERYON_OBJECTIVE_SWITCHER_POS_2 and self.objective_changer.currentPosition() != 2: - self.objective_changer.moveToPosition2() + if self.objective_changer is not None: + self.objective_changer.move_to_objective(objective_name) self.signal_objective_changed.emit() diff --git a/software/main_hcs.py b/software/main_hcs.py index 54ac13522..8e2e1ba75 100644 --- a/software/main_hcs.py +++ b/software/main_hcs.py @@ -88,6 +88,11 @@ stage_utils_action.triggered.connect(win.stageUtils.show) microscope_utils_menu.addAction(stage_utils_action) + if control._def.USE_OBJECTIVE_TURRET: + reset_turret_action = QAction("Reset Objective Turret", win) + reset_turret_action.triggered.connect(win.resetObjectiveTurret) + microscope_utils_menu.addAction(reset_turret_action) + workflow_runner_action = QAction("Workflow Runner...", win) workflow_runner_action.triggered.connect(win.openWorkflowRunner) microscope_utils_menu.addAction(workflow_runner_action) diff --git a/software/tests/control/test_objective_changer_2_pos_controller.py b/software/tests/control/test_objective_changer_2_pos_controller.py new file mode 100644 index 000000000..c3ee0661d --- /dev/null +++ b/software/tests/control/test_objective_changer_2_pos_controller.py @@ -0,0 +1,53 @@ +"""Tests for the move_to_objective dispatcher on the Xeryon 2-pos simulation.""" + +from __future__ import annotations + +import control._def +from control.objective_changer_2_pos_controller import ( + ObjectiveChanger2PosController_Simulation, +) + + +class FakeStage: + def __init__(self): + self.z_moves: list[float] = [] + + def move_z(self, delta_mm: float): + self.z_moves.append(delta_mm) + + +def test_move_to_objective_dispatches_pos1(monkeypatch): + monkeypatch.setattr(control._def, "XERYON_OBJECTIVE_SWITCHER_POS_1", ["4x", "10x"]) + monkeypatch.setattr(control._def, "XERYON_OBJECTIVE_SWITCHER_POS_2", ["20x", "40x"]) + sim = ObjectiveChanger2PosController_Simulation(sn="SIM", stage=FakeStage()) + sim.move_to_objective("4x") + assert sim.currentPosition() == 1 + + +def test_move_to_objective_dispatches_pos2(monkeypatch): + monkeypatch.setattr(control._def, "XERYON_OBJECTIVE_SWITCHER_POS_1", ["4x", "10x"]) + monkeypatch.setattr(control._def, "XERYON_OBJECTIVE_SWITCHER_POS_2", ["20x", "40x"]) + sim = ObjectiveChanger2PosController_Simulation(sn="SIM", stage=FakeStage()) + sim.move_to_objective("40x") + assert sim.currentPosition() == 2 + + +def test_move_to_objective_short_circuits_when_already_there(monkeypatch): + monkeypatch.setattr(control._def, "XERYON_OBJECTIVE_SWITCHER_POS_1", ["4x", "10x"]) + monkeypatch.setattr(control._def, "XERYON_OBJECTIVE_SWITCHER_POS_2", ["20x", "40x"]) + stage = FakeStage() + sim = ObjectiveChanger2PosController_Simulation(sn="SIM", stage=stage) + sim.move_to_objective("40x") # pos1 -> pos2: records Z move + z_moves_after_first = list(stage.z_moves) + sim.move_to_objective("40x") # already at pos2: no extra Z move + assert stage.z_moves == z_moves_after_first + + +def test_move_to_objective_unknown_raises(monkeypatch): + monkeypatch.setattr(control._def, "XERYON_OBJECTIVE_SWITCHER_POS_1", ["4x", "10x"]) + monkeypatch.setattr(control._def, "XERYON_OBJECTIVE_SWITCHER_POS_2", ["20x", "40x"]) + import pytest + + sim = ObjectiveChanger2PosController_Simulation(sn="SIM", stage=FakeStage()) + with pytest.raises(KeyError): + sim.move_to_objective("999x") diff --git a/software/tests/control/test_objective_changer_config.py b/software/tests/control/test_objective_changer_config.py new file mode 100644 index 000000000..452e1aefa --- /dev/null +++ b/software/tests/control/test_objective_changer_config.py @@ -0,0 +1,27 @@ +"""Unit tests for mutual-exclusion of Xeryon and turret flags.""" + +import pytest + +from control._def import _validate_objective_changer_flags, OBJECTIVE_TURRET_POSITIONS + + +def test_mutual_exclusion_raises_when_both_true(): + with pytest.raises(ValueError, match="mutually exclusive"): + _validate_objective_changer_flags(use_xeryon=True, use_turret=True) + + +def test_mutual_exclusion_allows_xeryon_only(): + _validate_objective_changer_flags(use_xeryon=True, use_turret=False) + + +def test_mutual_exclusion_allows_turret_only(): + _validate_objective_changer_flags(use_xeryon=False, use_turret=True) + + +def test_mutual_exclusion_allows_neither(): + _validate_objective_changer_flags(use_xeryon=False, use_turret=False) + + +def test_objective_turret_positions_shape(): + assert len(OBJECTIVE_TURRET_POSITIONS) == 4 + assert sorted(OBJECTIVE_TURRET_POSITIONS.values()) == [1, 2, 3, 4] diff --git a/software/tests/control/test_objective_turret_controller.py b/software/tests/control/test_objective_turret_controller.py new file mode 100644 index 000000000..ce15ce4ea --- /dev/null +++ b/software/tests/control/test_objective_turret_controller.py @@ -0,0 +1,147 @@ +"""Tests for ObjectiveTurret4PosControllerSimulation (no hardware required).""" + +from __future__ import annotations + +import pytest + +import control._def +from control._def import OBJECTIVE_TURRET_POSITIONS, OBJECTIVE_RETRACTED_POS_MM +from control.objective_turret_controller import ( + ObjectiveTurret4PosControllerSimulation, +) + + +class FakeStage: + """Records move_z_to calls and reports a preset Z position.""" + + def __init__(self, z_mm: float = 3.5): + self._z_mm = z_mm + self.z_moves: list[float] = [] + + def move_z_to(self, abs_mm: float, blocking: bool = True): + self.z_moves.append(abs_mm) + self._z_mm = abs_mm + + def get_pos(self): + class _Pos: + pass + + p = _Pos() + p.z_mm = self._z_mm + return p + + +def _make_sim(stage=None): + return ObjectiveTurret4PosControllerSimulation( + serial_number="SIM-001", + positions=OBJECTIVE_TURRET_POSITIONS, + stage=stage, + ) + + +def test_init_opens_controller(): + sim = _make_sim() + assert sim.is_open + assert sim.current_objective is None + sim.close() + + +def test_home_clears_current_objective(): + sim = _make_sim() + sim.move_to_objective("10x") + sim.home() + assert sim.current_objective is None + sim.close() + + +@pytest.mark.parametrize("name", list(OBJECTIVE_TURRET_POSITIONS)) +def test_move_to_each_known_objective(name): + sim = _make_sim() + sim.move_to_objective(name) + assert sim.current_objective == name + sim.close() + + +def test_move_unknown_objective_raises_key_error(): + sim = _make_sim() + with pytest.raises(KeyError): + sim.move_to_objective("1000x") + sim.close() + + +def test_clear_alarm_is_callable(): + sim = _make_sim() + sim.clear_alarm() + assert sim.is_open + sim.close() + + +def test_enable_is_callable(): + sim = _make_sim() + sim.enable() + assert sim.is_open + sim.close() + + +def test_operations_after_close_raise(): + sim = _make_sim() + sim.close() + with pytest.raises(RuntimeError): + sim.home() + with pytest.raises(RuntimeError): + sim.move_to_objective("10x") + with pytest.raises(RuntimeError): + sim.clear_alarm() + with pytest.raises(RuntimeError): + sim.enable() + + +def test_close_is_idempotent(): + sim = _make_sim() + sim.close() + sim.close() + assert not sim.is_open + + +def test_context_manager_closes_on_exit(): + with _make_sim() as sim: + sim.move_to_objective("20x") + assert sim.is_open + assert not sim.is_open + + +def test_move_to_objective_retracts_and_restores_z(monkeypatch): + monkeypatch.setattr(control._def, "HOMING_ENABLED_Z", True) + stage = FakeStage(z_mm=3.5) + sim = _make_sim(stage=stage) + + sim.move_to_objective("40x") + + # First switch: retract to OBJECTIVE_RETRACTED_POS_MM, then restore captured z. + assert stage.z_moves == [OBJECTIVE_RETRACTED_POS_MM, 3.5] + assert sim.current_objective == "40x" + + # Second call with same objective: no-op (early exit), no new z motion. + stage.z_moves.clear() + sim.move_to_objective("40x") + assert stage.z_moves == [] + + sim.close() + + +def test_move_to_objective_skips_z_retract_when_no_stage(monkeypatch): + monkeypatch.setattr(control._def, "HOMING_ENABLED_Z", True) + sim = _make_sim(stage=None) + sim.move_to_objective("10x") # must not raise even without a stage + assert sim.current_objective == "10x" + sim.close() + + +def test_move_to_objective_skips_z_retract_when_homing_z_disabled(monkeypatch): + monkeypatch.setattr(control._def, "HOMING_ENABLED_Z", False) + stage = FakeStage(z_mm=3.5) + sim = _make_sim(stage=stage) + sim.move_to_objective("10x") + assert stage.z_moves == [] # retract is gated on HOMING_ENABLED_Z + assert sim.current_objective == "10x" + sim.close()