From dbebd48dfec076d8c3685839c1b4f6298c166da6 Mon Sep 17 00:00:00 2001 From: Kieran Date: Tue, 17 Feb 2026 14:37:03 +0000 Subject: [PATCH] refactor(channel): accept SendMessage struct in Channel::send() Refactor the Channel trait to accept a SendMessage struct instead of separate message and recipient string parameters. This enables passing additional metadata like email subjects. Changes: - Add SendMessage struct with content, recipient, and optional subject - Update Channel::send() signature to accept &SendMessage - Update all 12 channel implementations - Update call sites in channels/mod.rs and gateway/mod.rs Subject field usage: - Email: uses subject for email subject line - DingTalk: uses subject as markdown message title - All others: ignore subject (no native platform support) --- src/channels/cli.rs | 22 ++++++++++--- src/channels/dingtalk.rs | 16 +++++----- src/channels/discord.rs | 12 +++++--- src/channels/email_channel.rs | 21 +++++++------ src/channels/imessage.rs | 10 +++--- src/channels/irc.rs | 10 +++--- src/channels/lark.rs | 8 ++--- src/channels/matrix.rs | 6 ++-- src/channels/mod.rs | 19 +++++++----- src/channels/slack.rs | 8 ++--- src/channels/telegram.rs | 17 +++++----- src/channels/traits.rs | 58 +++++++++++++++++++++++++++++++++-- src/channels/whatsapp.rs | 11 ++++--- src/gateway/mod.rs | 8 ++--- 14 files changed, 153 insertions(+), 73 deletions(-) diff --git a/src/channels/cli.rs b/src/channels/cli.rs index 46ee474..ae49548 100644 --- a/src/channels/cli.rs +++ b/src/channels/cli.rs @@ -1,4 +1,4 @@ -use super::traits::{Channel, ChannelMessage}; +use super::traits::{Channel, ChannelMessage, SendMessage}; use async_trait::async_trait; use tokio::io::{self, AsyncBufReadExt, BufReader}; use uuid::Uuid; @@ -18,8 +18,8 @@ impl Channel for CliChannel { "cli" } - async fn send(&self, message: &str, _recipient: &str) -> anyhow::Result<()> { - println!("{message}"); + async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { + println!("{}", message.content); Ok(()) } @@ -69,14 +69,26 @@ mod tests { #[tokio::test] async fn cli_channel_send_does_not_panic() { let ch = CliChannel::new(); - let result = ch.send("hello", "user").await; + let result = ch + .send(&SendMessage { + content: "hello".into(), + recipient: "user".into(), + subject: None, + }) + .await; assert!(result.is_ok()); } #[tokio::test] async fn cli_channel_send_empty_message() { let ch = CliChannel::new(); - let result = ch.send("", "").await; + let result = ch + .send(&SendMessage { + content: String::new(), + recipient: String::new(), + subject: None, + }) + .await; assert!(result.is_ok()); } diff --git a/src/channels/dingtalk.rs b/src/channels/dingtalk.rs index 7473bb3..c32db17 100644 --- a/src/channels/dingtalk.rs +++ b/src/channels/dingtalk.rs @@ -1,4 +1,4 @@ -use super::traits::{Channel, ChannelMessage}; +use super::traits::{Channel, ChannelMessage, SendMessage}; use async_trait::async_trait; use futures_util::{SinkExt, StreamExt}; use std::collections::HashMap; @@ -84,20 +84,22 @@ impl Channel for DingTalkChannel { "dingtalk" } - async fn send(&self, message: &str, recipient: &str) -> anyhow::Result<()> { + async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { let webhooks = self.session_webhooks.read().await; - let webhook_url = webhooks.get(recipient).ok_or_else(|| { + let webhook_url = webhooks.get(&message.recipient).ok_or_else(|| { anyhow::anyhow!( - "No session webhook found for chat {recipient}. \ - The user must send a message first to establish a session." + "No session webhook found for chat {}. \ + The user must send a message first to establish a session.", + message.recipient ) })?; + let title = message.subject.as_deref().unwrap_or("ZeroClaw"); let body = serde_json::json!({ "msgtype": "markdown", "markdown": { - "title": "ZeroClaw", - "text": message, + "title": title, + "text": message.content, } }); diff --git a/src/channels/discord.rs b/src/channels/discord.rs index c4d0191..32233e5 100644 --- a/src/channels/discord.rs +++ b/src/channels/discord.rs @@ -1,4 +1,4 @@ -use super::traits::{Channel, ChannelMessage}; +use super::traits::{Channel, ChannelMessage, SendMessage}; use async_trait::async_trait; use futures_util::{SinkExt, StreamExt}; use serde_json::json; @@ -185,11 +185,15 @@ impl Channel for DiscordChannel { "discord" } - async fn send(&self, message: &str, channel_id: &str) -> anyhow::Result<()> { - let chunks = split_message_for_discord(message); + async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { + let chunks = split_message_for_discord(&message.content); for (i, chunk) in chunks.iter().enumerate() { - let url = format!("https://discord.com/api/v10/channels/{channel_id}/messages"); + let url = format!( + "https://discord.com/api/v10/channels/{}/messages", + message.recipient + ); + let body = json!({ "content": chunk }); let resp = self diff --git a/src/channels/email_channel.rs b/src/channels/email_channel.rs index e59e0ac..8d06370 100644 --- a/src/channels/email_channel.rs +++ b/src/channels/email_channel.rs @@ -25,7 +25,7 @@ use tokio::time::{interval, sleep}; use tracing::{error, info, warn}; use uuid::Uuid; -use super::traits::{Channel, ChannelMessage}; +use super::traits::{Channel, ChannelMessage, SendMessage}; /// Email channel configuration #[derive(Debug, Clone, Serialize, Deserialize)] @@ -375,26 +375,29 @@ impl Channel for EmailChannel { "email" } - async fn send(&self, message: &str, recipient: &str) -> Result<()> { - let (subject, body) = if message.starts_with("Subject: ") { - if let Some(pos) = message.find('\n') { - (&message[9..pos], message[pos + 1..].trim()) + async fn send(&self, message: &SendMessage) -> Result<()> { + // Use explicit subject if provided, otherwise fall back to legacy parsing or default + let (subject, body) = if let Some(ref subj) = message.subject { + (subj.as_str(), message.content.as_str()) + } else if message.content.starts_with("Subject: ") { + if let Some(pos) = message.content.find('\n') { + (&message.content[9..pos], message.content[pos + 1..].trim()) } else { - ("ZeroClaw Message", message) + ("ZeroClaw Message", message.content.as_str()) } } else { - ("ZeroClaw Message", message) + ("ZeroClaw Message", message.content.as_str()) }; let email = Message::builder() .from(self.config.from_address.parse()?) - .to(recipient.parse()?) + .to(message.recipient.parse()?) .subject(subject) .singlepart(SinglePart::plain(body.to_string()))?; let transport = self.create_smtp_transport()?; transport.send(&email)?; - info!("Email sent to {}", recipient); + info!("Email sent to {}", message.recipient); Ok(()) } diff --git a/src/channels/imessage.rs b/src/channels/imessage.rs index 36bf72f..8dbd614 100644 --- a/src/channels/imessage.rs +++ b/src/channels/imessage.rs @@ -1,4 +1,4 @@ -use crate::channels::traits::{Channel, ChannelMessage}; +use crate::channels::traits::{Channel, ChannelMessage, SendMessage}; use async_trait::async_trait; use directories::UserDirs; use rusqlite::{Connection, OpenFlags}; @@ -95,9 +95,9 @@ impl Channel for IMessageChannel { "imessage" } - async fn send(&self, message: &str, target: &str) -> anyhow::Result<()> { + async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { // Defense-in-depth: validate target format before any interpolation - if !is_valid_imessage_target(target) { + if !is_valid_imessage_target(&message.recipient) { anyhow::bail!( "Invalid iMessage target: must be a phone number (+1234567890) or email (user@example.com)" ); @@ -105,8 +105,8 @@ impl Channel for IMessageChannel { // SECURITY: Escape both message AND target to prevent AppleScript injection // See: CWE-78 (OS Command Injection) - let escaped_msg = escape_applescript(message); - let escaped_target = escape_applescript(target); + let escaped_msg = escape_applescript(&message.content); + let escaped_target = escape_applescript(&message.recipient); let script = format!( r#"tell application "Messages" diff --git a/src/channels/irc.rs b/src/channels/irc.rs index 61a48cc..2e03378 100644 --- a/src/channels/irc.rs +++ b/src/channels/irc.rs @@ -1,4 +1,4 @@ -use crate::channels::traits::{Channel, ChannelMessage}; +use crate::channels::traits::{Channel, ChannelMessage, SendMessage}; use async_trait::async_trait; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -345,7 +345,7 @@ impl Channel for IrcChannel { "irc" } - async fn send(&self, message: &str, recipient: &str) -> anyhow::Result<()> { + async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { let mut guard = self.writer.lock().await; let writer = guard .as_mut() @@ -353,12 +353,12 @@ impl Channel for IrcChannel { // Calculate safe payload size: // 512 - sender prefix (~64 bytes for :nick!user@host) - "PRIVMSG " - target - " :" - "\r\n" - let overhead = SENDER_PREFIX_RESERVE + 10 + recipient.len() + 2; + let overhead = SENDER_PREFIX_RESERVE + 10 + message.recipient.len() + 2; let max_payload = 512_usize.saturating_sub(overhead); - let chunks = split_message(message, max_payload); + let chunks = split_message(&message.content, max_payload); for chunk in chunks { - Self::send_raw(writer, &format!("PRIVMSG {recipient} :{chunk}")).await?; + Self::send_raw(writer, &format!("PRIVMSG {} :{chunk}", message.recipient)).await?; } Ok(()) diff --git a/src/channels/lark.rs b/src/channels/lark.rs index 4be8f20..c8d6cdb 100644 --- a/src/channels/lark.rs +++ b/src/channels/lark.rs @@ -1,4 +1,4 @@ -use super::traits::{Channel, ChannelMessage}; +use super::traits::{Channel, ChannelMessage, SendMessage}; use async_trait::async_trait; use futures_util::{SinkExt, StreamExt}; use prost::Message as ProstMessage; @@ -630,13 +630,13 @@ impl Channel for LarkChannel { "lark" } - async fn send(&self, message: &str, recipient: &str) -> anyhow::Result<()> { + async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { let token = self.get_tenant_access_token().await?; let url = self.send_message_url(); - let content = serde_json::json!({ "text": message }).to_string(); + let content = serde_json::json!({ "text": message.content }).to_string(); let body = serde_json::json!({ - "receive_id": recipient, + "receive_id": message.recipient, "msg_type": "text", "content": content, }); diff --git a/src/channels/matrix.rs b/src/channels/matrix.rs index 4f34bcf..9b327d2 100644 --- a/src/channels/matrix.rs +++ b/src/channels/matrix.rs @@ -1,4 +1,4 @@ -use crate::channels::traits::{Channel, ChannelMessage}; +use crate::channels::traits::{Channel, ChannelMessage, SendMessage}; use async_trait::async_trait; use reqwest::Client; use serde::Deserialize; @@ -117,7 +117,7 @@ impl Channel for MatrixChannel { "matrix" } - async fn send(&self, message: &str, _target: &str) -> anyhow::Result<()> { + async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { let txn_id = format!("zc_{}", chrono::Utc::now().timestamp_millis()); let url = format!( "{}/_matrix/client/v3/rooms/{}/send/m.room.message/{}", @@ -126,7 +126,7 @@ impl Channel for MatrixChannel { let body = serde_json::json!({ "msgtype": "m.text", - "body": message + "body": message.content }); let resp = self diff --git a/src/channels/mod.rs b/src/channels/mod.rs index cb293cd..e74f631 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -25,7 +25,7 @@ pub use qq::QQChannel; pub use signal::SignalChannel; pub use slack::SlackChannel; pub use telegram::TelegramChannel; -pub use traits::Channel; +pub use traits::{Channel, SendMessage}; pub use whatsapp::WhatsAppChannel; use crate::agent::loop_::{build_tool_instructions, run_tool_call_loop}; @@ -235,7 +235,10 @@ async fn process_channel_message(ctx: Arc, msg: traits::C truncate_with_ellipsis(&response, 80) ); if let Some(channel) = target_channel.as_ref() { - if let Err(e) = channel.send(&response, &msg.reply_target).await { + if let Err(e) = channel + .send(&SendMessage::new(response, &msg.reply_target)) + .await + { eprintln!(" ❌ Failed to reply on {}: {e}", channel.name()); } } @@ -247,7 +250,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C ); if let Some(channel) = target_channel.as_ref() { let _ = channel - .send(&format!("⚠️ Error: {e}"), &msg.reply_target) + .send(&SendMessage::new(format!("⚠️ Error: {e}"), &msg.reply_target)) .await; } } @@ -263,10 +266,10 @@ async fn process_channel_message(ctx: Arc, msg: traits::C ); if let Some(channel) = target_channel.as_ref() { let _ = channel - .send( + .send(&SendMessage::new( "⚠️ Request timed out while waiting for the model. Please try again.", &msg.reply_target, - ) + )) .await; } } @@ -1310,11 +1313,11 @@ mod tests { "test-channel" } - async fn send(&self, message: &str, recipient: &str) -> anyhow::Result<()> { + async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { self.sent_messages .lock() .await - .push(format!("{recipient}:{message}")); + .push(format!("{}:{}", message.recipient, message.content)); Ok(()) } @@ -2089,7 +2092,7 @@ mod tests { self.name } - async fn send(&self, _message: &str, _recipient: &str) -> anyhow::Result<()> { + async fn send(&self, _message: &SendMessage) -> anyhow::Result<()> { Ok(()) } diff --git a/src/channels/slack.rs b/src/channels/slack.rs index 7f8ee51..9faad48 100644 --- a/src/channels/slack.rs +++ b/src/channels/slack.rs @@ -1,4 +1,4 @@ -use super::traits::{Channel, ChannelMessage}; +use super::traits::{Channel, ChannelMessage, SendMessage}; use async_trait::async_trait; /// Slack channel — polls conversations.history via Web API @@ -51,10 +51,10 @@ impl Channel for SlackChannel { "slack" } - async fn send(&self, message: &str, channel: &str) -> anyhow::Result<()> { + async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { let body = serde_json::json!({ - "channel": channel, - "text": message + "channel": message.recipient, + "text": message.content }); let resp = self diff --git a/src/channels/telegram.rs b/src/channels/telegram.rs index c022389..b08f843 100644 --- a/src/channels/telegram.rs +++ b/src/channels/telegram.rs @@ -1,4 +1,4 @@ -use super::traits::{Channel, ChannelMessage}; +use super::traits::{Channel, ChannelMessage, SendMessage}; use crate::config::Config; use crate::security::pairing::PairingGuard; use anyhow::Context; @@ -1049,28 +1049,29 @@ impl Channel for TelegramChannel { "telegram" } - async fn send(&self, message: &str, chat_id: &str) -> anyhow::Result<()> { - let (text_without_markers, attachments) = parse_attachment_markers(message); + async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { + let (text_without_markers, attachments) = parse_attachment_markers(&message.content); if !attachments.is_empty() { if !text_without_markers.is_empty() { - self.send_text_chunks(&text_without_markers, chat_id) + self.send_text_chunks(&text_without_markers, &message.recipient) .await?; } for attachment in &attachments { - self.send_attachment(chat_id, attachment).await?; + self.send_attachment(&message.recipient, attachment).await?; } return Ok(()); } - if let Some(attachment) = parse_path_only_attachment(message) { - self.send_attachment(chat_id, &attachment).await?; + if let Some(attachment) = parse_path_only_attachment(&message.content) { + self.send_attachment(&message.recipient, &attachment).await?; return Ok(()); } - self.send_text_chunks(message, chat_id).await + self.send_text_chunks(&message.content, &message.recipient) + .await } async fn listen(&self, tx: tokio::sync::mpsc::Sender) -> anyhow::Result<()> { diff --git a/src/channels/traits.rs b/src/channels/traits.rs index 1c44bf6..069496f 100644 --- a/src/channels/traits.rs +++ b/src/channels/traits.rs @@ -11,6 +11,58 @@ pub struct ChannelMessage { pub timestamp: u64, } +/// Message to send through a channel +#[derive(Debug, Clone, Default)] +pub struct SendMessage { + pub content: String, + pub recipient: String, + pub subject: Option, +} + +impl SendMessage { + /// Create a new message with content and recipient + pub fn new(content: impl Into, recipient: impl Into) -> Self { + Self { + content: content.into(), + recipient: recipient.into(), + subject: None, + } + } + + /// Create a new message with content, recipient, and subject + pub fn with_subject( + content: impl Into, + recipient: impl Into, + subject: impl Into, + ) -> Self { + Self { + content: content.into(), + recipient: recipient.into(), + subject: Some(subject.into()), + } + } +} + +impl From<&str> for SendMessage { + fn from(content: &str) -> Self { + Self { + content: content.to_string(), + recipient: String::new(), + subject: None, + } + } +} + +impl From<(String, String)> for SendMessage { + fn from(value: (String, String)) -> Self { + Self { + content: value.0, + recipient: value.1, + subject: None, + } + } +} + /// Core channel trait — implement for any messaging platform #[async_trait] pub trait Channel: Send + Sync { @@ -18,7 +70,7 @@ pub trait Channel: Send + Sync { fn name(&self) -> &str; /// Send a message through this channel - async fn send(&self, message: &str, recipient: &str) -> anyhow::Result<()>; + async fn send(&self, message: &SendMessage) -> anyhow::Result<()>; /// Start listening for incoming messages (long-running) async fn listen(&self, tx: tokio::sync::mpsc::Sender) -> anyhow::Result<()>; @@ -52,7 +104,7 @@ mod tests { "dummy" } - async fn send(&self, _message: &str, _recipient: &str) -> anyhow::Result<()> { + async fn send(&self, _message: &SendMessage) -> anyhow::Result<()> { Ok(()) } @@ -100,7 +152,7 @@ mod tests { assert!(channel.health_check().await); assert!(channel.start_typing("bob").await.is_ok()); assert!(channel.stop_typing("bob").await.is_ok()); - assert!(channel.send("hello", "bob").await.is_ok()); + assert!(channel.send(&SendMessage::new("hello", "bob")).await.is_ok()); } #[tokio::test] diff --git a/src/channels/whatsapp.rs b/src/channels/whatsapp.rs index 7825b96..34b8dc5 100644 --- a/src/channels/whatsapp.rs +++ b/src/channels/whatsapp.rs @@ -1,4 +1,4 @@ -use super::traits::{Channel, ChannelMessage}; +use super::traits::{Channel, ChannelMessage, SendMessage}; use async_trait::async_trait; use uuid::Uuid; @@ -139,7 +139,7 @@ impl Channel for WhatsAppChannel { "whatsapp" } - async fn send(&self, message: &str, recipient: &str) -> anyhow::Result<()> { + async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { // WhatsApp Cloud API: POST to /v18.0/{phone_number_id}/messages let url = format!( "https://graph.facebook.com/v18.0/{}/messages", @@ -147,7 +147,10 @@ impl Channel for WhatsAppChannel { ); // Normalize recipient (remove leading + if present for API) - let to = recipient.strip_prefix('+').unwrap_or(recipient); + let to = message + .recipient + .strip_prefix('+') + .unwrap_or(&message.recipient); let body = serde_json::json!({ "messaging_product": "whatsapp", @@ -156,7 +159,7 @@ impl Channel for WhatsAppChannel { "type": "text", "text": { "preview_url": false, - "body": message + "body": message.content } }); diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index b59f6cf..59ae3b0 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -7,7 +7,7 @@ //! - Request timeouts (30s) to prevent slow-loris attacks //! - Header sanitization (handled by axum/hyper) -use crate::channels::{Channel, WhatsAppChannel}; +use crate::channels::{Channel, SendMessage, WhatsAppChannel}; use crate::config::Config; use crate::memory::{self, Memory, MemoryCategory}; use crate::providers::{self, Provider}; @@ -704,17 +704,17 @@ async fn handle_whatsapp_message( { Ok(response) => { // Send reply via WhatsApp - if let Err(e) = wa.send(&response, &msg.reply_target).await { + if let Err(e) = wa.send(&SendMessage::new(response, &msg.reply_target)).await { tracing::error!("Failed to send WhatsApp reply: {e}"); } } Err(e) => { tracing::error!("LLM error for WhatsApp message: {e:#}"); let _ = wa - .send( + .send(&SendMessage::new( "Sorry, I couldn't process your message right now.", &msg.reply_target, - ) + )) .await; } }