Fix/segmentation queue fix with parallelization #937
- Replace pandas with stdlib csv in ecotaxa.py (removes 60MB dependency)
- Add streamer.py for incremental JSONL metadata I/O per image
- Add worker.py with picklable functions for parallel workers
- Split _pipe() into _pipe_parallel() (asyncio + ProcessPoolExecutor)
and _pipe_sequential() (preserves flat recalc + remove_previous_mask)
- Share flat field via multiprocessing.SharedMemory (zero-copy)
- Default 3 workers for RPi 5, configurable via MQTT worker_count
- Graceful fallback to sequential on parallel failure
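To illustrate the shared flat-field idea from the list above, here is a minimal sketch, assuming the standard `multiprocessing.shared_memory` mechanics; the function names and shapes are illustrative, not the PR's actual code. The parent copies the flat field into a named shared-memory block once; each worker attaches by name and reads it zero-copy.

```python
# Sketch of sharing a flat-field array across processes (hypothetical names,
# not the PlanktoScope code): one copy into a named SharedMemory block in the
# parent, zero-copy views in the workers.
from multiprocessing import shared_memory

import numpy as np


def publish_flat(flat: np.ndarray) -> shared_memory.SharedMemory:
    """Parent side: copy the flat field into a named shared-memory block."""
    shm = shared_memory.SharedMemory(create=True, size=flat.nbytes)
    view = np.ndarray(flat.shape, dtype=flat.dtype, buffer=shm.buf)
    view[:] = flat  # the only copy; workers read the same pages afterwards
    return shm


def attach_flat(name: str, shape, dtype):
    """Worker side: attach by name; returns (handle, zero-copy view)."""
    shm = shared_memory.SharedMemory(name=name)
    return shm, np.ndarray(shape, dtype=dtype, buffer=shm.buf)
```

The handle returned by `attach_flat` must stay alive as long as the view is used; workers `close()` their handle when done, and only the parent calls `unlink()`.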
The matching Node-RED changes live on the dashboard subrepo's `feature/audit-page-port` branch. To exercise this end-to-end after checking out this PR:

```sh
git submodule update --init
cd node-red/projects/dashboard
git fetch && git checkout feature/audit-page-port
```
# Parallel + Queue-Capable Segmenter PR Guide

**Branch:** `fix/segmentation-hard-stop` → `main` on `fairscope/PlanktoScope`
**Date:** 2026-04-22
**Tested on:** PlanktoScope v3.0 device `great-door` (RPi 5, Bookworm)

## TL;DR
Adds mid-pipeline stop support to the segmenter and fixes related issues
surfaced during testing. See docs/segmentation-hard-stop-pr-guide.md for
architecture justification, commit-by-commit narrative, and test results.
Ten commits. The first five (inherited from `feature/parallel-segmenter`) refactor the segmenter from a monolithic sequential loop into a parallel pipeline with per-image streaming. The last five (new on this branch) add mid-pipeline stop support, fix related bugs surfaced during testing, and repair the broken magnification chip in the dashboard.

The architectural justification: the segmenter refactor also enables the queue UI features already merged on the dashboard side (fairscope/dashboard@b2d3d82). Those features were becoming difficult to implement reliably against the previous monolithic segmenter.

## Why enable parallel?
The previous segmenter (`_pipe()` on `main`) was a single synchronous loop. That shape had three problems that were blocking the dashboard's queue features:
### 1. No cooperative interrupt point
There is no clean seam between operations at which to check for a user-requested stop. Adding stop checks meant instrumenting every internal step (open, threshold, erode, dilate, close, slice, save), which made pausing or canceling buggy. MQTT messages are pulled (as far as I understand) only from the segmenter's main `run()` event loop, and `run()` is blocked inside the long-running `_pipe()` call for the entire run. The result, before this PR: the orange Stop button in the dashboard had no visible effect, and segmentation always ran to completion.

The parallel refactor provides a natural seam: the `for future, i, filename in futures` await loop, where each completed image is a clean point at which to check the MQTT buffer.

### 2. Mutating global state is incompatible with queueing
The old `_pipe()` accumulated objects into `self.__global_metadata["objects"]` as it went through each image, then exported the EcoTaxa archive at the end. If the user wanted to stop a long acquisition mid-run and segment a different one first, there was no good way to set aside the partial state. Restarting the segmentation meant losing everything done so far, and often produced state contamination across acquisitions.

The parallel refactor streams per-image JSON lines via `streamer.write_image_objects()`. Object metadata for each completed image is committed to its own `.jsonl` file in a temp directory, then assembled in a deterministic order at the end. Partial state therefore lives on disk in a well-defined location: cleanup on interrupt is a single `shutil.rmtree(metadata_dir)` call, and resumption becomes feasible if users turn out to want it.

### 3. Stop semantics need to be reliable for queue advancement
The Node-RED queue UI (fairscope/dashboard@b2d3d82) lets users batch multiple acquisitions, drag-reorder pending items, stop the current item without killing the queue (orange button), or stop the entire queue without killing the current item (red button). All of this relies on guarantees from the segmenter, e.g. that a stopped acquisition gets no `done.txt` written.

These guarantees hold only when the segmenter has cooperative interrupt points and clean teardown, which the old monolithic `_pipe()` architecture did not. Patching around this was becoming a nightmare.

## Commit narrative
- **db67979–711d416**: the five commits inherited from `feature/parallel-segmenter` (see `segmenter/PARALLEL_SEGMENTER.md`).
- **440aff7**: add hard-stop support in `__init__.py`: a `SegmentationInterrupted` exception, a `_check_for_stop()` polling helper, and stop checks in `_pipe_sequential`, `_run_parallel`, `_pipe`, `segment_path`, and `segment_list`. On stop: skip `done.txt` and publish `Interrupted` instead of `Done`. Generated by `add-segmenter-stop.py`.
- **28594f4**: fix an `__init__.py` bug with multiple button presses. `_run_parallel`'s per-future `try/except Exception` block silently caught `SegmentationInterrupted` and demoted it to a "Future failed" log. This adds `except SegmentationInterrupted: raise` before the broad catch so the exception propagates cleanly out of the futures loop. Discovered when stop appeared to "work" but the pipeline still ran to completion silently.
- **6e048ac**: fix a bug in `db.js`. Adds `process_pixel: metadata.process_pixel ?? null` to the per-acquisition payload returned by `getAcquisitionFromPath()` in `lib/db.js`. Without this, the dashboard's Mag chip (`getMagInfo(item.process_pixel)`) always received `undefined` and rendered as an empty grey square for every row.
- **5255005**: drain stale messages and reset `_interrupt`. A stale stop message could otherwise trip the first `_check_for_stop()` call and prematurely kill that item. Drains the segmenter MQTT buffer at the top of `segment_list()` and resets the new `_interrupt_requested` flag (which makes `_check_for_stop()` idempotent).
- **29c28c2**: fix regex to enable per-image indexing for progress bars. The status string changes from `"Segmented image (unknown), {N}/{M} complete, ..."` to `"Segmenting image {N}/{M}: (unknown), ..."`. The dashboard template watches `payload.status.match(/image (\d+)\/(\d+)/)`; the old format failed to match because the digits weren't preceded by the literal `image`. Sequential mode was unaffected because its string already had the right shape.
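The interrupt seam and the 28594f4 ordering fix can be sketched together in a runnable form. All names here are hypothetical stand-ins for the segmenter's real code, and `ThreadPoolExecutor` is used only to keep the demo self-contained; the real pipeline uses `ProcessPoolExecutor`, but the stop/re-raise pattern is identical.

```python
# Cooperative-stop pattern: a stop check at the per-future seam, with
# SegmentationInterrupted re-raised BEFORE the broad `except Exception`.
# Without that ordering, the broad catch demotes the stop to a log line
# and the loop runs to completion (the bug 28594f4 fixes).
from concurrent.futures import ThreadPoolExecutor, as_completed


class SegmentationInterrupted(Exception):
    """Raised at the per-image seam when a user stop was requested."""


def process_image(name):
    return f"segmented:{name}"  # stand-in for the per-image segmentation work


def run_parallel(images, stop_requested):
    done = []
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = {pool.submit(process_image, n): n for n in images}
        for future in as_completed(futures):
            try:
                if stop_requested():            # the cooperative interrupt seam
                    raise SegmentationInterrupted
                done.append(future.result())
            except SegmentationInterrupted:
                raise                           # must come before the broad catch
            except Exception as exc:
                print(f"Future failed: {exc}")  # real failures logged, not fatal
    return done
```

With `stop_requested` returning `True`, `run_parallel` raises `SegmentationInterrupted` out of the futures loop; with it returning `False`, all images complete normally.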
main)(
benchmark.py,PARALLEL_SEGMENTER.md,streamer.py,worker.py, plus theecotaxa.pypandas removal andoperations.pysafety comments — all from the parallel-segmenter foundation. See the originalparallel-segmenter-diff-guide.mdfor those file-by-file details. The new content in this PR is below.)segmenter/planktoscope/segmenter/__init__.pyThe biggest file. Beyond the parallel refactor it now also contains:
- Module-level `SegmentationInterrupted` exception
- `__init__`: per-instance interrupt flag, reset at the start of every `segment_list` call
- New helper `_check_for_stop()`: once the flag is set, it returns `True` until reset
- `_pipe()`: interrupt-aware fallback that handles a stop cleanly
- `_run_parallel()`: per-future stop check plus a clean re-raise. The status-string ordering matters:
`image {N}/{M}` must appear for the dashboard's `/image (\d+)\/(\d+)/` regex to match; this could probably be made more robust later.
- `_pipe_sequential()`: top-of-loop stop check
- `segment_path()`: skips `done.txt` on interrupt
- `segment_list()`: drain + reset, then handle the interrupt

### `lib/db.js`

```diff
  gallery: getGalleryPath(path),
  interupted,
  date: metadata.acq_local_datetime,
+ process_pixel: metadata.process_pixel ?? null,
}
```
node-red/nodes/list-acquisitions.js(the speciallist acquisitionsNode-RED node); fed into the dashboard'sgetMagInfo()to colorize the magnification chip.End-to-end stop flow (after this PR)
Latency: roughly 10 seconds in testing (one in-flight worker has to finish its current image).
## Test results (`great-door`, 3-worker parallel mode)

- No `done.txt` on the stopped acquisition
- Final status `{"status":"Interrupted"}` (not `"Done"`)
- Mag chip populated (`process_pixel=0.73`) after the `db.js` fix

## Known limitations (out of scope for this PR)
Files under `/home/pi/data/objects/<acq>/` are not cleaned up when a segmentation is interrupted. Re-segmenting overwrites them; deleting raw images leaves them orphaned. Recommended follow-up: extend the dashboard's "Delete raw images" exec node to also clean the objects dir, or add an option for it.

## Out-of-PR coordination
The dashboard-side changes live in fairscope/dashboard@b2d3d82 and are already pushed. The `node-red/projects/dashboard` submodule pointer in `fairscope/PlanktoScope` will be advanced separately.