From 9afe4f28e76047335cdc14ba7f9472a53404d1de Mon Sep 17 00:00:00 2001 From: Daniel Willitzer Date: Thu, 19 Feb 2026 00:42:29 -0800 Subject: [PATCH] feat(channels): add threading support to message channels Add optional thread_ts field to ChannelMessage and SendMessage for platform-specific threading (e.g. Slack threads, Discord threads). - ChannelMessage.thread_ts captures incoming thread context - SendMessage.thread_ts propagates thread context to replies - SendMessage::in_thread() builder for fluent API - Slack: send with thread_ts, capture ts from incoming messages - All reply paths in runtime preserve thread context via in_thread() - All other channels initialize thread_ts: None (forward-compatible) Co-Authored-By: Claude Opus 4.6 --- src/channels/cli.rs | 3 +++ src/channels/dingtalk.rs | 1 + src/channels/discord.rs | 1 + src/channels/email_channel.rs | 1 + src/channels/imessage.rs | 1 + src/channels/irc.rs | 1 + src/channels/lark.rs | 2 ++ src/channels/linq.rs | 1 + src/channels/matrix.rs | 1 + src/channels/mattermost.rs | 1 + src/channels/mod.rs | 30 +++++++++++++++++++++++------- src/channels/qq.rs | 2 ++ src/channels/signal.rs | 1 + src/channels/slack.rs | 7 ++++++- src/channels/telegram.rs | 1 + src/channels/traits.rs | 15 +++++++++++++++ src/channels/whatsapp.rs | 1 + src/gateway/mod.rs | 1 + 18 files changed, 63 insertions(+), 8 deletions(-) diff --git a/src/channels/cli.rs b/src/channels/cli.rs index ae49548..0adbeb6 100644 --- a/src/channels/cli.rs +++ b/src/channels/cli.rs @@ -47,6 +47,7 @@ impl Channel for CliChannel { .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(), + thread_ts: None, }; if tx.send(msg).await.is_err() { @@ -107,6 +108,7 @@ mod tests { content: "hello".into(), channel: "cli".into(), timestamp: 1_234_567_890, + thread_ts: None, }; assert_eq!(msg.id, "test-id"); assert_eq!(msg.sender, "user"); @@ -125,6 +127,7 @@ mod tests { content: "c".into(), channel: "ch".into(), timestamp: 0, + thread_ts: None, }; let cloned = msg.clone(); assert_eq!(cloned.id, msg.id); diff --git a/src/channels/dingtalk.rs b/src/channels/dingtalk.rs index ed9c9aa..54a8097 100644 --- a/src/channels/dingtalk.rs +++ b/src/channels/dingtalk.rs @@ -274,6 +274,7 @@ impl Channel for DingTalkChannel { .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(), + thread_ts: None, }; if tx.send(channel_msg).await.is_err() { diff --git a/src/channels/discord.rs b/src/channels/discord.rs index d7a4d20..de31cd0 100644 --- a/src/channels/discord.rs +++ b/src/channels/discord.rs @@ -413,6 +413,7 @@ impl Channel for DiscordChannel { .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(), + thread_ts: None, }; if tx.send(channel_msg).await.is_err() { diff --git a/src/channels/email_channel.rs b/src/channels/email_channel.rs index 7a60efd..506f39e 100644 --- a/src/channels/email_channel.rs +++ b/src/channels/email_channel.rs @@ -443,6 +443,7 @@ impl EmailChannel { content: email.content, channel: "email".to_string(), timestamp: email.timestamp, + thread_ts: None, }; if tx.send(msg).await.is_err() { diff --git a/src/channels/imessage.rs b/src/channels/imessage.rs index 9675d15..4e51786 100644 --- a/src/channels/imessage.rs +++ b/src/channels/imessage.rs @@ -231,6 +231,7 @@ end tell"# .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(), + thread_ts: None, }; if tx.send(msg).await.is_err() { diff --git a/src/channels/irc.rs b/src/channels/irc.rs index 96a2e6a..86a53a6 100644 --- a/src/channels/irc.rs +++ b/src/channels/irc.rs @@ -574,6 +574,7 @@ impl Channel for IrcChannel { .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(), + thread_ts: None, }; if tx.send(channel_msg).await.is_err() { diff --git a/src/channels/lark.rs b/src/channels/lark.rs index 032ba5f..40b2569 100644 --- a/src/channels/lark.rs +++ b/src/channels/lark.rs @@ -474,6 +474,7 @@ impl LarkChannel { .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(), + thread_ts: None, }; tracing::debug!("Lark WS: message in {}", lark_msg.chat_id); @@ -635,6 +636,7 @@ impl LarkChannel { content: text, channel: "lark".to_string(), timestamp, + thread_ts: None, }); messages diff --git a/src/channels/linq.rs b/src/channels/linq.rs index 42d69af..d3361c9 100644 --- a/src/channels/linq.rs +++ b/src/channels/linq.rs @@ -178,6 +178,7 @@ impl LinqChannel { content, channel: "linq".to_string(), timestamp, + thread_ts: None, }); messages diff --git a/src/channels/matrix.rs b/src/channels/matrix.rs index 5d0c207..5c61506 100644 --- a/src/channels/matrix.rs +++ b/src/channels/matrix.rs @@ -596,6 +596,7 @@ impl Channel for MatrixChannel { .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(), + thread_ts: None, }; let _ = tx.send(msg).await; diff --git a/src/channels/mattermost.rs b/src/channels/mattermost.rs index 95461de..55ecdbb 100644 --- a/src/channels/mattermost.rs +++ b/src/channels/mattermost.rs @@ -321,6 +321,7 @@ impl MattermostChannel { channel: "mattermost".to_string(), #[allow(clippy::cast_sign_loss)] timestamp: (create_at / 1000) as u64, + thread_ts: None, }) } } diff --git a/src/channels/mod.rs b/src/channels/mod.rs index ada5803..374da9f 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -442,7 +442,7 @@ async fn handle_runtime_command_if_needed( }; if let Err(err) = channel - .send(&SendMessage::new(response, &msg.reply_target)) + .send(&SendMessage::new(response, &msg.reply_target).in_thread(msg.thread_ts.clone())) .await { tracing::warn!( @@ -592,7 +592,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C ); if let Some(channel) = target_channel.as_ref() { let _ = channel - .send(&SendMessage::new(message, &msg.reply_target)) + .send(&SendMessage::new(message, &msg.reply_target).in_thread(msg.thread_ts.clone())) .await; } return; @@ -658,7 +658,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C let draft_message_id = if use_streaming { if let Some(channel) = target_channel.as_ref() { match channel - .send_draft(&SendMessage::new("...", &msg.reply_target)) + .send_draft(&SendMessage::new("...", &msg.reply_target).in_thread(msg.thread_ts.clone())) .await { Ok(id) => id, @@ -769,11 +769,11 @@ async fn process_channel_message(ctx: Arc, msg: traits::C { tracing::warn!("Failed to finalize draft: {e}; sending as new message"); let _ = channel - .send(&SendMessage::new(&response, &msg.reply_target)) + .send(&SendMessage::new(&response, &msg.reply_target).in_thread(msg.thread_ts.clone())) .await; } } else if let Err(e) = channel - .send(&SendMessage::new(response, &msg.reply_target)) + .send(&SendMessage::new(response, &msg.reply_target).in_thread(msg.thread_ts.clone())) .await { eprintln!(" ❌ Failed to reply on {}: {e}", channel.name()); @@ -795,7 +795,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C .send(&SendMessage::new( format!("⚠️ Error: {e}"), &msg.reply_target, - )) + ).in_thread(msg.thread_ts.clone())) .await; } } @@ -816,7 +816,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C .await; } else { let _ = channel - .send(&SendMessage::new(error_text, &msg.reply_target)) + .send(&SendMessage::new(error_text, &msg.reply_target).in_thread(msg.thread_ts.clone())) .await; } } @@ -2350,6 +2350,7 @@ mod tests { content: "What is the BTC price now?".to_string(), channel: "test-channel".to_string(), timestamp: 1, + thread_ts: None, }, ) .await; @@ -2403,6 +2404,7 @@ mod tests { content: "What is the BTC price now?".to_string(), channel: "test-channel".to_string(), timestamp: 2, + thread_ts: None, }, ) .await; @@ -2465,6 +2467,7 @@ mod tests { content: "/models openrouter".to_string(), channel: "telegram".to_string(), timestamp: 1, + thread_ts: None, }, ) .await; @@ -2548,6 +2551,7 @@ mod tests { content: "hello routed provider".to_string(), channel: "telegram".to_string(), timestamp: 2, + thread_ts: None, }, ) .await; @@ -2607,6 +2611,7 @@ mod tests { content: "Loop until done".to_string(), channel: "test-channel".to_string(), timestamp: 1, + thread_ts: None, }, ) .await; @@ -2661,6 +2666,7 @@ mod tests { content: "Loop forever".to_string(), channel: "test-channel".to_string(), timestamp: 2, + thread_ts: None, }, ) .await; @@ -2765,6 +2771,7 @@ mod tests { content: "hello".to_string(), channel: "test-channel".to_string(), timestamp: 1, + thread_ts: None, }) .await .unwrap(); @@ -2775,6 +2782,7 @@ mod tests { content: "world".to_string(), channel: "test-channel".to_string(), timestamp: 2, + thread_ts: None, }) .await .unwrap(); @@ -2837,6 +2845,7 @@ mod tests { content: "hello".to_string(), channel: "test-channel".to_string(), timestamp: 1, + thread_ts: None, }, ) .await; @@ -3097,6 +3106,7 @@ mod tests { content: "hello".into(), channel: "slack".into(), timestamp: 1, + thread_ts: None, }; assert_eq!(conversation_memory_key(&msg), "slack_U123_msg_abc123"); @@ -3111,6 +3121,7 @@ mod tests { content: "first".into(), channel: "slack".into(), timestamp: 1, + thread_ts: None, }; let msg2 = traits::ChannelMessage { id: "msg_2".into(), @@ -3119,6 +3130,7 @@ mod tests { content: "second".into(), channel: "slack".into(), timestamp: 2, + thread_ts: None, }; assert_ne!( @@ -3139,6 +3151,7 @@ mod tests { content: "I'm Paul".into(), channel: "slack".into(), timestamp: 1, + thread_ts: None, }; let msg2 = traits::ChannelMessage { id: "msg_2".into(), @@ -3147,6 +3160,7 @@ mod tests { content: "I'm 45".into(), channel: "slack".into(), timestamp: 2, + thread_ts: None, }; mem.store( @@ -3228,6 +3242,7 @@ mod tests { content: "hello".to_string(), channel: "test-channel".to_string(), timestamp: 1, + thread_ts: None, }, ) .await; @@ -3241,6 +3256,7 @@ mod tests { content: "follow up".to_string(), channel: "test-channel".to_string(), timestamp: 2, + thread_ts: None, }, ) .await; diff --git a/src/channels/qq.rs b/src/channels/qq.rs index 582c7f6..e0231f8 100644 --- a/src/channels/qq.rs +++ b/src/channels/qq.rs @@ -377,6 +377,7 @@ impl Channel for QQChannel { .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(), + thread_ts: None, }; if tx.send(channel_msg).await.is_err() { @@ -415,6 +416,7 @@ impl Channel for QQChannel { .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(), + thread_ts: None, }; if tx.send(channel_msg).await.is_err() { diff --git a/src/channels/signal.rs b/src/channels/signal.rs index 78af8ff..20cacfc 100644 --- a/src/channels/signal.rs +++ b/src/channels/signal.rs @@ -265,6 +265,7 @@ impl SignalChannel { content: text.to_string(), channel: "signal".to_string(), timestamp: timestamp / 1000, // millis → secs + thread_ts: None, }) } } diff --git a/src/channels/slack.rs b/src/channels/slack.rs index 13d1273..f03c693 100644 --- a/src/channels/slack.rs +++ b/src/channels/slack.rs @@ -54,11 +54,15 @@ impl Channel for SlackChannel { } async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { - let body = serde_json::json!({ + let mut body = serde_json::json!({ "channel": message.recipient, "text": message.content }); + if let Some(ref ts) = message.thread_ts { + body["thread_ts"] = serde_json::json!(ts); + } + let resp = self .http_client() .post("https://slack.com/api/chat.postMessage") @@ -170,6 +174,7 @@ impl Channel for SlackChannel { .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(), + thread_ts: Some(ts.to_string()), }; if tx.send(channel_msg).await.is_err() { diff --git a/src/channels/telegram.rs b/src/channels/telegram.rs index dcd5b7d..d4b6601 100644 --- a/src/channels/telegram.rs +++ b/src/channels/telegram.rs @@ -819,6 +819,7 @@ Allowlist Telegram username (without '@') or numeric user ID.", .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(), + thread_ts: None, }) } diff --git a/src/channels/traits.rs b/src/channels/traits.rs index 3a7d9df..1f072ca 100644 --- a/src/channels/traits.rs +++ b/src/channels/traits.rs @@ -9,6 +9,9 @@ pub struct ChannelMessage { pub content: String, pub channel: String, pub timestamp: u64, + /// Platform thread identifier (e.g. Slack `ts`, Discord thread ID). + /// When set, replies should be posted as threaded responses. + pub thread_ts: Option, } /// Message to send through a channel @@ -17,6 +20,8 @@ pub struct SendMessage { pub content: String, pub recipient: String, pub subject: Option, + /// Platform thread identifier for threaded replies (e.g. Slack `thread_ts`). + pub thread_ts: Option, } impl SendMessage { @@ -26,6 +31,7 @@ impl SendMessage { content: content.into(), recipient: recipient.into(), subject: None, + thread_ts: None, } } @@ -39,8 +45,15 @@ impl SendMessage { content: content.into(), recipient: recipient.into(), subject: Some(subject.into()), + thread_ts: None, } } + + /// Set the thread identifier for threaded replies. + pub fn in_thread(mut self, thread_ts: Option) -> Self { + self.thread_ts = thread_ts; + self + } } /// Core channel trait — implement for any messaging platform @@ -129,6 +142,7 @@ mod tests { content: "hello".into(), channel: "dummy".into(), timestamp: 123, + thread_ts: None, }) .await .map_err(|e| anyhow::anyhow!(e.to_string())) @@ -144,6 +158,7 @@ mod tests { content: "ping".into(), channel: "dummy".into(), timestamp: 999, + thread_ts: None, }; let cloned = message.clone(); diff --git a/src/channels/whatsapp.rs b/src/channels/whatsapp.rs index 20db157..c447d67 100644 --- a/src/channels/whatsapp.rs +++ b/src/channels/whatsapp.rs @@ -140,6 +140,7 @@ impl WhatsAppChannel { content, channel: "whatsapp".to_string(), timestamp, + thread_ts: None, }); } } diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index b28ed1a..36293c2 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -1450,6 +1450,7 @@ mod tests { content: "hello".into(), channel: "whatsapp".into(), timestamp: 1, + thread_ts: None, }; let key = whatsapp_memory_key(&msg);