feat(telegram): add bind-code pairing and fix reply routing

This commit is contained in:
leon 2026-02-17 06:22:51 -05:00 committed by Chummy
parent b2690f6809
commit bfc67c9c29

View file

@ -1,11 +1,18 @@
use super::traits::{Channel, ChannelMessage}; use super::traits::{Channel, ChannelMessage};
use crate::config::Config;
use crate::security::pairing::PairingGuard;
use anyhow::Context;
use async_trait::async_trait; use async_trait::async_trait;
use directories::UserDirs;
use reqwest::multipart::{Form, Part}; use reqwest::multipart::{Form, Part};
use std::fs;
use std::path::Path; use std::path::Path;
use std::sync::{Arc, RwLock};
use std::time::Duration; use std::time::Duration;
/// Telegram's maximum message length for text messages /// Telegram's maximum message length for text messages
const TELEGRAM_MAX_MESSAGE_LENGTH: usize = 4096; const TELEGRAM_MAX_MESSAGE_LENGTH: usize = 4096;
const TELEGRAM_BIND_COMMAND: &str = "/bind";
/// Split a message into chunks that respect Telegram's 4096 character limit. /// Split a message into chunks that respect Telegram's 4096 character limit.
/// Tries to split at word boundaries when possible, and handles continuation. /// Tries to split at word boundaries when possible, and handles continuation.
@ -181,25 +188,129 @@ fn parse_attachment_markers(message: &str) -> (String, Vec<TelegramAttachment>)
/// Telegram channel — long-polls the Bot API for updates /// Telegram channel — long-polls the Bot API for updates
pub struct TelegramChannel { pub struct TelegramChannel {
bot_token: String, bot_token: String,
allowed_users: Vec<String>, allowed_users: Arc<RwLock<Vec<String>>>,
pairing: Option<PairingGuard>,
client: reqwest::Client, client: reqwest::Client,
} }
impl TelegramChannel { impl TelegramChannel {
pub fn new(bot_token: String, allowed_users: Vec<String>) -> Self { pub fn new(bot_token: String, allowed_users: Vec<String>) -> Self {
let normalized_allowed = Self::normalize_allowed_users(allowed_users);
let pairing = if normalized_allowed.is_empty() {
let guard = PairingGuard::new(true, &[]);
if let Some(code) = guard.pairing_code() {
println!(" 🔐 Telegram pairing required. One-time bind code: {code}");
println!(" Send `{TELEGRAM_BIND_COMMAND} <code>` from your Telegram account.");
}
Some(guard)
} else {
None
};
Self { Self {
bot_token, bot_token,
allowed_users, allowed_users: Arc::new(RwLock::new(normalized_allowed)),
pairing,
client: reqwest::Client::new(), client: reqwest::Client::new(),
} }
} }
fn normalize_identity(value: &str) -> String {
value.trim().trim_start_matches('@').to_string()
}
fn normalize_allowed_users(allowed_users: Vec<String>) -> Vec<String> {
allowed_users
.into_iter()
.map(|entry| Self::normalize_identity(&entry))
.filter(|entry| !entry.is_empty())
.collect()
}
fn load_config_without_env() -> anyhow::Result<Config> {
let home = UserDirs::new()
.map(|u| u.home_dir().to_path_buf())
.context("Could not find home directory")?;
let zeroclaw_dir = home.join(".zeroclaw");
let config_path = zeroclaw_dir.join("config.toml");
let contents = fs::read_to_string(&config_path)
.with_context(|| format!("Failed to read config file: {}", config_path.display()))?;
let mut config: Config = toml::from_str(&contents)
.context("Failed to parse config file for Telegram binding")?;
config.config_path = config_path;
config.workspace_dir = zeroclaw_dir.join("workspace");
Ok(config)
}
fn persist_allowed_identity_blocking(identity: &str) -> anyhow::Result<()> {
let mut config = Self::load_config_without_env()?;
let Some(telegram) = config.channels_config.telegram.as_mut() else {
anyhow::bail!("Telegram channel config is missing in config.toml");
};
let normalized = Self::normalize_identity(identity);
if normalized.is_empty() {
anyhow::bail!("Cannot persist empty Telegram identity");
}
if !telegram.allowed_users.iter().any(|u| u == &normalized) {
telegram.allowed_users.push(normalized);
config
.save()
.context("Failed to persist Telegram allowlist to config.toml")?;
}
Ok(())
}
async fn persist_allowed_identity(&self, identity: &str) -> anyhow::Result<()> {
let identity = identity.to_string();
tokio::task::spawn_blocking(move || Self::persist_allowed_identity_blocking(&identity))
.await
.map_err(|e| anyhow::anyhow!("Failed to join Telegram bind save task: {e}"))??;
Ok(())
}
fn add_allowed_identity_runtime(&self, identity: &str) {
let normalized = Self::normalize_identity(identity);
if normalized.is_empty() {
return;
}
if let Ok(mut users) = self.allowed_users.write() {
if !users.iter().any(|u| u == &normalized) {
users.push(normalized);
}
}
}
fn extract_bind_code(text: &str) -> Option<&str> {
let mut parts = text.split_whitespace();
let command = parts.next()?;
let base_command = command.split('@').next().unwrap_or(command);
if base_command != TELEGRAM_BIND_COMMAND {
return None;
}
parts.next().map(str::trim).filter(|code| !code.is_empty())
}
fn pairing_code_active(&self) -> bool {
self.pairing
.as_ref()
.and_then(PairingGuard::pairing_code)
.is_some()
}
fn api_url(&self, method: &str) -> String { fn api_url(&self, method: &str) -> String {
format!("https://api.telegram.org/bot{}/{method}", self.bot_token) format!("https://api.telegram.org/bot{}/{method}", self.bot_token)
} }
fn is_user_allowed(&self, username: &str) -> bool { fn is_user_allowed(&self, username: &str) -> bool {
self.allowed_users.iter().any(|u| u == "*" || u == username) let identity = Self::normalize_identity(username);
self.allowed_users
.read()
.map(|users| users.iter().any(|u| u == "*" || u == &identity))
.unwrap_or(false)
} }
fn is_any_user_allowed<'a, I>(&self, identities: I) -> bool fn is_any_user_allowed<'a, I>(&self, identities: I) -> bool
@ -209,6 +320,163 @@ impl TelegramChannel {
identities.into_iter().any(|id| self.is_user_allowed(id)) identities.into_iter().any(|id| self.is_user_allowed(id))
} }
async fn handle_unauthorized_message(&self, update: &serde_json::Value) {
let Some(message) = update.get("message") else {
return;
};
let Some(text) = message.get("text").and_then(serde_json::Value::as_str) else {
return;
};
let username_opt = message
.get("from")
.and_then(|from| from.get("username"))
.and_then(serde_json::Value::as_str);
let username = username_opt.unwrap_or("unknown");
let normalized_username = Self::normalize_identity(username);
let user_id = message
.get("from")
.and_then(|from| from.get("id"))
.and_then(serde_json::Value::as_i64);
let user_id_str = user_id.map(|id| id.to_string());
let normalized_user_id = user_id_str.as_deref().map(Self::normalize_identity);
let chat_id = message
.get("chat")
.and_then(|chat| chat.get("id"))
.and_then(serde_json::Value::as_i64)
.map(|id| id.to_string());
let Some(chat_id) = chat_id else {
tracing::warn!("Telegram: missing chat_id in message, skipping");
return;
};
let mut identities = vec![normalized_username.as_str()];
if let Some(ref id) = normalized_user_id {
identities.push(id.as_str());
}
if self.is_any_user_allowed(identities.iter().copied()) {
return;
}
if let Some(code) = Self::extract_bind_code(text) {
if let Some(pairing) = self.pairing.as_ref() {
match pairing.try_pair(code) {
Ok(Some(_token)) => {
let bind_identity = normalized_user_id.clone().or_else(|| {
if normalized_username.is_empty() || normalized_username == "unknown" {
None
} else {
Some(normalized_username.clone())
}
});
if let Some(identity) = bind_identity {
self.add_allowed_identity_runtime(&identity);
match self.persist_allowed_identity(&identity).await {
Ok(()) => {
let _ = self
.send(
"✅ Telegram account bound successfully. You can talk to ZeroClaw now.",
&chat_id,
)
.await;
tracing::info!(
"Telegram: paired and allowlisted identity={identity}"
);
}
Err(e) => {
tracing::error!(
"Telegram: failed to persist allowlist after bind: {e}"
);
let _ = self
.send(
"⚠️ Bound for this runtime, but failed to persist config. Access may be lost after restart; check config file permissions.",
&chat_id,
)
.await;
}
}
} else {
let _ = self
.send(
"❌ Could not identify your Telegram account. Ensure your account has a username or stable user ID, then retry.",
&chat_id,
)
.await;
}
}
Ok(None) => {
let _ = self
.send(
"❌ Invalid binding code. Ask operator for the latest code and retry.",
&chat_id,
)
.await;
}
Err(lockout_secs) => {
let _ = self
.send(
&format!("⏳ Too many invalid attempts. Retry in {lockout_secs}s."),
&chat_id,
)
.await;
}
}
} else {
let _ = self
.send(
" Telegram pairing is not active. Ask operator to update allowlist in config.toml.",
&chat_id,
)
.await;
}
return;
}
tracing::warn!(
"Telegram: ignoring message from unauthorized user: username={username}, user_id={}. \
Allowlist Telegram username (without '@') or numeric user ID.",
user_id_str.as_deref().unwrap_or("unknown")
);
let suggested_identity = normalized_user_id
.clone()
.or_else(|| {
if normalized_username.is_empty() || normalized_username == "unknown" {
None
} else {
Some(normalized_username.clone())
}
})
.unwrap_or_else(|| "YOUR_TELEGRAM_ID".to_string());
let _ = self
.send(
&format!(
"🔐 This bot requires operator approval.\n\n\
Copy this command to operator terminal:\n\
`zeroclaw channel bind-telegram {suggested_identity}`\n\n\
After operator runs it, send your message again."
),
&chat_id,
)
.await;
if self.pairing_code_active() {
let _ = self
.send(
" If operator provides a one-time pairing code, you can also run `/bind <code>`.",
&chat_id,
)
.await;
}
}
fn parse_update_message(&self, update: &serde_json::Value) -> Option<ChannelMessage> { fn parse_update_message(&self, update: &serde_json::Value) -> Option<ChannelMessage> {
let message = update.get("message")?; let message = update.get("message")?;
@ -239,11 +507,6 @@ impl TelegramChannel {
} }
if !self.is_any_user_allowed(identities.iter().copied()) { if !self.is_any_user_allowed(identities.iter().copied()) {
tracing::warn!(
"Telegram: ignoring message from unauthorized user: username={username}, user_id={}. \
Allowlist Telegram @username or numeric user ID, then run `zeroclaw onboard --channels-only`.",
user_id.as_deref().unwrap_or("unknown")
);
return None; return None;
} }
@ -849,9 +1112,9 @@ impl Channel for TelegramChannel {
} }
let Some(msg) = self.parse_update_message(update) else { let Some(msg) = self.parse_update_message(update) else {
self.handle_unauthorized_message(update).await;
continue; continue;
}; };
// Send "typing" indicator immediately when we receive a message // Send "typing" indicator immediately when we receive a message
let typing_body = serde_json::json!({ let typing_body = serde_json::json!({
"chat_id": &msg.reply_target, "chat_id": &msg.reply_target,
@ -926,6 +1189,12 @@ mod tests {
assert!(!ch.is_user_allowed("eve")); assert!(!ch.is_user_allowed("eve"));
} }
#[test]
fn telegram_user_allowed_with_at_prefix_in_config() {
let ch = TelegramChannel::new("t".into(), vec!["@alice".into()]);
assert!(ch.is_user_allowed("alice"));
}
#[test] #[test]
fn telegram_user_denied_empty() { fn telegram_user_denied_empty() {
let ch = TelegramChannel::new("t".into(), vec![]); let ch = TelegramChannel::new("t".into(), vec![]);
@ -974,6 +1243,40 @@ mod tests {
assert!(!ch.is_any_user_allowed(["unknown", "123456789"])); assert!(!ch.is_any_user_allowed(["unknown", "123456789"]));
} }
#[test]
fn telegram_pairing_enabled_with_empty_allowlist() {
let ch = TelegramChannel::new("t".into(), vec![]);
assert!(ch.pairing_code_active());
}
#[test]
fn telegram_pairing_disabled_with_nonempty_allowlist() {
let ch = TelegramChannel::new("t".into(), vec!["alice".into()]);
assert!(!ch.pairing_code_active());
}
#[test]
fn telegram_extract_bind_code_plain_command() {
assert_eq!(
TelegramChannel::extract_bind_code("/bind 123456"),
Some("123456")
);
}
#[test]
fn telegram_extract_bind_code_supports_bot_mention() {
assert_eq!(
TelegramChannel::extract_bind_code("/bind@zeroclaw_bot 654321"),
Some("654321")
);
}
#[test]
fn telegram_extract_bind_code_rejects_invalid_forms() {
assert_eq!(TelegramChannel::extract_bind_code("/bind"), None);
assert_eq!(TelegramChannel::extract_bind_code("/start"), None);
}
#[test] #[test]
fn parse_attachment_markers_extracts_multiple_types() { fn parse_attachment_markers_extracts_multiple_types() {
let message = "Here are files [IMAGE:/tmp/a.png] and [DOCUMENT:https://example.com/a.pdf]"; let message = "Here are files [IMAGE:/tmp/a.png] and [DOCUMENT:https://example.com/a.pdf]";