zeroclaw/src/channels/telegram.rs
Xiangjun Ma f1db63219c refactor(telegram): address code review findings
- Add strip_tool_call_tags() to finalize_draft to prevent Markdown
  parse failures from tool-call tags reaching Telegram API
- Deduplicate parse_reply_target() call in update_draft (was called
  twice, discarding thread_id both times)
- Replace body.as_object_mut().unwrap() mutation with separate
  plain_body JSON literal (eliminates unwrap in runtime path)
- Clean up per-chat rate-limit HashMap entry in finalize_draft to
  prevent unbounded growth over long uptimes
- Extract magic number 80 to STREAM_CHUNK_MIN_CHARS constant in
  agent loop
2026-02-18 16:33:33 +08:00

2424 lines
79 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use super::traits::{Channel, ChannelMessage, SendMessage};
use crate::config::{Config, StreamMode};
use crate::security::pairing::PairingGuard;
use anyhow::Context;
use async_trait::async_trait;
use directories::UserDirs;
use parking_lot::Mutex;
use reqwest::multipart::{Form, Part};
use std::fs;
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::time::Duration;
/// Telegram's maximum message length for text messages.
///
/// Telegram documents this limit in characters; the splitter below applies it
/// to UTF-8 byte lengths, which can only make chunks shorter than required.
const TELEGRAM_MAX_MESSAGE_LENGTH: usize = 4096;
/// Command a user sends, followed by a one-time code, to bind their account.
const TELEGRAM_BIND_COMMAND: &str = "/bind";

/// Split a message into chunks that respect Telegram's 4096 character limit.
///
/// Break points are chosen in this order of preference:
/// 1. just after the last newline in the window, provided that newline lies in
///    the second half of the window (so chunks do not become tiny);
/// 2. just after the last space in the window;
/// 3. a hard split at the largest UTF-8 character boundary that fits.
///
/// No bytes are dropped: concatenating the returned chunks reproduces
/// `message`. Every chunk is at most `TELEGRAM_MAX_MESSAGE_LENGTH` bytes and
/// always ends on a character boundary, so multi-byte text never panics.
fn split_message_for_telegram(message: &str) -> Vec<String> {
    if message.len() <= TELEGRAM_MAX_MESSAGE_LENGTH {
        return vec![message.to_string()];
    }

    let mut chunks = Vec::new();
    let mut remaining = message;
    while !remaining.is_empty() {
        if remaining.len() <= TELEGRAM_MAX_MESSAGE_LENGTH {
            chunks.push(remaining.to_string());
            break;
        }

        // Byte 4096 may fall inside a multi-byte character; back up to the
        // nearest boundary. A character is at most 4 bytes, so this stays > 0.
        let mut hard_limit = TELEGRAM_MAX_MESSAGE_LENGTH;
        while !remaining.is_char_boundary(hard_limit) {
            hard_limit -= 1;
        }
        let search_area = &remaining[..hard_limit];

        // Prefer a newline, but not one too close to the start of the window.
        let newline_break = search_area
            .rfind('\n')
            .filter(|&pos| pos >= TELEGRAM_MAX_MESSAGE_LENGTH / 2);

        // '\n' and ' ' are single-byte, so `pos + 1` is always a boundary.
        let chunk_end = newline_break
            .or_else(|| search_area.rfind(' '))
            .map_or(hard_limit, |pos| pos + 1);

        chunks.push(remaining[..chunk_end].to_string());
        remaining = &remaining[chunk_end..];
    }
    chunks
}
/// Kind of media attachment that can be sent to a chat.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TelegramAttachmentKind {
    /// Selected by the `IMAGE` and `PHOTO` markers.
    Image,
    /// Selected by the `DOCUMENT` and `FILE` markers.
    Document,
    /// Selected by the `VIDEO` marker.
    Video,
    /// Selected by the `AUDIO` marker.
    Audio,
    /// Selected by the `VOICE` marker.
    Voice,
}

/// One attachment to deliver: what kind it is and where its content lives.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TelegramAttachment {
    /// How the attachment should be sent.
    kind: TelegramAttachmentKind,
    /// Location of the content, stored as the raw string that was parsed.
    target: String,
}

impl TelegramAttachmentKind {
    /// Resolves a marker keyword to an attachment kind.
    ///
    /// Surrounding whitespace is ignored and the comparison is ASCII
    /// case-insensitive. Returns `None` for unrecognised keywords.
    fn from_marker(marker: &str) -> Option<Self> {
        const ALIASES: [(&str, TelegramAttachmentKind); 7] = [
            ("IMAGE", TelegramAttachmentKind::Image),
            ("PHOTO", TelegramAttachmentKind::Image),
            ("DOCUMENT", TelegramAttachmentKind::Document),
            ("FILE", TelegramAttachmentKind::Document),
            ("VIDEO", TelegramAttachmentKind::Video),
            ("AUDIO", TelegramAttachmentKind::Audio),
            ("VOICE", TelegramAttachmentKind::Voice),
        ];

        let label = marker.trim();
        ALIASES
            .iter()
            .find(|(alias, _)| label.eq_ignore_ascii_case(alias))
            .map(|&(_, kind)| kind)
    }
}
/// Returns `true` when `target` starts with the `http://` or `https://`
/// prefix. The check is case-sensitive.
fn is_http_url(target: &str) -> bool {
    ["http://", "https://"]
        .iter()
        .any(|scheme| target.starts_with(*scheme))
}
fn infer_attachment_kind_from_target(target: &str) -> Option<TelegramAttachmentKind> {
let normalized = target
.split('?')
.next()
.unwrap_or(target)
.split('#')
.next()
.unwrap_or(target);
let extension = Path::new(normalized)
.extension()
.and_then(|ext| ext.to_str())?
.to_ascii_lowercase();
match extension.as_str() {
"png" | "jpg" | "jpeg" | "gif" | "webp" | "bmp" => Some(TelegramAttachmentKind::Image),
"mp4" | "mov" | "mkv" | "avi" | "webm" => Some(TelegramAttachmentKind::Video),
"mp3" | "m4a" | "wav" | "flac" => Some(TelegramAttachmentKind::Audio),
"ogg" | "oga" | "opus" => Some(TelegramAttachmentKind::Voice),
"pdf" | "txt" | "md" | "csv" | "json" | "zip" | "tar" | "gz" | "doc" | "docx" | "xls"
| "xlsx" | "ppt" | "pptx" => Some(TelegramAttachmentKind::Document),
_ => None,
}
}
/// Interprets a message that consists solely of a path or URL as an attachment.
///
/// The message must be a single non-empty line. Surrounding backticks and
/// quotes are removed, the result must contain no whitespace, and a leading
/// `file://` is dropped. The kind is inferred from the file extension. Local
/// paths must exist on disk; HTTP(S) URLs are accepted without a check.
/// Returns `None` when any of these conditions fails.
fn parse_path_only_attachment(message: &str) -> Option<TelegramAttachment> {
    let line = message.trim();
    if line.is_empty() || line.contains('\n') {
        return None;
    }

    let unquoted = line.trim_matches(|c| matches!(c, '`' | '"' | '\''));
    if unquoted.chars().any(char::is_whitespace) {
        return None;
    }

    let target = unquoted.strip_prefix("file://").unwrap_or(unquoted);
    let kind = infer_attachment_kind_from_target(target)?;

    // Only local paths are probed on the filesystem.
    let reachable = is_http_url(target) || Path::new(target).exists();
    reachable.then(|| TelegramAttachment {
        kind,
        target: target.to_string(),
    })
}
/// Strip tool_call XML-style tags from message text.
///
/// Every `<tool>…</tool>`, `<toolcall>…</toolcall>` and
/// `<tool-call>…</tool-call>` span is removed together with its content; an
/// opening tag with no matching closing tag is left as-is. Runs of three or
/// more newlines are then collapsed to two and the result is trimmed.
/// These tags are used internally but must not be sent to Telegram as raw
/// markup, since Telegram's Markdown parser will reject them (causing status
/// 400 errors).
fn strip_tool_call_tags(message: &str) -> String {
    const TAG_PAIRS: [(&str, &str); 3] = [
        ("<tool>", "</tool>"),
        ("<toolcall>", "</toolcall>"),
        ("<tool-call>", "</tool-call>"),
    ];

    let mut text = message.to_owned();
    for &(open_tag, close_tag) in TAG_PAIRS.iter() {
        while let Some(start) = text.find(open_tag) {
            match text[start..].find(close_tag) {
                Some(offset) => {
                    let end = start + offset + close_tag.len();
                    text.replace_range(start..end, "");
                }
                // Unterminated tag: leave the remainder untouched.
                None => break,
            }
        }
    }

    // Clean up any resulting blank lines (but preserve paragraphs)
    while text.contains("\n\n\n") {
        text = text.replace("\n\n\n", "\n\n");
    }
    text.trim().to_owned()
}
/// Extracts `[KIND:target]` attachment markers from a message.
///
/// Returns the message with every recognised marker removed (and the result
/// trimmed), plus the attachments in order of appearance. Bracketed text is
/// copied through unchanged when it has no `:`, has an unknown kind, or has
/// an empty target; so is an unmatched `[`.
fn parse_attachment_markers(message: &str) -> (String, Vec<TelegramAttachment>) {
    let mut text = String::with_capacity(message.len());
    let mut found = Vec::new();
    let mut rest = message;

    while !rest.is_empty() {
        let Some(open) = rest.find('[') else {
            text.push_str(rest);
            break;
        };
        text.push_str(&rest[..open]);

        let bracketed = &rest[open..];
        let Some(close) = bracketed.find(']') else {
            // No closing bracket: everything left is ordinary text.
            text.push_str(bracketed);
            break;
        };

        let inner = &bracketed[1..close];
        let attachment = inner.split_once(':').and_then(|(kind, target)| {
            let kind = TelegramAttachmentKind::from_marker(kind)?;
            let target = target.trim();
            (!target.is_empty()).then(|| TelegramAttachment {
                kind,
                target: target.to_string(),
            })
        });

        match attachment {
            Some(attachment) => found.push(attachment),
            None => text.push_str(&bracketed[..=close]),
        }
        rest = &bracketed[close + 1..];
    }

    (text.trim().to_string(), found)
}
/// Telegram channel — long-polls the Bot API for updates
///
/// Holds the bot credentials, the sender allowlist, the optional pairing
/// guard, and the state used for progressive draft edits.
pub struct TelegramChannel {
/// Bot token; embedded in every Bot API URL built by `api_url`.
bot_token: String,
/// Normalized identities allowed to talk to the bot; an entry of `*`
/// matches every sender (see `is_user_allowed`).
allowed_users: Arc<RwLock<Vec<String>>>,
/// Pairing guard; `new` sets it only when the initial allowlist is empty.
pairing: Option<PairingGuard>,
/// HTTP client used for all Bot API requests.
client: reqwest::Client,
/// Handle of a spawned task, initialised to `None` by `new`.
/// NOTE(review): presumably the typing-indicator task — confirm against the
/// code that sets it.
typing_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
/// Streaming mode; `StreamMode::Off` disables draft updates.
stream_mode: StreamMode,
/// Minimum time in milliseconds between two draft edits in the same chat.
draft_update_interval_ms: u64,
/// Chat id → time the draft in that chat was last sent or edited.
last_draft_edit: Mutex<std::collections::HashMap<String, std::time::Instant>>,
}
impl TelegramChannel {
/// Creates a channel for `bot_token` with streaming off and a 1000 ms draft
/// update interval.
///
/// `allowed_users` entries are normalized (trimmed, leading `@` removed,
/// empty entries dropped). If the resulting allowlist is empty, a pairing
/// guard is created and, when it exposes a one-time code, bind instructions
/// are printed to stdout.
pub fn new(bot_token: String, allowed_users: Vec<String>) -> Self {
    let allowlist = Self::normalize_allowed_users(allowed_users);

    // Nobody is allowlisted yet, so fall back to one-time-code pairing.
    let pairing = allowlist.is_empty().then(|| {
        let guard = PairingGuard::new(true, &[]);
        if let Some(code) = guard.pairing_code() {
            println!(" 🔐 Telegram pairing required. One-time bind code: {code}");
            println!(" Send `{TELEGRAM_BIND_COMMAND} <code>` from your Telegram account.");
        }
        guard
    });

    Self {
        bot_token,
        allowed_users: Arc::new(RwLock::new(allowlist)),
        pairing,
        client: reqwest::Client::new(),
        stream_mode: StreamMode::Off,
        draft_update_interval_ms: 1000,
        last_draft_edit: Mutex::new(std::collections::HashMap::new()),
        typing_handle: Mutex::new(None),
    }
}
/// Configure streaming mode for progressive draft updates.
///
/// Builder-style: consumes the channel and returns it with `stream_mode` and
/// `draft_update_interval_ms` replaced. The interval is the minimum number of
/// milliseconds `update_draft` waits between edits in the same chat.
pub fn with_streaming(
mut self,
stream_mode: StreamMode,
draft_update_interval_ms: u64,
) -> Self {
self.stream_mode = stream_mode;
self.draft_update_interval_ms = draft_update_interval_ms;
self
}
/// Parse reply_target into (chat_id, optional thread_id).
///
/// The target is split at the first `:`; everything after it is the thread
/// id. Without a `:` the whole string is the chat id.
fn parse_reply_target(reply_target: &str) -> (String, Option<String>) {
    match reply_target.split_once(':') {
        Some((chat, thread)) => (chat.to_owned(), Some(thread.to_owned())),
        None => (reply_target.to_owned(), None),
    }
}
/// Canonical form of a username or user id: surrounding whitespace is
/// trimmed, then every leading `@` is removed.
fn normalize_identity(value: &str) -> String {
    let trimmed = value.trim();
    String::from(trimmed.trim_start_matches('@'))
}
/// Normalizes every allowlist entry and drops those that end up empty,
/// keeping the original order.
fn normalize_allowed_users(allowed_users: Vec<String>) -> Vec<String> {
    allowed_users
        .into_iter()
        .filter_map(|entry| {
            let identity = Self::normalize_identity(&entry);
            (!identity.is_empty()).then_some(identity)
        })
        .collect()
}
/// Reads and parses `~/.zeroclaw/config.toml` directly from disk.
///
/// After parsing, `config_path` is set to that file and `workspace_dir` to
/// `~/.zeroclaw/workspace`.
///
/// # Errors
/// Fails when the home directory cannot be determined, or when the file
/// cannot be read or parsed.
fn load_config_without_env() -> anyhow::Result<Config> {
    let user_dirs = UserDirs::new().context("Could not find home directory")?;
    let zeroclaw_dir = user_dirs.home_dir().join(".zeroclaw");
    let config_path = zeroclaw_dir.join("config.toml");

    let raw = fs::read_to_string(&config_path)
        .with_context(|| format!("Failed to read config file: {}", config_path.display()))?;
    let mut config: Config =
        toml::from_str(&raw).context("Failed to parse config file for Telegram binding")?;

    config.workspace_dir = zeroclaw_dir.join("workspace");
    config.config_path = config_path;
    Ok(config)
}
fn persist_allowed_identity_blocking(identity: &str) -> anyhow::Result<()> {
let mut config = Self::load_config_without_env()?;
let Some(telegram) = config.channels_config.telegram.as_mut() else {
anyhow::bail!("Telegram channel config is missing in config.toml");
};
let normalized = Self::normalize_identity(identity);
if normalized.is_empty() {
anyhow::bail!("Cannot persist empty Telegram identity");
}
if !telegram.allowed_users.iter().any(|u| u == &normalized) {
telegram.allowed_users.push(normalized);
config
.save()
.context("Failed to persist Telegram allowlist to config.toml")?;
}
Ok(())
}
/// Async wrapper that runs `persist_allowed_identity_blocking` on tokio's
/// blocking thread pool.
///
/// # Errors
/// Returns the persistence error, or a join error if the blocking task
/// could not be joined.
async fn persist_allowed_identity(&self, identity: &str) -> anyhow::Result<()> {
    let owned = identity.to_owned();
    let joined =
        tokio::task::spawn_blocking(move || Self::persist_allowed_identity_blocking(&owned))
            .await;
    match joined {
        Ok(outcome) => outcome,
        Err(e) => Err(anyhow::anyhow!(
            "Failed to join Telegram bind save task: {e}"
        )),
    }
}
/// Adds `identity` to the in-memory allowlist, without touching the config.
///
/// Does nothing when the normalized identity is empty, is already listed,
/// or the allowlist lock is poisoned.
fn add_allowed_identity_runtime(&self, identity: &str) {
    let normalized = Self::normalize_identity(identity);
    if normalized.is_empty() {
        return;
    }
    let Ok(mut users) = self.allowed_users.write() else {
        return;
    };
    if !users.contains(&normalized) {
        users.push(normalized);
    }
}
/// Extracts the code from a `/bind <code>` message.
///
/// The first whitespace-separated token must be the bind command, optionally
/// followed by an `@...` suffix; the second token is the code. Returns `None`
/// for any other command or when no code follows.
fn extract_bind_code(text: &str) -> Option<&str> {
    let mut tokens = text.split_whitespace();
    let command = tokens.next()?;
    // Ignore everything from the first '@' onward.
    let base = command.split_once('@').map_or(command, |(head, _)| head);
    if base != TELEGRAM_BIND_COMMAND {
        return None;
    }

    let code = tokens.next()?.trim();
    if code.is_empty() {
        None
    } else {
        Some(code)
    }
}
fn pairing_code_active(&self) -> bool {
self.pairing
.as_ref()
.and_then(PairingGuard::pairing_code)
.is_some()
}
/// Builds the Bot API URL for `method`, embedding the bot token.
fn api_url(&self, method: &str) -> String {
    let token = &self.bot_token;
    format!("https://api.telegram.org/bot{token}/{method}")
}
/// Checks whether `username` (after normalization) is on the allowlist.
///
/// An allowlist entry of `*` matches everyone. A poisoned allowlist lock is
/// treated as "not allowed".
fn is_user_allowed(&self, username: &str) -> bool {
    let identity = Self::normalize_identity(username);
    match self.allowed_users.read() {
        Ok(users) => users
            .iter()
            .any(|entry| entry == "*" || entry == &identity),
        Err(_) => false,
    }
}
/// Returns `true` as soon as one of `identities` is on the allowlist.
fn is_any_user_allowed<'a, I>(&self, identities: I) -> bool
where
    I: IntoIterator<Item = &'a str>,
{
    for identity in identities {
        if self.is_user_allowed(identity) {
            return true;
        }
    }
    false
}
/// Replies to a text message whose sender is not on the allowlist.
///
/// Returns without doing anything when the update has no `message`, the
/// message has no text, the chat id is missing (a warning is logged), or the
/// sender turns out to be allowed. Otherwise:
///
/// * a `/bind <code>` message is handled as a pairing attempt — on success
///   the sender is added to the in-memory allowlist and persisted to the
///   config, and the outcome is reported back to the chat;
/// * any other message gets instructions for requesting operator approval,
///   plus a hint about `/bind` while a pairing code is active.
///
/// All replies are best-effort: send errors are ignored.
async fn handle_unauthorized_message(&self, update: &serde_json::Value) {
let Some(message) = update.get("message") else {
return;
};
let Some(text) = message.get("text").and_then(serde_json::Value::as_str) else {
return;
};
// A sender without a username is represented by the placeholder "unknown".
let username_opt = message
.get("from")
.and_then(|from| from.get("username"))
.and_then(serde_json::Value::as_str);
let username = username_opt.unwrap_or("unknown");
let normalized_username = Self::normalize_identity(username);
let user_id = message
.get("from")
.and_then(|from| from.get("id"))
.and_then(serde_json::Value::as_i64);
let user_id_str = user_id.map(|id| id.to_string());
let normalized_user_id = user_id_str.as_deref().map(Self::normalize_identity);
let chat_id = message
.get("chat")
.and_then(|chat| chat.get("id"))
.and_then(serde_json::Value::as_i64)
.map(|id| id.to_string());
let Some(chat_id) = chat_id else {
tracing::warn!("Telegram: missing chat_id in message, skipping");
return;
};
// Check both the username and, when present, the numeric user id.
let mut identities = vec![normalized_username.as_str()];
if let Some(ref id) = normalized_user_id {
identities.push(id.as_str());
}
if self.is_any_user_allowed(identities.iter().copied()) {
return;
}
// Pairing attempt: `/bind <code>`.
if let Some(code) = Self::extract_bind_code(text) {
if let Some(pairing) = self.pairing.as_ref() {
match pairing.try_pair(code) {
Ok(Some(_token)) => {
// Prefer the numeric user id; fall back to a real username.
let bind_identity = normalized_user_id.clone().or_else(|| {
if normalized_username.is_empty() || normalized_username == "unknown" {
None
} else {
Some(normalized_username.clone())
}
});
if let Some(identity) = bind_identity {
// Allow the sender immediately, then try to persist the change.
self.add_allowed_identity_runtime(&identity);
match self.persist_allowed_identity(&identity).await {
Ok(()) => {
let _ = self
.send(&SendMessage::new(
"✅ Telegram account bound successfully. You can talk to ZeroClaw now.",
&chat_id,
))
.await;
tracing::info!(
"Telegram: paired and allowlisted identity={identity}"
);
}
Err(e) => {
tracing::error!(
"Telegram: failed to persist allowlist after bind: {e}"
);
let _ = self
.send(&SendMessage::new(
"⚠️ Bound for this runtime, but failed to persist config. Access may be lost after restart; check config file permissions.",
&chat_id,
))
.await;
}
}
} else {
let _ = self
.send(&SendMessage::new(
"❌ Could not identify your Telegram account. Ensure your account has a username or stable user ID, then retry.",
&chat_id,
))
.await;
}
}
// The guard did not accept the code.
Ok(None) => {
let _ = self
.send(&SendMessage::new(
"❌ Invalid binding code. Ask operator for the latest code and retry.",
&chat_id,
))
.await;
}
// The error value is reported to the user as a retry delay in seconds.
Err(lockout_secs) => {
let _ = self
.send(&SendMessage::new(
format!("⏳ Too many invalid attempts. Retry in {lockout_secs}s."),
&chat_id,
))
.await;
}
}
} else {
// No pairing guard exists (the channel was created with an allowlist).
let _ = self
.send(&SendMessage::new(
" Telegram pairing is not active. Ask operator to update allowlist in config.toml.",
&chat_id,
))
.await;
}
return;
}
// Not a bind attempt: log the rejection and tell the sender how to get access.
tracing::warn!(
"Telegram: ignoring message from unauthorized user: username={username}, user_id={}. \
Allowlist Telegram username (without '@') or numeric user ID.",
user_id_str.as_deref().unwrap_or("unknown")
);
// Identity to show in the suggested operator command; a placeholder is
// used when neither a user id nor a real username is available.
let suggested_identity = normalized_user_id
.clone()
.or_else(|| {
if normalized_username.is_empty() || normalized_username == "unknown" {
None
} else {
Some(normalized_username.clone())
}
})
.unwrap_or_else(|| "YOUR_TELEGRAM_ID".to_string());
let _ = self
.send(&SendMessage::new(
format!(
"🔐 This bot requires operator approval.\n\nCopy this command to operator terminal:\n`zeroclaw channel bind-telegram {suggested_identity}`\n\nAfter operator runs it, send your message again."
),
&chat_id,
))
.await;
if self.pairing_code_active() {
let _ = self
.send(&SendMessage::new(
" If operator provides a one-time pairing code, you can also run `/bind <code>`.",
&chat_id,
))
.await;
}
}
/// Converts a Bot API update into a `ChannelMessage`.
///
/// Returns `None` when the update has no `message`, the message has no
/// text, the sender is not on the allowlist (checked by username and by
/// numeric id), or the chat id is missing.
///
/// The sender is the username, or the numeric id when there is no username,
/// or `"unknown"` when neither is present. `reply_target` is `chat_id`, or
/// `chat_id:thread_id` for messages that carry `message_thread_id`.
fn parse_update_message(&self, update: &serde_json::Value) -> Option<ChannelMessage> {
    let message = update.get("message")?;
    let text = message.get("text")?.as_str()?;

    let from = message.get("from");
    let username = from
        .and_then(|f| f.get("username"))
        .and_then(serde_json::Value::as_str)
        .unwrap_or("unknown");
    let user_id = from
        .and_then(|f| f.get("id"))
        .and_then(serde_json::Value::as_i64)
        .map(|id| id.to_string());

    // Either the username or the numeric id may be allowlisted.
    let candidates = std::iter::once(username).chain(user_id.as_deref());
    if !self.is_any_user_allowed(candidates) {
        return None;
    }

    let sender = match (username, &user_id) {
        ("unknown", Some(id)) => id.clone(),
        _ => username.to_string(),
    };

    let chat_id = message.get("chat")?.get("id")?.as_i64()?.to_string();
    let message_id = message
        .get("message_id")
        .and_then(serde_json::Value::as_i64)
        .unwrap_or(0);

    // Forum topics: include the thread id so replies go to the same topic.
    let thread = message
        .get("message_thread_id")
        .and_then(serde_json::Value::as_i64);
    let reply_target = match thread {
        Some(tid) => format!("{}:{}", chat_id, tid),
        None => chat_id.clone(),
    };

    let timestamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();

    Some(ChannelMessage {
        id: format!("telegram_{chat_id}_{message_id}"),
        sender,
        reply_target,
        content: text.to_string(),
        channel: "telegram".to_string(),
        timestamp,
    })
}
async fn send_text_chunks(
&self,
message: &str,
chat_id: &str,
thread_id: Option<&str>,
) -> anyhow::Result<()> {
let chunks = split_message_for_telegram(message);
for (index, chunk) in chunks.iter().enumerate() {
let text = if chunks.len() > 1 {
if index == 0 {
format!("{chunk}\n\n(continues...)")
} else if index == chunks.len() - 1 {
format!("(continued)\n\n{chunk}")
} else {
format!("(continued)\n\n{chunk}\n\n(continues...)")
}
} else {
chunk.to_string()
};
let mut markdown_body = serde_json::json!({
"chat_id": chat_id,
"text": text,
"parse_mode": "Markdown"
});
// Add message_thread_id for forum topic support
if let Some(tid) = thread_id {
markdown_body["message_thread_id"] = serde_json::Value::String(tid.to_string());
}
let markdown_resp = self
.client
.post(self.api_url("sendMessage"))
.json(&markdown_body)
.send()
.await?;
if markdown_resp.status().is_success() {
if index < chunks.len() - 1 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
continue;
}
let markdown_status = markdown_resp.status();
let markdown_err = markdown_resp.text().await.unwrap_or_default();
tracing::warn!(
status = ?markdown_status,
"Telegram sendMessage with Markdown failed; retrying without parse_mode"
);
let mut plain_body = serde_json::json!({
"chat_id": chat_id,
"text": text,
});
// Add message_thread_id for forum topic support
if let Some(tid) = thread_id {
plain_body["message_thread_id"] = serde_json::Value::String(tid.to_string());
}
let plain_resp = self
.client
.post(self.api_url("sendMessage"))
.json(&plain_body)
.send()
.await?;
if !plain_resp.status().is_success() {
let plain_status = plain_resp.status();
let plain_err = plain_resp.text().await.unwrap_or_default();
anyhow::bail!(
"Telegram sendMessage failed (markdown {}: {}; plain {}: {})",
markdown_status,
markdown_err,
plain_status,
plain_err
);
}
if index < chunks.len() - 1 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
Ok(())
}
/// Asks Telegram to send media that it fetches from `url`.
///
/// `method` is the Bot API method (for example `sendVideo`) and
/// `media_field` the JSON field that carries the URL (for example `video`).
/// `thread_id` and `caption` are included only when given.
///
/// # Errors
/// Fails on transport errors or when Telegram answers with a non-success
/// status; the response body is included in the error.
async fn send_media_by_url(
    &self,
    method: &str,
    media_field: &str,
    chat_id: &str,
    thread_id: Option<&str>,
    url: &str,
    caption: Option<&str>,
) -> anyhow::Result<()> {
    let mut payload = serde_json::Map::new();
    payload.insert("chat_id".to_owned(), serde_json::Value::from(chat_id));
    payload.insert(media_field.to_owned(), serde_json::Value::from(url));
    if let Some(tid) = thread_id {
        payload.insert(
            "message_thread_id".to_owned(),
            serde_json::Value::from(tid),
        );
    }
    if let Some(cap) = caption {
        payload.insert("caption".to_owned(), serde_json::Value::from(cap));
    }

    let response = self
        .client
        .post(self.api_url(method))
        .json(&payload)
        .send()
        .await?;

    if response.status().is_success() {
        tracing::info!("Telegram {method} sent to {chat_id}: {url}");
        return Ok(());
    }
    let err = response.text().await?;
    anyhow::bail!("Telegram {method} by URL failed: {err}");
}
async fn send_attachment(
&self,
chat_id: &str,
thread_id: Option<&str>,
attachment: &TelegramAttachment,
) -> anyhow::Result<()> {
let target = attachment.target.trim();
if is_http_url(target) {
return match attachment.kind {
TelegramAttachmentKind::Image => {
self.send_photo_by_url(chat_id, thread_id, target, None)
.await
}
TelegramAttachmentKind::Document => {
self.send_document_by_url(chat_id, thread_id, target, None)
.await
}
TelegramAttachmentKind::Video => {
self.send_video_by_url(chat_id, thread_id, target, None)
.await
}
TelegramAttachmentKind::Audio => {
self.send_audio_by_url(chat_id, thread_id, target, None)
.await
}
TelegramAttachmentKind::Voice => {
self.send_voice_by_url(chat_id, thread_id, target, None)
.await
}
};
}
let path = Path::new(target);
if !path.exists() {
anyhow::bail!("Telegram attachment path not found: {target}");
}
match attachment.kind {
TelegramAttachmentKind::Image => self.send_photo(chat_id, thread_id, path, None).await,
TelegramAttachmentKind::Document => {
self.send_document(chat_id, thread_id, path, None).await
}
TelegramAttachmentKind::Video => self.send_video(chat_id, thread_id, path, None).await,
TelegramAttachmentKind::Audio => self.send_audio(chat_id, thread_id, path, None).await,
TelegramAttachmentKind::Voice => self.send_voice(chat_id, thread_id, path, None).await,
}
}
/// Send a document/file to a Telegram chat
pub async fn send_document(
&self,
chat_id: &str,
thread_id: Option<&str>,
file_path: &Path,
caption: Option<&str>,
) -> anyhow::Result<()> {
let file_name = file_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("file");
let file_bytes = tokio::fs::read(file_path).await?;
let part = Part::bytes(file_bytes).file_name(file_name.to_string());
let mut form = Form::new()
.text("chat_id", chat_id.to_string())
.part("document", part);
if let Some(tid) = thread_id {
form = form.text("message_thread_id", tid.to_string());
}
if let Some(cap) = caption {
form = form.text("caption", cap.to_string());
}
let resp = self
.client
.post(self.api_url("sendDocument"))
.multipart(form)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram sendDocument failed: {err}");
}
tracing::info!("Telegram document sent to {chat_id}: {file_name}");
Ok(())
}
/// Send a document from bytes (in-memory) to a Telegram chat
pub async fn send_document_bytes(
&self,
chat_id: &str,
thread_id: Option<&str>,
file_bytes: Vec<u8>,
file_name: &str,
caption: Option<&str>,
) -> anyhow::Result<()> {
let part = Part::bytes(file_bytes).file_name(file_name.to_string());
let mut form = Form::new()
.text("chat_id", chat_id.to_string())
.part("document", part);
if let Some(tid) = thread_id {
form = form.text("message_thread_id", tid.to_string());
}
if let Some(cap) = caption {
form = form.text("caption", cap.to_string());
}
let resp = self
.client
.post(self.api_url("sendDocument"))
.multipart(form)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram sendDocument failed: {err}");
}
tracing::info!("Telegram document sent to {chat_id}: {file_name}");
Ok(())
}
/// Send a photo to a Telegram chat
pub async fn send_photo(
&self,
chat_id: &str,
thread_id: Option<&str>,
file_path: &Path,
caption: Option<&str>,
) -> anyhow::Result<()> {
let file_name = file_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("photo.jpg");
let file_bytes = tokio::fs::read(file_path).await?;
let part = Part::bytes(file_bytes).file_name(file_name.to_string());
let mut form = Form::new()
.text("chat_id", chat_id.to_string())
.part("photo", part);
if let Some(tid) = thread_id {
form = form.text("message_thread_id", tid.to_string());
}
if let Some(cap) = caption {
form = form.text("caption", cap.to_string());
}
let resp = self
.client
.post(self.api_url("sendPhoto"))
.multipart(form)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram sendPhoto failed: {err}");
}
tracing::info!("Telegram photo sent to {chat_id}: {file_name}");
Ok(())
}
/// Send a photo from bytes (in-memory) to a Telegram chat
pub async fn send_photo_bytes(
&self,
chat_id: &str,
thread_id: Option<&str>,
file_bytes: Vec<u8>,
file_name: &str,
caption: Option<&str>,
) -> anyhow::Result<()> {
let part = Part::bytes(file_bytes).file_name(file_name.to_string());
let mut form = Form::new()
.text("chat_id", chat_id.to_string())
.part("photo", part);
if let Some(tid) = thread_id {
form = form.text("message_thread_id", tid.to_string());
}
if let Some(cap) = caption {
form = form.text("caption", cap.to_string());
}
let resp = self
.client
.post(self.api_url("sendPhoto"))
.multipart(form)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram sendPhoto failed: {err}");
}
tracing::info!("Telegram photo sent to {chat_id}: {file_name}");
Ok(())
}
/// Send a video to a Telegram chat
pub async fn send_video(
&self,
chat_id: &str,
thread_id: Option<&str>,
file_path: &Path,
caption: Option<&str>,
) -> anyhow::Result<()> {
let file_name = file_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("video.mp4");
let file_bytes = tokio::fs::read(file_path).await?;
let part = Part::bytes(file_bytes).file_name(file_name.to_string());
let mut form = Form::new()
.text("chat_id", chat_id.to_string())
.part("video", part);
if let Some(tid) = thread_id {
form = form.text("message_thread_id", tid.to_string());
}
if let Some(cap) = caption {
form = form.text("caption", cap.to_string());
}
let resp = self
.client
.post(self.api_url("sendVideo"))
.multipart(form)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram sendVideo failed: {err}");
}
tracing::info!("Telegram video sent to {chat_id}: {file_name}");
Ok(())
}
/// Send an audio file to a Telegram chat
pub async fn send_audio(
&self,
chat_id: &str,
thread_id: Option<&str>,
file_path: &Path,
caption: Option<&str>,
) -> anyhow::Result<()> {
let file_name = file_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("audio.mp3");
let file_bytes = tokio::fs::read(file_path).await?;
let part = Part::bytes(file_bytes).file_name(file_name.to_string());
let mut form = Form::new()
.text("chat_id", chat_id.to_string())
.part("audio", part);
if let Some(tid) = thread_id {
form = form.text("message_thread_id", tid.to_string());
}
if let Some(cap) = caption {
form = form.text("caption", cap.to_string());
}
let resp = self
.client
.post(self.api_url("sendAudio"))
.multipart(form)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram sendAudio failed: {err}");
}
tracing::info!("Telegram audio sent to {chat_id}: {file_name}");
Ok(())
}
/// Send a voice message to a Telegram chat
pub async fn send_voice(
&self,
chat_id: &str,
thread_id: Option<&str>,
file_path: &Path,
caption: Option<&str>,
) -> anyhow::Result<()> {
let file_name = file_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("voice.ogg");
let file_bytes = tokio::fs::read(file_path).await?;
let part = Part::bytes(file_bytes).file_name(file_name.to_string());
let mut form = Form::new()
.text("chat_id", chat_id.to_string())
.part("voice", part);
if let Some(tid) = thread_id {
form = form.text("message_thread_id", tid.to_string());
}
if let Some(cap) = caption {
form = form.text("caption", cap.to_string());
}
let resp = self
.client
.post(self.api_url("sendVoice"))
.multipart(form)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram sendVoice failed: {err}");
}
tracing::info!("Telegram voice sent to {chat_id}: {file_name}");
Ok(())
}
/// Send a file by URL (Telegram will download it)
///
/// Delegates to `send_media_by_url`, like the video, audio and voice
/// variants, so the request body and error message are built in one place.
/// The success log line therefore uses the shared
/// `Telegram sendDocument sent to …` wording.
///
/// # Errors
/// Fails on transport errors or when Telegram rejects the request.
pub async fn send_document_by_url(
    &self,
    chat_id: &str,
    thread_id: Option<&str>,
    url: &str,
    caption: Option<&str>,
) -> anyhow::Result<()> {
    self.send_media_by_url("sendDocument", "document", chat_id, thread_id, url, caption)
        .await
}
/// Send a photo by URL (Telegram will download it)
///
/// Delegates to `send_media_by_url`, like the video, audio and voice
/// variants, so the request body and error message are built in one place.
/// The success log line therefore uses the shared
/// `Telegram sendPhoto sent to …` wording.
///
/// # Errors
/// Fails on transport errors or when Telegram rejects the request.
pub async fn send_photo_by_url(
    &self,
    chat_id: &str,
    thread_id: Option<&str>,
    url: &str,
    caption: Option<&str>,
) -> anyhow::Result<()> {
    self.send_media_by_url("sendPhoto", "photo", chat_id, thread_id, url, caption)
        .await
}
/// Send a video by URL (Telegram will download it)
///
/// Thin wrapper over `send_media_by_url` using the `sendVideo` method and the
/// `video` field; `thread_id` and `caption` are forwarded unchanged.
///
/// # Errors
/// Returns whatever error `send_media_by_url` reports.
pub async fn send_video_by_url(
&self,
chat_id: &str,
thread_id: Option<&str>,
url: &str,
caption: Option<&str>,
) -> anyhow::Result<()> {
self.send_media_by_url("sendVideo", "video", chat_id, thread_id, url, caption)
.await
}
/// Send an audio file by URL (Telegram will download it)
///
/// Thin wrapper over `send_media_by_url` using the `sendAudio` method and the
/// `audio` field; `thread_id` and `caption` are forwarded unchanged.
///
/// # Errors
/// Returns whatever error `send_media_by_url` reports.
pub async fn send_audio_by_url(
&self,
chat_id: &str,
thread_id: Option<&str>,
url: &str,
caption: Option<&str>,
) -> anyhow::Result<()> {
self.send_media_by_url("sendAudio", "audio", chat_id, thread_id, url, caption)
.await
}
/// Send a voice message by URL (Telegram will download it)
///
/// Thin wrapper over `send_media_by_url` using the `sendVoice` method and the
/// `voice` field; `thread_id` and `caption` are forwarded unchanged.
///
/// # Errors
/// Returns whatever error `send_media_by_url` reports.
pub async fn send_voice_by_url(
&self,
chat_id: &str,
thread_id: Option<&str>,
url: &str,
caption: Option<&str>,
) -> anyhow::Result<()> {
self.send_media_by_url("sendVoice", "voice", chat_id, thread_id, url, caption)
.await
}
}
#[async_trait]
impl Channel for TelegramChannel {
/// Channel identifier; the same string is used as `ChannelMessage::channel`
/// for messages parsed by this channel.
fn name(&self) -> &str {
"telegram"
}
/// Reports whether progressive draft updates are enabled, i.e. whether the
/// configured stream mode is anything other than `StreamMode::Off`.
fn supports_draft_updates(&self) -> bool {
self.stream_mode != StreamMode::Off
}
/// Sends the initial draft message that later `update_draft` calls edit.
///
/// Returns `Ok(None)` when streaming is off. Otherwise the message content
/// (or `...` when it is empty) is sent as plain text, the send time is
/// recorded for per-chat rate limiting, and the id of the new message is
/// returned. The id is `None` if the response carries no
/// `result.message_id`.
///
/// # Errors
/// Fails on transport errors, when Telegram rejects the message, or when
/// the response is not valid JSON.
async fn send_draft(&self, message: &SendMessage) -> anyhow::Result<Option<String>> {
    if self.stream_mode == StreamMode::Off {
        return Ok(None);
    }

    let (chat_id, thread_id) = Self::parse_reply_target(&message.recipient);
    let placeholder = if message.content.is_empty() {
        "..."
    } else {
        message.content.as_str()
    };

    let mut payload = serde_json::json!({
        "chat_id": chat_id,
        "text": placeholder,
    });
    if let Some(tid) = thread_id {
        payload["message_thread_id"] = serde_json::Value::String(tid);
    }

    let response = self
        .client
        .post(self.api_url("sendMessage"))
        .json(&payload)
        .send()
        .await?;
    if !response.status().is_success() {
        let err = response.text().await.unwrap_or_default();
        anyhow::bail!("Telegram sendMessage (draft) failed: {err}");
    }

    let parsed: serde_json::Value = response.json().await?;
    let draft_id = parsed
        .pointer("/result/message_id")
        .and_then(serde_json::Value::as_i64)
        .map(|id| id.to_string());

    // Start the rate-limit clock for this chat.
    self.last_draft_edit
        .lock()
        .insert(chat_id, std::time::Instant::now());
    Ok(draft_id)
}
/// Edits the draft message with the latest streamed text.
///
/// The edit is skipped (returning `Ok`) when the previous edit in this chat
/// was less than `draft_update_interval_ms` ago, or when `message_id` is not
/// a valid integer. Text longer than the Telegram limit is truncated at a
/// UTF-8 character boundary. A rejected edit is only logged at debug level.
///
/// # Errors
/// Fails only on transport errors.
async fn update_draft(
    &self,
    recipient: &str,
    message_id: &str,
    text: &str,
) -> anyhow::Result<()> {
    let (chat_id, _) = Self::parse_reply_target(recipient);

    // Rate-limit edits per chat; the lock is released before any await.
    let last_edit = self.last_draft_edit.lock().get(&chat_id).copied();
    if let Some(last_edit) = last_edit {
        let elapsed_ms = u64::try_from(last_edit.elapsed().as_millis()).unwrap_or(u64::MAX);
        if elapsed_ms < self.draft_update_interval_ms {
            return Ok(());
        }
    }

    // Truncate to Telegram limit for mid-stream edits (UTF-8 safe)
    let display_text = if text.len() > TELEGRAM_MAX_MESSAGE_LENGTH {
        let mut cut = TELEGRAM_MAX_MESSAGE_LENGTH;
        while !text.is_char_boundary(cut) {
            cut -= 1;
        }
        &text[..cut]
    } else {
        text
    };

    let message_id_parsed: i64 = match message_id.parse() {
        Ok(id) => id,
        Err(e) => {
            tracing::warn!("Invalid Telegram message_id '{message_id}': {e}");
            return Ok(());
        }
    };

    let payload = serde_json::json!({
        "chat_id": chat_id,
        "message_id": message_id_parsed,
        "text": display_text,
    });
    let response = self
        .client
        .post(self.api_url("editMessageText"))
        .json(&payload)
        .send()
        .await?;

    if !response.status().is_success() {
        let status = response.status();
        let err = response.text().await.unwrap_or_default();
        tracing::debug!("Telegram editMessageText failed ({status}): {err}");
        return Ok(());
    }

    // Only successful edits reset the rate-limit clock.
    self.last_draft_edit
        .lock()
        .insert(chat_id, std::time::Instant::now());
    Ok(())
}
/// Replaces the streamed draft with the final reply text.
///
/// Steps, in order:
/// 1. Tool-call tags are stripped from `text` and the chat's rate-limit entry is
///    removed from `last_draft_edit`.
/// 2. If `message_id` is not a valid integer, a warning is logged and the text is
///    sent as new message(s) via `send_text_chunks`.
/// 3. If the text exceeds `TELEGRAM_MAX_MESSAGE_LENGTH` bytes, the draft is deleted
///    (best effort, result ignored) and the text is sent as chunked messages.
/// 4. Otherwise the draft is edited in place, first with Markdown formatting and
///    then as plain text if that is rejected.
/// 5. If both edits are rejected, the text is sent as new message(s).
///
/// A rejection whose body contains "message is not modified" means the draft
/// already shows exactly this content. That is treated as success so that the
/// fallback does not post a duplicate copy of the reply.
///
/// # Errors
/// Fails when an edit request cannot be sent, or when the `send_text_chunks`
/// fallback fails.
async fn finalize_draft(
    &self,
    recipient: &str,
    message_id: &str,
    text: &str,
) -> anyhow::Result<()> {
    // Marker Telegram puts in the error description when an edit changes nothing.
    const NOT_MODIFIED: &str = "message is not modified";

    let text = &strip_tool_call_tags(text);
    let (chat_id, thread_id) = Self::parse_reply_target(recipient);

    // Clean up rate-limit tracking for this chat
    self.last_draft_edit.lock().remove(&chat_id);

    // Without a usable draft id there is nothing to edit or delete.
    let msg_id = match message_id.parse::<i64>() {
        Ok(id) => id,
        Err(e) => {
            tracing::warn!("Invalid Telegram message_id '{message_id}': {e}");
            return self
                .send_text_chunks(text, &chat_id, thread_id.as_deref())
                .await;
        }
    };

    // If text exceeds limit, delete draft and send as chunked messages
    if text.len() > TELEGRAM_MAX_MESSAGE_LENGTH {
        // Best effort: a leftover draft is preferable to losing the reply.
        let _ = self
            .client
            .post(self.api_url("deleteMessage"))
            .json(&serde_json::json!({
                "chat_id": chat_id,
                "message_id": msg_id,
            }))
            .send()
            .await;
        return self
            .send_text_chunks(text, &chat_id, thread_id.as_deref())
            .await;
    }

    // Try editing with Markdown formatting
    let markdown_body = serde_json::json!({
        "chat_id": chat_id,
        "message_id": msg_id,
        "text": text,
        "parse_mode": "Markdown",
    });
    let resp = self
        .client
        .post(self.api_url("editMessageText"))
        .json(&markdown_body)
        .send()
        .await?;
    if resp.status().is_success() {
        return Ok(());
    }
    let markdown_err = resp.text().await.unwrap_or_default();
    if markdown_err.contains(NOT_MODIFIED) {
        return Ok(());
    }
    tracing::debug!("Telegram editMessageText (Markdown) failed: {markdown_err}");

    // Markdown failed — retry without parse_mode
    let plain_body = serde_json::json!({
        "chat_id": chat_id,
        "message_id": msg_id,
        "text": text,
    });
    let resp = self
        .client
        .post(self.api_url("editMessageText"))
        .json(&plain_body)
        .send()
        .await?;
    if resp.status().is_success() {
        return Ok(());
    }
    let plain_err = resp.text().await.unwrap_or_default();
    if plain_err.contains(NOT_MODIFIED) {
        return Ok(());
    }
    tracing::debug!("Telegram editMessageText (plain) failed: {plain_err}");

    // Edit failed entirely — fall back to new message
    tracing::warn!("Telegram finalize_draft edit failed; falling back to sendMessage");
    self.send_text_chunks(text, &chat_id, thread_id.as_deref())
        .await
}
/// Delivers a complete message to a chat, as text and/or attachments.
///
/// Tool-call tags are stripped first. The recipient is either `chat_id` or
/// `chat_id:thread_id`. Delivery then takes one of three routes:
/// - the content has attachment markers: the remaining text (if any) is sent
///   first, followed by each attachment in order;
/// - the whole content is recognised as a single attachment path: only that
///   attachment is sent;
/// - otherwise the content is sent as (possibly chunked) text.
///
/// # Errors
/// Propagates the first failure from `send_text_chunks` or `send_attachment`.
async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
    // Strip tool_call tags before processing to prevent Markdown parsing failures
    let content = strip_tool_call_tags(&message.content);

    // Parse recipient: "chat_id" or "chat_id:thread_id" format
    let recipient = message.recipient.as_str();
    let (chat_id, thread_id) = recipient
        .split_once(':')
        .map_or((recipient, None), |(chat, thread)| (chat, Some(thread)));

    let (text_without_markers, attachments) = parse_attachment_markers(&content);

    if attachments.is_empty() {
        // No explicit markers: the content may still be a bare attachment path.
        if let Some(attachment) = parse_path_only_attachment(&content) {
            self.send_attachment(chat_id, thread_id, &attachment)
                .await?;
            return Ok(());
        }
        return self.send_text_chunks(&content, chat_id, thread_id).await;
    }

    if !text_without_markers.is_empty() {
        self.send_text_chunks(&text_without_markers, chat_id, thread_id)
            .await?;
    }
    for attachment in &attachments {
        self.send_attachment(chat_id, thread_id, attachment).await?;
    }
    Ok(())
}
async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
let mut offset: i64 = 0;
tracing::info!("Telegram channel listening for messages...");
loop {
let url = self.api_url("getUpdates");
let body = serde_json::json!({
"offset": offset,
"timeout": 30,
"allowed_updates": ["message"]
});
let resp = match self.client.post(&url).json(&body).send().await {
Ok(r) => r,
Err(e) => {
tracing::warn!("Telegram poll error: {e}");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
continue;
}
};
let data: serde_json::Value = match resp.json().await {
Ok(d) => d,
Err(e) => {
tracing::warn!("Telegram parse error: {e}");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
continue;
}
};
let ok = data
.get("ok")
.and_then(serde_json::Value::as_bool)
.unwrap_or(true);
if !ok {
let error_code = data
.get("error_code")
.and_then(serde_json::Value::as_i64)
.unwrap_or_default();
let description = data
.get("description")
.and_then(serde_json::Value::as_str)
.unwrap_or("unknown Telegram API error");
if error_code == 409 {
tracing::warn!(
"Telegram polling conflict (409): {description}. \
Ensure only one `zeroclaw` process is using this bot token."
);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
} else {
tracing::warn!(
"Telegram getUpdates API error (code={}): {description}",
error_code
);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
continue;
}
if let Some(results) = data.get("result").and_then(serde_json::Value::as_array) {
for update in results {
// Advance offset past this update
if let Some(uid) = update.get("update_id").and_then(serde_json::Value::as_i64) {
offset = uid + 1;
}
let Some(msg) = self.parse_update_message(update) else {
self.handle_unauthorized_message(update).await;
continue;
};
// Send "typing" indicator immediately when we receive a message
let typing_body = serde_json::json!({
"chat_id": &msg.reply_target,
"action": "typing"
});
let _ = self
.client
.post(self.api_url("sendChatAction"))
.json(&typing_body)
.send()
.await; // Ignore errors for typing indicator
if tx.send(msg).await.is_err() {
return Ok(());
}
}
}
}
}
/// Checks that the bot API answers a `getMe` request within five seconds.
///
/// Returns `true` only for a successful HTTP status. A request error or a timeout
/// is logged at debug level and reported as `false`.
async fn health_check(&self) -> bool {
    let probe = self.client.get(self.api_url("getMe")).send();
    let response = match tokio::time::timeout(Duration::from_secs(5), probe).await {
        Err(_) => {
            tracing::debug!("Telegram health check timed out after 5s");
            return false;
        }
        Ok(Err(e)) => {
            tracing::debug!("Telegram health check failed: {e}");
            return false;
        }
        Ok(Ok(resp)) => resp,
    };
    response.status().is_success()
}
async fn start_typing(&self, recipient: &str) -> anyhow::Result<()> {
self.stop_typing(recipient).await?;
let client = self.client.clone();
let url = self.api_url("sendChatAction");
let chat_id = recipient.to_string();
let handle = tokio::spawn(async move {
loop {
let body = serde_json::json!({
"chat_id": &chat_id,
"action": "typing"
});
let _ = client.post(&url).json(&body).send().await;
// Telegram typing indicator expires after 5s; refresh at 4s
tokio::time::sleep(Duration::from_secs(4)).await;
}
});
let mut guard = self.typing_handle.lock();
*guard = Some(handle);
Ok(())
}
/// Aborts the background typing task, if one is running, and clears the handle slot.
///
/// `_recipient` is unused: the channel stores a single typing handle.
async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
    let running = self.typing_handle.lock().take();
    if let Some(task) = running {
        task.abort();
    }
    Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
/// The channel reports `"telegram"` as its name.
#[test]
fn telegram_channel_name() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    assert_eq!(channel.name(), "telegram");
}
/// A freshly built channel has no typing task stored.
#[test]
fn typing_handle_starts_as_none() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    assert!(channel.typing_handle.lock().is_none());
}
/// `stop_typing` empties the handle slot when a task is stored.
#[tokio::test]
async fn stop_typing_clears_handle() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);

    // Seed the slot with a long-sleeping placeholder task.
    let placeholder = tokio::spawn(async {
        tokio::time::sleep(Duration::from_secs(60)).await;
    });
    *channel.typing_handle.lock() = Some(placeholder);

    // stop_typing should abort and clear
    channel.stop_typing("123").await.unwrap();
    assert!(channel.typing_handle.lock().is_none());
}
/// `start_typing` leaves a task in the handle slot even when one was already stored.
#[tokio::test]
async fn start_typing_replaces_previous_handle() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);

    // Seed the slot with a long-sleeping placeholder task.
    let placeholder = tokio::spawn(async {
        tokio::time::sleep(Duration::from_secs(60)).await;
    });
    *channel.typing_handle.lock() = Some(placeholder);

    // start_typing should abort the old handle and set a new one
    let _ = channel.start_typing("123").await;
    assert!(channel.typing_handle.lock().is_some());
}
/// Draft updates are off by default and on once partial streaming is configured.
#[test]
fn supports_draft_updates_respects_stream_mode() {
    let default_channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    assert!(!default_channel.supports_draft_updates());

    let streaming_channel = TelegramChannel::new("fake-token".into(), vec!["*".into()])
        .with_streaming(StreamMode::Partial, 750);
    assert!(streaming_channel.supports_draft_updates());
    assert_eq!(streaming_channel.draft_update_interval_ms, 750);
}
/// With streaming off (the default), `send_draft` yields no message id.
#[tokio::test]
async fn send_draft_returns_none_when_stream_mode_off() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    let draft = SendMessage::new("draft", "123");
    let draft_id = channel.send_draft(&draft).await.unwrap();
    assert!(draft_id.is_none());
}
/// An edit requested inside the rate-limit window returns `Ok` immediately.
#[tokio::test]
async fn update_draft_rate_limit_short_circuits_network() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()])
        .with_streaming(StreamMode::Partial, 60_000);

    // Pretend chat "123" was edited just now, well inside the 60 s interval.
    let just_now = std::time::Instant::now();
    channel
        .last_draft_edit
        .lock()
        .insert("123".to_string(), just_now);

    let outcome = channel.update_draft("123", "42", "delta text").await;
    assert!(outcome.is_ok());
}
/// Truncating over-long multi-byte text must not panic on a UTF-8 boundary.
#[tokio::test]
async fn update_draft_utf8_truncation_is_safe_for_multibyte_text() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()])
        .with_streaming(StreamMode::Partial, 0);
    let oversized_emoji = "😀".repeat(TELEGRAM_MAX_MESSAGE_LENGTH + 20);

    // The invalid message_id makes update_draft return early, but only after the
    // display text has been built, so the truncation code still runs.
    let outcome = channel
        .update_draft("123", "not-a-number", &oversized_emoji)
        .await;
    assert!(outcome.is_ok());
}
/// Oversized text plus an unparsable draft id must reach the chunked-send fallback.
///
/// The fallback is expected to fail here (fake token), so an `Err` shows that
/// `finalize_draft` did not return early.
#[tokio::test]
async fn finalize_draft_invalid_message_id_falls_back_to_chunk_send() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()])
        .with_streaming(StreamMode::Partial, 0);
    let oversized = "a".repeat(TELEGRAM_MAX_MESSAGE_LENGTH + 64);

    let outcome = channel
        .finalize_draft("123", "not-a-number", &oversized)
        .await;
    assert!(outcome.is_err());
}
/// `api_url` embeds the bot token and method name in the Telegram base URL.
#[test]
fn telegram_api_url() {
    let channel = TelegramChannel::new("123:ABC".into(), vec![]);
    let url = channel.api_url("getMe");
    assert_eq!(url, "https://api.telegram.org/bot123:ABC/getMe");
}
/// A `*` allowlist entry admits any user.
#[test]
fn telegram_user_allowed_wildcard() {
    let channel = TelegramChannel::new("t".into(), vec!["*".into()]);
    assert!(channel.is_user_allowed("anyone"));
}
/// Only listed users are admitted when the allowlist names specific users.
#[test]
fn telegram_user_allowed_specific() {
    let allowlist = vec!["alice".into(), "bob".into()];
    let channel = TelegramChannel::new("t".into(), allowlist);
    assert!(channel.is_user_allowed("alice"));
    assert!(!channel.is_user_allowed("eve"));
}
/// An `@`-prefixed allowlist entry matches the bare username.
#[test]
fn telegram_user_allowed_with_at_prefix_in_config() {
    let channel = TelegramChannel::new("t".into(), vec!["@alice".into()]);
    assert!(channel.is_user_allowed("alice"));
}
/// An empty allowlist admits nobody.
#[test]
fn telegram_user_denied_empty() {
    let channel = TelegramChannel::new("t".into(), vec![]);
    assert!(!channel.is_user_allowed("anyone"));
}
/// Allowlist matching is exact: prefixes, suffixes and superstrings are rejected.
#[test]
fn telegram_user_exact_match_not_substring() {
    let channel = TelegramChannel::new("t".into(), vec!["alice".into()]);
    for near_miss in ["alice_bot", "alic", "malice"] {
        assert!(!channel.is_user_allowed(near_miss));
    }
}
/// The empty username is never admitted.
#[test]
fn telegram_user_empty_string_denied() {
    let channel = TelegramChannel::new("t".into(), vec!["alice".into()]);
    assert!(!channel.is_user_allowed(""));
}
/// Allowlist matching is case-sensitive.
#[test]
fn telegram_user_case_sensitive() {
    let channel = TelegramChannel::new("t".into(), vec!["Alice".into()]);
    assert!(channel.is_user_allowed("Alice"));
    for wrong_case in ["alice", "ALICE"] {
        assert!(!channel.is_user_allowed(wrong_case));
    }
}
/// A wildcard entry admits everyone even when specific users are also listed.
#[test]
fn telegram_wildcard_with_specific_users() {
    let allowlist = vec!["alice".into(), "*".into()];
    let channel = TelegramChannel::new("t".into(), allowlist);
    for user in ["alice", "bob", "anyone"] {
        assert!(channel.is_user_allowed(user));
    }
}
/// A sender is admitted when any of their identities (here the numeric id) is listed.
#[test]
fn telegram_user_allowed_by_numeric_id_identity() {
    let channel = TelegramChannel::new("t".into(), vec!["123456789".into()]);
    let identities = ["unknown", "123456789"];
    assert!(channel.is_any_user_allowed(identities));
}
/// A sender is rejected when none of their identities is listed.
#[test]
fn telegram_user_denied_when_none_of_identities_match() {
    let allowlist = vec!["alice".into(), "987654321".into()];
    let channel = TelegramChannel::new("t".into(), allowlist);
    let identities = ["unknown", "123456789"];
    assert!(!channel.is_any_user_allowed(identities));
}
/// An empty allowlist leaves a pairing code active.
#[test]
fn telegram_pairing_enabled_with_empty_allowlist() {
    let channel = TelegramChannel::new("t".into(), vec![]);
    assert!(channel.pairing_code_active());
}
/// A non-empty allowlist means no pairing code is active.
#[test]
fn telegram_pairing_disabled_with_nonempty_allowlist() {
    let channel = TelegramChannel::new("t".into(), vec!["alice".into()]);
    assert!(!channel.pairing_code_active());
}
/// `/bind <code>` yields the code.
#[test]
fn telegram_extract_bind_code_plain_command() {
    let code = TelegramChannel::extract_bind_code("/bind 123456");
    assert_eq!(code, Some("123456"));
}
/// `/bind@<bot> <code>` yields the code as well.
#[test]
fn telegram_extract_bind_code_supports_bot_mention() {
    let code = TelegramChannel::extract_bind_code("/bind@zeroclaw_bot 654321");
    assert_eq!(code, Some("654321"));
}
/// A bare `/bind` and unrelated commands yield no code.
#[test]
fn telegram_extract_bind_code_rejects_invalid_forms() {
    for rejected in ["/bind", "/start"] {
        assert_eq!(TelegramChannel::extract_bind_code(rejected), None);
    }
}
/// Known markers are removed from the text and returned in order with kind and target.
#[test]
fn parse_attachment_markers_extracts_multiple_types() {
    let (cleaned, attachments) = parse_attachment_markers(
        "Here are files [IMAGE:/tmp/a.png] and [DOCUMENT:https://example.com/a.pdf]",
    );
    assert_eq!(cleaned, "Here are files and");
    assert_eq!(attachments.len(), 2);

    let image = &attachments[0];
    assert_eq!(image.kind, TelegramAttachmentKind::Image);
    assert_eq!(image.target, "/tmp/a.png");

    let document = &attachments[1];
    assert_eq!(document.kind, TelegramAttachmentKind::Document);
    assert_eq!(document.target, "https://example.com/a.pdf");
}
/// A marker with an unknown kind stays in the text and produces no attachment.
#[test]
fn parse_attachment_markers_keeps_invalid_markers_in_text() {
    let original = "Report [UNKNOWN:/tmp/a.bin]";
    let (cleaned, attachments) = parse_attachment_markers(original);
    assert_eq!(cleaned, "Report [UNKNOWN:/tmp/a.bin]");
    assert!(attachments.is_empty());
}
/// Content that is just the path of an existing `.png` file is detected as an image.
#[test]
fn parse_path_only_attachment_detects_existing_file() {
    let scratch = tempfile::tempdir().unwrap();
    let image_path = scratch.path().join("snap.png");
    std::fs::write(&image_path, b"fake-png").unwrap();

    let path_text = image_path.to_string_lossy();
    let parsed = parse_path_only_attachment(path_text.as_ref()).expect("expected attachment");
    assert_eq!(parsed.kind, TelegramAttachmentKind::Image);
    assert_eq!(parsed.target, path_text);
}
/// A sentence that merely mentions a path is not treated as an attachment.
#[test]
fn parse_path_only_attachment_rejects_sentence_text() {
    let parsed = parse_path_only_attachment("Screenshot saved to /tmp/snap.png");
    assert!(parsed.is_none());
}
/// A `.pdf` URL is classified as a document even with a query string attached.
#[test]
fn infer_attachment_kind_from_target_detects_document_extension() {
    let kind = infer_attachment_kind_from_target("https://example.com/files/specs.pdf?download=1");
    assert_eq!(kind, Some(TelegramAttachmentKind::Document));
}
/// Without a thread id the reply target is the chat id, and the message id is
/// built as `telegram_<chat_id>_<message_id>`.
#[test]
fn parse_update_message_uses_chat_id_as_reply_target() {
    let channel = TelegramChannel::new("token".into(), vec!["*".into()]);
    let payload = serde_json::json!({
        "update_id": 1,
        "message": {
            "message_id": 33,
            "text": "hello",
            "from": {
                "id": 555,
                "username": "alice"
            },
            "chat": {
                "id": -100_200_300
            }
        }
    });

    let parsed = channel
        .parse_update_message(&payload)
        .expect("message should parse");
    assert_eq!(parsed.sender, "alice");
    assert_eq!(parsed.reply_target, "-100200300");
    assert_eq!(parsed.content, "hello");
    assert_eq!(parsed.id, "telegram_-100200300_33");
}
/// A sender with no username is admitted through their numeric id, which also
/// becomes the sender name.
#[test]
fn parse_update_message_allows_numeric_id_without_username() {
    let channel = TelegramChannel::new("token".into(), vec!["555".into()]);
    let payload = serde_json::json!({
        "update_id": 2,
        "message": {
            "message_id": 9,
            "text": "ping",
            "from": {
                "id": 555
            },
            "chat": {
                "id": 12345
            }
        }
    });

    let parsed = channel
        .parse_update_message(&payload)
        .expect("numeric allowlist should pass");
    assert_eq!(parsed.sender, "555");
    assert_eq!(parsed.reply_target, "12345");
}
/// A forum-topic message yields a `chat_id:thread_id` reply target, while the
/// message id still uses only the chat id.
#[test]
fn parse_update_message_extracts_thread_id_for_forum_topic() {
    let channel = TelegramChannel::new("token".into(), vec!["*".into()]);
    let payload = serde_json::json!({
        "update_id": 3,
        "message": {
            "message_id": 42,
            "text": "hello from topic",
            "from": {
                "id": 555,
                "username": "alice"
            },
            "chat": {
                "id": -100_200_300
            },
            "message_thread_id": 789
        }
    });

    let parsed = channel
        .parse_update_message(&payload)
        .expect("message with thread_id should parse");
    assert_eq!(parsed.sender, "alice");
    assert_eq!(parsed.reply_target, "-100200300:789");
    assert_eq!(parsed.content, "hello from topic");
    assert_eq!(parsed.id, "telegram_-100200300_42");
}
// ── File sending API URL tests ──────────────────────────────────
/// `api_url` builds the `sendDocument` endpoint.
#[test]
fn telegram_api_url_send_document() {
    let channel = TelegramChannel::new("123:ABC".into(), vec![]);
    let url = channel.api_url("sendDocument");
    assert_eq!(url, "https://api.telegram.org/bot123:ABC/sendDocument");
}
/// `api_url` builds the `sendPhoto` endpoint.
#[test]
fn telegram_api_url_send_photo() {
    let channel = TelegramChannel::new("123:ABC".into(), vec![]);
    let url = channel.api_url("sendPhoto");
    assert_eq!(url, "https://api.telegram.org/bot123:ABC/sendPhoto");
}
/// `api_url` builds the `sendVideo` endpoint.
#[test]
fn telegram_api_url_send_video() {
    let channel = TelegramChannel::new("123:ABC".into(), vec![]);
    let url = channel.api_url("sendVideo");
    assert_eq!(url, "https://api.telegram.org/bot123:ABC/sendVideo");
}
/// `api_url` builds the `sendAudio` endpoint.
#[test]
fn telegram_api_url_send_audio() {
    let channel = TelegramChannel::new("123:ABC".into(), vec![]);
    let url = channel.api_url("sendAudio");
    assert_eq!(url, "https://api.telegram.org/bot123:ABC/sendAudio");
}
/// `api_url` builds the `sendVoice` endpoint.
#[test]
fn telegram_api_url_send_voice() {
    let channel = TelegramChannel::new("123:ABC".into(), vec![]);
    let url = channel.api_url("sendVoice");
    assert_eq!(url, "https://api.telegram.org/bot123:ABC/sendVoice");
}
// ── File sending smoke tests (no mock server; each call is expected to return an error) ──
/// Sending document bytes with a fake token returns an error rather than panicking.
///
/// No server is mocked, so the call is expected to fail; the test only checks
/// that the failure is reported as an `Err` with a plausible message.
#[tokio::test]
async fn telegram_send_document_bytes_builds_correct_form() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    let payload = b"Hello, this is a test file content".to_vec();

    let outcome = channel
        .send_document_bytes("123456", None, payload, "test.txt", Some("Test caption"))
        .await;

    // Should fail with network error, not a panic or type error
    assert!(outcome.is_err());
    let err = outcome.unwrap_err().to_string();
    let looks_like_request_failure = ["error", "failed", "connect"]
        .iter()
        .any(|needle| err.contains(needle));
    assert!(
        looks_like_request_failure,
        "Expected network error, got: {err}"
    );
}
/// Sending photo bytes with a fake token returns an error.
#[tokio::test]
async fn telegram_send_photo_bytes_builds_correct_form() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    // The eight-byte PNG signature.
    let png_signature = vec![0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A];
    let outcome = channel
        .send_photo_bytes("123456", None, png_signature, "test.png", None)
        .await;
    assert!(outcome.is_err());
}
/// Sending a document by URL with a fake token returns an error.
#[tokio::test]
async fn telegram_send_document_by_url_builds_correct_json() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    let document_url = "https://example.com/file.pdf";
    let outcome = channel
        .send_document_by_url("123456", None, document_url, Some("PDF doc"))
        .await;
    assert!(outcome.is_err());
}
/// Sending a photo by URL with a fake token returns an error.
#[tokio::test]
async fn telegram_send_photo_by_url_builds_correct_json() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    let photo_url = "https://example.com/image.jpg";
    let outcome = channel
        .send_photo_by_url("123456", None, photo_url, None)
        .await;
    assert!(outcome.is_err());
}
// ── File path handling tests ────────────────────────────────────
/// Sending a document from a missing path fails with a file-not-found style error.
#[tokio::test]
async fn telegram_send_document_nonexistent_file() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    let missing = Path::new("/nonexistent/path/to/file.txt");

    let outcome = channel.send_document("123456", None, missing, None).await;
    assert!(outcome.is_err());

    let err = outcome.unwrap_err().to_string();
    let mentions_missing_file = ["No such file", "not found", "os error"]
        .iter()
        .any(|needle| err.contains(needle));
    assert!(
        mentions_missing_file,
        "Expected file not found error, got: {err}"
    );
}
/// Sending a photo from a missing path returns an error.
#[tokio::test]
async fn telegram_send_photo_nonexistent_file() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    let missing = Path::new("/nonexistent/path/to/photo.jpg");
    let outcome = channel.send_photo("123456", None, missing, None).await;
    assert!(outcome.is_err());
}
/// Sending a video from a missing path returns an error.
#[tokio::test]
async fn telegram_send_video_nonexistent_file() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    let missing = Path::new("/nonexistent/path/to/video.mp4");
    let outcome = channel.send_video("123456", None, missing, None).await;
    assert!(outcome.is_err());
}
/// Sending audio from a missing path returns an error.
#[tokio::test]
async fn telegram_send_audio_nonexistent_file() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    let missing = Path::new("/nonexistent/path/to/audio.mp3");
    let outcome = channel.send_audio("123456", None, missing, None).await;
    assert!(outcome.is_err());
}
/// Sending a voice note from a missing path returns an error.
#[tokio::test]
async fn telegram_send_voice_nonexistent_file() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    let missing = Path::new("/nonexistent/path/to/voice.ogg");
    let outcome = channel.send_voice("123456", None, missing, None).await;
    assert!(outcome.is_err());
}
// ── Message splitting tests ─────────────────────────────────────
/// A short message comes back as a single, unchanged chunk.
#[test]
fn telegram_split_short_message() {
    let short = "Hello, world!";
    let parts = split_message_for_telegram(short);
    assert_eq!(parts.len(), 1);
    assert_eq!(parts[0], short);
}
/// A message of exactly the maximum length is not split.
#[test]
fn telegram_split_exact_limit() {
    let at_limit = "a".repeat(TELEGRAM_MAX_MESSAGE_LENGTH);
    let parts = split_message_for_telegram(&at_limit);
    assert_eq!(parts.len(), 1);
    assert_eq!(parts[0].len(), TELEGRAM_MAX_MESSAGE_LENGTH);
}
/// A message 100 bytes over the limit becomes two chunks, each within the limit.
#[test]
fn telegram_split_over_limit() {
    let over_limit = "a".repeat(TELEGRAM_MAX_MESSAGE_LENGTH + 100);
    let parts = split_message_for_telegram(&over_limit);
    assert_eq!(parts.len(), 2);
    for index in 0..2 {
        assert!(parts[index].len() <= TELEGRAM_MAX_MESSAGE_LENGTH);
    }
}
/// Space-separated text over the limit is split into at least two chunks, and
/// every chunk except the last stays within the limit.
///
/// Only the length bound is asserted; where exactly the split falls is not checked.
#[test]
fn telegram_split_at_word_boundary() {
    let words = "word ".repeat(TELEGRAM_MAX_MESSAGE_LENGTH / 5);
    let msg = format!("{} more text here", words);
    let parts = split_message_for_telegram(&msg);
    assert!(parts.len() >= 2);
    for part in parts.iter().take(parts.len() - 1) {
        assert!(part.len() <= TELEGRAM_MAX_MESSAGE_LENGTH);
    }
}
/// Newline-separated text over the limit is split into chunks that all fit the limit.
#[test]
fn telegram_split_at_newline() {
    let line_count = TELEGRAM_MAX_MESSAGE_LENGTH / 13 + 1;
    let text_block = "Line of text\n".repeat(line_count);
    let parts = split_message_for_telegram(&text_block);
    assert!(parts.len() >= 2);
    for part in &parts {
        assert!(part.len() <= TELEGRAM_MAX_MESSAGE_LENGTH);
    }
}
/// Concatenating the chunks reproduces the original message exactly.
#[test]
fn telegram_split_preserves_content() {
    let original = "test ".repeat(TELEGRAM_MAX_MESSAGE_LENGTH / 5 + 100);
    let parts = split_message_for_telegram(&original);
    assert_eq!(parts.concat(), original);
}
/// An empty message yields one empty chunk.
#[test]
fn telegram_split_empty_message() {
    let parts = split_message_for_telegram("");
    assert_eq!(parts.len(), 1);
    assert!(parts[0] == "");
}
/// A message three times the limit yields at least three chunks, all within the limit.
#[test]
fn telegram_split_very_long_message() {
    let huge = "x".repeat(TELEGRAM_MAX_MESSAGE_LENGTH * 3);
    let parts = split_message_for_telegram(&huge);
    assert!(parts.len() >= 3);
    for part in &parts {
        assert!(part.len() <= TELEGRAM_MAX_MESSAGE_LENGTH);
    }
}
// ── Caption handling tests ──────────────────────────────────────
/// Document uploads return an error both with and without a caption (fake token).
#[tokio::test]
async fn telegram_send_document_bytes_with_caption() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    let payload = b"test content".to_vec();

    // First with a caption, then without.
    for caption in [Some("My caption"), None] {
        let outcome = channel
            .send_document_bytes("123456", None, payload.clone(), "test.txt", caption)
            .await;
        assert!(outcome.is_err()); // Network error expected
    }
}
/// Photo uploads return an error both with and without a caption (fake token).
#[tokio::test]
async fn telegram_send_photo_bytes_with_caption() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    let payload = vec![0x89, 0x50, 0x4E, 0x47];

    // First with a caption, then without.
    for caption in [Some("Photo caption"), None] {
        let outcome = channel
            .send_photo_bytes("123456", None, payload.clone(), "test.png", caption)
            .await;
        assert!(outcome.is_err());
    }
}
// ── Empty/edge case tests ───────────────────────────────────────
/// Uploading zero bytes returns an error instead of panicking.
#[tokio::test]
async fn telegram_send_document_bytes_empty_file() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    let outcome = channel
        .send_document_bytes("123456", None, Vec::<u8>::new(), "empty.txt", None)
        .await;
    assert!(outcome.is_err());
}
/// Uploading with an empty file name returns an error instead of panicking.
#[tokio::test]
async fn telegram_send_document_bytes_empty_filename() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    let payload = b"content".to_vec();
    let outcome = channel
        .send_document_bytes("123456", None, payload, "", None)
        .await;
    assert!(outcome.is_err());
}
/// Uploading to an empty chat id returns an error instead of panicking.
#[tokio::test]
async fn telegram_send_document_bytes_empty_chat_id() {
    let channel = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
    let payload = b"content".to_vec();
    let outcome = channel
        .send_document_bytes("", None, payload, "test.txt", None)
        .await;
    assert!(outcome.is_err());
}
// ── Message ID edge cases ─────────────────────────────────────
/// Pins the `telegram_{chat_id}_{message_id}` id layout.
///
/// This exercises the format string only, not the channel code.
#[test]
fn telegram_message_id_format_includes_chat_and_message_id() {
    let (chat_id, message_id) = ("123456", 789);
    let built = format!("telegram_{chat_id}_{message_id}");
    assert_eq!(built, "telegram_123456_789");
}
/// The same chat id and message id always format to the same id.
#[test]
fn telegram_message_id_is_deterministic() {
    let build = |chat_id: &str, message_id: i32| format!("telegram_{chat_id}_{message_id}");
    let first = build("123456", 789);
    let second = build("123456", 789);
    assert_eq!(first, second);
}
/// Different message ids in the same chat format to different ids.
#[test]
fn telegram_message_id_different_message_different_id() {
    let chat_id = "123456";
    let ids = [
        format!("telegram_{chat_id}_789"),
        format!("telegram_{chat_id}_790"),
    ];
    assert_ne!(ids[0], ids[1]);
}
/// The same message id in different chats formats to different ids.
#[test]
fn telegram_message_id_different_chat_different_id() {
    let message_id = 789;
    let ids = [
        format!("telegram_123456_{message_id}"),
        format!("telegram_789012_{message_id}"),
    ];
    assert_ne!(ids[0], ids[1]);
}
/// The id consists solely of the fixed prefix, the chat id and the message id.
///
/// Group chats have negative ids, so a `-` can legitimately appear in an id
/// (e.g. `telegram_-100200300_33`). Instead of rejecting dashes, the check
/// verifies that the id decomposes into exactly the two integers it was built from,
/// which leaves no room for a random component.
#[test]
fn telegram_message_id_no_uuid_randomness() {
    let message_id = 789;
    for chat_id in ["123456", "-100200300"] {
        let id = format!("telegram_{chat_id}_{message_id}");

        let rest = id
            .strip_prefix("telegram_")
            .expect("id should start with the telegram_ prefix");
        // Split at the last underscore: the chat id may contain '-' but never '_'.
        let (chat_part, message_part) = rest
            .rsplit_once('_')
            .expect("id should contain a chat id and a message id");

        assert_eq!(chat_part, chat_id);
        assert!(chat_part.parse::<i64>().is_ok());
        assert_eq!(message_part.parse::<i64>().ok(), Some(789));
    }
}
/// A message id of zero (the fallback/missing case) still formats into a regular id.
#[test]
fn telegram_message_id_handles_zero_message_id() {
    let (chat_id, message_id) = ("123456", 0);
    let built = format!("telegram_{chat_id}_{message_id}");
    assert_eq!(built, "telegram_123456_0");
}
// ── Tool call tag stripping tests ───────────────────────────────────
/// A `<tool>…</tool>` span is removed together with its contents.
#[test]
fn strip_tool_call_tags_removes_standard_tags() {
    let stripped = strip_tool_call_tags(
        "Hello <tool>{\"name\":\"shell\",\"arguments\":{\"command\":\"ls\"}}</tool> world",
    );
    assert_eq!(stripped, "Hello world");
}
/// A `<toolcall>…</toolcall>` span is removed together with its contents.
#[test]
fn strip_tool_call_tags_removes_alias_tags() {
    let stripped = strip_tool_call_tags(
        "Hello <toolcall>{\"name\":\"shell\",\"arguments\":{\"command\":\"ls\"}}</toolcall> world",
    );
    assert_eq!(stripped, "Hello world");
}
/// A `<tool-call>…</tool-call>` span is removed together with its contents.
#[test]
fn strip_tool_call_tags_removes_dash_tags() {
    let stripped = strip_tool_call_tags(
        "Hello <tool-call>{\"name\":\"shell\",\"arguments\":{\"command\":\"ls\"}}</tool-call> world",
    );
    assert_eq!(stripped, "Hello world");
}
/// Several tag spans in one message are all removed.
#[test]
fn strip_tool_call_tags_handles_multiple_tags() {
    let stripped = strip_tool_call_tags("Start <tool>a</tool> middle <tool>b</tool> end");
    assert_eq!(stripped, "Start middle end");
}
/// All three tag spellings can appear in one message and are all removed.
#[test]
fn strip_tool_call_tags_handles_mixed_tags() {
    let stripped = strip_tool_call_tags(
        "A <tool>a</tool> B <toolcall>b</toolcall> C <tool-call>c</tool-call> D",
    );
    assert_eq!(stripped, "A B C D");
}
/// Text without tags passes through unchanged.
#[test]
fn strip_tool_call_tags_preserves_normal_text() {
    let stripped = strip_tool_call_tags("Hello world! This is a test.");
    assert_eq!(stripped, "Hello world! This is a test.");
}
/// An opening tag with no closing tag is left in place.
#[test]
fn strip_tool_call_tags_handles_unclosed_tags() {
    let stripped = strip_tool_call_tags("Hello <tool>world");
    assert_eq!(stripped, "Hello <tool>world");
}
/// Blank lines left behind by a removed span are collapsed to one blank line.
#[test]
fn strip_tool_call_tags_cleans_extra_newlines() {
    let stripped = strip_tool_call_tags("Hello\n\n<tool>\ntest\n</tool>\n\n\nworld");
    assert_eq!(stripped, "Hello\n\nworld");
}
/// Empty input yields empty output.
#[test]
fn strip_tool_call_tags_handles_empty_input() {
    let stripped = strip_tool_call_tags("");
    assert_eq!(stripped, "");
}
/// A message consisting solely of a tag span strips down to an empty string.
#[test]
fn strip_tool_call_tags_handles_only_tags() {
    let stripped = strip_tool_call_tags("<tool>{\"name\":\"test\"}</tool>");
    assert_eq!(stripped, "");
}
}