diff --git a/src/channels/discord.rs b/src/channels/discord.rs index 7eb7502..9f7d429 100644 --- a/src/channels/discord.rs +++ b/src/channels/discord.rs @@ -375,9 +375,9 @@ impl Channel for DiscordChannel { reply_target: if channel_id.is_empty() { author_id.to_string() } else { - channel_id + channel_id.clone() }, - content: content.to_string(), + content: clean_content, channel: channel_id, timestamp: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) diff --git a/src/channels/email_channel.rs b/src/channels/email_channel.rs index da3490d..e59e0ac 100644 --- a/src/channels/email_channel.rs +++ b/src/channels/email_channel.rs @@ -14,11 +14,11 @@ use lettre::message::SinglePart; use lettre::transport::smtp::authentication::Credentials; use lettre::{Message, SmtpTransport, Transport}; use mail_parser::{MessageParser, MimeHeaders}; +use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::io::Write as IoWrite; use std::net::TcpStream; -use parking_lot::Mutex; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use tokio::time::{interval, sleep}; @@ -413,10 +413,7 @@ impl Channel for EmailChannel { Ok(Ok(messages)) => { for (id, sender, content, ts) in messages { { - let mut seen = self - .seen_messages - .lock() - ; + let mut seen = self.seen_messages.lock(); if seen.contains(&id) { continue; } @@ -488,20 +485,14 @@ mod tests { #[test] fn seen_messages_starts_empty() { let channel = EmailChannel::new(EmailConfig::default()); - let seen = channel - .seen_messages - .lock() - .expect("seen_messages mutex should not be poisoned"); + let seen = channel.seen_messages.lock(); assert!(seen.is_empty()); } #[test] fn seen_messages_tracks_unique_ids() { let channel = EmailChannel::new(EmailConfig::default()); - let mut seen = channel - .seen_messages - .lock() - .expect("seen_messages mutex should not be poisoned"); + let mut seen = channel.seen_messages.lock(); assert!(seen.insert("first-id".to_string())); assert!(!seen.insert("first-id".to_string())); @@ -576,10 +567,7 @@ mod tests { let channel = EmailChannel::new(config.clone()); assert_eq!(channel.config.imap_host, config.imap_host); - let seen_guard = channel - .seen_messages - .lock() - .expect("seen_messages mutex should not be poisoned"); + let seen_guard = channel.seen_messages.lock(); assert_eq!(seen_guard.len(), 0); } diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index b391a88..7c618ed 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -25,9 +25,9 @@ use axum::{ routing::{get, post}, Router, }; +use parking_lot::Mutex; use std::collections::HashMap; use std::net::SocketAddr; -use parking_lot::Mutex; use std::sync::Arc; use std::time::{Duration, Instant}; use tower_http::limit::RequestBodyLimitLayer; @@ -83,9 +83,7 @@ impl SlidingWindowRateLimiter { let now = Instant::now(); let cutoff = now.checked_sub(self.window).unwrap_or_else(Instant::now); - let mut guard = self - .requests - .lock(); + let mut guard = self.requests.lock(); let (requests, last_sweep) = &mut *guard; // Periodic sweep: remove IPs with no recent requests @@ -150,9 +148,7 @@ impl IdempotencyStore { /// Returns true if this key is new and is now recorded. fn record_if_new(&self, key: &str) -> bool { let now = Instant::now(); - let mut keys = self - .keys - .lock(); + let mut keys = self.keys.lock(); keys.retain(|_, seen_at| now.duration_since(*seen_at) < self.ttl); @@ -738,8 +734,8 @@ mod tests { use axum::http::HeaderValue; use axum::response::IntoResponse; use http_body_util::BodyExt; - use std::sync::atomic::{AtomicUsize, Ordering}; use parking_lot::Mutex; + use std::sync::atomic::{AtomicUsize, Ordering}; #[test] fn security_body_limit_is_64kb() { @@ -796,19 +792,13 @@ mod tests { assert!(limiter.allow("ip-3")); { - let guard = limiter - .requests - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); + let guard = limiter.requests.lock(); assert_eq!(guard.0.len(), 3); } // Force a sweep by backdating last_sweep { - let mut guard = limiter - .requests - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); + let mut guard = limiter.requests.lock(); guard.1 = Instant::now() .checked_sub(Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS + 1)) .unwrap(); @@ -821,10 +811,7 @@ mod tests { assert!(limiter.allow("ip-1")); { - let guard = limiter - .requests - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); + let guard = limiter.requests.lock(); assert_eq!(guard.0.len(), 1, "Stale entries should have been swept"); assert!(guard.0.contains_key("ip-1")); } @@ -961,10 +948,7 @@ mod tests { _category: MemoryCategory, _session_id: Option<&str>, ) -> anyhow::Result<()> { - self.keys - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner) - .push(key.to_string()); + self.keys.lock().push(key.to_string()); Ok(()) } @@ -994,11 +978,7 @@ mod tests { } async fn count(&self) -> anyhow::Result { - let size = self - .keys - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner) - .len(); + let size = self.keys.lock().len(); Ok(size) } @@ -1093,11 +1073,7 @@ mod tests { .into_response(); assert_eq!(second.status(), StatusCode::OK); - let keys = tracking_impl - .keys - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner) - .clone(); + let keys = tracking_impl.keys.lock().clone(); assert_eq!(keys.len(), 2); assert_ne!(keys[0], keys[1]); assert!(keys[0].starts_with("webhook_msg_")); diff --git a/src/memory/lucid.rs b/src/memory/lucid.rs index e1cb43a..7ea75a0 100644 --- a/src/memory/lucid.rs +++ b/src/memory/lucid.rs @@ -2,9 +2,9 @@ use super::sqlite::SqliteMemory; use super::traits::{Memory, MemoryCategory, MemoryEntry}; use async_trait::async_trait; use chrono::Local; +use parking_lot::Mutex; use std::collections::HashSet; use std::path::{Path, PathBuf}; -use parking_lot::Mutex; use std::time::{Duration, Instant}; use tokio::process::Command; use tokio::time::timeout; @@ -559,11 +559,12 @@ exit 1 "local_note", "Local sqlite auth fallback note", MemoryCategory::Core, + None, ) .await .unwrap(); - let entries = memory.recall("auth", 5).await.unwrap(); + let entries = memory.recall("auth", 5, None).await.unwrap(); assert!(entries .iter() diff --git a/src/memory/response_cache.rs b/src/memory/response_cache.rs index a260aa7..62fae6c 100644 --- a/src/memory/response_cache.rs +++ b/src/memory/response_cache.rs @@ -7,10 +7,10 @@ use anyhow::Result; use chrono::{Duration, Local}; +use parking_lot::Mutex; use rusqlite::{params, Connection}; use sha2::{Digest, Sha256}; use std::path::{Path, PathBuf}; -use parking_lot::Mutex; /// Response cache backed by a dedicated SQLite database. /// diff --git a/src/memory/sqlite.rs b/src/memory/sqlite.rs index 46a98db..b0addeb 100644 --- a/src/memory/sqlite.rs +++ b/src/memory/sqlite.rs @@ -3,9 +3,9 @@ use super::traits::{Memory, MemoryCategory, MemoryEntry}; use super::vector; use async_trait::async_trait; use chrono::Local; +use parking_lot::Mutex; use rusqlite::{params, Connection}; use std::path::{Path, PathBuf}; -use parking_lot::Mutex; use std::sync::Arc; use uuid::Uuid; @@ -186,10 +186,7 @@ impl SqliteMemory { // Check cache { - let conn = self - .conn - .lock() - .map_err(|e| anyhow::anyhow!("Lock error: {e}"))?; + let conn = self.conn.lock(); let mut stmt = conn.prepare("SELECT embedding FROM embedding_cache WHERE content_hash = ?1")?; @@ -211,10 +208,7 @@ impl SqliteMemory { // Store in cache + LRU eviction { - let conn = self - .conn - .lock() - .map_err(|e| anyhow::anyhow!("Lock error: {e}"))?; + let conn = self.conn.lock(); conn.execute( "INSERT OR REPLACE INTO embedding_cache (content_hash, embedding, created_at, accessed_at) @@ -316,10 +310,7 @@ impl SqliteMemory { pub async fn reindex(&self) -> anyhow::Result { // Step 1: Rebuild FTS5 { - let conn = self - .conn - .lock() - .map_err(|e| anyhow::anyhow!("Lock error: {e}"))?; + let conn = self.conn.lock(); conn.execute_batch("INSERT INTO memories_fts(memories_fts) VALUES('rebuild');")?; } @@ -330,10 +321,7 @@ impl SqliteMemory { } let entries: Vec<(String, String)> = { - let conn = self - .conn - .lock() - .map_err(|e| anyhow::anyhow!("Lock error: {e}"))?; + let conn = self.conn.lock(); let mut stmt = conn.prepare("SELECT id, content FROM memories WHERE embedding IS NULL")?; @@ -347,10 +335,7 @@ impl SqliteMemory { for (id, content) in &entries { if let Ok(Some(emb)) = self.get_or_compute_embedding(content).await { let bytes = vector::vec_to_bytes(&emb); - let conn = self - .conn - .lock() - .map_err(|e| anyhow::anyhow!("Lock error: {e}"))?; + let conn = self.conn.lock(); conn.execute( "UPDATE memories SET embedding = ?1 WHERE id = ?2", params![bytes, id], @@ -382,10 +367,7 @@ impl Memory for SqliteMemory { .await? .map(|emb| vector::vec_to_bytes(&emb)); - let conn = self - .conn - .lock() - .map_err(|e| anyhow::anyhow!("Lock error: {e}"))?; + let conn = self.conn.lock(); let now = Local::now().to_rfc3339(); let cat = Self::category_to_str(&category); let id = Uuid::new_v4().to_string(); @@ -418,10 +400,7 @@ impl Memory for SqliteMemory { // Compute query embedding (async, before lock) let query_embedding = self.get_or_compute_embedding(query).await?; - let conn = self - .conn - .lock() - .map_err(|e| anyhow::anyhow!("Lock error: {e}"))?; + let conn = self.conn.lock(); // FTS5 BM25 keyword search let keyword_results = Self::fts5_search(&conn, query, limit * 2).unwrap_or_default(); @@ -540,10 +519,7 @@ impl Memory for SqliteMemory { } async fn get(&self, key: &str) -> anyhow::Result> { - let conn = self - .conn - .lock() - .map_err(|e| anyhow::anyhow!("Lock error: {e}"))?; + let conn = self.conn.lock(); let mut stmt = conn.prepare( "SELECT id, key, content, category, created_at, session_id FROM memories WHERE key = ?1", @@ -572,10 +548,7 @@ impl Memory for SqliteMemory { category: Option<&MemoryCategory>, session_id: Option<&str>, ) -> anyhow::Result> { - let conn = self - .conn - .lock() - .map_err(|e| anyhow::anyhow!("Lock error: {e}"))?; + let conn = self.conn.lock(); let mut results = Vec::new(); @@ -628,29 +601,20 @@ impl Memory for SqliteMemory { } async fn forget(&self, key: &str) -> anyhow::Result { - let conn = self - .conn - .lock() - .map_err(|e| anyhow::anyhow!("Lock error: {e}"))?; + let conn = self.conn.lock(); let affected = conn.execute("DELETE FROM memories WHERE key = ?1", params![key])?; Ok(affected > 0) } async fn count(&self) -> anyhow::Result { - let conn = self - .conn - .lock() - .map_err(|e| anyhow::anyhow!("Lock error: {e}"))?; + let conn = self.conn.lock(); let count: i64 = conn.query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))?; #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)] Ok(count as usize) } async fn health_check(&self) -> bool { - self.conn - .lock() - .map(|c| c.execute_batch("SELECT 1").is_ok()) - .unwrap_or(false) + self.conn.lock().execute_batch("SELECT 1").is_ok() } } diff --git a/src/providers/compatible.rs b/src/providers/compatible.rs index d17d309..eebdcc5 100644 --- a/src/providers/compatible.rs +++ b/src/providers/compatible.rs @@ -688,8 +688,8 @@ impl Provider for OpenAiCompatibleProvider { temperature: f64, options: StreamOptions, ) -> stream::BoxStream<'static, StreamResult> { - let api_key = match self.api_key.as_ref() { - Some(key) => key.clone(), + let credential = match self.credential.as_ref() { + Some(value) => value.clone(), None => { let provider_name = self.name.clone(); return stream::once(async move { @@ -735,10 +735,10 @@ impl Provider for OpenAiCompatibleProvider { // Apply auth header req_builder = match &auth_header { AuthStyle::Bearer => { - req_builder.header("Authorization", format!("Bearer {}", api_key)) + req_builder.header("Authorization", format!("Bearer {}", credential)) } - AuthStyle::XApiKey => req_builder.header("x-api-key", &api_key), - AuthStyle::Custom(header) => req_builder.header(header, &api_key), + AuthStyle::XApiKey => req_builder.header("x-api-key", &credential), + AuthStyle::Custom(header) => req_builder.header(header, &credential), }; // Set accept header for streaming diff --git a/src/providers/reliable.rs b/src/providers/reliable.rs index 32cc0ca..be4818c 100644 --- a/src/providers/reliable.rs +++ b/src/providers/reliable.rs @@ -767,7 +767,7 @@ mod tests { .unwrap(); assert_eq!(result, "ok from sonnet"); - let seen = mock.models_seen.lock().unwrap(); + let seen = mock.models_seen.lock(); assert_eq!(seen.len(), 2); assert_eq!(seen[0], "claude-opus"); assert_eq!(seen[1], "claude-sonnet"); @@ -802,7 +802,7 @@ mod tests { .expect_err("all models should fail"); assert!(err.to_string().contains("All providers/models failed")); - let seen = mock.models_seen.lock().unwrap(); + let seen = mock.models_seen.lock(); assert_eq!(seen.len(), 3); } diff --git a/src/providers/traits.rs b/src/providers/traits.rs index 380bbc5..1bb296b 100644 --- a/src/providers/traits.rs +++ b/src/providers/traits.rs @@ -76,6 +76,13 @@ pub struct ChatRequest<'a> { pub tools: Option<&'a [ToolSpec]>, } +/// Declares optional provider features. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub struct ProviderCapabilities { + /// Provider can perform native tool calling without prompt-level emulation. + pub native_tool_calling: bool, +} + /// A tool result to feed back to the LLM. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ToolResultMessage { @@ -319,11 +326,11 @@ pub trait Provider: Send + Sync { _temperature: f64, _options: StreamOptions, ) -> stream::BoxStream<'static, StreamResult> { - let system = messages + let _system = messages .iter() .find(|m| m.role == "system") .map(|m| m.content.clone()); - let last_user = messages + let _last_user = messages .iter() .rfind(|m| m.role == "user") .map(|m| m.content.clone()) diff --git a/src/security/audit.rs b/src/security/audit.rs index 7874450..5eb2b42 100644 --- a/src/security/audit.rs +++ b/src/security/audit.rs @@ -3,11 +3,11 @@ use crate::config::AuditConfig; use anyhow::Result; use chrono::{DateTime, Utc}; +use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use std::fs::OpenOptions; use std::io::Write; use std::path::PathBuf; -use parking_lot::Mutex; use uuid::Uuid; /// Audit event types