async def _download_one_file(url: str, dst: Path, expected_size: int, expected_blake3: str) -> None:
    """Download ``url`` to ``dst``, resuming a partial ``.part`` file when possible.

    The downloaded bytes are validated against ``expected_size`` and
    ``expected_blake3`` (when provided) before being moved into place
    atomically.  An ``flock``-based lock file serializes writers across
    coroutines AND processes so concurrent callers cannot interleave appends
    into the same ``.part`` file (which previously produced oversized /
    corrupt blobs).

    Raises:
        ValueError: on size or blake3 mismatch of the downloaded data.
        aiohttp.ClientResponseError: on a non-success HTTP status.
    """
    import asyncio
    import fcntl
    import logging

    log = logging.getLogger("gen_worker.download")

    def _is_valid(path: Path) -> bool:
        # True when `path` exists and matches the expected size/hash.
        # Empty expected values skip the corresponding check.
        try:
            if expected_size and path.stat().st_size != expected_size:
                return False
            if expected_blake3 and _blake3_file(path).lower() != expected_blake3.lower():
                return False
            return True
        except OSError:
            return False

    log.info(
        "download_start path=%s expected_size=%s expected_blake3=%s",
        dst.name, expected_size, (expected_blake3 or "")[:16],
    )

    # Fast path: destination already complete and valid — nothing to do.
    if dst.exists() and _is_valid(dst):
        return

    timeout = aiohttp.ClientTimeout(
        total=None,
        sock_connect=float(os.getenv("WORKER_MODEL_DOWNLOAD_SOCK_CONNECT_TIMEOUT_S", "60")),
        sock_read=float(os.getenv("WORKER_MODEL_DOWNLOAD_SOCK_READ_TIMEOUT_S", "180")),
    )
    tmp = dst.with_suffix(dst.suffix + ".part")
    lock_path = dst.with_suffix(dst.suffix + ".lock")

    # File-level exclusive lock: prevents concurrent writes to the same .part
    # file even from different async tasks, downloader instances or processes.
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    lock_fd = open(lock_path, "w")
    try:
        # Acquire in a worker thread: flock(LOCK_EX) blocks until granted and
        # would otherwise stall the whole event loop on a contended download.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, fcntl.flock, lock_fd.fileno(), fcntl.LOCK_EX)

        # Re-check after acquiring the lock — another holder may have
        # completed the download while we waited.
        if dst.exists() and _is_valid(dst):
            log.info("download_skipped path=%s (another writer finished)", dst.name)
            return

        # If we have a partial file, try to resume via HTTP Range.
        offset = 0
        if tmp.exists():
            try:
                offset = tmp.stat().st_size
            except OSError:
                # tmp may have been renamed to dst between exists() and stat().
                offset = 0
        if offset:
            log.info("resume_attempt path=%s offset=%s expected_size=%s", dst.name, offset, expected_size)
        if expected_size and offset > expected_size:
            tmp.unlink(missing_ok=True)
            offset = 0

        # Partial file already complete: validate + finalize without refetching.
        if offset and expected_size and offset == expected_size:
            if expected_blake3 and _blake3_file(tmp).lower() != expected_blake3.lower():
                # Corrupt partial: discard AND reset offset, otherwise we would
                # send `Range: bytes=<expected_size>-` for a file we just deleted.
                tmp.unlink(missing_ok=True)
                offset = 0
            else:
                tmp.rename(dst)
                return

        headers: Dict[str, str] = {}
        mode = "wb"
        if offset and expected_size:
            headers["Range"] = f"bytes={offset}-"
            mode = "ab"

        async def _stream_to_file(resp: aiohttp.ClientResponse, *, mode: str, start: int) -> None:
            # Stream the body to tmp in 1 MiB chunks, bounding total size.
            size = start
            with open(tmp, mode) as f:
                async for chunk in resp.content.iter_chunked(1 << 20):
                    if not chunk:
                        continue
                    f.write(chunk)
                    size += len(chunk)
                    if expected_size and size > expected_size:
                        raise ValueError("download exceeded expected size")

        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, headers=headers) as resp:
                content_range = str(resp.headers.get("Content-Range") or "").strip()
                # If the server ignored our Range request (200), or some gateway
                # returned 206 with an unexpected range start, restart from byte 0
                # to avoid appending a full response onto the partial file.
                if offset and (
                    resp.status == 200
                    or (resp.status == 206 and not content_range.startswith(f"bytes {offset}-"))
                ):
                    log.info(
                        "range_ignored path=%s status=%s content_range=%r restarting",
                        dst.name, resp.status, content_range,
                    )
                    resp.release()
                    async with session.get(url) as resp2:
                        resp2.raise_for_status()
                        await _stream_to_file(resp2, mode="wb", start=0)
                else:
                    resp.raise_for_status()
                    await _stream_to_file(resp, mode=mode, start=offset)

        # Validate final file before publishing it.
        actual_size = tmp.stat().st_size
        log.info(
            "download_complete path=%s actual_size=%s expected_size=%s",
            dst.name, actual_size, expected_size,
        )
        if expected_size and actual_size != expected_size:
            log.error(
                "size_mismatch path=%s expected=%s got=%s url=%s",
                dst.name, expected_size, actual_size, url[:80],
            )
            tmp.unlink(missing_ok=True)
            raise ValueError(f"size mismatch (expected {expected_size}, got {actual_size})")
        if expected_blake3:
            got = _blake3_file(tmp)
            if got.lower() != expected_blake3.lower():
                log.error("blake3_mismatch path=%s", dst.name)
                tmp.unlink(missing_ok=True)
                raise ValueError("blake3 mismatch")
        # Path.replace is an atomic rename that overwrites an existing dst,
        # so a concurrent winner of the race cannot make this fail.
        tmp.replace(dst)
    finally:
        fcntl.flock(lock_fd.fileno(), fcntl.LOCK_UN)
        lock_fd.close()
        # NOTE(review): the lock file is intentionally left in place.  Unlinking
        # it while another process is still blocked in flock() on the old inode
        # would let a third process create a fresh lock file and acquire it
        # concurrently, defeating the lock entirely.
def load_cozy_pipeline_spec(model_root: Path) -> Optional[CozyPipelineSpec]:
    """Load the cozy pipeline spec for a model directory, if one exists.

    This is a worker-side helper used during pipeline loading to implement:
      1. prefer ``cozy.pipeline.lock.yaml`` when present
      2. fall back to ``cozy.pipeline.yaml``
      3. fall back to ``pipeline.lock`` / ``pipeline.toml`` (TOML format,
         stored by tensorhub ingest) when no YAML spec is found

    Returns:
        The parsed spec, or ``None`` when no spec file is present.

    Raises:
        ValueError: when a spec file exists but is not a mapping, or declares
            an unsupported apiVersion/kind.
    """
    root = Path(model_root)

    def _validate(raw: Any, invalid_label: str, unsupported_label: str) -> None:
        # Shared apiVersion/kind validation for both the YAML and TOML paths.
        # Labels differ so error messages match each format's wording.
        if not isinstance(raw, dict):
            raise ValueError(f"invalid {invalid_label} (expected mapping)")
        api = str(raw.get("apiVersion") or "").strip()
        kind = str(raw.get("kind") or "").strip()
        if api and api != "v1":
            raise ValueError(f"unsupported {unsupported_label} apiVersion: {api!r}")
        if kind and kind != "DiffusersPipeline":
            raise ValueError(f"unsupported {unsupported_label} kind: {kind!r}")

    lock_path = root / COZY_PIPELINE_LOCK_FILENAME
    spec_path = lock_path if lock_path.exists() else (root / COZY_PIPELINE_FILENAME)
    if spec_path.exists():
        raw = yaml.safe_load(spec_path.read_text(encoding="utf-8"))
        _validate(raw, "cozy pipeline spec", "cozy pipeline")
        logger.info("loaded cozy pipeline spec from %s", spec_path.name)
        return CozyPipelineSpec(source_path=spec_path, raw=raw)

    # Fallback: pipeline.lock / pipeline.toml (TOML, stored by tensorhub ingest).
    toml_lock = root / PIPELINE_LOCK_TOML_FILENAME
    toml_spec = toml_lock if toml_lock.exists() else (root / PIPELINE_TOML_FILENAME)
    if toml_spec.exists():
        raw = tomllib.loads(toml_spec.read_text(encoding="utf-8"))
        _validate(raw, "pipeline toml", "pipeline toml")
        logger.info("loaded cozy pipeline spec from %s (toml fallback)", toml_spec.name)
        return CozyPipelineSpec(source_path=toml_spec, raw=raw)

    return None
+_GLOBAL_BLOB_LOCKS_LOCK = threading.Lock() +_GLOBAL_BLOB_LOCKS: Dict[str, threading.Lock] = {} + + +class _SnapshotEntry: + """Coordinates concurrent snapshot builds: one builder, zero-or-more waiters.""" + + def __init__(self) -> None: + self.event = threading.Event() # set when snap_dir is ready (or build failed) + self.exception: Optional[BaseException] = None + + +_GLOBAL_SNAPSHOT_LOCKS_LOCK = threading.Lock() +_GLOBAL_SNAPSHOT_LOCKS: Dict[str, _SnapshotEntry] = {} + + +def _get_blob_lock(digest: str) -> threading.Lock: + """Return (or create) a per-digest threading.Lock from the module-global map.""" + with _GLOBAL_BLOB_LOCKS_LOCK: + lock = _GLOBAL_BLOB_LOCKS.get(digest) + if lock is None: + lock = threading.Lock() + _GLOBAL_BLOB_LOCKS[digest] = lock + return lock + def _blob_path(blobs_root: Path, digest: str) -> Path: digest = (digest or "").strip().lower() @@ -134,9 +168,6 @@ class CozySnapshotV2Downloader: def __init__(self, client: Optional[CozyHubV2Client]) -> None: self._client = client - self._locks_lock = threading.Lock() - self._blob_locks: Dict[str, asyncio.Lock] = {} - self._snapshot_locks: Dict[str, asyncio.Lock] = {} async def ensure_snapshot( self, @@ -159,8 +190,32 @@ async def ensure_snapshot( if snap_dir.exists(): return snap_dir - lock = self._get_lock(self._snapshot_locks, res.snapshot_digest) - async with lock: + # Claim building responsibility or wait for another caller already building + # this snapshot. threading primitives (not asyncio.Lock) are used so that + # this works correctly across different event loops (startup prefetch thread + # vs. LoadModelCommand on the main loop). 
+ loop = asyncio.get_running_loop() + with _GLOBAL_SNAPSHOT_LOCKS_LOCK: + if snap_dir.exists(): # double-check under the guard + return snap_dir + _snap_entry = _GLOBAL_SNAPSHOT_LOCKS.get(res.snapshot_digest) + if _snap_entry is None: + _snap_entry = _SnapshotEntry() + _GLOBAL_SNAPSHOT_LOCKS[res.snapshot_digest] = _snap_entry + _is_builder = True + else: + _is_builder = False + + if not _is_builder: + # Another caller is already building this snapshot; wait for it. + await loop.run_in_executor(None, _snap_entry.event.wait) + if _snap_entry.exception is not None: + raise RuntimeError( + f"concurrent snapshot build failed for {res.snapshot_digest}" + ) from _snap_entry.exception + return snap_dir + + try: if snap_dir.exists(): return snap_dir @@ -176,7 +231,12 @@ async def ensure_snapshot( parts_manifest_entries = [f for f in res.files if _is_parts_manifest(f.path)] part_file_paths = {f.path for f in res.files if _is_part_file(f.path)} + import logging as _logging + _log = _logging.getLogger("gen_worker.download") + for pm_entry in parts_manifest_entries: + _log.info("reassemble_start manifest=%s", pm_entry.path) + print(f"DEBUG reassemble_start manifest={pm_entry.path}") parts_json_blob = _blob_path(blobs_root, pm_entry.blake3) try: parts_manifest = json.loads(parts_json_blob.read_bytes()) @@ -197,13 +257,17 @@ async def ensure_snapshot( dst.unlink() with open(dst, "wb") as out_f: - for part in parts: + for i, part in enumerate(parts): part_digest = _strip_blake3_prefix( str(part.get("digest") or "").strip().lower() ) if not part_digest: raise ValueError(f"part entry in {pm_entry.path} missing digest") part_blob = _blob_path(blobs_root, part_digest) + _log.info(" concat_part index=%d digest=%s exists=%s size=%s", + i, part_digest[:16], part_blob.exists(), + part_blob.stat().st_size if part_blob.exists() else -1) + print(f"DEBUG concat_part index={i} digest={part_digest[:16]} exists={part_blob.exists()} size={part_blob.stat().st_size if part_blob.exists() else -1}") 
with open(part_blob, "rb") as in_f: shutil.copyfileobj(in_f, out_f) @@ -218,8 +282,25 @@ async def ensure_snapshot( src = _blob_path(blobs_root, f.blake3) _try_hardlink_or_copy(src, dst) - tmp.rename(snap_dir) + # Another concurrent caller (e.g. a different downloader instance) may have + # already materialized and renamed the snapshot while we were assembling ours. + # In that case, discard our tmp dir and return the existing snapshot. + if snap_dir.exists(): + shutil.rmtree(tmp, ignore_errors=True) + return snap_dir + try: + tmp.rename(snap_dir) + except OSError: + # Lost the race — snap_dir was created between the exists() check and rename(). + shutil.rmtree(tmp, ignore_errors=True) + if not snap_dir.exists(): + raise return snap_dir + except BaseException as _exc: + _snap_entry.exception = _exc + raise + finally: + _snap_entry.event.set() async def _resolve(self, ref: CozyRef) -> CozyHubResolveArtifactResult: if self._client is None: @@ -238,7 +319,16 @@ async def _resolve(self, ref: CozyRef) -> CozyHubResolveArtifactResult: ) async def _ensure_blobs(self, blobs_root: Path, files: List[CozyHubSnapshotFile]) -> None: - pending: List[tuple[CozyHubSnapshotFile, str, Path]] = [] + import logging + log = logging.getLogger("gen_worker.download") + + log.info("ensure_blobs total_files=%d", len(files)) + print(f"DEBUG ensure_blobs total_files={len(files)}") + for f in files: + log.info(" entry path=%s size=%s digest=%s url_present=%s", f.path, f.size_bytes, (f.blake3 or "")[:16], bool(f.url)) + print(f"DEBUG entry path={f.path} size={f.size_bytes} digest={(f.blake3 or '')[:16]} url_present={bool(f.url)}") + + all_blobs: List[tuple[CozyHubSnapshotFile, str, Path]] = [] for f in files: digest = (f.blake3 or "").strip().lower() if not digest: @@ -247,12 +337,7 @@ async def _ensure_blobs(self, blobs_root: Path, files: List[CozyHubSnapshotFile] raise ValueError(f"missing url for {f.path}") dst = _blob_path(blobs_root, digest) dst.parent.mkdir(parents=True, exist_ok=True) - 
if dst.exists(): - continue - pending.append((f, digest, dst)) - - if not pending: - return + all_blobs.append((f, digest, dst)) # Parallelize shard/blob downloads to reduce first-load latency for # multi-file transformer checkpoints. @@ -260,9 +345,18 @@ async def _ensure_blobs(self, blobs_root: Path, files: List[CozyHubSnapshotFile] sem = asyncio.Semaphore(max_conc) async def _ensure_one(f: CozyHubSnapshotFile, digest: str, dst: Path) -> None: - lock = self._get_lock(self._blob_locks, digest) - async with sem: - async with lock: + # Acquire the GLOBAL per-digest threading.Lock (loop-independent) so that + # concurrent callers across different downloader instances (startup prefetch, + # task request, LoadModelCommand) don't write to the same .part file in + # parallel. run_in_executor is used so that waiting for the lock does not + # block the event loop. + _lock = _get_blob_lock(digest) + _lloop = asyncio.get_running_loop() + await _lloop.run_in_executor(None, _lock.acquire) + try: + if dst.exists(): + return + async with sem: if dst.exists(): return assert f.url is not None @@ -272,18 +366,14 @@ async def _ensure_one(f: CozyHubSnapshotFile, digest: str, dst: Path) -> None: expected_size=int(f.size_bytes or 0), expected_blake3=digest, ) + finally: + _lock.release() # Start larger blobs first for better overlap. 
- pending.sort(key=lambda row: int(row[0].size_bytes or 0), reverse=True) - await asyncio.gather(*(_ensure_one(f, digest, dst) for f, digest, dst in pending)) - - def _get_lock(self, mp: Dict[str, asyncio.Lock], key: str) -> asyncio.Lock: - with self._locks_lock: - lock = mp.get(key) - if lock is None: - lock = asyncio.Lock() - mp[key] = lock - return lock + all_blobs.sort(key=lambda row: int(row[0].size_bytes or 0), reverse=True) + await asyncio.gather(*(_ensure_one(f, digest, dst) for f, digest, dst in all_blobs)) + + async def ensure_snapshot_async( diff --git a/src/gen_worker/diffusers_model_manager.py b/src/gen_worker/diffusers_model_manager.py index 3fe4d9a..b4d720d 100644 --- a/src/gen_worker/diffusers_model_manager.py +++ b/src/gen_worker/diffusers_model_manager.py @@ -55,6 +55,7 @@ async def load_model_into_vram(self, model_id: str) -> bool: local_path = self._downloader.download(model_id, cache_dir) except Exception as e: logger.warning("DiffusersModelManager: download failed for %s: %s", model_id, e) + return False loaded = await self._loader.load(model_id, model_path=local_path) logger.info( diff --git a/src/gen_worker/pipeline_loader.py b/src/gen_worker/pipeline_loader.py index 20ae643..72be53f 100644 --- a/src/gen_worker/pipeline_loader.py +++ b/src/gen_worker/pipeline_loader.py @@ -552,7 +552,9 @@ def detect_diffusers_variant(model_path: Path) -> Optional[str]: name = p.name.lower() for v in candidates: if f".{v}." in name and name.endswith((".safetensors", ".json")): + print(f"DEBUG detect_diffusers_variant matched variant={v} file={p.name}") return v + print(f"DEBUG detect_diffusers_variant no variant found in {model_path}") return None @@ -1456,6 +1458,17 @@ async def load( raise ModelNotFoundError(model_id, path) config = config or PipelineConfig(model_path=str(path)) + if config.variant is None: + # Prefer variant from cozy/pipeline spec, fall back to file-name scan. 
+ try: + from .cozy_pipeline_spec import load_cozy_pipeline_spec + + spec = load_cozy_pipeline_spec(path) + if spec is not None and spec.variant: + config.variant = spec.variant + print(f"DEBUG pipeline_loader variant_from_spec={config.variant}") + except Exception: + pass if config.variant is None: config.variant = detect_diffusers_variant(path) diff --git a/src/gen_worker/worker.py b/src/gen_worker/worker.py index 7423fd4..936ddf9 100644 --- a/src/gen_worker/worker.py +++ b/src/gen_worker/worker.py @@ -3809,12 +3809,17 @@ def _execute_task( models_in_use: set[str] = set() inference_watchdog: Optional[threading.Timer] = None + print(f"DEBUG [execute_task] entered request_id={request_id} function={spec.name} payload_bytes={len(input_payload or b'')} timeout_ms={ctx.timeout_ms or 'none'}", flush=True) try: - if ctx.is_canceled(): + _is_canceled = ctx.is_canceled() + print(f"DEBUG [execute_task] cancellation_check request_id={request_id} is_canceled={_is_canceled} deadline={getattr(ctx, '_deadline', None)}", flush=True) + if _is_canceled: raise CanceledError("canceled") # Decode payload strictly. + print(f"DEBUG [execute_task] decoding_payload request_id={request_id} payload_type={spec.payload_type}", flush=True) input_obj = msgspec.msgpack.decode(input_payload, type=spec.payload_type) + print(f"DEBUG [execute_task] payload_decoded request_id={request_id} input={input_obj!r}", flush=True) # Optional post-decode constraints (e.g. clamping) declared on the payload type. try: from .payload_constraints import apply_payload_constraints @@ -3822,7 +3827,9 @@ def _execute_task( _ = apply_payload_constraints(input_obj) except Exception: pass + print(f"DEBUG [execute_task] materializing_assets request_id={request_id}", flush=True) self._materialize_assets(ctx, input_obj) + print(f"DEBUG [execute_task] assets_materialized request_id={request_id}", flush=True) # Best-effort extract diffusion-ish numeric fields for metrics.run. 
try: def _get_num(name: str) -> Optional[float]: @@ -3856,8 +3863,10 @@ def _get_num(name: str) -> Optional[float]: call_kwargs[spec.ctx_param] = ctx call_kwargs[spec.payload_param] = input_obj + print(f"DEBUG [execute_task] injection_loop request_id={request_id} injections={len(spec.injections)}", flush=True) for inj in spec.injections: resolve_t0 = time.monotonic() + print(f"DEBUG [execute_task] resolving_model request_id={request_id} param={inj.param_name}", flush=True) resolve_watchdog = self._start_task_phase_watchdog( request_id=request_id, phase="model_resolve", @@ -3940,7 +3949,9 @@ def _get_num(name: str) -> Optional[float]: }, ) try: + print(f"DEBUG [execute_task] loading_model request_id={request_id} param={inj.param_name} model_id={canon_model_id}", flush=True) call_kwargs[inj.param_name] = self._resolve_injected_value(ctx, inj.param_type, model_id, inj) + print(f"DEBUG [execute_task] model_loaded request_id={request_id} param={inj.param_name} model_id={canon_model_id} pipeline_type={type(call_kwargs[inj.param_name]).__name__}", flush=True) self._emit_task_event( request_id, "task.model_load.completed", @@ -3970,10 +3981,18 @@ def _get_num(name: str) -> Optional[float]: # Invoke. 
t_infer0 = time.monotonic() + _infer_warn_s = float(getattr(self, "_warn_inference_s", 60.0)) + logger.info( + "inference.start request_id=%s function=%s timeout_ms=%s warn_after_s=%.1f", + request_id, + spec.name, + ctx.timeout_ms if ctx.timeout_ms else "none", + _infer_warn_s, + ) inference_watchdog = self._start_task_phase_watchdog( request_id=request_id, phase="inference", - warn_after_s=float(getattr(self, "_warn_inference_s", 60.0)), + warn_after_s=_infer_warn_s, payload={"function_name": spec.name, "output_mode": spec.output_mode}, ) self._emit_task_event( @@ -3981,12 +4000,14 @@ def _get_num(name: str) -> Optional[float]: "task.inference.started", {"function_name": spec.name, "output_mode": spec.output_mode}, ) + print(f"DEBUG [execute_task] calling_func request_id={request_id} function={spec.name} is_canceled={ctx.is_canceled()} kwargs_keys={list(call_kwargs.keys())}", flush=True) if inspect.iscoroutinefunction(spec.func): result = asyncio.run(spec.func(**call_kwargs)) elif inspect.isasyncgenfunction(spec.func): result = spec.func(**call_kwargs) else: result = spec.func(**call_kwargs) + print(f"DEBUG [execute_task] func_returned request_id={request_id} function={spec.name} result_type={type(result).__name__}", flush=True) if ctx.is_canceled(): raise CanceledError("canceled") @@ -4529,17 +4550,27 @@ def _resolve_injected_value(self, ctx: ActionContext, requested_type: Any, model kwargs["custom_pipeline"] = custom_pipeline except Exception: pass - except Exception: - pass - - try: - from gen_worker.pipeline_loader import detect_diffusers_variant + # Read variant from cozy/pipeline spec (authoritative). + if spec.variant: + kwargs["variant"] = spec.variant + print(f"DEBUG cozy_spec_variant={spec.variant} source={spec.source_path.name}") + except Exception as _spec_exc: + print(f"DEBUG cozy_spec_load_error={_spec_exc}") + + # Fallback: scan files on disk for diffusers variant naming. 
+ if "variant" not in kwargs: + try: + from gen_worker.pipeline_loader import detect_diffusers_variant - variant = detect_diffusers_variant(local_path) - if variant is not None: - kwargs["variant"] = variant - except Exception: - pass + variant = detect_diffusers_variant(local_path) + if variant is not None: + kwargs["variant"] = variant + print(f"DEBUG detect_diffusers_variant={variant} path={local_path}") + else: + print(f"DEBUG detect_diffusers_variant=None path={local_path}") + except Exception as _detect_exc: + print(f"DEBUG detect_variant_error={_detect_exc}") + print(f"DEBUG from_pretrained variant={kwargs.get('variant')} path={local_path}") # Quantized weight-only inference requires explicit loader hints. #