From ef02f25c4623a3d56a6b3cecd6f3b73471b0437e Mon Sep 17 00:00:00 2001 From: Chummy Date: Wed, 18 Feb 2026 00:31:27 +0800 Subject: [PATCH] refactor(sync): migrate remaining std mutex usage to parking_lot --- src/approval/mod.rs | 27 ++++++--------------------- src/channels/discord.rs | 27 +++++++++++++-------------- src/cost/tracker.rs | 29 +++++++++++++---------------- src/health/mod.rs | 35 ++++++++++++++++------------------- src/memory/lucid.rs | 17 ++++++----------- 5 files changed, 54 insertions(+), 81 deletions(-) diff --git a/src/approval/mod.rs b/src/approval/mod.rs index 5099d9b..ea5b02b 100644 --- a/src/approval/mod.rs +++ b/src/approval/mod.rs @@ -6,10 +6,10 @@ use crate::config::AutonomyConfig; use crate::security::AutonomyLevel; use chrono::Utc; +use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::io::{self, BufRead, Write}; -use std::sync::Mutex; // ── Types ──────────────────────────────────────────────────────── @@ -99,10 +99,7 @@ impl ApprovalManager { } // Session allowlist (from prior "Always" responses). - let allowlist = self - .session_allowlist - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); + let allowlist = self.session_allowlist.lock(); if allowlist.contains(tool_name) { return false; } @@ -121,10 +118,7 @@ impl ApprovalManager { ) { // If "Always", add to session allowlist. if decision == ApprovalResponse::Always { - let mut allowlist = self - .session_allowlist - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); + let mut allowlist = self.session_allowlist.lock(); allowlist.insert(tool_name.to_string()); } @@ -137,27 +131,18 @@ impl ApprovalManager { decision, channel: channel.to_string(), }; - let mut log = self - .audit_log - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); + let mut log = self.audit_log.lock(); log.push(entry); } /// Get a snapshot of the audit log. pub fn audit_log(&self) -> Vec { - self.audit_log - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner) - .clone() + self.audit_log.lock().clone() } /// Get the current session allowlist. pub fn session_allowlist(&self) -> HashSet { - self.session_allowlist - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner) - .clone() + self.session_allowlist.lock().clone() } /// Prompt the user on the CLI and return their decision. diff --git a/src/channels/discord.rs b/src/channels/discord.rs index 32233e5..939d47c 100644 --- a/src/channels/discord.rs +++ b/src/channels/discord.rs @@ -1,6 +1,7 @@ use super::traits::{Channel, ChannelMessage, SendMessage}; use async_trait::async_trait; use futures_util::{SinkExt, StreamExt}; +use parking_lot::Mutex; use serde_json::json; use tokio_tungstenite::tungstenite::Message; use uuid::Uuid; @@ -13,7 +14,7 @@ pub struct DiscordChannel { listen_to_bots: bool, mention_only: bool, client: reqwest::Client, - typing_handle: std::sync::Mutex>>, + typing_handle: Mutex>>, } impl DiscordChannel { @@ -31,7 +32,7 @@ impl DiscordChannel { listen_to_bots, mention_only, client: reqwest::Client::new(), - typing_handle: std::sync::Mutex::new(None), + typing_handle: Mutex::new(None), } } @@ -451,18 +452,16 @@ impl Channel for DiscordChannel { } }); - if let Ok(mut guard) = self.typing_handle.lock() { - *guard = Some(handle); - } + let mut guard = self.typing_handle.lock(); + *guard = Some(handle); Ok(()) } async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> { - if let Ok(mut guard) = self.typing_handle.lock() { - if let Some(handle) = guard.take() { - handle.abort(); - } + let mut guard = self.typing_handle.lock(); + if let Some(handle) = guard.take() { + handle.abort(); } Ok(()) } @@ -722,7 +721,7 @@ mod tests { #[test] fn split_multibyte_only_content_without_panics() { - let msg = "你".repeat(2500); + let msg = "🦀".repeat(2500); let chunks = split_message_for_discord(&msg); assert_eq!(chunks.len(), 2); assert_eq!(chunks[0].chars().count(), DISCORD_MAX_MESSAGE_LENGTH); @@ -752,7 +751,7 @@ mod tests { #[test] fn typing_handle_starts_as_none() { let ch = DiscordChannel::new("fake".into(), None, vec![], false, false); - let guard = ch.typing_handle.lock().unwrap(); + let guard = ch.typing_handle.lock(); assert!(guard.is_none()); } @@ -760,7 +759,7 @@ mod tests { async fn start_typing_sets_handle() { let ch = DiscordChannel::new("fake".into(), None, vec![], false, false); let _ = ch.start_typing("123456").await; - let guard = ch.typing_handle.lock().unwrap(); + let guard = ch.typing_handle.lock(); assert!(guard.is_some()); } @@ -769,7 +768,7 @@ mod tests { let ch = DiscordChannel::new("fake".into(), None, vec![], false, false); let _ = ch.start_typing("123456").await; let _ = ch.stop_typing("123456").await; - let guard = ch.typing_handle.lock().unwrap(); + let guard = ch.typing_handle.lock(); assert!(guard.is_none()); } @@ -785,7 +784,7 @@ mod tests { let ch = DiscordChannel::new("fake".into(), None, vec![], false, false); let _ = ch.start_typing("111").await; let _ = ch.start_typing("222").await; - let guard = ch.typing_handle.lock().unwrap(); + let guard = ch.typing_handle.lock(); assert!(guard.is_some()); } diff --git a/src/cost/tracker.rs b/src/cost/tracker.rs index 697f381..1905b36 100644 --- a/src/cost/tracker.rs +++ b/src/cost/tracker.rs @@ -2,11 +2,12 @@ use super::types::{BudgetCheck, CostRecord, CostSummary, ModelStats, TokenUsage, use crate::config::schema::CostConfig; use anyhow::{anyhow, Context, Result}; use chrono::{Datelike, NaiveDate, Utc}; +use parking_lot::{Mutex, MutexGuard}; use std::collections::HashMap; use std::fs::{self, File, OpenOptions}; use std::io::{BufRead, BufReader, Write}; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::Arc; /// Cost tracker for API usage monitoring and budget enforcement. pub struct CostTracker { @@ -38,16 +39,12 @@ impl CostTracker { &self.session_id } - fn lock_storage(&self) -> Result> { - self.storage - .lock() - .map_err(|_| anyhow!("Cost storage lock poisoned")) + fn lock_storage(&self) -> MutexGuard<'_, CostStorage> { + self.storage.lock() } - fn lock_session_costs(&self) -> Result>> { - self.session_costs - .lock() - .map_err(|_| anyhow!("Session cost lock poisoned")) + fn lock_session_costs(&self) -> MutexGuard<'_, Vec> { + self.session_costs.lock() } /// Check if a request is within budget. @@ -62,7 +59,7 @@ impl CostTracker { )); } - let mut storage = self.lock_storage()?; + let mut storage = self.lock_storage(); let (daily_cost, monthly_cost) = storage.get_aggregated_costs()?; // Check daily limit @@ -125,12 +122,12 @@ impl CostTracker { // Persist first for durability guarantees. { - let mut storage = self.lock_storage()?; + let mut storage = self.lock_storage(); storage.add_record(record.clone())?; } // Then update in-memory session snapshot. - let mut session_costs = self.lock_session_costs()?; + let mut session_costs = self.lock_session_costs(); session_costs.push(record); Ok(()) @@ -139,11 +136,11 @@ impl CostTracker { /// Get the current cost summary. pub fn get_summary(&self) -> Result { let (daily_cost, monthly_cost) = { - let mut storage = self.lock_storage()?; + let mut storage = self.lock_storage(); storage.get_aggregated_costs()? }; - let session_costs = self.lock_session_costs()?; + let session_costs = self.lock_session_costs(); let session_cost: f64 = session_costs .iter() .map(|record| record.usage.cost_usd) @@ -167,13 +164,13 @@ impl CostTracker { /// Get the daily cost for a specific date. pub fn get_daily_cost(&self, date: NaiveDate) -> Result { - let storage = self.lock_storage()?; + let storage = self.lock_storage(); storage.get_cost_for_date(date) } /// Get the monthly cost for a specific month. pub fn get_monthly_cost(&self, year: i32, month: u32) -> Result { - let storage = self.lock_storage()?; + let storage = self.lock_storage(); storage.get_cost_for_month(year, month) } } diff --git a/src/health/mod.rs b/src/health/mod.rs index 1d28ef0..2926c21 100644 --- a/src/health/mod.rs +++ b/src/health/mod.rs @@ -1,7 +1,8 @@ use chrono::Utc; +use parking_lot::Mutex; use serde::Serialize; use std::collections::BTreeMap; -use std::sync::{Mutex, OnceLock}; +use std::sync::OnceLock; use std::time::Instant; #[derive(Debug, Clone, Serialize)] @@ -43,20 +44,19 @@ fn upsert_component(component: &str, update: F) where F: FnOnce(&mut ComponentHealth), { - if let Ok(mut map) = registry().components.lock() { - let now = now_rfc3339(); - let entry = map - .entry(component.to_string()) - .or_insert_with(|| ComponentHealth { - status: "starting".into(), - updated_at: now.clone(), - last_ok: None, - last_error: None, - restart_count: 0, - }); - update(entry); - entry.updated_at = now; - } + let mut map = registry().components.lock(); + let now = now_rfc3339(); + let entry = map + .entry(component.to_string()) + .or_insert_with(|| ComponentHealth { + status: "starting".into(), + updated_at: now.clone(), + last_ok: None, + last_error: None, + restart_count: 0, + }); + update(entry); + entry.updated_at = now; } pub fn mark_component_ok(component: &str) { @@ -83,10 +83,7 @@ pub fn bump_component_restart(component: &str) { } pub fn snapshot() -> HealthSnapshot { - let components = registry() - .components - .lock() - .map_or_else(|_| BTreeMap::new(), |map| map.clone()); + let components = registry().components.lock().clone(); HealthSnapshot { pid: std::process::id(), diff --git a/src/memory/lucid.rs b/src/memory/lucid.rs index ab27840..62af08f 100644 --- a/src/memory/lucid.rs +++ b/src/memory/lucid.rs @@ -2,9 +2,9 @@ use super::sqlite::SqliteMemory; use super::traits::{Memory, MemoryCategory, MemoryEntry}; use async_trait::async_trait; use chrono::Local; +use parking_lot::Mutex; use std::collections::HashSet; use std::path::{Path, PathBuf}; -use std::sync::Mutex; use std::time::{Duration, Instant}; use tokio::process::Command; use tokio::time::timeout; @@ -116,25 +116,20 @@ impl LucidMemory { } fn in_failure_cooldown(&self) -> bool { - let Ok(guard) = self.last_failure_at.lock() else { - return false; - }; - + let guard = self.last_failure_at.lock(); guard .as_ref() .is_some_and(|last| last.elapsed() < self.failure_cooldown) } fn mark_failure_now(&self) { - if let Ok(mut guard) = self.last_failure_at.lock() { - *guard = Some(Instant::now()); - } + let mut guard = self.last_failure_at.lock(); + *guard = Some(Instant::now()); } fn clear_failure(&self) { - if let Ok(mut guard) = self.last_failure_at.lock() { - *guard = None; - } + let mut guard = self.last_failure_at.lock(); + *guard = None; } fn to_lucid_type(category: &MemoryCategory) -> &'static str {