Fix/segmentation queue fix with parallelization #937

Open
babo989 wants to merge 12 commits into main from fix/segmentation-hard-stop

Conversation


@babo989 babo989 commented Apr 22, 2026

Parallel + Queue-Capable Segmenter PR Guide

Branch: fix/segmentation-hard-stop → main on fairscope/PlanktoScope
Date: 2026-04-22
Tested on: PlanktoScope v3.0 device great-door (RPi 5, Bookworm)


TL;DR

Adds mid-pipeline stop support to the segmenter and fixes related issues
surfaced during testing. See docs/segmentation-hard-stop-pr-guide.md for
architecture justification, commit-by-commit narrative, and test results.

Ten commits. The first five (inherited from feature/parallel-segmenter) refactor the segmenter from a monolithic sequential loop into a parallel pipeline with per-image streaming. The last five (new on this branch) add mid-pipeline stop support, fix related bugs that surfaced during testing, and restore the dashboard's broken magnification chip.

The architectural justification is that the refactor of the segmenter also acts as the enabler for the queue UI features that were merged on the dashboard side (fairscope/dashboard@b2d3d82). Those features were becoming difficult to implement reliably against the previous monolithic segmenter.


Why enable parallel?

The previous segmenter (_pipe() on main) was a single synchronous loop:

for image in acquisition:
    open → flat-correct → mask → slice → save crops → mutate global state → MQTT publish

This shape has three problems that were blocking the dashboard's queue features:

1. No cooperative interrupt point

There is no clean seam between operations at which to check for a user-requested stop. Adding stop checks would have meant instrumenting every internal step (open, threshold, erode, dilate, close, slice, save), and even then pausing or canceling remained fragile. As far as I can tell, MQTT messages are pulled only from the segmenter's main run() event loop, and run() is blocked inside the long-running _pipe() call for the entire acquisition. The result, before this PR: the orange Stop button in the dashboard had no visible effect; segmentation always ran to completion.
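A minimal sketch of that shape (simplified and illustrative, not the actual code; the real run() loop handles more actions than shown here):

def run(self):
    while True:  # simplified; the real loop has its own shutdown handling
        if self.segmenter_client.new_message_received():
            message = self.segmenter_client.msg
            self.segmenter_client.read_message()
            if message.get("payload", {}).get("action") == "segment":
                # _pipe() blocks this loop for the whole acquisition, so a later
                # {"action": "stop"} message sits unread in the MQTT buffer
                # until every image has been processed.
                self._pipe()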

The parallel refactor provides a natural seam: the for future, i, filename in futures await loop, where each completed image is a clean point at which to check the MQTT buffer.

2. Mutating global state is incompatible with queueing

The old _pipe() accumulated objects into self.__global_metadata["objects"] as it processed each image, then exported the EcoTaxa archive at the end. If the user wanted to stop a long acquisition mid-run and segment a different one first, there was no good way to set aside the partial state. Restarting the segmentation meant losing everything done so far, and it risked carrying contaminated state from one acquisition into the next.

The parallel refactor streams per-image JSON lines via streamer.write_image_objects(). Object metadata for each completed image is committed to its own .jsonl file in a temp directory, then assembled in deterministic order at the end. Partial state therefore lives on disk in a well-defined location: cleanup on interrupt is a single shutil.rmtree(metadata_dir) call, and resumption becomes feasible if users ever want it.
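A minimal sketch of the pattern (illustrative only; the real helpers live in segmenter/planktoscope/segmenter/streamer.py and may differ in naming and signatures):

import glob
import json
import os

def write_image_objects(metadata_dir, image_name, objects):
    # One .jsonl file per image: each line is one detected object's metadata.
    with open(os.path.join(metadata_dir, f"{image_name}.jsonl"), "w") as f:
        for obj in objects:
            f.write(json.dumps(obj) + "\n")

def assemble_objects(metadata_dir):
    # Deterministic assembly at the end of the run (sorted by filename here).
    all_objects = []
    for path in sorted(glob.glob(os.path.join(metadata_dir, "*.jsonl"))):
        with open(path) as f:
            all_objects.extend(json.loads(line) for line in f)
    return all_objects

# Cleanup on interrupt is a single call: shutil.rmtree(metadata_dir)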

3. Stop semantics need to be reliable for queue advancement

The Node-RED queue UI (fairscope/dashboard@b2d3d82) lets users batch multiple acquisitions, drag-reorder pending items, and stop the current item without killing the queue (orange button) or stop the entire queue without killing the current item (red button). All of this relies on a few guarantees from the segmenter:

  • Stop is fast and visible.
  • Stop on item N doesn't accidentally kill item N+1.
  • A stopped item is re-segmentable (no weird done.txt written).
  • The queue can keep advancing unattended, overnight if need be.

These guarantees hold only when the segmenter has cooperative interrupt points and clean teardown, neither of which the old monolithic _pipe() architecture provided. Retrofitting them onto it was becoming impractical.


Commit narrative

29c28c2  Fixed regex ro enable per image indexing for progress bars
5255005  Drain stale messages + reset _interrupt
6e048ac  fixed bug in db.js
28594f4  Fixed __init__.py bug for multiple button presses
440aff7  Add hard-stop in init
711d416  fix: count objects before ecotaxa_export pops them          ← inherited
3594ff2  fix: benchmark initializes process_uuid before calling segment_path  ← inherited
dd10f98  fix: benchmark uses isolated temp directory, never touches real data  ← inherited
951c56e  add benchmark script                                                  ← inherited
db67979  segmenter: parallelize image processing with ProcessPoolExecutor       ← inherited
Commit-by-commit (what & why)

db67979 through 711d416: Parallel segmenter foundation (see "Why enable parallel?" above and segmenter/PARALLEL_SEGMENTER.md).

440aff7 (Add hard-stop in init): Adds the SegmentationInterrupted exception, the _check_for_stop() polling helper, and stop checks in _pipe_sequential, _run_parallel, _pipe, segment_path, and segment_list. On stop: skip done.txt and publish Interrupted instead of Done. Generated by add-segmenter-stop.py.

28594f4 (Fixed __init__.py bug for multiple button presses): The generated stop check inside _run_parallel's per-future try/except Exception block let SegmentationInterrupted be silently caught by the broad except Exception clause and demoted to a "Future failed" log. Adds except SegmentationInterrupted: raise before the broad catch so the exception propagates cleanly out of the futures loop. Discovered when stop appeared to "work" but the pipeline still ran to completion silently.

6e048ac (fixed bug in db.js): Adds process_pixel: metadata.process_pixel ?? null to the per-acquisition payload returned by getAcquisitionFromPath() in lib/db.js. Without this, the dashboard's Mag chip (getMagInfo(item.process_pixel)) always received undefined and rendered as an empty grey square for every row.

5255005 (Drain stale messages + reset _interrupt): Hardens against the edge case where a stop click arrives between two queued items: previously, the buffered MQTT message would be consumed by the next item's first _check_for_stop() call and prematurely kill that item. Drains the segmenter MQTT buffer at the top of segment_list() and resets the new _interrupt_requested flag (which is what makes _check_for_stop() idempotent).

29c28c2 (Fixed regex ro enable per image indexing for progress bars): Reorders the parallel-mode status string from "Segmented image {filename}, {N}/{M} complete, ..." to "Segmenting image {N}/{M}: {filename}, ...". The dashboard template watches payload.status.match(/image (\d+)\/(\d+)/); the old format failed to match because the digits weren't preceded by the literal "image ". Sequential mode was unaffected because its string already had the right shape.

File-by-file changes (this branch's diff vs main)

 lib/db.js                                       |   1 +
 segmenter/PARALLEL_SEGMENTER.md                 | 234 +++++++++++++++
 segmenter/benchmark.py                          | 391 +++++++++++++++++++++++
 segmenter/planktoscope/segmenter/__init__.py    | 320 ++++++++++++++++---
 segmenter/planktoscope/segmenter/ecotaxa.py     |  80 +++--
 segmenter/planktoscope/segmenter/operations.py  |   6 +
 segmenter/planktoscope/segmenter/streamer.py    |  74 +++++
 segmenter/planktoscope/segmenter/worker.py      | 396 +++++++++++++++++++++++
 segmenter/pyproject.toml                        |   2 +-

(benchmark.py, PARALLEL_SEGMENTER.md, streamer.py, worker.py, plus the ecotaxa.py pandas removal and operations.py safety comments — all from the parallel-segmenter foundation. See the original parallel-segmenter-diff-guide.md for those file-by-file details. The new content in this PR is below.)

segmenter/planktoscope/segmenter/__init__.py

The biggest file. Beyond the parallel refactor it now also contains:

Module-level

class SegmentationInterrupted(Exception):
    """Raised when a user requests stop during an active segmentation pipeline."""
    pass

__init__

self._interrupt_requested = False

Per-instance interrupt flag; reset at the start of every segment_list call.

New helper _check_for_stop()

Once the flag is set, the helper keeps returning True until it is reset.

def _check_for_stop(self):
    if self._interrupt_requested:
        return True
    if self.segmenter_client.new_message_received():
        peek = self.segmenter_client.msg
        if peek and peek.get("payload", {}).get("action") == "stop":
            logger.info("Stop requested during active segmentation")
            self.segmenter_client.read_message()
            self._interrupt_requested = True
            return True
    return False

_pipe() — fall back to sequential only on real failures, never on user stop

try:
    self._pipe_parallel(images_list, images_count, shm.name, metadata_dir)
except SegmentationInterrupted:
    raise            # Do not fall back to sequential on user stop
except Exception as e:
    logger.error(f"Parallel segmentation failed, falling back to sequential: {e}")
    self._pipe_sequential(images_list, images_count, metadata_dir)
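The shm.name passed to _pipe_parallel above refers to the flat field shared with worker processes via multiprocessing.SharedMemory (see the foundation commit notes further down). A minimal sketch of that zero-copy pattern, assuming a numpy flat-field array; names here are illustrative, and the real worker code lives in worker.py:

import numpy as np
from multiprocessing import shared_memory

def publish_flat(flat):
    # Parent process: copy the flat-field array into a named SharedMemory block.
    shm = shared_memory.SharedMemory(create=True, size=flat.nbytes)
    np.ndarray(flat.shape, dtype=flat.dtype, buffer=shm.buf)[:] = flat
    return shm  # keep a reference; close()/unlink() during final cleanup

def attach_flat(shm_name, shape, dtype):
    # Worker process: attach by name; no copy of the underlying buffer is made.
    shm = shared_memory.SharedMemory(name=shm_name)
    return np.ndarray(shape, dtype=dtype, buffer=shm.buf), shm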

_run_parallel() — per-future stop check + clean re-raise

for future, i, filename in futures:
    try:
        result = await future
        completed += 1
        if self._check_for_stop():
            executor.shutdown(wait=True, cancel_futures=True)
            raise SegmentationInterrupted("User requested stop")
        if "error" in result:
            ...
        else:
            self.segmenter_client.client.publish(
                "status/segmenter",
                json.dumps({
                    "status": f"Segmenting image {completed}/{images_count}: "
                              f"{filename}, "
                              f"{result['object_count']} objects in "
                              f"{result['duration']:.1f}s"
                }),
            )
    except SegmentationInterrupted:
        raise        # ← critical: do NOT let the broad except below swallow this
    except Exception as e:
        completed += 1
        errors.append({"image_name": filename, "error": str(e)})
        logger.error(f"Future failed for {filename}: {e}")

The status-string ordering matters: image {N}/{M} must appear literally for the dashboard's /image (\d+)\/(\d+)/ regex to match. This coupling between the status string and the dashboard regex could be made more robust later.
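As a quick illustration (the strings below are made up; the pattern mirrors the dashboard template's JS regex):

import re

pattern = re.compile(r"image (\d+)/(\d+)")

old = "Segmented image 01_23_45.jpg, 12/48 complete, 31 objects in 3.2s"
new = "Segmenting image 12/48: 01_23_45.jpg, 31 objects in 3.2s"

print(pattern.search(old))           # None: the digits are not preceded by "image "
print(pattern.search(new).groups())  # ('12', '48')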

_pipe_sequential() — top-of-loop check

for i, filename in enumerate(images_list):
    if self._check_for_stop():
        raise SegmentationInterrupted("User requested stop")
    name = os.path.splitext(filename)[0]
    ...

segment_path() — skip done.txt on interrupt

try:
    self._pipe(ecotaxa_export)
except SegmentationInterrupted:
    logger.info(f"Pipeline interrupted by user for {path}, not marking as done")
    raise
except Exception as e:
    logger.exception(f"There was an error in the pipeline {e}")
    raise e

segment_list() — drain + reset, then handle interrupt

def segment_list(self, path_list, force=False, ecotaxa_export=True):
    logger.info(f"The pipeline will be run in {len(path_list)} directories")
    logger.debug(f"Those are {path_list}")

    # Drain any stop/garbage messages buffered between runs, then reset the
    # interrupt flag so a stale click from a previous run can't kill this one.
    while self.segmenter_client.new_message_received():
        self.segmenter_client.read_message()
    self._interrupt_requested = False

    ...
    for path in path_list:
        ...
        try:
            self.segment_path(path, ecotaxa_export)
        except SegmentationInterrupted:
            logger.info(f"User stopped segmentation at {path}")
            exception = SegmentationInterrupted("User stopped")
            break
        except Exception as e:
            ...

    if exception is None:
        self.segmenter_client.client.publish("status/segmenter", '{"status":"Done"}')
    elif isinstance(exception, SegmentationInterrupted):
        logger.info("Publishing Interrupted status after user stop")
        self.segmenter_client.client.publish("status/segmenter", '{"status":"Interrupted"}')
    else:
        ...

lib/db.js

     gallery: getGalleryPath(path),
     interupted,
     date: metadata.acq_local_datetime,
+    process_pixel: metadata.process_pixel ?? null,
   }

Used by node-red/nodes/list-acquisitions.js (the custom Node-RED node that lists acquisitions); fed into the dashboard's getMagInfo() to colorize the magnification chip.


End-to-end stop flow (after this PR)

User clicks orange "Stop segmentation" in dashboard
  └→ MQTT publish: segmenter/segment {"action":"stop"}
       └→ paho-mqtt background thread stores in segmenter_client.msg
            └→ next _check_for_stop() inside _run_parallel/await loop sees it
                 ├→ logger.info "Stop requested during active segmentation"
                 ├→ self._interrupt_requested = True   (idempotent — survives multiple clicks)
                 └→ raise SegmentationInterrupted
                      └→ except SegmentationInterrupted: raise   (NOT swallowed by broad except)
                           └→ executor.shutdown(wait=True, cancel_futures=True)
                                └→ propagates out of _run_parallel → _pipe → segment_path → segment_list
                                     ├→ segment_path: "not marking as done" → no done.txt
                                     ├→ _pipe finally: cleanup SharedMemory + .metadata_tmp/
                                     ├→ segment_list: break, publish {"status":"Interrupted"}
                                     └→ Node-RED queue processor sees Interrupted
                                          └→ advances to next queued item OR stops queue
                                               └→ next segment_list call drains buffer
                                                    + resets self._interrupt_requested = False

Latency: roughly 10 seconds in testing (one in-flight worker has to finish its current image).
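For manual testing without the dashboard, the same stop message can be published directly over MQTT (a sketch; it assumes paho-mqtt is installed and the broker is reachable on localhost):

import json
import paho.mqtt.publish as publish

# Same topic and payload the dashboard's orange Stop button sends.
publish.single("segmenter/segment", json.dumps({"action": "stop"}), hostname="localhost")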


Test results (great-door, 3-worker parallel mode)

Scenario Result
Single click stop, 3-item queue, click during item 2 ✅ Item 1 done, item 2 cleanly stopped, item 3 ran to completion
Single click stop, 3-item queue, click during item 3 (final) ✅ Items 1 + 2 done, item 3 cleanly stopped
done.txt on stopped acquisition ✅ Not written, item is re-segmentable
Status published on stop ✅ {"status":"Interrupted"} (not "Done")
Mag chip color (Med/Brown for process_pixel=0.73) ✅ Restored after db.js fix
Per-image progress bar in queue modal ✅ Advances per image after regex fix

Known limitations (out of scope for this PR)

  • Per-object cropped JPGs in /home/pi/data/objects/<acq>/ are not cleaned up when a segmentation is interrupted. Re-segmenting overwrites them; deleting raw images leaves them orphaned. Recommended follow-up: extend the dashboard's "Delete raw images" exec node (or add an option to it) to also clean the objects dir; see the sketch after this list.
  • EcoTaxa export and UMAP projection (the tail-end phases after the segmentation loop) have no stop checks. A stop click during these phases sits in the buffer and only fires after they complete. This produces a confusing UI flash but no data corruption.
  • Stop latency is bounded by the duration of one in-flight image (roughly 4–15 seconds in observed runs). This is inherent to the parallel architecture; cancelling in-flight workers mid-image would require additional worker-side coordination and is not attempted here.
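A hypothetical sketch of the crop cleanup recommended in the first bullet (not part of this PR; the helper name is made up, the path layout is the one described above):

import os
import shutil

def remove_object_crops(acq_id, objects_root="/home/pi/data/objects"):
    # Remove a stopped acquisition's cropped JPGs so re-segmentation starts clean.
    crop_dir = os.path.join(objects_root, acq_id)
    if os.path.isdir(crop_dir):
        shutil.rmtree(crop_dir)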

Out-of-PR coordination

  • Node-RED dashboard changes (the queue UI, stop button, delete dialog) live in fairscope/dashboard@b2d3d82 and are already pushed. The node-red/projects/dashboard submodule pointer in fairscope/PlanktoScope will be advanced separately.

babo989 added 10 commits March 25, 2026 21:05
  - Replace pandas with stdlib csv in ecotaxa.py (removes 60MB dependency)
  - Add streamer.py for incremental JSONL metadata I/O per image
  - Add worker.py with picklable functions for parallel workers
  - Split _pipe() into _pipe_parallel() (asyncio + ProcessPoolExecutor)
    and _pipe_sequential() (preserves flat recalc + remove_previous_mask)
  - Share flat field via multiprocessing.SharedMemory (zero-copy)
  - Default 3 workers for RPi 5, configurable via MQTT worker_count
  - Graceful fallback to sequential on parallel failure

babo989 commented Apr 23, 2026

The matching Node-RED changes live on the dashboard subrepo's feature/audit-page-port branch (contains hard-stop b2d3d82, audit page port f61e86b, and layout/browse tweaks on top).

To exercise this end-to-end, after checking out this PR, do:

git submodule update --init
cd node-red/projects/dashboard
git fetch && git checkout feature/audit-page-port

babo989 force-pushed the fix/segmentation-hard-stop branch from c7cd36d to 492ec81 on April 23, 2026 06:00
