diff --git a/Cargo.lock b/Cargo.lock index eff09a5..dc8acc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1196,9 +1196,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.175" +version = "0.2.180" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543" +checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" [[package]] name = "linux-raw-sys" diff --git a/crates/coco-tui/src/components/chat.rs b/crates/coco-tui/src/components/chat.rs index bbd9dfa..3ff1dd2 100644 --- a/crates/coco-tui/src/components/chat.rs +++ b/crates/coco-tui/src/components/chat.rs @@ -1,13 +1,14 @@ use coco_macro::ComponentExt; -use code_combo::tools::{BashInput, Final}; +use code_combo::tools::{ + ComboEvent as ComboToolEvent, ComboInfo, ComboStreamKind, Final, RUN_COMBO_TOOL_NAME, + RunComboInput, +}; use code_combo::{ Agent, Block as ChatBlock, ChatResponse, ChatStreamUpdate, Config, Content as ChatContent, - Message as ChatMessage, Output, RuntimeOverrides, SessionEnv, Starter, StarterCommand, - StarterError, StarterEvent, StopReason, TextEdit, ToolUse, UsageStats, bash_unsafe_ranges, - discover_starters, load_runtime_overrides, parse_primary_command, save_runtime_overrides, + Message as ChatMessage, Output, RuntimeOverrides, Starter, StopReason, TextEdit, ToolUse, + UsageStats, discover_starters, load_runtime_overrides, save_runtime_overrides, }; use crossterm::event::{KeyCode, KeyEvent, KeyModifiers}; -use futures::StreamExt; use ratatui::{ Frame, layout::{Constraint, Layout, Rect}, @@ -19,8 +20,8 @@ use ratatui::{ use serde::{Deserialize, Serialize}; use serde_json::Value; -use snafu::prelude::*; use std::{ + collections::{HashMap, HashSet}, path::PathBuf, sync::{ Arc, @@ -63,6 +64,8 @@ pub struct Chat<'a> { shortcut_hints: ShortcutHintsPanel, prev_focus: Option, combo_thinking_active: bool, + combo_tool_messages: HashSet, + pending_combo_tool_events: HashMap>, last_usage: Option, token_schedule_session_save: Option, @@ -212,7 +215,8 @@ impl CancellationGuard { impl Chat<'static> { pub fn new(config: Config) -> Self { - let agent = Agent::new(config); + let mut agent = Agent::new(config); + agent.set_ignore_workspace_scripts(global::ignore_workspace_scripts()); Self { state: State::default(), @@ -226,6 +230,8 @@ impl Chat<'static> { shortcut_hints: ShortcutHintsPanel::default(), prev_focus: None, combo_thinking_active: false, + combo_tool_messages: HashSet::new(), + pending_combo_tool_events: HashMap::new(), last_usage: None, token_schedule_session_save: None, cancellation_guard: CancellationGuard::default(), @@ -483,16 +489,170 @@ impl Chat<'static> { } fn spawn_combo_execute(&mut self, name: String, args: Vec) { - let cancel_token = self.cancellation_guard.start_token(); - let system_prompt = self.agent.system_prompt().to_string(); + // Generate a unique tool use id + let id = format!( + "toolu_{:016x}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + % (1u128 << 64) + ); + + // Create tool use for run_combo + let tool_use = ToolUse { + id, + name: RUN_COMBO_TOOL_NAME.to_string(), + input: serde_json::json!({ "combo_name": name, "args": args }), + }; + + // Grant permission and execute + self.agent.grant_once(&tool_use.id, &tool_use.name); let agent = self.agent.clone(); - tokio::task::spawn(task_combo_execute( - name, - args, - system_prompt, - cancel_token, - agent, - )); + let tool_use_message = + ChatMessage::assistant(ChatContent::Multiple(vec![ChatBlock::tool_use( + &tool_use.id, + &tool_use.name, + tool_use.input.clone(), + )])); + let mut prompt = format!("Run combo {}", name); + if !args.is_empty() { + prompt.push_str(" with args: "); + prompt.push_str(&args.join(" ")); + } + let prompt_message = ChatMessage::user(ChatContent::Text(prompt)); + tokio::spawn(async move { + if agent.dump_messages().await.is_empty() { + agent.append_message(prompt_message).await; + } + agent.append_message(tool_use_message).await; + }); + self.register_combo_tool_message(tool_use.id.clone()); + self.spawn_tool_use(&tool_use); + } + + fn dispatch_combo_event(&mut self, event: ComboEvent) { + self.handle_combo_event_from_tool(&event); + let combo_event_wrapped = Event::Combo(event); + let combo_event_ref = &combo_event_wrapped; + handle_component_event!(self, combo_event_ref); + } + + fn register_combo_tool_message(&mut self, id: String) { + self.combo_tool_messages.insert(id.clone()); + if let Some(events) = self.pending_combo_tool_events.remove(&id) { + for event in events { + self.dispatch_combo_event(event); + } + } + } + + fn combo_tool_event_to_combo_event(&self, event: &ComboToolEvent) -> ComboEvent { + match event { + ComboToolEvent::NotFound { name } => ComboEvent::NotFound { name: name.clone() }, + ComboToolEvent::Executing { name, command_line } => ComboEvent::Executing { + name: name.clone(), + command_line: command_line.clone(), + }, + ComboToolEvent::Output { name, chunk } => ComboEvent::Output { + name: name.clone(), + chunk: chunk.clone(), + }, + ComboToolEvent::RecordStart { name, tool_use } => ComboEvent::RecordStart { + name: name.clone(), + tool_use: tool_use.clone(), + }, + ComboToolEvent::RecordOutput { + name, + tool_use_id, + chunk, + } => ComboEvent::RecordOutput { + name: name.clone(), + tool_use_id: tool_use_id.clone(), + chunk: chunk.clone(), + }, + ComboToolEvent::RecordEnd { + name, + tool_use_id, + is_error, + output, + } => ComboEvent::RecordEnd { + name: name.clone(), + tool_use_id: tool_use_id.clone(), + is_error: *is_error, + output: output.clone(), + }, + ComboToolEvent::Prompt { + name, + prompt, + thinking, + } => ComboEvent::Prompt { + name: name.clone(), + prompt: prompt.clone(), + thinking: thinking.clone(), + }, + ComboToolEvent::PromptStream { + name, + index, + kind, + text, + } => ComboEvent::PromptStream { + name: name.clone(), + index: *index, + kind: match kind { + ComboStreamKind::Plain => BotStreamKind::Plain, + ComboStreamKind::Thinking => BotStreamKind::Thinking, + }, + text: text.clone(), + }, + ComboToolEvent::ReplyToolUse { + name, + tool_use, + thinking, + offload, + } => ComboEvent::ReplyToolUse { + name: name.clone(), + tool_use: tool_use.clone(), + thinking: thinking.clone(), + offload: *offload, + }, + ComboToolEvent::ReplyToolResult { + name, + tool_use_id, + is_error, + output, + } => ComboEvent::ReplyToolResult { + name: name.clone(), + tool_use_id: tool_use_id.clone(), + is_error: *is_error, + output: output.clone(), + }, + ComboToolEvent::ReplyToolError { message } => ComboEvent::ReplyToolError { + message: message.clone(), + }, + ComboToolEvent::Executed { + name, + starter, + exit_code, + } => ComboEvent::Executed { + name: name.clone(), + starter: starter.clone(), + exit_code: *exit_code, + }, + ComboToolEvent::Cancelled { name } => ComboEvent::Cancelled { name: name.clone() }, + } + } + + fn handle_combo_event_from_tool(&mut self, event: &ComboEvent) { + match event { + ComboEvent::Executed { starter, .. } => { + self.set_combo_thinking_active(false); + if let Err(err) = starter.combo.as_ref() { + warn!(?err, "Failed to execute starter"); + } + } + _ => self.handle_combo_event(event), + } } fn spawn_tool_use(&mut self, tool_use: &ToolUse) { @@ -1017,6 +1177,22 @@ impl Component for Chat<'static> { self.handle_key_event(key); } Event::Combo(combo) => { + // Update agent's combo list when combos are discovered + if let ComboEvent::Discovered { starters } = combo { + let combos: Vec = starters + .iter() + .filter_map(|s| { + s.combo.as_ref().ok().map(|c| ComboInfo { + path: s.path.clone(), + combo: c.clone(), + }) + }) + .collect(); + let agent = self.agent.clone(); + tokio::spawn(async move { + agent.set_combos(combos).await; + }); + } self.handle_combo_event(combo); // Combo events need to be handled by children components handle_component_event!(self, event); @@ -1028,17 +1204,36 @@ impl Component for Chat<'static> { self.messages.finalize_stream(); self.messages.reset_stream(); self.set_ready(); - self.messages - .extend(msgs.iter().cloned().map(|msg| match msg { + let mut new_messages = Vec::with_capacity(msgs.len()); + let mut combo_tool_ids = Vec::new(); + for msg in msgs.iter().cloned() { + let message = match msg { BotMessage::Plain(text) => Message::bot(Plain::new(text).into()), BotMessage::ToolUse(tool_use) => { - Message::bot(Tool::new(tool_use.to_owned()).into()) + if tool_use.name == RUN_COMBO_TOOL_NAME { + if let Ok(input) = + serde_json::from_value::(tool_use.input.clone()) + { + combo_tool_ids.push(tool_use.id.clone()); + Message::bot(Combo::new(&input.combo_name).into()) + } else { + Message::bot(Tool::new(tool_use.to_owned()).into()) + } + } else { + Message::bot(Tool::new(tool_use.to_owned()).into()) + } } BotMessage::System(message) => Message::system(Plain::new(message).into()), BotMessage::Thinking(thinking) => { Message::bot(Thinking::new(thinking).into()) } - })); + }; + new_messages.push(message); + } + self.messages.extend(new_messages.into_iter()); + for id in combo_tool_ids { + self.register_combo_tool_message(id); + } // Trigger session save after receiving bot response global::trigger_schedule_session_save(); } @@ -1112,6 +1307,17 @@ impl Component for Chat<'static> { // Trigger session save after tool result global::trigger_schedule_session_save(); } + Event::Answer(AnswerEvent::ComboToolEvent { id, event }) => { + let combo_event = self.combo_tool_event_to_combo_event(event); + if !self.combo_tool_messages.contains(id) { + self.pending_combo_tool_events + .entry(id.clone()) + .or_default() + .push(combo_event); + return; + } + self.dispatch_combo_event(combo_event); + } Event::Answer(AnswerEvent::ToolOutput { .. }) => { let _ = self.messages.on_tool_event(event); } @@ -1521,182 +1727,6 @@ async fn task_combo_discover(cancel_token: CancellationToken) { tx.send(ComboEvent::Discovered { starters }.into()).unwrap(); } -fn format_command_line(command: &str, args: &[String]) -> String { - let command = resolve_command_display(command); - let mut parts = Vec::with_capacity(args.len() + 1); - parts.push(shell_escape(&command)); - for arg in args { - parts.push(shell_escape(arg)); - } - parts.join(" ") -} - -fn resolve_command_display(command: &str) -> String { - let command_path = std::path::Path::new(command); - let workspace_combo_dir = global::workspace_combo_dir(); - if let Ok(relative) = command_path.strip_prefix(&workspace_combo_dir) { - let display_path = std::path::Path::new(".coco/combos").join(relative); - return display_path.to_string_lossy().to_string(); - } - command.to_string() -} - -fn shell_escape(value: &str) -> String { - if value.is_empty() { - return "''".to_string(); - } - if value.bytes().all(|byte| { - matches!(byte, b'a'..=b'z' - | b'A'..=b'Z' - | b'0'..=b'9' - | b'_' - | b'-' - | b'.' - | b'/' - | b':' - | b'@' - | b'+' - | b'=' - | b',' - | b'%') - }) { - return value.to_string(); - } - let mut escaped = String::from("'"); - for ch in value.chars() { - if ch == '\'' { - escaped.push_str("'\"'\"'"); - } else { - escaped.push(ch); - } - } - escaped.push('\''); - escaped -} - -/// Build a directive prompt for LLM to use `coco reply` command. -fn build_offload_reply_directive(schemas: &[code_combo::PromptSchema]) -> String { - let field_args: Vec = schemas - .iter() - .map(|s| format!("--{}=", s.name)) - .collect(); - - let field_descriptions: Vec = schemas - .iter() - .map(|s| format!("- --{}=: {}", s.name, s.description)) - .collect(); - - format!( - r#"You must respond by calling the bash tool to execute the `coco reply` command. -Use this exact format: -``` -coco reply {field_args} -``` - -Required fields: -{field_list} - -The value should be properly shell-escaped if it contains special characters. -Do not output any other text or explanation. Only call the bash tool with the coco reply command."#, - field_args = field_args.join(" "), - field_list = field_descriptions.join("\n"), - ) -} - -fn build_offload_reply_retry_directive(schemas: &[code_combo::PromptSchema]) -> String { - let directive = build_offload_reply_directive(schemas); - format!("The previous response did not produce a valid coco reply. Retry.\n\n{directive}") -} - -enum OffloadCommandKind { - Coco, - Safe, - Unsafe, -} - -fn classify_offload_command(command: &str) -> OffloadCommandKind { - let is_coco_reply = is_coco_reply_command(command); - if is_coco_reply { - return OffloadCommandKind::Coco; - } - if is_safe_command(command) { - return OffloadCommandKind::Safe; - } - OffloadCommandKind::Unsafe -} - -fn build_offload_reply_guidance( - schemas: &[code_combo::PromptSchema], - command: &str, - executed: bool, -) -> String { - let field_args: Vec = schemas - .iter() - .map(|schema| format!("--{}=...", schema.name)) - .collect(); - let field_descriptions: Vec = schemas - .iter() - .map(|schema| format!("- {}: {}", schema.name, schema.description)) - .collect(); - let status = if executed { "executed" } else { "blocked" }; - format!( - "The previous tool call was {status} but did not use `coco reply` (command: {command}).\n\ -You must call the bash tool with `coco reply` and only that command.\n\ -Required fields:\n{field_list}\n\n\ -Example:\n\ -coco reply {field_args}", - field_list = field_descriptions.join("\n"), - field_args = field_args.join(" "), - ) -} - -#[derive(Debug, Snafu)] -enum ComboReplyError { - #[snafu(display("prompt reply cancelled"))] - Cancelled, - #[snafu(display("chat failed: {message}"))] - ChatFailed { message: String }, - #[snafu(display("LLM did not return a bash tool call for coco reply"))] - MissingBashToolUse, - #[snafu(display("failed to parse bash tool input: {message}"))] - InvalidBashInput { message: String }, - #[snafu(display("expected coco reply command, got: {command}"))] - UnexpectedCommand { command: String }, - #[snafu(display("bash execution failed: {message}"))] - BashExecutionFailed { message: String }, -} - -impl ComboReplyError { - fn should_retry(&self) -> bool { - matches!( - self, - ComboReplyError::MissingBashToolUse - | ComboReplyError::InvalidBashInput { .. } - | ComboReplyError::UnexpectedCommand { .. } - ) - } -} - -fn is_coco_command_name(name: &str) -> bool { - name == "coco" || name.ends_with("/coco") -} - -fn is_coco_reply_command(command: &str) -> bool { - let summary = match parse_primary_command(command) { - Ok(summary) => summary, - Err(_) => return false, - }; - if !is_coco_command_name(&summary.name) { - return false; - } - matches!(summary.args.first(), Some(arg) if arg == "reply") -} - -fn is_safe_command(command: &str) -> bool { - let trimmed = command.trim(); - !trimmed.is_empty() && bash_unsafe_ranges(command).is_empty() -} - fn add_usage(total: &mut UsageStats, delta: &UsageStats) { let has_breakdown = delta.input_tokens.is_some() || delta.output_tokens.is_some(); if has_breakdown { @@ -1712,606 +1742,9 @@ fn add_usage(total: &mut UsageStats, delta: &UsageStats) { } } -/// Handle combo reply via offload to bash `coco reply` command. -async fn handle_offload_combo_reply_with_retry( - agent: &mut Agent, - schemas: &[code_combo::PromptSchema], - combo_name: &str, - cancel_token: CancellationToken, - tx: tokio::sync::mpsc::UnboundedSender, -) -> Result<(), ComboReplyError> { - let max_retries = agent.combo_reply_retries(); - let mut attempt = 0usize; - loop { - if cancel_token.is_cancelled() { - return Err(ComboReplyError::Cancelled); - } - let directive = if attempt == 0 { - build_offload_reply_directive(schemas) - } else { - build_offload_reply_retry_directive(schemas) - }; - let response = handle_offload_combo_reply( - agent, - schemas, - combo_name, - cancel_token.clone(), - tx.clone(), - &directive, - ) - .await; - match response { - Ok(()) => return Ok(()), - Err(err) => { - if attempt >= max_retries || !err.should_retry() { - return Err(err); - } - attempt += 1; - } - } - } -} - -/// Handle combo reply via offload to bash `coco reply` command. -/// Response is sent via SESSION_SOCK, not returned here. -async fn handle_offload_combo_reply( - agent: &mut Agent, - schemas: &[code_combo::PromptSchema], - combo_name: &str, - cancel_token: CancellationToken, - tx: tokio::sync::mpsc::UnboundedSender, - directive: &str, -) -> Result<(), ComboReplyError> { - use code_combo::tools::BASH_TOOL_NAME; - - if cancel_token.is_cancelled() { - return Err(ComboReplyError::Cancelled); - } - - // Build and append the directive prompt - agent - .append_message(ChatMessage::user(ChatContent::Text(directive.to_string()))) - .await; - - // Call chat to get LLM response with streaming for thinking updates - let stream_tx = tx.clone(); - let stream_name = combo_name.to_string(); - let chat_response = agent - .chat_stream_with_history(cancel_token.clone(), move |update| { - let (index, kind, text) = match update { - ChatStreamUpdate::Plain { index, text } => (index, BotStreamKind::Plain, text), - ChatStreamUpdate::Thinking { index, text } => { - (index, BotStreamKind::Thinking, text) - } - }; - stream_tx - .send( - ComboEvent::PromptStream { - name: stream_name.clone(), - index, - kind, - text, - } - .into(), - ) - .ok(); - }) - .await - .map_err(|e| ComboReplyError::ChatFailed { - message: e.to_string(), - })?; - if let Some(usage) = chat_response.usage.clone() { - tx.send(AnswerEvent::Usage { usage }.into()).ok(); - } - - // Extract Bash tool_use from response - let blocks = match &chat_response.message.content { - ChatContent::Multiple(blocks) => blocks.as_slice(), - ChatContent::Text(_) => &[], - }; - - let bash_tool_use = blocks - .iter() - .find_map(|block| { - if let ChatBlock::ToolUse(tool_use) = block - && tool_use.name == BASH_TOOL_NAME - { - return Some(tool_use.clone()); - } - None - }) - .ok_or(ComboReplyError::MissingBashToolUse)?; - - let bash_input: BashInput = - serde_json::from_value(bash_tool_use.input.clone()).map_err(|err| { - ComboReplyError::InvalidBashInput { - message: err.to_string(), - } - })?; - - let original_command = bash_input.command.clone(); - let command_kind = classify_offload_command(&bash_input.command); - - // Send tool use event for UI feedback (thinking already streamed via PromptStream) - tx.send( - ComboEvent::ReplyToolUse { - name: combo_name.to_string(), - tool_use: bash_tool_use.clone(), - thinking: Vec::new(), - offload: true, - } - .into(), - ) - .ok(); - - if matches!(command_kind, OffloadCommandKind::Unsafe) { - let reason = match code_combo::bash_unsafe_reason(&original_command) { - Ok(_) => "command not allowlisted".to_string(), - Err(reason) => reason, - }; - let output = Final::Message(format!("command rejected: {reason}; expected coco reply")); - agent - .append_message(build_tool_result_message(&bash_tool_use.id, true, &output)) - .await; - tx.send( - ComboEvent::ReplyToolResult { - name: combo_name.to_string(), - tool_use_id: bash_tool_use.id.clone(), - is_error: true, - output: output.clone(), - } - .into(), - ) - .ok(); - let prompt = build_offload_reply_guidance(schemas, &original_command, false); - agent - .append_message(ChatMessage::user(ChatContent::Text(prompt))) - .await; - return Err(ComboReplyError::UnexpectedCommand { - command: original_command, - }); - } - - let bash_input_value = - serde_json::to_value(&bash_input).map_err(|err| ComboReplyError::InvalidBashInput { - message: err.to_string(), - })?; - - // Auto-grant and execute the bash command - agent.grant_once(&bash_tool_use.id, BASH_TOOL_NAME); - - // Execute the bash command - let mut final_output: Option = None; - let tool_use_id = bash_tool_use.id.clone(); - let _ = agent - .execute_with_output( - &bash_tool_use.id, - BASH_TOOL_NAME, - code_combo::Input::Starter(bash_input_value), - cancel_token.clone(), - |out| { - if let Output::Success(output) = out { - final_output = Some(Output::Success(output)); - } else if let Output::Failure(output) = out { - final_output = Some(Output::Failure(output)); - } - }, - ) - .await - .map_err(|e| ComboReplyError::BashExecutionFailed { - message: e.to_string(), - })?; - - if cancel_token.is_cancelled() { - return Err(ComboReplyError::Cancelled); - } - - // Parse the output and send result event - let (output, is_error) = match final_output.expect("bash execution should produce output") { - Output::Success(output) => (output, false), - Output::Failure(output) => (output, true), - _ => unreachable!("bash tool only produces Success or Failure"), - }; - - agent - .append_message(build_tool_result_message(&tool_use_id, is_error, &output)) - .await; - - // Send combo-specific result event for UI feedback (not AnswerEvent::ToolResult - // which would be intercepted by Chat and trigger another chat task) - tx.send( - ComboEvent::ReplyToolResult { - name: combo_name.to_string(), - tool_use_id, - is_error, - output: output.clone(), - } - .into(), - ) - .ok(); - - if matches!(command_kind, OffloadCommandKind::Safe) { - let prompt = build_offload_reply_guidance(schemas, &original_command, true); - agent - .append_message(ChatMessage::user(ChatContent::Text(prompt))) - .await; - return Err(ComboReplyError::UnexpectedCommand { - command: original_command, - }); - } - - Ok(()) -} - -async fn task_combo_execute( - name: String, - args: Vec, - system_prompt: String, - cancel_token: CancellationToken, - mut agent: Agent, -) { - let tx = global::event_tx(); - let Some(starters) = discover_combo_starters(cancel_token.clone(), Some(&name)).await else { - return; - }; - - let Some(starter) = starters.into_iter().find(|starter| match &starter.combo { - Ok(combo) => combo.metadata.name == name, - Err(err) => { - warn!(?starter.path, ?err, "Failed to load combo"); - false - } - }) else { - tx.send(ComboEvent::NotFound { name: name.clone() }.into()) - .unwrap(); - return; - }; - - let command_line = format_command_line(&starter.path, &args); - - // Skip the `ComboEvent::Discovered` event and advance directly to `ComboEvent::Executing` - tx.send( - ComboEvent::Executing { - name: name.clone(), - command_line, - } - .into(), - ) - .unwrap(); - - let session_env = SessionEnv::builder() - .build() - .expect("failed to build session"); - let session_socket_path = session_env.socket_path().to_path_buf(); - let mcp_envs = match code_combo::tools::prepare_mcp_envs().await { - Ok(envs) => envs, - Err(err) => { - warn!(?err, "failed to prepare mcp envs for combo"); - Vec::new() - } - }; - let starter_path = starter.path.clone(); - let mut exit_code: Option = None; - - let mut execution = StarterCommand::new(&starter.path) - .args(args) - .envs(mcp_envs) - .session_env(session_env) - .execute(); - let mut cancelled = false; - loop { - tokio::select! { - _ = cancel_token.cancelled(), if !cancelled => { - cancelled = true; - execution.cancel(); - } - event = execution.next() => { - let Some(event) = event else { break }; - match event { - StarterEvent::Output { chunk } => { - tx.send( - ComboEvent::Output { - name: name.clone(), - chunk, - } - .into(), - ) - .unwrap(); - } - StarterEvent::RecordOutput { tool_use_id, chunk } => { - tx.send( - ComboEvent::RecordOutput { - name: name.clone(), - tool_use_id, - chunk, - } - .into(), - ) - .unwrap(); - } - StarterEvent::RecordStart { tool_use } => { - agent - .append_message(build_tool_use_message(&tool_use)) - .await; - tx.send( - ComboEvent::RecordStart { - name: name.clone(), - tool_use, - } - .into(), - ) - .unwrap(); - } - StarterEvent::RecordEnd { - tool_use_id, - is_error, - output, - } => { - agent - .append_message(build_tool_result_message( - &tool_use_id, - is_error, - &output, - )) - .await; - tx.send( - ComboEvent::RecordEnd { - name: name.clone(), - tool_use_id, - is_error, - output, - } - .into(), - ) - .unwrap(); - } - StarterEvent::Prompt { prompt } => { - agent - .append_message(build_prompt_message(&prompt)) - .await; - tx.send( - ComboEvent::Prompt { - name: name.clone(), - prompt, - thinking: None, - } - .into(), - ) - .unwrap(); - } - StarterEvent::PromptRequest { - prompt, - schemas, - thinking, - responder, - } => { - agent - .append_message(build_prompt_message(&prompt)) - .await; - tx.send( - ComboEvent::Prompt { - name: name.clone(), - prompt: prompt.clone(), - thinking: thinking.clone(), - } - .into(), - ) - .unwrap(); - - // Check if offload_combo_reply is enabled for the current provider - if agent.offload_combo_reply() { - // Offload path: use bash tool to call `coco reply` - // Response is sent via SESSION_SOCK by the server - agent.set_bash_env( - "COCO_SESSION_SOCK", - session_socket_path.to_string_lossy().to_string(), - ); - let result = handle_offload_combo_reply_with_retry( - &mut agent, - &schemas, - &name, - cancel_token.clone(), - tx.clone(), - ) - .await; - agent.remove_bash_env("COCO_SESSION_SOCK"); - if let Err(err) = result { - tx.send( - ComboEvent::ReplyToolError { - message: err.to_string(), - } - .into(), - ) - .ok(); - } - } else { - // Original path: use combo_reply tool - let disable_stream = agent.disable_stream_for_current_model(); - let mut streamed_thinking = false; - let reply = if cancel_token.is_cancelled() { - Err("prompt reply cancelled".to_string()) - } else if disable_stream { - agent - .reply_prompt_with_thinking( - &system_prompt, - schemas, - thinking.clone(), - ) - .await - .map_err(|err| err.to_string()) - } else { - let stream_tx = tx.clone(); - let stream_name = name.clone(); - let thinking_seen = Arc::new(AtomicBool::new(false)); - let thinking_seen_stream = thinking_seen.clone(); - let reply = agent - .reply_prompt_stream_with_thinking( - &system_prompt, - schemas, - thinking.clone(), - cancel_token.clone(), - move |update| { - let (index, kind, text) = match update { - ChatStreamUpdate::Plain { index, text } => { - (index, BotStreamKind::Plain, text) - } - ChatStreamUpdate::Thinking { index, text } => { - thinking_seen_stream - .store(true, Ordering::Relaxed); - (index, BotStreamKind::Thinking, text) - } - }; - stream_tx - .send( - ComboEvent::PromptStream { - name: stream_name.clone(), - index, - kind, - text, - } - .into(), - ) - .ok(); - }, - ) - .await - .map_err(|err| err.to_string()); - streamed_thinking = thinking_seen.load(Ordering::Relaxed); - reply - }; - if let Ok(reply) = &reply { - if let Some(usage) = reply.usage.clone() { - tx.send(AnswerEvent::Usage { usage }.into()).ok(); - } - let thinking = if streamed_thinking { - Vec::new() - } else { - reply.thinking.clone() - }; - tx.send( - ComboEvent::ReplyToolUse { - name: name.clone(), - tool_use: reply.tool_use.clone(), - thinking, - offload: false, - } - .into(), - ) - .ok(); - } - let response = reply.map(|reply| reply.response); - if let Err(err) = &response { - tx.send( - ComboEvent::ReplyToolError { - message: err.clone(), - } - .into(), - ) - .ok(); - } - if let Err(err) = responder.send(response) { - tx.send( - ComboEvent::ReplyToolError { message: err }.into(), - ) - .ok(); - } - } - } - StarterEvent::Finished { exit_code: code } => { - exit_code = code; - } - _ => (), - } - } - } - } - - let starter = match execution.wait().await { - Ok(starter) => starter, - Err(err) => { - warn!(?err, "starter join error"); - let error_message = format!("Combo execution failed: starter join error: {err}"); - agent - .append_message(ChatMessage::user(ChatContent::Text(error_message))) - .await; - let starter = code_combo::Starter { - path: starter_path, - combo: Err(StarterError::Invalid { - reason: format!("starter join error: {err}"), - }), - }; - tx.send( - ComboEvent::Executed { - name: name.clone(), - starter, - exit_code, - } - .into(), - ) - .unwrap(); - return; - } - }; - - if cancel_token.is_cancelled() || matches!(&starter.combo, Err(StarterError::Cancelled)) { - tx.send( - ComboEvent::Cancelled { - name: Some(name.clone()), - } - .into(), - ) - .unwrap(); - return; - } - - if let Err(err) = starter.combo.as_ref() { - let error_message = format!("Combo execution failed: {err}"); - agent - .append_message(ChatMessage::user(ChatContent::Text(error_message))) - .await; - } else if let Some(code) = exit_code - && code != 0 - { - let error_message = format!("Combo execution failed: exit code {code}"); - agent - .append_message(ChatMessage::user(ChatContent::Text(error_message))) - .await; - } - - tx.send( - ComboEvent::Executed { - name: name.clone(), - starter, - exit_code, - } - .into(), - ) - .unwrap(); -} - -fn build_tool_use_message(tool_use: &ToolUse) -> ChatMessage { - ChatMessage::assistant(ChatContent::Multiple(vec![ChatBlock::tool_use( - &tool_use.id, - &tool_use.name, - tool_use.input.clone(), - )])) -} - const TOOL_RESULT_MAX_BYTES: usize = 80 * 1024; const TOOL_RESULT_TRUNCATION_SUFFIX: &str = "\n... (truncated)"; -fn build_tool_result_message(tool_use_id: &str, is_error: bool, output: &Final) -> ChatMessage { - ChatMessage::user(ChatContent::Multiple(vec![ChatBlock::tool_result( - tool_use_id, - Some(is_error), - final_to_tool_content(output), - )])) -} - -fn build_prompt_message(prompt: &str) -> ChatMessage { - ChatMessage::user(ChatContent::Text(prompt.to_string())) -} - fn final_to_tool_content(output: &Final) -> ChatContent { let text = match output { Final::Json(value) => truncate_json_tool_output(value, TOOL_RESULT_MAX_BYTES) @@ -2768,6 +2201,16 @@ async fn task_tool_use(mut agent: Agent, tool_use: ToolUse, cancel_token: Cancel ) .unwrap(); } + Output::ComboOutput(event) => { + tx.send( + AnswerEvent::ComboToolEvent { + id: id.clone(), + event, + } + .into(), + ) + .unwrap(); + } }, ) .await; @@ -2854,27 +2297,4 @@ mod tests { Some(true) ); } - - #[test] - fn combo_reply_error_should_retry() { - let retryable = [ - ComboReplyError::MissingBashToolUse, - ComboReplyError::InvalidBashInput { - message: "bad input".to_string(), - }, - ComboReplyError::UnexpectedCommand { - command: "ls".to_string(), - }, - ]; - for case in retryable { - assert!(case.should_retry(), "case: {case}"); - } - assert!(!ComboReplyError::Cancelled.should_retry()); - assert!( - !ComboReplyError::ChatFailed { - message: "network".to_string() - } - .should_retry() - ); - } } diff --git a/crates/coco-tui/src/components/messages/combo.rs b/crates/coco-tui/src/components/messages/combo.rs index 1b4e110..9afe3e4 100644 --- a/crates/coco-tui/src/components/messages/combo.rs +++ b/crates/coco-tui/src/components/messages/combo.rs @@ -118,6 +118,7 @@ impl Combo { Self { state: State::new(Inner { name: name.to_string(), + starter_state: StarterState::Executing, ..Default::default() }), command: None, diff --git a/crates/coco-tui/src/events.rs b/crates/coco-tui/src/events.rs index 0dab7b1..58d89b0 100644 --- a/crates/coco-tui/src/events.rs +++ b/crates/coco-tui/src/events.rs @@ -1,6 +1,6 @@ use code_combo::{ OutputChunk, Starter, TextEdit, ThinkingConfig, ToolUse, UsageStats, - tools::{Final, SubagentEvent}, + tools::{ComboEvent as ComboToolEvent, Final, SubagentEvent}, }; use crossterm::event::{KeyEvent, MouseEvent}; @@ -67,6 +67,11 @@ pub enum AnswerEvent { id: String, event: SubagentEvent, }, + /// Combo tool event (for run_combo tool). + ComboToolEvent { + id: String, + event: ComboToolEvent, + }, } impl From for Event { diff --git a/src/agent.rs b/src/agent.rs index 510ce07..dd9c7bb 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -10,7 +10,7 @@ use crate::provider::{ Block, Client, ContentBlockDelta, MessagesStreamEvent, Role, Thinking, ToolChoice, }; use futures_util::StreamExt; -use serde_json::{Map as JsonMap, json}; +use serde_json::{Map as JsonMap, Value, json}; use snafu::prelude::*; use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; @@ -19,7 +19,7 @@ use tracing::warn; use crate::{ Config, PromptSchema, ProviderConfig, RequestOptions, Result, ResultDisplayExt, ThinkingBlocksMode, ThinkingConfig, - tools::{RunTaskContext, RunTaskTool}, + tools::{ComboInfo, RunComboContext, RunComboTool, RunTaskContext, RunTaskTool}, }; use executor::PermissionControl; use prompt::{build_system_prompt_from_config, build_system_prompt_from_config_async}; @@ -53,12 +53,14 @@ pub struct Agent { /// Shared messages across cloned instances. messages: Arc>>, thinking_enabled: bool, - thinking_budget_tokens: usize, thinking_cleanup_pending: bool, model_override: Option, /// Full agent configuration loaded at initialization agent_config: AgentConfig, + + /// Shared context for run_combo tool. + combo_context: Arc>, } pub struct ChatResponse { @@ -296,12 +298,6 @@ impl Agent { .as_deref() .and_then(|path| path.parent()); - let thinking_budget_tokens = config - .providers - .first() - .and_then(|provider| provider.thinking_budget_tokens) - .unwrap_or(DEFAULT_THINKING_BUDGET_TOKENS); - // Load agent config (builtin -> global -> workspace, always override) let agent_config = load_agent_config_with_layers( &config.agent_path_layers, @@ -318,6 +314,13 @@ impl Agent { Some(&agent_config), ); + // Build system prompt from agent config + let system_prompt = build_system_prompt_from_config( + agent_config.system_prompt.as_ref(), + &config.config_dir, + workspace_dir.unwrap_or(&config.config_dir), + ); + // Register run_task tool only if subagents are configured // Must be done before apply_tool_policies so it can be retained if let Some(ref subagents) = agent_config.subagents @@ -332,6 +335,20 @@ impl Agent { executor.register_tool(std::sync::Arc::new(run_task_tool)); } + // Register run_combo tool with empty combo list initially + // Combos will be populated later via set_combos() + let combo_context = Arc::new(Mutex::new(RunComboContext { + combos: Vec::new(), + envs: Vec::new(), + config: config.clone(), + system_prompt: system_prompt.clone(), + model_override: None, + thinking_enabled: false, + ignore_workspace_scripts: false, + })); + let run_combo_tool = RunComboTool::new_with_shared_context(combo_context.clone()); + executor.register_tool(std::sync::Arc::new(run_combo_tool)); + // Apply agent tools as base if let Some(tools) = agent_config.tools.as_deref() { executor.apply_tool_policies(Some(tools), None); @@ -340,23 +357,16 @@ impl Agent { // Apply config.toml allow/deny on top (existing behavior) executor.apply_tool_policies(config.allow_tools.as_deref(), config.deny_tools.as_deref()); - // Build system prompt from agent config - let system_prompt = build_system_prompt_from_config( - agent_config.system_prompt.as_ref(), - &config.config_dir, - workspace_dir.unwrap_or(&config.config_dir), - ); - Self { config, system_prompt, executor, messages: Arc::new(Mutex::new(vec![])), thinking_enabled: false, - thinking_budget_tokens, thinking_cleanup_pending: false, model_override: None, agent_config, + combo_context, } } @@ -365,7 +375,11 @@ impl Agent { } pub fn set_system_prompt(&mut self, system_prompt: &str) { - self.system_prompt = system_prompt.to_string() + self.system_prompt = system_prompt.to_string(); + let system_prompt = self.system_prompt.clone(); + self.update_combo_context(move |ctx| { + ctx.system_prompt = system_prompt; + }); } /// Apply tool policies to restrict available tools. @@ -386,6 +400,14 @@ impl Agent { &self.executor } + /// Update the combo list for run_combo tool. + /// + /// This should be called after combo discovery to populate the available combos. + pub async fn set_combos(&self, combos: Vec) { + let mut ctx = self.combo_context.lock().await; + ctx.combos = combos; + } + /// Setup system prompt asynchronously from configuration and AGENTS.md files. /// /// This method builds the system prompt by: @@ -404,7 +426,9 @@ impl Agent { ) .await; - self.system_prompt = system_prompt; + self.system_prompt = system_prompt.clone(); + let mut ctx = self.combo_context.lock().await; + ctx.system_prompt = system_prompt; } /// Get the agent configuration. @@ -429,7 +453,29 @@ impl Agent { } pub fn set_model_override(&mut self, model: Option) { - self.model_override = model; + self.model_override = model.clone(); + self.update_combo_context(move |ctx| { + ctx.model_override = model; + }); + } + + pub fn set_ignore_workspace_scripts(&mut self, ignore: bool) { + self.update_combo_context(move |ctx| { + ctx.ignore_workspace_scripts = ignore; + }); + } + + fn update_combo_context(&self, update: F) + where + F: FnOnce(&mut RunComboContext) + Send + 'static, + { + let ctx = self.combo_context.clone(); + if let Ok(handle) = tokio::runtime::Handle::try_current() { + handle.spawn(async move { + let mut guard = ctx.lock().await; + update(&mut guard); + }); + } } pub async fn dump_messages(&self) -> Vec { @@ -447,7 +493,7 @@ impl Agent { pub async fn chat(&mut self, message: Message) -> Result { let request_options = self.request_options_for_current_model(); let (_, client) = self.pick_provider()?; - let thinking = self.thinking_payload(); + let thinking = self.thinking_payload(&request_options); let messages = { let mut messages = self.messages.lock().await; @@ -476,7 +522,10 @@ impl Agent { let message = if response.content.is_empty() { Message::assistant(Content::Multiple(Vec::default())) } else { - let msg = Message::assistant(Content::Multiple(response.content)); + let mut msg = Message::assistant(Content::Multiple(response.content)); + if request_options.stringify_nested_tool_inputs { + parse_stringified_tool_inputs_in_message(&mut msg, &self.executor); + } self.messages.lock().await.push(msg.clone()); msg }; @@ -491,7 +540,7 @@ impl Agent { pub async fn chat_with_history(&mut self) -> Result { let request_options = self.request_options_for_current_model(); let (_, client) = self.pick_provider()?; - let thinking = self.thinking_payload(); + let thinking = self.thinking_payload(&request_options); let messages = self.messages.lock().await.clone(); let messages = self.prepare_messages_for_request(messages, &request_options); @@ -515,7 +564,10 @@ impl Agent { let message = if response.content.is_empty() { Message::assistant(Content::Multiple(Vec::default())) } else { - let msg = Message::assistant(Content::Multiple(response.content)); + let mut msg = Message::assistant(Content::Multiple(response.content)); + if request_options.stringify_nested_tool_inputs { + parse_stringified_tool_inputs_in_message(&mut msg, &self.executor); + } self.messages.lock().await.push(msg.clone()); msg }; @@ -565,7 +617,7 @@ impl Agent { F: FnMut(ChatStreamUpdate) + Send, { let (_, client) = self.pick_provider()?; - let thinking = self.thinking_payload(); + let thinking = self.thinking_payload(request_options); let messages = { let mut messages = self.messages.lock().await; @@ -611,7 +663,10 @@ impl Agent { let message = if blocks.is_empty() { Message::assistant(Content::Multiple(Vec::default())) } else { - let msg = Message::assistant(Content::Multiple(blocks)); + let mut msg = Message::assistant(Content::Multiple(blocks)); + if request_options.stringify_nested_tool_inputs { + parse_stringified_tool_inputs_in_message(&mut msg, &self.executor); + } self.messages.lock().await.push(msg.clone()); msg }; @@ -653,7 +708,7 @@ impl Agent { } else { Some(system_prompt) }; - let thinking = self.thinking_payload_with_override(thinking.as_ref()); + let thinking = self.thinking_payload_with_override(&request_options, thinking.as_ref()); let mut attempt = 0usize; loop { let reply_tool = build_reply_tool(&schemas)?; @@ -770,7 +825,7 @@ impl Agent { } else { Some(system_prompt) }; - let thinking = self.thinking_payload_with_override(thinking.as_ref()); + let thinking = self.thinking_payload_with_override(&request_options, thinking.as_ref()); let mut attempt = 0usize; loop { let reply_tool = build_reply_tool(&schemas)?; @@ -910,6 +965,9 @@ impl Agent { pub fn set_thinking_enabled(&mut self, enabled: bool) { self.thinking_enabled = enabled; + self.update_combo_context(move |ctx| { + ctx.thinking_enabled = enabled; + }); } pub fn thinking_enabled(&self) -> bool { @@ -957,10 +1015,9 @@ impl Agent { Ok(idx) => { let provider = &self.config.providers[idx]; let model = Self::resolve_model(provider, selected_model); - let options = self.config.request_options_for_model(&model); - options - .offload_combo_reply - .unwrap_or(provider.offload_combo_reply) + let mut options = self.config.request_options_for_model(&model); + options.apply_override(&provider.request_overrides); + options.offload_combo_reply.unwrap_or(false) } Err(_) => false, } @@ -978,6 +1035,7 @@ impl Agent { name: &str, input: executor::Input<'a>, ) -> Output { + let input = self.normalize_tool_input_for_execution(name, input); self.executor .execute(id, name, input) .await @@ -995,14 +1053,19 @@ impl Agent { where F: FnMut(Output) + Send, { + let input = self.normalize_tool_input_for_execution(name, input); self.executor .execute_with_output(id, name, input, cancel_token, on_output) .await } - fn thinking_payload(&self) -> Option { + fn thinking_payload(&self, request_options: &RequestOptions) -> Option { if self.thinking_enabled { - Some(Thinking::enabled(self.thinking_budget_tokens)) + Some(Thinking::enabled( + request_options + .thinking_budget_tokens + .unwrap_or(DEFAULT_THINKING_BUDGET_TOKENS), + )) } else { None } @@ -1010,17 +1073,20 @@ impl Agent { fn thinking_payload_with_override( &self, + request_options: &RequestOptions, thinking: Option<&ThinkingConfig>, ) -> Option { let Some(thinking) = thinking else { - return self.thinking_payload(); + return self.thinking_payload(request_options); }; if !thinking.enabled { return None; } - let budget_tokens = thinking - .budget_tokens - .unwrap_or(self.thinking_budget_tokens); + let budget_tokens = thinking.budget_tokens.unwrap_or( + request_options + .thinking_budget_tokens + .unwrap_or(DEFAULT_THINKING_BUDGET_TOKENS), + ); Some(Thinking::enabled(budget_tokens)) } @@ -1030,12 +1096,36 @@ impl Agent { Ok(idx) => { let provider = &self.config.providers[idx]; let model = Self::resolve_model(provider, selected_model); - self.config.request_options_for_model(&model) + let mut options = self.config.request_options_for_model(&model); + options.apply_override(&provider.request_overrides); + options } Err(_) => RequestOptions::default(), } } + fn normalize_tool_input_for_execution<'a>( + &self, + name: &str, + input: executor::Input<'a>, + ) -> executor::Input<'a> { + let request_options = self.request_options_for_current_model(); + if !request_options.stringify_nested_tool_inputs { + return input; + } + match input { + Input::Starter(value) => { + let schema = self.executor.tool_input_schema(name); + let value = match schema.as_ref() { + Some(schema) => parse_stringified_tool_input(value, schema), + None => value, + }; + Input::Starter(value) + } + input => input, + } + } + fn provider_tools_for_request( &self, request_options: &RequestOptions, @@ -1043,7 +1133,13 @@ impl Agent { if request_options.disable_tools { Vec::new() } else { - self.executor.provider_tools() + let mut tools = self.executor.provider_tools(); + if request_options.stringify_nested_tool_inputs { + for tool in &mut tools { + tool.input_schema = stringify_nested_tool_schema(&tool.input_schema); + } + } + tools } } @@ -1135,6 +1231,9 @@ impl Agent { { Self::ensure_thinking_blocks(&mut messages); } + if request_options.stringify_nested_tool_inputs { + messages = stringify_nested_tool_inputs(messages, &self.executor); + } messages } @@ -1277,9 +1376,189 @@ fn should_use_tool_choice_fallback(request_options: &RequestOptions) -> bool { request_options.disable_tool_choice && request_options.tool_choice_fallback } +fn stringify_nested_tool_schema(schema: &Value) -> Value { + let mut output = schema.clone(); + stringify_nested_schema_in_place(&mut output, 0); + output +} + +fn stringify_nested_schema_in_place(schema: &mut Value, depth: usize) { + if depth > 0 && schema_is_object_or_array(schema) { + let desc = schema + .get("description") + .and_then(Value::as_str) + .unwrap_or(""); + let suffix = "JSON string"; + let description = if desc.is_empty() { + suffix.to_string() + } else { + format!("{desc} ({suffix})") + }; + *schema = json!({ + "type": "string", + "description": description, + }); + return; + } + + let Some(obj) = schema.as_object_mut() else { + return; + }; + + if let Some(props) = obj.get_mut("properties").and_then(Value::as_object_mut) { + for value in props.values_mut() { + stringify_nested_schema_in_place(value, depth + 1); + } + } + if let Some(items) = obj.get_mut("items") { + stringify_nested_schema_in_place(items, depth + 1); + } + if let Some(additional) = obj.get_mut("additionalProperties") { + stringify_nested_schema_in_place(additional, depth + 1); + } + for key in ["anyOf", "oneOf", "allOf"] { + if let Some(list) = obj.get_mut(key).and_then(Value::as_array_mut) { + for value in list { + stringify_nested_schema_in_place(value, depth + 1); + } + } + } +} + +fn schema_is_object_or_array(schema: &Value) -> bool { + if let Some(ty) = schema.get("type") { + match ty { + Value::String(value) => { + if value == "object" || value == "array" { + return true; + } + } + Value::Array(values) => { + if values.iter().any(|value| { + matches!(value, Value::String(value) if value == "object" || value == "array") + }) { + return true; + } + } + _ => (), + } + } + if schema.get("properties").is_some() || schema.get("items").is_some() { + return true; + } + for key in ["anyOf", "oneOf", "allOf"] { + if let Some(list) = schema.get(key).and_then(Value::as_array) + && list.iter().any(schema_is_object_or_array) + { + return true; + } + } + false +} + +fn stringify_nested_tool_inputs(messages: Vec, executor: &Executor) -> Vec { + messages + .into_iter() + .map(|mut message| { + let Content::Multiple(blocks) = &mut message.content else { + return message; + }; + for block in blocks { + let Block::ToolUse(tool_use) = block else { + continue; + }; + if let Some(schema) = executor.tool_input_schema(&tool_use.name) { + tool_use.input = stringify_tool_input_value(&tool_use.input, &schema); + } + } + message + }) + .collect() +} + +fn stringify_tool_input_value(input: &Value, schema: &Value) -> Value { + let Value::Object(map) = input else { + return input.clone(); + }; + let mut output = JsonMap::new(); + for (key, value) in map { + let prop_schema = schema + .get("properties") + .and_then(Value::as_object) + .and_then(|props| props.get(key)); + if let Some(schema) = prop_schema + && schema_is_object_or_array(schema) + && matches!(value, Value::Object(_) | Value::Array(_)) + { + let text = serde_json::to_string(value).unwrap_or_else(|_| value.to_string()); + output.insert(key.clone(), Value::String(text)); + } else { + output.insert(key.clone(), value.clone()); + } + } + Value::Object(output) +} + +fn parse_stringified_tool_input(input: Value, schema: &Value) -> Value { + let Value::Object(map) = input else { + return input; + }; + let mut output = JsonMap::new(); + for (key, value) in map { + let prop_schema = schema + .get("properties") + .and_then(Value::as_object) + .and_then(|props| props.get(&key)); + if let Some(schema) = prop_schema + && schema_is_object_or_array(schema) + && let Value::String(text) = &value + && let Ok(parsed) = serde_json::from_str::(text) + { + output.insert(key, parsed); + continue; + } + output.insert(key, value); + } + Value::Object(output) +} + +fn parse_stringified_tool_inputs_in_message(message: &mut Message, executor: &Executor) { + let Content::Multiple(blocks) = &mut message.content else { + return; + }; + for block in blocks { + let Block::ToolUse(tool_use) = block else { + continue; + }; + if let Some(schema) = executor.tool_input_schema(&tool_use.name) { + tool_use.input = parse_stringified_tool_input(tool_use.input.clone(), &schema); + } + } +} + #[cfg(test)] mod tests { use super::*; + use crate::{EnvString, ModelRequestConfig, ProviderKind}; + + #[test] + fn request_options_provider_override_stringify_nested_tool_inputs() { + let mut config = Config::default(); + config.providers.push(ProviderConfig { + name: "demo".to_string(), + kind: ProviderKind::OpenAI, + api_key: EnvString::String("test".to_string()), + base_url: "http://localhost".to_string(), + models: None, + request_overrides: ModelRequestConfig { + stringify_nested_tool_inputs: Some(true), + ..ModelRequestConfig::default() + }, + }); + let agent = Agent::new(config); + let options = agent.request_options_for_current_model(); + assert!(options.stringify_nested_tool_inputs); + } #[test] fn stream_accumulator_updates_plain_and_thinking() { @@ -1464,4 +1743,65 @@ mod tests { _ => panic!("expected multiple blocks"), } } + + #[test] + fn stringify_nested_schema_converts_nested_object_and_array() { + let schema = json!({ + "type": "object", + "properties": { + "meta": { + "type": "object", + "description": "metadata", + "properties": { + "name": {"type": "string"} + } + }, + "tags": { + "type": "array", + "items": {"type": "string"} + } + } + }); + + let output = stringify_nested_tool_schema(&schema); + let props = output + .get("properties") + .and_then(Value::as_object) + .expect("properties"); + assert_eq!(props["meta"]["type"], "string"); + assert!( + props["meta"]["description"] + .as_str() + .unwrap_or("") + .contains("JSON string") + ); + assert_eq!(props["tags"]["type"], "string"); + } + + #[test] + fn stringify_and_parse_tool_input_round_trip() { + let schema = json!({ + "type": "object", + "properties": { + "meta": { + "type": "object", + "properties": { + "name": {"type": "string"} + } + }, + "name": {"type": "string"} + } + }); + + let input = json!({ + "meta": {"name": "coco"}, + "name": "demo" + }); + let stringified = stringify_tool_input_value(&input, &schema); + assert!(stringified.get("meta").is_some()); + assert!(stringified["meta"].is_string()); + + let parsed = parse_stringified_tool_input(stringified, &schema); + assert_eq!(parsed, input); + } } diff --git a/src/agent/agent.toml b/src/agent/agent.toml index 0177c0f..eb33f1b 100644 --- a/src/agent/agent.toml +++ b/src/agent/agent.toml @@ -1,6 +1,6 @@ [agent] # Builtin available tools -tools = ["bash", "read", "list", "str_replace", "run_task"] +tools = ["bash", "read", "list", "str_replace", "run_task", "run_combo"] # Builtin system prompt (inline content) [agent.system_prompt] diff --git a/src/agent/executor.rs b/src/agent/executor.rs index af44138..9b07911 100644 --- a/src/agent/executor.rs +++ b/src/agent/executor.rs @@ -4,6 +4,7 @@ use std::{ }; use lazy_static::lazy_static; +use serde_json::Value; use snafu::prelude::*; use tokio_util::sync::CancellationToken; @@ -11,9 +12,10 @@ use super::bash_executor; use crate::{ AppliedTextEdit, OutputChunk, TextEdit, error, tools::{ - self, BASH_TOOL_NAME, BashInput, BashTool, Final, LIST_TOOL_NAME, ListTool, READ_TOOL_NAME, - RUN_TASK_TOOL_NAME, ReadTool, RunTaskTool, STR_REPLACE_TOOL_NAME, StrReplaceTool, - SubagentEvent, Tool, run_bash_chunked, run_task, + self, BASH_TOOL_NAME, BashInput, BashTool, ComboEvent, Final, LIST_TOOL_NAME, ListTool, + READ_TOOL_NAME, RUN_COMBO_TOOL_NAME, RUN_TASK_TOOL_NAME, ReadTool, RunComboTool, + RunTaskTool, STR_REPLACE_TOOL_NAME, StrReplaceTool, SubagentEvent, Tool, run_bash_chunked, + run_combo, run_task, }, }; @@ -86,6 +88,7 @@ pub enum Output { TextEdit(TextEdit), ToolOutput(OutputChunk), SubagentOutput(SubagentEvent), + ComboOutput(ComboEvent), Denied, AskPermission, } @@ -288,6 +291,7 @@ impl Executor { | READ_TOOL_NAME | LIST_TOOL_NAME | RUN_TASK_TOOL_NAME + | RUN_COMBO_TOOL_NAME ) { on_output(Output::AskPermission); @@ -393,6 +397,83 @@ impl Executor { return Ok(ExecuteStatus::Completed); } } + + // Special handling for run_combo tool to support streaming ComboEvents + if name == RUN_COMBO_TOOL_NAME + && let Some(run_combo_tool) = tool + .as_any() + .and_then(|any| any.downcast_ref::()) + { + let input_value = match input { + Input::Starter(v) => v, + _ => { + on_output(Output::Failure(Final::from( + "run_combo requires Starter input", + ))); + return Ok(ExecuteStatus::Completed); + } + }; + + let ctx = run_combo_tool.context(); + + let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel(); + + let task_cancel = cancel_token.clone(); + let task_handle = tokio::spawn(async move { + run_combo( + ctx, + Input::Starter(input_value), + task_cancel, + move |event| { + let _ = event_tx.send(event.clone()); + }, + ) + .await + }); + + let mut task_handle = std::pin::pin!(task_handle); + + loop { + tokio::select! { + event = event_rx.recv() => { + match event { + Some(e) => on_output(Output::ComboOutput(e)), + None => { + match task_handle.await { + Ok(output) => { + on_output(output.into()); + } + Err(e) => { + on_output(Output::Failure(Final::from(format!("Combo task panicked: {}", e)))); + } + } + break; + } + } + } + result = &mut task_handle => { + while let Ok(e) = event_rx.try_recv() { + on_output(Output::ComboOutput(e)); + } + match result { + Ok(output) => { + on_output(output.into()); + } + Err(e) => { + on_output(Output::Failure(Final::from(format!("Combo task panicked: {}", e)))); + } + } + break; + } + } + } + + if cancel_token.is_cancelled() { + return Ok(ExecuteStatus::Cancelled); + } else { + return Ok(ExecuteStatus::Completed); + } + } // Fall through to generic execution if downcast fails let output = tokio::select! { @@ -466,6 +547,10 @@ impl Executor { .collect() } + pub fn tool_input_schema(&self, name: &str) -> Option { + self.tools.get(name).map(|tool| tool.input_schema()) + } + fn normalize_tool_names(names: &[String]) -> HashSet { names .iter() diff --git a/src/combo/starter.rs b/src/combo/starter.rs index 1b0d975..0f2539d 100644 --- a/src/combo/starter.rs +++ b/src/combo/starter.rs @@ -451,7 +451,8 @@ fn execute_command( .chain(args.iter().cloned()) .collect::>(); let mut proc = match ExecCommand::from_argv(argv) - .stdin(Stdio::piped()) + .stdin(Stdio::null()) + .disable_tty() .envs(merged_envs) .spawn_chunked(ChunkConfig::default()) { diff --git a/src/config.rs b/src/config.rs index 99ed043..170f63a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -27,7 +27,8 @@ pub use mcp::{ McpConfig, McpServerCommandConfig, McpServerConfig, McpServerConnection, McpServerHttpConfig, }; pub use provider::{ - ModelRequestConfig, ProviderConfig, ProviderKind, RequestOptions, ThinkingBlocksMode, + ModelPreset, ModelRequestConfig, ProviderConfig, ProviderKind, RequestOptions, + ThinkingBlocksMode, }; pub use ui::{MarkdownRenderEngine, UI}; @@ -55,7 +56,7 @@ pub struct Config { #[serde(default, skip_serializing_if = "Vec::is_empty")] pub providers: Vec, #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub model_presets: Vec, + pub model_presets: Vec, #[serde(default, skip_serializing_if = "Option::is_none")] pub allow_tools: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -106,10 +107,10 @@ impl Config { } } -fn apply_model_presets(options: &mut RequestOptions, presets: &[ModelRequestConfig], model: &str) { +fn apply_model_presets(options: &mut RequestOptions, presets: &[ModelPreset], model: &str) { for preset in presets { if preset.model.eq_ignore_ascii_case(model) { - options.apply_override(preset); + options.apply_override(&preset.options); } } } @@ -414,8 +415,8 @@ fn table_name_from_table(table: &toml::value::Table, path: &str) -> Result String { @@ -661,13 +662,15 @@ safe_commands_mode = \"override\"\n\ #[test] fn request_options_config_overrides_builtin() { let mut config = Config::default(); - config.model_presets.push(ModelRequestConfig { + config.model_presets.push(ModelPreset { model: "deepseek-reasoner".to_string(), - disable_tool_choice: Some(false), - tool_choice_fallback: Some(false), - combo_reply_retries: Some(0), - max_tokens: Some(2048), - ..ModelRequestConfig::default() + options: ModelRequestConfig { + disable_tool_choice: Some(false), + tool_choice_fallback: Some(false), + combo_reply_retries: Some(0), + max_tokens: Some(2048), + ..ModelRequestConfig::default() + }, }); let options = config.request_options_for_model("deepseek-reasoner"); assert!(!options.disable_tool_choice); diff --git a/src/config/presets.rs b/src/config/presets.rs index b39aea1..8be3f62 100644 --- a/src/config/presets.rs +++ b/src/config/presets.rs @@ -2,16 +2,16 @@ use std::sync::OnceLock; use serde::Deserialize; -use super::provider::ModelRequestConfig; +use super::provider::ModelPreset; #[derive(Debug, Deserialize)] struct PresetFile { #[serde(default)] - model_presets: Vec, + model_presets: Vec, } -pub(crate) fn builtin_model_presets() -> Vec { - static PRESETS: OnceLock> = OnceLock::new(); +pub(crate) fn builtin_model_presets() -> Vec { + static PRESETS: OnceLock> = OnceLock::new(); PRESETS .get_or_init(|| { let content = include_str!("presets.toml"); diff --git a/src/config/provider.rs b/src/config/provider.rs index 5e04fd7..7433c2b 100644 --- a/src/config/provider.rs +++ b/src/config/provider.rs @@ -21,7 +21,6 @@ pub enum ThinkingBlocksMode { #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct ModelRequestConfig { - pub model: String, #[serde(default, skip_serializing_if = "Option::is_none")] pub disable_tools: Option, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -35,6 +34,8 @@ pub struct ModelRequestConfig { #[serde(default, skip_serializing_if = "Option::is_none")] pub disable_stream: Option, #[serde(default, skip_serializing_if = "Option::is_none")] + pub stringify_nested_tool_inputs: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] pub offload_combo_reply: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub combo_reply_retries: Option, @@ -46,6 +47,15 @@ pub struct ModelRequestConfig { pub temperature: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub max_tokens: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub thinking_budget_tokens: Option, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct ModelPreset { + pub model: String, + #[serde(default, flatten)] + pub options: ModelRequestConfig, } #[derive(Debug, Clone, Default)] @@ -56,12 +66,14 @@ pub struct RequestOptions { pub thinking_blocks: ThinkingBlocksMode, pub ensure_toolcall_thinking: bool, pub disable_stream: bool, + pub stringify_nested_tool_inputs: bool, pub offload_combo_reply: Option, pub combo_reply_retries: usize, pub context_window: Option, pub can_reason: Option, pub temperature: Option, pub max_tokens: Option, + pub thinking_budget_tokens: Option, } impl RequestOptions { @@ -84,6 +96,9 @@ impl RequestOptions { if let Some(value) = override_config.disable_stream { self.disable_stream = value; } + if let Some(value) = override_config.stringify_nested_tool_inputs { + self.stringify_nested_tool_inputs = value; + } if let Some(value) = override_config.offload_combo_reply { self.offload_combo_reply = Some(value); } @@ -102,6 +117,9 @@ impl RequestOptions { if override_config.max_tokens.is_some() { self.max_tokens = override_config.max_tokens; } + if let Some(value) = override_config.thinking_budget_tokens { + self.thinking_budget_tokens = Some(value); + } } } @@ -112,19 +130,14 @@ pub struct ProviderConfig { pub kind: ProviderKind, pub api_key: EnvString, pub base_url: String, - #[serde(default)] - pub thinking_budget_tokens: Option, /// Optional list of supported models. /// If None or empty, this provider accepts any model (wildcard). #[serde(default)] pub models: Option>, - /// When true, combo reply uses Bash tool to call `coco reply` command - /// instead of the built-in combo_reply tool. This offloads the structured - /// response extraction to an external command. - #[serde(default)] - pub offload_combo_reply: bool, + #[serde(default, flatten)] + pub request_overrides: ModelRequestConfig, } impl std::fmt::Debug for ProviderConfig { diff --git a/src/exec.rs b/src/exec.rs index 4edc077..c313e3c 100644 --- a/src/exec.rs +++ b/src/exec.rs @@ -112,6 +112,7 @@ pub struct ExecCommand { envs: Vec<(String, String)>, env_remove: Vec, stdin: Stdio, + disable_tty: bool, } impl ExecCommand { @@ -121,6 +122,7 @@ impl ExecCommand { envs: Vec::new(), env_remove: Vec::new(), stdin: Stdio::null(), + disable_tty: false, } } @@ -147,6 +149,11 @@ impl ExecCommand { self } + pub fn disable_tty(mut self) -> Self { + self.disable_tty = true; + self + } + pub fn spawn_chunked(self, config: ChunkConfig) -> io::Result { if self.argv.is_empty() { return Err(io::Error::new( @@ -167,6 +174,10 @@ impl ExecCommand { cmd.env_remove(key); } cmd.envs(self.envs.iter().map(|(k, v)| (k, v))); + if self.disable_tty { + cmd.env_remove("GPG_TTY"); + cmd.env_remove("SSH_TTY"); + } let mut child = cmd.spawn()?; let _ = child.stdin.take(); diff --git a/src/tools.rs b/src/tools.rs index 8299a11..7908d56 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -19,6 +19,7 @@ macro_rules! err_msg { mod bash; mod list; mod read; +mod run_combo; mod run_task; mod str_replace; @@ -29,6 +30,10 @@ pub use list::{DEFAULT_ENTRY_LIMIT, LIST_TOOL_NAME, ListInput, ListTool, MAX_ENT pub use read::{ DEFAULT_LINE_LIMIT, DEFAULT_LINE_OFFSET, MAX_LINE_LIMIT, READ_TOOL_NAME, ReadInput, ReadTool, }; +pub use run_combo::{ + ComboEvent, ComboInfo, ComboStreamKind, RUN_COMBO_TOOL_NAME, RunComboContext, RunComboInput, + RunComboOutput, RunComboTool, run_combo, +}; pub use run_task::{ RUN_TASK_TOOL_NAME, RunTaskContext, RunTaskInput, RunTaskOutput, RunTaskTool, SubagentEvent, ToolStatus, run_task, diff --git a/src/tools/run_combo.rs b/src/tools/run_combo.rs new file mode 100644 index 0000000..4e7f96c --- /dev/null +++ b/src/tools/run_combo.rs @@ -0,0 +1,1528 @@ +//! Run Combo Tool - Executes combo scripts within agent context. +//! +//! This tool allows the agent to execute combo scripts that have been +//! discovered during startup. Combos are executable scripts that can +//! perform complex operations with recorded tool calls. + +use std::{path::PathBuf, sync::Arc}; + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use serde_json::{Value, json}; +use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; +use tracing::{debug, warn}; + +use super::{ + BASH_TOOL_NAME, BashInput, ExecuteResult, Final, Input, Output, Tool, prepare_mcp_envs, + run_bash_chunked, +}; +use crate::{ + Agent, Block, ChatStreamUpdate, Combo, Config, Content, Message, OutputChunk, PromptSchema, + SessionEnv, Starter, StarterCommand, StarterError, StarterEvent, ThinkingConfig, ToolUse, + bash_unsafe_ranges, bash_unsafe_reason, discover_starters, exec::StreamKind, + parse_primary_command, workspace_dir, +}; + +pub const RUN_COMBO_TOOL_NAME: &str = "run_combo"; + +type ComboEventCallback = Arc>>; + +/// Input parameters for the run_combo tool. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RunComboInput { + /// Name of the combo to execute. + pub combo_name: String, + /// Arguments passed to the combo starter. + #[serde(default)] + pub args: Vec, +} + +/// Output from the run_combo tool. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RunComboOutput { + /// Whether the combo completed successfully. + pub success: bool, + /// Summary of the combo execution. + pub summary: String, + /// Number of tool calls made during execution. + pub tool_calls: usize, + /// Optional error message if failed. + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ComboStreamKind { + Plain, + Thinking, +} + +/// Event emitted during combo execution. +#[derive(Debug, Clone)] +pub enum ComboEvent { + /// Combo not found. + NotFound { + /// Combo name. + name: String, + }, + /// Combo is executing. + Executing { + /// Combo name. + name: String, + /// Command line with args. + command_line: String, + }, + /// Regular output chunk (stdout/stderr). + Output { + /// Combo name. + name: String, + /// Output chunk. + chunk: OutputChunk, + }, + /// Tool record started. + RecordStart { + /// Combo name. + name: String, + /// Tool use info. + tool_use: crate::ToolUse, + }, + /// Tool record output. + RecordOutput { + /// Combo name. + name: String, + /// Tool use ID. + tool_use_id: String, + /// Output chunk. + chunk: OutputChunk, + }, + /// Tool record ended. + RecordEnd { + /// Combo name. + name: String, + /// Tool use ID. + tool_use_id: String, + /// Whether the tool failed. + is_error: bool, + /// Tool output. + output: Final, + }, + /// Prompt message from combo. + Prompt { + /// Combo name. + name: String, + /// Prompt text. + prompt: String, + /// Optional thinking config. + thinking: Option, + }, + /// Prompt stream update for combo reply. + PromptStream { + /// Combo name. + name: String, + /// Stream index. + index: usize, + /// Stream kind. + kind: ComboStreamKind, + /// Streamed text. + text: String, + }, + /// Reply tool use from prompt. + ReplyToolUse { + /// Combo name. + name: String, + /// Tool use info. + tool_use: ToolUse, + /// Thinking blocks. + thinking: Vec, + /// Whether this is an offload reply (executed via bash). + offload: bool, + }, + /// Reply tool result for offload. + ReplyToolResult { + /// Combo name. + name: String, + /// Tool use ID. + tool_use_id: String, + /// Whether the tool failed. + is_error: bool, + /// Tool output. + output: Final, + }, + /// Reply tool error. + ReplyToolError { + /// Error message. + message: String, + }, + /// Combo execution finished. + Executed { + /// Combo name. + name: String, + /// Starter summary. + starter: Starter, + /// Exit code. + exit_code: Option, + }, + /// Combo execution was cancelled. + Cancelled { + /// Combo name if available. + name: Option, + }, +} + +/// Information about a discovered combo. +#[derive(Debug, Clone)] +pub struct ComboInfo { + /// Path to the combo executable. + pub path: String, + /// Combo metadata. + pub combo: Combo, +} + +/// Context needed to create and run combo instances. +#[derive(Clone)] +pub struct RunComboContext { + /// Available combos discovered at startup. + pub combos: Vec, + /// Environment variables for combo execution. + pub envs: Vec<(String, String)>, + /// Config for combo reply agent. + pub config: Config, + /// System prompt used for combo reply. + pub system_prompt: String, + /// Optional model override for combo reply. + pub model_override: Option, + /// Whether thinking is enabled for combo reply. + pub thinking_enabled: bool, + /// Whether to ignore workspace combo scripts. + pub ignore_workspace_scripts: bool, +} + +/// Tool for executing combo scripts. +pub struct RunComboTool { + context: Arc>, +} + +impl RunComboTool { + /// Create a new RunComboTool with the given context. + pub fn new(context: RunComboContext) -> Self { + Self { + context: Arc::new(Mutex::new(context)), + } + } + + /// Create a new RunComboTool with a shared context. + /// This allows external code to update the combo list after tool creation. + pub fn new_with_shared_context(context: Arc>) -> Self { + Self { context } + } + + /// Get the shared context. + pub fn context(&self) -> Arc> { + self.context.clone() + } +} + +#[async_trait] +impl Tool for RunComboTool { + fn name(&self) -> &'static str { + RUN_COMBO_TOOL_NAME + } + + fn description(&self) -> &'static str { + "Execute a combo script to perform predefined operations." + } + + fn input_schema(&self) -> Value { + json!({ + "type": "object", + "properties": { + "combo_name": { + "type": "string", + "description": "Name of the combo to execute" + }, + "args": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Arguments passed to the combo starter" + } + }, + "required": ["combo_name"] + }) + } + + async fn execute<'a>(&self, input: Input<'a>) -> ExecuteResult { + run_combo( + self.context.clone(), + input, + CancellationToken::new(), + |_| {}, + ) + .await + } + + fn as_any(&self) -> Option<&dyn std::any::Any> { + Some(self) + } +} + +/// Execute the run_combo tool with event callback support. +pub async fn run_combo( + context: Arc>, + input: Input<'_>, + cancel_token: CancellationToken, + on_event: F, +) -> ExecuteResult +where + F: FnMut(&ComboEvent) + Send + 'static, +{ + let Input::Starter(input) = input else { + return err_msg!("Input should be Starter variant"); + }; + + let input: RunComboInput = match serde_json::from_value(input) { + Ok(v) => v, + Err(e) => { + return Final::from(json!({ + "success": false, + "summary": "", + "tool_calls": 0, + "error": format!("Invalid input: {}", e) + })) + .err(); + } + }; + + debug!(combo_name = %input.combo_name, "Starting combo execution"); + + // Wrap callback early so errors can emit events. + let on_event_boxed: ComboEventCallback = Arc::new(std::sync::Mutex::new(Box::new(on_event))); + + let (envs, config, system_prompt, model_override, thinking_enabled, ignore_workspace_scripts) = { + let ctx = context.lock().await; + ( + ctx.envs.clone(), + ctx.config.clone(), + ctx.system_prompt.clone(), + ctx.model_override.clone(), + ctx.thinking_enabled, + ctx.ignore_workspace_scripts, + ) + }; + + let mut combo_info = { + let ctx = context.lock().await; + ctx.combos + .iter() + .find(|c| c.combo.metadata.name == input.combo_name) + .cloned() + }; + + if combo_info.is_none() { + let discovered = + discover_combo_infos(&config, cancel_token.clone(), ignore_workspace_scripts).await; + if discovered.cancelled || cancel_token.is_cancelled() { + if let Ok(mut f) = on_event_boxed.lock() { + (*f)(&ComboEvent::Cancelled { + name: Some(input.combo_name.clone()), + }); + } + return Final::from(json!({ + "success": false, + "summary": "", + "tool_calls": 0, + "error": "Cancelled" + })) + .err(); + } + { + let mut ctx = context.lock().await; + ctx.combos = discovered.combos.clone(); + } + combo_info = discovered + .combos + .iter() + .find(|c| c.combo.metadata.name == input.combo_name) + .cloned(); + } + + let Some(combo_info) = combo_info else { + let available: Vec = { + let ctx = context.lock().await; + ctx.combos + .iter() + .map(|c| c.combo.metadata.name.clone()) + .collect() + }; + let missing_name = input.combo_name.clone(); + if let Ok(mut f) = on_event_boxed.lock() { + (*f)(&ComboEvent::NotFound { + name: missing_name.clone(), + }); + } + return Final::from(json!({ + "success": false, + "summary": "", + "tool_calls": 0, + "error": format!( + "Combo '{}' not found. Available: {:?}", + missing_name, available + ) + })) + .err(); + }; + + let combo_path = combo_info.path.clone(); + let combo_name = combo_info.combo.metadata.name.clone(); + let combo_args = input.args.clone(); + + // Execute combo + let result = execute_combo( + combo_path, + combo_name, + combo_args, + envs, + config, + system_prompt, + model_override, + thinking_enabled, + cancel_token, + on_event_boxed, + ) + .await; + + match result { + Ok(output) => { + debug!( + success = output.success, + tool_calls = output.tool_calls, + summary_len = output.summary.len(), + "run_combo completed" + ); + let json_output = serde_json::to_value(&output) + .unwrap_or_else(|_| json!({"success": false, "error": "Serialization failed"})); + if output.success { + Final::from(json_output).ok() + } else { + Final::from(json_output).err() + } + } + Err(e) => { + debug!(error = %e, "run_combo failed"); + Final::from(json!({ + "success": false, + "summary": "", + "tool_calls": 0, + "error": e + })) + .err() + } + } +} + +#[allow(clippy::too_many_arguments)] +async fn execute_combo( + combo_path: String, + combo_name: String, + args: Vec, + envs: Vec<(String, String)>, + config: Config, + system_prompt: String, + model_override: Option, + thinking_enabled: bool, + cancel_token: CancellationToken, + on_event: ComboEventCallback, +) -> Result { + let session_env = SessionEnv::builder() + .build() + .map_err(|e| format!("Failed to create session env: {}", e))?; + let session_socket_path = session_env.socket_path().to_path_buf(); + let mut exec_envs = match prepare_mcp_envs().await { + Ok(envs) => envs, + Err(err) => { + warn!(?err, "Failed to prepare MCP env for combo execution"); + Vec::new() + } + }; + exec_envs.extend(envs); + + let mut reply_agent = build_reply_agent( + config, + system_prompt.clone(), + model_override, + thinking_enabled, + ); + + let now_ms = || { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(0) + }; + + let on_event_for_emit = on_event.clone(); + + // Buffers for aggregating output into lines + let stdout_buffer = Arc::new(std::sync::Mutex::new(String::new())); + let stderr_buffer = Arc::new(std::sync::Mutex::new(String::new())); + + let mut tool_calls = 0; + let mut summary_parts: Vec = Vec::new(); + let mut exit_code: Option = None; + let mut failed = false; + let mut cancelled = false; + + let command_line = format_command_line(&combo_path, &args); + // Emit executing event + emit_combo_event( + &on_event_for_emit, + ComboEvent::Executing { + name: combo_name.clone(), + command_line, + }, + ); + + let mut execution = StarterCommand::new(&combo_path) + .args(args) + .envs(exec_envs) + .session_env(session_env) + .execute(); + + loop { + tokio::select! { + _ = cancel_token.cancelled(), if !cancelled => { + cancelled = true; + execution.cancel(); + } + event = futures_util::StreamExt::next(&mut execution) => { + let Some(event) = event else { break }; + + match event { + StarterEvent::Started { .. } => {} + StarterEvent::Output { chunk } => { + emit_combo_event(&on_event_for_emit, ComboEvent::Output { + name: combo_name.clone(), + chunk: chunk.clone(), + }); + + // Aggregate output for summary + for line in &chunk.lines { + if !line.trim().is_empty() { + summary_parts.push(line.clone()); + } + } + } + StarterEvent::RecordStart { tool_use } => { + tool_calls += 1; + reply_agent + .append_message(build_tool_use_message(&tool_use)) + .await; + emit_combo_event(&on_event_for_emit, ComboEvent::RecordStart { + name: combo_name.clone(), + tool_use, + }); + } + StarterEvent::RecordOutput { tool_use_id, chunk } => { + let tool_use_id_summary = tool_use_id.clone(); + emit_combo_event(&on_event_for_emit, ComboEvent::RecordOutput { + name: combo_name.clone(), + tool_use_id, + chunk: chunk.clone(), + }); + for line in &chunk.lines { + if !line.trim().is_empty() { + summary_parts.push(format!("[{}] {}", tool_use_id_summary, line)); + } + } + } + StarterEvent::RecordEnd { + tool_use_id, + is_error, + output, + } => { + reply_agent + .append_message(build_tool_result_message( + &tool_use_id, + is_error, + &output, + )) + .await; + emit_combo_event(&on_event_for_emit, ComboEvent::RecordEnd { + name: combo_name.clone(), + tool_use_id, + is_error, + output: output.clone(), + }); + if is_error { + failed = true; + } + } + StarterEvent::Prompt { prompt } => { + reply_agent + .append_message(build_prompt_message(&prompt)) + .await; + emit_combo_event(&on_event_for_emit, ComboEvent::Prompt { + name: combo_name.clone(), + prompt: prompt.clone(), + thinking: None, + }); + summary_parts.push(format!("[Prompt] {}", prompt)); + } + StarterEvent::PromptRequest { + prompt, + schemas, + responder, + thinking, + } => { + reply_agent + .append_message(build_prompt_message(&prompt)) + .await; + emit_combo_event(&on_event_for_emit, ComboEvent::Prompt { + name: combo_name.clone(), + prompt: prompt.clone(), + thinking: thinking.clone(), + }); + summary_parts.push(format!("[Prompt] {}", prompt)); + + if reply_agent.offload_combo_reply() { + let result = handle_offload_combo_reply_with_retry( + &mut reply_agent, + &schemas, + &combo_name, + cancel_token.clone(), + session_socket_path.clone(), + &on_event_for_emit, + ) + .await; + if let Err(err) = result { + emit_combo_event( + &on_event_for_emit, + ComboEvent::ReplyToolError { + message: err.to_string(), + }, + ); + } + } else { + let disable_stream = + reply_agent.disable_stream_for_current_model(); + let mut streamed_thinking = false; + let reply = if cancel_token.is_cancelled() { + Err("prompt reply cancelled".to_string()) + } else if disable_stream { + reply_agent + .reply_prompt_with_thinking( + &system_prompt, + schemas.clone(), + thinking.clone(), + ) + .await + .map_err(|err| err.to_string()) + } else { + let stream_name = combo_name.clone(); + let thinking_seen = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let thinking_seen_stream = thinking_seen.clone(); + let on_event_stream = on_event_for_emit.clone(); + let reply = reply_agent + .reply_prompt_stream_with_thinking( + &system_prompt, + schemas.clone(), + thinking.clone(), + cancel_token.clone(), + move |update| { + let (index, kind, text) = match update { + ChatStreamUpdate::Plain { index, text } => { + (index, ComboStreamKind::Plain, text) + } + ChatStreamUpdate::Thinking { index, text } => { + thinking_seen_stream + .store(true, std::sync::atomic::Ordering::Relaxed); + (index, ComboStreamKind::Thinking, text) + } + }; + emit_combo_event( + &on_event_stream, + ComboEvent::PromptStream { + name: stream_name.clone(), + index, + kind, + text, + }, + ); + }, + ) + .await + .map_err(|err| err.to_string()); + streamed_thinking = + thinking_seen.load(std::sync::atomic::Ordering::Relaxed); + reply + }; + if let Ok(reply) = &reply { + let thinking_blocks = if streamed_thinking { + Vec::new() + } else { + reply.thinking.clone() + }; + emit_combo_event( + &on_event_for_emit, + ComboEvent::ReplyToolUse { + name: combo_name.clone(), + tool_use: reply.tool_use.clone(), + thinking: thinking_blocks, + offload: false, + }, + ); + } + let response = reply.map(|reply| reply.response); + if let Err(err) = &response { + emit_combo_event( + &on_event_for_emit, + ComboEvent::ReplyToolError { + message: err.clone(), + }, + ); + } + if let Err(err) = responder.send(response) { + emit_combo_event( + &on_event_for_emit, + ComboEvent::ReplyToolError { message: err }, + ); + } + } + } + StarterEvent::Finished { exit_code: code } => { + exit_code = code; + } + StarterEvent::Cancelled => { + cancelled = true; + } + StarterEvent::Failed { reason } => { + failed = true; + summary_parts.push(format!("[Failed] {}", reason)); + } + } + } + } + } + + // Wait for execution to complete + let starter = match execution.wait().await { + Ok(starter) => starter, + Err(err) => { + let reason = format!("starter join error: {err}"); + let starter = Starter { + path: combo_path.clone(), + combo: Err(StarterError::Invalid { + reason: reason.clone(), + }), + }; + emit_combo_event( + &on_event_for_emit, + ComboEvent::Executed { + name: combo_name.clone(), + starter, + exit_code, + }, + ); + return Err(format!("Join error: {}", err)); + } + }; + + // Flush remaining buffers + flush_buffer( + &combo_name, + &stdout_buffer, + StreamKind::Stdout, + &on_event, + now_ms, + ); + flush_buffer( + &combo_name, + &stderr_buffer, + StreamKind::Stderr, + &on_event, + now_ms, + ); + + if cancelled || matches!(&starter.combo, Err(StarterError::Cancelled)) { + emit_combo_event( + &on_event_for_emit, + ComboEvent::Cancelled { + name: Some(combo_name.clone()), + }, + ); + return Ok(RunComboOutput { + success: false, + summary: "Combo execution was cancelled".to_string(), + tool_calls, + error: Some("Cancelled".to_string()), + }); + } + + emit_combo_event( + &on_event_for_emit, + ComboEvent::Executed { + name: combo_name.clone(), + starter: starter.clone(), + exit_code, + }, + ); + + // Check combo result + if let Err(e) = starter.combo { + return Ok(RunComboOutput { + success: false, + summary: summary_parts.join("\n"), + tool_calls, + error: Some(format!("Combo error: {}", e)), + }); + } + + let success = !failed && exit_code.map(|c| c == 0).unwrap_or(true); + let fallback_summary = if summary_parts.is_empty() { + format!( + "Combo '{}' completed with {} tool call(s)", + combo_name, tool_calls + ) + } else { + let max_lines = 10; + let start = summary_parts.len().saturating_sub(max_lines); + summary_parts[start..].join("\n") + }; + let summary = if cancel_token.is_cancelled() { + fallback_summary + } else { + match generate_combo_summary(&mut reply_agent, &combo_name, tool_calls, exit_code, failed) + .await + { + Ok(summary) => summary, + Err(err) => { + warn!(?err, "Failed to generate combo summary"); + fallback_summary + } + } + }; + + Ok(RunComboOutput { + success, + summary, + tool_calls, + error: if failed { + Some("One or more tool calls failed".to_string()) + } else { + None + }, + }) +} + +fn emit_combo_event(on_event: &ComboEventCallback, event: ComboEvent) { + if let Ok(mut f) = on_event.lock() { + (*f)(&event); + } +} + +fn build_reply_agent( + config: Config, + system_prompt: String, + model_override: Option, + thinking_enabled: bool, +) -> Agent { + let mut agent = Agent::new(config); + agent.set_system_prompt(&system_prompt); + agent.set_model_override(model_override); + agent.set_thinking_enabled(thinking_enabled); + agent +} + +fn build_tool_use_message(tool_use: &ToolUse) -> Message { + Message::assistant(Content::Multiple(vec![Block::tool_use( + &tool_use.id, + &tool_use.name, + tool_use.input.clone(), + )])) +} + +fn build_tool_result_message(tool_use_id: &str, is_error: bool, output: &Final) -> Message { + Message::user(Content::Multiple(vec![Block::tool_result( + tool_use_id, + Some(is_error), + final_to_tool_content(output), + )])) +} + +fn build_prompt_message(prompt: &str) -> Message { + Message::user(Content::Text(prompt.to_string())) +} + +const TOOL_RESULT_MAX_BYTES: usize = 80 * 1024; +const TOOL_RESULT_TRUNCATION_SUFFIX: &str = "\n... (truncated)"; + +fn final_to_tool_content(output: &Final) -> Content { + let text = match output { + Final::Json(value) => truncate_json_tool_output(value, TOOL_RESULT_MAX_BYTES) + .unwrap_or_else(|| { + let raw = serde_json::to_string(value).unwrap_or_else(|_| value.to_string()); + truncate_with_suffix(&raw, TOOL_RESULT_MAX_BYTES, TOOL_RESULT_TRUNCATION_SUFFIX) + }), + Final::Message(message) => truncate_with_suffix( + message, + TOOL_RESULT_MAX_BYTES, + TOOL_RESULT_TRUNCATION_SUFFIX, + ), + }; + Content::Text(text) +} + +fn truncate_json_tool_output(value: &Value, max_bytes: usize) -> Option { + let obj = value.as_object()?; + let stdout_value = obj.get("stdout").and_then(|value| value.as_str()); + let stderr_value = obj.get("stderr").and_then(|value| value.as_str()); + if stdout_value.is_none() && stderr_value.is_none() { + return None; + } + + let serialized = serde_json::to_string(value).ok()?; + if serialized.len() <= max_bytes { + return Some(serialized); + } + + let stdout = stdout_value.unwrap_or(""); + let stderr = stderr_value.unwrap_or(""); + let stdout_len = stdout.len(); + let stderr_len = stderr.len(); + + let mut base = obj.clone(); + if stdout_value.is_some() { + base.insert("stdout".to_string(), Value::String(String::new())); + } + if stderr_value.is_some() { + base.insert("stderr".to_string(), Value::String(String::new())); + } + base.insert("_truncated".to_string(), Value::Bool(true)); + let base_text = serde_json::to_string(&Value::Object(base)).ok()?; + if base_text.len() >= max_bytes { + return Some(truncate_with_suffix( + &base_text, + max_bytes, + TOOL_RESULT_TRUNCATION_SUFFIX, + )); + } + + let available = max_bytes - base_text.len(); + let total_len = stdout_len + stderr_len; + let (mut stdout_budget, mut stderr_budget) = if total_len == 0 { + (0, 0) + } else if stderr_len == 0 { + (available, 0) + } else if stdout_len == 0 { + (0, available) + } else { + let stdout_budget = available * stdout_len / total_len; + let stderr_budget = available.saturating_sub(stdout_budget); + (stdout_budget, stderr_budget) + }; + + let mut last_text = base_text; + for _ in 0..5 { + let mut out = obj.clone(); + if stdout_value.is_some() { + let truncated = truncate_to_boundary(stdout, stdout_budget); + out.insert("stdout".to_string(), Value::String(truncated.to_string())); + } + if stderr_value.is_some() { + let truncated = truncate_to_boundary(stderr, stderr_budget); + out.insert("stderr".to_string(), Value::String(truncated.to_string())); + } + out.insert("_truncated".to_string(), Value::Bool(true)); + let text = serde_json::to_string(&Value::Object(out)).ok()?; + if text.len() <= max_bytes { + return Some(text); + } + + last_text = text; + if stdout_budget == 0 && stderr_budget == 0 { + break; + } + let overshoot = last_text.len().saturating_sub(max_bytes); + if stdout_budget >= stderr_budget { + stdout_budget = stdout_budget.saturating_sub(overshoot); + } else { + stderr_budget = stderr_budget.saturating_sub(overshoot); + } + } + + Some(truncate_with_suffix( + &last_text, + max_bytes, + TOOL_RESULT_TRUNCATION_SUFFIX, + )) +} + +fn truncate_with_suffix(text: &str, max_bytes: usize, suffix: &str) -> String { + if text.len() <= max_bytes { + return text.to_string(); + } + if max_bytes == 0 { + return String::new(); + } + + let suffix = if max_bytes <= suffix.len() { + truncate_to_boundary(suffix, max_bytes) + } else { + suffix + }; + if max_bytes <= suffix.len() { + return suffix.to_string(); + } + + let keep_len = max_bytes - suffix.len(); + let prefix = truncate_to_boundary(text, keep_len); + let mut out = String::with_capacity(max_bytes); + out.push_str(prefix); + out.push_str(suffix); + out +} + +fn truncate_to_boundary(text: &str, max_bytes: usize) -> &str { + if text.len() <= max_bytes { + return text; + } + let mut end = max_bytes; + while end > 0 && !text.is_char_boundary(end) { + end -= 1; + } + &text[..end] +} + +fn extract_text_response(message: &Message) -> String { + match &message.content { + Content::Text(text) => text.clone(), + Content::Multiple(blocks) => blocks + .iter() + .filter_map(|block| { + if let Block::Text { text } = block { + Some(text.as_str()) + } else { + None + } + }) + .collect::>() + .join("\n"), + } +} + +async fn generate_combo_summary( + agent: &mut Agent, + combo_name: &str, + tool_calls: usize, + exit_code: Option, + failed: bool, +) -> Result { + let mut summary_agent = agent.clone(); + summary_agent.apply_tool_policies(Some(&[]), None); + let prompt = format!( + "Summarize the combo execution for the user.\n\ +- Provide 3-5 concise bullet points.\n\ +- Include status (success/failure), key actions, notable outputs or errors, and next steps if any.\n\ +- Base the summary on the tool uses, tool results, and prompts already in the conversation history.\n\ +- Do not call tools.\n\ +\n\ +Context:\n\ +combo_name: {combo_name}\n\ +tool_calls: {tool_calls}\n\ +exit_code: {exit_code}\n\ +failed: {failed}\n\ +", + exit_code = exit_code + .map(|code| code.to_string()) + .unwrap_or_else(|| "none".to_string()), + ); + let response = summary_agent + .chat(Message::user(Content::Text(prompt))) + .await + .map_err(|err| err.to_string())?; + let summary = extract_text_response(&response.message); + let summary = summary.trim(); + if summary.is_empty() { + return Err("summary response is empty".to_string()); + } + Ok(summary.to_string()) +} + +fn build_offload_reply_directive(schemas: &[PromptSchema]) -> String { + let field_args: Vec = schemas + .iter() + .map(|schema| format!("--{}=", schema.name)) + .collect(); + + let field_descriptions: Vec = schemas + .iter() + .map(|schema| format!("- --{}=: {}", schema.name, schema.description)) + .collect(); + + format!( + r#"You must respond by calling the bash tool to execute the `coco reply` command. +Use this exact format: +``` +coco reply {field_args} +``` + +Required fields: +{field_list} + +The value should be properly shell-escaped if it contains special characters. +Do not output any other text or explanation. Only call the bash tool with the coco reply command."#, + field_args = field_args.join(" "), + field_list = field_descriptions.join("\n"), + ) +} + +fn build_offload_reply_retry_directive(schemas: &[PromptSchema]) -> String { + let directive = build_offload_reply_directive(schemas); + format!("The previous response did not produce a valid coco reply. Retry.\n\n{directive}") +} + +enum OffloadCommandKind { + Coco, + Safe, + Unsafe, +} + +fn classify_offload_command(command: &str) -> OffloadCommandKind { + let is_coco_reply = is_coco_reply_command(command); + if is_coco_reply { + return OffloadCommandKind::Coco; + } + if is_safe_command(command) { + return OffloadCommandKind::Safe; + } + OffloadCommandKind::Unsafe +} + +fn build_offload_reply_guidance(schemas: &[PromptSchema], command: &str, executed: bool) -> String { + let field_args: Vec = schemas + .iter() + .map(|schema| format!("--{}=...", schema.name)) + .collect(); + let field_descriptions: Vec = schemas + .iter() + .map(|schema| format!("- {}: {}", schema.name, schema.description)) + .collect(); + let status = if executed { "executed" } else { "blocked" }; + format!( + "The previous tool call was {status} but did not use `coco reply` (command: {command}).\n\ +You must call the bash tool with `coco reply` and only that command.\n\ +Required fields:\n{field_list}\n\n\ +Example:\n\ +coco reply {field_args}", + field_list = field_descriptions.join("\n"), + field_args = field_args.join(" "), + ) +} + +#[derive(Debug)] +enum ComboReplyError { + Cancelled, + ChatFailed { message: String }, + MissingBashToolUse, + InvalidBashInput { message: String }, + UnexpectedCommand { command: String }, +} + +impl ComboReplyError { + fn should_retry(&self) -> bool { + matches!( + self, + ComboReplyError::MissingBashToolUse + | ComboReplyError::InvalidBashInput { .. } + | ComboReplyError::UnexpectedCommand { .. } + ) + } +} + +impl std::fmt::Display for ComboReplyError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ComboReplyError::Cancelled => write!(f, "prompt reply cancelled"), + ComboReplyError::ChatFailed { message } => write!(f, "chat failed: {message}"), + ComboReplyError::MissingBashToolUse => { + write!(f, "LLM did not return a bash tool call for coco reply") + } + ComboReplyError::InvalidBashInput { message } => { + write!(f, "failed to parse bash tool input: {message}") + } + ComboReplyError::UnexpectedCommand { command } => { + write!(f, "expected coco reply command, got: {command}") + } + } + } +} + +impl std::error::Error for ComboReplyError {} + +fn is_coco_command_name(name: &str) -> bool { + name == "coco" || name.ends_with("/coco") +} + +fn is_coco_reply_command(command: &str) -> bool { + let summary = match parse_primary_command(command) { + Ok(summary) => summary, + Err(_) => return false, + }; + if !is_coco_command_name(&summary.name) { + return false; + } + matches!(summary.args.first(), Some(arg) if arg == "reply") +} + +fn is_safe_command(command: &str) -> bool { + let trimmed = command.trim(); + !trimmed.is_empty() && bash_unsafe_ranges(command).is_empty() +} + +async fn handle_offload_combo_reply_with_retry( + agent: &mut Agent, + schemas: &[PromptSchema], + combo_name: &str, + cancel_token: CancellationToken, + session_socket_path: PathBuf, + on_event: &ComboEventCallback, +) -> Result<(), ComboReplyError> { + let max_retries = agent.combo_reply_retries(); + let mut attempt = 0usize; + loop { + if cancel_token.is_cancelled() { + return Err(ComboReplyError::Cancelled); + } + let directive = if attempt == 0 { + build_offload_reply_directive(schemas) + } else { + build_offload_reply_retry_directive(schemas) + }; + let response = handle_offload_combo_reply( + agent, + schemas, + combo_name, + cancel_token.clone(), + session_socket_path.clone(), + on_event, + &directive, + ) + .await; + match response { + Ok(()) => return Ok(()), + Err(err) => { + if attempt >= max_retries || !err.should_retry() { + return Err(err); + } + attempt += 1; + } + } + } +} + +async fn handle_offload_combo_reply( + agent: &mut Agent, + schemas: &[PromptSchema], + combo_name: &str, + cancel_token: CancellationToken, + session_socket_path: PathBuf, + on_event: &ComboEventCallback, + directive: &str, +) -> Result<(), ComboReplyError> { + if cancel_token.is_cancelled() { + return Err(ComboReplyError::Cancelled); + } + + agent + .append_message(Message::user(Content::Text(directive.to_string()))) + .await; + + let on_event_stream = on_event.clone(); + let stream_name = combo_name.to_string(); + let chat_response = agent + .chat_stream_with_history(cancel_token.clone(), move |update| { + let (index, kind, text) = match update { + ChatStreamUpdate::Plain { index, text } => (index, ComboStreamKind::Plain, text), + ChatStreamUpdate::Thinking { index, text } => { + (index, ComboStreamKind::Thinking, text) + } + }; + emit_combo_event( + &on_event_stream, + ComboEvent::PromptStream { + name: stream_name.clone(), + index, + kind, + text, + }, + ); + }) + .await + .map_err(|e| ComboReplyError::ChatFailed { + message: e.to_string(), + })?; + + let blocks = match &chat_response.message.content { + Content::Multiple(blocks) => blocks.as_slice(), + Content::Text(_) => &[], + }; + + let bash_tool_use = blocks + .iter() + .find_map(|block| { + if let Block::ToolUse(tool_use) = block + && tool_use.name == BASH_TOOL_NAME + { + return Some(tool_use.clone()); + } + None + }) + .ok_or(ComboReplyError::MissingBashToolUse)?; + + let bash_input: BashInput = + serde_json::from_value(bash_tool_use.input.clone()).map_err(|err| { + ComboReplyError::InvalidBashInput { + message: err.to_string(), + } + })?; + + let original_command = bash_input.command.clone(); + let command_kind = classify_offload_command(&bash_input.command); + + emit_combo_event( + on_event, + ComboEvent::ReplyToolUse { + name: combo_name.to_string(), + tool_use: bash_tool_use.clone(), + thinking: Vec::new(), + offload: true, + }, + ); + + if matches!(command_kind, OffloadCommandKind::Unsafe) { + let reason = match bash_unsafe_reason(&original_command) { + Ok(_) => "command not allowlisted".to_string(), + Err(reason) => reason, + }; + let output = Final::Message(format!("command rejected: {reason}; expected coco reply")); + agent + .append_message(build_tool_result_message(&bash_tool_use.id, true, &output)) + .await; + emit_combo_event( + on_event, + ComboEvent::ReplyToolResult { + name: combo_name.to_string(), + tool_use_id: bash_tool_use.id.clone(), + is_error: true, + output: output.clone(), + }, + ); + let prompt = build_offload_reply_guidance(schemas, &original_command, false); + agent + .append_message(Message::user(Content::Text(prompt))) + .await; + return Err(ComboReplyError::UnexpectedCommand { + command: original_command, + }); + } + + let bash_input_value = + serde_json::to_value(&bash_input).map_err(|err| ComboReplyError::InvalidBashInput { + message: err.to_string(), + })?; + + let extra_envs = vec![( + "COCO_SESSION_SOCK".to_string(), + session_socket_path.to_string_lossy().to_string(), + )]; + let output = run_bash_chunked( + Input::Starter(bash_input_value), + &extra_envs, + cancel_token.clone(), + |_| {}, + ) + .await; + + if cancel_token.is_cancelled() { + return Err(ComboReplyError::Cancelled); + } + + let (output, is_error) = match output { + Ok(Output::Final(output)) => (output, false), + Ok(Output::TextEdit(_)) => ( + Final::Message("unexpected tool output from bash".to_string()), + true, + ), + Err(output) => (output, true), + }; + + agent + .append_message(build_tool_result_message( + &bash_tool_use.id, + is_error, + &output, + )) + .await; + emit_combo_event( + on_event, + ComboEvent::ReplyToolResult { + name: combo_name.to_string(), + tool_use_id: bash_tool_use.id.clone(), + is_error, + output: output.clone(), + }, + ); + + if matches!(command_kind, OffloadCommandKind::Safe) { + let prompt = build_offload_reply_guidance(schemas, &original_command, true); + agent + .append_message(Message::user(Content::Text(prompt))) + .await; + return Err(ComboReplyError::UnexpectedCommand { + command: original_command, + }); + } + + Ok(()) +} + +struct ComboDiscoveryResult { + combos: Vec, + cancelled: bool, +} + +async fn discover_combo_infos( + config: &Config, + cancel_token: CancellationToken, + ignore_workspace_scripts: bool, +) -> ComboDiscoveryResult { + let combo_dirs = combo_discovery_dirs(config, ignore_workspace_scripts); + let combo_dirs = combo_dirs.iter().map(PathBuf::as_path).collect::>(); + let result = discover_starters(&combo_dirs, cancel_token).await; + let combos = result + .starters + .into_iter() + .filter_map(|starter| match starter.combo { + Ok(combo) => Some(ComboInfo { + path: starter.path, + combo, + }), + Err(err) => { + warn!(?starter.path, ?err, "Failed to load combo"); + None + } + }) + .collect::>(); + ComboDiscoveryResult { + combos, + cancelled: result.cancelled, + } +} + +fn combo_discovery_dirs(config: &Config, ignore_workspace_scripts: bool) -> Vec { + let mut combo_dirs = Vec::with_capacity(2); + if !ignore_workspace_scripts { + combo_dirs.push(workspace_dir().join(".coco/combos")); + } + combo_dirs.push(config.combo_dir()); + combo_dirs +} + +fn flush_buffer( + combo_name: &str, + buffer: &Arc>, + stream: StreamKind, + on_event: &ComboEventCallback, + now_ms: impl Fn() -> i64, +) { + if let Ok(mut buf) = buffer.lock() + && !buf.is_empty() + { + let content = std::mem::take(&mut *buf); + if let Ok(mut f) = on_event.lock() { + (*f)(&ComboEvent::Output { + name: combo_name.to_string(), + chunk: OutputChunk { + timestamp: now_ms(), + stream, + lines: vec![content], + }, + }); + } + } +} + +fn format_command_line(command: &str, args: &[String]) -> String { + let command = resolve_command_display(command); + let mut parts = Vec::with_capacity(args.len() + 1); + parts.push(shell_escape(&command)); + for arg in args { + parts.push(shell_escape(arg)); + } + parts.join(" ") +} + +fn resolve_command_display(command: &str) -> String { + let command_path = std::path::Path::new(command); + let workspace_combo_dir = crate::workspace_dir().join(".coco/combos"); + if let Ok(relative) = command_path.strip_prefix(&workspace_combo_dir) { + let display_path = std::path::Path::new(".coco/combos").join(relative); + return display_path.to_string_lossy().to_string(); + } + command.to_string() +} + +fn shell_escape(value: &str) -> String { + if value.is_empty() { + return "''".to_string(); + } + if value.bytes().all(|byte| { + matches!(byte, b'a'..=b'z' + | b'A'..=b'Z' + | b'0'..=b'9' + | b'_' + | b'-' + | b'.' + | b'/' + | b':' + | b'@' + | b'+' + | b'=' + | b',' + | b'%') + }) { + return value.to_string(); + } + let mut escaped = String::from("'"); + for ch in value.chars() { + if ch == '\'' { + escaped.push_str("'\"'\"'"); + } else { + escaped.push(ch); + } + } + escaped.push('\''); + escaped +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn serialize_run_combo_output() { + let output = RunComboOutput { + success: true, + summary: "Combo completed".to_string(), + tool_calls: 3, + error: None, + }; + + let json = serde_json::to_value(&output).unwrap(); + assert_eq!(json["success"], true); + assert_eq!(json["tool_calls"], 3); + assert!(json.get("error").is_none() || json["error"].is_null()); + } +}