From 0569cf8d7bcf666580d30027f147d6b8f8631f81 Mon Sep 17 00:00:00 2001 From: Mikhail Petrov Date: Sat, 27 Jun 2026 14:08:18 +0300 Subject: [PATCH] feat(map-efficient): parallel-wave worktree merge coordinator (part of #284 Phase 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the existing wave/DAG scheduler to per-subtask worktree isolation so a parallel wave's independent subtasks each run in their own worktree and are accepted atomically via a new runner-owned coordinator `merge_wave_worktrees`. Every worktree of a wave is cut off the same base, so they can't be merged one at a time — the first `merge_subtask_worktree` advances HEAD and the next trips `BASE_DIVERGED`. The coordinator relaxes only that guard to a wave-scoped form (refuses EXTERNAL HEAD movement, allows sibling divergence), derives `wave_base_sha` from the sidecar, preflights every worktree (commit + guards + pre-merge verify) before touching the working branch, squash-merges each accepted worktree by frozen SHA in sorted id order (one runner commit per subtask), then runs one post-wave full gate on the merged tree INSIDE the atomic transaction. All-or-nothing (council-reviewed, conv c29d6fa9): any conflict, commit failure, or post-wave-gate failure rolls the whole branch back to base via reset --hard + clean -fd (squash leaves no MERGE_HEAD, so merge --abort is never used; MAP runtime state excluded from clean) and leaves every worktree intact for retry. Safety: advisory flock serializes coordinators (MERGE_IN_PROGRESS); attached/clean-target preconditions; conflict attribution back to the subtasks that touched each file; actual-overlap telemetry (declared-disjoint is only a scheduler hint, git's textual conflict stays the hard guard). The shared `_wt_freeze_and_verify` primitive is extracted once and reused by both the single-subtask and wave merge paths. - runner: merge_wave_worktrees + helpers + CLI; refactor merge_subtask_worktree onto the shared primitive (single-subtask behavior byte-identical, 30 tests) - skill wiring: map-efficient SKILL.md + efficient-reference.md (claude + codex) - docs: ARCHITECTURE Phase 2 section, USAGE parallel-waves, CHANGELOG - tests: 12 wave-coordinator cases (happy path, conflict rollback, external-head, post-wave fail/pass, preflight verify fail, no-change, overlap, CLI x2) - drive-by: widen build_minimality_rollout_report -> dict[str, Any], clearing 19 pre-existing pyright errors in tests/test_minimality_report.py (dict[str,object] __getitem__ barrier) make check green: ruff/mypy/pyright 0 (src+tests), 2922 tests, render check. --- .../map-efficient/efficient-reference.md | 14 + .claude/skills/map-efficient/SKILL.md | 2 +- .../map-efficient/efficient-reference.md | 39 +- .map/scripts/map_step_runner.py | 605 ++++++++++++++++-- CHANGELOG.md | 1 + docs/ARCHITECTURE.md | 43 +- docs/USAGE.md | 23 +- src/mapify_cli/minimality_report.py | 2 +- .../map-efficient/efficient-reference.md | 14 + .../templates/map/scripts/map_step_runner.py | 605 ++++++++++++++++-- .../templates/skills/map-efficient/SKILL.md | 2 +- .../map-efficient/efficient-reference.md | 39 +- .../efficient-reference.md.jinja | 14 + .../map/scripts/map_step_runner.py.jinja | 605 ++++++++++++++++-- .../skills/map-efficient/SKILL.md.jinja | 2 +- .../efficient-reference.md.jinja | 39 +- tests/test_worktree_isolation.py | 222 +++++++ 17 files changed, 2113 insertions(+), 158 deletions(-) diff --git a/.agents/skills/map-efficient/efficient-reference.md b/.agents/skills/map-efficient/efficient-reference.md index 0c9198ab..f8252ce1 100644 --- a/.agents/skills/map-efficient/efficient-reference.md +++ b/.agents/skills/map-efficient/efficient-reference.md @@ -94,6 +94,20 @@ python3 .map/scripts/map_orchestrator.py advance_wave Do not mix wave APIs with the sequential `get_next_step` cursor for the same wave unless the orchestrator response explicitly tells you to fall back. +When `worktree.isolation` is enabled and a wave runs in parallel (≥2 disjoint +subtasks), give each subtask its own worktree and accept the whole wave +atomically after all pass Monitor — never merge them one at a time (the first +merge advances HEAD and the next trips `BASE_DIVERGED`): + +```bash +python3 .map/scripts/map_step_runner.py merge_wave_worktrees "$ST_A" "$ST_B" +``` + +It runs the post-wave gate inside the transaction and rolls the whole wave back +to base on any conflict or gate failure (worktrees kept for retry). On a single +subtask's Monitor failure, `discard_subtask_worktree` that subtask and retry it +before calling `merge_wave_worktrees`. + ## TDD Mode `--tdd` inserts `TEST_WRITER` and `TEST_FAIL_GATE` before `ACTOR`. diff --git a/.claude/skills/map-efficient/SKILL.md b/.claude/skills/map-efficient/SKILL.md index fcdf525f..5069fa4d 100644 --- a/.claude/skills/map-efficient/SKILL.md +++ b/.claude/skills/map-efficient/SKILL.md @@ -419,7 +419,7 @@ Every Monitor failure must create a durable `code-review-N.md` with exact issue, ### Per-Wave Gates (after all subtasks in wave pass Monitor) -Run build first, then tests, then linter. If build fails, skip tests/lint and reopen the owning subtask. Run the FULL test suite (not a `-k` subset) whenever any subtask in the wave tripped the cross-subtask regression gate (`recommended_gate == "full_suite"`) — a parallel wave that edits a shared file is the highest-risk case for a regression no single subtask's scoped run can see. +Run build first, then tests, then linter. If build fails, skip tests/lint and reopen the owning subtask. Run the FULL test suite (not a `-k` subset) whenever any subtask in the wave tripped the cross-subtask regression gate (`recommended_gate == "full_suite"`) — a parallel wave that edits a shared file is the highest-risk case for a regression no single subtask's scoped run can see. **Worktree isolation, parallel wave:** when `worktree.isolation` is on and the wave has ≥2 isolated subtasks, accept the whole wave atomically with `merge_wave_worktrees` (never one at a time — the first merge trips `BASE_DIVERGED`); it runs this post-wave gate inside the transaction and rolls the wave back on failure — see [efficient-reference.md](efficient-reference.md#worktree-isolation). ## Step 2a: Validate Step Completion diff --git a/.claude/skills/map-efficient/efficient-reference.md b/.claude/skills/map-efficient/efficient-reference.md index 0c7965c2..80284856 100644 --- a/.claude/skills/map-efficient/efficient-reference.md +++ b/.claude/skills/map-efficient/efficient-reference.md @@ -105,7 +105,7 @@ including clean passes — must carry concrete evidence references. ## Wave Execution -Sequential is default. Parallel execution is allowed only when a wave has satisfied dependencies, low risk, and disjoint new-file writes, or when the user explicitly requests it. Use `get_wave_step`, `validate_wave_step`, and `advance_wave`; do not mix wave APIs with the single-current-subtask API. +Sequential is default. Parallel execution is allowed only when a wave has satisfied dependencies, low risk, and disjoint new-file writes, or when the user explicitly requests it. Use `get_wave_step`, `validate_wave_step`, and `advance_wave`; do not mix wave APIs with the single-current-subtask API. When `worktree.isolation` is on and a wave runs in parallel, each subtask gets its own worktree and the wave is accepted atomically via `merge_wave_worktrees` — see [Parallel waves](#worktree-isolation) under Worktree isolation. ## Predictor Recovery @@ -551,6 +551,43 @@ python3 .map/scripts/map_step_runner.py discard_subtask_worktree "$SUBTASK_ID" - The retry creates a fresh worktree off the current HEAD. Inspect state any time with `worktree_isolation_status`. +### Parallel waves (≥2 worktree-isolated subtasks) — #284 Phase 2 + +When `get_wave_step` returns `mode:"parallel"` (a wave with ≥2 disjoint-file +subtasks) AND isolation is enabled, give EACH subtask its own worktree and +dispatch the Actors concurrently (separate Task agents, each pinned to its own +`$WT_PATH`). Do NOT merge them one at a time: every worktree was cut off the same +HEAD, so the first `merge_subtask_worktree` advances the working branch and the +next trips `BASE_DIVERGED`. Accept the whole wave atomically instead — only after +EVERY subtask in the wave has passed Monitor (+ Evaluator): + +```bash +python3 .map/scripts/map_step_runner.py merge_wave_worktrees "$ST_A" "$ST_B" "$ST_C" +``` + +The coordinator (council-reviewed, conv `c29d6fa9`): derives the wave base from +the sidecar; refuses EXTERNAL HEAD movement but allows the sibling divergence each +in-wave squash-merge creates; runs each worktree's pre-merge `verification_checks`, +then squash-merges every accepted worktree by frozen SHA in sorted id order (one +runner commit per subtask), then runs the post-wave full gate **inside the same +transaction**. It is **all-or-nothing**: any textual conflict, commit failure, or +post-wave-gate failure rolls the WHOLE wave back to the base (`reset --hard` + +`clean -fd`, never `git merge --abort` — squash leaves no `MERGE_HEAD`) and leaves +every worktree intact for retry. Pass each subtask's `merged_sha` from the result +to `record_subtask_result --commit-sha`. This **replaces** the separate Per-Wave +Gate when isolation is on — the post-wave gate runs inside `merge_wave_worktrees`. + +Failure `kind`s (working branch untouched / rolled back to base, worktrees kept): +`WAVE_MERGE_CONFLICT` (with `attribution` naming the subtasks that touched each +conflicted file — fix `affected_files` or re-decompose), `WAVE_VERIFY_FAILED` +(post-wave gate red), `EXTERNAL_HEAD_MOVED` (a commit landed outside the wave — +recreate the worktrees off the new HEAD), `WAVE_BASE_MISMATCH`, `DIRTY_TARGET`, +`MERGE_IN_PROGRESS`, plus the per-worktree preflight `kind`s (`VERIFY_FAILED`, +`BULK_DELETION`, … with `phase:"preflight"`). On any Monitor `valid=false` for a +single wave subtask, `discard_subtask_worktree` THAT subtask and retry it; call +`merge_wave_worktrees` only once the whole wave is green. The `overlaps` field is +advisory telemetry (actual changed-file intersections), not a gate. + ## Troubleshooting - Blueprint validation fails: fix the decomposer output before Actor starts. diff --git a/.map/scripts/map_step_runner.py b/.map/scripts/map_step_runner.py index 8a37b1de..c1ca6541 100755 --- a/.map/scripts/map_step_runner.py +++ b/.map/scripts/map_step_runner.py @@ -15534,62 +15534,27 @@ def create_subtask_worktree( } -def merge_subtask_worktree( +def _wt_freeze_and_verify( subtask_id: str, - attempt: int = 0, - branch: Optional[str] = None, + record: dict, + project_dir: Path, + branch_name: str, verify_cmds: Optional[list[str]] = None, skip_verify: bool = False, ) -> dict[str, object]: - """Accept a subtask: commit worktree work, run pre-merge verification IN the - worktree, then squash-merge ONE commit into the working branch (#284). - - Council-mandated guards run BEFORE the merge touches the working branch: - base-divergence, runtime-state-in-diff, bulk-deletion, submodule-pointer, - detached-HEAD, and the pre-merge `verification_checks` gate. Any failure - leaves the working branch untouched and returns a structured ``kind``. + """Commit a worktree's work + run per-worktree guards + pre-merge verify. + + Operates ONLY inside the worktree — never touches the working branch. Shared + by ``merge_subtask_worktree`` (single) and ``merge_wave_worktrees`` (wave) so + the guard/verify logic has exactly one definition (council Q4: share + lower-level primitives, keep the two coordinators as separate compositions). + On success returns ``{"ok": True, "wt_head", "deleted", "no_changes", + "verification"}``; on any guard/verify failure returns a structured + ``_wt_error`` (``status=="error"``). """ - project_dir = _wt_project_dir() - if not _wt_is_git_repo(): - return _wt_error("NOT_A_REPO", "not inside a git work tree") - if _wt_cwd_is_managed_worktree(): - return _wt_error( - "NESTED_WORKTREE", "run merge from the main checkout, not inside a worktree" - ) - active = _wt_active_git_operation() - if active: - return _wt_error("ACTIVE_GIT_OP", f"a {active} is in progress; resolve it first") - slug = _wt_slug(subtask_id) - if slug is None: - return _wt_error("INVALID_SUBTASK_ID", f"unsafe subtask id: {subtask_id!r}") - branch_name = branch or get_branch_name() - state = _read_worktree_state(branch_name) - worktrees = state["worktrees"] - record = worktrees.get(slug) if isinstance(worktrees, dict) else None - if not isinstance(record, dict): - return _wt_error( - "NO_WORKTREE", - f"no recorded worktree for subtask {subtask_id!r}; create it first", - ) wt_path = Path(str(record.get("path", ""))) wt_branch = str(record.get("branch", "")) base_sha = str(record.get("base_sha", "")) - if not wt_path.is_dir() or not wt_branch or not base_sha: - return _wt_error( - "WORKTREE_MISSING", - "the recorded worktree is missing on disk; discard and recreate", - ) - - working_head = _wt_head_sha() - if working_head != base_sha: - return _wt_error( - "BASE_DIVERGED", - f"working branch advanced since the worktree was created " - f"(base={base_sha[:8]}, head={(working_head or '?')[:8]}); discard and " - "recreate the worktree off the new HEAD", - base_sha=base_sha, - working_head=working_head, - ) add = _wt_git(["add", "-A"], cwd=wt_path) if add.returncode != 0: @@ -15599,7 +15564,12 @@ def merge_subtask_worktree( staged = _wt_git(["diff", "--cached", "--quiet"], cwd=wt_path) if staged.returncode == 1: commit = _wt_git( - ["commit", "--no-verify", "-m", f"map-wt: {subtask_id} (attempt {record.get('attempt', 0)})"], + [ + "commit", + "--no-verify", + "-m", + f"map-wt: {subtask_id} (attempt {record.get('attempt', 0)})", + ], cwd=wt_path, ) if commit.returncode != 0: @@ -15698,6 +15668,200 @@ def merge_subtask_worktree( ) verification = {"ran": True, "status": "passed", "checks": results} + return { + "status": "success", + "ok": True, + "wt_head": wt_head, + "deleted": deleted, + "no_changes": no_changes, + "verification": verification, + } + + +def _wt_rollback(base_sha: str) -> None: + """Undo an in-progress wave merge: hard-reset to the wave base + clean. + + A ``git merge --squash`` records NO ``MERGE_HEAD``, so ``git merge --abort`` + is unusable (council Q2). ``reset --hard`` + ``clean -fd`` is the only correct + undo. MAP runtime state (.map/.codex/.agents) is EXCLUDED from the clean so a + rollback never destroys the worktree sidecar or step state. + """ + _wt_git(["reset", "--hard", base_sha]) + _wt_git(["clean", "-fd", "-e", ".map", "-e", ".codex", "-e", ".agents"]) + + +def _wt_unmerged_paths() -> list[str]: + """Paths left in a conflicted (unmerged) state after a failed squash merge.""" + r = _wt_git(["diff", "--name-only", "--diff-filter=U"]) + if r.returncode != 0: + return [] + return [ln.strip() for ln in r.stdout.splitlines() if ln.strip()] + + +def _wt_changed_files(base_sha: str, wt_head: str, wt_path: Path) -> list[str]: + """The set of files a worktree actually changed vs the wave base.""" + r = _wt_git(["diff", "--name-only", f"{base_sha}..{wt_head}"], cwd=wt_path) + if r.returncode != 0: + return [] + return [ln.strip() for ln in r.stdout.splitlines() if ln.strip()] + + +def _wt_overlap_pairs(prepared: list[dict[str, Any]]) -> list[dict[str, object]]: + """Telemetry: subtask pairs whose ACTUAL changed-file sets intersect. + + The scheduler's ``split_wave_by_file_conflicts`` only guarantees *declared* + ``affected_files`` are disjoint; an Actor can touch an unlisted file. Git's + textual-conflict abort is the HARD guard — this overlap report is advisory + attribution only (which subtasks "lied" about their boundaries). + """ + out: list[dict[str, object]] = [] + for i in range(len(prepared)): + for j in range(i + 1, len(prepared)): + a = set(prepared[i].get("changed_files") or []) + b = set(prepared[j].get("changed_files") or []) + shared = sorted(a & b) + if shared: + out.append( + { + "subtasks": [ + prepared[i]["subtask_id"], + prepared[j]["subtask_id"], + ], + "files": shared[:50], + } + ) + return out + + +def _wt_attribute_conflict( + conflict_files: list[str], prepared: list[dict[str, Any]] +) -> list[dict[str, object]]: + """Map conflicted paths back to the wave subtasks that touched them.""" + out: list[dict[str, object]] = [] + cset = set(conflict_files) + for item in prepared: + touched = sorted(cset & set(item.get("changed_files") or [])) + if touched: + out.append({"subtask_id": item["subtask_id"], "files": touched}) + return out + + +def _wt_merge_lock_path() -> Optional[Path]: + common = _wt_git_common_dir() + if common is None: + return None + return common / "map-framework" / "wave-merge.lock" + + +def _wt_acquire_merge_lock() -> Optional[Any]: + """Advisory lock so two wave merges never interleave squash commits. + + Returns an open file handle holding the lock, or None if the lock is already + held (the caller maps that to ``MERGE_IN_PROGRESS``). Degrades to a held-open + sentinel handle where ``fcntl`` is unavailable (non-POSIX) — concurrency + protection is best-effort there, but the release path stays uniform. + """ + lock_path = _wt_merge_lock_path() + if lock_path is None: + return None + lock_path.parent.mkdir(parents=True, exist_ok=True) + handle = lock_path.open("w") + try: + import fcntl # noqa: PLC0415 + except ImportError: + return handle # best-effort: no advisory lock available + try: + fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + except OSError: + handle.close() + return None + return handle + + +def _wt_release_merge_lock(handle: Optional[Any]) -> None: + if handle is None: + return + try: + import fcntl # noqa: PLC0415 + + try: + fcntl.flock(handle.fileno(), fcntl.LOCK_UN) + except OSError: + pass + except ImportError: + pass + try: + handle.close() + except OSError: + pass + + +def merge_subtask_worktree( + subtask_id: str, + attempt: int = 0, + branch: Optional[str] = None, + verify_cmds: Optional[list[str]] = None, + skip_verify: bool = False, +) -> dict[str, object]: + """Accept a subtask: commit worktree work, run pre-merge verification IN the + worktree, then squash-merge ONE commit into the working branch (#284). + + Council-mandated guards run BEFORE the merge touches the working branch: + base-divergence, runtime-state-in-diff, bulk-deletion, submodule-pointer, + detached-HEAD, and the pre-merge `verification_checks` gate. Any failure + leaves the working branch untouched and returns a structured ``kind``. + """ + project_dir = _wt_project_dir() + if not _wt_is_git_repo(): + return _wt_error("NOT_A_REPO", "not inside a git work tree") + if _wt_cwd_is_managed_worktree(): + return _wt_error( + "NESTED_WORKTREE", "run merge from the main checkout, not inside a worktree" + ) + active = _wt_active_git_operation() + if active: + return _wt_error("ACTIVE_GIT_OP", f"a {active} is in progress; resolve it first") + slug = _wt_slug(subtask_id) + if slug is None: + return _wt_error("INVALID_SUBTASK_ID", f"unsafe subtask id: {subtask_id!r}") + branch_name = branch or get_branch_name() + state = _read_worktree_state(branch_name) + worktrees = state["worktrees"] + record = worktrees.get(slug) if isinstance(worktrees, dict) else None + if not isinstance(record, dict): + return _wt_error( + "NO_WORKTREE", + f"no recorded worktree for subtask {subtask_id!r}; create it first", + ) + wt_path = Path(str(record.get("path", ""))) + wt_branch = str(record.get("branch", "")) + base_sha = str(record.get("base_sha", "")) + if not wt_path.is_dir() or not wt_branch or not base_sha: + return _wt_error( + "WORKTREE_MISSING", + "the recorded worktree is missing on disk; discard and recreate", + ) + + working_head = _wt_head_sha() + if working_head != base_sha: + return _wt_error( + "BASE_DIVERGED", + f"working branch advanced since the worktree was created " + f"(base={base_sha[:8]}, head={(working_head or '?')[:8]}); discard and " + "recreate the worktree off the new HEAD", + base_sha=base_sha, + working_head=working_head, + ) + + prep = _wt_freeze_and_verify( + subtask_id, record, project_dir, branch_name, verify_cmds, skip_verify + ) + if prep.get("status") == "error": + return prep + deleted = cast(list, prep["deleted"]) + no_changes = bool(prep["no_changes"]) + verification = cast(dict, prep["verification"]) + merged_sha = working_head if not no_changes: merge = _wt_git(["merge", "--squash", wt_branch]) @@ -15824,6 +15988,319 @@ def discard_subtask_worktree( } +def merge_wave_worktrees( + subtask_ids: list[str], + branch: Optional[str] = None, + verify_cmds: Optional[list[str]] = None, + skip_verify: bool = False, + post_wave_cmds: Optional[list[str]] = None, + skip_post_wave: bool = False, +) -> dict[str, object]: + """Accept a whole parallel wave atomically (#284 Phase 2, wave/DAG). + + Every subtask in a wave ran in its own worktree cut off the SAME base (HEAD + at wave start). Merging them one-by-one via ``merge_subtask_worktree`` is + impossible: the first merge advances HEAD, so the second trips its + ``BASE_DIVERGED`` guard. This coordinator relaxes ONLY that guard to a + wave-scoped form — it refuses EXTERNAL HEAD movement but ALLOWS the sibling + divergence each in-wave squash-merge creates. + + All-or-nothing (council Q2): any conflict, commit, or post-wave-gate failure + rolls the working branch back to the wave base via ``reset --hard`` + + ``clean -fd`` (squash merges leave no ``MERGE_HEAD`` so ``git merge --abort`` + is NOT used) and leaves EVERY worktree intact for retry. Council-reviewed + (conv ``c29d6fa9``): dedicated coordinator over a flag on the single path; + ``wave_base_sha`` derived from the sidecar; merge by frozen SHA; per-worktree + pre-merge verify + ONE post-wave full gate inside the atomic transaction. + """ + project_dir = _wt_project_dir() + if not _wt_is_git_repo(): + return _wt_error("NOT_A_REPO", "not inside a git work tree") + if _wt_cwd_is_managed_worktree(): + return _wt_error( + "NESTED_WORKTREE", "run wave merge from the main checkout, not inside a worktree" + ) + active = _wt_active_git_operation() + if active: + return _wt_error("ACTIVE_GIT_OP", f"a {active} is in progress; resolve it first") + + ids = sorted({str(s) for s in subtask_ids if str(s).strip()}) + if not ids: + return _wt_error("NO_SUBTASKS", "no subtask ids supplied for the wave merge") + + branch_name = branch or get_branch_name() + state = _read_worktree_state(branch_name) + worktrees = state["worktrees"] + if not isinstance(worktrees, dict): + return _wt_error("NO_WORKTREE", "no worktree state recorded for this branch") + + # Resolve every subtask's record; validate slug + on-disk presence. + records: list[tuple[str, str, dict]] = [] # (subtask_id, slug, record) + base_shas: set[str] = set() + for sid in ids: + slug = _wt_slug(sid) + if slug is None: + return _wt_error("INVALID_SUBTASK_ID", f"unsafe subtask id: {sid!r}") + record = worktrees.get(slug) + if not isinstance(record, dict): + return _wt_error( + "NO_WORKTREE", + f"no recorded worktree for subtask {sid!r}; create it first", + subtask_id=sid, + ) + wt_path = Path(str(record.get("path", ""))) + if not wt_path.is_dir() or not record.get("branch") or not record.get("base_sha"): + return _wt_error( + "WORKTREE_MISSING", + f"the recorded worktree for {sid!r} is missing on disk; discard and recreate", + subtask_id=sid, + ) + base_shas.add(str(record.get("base_sha"))) + records.append((sid, slug, record)) + + # A coherent wave's worktrees all share one base (cut off the same HEAD). + if len(base_shas) != 1: + return _wt_error( + "WAVE_BASE_MISMATCH", + "worktrees in the wave were created off different bases; recreate them " + "off a single HEAD before a wave merge", + bases=sorted(b[:8] for b in base_shas), + ) + wave_base_sha = next(iter(base_shas)) + + # External-movement guard: the working branch must still sit at the wave base. + # Sibling divergence WITHIN the wave is expected and allowed; commits made + # outside the wave are not (they invalidate every worktree's pre-merge state). + working_head = _wt_head_sha() + if working_head != wave_base_sha: + return _wt_error( + "EXTERNAL_HEAD_MOVED", + f"working branch advanced outside the wave (base={wave_base_sha[:8]}, " + f"head={(working_head or '?')[:8]}); recreate the wave worktrees off the " + "new HEAD", + base_sha=wave_base_sha, + working_head=working_head, + ) + + # The target must be an attached, clean branch before we touch it — rollback + # semantics depend on it. MAP runtime state is excluded from the dirty check. + cur = _wt_git(["rev-parse", "--abbrev-ref", "HEAD"], timeout=10) + if cur.returncode != 0 or cur.stdout.strip() == "HEAD": + return _wt_error( + "DETACHED_HEAD", + "refusing to wave-merge onto a detached HEAD; check out the working branch", + ) + status = _wt_git(["status", "--porcelain"]) + dirty = [ + ln + for ln in status.stdout.splitlines() + if ln.strip() and not _wt_is_runtime_state_path(_wt_porcelain_path(ln)) + ] + if dirty: + return _wt_error( + "DIRTY_TARGET", + "the working tree has uncommitted changes; commit/stash before a wave merge", + dirty=dirty[:20], + ) + + # Serialize coordinators so two waves never interleave squash commits. + lock_handle = _wt_acquire_merge_lock() + if lock_handle is None: + return _wt_error( + "MERGE_IN_PROGRESS", + "another wave merge is in progress on this repository; retry when it completes", + ) + + try: + # PHASE 1 — preflight every worktree (commit + guards + pre-merge verify) + # WITHOUT touching the working branch. A failure here aborts BEFORE any + # merge, so the working branch is trivially untouched. + prepared: list[dict[str, Any]] = [] + for sid, slug, record in records: + prep = _wt_freeze_and_verify( + sid, record, project_dir, branch_name, verify_cmds, skip_verify + ) + if prep.get("status") == "error": + prep.setdefault("subtask_id", sid) + prep["phase"] = "preflight" + return prep + changed_files = _wt_changed_files( + str(record.get("base_sha")), + str(prep["wt_head"]), + Path(str(record.get("path", ""))), + ) + prepared.append( + { + "subtask_id": sid, + "slug": slug, + "record": record, + "wt_head": str(prep["wt_head"]), + "no_changes": bool(prep["no_changes"]), + "deleted": prep["deleted"], + "changed_files": changed_files, + } + ) + + # Declared-disjoint is only a scheduler hint; report ACTUAL overlap for + # attribution. Git's textual-conflict abort below is the HARD guard. + overlaps = _wt_overlap_pairs(prepared) + + # PHASE 2 — sequential squash-merge by FROZEN SHA onto the advancing HEAD. + merged: list[dict[str, Any]] = [] + for item in prepared: + sid = str(item["subtask_id"]) + if item["no_changes"]: + continue + wt_head = str(item["wt_head"]) + merge = _wt_git(["merge", "--squash", wt_head]) + if merge.returncode != 0: + conflict_files = _wt_unmerged_paths() + attribution = _wt_attribute_conflict(conflict_files, prepared) + _wt_rollback(wave_base_sha) + _wt_set_manifest( + branch_name, + "wave_failed", + { + "subtask_id": sid, + "reason": "merge_conflict", + "conflict_files": conflict_files[:50], + }, + ) + return _wt_error( + "WAVE_MERGE_CONFLICT", + f"squash-merge of {sid} hit a conflict; rolled the wave back to " + f"base {wave_base_sha[:8]} (NO subtask merged). The conflicting " + "files were touched by more than one subtask — fix affected_files " + "or re-decompose.", + subtask_id=sid, + conflict_files=conflict_files[:50], + attribution=attribution, + stderr_tail=_clip_probe_output(merge.stderr)[-2000:], + ) + commit = _wt_git( + ["commit", "--no-verify", "-m", f"{sid}: merge isolated worktree (wave)"] + ) + combined = (commit.stdout + commit.stderr).lower() + if commit.returncode != 0 and "nothing to commit" not in combined: + _wt_rollback(wave_base_sha) + _wt_set_manifest( + branch_name, "wave_failed", {"subtask_id": sid, "reason": "commit_failed"} + ) + return _wt_error( + "WAVE_COMMIT_FAILED", + commit.stderr.strip() or f"git commit failed after squash for {sid}", + subtask_id=sid, + ) + merged.append( + { + "subtask_id": sid, + "merged_sha": _wt_head_sha(), + "deletions": len(item["deleted"]) if isinstance(item["deleted"], list) else 0, + } + ) + + # PHASE 3 — ONE post-wave full gate on the merged tree, INSIDE the atomic + # transaction (council Q3): a semantic break two subtasks create together + # (A renames a symbol B references) is caught here, not by git's textual + # merge. Failure rolls the WHOLE wave back. + post_checks = ( + list(post_wave_cmds) + if post_wave_cmds is not None + else _wt_config_verification_checks(project_dir) + ) + post_wave: dict[str, object] = {"ran": False, "status": "skipped", "checks": []} + if not skip_post_wave and post_checks and merged: + results: list[dict[str, object]] = [] + top = _wt_toplevel() or Path(".") + for cmd in post_checks: + argv = shlex.split(cmd) + if not argv: + continue + try: + cp = subprocess.run( + argv, + cwd=str(top), + capture_output=True, + text=True, + timeout=WORKTREE_VERIFY_TIMEOUT, + ) + except subprocess.TimeoutExpired: + _wt_rollback(wave_base_sha) + _wt_set_manifest( + branch_name, + "wave_failed", + {"reason": "post_wave_timeout", "command": cmd}, + ) + return _wt_error( + "WAVE_VERIFY_TIMEOUT", + f"post-wave verification timed out: {cmd}; rolled back to base", + command=cmd, + ) + except (OSError, subprocess.SubprocessError) as exc: + _wt_rollback(wave_base_sha) + return _wt_error( + "WAVE_VERIFY_ERROR", + f"post-wave verification failed to run: {cmd}: {exc}", + command=cmd, + ) + results.append({"command": cmd, "returncode": cp.returncode}) + if cp.returncode != 0: + _wt_rollback(wave_base_sha) + _wt_set_manifest( + branch_name, + "wave_failed", + {"reason": "post_wave_failed", "command": cmd, "returncode": cp.returncode}, + ) + return _wt_error( + "WAVE_VERIFY_FAILED", + f"post-wave gate failed: {cmd} (exit {cp.returncode}); rolled the " + f"wave back to base {wave_base_sha[:8]} (NO subtask merged)", + command=cmd, + returncode=cp.returncode, + stderr_tail=_clip_probe_output(cp.stderr)[-2000:], + ) + post_wave = {"ran": True, "status": "passed", "checks": results} + + # PHASE 4 — accept: remove every worktree+branch, drop from the sidecar. + state = _read_worktree_state(branch_name) + worktrees = state["worktrees"] + for item in prepared: + rec = cast(dict, item["record"]) + _wt_force_remove(Path(str(rec.get("path", ""))), str(rec.get("branch", ""))) + if isinstance(worktrees, dict): + worktrees.pop(str(item["slug"]), None) + _write_worktree_state(branch_name, state) + final_head = _wt_head_sha() + no_change_ids = [str(p["subtask_id"]) for p in prepared if p["no_changes"]] + merged_ids = [str(m["subtask_id"]) for m in merged] + _wt_set_manifest( + branch_name, + "wave_merged", + { + "subtasks": merged_ids, + "merged_count": len(merged), + "no_change_count": len(no_change_ids), + "final_sha": final_head, + "post_wave": post_wave.get("status"), + }, + ) + + return { + "status": "success", + "ok": True, + "wave_base_sha": wave_base_sha, + "final_sha": final_head, + "merged": merged_ids, + "merged_count": len(merged), + "no_changes": no_change_ids, + "post_wave": post_wave, + "overlaps": overlaps, + "note": "all wave subtasks squash-merged atomically; worktrees cleaned up", + } + finally: + _wt_release_merge_lock(lock_handle) + + def worktree_isolation_status(branch: Optional[str] = None) -> dict[str, object]: """Report whether isolation is enabled + reconcile recorded vs live worktrees.""" project_dir = _wt_project_dir() @@ -17300,6 +17777,36 @@ def _flag_val(name: str) -> Optional[str]: if _wt_r.get("status") == "error": sys.exit(1) + elif func_name == "merge_wave_worktrees": + # CLI: merge_wave_worktrees [ ...] [--branch B] + # [--verify-cmd CMD ...] [--skip-verify] + # [--post-wave-cmd CMD ...] [--skip-post-wave] + # Accept a whole parallel wave atomically: per-worktree pre-merge verify, + # sequential squash-merge by frozen SHA onto the advancing HEAD, ONE + # post-wave gate inside the transaction. Any failure rolls the wave back + # to base and exits 1; worktrees are left intact for retry. + import argparse as _ap + + _p = _ap.ArgumentParser(prog="map_step_runner.py merge_wave_worktrees") + _p.add_argument("subtask_ids", nargs="+") + _p.add_argument("--branch", default=None) + _p.add_argument("--verify-cmd", action="append", default=None) + _p.add_argument("--skip-verify", action="store_true") + _p.add_argument("--post-wave-cmd", action="append", default=None) + _p.add_argument("--skip-post-wave", action="store_true") + _a = _p.parse_args(sys.argv[2:]) + _wt_r = merge_wave_worktrees( + _a.subtask_ids, + _a.branch, + _a.verify_cmd, + _a.skip_verify, + _a.post_wave_cmd, + _a.skip_post_wave, + ) + print(json.dumps(_wt_r, indent=2)) + if _wt_r.get("status") == "error": + sys.exit(1) + elif func_name == "discard_subtask_worktree": # CLI: discard_subtask_worktree [--attempt N] [--branch B] # [--save-patch] diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ba0aaab..2d47acb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- **Parallel-wave merge coordinator for worktree isolation (`merge_wave_worktrees`, part of #284 Phase 2).** Wires the existing wave/DAG scheduler to per-subtask worktree isolation so a parallel wave's independent subtasks each run in their own worktree and are accepted **atomically**. Every worktree of a wave is cut off the same base (HEAD at wave start), so they cannot be merged one at a time — the first `merge_subtask_worktree` advances HEAD and the next trips `BASE_DIVERGED`. The new coordinator relaxes *only* that guard to a wave-scoped form: it refuses **external** HEAD movement (`EXTERNAL_HEAD_MOVED`) but allows the sibling divergence each in-wave squash-merge creates. It derives `wave_base_sha` from the sidecar (never a caller parameter), preflights every worktree (commit + per-worktree guards + pre-merge verify) BEFORE touching the working branch, then squash-merges each accepted worktree **by frozen SHA in sorted id order** (one runner commit per subtask — the one-commit-per-subtask contract holds), then runs **one post-wave full gate on the merged tree inside the same transaction**. It is **all-or-nothing** (council-reviewed, conv `c29d6fa9`): any textual conflict, commit failure, or post-wave-gate failure rolls the whole working branch back to the wave base via `git reset --hard` + `git clean -fd` (squash leaves no `MERGE_HEAD`, so `git merge --abort` is never used; MAP runtime state is excluded from the clean) and leaves **every** worktree intact for retry — no partial-wave state ever survives. Safety extras: an advisory `flock` serializes coordinators (`MERGE_IN_PROGRESS`); attached-/clean-target preconditions; conflicted paths are attributed back to the subtasks that touched them (declared-disjoint `affected_files` is only a scheduler hint, so actual changed-file overlap is reported as advisory telemetry while git's textual conflict stays the hard guard). The shared `_wt_freeze_and_verify` primitive (commit + guards + pre-merge verify) is extracted once and reused by both the single-subtask and wave merge paths. CLI: `merge_wave_worktrees [--branch B] [--verify-cmd CMD…] [--skip-verify] [--post-wave-cmd CMD…] [--skip-post-wave]`. Phase 3 (context-budget hooks) remains open on #284. - **Per-subtask git worktree isolation for `/map-efficient` (`worktree.isolation`, part of #284).** Opt-in, OFF by default. When enabled, each subtask's Actor runs in a dedicated, throwaway git worktree and its result is squash-merged back into the working branch ONLY after the configured `verification_checks` pass IN the worktree (a **pre-merge** gate, strictly stronger than today's post-commit check) — a rejected attempt (Monitor `valid=false` / Evaluator fail) is discarded so the working branch is never touched by a bad attempt. The Python step runner owns the whole lifecycle and every safety guard (producer-owns-parse): `create_subtask_worktree` (crash-safe remove-and-recreate; guards: not-a-repo, protected-ref, nested-worktree refusal, active-git-op, `subtask_id` ref/path sanitization, dirty-main refusal, submodule init), `merge_subtask_worktree` (guards run BEFORE the working branch is touched: base-divergence `git merge-base` check, runtime-state-in-diff, configurable bulk-deletion threshold `worktree.max_deletions`, submodule-pointer change, detached-HEAD, then the pre-merge verify gate; accept = `git merge --squash` + one runner-authored commit, never `--no-ff`, preserving one-commit-per-subtask), `discard_subtask_worktree` (atomic reject, idempotent, optional `--save-patch` forensics), and `worktree_isolation_status` (reconciles recorded vs live worktrees). Worktrees are stored OUT of the working tree under the repo's git common dir (`/map-framework/worktrees/`), so `git clean -fdx`, recursive scanners, and accidental commits can never touch them; MAP runtime state (`.map//...`) always resolves against the main checkout — state-mutating commands refuse if invoked from inside a managed worktree (the silent state-desync footgun). Every guard returns a structured `{kind, message}` the skill branches on. Config keys `worktree.{isolation,max_deletions}`; new `worktree` manifest stage; `.map//worktrees.json` sidecar. Design was llm-council-reviewed (runner-owned worktrees over harness-native `isolation="worktree"`; squash-merge over `--no-ff`; always-discard on reject; pre-merge verification + crash-safe retry + atomic reject folded in so the slice is not a no-op; explicit state-root separation). Phase 2 (wave/DAG parallelism) and Phase 3 (context-budget hooks) remain open on #284. - **Cross-AI peer review for `/map-review` (`--cross-ai `, part of #288).** `/map-review --cross-ai codex|gemini|claude|opencode` dispatches the review to an INDEPENDENT external AI CLI for a true second opinion (different model/vendor, fresh context with no shared session). The dispatch, parsing, normalization, and untrusted-wrapping all live in the Python step runner (`run_cross_ai_review` / `dispatch_cross_ai_review`, producer-owns-parse) — the skill only handles consent and presentation. Egress is **double-consent**: the per-run `--cross-ai` flag AND `review.cross_ai.enabled: true` in `.map/config.yaml` (off by default) are both required, because the diff/code leaves the machine. Mandatory guardrails: a **high-confidence outbound secret scan** (private keys, AWS/GitHub/Google/Slack credentials) BLOCKS dispatch before the subprocess and surfaces only the pattern name, never the value; the external CLI is invoked `shell=False` with a literal-argv adapter and a configurable timeout; the returned findings ALWAYS enter context behind an `EXTERNAL UNTRUSTED REFERENCE` fence (link/injection scan, applied deterministically in Python so the model cannot skip it) and are advisory-only (`source: cross_ai`, never auto-applied); same-vendor runtimes (`claude`) are honestly labeled `independent_vendor: false`. Any dispatch failure (disabled, CLI missing, not authenticated, timeout, non-JSON output, secret-blocked) degrades non-blockingly and falls back to the in-session review. Config keys `review.cross_ai.{enabled,runtime,timeout_seconds}`. Design was llm-council-reviewed (Python-owned dispatch; single-runtime slice with `--cross-ai all` consensus deferred to a follow-up slice). diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index d3f1e473..e653f016 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -147,8 +147,47 @@ Design decisions (llm-council-reviewed, conv `461b92f9`): State lives in `.map//worktrees.json` (sidecar) plus a `worktree` manifest stage; every guard returns a structured `{kind, message}` the skill -branches on. Phase 2 (wave/DAG parallelism) and Phase 3 (context-budget hooks) -remain open on #284. +branches on. + +### Phase 2: parallel-wave merge coordinator + +`merge_wave_worktrees` accepts a whole **parallel wave** atomically. Every +subtask of a wave runs in its own worktree cut off the same base (HEAD at wave +start), so they cannot be merged one at a time — the first +`merge_subtask_worktree` advances HEAD and the next trips `BASE_DIVERGED`. The +coordinator relaxes *only* that guard to a wave-scoped form: it refuses +**external** HEAD movement (`EXTERNAL_HEAD_MOVED`) but allows the sibling +divergence each in-wave squash-merge creates. Design (llm-council-reviewed, conv +`c29d6fa9`): + +- **Dedicated coordinator, not a flag.** Kept separate from the single-subtask + `merge_subtask_worktree` (zero blast radius on the shipped path); the two share + the extracted `_wt_freeze_and_verify` primitive (commit + per-worktree guards + + pre-merge verify) but stay separate compositions. `wave_base_sha` is derived + from the sidecar, never a caller parameter. +- **Merge by frozen SHA, deterministic order.** Subtask ids are sorted; each + accepted worktree is squash-merged by its frozen head SHA (`git merge --squash`, + one runner commit per subtask — the one-commit-per-subtask contract holds). +- **All-or-nothing.** Any textual conflict, commit failure, **or post-wave-gate + failure** rolls the whole working branch back to the wave base with `reset + --hard` + `clean -fd` (squash leaves no `MERGE_HEAD`, so `git merge --abort` is + never used; MAP runtime state is excluded from the clean) and leaves **every** + worktree intact for retry. No partial-wave state ever survives. +- **One post-wave full gate inside the transaction.** Per-worktree pre-merge + verify is a local sanity gate; the post-wave `verification_checks` run on the + fully merged tree are the true correctness gate (they catch a semantic break + two subtasks create together — e.g. A renames a symbol B references — that no + textual merge can see). It runs inside the atomic transaction, so a red gate + rolls the wave back. +- **Safety extras:** an advisory `flock` serializes coordinators + (`MERGE_IN_PROGRESS`); attached-/clean-target preconditions; conflicted paths + are attributed back to the subtasks that touched them (declared-disjoint + `affected_files` is only a scheduler hint, so actual changed-file overlap is + reported as advisory telemetry while git's textual conflict stays the hard + guard). + +Phase 3 (context-budget hooks: statusline, threshold warnings, heartbeat) +remains open on #284. ## Stack Overflow for Agents (SOFA) Integration diff --git a/docs/USAGE.md b/docs/USAGE.md index 1443fb1d..beb09d2b 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -238,10 +238,29 @@ python3 .map/scripts/map_step_runner.py create_subtask_worktree ST-001 python3 .map/scripts/map_step_runner.py merge_subtask_worktree ST-001 # Reject (Monitor/Evaluator fail) — discard, retry starts from a clean HEAD: python3 .map/scripts/map_step_runner.py discard_subtask_worktree ST-001 --save-patch +# Accept a whole PARALLEL wave atomically (≥2 disjoint subtasks; see below): +python3 .map/scripts/map_step_runner.py merge_wave_worktrees ST-001 ST-002 ST-003 # Inspect: python3 .map/scripts/map_step_runner.py worktree_isolation_status ``` +### Parallel waves (Phase 2) + +When a wave has ≥2 independent, disjoint-file subtasks and isolation is on, each +subtask runs in its own worktree and the wave is accepted **atomically** with +`merge_wave_worktrees ST-A ST-B …`. They cannot be merged one at a time: every +worktree was cut off the same base, so the first single merge advances HEAD and +the next trips `BASE_DIVERGED`. The coordinator squash-merges every accepted +worktree by frozen SHA in sorted id order (one runner commit per subtask), then +runs the **post-wave gate inside the same transaction**. It is **all-or-nothing** +— any textual conflict, commit failure, or post-wave-gate failure rolls the whole +wave back to the base (`reset --hard` + `clean -fd`; squash leaves no +`MERGE_HEAD`, so `git merge --abort` is never used) and leaves every worktree +intact for retry. Failure `kind`s: `WAVE_MERGE_CONFLICT` (with `attribution` +naming the subtasks that touched each conflicted file), `WAVE_VERIFY_FAILED`, +`EXTERNAL_HEAD_MOVED`, `WAVE_BASE_MISMATCH`, `DIRTY_TARGET`, `MERGE_IN_PROGRESS`. +A concurrent second coordinator is blocked by an advisory lock. + ### Safety model - **Runner-owned, not harness-native.** The runner creates explicit worktrees; @@ -259,8 +278,8 @@ python3 .map/scripts/map_step_runner.py worktree_isolation_status structured `{kind, message}` the skill branches on (e.g. `VERIFY_FAILED`, `BULK_DELETION`, `BASE_DIVERGED`). -Phase 2 (wave/DAG parallelism across independent subtasks) and Phase 3 -(context-budget hooks) remain open on #284. +Phase 2's wave-merge coordinator (`merge_wave_worktrees`) has landed; Phase 3 +(context-budget hooks) remains open on #284. ## Stack Overflow for Agents (SOFA) diff --git a/src/mapify_cli/minimality_report.py b/src/mapify_cli/minimality_report.py index 25190030..22b0f78d 100644 --- a/src/mapify_cli/minimality_report.py +++ b/src/mapify_cli/minimality_report.py @@ -344,7 +344,7 @@ def _summarize( def build_minimality_rollout_report( project_path: Path, min_complete_runs: int = 3 -) -> dict[str, object]: +) -> dict[str, Any]: """Build a local telemetry report for the minimality Phase 3 gate. The report is read-only. It treats run-health reports that carry their own diff --git a/src/mapify_cli/templates/codex/skills/map-efficient/efficient-reference.md b/src/mapify_cli/templates/codex/skills/map-efficient/efficient-reference.md index 0c9198ab..f8252ce1 100644 --- a/src/mapify_cli/templates/codex/skills/map-efficient/efficient-reference.md +++ b/src/mapify_cli/templates/codex/skills/map-efficient/efficient-reference.md @@ -94,6 +94,20 @@ python3 .map/scripts/map_orchestrator.py advance_wave Do not mix wave APIs with the sequential `get_next_step` cursor for the same wave unless the orchestrator response explicitly tells you to fall back. +When `worktree.isolation` is enabled and a wave runs in parallel (≥2 disjoint +subtasks), give each subtask its own worktree and accept the whole wave +atomically after all pass Monitor — never merge them one at a time (the first +merge advances HEAD and the next trips `BASE_DIVERGED`): + +```bash +python3 .map/scripts/map_step_runner.py merge_wave_worktrees "$ST_A" "$ST_B" +``` + +It runs the post-wave gate inside the transaction and rolls the whole wave back +to base on any conflict or gate failure (worktrees kept for retry). On a single +subtask's Monitor failure, `discard_subtask_worktree` that subtask and retry it +before calling `merge_wave_worktrees`. + ## TDD Mode `--tdd` inserts `TEST_WRITER` and `TEST_FAIL_GATE` before `ACTOR`. diff --git a/src/mapify_cli/templates/map/scripts/map_step_runner.py b/src/mapify_cli/templates/map/scripts/map_step_runner.py index 8a37b1de..c1ca6541 100755 --- a/src/mapify_cli/templates/map/scripts/map_step_runner.py +++ b/src/mapify_cli/templates/map/scripts/map_step_runner.py @@ -15534,62 +15534,27 @@ def create_subtask_worktree( } -def merge_subtask_worktree( +def _wt_freeze_and_verify( subtask_id: str, - attempt: int = 0, - branch: Optional[str] = None, + record: dict, + project_dir: Path, + branch_name: str, verify_cmds: Optional[list[str]] = None, skip_verify: bool = False, ) -> dict[str, object]: - """Accept a subtask: commit worktree work, run pre-merge verification IN the - worktree, then squash-merge ONE commit into the working branch (#284). - - Council-mandated guards run BEFORE the merge touches the working branch: - base-divergence, runtime-state-in-diff, bulk-deletion, submodule-pointer, - detached-HEAD, and the pre-merge `verification_checks` gate. Any failure - leaves the working branch untouched and returns a structured ``kind``. + """Commit a worktree's work + run per-worktree guards + pre-merge verify. + + Operates ONLY inside the worktree — never touches the working branch. Shared + by ``merge_subtask_worktree`` (single) and ``merge_wave_worktrees`` (wave) so + the guard/verify logic has exactly one definition (council Q4: share + lower-level primitives, keep the two coordinators as separate compositions). + On success returns ``{"ok": True, "wt_head", "deleted", "no_changes", + "verification"}``; on any guard/verify failure returns a structured + ``_wt_error`` (``status=="error"``). """ - project_dir = _wt_project_dir() - if not _wt_is_git_repo(): - return _wt_error("NOT_A_REPO", "not inside a git work tree") - if _wt_cwd_is_managed_worktree(): - return _wt_error( - "NESTED_WORKTREE", "run merge from the main checkout, not inside a worktree" - ) - active = _wt_active_git_operation() - if active: - return _wt_error("ACTIVE_GIT_OP", f"a {active} is in progress; resolve it first") - slug = _wt_slug(subtask_id) - if slug is None: - return _wt_error("INVALID_SUBTASK_ID", f"unsafe subtask id: {subtask_id!r}") - branch_name = branch or get_branch_name() - state = _read_worktree_state(branch_name) - worktrees = state["worktrees"] - record = worktrees.get(slug) if isinstance(worktrees, dict) else None - if not isinstance(record, dict): - return _wt_error( - "NO_WORKTREE", - f"no recorded worktree for subtask {subtask_id!r}; create it first", - ) wt_path = Path(str(record.get("path", ""))) wt_branch = str(record.get("branch", "")) base_sha = str(record.get("base_sha", "")) - if not wt_path.is_dir() or not wt_branch or not base_sha: - return _wt_error( - "WORKTREE_MISSING", - "the recorded worktree is missing on disk; discard and recreate", - ) - - working_head = _wt_head_sha() - if working_head != base_sha: - return _wt_error( - "BASE_DIVERGED", - f"working branch advanced since the worktree was created " - f"(base={base_sha[:8]}, head={(working_head or '?')[:8]}); discard and " - "recreate the worktree off the new HEAD", - base_sha=base_sha, - working_head=working_head, - ) add = _wt_git(["add", "-A"], cwd=wt_path) if add.returncode != 0: @@ -15599,7 +15564,12 @@ def merge_subtask_worktree( staged = _wt_git(["diff", "--cached", "--quiet"], cwd=wt_path) if staged.returncode == 1: commit = _wt_git( - ["commit", "--no-verify", "-m", f"map-wt: {subtask_id} (attempt {record.get('attempt', 0)})"], + [ + "commit", + "--no-verify", + "-m", + f"map-wt: {subtask_id} (attempt {record.get('attempt', 0)})", + ], cwd=wt_path, ) if commit.returncode != 0: @@ -15698,6 +15668,200 @@ def merge_subtask_worktree( ) verification = {"ran": True, "status": "passed", "checks": results} + return { + "status": "success", + "ok": True, + "wt_head": wt_head, + "deleted": deleted, + "no_changes": no_changes, + "verification": verification, + } + + +def _wt_rollback(base_sha: str) -> None: + """Undo an in-progress wave merge: hard-reset to the wave base + clean. + + A ``git merge --squash`` records NO ``MERGE_HEAD``, so ``git merge --abort`` + is unusable (council Q2). ``reset --hard`` + ``clean -fd`` is the only correct + undo. MAP runtime state (.map/.codex/.agents) is EXCLUDED from the clean so a + rollback never destroys the worktree sidecar or step state. + """ + _wt_git(["reset", "--hard", base_sha]) + _wt_git(["clean", "-fd", "-e", ".map", "-e", ".codex", "-e", ".agents"]) + + +def _wt_unmerged_paths() -> list[str]: + """Paths left in a conflicted (unmerged) state after a failed squash merge.""" + r = _wt_git(["diff", "--name-only", "--diff-filter=U"]) + if r.returncode != 0: + return [] + return [ln.strip() for ln in r.stdout.splitlines() if ln.strip()] + + +def _wt_changed_files(base_sha: str, wt_head: str, wt_path: Path) -> list[str]: + """The set of files a worktree actually changed vs the wave base.""" + r = _wt_git(["diff", "--name-only", f"{base_sha}..{wt_head}"], cwd=wt_path) + if r.returncode != 0: + return [] + return [ln.strip() for ln in r.stdout.splitlines() if ln.strip()] + + +def _wt_overlap_pairs(prepared: list[dict[str, Any]]) -> list[dict[str, object]]: + """Telemetry: subtask pairs whose ACTUAL changed-file sets intersect. + + The scheduler's ``split_wave_by_file_conflicts`` only guarantees *declared* + ``affected_files`` are disjoint; an Actor can touch an unlisted file. Git's + textual-conflict abort is the HARD guard — this overlap report is advisory + attribution only (which subtasks "lied" about their boundaries). + """ + out: list[dict[str, object]] = [] + for i in range(len(prepared)): + for j in range(i + 1, len(prepared)): + a = set(prepared[i].get("changed_files") or []) + b = set(prepared[j].get("changed_files") or []) + shared = sorted(a & b) + if shared: + out.append( + { + "subtasks": [ + prepared[i]["subtask_id"], + prepared[j]["subtask_id"], + ], + "files": shared[:50], + } + ) + return out + + +def _wt_attribute_conflict( + conflict_files: list[str], prepared: list[dict[str, Any]] +) -> list[dict[str, object]]: + """Map conflicted paths back to the wave subtasks that touched them.""" + out: list[dict[str, object]] = [] + cset = set(conflict_files) + for item in prepared: + touched = sorted(cset & set(item.get("changed_files") or [])) + if touched: + out.append({"subtask_id": item["subtask_id"], "files": touched}) + return out + + +def _wt_merge_lock_path() -> Optional[Path]: + common = _wt_git_common_dir() + if common is None: + return None + return common / "map-framework" / "wave-merge.lock" + + +def _wt_acquire_merge_lock() -> Optional[Any]: + """Advisory lock so two wave merges never interleave squash commits. + + Returns an open file handle holding the lock, or None if the lock is already + held (the caller maps that to ``MERGE_IN_PROGRESS``). Degrades to a held-open + sentinel handle where ``fcntl`` is unavailable (non-POSIX) — concurrency + protection is best-effort there, but the release path stays uniform. + """ + lock_path = _wt_merge_lock_path() + if lock_path is None: + return None + lock_path.parent.mkdir(parents=True, exist_ok=True) + handle = lock_path.open("w") + try: + import fcntl # noqa: PLC0415 + except ImportError: + return handle # best-effort: no advisory lock available + try: + fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + except OSError: + handle.close() + return None + return handle + + +def _wt_release_merge_lock(handle: Optional[Any]) -> None: + if handle is None: + return + try: + import fcntl # noqa: PLC0415 + + try: + fcntl.flock(handle.fileno(), fcntl.LOCK_UN) + except OSError: + pass + except ImportError: + pass + try: + handle.close() + except OSError: + pass + + +def merge_subtask_worktree( + subtask_id: str, + attempt: int = 0, + branch: Optional[str] = None, + verify_cmds: Optional[list[str]] = None, + skip_verify: bool = False, +) -> dict[str, object]: + """Accept a subtask: commit worktree work, run pre-merge verification IN the + worktree, then squash-merge ONE commit into the working branch (#284). + + Council-mandated guards run BEFORE the merge touches the working branch: + base-divergence, runtime-state-in-diff, bulk-deletion, submodule-pointer, + detached-HEAD, and the pre-merge `verification_checks` gate. Any failure + leaves the working branch untouched and returns a structured ``kind``. + """ + project_dir = _wt_project_dir() + if not _wt_is_git_repo(): + return _wt_error("NOT_A_REPO", "not inside a git work tree") + if _wt_cwd_is_managed_worktree(): + return _wt_error( + "NESTED_WORKTREE", "run merge from the main checkout, not inside a worktree" + ) + active = _wt_active_git_operation() + if active: + return _wt_error("ACTIVE_GIT_OP", f"a {active} is in progress; resolve it first") + slug = _wt_slug(subtask_id) + if slug is None: + return _wt_error("INVALID_SUBTASK_ID", f"unsafe subtask id: {subtask_id!r}") + branch_name = branch or get_branch_name() + state = _read_worktree_state(branch_name) + worktrees = state["worktrees"] + record = worktrees.get(slug) if isinstance(worktrees, dict) else None + if not isinstance(record, dict): + return _wt_error( + "NO_WORKTREE", + f"no recorded worktree for subtask {subtask_id!r}; create it first", + ) + wt_path = Path(str(record.get("path", ""))) + wt_branch = str(record.get("branch", "")) + base_sha = str(record.get("base_sha", "")) + if not wt_path.is_dir() or not wt_branch or not base_sha: + return _wt_error( + "WORKTREE_MISSING", + "the recorded worktree is missing on disk; discard and recreate", + ) + + working_head = _wt_head_sha() + if working_head != base_sha: + return _wt_error( + "BASE_DIVERGED", + f"working branch advanced since the worktree was created " + f"(base={base_sha[:8]}, head={(working_head or '?')[:8]}); discard and " + "recreate the worktree off the new HEAD", + base_sha=base_sha, + working_head=working_head, + ) + + prep = _wt_freeze_and_verify( + subtask_id, record, project_dir, branch_name, verify_cmds, skip_verify + ) + if prep.get("status") == "error": + return prep + deleted = cast(list, prep["deleted"]) + no_changes = bool(prep["no_changes"]) + verification = cast(dict, prep["verification"]) + merged_sha = working_head if not no_changes: merge = _wt_git(["merge", "--squash", wt_branch]) @@ -15824,6 +15988,319 @@ def discard_subtask_worktree( } +def merge_wave_worktrees( + subtask_ids: list[str], + branch: Optional[str] = None, + verify_cmds: Optional[list[str]] = None, + skip_verify: bool = False, + post_wave_cmds: Optional[list[str]] = None, + skip_post_wave: bool = False, +) -> dict[str, object]: + """Accept a whole parallel wave atomically (#284 Phase 2, wave/DAG). + + Every subtask in a wave ran in its own worktree cut off the SAME base (HEAD + at wave start). Merging them one-by-one via ``merge_subtask_worktree`` is + impossible: the first merge advances HEAD, so the second trips its + ``BASE_DIVERGED`` guard. This coordinator relaxes ONLY that guard to a + wave-scoped form — it refuses EXTERNAL HEAD movement but ALLOWS the sibling + divergence each in-wave squash-merge creates. + + All-or-nothing (council Q2): any conflict, commit, or post-wave-gate failure + rolls the working branch back to the wave base via ``reset --hard`` + + ``clean -fd`` (squash merges leave no ``MERGE_HEAD`` so ``git merge --abort`` + is NOT used) and leaves EVERY worktree intact for retry. Council-reviewed + (conv ``c29d6fa9``): dedicated coordinator over a flag on the single path; + ``wave_base_sha`` derived from the sidecar; merge by frozen SHA; per-worktree + pre-merge verify + ONE post-wave full gate inside the atomic transaction. + """ + project_dir = _wt_project_dir() + if not _wt_is_git_repo(): + return _wt_error("NOT_A_REPO", "not inside a git work tree") + if _wt_cwd_is_managed_worktree(): + return _wt_error( + "NESTED_WORKTREE", "run wave merge from the main checkout, not inside a worktree" + ) + active = _wt_active_git_operation() + if active: + return _wt_error("ACTIVE_GIT_OP", f"a {active} is in progress; resolve it first") + + ids = sorted({str(s) for s in subtask_ids if str(s).strip()}) + if not ids: + return _wt_error("NO_SUBTASKS", "no subtask ids supplied for the wave merge") + + branch_name = branch or get_branch_name() + state = _read_worktree_state(branch_name) + worktrees = state["worktrees"] + if not isinstance(worktrees, dict): + return _wt_error("NO_WORKTREE", "no worktree state recorded for this branch") + + # Resolve every subtask's record; validate slug + on-disk presence. + records: list[tuple[str, str, dict]] = [] # (subtask_id, slug, record) + base_shas: set[str] = set() + for sid in ids: + slug = _wt_slug(sid) + if slug is None: + return _wt_error("INVALID_SUBTASK_ID", f"unsafe subtask id: {sid!r}") + record = worktrees.get(slug) + if not isinstance(record, dict): + return _wt_error( + "NO_WORKTREE", + f"no recorded worktree for subtask {sid!r}; create it first", + subtask_id=sid, + ) + wt_path = Path(str(record.get("path", ""))) + if not wt_path.is_dir() or not record.get("branch") or not record.get("base_sha"): + return _wt_error( + "WORKTREE_MISSING", + f"the recorded worktree for {sid!r} is missing on disk; discard and recreate", + subtask_id=sid, + ) + base_shas.add(str(record.get("base_sha"))) + records.append((sid, slug, record)) + + # A coherent wave's worktrees all share one base (cut off the same HEAD). + if len(base_shas) != 1: + return _wt_error( + "WAVE_BASE_MISMATCH", + "worktrees in the wave were created off different bases; recreate them " + "off a single HEAD before a wave merge", + bases=sorted(b[:8] for b in base_shas), + ) + wave_base_sha = next(iter(base_shas)) + + # External-movement guard: the working branch must still sit at the wave base. + # Sibling divergence WITHIN the wave is expected and allowed; commits made + # outside the wave are not (they invalidate every worktree's pre-merge state). + working_head = _wt_head_sha() + if working_head != wave_base_sha: + return _wt_error( + "EXTERNAL_HEAD_MOVED", + f"working branch advanced outside the wave (base={wave_base_sha[:8]}, " + f"head={(working_head or '?')[:8]}); recreate the wave worktrees off the " + "new HEAD", + base_sha=wave_base_sha, + working_head=working_head, + ) + + # The target must be an attached, clean branch before we touch it — rollback + # semantics depend on it. MAP runtime state is excluded from the dirty check. + cur = _wt_git(["rev-parse", "--abbrev-ref", "HEAD"], timeout=10) + if cur.returncode != 0 or cur.stdout.strip() == "HEAD": + return _wt_error( + "DETACHED_HEAD", + "refusing to wave-merge onto a detached HEAD; check out the working branch", + ) + status = _wt_git(["status", "--porcelain"]) + dirty = [ + ln + for ln in status.stdout.splitlines() + if ln.strip() and not _wt_is_runtime_state_path(_wt_porcelain_path(ln)) + ] + if dirty: + return _wt_error( + "DIRTY_TARGET", + "the working tree has uncommitted changes; commit/stash before a wave merge", + dirty=dirty[:20], + ) + + # Serialize coordinators so two waves never interleave squash commits. + lock_handle = _wt_acquire_merge_lock() + if lock_handle is None: + return _wt_error( + "MERGE_IN_PROGRESS", + "another wave merge is in progress on this repository; retry when it completes", + ) + + try: + # PHASE 1 — preflight every worktree (commit + guards + pre-merge verify) + # WITHOUT touching the working branch. A failure here aborts BEFORE any + # merge, so the working branch is trivially untouched. + prepared: list[dict[str, Any]] = [] + for sid, slug, record in records: + prep = _wt_freeze_and_verify( + sid, record, project_dir, branch_name, verify_cmds, skip_verify + ) + if prep.get("status") == "error": + prep.setdefault("subtask_id", sid) + prep["phase"] = "preflight" + return prep + changed_files = _wt_changed_files( + str(record.get("base_sha")), + str(prep["wt_head"]), + Path(str(record.get("path", ""))), + ) + prepared.append( + { + "subtask_id": sid, + "slug": slug, + "record": record, + "wt_head": str(prep["wt_head"]), + "no_changes": bool(prep["no_changes"]), + "deleted": prep["deleted"], + "changed_files": changed_files, + } + ) + + # Declared-disjoint is only a scheduler hint; report ACTUAL overlap for + # attribution. Git's textual-conflict abort below is the HARD guard. + overlaps = _wt_overlap_pairs(prepared) + + # PHASE 2 — sequential squash-merge by FROZEN SHA onto the advancing HEAD. + merged: list[dict[str, Any]] = [] + for item in prepared: + sid = str(item["subtask_id"]) + if item["no_changes"]: + continue + wt_head = str(item["wt_head"]) + merge = _wt_git(["merge", "--squash", wt_head]) + if merge.returncode != 0: + conflict_files = _wt_unmerged_paths() + attribution = _wt_attribute_conflict(conflict_files, prepared) + _wt_rollback(wave_base_sha) + _wt_set_manifest( + branch_name, + "wave_failed", + { + "subtask_id": sid, + "reason": "merge_conflict", + "conflict_files": conflict_files[:50], + }, + ) + return _wt_error( + "WAVE_MERGE_CONFLICT", + f"squash-merge of {sid} hit a conflict; rolled the wave back to " + f"base {wave_base_sha[:8]} (NO subtask merged). The conflicting " + "files were touched by more than one subtask — fix affected_files " + "or re-decompose.", + subtask_id=sid, + conflict_files=conflict_files[:50], + attribution=attribution, + stderr_tail=_clip_probe_output(merge.stderr)[-2000:], + ) + commit = _wt_git( + ["commit", "--no-verify", "-m", f"{sid}: merge isolated worktree (wave)"] + ) + combined = (commit.stdout + commit.stderr).lower() + if commit.returncode != 0 and "nothing to commit" not in combined: + _wt_rollback(wave_base_sha) + _wt_set_manifest( + branch_name, "wave_failed", {"subtask_id": sid, "reason": "commit_failed"} + ) + return _wt_error( + "WAVE_COMMIT_FAILED", + commit.stderr.strip() or f"git commit failed after squash for {sid}", + subtask_id=sid, + ) + merged.append( + { + "subtask_id": sid, + "merged_sha": _wt_head_sha(), + "deletions": len(item["deleted"]) if isinstance(item["deleted"], list) else 0, + } + ) + + # PHASE 3 — ONE post-wave full gate on the merged tree, INSIDE the atomic + # transaction (council Q3): a semantic break two subtasks create together + # (A renames a symbol B references) is caught here, not by git's textual + # merge. Failure rolls the WHOLE wave back. + post_checks = ( + list(post_wave_cmds) + if post_wave_cmds is not None + else _wt_config_verification_checks(project_dir) + ) + post_wave: dict[str, object] = {"ran": False, "status": "skipped", "checks": []} + if not skip_post_wave and post_checks and merged: + results: list[dict[str, object]] = [] + top = _wt_toplevel() or Path(".") + for cmd in post_checks: + argv = shlex.split(cmd) + if not argv: + continue + try: + cp = subprocess.run( + argv, + cwd=str(top), + capture_output=True, + text=True, + timeout=WORKTREE_VERIFY_TIMEOUT, + ) + except subprocess.TimeoutExpired: + _wt_rollback(wave_base_sha) + _wt_set_manifest( + branch_name, + "wave_failed", + {"reason": "post_wave_timeout", "command": cmd}, + ) + return _wt_error( + "WAVE_VERIFY_TIMEOUT", + f"post-wave verification timed out: {cmd}; rolled back to base", + command=cmd, + ) + except (OSError, subprocess.SubprocessError) as exc: + _wt_rollback(wave_base_sha) + return _wt_error( + "WAVE_VERIFY_ERROR", + f"post-wave verification failed to run: {cmd}: {exc}", + command=cmd, + ) + results.append({"command": cmd, "returncode": cp.returncode}) + if cp.returncode != 0: + _wt_rollback(wave_base_sha) + _wt_set_manifest( + branch_name, + "wave_failed", + {"reason": "post_wave_failed", "command": cmd, "returncode": cp.returncode}, + ) + return _wt_error( + "WAVE_VERIFY_FAILED", + f"post-wave gate failed: {cmd} (exit {cp.returncode}); rolled the " + f"wave back to base {wave_base_sha[:8]} (NO subtask merged)", + command=cmd, + returncode=cp.returncode, + stderr_tail=_clip_probe_output(cp.stderr)[-2000:], + ) + post_wave = {"ran": True, "status": "passed", "checks": results} + + # PHASE 4 — accept: remove every worktree+branch, drop from the sidecar. + state = _read_worktree_state(branch_name) + worktrees = state["worktrees"] + for item in prepared: + rec = cast(dict, item["record"]) + _wt_force_remove(Path(str(rec.get("path", ""))), str(rec.get("branch", ""))) + if isinstance(worktrees, dict): + worktrees.pop(str(item["slug"]), None) + _write_worktree_state(branch_name, state) + final_head = _wt_head_sha() + no_change_ids = [str(p["subtask_id"]) for p in prepared if p["no_changes"]] + merged_ids = [str(m["subtask_id"]) for m in merged] + _wt_set_manifest( + branch_name, + "wave_merged", + { + "subtasks": merged_ids, + "merged_count": len(merged), + "no_change_count": len(no_change_ids), + "final_sha": final_head, + "post_wave": post_wave.get("status"), + }, + ) + + return { + "status": "success", + "ok": True, + "wave_base_sha": wave_base_sha, + "final_sha": final_head, + "merged": merged_ids, + "merged_count": len(merged), + "no_changes": no_change_ids, + "post_wave": post_wave, + "overlaps": overlaps, + "note": "all wave subtasks squash-merged atomically; worktrees cleaned up", + } + finally: + _wt_release_merge_lock(lock_handle) + + def worktree_isolation_status(branch: Optional[str] = None) -> dict[str, object]: """Report whether isolation is enabled + reconcile recorded vs live worktrees.""" project_dir = _wt_project_dir() @@ -17300,6 +17777,36 @@ def _flag_val(name: str) -> Optional[str]: if _wt_r.get("status") == "error": sys.exit(1) + elif func_name == "merge_wave_worktrees": + # CLI: merge_wave_worktrees [ ...] [--branch B] + # [--verify-cmd CMD ...] [--skip-verify] + # [--post-wave-cmd CMD ...] [--skip-post-wave] + # Accept a whole parallel wave atomically: per-worktree pre-merge verify, + # sequential squash-merge by frozen SHA onto the advancing HEAD, ONE + # post-wave gate inside the transaction. Any failure rolls the wave back + # to base and exits 1; worktrees are left intact for retry. + import argparse as _ap + + _p = _ap.ArgumentParser(prog="map_step_runner.py merge_wave_worktrees") + _p.add_argument("subtask_ids", nargs="+") + _p.add_argument("--branch", default=None) + _p.add_argument("--verify-cmd", action="append", default=None) + _p.add_argument("--skip-verify", action="store_true") + _p.add_argument("--post-wave-cmd", action="append", default=None) + _p.add_argument("--skip-post-wave", action="store_true") + _a = _p.parse_args(sys.argv[2:]) + _wt_r = merge_wave_worktrees( + _a.subtask_ids, + _a.branch, + _a.verify_cmd, + _a.skip_verify, + _a.post_wave_cmd, + _a.skip_post_wave, + ) + print(json.dumps(_wt_r, indent=2)) + if _wt_r.get("status") == "error": + sys.exit(1) + elif func_name == "discard_subtask_worktree": # CLI: discard_subtask_worktree [--attempt N] [--branch B] # [--save-patch] diff --git a/src/mapify_cli/templates/skills/map-efficient/SKILL.md b/src/mapify_cli/templates/skills/map-efficient/SKILL.md index fcdf525f..5069fa4d 100644 --- a/src/mapify_cli/templates/skills/map-efficient/SKILL.md +++ b/src/mapify_cli/templates/skills/map-efficient/SKILL.md @@ -419,7 +419,7 @@ Every Monitor failure must create a durable `code-review-N.md` with exact issue, ### Per-Wave Gates (after all subtasks in wave pass Monitor) -Run build first, then tests, then linter. If build fails, skip tests/lint and reopen the owning subtask. Run the FULL test suite (not a `-k` subset) whenever any subtask in the wave tripped the cross-subtask regression gate (`recommended_gate == "full_suite"`) — a parallel wave that edits a shared file is the highest-risk case for a regression no single subtask's scoped run can see. +Run build first, then tests, then linter. If build fails, skip tests/lint and reopen the owning subtask. Run the FULL test suite (not a `-k` subset) whenever any subtask in the wave tripped the cross-subtask regression gate (`recommended_gate == "full_suite"`) — a parallel wave that edits a shared file is the highest-risk case for a regression no single subtask's scoped run can see. **Worktree isolation, parallel wave:** when `worktree.isolation` is on and the wave has ≥2 isolated subtasks, accept the whole wave atomically with `merge_wave_worktrees` (never one at a time — the first merge trips `BASE_DIVERGED`); it runs this post-wave gate inside the transaction and rolls the wave back on failure — see [efficient-reference.md](efficient-reference.md#worktree-isolation). ## Step 2a: Validate Step Completion diff --git a/src/mapify_cli/templates/skills/map-efficient/efficient-reference.md b/src/mapify_cli/templates/skills/map-efficient/efficient-reference.md index 0c7965c2..80284856 100644 --- a/src/mapify_cli/templates/skills/map-efficient/efficient-reference.md +++ b/src/mapify_cli/templates/skills/map-efficient/efficient-reference.md @@ -105,7 +105,7 @@ including clean passes — must carry concrete evidence references. ## Wave Execution -Sequential is default. Parallel execution is allowed only when a wave has satisfied dependencies, low risk, and disjoint new-file writes, or when the user explicitly requests it. Use `get_wave_step`, `validate_wave_step`, and `advance_wave`; do not mix wave APIs with the single-current-subtask API. +Sequential is default. Parallel execution is allowed only when a wave has satisfied dependencies, low risk, and disjoint new-file writes, or when the user explicitly requests it. Use `get_wave_step`, `validate_wave_step`, and `advance_wave`; do not mix wave APIs with the single-current-subtask API. When `worktree.isolation` is on and a wave runs in parallel, each subtask gets its own worktree and the wave is accepted atomically via `merge_wave_worktrees` — see [Parallel waves](#worktree-isolation) under Worktree isolation. ## Predictor Recovery @@ -551,6 +551,43 @@ python3 .map/scripts/map_step_runner.py discard_subtask_worktree "$SUBTASK_ID" - The retry creates a fresh worktree off the current HEAD. Inspect state any time with `worktree_isolation_status`. +### Parallel waves (≥2 worktree-isolated subtasks) — #284 Phase 2 + +When `get_wave_step` returns `mode:"parallel"` (a wave with ≥2 disjoint-file +subtasks) AND isolation is enabled, give EACH subtask its own worktree and +dispatch the Actors concurrently (separate Task agents, each pinned to its own +`$WT_PATH`). Do NOT merge them one at a time: every worktree was cut off the same +HEAD, so the first `merge_subtask_worktree` advances the working branch and the +next trips `BASE_DIVERGED`. Accept the whole wave atomically instead — only after +EVERY subtask in the wave has passed Monitor (+ Evaluator): + +```bash +python3 .map/scripts/map_step_runner.py merge_wave_worktrees "$ST_A" "$ST_B" "$ST_C" +``` + +The coordinator (council-reviewed, conv `c29d6fa9`): derives the wave base from +the sidecar; refuses EXTERNAL HEAD movement but allows the sibling divergence each +in-wave squash-merge creates; runs each worktree's pre-merge `verification_checks`, +then squash-merges every accepted worktree by frozen SHA in sorted id order (one +runner commit per subtask), then runs the post-wave full gate **inside the same +transaction**. It is **all-or-nothing**: any textual conflict, commit failure, or +post-wave-gate failure rolls the WHOLE wave back to the base (`reset --hard` + +`clean -fd`, never `git merge --abort` — squash leaves no `MERGE_HEAD`) and leaves +every worktree intact for retry. Pass each subtask's `merged_sha` from the result +to `record_subtask_result --commit-sha`. This **replaces** the separate Per-Wave +Gate when isolation is on — the post-wave gate runs inside `merge_wave_worktrees`. + +Failure `kind`s (working branch untouched / rolled back to base, worktrees kept): +`WAVE_MERGE_CONFLICT` (with `attribution` naming the subtasks that touched each +conflicted file — fix `affected_files` or re-decompose), `WAVE_VERIFY_FAILED` +(post-wave gate red), `EXTERNAL_HEAD_MOVED` (a commit landed outside the wave — +recreate the worktrees off the new HEAD), `WAVE_BASE_MISMATCH`, `DIRTY_TARGET`, +`MERGE_IN_PROGRESS`, plus the per-worktree preflight `kind`s (`VERIFY_FAILED`, +`BULK_DELETION`, … with `phase:"preflight"`). On any Monitor `valid=false` for a +single wave subtask, `discard_subtask_worktree` THAT subtask and retry it; call +`merge_wave_worktrees` only once the whole wave is green. The `overlaps` field is +advisory telemetry (actual changed-file intersections), not a gate. + ## Troubleshooting - Blueprint validation fails: fix the decomposer output before Actor starts. diff --git a/src/mapify_cli/templates_src/codex/skills/map-efficient/efficient-reference.md.jinja b/src/mapify_cli/templates_src/codex/skills/map-efficient/efficient-reference.md.jinja index 0c9198ab..f8252ce1 100644 --- a/src/mapify_cli/templates_src/codex/skills/map-efficient/efficient-reference.md.jinja +++ b/src/mapify_cli/templates_src/codex/skills/map-efficient/efficient-reference.md.jinja @@ -94,6 +94,20 @@ python3 .map/scripts/map_orchestrator.py advance_wave Do not mix wave APIs with the sequential `get_next_step` cursor for the same wave unless the orchestrator response explicitly tells you to fall back. +When `worktree.isolation` is enabled and a wave runs in parallel (≥2 disjoint +subtasks), give each subtask its own worktree and accept the whole wave +atomically after all pass Monitor — never merge them one at a time (the first +merge advances HEAD and the next trips `BASE_DIVERGED`): + +```bash +python3 .map/scripts/map_step_runner.py merge_wave_worktrees "$ST_A" "$ST_B" +``` + +It runs the post-wave gate inside the transaction and rolls the whole wave back +to base on any conflict or gate failure (worktrees kept for retry). On a single +subtask's Monitor failure, `discard_subtask_worktree` that subtask and retry it +before calling `merge_wave_worktrees`. + ## TDD Mode `--tdd` inserts `TEST_WRITER` and `TEST_FAIL_GATE` before `ACTOR`. diff --git a/src/mapify_cli/templates_src/map/scripts/map_step_runner.py.jinja b/src/mapify_cli/templates_src/map/scripts/map_step_runner.py.jinja index 8a37b1de..c1ca6541 100755 --- a/src/mapify_cli/templates_src/map/scripts/map_step_runner.py.jinja +++ b/src/mapify_cli/templates_src/map/scripts/map_step_runner.py.jinja @@ -15534,62 +15534,27 @@ def create_subtask_worktree( } -def merge_subtask_worktree( +def _wt_freeze_and_verify( subtask_id: str, - attempt: int = 0, - branch: Optional[str] = None, + record: dict, + project_dir: Path, + branch_name: str, verify_cmds: Optional[list[str]] = None, skip_verify: bool = False, ) -> dict[str, object]: - """Accept a subtask: commit worktree work, run pre-merge verification IN the - worktree, then squash-merge ONE commit into the working branch (#284). - - Council-mandated guards run BEFORE the merge touches the working branch: - base-divergence, runtime-state-in-diff, bulk-deletion, submodule-pointer, - detached-HEAD, and the pre-merge `verification_checks` gate. Any failure - leaves the working branch untouched and returns a structured ``kind``. + """Commit a worktree's work + run per-worktree guards + pre-merge verify. + + Operates ONLY inside the worktree — never touches the working branch. Shared + by ``merge_subtask_worktree`` (single) and ``merge_wave_worktrees`` (wave) so + the guard/verify logic has exactly one definition (council Q4: share + lower-level primitives, keep the two coordinators as separate compositions). + On success returns ``{"ok": True, "wt_head", "deleted", "no_changes", + "verification"}``; on any guard/verify failure returns a structured + ``_wt_error`` (``status=="error"``). """ - project_dir = _wt_project_dir() - if not _wt_is_git_repo(): - return _wt_error("NOT_A_REPO", "not inside a git work tree") - if _wt_cwd_is_managed_worktree(): - return _wt_error( - "NESTED_WORKTREE", "run merge from the main checkout, not inside a worktree" - ) - active = _wt_active_git_operation() - if active: - return _wt_error("ACTIVE_GIT_OP", f"a {active} is in progress; resolve it first") - slug = _wt_slug(subtask_id) - if slug is None: - return _wt_error("INVALID_SUBTASK_ID", f"unsafe subtask id: {subtask_id!r}") - branch_name = branch or get_branch_name() - state = _read_worktree_state(branch_name) - worktrees = state["worktrees"] - record = worktrees.get(slug) if isinstance(worktrees, dict) else None - if not isinstance(record, dict): - return _wt_error( - "NO_WORKTREE", - f"no recorded worktree for subtask {subtask_id!r}; create it first", - ) wt_path = Path(str(record.get("path", ""))) wt_branch = str(record.get("branch", "")) base_sha = str(record.get("base_sha", "")) - if not wt_path.is_dir() or not wt_branch or not base_sha: - return _wt_error( - "WORKTREE_MISSING", - "the recorded worktree is missing on disk; discard and recreate", - ) - - working_head = _wt_head_sha() - if working_head != base_sha: - return _wt_error( - "BASE_DIVERGED", - f"working branch advanced since the worktree was created " - f"(base={base_sha[:8]}, head={(working_head or '?')[:8]}); discard and " - "recreate the worktree off the new HEAD", - base_sha=base_sha, - working_head=working_head, - ) add = _wt_git(["add", "-A"], cwd=wt_path) if add.returncode != 0: @@ -15599,7 +15564,12 @@ def merge_subtask_worktree( staged = _wt_git(["diff", "--cached", "--quiet"], cwd=wt_path) if staged.returncode == 1: commit = _wt_git( - ["commit", "--no-verify", "-m", f"map-wt: {subtask_id} (attempt {record.get('attempt', 0)})"], + [ + "commit", + "--no-verify", + "-m", + f"map-wt: {subtask_id} (attempt {record.get('attempt', 0)})", + ], cwd=wt_path, ) if commit.returncode != 0: @@ -15698,6 +15668,200 @@ def merge_subtask_worktree( ) verification = {"ran": True, "status": "passed", "checks": results} + return { + "status": "success", + "ok": True, + "wt_head": wt_head, + "deleted": deleted, + "no_changes": no_changes, + "verification": verification, + } + + +def _wt_rollback(base_sha: str) -> None: + """Undo an in-progress wave merge: hard-reset to the wave base + clean. + + A ``git merge --squash`` records NO ``MERGE_HEAD``, so ``git merge --abort`` + is unusable (council Q2). ``reset --hard`` + ``clean -fd`` is the only correct + undo. MAP runtime state (.map/.codex/.agents) is EXCLUDED from the clean so a + rollback never destroys the worktree sidecar or step state. + """ + _wt_git(["reset", "--hard", base_sha]) + _wt_git(["clean", "-fd", "-e", ".map", "-e", ".codex", "-e", ".agents"]) + + +def _wt_unmerged_paths() -> list[str]: + """Paths left in a conflicted (unmerged) state after a failed squash merge.""" + r = _wt_git(["diff", "--name-only", "--diff-filter=U"]) + if r.returncode != 0: + return [] + return [ln.strip() for ln in r.stdout.splitlines() if ln.strip()] + + +def _wt_changed_files(base_sha: str, wt_head: str, wt_path: Path) -> list[str]: + """The set of files a worktree actually changed vs the wave base.""" + r = _wt_git(["diff", "--name-only", f"{base_sha}..{wt_head}"], cwd=wt_path) + if r.returncode != 0: + return [] + return [ln.strip() for ln in r.stdout.splitlines() if ln.strip()] + + +def _wt_overlap_pairs(prepared: list[dict[str, Any]]) -> list[dict[str, object]]: + """Telemetry: subtask pairs whose ACTUAL changed-file sets intersect. + + The scheduler's ``split_wave_by_file_conflicts`` only guarantees *declared* + ``affected_files`` are disjoint; an Actor can touch an unlisted file. Git's + textual-conflict abort is the HARD guard — this overlap report is advisory + attribution only (which subtasks "lied" about their boundaries). + """ + out: list[dict[str, object]] = [] + for i in range(len(prepared)): + for j in range(i + 1, len(prepared)): + a = set(prepared[i].get("changed_files") or []) + b = set(prepared[j].get("changed_files") or []) + shared = sorted(a & b) + if shared: + out.append( + { + "subtasks": [ + prepared[i]["subtask_id"], + prepared[j]["subtask_id"], + ], + "files": shared[:50], + } + ) + return out + + +def _wt_attribute_conflict( + conflict_files: list[str], prepared: list[dict[str, Any]] +) -> list[dict[str, object]]: + """Map conflicted paths back to the wave subtasks that touched them.""" + out: list[dict[str, object]] = [] + cset = set(conflict_files) + for item in prepared: + touched = sorted(cset & set(item.get("changed_files") or [])) + if touched: + out.append({"subtask_id": item["subtask_id"], "files": touched}) + return out + + +def _wt_merge_lock_path() -> Optional[Path]: + common = _wt_git_common_dir() + if common is None: + return None + return common / "map-framework" / "wave-merge.lock" + + +def _wt_acquire_merge_lock() -> Optional[Any]: + """Advisory lock so two wave merges never interleave squash commits. + + Returns an open file handle holding the lock, or None if the lock is already + held (the caller maps that to ``MERGE_IN_PROGRESS``). Degrades to a held-open + sentinel handle where ``fcntl`` is unavailable (non-POSIX) — concurrency + protection is best-effort there, but the release path stays uniform. + """ + lock_path = _wt_merge_lock_path() + if lock_path is None: + return None + lock_path.parent.mkdir(parents=True, exist_ok=True) + handle = lock_path.open("w") + try: + import fcntl # noqa: PLC0415 + except ImportError: + return handle # best-effort: no advisory lock available + try: + fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + except OSError: + handle.close() + return None + return handle + + +def _wt_release_merge_lock(handle: Optional[Any]) -> None: + if handle is None: + return + try: + import fcntl # noqa: PLC0415 + + try: + fcntl.flock(handle.fileno(), fcntl.LOCK_UN) + except OSError: + pass + except ImportError: + pass + try: + handle.close() + except OSError: + pass + + +def merge_subtask_worktree( + subtask_id: str, + attempt: int = 0, + branch: Optional[str] = None, + verify_cmds: Optional[list[str]] = None, + skip_verify: bool = False, +) -> dict[str, object]: + """Accept a subtask: commit worktree work, run pre-merge verification IN the + worktree, then squash-merge ONE commit into the working branch (#284). + + Council-mandated guards run BEFORE the merge touches the working branch: + base-divergence, runtime-state-in-diff, bulk-deletion, submodule-pointer, + detached-HEAD, and the pre-merge `verification_checks` gate. Any failure + leaves the working branch untouched and returns a structured ``kind``. + """ + project_dir = _wt_project_dir() + if not _wt_is_git_repo(): + return _wt_error("NOT_A_REPO", "not inside a git work tree") + if _wt_cwd_is_managed_worktree(): + return _wt_error( + "NESTED_WORKTREE", "run merge from the main checkout, not inside a worktree" + ) + active = _wt_active_git_operation() + if active: + return _wt_error("ACTIVE_GIT_OP", f"a {active} is in progress; resolve it first") + slug = _wt_slug(subtask_id) + if slug is None: + return _wt_error("INVALID_SUBTASK_ID", f"unsafe subtask id: {subtask_id!r}") + branch_name = branch or get_branch_name() + state = _read_worktree_state(branch_name) + worktrees = state["worktrees"] + record = worktrees.get(slug) if isinstance(worktrees, dict) else None + if not isinstance(record, dict): + return _wt_error( + "NO_WORKTREE", + f"no recorded worktree for subtask {subtask_id!r}; create it first", + ) + wt_path = Path(str(record.get("path", ""))) + wt_branch = str(record.get("branch", "")) + base_sha = str(record.get("base_sha", "")) + if not wt_path.is_dir() or not wt_branch or not base_sha: + return _wt_error( + "WORKTREE_MISSING", + "the recorded worktree is missing on disk; discard and recreate", + ) + + working_head = _wt_head_sha() + if working_head != base_sha: + return _wt_error( + "BASE_DIVERGED", + f"working branch advanced since the worktree was created " + f"(base={base_sha[:8]}, head={(working_head or '?')[:8]}); discard and " + "recreate the worktree off the new HEAD", + base_sha=base_sha, + working_head=working_head, + ) + + prep = _wt_freeze_and_verify( + subtask_id, record, project_dir, branch_name, verify_cmds, skip_verify + ) + if prep.get("status") == "error": + return prep + deleted = cast(list, prep["deleted"]) + no_changes = bool(prep["no_changes"]) + verification = cast(dict, prep["verification"]) + merged_sha = working_head if not no_changes: merge = _wt_git(["merge", "--squash", wt_branch]) @@ -15824,6 +15988,319 @@ def discard_subtask_worktree( } +def merge_wave_worktrees( + subtask_ids: list[str], + branch: Optional[str] = None, + verify_cmds: Optional[list[str]] = None, + skip_verify: bool = False, + post_wave_cmds: Optional[list[str]] = None, + skip_post_wave: bool = False, +) -> dict[str, object]: + """Accept a whole parallel wave atomically (#284 Phase 2, wave/DAG). + + Every subtask in a wave ran in its own worktree cut off the SAME base (HEAD + at wave start). Merging them one-by-one via ``merge_subtask_worktree`` is + impossible: the first merge advances HEAD, so the second trips its + ``BASE_DIVERGED`` guard. This coordinator relaxes ONLY that guard to a + wave-scoped form — it refuses EXTERNAL HEAD movement but ALLOWS the sibling + divergence each in-wave squash-merge creates. + + All-or-nothing (council Q2): any conflict, commit, or post-wave-gate failure + rolls the working branch back to the wave base via ``reset --hard`` + + ``clean -fd`` (squash merges leave no ``MERGE_HEAD`` so ``git merge --abort`` + is NOT used) and leaves EVERY worktree intact for retry. Council-reviewed + (conv ``c29d6fa9``): dedicated coordinator over a flag on the single path; + ``wave_base_sha`` derived from the sidecar; merge by frozen SHA; per-worktree + pre-merge verify + ONE post-wave full gate inside the atomic transaction. + """ + project_dir = _wt_project_dir() + if not _wt_is_git_repo(): + return _wt_error("NOT_A_REPO", "not inside a git work tree") + if _wt_cwd_is_managed_worktree(): + return _wt_error( + "NESTED_WORKTREE", "run wave merge from the main checkout, not inside a worktree" + ) + active = _wt_active_git_operation() + if active: + return _wt_error("ACTIVE_GIT_OP", f"a {active} is in progress; resolve it first") + + ids = sorted({str(s) for s in subtask_ids if str(s).strip()}) + if not ids: + return _wt_error("NO_SUBTASKS", "no subtask ids supplied for the wave merge") + + branch_name = branch or get_branch_name() + state = _read_worktree_state(branch_name) + worktrees = state["worktrees"] + if not isinstance(worktrees, dict): + return _wt_error("NO_WORKTREE", "no worktree state recorded for this branch") + + # Resolve every subtask's record; validate slug + on-disk presence. + records: list[tuple[str, str, dict]] = [] # (subtask_id, slug, record) + base_shas: set[str] = set() + for sid in ids: + slug = _wt_slug(sid) + if slug is None: + return _wt_error("INVALID_SUBTASK_ID", f"unsafe subtask id: {sid!r}") + record = worktrees.get(slug) + if not isinstance(record, dict): + return _wt_error( + "NO_WORKTREE", + f"no recorded worktree for subtask {sid!r}; create it first", + subtask_id=sid, + ) + wt_path = Path(str(record.get("path", ""))) + if not wt_path.is_dir() or not record.get("branch") or not record.get("base_sha"): + return _wt_error( + "WORKTREE_MISSING", + f"the recorded worktree for {sid!r} is missing on disk; discard and recreate", + subtask_id=sid, + ) + base_shas.add(str(record.get("base_sha"))) + records.append((sid, slug, record)) + + # A coherent wave's worktrees all share one base (cut off the same HEAD). + if len(base_shas) != 1: + return _wt_error( + "WAVE_BASE_MISMATCH", + "worktrees in the wave were created off different bases; recreate them " + "off a single HEAD before a wave merge", + bases=sorted(b[:8] for b in base_shas), + ) + wave_base_sha = next(iter(base_shas)) + + # External-movement guard: the working branch must still sit at the wave base. + # Sibling divergence WITHIN the wave is expected and allowed; commits made + # outside the wave are not (they invalidate every worktree's pre-merge state). + working_head = _wt_head_sha() + if working_head != wave_base_sha: + return _wt_error( + "EXTERNAL_HEAD_MOVED", + f"working branch advanced outside the wave (base={wave_base_sha[:8]}, " + f"head={(working_head or '?')[:8]}); recreate the wave worktrees off the " + "new HEAD", + base_sha=wave_base_sha, + working_head=working_head, + ) + + # The target must be an attached, clean branch before we touch it — rollback + # semantics depend on it. MAP runtime state is excluded from the dirty check. + cur = _wt_git(["rev-parse", "--abbrev-ref", "HEAD"], timeout=10) + if cur.returncode != 0 or cur.stdout.strip() == "HEAD": + return _wt_error( + "DETACHED_HEAD", + "refusing to wave-merge onto a detached HEAD; check out the working branch", + ) + status = _wt_git(["status", "--porcelain"]) + dirty = [ + ln + for ln in status.stdout.splitlines() + if ln.strip() and not _wt_is_runtime_state_path(_wt_porcelain_path(ln)) + ] + if dirty: + return _wt_error( + "DIRTY_TARGET", + "the working tree has uncommitted changes; commit/stash before a wave merge", + dirty=dirty[:20], + ) + + # Serialize coordinators so two waves never interleave squash commits. + lock_handle = _wt_acquire_merge_lock() + if lock_handle is None: + return _wt_error( + "MERGE_IN_PROGRESS", + "another wave merge is in progress on this repository; retry when it completes", + ) + + try: + # PHASE 1 — preflight every worktree (commit + guards + pre-merge verify) + # WITHOUT touching the working branch. A failure here aborts BEFORE any + # merge, so the working branch is trivially untouched. + prepared: list[dict[str, Any]] = [] + for sid, slug, record in records: + prep = _wt_freeze_and_verify( + sid, record, project_dir, branch_name, verify_cmds, skip_verify + ) + if prep.get("status") == "error": + prep.setdefault("subtask_id", sid) + prep["phase"] = "preflight" + return prep + changed_files = _wt_changed_files( + str(record.get("base_sha")), + str(prep["wt_head"]), + Path(str(record.get("path", ""))), + ) + prepared.append( + { + "subtask_id": sid, + "slug": slug, + "record": record, + "wt_head": str(prep["wt_head"]), + "no_changes": bool(prep["no_changes"]), + "deleted": prep["deleted"], + "changed_files": changed_files, + } + ) + + # Declared-disjoint is only a scheduler hint; report ACTUAL overlap for + # attribution. Git's textual-conflict abort below is the HARD guard. + overlaps = _wt_overlap_pairs(prepared) + + # PHASE 2 — sequential squash-merge by FROZEN SHA onto the advancing HEAD. + merged: list[dict[str, Any]] = [] + for item in prepared: + sid = str(item["subtask_id"]) + if item["no_changes"]: + continue + wt_head = str(item["wt_head"]) + merge = _wt_git(["merge", "--squash", wt_head]) + if merge.returncode != 0: + conflict_files = _wt_unmerged_paths() + attribution = _wt_attribute_conflict(conflict_files, prepared) + _wt_rollback(wave_base_sha) + _wt_set_manifest( + branch_name, + "wave_failed", + { + "subtask_id": sid, + "reason": "merge_conflict", + "conflict_files": conflict_files[:50], + }, + ) + return _wt_error( + "WAVE_MERGE_CONFLICT", + f"squash-merge of {sid} hit a conflict; rolled the wave back to " + f"base {wave_base_sha[:8]} (NO subtask merged). The conflicting " + "files were touched by more than one subtask — fix affected_files " + "or re-decompose.", + subtask_id=sid, + conflict_files=conflict_files[:50], + attribution=attribution, + stderr_tail=_clip_probe_output(merge.stderr)[-2000:], + ) + commit = _wt_git( + ["commit", "--no-verify", "-m", f"{sid}: merge isolated worktree (wave)"] + ) + combined = (commit.stdout + commit.stderr).lower() + if commit.returncode != 0 and "nothing to commit" not in combined: + _wt_rollback(wave_base_sha) + _wt_set_manifest( + branch_name, "wave_failed", {"subtask_id": sid, "reason": "commit_failed"} + ) + return _wt_error( + "WAVE_COMMIT_FAILED", + commit.stderr.strip() or f"git commit failed after squash for {sid}", + subtask_id=sid, + ) + merged.append( + { + "subtask_id": sid, + "merged_sha": _wt_head_sha(), + "deletions": len(item["deleted"]) if isinstance(item["deleted"], list) else 0, + } + ) + + # PHASE 3 — ONE post-wave full gate on the merged tree, INSIDE the atomic + # transaction (council Q3): a semantic break two subtasks create together + # (A renames a symbol B references) is caught here, not by git's textual + # merge. Failure rolls the WHOLE wave back. + post_checks = ( + list(post_wave_cmds) + if post_wave_cmds is not None + else _wt_config_verification_checks(project_dir) + ) + post_wave: dict[str, object] = {"ran": False, "status": "skipped", "checks": []} + if not skip_post_wave and post_checks and merged: + results: list[dict[str, object]] = [] + top = _wt_toplevel() or Path(".") + for cmd in post_checks: + argv = shlex.split(cmd) + if not argv: + continue + try: + cp = subprocess.run( + argv, + cwd=str(top), + capture_output=True, + text=True, + timeout=WORKTREE_VERIFY_TIMEOUT, + ) + except subprocess.TimeoutExpired: + _wt_rollback(wave_base_sha) + _wt_set_manifest( + branch_name, + "wave_failed", + {"reason": "post_wave_timeout", "command": cmd}, + ) + return _wt_error( + "WAVE_VERIFY_TIMEOUT", + f"post-wave verification timed out: {cmd}; rolled back to base", + command=cmd, + ) + except (OSError, subprocess.SubprocessError) as exc: + _wt_rollback(wave_base_sha) + return _wt_error( + "WAVE_VERIFY_ERROR", + f"post-wave verification failed to run: {cmd}: {exc}", + command=cmd, + ) + results.append({"command": cmd, "returncode": cp.returncode}) + if cp.returncode != 0: + _wt_rollback(wave_base_sha) + _wt_set_manifest( + branch_name, + "wave_failed", + {"reason": "post_wave_failed", "command": cmd, "returncode": cp.returncode}, + ) + return _wt_error( + "WAVE_VERIFY_FAILED", + f"post-wave gate failed: {cmd} (exit {cp.returncode}); rolled the " + f"wave back to base {wave_base_sha[:8]} (NO subtask merged)", + command=cmd, + returncode=cp.returncode, + stderr_tail=_clip_probe_output(cp.stderr)[-2000:], + ) + post_wave = {"ran": True, "status": "passed", "checks": results} + + # PHASE 4 — accept: remove every worktree+branch, drop from the sidecar. + state = _read_worktree_state(branch_name) + worktrees = state["worktrees"] + for item in prepared: + rec = cast(dict, item["record"]) + _wt_force_remove(Path(str(rec.get("path", ""))), str(rec.get("branch", ""))) + if isinstance(worktrees, dict): + worktrees.pop(str(item["slug"]), None) + _write_worktree_state(branch_name, state) + final_head = _wt_head_sha() + no_change_ids = [str(p["subtask_id"]) for p in prepared if p["no_changes"]] + merged_ids = [str(m["subtask_id"]) for m in merged] + _wt_set_manifest( + branch_name, + "wave_merged", + { + "subtasks": merged_ids, + "merged_count": len(merged), + "no_change_count": len(no_change_ids), + "final_sha": final_head, + "post_wave": post_wave.get("status"), + }, + ) + + return { + "status": "success", + "ok": True, + "wave_base_sha": wave_base_sha, + "final_sha": final_head, + "merged": merged_ids, + "merged_count": len(merged), + "no_changes": no_change_ids, + "post_wave": post_wave, + "overlaps": overlaps, + "note": "all wave subtasks squash-merged atomically; worktrees cleaned up", + } + finally: + _wt_release_merge_lock(lock_handle) + + def worktree_isolation_status(branch: Optional[str] = None) -> dict[str, object]: """Report whether isolation is enabled + reconcile recorded vs live worktrees.""" project_dir = _wt_project_dir() @@ -17300,6 +17777,36 @@ if __name__ == "__main__": if _wt_r.get("status") == "error": sys.exit(1) + elif func_name == "merge_wave_worktrees": + # CLI: merge_wave_worktrees [ ...] [--branch B] + # [--verify-cmd CMD ...] [--skip-verify] + # [--post-wave-cmd CMD ...] [--skip-post-wave] + # Accept a whole parallel wave atomically: per-worktree pre-merge verify, + # sequential squash-merge by frozen SHA onto the advancing HEAD, ONE + # post-wave gate inside the transaction. Any failure rolls the wave back + # to base and exits 1; worktrees are left intact for retry. + import argparse as _ap + + _p = _ap.ArgumentParser(prog="map_step_runner.py merge_wave_worktrees") + _p.add_argument("subtask_ids", nargs="+") + _p.add_argument("--branch", default=None) + _p.add_argument("--verify-cmd", action="append", default=None) + _p.add_argument("--skip-verify", action="store_true") + _p.add_argument("--post-wave-cmd", action="append", default=None) + _p.add_argument("--skip-post-wave", action="store_true") + _a = _p.parse_args(sys.argv[2:]) + _wt_r = merge_wave_worktrees( + _a.subtask_ids, + _a.branch, + _a.verify_cmd, + _a.skip_verify, + _a.post_wave_cmd, + _a.skip_post_wave, + ) + print(json.dumps(_wt_r, indent=2)) + if _wt_r.get("status") == "error": + sys.exit(1) + elif func_name == "discard_subtask_worktree": # CLI: discard_subtask_worktree [--attempt N] [--branch B] # [--save-patch] diff --git a/src/mapify_cli/templates_src/skills/map-efficient/SKILL.md.jinja b/src/mapify_cli/templates_src/skills/map-efficient/SKILL.md.jinja index fcdf525f..5069fa4d 100644 --- a/src/mapify_cli/templates_src/skills/map-efficient/SKILL.md.jinja +++ b/src/mapify_cli/templates_src/skills/map-efficient/SKILL.md.jinja @@ -419,7 +419,7 @@ Every Monitor failure must create a durable `code-review-N.md` with exact issue, ### Per-Wave Gates (after all subtasks in wave pass Monitor) -Run build first, then tests, then linter. If build fails, skip tests/lint and reopen the owning subtask. Run the FULL test suite (not a `-k` subset) whenever any subtask in the wave tripped the cross-subtask regression gate (`recommended_gate == "full_suite"`) — a parallel wave that edits a shared file is the highest-risk case for a regression no single subtask's scoped run can see. +Run build first, then tests, then linter. If build fails, skip tests/lint and reopen the owning subtask. Run the FULL test suite (not a `-k` subset) whenever any subtask in the wave tripped the cross-subtask regression gate (`recommended_gate == "full_suite"`) — a parallel wave that edits a shared file is the highest-risk case for a regression no single subtask's scoped run can see. **Worktree isolation, parallel wave:** when `worktree.isolation` is on and the wave has ≥2 isolated subtasks, accept the whole wave atomically with `merge_wave_worktrees` (never one at a time — the first merge trips `BASE_DIVERGED`); it runs this post-wave gate inside the transaction and rolls the wave back on failure — see [efficient-reference.md](efficient-reference.md#worktree-isolation). ## Step 2a: Validate Step Completion diff --git a/src/mapify_cli/templates_src/skills/map-efficient/efficient-reference.md.jinja b/src/mapify_cli/templates_src/skills/map-efficient/efficient-reference.md.jinja index 0c7965c2..80284856 100644 --- a/src/mapify_cli/templates_src/skills/map-efficient/efficient-reference.md.jinja +++ b/src/mapify_cli/templates_src/skills/map-efficient/efficient-reference.md.jinja @@ -105,7 +105,7 @@ including clean passes — must carry concrete evidence references. ## Wave Execution -Sequential is default. Parallel execution is allowed only when a wave has satisfied dependencies, low risk, and disjoint new-file writes, or when the user explicitly requests it. Use `get_wave_step`, `validate_wave_step`, and `advance_wave`; do not mix wave APIs with the single-current-subtask API. +Sequential is default. Parallel execution is allowed only when a wave has satisfied dependencies, low risk, and disjoint new-file writes, or when the user explicitly requests it. Use `get_wave_step`, `validate_wave_step`, and `advance_wave`; do not mix wave APIs with the single-current-subtask API. When `worktree.isolation` is on and a wave runs in parallel, each subtask gets its own worktree and the wave is accepted atomically via `merge_wave_worktrees` — see [Parallel waves](#worktree-isolation) under Worktree isolation. ## Predictor Recovery @@ -551,6 +551,43 @@ python3 .map/scripts/map_step_runner.py discard_subtask_worktree "$SUBTASK_ID" - The retry creates a fresh worktree off the current HEAD. Inspect state any time with `worktree_isolation_status`. +### Parallel waves (≥2 worktree-isolated subtasks) — #284 Phase 2 + +When `get_wave_step` returns `mode:"parallel"` (a wave with ≥2 disjoint-file +subtasks) AND isolation is enabled, give EACH subtask its own worktree and +dispatch the Actors concurrently (separate Task agents, each pinned to its own +`$WT_PATH`). Do NOT merge them one at a time: every worktree was cut off the same +HEAD, so the first `merge_subtask_worktree` advances the working branch and the +next trips `BASE_DIVERGED`. Accept the whole wave atomically instead — only after +EVERY subtask in the wave has passed Monitor (+ Evaluator): + +```bash +python3 .map/scripts/map_step_runner.py merge_wave_worktrees "$ST_A" "$ST_B" "$ST_C" +``` + +The coordinator (council-reviewed, conv `c29d6fa9`): derives the wave base from +the sidecar; refuses EXTERNAL HEAD movement but allows the sibling divergence each +in-wave squash-merge creates; runs each worktree's pre-merge `verification_checks`, +then squash-merges every accepted worktree by frozen SHA in sorted id order (one +runner commit per subtask), then runs the post-wave full gate **inside the same +transaction**. It is **all-or-nothing**: any textual conflict, commit failure, or +post-wave-gate failure rolls the WHOLE wave back to the base (`reset --hard` + +`clean -fd`, never `git merge --abort` — squash leaves no `MERGE_HEAD`) and leaves +every worktree intact for retry. Pass each subtask's `merged_sha` from the result +to `record_subtask_result --commit-sha`. This **replaces** the separate Per-Wave +Gate when isolation is on — the post-wave gate runs inside `merge_wave_worktrees`. + +Failure `kind`s (working branch untouched / rolled back to base, worktrees kept): +`WAVE_MERGE_CONFLICT` (with `attribution` naming the subtasks that touched each +conflicted file — fix `affected_files` or re-decompose), `WAVE_VERIFY_FAILED` +(post-wave gate red), `EXTERNAL_HEAD_MOVED` (a commit landed outside the wave — +recreate the worktrees off the new HEAD), `WAVE_BASE_MISMATCH`, `DIRTY_TARGET`, +`MERGE_IN_PROGRESS`, plus the per-worktree preflight `kind`s (`VERIFY_FAILED`, +`BULK_DELETION`, … with `phase:"preflight"`). On any Monitor `valid=false` for a +single wave subtask, `discard_subtask_worktree` THAT subtask and retry it; call +`merge_wave_worktrees` only once the whole wave is green. The `overlaps` field is +advisory telemetry (actual changed-file intersections), not a gate. + ## Troubleshooting - Blueprint validation fails: fix the decomposer output before Actor starts. diff --git a/tests/test_worktree_isolation.py b/tests/test_worktree_isolation.py index 1f288878..b997cd9e 100644 --- a/tests/test_worktree_isolation.py +++ b/tests/test_worktree_isolation.py @@ -10,6 +10,7 @@ from __future__ import annotations +import json import subprocess import sys from pathlib import Path @@ -313,3 +314,224 @@ def test_no_changes_when_actor_ignores_worktree(self) -> None: @pytest.mark.usefixtures("repo") def test_merge_without_create_errors(self) -> None: assert m.merge_subtask_worktree("ST-019", verify_cmds=[])["kind"] == "NO_WORKTREE" + + +# --------------------------------------------------------------------------- # +# Wave merge coordinator (#284 Phase 2 — parallel wave / DAG) +# --------------------------------------------------------------------------- # +def _wt_with_files(sid: str, files: dict[str, str]) -> Path: + """Create a subtask worktree and write `files` (path -> content) into it.""" + created = m.create_subtask_worktree(sid) + assert created["status"] == "success", created + wt = Path(str(created["worktree_path"])) + for rel, content in files.items(): + (wt / rel).write_text(content, encoding="utf-8") + return wt + + +class TestWaveWorktreeMerge: + def test_no_subtasks_errors(self, repo: Path) -> None: + del repo + assert m.merge_wave_worktrees([])["kind"] == "NO_SUBTASKS" + + def test_unknown_subtask_errors(self, repo: Path) -> None: + del repo + result = m.merge_wave_worktrees(["ST-404"]) + assert result["kind"] == "NO_WORKTREE" + + def test_happy_path_two_disjoint_subtasks(self, repo: Path) -> None: + base = _git(["rev-parse", "HEAD"], repo).stdout.strip() + _wt_with_files("ST-001", {"b.txt": "from-001\n"}) + _wt_with_files("ST-002", {"c.txt": "from-002\n"}) + + result = m.merge_wave_worktrees( + ["ST-002", "ST-001"], verify_cmds=[], post_wave_cmds=[] + ) + assert result["status"] == "success", result + # Deterministic sorted merge order regardless of input order. + assert result["merged"] == ["ST-001", "ST-002"] + assert result["merged_count"] == 2 + # Both disjoint files landed on the working branch. + assert (repo / "b.txt").read_text().strip() == "from-001" + assert (repo / "c.txt").read_text().strip() == "from-002" + # Exactly TWO squash commits (one per subtask) on top of init. + assert _git(["rev-list", "--count", "HEAD"], repo).stdout.strip() == "3" + # Commit subjects carry the subtask ids in sorted order (newest first). + subjects = _git(["log", "-2", "--format=%s"], repo).stdout.split("\n") + assert subjects[0].startswith("ST-002:") + assert subjects[1].startswith("ST-001:") + # HEAD advanced past the wave base. + assert _git(["rev-parse", "HEAD"], repo).stdout.strip() != base + # Worktrees cleaned up + sidecar emptied. + status = m.worktree_isolation_status() + assert status["active_worktrees"] == [] + assert "map-wt/ST-001-0" not in _git(["branch"], repo).stdout + assert "map-wt/ST-002-0" not in _git(["branch"], repo).stdout + + def test_conflict_rolls_whole_wave_back(self, repo: Path) -> None: + base = _git(["rev-parse", "HEAD"], repo).stdout.strip() + # Both subtasks rewrite the SAME line of a.txt differently -> real + # textual conflict on the second squash-merge. + _wt_with_files("ST-001", {"a.txt": "conflict-from-001\n"}) + _wt_with_files("ST-002", {"a.txt": "conflict-from-002\n"}) + + result = m.merge_wave_worktrees( + ["ST-001", "ST-002"], verify_cmds=[], post_wave_cmds=[] + ) + assert result["status"] == "error" + assert result["kind"] == "WAVE_MERGE_CONFLICT" + assert result["subtask_id"] == "ST-002" + assert "a.txt" in result["conflict_files"] + # Attribution names the culprits that touched a.txt. + attributed = {a["subtask_id"] for a in result["attribution"]} + assert "ST-002" in attributed + # All-or-nothing: working branch is back at the wave base, tree clean. + assert _git(["rev-parse", "HEAD"], repo).stdout.strip() == base + porcelain = [ + ln + for ln in _git(["status", "--porcelain"], repo).stdout.splitlines() + if ln.strip() and ".map" not in ln + ] + assert porcelain == [], porcelain + # No MERGE_HEAD left behind (squash merge has none; abort would error). + assert not (repo / ".git" / "MERGE_HEAD").exists() + # Worktrees left intact for retry. + status = m.worktree_isolation_status() + slugs = {w["subtask_id"] for w in status["active_worktrees"]} + assert {"ST-001", "ST-002"} <= slugs + + def test_external_head_movement_refused(self, repo: Path) -> None: + _wt_with_files("ST-001", {"b.txt": "x\n"}) + _wt_with_files("ST-002", {"c.txt": "y\n"}) + # An external commit advances HEAD past the wave base. + (repo / "external.txt").write_text("outside the wave\n") + _git(["add", "external.txt"], repo) + _git(["commit", "-q", "-m", "external"], repo) + + result = m.merge_wave_worktrees( + ["ST-001", "ST-002"], verify_cmds=[], post_wave_cmds=[] + ) + assert result["kind"] == "EXTERNAL_HEAD_MOVED" + # No merge attempted: the external commit is still HEAD, files unmerged. + assert not (repo / "b.txt").exists() + + def test_post_wave_gate_failure_rolls_back(self, repo: Path) -> None: + base = _git(["rev-parse", "HEAD"], repo).stdout.strip() + _wt_with_files("ST-001", {"b.txt": "x\n"}) + _wt_with_files("ST-002", {"c.txt": "y\n"}) + + result = m.merge_wave_worktrees( + ["ST-001", "ST-002"], + verify_cmds=[], + post_wave_cmds=['bash -lc "exit 7"'], + ) + assert result["kind"] == "WAVE_VERIFY_FAILED" + assert result["returncode"] == 7 + # Atomic rollback: branch at base, no squash commits survived. + assert _git(["rev-parse", "HEAD"], repo).stdout.strip() == base + assert not (repo / "b.txt").exists() + assert not (repo / "c.txt").exists() + # Worktrees intact for retry. + status = m.worktree_isolation_status() + assert len(status["active_worktrees"]) == 2 + + def test_post_wave_gate_pass_accepts(self, repo: Path) -> None: + _wt_with_files("ST-001", {"b.txt": "x\n"}) + result = m.merge_wave_worktrees( + ["ST-001"], verify_cmds=[], post_wave_cmds=['bash -lc "exit 0"'] + ) + assert result["status"] == "success" + assert result["post_wave"]["status"] == "passed" + assert (repo / "b.txt").read_text().strip() == "x" + + def test_per_worktree_verify_failure_aborts_before_merge(self, repo: Path) -> None: + base = _git(["rev-parse", "HEAD"], repo).stdout.strip() + _wt_with_files("ST-001", {"b.txt": "x\n"}) + _wt_with_files("ST-002", {"c.txt": "y\n"}) + + result = m.merge_wave_worktrees( + ["ST-001", "ST-002"], verify_cmds=['bash -lc "exit 5"'] + ) + assert result["kind"] == "VERIFY_FAILED" + assert result["phase"] == "preflight" + # Aborted at preflight: working branch never touched. + assert _git(["rev-parse", "HEAD"], repo).stdout.strip() == base + assert not (repo / "b.txt").exists() + status = m.worktree_isolation_status() + assert len(status["active_worktrees"]) == 2 + + def test_no_change_subtask_counted_not_merged(self, repo: Path) -> None: + # ST-002 has real changes; ST-001's worktree is left empty (actor + # edited the main tree instead). + m.create_subtask_worktree("ST-001") + _wt_with_files("ST-002", {"c.txt": "y\n"}) + + result = m.merge_wave_worktrees( + ["ST-001", "ST-002"], verify_cmds=[], post_wave_cmds=[] + ) + assert result["status"] == "success" + assert result["merged"] == ["ST-002"] + assert result["no_changes"] == ["ST-001"] + # Only ONE squash commit (ST-002); the no-op subtask added none. + assert _git(["rev-list", "--count", "HEAD"], repo).stdout.strip() == "2" + + def test_overlap_reported_when_actual_files_auto_merge(self, repo: Path) -> None: + # Both subtasks touch a.txt but in DIFFERENT hunks (one appends, one + # prepends) -> git auto-merges, no textual conflict. The declared-disjoint + # scheduler hint was wrong, so the overlap telemetry must surface a.txt + # for attribution even though the merge succeeds. + _wt_with_files("ST-001", {"a.txt": "hello\nappended-by-001\n"}) + _wt_with_files("ST-002", {"a.txt": "prepended-by-002\nhello\n"}) + + result = m.merge_wave_worktrees( + ["ST-001", "ST-002"], verify_cmds=[], post_wave_cmds=[] + ) + assert result["status"] == "success", result + overlap_files = {f for o in result["overlaps"] for f in o["files"]} + assert "a.txt" in overlap_files + merged_a = (repo / "a.txt").read_text() + assert "appended-by-001" in merged_a + assert "prepended-by-002" in merged_a + + def test_cli_wave_merge_happy_path(self, repo: Path) -> None: + _wt_with_files("ST-001", {"b.txt": "x\n"}) + _wt_with_files("ST-002", {"c.txt": "y\n"}) + runner = SCRIPTS_PATH / "map_step_runner.py" + proc = subprocess.run( + [ + sys.executable, + str(runner), + "merge_wave_worktrees", + "ST-001", + "ST-002", + "--skip-verify", + "--skip-post-wave", + ], + cwd=repo, + capture_output=True, + text=True, + ) + assert proc.returncode == 0, proc.stderr + out = json.loads(proc.stdout) + assert out["status"] == "success" + assert out["merged"] == ["ST-001", "ST-002"] + assert (repo / "b.txt").exists() and (repo / "c.txt").exists() + + def test_cli_wave_merge_unknown_subtask_exits_nonzero(self, repo: Path) -> None: + runner = SCRIPTS_PATH / "map_step_runner.py" + proc = subprocess.run( + [ + sys.executable, + str(runner), + "merge_wave_worktrees", + "ST-404", + "--skip-verify", + "--skip-post-wave", + ], + cwd=repo, + capture_output=True, + text=True, + ) + assert proc.returncode == 1 + out = json.loads(proc.stdout) + assert out["kind"] == "NO_WORKTREE"