refactor(channel): replace hand-rolled IMAP with async-imap IDLE

Replace the blocking, poll-based IMAP client with async-imap and
IMAP IDLE (RFC 2177) for instant push delivery. Key changes:

- Add async-imap dependency with tokio runtime feature
- Rewrite connect/fetch/listen paths to fully async using tokio TLS
- Implement IDLE loop with exponential backoff reconnection (1s–60s cap)
- Add idle_timeout_secs config field (default 1740s per RFC 2177)
- Convert health_check to async connect-and-logout with 10s timeout
- Update affected tests from sync to #[tokio::test]

SMTP send path, allowlist enforcement, and Channel trait contract
are unchanged.
This commit is contained in:
Kieran 2026-02-18 10:44:18 +00:00 committed by Chummy
parent 08ea559c21
commit 5d9e8705ac
3 changed files with 474 additions and 236 deletions

View file

@ -9,20 +9,27 @@
#![allow(clippy::unnecessary_map_or)]
use anyhow::{anyhow, Result};
use async_imap::extensions::idle::IdleResponse;
use async_imap::types::Fetch;
use async_imap::Session;
use async_trait::async_trait;
use futures::TryStreamExt;
use lettre::message::SinglePart;
use lettre::transport::smtp::authentication::Credentials;
use lettre::{Message, SmtpTransport, Transport};
use mail_parser::{MessageParser, MimeHeaders};
use parking_lot::Mutex;
use rustls::{ClientConfig, RootCertStore};
use rustls_pki_types::DnsName;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::io::Write as IoWrite;
use std::net::TcpStream;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use tokio::time::{interval, sleep};
use tracing::{error, info, warn};
use tokio::net::TcpStream;
use tokio::sync::{mpsc, Mutex};
use tokio::time::{sleep, timeout};
use tokio_rustls::client::TlsStream;
use tokio_rustls::TlsConnector;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use super::traits::{Channel, ChannelMessage, SendMessage};
@ -52,7 +59,11 @@ pub struct EmailConfig {
pub password: String,
/// From address for outgoing emails
pub from_address: String,
/// Poll interval in seconds (default: 60)
/// IDLE timeout in seconds before re-establishing connection (default: 1740 = 29 minutes)
/// RFC 2177 recommends clients restart IDLE every 29 minutes
#[serde(default = "default_idle_timeout")]
pub idle_timeout_secs: u64,
/// Fallback poll interval in seconds when IDLE is not supported (default: 60)
#[serde(default = "default_poll_interval")]
pub poll_interval_secs: u64,
/// Allowed sender addresses/domains (empty = deny all, ["*"] = allow all)
@ -69,6 +80,9 @@ fn default_smtp_port() -> u16 {
fn default_imap_folder() -> String {
"INBOX".into()
}
fn default_idle_timeout() -> u64 {
1740 // 29 minutes per RFC 2177
}
fn default_poll_interval() -> u64 {
60
}
@ -88,23 +102,26 @@ impl Default for EmailConfig {
username: String::new(),
password: String::new(),
from_address: String::new(),
idle_timeout_secs: default_idle_timeout(),
poll_interval_secs: default_poll_interval(),
allowed_senders: Vec::new(),
}
}
}
/// Email channel — IMAP polling for inbound, SMTP for outbound
type ImapSession = Session<TlsStream<TcpStream>>;
/// Email channel — IMAP IDLE for instant push notifications, SMTP for outbound
pub struct EmailChannel {
pub config: EmailConfig,
seen_messages: Mutex<HashSet<String>>,
seen_messages: Arc<Mutex<HashSet<String>>>,
}
impl EmailChannel {
pub fn new(config: EmailConfig) -> Self {
Self {
config,
seen_messages: Mutex::new(HashSet::new()),
seen_messages: Arc::new(Mutex::new(HashSet::new())),
}
}
@ -178,180 +195,267 @@ impl EmailChannel {
"(no readable content)".to_string()
}
fn build_imap_tls_config() -> Result<std::sync::Arc<tokio_rustls::rustls::ClientConfig>> {
use rustls::ClientConfig as TlsConfig;
use std::sync::Arc;
use tokio_rustls::rustls;
let mut root_store = rustls::RootCertStore::empty();
root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
let crypto_provider = rustls::crypto::CryptoProvider::get_default()
.cloned()
.unwrap_or_else(|| Arc::new(rustls::crypto::ring::default_provider()));
let tls_config = TlsConfig::builder_with_provider(crypto_provider)
.with_protocol_versions(rustls::DEFAULT_VERSIONS)?
.with_root_certificates(root_store)
.with_no_client_auth();
Ok(Arc::new(tls_config))
}
/// Fetch unseen emails via IMAP (blocking, run in spawn_blocking)
fn fetch_unseen_imap(config: &EmailConfig) -> Result<Vec<(String, String, String, u64)>> {
use rustls_pki_types::ServerName;
use tokio_rustls::rustls;
/// Connect to IMAP server with TLS and authenticate
async fn connect_imap(&self) -> Result<ImapSession> {
let addr = format!("{}:{}", self.config.imap_host, self.config.imap_port);
debug!("Connecting to IMAP server at {}", addr);
// Connect TCP
let tcp = TcpStream::connect((&*config.imap_host, config.imap_port))?;
tcp.set_read_timeout(Some(Duration::from_secs(30)))?;
let tcp = TcpStream::connect(&addr).await?;
// TLS
let tls_config = Self::build_imap_tls_config()?;
let server_name: ServerName<'_> = ServerName::try_from(config.imap_host.clone())?;
let conn = rustls::ClientConnection::new(tls_config, server_name)?;
let mut tls = rustls::StreamOwned::new(conn, tcp);
let read_line =
|tls: &mut rustls::StreamOwned<rustls::ClientConnection, TcpStream>| -> Result<String> {
let mut buf = Vec::new();
loop {
let mut byte = [0u8; 1];
match std::io::Read::read(tls, &mut byte) {
Ok(0) => return Err(anyhow!("IMAP connection closed")),
Ok(_) => {
buf.push(byte[0]);
if buf.ends_with(b"\r\n") {
return Ok(String::from_utf8_lossy(&buf).to_string());
}
}
Err(e) => return Err(e.into()),
}
}
};
let send_cmd = |tls: &mut rustls::StreamOwned<rustls::ClientConnection, TcpStream>,
tag: &str,
cmd: &str|
-> Result<Vec<String>> {
let full = format!("{} {}\r\n", tag, cmd);
IoWrite::write_all(tls, full.as_bytes())?;
IoWrite::flush(tls)?;
let mut lines = Vec::new();
loop {
let line = read_line(tls)?;
let done = line.starts_with(tag);
lines.push(line);
if done {
break;
}
}
Ok(lines)
// Establish TLS using native-tls
let certs = RootCertStore {
roots: webpki_roots::TLS_SERVER_ROOTS.into(),
};
let config = ClientConfig::builder()
.with_root_certificates(certs)
.with_no_client_auth();
let tls_stream: TlsConnector = Arc::new(config).into();
let sni: DnsName = self.config.imap_host.clone().try_into()?;
let stream = tls_stream.connect(sni.into(), tcp).await?;
// Read greeting
let _greeting = read_line(&mut tls)?;
// Create IMAP client
let client = async_imap::Client::new(stream);
// Login
let login_resp = send_cmd(
&mut tls,
"A1",
&format!("LOGIN \"{}\" \"{}\"", config.username, config.password),
)?;
if !login_resp.last().map_or(false, |l| l.contains("OK")) {
return Err(anyhow!("IMAP login failed"));
let session = client
.login(&self.config.username, &self.config.password)
.await
.map_err(|(e, _)| anyhow!("IMAP login failed: {}", e))?;
debug!("IMAP login successful");
Ok(session)
}
/// Fetch and process unseen messages from the selected mailbox
async fn fetch_unseen(&self, session: &mut ImapSession) -> Result<Vec<ParsedEmail>> {
// Search for unseen messages
let uids = session.uid_search("UNSEEN").await?;
if uids.is_empty() {
return Ok(Vec::new());
}
// Select folder
let _select = send_cmd(
&mut tls,
"A2",
&format!("SELECT \"{}\"", config.imap_folder),
)?;
debug!("Found {} unseen messages", uids.len());
// Search unseen
let search_resp = send_cmd(&mut tls, "A3", "SEARCH UNSEEN")?;
let mut uids: Vec<&str> = Vec::new();
for line in &search_resp {
if line.starts_with("* SEARCH") {
let parts: Vec<&str> = line.trim().split_whitespace().collect();
if parts.len() > 2 {
uids.extend_from_slice(&parts[2..]);
let mut results = Vec::new();
let uid_set: String = uids
.iter()
.map(|u| u.to_string())
.collect::<Vec<_>>()
.join(",");
// Fetch message bodies
let messages = session.uid_fetch(&uid_set, "RFC822").await?;
let messages: Vec<Fetch> = messages.try_collect().await?;
for msg in messages {
let uid = msg.uid.unwrap_or(0);
if let Some(body) = msg.body() {
if let Some(parsed) = MessageParser::default().parse(body) {
let sender = Self::extract_sender(&parsed);
let subject = parsed.subject().unwrap_or("(no subject)").to_string();
let body_text = Self::extract_text(&parsed);
let content = format!("Subject: {}\n\n{}", subject, body_text);
let msg_id = parsed
.message_id()
.map(|s| s.to_string())
.unwrap_or_else(|| format!("gen-{}", Uuid::new_v4()));
#[allow(clippy::cast_sign_loss)]
let ts = parsed
.date()
.map(|d| {
let naive = chrono::NaiveDate::from_ymd_opt(
d.year as i32,
u32::from(d.month),
u32::from(d.day),
)
.and_then(|date| {
date.and_hms_opt(
u32::from(d.hour),
u32::from(d.minute),
u32::from(d.second),
)
});
naive.map_or(0, |n| n.and_utc().timestamp() as u64)
})
.unwrap_or_else(|| {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
});
results.push(ParsedEmail {
_uid: uid,
msg_id,
sender,
content,
timestamp: ts,
});
}
}
}
let mut results = Vec::new();
let mut tag_counter = 4_u32; // Start after A1, A2, A3
for uid in &uids {
// Fetch RFC822 with unique tag
let fetch_tag = format!("A{}", tag_counter);
tag_counter += 1;
let fetch_resp = send_cmd(&mut tls, &fetch_tag, &format!("FETCH {} RFC822", uid))?;
// Reconstruct the raw email from the response (skip first and last lines)
let raw: String = fetch_resp
.iter()
.skip(1)
.take(fetch_resp.len().saturating_sub(2))
.cloned()
.collect();
if let Some(parsed) = MessageParser::default().parse(raw.as_bytes()) {
let sender = Self::extract_sender(&parsed);
let subject = parsed.subject().unwrap_or("(no subject)").to_string();
let body = Self::extract_text(&parsed);
let content = format!("Subject: {}\n\n{}", subject, body);
let msg_id = parsed
.message_id()
.map(|s| s.to_string())
.unwrap_or_else(|| format!("gen-{}", Uuid::new_v4()));
#[allow(clippy::cast_sign_loss)]
let ts = parsed
.date()
.map(|d| {
let naive = chrono::NaiveDate::from_ymd_opt(
d.year as i32,
u32::from(d.month),
u32::from(d.day),
)
.and_then(|date| {
date.and_hms_opt(
u32::from(d.hour),
u32::from(d.minute),
u32::from(d.second),
)
});
naive.map_or(0, |n| n.and_utc().timestamp() as u64)
})
.unwrap_or_else(|| {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
});
results.push((msg_id, sender, content, ts));
}
// Mark as seen with unique tag
let store_tag = format!("A{tag_counter}");
tag_counter += 1;
let _ = send_cmd(
&mut tls,
&store_tag,
&format!("STORE {uid} +FLAGS (\\Seen)"),
);
// Mark fetched messages as seen
if !results.is_empty() {
let _ = session
.uid_store(&uid_set, "+FLAGS (\\Seen)")
.await?
.try_collect::<Vec<_>>()
.await;
}
// Logout with unique tag
let logout_tag = format!("A{tag_counter}");
let _ = send_cmd(&mut tls, &logout_tag, "LOGOUT");
Ok(results)
}
/// Run the IDLE loop, returning when a new message arrives or timeout
/// Note: IDLE consumes the session and returns it via done()
async fn wait_for_changes(
&self,
session: ImapSession,
) -> Result<(IdleWaitResult, ImapSession)> {
let idle_timeout = Duration::from_secs(self.config.idle_timeout_secs);
// Start IDLE mode - this consumes the session
let mut idle = session.idle();
idle.init().await?;
debug!("Entering IMAP IDLE mode");
// wait() returns (future, stop_source) - we only need the future
let (wait_future, _stop_source) = idle.wait();
// Wait for server notification or timeout
let result = timeout(idle_timeout, wait_future).await;
match result {
Ok(Ok(response)) => {
debug!("IDLE response: {:?}", response);
// Done with IDLE, return session to normal mode
let session = idle.done().await?;
let wait_result = match response {
IdleResponse::NewData(_) => IdleWaitResult::NewMail,
IdleResponse::Timeout => IdleWaitResult::Timeout,
IdleResponse::ManualInterrupt => IdleWaitResult::Interrupted,
};
Ok((wait_result, session))
}
Ok(Err(e)) => {
// Try to clean up IDLE state
let _ = idle.done().await;
Err(anyhow!("IDLE error: {}", e))
}
Err(_) => {
// Timeout - RFC 2177 recommends restarting IDLE every 29 minutes
debug!("IDLE timeout reached, will re-establish");
let session = idle.done().await?;
Ok((IdleWaitResult::Timeout, session))
}
}
}
/// Main IDLE-based listen loop with automatic reconnection
async fn listen_with_idle(&self, tx: mpsc::Sender<ChannelMessage>) -> Result<()> {
let mut backoff = Duration::from_secs(1);
let max_backoff = Duration::from_secs(60);
loop {
match self.run_idle_session(&tx).await {
Ok(()) => {
// Clean exit (channel closed)
return Ok(());
}
Err(e) => {
error!(
"IMAP session error: {}. Reconnecting in {:?}...",
e, backoff
);
sleep(backoff).await;
// Exponential backoff with cap
backoff = std::cmp::min(backoff * 2, max_backoff);
}
}
}
}
/// Run a single IDLE session until error or clean shutdown
async fn run_idle_session(&self, tx: &mpsc::Sender<ChannelMessage>) -> Result<()> {
// Connect and authenticate
let mut session = self.connect_imap().await?;
// Select the mailbox
session.select(&self.config.imap_folder).await?;
info!(
"Email IDLE listening on {} (instant push enabled)",
self.config.imap_folder
);
// Check for existing unseen messages first
self.process_unseen(&mut session, tx).await?;
loop {
// Enter IDLE and wait for changes (consumes session, returns it via result)
match self.wait_for_changes(session).await {
Ok((IdleWaitResult::NewMail, returned_session)) => {
debug!("New mail notification received");
session = returned_session;
self.process_unseen(&mut session, tx).await?;
}
Ok((IdleWaitResult::Timeout, returned_session)) => {
// Re-check for mail after IDLE timeout (defensive)
session = returned_session;
self.process_unseen(&mut session, tx).await?;
}
Ok((IdleWaitResult::Interrupted, _)) => {
info!("IDLE interrupted, exiting");
return Ok(());
}
Err(e) => {
// Connection likely broken, need to reconnect
return Err(e);
}
}
}
}
/// Fetch unseen messages and send to channel
async fn process_unseen(
&self,
session: &mut ImapSession,
tx: &mpsc::Sender<ChannelMessage>,
) -> Result<()> {
let messages = self.fetch_unseen(session).await?;
let mut seen = self.seen_messages.lock().await;
for email in messages {
if seen.contains(&email.msg_id) {
continue;
}
// Check allowlist
if !self.is_sender_allowed(&email.sender) {
warn!("Blocked email from {}", email.sender);
continue;
}
seen.insert(email.msg_id.clone());
let msg = ChannelMessage {
id: email.msg_id,
reply_target: email.sender.clone(),
sender: email.sender,
content: email.content,
channel: "email".to_string(),
timestamp: email.timestamp,
};
if tx.send(msg).await.is_err() {
// Channel closed, exit cleanly
return Ok(());
}
}
Ok(())
}
fn create_smtp_transport(&self) -> Result<SmtpTransport> {
let creds = Credentials::new(self.config.username.clone(), self.config.password.clone());
let transport = if self.config.smtp_tls {
@ -369,6 +473,22 @@ impl EmailChannel {
}
}
/// Internal struct for parsed email data
struct ParsedEmail {
_uid: u32,
msg_id: String,
sender: String,
content: String,
timestamp: u64,
}
/// Result from waiting on IDLE
enum IdleWaitResult {
NewMail,
Timeout,
Interrupted,
}
#[async_trait]
impl Channel for EmailChannel {
fn name(&self) -> &str {
@ -403,62 +523,29 @@ impl Channel for EmailChannel {
async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> Result<()> {
info!(
"Email polling every {}s on {}",
self.config.poll_interval_secs, self.config.imap_folder
"Starting email channel with IDLE support on {}",
self.config.imap_folder
);
let mut tick = interval(Duration::from_secs(self.config.poll_interval_secs));
let config = self.config.clone();
loop {
tick.tick().await;
let cfg = config.clone();
match tokio::task::spawn_blocking(move || Self::fetch_unseen_imap(&cfg)).await {
Ok(Ok(messages)) => {
for (id, sender, content, ts) in messages {
{
let mut seen = self.seen_messages.lock();
if seen.contains(&id) {
continue;
}
if !self.is_sender_allowed(&sender) {
warn!("Blocked email from {}", sender);
continue;
}
seen.insert(id.clone());
} // MutexGuard dropped before await
let msg = ChannelMessage {
id,
reply_target: sender.clone(),
sender,
content,
channel: "email".to_string(),
timestamp: ts,
};
if tx.send(msg).await.is_err() {
return Ok(());
}
}
}
Ok(Err(e)) => {
error!("Email poll failed: {}", e);
sleep(Duration::from_secs(10)).await;
}
Err(e) => {
error!("Email poll task panicked: {}", e);
sleep(Duration::from_secs(10)).await;
}
}
}
self.listen_with_idle(tx).await
}
async fn health_check(&self) -> bool {
let cfg = self.config.clone();
tokio::task::spawn_blocking(move || {
let tcp = TcpStream::connect((&*cfg.imap_host, cfg.imap_port));
tcp.is_ok()
})
.await
.unwrap_or_default()
// Fully async health check - attempt IMAP connection
match timeout(Duration::from_secs(10), self.connect_imap()).await {
Ok(Ok(mut session)) => {
// Try to logout cleanly
let _ = session.logout().await;
true
}
Ok(Err(e)) => {
debug!("Health check failed: {}", e);
false
}
Err(_) => {
debug!("Health check timed out");
false
}
}
}
}
@ -479,23 +566,21 @@ mod tests {
}
#[test]
fn build_imap_tls_config_succeeds() {
let tls_config =
EmailChannel::build_imap_tls_config().expect("TLS config construction should succeed");
assert_eq!(std::sync::Arc::strong_count(&tls_config), 1);
fn default_idle_timeout_is_29_minutes() {
assert_eq!(default_idle_timeout(), 1740);
}
#[test]
fn seen_messages_starts_empty() {
#[tokio::test]
async fn seen_messages_starts_empty() {
let channel = EmailChannel::new(EmailConfig::default());
let seen = channel.seen_messages.lock();
let seen = channel.seen_messages.lock().await;
assert!(seen.is_empty());
}
#[test]
fn seen_messages_tracks_unique_ids() {
#[tokio::test]
async fn seen_messages_tracks_unique_ids() {
let channel = EmailChannel::new(EmailConfig::default());
let mut seen = channel.seen_messages.lock();
let mut seen = channel.seen_messages.lock().await;
assert!(seen.insert("first-id".to_string()));
assert!(!seen.insert("first-id".to_string()));
@ -517,6 +602,7 @@ mod tests {
assert_eq!(config.username, "");
assert_eq!(config.password, "");
assert_eq!(config.from_address, "");
assert_eq!(config.idle_timeout_secs, 1740);
assert_eq!(config.poll_interval_secs, 60);
assert!(config.allowed_senders.is_empty());
}
@ -533,12 +619,13 @@ mod tests {
username: "user@example.com".to_string(),
password: "pass123".to_string(),
from_address: "bot@example.com".to_string(),
idle_timeout_secs: 1200,
poll_interval_secs: 30,
allowed_senders: vec!["allowed@example.com".to_string()],
};
assert_eq!(config.imap_host, "imap.example.com");
assert_eq!(config.imap_folder, "Archive");
assert_eq!(config.poll_interval_secs, 30);
assert_eq!(config.idle_timeout_secs, 1200);
}
#[test]
@ -553,6 +640,7 @@ mod tests {
username: "user@test.com".to_string(),
password: "secret".to_string(),
from_address: "bot@test.com".to_string(),
idle_timeout_secs: 1740,
poll_interval_secs: 120,
allowed_senders: vec!["*".to_string()],
};
@ -564,13 +652,13 @@ mod tests {
// EmailChannel tests
#[test]
fn email_channel_new() {
#[tokio::test]
async fn email_channel_new() {
let config = EmailConfig::default();
let channel = EmailChannel::new(config.clone());
assert_eq!(channel.config.imap_host, config.imap_host);
let seen_guard = channel.seen_messages.lock();
let seen_guard = channel.seen_messages.lock().await;
assert_eq!(seen_guard.len(), 0);
}
@ -803,6 +891,7 @@ mod tests {
username: "user@example.com".to_string(),
password: "password123".to_string(),
from_address: "bot@example.com".to_string(),
idle_timeout_secs: 1740,
poll_interval_secs: 30,
allowed_senders: vec!["allowed@example.com".to_string()],
};
@ -829,6 +918,7 @@ mod tests {
assert_eq!(config.imap_port, 993); // default
assert_eq!(config.smtp_port, 465); // default
assert!(config.smtp_tls); // default
assert_eq!(config.idle_timeout_secs, 1740); // default
assert_eq!(config.poll_interval_secs, 60); // default
}