|
| 1 | +# Proc meshes & ProcMeshAgent |
| 2 | + |
| 3 | +## What the `ProcMeshAgent` Is |
| 4 | + |
| 5 | +Every proc in a mesh runs a `ProcMeshAgent`. It plays the same role on the proc side that the `HostMeshAgent` plays on the host side: it implements the control-plane interface for "managing this proc as part of a mesh". |
| 6 | + |
| 7 | +The agent has several responsibilities, all of which will be documented on this page: |
| 8 | +- wiring the proc into the mesh router, |
| 9 | +- handling resource-style requests (`CreateOrUpdate`, `Stop`, `GetState`, `GetRankStatus`, ...), |
| 10 | +- forwarding or recording supervision events, |
| 11 | +- tracking the lifecycle of actors created on the proc, |
| 12 | +- and supporting both the legacy v0 and the current v1 spawn APIs. |
| 13 | + |
| 14 | +This chapter begins with the **v1 "resource-style" spawn path**:: how a request of the form |
| 15 | +```rust |
| 16 | +CreateOrUpdate<ActorSpec> |
| 17 | +``` |
| 18 | +results in an actual actor being constructed inside the proc using the `Remote` registry. |
| 19 | + |
| 20 | +To anchor that discussion, here is the essential shape of the agent: |
| 21 | +```rust |
| 22 | +pub struct ProcMeshAgent { |
| 23 | + proc: Proc, // local actor runtime |
| 24 | + remote: Remote, // registry of SpawnableActor entries (built from RemoteSpawn + remote!(...)) |
| 25 | + state: State, // v0/v1 bootstrapping mode |
| 26 | + actor_states: HashMap<Name, ActorInstanceState>, // per-actor spawn results & metadata |
| 27 | + record_supervision_events: bool, |
| 28 | + supervision_events: HashMap<ActorId, Vec<ActorSupervisionEvent>>, |
| 29 | +} |
| 30 | +``` |
| 31 | +- **`proc: Proc`** The proc-local runtime into which new actors will be installed |
| 32 | +- **`remote: Remote`** A snapshot of the process-local registry of `SpawnableActor` entries (populated from `RemoteSpawn` impls via `remote!(A)`). This is the bridge between *global type names* and the actual constructors used by `Remote::gspawn`. |
| 33 | +- **`actor_states`** The agent's bookkeeping: for each actor name in the mesh, what happened when this proc tried to spawn it. |
| 34 | + |
| 35 | +The sections that follow walk the spawn flow end-to-end. Additional responsibilities (status, supervision, teardown) will be documented after the spawn discussion. |
| 36 | + |
| 37 | +## The V1 Spawn Flow |
| 38 | + |
| 39 | +At a high level, the v1 path for creating an actor on every proc looks like this: |
| 40 | +```text |
| 41 | +ProcMeshRef ──(CreateOrUpdate<ActorSpec>)──▶ ProcMeshAgent mesh |
| 42 | + ProcMeshAgent ──(Remote::gspawn)──▶ Proc / Remote registry |
| 43 | +``` |
| 44 | +("`ProcMeshRef` turns `spawn::<A>` into a broadcast `CreateOrUpdate<ActorSpec>` to the `ProcMeshAgent` mesh; each `ProcMeshAgent` then calls `Remote::gspawn` into its local `Proc` using the `Remote` registry.") |
| 45 | + |
| 46 | +From the caller's point of view it starts as: |
| 47 | +```rust |
| 48 | +proc_mesh.spawn::<A>(cx, "name", ¶ms).await |
| 49 | +``` |
| 50 | +which is just a thin wrapper over: |
| 51 | +```rust |
| 52 | +proc_mesh |
| 53 | + .spawn_with_name::<A>(cx, Name::new("name"), ¶ms) |
| 54 | + .await |
| 55 | +``` |
| 56 | +The rest of this section unpacks what that call actually does. |
| 57 | + |
| 58 | +### From `spawn` to `ActorSpec` |
| 59 | + |
| 60 | +The real work happens in `spawn_with_name_inner`: |
| 61 | +```rust |
| 62 | +impl ProcMeshRef { |
| 63 | + async fn spawn_with_name_inner<A: Actor + Referable>( |
| 64 | + &self, |
| 65 | + cx: &impl context::Actor, |
| 66 | + name: Name, |
| 67 | + params: &A::Params, |
| 68 | + ) -> v1::Result<ActorMesh<A>> |
| 69 | + where |
| 70 | + A::Params: RemoteMessage, |
| 71 | + { |
| 72 | + let remote = Remote::collect(); |
| 73 | + // `RemoteSpawn` + `remote!(A)` ensure that `A` has a |
| 74 | + // `SpawnableActor` entry in this registry, so |
| 75 | + // `name_of::<A>()` can resolve its global type name. |
| 76 | + let actor_type = remote |
| 77 | + .name_of::<A>() |
| 78 | + .ok_or(Error::ActorTypeNotRegistered(type_name::<A>().to_string()))? |
| 79 | + .to_string(); |
| 80 | + |
| 81 | + let serialized_params = bincode::serialize(params)?; |
| 82 | + let agent_mesh = self.agent_mesh(); |
| 83 | + |
| 84 | + agent_mesh.cast( |
| 85 | + cx, |
| 86 | + resource::CreateOrUpdate::<mesh_agent::ActorSpec> { |
| 87 | + name: name.clone(), |
| 88 | + rank: Default::default(), |
| 89 | + spec: mesh_agent::ActorSpec { |
| 90 | + actor_type: actor_type.clone(), |
| 91 | + params_data: serialized_params.clone(), |
| 92 | + }, |
| 93 | + }, |
| 94 | + )?; |
| 95 | + |
| 96 | + // ... wait on GetRankStatus and build ActorMesh<A> ... |
| 97 | + } |
| 98 | +} |
| 99 | +``` |
| 100 | +What this does, step by step: |
| 101 | +1. **Resolve the Rust type `A` to a global type name** |
| 102 | + ```rust |
| 103 | + let remote = Remote::collect(); |
| 104 | + let actor_type = remote |
| 105 | + .name_of::<A>() |
| 106 | + .ok_or(Error::ActorTypeNotRegistered(type_name::<A>().to_string()))? |
| 107 | + .to_string(); |
| 108 | + ``` |
| 109 | + This is the point where the *type-level* contract kicks in: |
| 110 | + - elsewhere, the user has written `remote!(MyActor)` for each `A: RemoteSpawn`, |
| 111 | + - that registration adds a `SpawnableActor` entry to the `Remote` registry, |
| 112 | + - `Remote::name_of::<A>()` looks up that entry and reads its **global type name**. |
| 113 | + |
| 114 | + If `A` was never registered with `remote!(A)`, this call fails with `ActorTypeNotRegistered`, and the spawn never leaves the caller's process. |
| 115 | + |
| 116 | +2. **Serialize the spawn parameters** |
| 117 | + ```rust |
| 118 | + let serialized_params = bincode::serialize(params)?; |
| 119 | + ``` |
| 120 | + Spawn parameters travel as opaque bytes. The API only enforces that `A::Params: RemoteMessage`, meaning the caller’s side can serialize them. On the remote side there is no trait bound — the generated `RemoteSpawn::gspawn` simply attempts to deserialize the incoming byte payload into `A::Params` and will return an error if it cannot. |
| 121 | + |
| 122 | +3. **Broadcast a resource-style `CreateOrUpdate<ActorSpec>`** |
| 123 | + ```rust |
| 124 | + let agent_mesh = self.agent_mesh(); |
| 125 | + |
| 126 | + agent_mesh.cast( |
| 127 | + cx, |
| 128 | + resource::CreateOrUpdate::<mesh_agent::ActorSpec> { |
| 129 | + name: name.clone(), |
| 130 | + rank: Default::default(), |
| 131 | + spec: mesh_agent::ActorSpec { |
| 132 | + actor_type: actor_type.clone(), |
| 133 | + params_data: serialized_params.clone(), |
| 134 | + }, |
| 135 | + }, |
| 136 | + )?; |
| 137 | + ``` |
| 138 | + This is where the proc mesh turns a local method call into a **distributed control-plane request**: |
| 139 | + - `agent_mesh` is an `ActorMeshRef<ProcMeshAgent>` – one `ProcMeshAgent` per proc, |
| 140 | + - `cast` sends the same `CreateOrUpdate<ActorSpec>` to **every** `ProcMeshAgent`, |
| 141 | + - the `name` field is the *mesh-level* actor name ("this actor, on this mesh"), |
| 142 | + - `actor_type` is the *global* type name resolved via `Remote`, |
| 143 | + - `params_data` is the serialized `A::Params`. |
| 144 | + |
| 145 | +At this point the proc mesh has done its part: it has told every proc in the mesh: "For mesh actor name, please ensure you have one local actor of type `actor_type`, constructed from `params_data`." |
| 146 | + |
| 147 | +### How `ProcMeshAgent` handles `CreateOrUpdate<ActorSpec>` |
| 148 | + |
| 149 | +Once the `ProcMeshRef` has broadcast a `CreateOrUpdate<ActorSpec>` to every proc, each proc's `ProcMeshAgent` receives that message and attempts to construct the actor locally. |
| 150 | + |
| 151 | +The entry point is: |
| 152 | +```rust |
| 153 | +#[async_trait] |
| 154 | +impl Handler<resource::CreateOrUpdate<ActorSpec>> for ProcMeshAgent { |
| 155 | + async fn handle( |
| 156 | + &mut self, |
| 157 | + _cx: &Context<Self>, |
| 158 | + create_or_update: resource::CreateOrUpdate<ActorSpec>, |
| 159 | + ) -> anyhow::Result<()> { |
| 160 | + ... |
| 161 | + } |
| 162 | +} |
| 163 | +``` |
| 164 | + |
| 165 | +This handler performs four steps: |
| 166 | + |
| 167 | +--- |
| 168 | + |
| 169 | +1. Idempotence: only the first `CreateOrUpdate` matters |
| 170 | + |
| 171 | +```rust |
| 172 | +if self.actor_states.contains_key(&create_or_update.name) { |
| 173 | + // There is no update. |
| 174 | + return Ok(()); |
| 175 | +} |
| 176 | +``` |
| 177 | + |
| 178 | +The `CreateOrUpdate` resource verb supports "update" in principle, but actor meshes never update an existing actor by name. They only create a fresh actor mesh. |
| 179 | + |
| 180 | +So the agent simply ignores subsequent requests for the same name. |
| 181 | + |
| 182 | +--- |
| 183 | + |
| 184 | +2. Safety check: reject spawn if the proc has supervision errors |
| 185 | + |
| 186 | +```rust |
| 187 | +if !self.supervision_events.is_empty() { |
| 188 | + self.actor_states.insert( |
| 189 | + create_or_update.name.clone(), |
| 190 | + ActorInstanceState { |
| 191 | + spawn: Err(anyhow::anyhow!( |
| 192 | + "Cannot spawn new actors on mesh with supervision events" |
| 193 | + )), |
| 194 | + create_rank, |
| 195 | + stopped: false, |
| 196 | + }, |
| 197 | + ); |
| 198 | + return Ok(()); |
| 199 | +} |
| 200 | +``` |
| 201 | + |
| 202 | +If this proc previously recorded **any** supervision events for **any** actor, the proc is considered "poisoned": it may be in a bad state, and spawning new actors would be unsafe. |
| 203 | + |
| 204 | +The agent records the failure in `actor_states` and stops. |
| 205 | + |
| 206 | +Later, when the `ProcMesh::spawn_with_name_inner` calls `GetRankStatus::wait` to aggregate per-rank results, this proc will contribute a `Failed` status for that actor name instead of ever reporting it as `Running`. |
| 207 | + |
| 208 | +--- |
| 209 | + |
| 210 | +3. Unpack `ActorSpec` and call `remote.gspawn` |
| 211 | + |
| 212 | +```rust |
| 213 | +let ActorSpec { |
| 214 | + actor_type, |
| 215 | + params_data, |
| 216 | +} = create_or_update.spec; |
| 217 | + |
| 218 | +self.actor_states.insert( |
| 219 | + create_or_update.name.clone(), |
| 220 | + ActorInstanceState { |
| 221 | + create_rank, |
| 222 | + spawn: self |
| 223 | + .remote |
| 224 | + .gspawn( |
| 225 | + &self.proc, |
| 226 | + &actor_type, |
| 227 | + &create_or_update.name.to_string(), |
| 228 | + params_data, |
| 229 | + ) |
| 230 | + .await, |
| 231 | + stopped: false, |
| 232 | + }, |
| 233 | +); |
| 234 | +``` |
| 235 | +This is the core of v1 spawning. The agent: |
| 236 | +- unpacks the `ActorSpec` (type name + parameter bytes), and |
| 237 | +- passes those pieces into `remote.gspawn(...)` to construct the local actor. |
| 238 | +- `actor_type: String` – the logical type name registered by `remote!(A)`, computed in `ProcMeshRef::spawn_with_name_inner` via `remote.name_of::<A>()`, and used by `Remote::gspawn` on each proc to find the right constructor. |
| 239 | +- `params_data: Data` A raw byte buffer containing serialized `A::Params` (via `bincode::serialize`). |
| 240 | +- `self.remote.gspawn(...)` This method looks up the `SpawnableActor` entry for `actor_type` in the local `Remote` registry then invoks: |
| 241 | +```rust |
| 242 | +SpawnableActor::spawn(proc, name, params_data) |
| 243 | +``` |
| 244 | +Internally this calls the actor's `RemoteSpawn::new(params).await` construtor registers it under the given name in the proc's runtime, and returns an `ActorId`. |
| 245 | + |
| 246 | +The result -- success or failure -- is recorded in: |
| 247 | +```rust |
| 248 | +ActorInstanceState { |
| 249 | + create_rank, // this proc's rank in the mesh |
| 250 | + spawn: Result<ActorId, anyhow::Error>, |
| 251 | + stopped: false, |
| 252 | +} |
| 253 | +``` |
| 254 | +The `actor_states` map is later queried by `GetRankStatus` and `GetState`. |
| 255 | + |
| 256 | +--- |
| 257 | + |
| 258 | +4. Return success locally (no direct reply) |
| 259 | + |
| 260 | +Once the agent has updated `actor_states`, the handler simply returns: |
| 261 | +```rust |
| 262 | +Ok(()) |
| 263 | +``` |
| 264 | +There is no **direct reply** back to the caller for `CreateOrUpdate<ActorSpec>`. |
| 265 | + |
| 266 | +From the agent's point of view, the work for the message is: |
| 267 | +- decide whether to attempt a spawn (idempotence + supervision gate), |
| 268 | +- call `remote.gspawn(...)` into the local `Proc`, |
| 269 | +- record the outcome in `actor_states[name]` as `ActorInstanceState`. |
| 270 | + |
| 271 | +That's it. The handler does **not** try to decide whether the *mesh-level* spawn "succeeded" or "failed" - it just persists the per-proc result. |
| 272 | + |
| 273 | +Those per-proc results are later *read* by the resource query handlers (`Handler<GetRankStatus>`, `Handler<GetState<ActorState>>` on `ProcMeshAgent`). |
| 274 | + |
| 275 | +## Completing the Spawn: How `GetRankStatus` Decides Success |
| 276 | + |
| 277 | +Once every `ProcMeshAgent` has received the `CreateOrUpdate<ActorSpec>` message and updated its local `actor_states`, the caller still does not know: |
| 278 | +- **Did every proc spawn the actor successfully?** |
| 279 | +- **Did any proc report a supervision failure?** |
| 280 | +- **Are all actors running, or did one terminate immediately?** |
| 281 | + |
| 282 | +To answer these questions, the `ProcMeshRef` performs a *second* distributed query using the resource verb: |
| 283 | +```rust |
| 284 | +resource::GetRankStatus{ name, reply } |
| 285 | +``` |
| 286 | +This message is broadcast to the same `ProcMeshAgent` mesh. Each agent replies with a small "overlay" describing *its* result for that actor name: |
| 287 | +- no entry yet -> `NotExist` |
| 288 | +- spawn failed -> `Failed(error)` |
| 289 | +- spawned and running -> `Running` |
| 290 | +- terminated -> `Stopped`/`Failed`, |
| 291 | +- supervision events present -> `Failed`. |
| 292 | + |
| 293 | +The reply port used to collect all `GetRankStatus` responses is opened via: |
| 294 | +```rust |
| 295 | +let (port, rx) = cx.mailbox().open_accum_port_opts( |
| 296 | + StatusMesh::from_single(region.clone(), Status::NotExist), |
| 297 | + Some(ReducerOpts { max_update_interval: Some(Duration::from_millis(50)) }), |
| 298 | +); |
| 299 | +``` |
| 300 | +Here, `cx` is the callers context. In tests this is typically `testing::instance()`, a tiny driver actor (`Instance<>()`), so the accumulation port (`port`/`rx`)-and thus all collected replies-live in that test instance's mailbox. |
| 301 | + |
| 302 | +An accumulation port is just a mailbox port that keeps a running aggregate value. Each `GetRankStatus` reply is an overlay, and the mailbox's reducer merges those overlays into a single `StatusMesh`, with one final status per proc/rank. |
0 commit comments