diff --git a/app/src/ai/agent_sdk/driver/harness/codex.rs b/app/src/ai/agent_sdk/driver/harness/codex.rs index 37ccc1da6..a6d0234a0 100644 --- a/app/src/ai/agent_sdk/driver/harness/codex.rs +++ b/app/src/ai/agent_sdk/driver/harness/codex.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::fs; -use std::path::Path; -use std::sync::Arc; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, OnceLock}; use anyhow::{Context, Result}; use async_trait::async_trait; @@ -9,19 +9,25 @@ use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use tempfile::NamedTempFile; +use uuid::Uuid; use warp_cli::agent::Harness; use warp_managed_secrets::ManagedSecretValue; -use warpui::{ModelHandle, ModelSpawner}; +use warpui::{ModelHandle, ModelSpawner, SingletonEntity}; use crate::ai::agent::conversation::AIConversationId; use crate::ai::ambient_agents::AmbientAgentTaskId; -use crate::server::server_api::harness_support::HarnessSupportClient; +use crate::server::server_api::harness_support::{upload_to_target, HarnessSupportClient}; use crate::server::server_api::ServerApi; +use crate::terminal::cli_agent_sessions::CLIAgentSessionsModel; use crate::terminal::model::block::BlockId; use crate::terminal::CLIAgent; use super::super::terminal::{CommandHandle, TerminalDriver}; use super::super::{AgentDriver, AgentDriverError}; +use super::claude_transcript::read_jsonl; +use super::codex_transcript::{ + codex_sessions_root, find_session_file, parse_session_meta, CodexTranscriptEnvelope, +}; use super::json_utils::read_json_file_or_default; use super::{write_temp_file, HarnessRunner, ResumePayload, SavePoint, ThirdPartyHarness}; @@ -108,6 +114,13 @@ struct CodexHarnessRunner { client: Arc, terminal_driver: ModelHandle, state: Mutex, + /// Codex session UUID. Populated lazily by [`HarnessRunner::handle_session_update`] + /// once the codex hooks emit `SessionStart`. Set once (using `OnceLock`). 
+    session_id: OnceLock<Uuid>,
+    /// Path to the codex session rollout JSONL file. Populated by the first
+    /// successful [`find_session_file`] walk so that subsequent saves skip the YYYY/MM/DD
+    /// directory walk and read the JSONL file directly.
+    transcript_path: OnceLock<PathBuf>,
 }
 
 impl CodexHarnessRunner {
@@ -128,8 +141,28 @@ impl CodexHarnessRunner {
             client,
             terminal_driver,
             state: Mutex::new(CodexRunnerState::Preexec),
+            session_id: OnceLock::new(),
+            transcript_path: OnceLock::new(),
         })
     }
+
+    /// Rollout file path for this session; `None` if the session id is unknown or no file is found.
+    /// The directory walk runs via `spawn_blocking`; its first successful result is cached.
+    async fn resolve_transcript_path(&self) -> Option<PathBuf> {
+        if let Some(cached) = self.transcript_path.get() {
+            return Some(cached.clone());
+        }
+        let session_id = self.session_id.get().copied()?;
+        let resolved = tokio::task::spawn_blocking(move || -> Option<PathBuf> {
+            let root = codex_sessions_root().ok()?;
+            find_session_file(&root, session_id)
+        })
+        .await
+        .ok()
+        .flatten()?;
+        let _ = self.transcript_path.set(resolved.clone());
+        Some(resolved)
+    }
 }
 
 #[cfg_attr(not(target_family = "wasm"), async_trait)]
@@ -179,6 +212,38 @@ impl HarnessRunner for CodexHarnessRunner {
             .map_err(|_| anyhow::anyhow!("Agent driver dropped while sending /exit"))
     }
 
+    /// Capture the codex session ID from the `SessionStart` event picked up by the `CLIAgentSessionsModel`.
+    ///
+    /// Relies on codex hooks being set up to emit this event correctly.
+    async fn handle_session_update(&self, foreground: &ModelSpawner<AgentDriver>) -> Result<()> {
+        if self.session_id.get().is_some() {
+            return Ok(());
+        }
+        let terminal_driver = self.terminal_driver.clone();
+        let session_id_str = foreground
+            .spawn(move |_, ctx| {
+                let terminal_view_id = terminal_driver.as_ref(ctx).terminal_view().id();
+                CLIAgentSessionsModel::handle(ctx)
+                    .as_ref(ctx)
+                    .session(terminal_view_id)
+                    .and_then(|s| s.session_context.session_id.clone())
+            })
+            .await
+            .ok()
+            .flatten();
+        let Some(session_id_str) = session_id_str else {
+            return Ok(());
+        };
+        match Uuid::parse_str(&session_id_str) {
+            Ok(uuid) => {
+                log::info!("Captured codex session id {uuid}");
+                let _ = self.session_id.set(uuid);
+            }
+            Err(e) => log::warn!("Failed to parse codex session id '{session_id_str}': {e}"),
+        }
+        Ok(())
+    }
+
     async fn save_conversation(
         &self,
         save_point: SavePoint,
@@ -202,18 +267,71 @@ impl HarnessRunner for CodexHarnessRunner {
             } => (*conversation_id, block_id.clone()),
         };
 
-        // TODO(REMOTE-1504) Also save the conversation transcript.
-        super::upload_current_block_snapshot(
-            foreground,
-            &self.terminal_driver,
-            self.client.as_ref(),
-            conversation_id,
-            block_id,
-        )
-        .await
+        let session_id = self.session_id.get().copied();
+        let rollout_path = self.resolve_transcript_path().await;
+        let client = self.client.as_ref();
+
+        let is_final = matches!(save_point, SavePoint::Final);
+        futures::try_join!(
+            super::upload_current_block_snapshot(
+                foreground,
+                &self.terminal_driver,
+                client,
+                conversation_id,
+                block_id,
+            ),
+            upload_transcript(client, conversation_id, session_id, rollout_path, is_final),
+        )?;
+        Ok(())
     }
 }
 
+/// Upload the codex session transcript to the server. No-ops if the session UUID hasn't
+/// been captured yet or no rollout file is on disk yet.
+async fn upload_transcript(
+    client: &dyn HarnessSupportClient,
+    conversation_id: AIConversationId,
+    session_id: Option<Uuid>,
+    transcript_path: Option<PathBuf>,
+    is_final: bool,
+) -> Result<()> {
+    let Some(session_id) = session_id else {
+        if is_final {
+            log::warn!(
+                "Codex session id still unknown at final save; transcript was never uploaded"
+            );
+        } else {
+            log::debug!("Codex session id not yet known; skipping transcript upload");
+        }
+        return Ok(());
+    };
+    let Some(transcript_path) = transcript_path else {
+        if is_final {
+            log::warn!("No codex rollout file found at final save for session {session_id}; transcript was never uploaded");
+        } else {
+            log::debug!("No codex rollout file yet for session {session_id}");
+        }
+        return Ok(());
+    };
+    log::info!("Uploading codex transcript to conversation {conversation_id}");
+
+    let body = tokio::task::spawn_blocking(move || -> Result<Vec<u8>> {
+        let entries = read_jsonl(&transcript_path)?;
+        let metadata = parse_session_meta(entries.first()).unwrap_or_default();
+        let envelope = CodexTranscriptEnvelope::new(session_id, metadata, entries);
+        serde_json::to_vec(&envelope).context("Failed to serialize codex transcript")
+    })
+    .await
+    .context("Codex transcript read task panicked or was cancelled")??;
+
+    let target = client
+        .get_transcript_upload_target(&conversation_id)
+        .await
+        .with_context(|| format!("Failed to get transcript upload target for {conversation_id}"))?;
+    upload_to_target(client.http_client(), &target, body).await?;
+    Ok(())
+}
+
 const CODEX_CONFIG_DIR: &str = ".codex";
 const CODEX_AGENTS_OVERRIDE_FILE_NAME: &str = "AGENTS.override.md";
 const CODEX_AUTH_FILE_NAME: &str = "auth.json";
diff --git a/app/src/ai/agent_sdk/driver/harness/codex_transcript.rs b/app/src/ai/agent_sdk/driver/harness/codex_transcript.rs
new file mode 100644
index 000000000..98c1c8d7f
--- /dev/null
+++ b/app/src/ai/agent_sdk/driver/harness/codex_transcript.rs
@@ -0,0 +1,126 @@
+//! Codex session transcript envelope + read helpers.
+//!
+//! Owns:
+//! - [`CodexTranscriptEnvelope`] — the on-wire/on-GCS shape of a saved Codex rollout
+//!   (parsed JSONL entries plus session-level metadata). Reader functions interoperate
+//!   with Codex's own `~/.codex/sessions/YYYY/MM/DD/rollout-<timestamp>-<uuid>.jsonl` layout
+//!   (codex `rollout/src/recorder.rs`).
+use std::fs;
+use std::path::{Path, PathBuf};
+
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use uuid::Uuid;
+
+/// Env var codex honors to override `~/.codex` (see codex `core/src/config/mod.rs`).
+const CODEX_HOME_ENV: &str = "CODEX_HOME";
+const CODEX_HOME_DIRNAME: &str = ".codex";
+/// Subdirectory under `$CODEX_HOME` where rollouts live.
+const CODEX_SESSIONS_SUBDIR: &str = "sessions";
+
+/// JSON envelope sent to the server representing a complete Codex session.
+///
+/// The transcript is the parsed JSONL content of the rollout file; codex's resume
+/// path re-reads this JSONL line by line.
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
+pub(crate) struct CodexTranscriptEnvelope {
+    /// The directory the codex session started in (recovered from the `SessionMeta` line).
+    pub(crate) cwd: PathBuf,
+    /// Codex session/thread UUID. Matches the trailing `-<uuid>` in the rollout filename.
+    pub(crate) session_id: Uuid,
+    /// `cli_version` from `SessionMeta`, surfaced separately for the server.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub(crate) codex_version: Option<String>,
+    /// Parsed JSONL entries.
+    pub(crate) entries: Vec<Value>,
+}
+
+impl CodexTranscriptEnvelope {
+    /// Build an envelope from the session id, the `SessionMeta`-derived fields, and the entries.
+    pub(crate) fn new(session_id: Uuid, meta: CodexSessionMetadata, entries: Vec<Value>) -> Self {
+        Self {
+            cwd: meta.cwd,
+            session_id,
+            codex_version: meta.codex_version,
+            entries,
+        }
+    }
+}
+
+/// Session-level metadata pulled from the rollout's `SessionMeta` line.
+#[derive(Clone, Debug, Default, PartialEq)]
+pub(crate) struct CodexSessionMetadata {
+    pub(crate) cwd: PathBuf,
+    pub(crate) codex_version: Option<String>,
+}
+
+/// Resolve the codex sessions root: a non-empty `$CODEX_HOME`, else `~/.codex`.
+pub(crate) fn codex_sessions_root() -> anyhow::Result<PathBuf> {
+    let home = if let Some(dir) = std::env::var(CODEX_HOME_ENV).ok().filter(|d| !d.is_empty()) {
+        PathBuf::from(dir)
+    } else {
+        dirs::home_dir()
+            .ok_or_else(|| anyhow::anyhow!("could not determine home directory"))?
+            .join(CODEX_HOME_DIRNAME)
+    };
+    Ok(home.join(CODEX_SESSIONS_SUBDIR))
+}
+
+/// Walk `<sessions_root>/YYYY/MM/DD/` looking for a `rollout-*-<session_id>.jsonl`.
+///
+/// Returns `None` if `sessions_root` doesn't exist yet or no matching file is found.
+pub(crate) fn find_session_file(sessions_root: &Path, session_id: Uuid) -> Option<PathBuf> {
+    // A missing or unreadable root makes `read_subdirs` yield nothing, so the walk returns `None`.
+    let suffix = format!("-{session_id}.jsonl");
+    for year_dir in read_subdirs(sessions_root) {
+        for month_dir in read_subdirs(&year_dir) {
+            for day_dir in read_subdirs(&month_dir) {
+                // Unreadable day directories are skipped rather than aborting the whole search.
+                let Ok(entries) = fs::read_dir(&day_dir) else {
+                    continue;
+                };
+                for entry in entries.flatten() {
+                    let path = entry.path();
+                    let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
+                        continue;
+                    };
+                    if name.starts_with("rollout-") && name.ends_with(&suffix) {
+                        return Some(path);
+                    }
+                }
+            }
+        }
+    }
+    None
+}
+
+/// Yield the paths of `parent`'s immediate subdirectories; an unreadable `parent` or entry
+/// contributes nothing instead of an error.
+fn read_subdirs(parent: &Path) -> impl Iterator<Item = PathBuf> {
+    fs::read_dir(parent)
+        .into_iter()
+        .flatten()
+        .filter_map(|entry| {
+            let entry = entry.ok()?;
+            entry.file_type().ok()?.is_dir().then(|| entry.path())
+        })
+}
+
+/// Pull `cwd` and `cli_version` out of the first JSONL line if it's a `SessionMeta`.
+pub(crate) fn parse_session_meta(first: Option<&Value>) -> Option<CodexSessionMetadata> {
+    let entry = first?;
+    if entry.get("type").and_then(|v| v.as_str()) != Some("session_meta") {
+        return None;
+    }
+    let payload = entry.get("payload")?;
+    let cwd = PathBuf::from(payload.get("cwd").and_then(|v| v.as_str())?);
+    let codex_version = payload
+        .get("cli_version")
+        .and_then(|v| v.as_str())
+        .map(str::to_owned);
+    Some(CodexSessionMetadata { cwd, codex_version })
+}
+
+#[cfg(test)]
+#[path = "codex_transcript_tests.rs"]
+mod tests;
diff --git a/app/src/ai/agent_sdk/driver/harness/codex_transcript_tests.rs b/app/src/ai/agent_sdk/driver/harness/codex_transcript_tests.rs
new file mode 100644
index 000000000..1725e6d52
--- /dev/null
+++ b/app/src/ai/agent_sdk/driver/harness/codex_transcript_tests.rs
@@ -0,0 +1,112 @@
+use std::fs;
+use std::path::Path;
+
+use anyhow::Result;
+use tempfile::TempDir;
+use uuid::Uuid;
+
+use super::super::claude_transcript::read_jsonl;
+use super::*;
+
+/// Walk `sessions_root` for `session_id`'s rollout and assemble an envelope.
+fn read_envelope(
+    session_id: Uuid,
+    sessions_root: &Path,
+) -> Result<Option<CodexTranscriptEnvelope>> {
+    let Some(path) = find_session_file(sessions_root, session_id) else {
+        return Ok(None);
+    };
+    let entries = read_jsonl(&path)?;
+    let meta = parse_session_meta(entries.first()).unwrap_or_default();
+    Ok(Some(CodexTranscriptEnvelope::new(
+        session_id, meta, entries,
+    )))
+}
+
+/// Minimal SessionMeta line in the same shape codex writes (codex
+/// `protocol/src/protocol.rs::RolloutItem`): `{type, payload}`.
+fn session_meta_line(uuid: Uuid, cwd: &str, timestamp: &str, cli_version: &str) -> String { + serde_json::json!({ + "type": "session_meta", + "payload": { + "id": uuid.to_string(), + "timestamp": timestamp, + "cwd": cwd, + "originator": "test", + "cli_version": cli_version, + }, + }) + .to_string() +} + +#[test] +#[serial_test::serial] +fn codex_sessions_root_honors_codex_home_env() { + let tmp = TempDir::new().unwrap(); + let prev = std::env::var(CODEX_HOME_ENV).ok(); + std::env::set_var(CODEX_HOME_ENV, tmp.path()); + + let root = codex_sessions_root().unwrap(); + + match prev { + Some(v) => std::env::set_var(CODEX_HOME_ENV, v), + None => std::env::remove_var(CODEX_HOME_ENV), + } + assert_eq!(root, tmp.path().join(CODEX_SESSIONS_SUBDIR)); +} + +#[test] +fn find_session_file_walks_yyyy_mm_dd_tree() { + let tmp = TempDir::new().unwrap(); + let uuid = Uuid::new_v4(); + let day = tmp.path().join("2026").join("04").join("30"); + fs::create_dir_all(&day).unwrap(); + let file = day.join(format!("rollout-ignored-ts-{uuid}.jsonl")); + fs::write(&file, "").unwrap(); + + let found = find_session_file(tmp.path(), uuid); + assert_eq!(found, Some(file)); +} + +#[test] +fn find_session_file_returns_none_when_no_match() { + let tmp = TempDir::new().unwrap(); + let day = tmp.path().join("2026").join("04").join("30"); + fs::create_dir_all(&day).unwrap(); + fs::write( + day.join(format!("rollout-ignored-ts-{}.jsonl", Uuid::new_v4())), + "", + ) + .unwrap(); + + assert!(find_session_file(tmp.path(), Uuid::new_v4()).is_none()); +} + +#[test] +fn find_session_file_returns_none_when_root_missing() { + let tmp = TempDir::new().unwrap(); + assert!(find_session_file(&tmp.path().join("missing"), Uuid::new_v4()).is_none()); +} + +#[test] +fn read_envelope_recovers_cwd_and_version_from_session_meta() { + let tmp = TempDir::new().unwrap(); + let uuid = Uuid::new_v4(); + let day = tmp.path().join("2026").join("04").join("30"); + fs::create_dir_all(&day).unwrap(); + let meta = 
session_meta_line(uuid, "/work/proj", "2026-04-30T01:54:20.000Z", "0.55.0"); + let body = format!("{meta}\n{{\"type\":\"event_msg\",\"payload\":{{\"x\":1}}}}\n"); + fs::write(day.join(format!("rollout-ignored-ts-{uuid}.jsonl")), body).unwrap(); + + let envelope = read_envelope(uuid, tmp.path()).unwrap().unwrap(); + assert_eq!(envelope.session_id, uuid); + assert_eq!(envelope.cwd, std::path::PathBuf::from("/work/proj")); + assert_eq!(envelope.codex_version.as_deref(), Some("0.55.0")); + assert_eq!(envelope.entries.len(), 2); +} + +#[test] +fn read_envelope_returns_none_when_missing() { + let tmp = TempDir::new().unwrap(); + assert!(read_envelope(Uuid::new_v4(), tmp.path()).unwrap().is_none()); +} diff --git a/app/src/ai/agent_sdk/driver/harness/mod.rs b/app/src/ai/agent_sdk/driver/harness/mod.rs index cbb083520..9417e4211 100644 --- a/app/src/ai/agent_sdk/driver/harness/mod.rs +++ b/app/src/ai/agent_sdk/driver/harness/mod.rs @@ -37,6 +37,7 @@ use super::{ mod claude_code; pub(crate) mod claude_transcript; mod codex; +pub(crate) mod codex_transcript; mod gemini; mod json_utils; diff --git a/specs/REMOTE-1504/TECH.md b/specs/REMOTE-1504/TECH.md new file mode 100644 index 000000000..6eb8eda6c --- /dev/null +++ b/specs/REMOTE-1504/TECH.md @@ -0,0 +1,53 @@ +# REMOTE-1504: Save and Upload Codex Conversation Transcript + +## Context +Cloud agent runs using the Codex harness already upload block snapshots on each save, but the session transcript (the JSONL rollout Codex writes to disk) was not being captured. Claude Code already does this — `claude_code.rs` calls `upload_transcript` alongside `upload_current_block_snapshot` via `futures::try_join!`, reading the session JSONL from `~/.claude/projects/…/<session-id>.jsonl`. Codex stores its rollouts differently: `~/.codex/sessions/YYYY/MM/DD/rollout-<timestamp>-<uuid>.jsonl`, so transcript capture needs its own envelope format and file-discovery logic. 
+ +### Relevant files +- `app/src/ai/agent_sdk/driver/harness/codex.rs` — `CodexHarnessRunner` impl, owns the per-run state and `save_conversation` +- `app/src/ai/agent_sdk/driver/harness/claude_code.rs` (lines 504-528) — Claude's `upload_transcript`, the pattern being mirrored +- `app/src/ai/agent_sdk/driver/harness/claude_transcript.rs` — `ClaudeTranscriptEnvelope`, `read_envelope`, `read_jsonl` +- `app/src/terminal/cli_agent_sessions/mod.rs` — `CLIAgentSessionsModel`, singleton that tracks CLI agent session context including `session_id` +- `app/src/ai/agent_sdk/driver/harness/mod.rs` — `HarnessRunner` trait, `upload_current_block_snapshot`, `handle_session_update` + +## Proposed changes + +### New module: `codex_transcript.rs` +Parallel to `claude_transcript.rs`. Contains: + +- **`CodexTranscriptEnvelope`** — on-wire JSON shape: `{ cwd, session_id, codex_version?, entries }`. Simpler than Claude's envelope (no subagents/todos). `entries` is the parsed JSONL content. +- **`CodexSessionMetadata`** — `{ cwd, codex_version }` extracted from the first JSONL line (`SessionMeta`). Not cached on the runner: each save re-parses it from the freshly read first line. +- **`codex_sessions_root()`** — resolves `$CODEX_HOME/sessions` or `~/.codex/sessions`. +- **`find_session_file(sessions_root, session_id)`** — walks `YYYY/MM/DD/` dirs looking for `rollout-*-<uuid>.jsonl`. Returns `None` when root doesn't exist or no match found. The walk is unavoidable since Codex names files with timestamps we don't control; acceptable cost on cloud agents where the sessions dir is small. The path is cached on the runner after first discovery. +- **`parse_session_meta(first_entry)`** — pulls `cwd` and `cli_version` from the first JSONL entry's `payload` object. Constant for the session lifetime, so callers could cache the result (the runner currently does not). + +Reuses `read_jsonl` from `claude_transcript` for the actual JSONL parsing. 
+ +### Changes to `CodexHarnessRunner` (`codex.rs`) +Two `OnceLock` fields added for lazy, set-once caching: +- `session_id: OnceLock<Uuid>` — captured from `CLIAgentSessionsModel` when hooks emit `SessionStart` +- `transcript_path: OnceLock<PathBuf>` — resolved by `find_session_file` on the first successful lookup, cached thereafter + +This caching pattern differs from Claude Code, which re-reads the config dir every save. Done here for consistency with the immutable-once-set nature of Codex's `SessionMeta` line, and because the `YYYY/MM/DD` dir walk is more expensive than Claude's direct path lookup. + +**`handle_session_update`** — new override. Reads session ID from `CLIAgentSessionsModel` (the singleton that receives events from the Codex hooks plugin). Parses the string into a `Uuid` and stores it in the `OnceLock`. No-ops once set. The session ID is needed to find the rollout file. + +**`save_conversation`** — now runs `upload_current_block_snapshot` and `upload_transcript` concurrently via `futures::try_join!`, matching Claude's pattern. `upload_transcript` is a standalone async fn that: +1. Returns early if session ID or transcript path aren't available yet (early periodic saves before hooks fire) +2. Reads + parses JSONL in `spawn_blocking` +3. Parses session metadata from the first entry on every save, falling back to defaults if it is not a `SessionMeta` line +4. Builds `CodexTranscriptEnvelope`, serializes, uploads via `get_transcript_upload_target` + `upload_to_target` +5. Returns `Ok(())`; no metadata is handed back to the caller + +### Design note: dir walk vs timestamp-based path +Codex filenames embed `rollout-<timestamp>-<uuid>.jsonl`. An alternative to the walk would be computing the expected `YYYY/MM/DD` from the session start time. Rejected because timezone/midnight-boundary bugs make it fragile — a session starting at 23:59 local might land in tomorrow's dir depending on Codex's clock handling. The walk is safe and, once it has found the file, is not repeated for that session. 
+ +## Testing and validation +- `codex_transcript_tests.rs` — unit tests covering: + - `codex_sessions_root` honors `$CODEX_HOME` env var override + - `find_session_file` walks a synthetic `YYYY/MM/DD` tree and matches the right UUID + - `find_session_file` returns `None` for non-matching UUID + - `find_session_file` returns `None` when root is missing + - `read_envelope` round-trip: writes a synthetic rollout with `SessionMeta` + event lines, recovers `cwd`, `codex_version`, correct entry count + - `read_envelope` returns `None` for missing session +- Manual: run a cloud agent with `--harness codex`, verify transcript appears in GCS after save