zeroclaw/src/channels/lark.rs

1178 lines
41 KiB
Rust

use super::traits::{Channel, ChannelMessage};
use async_trait::async_trait;
use futures_util::{SinkExt, StreamExt};
use prost::Message as ProstMessage;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tokio_tungstenite::tungstenite::Message as WsMsg;
use uuid::Uuid;
/// REST API base URL used when `use_feishu` is true (Feishu, CN).
const FEISHU_BASE_URL: &str = "https://open.feishu.cn/open-apis";
/// Host that serves `/callback/ws/endpoint` when `use_feishu` is true.
const FEISHU_WS_BASE_URL: &str = "https://open.feishu.cn";
/// REST API base URL used when `use_feishu` is false (Lark, international).
const LARK_BASE_URL: &str = "https://open.larksuite.com/open-apis";
/// Host that serves `/callback/ws/endpoint` when `use_feishu` is false.
const LARK_WS_BASE_URL: &str = "https://open.larksuite.com";
// ─────────────────────────────────────────────────────────────────────────────
// Feishu WebSocket long-connection: pbbp2.proto frame codec
// ─────────────────────────────────────────────────────────────────────────────
/// One key/value header entry carried in `PbFrame::headers`.
#[derive(Clone, PartialEq, prost::Message)]
struct PbHeader {
/// Header name; this file reads `"type"`, `"message_id"`, `"sum"` and `"seq"`.
#[prost(string, tag = "1")]
pub key: String,
/// Header value, transported as a string (numeric headers are parsed by the reader).
#[prost(string, tag = "2")]
pub value: String,
}
/// Feishu WS frame (pbbp2.proto).
/// method=0 → CONTROL (ping/pong) method=1 → DATA (events)
#[derive(Clone, PartialEq, prost::Message)]
struct PbFrame {
/// Sequence number; outgoing pings use a locally incremented counter.
#[prost(uint64, tag = "1")]
pub seq_id: u64,
/// Log identifier; outgoing pings send 0.
#[prost(uint64, tag = "2")]
pub log_id: u64,
/// Service id; outgoing pings use the `service_id` query parameter of the WS URL.
#[prost(int32, tag = "3")]
pub service: i32,
/// Frame kind: 0 for control frames, anything else is treated as a data frame.
#[prost(int32, tag = "4")]
pub method: i32,
/// Frame metadata; look up entries with `header_value`.
#[prost(message, repeated, tag = "5")]
pub headers: Vec<PbHeader>,
/// Optional body: JSON client config on pongs, event JSON (possibly a fragment) on data frames.
#[prost(bytes = "vec", optional, tag = "8")]
pub payload: Option<Vec<u8>>,
}
impl PbFrame {
    /// Returns the value of the first header whose key equals `key`,
    /// or the empty string when no such header exists.
    fn header_value<'a>(&'a self, key: &str) -> &'a str {
        for header in &self.headers {
            if header.key == key {
                return header.value.as_str();
            }
        }
        ""
    }
}
/// Server-sent client config (parsed from pong payload)
///
/// Also returned as `ClientConfig` by the WS endpoint request.
#[derive(Debug, serde::Deserialize, Default, Clone)]
struct WsClientConfig {
/// Interval between client pings, interpreted as seconds by the WS loop.
#[serde(rename = "PingInterval")]
ping_interval: Option<u64>,
}
/// POST /callback/ws/endpoint response
#[derive(Debug, serde::Deserialize)]
struct WsEndpointResp {
/// Status code; any non-zero value is treated as a failure.
code: i32,
/// Optional message, included in the error when `code` is non-zero.
#[serde(default)]
msg: Option<String>,
/// Endpoint details; required when `code` is 0.
#[serde(default)]
data: Option<WsEndpoint>,
}
/// `data` member of [`WsEndpointResp`]: where to connect and how often to ping.
#[derive(Debug, serde::Deserialize)]
struct WsEndpoint {
/// WebSocket URL to connect to; its `service_id` query parameter is reused in ping frames.
#[serde(rename = "URL")]
url: String,
/// Initial client config; defaults are used when absent.
#[serde(rename = "ClientConfig")]
client_config: Option<WsClientConfig>,
}
/// LarkEvent envelope (method=1 / type=event payload)
#[derive(Debug, serde::Deserialize)]
struct LarkEvent {
/// Event metadata, used to select which events are handled.
header: LarkEventHeader,
/// Event body, kept as raw JSON until the event type is known.
event: serde_json::Value,
}
/// Header of a [`LarkEvent`].
#[derive(Debug, serde::Deserialize)]
struct LarkEventHeader {
/// Event type; only `im.message.receive_v1` is processed by the WS loop.
event_type: String,
/// Event identifier; deserialized but not read by this file.
#[allow(dead_code)]
event_id: String,
}
/// Body of an `im.message.receive_v1` event as consumed by the WS loop.
#[derive(Debug, serde::Deserialize)]
struct MsgReceivePayload {
/// Who sent the message.
sender: LarkSender,
/// The message itself.
message: LarkMessage,
}
/// Sender section of a received message event.
#[derive(Debug, serde::Deserialize)]
struct LarkSender {
/// Identifiers of the sender.
sender_id: LarkSenderId,
/// Sender kind; messages with `"app"` or `"bot"` are ignored by the WS loop.
#[serde(default)]
sender_type: String,
}
/// Sender identifiers; only `open_id` is read.
#[derive(Debug, serde::Deserialize, Default)]
struct LarkSenderId {
/// Sender open_id, checked against `allowed_users`; treated as `""` when missing.
open_id: Option<String>,
}
/// Message section of a received message event.
#[derive(Debug, serde::Deserialize)]
struct LarkMessage {
/// Message identifier, used as the dedup key.
message_id: String,
/// Chat the message was posted in; becomes `ChannelMessage::sender`.
chat_id: String,
/// Chat kind; `"group"` triggers the mention check.
chat_type: String,
/// Content kind; `"text"` and `"post"` are decoded, everything else is skipped.
message_type: String,
/// Message body as a JSON-encoded string.
#[serde(default)]
content: String,
/// Mentions attached to the message; only emptiness is inspected.
#[serde(default)]
mentions: Vec<serde_json::Value>,
}
/// Heartbeat timeout for WS connection — must be larger than ping_interval (default 120 s).
/// If no binary frame (pong or event) is received within this window, reconnect.
/// The WS loop compares against this value every 10 s.
const WS_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(300);
/// Lark/Feishu channel.
///
/// Supports two receive modes (configured via `receive_mode` in config):
/// - **`websocket`** (default): persistent WSS long-connection; no public URL needed.
/// - **`webhook`**: HTTP callback server; requires a public HTTPS endpoint.
pub struct LarkChannel {
/// App ID, sent with the WS endpoint and tenant-token requests.
app_id: String,
/// App secret, sent alongside `app_id`.
app_secret: String,
/// Token compared against the `token` field of webhook URL-verification challenges.
verification_token: String,
/// TCP port for the webhook server; required in webhook mode.
port: Option<u16>,
/// Allowed sender open_ids; the entry `"*"` allows everyone, an empty list allows no one.
allowed_users: Vec<String>,
/// When true, use Feishu (CN) endpoints; when false, use Lark (international).
use_feishu: bool,
/// How to receive events: WebSocket long-connection or HTTP webhook.
receive_mode: crate::config::schema::LarkReceiveMode,
/// Shared HTTP client for all outgoing requests.
client: reqwest::Client,
/// Cached tenant access token
tenant_token: Arc<RwLock<Option<String>>>,
/// Dedup set: WS message_ids seen in last ~30 min to prevent double-dispatch
ws_seen_ids: Arc<RwLock<HashMap<String, Instant>>>,
}
impl LarkChannel {
/// Create a channel with Feishu endpoints, the default receive mode,
/// an empty token cache and an empty dedup set.
pub fn new(
    app_id: String,
    app_secret: String,
    verification_token: String,
    port: Option<u16>,
    allowed_users: Vec<String>,
) -> Self {
    let receive_mode = crate::config::schema::LarkReceiveMode::default();
    let client = reqwest::Client::new();
    let tenant_token = Arc::new(RwLock::new(None));
    let ws_seen_ids = Arc::new(RwLock::new(HashMap::new()));

    Self {
        app_id,
        app_secret,
        verification_token,
        port,
        allowed_users,
        use_feishu: true,
        receive_mode,
        client,
        tenant_token,
        ws_seen_ids,
    }
}
/// Build from `LarkConfig` (preserves `use_feishu` and `receive_mode`).
pub fn from_config(config: &crate::config::schema::LarkConfig) -> Self {
let mut ch = Self::new(
config.app_id.clone(),
config.app_secret.clone(),
config.verification_token.clone().unwrap_or_default(),
config.port,
config.allowed_users.clone(),
);
ch.use_feishu = config.use_feishu;
ch.receive_mode = config.receive_mode.clone();
ch
}
/// REST API base URL matching the configured region.
fn api_base(&self) -> &'static str {
    if !self.use_feishu {
        return LARK_BASE_URL;
    }
    FEISHU_BASE_URL
}
/// Host used for the WS endpoint request, matching the configured region.
fn ws_base(&self) -> &'static str {
    if !self.use_feishu {
        return LARK_WS_BASE_URL;
    }
    FEISHU_WS_BASE_URL
}
/// POST /callback/ws/endpoint → (wss_url, client_config)
async fn get_ws_endpoint(&self) -> anyhow::Result<(String, WsClientConfig)> {
let resp = self
.client
.post(format!("{}/callback/ws/endpoint", self.ws_base()))
.header("locale", if self.use_feishu { "zh" } else { "en" })
.json(&serde_json::json!({
"AppID": self.app_id,
"AppSecret": self.app_secret,
}))
.send()
.await?
.json::<WsEndpointResp>()
.await?;
if resp.code != 0 {
anyhow::bail!(
"Lark WS endpoint failed: code={} msg={}",
resp.code,
resp.msg.as_deref().unwrap_or("(none)")
);
}
let ep = resp
.data
.ok_or_else(|| anyhow::anyhow!("Lark WS endpoint: empty data"))?;
Ok((ep.url, ep.client_config.unwrap_or_default()))
}
/// WS long-connection event loop. Returns Ok(()) when the connection closes
/// (the caller reconnects).
///
/// Flow: fetch the WS endpoint, connect, send an initial ping, then loop over
/// three event sources: the ping timer, a 10 s heartbeat-timeout check, and
/// incoming frames. Data frames are ACKed, reassembled if fragmented, filtered
/// (event type, sender type, allowlist, dedup, content type, group mention) and
/// forwarded to `tx` as `ChannelMessage`s.
///
/// Returns `Err` if the endpoint request, the connect, or the initial ping fails.
#[allow(clippy::too_many_lines)]
async fn listen_ws(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
let (wss_url, client_config) = self.get_ws_endpoint().await?;
// Pull `service_id=<int>` out of the URL query string; falls back to 0 when absent or unparsable.
let service_id = wss_url
.split('?')
.nth(1)
.and_then(|qs| {
qs.split('&')
.find(|kv| kv.starts_with("service_id="))
.and_then(|kv| kv.split('=').nth(1))
.and_then(|v| v.parse::<i32>().ok())
})
.unwrap_or(0);
// NOTE(review): the full URL (including its query string) is logged at info level —
// confirm it carries no connection credentials.
tracing::info!("Lark: connecting to {wss_url}");
let (ws_stream, _) = tokio_tungstenite::connect_async(&wss_url).await?;
let (mut write, mut read) = ws_stream.split();
tracing::info!("Lark: WS connected (service_id={service_id})");
// Ping interval in seconds: server-provided, default 120, never below 10.
let mut ping_secs = client_config.ping_interval.unwrap_or(120).max(10);
let mut hb_interval = tokio::time::interval(Duration::from_secs(ping_secs));
let mut timeout_check = tokio::time::interval(Duration::from_secs(10));
hb_interval.tick().await; // consume immediate tick
let mut seq: u64 = 0;
let mut last_recv = Instant::now();
// Send initial ping immediately (like the official SDK) so the server
// starts responding with pongs and we can calibrate the ping_interval.
seq = seq.wrapping_add(1);
let initial_ping = PbFrame {
seq_id: seq,
log_id: 0,
service: service_id,
method: 0,
headers: vec![PbHeader {
key: "type".into(),
value: "ping".into(),
}],
payload: None,
};
if write
.send(WsMsg::Binary(initial_ping.encode_to_vec()))
.await
.is_err()
{
anyhow::bail!("Lark: initial ping failed");
}
// message_id → (fragment_slots, created_at) for multi-part reassembly
type FragEntry = (Vec<Option<Vec<u8>>>, Instant);
let mut frag_cache: HashMap<String, FragEntry> = HashMap::new();
loop {
tokio::select! {
// `biased` polls the branches in source order: ping timer, timeout check, then reads.
biased;
_ = hb_interval.tick() => {
seq = seq.wrapping_add(1);
let ping = PbFrame {
seq_id: seq, log_id: 0, service: service_id, method: 0,
headers: vec![PbHeader { key: "type".into(), value: "ping".into() }],
payload: None,
};
if write.send(WsMsg::Binary(ping.encode_to_vec())).await.is_err() {
tracing::warn!("Lark: ping failed, reconnecting");
break;
}
// GC stale fragments > 5 min
let cutoff = Instant::now().checked_sub(Duration::from_secs(300)).unwrap_or(Instant::now());
frag_cache.retain(|_, (_, ts)| *ts > cutoff);
}
_ = timeout_check.tick() => {
if last_recv.elapsed() > WS_HEARTBEAT_TIMEOUT {
tracing::warn!("Lark: heartbeat timeout, reconnecting");
break;
}
}
msg = read.next() => {
// Only binary frames refresh `last_recv`; WS-level pings are answered but do not count.
let raw = match msg {
Some(Ok(WsMsg::Binary(b))) => { last_recv = Instant::now(); b }
Some(Ok(WsMsg::Ping(d))) => { let _ = write.send(WsMsg::Pong(d)).await; continue; }
Some(Ok(WsMsg::Close(_))) | None => { tracing::info!("Lark: WS closed — reconnecting"); break; }
Some(Err(e)) => { tracing::error!("Lark: WS read error: {e}"); break; }
_ => continue,
};
let frame = match PbFrame::decode(&raw[..]) {
Ok(f) => f,
Err(e) => { tracing::error!("Lark: proto decode: {e}"); continue; }
};
// CONTROL frame
if frame.method == 0 {
// A pong may carry an updated client config; re-arm the ping timer if the interval changed.
if frame.header_value("type") == "pong" {
if let Some(p) = &frame.payload {
if let Ok(cfg) = serde_json::from_slice::<WsClientConfig>(p) {
if let Some(secs) = cfg.ping_interval {
let secs = secs.max(10);
if secs != ping_secs {
ping_secs = secs;
// NOTE(review): unlike the initial setup, the new interval's first tick is not
// consumed here, so an extra ping is presumably sent right after recalibration.
hb_interval = tokio::time::interval(Duration::from_secs(ping_secs));
tracing::info!("Lark: ping_interval → {ping_secs}s");
}
}
}
}
}
continue;
}
// DATA frame
let msg_type = frame.header_value("type").to_string();
let msg_id = frame.header_value("message_id").to_string();
// `sum` = total fragment count (default 1), `seq` = index of this fragment (default 0).
let sum = frame.header_value("sum").parse::<usize>().unwrap_or(1);
let seq_num = frame.header_value("seq").parse::<usize>().unwrap_or(0);
// ACK immediately (Feishu requires within 3 s)
{
// The ACK echoes the received frame with a fixed JSON payload and an extra `biz_rt` header.
let mut ack = frame.clone();
ack.payload = Some(br#"{"code":200,"headers":{},"data":[]}"#.to_vec());
ack.headers.push(PbHeader { key: "biz_rt".into(), value: "0".into() });
let _ = write.send(WsMsg::Binary(ack.encode_to_vec())).await;
}
// Fragment reassembly
let sum = if sum == 0 { 1 } else { sum };
// Frames that are unfragmented, lack a message_id, or carry an out-of-range seq are used as-is.
let payload: Vec<u8> = if sum == 1 || msg_id.is_empty() || seq_num >= sum {
frame.payload.clone().unwrap_or_default()
} else {
// NOTE(review): `sum` comes from the peer and sizes this allocation — consider capping it.
let entry = frag_cache.entry(msg_id.clone())
.or_insert_with(|| (vec![None; sum], Instant::now()));
// A changed fragment count for the same message_id restarts reassembly.
if entry.0.len() != sum { *entry = (vec![None; sum], Instant::now()); }
entry.0[seq_num] = frame.payload.clone();
if entry.0.iter().all(|s| s.is_some()) {
let full: Vec<u8> = entry.0.iter()
.flat_map(|s| s.as_deref().unwrap_or(&[]))
.copied().collect();
frag_cache.remove(&msg_id);
full
} else { continue; }
};
if msg_type != "event" { continue; }
let event: LarkEvent = match serde_json::from_slice(&payload) {
Ok(e) => e,
Err(e) => { tracing::error!("Lark: event JSON: {e}"); continue; }
};
if event.header.event_type != "im.message.receive_v1" { continue; }
let recv: MsgReceivePayload = match serde_json::from_value(event.event) {
Ok(r) => r,
Err(e) => { tracing::error!("Lark: payload parse: {e}"); continue; }
};
// Ignore messages sent by apps/bots.
if recv.sender.sender_type == "app" || recv.sender.sender_type == "bot" { continue; }
let sender_open_id = recv.sender.sender_id.open_id.as_deref().unwrap_or("");
if !self.is_user_allowed(sender_open_id) {
tracing::warn!("Lark WS: ignoring {sender_open_id} (not in allowed_users)");
continue;
}
let lark_msg = &recv.message;
// Dedup
{
let now = Instant::now();
let mut seen = self.ws_seen_ids.write().await;
// GC
seen.retain(|_, t| now.duration_since(*t) < Duration::from_secs(30 * 60));
if seen.contains_key(&lark_msg.message_id) {
tracing::debug!("Lark WS: dup {}", lark_msg.message_id);
continue;
}
seen.insert(lark_msg.message_id.clone(), now);
}
// Decode content by type (mirrors clawdbot-feishu parsing)
let text = match lark_msg.message_type.as_str() {
"text" => {
let v: serde_json::Value = match serde_json::from_str(&lark_msg.content) {
Ok(v) => v,
Err(_) => continue,
};
v.get("text").and_then(|t| t.as_str()).unwrap_or("").to_string()
}
"post" => parse_post_content(&lark_msg.content),
_ => { tracing::debug!("Lark WS: skipping unsupported type '{}'", lark_msg.message_type); continue; }
};
// Strip @_user_N placeholders
let text = strip_at_placeholders(&text);
let text = text.trim().to_string();
if text.is_empty() { continue; }
// Group-chat: only respond when explicitly @-mentioned
if lark_msg.chat_type == "group" && !should_respond_in_group(&lark_msg.mentions) {
continue;
}
// `sender` is the chat id (not the user's open_id); the timestamp is local receive time in seconds.
let channel_msg = ChannelMessage {
id: Uuid::new_v4().to_string(),
sender: lark_msg.chat_id.clone(),
content: text,
channel: "lark".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
};
tracing::debug!("Lark WS: message in {}", lark_msg.chat_id);
// A closed receiver ends the loop.
if tx.send(channel_msg).await.is_err() { break; }
}
}
}
Ok(())
}
/// Returns true when `open_id` is listed in `allowed_users` or the list
/// contains the wildcard `"*"`; an empty list allows nobody.
fn is_user_allowed(&self, open_id: &str) -> bool {
    for allowed in &self.allowed_users {
        if allowed == "*" || allowed == open_id {
            return true;
        }
    }
    false
}
/// Get or refresh tenant access token
async fn get_tenant_access_token(&self) -> anyhow::Result<String> {
// Check cache first
{
let cached = self.tenant_token.read().await;
if let Some(ref token) = *cached {
return Ok(token.clone());
}
}
let url = format!("{FEISHU_BASE_URL}/auth/v3/tenant_access_token/internal");
let body = serde_json::json!({
"app_id": self.app_id,
"app_secret": self.app_secret,
});
let resp = self.client.post(&url).json(&body).send().await?;
let data: serde_json::Value = resp.json().await?;
let code = data.get("code").and_then(|c| c.as_i64()).unwrap_or(-1);
if code != 0 {
let msg = data
.get("msg")
.and_then(|m| m.as_str())
.unwrap_or("unknown error");
anyhow::bail!("Lark tenant_access_token failed: {msg}");
}
let token = data
.get("tenant_access_token")
.and_then(|t| t.as_str())
.ok_or_else(|| anyhow::anyhow!("missing tenant_access_token in response"))?
.to_string();
// Cache it
{
let mut cached = self.tenant_token.write().await;
*cached = Some(token.clone());
}
Ok(token)
}
/// Drop the cached tenant token so the next lookup fetches a fresh one
/// (called on 401).
async fn invalidate_token(&self) {
    *self.tenant_token.write().await = None;
}
/// Parse an event callback payload and extract text messages.
///
/// Expected (event v2) shape:
/// `{ "header": { "event_type": "im.message.receive_v1" }, "event": { "message": { ... }, "sender": { ... } } }`
///
/// Returns an empty vector unless the payload is an `im.message.receive_v1`
/// event from an allowed sender carrying a non-empty `text` message;
/// otherwise returns exactly one `ChannelMessage`.
pub fn parse_event_payload(&self, payload: &serde_json::Value) -> Vec<ChannelMessage> {
    /// String value at a JSON pointer, if present and a string.
    fn str_at<'v>(root: &'v serde_json::Value, path: &str) -> Option<&'v str> {
        root.pointer(path).and_then(|v| v.as_str())
    }

    if str_at(payload, "/header/event_type").unwrap_or("") != "im.message.receive_v1" {
        return Vec::new();
    }
    let Some(event) = payload.get("event") else {
        return Vec::new();
    };

    // Sender must be present and on the allowlist.
    let open_id = str_at(event, "/sender/sender_id/open_id").unwrap_or("");
    if open_id.is_empty() {
        return Vec::new();
    }
    if !self.is_user_allowed(open_id) {
        tracing::warn!("Lark: ignoring message from unauthorized user: {open_id}");
        return Vec::new();
    }

    // Only plain text messages are handled here.
    let msg_type = str_at(event, "/message/message_type").unwrap_or("");
    if msg_type != "text" {
        tracing::debug!("Lark: skipping non-text message type: {msg_type}");
        return Vec::new();
    }

    // `content` is itself a JSON string like "{\"text\":\"hello\"}".
    let raw_content = str_at(event, "/message/content").unwrap_or("");
    let text = match serde_json::from_str::<serde_json::Value>(raw_content) {
        Ok(body) => body
            .get("text")
            .and_then(|t| t.as_str())
            .unwrap_or("")
            .to_owned(),
        Err(_) => String::new(),
    };
    if text.is_empty() {
        return Vec::new();
    }

    // `create_time` is a millisecond string; fall back to the current time in seconds.
    let created_ms = str_at(event, "/message/create_time").and_then(|t| t.parse::<u64>().ok());
    let timestamp = match created_ms {
        Some(ms) => ms / 1000,
        None => std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs(),
    };

    // Replies are addressed to the chat; without a chat id, fall back to the sender.
    let sender = str_at(event, "/message/chat_id")
        .unwrap_or(open_id)
        .to_string();

    vec![ChannelMessage {
        id: Uuid::new_v4().to_string(),
        sender,
        content: text,
        channel: "lark".to_string(),
        timestamp,
    }]
}
}
#[async_trait]
impl Channel for LarkChannel {
    /// Channel identifier; matches the `channel` field of emitted messages.
    fn name(&self) -> &str {
        "lark"
    }

    /// Send `message` as a text message to the chat identified by `recipient`
    /// (a `chat_id`).
    ///
    /// The request goes to the region-specific API base (`api_base()`). On an
    /// HTTP 401 the cached tenant token is invalidated and the request is
    /// retried once with a fresh token.
    ///
    /// # Errors
    /// Fails when a token cannot be obtained, the request cannot be sent, or
    /// the final response has a non-success HTTP status.
    async fn send(&self, message: &str, recipient: &str) -> anyhow::Result<()> {
        // Region-aware URL: Lark (international) apps must not post to the Feishu host.
        let url = format!(
            "{}/im/v1/messages?receive_id_type=chat_id",
            self.api_base()
        );
        // `content` must be a JSON-encoded string, not a nested object.
        let content = serde_json::json!({ "text": message }).to_string();
        let body = serde_json::json!({
            "receive_id": recipient,
            "msg_type": "text",
            "content": content,
        });
        // Single place that builds the request, shared by the first attempt and the retry.
        let request_with = |token: &str| {
            self.client
                .post(&url)
                .header("Authorization", format!("Bearer {token}"))
                .header("Content-Type", "application/json; charset=utf-8")
                .json(&body)
        };

        let token = self.get_tenant_access_token().await?;
        let resp = request_with(&token).send().await?;

        if resp.status().as_u16() == 401 {
            // Token expired, invalidate and retry once
            self.invalidate_token().await;
            let new_token = self.get_tenant_access_token().await?;
            let retry_resp = request_with(&new_token).send().await?;
            if !retry_resp.status().is_success() {
                let err = retry_resp.text().await.unwrap_or_default();
                anyhow::bail!("Lark send failed after token refresh: {err}");
            }
            return Ok(());
        }

        if !resp.status().is_success() {
            let err = resp.text().await.unwrap_or_default();
            anyhow::bail!("Lark send failed: {err}");
        }
        Ok(())
    }

    /// Receive events using the configured `receive_mode` and forward them to `tx`.
    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
        use crate::config::schema::LarkReceiveMode;
        match self.receive_mode {
            LarkReceiveMode::Websocket => self.listen_ws(tx).await,
            LarkReceiveMode::Webhook => self.listen_http(tx).await,
        }
    }

    /// Healthy when a tenant access token is available (cached or freshly fetched).
    async fn health_check(&self) -> bool {
        self.get_tenant_access_token().await.is_ok()
    }
}
impl LarkChannel {
    /// HTTP callback server (legacy — requires a public endpoint).
    /// Use `listen()` (WS long-connection) for new deployments.
    ///
    /// Serves `POST /lark` on `0.0.0.0:<port>`. URL-verification challenges
    /// are echoed back; event callbacks are parsed with
    /// `parse_event_payload` and forwarded to `tx`. When a verification token
    /// is configured, event callbacks must carry the same token
    /// (`header.token`, or top-level `token`) or they are rejected with 403.
    ///
    /// # Errors
    /// Fails when `port` is not configured, the listener cannot bind, or the
    /// server stops with an error.
    pub async fn listen_http(
        &self,
        tx: tokio::sync::mpsc::Sender<ChannelMessage>,
    ) -> anyhow::Result<()> {
        use axum::{extract::State, routing::post, Json, Router};

        /// Shared state handed to every request.
        #[derive(Clone)]
        struct AppState {
            verification_token: String,
            channel: Arc<LarkChannel>,
            tx: tokio::sync::mpsc::Sender<ChannelMessage>,
        }

        async fn handle_event(
            State(state): State<AppState>,
            Json(payload): Json<serde_json::Value>,
        ) -> axum::response::Response {
            use axum::http::StatusCode;
            use axum::response::IntoResponse;

            // URL verification challenge
            if let Some(challenge) = payload.get("challenge").and_then(|c| c.as_str()) {
                // Verify token if present
                let token_ok = payload
                    .get("token")
                    .and_then(|t| t.as_str())
                    .map_or(true, |t| t == state.verification_token);
                if !token_ok {
                    return (StatusCode::FORBIDDEN, "invalid token").into_response();
                }
                let resp = serde_json::json!({ "challenge": challenge });
                return (StatusCode::OK, Json(resp)).into_response();
            }

            // This endpoint is publicly reachable: without this check anyone could
            // POST forged events. Only enforced when a token is configured.
            if !state.verification_token.is_empty() {
                let presented = payload
                    .pointer("/header/token")
                    .or_else(|| payload.get("token"))
                    .and_then(|t| t.as_str());
                if presented != Some(state.verification_token.as_str()) {
                    tracing::warn!(
                        "Lark: rejecting event callback with missing or invalid verification token"
                    );
                    return (StatusCode::FORBIDDEN, "invalid token").into_response();
                }
            }

            // Parse event messages
            let messages = state.channel.parse_event_payload(&payload);
            for msg in messages {
                if state.tx.send(msg).await.is_err() {
                    tracing::warn!("Lark: message channel closed");
                    break;
                }
            }
            (StatusCode::OK, "ok").into_response()
        }

        let port = self.port.ok_or_else(|| {
            anyhow::anyhow!("Lark webhook mode requires `port` to be set in [channels_config.lark]")
        })?;

        // The handler needs an owned channel; mirror the region and receive mode
        // so the copy behaves like `self`.
        let mut handler_channel = LarkChannel::new(
            self.app_id.clone(),
            self.app_secret.clone(),
            self.verification_token.clone(),
            None,
            self.allowed_users.clone(),
        );
        handler_channel.use_feishu = self.use_feishu;
        handler_channel.receive_mode = self.receive_mode.clone();

        let state = AppState {
            verification_token: self.verification_token.clone(),
            channel: Arc::new(handler_channel),
            tx,
        };
        let app = Router::new()
            .route("/lark", post(handle_event))
            .with_state(state);

        let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port));
        tracing::info!("Lark event callback server listening on {addr}");
        let listener = tokio::net::TcpListener::bind(addr).await?;
        axum::serve(listener, app).await?;
        Ok(())
    }
}
// ─────────────────────────────────────────────────────────────────────────────
// WS helper functions
// ─────────────────────────────────────────────────────────────────────────────
/// Flatten a Feishu `post` rich-text message to plain text.
///
/// Picks the `zh_cn` locale, then `en_us`, then the first object-valued
/// entry. Each paragraph becomes one line: `text` elements contribute their
/// text, `a` elements their text (or `href` when the text is empty or
/// missing), `at` elements `@<user_name|user_id|user>`, and `img` elements a
/// fixed marker. Returns a fixed placeholder when the content is not valid
/// JSON, has no usable locale, or flattens to nothing.
fn parse_post_content(content: &str) -> String {
    const PLACEHOLDER: &str = "[富文本消息]";

    /// String attribute `name` of a rich-text element, if present.
    fn attr<'v>(element: &'v serde_json::Value, name: &str) -> Option<&'v str> {
        element.get(name).and_then(|v| v.as_str())
    }

    let parsed: serde_json::Value = match serde_json::from_str(content) {
        Ok(value) => value,
        Err(_) => return PLACEHOLDER.to_string(),
    };

    let mut locale = parsed.get("zh_cn");
    if locale.is_none() {
        locale = parsed.get("en_us");
    }
    if locale.is_none() {
        locale = parsed
            .as_object()
            .and_then(|map| map.values().find(|v| v.is_object()));
    }
    let Some(locale) = locale else {
        return PLACEHOLDER.to_string();
    };

    let mut flat = String::new();
    let paragraphs = locale
        .get("content")
        .and_then(|c| c.as_array())
        .into_iter()
        .flatten();
    // Paragraphs that are not arrays are skipped entirely (no line break emitted).
    for elements in paragraphs.filter_map(|para| para.as_array()) {
        for element in elements {
            match attr(element, "tag").unwrap_or("") {
                "text" => flat.push_str(attr(element, "text").unwrap_or("")),
                "a" => {
                    let label = attr(element, "text")
                        .filter(|s| !s.is_empty())
                        .or_else(|| attr(element, "href"))
                        .unwrap_or("");
                    flat.push_str(label);
                }
                "at" => {
                    let who = attr(element, "user_name")
                        .or_else(|| attr(element, "user_id"))
                        .unwrap_or("user");
                    flat.push('@');
                    flat.push_str(who);
                }
                "img" => flat.push_str("[图片]"),
                _ => {}
            }
        }
        flat.push('\n');
    }

    match flat.trim() {
        "" => PLACEHOLDER.to_string(),
        trimmed => trimmed.to_string(),
    }
}
/// Remove `@_user_N` placeholder tokens injected by Feishu in group chats.
///
/// A placeholder is `@_user_` followed by zero or more ASCII digits. The
/// placeholder and at most one directly following space are removed; every
/// other character, including any other `@`, is kept unchanged.
///
/// The previous implementation consumed one character too many after the
/// placeholder, so `"@_user_1hello"` lost its `h`. It only appeared to work
/// when the placeholder was followed by a space.
fn strip_at_placeholders(text: &str) -> String {
    let mut result = String::with_capacity(text.len());
    let mut rest = text;

    while let Some(at) = rest.find('@') {
        // Everything before the '@' is ordinary text.
        result.push_str(&rest[..at]);
        let after_at = &rest[at + 1..];

        match after_at.strip_prefix("_user_") {
            Some(tail) => {
                // Drop the numeric suffix, then a single separating space if present.
                let tail = tail.trim_start_matches(|c: char| c.is_ascii_digit());
                rest = tail.strip_prefix(' ').unwrap_or(tail);
            }
            None => {
                // A plain '@' (e.g. in an e-mail address) is kept.
                result.push('@');
                rest = after_at;
            }
        }
    }

    result.push_str(rest);
    result
}
/// In group chats, respond only when the message carries at least one mention.
///
/// NOTE(review): any mention counts; the entries are not checked to be the bot itself.
fn should_respond_in_group(mentions: &[serde_json::Value]) -> bool {
    !matches!(mentions, [])
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::{json, Value};

    /// Channel that allows exactly one known user.
    fn make_channel() -> LarkChannel {
        LarkChannel::new(
            "cli_test_app_id".into(),
            "test_app_secret".into(),
            "test_verification_token".into(),
            None,
            vec!["ou_testuser123".into()],
        )
    }

    /// Channel with placeholder credentials and the given allowlist.
    fn channel_allowing(users: &[&str]) -> LarkChannel {
        LarkChannel::new(
            "id".into(),
            "secret".into(),
            "token".into(),
            None,
            users.iter().map(|u| (*u).to_string()).collect(),
        )
    }

    /// Channel that allows every sender.
    fn wildcard_channel() -> LarkChannel {
        channel_allowing(&["*"])
    }

    /// Build an `im.message.receive_v1` payload; `open_id = None` omits the sender.
    fn receive_event(open_id: Option<&str>, message: Value) -> Value {
        let mut event = json!({ "message": message });
        if let Some(id) = open_id {
            event["sender"] = json!({ "sender_id": { "open_id": id } });
        }
        json!({
            "header": { "event_type": "im.message.receive_v1" },
            "event": event
        })
    }

    #[test]
    fn lark_channel_name() {
        assert_eq!(make_channel().name(), "lark");
    }

    #[test]
    fn lark_user_allowed_exact() {
        let ch = make_channel();
        assert!(ch.is_user_allowed("ou_testuser123"));
        assert!(!ch.is_user_allowed("ou_other"));
    }

    #[test]
    fn lark_user_allowed_wildcard() {
        assert!(wildcard_channel().is_user_allowed("ou_anyone"));
    }

    #[test]
    fn lark_user_denied_empty() {
        assert!(!channel_allowing(&[]).is_user_allowed("ou_anyone"));
    }

    #[test]
    fn lark_parse_challenge() {
        let payload = json!({
            "challenge": "abc123",
            "token": "test_verification_token",
            "type": "url_verification"
        });
        // Challenge payloads should not produce messages
        assert!(make_channel().parse_event_payload(&payload).is_empty());
    }

    #[test]
    fn lark_parse_valid_text_message() {
        let payload = receive_event(
            Some("ou_testuser123"),
            json!({
                "message_type": "text",
                "content": "{\"text\":\"Hello ZeroClaw!\"}",
                "chat_id": "oc_chat123",
                "create_time": "1699999999000"
            }),
        );
        let msgs = make_channel().parse_event_payload(&payload);
        assert_eq!(msgs.len(), 1);
        let msg = &msgs[0];
        assert_eq!(msg.content, "Hello ZeroClaw!");
        assert_eq!(msg.sender, "oc_chat123");
        assert_eq!(msg.channel, "lark");
        assert_eq!(msg.timestamp, 1_699_999_999);
    }

    #[test]
    fn lark_parse_unauthorized_user() {
        let payload = receive_event(
            Some("ou_unauthorized"),
            json!({
                "message_type": "text",
                "content": "{\"text\":\"spam\"}",
                "chat_id": "oc_chat",
                "create_time": "1000"
            }),
        );
        assert!(make_channel().parse_event_payload(&payload).is_empty());
    }

    #[test]
    fn lark_parse_non_text_message_skipped() {
        let payload = receive_event(
            Some("ou_user"),
            json!({
                "message_type": "image",
                "content": "{}",
                "chat_id": "oc_chat"
            }),
        );
        assert!(wildcard_channel().parse_event_payload(&payload).is_empty());
    }

    #[test]
    fn lark_parse_empty_text_skipped() {
        let payload = receive_event(
            Some("ou_user"),
            json!({
                "message_type": "text",
                "content": "{\"text\":\"\"}",
                "chat_id": "oc_chat"
            }),
        );
        assert!(wildcard_channel().parse_event_payload(&payload).is_empty());
    }

    #[test]
    fn lark_parse_wrong_event_type() {
        let payload = json!({
            "header": { "event_type": "im.chat.disbanded_v1" },
            "event": {}
        });
        assert!(make_channel().parse_event_payload(&payload).is_empty());
    }

    #[test]
    fn lark_parse_missing_sender() {
        let payload = receive_event(
            None,
            json!({
                "message_type": "text",
                "content": "{\"text\":\"hello\"}",
                "chat_id": "oc_chat"
            }),
        );
        assert!(wildcard_channel().parse_event_payload(&payload).is_empty());
    }

    #[test]
    fn lark_parse_unicode_message() {
        let payload = receive_event(
            Some("ou_user"),
            json!({
                "message_type": "text",
                "content": "{\"text\":\"你好世界 🌍\"}",
                "chat_id": "oc_chat",
                "create_time": "1000"
            }),
        );
        let msgs = wildcard_channel().parse_event_payload(&payload);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, "你好世界 🌍");
    }

    #[test]
    fn lark_parse_missing_event() {
        let payload = json!({
            "header": { "event_type": "im.message.receive_v1" }
        });
        assert!(make_channel().parse_event_payload(&payload).is_empty());
    }

    #[test]
    fn lark_parse_invalid_content_json() {
        let payload = receive_event(
            Some("ou_user"),
            json!({
                "message_type": "text",
                "content": "not valid json",
                "chat_id": "oc_chat"
            }),
        );
        assert!(wildcard_channel().parse_event_payload(&payload).is_empty());
    }

    #[test]
    fn lark_config_serde() {
        use crate::config::schema::{LarkConfig, LarkReceiveMode};
        let original = LarkConfig {
            app_id: "cli_app123".into(),
            app_secret: "secret456".into(),
            encrypt_key: None,
            verification_token: Some("vtoken789".into()),
            allowed_users: vec!["ou_user1".into(), "ou_user2".into()],
            use_feishu: false,
            receive_mode: LarkReceiveMode::default(),
            port: None,
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: LarkConfig = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.app_id, "cli_app123");
        assert_eq!(decoded.app_secret, "secret456");
        assert_eq!(decoded.verification_token.as_deref(), Some("vtoken789"));
        assert_eq!(decoded.allowed_users.len(), 2);
    }

    #[test]
    fn lark_config_toml_roundtrip() {
        use crate::config::schema::{LarkConfig, LarkReceiveMode};
        let original = LarkConfig {
            app_id: "app".into(),
            app_secret: "secret".into(),
            encrypt_key: None,
            verification_token: Some("tok".into()),
            allowed_users: vec!["*".into()],
            use_feishu: false,
            receive_mode: LarkReceiveMode::Webhook,
            port: Some(9898),
        };
        let encoded = toml::to_string(&original).unwrap();
        let decoded: LarkConfig = toml::from_str(&encoded).unwrap();
        assert_eq!(decoded.app_id, "app");
        assert_eq!(decoded.verification_token.as_deref(), Some("tok"));
        assert_eq!(decoded.allowed_users, vec!["*"]);
    }

    #[test]
    fn lark_config_defaults_optional_fields() {
        use crate::config::schema::LarkConfig;
        let decoded: LarkConfig =
            serde_json::from_str(r#"{"app_id":"a","app_secret":"s"}"#).unwrap();
        assert!(decoded.verification_token.is_none());
        assert!(decoded.allowed_users.is_empty());
    }

    #[test]
    fn lark_parse_fallback_sender_to_open_id() {
        // When chat_id is missing, sender should fall back to open_id
        let payload = receive_event(
            Some("ou_user"),
            json!({
                "message_type": "text",
                "content": "{\"text\":\"hello\"}",
                "create_time": "1000"
            }),
        );
        let msgs = wildcard_channel().parse_event_payload(&payload);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].sender, "ou_user");
    }
}