fix(discord): use channel_id instead of sender for replies (fixes #483)

fix(misc): complete parking_lot::Mutex migration (fixes #505)

- DiscordChannel: store actual channel_id in ChannelMessage.channel
  instead of hardcoded "discord" string
- channels/mod.rs: use msg.channel instead of msg.sender for replies
- Migrate all std::sync::Mutex to parking_lot::Mutex:
  * src/security/audit.rs
  * src/memory/sqlite.rs
  * src/memory/response_cache.rs
  * src/memory/lucid.rs
  * src/channels/email_channel.rs
  * src/gateway/mod.rs
  * src/observability/traits.rs
  * src/providers/reliable.rs
  * src/providers/router.rs
  * src/agent/agent.rs
- Remove all .lock().unwrap() and .map_err(PoisonError) patterns
  since parking_lot::Mutex never poisons

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
argenis de la rosa 2026-02-17 08:05:25 -05:00
parent f7d77b09f4
commit 1908af3248
12 changed files with 43 additions and 61 deletions

View file

@ -566,7 +566,7 @@ pub async fn run(
mod tests { mod tests {
use super::*; use super::*;
use async_trait::async_trait; use async_trait::async_trait;
use std::sync::Mutex; use parking_lot::Mutex;
struct MockProvider { struct MockProvider {
responses: Mutex<Vec<crate::providers::ChatResponse>>, responses: Mutex<Vec<crate::providers::ChatResponse>>,
@ -590,7 +590,7 @@ mod tests {
_model: &str, _model: &str,
_temperature: f64, _temperature: f64,
) -> Result<crate::providers::ChatResponse> { ) -> Result<crate::providers::ChatResponse> {
let mut guard = self.responses.lock().unwrap(); let mut guard = self.responses.lock();
if guard.is_empty() { if guard.is_empty() {
return Ok(crate::providers::ChatResponse { return Ok(crate::providers::ChatResponse {
text: Some("done".into()), text: Some("done".into()),

View file

@ -344,7 +344,7 @@ impl Channel for DiscordChannel {
} }
let message_id = d.get("id").and_then(|i| i.as_str()).unwrap_or(""); let message_id = d.get("id").and_then(|i| i.as_str()).unwrap_or("");
let _channel_id = d.get("channel_id").and_then(|c| c.as_str()).unwrap_or("").to_string(); let channel_id = d.get("channel_id").and_then(|c| c.as_str()).unwrap_or("").to_string();
let channel_msg = ChannelMessage { let channel_msg = ChannelMessage {
id: if message_id.is_empty() { id: if message_id.is_empty() {
@ -354,7 +354,7 @@ impl Channel for DiscordChannel {
}, },
sender: author_id.to_string(), sender: author_id.to_string(),
content: content.to_string(), content: content.to_string(),
channel: "discord".to_string(), channel: channel_id,
timestamp: std::time::SystemTime::now() timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default() .unwrap_or_default()

View file

@ -17,7 +17,7 @@ use serde::{Deserialize, Serialize};
use std::collections::HashSet; use std::collections::HashSet;
use std::io::Write as IoWrite; use std::io::Write as IoWrite;
use std::net::TcpStream; use std::net::TcpStream;
use std::sync::Mutex; use parking_lot::Mutex;
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time::{interval, sleep}; use tokio::time::{interval, sleep};
@ -415,7 +415,7 @@ impl Channel for EmailChannel {
let mut seen = self let mut seen = self
.seen_messages .seen_messages
.lock() .lock()
.expect("seen_messages mutex should not be poisoned"); ;
if seen.contains(&id) { if seen.contains(&id) {
continue; continue;
} }

View file

@ -213,7 +213,7 @@ async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::C
truncate_with_ellipsis(&response, 80) truncate_with_ellipsis(&response, 80)
); );
if let Some(channel) = target_channel.as_ref() { if let Some(channel) = target_channel.as_ref() {
if let Err(e) = channel.send(&response, &msg.sender).await { if let Err(e) = channel.send(&response, &msg.channel).await {
eprintln!(" ❌ Failed to reply on {}: {e}", channel.name()); eprintln!(" ❌ Failed to reply on {}: {e}", channel.name());
} }
} }

View file

@ -27,7 +27,8 @@ use axum::{
}; };
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::{Arc, Mutex}; use parking_lot::Mutex;
use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tower_http::limit::RequestBodyLimitLayer; use tower_http::limit::RequestBodyLimitLayer;
use tower_http::timeout::TimeoutLayer; use tower_http::timeout::TimeoutLayer;
@ -77,8 +78,7 @@ impl SlidingWindowRateLimiter {
let mut guard = self let mut guard = self
.requests .requests
.lock() .lock();
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (requests, last_sweep) = &mut *guard; let (requests, last_sweep) = &mut *guard;
// Periodic sweep: remove IPs with no recent requests // Periodic sweep: remove IPs with no recent requests
@ -145,8 +145,7 @@ impl IdempotencyStore {
let now = Instant::now(); let now = Instant::now();
let mut keys = self let mut keys = self
.keys .keys
.lock() .lock();
.unwrap_or_else(std::sync::PoisonError::into_inner);
keys.retain(|_, seen_at| now.duration_since(*seen_at) < self.ttl); keys.retain(|_, seen_at| now.duration_since(*seen_at) < self.ttl);
@ -729,7 +728,7 @@ mod tests {
use axum::response::IntoResponse; use axum::response::IntoResponse;
use http_body_util::BodyExt; use http_body_util::BodyExt;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex; use parking_lot::Mutex;
#[test] #[test]
fn security_body_limit_is_64kb() { fn security_body_limit_is_64kb() {

View file

@ -4,7 +4,7 @@ use async_trait::async_trait;
use chrono::Local; use chrono::Local;
use std::collections::HashSet; use std::collections::HashSet;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Mutex; use parking_lot::Mutex;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::process::Command; use tokio::process::Command;
use tokio::time::timeout; use tokio::time::timeout;
@ -113,9 +113,7 @@ impl LucidMemory {
} }
fn in_failure_cooldown(&self) -> bool { fn in_failure_cooldown(&self) -> bool {
let Ok(guard) = self.last_failure_at.lock() else { let guard = self.last_failure_at.lock();
return false;
};
guard guard
.as_ref() .as_ref()
@ -123,15 +121,11 @@ impl LucidMemory {
} }
fn mark_failure_now(&self) { fn mark_failure_now(&self) {
if let Ok(mut guard) = self.last_failure_at.lock() { *self.last_failure_at.lock() = Some(Instant::now());
*guard = Some(Instant::now());
}
} }
fn clear_failure(&self) { fn clear_failure(&self) {
if let Ok(mut guard) = self.last_failure_at.lock() { *self.last_failure_at.lock() = None;
*guard = None;
}
} }
fn to_lucid_type(category: &MemoryCategory) -> &'static str { fn to_lucid_type(category: &MemoryCategory) -> &'static str {

View file

@ -10,7 +10,7 @@ use chrono::{Duration, Local};
use rusqlite::{params, Connection}; use rusqlite::{params, Connection};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Mutex; use parking_lot::Mutex;
/// Response cache backed by a dedicated SQLite database. /// Response cache backed by a dedicated SQLite database.
/// ///
@ -77,10 +77,7 @@ impl ResponseCache {
/// Look up a cached response. Returns `None` on miss or expired entry. /// Look up a cached response. Returns `None` on miss or expired entry.
pub fn get(&self, key: &str) -> Result<Option<String>> { pub fn get(&self, key: &str) -> Result<Option<String>> {
let conn = self let conn = self.conn.lock();
.conn
.lock()
.map_err(|e| anyhow::anyhow!("Lock error: {e}"))?;
let now = Local::now(); let now = Local::now();
let cutoff = (now - Duration::minutes(self.ttl_minutes)).to_rfc3339(); let cutoff = (now - Duration::minutes(self.ttl_minutes)).to_rfc3339();
@ -108,10 +105,7 @@ impl ResponseCache {
/// Store a response in the cache. /// Store a response in the cache.
pub fn put(&self, key: &str, model: &str, response: &str, token_count: u32) -> Result<()> { pub fn put(&self, key: &str, model: &str, response: &str, token_count: u32) -> Result<()> {
let conn = self let conn = self.conn.lock();
.conn
.lock()
.map_err(|e| anyhow::anyhow!("Lock error: {e}"))?;
let now = Local::now().to_rfc3339(); let now = Local::now().to_rfc3339();
@ -146,10 +140,7 @@ impl ResponseCache {
/// Return cache statistics: (total_entries, total_hits, total_tokens_saved). /// Return cache statistics: (total_entries, total_hits, total_tokens_saved).
pub fn stats(&self) -> Result<(usize, u64, u64)> { pub fn stats(&self) -> Result<(usize, u64, u64)> {
let conn = self let conn = self.conn.lock();
.conn
.lock()
.map_err(|e| anyhow::anyhow!("Lock error: {e}"))?;
let count: i64 = let count: i64 =
conn.query_row("SELECT COUNT(*) FROM response_cache", [], |row| row.get(0))?; conn.query_row("SELECT COUNT(*) FROM response_cache", [], |row| row.get(0))?;
@ -172,10 +163,7 @@ impl ResponseCache {
/// Wipe the entire cache (useful for `zeroclaw cache clear`). /// Wipe the entire cache (useful for `zeroclaw cache clear`).
pub fn clear(&self) -> Result<usize> { pub fn clear(&self) -> Result<usize> {
let conn = self let conn = self.conn.lock();
.conn
.lock()
.map_err(|e| anyhow::anyhow!("Lock error: {e}"))?;
let affected = conn.execute("DELETE FROM response_cache", [])?; let affected = conn.execute("DELETE FROM response_cache", [])?;
Ok(affected) Ok(affected)

View file

@ -5,7 +5,8 @@ use async_trait::async_trait;
use chrono::Local; use chrono::Local;
use rusqlite::{params, Connection}; use rusqlite::{params, Connection};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex}; use parking_lot::Mutex;
use std::sync::Arc;
use uuid::Uuid; use uuid::Uuid;
/// SQLite-backed persistent memory — the brain /// SQLite-backed persistent memory — the brain
@ -896,7 +897,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn schema_has_fts5_table() { async fn schema_has_fts5_table() {
let (_tmp, mem) = temp_sqlite(); let (_tmp, mem) = temp_sqlite();
let conn = mem.conn.lock().unwrap(); let conn = mem.conn.lock();
// FTS5 table should exist // FTS5 table should exist
let count: i64 = conn let count: i64 = conn
.query_row( .query_row(
@ -911,7 +912,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn schema_has_embedding_cache() { async fn schema_has_embedding_cache() {
let (_tmp, mem) = temp_sqlite(); let (_tmp, mem) = temp_sqlite();
let conn = mem.conn.lock().unwrap(); let conn = mem.conn.lock();
let count: i64 = conn let count: i64 = conn
.query_row( .query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='embedding_cache'", "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='embedding_cache'",
@ -925,7 +926,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn schema_memories_has_embedding_column() { async fn schema_memories_has_embedding_column() {
let (_tmp, mem) = temp_sqlite(); let (_tmp, mem) = temp_sqlite();
let conn = mem.conn.lock().unwrap(); let conn = mem.conn.lock();
// Check that embedding column exists by querying it // Check that embedding column exists by querying it
let result = conn.execute_batch("SELECT embedding FROM memories LIMIT 0"); let result = conn.execute_batch("SELECT embedding FROM memories LIMIT 0");
assert!(result.is_ok()); assert!(result.is_ok());
@ -940,7 +941,7 @@ mod tests {
.await .await
.unwrap(); .unwrap();
let conn = mem.conn.lock().unwrap(); let conn = mem.conn.lock();
let count: i64 = conn let count: i64 = conn
.query_row( .query_row(
"SELECT COUNT(*) FROM memories_fts WHERE memories_fts MATCH '\"unique_searchterm_xyz\"'", "SELECT COUNT(*) FROM memories_fts WHERE memories_fts MATCH '\"unique_searchterm_xyz\"'",
@ -959,7 +960,7 @@ mod tests {
.unwrap(); .unwrap();
mem.forget("del_key").await.unwrap(); mem.forget("del_key").await.unwrap();
let conn = mem.conn.lock().unwrap(); let conn = mem.conn.lock();
let count: i64 = conn let count: i64 = conn
.query_row( .query_row(
"SELECT COUNT(*) FROM memories_fts WHERE memories_fts MATCH '\"deletable_content_abc\"'", "SELECT COUNT(*) FROM memories_fts WHERE memories_fts MATCH '\"deletable_content_abc\"'",
@ -980,7 +981,7 @@ mod tests {
.await .await
.unwrap(); .unwrap();
let conn = mem.conn.lock().unwrap(); let conn = mem.conn.lock();
// Old content should not be findable // Old content should not be findable
let old: i64 = conn let old: i64 = conn
.query_row( .query_row(

View file

@ -85,7 +85,7 @@ pub trait Observer: Send + Sync + 'static {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use std::sync::Mutex; use parking_lot::Mutex;
use std::time::Duration; use std::time::Duration;
#[derive(Default)] #[derive(Default)]
@ -96,12 +96,12 @@ mod tests {
impl Observer for DummyObserver { impl Observer for DummyObserver {
fn record_event(&self, _event: &ObserverEvent) { fn record_event(&self, _event: &ObserverEvent) {
let mut guard = self.events.lock().unwrap(); let mut guard = self.events.lock();
*guard += 1; *guard += 1;
} }
fn record_metric(&self, _metric: &ObserverMetric) { fn record_metric(&self, _metric: &ObserverMetric) {
let mut guard = self.metrics.lock().unwrap(); let mut guard = self.metrics.lock();
*guard += 1; *guard += 1;
} }
@ -121,8 +121,8 @@ mod tests {
}); });
observer.record_metric(&ObserverMetric::TokensUsed(42)); observer.record_metric(&ObserverMetric::TokensUsed(42));
assert_eq!(*observer.events.lock().unwrap(), 2); assert_eq!(*observer.events.lock(), 2);
assert_eq!(*observer.metrics.lock().unwrap(), 1); assert_eq!(*observer.metrics.lock(), 1);
} }
#[test] #[test]

View file

@ -461,7 +461,7 @@ mod tests {
/// Mock that records which model was used for each call. /// Mock that records which model was used for each call.
struct ModelAwareMock { struct ModelAwareMock {
calls: Arc<AtomicUsize>, calls: Arc<AtomicUsize>,
models_seen: std::sync::Mutex<Vec<String>>, models_seen: parking_lot::Mutex<Vec<String>>,
fail_models: Vec<&'static str>, fail_models: Vec<&'static str>,
response: &'static str, response: &'static str,
} }
@ -476,7 +476,7 @@ mod tests {
_temperature: f64, _temperature: f64,
) -> anyhow::Result<String> { ) -> anyhow::Result<String> {
self.calls.fetch_add(1, Ordering::SeqCst); self.calls.fetch_add(1, Ordering::SeqCst);
self.models_seen.lock().unwrap().push(model.to_string()); self.models_seen.lock().push(model.to_string());
if self.fail_models.contains(&model) { if self.fail_models.contains(&model) {
anyhow::bail!("500 model {} unavailable", model); anyhow::bail!("500 model {} unavailable", model);
} }
@ -729,7 +729,7 @@ mod tests {
let calls = Arc::new(AtomicUsize::new(0)); let calls = Arc::new(AtomicUsize::new(0));
let mock = Arc::new(ModelAwareMock { let mock = Arc::new(ModelAwareMock {
calls: Arc::clone(&calls), calls: Arc::clone(&calls),
models_seen: std::sync::Mutex::new(Vec::new()), models_seen: parking_lot::Mutex::new(Vec::new()),
fail_models: vec!["claude-opus"], fail_models: vec!["claude-opus"],
response: "ok from sonnet", response: "ok from sonnet",
}); });
@ -764,7 +764,7 @@ mod tests {
let calls = Arc::new(AtomicUsize::new(0)); let calls = Arc::new(AtomicUsize::new(0));
let mock = Arc::new(ModelAwareMock { let mock = Arc::new(ModelAwareMock {
calls: Arc::clone(&calls), calls: Arc::clone(&calls),
models_seen: std::sync::Mutex::new(Vec::new()), models_seen: parking_lot::Mutex::new(Vec::new()),
fail_models: vec!["model-a", "model-b", "model-c"], fail_models: vec!["model-a", "model-b", "model-c"],
response: "never", response: "never",
}); });

View file

@ -164,7 +164,7 @@ mod tests {
struct MockProvider { struct MockProvider {
calls: Arc<AtomicUsize>, calls: Arc<AtomicUsize>,
response: &'static str, response: &'static str,
last_model: std::sync::Mutex<String>, last_model: parking_lot::Mutex<String>,
} }
impl MockProvider { impl MockProvider {
@ -172,7 +172,7 @@ mod tests {
Self { Self {
calls: Arc::new(AtomicUsize::new(0)), calls: Arc::new(AtomicUsize::new(0)),
response, response,
last_model: std::sync::Mutex::new(String::new()), last_model: parking_lot::Mutex::new(String::new()),
} }
} }
@ -181,7 +181,7 @@ mod tests {
} }
fn last_model(&self) -> String { fn last_model(&self) -> String {
self.last_model.lock().unwrap().clone() self.last_model.lock().clone()
} }
} }
@ -195,7 +195,7 @@ mod tests {
_temperature: f64, _temperature: f64,
) -> anyhow::Result<String> { ) -> anyhow::Result<String> {
self.calls.fetch_add(1, Ordering::SeqCst); self.calls.fetch_add(1, Ordering::SeqCst);
*self.last_model.lock().unwrap() = model.to_string(); *self.last_model.lock() = model.to_string();
Ok(self.response.to_string()) Ok(self.response.to_string())
} }
} }

View file

@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::Write; use std::io::Write;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Mutex; use parking_lot::Mutex;
use uuid::Uuid; use uuid::Uuid;
/// Audit event types /// Audit event types