fix(channels): bound email seen_messages set to prevent memory leak

Replace unbounded HashSet<String> with a BoundedSeenSet that evicts
the oldest message IDs (FIFO) when the 100k capacity is reached. This
prevents memory growth proportional to email volume over the process
lifetime, capping the set at ~100k entries regardless of runtime.

Closes #349

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
fettpl 2026-02-16 16:53:42 +01:00 committed by Chummy
parent 7f4c688145
commit 7db71de043

View file

@ -14,11 +14,14 @@ use lettre::transport::smtp::authentication::Credentials;
use lettre::{Message, SmtpTransport, Transport};
use mail_parser::{MessageParser, MimeHeaders};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::collections::{HashSet, VecDeque};
use std::io::Write as IoWrite;
use std::net::TcpStream;
use std::sync::Mutex;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
/// Maximum number of seen message IDs to retain before evicting the oldest.
const SEEN_MESSAGES_CAPACITY: usize = 100_000;
use tokio::sync::mpsc;
use tokio::time::{interval, sleep};
use tracing::{error, info, warn};
@ -93,17 +96,56 @@ impl Default for EmailConfig {
}
}
/// Bounded dedup set that evicts oldest entries when capacity is reached.
struct BoundedSeenSet {
set: HashSet<String>,
order: VecDeque<String>,
capacity: usize,
}
impl BoundedSeenSet {
fn new(capacity: usize) -> Self {
Self {
set: HashSet::with_capacity(capacity.min(1024)),
order: VecDeque::with_capacity(capacity.min(1024)),
capacity,
}
}
fn contains(&self, id: &str) -> bool {
self.set.contains(id)
}
fn insert(&mut self, id: String) -> bool {
if self.set.contains(&id) {
return false;
}
if self.order.len() >= self.capacity {
if let Some(oldest) = self.order.pop_front() {
self.set.remove(&oldest);
}
}
self.order.push_back(id.clone());
self.set.insert(id);
true
}
fn len(&self) -> usize {
self.set.len()
}
}
/// Email channel — IMAP polling for inbound, SMTP for outbound
pub struct EmailChannel {
pub config: EmailConfig,
seen_messages: Mutex<HashSet<String>>,
seen_messages: Mutex<BoundedSeenSet>,
}
impl EmailChannel {
pub fn new(config: EmailConfig) -> Self {
Self {
config,
seen_messages: Mutex::new(HashSet::new()),
seen_messages: Mutex::new(BoundedSeenSet::new(SEEN_MESSAGES_CAPACITY)),
}
}
@ -459,7 +501,7 @@ impl Channel for EmailChannel {
#[cfg(test)]
mod tests {
use super::EmailChannel;
use super::{BoundedSeenSet, EmailChannel};
#[test]
fn build_imap_tls_config_succeeds() {
@ -467,4 +509,65 @@ mod tests {
EmailChannel::build_imap_tls_config().expect("TLS config construction should succeed");
assert_eq!(std::sync::Arc::strong_count(&tls_config), 1);
}
#[test]
fn bounded_seen_set_insert_and_contains() {
let mut set = BoundedSeenSet::new(10);
assert!(set.insert("a".into()));
assert!(set.contains("a"));
assert!(!set.contains("b"));
}
#[test]
fn bounded_seen_set_rejects_duplicates() {
let mut set = BoundedSeenSet::new(10);
assert!(set.insert("a".into()));
assert!(!set.insert("a".into()));
assert_eq!(set.len(), 1);
}
#[test]
fn bounded_seen_set_evicts_oldest_at_capacity() {
let mut set = BoundedSeenSet::new(3);
set.insert("a".into());
set.insert("b".into());
set.insert("c".into());
assert_eq!(set.len(), 3);
// Inserting a 4th should evict "a"
set.insert("d".into());
assert_eq!(set.len(), 3);
assert!(!set.contains("a"), "oldest entry should be evicted");
assert!(set.contains("b"));
assert!(set.contains("c"));
assert!(set.contains("d"));
}
#[test]
fn bounded_seen_set_evicts_in_fifo_order() {
let mut set = BoundedSeenSet::new(2);
set.insert("first".into());
set.insert("second".into());
set.insert("third".into());
assert!(!set.contains("first"));
assert!(set.contains("second"));
assert!(set.contains("third"));
set.insert("fourth".into());
assert!(!set.contains("second"));
assert!(set.contains("third"));
assert!(set.contains("fourth"));
}
#[test]
fn bounded_seen_set_capacity_one() {
let mut set = BoundedSeenSet::new(1);
set.insert("a".into());
assert!(set.contains("a"));
set.insert("b".into());
assert!(!set.contains("a"));
assert!(set.contains("b"));
assert_eq!(set.len(), 1);
}
}