From 4b89e91a5a508da179e18cd01c3705f2fde8c0fb Mon Sep 17 00:00:00 2001 From: JamesYin Date: Tue, 17 Feb 2026 21:17:33 +0800 Subject: [PATCH] fix(dingtalk,daemon): process stream callbacks and supervise DingTalk channel Include DingTalk in daemon supervised channel detection so the listener starts in daemon mode. Handle CALLBACK stream frames, subscribe to bot message topic, and improve session webhook routing for private/group replies. Add regression tests for supervised-channel detection and DingTalk payload/chat-id parsing. --- src/channels/dingtalk.rs | 117 ++++++++++++++++++++++++++------------- src/daemon/mod.rs | 11 ++++ 2 files changed, 88 insertions(+), 40 deletions(-) diff --git a/src/channels/dingtalk.rs b/src/channels/dingtalk.rs index c32db17..8e8f2a5 100644 --- a/src/channels/dingtalk.rs +++ b/src/channels/dingtalk.rs @@ -7,7 +7,9 @@ use tokio::sync::RwLock; use tokio_tungstenite::tungstenite::Message; use uuid::Uuid; -/// DingTalk channel — connects via Stream Mode WebSocket for real-time messages. +const DINGTALK_BOT_CALLBACK_TOPIC: &str = "/v1.0/im/bot/messages/get"; + +/// DingTalk (钉钉) channel — connects via Stream Mode WebSocket for real-time messages. /// Replies are sent through per-message session webhook URLs. pub struct DingTalkChannel { client_id: String, @@ -41,11 +43,46 @@ impl DingTalkChannel { self.allowed_users.iter().any(|u| u == "*" || u == user_id) } + fn parse_stream_data(frame: &serde_json::Value) -> Option { + match frame.get("data") { + Some(serde_json::Value::String(raw)) => serde_json::from_str(raw).ok(), + Some(serde_json::Value::Object(_)) => frame.get("data").cloned(), + _ => None, + } + } + + fn resolve_chat_id(data: &serde_json::Value, sender_id: &str) -> String { + let is_private_chat = data + .get("conversationType") + .and_then(|value| { + value + .as_str() + .map(|v| v == "1") + .or_else(|| value.as_i64().map(|v| v == 1)) + }) + .unwrap_or(true); + + if is_private_chat { + sender_id.to_string() + } else { + data.get("conversationId") + .and_then(|c| c.as_str()) + .unwrap_or(sender_id) + .to_string() + } + } + /// Register a connection with DingTalk's gateway to get a WebSocket endpoint. async fn register_connection(&self) -> anyhow::Result { let body = serde_json::json!({ "clientId": self.client_id, "clientSecret": self.client_secret, + "subscriptions": [ + { + "type": "CALLBACK", + "topic": DINGTALK_BOT_CALLBACK_TOPIC, + } + ], }); let resp = self @@ -65,17 +102,6 @@ impl DingTalkChannel { Ok(gw) } - fn resolve_reply_target( - sender_id: &str, - conversation_type: &str, - conversation_id: Option<&str>, - ) -> String { - if conversation_type == "1" { - sender_id.to_string() - } else { - conversation_id.unwrap_or(sender_id).to_string() - } - } } #[async_trait] @@ -168,13 +194,14 @@ impl Channel for DingTalkChannel { break; } } - "EVENT" => { - // Parse the chatbot callback data from the event - let data_str = frame.get("data").and_then(|d| d.as_str()).unwrap_or("{}"); - - let data: serde_json::Value = match serde_json::from_str(data_str) { - Ok(v) => v, - Err(_) => continue, + "EVENT" | "CALLBACK" => { + // Parse the chatbot callback data from the frame. + let data = match Self::parse_stream_data(&frame) { + Some(v) => v, + None => { + tracing::debug!("DingTalk: frame has no parseable data payload"); + continue; + } }; // Extract message content @@ -201,22 +228,16 @@ impl Channel for DingTalkChannel { continue; } - let conversation_type = data - .get("conversationType") - .and_then(|c| c.as_str()) - .unwrap_or("1"); - - // Private chat uses sender ID, group chat uses conversation ID - let chat_id = Self::resolve_reply_target( - sender_id, - conversation_type, - data.get("conversationId").and_then(|c| c.as_str()), - ); + // Private chat uses sender ID, group chat uses conversation ID. + let chat_id = Self::resolve_chat_id(&data, sender_id); // Store session webhook for later replies if let Some(webhook) = data.get("sessionWebhook").and_then(|w| w.as_str()) { + let webhook = webhook.to_string(); let mut webhooks = self.session_webhooks.write().await; - webhooks.insert(chat_id.clone(), webhook.to_string()); + // Use both keys so reply routing works for both group and private flows. + webhooks.insert(chat_id.clone(), webhook.clone()); + webhooks.insert(sender_id.to_string(), webhook); } // Acknowledge the event @@ -319,20 +340,36 @@ client_secret = "secret" } #[test] - fn test_resolve_reply_target_private_chat_uses_sender_id() { - let target = DingTalkChannel::resolve_reply_target("staff_1", "1", Some("conv_1")); - assert_eq!(target, "staff_1"); + fn parse_stream_data_supports_string_payload() { + let frame = serde_json::json!({ + "data": "{\"text\":{\"content\":\"hello\"}}" + }); + let parsed = DingTalkChannel::parse_stream_data(&frame).unwrap(); + assert_eq!( + parsed.get("text").and_then(|v| v.get("content")), + Some(&serde_json::json!("hello")) + ); } #[test] - fn test_resolve_reply_target_group_chat_uses_conversation_id() { - let target = DingTalkChannel::resolve_reply_target("staff_1", "2", Some("conv_1")); - assert_eq!(target, "conv_1"); + fn parse_stream_data_supports_object_payload() { + let frame = serde_json::json!({ + "data": {"text": {"content": "hello"}} + }); + let parsed = DingTalkChannel::parse_stream_data(&frame).unwrap(); + assert_eq!( + parsed.get("text").and_then(|v| v.get("content")), + Some(&serde_json::json!("hello")) + ); } #[test] - fn test_resolve_reply_target_group_chat_falls_back_to_sender_id() { - let target = DingTalkChannel::resolve_reply_target("staff_1", "2", None); - assert_eq!(target, "staff_1"); + fn resolve_chat_id_handles_numeric_group_conversation_type() { + let data = serde_json::json!({ + "conversationType": 2, + "conversationId": "cid-group", + }); + let chat_id = DingTalkChannel::resolve_chat_id(&data, "staff-1"); + assert_eq!(chat_id, "cid-group"); } } diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index bcd5a66..c60cd2d 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -299,4 +299,15 @@ mod tests { }); assert!(has_supervised_channels(&config)); } + + #[test] + fn detects_dingtalk_as_supervised_channel() { + let mut config = Config::default(); + config.channels_config.dingtalk = Some(crate::config::schema::DingTalkConfig { + client_id: "client_id".into(), + client_secret: "client_secret".into(), + allowed_users: vec!["*".into()], + }); + assert!(has_supervised_channels(&config)); + } }