Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 94 additions & 16 deletions app/src/ai/ambient_agents/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@

use std::{str::FromStr, sync::Arc, time::Duration};

use futures::{select, FutureExt, Stream};
use anyhow::anyhow;
use futures::{select, FutureExt, Stream, StreamExt};
use session_sharing_protocol::common::SessionId;

use super::AmbientAgentTaskId;
use super::{AmbientAgentTask, AmbientAgentTaskState};
use crate::{
server::server_api::ai::{AIClient, SpawnAgentRequest, TaskStatusMessage},
server::server_api::ai::{AIClient, RunFollowupRequest, SpawnAgentRequest, TaskStatusMessage},
terminal::shared_session,
};

Expand Down Expand Up @@ -80,6 +81,13 @@ pub enum AmbientAgentEvent {
AtCapacity,
}

enum RunPollMode {
InitialRun,
Followup {
previous_session_id: Option<SessionId>,
},
}

/// Spawns an ambient agent task and monitors its state.
///
/// The stream completes when:
Expand Down Expand Up @@ -113,26 +121,72 @@ pub fn spawn_task(
yield Ok(AmbientAgentEvent::AtCapacity);
}

let mut stream = Box::pin(poll_run_until_joinable_session(
task_id,
ai_client,
RunPollMode::InitialRun,
timeout,
));
while let Some(event) = stream.next().await {
yield event;
}
}
}

pub fn submit_run_followup(
message: String,
run_id: AmbientAgentTaskId,
previous_session_id: Option<SessionId>,
ai_client: Arc<dyn AIClient>,
timeout: Option<Duration>,
) -> impl Stream<Item = Result<AmbientAgentEvent, anyhow::Error>> {
async_stream::stream! {
let request = RunFollowupRequest { message };
if let Err(err) = ai_client.submit_run_followup(&run_id, request).await {
yield Err(err);
return;
}

let mut stream = Box::pin(poll_run_until_joinable_session(
run_id,
ai_client,
RunPollMode::Followup {
previous_session_id,
},
timeout,
));
while let Some(event) = stream.next().await {
yield event;
}
}
}

fn poll_run_until_joinable_session(
run_id: AmbientAgentTaskId,
ai_client: Arc<dyn AIClient>,
mode: RunPollMode,
timeout: Option<Duration>,
) -> impl Stream<Item = Result<AmbientAgentEvent, anyhow::Error>> {
async_stream::stream! {
// Poll for the task until it completes OR has session join info.
// We use a timeout to ensure we don't wait indefinitely for session info.
// If no timeout is provided, we use a future that never completes.
let mut timeout_timer = match timeout {
let mut timeout_timer = FutureExt::fuse(match timeout {
Some(d) => warpui::r#async::Timer::after(d),
None => warpui::r#async::Timer::never(),
}.fuse();
});
let mut last_state = None;
loop {
let mut poll_timer = warpui::r#async::Timer::after(TASK_STATUS_POLL_INTERVAL).fuse();
let mut poll_timer = FutureExt::fuse(warpui::r#async::Timer::after(TASK_STATUS_POLL_INTERVAL));

select! {
_ = timeout_timer => {
yield Ok(AmbientAgentEvent::TimedOut);
return;
}
_ = poll_timer => {
match ai_client.get_ambient_agent_task(&task_id).await {
match ai_client.get_ambient_agent_task(&run_id).await {
Ok(task) => {
// Only emit a state-change event if the state has changed.
if last_state.as_ref() != Some(&task.state) {
last_state = Some(task.state.clone());
yield Ok(AmbientAgentEvent::StateChanged {
Expand All @@ -141,23 +195,47 @@ pub fn spawn_task(
});
}

// Check if the task has completed or started sharing its session.
if task.state.is_terminal() {
// Task completed, stream ends.
if matches!(&mode, RunPollMode::Followup { .. }) {
let message = task
.status_message
.as_ref()
.map(|msg| msg.message.clone())
.unwrap_or_else(|| {
if task.state.is_failure_like() {
"Cloud agent failed".to_string()
} else {
"Cloud follow-up finished before a new session became available".to_string()
}
});
yield Err(anyhow!(message));
}
return;
}

if task.state == AmbientAgentTaskState::InProgress {
if let Some(session_join_info) = SessionJoinInfo::from_task(&task) {
yield Ok(AmbientAgentEvent::SessionStarted {
session_join_info,
});
return;
let has_new_session = match &mode {
RunPollMode::InitialRun
| RunPollMode::Followup {
previous_session_id: None,
} => true,
RunPollMode::Followup {
previous_session_id: Some(previous_session_id),
} => session_join_info
.session_id
.as_ref()
.is_some_and(|session_id| session_id != previous_session_id),
};
Comment on lines +218 to +229
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ [IMPORTANT] For follow-ups with no previous session ID, this accepts link-only join info; handle_ambient_agent_event ignores SessionStarted without a SessionId, so the follow-up stream ends while the UI stays in WaitingForSession. Require a parsed session ID for all follow-up readiness.

Suggested change
let has_new_session = match &mode {
RunPollMode::InitialRun
| RunPollMode::Followup {
previous_session_id: None,
} => true,
RunPollMode::Followup {
previous_session_id: Some(previous_session_id),
} => session_join_info
.session_id
.as_ref()
.is_some_and(|session_id| session_id != previous_session_id),
};
let has_new_session = match &mode {
RunPollMode::InitialRun => true,
RunPollMode::Followup {
previous_session_id: None,
} => session_join_info.session_id.is_some(),
RunPollMode::Followup {
previous_session_id: Some(previous_session_id),
} => session_join_info
.session_id
.as_ref()
.is_some_and(|session_id| session_id != previous_session_id),
};

if has_new_session {
yield Ok(AmbientAgentEvent::SessionStarted {
session_join_info,
});
return;
}
}
// Continue polling.
} else {
log::info!("Agent {task_id} state: {:?}", task.state);
// Continue polling.
log::info!("Agent {run_id} state: {:?}", task.state);
}
}
Err(err) => {
Expand Down
Loading
Loading