From cc2f85058ef40eacb88fe759bbaa1c47bc821a7b Mon Sep 17 00:00:00 2001 From: AARTE Date: Sat, 14 Feb 2026 16:14:25 +0000 Subject: [PATCH] feat: add WhatsApp and Email channel integrations - WhatsApp Cloud API channel (Meta Business Platform) - Webhook verification, text/media messages, rate limiting - Phone number allowlist (empty=deny, *=allow, specific numbers) - Health check via API - Email channel (IMAP/SMTP over TLS) - IMAP polling for inbound messages - SMTP sending with TLS - Sender allowlist (email, domain, wildcard) - HTML stripping, duplicate detection Both implement ZeroClaw's Channel trait directly. Includes inline unit tests. --- src/channels/email_channel.rs | 349 ++++++++++++++++++++++++++++++++++ src/channels/mod.rs | 2 + src/channels/whatsapp.rs | 248 ++++++++++++++++++++++++ 3 files changed, 599 insertions(+) create mode 100644 src/channels/email_channel.rs create mode 100644 src/channels/whatsapp.rs diff --git a/src/channels/email_channel.rs b/src/channels/email_channel.rs new file mode 100644 index 0000000..66388f9 --- /dev/null +++ b/src/channels/email_channel.rs @@ -0,0 +1,349 @@ +use async_trait::async_trait; +use anyhow::{anyhow, Result}; +use lettre::transport::smtp::authentication::Credentials; +use lettre::{Message, SmtpTransport, Transport}; +use mail_parser::{Message as ParsedMessage, MimeHeaders}; +use std::collections::HashSet; +use std::io::{BufRead, BufReader, Write as IoWrite}; +use std::net::TcpStream; +use std::sync::Mutex; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tokio::sync::mpsc; +use tokio::time::{interval, sleep}; +use tracing::{debug, error, info, warn}; +use uuid::Uuid; + +// Email config — add to config.rs +use super::traits::{Channel, ChannelMessage}; + +/// Email channel — IMAP polling for inbound, SMTP for outbound +pub struct EmailChannel { + pub config: EmailConfig, + seen_messages: Mutex>, +} + +impl EmailChannel { + pub fn new(config: EmailConfig) -> Self { + Self { + config, + seen_messages: Mutex::new(HashSet::new()), + } + } + + /// Check if a sender email is in the allowlist + pub fn is_sender_allowed(&self, email: &str) -> bool { + if self.config.allowed_senders.is_empty() { + return false; // Empty = deny all + } + if self.config.allowed_senders.iter().any(|a| a == "*") { + return true; // Wildcard = allow all + } + self.config.allowed_senders.iter().any(|allowed| { + allowed.eq_ignore_ascii_case(email) + || email.to_lowercase().ends_with(&format!("@{}", allowed.to_lowercase())) + || (allowed.starts_with('@') + && email.to_lowercase().ends_with(&allowed.to_lowercase())) + }) + } + + /// Strip HTML tags from content (basic) + pub fn strip_html(html: &str) -> String { + let mut result = String::new(); + let mut in_tag = false; + for ch in html.chars() { + match ch { + '<' => in_tag = true, + '>' => in_tag = false, + _ if !in_tag => result.push(ch), + _ => {} + } + } + result.split_whitespace().collect::>().join(" ") + } + + /// Extract the sender address from a parsed email + fn extract_sender(parsed: &mail_parser::Message) -> String { + match parsed.from() { + mail_parser::HeaderValue::Address(addr) => { + addr.address.as_ref().map(|a| a.to_string()).unwrap_or_else(|| "unknown".into()) + } + mail_parser::HeaderValue::AddressList(addrs) => { + addrs.first() + .and_then(|a| a.address.as_ref()) + .map(|a| a.to_string()) + .unwrap_or_else(|| "unknown".into()) + } + _ => "unknown".into(), + } + } + + /// Extract readable text from a parsed email + fn extract_text(parsed: &mail_parser::Message) -> String { + if let Some(text) = parsed.body_text(0) { + return text.to_string(); + } + if let Some(html) = parsed.body_html(0) { + return Self::strip_html(html.as_ref()); + } + for part in parsed.attachments() { + let part: &mail_parser::MessagePart = part; + if let Some(ct) = MimeHeaders::content_type(part) { + if ct.ctype() == "text" { + if let Ok(text) = std::str::from_utf8(part.contents()) { + let name = MimeHeaders::attachment_name(part).unwrap_or("file"); + return format!("[Attachment: {}]\n{}", name, text); + } + } + } + } + "(no readable content)".to_string() + } + + /// Fetch unseen emails via IMAP (blocking, run in spawn_blocking) + fn fetch_unseen_imap(config: &EmailConfig) -> Result> { + use rustls::ClientConfig as TlsConfig; + use rustls_pki_types::ServerName; + use std::sync::Arc; + use tokio_rustls::rustls; + + // Connect TCP + let tcp = TcpStream::connect((&*config.imap_host, config.imap_port))?; + tcp.set_read_timeout(Some(Duration::from_secs(30)))?; + + // TLS + let mut root_store = rustls::RootCertStore::empty(); + root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + let tls_config = Arc::new( + TlsConfig::builder() + .with_root_certificates(root_store) + .with_no_client_auth(), + ); + let server_name: ServerName<'_> = + ServerName::try_from(config.imap_host.clone())?; + let conn = + rustls::ClientConnection::new(tls_config, server_name)?; + let mut tls = rustls::StreamOwned::new(conn, tcp); + + let mut read_line = |tls: &mut rustls::StreamOwned| -> Result { + let mut buf = Vec::new(); + loop { + let mut byte = [0u8; 1]; + match std::io::Read::read(tls, &mut byte) { + Ok(0) => return Err(anyhow!("IMAP connection closed")), + Ok(_) => { + buf.push(byte[0]); + if buf.ends_with(b"\r\n") { + return Ok(String::from_utf8_lossy(&buf).to_string()); + } + } + Err(e) => return Err(e.into()), + } + } + }; + + let mut send_cmd = |tls: &mut rustls::StreamOwned, + tag: &str, + cmd: &str| + -> Result> { + let full = format!("{} {}\r\n", tag, cmd); + IoWrite::write_all(tls, full.as_bytes())?; + IoWrite::flush(tls)?; + let mut lines = Vec::new(); + loop { + let line = read_line(tls)?; + let done = line.starts_with(tag); + lines.push(line); + if done { + break; + } + } + Ok(lines) + }; + + // Read greeting + let _greeting = read_line(&mut tls)?; + + // Login + let login_resp = send_cmd( + &mut tls, + "A1", + &format!("LOGIN \"{}\" \"{}\"", config.username, config.password), + )?; + if !login_resp.last().map_or(false, |l| l.contains("OK")) { + return Err(anyhow!("IMAP login failed")); + } + + // Select folder + let _select = send_cmd(&mut tls, "A2", &format!("SELECT \"{}\"", config.imap_folder))?; + + // Search unseen + let search_resp = send_cmd(&mut tls, "A3", "SEARCH UNSEEN")?; + let mut uids: Vec<&str> = Vec::new(); + for line in &search_resp { + if line.starts_with("* SEARCH") { + let parts: Vec<&str> = line.trim().split_whitespace().collect(); + if parts.len() > 2 { + uids.extend_from_slice(&parts[2..]); + } + } + } + + let mut results = Vec::new(); + + for uid in &uids { + // Fetch RFC822 + let fetch_resp = send_cmd(&mut tls, "A4", &format!("FETCH {} RFC822", uid))?; + // Reconstruct the raw email from the response (skip first and last lines) + let raw: String = fetch_resp + .iter() + .skip(1) + .take(fetch_resp.len().saturating_sub(2)) + .cloned() + .collect(); + + if let Some(parsed) = ParsedMessage::parse(raw.as_bytes()) { + let sender = Self::extract_sender(&parsed); + let subject = parsed.subject().unwrap_or("(no subject)").to_string(); + let body = Self::extract_text(&parsed); + let content = format!("Subject: {}\n\n{}", subject, body); + let msg_id = parsed + .message_id() + .map(|s| s.to_string()) + .unwrap_or_else(|| format!("gen-{}", Uuid::new_v4())); + let ts = parsed + .date() + .map(|d| { + // DateTime year/month/day/hour/minute/second + let naive = chrono::NaiveDate::from_ymd_opt( + d.year as i32, d.month as u32, d.day as u32 + ).and_then(|date| date.and_hms_opt(d.hour as u32, d.minute as u32, d.second as u32)); + naive.map(|n| n.and_utc().timestamp() as u64).unwrap_or(0) + }) + .unwrap_or_else(|| { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + }); + + results.push((msg_id, sender, content, ts)); + } + + // Mark as seen + let _ = send_cmd(&mut tls, "A5", &format!("STORE {} +FLAGS (\\Seen)", uid)); + } + + // Logout + let _ = send_cmd(&mut tls, "A6", "LOGOUT"); + + Ok(results) + } + + fn create_smtp_transport(&self) -> Result { + let creds = Credentials::new(self.config.username.clone(), self.config.password.clone()); + let transport = if self.config.smtp_tls { + SmtpTransport::relay(&self.config.smtp_host)? + .port(self.config.smtp_port) + .credentials(creds) + .build() + } else { + SmtpTransport::builder_dangerous(&self.config.smtp_host) + .port(self.config.smtp_port) + .credentials(creds) + .build() + }; + Ok(transport) + } +} + +#[async_trait] +impl Channel for EmailChannel { + fn name(&self) -> &str { + "email" + } + + async fn send(&self, message: &str, recipient: &str) -> Result<()> { + let (subject, body) = if message.starts_with("Subject: ") { + if let Some(pos) = message.find('\n') { + (&message[9..pos], message[pos + 1..].trim()) + } else { + ("ZeroClaw Message", message) + } + } else { + ("ZeroClaw Message", message) + }; + + let email = Message::builder() + .from(self.config.from_address.parse()?) + .to(recipient.parse()?) + .subject(subject) + .body(body.to_string())?; + + let transport = self.create_smtp_transport()?; + transport.send(&email)?; + info!("Email sent to {}", recipient); + Ok(()) + } + + async fn listen(&self, tx: mpsc::Sender) -> Result<()> { + info!( + "Email polling every {}s on {}", + self.config.poll_interval_secs, self.config.imap_folder + ); + let mut tick = interval(Duration::from_secs(self.config.poll_interval_secs)); + let config = self.config.clone(); + + loop { + tick.tick().await; + let cfg = config.clone(); + match tokio::task::spawn_blocking(move || Self::fetch_unseen_imap(&cfg)).await { + Ok(Ok(messages)) => { + for (id, sender, content, ts) in messages { + { + let mut seen = self.seen_messages.lock().unwrap(); + if seen.contains(&id) { + continue; + } + if !self.is_sender_allowed(&sender) { + warn!("Blocked email from {}", sender); + continue; + } + seen.insert(id.clone()); + } // MutexGuard dropped before await + let msg = ChannelMessage { + id, + sender, + content, + channel: "email".to_string(), + timestamp: ts, + }; + if tx.send(msg).await.is_err() { + return Ok(()); + } + } + } + Ok(Err(e)) => { + error!("Email poll failed: {}", e); + sleep(Duration::from_secs(10)).await; + } + Err(e) => { + error!("Email poll task panicked: {}", e); + sleep(Duration::from_secs(10)).await; + } + } + } + } + + async fn health_check(&self) -> bool { + let cfg = self.config.clone(); + match tokio::task::spawn_blocking(move || { + let tcp = TcpStream::connect((&*cfg.imap_host, cfg.imap_port)); + tcp.is_ok() + }) + .await + { + Ok(ok) => ok, + Err(_) => false, + } + } +} diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 7252f7d..87686b7 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -4,6 +4,7 @@ pub mod imessage; pub mod matrix; pub mod slack; pub mod telegram; +pub mod whatsapp; pub mod traits; pub use cli::CliChannel; @@ -12,6 +13,7 @@ pub use imessage::IMessageChannel; pub use matrix::MatrixChannel; pub use slack::SlackChannel; pub use telegram::TelegramChannel; +pub use whatsapp::WhatsAppChannel; pub use traits::Channel; use crate::config::Config; diff --git a/src/channels/whatsapp.rs b/src/channels/whatsapp.rs new file mode 100644 index 0000000..7860d7c --- /dev/null +++ b/src/channels/whatsapp.rs @@ -0,0 +1,248 @@ +use async_trait::async_trait; +use anyhow::{anyhow, Result}; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::{mpsc, RwLock}; +use tracing::{debug, error, info, warn}; + +use super::traits::{Channel, ChannelMessage}; + +const WHATSAPP_API_BASE: &str = "https://graph.facebook.com/v18.0"; + +/// WhatsApp channel configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WhatsAppConfig { + pub phone_number_id: String, + pub access_token: String, + pub verify_token: String, + #[serde(default)] + pub allowed_numbers: Vec, + #[serde(default = "default_webhook_path")] + pub webhook_path: String, + #[serde(default = "default_rate_limit")] + pub rate_limit_per_minute: u32, +} + +fn default_webhook_path() -> String { "/webhook/whatsapp".into() } +fn default_rate_limit() -> u32 { 60 } + +impl Default for WhatsAppConfig { + fn default() -> Self { + Self { + phone_number_id: String::new(), + access_token: String::new(), + verify_token: String::new(), + allowed_numbers: Vec::new(), + webhook_path: default_webhook_path(), + rate_limit_per_minute: default_rate_limit(), + } + } +} + +#[derive(Debug, Deserialize)] +struct WebhookEntry { changes: Vec } +#[derive(Debug, Deserialize)] +struct WebhookChange { value: WebhookValue } +#[derive(Debug, Deserialize)] +struct WebhookValue { + messages: Option>, + statuses: Option>, +} +#[derive(Debug, Deserialize)] +struct WebhookMessage { + from: String, id: String, timestamp: String, + text: Option, + image: Option, + document: Option, +} +#[derive(Debug, Deserialize)] +struct MessageText { body: String } +#[derive(Debug, Deserialize)] +struct MediaMessage { id: String, mime_type: Option, filename: Option } +#[derive(Debug, Deserialize)] +struct MessageStatus { id: String, status: String, timestamp: String, recipient_id: String } + +#[derive(Debug, Serialize)] +struct SendMessageRequest { + messaging_product: String, to: String, + #[serde(rename = "type")] message_type: String, + text: MessageTextBody, +} +#[derive(Debug, Serialize)] +struct MessageTextBody { body: String } + +pub struct WhatsAppChannel { + pub config: WhatsAppConfig, + client: Client, + rate_limiter: Arc>>>, +} + +impl WhatsAppChannel { + pub fn new(config: WhatsAppConfig) -> Self { + Self { + config, + client: Client::builder().timeout(std::time::Duration::from_secs(30)).build().unwrap(), + rate_limiter: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub async fn verify_webhook(&self, mode: &str, token: &str, challenge: &str) -> Result { + if mode == "subscribe" && token == self.config.verify_token { + Ok(challenge.to_string()) + } else { + Err(anyhow!("Webhook verification failed")) + } + } + + pub async fn process_webhook(&self, payload: Value, tx: &mpsc::Sender) -> Result<()> { + let webhook: HashMap = serde_json::from_value(payload)?; + if let Some(entry_array) = webhook.get("entry") { + if let Some(entries) = entry_array.as_array() { + for entry in entries { + if let Ok(e) = serde_json::from_value::(entry.clone()) { + for change in e.changes { + if let Some(messages) = change.value.messages { + for msg in messages { + let _ = self.process_message(msg, tx).await; + } + } + if let Some(statuses) = change.value.statuses { + for s in statuses { + debug!("Status {}: {} for {}", s.id, s.status, s.recipient_id); + } + } + } + } + } + } + } + Ok(()) + } + + async fn process_message(&self, message: WebhookMessage, tx: &mpsc::Sender) -> Result<()> { + if !self.is_sender_allowed(&message.from) { + warn!("Blocked WhatsApp from {}", message.from); + return Ok(()); + } + if !self.check_rate_limit(&message.from).await { + warn!("Rate limited: {}", message.from); + return Ok(()); + } + let content = if let Some(text) = message.text { text.body } + else if message.image.is_some() { "[Image]".into() } + else if message.document.is_some() { "[Document]".into() } + else { "[Unsupported]".into() }; + + let timestamp = message.timestamp.parse::().unwrap_or_else(|_| { + std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() + }); + + let _ = tx.send(ChannelMessage { + id: message.id, sender: message.from, content, + channel: "whatsapp".into(), timestamp, + }).await; + Ok(()) + } + + pub fn is_sender_allowed(&self, phone: &str) -> bool { + if self.config.allowed_numbers.is_empty() { return false; } + if self.config.allowed_numbers.iter().any(|a| a == "*") { return true; } + self.config.allowed_numbers.iter().any(|a| { + a.eq_ignore_ascii_case(phone) || phone.ends_with(a) || a.ends_with(phone) + }) + } + + pub async fn check_rate_limit(&self, phone: &str) -> bool { + let now = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs(); + let mut limiter = self.rate_limiter.write().await; + let timestamps = limiter.entry(phone.to_string()).or_default(); + timestamps.retain(|&t| now - t < 60); + if timestamps.len() >= self.config.rate_limit_per_minute as usize { return false; } + timestamps.push(now); + true + } +} + +#[async_trait] +impl Channel for WhatsAppChannel { + fn name(&self) -> &str { "whatsapp" } + + async fn send(&self, message: &str, recipient: &str) -> Result<()> { + let url = format!("{}/{}/messages", WHATSAPP_API_BASE, self.config.phone_number_id); + let body = json!({ + "messaging_product": "whatsapp", "to": recipient, + "type": "text", "text": {"body": message} + }); + let resp = self.client.post(&url) + .header("Authorization", format!("Bearer {}", self.config.access_token)) + .json(&body).send().await?; + if !resp.status().is_success() { + let err = resp.text().await?; + return Err(anyhow!("WhatsApp API: {}", err)); + } + info!("WhatsApp sent to {}", recipient); + Ok(()) + } + + async fn listen(&self, _tx: mpsc::Sender) -> Result<()> { + info!("WhatsApp webhook path: {}", self.config.webhook_path); + // Webhooks handled by gateway HTTP server — process_webhook() called externally + Ok(()) + } + + async fn health_check(&self) -> bool { + let url = format!("{}/{}", WHATSAPP_API_BASE, self.config.phone_number_id); + self.client.get(&url) + .header("Authorization", format!("Bearer {}", self.config.access_token)) + .send().await + .map(|r| r.status().is_success() || r.status().as_u16() == 404) + .unwrap_or(false) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn wildcard() -> WhatsAppConfig { + WhatsAppConfig { + phone_number_id: "123".into(), access_token: "tok".into(), + verify_token: "verify".into(), allowed_numbers: vec!["*".into()], + ..Default::default() + } + } + + #[test] fn name() { assert_eq!(WhatsAppChannel::new(wildcard()).name(), "whatsapp"); } + #[test] fn allow_wildcard() { assert!(WhatsAppChannel::new(wildcard()).is_sender_allowed("any")); } + #[test] fn deny_empty() { + let mut c = wildcard(); c.allowed_numbers = vec![]; + assert!(!WhatsAppChannel::new(c).is_sender_allowed("any")); + } + #[tokio::test] async fn verify_ok() { + let ch = WhatsAppChannel::new(wildcard()); + assert_eq!(ch.verify_webhook("subscribe", "verify", "ch").await.unwrap(), "ch"); + } + #[tokio::test] async fn verify_bad() { + assert!(WhatsAppChannel::new(wildcard()).verify_webhook("subscribe", "wrong", "c").await.is_err()); + } + #[tokio::test] async fn rate_limit() { + let mut c = wildcard(); c.rate_limit_per_minute = 2; + let ch = WhatsAppChannel::new(c); + assert!(ch.check_rate_limit("+1").await); + assert!(ch.check_rate_limit("+1").await); + assert!(!ch.check_rate_limit("+1").await); + } + #[tokio::test] async fn text_msg() { + let ch = WhatsAppChannel::new(wildcard()); + let (tx, mut rx) = mpsc::channel(10); + ch.process_webhook(json!({"entry":[{"changes":[{"value":{"messages":[{ + "from":"123","id":"m1","timestamp":"100","text":{"body":"hi"} + }]}}]}]}), &tx).await.unwrap(); + let m = rx.recv().await.unwrap(); + assert_eq!(m.content, "hi"); + assert_eq!(m.channel, "whatsapp"); + } +}