From 2016382f425c812af4e1e1a07164888e62959409 Mon Sep 17 00:00:00 2001 From: Chummy Date: Thu, 19 Feb 2026 20:01:23 +0800 Subject: [PATCH] fix(channels): compact sender history and filter oversized memory context --- src/channels/mod.rs | 183 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 168 insertions(+), 15 deletions(-) diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 2f77512..52303d0 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -78,6 +78,11 @@ const CHANNEL_MAX_IN_FLIGHT_MESSAGES: usize = 64; const CHANNEL_TYPING_REFRESH_INTERVAL_SECS: u64 = 4; const MODEL_CACHE_FILE: &str = "models_cache.json"; const MODEL_CACHE_PREVIEW_LIMIT: usize = 10; +const MEMORY_CONTEXT_MAX_ENTRIES: usize = 4; +const MEMORY_CONTEXT_ENTRY_MAX_CHARS: usize = 800; +const MEMORY_CONTEXT_MAX_CHARS: usize = 4_000; +const CHANNEL_HISTORY_COMPACT_KEEP_MESSAGES: usize = 12; +const CHANNEL_HISTORY_COMPACT_CONTENT_CHARS: usize = 600; type ProviderCacheMap = Arc>>>; type RouteSelectionMap = Arc>>; @@ -254,6 +259,44 @@ fn clear_sender_history(ctx: &ChannelRuntimeContext, sender_key: &str) { .remove(sender_key); } +fn compact_sender_history(ctx: &ChannelRuntimeContext, sender_key: &str) -> bool { + let mut histories = ctx + .conversation_histories + .lock() + .unwrap_or_else(|e| e.into_inner()); + + let Some(turns) = histories.get_mut(sender_key) else { + return false; + }; + + if turns.is_empty() { + return false; + } + + let keep_from = turns + .len() + .saturating_sub(CHANNEL_HISTORY_COMPACT_KEEP_MESSAGES); + let mut compacted = turns[keep_from..].to_vec(); + + for turn in &mut compacted { + if turn.content.chars().count() > CHANNEL_HISTORY_COMPACT_CONTENT_CHARS { + turn.content = + truncate_with_ellipsis(&turn.content, CHANNEL_HISTORY_COMPACT_CONTENT_CHARS); + } + } + + *turns = compacted; + true +} + +fn should_skip_memory_context_entry(key: &str, content: &str) -> bool { + if key.trim().to_ascii_lowercase().ends_with("_history") { + return true; + } + + content.chars().count() > MEMORY_CONTEXT_MAX_CHARS +} + fn is_context_window_overflow_error(err: &anyhow::Error) -> bool { let lower = err.to_string().to_lowercase(); [ @@ -478,19 +521,43 @@ async fn build_memory_context( let mut context = String::new(); if let Ok(entries) = mem.recall(user_msg, 5, None).await { - let relevant: Vec<_> = entries - .iter() - .filter(|e| match e.score { - Some(score) => score >= min_relevance_score, - None => true, // keep entries without a score (e.g. non-vector backends) - }) - .collect(); + let mut included = 0usize; + let mut used_chars = 0usize; - if !relevant.is_empty() { - context.push_str("[Memory context]\n"); - for entry in &relevant { - let _ = writeln!(context, "- {}: {}", entry.key, entry.content); + for entry in entries.iter().filter(|e| match e.score { + Some(score) => score >= min_relevance_score, + None => true, // keep entries without a score (e.g. non-vector backends) + }) { + if included >= MEMORY_CONTEXT_MAX_ENTRIES { + break; } + + if should_skip_memory_context_entry(&entry.key, &entry.content) { + continue; + } + + let content = if entry.content.chars().count() > MEMORY_CONTEXT_ENTRY_MAX_CHARS { + truncate_with_ellipsis(&entry.content, MEMORY_CONTEXT_ENTRY_MAX_CHARS) + } else { + entry.content.clone() + }; + + let line = format!("- {}: {}\n", entry.key, content); + let line_chars = line.chars().count(); + if used_chars + line_chars > MEMORY_CONTEXT_MAX_CHARS { + break; + } + + if included == 0 { + context.push_str("[Memory context]\n"); + } + + context.push_str(&line); + used_chars += line_chars; + included += 1; + } + + if included > 0 { context.push('\n'); } } @@ -809,11 +876,16 @@ async fn process_channel_message(ctx: Arc, msg: traits::C } Ok(Err(e)) => { if is_context_window_overflow_error(&e) { - clear_sender_history(ctx.as_ref(), &history_key); - let error_text = "⚠️ Context window exceeded for this conversation. I cleared this sender history. Please resend your last message."; + let compacted = compact_sender_history(ctx.as_ref(), &history_key); + let error_text = if compacted { + "⚠️ Context window exceeded for this conversation. I compacted recent history and kept the latest context. Please resend your last message." + } else { + "⚠️ Context window exceeded for this conversation. Please resend your last message." + }; eprintln!( - " ⚠️ Context window exceeded after {}ms; sender history cleared", - started_at.elapsed().as_millis() + " ⚠️ Context window exceeded after {}ms; sender history compacted={}", + started_at.elapsed().as_millis(), + compacted ); if let Some(channel) = target_channel.as_ref() { if let Some(ref draft_id) = draft_message_id { @@ -2059,6 +2131,87 @@ mod tests { assert!(!is_context_window_overflow_error(&other_err)); } + #[test] + fn memory_context_skip_rules_exclude_history_blobs() { + assert!(should_skip_memory_context_entry( + "telegram_123_history", + r#"[{"role":"user"}]"# + )); + assert!(!should_skip_memory_context_entry("telegram_123_45", "hi")); + } + + #[test] + fn compact_sender_history_keeps_recent_truncated_messages() { + let mut histories = HashMap::new(); + let sender = "telegram_u1".to_string(); + histories.insert( + sender.clone(), + (0..20) + .map(|idx| { + let content = format!("msg-{idx}-{}", "x".repeat(700)); + if idx % 2 == 0 { + ChatMessage::user(content) + } else { + ChatMessage::assistant(content) + } + }) + .collect::>(), + ); + + let ctx = ChannelRuntimeContext { + channels_by_name: Arc::new(HashMap::new()), + provider: Arc::new(DummyProvider), + default_provider: Arc::new("test-provider".to_string()), + memory: Arc::new(NoopMemory), + tools_registry: Arc::new(vec![]), + observer: Arc::new(NoopObserver), + system_prompt: Arc::new("system".to_string()), + model: Arc::new("test-model".to_string()), + temperature: 0.0, + auto_save_memory: false, + max_tool_iterations: 5, + min_relevance_score: 0.0, + conversation_histories: Arc::new(Mutex::new(histories)), + provider_cache: Arc::new(Mutex::new(HashMap::new())), + route_overrides: Arc::new(Mutex::new(HashMap::new())), + api_key: None, + api_url: None, + reliability: Arc::new(crate::config::ReliabilityConfig::default()), + provider_runtime_options: providers::ProviderRuntimeOptions::default(), + workspace_dir: Arc::new(std::env::temp_dir()), + message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, + }; + + assert!(compact_sender_history(&ctx, &sender)); + + let histories = ctx + .conversation_histories + .lock() + .unwrap_or_else(|e| e.into_inner()); + let kept = histories + .get(&sender) + .expect("sender history should remain"); + assert_eq!(kept.len(), CHANNEL_HISTORY_COMPACT_KEEP_MESSAGES); + assert!(kept + .iter() + .all(|turn| turn.content.chars().count() <= CHANNEL_HISTORY_COMPACT_CONTENT_CHARS)); + } + + struct DummyProvider; + + #[async_trait::async_trait] + impl Provider for DummyProvider { + async fn chat_with_system( + &self, + _system_prompt: Option<&str>, + _message: &str, + _model: &str, + _temperature: f64, + ) -> anyhow::Result { + Ok("ok".to_string()) + } + } + #[derive(Default)] struct RecordingChannel { sent_messages: tokio::sync::Mutex>,