fix(channels): add reply_to field to ChannelMessage for correct reply routing

ChannelMessage.sender was used both for display (username) and as the
reply target in Channel::send(). For Telegram, sender is the username
(e.g. "unknown") while send() requires the numeric chat_id, causing
"Bad Request: chat not found" errors.

Add a dedicated reply_to field to ChannelMessage that stores the
channel-specific reply address (Telegram chat_id, Discord channel_id,
Slack channel, etc.). Update all channel implementations and dispatch
code to use reply_to for send/start_typing/stop_typing calls.

This also fixes the same latent bug in Discord and Slack channels where
sender (user ID) was incorrectly passed as the reply target.
This commit is contained in:
chenmi 2026-02-17 09:13:30 +08:00 committed by Chummy
parent e21285f453
commit 18952f9a2b
15 changed files with 42 additions and 10 deletions

View file

@ -12,6 +12,8 @@ use tokio::sync::mpsc;
pub struct ChannelMessage { pub struct ChannelMessage {
pub id: String, pub id: String,
pub sender: String, pub sender: String,
/// Channel-specific reply address (e.g. Telegram chat_id, Discord channel_id).
pub reply_to: String,
pub content: String, pub content: String,
pub channel: String, pub channel: String,
pub timestamp: u64, pub timestamp: u64,
@ -90,9 +92,12 @@ impl Channel for TelegramChannel {
continue; continue;
} }
let chat_id = msg["chat"]["id"].to_string();
let channel_msg = ChannelMessage { let channel_msg = ChannelMessage {
id: msg["message_id"].to_string(), id: msg["message_id"].to_string(),
sender, sender,
reply_to: chat_id,
content: msg["text"].as_str().unwrap_or("").to_string(), content: msg["text"].as_str().unwrap_or("").to_string(),
channel: "telegram".into(), channel: "telegram".into(),
timestamp: msg["date"].as_u64().unwrap_or(0), timestamp: msg["date"].as_u64().unwrap_or(0),

View file

@ -40,6 +40,7 @@ impl Channel for CliChannel {
let msg = ChannelMessage { let msg = ChannelMessage {
id: Uuid::new_v4().to_string(), id: Uuid::new_v4().to_string(),
sender: "user".to_string(), sender: "user".to_string(),
reply_to: "user".to_string(),
content: line, content: line,
channel: "cli".to_string(), channel: "cli".to_string(),
timestamp: std::time::SystemTime::now() timestamp: std::time::SystemTime::now()
@ -90,6 +91,7 @@ mod tests {
let msg = ChannelMessage { let msg = ChannelMessage {
id: "test-id".into(), id: "test-id".into(),
sender: "user".into(), sender: "user".into(),
reply_to: "user".into(),
content: "hello".into(), content: "hello".into(),
channel: "cli".into(), channel: "cli".into(),
timestamp: 1_234_567_890, timestamp: 1_234_567_890,
@ -106,6 +108,7 @@ mod tests {
let msg = ChannelMessage { let msg = ChannelMessage {
id: "id".into(), id: "id".into(),
sender: "s".into(), sender: "s".into(),
reply_to: "s".into(),
content: "c".into(), content: "c".into(),
channel: "ch".into(), channel: "ch".into(),
timestamp: 0, timestamp: 0,

View file

@ -229,6 +229,7 @@ impl Channel for DingTalkChannel {
let channel_msg = ChannelMessage { let channel_msg = ChannelMessage {
id: Uuid::new_v4().to_string(), id: Uuid::new_v4().to_string(),
sender: sender_id.to_string(), sender: sender_id.to_string(),
reply_to: sender_id.to_string(),
content: content.to_string(), content: content.to_string(),
channel: "dingtalk".to_string(), channel: "dingtalk".to_string(),
timestamp: std::time::SystemTime::now() timestamp: std::time::SystemTime::now()

View file

@ -353,6 +353,7 @@ impl Channel for DiscordChannel {
format!("discord_{message_id}") format!("discord_{message_id}")
}, },
sender: author_id.to_string(), sender: author_id.to_string(),
reply_to: channel_id.clone(),
content: content.to_string(), content: content.to_string(),
channel: "discord".to_string(), channel: "discord".to_string(),
timestamp: std::time::SystemTime::now() timestamp: std::time::SystemTime::now()

View file

@ -428,7 +428,8 @@ impl Channel for EmailChannel {
} // MutexGuard dropped before await } // MutexGuard dropped before await
let msg = ChannelMessage { let msg = ChannelMessage {
id, id,
sender, sender: sender.clone(),
reply_to: sender,
content, content,
channel: "email".to_string(), channel: "email".to_string(),
timestamp: ts, timestamp: ts,

View file

@ -172,6 +172,7 @@ end tell"#
let msg = ChannelMessage { let msg = ChannelMessage {
id: rowid.to_string(), id: rowid.to_string(),
sender: sender.clone(), sender: sender.clone(),
reply_to: sender.clone(),
content: text, content: text,
channel: "imessage".to_string(), channel: "imessage".to_string(),
timestamp: std::time::SystemTime::now() timestamp: std::time::SystemTime::now()

View file

@ -565,7 +565,8 @@ impl Channel for IrcChannel {
let seq = MSG_SEQ.fetch_add(1, Ordering::Relaxed); let seq = MSG_SEQ.fetch_add(1, Ordering::Relaxed);
let channel_msg = ChannelMessage { let channel_msg = ChannelMessage {
id: format!("irc_{}_{seq}", chrono::Utc::now().timestamp_millis()), id: format!("irc_{}_{seq}", chrono::Utc::now().timestamp_millis()),
sender: reply_to, sender: reply_to.clone(),
reply_to,
content, content,
channel: "irc".to_string(), channel: "irc".to_string(),
timestamp: std::time::SystemTime::now() timestamp: std::time::SystemTime::now()

View file

@ -613,6 +613,7 @@ impl LarkChannel {
messages.push(ChannelMessage { messages.push(ChannelMessage {
id: Uuid::new_v4().to_string(), id: Uuid::new_v4().to_string(),
sender: chat_id.to_string(), sender: chat_id.to_string(),
reply_to: chat_id.to_string(),
content: text, content: text,
channel: "lark".to_string(), channel: "lark".to_string(),
timestamp, timestamp,

View file

@ -230,6 +230,7 @@ impl Channel for MatrixChannel {
let msg = ChannelMessage { let msg = ChannelMessage {
id: format!("mx_{}", chrono::Utc::now().timestamp_millis()), id: format!("mx_{}", chrono::Utc::now().timestamp_millis()),
sender: event.sender.clone(), sender: event.sender.clone(),
reply_to: event.sender.clone(),
content: body.clone(), content: body.clone(),
channel: "matrix".to_string(), channel: "matrix".to_string(),
timestamp: std::time::SystemTime::now() timestamp: std::time::SystemTime::now()

View file

@ -171,7 +171,7 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
let target_channel = ctx.channels_by_name.get(&msg.channel).cloned(); let target_channel = ctx.channels_by_name.get(&msg.channel).cloned();
if let Some(channel) = target_channel.as_ref() { if let Some(channel) = target_channel.as_ref() {
if let Err(e) = channel.start_typing(&msg.sender).await { if let Err(e) = channel.start_typing(&msg.reply_to).await {
tracing::debug!("Failed to start typing on {}: {e}", channel.name()); tracing::debug!("Failed to start typing on {}: {e}", channel.name());
} }
} }
@ -200,7 +200,7 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
.await; .await;
if let Some(channel) = target_channel.as_ref() { if let Some(channel) = target_channel.as_ref() {
if let Err(e) = channel.stop_typing(&msg.sender).await { if let Err(e) = channel.stop_typing(&msg.reply_to).await {
tracing::debug!("Failed to stop typing on {}: {e}", channel.name()); tracing::debug!("Failed to stop typing on {}: {e}", channel.name());
} }
} }
@ -213,7 +213,7 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
truncate_with_ellipsis(&response, 80) truncate_with_ellipsis(&response, 80)
); );
if let Some(channel) = target_channel.as_ref() { if let Some(channel) = target_channel.as_ref() {
if let Err(e) = channel.send(&response, &msg.sender).await { if let Err(e) = channel.send(&response, &msg.reply_to).await {
eprintln!(" ❌ Failed to reply on {}: {e}", channel.name()); eprintln!(" ❌ Failed to reply on {}: {e}", channel.name());
} }
} }
@ -224,7 +224,7 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
started_at.elapsed().as_millis() started_at.elapsed().as_millis()
); );
if let Some(channel) = target_channel.as_ref() { if let Some(channel) = target_channel.as_ref() {
let _ = channel.send(&format!("⚠️ Error: {e}"), &msg.sender).await; let _ = channel.send(&format!("⚠️ Error: {e}"), &msg.reply_to).await;
} }
} }
Err(_) => { Err(_) => {
@ -241,7 +241,7 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
let _ = channel let _ = channel
.send( .send(
"⚠️ Request timed out while waiting for the model. Please try again.", "⚠️ Request timed out while waiting for the model. Please try again.",
&msg.sender, &msg.reply_to,
) )
.await; .await;
} }
@ -1232,6 +1232,7 @@ mod tests {
traits::ChannelMessage { traits::ChannelMessage {
id: "msg-1".to_string(), id: "msg-1".to_string(),
sender: "alice".to_string(), sender: "alice".to_string(),
reply_to: "alice".to_string(),
content: "What is the BTC price now?".to_string(), content: "What is the BTC price now?".to_string(),
channel: "test-channel".to_string(), channel: "test-channel".to_string(),
timestamp: 1, timestamp: 1,
@ -1321,6 +1322,7 @@ mod tests {
tx.send(traits::ChannelMessage { tx.send(traits::ChannelMessage {
id: "1".to_string(), id: "1".to_string(),
sender: "alice".to_string(), sender: "alice".to_string(),
reply_to: "alice".to_string(),
content: "hello".to_string(), content: "hello".to_string(),
channel: "test-channel".to_string(), channel: "test-channel".to_string(),
timestamp: 1, timestamp: 1,
@ -1330,6 +1332,7 @@ mod tests {
tx.send(traits::ChannelMessage { tx.send(traits::ChannelMessage {
id: "2".to_string(), id: "2".to_string(),
sender: "bob".to_string(), sender: "bob".to_string(),
reply_to: "bob".to_string(),
content: "world".to_string(), content: "world".to_string(),
channel: "test-channel".to_string(), channel: "test-channel".to_string(),
timestamp: 2, timestamp: 2,
@ -1573,6 +1576,7 @@ mod tests {
let msg = traits::ChannelMessage { let msg = traits::ChannelMessage {
id: "msg_abc123".into(), id: "msg_abc123".into(),
sender: "U123".into(), sender: "U123".into(),
reply_to: "U123".into(),
content: "hello".into(), content: "hello".into(),
channel: "slack".into(), channel: "slack".into(),
timestamp: 1, timestamp: 1,
@ -1586,6 +1590,7 @@ mod tests {
let msg1 = traits::ChannelMessage { let msg1 = traits::ChannelMessage {
id: "msg_1".into(), id: "msg_1".into(),
sender: "U123".into(), sender: "U123".into(),
reply_to: "U123".into(),
content: "first".into(), content: "first".into(),
channel: "slack".into(), channel: "slack".into(),
timestamp: 1, timestamp: 1,
@ -1593,6 +1598,7 @@ mod tests {
let msg2 = traits::ChannelMessage { let msg2 = traits::ChannelMessage {
id: "msg_2".into(), id: "msg_2".into(),
sender: "U123".into(), sender: "U123".into(),
reply_to: "U123".into(),
content: "second".into(), content: "second".into(),
channel: "slack".into(), channel: "slack".into(),
timestamp: 2, timestamp: 2,
@ -1612,6 +1618,7 @@ mod tests {
let msg1 = traits::ChannelMessage { let msg1 = traits::ChannelMessage {
id: "msg_1".into(), id: "msg_1".into(),
sender: "U123".into(), sender: "U123".into(),
reply_to: "U123".into(),
content: "I'm Paul".into(), content: "I'm Paul".into(),
channel: "slack".into(), channel: "slack".into(),
timestamp: 1, timestamp: 1,
@ -1619,6 +1626,7 @@ mod tests {
let msg2 = traits::ChannelMessage { let msg2 = traits::ChannelMessage {
id: "msg_2".into(), id: "msg_2".into(),
sender: "U123".into(), sender: "U123".into(),
reply_to: "U123".into(),
content: "I'm 45".into(), content: "I'm 45".into(),
channel: "slack".into(), channel: "slack".into(),
timestamp: 2, timestamp: 2,

View file

@ -161,6 +161,7 @@ impl Channel for SlackChannel {
let channel_msg = ChannelMessage { let channel_msg = ChannelMessage {
id: format!("slack_{channel_id}_{ts}"), id: format!("slack_{channel_id}_{ts}"),
sender: user.to_string(), sender: user.to_string(),
reply_to: channel_id.to_string(),
content: text.to_string(), content: text.to_string(),
channel: "slack".to_string(), channel: "slack".to_string(),
timestamp: std::time::SystemTime::now() timestamp: std::time::SystemTime::now()

View file

@ -598,6 +598,7 @@ Allowlist Telegram @username or numeric user ID, then run `zeroclaw onboard --ch
let msg = ChannelMessage { let msg = ChannelMessage {
id: format!("telegram_{chat_id}_{message_id}"), id: format!("telegram_{chat_id}_{message_id}"),
sender: username.to_string(), sender: username.to_string(),
reply_to: chat_id.clone(),
content: text.to_string(), content: text.to_string(),
channel: "telegram".to_string(), channel: "telegram".to_string(),
timestamp: std::time::SystemTime::now() timestamp: std::time::SystemTime::now()

View file

@ -5,6 +5,9 @@ use async_trait::async_trait;
pub struct ChannelMessage { pub struct ChannelMessage {
pub id: String, pub id: String,
pub sender: String, pub sender: String,
/// Channel-specific reply address (e.g. Telegram chat_id, Discord channel_id, Slack channel).
/// Used by `Channel::send()` to route the reply to the correct destination.
pub reply_to: String,
pub content: String, pub content: String,
pub channel: String, pub channel: String,
pub timestamp: u64, pub timestamp: u64,
@ -62,6 +65,7 @@ mod tests {
tx.send(ChannelMessage { tx.send(ChannelMessage {
id: "1".into(), id: "1".into(),
sender: "tester".into(), sender: "tester".into(),
reply_to: "tester".into(),
content: "hello".into(), content: "hello".into(),
channel: "dummy".into(), channel: "dummy".into(),
timestamp: 123, timestamp: 123,
@ -76,6 +80,7 @@ mod tests {
let message = ChannelMessage { let message = ChannelMessage {
id: "42".into(), id: "42".into(),
sender: "alice".into(), sender: "alice".into(),
reply_to: "alice".into(),
content: "ping".into(), content: "ping".into(),
channel: "dummy".into(), channel: "dummy".into(),
timestamp: 999, timestamp: 999,

View file

@ -119,7 +119,8 @@ impl WhatsAppChannel {
messages.push(ChannelMessage { messages.push(ChannelMessage {
id: Uuid::new_v4().to_string(), id: Uuid::new_v4().to_string(),
sender: normalized_from, sender: normalized_from.clone(),
reply_to: normalized_from,
content, content,
channel: "whatsapp".to_string(), channel: "whatsapp".to_string(),
timestamp, timestamp,

View file

@ -709,7 +709,7 @@ async fn handle_whatsapp_message(
{ {
Ok(response) => { Ok(response) => {
// Send reply via WhatsApp // Send reply via WhatsApp
if let Err(e) = wa.send(&response, &msg.sender).await { if let Err(e) = wa.send(&response, &msg.reply_to).await {
tracing::error!("Failed to send WhatsApp reply: {e}"); tracing::error!("Failed to send WhatsApp reply: {e}");
} }
} }
@ -718,7 +718,7 @@ async fn handle_whatsapp_message(
let _ = wa let _ = wa
.send( .send(
"Sorry, I couldn't process your message right now.", "Sorry, I couldn't process your message right now.",
&msg.sender, &msg.reply_to,
) )
.await; .await;
} }
@ -860,6 +860,7 @@ mod tests {
let msg = ChannelMessage { let msg = ChannelMessage {
id: "wamid-123".into(), id: "wamid-123".into(),
sender: "+1234567890".into(), sender: "+1234567890".into(),
reply_to: "+1234567890".into(),
content: "hello".into(), content: "hello".into(),
channel: "whatsapp".into(), channel: "whatsapp".into(),
timestamp: 1, timestamp: 1,