diff --git a/src/agent/loop_.rs b/src/agent/loop_.rs index 402b8b7..a1aea97 100644 --- a/src/agent/loop_.rs +++ b/src/agent/loop_.rs @@ -344,13 +344,43 @@ pub(crate) async fn agent_turn( history: &mut Vec, tools_registry: &[Box], observer: &dyn Observer, + provider_name: &str, model: &str, temperature: f64, ) -> Result { for _iteration in 0..MAX_TOOL_ITERATIONS { - let response = provider + observer.record_event(&ObserverEvent::LlmRequest { + provider: provider_name.to_string(), + model: model.to_string(), + messages_count: history.len(), + }); + + let llm_started_at = Instant::now(); + let response = match provider .chat_with_history(history, model, temperature) - .await?; + .await + { + Ok(resp) => { + observer.record_event(&ObserverEvent::LlmResponse { + provider: provider_name.to_string(), + model: model.to_string(), + duration: llm_started_at.elapsed(), + success: true, + error_message: None, + }); + resp + } + Err(e) => { + observer.record_event(&ObserverEvent::LlmResponse { + provider: provider_name.to_string(), + model: model.to_string(), + duration: llm_started_at.elapsed(), + success: false, + error_message: Some(crate::providers::sanitize_api_error(&e.to_string())), + }); + return Err(e); + } + }; let (text, tool_calls) = parse_tool_calls(&response); @@ -369,6 +399,9 @@ pub(crate) async fn agent_turn( // Execute each tool call and build results let mut tool_results = String::new(); for call in &tool_calls { + observer.record_event(&ObserverEvent::ToolCallStart { + tool: call.name.clone(), + }); let start = Instant::now(); let result = if let Some(tool) = find_tool(tools_registry, &call.name) { match tool.execute(call.arguments.clone()).await { @@ -445,10 +478,18 @@ pub async fn run( provider_override: Option, model_override: Option, temperature: f64, + verbose: bool, ) -> Result<()> { // ── Wire up agnostic subsystems ────────────────────────────── - let observer: Arc = - Arc::from(observability::create_observer(&config.observability)); + let base_observer = observability::create_observer(&config.observability); + let observer: Arc = if verbose { + Arc::from(Box::new(observability::MultiObserver::new(vec![ + base_observer, + Box::new(observability::VerboseObserver::new()), + ])) as Box) + } else { + Arc::from(base_observer) + }; let runtime: Arc = Arc::from(runtime::create_runtime(&config.runtime)?); let security = Arc::new(SecurityPolicy::from_config( @@ -603,11 +644,13 @@ pub async fn run( &mut history, &tools_registry, observer.as_ref(), + provider_name, model_name, temperature, ) .await?; println!("{response}"); + observer.record_event(&ObserverEvent::TurnComplete); // Auto-save assistant response to daily log if config.memory.auto_save { @@ -656,6 +699,7 @@ pub async fn run( &mut history, &tools_registry, observer.as_ref(), + provider_name, model_name, temperature, ) @@ -668,6 +712,7 @@ pub async fn run( } }; println!("\n{response}\n"); + observer.record_event(&ObserverEvent::TurnComplete); // Auto-compaction before hard trimming to preserve long-context signal. if let Ok(compacted) = diff --git a/src/main.rs b/src/main.rs index 9d35928..6c59090 100644 --- a/src/main.rs +++ b/src/main.rs @@ -132,6 +132,10 @@ enum Commands { /// Temperature (0.0 - 2.0) #[arg(short, long, default_value = "0.7")] temperature: f64, + + /// Print user-facing progress lines via observer (`>` send, `<` receive/complete). + #[arg(long)] + verbose: bool, }, /// Start the gateway server (webhooks, websockets) @@ -339,7 +343,8 @@ async fn main() -> Result<()> { provider, model, temperature, - } => agent::run(config, message, provider, model, temperature).await, + verbose, + } => agent::run(config, message, provider, model, temperature, verbose).await, Commands::Gateway { port, host } => { if port == 0 { diff --git a/src/observability/log.rs b/src/observability/log.rs index eed4136..9e3d062 100644 --- a/src/observability/log.rs +++ b/src/observability/log.rs @@ -16,6 +16,35 @@ impl Observer for LogObserver { ObserverEvent::AgentStart { provider, model } => { info!(provider = %provider, model = %model, "agent.start"); } + ObserverEvent::LlmRequest { + provider, + model, + messages_count, + } => { + info!( + provider = %provider, + model = %model, + messages_count = messages_count, + "llm.request" + ); + } + ObserverEvent::LlmResponse { + provider, + model, + duration, + success, + error_message, + } => { + let ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX); + info!( + provider = %provider, + model = %model, + duration_ms = ms, + success = success, + error = ?error_message, + "llm.response" + ); + } ObserverEvent::AgentEnd { duration, tokens_used, @@ -23,6 +52,9 @@ impl Observer for LogObserver { let ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX); info!(duration_ms = ms, tokens = ?tokens_used, "agent.end"); } + ObserverEvent::ToolCallStart { tool } => { + info!(tool = %tool, "tool.start"); + } ObserverEvent::ToolCall { tool, duration, @@ -31,6 +63,9 @@ impl Observer for LogObserver { let ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX); info!(tool = %tool, duration_ms = ms, success = success, "tool.call"); } + ObserverEvent::TurnComplete => { + info!("turn.complete"); + } ObserverEvent::ChannelMessage { channel, direction } => { info!(channel = %channel, direction = %direction, "channel.message"); } @@ -83,6 +118,18 @@ mod tests { provider: "openrouter".into(), model: "claude-sonnet".into(), }); + obs.record_event(&ObserverEvent::LlmRequest { + provider: "openrouter".into(), + model: "claude-sonnet".into(), + messages_count: 2, + }); + obs.record_event(&ObserverEvent::LlmResponse { + provider: "openrouter".into(), + model: "claude-sonnet".into(), + duration: Duration::from_millis(250), + success: true, + error_message: None, + }); obs.record_event(&ObserverEvent::AgentEnd { duration: Duration::from_millis(500), tokens_used: Some(100), @@ -91,11 +138,15 @@ mod tests { duration: Duration::ZERO, tokens_used: None, }); + obs.record_event(&ObserverEvent::ToolCallStart { + tool: "shell".into(), + }); obs.record_event(&ObserverEvent::ToolCall { tool: "shell".into(), duration: Duration::from_millis(10), success: false, }); + obs.record_event(&ObserverEvent::TurnComplete); obs.record_event(&ObserverEvent::ChannelMessage { channel: "telegram".into(), direction: "outbound".into(), diff --git a/src/observability/mod.rs b/src/observability/mod.rs index a399353..1093a4e 100644 --- a/src/observability/mod.rs +++ b/src/observability/mod.rs @@ -3,11 +3,14 @@ pub mod multi; pub mod noop; pub mod otel; pub mod traits; +pub mod verbose; pub use self::log::LogObserver; +pub use self::multi::MultiObserver; pub use noop::NoopObserver; pub use otel::OtelObserver; pub use traits::{Observer, ObserverEvent}; +pub use verbose::VerboseObserver; use crate::config::ObservabilityConfig; diff --git a/src/observability/noop.rs b/src/observability/noop.rs index 31f3a34..1189490 100644 --- a/src/observability/noop.rs +++ b/src/observability/noop.rs @@ -33,6 +33,18 @@ mod tests { provider: "test".into(), model: "test".into(), }); + obs.record_event(&ObserverEvent::LlmRequest { + provider: "test".into(), + model: "test".into(), + messages_count: 2, + }); + obs.record_event(&ObserverEvent::LlmResponse { + provider: "test".into(), + model: "test".into(), + duration: Duration::from_millis(1), + success: true, + error_message: None, + }); obs.record_event(&ObserverEvent::AgentEnd { duration: Duration::from_millis(100), tokens_used: Some(42), @@ -41,11 +53,15 @@ mod tests { duration: Duration::ZERO, tokens_used: None, }); + obs.record_event(&ObserverEvent::ToolCallStart { + tool: "shell".into(), + }); obs.record_event(&ObserverEvent::ToolCall { tool: "shell".into(), duration: Duration::from_secs(1), success: true, }); + obs.record_event(&ObserverEvent::TurnComplete); obs.record_event(&ObserverEvent::ChannelMessage { channel: "cli".into(), direction: "inbound".into(), diff --git a/src/observability/otel.rs b/src/observability/otel.rs index dd3d06f..49f5ec0 100644 --- a/src/observability/otel.rs +++ b/src/observability/otel.rs @@ -15,6 +15,8 @@ pub struct OtelObserver { // Metrics instruments agent_starts: Counter, agent_duration: Histogram, + llm_calls: Counter, + llm_duration: Histogram, tool_calls: Counter, tool_duration: Histogram, channel_messages: Counter, @@ -89,6 +91,17 @@ impl OtelObserver { .with_unit("s") .build(); + let llm_calls = meter + .u64_counter("zeroclaw.llm.calls") + .with_description("Total LLM provider calls") + .build(); + + let llm_duration = meter + .f64_histogram("zeroclaw.llm.duration") + .with_description("LLM provider call duration in seconds") + .with_unit("s") + .build(); + let tool_calls = meter .u64_counter("zeroclaw.tool.calls") .with_description("Total tool calls") @@ -141,6 +154,8 @@ impl OtelObserver { meter_provider: meter_provider_clone, agent_starts, agent_duration, + llm_calls, + llm_duration, tool_calls, tool_duration, channel_messages, @@ -168,6 +183,45 @@ impl Observer for OtelObserver { ], ); } + ObserverEvent::LlmRequest { .. } => {} + ObserverEvent::LlmResponse { + provider, + model, + duration, + success, + error_message: _, + } => { + let secs = duration.as_secs_f64(); + let attrs = [ + KeyValue::new("provider", provider.clone()), + KeyValue::new("model", model.clone()), + KeyValue::new("success", success.to_string()), + ]; + self.llm_calls.add(1, &attrs); + self.llm_duration.record(secs, &attrs); + + // Create a completed span for visibility in trace backends. + let start_time = SystemTime::now() + .checked_sub(*duration) + .unwrap_or(SystemTime::now()); + let mut span = tracer.build( + opentelemetry::trace::SpanBuilder::from_name("llm.call") + .with_kind(SpanKind::Internal) + .with_start_time(start_time) + .with_attributes(vec![ + KeyValue::new("provider", provider.clone()), + KeyValue::new("model", model.clone()), + KeyValue::new("success", *success), + KeyValue::new("duration_s", secs), + ]), + ); + if *success { + span.set_status(Status::Ok); + } else { + span.set_status(Status::error("")); + } + span.end(); + } ObserverEvent::AgentEnd { duration, tokens_used, @@ -193,6 +247,7 @@ impl Observer for OtelObserver { // Note: tokens are recorded via record_metric(TokensUsed) to avoid // double-counting. AgentEnd only records duration. } + ObserverEvent::ToolCallStart { .. } => {} ObserverEvent::ToolCall { tool, duration, @@ -230,6 +285,7 @@ impl Observer for OtelObserver { self.tool_duration .record(secs, &[KeyValue::new("tool", tool.clone())]); } + ObserverEvent::TurnComplete => {} ObserverEvent::ChannelMessage { channel, direction } => { self.channel_messages.add( 1, @@ -323,6 +379,18 @@ mod tests { provider: "openrouter".into(), model: "claude-sonnet".into(), }); + obs.record_event(&ObserverEvent::LlmRequest { + provider: "openrouter".into(), + model: "claude-sonnet".into(), + messages_count: 2, + }); + obs.record_event(&ObserverEvent::LlmResponse { + provider: "openrouter".into(), + model: "claude-sonnet".into(), + duration: Duration::from_millis(250), + success: true, + error_message: None, + }); obs.record_event(&ObserverEvent::AgentEnd { duration: Duration::from_millis(500), tokens_used: Some(100), @@ -331,6 +399,9 @@ mod tests { duration: Duration::ZERO, tokens_used: None, }); + obs.record_event(&ObserverEvent::ToolCallStart { + tool: "shell".into(), + }); obs.record_event(&ObserverEvent::ToolCall { tool: "shell".into(), duration: Duration::from_millis(10), @@ -341,6 +412,7 @@ mod tests { duration: Duration::from_millis(5), success: false, }); + obs.record_event(&ObserverEvent::TurnComplete); obs.record_event(&ObserverEvent::ChannelMessage { channel: "telegram".into(), direction: "inbound".into(), diff --git a/src/observability/traits.rs b/src/observability/traits.rs index b5b05f3..a1eb10f 100644 --- a/src/observability/traits.rs +++ b/src/observability/traits.rs @@ -7,15 +7,38 @@ pub enum ObserverEvent { provider: String, model: String, }, + /// A request is about to be sent to an LLM provider. + /// + /// This is emitted immediately before a provider call so observers can print + /// user-facing progress without leaking prompt contents. + LlmRequest { + provider: String, + model: String, + messages_count: usize, + }, + /// Result of a single LLM provider call. + LlmResponse { + provider: String, + model: String, + duration: Duration, + success: bool, + error_message: Option, + }, AgentEnd { duration: Duration, tokens_used: Option, }, + /// A tool call is about to be executed. + ToolCallStart { + tool: String, + }, ToolCall { tool: String, duration: Duration, success: bool, }, + /// The agent produced a final answer for the current user message. + TurnComplete, ChannelMessage { channel: String, direction: String, diff --git a/src/observability/verbose.rs b/src/observability/verbose.rs new file mode 100644 index 0000000..364be1e --- /dev/null +++ b/src/observability/verbose.rs @@ -0,0 +1,96 @@ +use super::traits::{Observer, ObserverEvent, ObserverMetric}; + +/// Human-readable progress observer for interactive CLI sessions. +/// +/// This observer prints compact `>` / `<` progress lines without exposing +/// prompt contents. It is intended to be opt-in (e.g. `--verbose`). +pub struct VerboseObserver; + +impl VerboseObserver { + pub fn new() -> Self { + Self + } +} + +impl Observer for VerboseObserver { + fn record_event(&self, event: &ObserverEvent) { + match event { + ObserverEvent::LlmRequest { + provider, + model, + messages_count, + } => { + eprintln!("> Thinking"); + eprintln!( + "> Send (provider={}, model={}, messages={})", + provider, model, messages_count + ); + } + ObserverEvent::LlmResponse { + duration, success, .. + } => { + let ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX); + eprintln!("< Receive (success={success}, duration_ms={ms})"); + } + ObserverEvent::ToolCallStart { tool } => { + eprintln!("> Tool {tool}"); + } + ObserverEvent::ToolCall { + tool, + duration, + success, + } => { + let ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX); + eprintln!("< Tool {tool} (success={success}, duration_ms={ms})"); + } + ObserverEvent::TurnComplete => { + eprintln!("< Complete"); + } + _ => {} + } + } + + #[inline(always)] + fn record_metric(&self, _metric: &ObserverMetric) {} + + fn name(&self) -> &str { + "verbose" + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[test] + fn verbose_name() { + assert_eq!(VerboseObserver::new().name(), "verbose"); + } + + #[test] + fn verbose_events_do_not_panic() { + let obs = VerboseObserver::new(); + obs.record_event(&ObserverEvent::LlmRequest { + provider: "openrouter".into(), + model: "claude".into(), + messages_count: 3, + }); + obs.record_event(&ObserverEvent::LlmResponse { + provider: "openrouter".into(), + model: "claude".into(), + duration: Duration::from_millis(12), + success: true, + error_message: None, + }); + obs.record_event(&ObserverEvent::ToolCallStart { + tool: "shell".into(), + }); + obs.record_event(&ObserverEvent::ToolCall { + tool: "shell".into(), + duration: Duration::from_millis(2), + success: true, + }); + obs.record_event(&ObserverEvent::TurnComplete); + } +}