use super::traits::{Channel, ChannelMessage, SendMessage}; use crate::config::{Config, StreamMode}; use crate::security::pairing::PairingGuard; use anyhow::Context; use async_trait::async_trait; use directories::UserDirs; use parking_lot::Mutex; use reqwest::multipart::{Form, Part}; use std::fs; use std::path::Path; use std::sync::{Arc, RwLock}; use std::time::Duration; /// Telegram's maximum message length for text messages const TELEGRAM_MAX_MESSAGE_LENGTH: usize = 4096; const TELEGRAM_BIND_COMMAND: &str = "/bind"; /// Split a message into chunks that respect Telegram's 4096 character limit. /// Tries to split at word boundaries when possible, and handles continuation. fn split_message_for_telegram(message: &str) -> Vec { if message.len() <= TELEGRAM_MAX_MESSAGE_LENGTH { return vec![message.to_string()]; } let mut chunks = Vec::new(); let mut remaining = message; while !remaining.is_empty() { let chunk_end = if remaining.len() <= TELEGRAM_MAX_MESSAGE_LENGTH { remaining.len() } else { // Try to find a good break point (newline, then space) let search_area = &remaining[..TELEGRAM_MAX_MESSAGE_LENGTH]; // Prefer splitting at newline if let Some(pos) = search_area.rfind('\n') { // Don't split if the newline is too close to the start if pos >= TELEGRAM_MAX_MESSAGE_LENGTH / 2 { pos + 1 } else { // Try space as fallback search_area .rfind(' ') .unwrap_or(TELEGRAM_MAX_MESSAGE_LENGTH) + 1 } } else if let Some(pos) = search_area.rfind(' ') { pos + 1 } else { // Hard split at the limit TELEGRAM_MAX_MESSAGE_LENGTH } }; chunks.push(remaining[..chunk_end].to_string()); remaining = &remaining[chunk_end..]; } chunks } #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum TelegramAttachmentKind { Image, Document, Video, Audio, Voice, } #[derive(Debug, Clone, PartialEq, Eq)] struct TelegramAttachment { kind: TelegramAttachmentKind, target: String, } impl TelegramAttachmentKind { fn from_marker(marker: &str) -> Option { match marker.trim().to_ascii_uppercase().as_str() { "IMAGE" | "PHOTO" => Some(Self::Image), "DOCUMENT" | "FILE" => Some(Self::Document), "VIDEO" => Some(Self::Video), "AUDIO" => Some(Self::Audio), "VOICE" => Some(Self::Voice), _ => None, } } } fn is_http_url(target: &str) -> bool { target.starts_with("http://") || target.starts_with("https://") } fn infer_attachment_kind_from_target(target: &str) -> Option { let normalized = target .split('?') .next() .unwrap_or(target) .split('#') .next() .unwrap_or(target); let extension = Path::new(normalized) .extension() .and_then(|ext| ext.to_str())? .to_ascii_lowercase(); match extension.as_str() { "png" | "jpg" | "jpeg" | "gif" | "webp" | "bmp" => Some(TelegramAttachmentKind::Image), "mp4" | "mov" | "mkv" | "avi" | "webm" => Some(TelegramAttachmentKind::Video), "mp3" | "m4a" | "wav" | "flac" => Some(TelegramAttachmentKind::Audio), "ogg" | "oga" | "opus" => Some(TelegramAttachmentKind::Voice), "pdf" | "txt" | "md" | "csv" | "json" | "zip" | "tar" | "gz" | "doc" | "docx" | "xls" | "xlsx" | "ppt" | "pptx" => Some(TelegramAttachmentKind::Document), _ => None, } } fn parse_path_only_attachment(message: &str) -> Option { let trimmed = message.trim(); if trimmed.is_empty() || trimmed.contains('\n') { return None; } let candidate = trimmed.trim_matches(|c| matches!(c, '`' | '"' | '\'')); if candidate.chars().any(char::is_whitespace) { return None; } let candidate = candidate.strip_prefix("file://").unwrap_or(candidate); let kind = infer_attachment_kind_from_target(candidate)?; if !is_http_url(candidate) && !Path::new(candidate).exists() { return None; } Some(TelegramAttachment { kind, target: candidate.to_string(), }) } /// Strip tool_call XML-style tags from message text. /// These tags are used internally but must not be sent to Telegram as raw markup, /// since Telegram's Markdown parser will reject them (causing status 400 errors). fn strip_tool_call_tags(message: &str) -> String { const TOOL_CALL_OPEN_TAGS: [&str; 5] = [ "", "", "", "", "", ]; fn find_first_tag<'a>(haystack: &str, tags: &'a [&'a str]) -> Option<(usize, &'a str)> { tags.iter() .filter_map(|tag| haystack.find(tag).map(|idx| (idx, *tag))) .min_by_key(|(idx, _)| *idx) } fn matching_close_tag(open_tag: &str) -> Option<&'static str> { match open_tag { "" => Some(""), "" => Some(""), "" => Some(""), "" => Some(""), "" => Some(""), _ => None, } } fn extract_first_json_end(input: &str) -> Option { let trimmed = input.trim_start(); let trim_offset = input.len().saturating_sub(trimmed.len()); for (byte_idx, ch) in trimmed.char_indices() { if ch != '{' && ch != '[' { continue; } let slice = &trimmed[byte_idx..]; let mut stream = serde_json::Deserializer::from_str(slice).into_iter::(); if let Some(Ok(_value)) = stream.next() { let consumed = stream.byte_offset(); if consumed > 0 { return Some(trim_offset + byte_idx + consumed); } } } None } fn strip_leading_close_tags(mut input: &str) -> &str { loop { let trimmed = input.trim_start(); if !trimmed.starts_with("') else { return ""; }; input = &trimmed[close_end + 1..]; } } let mut kept_segments = Vec::new(); let mut remaining = message; while let Some((start, open_tag)) = find_first_tag(remaining, &TOOL_CALL_OPEN_TAGS) { let before = &remaining[..start]; if !before.is_empty() { kept_segments.push(before.to_string()); } let Some(close_tag) = matching_close_tag(open_tag) else { break; }; let after_open = &remaining[start + open_tag.len()..]; if let Some(close_idx) = after_open.find(close_tag) { remaining = &after_open[close_idx + close_tag.len()..]; continue; } if let Some(consumed_end) = extract_first_json_end(after_open) { remaining = strip_leading_close_tags(&after_open[consumed_end..]); continue; } kept_segments.push(remaining[start..].to_string()); remaining = ""; break; } if !remaining.is_empty() { kept_segments.push(remaining.to_string()); } let mut result = kept_segments.concat(); // Clean up any resulting blank lines (but preserve paragraphs) while result.contains("\n\n\n") { result = result.replace("\n\n\n", "\n\n"); } result.trim().to_string() } fn parse_attachment_markers(message: &str) -> (String, Vec) { let mut cleaned = String::with_capacity(message.len()); let mut attachments = Vec::new(); let mut cursor = 0; while cursor < message.len() { let Some(open_rel) = message[cursor..].find('[') else { cleaned.push_str(&message[cursor..]); break; }; let open = cursor + open_rel; cleaned.push_str(&message[cursor..open]); let Some(close_rel) = message[open..].find(']') else { cleaned.push_str(&message[open..]); break; }; let close = open + close_rel; let marker = &message[open + 1..close]; let parsed = marker.split_once(':').and_then(|(kind, target)| { let kind = TelegramAttachmentKind::from_marker(kind)?; let target = target.trim(); if target.is_empty() { return None; } Some(TelegramAttachment { kind, target: target.to_string(), }) }); if let Some(attachment) = parsed { attachments.push(attachment); } else { cleaned.push_str(&message[open..=close]); } cursor = close + 1; } (cleaned.trim().to_string(), attachments) } /// Telegram channel — long-polls the Bot API for updates pub struct TelegramChannel { bot_token: String, allowed_users: Arc>>, pairing: Option, client: reqwest::Client, typing_handle: Mutex>>, stream_mode: StreamMode, draft_update_interval_ms: u64, last_draft_edit: Mutex>, mention_only: bool, bot_username: Mutex>, } impl TelegramChannel { pub fn new(bot_token: String, allowed_users: Vec, mention_only: bool) -> Self { let normalized_allowed = Self::normalize_allowed_users(allowed_users); let pairing = if normalized_allowed.is_empty() { let guard = PairingGuard::new(true, &[]); if let Some(code) = guard.pairing_code() { println!(" 🔐 Telegram pairing required. One-time bind code: {code}"); println!(" Send `{TELEGRAM_BIND_COMMAND} ` from your Telegram account."); } Some(guard) } else { None }; Self { bot_token, allowed_users: Arc::new(RwLock::new(normalized_allowed)), pairing, client: reqwest::Client::new(), stream_mode: StreamMode::Off, draft_update_interval_ms: 1000, last_draft_edit: Mutex::new(std::collections::HashMap::new()), typing_handle: Mutex::new(None), mention_only, bot_username: Mutex::new(None), } } /// Configure streaming mode for progressive draft updates. pub fn with_streaming( mut self, stream_mode: StreamMode, draft_update_interval_ms: u64, ) -> Self { self.stream_mode = stream_mode; self.draft_update_interval_ms = draft_update_interval_ms; self } /// Parse reply_target into (chat_id, optional thread_id). fn parse_reply_target(reply_target: &str) -> (String, Option) { if let Some((chat_id, thread_id)) = reply_target.split_once(':') { (chat_id.to_string(), Some(thread_id.to_string())) } else { (reply_target.to_string(), None) } } fn normalize_identity(value: &str) -> String { value.trim().trim_start_matches('@').to_string() } fn normalize_allowed_users(allowed_users: Vec) -> Vec { allowed_users .into_iter() .map(|entry| Self::normalize_identity(&entry)) .filter(|entry| !entry.is_empty()) .collect() } fn load_config_without_env() -> anyhow::Result { let home = UserDirs::new() .map(|u| u.home_dir().to_path_buf()) .context("Could not find home directory")?; let zeroclaw_dir = home.join(".zeroclaw"); let config_path = zeroclaw_dir.join("config.toml"); let contents = fs::read_to_string(&config_path) .with_context(|| format!("Failed to read config file: {}", config_path.display()))?; let mut config: Config = toml::from_str(&contents) .context("Failed to parse config file for Telegram binding")?; config.config_path = config_path; config.workspace_dir = zeroclaw_dir.join("workspace"); Ok(config) } fn persist_allowed_identity_blocking(identity: &str) -> anyhow::Result<()> { let mut config = Self::load_config_without_env()?; let Some(telegram) = config.channels_config.telegram.as_mut() else { anyhow::bail!("Telegram channel config is missing in config.toml"); }; let normalized = Self::normalize_identity(identity); if normalized.is_empty() { anyhow::bail!("Cannot persist empty Telegram identity"); } if !telegram.allowed_users.iter().any(|u| u == &normalized) { telegram.allowed_users.push(normalized); config .save() .context("Failed to persist Telegram allowlist to config.toml")?; } Ok(()) } async fn persist_allowed_identity(&self, identity: &str) -> anyhow::Result<()> { let identity = identity.to_string(); tokio::task::spawn_blocking(move || Self::persist_allowed_identity_blocking(&identity)) .await .map_err(|e| anyhow::anyhow!("Failed to join Telegram bind save task: {e}"))??; Ok(()) } fn add_allowed_identity_runtime(&self, identity: &str) { let normalized = Self::normalize_identity(identity); if normalized.is_empty() { return; } if let Ok(mut users) = self.allowed_users.write() { if !users.iter().any(|u| u == &normalized) { users.push(normalized); } } } fn extract_bind_code(text: &str) -> Option<&str> { let mut parts = text.split_whitespace(); let command = parts.next()?; let base_command = command.split('@').next().unwrap_or(command); if base_command != TELEGRAM_BIND_COMMAND { return None; } parts.next().map(str::trim).filter(|code| !code.is_empty()) } fn pairing_code_active(&self) -> bool { self.pairing .as_ref() .and_then(PairingGuard::pairing_code) .is_some() } fn api_url(&self, method: &str) -> String { format!("https://api.telegram.org/bot{}/{method}", self.bot_token) } async fn fetch_bot_username(&self) -> anyhow::Result { let resp = self.client.get(self.api_url("getMe")).send().await?; if !resp.status().is_success() { anyhow::bail!("Failed to fetch bot info: {}", resp.status()); } let data: serde_json::Value = resp.json().await?; let username = data .get("result") .and_then(|r| r.get("username")) .and_then(|u| u.as_str()) .context("Bot username not found in response")?; Ok(username.to_string()) } async fn get_bot_username(&self) -> Option { { let cache = self.bot_username.lock(); if let Some(ref username) = *cache { return Some(username.clone()); } } match self.fetch_bot_username().await { Ok(username) => { let mut cache = self.bot_username.lock(); *cache = Some(username.clone()); Some(username) } Err(e) => { tracing::warn!("Failed to fetch bot username: {e}"); None } } } fn is_telegram_username_char(ch: char) -> bool { ch.is_ascii_alphanumeric() || ch == '_' } fn find_bot_mention_spans(text: &str, bot_username: &str) -> Vec<(usize, usize)> { let bot_username = bot_username.trim_start_matches('@'); if bot_username.is_empty() { return Vec::new(); } let mut spans = Vec::new(); for (at_idx, ch) in text.char_indices() { if ch != '@' { continue; } if at_idx > 0 { let prev = text[..at_idx].chars().next_back().unwrap_or(' '); if Self::is_telegram_username_char(prev) { continue; } } let username_start = at_idx + 1; let mut username_end = username_start; for (rel_idx, candidate_ch) in text[username_start..].char_indices() { if Self::is_telegram_username_char(candidate_ch) { username_end = username_start + rel_idx + candidate_ch.len_utf8(); } else { break; } } if username_end == username_start { continue; } let mention_username = &text[username_start..username_end]; if mention_username.eq_ignore_ascii_case(bot_username) { spans.push((at_idx, username_end)); } } spans } fn contains_bot_mention(text: &str, bot_username: &str) -> bool { !Self::find_bot_mention_spans(text, bot_username).is_empty() } fn normalize_incoming_content(text: &str, bot_username: &str) -> Option { let spans = Self::find_bot_mention_spans(text, bot_username); if spans.is_empty() { let normalized = text.split_whitespace().collect::>().join(" "); return (!normalized.is_empty()).then_some(normalized); } let mut normalized = String::with_capacity(text.len()); let mut cursor = 0; for (start, end) in spans { normalized.push_str(&text[cursor..start]); cursor = end; } normalized.push_str(&text[cursor..]); let normalized = normalized.split_whitespace().collect::>().join(" "); (!normalized.is_empty()).then_some(normalized) } fn is_group_message(message: &serde_json::Value) -> bool { message .get("chat") .and_then(|c| c.get("type")) .and_then(|t| t.as_str()) .map(|t| t == "group" || t == "supergroup") .unwrap_or(false) } fn is_user_allowed(&self, username: &str) -> bool { let identity = Self::normalize_identity(username); self.allowed_users .read() .map(|users| users.iter().any(|u| u == "*" || u == &identity)) .unwrap_or(false) } fn is_any_user_allowed<'a, I>(&self, identities: I) -> bool where I: IntoIterator, { identities.into_iter().any(|id| self.is_user_allowed(id)) } async fn handle_unauthorized_message(&self, update: &serde_json::Value) { let Some(message) = update.get("message") else { return; }; let Some(text) = message.get("text").and_then(serde_json::Value::as_str) else { return; }; let username_opt = message .get("from") .and_then(|from| from.get("username")) .and_then(serde_json::Value::as_str); let username = username_opt.unwrap_or("unknown"); let normalized_username = Self::normalize_identity(username); let user_id = message .get("from") .and_then(|from| from.get("id")) .and_then(serde_json::Value::as_i64); let user_id_str = user_id.map(|id| id.to_string()); let normalized_user_id = user_id_str.as_deref().map(Self::normalize_identity); let chat_id = message .get("chat") .and_then(|chat| chat.get("id")) .and_then(serde_json::Value::as_i64) .map(|id| id.to_string()); let Some(chat_id) = chat_id else { tracing::warn!("Telegram: missing chat_id in message, skipping"); return; }; let mut identities = vec![normalized_username.as_str()]; if let Some(ref id) = normalized_user_id { identities.push(id.as_str()); } if self.is_any_user_allowed(identities.iter().copied()) { return; } if let Some(code) = Self::extract_bind_code(text) { if let Some(pairing) = self.pairing.as_ref() { match pairing.try_pair(code) { Ok(Some(_token)) => { let bind_identity = normalized_user_id.clone().or_else(|| { if normalized_username.is_empty() || normalized_username == "unknown" { None } else { Some(normalized_username.clone()) } }); if let Some(identity) = bind_identity { self.add_allowed_identity_runtime(&identity); match self.persist_allowed_identity(&identity).await { Ok(()) => { let _ = self .send(&SendMessage::new( "✅ Telegram account bound successfully. You can talk to ZeroClaw now.", &chat_id, )) .await; tracing::info!( "Telegram: paired and allowlisted identity={identity}" ); } Err(e) => { tracing::error!( "Telegram: failed to persist allowlist after bind: {e}" ); let _ = self .send(&SendMessage::new( "âš ī¸ Bound for this runtime, but failed to persist config. Access may be lost after restart; check config file permissions.", &chat_id, )) .await; } } } else { let _ = self .send(&SendMessage::new( "❌ Could not identify your Telegram account. Ensure your account has a username or stable user ID, then retry.", &chat_id, )) .await; } } Ok(None) => { let _ = self .send(&SendMessage::new( "❌ Invalid binding code. Ask operator for the latest code and retry.", &chat_id, )) .await; } Err(lockout_secs) => { let _ = self .send(&SendMessage::new( format!("âŗ Too many invalid attempts. Retry in {lockout_secs}s."), &chat_id, )) .await; } } } else { let _ = self .send(&SendMessage::new( "â„šī¸ Telegram pairing is not active. Ask operator to update allowlist in config.toml.", &chat_id, )) .await; } return; } tracing::warn!( "Telegram: ignoring message from unauthorized user: username={username}, user_id={}. \ Allowlist Telegram username (without '@') or numeric user ID.", user_id_str.as_deref().unwrap_or("unknown") ); let suggested_identity = normalized_user_id .clone() .or_else(|| { if normalized_username.is_empty() || normalized_username == "unknown" { None } else { Some(normalized_username.clone()) } }) .unwrap_or_else(|| "YOUR_TELEGRAM_ID".to_string()); let _ = self .send(&SendMessage::new( format!( "🔐 This bot requires operator approval.\n\nCopy this command to operator terminal:\n`zeroclaw channel bind-telegram {suggested_identity}`\n\nAfter operator runs it, send your message again." ), &chat_id, )) .await; if self.pairing_code_active() { let _ = self .send(&SendMessage::new( "â„šī¸ If operator provides a one-time pairing code, you can also run `/bind `.", &chat_id, )) .await; } } fn parse_update_message(&self, update: &serde_json::Value) -> Option { let message = update.get("message")?; let text = message.get("text").and_then(serde_json::Value::as_str)?; let username = message .get("from") .and_then(|from| from.get("username")) .and_then(serde_json::Value::as_str) .unwrap_or("unknown") .to_string(); let user_id = message .get("from") .and_then(|from| from.get("id")) .and_then(serde_json::Value::as_i64) .map(|id| id.to_string()); let sender_identity = if username == "unknown" { user_id.clone().unwrap_or_else(|| "unknown".to_string()) } else { username.clone() }; let mut identities = vec![username.as_str()]; if let Some(id) = user_id.as_deref() { identities.push(id); } if !self.is_any_user_allowed(identities.iter().copied()) { return None; } let is_group = Self::is_group_message(message); if self.mention_only && is_group { let bot_username = self.bot_username.lock(); if let Some(ref bot_username) = *bot_username { if !Self::contains_bot_mention(text, bot_username) { return None; } } else { return None; } } let chat_id = message .get("chat") .and_then(|chat| chat.get("id")) .and_then(serde_json::Value::as_i64) .map(|id| id.to_string())?; let message_id = message .get("message_id") .and_then(serde_json::Value::as_i64) .unwrap_or(0); // Extract thread/topic ID for forum support let thread_id = message .get("message_thread_id") .and_then(serde_json::Value::as_i64) .map(|id| id.to_string()); // reply_target: chat_id or chat_id:thread_id format let reply_target = if let Some(tid) = thread_id { format!("{}:{}", chat_id, tid) } else { chat_id.clone() }; let content = if self.mention_only && is_group { let bot_username = self.bot_username.lock(); let bot_username = bot_username.as_ref()?; Self::normalize_incoming_content(text, bot_username)? } else { text.to_string() }; Some(ChannelMessage { id: format!("telegram_{chat_id}_{message_id}"), sender: sender_identity, reply_target, content, channel: "telegram".to_string(), timestamp: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(), }) } async fn send_text_chunks( &self, message: &str, chat_id: &str, thread_id: Option<&str>, ) -> anyhow::Result<()> { let chunks = split_message_for_telegram(message); for (index, chunk) in chunks.iter().enumerate() { let text = if chunks.len() > 1 { if index == 0 { format!("{chunk}\n\n(continues...)") } else if index == chunks.len() - 1 { format!("(continued)\n\n{chunk}") } else { format!("(continued)\n\n{chunk}\n\n(continues...)") } } else { chunk.to_string() }; let mut markdown_body = serde_json::json!({ "chat_id": chat_id, "text": text, "parse_mode": "Markdown" }); // Add message_thread_id for forum topic support if let Some(tid) = thread_id { markdown_body["message_thread_id"] = serde_json::Value::String(tid.to_string()); } let markdown_resp = self .client .post(self.api_url("sendMessage")) .json(&markdown_body) .send() .await?; if markdown_resp.status().is_success() { if index < chunks.len() - 1 { tokio::time::sleep(Duration::from_millis(100)).await; } continue; } let markdown_status = markdown_resp.status(); let markdown_err = markdown_resp.text().await.unwrap_or_default(); tracing::warn!( status = ?markdown_status, "Telegram sendMessage with Markdown failed; retrying without parse_mode" ); let mut plain_body = serde_json::json!({ "chat_id": chat_id, "text": text, }); // Add message_thread_id for forum topic support if let Some(tid) = thread_id { plain_body["message_thread_id"] = serde_json::Value::String(tid.to_string()); } let plain_resp = self .client .post(self.api_url("sendMessage")) .json(&plain_body) .send() .await?; if !plain_resp.status().is_success() { let plain_status = plain_resp.status(); let plain_err = plain_resp.text().await.unwrap_or_default(); anyhow::bail!( "Telegram sendMessage failed (markdown {}: {}; plain {}: {})", markdown_status, markdown_err, plain_status, plain_err ); } if index < chunks.len() - 1 { tokio::time::sleep(Duration::from_millis(100)).await; } } Ok(()) } async fn send_media_by_url( &self, method: &str, media_field: &str, chat_id: &str, thread_id: Option<&str>, url: &str, caption: Option<&str>, ) -> anyhow::Result<()> { let mut body = serde_json::json!({ "chat_id": chat_id, }); body[media_field] = serde_json::Value::String(url.to_string()); if let Some(tid) = thread_id { body["message_thread_id"] = serde_json::Value::String(tid.to_string()); } if let Some(cap) = caption { body["caption"] = serde_json::Value::String(cap.to_string()); } let resp = self .client .post(self.api_url(method)) .json(&body) .send() .await?; if !resp.status().is_success() { let err = resp.text().await?; anyhow::bail!("Telegram {method} by URL failed: {err}"); } tracing::info!("Telegram {method} sent to {chat_id}: {url}"); Ok(()) } async fn send_attachment( &self, chat_id: &str, thread_id: Option<&str>, attachment: &TelegramAttachment, ) -> anyhow::Result<()> { let target = attachment.target.trim(); if is_http_url(target) { return match attachment.kind { TelegramAttachmentKind::Image => { self.send_photo_by_url(chat_id, thread_id, target, None) .await } TelegramAttachmentKind::Document => { self.send_document_by_url(chat_id, thread_id, target, None) .await } TelegramAttachmentKind::Video => { self.send_video_by_url(chat_id, thread_id, target, None) .await } TelegramAttachmentKind::Audio => { self.send_audio_by_url(chat_id, thread_id, target, None) .await } TelegramAttachmentKind::Voice => { self.send_voice_by_url(chat_id, thread_id, target, None) .await } }; } let path = Path::new(target); if !path.exists() { anyhow::bail!("Telegram attachment path not found: {target}"); } match attachment.kind { TelegramAttachmentKind::Image => self.send_photo(chat_id, thread_id, path, None).await, TelegramAttachmentKind::Document => { self.send_document(chat_id, thread_id, path, None).await } TelegramAttachmentKind::Video => self.send_video(chat_id, thread_id, path, None).await, TelegramAttachmentKind::Audio => self.send_audio(chat_id, thread_id, path, None).await, TelegramAttachmentKind::Voice => self.send_voice(chat_id, thread_id, path, None).await, } } /// Send a document/file to a Telegram chat pub async fn send_document( &self, chat_id: &str, thread_id: Option<&str>, file_path: &Path, caption: Option<&str>, ) -> anyhow::Result<()> { let file_name = file_path .file_name() .and_then(|n| n.to_str()) .unwrap_or("file"); let file_bytes = tokio::fs::read(file_path).await?; let part = Part::bytes(file_bytes).file_name(file_name.to_string()); let mut form = Form::new() .text("chat_id", chat_id.to_string()) .part("document", part); if let Some(tid) = thread_id { form = form.text("message_thread_id", tid.to_string()); } if let Some(cap) = caption { form = form.text("caption", cap.to_string()); } let resp = self .client .post(self.api_url("sendDocument")) .multipart(form) .send() .await?; if !resp.status().is_success() { let err = resp.text().await?; anyhow::bail!("Telegram sendDocument failed: {err}"); } tracing::info!("Telegram document sent to {chat_id}: {file_name}"); Ok(()) } /// Send a document from bytes (in-memory) to a Telegram chat pub async fn send_document_bytes( &self, chat_id: &str, thread_id: Option<&str>, file_bytes: Vec, file_name: &str, caption: Option<&str>, ) -> anyhow::Result<()> { let part = Part::bytes(file_bytes).file_name(file_name.to_string()); let mut form = Form::new() .text("chat_id", chat_id.to_string()) .part("document", part); if let Some(tid) = thread_id { form = form.text("message_thread_id", tid.to_string()); } if let Some(cap) = caption { form = form.text("caption", cap.to_string()); } let resp = self .client .post(self.api_url("sendDocument")) .multipart(form) .send() .await?; if !resp.status().is_success() { let err = resp.text().await?; anyhow::bail!("Telegram sendDocument failed: {err}"); } tracing::info!("Telegram document sent to {chat_id}: {file_name}"); Ok(()) } /// Send a photo to a Telegram chat pub async fn send_photo( &self, chat_id: &str, thread_id: Option<&str>, file_path: &Path, caption: Option<&str>, ) -> anyhow::Result<()> { let file_name = file_path .file_name() .and_then(|n| n.to_str()) .unwrap_or("photo.jpg"); let file_bytes = tokio::fs::read(file_path).await?; let part = Part::bytes(file_bytes).file_name(file_name.to_string()); let mut form = Form::new() .text("chat_id", chat_id.to_string()) .part("photo", part); if let Some(tid) = thread_id { form = form.text("message_thread_id", tid.to_string()); } if let Some(cap) = caption { form = form.text("caption", cap.to_string()); } let resp = self .client .post(self.api_url("sendPhoto")) .multipart(form) .send() .await?; if !resp.status().is_success() { let err = resp.text().await?; anyhow::bail!("Telegram sendPhoto failed: {err}"); } tracing::info!("Telegram photo sent to {chat_id}: {file_name}"); Ok(()) } /// Send a photo from bytes (in-memory) to a Telegram chat pub async fn send_photo_bytes( &self, chat_id: &str, thread_id: Option<&str>, file_bytes: Vec, file_name: &str, caption: Option<&str>, ) -> anyhow::Result<()> { let part = Part::bytes(file_bytes).file_name(file_name.to_string()); let mut form = Form::new() .text("chat_id", chat_id.to_string()) .part("photo", part); if let Some(tid) = thread_id { form = form.text("message_thread_id", tid.to_string()); } if let Some(cap) = caption { form = form.text("caption", cap.to_string()); } let resp = self .client .post(self.api_url("sendPhoto")) .multipart(form) .send() .await?; if !resp.status().is_success() { let err = resp.text().await?; anyhow::bail!("Telegram sendPhoto failed: {err}"); } tracing::info!("Telegram photo sent to {chat_id}: {file_name}"); Ok(()) } /// Send a video to a Telegram chat pub async fn send_video( &self, chat_id: &str, thread_id: Option<&str>, file_path: &Path, caption: Option<&str>, ) -> anyhow::Result<()> { let file_name = file_path .file_name() .and_then(|n| n.to_str()) .unwrap_or("video.mp4"); let file_bytes = tokio::fs::read(file_path).await?; let part = Part::bytes(file_bytes).file_name(file_name.to_string()); let mut form = Form::new() .text("chat_id", chat_id.to_string()) .part("video", part); if let Some(tid) = thread_id { form = form.text("message_thread_id", tid.to_string()); } if let Some(cap) = caption { form = form.text("caption", cap.to_string()); } let resp = self .client .post(self.api_url("sendVideo")) .multipart(form) .send() .await?; if !resp.status().is_success() { let err = resp.text().await?; anyhow::bail!("Telegram sendVideo failed: {err}"); } tracing::info!("Telegram video sent to {chat_id}: {file_name}"); Ok(()) } /// Send an audio file to a Telegram chat pub async fn send_audio( &self, chat_id: &str, thread_id: Option<&str>, file_path: &Path, caption: Option<&str>, ) -> anyhow::Result<()> { let file_name = file_path .file_name() .and_then(|n| n.to_str()) .unwrap_or("audio.mp3"); let file_bytes = tokio::fs::read(file_path).await?; let part = Part::bytes(file_bytes).file_name(file_name.to_string()); let mut form = Form::new() .text("chat_id", chat_id.to_string()) .part("audio", part); if let Some(tid) = thread_id { form = form.text("message_thread_id", tid.to_string()); } if let Some(cap) = caption { form = form.text("caption", cap.to_string()); } let resp = self .client .post(self.api_url("sendAudio")) .multipart(form) .send() .await?; if !resp.status().is_success() { let err = resp.text().await?; anyhow::bail!("Telegram sendAudio failed: {err}"); } tracing::info!("Telegram audio sent to {chat_id}: {file_name}"); Ok(()) } /// Send a voice message to a Telegram chat pub async fn send_voice( &self, chat_id: &str, thread_id: Option<&str>, file_path: &Path, caption: Option<&str>, ) -> anyhow::Result<()> { let file_name = file_path .file_name() .and_then(|n| n.to_str()) .unwrap_or("voice.ogg"); let file_bytes = tokio::fs::read(file_path).await?; let part = Part::bytes(file_bytes).file_name(file_name.to_string()); let mut form = Form::new() .text("chat_id", chat_id.to_string()) .part("voice", part); if let Some(tid) = thread_id { form = form.text("message_thread_id", tid.to_string()); } if let Some(cap) = caption { form = form.text("caption", cap.to_string()); } let resp = self .client .post(self.api_url("sendVoice")) .multipart(form) .send() .await?; if !resp.status().is_success() { let err = resp.text().await?; anyhow::bail!("Telegram sendVoice failed: {err}"); } tracing::info!("Telegram voice sent to {chat_id}: {file_name}"); Ok(()) } /// Send a file by URL (Telegram will download it) pub async fn send_document_by_url( &self, chat_id: &str, thread_id: Option<&str>, url: &str, caption: Option<&str>, ) -> anyhow::Result<()> { let mut body = serde_json::json!({ "chat_id": chat_id, "document": url }); if let Some(tid) = thread_id { body["message_thread_id"] = serde_json::Value::String(tid.to_string()); } if let Some(cap) = caption { body["caption"] = serde_json::Value::String(cap.to_string()); } let resp = self .client .post(self.api_url("sendDocument")) .json(&body) .send() .await?; if !resp.status().is_success() { let err = resp.text().await?; anyhow::bail!("Telegram sendDocument by URL failed: {err}"); } tracing::info!("Telegram document (URL) sent to {chat_id}: {url}"); Ok(()) } /// Send a photo by URL (Telegram will download it) pub async fn send_photo_by_url( &self, chat_id: &str, thread_id: Option<&str>, url: &str, caption: Option<&str>, ) -> anyhow::Result<()> { let mut body = serde_json::json!({ "chat_id": chat_id, "photo": url }); if let Some(tid) = thread_id { body["message_thread_id"] = serde_json::Value::String(tid.to_string()); } if let Some(cap) = caption { body["caption"] = serde_json::Value::String(cap.to_string()); } let resp = self .client .post(self.api_url("sendPhoto")) .json(&body) .send() .await?; if !resp.status().is_success() { let err = resp.text().await?; anyhow::bail!("Telegram sendPhoto by URL failed: {err}"); } tracing::info!("Telegram photo (URL) sent to {chat_id}: {url}"); Ok(()) } /// Send a video by URL (Telegram will download it) pub async fn send_video_by_url( &self, chat_id: &str, thread_id: Option<&str>, url: &str, caption: Option<&str>, ) -> anyhow::Result<()> { self.send_media_by_url("sendVideo", "video", chat_id, thread_id, url, caption) .await } /// Send an audio file by URL (Telegram will download it) pub async fn send_audio_by_url( &self, chat_id: &str, thread_id: Option<&str>, url: &str, caption: Option<&str>, ) -> anyhow::Result<()> { self.send_media_by_url("sendAudio", "audio", chat_id, thread_id, url, caption) .await } /// Send a voice message by URL (Telegram will download it) pub async fn send_voice_by_url( &self, chat_id: &str, thread_id: Option<&str>, url: &str, caption: Option<&str>, ) -> anyhow::Result<()> { self.send_media_by_url("sendVoice", "voice", chat_id, thread_id, url, caption) .await } } #[async_trait] impl Channel for TelegramChannel { fn name(&self) -> &str { "telegram" } fn supports_draft_updates(&self) -> bool { self.stream_mode != StreamMode::Off } async fn send_draft(&self, message: &SendMessage) -> anyhow::Result> { if self.stream_mode == StreamMode::Off { return Ok(None); } let (chat_id, thread_id) = Self::parse_reply_target(&message.recipient); let initial_text = if message.content.is_empty() { "...".to_string() } else { message.content.clone() }; let mut body = serde_json::json!({ "chat_id": chat_id, "text": initial_text, }); if let Some(tid) = thread_id { body["message_thread_id"] = serde_json::Value::String(tid.to_string()); } let resp = self .client .post(self.api_url("sendMessage")) .json(&body) .send() .await?; if !resp.status().is_success() { let err = resp.text().await.unwrap_or_default(); anyhow::bail!("Telegram sendMessage (draft) failed: {err}"); } let resp_json: serde_json::Value = resp.json().await?; let message_id = resp_json .get("result") .and_then(|r| r.get("message_id")) .and_then(|id| id.as_i64()) .map(|id| id.to_string()); self.last_draft_edit .lock() .insert(chat_id.to_string(), std::time::Instant::now()); Ok(message_id) } async fn update_draft( &self, recipient: &str, message_id: &str, text: &str, ) -> anyhow::Result<()> { let (chat_id, _) = Self::parse_reply_target(recipient); // Rate-limit edits per chat { let last_edits = self.last_draft_edit.lock(); if let Some(last_time) = last_edits.get(&chat_id) { let elapsed = u64::try_from(last_time.elapsed().as_millis()).unwrap_or(u64::MAX); if elapsed < self.draft_update_interval_ms { return Ok(()); } } } // Truncate to Telegram limit for mid-stream edits (UTF-8 safe) let display_text = if text.len() > TELEGRAM_MAX_MESSAGE_LENGTH { let mut end = 0; for (idx, ch) in text.char_indices() { let next = idx + ch.len_utf8(); if next > TELEGRAM_MAX_MESSAGE_LENGTH { break; } end = next; } &text[..end] } else { text }; let message_id_parsed = match message_id.parse::() { Ok(id) => id, Err(e) => { tracing::warn!("Invalid Telegram message_id '{message_id}': {e}"); return Ok(()); } }; let body = serde_json::json!({ "chat_id": chat_id, "message_id": message_id_parsed, "text": display_text, }); let resp = self .client .post(self.api_url("editMessageText")) .json(&body) .send() .await?; if resp.status().is_success() { self.last_draft_edit .lock() .insert(chat_id.clone(), std::time::Instant::now()); } else { let status = resp.status(); let err = resp.text().await.unwrap_or_default(); tracing::debug!("Telegram editMessageText failed ({status}): {err}"); } Ok(()) } async fn finalize_draft( &self, recipient: &str, message_id: &str, text: &str, ) -> anyhow::Result<()> { let text = &strip_tool_call_tags(text); let (chat_id, thread_id) = Self::parse_reply_target(recipient); // Clean up rate-limit tracking for this chat self.last_draft_edit.lock().remove(&chat_id); // If text exceeds limit, delete draft and send as chunked messages if text.len() > TELEGRAM_MAX_MESSAGE_LENGTH { let msg_id = match message_id.parse::() { Ok(id) => id, Err(e) => { tracing::warn!("Invalid Telegram message_id '{message_id}': {e}"); return self .send_text_chunks(text, &chat_id, thread_id.as_deref()) .await; } }; // Delete the draft let _ = self .client .post(self.api_url("deleteMessage")) .json(&serde_json::json!({ "chat_id": chat_id, "message_id": msg_id, })) .send() .await; // Fall back to chunked send return self .send_text_chunks(text, &chat_id, thread_id.as_deref()) .await; } let msg_id = match message_id.parse::() { Ok(id) => id, Err(e) => { tracing::warn!("Invalid Telegram message_id '{message_id}': {e}"); return self .send_text_chunks(text, &chat_id, thread_id.as_deref()) .await; } }; // Try editing with Markdown formatting let body = serde_json::json!({ "chat_id": chat_id, "message_id": msg_id, "text": text, "parse_mode": "Markdown", }); let resp = self .client .post(self.api_url("editMessageText")) .json(&body) .send() .await?; if resp.status().is_success() { return Ok(()); } // Markdown failed — retry without parse_mode let plain_body = serde_json::json!({ "chat_id": chat_id, "message_id": msg_id, "text": text, }); let resp = self .client .post(self.api_url("editMessageText")) .json(&plain_body) .send() .await?; if resp.status().is_success() { return Ok(()); } // Edit failed entirely — fall back to new message tracing::warn!("Telegram finalize_draft edit failed; falling back to sendMessage"); self.send_text_chunks(text, &chat_id, thread_id.as_deref()) .await } async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { // Strip tool_call tags before processing to prevent Markdown parsing failures let content = strip_tool_call_tags(&message.content); // Parse recipient: "chat_id" or "chat_id:thread_id" format let (chat_id, thread_id) = match message.recipient.split_once(':') { Some((chat, thread)) => (chat, Some(thread)), None => (message.recipient.as_str(), None), }; let (text_without_markers, attachments) = parse_attachment_markers(&content); if !attachments.is_empty() { if !text_without_markers.is_empty() { self.send_text_chunks(&text_without_markers, chat_id, thread_id) .await?; } for attachment in &attachments { self.send_attachment(chat_id, thread_id, attachment).await?; } return Ok(()); } if let Some(attachment) = parse_path_only_attachment(&content) { self.send_attachment(chat_id, thread_id, &attachment) .await?; return Ok(()); } self.send_text_chunks(&content, chat_id, thread_id).await } async fn listen(&self, tx: tokio::sync::mpsc::Sender) -> anyhow::Result<()> { let mut offset: i64 = 0; if self.mention_only { let _ = self.get_bot_username().await; } tracing::info!("Telegram channel listening for messages..."); loop { if self.mention_only { let missing_username = self.bot_username.lock().is_none(); if missing_username { let _ = self.get_bot_username().await; } } let url = self.api_url("getUpdates"); let body = serde_json::json!({ "offset": offset, "timeout": 30, "allowed_updates": ["message"] }); let resp = match self.client.post(&url).json(&body).send().await { Ok(r) => r, Err(e) => { tracing::warn!("Telegram poll error: {e}"); tokio::time::sleep(std::time::Duration::from_secs(5)).await; continue; } }; let data: serde_json::Value = match resp.json().await { Ok(d) => d, Err(e) => { tracing::warn!("Telegram parse error: {e}"); tokio::time::sleep(std::time::Duration::from_secs(5)).await; continue; } }; let ok = data .get("ok") .and_then(serde_json::Value::as_bool) .unwrap_or(true); if !ok { let error_code = data .get("error_code") .and_then(serde_json::Value::as_i64) .unwrap_or_default(); let description = data .get("description") .and_then(serde_json::Value::as_str) .unwrap_or("unknown Telegram API error"); if error_code == 409 { tracing::warn!( "Telegram polling conflict (409): {description}. \ Ensure only one `zeroclaw` process is using this bot token." ); tokio::time::sleep(std::time::Duration::from_secs(2)).await; } else { tracing::warn!( "Telegram getUpdates API error (code={}): {description}", error_code ); tokio::time::sleep(std::time::Duration::from_secs(5)).await; } continue; } if let Some(results) = data.get("result").and_then(serde_json::Value::as_array) { for update in results { // Advance offset past this update if let Some(uid) = update.get("update_id").and_then(serde_json::Value::as_i64) { offset = uid + 1; } let Some(msg) = self.parse_update_message(update) else { self.handle_unauthorized_message(update).await; continue; }; // Send "typing" indicator immediately when we receive a message let typing_body = serde_json::json!({ "chat_id": &msg.reply_target, "action": "typing" }); let _ = self .client .post(self.api_url("sendChatAction")) .json(&typing_body) .send() .await; // Ignore errors for typing indicator if tx.send(msg).await.is_err() { return Ok(()); } } } } } async fn health_check(&self) -> bool { let timeout_duration = Duration::from_secs(5); match tokio::time::timeout( timeout_duration, self.client.get(self.api_url("getMe")).send(), ) .await { Ok(Ok(resp)) => resp.status().is_success(), Ok(Err(e)) => { tracing::debug!("Telegram health check failed: {e}"); false } Err(_) => { tracing::debug!("Telegram health check timed out after 5s"); false } } } async fn start_typing(&self, recipient: &str) -> anyhow::Result<()> { self.stop_typing(recipient).await?; let client = self.client.clone(); let url = self.api_url("sendChatAction"); let chat_id = recipient.to_string(); let handle = tokio::spawn(async move { loop { let body = serde_json::json!({ "chat_id": &chat_id, "action": "typing" }); let _ = client.post(&url).json(&body).send().await; // Telegram typing indicator expires after 5s; refresh at 4s tokio::time::sleep(Duration::from_secs(4)).await; } }); let mut guard = self.typing_handle.lock(); *guard = Some(handle); Ok(()) } async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> { let mut guard = self.typing_handle.lock(); if let Some(handle) = guard.take() { handle.abort(); } Ok(()) } } #[cfg(test)] mod tests { use super::*; #[test] fn telegram_channel_name() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); assert_eq!(ch.name(), "telegram"); } #[test] fn typing_handle_starts_as_none() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); let guard = ch.typing_handle.lock(); assert!(guard.is_none()); } #[tokio::test] async fn stop_typing_clears_handle() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); // Manually insert a dummy handle { let mut guard = ch.typing_handle.lock(); *guard = Some(tokio::spawn(async { tokio::time::sleep(Duration::from_secs(60)).await; })); } // stop_typing should abort and clear ch.stop_typing("123").await.unwrap(); let guard = ch.typing_handle.lock(); assert!(guard.is_none()); } #[tokio::test] async fn start_typing_replaces_previous_handle() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); // Insert a dummy handle first { let mut guard = ch.typing_handle.lock(); *guard = Some(tokio::spawn(async { tokio::time::sleep(Duration::from_secs(60)).await; })); } // start_typing should abort the old handle and set a new one let _ = ch.start_typing("123").await; let guard = ch.typing_handle.lock(); assert!(guard.is_some()); } #[test] fn supports_draft_updates_respects_stream_mode() { let off = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); assert!(!off.supports_draft_updates()); let partial = TelegramChannel::new("fake-token".into(), vec!["*".into()], false) .with_streaming(StreamMode::Partial, 750); assert!(partial.supports_draft_updates()); assert_eq!(partial.draft_update_interval_ms, 750); } #[tokio::test] async fn send_draft_returns_none_when_stream_mode_off() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); let id = ch .send_draft(&SendMessage::new("draft", "123")) .await .unwrap(); assert!(id.is_none()); } #[tokio::test] async fn update_draft_rate_limit_short_circuits_network() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false) .with_streaming(StreamMode::Partial, 60_000); ch.last_draft_edit .lock() .insert("123".to_string(), std::time::Instant::now()); let result = ch.update_draft("123", "42", "delta text").await; assert!(result.is_ok()); } #[tokio::test] async fn update_draft_utf8_truncation_is_safe_for_multibyte_text() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false) .with_streaming(StreamMode::Partial, 0); let long_emoji_text = "😀".repeat(TELEGRAM_MAX_MESSAGE_LENGTH + 20); // Invalid message_id returns early after building display_text. // This asserts truncation never panics on UTF-8 boundaries. let result = ch .update_draft("123", "not-a-number", &long_emoji_text) .await; assert!(result.is_ok()); } #[tokio::test] async fn finalize_draft_invalid_message_id_falls_back_to_chunk_send() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false) .with_streaming(StreamMode::Partial, 0); let long_text = "a".repeat(TELEGRAM_MAX_MESSAGE_LENGTH + 64); // For oversized text + invalid draft message_id, finalize_draft should // fall back to chunked send instead of returning early. let result = ch.finalize_draft("123", "not-a-number", &long_text).await; assert!(result.is_err()); } #[test] fn telegram_api_url() { let ch = TelegramChannel::new("123:ABC".into(), vec![], false); assert_eq!( ch.api_url("getMe"), "https://api.telegram.org/bot123:ABC/getMe" ); } #[test] fn telegram_user_allowed_wildcard() { let ch = TelegramChannel::new("t".into(), vec!["*".into()], false); assert!(ch.is_user_allowed("anyone")); } #[test] fn telegram_user_allowed_specific() { let ch = TelegramChannel::new("t".into(), vec!["alice".into(), "bob".into()], false); assert!(ch.is_user_allowed("alice")); assert!(!ch.is_user_allowed("eve")); } #[test] fn telegram_user_allowed_with_at_prefix_in_config() { let ch = TelegramChannel::new("t".into(), vec!["@alice".into()], false); assert!(ch.is_user_allowed("alice")); } #[test] fn telegram_user_denied_empty() { let ch = TelegramChannel::new("t".into(), vec![], false); assert!(!ch.is_user_allowed("anyone")); } #[test] fn telegram_user_exact_match_not_substring() { let ch = TelegramChannel::new("t".into(), vec!["alice".into()], false); assert!(!ch.is_user_allowed("alice_bot")); assert!(!ch.is_user_allowed("alic")); assert!(!ch.is_user_allowed("malice")); } #[test] fn telegram_user_empty_string_denied() { let ch = TelegramChannel::new("t".into(), vec!["alice".into()], false); assert!(!ch.is_user_allowed("")); } #[test] fn telegram_user_case_sensitive() { let ch = TelegramChannel::new("t".into(), vec!["Alice".into()], false); assert!(ch.is_user_allowed("Alice")); assert!(!ch.is_user_allowed("alice")); assert!(!ch.is_user_allowed("ALICE")); } #[test] fn telegram_wildcard_with_specific_users() { let ch = TelegramChannel::new("t".into(), vec!["alice".into(), "*".into()], false); assert!(ch.is_user_allowed("alice")); assert!(ch.is_user_allowed("bob")); assert!(ch.is_user_allowed("anyone")); } #[test] fn telegram_user_allowed_by_numeric_id_identity() { let ch = TelegramChannel::new("t".into(), vec!["123456789".into()], false); assert!(ch.is_any_user_allowed(["unknown", "123456789"])); } #[test] fn telegram_user_denied_when_none_of_identities_match() { let ch = TelegramChannel::new("t".into(), vec!["alice".into(), "987654321".into()], false); assert!(!ch.is_any_user_allowed(["unknown", "123456789"])); } #[test] fn telegram_pairing_enabled_with_empty_allowlist() { let ch = TelegramChannel::new("t".into(), vec![], false); assert!(ch.pairing_code_active()); } #[test] fn telegram_pairing_disabled_with_nonempty_allowlist() { let ch = TelegramChannel::new("t".into(), vec!["alice".into()], false); assert!(!ch.pairing_code_active()); } #[test] fn telegram_extract_bind_code_plain_command() { assert_eq!( TelegramChannel::extract_bind_code("/bind 123456"), Some("123456") ); } #[test] fn telegram_extract_bind_code_supports_bot_mention() { assert_eq!( TelegramChannel::extract_bind_code("/bind@zeroclaw_bot 654321"), Some("654321") ); } #[test] fn telegram_extract_bind_code_rejects_invalid_forms() { assert_eq!(TelegramChannel::extract_bind_code("/bind"), None); assert_eq!(TelegramChannel::extract_bind_code("/start"), None); } #[test] fn parse_attachment_markers_extracts_multiple_types() { let message = "Here are files [IMAGE:/tmp/a.png] and [DOCUMENT:https://example.com/a.pdf]"; let (cleaned, attachments) = parse_attachment_markers(message); assert_eq!(cleaned, "Here are files and"); assert_eq!(attachments.len(), 2); assert_eq!(attachments[0].kind, TelegramAttachmentKind::Image); assert_eq!(attachments[0].target, "/tmp/a.png"); assert_eq!(attachments[1].kind, TelegramAttachmentKind::Document); assert_eq!(attachments[1].target, "https://example.com/a.pdf"); } #[test] fn parse_attachment_markers_keeps_invalid_markers_in_text() { let message = "Report [UNKNOWN:/tmp/a.bin]"; let (cleaned, attachments) = parse_attachment_markers(message); assert_eq!(cleaned, "Report [UNKNOWN:/tmp/a.bin]"); assert!(attachments.is_empty()); } #[test] fn parse_path_only_attachment_detects_existing_file() { let dir = tempfile::tempdir().unwrap(); let image_path = dir.path().join("snap.png"); std::fs::write(&image_path, b"fake-png").unwrap(); let parsed = parse_path_only_attachment(image_path.to_string_lossy().as_ref()) .expect("expected attachment"); assert_eq!(parsed.kind, TelegramAttachmentKind::Image); assert_eq!(parsed.target, image_path.to_string_lossy()); } #[test] fn parse_path_only_attachment_rejects_sentence_text() { assert!(parse_path_only_attachment("Screenshot saved to /tmp/snap.png").is_none()); } #[test] fn infer_attachment_kind_from_target_detects_document_extension() { assert_eq!( infer_attachment_kind_from_target("https://example.com/files/specs.pdf?download=1"), Some(TelegramAttachmentKind::Document) ); } #[test] fn parse_update_message_uses_chat_id_as_reply_target() { let ch = TelegramChannel::new("token".into(), vec!["*".into()], false); let update = serde_json::json!({ "update_id": 1, "message": { "message_id": 33, "text": "hello", "from": { "id": 555, "username": "alice" }, "chat": { "id": -100_200_300 } } }); let msg = ch .parse_update_message(&update) .expect("message should parse"); assert_eq!(msg.sender, "alice"); assert_eq!(msg.reply_target, "-100200300"); assert_eq!(msg.content, "hello"); assert_eq!(msg.id, "telegram_-100200300_33"); } #[test] fn parse_update_message_allows_numeric_id_without_username() { let ch = TelegramChannel::new("token".into(), vec!["555".into()], false); let update = serde_json::json!({ "update_id": 2, "message": { "message_id": 9, "text": "ping", "from": { "id": 555 }, "chat": { "id": 12345 } } }); let msg = ch .parse_update_message(&update) .expect("numeric allowlist should pass"); assert_eq!(msg.sender, "555"); assert_eq!(msg.reply_target, "12345"); } #[test] fn parse_update_message_extracts_thread_id_for_forum_topic() { let ch = TelegramChannel::new("token".into(), vec!["*".into()], false); let update = serde_json::json!({ "update_id": 3, "message": { "message_id": 42, "text": "hello from topic", "from": { "id": 555, "username": "alice" }, "chat": { "id": -100_200_300 }, "message_thread_id": 789 } }); let msg = ch .parse_update_message(&update) .expect("message with thread_id should parse"); assert_eq!(msg.sender, "alice"); assert_eq!(msg.reply_target, "-100200300:789"); assert_eq!(msg.content, "hello from topic"); assert_eq!(msg.id, "telegram_-100200300_42"); } // ── File sending API URL tests ────────────────────────────────── #[test] fn telegram_api_url_send_document() { let ch = TelegramChannel::new("123:ABC".into(), vec![], false); assert_eq!( ch.api_url("sendDocument"), "https://api.telegram.org/bot123:ABC/sendDocument" ); } #[test] fn telegram_api_url_send_photo() { let ch = TelegramChannel::new("123:ABC".into(), vec![], false); assert_eq!( ch.api_url("sendPhoto"), "https://api.telegram.org/bot123:ABC/sendPhoto" ); } #[test] fn telegram_api_url_send_video() { let ch = TelegramChannel::new("123:ABC".into(), vec![], false); assert_eq!( ch.api_url("sendVideo"), "https://api.telegram.org/bot123:ABC/sendVideo" ); } #[test] fn telegram_api_url_send_audio() { let ch = TelegramChannel::new("123:ABC".into(), vec![], false); assert_eq!( ch.api_url("sendAudio"), "https://api.telegram.org/bot123:ABC/sendAudio" ); } #[test] fn telegram_api_url_send_voice() { let ch = TelegramChannel::new("123:ABC".into(), vec![], false); assert_eq!( ch.api_url("sendVoice"), "https://api.telegram.org/bot123:ABC/sendVoice" ); } // ── File sending integration tests (with mock server) ────────── #[tokio::test] async fn telegram_send_document_bytes_builds_correct_form() { // This test verifies the method doesn't panic and handles bytes correctly let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); let file_bytes = b"Hello, this is a test file content".to_vec(); // The actual API call will fail (no real server), but we verify the method exists // and handles the input correctly up to the network call let result = ch .send_document_bytes("123456", None, file_bytes, "test.txt", Some("Test caption")) .await; // Should fail with network error, not a panic or type error assert!(result.is_err()); let err = result.unwrap_err().to_string(); // Error should be network-related, not a code bug assert!( err.contains("error") || err.contains("failed") || err.contains("connect"), "Expected network error, got: {err}" ); } #[tokio::test] async fn telegram_send_photo_bytes_builds_correct_form() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); // Minimal valid PNG header bytes let file_bytes = vec![0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A]; let result = ch .send_photo_bytes("123456", None, file_bytes, "test.png", None) .await; assert!(result.is_err()); } #[tokio::test] async fn telegram_send_document_by_url_builds_correct_json() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); let result = ch .send_document_by_url( "123456", None, "https://example.com/file.pdf", Some("PDF doc"), ) .await; assert!(result.is_err()); } #[tokio::test] async fn telegram_send_photo_by_url_builds_correct_json() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); let result = ch .send_photo_by_url("123456", None, "https://example.com/image.jpg", None) .await; assert!(result.is_err()); } // ── File path handling tests ──────────────────────────────────── #[tokio::test] async fn telegram_send_document_nonexistent_file() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); let path = Path::new("/nonexistent/path/to/file.txt"); let result = ch.send_document("123456", None, path, None).await; assert!(result.is_err()); let err = result.unwrap_err().to_string(); // Should fail with file not found error assert!( err.contains("No such file") || err.contains("not found") || err.contains("os error"), "Expected file not found error, got: {err}" ); } #[tokio::test] async fn telegram_send_photo_nonexistent_file() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); let path = Path::new("/nonexistent/path/to/photo.jpg"); let result = ch.send_photo("123456", None, path, None).await; assert!(result.is_err()); } #[tokio::test] async fn telegram_send_video_nonexistent_file() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); let path = Path::new("/nonexistent/path/to/video.mp4"); let result = ch.send_video("123456", None, path, None).await; assert!(result.is_err()); } #[tokio::test] async fn telegram_send_audio_nonexistent_file() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); let path = Path::new("/nonexistent/path/to/audio.mp3"); let result = ch.send_audio("123456", None, path, None).await; assert!(result.is_err()); } #[tokio::test] async fn telegram_send_voice_nonexistent_file() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); let path = Path::new("/nonexistent/path/to/voice.ogg"); let result = ch.send_voice("123456", None, path, None).await; assert!(result.is_err()); } // ── Message splitting tests ───────────────────────────────────── #[test] fn telegram_split_short_message() { let msg = "Hello, world!"; let chunks = split_message_for_telegram(msg); assert_eq!(chunks.len(), 1); assert_eq!(chunks[0], msg); } #[test] fn telegram_split_exact_limit() { let msg = "a".repeat(TELEGRAM_MAX_MESSAGE_LENGTH); let chunks = split_message_for_telegram(&msg); assert_eq!(chunks.len(), 1); assert_eq!(chunks[0].len(), TELEGRAM_MAX_MESSAGE_LENGTH); } #[test] fn telegram_split_over_limit() { let msg = "a".repeat(TELEGRAM_MAX_MESSAGE_LENGTH + 100); let chunks = split_message_for_telegram(&msg); assert_eq!(chunks.len(), 2); assert!(chunks[0].len() <= TELEGRAM_MAX_MESSAGE_LENGTH); assert!(chunks[1].len() <= TELEGRAM_MAX_MESSAGE_LENGTH); } #[test] fn telegram_split_at_word_boundary() { let msg = format!( "{} more text here", "word ".repeat(TELEGRAM_MAX_MESSAGE_LENGTH / 5) ); let chunks = split_message_for_telegram(&msg); assert!(chunks.len() >= 2); // First chunk should end with a complete word (space at the end) for chunk in &chunks[..chunks.len() - 1] { assert!(chunk.len() <= TELEGRAM_MAX_MESSAGE_LENGTH); } } #[test] fn telegram_split_at_newline() { let text_block = "Line of text\n".repeat(TELEGRAM_MAX_MESSAGE_LENGTH / 13 + 1); let chunks = split_message_for_telegram(&text_block); assert!(chunks.len() >= 2); for chunk in chunks { assert!(chunk.len() <= TELEGRAM_MAX_MESSAGE_LENGTH); } } #[test] fn telegram_split_preserves_content() { let msg = "test ".repeat(TELEGRAM_MAX_MESSAGE_LENGTH / 5 + 100); let chunks = split_message_for_telegram(&msg); let rejoined = chunks.join(""); assert_eq!(rejoined, msg); } #[test] fn telegram_split_empty_message() { let chunks = split_message_for_telegram(""); assert_eq!(chunks.len(), 1); assert_eq!(chunks[0], ""); } #[test] fn telegram_split_very_long_message() { let msg = "x".repeat(TELEGRAM_MAX_MESSAGE_LENGTH * 3); let chunks = split_message_for_telegram(&msg); assert!(chunks.len() >= 3); for chunk in chunks { assert!(chunk.len() <= TELEGRAM_MAX_MESSAGE_LENGTH); } } // ── Caption handling tests ────────────────────────────────────── #[tokio::test] async fn telegram_send_document_bytes_with_caption() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); let file_bytes = b"test content".to_vec(); // With caption let result = ch .send_document_bytes( "123456", None, file_bytes.clone(), "test.txt", Some("My caption"), ) .await; assert!(result.is_err()); // Network error expected // Without caption let result = ch .send_document_bytes("123456", None, file_bytes, "test.txt", None) .await; assert!(result.is_err()); // Network error expected } #[tokio::test] async fn telegram_send_photo_bytes_with_caption() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); let file_bytes = vec![0x89, 0x50, 0x4E, 0x47]; // With caption let result = ch .send_photo_bytes( "123456", None, file_bytes.clone(), "test.png", Some("Photo caption"), ) .await; assert!(result.is_err()); // Without caption let result = ch .send_photo_bytes("123456", None, file_bytes, "test.png", None) .await; assert!(result.is_err()); } // ── Empty/edge case tests ─────────────────────────────────────── #[tokio::test] async fn telegram_send_document_bytes_empty_file() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); let file_bytes: Vec = vec![]; let result = ch .send_document_bytes("123456", None, file_bytes, "empty.txt", None) .await; // Should not panic, will fail at API level assert!(result.is_err()); } #[tokio::test] async fn telegram_send_document_bytes_empty_filename() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); let file_bytes = b"content".to_vec(); let result = ch .send_document_bytes("123456", None, file_bytes, "", None) .await; // Should not panic assert!(result.is_err()); } #[tokio::test] async fn telegram_send_document_bytes_empty_chat_id() { let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false); let file_bytes = b"content".to_vec(); let result = ch .send_document_bytes("", None, file_bytes, "test.txt", None) .await; // Should not panic assert!(result.is_err()); } // ── Message ID edge cases ───────────────────────────────────── #[test] fn telegram_message_id_format_includes_chat_and_message_id() { // Verify that message IDs follow the format: telegram_{chat_id}_{message_id} let chat_id = "123456"; let message_id = 789; let expected_id = format!("telegram_{chat_id}_{message_id}"); assert_eq!(expected_id, "telegram_123456_789"); } #[test] fn telegram_message_id_is_deterministic() { // Same chat_id + same message_id = same ID (prevents duplicates after restart) let chat_id = "123456"; let message_id = 789; let id1 = format!("telegram_{chat_id}_{message_id}"); let id2 = format!("telegram_{chat_id}_{message_id}"); assert_eq!(id1, id2); } #[test] fn telegram_message_id_different_message_different_id() { // Different message IDs produce different IDs let chat_id = "123456"; let id1 = format!("telegram_{chat_id}_789"); let id2 = format!("telegram_{chat_id}_790"); assert_ne!(id1, id2); } #[test] fn telegram_message_id_different_chat_different_id() { // Different chats produce different IDs even with same message_id let message_id = 789; let id1 = format!("telegram_123456_{message_id}"); let id2 = format!("telegram_789012_{message_id}"); assert_ne!(id1, id2); } #[test] fn telegram_message_id_no_uuid_randomness() { // Verify format doesn't contain random UUID components let chat_id = "123456"; let message_id = 789; let id = format!("telegram_{chat_id}_{message_id}"); assert!(!id.contains('-')); // No UUID dashes assert!(id.starts_with("telegram_")); } #[test] fn telegram_message_id_handles_zero_message_id() { // Edge case: message_id can be 0 (fallback/missing case) let chat_id = "123456"; let message_id = 0; let id = format!("telegram_{chat_id}_{message_id}"); assert_eq!(id, "telegram_123456_0"); } // ── Tool call tag stripping tests ─────────────────────────────────── #[test] fn strip_tool_call_tags_removes_standard_tags() { let input = "Hello {\"name\":\"shell\",\"arguments\":{\"command\":\"ls\"}} world"; let result = strip_tool_call_tags(input); assert_eq!(result, "Hello world"); } #[test] fn strip_tool_call_tags_removes_alias_tags() { let input = "Hello {\"name\":\"shell\",\"arguments\":{\"command\":\"ls\"}} world"; let result = strip_tool_call_tags(input); assert_eq!(result, "Hello world"); } #[test] fn strip_tool_call_tags_removes_dash_tags() { let input = "Hello {\"name\":\"shell\",\"arguments\":{\"command\":\"ls\"}} world"; let result = strip_tool_call_tags(input); assert_eq!(result, "Hello world"); } #[test] fn strip_tool_call_tags_removes_tool_call_tags() { let input = "Hello {\"name\":\"shell\",\"arguments\":{\"command\":\"ls\"}} world"; let result = strip_tool_call_tags(input); assert_eq!(result, "Hello world"); } #[test] fn strip_tool_call_tags_removes_invoke_tags() { let input = "Hello {\"name\":\"shell\",\"arguments\":{\"command\":\"date\"}} world"; let result = strip_tool_call_tags(input); assert_eq!(result, "Hello world"); } #[test] fn strip_tool_call_tags_handles_multiple_tags() { let input = "Start a middle b end"; let result = strip_tool_call_tags(input); assert_eq!(result, "Start middle end"); } #[test] fn strip_tool_call_tags_handles_mixed_tags() { let input = "A a B b C c D"; let result = strip_tool_call_tags(input); assert_eq!(result, "A B C D"); } #[test] fn strip_tool_call_tags_preserves_normal_text() { let input = "Hello world! This is a test."; let result = strip_tool_call_tags(input); assert_eq!(result, "Hello world! This is a test."); } #[test] fn strip_tool_call_tags_handles_unclosed_tags() { let input = "Hello world"; let result = strip_tool_call_tags(input); assert_eq!(result, "Hello world"); } #[test] fn strip_tool_call_tags_handles_unclosed_tool_call_with_json() { let input = "Status:\n\n{\"name\":\"shell\",\"arguments\":{\"command\":\"uptime\"}}"; let result = strip_tool_call_tags(input); assert_eq!(result, "Status:"); } #[test] fn strip_tool_call_tags_handles_mismatched_close_tag() { let input = "{\"name\":\"shell\",\"arguments\":{\"command\":\"uptime\"}}"; let result = strip_tool_call_tags(input); assert_eq!(result, ""); } #[test] fn strip_tool_call_tags_cleans_extra_newlines() { let input = "Hello\n\n\ntest\n\n\n\nworld"; let result = strip_tool_call_tags(input); assert_eq!(result, "Hello\n\nworld"); } #[test] fn strip_tool_call_tags_handles_empty_input() { let input = ""; let result = strip_tool_call_tags(input); assert_eq!(result, ""); } #[test] fn strip_tool_call_tags_handles_only_tags() { let input = "{\"name\":\"test\"}"; let result = strip_tool_call_tags(input); assert_eq!(result, ""); } #[test] fn telegram_contains_bot_mention_finds_mention() { assert!(TelegramChannel::contains_bot_mention( "Hello @mybot", "mybot" )); assert!(TelegramChannel::contains_bot_mention( "@mybot help", "mybot" )); assert!(TelegramChannel::contains_bot_mention( "Hey @mybot how are you?", "mybot" )); assert!(TelegramChannel::contains_bot_mention( "Hello @MyBot, can you help?", "mybot" )); } #[test] fn telegram_contains_bot_mention_no_false_positives() { assert!(!TelegramChannel::contains_bot_mention( "Hello @otherbot", "mybot" )); assert!(!TelegramChannel::contains_bot_mention( "Hello mybot", "mybot" )); assert!(!TelegramChannel::contains_bot_mention( "Hello @mybot2", "mybot" )); assert!(!TelegramChannel::contains_bot_mention("", "mybot")); } #[test] fn telegram_normalize_incoming_content_strips_mention() { let result = TelegramChannel::normalize_incoming_content("@mybot hello", "mybot"); assert_eq!(result, Some("hello".to_string())); } #[test] fn telegram_normalize_incoming_content_handles_multiple_mentions() { let result = TelegramChannel::normalize_incoming_content("@mybot @mybot test", "mybot"); assert_eq!(result, Some("test".to_string())); } #[test] fn telegram_normalize_incoming_content_returns_none_for_empty() { let result = TelegramChannel::normalize_incoming_content("@mybot", "mybot"); assert_eq!(result, None); } #[test] fn parse_update_message_mention_only_group_requires_exact_mention() { let ch = TelegramChannel::new("token".into(), vec!["*".into()], true); { let mut cache = ch.bot_username.lock(); *cache = Some("mybot".to_string()); } let update = serde_json::json!({ "update_id": 10, "message": { "message_id": 44, "text": "hello @mybot2", "from": { "id": 555, "username": "alice" }, "chat": { "id": -100_200_300, "type": "group" } } }); assert!(ch.parse_update_message(&update).is_none()); } #[test] fn parse_update_message_mention_only_group_strips_mention_and_drops_empty() { let ch = TelegramChannel::new("token".into(), vec!["*".into()], true); { let mut cache = ch.bot_username.lock(); *cache = Some("mybot".to_string()); } let update = serde_json::json!({ "update_id": 11, "message": { "message_id": 45, "text": "Hi @MyBot status please", "from": { "id": 555, "username": "alice" }, "chat": { "id": -100_200_300, "type": "group" } } }); let parsed = ch .parse_update_message(&update) .expect("mention should parse"); assert_eq!(parsed.content, "Hi status please"); let empty_update = serde_json::json!({ "update_id": 12, "message": { "message_id": 46, "text": "@mybot", "from": { "id": 555, "username": "alice" }, "chat": { "id": -100_200_300, "type": "group" } } }); assert!(ch.parse_update_message(&empty_update).is_none()); } #[test] fn telegram_is_group_message_detects_groups() { let group_msg = serde_json::json!({ "chat": { "type": "group" } }); assert!(TelegramChannel::is_group_message(&group_msg)); let supergroup_msg = serde_json::json!({ "chat": { "type": "supergroup" } }); assert!(TelegramChannel::is_group_message(&supergroup_msg)); let private_msg = serde_json::json!({ "chat": { "type": "private" } }); assert!(!TelegramChannel::is_group_message(&private_msg)); } #[test] fn telegram_mention_only_enabled_by_config() { let ch = TelegramChannel::new("token".into(), vec!["*".into()], true); assert!(ch.mention_only); let ch_disabled = TelegramChannel::new("token".into(), vec!["*".into()], false); assert!(!ch_disabled.mention_only); } }