From 5d9e8705ac43265a38c2985b8e8468f20d840a70 Mon Sep 17 00:00:00 2001 From: Kieran Date: Wed, 18 Feb 2026 10:44:18 +0000 Subject: [PATCH] refactor(channel): replace hand-rolled IMAP with async-imap IDLE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the blocking, poll-based IMAP client with async-imap and IMAP IDLE (RFC 2177) for instant push delivery. Key changes: - Add async-imap dependency with tokio runtime feature - Rewrite connect/fetch/listen paths to fully async using tokio TLS - Implement IDLE loop with exponential backoff reconnection (1s–60s cap) - Add idle_timeout_secs config field (default 1740s per RFC 2177) - Convert health_check to async connect-and-logout with 10s timeout - Update affected tests from sync to #[tokio::test] SMTP send path, allowlist enforcement, and Channel trait contract are unchanged. --- Cargo.lock | 151 +++++++++- Cargo.toml | 7 +- src/channels/email_channel.rs | 552 ++++++++++++++++++++-------------- 3 files changed, 474 insertions(+), 236 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0a7b284..97819fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -145,6 +145,64 @@ dependencies = [ "object 0.37.3", ] +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-channel" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-compression" +version = "0.4.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68650b7df54f0293fd061972a0fb05aaf4fc0879d3b3d21a638a182c5c543b9f" +dependencies = [ + "compression-codecs", + "compression-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "async-imap" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a78dceaba06f029d8f4d7df20addd4b7370a30206e3926267ecda2915b0f3f66" +dependencies = [ + "async-channel 2.5.0", + "async-compression", + "base64", + "bytes", + "chrono", + "futures", + "imap-proto", + "log", + "nom 7.1.3", + "pin-project", + "pin-utils", + "self_cell", + "stop-token", + "thiserror 1.0.69", + "tokio", +] + [[package]] name = "async-io" version = "2.6.0" @@ -602,6 +660,22 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "compression-codecs" +version = "0.4.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00828ba6fd27b45a448e57dbfe84f1029d4c9f26b368157e9a448a5f49a2ec2a" +dependencies = [ + "compression-core", + "flate2", +] + +[[package]] +name = "compression-core" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d" + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -1130,6 +1204,33 @@ dependencies = [ "num-traits", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener 5.4.1", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -1875,6 +1976,15 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "365a784774bb381e8c19edb91190a90d7f2625e057b55de2bc0f6b57bc779ff2" +[[package]] +name = "imap-proto" +version = "0.16.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba1f9b30846c3d04371159ef3a0413ce7c1ae0a8c619cd255c60b3d902553f22" +dependencies = [ + "nom 7.1.3", +] + [[package]] name = "indexmap" version = "2.13.0" @@ -2041,7 +2151,7 @@ dependencies = [ "httpdate", "idna", "mime", - "nom", + "nom 8.0.0", "percent-encoding", "quoted_printable", "rustls", @@ -2156,7 +2266,7 @@ dependencies = [ "itoa", "log", "md-5", - "nom", + "nom 8.0.0", "nom_locate", "rand 0.9.2", "rangemap", @@ -2260,6 +2370,12 @@ dependencies = [ "unicase", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -2350,6 +2466,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nom" version = "8.0.0" @@ -2367,7 +2493,7 @@ checksum = "0b577e2d69827c4740cba2b52efaad1c4cc7c73042860b199710b3575c68438d" dependencies = [ "bytecount", "memchr", - "nom", + "nom 8.0.0", ] [[package]] @@ -3390,6 +3516,12 @@ dependencies = [ "libc", ] +[[package]] +name = "self_cell" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89" + [[package]] name = "semver" version = "1.0.27" @@ -3662,6 +3794,18 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "stop-token" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af91f480ee899ab2d9f8435bfdfc14d08a5754bd9d3fef1f1a1c23336aad6c8b" +dependencies = [ + "async-channel 1.9.0", + "cfg-if", + "futures-core", + "pin-project-lite", +] + [[package]] name = "stringprep" version = "0.1.5" @@ -5176,6 +5320,7 @@ name = "zeroclaw" version = "0.1.0" dependencies = [ "anyhow", + "async-imap", "async-trait", "axum", "base64", diff --git a/Cargo.toml b/Cargo.toml index 6c22173..c1bbccc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,13 +99,16 @@ futures-util = { version = "0.3", default-features = false, features = ["sink"] futures = "0.3" regex = "1.10" hostname = "0.4.2" -lettre = { version = "0.11.19", default-features = false, features = ["builder", "smtp-transport", "rustls-tls"] } -mail-parser = "0.11.2" rustls = "0.23" rustls-pki-types = "1.14.0" tokio-rustls = "0.26.4" webpki-roots = "1.0.6" +# email +lettre = { version = "0.11.19", default-features = false, features = ["builder", "smtp-transport", "rustls-tls"] } +mail-parser = "0.11.2" +async-imap = { version = "0.11",features = ["runtime-tokio"], default-features = false } + # HTTP server (gateway) — replaces raw TCP for proper HTTP/1.1 compliance axum = { version = "0.8", default-features = false, features = ["http1", "json", "tokio", "query", "ws"] } tower = { version = "0.5", default-features = false } diff --git a/src/channels/email_channel.rs b/src/channels/email_channel.rs index 8d06370..d701c1f 100644 --- a/src/channels/email_channel.rs +++ b/src/channels/email_channel.rs @@ -9,20 +9,27 @@ #![allow(clippy::unnecessary_map_or)] use anyhow::{anyhow, Result}; +use async_imap::extensions::idle::IdleResponse; +use async_imap::types::Fetch; +use async_imap::Session; use async_trait::async_trait; +use futures::TryStreamExt; use lettre::message::SinglePart; use lettre::transport::smtp::authentication::Credentials; use lettre::{Message, SmtpTransport, Transport}; use mail_parser::{MessageParser, MimeHeaders}; -use parking_lot::Mutex; +use rustls::{ClientConfig, RootCertStore}; +use rustls_pki_types::DnsName; use serde::{Deserialize, Serialize}; use std::collections::HashSet; -use std::io::Write as IoWrite; -use std::net::TcpStream; +use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use tokio::sync::mpsc; -use tokio::time::{interval, sleep}; -use tracing::{error, info, warn}; +use tokio::net::TcpStream; +use tokio::sync::{mpsc, Mutex}; +use tokio::time::{sleep, timeout}; +use tokio_rustls::client::TlsStream; +use tokio_rustls::TlsConnector; +use tracing::{debug, error, info, warn}; use uuid::Uuid; use super::traits::{Channel, ChannelMessage, SendMessage}; @@ -52,7 +59,11 @@ pub struct EmailConfig { pub password: String, /// From address for outgoing emails pub from_address: String, - /// Poll interval in seconds (default: 60) + /// IDLE timeout in seconds before re-establishing connection (default: 1740 = 29 minutes) + /// RFC 2177 recommends clients restart IDLE every 29 minutes + #[serde(default = "default_idle_timeout")] + pub idle_timeout_secs: u64, + /// Fallback poll interval in seconds when IDLE is not supported (default: 60) #[serde(default = "default_poll_interval")] pub poll_interval_secs: u64, /// Allowed sender addresses/domains (empty = deny all, ["*"] = allow all) @@ -69,6 +80,9 @@ fn default_smtp_port() -> u16 { fn default_imap_folder() -> String { "INBOX".into() } +fn default_idle_timeout() -> u64 { + 1740 // 29 minutes per RFC 2177 +} fn default_poll_interval() -> u64 { 60 } @@ -88,23 +102,26 @@ impl Default for EmailConfig { username: String::new(), password: String::new(), from_address: String::new(), + idle_timeout_secs: default_idle_timeout(), poll_interval_secs: default_poll_interval(), allowed_senders: Vec::new(), } } } -/// Email channel — IMAP polling for inbound, SMTP for outbound +type ImapSession = Session>; + +/// Email channel — IMAP IDLE for instant push notifications, SMTP for outbound pub struct EmailChannel { pub config: EmailConfig, - seen_messages: Mutex>, + seen_messages: Arc>>, } impl EmailChannel { pub fn new(config: EmailConfig) -> Self { Self { config, - seen_messages: Mutex::new(HashSet::new()), + seen_messages: Arc::new(Mutex::new(HashSet::new())), } } @@ -178,180 +195,267 @@ impl EmailChannel { "(no readable content)".to_string() } - fn build_imap_tls_config() -> Result> { - use rustls::ClientConfig as TlsConfig; - use std::sync::Arc; - use tokio_rustls::rustls; - - let mut root_store = rustls::RootCertStore::empty(); - root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); - - let crypto_provider = rustls::crypto::CryptoProvider::get_default() - .cloned() - .unwrap_or_else(|| Arc::new(rustls::crypto::ring::default_provider())); - - let tls_config = TlsConfig::builder_with_provider(crypto_provider) - .with_protocol_versions(rustls::DEFAULT_VERSIONS)? - .with_root_certificates(root_store) - .with_no_client_auth(); - - Ok(Arc::new(tls_config)) - } - - /// Fetch unseen emails via IMAP (blocking, run in spawn_blocking) - fn fetch_unseen_imap(config: &EmailConfig) -> Result> { - use rustls_pki_types::ServerName; - use tokio_rustls::rustls; + /// Connect to IMAP server with TLS and authenticate + async fn connect_imap(&self) -> Result { + let addr = format!("{}:{}", self.config.imap_host, self.config.imap_port); + debug!("Connecting to IMAP server at {}", addr); // Connect TCP - let tcp = TcpStream::connect((&*config.imap_host, config.imap_port))?; - tcp.set_read_timeout(Some(Duration::from_secs(30)))?; + let tcp = TcpStream::connect(&addr).await?; - // TLS - let tls_config = Self::build_imap_tls_config()?; - let server_name: ServerName<'_> = ServerName::try_from(config.imap_host.clone())?; - let conn = rustls::ClientConnection::new(tls_config, server_name)?; - let mut tls = rustls::StreamOwned::new(conn, tcp); - - let read_line = - |tls: &mut rustls::StreamOwned| -> Result { - let mut buf = Vec::new(); - loop { - let mut byte = [0u8; 1]; - match std::io::Read::read(tls, &mut byte) { - Ok(0) => return Err(anyhow!("IMAP connection closed")), - Ok(_) => { - buf.push(byte[0]); - if buf.ends_with(b"\r\n") { - return Ok(String::from_utf8_lossy(&buf).to_string()); - } - } - Err(e) => return Err(e.into()), - } - } - }; - - let send_cmd = |tls: &mut rustls::StreamOwned, - tag: &str, - cmd: &str| - -> Result> { - let full = format!("{} {}\r\n", tag, cmd); - IoWrite::write_all(tls, full.as_bytes())?; - IoWrite::flush(tls)?; - let mut lines = Vec::new(); - loop { - let line = read_line(tls)?; - let done = line.starts_with(tag); - lines.push(line); - if done { - break; - } - } - Ok(lines) + // Establish TLS using native-tls + let certs = RootCertStore { + roots: webpki_roots::TLS_SERVER_ROOTS.into(), }; + let config = ClientConfig::builder() + .with_root_certificates(certs) + .with_no_client_auth(); + let tls_stream: TlsConnector = Arc::new(config).into(); + let sni: DnsName = self.config.imap_host.clone().try_into()?; + let stream = tls_stream.connect(sni.into(), tcp).await?; - // Read greeting - let _greeting = read_line(&mut tls)?; + // Create IMAP client + let client = async_imap::Client::new(stream); // Login - let login_resp = send_cmd( - &mut tls, - "A1", - &format!("LOGIN \"{}\" \"{}\"", config.username, config.password), - )?; - if !login_resp.last().map_or(false, |l| l.contains("OK")) { - return Err(anyhow!("IMAP login failed")); + let session = client + .login(&self.config.username, &self.config.password) + .await + .map_err(|(e, _)| anyhow!("IMAP login failed: {}", e))?; + + debug!("IMAP login successful"); + Ok(session) + } + + /// Fetch and process unseen messages from the selected mailbox + async fn fetch_unseen(&self, session: &mut ImapSession) -> Result> { + // Search for unseen messages + let uids = session.uid_search("UNSEEN").await?; + if uids.is_empty() { + return Ok(Vec::new()); } - // Select folder - let _select = send_cmd( - &mut tls, - "A2", - &format!("SELECT \"{}\"", config.imap_folder), - )?; + debug!("Found {} unseen messages", uids.len()); - // Search unseen - let search_resp = send_cmd(&mut tls, "A3", "SEARCH UNSEEN")?; - let mut uids: Vec<&str> = Vec::new(); - for line in &search_resp { - if line.starts_with("* SEARCH") { - let parts: Vec<&str> = line.trim().split_whitespace().collect(); - if parts.len() > 2 { - uids.extend_from_slice(&parts[2..]); + let mut results = Vec::new(); + let uid_set: String = uids + .iter() + .map(|u| u.to_string()) + .collect::>() + .join(","); + + // Fetch message bodies + let messages = session.uid_fetch(&uid_set, "RFC822").await?; + let messages: Vec = messages.try_collect().await?; + + for msg in messages { + let uid = msg.uid.unwrap_or(0); + if let Some(body) = msg.body() { + if let Some(parsed) = MessageParser::default().parse(body) { + let sender = Self::extract_sender(&parsed); + let subject = parsed.subject().unwrap_or("(no subject)").to_string(); + let body_text = Self::extract_text(&parsed); + let content = format!("Subject: {}\n\n{}", subject, body_text); + let msg_id = parsed + .message_id() + .map(|s| s.to_string()) + .unwrap_or_else(|| format!("gen-{}", Uuid::new_v4())); + + #[allow(clippy::cast_sign_loss)] + let ts = parsed + .date() + .map(|d| { + let naive = chrono::NaiveDate::from_ymd_opt( + d.year as i32, + u32::from(d.month), + u32::from(d.day), + ) + .and_then(|date| { + date.and_hms_opt( + u32::from(d.hour), + u32::from(d.minute), + u32::from(d.second), + ) + }); + naive.map_or(0, |n| n.and_utc().timestamp() as u64) + }) + .unwrap_or_else(|| { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0) + }); + + results.push(ParsedEmail { + _uid: uid, + msg_id, + sender, + content, + timestamp: ts, + }); } } } - let mut results = Vec::new(); - let mut tag_counter = 4_u32; // Start after A1, A2, A3 - - for uid in &uids { - // Fetch RFC822 with unique tag - let fetch_tag = format!("A{}", tag_counter); - tag_counter += 1; - let fetch_resp = send_cmd(&mut tls, &fetch_tag, &format!("FETCH {} RFC822", uid))?; - // Reconstruct the raw email from the response (skip first and last lines) - let raw: String = fetch_resp - .iter() - .skip(1) - .take(fetch_resp.len().saturating_sub(2)) - .cloned() - .collect(); - - if let Some(parsed) = MessageParser::default().parse(raw.as_bytes()) { - let sender = Self::extract_sender(&parsed); - let subject = parsed.subject().unwrap_or("(no subject)").to_string(); - let body = Self::extract_text(&parsed); - let content = format!("Subject: {}\n\n{}", subject, body); - let msg_id = parsed - .message_id() - .map(|s| s.to_string()) - .unwrap_or_else(|| format!("gen-{}", Uuid::new_v4())); - #[allow(clippy::cast_sign_loss)] - let ts = parsed - .date() - .map(|d| { - let naive = chrono::NaiveDate::from_ymd_opt( - d.year as i32, - u32::from(d.month), - u32::from(d.day), - ) - .and_then(|date| { - date.and_hms_opt( - u32::from(d.hour), - u32::from(d.minute), - u32::from(d.second), - ) - }); - naive.map_or(0, |n| n.and_utc().timestamp() as u64) - }) - .unwrap_or_else(|| { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_secs()) - .unwrap_or(0) - }); - - results.push((msg_id, sender, content, ts)); - } - - // Mark as seen with unique tag - let store_tag = format!("A{tag_counter}"); - tag_counter += 1; - let _ = send_cmd( - &mut tls, - &store_tag, - &format!("STORE {uid} +FLAGS (\\Seen)"), - ); + // Mark fetched messages as seen + if !results.is_empty() { + let _ = session + .uid_store(&uid_set, "+FLAGS (\\Seen)") + .await? + .try_collect::>() + .await; } - // Logout with unique tag - let logout_tag = format!("A{tag_counter}"); - let _ = send_cmd(&mut tls, &logout_tag, "LOGOUT"); - Ok(results) } + /// Run the IDLE loop, returning when a new message arrives or timeout + /// Note: IDLE consumes the session and returns it via done() + async fn wait_for_changes( + &self, + session: ImapSession, + ) -> Result<(IdleWaitResult, ImapSession)> { + let idle_timeout = Duration::from_secs(self.config.idle_timeout_secs); + + // Start IDLE mode - this consumes the session + let mut idle = session.idle(); + idle.init().await?; + + debug!("Entering IMAP IDLE mode"); + + // wait() returns (future, stop_source) - we only need the future + let (wait_future, _stop_source) = idle.wait(); + + // Wait for server notification or timeout + let result = timeout(idle_timeout, wait_future).await; + + match result { + Ok(Ok(response)) => { + debug!("IDLE response: {:?}", response); + // Done with IDLE, return session to normal mode + let session = idle.done().await?; + let wait_result = match response { + IdleResponse::NewData(_) => IdleWaitResult::NewMail, + IdleResponse::Timeout => IdleWaitResult::Timeout, + IdleResponse::ManualInterrupt => IdleWaitResult::Interrupted, + }; + Ok((wait_result, session)) + } + Ok(Err(e)) => { + // Try to clean up IDLE state + let _ = idle.done().await; + Err(anyhow!("IDLE error: {}", e)) + } + Err(_) => { + // Timeout - RFC 2177 recommends restarting IDLE every 29 minutes + debug!("IDLE timeout reached, will re-establish"); + let session = idle.done().await?; + Ok((IdleWaitResult::Timeout, session)) + } + } + } + + /// Main IDLE-based listen loop with automatic reconnection + async fn listen_with_idle(&self, tx: mpsc::Sender) -> Result<()> { + let mut backoff = Duration::from_secs(1); + let max_backoff = Duration::from_secs(60); + + loop { + match self.run_idle_session(&tx).await { + Ok(()) => { + // Clean exit (channel closed) + return Ok(()); + } + Err(e) => { + error!( + "IMAP session error: {}. Reconnecting in {:?}...", + e, backoff + ); + sleep(backoff).await; + // Exponential backoff with cap + backoff = std::cmp::min(backoff * 2, max_backoff); + } + } + } + } + + /// Run a single IDLE session until error or clean shutdown + async fn run_idle_session(&self, tx: &mpsc::Sender) -> Result<()> { + // Connect and authenticate + let mut session = self.connect_imap().await?; + + // Select the mailbox + session.select(&self.config.imap_folder).await?; + info!( + "Email IDLE listening on {} (instant push enabled)", + self.config.imap_folder + ); + + // Check for existing unseen messages first + self.process_unseen(&mut session, tx).await?; + + loop { + // Enter IDLE and wait for changes (consumes session, returns it via result) + match self.wait_for_changes(session).await { + Ok((IdleWaitResult::NewMail, returned_session)) => { + debug!("New mail notification received"); + session = returned_session; + self.process_unseen(&mut session, tx).await?; + } + Ok((IdleWaitResult::Timeout, returned_session)) => { + // Re-check for mail after IDLE timeout (defensive) + session = returned_session; + self.process_unseen(&mut session, tx).await?; + } + Ok((IdleWaitResult::Interrupted, _)) => { + info!("IDLE interrupted, exiting"); + return Ok(()); + } + Err(e) => { + // Connection likely broken, need to reconnect + return Err(e); + } + } + } + } + + /// Fetch unseen messages and send to channel + async fn process_unseen( + &self, + session: &mut ImapSession, + tx: &mpsc::Sender, + ) -> Result<()> { + let messages = self.fetch_unseen(session).await?; + + let mut seen = self.seen_messages.lock().await; + for email in messages { + if seen.contains(&email.msg_id) { + continue; + } + // Check allowlist + if !self.is_sender_allowed(&email.sender) { + warn!("Blocked email from {}", email.sender); + continue; + } + seen.insert(email.msg_id.clone()); + let msg = ChannelMessage { + id: email.msg_id, + reply_target: email.sender.clone(), + sender: email.sender, + content: email.content, + channel: "email".to_string(), + timestamp: email.timestamp, + }; + + if tx.send(msg).await.is_err() { + // Channel closed, exit cleanly + return Ok(()); + } + } + + Ok(()) + } + fn create_smtp_transport(&self) -> Result { let creds = Credentials::new(self.config.username.clone(), self.config.password.clone()); let transport = if self.config.smtp_tls { @@ -369,6 +473,22 @@ impl EmailChannel { } } +/// Internal struct for parsed email data +struct ParsedEmail { + _uid: u32, + msg_id: String, + sender: String, + content: String, + timestamp: u64, +} + +/// Result from waiting on IDLE +enum IdleWaitResult { + NewMail, + Timeout, + Interrupted, +} + #[async_trait] impl Channel for EmailChannel { fn name(&self) -> &str { @@ -403,62 +523,29 @@ impl Channel for EmailChannel { async fn listen(&self, tx: mpsc::Sender) -> Result<()> { info!( - "Email polling every {}s on {}", - self.config.poll_interval_secs, self.config.imap_folder + "Starting email channel with IDLE support on {}", + self.config.imap_folder ); - let mut tick = interval(Duration::from_secs(self.config.poll_interval_secs)); - let config = self.config.clone(); - - loop { - tick.tick().await; - let cfg = config.clone(); - match tokio::task::spawn_blocking(move || Self::fetch_unseen_imap(&cfg)).await { - Ok(Ok(messages)) => { - for (id, sender, content, ts) in messages { - { - let mut seen = self.seen_messages.lock(); - if seen.contains(&id) { - continue; - } - if !self.is_sender_allowed(&sender) { - warn!("Blocked email from {}", sender); - continue; - } - seen.insert(id.clone()); - } // MutexGuard dropped before await - let msg = ChannelMessage { - id, - reply_target: sender.clone(), - sender, - content, - channel: "email".to_string(), - timestamp: ts, - }; - if tx.send(msg).await.is_err() { - return Ok(()); - } - } - } - Ok(Err(e)) => { - error!("Email poll failed: {}", e); - sleep(Duration::from_secs(10)).await; - } - Err(e) => { - error!("Email poll task panicked: {}", e); - sleep(Duration::from_secs(10)).await; - } - } - } + self.listen_with_idle(tx).await } async fn health_check(&self) -> bool { - let cfg = self.config.clone(); - tokio::task::spawn_blocking(move || { - let tcp = TcpStream::connect((&*cfg.imap_host, cfg.imap_port)); - tcp.is_ok() - }) - .await - .unwrap_or_default() + // Fully async health check - attempt IMAP connection + match timeout(Duration::from_secs(10), self.connect_imap()).await { + Ok(Ok(mut session)) => { + // Try to logout cleanly + let _ = session.logout().await; + true + } + Ok(Err(e)) => { + debug!("Health check failed: {}", e); + false + } + Err(_) => { + debug!("Health check timed out"); + false + } + } } } @@ -479,23 +566,21 @@ mod tests { } #[test] - fn build_imap_tls_config_succeeds() { - let tls_config = - EmailChannel::build_imap_tls_config().expect("TLS config construction should succeed"); - assert_eq!(std::sync::Arc::strong_count(&tls_config), 1); + fn default_idle_timeout_is_29_minutes() { + assert_eq!(default_idle_timeout(), 1740); } - #[test] - fn seen_messages_starts_empty() { + #[tokio::test] + async fn seen_messages_starts_empty() { let channel = EmailChannel::new(EmailConfig::default()); - let seen = channel.seen_messages.lock(); + let seen = channel.seen_messages.lock().await; assert!(seen.is_empty()); } - #[test] - fn seen_messages_tracks_unique_ids() { + #[tokio::test] + async fn seen_messages_tracks_unique_ids() { let channel = EmailChannel::new(EmailConfig::default()); - let mut seen = channel.seen_messages.lock(); + let mut seen = channel.seen_messages.lock().await; assert!(seen.insert("first-id".to_string())); assert!(!seen.insert("first-id".to_string())); @@ -517,6 +602,7 @@ mod tests { assert_eq!(config.username, ""); assert_eq!(config.password, ""); assert_eq!(config.from_address, ""); + assert_eq!(config.idle_timeout_secs, 1740); assert_eq!(config.poll_interval_secs, 60); assert!(config.allowed_senders.is_empty()); } @@ -533,12 +619,13 @@ mod tests { username: "user@example.com".to_string(), password: "pass123".to_string(), from_address: "bot@example.com".to_string(), + idle_timeout_secs: 1200, poll_interval_secs: 30, allowed_senders: vec!["allowed@example.com".to_string()], }; assert_eq!(config.imap_host, "imap.example.com"); assert_eq!(config.imap_folder, "Archive"); - assert_eq!(config.poll_interval_secs, 30); + assert_eq!(config.idle_timeout_secs, 1200); } #[test] @@ -553,6 +640,7 @@ mod tests { username: "user@test.com".to_string(), password: "secret".to_string(), from_address: "bot@test.com".to_string(), + idle_timeout_secs: 1740, poll_interval_secs: 120, allowed_senders: vec!["*".to_string()], }; @@ -564,13 +652,13 @@ mod tests { // EmailChannel tests - #[test] - fn email_channel_new() { + #[tokio::test] + async fn email_channel_new() { let config = EmailConfig::default(); let channel = EmailChannel::new(config.clone()); assert_eq!(channel.config.imap_host, config.imap_host); - let seen_guard = channel.seen_messages.lock(); + let seen_guard = channel.seen_messages.lock().await; assert_eq!(seen_guard.len(), 0); } @@ -803,6 +891,7 @@ mod tests { username: "user@example.com".to_string(), password: "password123".to_string(), from_address: "bot@example.com".to_string(), + idle_timeout_secs: 1740, poll_interval_secs: 30, allowed_senders: vec!["allowed@example.com".to_string()], }; @@ -829,6 +918,7 @@ mod tests { assert_eq!(config.imap_port, 993); // default assert_eq!(config.smtp_port, 465); // default assert!(config.smtp_tls); // default + assert_eq!(config.idle_timeout_secs, 1740); // default assert_eq!(config.poll_interval_secs, 60); // default }