Merge branch 'main' into pr-484-clean

This commit is contained in:
Will Sarg 2026-02-17 08:54:24 -05:00 committed by GitHub
commit ee05d62ce4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
90 changed files with 6937 additions and 1403 deletions

View file

@ -40,6 +40,7 @@ impl Channel for CliChannel {
let msg = ChannelMessage {
id: Uuid::new_v4().to_string(),
sender: "user".to_string(),
reply_target: "user".to_string(),
content: line,
channel: "cli".to_string(),
timestamp: std::time::SystemTime::now()
@ -90,12 +91,14 @@ mod tests {
let msg = ChannelMessage {
id: "test-id".into(),
sender: "user".into(),
reply_target: "user".into(),
content: "hello".into(),
channel: "cli".into(),
timestamp: 1_234_567_890,
};
assert_eq!(msg.id, "test-id");
assert_eq!(msg.sender, "user");
assert_eq!(msg.reply_target, "user");
assert_eq!(msg.content, "hello");
assert_eq!(msg.channel, "cli");
assert_eq!(msg.timestamp, 1_234_567_890);
@ -106,6 +109,7 @@ mod tests {
let msg = ChannelMessage {
id: "id".into(),
sender: "s".into(),
reply_target: "s".into(),
content: "c".into(),
channel: "ch".into(),
timestamp: 0,

View file

@ -7,7 +7,7 @@ use tokio::sync::RwLock;
use tokio_tungstenite::tungstenite::Message;
use uuid::Uuid;
/// DingTalk (钉钉) channel — connects via Stream Mode WebSocket for real-time messages.
/// DingTalk channel — connects via Stream Mode WebSocket for real-time messages.
/// Replies are sent through per-message session webhook URLs.
pub struct DingTalkChannel {
client_id: String,
@ -64,6 +64,18 @@ impl DingTalkChannel {
let gw: GatewayResponse = resp.json().await?;
Ok(gw)
}
fn resolve_reply_target(
sender_id: &str,
conversation_type: &str,
conversation_id: Option<&str>,
) -> String {
if conversation_type == "1" {
sender_id.to_string()
} else {
conversation_id.unwrap_or(sender_id).to_string()
}
}
}
#[async_trait]
@ -193,14 +205,11 @@ impl Channel for DingTalkChannel {
.unwrap_or("1");
// Private chat uses sender ID, group chat uses conversation ID
let chat_id = if conversation_type == "1" {
sender_id.to_string()
} else {
data.get("conversationId")
.and_then(|c| c.as_str())
.unwrap_or(sender_id)
.to_string()
};
let chat_id = Self::resolve_reply_target(
sender_id,
conversation_type,
data.get("conversationId").and_then(|c| c.as_str()),
);
// Store session webhook for later replies
if let Some(webhook) = data.get("sessionWebhook").and_then(|w| w.as_str()) {
@ -229,6 +238,7 @@ impl Channel for DingTalkChannel {
let channel_msg = ChannelMessage {
id: Uuid::new_v4().to_string(),
sender: sender_id.to_string(),
reply_target: chat_id,
content: content.to_string(),
channel: "dingtalk".to_string(),
timestamp: std::time::SystemTime::now()
@ -305,4 +315,22 @@ client_secret = "secret"
let config: crate::config::schema::DingTalkConfig = toml::from_str(toml_str).unwrap();
assert!(config.allowed_users.is_empty());
}
#[test]
fn test_resolve_reply_target_private_chat_uses_sender_id() {
let target = DingTalkChannel::resolve_reply_target("staff_1", "1", Some("conv_1"));
assert_eq!(target, "staff_1");
}
#[test]
fn test_resolve_reply_target_group_chat_uses_conversation_id() {
let target = DingTalkChannel::resolve_reply_target("staff_1", "2", Some("conv_1"));
assert_eq!(target, "conv_1");
}
#[test]
fn test_resolve_reply_target_group_chat_falls_back_to_sender_id() {
let target = DingTalkChannel::resolve_reply_target("staff_1", "2", None);
assert_eq!(target, "staff_1");
}
}

View file

@ -11,6 +11,7 @@ pub struct DiscordChannel {
guild_id: Option<String>,
allowed_users: Vec<String>,
listen_to_bots: bool,
mention_only: bool,
client: reqwest::Client,
typing_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
}
@ -21,12 +22,14 @@ impl DiscordChannel {
guild_id: Option<String>,
allowed_users: Vec<String>,
listen_to_bots: bool,
mention_only: bool,
) -> Self {
Self {
bot_token,
guild_id,
allowed_users,
listen_to_bots,
mention_only,
client: reqwest::Client::new(),
typing_handle: std::sync::Mutex::new(None),
}
@ -343,6 +346,22 @@ impl Channel for DiscordChannel {
continue;
}
// Skip messages that don't @-mention the bot (when mention_only is enabled)
if self.mention_only {
let mention_tag = format!("<@{bot_user_id}>");
if !content.contains(&mention_tag) {
continue;
}
}
// Strip the bot mention from content so the agent sees clean text
let clean_content = if self.mention_only {
let mention_tag = format!("<@{bot_user_id}>");
content.replace(&mention_tag, "").trim().to_string()
} else {
content.to_string()
};
let message_id = d.get("id").and_then(|i| i.as_str()).unwrap_or("");
let channel_id = d.get("channel_id").and_then(|c| c.as_str()).unwrap_or("").to_string();
@ -353,6 +372,11 @@ impl Channel for DiscordChannel {
format!("discord_{message_id}")
},
sender: author_id.to_string(),
reply_target: if channel_id.is_empty() {
author_id.to_string()
} else {
channel_id
},
content: content.to_string(),
channel: channel_id,
timestamp: std::time::SystemTime::now()
@ -423,7 +447,7 @@ mod tests {
#[test]
fn discord_channel_name() {
let ch = DiscordChannel::new("fake".into(), None, vec![], false);
let ch = DiscordChannel::new("fake".into(), None, vec![], false, false);
assert_eq!(ch.name(), "discord");
}
@ -444,21 +468,27 @@ mod tests {
#[test]
fn empty_allowlist_denies_everyone() {
let ch = DiscordChannel::new("fake".into(), None, vec![], false);
let ch = DiscordChannel::new("fake".into(), None, vec![], false, false);
assert!(!ch.is_user_allowed("12345"));
assert!(!ch.is_user_allowed("anyone"));
}
#[test]
fn wildcard_allows_everyone() {
let ch = DiscordChannel::new("fake".into(), None, vec!["*".into()], false);
let ch = DiscordChannel::new("fake".into(), None, vec!["*".into()], false, false);
assert!(ch.is_user_allowed("12345"));
assert!(ch.is_user_allowed("anyone"));
}
#[test]
fn specific_allowlist_filters() {
let ch = DiscordChannel::new("fake".into(), None, vec!["111".into(), "222".into()], false);
let ch = DiscordChannel::new(
"fake".into(),
None,
vec!["111".into(), "222".into()],
false,
false,
);
assert!(ch.is_user_allowed("111"));
assert!(ch.is_user_allowed("222"));
assert!(!ch.is_user_allowed("333"));
@ -467,7 +497,7 @@ mod tests {
#[test]
fn allowlist_is_exact_match_not_substring() {
let ch = DiscordChannel::new("fake".into(), None, vec!["111".into()], false);
let ch = DiscordChannel::new("fake".into(), None, vec!["111".into()], false, false);
assert!(!ch.is_user_allowed("1111"));
assert!(!ch.is_user_allowed("11"));
assert!(!ch.is_user_allowed("0111"));
@ -475,20 +505,26 @@ mod tests {
#[test]
fn allowlist_empty_string_user_id() {
let ch = DiscordChannel::new("fake".into(), None, vec!["111".into()], false);
let ch = DiscordChannel::new("fake".into(), None, vec!["111".into()], false, false);
assert!(!ch.is_user_allowed(""));
}
#[test]
fn allowlist_with_wildcard_and_specific() {
let ch = DiscordChannel::new("fake".into(), None, vec!["111".into(), "*".into()], false);
let ch = DiscordChannel::new(
"fake".into(),
None,
vec!["111".into(), "*".into()],
false,
false,
);
assert!(ch.is_user_allowed("111"));
assert!(ch.is_user_allowed("anyone_else"));
}
#[test]
fn allowlist_case_sensitive() {
let ch = DiscordChannel::new("fake".into(), None, vec!["ABC".into()], false);
let ch = DiscordChannel::new("fake".into(), None, vec!["ABC".into()], false, false);
assert!(ch.is_user_allowed("ABC"));
assert!(!ch.is_user_allowed("abc"));
assert!(!ch.is_user_allowed("Abc"));
@ -663,14 +699,14 @@ mod tests {
#[test]
fn typing_handle_starts_as_none() {
let ch = DiscordChannel::new("fake".into(), None, vec![], false);
let ch = DiscordChannel::new("fake".into(), None, vec![], false, false);
let guard = ch.typing_handle.lock().unwrap();
assert!(guard.is_none());
}
#[tokio::test]
async fn start_typing_sets_handle() {
let ch = DiscordChannel::new("fake".into(), None, vec![], false);
let ch = DiscordChannel::new("fake".into(), None, vec![], false, false);
let _ = ch.start_typing("123456").await;
let guard = ch.typing_handle.lock().unwrap();
assert!(guard.is_some());
@ -678,7 +714,7 @@ mod tests {
#[tokio::test]
async fn stop_typing_clears_handle() {
let ch = DiscordChannel::new("fake".into(), None, vec![], false);
let ch = DiscordChannel::new("fake".into(), None, vec![], false, false);
let _ = ch.start_typing("123456").await;
let _ = ch.stop_typing("123456").await;
let guard = ch.typing_handle.lock().unwrap();
@ -687,14 +723,14 @@ mod tests {
#[tokio::test]
async fn stop_typing_is_idempotent() {
let ch = DiscordChannel::new("fake".into(), None, vec![], false);
let ch = DiscordChannel::new("fake".into(), None, vec![], false, false);
assert!(ch.stop_typing("123456").await.is_ok());
assert!(ch.stop_typing("123456").await.is_ok());
}
#[tokio::test]
async fn start_typing_replaces_existing_task() {
let ch = DiscordChannel::new("fake".into(), None, vec![], false);
let ch = DiscordChannel::new("fake".into(), None, vec![], false, false);
let _ = ch.start_typing("111").await;
let _ = ch.start_typing("222").await;
let guard = ch.typing_handle.lock().unwrap();

View file

@ -10,6 +10,7 @@
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use lettre::message::SinglePart;
use lettre::transport::smtp::authentication::Credentials;
use lettre::{Message, SmtpTransport, Transport};
use mail_parser::{MessageParser, MimeHeaders};
@ -39,7 +40,7 @@ pub struct EmailConfig {
pub imap_folder: String,
/// SMTP server hostname
pub smtp_host: String,
/// SMTP server port (default: 587 for STARTTLS)
/// SMTP server port (default: 465 for TLS)
#[serde(default = "default_smtp_port")]
pub smtp_port: u16,
/// Use TLS for SMTP (default: true)
@ -63,7 +64,7 @@ fn default_imap_port() -> u16 {
993
}
fn default_smtp_port() -> u16 {
587
465
}
fn default_imap_folder() -> String {
"INBOX".into()
@ -389,7 +390,7 @@ impl Channel for EmailChannel {
.from(self.config.from_address.parse()?)
.to(recipient.parse()?)
.subject(subject)
.body(body.to_string())?;
.singlepart(SinglePart::plain(body.to_string()))?;
let transport = self.create_smtp_transport()?;
transport.send(&email)?;
@ -427,6 +428,7 @@ impl Channel for EmailChannel {
} // MutexGuard dropped before await
let msg = ChannelMessage {
id,
reply_target: sender.clone(),
sender,
content,
channel: "email".to_string(),
@ -464,6 +466,18 @@ impl Channel for EmailChannel {
mod tests {
use super::*;
#[test]
fn default_smtp_port_uses_tls_port() {
assert_eq!(default_smtp_port(), 465);
}
#[test]
fn email_config_default_uses_tls_smtp_defaults() {
let config = EmailConfig::default();
assert_eq!(config.smtp_port, 465);
assert!(config.smtp_tls);
}
#[test]
fn build_imap_tls_config_succeeds() {
let tls_config =
@ -504,7 +518,7 @@ mod tests {
assert_eq!(config.imap_port, 993);
assert_eq!(config.imap_folder, "INBOX");
assert_eq!(config.smtp_host, "");
assert_eq!(config.smtp_port, 587);
assert_eq!(config.smtp_port, 465);
assert!(config.smtp_tls);
assert_eq!(config.username, "");
assert_eq!(config.password, "");
@ -765,8 +779,8 @@ mod tests {
}
#[test]
fn default_smtp_port_returns_587() {
assert_eq!(default_smtp_port(), 587);
fn default_smtp_port_returns_465() {
assert_eq!(default_smtp_port(), 465);
}
#[test]
@ -822,7 +836,7 @@ mod tests {
let config: EmailConfig = serde_json::from_str(json).unwrap();
assert_eq!(config.imap_port, 993); // default
assert_eq!(config.smtp_port, 587); // default
assert_eq!(config.smtp_port, 465); // default
assert!(config.smtp_tls); // default
assert_eq!(config.poll_interval_secs, 60); // default
}

View file

@ -172,6 +172,7 @@ end tell"#
let msg = ChannelMessage {
id: rowid.to_string(),
sender: sender.clone(),
reply_target: sender.clone(),
content: text,
channel: "imessage".to_string(),
timestamp: std::time::SystemTime::now()

View file

@ -220,32 +220,34 @@ fn split_message(message: &str, max_bytes: usize) -> Vec<String> {
chunks
}
/// Configuration for constructing an `IrcChannel`.
pub struct IrcChannelConfig {
pub server: String,
pub port: u16,
pub nickname: String,
pub username: Option<String>,
pub channels: Vec<String>,
pub allowed_users: Vec<String>,
pub server_password: Option<String>,
pub nickserv_password: Option<String>,
pub sasl_password: Option<String>,
pub verify_tls: bool,
}
impl IrcChannel {
#[allow(clippy::too_many_arguments)]
pub fn new(
server: String,
port: u16,
nickname: String,
username: Option<String>,
channels: Vec<String>,
allowed_users: Vec<String>,
server_password: Option<String>,
nickserv_password: Option<String>,
sasl_password: Option<String>,
verify_tls: bool,
) -> Self {
let username = username.unwrap_or_else(|| nickname.clone());
pub fn new(cfg: IrcChannelConfig) -> Self {
let username = cfg.username.unwrap_or_else(|| cfg.nickname.clone());
Self {
server,
port,
nickname,
server: cfg.server,
port: cfg.port,
nickname: cfg.nickname,
username,
channels,
allowed_users,
server_password,
nickserv_password,
sasl_password,
verify_tls,
channels: cfg.channels,
allowed_users: cfg.allowed_users,
server_password: cfg.server_password,
nickserv_password: cfg.nickserv_password,
sasl_password: cfg.sasl_password,
verify_tls: cfg.verify_tls,
writer: Arc::new(Mutex::new(None)),
}
}
@ -563,7 +565,8 @@ impl Channel for IrcChannel {
let seq = MSG_SEQ.fetch_add(1, Ordering::Relaxed);
let channel_msg = ChannelMessage {
id: format!("irc_{}_{seq}", chrono::Utc::now().timestamp_millis()),
sender: reply_to,
sender: sender_nick.to_string(),
reply_target: reply_to,
content,
channel: "irc".to_string(),
timestamp: std::time::SystemTime::now()
@ -807,18 +810,18 @@ mod tests {
#[test]
fn specific_user_allowed() {
let ch = IrcChannel::new(
"irc.test".into(),
6697,
"bot".into(),
None,
vec![],
vec!["alice".into(), "bob".into()],
None,
None,
None,
true,
);
let ch = IrcChannel::new(IrcChannelConfig {
server: "irc.test".into(),
port: 6697,
nickname: "bot".into(),
username: None,
channels: vec![],
allowed_users: vec!["alice".into(), "bob".into()],
server_password: None,
nickserv_password: None,
sasl_password: None,
verify_tls: true,
});
assert!(ch.is_user_allowed("alice"));
assert!(ch.is_user_allowed("bob"));
assert!(!ch.is_user_allowed("eve"));
@ -826,18 +829,18 @@ mod tests {
#[test]
fn allowlist_case_insensitive() {
let ch = IrcChannel::new(
"irc.test".into(),
6697,
"bot".into(),
None,
vec![],
vec!["Alice".into()],
None,
None,
None,
true,
);
let ch = IrcChannel::new(IrcChannelConfig {
server: "irc.test".into(),
port: 6697,
nickname: "bot".into(),
username: None,
channels: vec![],
allowed_users: vec!["Alice".into()],
server_password: None,
nickserv_password: None,
sasl_password: None,
verify_tls: true,
});
assert!(ch.is_user_allowed("alice"));
assert!(ch.is_user_allowed("ALICE"));
assert!(ch.is_user_allowed("Alice"));
@ -845,18 +848,18 @@ mod tests {
#[test]
fn empty_allowlist_denies_all() {
let ch = IrcChannel::new(
"irc.test".into(),
6697,
"bot".into(),
None,
vec![],
vec![],
None,
None,
None,
true,
);
let ch = IrcChannel::new(IrcChannelConfig {
server: "irc.test".into(),
port: 6697,
nickname: "bot".into(),
username: None,
channels: vec![],
allowed_users: vec![],
server_password: None,
nickserv_password: None,
sasl_password: None,
verify_tls: true,
});
assert!(!ch.is_user_allowed("anyone"));
}
@ -864,35 +867,35 @@ mod tests {
#[test]
fn new_defaults_username_to_nickname() {
let ch = IrcChannel::new(
"irc.test".into(),
6697,
"mybot".into(),
None,
vec![],
vec![],
None,
None,
None,
true,
);
let ch = IrcChannel::new(IrcChannelConfig {
server: "irc.test".into(),
port: 6697,
nickname: "mybot".into(),
username: None,
channels: vec![],
allowed_users: vec![],
server_password: None,
nickserv_password: None,
sasl_password: None,
verify_tls: true,
});
assert_eq!(ch.username, "mybot");
}
#[test]
fn new_uses_explicit_username() {
let ch = IrcChannel::new(
"irc.test".into(),
6697,
"mybot".into(),
Some("customuser".into()),
vec![],
vec![],
None,
None,
None,
true,
);
let ch = IrcChannel::new(IrcChannelConfig {
server: "irc.test".into(),
port: 6697,
nickname: "mybot".into(),
username: Some("customuser".into()),
channels: vec![],
allowed_users: vec![],
server_password: None,
nickserv_password: None,
sasl_password: None,
verify_tls: true,
});
assert_eq!(ch.username, "customuser");
assert_eq!(ch.nickname, "mybot");
}
@ -905,18 +908,18 @@ mod tests {
#[test]
fn new_stores_all_fields() {
let ch = IrcChannel::new(
"irc.example.com".into(),
6697,
"zcbot".into(),
Some("zeroclaw".into()),
vec!["#test".into()],
vec!["alice".into()],
Some("serverpass".into()),
Some("nspass".into()),
Some("saslpass".into()),
false,
);
let ch = IrcChannel::new(IrcChannelConfig {
server: "irc.example.com".into(),
port: 6697,
nickname: "zcbot".into(),
username: Some("zeroclaw".into()),
channels: vec!["#test".into()],
allowed_users: vec!["alice".into()],
server_password: Some("serverpass".into()),
nickserv_password: Some("nspass".into()),
sasl_password: Some("saslpass".into()),
verify_tls: false,
});
assert_eq!(ch.server, "irc.example.com");
assert_eq!(ch.port, 6697);
assert_eq!(ch.nickname, "zcbot");
@ -995,17 +998,17 @@ nickname = "bot"
// ── Helpers ─────────────────────────────────────────────
fn make_channel() -> IrcChannel {
IrcChannel::new(
"irc.example.com".into(),
6697,
"zcbot".into(),
None,
vec!["#zeroclaw".into()],
vec!["*".into()],
None,
None,
None,
true,
)
IrcChannel::new(IrcChannelConfig {
server: "irc.example.com".into(),
port: 6697,
nickname: "zcbot".into(),
username: None,
channels: vec!["#zeroclaw".into()],
allowed_users: vec!["*".into()],
server_password: None,
nickserv_password: None,
sasl_password: None,
verify_tls: true,
})
}
}

View file

@ -1,21 +1,152 @@
use super::traits::{Channel, ChannelMessage};
use async_trait::async_trait;
use futures_util::{SinkExt, StreamExt};
use prost::Message as ProstMessage;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tokio_tungstenite::tungstenite::Message as WsMsg;
use uuid::Uuid;
const FEISHU_BASE_URL: &str = "https://open.feishu.cn/open-apis";
const FEISHU_WS_BASE_URL: &str = "https://open.feishu.cn";
const LARK_BASE_URL: &str = "https://open.larksuite.com/open-apis";
const LARK_WS_BASE_URL: &str = "https://open.larksuite.com";
/// Lark/Feishu channel — receives events via HTTP callback, sends via Open API
// ─────────────────────────────────────────────────────────────────────────────
// Feishu WebSocket long-connection: pbbp2.proto frame codec
// ─────────────────────────────────────────────────────────────────────────────
#[derive(Clone, PartialEq, prost::Message)]
struct PbHeader {
#[prost(string, tag = "1")]
pub key: String,
#[prost(string, tag = "2")]
pub value: String,
}
/// Feishu WS frame (pbbp2.proto).
/// method=0 → CONTROL (ping/pong) method=1 → DATA (events)
#[derive(Clone, PartialEq, prost::Message)]
struct PbFrame {
#[prost(uint64, tag = "1")]
pub seq_id: u64,
#[prost(uint64, tag = "2")]
pub log_id: u64,
#[prost(int32, tag = "3")]
pub service: i32,
#[prost(int32, tag = "4")]
pub method: i32,
#[prost(message, repeated, tag = "5")]
pub headers: Vec<PbHeader>,
#[prost(bytes = "vec", optional, tag = "8")]
pub payload: Option<Vec<u8>>,
}
impl PbFrame {
fn header_value<'a>(&'a self, key: &str) -> &'a str {
self.headers
.iter()
.find(|h| h.key == key)
.map(|h| h.value.as_str())
.unwrap_or("")
}
}
/// Server-sent client config (parsed from pong payload)
#[derive(Debug, serde::Deserialize, Default, Clone)]
struct WsClientConfig {
#[serde(rename = "PingInterval")]
ping_interval: Option<u64>,
}
/// POST /callback/ws/endpoint response
#[derive(Debug, serde::Deserialize)]
struct WsEndpointResp {
code: i32,
#[serde(default)]
msg: Option<String>,
#[serde(default)]
data: Option<WsEndpoint>,
}
#[derive(Debug, serde::Deserialize)]
struct WsEndpoint {
#[serde(rename = "URL")]
url: String,
#[serde(rename = "ClientConfig")]
client_config: Option<WsClientConfig>,
}
/// LarkEvent envelope (method=1 / type=event payload)
#[derive(Debug, serde::Deserialize)]
struct LarkEvent {
header: LarkEventHeader,
event: serde_json::Value,
}
#[derive(Debug, serde::Deserialize)]
struct LarkEventHeader {
event_type: String,
#[allow(dead_code)]
event_id: String,
}
#[derive(Debug, serde::Deserialize)]
struct MsgReceivePayload {
sender: LarkSender,
message: LarkMessage,
}
#[derive(Debug, serde::Deserialize)]
struct LarkSender {
sender_id: LarkSenderId,
#[serde(default)]
sender_type: String,
}
#[derive(Debug, serde::Deserialize, Default)]
struct LarkSenderId {
open_id: Option<String>,
}
#[derive(Debug, serde::Deserialize)]
struct LarkMessage {
message_id: String,
chat_id: String,
chat_type: String,
message_type: String,
#[serde(default)]
content: String,
#[serde(default)]
mentions: Vec<serde_json::Value>,
}
/// Heartbeat timeout for WS connection — must be larger than ping_interval (default 120 s).
/// If no binary frame (pong or event) is received within this window, reconnect.
const WS_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(300);
/// Lark/Feishu channel.
///
/// Supports two receive modes (configured via `receive_mode` in config):
/// - **`websocket`** (default): persistent WSS long-connection; no public URL needed.
/// - **`webhook`**: HTTP callback server; requires a public HTTPS endpoint.
pub struct LarkChannel {
app_id: String,
app_secret: String,
verification_token: String,
port: u16,
port: Option<u16>,
allowed_users: Vec<String>,
/// When true, use Feishu (CN) endpoints; when false, use Lark (international).
use_feishu: bool,
/// How to receive events: WebSocket long-connection or HTTP webhook.
receive_mode: crate::config::schema::LarkReceiveMode,
client: reqwest::Client,
/// Cached tenant access token
tenant_token: Arc<RwLock<Option<String>>>,
/// Dedup set: WS message_ids seen in last ~30 min to prevent double-dispatch
ws_seen_ids: Arc<RwLock<HashMap<String, Instant>>>,
}
impl LarkChannel {
@ -23,7 +154,7 @@ impl LarkChannel {
app_id: String,
app_secret: String,
verification_token: String,
port: u16,
port: Option<u16>,
allowed_users: Vec<String>,
) -> Self {
Self {
@ -32,11 +163,310 @@ impl LarkChannel {
verification_token,
port,
allowed_users,
use_feishu: true,
receive_mode: crate::config::schema::LarkReceiveMode::default(),
client: reqwest::Client::new(),
tenant_token: Arc::new(RwLock::new(None)),
ws_seen_ids: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Build from `LarkConfig` (preserves `use_feishu` and `receive_mode`).
pub fn from_config(config: &crate::config::schema::LarkConfig) -> Self {
let mut ch = Self::new(
config.app_id.clone(),
config.app_secret.clone(),
config.verification_token.clone().unwrap_or_default(),
config.port,
config.allowed_users.clone(),
);
ch.use_feishu = config.use_feishu;
ch.receive_mode = config.receive_mode.clone();
ch
}
fn api_base(&self) -> &'static str {
if self.use_feishu {
FEISHU_BASE_URL
} else {
LARK_BASE_URL
}
}
fn ws_base(&self) -> &'static str {
if self.use_feishu {
FEISHU_WS_BASE_URL
} else {
LARK_WS_BASE_URL
}
}
fn tenant_access_token_url(&self) -> String {
format!("{}/auth/v3/tenant_access_token/internal", self.api_base())
}
fn send_message_url(&self) -> String {
format!("{}/im/v1/messages?receive_id_type=chat_id", self.api_base())
}
/// POST /callback/ws/endpoint → (wss_url, client_config)
async fn get_ws_endpoint(&self) -> anyhow::Result<(String, WsClientConfig)> {
let resp = self
.client
.post(format!("{}/callback/ws/endpoint", self.ws_base()))
.header("locale", if self.use_feishu { "zh" } else { "en" })
.json(&serde_json::json!({
"AppID": self.app_id,
"AppSecret": self.app_secret,
}))
.send()
.await?
.json::<WsEndpointResp>()
.await?;
if resp.code != 0 {
anyhow::bail!(
"Lark WS endpoint failed: code={} msg={}",
resp.code,
resp.msg.as_deref().unwrap_or("(none)")
);
}
let ep = resp
.data
.ok_or_else(|| anyhow::anyhow!("Lark WS endpoint: empty data"))?;
Ok((ep.url, ep.client_config.unwrap_or_default()))
}
/// WS long-connection event loop. Returns Ok(()) when the connection closes
/// (the caller reconnects).
#[allow(clippy::too_many_lines)]
async fn listen_ws(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
let (wss_url, client_config) = self.get_ws_endpoint().await?;
let service_id = wss_url
.split('?')
.nth(1)
.and_then(|qs| {
qs.split('&')
.find(|kv| kv.starts_with("service_id="))
.and_then(|kv| kv.split('=').nth(1))
.and_then(|v| v.parse::<i32>().ok())
})
.unwrap_or(0);
tracing::info!("Lark: connecting to {wss_url}");
let (ws_stream, _) = tokio_tungstenite::connect_async(&wss_url).await?;
let (mut write, mut read) = ws_stream.split();
tracing::info!("Lark: WS connected (service_id={service_id})");
let mut ping_secs = client_config.ping_interval.unwrap_or(120).max(10);
let mut hb_interval = tokio::time::interval(Duration::from_secs(ping_secs));
let mut timeout_check = tokio::time::interval(Duration::from_secs(10));
hb_interval.tick().await; // consume immediate tick
let mut seq: u64 = 0;
let mut last_recv = Instant::now();
// Send initial ping immediately (like the official SDK) so the server
// starts responding with pongs and we can calibrate the ping_interval.
seq = seq.wrapping_add(1);
let initial_ping = PbFrame {
seq_id: seq,
log_id: 0,
service: service_id,
method: 0,
headers: vec![PbHeader {
key: "type".into(),
value: "ping".into(),
}],
payload: None,
};
if write
.send(WsMsg::Binary(initial_ping.encode_to_vec()))
.await
.is_err()
{
anyhow::bail!("Lark: initial ping failed");
}
// message_id → (fragment_slots, created_at) for multi-part reassembly
type FragEntry = (Vec<Option<Vec<u8>>>, Instant);
let mut frag_cache: HashMap<String, FragEntry> = HashMap::new();
loop {
tokio::select! {
biased;
_ = hb_interval.tick() => {
seq = seq.wrapping_add(1);
let ping = PbFrame {
seq_id: seq, log_id: 0, service: service_id, method: 0,
headers: vec![PbHeader { key: "type".into(), value: "ping".into() }],
payload: None,
};
if write.send(WsMsg::Binary(ping.encode_to_vec())).await.is_err() {
tracing::warn!("Lark: ping failed, reconnecting");
break;
}
// GC stale fragments > 5 min
let cutoff = Instant::now().checked_sub(Duration::from_secs(300)).unwrap_or(Instant::now());
frag_cache.retain(|_, (_, ts)| *ts > cutoff);
}
_ = timeout_check.tick() => {
if last_recv.elapsed() > WS_HEARTBEAT_TIMEOUT {
tracing::warn!("Lark: heartbeat timeout, reconnecting");
break;
}
}
msg = read.next() => {
let raw = match msg {
Some(Ok(WsMsg::Binary(b))) => { last_recv = Instant::now(); b }
Some(Ok(WsMsg::Ping(d))) => { let _ = write.send(WsMsg::Pong(d)).await; continue; }
Some(Ok(WsMsg::Close(_))) | None => { tracing::info!("Lark: WS closed — reconnecting"); break; }
Some(Err(e)) => { tracing::error!("Lark: WS read error: {e}"); break; }
_ => continue,
};
let frame = match PbFrame::decode(&raw[..]) {
Ok(f) => f,
Err(e) => { tracing::error!("Lark: proto decode: {e}"); continue; }
};
// CONTROL frame
if frame.method == 0 {
if frame.header_value("type") == "pong" {
if let Some(p) = &frame.payload {
if let Ok(cfg) = serde_json::from_slice::<WsClientConfig>(p) {
if let Some(secs) = cfg.ping_interval {
let secs = secs.max(10);
if secs != ping_secs {
ping_secs = secs;
hb_interval = tokio::time::interval(Duration::from_secs(ping_secs));
tracing::info!("Lark: ping_interval → {ping_secs}s");
}
}
}
}
}
continue;
}
// DATA frame
let msg_type = frame.header_value("type").to_string();
let msg_id = frame.header_value("message_id").to_string();
let sum = frame.header_value("sum").parse::<usize>().unwrap_or(1);
let seq_num = frame.header_value("seq").parse::<usize>().unwrap_or(0);
// ACK immediately (Feishu requires within 3 s)
{
let mut ack = frame.clone();
ack.payload = Some(br#"{"code":200,"headers":{},"data":[]}"#.to_vec());
ack.headers.push(PbHeader { key: "biz_rt".into(), value: "0".into() });
let _ = write.send(WsMsg::Binary(ack.encode_to_vec())).await;
}
// Fragment reassembly
let sum = if sum == 0 { 1 } else { sum };
let payload: Vec<u8> = if sum == 1 || msg_id.is_empty() || seq_num >= sum {
frame.payload.clone().unwrap_or_default()
} else {
let entry = frag_cache.entry(msg_id.clone())
.or_insert_with(|| (vec![None; sum], Instant::now()));
if entry.0.len() != sum { *entry = (vec![None; sum], Instant::now()); }
entry.0[seq_num] = frame.payload.clone();
if entry.0.iter().all(|s| s.is_some()) {
let full: Vec<u8> = entry.0.iter()
.flat_map(|s| s.as_deref().unwrap_or(&[]))
.copied().collect();
frag_cache.remove(&msg_id);
full
} else { continue; }
};
if msg_type != "event" { continue; }
let event: LarkEvent = match serde_json::from_slice(&payload) {
Ok(e) => e,
Err(e) => { tracing::error!("Lark: event JSON: {e}"); continue; }
};
if event.header.event_type != "im.message.receive_v1" { continue; }
let recv: MsgReceivePayload = match serde_json::from_value(event.event) {
Ok(r) => r,
Err(e) => { tracing::error!("Lark: payload parse: {e}"); continue; }
};
if recv.sender.sender_type == "app" || recv.sender.sender_type == "bot" { continue; }
let sender_open_id = recv.sender.sender_id.open_id.as_deref().unwrap_or("");
if !self.is_user_allowed(sender_open_id) {
tracing::warn!("Lark WS: ignoring {sender_open_id} (not in allowed_users)");
continue;
}
let lark_msg = &recv.message;
// Dedup
{
let now = Instant::now();
let mut seen = self.ws_seen_ids.write().await;
// GC
seen.retain(|_, t| now.duration_since(*t) < Duration::from_secs(30 * 60));
if seen.contains_key(&lark_msg.message_id) {
tracing::debug!("Lark WS: dup {}", lark_msg.message_id);
continue;
}
seen.insert(lark_msg.message_id.clone(), now);
}
// Decode content by type (mirrors clawdbot-feishu parsing)
let text = match lark_msg.message_type.as_str() {
"text" => {
let v: serde_json::Value = match serde_json::from_str(&lark_msg.content) {
Ok(v) => v,
Err(_) => continue,
};
match v.get("text").and_then(|t| t.as_str()).filter(|s| !s.is_empty()) {
Some(t) => t.to_string(),
None => continue,
}
}
"post" => match parse_post_content(&lark_msg.content) {
Some(t) => t,
None => continue,
},
_ => { tracing::debug!("Lark WS: skipping unsupported type '{}'", lark_msg.message_type); continue; }
};
// Strip @_user_N placeholders
let text = strip_at_placeholders(&text);
let text = text.trim().to_string();
if text.is_empty() { continue; }
// Group-chat: only respond when explicitly @-mentioned
if lark_msg.chat_type == "group" && !should_respond_in_group(&lark_msg.mentions) {
continue;
}
let channel_msg = ChannelMessage {
id: Uuid::new_v4().to_string(),
sender: lark_msg.chat_id.clone(),
reply_target: lark_msg.chat_id.clone(),
content: text,
channel: "lark".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
};
tracing::debug!("Lark WS: message in {}", lark_msg.chat_id);
if tx.send(channel_msg).await.is_err() { break; }
}
}
}
Ok(())
}
/// Check if a user open_id is allowed
fn is_user_allowed(&self, open_id: &str) -> bool {
self.allowed_users.iter().any(|u| u == "*" || u == open_id)
@ -52,7 +482,7 @@ impl LarkChannel {
}
}
let url = format!("{FEISHU_BASE_URL}/auth/v3/tenant_access_token/internal");
let url = self.tenant_access_token_url();
let body = serde_json::json!({
"app_id": self.app_id,
"app_secret": self.app_secret,
@ -127,31 +557,41 @@ impl LarkChannel {
return messages;
}
// Extract message content (text only)
// Extract message content (text and post supported)
let msg_type = event
.pointer("/message/message_type")
.and_then(|t| t.as_str())
.unwrap_or("");
if msg_type != "text" {
tracing::debug!("Lark: skipping non-text message type: {msg_type}");
return messages;
}
let content_str = event
.pointer("/message/content")
.and_then(|c| c.as_str())
.unwrap_or("");
// content is a JSON string like "{\"text\":\"hello\"}"
let text = serde_json::from_str::<serde_json::Value>(content_str)
.ok()
.and_then(|v| v.get("text").and_then(|t| t.as_str()).map(String::from))
.unwrap_or_default();
if text.is_empty() {
return messages;
}
let text: String = match msg_type {
"text" => {
let extracted = serde_json::from_str::<serde_json::Value>(content_str)
.ok()
.and_then(|v| {
v.get("text")
.and_then(|t| t.as_str())
.filter(|s| !s.is_empty())
.map(String::from)
});
match extracted {
Some(t) => t,
None => return messages,
}
}
"post" => match parse_post_content(content_str) {
Some(t) => t,
None => return messages,
},
_ => {
tracing::debug!("Lark: skipping unsupported message type: {msg_type}");
return messages;
}
};
let timestamp = event
.pointer("/message/create_time")
@ -174,6 +614,7 @@ impl LarkChannel {
messages.push(ChannelMessage {
id: Uuid::new_v4().to_string(),
sender: chat_id.to_string(),
reply_target: chat_id.to_string(),
content: text,
channel: "lark".to_string(),
timestamp,
@ -191,7 +632,7 @@ impl Channel for LarkChannel {
async fn send(&self, message: &str, recipient: &str) -> anyhow::Result<()> {
let token = self.get_tenant_access_token().await?;
let url = format!("{FEISHU_BASE_URL}/im/v1/messages?receive_id_type=chat_id");
let url = self.send_message_url();
let content = serde_json::json!({ "text": message }).to_string();
let body = serde_json::json!({
@ -238,6 +679,25 @@ impl Channel for LarkChannel {
}
async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
use crate::config::schema::LarkReceiveMode;
match self.receive_mode {
LarkReceiveMode::Websocket => self.listen_ws(tx).await,
LarkReceiveMode::Webhook => self.listen_http(tx).await,
}
}
async fn health_check(&self) -> bool {
self.get_tenant_access_token().await.is_ok()
}
}
impl LarkChannel {
/// HTTP callback server (legacy — requires a public endpoint).
/// Use `listen()` (WS long-connection) for new deployments.
pub async fn listen_http(
&self,
tx: tokio::sync::mpsc::Sender<ChannelMessage>,
) -> anyhow::Result<()> {
use axum::{extract::State, routing::post, Json, Router};
#[derive(Clone)]
@ -282,13 +742,17 @@ impl Channel for LarkChannel {
(StatusCode::OK, "ok").into_response()
}
let port = self.port.ok_or_else(|| {
anyhow::anyhow!("Lark webhook mode requires `port` to be set in [channels_config.lark]")
})?;
let state = AppState {
verification_token: self.verification_token.clone(),
channel: Arc::new(LarkChannel::new(
self.app_id.clone(),
self.app_secret.clone(),
self.verification_token.clone(),
self.port,
None,
self.allowed_users.clone(),
)),
tx,
@ -298,7 +762,7 @@ impl Channel for LarkChannel {
.route("/lark", post(handle_event))
.with_state(state);
let addr = std::net::SocketAddr::from(([0, 0, 0, 0], self.port));
let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port));
tracing::info!("Lark event callback server listening on {addr}");
let listener = tokio::net::TcpListener::bind(addr).await?;
@ -306,10 +770,110 @@ impl Channel for LarkChannel {
Ok(())
}
}
async fn health_check(&self) -> bool {
self.get_tenant_access_token().await.is_ok()
// ─────────────────────────────────────────────────────────────────────────────
// WS helper functions
// ─────────────────────────────────────────────────────────────────────────────
/// Flatten a Feishu `post` rich-text message to plain text.
///
/// Returns `None` when the content cannot be parsed or yields no usable text,
/// so callers can simply `continue` rather than forwarding a meaningless
/// placeholder string to the agent.
fn parse_post_content(content: &str) -> Option<String> {
let parsed = serde_json::from_str::<serde_json::Value>(content).ok()?;
let locale = parsed
.get("zh_cn")
.or_else(|| parsed.get("en_us"))
.or_else(|| {
parsed
.as_object()
.and_then(|m| m.values().find(|v| v.is_object()))
})?;
let mut text = String::new();
if let Some(title) = locale
.get("title")
.and_then(|t| t.as_str())
.filter(|s| !s.is_empty())
{
text.push_str(title);
text.push_str("\n\n");
}
if let Some(paragraphs) = locale.get("content").and_then(|c| c.as_array()) {
for para in paragraphs {
if let Some(elements) = para.as_array() {
for el in elements {
match el.get("tag").and_then(|t| t.as_str()).unwrap_or("") {
"text" => {
if let Some(t) = el.get("text").and_then(|t| t.as_str()) {
text.push_str(t);
}
}
"a" => {
text.push_str(
el.get("text")
.and_then(|t| t.as_str())
.filter(|s| !s.is_empty())
.or_else(|| el.get("href").and_then(|h| h.as_str()))
.unwrap_or(""),
);
}
"at" => {
let n = el
.get("user_name")
.and_then(|n| n.as_str())
.or_else(|| el.get("user_id").and_then(|i| i.as_str()))
.unwrap_or("user");
text.push('@');
text.push_str(n);
}
_ => {}
}
}
text.push('\n');
}
}
}
let result = text.trim().to_string();
if result.is_empty() {
None
} else {
Some(result)
}
}
/// Remove `@_user_N` placeholder tokens injected by Feishu in group chats.
fn strip_at_placeholders(text: &str) -> String {
let mut result = String::with_capacity(text.len());
let mut chars = text.char_indices().peekable();
while let Some((_, ch)) = chars.next() {
if ch == '@' {
let rest: String = chars.clone().map(|(_, c)| c).collect();
if let Some(after) = rest.strip_prefix("_user_") {
let skip =
"_user_".len() + after.chars().take_while(|c| c.is_ascii_digit()).count();
for _ in 0..=skip {
chars.next();
}
if chars.peek().map(|(_, c)| *c == ' ').unwrap_or(false) {
chars.next();
}
continue;
}
}
result.push(ch);
}
result
}
/// In group chats, only respond when the bot is explicitly @-mentioned.
fn should_respond_in_group(mentions: &[serde_json::Value]) -> bool {
!mentions.is_empty()
}
#[cfg(test)]
@ -321,7 +885,7 @@ mod tests {
"cli_test_app_id".into(),
"test_app_secret".into(),
"test_verification_token".into(),
9898,
None,
vec!["ou_testuser123".into()],
)
}
@ -345,7 +909,7 @@ mod tests {
"id".into(),
"secret".into(),
"token".into(),
9898,
None,
vec!["*".into()],
);
assert!(ch.is_user_allowed("ou_anyone"));
@ -353,7 +917,7 @@ mod tests {
#[test]
fn lark_user_denied_empty() {
let ch = LarkChannel::new("id".into(), "secret".into(), "token".into(), 9898, vec![]);
let ch = LarkChannel::new("id".into(), "secret".into(), "token".into(), None, vec![]);
assert!(!ch.is_user_allowed("ou_anyone"));
}
@ -426,7 +990,7 @@ mod tests {
"id".into(),
"secret".into(),
"token".into(),
9898,
None,
vec!["*".into()],
);
let payload = serde_json::json!({
@ -451,7 +1015,7 @@ mod tests {
"id".into(),
"secret".into(),
"token".into(),
9898,
None,
vec!["*".into()],
);
let payload = serde_json::json!({
@ -488,7 +1052,7 @@ mod tests {
"id".into(),
"secret".into(),
"token".into(),
9898,
None,
vec!["*".into()],
);
let payload = serde_json::json!({
@ -512,7 +1076,7 @@ mod tests {
"id".into(),
"secret".into(),
"token".into(),
9898,
None,
vec!["*".into()],
);
let payload = serde_json::json!({
@ -550,7 +1114,7 @@ mod tests {
"id".into(),
"secret".into(),
"token".into(),
9898,
None,
vec!["*".into()],
);
let payload = serde_json::json!({
@ -571,7 +1135,7 @@ mod tests {
#[test]
fn lark_config_serde() {
use crate::config::schema::LarkConfig;
use crate::config::schema::{LarkConfig, LarkReceiveMode};
let lc = LarkConfig {
app_id: "cli_app123".into(),
app_secret: "secret456".into(),
@ -579,6 +1143,8 @@ mod tests {
verification_token: Some("vtoken789".into()),
allowed_users: vec!["ou_user1".into(), "ou_user2".into()],
use_feishu: false,
receive_mode: LarkReceiveMode::default(),
port: None,
};
let json = serde_json::to_string(&lc).unwrap();
let parsed: LarkConfig = serde_json::from_str(&json).unwrap();
@ -590,7 +1156,7 @@ mod tests {
#[test]
fn lark_config_toml_roundtrip() {
use crate::config::schema::LarkConfig;
use crate::config::schema::{LarkConfig, LarkReceiveMode};
let lc = LarkConfig {
app_id: "app".into(),
app_secret: "secret".into(),
@ -598,6 +1164,8 @@ mod tests {
verification_token: Some("tok".into()),
allowed_users: vec!["*".into()],
use_feishu: false,
receive_mode: LarkReceiveMode::Webhook,
port: Some(9898),
};
let toml_str = toml::to_string(&lc).unwrap();
let parsed: LarkConfig = toml::from_str(&toml_str).unwrap();
@ -608,11 +1176,36 @@ mod tests {
#[test]
fn lark_config_defaults_optional_fields() {
use crate::config::schema::LarkConfig;
use crate::config::schema::{LarkConfig, LarkReceiveMode};
let json = r#"{"app_id":"a","app_secret":"s"}"#;
let parsed: LarkConfig = serde_json::from_str(json).unwrap();
assert!(parsed.verification_token.is_none());
assert!(parsed.allowed_users.is_empty());
assert_eq!(parsed.receive_mode, LarkReceiveMode::Websocket);
assert!(parsed.port.is_none());
}
#[test]
fn lark_from_config_preserves_mode_and_region() {
use crate::config::schema::{LarkConfig, LarkReceiveMode};
let cfg = LarkConfig {
app_id: "cli_app123".into(),
app_secret: "secret456".into(),
encrypt_key: None,
verification_token: Some("vtoken789".into()),
allowed_users: vec!["*".into()],
use_feishu: false,
receive_mode: LarkReceiveMode::Webhook,
port: Some(9898),
};
let ch = LarkChannel::from_config(&cfg);
assert_eq!(ch.api_base(), LARK_BASE_URL);
assert_eq!(ch.ws_base(), LARK_WS_BASE_URL);
assert_eq!(ch.receive_mode, LarkReceiveMode::Webhook);
assert_eq!(ch.port, Some(9898));
}
#[test]
@ -622,7 +1215,7 @@ mod tests {
"id".into(),
"secret".into(),
"token".into(),
9898,
None,
vec!["*".into()],
);
let payload = serde_json::json!({

View file

@ -230,6 +230,7 @@ impl Channel for MatrixChannel {
let msg = ChannelMessage {
id: format!("mx_{}", chrono::Utc::now().timestamp_millis()),
sender: event.sender.clone(),
reply_target: event.sender.clone(),
content: body.clone(),
channel: "matrix".to_string(),
timestamp: std::time::SystemTime::now()

View file

@ -69,10 +69,19 @@ fn conversation_memory_key(msg: &traits::ChannelMessage) -> String {
format!("{}_{}_{}", msg.channel, msg.sender, msg.id)
}
fn channel_delivery_instructions(channel_name: &str) -> Option<&'static str> {
match channel_name {
"telegram" => Some(
"When responding on Telegram, include media markers for files or URLs that should be sent as attachments. Use one marker per attachment with this exact syntax: [IMAGE:<path-or-url>], [DOCUMENT:<path-or-url>], [VIDEO:<path-or-url>], [AUDIO:<path-or-url>], or [VOICE:<path-or-url>]. Keep normal user-facing text outside markers and never wrap markers in code fences.",
),
_ => None,
}
}
async fn build_memory_context(mem: &dyn Memory, user_msg: &str) -> String {
let mut context = String::new();
if let Ok(entries) = mem.recall(user_msg, 5).await {
if let Ok(entries) = mem.recall(user_msg, 5, None).await {
if !entries.is_empty() {
context.push_str("[Memory context]\n");
for entry in &entries {
@ -158,6 +167,7 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
&autosave_key,
&msg.content,
crate::memory::MemoryCategory::Conversation,
None,
)
.await;
}
@ -171,7 +181,7 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
let target_channel = ctx.channels_by_name.get(&msg.channel).cloned();
if let Some(channel) = target_channel.as_ref() {
if let Err(e) = channel.start_typing(&msg.sender).await {
if let Err(e) = channel.start_typing(&msg.reply_target).await {
tracing::debug!("Failed to start typing on {}: {e}", channel.name());
}
}
@ -184,6 +194,10 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
ChatMessage::user(&enriched_message),
];
if let Some(instructions) = channel_delivery_instructions(&msg.channel) {
history.push(ChatMessage::system(instructions));
}
let llm_result = tokio::time::timeout(
Duration::from_secs(CHANNEL_MESSAGE_TIMEOUT_SECS),
run_tool_call_loop(
@ -200,7 +214,7 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
.await;
if let Some(channel) = target_channel.as_ref() {
if let Err(e) = channel.stop_typing(&msg.sender).await {
if let Err(e) = channel.stop_typing(&msg.reply_target).await {
tracing::debug!("Failed to stop typing on {}: {e}", channel.name());
}
}
@ -224,7 +238,9 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
started_at.elapsed().as_millis()
);
if let Some(channel) = target_channel.as_ref() {
let _ = channel.send(&format!("⚠️ Error: {e}"), &msg.sender).await;
let _ = channel
.send(&format!("⚠️ Error: {e}"), &msg.reply_target)
.await;
}
}
Err(_) => {
@ -241,7 +257,7 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
let _ = channel
.send(
"⚠️ Request timed out while waiting for the model. Please try again.",
&msg.sender,
&msg.reply_target,
)
.await;
}
@ -483,6 +499,16 @@ pub fn build_system_prompt(
std::env::consts::OS,
);
// ── 8. Channel Capabilities ─────────────────────────────────────
prompt.push_str("## Channel Capabilities\n\n");
prompt.push_str(
"- You are running as a Discord bot. You CAN and do send messages to Discord channels.\n",
);
prompt.push_str("- When someone messages you on Discord, your response is automatically sent back to Discord.\n");
prompt.push_str("- You do NOT need to ask permission to respond — just respond directly.\n");
prompt.push_str("- NEVER repeat, describe, or echo credentials, tokens, API keys, or secrets in your responses.\n");
prompt.push_str("- If a tool output contains credentials, they have already been redacted — do not mention them.\n\n");
if prompt.is_empty() {
"You are ZeroClaw, a fast and efficient AI assistant built in Rust. Be helpful, concise, and direct.".to_string()
} else {
@ -619,6 +645,7 @@ pub async fn doctor_channels(config: Config) -> Result<()> {
dc.guild_id.clone(),
dc.allowed_users.clone(),
dc.listen_to_bots,
dc.mention_only,
)),
));
}
@ -672,32 +699,23 @@ pub async fn doctor_channels(config: Config) -> Result<()> {
if let Some(ref irc) = config.channels_config.irc {
channels.push((
"IRC",
Arc::new(IrcChannel::new(
irc.server.clone(),
irc.port,
irc.nickname.clone(),
irc.username.clone(),
irc.channels.clone(),
irc.allowed_users.clone(),
irc.server_password.clone(),
irc.nickserv_password.clone(),
irc.sasl_password.clone(),
irc.verify_tls.unwrap_or(true),
)),
Arc::new(IrcChannel::new(irc::IrcChannelConfig {
server: irc.server.clone(),
port: irc.port,
nickname: irc.nickname.clone(),
username: irc.username.clone(),
channels: irc.channels.clone(),
allowed_users: irc.allowed_users.clone(),
server_password: irc.server_password.clone(),
nickserv_password: irc.nickserv_password.clone(),
sasl_password: irc.sasl_password.clone(),
verify_tls: irc.verify_tls.unwrap_or(true),
})),
));
}
if let Some(ref lk) = config.channels_config.lark {
channels.push((
"Lark",
Arc::new(LarkChannel::new(
lk.app_id.clone(),
lk.app_secret.clone(),
lk.verification_token.clone().unwrap_or_default(),
9898,
lk.allowed_users.clone(),
)),
));
channels.push(("Lark", Arc::new(LarkChannel::from_config(lk))));
}
if let Some(ref dt) = config.channels_config.dingtalk {
@ -762,6 +780,7 @@ pub async fn start_channels(config: Config) -> Result<()> {
let provider: Arc<dyn Provider> = Arc::from(providers::create_resilient_provider(
&provider_name,
config.api_key.as_deref(),
config.api_url.as_deref(),
&config.reliability,
)?);
@ -860,6 +879,10 @@ pub async fn start_channels(config: Config) -> Result<()> {
"schedule",
"Manage scheduled tasks (create/list/get/cancel/pause/resume). Supports recurring cron and one-shot delays.",
));
tool_descs.push((
"pushover",
"Send a Pushover notification to your device. Requires PUSHOVER_TOKEN and PUSHOVER_USER_KEY in .env file.",
));
if !config.agents.is_empty() {
tool_descs.push((
"delegate",
@ -909,6 +932,7 @@ pub async fn start_channels(config: Config) -> Result<()> {
dc.guild_id.clone(),
dc.allowed_users.clone(),
dc.listen_to_bots,
dc.mention_only,
)));
}
@ -947,28 +971,22 @@ pub async fn start_channels(config: Config) -> Result<()> {
}
if let Some(ref irc) = config.channels_config.irc {
channels.push(Arc::new(IrcChannel::new(
irc.server.clone(),
irc.port,
irc.nickname.clone(),
irc.username.clone(),
irc.channels.clone(),
irc.allowed_users.clone(),
irc.server_password.clone(),
irc.nickserv_password.clone(),
irc.sasl_password.clone(),
irc.verify_tls.unwrap_or(true),
)));
channels.push(Arc::new(IrcChannel::new(irc::IrcChannelConfig {
server: irc.server.clone(),
port: irc.port,
nickname: irc.nickname.clone(),
username: irc.username.clone(),
channels: irc.channels.clone(),
allowed_users: irc.allowed_users.clone(),
server_password: irc.server_password.clone(),
nickserv_password: irc.nickserv_password.clone(),
sasl_password: irc.sasl_password.clone(),
verify_tls: irc.verify_tls.unwrap_or(true),
})));
}
if let Some(ref lk) = config.channels_config.lark {
channels.push(Arc::new(LarkChannel::new(
lk.app_id.clone(),
lk.app_secret.clone(),
lk.verification_token.clone().unwrap_or_default(),
9898,
lk.allowed_users.clone(),
)));
channels.push(Arc::new(LarkChannel::from_config(lk)));
}
if let Some(ref dt) = config.channels_config.dingtalk {
@ -1242,6 +1260,7 @@ mod tests {
traits::ChannelMessage {
id: "msg-1".to_string(),
sender: "alice".to_string(),
reply_target: "chat-42".to_string(),
content: "What is the BTC price now?".to_string(),
channel: "test-channel".to_string(),
timestamp: 1,
@ -1251,6 +1270,7 @@ mod tests {
let sent_messages = channel_impl.sent_messages.lock().await;
assert_eq!(sent_messages.len(), 1);
assert!(sent_messages[0].starts_with("chat-42:"));
assert!(sent_messages[0].contains("BTC is currently around"));
assert!(!sent_messages[0].contains("\"tool_calls\""));
assert!(!sent_messages[0].contains("mock_price"));
@ -1269,6 +1289,7 @@ mod tests {
_key: &str,
_content: &str,
_category: crate::memory::MemoryCategory,
_session_id: Option<&str>,
) -> anyhow::Result<()> {
Ok(())
}
@ -1277,6 +1298,7 @@ mod tests {
&self,
_query: &str,
_limit: usize,
_session_id: Option<&str>,
) -> anyhow::Result<Vec<crate::memory::MemoryEntry>> {
Ok(Vec::new())
}
@ -1288,6 +1310,7 @@ mod tests {
async fn list(
&self,
_category: Option<&crate::memory::MemoryCategory>,
_session_id: Option<&str>,
) -> anyhow::Result<Vec<crate::memory::MemoryEntry>> {
Ok(Vec::new())
}
@ -1331,6 +1354,7 @@ mod tests {
tx.send(traits::ChannelMessage {
id: "1".to_string(),
sender: "alice".to_string(),
reply_target: "alice".to_string(),
content: "hello".to_string(),
channel: "test-channel".to_string(),
timestamp: 1,
@ -1340,6 +1364,7 @@ mod tests {
tx.send(traits::ChannelMessage {
id: "2".to_string(),
sender: "bob".to_string(),
reply_target: "bob".to_string(),
content: "world".to_string(),
channel: "test-channel".to_string(),
timestamp: 2,
@ -1570,6 +1595,25 @@ mod tests {
assert!(truncated.is_char_boundary(truncated.len()));
}
#[test]
fn prompt_contains_channel_capabilities() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
assert!(
prompt.contains("## Channel Capabilities"),
"missing Channel Capabilities section"
);
assert!(
prompt.contains("running as a Discord bot"),
"missing Discord context"
);
assert!(
prompt.contains("NEVER repeat, describe, or echo credentials"),
"missing security instruction"
);
}
#[test]
fn prompt_workspace_path() {
let ws = make_workspace();
@ -1583,6 +1627,7 @@ mod tests {
let msg = traits::ChannelMessage {
id: "msg_abc123".into(),
sender: "U123".into(),
reply_target: "C456".into(),
content: "hello".into(),
channel: "slack".into(),
timestamp: 1,
@ -1596,6 +1641,7 @@ mod tests {
let msg1 = traits::ChannelMessage {
id: "msg_1".into(),
sender: "U123".into(),
reply_target: "C456".into(),
content: "first".into(),
channel: "slack".into(),
timestamp: 1,
@ -1603,6 +1649,7 @@ mod tests {
let msg2 = traits::ChannelMessage {
id: "msg_2".into(),
sender: "U123".into(),
reply_target: "C456".into(),
content: "second".into(),
channel: "slack".into(),
timestamp: 2,
@ -1622,6 +1669,7 @@ mod tests {
let msg1 = traits::ChannelMessage {
id: "msg_1".into(),
sender: "U123".into(),
reply_target: "C456".into(),
content: "I'm Paul".into(),
channel: "slack".into(),
timestamp: 1,
@ -1629,6 +1677,7 @@ mod tests {
let msg2 = traits::ChannelMessage {
id: "msg_2".into(),
sender: "U123".into(),
reply_target: "C456".into(),
content: "I'm 45".into(),
channel: "slack".into(),
timestamp: 2,
@ -1638,6 +1687,7 @@ mod tests {
&conversation_memory_key(&msg1),
&msg1.content,
MemoryCategory::Conversation,
None,
)
.await
.unwrap();
@ -1645,13 +1695,14 @@ mod tests {
&conversation_memory_key(&msg2),
&msg2.content,
MemoryCategory::Conversation,
None,
)
.await
.unwrap();
assert_eq!(mem.count().await.unwrap(), 2);
let recalled = mem.recall("45", 5).await.unwrap();
let recalled = mem.recall("45", 5, None).await.unwrap();
assert!(recalled.iter().any(|entry| entry.content.contains("45")));
}
@ -1659,7 +1710,7 @@ mod tests {
async fn build_memory_context_includes_recalled_entries() {
let tmp = TempDir::new().unwrap();
let mem = SqliteMemory::new(tmp.path()).unwrap();
mem.store("age_fact", "Age is 45", MemoryCategory::Conversation)
mem.store("age_fact", "Age is 45", MemoryCategory::Conversation, None)
.await
.unwrap();

View file

@ -161,6 +161,7 @@ impl Channel for SlackChannel {
let channel_msg = ChannelMessage {
id: format!("slack_{channel_id}_{ts}"),
sender: user.to_string(),
reply_target: channel_id.clone(),
content: text.to_string(),
channel: "slack".to_string(),
timestamp: std::time::SystemTime::now()

View file

@ -51,6 +51,133 @@ fn split_message_for_telegram(message: &str) -> Vec<String> {
chunks
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TelegramAttachmentKind {
Image,
Document,
Video,
Audio,
Voice,
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct TelegramAttachment {
kind: TelegramAttachmentKind,
target: String,
}
impl TelegramAttachmentKind {
fn from_marker(marker: &str) -> Option<Self> {
match marker.trim().to_ascii_uppercase().as_str() {
"IMAGE" | "PHOTO" => Some(Self::Image),
"DOCUMENT" | "FILE" => Some(Self::Document),
"VIDEO" => Some(Self::Video),
"AUDIO" => Some(Self::Audio),
"VOICE" => Some(Self::Voice),
_ => None,
}
}
}
fn is_http_url(target: &str) -> bool {
target.starts_with("http://") || target.starts_with("https://")
}
fn infer_attachment_kind_from_target(target: &str) -> Option<TelegramAttachmentKind> {
let normalized = target
.split('?')
.next()
.unwrap_or(target)
.split('#')
.next()
.unwrap_or(target);
let extension = Path::new(normalized)
.extension()
.and_then(|ext| ext.to_str())?
.to_ascii_lowercase();
match extension.as_str() {
"png" | "jpg" | "jpeg" | "gif" | "webp" | "bmp" => Some(TelegramAttachmentKind::Image),
"mp4" | "mov" | "mkv" | "avi" | "webm" => Some(TelegramAttachmentKind::Video),
"mp3" | "m4a" | "wav" | "flac" => Some(TelegramAttachmentKind::Audio),
"ogg" | "oga" | "opus" => Some(TelegramAttachmentKind::Voice),
"pdf" | "txt" | "md" | "csv" | "json" | "zip" | "tar" | "gz" | "doc" | "docx" | "xls"
| "xlsx" | "ppt" | "pptx" => Some(TelegramAttachmentKind::Document),
_ => None,
}
}
fn parse_path_only_attachment(message: &str) -> Option<TelegramAttachment> {
let trimmed = message.trim();
if trimmed.is_empty() || trimmed.contains('\n') {
return None;
}
let candidate = trimmed.trim_matches(|c| matches!(c, '`' | '"' | '\''));
if candidate.chars().any(char::is_whitespace) {
return None;
}
let candidate = candidate.strip_prefix("file://").unwrap_or(candidate);
let kind = infer_attachment_kind_from_target(candidate)?;
if !is_http_url(candidate) && !Path::new(candidate).exists() {
return None;
}
Some(TelegramAttachment {
kind,
target: candidate.to_string(),
})
}
fn parse_attachment_markers(message: &str) -> (String, Vec<TelegramAttachment>) {
let mut cleaned = String::with_capacity(message.len());
let mut attachments = Vec::new();
let mut cursor = 0;
while cursor < message.len() {
let Some(open_rel) = message[cursor..].find('[') else {
cleaned.push_str(&message[cursor..]);
break;
};
let open = cursor + open_rel;
cleaned.push_str(&message[cursor..open]);
let Some(close_rel) = message[open..].find(']') else {
cleaned.push_str(&message[open..]);
break;
};
let close = open + close_rel;
let marker = &message[open + 1..close];
let parsed = marker.split_once(':').and_then(|(kind, target)| {
let kind = TelegramAttachmentKind::from_marker(kind)?;
let target = target.trim();
if target.is_empty() {
return None;
}
Some(TelegramAttachment {
kind,
target: target.to_string(),
})
});
if let Some(attachment) = parsed {
attachments.push(attachment);
} else {
cleaned.push_str(&message[open..=close]);
}
cursor = close + 1;
}
(cleaned.trim().to_string(), attachments)
}
/// Telegram channel — long-polls the Bot API for updates
pub struct TelegramChannel {
bot_token: String,
@ -82,6 +209,216 @@ impl TelegramChannel {
identities.into_iter().any(|id| self.is_user_allowed(id))
}
fn parse_update_message(&self, update: &serde_json::Value) -> Option<ChannelMessage> {
let message = update.get("message")?;
let text = message.get("text").and_then(serde_json::Value::as_str)?;
let username = message
.get("from")
.and_then(|from| from.get("username"))
.and_then(serde_json::Value::as_str)
.unwrap_or("unknown")
.to_string();
let user_id = message
.get("from")
.and_then(|from| from.get("id"))
.and_then(serde_json::Value::as_i64)
.map(|id| id.to_string());
let sender_identity = if username == "unknown" {
user_id.clone().unwrap_or_else(|| "unknown".to_string())
} else {
username.clone()
};
let mut identities = vec![username.as_str()];
if let Some(id) = user_id.as_deref() {
identities.push(id);
}
if !self.is_any_user_allowed(identities.iter().copied()) {
tracing::warn!(
"Telegram: ignoring message from unauthorized user: username={username}, user_id={}. \
Allowlist Telegram @username or numeric user ID, then run `zeroclaw onboard --channels-only`.",
user_id.as_deref().unwrap_or("unknown")
);
return None;
}
let chat_id = message
.get("chat")
.and_then(|chat| chat.get("id"))
.and_then(serde_json::Value::as_i64)
.map(|id| id.to_string())?;
let message_id = message
.get("message_id")
.and_then(serde_json::Value::as_i64)
.unwrap_or(0);
Some(ChannelMessage {
id: format!("telegram_{chat_id}_{message_id}"),
sender: sender_identity,
reply_target: chat_id,
content: text.to_string(),
channel: "telegram".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
})
}
async fn send_text_chunks(&self, message: &str, chat_id: &str) -> anyhow::Result<()> {
let chunks = split_message_for_telegram(message);
for (index, chunk) in chunks.iter().enumerate() {
let text = if chunks.len() > 1 {
if index == 0 {
format!("{chunk}\n\n(continues...)")
} else if index == chunks.len() - 1 {
format!("(continued)\n\n{chunk}")
} else {
format!("(continued)\n\n{chunk}\n\n(continues...)")
}
} else {
chunk.to_string()
};
let markdown_body = serde_json::json!({
"chat_id": chat_id,
"text": text,
"parse_mode": "Markdown"
});
let markdown_resp = self
.client
.post(self.api_url("sendMessage"))
.json(&markdown_body)
.send()
.await?;
if markdown_resp.status().is_success() {
if index < chunks.len() - 1 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
continue;
}
let markdown_status = markdown_resp.status();
let markdown_err = markdown_resp.text().await.unwrap_or_default();
tracing::warn!(
status = ?markdown_status,
"Telegram sendMessage with Markdown failed; retrying without parse_mode"
);
let plain_body = serde_json::json!({
"chat_id": chat_id,
"text": text,
});
let plain_resp = self
.client
.post(self.api_url("sendMessage"))
.json(&plain_body)
.send()
.await?;
if !plain_resp.status().is_success() {
let plain_status = plain_resp.status();
let plain_err = plain_resp.text().await.unwrap_or_default();
anyhow::bail!(
"Telegram sendMessage failed (markdown {}: {}; plain {}: {})",
markdown_status,
markdown_err,
plain_status,
plain_err
);
}
if index < chunks.len() - 1 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
Ok(())
}
async fn send_media_by_url(
&self,
method: &str,
media_field: &str,
chat_id: &str,
url: &str,
caption: Option<&str>,
) -> anyhow::Result<()> {
let mut body = serde_json::json!({
"chat_id": chat_id,
});
body[media_field] = serde_json::Value::String(url.to_string());
if let Some(cap) = caption {
body["caption"] = serde_json::Value::String(cap.to_string());
}
let resp = self
.client
.post(self.api_url(method))
.json(&body)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram {method} by URL failed: {err}");
}
tracing::info!("Telegram {method} sent to {chat_id}: {url}");
Ok(())
}
async fn send_attachment(
&self,
chat_id: &str,
attachment: &TelegramAttachment,
) -> anyhow::Result<()> {
let target = attachment.target.trim();
if is_http_url(target) {
return match attachment.kind {
TelegramAttachmentKind::Image => {
self.send_photo_by_url(chat_id, target, None).await
}
TelegramAttachmentKind::Document => {
self.send_document_by_url(chat_id, target, None).await
}
TelegramAttachmentKind::Video => {
self.send_video_by_url(chat_id, target, None).await
}
TelegramAttachmentKind::Audio => {
self.send_audio_by_url(chat_id, target, None).await
}
TelegramAttachmentKind::Voice => {
self.send_voice_by_url(chat_id, target, None).await
}
};
}
let path = Path::new(target);
if !path.exists() {
anyhow::bail!("Telegram attachment path not found: {target}");
}
match attachment.kind {
TelegramAttachmentKind::Image => self.send_photo(chat_id, path, None).await,
TelegramAttachmentKind::Document => self.send_document(chat_id, path, None).await,
TelegramAttachmentKind::Video => self.send_video(chat_id, path, None).await,
TelegramAttachmentKind::Audio => self.send_audio(chat_id, path, None).await,
TelegramAttachmentKind::Voice => self.send_voice(chat_id, path, None).await,
}
}
/// Send a document/file to a Telegram chat
pub async fn send_document(
&self,
@ -408,6 +745,39 @@ impl TelegramChannel {
tracing::info!("Telegram photo (URL) sent to {chat_id}: {url}");
Ok(())
}
/// Send a video by URL (Telegram will download it)
pub async fn send_video_by_url(
&self,
chat_id: &str,
url: &str,
caption: Option<&str>,
) -> anyhow::Result<()> {
self.send_media_by_url("sendVideo", "video", chat_id, url, caption)
.await
}
/// Send an audio file by URL (Telegram will download it)
pub async fn send_audio_by_url(
&self,
chat_id: &str,
url: &str,
caption: Option<&str>,
) -> anyhow::Result<()> {
self.send_media_by_url("sendAudio", "audio", chat_id, url, caption)
.await
}
/// Send a voice message by URL (Telegram will download it)
pub async fn send_voice_by_url(
&self,
chat_id: &str,
url: &str,
caption: Option<&str>,
) -> anyhow::Result<()> {
self.send_media_by_url("sendVoice", "voice", chat_id, url, caption)
.await
}
}
#[async_trait]
@ -417,82 +787,27 @@ impl Channel for TelegramChannel {
}
async fn send(&self, message: &str, chat_id: &str) -> anyhow::Result<()> {
// Split message if it exceeds Telegram's 4096 character limit
let chunks = split_message_for_telegram(message);
let (text_without_markers, attachments) = parse_attachment_markers(message);
for (i, chunk) in chunks.iter().enumerate() {
// Add continuation marker for multi-part messages
let text = if chunks.len() > 1 {
if i == 0 {
format!("{chunk}\n\n(continues...)")
} else if i == chunks.len() - 1 {
format!("(continued)\n\n{chunk}")
} else {
format!("(continued)\n\n{chunk}\n\n(continues...)")
}
} else {
chunk.to_string()
};
let markdown_body = serde_json::json!({
"chat_id": chat_id,
"text": text,
"parse_mode": "Markdown"
});
let markdown_resp = self
.client
.post(self.api_url("sendMessage"))
.json(&markdown_body)
.send()
.await?;
if markdown_resp.status().is_success() {
// Small delay between chunks to avoid rate limiting
if i < chunks.len() - 1 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
continue;
if !attachments.is_empty() {
if !text_without_markers.is_empty() {
self.send_text_chunks(&text_without_markers, chat_id)
.await?;
}
let markdown_status = markdown_resp.status();
let markdown_err = markdown_resp.text().await.unwrap_or_default();
tracing::warn!(
status = ?markdown_status,
"Telegram sendMessage with Markdown failed; retrying without parse_mode"
);
// Retry without parse_mode as a compatibility fallback.
let plain_body = serde_json::json!({
"chat_id": chat_id,
"text": text,
});
let plain_resp = self
.client
.post(self.api_url("sendMessage"))
.json(&plain_body)
.send()
.await?;
if !plain_resp.status().is_success() {
let plain_status = plain_resp.status();
let plain_err = plain_resp.text().await.unwrap_or_default();
anyhow::bail!(
"Telegram sendMessage failed (markdown {}: {}; plain {}: {})",
markdown_status,
markdown_err,
plain_status,
plain_err
);
for attachment in &attachments {
self.send_attachment(chat_id, attachment).await?;
}
// Small delay between chunks to avoid rate limiting
if i < chunks.len() - 1 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
return Ok(());
}
Ok(())
if let Some(attachment) = parse_path_only_attachment(message) {
self.send_attachment(chat_id, &attachment).await?;
return Ok(());
}
self.send_text_chunks(message, chat_id).await
}
async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
@ -533,59 +848,13 @@ impl Channel for TelegramChannel {
offset = uid + 1;
}
let Some(message) = update.get("message") else {
let Some(msg) = self.parse_update_message(update) else {
continue;
};
let Some(text) = message.get("text").and_then(serde_json::Value::as_str) else {
continue;
};
let username_opt = message
.get("from")
.and_then(|f| f.get("username"))
.and_then(|u| u.as_str());
let username = username_opt.unwrap_or("unknown");
let user_id = message
.get("from")
.and_then(|f| f.get("id"))
.and_then(serde_json::Value::as_i64);
let user_id_str = user_id.map(|id| id.to_string());
let mut identities = vec![username];
if let Some(ref id) = user_id_str {
identities.push(id.as_str());
}
if !self.is_any_user_allowed(identities.iter().copied()) {
tracing::warn!(
"Telegram: ignoring message from unauthorized user: username={username}, user_id={}. \
Allowlist Telegram @username or numeric user ID, then run `zeroclaw onboard --channels-only`.",
user_id_str.as_deref().unwrap_or("unknown")
);
continue;
}
let chat_id = message
.get("chat")
.and_then(|c| c.get("id"))
.and_then(serde_json::Value::as_i64)
.map(|id| id.to_string());
let Some(chat_id) = chat_id else {
tracing::warn!("Telegram: missing chat_id in message, skipping");
continue;
};
let message_id = message
.get("message_id")
.and_then(|v| v.as_i64())
.unwrap_or(0);
// Send "typing" indicator immediately when we receive a message
let typing_body = serde_json::json!({
"chat_id": &chat_id,
"chat_id": &msg.reply_target,
"action": "typing"
});
let _ = self
@ -595,17 +864,6 @@ Allowlist Telegram @username or numeric user ID, then run `zeroclaw onboard --ch
.send()
.await; // Ignore errors for typing indicator
let msg = ChannelMessage {
id: format!("telegram_{chat_id}_{message_id}"),
sender: username.to_string(),
content: text.to_string(),
channel: "telegram".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
};
if tx.send(msg).await.is_err() {
return Ok(());
}
@ -716,6 +974,107 @@ mod tests {
assert!(!ch.is_any_user_allowed(["unknown", "123456789"]));
}
#[test]
fn parse_attachment_markers_extracts_multiple_types() {
let message = "Here are files [IMAGE:/tmp/a.png] and [DOCUMENT:https://example.com/a.pdf]";
let (cleaned, attachments) = parse_attachment_markers(message);
assert_eq!(cleaned, "Here are files and");
assert_eq!(attachments.len(), 2);
assert_eq!(attachments[0].kind, TelegramAttachmentKind::Image);
assert_eq!(attachments[0].target, "/tmp/a.png");
assert_eq!(attachments[1].kind, TelegramAttachmentKind::Document);
assert_eq!(attachments[1].target, "https://example.com/a.pdf");
}
#[test]
fn parse_attachment_markers_keeps_invalid_markers_in_text() {
let message = "Report [UNKNOWN:/tmp/a.bin]";
let (cleaned, attachments) = parse_attachment_markers(message);
assert_eq!(cleaned, "Report [UNKNOWN:/tmp/a.bin]");
assert!(attachments.is_empty());
}
#[test]
fn parse_path_only_attachment_detects_existing_file() {
let dir = tempfile::tempdir().unwrap();
let image_path = dir.path().join("snap.png");
std::fs::write(&image_path, b"fake-png").unwrap();
let parsed = parse_path_only_attachment(image_path.to_string_lossy().as_ref())
.expect("expected attachment");
assert_eq!(parsed.kind, TelegramAttachmentKind::Image);
assert_eq!(parsed.target, image_path.to_string_lossy());
}
#[test]
fn parse_path_only_attachment_rejects_sentence_text() {
assert!(parse_path_only_attachment("Screenshot saved to /tmp/snap.png").is_none());
}
#[test]
fn infer_attachment_kind_from_target_detects_document_extension() {
assert_eq!(
infer_attachment_kind_from_target("https://example.com/files/specs.pdf?download=1"),
Some(TelegramAttachmentKind::Document)
);
}
#[test]
fn parse_update_message_uses_chat_id_as_reply_target() {
let ch = TelegramChannel::new("token".into(), vec!["*".into()]);
let update = serde_json::json!({
"update_id": 1,
"message": {
"message_id": 33,
"text": "hello",
"from": {
"id": 555,
"username": "alice"
},
"chat": {
"id": -100200300
}
}
});
let msg = ch
.parse_update_message(&update)
.expect("message should parse");
assert_eq!(msg.sender, "alice");
assert_eq!(msg.reply_target, "-100200300");
assert_eq!(msg.content, "hello");
assert_eq!(msg.id, "telegram_-100200300_33");
}
#[test]
fn parse_update_message_allows_numeric_id_without_username() {
let ch = TelegramChannel::new("token".into(), vec!["555".into()]);
let update = serde_json::json!({
"update_id": 2,
"message": {
"message_id": 9,
"text": "ping",
"from": {
"id": 555
},
"chat": {
"id": 12345
}
}
});
let msg = ch
.parse_update_message(&update)
.expect("numeric allowlist should pass");
assert_eq!(msg.sender, "555");
assert_eq!(msg.reply_target, "12345");
}
// ── File sending API URL tests ──────────────────────────────────
#[test]

View file

@ -5,6 +5,7 @@ use async_trait::async_trait;
pub struct ChannelMessage {
pub id: String,
pub sender: String,
pub reply_target: String,
pub content: String,
pub channel: String,
pub timestamp: u64,
@ -62,6 +63,7 @@ mod tests {
tx.send(ChannelMessage {
id: "1".into(),
sender: "tester".into(),
reply_target: "tester".into(),
content: "hello".into(),
channel: "dummy".into(),
timestamp: 123,
@ -76,6 +78,7 @@ mod tests {
let message = ChannelMessage {
id: "42".into(),
sender: "alice".into(),
reply_target: "alice".into(),
content: "ping".into(),
channel: "dummy".into(),
timestamp: 999,
@ -84,6 +87,7 @@ mod tests {
let cloned = message.clone();
assert_eq!(cloned.id, "42");
assert_eq!(cloned.sender, "alice");
assert_eq!(cloned.reply_target, "alice");
assert_eq!(cloned.content, "ping");
assert_eq!(cloned.channel, "dummy");
assert_eq!(cloned.timestamp, 999);

View file

@ -10,7 +10,7 @@ use uuid::Uuid;
/// happens in the gateway when Meta sends webhook events.
pub struct WhatsAppChannel {
access_token: String,
phone_number_id: String,
endpoint_id: String,
verify_token: String,
allowed_numbers: Vec<String>,
client: reqwest::Client,
@ -19,13 +19,13 @@ pub struct WhatsAppChannel {
impl WhatsAppChannel {
pub fn new(
access_token: String,
phone_number_id: String,
endpoint_id: String,
verify_token: String,
allowed_numbers: Vec<String>,
) -> Self {
Self {
access_token,
phone_number_id,
endpoint_id,
verify_token,
allowed_numbers,
client: reqwest::Client::new(),
@ -119,6 +119,7 @@ impl WhatsAppChannel {
messages.push(ChannelMessage {
id: Uuid::new_v4().to_string(),
reply_target: normalized_from.clone(),
sender: normalized_from,
content,
channel: "whatsapp".to_string(),
@ -142,7 +143,7 @@ impl Channel for WhatsAppChannel {
// WhatsApp Cloud API: POST to /v18.0/{phone_number_id}/messages
let url = format!(
"https://graph.facebook.com/v18.0/{}/messages",
self.phone_number_id
self.endpoint_id
);
// Normalize recipient (remove leading + if present for API)
@ -162,7 +163,7 @@ impl Channel for WhatsAppChannel {
let resp = self
.client
.post(&url)
.header("Authorization", format!("Bearer {}", self.access_token))
.bearer_auth(&self.access_token)
.header("Content-Type", "application/json")
.json(&body)
.send()
@ -195,11 +196,11 @@ impl Channel for WhatsAppChannel {
async fn health_check(&self) -> bool {
// Check if we can reach the WhatsApp API
let url = format!("https://graph.facebook.com/v18.0/{}", self.phone_number_id);
let url = format!("https://graph.facebook.com/v18.0/{}", self.endpoint_id);
self.client
.get(&url)
.header("Authorization", format!("Bearer {}", self.access_token))
.bearer_auth(&self.access_token)
.send()
.await
.map(|r| r.status().is_success())