feat(observability): focus PR 596 on Prometheus backend

This commit is contained in:
Chummy 2026-02-18 11:32:51 +08:00
parent eba544dbd4
commit 2560399423
12 changed files with 358 additions and 103 deletions

View file

@ -544,8 +544,8 @@ pub async fn run(
.to_string();
agent.observer.record_event(&ObserverEvent::AgentStart {
provider: provider_name,
model: model_name,
provider: provider_name.clone(),
model: model_name.clone(),
});
if let Some(msg) = message {
@ -556,10 +556,11 @@ pub async fn run(
}
agent.observer.record_event(&ObserverEvent::AgentEnd {
provider: "cli".to_string(),
model: "unknown".to_string(),
provider: provider_name,
model: model_name,
duration: start.elapsed(),
tokens_used: None,
cost_usd: None,
});
Ok(())

View file

@ -1336,6 +1336,7 @@ pub async fn run(
model: model_name.to_string(),
duration,
tokens_used: None,
cost_usd: None,
});
Ok(final_output)

View file

@ -703,20 +703,94 @@ async fn handle_webhook(
.await;
}
let provider_label = state
.config
.lock()
.default_provider
.clone()
.unwrap_or_else(|| "unknown".to_string());
let model_label = state.model.clone();
let started_at = Instant::now();
state
.observer
.record_event(&crate::observability::ObserverEvent::AgentStart {
provider: provider_label.clone(),
model: model_label.clone(),
});
state
.observer
.record_event(&crate::observability::ObserverEvent::LlmRequest {
provider: provider_label.clone(),
model: model_label.clone(),
messages_count: 1,
});
match state
.provider
.simple_chat(message, &state.model, state.temperature)
.await
{
Ok(response) => {
let duration = started_at.elapsed();
state
.observer
.record_event(&crate::observability::ObserverEvent::LlmResponse {
provider: provider_label.clone(),
model: model_label.clone(),
duration,
success: true,
error_message: None,
});
state.observer.record_metric(
&crate::observability::traits::ObserverMetric::RequestLatency(duration),
);
state
.observer
.record_event(&crate::observability::ObserverEvent::AgentEnd {
provider: provider_label,
model: model_label,
duration,
tokens_used: None,
cost_usd: None,
});
let body = serde_json::json!({"response": response, "model": state.model});
(StatusCode::OK, Json(body))
}
Err(e) => {
tracing::error!(
"Webhook provider error: {}",
providers::sanitize_api_error(&e.to_string())
let duration = started_at.elapsed();
let sanitized = providers::sanitize_api_error(&e.to_string());
state
.observer
.record_event(&crate::observability::ObserverEvent::LlmResponse {
provider: provider_label.clone(),
model: model_label.clone(),
duration,
success: false,
error_message: Some(sanitized.clone()),
});
state.observer.record_metric(
&crate::observability::traits::ObserverMetric::RequestLatency(duration),
);
state
.observer
.record_event(&crate::observability::ObserverEvent::Error {
component: "gateway".to_string(),
message: sanitized.clone(),
});
state
.observer
.record_event(&crate::observability::ObserverEvent::AgentEnd {
provider: provider_label,
model: model_label,
duration,
tokens_used: None,
cost_usd: None,
});
tracing::error!("Webhook provider error: {}", sanitized);
let err = serde_json::json!({"error": "LLM request failed"});
(StatusCode::INTERNAL_SERVER_ERROR, Json(err))
}
@ -938,6 +1012,72 @@ mod tests {
assert_clone::<AppState>();
}
#[tokio::test]
async fn metrics_endpoint_returns_hint_when_prometheus_is_disabled() {
let state = AppState {
config: Arc::new(Mutex::new(Config::default())),
provider: Arc::new(MockProvider::default()),
model: "test-model".into(),
temperature: 0.0,
mem: Arc::new(MockMemory),
auto_save: false,
webhook_secret_hash: None,
pairing: Arc::new(PairingGuard::new(false, &[])),
rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100)),
idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300))),
whatsapp: None,
whatsapp_app_secret: None,
observer: Arc::new(crate::observability::NoopObserver),
};
let response = handle_metrics(State(state)).await.into_response();
assert_eq!(response.status(), StatusCode::OK);
assert_eq!(
response
.headers()
.get(header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok()),
Some(PROMETHEUS_CONTENT_TYPE)
);
let body = response.into_body().collect().await.unwrap().to_bytes();
let text = String::from_utf8(body.to_vec()).unwrap();
assert!(text.contains("Prometheus backend not enabled"));
}
#[tokio::test]
async fn metrics_endpoint_renders_prometheus_output() {
let prom = Arc::new(crate::observability::PrometheusObserver::new());
crate::observability::Observer::record_event(
prom.as_ref(),
&crate::observability::ObserverEvent::HeartbeatTick,
);
let observer: Arc<dyn crate::observability::Observer> = prom;
let state = AppState {
config: Arc::new(Mutex::new(Config::default())),
provider: Arc::new(MockProvider::default()),
model: "test-model".into(),
temperature: 0.0,
mem: Arc::new(MockMemory),
auto_save: false,
webhook_secret_hash: None,
pairing: Arc::new(PairingGuard::new(false, &[])),
rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100)),
idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300))),
whatsapp: None,
whatsapp_app_secret: None,
observer,
};
let response = handle_metrics(State(state)).await.into_response();
assert_eq!(response.status(), StatusCode::OK);
let body = response.into_body().collect().await.unwrap().to_bytes();
let text = String::from_utf8(body.to_vec()).unwrap();
assert!(text.contains("zeroclaw_heartbeat_ticks_total 1"));
}
#[test]
fn gateway_rate_limiter_blocks_after_limit() {
let limiter = GatewayRateLimiter::new(2, 2, 100);

View file

@ -22,9 +22,10 @@ impl Observer for LogObserver {
model,
duration,
tokens_used,
cost_usd,
} => {
let ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX);
info!(provider = %provider, model = %model, duration_ms = ms, tokens = ?tokens_used, "agent.end");
info!(provider = %provider, model = %model, duration_ms = ms, tokens = ?tokens_used, cost_usd = ?cost_usd, "agent.end");
}
ObserverEvent::ToolCallStart { tool } => {
info!(tool = %tool, "tool.start");
@ -130,12 +131,14 @@ mod tests {
model: "claude-sonnet".into(),
duration: Duration::from_millis(500),
tokens_used: Some(100),
cost_usd: Some(0.0015),
});
obs.record_event(&ObserverEvent::AgentEnd {
provider: "openrouter".into(),
model: "claude-sonnet".into(),
duration: Duration::ZERO,
tokens_used: None,
cost_usd: None,
});
obs.record_event(&ObserverEvent::ToolCall {
tool: "shell".into(),

View file

@ -1,13 +1,21 @@
pub mod log;
pub mod multi;
pub mod noop;
pub mod otel;
pub mod prometheus;
pub mod traits;
pub mod verbose;
#[allow(unused_imports)]
pub use self::log::LogObserver;
#[allow(unused_imports)]
pub use self::multi::MultiObserver;
pub use noop::NoopObserver;
pub use otel::OtelObserver;
pub use prometheus::PrometheusObserver;
pub use traits::{Observer, ObserverEvent};
#[allow(unused_imports)]
pub use verbose::VerboseObserver;
use crate::config::ObservabilityConfig;
@ -16,6 +24,27 @@ pub fn create_observer(config: &ObservabilityConfig) -> Box<dyn Observer> {
match config.backend.as_str() {
"log" => Box::new(LogObserver::new()),
"prometheus" => Box::new(PrometheusObserver::new()),
"otel" | "opentelemetry" | "otlp" => {
match OtelObserver::new(
config.otel_endpoint.as_deref(),
config.otel_service_name.as_deref(),
) {
Ok(obs) => {
tracing::info!(
endpoint = config
.otel_endpoint
.as_deref()
.unwrap_or("http://localhost:4318"),
"OpenTelemetry observer initialized"
);
Box::new(obs)
}
Err(e) => {
tracing::error!("Failed to create OTel observer: {e}. Falling back to noop.");
Box::new(NoopObserver)
}
}
}
"none" | "noop" => Box::new(NoopObserver),
_ => {
tracing::warn!(
@ -35,7 +64,7 @@ mod tests {
fn factory_none_returns_noop() {
let cfg = ObservabilityConfig {
backend: "none".into(),
..Default::default()
..ObservabilityConfig::default()
};
assert_eq!(create_observer(&cfg).name(), "noop");
}
@ -44,7 +73,7 @@ mod tests {
fn factory_noop_returns_noop() {
let cfg = ObservabilityConfig {
backend: "noop".into(),
..Default::default()
..ObservabilityConfig::default()
};
assert_eq!(create_observer(&cfg).name(), "noop");
}
@ -53,7 +82,7 @@ mod tests {
fn factory_log_returns_log() {
let cfg = ObservabilityConfig {
backend: "log".into(),
..Default::default()
..ObservabilityConfig::default()
};
assert_eq!(create_observer(&cfg).name(), "log");
}
@ -62,16 +91,46 @@ mod tests {
fn factory_prometheus_returns_prometheus() {
let cfg = ObservabilityConfig {
backend: "prometheus".into(),
..Default::default()
..ObservabilityConfig::default()
};
assert_eq!(create_observer(&cfg).name(), "prometheus");
}
#[test]
fn factory_otel_returns_otel() {
let cfg = ObservabilityConfig {
backend: "otel".into(),
otel_endpoint: Some("http://127.0.0.1:19999".into()),
otel_service_name: Some("test".into()),
};
assert_eq!(create_observer(&cfg).name(), "otel");
}
#[test]
fn factory_opentelemetry_alias() {
let cfg = ObservabilityConfig {
backend: "opentelemetry".into(),
otel_endpoint: Some("http://127.0.0.1:19999".into()),
otel_service_name: Some("test".into()),
};
assert_eq!(create_observer(&cfg).name(), "otel");
}
#[test]
fn factory_otlp_alias() {
let cfg = ObservabilityConfig {
backend: "otlp".into(),
otel_endpoint: Some("http://127.0.0.1:19999".into()),
otel_service_name: Some("test".into()),
};
assert_eq!(create_observer(&cfg).name(), "otel");
}
#[test]
fn factory_unknown_falls_back_to_noop() {
let cfg = ObservabilityConfig {
backend: "xyzzy_unknown".into(),
..Default::default()
..ObservabilityConfig::default()
};
assert_eq!(create_observer(&cfg).name(), "noop");
}
@ -80,7 +139,7 @@ mod tests {
fn factory_empty_string_falls_back_to_noop() {
let cfg = ObservabilityConfig {
backend: String::new(),
..Default::default()
..ObservabilityConfig::default()
};
assert_eq!(create_observer(&cfg).name(), "noop");
}
@ -89,7 +148,7 @@ mod tests {
fn factory_garbage_falls_back_to_noop() {
let cfg = ObservabilityConfig {
backend: "xyzzy_garbage_123".into(),
..Default::default()
..ObservabilityConfig::default()
};
assert_eq!(create_observer(&cfg).name(), "noop");
}

View file

@ -43,12 +43,14 @@ mod tests {
model: "test".into(),
duration: Duration::from_millis(100),
tokens_used: Some(42),
cost_usd: Some(0.001),
});
obs.record_event(&ObserverEvent::AgentEnd {
provider: "test".into(),
model: "test".into(),
duration: Duration::ZERO,
tokens_used: None,
cost_usd: None,
});
obs.record_event(&ObserverEvent::ToolCall {
tool: "shell".into(),

View file

@ -5,6 +5,7 @@ use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use opentelemetry_sdk::trace::SdkTracerProvider;
use std::any::Any;
use std::time::SystemTime;
/// OpenTelemetry-backed observer — exports traces and metrics via OTLP.
@ -225,6 +226,8 @@ impl Observer for OtelObserver {
span.end();
}
ObserverEvent::AgentEnd {
provider,
model,
duration,
tokens_used,
cost_usd,
@ -239,7 +242,11 @@ impl Observer for OtelObserver {
opentelemetry::trace::SpanBuilder::from_name("agent.invocation")
.with_kind(SpanKind::Internal)
.with_start_time(start_time)
.with_attributes(vec![KeyValue::new("duration_s", secs)]),
.with_attributes(vec![
KeyValue::new("provider", provider.clone()),
KeyValue::new("model", model.clone()),
KeyValue::new("duration_s", secs),
]),
);
if let Some(t) = tokens_used {
span.set_attribute(KeyValue::new("tokens_used", *t as i64));
@ -249,7 +256,13 @@ impl Observer for OtelObserver {
}
span.end();
self.agent_duration.record(secs, &[]);
self.agent_duration.record(
secs,
&[
KeyValue::new("provider", provider.clone()),
KeyValue::new("model", model.clone()),
],
);
// Note: tokens are recorded via record_metric(TokensUsed) to avoid
// double-counting. AgentEnd only records duration.
}
@ -350,6 +363,10 @@ impl Observer for OtelObserver {
fn name(&self) -> &str {
"otel"
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]
@ -396,11 +413,15 @@ mod tests {
error_message: None,
});
obs.record_event(&ObserverEvent::AgentEnd {
provider: "openrouter".into(),
model: "claude-sonnet".into(),
duration: Duration::from_millis(500),
tokens_used: Some(100),
cost_usd: Some(0.0015),
});
obs.record_event(&ObserverEvent::AgentEnd {
provider: "openrouter".into(),
model: "claude-sonnet".into(),
duration: Duration::ZERO,
tokens_used: None,
cost_usd: None,

View file

@ -47,10 +47,8 @@ impl PrometheusObserver {
)
.expect("valid metric");
let heartbeat_ticks = prometheus::IntCounter::new(
"zeroclaw_heartbeat_ticks_total",
"Total heartbeat ticks",
)
let heartbeat_ticks =
prometheus::IntCounter::new("zeroclaw_heartbeat_ticks_total", "Total heartbeat ticks")
.expect("valid metric");
let errors = IntCounterVec::new(
@ -158,6 +156,7 @@ impl Observer for PrometheusObserver {
model,
duration,
tokens_used,
cost_usd: _,
} => {
// Agent duration is recorded via the histogram with provider/model labels
self.agent_duration
@ -167,11 +166,7 @@ impl Observer for PrometheusObserver {
self.tokens_used.set(i64::try_from(*t).unwrap_or(i64::MAX));
}
}
ObserverEvent::ToolCallStart { tool } => {
self.tool_calls
.with_label_values(&[&tool.to_string(), &"start".to_string()])
.inc();
}
ObserverEvent::ToolCallStart { tool: _ } => {}
ObserverEvent::ToolCall {
tool,
duration,
@ -179,10 +174,10 @@ impl Observer for PrometheusObserver {
} => {
let success_str = if *success { "true" } else { "false" };
self.tool_calls
.with_label_values(&[&tool.to_string(), &success_str.to_string()])
.with_label_values(&[tool.as_str(), success_str])
.inc();
self.tool_duration
.with_label_values(&[&tool.to_string()])
.with_label_values(&[tool.as_str()])
.observe(duration.as_secs_f64());
}
ObserverEvent::TurnComplete => {
@ -221,7 +216,9 @@ impl Observer for PrometheusObserver {
.set(*s as f64);
}
ObserverMetric::QueueDepth(d) => {
self.queue_depth.with_label_values(&[] as &[&str]).set(*d as f64);
self.queue_depth
.with_label_values(&[] as &[&str])
.set(*d as f64);
}
}
}
@ -257,12 +254,14 @@ mod tests {
model: "claude-sonnet".into(),
duration: Duration::from_millis(500),
tokens_used: Some(100),
cost_usd: None,
});
obs.record_event(&ObserverEvent::AgentEnd {
provider: "openrouter".into(),
model: "claude-sonnet".into(),
duration: Duration::ZERO,
tokens_used: None,
cost_usd: None,
});
obs.record_event(&ObserverEvent::ToolCall {
tool: "shell".into(),

View file

@ -7,12 +7,31 @@ pub enum ObserverEvent {
provider: String,
model: String,
},
/// A request is about to be sent to an LLM provider.
///
/// This is emitted immediately before a provider call so observers can print
/// user-facing progress without leaking prompt contents.
LlmRequest {
provider: String,
model: String,
messages_count: usize,
},
/// Result of a single LLM provider call.
LlmResponse {
provider: String,
model: String,
duration: Duration,
success: bool,
error_message: Option<String>,
},
AgentEnd {
provider: String,
model: String,
duration: Duration,
tokens_used: Option<u64>,
cost_usd: Option<f64>,
},
/// A tool call is about to be executed.
ToolCallStart {
tool: String,
},
@ -21,6 +40,7 @@ pub enum ObserverEvent {
duration: Duration,
success: bool,
},
/// The agent produced a final answer for the current user message.
TurnComplete,
ChannelMessage {
channel: String,
@ -31,19 +51,6 @@ pub enum ObserverEvent {
component: String,
message: String,
},
// LLM request/response tracking
LlmRequest {
provider: String,
model: String,
messages_count: usize,
},
LlmResponse {
provider: String,
model: String,
duration: Duration,
success: bool,
error_message: Option<String>,
},
}
/// Numeric metrics
@ -56,7 +63,7 @@ pub enum ObserverMetric {
}
/// Core observability trait — implement for any backend
pub trait Observer: Send + Sync {
pub trait Observer: Send + Sync + 'static {
/// Record a discrete event
fn record_event(&self, event: &ObserverEvent);
@ -69,6 +76,79 @@ pub trait Observer: Send + Sync {
/// Human-readable name of this observer
fn name(&self) -> &str;
/// Downcast support for backend-specific operations (e.g. Prometheus encoding)
/// Downcast to `Any` for backend-specific operations
fn as_any(&self) -> &dyn std::any::Any;
}
#[cfg(test)]
mod tests {
use super::*;
use parking_lot::Mutex;
use std::time::Duration;
#[derive(Default)]
struct DummyObserver {
events: Mutex<u64>,
metrics: Mutex<u64>,
}
impl Observer for DummyObserver {
fn record_event(&self, _event: &ObserverEvent) {
let mut guard = self.events.lock();
*guard += 1;
}
fn record_metric(&self, _metric: &ObserverMetric) {
let mut guard = self.metrics.lock();
*guard += 1;
}
fn name(&self) -> &str {
"dummy-observer"
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
#[test]
fn observer_records_events_and_metrics() {
let observer = DummyObserver::default();
observer.record_event(&ObserverEvent::HeartbeatTick);
observer.record_event(&ObserverEvent::Error {
component: "test".into(),
message: "boom".into(),
});
observer.record_metric(&ObserverMetric::TokensUsed(42));
assert_eq!(*observer.events.lock(), 2);
assert_eq!(*observer.metrics.lock(), 1);
}
#[test]
fn observer_default_flush_and_as_any_work() {
let observer = DummyObserver::default();
observer.flush();
assert_eq!(observer.name(), "dummy-observer");
assert!(observer.as_any().downcast_ref::<DummyObserver>().is_some());
}
#[test]
fn observer_event_and_metric_are_cloneable() {
let event = ObserverEvent::ToolCall {
tool: "shell".into(),
duration: Duration::from_millis(10),
success: true,
};
let metric = ObserverMetric::RequestLatency(Duration::from_millis(8));
let cloned_event = event.clone();
let cloned_metric = metric.clone();
assert!(matches!(cloned_event, ObserverEvent::ToolCall { .. }));
assert!(matches!(cloned_metric, ObserverMetric::RequestLatency(_)));
}
}

View file

@ -1,4 +1,5 @@
use super::traits::{Observer, ObserverEvent, ObserverMetric};
use std::any::Any;
/// Human-readable progress observer for interactive CLI sessions.
///
@ -56,6 +57,10 @@ impl Observer for VerboseObserver {
fn name(&self) -> &str {
"verbose"
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]

View file

@ -1129,40 +1129,4 @@ mod tests {
"https://opencode.ai/zen/v1/chat/completions"
);
}
// ══════════════════════════════════════════════════════════
// Issue #580: Custom provider URL construction tests
// ══════════════════════════════════════════════════════════
#[test]
fn chat_completions_url_custom_provider_opencode_issue_580() {
// Issue #580: Custom provider should correctly append /chat/completions
// The error log format "{provider_name}/{current_model}" was confusing
// but the actual URL construction was always correct.
let p = make_provider("custom", "https://opencode.ai/zen/v1", None);
assert_eq!(
p.chat_completions_url(),
"https://opencode.ai/zen/v1/chat/completions"
);
}
#[test]
fn chat_completions_url_custom_provider_standard() {
// Standard custom provider without /v1 path
let p = make_provider("custom", "https://my-api.example.com", None);
assert_eq!(
p.chat_completions_url(),
"https://my-api.example.com/chat/completions"
);
}
#[test]
fn chat_completions_url_custom_provider_with_v1() {
// Custom provider with /v1 path
let p = make_provider("custom", "https://my-api.example.com/v1", None);
assert_eq!(
p.chat_completions_url(),
"https://my-api.example.com/v1/chat/completions"
);
}
}

View file

@ -193,18 +193,8 @@ impl Provider for ReliableProvider {
} else {
"retryable"
};
// For custom providers, strip the URL from the provider name
// to avoid confusion. The format "custom:https://..." in error
// logs makes it look like the model is being appended to the URL.
let display_provider = if provider_name.starts_with("custom:") {
"custom"
} else if provider_name.starts_with("anthropic-custom:") {
"anthropic-custom"
} else {
provider_name
};
failures.push(format!(
"{display_provider}/{current_model} attempt {}/{}: {failure_reason}",
"provider={provider_name} model={current_model} attempt {}/{}: {failure_reason}",
attempt + 1,
self.max_retries + 1
));
@ -308,18 +298,8 @@ impl Provider for ReliableProvider {
} else {
"retryable"
};
// For custom providers, strip the URL from the provider name
// to avoid confusion. The format "custom:https://..." in error
// logs makes it look like the model is being appended to the URL.
let display_provider = if provider_name.starts_with("custom:") {
"custom"
} else if provider_name.starts_with("anthropic-custom:") {
"anthropic-custom"
} else {
provider_name
};
failures.push(format!(
"{display_provider}/{current_model} attempt {}/{}: {failure_reason}",
"provider={provider_name} model={current_model} attempt {}/{}: {failure_reason}",
attempt + 1,
self.max_retries + 1
));