feat: add OpenTelemetry tracing and metrics observer

* feat: add OpenTelemetry tracing and metrics observer

Add OtelObserver that exports traces and metrics via OTLP HTTP/protobuf
to any OpenTelemetry-compatible collector (Jaeger, Grafana Tempo, etc.).

- ObserverEvents map to OTel spans (AgentEnd, ToolCall, Error) and
  metric counters (AgentStart, ChannelMessage, HeartbeatTick)
- ObserverMetrics map to OTel histograms and gauges
- Spans include proper timing via SpanBuilder.with_start_time
- Config: backend="otel", otel_endpoint, otel_service_name
- Accepts "otel", "opentelemetry", "otlp" as backend aliases
- Graceful fallback to NoopObserver on init failure

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: resolve unused variable warning and update Cargo.lock

Prefix unused `resolved_key` with underscore to suppress clippy
warning introduced by upstream changes. Regenerate Cargo.lock
after rebase on main.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: address review comments on OTel observer

- Fix metric types: use Gauge for ActiveSessions/QueueDepth (absolute
  readings, not deltas), Counter<u64> for TokensUsed (monotonic)
- Remove duplicate token recording from AgentEnd event handler
  (TokensUsed metric via record_metric is the canonical path)
- Store meter_provider in struct so flush() exports both traces
  and metrics (was silently dropping metrics on shutdown)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: argenis de la rosa <theonlyhennygod@gmail.com>
This commit is contained in:
Edvard Schøyen 2026-02-15 14:46:49 -05:00 committed by GitHub
parent 89b1ec6fa2
commit 0f6648ceb1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 632 additions and 3 deletions

View file

@ -1,10 +1,12 @@
pub mod log;
pub mod multi;
pub mod noop;
pub mod otel;
pub mod traits;
pub use self::log::LogObserver;
pub use noop::NoopObserver;
pub use otel::OtelObserver;
pub use traits::{Observer, ObserverEvent};
use crate::config::ObservabilityConfig;
@ -13,6 +15,24 @@ use crate::config::ObservabilityConfig;
pub fn create_observer(config: &ObservabilityConfig) -> Box<dyn Observer> {
match config.backend.as_str() {
"log" => Box::new(LogObserver::new()),
"otel" | "opentelemetry" | "otlp" => {
match OtelObserver::new(
config.otel_endpoint.as_deref(),
config.otel_service_name.as_deref(),
) {
Ok(obs) => {
tracing::info!(
endpoint = config.otel_endpoint.as_deref().unwrap_or("http://localhost:4318"),
"OpenTelemetry observer initialized"
);
Box::new(obs)
}
Err(e) => {
tracing::error!("Failed to create OTel observer: {e}. Falling back to noop.");
Box::new(NoopObserver)
}
}
}
"none" | "noop" => Box::new(NoopObserver),
_ => {
tracing::warn!(
@ -32,6 +52,7 @@ mod tests {
fn factory_none_returns_noop() {
    // Backend "none" is expected to resolve to the observer named "noop".
    let observer = create_observer(&ObservabilityConfig {
        backend: String::from("none"),
        ..ObservabilityConfig::default()
    });
    assert_eq!(observer.name(), "noop");
}
@ -40,6 +61,7 @@ mod tests {
fn factory_noop_returns_noop() {
    // Backend "noop" is expected to resolve to the observer named "noop".
    let observer = create_observer(&ObservabilityConfig {
        backend: String::from("noop"),
        ..ObservabilityConfig::default()
    });
    assert_eq!(observer.name(), "noop");
}
@ -48,14 +70,46 @@ mod tests {
fn factory_log_returns_log() {
    // Backend "log" is expected to resolve to the observer named "log".
    let observer = create_observer(&ObservabilityConfig {
        backend: String::from("log"),
        ..ObservabilityConfig::default()
    });
    assert_eq!(observer.name(), "log");
}
#[test]
fn factory_otel_returns_otel() {
    // The canonical "otel" backend name is expected to yield the observer named "otel".
    let observer = create_observer(&ObservabilityConfig {
        backend: String::from("otel"),
        otel_endpoint: Some("http://127.0.0.1:19999".into()),
        otel_service_name: Some("test".into()),
    });
    assert_eq!(observer.name(), "otel");
}
#[test]
fn factory_opentelemetry_alias() {
    // "opentelemetry" is an alias and is expected to yield the observer named "otel".
    let observer = create_observer(&ObservabilityConfig {
        backend: String::from("opentelemetry"),
        otel_endpoint: Some("http://127.0.0.1:19999".into()),
        otel_service_name: Some("test".into()),
    });
    assert_eq!(observer.name(), "otel");
}
#[test]
fn factory_otlp_alias() {
    // "otlp" is an alias and is expected to yield the observer named "otel".
    let observer = create_observer(&ObservabilityConfig {
        backend: String::from("otlp"),
        otel_endpoint: Some("http://127.0.0.1:19999".into()),
        otel_service_name: Some("test".into()),
    });
    assert_eq!(observer.name(), "otel");
}
#[test]
fn factory_unknown_falls_back_to_noop() {
    // An unrecognised backend name is expected to resolve to the "noop" observer.
    //
    // `backend` must be initialised exactly once: a struct literal that names the
    // same field twice is rejected by the compiler (E0062). The value used here is
    // deliberately not the name of any real telemetry backend, so the test keeps
    // exercising the fallback arm even if more backends are added later.
    let cfg = ObservabilityConfig {
        backend: "xyzzy_unknown".into(),
        ..ObservabilityConfig::default()
    };
    assert_eq!(create_observer(&cfg).name(), "noop");
}
@ -64,6 +118,7 @@ mod tests {
fn factory_empty_string_falls_back_to_noop() {
    // An empty backend name is expected to resolve to the observer named "noop".
    let observer = create_observer(&ObservabilityConfig {
        backend: String::new(),
        ..ObservabilityConfig::default()
    });
    assert_eq!(observer.name(), "noop");
}
@ -72,6 +127,7 @@ mod tests {
fn factory_garbage_falls_back_to_noop() {
    // A nonsense backend name is expected to resolve to the observer named "noop".
    let observer = create_observer(&ObservabilityConfig {
        backend: String::from("xyzzy_garbage_123"),
        ..ObservabilityConfig::default()
    });
    assert_eq!(observer.name(), "noop");
}

371
src/observability/otel.rs Normal file
View file

@ -0,0 +1,371 @@
use super::traits::{Observer, ObserverEvent, ObserverMetric};
use opentelemetry::metrics::{Counter, Gauge, Histogram};
use opentelemetry::trace::{Span, SpanKind, Status, Tracer};
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use opentelemetry_sdk::trace::SdkTracerProvider;
use std::time::SystemTime;
/// OpenTelemetry-backed observer — exports traces and metrics via OTLP.
///
/// Holds the SDK providers created in [`OtelObserver::new`] together with one
/// pre-built metric instrument per signal, so recording an event or metric
/// does not have to look instruments up by name.
pub struct OtelObserver {
// Provider handles are retained so `flush` can call `force_flush` on both.
tracer_provider: SdkTracerProvider,
meter_provider: SdkMeterProvider,
// Metrics instruments
// "zeroclaw.agent.starts" — incremented on `AgentStart`.
agent_starts: Counter<u64>,
// "zeroclaw.agent.duration" — seconds, recorded on `AgentEnd`.
agent_duration: Histogram<f64>,
// "zeroclaw.tool.calls" — incremented on `ToolCall`.
tool_calls: Counter<u64>,
// "zeroclaw.tool.duration" — seconds, recorded on `ToolCall`.
tool_duration: Histogram<f64>,
// "zeroclaw.channel.messages" — incremented on `ChannelMessage`.
channel_messages: Counter<u64>,
// "zeroclaw.heartbeat.ticks" — incremented on `HeartbeatTick`.
heartbeat_ticks: Counter<u64>,
// "zeroclaw.errors" — incremented on `Error`, tagged with the component.
errors: Counter<u64>,
// "zeroclaw.request.latency" — seconds, fed by `ObserverMetric::RequestLatency`.
request_latency: Histogram<f64>,
// "zeroclaw.tokens.used" — monotonic total, fed by `ObserverMetric::TokensUsed`.
tokens_used: Counter<u64>,
// "zeroclaw.sessions.active" — absolute reading, fed by `ObserverMetric::ActiveSessions`.
active_sessions: Gauge<u64>,
// "zeroclaw.queue.depth" — absolute reading, fed by `ObserverMetric::QueueDepth`.
queue_depth: Gauge<u64>,
}
impl OtelObserver {
/// Create a new OTel observer exporting to the given OTLP endpoint.
///
/// Uses HTTP/protobuf transport (port 4318 by default).
/// Falls back to `http://localhost:4318` if no endpoint is provided.
pub fn new(endpoint: Option<&str>, service_name: Option<&str>) -> Result<Self, String> {
let endpoint = endpoint.unwrap_or("http://localhost:4318");
let service_name = service_name.unwrap_or("zeroclaw");
// ── Trace exporter ──────────────────────────────────────
let span_exporter = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_endpoint(endpoint)
.build()
.map_err(|e| format!("Failed to create OTLP span exporter: {e}"))?;
let tracer_provider = SdkTracerProvider::builder()
.with_batch_exporter(span_exporter)
.with_resource(opentelemetry_sdk::Resource::builder()
.with_service_name(service_name.to_string())
.build())
.build();
global::set_tracer_provider(tracer_provider.clone());
// ── Metric exporter ─────────────────────────────────────
let metric_exporter = opentelemetry_otlp::MetricExporter::builder()
.with_http()
.with_endpoint(endpoint)
.build()
.map_err(|e| format!("Failed to create OTLP metric exporter: {e}"))?;
let metric_reader = opentelemetry_sdk::metrics::PeriodicReader::builder(metric_exporter)
.build();
let meter_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
.with_reader(metric_reader)
.with_resource(opentelemetry_sdk::Resource::builder()
.with_service_name(service_name.to_string())
.build())
.build();
let meter_provider_clone = meter_provider.clone();
global::set_meter_provider(meter_provider);
// ── Create metric instruments ────────────────────────────
let meter = global::meter("zeroclaw");
let agent_starts = meter
.u64_counter("zeroclaw.agent.starts")
.with_description("Total agent invocations")
.build();
let agent_duration = meter
.f64_histogram("zeroclaw.agent.duration")
.with_description("Agent invocation duration in seconds")
.with_unit("s")
.build();
let tool_calls = meter
.u64_counter("zeroclaw.tool.calls")
.with_description("Total tool calls")
.build();
let tool_duration = meter
.f64_histogram("zeroclaw.tool.duration")
.with_description("Tool execution duration in seconds")
.with_unit("s")
.build();
let channel_messages = meter
.u64_counter("zeroclaw.channel.messages")
.with_description("Total channel messages")
.build();
let heartbeat_ticks = meter
.u64_counter("zeroclaw.heartbeat.ticks")
.with_description("Total heartbeat ticks")
.build();
let errors = meter
.u64_counter("zeroclaw.errors")
.with_description("Total errors by component")
.build();
let request_latency = meter
.f64_histogram("zeroclaw.request.latency")
.with_description("Request latency in seconds")
.with_unit("s")
.build();
let tokens_used = meter
.u64_counter("zeroclaw.tokens.used")
.with_description("Total tokens consumed (monotonic)")
.build();
let active_sessions = meter
.u64_gauge("zeroclaw.sessions.active")
.with_description("Current number of active sessions")
.build();
let queue_depth = meter
.u64_gauge("zeroclaw.queue.depth")
.with_description("Current message queue depth")
.build();
Ok(Self {
tracer_provider,
meter_provider: meter_provider_clone,
agent_starts,
agent_duration,
tool_calls,
tool_duration,
channel_messages,
heartbeat_ticks,
errors,
request_latency,
tokens_used,
active_sessions,
queue_depth,
})
}
}
impl Observer for OtelObserver {
fn record_event(&self, event: &ObserverEvent) {
let tracer = global::tracer("zeroclaw");
match event {
ObserverEvent::AgentStart { provider, model } => {
self.agent_starts.add(
1,
&[
KeyValue::new("provider", provider.clone()),
KeyValue::new("model", model.clone()),
],
);
}
ObserverEvent::AgentEnd {
duration,
tokens_used,
} => {
let secs = duration.as_secs_f64();
let start_time = SystemTime::now()
.checked_sub(*duration)
.unwrap_or(SystemTime::now());
// Create a completed span with correct timing
let mut span = tracer.build(
opentelemetry::trace::SpanBuilder::from_name("agent.invocation")
.with_kind(SpanKind::Internal)
.with_start_time(start_time)
.with_attributes(vec![
KeyValue::new("duration_s", secs),
]),
);
if let Some(t) = tokens_used {
span.set_attribute(KeyValue::new("tokens_used", *t as i64));
}
span.end();
self.agent_duration.record(secs, &[]);
// Note: tokens are recorded via record_metric(TokensUsed) to avoid
// double-counting. AgentEnd only records duration.
}
ObserverEvent::ToolCall {
tool,
duration,
success,
} => {
let secs = duration.as_secs_f64();
let start_time = SystemTime::now()
.checked_sub(*duration)
.unwrap_or(SystemTime::now());
let status = if *success {
Status::Ok
} else {
Status::error("")
};
let mut span = tracer.build(
opentelemetry::trace::SpanBuilder::from_name("tool.call")
.with_kind(SpanKind::Internal)
.with_start_time(start_time)
.with_attributes(vec![
KeyValue::new("tool.name", tool.clone()),
KeyValue::new("tool.success", *success),
KeyValue::new("duration_s", secs),
]),
);
span.set_status(status);
span.end();
let attrs = [
KeyValue::new("tool", tool.clone()),
KeyValue::new("success", success.to_string()),
];
self.tool_calls.add(1, &attrs);
self.tool_duration.record(secs, &[KeyValue::new("tool", tool.clone())]);
}
ObserverEvent::ChannelMessage { channel, direction } => {
self.channel_messages.add(
1,
&[
KeyValue::new("channel", channel.clone()),
KeyValue::new("direction", direction.clone()),
],
);
}
ObserverEvent::HeartbeatTick => {
self.heartbeat_ticks.add(1, &[]);
}
ObserverEvent::Error { component, message } => {
// Create an error span for visibility in trace backends
let mut span = tracer.build(
opentelemetry::trace::SpanBuilder::from_name("error")
.with_kind(SpanKind::Internal)
.with_attributes(vec![
KeyValue::new("component", component.clone()),
KeyValue::new("error.message", message.clone()),
]),
);
span.set_status(Status::error(message.clone()));
span.end();
self.errors.add(1, &[KeyValue::new("component", component.clone())]);
}
}
}
fn record_metric(&self, metric: &ObserverMetric) {
match metric {
ObserverMetric::RequestLatency(d) => {
self.request_latency.record(d.as_secs_f64(), &[]);
}
ObserverMetric::TokensUsed(t) => {
self.tokens_used.add(*t as u64, &[]);
}
ObserverMetric::ActiveSessions(s) => {
self.active_sessions.record(*s as u64, &[]);
}
ObserverMetric::QueueDepth(d) => {
self.queue_depth.record(*d as u64, &[]);
}
}
}
fn flush(&self) {
if let Err(e) = self.tracer_provider.force_flush() {
tracing::warn!("OTel trace flush failed: {e}");
}
if let Err(e) = self.meter_provider.force_flush() {
tracing::warn!("OTel metric flush failed: {e}");
}
}
fn name(&self) -> &str {
"otel"
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    // These tests exercise the `Observer` interface only: they construct an
    // observer against a fixed local address and check that recording and
    // flushing return without panicking. No collector is assumed to be
    // listening on that address, and nothing here inspects exported data.

    /// Build an observer aimed at a fixed loopback address used only by tests.
    fn test_observer() -> OtelObserver {
        let endpoint = Some("http://127.0.0.1:19999");
        let service = Some("zeroclaw-test");
        OtelObserver::new(endpoint, service)
            .expect("observer creation should not fail with valid endpoint format")
    }

    #[test]
    fn otel_observer_name() {
        let observer = test_observer();
        assert_eq!(observer.name(), "otel");
    }

    #[test]
    fn records_all_events_without_panic() {
        let observer = test_observer();

        // One entry per event variant, with both the populated and the
        // empty/zero flavours of `AgentEnd` and both outcomes of `ToolCall`.
        let events = [
            ObserverEvent::AgentStart {
                provider: "openrouter".into(),
                model: "claude-sonnet".into(),
            },
            ObserverEvent::AgentEnd {
                duration: Duration::from_millis(500),
                tokens_used: Some(100),
            },
            ObserverEvent::AgentEnd {
                duration: Duration::ZERO,
                tokens_used: None,
            },
            ObserverEvent::ToolCall {
                tool: "shell".into(),
                duration: Duration::from_millis(10),
                success: true,
            },
            ObserverEvent::ToolCall {
                tool: "file_read".into(),
                duration: Duration::from_millis(5),
                success: false,
            },
            ObserverEvent::ChannelMessage {
                channel: "telegram".into(),
                direction: "inbound".into(),
            },
            ObserverEvent::HeartbeatTick,
            ObserverEvent::Error {
                component: "provider".into(),
                message: "timeout".into(),
            },
        ];

        for event in &events {
            observer.record_event(event);
        }
    }

    #[test]
    fn records_all_metrics_without_panic() {
        let observer = test_observer();

        let metrics = [
            ObserverMetric::RequestLatency(Duration::from_secs(2)),
            ObserverMetric::TokensUsed(500),
            ObserverMetric::TokensUsed(0),
            ObserverMetric::ActiveSessions(3),
            ObserverMetric::QueueDepth(42),
        ];

        for metric in &metrics {
            observer.record_metric(metric);
        }
    }

    #[test]
    fn flush_does_not_panic() {
        let observer = test_observer();
        observer.record_event(&ObserverEvent::HeartbeatTick);
        observer.flush();
    }
}