use crate::config::Config;
use crate::memory::{self, Memory, MemoryCategory};
use crate::observability::{self, Observer, ObserverEvent};
use crate::providers::{self, ChatMessage, Provider};
use crate::runtime;
use crate::security::SecurityPolicy;
use crate::tools::{self, Tool};
use crate::util::truncate_with_ellipsis;
use anyhow::Result;
use std::fmt::Write;
use std::io::Write as IoWrite;
use std::sync::Arc;
use std::time::Instant;

/// Maximum agentic tool-use iterations per user message to prevent runaway loops.
const MAX_TOOL_ITERATIONS: usize = 10;

/// Maximum number of non-system messages to keep in history.
/// When exceeded, the oldest messages are dropped (system prompt is always preserved).
const MAX_HISTORY_MESSAGES: usize = 50;

/// Trim conversation history to prevent unbounded growth.
///
/// Preserves the system prompt (first message, if its role is `system`) and
/// the most recent `MAX_HISTORY_MESSAGES` non-system messages; everything
/// older is dropped from the front.
fn trim_history(history: &mut Vec<ChatMessage>) {
    let has_system = history.first().map_or(false, |m| m.role == "system");
    let non_system_count = if has_system {
        history.len() - 1
    } else {
        history.len()
    };
    // Nothing to trim if within limit
    if non_system_count <= MAX_HISTORY_MESSAGES {
        return;
    }
    // Skip index 0 when it holds the system prompt so it is never drained.
    let start = usize::from(has_system);
    let to_remove = non_system_count - MAX_HISTORY_MESSAGES;
    history.drain(start..start + to_remove);
}

/// Build a context preamble by searching memory for entries relevant to
/// `user_msg`. Returns an empty string when nothing is recalled (or recall
/// fails — memory enrichment is best-effort and must not block the turn).
async fn build_context(mem: &dyn Memory, user_msg: &str) -> String {
    let mut context = String::new();
    // Pull up to 5 relevant memories for this message
    if let Ok(entries) = mem.recall(user_msg, 5).await {
        if !entries.is_empty() {
            context.push_str("[Memory context]\n");
            for entry in &entries {
                let _ = writeln!(context, "- {}: {}", entry.key, entry.content);
            }
            context.push('\n');
        }
    }
    context
}

/// Find a tool by name in the registry.
fn find_tool<'a>(tools: &'a [Box<dyn Tool>], name: &str) -> Option<&'a dyn Tool> {
    tools.iter().find(|t| t.name() == name).map(|t| t.as_ref())
}

/// Parse tool calls from an LLM response that uses XML-style function calling.
///
/// Expected format (common with system-prompt-guided tool use):
/// ```text
/// <tool_call>
/// {"name": "shell", "arguments": {"command": "ls"}}
/// </tool_call>
/// ```
///
/// Also supports JSON with `tool_calls` array from OpenAI-format responses.
///
/// Returns the non-tool-call text (trimmed segments joined with newlines)
/// plus every successfully parsed call. Malformed JSON inside a tag pair is
/// logged and skipped; an unclosed opening tag ends parsing and the rest of
/// the response is treated as plain text.
fn parse_tool_calls(response: &str) -> (String, Vec<ParsedToolCall>) {
    // Named constants instead of magic offsets 11/12 for the tag lengths.
    const OPEN_TAG: &str = "<tool_call>";
    const CLOSE_TAG: &str = "</tool_call>";

    let mut text_parts = Vec::new();
    let mut calls = Vec::new();
    let mut remaining = response;

    while let Some(start) = remaining.find(OPEN_TAG) {
        // Everything before the tag is text
        let before = &remaining[..start];
        if !before.trim().is_empty() {
            text_parts.push(before.trim().to_string());
        }
        if let Some(end) = remaining[start..].find(CLOSE_TAG) {
            // `end` is relative to `start`; the inner span excludes both tags.
            let inner = &remaining[start + OPEN_TAG.len()..start + end];
            match serde_json::from_str::<serde_json::Value>(inner.trim()) {
                Ok(parsed) => {
                    let name = parsed
                        .get("name")
                        .and_then(|v| v.as_str())
                        .unwrap_or("")
                        .to_string();
                    let arguments = parsed
                        .get("arguments")
                        .cloned()
                        .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
                    calls.push(ParsedToolCall { name, arguments });
                }
                Err(e) => {
                    tracing::warn!("Malformed JSON: {e}");
                }
            }
            remaining = &remaining[start + end + CLOSE_TAG.len()..];
        } else {
            // Unclosed tag: stop scanning; trailing text is handled below.
            break;
        }
    }
    // Remaining text after last tool call
    if !remaining.trim().is_empty() {
        text_parts.push(remaining.trim().to_string());
    }
    (text_parts.join("\n"), calls)
}

/// A single tool invocation parsed out of an LLM response.
#[derive(Debug)]
struct ParsedToolCall {
    name: String,
    arguments: serde_json::Value,
}

/// Execute a single turn of the agent loop: send messages, parse tool calls,
/// execute tools, and loop until the LLM produces a final text response.
async fn agent_turn( provider: &dyn Provider, history: &mut Vec, tools_registry: &[Box], observer: &dyn Observer, model: &str, temperature: f64, ) -> Result { for _iteration in 0..MAX_TOOL_ITERATIONS { let response = provider .chat_with_history(history, model, temperature) .await?; let (text, tool_calls) = parse_tool_calls(&response); if tool_calls.is_empty() { // No tool calls — this is the final response history.push(ChatMessage::assistant(&response)); return Ok(if text.is_empty() { response } else { text }); } // Print any text the LLM produced alongside tool calls if !text.is_empty() { print!("{text}"); let _ = std::io::stdout().flush(); } // Execute each tool call and build results let mut tool_results = String::new(); for call in &tool_calls { let start = Instant::now(); let result = if let Some(tool) = find_tool(tools_registry, &call.name) { match tool.execute(call.arguments.clone()).await { Ok(r) => { observer.record_event(&ObserverEvent::ToolCall { tool: call.name.clone(), duration: start.elapsed(), success: r.success, }); if r.success { r.output } else { format!("Error: {}", r.error.unwrap_or_else(|| r.output)) } } Err(e) => { observer.record_event(&ObserverEvent::ToolCall { tool: call.name.clone(), duration: start.elapsed(), success: false, }); format!("Error executing {}: {e}", call.name) } } } else { format!("Unknown tool: {}", call.name) }; let _ = writeln!( tool_results, "\n{}\n", call.name, result ); } // Add assistant message with tool calls + tool results to history history.push(ChatMessage::assistant(&response)); history.push(ChatMessage::user(format!( "[Tool results]\n{tool_results}" ))); } anyhow::bail!("Agent exceeded maximum tool iterations ({MAX_TOOL_ITERATIONS})") } /// Build the tool instruction block for the system prompt so the LLM knows /// how to invoke tools. 
fn build_tool_instructions(tools_registry: &[Box]) -> String { let mut instructions = String::new(); instructions.push_str("\n## Tool Use Protocol\n\n"); instructions.push_str("To use a tool, wrap a JSON object in tags:\n\n"); instructions.push_str("```\n\n{\"name\": \"tool_name\", \"arguments\": {\"param\": \"value\"}}\n\n```\n\n"); instructions.push_str("You may use multiple tool calls in a single response. "); instructions.push_str("After tool execution, results appear in tags. "); instructions.push_str("Continue reasoning with the results until you can give a final answer.\n\n"); instructions.push_str("### Available Tools\n\n"); for tool in tools_registry { let _ = writeln!( instructions, "**{}**: {}\nParameters: `{}`\n", tool.name(), tool.description(), tool.parameters_schema() ); } instructions } #[allow(clippy::too_many_lines)] pub async fn run( config: Config, message: Option, provider_override: Option, model_override: Option, temperature: f64, ) -> Result<()> { // ── Wire up agnostic subsystems ────────────────────────────── let observer: Arc = Arc::from(observability::create_observer(&config.observability)); let runtime: Arc = Arc::from(runtime::create_runtime(&config.runtime)?); let security = Arc::new(SecurityPolicy::from_config( &config.autonomy, &config.workspace_dir, )); // ── Memory (the brain) ──────────────────────────────────────── let mem: Arc = Arc::from(memory::create_memory( &config.memory, &config.workspace_dir, config.api_key.as_deref(), )?); tracing::info!(backend = mem.name(), "Memory initialized"); // ── Tools (including memory tools) ──────────────────────────── let composio_key = if config.composio.enabled { config.composio.api_key.as_deref() } else { None }; let tools_registry = tools::all_tools_with_runtime( &security, runtime, mem.clone(), composio_key, &config.browser, ); // ── Resolve provider ───────────────────────────────────────── let provider_name = provider_override .as_deref() .or(config.default_provider.as_deref()) 
.unwrap_or("openrouter"); let model_name = model_override .as_deref() .or(config.default_model.as_deref()) .unwrap_or("anthropic/claude-sonnet-4-20250514"); let provider: Box = providers::create_routed_provider( provider_name, config.api_key.as_deref(), &config.reliability, &config.model_routes, model_name, )?; observer.record_event(&ObserverEvent::AgentStart { provider: provider_name.to_string(), model: model_name.to_string(), }); // ── Build system prompt from workspace MD files (OpenClaw framework) ── let skills = crate::skills::load_skills(&config.workspace_dir); let mut tool_descs: Vec<(&str, &str)> = vec![ ( "shell", "Execute terminal commands. Use when: running local checks, build/test commands, diagnostics. Don't use when: a safer dedicated tool exists, or command is destructive without approval.", ), ( "file_read", "Read file contents. Use when: inspecting project files, configs, logs. Don't use when: a targeted search is enough.", ), ( "file_write", "Write file contents. Use when: applying focused edits, scaffolding files, updating docs/code. Don't use when: side effects are unclear or file ownership is uncertain.", ), ( "memory_store", "Save to memory. Use when: preserving durable preferences, decisions, key context. Don't use when: information is transient/noisy/sensitive without need.", ), ( "memory_recall", "Search memory. Use when: retrieving prior decisions, user preferences, historical context. Don't use when: answer is already in current context.", ), ( "memory_forget", "Delete a memory entry. Use when: memory is incorrect/stale or explicitly requested for removal. Don't use when: impact is uncertain.", ), ]; tool_descs.push(( "screenshot", "Capture a screenshot of the current screen. Returns file path and base64-encoded PNG. Use when: visual verification, UI inspection, debugging displays.", )); tool_descs.push(( "image_info", "Read image file metadata (format, dimensions, size) and optionally base64-encode it. 
Use when: inspecting images, preparing visual data for analysis.", )); if config.browser.enabled { tool_descs.push(( "browser_open", "Open approved HTTPS URLs in Brave Browser (allowlist-only, no scraping)", )); } if config.composio.enabled { tool_descs.push(( "composio", "Execute actions on 1000+ apps via Composio (Gmail, Notion, GitHub, Slack, etc.). Use action='list' to discover, 'execute' to run, 'connect' to OAuth.", )); } let mut system_prompt = crate::channels::build_system_prompt( &config.workspace_dir, model_name, &tool_descs, &skills, Some(&config.identity), ); // Append structured tool-use instructions with schemas system_prompt.push_str(&build_tool_instructions(&tools_registry)); // ── Execute ────────────────────────────────────────────────── let start = Instant::now(); if let Some(msg) = message { // Auto-save user message to memory if config.memory.auto_save { let _ = mem .store("user_msg", &msg, MemoryCategory::Conversation) .await; } // Inject memory context into user message let context = build_context(mem.as_ref(), &msg).await; let enriched = if context.is_empty() { msg.clone() } else { format!("{context}{msg}") }; let mut history = vec![ ChatMessage::system(&system_prompt), ChatMessage::user(&enriched), ]; let response = agent_turn( provider.as_ref(), &mut history, &tools_registry, observer.as_ref(), model_name, temperature, ) .await?; println!("{response}"); // Auto-save assistant response to daily log if config.memory.auto_save { let summary = truncate_with_ellipsis(&response, 100); let _ = mem .store("assistant_resp", &summary, MemoryCategory::Daily) .await; } } else { println!("🦀 ZeroClaw Interactive Mode"); println!("Type /quit to exit.\n"); let (tx, mut rx) = tokio::sync::mpsc::channel(32); let cli = crate::channels::CliChannel::new(); // Spawn listener let listen_handle = tokio::spawn(async move { let _ = crate::channels::Channel::listen(&cli, tx).await; }); // Persistent conversation history across turns let mut history = 
vec![ChatMessage::system(&system_prompt)]; while let Some(msg) = rx.recv().await { // Auto-save conversation turns if config.memory.auto_save { let _ = mem .store("user_msg", &msg.content, MemoryCategory::Conversation) .await; } // Inject memory context into user message let context = build_context(mem.as_ref(), &msg.content).await; let enriched = if context.is_empty() { msg.content.clone() } else { format!("{context}{}", msg.content) }; history.push(ChatMessage::user(&enriched)); let response = match agent_turn( provider.as_ref(), &mut history, &tools_registry, observer.as_ref(), model_name, temperature, ) .await { Ok(resp) => resp, Err(e) => { eprintln!("\nError: {e}\n"); continue; } }; println!("\n{response}\n"); // Prevent unbounded history growth in long interactive sessions trim_history(&mut history); if config.memory.auto_save { let summary = truncate_with_ellipsis(&response, 100); let _ = mem .store("assistant_resp", &summary, MemoryCategory::Daily) .await; } } listen_handle.abort(); } let duration = start.elapsed(); observer.record_event(&ObserverEvent::AgentEnd { duration, tokens_used: None, }); Ok(()) } #[cfg(test)] mod tests { use super::*; #[test] fn parse_tool_calls_extracts_single_call() { let response = r#"Let me check that. 
{"name": "shell", "arguments": {"command": "ls -la"}} "#; let (text, calls) = parse_tool_calls(response); assert_eq!(text, "Let me check that."); assert_eq!(calls.len(), 1); assert_eq!(calls[0].name, "shell"); assert_eq!( calls[0].arguments.get("command").unwrap().as_str().unwrap(), "ls -la" ); } #[test] fn parse_tool_calls_extracts_multiple_calls() { let response = r#" {"name": "file_read", "arguments": {"path": "a.txt"}} {"name": "file_read", "arguments": {"path": "b.txt"}} "#; let (_, calls) = parse_tool_calls(response); assert_eq!(calls.len(), 2); assert_eq!(calls[0].name, "file_read"); assert_eq!(calls[1].name, "file_read"); } #[test] fn parse_tool_calls_returns_text_only_when_no_calls() { let response = "Just a normal response with no tools."; let (text, calls) = parse_tool_calls(response); assert_eq!(text, "Just a normal response with no tools."); assert!(calls.is_empty()); } #[test] fn parse_tool_calls_handles_malformed_json() { let response = r#" not valid json Some text after."#; let (text, calls) = parse_tool_calls(response); assert!(calls.is_empty()); assert!(text.contains("Some text after.")); } #[test] fn parse_tool_calls_text_before_and_after() { let response = r#"Before text. 
{"name": "shell", "arguments": {"command": "echo hi"}} After text."#; let (text, calls) = parse_tool_calls(response); assert!(text.contains("Before text.")); assert!(text.contains("After text.")); assert_eq!(calls.len(), 1); } #[test] fn build_tool_instructions_includes_all_tools() { use crate::security::SecurityPolicy; let security = Arc::new(SecurityPolicy::from_config( &crate::config::AutonomyConfig::default(), std::path::Path::new("/tmp"), )); let tools = tools::default_tools(security); let instructions = build_tool_instructions(&tools); assert!(instructions.contains("## Tool Use Protocol")); assert!(instructions.contains("")); assert!(instructions.contains("shell")); assert!(instructions.contains("file_read")); assert!(instructions.contains("file_write")); } #[test] fn trim_history_preserves_system_prompt() { let mut history = vec![ChatMessage::system("system prompt")]; for i in 0..MAX_HISTORY_MESSAGES + 20 { history.push(ChatMessage::user(format!("msg {i}"))); } let original_len = history.len(); assert!(original_len > MAX_HISTORY_MESSAGES + 1); trim_history(&mut history); // System prompt preserved assert_eq!(history[0].role, "system"); assert_eq!(history[0].content, "system prompt"); // Trimmed to limit assert_eq!(history.len(), MAX_HISTORY_MESSAGES + 1); // +1 for system // Most recent messages preserved let last = &history[history.len() - 1]; assert_eq!( last.content, format!("msg {}", MAX_HISTORY_MESSAGES + 19) ); } #[test] fn trim_history_noop_when_within_limit() { let mut history = vec![ ChatMessage::system("sys"), ChatMessage::user("hello"), ChatMessage::assistant("hi"), ]; trim_history(&mut history); assert_eq!(history.len(), 3); } }