refactor(sync): migrate remaining std mutex usage to parking_lot

This commit is contained in:
Chummy 2026-02-18 00:31:27 +08:00
parent 5942caa083
commit ef02f25c46
5 changed files with 54 additions and 81 deletions

View file

@ -6,10 +6,10 @@
use crate::config::AutonomyConfig;
use crate::security::AutonomyLevel;
use chrono::Utc;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::io::{self, BufRead, Write};
use std::sync::Mutex;
// ── Types ────────────────────────────────────────────────────────
@ -99,10 +99,7 @@ impl ApprovalManager {
}
// Session allowlist (from prior "Always" responses).
let allowlist = self
.session_allowlist
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let allowlist = self.session_allowlist.lock();
if allowlist.contains(tool_name) {
return false;
}
@ -121,10 +118,7 @@ impl ApprovalManager {
) {
// If "Always", add to session allowlist.
if decision == ApprovalResponse::Always {
let mut allowlist = self
.session_allowlist
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let mut allowlist = self.session_allowlist.lock();
allowlist.insert(tool_name.to_string());
}
@ -137,27 +131,18 @@ impl ApprovalManager {
decision,
channel: channel.to_string(),
};
let mut log = self
.audit_log
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let mut log = self.audit_log.lock();
log.push(entry);
}
/// Get a snapshot of the audit log.
pub fn audit_log(&self) -> Vec<ApprovalLogEntry> {
self.audit_log
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
self.audit_log.lock().clone()
}
/// Get the current session allowlist.
pub fn session_allowlist(&self) -> HashSet<String> {
self.session_allowlist
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
self.session_allowlist.lock().clone()
}
/// Prompt the user on the CLI and return their decision.

View file

@ -1,6 +1,7 @@
use super::traits::{Channel, ChannelMessage, SendMessage};
use async_trait::async_trait;
use futures_util::{SinkExt, StreamExt};
use parking_lot::Mutex;
use serde_json::json;
use tokio_tungstenite::tungstenite::Message;
use uuid::Uuid;
@ -13,7 +14,7 @@ pub struct DiscordChannel {
listen_to_bots: bool,
mention_only: bool,
client: reqwest::Client,
typing_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
typing_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
}
impl DiscordChannel {
@ -31,7 +32,7 @@ impl DiscordChannel {
listen_to_bots,
mention_only,
client: reqwest::Client::new(),
typing_handle: std::sync::Mutex::new(None),
typing_handle: Mutex::new(None),
}
}
@ -451,18 +452,16 @@ impl Channel for DiscordChannel {
}
});
if let Ok(mut guard) = self.typing_handle.lock() {
*guard = Some(handle);
}
let mut guard = self.typing_handle.lock();
*guard = Some(handle);
Ok(())
}
async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
if let Ok(mut guard) = self.typing_handle.lock() {
if let Some(handle) = guard.take() {
handle.abort();
}
let mut guard = self.typing_handle.lock();
if let Some(handle) = guard.take() {
handle.abort();
}
Ok(())
}
@ -722,7 +721,7 @@ mod tests {
#[test]
fn split_multibyte_only_content_without_panics() {
let msg = "".repeat(2500);
let msg = "🦀".repeat(2500);
let chunks = split_message_for_discord(&msg);
assert_eq!(chunks.len(), 2);
assert_eq!(chunks[0].chars().count(), DISCORD_MAX_MESSAGE_LENGTH);
@ -752,7 +751,7 @@ mod tests {
#[test]
fn typing_handle_starts_as_none() {
let ch = DiscordChannel::new("fake".into(), None, vec![], false, false);
let guard = ch.typing_handle.lock().unwrap();
let guard = ch.typing_handle.lock();
assert!(guard.is_none());
}
@ -760,7 +759,7 @@ mod tests {
async fn start_typing_sets_handle() {
let ch = DiscordChannel::new("fake".into(), None, vec![], false, false);
let _ = ch.start_typing("123456").await;
let guard = ch.typing_handle.lock().unwrap();
let guard = ch.typing_handle.lock();
assert!(guard.is_some());
}
@ -769,7 +768,7 @@ mod tests {
let ch = DiscordChannel::new("fake".into(), None, vec![], false, false);
let _ = ch.start_typing("123456").await;
let _ = ch.stop_typing("123456").await;
let guard = ch.typing_handle.lock().unwrap();
let guard = ch.typing_handle.lock();
assert!(guard.is_none());
}
@ -785,7 +784,7 @@ mod tests {
let ch = DiscordChannel::new("fake".into(), None, vec![], false, false);
let _ = ch.start_typing("111").await;
let _ = ch.start_typing("222").await;
let guard = ch.typing_handle.lock().unwrap();
let guard = ch.typing_handle.lock();
assert!(guard.is_some());
}

View file

@ -2,11 +2,12 @@ use super::types::{BudgetCheck, CostRecord, CostSummary, ModelStats, TokenUsage,
use crate::config::schema::CostConfig;
use anyhow::{anyhow, Context, Result};
use chrono::{Datelike, NaiveDate, Utc};
use parking_lot::{Mutex, MutexGuard};
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::Arc;
/// Cost tracker for API usage monitoring and budget enforcement.
pub struct CostTracker {
@ -38,16 +39,12 @@ impl CostTracker {
&self.session_id
}
fn lock_storage(&self) -> Result<MutexGuard<'_, CostStorage>> {
self.storage
.lock()
.map_err(|_| anyhow!("Cost storage lock poisoned"))
fn lock_storage(&self) -> MutexGuard<'_, CostStorage> {
self.storage.lock()
}
fn lock_session_costs(&self) -> Result<MutexGuard<'_, Vec<CostRecord>>> {
self.session_costs
.lock()
.map_err(|_| anyhow!("Session cost lock poisoned"))
fn lock_session_costs(&self) -> MutexGuard<'_, Vec<CostRecord>> {
self.session_costs.lock()
}
/// Check if a request is within budget.
@ -62,7 +59,7 @@ impl CostTracker {
));
}
let mut storage = self.lock_storage()?;
let mut storage = self.lock_storage();
let (daily_cost, monthly_cost) = storage.get_aggregated_costs()?;
// Check daily limit
@ -125,12 +122,12 @@ impl CostTracker {
// Persist first for durability guarantees.
{
let mut storage = self.lock_storage()?;
let mut storage = self.lock_storage();
storage.add_record(record.clone())?;
}
// Then update in-memory session snapshot.
let mut session_costs = self.lock_session_costs()?;
let mut session_costs = self.lock_session_costs();
session_costs.push(record);
Ok(())
@ -139,11 +136,11 @@ impl CostTracker {
/// Get the current cost summary.
pub fn get_summary(&self) -> Result<CostSummary> {
let (daily_cost, monthly_cost) = {
let mut storage = self.lock_storage()?;
let mut storage = self.lock_storage();
storage.get_aggregated_costs()?
};
let session_costs = self.lock_session_costs()?;
let session_costs = self.lock_session_costs();
let session_cost: f64 = session_costs
.iter()
.map(|record| record.usage.cost_usd)
@ -167,13 +164,13 @@ impl CostTracker {
/// Get the daily cost for a specific date.
pub fn get_daily_cost(&self, date: NaiveDate) -> Result<f64> {
let storage = self.lock_storage()?;
let storage = self.lock_storage();
storage.get_cost_for_date(date)
}
/// Get the monthly cost for a specific month.
pub fn get_monthly_cost(&self, year: i32, month: u32) -> Result<f64> {
let storage = self.lock_storage()?;
let storage = self.lock_storage();
storage.get_cost_for_month(year, month)
}
}

View file

@ -1,7 +1,8 @@
use chrono::Utc;
use parking_lot::Mutex;
use serde::Serialize;
use std::collections::BTreeMap;
use std::sync::{Mutex, OnceLock};
use std::sync::OnceLock;
use std::time::Instant;
#[derive(Debug, Clone, Serialize)]
@ -43,20 +44,19 @@ fn upsert_component<F>(component: &str, update: F)
where
F: FnOnce(&mut ComponentHealth),
{
if let Ok(mut map) = registry().components.lock() {
let now = now_rfc3339();
let entry = map
.entry(component.to_string())
.or_insert_with(|| ComponentHealth {
status: "starting".into(),
updated_at: now.clone(),
last_ok: None,
last_error: None,
restart_count: 0,
});
update(entry);
entry.updated_at = now;
}
let mut map = registry().components.lock();
let now = now_rfc3339();
let entry = map
.entry(component.to_string())
.or_insert_with(|| ComponentHealth {
status: "starting".into(),
updated_at: now.clone(),
last_ok: None,
last_error: None,
restart_count: 0,
});
update(entry);
entry.updated_at = now;
}
pub fn mark_component_ok(component: &str) {
@ -83,10 +83,7 @@ pub fn bump_component_restart(component: &str) {
}
pub fn snapshot() -> HealthSnapshot {
let components = registry()
.components
.lock()
.map_or_else(|_| BTreeMap::new(), |map| map.clone());
let components = registry().components.lock().clone();
HealthSnapshot {
pid: std::process::id(),

View file

@ -2,9 +2,9 @@ use super::sqlite::SqliteMemory;
use super::traits::{Memory, MemoryCategory, MemoryEntry};
use async_trait::async_trait;
use chrono::Local;
use parking_lot::Mutex;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::time::{Duration, Instant};
use tokio::process::Command;
use tokio::time::timeout;
@ -116,25 +116,20 @@ impl LucidMemory {
}
fn in_failure_cooldown(&self) -> bool {
let Ok(guard) = self.last_failure_at.lock() else {
return false;
};
let guard = self.last_failure_at.lock();
guard
.as_ref()
.is_some_and(|last| last.elapsed() < self.failure_cooldown)
}
fn mark_failure_now(&self) {
if let Ok(mut guard) = self.last_failure_at.lock() {
*guard = Some(Instant::now());
}
let mut guard = self.last_failure_at.lock();
*guard = Some(Instant::now());
}
fn clear_failure(&self) {
if let Ok(mut guard) = self.last_failure_at.lock() {
*guard = None;
}
let mut guard = self.last_failure_at.lock();
*guard = None;
}
fn to_lucid_type(category: &MemoryCategory) -> &'static str {