diff --git a/segmenter/PARALLEL_SEGMENTER.md b/segmenter/PARALLEL_SEGMENTER.md new file mode 100644 index 000000000..8d42b05e1 --- /dev/null +++ b/segmenter/PARALLEL_SEGMENTER.md @@ -0,0 +1,234 @@ +# Parallel Segmenter — Design & Implementation + +## Overview + +This document details the parallelization of the PlanktoScope segmenter pipeline, the incremental metadata streaming system, and the removal of the pandas dependency. These changes target the Raspberry Pi 5 (4 cores) and aim for near-linear scaling of image segmentation throughput. + +## Problem Statement + +The original segmenter processed images sequentially in a single loop within `_pipe()`. On a 4-core RPi 5, three cores sat idle during the CPU-bound image processing (flat correction, thresholding, morphological operations, regionprops extraction, color analysis). Additionally: + +- All object metadata accumulated in a Python list in memory until the final EcoTaxa export, increasing peak RAM usage unnecessarily. +- The `pandas` library (60MB on disk) was used solely for a single `DataFrame.to_csv()` call to generate a tab-separated file. 
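The replacement described in this document needs only the stdlib. As a minimal sketch (function names here are illustrative, not the project's actual API), EcoTaxa's two-row header convention, `[f]` for numeric columns and `[t]` for text, can be produced with `csv.writer` alone:

```python
import csv
import io

def infer_ecotaxa_type(value):
    """EcoTaxa marks numeric columns [f] and everything else [t]."""
    is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
    return "[f]" if is_number else "[t]"

def rows_to_tsv(rows):
    """Serialize a list of flat dicts to a two-header, EcoTaxa-style TSV string."""
    buf = io.StringIO()
    writer = csv.writer(buf, delimiter="\t", lineterminator="\n")
    columns = list(rows[0].keys())                                       # column order from first row
    writer.writerow(columns)                                             # row 1: column names
    writer.writerow([infer_ecotaxa_type(rows[0][c]) for c in columns])   # row 2: type tags
    for row in rows:                                                     # rows 3+: data values
        writer.writerow([row.get(c, "") for c in columns])
    return buf.getvalue()
```

A `pandas.DataFrame(rows).to_csv(sep="\t", index=False)` call produces the same column and data rows; only the type-annotation row ever needed custom code.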
+ +## Architecture + +### Before + +``` +_pipe() + ├── Calculate flat field (median of first 10 images) + └── FOR each image (sequential): + ├── Open image, apply flat correction + ├── Create binary mask (threshold → erode → dilate → close → erode) + ├── Slice: label connected components, extract regionprops + │ ├── For each object: extract morphology, color, blur + │ ├── Save cropped object JPG + │ ├── MQTT publish per-object metrics + │ └── Append object dict to self.__global_metadata["objects"] + ├── MQTT publish per-image progress + └── Flat recalculation heuristic (if object count spikes) + └── ecotaxa_export(): pandas DataFrame → TSV in ZIP archive +``` + +### After + +``` +_pipe() + ├── Calculate flat field (unchanged) + ├── Create SharedMemory for flat array (zero-copy sharing) + ├── Create temp .metadata_tmp/ directory + ├── IF worker_count > 1 AND remove_previous_mask == False: + │ └── _pipe_parallel() — asyncio + ProcessPoolExecutor + │ ├── Dispatch process_single_image() per image to worker pool + │ ├── Each worker: flat correction → mask → slice → write .jsonl + │ ├── Parent awaits results, publishes MQTT progress + │ └── On failure: falls back to _pipe_sequential() + │ ELSE: + │ └── _pipe_sequential() — original loop with streaming + │ ├── Preserves flat recalculation heuristic + │ ├── Preserves remove_previous_mask support + │ └── Writes .jsonl per image instead of in-memory accumulation + ├── Assemble all .jsonl files in image order → global_metadata["objects"] + ├── Cleanup SharedMemory + temp directory + └── ecotaxa_export(): stdlib csv.writer → TSV in ZIP archive +``` + +## File Changes + +### New Files + +#### `segmenter/planktoscope/segmenter/worker.py` + +Module-level (picklable) functions for `ProcessPoolExecutor` workers: + +- **`worker_init(shm_name, flat_shape, flat_dtype)`** — Called once per worker process at pool startup. Attaches to the parent's `SharedMemory` block and creates a numpy array view of the flat field. 
This is zero-copy — the ~49MB flat array is not duplicated per worker. + +- **`process_single_image(...)`** — The main worker entry point. Receives all parameters as serializable arguments (file paths, scalars, a metadata dict). Performs the complete per-image pipeline: + 1. `cv2.imread()` → divide by shared flat → `rescale_intensity()` + 2. `_create_mask()` — threshold → no_op → erode → dilate → close → erode2 + 3. `_slice_image()` — `skimage.measure.label/regionprops`, extract morphology + color + blur per object, save cropped JPGs + 4. Write object metadata to `.jsonl` via `streamer.write_image_objects()` + 5. Return result dict `{image_name, image_index, object_count, duration}` + + Error handling: entire function wrapped in try/except. On failure, logs the error and returns `{image_name, image_index, error: str}` instead of crashing the pool. + +- **Pure helper functions** (exact copies from `__init__.py`, made module-level): + - `_get_color_info(bgr_img, mask)` — HSV mean/std statistics + - `_extract_metadata_from_regionprop(prop, pixel_size_um=None)` — 24+ morphological features with optional µm calibration via `process_pixel` + - `_augment_slice(dim_slice, max_dims, size)` — Expand bounding box by padding pixels + - `_create_mask(img, debug_path, save_debug)` — Mask pipeline with `no_op` hardcoded for the `remove_previous_mask` slot (parallel mode always disables this) + - `_slice_image(img, name, mask, ...)` — Full object extraction loop. Includes `process_pixel` calibration, threshold value capture, debug image output. No MQTT, no shared state. + +#### `segmenter/planktoscope/segmenter/streamer.py` + +Incremental metadata I/O using JSON Lines format: + +- **`write_image_objects(metadata_dir, image_name, objects)`** — Writes one `.jsonl` file per image. Each line is a JSON object `{"name": "...", "metadata": {...}}` serialized with `NpEncoder` (handles numpy types). File is named `{image_name}.jsonl`, so workers writing different images never conflict. 
+ +- **`read_image_objects(filepath)`** — Reads a `.jsonl` file back into a list of dicts. + +- **`assemble_all_objects(metadata_dir, images_list)`** — Reads all `.jsonl` files in the order of `images_list` (the sorted image filename list). This guarantees deterministic output order regardless of which worker finished first. Returns the combined object list ready for `ecotaxa_export()`. + +### Modified Files + +#### `segmenter/planktoscope/segmenter/__init__.py` + +**New imports:** +- `planktoscope.segmenter.streamer` +- `planktoscope.segmenter.worker` + +**New instance variable:** +- `self.__worker_count = 3` — Default for RPi 5 (4 cores, 1 reserved for Node-RED/system). Configurable via MQTT `settings.worker_count`. + +**`_slice_image()` changes:** +- Now returns a 3-tuple: `(object_count, unfiltered_count, objects_list)` instead of `(object_count, unfiltered_count)`. +- Objects are collected in a local `objects_list` and returned, instead of being appended to `self.__global_metadata["objects"]`. +- MQTT per-object publishes remain in place (they execute in the main process during sequential mode). + +**`_pipe()` rewrite:** +- Calculates flat field (unchanged). +- Creates `SharedMemory` for the flat array and a temp `.metadata_tmp/` directory. +- Branches to `_pipe_parallel()` or `_pipe_sequential()` based on `worker_count` and `remove_previous_mask`. +- After processing: calls `streamer.assemble_all_objects()` to rebuild the ordered object list. +- Cleanup in `finally` block: `shm.close()`, `shm.unlink()`, `shutil.rmtree(metadata_dir)`. +- Graceful degradation: if parallel dispatch raises an exception, catches it, logs a warning, and falls back to `_pipe_sequential()`. + +**`_pipe_parallel()` (new method):** +- Uses `asyncio.run()` scoped to this method only (the rest of the segmenter remains synchronous). +- Creates `ProcessPoolExecutor(max_workers=N, initializer=worker_init)`. +- Dispatches `process_single_image()` per image via `loop.run_in_executor()`. 
+- Awaits all futures, publishes MQTT progress as each completes. +- Collects errors from failed workers and logs them after the batch. + +**`_pipe_sequential()` (new method):** +- Preserves the original sequential loop exactly, including: + - Flat recalculation heuristic (object count > average + 20) + - `remove_previous_mask` support + - Per-image and per-object MQTT publishes +- Only difference: writes objects via `streamer.write_image_objects()` after each image instead of appending to `self.__global_metadata["objects"]`. + +**`treat_message()` addition:** +- Parses `settings.worker_count` from MQTT segmentation requests (default: 3). + +#### `segmenter/planktoscope/segmenter/ecotaxa.py` + +Complete rewrite of the TSV generation, removing the `pandas` and `numpy` dependencies: + +- **Removed:** `import pandas`, `import numpy`, `dtype_to_ecotaxa()` function. +- **Added:** `import csv`, `_infer_ecotaxa_type(value)` function — uses `isinstance(value, (int, float))` to determine `[f]` vs `[t]` annotation. +- **`ecotaxa_export()`** now builds rows as a list of dicts, determines column order from the first row, writes using `csv.writer(buf, delimiter="\t")`: + - Row 1: column names + - Row 2: EcoTaxa type annotations (`[f]` or `[t]`) + - Rows 3+: data values +- Output format is identical to the previous pandas-based output. + +#### `segmenter/planktoscope/segmenter/operations.py` + +Comments added to the module-level globals `__mask_to_remove` and `__last_threshold_value` explaining: +- `__mask_to_remove` is NOT safe for parallel use (requires sequential processing). +- `__last_threshold_value` is safe in parallel mode because each worker process gets its own copy via fork/spawn. + +#### `segmenter/pyproject.toml` + +- Removed `pandas>=2.3.3,<3` from `dependencies`. No new dependencies added — `asyncio`, `concurrent.futures`, `csv`, `json`, `multiprocessing.shared_memory` are all Python stdlib. 
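The `SharedMemory` handoff described above can be sketched in a single process (a toy 4×4×3 array stands in for the full-resolution flat, and variable names are illustrative; in the real pipeline the attach side runs inside each worker via `worker_init()`):

```python
import numpy as np
from multiprocessing import shared_memory

# Parent side (_pipe): copy the flat field into a named shared block once.
flat = np.full((4, 4, 3), 0.5, dtype=np.float64)  # stand-in for the real flat
shm = shared_memory.SharedMemory(create=True, size=flat.nbytes)
parent_view = np.ndarray(flat.shape, dtype=flat.dtype, buffer=shm.buf)
parent_view[:] = flat[:]

# Worker side (worker_init): attach by name. This maps the SAME physical
# pages, so no per-worker copy of the array is made.
attached = shared_memory.SharedMemory(name=shm.name)
worker_flat = np.ndarray(flat.shape, dtype=flat.dtype, buffer=attached.buf)
corrected = np.clip(1.0 / worker_flat, 0, 2)  # e.g. a flat-field division step

# Numpy views must be dropped before the underlying buffers can be released,
# otherwise close() raises BufferError for outstanding memoryview exports.
del worker_flat, parent_view
attached.close()
shm.close()
shm.unlink()  # parent only: frees the named segment
```

Each side calls `close()` on its own handle, but only the creating process calls `unlink()`, which is why the design doc puts `shm.unlink()` in the parent's `finally` block.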
+
+## Key Design Decisions
+
+### Why ProcessPoolExecutor (not threading)?
+
+The segmentation workload is CPU-bound (numpy, opencv, scikit-image). Python's GIL prevents threads from achieving true parallelism for CPU work. `ProcessPoolExecutor` spawns real OS processes that run on separate cores.
+
+### Why SharedMemory for the flat field?
+
+The flat field is a ~49MB array at the camera's full 4056×3040 resolution. Without shared memory, pickling it into each worker would cost 49MB × N workers in additional memory. `multiprocessing.shared_memory.SharedMemory` provides zero-copy access — all workers read the same physical memory.
+
+### Why JSON Lines (not CSV or SQLite)?
+
+- Each worker writes to a uniquely-named file (`{image_name}.jsonl`), so there's no locking or contention.
+- Crash-resilient: if a worker fails, completed images' results survive.
+- Assembly in deterministic order is trivial: read files in the sorted `images_list` order.
+- JSON naturally handles the nested metadata structure and numpy type conversion (via `NpEncoder`).
+
+### Why disable flat recalculation in parallel mode?
+
+The flat recalculation heuristic checks if the current image's object count exceeds the running average by >20. This creates a sequential feedback loop — you can't know whether to recalculate until the previous image is done. The heuristic was already flagged with `TODO: this heuristic should be improved or removed if deemed unnecessary`. In parallel mode, the flat is computed once from the first 10 images and used for all.
+
+### Why force sequential when remove_previous_mask is enabled?
+
+`remove_previous_mask` subtracts the previous image's mask from the current image's mask. This is a strict image-to-image dependency that cannot be parallelized. When enabled (`remove_previous_mask=True` in MQTT settings), the segmenter automatically falls back to sequential processing.
+
+### Why asyncio.run() scoped to _pipe_parallel() only?
+
+The segmenter runs as a `multiprocessing.Process` with a synchronous main loop polling MQTT every 0.5s. Converting the entire process to async would require replacing `paho-mqtt` with an async MQTT library and restructuring the process lifecycle — a large refactor with marginal benefit since the bottleneck is CPU-bound image processing, not I/O. Scoping `asyncio.run()` to just the parallel dispatch method is the minimal, low-risk approach.
+
+### Why default to 3 workers?
+
+The RPi 5 has 4 cores. Leaving 1 core for Node-RED, MQTT, and system processes prevents the segmenter from starving the dashboard UI. Each worker uses ~150MB for image processing buffers. With the shared flat (~49MB) + parent overhead (~100MB) + 3 workers (~450MB) = ~600MB total, well within the Pi 5's available RAM.
+
+## Memory Profile
+
+| Component | Memory | Notes |
+|-----------|--------|-------|
+| Flat field (shared) | ~49MB | Single copy via SharedMemory, read by all workers |
+| Per-worker image buffers | ~150MB | BGR image + mask + working arrays |
+| Parent process overhead | ~100MB | MQTT client, metadata, orchestration |
+| **Total (3 workers)** | **~600MB** | Fits comfortably in RPi 5 RAM |
+| **Total (1 worker, sequential)** | **~300MB** | Same as original behavior |
+
+## Configuration
+
+### MQTT Settings
+
+The `worker_count` parameter can be sent in the segmentation MQTT message:
+
+```json
+{
+  "action": "segment",
+  "path": "/path/to/images",
+  "settings": {
+    "worker_count": 3,
+    "force": false,
+    "recursive": true,
+    "ecotaxa": true,
+    "keep": true,
+    "process_min_ESD": 20,
+    "remove_previous_mask": false
+  }
+}
+```
+
+- `worker_count`: Number of parallel workers (default: 3). Set to 1 to force sequential processing.
+- `remove_previous_mask`: When `true`, forces sequential processing regardless of `worker_count`.
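Putting the pieces above together, the dispatch pattern (a synchronous caller, one `asyncio.run()` scoped to the batch, and a worker pool sized below the core count) can be sketched as follows. The worker here is a trivial stand-in for `process_single_image()`, and the `fork` start method is an assumption based on the Linux/RPi target:

```python
import asyncio
import concurrent.futures
import multiprocessing

def process_one(index):
    """Stand-in for the real per-image worker; must be a picklable module-level function."""
    return {"image_index": index, "object_count": index * 2}

def dispatch_batch(image_count, worker_count=3):
    """Synchronous entry point; the event loop exists only inside this call."""
    async def _run():
        loop = asyncio.get_running_loop()
        ctx = multiprocessing.get_context("fork")  # assumption: Linux (RPi) target
        with concurrent.futures.ProcessPoolExecutor(
            max_workers=worker_count, mp_context=ctx
        ) as executor:
            futures = [
                loop.run_in_executor(executor, process_one, i)
                for i in range(image_count)
            ]
            # Awaiting in dispatch order keeps progress reporting deterministic
            # even though workers may finish out of order.
            return [await f for f in futures]
    return asyncio.run(_run())
```

In the real `_pipe_parallel()` the result loop additionally publishes MQTT progress and collects per-image error dicts so one failed image cannot kill the batch.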
+ +## Testing Checklist + +- [ ] Sequential mode (`worker_count=1`): output identical to pre-refactor +- [ ] Sequential mode with `remove_previous_mask=true`: behavior preserved +- [ ] Parallel mode (`worker_count=3`): same objects detected, same EcoTaxa TSV content (order is deterministic because assembly reads in image-list order) +- [ ] Parallel mode with image processing failure: error logged, remaining images processed, pipeline completes +- [ ] EcoTaxa ZIP archive: TSV format matches expectations (2-row header with `[f]`/`[t]` annotations) +- [ ] MQTT dashboard: progress updates still appear during segmentation +- [ ] Memory: RSS stays within expected bounds during parallel run +- [ ] `process_pixel` calibration: µm/µm² measurements correct in both modes +- [ ] Threshold value captured in object metadata in both modes diff --git a/segmenter/benchmark.py b/segmenter/benchmark.py new file mode 100644 index 000000000..c22c8ee27 --- /dev/null +++ b/segmenter/benchmark.py @@ -0,0 +1,404 @@ +#!/usr/bin/env python3 +"""Benchmark script for PlanktoScope segmenter performance. + +Runs segmentation on a test acquisition folder and reports timing, throughput, +memory usage, and object counts. Works on both the original (main) and +parallel (feature/parallel-segmenter) branches. + +Usage: + # On the Pi, from the segmenter/ directory: + python3 benchmark.py /path/to/acquisition/folder [--workers N] [--runs N] [--validate REF] + + # Examples: + # Baseline (main branch, or new branch with sequential): + python3 benchmark.py /data/img/20210122/sample_1/acq_1 + + # Parallel mode (new branch only): + python3 benchmark.py /data/img/20210122/sample_1/acq_1 --workers 3 + + # Multiple runs for stable averages: + python3 benchmark.py /data/img/20210122/sample_1/acq_1 --workers 3 --runs 3 + + # Validate output against a reference (baseline) result: + python3 benchmark.py /data/img/20210122/sample_1/acq_1 --workers 3 --validate baseline_result.json + +Workflow: + 1. 
git checkout main + python3 benchmark.py /path/to/acq --runs 3 + # Save the output JSON as baseline_result.json + + 2. git checkout feature/parallel-segmenter + python3 benchmark.py /path/to/acq --workers 1 --runs 3 + # Compare sequential-on-new-branch vs baseline + + 3. python3 benchmark.py /path/to/acq --workers 3 --runs 3 --validate baseline_result.json + # Parallel mode, validate objects match baseline +""" + +import argparse +import datetime +import json +import os +import platform +import resource +import shutil +import sys +import time + + +def get_peak_rss_mb(): + """Get peak resident set size in MB (Linux/macOS).""" + # ru_maxrss is in KB on Linux, bytes on macOS + usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + if platform.system() == "Darwin": + return usage / (1024 * 1024) + return usage / 1024 + + +def prepare_test_folder(source_path, work_dir): + """Copy acquisition folder to a clean working directory. + + We copy so that: + - done.txt from previous runs doesn't skip processing + - Object images from previous runs don't interfere + - The original data is never modified + """ + if os.path.exists(work_dir): + shutil.rmtree(work_dir) + + # Copy only the source images and metadata.json + os.makedirs(work_dir, exist_ok=True) + for f in os.listdir(source_path): + src = os.path.join(source_path, f) + if os.path.isfile(src) and ( + f.endswith((".jpg", ".JPG", ".jpeg", ".JPEG")) or f == "metadata.json" + ): + shutil.copy2(src, os.path.join(work_dir, f)) + + # Remove done.txt if it was copied + done_file = os.path.join(work_dir, "done.txt") + if os.path.exists(done_file): + os.remove(done_file) + + return work_dir + + +def count_images(path): + """Count JPG images in a directory.""" + return len([f for f in os.listdir(path) if f.lower().endswith((".jpg", ".jpeg"))]) + + +def run_segmentation(data_path, acq_path, worker_count=None): + """Run the segmenter pipeline and return metrics. 
+ + Args: + data_path: Root data directory (parent of img/, objects/, export/) + acq_path: Path to the acquisition folder to segment + worker_count: Number of workers (None = use branch default) + + Returns: + dict with timing, object count, and memory metrics + """ + import multiprocessing + + # Import segmenter (works on both branches) + from planktoscope.segmenter import SegmenterProcess + + # Create a mock event (never set — we don't use the run loop) + event = multiprocessing.Event() + seg = SegmenterProcess(event, data_path) + + # On the new branch, set worker_count if provided + if worker_count is not None and hasattr(seg, "_SegmenterProcess__worker_count"): + seg._SegmenterProcess__worker_count = worker_count + + # We need to set up a minimal MQTT mock since _pipe publishes status + class MockMQTTClient: + def publish(self, topic, payload): + pass # Discard all MQTT publishes during benchmark + + class MockSegmenterClient: + def __init__(self): + self.client = MockMQTTClient() + + seg.segmenter_client = MockSegmenterClient() + + # Set up state that segment_list() normally initializes before calling segment_path() + from uuid import uuid4 + + seg._SegmenterProcess__process_uuid = str(uuid4()) + if seg._SegmenterProcess__process_id == "": + seg._SegmenterProcess__process_id = seg._SegmenterProcess__process_uuid + + # Measure + rss_before = get_peak_rss_mb() + start_time = time.monotonic() + + # Run without ecotaxa export first so we can count objects before pop() + seg.segment_path(acq_path, ecotaxa_export=False) + + elapsed = time.monotonic() - start_time + rss_after = get_peak_rss_mb() + + # Count results (must read before ecotaxa_export pops "objects") + objects = seg._SegmenterProcess__global_metadata.get("objects", []) + object_count = len(objects) + object_names = sorted([obj["name"] for obj in objects]) + + # Now run ecotaxa export separately + obj_path = seg._SegmenterProcess__working_obj_path + archive_path = seg._SegmenterProcess__archive_fn + 
import planktoscope.segmenter.ecotaxa + + archive_dir = os.path.dirname(archive_path) + os.makedirs(archive_dir, exist_ok=True) + planktoscope.segmenter.ecotaxa.ecotaxa_export( + archive_path, + seg._SegmenterProcess__global_metadata, + obj_path, + keep_files=True, + ) + + # Count output files + obj_images = ( + len([f for f in os.listdir(obj_path) if f.endswith(".jpg")]) + if os.path.exists(obj_path) + else 0 + ) + + # Check for ecotaxa archive + archive_exists = os.path.exists(archive_path) + archive_size_mb = os.path.getsize(archive_path) / (1024 * 1024) if archive_exists else 0 + + return { + "elapsed_seconds": round(elapsed, 2), + "object_count": object_count, + "object_images_saved": obj_images, + "archive_exists": archive_exists, + "archive_size_mb": round(archive_size_mb, 2), + "peak_rss_mb": round(rss_after, 1), + "rss_delta_mb": round(rss_after - rss_before, 1), + "object_names": object_names, + } + + +def validate_against_reference(result, ref_path): + """Compare result against a saved reference.""" + with open(ref_path, "r") as f: + ref = json.load(f) + + issues = [] + + # Compare object counts + if result["object_count"] != ref["object_count"]: + issues.append( + f"Object count mismatch: got {result['object_count']}, expected {ref['object_count']}" + ) + + # Compare object names (deterministic order check) + ref_names = ref.get("object_names", []) + result_names = result.get("object_names", []) + if ref_names and result_names: + missing = set(ref_names) - set(result_names) + extra = set(result_names) - set(ref_names) + if missing: + issues.append( + f"Missing objects: {sorted(missing)[:10]}{'...' if len(missing) > 10 else ''}" + ) + if extra: + issues.append(f"Extra objects: {sorted(extra)[:10]}{'...' 
if len(extra) > 10 else ''}") + + return issues + + +def main(): + parser = argparse.ArgumentParser(description="Benchmark PlanktoScope segmenter performance") + parser.add_argument( + "acquisition_path", + help="Path to acquisition folder (must contain metadata.json + images)", + ) + parser.add_argument( + "--workers", + type=int, + default=None, + help="Number of parallel workers (default: branch default). " + "Only works on feature/parallel-segmenter branch.", + ) + parser.add_argument( + "--runs", + type=int, + default=1, + help="Number of runs to average (default: 1)", + ) + parser.add_argument( + "--validate", + type=str, + default=None, + help="Path to a reference result JSON to validate against", + ) + parser.add_argument( + "--output", + type=str, + default=None, + help="Save result JSON to this path (for use as --validate reference)", + ) + args = parser.parse_args() + + acq_path = os.path.abspath(args.acquisition_path) + if not os.path.exists(os.path.join(acq_path, "metadata.json")): + print(f"ERROR: No metadata.json found in {acq_path}") + sys.exit(1) + + image_count = count_images(acq_path) + if image_count == 0: + print(f"ERROR: No images found in {acq_path}") + sys.exit(1) + + # SAFETY: Always use an isolated temp directory as the data root. + # The segmenter writes to data_path/objects/, data_path/export/, data_path/clean/. + # We must NEVER point data_path at the real /home/pi/data/ to avoid destroying + # previously segmented results. 
+ data_path = os.path.join("/tmp", "benchmark_data") + + # Detect branch + try: + import subprocess + + branch = ( + subprocess.check_output( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], + stderr=subprocess.DEVNULL, + ) + .decode() + .strip() + ) + except Exception: + branch = "unknown" + + worker_label = args.workers if args.workers else "default" + + print("=" * 60) + print("PlanktoScope Segmenter Benchmark") + print("=" * 60) + print(f" Branch: {branch}") + print(f" Acquisition: {acq_path}") + print(f" Images: {image_count}") + print(f" Data path: {data_path}") + print(f" Workers: {worker_label}") + print(f" Runs: {args.runs}") + print(f" Date: {datetime.datetime.now().isoformat()}") + print("=" * 60) + + all_results = [] + for run_num in range(1, args.runs + 1): + print(f"\n--- Run {run_num}/{args.runs} ---") + + # Clean the entire temp data root between runs for isolation + if os.path.exists(data_path): + shutil.rmtree(data_path) + + # Prepare clean working copy of acquisition images + # Build the img/ subdirectory structure the segmenter expects + # e.g., /tmp/benchmark_data/img/DATE/SAMPLE/ACQ/ + img_root = os.path.join(data_path, "img") + work_dir = os.path.join(img_root, "benchmark_date", "benchmark_sample", "benchmark_acq") + prepare_test_folder(acq_path, work_dir) + + # Create the output directories the segmenter needs + for subdir in ["objects", "export", "clean"]: + os.makedirs(os.path.join(data_path, subdir), exist_ok=True) + + try: + result = run_segmentation(data_path, work_dir, args.workers) + all_results.append(result) + + print(f" Time: {result['elapsed_seconds']}s") + print(f" Objects found: {result['object_count']}") + print(f" Object images: {result['object_images_saved']}") + print(f" Archive created: {result['archive_exists']}") + if result["archive_exists"]: + print(f" Archive size: {result['archive_size_mb']} MB") + print(f" Peak RSS: {result['peak_rss_mb']} MB") + imgs_per_sec = ( + image_count / result["elapsed_seconds"] if 
result["elapsed_seconds"] > 0 else 0 + ) + objs_per_sec = ( + result["object_count"] / result["elapsed_seconds"] + if result["elapsed_seconds"] > 0 + else 0 + ) + print(f" Throughput: {imgs_per_sec:.2f} images/s, {objs_per_sec:.1f} objects/s") + except Exception as e: + print(f" ERROR: {e}") + import traceback + + traceback.print_exc() + all_results.append({"error": str(e)}) + finally: + pass + + # Summary + successful = [r for r in all_results if "error" not in r] + if not successful: + print("\nAll runs failed!") + sys.exit(1) + + print("\n" + "=" * 60) + print("SUMMARY") + print("=" * 60) + + avg_time = sum(r["elapsed_seconds"] for r in successful) / len(successful) + avg_objects = sum(r["object_count"] for r in successful) / len(successful) + max_rss = max(r["peak_rss_mb"] for r in successful) + avg_imgs_per_sec = image_count / avg_time if avg_time > 0 else 0 + avg_objs_per_sec = avg_objects / avg_time if avg_time > 0 else 0 + + summary = { + "branch": branch, + "workers": args.workers, + "image_count": image_count, + "runs": len(successful), + "avg_elapsed_seconds": round(avg_time, 2), + "avg_object_count": round(avg_objects), + "avg_images_per_second": round(avg_imgs_per_sec, 3), + "avg_objects_per_second": round(avg_objs_per_sec, 1), + "max_peak_rss_mb": round(max_rss, 1), + "object_count": successful[-1]["object_count"], + "object_names": successful[-1].get("object_names", []), + "date": datetime.datetime.now().isoformat(), + } + + print(f" Branch: {branch}") + print(f" Workers: {worker_label}") + print(f" Avg time: {avg_time:.2f}s ({len(successful)} runs)") + print(f" Avg objects: {int(avg_objects)}") + print(f" Throughput: {avg_imgs_per_sec:.3f} images/s") + print(f" Throughput: {avg_objs_per_sec:.1f} objects/s") + print(f" Max peak RSS: {max_rss:.1f} MB") + + # Validation + if args.validate: + print(f"\n--- Validation against {args.validate} ---") + issues = validate_against_reference(summary, args.validate) + if issues: + print(" VALIDATION FAILED:") + 
for issue in issues: + print(f" - {issue}") + else: + print(" VALIDATION PASSED: object counts and names match") + + # Save output + output_path = args.output + if output_path is None: + safe_branch = branch.replace("/", "_") + worker_str = f"_w{args.workers}" if args.workers else "" + output_path = f"benchmark_{safe_branch}{worker_str}.json" + + with open(output_path, "w") as f: + json.dump(summary, f, indent=2) + print(f"\n Results saved to: {output_path}") + print("=" * 60) + + +if __name__ == "__main__": + main() diff --git a/segmenter/planktoscope/segmenter/__init__.py b/segmenter/planktoscope/segmenter/__init__.py index 9f83e6f65..af2321961 100644 --- a/segmenter/planktoscope/segmenter/__init__.py +++ b/segmenter/planktoscope/segmenter/__init__.py @@ -50,6 +50,8 @@ import planktoscope.segmenter.ecotaxa import planktoscope.segmenter.encoder import planktoscope.segmenter.operations +import planktoscope.segmenter.streamer +import planktoscope.segmenter.worker logger.info("planktoscope.segmenter is loaded") @@ -57,6 +59,12 @@ ################################################################################ # Main Segmenter class ################################################################################ +class SegmentationInterrupted(Exception): + """Raised when a user requests stop during an active segmentation pipeline.""" + + pass + + class SegmenterProcess(multiprocessing.Process): """This class contains the main definitions for the segmenter of the PlanktoScope""" @@ -101,6 +109,8 @@ def __init__(self, event, data_path): self.__process_min_ESD = 20 # microns # https://planktoscope.slack.com/archives/C01V5ENKG0M/p1714146253356569 self.__remove_previous_mask = False + self.__worker_count = 3 # default for RPi 5 (4 cores, leave 1 for system) + self._interrupt_requested = False # create all base path for path in [ @@ -434,6 +444,8 @@ def __augment_slice(dim_slice, max_dims, size=10): dim_slice = tuple(dim_slice) return dim_slice + objects_list = [] + labels, 
nlabels = skimage.measure.label(mask, return_num=True) regionprops = skimage.measure.regionprops(labels) @@ -528,10 +540,7 @@ def __augment_slice(dim_slice, max_dims, size=10): json.dumps(object_metadata, cls=planktoscope.segmenter.encoder.NpEncoder), ) - if "objects" in self.__global_metadata: - self.__global_metadata["objects"].append(object_metadata) - else: - self.__global_metadata.update({"objects": [object_metadata]}) + objects_list.append(object_metadata) if self.__save_debug_img: if object_number: @@ -571,7 +580,23 @@ def __augment_slice(dim_slice, max_dims, size=10): img, os.path.join(self.__working_debug_path, "tagged.jpg"), ) - return (object_number, len(regionprops)) + return (object_number, len(regionprops), objects_list) + + def _check_for_stop(self): + """Check if a stop request arrived via MQTT during the pipeline. + + Idempotent — once True, stays True until segment_list() resets it. + """ + if self._interrupt_requested: + return True + if self.segmenter_client.new_message_received(): + peek = self.segmenter_client.msg + if peek and peek.get("payload", {}).get("action") == "stop": + logger.info("Stop requested during active segmentation") + self.segmenter_client.read_message() + self._interrupt_requested = True + return True + return False def _pipe(self, ecotaxa_export): logger.info("Finding images") @@ -587,34 +612,200 @@ def _pipe(self, ecotaxa_export): else: logger.debug(f"We found {images_count} images, good luck!") - first_start = time.monotonic() - self.__mask_to_remove = None - # average = 0 - total_objects = 0 - average_objects = 0 - recalculate_flat = True - # TODO check image list here to find if a flat exists - # we recalculate the flat every 10 pictures - if recalculate_flat: - recalculate_flat = False - self.segmenter_client.client.publish( - "status/segmenter", '{"status":"Calculating flat"}' + # Calculate initial flat field + self.segmenter_client.client.publish("status/segmenter", '{"status":"Calculating flat"}') + if images_count 
< 10: + self._calculate_flat(images_list[0:images_count], images_count, self.__working_path) + else: + self._calculate_flat(images_list[0:10], 10, self.__working_path) + + if self.__save_debug_img: + self._save_image( + self.__flat, + os.path.join(self.__working_debug_path, "flat_color.jpg"), + ) + + # Create temp directory for intermediate metadata + metadata_dir = os.path.join(self.__working_obj_path, ".metadata_tmp") + os.makedirs(metadata_dir, exist_ok=True) + + # Decide parallel vs sequential + use_parallel = self.__worker_count > 1 and not self.__remove_previous_mask + + shm = None + try: + if use_parallel: + # Create shared memory for the flat field array + import multiprocessing.shared_memory + + flat_bytes = self.__flat.nbytes + shm = multiprocessing.shared_memory.SharedMemory(create=True, size=flat_bytes) + flat_shared = np.ndarray(self.__flat.shape, dtype=self.__flat.dtype, buffer=shm.buf) + flat_shared[:] = self.__flat[:] + + try: + self._pipe_parallel(images_list, images_count, shm.name, metadata_dir) + except SegmentationInterrupted: + raise # Do not fall back to sequential on user stop + except Exception as e: + logger.error(f"Parallel segmentation failed, falling back to sequential: {e}") + self._pipe_sequential(images_list, images_count, metadata_dir) + else: + if self.__remove_previous_mask and self.__worker_count > 1: + logger.info("remove_previous_mask is enabled — using sequential processing") + self._pipe_sequential(images_list, images_count, metadata_dir) + + # Assemble all objects from .jsonl files in image order + all_objects = planktoscope.segmenter.streamer.assemble_all_objects( + metadata_dir, images_list ) - if images_count < 10: - self._calculate_flat(images_list[0:images_count], images_count, self.__working_path) + self.__global_metadata["objects"] = all_objects + total_objects = len(all_objects) + logger.success(f"Total objects assembled: {total_objects}") + + finally: + # Cleanup shared memory + if shm is not None: + shm.close() + 
shm.unlink() + # Cleanup temp metadata dir + import shutil + + shutil.rmtree(metadata_dir, ignore_errors=True) + + if ecotaxa_export: + if "objects" in self.__global_metadata and self.__global_metadata["objects"]: + if planktoscope.segmenter.ecotaxa.ecotaxa_export( + self.__archive_fn, + self.__global_metadata, + self.__working_obj_path, + keep_files=True, + ): + logger.success("Ecotaxa archive export completed for this folder") + else: + logger.error("The ecotaxa export could not be completed") else: - self._calculate_flat(images_list[0:10], 10, self.__working_path) + logger.info("There are no objects to export") + else: + logger.info("We are not creating the ecotaxa output archive for this folder") - if self.__save_debug_img: - self._save_image( - self.__flat, - os.path.join(self.__working_debug_path, "flat_color.jpg"), + # cleanup + # we're done free some mem + self.__flat = None + + def _pipe_parallel(self, images_list, images_count, shm_name, metadata_dir): + """Process images in parallel using asyncio + ProcessPoolExecutor.""" + import asyncio + import concurrent.futures + + first_start = time.monotonic() + + # Build the base debug path (without per-image suffix) + sample_rel = self.__working_path.split(self.__img_path)[1].strip() + + async def _run_parallel(): + loop = asyncio.get_event_loop() + executor = concurrent.futures.ProcessPoolExecutor( + max_workers=self.__worker_count, + initializer=planktoscope.segmenter.worker.worker_init, + initargs=( + shm_name, + self.__flat.shape, + str(self.__flat.dtype), + ), + ) + + # Build a serializable copy of metadata for workers + # (excludes non-serializable items, keeps process_pixel etc.) 
+            worker_metadata = {k: v for k, v in self.__global_metadata.items() if k != "objects"}
+
+            futures = []
+            for i, filename in enumerate(images_list):
+                name = os.path.splitext(filename)[0]
+                debug_path = os.path.join(self.__debug_objects_root, sample_rel, name)
+
+                future = loop.run_in_executor(
+                    executor,
+                    planktoscope.segmenter.worker.process_single_image,
+                    os.path.join(self.__working_path, filename),
+                    name,
+                    i,
+                    images_count,
+                    self.__working_obj_path,
+                    debug_path,
+                    metadata_dir,
+                    self.__save_debug_img,
+                    self.__process_min_ESD,
+                    worker_metadata,
+                )
+                futures.append((future, i, filename))
+
+            errors = []
+            completed = 0
+            for future, i, filename in futures:
+                try:
+                    result = await future
+                    completed += 1
+                    # Check for stop request between image completions
+                    if self._check_for_stop():
+                        executor.shutdown(wait=True, cancel_futures=True)
+                        raise SegmentationInterrupted("User requested stop")
+                    if "error" in result:
+                        errors.append(result)
+                        logger.error(f"Worker error for {result['image_name']}: {result['error']}")
+                    else:
+                        self.segmenter_client.client.publish(
+                            "status/segmenter",
+                            json.dumps(
+                                {
+                                    "status": f"Segmenting image {completed}/{images_count}: "
+                                    f"{result['image_name']}, "
+                                    f"{result['object_count']} objects in "
+                                    f"{result['duration']:.1f}s"
+                                }
+                            ),
+                        )
+                        logger.success(
+                            f"Image {result['image_name']}: "
+                            f"{result['object_count']} objects in "
+                            f"{result['duration']:.1f}s"
+                        )
+                except SegmentationInterrupted:
+                    raise
+                except Exception as e:
+                    completed += 1
+                    errors.append({"image_name": filename, "error": str(e)})
+                    logger.error(f"Future failed for {filename}: {e}")
+
+            executor.shutdown(wait=True)
+
+            if errors:
+                logger.warning(
+                    f"{len(errors)} image(s) failed during parallel segmentation: "
+                    f"{[e['image_name'] for e in errors]}"
                )
+            total_duration = (time.monotonic() - first_start) / 60
+            logger.success(
+                f"{images_count} images done in {total_duration:.1f} minutes "
+                f"({self.__worker_count} workers, parallel mode)"
+            )
+
+        asyncio.run(_run_parallel())
+
+    def _pipe_sequential(self, images_list, images_count, metadata_dir):
+        """Process images sequentially, preserving flat recalc heuristic and remove_previous_mask."""
+        first_start = time.monotonic()
+        self.__mask_to_remove = None
+        total_objects = 0
+        average_objects = 0
+        recalculate_flat = False
         average_time = 0
-        # TODO here would be a good place to parallelize the computation
         for i, filename in enumerate(images_list):
+            # Check for stop request between images
+            if self._check_for_stop():
+                raise SegmentationInterrupted("User requested stop")
             name = os.path.splitext(filename)[0]

             # Publish the object_id to via MQTT to Node-RED
@@ -627,10 +818,8 @@ def _pipe(self, ecotaxa_export):
             if recalculate_flat:  # not i % 10 and i < (images_count - 10)
                 recalculate_flat = False
                 if len(images_list) == 10:
-                    # We are too close to the end of the list, take the previous 10 images instead of the next 10
                     flat = self._calculate_flat(images_list, 10, self.__working_path)
                 elif i > (len(images_list) - 11):
-                    # We are too close to the end of the list, take the previous 10 images instead of the next 10
                     flat = self._calculate_flat(images_list[i - 10 : i], 10, self.__working_path)
                 else:
                     flat = self._calculate_flat(images_list[i : i + 10], 10, self.__working_path)  # noqa: F841
@@ -652,7 +841,6 @@ def _pipe(self, ecotaxa_export):
             logger.debug(f"The debug objects path is {self.__working_debug_path}")
             # Create the debug objects path if needed
             if self.__save_debug_img:
-                # create the path!
                os.makedirs(self.__working_debug_path, exist_ok=True)

             start = time.monotonic()
@@ -662,34 +850,23 @@ def _pipe(self, ecotaxa_export):
                 os.path.join(self.__working_path, images_list[i]), self.__flat
             )
-            # logger.debug(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
-            # logger.debug(time.monotonic() - start)
-
-            # start = time.monotonic()
-            # logger.debug(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
-
             mask = self._create_mask(img, self.__working_debug_path)
-            # logger.debug(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
-            # logger.debug(time.monotonic() - start)
+            objects_count, _, objects_list = self._slice_image(img, name, mask, total_objects)

-            # start = time.monotonic()
-            # logger.debug(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
+            # Stream objects to disk incrementally
+            planktoscope.segmenter.streamer.write_image_objects(metadata_dir, name, objects_list)

-            objects_count, _ = self._slice_image(img, name, mask, total_objects)
             total_objects += objects_count
             # Simple heuristic to detect a movement of the flow cell and a change in the resulting flat
             # TODO: this heuristic should be improved or removed if deemed unnecessary
             if average_objects != 0 and objects_count > average_objects + 20:
-                # FIXME: this should force a new slice of the current image
                 logger.debug(
                     f"We need to recalculate a flat since we have {objects_count} new objects instead of the average of {average_objects}"
                 )
                 recalculate_flat = True
             average_objects = (average_objects * i + objects_count) / (i + 1)

-            # logger.debug(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
-            # logger.debug(time.monotonic() - start)
             delay = time.monotonic() - start
             average_time = (average_time * i + delay) / (i + 1)
             logger.success(
@@ -708,26 +885,6 @@ def _pipe(self, ecotaxa_export):
             f"We also found {total_objects} objects, or an average of {total_objects / (total_duration * 60)}objects per second"
         )

-        if ecotaxa_export:
-            if "objects" in self.__global_metadata:
-                if planktoscope.segmenter.ecotaxa.ecotaxa_export(
-                    self.__archive_fn,
-                    self.__global_metadata,
-                    self.__working_obj_path,
-                    keep_files=True,
-                ):
-                    logger.success("Ecotaxa archive export completed for this folder")
-                else:
-                    logger.error("The ecotaxa export could not be completed")
-            else:
-                logger.info("There are no objects to export")
-        else:
-            logger.info("We are not creating the ecotaxa output archive for this folder")
-
-        # cleanup
-        # we're done free some mem
-        self.__flat = None
-
     def segment_all(self, paths: list, force=False, ecotaxa_export=True):
         """Starts the segmentation in all the folders given recursively
@@ -754,6 +911,12 @@ def segment_list(self, path_list: list, force=False, ecotaxa_export=True):
         logger.info(f"The pipeline will be run in {len(path_list)} directories")
         logger.debug(f"Those are {path_list}")

+        # Drain any stop/garbage messages buffered between runs, then reset the
+        # interrupt flag so a stale click from a previous run can't kill this one.
+        while self.segmenter_client.new_message_received():
+            self.segmenter_client.read_message()
+        self._interrupt_requested = False
+
         self.__process_uuid = str(uuid4())

         if self.__process_id == "":
@@ -775,6 +938,10 @@ def segment_list(self, path_list: list, force=False, ecotaxa_export=True):
             # forcing, let's gooooo
             try:
                 self.segment_path(path, ecotaxa_export)
+            except SegmentationInterrupted:
+                logger.info(f"User stopped segmentation at {path}")
+                exception = SegmentationInterrupted("User stopped")
+                break
             except Exception as e:
                 logger.error(f"There was an error while segmenting {path}")
                 exception = e
@@ -783,6 +950,9 @@ def segment_list(self, path_list: list, force=False, ecotaxa_export=True):
         if exception is None:
             # Publish the status "Done" to via MQTT to Node-RED
             self.segmenter_client.client.publish("status/segmenter", '{"status":"Done"}')
+        elif isinstance(exception, SegmentationInterrupted):
+            logger.info("Publishing Interrupted status after user stop")
+            self.segmenter_client.client.publish("status/segmenter", '{"status":"Interrupted"}')
         else:
             self.segmenter_client.client.publish(
                 "status/segmenter",
@@ -889,6 +1059,9 @@ def segment_path(self, path, ecotaxa_export):

         try:
             self._pipe(ecotaxa_export)
+        except SegmentationInterrupted:
+            logger.info(f"Pipeline interrupted by user for {path}, not marking as done")
+            raise
         except Exception as e:
             logger.exception(f"There was an error in the pipeline {e}")
             raise e
@@ -935,6 +1108,8 @@ def treat_message(self):

             self.__remove_previous_mask = settings.get("remove_previous_mask", False)

+            self.__worker_count = settings.get("worker_count", 3)
+
             path = last_message["path"] if "path" in last_message else None

             # Publish the status "Started" to via MQTT to Node-RED
diff --git a/segmenter/planktoscope/segmenter/ecotaxa.py b/segmenter/planktoscope/segmenter/ecotaxa.py
index 38d8f0197..c87cb20a8 100644
--- a/segmenter/planktoscope/segmenter/ecotaxa.py
+++ b/segmenter/planktoscope/segmenter/ecotaxa.py
@@ -19,8 +19,7 @@
 from loguru import logger

-import numpy
-import pandas  # FIXME: just use python's csv library, to shave off pandas's 60 MB of unnecessary disk space usage
+import csv
 import zipfile
 import os
 import io
@@ -202,18 +201,10 @@
 """

-def dtype_to_ecotaxa(dtype):
-    """Determines the EcoTaxa header field type annotation for the dtype"""
-    # Note: this code was copied from the MIT-licensed MorphoCut library at
-    # https://github.com/morphocut/morphocut/blob/0.1.2/src/morphocut/contrib/ecotaxa.py .
-    # The MorphoCut library is copyright 2019 Simon-Martin Schroeder and others.
-    try:
-        if numpy.issubdtype(dtype, numpy.number):
-            return "[f]"
-    except TypeError:  # pragma: no cover
-        print(type(dtype))
-        raise
-
+def _infer_ecotaxa_type(value) -> str:
+    """Infer EcoTaxa type annotation [f] or [t] from a Python value."""
+    if isinstance(value, (int, float)):
+        return "[f]"
     return "[t]"

@@ -228,9 +219,6 @@ def ecotaxa_export(archive_filepath, metadata, image_base_path, keep_files=False
     """
     logger.info("Starting the ecotaxa archive export")
     with zipfile.ZipFile(archive_filepath, "w") as archive:
-        # empty table, one line per object
-        tsv_content = []
-
         if "objects" in metadata:
             object_list = metadata.pop("objects")
         else:
@@ -238,36 +226,49 @@ def ecotaxa_export(archive_filepath, metadata, image_base_path, keep_files=False
             return 0

         # sometimes the camera resolution is not exported as string
-        if not isinstance(metadata["acq_camera_resolution"], str):
-            metadata["acq_camera_resolution"] = (
-                f"{metadata['acq_camera_resolution'][0]}x{metadata['acq_camera_resolution'][1]}"
-            )
+        if not isinstance(metadata.get("acq_camera_resolution", ""), str):
+            res = metadata["acq_camera_resolution"]
+            metadata["acq_camera_resolution"] = f"{res[0]}x{res[1]}"

-        # let's go!
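The `_infer_ecotaxa_type()` helper introduced above, combined with a stdlib `csv.writer`, is enough to reproduce the EcoTaxa TSV layout: a header row, a `[f]`/`[t]` type row, then one row per object. A self-contained sketch with hypothetical column names and object rows (not the real export fields):

```python
import csv
import io


def infer_ecotaxa_type(value) -> str:
    """Numeric values map to [f], everything else to [t] (mirrors _infer_ecotaxa_type)."""
    return "[f]" if isinstance(value, (int, float)) else "[t]"


# Hypothetical object rows, standing in for the dicts built in ecotaxa_export().
rows = [
    {"object_id": "img_0001_0", "object_area": 842, "img_rank": 1},
    {"object_id": "img_0001_1", "object_area": 120, "img_rank": 1},
]
columns = list(rows[0].keys())

buf = io.StringIO()
writer = csv.writer(buf, delimiter="\t", lineterminator="\n")
writer.writerow(columns)                                            # header row
writer.writerow([infer_ecotaxa_type(rows[0][c]) for c in columns])  # type row
for row in rows:
    writer.writerow([row.get(c, "") for c in columns])

tsv = buf.getvalue()
print(tsv.splitlines()[1])  # prints '[t]\t[f]\t[f]'
```

Deriving the type row from the first object's values is the same heuristic pandas' `dtypes` provided, without carrying a 60 MB dependency for it.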
+        # Build rows, determine columns from first object
+        rows = []
+        columns = None
         for rank, roi in enumerate(object_list, start=1):
-            tsv_line = {}
-            tsv_line.update(metadata)
-            tsv_line.update(("object_" + k, v) for k, v in roi["metadata"].items())
-            tsv_line["object_id"] = roi["name"]
+            row = {}
+            row.update(metadata)
+            row.update(("object_" + k, v) for k, v in roi["metadata"].items())
+            row["object_id"] = roi["name"]

             filename = roi["name"] + ".jpg"
+            row["img_file_name"] = filename
+            row["img_rank"] = 1

-            tsv_line.update({"img_file_name": filename, "img_rank": 1})
-            tsv_content.append(tsv_line)
+            if columns is None:
+                columns = list(row.keys())
+            rows.append(row)

             image_path = os.path.join(image_base_path, filename)
-
             archive.write(image_path, arcname=filename)

             if not keep_files:
                 # we remove the image file if we don't want to keep it!
                 os.remove(image_path)

-        tsv_content = pandas.DataFrame(tsv_content)
+        if not rows:
+            logger.error("No objects to export")
+            return 0
+
+        # Determine type annotations from first row values
+        type_row = [_infer_ecotaxa_type(rows[0].get(col)) for col in columns]
+
+        # Write TSV to string buffer
+        buf = io.StringIO()
+        writer = csv.writer(buf, delimiter="\t", lineterminator="\n")
+        writer.writerow(columns)
+        writer.writerow(type_row)
+        for row in rows:
+            writer.writerow([row.get(col, "") for col in columns])

-        tsv_type_header = [dtype_to_ecotaxa(dt) for dt in tsv_content.dtypes]
-        tsv_content.columns = pandas.MultiIndex.from_tuples(
-            list(zip(tsv_content.columns, tsv_type_header))
-        )
+        tsv_content = buf.getvalue()

         # create the filename with project name and acquisition ID
         project = metadata.get("sample_project", "unknown_project").replace(" ", "_")
@@ -275,12 +276,10 @@ def ecotaxa_export(archive_filepath, metadata, image_base_path, keep_files=False
         tsv_filename = f"Ecotaxa_{project}_{acquisition_id}.tsv"

         # add the tsv to the archive
-        archive.writestr(
-            tsv_filename,
-            io.BytesIO(tsv_content.to_csv(sep="\t", encoding="utf-8", index=False).encode()).read(),
-        )
+        archive.writestr(tsv_filename, tsv_content.encode("utf-8"))

         if keep_files:
             tsv_file = os.path.join(image_base_path, tsv_filename)
-            tsv_content.to_csv(path_or_buf=tsv_file, sep="\t", encoding="utf-8", index=False)
+            with open(tsv_file, "w", encoding="utf-8") as f:
+                f.write(tsv_content)

     logger.success("Ecotaxa archive is ready!")
     return 1
diff --git a/segmenter/planktoscope/segmenter/operations.py b/segmenter/planktoscope/segmenter/operations.py
index 1d002eac4..7202ccd64 100644
--- a/segmenter/planktoscope/segmenter/operations.py
+++ b/segmenter/planktoscope/segmenter/operations.py
@@ -19,6 +19,12 @@
 import cv2
 from loguru import logger

+# WARNING: These module-level globals hold process-local state.
+# remove_previous_mask / __mask_to_remove: NOT safe for parallel use — requires
+# sequential image-to-image processing. In parallel mode, the pipeline substitutes
+# no_op() for the remove_previous_mask step.
+# __last_threshold_value: safe in parallel mode because each worker process gets
+# its own copy of this global via fork/spawn.
 __mask_to_remove = None
 __last_threshold_value = None
diff --git a/segmenter/planktoscope/segmenter/streamer.py b/segmenter/planktoscope/segmenter/streamer.py
new file mode 100644
index 000000000..1684f5507
--- /dev/null
+++ b/segmenter/planktoscope/segmenter/streamer.py
@@ -0,0 +1,74 @@
+"""Incremental metadata streaming to disk and assembly for EcoTaxa export.
+
+Each image's segmented objects are written to a separate .jsonl file during processing.
+After all images are done, the files are assembled in image order for deterministic output.
+"""
+
+import json
+import os
+
+from loguru import logger
+
+from planktoscope.segmenter.encoder import NpEncoder
+
+
+def write_image_objects(metadata_dir: str, image_name: str, objects: list[dict]) -> str:
+    """Write per-image object metadata to a .jsonl file.
+
+    Args:
+        metadata_dir: Directory for intermediate metadata files
+        image_name: Base name of the source image (no extension)
+        objects: List of object dicts, each with "name" and "metadata" keys
+
+    Returns:
+        Path to the written .jsonl file
+    """
+    os.makedirs(metadata_dir, exist_ok=True)
+    filepath = os.path.join(metadata_dir, f"{image_name}.jsonl")
+    with open(filepath, "w") as f:
+        for obj in objects:
+            f.write(json.dumps(obj, cls=NpEncoder) + "\n")
+    logger.debug(f"Wrote {len(objects)} objects to {filepath}")
+    return filepath
+
+
+def read_image_objects(filepath: str) -> list[dict]:
+    """Read object metadata from a .jsonl file.
+
+    Args:
+        filepath: Path to a .jsonl file written by write_image_objects
+
+    Returns:
+        List of object dicts
+    """
+    objects = []
+    with open(filepath, "r") as f:
+        for line in f:
+            line = line.strip()
+            if line:
+                objects.append(json.loads(line))
+    return objects
+
+
+def assemble_all_objects(metadata_dir: str, images_list: list[str]) -> list[dict]:
+    """Read all per-image .jsonl files in image order and return combined object list.
+
+    Args:
+        metadata_dir: Directory containing .jsonl files
+        images_list: Ordered list of image filenames (with extension)
+
+    Returns:
+        Combined list of all object dicts, in image order
+    """
+    all_objects = []
+    for filename in images_list:
+        image_name = os.path.splitext(filename)[0]
+        filepath = os.path.join(metadata_dir, f"{image_name}.jsonl")
+        if os.path.exists(filepath):
+            image_objects = read_image_objects(filepath)
+            all_objects.extend(image_objects)
+            logger.debug(f"Assembled {len(image_objects)} objects from {image_name}")
+        else:
+            logger.warning(f"No metadata file found for {image_name}")
+    logger.info(f"Assembled {len(all_objects)} total objects from {len(images_list)} images")
+    return all_objects
diff --git a/segmenter/planktoscope/segmenter/worker.py b/segmenter/planktoscope/segmenter/worker.py
new file mode 100644
index 000000000..a345cc164
--- /dev/null
+++ b/segmenter/planktoscope/segmenter/worker.py
@@ -0,0 +1,386 @@
+"""Top-level worker function for parallel image segmentation.
+
+All functions in this module are module-level (picklable) so they can be used
+with ProcessPoolExecutor. Workers do not hold MQTT connections — the parent
+process publishes progress updates based on returned results.
+"""
+
+import multiprocessing.shared_memory
+import os
+import time
+
+import cv2
+import numpy as np
+import PIL.Image
+import skimage.exposure
+import skimage.measure
+from loguru import logger
+
+import planktoscope.segmenter.operations
+import planktoscope.segmenter.streamer
+
+# Per-worker cached flat reference (set by initializer)
+_flat_shm = None
+_flat_array = None
+
+
+def worker_init(shm_name: str, flat_shape: tuple, flat_dtype: str) -> None:
+    """Initializer for each worker process. Attaches to shared flat field."""
+    global _flat_shm, _flat_array
+    _flat_shm = multiprocessing.shared_memory.SharedMemory(name=shm_name, create=False)
+    _flat_array = np.ndarray(flat_shape, dtype=flat_dtype, buffer=_flat_shm.buf)
+    logger.debug(f"Worker {os.getpid()} attached to shared flat field '{shm_name}'")
+
+
+def process_single_image(
+    image_filepath: str,
+    image_name: str,
+    image_index: int,
+    images_count: int,
+    working_obj_path: str,
+    working_debug_path: str,
+    metadata_dir: str,
+    save_debug_img: bool,
+    process_min_ESD: float,
+    global_metadata: dict,
+) -> dict:
+    """Process a single image: flat correction, masking, slicing, metadata write.
+
+    Args:
+        image_filepath: Full path to the source image
+        image_name: Base name of the image (no extension)
+        image_index: Index of this image in the acquisition
+        images_count: Total number of images in the acquisition
+        working_obj_path: Directory to save cropped object images
+        working_debug_path: Directory for debug output (per-image subdirectory)
+        metadata_dir: Directory for .jsonl intermediate metadata files
+        save_debug_img: Whether to save debug images
+        process_min_ESD: Minimum equivalent spherical diameter threshold (µm)
+        global_metadata: Acquisition metadata dict (for process_pixel calibration etc.)
+
+    Returns:
+        dict with keys: image_name, image_index, object_count, duration
+        On error: dict with keys: image_name, image_index, error
+    """
+    try:
+        start = time.monotonic()
+
+        # Create debug path if needed
+        if save_debug_img:
+            os.makedirs(working_debug_path, exist_ok=True)
+
+        # 1. Open and apply flat
+        image = cv2.imread(image_filepath)
+        if image is None:
+            raise FileNotFoundError(f"Could not read image: {image_filepath}")
+        image = image / _flat_array
+        image[0][0] = [0, 0, 0]
+        image = skimage.exposure.rescale_intensity(image, in_range=(0, 1.04), out_range="uint8")
+
+        if save_debug_img:
+            _save_image(image, os.path.join(working_debug_path, "cleaned_image.jpg"))
+
+        # 2. Create mask (no remove_previous_mask in parallel mode)
+        mask = _create_mask(image, working_debug_path, save_debug_img)
+
+        # 3. Slice image and extract objects
+        objects, object_count, unfiltered_count = _slice_image(
+            image,
+            image_name,
+            mask,
+            working_obj_path,
+            working_debug_path,
+            save_debug_img,
+            process_min_ESD,
+            global_metadata,
+        )
+
+        # 4. Write objects metadata to disk incrementally
+        planktoscope.segmenter.streamer.write_image_objects(metadata_dir, image_name, objects)
+
+        duration = time.monotonic() - start
+        logger.info(
+            f"Worker {os.getpid()}: {image_name} done — {object_count} objects in {duration:.1f}s"
+        )
+        return {
+            "image_name": image_name,
+            "image_index": image_index,
+            "object_count": object_count,
+            "unfiltered_count": unfiltered_count,
+            "duration": duration,
+        }
+
+    except Exception as e:
+        logger.error(f"Worker {os.getpid()}: failed on {image_name}: {e}")
+        return {
+            "image_name": image_name,
+            "image_index": image_index,
+            "error": str(e),
+        }
+
+
+def _save_image(image, path: str) -> None:
+    """Save a BGR image as JPEG."""
+    PIL.Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)).save(path)
+
+
+def _save_mask(mask, path: str) -> None:
+    """Save a binary mask as JPEG."""
+    PIL.Image.fromarray(mask).save(path)
+
+
+def _create_mask(img, debug_path: str, save_debug: bool):
+    """Create segmentation mask without global state (no remove_previous_mask).
+
+    In parallel mode, remove_previous_mask is always disabled (replaced by no_op)
+    because it requires sequential image-to-image state.
+ """ + logger.info("Starting the mask creation") + + pipeline = [ + ("simple_threshold", planktoscope.segmenter.operations.simple_threshold), + ("no_op", planktoscope.segmenter.operations.no_op), + ("erode", planktoscope.segmenter.operations.erode), + ("dilate", planktoscope.segmenter.operations.dilate), + ("close", planktoscope.segmenter.operations.close), + ("erode2", planktoscope.segmenter.operations.erode2), + ] + + mask = img + for i, (name, func) in enumerate(pipeline): + mask = func(mask) + if save_debug and debug_path: + PIL.Image.fromarray(mask).save(os.path.join(debug_path, f"mask_{i}_{name}.jpg")) + + logger.success("Mask created") + return mask + + +def _get_color_info(bgr_img, mask) -> dict: + """Compute HSV color statistics for an object region. + + Args: + bgr_img: BGR image of the object region + mask: Boolean mask of the object within the region + + Returns: + dict with MeanHue, StdHue, MeanSaturation, StdSaturation, MeanValue, StdValue + """ + hsv_img = cv2.cvtColor(bgr_img, cv2.COLOR_BGR2HSV) + (h_channel, s_channel, v_channel) = cv2.split(hsv_img) + return { + "MeanHue": np.mean(h_channel, where=mask), + "MeanSaturation": np.mean(s_channel, where=mask), + "MeanValue": np.mean(v_channel, where=mask), + "StdHue": np.std(h_channel, where=mask), + "StdSaturation": np.std(s_channel, where=mask), + "StdValue": np.std(v_channel, where=mask), + } + + +def _extract_metadata_from_regionprop(prop, pixel_size_um=None) -> dict: + """Extract morphological metadata from a scikit-image regionprop. + + Args: + prop: scikit-image regionprop object + pixel_size_um (float or None): pixel size in µm/pixel (process_pixel). + If provided, linear measurements are in µm and area measurements in µm². + If None, all measurements remain in pixel units. 
+ """ + # Scale factors: linear (µm/px) and area (µm²/px²) + px = pixel_size_um if pixel_size_um and pixel_size_um > 0 else 1.0 + px2 = px * px + + return { + "label": prop.label, + # width of the smallest rectangle enclosing the object (µm if calibrated) + "width": (prop.bbox[3] - prop.bbox[1]) * px, + # height of the smallest rectangle enclosing the object (µm if calibrated) + "height": (prop.bbox[2] - prop.bbox[0]) * px, + # X coordinates of the top left point (pixels) + "bx": prop.bbox[1], + # Y coordinates of the top left point (pixels) + "by": prop.bbox[0], + # circularity — dimensionless ratio + "circ.": (4 * np.pi * prop.filled_area) / prop.perimeter**2, + # Surface area excluding holes (µm² if calibrated) + "area_exc": prop.area * px2, + # Surface area (µm² if calibrated) + "area": prop.filled_area * px2, + # Percentage of holes — dimensionless + "%area": 1 - (prop.area / prop.filled_area), + # Primary axis (µm if calibrated) + "major": prop.major_axis_length * px, + # Secondary axis (µm if calibrated) + "minor": prop.minor_axis_length * px, + # Y center of gravity (pixels) + "y": prop.centroid[0], + # X center of gravity (pixels) + "x": prop.centroid[1], + # Convex area (µm² if calibrated) + "convex_area": prop.convex_area * px2, + # Perimeter (µm if calibrated) + "perim.": prop.perimeter * px, + # major/minor — dimensionless + "elongation": np.divide(prop.major_axis_length, prop.minor_axis_length), + # perim/area_exc — units: 1/µm if calibrated + "perimareaexc": prop.perimeter / prop.area * (1.0 / px), + # perim/major — dimensionless + "perimmajor": prop.perimeter / prop.major_axis_length, + # (4∗π∗Area_exc)/perim² — dimensionless + "circex": np.divide(4 * np.pi * prop.area, prop.perimeter**2), + # Angle in degrees + "angle": prop.orientation / np.pi * 180 + 90, + # Bounding box area (µm² if calibrated) + "bounding_box_area": prop.bbox_area * px2, + "eccentricity": prop.eccentricity, + # Equivalent diameter (µm if calibrated) + "equivalent_diameter": 
prop.equivalent_diameter * px, + "euler_number": prop.euler_number, + # extent — dimensionless + "extent": prop.extent, + "local_centroid_col": prop.local_centroid[1], + "local_centroid_row": prop.local_centroid[0], + # solidity — dimensionless + "solidity": prop.solidity, + } + + +def _augment_slice(dim_slice, max_dims, size=10): + """Expand a region slice by `size` pixels in each direction.""" + dim_slice = list(dim_slice) + for i in range(2): + if dim_slice[i].start < size: + dim_slice[i] = slice(0, dim_slice[i].stop) + else: + dim_slice[i] = slice(dim_slice[i].start - size, dim_slice[i].stop) + + for i in range(2): + if dim_slice[i].stop + size == max_dims[i]: + dim_slice[i] = slice(dim_slice[i].start, max_dims[i]) + else: + dim_slice[i] = slice(dim_slice[i].start, dim_slice[i].stop + size) + + return tuple(dim_slice) + + +def _slice_image( + img, + name: str, + mask, + obj_path: str, + debug_path: str, + save_debug: bool, + min_ESD: float, + global_metadata: dict, +) -> tuple[list[dict], int, int]: + """Slice image into objects and extract metadata. No MQTT, no shared state. 
+
+    Args:
+        img: BGR image array
+        name: Base name of the source image
+        mask: Binary segmentation mask
+        obj_path: Directory to save cropped object images
+        debug_path: Directory for debug output
+        save_debug: Whether to save debug images
+        min_ESD: Minimum equivalent spherical diameter threshold (µm)
+        global_metadata: Acquisition metadata dict (for process_pixel)
+
+    Returns:
+        tuple: (objects_list, filtered_count, unfiltered_count)
+    """
+    labels, nlabels = skimage.measure.label(mask, return_num=True)
+    regionprops = skimage.measure.regionprops(labels)
+
+    # Convert min ESD threshold from µm to pixels for filtering
+    pixel_size = global_metadata.get("process_pixel", None)
+    try:
+        pixel_size = float(pixel_size) if pixel_size is not None else None
+    except (ValueError, TypeError):
+        pixel_size = None
+    if pixel_size and pixel_size > 0:
+        min_esd_pixels = min_ESD / pixel_size
+    else:
+        min_esd_pixels = min_ESD
+        logger.warning(
+            f"No valid process_pixel calibration — using min ESD of {min_esd_pixels} as pixels"
+        )
+    logger.debug(
+        f"Min ESD filter: {min_ESD} µm = {min_esd_pixels:.1f} px (process_pixel={pixel_size})"
+    )
+
+    filtered = [r for r in regionprops if r.equivalent_diameter_area >= min_esd_pixels]
+    object_number = len(filtered)
+    logger.debug(f"Found {nlabels} labels, or {object_number} after size filtering")
+
+    # Determine pixel_size_um for calibrated measurements
+    pixel_size_um = None
+    if pixel_size and pixel_size > 0:
+        pixel_size_um = pixel_size
+
+    objects = []
+    for i, region in enumerate(filtered):
+        region.label = i
+
+        # Extract metadata
+        obj_image = img[region.slice]
+        colors = _get_color_info(obj_image, region.filled_image)
+        metadata = _extract_metadata_from_regionprop(region, pixel_size_um=pixel_size_um)
+
+        # Blur metric
+        metadata["blur_laplacian"] = planktoscope.segmenter.operations.calculate_blur(obj_image)
+
+        # Threshold value (each worker has its own process-local global)
+        threshold_value = planktoscope.segmenter.operations.get_last_threshold_value()
+        if threshold_value is not None:
+            metadata["threshold"] = threshold_value
+
+        # Save cropped object image with augmented slice
+        obj_image_aug = img[_augment_slice(region.slice, labels.shape, 10)]
+        object_id = f"{name}_{i}"
+        _save_image(obj_image_aug, os.path.join(obj_path, f"{object_id}.jpg"))
+
+        if save_debug and debug_path:
+            _save_mask(
+                region.filled_image,
+                os.path.join(debug_path, f"obj_{i}_mask.jpg"),
+            )
+
+        objects.append({"name": object_id, "metadata": {**metadata, **colors}})
+
+    # Debug tagged image
+    if save_debug and debug_path:
+        if object_number:
+            tagged_image = img.copy()
+            for region in filtered:
+                tagged_image = cv2.drawMarker(
+                    tagged_image,
+                    (int(region.centroid[1]), int(region.centroid[0])),
+                    (0, 0, 255),
+                    cv2.MARKER_CROSS,
+                )
+                tagged_image = cv2.rectangle(
+                    tagged_image,
+                    pt1=region.bbox[-3:-5:-1],
+                    pt2=region.bbox[-1:-3:-1],
+                    color=(150, 0, 200),
+                    thickness=1,
+                )
+                contours, _ = cv2.findContours(
+                    np.uint8(region.image),
+                    mode=cv2.RETR_TREE,
+                    method=cv2.CHAIN_APPROX_NONE,
+                )
+                tagged_image = cv2.drawContours(
+                    tagged_image,
+                    contours,
+                    -1,
+                    (238, 130, 238),
+                    thickness=1,
+                    offset=(region.bbox[1], region.bbox[0]),
+                )
+            _save_image(tagged_image, os.path.join(debug_path, "tagged.jpg"))
+        else:
+            _save_image(img, os.path.join(debug_path, "tagged.jpg"))
+
+    return objects, object_number, len(regionprops)
diff --git a/segmenter/pyproject.toml b/segmenter/pyproject.toml
index 212b054b2..aab7d1216 100644
--- a/segmenter/pyproject.toml
+++ b/segmenter/pyproject.toml
@@ -14,7 +14,7 @@ classifiers = [
 dependencies = [
     "paho-mqtt>=2.1.0,<3",
     "numpy>=2.3.3,<3",
-    "pandas>=2.3.3,<3",
+    "loguru>=0.7.3,<0.8",
     "opencv-python-headless>=4.6.0.66,<5",
     "scikit-image>=0.25.2,<0.26",
diff --git a/segmenter/uv.lock b/segmenter/uv.lock
index ae7d50fdb..2bf5efc5c 100644
--- a/segmenter/uv.lock
+++ b/segmenter/uv.lock
@@ -194,46 +194,6 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/c4/cb/00451c3cf31790287768bb12c6bec834f5d292eaf3022afc88e14b8afc94/paho_mqtt-2.1.0-py3-none-any.whl", hash = "sha256:6db9ba9b34ed5bc6b6e3812718c7e06e2fd7444540df2455d2c51bd58808feee", size = 67219, upload-time = "2024-04-29T19:52:48.345Z" },
 ]

-[[package]]
-name = "pandas"
-version = "2.3.3"
-source = { registry = "https://pypi.org/simple" }
-dependencies = [
-    { name = "numpy" },
-    { name = "python-dateutil" },
-    { name = "pytz" },
-    { name = "tzdata" },
-]
-sdist = { url = "https://files.pythonhosted.org/packages/33/01/d40b85317f86cf08d853a4f495195c73815fdf205eef3993821720274518/pandas-2.3.3.tar.gz", hash = "sha256:e05e1af93b977f7eafa636d043f9f94c7ee3ac81af99c13508215942e64c993b", size = 4495223, upload-time = "2025-09-29T23:34:51.853Z" }
-wheels = [
-    { url = "https://files.pythonhosted.org/packages/cd/4b/18b035ee18f97c1040d94debd8f2e737000ad70ccc8f5513f4eefad75f4b/pandas-2.3.3-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:56851a737e3470de7fa88e6131f41281ed440d29a9268dcbf0002da5ac366713", size = 11544671, upload-time = "2025-09-29T23:21:05.024Z" },
-    { url = "https://files.pythonhosted.org/packages/31/94/72fac03573102779920099bcac1c3b05975c2cb5f01eac609faf34bed1ca/pandas-2.3.3-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:bdcd9d1167f4885211e401b3036c0c8d9e274eee67ea8d0758a256d60704cfe8", size = 10680807, upload-time = "2025-09-29T23:21:15.979Z" },
-    { url = "https://files.pythonhosted.org/packages/16/87/9472cf4a487d848476865321de18cc8c920b8cab98453ab79dbbc98db63a/pandas-2.3.3-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:e32e7cc9af0f1cc15548288a51a3b681cc2a219faa838e995f7dc53dbab1062d", size = 11709872, upload-time = "2025-09-29T23:21:27.165Z" },
-    { url = "https://files.pythonhosted.org/packages/15/07/284f757f63f8a8d69ed4472bfd85122bd086e637bf4ed09de572d575a693/pandas-2.3.3-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:318d77e0e42a628c04dc56bcef4b40de67918f7041c2b061af1da41dcff670ac", size = 12306371, upload-time = "2025-09-29T23:21:40.532Z" },
-    { url = "https://files.pythonhosted.org/packages/33/81/a3afc88fca4aa925804a27d2676d22dcd2031c2ebe08aabd0ae55b9ff282/pandas-2.3.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:4e0a175408804d566144e170d0476b15d78458795bb18f1304fb94160cabf40c", size = 12765333, upload-time = "2025-09-29T23:21:55.77Z" },
-    { url = "https://files.pythonhosted.org/packages/8d/0f/b4d4ae743a83742f1153464cf1a8ecfafc3ac59722a0b5c8602310cb7158/pandas-2.3.3-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:93c2d9ab0fc11822b5eece72ec9587e172f63cff87c00b062f6e37448ced4493", size = 13418120, upload-time = "2025-09-29T23:22:10.109Z" },
-    { url = "https://files.pythonhosted.org/packages/4f/c7/e54682c96a895d0c808453269e0b5928a07a127a15704fedb643e9b0a4c8/pandas-2.3.3-cp313-cp313-win_amd64.whl", hash = "sha256:f8bfc0e12dc78f777f323f55c58649591b2cd0c43534e8355c51d3fede5f4dee", size = 10993991, upload-time = "2025-09-29T23:25:04.889Z" },
-    { url = "https://files.pythonhosted.org/packages/f9/ca/3f8d4f49740799189e1395812f3bf23b5e8fc7c190827d55a610da72ce55/pandas-2.3.3-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:75ea25f9529fdec2d2e93a42c523962261e567d250b0013b16210e1d40d7c2e5", size = 12048227, upload-time = "2025-09-29T23:22:24.343Z" },
-    { url = "https://files.pythonhosted.org/packages/0e/5a/f43efec3e8c0cc92c4663ccad372dbdff72b60bdb56b2749f04aa1d07d7e/pandas-2.3.3-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:74ecdf1d301e812db96a465a525952f4dde225fdb6d8e5a521d47e1f42041e21", size = 11411056, upload-time = "2025-09-29T23:22:37.762Z" },
-    { url = "https://files.pythonhosted.org/packages/46/b1/85331edfc591208c9d1a63a06baa67b21d332e63b7a591a5ba42a10bb507/pandas-2.3.3-cp313-cp313t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6435cb949cb34ec11cc9860246ccb2fdc9ecd742c12d3304989017d53f039a78", size = 11645189, upload-time = "2025-09-29T23:22:51.688Z" },
-    { url = "https://files.pythonhosted.org/packages/44/23/78d645adc35d94d1ac4f2a3c4112ab6f5b8999f4898b8cdf01252f8df4a9/pandas-2.3.3-cp313-cp313t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:900f47d8f20860de523a1ac881c4c36d65efcb2eb850e6948140fa781736e110", size = 12121912, upload-time = "2025-09-29T23:23:05.042Z" },
-    { url = "https://files.pythonhosted.org/packages/53/da/d10013df5e6aaef6b425aa0c32e1fc1f3e431e4bcabd420517dceadce354/pandas-2.3.3-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:a45c765238e2ed7d7c608fc5bc4a6f88b642f2f01e70c0c23d2224dd21829d86", size = 12712160, upload-time = "2025-09-29T23:23:28.57Z" },
-    { url = "https://files.pythonhosted.org/packages/bd/17/e756653095a083d8a37cbd816cb87148debcfcd920129b25f99dd8d04271/pandas-2.3.3-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:c4fc4c21971a1a9f4bdb4c73978c7f7256caa3e62b323f70d6cb80db583350bc", size = 13199233, upload-time = "2025-09-29T23:24:24.876Z" },
-    { url = "https://files.pythonhosted.org/packages/04/fd/74903979833db8390b73b3a8a7d30d146d710bd32703724dd9083950386f/pandas-2.3.3-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:ee15f284898e7b246df8087fc82b87b01686f98ee67d85a17b7ab44143a3a9a0", size = 11540635, upload-time = "2025-09-29T23:25:52.486Z" },
-    { url = "https://files.pythonhosted.org/packages/21/00/266d6b357ad5e6d3ad55093a7e8efc7dd245f5a842b584db9f30b0f0a287/pandas-2.3.3-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:1611aedd912e1ff81ff41c745822980c49ce4a7907537be8692c8dbc31924593", size = 10759079, upload-time = "2025-09-29T23:26:33.204Z" },
-    { url = "https://files.pythonhosted.org/packages/ca/05/d01ef80a7a3a12b2f8bbf16daba1e17c98a2f039cbc8e2f77a2c5a63d382/pandas-2.3.3-cp314-cp314-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6d2cefc361461662ac48810cb14365a365ce864afe85ef1f447ff5a1e99ea81c", size = 11814049, upload-time = "2025-09-29T23:27:15.384Z" },
-    { url = "https://files.pythonhosted.org/packages/15/b2/0e62f78c0c5ba7e3d2c5945a82456f4fac76c480940f805e0b97fcbc2f65/pandas-2.3.3-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:ee67acbbf05014ea6c763beb097e03cd629961c8a632075eeb34247120abcb4b", size = 12332638, upload-time = "2025-09-29T23:27:51.625Z" },
-    { url = "https://files.pythonhosted.org/packages/c5/33/dd70400631b62b9b29c3c93d2feee1d0964dc2bae2e5ad7a6c73a7f25325/pandas-2.3.3-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:c46467899aaa4da076d5abc11084634e2d197e9460643dd455ac3db5856b24d6", size = 12886834, upload-time = "2025-09-29T23:28:21.289Z" },
-    { url = "https://files.pythonhosted.org/packages/d3/18/b5d48f55821228d0d2692b34fd5034bb185e854bdb592e9c640f6290e012/pandas-2.3.3-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:6253c72c6a1d990a410bc7de641d34053364ef8bcd3126f7e7450125887dffe3", size = 13409925, upload-time = "2025-09-29T23:28:58.261Z" },
-    { url = "https://files.pythonhosted.org/packages/a6/3d/124ac75fcd0ecc09b8fdccb0246ef65e35b012030defb0e0eba2cbbbe948/pandas-2.3.3-cp314-cp314-win_amd64.whl", hash = "sha256:1b07204a219b3b7350abaae088f451860223a52cfb8a6c53358e7948735158e5", size = 11109071, upload-time = "2025-09-29T23:32:27.484Z" },
-    { url = "https://files.pythonhosted.org/packages/89/9c/0e21c895c38a157e0faa1fb64587a9226d6dd46452cac4532d80c3c4a244/pandas-2.3.3-cp314-cp314t-macosx_10_13_x86_64.whl", hash = "sha256:2462b1a365b6109d275250baaae7b760fd25c726aaca0054649286bcfbb3e8ec", size = 12048504, upload-time = "2025-09-29T23:29:31.47Z" },
-    { url = "https://files.pythonhosted.org/packages/d7/82/b69a1c95df796858777b68fbe6a81d37443a33319761d7c652ce77797475/pandas-2.3.3-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:0242fe9a49aa8b4d78a4fa03acb397a58833ef6199e9aa40a95f027bb3a1b6e7", size = 11410702, upload-time = "2025-09-29T23:29:54.591Z" },
-    { url = 
"https://files.pythonhosted.org/packages/f9/88/702bde3ba0a94b8c73a0181e05144b10f13f29ebfc2150c3a79062a8195d/pandas-2.3.3-cp314-cp314t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:a21d830e78df0a515db2b3d2f5570610f5e6bd2e27749770e8bb7b524b89b450", size = 11634535, upload-time = "2025-09-29T23:30:21.003Z" }, - { url = "https://files.pythonhosted.org/packages/a4/1e/1bac1a839d12e6a82ec6cb40cda2edde64a2013a66963293696bbf31fbbb/pandas-2.3.3-cp314-cp314t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2e3ebdb170b5ef78f19bfb71b0dc5dc58775032361fa188e814959b74d726dd5", size = 12121582, upload-time = "2025-09-29T23:30:43.391Z" }, - { url = "https://files.pythonhosted.org/packages/44/91/483de934193e12a3b1d6ae7c8645d083ff88dec75f46e827562f1e4b4da6/pandas-2.3.3-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:d051c0e065b94b7a3cea50eb1ec32e912cd96dba41647eb24104b6c6c14c5788", size = 12699963, upload-time = "2025-09-29T23:31:10.009Z" }, - { url = "https://files.pythonhosted.org/packages/70/44/5191d2e4026f86a2a109053e194d3ba7a31a2d10a9c2348368c63ed4e85a/pandas-2.3.3-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:3869faf4bd07b3b66a9f462417d0ca3a9df29a9f6abd5d0d0dbab15dac7abe87", size = 13202175, upload-time = "2025-09-29T23:31:59.173Z" }, -] - [[package]] name = "pastel" version = "0.2.1" @@ -319,7 +279,6 @@ dependencies = [ { name = "numpy" }, { name = "opencv-python-headless" }, { name = "paho-mqtt" }, - { name = "pandas" }, { name = "scikit-image" }, ] @@ -338,7 +297,6 @@ requires-dist = [ { name = "numpy", specifier = ">=2.3.3,<3" }, { name = "opencv-python-headless", specifier = ">=4.6.0.66,<5" }, { name = "paho-mqtt", specifier = ">=2.1.0,<3" }, - { name = "pandas", specifier = ">=2.3.3,<3" }, { name = "scikit-image", specifier = ">=0.25.2,<0.26" }, ] @@ -398,27 +356,6 @@ wheels = [ { url = 
"https://files.pythonhosted.org/packages/a8/a4/20da314d277121d6534b3a980b29035dcd51e6744bd79075a6ce8fa4eb8d/pytest-8.4.2-py3-none-any.whl", hash = "sha256:872f880de3fc3a5bdc88a11b39c9710c3497a547cfa9320bc3c5e62fbf272e79", size = 365750, upload-time = "2025-09-04T14:34:20.226Z" }, ] -[[package]] -name = "python-dateutil" -version = "2.9.0.post0" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "six" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/66/c0/0c8b6ad9f17a802ee498c46e004a0eb49bc148f2fd230864601a86dcf6db/python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3", size = 342432, upload-time = "2024-03-01T18:36:20.211Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427", size = 229892, upload-time = "2024-03-01T18:36:18.57Z" }, -] - -[[package]] -name = "pytz" -version = "2025.2" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/f8/bf/abbd3cdfb8fbc7fb3d4d38d320f2441b1e7cbe29be4f23797b4a2b5d8aac/pytz-2025.2.tar.gz", hash = "sha256:360b9e3dbb49a209c21ad61809c7fb453643e048b38924c765813546746e81c3", size = 320884, upload-time = "2025-03-25T02:25:00.538Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/81/c4/34e93fe5f5429d7570ec1fa436f1986fb1f00c3e0f43a589fe2bbcd22c3f/pytz-2025.2-py2.py3-none-any.whl", hash = "sha256:5ddf76296dd8c44c26eb8f4b6f35488f3ccbf6fbbd7adee0b7262d43f0ec2f00", size = 509225, upload-time = "2025-03-25T02:24:58.468Z" }, -] - [[package]] name = "pyyaml" version = "6.0.3" @@ -556,15 +493,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/97/30/2f9a5243008f76dfc5dee9a53dfb939d9b31e16ce4bd4f2e628bfc5d89d2/scipy-1.16.2-cp314-cp314t-win_arm64.whl", 
hash = "sha256:d2a4472c231328d4de38d5f1f68fdd6d28a615138f842580a8a321b5845cf779", size = 26448374, upload-time = "2025-09-11T17:45:03.45Z" }, ] -[[package]] -name = "six" -version = "1.17.0" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/94/e7/b2c673351809dca68a0e064b6af791aa332cf192da575fd474ed7d6f16a2/six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81", size = 34031, upload-time = "2024-12-04T17:35:28.174Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" }, -] - [[package]] name = "tifffile" version = "2025.10.16" @@ -595,15 +523,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" }, ] -[[package]] -name = "tzdata" -version = "2025.2" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/95/32/1a225d6164441be760d75c2c42e2780dc0873fe382da3e98a2e1e48361e5/tzdata-2025.2.tar.gz", hash = "sha256:b60a638fcc0daffadf82fe0f57e53d06bdec2f36c4df66280ae79bce6bd6f2b9", size = 196380, upload-time = "2025-03-23T13:54:43.652Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/5c/23/c7abc0ca0a1526a0774eca151daeb8de62ec457e77262b66b359c3c7679e/tzdata-2025.2-py2.py3-none-any.whl", hash = "sha256:1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8", size = 347839, upload-time = "2025-03-23T13:54:41.845Z" }, -] - [[package]] name = "win32-setctime" version = "1.2.0"