feat(channel): add tool status display and configurable message timeout

Show real-time tool execution status in channels with draft support
(e.g. Telegram with stream_mode=partial). During processing, the draft
message shows "Thinking..." and progressively adds tool lines like
"🔧 shell(ls -la)" as tools execute. The final response replaces
all status lines cleanly via finalize_draft.

Also makes the channel message timeout configurable via
agent.channel_message_timeout_secs (default 300s), replacing the
previously hardcoded constant.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
harald 2026-02-21 07:39:39 +01:00
parent 61a998cae3
commit 7df2102d9d
2 changed files with 109 additions and 21 deletions

View file

@ -30,7 +30,7 @@ pub use telegram::TelegramChannel;
pub use traits::{Channel, SendMessage}; pub use traits::{Channel, SendMessage};
pub use whatsapp::WhatsAppChannel; pub use whatsapp::WhatsAppChannel;
use crate::agent::loop_::{build_tool_instructions, run_tool_call_loop}; use crate::agent::loop_::{build_tool_instructions, run_tool_call_loop, ToolStatusEvent};
use crate::config::Config; use crate::config::Config;
use crate::identity; use crate::identity;
use crate::memory::{self, Memory}; use crate::memory::{self, Memory};
@ -60,9 +60,6 @@ const BOOTSTRAP_MAX_CHARS: usize = 20_000;
const DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS: u64 = 2; const DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS: u64 = 2;
const DEFAULT_CHANNEL_MAX_BACKOFF_SECS: u64 = 60; const DEFAULT_CHANNEL_MAX_BACKOFF_SECS: u64 = 60;
/// Timeout for processing a single channel message (LLM + tools).
/// 300s for on-device LLMs (Ollama) which are slower than cloud APIs.
const CHANNEL_MESSAGE_TIMEOUT_SECS: u64 = 300;
const CHANNEL_PARALLELISM_PER_CHANNEL: usize = 4; const CHANNEL_PARALLELISM_PER_CHANNEL: usize = 4;
const CHANNEL_MIN_IN_FLIGHT_MESSAGES: usize = 8; const CHANNEL_MIN_IN_FLIGHT_MESSAGES: usize = 8;
const CHANNEL_MAX_IN_FLIGHT_MESSAGES: usize = 64; const CHANNEL_MAX_IN_FLIGHT_MESSAGES: usize = 64;
@ -120,6 +117,7 @@ struct ChannelRuntimeContext {
reliability: Arc<crate::config::ReliabilityConfig>, reliability: Arc<crate::config::ReliabilityConfig>,
provider_runtime_options: providers::ProviderRuntimeOptions, provider_runtime_options: providers::ProviderRuntimeOptions,
workspace_dir: Arc<PathBuf>, workspace_dir: Arc<PathBuf>,
channel_message_timeout_secs: u64,
} }
fn conversation_memory_key(msg: &traits::ChannelMessage) -> String { fn conversation_memory_key(msg: &traits::ChannelMessage) -> String {
@ -523,6 +521,15 @@ fn log_worker_join_result(result: Result<(), tokio::task::JoinError>) {
} }
} }
/// Compose tool status lines with the current draft content for display.
fn format_tool_display(tool_lines: &str, content: &str) -> String {
if tool_lines.is_empty() {
content.to_string()
} else {
format!("{tool_lines}{content}")
}
}
fn spawn_scoped_typing_task( fn spawn_scoped_typing_task(
channel: Arc<dyn Channel>, channel: Arc<dyn Channel>,
recipient: String, recipient: String,
@ -632,19 +639,25 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
.as_ref() .as_ref()
.map_or(false, |ch| ch.supports_draft_updates()); .map_or(false, |ch| ch.supports_draft_updates());
// Set up streaming channel if supported // Set up streaming channels if supported
let (delta_tx, delta_rx) = if use_streaming { let (delta_tx, delta_rx) = if use_streaming {
let (tx, rx) = tokio::sync::mpsc::channel::<String>(64); let (tx, rx) = tokio::sync::mpsc::channel::<String>(64);
(Some(tx), Some(rx)) (Some(tx), Some(rx))
} else { } else {
(None, None) (None, None)
}; };
let (tool_status_tx, tool_status_rx) = if use_streaming {
let (tx, rx) = tokio::sync::mpsc::channel::<ToolStatusEvent>(32);
(Some(tx), Some(rx))
} else {
(None, None)
};
// Send initial draft message if streaming // Send initial draft message if streaming
let draft_message_id = if use_streaming { let draft_message_id = if use_streaming {
if let Some(channel) = target_channel.as_ref() { if let Some(channel) = target_channel.as_ref() {
match channel match channel
.send_draft(&SendMessage::new("...", &msg.reply_target)) .send_draft(&SendMessage::new("Thinking...", &msg.reply_target))
.await .await
{ {
Ok(id) => id, Ok(id) => id,
@ -660,24 +673,88 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
None None
}; };
// Spawn a task to forward streaming deltas to draft updates // Spawn a task to merge tool status events and streaming deltas into draft updates
let draft_updater = if let (Some(mut rx), Some(draft_id_ref), Some(channel_ref)) = ( let draft_updater = if let (Some(draft_id_ref), Some(channel_ref)) =
delta_rx, (draft_message_id.as_deref(), target_channel.as_ref())
draft_message_id.as_deref(), {
target_channel.as_ref(),
) {
let channel = Arc::clone(channel_ref); let channel = Arc::clone(channel_ref);
let reply_target = msg.reply_target.clone(); let reply_target = msg.reply_target.clone();
let draft_id = draft_id_ref.to_string(); let draft_id = draft_id_ref.to_string();
let mut delta_rx = delta_rx;
let mut tool_status_rx = tool_status_rx;
Some(tokio::spawn(async move { Some(tokio::spawn(async move {
let mut tool_lines = String::new();
let mut accumulated = String::new(); let mut accumulated = String::new();
while let Some(delta) = rx.recv().await {
accumulated.push_str(&delta); loop {
if let Err(e) = channel tokio::select! {
.update_draft(&reply_target, &draft_id, &accumulated) evt = async {
.await match tool_status_rx.as_mut() {
{ Some(rx) => rx.recv().await,
tracing::debug!("Draft update failed: {e}"); None => std::future::pending().await,
}
} => {
match evt {
Some(ToolStatusEvent::Thinking) => {
let display = format_tool_display(&tool_lines, "Thinking...");
if let Err(e) = channel
.update_draft(&reply_target, &draft_id, &display)
.await
{
tracing::debug!("Draft update failed: {e}");
}
}
Some(ToolStatusEvent::ToolStart { name, detail }) => {
let label = match detail {
Some(d) => format!("\u{1f527} {name}({d})\n"),
None => format!("\u{1f527} {name}\n"),
};
tool_lines.push_str(&label);
let display =
format_tool_display(&tool_lines, "Thinking...");
if let Err(e) = channel
.update_draft(&reply_target, &draft_id, &display)
.await
{
tracing::debug!("Draft update failed: {e}");
}
}
None => {
// Tool status channel closed; keep consuming deltas
tool_status_rx = None;
if delta_rx.is_none() {
break;
}
}
}
}
delta = async {
match delta_rx.as_mut() {
Some(rx) => rx.recv().await,
None => std::future::pending().await,
}
} => {
match delta {
Some(text) => {
accumulated.push_str(&text);
let display =
format_tool_display(&tool_lines, &accumulated);
if let Err(e) = channel
.update_draft(&reply_target, &draft_id, &display)
.await
{
tracing::debug!("Draft update failed: {e}");
}
}
None => {
// Delta channel closed; keep consuming tool events
delta_rx = None;
if tool_status_rx.is_none() {
break;
}
}
}
}
} }
} }
})) }))
@ -696,7 +773,7 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
}; };
let llm_result = tokio::time::timeout( let llm_result = tokio::time::timeout(
Duration::from_secs(CHANNEL_MESSAGE_TIMEOUT_SECS), Duration::from_secs(ctx.channel_message_timeout_secs),
run_tool_call_loop( run_tool_call_loop(
active_provider.as_ref(), active_provider.as_ref(),
&mut history, &mut history,
@ -710,6 +787,7 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
msg.channel.as_str(), msg.channel.as_str(),
ctx.max_tool_iterations, ctx.max_tool_iterations,
delta_tx, delta_tx,
tool_status_tx,
), ),
) )
.await; .await;
@ -789,7 +867,7 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
Err(_) => { Err(_) => {
let timeout_msg = format!( let timeout_msg = format!(
"LLM response timed out after {}s", "LLM response timed out after {}s",
CHANNEL_MESSAGE_TIMEOUT_SECS ctx.channel_message_timeout_secs
); );
eprintln!( eprintln!(
" ❌ {} (elapsed: {}ms)", " ❌ {} (elapsed: {}ms)",
@ -1835,6 +1913,7 @@ pub async fn start_channels(config: Config) -> Result<()> {
reliability: Arc::new(config.reliability.clone()), reliability: Arc::new(config.reliability.clone()),
provider_runtime_options, provider_runtime_options,
workspace_dir: Arc::new(config.workspace_dir.clone()), workspace_dir: Arc::new(config.workspace_dir.clone()),
channel_message_timeout_secs: config.agent.channel_message_timeout_secs,
}); });
run_message_dispatch_loop(rx, runtime_ctx, max_in_flight_messages).await; run_message_dispatch_loop(rx, runtime_ctx, max_in_flight_messages).await;

View file

@ -253,6 +253,10 @@ pub struct AgentConfig {
pub parallel_tools: bool, pub parallel_tools: bool,
#[serde(default = "default_agent_tool_dispatcher")] #[serde(default = "default_agent_tool_dispatcher")]
pub tool_dispatcher: String, pub tool_dispatcher: String,
/// Timeout in seconds for processing a single channel message (LLM + tools).
/// Default 300s accommodates on-device LLMs (Ollama) which are slower than cloud APIs.
#[serde(default = "default_channel_message_timeout_secs")]
pub channel_message_timeout_secs: u64,
} }
fn default_agent_max_tool_iterations() -> usize { fn default_agent_max_tool_iterations() -> usize {
@ -267,6 +271,10 @@ fn default_agent_tool_dispatcher() -> String {
"auto".into() "auto".into()
} }
fn default_channel_message_timeout_secs() -> u64 {
300
}
impl Default for AgentConfig { impl Default for AgentConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
@ -275,6 +283,7 @@ impl Default for AgentConfig {
max_history_messages: default_agent_max_history_messages(), max_history_messages: default_agent_max_history_messages(),
parallel_tools: false, parallel_tools: false,
tool_dispatcher: default_agent_tool_dispatcher(), tool_dispatcher: default_agent_tool_dispatcher(),
channel_message_timeout_secs: default_channel_message_timeout_secs(),
} }
} }
} }