Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions app/src/terminal/model/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,47 @@ impl BlockList {
);
}

pub(super) fn append_followup_shared_session_scrollback(
&mut self,
scrollback: &[SerializedBlock],
) {
self.set_bootstrapped();
let mut processor = Processor::new();

let Some((active_block, completed_blocks)) = scrollback.split_last() else {
return;
};

for block in completed_blocks {
if self.block_index_for_id(&block.id).is_some() {
continue;
}
if block.start_ts.is_some() && block.completed_ts.is_some() {
self.finish_active_block_before_followup_append();
self.restore_block(block, BootstrapStage::PostBootstrapPrecmd, &mut processor);
} else {
log::warn!("A non-active follow-up scrollback block was either not started or not completed");
}
}

if self.block_index_for_id(&active_block.id).is_none() {
debug_assert!(active_block.completed_ts.is_none());
self.finish_active_block_before_followup_append();
self.restore_block(
active_block,
BootstrapStage::PostBootstrapPrecmd,
&mut processor,
);
}
}

fn finish_active_block_before_followup_append(&mut self) {
if !self.active_block().finished() {
self.active_block_mut().finish(0);
self.update_active_block_height();
}
}

/// This is an important function in the block list lifecycle. After this
/// is called, there's an invariant where we always have an active block
/// that's hidden until it's `start`ed.
Expand Down
20 changes: 11 additions & 9 deletions app/src/terminal/model/terminal_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1461,23 +1461,25 @@ impl TerminalModel {
// TODO: we should be doing this in the constructor of the
// terminal model for the viewers so that we're guaranteed that
// loading scrollback is the first thing that we do.
pub fn load_shared_session_scrollback(
&mut self,
scrollback: &[SerializedBlock],
is_alt_screen_active: bool,
) {
pub fn load_shared_session_scrollback(&mut self, scrollback: &[SerializedBlock]) {
debug_assert!(self.shared_session_status().is_viewer());

self.block_list_mut()
.load_shared_session_scrollback(scrollback);
if is_alt_screen_active {
self.enter_alt_screen(true);
}

// The scrollback contains the prompt for the active block, and the terminal view needs to be notified to render it.
self.event_proxy.send_wakeup_event();
}

pub fn append_followup_shared_session_scrollback(&mut self, scrollback: &[SerializedBlock]) {
debug_assert!(self.shared_session_status().is_viewer());

self.block_list_mut()
.append_followup_shared_session_scrollback(scrollback);

self.event_proxy.send_wakeup_event();
}

pub fn obfuscate_secrets(&self) -> ObfuscateSecrets {
self.obfuscate_secrets
}
Expand Down Expand Up @@ -2016,7 +2018,7 @@ impl TerminalModel {
///
/// If the alternate screen is already active, this will not re-initialize
/// it.
fn enter_alt_screen(&mut self, save_cursor_and_clear_screen: bool) {
pub(crate) fn enter_alt_screen(&mut self, save_cursor_and_clear_screen: bool) {
if self.alt_screen_active {
log::info!("Tried to enter the alternate screen, but it was already active");
return;
Expand Down
5 changes: 3 additions & 2 deletions app/src/terminal/shared_session/mod_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ fn test_loading_scrollback() {
];
let channel_event_proxy = ChannelEventListener::new_for_test();
let mut model = terminal_model_for_viewer(channel_event_proxy);
model.load_shared_session_scrollback(scrollback_blocks, false);
model.load_shared_session_scrollback(scrollback_blocks);

// 4 blocks: first is the bootstrap block, the next two are completed scrollback blocks.
// The last is the active block, whose prompt came from the last scrollback.
Expand Down Expand Up @@ -309,7 +309,8 @@ fn test_loading_scrollback_in_alt_screen() {
];
let channel_event_proxy = ChannelEventListener::new_for_test();
let mut model = terminal_model_for_viewer(channel_event_proxy);
model.load_shared_session_scrollback(scrollback_blocks, true);
model.load_shared_session_scrollback(scrollback_blocks);
model.enter_alt_screen(true);

// 3 blocks: first is the bootstrap block, the second is the completed scrollback blocks.
// The last is the active block, whose prompt came from the last scrollback.
Expand Down
11 changes: 4 additions & 7 deletions app/src/terminal/shared_session/selections_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,10 @@ fn create_sharer_and_viewer_models_with_same_block(
let mut viewer_model = terminal_model_for_viewer(channel_event_proxy);
let block = sharer_model.block_list().last_non_hidden_block().unwrap();
let serialized_block = SerializedBlock::from(block);
viewer_model.load_shared_session_scrollback(
&[
serialized_block,
SerializedBlock::new_active_block_for_test(),
],
false,
);
viewer_model.load_shared_session_scrollback(&[
serialized_block,
SerializedBlock::new_active_block_for_test(),
]);

assert_eq!(
viewer_model
Expand Down
29 changes: 26 additions & 3 deletions app/src/terminal/shared_session/viewer/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ use std::collections::HashMap;
/// could indicate an issue.
const TOO_MANY_BUFFERED_EVENTS: usize = 50;

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SharedSessionInitialLoadMode {
/// Replace the viewer's placeholder block list with the scrollback snapshot from the session
/// being joined.
ReplaceFromSessionScrollback,
/// Add only the new blocks from a follow-up session while preserving the existing shared
/// ambient-agent transcript.
AppendFollowupScrollback,
}

/// The event loop is used to process a stream of events
/// originating from the sender.
pub struct EventLoop {
Expand Down Expand Up @@ -67,13 +77,26 @@ impl EventLoop {
window_size: WindowSize,
scrollback: Scrollback,
catching_up_to_event_no: Option<usize>,
load_mode: SharedSessionInitialLoadMode,
ctx: &mut ModelContext<Self>,
) -> Self {
let scrollback_blocks = decode_scrollback(&scrollback);
let is_alt_screen_active = scrollback.is_alt_screen_active;
terminal_model
.lock()
.load_shared_session_scrollback(scrollback_blocks.as_slice(), is_alt_screen_active);
{
let mut terminal_model = terminal_model.lock();
match load_mode {
SharedSessionInitialLoadMode::ReplaceFromSessionScrollback => {
terminal_model.load_shared_session_scrollback(scrollback_blocks.as_slice());
}
SharedSessionInitialLoadMode::AppendFollowupScrollback => {
terminal_model
.append_followup_shared_session_scrollback(scrollback_blocks.as_slice());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ [IMPORTANT] Append mode only changes scrollback loading; replay events from the follow-up session are still processed from event 0, so a session that replays the existing agent transcript can duplicate AI history. Preserve the load mode on EventLoop and skip replay events for follow-up joins.

}
}
if is_alt_screen_active {
terminal_model.enter_alt_screen(true);
}
}

// When we load scrollback, we might not actually complete a block (e.g. shared session started
// without any scrollback except active block). In this case, we want to make sure the input
Expand Down
114 changes: 111 additions & 3 deletions app/src/terminal/shared_session/viewer/event_loop_test.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::ai::blocklist::agent_view::AgentViewState;
use crate::terminal::model::block::SerializedBlock;
use crate::terminal::model::block::{BlockId, SerializedBlock};
use crate::terminal::shared_session::tests::terminal_model_for_viewer;
use crate::terminal::TerminalView;
use crate::terminal::{
event_listener::ChannelEventListener, shared_session::viewer::event_loop::EventLoop,
event_listener::ChannelEventListener,
shared_session::viewer::event_loop::{EventLoop, SharedSessionInitialLoadMode},
};
use crate::test_util::add_window_with_terminal;
use crate::test_util::terminal::initialize_app_for_terminal_view;
Expand Down Expand Up @@ -32,6 +33,25 @@ fn terminal_view(app: &mut App) -> ViewHandle<TerminalView> {
add_window_with_terminal(app, None)
}

fn completed_block(command: &str, output: &str) -> SerializedBlock {
let mut block =
SerializedBlock::new_for_test(command.as_bytes().into(), output.as_bytes().into());
block.id = BlockId::new();
block
}

fn active_block() -> SerializedBlock {
let mut block = SerializedBlock::new_active_block_for_test();
block.id = BlockId::new();
block
}

fn scrollback_block(block: &SerializedBlock) -> ScrollbackBlock {
ScrollbackBlock {
raw: serde_json::to_vec(block).unwrap(),
}
}

#[test]
fn test_terminal_model_is_correct() {
App::test((), |mut app| async move {
Expand All @@ -55,6 +75,7 @@ fn test_terminal_model_is_correct() {
is_alt_screen_active: false,
},
None,
SharedSessionInitialLoadMode::ReplaceFromSessionScrollback,
ctx,
)
});
Expand All @@ -74,7 +95,7 @@ fn test_terminal_model_is_correct() {
];
{
let mut model = model.lock();
model.load_shared_session_scrollback(scrollback, false);
model.load_shared_session_scrollback(scrollback);
// A hidden block, a completed scrollback block, then the active block.
assert_eq!(model.block_list().blocks().len(), 3);
assert_eq!(
Expand Down Expand Up @@ -116,6 +137,91 @@ fn test_terminal_model_is_correct() {
})
}

#[test]
fn test_append_followup_scrollback_skips_duplicates() {
App::test((), |mut app| async move {
let channel_event_proxy = ChannelEventListener::new_for_test();
let model = Arc::new(FairMutex::new(terminal_model_for_viewer(
channel_event_proxy.clone(),
)));

let terminal_view = terminal_view(&mut app);
let initial_completed = completed_block("initial-command", "initial-output");
let initial_active = active_block();
app.add_model(|ctx| {
EventLoop::new(
model.clone(),
terminal_view.downgrade(),
channel_event_proxy.clone(),
WindowSize {
num_rows: 0,
num_cols: 0,
},
Scrollback {
blocks: vec![
scrollback_block(&initial_completed),
scrollback_block(&initial_active),
],
is_alt_screen_active: false,
},
None,
SharedSessionInitialLoadMode::ReplaceFromSessionScrollback,
ctx,
)
});

assert_eq!(model.lock().block_list().blocks().len(), 3);

let followup_completed = completed_block("followup-command", "followup-output");
let followup_active = active_block();
app.add_model(|ctx| {
EventLoop::new(
model.clone(),
terminal_view.downgrade(),
channel_event_proxy.clone(),
WindowSize {
num_rows: 0,
num_cols: 0,
},
Scrollback {
blocks: vec![
scrollback_block(&initial_completed),
scrollback_block(&followup_completed),
scrollback_block(&followup_active),
],
is_alt_screen_active: false,
},
None,
SharedSessionInitialLoadMode::AppendFollowupScrollback,
ctx,
)
});

let model = model.lock();
let commands = model
.block_list()
.blocks()
.iter()
.map(|block| block.command_to_string())
.collect::<Vec<_>>();
assert_eq!(model.block_list().blocks().len(), 5);
assert_eq!(
commands
.iter()
.filter(|command| command.contains("initial-command"))
.count(),
1
);
assert_eq!(
commands
.iter()
.filter(|command| command.contains("followup-command"))
.count(),
1
);
})
}

#[test]
fn test_out_of_order_buffering() {
App::test((), |mut app| async move {
Expand All @@ -142,6 +248,7 @@ fn test_out_of_order_buffering() {
is_alt_screen_active: false,
},
None,
SharedSessionInitialLoadMode::ReplaceFromSessionScrollback,
ctx,
)
});
Expand Down Expand Up @@ -208,6 +315,7 @@ fn test_pty_bytes_buffered_before_command_execution_started() {
is_alt_screen_active: false,
},
None,
SharedSessionInitialLoadMode::ReplaceFromSessionScrollback,
ctx,
)
});
Expand Down
7 changes: 6 additions & 1 deletion app/src/terminal/shared_session/viewer/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::{
shared_session::{
connect_endpoint,
network::heartbeat::{Event as HeartbeatEvent, Heartbeat},
viewer::event_loop::EventLoop,
viewer::event_loop::{EventLoop, SharedSessionInitialLoadMode},
EventNumber, SELECTION_THROTTLE_PERIOD,
},
TerminalModel, TerminalView,
Expand Down Expand Up @@ -123,6 +123,7 @@ pub struct Network {

channel_event_proxy: ChannelEventListener,
terminal_model: Arc<FairMutex<TerminalModel>>,
initial_load_mode: SharedSessionInitialLoadMode,

stage: Stage,

Expand Down Expand Up @@ -158,6 +159,7 @@ impl Network {
terminal_view: WeakViewHandle<TerminalView>,
terminal_model: Arc<FairMutex<TerminalModel>>,
write_to_pty_events_rx: Receiver<Vec<u8>>,
initial_load_mode: SharedSessionInitialLoadMode,
ctx: &mut ModelContext<Self>,
) -> Self {
let (ws_proxy_tx, ws_proxy_rx) = async_channel::unbounded();
Expand All @@ -175,6 +177,7 @@ impl Network {
ws_proxy_rx: ws_proxy_rx.clone(),
channel_event_proxy,
terminal_model,
initial_load_mode,
terminal_view,
stage: Stage::BeforeJoined,
id: None,
Expand Down Expand Up @@ -237,6 +240,7 @@ impl Network {
ws_proxy_rx,
channel_event_proxy,
terminal_model,
initial_load_mode: SharedSessionInitialLoadMode::ReplaceFromSessionScrollback,
terminal_view,
stage: Stage::BeforeJoined,
id: Some(viewer_id.clone()),
Expand Down Expand Up @@ -555,6 +559,7 @@ impl Network {
window_size,
*scrollback,
latest_event_no,
self.initial_load_mode,
ctx,
)
});
Expand Down
Loading
Loading