From ae37e59423f0673947215004c1cab0cce31047cc Mon Sep 17 00:00:00 2001 From: Chummy Date: Tue, 17 Feb 2026 21:07:23 +0800 Subject: [PATCH] fix(channels): resolve telegram reply target and media delivery (#525) Co-authored-by: Will Sarg <12886992+willsarg@users.noreply.github.com> --- README.md | 15 + src/channels/cli.rs | 5 +- src/channels/dingtalk.rs | 2 +- src/channels/discord.rs | 14 +- src/channels/email_channel.rs | 4 +- src/channels/imessage.rs | 2 +- src/channels/irc.rs | 4 +- src/channels/lark.rs | 2 +- src/channels/matrix.rs | 2 +- src/channels/mod.rs | 42 ++- src/channels/slack.rs | 2 +- src/channels/telegram.rs | 616 +++++++++++++++++++++++++++------- src/channels/traits.rs | 9 +- src/channels/whatsapp.rs | 4 +- src/gateway/mod.rs | 2 +- 15 files changed, 561 insertions(+), 164 deletions(-) diff --git a/README.md b/README.md index a242116..96b5305 100644 --- a/README.md +++ b/README.md @@ -291,6 +291,21 @@ rerun channel setup only: zeroclaw onboard --channels-only ``` +### Telegram media replies + +Telegram routing now replies to the source **chat ID** from incoming updates (instead of usernames), +which avoids `Bad Request: chat not found` failures. + +For non-text replies, ZeroClaw can send Telegram attachments when the assistant includes markers: + +- `[IMAGE:<path-or-url>]` +- `[DOCUMENT:<path-or-url>]` +- `[VIDEO:<path-or-url>]` +- `[AUDIO:<path-or-url>]` +- `[VOICE:<path-or-url>]` + +Paths can be local files (for example `/tmp/screenshot.png`) or HTTPS URLs. 
+ ### WhatsApp Business Cloud API Setup WhatsApp uses Meta's Cloud API with webhooks (push-based, not polling): diff --git a/src/channels/cli.rs b/src/channels/cli.rs index 8e070dd..6a61b2c 100644 --- a/src/channels/cli.rs +++ b/src/channels/cli.rs @@ -91,13 +91,14 @@ mod tests { let msg = ChannelMessage { id: "test-id".into(), sender: "user".into(), - reply_to: "user".into(), + reply_target: "user".into(), content: "hello".into(), channel: "cli".into(), timestamp: 1_234_567_890, }; assert_eq!(msg.id, "test-id"); assert_eq!(msg.sender, "user"); + assert_eq!(msg.reply_target, "user"); assert_eq!(msg.content, "hello"); assert_eq!(msg.channel, "cli"); assert_eq!(msg.timestamp, 1_234_567_890); @@ -108,7 +109,7 @@ mod tests { let msg = ChannelMessage { id: "id".into(), sender: "s".into(), - reply_to: "s".into(), + reply_target: "s".into(), content: "c".into(), channel: "ch".into(), timestamp: 0, diff --git a/src/channels/dingtalk.rs b/src/channels/dingtalk.rs index 4b60b55..ca5bb95 100644 --- a/src/channels/dingtalk.rs +++ b/src/channels/dingtalk.rs @@ -7,7 +7,7 @@ use tokio::sync::RwLock; use tokio_tungstenite::tungstenite::Message; use uuid::Uuid; -/// DingTalk (钉钉) channel — connects via Stream Mode WebSocket for real-time messages. +/// DingTalk channel — connects via Stream Mode WebSocket for real-time messages. /// Replies are sent through per-message session webhook URLs. 
pub struct DingTalkChannel { client_id: String, diff --git a/src/channels/discord.rs b/src/channels/discord.rs index 9cbd149..10578d2 100644 --- a/src/channels/discord.rs +++ b/src/channels/discord.rs @@ -363,7 +363,11 @@ impl Channel for DiscordChannel { }; let message_id = d.get("id").and_then(|i| i.as_str()).unwrap_or(""); - let channel_id = d.get("channel_id").and_then(|c| c.as_str()).unwrap_or("").to_string(); + let channel_id = d + .get("channel_id") + .and_then(|c| c.as_str()) + .unwrap_or("") + .to_string(); let channel_msg = ChannelMessage { id: if message_id.is_empty() { @@ -372,8 +376,12 @@ impl Channel for DiscordChannel { format!("discord_{message_id}") }, sender: author_id.to_string(), - reply_to: channel_id.clone(), - content: clean_content, + reply_target: if channel_id.is_empty() { + author_id.to_string() + } else { + channel_id + }, + content: content.to_string(), channel: "discord".to_string(), timestamp: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) diff --git a/src/channels/email_channel.rs b/src/channels/email_channel.rs index 5a9ef64..709ba18 100644 --- a/src/channels/email_channel.rs +++ b/src/channels/email_channel.rs @@ -428,8 +428,8 @@ impl Channel for EmailChannel { } // MutexGuard dropped before await let msg = ChannelMessage { id, - sender: sender.clone(), - reply_to: sender, + reply_target: sender.clone(), + sender, content, channel: "email".to_string(), timestamp: ts, diff --git a/src/channels/imessage.rs b/src/channels/imessage.rs index f4fcd62..36bf72f 100644 --- a/src/channels/imessage.rs +++ b/src/channels/imessage.rs @@ -172,7 +172,7 @@ end tell"# let msg = ChannelMessage { id: rowid.to_string(), sender: sender.clone(), - reply_to: sender.clone(), + reply_target: sender.clone(), content: text, channel: "imessage".to_string(), timestamp: std::time::SystemTime::now() diff --git a/src/channels/irc.rs b/src/channels/irc.rs index 1221234..61a48cc 100644 --- a/src/channels/irc.rs +++ b/src/channels/irc.rs @@ 
-565,8 +565,8 @@ impl Channel for IrcChannel { let seq = MSG_SEQ.fetch_add(1, Ordering::Relaxed); let channel_msg = ChannelMessage { id: format!("irc_{}_{seq}", chrono::Utc::now().timestamp_millis()), - sender: reply_to.clone(), - reply_to, + sender: sender_nick.to_string(), + reply_target: reply_to, content, channel: "irc".to_string(), timestamp: std::time::SystemTime::now() diff --git a/src/channels/lark.rs b/src/channels/lark.rs index 6e011e7..896defc 100644 --- a/src/channels/lark.rs +++ b/src/channels/lark.rs @@ -614,7 +614,7 @@ impl LarkChannel { messages.push(ChannelMessage { id: Uuid::new_v4().to_string(), sender: chat_id.to_string(), - reply_to: chat_id.to_string(), + reply_target: chat_id.to_string(), content: text, channel: "lark".to_string(), timestamp, diff --git a/src/channels/matrix.rs b/src/channels/matrix.rs index 0462bbe..4f34bcf 100644 --- a/src/channels/matrix.rs +++ b/src/channels/matrix.rs @@ -230,7 +230,7 @@ impl Channel for MatrixChannel { let msg = ChannelMessage { id: format!("mx_{}", chrono::Utc::now().timestamp_millis()), sender: event.sender.clone(), - reply_to: self.room_id.clone(), + reply_target: event.sender.clone(), content: body.clone(), channel: "matrix".to_string(), timestamp: std::time::SystemTime::now() diff --git a/src/channels/mod.rs b/src/channels/mod.rs index f8cfe17..d63f63d 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -69,6 +69,15 @@ fn conversation_memory_key(msg: &traits::ChannelMessage) -> String { format!("{}_{}_{}", msg.channel, msg.sender, msg.id) } +fn channel_delivery_instructions(channel_name: &str) -> Option<&'static str> { + match channel_name { + "telegram" => Some( + "When responding on Telegram, include media markers for files or URLs that should be sent as attachments. Use one marker per attachment with this exact syntax: [IMAGE:], [DOCUMENT:], [VIDEO:], [AUDIO:], or [VOICE:]. 
Keep normal user-facing text outside markers and never wrap markers in code fences.", + ), + _ => None, + } +} + async fn build_memory_context(mem: &dyn Memory, user_msg: &str) -> String { let mut context = String::new(); @@ -172,7 +181,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C let target_channel = ctx.channels_by_name.get(&msg.channel).cloned(); if let Some(channel) = target_channel.as_ref() { - if let Err(e) = channel.start_typing(&msg.reply_to).await { + if let Err(e) = channel.start_typing(&msg.reply_target).await { tracing::debug!("Failed to start typing on {}: {e}", channel.name()); } } @@ -185,6 +194,10 @@ async fn process_channel_message(ctx: Arc, msg: traits::C ChatMessage::user(&enriched_message), ]; + if let Some(instructions) = channel_delivery_instructions(&msg.channel) { + history.push(ChatMessage::system(instructions)); + } + let llm_result = tokio::time::timeout( Duration::from_secs(CHANNEL_MESSAGE_TIMEOUT_SECS), run_tool_call_loop( @@ -201,7 +214,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C .await; if let Some(channel) = target_channel.as_ref() { - if let Err(e) = channel.stop_typing(&msg.reply_to).await { + if let Err(e) = channel.stop_typing(&msg.reply_target).await { tracing::debug!("Failed to stop typing on {}: {e}", channel.name()); } } @@ -214,7 +227,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C truncate_with_ellipsis(&response, 80) ); if let Some(channel) = target_channel.as_ref() { - if let Err(e) = channel.send(&response, &msg.reply_to).await { + if let Err(e) = channel.send(&response, &msg.reply_target).await { eprintln!(" ❌ Failed to reply on {}: {e}", channel.name()); } } @@ -225,7 +238,9 @@ async fn process_channel_message(ctx: Arc, msg: traits::C started_at.elapsed().as_millis() ); if let Some(channel) = target_channel.as_ref() { - let _ = channel.send(&format!("⚠️ Error: {e}"), &msg.reply_to).await; + let _ = channel + .send(&format!("⚠️ Error: {e}"), &msg.reply_target) + 
.await; } } Err(_) => { @@ -242,7 +257,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C let _ = channel .send( "⚠️ Request timed out while waiting for the model. Please try again.", - &msg.reply_to, + &msg.reply_target, ) .await; } @@ -1245,7 +1260,7 @@ mod tests { traits::ChannelMessage { id: "msg-1".to_string(), sender: "alice".to_string(), - reply_to: "alice".to_string(), + reply_target: "chat-42".to_string(), content: "What is the BTC price now?".to_string(), channel: "test-channel".to_string(), timestamp: 1, @@ -1255,6 +1270,7 @@ mod tests { let sent_messages = channel_impl.sent_messages.lock().await; assert_eq!(sent_messages.len(), 1); + assert!(sent_messages[0].starts_with("chat-42:")); assert!(sent_messages[0].contains("BTC is currently around")); assert!(!sent_messages[0].contains("\"tool_calls\"")); assert!(!sent_messages[0].contains("mock_price")); @@ -1338,7 +1354,7 @@ mod tests { tx.send(traits::ChannelMessage { id: "1".to_string(), sender: "alice".to_string(), - reply_to: "alice".to_string(), + reply_target: "alice".to_string(), content: "hello".to_string(), channel: "test-channel".to_string(), timestamp: 1, @@ -1348,7 +1364,7 @@ mod tests { tx.send(traits::ChannelMessage { id: "2".to_string(), sender: "bob".to_string(), - reply_to: "bob".to_string(), + reply_target: "bob".to_string(), content: "world".to_string(), channel: "test-channel".to_string(), timestamp: 2, @@ -1611,7 +1627,7 @@ mod tests { let msg = traits::ChannelMessage { id: "msg_abc123".into(), sender: "U123".into(), - reply_to: "U123".into(), + reply_target: "C456".into(), content: "hello".into(), channel: "slack".into(), timestamp: 1, @@ -1625,7 +1641,7 @@ mod tests { let msg1 = traits::ChannelMessage { id: "msg_1".into(), sender: "U123".into(), - reply_to: "U123".into(), + reply_target: "C456".into(), content: "first".into(), channel: "slack".into(), timestamp: 1, @@ -1633,7 +1649,7 @@ mod tests { let msg2 = traits::ChannelMessage { id: "msg_2".into(), sender: 
"U123".into(), - reply_to: "U123".into(), + reply_target: "C456".into(), content: "second".into(), channel: "slack".into(), timestamp: 2, @@ -1653,7 +1669,7 @@ mod tests { let msg1 = traits::ChannelMessage { id: "msg_1".into(), sender: "U123".into(), - reply_to: "U123".into(), + reply_target: "C456".into(), content: "I'm Paul".into(), channel: "slack".into(), timestamp: 1, @@ -1661,7 +1677,7 @@ mod tests { let msg2 = traits::ChannelMessage { id: "msg_2".into(), sender: "U123".into(), - reply_to: "U123".into(), + reply_target: "C456".into(), content: "I'm 45".into(), channel: "slack".into(), timestamp: 2, diff --git a/src/channels/slack.rs b/src/channels/slack.rs index 24632f3..7f8ee51 100644 --- a/src/channels/slack.rs +++ b/src/channels/slack.rs @@ -161,7 +161,7 @@ impl Channel for SlackChannel { let channel_msg = ChannelMessage { id: format!("slack_{channel_id}_{ts}"), sender: user.to_string(), - reply_to: channel_id.to_string(), + reply_target: channel_id.clone(), content: text.to_string(), channel: "slack".to_string(), timestamp: std::time::SystemTime::now() diff --git a/src/channels/telegram.rs b/src/channels/telegram.rs index 01f0b98..5d25de1 100644 --- a/src/channels/telegram.rs +++ b/src/channels/telegram.rs @@ -51,6 +51,133 @@ fn split_message_for_telegram(message: &str) -> Vec { chunks } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum TelegramAttachmentKind { + Image, + Document, + Video, + Audio, + Voice, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct TelegramAttachment { + kind: TelegramAttachmentKind, + target: String, +} + +impl TelegramAttachmentKind { + fn from_marker(marker: &str) -> Option { + match marker.trim().to_ascii_uppercase().as_str() { + "IMAGE" | "PHOTO" => Some(Self::Image), + "DOCUMENT" | "FILE" => Some(Self::Document), + "VIDEO" => Some(Self::Video), + "AUDIO" => Some(Self::Audio), + "VOICE" => Some(Self::Voice), + _ => None, + } + } +} + +fn is_http_url(target: &str) -> bool { + target.starts_with("http://") || 
target.starts_with("https://") +} + +fn infer_attachment_kind_from_target(target: &str) -> Option { + let normalized = target + .split('?') + .next() + .unwrap_or(target) + .split('#') + .next() + .unwrap_or(target); + + let extension = Path::new(normalized) + .extension() + .and_then(|ext| ext.to_str())? + .to_ascii_lowercase(); + + match extension.as_str() { + "png" | "jpg" | "jpeg" | "gif" | "webp" | "bmp" => Some(TelegramAttachmentKind::Image), + "mp4" | "mov" | "mkv" | "avi" | "webm" => Some(TelegramAttachmentKind::Video), + "mp3" | "m4a" | "wav" | "flac" => Some(TelegramAttachmentKind::Audio), + "ogg" | "oga" | "opus" => Some(TelegramAttachmentKind::Voice), + "pdf" | "txt" | "md" | "csv" | "json" | "zip" | "tar" | "gz" | "doc" | "docx" | "xls" + | "xlsx" | "ppt" | "pptx" => Some(TelegramAttachmentKind::Document), + _ => None, + } +} + +fn parse_path_only_attachment(message: &str) -> Option { + let trimmed = message.trim(); + if trimmed.is_empty() || trimmed.contains('\n') { + return None; + } + + let candidate = trimmed.trim_matches(|c| matches!(c, '`' | '"' | '\'')); + if candidate.chars().any(char::is_whitespace) { + return None; + } + + let candidate = candidate.strip_prefix("file://").unwrap_or(candidate); + let kind = infer_attachment_kind_from_target(candidate)?; + + if !is_http_url(candidate) && !Path::new(candidate).exists() { + return None; + } + + Some(TelegramAttachment { + kind, + target: candidate.to_string(), + }) +} + +fn parse_attachment_markers(message: &str) -> (String, Vec) { + let mut cleaned = String::with_capacity(message.len()); + let mut attachments = Vec::new(); + let mut cursor = 0; + + while cursor < message.len() { + let Some(open_rel) = message[cursor..].find('[') else { + cleaned.push_str(&message[cursor..]); + break; + }; + + let open = cursor + open_rel; + cleaned.push_str(&message[cursor..open]); + + let Some(close_rel) = message[open..].find(']') else { + cleaned.push_str(&message[open..]); + break; + }; + + let close = open 
+ close_rel; + let marker = &message[open + 1..close]; + + let parsed = marker.split_once(':').and_then(|(kind, target)| { + let kind = TelegramAttachmentKind::from_marker(kind)?; + let target = target.trim(); + if target.is_empty() { + return None; + } + Some(TelegramAttachment { + kind, + target: target.to_string(), + }) + }); + + if let Some(attachment) = parsed { + attachments.push(attachment); + } else { + cleaned.push_str(&message[open..=close]); + } + + cursor = close + 1; + } + + (cleaned.trim().to_string(), attachments) +} + /// Telegram channel — long-polls the Bot API for updates pub struct TelegramChannel { bot_token: String, @@ -82,6 +209,216 @@ impl TelegramChannel { identities.into_iter().any(|id| self.is_user_allowed(id)) } + fn parse_update_message(&self, update: &serde_json::Value) -> Option { + let message = update.get("message")?; + + let text = message.get("text").and_then(serde_json::Value::as_str)?; + + let username = message + .get("from") + .and_then(|from| from.get("username")) + .and_then(serde_json::Value::as_str) + .unwrap_or("unknown") + .to_string(); + + let user_id = message + .get("from") + .and_then(|from| from.get("id")) + .and_then(serde_json::Value::as_i64) + .map(|id| id.to_string()); + + let sender_identity = if username == "unknown" { + user_id.clone().unwrap_or_else(|| "unknown".to_string()) + } else { + username.clone() + }; + + let mut identities = vec![username.as_str()]; + if let Some(id) = user_id.as_deref() { + identities.push(id); + } + + if !self.is_any_user_allowed(identities.iter().copied()) { + tracing::warn!( + "Telegram: ignoring message from unauthorized user: username={username}, user_id={}. 
\ +Allowlist Telegram @username or numeric user ID, then run `zeroclaw onboard --channels-only`.", + user_id.as_deref().unwrap_or("unknown") + ); + return None; + } + + let chat_id = message + .get("chat") + .and_then(|chat| chat.get("id")) + .and_then(serde_json::Value::as_i64) + .map(|id| id.to_string())?; + + let message_id = message + .get("message_id") + .and_then(serde_json::Value::as_i64) + .unwrap_or(0); + + Some(ChannelMessage { + id: format!("telegram_{chat_id}_{message_id}"), + sender: sender_identity, + reply_target: chat_id, + content: text.to_string(), + channel: "telegram".to_string(), + timestamp: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(), + }) + } + + async fn send_text_chunks(&self, message: &str, chat_id: &str) -> anyhow::Result<()> { + let chunks = split_message_for_telegram(message); + + for (index, chunk) in chunks.iter().enumerate() { + let text = if chunks.len() > 1 { + if index == 0 { + format!("{chunk}\n\n(continues...)") + } else if index == chunks.len() - 1 { + format!("(continued)\n\n{chunk}") + } else { + format!("(continued)\n\n{chunk}\n\n(continues...)") + } + } else { + chunk.to_string() + }; + + let markdown_body = serde_json::json!({ + "chat_id": chat_id, + "text": text, + "parse_mode": "Markdown" + }); + + let markdown_resp = self + .client + .post(self.api_url("sendMessage")) + .json(&markdown_body) + .send() + .await?; + + if markdown_resp.status().is_success() { + if index < chunks.len() - 1 { + tokio::time::sleep(Duration::from_millis(100)).await; + } + continue; + } + + let markdown_status = markdown_resp.status(); + let markdown_err = markdown_resp.text().await.unwrap_or_default(); + tracing::warn!( + status = ?markdown_status, + "Telegram sendMessage with Markdown failed; retrying without parse_mode" + ); + + let plain_body = serde_json::json!({ + "chat_id": chat_id, + "text": text, + }); + let plain_resp = self + .client + .post(self.api_url("sendMessage")) + 
.json(&plain_body) + .send() + .await?; + + if !plain_resp.status().is_success() { + let plain_status = plain_resp.status(); + let plain_err = plain_resp.text().await.unwrap_or_default(); + anyhow::bail!( + "Telegram sendMessage failed (markdown {}: {}; plain {}: {})", + markdown_status, + markdown_err, + plain_status, + plain_err + ); + } + + if index < chunks.len() - 1 { + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + + Ok(()) + } + + async fn send_media_by_url( + &self, + method: &str, + media_field: &str, + chat_id: &str, + url: &str, + caption: Option<&str>, + ) -> anyhow::Result<()> { + let mut body = serde_json::json!({ + "chat_id": chat_id, + }); + body[media_field] = serde_json::Value::String(url.to_string()); + + if let Some(cap) = caption { + body["caption"] = serde_json::Value::String(cap.to_string()); + } + + let resp = self + .client + .post(self.api_url(method)) + .json(&body) + .send() + .await?; + + if !resp.status().is_success() { + let err = resp.text().await?; + anyhow::bail!("Telegram {method} by URL failed: {err}"); + } + + tracing::info!("Telegram {method} sent to {chat_id}: {url}"); + Ok(()) + } + + async fn send_attachment( + &self, + chat_id: &str, + attachment: &TelegramAttachment, + ) -> anyhow::Result<()> { + let target = attachment.target.trim(); + + if is_http_url(target) { + return match attachment.kind { + TelegramAttachmentKind::Image => { + self.send_photo_by_url(chat_id, target, None).await + } + TelegramAttachmentKind::Document => { + self.send_document_by_url(chat_id, target, None).await + } + TelegramAttachmentKind::Video => { + self.send_video_by_url(chat_id, target, None).await + } + TelegramAttachmentKind::Audio => { + self.send_audio_by_url(chat_id, target, None).await + } + TelegramAttachmentKind::Voice => { + self.send_voice_by_url(chat_id, target, None).await + } + }; + } + + let path = Path::new(target); + if !path.exists() { + anyhow::bail!("Telegram attachment path not found: {target}"); + } + + 
match attachment.kind { + TelegramAttachmentKind::Image => self.send_photo(chat_id, path, None).await, + TelegramAttachmentKind::Document => self.send_document(chat_id, path, None).await, + TelegramAttachmentKind::Video => self.send_video(chat_id, path, None).await, + TelegramAttachmentKind::Audio => self.send_audio(chat_id, path, None).await, + TelegramAttachmentKind::Voice => self.send_voice(chat_id, path, None).await, + } + } + /// Send a document/file to a Telegram chat pub async fn send_document( &self, @@ -408,6 +745,39 @@ impl TelegramChannel { tracing::info!("Telegram photo (URL) sent to {chat_id}: {url}"); Ok(()) } + + /// Send a video by URL (Telegram will download it) + pub async fn send_video_by_url( + &self, + chat_id: &str, + url: &str, + caption: Option<&str>, + ) -> anyhow::Result<()> { + self.send_media_by_url("sendVideo", "video", chat_id, url, caption) + .await + } + + /// Send an audio file by URL (Telegram will download it) + pub async fn send_audio_by_url( + &self, + chat_id: &str, + url: &str, + caption: Option<&str>, + ) -> anyhow::Result<()> { + self.send_media_by_url("sendAudio", "audio", chat_id, url, caption) + .await + } + + /// Send a voice message by URL (Telegram will download it) + pub async fn send_voice_by_url( + &self, + chat_id: &str, + url: &str, + caption: Option<&str>, + ) -> anyhow::Result<()> { + self.send_media_by_url("sendVoice", "voice", chat_id, url, caption) + .await + } } #[async_trait] @@ -417,82 +787,27 @@ impl Channel for TelegramChannel { } async fn send(&self, message: &str, chat_id: &str) -> anyhow::Result<()> { - // Split message if it exceeds Telegram's 4096 character limit - let chunks = split_message_for_telegram(message); + let (text_without_markers, attachments) = parse_attachment_markers(message); - for (i, chunk) in chunks.iter().enumerate() { - // Add continuation marker for multi-part messages - let text = if chunks.len() > 1 { - if i == 0 { - format!("{chunk}\n\n(continues...)") - } else if i == 
chunks.len() - 1 { - format!("(continued)\n\n{chunk}") - } else { - format!("(continued)\n\n{chunk}\n\n(continues...)") - } - } else { - chunk.to_string() - }; - - let markdown_body = serde_json::json!({ - "chat_id": chat_id, - "text": text, - "parse_mode": "Markdown" - }); - - let markdown_resp = self - .client - .post(self.api_url("sendMessage")) - .json(&markdown_body) - .send() - .await?; - - if markdown_resp.status().is_success() { - // Small delay between chunks to avoid rate limiting - if i < chunks.len() - 1 { - tokio::time::sleep(Duration::from_millis(100)).await; - } - continue; + if !attachments.is_empty() { + if !text_without_markers.is_empty() { + self.send_text_chunks(&text_without_markers, chat_id) + .await?; } - let markdown_status = markdown_resp.status(); - let markdown_err = markdown_resp.text().await.unwrap_or_default(); - tracing::warn!( - status = ?markdown_status, - "Telegram sendMessage with Markdown failed; retrying without parse_mode" - ); - - // Retry without parse_mode as a compatibility fallback. 
- let plain_body = serde_json::json!({ - "chat_id": chat_id, - "text": text, - }); - let plain_resp = self - .client - .post(self.api_url("sendMessage")) - .json(&plain_body) - .send() - .await?; - - if !plain_resp.status().is_success() { - let plain_status = plain_resp.status(); - let plain_err = plain_resp.text().await.unwrap_or_default(); - anyhow::bail!( - "Telegram sendMessage failed (markdown {}: {}; plain {}: {})", - markdown_status, - markdown_err, - plain_status, - plain_err - ); + for attachment in &attachments { + self.send_attachment(chat_id, attachment).await?; } - // Small delay between chunks to avoid rate limiting - if i < chunks.len() - 1 { - tokio::time::sleep(Duration::from_millis(100)).await; - } + return Ok(()); } - Ok(()) + if let Some(attachment) = parse_path_only_attachment(message) { + self.send_attachment(chat_id, &attachment).await?; + return Ok(()); + } + + self.send_text_chunks(message, chat_id).await } async fn listen(&self, tx: tokio::sync::mpsc::Sender) -> anyhow::Result<()> { @@ -533,59 +848,13 @@ impl Channel for TelegramChannel { offset = uid + 1; } - let Some(message) = update.get("message") else { + let Some(msg) = self.parse_update_message(update) else { continue; }; - let Some(text) = message.get("text").and_then(serde_json::Value::as_str) else { - continue; - }; - - let username_opt = message - .get("from") - .and_then(|f| f.get("username")) - .and_then(|u| u.as_str()); - let username = username_opt.unwrap_or("unknown"); - - let user_id = message - .get("from") - .and_then(|f| f.get("id")) - .and_then(serde_json::Value::as_i64); - let user_id_str = user_id.map(|id| id.to_string()); - - let mut identities = vec![username]; - if let Some(ref id) = user_id_str { - identities.push(id.as_str()); - } - - if !self.is_any_user_allowed(identities.iter().copied()) { - tracing::warn!( - "Telegram: ignoring message from unauthorized user: username={username}, user_id={}. 
\ -Allowlist Telegram @username or numeric user ID, then run `zeroclaw onboard --channels-only`.", - user_id_str.as_deref().unwrap_or("unknown") - ); - continue; - } - - let chat_id = message - .get("chat") - .and_then(|c| c.get("id")) - .and_then(serde_json::Value::as_i64) - .map(|id| id.to_string()); - - let Some(chat_id) = chat_id else { - tracing::warn!("Telegram: missing chat_id in message, skipping"); - continue; - }; - - let message_id = message - .get("message_id") - .and_then(|v| v.as_i64()) - .unwrap_or(0); - // Send "typing" indicator immediately when we receive a message let typing_body = serde_json::json!({ - "chat_id": &chat_id, + "chat_id": &msg.reply_target, "action": "typing" }); let _ = self @@ -595,18 +864,6 @@ Allowlist Telegram @username or numeric user ID, then run `zeroclaw onboard --ch .send() .await; // Ignore errors for typing indicator - let msg = ChannelMessage { - id: format!("telegram_{chat_id}_{message_id}"), - sender: username.to_string(), - reply_to: chat_id.clone(), - content: text.to_string(), - channel: "telegram".to_string(), - timestamp: std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs(), - }; - if tx.send(msg).await.is_err() { return Ok(()); } @@ -717,6 +974,107 @@ mod tests { assert!(!ch.is_any_user_allowed(["unknown", "123456789"])); } + #[test] + fn parse_attachment_markers_extracts_multiple_types() { + let message = "Here are files [IMAGE:/tmp/a.png] and [DOCUMENT:https://example.com/a.pdf]"; + let (cleaned, attachments) = parse_attachment_markers(message); + + assert_eq!(cleaned, "Here are files and"); + assert_eq!(attachments.len(), 2); + assert_eq!(attachments[0].kind, TelegramAttachmentKind::Image); + assert_eq!(attachments[0].target, "/tmp/a.png"); + assert_eq!(attachments[1].kind, TelegramAttachmentKind::Document); + assert_eq!(attachments[1].target, "https://example.com/a.pdf"); + } + + #[test] + fn parse_attachment_markers_keeps_invalid_markers_in_text() { + 
let message = "Report [UNKNOWN:/tmp/a.bin]"; + let (cleaned, attachments) = parse_attachment_markers(message); + + assert_eq!(cleaned, "Report [UNKNOWN:/tmp/a.bin]"); + assert!(attachments.is_empty()); + } + + #[test] + fn parse_path_only_attachment_detects_existing_file() { + let dir = tempfile::tempdir().unwrap(); + let image_path = dir.path().join("snap.png"); + std::fs::write(&image_path, b"fake-png").unwrap(); + + let parsed = parse_path_only_attachment(image_path.to_string_lossy().as_ref()) + .expect("expected attachment"); + + assert_eq!(parsed.kind, TelegramAttachmentKind::Image); + assert_eq!(parsed.target, image_path.to_string_lossy()); + } + + #[test] + fn parse_path_only_attachment_rejects_sentence_text() { + assert!(parse_path_only_attachment("Screenshot saved to /tmp/snap.png").is_none()); + } + + #[test] + fn infer_attachment_kind_from_target_detects_document_extension() { + assert_eq!( + infer_attachment_kind_from_target("https://example.com/files/specs.pdf?download=1"), + Some(TelegramAttachmentKind::Document) + ); + } + + #[test] + fn parse_update_message_uses_chat_id_as_reply_target() { + let ch = TelegramChannel::new("token".into(), vec!["*".into()]); + let update = serde_json::json!({ + "update_id": 1, + "message": { + "message_id": 33, + "text": "hello", + "from": { + "id": 555, + "username": "alice" + }, + "chat": { + "id": -100200300 + } + } + }); + + let msg = ch + .parse_update_message(&update) + .expect("message should parse"); + + assert_eq!(msg.sender, "alice"); + assert_eq!(msg.reply_target, "-100200300"); + assert_eq!(msg.content, "hello"); + assert_eq!(msg.id, "telegram_-100200300_33"); + } + + #[test] + fn parse_update_message_allows_numeric_id_without_username() { + let ch = TelegramChannel::new("token".into(), vec!["555".into()]); + let update = serde_json::json!({ + "update_id": 2, + "message": { + "message_id": 9, + "text": "ping", + "from": { + "id": 555 + }, + "chat": { + "id": 12345 + } + } + }); + + let msg = ch + 
.parse_update_message(&update) + .expect("numeric allowlist should pass"); + + assert_eq!(msg.sender, "555"); + assert_eq!(msg.reply_target, "12345"); + } + // ── File sending API URL tests ────────────────────────────────── #[test] diff --git a/src/channels/traits.rs b/src/channels/traits.rs index c41442e..1c44bf6 100644 --- a/src/channels/traits.rs +++ b/src/channels/traits.rs @@ -5,9 +5,7 @@ use async_trait::async_trait; pub struct ChannelMessage { pub id: String, pub sender: String, - /// Channel-specific reply address (e.g. Telegram chat_id, Discord channel_id, Slack channel). - /// Used by `Channel::send()` to route the reply to the correct destination. - pub reply_to: String, + pub reply_target: String, pub content: String, pub channel: String, pub timestamp: u64, @@ -65,7 +63,7 @@ mod tests { tx.send(ChannelMessage { id: "1".into(), sender: "tester".into(), - reply_to: "tester".into(), + reply_target: "tester".into(), content: "hello".into(), channel: "dummy".into(), timestamp: 123, @@ -80,7 +78,7 @@ mod tests { let message = ChannelMessage { id: "42".into(), sender: "alice".into(), - reply_to: "alice".into(), + reply_target: "alice".into(), content: "ping".into(), channel: "dummy".into(), timestamp: 999, @@ -89,6 +87,7 @@ mod tests { let cloned = message.clone(); assert_eq!(cloned.id, "42"); assert_eq!(cloned.sender, "alice"); + assert_eq!(cloned.reply_target, "alice"); assert_eq!(cloned.content, "ping"); assert_eq!(cloned.channel, "dummy"); assert_eq!(cloned.timestamp, 999); diff --git a/src/channels/whatsapp.rs b/src/channels/whatsapp.rs index de8230a..7825b96 100644 --- a/src/channels/whatsapp.rs +++ b/src/channels/whatsapp.rs @@ -119,8 +119,8 @@ impl WhatsAppChannel { messages.push(ChannelMessage { id: Uuid::new_v4().to_string(), - sender: normalized_from.clone(), - reply_to: normalized_from, + reply_target: normalized_from.clone(), + sender: normalized_from, content, channel: "whatsapp".to_string(), timestamp, diff --git a/src/gateway/mod.rs 
b/src/gateway/mod.rs index 86111da..264a16e 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -862,7 +862,7 @@ mod tests { let msg = ChannelMessage { id: "wamid-123".into(), sender: "+1234567890".into(), - reply_to: "+1234567890".into(), + reply_target: "+1234567890".into(), content: "hello".into(), channel: "whatsapp".into(), timestamp: 1,