diff --git a/src/agent/agent.rs b/src/agent/agent.rs index 05a9837..ca18e79 100644 --- a/src/agent/agent.rs +++ b/src/agent/agent.rs @@ -566,7 +566,7 @@ pub async fn run( mod tests { use super::*; use async_trait::async_trait; - use std::sync::Mutex; + use parking_lot::Mutex; struct MockProvider { responses: Mutex>, @@ -590,7 +590,7 @@ mod tests { _model: &str, _temperature: f64, ) -> Result { - let mut guard = self.responses.lock().unwrap(); + let mut guard = self.responses.lock(); if guard.is_empty() { return Ok(crate::providers::ChatResponse { text: Some("done".into()), diff --git a/src/channels/discord.rs b/src/channels/discord.rs index 71b9892..4e99f43 100644 --- a/src/channels/discord.rs +++ b/src/channels/discord.rs @@ -344,7 +344,7 @@ impl Channel for DiscordChannel { } let message_id = d.get("id").and_then(|i| i.as_str()).unwrap_or(""); - let _channel_id = d.get("channel_id").and_then(|c| c.as_str()).unwrap_or("").to_string(); + let channel_id = d.get("channel_id").and_then(|c| c.as_str()).unwrap_or("").to_string(); let channel_msg = ChannelMessage { id: if message_id.is_empty() { @@ -354,7 +354,7 @@ impl Channel for DiscordChannel { }, sender: author_id.to_string(), content: content.to_string(), - channel: "discord".to_string(), + channel: channel_id, timestamp: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() diff --git a/src/channels/email_channel.rs b/src/channels/email_channel.rs index e34c7de..f1ea016 100644 --- a/src/channels/email_channel.rs +++ b/src/channels/email_channel.rs @@ -17,7 +17,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::io::Write as IoWrite; use std::net::TcpStream; -use std::sync::Mutex; +use parking_lot::Mutex; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use tokio::time::{interval, sleep}; @@ -415,7 +415,7 @@ impl Channel for EmailChannel { let mut seen = self .seen_messages .lock() - .expect("seen_messages mutex should not be poisoned"); + ; if seen.contains(&id) { continue; } diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 1a161ad..d8fd612 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -213,7 +213,7 @@ async fn process_channel_message(ctx: Arc, msg: traits::C truncate_with_ellipsis(&response, 80) ); if let Some(channel) = target_channel.as_ref() { - if let Err(e) = channel.send(&response, &msg.sender).await { + if let Err(e) = channel.send(&response, &msg.channel).await { eprintln!(" ❌ Failed to reply on {}: {e}", channel.name()); } } diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index c5d4da3..719e8e7 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -27,7 +27,8 @@ use axum::{ }; use std::collections::HashMap; use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; +use parking_lot::Mutex; +use std::sync::Arc; use std::time::{Duration, Instant}; use tower_http::limit::RequestBodyLimitLayer; use tower_http::timeout::TimeoutLayer; @@ -77,8 +78,7 @@ impl SlidingWindowRateLimiter { let mut guard = self .requests - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); + .lock(); let (requests, last_sweep) = &mut *guard; // Periodic sweep: remove IPs with no recent requests @@ -145,8 +145,7 @@ impl IdempotencyStore { let now = Instant::now(); let mut keys = self .keys - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); + .lock(); keys.retain(|_, seen_at| now.duration_since(*seen_at) < self.ttl); @@ -729,7 +728,7 @@ mod tests { use axum::response::IntoResponse; use http_body_util::BodyExt; use std::sync::atomic::{AtomicUsize, Ordering}; - use std::sync::Mutex; + use parking_lot::Mutex; #[test] fn security_body_limit_is_64kb() { diff --git a/src/memory/lucid.rs b/src/memory/lucid.rs index 00e03f6..50cf9de 100644 --- a/src/memory/lucid.rs +++ b/src/memory/lucid.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use chrono::Local; use std::collections::HashSet; use std::path::{Path, PathBuf}; -use std::sync::Mutex; +use parking_lot::Mutex; use std::time::{Duration, Instant}; use tokio::process::Command; use tokio::time::timeout; @@ -113,9 +113,7 @@ impl LucidMemory { } fn in_failure_cooldown(&self) -> bool { - let Ok(guard) = self.last_failure_at.lock() else { - return false; - }; + let guard = self.last_failure_at.lock(); guard .as_ref() @@ -123,15 +121,11 @@ impl LucidMemory { } fn mark_failure_now(&self) { - if let Ok(mut guard) = self.last_failure_at.lock() { - *guard = Some(Instant::now()); - } + *self.last_failure_at.lock() = Some(Instant::now()); } fn clear_failure(&self) { - if let Ok(mut guard) = self.last_failure_at.lock() { - *guard = None; - } + *self.last_failure_at.lock() = None; } fn to_lucid_type(category: &MemoryCategory) -> &'static str { diff --git a/src/memory/response_cache.rs b/src/memory/response_cache.rs index 3135b2b..6baa5c7 100644 --- a/src/memory/response_cache.rs +++ b/src/memory/response_cache.rs @@ -10,7 +10,7 @@ use chrono::{Duration, Local}; use rusqlite::{params, Connection}; use sha2::{Digest, Sha256}; use std::path::{Path, PathBuf}; -use std::sync::Mutex; +use parking_lot::Mutex; /// Response cache backed by a dedicated SQLite database. /// @@ -77,10 +77,7 @@ impl ResponseCache { /// Look up a cached response. Returns `None` on miss or expired entry. pub fn get(&self, key: &str) -> Result> { - let conn = self - .conn - .lock() - .map_err(|e| anyhow::anyhow!("Lock error: {e}"))?; + let conn = self.conn.lock(); let now = Local::now(); let cutoff = (now - Duration::minutes(self.ttl_minutes)).to_rfc3339(); @@ -108,10 +105,7 @@ impl ResponseCache { /// Store a response in the cache. pub fn put(&self, key: &str, model: &str, response: &str, token_count: u32) -> Result<()> { - let conn = self - .conn - .lock() - .map_err(|e| anyhow::anyhow!("Lock error: {e}"))?; + let conn = self.conn.lock(); let now = Local::now().to_rfc3339(); @@ -146,10 +140,7 @@ impl ResponseCache { /// Return cache statistics: (total_entries, total_hits, total_tokens_saved). pub fn stats(&self) -> Result<(usize, u64, u64)> { - let conn = self - .conn - .lock() - .map_err(|e| anyhow::anyhow!("Lock error: {e}"))?; + let conn = self.conn.lock(); let count: i64 = conn.query_row("SELECT COUNT(*) FROM response_cache", [], |row| row.get(0))?; @@ -172,10 +163,7 @@ impl ResponseCache { /// Wipe the entire cache (useful for `zeroclaw cache clear`). pub fn clear(&self) -> Result { - let conn = self - .conn - .lock() - .map_err(|e| anyhow::anyhow!("Lock error: {e}"))?; + let conn = self.conn.lock(); let affected = conn.execute("DELETE FROM response_cache", [])?; Ok(affected) diff --git a/src/memory/sqlite.rs b/src/memory/sqlite.rs index 6219989..160487d 100644 --- a/src/memory/sqlite.rs +++ b/src/memory/sqlite.rs @@ -5,7 +5,8 @@ use async_trait::async_trait; use chrono::Local; use rusqlite::{params, Connection}; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; +use parking_lot::Mutex; +use std::sync::Arc; use uuid::Uuid; /// SQLite-backed persistent memory — the brain @@ -896,7 +897,7 @@ mod tests { #[tokio::test] async fn schema_has_fts5_table() { let (_tmp, mem) = temp_sqlite(); - let conn = mem.conn.lock().unwrap(); + let conn = mem.conn.lock(); // FTS5 table should exist let count: i64 = conn .query_row( @@ -911,7 +912,7 @@ mod tests { #[tokio::test] async fn schema_has_embedding_cache() { let (_tmp, mem) = temp_sqlite(); - let conn = mem.conn.lock().unwrap(); + let conn = mem.conn.lock(); let count: i64 = conn .query_row( "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='embedding_cache'", @@ -925,7 +926,7 @@ mod tests { #[tokio::test] async fn schema_memories_has_embedding_column() { let (_tmp, mem) = temp_sqlite(); - let conn = mem.conn.lock().unwrap(); + let conn = mem.conn.lock(); // Check that embedding column exists by querying it let result = conn.execute_batch("SELECT embedding FROM memories LIMIT 0"); assert!(result.is_ok()); @@ -940,7 +941,7 @@ mod tests { .await .unwrap(); - let conn = mem.conn.lock().unwrap(); + let conn = mem.conn.lock(); let count: i64 = conn .query_row( "SELECT COUNT(*) FROM memories_fts WHERE memories_fts MATCH '\"unique_searchterm_xyz\"'", @@ -959,7 +960,7 @@ mod tests { .unwrap(); mem.forget("del_key").await.unwrap(); - let conn = mem.conn.lock().unwrap(); + let conn = mem.conn.lock(); let count: i64 = conn .query_row( "SELECT COUNT(*) FROM memories_fts WHERE memories_fts MATCH '\"deletable_content_abc\"'", @@ -980,7 +981,7 @@ mod tests { .await .unwrap(); - let conn = mem.conn.lock().unwrap(); + let conn = mem.conn.lock(); // Old content should not be findable let old: i64 = conn .query_row( diff --git a/src/observability/traits.rs b/src/observability/traits.rs index a1eb10f..ca62caf 100644 --- a/src/observability/traits.rs +++ b/src/observability/traits.rs @@ -85,7 +85,7 @@ pub trait Observer: Send + Sync + 'static { #[cfg(test)] mod tests { use super::*; - use std::sync::Mutex; + use parking_lot::Mutex; use std::time::Duration; #[derive(Default)] @@ -96,12 +96,12 @@ mod tests { impl Observer for DummyObserver { fn record_event(&self, _event: &ObserverEvent) { - let mut guard = self.events.lock().unwrap(); + let mut guard = self.events.lock(); *guard += 1; } fn record_metric(&self, _metric: &ObserverMetric) { - let mut guard = self.metrics.lock().unwrap(); + let mut guard = self.metrics.lock(); *guard += 1; } @@ -121,8 +121,8 @@ mod tests { }); observer.record_metric(&ObserverMetric::TokensUsed(42)); - assert_eq!(*observer.events.lock().unwrap(), 2); - assert_eq!(*observer.metrics.lock().unwrap(), 1); + assert_eq!(*observer.events.lock(), 2); + assert_eq!(*observer.metrics.lock(), 1); } #[test] diff --git a/src/providers/reliable.rs b/src/providers/reliable.rs index d91f02c..045f2c3 100644 --- a/src/providers/reliable.rs +++ b/src/providers/reliable.rs @@ -461,7 +461,7 @@ mod tests { /// Mock that records which model was used for each call. struct ModelAwareMock { calls: Arc, - models_seen: std::sync::Mutex>, + models_seen: parking_lot::Mutex>, fail_models: Vec<&'static str>, response: &'static str, } @@ -476,7 +476,7 @@ mod tests { _temperature: f64, ) -> anyhow::Result { self.calls.fetch_add(1, Ordering::SeqCst); - self.models_seen.lock().unwrap().push(model.to_string()); + self.models_seen.lock().push(model.to_string()); if self.fail_models.contains(&model) { anyhow::bail!("500 model {} unavailable", model); } @@ -729,7 +729,7 @@ mod tests { let calls = Arc::new(AtomicUsize::new(0)); let mock = Arc::new(ModelAwareMock { calls: Arc::clone(&calls), - models_seen: std::sync::Mutex::new(Vec::new()), + models_seen: parking_lot::Mutex::new(Vec::new()), fail_models: vec!["claude-opus"], response: "ok from sonnet", }); @@ -764,7 +764,7 @@ mod tests { let calls = Arc::new(AtomicUsize::new(0)); let mock = Arc::new(ModelAwareMock { calls: Arc::clone(&calls), - models_seen: std::sync::Mutex::new(Vec::new()), + models_seen: parking_lot::Mutex::new(Vec::new()), fail_models: vec!["model-a", "model-b", "model-c"], response: "never", }); diff --git a/src/providers/router.rs b/src/providers/router.rs index ccbdffb..78edde0 100644 --- a/src/providers/router.rs +++ b/src/providers/router.rs @@ -164,7 +164,7 @@ mod tests { struct MockProvider { calls: Arc, response: &'static str, - last_model: std::sync::Mutex, + last_model: parking_lot::Mutex, } impl MockProvider { @@ -172,7 +172,7 @@ mod tests { Self { calls: Arc::new(AtomicUsize::new(0)), response, - last_model: std::sync::Mutex::new(String::new()), + last_model: parking_lot::Mutex::new(String::new()), } } @@ -181,7 +181,7 @@ mod tests { } fn last_model(&self) -> String { - self.last_model.lock().unwrap().clone() + self.last_model.lock().clone() } } @@ -195,7 +195,7 @@ mod tests { _temperature: f64, ) -> anyhow::Result { self.calls.fetch_add(1, Ordering::SeqCst); - *self.last_model.lock().unwrap() = model.to_string(); + *self.last_model.lock() = model.to_string(); Ok(self.response.to_string()) } } diff --git a/src/security/audit.rs b/src/security/audit.rs index f18208f..7874450 100644 --- a/src/security/audit.rs +++ b/src/security/audit.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use std::fs::OpenOptions; use std::io::Write; use std::path::PathBuf; -use std::sync::Mutex; +use parking_lot::Mutex; use uuid::Uuid; /// Audit event types