From 58958d99912bdae7fd9f6956c44ab2420eea381b Mon Sep 17 00:00:00 2001 From: hayoial Date: Wed, 18 Feb 2026 15:23:25 +0800 Subject: [PATCH] fix: add per-sender conversation history for channel messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Channel messages (Telegram, Discord, etc.) previously had no multi-turn context — each incoming message was processed with a fresh history containing only the system prompt and the current user message. This patch: - Maintains a per-sender conversation history map (Arc>) - Restores prior turns when processing each new message - Saves user + assistant turns after successful LLM response - Caps history at 50 messages per sender to bound memory usage Fixes the channel context continuity issue where the bot would respond with 'I have no context' to every follow-up question. --- src/channels/mod.rs | 145 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 140 insertions(+), 5 deletions(-) diff --git a/src/channels/mod.rs b/src/channels/mod.rs index a5addcc..3424dde 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -45,9 +45,14 @@ use std::collections::HashMap; use std::fmt::Write; use std::path::PathBuf; use std::process::Command; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; +/// Per-sender conversation history for channel messages. +type ConversationHistoryMap = Arc>>>; +/// Maximum history messages to keep per sender. +const MAX_CHANNEL_HISTORY: usize = 50; + /// Maximum characters per injected workspace file (matches `OpenClaw` default). const BOOTSTRAP_MAX_CHARS: usize = 20_000; @@ -73,6 +78,7 @@ struct ChannelRuntimeContext { auto_save_memory: bool, max_tool_iterations: usize, min_relevance_score: f64, + conversation_histories: ConversationHistoryMap, } fn conversation_memory_key(msg: &traits::ChannelMessage) -> String { @@ -212,10 +218,19 @@ async fn process_channel_message(ctx: Arc, msg: traits::C println!(" ⏳ Processing message..."); let started_at = Instant::now(); - let mut history = vec![ - ChatMessage::system(ctx.system_prompt.as_str()), - ChatMessage::user(&enriched_message), - ]; + // Build history from per-sender conversation cache + let history_key = format!("{}_{}", msg.channel, msg.sender); + let mut prior_turns = ctx + .conversation_histories + .lock() + .unwrap_or_else(|e| e.into_inner()) + .get(&history_key) + .cloned() + .unwrap_or_default(); + + let mut history = vec![ChatMessage::system(ctx.system_prompt.as_str())]; + history.append(&mut prior_turns); + history.push(ChatMessage::user(&enriched_message)); if let Some(instructions) = channel_delivery_instructions(&msg.channel) { history.push(ChatMessage::system(instructions)); @@ -311,6 +326,20 @@ async fn process_channel_message(ctx: Arc, msg: traits::C match llm_result { Ok(Ok(response)) => { + // Save user + assistant turn to per-sender history + { + let mut histories = ctx + .conversation_histories + .lock() + .unwrap_or_else(|e| e.into_inner()); + let turns = histories.entry(history_key).or_insert_with(Vec::new); + turns.push(ChatMessage::user(&enriched_message)); + turns.push(ChatMessage::assistant(&response)); + // Trim to MAX_CHANNEL_HISTORY (keep recent turns) + while turns.len() > MAX_CHANNEL_HISTORY { + turns.remove(0); + } + } println!( " 🤖 Reply ({}ms): {}", started_at.elapsed().as_millis(), @@ -1379,6 +1408,7 @@ pub async fn start_channels(config: Config) -> Result<()> { auto_save_memory: config.memory.auto_save, max_tool_iterations: config.agent.max_tool_iterations, min_relevance_score: config.memory.min_relevance_score, + conversation_histories: Arc::new(Mutex::new(HashMap::new())), }); run_message_dispatch_loop(rx, runtime_ctx, max_in_flight_messages).await; @@ -1545,6 +1575,39 @@ mod tests { } } + #[derive(Default)] + struct HistoryCaptureProvider { + calls: std::sync::Mutex>>, + } + + #[async_trait::async_trait] + impl Provider for HistoryCaptureProvider { + async fn chat_with_system( + &self, + _system_prompt: Option<&str>, + _message: &str, + _model: &str, + _temperature: f64, + ) -> anyhow::Result { + Ok("fallback".to_string()) + } + + async fn chat_with_history( + &self, + messages: &[ChatMessage], + _model: &str, + _temperature: f64, + ) -> anyhow::Result { + let snapshot = messages + .iter() + .map(|m| (m.role.clone(), m.content.clone())) + .collect::>(); + let mut calls = self.calls.lock().unwrap_or_else(|e| e.into_inner()); + calls.push(snapshot); + Ok(format!("response-{}", calls.len())) + } + } + struct MockPriceTool; #[async_trait::async_trait] @@ -1605,6 +1668,7 @@ mod tests { auto_save_memory: false, max_tool_iterations: 10, min_relevance_score: 0.0, + conversation_histories: Arc::new(Mutex::new(HashMap::new())), }); process_channel_message( @@ -1648,6 +1712,7 @@ mod tests { auto_save_memory: false, max_tool_iterations: 10, min_relevance_score: 0.0, + conversation_histories: Arc::new(Mutex::new(HashMap::new())), }); process_channel_message( @@ -1745,6 +1810,7 @@ mod tests { auto_save_memory: false, max_tool_iterations: 10, min_relevance_score: 0.0, + conversation_histories: Arc::new(Mutex::new(HashMap::new())), }); let (tx, rx) = tokio::sync::mpsc::channel::(4); @@ -2116,6 +2182,75 @@ mod tests { assert!(context.contains("Age is 45")); } + #[tokio::test] + async fn process_channel_message_restores_per_sender_history_on_follow_ups() { + let channel_impl = Arc::new(RecordingChannel::default()); + let channel: Arc = channel_impl.clone(); + + let mut channels_by_name = HashMap::new(); + channels_by_name.insert(channel.name().to_string(), channel); + + let provider_impl = Arc::new(HistoryCaptureProvider::default()); + + let runtime_ctx = Arc::new(ChannelRuntimeContext { + channels_by_name: Arc::new(channels_by_name), + provider: provider_impl.clone(), + memory: Arc::new(NoopMemory), + tools_registry: Arc::new(vec![]), + observer: Arc::new(NoopObserver), + system_prompt: Arc::new("test-system-prompt".to_string()), + model: Arc::new("test-model".to_string()), + temperature: 0.0, + auto_save_memory: false, + max_tool_iterations: 5, + min_relevance_score: 0.0, + conversation_histories: Arc::new(Mutex::new(HashMap::new())), + }); + + process_channel_message( + runtime_ctx.clone(), + traits::ChannelMessage { + id: "msg-a".to_string(), + sender: "alice".to_string(), + reply_target: "chat-1".to_string(), + content: "hello".to_string(), + channel: "test-channel".to_string(), + timestamp: 1, + }, + ) + .await; + + process_channel_message( + runtime_ctx, + traits::ChannelMessage { + id: "msg-b".to_string(), + sender: "alice".to_string(), + reply_target: "chat-1".to_string(), + content: "follow up".to_string(), + channel: "test-channel".to_string(), + timestamp: 2, + }, + ) + .await; + + let calls = provider_impl + .calls + .lock() + .unwrap_or_else(|e| e.into_inner()); + assert_eq!(calls.len(), 2); + assert_eq!(calls[0].len(), 2); + assert_eq!(calls[0][0].0, "system"); + assert_eq!(calls[0][1].0, "user"); + assert_eq!(calls[1].len(), 4); + assert_eq!(calls[1][0].0, "system"); + assert_eq!(calls[1][1].0, "user"); + assert_eq!(calls[1][2].0, "assistant"); + assert_eq!(calls[1][3].0, "user"); + assert!(calls[1][1].1.contains("hello")); + assert!(calls[1][2].1.contains("response-1")); + assert!(calls[1][3].1.contains("follow up")); + } + // ── AIEOS Identity Tests (Issue #168) ───────────────────────── #[test]