diff --git a/examples/custom_channel.rs b/examples/custom_channel.rs index dd3fdf8..790762d 100644 --- a/examples/custom_channel.rs +++ b/examples/custom_channel.rs @@ -12,6 +12,8 @@ use tokio::sync::mpsc; pub struct ChannelMessage { pub id: String, pub sender: String, + /// Channel-specific reply address (e.g. Telegram chat_id, Discord channel_id). + pub reply_to: String, pub content: String, pub channel: String, pub timestamp: u64, @@ -90,9 +92,12 @@ impl Channel for TelegramChannel { continue; } + let chat_id = msg["chat"]["id"].to_string(); + let channel_msg = ChannelMessage { id: msg["message_id"].to_string(), sender, + reply_to: chat_id, content: msg["text"].as_str().unwrap_or("").to_string(), channel: "telegram".into(), timestamp: msg["date"].as_u64().unwrap_or(0), diff --git a/src/channels/cli.rs b/src/channels/cli.rs index 8b414fd..8e070dd 100644 --- a/src/channels/cli.rs +++ b/src/channels/cli.rs @@ -40,6 +40,7 @@ impl Channel for CliChannel { let msg = ChannelMessage { id: Uuid::new_v4().to_string(), sender: "user".to_string(), + reply_to: "user".to_string(), content: line, channel: "cli".to_string(), timestamp: std::time::SystemTime::now() @@ -90,6 +91,7 @@ mod tests { let msg = ChannelMessage { id: "test-id".into(), sender: "user".into(), + reply_to: "user".into(), content: "hello".into(), channel: "cli".into(), timestamp: 1_234_567_890, @@ -106,6 +108,7 @@ mod tests { let msg = ChannelMessage { id: "id".into(), sender: "s".into(), + reply_to: "s".into(), content: "c".into(), channel: "ch".into(), timestamp: 0, diff --git a/src/channels/dingtalk.rs b/src/channels/dingtalk.rs index f55135a..1cb985d 100644 --- a/src/channels/dingtalk.rs +++ b/src/channels/dingtalk.rs @@ -229,6 +229,7 @@ impl Channel for DingTalkChannel { let channel_msg = ChannelMessage { id: Uuid::new_v4().to_string(), sender: sender_id.to_string(), + reply_to: sender_id.to_string(), content: content.to_string(), channel: "dingtalk".to_string(), timestamp: std::time::SystemTime::now() diff --git a/src/channels/discord.rs b/src/channels/discord.rs index 71b9892..1f9993d 100644 --- a/src/channels/discord.rs +++ b/src/channels/discord.rs @@ -353,6 +353,7 @@ impl Channel for DiscordChannel { format!("discord_{message_id}") }, sender: author_id.to_string(), + reply_to: channel_id.clone(), content: content.to_string(), channel: "discord".to_string(), timestamp: std::time::SystemTime::now() diff --git a/src/channels/email_channel.rs b/src/channels/email_channel.rs index 2cb5db8..bce6618 100644 --- a/src/channels/email_channel.rs +++ b/src/channels/email_channel.rs @@ -428,7 +428,8 @@ impl Channel for EmailChannel { } // MutexGuard dropped before await let msg = ChannelMessage { id, - sender, + sender: sender.clone(), + reply_to: sender, content, channel: "email".to_string(), timestamp: ts, diff --git a/src/channels/imessage.rs b/src/channels/imessage.rs index f001c56..f4fcd62 100644 --- a/src/channels/imessage.rs +++ b/src/channels/imessage.rs @@ -172,6 +172,7 @@ end tell"# let msg = ChannelMessage { id: rowid.to_string(), sender: sender.clone(), + reply_to: sender.clone(), content: text, channel: "imessage".to_string(), timestamp: std::time::SystemTime::now() diff --git a/src/channels/irc.rs b/src/channels/irc.rs index 41c7d05..1221234 100644 --- a/src/channels/irc.rs +++ b/src/channels/irc.rs @@ -565,7 +565,8 @@ impl Channel for IrcChannel { let seq = MSG_SEQ.fetch_add(1, Ordering::Relaxed); let channel_msg = ChannelMessage { id: format!("irc_{}_{seq}", chrono::Utc::now().timestamp_millis()), - sender: reply_to, + sender: reply_to.clone(), + reply_to, content, channel: "irc".to_string(), timestamp: std::time::SystemTime::now() diff --git a/src/channels/lark.rs b/src/channels/lark.rs index 5e61cbd..4e3ad9f 100644 --- a/src/channels/lark.rs +++ b/src/channels/lark.rs @@ -613,6 +613,7 @@ impl LarkChannel { messages.push(ChannelMessage { id: Uuid::new_v4().to_string(), sender: chat_id.to_string(), + reply_to: chat_id.to_string(), content: text, channel: "lark".to_string(), timestamp, diff --git a/src/channels/matrix.rs b/src/channels/matrix.rs index 9f8924c..dceb2ee 100644 --- a/src/channels/matrix.rs +++ b/src/channels/matrix.rs @@ -230,6 +230,7 @@ impl Channel for MatrixChannel { let msg = ChannelMessage { id: format!("mx_{}", chrono::Utc::now().timestamp_millis()), sender: event.sender.clone(), + reply_to: event.sender.clone(), content: body.clone(), channel: "matrix".to_string(), timestamp: std::time::SystemTime::now() diff --git a/src/channels/mod.rs b/src/channels/mod.rs index bf8c543..6c21fe8 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -171,7 +171,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C let target_channel = ctx.channels_by_name.get(&msg.channel).cloned(); if let Some(channel) = target_channel.as_ref() { - if let Err(e) = channel.start_typing(&msg.sender).await { + if let Err(e) = channel.start_typing(&msg.reply_to).await { tracing::debug!("Failed to start typing on {}: {e}", channel.name()); } } @@ -200,7 +200,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C .await; if let Some(channel) = target_channel.as_ref() { - if let Err(e) = channel.stop_typing(&msg.sender).await { + if let Err(e) = channel.stop_typing(&msg.reply_to).await { tracing::debug!("Failed to stop typing on {}: {e}", channel.name()); } } @@ -213,7 +213,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C truncate_with_ellipsis(&response, 80) ); if let Some(channel) = target_channel.as_ref() { - if let Err(e) = channel.send(&response, &msg.sender).await { + if let Err(e) = channel.send(&response, &msg.reply_to).await { eprintln!(" ❌ Failed to reply on {}: {e}", channel.name()); } } @@ -224,7 +224,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C started_at.elapsed().as_millis() ); if let Some(channel) = target_channel.as_ref() { - let _ = channel.send(&format!("⚠️ Error: {e}"), &msg.sender).await; + let _ = channel.send(&format!("⚠️ Error: {e}"), &msg.reply_to).await; } } Err(_) => { @@ -241,7 +241,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C let _ = channel .send( "⚠️ Request timed out while waiting for the model. Please try again.", - &msg.sender, + &msg.reply_to, ) .await; } @@ -1232,6 +1232,7 @@ mod tests { traits::ChannelMessage { id: "msg-1".to_string(), sender: "alice".to_string(), + reply_to: "alice".to_string(), content: "What is the BTC price now?".to_string(), channel: "test-channel".to_string(), timestamp: 1, @@ -1321,6 +1322,7 @@ mod tests { tx.send(traits::ChannelMessage { id: "1".to_string(), sender: "alice".to_string(), + reply_to: "alice".to_string(), content: "hello".to_string(), channel: "test-channel".to_string(), timestamp: 1, @@ -1330,6 +1332,7 @@ mod tests { tx.send(traits::ChannelMessage { id: "2".to_string(), sender: "bob".to_string(), + reply_to: "bob".to_string(), content: "world".to_string(), channel: "test-channel".to_string(), timestamp: 2, @@ -1573,6 +1576,7 @@ mod tests { let msg = traits::ChannelMessage { id: "msg_abc123".into(), sender: "U123".into(), + reply_to: "U123".into(), content: "hello".into(), channel: "slack".into(), timestamp: 1, @@ -1586,6 +1590,7 @@ mod tests { let msg1 = traits::ChannelMessage { id: "msg_1".into(), sender: "U123".into(), + reply_to: "U123".into(), content: "first".into(), channel: "slack".into(), timestamp: 1, @@ -1593,6 +1598,7 @@ mod tests { let msg2 = traits::ChannelMessage { id: "msg_2".into(), sender: "U123".into(), + reply_to: "U123".into(), content: "second".into(), channel: "slack".into(), timestamp: 2, @@ -1612,6 +1618,7 @@ mod tests { let msg1 = traits::ChannelMessage { id: "msg_1".into(), sender: "U123".into(), + reply_to: "U123".into(), content: "I'm Paul".into(), channel: "slack".into(), timestamp: 1, @@ -1619,6 +1626,7 @@ mod tests { let msg2 = traits::ChannelMessage { id: "msg_2".into(), sender: "U123".into(), + reply_to: "U123".into(), content: "I'm 45".into(), channel: "slack".into(), timestamp: 2, diff --git a/src/channels/slack.rs b/src/channels/slack.rs index fd6b2f0..24632f3 100644 --- a/src/channels/slack.rs +++ b/src/channels/slack.rs @@ -161,6 +161,7 @@ impl Channel for SlackChannel { let channel_msg = ChannelMessage { id: format!("slack_{channel_id}_{ts}"), sender: user.to_string(), + reply_to: channel_id.to_string(), content: text.to_string(), channel: "slack".to_string(), timestamp: std::time::SystemTime::now() diff --git a/src/channels/telegram.rs b/src/channels/telegram.rs index bfe8dd6..01f0b98 100644 --- a/src/channels/telegram.rs +++ b/src/channels/telegram.rs @@ -598,6 +598,7 @@ Allowlist Telegram @username or numeric user ID, then run `zeroclaw onboard --ch let msg = ChannelMessage { id: format!("telegram_{chat_id}_{message_id}"), sender: username.to_string(), + reply_to: chat_id.clone(), content: text.to_string(), channel: "telegram".to_string(), timestamp: std::time::SystemTime::now() diff --git a/src/channels/traits.rs b/src/channels/traits.rs index 59b361e..c41442e 100644 --- a/src/channels/traits.rs +++ b/src/channels/traits.rs @@ -5,6 +5,9 @@ use async_trait::async_trait; pub struct ChannelMessage { pub id: String, pub sender: String, + /// Channel-specific reply address (e.g. Telegram chat_id, Discord channel_id, Slack channel). + /// Used by `Channel::send()` to route the reply to the correct destination. + pub reply_to: String, pub content: String, pub channel: String, pub timestamp: u64, @@ -62,6 +65,7 @@ mod tests { tx.send(ChannelMessage { id: "1".into(), sender: "tester".into(), + reply_to: "tester".into(), content: "hello".into(), channel: "dummy".into(), timestamp: 123, @@ -76,6 +80,7 @@ mod tests { let message = ChannelMessage { id: "42".into(), sender: "alice".into(), + reply_to: "alice".into(), content: "ping".into(), channel: "dummy".into(), timestamp: 999, diff --git a/src/channels/whatsapp.rs b/src/channels/whatsapp.rs index feda26d..de8230a 100644 --- a/src/channels/whatsapp.rs +++ b/src/channels/whatsapp.rs @@ -119,7 +119,8 @@ impl WhatsAppChannel { messages.push(ChannelMessage { id: Uuid::new_v4().to_string(), - sender: normalized_from, + sender: normalized_from.clone(), + reply_to: normalized_from, content, channel: "whatsapp".to_string(), timestamp, diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index fc13b95..6301015 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -709,7 +709,7 @@ async fn handle_whatsapp_message( { Ok(response) => { // Send reply via WhatsApp - if let Err(e) = wa.send(&response, &msg.sender).await { + if let Err(e) = wa.send(&response, &msg.reply_to).await { tracing::error!("Failed to send WhatsApp reply: {e}"); } } @@ -718,7 +718,7 @@ async fn handle_whatsapp_message( let _ = wa .send( "Sorry, I couldn't process your message right now.", - &msg.sender, + &msg.reply_to, ) .await; } @@ -860,6 +860,7 @@ mod tests { let msg = ChannelMessage { id: "wamid-123".into(), sender: "+1234567890".into(), + reply_to: "+1234567890".into(), content: "hello".into(), channel: "whatsapp".into(), timestamp: 1,