fix(channels): compact sender history and filter oversized memory context
This commit is contained in:
parent
2c07fb1792
commit
2016382f42
1 changed files with 168 additions and 15 deletions
|
|
@ -78,6 +78,11 @@ const CHANNEL_MAX_IN_FLIGHT_MESSAGES: usize = 64;
|
||||||
const CHANNEL_TYPING_REFRESH_INTERVAL_SECS: u64 = 4;
|
const CHANNEL_TYPING_REFRESH_INTERVAL_SECS: u64 = 4;
|
||||||
const MODEL_CACHE_FILE: &str = "models_cache.json";
|
const MODEL_CACHE_FILE: &str = "models_cache.json";
|
||||||
const MODEL_CACHE_PREVIEW_LIMIT: usize = 10;
|
const MODEL_CACHE_PREVIEW_LIMIT: usize = 10;
|
||||||
|
const MEMORY_CONTEXT_MAX_ENTRIES: usize = 4;
|
||||||
|
const MEMORY_CONTEXT_ENTRY_MAX_CHARS: usize = 800;
|
||||||
|
const MEMORY_CONTEXT_MAX_CHARS: usize = 4_000;
|
||||||
|
const CHANNEL_HISTORY_COMPACT_KEEP_MESSAGES: usize = 12;
|
||||||
|
const CHANNEL_HISTORY_COMPACT_CONTENT_CHARS: usize = 600;
|
||||||
|
|
||||||
type ProviderCacheMap = Arc<Mutex<HashMap<String, Arc<dyn Provider>>>>;
|
type ProviderCacheMap = Arc<Mutex<HashMap<String, Arc<dyn Provider>>>>;
|
||||||
type RouteSelectionMap = Arc<Mutex<HashMap<String, ChannelRouteSelection>>>;
|
type RouteSelectionMap = Arc<Mutex<HashMap<String, ChannelRouteSelection>>>;
|
||||||
|
|
@ -254,6 +259,44 @@ fn clear_sender_history(ctx: &ChannelRuntimeContext, sender_key: &str) {
|
||||||
.remove(sender_key);
|
.remove(sender_key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn compact_sender_history(ctx: &ChannelRuntimeContext, sender_key: &str) -> bool {
|
||||||
|
let mut histories = ctx
|
||||||
|
.conversation_histories
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(|e| e.into_inner());
|
||||||
|
|
||||||
|
let Some(turns) = histories.get_mut(sender_key) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
if turns.is_empty() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
let keep_from = turns
|
||||||
|
.len()
|
||||||
|
.saturating_sub(CHANNEL_HISTORY_COMPACT_KEEP_MESSAGES);
|
||||||
|
let mut compacted = turns[keep_from..].to_vec();
|
||||||
|
|
||||||
|
for turn in &mut compacted {
|
||||||
|
if turn.content.chars().count() > CHANNEL_HISTORY_COMPACT_CONTENT_CHARS {
|
||||||
|
turn.content =
|
||||||
|
truncate_with_ellipsis(&turn.content, CHANNEL_HISTORY_COMPACT_CONTENT_CHARS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*turns = compacted;
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
fn should_skip_memory_context_entry(key: &str, content: &str) -> bool {
|
||||||
|
if key.trim().to_ascii_lowercase().ends_with("_history") {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
content.chars().count() > MEMORY_CONTEXT_MAX_CHARS
|
||||||
|
}
|
||||||
|
|
||||||
fn is_context_window_overflow_error(err: &anyhow::Error) -> bool {
|
fn is_context_window_overflow_error(err: &anyhow::Error) -> bool {
|
||||||
let lower = err.to_string().to_lowercase();
|
let lower = err.to_string().to_lowercase();
|
||||||
[
|
[
|
||||||
|
|
@ -478,19 +521,43 @@ async fn build_memory_context(
|
||||||
let mut context = String::new();
|
let mut context = String::new();
|
||||||
|
|
||||||
if let Ok(entries) = mem.recall(user_msg, 5, None).await {
|
if let Ok(entries) = mem.recall(user_msg, 5, None).await {
|
||||||
let relevant: Vec<_> = entries
|
let mut included = 0usize;
|
||||||
.iter()
|
let mut used_chars = 0usize;
|
||||||
.filter(|e| match e.score {
|
|
||||||
Some(score) => score >= min_relevance_score,
|
|
||||||
None => true, // keep entries without a score (e.g. non-vector backends)
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
if !relevant.is_empty() {
|
for entry in entries.iter().filter(|e| match e.score {
|
||||||
context.push_str("[Memory context]\n");
|
Some(score) => score >= min_relevance_score,
|
||||||
for entry in &relevant {
|
None => true, // keep entries without a score (e.g. non-vector backends)
|
||||||
let _ = writeln!(context, "- {}: {}", entry.key, entry.content);
|
}) {
|
||||||
|
if included >= MEMORY_CONTEXT_MAX_ENTRIES {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if should_skip_memory_context_entry(&entry.key, &entry.content) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let content = if entry.content.chars().count() > MEMORY_CONTEXT_ENTRY_MAX_CHARS {
|
||||||
|
truncate_with_ellipsis(&entry.content, MEMORY_CONTEXT_ENTRY_MAX_CHARS)
|
||||||
|
} else {
|
||||||
|
entry.content.clone()
|
||||||
|
};
|
||||||
|
|
||||||
|
let line = format!("- {}: {}\n", entry.key, content);
|
||||||
|
let line_chars = line.chars().count();
|
||||||
|
if used_chars + line_chars > MEMORY_CONTEXT_MAX_CHARS {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if included == 0 {
|
||||||
|
context.push_str("[Memory context]\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
context.push_str(&line);
|
||||||
|
used_chars += line_chars;
|
||||||
|
included += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if included > 0 {
|
||||||
context.push('\n');
|
context.push('\n');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -809,11 +876,16 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
|
||||||
}
|
}
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
if is_context_window_overflow_error(&e) {
|
if is_context_window_overflow_error(&e) {
|
||||||
clear_sender_history(ctx.as_ref(), &history_key);
|
let compacted = compact_sender_history(ctx.as_ref(), &history_key);
|
||||||
let error_text = "⚠️ Context window exceeded for this conversation. I cleared this sender history. Please resend your last message.";
|
let error_text = if compacted {
|
||||||
|
"⚠️ Context window exceeded for this conversation. I compacted recent history and kept the latest context. Please resend your last message."
|
||||||
|
} else {
|
||||||
|
"⚠️ Context window exceeded for this conversation. Please resend your last message."
|
||||||
|
};
|
||||||
eprintln!(
|
eprintln!(
|
||||||
" ⚠️ Context window exceeded after {}ms; sender history cleared",
|
" ⚠️ Context window exceeded after {}ms; sender history compacted={}",
|
||||||
started_at.elapsed().as_millis()
|
started_at.elapsed().as_millis(),
|
||||||
|
compacted
|
||||||
);
|
);
|
||||||
if let Some(channel) = target_channel.as_ref() {
|
if let Some(channel) = target_channel.as_ref() {
|
||||||
if let Some(ref draft_id) = draft_message_id {
|
if let Some(ref draft_id) = draft_message_id {
|
||||||
|
|
@ -2059,6 +2131,87 @@ mod tests {
|
||||||
assert!(!is_context_window_overflow_error(&other_err));
|
assert!(!is_context_window_overflow_error(&other_err));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn memory_context_skip_rules_exclude_history_blobs() {
|
||||||
|
assert!(should_skip_memory_context_entry(
|
||||||
|
"telegram_123_history",
|
||||||
|
r#"[{"role":"user"}]"#
|
||||||
|
));
|
||||||
|
assert!(!should_skip_memory_context_entry("telegram_123_45", "hi"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn compact_sender_history_keeps_recent_truncated_messages() {
|
||||||
|
let mut histories = HashMap::new();
|
||||||
|
let sender = "telegram_u1".to_string();
|
||||||
|
histories.insert(
|
||||||
|
sender.clone(),
|
||||||
|
(0..20)
|
||||||
|
.map(|idx| {
|
||||||
|
let content = format!("msg-{idx}-{}", "x".repeat(700));
|
||||||
|
if idx % 2 == 0 {
|
||||||
|
ChatMessage::user(content)
|
||||||
|
} else {
|
||||||
|
ChatMessage::assistant(content)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let ctx = ChannelRuntimeContext {
|
||||||
|
channels_by_name: Arc::new(HashMap::new()),
|
||||||
|
provider: Arc::new(DummyProvider),
|
||||||
|
default_provider: Arc::new("test-provider".to_string()),
|
||||||
|
memory: Arc::new(NoopMemory),
|
||||||
|
tools_registry: Arc::new(vec![]),
|
||||||
|
observer: Arc::new(NoopObserver),
|
||||||
|
system_prompt: Arc::new("system".to_string()),
|
||||||
|
model: Arc::new("test-model".to_string()),
|
||||||
|
temperature: 0.0,
|
||||||
|
auto_save_memory: false,
|
||||||
|
max_tool_iterations: 5,
|
||||||
|
min_relevance_score: 0.0,
|
||||||
|
conversation_histories: Arc::new(Mutex::new(histories)),
|
||||||
|
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
api_key: None,
|
||||||
|
api_url: None,
|
||||||
|
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
|
||||||
|
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
|
||||||
|
workspace_dir: Arc::new(std::env::temp_dir()),
|
||||||
|
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
|
||||||
|
};
|
||||||
|
|
||||||
|
assert!(compact_sender_history(&ctx, &sender));
|
||||||
|
|
||||||
|
let histories = ctx
|
||||||
|
.conversation_histories
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(|e| e.into_inner());
|
||||||
|
let kept = histories
|
||||||
|
.get(&sender)
|
||||||
|
.expect("sender history should remain");
|
||||||
|
assert_eq!(kept.len(), CHANNEL_HISTORY_COMPACT_KEEP_MESSAGES);
|
||||||
|
assert!(kept
|
||||||
|
.iter()
|
||||||
|
.all(|turn| turn.content.chars().count() <= CHANNEL_HISTORY_COMPACT_CONTENT_CHARS));
|
||||||
|
}
|
||||||
|
|
||||||
|
struct DummyProvider;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl Provider for DummyProvider {
|
||||||
|
async fn chat_with_system(
|
||||||
|
&self,
|
||||||
|
_system_prompt: Option<&str>,
|
||||||
|
_message: &str,
|
||||||
|
_model: &str,
|
||||||
|
_temperature: f64,
|
||||||
|
) -> anyhow::Result<String> {
|
||||||
|
Ok("ok".to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
struct RecordingChannel {
|
struct RecordingChannel {
|
||||||
sent_messages: tokio::sync::Mutex<Vec<String>>,
|
sent_messages: tokio::sync::Mutex<Vec<String>>,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue