From 9463bf08a430a374bec2347e7752a99d8dd671f8 Mon Sep 17 00:00:00 2001 From: elonf Date: Tue, 17 Feb 2026 00:02:05 +0800 Subject: [PATCH] feat(channels): add DingTalk channel via Stream Mode Implement DingTalk messaging channel using the official Stream Mode WebSocket protocol with per-message session webhook replies. - Add DingTalkChannel with send/listen/health_check support - Add DingTalkConfig (client_id, client_secret, allowed_users) - Integrate with onboard wizard, integrations registry, and channel list/doctor commands - Include unit tests for user allowlist rules and config serialization --- src/channels/dingtalk.rs | 308 +++++++++++++++++++++++++++++++++++ src/channels/mod.rs | 22 +++ src/config/schema.rs | 17 ++ src/integrations/registry.rs | 12 ++ src/onboard/wizard.rs | 95 ++++++++++- 5 files changed, 449 insertions(+), 5 deletions(-) create mode 100644 src/channels/dingtalk.rs diff --git a/src/channels/dingtalk.rs b/src/channels/dingtalk.rs new file mode 100644 index 0000000..f55135a --- /dev/null +++ b/src/channels/dingtalk.rs @@ -0,0 +1,308 @@ +use super::traits::{Channel, ChannelMessage}; +use async_trait::async_trait; +use futures_util::{SinkExt, StreamExt}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; +use tokio_tungstenite::tungstenite::Message; +use uuid::Uuid; + +/// DingTalk (钉钉) channel — connects via Stream Mode WebSocket for real-time messages. +/// Replies are sent through per-message session webhook URLs. +pub struct DingTalkChannel { + client_id: String, + client_secret: String, + allowed_users: Vec, + client: reqwest::Client, + /// Per-chat session webhooks for sending replies (chatID -> webhook URL). + /// DingTalk provides a unique webhook URL with each incoming message. + session_webhooks: Arc>>, +} + +/// Response from DingTalk gateway connection registration. +#[derive(serde::Deserialize)] +struct GatewayResponse { + endpoint: String, + ticket: String, +} + +impl DingTalkChannel { + pub fn new(client_id: String, client_secret: String, allowed_users: Vec) -> Self { + Self { + client_id, + client_secret, + allowed_users, + client: reqwest::Client::new(), + session_webhooks: Arc::new(RwLock::new(HashMap::new())), + } + } + + fn is_user_allowed(&self, user_id: &str) -> bool { + self.allowed_users.iter().any(|u| u == "*" || u == user_id) + } + + /// Register a connection with DingTalk's gateway to get a WebSocket endpoint. + async fn register_connection(&self) -> anyhow::Result { + let body = serde_json::json!({ + "clientId": self.client_id, + "clientSecret": self.client_secret, + }); + + let resp = self + .client + .post("https://api.dingtalk.com/v1.0/gateway/connections/open") + .json(&body) + .send() + .await?; + + if !resp.status().is_success() { + let status = resp.status(); + let err = resp.text().await.unwrap_or_default(); + anyhow::bail!("DingTalk gateway registration failed ({status}): {err}"); + } + + let gw: GatewayResponse = resp.json().await?; + Ok(gw) + } +} + +#[async_trait] +impl Channel for DingTalkChannel { + fn name(&self) -> &str { + "dingtalk" + } + + async fn send(&self, message: &str, recipient: &str) -> anyhow::Result<()> { + let webhooks = self.session_webhooks.read().await; + let webhook_url = webhooks.get(recipient).ok_or_else(|| { + anyhow::anyhow!( + "No session webhook found for chat {recipient}. \ + The user must send a message first to establish a session." + ) + })?; + + let body = serde_json::json!({ + "msgtype": "markdown", + "markdown": { + "title": "ZeroClaw", + "text": message, + } + }); + + let resp = self.client.post(webhook_url).json(&body).send().await?; + + if !resp.status().is_success() { + let status = resp.status(); + let err = resp.text().await.unwrap_or_default(); + anyhow::bail!("DingTalk webhook reply failed ({status}): {err}"); + } + + Ok(()) + } + + async fn listen(&self, tx: tokio::sync::mpsc::Sender) -> anyhow::Result<()> { + tracing::info!("DingTalk: registering gateway connection..."); + + let gw = self.register_connection().await?; + let ws_url = format!("{}?ticket={}", gw.endpoint, gw.ticket); + + tracing::info!("DingTalk: connecting to stream WebSocket..."); + let (ws_stream, _) = tokio_tungstenite::connect_async(&ws_url).await?; + let (mut write, mut read) = ws_stream.split(); + + tracing::info!("DingTalk: connected and listening for messages..."); + + while let Some(msg) = read.next().await { + let msg = match msg { + Ok(Message::Text(t)) => t, + Ok(Message::Close(_)) => break, + Err(e) => { + tracing::warn!("DingTalk WebSocket error: {e}"); + break; + } + _ => continue, + }; + + let frame: serde_json::Value = match serde_json::from_str(&msg) { + Ok(v) => v, + Err(_) => continue, + }; + + let frame_type = frame.get("type").and_then(|t| t.as_str()).unwrap_or(""); + + match frame_type { + "SYSTEM" => { + // Respond to system pings to keep the connection alive + let message_id = frame + .get("headers") + .and_then(|h| h.get("messageId")) + .and_then(|m| m.as_str()) + .unwrap_or(""); + + let pong = serde_json::json!({ + "code": 200, + "headers": { + "contentType": "application/json", + "messageId": message_id, + }, + "message": "OK", + "data": "", + }); + + if let Err(e) = write.send(Message::Text(pong.to_string())).await { + tracing::warn!("DingTalk: failed to send pong: {e}"); + break; + } + } + "EVENT" => { + // Parse the chatbot callback data from the event + let data_str = frame.get("data").and_then(|d| d.as_str()).unwrap_or("{}"); + + let data: serde_json::Value = match serde_json::from_str(data_str) { + Ok(v) => v, + Err(_) => continue, + }; + + // Extract message content + let content = data + .get("text") + .and_then(|t| t.get("content")) + .and_then(|c| c.as_str()) + .unwrap_or("") + .trim(); + + if content.is_empty() { + continue; + } + + let sender_id = data + .get("senderStaffId") + .and_then(|s| s.as_str()) + .unwrap_or("unknown"); + + if !self.is_user_allowed(sender_id) { + tracing::warn!( + "DingTalk: ignoring message from unauthorized user: {sender_id}" + ); + continue; + } + + let conversation_type = data + .get("conversationType") + .and_then(|c| c.as_str()) + .unwrap_or("1"); + + // Private chat uses sender ID, group chat uses conversation ID + let chat_id = if conversation_type == "1" { + sender_id.to_string() + } else { + data.get("conversationId") + .and_then(|c| c.as_str()) + .unwrap_or(sender_id) + .to_string() + }; + + // Store session webhook for later replies + if let Some(webhook) = data.get("sessionWebhook").and_then(|w| w.as_str()) { + let mut webhooks = self.session_webhooks.write().await; + webhooks.insert(chat_id.clone(), webhook.to_string()); + } + + // Acknowledge the event + let message_id = frame + .get("headers") + .and_then(|h| h.get("messageId")) + .and_then(|m| m.as_str()) + .unwrap_or(""); + + let ack = serde_json::json!({ + "code": 200, + "headers": { + "contentType": "application/json", + "messageId": message_id, + }, + "message": "OK", + "data": "", + }); + let _ = write.send(Message::Text(ack.to_string())).await; + + let channel_msg = ChannelMessage { + id: Uuid::new_v4().to_string(), + sender: sender_id.to_string(), + content: content.to_string(), + channel: "dingtalk".to_string(), + timestamp: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(), + }; + + if tx.send(channel_msg).await.is_err() { + tracing::warn!("DingTalk: message channel closed"); + break; + } + } + _ => {} + } + } + + anyhow::bail!("DingTalk WebSocket stream ended") + } + + async fn health_check(&self) -> bool { + self.register_connection().await.is_ok() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_name() { + let ch = DingTalkChannel::new("id".into(), "secret".into(), vec![]); + assert_eq!(ch.name(), "dingtalk"); + } + + #[test] + fn test_user_allowed_wildcard() { + let ch = DingTalkChannel::new("id".into(), "secret".into(), vec!["*".into()]); + assert!(ch.is_user_allowed("anyone")); + } + + #[test] + fn test_user_allowed_specific() { + let ch = DingTalkChannel::new("id".into(), "secret".into(), vec!["user123".into()]); + assert!(ch.is_user_allowed("user123")); + assert!(!ch.is_user_allowed("other")); + } + + #[test] + fn test_user_denied_empty() { + let ch = DingTalkChannel::new("id".into(), "secret".into(), vec![]); + assert!(!ch.is_user_allowed("anyone")); + } + + #[test] + fn test_config_serde() { + let toml_str = r#" +client_id = "app_id_123" +client_secret = "secret_456" +allowed_users = ["user1", "*"] +"#; + let config: crate::config::schema::DingTalkConfig = toml::from_str(toml_str).unwrap(); + assert_eq!(config.client_id, "app_id_123"); + assert_eq!(config.client_secret, "secret_456"); + assert_eq!(config.allowed_users, vec!["user1", "*"]); + } + + #[test] + fn test_config_serde_defaults() { + let toml_str = r#" +client_id = "id" +client_secret = "secret" +"#; + let config: crate::config::schema::DingTalkConfig = toml::from_str(toml_str).unwrap(); + assert!(config.allowed_users.is_empty()); + } +} diff --git a/src/channels/mod.rs b/src/channels/mod.rs index a3d8281..17b5da3 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -1,4 +1,5 @@ pub mod cli; +pub mod dingtalk; pub mod discord; pub mod email_channel; pub mod imessage; @@ -11,6 +12,7 @@ pub mod traits; pub mod whatsapp; pub use cli::CliChannel; +pub use dingtalk::DingTalkChannel; pub use discord::DiscordChannel; pub use email_channel::EmailChannel; pub use imessage::IMessageChannel; @@ -555,6 +557,7 @@ pub fn handle_command(command: crate::ChannelCommands, config: &Config) -> Resul ("Email", config.channels_config.email.is_some()), ("IRC", config.channels_config.irc.is_some()), ("Lark", config.channels_config.lark.is_some()), + ("DingTalk", config.channels_config.dingtalk.is_some()), ] { println!(" {} {name}", if configured { "✅" } else { "❌" }); } @@ -697,6 +700,17 @@ pub async fn doctor_channels(config: Config) -> Result<()> { )); } + if let Some(ref dt) = config.channels_config.dingtalk { + channels.push(( + "DingTalk", + Arc::new(DingTalkChannel::new( + dt.client_id.clone(), + dt.client_secret.clone(), + dt.allowed_users.clone(), + )), + )); + } + if channels.is_empty() { println!("No real-time channels configured. Run `zeroclaw onboard` first."); return Ok(()); @@ -958,6 +972,14 @@ pub async fn start_channels(config: Config) -> Result<()> { ))); } + if let Some(ref dt) = config.channels_config.dingtalk { + channels.push(Arc::new(DingTalkChannel::new( + dt.client_id.clone(), + dt.client_secret.clone(), + dt.allowed_users.clone(), + ))); + } + if channels.is_empty() { println!("No channels configured. Run `zeroclaw onboard` to set up channels."); return Ok(()); diff --git a/src/config/schema.rs b/src/config/schema.rs index f615d13..587aa61 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -1198,6 +1198,7 @@ pub struct ChannelsConfig { pub email: Option, pub irc: Option, pub lark: Option, + pub dingtalk: Option, } impl Default for ChannelsConfig { @@ -1214,6 +1215,7 @@ impl Default for ChannelsConfig { email: None, irc: None, lark: None, + dingtalk: None, } } } @@ -1487,6 +1489,18 @@ impl Default for AuditConfig { } } +/// DingTalk (钉钉) configuration for Stream Mode messaging +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DingTalkConfig { + /// Client ID (AppKey) from DingTalk developer console + pub client_id: String, + /// Client Secret (AppSecret) from DingTalk developer console + pub client_secret: String, + /// Allowed user IDs (staff IDs). Empty = deny all, "*" = allow all + #[serde(default)] + pub allowed_users: Vec, +} + // ── Config impl ────────────────────────────────────────────────── impl Default for Config { @@ -1865,6 +1879,7 @@ mod tests { email: None, irc: None, lark: None, + dingtalk: None, }, memory: MemoryConfig::default(), tunnel: TunnelConfig::default(), @@ -2127,6 +2142,7 @@ default_temperature = 0.7 email: None, irc: None, lark: None, + dingtalk: None, }; let toml_str = toml::to_string_pretty(&c).unwrap(); let parsed: ChannelsConfig = toml::from_str(&toml_str).unwrap(); @@ -2286,6 +2302,7 @@ channel_id = "C123" email: None, irc: None, lark: None, + dingtalk: None, }; let toml_str = toml::to_string_pretty(&c).unwrap(); let parsed: ChannelsConfig = toml::from_str(&toml_str).unwrap(); diff --git a/src/integrations/registry.rs b/src/integrations/registry.rs index adbab92..b368d7e 100644 --- a/src/integrations/registry.rs +++ b/src/integrations/registry.rs @@ -125,6 +125,18 @@ pub fn all_integrations() -> Vec { category: IntegrationCategory::Chat, status_fn: |_| IntegrationStatus::ComingSoon, }, + IntegrationEntry { + name: "DingTalk", + description: "DingTalk Stream Mode (钉钉)", + category: IntegrationCategory::Chat, + status_fn: |c| { + if c.channels_config.dingtalk.is_some() { + IntegrationStatus::Active + } else { + IntegrationStatus::Available + } + }, + }, // ── AI Models ─────────────────────────────────────────── IntegrationEntry { name: "OpenRouter", diff --git a/src/onboard/wizard.rs b/src/onboard/wizard.rs index 13ed3a8..2fc30cf 100644 --- a/src/onboard/wizard.rs +++ b/src/onboard/wizard.rs @@ -1,4 +1,4 @@ -use crate::config::schema::{IrcConfig, WhatsAppConfig}; +use crate::config::schema::{DingTalkConfig, IrcConfig, WhatsAppConfig}; use crate::config::{ AutonomyConfig, BrowserConfig, ChannelsConfig, ComposioConfig, Config, DiscordConfig, HeartbeatConfig, IMessageConfig, MatrixConfig, MemoryConfig, ObservabilityConfig, @@ -155,7 +155,8 @@ pub fn run_wizard() -> Result { || config.channels_config.slack.is_some() || config.channels_config.imessage.is_some() || config.channels_config.matrix.is_some() - || config.channels_config.email.is_some(); + || config.channels_config.email.is_some() + || config.channels_config.dingtalk.is_some(); if has_channels && config.api_key.is_some() { let launch: bool = Confirm::new() @@ -211,7 +212,8 @@ pub fn run_channels_repair_wizard() -> Result { || config.channels_config.slack.is_some() || config.channels_config.imessage.is_some() || config.channels_config.matrix.is_some() - || config.channels_config.email.is_some(); + || config.channels_config.email.is_some() + || config.channels_config.dingtalk.is_some(); if has_channels && config.api_key.is_some() { let launch: bool = Confirm::new() @@ -2230,6 +2232,7 @@ fn setup_channels() -> Result { email: None, irc: None, lark: None, + dingtalk: None, }; loop { @@ -2298,13 +2301,21 @@ fn setup_channels() -> Result { "— HTTP endpoint" } ), + format!( + "DingTalk {}", + if config.dingtalk.is_some() { + "✅ connected" + } else { + "— 钉钉 Stream Mode" + } + ), "Done — finish setup".to_string(), ]; let choice = Select::new() .with_prompt(" Connect a channel (or Done to continue)") .items(&options) - .default(8) + .default(9) .interact()?; match choice { @@ -3023,6 +3034,76 @@ fn setup_channels() -> Result { style(&port).cyan() ); } + 8 => { + // ── DingTalk ── + println!(); + println!( + " {} {}", + style("DingTalk Setup").white().bold(), + style("— 钉钉 Stream Mode").dim() + ); + print_bullet("1. Go to DingTalk developer console (open.dingtalk.com)"); + print_bullet("2. Create an app and enable the Stream Mode bot"); + print_bullet("3. Copy the Client ID (AppKey) and Client Secret (AppSecret)"); + println!(); + + let client_id: String = Input::new() + .with_prompt(" Client ID (AppKey)") + .interact_text()?; + + if client_id.trim().is_empty() { + println!(" {} Skipped", style("→").dim()); + continue; + } + + let client_secret: String = Input::new() + .with_prompt(" Client Secret (AppSecret)") + .interact_text()?; + + // Test connection + print!(" {} Testing connection... ", style("⏳").dim()); + let client = reqwest::blocking::Client::new(); + let body = serde_json::json!({ + "clientId": client_id, + "clientSecret": client_secret, + }); + match client + .post("https://api.dingtalk.com/v1.0/gateway/connections/open") + .json(&body) + .send() + { + Ok(resp) if resp.status().is_success() => { + println!( + "\r {} DingTalk credentials verified ", + style("✅").green().bold() + ); + } + _ => { + println!( + "\r {} Connection failed — check your credentials", + style("❌").red().bold() + ); + continue; + } + } + + let users_str: String = Input::new() + .with_prompt(" Allowed staff IDs (comma-separated, '*' for all)") + .allow_empty(true) + .interact_text()?; + + let allowed_users: Vec = users_str + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + + config.dingtalk = Some(DingTalkConfig { + client_id, + client_secret, + allowed_users, + }); + } _ => break, // Done } println!(); @@ -3057,6 +3138,9 @@ fn setup_channels() -> Result { if config.webhook.is_some() { active.push("Webhook"); } + if config.dingtalk.is_some() { + active.push("DingTalk"); + } println!( " {} Channels: {}", @@ -3507,7 +3591,8 @@ fn print_summary(config: &Config) { || config.channels_config.slack.is_some() || config.channels_config.imessage.is_some() || config.channels_config.matrix.is_some() - || config.channels_config.email.is_some(); + || config.channels_config.email.is_some() + || config.channels_config.dingtalk.is_some(); println!(); println!(