use super::traits::{Channel, ChannelMessage, SendMessage}; use async_trait::async_trait; use futures_util::{SinkExt, StreamExt}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; use tokio_tungstenite::tungstenite::Message; use uuid::Uuid; /// DingTalk channel — connects via Stream Mode WebSocket for real-time messages. /// Replies are sent through per-message session webhook URLs. pub struct DingTalkChannel { client_id: String, client_secret: String, allowed_users: Vec, client: reqwest::Client, /// Per-chat session webhooks for sending replies (chatID -> webhook URL). /// DingTalk provides a unique webhook URL with each incoming message. session_webhooks: Arc>>, } /// Response from DingTalk gateway connection registration. #[derive(serde::Deserialize)] struct GatewayResponse { endpoint: String, ticket: String, } impl DingTalkChannel { pub fn new(client_id: String, client_secret: String, allowed_users: Vec) -> Self { Self { client_id, client_secret, allowed_users, client: reqwest::Client::new(), session_webhooks: Arc::new(RwLock::new(HashMap::new())), } } fn is_user_allowed(&self, user_id: &str) -> bool { self.allowed_users.iter().any(|u| u == "*" || u == user_id) } /// Register a connection with DingTalk's gateway to get a WebSocket endpoint. 
async fn register_connection(&self) -> anyhow::Result { let body = serde_json::json!({ "clientId": self.client_id, "clientSecret": self.client_secret, }); let resp = self .client .post("https://api.dingtalk.com/v1.0/gateway/connections/open") .json(&body) .send() .await?; if !resp.status().is_success() { let status = resp.status(); let err = resp.text().await.unwrap_or_default(); anyhow::bail!("DingTalk gateway registration failed ({status}): {err}"); } let gw: GatewayResponse = resp.json().await?; Ok(gw) } fn resolve_reply_target( sender_id: &str, conversation_type: &str, conversation_id: Option<&str>, ) -> String { if conversation_type == "1" { sender_id.to_string() } else { conversation_id.unwrap_or(sender_id).to_string() } } } #[async_trait] impl Channel for DingTalkChannel { fn name(&self) -> &str { "dingtalk" } async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { let webhooks = self.session_webhooks.read().await; let webhook_url = webhooks.get(&message.recipient).ok_or_else(|| { anyhow::anyhow!( "No session webhook found for chat {}. 
\ The user must send a message first to establish a session.", message.recipient ) })?; let title = message.subject.as_deref().unwrap_or("ZeroClaw"); let body = serde_json::json!({ "msgtype": "markdown", "markdown": { "title": title, "text": message.content, } }); let resp = self.client.post(webhook_url).json(&body).send().await?; if !resp.status().is_success() { let status = resp.status(); let err = resp.text().await.unwrap_or_default(); anyhow::bail!("DingTalk webhook reply failed ({status}): {err}"); } Ok(()) } async fn listen(&self, tx: tokio::sync::mpsc::Sender) -> anyhow::Result<()> { tracing::info!("DingTalk: registering gateway connection..."); let gw = self.register_connection().await?; let ws_url = format!("{}?ticket={}", gw.endpoint, gw.ticket); tracing::info!("DingTalk: connecting to stream WebSocket..."); let (ws_stream, _) = tokio_tungstenite::connect_async(&ws_url).await?; let (mut write, mut read) = ws_stream.split(); tracing::info!("DingTalk: connected and listening for messages..."); while let Some(msg) = read.next().await { let msg = match msg { Ok(Message::Text(t)) => t, Ok(Message::Close(_)) => break, Err(e) => { tracing::warn!("DingTalk WebSocket error: {e}"); break; } _ => continue, }; let frame: serde_json::Value = match serde_json::from_str(&msg) { Ok(v) => v, Err(_) => continue, }; let frame_type = frame.get("type").and_then(|t| t.as_str()).unwrap_or(""); match frame_type { "SYSTEM" => { // Respond to system pings to keep the connection alive let message_id = frame .get("headers") .and_then(|h| h.get("messageId")) .and_then(|m| m.as_str()) .unwrap_or(""); let pong = serde_json::json!({ "code": 200, "headers": { "contentType": "application/json", "messageId": message_id, }, "message": "OK", "data": "", }); if let Err(e) = write.send(Message::Text(pong.to_string())).await { tracing::warn!("DingTalk: failed to send pong: {e}"); break; } } "EVENT" => { // Parse the chatbot callback data from the event let data_str = 
frame.get("data").and_then(|d| d.as_str()).unwrap_or("{}"); let data: serde_json::Value = match serde_json::from_str(data_str) { Ok(v) => v, Err(_) => continue, }; // Extract message content let content = data .get("text") .and_then(|t| t.get("content")) .and_then(|c| c.as_str()) .unwrap_or("") .trim(); if content.is_empty() { continue; } let sender_id = data .get("senderStaffId") .and_then(|s| s.as_str()) .unwrap_or("unknown"); if !self.is_user_allowed(sender_id) { tracing::warn!( "DingTalk: ignoring message from unauthorized user: {sender_id}" ); continue; } let conversation_type = data .get("conversationType") .and_then(|c| c.as_str()) .unwrap_or("1"); // Private chat uses sender ID, group chat uses conversation ID let chat_id = Self::resolve_reply_target( sender_id, conversation_type, data.get("conversationId").and_then(|c| c.as_str()), ); // Store session webhook for later replies if let Some(webhook) = data.get("sessionWebhook").and_then(|w| w.as_str()) { let mut webhooks = self.session_webhooks.write().await; webhooks.insert(chat_id.clone(), webhook.to_string()); } // Acknowledge the event let message_id = frame .get("headers") .and_then(|h| h.get("messageId")) .and_then(|m| m.as_str()) .unwrap_or(""); let ack = serde_json::json!({ "code": 200, "headers": { "contentType": "application/json", "messageId": message_id, }, "message": "OK", "data": "", }); let _ = write.send(Message::Text(ack.to_string())).await; let channel_msg = ChannelMessage { id: Uuid::new_v4().to_string(), sender: sender_id.to_string(), reply_target: chat_id, content: content.to_string(), channel: "dingtalk".to_string(), timestamp: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(), }; if tx.send(channel_msg).await.is_err() { tracing::warn!("DingTalk: message channel closed"); break; } } _ => {} } } anyhow::bail!("DingTalk WebSocket stream ended") } async fn health_check(&self) -> bool { self.register_connection().await.is_ok() } } 
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a channel with placeholder credentials and the given allow-list.
    fn channel_with(allowed: &[&str]) -> DingTalkChannel {
        let allowed_users = allowed.iter().map(|user| (*user).to_string()).collect();
        DingTalkChannel::new("id".into(), "secret".into(), allowed_users)
    }

    // The channel reports its fixed identifier.
    #[test]
    fn test_name() {
        let channel = channel_with(&[]);
        assert_eq!(channel.name(), "dingtalk");
    }

    // A "*" entry admits any sender.
    #[test]
    fn test_user_allowed_wildcard() {
        let channel = channel_with(&["*"]);
        assert!(channel.is_user_allowed("anyone"));
    }

    // Only senders that are explicitly listed are admitted.
    #[test]
    fn test_user_allowed_specific() {
        let channel = channel_with(&["user123"]);
        assert!(channel.is_user_allowed("user123"));
        assert!(!channel.is_user_allowed("other"));
    }

    // An empty allow-list rejects everyone.
    #[test]
    fn test_user_denied_empty() {
        let channel = channel_with(&[]);
        assert!(!channel.is_user_allowed("anyone"));
    }

    // All three config fields round-trip from TOML.
    #[test]
    fn test_config_serde() {
        let toml_str = r#"
            client_id = "app_id_123"
            client_secret = "secret_456"
            allowed_users = ["user1", "*"]
        "#;
        let parsed = toml::from_str::<crate::config::schema::DingTalkConfig>(toml_str).unwrap();
        assert_eq!(parsed.client_id, "app_id_123");
        assert_eq!(parsed.client_secret, "secret_456");
        assert_eq!(parsed.allowed_users, vec!["user1", "*"]);
    }

    // `allowed_users` defaults to empty when omitted.
    #[test]
    fn test_config_serde_defaults() {
        let toml_str = r#"
            client_id = "id"
            client_secret = "secret"
        "#;
        let parsed = toml::from_str::<crate::config::schema::DingTalkConfig>(toml_str).unwrap();
        assert!(parsed.allowed_users.is_empty());
    }

    // Conversation type "1" (private chat) replies to the sender.
    #[test]
    fn test_resolve_reply_target_private_chat_uses_sender_id() {
        let resolved = DingTalkChannel::resolve_reply_target("staff_1", "1", Some("conv_1"));
        assert_eq!(resolved, "staff_1");
    }

    // Any other conversation type replies to the conversation.
    #[test]
    fn test_resolve_reply_target_group_chat_uses_conversation_id() {
        let resolved = DingTalkChannel::resolve_reply_target("staff_1", "2", Some("conv_1"));
        assert_eq!(resolved, "conv_1");
    }

    // Without a conversation ID, a group chat falls back to the sender.
    #[test]
    fn test_resolve_reply_target_group_chat_falls_back_to_sender_id() {
        let resolved = DingTalkChannel::resolve_reply_target("staff_1", "2", None);
        assert_eq!(resolved, "staff_1");
    }
}