From 7db71de043500f3d42a8eb28ebebd5cfc2a91aa2 Mon Sep 17 00:00:00 2001 From: fettpl <38704082+fettpl@users.noreply.github.com> Date: Mon, 16 Feb 2026 16:53:42 +0100 Subject: [PATCH] fix(channels): bound email seen_messages set to prevent memory leak Replace unbounded HashSet with a BoundedSeenSet that evicts the oldest message IDs (FIFO) when the 100k capacity is reached. This prevents memory growth proportional to email volume over the process lifetime, capping the set at ~100k entries regardless of runtime. Closes #349 Co-Authored-By: Claude Opus 4.6 --- src/channels/email_channel.rs | 111 ++++++++++++++++++++++++++++++++-- 1 file changed, 107 insertions(+), 4 deletions(-) diff --git a/src/channels/email_channel.rs b/src/channels/email_channel.rs index e7c54a8..4fcfd71 100644 --- a/src/channels/email_channel.rs +++ b/src/channels/email_channel.rs @@ -14,11 +14,14 @@ use lettre::transport::smtp::authentication::Credentials; use lettre::{Message, SmtpTransport, Transport}; use mail_parser::{MessageParser, MimeHeaders}; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; +use std::collections::{HashSet, VecDeque}; use std::io::Write as IoWrite; use std::net::TcpStream; use std::sync::Mutex; use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +/// Maximum number of seen message IDs to retain before evicting the oldest. +const SEEN_MESSAGES_CAPACITY: usize = 100_000; use tokio::sync::mpsc; use tokio::time::{interval, sleep}; use tracing::{error, info, warn}; @@ -93,17 +96,56 @@ impl Default for EmailConfig { } } +/// Bounded dedup set that evicts oldest entries when capacity is reached. +struct BoundedSeenSet { + set: HashSet, + order: VecDeque, + capacity: usize, +} + +impl BoundedSeenSet { + fn new(capacity: usize) -> Self { + Self { + set: HashSet::with_capacity(capacity.min(1024)), + order: VecDeque::with_capacity(capacity.min(1024)), + capacity, + } + } + + fn contains(&self, id: &str) -> bool { + self.set.contains(id) + } + + fn insert(&mut self, id: String) -> bool { + if self.set.contains(&id) { + return false; + } + if self.order.len() >= self.capacity { + if let Some(oldest) = self.order.pop_front() { + self.set.remove(&oldest); + } + } + self.order.push_back(id.clone()); + self.set.insert(id); + true + } + + fn len(&self) -> usize { + self.set.len() + } +} + /// Email channel — IMAP polling for inbound, SMTP for outbound pub struct EmailChannel { pub config: EmailConfig, - seen_messages: Mutex>, + seen_messages: Mutex, } impl EmailChannel { pub fn new(config: EmailConfig) -> Self { Self { config, - seen_messages: Mutex::new(HashSet::new()), + seen_messages: Mutex::new(BoundedSeenSet::new(SEEN_MESSAGES_CAPACITY)), } } @@ -459,7 +501,7 @@ impl Channel for EmailChannel { #[cfg(test)] mod tests { - use super::EmailChannel; + use super::{BoundedSeenSet, EmailChannel}; #[test] fn build_imap_tls_config_succeeds() { @@ -467,4 +509,65 @@ mod tests { EmailChannel::build_imap_tls_config().expect("TLS config construction should succeed"); assert_eq!(std::sync::Arc::strong_count(&tls_config), 1); } + + #[test] + fn bounded_seen_set_insert_and_contains() { + let mut set = BoundedSeenSet::new(10); + assert!(set.insert("a".into())); + assert!(set.contains("a")); + assert!(!set.contains("b")); + } + + #[test] + fn bounded_seen_set_rejects_duplicates() { + let mut set = BoundedSeenSet::new(10); + assert!(set.insert("a".into())); + assert!(!set.insert("a".into())); + assert_eq!(set.len(), 1); + } + + #[test] + fn bounded_seen_set_evicts_oldest_at_capacity() { + let mut set = BoundedSeenSet::new(3); + set.insert("a".into()); + set.insert("b".into()); + set.insert("c".into()); + assert_eq!(set.len(), 3); + + // Inserting a 4th should evict "a" + set.insert("d".into()); + assert_eq!(set.len(), 3); + assert!(!set.contains("a"), "oldest entry should be evicted"); + assert!(set.contains("b")); + assert!(set.contains("c")); + assert!(set.contains("d")); + } + + #[test] + fn bounded_seen_set_evicts_in_fifo_order() { + let mut set = BoundedSeenSet::new(2); + set.insert("first".into()); + set.insert("second".into()); + set.insert("third".into()); + assert!(!set.contains("first")); + assert!(set.contains("second")); + assert!(set.contains("third")); + + set.insert("fourth".into()); + assert!(!set.contains("second")); + assert!(set.contains("third")); + assert!(set.contains("fourth")); + } + + #[test] + fn bounded_seen_set_capacity_one() { + let mut set = BoundedSeenSet::new(1); + set.insert("a".into()); + assert!(set.contains("a")); + + set.insert("b".into()); + assert!(!set.contains("a")); + assert!(set.contains("b")); + assert_eq!(set.len(), 1); + } }