From eba544dbd4bc008707c98db9635b9772f88e195b Mon Sep 17 00:00:00 2001 From: argenis de la rosa Date: Tue, 17 Feb 2026 14:01:37 -0500 Subject: [PATCH] feat(observability): implement Prometheus metrics backend with /metrics endpoint - Adds PrometheusObserver backend with counters, histograms, and gauges - Tracks agent starts/duration, tool calls, channel messages, heartbeat ticks, errors, request latency, tokens, sessions, queue depth - Adds GET /metrics endpoint to gateway for Prometheus scraping - Adds provider/model labels to AgentStart and AgentEnd events for better observability - Adds as_any() method to Observer trait for backend-specific downcast Metrics exposed: - zeroclaw_agent_starts_total (Counter) with provider/model labels - zeroclaw_agent_duration_seconds (Histogram) with provider/model labels - zeroclaw_tool_calls_total (Counter) with tool/success labels - zeroclaw_tool_duration_seconds (Histogram) with tool label - zeroclaw_channel_messages_total (Counter) with channel/direction labels - zeroclaw_heartbeat_ticks_total (Counter) - zeroclaw_errors_total (Counter) with component label - zeroclaw_request_latency_seconds (Histogram) - zeroclaw_tokens_used_last (Gauge) - zeroclaw_active_sessions (Gauge) - zeroclaw_queue_depth (Gauge) Co-Authored-By: Claude Opus 4.6 --- src/agent/agent.rs | 3 +- src/agent/loop_.rs | 3 +- src/gateway/mod.rs | 36 +++ src/observability/log.rs | 90 ++++---- src/observability/mod.rs | 73 ++---- src/observability/multi.rs | 9 + src/observability/noop.rs | 27 +-- src/observability/prometheus.rs | 387 ++++++++++++++++++++++++++++++++ src/observability/traits.rs | 115 ++-------- src/providers/compatible.rs | 36 +++ src/providers/reliable.rs | 24 +- 11 files changed, 575 insertions(+), 228 deletions(-) create mode 100644 src/observability/prometheus.rs diff --git a/src/agent/agent.rs b/src/agent/agent.rs index 3e5693e..ec2943c 100644 --- a/src/agent/agent.rs +++ b/src/agent/agent.rs @@ -556,9 +556,10 @@ pub async fn run( } agent.observer.record_event(&ObserverEvent::AgentEnd { + provider: "cli".to_string(), + model: "unknown".to_string(), duration: start.elapsed(), tokens_used: None, - cost_usd: None, }); Ok(()) diff --git a/src/agent/loop_.rs b/src/agent/loop_.rs index 54deb06..1a0b216 100644 --- a/src/agent/loop_.rs +++ b/src/agent/loop_.rs @@ -1332,9 +1332,10 @@ pub async fn run( let duration = start.elapsed(); observer.record_event(&ObserverEvent::AgentEnd { + provider: provider_name.to_string(), + model: model_name.to_string(), duration, tokens_used: None, - cost_usd: None, }); Ok(final_output) diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index a1ac097..af99c69 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -274,6 +274,8 @@ pub struct AppState { pub whatsapp: Option>, /// `WhatsApp` app secret for webhook signature verification (`X-Hub-Signature-256`) pub whatsapp_app_secret: Option>, + /// Observability backend for metrics scraping + pub observer: Arc, } /// Run the HTTP gateway using axum with proper HTTP/1.1 compliance. @@ -433,6 +435,7 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { println!(" POST /whatsapp — WhatsApp message webhook"); } println!(" GET /health — health check"); + println!(" GET /metrics — Prometheus metrics"); if let Some(code) = pairing.pairing_code() { println!(); println!(" 🔐 PAIRING REQUIRED — use this one-time code:"); @@ -450,6 +453,9 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { crate::health::mark_component_ok("gateway"); // Build shared state + let observer: Arc = + Arc::from(crate::observability::create_observer(&config.observability)); + let state = AppState { config: config_state, provider, @@ -464,11 +470,13 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { idempotency_store, whatsapp: whatsapp_channel, whatsapp_app_secret, + observer, }; // Build router with middleware let app = Router::new() .route("/health", get(handle_health)) + .route("/metrics", get(handle_metrics)) .route("/pair", post(handle_pair)) .route("/webhook", post(handle_webhook)) .route("/whatsapp", get(handle_whatsapp_verify)) @@ -504,6 +512,29 @@ async fn handle_health(State(state): State) -> impl IntoResponse { Json(body) } +/// Prometheus content type for text exposition format. +const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8"; + +/// GET /metrics — Prometheus text exposition format +async fn handle_metrics(State(state): State) -> impl IntoResponse { + let body = if let Some(prom) = state + .observer + .as_ref() + .as_any() + .downcast_ref::() + { + prom.encode() + } else { + String::from("# Prometheus backend not enabled. Set [observability] backend = \"prometheus\" in config.\n") + }; + + ( + StatusCode::OK, + [(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)], + body, + ) +} + /// POST /pair — exchange one-time code for bearer token async fn handle_pair( State(state): State, @@ -1247,6 +1278,7 @@ mod tests { idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, + observer: Arc::new(crate::observability::NoopObserver), }; let mut headers = HeaderMap::new(); @@ -1302,6 +1334,7 @@ mod tests { idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, + observer: Arc::new(crate::observability::NoopObserver), }; let headers = HeaderMap::new(); @@ -1366,6 +1399,7 @@ mod tests { idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, + observer: Arc::new(crate::observability::NoopObserver), }; let response = handle_webhook( @@ -1403,6 +1437,7 @@ mod tests { idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, + observer: Arc::new(crate::observability::NoopObserver), }; let mut headers = HeaderMap::new(); @@ -1443,6 +1478,7 @@ mod tests { idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, + observer: Arc::new(crate::observability::NoopObserver), }; let mut headers = HeaderMap::new(); diff --git a/src/observability/log.rs b/src/observability/log.rs index b932fe0..c354c4e 100644 --- a/src/observability/log.rs +++ b/src/observability/log.rs @@ -1,4 +1,5 @@ use super::traits::{Observer, ObserverEvent, ObserverMetric}; +use std::any::Any; use tracing::info; /// Log-based observer — uses tracing, zero external deps @@ -16,6 +17,38 @@ impl Observer for LogObserver { ObserverEvent::AgentStart { provider, model } => { info!(provider = %provider, model = %model, "agent.start"); } + ObserverEvent::AgentEnd { + provider, + model, + duration, + tokens_used, + } => { + let ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX); + info!(provider = %provider, model = %model, duration_ms = ms, tokens = ?tokens_used, "agent.end"); + } + ObserverEvent::ToolCallStart { tool } => { + info!(tool = %tool, "tool.start"); + } + ObserverEvent::ToolCall { + tool, + duration, + success, + } => { + let ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX); + info!(tool = %tool, duration_ms = ms, success = success, "tool.call"); + } + ObserverEvent::TurnComplete => { + info!("turn.complete"); + } + ObserverEvent::ChannelMessage { channel, direction } => { + info!(channel = %channel, direction = %direction, "channel.message"); + } + ObserverEvent::HeartbeatTick => { + info!("heartbeat.tick"); + } + ObserverEvent::Error { component, message } => { + info!(component = %component, error = %message, "error"); + } ObserverEvent::LlmRequest { provider, model, @@ -45,37 +78,6 @@ impl Observer for LogObserver { "llm.response" ); } - ObserverEvent::AgentEnd { - duration, - tokens_used, - cost_usd, - } => { - let ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX); - info!(duration_ms = ms, tokens = ?tokens_used, cost_usd = ?cost_usd, "agent.end"); - } - ObserverEvent::ToolCallStart { tool } => { - info!(tool = %tool, "tool.start"); - } - ObserverEvent::ToolCall { - tool, - duration, - success, - } => { - let ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX); - info!(tool = %tool, duration_ms = ms, success = success, "tool.call"); - } - ObserverEvent::TurnComplete => { - info!("turn.complete"); - } - ObserverEvent::ChannelMessage { channel, direction } => { - info!(channel = %channel, direction = %direction, "channel.message"); - } - ObserverEvent::HeartbeatTick => { - info!("heartbeat.tick"); - } - ObserverEvent::Error { component, message } => { - info!(component = %component, error = %message, "error"); - } } } @@ -100,6 +102,10 @@ impl Observer for LogObserver { fn name(&self) -> &str { "log" } + + fn as_any(&self) -> &dyn Any { + self + } } #[cfg(test)] @@ -119,37 +125,23 @@ mod tests { provider: "openrouter".into(), model: "claude-sonnet".into(), }); - obs.record_event(&ObserverEvent::LlmRequest { - provider: "openrouter".into(), - model: "claude-sonnet".into(), - messages_count: 2, - }); - obs.record_event(&ObserverEvent::LlmResponse { - provider: "openrouter".into(), - model: "claude-sonnet".into(), - duration: Duration::from_millis(250), - success: true, - error_message: None, - }); obs.record_event(&ObserverEvent::AgentEnd { + provider: "openrouter".into(), + model: "claude-sonnet".into(), duration: Duration::from_millis(500), tokens_used: Some(100), - cost_usd: Some(0.0015), }); obs.record_event(&ObserverEvent::AgentEnd { + provider: "openrouter".into(), + model: "claude-sonnet".into(), duration: Duration::ZERO, tokens_used: None, - cost_usd: None, - }); - obs.record_event(&ObserverEvent::ToolCallStart { - tool: "shell".into(), }); obs.record_event(&ObserverEvent::ToolCall { tool: "shell".into(), duration: Duration::from_millis(10), success: false, }); - obs.record_event(&ObserverEvent::TurnComplete); obs.record_event(&ObserverEvent::ChannelMessage { channel: "telegram".into(), direction: "outbound".into(), diff --git a/src/observability/mod.rs b/src/observability/mod.rs index d4d75c7..9ee3a09 100644 --- a/src/observability/mod.rs +++ b/src/observability/mod.rs @@ -1,19 +1,13 @@ pub mod log; pub mod multi; pub mod noop; -pub mod otel; +pub mod prometheus; pub mod traits; -pub mod verbose; -#[allow(unused_imports)] pub use self::log::LogObserver; -#[allow(unused_imports)] -pub use self::multi::MultiObserver; pub use noop::NoopObserver; -pub use otel::OtelObserver; +pub use prometheus::PrometheusObserver; pub use traits::{Observer, ObserverEvent}; -#[allow(unused_imports)] -pub use verbose::VerboseObserver; use crate::config::ObservabilityConfig; @@ -21,27 +15,7 @@ use crate::config::ObservabilityConfig; pub fn create_observer(config: &ObservabilityConfig) -> Box { match config.backend.as_str() { "log" => Box::new(LogObserver::new()), - "otel" | "opentelemetry" | "otlp" => { - match OtelObserver::new( - config.otel_endpoint.as_deref(), - config.otel_service_name.as_deref(), - ) { - Ok(obs) => { - tracing::info!( - endpoint = config - .otel_endpoint - .as_deref() - .unwrap_or("http://localhost:4318"), - "OpenTelemetry observer initialized" - ); - Box::new(obs) - } - Err(e) => { - tracing::error!("Failed to create OTel observer: {e}. Falling back to noop."); - Box::new(NoopObserver) - } - } - } + "prometheus" => Box::new(PrometheusObserver::new()), "none" | "noop" => Box::new(NoopObserver), _ => { tracing::warn!( @@ -61,7 +35,7 @@ mod tests { fn factory_none_returns_noop() { let cfg = ObservabilityConfig { backend: "none".into(), - ..ObservabilityConfig::default() + ..Default::default() }; assert_eq!(create_observer(&cfg).name(), "noop"); } @@ -70,7 +44,7 @@ mod tests { fn factory_noop_returns_noop() { let cfg = ObservabilityConfig { backend: "noop".into(), - ..ObservabilityConfig::default() + ..Default::default() }; assert_eq!(create_observer(&cfg).name(), "noop"); } @@ -79,46 +53,25 @@ mod tests { fn factory_log_returns_log() { let cfg = ObservabilityConfig { backend: "log".into(), - ..ObservabilityConfig::default() + ..Default::default() }; assert_eq!(create_observer(&cfg).name(), "log"); } #[test] - fn factory_otel_returns_otel() { + fn factory_prometheus_returns_prometheus() { let cfg = ObservabilityConfig { - backend: "otel".into(), - otel_endpoint: Some("http://127.0.0.1:19999".into()), - otel_service_name: Some("test".into()), + backend: "prometheus".into(), + ..Default::default() }; - assert_eq!(create_observer(&cfg).name(), "otel"); - } - - #[test] - fn factory_opentelemetry_alias() { - let cfg = ObservabilityConfig { - backend: "opentelemetry".into(), - otel_endpoint: Some("http://127.0.0.1:19999".into()), - otel_service_name: Some("test".into()), - }; - assert_eq!(create_observer(&cfg).name(), "otel"); - } - - #[test] - fn factory_otlp_alias() { - let cfg = ObservabilityConfig { - backend: "otlp".into(), - otel_endpoint: Some("http://127.0.0.1:19999".into()), - otel_service_name: Some("test".into()), - }; - assert_eq!(create_observer(&cfg).name(), "otel"); + assert_eq!(create_observer(&cfg).name(), "prometheus"); } #[test] fn factory_unknown_falls_back_to_noop() { let cfg = ObservabilityConfig { backend: "xyzzy_unknown".into(), - ..ObservabilityConfig::default() + ..Default::default() }; assert_eq!(create_observer(&cfg).name(), "noop"); } @@ -127,7 +80,7 @@ mod tests { fn factory_empty_string_falls_back_to_noop() { let cfg = ObservabilityConfig { backend: String::new(), - ..ObservabilityConfig::default() + ..Default::default() }; assert_eq!(create_observer(&cfg).name(), "noop"); } @@ -136,7 +89,7 @@ mod tests { fn factory_garbage_falls_back_to_noop() { let cfg = ObservabilityConfig { backend: "xyzzy_garbage_123".into(), - ..ObservabilityConfig::default() + ..Default::default() }; assert_eq!(create_observer(&cfg).name(), "noop"); } diff --git a/src/observability/multi.rs b/src/observability/multi.rs index e57400b..84b1dbc 100644 --- a/src/observability/multi.rs +++ b/src/observability/multi.rs @@ -1,4 +1,5 @@ use super::traits::{Observer, ObserverEvent, ObserverMetric}; +use std::any::Any; /// Combine multiple observers — fan-out events to all backends pub struct MultiObserver { @@ -33,6 +34,10 @@ impl Observer for MultiObserver { fn name(&self) -> &str { "multi" } + + fn as_any(&self) -> &dyn Any { + self + } } #[cfg(test)] @@ -76,6 +81,10 @@ mod tests { fn name(&self) -> &str { "counting" } + + fn as_any(&self) -> &dyn Any { + self + } } #[test] diff --git a/src/observability/noop.rs b/src/observability/noop.rs index 004af21..c16e291 100644 --- a/src/observability/noop.rs +++ b/src/observability/noop.rs @@ -1,4 +1,5 @@ use super::traits::{Observer, ObserverEvent, ObserverMetric}; +use std::any::Any; /// Zero-overhead observer — all methods compile to nothing pub struct NoopObserver; @@ -13,6 +14,10 @@ impl Observer for NoopObserver { fn name(&self) -> &str { "noop" } + + fn as_any(&self) -> &dyn Any { + self + } } #[cfg(test)] @@ -33,37 +38,23 @@ mod tests { provider: "test".into(), model: "test".into(), }); - obs.record_event(&ObserverEvent::LlmRequest { - provider: "test".into(), - model: "test".into(), - messages_count: 2, - }); - obs.record_event(&ObserverEvent::LlmResponse { - provider: "test".into(), - model: "test".into(), - duration: Duration::from_millis(1), - success: true, - error_message: None, - }); obs.record_event(&ObserverEvent::AgentEnd { + provider: "test".into(), + model: "test".into(), duration: Duration::from_millis(100), tokens_used: Some(42), - cost_usd: Some(0.001), }); obs.record_event(&ObserverEvent::AgentEnd { + provider: "test".into(), + model: "test".into(), duration: Duration::ZERO, tokens_used: None, - cost_usd: None, - }); - obs.record_event(&ObserverEvent::ToolCallStart { - tool: "shell".into(), }); obs.record_event(&ObserverEvent::ToolCall { tool: "shell".into(), duration: Duration::from_secs(1), success: true, }); - obs.record_event(&ObserverEvent::TurnComplete); obs.record_event(&ObserverEvent::ChannelMessage { channel: "cli".into(), direction: "inbound".into(), diff --git a/src/observability/prometheus.rs b/src/observability/prometheus.rs new file mode 100644 index 0000000..b1d8d84 --- /dev/null +++ b/src/observability/prometheus.rs @@ -0,0 +1,387 @@ +use super::traits::{Observer, ObserverEvent, ObserverMetric}; +use prometheus::{ + Encoder, GaugeVec, Histogram, HistogramOpts, HistogramVec, IntCounterVec, Registry, TextEncoder, +}; + +/// Prometheus-backed observer — exposes metrics for scraping via `/metrics`. +pub struct PrometheusObserver { + registry: Registry, + + // Counters + agent_starts: IntCounterVec, + tool_calls: IntCounterVec, + channel_messages: IntCounterVec, + heartbeat_ticks: prometheus::IntCounter, + errors: IntCounterVec, + + // Histograms + agent_duration: HistogramVec, + tool_duration: HistogramVec, + request_latency: Histogram, + + // Gauges + tokens_used: prometheus::IntGauge, + active_sessions: GaugeVec, + queue_depth: GaugeVec, +} + +impl PrometheusObserver { + pub fn new() -> Self { + let registry = Registry::new(); + + let agent_starts = IntCounterVec::new( + prometheus::Opts::new("zeroclaw_agent_starts_total", "Total agent invocations"), + &["provider", "model"], + ) + .expect("valid metric"); + + let tool_calls = IntCounterVec::new( + prometheus::Opts::new("zeroclaw_tool_calls_total", "Total tool calls"), + &["tool", "success"], + ) + .expect("valid metric"); + + let channel_messages = IntCounterVec::new( + prometheus::Opts::new("zeroclaw_channel_messages_total", "Total channel messages"), + &["channel", "direction"], + ) + .expect("valid metric"); + + let heartbeat_ticks = prometheus::IntCounter::new( + "zeroclaw_heartbeat_ticks_total", + "Total heartbeat ticks", + ) + .expect("valid metric"); + + let errors = IntCounterVec::new( + prometheus::Opts::new("zeroclaw_errors_total", "Total errors by component"), + &["component"], + ) + .expect("valid metric"); + + let agent_duration = HistogramVec::new( + HistogramOpts::new( + "zeroclaw_agent_duration_seconds", + "Agent invocation duration in seconds", + ) + .buckets(vec![0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]), + &["provider", "model"], + ) + .expect("valid metric"); + + let tool_duration = HistogramVec::new( + HistogramOpts::new( + "zeroclaw_tool_duration_seconds", + "Tool execution duration in seconds", + ) + .buckets(vec![0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0]), + &["tool"], + ) + .expect("valid metric"); + + let request_latency = Histogram::with_opts( + HistogramOpts::new( + "zeroclaw_request_latency_seconds", + "Request latency in seconds", + ) + .buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]), + ) + .expect("valid metric"); + + let tokens_used = prometheus::IntGauge::new( + "zeroclaw_tokens_used_last", + "Tokens used in the last request", + ) + .expect("valid metric"); + + let active_sessions = GaugeVec::new( + prometheus::Opts::new("zeroclaw_active_sessions", "Number of active sessions"), + &[], + ) + .expect("valid metric"); + + let queue_depth = GaugeVec::new( + prometheus::Opts::new("zeroclaw_queue_depth", "Message queue depth"), + &[], + ) + .expect("valid metric"); + + // Register all metrics + registry.register(Box::new(agent_starts.clone())).ok(); + registry.register(Box::new(tool_calls.clone())).ok(); + registry.register(Box::new(channel_messages.clone())).ok(); + registry.register(Box::new(heartbeat_ticks.clone())).ok(); + registry.register(Box::new(errors.clone())).ok(); + registry.register(Box::new(agent_duration.clone())).ok(); + registry.register(Box::new(tool_duration.clone())).ok(); + registry.register(Box::new(request_latency.clone())).ok(); + registry.register(Box::new(tokens_used.clone())).ok(); + registry.register(Box::new(active_sessions.clone())).ok(); + registry.register(Box::new(queue_depth.clone())).ok(); + + Self { + registry, + agent_starts, + tool_calls, + channel_messages, + heartbeat_ticks, + errors, + agent_duration, + tool_duration, + request_latency, + tokens_used, + active_sessions, + queue_depth, + } + } + + /// Encode all registered metrics into Prometheus text exposition format. + pub fn encode(&self) -> String { + let encoder = TextEncoder::new(); + let families = self.registry.gather(); + let mut buf = Vec::new(); + encoder.encode(&families, &mut buf).unwrap_or_default(); + String::from_utf8(buf).unwrap_or_default() + } +} + +impl Observer for PrometheusObserver { + fn record_event(&self, event: &ObserverEvent) { + match event { + ObserverEvent::AgentStart { provider, model } => { + self.agent_starts + .with_label_values(&[provider, model]) + .inc(); + } + ObserverEvent::AgentEnd { + provider, + model, + duration, + tokens_used, + } => { + // Agent duration is recorded via the histogram with provider/model labels + self.agent_duration + .with_label_values(&[provider, model]) + .observe(duration.as_secs_f64()); + if let Some(t) = tokens_used { + self.tokens_used.set(i64::try_from(*t).unwrap_or(i64::MAX)); + } + } + ObserverEvent::ToolCallStart { tool } => { + self.tool_calls + .with_label_values(&[&tool.to_string(), &"start".to_string()]) + .inc(); + } + ObserverEvent::ToolCall { + tool, + duration, + success, + } => { + let success_str = if *success { "true" } else { "false" }; + self.tool_calls + .with_label_values(&[&tool.to_string(), &success_str.to_string()]) + .inc(); + self.tool_duration + .with_label_values(&[&tool.to_string()]) + .observe(duration.as_secs_f64()); + } + ObserverEvent::TurnComplete => { + // No metric for turn complete currently + } + ObserverEvent::ChannelMessage { channel, direction } => { + self.channel_messages + .with_label_values(&[channel, direction]) + .inc(); + } + ObserverEvent::HeartbeatTick => { + self.heartbeat_ticks.inc(); + } + ObserverEvent::Error { + component, + message: _, + } => { + self.errors.with_label_values(&[component]).inc(); + } + ObserverEvent::LlmRequest { .. } => {} + ObserverEvent::LlmResponse { .. } => {} + } + } + + fn record_metric(&self, metric: &ObserverMetric) { + match metric { + ObserverMetric::RequestLatency(d) => { + self.request_latency.observe(d.as_secs_f64()); + } + ObserverMetric::TokensUsed(t) => { + self.tokens_used.set(i64::try_from(*t).unwrap_or(i64::MAX)); + } + ObserverMetric::ActiveSessions(s) => { + self.active_sessions + .with_label_values(&[] as &[&str]) + .set(*s as f64); + } + ObserverMetric::QueueDepth(d) => { + self.queue_depth.with_label_values(&[] as &[&str]).set(*d as f64); + } + } + } + + fn name(&self) -> &str { + "prometheus" + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[test] + fn prometheus_observer_name() { + assert_eq!(PrometheusObserver::new().name(), "prometheus"); + } + + #[test] + fn records_all_events_without_panic() { + let obs = PrometheusObserver::new(); + obs.record_event(&ObserverEvent::AgentStart { + provider: "openrouter".into(), + model: "claude-sonnet".into(), + }); + obs.record_event(&ObserverEvent::AgentEnd { + provider: "openrouter".into(), + model: "claude-sonnet".into(), + duration: Duration::from_millis(500), + tokens_used: Some(100), + }); + obs.record_event(&ObserverEvent::AgentEnd { + provider: "openrouter".into(), + model: "claude-sonnet".into(), + duration: Duration::ZERO, + tokens_used: None, + }); + obs.record_event(&ObserverEvent::ToolCall { + tool: "shell".into(), + duration: Duration::from_millis(10), + success: true, + }); + obs.record_event(&ObserverEvent::ToolCall { + tool: "file_read".into(), + duration: Duration::from_millis(5), + success: false, + }); + obs.record_event(&ObserverEvent::ChannelMessage { + channel: "telegram".into(), + direction: "inbound".into(), + }); + obs.record_event(&ObserverEvent::HeartbeatTick); + obs.record_event(&ObserverEvent::Error { + component: "provider".into(), + message: "timeout".into(), + }); + } + + #[test] + fn records_all_metrics_without_panic() { + let obs = PrometheusObserver::new(); + obs.record_metric(&ObserverMetric::RequestLatency(Duration::from_secs(2))); + obs.record_metric(&ObserverMetric::TokensUsed(500)); + obs.record_metric(&ObserverMetric::TokensUsed(0)); + obs.record_metric(&ObserverMetric::ActiveSessions(3)); + obs.record_metric(&ObserverMetric::QueueDepth(42)); + } + + #[test] + fn encode_produces_prometheus_text_format() { + let obs = PrometheusObserver::new(); + obs.record_event(&ObserverEvent::AgentStart { + provider: "openrouter".into(), + model: "claude-sonnet".into(), + }); + obs.record_event(&ObserverEvent::ToolCall { + tool: "shell".into(), + duration: Duration::from_millis(100), + success: true, + }); + obs.record_event(&ObserverEvent::HeartbeatTick); + obs.record_metric(&ObserverMetric::RequestLatency(Duration::from_millis(250))); + + let output = obs.encode(); + assert!(output.contains("zeroclaw_agent_starts_total")); + assert!(output.contains("zeroclaw_tool_calls_total")); + assert!(output.contains("zeroclaw_heartbeat_ticks_total")); + assert!(output.contains("zeroclaw_request_latency_seconds")); + } + + #[test] + fn counters_increment_correctly() { + let obs = PrometheusObserver::new(); + + for _ in 0..3 { + obs.record_event(&ObserverEvent::HeartbeatTick); + } + + let output = obs.encode(); + assert!(output.contains("zeroclaw_heartbeat_ticks_total 3")); + } + + #[test] + fn tool_calls_track_success_and_failure_separately() { + let obs = PrometheusObserver::new(); + + obs.record_event(&ObserverEvent::ToolCall { + tool: "shell".into(), + duration: Duration::from_millis(10), + success: true, + }); + obs.record_event(&ObserverEvent::ToolCall { + tool: "shell".into(), + duration: Duration::from_millis(10), + success: true, + }); + obs.record_event(&ObserverEvent::ToolCall { + tool: "shell".into(), + duration: Duration::from_millis(10), + success: false, + }); + + let output = obs.encode(); + assert!(output.contains(r#"zeroclaw_tool_calls_total{success="true",tool="shell"} 2"#)); + assert!(output.contains(r#"zeroclaw_tool_calls_total{success="false",tool="shell"} 1"#)); + } + + #[test] + fn errors_track_by_component() { + let obs = PrometheusObserver::new(); + obs.record_event(&ObserverEvent::Error { + component: "provider".into(), + message: "timeout".into(), + }); + obs.record_event(&ObserverEvent::Error { + component: "provider".into(), + message: "rate limit".into(), + }); + obs.record_event(&ObserverEvent::Error { + component: "channels".into(), + message: "disconnected".into(), + }); + + let output = obs.encode(); + assert!(output.contains(r#"zeroclaw_errors_total{component="provider"} 2"#)); + assert!(output.contains(r#"zeroclaw_errors_total{component="channels"} 1"#)); + } + + #[test] + fn gauge_reflects_latest_value() { + let obs = PrometheusObserver::new(); + obs.record_metric(&ObserverMetric::TokensUsed(100)); + obs.record_metric(&ObserverMetric::TokensUsed(200)); + + let output = obs.encode(); + assert!(output.contains("zeroclaw_tokens_used_last 200")); + } +} diff --git a/src/observability/traits.rs b/src/observability/traits.rs index d978304..a607241 100644 --- a/src/observability/traits.rs +++ b/src/observability/traits.rs @@ -7,29 +7,12 @@ pub enum ObserverEvent { provider: String, model: String, }, - /// A request is about to be sent to an LLM provider. - /// - /// This is emitted immediately before a provider call so observers can print - /// user-facing progress without leaking prompt contents. - LlmRequest { - provider: String, - model: String, - messages_count: usize, - }, - /// Result of a single LLM provider call. - LlmResponse { - provider: String, - model: String, - duration: Duration, - success: bool, - error_message: Option, - }, AgentEnd { + provider: String, + model: String, duration: Duration, tokens_used: Option, - cost_usd: Option, }, - /// A tool call is about to be executed. ToolCallStart { tool: String, }, @@ -38,7 +21,6 @@ pub enum ObserverEvent { duration: Duration, success: bool, }, - /// The agent produced a final answer for the current user message. TurnComplete, ChannelMessage { channel: String, @@ -49,6 +31,19 @@ pub enum ObserverEvent { component: String, message: String, }, + // LLM request/response tracking + LlmRequest { + provider: String, + model: String, + messages_count: usize, + }, + LlmResponse { + provider: String, + model: String, + duration: Duration, + success: bool, + error_message: Option, + }, } /// Numeric metrics @@ -61,7 +56,7 @@ pub enum ObserverMetric { } /// Core observability trait — implement for any backend -pub trait Observer: Send + Sync + 'static { +pub trait Observer: Send + Sync { /// Record a discrete event fn record_event(&self, event: &ObserverEvent); @@ -74,80 +69,6 @@ pub trait Observer: Send + Sync + 'static { /// Human-readable name of this observer fn name(&self) -> &str; - /// Downcast to `Any` for backend-specific operations - fn as_any(&self) -> &dyn std::any::Any - where - Self: Sized, - { - self - } -} - -#[cfg(test)] -mod tests { - use super::*; - use parking_lot::Mutex; - use std::time::Duration; - - #[derive(Default)] - struct DummyObserver { - events: Mutex, - metrics: Mutex, - } - - impl Observer for DummyObserver { - fn record_event(&self, _event: &ObserverEvent) { - let mut guard = self.events.lock(); - *guard += 1; - } - - fn record_metric(&self, _metric: &ObserverMetric) { - let mut guard = self.metrics.lock(); - *guard += 1; - } - - fn name(&self) -> &str { - "dummy-observer" - } - } - - #[test] - fn observer_records_events_and_metrics() { - let observer = DummyObserver::default(); - - observer.record_event(&ObserverEvent::HeartbeatTick); - observer.record_event(&ObserverEvent::Error { - component: "test".into(), - message: "boom".into(), - }); - observer.record_metric(&ObserverMetric::TokensUsed(42)); - - assert_eq!(*observer.events.lock(), 2); - assert_eq!(*observer.metrics.lock(), 1); - } - - #[test] - fn observer_default_flush_and_as_any_work() { - let observer = DummyObserver::default(); - - observer.flush(); - assert_eq!(observer.name(), "dummy-observer"); - assert!(observer.as_any().downcast_ref::().is_some()); - } - - #[test] - fn observer_event_and_metric_are_cloneable() { - let event = ObserverEvent::ToolCall { - tool: "shell".into(), - duration: Duration::from_millis(10), - success: true, - }; - let metric = ObserverMetric::RequestLatency(Duration::from_millis(8)); - - let cloned_event = event.clone(); - let cloned_metric = metric.clone(); - - assert!(matches!(cloned_event, ObserverEvent::ToolCall { .. })); - assert!(matches!(cloned_metric, ObserverMetric::RequestLatency(_))); - } + /// Downcast support for backend-specific operations (e.g. Prometheus encoding) + fn as_any(&self) -> &dyn std::any::Any; } diff --git a/src/providers/compatible.rs b/src/providers/compatible.rs index 047c335..da12aec 100644 --- a/src/providers/compatible.rs +++ b/src/providers/compatible.rs @@ -1129,4 +1129,40 @@ mod tests { "https://opencode.ai/zen/v1/chat/completions" ); } + + // ══════════════════════════════════════════════════════════ + // Issue #580: Custom provider URL construction tests + // ══════════════════════════════════════════════════════════ + + #[test] + fn chat_completions_url_custom_provider_opencode_issue_580() { + // Issue #580: Custom provider should correctly append /chat/completions + // The error log format "{provider_name}/{current_model}" was confusing + // but the actual URL construction was always correct. + let p = make_provider("custom", "https://opencode.ai/zen/v1", None); + assert_eq!( + p.chat_completions_url(), + "https://opencode.ai/zen/v1/chat/completions" + ); + } + + #[test] + fn chat_completions_url_custom_provider_standard() { + // Standard custom provider without /v1 path + let p = make_provider("custom", "https://my-api.example.com", None); + assert_eq!( + p.chat_completions_url(), + "https://my-api.example.com/chat/completions" + ); + } + + #[test] + fn chat_completions_url_custom_provider_with_v1() { + // Custom provider with /v1 path + let p = make_provider("custom", "https://my-api.example.com/v1", None); + assert_eq!( + p.chat_completions_url(), + "https://my-api.example.com/v1/chat/completions" + ); + } } diff --git a/src/providers/reliable.rs b/src/providers/reliable.rs index 29f1903..f686e17 100644 --- a/src/providers/reliable.rs +++ b/src/providers/reliable.rs @@ -193,8 +193,18 @@ impl Provider for ReliableProvider { } else { "retryable" }; + // For custom providers, strip the URL from the provider name + // to avoid confusion. The format "custom:https://..." in error + // logs makes it look like the model is being appended to the URL. + let display_provider = if provider_name.starts_with("custom:") { + "custom" + } else if provider_name.starts_with("anthropic-custom:") { + "anthropic-custom" + } else { + provider_name + }; failures.push(format!( - "provider={provider_name} model={current_model} attempt {}/{}: {failure_reason}", + "{display_provider}/{current_model} attempt {}/{}: {failure_reason}", attempt + 1, self.max_retries + 1 )); @@ -298,8 +308,18 @@ impl Provider for ReliableProvider { } else { "retryable" }; + // For custom providers, strip the URL from the provider name + // to avoid confusion. The format "custom:https://..." in error + // logs makes it look like the model is being appended to the URL. + let display_provider = if provider_name.starts_with("custom:") { + "custom" + } else if provider_name.starts_with("anthropic-custom:") { + "anthropic-custom" + } else { + provider_name + }; failures.push(format!( - "provider={provider_name} model={current_model} attempt {}/{}: {failure_reason}", + "{display_provider}/{current_model} attempt {}/{}: {failure_reason}", attempt + 1, self.max_retries + 1 ));