diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 0fff1ec..435deb7 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -30,7 +30,7 @@ pub use telegram::TelegramChannel; pub use traits::{Channel, SendMessage}; pub use whatsapp::WhatsAppChannel; -use crate::agent::loop_::{build_tool_instructions, run_tool_call_loop}; +use crate::agent::loop_::{build_tool_instructions, run_tool_call_loop, ToolStatusEvent}; use crate::config::Config; use crate::identity; use crate::memory::{self, Memory}; @@ -60,9 +60,6 @@ const BOOTSTRAP_MAX_CHARS: usize = 20_000; const DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS: u64 = 2; const DEFAULT_CHANNEL_MAX_BACKOFF_SECS: u64 = 60; -/// Timeout for processing a single channel message (LLM + tools). -/// 300s for on-device LLMs (Ollama) which are slower than cloud APIs. -const CHANNEL_MESSAGE_TIMEOUT_SECS: u64 = 300; const CHANNEL_PARALLELISM_PER_CHANNEL: usize = 4; const CHANNEL_MIN_IN_FLIGHT_MESSAGES: usize = 8; const CHANNEL_MAX_IN_FLIGHT_MESSAGES: usize = 64; @@ -120,6 +117,7 @@ struct ChannelRuntimeContext { reliability: Arc, provider_runtime_options: providers::ProviderRuntimeOptions, workspace_dir: Arc, + channel_message_timeout_secs: u64, } fn conversation_memory_key(msg: &traits::ChannelMessage) -> String { @@ -523,6 +521,15 @@ fn log_worker_join_result(result: Result<(), tokio::task::JoinError>) { } } +/// Compose tool status lines with the current draft content for display. +fn format_tool_display(tool_lines: &str, content: &str) -> String { + if tool_lines.is_empty() { + content.to_string() + } else { + format!("{tool_lines}{content}") + } +} + fn spawn_scoped_typing_task( channel: Arc, recipient: String, @@ -632,19 +639,25 @@ async fn process_channel_message(ctx: Arc, msg: traits::C .as_ref() .map_or(false, |ch| ch.supports_draft_updates()); - // Set up streaming channel if supported + // Set up streaming channels if supported let (delta_tx, delta_rx) = if use_streaming { let (tx, rx) = tokio::sync::mpsc::channel::(64); (Some(tx), Some(rx)) } else { (None, None) }; + let (tool_status_tx, tool_status_rx) = if use_streaming { + let (tx, rx) = tokio::sync::mpsc::channel::(32); + (Some(tx), Some(rx)) + } else { + (None, None) + }; // Send initial draft message if streaming let draft_message_id = if use_streaming { if let Some(channel) = target_channel.as_ref() { match channel - .send_draft(&SendMessage::new("...", &msg.reply_target)) + .send_draft(&SendMessage::new("Thinking...", &msg.reply_target)) .await { Ok(id) => id, @@ -660,24 +673,88 @@ async fn process_channel_message(ctx: Arc, msg: traits::C None }; - // Spawn a task to forward streaming deltas to draft updates - let draft_updater = if let (Some(mut rx), Some(draft_id_ref), Some(channel_ref)) = ( - delta_rx, - draft_message_id.as_deref(), - target_channel.as_ref(), - ) { + // Spawn a task to merge tool status events and streaming deltas into draft updates + let draft_updater = if let (Some(draft_id_ref), Some(channel_ref)) = + (draft_message_id.as_deref(), target_channel.as_ref()) + { let channel = Arc::clone(channel_ref); let reply_target = msg.reply_target.clone(); let draft_id = draft_id_ref.to_string(); + let mut delta_rx = delta_rx; + let mut tool_status_rx = tool_status_rx; Some(tokio::spawn(async move { + let mut tool_lines = String::new(); let mut accumulated = String::new(); - while let Some(delta) = rx.recv().await { - accumulated.push_str(&delta); - if let Err(e) = channel - .update_draft(&reply_target, &draft_id, &accumulated) - .await - { - tracing::debug!("Draft update failed: {e}"); + + loop { + tokio::select! { + evt = async { + match tool_status_rx.as_mut() { + Some(rx) => rx.recv().await, + None => std::future::pending().await, + } + } => { + match evt { + Some(ToolStatusEvent::Thinking) => { + let display = format_tool_display(&tool_lines, "Thinking..."); + if let Err(e) = channel + .update_draft(&reply_target, &draft_id, &display) + .await + { + tracing::debug!("Draft update failed: {e}"); + } + } + Some(ToolStatusEvent::ToolStart { name, detail }) => { + let label = match detail { + Some(d) => format!("\u{1f527} {name}({d})\n"), + None => format!("\u{1f527} {name}\n"), + }; + tool_lines.push_str(&label); + let display = + format_tool_display(&tool_lines, "Thinking..."); + if let Err(e) = channel + .update_draft(&reply_target, &draft_id, &display) + .await + { + tracing::debug!("Draft update failed: {e}"); + } + } + None => { + // Tool status channel closed; keep consuming deltas + tool_status_rx = None; + if delta_rx.is_none() { + break; + } + } + } + } + delta = async { + match delta_rx.as_mut() { + Some(rx) => rx.recv().await, + None => std::future::pending().await, + } + } => { + match delta { + Some(text) => { + accumulated.push_str(&text); + let display = + format_tool_display(&tool_lines, &accumulated); + if let Err(e) = channel + .update_draft(&reply_target, &draft_id, &display) + .await + { + tracing::debug!("Draft update failed: {e}"); + } + } + None => { + // Delta channel closed; keep consuming tool events + delta_rx = None; + if tool_status_rx.is_none() { + break; + } + } + } + } } } })) @@ -696,7 +773,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C }; let llm_result = tokio::time::timeout( - Duration::from_secs(CHANNEL_MESSAGE_TIMEOUT_SECS), + Duration::from_secs(ctx.channel_message_timeout_secs), run_tool_call_loop( active_provider.as_ref(), &mut history, @@ -710,6 +787,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C msg.channel.as_str(), ctx.max_tool_iterations, delta_tx, + tool_status_tx, ), ) .await; @@ -789,7 +867,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C Err(_) => { let timeout_msg = format!( "LLM response timed out after {}s", - CHANNEL_MESSAGE_TIMEOUT_SECS + ctx.channel_message_timeout_secs ); eprintln!( " ❌ {} (elapsed: {}ms)", @@ -1835,6 +1913,7 @@ pub async fn start_channels(config: Config) -> Result<()> { reliability: Arc::new(config.reliability.clone()), provider_runtime_options, workspace_dir: Arc::new(config.workspace_dir.clone()), + channel_message_timeout_secs: config.agent.channel_message_timeout_secs, }); run_message_dispatch_loop(rx, runtime_ctx, max_in_flight_messages).await; diff --git a/src/config/schema.rs b/src/config/schema.rs index 8d9138f..3b0141a 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -253,6 +253,10 @@ pub struct AgentConfig { pub parallel_tools: bool, #[serde(default = "default_agent_tool_dispatcher")] pub tool_dispatcher: String, + /// Timeout in seconds for processing a single channel message (LLM + tools). + /// Default 300s accommodates on-device LLMs (Ollama) which are slower than cloud APIs. + #[serde(default = "default_channel_message_timeout_secs")] + pub channel_message_timeout_secs: u64, } fn default_agent_max_tool_iterations() -> usize { @@ -267,6 +271,10 @@ fn default_agent_tool_dispatcher() -> String { "auto".into() } +fn default_channel_message_timeout_secs() -> u64 { + 300 +} + impl Default for AgentConfig { fn default() -> Self { Self { @@ -275,6 +283,7 @@ impl Default for AgentConfig { max_history_messages: default_agent_max_history_messages(), parallel_tools: false, tool_dispatcher: default_agent_tool_dispatcher(), + channel_message_timeout_secs: default_channel_message_timeout_secs(), } } }