fix(gateway): add periodic sweep to SlidingWindowRateLimiter
Add a sweep mechanism that removes stale IP entries from the rate limiter's HashMap every 5 minutes. Previously, IPs that made a single request and never returned would accumulate indefinitely, causing unbounded memory growth proportional to unique client IPs. The sweep runs inline during allow() calls — no background task needed. A last_sweep timestamp ensures the full-map scan only happens once per sweep interval, keeping amortized overhead minimal. Closes #353 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
639032c952
commit
5af74d1d20
1 changed files with 65 additions and 3 deletions
|
|
@ -79,11 +79,14 @@ async fn gateway_agent_reply(state: &AppState, message: &str) -> Result<String>
|
||||||
Ok(normalize_gateway_reply(reply))
|
Ok(normalize_gateway_reply(reply))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// How often the rate limiter sweeps stale IP entries from its map.
|
||||||
|
const RATE_LIMITER_SWEEP_INTERVAL_SECS: u64 = 300; // 5 minutes
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct SlidingWindowRateLimiter {
|
struct SlidingWindowRateLimiter {
|
||||||
limit_per_window: u32,
|
limit_per_window: u32,
|
||||||
window: Duration,
|
window: Duration,
|
||||||
requests: Mutex<HashMap<String, Vec<Instant>>>,
|
requests: Mutex<(HashMap<String, Vec<Instant>>, Instant)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SlidingWindowRateLimiter {
|
impl SlidingWindowRateLimiter {
|
||||||
|
|
@ -91,7 +94,7 @@ impl SlidingWindowRateLimiter {
|
||||||
Self {
|
Self {
|
||||||
limit_per_window,
|
limit_per_window,
|
||||||
window,
|
window,
|
||||||
requests: Mutex::new(HashMap::new()),
|
requests: Mutex::new((HashMap::new(), Instant::now())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -103,10 +106,20 @@ impl SlidingWindowRateLimiter {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let cutoff = now.checked_sub(self.window).unwrap_or_else(Instant::now);
|
let cutoff = now.checked_sub(self.window).unwrap_or_else(Instant::now);
|
||||||
|
|
||||||
let mut requests = self
|
let mut guard = self
|
||||||
.requests
|
.requests
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||||
|
let (requests, last_sweep) = &mut *guard;
|
||||||
|
|
||||||
|
// Periodic sweep: remove IPs with no recent requests
|
||||||
|
if last_sweep.elapsed() >= Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS) {
|
||||||
|
requests.retain(|_, timestamps| {
|
||||||
|
timestamps.retain(|t| *t > cutoff);
|
||||||
|
!timestamps.is_empty()
|
||||||
|
});
|
||||||
|
*last_sweep = now;
|
||||||
|
}
|
||||||
|
|
||||||
let entry = requests.entry(key.to_owned()).or_default();
|
let entry = requests.entry(key.to_owned()).or_default();
|
||||||
entry.retain(|instant| *instant > cutoff);
|
entry.retain(|instant| *instant > cutoff);
|
||||||
|
|
@ -811,6 +824,55 @@ mod tests {
|
||||||
assert!(!limiter.allow_pair("127.0.0.1"));
|
assert!(!limiter.allow_pair("127.0.0.1"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn rate_limiter_sweep_removes_stale_entries() {
|
||||||
|
let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60));
|
||||||
|
// Add entries for multiple IPs
|
||||||
|
assert!(limiter.allow("ip-1"));
|
||||||
|
assert!(limiter.allow("ip-2"));
|
||||||
|
assert!(limiter.allow("ip-3"));
|
||||||
|
|
||||||
|
{
|
||||||
|
let guard = limiter
|
||||||
|
.requests
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||||
|
assert_eq!(guard.0.len(), 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Force a sweep by backdating last_sweep
|
||||||
|
{
|
||||||
|
let mut guard = limiter
|
||||||
|
.requests
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||||
|
guard.1 = Instant::now() - Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS + 1);
|
||||||
|
// Clear timestamps for ip-2 and ip-3 to simulate stale entries
|
||||||
|
guard.0.get_mut("ip-2").unwrap().clear();
|
||||||
|
guard.0.get_mut("ip-3").unwrap().clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next allow() call should trigger sweep and remove stale entries
|
||||||
|
assert!(limiter.allow("ip-1"));
|
||||||
|
|
||||||
|
{
|
||||||
|
let guard = limiter
|
||||||
|
.requests
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||||
|
assert_eq!(guard.0.len(), 1, "Stale entries should have been swept");
|
||||||
|
assert!(guard.0.contains_key("ip-1"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn rate_limiter_zero_limit_always_allows() {
|
||||||
|
let limiter = SlidingWindowRateLimiter::new(0, Duration::from_secs(60));
|
||||||
|
for _ in 0..100 {
|
||||||
|
assert!(limiter.allow("any-key"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn idempotency_store_rejects_duplicate_key() {
|
fn idempotency_store_rejects_duplicate_key() {
|
||||||
let store = IdempotencyStore::new(Duration::from_secs(30));
|
let store = IdempotencyStore::new(Duration::from_secs(30));
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue