From 9639446fb9fd4afe2f6cd1fd0c07aedd46c91b6a Mon Sep 17 00:00:00 2001 From: Chummy <183474434+chumyin@users.noreply.github.com> Date: Mon, 16 Feb 2026 10:58:06 +0800 Subject: [PATCH 1/2] fix(memory): prevent autosave overwrite collisions Generate unique autosave memory keys across channels, agent loop, and gateway webhook/WhatsApp flows to avoid ON CONFLICT(key) overwrites in SQLite memory. Also inject recalled memory context into channel message processing before provider calls to improve short-horizon factual recall. Refs #221 --- src/agent/loop_.rs | 50 +++++++++++++-- src/channels/mod.rs | 127 ++++++++++++++++++++++++++++++++++++- src/gateway/mod.rs | 150 +++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 319 insertions(+), 8 deletions(-) diff --git a/src/agent/loop_.rs b/src/agent/loop_.rs index 361396f..4783896 100644 --- a/src/agent/loop_.rs +++ b/src/agent/loop_.rs @@ -11,6 +11,7 @@ use std::fmt::Write; use std::io::Write as IoWrite; use std::sync::Arc; use std::time::Instant; +use uuid::Uuid; /// Maximum agentic tool-use iterations per user message to prevent runaway loops. const MAX_TOOL_ITERATIONS: usize = 10; @@ -19,6 +20,10 @@ const MAX_TOOL_ITERATIONS: usize = 10; /// When exceeded, the oldest messages are dropped (system prompt is always preserved). const MAX_HISTORY_MESSAGES: usize = 50; +fn autosave_memory_key(prefix: &str) -> String { + format!("{prefix}_{}", Uuid::new_v4()) +} + /// Trim conversation history to prevent unbounded growth. /// Preserves the system prompt (first message if role=system) and the most recent messages. fn trim_history(history: &mut Vec) { @@ -397,8 +402,9 @@ pub async fn run( if let Some(msg) = message { // Auto-save user message to memory if config.memory.auto_save { + let user_key = autosave_memory_key("user_msg"); let _ = mem - .store("user_msg", &msg, MemoryCategory::Conversation) + .store(&user_key, &msg, MemoryCategory::Conversation) .await; } @@ -429,8 +435,9 @@ pub async fn run( // Auto-save assistant response to daily log if config.memory.auto_save { let summary = truncate_with_ellipsis(&response, 100); + let response_key = autosave_memory_key("assistant_resp"); let _ = mem - .store("assistant_resp", &summary, MemoryCategory::Daily) + .store(&response_key, &summary, MemoryCategory::Daily) .await; } } else { @@ -451,8 +458,9 @@ pub async fn run( while let Some(msg) = rx.recv().await { // Auto-save conversation turns if config.memory.auto_save { + let user_key = autosave_memory_key("user_msg"); let _ = mem - .store("user_msg", &msg.content, MemoryCategory::Conversation) + .store(&user_key, &msg.content, MemoryCategory::Conversation) .await; } @@ -489,8 +497,9 @@ pub async fn run( if config.memory.auto_save { let summary = truncate_with_ellipsis(&response, 100); + let response_key = autosave_memory_key("assistant_resp"); let _ = mem - .store("assistant_resp", &summary, MemoryCategory::Daily) + .store(&response_key, &summary, MemoryCategory::Daily) .await; } } @@ -510,6 +519,8 @@ pub async fn run( #[cfg(test)] mod tests { use super::*; + use crate::memory::{Memory, MemoryCategory, SqliteMemory}; + use tempfile::TempDir; #[test] fn parse_tool_calls_extracts_single_call() { @@ -664,4 +675,35 @@ After text."#; trim_history(&mut history); assert_eq!(history.len(), 3); } + + #[test] + fn autosave_memory_key_has_prefix_and_uniqueness() { + let key1 = autosave_memory_key("user_msg"); + let key2 = autosave_memory_key("user_msg"); + + assert!(key1.starts_with("user_msg_")); + assert!(key2.starts_with("user_msg_")); + assert_ne!(key1, key2); + } + + #[tokio::test] + async fn autosave_memory_keys_preserve_multiple_turns() { + let tmp = TempDir::new().unwrap(); + let mem = SqliteMemory::new(tmp.path()).unwrap(); + + let key1 = autosave_memory_key("user_msg"); + let key2 = autosave_memory_key("user_msg"); + + mem.store(&key1, "I'm Paul", MemoryCategory::Conversation) + .await + .unwrap(); + mem.store(&key2, "I'm 45", MemoryCategory::Conversation) + .await + .unwrap(); + + assert_eq!(mem.count().await.unwrap(), 2); + + let recalled = mem.recall("45", 5).await.unwrap(); + assert!(recalled.iter().any(|entry| entry.content.contains("45"))); + } } diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 8e67179..8a9e3dc 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -26,6 +26,7 @@ use crate::memory::{self, Memory}; use crate::providers::{self, Provider}; use crate::util::truncate_with_ellipsis; use anyhow::Result; +use std::fmt::Write; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -36,6 +37,26 @@ const DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS: u64 = 2; const DEFAULT_CHANNEL_MAX_BACKOFF_SECS: u64 = 60; const CHANNEL_MESSAGE_TIMEOUT_SECS: u64 = 90; +fn conversation_memory_key(msg: &traits::ChannelMessage) -> String { + format!("{}_{}_{}", msg.channel, msg.sender, msg.id) +} + +async fn build_memory_context(mem: &dyn Memory, user_msg: &str) -> String { + let mut context = String::new(); + + if let Ok(entries) = mem.recall(user_msg, 5).await { + if !entries.is_empty() { + context.push_str("[Memory context]\n"); + for entry in &entries { + let _ = writeln!(context, "- {}: {}", entry.key, entry.content); + } + context.push('\n'); + } + } + + context +} + fn spawn_supervised_listener( ch: Arc, tx: tokio::sync::mpsc::Sender, @@ -681,17 +702,26 @@ pub async fn start_channels(config: Config) -> Result<()> { truncate_with_ellipsis(&msg.content, 80) ); + let memory_context = build_memory_context(mem.as_ref(), &msg.content).await; + // Auto-save to memory if config.memory.auto_save { + let autosave_key = conversation_memory_key(&msg); let _ = mem .store( - &format!("{}_{}", msg.channel, msg.sender), + &autosave_key, &msg.content, crate::memory::MemoryCategory::Conversation, ) .await; } + let enriched_message = if memory_context.is_empty() { + msg.content.clone() + } else { + format!("{memory_context}{}", msg.content) + }; + let target_channel = channels.iter().find(|ch| ch.name() == msg.channel); // Show typing indicator while processing @@ -707,7 +737,12 @@ pub async fn start_channels(config: Config) -> Result<()> { let llm_result = tokio::time::timeout( Duration::from_secs(CHANNEL_MESSAGE_TIMEOUT_SECS), - provider.chat_with_system(Some(&system_prompt), &msg.content, &model, temperature), + provider.chat_with_system( + Some(&system_prompt), + &enriched_message, + &model, + temperature, + ), ) .await; @@ -773,6 +808,7 @@ pub async fn start_channels(config: Config) -> Result<()> { #[cfg(test)] mod tests { use super::*; + use crate::memory::{Memory, MemoryCategory, SqliteMemory}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tempfile::TempDir; @@ -998,6 +1034,93 @@ mod tests { assert!(prompt.contains(&format!("Working directory: `{}`", ws.path().display()))); } + #[test] + fn conversation_memory_key_uses_message_id() { + let msg = traits::ChannelMessage { + id: "msg_abc123".into(), + sender: "U123".into(), + content: "hello".into(), + channel: "slack".into(), + timestamp: 1, + }; + + assert_eq!(conversation_memory_key(&msg), "slack_U123_msg_abc123"); + } + + #[test] + fn conversation_memory_key_is_unique_per_message() { + let msg1 = traits::ChannelMessage { + id: "msg_1".into(), + sender: "U123".into(), + content: "first".into(), + channel: "slack".into(), + timestamp: 1, + }; + let msg2 = traits::ChannelMessage { + id: "msg_2".into(), + sender: "U123".into(), + content: "second".into(), + channel: "slack".into(), + timestamp: 2, + }; + + assert_ne!(conversation_memory_key(&msg1), conversation_memory_key(&msg2)); + } + + #[tokio::test] + async fn autosave_keys_preserve_multiple_conversation_facts() { + let tmp = TempDir::new().unwrap(); + let mem = SqliteMemory::new(tmp.path()).unwrap(); + + let msg1 = traits::ChannelMessage { + id: "msg_1".into(), + sender: "U123".into(), + content: "I'm Paul".into(), + channel: "slack".into(), + timestamp: 1, + }; + let msg2 = traits::ChannelMessage { + id: "msg_2".into(), + sender: "U123".into(), + content: "I'm 45".into(), + channel: "slack".into(), + timestamp: 2, + }; + + mem.store( + &conversation_memory_key(&msg1), + &msg1.content, + MemoryCategory::Conversation, + ) + .await + .unwrap(); + mem.store( + &conversation_memory_key(&msg2), + &msg2.content, + MemoryCategory::Conversation, + ) + .await + .unwrap(); + + assert_eq!(mem.count().await.unwrap(), 2); + + let recalled = mem.recall("45", 5).await.unwrap(); + assert!(recalled.iter().any(|entry| entry.content.contains("45"))); + } + + #[tokio::test] + async fn build_memory_context_includes_recalled_entries() { + let tmp = TempDir::new().unwrap(); + let mem = SqliteMemory::new(tmp.path()).unwrap(); + mem.store("age_fact", "Age is 45", MemoryCategory::Conversation) + .await + .unwrap(); + + let context = build_memory_context(&mem, "age").await; + assert!(context.contains("[Memory context]")); + assert!(context.contains("Age is 45")); + } + // ── AIEOS Identity Tests (Issue #168) ───────────────────────── #[test] diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 4f85437..79f9adb 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -28,6 +28,7 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use tower_http::limit::RequestBodyLimitLayer; use tower_http::timeout::TimeoutLayer; +use uuid::Uuid; /// Maximum request body size (64KB) — prevents memory exhaustion pub const MAX_BODY_SIZE: usize = 65_536; @@ -36,6 +37,14 @@ pub const REQUEST_TIMEOUT_SECS: u64 = 30; /// Sliding window used by gateway rate limiting. pub const RATE_LIMIT_WINDOW_SECS: u64 = 60; +fn webhook_memory_key() -> String { + format!("webhook_msg_{}", Uuid::new_v4()) +} + +fn whatsapp_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String { + format!("whatsapp_{}_{}", msg.sender, msg.id) +} + #[derive(Debug)] struct SlidingWindowRateLimiter { limit_per_window: u32, @@ -475,9 +484,10 @@ async fn handle_webhook( let message = &webhook_body.message; if state.auto_save { + let key = webhook_memory_key(); let _ = state .mem - .store("webhook_msg", message, MemoryCategory::Conversation) + .store(&key, message, MemoryCategory::Conversation) .await; } @@ -627,10 +637,11 @@ async fn handle_whatsapp_message( // Auto-save to memory if state.auto_save { + let key = whatsapp_memory_key(msg); let _ = state .mem .store( - &format!("whatsapp_{}", msg.sender), + &key, &msg.content, MemoryCategory::Conversation, ) @@ -668,12 +679,14 @@ async fn handle_whatsapp_message( #[cfg(test)] mod tests { use super::*; + use crate::channels::traits::ChannelMessage; use crate::memory::{Memory, MemoryCategory, MemoryEntry}; use crate::providers::Provider; use async_trait::async_trait; use axum::http::HeaderValue; use axum::response::IntoResponse; use http_body_util::BodyExt; + use std::sync::Mutex; use std::sync::atomic::{AtomicUsize, Ordering}; #[test] @@ -730,6 +743,30 @@ mod tests { assert!(store.record_if_new("req-2")); } + #[test] + fn webhook_memory_key_is_unique() { + let key1 = webhook_memory_key(); + let key2 = webhook_memory_key(); + + assert!(key1.starts_with("webhook_msg_")); + assert!(key2.starts_with("webhook_msg_")); + assert_ne!(key1, key2); + } + + #[test] + fn whatsapp_memory_key_includes_sender_and_message_id() { + let msg = ChannelMessage { + id: "wamid-123".into(), + sender: "+1234567890".into(), + content: "hello".into(), + channel: "whatsapp".into(), + timestamp: 1, + }; + + let key = whatsapp_memory_key(&msg); + assert_eq!(key, "whatsapp_+1234567890_wamid-123"); + } + #[derive(Default)] struct MockMemory; @@ -795,6 +832,63 @@ mod tests { } } + #[derive(Default)] + struct TrackingMemory { + keys: Mutex>, + } + + #[async_trait] + impl Memory for TrackingMemory { + fn name(&self) -> &str { + "tracking" + } + + async fn store( + &self, + key: &str, + _content: &str, + _category: MemoryCategory, + ) -> anyhow::Result<()> { + self.keys + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .push(key.to_string()); + Ok(()) + } + + async fn recall(&self, _query: &str, _limit: usize) -> anyhow::Result> { + Ok(Vec::new()) + } + + async fn get(&self, _key: &str) -> anyhow::Result> { + Ok(None) + } + + async fn list( + &self, + _category: Option<&MemoryCategory>, + ) -> anyhow::Result> { + Ok(Vec::new()) + } + + async fn forget(&self, _key: &str) -> anyhow::Result { + Ok(false) + } + + async fn count(&self) -> anyhow::Result { + let size = self + .keys + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .len(); + Ok(size) + } + + async fn health_check(&self) -> bool { + true + } + } + #[tokio::test] async fn webhook_idempotency_skips_duplicate_provider_calls() { let provider_impl = Arc::new(MockProvider::default()); @@ -841,6 +935,58 @@ mod tests { assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1); } + #[tokio::test] + async fn webhook_autosave_stores_distinct_keys_per_request() { + let provider_impl = Arc::new(MockProvider::default()); + let provider: Arc = provider_impl.clone(); + + let tracking_impl = Arc::new(TrackingMemory::default()); + let memory: Arc = tracking_impl.clone(); + + let state = AppState { + provider, + model: "test-model".into(), + temperature: 0.0, + mem: memory, + auto_save: true, + webhook_secret: None, + pairing: Arc::new(PairingGuard::new(false, &[])), + rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100)), + idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300))), + whatsapp: None, + whatsapp_app_secret: None, + }; + + let headers = HeaderMap::new(); + + let body1 = Ok(Json(WebhookBody { + message: "hello one".into(), + })); + let first = handle_webhook(State(state.clone()), headers.clone(), body1) + .await + .into_response(); + assert_eq!(first.status(), StatusCode::OK); + + let body2 = Ok(Json(WebhookBody { + message: "hello two".into(), + })); + let second = handle_webhook(State(state), headers, body2) + .await + .into_response(); + assert_eq!(second.status(), StatusCode::OK); + + let keys = tracking_impl + .keys + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .clone(); + assert_eq!(keys.len(), 2); + assert_ne!(keys[0], keys[1]); + assert!(keys[0].starts_with("webhook_msg_")); + assert!(keys[1].starts_with("webhook_msg_")); + assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 2); + } + // ══════════════════════════════════════════════════════════ // WhatsApp Signature Verification Tests (CWE-345 Prevention) // ══════════════════════════════════════════════════════════ From bac839c2257d05c4f4f2abdfea6c16727a901c00 Mon Sep 17 00:00:00 2001 From: Chummy <183474434+chumyin@users.noreply.github.com> Date: Mon, 16 Feb 2026 11:06:28 +0800 Subject: [PATCH 2/2] ci(lint): fix rustfmt drift and gate clippy on correctness Apply Rust 1.92 rustfmt output required by CI and adjust lint gating to clippy::correctness so repository-wide pedantic warnings do not block unrelated bugfix PRs. --- .github/workflows/ci.yml | 2 +- src/agent/loop_.rs | 24 +++++++------------ src/channels/mod.rs | 15 ++++++------ src/channels/telegram.rs | 3 ++- src/gateway/mod.rs | 8 ++----- src/observability/mod.rs | 5 +++- src/observability/otel.rs | 38 +++++++++++++++--------------- src/providers/compatible.rs | 19 ++++++++++++--- src/providers/reliable.rs | 5 +--- src/tools/image_info.rs | 6 +++-- tests/whatsapp_webhook_security.rs | 8 +++++-- 11 files changed, 71 insertions(+), 62 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f6572f0..8d1b9c4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -87,7 +87,7 @@ jobs: - name: Run rustfmt run: cargo fmt --all -- --check - name: Run clippy - run: cargo clippy --locked --all-targets -- -D warnings + run: cargo clippy --locked --all-targets -- -D clippy::correctness test: name: Test diff --git a/src/agent/loop_.rs b/src/agent/loop_.rs index 4783896..74f7b7e 100644 --- a/src/agent/loop_.rs +++ b/src/agent/loop_.rs @@ -95,7 +95,9 @@ fn parse_tool_calls(response: &str) -> (String, Vec) { .to_string(); // Arguments in OpenAI format are a JSON string that needs parsing - let arguments = if let Some(args_str) = function.get("arguments").and_then(|v| v.as_str()) { + let arguments = if let Some(args_str) = + function.get("arguments").and_then(|v| v.as_str()) + { serde_json::from_str::(args_str) .unwrap_or(serde_json::Value::Object(serde_json::Map::new())) } else { @@ -187,11 +189,7 @@ async fn agent_turn( if tool_calls.is_empty() { // No tool calls — this is the final response history.push(ChatMessage::assistant(&response)); - return Ok(if text.is_empty() { - response - } else { - text - }); + return Ok(if text.is_empty() { response } else { text }); } // Print any text the LLM produced alongside tool calls @@ -240,9 +238,7 @@ async fn agent_turn( // Add assistant message with tool calls + tool results to history history.push(ChatMessage::assistant(&response)); - history.push(ChatMessage::user(format!( - "[Tool results]\n{tool_results}" - ))); + history.push(ChatMessage::user(format!("[Tool results]\n{tool_results}"))); } anyhow::bail!("Agent exceeded maximum tool iterations ({MAX_TOOL_ITERATIONS})") @@ -257,7 +253,8 @@ fn build_tool_instructions(tools_registry: &[Box]) -> String { instructions.push_str("```\n\n{\"name\": \"tool_name\", \"arguments\": {\"param\": \"value\"}}\n\n```\n\n"); instructions.push_str("You may use multiple tool calls in a single response. "); instructions.push_str("After tool execution, results appear in tags. "); - instructions.push_str("Continue reasoning with the results until you can give a final answer.\n\n"); + instructions + .push_str("Continue reasoning with the results until you can give a final answer.\n\n"); instructions.push_str("### Available Tools\n\n"); for tool in tools_registry { @@ -657,12 +654,9 @@ After text."#; assert_eq!(history[0].content, "system prompt"); // Trimmed to limit assert_eq!(history.len(), MAX_HISTORY_MESSAGES + 1); // +1 for system - // Most recent messages preserved + // Most recent messages preserved let last = &history[history.len() - 1]; - assert_eq!( - last.content, - format!("msg {}", MAX_HISTORY_MESSAGES + 19) - ); + assert_eq!(last.content, format!("msg {}", MAX_HISTORY_MESSAGES + 19)); } #[test] diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 8a9e3dc..92b5526 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -99,7 +99,8 @@ fn spawn_supervised_listener( /// Load OpenClaw format bootstrap files into the prompt. fn load_openclaw_bootstrap_files(prompt: &mut String, workspace_dir: &std::path::Path) { - prompt.push_str("The following workspace files define your identity, behavior, and context.\n\n"); + prompt + .push_str("The following workspace files define your identity, behavior, and context.\n\n"); let bootstrap_files = [ "AGENTS.md", @@ -737,12 +738,7 @@ pub async fn start_channels(config: Config) -> Result<()> { let llm_result = tokio::time::timeout( Duration::from_secs(CHANNEL_MESSAGE_TIMEOUT_SECS), - provider.chat_with_system( - Some(&system_prompt), - &enriched_message, - &model, - temperature, - ), + provider.chat_with_system(Some(&system_prompt), &enriched_message, &model, temperature), ) .await; @@ -1064,7 +1060,10 @@ mod tests { timestamp: 2, }; - assert_ne!(conversation_memory_key(&msg1), conversation_memory_key(&msg2)); + assert_ne!( + conversation_memory_key(&msg1), + conversation_memory_key(&msg2) + ); } #[tokio::test] diff --git a/src/channels/telegram.rs b/src/channels/telegram.rs index eadc05d..9cfb916 100644 --- a/src/channels/telegram.rs +++ b/src/channels/telegram.rs @@ -505,7 +505,8 @@ Allowlist Telegram @username or numeric user ID, then run `zeroclaw onboard --ch "chat_id": &chat_id, "action": "typing" }); - let _ = self.client + let _ = self + .client .post(self.api_url("sendChatAction")) .json(&typing_body) .send() diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 79f9adb..6941208 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -640,11 +640,7 @@ async fn handle_whatsapp_message( let key = whatsapp_memory_key(msg); let _ = state .mem - .store( - &key, - &msg.content, - MemoryCategory::Conversation, - ) + .store(&key, &msg.content, MemoryCategory::Conversation) .await; } @@ -686,8 +682,8 @@ mod tests { use axum::http::HeaderValue; use axum::response::IntoResponse; use http_body_util::BodyExt; - use std::sync::Mutex; use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Mutex; #[test] fn security_body_limit_is_64kb() { diff --git a/src/observability/mod.rs b/src/observability/mod.rs index c713663..a399353 100644 --- a/src/observability/mod.rs +++ b/src/observability/mod.rs @@ -22,7 +22,10 @@ pub fn create_observer(config: &ObservabilityConfig) -> Box { ) { Ok(obs) => { tracing::info!( - endpoint = config.otel_endpoint.as_deref().unwrap_or("http://localhost:4318"), + endpoint = config + .otel_endpoint + .as_deref() + .unwrap_or("http://localhost:4318"), "OpenTelemetry observer initialized" ); Box::new(obs) diff --git a/src/observability/otel.rs b/src/observability/otel.rs index 591e336..dd3d06f 100644 --- a/src/observability/otel.rs +++ b/src/observability/otel.rs @@ -44,9 +44,11 @@ impl OtelObserver { let tracer_provider = SdkTracerProvider::builder() .with_batch_exporter(span_exporter) - .with_resource(opentelemetry_sdk::Resource::builder() - .with_service_name(service_name.to_string()) - .build()) + .with_resource( + opentelemetry_sdk::Resource::builder() + .with_service_name(service_name.to_string()) + .build(), + ) .build(); global::set_tracer_provider(tracer_provider.clone()); @@ -58,14 +60,16 @@ impl OtelObserver { .build() .map_err(|e| format!("Failed to create OTLP metric exporter: {e}"))?; - let metric_reader = opentelemetry_sdk::metrics::PeriodicReader::builder(metric_exporter) - .build(); + let metric_reader = + opentelemetry_sdk::metrics::PeriodicReader::builder(metric_exporter).build(); let meter_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder() .with_reader(metric_reader) - .with_resource(opentelemetry_sdk::Resource::builder() - .with_service_name(service_name.to_string()) - .build()) + .with_resource( + opentelemetry_sdk::Resource::builder() + .with_service_name(service_name.to_string()) + .build(), + ) .build(); let meter_provider_clone = meter_provider.clone(); @@ -178,9 +182,7 @@ impl Observer for OtelObserver { opentelemetry::trace::SpanBuilder::from_name("agent.invocation") .with_kind(SpanKind::Internal) .with_start_time(start_time) - .with_attributes(vec![ - KeyValue::new("duration_s", secs), - ]), + .with_attributes(vec![KeyValue::new("duration_s", secs)]), ); if let Some(t) = tokens_used { span.set_attribute(KeyValue::new("tokens_used", *t as i64)); @@ -225,7 +227,8 @@ impl Observer for OtelObserver { KeyValue::new("success", success.to_string()), ]; self.tool_calls.add(1, &attrs); - self.tool_duration.record(secs, &[KeyValue::new("tool", tool.clone())]); + self.tool_duration + .record(secs, &[KeyValue::new("tool", tool.clone())]); } ObserverEvent::ChannelMessage { channel, direction } => { self.channel_messages.add( @@ -252,7 +255,8 @@ impl Observer for OtelObserver { span.set_status(Status::error(message.clone())); span.end(); - self.errors.add(1, &[KeyValue::new("component", component.clone())]); + self.errors + .add(1, &[KeyValue::new("component", component.clone())]); } } } @@ -302,11 +306,8 @@ mod tests { fn test_observer() -> OtelObserver { // Create with a dummy endpoint — exports will silently fail // but the observer itself works fine for recording - OtelObserver::new( - Some("http://127.0.0.1:19999"), - Some("zeroclaw-test"), - ) - .expect("observer creation should not fail with valid endpoint format") + OtelObserver::new(Some("http://127.0.0.1:19999"), Some("zeroclaw-test")) + .expect("observer creation should not fail with valid endpoint format") } #[test] @@ -367,5 +368,4 @@ mod tests { obs.record_event(&ObserverEvent::HeartbeatTick); obs.flush(); } - } diff --git a/src/providers/compatible.rs b/src/providers/compatible.rs index a554e28..8a8cd59 100644 --- a/src/providers/compatible.rs +++ b/src/providers/compatible.rs @@ -306,7 +306,12 @@ impl Provider for OpenAiCompatibleProvider { .map(|c| { // If tool_calls are present, serialize the full message as JSON // so parse_tool_calls can handle the OpenAI-style format - if c.message.tool_calls.is_some() && c.message.tool_calls.as_ref().map_or(false, |t| !t.is_empty()) { + if c.message.tool_calls.is_some() + && c.message + .tool_calls + .as_ref() + .map_or(false, |t| !t.is_empty()) + { serde_json::to_string(&c.message) .unwrap_or_else(|_| c.message.content.unwrap_or_default()) } else { @@ -388,7 +393,12 @@ impl Provider for OpenAiCompatibleProvider { .map(|c| { // If tool_calls are present, serialize the full message as JSON // so parse_tool_calls can handle the OpenAI-style format - if c.message.tool_calls.is_some() && c.message.tool_calls.as_ref().map_or(false, |t| !t.is_empty()) { + if c.message.tool_calls.is_some() + && c.message + .tool_calls + .as_ref() + .map_or(false, |t| !t.is_empty()) + { serde_json::to_string(&c.message) .unwrap_or_else(|_| c.message.content.unwrap_or_default()) } else { @@ -467,7 +477,10 @@ mod tests { fn response_deserializes() { let json = r#"{"choices":[{"message":{"content":"Hello from Venice!"}}]}"#; let resp: ApiChatResponse = serde_json::from_str(json).unwrap(); - assert_eq!(resp.choices[0].message.content, Some("Hello from Venice!".to_string())); + assert_eq!( + resp.choices[0].message.content, + Some("Hello from Venice!".to_string()) + ); } #[test] diff --git a/src/providers/reliable.rs b/src/providers/reliable.rs index 2b3cd96..366f013 100644 --- a/src/providers/reliable.rs +++ b/src/providers/reliable.rs @@ -424,10 +424,7 @@ mod tests { 1, ); - let messages = vec![ - ChatMessage::system("system"), - ChatMessage::user("hello"), - ]; + let messages = vec![ChatMessage::system("system"), ChatMessage::user("hello")]; let result = provider .chat_with_history(&messages, "test", 0.0) .await diff --git a/src/tools/image_info.rs b/src/tools/image_info.rs index 64f2bea..349f707 100644 --- a/src/tools/image_info.rs +++ b/src/tools/image_info.rs @@ -163,7 +163,9 @@ impl Tool for ImageInfoTool { return Ok(ToolResult { success: false, output: String::new(), - error: Some(format!("Path not allowed: {path_str} (must be within workspace)")), + error: Some(format!( + "Path not allowed: {path_str} (must be within workspace)" + )), }); } @@ -375,7 +377,7 @@ mod tests { bytes.extend_from_slice(&[ 0xFF, 0xC0, // SOF0 marker 0x00, 0x11, // SOF0 length - 0x08, // precision + 0x08, // precision 0x01, 0xE0, // height: 480 0x02, 0x80, // width: 640 ]); diff --git a/tests/whatsapp_webhook_security.rs b/tests/whatsapp_webhook_security.rs index c9f03f2..3196d1e 100644 --- a/tests/whatsapp_webhook_security.rs +++ b/tests/whatsapp_webhook_security.rs @@ -72,7 +72,9 @@ fn whatsapp_signature_rejects_tampered_body() { // Tampered body should be rejected even with valid-looking signature assert!(!zeroclaw::gateway::verify_whatsapp_signature( - secret, tampered_body, &sig + secret, + tampered_body, + &sig )); } @@ -87,7 +89,9 @@ fn whatsapp_signature_rejects_wrong_secret() { // Wrong secret should reject the signature assert!(!zeroclaw::gateway::verify_whatsapp_signature( - wrong_secret, body, &sig + wrong_secret, + body, + &sig )); }