From 118cd53922ac11f6a9445fd51a1cbb5ae465e7f3 Mon Sep 17 00:00:00 2001 From: Xiangjun Ma Date: Tue, 17 Feb 2026 23:46:32 -0800 Subject: [PATCH] feat(channel): stream LLM responses to Telegram via draft message edits Wire the existing provider-layer streaming infrastructure through the channel trait and agent loop so Telegram users see tokens arrive progressively via editMessageText, instead of waiting for the full response. Changes: - Add StreamMode enum (off/partial/block) and draft_update_interval_ms to TelegramConfig (backward-compatible defaults: off, 1000ms) - Add supports_draft_updates/send_draft/update_draft/finalize_draft to Channel trait with no-op defaults (zero impact on existing channels) - Implement draft methods on TelegramChannel using sendMessage + editMessageText with rate limiting and Markdown fallback - Add on_delta mpsc::Sender parameter to run_tool_call_loop (None preserves existing behavior) - Wire streaming in process_channel_message: when channel supports drafts, send initial draft, spawn updater task, finalize on completion Edge cases handled: - 4096-char limit: finalize draft and fall back to chunked send - Broken Markdown: use no parse_mode during streaming, apply on finalize - Edit failures: fall back to sending complete response as new message - Rate limiting: configurable draft_update_interval_ms (default 1s) --- src/agent/loop_.rs | 10 +- src/channels/mod.rs | 129 +++++++++++++++++++---- src/channels/telegram.rs | 192 ++++++++++++++++++++++++++++++++++- src/channels/traits.rs | 47 +++++++++ src/config/mod.rs | 5 +- src/config/schema.rs | 37 +++++++ src/cron/store.rs | 3 +- src/daemon/mod.rs | 2 + src/integrations/registry.rs | 4 +- src/memory/sqlite.rs | 15 +-- src/onboard/wizard.rs | 4 +- src/tools/git_operations.rs | 5 +- 12 files changed, 410 insertions(+), 43 deletions(-) diff --git a/src/agent/loop_.rs b/src/agent/loop_.rs index fcc16a5..cb0fe17 100644 --- a/src/agent/loop_.rs +++ b/src/agent/loop_.rs @@ -790,6 +790,7 @@ pub(crate) async fn agent_turn( None, "channel", max_tool_iterations, + None, ) .await } @@ -809,6 +810,7 @@ pub(crate) async fn run_tool_call_loop( approval: Option<&ApprovalManager>, channel_name: &str, max_tool_iterations: usize, + on_delta: Option>, ) -> Result { let max_iterations = if max_tool_iterations == 0 { DEFAULT_MAX_TOOL_ITERATIONS @@ -938,7 +940,11 @@ pub(crate) async fn run_tool_call_loop( }; if tool_calls.is_empty() { - // No tool calls — this is the final response + // No tool calls — this is the final response. + // If a streaming sender is provided, send the final text through it. + if let Some(ref tx) = on_delta { + let _ = tx.send(display_text.clone()).await; + } history.push(ChatMessage::assistant(response_text.clone())); return Ok(display_text); } @@ -1358,6 +1364,7 @@ pub async fn run( Some(&approval_manager), "cli", config.agent.max_tool_iterations, + None, ) .await?; final_output = response.clone(); @@ -1483,6 +1490,7 @@ pub async fn run( Some(&approval_manager), "cli", config.agent.max_tool_iterations, + None, ) .await { diff --git a/src/channels/mod.rs b/src/channels/mod.rs index ec11c2b..a5addcc 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -221,6 +221,64 @@ async fn process_channel_message(ctx: Arc, msg: traits::C history.push(ChatMessage::system(instructions)); } + // Determine if this channel supports streaming draft updates + let use_streaming = target_channel + .as_ref() + .map_or(false, |ch| ch.supports_draft_updates()); + + // Set up streaming channel if supported + let (delta_tx, delta_rx) = if use_streaming { + let (tx, rx) = tokio::sync::mpsc::channel::(64); + (Some(tx), Some(rx)) + } else { + (None, None) + }; + + // Send initial draft message if streaming + let draft_message_id = if use_streaming { + if let Some(channel) = target_channel.as_ref() { + match channel + .send_draft(&SendMessage::new("...", &msg.reply_target)) + .await + { + Ok(id) => id, + Err(e) => { + tracing::debug!("Failed to send draft on {}: {e}", channel.name()); + None + } + } + } else { + None + } + } else { + None + }; + + // Spawn a task to forward streaming deltas to draft updates + let draft_updater = if let (Some(mut rx), Some(draft_id_ref), Some(channel_ref)) = ( + delta_rx, + draft_message_id.as_deref(), + target_channel.as_ref(), + ) { + let channel = Arc::clone(channel_ref); + let reply_target = msg.reply_target.clone(); + let draft_id = draft_id_ref.to_string(); + Some(tokio::spawn(async move { + let mut accumulated = String::new(); + while let Some(delta) = rx.recv().await { + accumulated.push_str(&delta); + if let Err(e) = channel + .update_draft(&reply_target, &draft_id, &accumulated) + .await + { + tracing::debug!("Draft update failed: {e}"); + } + } + })) + } else { + None + }; + let llm_result = tokio::time::timeout( Duration::from_secs(CHANNEL_MESSAGE_TIMEOUT_SECS), run_tool_call_loop( @@ -231,14 +289,20 @@ async fn process_channel_message(ctx: Arc, msg: traits::C "channel-runtime", ctx.model.as_str(), ctx.temperature, - true, // silent — channels don't write to stdout + true, None, msg.channel.as_str(), ctx.max_tool_iterations, + delta_tx, ), ) .await; + // Wait for draft updater to finish + if let Some(handle) = draft_updater { + let _ = handle.await; + } + if let Some(channel) = target_channel.as_ref() { if let Err(e) = channel.stop_typing(&msg.reply_target).await { tracing::debug!("Failed to stop typing on {}: {e}", channel.name()); @@ -253,7 +317,17 @@ async fn process_channel_message(ctx: Arc, msg: traits::C truncate_with_ellipsis(&response, 80) ); if let Some(channel) = target_channel.as_ref() { - if let Err(e) = channel + if let Some(ref draft_id) = draft_message_id { + if let Err(e) = channel + .finalize_draft(&msg.reply_target, draft_id, &response) + .await + { + tracing::warn!("Failed to finalize draft: {e}; sending as new message"); + let _ = channel + .send(&SendMessage::new(&response, &msg.reply_target)) + .await; + } + } else if let Err(e) = channel .send(&SendMessage::new(response, &msg.reply_target)) .await { @@ -267,12 +341,18 @@ async fn process_channel_message(ctx: Arc, msg: traits::C started_at.elapsed().as_millis() ); if let Some(channel) = target_channel.as_ref() { - let _ = channel - .send(&SendMessage::new( - format!("⚠️ Error: {e}"), - &msg.reply_target, - )) - .await; + if let Some(ref draft_id) = draft_message_id { + let _ = channel + .finalize_draft(&msg.reply_target, draft_id, &format!("⚠️ Error: {e}")) + .await; + } else { + let _ = channel + .send(&SendMessage::new( + format!("⚠️ Error: {e}"), + &msg.reply_target, + )) + .await; + } } } Err(_) => { @@ -286,12 +366,17 @@ async fn process_channel_message(ctx: Arc, msg: traits::C started_at.elapsed().as_millis() ); if let Some(channel) = target_channel.as_ref() { - let _ = channel - .send(&SendMessage::new( - "⚠️ Request timed out while waiting for the model. Please try again.", - &msg.reply_target, - )) - .await; + let error_text = + "⚠️ Request timed out while waiting for the model. Please try again."; + if let Some(ref draft_id) = draft_message_id { + let _ = channel + .finalize_draft(&msg.reply_target, draft_id, error_text) + .await; + } else { + let _ = channel + .send(&SendMessage::new(error_text, &msg.reply_target)) + .await; + } } } } @@ -797,10 +882,10 @@ pub async fn doctor_channels(config: Config) -> Result<()> { if let Some(ref tg) = config.channels_config.telegram { channels.push(( "Telegram", - Arc::new(TelegramChannel::new( - tg.bot_token.clone(), - tg.allowed_users.clone(), - )), + Arc::new( + TelegramChannel::new(tg.bot_token.clone(), tg.allowed_users.clone()) + .with_streaming(tg.stream_mode, tg.draft_update_interval_ms), + ), )); } @@ -1117,10 +1202,10 @@ pub async fn start_channels(config: Config) -> Result<()> { let mut channels: Vec> = Vec::new(); if let Some(ref tg) = config.channels_config.telegram { - channels.push(Arc::new(TelegramChannel::new( - tg.bot_token.clone(), - tg.allowed_users.clone(), - ))); + channels.push(Arc::new( + TelegramChannel::new(tg.bot_token.clone(), tg.allowed_users.clone()) + .with_streaming(tg.stream_mode, tg.draft_update_interval_ms), + )); } if let Some(ref dc) = config.channels_config.discord { diff --git a/src/channels/telegram.rs b/src/channels/telegram.rs index 1ac0ffa..73d3429 100644 --- a/src/channels/telegram.rs +++ b/src/channels/telegram.rs @@ -1,5 +1,5 @@ use super::traits::{Channel, ChannelMessage, SendMessage}; -use crate::config::Config; +use crate::config::{Config, StreamMode}; use crate::security::pairing::PairingGuard; use anyhow::Context; use async_trait::async_trait; @@ -237,6 +237,9 @@ pub struct TelegramChannel { pairing: Option, client: reqwest::Client, typing_handle: Mutex>>, + stream_mode: StreamMode, + draft_update_interval_ms: u64, + last_draft_edit: Mutex>, } impl TelegramChannel { @@ -258,10 +261,33 @@ impl TelegramChannel { allowed_users: Arc::new(RwLock::new(normalized_allowed)), pairing, client: reqwest::Client::new(), + stream_mode: StreamMode::Off, + draft_update_interval_ms: 1000, + last_draft_edit: Mutex::new(None), typing_handle: Mutex::new(None), } } + /// Configure streaming mode for progressive draft updates. + pub fn with_streaming( + mut self, + stream_mode: StreamMode, + draft_update_interval_ms: u64, + ) -> Self { + self.stream_mode = stream_mode; + self.draft_update_interval_ms = draft_update_interval_ms; + self + } + + /// Parse reply_target into (chat_id, optional thread_id). + fn parse_reply_target(reply_target: &str) -> (String, Option) { + if let Some((chat_id, thread_id)) = reply_target.split_once(':') { + (chat_id.to_string(), Some(thread_id.to_string())) + } else { + (reply_target.to_string(), None) + } + } + fn normalize_identity(value: &str) -> String { value.trim().trim_start_matches('@').to_string() } @@ -1182,6 +1208,170 @@ impl Channel for TelegramChannel { "telegram" } + fn supports_draft_updates(&self) -> bool { + self.stream_mode != StreamMode::Off + } + + async fn send_draft(&self, message: &SendMessage) -> anyhow::Result> { + if self.stream_mode == StreamMode::Off { + return Ok(None); + } + + let (chat_id, thread_id) = Self::parse_reply_target(&message.recipient); + let initial_text = if message.content.is_empty() { + "...".to_string() + } else { + message.content.clone() + }; + + let mut body = serde_json::json!({ + "chat_id": chat_id, + "text": initial_text, + }); + if let Some(tid) = thread_id { + body["message_thread_id"] = serde_json::Value::String(tid.to_string()); + } + + let resp = self + .client + .post(self.api_url("sendMessage")) + .json(&body) + .send() + .await?; + + if !resp.status().is_success() { + let err = resp.text().await.unwrap_or_default(); + anyhow::bail!("Telegram sendMessage (draft) failed: {err}"); + } + + let resp_json: serde_json::Value = resp.json().await?; + let message_id = resp_json + .get("result") + .and_then(|r| r.get("message_id")) + .and_then(|id| id.as_i64()) + .map(|id| id.to_string()); + + *self.last_draft_edit.lock() = Some(std::time::Instant::now()); + + Ok(message_id) + } + + async fn update_draft( + &self, + recipient: &str, + message_id: &str, + text: &str, + ) -> anyhow::Result<()> { + // Rate-limit edits + { + let last = self.last_draft_edit.lock(); + if let Some(last_time) = *last { + let elapsed = u64::try_from(last_time.elapsed().as_millis()).unwrap_or(u64::MAX); + if elapsed < self.draft_update_interval_ms { + return Ok(()); + } + } + } + + let (chat_id, _) = Self::parse_reply_target(recipient); + + // Truncate to Telegram limit for mid-stream edits + let display_text = if text.len() > TELEGRAM_MAX_MESSAGE_LENGTH { + &text[..TELEGRAM_MAX_MESSAGE_LENGTH] + } else { + text + }; + + let body = serde_json::json!({ + "chat_id": chat_id, + "message_id": message_id.parse::().unwrap_or(0), + "text": display_text, + }); + + let resp = self + .client + .post(self.api_url("editMessageText")) + .json(&body) + .send() + .await?; + + if resp.status().is_success() { + *self.last_draft_edit.lock() = Some(std::time::Instant::now()); + } else { + let status = resp.status(); + let err = resp.text().await.unwrap_or_default(); + tracing::debug!("Telegram editMessageText failed ({status}): {err}"); + } + + Ok(()) + } + + async fn finalize_draft( + &self, + recipient: &str, + message_id: &str, + text: &str, + ) -> anyhow::Result<()> { + let (chat_id, thread_id) = Self::parse_reply_target(recipient); + + // If text exceeds limit, delete draft and send as chunked messages + if text.len() > TELEGRAM_MAX_MESSAGE_LENGTH { + // Delete the draft + let _ = self + .client + .post(self.api_url("deleteMessage")) + .json(&serde_json::json!({ + "chat_id": chat_id, + "message_id": message_id.parse::().unwrap_or(0), + })) + .send() + .await; + + // Fall back to chunked send + return self + .send_text_chunks(text, &chat_id, thread_id.as_deref()) + .await; + } + + // Try editing with Markdown formatting + let mut body = serde_json::json!({ + "chat_id": chat_id, + "message_id": message_id.parse::().unwrap_or(0), + "text": text, + "parse_mode": "Markdown", + }); + + let resp = self + .client + .post(self.api_url("editMessageText")) + .json(&body) + .send() + .await?; + + if resp.status().is_success() { + return Ok(()); + } + + // Markdown failed — retry without parse_mode + body.as_object_mut().unwrap().remove("parse_mode"); + + let resp = self + .client + .post(self.api_url("editMessageText")) + .json(&body) + .send() + .await?; + + if resp.status().is_success() { + return Ok(()); + } + + // Edit failed entirely — fall back to new message + tracing::warn!("Telegram finalize_draft edit failed; falling back to sendMessage"); + self.send_text_chunks(text, &chat_id, thread_id.as_deref()) + .await + } + async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { // Strip tool_call tags before processing to prevent Markdown parsing failures let content = strip_tool_call_tags(&message.content); diff --git a/src/channels/traits.rs b/src/channels/traits.rs index 1731ba8..3a7d9df 100644 --- a/src/channels/traits.rs +++ b/src/channels/traits.rs @@ -70,6 +70,36 @@ pub trait Channel: Send + Sync { async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> { Ok(()) } + + /// Whether this channel supports progressive message updates via draft edits. + fn supports_draft_updates(&self) -> bool { + false + } + + /// Send an initial draft message. Returns a platform-specific message ID for later edits. + async fn send_draft(&self, _message: &SendMessage) -> anyhow::Result> { + Ok(None) + } + + /// Update a previously sent draft message with new accumulated content. + async fn update_draft( + &self, + _recipient: &str, + _message_id: &str, + _text: &str, + ) -> anyhow::Result<()> { + Ok(()) + } + + /// Finalize a draft with the complete response (e.g. apply Markdown formatting). + async fn finalize_draft( + &self, + _recipient: &str, + _message_id: &str, + _text: &str, + ) -> anyhow::Result<()> { + Ok(()) + } } #[cfg(test)] @@ -138,6 +168,23 @@ mod tests { .is_ok()); } + #[tokio::test] + async fn default_draft_methods_return_success() { + let channel = DummyChannel; + + assert!(!channel.supports_draft_updates()); + assert!(channel + .send_draft(&SendMessage::new("draft", "bob")) + .await + .unwrap() + .is_none()); + assert!(channel.update_draft("bob", "msg_1", "text").await.is_ok()); + assert!(channel + .finalize_draft("bob", "msg_1", "final text") + .await + .is_ok()); + } + #[tokio::test] async fn listen_sends_message_to_channel() { let channel = DummyChannel; diff --git a/src/config/mod.rs b/src/config/mod.rs index bfe18e7..5c8d3d5 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -9,7 +9,8 @@ pub use schema::{ LarkConfig, MatrixConfig, MemoryConfig, ModelRouteConfig, ObservabilityConfig, PeripheralBoardConfig, PeripheralsConfig, QueryClassificationConfig, ReliabilityConfig, ResourceLimitsConfig, RuntimeConfig, SandboxBackend, SandboxConfig, SchedulerConfig, - SecretsConfig, SecurityConfig, SlackConfig, TelegramConfig, TunnelConfig, WebSearchConfig, + SecretsConfig, SecurityConfig, SlackConfig, StreamMode, TelegramConfig, TunnelConfig, + WebSearchConfig, WebhookConfig, }; @@ -31,6 +32,8 @@ mod tests { let telegram = TelegramConfig { bot_token: "token".into(), allowed_users: vec!["alice".into()], + stream_mode: StreamMode::default(), + draft_update_interval_ms: 1000, }; let discord = DiscordConfig { diff --git a/src/config/schema.rs b/src/config/schema.rs index 5e9d20a..a540af4 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -1440,10 +1440,33 @@ impl Default for ChannelsConfig { } } +/// Streaming mode for channels that support progressive message updates. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum StreamMode { + /// No streaming -- send the complete response as a single message (default). + #[default] + Off, + /// Update a draft message with every flush interval. + Partial, + /// Update a draft message in larger chunks. + Block, +} + +fn default_draft_update_interval_ms() -> u64 { + 1000 +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TelegramConfig { pub bot_token: String, pub allowed_users: Vec, + /// Streaming mode for progressive response delivery via message edits. + #[serde(default)] + pub stream_mode: StreamMode, + /// Minimum interval (ms) between draft message edits to avoid rate limits. + #[serde(default = "default_draft_update_interval_ms")] + pub draft_update_interval_ms: u64, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -2508,6 +2531,8 @@ default_temperature = 0.7 telegram: Some(TelegramConfig { bot_token: "123:ABC".into(), allowed_users: vec!["user1".into()], + stream_mode: StreamMode::default(), + draft_update_interval_ms: default_draft_update_interval_ms(), }), discord: None, slack: None, @@ -2779,11 +2804,23 @@ tool_dispatcher = "xml" let tc = TelegramConfig { bot_token: "123:XYZ".into(), allowed_users: vec!["alice".into(), "bob".into()], + stream_mode: StreamMode::Partial, + draft_update_interval_ms: 500, }; let json = serde_json::to_string(&tc).unwrap(); let parsed: TelegramConfig = serde_json::from_str(&json).unwrap(); assert_eq!(parsed.bot_token, "123:XYZ"); assert_eq!(parsed.allowed_users.len(), 2); + assert_eq!(parsed.stream_mode, StreamMode::Partial); + assert_eq!(parsed.draft_update_interval_ms, 500); + } + + #[test] + fn telegram_config_defaults_stream_off() { + let json = r#"{"bot_token":"tok","allowed_users":[]}"#; + let parsed: TelegramConfig = serde_json::from_str(json).unwrap(); + assert_eq!(parsed.stream_mode, StreamMode::Off); + assert_eq!(parsed.draft_update_interval_ms, 1000); } #[test] diff --git a/src/cron/store.rs b/src/cron/store.rs index 107e793..7d0001a 100644 --- a/src/cron/store.rs +++ b/src/cron/store.rs @@ -337,7 +337,8 @@ pub fn record_run( ) .context("Failed to prune cron run history")?; - tx.commit().context("Failed to commit cron run transaction")?; + tx.commit() + .context("Failed to commit cron run transaction")?; Ok(()) }) } diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index c60cd2d..ecb905d 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -296,6 +296,8 @@ mod tests { config.channels_config.telegram = Some(crate::config::TelegramConfig { bot_token: "token".into(), allowed_users: vec![], + stream_mode: crate::config::StreamMode::default(), + draft_update_interval_ms: 1000, }); assert!(has_supervised_channels(&config)); } diff --git a/src/integrations/registry.rs b/src/integrations/registry.rs index 442fb0f..d2d161b 100644 --- a/src/integrations/registry.rs +++ b/src/integrations/registry.rs @@ -725,7 +725,7 @@ pub fn all_integrations() -> Vec { #[cfg(test)] mod tests { use super::*; - use crate::config::schema::{IMessageConfig, MatrixConfig, TelegramConfig}; + use crate::config::schema::{IMessageConfig, MatrixConfig, StreamMode, TelegramConfig}; use crate::config::Config; #[test] @@ -788,6 +788,8 @@ mod tests { config.channels_config.telegram = Some(TelegramConfig { bot_token: "123:ABC".into(), allowed_users: vec!["user".into()], + stream_mode: StreamMode::default(), + draft_update_interval_ms: 1000, }); let entries = all_integrations(); let tg = entries.iter().find(|e| e.name == "Telegram").unwrap(); diff --git a/src/memory/sqlite.rs b/src/memory/sqlite.rs index 9f2a25c..6faaddb 100644 --- a/src/memory/sqlite.rs +++ b/src/memory/sqlite.rs @@ -337,8 +337,7 @@ impl SqliteMemory { category: Option<&str>, session_id: Option<&str>, ) -> anyhow::Result> { - let mut sql = - "SELECT id, embedding FROM memories WHERE embedding IS NOT NULL".to_string(); + let mut sql = "SELECT id, embedding FROM memories WHERE embedding IS NOT NULL".to_string(); let mut param_values: Vec> = Vec::new(); let mut idx = 1; @@ -500,13 +499,11 @@ impl Memory for SqliteMemory { let session_ref = session_id.as_deref(); // FTS5 BM25 keyword search - let keyword_results = - Self::fts5_search(&conn, &query, limit * 2).unwrap_or_default(); + let keyword_results = Self::fts5_search(&conn, &query, limit * 2).unwrap_or_default(); // Vector similarity search (if embeddings available) let vector_results = if let Some(ref qe) = query_embedding { - Self::vector_search(&conn, qe, limit * 2, None, session_ref) - .unwrap_or_default() + Self::vector_search(&conn, qe, limit * 2, None, session_ref).unwrap_or_default() } else { Vec::new() }; @@ -604,11 +601,7 @@ impl Memory for SqliteMemory { .iter() .enumerate() .map(|(i, _)| { - format!( - "(content LIKE ?{} OR key LIKE ?{})", - i * 2 + 1, - i * 2 + 2 - ) + format!("(content LIKE ?{} OR key LIKE ?{})", i * 2 + 1, i * 2 + 2) }) .collect(); let where_clause = conditions.join(" OR "); diff --git a/src/onboard/wizard.rs b/src/onboard/wizard.rs index 74b13d4..b0cbe24 100644 --- a/src/onboard/wizard.rs +++ b/src/onboard/wizard.rs @@ -1,4 +1,4 @@ -use crate::config::schema::{DingTalkConfig, IrcConfig, QQConfig, WhatsAppConfig}; +use crate::config::schema::{DingTalkConfig, IrcConfig, QQConfig, StreamMode, WhatsAppConfig}; use crate::config::{ AutonomyConfig, BrowserConfig, ChannelsConfig, ComposioConfig, Config, DiscordConfig, HeartbeatConfig, IMessageConfig, MatrixConfig, MemoryConfig, ObservabilityConfig, @@ -2665,6 +2665,8 @@ fn setup_channels() -> Result { config.telegram = Some(TelegramConfig { bot_token: token, allowed_users, + stream_mode: StreamMode::default(), + draft_update_interval_ms: 1000, }); } 1 => { diff --git a/src/tools/git_operations.rs b/src/tools/git_operations.rs index 42338c7..5b2e64e 100644 --- a/src/tools/git_operations.rs +++ b/src/tools/git_operations.rs @@ -735,10 +735,7 @@ mod tests { }); let tool = GitOperationsTool::new(security, tmp.path().to_path_buf()); - let result = tool - .execute(json!({"operation": "branch"})) - .await - .unwrap(); + let result = tool.execute(json!({"operation": "branch"})).await.unwrap(); // Branch listing must not be blocked by read-only autonomy let error_msg = result.error.as_deref().unwrap_or(""); assert!(