feat(channel): stream LLM responses to Telegram via draft message edits

Wire the existing provider-layer streaming infrastructure through the
channel trait and agent loop so Telegram users see tokens arrive
progressively via editMessageText, instead of waiting for the full
response.

Changes:
- Add StreamMode enum (off/partial/block) and draft_update_interval_ms
  to TelegramConfig (backward-compatible defaults: off, 1000ms)
- Add supports_draft_updates/send_draft/update_draft/finalize_draft to
  Channel trait with no-op defaults (zero impact on existing channels)
- Implement draft methods on TelegramChannel using sendMessage +
  editMessageText with rate limiting and Markdown fallback
- Add on_delta mpsc::Sender<String> parameter to run_tool_call_loop
  (None preserves existing behavior)
- Wire streaming in process_channel_message: when channel supports
  drafts, send initial draft, spawn updater task, finalize on completion

Edge cases handled:
- 4096-char limit: finalize draft and fall back to chunked send
- Broken Markdown: use no parse_mode during streaming, apply on finalize
- Edit failures: fall back to sending complete response as new message
- Rate limiting: configurable draft_update_interval_ms (default 1s)
This commit is contained in:
Xiangjun Ma 2026-02-17 23:46:32 -08:00 committed by Chummy
parent a0b277b21e
commit 118cd53922
12 changed files with 410 additions and 43 deletions

View file

@ -221,6 +221,64 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
history.push(ChatMessage::system(instructions));
}
// Determine if this channel supports streaming draft updates
let use_streaming = target_channel
.as_ref()
.map_or(false, |ch| ch.supports_draft_updates());
// Set up streaming channel if supported
let (delta_tx, delta_rx) = if use_streaming {
let (tx, rx) = tokio::sync::mpsc::channel::<String>(64);
(Some(tx), Some(rx))
} else {
(None, None)
};
// Send initial draft message if streaming
let draft_message_id = if use_streaming {
if let Some(channel) = target_channel.as_ref() {
match channel
.send_draft(&SendMessage::new("...", &msg.reply_target))
.await
{
Ok(id) => id,
Err(e) => {
tracing::debug!("Failed to send draft on {}: {e}", channel.name());
None
}
}
} else {
None
}
} else {
None
};
// Spawn a task to forward streaming deltas to draft updates
let draft_updater = if let (Some(mut rx), Some(draft_id_ref), Some(channel_ref)) = (
delta_rx,
draft_message_id.as_deref(),
target_channel.as_ref(),
) {
let channel = Arc::clone(channel_ref);
let reply_target = msg.reply_target.clone();
let draft_id = draft_id_ref.to_string();
Some(tokio::spawn(async move {
let mut accumulated = String::new();
while let Some(delta) = rx.recv().await {
accumulated.push_str(&delta);
if let Err(e) = channel
.update_draft(&reply_target, &draft_id, &accumulated)
.await
{
tracing::debug!("Draft update failed: {e}");
}
}
}))
} else {
None
};
let llm_result = tokio::time::timeout(
Duration::from_secs(CHANNEL_MESSAGE_TIMEOUT_SECS),
run_tool_call_loop(
@ -231,14 +289,20 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
"channel-runtime",
ctx.model.as_str(),
ctx.temperature,
true, // silent — channels don't write to stdout
true,
None,
msg.channel.as_str(),
ctx.max_tool_iterations,
delta_tx,
),
)
.await;
// Wait for draft updater to finish
if let Some(handle) = draft_updater {
let _ = handle.await;
}
if let Some(channel) = target_channel.as_ref() {
if let Err(e) = channel.stop_typing(&msg.reply_target).await {
tracing::debug!("Failed to stop typing on {}: {e}", channel.name());
@ -253,7 +317,17 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
truncate_with_ellipsis(&response, 80)
);
if let Some(channel) = target_channel.as_ref() {
if let Err(e) = channel
if let Some(ref draft_id) = draft_message_id {
if let Err(e) = channel
.finalize_draft(&msg.reply_target, draft_id, &response)
.await
{
tracing::warn!("Failed to finalize draft: {e}; sending as new message");
let _ = channel
.send(&SendMessage::new(&response, &msg.reply_target))
.await;
}
} else if let Err(e) = channel
.send(&SendMessage::new(response, &msg.reply_target))
.await
{
@ -267,12 +341,18 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
started_at.elapsed().as_millis()
);
if let Some(channel) = target_channel.as_ref() {
let _ = channel
.send(&SendMessage::new(
format!("⚠️ Error: {e}"),
&msg.reply_target,
))
.await;
if let Some(ref draft_id) = draft_message_id {
let _ = channel
.finalize_draft(&msg.reply_target, draft_id, &format!("⚠️ Error: {e}"))
.await;
} else {
let _ = channel
.send(&SendMessage::new(
format!("⚠️ Error: {e}"),
&msg.reply_target,
))
.await;
}
}
}
Err(_) => {
@ -286,12 +366,17 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
started_at.elapsed().as_millis()
);
if let Some(channel) = target_channel.as_ref() {
let _ = channel
.send(&SendMessage::new(
"⚠️ Request timed out while waiting for the model. Please try again.",
&msg.reply_target,
))
.await;
let error_text =
"⚠️ Request timed out while waiting for the model. Please try again.";
if let Some(ref draft_id) = draft_message_id {
let _ = channel
.finalize_draft(&msg.reply_target, draft_id, error_text)
.await;
} else {
let _ = channel
.send(&SendMessage::new(error_text, &msg.reply_target))
.await;
}
}
}
}
@ -797,10 +882,10 @@ pub async fn doctor_channels(config: Config) -> Result<()> {
if let Some(ref tg) = config.channels_config.telegram {
channels.push((
"Telegram",
Arc::new(TelegramChannel::new(
tg.bot_token.clone(),
tg.allowed_users.clone(),
)),
Arc::new(
TelegramChannel::new(tg.bot_token.clone(), tg.allowed_users.clone())
.with_streaming(tg.stream_mode, tg.draft_update_interval_ms),
),
));
}
@ -1117,10 +1202,10 @@ pub async fn start_channels(config: Config) -> Result<()> {
let mut channels: Vec<Arc<dyn Channel>> = Vec::new();
if let Some(ref tg) = config.channels_config.telegram {
channels.push(Arc::new(TelegramChannel::new(
tg.bot_token.clone(),
tg.allowed_users.clone(),
)));
channels.push(Arc::new(
TelegramChannel::new(tg.bot_token.clone(), tg.allowed_users.clone())
.with_streaming(tg.stream_mode, tg.draft_update_interval_ms),
));
}
if let Some(ref dc) = config.channels_config.discord {

View file

@ -1,5 +1,5 @@
use super::traits::{Channel, ChannelMessage, SendMessage};
use crate::config::Config;
use crate::config::{Config, StreamMode};
use crate::security::pairing::PairingGuard;
use anyhow::Context;
use async_trait::async_trait;
@ -237,6 +237,9 @@ pub struct TelegramChannel {
pairing: Option<PairingGuard>,
client: reqwest::Client,
typing_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
stream_mode: StreamMode,
draft_update_interval_ms: u64,
last_draft_edit: Mutex<Option<std::time::Instant>>,
}
impl TelegramChannel {
@ -258,10 +261,33 @@ impl TelegramChannel {
allowed_users: Arc::new(RwLock::new(normalized_allowed)),
pairing,
client: reqwest::Client::new(),
stream_mode: StreamMode::Off,
draft_update_interval_ms: 1000,
last_draft_edit: Mutex::new(None),
typing_handle: Mutex::new(None),
}
}
/// Configure streaming mode for progressive draft updates.
pub fn with_streaming(
mut self,
stream_mode: StreamMode,
draft_update_interval_ms: u64,
) -> Self {
self.stream_mode = stream_mode;
self.draft_update_interval_ms = draft_update_interval_ms;
self
}
/// Parse reply_target into (chat_id, optional thread_id).
fn parse_reply_target(reply_target: &str) -> (String, Option<String>) {
if let Some((chat_id, thread_id)) = reply_target.split_once(':') {
(chat_id.to_string(), Some(thread_id.to_string()))
} else {
(reply_target.to_string(), None)
}
}
fn normalize_identity(value: &str) -> String {
value.trim().trim_start_matches('@').to_string()
}
@ -1182,6 +1208,170 @@ impl Channel for TelegramChannel {
"telegram"
}
fn supports_draft_updates(&self) -> bool {
self.stream_mode != StreamMode::Off
}
async fn send_draft(&self, message: &SendMessage) -> anyhow::Result<Option<String>> {
if self.stream_mode == StreamMode::Off {
return Ok(None);
}
let (chat_id, thread_id) = Self::parse_reply_target(&message.recipient);
let initial_text = if message.content.is_empty() {
"...".to_string()
} else {
message.content.clone()
};
let mut body = serde_json::json!({
"chat_id": chat_id,
"text": initial_text,
});
if let Some(tid) = thread_id {
body["message_thread_id"] = serde_json::Value::String(tid.to_string());
}
let resp = self
.client
.post(self.api_url("sendMessage"))
.json(&body)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await.unwrap_or_default();
anyhow::bail!("Telegram sendMessage (draft) failed: {err}");
}
let resp_json: serde_json::Value = resp.json().await?;
let message_id = resp_json
.get("result")
.and_then(|r| r.get("message_id"))
.and_then(|id| id.as_i64())
.map(|id| id.to_string());
*self.last_draft_edit.lock() = Some(std::time::Instant::now());
Ok(message_id)
}
async fn update_draft(
&self,
recipient: &str,
message_id: &str,
text: &str,
) -> anyhow::Result<()> {
// Rate-limit edits
{
let last = self.last_draft_edit.lock();
if let Some(last_time) = *last {
let elapsed = u64::try_from(last_time.elapsed().as_millis()).unwrap_or(u64::MAX);
if elapsed < self.draft_update_interval_ms {
return Ok(());
}
}
}
let (chat_id, _) = Self::parse_reply_target(recipient);
// Truncate to Telegram limit for mid-stream edits
let display_text = if text.len() > TELEGRAM_MAX_MESSAGE_LENGTH {
&text[..TELEGRAM_MAX_MESSAGE_LENGTH]
} else {
text
};
let body = serde_json::json!({
"chat_id": chat_id,
"message_id": message_id.parse::<i64>().unwrap_or(0),
"text": display_text,
});
let resp = self
.client
.post(self.api_url("editMessageText"))
.json(&body)
.send()
.await?;
if resp.status().is_success() {
*self.last_draft_edit.lock() = Some(std::time::Instant::now());
} else {
let status = resp.status();
let err = resp.text().await.unwrap_or_default();
tracing::debug!("Telegram editMessageText failed ({status}): {err}");
}
Ok(())
}
async fn finalize_draft(
&self,
recipient: &str,
message_id: &str,
text: &str,
) -> anyhow::Result<()> {
let (chat_id, thread_id) = Self::parse_reply_target(recipient);
// If text exceeds limit, delete draft and send as chunked messages
if text.len() > TELEGRAM_MAX_MESSAGE_LENGTH {
// Delete the draft
let _ = self
.client
.post(self.api_url("deleteMessage"))
.json(&serde_json::json!({
"chat_id": chat_id,
"message_id": message_id.parse::<i64>().unwrap_or(0),
}))
.send()
.await;
// Fall back to chunked send
return self
.send_text_chunks(text, &chat_id, thread_id.as_deref())
.await;
}
// Try editing with Markdown formatting
let mut body = serde_json::json!({
"chat_id": chat_id,
"message_id": message_id.parse::<i64>().unwrap_or(0),
"text": text,
"parse_mode": "Markdown",
});
let resp = self
.client
.post(self.api_url("editMessageText"))
.json(&body)
.send()
.await?;
if resp.status().is_success() {
return Ok(());
}
// Markdown failed — retry without parse_mode
body.as_object_mut().unwrap().remove("parse_mode");
let resp = self
.client
.post(self.api_url("editMessageText"))
.json(&body)
.send()
.await?;
if resp.status().is_success() {
return Ok(());
}
// Edit failed entirely — fall back to new message
tracing::warn!("Telegram finalize_draft edit failed; falling back to sendMessage");
self.send_text_chunks(text, &chat_id, thread_id.as_deref())
.await
}
async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
// Strip tool_call tags before processing to prevent Markdown parsing failures
let content = strip_tool_call_tags(&message.content);

View file

@ -70,6 +70,36 @@ pub trait Channel: Send + Sync {
async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
Ok(())
}
/// Whether this channel supports progressive message updates via draft edits.
fn supports_draft_updates(&self) -> bool {
false
}
/// Send an initial draft message. Returns a platform-specific message ID for later edits.
async fn send_draft(&self, _message: &SendMessage) -> anyhow::Result<Option<String>> {
Ok(None)
}
/// Update a previously sent draft message with new accumulated content.
async fn update_draft(
&self,
_recipient: &str,
_message_id: &str,
_text: &str,
) -> anyhow::Result<()> {
Ok(())
}
/// Finalize a draft with the complete response (e.g. apply Markdown formatting).
async fn finalize_draft(
&self,
_recipient: &str,
_message_id: &str,
_text: &str,
) -> anyhow::Result<()> {
Ok(())
}
}
#[cfg(test)]
@ -138,6 +168,23 @@ mod tests {
.is_ok());
}
#[tokio::test]
async fn default_draft_methods_return_success() {
let channel = DummyChannel;
assert!(!channel.supports_draft_updates());
assert!(channel
.send_draft(&SendMessage::new("draft", "bob"))
.await
.unwrap()
.is_none());
assert!(channel.update_draft("bob", "msg_1", "text").await.is_ok());
assert!(channel
.finalize_draft("bob", "msg_1", "final text")
.await
.is_ok());
}
#[tokio::test]
async fn listen_sends_message_to_channel() {
let channel = DummyChannel;