feat(observability): implement Prometheus metrics backend with /metrics endpoint
- Adds PrometheusObserver backend with counters, histograms, and gauges
- Tracks agent starts/duration, tool calls, channel messages, heartbeat ticks, errors, request latency, tokens, sessions, queue depth
- Adds GET /metrics endpoint to gateway for Prometheus scraping
- Adds provider/model labels to AgentStart and AgentEnd events for better observability
- Adds as_any() method to Observer trait for backend-specific downcast

Metrics exposed:
- zeroclaw_agent_starts_total (Counter) with provider/model labels
- zeroclaw_agent_duration_seconds (Histogram) with provider/model labels
- zeroclaw_tool_calls_total (Counter) with tool/success labels
- zeroclaw_tool_duration_seconds (Histogram) with tool label
- zeroclaw_channel_messages_total (Counter) with channel/direction labels
- zeroclaw_heartbeat_ticks_total (Counter)
- zeroclaw_errors_total (Counter) with component label
- zeroclaw_request_latency_seconds (Histogram)
- zeroclaw_tokens_used_last (Gauge)
- zeroclaw_active_sessions (Gauge)
- zeroclaw_queue_depth (Gauge)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
c04f2855e4
commit
eba544dbd4
11 changed files with 575 additions and 228 deletions
387
src/observability/prometheus.rs
Normal file
387
src/observability/prometheus.rs
Normal file
|
|
@ -0,0 +1,387 @@
|
|||
use super::traits::{Observer, ObserverEvent, ObserverMetric};
|
||||
use prometheus::{
|
||||
Encoder, GaugeVec, Histogram, HistogramOpts, HistogramVec, IntCounterVec, Registry, TextEncoder,
|
||||
};
|
||||
|
||||
/// Prometheus-backed observer — exposes metrics for scraping via `/metrics`.
|
||||
pub struct PrometheusObserver {
|
||||
registry: Registry,
|
||||
|
||||
// Counters
|
||||
agent_starts: IntCounterVec,
|
||||
tool_calls: IntCounterVec,
|
||||
channel_messages: IntCounterVec,
|
||||
heartbeat_ticks: prometheus::IntCounter,
|
||||
errors: IntCounterVec,
|
||||
|
||||
// Histograms
|
||||
agent_duration: HistogramVec,
|
||||
tool_duration: HistogramVec,
|
||||
request_latency: Histogram,
|
||||
|
||||
// Gauges
|
||||
tokens_used: prometheus::IntGauge,
|
||||
active_sessions: GaugeVec,
|
||||
queue_depth: GaugeVec,
|
||||
}
|
||||
|
||||
impl PrometheusObserver {
|
||||
pub fn new() -> Self {
|
||||
let registry = Registry::new();
|
||||
|
||||
let agent_starts = IntCounterVec::new(
|
||||
prometheus::Opts::new("zeroclaw_agent_starts_total", "Total agent invocations"),
|
||||
&["provider", "model"],
|
||||
)
|
||||
.expect("valid metric");
|
||||
|
||||
let tool_calls = IntCounterVec::new(
|
||||
prometheus::Opts::new("zeroclaw_tool_calls_total", "Total tool calls"),
|
||||
&["tool", "success"],
|
||||
)
|
||||
.expect("valid metric");
|
||||
|
||||
let channel_messages = IntCounterVec::new(
|
||||
prometheus::Opts::new("zeroclaw_channel_messages_total", "Total channel messages"),
|
||||
&["channel", "direction"],
|
||||
)
|
||||
.expect("valid metric");
|
||||
|
||||
let heartbeat_ticks = prometheus::IntCounter::new(
|
||||
"zeroclaw_heartbeat_ticks_total",
|
||||
"Total heartbeat ticks",
|
||||
)
|
||||
.expect("valid metric");
|
||||
|
||||
let errors = IntCounterVec::new(
|
||||
prometheus::Opts::new("zeroclaw_errors_total", "Total errors by component"),
|
||||
&["component"],
|
||||
)
|
||||
.expect("valid metric");
|
||||
|
||||
let agent_duration = HistogramVec::new(
|
||||
HistogramOpts::new(
|
||||
"zeroclaw_agent_duration_seconds",
|
||||
"Agent invocation duration in seconds",
|
||||
)
|
||||
.buckets(vec![0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]),
|
||||
&["provider", "model"],
|
||||
)
|
||||
.expect("valid metric");
|
||||
|
||||
let tool_duration = HistogramVec::new(
|
||||
HistogramOpts::new(
|
||||
"zeroclaw_tool_duration_seconds",
|
||||
"Tool execution duration in seconds",
|
||||
)
|
||||
.buckets(vec![0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0]),
|
||||
&["tool"],
|
||||
)
|
||||
.expect("valid metric");
|
||||
|
||||
let request_latency = Histogram::with_opts(
|
||||
HistogramOpts::new(
|
||||
"zeroclaw_request_latency_seconds",
|
||||
"Request latency in seconds",
|
||||
)
|
||||
.buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]),
|
||||
)
|
||||
.expect("valid metric");
|
||||
|
||||
let tokens_used = prometheus::IntGauge::new(
|
||||
"zeroclaw_tokens_used_last",
|
||||
"Tokens used in the last request",
|
||||
)
|
||||
.expect("valid metric");
|
||||
|
||||
let active_sessions = GaugeVec::new(
|
||||
prometheus::Opts::new("zeroclaw_active_sessions", "Number of active sessions"),
|
||||
&[],
|
||||
)
|
||||
.expect("valid metric");
|
||||
|
||||
let queue_depth = GaugeVec::new(
|
||||
prometheus::Opts::new("zeroclaw_queue_depth", "Message queue depth"),
|
||||
&[],
|
||||
)
|
||||
.expect("valid metric");
|
||||
|
||||
// Register all metrics
|
||||
registry.register(Box::new(agent_starts.clone())).ok();
|
||||
registry.register(Box::new(tool_calls.clone())).ok();
|
||||
registry.register(Box::new(channel_messages.clone())).ok();
|
||||
registry.register(Box::new(heartbeat_ticks.clone())).ok();
|
||||
registry.register(Box::new(errors.clone())).ok();
|
||||
registry.register(Box::new(agent_duration.clone())).ok();
|
||||
registry.register(Box::new(tool_duration.clone())).ok();
|
||||
registry.register(Box::new(request_latency.clone())).ok();
|
||||
registry.register(Box::new(tokens_used.clone())).ok();
|
||||
registry.register(Box::new(active_sessions.clone())).ok();
|
||||
registry.register(Box::new(queue_depth.clone())).ok();
|
||||
|
||||
Self {
|
||||
registry,
|
||||
agent_starts,
|
||||
tool_calls,
|
||||
channel_messages,
|
||||
heartbeat_ticks,
|
||||
errors,
|
||||
agent_duration,
|
||||
tool_duration,
|
||||
request_latency,
|
||||
tokens_used,
|
||||
active_sessions,
|
||||
queue_depth,
|
||||
}
|
||||
}
|
||||
|
||||
/// Encode all registered metrics into Prometheus text exposition format.
|
||||
pub fn encode(&self) -> String {
|
||||
let encoder = TextEncoder::new();
|
||||
let families = self.registry.gather();
|
||||
let mut buf = Vec::new();
|
||||
encoder.encode(&families, &mut buf).unwrap_or_default();
|
||||
String::from_utf8(buf).unwrap_or_default()
|
||||
}
|
||||
}
|
||||
|
||||
impl Observer for PrometheusObserver {
|
||||
fn record_event(&self, event: &ObserverEvent) {
|
||||
match event {
|
||||
ObserverEvent::AgentStart { provider, model } => {
|
||||
self.agent_starts
|
||||
.with_label_values(&[provider, model])
|
||||
.inc();
|
||||
}
|
||||
ObserverEvent::AgentEnd {
|
||||
provider,
|
||||
model,
|
||||
duration,
|
||||
tokens_used,
|
||||
} => {
|
||||
// Agent duration is recorded via the histogram with provider/model labels
|
||||
self.agent_duration
|
||||
.with_label_values(&[provider, model])
|
||||
.observe(duration.as_secs_f64());
|
||||
if let Some(t) = tokens_used {
|
||||
self.tokens_used.set(i64::try_from(*t).unwrap_or(i64::MAX));
|
||||
}
|
||||
}
|
||||
ObserverEvent::ToolCallStart { tool } => {
|
||||
self.tool_calls
|
||||
.with_label_values(&[&tool.to_string(), &"start".to_string()])
|
||||
.inc();
|
||||
}
|
||||
ObserverEvent::ToolCall {
|
||||
tool,
|
||||
duration,
|
||||
success,
|
||||
} => {
|
||||
let success_str = if *success { "true" } else { "false" };
|
||||
self.tool_calls
|
||||
.with_label_values(&[&tool.to_string(), &success_str.to_string()])
|
||||
.inc();
|
||||
self.tool_duration
|
||||
.with_label_values(&[&tool.to_string()])
|
||||
.observe(duration.as_secs_f64());
|
||||
}
|
||||
ObserverEvent::TurnComplete => {
|
||||
// No metric for turn complete currently
|
||||
}
|
||||
ObserverEvent::ChannelMessage { channel, direction } => {
|
||||
self.channel_messages
|
||||
.with_label_values(&[channel, direction])
|
||||
.inc();
|
||||
}
|
||||
ObserverEvent::HeartbeatTick => {
|
||||
self.heartbeat_ticks.inc();
|
||||
}
|
||||
ObserverEvent::Error {
|
||||
component,
|
||||
message: _,
|
||||
} => {
|
||||
self.errors.with_label_values(&[component]).inc();
|
||||
}
|
||||
ObserverEvent::LlmRequest { .. } => {}
|
||||
ObserverEvent::LlmResponse { .. } => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn record_metric(&self, metric: &ObserverMetric) {
|
||||
match metric {
|
||||
ObserverMetric::RequestLatency(d) => {
|
||||
self.request_latency.observe(d.as_secs_f64());
|
||||
}
|
||||
ObserverMetric::TokensUsed(t) => {
|
||||
self.tokens_used.set(i64::try_from(*t).unwrap_or(i64::MAX));
|
||||
}
|
||||
ObserverMetric::ActiveSessions(s) => {
|
||||
self.active_sessions
|
||||
.with_label_values(&[] as &[&str])
|
||||
.set(*s as f64);
|
||||
}
|
||||
ObserverMetric::QueueDepth(d) => {
|
||||
self.queue_depth.with_label_values(&[] as &[&str]).set(*d as f64);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn name(&self) -> &str {
|
||||
"prometheus"
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// `AgentStart` event for the provider/model pair used across these tests.
    fn agent_start() -> ObserverEvent {
        ObserverEvent::AgentStart {
            provider: "openrouter".into(),
            model: "claude-sonnet".into(),
        }
    }

    /// Completed `ToolCall` event for `tool` that took `millis` milliseconds.
    fn tool_call(tool: &'static str, millis: u64, success: bool) -> ObserverEvent {
        ObserverEvent::ToolCall {
            tool: tool.into(),
            duration: Duration::from_millis(millis),
            success,
        }
    }

    /// `Error` event raised by `component` with the given `message`.
    fn error_event(component: &'static str, message: &'static str) -> ObserverEvent {
        ObserverEvent::Error {
            component: component.into(),
            message: message.into(),
        }
    }

    #[test]
    fn prometheus_observer_name() {
        let observer = PrometheusObserver::new();
        assert_eq!(observer.name(), "prometheus");
    }

    #[test]
    fn records_all_events_without_panic() {
        let observer = PrometheusObserver::new();
        let events = [
            agent_start(),
            ObserverEvent::AgentEnd {
                provider: "openrouter".into(),
                model: "claude-sonnet".into(),
                duration: Duration::from_millis(500),
                tokens_used: Some(100),
            },
            ObserverEvent::AgentEnd {
                provider: "openrouter".into(),
                model: "claude-sonnet".into(),
                duration: Duration::ZERO,
                tokens_used: None,
            },
            tool_call("shell", 10, true),
            tool_call("file_read", 5, false),
            ObserverEvent::ChannelMessage {
                channel: "telegram".into(),
                direction: "inbound".into(),
            },
            ObserverEvent::HeartbeatTick,
            error_event("provider", "timeout"),
        ];

        for event in &events {
            observer.record_event(event);
        }
    }

    #[test]
    fn records_all_metrics_without_panic() {
        let observer = PrometheusObserver::new();
        let samples = [
            ObserverMetric::RequestLatency(Duration::from_secs(2)),
            ObserverMetric::TokensUsed(500),
            ObserverMetric::TokensUsed(0),
            ObserverMetric::ActiveSessions(3),
            ObserverMetric::QueueDepth(42),
        ];

        for sample in &samples {
            observer.record_metric(sample);
        }
    }

    #[test]
    fn encode_produces_prometheus_text_format() {
        let observer = PrometheusObserver::new();
        observer.record_event(&agent_start());
        observer.record_event(&tool_call("shell", 100, true));
        observer.record_event(&ObserverEvent::HeartbeatTick);
        observer.record_metric(&ObserverMetric::RequestLatency(Duration::from_millis(250)));

        let rendered = observer.encode();
        for family in [
            "zeroclaw_agent_starts_total",
            "zeroclaw_tool_calls_total",
            "zeroclaw_heartbeat_ticks_total",
            "zeroclaw_request_latency_seconds",
        ] {
            assert!(rendered.contains(family), "missing metric family {}", family);
        }
    }

    #[test]
    fn counters_increment_correctly() {
        let observer = PrometheusObserver::new();

        (0..3).for_each(|_| observer.record_event(&ObserverEvent::HeartbeatTick));

        let rendered = observer.encode();
        assert!(rendered.contains("zeroclaw_heartbeat_ticks_total 3"));
    }

    #[test]
    fn tool_calls_track_success_and_failure_separately() {
        let observer = PrometheusObserver::new();

        // Two successful calls and one failed call for the same tool.
        for succeeded in [true, true, false] {
            observer.record_event(&tool_call("shell", 10, succeeded));
        }

        let rendered = observer.encode();
        assert!(rendered.contains(r#"zeroclaw_tool_calls_total{success="true",tool="shell"} 2"#));
        assert!(rendered.contains(r#"zeroclaw_tool_calls_total{success="false",tool="shell"} 1"#));
    }

    #[test]
    fn errors_track_by_component() {
        let observer = PrometheusObserver::new();

        for (component, message) in [
            ("provider", "timeout"),
            ("provider", "rate limit"),
            ("channels", "disconnected"),
        ] {
            observer.record_event(&error_event(component, message));
        }

        let rendered = observer.encode();
        assert!(rendered.contains(r#"zeroclaw_errors_total{component="provider"} 2"#));
        assert!(rendered.contains(r#"zeroclaw_errors_total{component="channels"} 1"#));
    }

    #[test]
    fn gauge_reflects_latest_value() {
        let observer = PrometheusObserver::new();

        // The second sample must overwrite the first.
        observer.record_metric(&ObserverMetric::TokensUsed(100));
        observer.record_metric(&ObserverMetric::TokensUsed(200));

        let rendered = observer.encode();
        assert!(rendered.contains("zeroclaw_tokens_used_last 200"));
    }
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue