diff --git a/src/channels/mattermost.rs b/src/channels/mattermost.rs index a10cd72..6b5fdda 100644 --- a/src/channels/mattermost.rs +++ b/src/channels/mattermost.rs @@ -9,7 +9,12 @@ pub struct MattermostChannel { bot_token: String, channel_id: Option, allowed_users: Vec, + /// When true, replies thread on the original post's root_id. + /// When false (default), replies go to the channel root. + thread_replies: bool, client: reqwest::Client, + /// Handle for the background typing-indicator loop (aborted on stop_typing). + typing_handle: std::sync::Mutex>>, } impl MattermostChannel { @@ -18,6 +23,7 @@ impl MattermostChannel { bot_token: String, channel_id: Option, allowed_users: Vec, + thread_replies: bool, ) -> Self { // Ensure base_url doesn't have a trailing slash for consistent path joining let base_url = base_url.trim_end_matches('/').to_string(); @@ -26,7 +32,9 @@ impl MattermostChannel { bot_token, channel_id, allowed_users, + thread_replies, client: reqwest::Client::new(), + typing_handle: std::sync::Mutex::new(None), } } @@ -177,6 +185,61 @@ impl Channel for MattermostChannel { .map(|r| r.status().is_success()) .unwrap_or(false) } + + async fn start_typing(&self, recipient: &str) -> Result<()> { + // Cancel any existing typing loop before starting a new one. + self.stop_typing(recipient).await?; + + let client = self.client.clone(); + let token = self.bot_token.clone(); + let base_url = self.base_url.clone(); + + // recipient is "channel_id" or "channel_id:root_id" + let channel_id = recipient.split(':').next().unwrap_or(recipient).to_string(); + let parent_id = recipient.split(':').nth(1).map(String::from); + + let handle = tokio::spawn(async move { + let url = format!("{base_url}/api/v4/users/me/typing"); + loop { + let mut body = serde_json::json!({ "channel_id": channel_id }); + if let Some(ref pid) = parent_id { + body.as_object_mut() + .unwrap() + .insert("parent_id".to_string(), serde_json::json!(pid)); + } + + if let Ok(r) = client + .post(&url) + .bearer_auth(&token) + .json(&body) + .send() + .await + { + if !r.status().is_success() { + tracing::debug!(status = %r.status(), "Mattermost typing indicator failed"); + } + } + + // Mattermost typing events expire after ~6s; re-fire every 4s. + tokio::time::sleep(std::time::Duration::from_secs(4)).await; + } + }); + + if let Ok(mut guard) = self.typing_handle.lock() { + *guard = Some(handle); + } + + Ok(()) + } + + async fn stop_typing(&self, _recipient: &str) -> Result<()> { + if let Ok(mut guard) = self.typing_handle.lock() { + if let Some(handle) = guard.take() { + handle.abort(); + } + } + Ok(()) + } } impl MattermostChannel { @@ -202,15 +265,16 @@ impl MattermostChannel { return None; } - // If it's a thread, include root_id in reply_to so we reply in the same thread - let reply_target = if root_id.is_empty() { - // Or if it's a top-level message that WE want to start a thread on, - // the next reply will use THIS post's ID as root_id. - // But for now, we follow Mattermost's 'reply' convention where - // replying to a post uses its ID as root_id. + // Reply routing depends on thread_replies config: + // - Existing thread (root_id set): always stay in the thread. + // - Top-level post + thread_replies=true: thread on the original post. + // - Top-level post + thread_replies=false: reply at channel level. + let reply_target = if !root_id.is_empty() { + format!("{}:{}", channel_id, root_id) + } else if self.thread_replies { format!("{}:{}", channel_id, id) } else { - format!("{}:{}", channel_id, root_id) + channel_id.to_string() }; Some(ChannelMessage { @@ -237,19 +301,22 @@ mod tests { "token".into(), None, vec![], + false, ); assert_eq!(ch.base_url, "https://mm.example.com"); } #[test] fn mattermost_allowlist_wildcard() { - let ch = MattermostChannel::new("url".into(), "token".into(), None, vec!["*".into()]); + let ch = + MattermostChannel::new("url".into(), "token".into(), None, vec!["*".into()], false); assert!(ch.is_user_allowed("any-id")); } #[test] fn mattermost_parse_post_basic() { - let ch = MattermostChannel::new("url".into(), "token".into(), None, vec!["*".into()]); + let ch = + MattermostChannel::new("url".into(), "token".into(), None, vec!["*".into()], false); let post = json!({ "id": "post123", "user_id": "user456", @@ -263,12 +330,30 @@ mod tests { .unwrap(); assert_eq!(msg.sender, "user456"); assert_eq!(msg.content, "hello world"); - assert_eq!(msg.reply_target, "chan789:post123"); // Threads on the post + assert_eq!(msg.reply_target, "chan789"); // Channel-level reply (thread_replies=false) + } + + #[test] + fn mattermost_parse_post_thread_replies_enabled() { + let ch = MattermostChannel::new("url".into(), "token".into(), None, vec!["*".into()], true); + let post = json!({ + "id": "post123", + "user_id": "user456", + "message": "hello world", + "create_at": 1_600_000_000_000_i64, + "root_id": "" + }); + + let msg = ch + .parse_mattermost_post(&post, "bot123", 1_500_000_000_000_i64, "chan789") + .unwrap(); + assert_eq!(msg.reply_target, "chan789:post123"); // Threaded reply } #[test] fn mattermost_parse_post_thread() { - let ch = MattermostChannel::new("url".into(), "token".into(), None, vec!["*".into()]); + let ch = + MattermostChannel::new("url".into(), "token".into(), None, vec!["*".into()], false); let post = json!({ "id": "post123", "user_id": "user456", @@ -285,7 +370,8 @@ mod tests { #[test] fn mattermost_parse_post_ignore_self() { - let ch = MattermostChannel::new("url".into(), "token".into(), None, vec!["*".into()]); + let ch = + MattermostChannel::new("url".into(), "token".into(), None, vec!["*".into()], false); let post = json!({ "id": "post123", "user_id": "bot123", @@ -299,7 +385,8 @@ mod tests { #[test] fn mattermost_parse_post_ignore_old() { - let ch = MattermostChannel::new("url".into(), "token".into(), None, vec!["*".into()]); + let ch = + MattermostChannel::new("url".into(), "token".into(), None, vec!["*".into()], false); let post = json!({ "id": "post123", "user_id": "user456", @@ -310,4 +397,41 @@ mod tests { let msg = ch.parse_mattermost_post(&post, "bot123", 1_500_000_000_000_i64, "chan789"); assert!(msg.is_none()); } + + #[test] + fn mattermost_parse_post_no_thread_when_disabled() { + let ch = + MattermostChannel::new("url".into(), "token".into(), None, vec!["*".into()], false); + let post = json!({ + "id": "post123", + "user_id": "user456", + "message": "hello world", + "create_at": 1_600_000_000_000_i64, + "root_id": "" + }); + + let msg = ch + .parse_mattermost_post(&post, "bot123", 1_500_000_000_000_i64, "chan789") + .unwrap(); + assert_eq!(msg.reply_target, "chan789"); // No thread suffix + } + + #[test] + fn mattermost_existing_thread_always_threads() { + // Even with thread_replies=false, replies to existing threads stay in the thread + let ch = + MattermostChannel::new("url".into(), "token".into(), None, vec!["*".into()], false); + let post = json!({ + "id": "post123", + "user_id": "user456", + "message": "reply in thread", + "create_at": 1_600_000_000_000_i64, + "root_id": "root789" + }); + + let msg = ch + .parse_mattermost_post(&post, "bot123", 1_500_000_000_000_i64, "chan789") + .unwrap(); + assert_eq!(msg.reply_target, "chan789:root789"); // Stays in existing thread + } } diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 3424dde..0c43fd1 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -1261,6 +1261,7 @@ pub async fn start_channels(config: Config) -> Result<()> { mm.bot_token.clone(), mm.channel_id.clone(), mm.allowed_users.clone(), + mm.thread_replies.unwrap_or(false), ))); } diff --git a/src/config/schema.rs b/src/config/schema.rs index e341b32..33e2fe3 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -1499,6 +1499,10 @@ pub struct MattermostConfig { pub channel_id: Option, #[serde(default)] pub allowed_users: Vec, + /// When true, replies thread on the original post. When false (default), + /// replies go to the channel root. + #[serde(default)] + pub thread_replies: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/cron/scheduler.rs b/src/cron/scheduler.rs index 9762651..fb8f61d 100644 --- a/src/cron/scheduler.rs +++ b/src/cron/scheduler.rs @@ -300,6 +300,7 @@ async fn deliver_if_configured(config: &Config, job: &CronJob, output: &str) -> mm.bot_token.clone(), mm.channel_id.clone(), mm.allowed_users.clone(), + mm.thread_replies.unwrap_or(false), ); channel.send(&SendMessage::new(output, target)).await?; }