diff --git a/src/gen_worker/cozy_cas.py b/src/gen_worker/cozy_cas.py index a03717c..6866c89 100644 --- a/src/gen_worker/cozy_cas.py +++ b/src/gen_worker/cozy_cas.py @@ -300,10 +300,11 @@ def _safe_symlink_dir(target: Path, link: Path) -> None: @backoff.on_exception( backoff.expo, - (aiohttp.ClientError, asyncio.TimeoutError, ValueError, OSError, ConnectionError), - max_tries=max(1, int(os.getenv("WORKER_MODEL_DOWNLOAD_MAX_RETRIES", "12") or "12")), - max_time=max(30.0, float(os.getenv("WORKER_MODEL_DOWNLOAD_RETRY_MAX_TIME_S", "900") or "900")), - max_value=max(1.0, float(os.getenv("WORKER_MODEL_DOWNLOAD_BACKOFF_MAX_S", "8") or "8")), + (aiohttp.ClientError, asyncio.TimeoutError, ValueError, OSError), + max_tries=30, + max_time=3600, + factor=1, + max_value=30, # cap backoff at 30s between retries ) async def _download_one_file(url: str, dst: Path, expected_size: int, expected_blake3: str) -> None: if dst.exists(): @@ -319,12 +320,11 @@ async def _download_one_file(url: str, dst: Path, expected_size: int, expected_b # Fall through to re-download. pass - timeout = aiohttp.ClientTimeout( - total=None, - connect=float(os.getenv("WORKER_MODEL_DOWNLOAD_CONNECT_TIMEOUT_S", "60") or "60"), - sock_connect=float(os.getenv("WORKER_MODEL_DOWNLOAD_SOCK_CONNECT_TIMEOUT_S", "60") or "60"), - sock_read=float(os.getenv("WORKER_MODEL_DOWNLOAD_SOCK_READ_TIMEOUT_S", "120") or "120"), - ) + # Use sock_read instead of total timeout so actively-streaming large files + # are not killed. total=None lets multi-GB downloads run as long as data + # keeps flowing; sock_read=120 catches genuine stalls. + timeout = aiohttp.ClientTimeout(total=None, sock_connect=float(os.getenv("WORKER_MODEL_DOWNLOAD_SOCK_CONNECT_TIMEOUT_S", "60")), + sock_read=float(os.getenv("WORKER_MODEL_DOWNLOAD_SOCK_READ_TIMEOUT_S", "120"))) tmp = dst.with_suffix(dst.suffix + ".part") # If we have a partial file, try to resume via HTTP Range. offset = 0 diff --git a/src/gen_worker/cozy_snapshot_v2_downloader.py b/src/gen_worker/cozy_snapshot_v2_downloader.py index 27318df..cd74459 100644 --- a/src/gen_worker/cozy_snapshot_v2_downloader.py +++ b/src/gen_worker/cozy_snapshot_v2_downloader.py @@ -198,6 +198,24 @@ def _get_lock(self, mp: Dict[str, asyncio.Lock], key: str) -> asyncio.Lock: return lock +async def ensure_snapshot_async( + *, + base_dir: Path, + ref: CozyRef, + base_url: str, + token: Optional[str], + resolved: Optional[Any] = None, +) -> Path: + """Async version of ensure_snapshot_sync for use in async contexts.""" + client: Optional[CozyHubV2Client] = None + if resolved is None: + if not (base_url or "").strip(): + raise RuntimeError("cozy downloads require TENSORHUB_URL") + client = CozyHubV2Client(base_url=base_url, token=token) + dl = CozySnapshotV2Downloader(client) + return await dl.ensure_snapshot(base_dir, ref, resolved=resolved) + + def ensure_snapshot_sync( *, base_dir: Path, diff --git a/src/gen_worker/diffusers_model_manager.py b/src/gen_worker/diffusers_model_manager.py index 29f1b1e..3fe4d9a 100644 --- a/src/gen_worker/diffusers_model_manager.py +++ b/src/gen_worker/diffusers_model_manager.py @@ -41,10 +41,18 @@ async def load_model_into_vram(self, model_id: str) -> bool: local_path: Optional[str] = None if self._downloader is not None: from .cache_paths import worker_model_cache_dir + from .model_refs import parse_model_ref + from pathlib import Path cache_dir = str(worker_model_cache_dir()) try: - local_path = self._downloader.download(model_id, cache_dir) + # Use async download path directly to avoid nested event loop issues. + if hasattr(self._downloader, '_download_async'): + parsed = parse_model_ref(model_id) + result = await self._downloader._download_async(parsed, Path(cache_dir)) + local_path = result.as_posix() + else: + local_path = self._downloader.download(model_id, cache_dir) except Exception as e: logger.warning("DiffusersModelManager: download failed for %s: %s", model_id, e) diff --git a/src/gen_worker/model_ref_downloader.py b/src/gen_worker/model_ref_downloader.py index d055fd5..4bbe8c3 100644 --- a/src/gen_worker/model_ref_downloader.py +++ b/src/gen_worker/model_ref_downloader.py @@ -9,7 +9,7 @@ import time from .cozy_cas import CozyHubClient, CozySnapshotDownloader -from .cozy_snapshot_v2_downloader import ensure_snapshot_sync +from .cozy_snapshot_v2_downloader import ensure_snapshot_async, ensure_snapshot_sync from .downloader import ModelDownloader from .tensorhub_v2 import ( CozyHubError, @@ -19,6 +19,7 @@ ) from .hf_downloader import HuggingFaceHubDownloader from .model_refs import CozyRef, ParsedModelRef, parse_model_ref +import threading # Per-task resolved manifests provided by gen-orchestrator (issue #92). # Shape: {canonical_model_id: ResolvedCozyModel-like object} @@ -134,7 +135,7 @@ async def _download_async(self, parsed: ParsedModelRef, dest_dir: Path) -> Path: canonical = parsed.hf.canonical() prefs = _get_prefs_for_ref(canonical) resolved_artifact = await self._request_public_model_with_wait(canonical, prefs=prefs) - return ensure_snapshot_sync( + return await ensure_snapshot_async( base_dir=dest_dir, ref=CozyRef(owner="public", repo="public", tag="latest"), base_url=self._cozy_base_url or "", @@ -151,7 +152,7 @@ async def _download_async(self, parsed: ParsedModelRef, dest_dir: Path) -> Path: resolved_entry = _lookup_resolved_cozy_entry(resolved_mapping, canonical) if resolved_entry is not None: - return ensure_snapshot_sync( + return await ensure_snapshot_async( base_dir=dest_dir, ref=parsed.cozy, base_url=self._cozy_base_url or "", @@ -164,7 +165,7 @@ async def _download_async(self, parsed: ParsedModelRef, dest_dir: Path) -> Path: if self._cozy_v2 is not None and parsed.cozy.digest is None: prefs = _get_prefs_for_ref(canonical) resolved = await self._request_public_model_with_wait(canonical, prefs=prefs) - return ensure_snapshot_sync( + return await ensure_snapshot_async( base_dir=dest_dir, ref=parsed.cozy, base_url=self._cozy_base_url or "", @@ -181,7 +182,7 @@ async def _download_async(self, parsed: ParsedModelRef, dest_dir: Path) -> Path: # Prefer Cozy Hub v2 resolve flow. try: - return ensure_snapshot_sync( + return await ensure_snapshot_async( base_dir=dest_dir, ref=parsed.cozy, base_url=self._cozy_base_url, @@ -291,14 +292,14 @@ def _run_in_thread(coro: Coroutine[Any, Any, Path]) -> str: out: dict[str, str] = {} err: dict[str, BaseException] = {} + ctx = contextvars.copy_context() + def runner() -> None: try: - out["v"] = asyncio.run(coro).as_posix() + out["v"] = ctx.run(asyncio.run, coro).as_posix() except BaseException as e: err["e"] = e - import threading - t = threading.Thread(target=runner, daemon=True) t.start() t.join() diff --git a/src/gen_worker/pb/frontend_pb2.py b/src/gen_worker/pb/frontend_pb2.py index 1f4d0af..e3b1e83 100644 --- a/src/gen_worker/pb/frontend_pb2.py +++ b/src/gen_worker/pb/frontend_pb2.py @@ -24,7 +24,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0e\x66rontend.proto\x12\x0b\x66rontend.v1\"C\n\rActionOptions\x12\n\n\x02id\x18\x01 \x01(\t\x12\x12\n\ntimeout_ms\x18\x02 \x01(\x03J\x04\x08\x03\x10\x04R\x0cretry_policy\"\x84\x03\n\x12VariantPreferences\x12;\n\x05quant\x18\x01 \x01(\x0e\x32,.frontend.v1.VariantPreferences.Quantization\x12<\n\tpackaging\x18\x02 \x01(\x0e\x32).frontend.v1.VariantPreferences.Packaging\x12\x36\n\x06layout\x18\x03 \x01(\x0e\x32&.frontend.v1.VariantPreferences.Layout\"B\n\x0cQuantization\x12\x15\n\x11QUANT_UNSPECIFIED\x10\x00\x12\x08\n\x04\x46P16\x10\x01\x12\x08\n\x04\x42\x46\x31\x36\x10\x02\x12\x07\n\x03\x46P8\x10\x03\"F\n\tPackaging\x12\x19\n\x15PACKAGING_UNSPECIFIED\x10\x00\x12\x0f\n\x0bSAFETENSORS\x10\x01\x12\r\n\tFLASHPACK\x10\x02\"/\n\x06Layout\x12\x16\n\x12LAYOUT_UNSPECIFIED\x10\x00\x12\r\n\tDIFFUSERS\x10\x01\"\x95\x02\n\x14\x45xecuteActionRequest\x12\x15\n\rfunction_name\x18\x01 \x01(\t\x12\x15\n\rinput_payload\x18\x02 \x01(\x0c\x12+\n\x07options\x18\x03 \x01(\x0b\x32\x1a.frontend.v1.ActionOptions\x12\x12\n\nrelease_id\x18\x04 \x01(\t\x12\x12\n\ninvoker_id\x18\x06 \x01(\t\x12\x1a\n\x12required_repo_refs\x18\x07 \x03(\t\x12\r\n\x05owner\x18\x08 \x01(\t\x12\x36\n\rvariant_prefs\x18\t \x01(\x0b\x32\x1f.frontend.v1.VariantPreferencesJ\x04\x08\x05\x10\x06R\x11required_model_id\"+\n\x15\x45xecuteActionResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\"*\n\x14\x43\x61ncelRequestRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\"#\n\x15\x43\x61ncelRequestResponse\x12\n\n\x02ok\x18\x01 \x01(\x08\"\x84\x01\n\x0cRealtimeOpen\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x12\n\nrelease_id\x18\x02 \x01(\t\x12\x15\n\rfunction_name\x18\x03 \x01(\t\x12\r\n\x05owner\x18\x04 \x01(\t\x12\x12\n\ninvoker_id\x18\x05 \x01(\t\x12\x12\n\ntimeout_ms\x18\x06 \x01(\x03\"B\n\rRealtimeFrame\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\x0f\n\x07is_text\x18\x03 \x01(\x08\"3\n\rRealtimeClose\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0e\n\x06reason\x18\x02 \x01(\t\"\xa2\x01\n\x14RealtimeProxyMessage\x12)\n\x04open\x18\x01 \x01(\x0b\x32\x19.frontend.v1.RealtimeOpenH\x00\x12+\n\x05\x66rame\x18\x02 \x01(\x0b\x32\x1a.frontend.v1.RealtimeFrameH\x00\x12+\n\x05\x63lose\x18\x03 \x01(\x0b\x32\x1a.frontend.v1.RealtimeCloseH\x00\x42\x05\n\x03msg2\x9e\x02\n\x0f\x46rontendService\x12V\n\rExecuteAction\x12!.frontend.v1.ExecuteActionRequest\x1a\".frontend.v1.ExecuteActionResponse\x12V\n\rCancelRequest\x12!.frontend.v1.CancelRequestRequest\x1a\".frontend.v1.CancelRequestResponse\x12[\n\x0fRealtimeSession\x12!.frontend.v1.RealtimeProxyMessage\x1a!.frontend.v1.RealtimeProxyMessage(\x01\x30\x01\x42GZEgithub.com/cozy-creator/gen-orchestrator/pkg/pb/frontendv1;frontendv1b\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0e\x66rontend.proto\x12\x0b\x66rontend.v1\"C\n\rActionOptions\x12\n\n\x02id\x18\x01 \x01(\t\x12\x12\n\ntimeout_ms\x18\x02 \x01(\x03J\x04\x08\x03\x10\x04R\x0cretry_policy\"\x84\x03\n\x12VariantPreferences\x12;\n\x05quant\x18\x01 \x01(\x0e\x32,.frontend.v1.VariantPreferences.Quantization\x12<\n\tpackaging\x18\x02 \x01(\x0e\x32).frontend.v1.VariantPreferences.Packaging\x12\x36\n\x06layout\x18\x03 \x01(\x0e\x32&.frontend.v1.VariantPreferences.Layout\"B\n\x0cQuantization\x12\x15\n\x11QUANT_UNSPECIFIED\x10\x00\x12\x08\n\x04\x46P16\x10\x01\x12\x08\n\x04\x42\x46\x31\x36\x10\x02\x12\x07\n\x03\x46P8\x10\x03\"F\n\tPackaging\x12\x19\n\x15PACKAGING_UNSPECIFIED\x10\x00\x12\x0f\n\x0bSAFETENSORS\x10\x01\x12\r\n\tFLASHPACK\x10\x02\"/\n\x06Layout\x12\x16\n\x12LAYOUT_UNSPECIFIED\x10\x00\x12\r\n\tDIFFUSERS\x10\x01\"\x95\x02\n\x14\x45xecuteActionRequest\x12\x15\n\rfunction_name\x18\x01 \x01(\t\x12\x15\n\rinput_payload\x18\x02 \x01(\x0c\x12+\n\x07options\x18\x03 \x01(\x0b\x32\x1a.frontend.v1.ActionOptions\x12\x12\n\nrelease_id\x18\x04 \x01(\t\x12\x12\n\ninvoker_id\x18\x06 \x01(\t\x12\x1a\n\x12required_repo_refs\x18\x07 \x03(\t\x12\r\n\x05owner\x18\x08 \x01(\t\x12\x36\n\rvariant_prefs\x18\t \x01(\x0b\x32\x1f.frontend.v1.VariantPreferencesJ\x04\x08\x05\x10\x06R\x11required_model_id\"\'\n\x15\x45xecuteActionResponse\x12\x0e\n\x06run_id\x18\x01 \x01(\t\"\"\n\x10\x43\x61ncelRunRequest\x12\x0e\n\x06run_id\x18\x01 \x01(\t\"\x1f\n\x11\x43\x61ncelRunResponse\x12\n\n\x02ok\x18\x01 \x01(\x08\"\x84\x01\n\x0cRealtimeOpen\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x12\n\nrelease_id\x18\x02 \x01(\t\x12\x15\n\rfunction_name\x18\x03 \x01(\t\x12\r\n\x05owner\x18\x04 \x01(\t\x12\x12\n\ninvoker_id\x18\x05 \x01(\t\x12\x12\n\ntimeout_ms\x18\x06 \x01(\x03\"B\n\rRealtimeFrame\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\x0f\n\x07is_text\x18\x03 \x01(\x08\"3\n\rRealtimeClose\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0e\n\x06reason\x18\x02 \x01(\t\"\xa2\x01\n\x14RealtimeProxyMessage\x12)\n\x04open\x18\x01 \x01(\x0b\x32\x19.frontend.v1.RealtimeOpenH\x00\x12+\n\x05\x66rame\x18\x02 \x01(\x0b\x32\x1a.frontend.v1.RealtimeFrameH\x00\x12+\n\x05\x63lose\x18\x03 \x01(\x0b\x32\x1a.frontend.v1.RealtimeCloseH\x00\x42\x05\n\x03msg2\x92\x02\n\x0f\x46rontendService\x12V\n\rExecuteAction\x12!.frontend.v1.ExecuteActionRequest\x1a\".frontend.v1.ExecuteActionResponse\x12J\n\tCancelRun\x12\x1d.frontend.v1.CancelRunRequest\x1a\x1e.frontend.v1.CancelRunResponse\x12[\n\x0fRealtimeSession\x12!.frontend.v1.RealtimeProxyMessage\x1a!.frontend.v1.RealtimeProxyMessage(\x01\x30\x01\x42GZEgithub.com/cozy-creator/gen-orchestrator/pkg/pb/frontendv1;frontendv1b\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -45,19 +45,19 @@ _globals['_EXECUTEACTIONREQUEST']._serialized_start=492 _globals['_EXECUTEACTIONREQUEST']._serialized_end=769 _globals['_EXECUTEACTIONRESPONSE']._serialized_start=771 - _globals['_EXECUTEACTIONRESPONSE']._serialized_end=814 - _globals['_CANCELREQUESTREQUEST']._serialized_start=816 - _globals['_CANCELREQUESTREQUEST']._serialized_end=858 - _globals['_CANCELREQUESTRESPONSE']._serialized_start=860 - _globals['_CANCELREQUESTRESPONSE']._serialized_end=895 - _globals['_REALTIMEOPEN']._serialized_start=898 - _globals['_REALTIMEOPEN']._serialized_end=1030 - _globals['_REALTIMEFRAME']._serialized_start=1032 - _globals['_REALTIMEFRAME']._serialized_end=1098 - _globals['_REALTIMECLOSE']._serialized_start=1100 - _globals['_REALTIMECLOSE']._serialized_end=1151 - _globals['_REALTIMEPROXYMESSAGE']._serialized_start=1154 - _globals['_REALTIMEPROXYMESSAGE']._serialized_end=1316 - _globals['_FRONTENDSERVICE']._serialized_start=1319 - _globals['_FRONTENDSERVICE']._serialized_end=1605 + _globals['_EXECUTEACTIONRESPONSE']._serialized_end=810 + _globals['_CANCELRUNREQUEST']._serialized_start=812 + _globals['_CANCELRUNREQUEST']._serialized_end=846 + _globals['_CANCELRUNRESPONSE']._serialized_start=848 + _globals['_CANCELRUNRESPONSE']._serialized_end=879 + _globals['_REALTIMEOPEN']._serialized_start=882 + _globals['_REALTIMEOPEN']._serialized_end=1014 + _globals['_REALTIMEFRAME']._serialized_start=1016 + _globals['_REALTIMEFRAME']._serialized_end=1082 + _globals['_REALTIMECLOSE']._serialized_start=1084 + _globals['_REALTIMECLOSE']._serialized_end=1135 + _globals['_REALTIMEPROXYMESSAGE']._serialized_start=1138 + _globals['_REALTIMEPROXYMESSAGE']._serialized_end=1300 + _globals['_FRONTENDSERVICE']._serialized_start=1303 + _globals['_FRONTENDSERVICE']._serialized_end=1577 # @@protoc_insertion_point(module_scope) diff --git a/src/gen_worker/pb/frontend_pb2_grpc.py b/src/gen_worker/pb/frontend_pb2_grpc.py index a3d4fb3..dcd9c3b 100644 --- a/src/gen_worker/pb/frontend_pb2_grpc.py +++ b/src/gen_worker/pb/frontend_pb2_grpc.py @@ -40,10 +40,10 @@ def __init__(self, channel): request_serializer=frontend__pb2.ExecuteActionRequest.SerializeToString, response_deserializer=frontend__pb2.ExecuteActionResponse.FromString, _registered_method=True) - self.CancelRequest = channel.unary_unary( - '/frontend.v1.FrontendService/CancelRequest', - request_serializer=frontend__pb2.CancelRequestRequest.SerializeToString, - response_deserializer=frontend__pb2.CancelRequestResponse.FromString, + self.CancelRun = channel.unary_unary( + '/frontend.v1.FrontendService/CancelRun', + request_serializer=frontend__pb2.CancelRunRequest.SerializeToString, + response_deserializer=frontend__pb2.CancelRunResponse.FromString, _registered_method=True) self.RealtimeSession = channel.stream_stream( '/frontend.v1.FrontendService/RealtimeSession', @@ -63,8 +63,8 @@ def ExecuteAction(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') - def CancelRequest(self, request, context): - """2) Cancel an in-flight request. + def CancelRun(self, request, context): + """2) Cancel an in-flight action/job. """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') @@ -85,10 +85,10 @@ def add_FrontendServiceServicer_to_server(servicer, server): request_deserializer=frontend__pb2.ExecuteActionRequest.FromString, response_serializer=frontend__pb2.ExecuteActionResponse.SerializeToString, ), - 'CancelRequest': grpc.unary_unary_rpc_method_handler( - servicer.CancelRequest, - request_deserializer=frontend__pb2.CancelRequestRequest.FromString, - response_serializer=frontend__pb2.CancelRequestResponse.SerializeToString, + 'CancelRun': grpc.unary_unary_rpc_method_handler( + servicer.CancelRun, + request_deserializer=frontend__pb2.CancelRunRequest.FromString, + response_serializer=frontend__pb2.CancelRunResponse.SerializeToString, ), 'RealtimeSession': grpc.stream_stream_rpc_method_handler( servicer.RealtimeSession, @@ -135,7 +135,7 @@ def ExecuteAction(request, _registered_method=True) @staticmethod - def CancelRequest(request, + def CancelRun(request, target, options=(), channel_credentials=None, @@ -148,9 +148,9 @@ def CancelRequest(request, return grpc.experimental.unary_unary( request, target, - '/frontend.v1.FrontendService/CancelRequest', - frontend__pb2.CancelRequestRequest.SerializeToString, - frontend__pb2.CancelRequestResponse.FromString, + '/frontend.v1.FrontendService/CancelRun', + frontend__pb2.CancelRunRequest.SerializeToString, + frontend__pb2.CancelRunResponse.FromString, options, channel_credentials, insecure, diff --git a/src/gen_worker/pb/worker_scheduler_pb2.py b/src/gen_worker/pb/worker_scheduler_pb2.py index d82c370..63c0575 100644 --- a/src/gen_worker/pb/worker_scheduler_pb2.py +++ b/src/gen_worker/pb/worker_scheduler_pb2.py @@ -24,7 +24,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x16worker_scheduler.proto\x12\x0cscheduler.v1\"\xb7\x06\n\x0fWorkerResources\x12\x11\n\tworker_id\x18\x01 \x01(\t\x12\x11\n\tcpu_cores\x18\x02 \x01(\x05\x12\x14\n\x0cmemory_bytes\x18\x03 \x01(\x03\x12\x11\n\tgpu_count\x18\x04 \x01(\x05\x12\x18\n\x10gpu_memory_bytes\x18\x05 \x01(\x03\x12\x1b\n\x13\x61vailable_functions\x18\x06 \x03(\t\x12\x13\n\x0bvram_models\x18\x07 \x03(\t\x12\x1e\n\x16supports_model_loading\x18\x08 \x01(\x08\x12\x12\n\nrelease_id\x18\t \x01(\t\x12\x15\n\rrunpod_pod_id\x18\n \x01(\t\x12\x13\n\x0bgpu_is_busy\x18\x0b \x01(\x08\x12\x1d\n\x15gpu_memory_used_bytes\x18\x0c \x01(\x03\x12\x10\n\x08gpu_name\x18\r \x01(\t\x12\x12\n\ngpu_driver\x18\x0e \x01(\t\x12\x1d\n\x15gpu_memory_free_bytes\x18\x0f \x01(\x03\x12\x17\n\x0fmax_concurrency\x18\x10 \x01(\x05\x12T\n\x14\x66unction_concurrency\x18\x11 \x03(\x0b\x32\x36.scheduler.v1.WorkerResources.FunctionConcurrencyEntry\x12\x14\n\x0c\x63uda_version\x18\x12 \x01(\t\x12\x15\n\rtorch_version\x18\x13 \x01(\t\x12\x36\n\x10\x66unction_schemas\x18\x14 \x03(\x0b\x32\x1c.scheduler.v1.FunctionSchema\x12\x0e\n\x06gpu_sm\x18\x15 \x01(\t\x12\x18\n\x10tensorrt_version\x18\x16 \x01(\t\x12\x1b\n\x13onnxruntime_version\x18\x17 \x01(\t\x12\x13\n\x0b\x64isk_models\x18\x18 \x03(\t\x12\x16\n\x0einstalled_libs\x18\x19 \x03(\t\x12\x14\n\x0cimage_digest\x18\x1a \x01(\t\x12\x12\n\ngit_commit\x18\x1b \x01(\t\x12\x17\n\x0f\x62uild_timestamp\x18\x1c \x01(\x03\x1a:\n\x18\x46unctionConcurrencyEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x05:\x02\x38\x01\"\xa4\x01\n\x0e\x46unctionSchema\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x19\n\x11input_schema_json\x18\x02 \x01(\x0c\x12\x1a\n\x12output_schema_json\x18\x03 \x01(\x0c\x12\x16\n\x0einjection_json\x18\x04 \x01(\x0c\x12\x1a\n\x12incremental_output\x18\x05 \x01(\x08\x12\x19\n\x11\x64\x65lta_schema_json\x18\x06 \x01(\x0c\"\x8c\x01\n\x12WorkerRegistration\x12\x30\n\tresources\x18\x01 \x01(\x0b\x32\x1d.scheduler.v1.WorkerResources\x12\x14\n\x0cis_heartbeat\x18\x02 \x01(\x08\x12\x16\n\x0eprotocol_major\x18\x03 \x01(\x05\x12\x16\n\x0eprotocol_minor\x18\x04 \x01(\x05\"$\n\x10LoadModelCommand\x12\x10\n\x08model_id\x18\x01 \x01(\t\"&\n\x12UnloadModelCommand\x12\x10\n\x08model_id\x18\x01 \x01(\t\"X\n\x14InterruptTaskCommand\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x10\n\x08item_ids\x18\x02 \x03(\t\x12\x1a\n\x12\x63\x61ncel_queued_only\x18\x03 \x01(\x08\"\xb7\x01\n\x15RuntimeBatchingConfig\x12\x15\n\rfunction_name\x18\x01 \x01(\t\x12\x19\n\x11\x62\x61tch_size_target\x18\x02 \x01(\x05\x12\x16\n\x0e\x62\x61tch_size_min\x18\x03 \x01(\x05\x12\x16\n\x0e\x62\x61tch_size_max\x18\x04 \x01(\x05\x12\x16\n\x0eprefetch_depth\x18\x05 \x01(\x05\x12\x13\n\x0bmax_wait_ms\x18\x06 \x01(\x05\x12\x0f\n\x07version\x18\x07 \x01(\x03\"S\n\x1cRuntimeBatchingConfigCommand\x12\x33\n\x06\x63onfig\x18\x01 \x01(\x0b\x32#.scheduler.v1.RuntimeBatchingConfig\"m\n\x1bRuntimeBatchingConfigResult\x12\x15\n\rfunction_name\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\x03\x12\x0f\n\x07success\x18\x03 \x01(\x08\x12\x15\n\rerror_message\x18\x04 \x01(\t\"V\n\x15ResolvedCozyModelFile\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x12\n\nsize_bytes\x18\x02 \x01(\x03\x12\x0e\n\x06\x62lake3\x18\x03 \x01(\t\x12\x0b\n\x03url\x18\x04 \x01(\t\"`\n\x11ResolvedCozyModel\x12\x17\n\x0fsnapshot_digest\x18\x01 \x01(\t\x12\x32\n\x05\x66iles\x18\x02 \x03(\x0b\x32#.scheduler.v1.ResolvedCozyModelFile\".\n\x1a\x43ozyModelURLRefreshRequest\x12\x10\n\x08model_id\x18\x01 \x01(\t\"b\n\x1b\x43ozyModelURLRefreshResponse\x12\x10\n\x08model_id\x18\x01 \x01(\t\x12\x31\n\x08resolved\x18\x02 \x01(\x0b\x32\x1f.scheduler.v1.ResolvedCozyModel\"\xf7\x03\n\x14TaskExecutionRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x15\n\rfunction_name\x18\x02 \x01(\t\x12\x15\n\rinput_payload\x18\x03 \x01(\x0c\x12\x1d\n\x15required_variant_refs\x18\x04 \x03(\t\x12\x12\n\ntimeout_ms\x18\x05 \x01(\x03\x12\r\n\x05owner\x18\x06 \x01(\t\x12\x12\n\ninvoker_id\x18\x07 \x01(\t\x12\x15\n\rfile_base_url\x18\x08 \x01(\t\x12\x12\n\nfile_token\x18\t \x01(\t\x12\x62\n\x1aresolved_cozy_models_by_id\x18\n \x03(\x0b\x32>.scheduler.v1.TaskExecutionRequest.ResolvedCozyModelsByIdEntry\x12\x19\n\x11parent_request_id\x18\x0b \x01(\t\x12\x18\n\x10\x63hild_request_id\x18\x0c \x01(\t\x12\x0f\n\x07item_id\x18\r \x01(\t\x12\x12\n\nitem_index\x18\x0e \x01(\x05\x1a^\n\x1bResolvedCozyModelsByIdEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12.\n\x05value\x18\x02 \x01(\x0b\x32\x1f.scheduler.v1.ResolvedCozyModel:\x02\x38\x01\"\x80\x02\n\x13TaskExecutionResult\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x16\n\x0eoutput_payload\x18\x03 \x01(\x0c\x12\x15\n\rerror_message\x18\x04 \x01(\t\x12\x12\n\nerror_type\x18\x05 \x01(\t\x12\x11\n\tretryable\x18\x06 \x01(\x08\x12\x14\n\x0csafe_message\x18\x07 \x01(\t\x12\x19\n\x11parent_request_id\x18\x08 \x01(\t\x12\x18\n\x10\x63hild_request_id\x18\t \x01(\t\x12\x0f\n\x07item_id\x18\n \x01(\t\x12\x12\n\nitem_index\x18\x0b \x01(\x05\"\xc8\x03\n\x12\x42\x61tchExecutionItem\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07item_id\x18\x02 \x01(\t\x12\r\n\x05owner\x18\x03 \x01(\t\x12\x12\n\ninvoker_id\x18\x04 \x01(\t\x12\x15\n\rfunction_name\x18\x05 \x01(\t\x12\x15\n\rinput_payload\x18\x06 \x01(\x0c\x12\x1d\n\x15required_variant_refs\x18\x07 \x03(\t\x12\x12\n\ntimeout_ms\x18\x08 \x01(\x03\x12`\n\x1aresolved_cozy_models_by_id\x18\t \x03(\x0b\x32<.scheduler.v1.BatchExecutionItem.ResolvedCozyModelsByIdEntry\x12\x19\n\x11parent_request_id\x18\n \x01(\t\x12\x18\n\x10\x63hild_request_id\x18\x0b \x01(\t\x12\x12\n\nitem_index\x18\x0c \x01(\x05\x1a^\n\x1bResolvedCozyModelsByIdEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12.\n\x05value\x18\x02 \x01(\x0b\x32\x1f.scheduler.v1.ResolvedCozyModel:\x02\x38\x01\"\xbc\x01\n\x15\x42\x61tchExecutionRequest\x12\x10\n\x08\x62\x61tch_id\x18\x01 \x01(\t\x12\x12\n\nrelease_id\x18\x02 \x01(\t\x12\x15\n\rfunction_name\x18\x03 \x01(\t\x12\x19\n\x11\x63ompatibility_key\x18\x04 \x01(\t\x12\x1a\n\x12\x63reated_at_unix_ms\x18\x05 \x01(\x03\x12/\n\x05items\x18\x06 \x03(\x0b\x32 .scheduler.v1.BatchExecutionItem\"\x85\x02\n\x18\x42\x61tchExecutionItemResult\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07item_id\x18\x02 \x01(\t\x12\x0f\n\x07success\x18\x03 \x01(\x08\x12\x16\n\x0eoutput_payload\x18\x04 \x01(\x0c\x12\x12\n\nerror_type\x18\x05 \x01(\t\x12\x11\n\tretryable\x18\x06 \x01(\x08\x12\x14\n\x0csafe_message\x18\x07 \x01(\t\x12\x15\n\rerror_message\x18\x08 \x01(\t\x12\x19\n\x11parent_request_id\x18\t \x01(\t\x12\x18\n\x10\x63hild_request_id\x18\n \x01(\t\x12\x12\n\nitem_index\x18\x0b \x01(\x05\"_\n\x14\x42\x61tchExecutionResult\x12\x10\n\x08\x62\x61tch_id\x18\x01 \x01(\t\x12\x35\n\x05items\x18\x02 \x03(\x0b\x32&.scheduler.v1.BatchExecutionItemResult\"\xa5\x01\n\x0bWorkerEvent\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x12\n\nevent_type\x18\x02 \x01(\t\x12\x14\n\x0cpayload_json\x18\x03 \x01(\x0c\x12\x19\n\x11parent_request_id\x18\x04 \x01(\t\x12\x18\n\x10\x63hild_request_id\x18\x05 \x01(\t\x12\x0f\n\x07item_id\x18\x06 \x01(\t\x12\x12\n\nitem_index\x18\x07 \x01(\x05\"\xf3\x01\n\x15IncrementalTokenDelta\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07item_id\x18\x02 \x01(\t\x12\x15\n\rfunction_name\x18\x03 \x01(\t\x12\x10\n\x08sequence\x18\x04 \x01(\x03\x12\x19\n\x11timestamp_unix_ms\x18\x05 \x01(\x03\x12\x12\n\ndelta_text\x18\x06 \x01(\t\x12\x14\n\x0cpayload_json\x18\x07 \x01(\x0c\x12\x19\n\x11parent_request_id\x18\x08 \x01(\t\x12\x18\n\x10\x63hild_request_id\x18\t \x01(\t\x12\x12\n\nitem_index\x18\n \x01(\x05\"\xce\x01\n\x1aIncrementalTokenStreamDone\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07item_id\x18\x02 \x01(\t\x12\x15\n\rfunction_name\x18\x03 \x01(\t\x12\x10\n\x08sequence\x18\x04 \x01(\x03\x12\x19\n\x11timestamp_unix_ms\x18\x05 \x01(\x03\x12\x19\n\x11parent_request_id\x18\x06 \x01(\t\x12\x18\n\x10\x63hild_request_id\x18\x07 \x01(\t\x12\x12\n\nitem_index\x18\x08 \x01(\x05\"\xe6\x01\n\x1bIncrementalTokenStreamError\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07item_id\x18\x02 \x01(\t\x12\x15\n\rfunction_name\x18\x03 \x01(\t\x12\x10\n\x08sequence\x18\x04 \x01(\x03\x12\x19\n\x11timestamp_unix_ms\x18\x05 \x01(\x03\x12\x15\n\rerror_message\x18\x06 \x01(\t\x12\x19\n\x11parent_request_id\x18\x07 \x01(\t\x12\x18\n\x10\x63hild_request_id\x18\x08 \x01(\t\x12\x12\n\nitem_index\x18\t \x01(\x05\"\xc1\x01\n\x13RealtimeOpenCommand\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x15\n\rfunction_name\x18\x02 \x01(\t\x12\x1d\n\x15required_variant_refs\x18\x03 \x03(\t\x12\r\n\x05owner\x18\x04 \x01(\t\x12\x12\n\ninvoker_id\x18\x05 \x01(\t\x12\x12\n\ntimeout_ms\x18\x06 \x01(\x03\x12\x15\n\rfile_base_url\x18\x07 \x01(\t\x12\x12\n\nfile_token\x18\x08 \x01(\t\"B\n\rRealtimeFrame\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\x0f\n\x07is_text\x18\x03 \x01(\x08\":\n\x14RealtimeCloseCommand\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0e\n\x06reason\x18\x02 \x01(\t\"K\n\x0fLoadModelResult\x12\x10\n\x08model_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x15\n\rerror_message\x18\x03 \x01(\t\"M\n\x11UnloadModelResult\x12\x10\n\x08model_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x15\n\rerror_message\x18\x03 \x01(\t\"(\n\tModelSpec\x12\x0b\n\x03ref\x18\x01 \x01(\t\x12\x0e\n\x06\x64types\x18\x02 \x03(\t\"\x8c\x01\n\x0bModelsByKey\x12\x35\n\x06models\x18\x01 \x03(\x0b\x32%.scheduler.v1.ModelsByKey.ModelsEntry\x1a\x46\n\x0bModelsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12&\n\x05value\x18\x02 \x01(\x0b\x32\x17.scheduler.v1.ModelSpec:\x02\x38\x01\"\xc5\x04\n\x0e\x45ndpointConfig\x12\x1b\n\x13supported_repo_refs\x18\x01 \x03(\t\x12G\n\x0frepo_ref_by_key\x18\x02 \x03(\x0b\x32..scheduler.v1.EndpointConfig.RepoRefByKeyEntry\x12m\n#resolved_cozy_models_by_variant_ref\x18\x03 \x03(\x0b\x32@.scheduler.v1.EndpointConfig.ResolvedCozyModelsByVariantRefEntry\x12\x1d\n\x15required_variant_refs\x18\x04 \x03(\t\x12N\n\x12models_by_function\x18\x05 \x03(\x0b\x32\x32.scheduler.v1.EndpointConfig.ModelsByFunctionEntry\x1a\x33\n\x11RepoRefByKeyEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x66\n#ResolvedCozyModelsByVariantRefEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12.\n\x05value\x18\x02 \x01(\x0b\x32\x1f.scheduler.v1.ResolvedCozyModel:\x02\x38\x01\x1aR\n\x15ModelsByFunctionEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12(\n\x05value\x18\x02 \x01(\x0b\x32\x19.scheduler.v1.ModelsByKey:\x02\x38\x01\"\x85\x0c\n\x16WorkerSchedulerMessage\x12?\n\x13worker_registration\x18\x01 \x01(\x0b\x32 .scheduler.v1.WorkerRegistrationH\x00\x12\x37\n\nrun_result\x18\x02 \x01(\x0b\x32!.scheduler.v1.TaskExecutionResultH\x00\x12:\n\x11load_model_result\x18\x04 \x01(\x0b\x32\x1d.scheduler.v1.LoadModelResultH\x00\x12>\n\x13unload_model_result\x18\x05 \x01(\x0b\x32\x1f.scheduler.v1.UnloadModelResultH\x00\x12\x31\n\x0cworker_event\x18\x06 \x01(\x0b\x32\x19.scheduler.v1.WorkerEventH\x00\x12R\n\x1e\x63ozy_model_url_refresh_request\x18\x07 \x01(\x0b\x32(.scheduler.v1.CozyModelURLRefreshRequestH\x00\x12S\n\x1eruntime_batching_config_result\x18\x13 \x01(\x0b\x32).scheduler.v1.RuntimeBatchingConfigResultH\x00\x12>\n\x10\x62\x61tch_run_result\x18\x15 \x01(\x0b\x32\".scheduler.v1.BatchExecutionResultH\x00\x12\x46\n\x17incremental_token_delta\x18\x17 \x01(\x0b\x32#.scheduler.v1.IncrementalTokenDeltaH\x00\x12Q\n\x1dincremental_token_stream_done\x18\x18 \x01(\x0b\x32(.scheduler.v1.IncrementalTokenStreamDoneH\x00\x12S\n\x1eincremental_token_stream_error\x18\x19 \x01(\x0b\x32).scheduler.v1.IncrementalTokenStreamErrorH\x00\x12\x39\n\x0brun_request\x18\n \x01(\x0b\x32\".scheduler.v1.TaskExecutionRequestH\x00\x12@\n\x11\x62\x61tch_run_request\x18\x16 \x01(\x0b\x32#.scheduler.v1.BatchExecutionRequestH\x00\x12\x38\n\x0eload_model_cmd\x18\x0b \x01(\x0b\x32\x1e.scheduler.v1.LoadModelCommandH\x00\x12<\n\x10unload_model_cmd\x18\x0c \x01(\x0b\x32 .scheduler.v1.UnloadModelCommandH\x00\x12?\n\x11interrupt_run_cmd\x18\r \x01(\x0b\x32\".scheduler.v1.InterruptTaskCommandH\x00\x12\x37\n\x0f\x65ndpoint_config\x18\x0e \x01(\x0b\x32\x1c.scheduler.v1.EndpointConfigH\x00\x12>\n\x11realtime_open_cmd\x18\x0f \x01(\x0b\x32!.scheduler.v1.RealtimeOpenCommandH\x00\x12\x35\n\x0erealtime_frame\x18\x10 \x01(\x0b\x32\x1b.scheduler.v1.RealtimeFrameH\x00\x12@\n\x12realtime_close_cmd\x18\x11 \x01(\x0b\x32\".scheduler.v1.RealtimeCloseCommandH\x00\x12T\n\x1f\x63ozy_model_url_refresh_response\x18\x12 \x01(\x0b\x32).scheduler.v1.CozyModelURLRefreshResponseH\x00\x12Q\n\x1bruntime_batching_config_cmd\x18\x14 \x01(\x0b\x32*.scheduler.v1.RuntimeBatchingConfigCommandH\x00\x42\x05\n\x03msgJ\x04\x08\x03\x10\x04R\x0bspawn_tasks2y\n\x16SchedulerWorkerService\x12_\n\rConnectWorker\x12$.scheduler.v1.WorkerSchedulerMessage\x1a$.scheduler.v1.WorkerSchedulerMessage(\x01\x30\x01\x42UZSgithub.com/cozy-creator/gen-orchestrator/pkg/pb/workerschedulerv1;workerschedulerv1b\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x16worker_scheduler.proto\x12\x0cscheduler.v1\"\xb7\x06\n\x0fWorkerResources\x12\x11\n\tworker_id\x18\x01 \x01(\t\x12\x11\n\tcpu_cores\x18\x02 \x01(\x05\x12\x14\n\x0cmemory_bytes\x18\x03 \x01(\x03\x12\x11\n\tgpu_count\x18\x04 \x01(\x05\x12\x18\n\x10gpu_memory_bytes\x18\x05 \x01(\x03\x12\x1b\n\x13\x61vailable_functions\x18\x06 \x03(\t\x12\x13\n\x0bvram_models\x18\x07 \x03(\t\x12\x1e\n\x16supports_model_loading\x18\x08 \x01(\x08\x12\x12\n\nrelease_id\x18\t \x01(\t\x12\x15\n\rrunpod_pod_id\x18\n \x01(\t\x12\x13\n\x0bgpu_is_busy\x18\x0b \x01(\x08\x12\x1d\n\x15gpu_memory_used_bytes\x18\x0c \x01(\x03\x12\x10\n\x08gpu_name\x18\r \x01(\t\x12\x12\n\ngpu_driver\x18\x0e \x01(\t\x12\x1d\n\x15gpu_memory_free_bytes\x18\x0f \x01(\x03\x12\x17\n\x0fmax_concurrency\x18\x10 \x01(\x05\x12T\n\x14\x66unction_concurrency\x18\x11 \x03(\x0b\x32\x36.scheduler.v1.WorkerResources.FunctionConcurrencyEntry\x12\x14\n\x0c\x63uda_version\x18\x12 \x01(\t\x12\x15\n\rtorch_version\x18\x13 \x01(\t\x12\x36\n\x10\x66unction_schemas\x18\x14 \x03(\x0b\x32\x1c.scheduler.v1.FunctionSchema\x12\x0e\n\x06gpu_sm\x18\x15 \x01(\t\x12\x18\n\x10tensorrt_version\x18\x16 \x01(\t\x12\x1b\n\x13onnxruntime_version\x18\x17 \x01(\t\x12\x13\n\x0b\x64isk_models\x18\x18 \x03(\t\x12\x16\n\x0einstalled_libs\x18\x19 \x03(\t\x12\x14\n\x0cimage_digest\x18\x1a \x01(\t\x12\x12\n\ngit_commit\x18\x1b \x01(\t\x12\x17\n\x0f\x62uild_timestamp\x18\x1c \x01(\x03\x1a:\n\x18\x46unctionConcurrencyEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x05:\x02\x38\x01\"\xa4\x01\n\x0e\x46unctionSchema\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x19\n\x11input_schema_json\x18\x02 \x01(\x0c\x12\x1a\n\x12output_schema_json\x18\x03 \x01(\x0c\x12\x16\n\x0einjection_json\x18\x04 \x01(\x0c\x12\x1a\n\x12incremental_output\x18\x05 \x01(\x08\x12\x19\n\x11\x64\x65lta_schema_json\x18\x06 \x01(\x0c\"\x8c\x01\n\x12WorkerRegistration\x12\x30\n\tresources\x18\x01 \x01(\x0b\x32\x1d.scheduler.v1.WorkerResources\x12\x14\n\x0cis_heartbeat\x18\x02 \x01(\x08\x12\x16\n\x0eprotocol_major\x18\x03 \x01(\x05\x12\x16\n\x0eprotocol_minor\x18\x04 \x01(\x05\"\xe4\x01\n\x10LoadModelCommand\x12\x10\n\x08model_id\x18\x01 \x01(\t\x12^\n\x1aresolved_cozy_models_by_id\x18\x02 \x03(\x0b\x32:.scheduler.v1.LoadModelCommand.ResolvedCozyModelsByIdEntry\x1a^\n\x1bResolvedCozyModelsByIdEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12.\n\x05value\x18\x02 \x01(\x0b\x32\x1f.scheduler.v1.ResolvedCozyModel:\x02\x38\x01\"&\n\x12UnloadModelCommand\x12\x10\n\x08model_id\x18\x01 \x01(\t\"&\n\x14InterruptTaskCommand\x12\x0e\n\x06run_id\x18\x01 \x01(\t\"V\n\x15ResolvedCozyModelFile\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x12\n\nsize_bytes\x18\x02 \x01(\x03\x12\x0e\n\x06\x62lake3\x18\x03 \x01(\t\x12\x0b\n\x03url\x18\x04 \x01(\t\"`\n\x11ResolvedCozyModel\x12\x17\n\x0fsnapshot_digest\x18\x01 \x01(\t\x12\x32\n\x05\x66iles\x18\x02 \x03(\x0b\x32#.scheduler.v1.ResolvedCozyModelFile\".\n\x1a\x43ozyModelURLRefreshRequest\x12\x10\n\x08model_id\x18\x01 \x01(\t\"b\n\x1b\x43ozyModelURLRefreshResponse\x12\x10\n\x08model_id\x18\x01 \x01(\t\x12\x31\n\x08resolved\x18\x02 \x01(\x0b\x32\x1f.scheduler.v1.ResolvedCozyModel\"\x99\x03\n\x14TaskExecutionRequest\x12\x0e\n\x06run_id\x18\x01 \x01(\t\x12\x15\n\rfunction_name\x18\x02 \x01(\t\x12\x15\n\rinput_payload\x18\x03 \x01(\x0c\x12\x1d\n\x15required_variant_refs\x18\x04 \x03(\t\x12\x12\n\ntimeout_ms\x18\x05 \x01(\x03\x12\r\n\x05owner\x18\x06 \x01(\t\x12\x12\n\ninvoker_id\x18\x07 \x01(\t\x12\x15\n\rfile_base_url\x18\x08 \x01(\t\x12\x12\n\nfile_token\x18\t \x01(\t\x12\x62\n\x1aresolved_cozy_models_by_id\x18\n \x03(\x0b\x32>.scheduler.v1.TaskExecutionRequest.ResolvedCozyModelsByIdEntry\x1a^\n\x1bResolvedCozyModelsByIdEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12.\n\x05value\x18\x02 \x01(\x0b\x32\x1f.scheduler.v1.ResolvedCozyModel:\x02\x38\x01\"\xa2\x01\n\x13TaskExecutionResult\x12\x0e\n\x06run_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x16\n\x0eoutput_payload\x18\x03 \x01(\x0c\x12\x15\n\rerror_message\x18\x04 \x01(\t\x12\x12\n\nerror_type\x18\x05 \x01(\t\x12\x11\n\tretryable\x18\x06 \x01(\x08\x12\x14\n\x0csafe_message\x18\x07 \x01(\t\"G\n\x0bWorkerEvent\x12\x0e\n\x06run_id\x18\x01 \x01(\t\x12\x12\n\nevent_type\x18\x02 \x01(\t\x12\x14\n\x0cpayload_json\x18\x03 \x01(\x0c\"\xc1\x01\n\x13RealtimeOpenCommand\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x15\n\rfunction_name\x18\x02 \x01(\t\x12\x1d\n\x15required_variant_refs\x18\x03 \x03(\t\x12\r\n\x05owner\x18\x04 \x01(\t\x12\x12\n\ninvoker_id\x18\x05 \x01(\t\x12\x12\n\ntimeout_ms\x18\x06 \x01(\x03\x12\x15\n\rfile_base_url\x18\x07 \x01(\t\x12\x12\n\nfile_token\x18\x08 \x01(\t\"B\n\rRealtimeFrame\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\x0f\n\x07is_text\x18\x03 \x01(\x08\":\n\x14RealtimeCloseCommand\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0e\n\x06reason\x18\x02 \x01(\t\"K\n\x0fLoadModelResult\x12\x10\n\x08model_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x15\n\rerror_message\x18\x03 \x01(\t\"M\n\x11UnloadModelResult\x12\x10\n\x08model_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x15\n\rerror_message\x18\x03 \x01(\t\"(\n\tModelSpec\x12\x0b\n\x03ref\x18\x01 \x01(\t\x12\x0e\n\x06\x64types\x18\x02 \x03(\t\"\x8c\x01\n\x0bModelsByKey\x12\x35\n\x06models\x18\x01 \x03(\x0b\x32%.scheduler.v1.ModelsByKey.ModelsEntry\x1a\x46\n\x0bModelsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12&\n\x05value\x18\x02 \x01(\x0b\x32\x17.scheduler.v1.ModelSpec:\x02\x38\x01\"\xe1\x04\n\x15ReleaseArtifactConfig\x12\x1b\n\x13supported_repo_refs\x18\x01 \x03(\t\x12N\n\x0frepo_ref_by_key\x18\x02 \x03(\x0b\x32\x35.scheduler.v1.ReleaseArtifactConfig.RepoRefByKeyEntry\x12t\n#resolved_cozy_models_by_variant_ref\x18\x03 \x03(\x0b\x32G.scheduler.v1.ReleaseArtifactConfig.ResolvedCozyModelsByVariantRefEntry\x12\x1d\n\x15required_variant_refs\x18\x04 \x03(\t\x12U\n\x12models_by_function\x18\x05 \x03(\x0b\x32\x39.scheduler.v1.ReleaseArtifactConfig.ModelsByFunctionEntry\x1a\x33\n\x11RepoRefByKeyEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x66\n#ResolvedCozyModelsByVariantRefEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12.\n\x05value\x18\x02 \x01(\x0b\x32\x1f.scheduler.v1.ResolvedCozyModel:\x02\x38\x01\x1aR\n\x15ModelsByFunctionEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12(\n\x05value\x18\x02 \x01(\x0b\x32\x19.scheduler.v1.ModelsByKey:\x02\x38\x01\"\xfa\x07\n\x16WorkerSchedulerMessage\x12?\n\x13worker_registration\x18\x01 \x01(\x0b\x32 .scheduler.v1.WorkerRegistrationH\x00\x12\x37\n\nrun_result\x18\x02 \x01(\x0b\x32!.scheduler.v1.TaskExecutionResultH\x00\x12:\n\x11load_model_result\x18\x04 \x01(\x0b\x32\x1d.scheduler.v1.LoadModelResultH\x00\x12>\n\x13unload_model_result\x18\x05 \x01(\x0b\x32\x1f.scheduler.v1.UnloadModelResultH\x00\x12\x31\n\x0cworker_event\x18\x06 \x01(\x0b\x32\x19.scheduler.v1.WorkerEventH\x00\x12R\n\x1e\x63ozy_model_url_refresh_request\x18\x07 \x01(\x0b\x32(.scheduler.v1.CozyModelURLRefreshRequestH\x00\x12\x39\n\x0brun_request\x18\n \x01(\x0b\x32\".scheduler.v1.TaskExecutionRequestH\x00\x12\x38\n\x0eload_model_cmd\x18\x0b \x01(\x0b\x32\x1e.scheduler.v1.LoadModelCommandH\x00\x12<\n\x10unload_model_cmd\x18\x0c \x01(\x0b\x32 .scheduler.v1.UnloadModelCommandH\x00\x12?\n\x11interrupt_run_cmd\x18\r \x01(\x0b\x32\".scheduler.v1.InterruptTaskCommandH\x00\x12\x46\n\x17release_artifact_config\x18\x0e \x01(\x0b\x32#.scheduler.v1.ReleaseArtifactConfigH\x00\x12>\n\x11realtime_open_cmd\x18\x0f \x01(\x0b\x32!.scheduler.v1.RealtimeOpenCommandH\x00\x12\x35\n\x0erealtime_frame\x18\x10 \x01(\x0b\x32\x1b.scheduler.v1.RealtimeFrameH\x00\x12@\n\x12realtime_close_cmd\x18\x11 \x01(\x0b\x32\".scheduler.v1.RealtimeCloseCommandH\x00\x12T\n\x1f\x63ozy_model_url_refresh_response\x18\x12 \x01(\x0b\x32).scheduler.v1.CozyModelURLRefreshResponseH\x00\x42\x05\n\x03msgJ\x04\x08\x03\x10\x04R\x0bspawn_tasks2y\n\x16SchedulerWorkerService\x12_\n\rConnectWorker\x12$.scheduler.v1.WorkerSchedulerMessage\x1a$.scheduler.v1.WorkerSchedulerMessage(\x01\x30\x01\x42UZSgithub.com/cozy-creator/gen-orchestrator/pkg/pb/workerschedulerv1;workerschedulerv1b\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -34,18 +34,18 @@ _globals['DESCRIPTOR']._serialized_options = b'ZSgithub.com/cozy-creator/gen-orchestrator/pkg/pb/workerschedulerv1;workerschedulerv1' _globals['_WORKERRESOURCES_FUNCTIONCONCURRENCYENTRY']._loaded_options = None _globals['_WORKERRESOURCES_FUNCTIONCONCURRENCYENTRY']._serialized_options = b'8\001' + _globals['_LOADMODELCOMMAND_RESOLVEDCOZYMODELSBYIDENTRY']._loaded_options = None + _globals['_LOADMODELCOMMAND_RESOLVEDCOZYMODELSBYIDENTRY']._serialized_options = b'8\001' _globals['_TASKEXECUTIONREQUEST_RESOLVEDCOZYMODELSBYIDENTRY']._loaded_options = None _globals['_TASKEXECUTIONREQUEST_RESOLVEDCOZYMODELSBYIDENTRY']._serialized_options = b'8\001' - _globals['_BATCHEXECUTIONITEM_RESOLVEDCOZYMODELSBYIDENTRY']._loaded_options = None - _globals['_BATCHEXECUTIONITEM_RESOLVEDCOZYMODELSBYIDENTRY']._serialized_options = b'8\001' _globals['_MODELSBYKEY_MODELSENTRY']._loaded_options = None _globals['_MODELSBYKEY_MODELSENTRY']._serialized_options = b'8\001' - _globals['_ENDPOINTCONFIG_REPOREFBYKEYENTRY']._loaded_options = None - _globals['_ENDPOINTCONFIG_REPOREFBYKEYENTRY']._serialized_options = b'8\001' - _globals['_ENDPOINTCONFIG_RESOLVEDCOZYMODELSBYVARIANTREFENTRY']._loaded_options = None - _globals['_ENDPOINTCONFIG_RESOLVEDCOZYMODELSBYVARIANTREFENTRY']._serialized_options = b'8\001' - _globals['_ENDPOINTCONFIG_MODELSBYFUNCTIONENTRY']._loaded_options = None - _globals['_ENDPOINTCONFIG_MODELSBYFUNCTIONENTRY']._serialized_options = b'8\001' + _globals['_RELEASEARTIFACTCONFIG_REPOREFBYKEYENTRY']._loaded_options = None + _globals['_RELEASEARTIFACTCONFIG_REPOREFBYKEYENTRY']._serialized_options = b'8\001' + _globals['_RELEASEARTIFACTCONFIG_RESOLVEDCOZYMODELSBYVARIANTREFENTRY']._loaded_options = None + _globals['_RELEASEARTIFACTCONFIG_RESOLVEDCOZYMODELSBYVARIANTREFENTRY']._serialized_options = b'8\001' + _globals['_RELEASEARTIFACTCONFIG_MODELSBYFUNCTIONENTRY']._loaded_options = None + _globals['_RELEASEARTIFACTCONFIG_MODELSBYFUNCTIONENTRY']._serialized_options = b'8\001' _globals['_WORKERRESOURCES']._serialized_start=41 _globals['_WORKERRESOURCES']._serialized_end=864 _globals['_WORKERRESOURCES_FUNCTIONCONCURRENCYENTRY']._serialized_start=806 @@ -54,76 +54,56 @@ _globals['_FUNCTIONSCHEMA']._serialized_end=1031 _globals['_WORKERREGISTRATION']._serialized_start=1034 _globals['_WORKERREGISTRATION']._serialized_end=1174 - _globals['_LOADMODELCOMMAND']._serialized_start=1176 - _globals['_LOADMODELCOMMAND']._serialized_end=1212 - _globals['_UNLOADMODELCOMMAND']._serialized_start=1214 - _globals['_UNLOADMODELCOMMAND']._serialized_end=1252 - _globals['_INTERRUPTTASKCOMMAND']._serialized_start=1254 - _globals['_INTERRUPTTASKCOMMAND']._serialized_end=1342 - _globals['_RUNTIMEBATCHINGCONFIG']._serialized_start=1345 - _globals['_RUNTIMEBATCHINGCONFIG']._serialized_end=1528 - _globals['_RUNTIMEBATCHINGCONFIGCOMMAND']._serialized_start=1530 - _globals['_RUNTIMEBATCHINGCONFIGCOMMAND']._serialized_end=1613 - _globals['_RUNTIMEBATCHINGCONFIGRESULT']._serialized_start=1615 - _globals['_RUNTIMEBATCHINGCONFIGRESULT']._serialized_end=1724 - _globals['_RESOLVEDCOZYMODELFILE']._serialized_start=1726 - _globals['_RESOLVEDCOZYMODELFILE']._serialized_end=1812 - _globals['_RESOLVEDCOZYMODEL']._serialized_start=1814 - _globals['_RESOLVEDCOZYMODEL']._serialized_end=1910 - _globals['_COZYMODELURLREFRESHREQUEST']._serialized_start=1912 - _globals['_COZYMODELURLREFRESHREQUEST']._serialized_end=1958 - _globals['_COZYMODELURLREFRESHRESPONSE']._serialized_start=1960 - _globals['_COZYMODELURLREFRESHRESPONSE']._serialized_end=2058 - _globals['_TASKEXECUTIONREQUEST']._serialized_start=2061 - _globals['_TASKEXECUTIONREQUEST']._serialized_end=2564 - _globals['_TASKEXECUTIONREQUEST_RESOLVEDCOZYMODELSBYIDENTRY']._serialized_start=2470 - _globals['_TASKEXECUTIONREQUEST_RESOLVEDCOZYMODELSBYIDENTRY']._serialized_end=2564 - _globals['_TASKEXECUTIONRESULT']._serialized_start=2567 - _globals['_TASKEXECUTIONRESULT']._serialized_end=2823 - _globals['_BATCHEXECUTIONITEM']._serialized_start=2826 - _globals['_BATCHEXECUTIONITEM']._serialized_end=3282 - _globals['_BATCHEXECUTIONITEM_RESOLVEDCOZYMODELSBYIDENTRY']._serialized_start=2470 - _globals['_BATCHEXECUTIONITEM_RESOLVEDCOZYMODELSBYIDENTRY']._serialized_end=2564 - _globals['_BATCHEXECUTIONREQUEST']._serialized_start=3285 - _globals['_BATCHEXECUTIONREQUEST']._serialized_end=3473 - _globals['_BATCHEXECUTIONITEMRESULT']._serialized_start=3476 - _globals['_BATCHEXECUTIONITEMRESULT']._serialized_end=3737 - _globals['_BATCHEXECUTIONRESULT']._serialized_start=3739 - _globals['_BATCHEXECUTIONRESULT']._serialized_end=3834 - _globals['_WORKEREVENT']._serialized_start=3837 - _globals['_WORKEREVENT']._serialized_end=4002 - _globals['_INCREMENTALTOKENDELTA']._serialized_start=4005 - _globals['_INCREMENTALTOKENDELTA']._serialized_end=4248 - _globals['_INCREMENTALTOKENSTREAMDONE']._serialized_start=4251 - _globals['_INCREMENTALTOKENSTREAMDONE']._serialized_end=4457 - _globals['_INCREMENTALTOKENSTREAMERROR']._serialized_start=4460 - _globals['_INCREMENTALTOKENSTREAMERROR']._serialized_end=4690 - _globals['_REALTIMEOPENCOMMAND']._serialized_start=4693 - _globals['_REALTIMEOPENCOMMAND']._serialized_end=4886 - _globals['_REALTIMEFRAME']._serialized_start=4888 - _globals['_REALTIMEFRAME']._serialized_end=4954 - _globals['_REALTIMECLOSECOMMAND']._serialized_start=4956 - _globals['_REALTIMECLOSECOMMAND']._serialized_end=5014 - _globals['_LOADMODELRESULT']._serialized_start=5016 - _globals['_LOADMODELRESULT']._serialized_end=5091 - _globals['_UNLOADMODELRESULT']._serialized_start=5093 - _globals['_UNLOADMODELRESULT']._serialized_end=5170 - _globals['_MODELSPEC']._serialized_start=5172 - _globals['_MODELSPEC']._serialized_end=5212 - _globals['_MODELSBYKEY']._serialized_start=5215 - _globals['_MODELSBYKEY']._serialized_end=5355 - _globals['_MODELSBYKEY_MODELSENTRY']._serialized_start=5285 - _globals['_MODELSBYKEY_MODELSENTRY']._serialized_end=5355 - _globals['_ENDPOINTCONFIG']._serialized_start=5358 - _globals['_ENDPOINTCONFIG']._serialized_end=5939 - _globals['_ENDPOINTCONFIG_REPOREFBYKEYENTRY']._serialized_start=5700 - _globals['_ENDPOINTCONFIG_REPOREFBYKEYENTRY']._serialized_end=5751 - _globals['_ENDPOINTCONFIG_RESOLVEDCOZYMODELSBYVARIANTREFENTRY']._serialized_start=5753 - _globals['_ENDPOINTCONFIG_RESOLVEDCOZYMODELSBYVARIANTREFENTRY']._serialized_end=5855 - _globals['_ENDPOINTCONFIG_MODELSBYFUNCTIONENTRY']._serialized_start=5857 - _globals['_ENDPOINTCONFIG_MODELSBYFUNCTIONENTRY']._serialized_end=5939 - _globals['_WORKERSCHEDULERMESSAGE']._serialized_start=5942 - _globals['_WORKERSCHEDULERMESSAGE']._serialized_end=7483 - _globals['_SCHEDULERWORKERSERVICE']._serialized_start=7485 - _globals['_SCHEDULERWORKERSERVICE']._serialized_end=7606 + _globals['_LOADMODELCOMMAND']._serialized_start=1177 + _globals['_LOADMODELCOMMAND']._serialized_end=1405 + _globals['_LOADMODELCOMMAND_RESOLVEDCOZYMODELSBYIDENTRY']._serialized_start=1311 + _globals['_LOADMODELCOMMAND_RESOLVEDCOZYMODELSBYIDENTRY']._serialized_end=1405 + _globals['_UNLOADMODELCOMMAND']._serialized_start=1407 + _globals['_UNLOADMODELCOMMAND']._serialized_end=1445 + _globals['_INTERRUPTTASKCOMMAND']._serialized_start=1447 + _globals['_INTERRUPTTASKCOMMAND']._serialized_end=1485 + _globals['_RESOLVEDCOZYMODELFILE']._serialized_start=1487 + _globals['_RESOLVEDCOZYMODELFILE']._serialized_end=1573 + _globals['_RESOLVEDCOZYMODEL']._serialized_start=1575 + _globals['_RESOLVEDCOZYMODEL']._serialized_end=1671 + _globals['_COZYMODELURLREFRESHREQUEST']._serialized_start=1673 + _globals['_COZYMODELURLREFRESHREQUEST']._serialized_end=1719 + _globals['_COZYMODELURLREFRESHRESPONSE']._serialized_start=1721 + _globals['_COZYMODELURLREFRESHRESPONSE']._serialized_end=1819 + _globals['_TASKEXECUTIONREQUEST']._serialized_start=1822 + _globals['_TASKEXECUTIONREQUEST']._serialized_end=2231 + _globals['_TASKEXECUTIONREQUEST_RESOLVEDCOZYMODELSBYIDENTRY']._serialized_start=1311 + _globals['_TASKEXECUTIONREQUEST_RESOLVEDCOZYMODELSBYIDENTRY']._serialized_end=1405 + _globals['_TASKEXECUTIONRESULT']._serialized_start=2234 + _globals['_TASKEXECUTIONRESULT']._serialized_end=2396 + _globals['_WORKEREVENT']._serialized_start=2398 + _globals['_WORKEREVENT']._serialized_end=2469 + _globals['_REALTIMEOPENCOMMAND']._serialized_start=2472 + _globals['_REALTIMEOPENCOMMAND']._serialized_end=2665 + _globals['_REALTIMEFRAME']._serialized_start=2667 + _globals['_REALTIMEFRAME']._serialized_end=2733 + _globals['_REALTIMECLOSECOMMAND']._serialized_start=2735 + _globals['_REALTIMECLOSECOMMAND']._serialized_end=2793 + _globals['_LOADMODELRESULT']._serialized_start=2795 + _globals['_LOADMODELRESULT']._serialized_end=2870 + _globals['_UNLOADMODELRESULT']._serialized_start=2872 + _globals['_UNLOADMODELRESULT']._serialized_end=2949 + _globals['_MODELSPEC']._serialized_start=2951 + _globals['_MODELSPEC']._serialized_end=2991 + _globals['_MODELSBYKEY']._serialized_start=2994 + _globals['_MODELSBYKEY']._serialized_end=3134 + _globals['_MODELSBYKEY_MODELSENTRY']._serialized_start=3064 + _globals['_MODELSBYKEY_MODELSENTRY']._serialized_end=3134 + _globals['_RELEASEARTIFACTCONFIG']._serialized_start=3137 + _globals['_RELEASEARTIFACTCONFIG']._serialized_end=3746 + _globals['_RELEASEARTIFACTCONFIG_REPOREFBYKEYENTRY']._serialized_start=3507 + _globals['_RELEASEARTIFACTCONFIG_REPOREFBYKEYENTRY']._serialized_end=3558 + _globals['_RELEASEARTIFACTCONFIG_RESOLVEDCOZYMODELSBYVARIANTREFENTRY']._serialized_start=3560 + _globals['_RELEASEARTIFACTCONFIG_RESOLVEDCOZYMODELSBYVARIANTREFENTRY']._serialized_end=3662 + _globals['_RELEASEARTIFACTCONFIG_MODELSBYFUNCTIONENTRY']._serialized_start=3664 + _globals['_RELEASEARTIFACTCONFIG_MODELSBYFUNCTIONENTRY']._serialized_end=3746 + _globals['_WORKERSCHEDULERMESSAGE']._serialized_start=3749 + _globals['_WORKERSCHEDULERMESSAGE']._serialized_end=4767 + _globals['_SCHEDULERWORKERSERVICE']._serialized_start=4769 + _globals['_SCHEDULERWORKERSERVICE']._serialized_end=4890 # @@protoc_insertion_point(module_scope) diff --git a/src/gen_worker/testing/http_runner.py b/src/gen_worker/testing/http_runner.py index b6d905a..e38da01 100644 --- a/src/gen_worker/testing/http_runner.py +++ b/src/gen_worker/testing/http_runner.py @@ -113,7 +113,7 @@ def run_task_sync( raw = msgspec.msgpack.encode(payload_obj) req = pb.TaskExecutionRequest( - request_id=rid, + run_id=rid, function_name=fn, input_payload=raw, required_variant_refs=[str(v).strip() for v in (required_variant_refs or []) if str(v).strip()], @@ -152,7 +152,7 @@ def run_task_sync( payload = {} events.append( { - "request_id": str(ev.request_id or ""), + "request_id": str(ev.run_id or ""), "event_type": str(ev.event_type or ""), "payload": payload, } diff --git a/src/gen_worker/testing/mock_orchestrator.py b/src/gen_worker/testing/mock_orchestrator.py index de3bb37..ee32dee 100644 --- a/src/gen_worker/testing/mock_orchestrator.py +++ b/src/gen_worker/testing/mock_orchestrator.py @@ -198,7 +198,7 @@ def run_task( payload = _rewrite_refs_to_urls(payload_obj, input_ref_urls) raw = msgspec.msgpack.encode(payload) req = pb.TaskExecutionRequest( - request_id=rid, + run_id=rid, function_name=function_name, input_payload=raw, required_variant_refs=list(required_variant_refs), @@ -336,18 +336,18 @@ def _format_msg(msg: pb.WorkerSchedulerMessage) -> str: return "worker_registration" if msg.HasField("run_result"): rr = msg.run_result - return f"run_result request_id={rr.request_id} success={rr.success} error_type={rr.error_type!r} retryable={rr.retryable}" + return f"run_result request_id={rr.run_id} success={rr.success} error_type={rr.error_type!r} retryable={rr.retryable}" if msg.HasField("worker_event"): ev = msg.worker_event - return f"worker_event request_id={ev.request_id} type={ev.event_type}" + return f"worker_event request_id={ev.run_id} type={ev.event_type}" if msg.HasField("load_model_result"): return "load_model_result" if msg.HasField("unload_model_result"): return "unload_model_result" if msg.HasField("interrupt_run_cmd"): return "interrupt_run_cmd" - if msg.HasField("endpoint_config"): - return "endpoint_config" + if msg.HasField("release_artifact_config"): + return "release_artifact_config" if msg.HasField("realtime_open_cmd"): return "realtime_open_cmd" if msg.HasField("realtime_frame"): @@ -455,7 +455,7 @@ def main(argv: Optional[list[str]] = None) -> int: payload = "" print(f"[event] {msg.worker_event.event_type}: {payload}") continue - if msg.HasField("run_result") and msg.run_result.request_id == request_id: + if msg.HasField("run_result") and msg.run_result.run_id == request_id: rr = msg.run_result if rr.output_payload: out_obj = msgspec.msgpack.decode(rr.output_payload) diff --git a/src/gen_worker/worker.py b/src/gen_worker/worker.py index 0b63f37..7423fd4 100644 --- a/src/gen_worker/worker.py +++ b/src/gen_worker/worker.py @@ -2752,7 +2752,7 @@ def _process_message(self, message: WorkerSchedulerMessage) -> None: self._handle_unload_model_cmd(message.unload_model_cmd) elif msg_type == 'interrupt_run_cmd': cmd = message.interrupt_run_cmd - request_id = cmd.request_id + request_id = cmd.run_id item_ids = [str(x).strip() for x in list(getattr(cmd, "item_ids", []) or []) if str(x).strip()] cancel_queued_only = bool(getattr(cmd, "cancel_queued_only", False)) self._handle_interrupt_request(request_id, item_ids=item_ids, cancel_queued_only=cancel_queued_only) @@ -2767,11 +2767,11 @@ def _process_message(self, message: WorkerSchedulerMessage) -> None: elif msg_type == "worker_event": self._handle_worker_event_from_scheduler(message.worker_event) # Add handling for other message types if needed (e.g., config updates) - elif msg_type == 'endpoint_config': - cfg = message.endpoint_config + elif msg_type == 'release_artifact_config': + cfg = message.release_artifact_config resolved_by_variant = dict(getattr(cfg, "resolved_cozy_models_by_variant_ref", {}) or {}) logger.info( - "Received EndpointConfig (supported=%d required=%d resolved=%d)", + "Received ReleaseArtifactConfig (supported=%d required=%d resolved=%d)", len(cfg.supported_repo_refs), len(cfg.required_variant_refs), len(resolved_by_variant), @@ -2911,6 +2911,17 @@ def _canonicalize_resolved_models_map(mp: Dict[str, Any]) -> Dict[str, Any]: # Also keep the raw key if different, to be tolerant of non-canonical senders. if raw != canon: out[raw] = v + # For digest-based refs (e.g. "cozy:owner/repo@blake3:"), also add + # a tag-based alias (e.g. "cozy:owner/repo:latest") so that lookups by + # tag in model_ref_downloader will find the resolved entry. + try: + parsed = parse_model_ref(canon) + if parsed.scheme == "cozy" and parsed.cozy is not None and parsed.cozy.digest: + tag_canon = f"cozy:{parsed.cozy.owner}/{parsed.cozy.repo}:{parsed.cozy.tag}" + if tag_canon not in out: + out[tag_canon] = v + except Exception: + pass return out def _start_startup_prefetch(self, model_ids: List[str]) -> None: @@ -2973,12 +2984,12 @@ def worker() -> None: ).encode("utf-8") self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id="", event_type="model.download.completed", payload_json=payload) + worker_event=pb.WorkerEvent(run_id="", event_type="model.download.completed", payload_json=payload) ) ) self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id="", event_type="model.ready", payload_json=json.dumps({"model_id": canon}, separators=(",", ":"), sort_keys=True).encode("utf-8")) + worker_event=pb.WorkerEvent(run_id="", event_type="model.ready", payload_json=json.dumps({"model_id": canon}, separators=(",", ":"), sort_keys=True).encode("utf-8")) ) ) except Exception: @@ -2992,7 +3003,7 @@ def worker() -> None: ).encode("utf-8") self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id="", event_type="model.cached", payload_json=payload) + worker_event=pb.WorkerEvent(run_id="", event_type="model.cached", payload_json=payload) ) ) except Exception: @@ -3006,7 +3017,7 @@ def worker() -> None: payload = json.dumps({"model_id": canon}, separators=(",", ":"), sort_keys=True).encode("utf-8") self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id="", event_type="model.download.started", payload_json=payload) + worker_event=pb.WorkerEvent(run_id="", event_type="model.download.started", payload_json=payload) ) ) except Exception: @@ -3028,7 +3039,7 @@ def worker() -> None: payload = json.dumps({"model_id": canon}, separators=(",", ":"), sort_keys=True).encode("utf-8") self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id="", event_type="model.ready", payload_json=payload) + worker_event=pb.WorkerEvent(run_id="", event_type="model.ready", payload_json=payload) ) ) except Exception: @@ -3043,7 +3054,7 @@ def worker() -> None: ).encode("utf-8") self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id="", event_type="model.cached", payload_json=payload) + worker_event=pb.WorkerEvent(run_id="", event_type="model.cached", payload_json=payload) ) ) except Exception: @@ -3057,7 +3068,7 @@ def worker() -> None: ).encode("utf-8") self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id="", event_type="model.download.completed", payload_json=payload) + worker_event=pb.WorkerEvent(run_id="", event_type="model.download.completed", payload_json=payload) ) ) except Exception: @@ -3078,7 +3089,7 @@ def worker() -> None: payload = json.dumps({"model_id": canon}, separators=(",", ":"), sort_keys=True).encode("utf-8") self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id="", event_type="model.url_refresh", payload_json=payload) + worker_event=pb.WorkerEvent(run_id="", event_type="model.url_refresh", payload_json=payload) ) ) except Exception: @@ -3092,7 +3103,7 @@ def worker() -> None: ).encode("utf-8") self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id="", event_type="model.download.failed", payload_json=payload) + worker_event=pb.WorkerEvent(run_id="", event_type="model.download.failed", payload_json=payload) ) ) except Exception: @@ -3195,7 +3206,7 @@ def _handle_load_model_cmd(self, cmd: LoadModelCommand) -> None: payload = json.dumps({"model_id": model_id}, separators=(",", ":"), sort_keys=True).encode("utf-8") self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id="", event_type="model.load.started", payload_json=payload) + worker_event=pb.WorkerEvent(run_id="", event_type="model.load.started", payload_json=payload) ) ) except Exception: @@ -3213,10 +3224,19 @@ def _handle_load_model_cmd(self, cmd: LoadModelCommand) -> None: # Timeout for this wait, can be adjusted if not self._model_init_done_event.wait(timeout=300.0): # 5 minutes raise TimeoutError("Timeout waiting for model initialization before VRAM load.") - + logger.info(f"Model Memory Manager attempting to load '{model_id}' into VRAM...") - # load_model_into_vram is async - success = asyncio.run(self._model_manager.load_model_into_vram(model_id)) + # Set resolved cozy models context so downloads can use orchestrator-resolved URLs. + from .model_ref_downloader import reset_resolved_cozy_models_by_id, set_resolved_cozy_models_by_id + per_cmd = dict(getattr(cmd, "resolved_cozy_models_by_id", {}) or {}) + baseline = self._resolved_cozy_models_by_id_baseline or {} + merged = {**baseline, **per_cmd} if per_cmd else dict(baseline) + tok = set_resolved_cozy_models_by_id(merged or None) + try: + # load_model_into_vram is async + success = asyncio.run(self._model_manager.load_model_into_vram(model_id)) + finally: + reset_resolved_cozy_models_by_id(tok) if success: logger.info(f"Model '{model_id}' loaded to VRAM by Model Memory Manager.") else: error_msg = f"MMM.load_model_into_vram failed for '{model_id}'."; logger.error(error_msg) except Exception as e: @@ -3251,7 +3271,7 @@ def _handle_load_model_cmd(self, cmd: LoadModelCommand) -> None: ).encode("utf-8") self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id="", event_type=ev_type, payload_json=payload) + worker_event=pb.WorkerEvent(run_id="", event_type=ev_type, payload_json=payload) ) ) except Exception: @@ -3276,7 +3296,7 @@ def _handle_unload_model_cmd(self, cmd: Any) -> None: ) self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id="", event_type="model.unload.failed", payload_json=payload) + worker_event=pb.WorkerEvent(run_id="", event_type="model.unload.failed", payload_json=payload) ) ) except Exception: @@ -3287,7 +3307,7 @@ def _handle_unload_model_cmd(self, cmd: Any) -> None: payload = json.dumps({"model_id": model_id}, separators=(",", ":"), sort_keys=True).encode("utf-8") self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id="", event_type="model.unload.started", payload_json=payload) + worker_event=pb.WorkerEvent(run_id="", event_type="model.unload.started", payload_json=payload) ) ) except Exception: @@ -3334,7 +3354,7 @@ def _handle_unload_model_cmd(self, cmd: Any) -> None: ).encode("utf-8") self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id="", event_type=ev_type, payload_json=payload) + worker_event=pb.WorkerEvent(run_id="", event_type=ev_type, payload_json=payload) ) ) except Exception: @@ -3342,7 +3362,7 @@ def _handle_unload_model_cmd(self, cmd: Any) -> None: def _handle_run_request(self, request: TaskExecutionRequest) -> None: """Handle a task execution request from the scheduler.""" - request_id = request.request_id + request_id = request.run_id function_name = request.function_name input_payload = request.input_payload required_model_id_for_exec = "" @@ -4015,7 +4035,7 @@ def emit_delta(delta_obj: msgspec.Struct) -> None: if not emitted: self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id=request_id, event_type="output.delta", payload_json=raw) + worker_event=pb.WorkerEvent(run_id=request_id, event_type="output.delta", payload_json=raw) ) ) last_item_id = item_id @@ -4061,7 +4081,7 @@ async def consume_async() -> None: if not emitted_done: self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id=request_id, event_type="output.completed", payload_json=b"{}") + worker_event=pb.WorkerEvent(run_id=request_id, event_type="output.completed", payload_json=b"{}") ) ) output_payload = b"" @@ -4125,7 +4145,7 @@ async def consume_async() -> None: if not emitted_err: self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id=request_id, event_type="output.error", payload_json=payload) + worker_event=pb.WorkerEvent(run_id=request_id, event_type="output.error", payload_json=payload) ) ) except Exception: @@ -4426,7 +4446,7 @@ def _resolve_injected_value(self, ctx: ActionContext, requested_type: Any, model ).encode("utf-8") self._send_message( pb.WorkerSchedulerMessage( - worker_event=pb.WorkerEvent(request_id="", event_type="model.cached", payload_json=payload) + worker_event=pb.WorkerEvent(run_id="", event_type="model.cached", payload_json=payload) ) ) except Exception: @@ -4845,7 +4865,7 @@ def _send_task_result( ) else: result = pb.TaskExecutionResult( - request_id=request_id, + run_id=request_id, success=success, output_payload=(output_payload or b'') if success else b'', # Default to b'' if None error_message=error_message if not success else "", diff --git a/tests/test_startup_model_prefetch.py b/tests/test_startup_model_prefetch.py index 48ab6f9..354be6d 100644 --- a/tests/test_startup_model_prefetch.py +++ b/tests/test_startup_model_prefetch.py @@ -57,7 +57,7 @@ def test_startup_prefetch_warms_disk_and_reports_disk_models(tmp_path: Path, mon sess = orch.get_session(timeout_s=30.0) assert sess is not None - cfg = pb.EndpointConfig( + cfg = pb.ReleaseArtifactConfig( supported_repo_refs=[variant_ref], required_variant_refs=[variant_ref], resolved_cozy_models_by_variant_ref={ @@ -74,7 +74,7 @@ def test_startup_prefetch_warms_disk_and_reports_disk_models(tmp_path: Path, mon ) }, ) - sess.send(pb.WorkerSchedulerMessage(endpoint_config=cfg)) + sess.send(pb.WorkerSchedulerMessage(release_artifact_config=cfg)) # The worker prefetch thread triggers an immediate registration update after caching. start = time.monotonic() diff --git a/tests/test_worker_jwt_rotation.py b/tests/test_worker_jwt_rotation.py index 85cbc44..d5252f9 100644 --- a/tests/test_worker_jwt_rotation.py +++ b/tests/test_worker_jwt_rotation.py @@ -74,7 +74,7 @@ def hello(ctx: ActionContext, payload: Input) -> Output: # Send rotation signal over the stream. Worker stores it for next reconnect. payload = json.dumps({"worker_jwt": "jwt-2"}, separators=(",", ":"), sort_keys=True).encode("utf-8") - sess.send(pb.WorkerSchedulerMessage(worker_event=pb.WorkerEvent(request_id="", event_type="worker.jwt.rotate", payload_json=payload))) + sess.send(pb.WorkerSchedulerMessage(worker_event=pb.WorkerEvent(run_id="", event_type="worker.jwt.rotate", payload_json=payload))) # Ensure the worker processed the rotation signal before we force a reconnect. start = time.monotonic()