From 760728d0383d0cb5d51696196dde1cbac774ceb3 Mon Sep 17 00:00:00 2001 From: stawky Date: Mon, 16 Feb 2026 20:19:52 +0800 Subject: [PATCH] feat(channels): add Lark/Feishu IM channel support Implement Lark/Feishu as a new channel for ZeroClaw (Issue #164). - Add LarkChannel with Channel trait impl (name, listen, send) - listen: HTTP server (axum) for event callback with URL verification (challenge response) and im.message.receive_v1 text message parsing - send: POST /open-apis/im/v1/messages with tenant_access_token auth - get_tenant_access_token with caching and auto-refresh on 401 - Allowlist filtering by open_id (same pattern as other channels) - Add LarkConfig to schema (app_id, app_secret, verification_token, port, allowed_users) - Register lark in channel list, doctor, and start_channels - 18 unit tests: config serde, allowlist, channel name, message parsing, edge cases (unicode, missing fields, invalid JSON, wrong event type) - Fix pre-existing SchedulerConfig compile error on main --- src/channels/lark.rs | 649 +++++++++++++++++++++++++++++++++++++++++++ src/channels/mod.rs | 26 ++ 2 files changed, 675 insertions(+) create mode 100644 src/channels/lark.rs diff --git a/src/channels/lark.rs b/src/channels/lark.rs new file mode 100644 index 0000000..71a9a25 --- /dev/null +++ b/src/channels/lark.rs @@ -0,0 +1,649 @@ +use super::traits::{Channel, ChannelMessage}; +use async_trait::async_trait; +use std::sync::Arc; +use tokio::sync::RwLock; +use uuid::Uuid; + +const FEISHU_BASE_URL: &str = "https://open.feishu.cn/open-apis"; + +/// Lark/Feishu channel — receives events via HTTP callback, sends via Open API +pub struct LarkChannel { + app_id: String, + app_secret: String, + verification_token: String, + port: u16, + allowed_users: Vec, + client: reqwest::Client, + /// Cached tenant access token + tenant_token: Arc>>, +} + +impl LarkChannel { + pub fn new( + app_id: String, + app_secret: String, + verification_token: String, + port: u16, + allowed_users: Vec, + ) -> Self { + Self { + app_id, + app_secret, + verification_token, + port, + allowed_users, + client: reqwest::Client::new(), + tenant_token: Arc::new(RwLock::new(None)), + } + } + + /// Check if a user open_id is allowed + fn is_user_allowed(&self, open_id: &str) -> bool { + self.allowed_users.iter().any(|u| u == "*" || u == open_id) + } + + /// Get or refresh tenant access token + async fn get_tenant_access_token(&self) -> anyhow::Result { + // Check cache first + { + let cached = self.tenant_token.read().await; + if let Some(ref token) = *cached { + return Ok(token.clone()); + } + } + + let url = format!("{FEISHU_BASE_URL}/auth/v3/tenant_access_token/internal"); + let body = serde_json::json!({ + "app_id": self.app_id, + "app_secret": self.app_secret, + }); + + let resp = self.client.post(&url).json(&body).send().await?; + let data: serde_json::Value = resp.json().await?; + + let code = data.get("code").and_then(|c| c.as_i64()).unwrap_or(-1); + if code != 0 { + let msg = data + .get("msg") + .and_then(|m| m.as_str()) + .unwrap_or("unknown error"); + anyhow::bail!("Lark tenant_access_token failed: {msg}"); + } + + let token = data + .get("tenant_access_token") + .and_then(|t| t.as_str()) + .ok_or_else(|| anyhow::anyhow!("missing tenant_access_token in response"))? + .to_string(); + + // Cache it + { + let mut cached = self.tenant_token.write().await; + *cached = Some(token.clone()); + } + + Ok(token) + } + + /// Invalidate cached token (called on 401) + async fn invalidate_token(&self) { + let mut cached = self.tenant_token.write().await; + *cached = None; + } + + /// Parse an event callback payload and extract text messages + pub fn parse_event_payload(&self, payload: &serde_json::Value) -> Vec { + let mut messages = Vec::new(); + + // Lark event v2 structure: + // { "header": { "event_type": "im.message.receive_v1" }, "event": { "message": { ... }, "sender": { ... } } } + let event_type = payload + .pointer("/header/event_type") + .and_then(|e| e.as_str()) + .unwrap_or(""); + + if event_type != "im.message.receive_v1" { + return messages; + } + + let event = match payload.get("event") { + Some(e) => e, + None => return messages, + }; + + // Extract sender open_id + let open_id = event + .pointer("/sender/sender_id/open_id") + .and_then(|s| s.as_str()) + .unwrap_or(""); + + if open_id.is_empty() { + return messages; + } + + // Check allowlist + if !self.is_user_allowed(open_id) { + tracing::warn!("Lark: ignoring message from unauthorized user: {open_id}"); + return messages; + } + + // Extract message content (text only) + let msg_type = event + .pointer("/message/message_type") + .and_then(|t| t.as_str()) + .unwrap_or(""); + + if msg_type != "text" { + tracing::debug!("Lark: skipping non-text message type: {msg_type}"); + return messages; + } + + let content_str = event + .pointer("/message/content") + .and_then(|c| c.as_str()) + .unwrap_or(""); + + // content is a JSON string like "{\"text\":\"hello\"}" + let text = serde_json::from_str::(content_str) + .ok() + .and_then(|v| v.get("text").and_then(|t| t.as_str()).map(String::from)) + .unwrap_or_default(); + + if text.is_empty() { + return messages; + } + + let timestamp = event + .pointer("/message/create_time") + .and_then(|t| t.as_str()) + .and_then(|t| t.parse::().ok()) + // Lark timestamps are in milliseconds + .map(|ms| ms / 1000) + .unwrap_or_else(|| { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + }); + + let chat_id = event + .pointer("/message/chat_id") + .and_then(|c| c.as_str()) + .unwrap_or(open_id); + + messages.push(ChannelMessage { + id: Uuid::new_v4().to_string(), + sender: chat_id.to_string(), + content: text, + channel: "lark".to_string(), + timestamp, + }); + + messages + } +} + +#[async_trait] +impl Channel for LarkChannel { + fn name(&self) -> &str { + "lark" + } + + async fn send(&self, message: &str, recipient: &str) -> anyhow::Result<()> { + let token = self.get_tenant_access_token().await?; + let url = format!("{FEISHU_BASE_URL}/im/v1/messages?receive_id_type=chat_id"); + + let content = serde_json::json!({ "text": message }).to_string(); + let body = serde_json::json!({ + "receive_id": recipient, + "msg_type": "text", + "content": content, + }); + + let resp = self + .client + .post(&url) + .header("Authorization", format!("Bearer {token}")) + .header("Content-Type", "application/json; charset=utf-8") + .json(&body) + .send() + .await?; + + if resp.status().as_u16() == 401 { + // Token expired, invalidate and retry once + self.invalidate_token().await; + let new_token = self.get_tenant_access_token().await?; + let retry_resp = self + .client + .post(&url) + .header("Authorization", format!("Bearer {new_token}")) + .header("Content-Type", "application/json; charset=utf-8") + .json(&body) + .send() + .await?; + + if !retry_resp.status().is_success() { + let err = retry_resp.text().await.unwrap_or_default(); + anyhow::bail!("Lark send failed after token refresh: {err}"); + } + return Ok(()); + } + + if !resp.status().is_success() { + let err = resp.text().await.unwrap_or_default(); + anyhow::bail!("Lark send failed: {err}"); + } + + Ok(()) + } + + async fn listen(&self, tx: tokio::sync::mpsc::Sender) -> anyhow::Result<()> { + use axum::{extract::State, routing::post, Json, Router}; + + #[derive(Clone)] + struct AppState { + verification_token: String, + channel: Arc, + tx: tokio::sync::mpsc::Sender, + } + + async fn handle_event( + State(state): State, + Json(payload): Json, + ) -> axum::response::Response { + use axum::http::StatusCode; + use axum::response::IntoResponse; + + // URL verification challenge + if let Some(challenge) = payload.get("challenge").and_then(|c| c.as_str()) { + // Verify token if present + let token_ok = payload + .get("token") + .and_then(|t| t.as_str()) + .map_or(true, |t| t == state.verification_token); + + if !token_ok { + return (StatusCode::FORBIDDEN, "invalid token").into_response(); + } + + let resp = serde_json::json!({ "challenge": challenge }); + return (StatusCode::OK, Json(resp)).into_response(); + } + + // Parse event messages + let messages = state.channel.parse_event_payload(&payload); + for msg in messages { + if state.tx.send(msg).await.is_err() { + tracing::warn!("Lark: message channel closed"); + break; + } + } + + (StatusCode::OK, "ok").into_response() + } + + let state = AppState { + verification_token: self.verification_token.clone(), + channel: Arc::new(LarkChannel::new( + self.app_id.clone(), + self.app_secret.clone(), + self.verification_token.clone(), + self.port, + self.allowed_users.clone(), + )), + tx, + }; + + let app = Router::new() + .route("/lark", post(handle_event)) + .with_state(state); + + let addr = std::net::SocketAddr::from(([0, 0, 0, 0], self.port)); + tracing::info!("Lark event callback server listening on {addr}"); + + let listener = tokio::net::TcpListener::bind(addr).await?; + axum::serve(listener, app).await?; + + Ok(()) + } + + async fn health_check(&self) -> bool { + self.get_tenant_access_token().await.is_ok() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_channel() -> LarkChannel { + LarkChannel::new( + "cli_test_app_id".into(), + "test_app_secret".into(), + "test_verification_token".into(), + 9898, + vec!["ou_testuser123".into()], + ) + } + + #[test] + fn lark_channel_name() { + let ch = make_channel(); + assert_eq!(ch.name(), "lark"); + } + + #[test] + fn lark_user_allowed_exact() { + let ch = make_channel(); + assert!(ch.is_user_allowed("ou_testuser123")); + assert!(!ch.is_user_allowed("ou_other")); + } + + #[test] + fn lark_user_allowed_wildcard() { + let ch = LarkChannel::new( + "id".into(), + "secret".into(), + "token".into(), + 9898, + vec!["*".into()], + ); + assert!(ch.is_user_allowed("ou_anyone")); + } + + #[test] + fn lark_user_denied_empty() { + let ch = LarkChannel::new( + "id".into(), + "secret".into(), + "token".into(), + 9898, + vec![], + ); + assert!(!ch.is_user_allowed("ou_anyone")); + } + + #[test] + fn lark_parse_challenge() { + let ch = make_channel(); + let payload = serde_json::json!({ + "challenge": "abc123", + "token": "test_verification_token", + "type": "url_verification" + }); + // Challenge payloads should not produce messages + let msgs = ch.parse_event_payload(&payload); + assert!(msgs.is_empty()); + } + + #[test] + fn lark_parse_valid_text_message() { + let ch = make_channel(); + let payload = serde_json::json!({ + "header": { + "event_type": "im.message.receive_v1" + }, + "event": { + "sender": { + "sender_id": { + "open_id": "ou_testuser123" + } + }, + "message": { + "message_type": "text", + "content": "{\"text\":\"Hello ZeroClaw!\"}", + "chat_id": "oc_chat123", + "create_time": "1699999999000" + } + } + }); + + let msgs = ch.parse_event_payload(&payload); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].content, "Hello ZeroClaw!"); + assert_eq!(msgs[0].sender, "oc_chat123"); + assert_eq!(msgs[0].channel, "lark"); + assert_eq!(msgs[0].timestamp, 1_699_999_999); + } + + #[test] + fn lark_parse_unauthorized_user() { + let ch = make_channel(); + let payload = serde_json::json!({ + "header": { "event_type": "im.message.receive_v1" }, + "event": { + "sender": { "sender_id": { "open_id": "ou_unauthorized" } }, + "message": { + "message_type": "text", + "content": "{\"text\":\"spam\"}", + "chat_id": "oc_chat", + "create_time": "1000" + } + } + }); + + let msgs = ch.parse_event_payload(&payload); + assert!(msgs.is_empty()); + } + + #[test] + fn lark_parse_non_text_message_skipped() { + let ch = LarkChannel::new( + "id".into(), + "secret".into(), + "token".into(), + 9898, + vec!["*".into()], + ); + let payload = serde_json::json!({ + "header": { "event_type": "im.message.receive_v1" }, + "event": { + "sender": { "sender_id": { "open_id": "ou_user" } }, + "message": { + "message_type": "image", + "content": "{}", + "chat_id": "oc_chat" + } + } + }); + + let msgs = ch.parse_event_payload(&payload); + assert!(msgs.is_empty()); + } + + #[test] + fn lark_parse_empty_text_skipped() { + let ch = LarkChannel::new( + "id".into(), + "secret".into(), + "token".into(), + 9898, + vec!["*".into()], + ); + let payload = serde_json::json!({ + "header": { "event_type": "im.message.receive_v1" }, + "event": { + "sender": { "sender_id": { "open_id": "ou_user" } }, + "message": { + "message_type": "text", + "content": "{\"text\":\"\"}", + "chat_id": "oc_chat" + } + } + }); + + let msgs = ch.parse_event_payload(&payload); + assert!(msgs.is_empty()); + } + + #[test] + fn lark_parse_wrong_event_type() { + let ch = make_channel(); + let payload = serde_json::json!({ + "header": { "event_type": "im.chat.disbanded_v1" }, + "event": {} + }); + + let msgs = ch.parse_event_payload(&payload); + assert!(msgs.is_empty()); + } + + #[test] + fn lark_parse_missing_sender() { + let ch = LarkChannel::new( + "id".into(), + "secret".into(), + "token".into(), + 9898, + vec!["*".into()], + ); + let payload = serde_json::json!({ + "header": { "event_type": "im.message.receive_v1" }, + "event": { + "message": { + "message_type": "text", + "content": "{\"text\":\"hello\"}", + "chat_id": "oc_chat" + } + } + }); + + let msgs = ch.parse_event_payload(&payload); + assert!(msgs.is_empty()); + } + + #[test] + fn lark_parse_unicode_message() { + let ch = LarkChannel::new( + "id".into(), + "secret".into(), + "token".into(), + 9898, + vec!["*".into()], + ); + let payload = serde_json::json!({ + "header": { "event_type": "im.message.receive_v1" }, + "event": { + "sender": { "sender_id": { "open_id": "ou_user" } }, + "message": { + "message_type": "text", + "content": "{\"text\":\"你好世界 🌍\"}", + "chat_id": "oc_chat", + "create_time": "1000" + } + } + }); + + let msgs = ch.parse_event_payload(&payload); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].content, "你好世界 🌍"); + } + + #[test] + fn lark_parse_missing_event() { + let ch = make_channel(); + let payload = serde_json::json!({ + "header": { "event_type": "im.message.receive_v1" } + }); + + let msgs = ch.parse_event_payload(&payload); + assert!(msgs.is_empty()); + } + + #[test] + fn lark_parse_invalid_content_json() { + let ch = LarkChannel::new( + "id".into(), + "secret".into(), + "token".into(), + 9898, + vec!["*".into()], + ); + let payload = serde_json::json!({ + "header": { "event_type": "im.message.receive_v1" }, + "event": { + "sender": { "sender_id": { "open_id": "ou_user" } }, + "message": { + "message_type": "text", + "content": "not valid json", + "chat_id": "oc_chat" + } + } + }); + + let msgs = ch.parse_event_payload(&payload); + assert!(msgs.is_empty()); + } + + #[test] + fn lark_config_serde() { + use crate::config::schema::LarkConfig; + let lc = LarkConfig { + app_id: "cli_app123".into(), + app_secret: "secret456".into(), + verification_token: "vtoken789".into(), + port: 9898, + allowed_users: vec!["ou_user1".into(), "ou_user2".into()], + }; + let json = serde_json::to_string(&lc).unwrap(); + let parsed: LarkConfig = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.app_id, "cli_app123"); + assert_eq!(parsed.app_secret, "secret456"); + assert_eq!(parsed.verification_token, "vtoken789"); + assert_eq!(parsed.port, 9898); + assert_eq!(parsed.allowed_users.len(), 2); + } + + #[test] + fn lark_config_toml_roundtrip() { + use crate::config::schema::LarkConfig; + let lc = LarkConfig { + app_id: "app".into(), + app_secret: "secret".into(), + verification_token: "tok".into(), + port: 8080, + allowed_users: vec!["*".into()], + }; + let toml_str = toml::to_string(&lc).unwrap(); + let parsed: LarkConfig = toml::from_str(&toml_str).unwrap(); + assert_eq!(parsed.app_id, "app"); + assert_eq!(parsed.port, 8080); + assert_eq!(parsed.allowed_users, vec!["*"]); + } + + #[test] + fn lark_config_default_port() { + use crate::config::schema::LarkConfig; + let json = r#"{"app_id":"a","app_secret":"s","verification_token":"t"}"#; + let parsed: LarkConfig = serde_json::from_str(json).unwrap(); + assert_eq!(parsed.port, 9898); + assert!(parsed.allowed_users.is_empty()); + } + + #[test] + fn lark_parse_fallback_sender_to_open_id() { + // When chat_id is missing, sender should fall back to open_id + let ch = LarkChannel::new( + "id".into(), + "secret".into(), + "token".into(), + 9898, + vec!["*".into()], + ); + let payload = serde_json::json!({ + "header": { "event_type": "im.message.receive_v1" }, + "event": { + "sender": { "sender_id": { "open_id": "ou_user" } }, + "message": { + "message_type": "text", + "content": "{\"text\":\"hello\"}", + "create_time": "1000" + } + } + }); + + let msgs = ch.parse_event_payload(&payload); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].sender, "ou_user"); + } +} diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 1acc502..3ffb1da 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -3,6 +3,7 @@ pub mod discord; pub mod email_channel; pub mod imessage; pub mod irc; +pub mod lark; pub mod matrix; pub mod slack; pub mod telegram; @@ -14,6 +15,7 @@ pub use discord::DiscordChannel; pub use email_channel::EmailChannel; pub use imessage::IMessageChannel; pub use irc::IrcChannel; +pub use lark::LarkChannel; pub use matrix::MatrixChannel; pub use slack::SlackChannel; pub use telegram::TelegramChannel; @@ -506,6 +508,7 @@ pub fn handle_command(command: crate::ChannelCommands, config: &Config) -> Resul ("WhatsApp", config.channels_config.whatsapp.is_some()), ("Email", config.channels_config.email.is_some()), ("IRC", config.channels_config.irc.is_some()), + ("Lark", config.channels_config.lark.is_some()), ] { println!(" {} {name}", if configured { "✅" } else { "❌" }); } @@ -635,6 +638,19 @@ pub async fn doctor_channels(config: Config) -> Result<()> { )); } + if let Some(ref lk) = config.channels_config.lark { + channels.push(( + "Lark", + Arc::new(LarkChannel::new( + lk.app_id.clone(), + lk.app_secret.clone(), + lk.verification_token.clone().unwrap_or_default(), + 9898, + lk.allowed_users.clone(), + )), + )); + } + if channels.is_empty() { println!("No real-time channels configured. Run `zeroclaw onboard` first."); return Ok(()); @@ -871,6 +887,16 @@ pub async fn start_channels(config: Config) -> Result<()> { ))); } + if let Some(ref lk) = config.channels_config.lark { + channels.push(Arc::new(LarkChannel::new( + lk.app_id.clone(), + lk.app_secret.clone(), + lk.verification_token.clone().unwrap_or_default(), + 9898, + lk.allowed_users.clone(), + ))); + } + if channels.is_empty() { println!("No channels configured. Run `zeroclaw onboard` to set up channels."); return Ok(());