fix: add per-sender conversation history for channel messages

Channel messages (Telegram, Discord, etc.) previously had no multi-turn
context — each incoming message was processed with a fresh history
containing only the system prompt and the current user message.

This patch:
- Maintains a per-sender conversation history map (Arc<Mutex<HashMap>>)
- Restores prior turns when processing each new message
- Saves user + assistant turns after successful LLM response
- Caps history at 50 messages per sender to bound memory usage

Fixes the channel context continuity issue where the bot would respond
with 'I have no context' to every follow-up question.
This commit is contained in:
hayoial 2026-02-18 15:23:25 +08:00 committed by Chummy
parent f1db63219c
commit 58958d9991

View file

@ -45,9 +45,14 @@ use std::collections::HashMap;
use std::fmt::Write;
use std::path::PathBuf;
use std::process::Command;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
/// Per-sender conversation history for channel messages.
type ConversationHistoryMap = Arc<Mutex<HashMap<String, Vec<ChatMessage>>>>;
/// Maximum history messages to keep per sender.
const MAX_CHANNEL_HISTORY: usize = 50;
/// Maximum characters per injected workspace file (matches `OpenClaw` default).
const BOOTSTRAP_MAX_CHARS: usize = 20_000;
@ -73,6 +78,7 @@ struct ChannelRuntimeContext {
auto_save_memory: bool,
max_tool_iterations: usize,
min_relevance_score: f64,
conversation_histories: ConversationHistoryMap,
}
fn conversation_memory_key(msg: &traits::ChannelMessage) -> String {
@ -212,10 +218,19 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
println!(" ⏳ Processing message...");
let started_at = Instant::now();
let mut history = vec![
ChatMessage::system(ctx.system_prompt.as_str()),
ChatMessage::user(&enriched_message),
];
// Build history from per-sender conversation cache
let history_key = format!("{}_{}", msg.channel, msg.sender);
let mut prior_turns = ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner())
.get(&history_key)
.cloned()
.unwrap_or_default();
let mut history = vec![ChatMessage::system(ctx.system_prompt.as_str())];
history.append(&mut prior_turns);
history.push(ChatMessage::user(&enriched_message));
if let Some(instructions) = channel_delivery_instructions(&msg.channel) {
history.push(ChatMessage::system(instructions));
@ -311,6 +326,20 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
match llm_result {
Ok(Ok(response)) => {
// Save user + assistant turn to per-sender history
{
let mut histories = ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner());
let turns = histories.entry(history_key).or_insert_with(Vec::new);
turns.push(ChatMessage::user(&enriched_message));
turns.push(ChatMessage::assistant(&response));
// Trim to MAX_CHANNEL_HISTORY (keep recent turns)
while turns.len() > MAX_CHANNEL_HISTORY {
turns.remove(0);
}
}
println!(
" 🤖 Reply ({}ms): {}",
started_at.elapsed().as_millis(),
@ -1379,6 +1408,7 @@ pub async fn start_channels(config: Config) -> Result<()> {
auto_save_memory: config.memory.auto_save,
max_tool_iterations: config.agent.max_tool_iterations,
min_relevance_score: config.memory.min_relevance_score,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
});
run_message_dispatch_loop(rx, runtime_ctx, max_in_flight_messages).await;
@ -1545,6 +1575,39 @@ mod tests {
}
}
#[derive(Default)]
struct HistoryCaptureProvider {
calls: std::sync::Mutex<Vec<Vec<(String, String)>>>,
}
#[async_trait::async_trait]
impl Provider for HistoryCaptureProvider {
async fn chat_with_system(
&self,
_system_prompt: Option<&str>,
_message: &str,
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
Ok("fallback".to_string())
}
async fn chat_with_history(
&self,
messages: &[ChatMessage],
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
let snapshot = messages
.iter()
.map(|m| (m.role.clone(), m.content.clone()))
.collect::<Vec<_>>();
let mut calls = self.calls.lock().unwrap_or_else(|e| e.into_inner());
calls.push(snapshot);
Ok(format!("response-{}", calls.len()))
}
}
struct MockPriceTool;
#[async_trait::async_trait]
@ -1605,6 +1668,7 @@ mod tests {
auto_save_memory: false,
max_tool_iterations: 10,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
});
process_channel_message(
@ -1648,6 +1712,7 @@ mod tests {
auto_save_memory: false,
max_tool_iterations: 10,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
});
process_channel_message(
@ -1745,6 +1810,7 @@ mod tests {
auto_save_memory: false,
max_tool_iterations: 10,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
});
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(4);
@ -2116,6 +2182,75 @@ mod tests {
assert!(context.contains("Age is 45"));
}
#[tokio::test]
async fn process_channel_message_restores_per_sender_history_on_follow_ups() {
let channel_impl = Arc::new(RecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let provider_impl = Arc::new(HistoryCaptureProvider::default());
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: provider_impl.clone(),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
});
process_channel_message(
runtime_ctx.clone(),
traits::ChannelMessage {
id: "msg-a".to_string(),
sender: "alice".to_string(),
reply_target: "chat-1".to_string(),
content: "hello".to_string(),
channel: "test-channel".to_string(),
timestamp: 1,
},
)
.await;
process_channel_message(
runtime_ctx,
traits::ChannelMessage {
id: "msg-b".to_string(),
sender: "alice".to_string(),
reply_target: "chat-1".to_string(),
content: "follow up".to_string(),
channel: "test-channel".to_string(),
timestamp: 2,
},
)
.await;
let calls = provider_impl
.calls
.lock()
.unwrap_or_else(|e| e.into_inner());
assert_eq!(calls.len(), 2);
assert_eq!(calls[0].len(), 2);
assert_eq!(calls[0][0].0, "system");
assert_eq!(calls[0][1].0, "user");
assert_eq!(calls[1].len(), 4);
assert_eq!(calls[1][0].0, "system");
assert_eq!(calls[1][1].0, "user");
assert_eq!(calls[1][2].0, "assistant");
assert_eq!(calls[1][3].0, "user");
assert!(calls[1][1].1.contains("hello"));
assert!(calls[1][2].1.contains("response-1"));
assert!(calls[1][3].1.contains("follow up"));
}
// ── AIEOS Identity Tests (Issue #168) ─────────────────────────
#[test]