From bfc67c9c299f2bffc91571f78392ac1a1726eeb8 Mon Sep 17 00:00:00 2001 From: leon Date: Tue, 17 Feb 2026 06:22:51 -0500 Subject: [PATCH] feat(telegram): add bind-code pairing and fix reply routing --- src/channels/telegram.rs | 321 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 312 insertions(+), 9 deletions(-) diff --git a/src/channels/telegram.rs b/src/channels/telegram.rs index 5d25de1..cee23c6 100644 --- a/src/channels/telegram.rs +++ b/src/channels/telegram.rs @@ -1,11 +1,18 @@ use super::traits::{Channel, ChannelMessage}; +use crate::config::Config; +use crate::security::pairing::PairingGuard; +use anyhow::Context; use async_trait::async_trait; +use directories::UserDirs; use reqwest::multipart::{Form, Part}; +use std::fs; use std::path::Path; +use std::sync::{Arc, RwLock}; use std::time::Duration; /// Telegram's maximum message length for text messages const TELEGRAM_MAX_MESSAGE_LENGTH: usize = 4096; +const TELEGRAM_BIND_COMMAND: &str = "/bind"; /// Split a message into chunks that respect Telegram's 4096 character limit. /// Tries to split at word boundaries when possible, and handles continuation. @@ -181,25 +188,129 @@ fn parse_attachment_markers(message: &str) -> (String, Vec) /// Telegram channel — long-polls the Bot API for updates pub struct TelegramChannel { bot_token: String, - allowed_users: Vec, + allowed_users: Arc>>, + pairing: Option, client: reqwest::Client, } impl TelegramChannel { pub fn new(bot_token: String, allowed_users: Vec) -> Self { + let normalized_allowed = Self::normalize_allowed_users(allowed_users); + let pairing = if normalized_allowed.is_empty() { + let guard = PairingGuard::new(true, &[]); + if let Some(code) = guard.pairing_code() { + println!(" 🔐 Telegram pairing required. One-time bind code: {code}"); + println!(" Send `{TELEGRAM_BIND_COMMAND} ` from your Telegram account."); + } + Some(guard) + } else { + None + }; + Self { bot_token, - allowed_users, + allowed_users: Arc::new(RwLock::new(normalized_allowed)), + pairing, client: reqwest::Client::new(), } } + fn normalize_identity(value: &str) -> String { + value.trim().trim_start_matches('@').to_string() + } + + fn normalize_allowed_users(allowed_users: Vec) -> Vec { + allowed_users + .into_iter() + .map(|entry| Self::normalize_identity(&entry)) + .filter(|entry| !entry.is_empty()) + .collect() + } + + fn load_config_without_env() -> anyhow::Result { + let home = UserDirs::new() + .map(|u| u.home_dir().to_path_buf()) + .context("Could not find home directory")?; + let zeroclaw_dir = home.join(".zeroclaw"); + let config_path = zeroclaw_dir.join("config.toml"); + + let contents = fs::read_to_string(&config_path) + .with_context(|| format!("Failed to read config file: {}", config_path.display()))?; + let mut config: Config = toml::from_str(&contents) + .context("Failed to parse config file for Telegram binding")?; + config.config_path = config_path; + config.workspace_dir = zeroclaw_dir.join("workspace"); + Ok(config) + } + + fn persist_allowed_identity_blocking(identity: &str) -> anyhow::Result<()> { + let mut config = Self::load_config_without_env()?; + let Some(telegram) = config.channels_config.telegram.as_mut() else { + anyhow::bail!("Telegram channel config is missing in config.toml"); + }; + + let normalized = Self::normalize_identity(identity); + if normalized.is_empty() { + anyhow::bail!("Cannot persist empty Telegram identity"); + } + + if !telegram.allowed_users.iter().any(|u| u == &normalized) { + telegram.allowed_users.push(normalized); + config + .save() + .context("Failed to persist Telegram allowlist to config.toml")?; + } + + Ok(()) + } + + async fn persist_allowed_identity(&self, identity: &str) -> anyhow::Result<()> { + let identity = identity.to_string(); + tokio::task::spawn_blocking(move || Self::persist_allowed_identity_blocking(&identity)) + .await + .map_err(|e| anyhow::anyhow!("Failed to join Telegram bind save task: {e}"))??; + Ok(()) + } + + fn add_allowed_identity_runtime(&self, identity: &str) { + let normalized = Self::normalize_identity(identity); + if normalized.is_empty() { + return; + } + if let Ok(mut users) = self.allowed_users.write() { + if !users.iter().any(|u| u == &normalized) { + users.push(normalized); + } + } + } + + fn extract_bind_code(text: &str) -> Option<&str> { + let mut parts = text.split_whitespace(); + let command = parts.next()?; + let base_command = command.split('@').next().unwrap_or(command); + if base_command != TELEGRAM_BIND_COMMAND { + return None; + } + parts.next().map(str::trim).filter(|code| !code.is_empty()) + } + + fn pairing_code_active(&self) -> bool { + self.pairing + .as_ref() + .and_then(PairingGuard::pairing_code) + .is_some() + } + fn api_url(&self, method: &str) -> String { format!("https://api.telegram.org/bot{}/{method}", self.bot_token) } fn is_user_allowed(&self, username: &str) -> bool { - self.allowed_users.iter().any(|u| u == "*" || u == username) + let identity = Self::normalize_identity(username); + self.allowed_users + .read() + .map(|users| users.iter().any(|u| u == "*" || u == &identity)) + .unwrap_or(false) } fn is_any_user_allowed<'a, I>(&self, identities: I) -> bool @@ -209,6 +320,163 @@ impl TelegramChannel { identities.into_iter().any(|id| self.is_user_allowed(id)) } + async fn handle_unauthorized_message(&self, update: &serde_json::Value) { + let Some(message) = update.get("message") else { + return; + }; + + let Some(text) = message.get("text").and_then(serde_json::Value::as_str) else { + return; + }; + + let username_opt = message + .get("from") + .and_then(|from| from.get("username")) + .and_then(serde_json::Value::as_str); + let username = username_opt.unwrap_or("unknown"); + let normalized_username = Self::normalize_identity(username); + + let user_id = message + .get("from") + .and_then(|from| from.get("id")) + .and_then(serde_json::Value::as_i64); + let user_id_str = user_id.map(|id| id.to_string()); + let normalized_user_id = user_id_str.as_deref().map(Self::normalize_identity); + + let chat_id = message + .get("chat") + .and_then(|chat| chat.get("id")) + .and_then(serde_json::Value::as_i64) + .map(|id| id.to_string()); + + let Some(chat_id) = chat_id else { + tracing::warn!("Telegram: missing chat_id in message, skipping"); + return; + }; + + let mut identities = vec![normalized_username.as_str()]; + if let Some(ref id) = normalized_user_id { + identities.push(id.as_str()); + } + + if self.is_any_user_allowed(identities.iter().copied()) { + return; + } + + if let Some(code) = Self::extract_bind_code(text) { + if let Some(pairing) = self.pairing.as_ref() { + match pairing.try_pair(code) { + Ok(Some(_token)) => { + let bind_identity = normalized_user_id.clone().or_else(|| { + if normalized_username.is_empty() || normalized_username == "unknown" { + None + } else { + Some(normalized_username.clone()) + } + }); + + if let Some(identity) = bind_identity { + self.add_allowed_identity_runtime(&identity); + match self.persist_allowed_identity(&identity).await { + Ok(()) => { + let _ = self + .send( + "✅ Telegram account bound successfully. You can talk to ZeroClaw now.", + &chat_id, + ) + .await; + tracing::info!( + "Telegram: paired and allowlisted identity={identity}" + ); + } + Err(e) => { + tracing::error!( + "Telegram: failed to persist allowlist after bind: {e}" + ); + let _ = self + .send( + "âš ī¸ Bound for this runtime, but failed to persist config. Access may be lost after restart; check config file permissions.", + &chat_id, + ) + .await; + } + } + } else { + let _ = self + .send( + "❌ Could not identify your Telegram account. Ensure your account has a username or stable user ID, then retry.", + &chat_id, + ) + .await; + } + } + Ok(None) => { + let _ = self + .send( + "❌ Invalid binding code. Ask operator for the latest code and retry.", + &chat_id, + ) + .await; + } + Err(lockout_secs) => { + let _ = self + .send( + &format!("âŗ Too many invalid attempts. Retry in {lockout_secs}s."), + &chat_id, + ) + .await; + } + } + } else { + let _ = self + .send( + "â„šī¸ Telegram pairing is not active. Ask operator to update allowlist in config.toml.", + &chat_id, + ) + .await; + } + return; + } + + tracing::warn!( + "Telegram: ignoring message from unauthorized user: username={username}, user_id={}. \ +Allowlist Telegram username (without '@') or numeric user ID.", + user_id_str.as_deref().unwrap_or("unknown") + ); + + let suggested_identity = normalized_user_id + .clone() + .or_else(|| { + if normalized_username.is_empty() || normalized_username == "unknown" { + None + } else { + Some(normalized_username.clone()) + } + }) + .unwrap_or_else(|| "YOUR_TELEGRAM_ID".to_string()); + + let _ = self + .send( + &format!( + "🔐 This bot requires operator approval.\n\n\ +Copy this command to operator terminal:\n\ +`zeroclaw channel bind-telegram {suggested_identity}`\n\n\ +After operator runs it, send your message again." + ), + &chat_id, + ) + .await; + + if self.pairing_code_active() { + let _ = self + .send( + "â„šī¸ If operator provides a one-time pairing code, you can also run `/bind `.", + &chat_id, + ) + .await; + } + } + fn parse_update_message(&self, update: &serde_json::Value) -> Option { let message = update.get("message")?; @@ -239,11 +507,6 @@ impl TelegramChannel { } if !self.is_any_user_allowed(identities.iter().copied()) { - tracing::warn!( - "Telegram: ignoring message from unauthorized user: username={username}, user_id={}. \ -Allowlist Telegram @username or numeric user ID, then run `zeroclaw onboard --channels-only`.", - user_id.as_deref().unwrap_or("unknown") - ); return None; } @@ -849,9 +1112,9 @@ impl Channel for TelegramChannel { } let Some(msg) = self.parse_update_message(update) else { + self.handle_unauthorized_message(update).await; continue; }; - // Send "typing" indicator immediately when we receive a message let typing_body = serde_json::json!({ "chat_id": &msg.reply_target, @@ -926,6 +1189,12 @@ mod tests { assert!(!ch.is_user_allowed("eve")); } + #[test] + fn telegram_user_allowed_with_at_prefix_in_config() { + let ch = TelegramChannel::new("t".into(), vec!["@alice".into()]); + assert!(ch.is_user_allowed("alice")); + } + #[test] fn telegram_user_denied_empty() { let ch = TelegramChannel::new("t".into(), vec![]); @@ -974,6 +1243,40 @@ mod tests { assert!(!ch.is_any_user_allowed(["unknown", "123456789"])); } + #[test] + fn telegram_pairing_enabled_with_empty_allowlist() { + let ch = TelegramChannel::new("t".into(), vec![]); + assert!(ch.pairing_code_active()); + } + + #[test] + fn telegram_pairing_disabled_with_nonempty_allowlist() { + let ch = TelegramChannel::new("t".into(), vec!["alice".into()]); + assert!(!ch.pairing_code_active()); + } + + #[test] + fn telegram_extract_bind_code_plain_command() { + assert_eq!( + TelegramChannel::extract_bind_code("/bind 123456"), + Some("123456") + ); + } + + #[test] + fn telegram_extract_bind_code_supports_bot_mention() { + assert_eq!( + TelegramChannel::extract_bind_code("/bind@zeroclaw_bot 654321"), + Some("654321") + ); + } + + #[test] + fn telegram_extract_bind_code_rejects_invalid_forms() { + assert_eq!(TelegramChannel::extract_bind_code("/bind"), None); + assert_eq!(TelegramChannel::extract_bind_code("/start"), None); + } + #[test] fn parse_attachment_markers_extracts_multiple_types() { let message = "Here are files [IMAGE:/tmp/a.png] and [DOCUMENT:https://example.com/a.pdf]";