diff --git a/app/src/ai/ambient_agents/spawn.rs b/app/src/ai/ambient_agents/spawn.rs index df47499ea..af0a61141 100644 --- a/app/src/ai/ambient_agents/spawn.rs +++ b/app/src/ai/ambient_agents/spawn.rs @@ -3,13 +3,14 @@ use std::{str::FromStr, sync::Arc, time::Duration}; -use futures::{select, FutureExt, Stream}; +use anyhow::anyhow; +use futures::{select, FutureExt, Stream, StreamExt}; use session_sharing_protocol::common::SessionId; use super::AmbientAgentTaskId; use super::{AmbientAgentTask, AmbientAgentTaskState}; use crate::{ - server::server_api::ai::{AIClient, SpawnAgentRequest, TaskStatusMessage}, + server::server_api::ai::{AIClient, RunFollowupRequest, SpawnAgentRequest, TaskStatusMessage}, terminal::shared_session, }; @@ -80,6 +81,13 @@ pub enum AmbientAgentEvent { AtCapacity, } +enum RunPollMode { + InitialRun, + Followup { + previous_session_id: Option, + }, +} + /// Spawns an ambient agent task and monitors its state. /// /// The stream completes when: @@ -113,16 +121,63 @@ pub fn spawn_task( yield Ok(AmbientAgentEvent::AtCapacity); } + let mut stream = Box::pin(poll_run_until_joinable_session( + task_id, + ai_client, + RunPollMode::InitialRun, + timeout, + )); + while let Some(event) = stream.next().await { + yield event; + } + } +} + +pub fn submit_run_followup( + message: String, + run_id: AmbientAgentTaskId, + previous_session_id: Option, + ai_client: Arc, + timeout: Option, +) -> impl Stream> { + async_stream::stream! { + let request = RunFollowupRequest { message }; + if let Err(err) = ai_client.submit_run_followup(&run_id, request).await { + yield Err(err); + return; + } + + let mut stream = Box::pin(poll_run_until_joinable_session( + run_id, + ai_client, + RunPollMode::Followup { + previous_session_id, + }, + timeout, + )); + while let Some(event) = stream.next().await { + yield event; + } + } +} + +fn poll_run_until_joinable_session( + run_id: AmbientAgentTaskId, + ai_client: Arc, + mode: RunPollMode, + timeout: Option, +) -> impl Stream> { + async_stream::stream! { // Poll for the task until it completes OR has session join info. // We use a timeout to ensure we don't wait indefinitely for session info. // If no timeout is provided, we use a future that never completes. - let mut timeout_timer = match timeout { + let mut timeout_timer = FutureExt::fuse(match timeout { Some(d) => warpui::r#async::Timer::after(d), None => warpui::r#async::Timer::never(), - }.fuse(); + }); let mut last_state = None; loop { - let mut poll_timer = warpui::r#async::Timer::after(TASK_STATUS_POLL_INTERVAL).fuse(); + let mut poll_timer = FutureExt::fuse(warpui::r#async::Timer::after(TASK_STATUS_POLL_INTERVAL)); select! { _ = timeout_timer => { @@ -130,9 +185,8 @@ pub fn spawn_task( return; } _ = poll_timer => { - match ai_client.get_ambient_agent_task(&task_id).await { + match ai_client.get_ambient_agent_task(&run_id).await { Ok(task) => { - // Only emit a state-change event if the state has changed. if last_state.as_ref() != Some(&task.state) { last_state = Some(task.state.clone()); yield Ok(AmbientAgentEvent::StateChanged { @@ -141,23 +195,47 @@ pub fn spawn_task( }); } - // Check if the task has completed or started sharing its session. if task.state.is_terminal() { - // Task completed, stream ends. + if matches!(&mode, RunPollMode::Followup { .. }) { + let message = task + .status_message + .as_ref() + .map(|msg| msg.message.clone()) + .unwrap_or_else(|| { + if task.state.is_failure_like() { + "Cloud agent failed".to_string() + } else { + "Cloud follow-up finished before a new session became available".to_string() + } + }); + yield Err(anyhow!(message)); + } return; } if task.state == AmbientAgentTaskState::InProgress { if let Some(session_join_info) = SessionJoinInfo::from_task(&task) { - yield Ok(AmbientAgentEvent::SessionStarted { - session_join_info, - }); - return; + let has_new_session = match &mode { + RunPollMode::InitialRun + | RunPollMode::Followup { + previous_session_id: None, + } => true, + RunPollMode::Followup { + previous_session_id: Some(previous_session_id), + } => session_join_info + .session_id + .as_ref() + .is_some_and(|session_id| session_id != previous_session_id), + }; + if has_new_session { + yield Ok(AmbientAgentEvent::SessionStarted { + session_join_info, + }); + return; + } } - // Continue polling. } else { - log::info!("Agent {task_id} state: {:?}", task.state); - // Continue polling. + log::info!("Agent {run_id} state: {:?}", task.state); } } Err(err) => { diff --git a/app/src/ai/ambient_agents/spawn_tests.rs b/app/src/ai/ambient_agents/spawn_tests.rs index 0da003c5c..d42083af8 100644 --- a/app/src/ai/ambient_agents/spawn_tests.rs +++ b/app/src/ai/ambient_agents/spawn_tests.rs @@ -8,10 +8,10 @@ use session_sharing_protocol::common::SessionId; use crate::ai::agent::UserQueryMode; use crate::ai::ambient_agents::{AmbientAgentTask, AmbientAgentTaskState}; -use crate::server::server_api::ai::{MockAIClient, SpawnAgentResponse}; +use crate::server::server_api::ai::{MockAIClient, SpawnAgentResponse, TaskStatusMessage}; use crate::terminal::shared_session; -use super::{spawn_task, AmbientAgentEvent, SessionJoinInfo}; +use super::{spawn_task, submit_run_followup, AmbientAgentEvent, SessionJoinInfo}; fn task_with( state: AmbientAgentTaskState, @@ -42,6 +42,274 @@ fn task_with( } } +#[tokio::test] +async fn followup_submits_before_polling_and_ignores_previous_session_id() { + use futures::StreamExt; + + let previous_session_id = SessionId::new(); + let new_session_id = SessionId::new(); + let submitted = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let call_count = Arc::new(AtomicUsize::new(0)); + let mut mock = MockAIClient::new(); + + mock.expect_submit_run_followup().times(1).returning({ + let submitted = submitted.clone(); + move |observed_run_id, request| { + assert_eq!(observed_run_id.to_string(), run_id().to_string()); + assert_eq!(request.message, "continue from here"); + submitted.store(true, Ordering::SeqCst); + Ok(()) + } + }); + + mock.expect_get_ambient_agent_task().returning({ + let submitted = submitted.clone(); + let call_count = call_count.clone(); + move |observed_run_id| { + assert!(submitted.load(Ordering::SeqCst)); + assert_eq!(observed_run_id.to_string(), run_id().to_string()); + let idx = call_count.fetch_add(1, Ordering::SeqCst); + let (session_id, session_link) = if idx == 0 { + ( + previous_session_id.to_string(), + "https://example.com/session/previous".to_string(), + ) + } else { + ( + new_session_id.to_string(), + "https://example.com/session/new".to_string(), + ) + }; + + Ok(task_with( + AmbientAgentTaskState::InProgress, + Some(session_id), + Some(session_link), + )) + } + }); + + let ai_client = Arc::new(mock); + let mut stream = Box::pin(submit_run_followup( + "continue from here".to_string(), + run_id(), + Some(previous_session_id), + ai_client, + None, + )); + + let event = stream + .next() + .await + .expect("expected state changed") + .expect("expected ok"); + assert!(matches!( + event, + AmbientAgentEvent::StateChanged { + state: AmbientAgentTaskState::InProgress, + .. + } + )); + + let event = stream + .next() + .await + .expect("expected session started") + .expect("expected ok"); + let AmbientAgentEvent::SessionStarted { session_join_info } = event else { + panic!("Expected SessionStarted event"); + }; + assert_eq!(session_join_info.session_id, Some(new_session_id)); + assert!(stream.next().await.is_none()); +} + +#[tokio::test] +async fn followup_api_error_does_not_poll() { + use futures::StreamExt; + + let mut mock = MockAIClient::new(); + mock.expect_submit_run_followup() + .times(1) + .returning(|_, _| Err(anyhow::anyhow!("follow-up rejected"))); + mock.expect_get_ambient_agent_task().times(0); + + let ai_client = Arc::new(mock); + let mut stream = Box::pin(submit_run_followup( + "continue".to_string(), + run_id(), + Some(SessionId::new()), + ai_client, + None, + )); + + let err = stream + .next() + .await + .expect("expected error") + .expect_err("expected follow-up error"); + assert_eq!(err.to_string(), "follow-up rejected"); + assert!(stream.next().await.is_none()); +} + +#[tokio::test] +async fn followup_terminal_failure_surfaces_status_message() { + use futures::StreamExt; + + let mut mock = MockAIClient::new(); + mock.expect_submit_run_followup() + .times(1) + .returning(|_, _| Ok(())); + mock.expect_get_ambient_agent_task() + .times(1) + .returning(|_| { + let mut task = task_with(AmbientAgentTaskState::Error, None, None); + task.status_message = Some(TaskStatusMessage { + message: "failed to provision runtime".to_string(), + }); + Ok(task) + }); + + let ai_client = Arc::new(mock); + let mut stream = Box::pin(submit_run_followup( + "continue".to_string(), + run_id(), + Some(SessionId::new()), + ai_client, + None, + )); + + let event = stream + .next() + .await + .expect("expected state changed") + .expect("expected ok"); + assert!(matches!( + event, + AmbientAgentEvent::StateChanged { + state: AmbientAgentTaskState::Error, + .. + } + )); + + let err = stream + .next() + .await + .expect("expected terminal error") + .expect_err("expected terminal error"); + assert_eq!(err.to_string(), "failed to provision runtime"); + assert!(stream.next().await.is_none()); +} + +#[tokio::test] +async fn followup_without_previous_session_id_accepts_joinable_session() { + use futures::StreamExt; + + let session_id = SessionId::new(); + let expected_session_id = session_id; + let mut mock = MockAIClient::new(); + + mock.expect_submit_run_followup() + .times(1) + .returning(|_, _| Ok(())); + mock.expect_get_ambient_agent_task() + .times(1) + .returning(move |_| { + Ok(task_with( + AmbientAgentTaskState::InProgress, + Some(session_id.to_string()), + Some("https://example.com/session/joinable".to_string()), + )) + }); + + let ai_client = Arc::new(mock); + let mut stream = Box::pin(submit_run_followup( + "continue".to_string(), + run_id(), + None, + ai_client, + None, + )); + + let event = stream + .next() + .await + .expect("expected state changed") + .expect("expected ok"); + assert!(matches!( + event, + AmbientAgentEvent::StateChanged { + state: AmbientAgentTaskState::InProgress, + .. + } + )); + + let event = stream + .next() + .await + .expect("expected session started") + .expect("expected ok"); + let AmbientAgentEvent::SessionStarted { session_join_info } = event else { + panic!("Expected SessionStarted event"); + }; + assert_eq!(session_join_info.session_id, Some(expected_session_id)); + assert_eq!( + session_join_info.session_link, + "https://example.com/session/joinable" + ); + assert!(stream.next().await.is_none()); +} + +#[tokio::test] +async fn followup_without_previous_session_id_errors_if_run_finishes_before_session() { + use futures::StreamExt; + + let mut mock = MockAIClient::new(); + + mock.expect_submit_run_followup() + .times(1) + .returning(|_, _| Ok(())); + mock.expect_get_ambient_agent_task() + .times(1) + .returning(|_| Ok(task_with(AmbientAgentTaskState::Succeeded, None, None))); + + let ai_client = Arc::new(mock); + let mut stream = Box::pin(submit_run_followup( + "continue".to_string(), + run_id(), + None, + ai_client, + None, + )); + + let event = stream + .next() + .await + .expect("expected state changed") + .expect("expected ok"); + assert!(matches!( + event, + AmbientAgentEvent::StateChanged { + state: AmbientAgentTaskState::Succeeded, + .. + } + )); + + let err = stream + .next() + .await + .expect("expected terminal error") + .expect_err("expected terminal error"); + assert_eq!( + err.to_string(), + "Cloud follow-up finished before a new session became available" + ); + assert!(stream.next().await.is_none()); +} + +fn run_id() -> crate::ai::ambient_agents::AmbientAgentTaskId { + "550e8400-e29b-41d4-a716-446655440000".parse().unwrap() +} + #[tokio::test] async fn poll_stops_on_terminal_failure_like_state() { use futures::StreamExt; diff --git a/app/src/terminal/shared_session/viewer/terminal_manager.rs b/app/src/terminal/shared_session/viewer/terminal_manager.rs index 54cf74389..ab0271ffc 100644 --- a/app/src/terminal/shared_session/viewer/terminal_manager.rs +++ b/app/src/terminal/shared_session/viewer/terminal_manager.rs @@ -1559,7 +1559,22 @@ impl TerminalManager { model .lock() .clear_write_to_pty_events_for_shared_session_tx(); - *current_network.lock() = None; + if FeatureFlag::HandoffCloudCloud.is_enabled() { + terminal_view.update(ctx, |terminal_view, ctx| { + if let Some(ambient_agent_view_model) = + terminal_view.ambient_agent_view_model().cloned() + { + ambient_agent_view_model.update(ctx, |model, _| { + model.record_ambient_execution_ended(ended_session_id); + }); + } + }); + } + if Self::current_network(current_network) + .is_some_and(|network| network.as_ref(ctx).session_id() == ended_session_id) + { + *current_network.lock() = None; + } true } } diff --git a/app/src/terminal/view/ambient_agent/mod.rs b/app/src/terminal/view/ambient_agent/mod.rs index 6d97d361d..1b02cf7e9 100644 --- a/app/src/terminal/view/ambient_agent/mod.rs +++ b/app/src/terminal/view/ambient_agent/mod.rs @@ -90,6 +90,7 @@ pub fn create_cloud_mode_view( AmbientAgentViewModelEvent::EnteredSetupState | AmbientAgentViewModelEvent::EnteredComposingState | AmbientAgentViewModelEvent::DispatchedAgent + | AmbientAgentViewModelEvent::FollowupDispatched | AmbientAgentViewModelEvent::ProgressUpdated | AmbientAgentViewModelEvent::EnvironmentSelected | AmbientAgentViewModelEvent::Failed { .. } diff --git a/app/src/terminal/view/ambient_agent/model.rs b/app/src/terminal/view/ambient_agent/model.rs index 89a891811..302d8318a 100644 --- a/app/src/terminal/view/ambient_agent/model.rs +++ b/app/src/terminal/view/ambient_agent/model.rs @@ -10,7 +10,7 @@ use warpui::{Entity, EntityId, ModelContext, SingletonEntity}; use crate::ai::active_agent_views_model::ActiveAgentViewsModel; use crate::ai::agent::{conversation::AIConversationId, extract_user_query_mode}; -use crate::ai::ambient_agents::spawn::{spawn_task, AmbientAgentEvent}; +use crate::ai::ambient_agents::spawn::{spawn_task, submit_run_followup, AmbientAgentEvent}; use crate::ai::ambient_agents::task::HarnessConfig; use crate::ai::ambient_agents::telemetry::CloudAgentTelemetryEvent; use crate::ai::ambient_agents::AmbientAgentTaskId; @@ -45,6 +45,24 @@ pub struct AgentProgress { pub stopped_at: Option, } +impl AgentProgress { + fn new() -> Self { + Self { + spawned_at: Instant::now(), + claimed_at: None, + harness_started_at: None, + stopped_at: None, + } + } +} + +/// Identifies what kind of session startup the model is currently waiting on. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SessionStartupKind { + InitialRun, + Followup, +} + /// Status of the ambient agent run. #[derive(Debug, Clone)] pub enum Status { @@ -53,7 +71,10 @@ pub enum Status { /// The user is composing their ambient agent prompt. Composing, /// Waiting for the ambient agent run to be ready. - WaitingForSession { progress: AgentProgress }, + WaitingForSession { + progress: AgentProgress, + kind: SessionStartupKind, + }, /// The agent is running and the session is ready. AgentRunning, /// The agent failed. @@ -109,6 +130,9 @@ pub struct AmbientAgentViewModel { /// Used to transition the cloud-mode setup UI out of the pre-first-exchange phase when /// there is no oz `AppendedExchange` to key off of. harness_command_started: bool, + + active_execution_session_id: Option, + last_ended_execution_session_id: Option, } impl AmbientAgentViewModel { @@ -140,6 +164,8 @@ impl AmbientAgentViewModel { harness: Harness::default(), has_inserted_cloud_mode_user_query_block: false, harness_command_started: false, + active_execution_session_id: None, + last_ended_execution_session_id: None, } } @@ -215,7 +241,7 @@ impl AmbientAgentViewModel { /// Returns `None` if not in the `WaitingForSession`, `Failed`, `NeedsGithubAuth`, or `Cancelled` state. pub fn agent_progress(&self) -> Option<&AgentProgress> { match &self.status { - Status::WaitingForSession { progress } + Status::WaitingForSession { progress, .. } | Status::Failed { progress, .. } | Status::NeedsGithubAuth { progress, .. } | Status::Cancelled { progress } => Some(progress), @@ -430,10 +456,50 @@ impl AmbientAgentViewModel { /// terminal manager to append that session's scrollback to the existing transcript. pub fn attach_followup_session(&mut self, session_id: SessionId, ctx: &mut ModelContext) { self.stop_progress_timer(); + self.active_execution_session_id = Some(session_id); + self.last_ended_execution_session_id = None; self.status = Status::AgentRunning; ctx.emit(AmbientAgentViewModelEvent::FollowupSessionReady { session_id }); } + pub fn record_ambient_execution_ended(&mut self, session_id: SessionId) { + if self.active_execution_session_id.as_ref() == Some(&session_id) { + self.active_execution_session_id = None; + } + self.last_ended_execution_session_id = Some(session_id); + } + + pub fn submit_cloud_followup(&mut self, prompt: String, ctx: &mut ModelContext) { + if !FeatureFlag::HandoffCloudCloud.is_enabled() { + log::warn!("Attempted to submit cloud follow-up while HandoffCloudCloud is disabled"); + return; + } + + let Some(task_id) = self.task_id else { + log::warn!("Attempted to submit cloud follow-up without an ambient task ID"); + return; + }; + + let previous_session_id = self + .active_execution_session_id + .or(self.last_ended_execution_session_id); + let ai_client = ServerApiProvider::as_ref(ctx).get_ai_client(); + let stream = submit_run_followup(prompt, task_id, previous_session_id, ai_client, None); + + self.status = Status::WaitingForSession { + progress: AgentProgress::new(), + kind: SessionStartupKind::Followup, + }; + self.start_progress_timer(ctx); + ctx.emit(AmbientAgentViewModelEvent::FollowupDispatched); + + ctx.spawn_stream_local( + stream, + |me, event_result, ctx| me.handle_ambient_agent_event_result(event_result, ctx), + |_me, _ctx| {}, + ); + } + pub fn status(&self) -> &Status { &self.status } @@ -446,6 +512,8 @@ impl AmbientAgentViewModel { self.conversation_id = None; self.has_inserted_cloud_mode_user_query_block = false; self.harness_command_started = false; + self.active_execution_session_id = None; + self.last_ended_execution_session_id = None; self.stop_progress_timer(); ctx.notify(); } @@ -543,206 +611,226 @@ impl AmbientAgentViewModel { ctx.spawn_stream_local( stream, - |me, event_result, ctx| { - // If we're in Cancelled or Failed state, ignore most events from the stream - // except for TaskSpawned (which we need to handle for early cancellation). - let ignore_events = matches!(me.status, Status::Cancelled { .. } | Status::Failed { .. }); - - match event_result { - Ok(event) => match event { - AmbientAgentEvent::TaskSpawned { task_id, run_id } => { - // Store the task ID for later use (e.g., populating details panel) - me.task_id = Some(task_id); - - // If we already transitioned to Cancelled state (because user cancelled - // before we received the task_id), send the cancellation to the server now. - if matches!(me.status, Status::Cancelled { .. }) { - log::info!( - "Received task_id after cancellation, sending server cancellation for task {}", - task_id - ); - let ai_client = ServerApiProvider::as_ref(ctx).get_ai_client(); - ctx.spawn( - async move { - if let Err(e) = ai_client.cancel_ambient_agent_task(&task_id).await { - log::error!("Failed to cancel ambient agent task {}: {:?}", task_id, e); - } - }, - |_, _, _| {}, - ); - return; - } + |me, event_result, ctx| me.handle_ambient_agent_event_result(event_result, ctx), + |_me, _ctx| {}, + ); - // Wire the run_id to the associated conversation for - // orchestration v2. This unblocks the parent agent's - // pending start_agent tool call. - if let Some(conversation_id) = me.conversation_id { - let terminal_view_id = me.terminal_view_id; - let spawned_task_id = Some(task_id); - BlocklistAIHistoryModel::handle(ctx).update( - ctx, - |history, ctx| { - history.assign_run_id_for_conversation( - conversation_id, - run_id, - spawned_task_id, - terminal_view_id, - ctx, - ); - }, - ); - } + self.status = Status::WaitingForSession { + progress: AgentProgress::new(), + kind: SessionStartupKind::InitialRun, + }; + self.start_progress_timer(ctx); + ctx.emit(AmbientAgentViewModelEvent::DispatchedAgent); + } - // Mark this task as active immediately so it renders under the Active section - // (and doesn't briefly appear under Past before the shared session join completes). - ActiveAgentViewsModel::handle(ctx).update(ctx, |model, ctx| { - model.register_ambient_session(me.terminal_view_id, task_id, ctx); - }); + fn handle_ambient_agent_event_result( + &mut self, + event_result: Result, + ctx: &mut ModelContext, + ) { + let ignore_events = matches!( + self.status, + Status::Cancelled { .. } | Status::Failed { .. } + ); - // Emit event so terminal view knows to show the info button - ctx.emit(AmbientAgentViewModelEvent::ProgressUpdated); - } - AmbientAgentEvent::StateChanged { - state, - status_message, - } => { - // Ignore state changes if we're already in a terminal state - if ignore_events { - return; - } + match event_result { + Ok(event) => self.handle_ambient_agent_event(event, ignore_events, ctx), + Err(err) => { + if ignore_events { + return; + } + self.handle_ambient_agent_stream_error(err, ctx); + } + } + } - if let Status::WaitingForSession { progress } = &mut me.status { - match state { - AmbientAgentTaskState::Cancelled => { - me.handle_cancellation(ctx); - } - AmbientAgentTaskState::Queued | AmbientAgentTaskState::Pending => { - // Clear later states in case the agent failed to start and was retried. - progress.claimed_at = None; - progress.harness_started_at = None; - ctx.emit(AmbientAgentViewModelEvent::ProgressUpdated); - } - AmbientAgentTaskState::Claimed => { - if progress.claimed_at.is_none() { - progress.claimed_at = Some(Instant::now()); - progress.harness_started_at = None; - ctx.emit(AmbientAgentViewModelEvent::ProgressUpdated); - } - } - AmbientAgentTaskState::InProgress => { - if progress.harness_started_at.is_none() { - progress.harness_started_at = Some(Instant::now()); - ctx.emit(AmbientAgentViewModelEvent::ProgressUpdated); - } - } - AmbientAgentTaskState::Succeeded => {} - AmbientAgentTaskState::Failed - | AmbientAgentTaskState::Error - | AmbientAgentTaskState::Blocked - | AmbientAgentTaskState::Unknown => { - let error = status_message - .map(|msg| msg.message) - .unwrap_or_else(|| "Cloud agent failed".to_string()); - me.handle_spawn_error(error, ctx); - } + fn handle_ambient_agent_event( + &mut self, + event: AmbientAgentEvent, + ignore_events: bool, + ctx: &mut ModelContext, + ) { + match event { + AmbientAgentEvent::TaskSpawned { task_id, run_id } => { + self.task_id = Some(task_id); + + if matches!(self.status, Status::Cancelled { .. }) { + log::info!( + "Received task_id after cancellation, sending server cancellation for task {}", + task_id + ); + let ai_client = ServerApiProvider::as_ref(ctx).get_ai_client(); + ctx.spawn( + async move { + if let Err(e) = ai_client.cancel_ambient_agent_task(&task_id).await { + log::error!( + "Failed to cancel ambient agent task {}: {:?}", + task_id, + e + ); } + }, + |_, _, _| {}, + ); + return; + } + + if let Some(conversation_id) = self.conversation_id { + let terminal_view_id = self.terminal_view_id; + let spawned_task_id = Some(task_id); + BlocklistAIHistoryModel::handle(ctx).update(ctx, |history, ctx| { + history.assign_run_id_for_conversation( + conversation_id, + run_id, + spawned_task_id, + terminal_view_id, + ctx, + ); + }); + } + + ActiveAgentViewsModel::handle(ctx).update(ctx, |model, ctx| { + model.register_ambient_session(self.terminal_view_id, task_id, ctx); + }); + + ctx.emit(AmbientAgentViewModelEvent::ProgressUpdated); + } + AmbientAgentEvent::StateChanged { + state, + status_message, + } => { + if ignore_events { + return; + } + + if let Status::WaitingForSession { progress, .. } = &mut self.status { + match state { + AmbientAgentTaskState::Cancelled => { + self.handle_cancellation(ctx); } - } - AmbientAgentEvent::SessionStarted { session_join_info } => { - // Ignore session started if we're already in a terminal state - if ignore_events { - return; + AmbientAgentTaskState::Queued | AmbientAgentTaskState::Pending => { + progress.claimed_at = None; + progress.harness_started_at = None; + ctx.emit(AmbientAgentViewModelEvent::ProgressUpdated); } - - if let Some(session_id) = session_join_info.session_id { - me.stop_progress_timer(); - let event = if matches!(me.status, Status::AgentRunning) { - AmbientAgentViewModelEvent::FollowupSessionReady { session_id } - } else { - AmbientAgentViewModelEvent::SessionReady { session_id } - }; - me.status = Status::AgentRunning; - ctx.emit(event); + AmbientAgentTaskState::Claimed => { + if progress.claimed_at.is_none() { + progress.claimed_at = Some(Instant::now()); + progress.harness_started_at = None; + ctx.emit(AmbientAgentViewModelEvent::ProgressUpdated); + } } - } - AmbientAgentEvent::AtCapacity => { - if ignore_events { - return; + AmbientAgentTaskState::InProgress => { + if progress.harness_started_at.is_none() { + progress.harness_started_at = Some(Instant::now()); + ctx.emit(AmbientAgentViewModelEvent::ProgressUpdated); + } } - - if matches!(me.status, Status::WaitingForSession { .. }) { - ctx.emit(AmbientAgentViewModelEvent::ShowCloudAgentCapacityModal); + AmbientAgentTaskState::Succeeded => {} + AmbientAgentTaskState::Failed + | AmbientAgentTaskState::Error + | AmbientAgentTaskState::Blocked + | AmbientAgentTaskState::Unknown => { + let error = status_message + .map(|msg| msg.message) + .unwrap_or_else(|| "Cloud agent failed".to_string()); + self.handle_spawn_error(error, ctx); } } - AmbientAgentEvent::TimedOut => {} - }, - Err(err) => { - // Ignore errors if we're already in a terminal state - if ignore_events { - return; - } - let error_message = err.to_string(); - send_telemetry_from_ctx!( - CloudAgentTelemetryEvent::DispatchFailed { - error: error_message.clone() - }, - ctx - ); + } + } + AmbientAgentEvent::SessionStarted { session_join_info } => { + if ignore_events { + return; + } - // Check if this is a ClientError with an auth_url - use crate::server::server_api::ClientError; - if let Some(client_error) = err.downcast_ref::() { - if let Some(auth_url) = &client_error.auth_url { - me.handle_needs_github_auth( - auth_url.clone(), - client_error.error.clone(), - ctx, - ); - return; + if let Some(session_id) = session_join_info.session_id { + self.stop_progress_timer(); + let event_session_id = session_id; + let event = match &self.status { + Status::WaitingForSession { + kind: SessionStartupKind::InitialRun, + .. + } => AmbientAgentViewModelEvent::SessionReady { + session_id: event_session_id, + }, + Status::WaitingForSession { + kind: SessionStartupKind::Followup, + .. } - } - if let Some(capacity_error) = err.downcast_ref::() { - me.handle_spawn_error(capacity_error.error.clone(), ctx); - ctx.emit(AmbientAgentViewModelEvent::ShowCloudAgentCapacityModal); - return; - } - if let Some(ai_api_error) = err.downcast_ref::() { - match ai_api_error { - AIApiError::QuotaLimit => { - me.handle_spawn_error( - OUT_OF_CREDITS_TASK_FAILURE_MESSAGE.to_string(), - ctx, - ); - ctx.emit(AmbientAgentViewModelEvent::ShowAICreditModal); - return; + | Status::AgentRunning => { + AmbientAgentViewModelEvent::FollowupSessionReady { + session_id: event_session_id, } - AIApiError::ServerOverloaded => { - me.handle_spawn_error( - SERVER_OVERLOADED_TASK_FAILURE_MESSAGE.to_string(), - ctx, - ); - return; - } - _ => {} } - } - me.handle_spawn_error(error_message, ctx); + Status::Setup + | Status::Composing + | Status::Failed { .. } + | Status::NeedsGithubAuth { .. } + | Status::Cancelled { .. } => return, + }; + self.active_execution_session_id = Some(session_id); + self.last_ended_execution_session_id = None; + self.status = Status::AgentRunning; + ctx.emit(event); } } + AmbientAgentEvent::AtCapacity => { + if ignore_events { + return; + } + + if matches!(self.status, Status::WaitingForSession { .. }) { + ctx.emit(AmbientAgentViewModelEvent::ShowCloudAgentCapacityModal); + } + } + AmbientAgentEvent::TimedOut => {} + } + } + + fn handle_ambient_agent_stream_error( + &mut self, + err: anyhow::Error, + ctx: &mut ModelContext, + ) { + let error_message = err.to_string(); + send_telemetry_from_ctx!( + CloudAgentTelemetryEvent::DispatchFailed { + error: error_message.clone() }, - |_me, _ctx| {}, + ctx ); - self.status = Status::WaitingForSession { - progress: AgentProgress { - spawned_at: Instant::now(), - claimed_at: None, - harness_started_at: None, - stopped_at: None, - }, - }; - self.start_progress_timer(ctx); - ctx.emit(AmbientAgentViewModelEvent::DispatchedAgent); + use crate::server::server_api::ClientError; + if let Some(client_error) = err.downcast_ref::() { + if let Some(auth_url) = &client_error.auth_url { + self.handle_needs_github_auth(auth_url.clone(), client_error.error.clone(), ctx); + return; + } + } + if let Some(capacity_error) = err.downcast_ref::() { + self.handle_spawn_error(capacity_error.error.clone(), ctx); + ctx.emit(AmbientAgentViewModelEvent::ShowCloudAgentCapacityModal); + return; + } + if let Some(ai_api_error) = err.downcast_ref::() { + match ai_api_error { + AIApiError::QuotaLimit => { + self.handle_spawn_error(OUT_OF_CREDITS_TASK_FAILURE_MESSAGE.to_string(), ctx); + ctx.emit(AmbientAgentViewModelEvent::ShowAICreditModal); + return; + } + AIApiError::ServerOverloaded => { + self.handle_spawn_error( + SERVER_OVERLOADED_TASK_FAILURE_MESSAGE.to_string(), + ctx, + ); + return; + } + _ => {} + } + } + self.handle_spawn_error(error_message, ctx); } /// Starts the periodic timer that updates the progress UI while waiting for a session. @@ -783,7 +871,7 @@ impl AmbientAgentViewModel { let now = Instant::now(); // Extract or create progress tracking. - let progress = if let Status::WaitingForSession { mut progress } = + let progress = if let Status::WaitingForSession { mut progress, .. } = std::mem::replace(&mut self.status, Status::Composing) { progress.stopped_at = Some(now); @@ -817,7 +905,7 @@ impl AmbientAgentViewModel { let now = Instant::now(); // Extract or create progress tracking. - let progress = if let Status::WaitingForSession { mut progress } = + let progress = if let Status::WaitingForSession { mut progress, .. } = std::mem::replace(&mut self.status, Status::Composing) { progress.stopped_at = Some(now); @@ -848,7 +936,7 @@ impl AmbientAgentViewModel { let now = Instant::now(); // Extract or create progress tracking. - let progress = if let Status::WaitingForSession { mut progress } = + let progress = if let Status::WaitingForSession { mut progress, .. } = std::mem::replace(&mut self.status, Status::Composing) { progress.stopped_at = Some(now); @@ -909,6 +997,8 @@ pub enum AmbientAgentViewModelEvent { EnteredComposingState, /// The ambient agent run has been dispatched. DispatchedAgent, + /// A follow-up execution has been submitted and is waiting for a new session. + FollowupDispatched, /// The spawn progress has been updated (e.g., task claimed or in-progress). ProgressUpdated, /// The ambient agent has started sharing its session. diff --git a/app/src/terminal/view/ambient_agent/view_impl.rs b/app/src/terminal/view/ambient_agent/view_impl.rs index 2b55eecdc..a2593b6d7 100644 --- a/app/src/terminal/view/ambient_agent/view_impl.rs +++ b/app/src/terminal/view/ambient_agent/view_impl.rs @@ -192,6 +192,14 @@ impl TerminalView { // Re-render to show loading state. ctx.notify(); } + AmbientAgentViewModelEvent::FollowupDispatched => { + self.update_active_ambient_agent_conversation_status( + ConversationStatus::InProgress, + None, + ctx, + ); + ctx.notify(); + } AmbientAgentViewModelEvent::SessionReady { .. } | AmbientAgentViewModelEvent::FollowupSessionReady { .. } => { // Auto-open details panel for local cloud mode once the session is ready. diff --git a/specs/APP-4319/handoff-cloud-cloud-pr2/TECH.md b/specs/APP-4319/handoff-cloud-cloud-pr2/TECH.md new file mode 100644 index 000000000..0e3b2f28a --- /dev/null +++ b/specs/APP-4319/handoff-cloud-cloud-pr2/TECH.md @@ -0,0 +1,69 @@ +# Cloud-to-cloud handoff PR 2 tech spec +## Problem statement +PR 2 should add the orchestration layer that turns an existing cloud agent run into a follow-up execution and hands the resulting fresh shared session to the hotswap path. This PR should remain mergeable while `HandoffCloudCloud` is disabled by default, and it should not add the user-visible tombstone Continue button or terminal-input submission route yet. +The intended boundary is model/API behavior: a future UI can call one ambient-model method with a follow-up prompt, the client submits `POST agent/runs/{runId}/followups`, polls the same run until a new joinable session appears, ignores the ended session, and emits `FollowupSessionReady` so the existing viewer manager attaches the new session in append mode. +## Current state +PR 1 added the disabled `HandoffCloudCloud` flag and encoded the Cargo feature dependency on `cloud_mode_setup_v2` in `app/Cargo.toml:924`. It also added `RunFollowupRequest`, `AIClient::submit_run_followup`, and `build_run_followup_url` in `app/src/server/server_api/ai.rs (211-216, 837-840, 1467-1471)`, with endpoint/serialization tests in `app/src/server/server_api/ai_test.rs (988-1000)`. +`AmbientAgentTask` now has `run_id()`, `conversation_id()`, and `active_run_execution()` accessors that project the current flattened response fields into a `RunExecution` view in `app/src/ai/ambient_agents/task.rs (247-279)`. `SessionJoinInfo::from_task` already consumes that projection in `app/src/ai/ambient_agents/spawn.rs (31-57)`. +Initial cloud startup is still a single combined helper: `spawn_task` creates a run, polls `get_ambient_agent_task`, emits state changes, and ends when the first session is joinable in `app/src/ai/ambient_agents/spawn.rs (85-176)`. There is no reusable “poll an existing run until a new execution session is ready” helper yet. +`AmbientAgentViewModel` still models startup as `Status::WaitingForSession { progress }` without distinguishing an initial run from a follow-up execution in `app/src/terminal/view/ambient_agent/model.rs (50-64)`. The existing `SessionStarted` handler infers follow-up readiness from being already in `AgentRunning`, which is too implicit for a model-driven follow-up flow in `app/src/terminal/view/ambient_agent/model.rs (627-638)`. The current `attach_followup_session` method simply emits `FollowupSessionReady` for a known session ID, which is useful test scaffolding but does not submit or poll a follow-up in `app/src/terminal/view/ambient_agent/model.rs:349`. +The hotswap receiver already exists. `create_cloud_mode_view` routes `SessionReady` to `connect_to_session` and `FollowupSessionReady` to `attach_followup_session` in `app/src/terminal/view/ambient_agent/mod.rs (69-82)`. The viewer manager’s follow-up attach path replaces the active network and joins with append-mode scrollback in `app/src/terminal/shared_session/viewer/terminal_manager.rs (338-384)`. +The UI has important side effects tied to initial dispatch. `DispatchedAgent` inserts the initial optimistic user query in `TerminalView::handle_ambient_agent_event` and drives the ambient entry-block insertion subscription in `app/src/terminal/view/ambient_agent/view_impl.rs (105-131, 445-481)`. PR 2 should avoid reusing that event for follow-ups, because doing so would blur initial-run and follow-up behavior before the UX PR. +## Goals +Add reusable follow-up orchestration that submits a prompt to an existing run and waits for a fresh active execution session. +Make the ambient view model explicitly track whether it is waiting for an initial session or a follow-up session. +Track the active or previous execution session ID so follow-up polling can ignore stale readiness from the ended session. +Emit the already-supported `FollowupSessionReady` event when the new session is ready, allowing the existing hotswap path to attach it. +Reuse existing Cloud Mode setup/loading/error state machinery for follow-up waiting and failures, but without adding a visible Continue entrypoint. +Keep the implementation behind `FeatureFlag::HandoffCloudCloud` and preserve behavior with the flag off. +## Non-goals +No tombstone Continue button, action, or copy changes. +No terminal input routing changes for submitting follow-up prompts. +No embedded follow-up prompt editor in the tombstone. +No product decision on tombstone stacking or update-in-place behavior. +No first-class server execution-array parsing unless the public API response shape already exposes it in this branch. +No rollout enablement for `HandoffCloudCloud`. +## Proposed changes +### Reusable run polling and follow-up helper +Refactor `spawn_task` in `app/src/ai/ambient_agents/spawn.rs` so run creation and run readiness monitoring are separate. Keep the public `spawn_task(request, ai_client, timeout)` behavior the same by having it call a new internal polling helper after `spawn_agent` succeeds. +Add a helper such as `poll_run_until_joinable_session(run_id, ai_client, previous_session_id, timeout)` that repeatedly calls `get_ambient_agent_task(&run_id)`, emits `StateChanged` when state changes, and returns `SessionStarted` only when the task is `InProgress` and `SessionJoinInfo::from_task` contains a parseable `session_id` that differs from `previous_session_id` when one was provided. +Add a follow-up stream/helper such as `submit_run_followup(prompt, run_id, previous_session_id, ai_client, timeout)`. It should call `AIClient::submit_run_followup(run_id, RunFollowupRequest { message: prompt })` first, then call the polling helper. API failure before acceptance should yield an error without polling. Polling errors should surface through the same error path as initial spawn. +For initial spawn, preserve the existing tolerance for a session link without a parsed session ID if any caller still needs that metadata. For follow-up readiness, require a parsed session ID because the hotswap API needs a `SessionId`. +Terminal states before a fresh session is found should not leave the follow-up wait indefinitely. Failure-like states should emit the state change and then surface the task status message as an error; successful terminal completion without a new session should complete with a clear “no follow-up session became available” error. +### Explicit ambient model startup kind +Add a small enum such as `SessionStartupKind { InitialRun, Followup }` and change `Status::WaitingForSession` to carry `{ progress, kind }`. Existing accessors like `agent_progress()` and `is_waiting_for_session()` should remain behavior-preserving. +Add fields to `AmbientAgentViewModel` for follow-up bookkeeping: the active execution `SessionId`, the last ended execution `SessionId` if available, and the currently submitted follow-up prompt. The prompt field is for PR 3’s optimistic rendering; PR 2 should store it but not insert a visible follow-up query block. +Update initial spawn to set `WaitingForSession { kind: InitialRun }`. Update `AmbientAgentEvent::SessionStarted` handling to emit `SessionReady` for `InitialRun` and `FollowupSessionReady` for `Followup`, rather than relying on whether the current status happens to be `AgentRunning`. +Add `AmbientAgentViewModel::submit_cloud_followup(prompt, ctx)`. It should require `FeatureFlag::HandoffCloudCloud`, require an existing `task_id`/run ID, capture the previous active or ended session ID, set `WaitingForSession { kind: Followup }`, start the progress timer, store the pending prompt, emit a distinct follow-up dispatch event, and spawn the follow-up helper stream. +On follow-up success, stop the timer, set `status` to `AgentRunning`, update the active execution session ID, clear the pending prompt, and emit `FollowupSessionReady { session_id }`. On failure, reuse the existing failure/auth/quota/capacity mapping logic as much as possible so follow-up setup errors render through the same state as initial setup errors. +### Execution-ended bookkeeping without visible UI +Extend the ambient session-ended path only enough for bookkeeping. `viewer::TerminalManager::ambient_session_ended` currently leaves the pane resumable and clears the active network in `app/src/terminal/shared_session/viewer/terminal_manager.rs (1490-1515)`. In PR 2 it can notify the ambient view model of the ended session ID behind `HandoffCloudCloud`, so the model records `last_ended_execution_session_id` and can reject duplicate readiness from that session. +This notification should not call `TerminalView::on_session_share_ended`, should not insert a tombstone, should not set `SharedSessionStatus::FinishedViewer`, and should not cancel the local conversation. Those UI and lifecycle decisions remain PR 3 scope. +### Event and view integration +Add a new model event such as `FollowupDispatched` instead of reusing `DispatchedAgent`. `create_cloud_mode_view` only needs an exhaustive-match update for the new event because `FollowupSessionReady` is already wired to `attach_followup_session`. +Update `TerminalView::handle_ambient_agent_event` to handle `FollowupDispatched` by notifying/re-rendering progress UI and marking the active ambient conversation as `ConversationStatus::InProgress` if one exists. It should not insert `CloudModeInitialUserQuery`, should not insert a second `AmbientAgentEntryBlock`, and should not auto-open new UI beyond the existing setup/progress rendering. +The existing loading screen in `app/src/terminal/view/ambient_agent/view_impl.rs (529-571)` can continue to derive messages from `AgentProgress` for PR 2. If copy changes are desired for follow-ups, keep them minimal and keyed off `SessionStartupKind`, but deferring user-facing copy to PR 3 is acceptable. +## Testing strategy +Add stream-level tests in `app/src/ai/ambient_agents/spawn_tests.rs` covering follow-up submission and polling. The important cases are: the helper calls `submit_run_followup` before polling; it ignores the previous session ID returned by the server; it emits `SessionStarted` for a different new session ID; it propagates API errors before polling; it surfaces terminal failure before readiness. +Preserve existing `spawn_task` tests so initial spawn behavior remains unchanged after the refactor. +Add model-level tests if there is a lightweight existing harness for `AmbientAgentViewModel`; otherwise keep model changes small and validate via stream tests plus targeted compile checks. Model assertions should cover `submit_cloud_followup` preconditions, `WaitingForSession { kind: Followup }`, and `FollowupSessionReady` emission on a fresh session. +Run targeted validation after implementation: `cargo nextest run -p warp ai::ambient_agents::spawn::tests server::server_api::ai::tests::build_run_followup_url_routes_to_run_followups server::server_api::ai::tests::serialize_run_followup_request` and `cargo check -p warp --features handoff_cloud_cloud`. If model or terminal-view tests are added, include their module filters. Do not use `cargo fmt --all` or file-specific `cargo fmt`; use the repo’s standard formatting command only when preparing a PR update. +## Rollout and compatibility +With `HandoffCloudCloud` off, no production UI should call the new follow-up method and existing initial Cloud Mode startup should behave as it does today. +With the flag on, PR 2 only exposes an internal/model-level follow-up path. The absence of a visible entrypoint makes this safe to merge before product UX lands, while unit tests can still exercise the orchestration path. +The runtime code may assume `CloudModeSetupV2` when `HandoffCloudCloud` is enabled because the Cargo feature dependency was added in PR 1. +## Risks and mitigations +The server may briefly return the ended execution’s session fields after accepting a follow-up. Mitigate by passing the previous session ID into the polling helper and requiring a different parsed session ID before emitting readiness. +Reusing `DispatchedAgent` for follow-ups would insert initial-run UI artifacts again. Mitigate with a distinct follow-up event and explicit startup kind. +Refactoring `spawn_task` could regress initial Cloud Mode startup. Mitigate by preserving the public stream contract and keeping existing spawn tests green. +A follow-up may be accepted but fail before any session becomes joinable. Mitigate by reusing the existing failed/auth/quota/capacity UI states and leaving future UI free to retry from the tombstone in PR 3. +Model bookkeeping could drift if session-ended notifications are missed. Mitigate by also falling back to the last active execution session ID when submitting a follow-up. +## Parallelization +This PR is small enough to implement sequentially, but two independent tracks could run in parallel if needed. One track can refactor and test `spawn.rs` follow-up polling with mocked `AIClient`; the other can wire `AmbientAgentViewModel` state/events and terminal-manager bookkeeping. They converge at `submit_cloud_followup` consuming the follow-up helper and emitting `FollowupSessionReady`. +## Definition of done +`spawn_task` still behaves the same for initial runs after extracting reusable polling. +A follow-up helper submits a prompt, polls the stable run, ignores stale session IDs, and returns a fresh joinable session. +`AmbientAgentViewModel::submit_cloud_followup` exists behind `HandoffCloudCloud` and drives `WaitingForSession { kind: Followup }` through success and error states. +`FollowupSessionReady` is emitted for fresh sessions and continues to attach through the existing hotswap path. +No tombstone Continue UI or terminal-input follow-up route is added in this PR. +Targeted tests and `cargo check -p warp --features handoff_cloud_cloud` pass.