diff --git a/app/src/terminal/model/blocks.rs b/app/src/terminal/model/blocks.rs index a55862282..874f238c0 100644 --- a/app/src/terminal/model/blocks.rs +++ b/app/src/terminal/model/blocks.rs @@ -722,6 +722,47 @@ impl BlockList { ); } + pub(super) fn append_followup_shared_session_scrollback( + &mut self, + scrollback: &[SerializedBlock], + ) { + self.set_bootstrapped(); + let mut processor = Processor::new(); + + let Some((active_block, completed_blocks)) = scrollback.split_last() else { + return; + }; + + for block in completed_blocks { + if self.block_index_for_id(&block.id).is_some() { + continue; + } + if block.start_ts.is_some() && block.completed_ts.is_some() { + self.finish_active_block_before_followup_append(); + self.restore_block(block, BootstrapStage::PostBootstrapPrecmd, &mut processor); + } else { + log::warn!("A non-active follow-up scrollback block was either not started or not completed"); + } + } + + if self.block_index_for_id(&active_block.id).is_none() { + debug_assert!(active_block.completed_ts.is_none()); + self.finish_active_block_before_followup_append(); + self.restore_block( + active_block, + BootstrapStage::PostBootstrapPrecmd, + &mut processor, + ); + } + } + + fn finish_active_block_before_followup_append(&mut self) { + if !self.active_block().finished() { + self.active_block_mut().finish(0); + self.update_active_block_height(); + } + } + /// This is an important function in the block list lifecycle. After this /// is called, there's an invariant where we always have an active block /// that's hidden until it's `start`ed. diff --git a/app/src/terminal/model/terminal_model.rs b/app/src/terminal/model/terminal_model.rs index 875663393..433299c05 100644 --- a/app/src/terminal/model/terminal_model.rs +++ b/app/src/terminal/model/terminal_model.rs @@ -1461,23 +1461,25 @@ impl TerminalModel { // TODO: we should be doing this in the constructor of the // terminal model for the viewers so that we're guaranteed that // loading scrollback is the first thing that we do. - pub fn load_shared_session_scrollback( - &mut self, - scrollback: &[SerializedBlock], - is_alt_screen_active: bool, - ) { + pub fn load_shared_session_scrollback(&mut self, scrollback: &[SerializedBlock]) { debug_assert!(self.shared_session_status().is_viewer()); self.block_list_mut() .load_shared_session_scrollback(scrollback); - if is_alt_screen_active { - self.enter_alt_screen(true); - } // The scrollback contains the prompt for the active block, and the terminal view needs to be notified to render it. self.event_proxy.send_wakeup_event(); } + pub fn append_followup_shared_session_scrollback(&mut self, scrollback: &[SerializedBlock]) { + debug_assert!(self.shared_session_status().is_viewer()); + + self.block_list_mut() + .append_followup_shared_session_scrollback(scrollback); + + self.event_proxy.send_wakeup_event(); + } + pub fn obfuscate_secrets(&self) -> ObfuscateSecrets { self.obfuscate_secrets } @@ -2016,7 +2018,7 @@ impl TerminalModel { /// /// If the alternate screen is already active, this will not re-initialize /// it. - fn enter_alt_screen(&mut self, save_cursor_and_clear_screen: bool) { + pub(crate) fn enter_alt_screen(&mut self, save_cursor_and_clear_screen: bool) { if self.alt_screen_active { log::info!("Tried to enter the alternate screen, but it was already active"); return; diff --git a/app/src/terminal/shared_session/mod_test.rs b/app/src/terminal/shared_session/mod_test.rs index 260371af9..f00d7d997 100644 --- a/app/src/terminal/shared_session/mod_test.rs +++ b/app/src/terminal/shared_session/mod_test.rs @@ -244,7 +244,7 @@ fn test_loading_scrollback() { ]; let channel_event_proxy = ChannelEventListener::new_for_test(); let mut model = terminal_model_for_viewer(channel_event_proxy); - model.load_shared_session_scrollback(scrollback_blocks, false); + model.load_shared_session_scrollback(scrollback_blocks); // 4 blocks: first is the bootstrap block, the next two are completed scrollback blocks. // The last is the active block, whose prompt came from the last scrollback. @@ -309,7 +309,8 @@ fn test_loading_scrollback_in_alt_screen() { ]; let channel_event_proxy = ChannelEventListener::new_for_test(); let mut model = terminal_model_for_viewer(channel_event_proxy); - model.load_shared_session_scrollback(scrollback_blocks, true); + model.load_shared_session_scrollback(scrollback_blocks); + model.enter_alt_screen(true); // 3 blocks: first is the bootstrap block, the second is the completed scrollback blocks. // The last is the active block, whose prompt came from the last scrollback. diff --git a/app/src/terminal/shared_session/selections_test.rs b/app/src/terminal/shared_session/selections_test.rs index ed86bdb80..b7ceb75e8 100644 --- a/app/src/terminal/shared_session/selections_test.rs +++ b/app/src/terminal/shared_session/selections_test.rs @@ -43,13 +43,10 @@ fn create_sharer_and_viewer_models_with_same_block( let mut viewer_model = terminal_model_for_viewer(channel_event_proxy); let block = sharer_model.block_list().last_non_hidden_block().unwrap(); let serialized_block = SerializedBlock::from(block); - viewer_model.load_shared_session_scrollback( - &[ - serialized_block, - SerializedBlock::new_active_block_for_test(), - ], - false, - ); + viewer_model.load_shared_session_scrollback(&[ + serialized_block, + SerializedBlock::new_active_block_for_test(), + ]); assert_eq!( viewer_model diff --git a/app/src/terminal/shared_session/viewer/event_loop.rs b/app/src/terminal/shared_session/viewer/event_loop.rs index c4863b507..cfc09d79f 100644 --- a/app/src/terminal/shared_session/viewer/event_loop.rs +++ b/app/src/terminal/shared_session/viewer/event_loop.rs @@ -24,6 +24,16 @@ use std::collections::HashMap; /// could indicate an issue. const TOO_MANY_BUFFERED_EVENTS: usize = 50; +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum SharedSessionInitialLoadMode { + /// Replace the viewer's placeholder block list with the scrollback snapshot from the session + /// being joined. + ReplaceFromSessionScrollback, + /// Add only the new blocks from a follow-up session while preserving the existing shared + /// ambient-agent transcript. + AppendFollowupScrollback, +} + /// The event loop is used to process a stream of events /// originating from the sender. pub struct EventLoop { @@ -67,13 +77,26 @@ impl EventLoop { window_size: WindowSize, scrollback: Scrollback, catching_up_to_event_no: Option, + load_mode: SharedSessionInitialLoadMode, ctx: &mut ModelContext, ) -> Self { let scrollback_blocks = decode_scrollback(&scrollback); let is_alt_screen_active = scrollback.is_alt_screen_active; - terminal_model - .lock() - .load_shared_session_scrollback(scrollback_blocks.as_slice(), is_alt_screen_active); + { + let mut terminal_model = terminal_model.lock(); + match load_mode { + SharedSessionInitialLoadMode::ReplaceFromSessionScrollback => { + terminal_model.load_shared_session_scrollback(scrollback_blocks.as_slice()); + } + SharedSessionInitialLoadMode::AppendFollowupScrollback => { + terminal_model + .append_followup_shared_session_scrollback(scrollback_blocks.as_slice()); + } + } + if is_alt_screen_active { + terminal_model.enter_alt_screen(true); + } + } // When we load scrollback, we might not actually complete a block (e.g. shared session started // without any scrollback except active block). In this case, we want to make sure the input diff --git a/app/src/terminal/shared_session/viewer/event_loop_test.rs b/app/src/terminal/shared_session/viewer/event_loop_test.rs index 3ea07c523..e7afcac8d 100644 --- a/app/src/terminal/shared_session/viewer/event_loop_test.rs +++ b/app/src/terminal/shared_session/viewer/event_loop_test.rs @@ -1,9 +1,10 @@ use crate::ai::blocklist::agent_view::AgentViewState; -use crate::terminal::model::block::SerializedBlock; +use crate::terminal::model::block::{BlockId, SerializedBlock}; use crate::terminal::shared_session::tests::terminal_model_for_viewer; use crate::terminal::TerminalView; use crate::terminal::{ - event_listener::ChannelEventListener, shared_session::viewer::event_loop::EventLoop, + event_listener::ChannelEventListener, + shared_session::viewer::event_loop::{EventLoop, SharedSessionInitialLoadMode}, }; use crate::test_util::add_window_with_terminal; use crate::test_util::terminal::initialize_app_for_terminal_view; @@ -32,6 +33,25 @@ fn terminal_view(app: &mut App) -> ViewHandle { add_window_with_terminal(app, None) } +fn completed_block(command: &str, output: &str) -> SerializedBlock { + let mut block = + SerializedBlock::new_for_test(command.as_bytes().into(), output.as_bytes().into()); + block.id = BlockId::new(); + block +} + +fn active_block() -> SerializedBlock { + let mut block = SerializedBlock::new_active_block_for_test(); + block.id = BlockId::new(); + block +} + +fn scrollback_block(block: &SerializedBlock) -> ScrollbackBlock { + ScrollbackBlock { + raw: serde_json::to_vec(block).unwrap(), + } +} + #[test] fn test_terminal_model_is_correct() { App::test((), |mut app| async move { @@ -55,6 +75,7 @@ fn test_terminal_model_is_correct() { is_alt_screen_active: false, }, None, + SharedSessionInitialLoadMode::ReplaceFromSessionScrollback, ctx, ) }); @@ -74,7 +95,7 @@ fn test_terminal_model_is_correct() { ]; { let mut model = model.lock(); - model.load_shared_session_scrollback(scrollback, false); + model.load_shared_session_scrollback(scrollback); // A hidden block, a completed scrollback block, then the active block. assert_eq!(model.block_list().blocks().len(), 3); assert_eq!( @@ -116,6 +137,91 @@ fn test_terminal_model_is_correct() { }) } +#[test] +fn test_append_followup_scrollback_skips_duplicates() { + App::test((), |mut app| async move { + let channel_event_proxy = ChannelEventListener::new_for_test(); + let model = Arc::new(FairMutex::new(terminal_model_for_viewer( + channel_event_proxy.clone(), + ))); + + let terminal_view = terminal_view(&mut app); + let initial_completed = completed_block("initial-command", "initial-output"); + let initial_active = active_block(); + app.add_model(|ctx| { + EventLoop::new( + model.clone(), + terminal_view.downgrade(), + channel_event_proxy.clone(), + WindowSize { + num_rows: 0, + num_cols: 0, + }, + Scrollback { + blocks: vec![ + scrollback_block(&initial_completed), + scrollback_block(&initial_active), + ], + is_alt_screen_active: false, + }, + None, + SharedSessionInitialLoadMode::ReplaceFromSessionScrollback, + ctx, + ) + }); + + assert_eq!(model.lock().block_list().blocks().len(), 3); + + let followup_completed = completed_block("followup-command", "followup-output"); + let followup_active = active_block(); + app.add_model(|ctx| { + EventLoop::new( + model.clone(), + terminal_view.downgrade(), + channel_event_proxy.clone(), + WindowSize { + num_rows: 0, + num_cols: 0, + }, + Scrollback { + blocks: vec![ + scrollback_block(&initial_completed), + scrollback_block(&followup_completed), + scrollback_block(&followup_active), + ], + is_alt_screen_active: false, + }, + None, + SharedSessionInitialLoadMode::AppendFollowupScrollback, + ctx, + ) + }); + + let model = model.lock(); + let commands = model + .block_list() + .blocks() + .iter() + .map(|block| block.command_to_string()) + .collect::>(); + assert_eq!(model.block_list().blocks().len(), 5); + assert_eq!( + commands + .iter() + .filter(|command| command.contains("initial-command")) + .count(), + 1 + ); + assert_eq!( + commands + .iter() + .filter(|command| command.contains("followup-command")) + .count(), + 1 + ); + }) +} + #[test] fn test_out_of_order_buffering() { App::test((), |mut app| async move { @@ -142,6 +248,7 @@ fn test_out_of_order_buffering() { is_alt_screen_active: false, }, None, + SharedSessionInitialLoadMode::ReplaceFromSessionScrollback, ctx, ) }); @@ -208,6 +315,7 @@ fn test_pty_bytes_buffered_before_command_execution_started() { is_alt_screen_active: false, }, None, + SharedSessionInitialLoadMode::ReplaceFromSessionScrollback, ctx, ) }); diff --git a/app/src/terminal/shared_session/viewer/network.rs b/app/src/terminal/shared_session/viewer/network.rs index b8d999582..084b15856 100644 --- a/app/src/terminal/shared_session/viewer/network.rs +++ b/app/src/terminal/shared_session/viewer/network.rs @@ -51,7 +51,7 @@ use crate::{ shared_session::{ connect_endpoint, network::heartbeat::{Event as HeartbeatEvent, Heartbeat}, - viewer::event_loop::EventLoop, + viewer::event_loop::{EventLoop, SharedSessionInitialLoadMode}, EventNumber, SELECTION_THROTTLE_PERIOD, }, TerminalModel, TerminalView, @@ -123,6 +123,7 @@ pub struct Network { channel_event_proxy: ChannelEventListener, terminal_model: Arc>, + initial_load_mode: SharedSessionInitialLoadMode, stage: Stage, @@ -158,6 +159,7 @@ impl Network { terminal_view: WeakViewHandle, terminal_model: Arc>, write_to_pty_events_rx: Receiver>, + initial_load_mode: SharedSessionInitialLoadMode, ctx: &mut ModelContext, ) -> Self { let (ws_proxy_tx, ws_proxy_rx) = async_channel::unbounded(); @@ -175,6 +177,7 @@ impl Network { ws_proxy_rx: ws_proxy_rx.clone(), channel_event_proxy, terminal_model, + initial_load_mode, terminal_view, stage: Stage::BeforeJoined, id: None, @@ -237,6 +240,7 @@ impl Network { ws_proxy_rx, channel_event_proxy, terminal_model, + initial_load_mode: SharedSessionInitialLoadMode::ReplaceFromSessionScrollback, terminal_view, stage: Stage::BeforeJoined, id: Some(viewer_id.clone()), @@ -555,6 +559,7 @@ impl Network { window_size, *scrollback, latest_event_no, + self.initial_load_mode, ctx, ) }); diff --git a/app/src/terminal/shared_session/viewer/terminal_manager.rs b/app/src/terminal/shared_session/viewer/terminal_manager.rs index d0dab43d8..b3cc5bf63 100644 --- a/app/src/terminal/shared_session/viewer/terminal_manager.rs +++ b/app/src/terminal/shared_session/viewer/terminal_manager.rs @@ -14,7 +14,9 @@ use std::any::Any; use std::sync::Arc; -use warpui::{AppContext, ModelHandle, SingletonEntity, ViewHandle, WeakViewHandle, WindowId}; +use warpui::{ + AppContext, ModelContext, ModelHandle, SingletonEntity, ViewHandle, WeakViewHandle, WindowId, +}; use crate::ai::active_agent_views_model::ActiveAgentViewsModel; use crate::ai::agent::conversation::ConversationStatus; @@ -54,6 +56,7 @@ use crate::terminal::shared_session::shared_handlers::{ use crate::terminal::shared_session::SharedSessionStatus; use crate::terminal::terminal_manager::{compute_block_size, terminal_colors_list}; +use super::event_loop::SharedSessionInitialLoadMode; use super::network::{ agent_prompt_failure_reason_string, command_execution_failure_reason_string, control_action_failure_reason_string, session_ended_reason_string, @@ -67,15 +70,19 @@ use crate::view_components::ToastFlavor; use crate::{pane_group::TerminalViewResources, terminal::model::session::Sessions}; enum NetworkState { + /// No viewer network is attached yet; deferred cloud-mode viewers start here until the + /// follow-up shared session is created. + Idle, Active(ModelHandle), - PendingJoin { - prompt_type: ModelHandle, - channel_event_proxy: ChannelEventListener, - }, - /// Transient state while transitioning from PendingJoin to Active. + /// Transient state while connecting a viewer network. Connecting, } +struct NetworkResources { + prompt_type: ModelHandle, + channel_event_proxy: ChannelEventListener, +} + pub struct TerminalManager { model: Arc>, view: ViewHandle, @@ -87,18 +94,19 @@ pub struct TerminalManager { /// We hold onto this so that the broadcast channel isn't closed prematurely. _inactive_pty_reads_rx: InactiveReceiver>>, - /// The network state for the shared session viewer. When in `PendingJoin` state, - /// holds the resources needed to connect to a session. When in `Active` state, - /// holds the connected network model. + /// The network state for the shared session viewer. network_state: NetworkState, + network_resources: NetworkResources, + current_network: Arc>>>, + viewer_remote_update_guard: RemoteUpdateGuard, + outbound_handlers_registered: bool, } impl TerminalManager { - /// Send selected_conversation update from viewer. - fn send_selected_conversation_update_for_viewer( + fn send_selected_conversation_update_for_viewer_to_current_network( guard: &RemoteUpdateGuard, model: &Arc>, - network: &ModelHandle, + current_network: &Arc>>>, agent_view_controller: &ModelHandle, ai_context_model: &ModelHandle, ctx: &mut AppContext, @@ -109,15 +117,36 @@ impl TerminalManager { return; }; - Self::send_input_context_update(guard, model, network, update, ctx); + Self::send_input_context_update_to_current_network( + guard, + model, + current_network, + update, + ctx, + ); + } + + fn current_network( + current_network: &Arc>>>, + ) -> Option> { + current_network.lock().clone() + } + + fn update_current_network( + current_network: &Arc>>>, + ctx: &mut AppContext, + update: impl FnOnce(&mut Network, &mut ModelContext), + ) { + let Some(network) = Self::current_network(current_network) else { + return; + }; + network.update(ctx, update); } - /// Sends a `UniversalDeveloperInputContextUpdate` to the remote side, - /// gated on the `RemoteUpdateGuard` (echo-suppression) and Editor role. - fn send_input_context_update( + fn send_input_context_update_to_current_network( guard: &RemoteUpdateGuard, model: &Arc>, - network: &ModelHandle, + current_network: &Arc>>>, update: UniversalDeveloperInputContextUpdate, ctx: &mut AppContext, ) { @@ -127,7 +156,8 @@ impl TerminalManager { if !model.lock().shared_session_status().is_executor() { return; } - network.update(ctx, |network, _| { + + Self::update_current_network(current_network, ctx, |network, _| { network.send_universal_developer_input_context_update(update); }); } @@ -244,10 +274,14 @@ impl TerminalManager { _model_events: model_events, view, _inactive_pty_reads_rx: inactive_pty_reads_rx, - network_state: NetworkState::PendingJoin { + network_state: NetworkState::Idle, + network_resources: NetworkResources { prompt_type, channel_event_proxy, }, + current_network: Arc::new(FairMutex::new(None)), + viewer_remote_update_guard: RemoteUpdateGuard::new(), + outbound_handlers_registered: false, } } @@ -262,7 +296,11 @@ impl TerminalManager { let mut terminal_manager = Self::new_internal(resources, initial_size, window_id, false, ctx); - terminal_manager.connect_session(session_id, ctx); + terminal_manager.connect_session( + session_id, + SharedSessionInitialLoadMode::ReplaceFromSessionScrollback, + ctx, + ); terminal_manager } @@ -283,8 +321,12 @@ impl TerminalManager { /// Returns `true` if the connection was initiated, `false` if already connected. pub fn connect_to_session(&mut self, session_id: SessionId, ctx: &mut AppContext) -> bool { match self.network_state { - NetworkState::PendingJoin { .. } => { - self.connect_session(session_id, ctx); + NetworkState::Idle => { + self.connect_session( + session_id, + SharedSessionInitialLoadMode::ReplaceFromSessionScrollback, + ctx, + ); true } NetworkState::Connecting => { @@ -295,21 +337,54 @@ impl TerminalManager { } } + pub fn attach_followup_session(&mut self, session_id: SessionId, ctx: &mut AppContext) -> bool { + match std::mem::replace(&mut self.network_state, NetworkState::Connecting) { + NetworkState::Active(network) => { + network.update(ctx, |network, _| { + network.close_without_reconnection(); + }); + self.model + .lock() + .clear_write_to_pty_events_for_shared_session_tx(); + *self.current_network.lock() = None; + self.network_state = NetworkState::Idle; + } + NetworkState::Idle => { + self.network_state = NetworkState::Idle; + } + NetworkState::Connecting => { + self.network_state = NetworkState::Connecting; + log::warn!( + "attach_followup_session called while already connecting to shared session" + ); + return false; + } + } + + self.connect_session( + session_id, + SharedSessionInitialLoadMode::AppendFollowupScrollback, + ctx, + ); + true + } + /// Connects this terminal manager to a shared session. /// This method sets up the network model and all associated event handlers. - fn connect_session(&mut self, session_id: SessionId, ctx: &mut AppContext) { - let (prompt_type, channel_event_proxy) = - match std::mem::replace(&mut self.network_state, NetworkState::Connecting) { - NetworkState::PendingJoin { - prompt_type, - channel_event_proxy, - } => (prompt_type, channel_event_proxy), - other => { - self.network_state = other; - log::warn!("connect_session called on already-connected TerminalManager"); - return; - } - }; + fn connect_session( + &mut self, + session_id: SessionId, + initial_load_mode: SharedSessionInitialLoadMode, + ctx: &mut AppContext, + ) { + match std::mem::replace(&mut self.network_state, NetworkState::Connecting) { + NetworkState::Idle => {} + other => { + self.network_state = other; + log::warn!("connect_session called on already-connected TerminalManager"); + return; + } + } // Set up the channel for forwarding write-to-pty events over the network to the sharer. // Whenever the user writes to a long-running command (e.g. ctrl-c or typing), those bytes @@ -318,245 +393,256 @@ impl TerminalManager { self.model .lock() .set_write_to_pty_events_for_shared_session_tx(write_to_pty_events_tx); + self.model + .lock() + .set_shared_session_status(SharedSessionStatus::ViewPending); let network = ctx.add_model(|ctx| { Network::new( session_id, - channel_event_proxy, + self.network_resources.channel_event_proxy.clone(), self.view.downgrade(), self.model.clone(), write_to_pty_events_rx, + initial_load_mode, ctx, ) }); - - let viewer_remote_update_guard = RemoteUpdateGuard::new(); + *self.current_network.lock() = Some(network.clone()); Self::handle_network_events( &network, &self.view, self.model.clone(), - prompt_type, - viewer_remote_update_guard.clone(), - ctx, - ); - Self::handle_view_events( - network.clone(), - &self.view, - self.model.clone(), - viewer_remote_update_guard.clone(), + self.current_network.clone(), + self.network_resources.prompt_type.clone(), + self.viewer_remote_update_guard.clone(), ctx, ); - Self::handle_network_status_events(&self.view, network.clone(), ctx); - - // Send model selection updates during session sharing (if viewer has Editor role) - let network_for_models = network.clone(); - let terminal_view_id = self.view.id(); - let model_clone = self.model.clone(); - let model_remote_update_guard = viewer_remote_update_guard.clone(); - ctx.subscribe_to_model(&LLMPreferences::handle(ctx), move |_prefs, event, ctx| { - // Only react to agent mode LLM changes - if !matches!(event, LLMPreferencesEvent::UpdatedActiveAgentModeLLM) { - return; - } - - let llm_prefs = &LLMPreferences::as_ref(ctx); - let selected_model_id: String = llm_prefs - .get_active_base_model(ctx, Some(terminal_view_id)) - .id - .clone() - .into(); - - Self::send_input_context_update( - &model_remote_update_guard, - &model_clone, - &network_for_models, - UniversalDeveloperInputContextUpdate { - selected_model: Some(SelectedAgentModel::new(selected_model_id)), - ..Default::default() - }, + if !self.outbound_handlers_registered { + Self::handle_view_events( + self.current_network.clone(), + &self.view, + self.model.clone(), + self.viewer_remote_update_guard.clone(), ctx, ); - }); - - // Send input mode updates during session sharing (if viewer has Editor role). - // When AgentView is enabled, we only send updates when in an active agent view. - // For ambient agent sessions, input mode is controlled locally, so we skip sending updates. - let network_for_input_mode = network.clone(); - let model_clone_for_input = self.model.clone(); - let ai_input_model = self.view.as_ref(ctx).ai_input_model().clone(); - let weak_view_for_input_mode = self.view.downgrade(); - let input_mode_remote_update_guard = viewer_remote_update_guard.clone(); - ctx.subscribe_to_model(&ai_input_model, move |_, event, ctx| { - // In ambient agent sessions, input mode is controlled locally. - if model_clone_for_input - .lock() - .is_shared_ambient_agent_session() - { - return; - } - - // When AgentView is enabled, only send input mode updates when in an active agent view. - if FeatureFlag::AgentView.is_enabled() { - let Some(view) = weak_view_for_input_mode.upgrade(ctx) else { - return; - }; - let agent_view_controller = view.as_ref(ctx).agent_view_controller().clone(); - if !agent_view_controller.as_ref(ctx).is_active() { + Self::handle_network_status_events(&self.view, self.current_network.clone(), ctx); + + // Send model selection updates during session sharing (if viewer has Editor role) + let current_network_for_models = self.current_network.clone(); + let terminal_view_id = self.view.id(); + let model_clone = self.model.clone(); + let model_remote_update_guard = self.viewer_remote_update_guard.clone(); + ctx.subscribe_to_model(&LLMPreferences::handle(ctx), move |_prefs, event, ctx| { + // Only react to agent mode LLM changes + if !matches!(event, LLMPreferencesEvent::UpdatedActiveAgentModeLLM) { return; } - } - let config = event.updated_config(); + let llm_prefs = &LLMPreferences::as_ref(ctx); + let selected_model_id: String = llm_prefs + .get_active_base_model(ctx, Some(terminal_view_id)) + .id + .clone() + .into(); + + Self::send_input_context_update_to_current_network( + &model_remote_update_guard, + &model_clone, + ¤t_network_for_models, + UniversalDeveloperInputContextUpdate { + selected_model: Some(SelectedAgentModel::new(selected_model_id)), + ..Default::default() + }, + ctx, + ); + }); - Self::send_input_context_update( - &input_mode_remote_update_guard, - &model_clone_for_input, - &network_for_input_mode, - UniversalDeveloperInputContextUpdate { - input_mode: Some((*config).into()), - ..Default::default() - }, - ctx, - ); - }); + // Send input mode updates during session sharing (if viewer has Editor role). + // When AgentView is enabled, we only send updates when in an active agent view. + // For ambient agent sessions, input mode is controlled locally, so we skip sending updates. + let current_network_for_input_mode = self.current_network.clone(); + let model_clone_for_input = self.model.clone(); + let ai_input_model = self.view.as_ref(ctx).ai_input_model().clone(); + let weak_view_for_input_mode = self.view.downgrade(); + let input_mode_remote_update_guard = self.viewer_remote_update_guard.clone(); + ctx.subscribe_to_model(&ai_input_model, move |_, event, ctx| { + // In ambient agent sessions, input mode is controlled locally. + if model_clone_for_input + .lock() + .is_shared_ambient_agent_session() + { + return; + } - let agent_view_controller = self.view.as_ref(ctx).agent_view_controller().clone(); - let ai_context_model = self.view.as_ref(ctx).ai_context_model().clone(); - // Send selected conversation updates during session sharing (if viewer has Editor role) - if FeatureFlag::AgentView.is_enabled() { - // When agent view is enabled, we listen to the agent view controller - // as the authoritative source for which conversation is selected. - let network_for_conversation = network.clone(); - let model_for_conversation = self.model.clone(); - let ai_context_model_for_conversation = ai_context_model.clone(); - let conversation_remote_update_guard = viewer_remote_update_guard.clone(); - ctx.subscribe_to_model( - &agent_view_controller, - move |agent_view_controller, event, ctx| match event { - AgentViewControllerEvent::EnteredAgentView { .. } - | AgentViewControllerEvent::ExitedAgentView { .. } => { - Self::send_selected_conversation_update_for_viewer( - &conversation_remote_update_guard, - &model_for_conversation, - &network_for_conversation, - &agent_view_controller, - &ai_context_model_for_conversation, - ctx, - ); + // When AgentView is enabled, only send input mode updates when in an active agent view. + if FeatureFlag::AgentView.is_enabled() { + let Some(view) = weak_view_for_input_mode.upgrade(ctx) else { + return; + }; + let agent_view_controller = view.as_ref(ctx).agent_view_controller().clone(); + if !agent_view_controller.as_ref(ctx).is_active() { + return; } - AgentViewControllerEvent::ExitConfirmed { .. } => {} - }, - ); - } else { - // When agent view is disabled, we fallback to the legacy behavior - // of listening for pending query state changes to know which conversation is selected. - let network_for_conversation = network.clone(); - let model_for_conversation = self.model.clone(); - let agent_view_controller_for_conversation = agent_view_controller.clone(); - let conversation_remote_update_guard = viewer_remote_update_guard.clone(); - ctx.subscribe_to_model(&ai_context_model, move |ai_context_model, event, ctx| { - if !matches!(event, BlocklistAIContextEvent::PendingQueryStateUpdated) { - return; } - Self::send_selected_conversation_update_for_viewer( - &conversation_remote_update_guard, - &model_for_conversation, - &network_for_conversation, - &agent_view_controller_for_conversation, - &ai_context_model, + let config = event.updated_config(); + + Self::send_input_context_update_to_current_network( + &input_mode_remote_update_guard, + &model_clone_for_input, + ¤t_network_for_input_mode, + UniversalDeveloperInputContextUpdate { + input_mode: Some((*config).into()), + ..Default::default() + }, ctx, ); }); - } - // Send auto-approve updates during session sharing (if viewer has Editor role) - let network_for_auto = network.clone(); - let model_clone_for_auto = self.model.clone(); - let view_id_for_auto = self.view.id(); - let weak_view_for_auto = self.view.downgrade(); - let auto_approve_remote_update_guard = viewer_remote_update_guard.clone(); - ctx.subscribe_to_model( - &BlocklistAIHistoryModel::handle(ctx), - move |_, event, ctx| { - // We intentionally keep this as a full match so new variants - // are forced to be handled here - #[allow(clippy::single_match)] - match event { - BlocklistAIHistoryEvent::UpdatedAutoexecuteOverride { terminal_view_id } => { - if *terminal_view_id != view_id_for_auto { - return; + let agent_view_controller = self.view.as_ref(ctx).agent_view_controller().clone(); + let ai_context_model = self.view.as_ref(ctx).ai_context_model().clone(); + // Send selected conversation updates during session sharing (if viewer has Editor role) + if FeatureFlag::AgentView.is_enabled() { + // When agent view is enabled, we listen to the agent view controller + // as the authoritative source for which conversation is selected. + let current_network_for_conversation = self.current_network.clone(); + let model_for_conversation = self.model.clone(); + let ai_context_model_for_conversation = ai_context_model.clone(); + let conversation_remote_update_guard = self.viewer_remote_update_guard.clone(); + ctx.subscribe_to_model( + &agent_view_controller, + move |agent_view_controller, event, ctx| match event { + AgentViewControllerEvent::EnteredAgentView { .. } + | AgentViewControllerEvent::ExitedAgentView { .. } => { + Self::send_selected_conversation_update_for_viewer_to_current_network( + &conversation_remote_update_guard, + &model_for_conversation, + ¤t_network_for_conversation, + &agent_view_controller, + &ai_context_model_for_conversation, + ctx, + ); } - - let Some(view) = weak_view_for_auto.upgrade(ctx) else { - return; - }; - - let auto_approve = view - .as_ref(ctx) - .ai_context_model() - .as_ref(ctx) - .pending_query_autoexecute_override(ctx) - .is_autoexecute_any_action(); - Self::send_input_context_update( - &auto_approve_remote_update_guard, - &model_clone_for_auto, - &network_for_auto, - UniversalDeveloperInputContextUpdate { - auto_approve_agent_actions: Some(auto_approve), - ..Default::default() - }, - ctx, - ); + AgentViewControllerEvent::ExitConfirmed { .. } => {} + }, + ); + } else { + // When agent view is disabled, we fallback to the legacy behavior + // of listening for pending query state changes to know which conversation is selected. + let current_network_for_conversation = self.current_network.clone(); + let model_for_conversation = self.model.clone(); + let agent_view_controller_for_conversation = agent_view_controller.clone(); + let conversation_remote_update_guard = self.viewer_remote_update_guard.clone(); + ctx.subscribe_to_model(&ai_context_model, move |ai_context_model, event, ctx| { + if !matches!(event, BlocklistAIContextEvent::PendingQueryStateUpdated) { + return; } - _ => {} - } - }, - ); - // Broadcast CLI agent rich input open/close changes from viewer back to sharer. - let network_for_cli = network.clone(); - let model_for_cli = self.model.clone(); - let view_id_for_cli = self.view.id(); - let cli_remote_update_guard = viewer_remote_update_guard.clone(); - ctx.subscribe_to_model(&CLIAgentSessionsModel::handle(ctx), move |_, event, ctx| { - let CLIAgentSessionsModelEvent::InputSessionChanged { - terminal_view_id, - new_input_state, - .. - } = event - else { - return; - }; - if *terminal_view_id != view_id_for_cli || !cli_remote_update_guard.should_broadcast() { - return; + Self::send_selected_conversation_update_for_viewer_to_current_network( + &conversation_remote_update_guard, + &model_for_conversation, + ¤t_network_for_conversation, + &agent_view_controller_for_conversation, + &ai_context_model, + ctx, + ); + }); } - let cli_agent_session = { - let sessions_model = CLIAgentSessionsModel::as_ref(ctx); - match sessions_model.session(view_id_for_cli) { - Some(session) => CLIAgentSessionState::Active { - cli_agent: session.agent.to_serialized_name(), - is_rich_input_open: matches!( - new_input_state, - CLIAgentInputState::Open { .. } - ), - }, - None => CLIAgentSessionState::Inactive, - } - }; - Self::send_input_context_update( - &cli_remote_update_guard, - &model_for_cli, - &network_for_cli, - UniversalDeveloperInputContextUpdate { - cli_agent_session: Some(cli_agent_session), - ..Default::default() + + // Send auto-approve updates during session sharing (if viewer has Editor role) + let current_network_for_auto = self.current_network.clone(); + let model_clone_for_auto = self.model.clone(); + let view_id_for_auto = self.view.id(); + let weak_view_for_auto = self.view.downgrade(); + let auto_approve_remote_update_guard = self.viewer_remote_update_guard.clone(); + ctx.subscribe_to_model( + &BlocklistAIHistoryModel::handle(ctx), + move |_, event, ctx| { + // We intentionally keep this as a full match so new variants + // are forced to be handled here + #[allow(clippy::single_match)] + match event { + BlocklistAIHistoryEvent::UpdatedAutoexecuteOverride { + terminal_view_id, + } => { + if *terminal_view_id != view_id_for_auto { + return; + } + + let Some(view) = weak_view_for_auto.upgrade(ctx) else { + return; + }; + + let auto_approve = view + .as_ref(ctx) + .ai_context_model() + .as_ref(ctx) + .pending_query_autoexecute_override(ctx) + .is_autoexecute_any_action(); + Self::send_input_context_update_to_current_network( + &auto_approve_remote_update_guard, + &model_clone_for_auto, + ¤t_network_for_auto, + UniversalDeveloperInputContextUpdate { + auto_approve_agent_actions: Some(auto_approve), + ..Default::default() + }, + ctx, + ); + } + _ => {} + } }, - ctx, ); - }); + // Broadcast CLI agent rich input open/close changes from viewer back to sharer. + let current_network_for_cli = self.current_network.clone(); + let model_for_cli = self.model.clone(); + let view_id_for_cli = self.view.id(); + let cli_remote_update_guard = self.viewer_remote_update_guard.clone(); + ctx.subscribe_to_model(&CLIAgentSessionsModel::handle(ctx), move |_, event, ctx| { + let CLIAgentSessionsModelEvent::InputSessionChanged { + terminal_view_id, + new_input_state, + .. + } = event + else { + return; + }; + if *terminal_view_id != view_id_for_cli + || !cli_remote_update_guard.should_broadcast() + { + return; + } + let cli_agent_session = { + let sessions_model = CLIAgentSessionsModel::as_ref(ctx); + match sessions_model.session(view_id_for_cli) { + Some(session) => CLIAgentSessionState::Active { + cli_agent: session.agent.to_serialized_name(), + is_rich_input_open: matches!( + new_input_state, + CLIAgentInputState::Open { .. } + ), + }, + None => CLIAgentSessionState::Inactive, + } + }; + Self::send_input_context_update_to_current_network( + &cli_remote_update_guard, + &model_for_cli, + ¤t_network_for_cli, + UniversalDeveloperInputContextUpdate { + cli_agent_session: Some(cli_agent_session), + ..Default::default() + }, + ctx, + ); + }); + + self.outbound_handlers_registered = true; + } self.network_state = NetworkState::Active(network); } @@ -564,6 +650,7 @@ impl TerminalManager { network: &ModelHandle, view: &ViewHandle, model: Arc>, + current_network: Arc>>>, prompt_type: ModelHandle, viewer_remote_update_guard: RemoteUpdateGuard, ctx: &mut AppContext, @@ -678,8 +765,18 @@ impl TerminalManager { let Some(view) = weak_view_handle.upgrade(ctx) else { return; }; - let is_cloud_mode = model.lock().is_shared_ambient_agent_session(); - Self::shared_session_ended(&view, model.clone(), ctx); + let is_ambient_agent = model.lock().is_shared_ambient_agent_session(); + if is_ambient_agent { + Self::ambient_session_ended( + &view, + model.clone(), + ¤t_network, + &network, + ctx, + ); + } else { + Self::shared_session_ended(&view, model.clone(), ctx); + } view.update(ctx, |terminal_view, ctx| { let reason_string = session_ended_reason_string(reason); match reason { @@ -692,7 +789,7 @@ impl TerminalManager { ctx, ); } - SessionEndedReason::InternalServerError if is_cloud_mode => { + SessionEndedReason::InternalServerError if is_ambient_agent => { // Don't show toast for cloud mode sessions - the error message // "ask sharer to reshare" doesn't apply. } @@ -1221,7 +1318,7 @@ impl TerminalManager { } fn handle_view_events( - network: ModelHandle, + current_network: Arc>>>, view: &ViewHandle, model: Arc>, viewer_remote_update_guard: RemoteUpdateGuard, @@ -1232,17 +1329,17 @@ impl TerminalManager { let selection = view.read(ctx, |view, ctx| { view.get_shared_session_presence_selection(ctx) }); - network.update(ctx, |network, _| { + Self::update_current_network(¤t_network, ctx, |network, _| { network.send_presence_selection_if_changed(selection); }); } TerminalViewEvent::RequestSharedSessionRole(role) => { - network.update(ctx, |network, _| { + Self::update_current_network(¤t_network, ctx, |network, _| { network.send_role_request(*role); }); } TerminalViewEvent::CancelRoleRequest(role_request_id) => { - network.update(ctx, |network, _| { + Self::update_current_network(¤t_network, ctx, |network, _| { network.send_cancel_role_request(role_request_id.clone()); }); } @@ -1258,7 +1355,7 @@ impl TerminalManager { // Only send input updates if the viewer is an executor if model.lock().shared_session_status().is_executor() { - network.update(ctx, |network, _| { + Self::update_current_network(¤t_network, ctx, |network, _| { network.send_input_update(block_id, operations.iter()); }); } @@ -1281,13 +1378,13 @@ impl TerminalManager { // Only send command execution request if the viewer is an executor. if model.lock().shared_session_status().is_executor() { - network.update(ctx, |network, _| { + Self::update_current_network(¤t_network, ctx, |network, _| { network.send_command_execution_request(block_id, command.to_owned()); }); } } TerminalViewEvent::RejoinCurrentSession => { - network.update(ctx, |network, ctx| { + Self::update_current_network(¤t_network, ctx, |network, ctx| { network.reauthenticate_viewer(ctx); }); } @@ -1296,7 +1393,7 @@ impl TerminalManager { prompt, attachments, } => { - network.update(ctx, |network, _| { + Self::update_current_network(¤t_network, ctx, |network, _| { network.send_agent_prompt_request( *server_conversation_token, prompt.clone(), @@ -1307,20 +1404,20 @@ impl TerminalManager { TerminalViewEvent::CancelSharedSessionConversation { server_conversation_token, } => { - network.update(ctx, |network, _| { + Self::update_current_network(¤t_network, ctx, |network, _| { network.send_cancel_control_action(*server_conversation_token); }); } TerminalViewEvent::ReportViewerTerminalSize { window_size } => { - network.update(ctx, |network, _| { + Self::update_current_network(¤t_network, ctx, |network, _| { network.send_report_terminal_size(*window_size); }); } TerminalViewEvent::LongRunningCommandAgentInteractionStateChanged { state } => { - Self::send_input_context_update( + Self::send_input_context_update_to_current_network( &viewer_remote_update_guard, &model, - &network, + ¤t_network, UniversalDeveloperInputContextUpdate { long_running_command_agent_interaction_state: Some(*state), ..Default::default() @@ -1329,37 +1426,37 @@ impl TerminalManager { ); } TerminalViewEvent::UpdateSessionLinkPermissions { role } => { - network.update(ctx, |network, _| { + Self::update_current_network(¤t_network, ctx, |network, _| { network.send_link_permission_update(*role); }); } TerminalViewEvent::UpdateSessionTeamPermissions { role, team_uid } => { - network.update(ctx, |network, _| { + Self::update_current_network(¤t_network, ctx, |network, _| { network.send_team_permission_update(*role, team_uid.clone()); }); } TerminalViewEvent::AddGuests { emails, role } => { - network.update(ctx, |network, _| { + Self::update_current_network(¤t_network, ctx, |network, _| { network.send_add_guests(emails.clone(), *role); }); } TerminalViewEvent::RemoveGuest { user_uid } => { - network.update(ctx, |network, _| { + Self::update_current_network(¤t_network, ctx, |network, _| { network.send_remove_guest(*user_uid); }); } TerminalViewEvent::RemovePendingGuest { email } => { - network.update(ctx, |network, _| { + Self::update_current_network(¤t_network, ctx, |network, _| { network.send_remove_pending_guest(email.clone()); }); } TerminalViewEvent::UpdateUserRole { user_uid, role } => { - network.update(ctx, |network, _| { + Self::update_current_network(¤t_network, ctx, |network, _| { network.send_user_role_update(*user_uid, *role); }); } TerminalViewEvent::UpdatePendingUserRole { email, role } => { - network.update(ctx, |network, _| { + Self::update_current_network(¤t_network, ctx, |network, _| { network.send_pending_user_role_update(email.clone(), *role); }); } @@ -1369,7 +1466,7 @@ impl TerminalManager { fn handle_network_status_events( view: &ViewHandle, - network: ModelHandle, + current_network: Arc>>>, ctx: &mut AppContext, ) { let weak_view_handle = view.downgrade(); @@ -1382,7 +1479,9 @@ impl TerminalManager { let NetworkStatusEvent::NetworkStatusChanged { new_status } = event; match new_status { NetworkStatusKind::Online => { - if network.as_ref(ctx).is_connected() { + if Self::current_network(¤t_network) + .is_some_and(|network| network.as_ref(ctx).is_connected()) + { view.update(ctx, |view, ctx| { view.on_shared_session_reconnection_status_changed(false, ctx) }); @@ -1437,6 +1536,29 @@ impl TerminalManager { .lock() .clear_write_to_pty_events_for_shared_session_tx(); } + + fn ambient_session_ended( + terminal_view: &ViewHandle, + model: Arc>, + current_network: &Arc>>>, + ended_network: &ModelHandle, + ctx: &mut AppContext, + ) { + Manager::handle(ctx).update(ctx, |manager, _| { + manager.left_share(terminal_view.id()); + }); + + model + .lock() + .clear_write_to_pty_events_for_shared_session_tx(); + + let ended_session_id = ended_network.as_ref(ctx).session_id(); + if Self::current_network(current_network) + .is_some_and(|network| network.as_ref(ctx).session_id() == ended_session_id) + { + *current_network.lock() = None; + } + } } impl crate::terminal::TerminalManager for TerminalManager { diff --git a/app/src/terminal/view.rs b/app/src/terminal/view.rs index e68ea0bbd..606f20fb8 100644 --- a/app/src/terminal/view.rs +++ b/app/src/terminal/view.rs @@ -54,7 +54,8 @@ use crate::ai::blocklist::agent_view::{ agent_view_bg_fill, AgentViewController, AgentViewControllerEvent, AgentViewDisplayMode, AgentViewEntryBlockParams, AgentViewEntryOrigin, AgentViewHeaderDisabledTheme, AgentViewHeaderTheme, AgentViewZeroStateBlock, AgentViewZeroStateEvent, EphemeralMessageModel, - ExitConfirmationTrigger, InlineAgentViewHeader, ENTER_OR_EXIT_CONFIRMATION_WINDOW, + ExitAgentViewError, ExitConfirmationTrigger, InlineAgentViewHeader, + ENTER_OR_EXIT_CONFIRMATION_WINDOW, }; use crate::ai::conversation_utils; use crate::ai::predict::prompt_suggestions::{ @@ -4492,6 +4493,24 @@ impl TerminalView { callback(self, ctx); } + fn can_exit_agent_view_for_terminal_view( + &self, + ctx: &AppContext, + ) -> Result<(), ExitAgentViewError> { + match self.agent_view_controller.as_ref(ctx).can_exit_agent_view() { + Err(ExitAgentViewError::LongRunningCommand) + if self.can_pop_nested_cloud_agent_view(ctx) => + { + Ok(()) + } + result => result, + } + } + + fn can_pop_nested_cloud_agent_view(&self, ctx: &AppContext) -> bool { + self.is_ambient_agent_session(ctx) && self.is_nested_cloud_mode(ctx) + } + /// Exits the active agent, either: /// * Exiting agent view for the selected conversation /// * Popping the current view off the navigation stack (for cloud mode agents) @@ -10067,9 +10086,7 @@ impl TerminalView { /// the user can exit agent mode, and shows a tooltip explaining when exiting is blocked. fn update_agent_view_back_button_state(&mut self, ctx: &mut ViewContext) { let disabled_reason = self - .agent_view_controller - .as_ref(ctx) - .can_exit_agent_view() + .can_exit_agent_view_for_terminal_view(ctx) .err() .map(|e| e.to_string()); @@ -19889,22 +19906,19 @@ impl TerminalView { && self.agent_view_controller.as_ref(ctx).is_active() { // Disable escape completely for ambient agents without a parent terminal. - if self - .agent_view_controller - .as_ref(ctx) - .can_exit_agent_view() - .is_err() - { + if self.can_exit_agent_view_for_terminal_view(ctx).is_err() { return; } - if !self + let is_long_running = self .model .lock() .block_list() .active_block() - .is_active_and_long_running() - { + .is_active_and_long_running(); + if is_long_running && self.can_pop_nested_cloud_agent_view(ctx) { + self.exit_agent_view(ctx); + } else if !is_long_running { // During first-time setup, always exit directly without confirmation // since the setup overlay would obscure any confirmation dialog. let is_in_setup = self @@ -25420,8 +25434,10 @@ impl TypedActionView for TerminalView { ctx.notify(); } ExitAgentView => { - self.exit_agent_view(ctx); - ctx.notify(); + if self.can_exit_agent_view_for_terminal_view(ctx).is_ok() { + self.exit_agent_view(ctx); + ctx.notify(); + } } EnterCloudAgentView => { let mut draft_text = self.input.as_ref(ctx).buffer_text(ctx); diff --git a/app/src/terminal/view/ambient_agent/mod.rs b/app/src/terminal/view/ambient_agent/mod.rs index 498cc7b59..6d97d361d 100644 --- a/app/src/terminal/view/ambient_agent/mod.rs +++ b/app/src/terminal/view/ambient_agent/mod.rs @@ -74,14 +74,33 @@ pub fn create_cloud_mode_view( }; terminal_manager.update(ctx, |_, ctx| { ctx.subscribe_to_model(&view_model, move |manager, event, ctx| { - if let AmbientAgentViewModelEvent::SessionReady { session_id } = event { - if let Some(manager) = manager - .as_any_mut() - .downcast_mut::() - { + let Some(manager) = manager + .as_any_mut() + .downcast_mut::() + else { + return; + }; + match event { + AmbientAgentViewModelEvent::SessionReady { session_id } => { manager.connect_to_session(*session_id, ctx); } - }; + AmbientAgentViewModelEvent::FollowupSessionReady { session_id } => { + manager.attach_followup_session(*session_id, ctx); + } + AmbientAgentViewModelEvent::EnteredSetupState + | AmbientAgentViewModelEvent::EnteredComposingState + | AmbientAgentViewModelEvent::DispatchedAgent + | AmbientAgentViewModelEvent::ProgressUpdated + | AmbientAgentViewModelEvent::EnvironmentSelected + | AmbientAgentViewModelEvent::Failed { .. } + | AmbientAgentViewModelEvent::ShowCloudAgentCapacityModal + | AmbientAgentViewModelEvent::ShowAICreditModal + | AmbientAgentViewModelEvent::NeedsGithubAuth + | AmbientAgentViewModelEvent::Cancelled + | AmbientAgentViewModelEvent::HarnessSelected + | AmbientAgentViewModelEvent::HarnessCommandStarted + | AmbientAgentViewModelEvent::UpdatedSetupCommandVisibility => {} + } }); }); diff --git a/app/src/terminal/view/ambient_agent/model.rs b/app/src/terminal/view/ambient_agent/model.rs index 1b03dc344..978e08eda 100644 --- a/app/src/terminal/view/ambient_agent/model.rs +++ b/app/src/terminal/view/ambient_agent/model.rs @@ -427,6 +427,14 @@ impl AmbientAgentViewModel { ); } + /// Attach the view model to the shared session created for a follow-up prompt and notify the + /// terminal manager to append that session's scrollback to the existing transcript. + pub fn attach_followup_session(&mut self, session_id: SessionId, ctx: &mut ModelContext) { + self.stop_progress_timer(); + self.status = Status::AgentRunning; + ctx.emit(AmbientAgentViewModelEvent::FollowupSessionReady { session_id }); + } + pub fn status(&self) -> &Status { &self.status } @@ -653,8 +661,13 @@ impl AmbientAgentViewModel { if let Some(session_id) = session_join_info.session_id { me.stop_progress_timer(); + let event = if matches!(me.status, Status::AgentRunning) { + AmbientAgentViewModelEvent::FollowupSessionReady { session_id } + } else { + AmbientAgentViewModelEvent::SessionReady { session_id } + }; me.status = Status::AgentRunning; - ctx.emit(AmbientAgentViewModelEvent::SessionReady { session_id }); + ctx.emit(event); } } AmbientAgentEvent::AtCapacity => { @@ -907,6 +920,10 @@ pub enum AmbientAgentViewModelEvent { SessionReady { session_id: SessionId, }, + /// A follow-up execution has started sharing a fresh session. + FollowupSessionReady { + session_id: SessionId, + }, /// An environment was selected. EnvironmentSelected, /// The ambient agent failed. diff --git a/app/src/terminal/view/ambient_agent/view_impl.rs b/app/src/terminal/view/ambient_agent/view_impl.rs index fb9f4aadf..84bceced7 100644 --- a/app/src/terminal/view/ambient_agent/view_impl.rs +++ b/app/src/terminal/view/ambient_agent/view_impl.rs @@ -185,7 +185,8 @@ impl TerminalView { // Re-render to show loading state. ctx.notify(); } - AmbientAgentViewModelEvent::SessionReady { .. } => { + AmbientAgentViewModelEvent::SessionReady { .. } + | AmbientAgentViewModelEvent::FollowupSessionReady { .. } => { // Auto-open details panel for local cloud mode once the session is ready. self.maybe_auto_open_cloud_mode_details_panel(ctx); // Re-render to hide the loading screen now that the session is ready. diff --git a/app/src/terminal/view_test.rs b/app/src/terminal/view_test.rs index 846213208..73047e3d6 100644 --- a/app/src/terminal/view_test.rs +++ b/app/src/terminal/view_test.rs @@ -1,8 +1,11 @@ +use std::any::Any; use std::cell::RefCell; use std::pin::pin; use std::rc::Rc; +use std::sync::Arc; use crate::ai::agent::conversation::ConversationStatus; +use parking_lot::FairMutex; use warp_terminal::model::escape_sequences::{BRACKETED_PASTE_END, BRACKETED_PASTE_START}; use warpui::{ notification::UserNotification, platform::WindowStyle, Presenter, WindowInvalidation, @@ -10,10 +13,11 @@ use warpui::{ use crate::ai::agent::task::TaskId; use crate::ai::blocklist::block::cli_controller::UserTakeOverReason; -use warpui::App; +use warpui::{App, ReadModel}; use crate::pane_group::focus_state::PaneGroupFocusState; -use crate::pane_group::{BackingView, TerminalPaneId}; +use crate::pane_group::{pane::PaneStack, BackingView, TerminalPaneId}; +use crate::settings::import::model::ImportedConfigModel; use crate::terminal::model::grid::Dimensions as _; use crate::{ terminal::alt_screen::should_intercept_mouse, @@ -50,7 +54,7 @@ use crate::terminal::model::ansi::{BootstrappedValue, PreexecValue}; use crate::terminal::model::blocks::{insert_block, TotalIndex}; use crate::terminal::model::terminal_model::WithinBlock; -use crate::terminal::MockTerminalManager; +use crate::terminal::{MockTerminalManager, TerminalManager, TerminalModel}; use crate::test_util::terminal::initialize_app_for_terminal_view; use crate::test_util::{add_window_with_terminal, assert_eventually}; @@ -64,6 +68,29 @@ fn add_window_with_cloud_mode_terminal(app: &mut App) -> ViewHandle>, + view: ViewHandle, +} + +impl TerminalManager for TestTerminalManager { + fn model(&self) -> Arc> { + self.model.clone() + } + + fn view(&self) -> ViewHandle { + self.view.clone() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + /// Test to verify that blocks created through normal execution /// have the correct local status set #[test] @@ -338,38 +365,101 @@ fn command_first_word_and_suffix_handles_alias_without_args() { } #[test] -fn root_cloud_mode_pane_sets_root_cloud_mode_context_key() { - use std::any::Any; - use std::sync::Arc; +fn escape_pops_nested_cloud_agent_view_with_long_running_command() { + App::test((), |mut app| async move { + initialize_app_for_terminal_view(&mut app); + let _agent_view = FeatureFlag::AgentView.override_enabled(true); + let _cloud_mode = FeatureFlag::CloudMode.override_enabled(true); - use parking_lot::FairMutex; + let parent_terminal = add_window_with_terminal(&mut app, None); + let cloud_terminal = add_window_with_cloud_mode_terminal(&mut app); - use crate::pane_group::pane::PaneStack; - use crate::settings::import::model::ImportedConfigModel; - use crate::terminal::{TerminalManager, TerminalModel}; + let parent_view = parent_terminal.clone(); + let cloud_view = cloud_terminal.clone(); + let parent_model = parent_terminal.read(&app, |view, _| view.model.clone()); + let cloud_model = cloud_terminal.read(&app, |view, _| view.model.clone()); + let pane_stack = app.update(move |ctx| { + let parent_manager = ctx.add_model(|_| { + let manager: Box = Box::new(TestTerminalManager { + model: parent_model, + view: parent_view.clone(), + }); + manager + }); + let cloud_manager = ctx.add_model(|_| { + let manager: Box = Box::new(TestTerminalManager { + model: cloud_model, + view: cloud_view.clone(), + }); + manager + }); + let pane_stack = ctx.add_model(|ctx| PaneStack::new(parent_manager, parent_view, ctx)); + pane_stack.update(ctx, |stack, ctx| { + stack.push(cloud_manager, cloud_view, ctx); + }); + pane_stack + }); - struct TestTerminalManager { - model: Arc>, - view: ViewHandle, - } + cloud_terminal.update(&mut app, |view, ctx| { + view.enter_agent_view_for_new_conversation(None, AgentViewEntryOrigin::CloudAgent, ctx); + view.model + .lock() + .simulate_long_running_block("sleep 10", "running"); - impl TerminalManager for TestTerminalManager { - fn model(&self) -> Arc> { - self.model.clone() - } + assert!(view.can_pop_nested_cloud_agent_view(ctx)); + assert_eq!(view.can_exit_agent_view_for_terminal_view(ctx), Ok(())); + }); - fn view(&self) -> ViewHandle { - self.view.clone() - } + assert_eq!( + app.read_model(&pane_stack, |stack, _| stack.active_view().id()), + cloud_terminal.id() + ); - fn as_any(&self) -> &dyn Any { - self - } + cloud_terminal.update(&mut app, |view, ctx| { + view.handle_input_event(&InputEvent::Escape, ctx); + }); - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } - } + assert_eq!( + app.read_model(&pane_stack, |stack, _| stack.active_view().id()), + parent_terminal.id() + ); + }) +} + +#[test] +fn escape_does_not_exit_local_agent_view_with_long_running_command() { + App::test((), |mut app| async move { + initialize_app_for_terminal_view(&mut app); + let _agent_view = FeatureFlag::AgentView.override_enabled(true); + + let terminal = add_window_with_terminal(&mut app, None); + + terminal.update(&mut app, |view, ctx| { + view.enter_agent_view_for_new_conversation( + None, + AgentViewEntryOrigin::Input { + was_prompt_autodetected: false, + }, + ctx, + ); + view.model + .lock() + .simulate_long_running_block("sleep 10", "running"); + + assert!(matches!( + view.can_exit_agent_view_for_terminal_view(ctx), + Err(ExitAgentViewError::LongRunningCommand) + )); + + view.handle_input_event(&InputEvent::Escape, ctx); + + assert!(view.agent_view_controller().as_ref(ctx).is_active()); + }); + }) +} + +#[test] +fn root_cloud_mode_pane_sets_root_cloud_mode_context_key() { App::test((), |mut app| async move { initialize_app_for_terminal_view(&mut app); app.add_singleton_model(ImportedConfigModel::new); diff --git a/specs/REMOTE-1478/TECH.md b/specs/REMOTE-1478/TECH.md new file mode 100644 index 000000000..71466185f --- /dev/null +++ b/specs/REMOTE-1478/TECH.md @@ -0,0 +1,110 @@ +# REMOTE-1478: Follow-up session attach for ambient agent conversations +## Context +Ambient cloud-agent views currently use the session-sharing viewer stack as a one-session transport. The initial cloud-mode view is created as a deferred shared-session viewer, and `create_cloud_mode_view` subscribes to `AmbientAgentViewModelEvent::SessionReady` to call `viewer::TerminalManager::connect_to_session` once in `app/src/terminal/view/ambient_agent/mod.rs (28-72)`. +That one-shot shape is encoded in `viewer::TerminalManager`. `NetworkState::PendingJoin` owns `prompt_type` and `channel_event_proxy`; `connect_session` consumes those values, creates a `Network`, wires subscriptions, and transitions to `NetworkState::Active`. After that, `connect_to_session` returns `false` for `Active`, so the same manager cannot attach to a second session ID in `app/src/terminal/shared_session/viewer/terminal_manager.rs (73-292)`. +When the shared session ends, the viewer path treats the view as permanently finished. `Network::process_websocket_message` handles `DownstreamMessage::SessionEnded` by calling `close_without_reconnection` and emitting `NetworkEvent::SessionEnded` in `app/src/terminal/shared_session/viewer/network.rs (597-600)`. The terminal manager then calls `shared_session_ended`, which cancels in-progress conversations, unregisters from the shared-session manager, calls `TerminalView::on_session_share_ended`, sets `SharedSessionStatus::FinishedViewer`, and clears the write-to-PTY sender in `app/src/terminal/shared_session/viewer/terminal_manager.rs (635-684)` and `app/src/terminal/shared_session/viewer/terminal_manager.rs (1397-1425)`. +`TerminalView::on_session_share_ended` is a UI teardown routine, not a resumable boundary. It may insert the conversation-ended tombstone, clears `shared_session`, unregisters remote peers, flips viewer input to selectable/read-only, and updates pane sharing state in `app/src/terminal/view/shared_session/view_impl.rs (683-735)`. +The viewer network is scoped to a single session ID and websocket endpoint. Same-session reconnect is handled internally by `Network::reconnect_websocket`, but this work does not need new same-session reconnect behavior. Follow-up executions should join a new session with a fresh `Network` and `InitPayload.last_received_event_no: None`, while preserving the existing `TerminalView`, `TerminalModel`, ambient view model, and AI history. +The first join path loads a session snapshot through `EventLoop::new`. It decodes the `scrollback` from `JoinedSuccessfully` and calls `TerminalModel::load_shared_session_scrollback` before processing ordered terminal events in `app/src/terminal/shared_session/viewer/event_loop.rs (68-131)`. That model method restores serialized blocks into the current blocklist but has no explicit follow-up mode or duplicate-block policy in `app/src/terminal/model/terminal_model.rs (1443-1455)` and `app/src/terminal/model/blocks.rs (729-757)`. +The session-sharing server already supports joining arbitrary session IDs as fresh sessions. A fresh viewer gets `JoinedSuccessfully` with that session's scrollback, active prompt, source type, and latest event number from `/Users/zachbai/dev/session-sharing-server/server/src/sessions/manager/join.rs (190-259)` and `/Users/zachbai/dev/session-sharing-server/server/src/sessions/manager/join.rs (546-641)`. The missing client-side contract is how much of a follow-up session's scrollback is new versus rehydrated from the previous VM. +## Implemented changes +The implementation introduces a fresh-session attach path for ambient follow-up executions. This is not a same-session reconnect feature. A follow-up always means: +- the previous session has ended; +- the caller has a new `SessionId`; +- the ambient agent run ID is unchanged; +- the existing ambient terminal view/model should remain the user-visible conversation; +- a new viewer `Network` should join the new session; +- the new session's contents should append to the existing blocklist. +### Rework viewer manager connection ownership +The one-shot `NetworkState` shape was replaced with state that keeps reusable viewer resources outside the per-session network: +- `NetworkResources { prompt_type, channel_event_proxy }` +- `current_network: Arc>>>` +- `NetworkState::Idle | Connecting | Active(ModelHandle)` +`new_internal` now stores `prompt_type` and `channel_event_proxy` on the manager instead of hiding them inside a pending-join state. Initial viewers call the same internal `connect_session` helper used by deferred cloud-mode viewers with `SharedSessionInitialLoadMode::ReplaceFromSessionScrollback`. +The public attach API is intentionally narrow: +- `connect_to_session(session_id, ctx)` for the initial deferred attach; +- `attach_followup_session(session_id, ctx)` for ambient follow-up attaches. +For follow-up attaches, `attach_followup_session`: +- close/drop the old `current_network` if present; +- install a fresh write-to-PTY channel on the terminal model; +- set shared-session status back to `ViewPending`; +- create a new `Network` for the new session ID; +- wire inbound network events for that specific network; +- update `current_network`. +### Avoid per-session outbound subscription leaks +Outbound subscriptions that previously captured a concrete `network` handle now register once and route through `current_network`: view events, LLM preference changes, input mode changes, selected conversation changes, auto-approve changes, CLI agent input changes, and network status changes. +This one-time route matches the stable-view/stable-manager model: +- view/model subscriptions live for the manager lifetime; +- each callback asks for the current active network; +- if no current network is active, it no-ops; +- inbound subscriptions remain per-network because each `Network` emits events independently. +This also makes N follow-up executions behave like a sequence of network replacements rather than a growing chain of listeners. +### Make ambient session end resumable +Permanent viewer teardown is split from ambient execution-ended handling. Normal shared-session viewers still keep today's behavior: ended banner, read-only input, finished viewer state, and no future attach path. +For shared ambient agent sessions, `SessionEnded` is treated as a resumable execution boundary. `ambient_session_ended` unregisters the live shared-session transport from `Manager`, clears the write-to-PTY sender, and clears `current_network` if it still points at the ended session. It does not call `TerminalView::on_session_share_ended`, does not set `SharedSessionStatus::FinishedViewer`, and does not cancel the ambient conversation, so a later `attach_followup_session` can reuse the same view/model. +### Add ambient follow-up attach event +The ambient view model emits `SessionReady { session_id }` for the initial session and `FollowupSessionReady { session_id }` for later fresh session IDs. +The subscription in `create_cloud_mode_view` dispatches initial sessions to `connect_to_session` and follow-up sessions to `attach_followup_session`. +The “continue” trigger and server API for creating the follow-up execution can land separately, but the session-sharing infra should expose a narrow API that only needs a new `SessionId`. +### Add append-aware scrollback loading +The viewer event loop accepts a `SharedSessionInitialLoadMode`: +- `ReplaceFromSessionScrollback` for initial joins; +- `AppendFollowupScrollback` for ambient follow-up joins. +For initial joins, preserve the current call to `load_shared_session_scrollback`. +For follow-up joins, `EventLoop::new` calls the model/blocklist append path: +`append_followup_shared_session_scrollback(scrollback, is_alt_screen_active)`. +That method: +- finish any previous active block that cannot continue receiving output from the old session; +- mark the model bootstrapped/view-pending for the new session; +- skip scrollback blocks already present in the current blocklist; +- append only new blocks from the follow-up session; +- preserve block ID to block index mappings without duplicates; +- restore the new session's active block as the live block for subsequent ordered terminal events; +- send the same wakeup/refresh signals the current initial load path sends. +The dedupe contract should be explicit. The preferred contract is that the follow-up VM preserves `SerializedBlock.id` for rehydrated prior blocks. Then client-side dedupe is deterministic: skip incoming scrollback blocks whose IDs are already present, and append from the first unknown block onward. If the follow-up VM cannot preserve block IDs, the handoff/session producer needs to provide continuation-only scrollback or a join-payload continuation marker. Without one of those contracts, the client cannot reliably distinguish “old output replayed in a new session” from “new output that happens to look identical.” +### Keep conversation identity stable +`SessionSourceType::AmbientAgent { task_id }` currently drives `AmbientAgentViewModel::enter_viewing_existing_session` and `ActiveAgentViewsModel::register_ambient_session` in `app/src/terminal/shared_session/viewer/terminal_manager.rs (624-666)`. Follow-up executions are guaranteed to keep the same run ID, so the ambient view should continue to use the existing task/run identity across session attachments. +The new session ID is only a new transport for the same ambient run, not a new active conversation. `ActiveAgentViewsModel` continues pointing the same terminal view at the same ambient task ID, and follow-up attach updates session transport state without changing the user-visible conversation identity. +## End-to-end flow +1. User starts an ambient cloud conversation. +2. `AmbientAgentViewModel` emits initial `SessionReady`. +3. `viewer::TerminalManager` attaches the first session with initial load mode. +4. The cloud VM finishes; the session-sharing server sends `SessionEnded`. +5. The viewer manager records an ambient execution-ended state and removes the active network without permanently poisoning the view. +6. User clicks “continue” and sends a follow-up prompt. +7. The follow-up orchestration creates a new VM/session and returns a new `SessionId`. +8. `AmbientAgentViewModel` emits a follow-up session-ready event. +9. The existing terminal manager creates a fresh `Network` for the new session ID. +10. The new event loop loads follow-up scrollback in append mode, skipping already-known blocks. +11. New ordered terminal events stream into the same blocklist and ambient view. +12. Steps 4-11 can repeat for N follow-up executions. +## Risks and mitigations +The highest-risk area is scrollback dedupe. Mitigate by making the rehydration contract explicit before implementation: either preserve block IDs, send continuation-only scrollback, or add a continuation marker. Do not rely on byte/string comparisons of terminal output. +Replacing networks without cleaning subscriptions can leak old networks and duplicate outbound messages. Mitigate by routing all outbound subscriptions through one stable current-network handle and testing repeated follow-ups. +Treating ambient `SessionEnded` as resumable may regress normal shared-session viewers if the paths are not cleanly separated. Mitigate with an explicit ambient-only branch keyed off `TerminalModel::is_shared_ambient_agent_session()` or the attach kind, plus regression coverage for non-ambient viewers. +Conversation identity should be stable because follow-up executions reuse the same run ID. The main risk is accidentally treating the new session ID as a new conversation identity; mitigate by keeping `ActiveAgentViewsModel` registration tied to the existing ambient task/run and treating session IDs as transport state only. +## Testing and validation +Targeted event-loop/blocklist coverage was added for: +- follow-up load skips scrollback blocks whose IDs already exist; +- follow-up load appends new blocks in order; +- duplicate block IDs do not corrupt `block_id_to_block_index`. +Additional follow-up work should add direct `viewer::TerminalManager` coverage for: +- initial deferred attach creates one active network; +- ambient `SessionEnded` transitions to resumable ended state; +- follow-up attach replaces the old network with a new session ID; +- repeated follow-up attach does not duplicate outbound sends; +- non-ambient `SessionEnded` still produces finished/read-only viewer behavior. +Additional integration coverage should exercise the ambient flow: +- start cloud mode and join an initial ambient session; +- simulate session ended; +- attach a second session ID into the same terminal view; +- assert old output remains and new output appears after it; +- repeat with a third session ID; +- assert the same ambient view remains visible and active throughout. +Manual validation should exercise a real cloud-mode conversation once the follow-up API exists: start a cloud agent, wait for the VM/session to end, click continue, confirm a new VM starts, and verify the same ambient view appends new output instead of opening a new transcript or replacing prior blocks. +## Parallelization +The work can split across three mostly independent tracks: +- viewer manager lifecycle: reusable resources, current-network replacement, ambient-ended state, and outbound subscription routing; +- blocklist/event-loop append mode: dedupe contract, append loader, and event-loop load mode tests; +- ambient follow-up trigger: server/API integration for “continue,” model event wiring, and stable conversation identity. +The final integration point is the ambient model handing a fresh follow-up `SessionId` to the viewer manager attach API.