Merge pull request #368 from fettpl/fix/349-email-bounded-seen-set
fix(channels): bound email seen_messages set to prevent memory leak
This commit is contained in:
commit
a91516df7a
1 changed files with 107 additions and 4 deletions
|
|
@ -14,11 +14,14 @@ use lettre::transport::smtp::authentication::Credentials;
|
||||||
use lettre::{Message, SmtpTransport, Transport};
|
use lettre::{Message, SmtpTransport, Transport};
|
||||||
use mail_parser::{MessageParser, MimeHeaders};
|
use mail_parser::{MessageParser, MimeHeaders};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::collections::HashSet;
|
use std::collections::{HashSet, VecDeque};
|
||||||
use std::io::Write as IoWrite;
|
use std::io::Write as IoWrite;
|
||||||
use std::net::TcpStream;
|
use std::net::TcpStream;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
|
/// Maximum number of seen message IDs to retain before evicting the oldest.
|
||||||
|
const SEEN_MESSAGES_CAPACITY: usize = 100_000;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio::time::{interval, sleep};
|
use tokio::time::{interval, sleep};
|
||||||
use tracing::{error, info, warn};
|
use tracing::{error, info, warn};
|
||||||
|
|
@ -93,17 +96,56 @@ impl Default for EmailConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Bounded dedup set that evicts oldest entries when capacity is reached.
|
||||||
|
struct BoundedSeenSet {
|
||||||
|
set: HashSet<String>,
|
||||||
|
order: VecDeque<String>,
|
||||||
|
capacity: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BoundedSeenSet {
|
||||||
|
fn new(capacity: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
set: HashSet::with_capacity(capacity.min(1024)),
|
||||||
|
order: VecDeque::with_capacity(capacity.min(1024)),
|
||||||
|
capacity,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn contains(&self, id: &str) -> bool {
|
||||||
|
self.set.contains(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&mut self, id: String) -> bool {
|
||||||
|
if self.set.contains(&id) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if self.order.len() >= self.capacity {
|
||||||
|
if let Some(oldest) = self.order.pop_front() {
|
||||||
|
self.set.remove(&oldest);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.order.push_back(id.clone());
|
||||||
|
self.set.insert(id);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
fn len(&self) -> usize {
|
||||||
|
self.set.len()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Email channel — IMAP polling for inbound, SMTP for outbound
|
/// Email channel — IMAP polling for inbound, SMTP for outbound
|
||||||
pub struct EmailChannel {
|
pub struct EmailChannel {
|
||||||
pub config: EmailConfig,
|
pub config: EmailConfig,
|
||||||
seen_messages: Mutex<HashSet<String>>,
|
seen_messages: Mutex<BoundedSeenSet>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EmailChannel {
|
impl EmailChannel {
|
||||||
pub fn new(config: EmailConfig) -> Self {
|
pub fn new(config: EmailConfig) -> Self {
|
||||||
Self {
|
Self {
|
||||||
config,
|
config,
|
||||||
seen_messages: Mutex::new(HashSet::new()),
|
seen_messages: Mutex::new(BoundedSeenSet::new(SEEN_MESSAGES_CAPACITY)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -459,7 +501,7 @@ impl Channel for EmailChannel {
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::EmailChannel;
|
use super::{BoundedSeenSet, EmailChannel};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn build_imap_tls_config_succeeds() {
|
fn build_imap_tls_config_succeeds() {
|
||||||
|
|
@ -467,4 +509,65 @@ mod tests {
|
||||||
EmailChannel::build_imap_tls_config().expect("TLS config construction should succeed");
|
EmailChannel::build_imap_tls_config().expect("TLS config construction should succeed");
|
||||||
assert_eq!(std::sync::Arc::strong_count(&tls_config), 1);
|
assert_eq!(std::sync::Arc::strong_count(&tls_config), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn bounded_seen_set_insert_and_contains() {
|
||||||
|
let mut set = BoundedSeenSet::new(10);
|
||||||
|
assert!(set.insert("a".into()));
|
||||||
|
assert!(set.contains("a"));
|
||||||
|
assert!(!set.contains("b"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn bounded_seen_set_rejects_duplicates() {
|
||||||
|
let mut set = BoundedSeenSet::new(10);
|
||||||
|
assert!(set.insert("a".into()));
|
||||||
|
assert!(!set.insert("a".into()));
|
||||||
|
assert_eq!(set.len(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn bounded_seen_set_evicts_oldest_at_capacity() {
|
||||||
|
let mut set = BoundedSeenSet::new(3);
|
||||||
|
set.insert("a".into());
|
||||||
|
set.insert("b".into());
|
||||||
|
set.insert("c".into());
|
||||||
|
assert_eq!(set.len(), 3);
|
||||||
|
|
||||||
|
// Inserting a 4th should evict "a"
|
||||||
|
set.insert("d".into());
|
||||||
|
assert_eq!(set.len(), 3);
|
||||||
|
assert!(!set.contains("a"), "oldest entry should be evicted");
|
||||||
|
assert!(set.contains("b"));
|
||||||
|
assert!(set.contains("c"));
|
||||||
|
assert!(set.contains("d"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn bounded_seen_set_evicts_in_fifo_order() {
|
||||||
|
let mut set = BoundedSeenSet::new(2);
|
||||||
|
set.insert("first".into());
|
||||||
|
set.insert("second".into());
|
||||||
|
set.insert("third".into());
|
||||||
|
assert!(!set.contains("first"));
|
||||||
|
assert!(set.contains("second"));
|
||||||
|
assert!(set.contains("third"));
|
||||||
|
|
||||||
|
set.insert("fourth".into());
|
||||||
|
assert!(!set.contains("second"));
|
||||||
|
assert!(set.contains("third"));
|
||||||
|
assert!(set.contains("fourth"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn bounded_seen_set_capacity_one() {
|
||||||
|
let mut set = BoundedSeenSet::new(1);
|
||||||
|
set.insert("a".into());
|
||||||
|
assert!(set.contains("a"));
|
||||||
|
|
||||||
|
set.insert("b".into());
|
||||||
|
assert!(!set.contains("a"));
|
||||||
|
assert!(set.contains("b"));
|
||||||
|
assert_eq!(set.len(), 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue