use super::traits::{Observer, ObserverEvent, ObserverMetric}; use opentelemetry::metrics::{Counter, Gauge, Histogram}; use opentelemetry::trace::{Span, SpanKind, Status, Tracer}; use opentelemetry::{global, KeyValue}; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::metrics::SdkMeterProvider; use opentelemetry_sdk::trace::SdkTracerProvider; use std::time::SystemTime; /// OpenTelemetry-backed observer — exports traces and metrics via OTLP. pub struct OtelObserver { tracer_provider: SdkTracerProvider, meter_provider: SdkMeterProvider, // Metrics instruments agent_starts: Counter, agent_duration: Histogram, llm_calls: Counter, llm_duration: Histogram, tool_calls: Counter, tool_duration: Histogram, channel_messages: Counter, heartbeat_ticks: Counter, errors: Counter, request_latency: Histogram, tokens_used: Counter, active_sessions: Gauge, queue_depth: Gauge, } impl OtelObserver { /// Create a new OTel observer exporting to the given OTLP endpoint. /// /// Uses HTTP/protobuf transport (port 4318 by default). /// Falls back to `http://localhost:4318` if no endpoint is provided. pub fn new(endpoint: Option<&str>, service_name: Option<&str>) -> Result { let endpoint = endpoint.unwrap_or("http://localhost:4318"); let service_name = service_name.unwrap_or("zeroclaw"); // ── Trace exporter ────────────────────────────────────── let span_exporter = opentelemetry_otlp::SpanExporter::builder() .with_http() .with_endpoint(endpoint) .build() .map_err(|e| format!("Failed to create OTLP span exporter: {e}"))?; let tracer_provider = SdkTracerProvider::builder() .with_batch_exporter(span_exporter) .with_resource( opentelemetry_sdk::Resource::builder() .with_service_name(service_name.to_string()) .build(), ) .build(); global::set_tracer_provider(tracer_provider.clone()); // ── Metric exporter ───────────────────────────────────── let metric_exporter = opentelemetry_otlp::MetricExporter::builder() .with_http() .with_endpoint(endpoint) .build() .map_err(|e| format!("Failed to create OTLP metric exporter: {e}"))?; let metric_reader = opentelemetry_sdk::metrics::PeriodicReader::builder(metric_exporter).build(); let meter_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder() .with_reader(metric_reader) .with_resource( opentelemetry_sdk::Resource::builder() .with_service_name(service_name.to_string()) .build(), ) .build(); let meter_provider_clone = meter_provider.clone(); global::set_meter_provider(meter_provider); // ── Create metric instruments ──────────────────────────── let meter = global::meter("zeroclaw"); let agent_starts = meter .u64_counter("zeroclaw.agent.starts") .with_description("Total agent invocations") .build(); let agent_duration = meter .f64_histogram("zeroclaw.agent.duration") .with_description("Agent invocation duration in seconds") .with_unit("s") .build(); let llm_calls = meter .u64_counter("zeroclaw.llm.calls") .with_description("Total LLM provider calls") .build(); let llm_duration = meter .f64_histogram("zeroclaw.llm.duration") .with_description("LLM provider call duration in seconds") .with_unit("s") .build(); let tool_calls = meter .u64_counter("zeroclaw.tool.calls") .with_description("Total tool calls") .build(); let tool_duration = meter .f64_histogram("zeroclaw.tool.duration") .with_description("Tool execution duration in seconds") .with_unit("s") .build(); let channel_messages = meter .u64_counter("zeroclaw.channel.messages") .with_description("Total channel messages") .build(); let heartbeat_ticks = meter .u64_counter("zeroclaw.heartbeat.ticks") .with_description("Total heartbeat ticks") .build(); let errors = meter .u64_counter("zeroclaw.errors") .with_description("Total errors by component") .build(); let request_latency = meter .f64_histogram("zeroclaw.request.latency") .with_description("Request latency in seconds") .with_unit("s") .build(); let tokens_used = meter .u64_counter("zeroclaw.tokens.used") .with_description("Total tokens consumed (monotonic)") .build(); let active_sessions = meter .u64_gauge("zeroclaw.sessions.active") .with_description("Current number of active sessions") .build(); let queue_depth = meter .u64_gauge("zeroclaw.queue.depth") .with_description("Current message queue depth") .build(); Ok(Self { tracer_provider, meter_provider: meter_provider_clone, agent_starts, agent_duration, llm_calls, llm_duration, tool_calls, tool_duration, channel_messages, heartbeat_ticks, errors, request_latency, tokens_used, active_sessions, queue_depth, }) } } impl Observer for OtelObserver { fn record_event(&self, event: &ObserverEvent) { let tracer = global::tracer("zeroclaw"); match event { ObserverEvent::AgentStart { provider, model } => { self.agent_starts.add( 1, &[ KeyValue::new("provider", provider.clone()), KeyValue::new("model", model.clone()), ], ); } ObserverEvent::LlmRequest { .. } | ObserverEvent::ToolCallStart { .. } | ObserverEvent::TurnComplete => {} ObserverEvent::LlmResponse { provider, model, duration, success, error_message: _, } => { let secs = duration.as_secs_f64(); let attrs = [ KeyValue::new("provider", provider.clone()), KeyValue::new("model", model.clone()), KeyValue::new("success", success.to_string()), ]; self.llm_calls.add(1, &attrs); self.llm_duration.record(secs, &attrs); // Create a completed span for visibility in trace backends. let start_time = SystemTime::now() .checked_sub(*duration) .unwrap_or(SystemTime::now()); let mut span = tracer.build( opentelemetry::trace::SpanBuilder::from_name("llm.call") .with_kind(SpanKind::Internal) .with_start_time(start_time) .with_attributes(vec![ KeyValue::new("provider", provider.clone()), KeyValue::new("model", model.clone()), KeyValue::new("success", *success), KeyValue::new("duration_s", secs), ]), ); if *success { span.set_status(Status::Ok); } else { span.set_status(Status::error("")); } span.end(); } ObserverEvent::AgentEnd { duration, tokens_used, cost_usd, } => { let secs = duration.as_secs_f64(); let start_time = SystemTime::now() .checked_sub(*duration) .unwrap_or(SystemTime::now()); // Create a completed span with correct timing let mut span = tracer.build( opentelemetry::trace::SpanBuilder::from_name("agent.invocation") .with_kind(SpanKind::Internal) .with_start_time(start_time) .with_attributes(vec![KeyValue::new("duration_s", secs)]), ); if let Some(t) = tokens_used { span.set_attribute(KeyValue::new("tokens_used", *t as i64)); } if let Some(c) = cost_usd { span.set_attribute(KeyValue::new("cost_usd", *c)); } span.end(); self.agent_duration.record(secs, &[]); // Note: tokens are recorded via record_metric(TokensUsed) to avoid // double-counting. AgentEnd only records duration. } ObserverEvent::ToolCall { tool, duration, success, } => { let secs = duration.as_secs_f64(); let start_time = SystemTime::now() .checked_sub(*duration) .unwrap_or(SystemTime::now()); let status = if *success { Status::Ok } else { Status::error("") }; let mut span = tracer.build( opentelemetry::trace::SpanBuilder::from_name("tool.call") .with_kind(SpanKind::Internal) .with_start_time(start_time) .with_attributes(vec![ KeyValue::new("tool.name", tool.clone()), KeyValue::new("tool.success", *success), KeyValue::new("duration_s", secs), ]), ); span.set_status(status); span.end(); let attrs = [ KeyValue::new("tool", tool.clone()), KeyValue::new("success", success.to_string()), ]; self.tool_calls.add(1, &attrs); self.tool_duration .record(secs, &[KeyValue::new("tool", tool.clone())]); } ObserverEvent::ChannelMessage { channel, direction } => { self.channel_messages.add( 1, &[ KeyValue::new("channel", channel.clone()), KeyValue::new("direction", direction.clone()), ], ); } ObserverEvent::HeartbeatTick => { self.heartbeat_ticks.add(1, &[]); } ObserverEvent::Error { component, message } => { // Create an error span for visibility in trace backends let mut span = tracer.build( opentelemetry::trace::SpanBuilder::from_name("error") .with_kind(SpanKind::Internal) .with_attributes(vec![ KeyValue::new("component", component.clone()), KeyValue::new("error.message", message.clone()), ]), ); span.set_status(Status::error(message.clone())); span.end(); self.errors .add(1, &[KeyValue::new("component", component.clone())]); } } } fn record_metric(&self, metric: &ObserverMetric) { match metric { ObserverMetric::RequestLatency(d) => { self.request_latency.record(d.as_secs_f64(), &[]); } ObserverMetric::TokensUsed(t) => { self.tokens_used.add(*t as u64, &[]); } ObserverMetric::ActiveSessions(s) => { self.active_sessions.record(*s as u64, &[]); } ObserverMetric::QueueDepth(d) => { self.queue_depth.record(*d as u64, &[]); } } } fn flush(&self) { if let Err(e) = self.tracer_provider.force_flush() { tracing::warn!("OTel trace flush failed: {e}"); } if let Err(e) = self.meter_provider.force_flush() { tracing::warn!("OTel metric flush failed: {e}"); } } fn name(&self) -> &str { "otel" } } #[cfg(test)] mod tests { use super::*; use std::time::Duration; // Note: OtelObserver::new() requires an OTLP endpoint. // In tests we verify the struct creation fails gracefully // when no collector is available, and test the observer interface // by constructing with a known-unreachable endpoint (spans/metrics // are buffered and exported asynchronously, so recording never panics). fn test_observer() -> OtelObserver { // Create with a dummy endpoint — exports will silently fail // but the observer itself works fine for recording OtelObserver::new(Some("http://127.0.0.1:19999"), Some("zeroclaw-test")) .expect("observer creation should not fail with valid endpoint format") } #[test] fn otel_observer_name() { let obs = test_observer(); assert_eq!(obs.name(), "otel"); } #[test] fn records_all_events_without_panic() { let obs = test_observer(); obs.record_event(&ObserverEvent::AgentStart { provider: "openrouter".into(), model: "claude-sonnet".into(), }); obs.record_event(&ObserverEvent::LlmRequest { provider: "openrouter".into(), model: "claude-sonnet".into(), messages_count: 2, }); obs.record_event(&ObserverEvent::LlmResponse { provider: "openrouter".into(), model: "claude-sonnet".into(), duration: Duration::from_millis(250), success: true, error_message: None, }); obs.record_event(&ObserverEvent::AgentEnd { duration: Duration::from_millis(500), tokens_used: Some(100), cost_usd: Some(0.0015), }); obs.record_event(&ObserverEvent::AgentEnd { duration: Duration::ZERO, tokens_used: None, cost_usd: None, }); obs.record_event(&ObserverEvent::ToolCallStart { tool: "shell".into(), }); obs.record_event(&ObserverEvent::ToolCall { tool: "shell".into(), duration: Duration::from_millis(10), success: true, }); obs.record_event(&ObserverEvent::ToolCall { tool: "file_read".into(), duration: Duration::from_millis(5), success: false, }); obs.record_event(&ObserverEvent::TurnComplete); obs.record_event(&ObserverEvent::ChannelMessage { channel: "telegram".into(), direction: "inbound".into(), }); obs.record_event(&ObserverEvent::HeartbeatTick); obs.record_event(&ObserverEvent::Error { component: "provider".into(), message: "timeout".into(), }); } #[test] fn records_all_metrics_without_panic() { let obs = test_observer(); obs.record_metric(&ObserverMetric::RequestLatency(Duration::from_secs(2))); obs.record_metric(&ObserverMetric::TokensUsed(500)); obs.record_metric(&ObserverMetric::TokensUsed(0)); obs.record_metric(&ObserverMetric::ActiveSessions(3)); obs.record_metric(&ObserverMetric::QueueDepth(42)); } #[test] fn flush_does_not_panic() { let obs = test_observer(); obs.record_event(&ObserverEvent::HeartbeatTick); obs.flush(); } }