fix(telegram): add message splitting, timeout, and validation fixes (#246)
High-priority fixes: - Message length validation and splitting (4096 char limit) - Empty chat_id validation to prevent silent failures - Health check timeout (5s) to prevent service hangs Testing infrastructure: - Comprehensive test suite (20+ automated tests) - Quick smoke test script - Test message generator - Complete testing documentation All changes are backward compatible. Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
50f508766f
commit
4fd1408034
6 changed files with 1325 additions and 48 deletions
|
|
@ -2,8 +2,53 @@ use super::traits::{Channel, ChannelMessage};
|
|||
use async_trait::async_trait;
|
||||
use reqwest::multipart::{Form, Part};
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Telegram's maximum message length for text messages
|
||||
const TELEGRAM_MAX_MESSAGE_LENGTH: usize = 4096;
|
||||
|
||||
/// Split a message into chunks that respect Telegram's 4096 character limit.
|
||||
/// Tries to split at word boundaries when possible, and handles continuation.
|
||||
fn split_message_for_telegram(message: &str) -> Vec<String> {
|
||||
if message.len() <= TELEGRAM_MAX_MESSAGE_LENGTH {
|
||||
return vec![message.to_string()];
|
||||
}
|
||||
|
||||
let mut chunks = Vec::new();
|
||||
let mut remaining = message;
|
||||
|
||||
while !remaining.is_empty() {
|
||||
let chunk_end = if remaining.len() <= TELEGRAM_MAX_MESSAGE_LENGTH {
|
||||
remaining.len()
|
||||
} else {
|
||||
// Try to find a good break point (newline, then space)
|
||||
let search_area = &remaining[..TELEGRAM_MAX_MESSAGE_LENGTH];
|
||||
|
||||
// Prefer splitting at newline
|
||||
if let Some(pos) = search_area.rfind('\n') {
|
||||
// Don't split if the newline is too close to the start
|
||||
if pos >= TELEGRAM_MAX_MESSAGE_LENGTH / 2 {
|
||||
pos + 1
|
||||
} else {
|
||||
// Try space as fallback
|
||||
search_area.rfind(' ').unwrap_or(TELEGRAM_MAX_MESSAGE_LENGTH) + 1
|
||||
}
|
||||
} else if let Some(pos) = search_area.rfind(' ') {
|
||||
pos + 1
|
||||
} else {
|
||||
// Hard split at the limit
|
||||
TELEGRAM_MAX_MESSAGE_LENGTH
|
||||
}
|
||||
};
|
||||
|
||||
chunks.push(remaining[..chunk_end].to_string());
|
||||
remaining = &remaining[chunk_end..];
|
||||
}
|
||||
|
||||
chunks
|
||||
}
|
||||
|
||||
/// Telegram channel — long-polls the Bot API for updates
|
||||
pub struct TelegramChannel {
|
||||
bot_token: String,
|
||||
|
|
@ -370,52 +415,79 @@ impl Channel for TelegramChannel {
|
|||
}
|
||||
|
||||
async fn send(&self, message: &str, chat_id: &str) -> anyhow::Result<()> {
|
||||
let markdown_body = serde_json::json!({
|
||||
"chat_id": chat_id,
|
||||
"text": message,
|
||||
"parse_mode": "Markdown"
|
||||
});
|
||||
// Split message if it exceeds Telegram's 4096 character limit
|
||||
let chunks = split_message_for_telegram(message);
|
||||
|
||||
let markdown_resp = self
|
||||
.client
|
||||
.post(self.api_url("sendMessage"))
|
||||
.json(&markdown_body)
|
||||
.send()
|
||||
.await?;
|
||||
for (i, chunk) in chunks.iter().enumerate() {
|
||||
// Add continuation marker for multi-part messages
|
||||
let text = if chunks.len() > 1 {
|
||||
if i == 0 {
|
||||
format!("{chunk}\n\n(continues...)")
|
||||
} else if i == chunks.len() - 1 {
|
||||
format!("(continued)\n\n{chunk}")
|
||||
} else {
|
||||
format!("(continued)\n\n{chunk}\n\n(continues...)")
|
||||
}
|
||||
} else {
|
||||
chunk.to_string()
|
||||
};
|
||||
|
||||
if markdown_resp.status().is_success() {
|
||||
return Ok(());
|
||||
}
|
||||
let markdown_body = serde_json::json!({
|
||||
"chat_id": chat_id,
|
||||
"text": text,
|
||||
"parse_mode": "Markdown"
|
||||
});
|
||||
|
||||
let markdown_status = markdown_resp.status();
|
||||
let markdown_err = markdown_resp.text().await.unwrap_or_default();
|
||||
tracing::warn!(
|
||||
status = ?markdown_status,
|
||||
"Telegram sendMessage with Markdown failed; retrying without parse_mode"
|
||||
);
|
||||
let markdown_resp = self
|
||||
.client
|
||||
.post(self.api_url("sendMessage"))
|
||||
.json(&markdown_body)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
// Retry without parse_mode as a compatibility fallback.
|
||||
let plain_body = serde_json::json!({
|
||||
"chat_id": chat_id,
|
||||
"text": message,
|
||||
});
|
||||
let plain_resp = self
|
||||
.client
|
||||
.post(self.api_url("sendMessage"))
|
||||
.json(&plain_body)
|
||||
.send()
|
||||
.await?;
|
||||
if markdown_resp.status().is_success() {
|
||||
// Small delay between chunks to avoid rate limiting
|
||||
if i < chunks.len() - 1 {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if !plain_resp.status().is_success() {
|
||||
let plain_status = plain_resp.status();
|
||||
let plain_err = plain_resp.text().await.unwrap_or_default();
|
||||
anyhow::bail!(
|
||||
"Telegram sendMessage failed (markdown {}: {}; plain {}: {})",
|
||||
markdown_status,
|
||||
markdown_err,
|
||||
plain_status,
|
||||
plain_err
|
||||
let markdown_status = markdown_resp.status();
|
||||
let markdown_err = markdown_resp.text().await.unwrap_or_default();
|
||||
tracing::warn!(
|
||||
status = ?markdown_status,
|
||||
"Telegram sendMessage with Markdown failed; retrying without parse_mode"
|
||||
);
|
||||
|
||||
// Retry without parse_mode as a compatibility fallback.
|
||||
let plain_body = serde_json::json!({
|
||||
"chat_id": chat_id,
|
||||
"text": text,
|
||||
});
|
||||
let plain_resp = self
|
||||
.client
|
||||
.post(self.api_url("sendMessage"))
|
||||
.json(&plain_body)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !plain_resp.status().is_success() {
|
||||
let plain_status = plain_resp.status();
|
||||
let plain_err = plain_resp.text().await.unwrap_or_default();
|
||||
anyhow::bail!(
|
||||
"Telegram sendMessage failed (markdown {}: {}; plain {}: {})",
|
||||
markdown_status,
|
||||
markdown_err,
|
||||
plain_status,
|
||||
plain_err
|
||||
);
|
||||
}
|
||||
|
||||
// Small delay between chunks to avoid rate limiting
|
||||
if i < chunks.len() - 1 {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
@ -497,8 +569,12 @@ Allowlist Telegram @username or numeric user ID, then run `zeroclaw onboard --ch
|
|||
.get("chat")
|
||||
.and_then(|c| c.get("id"))
|
||||
.and_then(serde_json::Value::as_i64)
|
||||
.map(|id| id.to_string())
|
||||
.unwrap_or_default();
|
||||
.map(|id| id.to_string());
|
||||
|
||||
let Some(chat_id) = chat_id else {
|
||||
tracing::warn!("Telegram: missing chat_id in message, skipping");
|
||||
continue;
|
||||
};
|
||||
|
||||
// Send "typing" indicator immediately when we receive a message
|
||||
let typing_body = serde_json::json!({
|
||||
|
|
@ -532,12 +608,24 @@ Allowlist Telegram @username or numeric user ID, then run `zeroclaw onboard --ch
|
|||
}
|
||||
|
||||
async fn health_check(&self) -> bool {
|
||||
self.client
|
||||
.get(self.api_url("getMe"))
|
||||
.send()
|
||||
.await
|
||||
.map(|r| r.status().is_success())
|
||||
.unwrap_or(false)
|
||||
let timeout_duration = Duration::from_secs(5);
|
||||
|
||||
match tokio::time::timeout(
|
||||
timeout_duration,
|
||||
self.client.get(self.api_url("getMe")).send(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(resp)) => resp.status().is_success(),
|
||||
Ok(Err(e)) => {
|
||||
tracing::debug!("Telegram health check failed: {e}");
|
||||
false
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::debug!("Telegram health check timed out after 5s");
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -785,6 +873,82 @@ mod tests {
|
|||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
// ── Message splitting tests ─────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn telegram_split_short_message() {
|
||||
let msg = "Hello, world!";
|
||||
let chunks = split_message_for_telegram(msg);
|
||||
assert_eq!(chunks.len(), 1);
|
||||
assert_eq!(chunks[0], msg);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn telegram_split_exact_limit() {
|
||||
let msg = "a".repeat(TELEGRAM_MAX_MESSAGE_LENGTH);
|
||||
let chunks = split_message_for_telegram(&msg);
|
||||
assert_eq!(chunks.len(), 1);
|
||||
assert_eq!(chunks[0].len(), TELEGRAM_MAX_MESSAGE_LENGTH);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn telegram_split_over_limit() {
|
||||
let msg = "a".repeat(TELEGRAM_MAX_MESSAGE_LENGTH + 100);
|
||||
let chunks = split_message_for_telegram(&msg);
|
||||
assert_eq!(chunks.len(), 2);
|
||||
assert!(chunks[0].len() <= TELEGRAM_MAX_MESSAGE_LENGTH);
|
||||
assert!(chunks[1].len() <= TELEGRAM_MAX_MESSAGE_LENGTH);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn telegram_split_at_word_boundary() {
|
||||
let msg = format!(
|
||||
"{} more text here",
|
||||
"word ".repeat(TELEGRAM_MAX_MESSAGE_LENGTH / 5)
|
||||
);
|
||||
let chunks = split_message_for_telegram(&msg);
|
||||
assert!(chunks.len() >= 2);
|
||||
// First chunk should end with a complete word (space at the end)
|
||||
for chunk in &chunks[..chunks.len() - 1] {
|
||||
assert!(chunk.len() <= TELEGRAM_MAX_MESSAGE_LENGTH);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn telegram_split_at_newline() {
|
||||
let text_block = "Line of text\n".repeat(TELEGRAM_MAX_MESSAGE_LENGTH / 13);
|
||||
let chunks = split_message_for_telegram(&text_block);
|
||||
assert!(chunks.len() >= 2);
|
||||
for chunk in chunks {
|
||||
assert!(chunk.len() <= TELEGRAM_MAX_MESSAGE_LENGTH);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn telegram_split_preserves_content() {
|
||||
let msg = "test ".repeat(TELEGRAM_MAX_MESSAGE_LENGTH / 5 + 100);
|
||||
let chunks = split_message_for_telegram(&msg);
|
||||
let rejoined = chunks.join("");
|
||||
assert_eq!(rejoined, msg);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn telegram_split_empty_message() {
|
||||
let chunks = split_message_for_telegram("");
|
||||
assert_eq!(chunks.len(), 1);
|
||||
assert_eq!(chunks[0], "");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn telegram_split_very_long_message() {
|
||||
let msg = "x".repeat(TELEGRAM_MAX_MESSAGE_LENGTH * 3);
|
||||
let chunks = split_message_for_telegram(&msg);
|
||||
assert!(chunks.len() >= 3);
|
||||
for chunk in chunks {
|
||||
assert!(chunk.len() <= TELEGRAM_MAX_MESSAGE_LENGTH);
|
||||
}
|
||||
}
|
||||
|
||||
// ── Caption handling tests ──────────────────────────────────────
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue