Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 131 additions & 13 deletions app/src/ai/agent_sdk/driver/harness/codex.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::sync::Arc;
use std::path::{Path, PathBuf};
use std::sync::{Arc, OnceLock};

use anyhow::{Context, Result};
use async_trait::async_trait;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use tempfile::NamedTempFile;
use uuid::Uuid;
use warp_cli::agent::Harness;
use warp_managed_secrets::ManagedSecretValue;
use warpui::{ModelHandle, ModelSpawner};
use warpui::{ModelHandle, ModelSpawner, SingletonEntity};

use crate::ai::agent::conversation::AIConversationId;
use crate::ai::ambient_agents::AmbientAgentTaskId;
use crate::server::server_api::harness_support::HarnessSupportClient;
use crate::server::server_api::harness_support::{upload_to_target, HarnessSupportClient};
use crate::server::server_api::ServerApi;
use crate::terminal::cli_agent_sessions::CLIAgentSessionsModel;
use crate::terminal::model::block::BlockId;
use crate::terminal::CLIAgent;

use super::super::terminal::{CommandHandle, TerminalDriver};
use super::super::{AgentDriver, AgentDriverError};
use super::claude_transcript::read_jsonl;
use super::codex_transcript::{
codex_sessions_root, find_session_file, parse_session_meta, CodexTranscriptEnvelope,
};
use super::json_utils::read_json_file_or_default;
use super::{write_temp_file, HarnessRunner, ResumePayload, SavePoint, ThirdPartyHarness};

Expand Down Expand Up @@ -108,6 +114,13 @@ struct CodexHarnessRunner {
client: Arc<dyn HarnessSupportClient>,
terminal_driver: ModelHandle<TerminalDriver>,
state: Mutex<CodexRunnerState>,
/// Codex session UUID. Populated lazily by [`HarnessRunner::handle_session_update`]
/// once the codex hooks emit `SessionStart`. Set once (using `OnceLock`).
session_id: OnceLock<Uuid>,
/// Path to the codex session rollout JSONL file. Populated by the first
/// successful [`find_session_file`] walk so that subsequent saves skip the YYYY/MM/DD
/// directory walk and read the JSONL file directly.
transcript_path: OnceLock<PathBuf>,
}

impl CodexHarnessRunner {
Expand All @@ -128,8 +141,28 @@ impl CodexHarnessRunner {
client,
terminal_driver,
state: Mutex::new(CodexRunnerState::Preexec),
session_id: OnceLock::new(),
transcript_path: OnceLock::new(),
})
}

/// Return the filepath for the session transcript, walking the codex sessions tree to find it on the
/// first save call.
async fn resolve_transcript_path(&self) -> Option<PathBuf> {
if let Some(cached) = self.transcript_path.get() {
return Some(cached.clone());
}
let session_id = self.session_id.get().copied()?;
let resolved = tokio::task::spawn_blocking(move || -> Option<PathBuf> {
let root = codex_sessions_root().ok()?;
find_session_file(&root, session_id)
})
.await
.ok()
.flatten()?;
let _ = self.transcript_path.set(resolved.clone());
Some(resolved)
}
}

#[cfg_attr(not(target_family = "wasm"), async_trait)]
Expand Down Expand Up @@ -179,6 +212,38 @@ impl HarnessRunner for CodexHarnessRunner {
.map_err(|_| anyhow::anyhow!("Agent driver dropped while sending /exit"))
}

/// Capture the codex session ID from the `SessionStart` event picked up by the `CLIAgentSessionsModel`.
///
/// Relies on codex hooks being set up to emit this event correctly.
async fn handle_session_update(&self, foreground: &ModelSpawner<AgentDriver>) -> Result<()> {
if self.session_id.get().is_some() {
return Ok(());
}
let terminal_driver = self.terminal_driver.clone();
let session_id_str = foreground
.spawn(move |_, ctx| {
let terminal_view_id = terminal_driver.as_ref(ctx).terminal_view().id();
CLIAgentSessionsModel::handle(ctx)
.as_ref(ctx)
.session(terminal_view_id)
.and_then(|s| s.session_context.session_id.clone())
})
.await
.ok()
.flatten();
let Some(session_id_str) = session_id_str else {
return Ok(());
};
match Uuid::parse_str(&session_id_str) {
Ok(uuid) => {
log::info!("Captured codex session id {uuid}");
let _ = self.session_id.set(uuid);
}
Err(e) => log::warn!("Failed to parse codex session id '{session_id_str}': {e}"),
}
Ok(())
}

async fn save_conversation(
&self,
save_point: SavePoint,
Expand All @@ -202,18 +267,71 @@ impl HarnessRunner for CodexHarnessRunner {
} => (*conversation_id, block_id.clone()),
};

// TODO(REMOTE-1504) Also save the conversation transcript.
super::upload_current_block_snapshot(
foreground,
&self.terminal_driver,
self.client.as_ref(),
conversation_id,
block_id,
)
.await
let session_id = self.session_id.get().copied();
let rollout_path = self.resolve_transcript_path().await;
let client = self.client.as_ref();

let is_final = matches!(save_point, SavePoint::Final);
futures::try_join!(
super::upload_current_block_snapshot(
foreground,
&self.terminal_driver,
client,
conversation_id,
block_id,
),
upload_transcript(client, conversation_id, session_id, rollout_path, is_final),
)?;
Ok(())
}
}

/// Upload the codex session transcript to the server. No-ops if the session UUID hasn't
/// been captured yet or no rollout file is on disk yet.
async fn upload_transcript(
client: &dyn HarnessSupportClient,
conversation_id: AIConversationId,
session_id: Option<Uuid>,
transcript_path: Option<PathBuf>,
is_final: bool,
) -> Result<()> {
let Some(session_id) = session_id else {
if is_final {
log::warn!(
"Codex session id still unknown at final save; transcript was never uploaded"
);
} else {
log::debug!("Codex session id not yet known; skipping transcript upload");
}
return Ok(());
};
let Some(transcript_path) = transcript_path else {
if is_final {
log::warn!("No codex rollout file found at final save for session {session_id}; transcript was never uploaded");
} else {
log::debug!("No codex rollout file yet for session {session_id}");
}
return Ok(());
};
log::info!("Uploading codex transcript to conversation {conversation_id}");

let body = tokio::task::spawn_blocking(move || -> Result<Vec<u8>> {
let entries = read_jsonl(&transcript_path)?;
let metadata = parse_session_meta(entries.first()).unwrap_or_default();
let envelope = CodexTranscriptEnvelope::new(session_id, metadata, entries);
serde_json::to_vec(&envelope).context("Failed to serialize codex transcript")
})
.await
.context("read_envelope task panicked")??;

let target = client
.get_transcript_upload_target(&conversation_id)
.await
.with_context(|| format!("Failed to get transcript upload target for {conversation_id}"))?;
upload_to_target(client.http_client(), &target, body).await?;
Ok(())
}

const CODEX_CONFIG_DIR: &str = ".codex";
const CODEX_AGENTS_OVERRIDE_FILE_NAME: &str = "AGENTS.override.md";
const CODEX_AUTH_FILE_NAME: &str = "auth.json";
Expand Down
125 changes: 125 additions & 0 deletions app/src/ai/agent_sdk/driver/harness/codex_transcript.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
//! Codex session transcript envelope + read helpers.
//!
//! Owns:
//! - [`CodexTranscriptEnvelope`] — the on-wire/on-GCS shape of a saved Codex rollout
//! (parsed JSONL entries plus session-level metadata). Reader functions interoperate
//! with Codex's own `~/.codex/sessions/YYYY/MM/DD/rollout-<ts>-<uuid>.jsonl` layout
//! (codex `rollout/src/recorder.rs`).
use std::fs;
use std::path::{Path, PathBuf};

use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;

/// Env var codex honors to override `~/.codex` (see codex `core/src/config/mod.rs`).
const CODEX_HOME_ENV: &str = "CODEX_HOME";
const CODEX_HOME_DIRNAME: &str = ".codex";
/// Subdirectory under `$CODEX_HOME` where rollouts live.
const CODEX_SESSIONS_SUBDIR: &str = "sessions";

/// JSON envelope sent to the server representing a complete Codex session.
///
/// The transcript is the parsed JSONL content of the rollout file; codex's resume
/// path re-reads this JSONL line by line.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub(crate) struct CodexTranscriptEnvelope {
/// The directory the codex session started in (recovered from the `SessionMeta` line).
pub(crate) cwd: PathBuf,
/// Codex session/thread UUID. Matches the trailing `-<uuid>` in the rollout filename.
pub(crate) session_id: Uuid,
/// `cli_version` from `SessionMeta`, surfaced separately for the server.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(crate) codex_version: Option<String>,
/// Parsed JSONL entries.
pub(crate) entries: Vec<Value>,
}

impl CodexTranscriptEnvelope {
pub(crate) fn new(session_id: Uuid, meta: CodexSessionMetadata, entries: Vec<Value>) -> Self {
Self {
cwd: meta.cwd,
session_id,
codex_version: meta.codex_version,
entries,
}
}
}

/// Session-level metadata pulled from the rollout's `SessionMeta` line.
#[derive(Clone, Debug, Default, PartialEq)]
pub(crate) struct CodexSessionMetadata {
pub(crate) cwd: PathBuf,
pub(crate) codex_version: Option<String>,
}

/// Resolve the codex sessions root, honoring `$CODEX_HOME` then falling back to `~/.codex`.
pub(crate) fn codex_sessions_root() -> anyhow::Result<PathBuf> {
let home = if let Ok(dir) = std::env::var(CODEX_HOME_ENV) {
PathBuf::from(dir)
} else {
dirs::home_dir()
.ok_or_else(|| anyhow::anyhow!("could not determine home directory"))?
.join(CODEX_HOME_DIRNAME)
};
Ok(home.join(CODEX_SESSIONS_SUBDIR))
}

/// Walk `<sessions_root>/YYYY/MM/DD/` looking for a `rollout-*-<session_id>.jsonl`.
///
/// Returns `None` if `sessions_root` doesn't exist yet or no matching file is found.
pub(crate) fn find_session_file(sessions_root: &Path, session_id: Uuid) -> Option<PathBuf> {
if !sessions_root.exists() {
return None;
}
let suffix = format!("-{session_id}.jsonl");
for year_dir in read_subdirs(sessions_root) {
for month_dir in read_subdirs(&year_dir) {
for day_dir in read_subdirs(&month_dir) {
let entries = match fs::read_dir(&day_dir) {
Ok(e) => e,
Err(_) => continue,
};
for entry in entries.flatten() {
let path = entry.path();
let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
continue;
};
if name.starts_with("rollout-") && name.ends_with(&suffix) {
return Some(path);
}
}
}
}
}
None
}

fn read_subdirs(parent: &Path) -> impl Iterator<Item = PathBuf> {
fs::read_dir(parent)
.into_iter()
.flatten()
.filter_map(|entry| {
let entry = entry.ok()?;
entry.file_type().ok()?.is_dir().then(|| entry.path())
})
}

/// Pull `cwd` and `cli_version` out of the first JSONL line if it's a `SessionMeta`.
pub(crate) fn parse_session_meta(first: Option<&Value>) -> Option<CodexSessionMetadata> {
let entry = first?;
if entry.get("type").and_then(|v| v.as_str()) != Some("session_meta") {
return None;
}
let payload = entry.get("payload")?;
let cwd = PathBuf::from(payload.get("cwd").and_then(|v| v.as_str())?);
let codex_version = payload
.get("cli_version")
.and_then(|v| v.as_str())
.map(str::to_owned);
Some(CodexSessionMetadata { cwd, codex_version })
}

#[cfg(test)]
#[path = "codex_transcript_tests.rs"]
mod tests;
Loading
Loading