use crate::config::Config;
use crate::memory::{self, Memory, MemoryCategory};
use crate::observability::{self, Observer, ObserverEvent};
use crate::providers::{self, ChatMessage, Provider};
use crate::runtime;
use crate::security::SecurityPolicy;
use crate::tools::{self, Tool};
use crate::util::truncate_with_ellipsis;
use anyhow::Result;
use std::fmt::Write;
use std::io::Write as IoWrite;
use std::sync::Arc;
use std::time::Instant;
use uuid::Uuid;

/// Maximum agentic tool-use iterations per user message to prevent runaway loops.
const MAX_TOOL_ITERATIONS: usize = 10;

/// Maximum number of non-system messages to keep in history.
/// When exceeded, the oldest messages are dropped (system prompt is always preserved).
const MAX_HISTORY_MESSAGES: usize = 50;

/// Build a unique memory key of the form `{prefix}_{uuid}` for auto-saved entries,
/// so repeated auto-saves never collide.
fn autosave_memory_key(prefix: &str) -> String {
    format!("{prefix}_{}", Uuid::new_v4())
}

/// Trim conversation history to prevent unbounded growth.
/// Preserves the system prompt (first message if role=system) and the most recent messages.
fn trim_history(history: &mut Vec<ChatMessage>) {
    let has_system = history.first().map_or(false, |m| m.role == "system");
    let non_system_count = if has_system {
        history.len() - 1
    } else {
        history.len()
    };
    // Nothing to trim if within limit
    if non_system_count <= MAX_HISTORY_MESSAGES {
        return;
    }
    // Drain the oldest non-system messages in place; index 0 is skipped when it
    // holds the system prompt so the prompt always survives trimming.
    let start = if has_system { 1 } else { 0 };
    let to_remove = non_system_count - MAX_HISTORY_MESSAGES;
    history.drain(start..start + to_remove);
}

/// Build context preamble by searching memory for relevant entries.
/// Best-effort: recall failures are silently ignored and yield an empty preamble.
async fn build_context(mem: &dyn Memory, user_msg: &str) -> String {
    let mut context = String::new();
    // Pull up to 5 relevant memories for this message
    if let Ok(entries) = mem.recall(user_msg, 5).await {
        if !entries.is_empty() {
            context.push_str("[Memory context]\n");
            for entry in &entries {
                let _ = writeln!(context, "- {}: {}", entry.key, entry.content);
            }
            context.push('\n');
        }
    }
    context
}

/// Find a tool by name in the registry.
fn find_tool<'a>(tools: &'a [Box], name: &str) -> Option<&'a dyn Tool> { tools.iter().find(|t| t.name() == name).map(|t| t.as_ref()) } fn parse_arguments_value(raw: Option<&serde_json::Value>) -> serde_json::Value { match raw { Some(serde_json::Value::String(s)) => serde_json::from_str::(s) .unwrap_or_else(|_| serde_json::Value::Object(serde_json::Map::new())), Some(value) => value.clone(), None => serde_json::Value::Object(serde_json::Map::new()), } } fn parse_tool_call_value(value: &serde_json::Value) -> Option { if let Some(function) = value.get("function") { let name = function .get("name") .and_then(|v| v.as_str()) .unwrap_or("") .trim() .to_string(); if !name.is_empty() { let arguments = parse_arguments_value(function.get("arguments")); return Some(ParsedToolCall { name, arguments }); } } let name = value .get("name") .and_then(|v| v.as_str()) .unwrap_or("") .trim() .to_string(); if name.is_empty() { return None; } let arguments = parse_arguments_value(value.get("arguments")); Some(ParsedToolCall { name, arguments }) } fn parse_tool_calls_from_json_value(value: &serde_json::Value) -> Vec { let mut calls = Vec::new(); if let Some(tool_calls) = value.get("tool_calls").and_then(|v| v.as_array()) { for call in tool_calls { if let Some(parsed) = parse_tool_call_value(call) { calls.push(parsed); } } if !calls.is_empty() { return calls; } } if let Some(array) = value.as_array() { for item in array { if let Some(parsed) = parse_tool_call_value(item) { calls.push(parsed); } } return calls; } if let Some(parsed) = parse_tool_call_value(value) { calls.push(parsed); } calls } fn extract_json_values(input: &str) -> Vec { let mut values = Vec::new(); let trimmed = input.trim(); if trimmed.is_empty() { return values; } if let Ok(value) = serde_json::from_str::(trimmed) { values.push(value); return values; } let char_positions: Vec<(usize, char)> = trimmed.char_indices().collect(); let mut idx = 0; while idx < char_positions.len() { let (byte_idx, ch) = char_positions[idx]; 
if ch == '{' || ch == '[' { let slice = &trimmed[byte_idx..]; let mut stream = serde_json::Deserializer::from_str(slice).into_iter::(); if let Some(Ok(value)) = stream.next() { let consumed = stream.byte_offset(); if consumed > 0 { values.push(value); let next_byte = byte_idx + consumed; while idx < char_positions.len() && char_positions[idx].0 < next_byte { idx += 1; } continue; } } } idx += 1; } values } /// Parse tool calls from an LLM response that uses XML-style function calling. /// /// Expected format (common with system-prompt-guided tool use): /// ```text /// /// {"name": "shell", "arguments": {"command": "ls"}} /// /// ``` /// /// Also supports JSON with `tool_calls` array from OpenAI-format responses. fn parse_tool_calls(response: &str) -> (String, Vec) { let mut text_parts = Vec::new(); let mut calls = Vec::new(); let mut remaining = response; // First, try to parse as OpenAI-style JSON response with tool_calls array // This handles providers like Minimax that return tool_calls in native JSON format if let Ok(json_value) = serde_json::from_str::(response.trim()) { calls = parse_tool_calls_from_json_value(&json_value); if !calls.is_empty() { // If we found tool_calls, extract any content field as text if let Some(content) = json_value.get("content").and_then(|v| v.as_str()) { if !content.trim().is_empty() { text_parts.push(content.trim().to_string()); } } return (text_parts.join("\n"), calls); } } // Fall back to XML-style tag parsing (ZeroClaw's original format) while let Some(start) = remaining.find("") { // Everything before the tag is text let before = &remaining[..start]; if !before.trim().is_empty() { text_parts.push(before.trim().to_string()); } if let Some(end) = remaining[start..].find("") { let inner = &remaining[start + 11..start + end]; let mut parsed_any = false; let json_values = extract_json_values(inner); for value in json_values { let parsed_calls = parse_tool_calls_from_json_value(&value); if !parsed_calls.is_empty() { parsed_any = 
true; calls.extend(parsed_calls); } } if !parsed_any { tracing::warn!("Malformed JSON: expected tool-call object in tag body"); } remaining = &remaining[start + end + 12..]; } else { break; } } if calls.is_empty() { for value in extract_json_values(response) { let parsed_calls = parse_tool_calls_from_json_value(&value); if !parsed_calls.is_empty() { calls.extend(parsed_calls); } } } // Remaining text after last tool call if !remaining.trim().is_empty() { text_parts.push(remaining.trim().to_string()); } (text_parts.join("\n"), calls) } #[derive(Debug)] struct ParsedToolCall { name: String, arguments: serde_json::Value, } /// Execute a single turn of the agent loop: send messages, parse tool calls, /// execute tools, and loop until the LLM produces a final text response. async fn agent_turn( provider: &dyn Provider, history: &mut Vec, tools_registry: &[Box], observer: &dyn Observer, model: &str, temperature: f64, ) -> Result { for _iteration in 0..MAX_TOOL_ITERATIONS { let response = provider .chat_with_history(history, model, temperature) .await?; let (text, tool_calls) = parse_tool_calls(&response); if tool_calls.is_empty() { // No tool calls — this is the final response history.push(ChatMessage::assistant(&response)); return Ok(if text.is_empty() { response } else { text }); } // Print any text the LLM produced alongside tool calls if !text.is_empty() { print!("{text}"); let _ = std::io::stdout().flush(); } // Execute each tool call and build results let mut tool_results = String::new(); for call in &tool_calls { let start = Instant::now(); let result = if let Some(tool) = find_tool(tools_registry, &call.name) { match tool.execute(call.arguments.clone()).await { Ok(r) => { observer.record_event(&ObserverEvent::ToolCall { tool: call.name.clone(), duration: start.elapsed(), success: r.success, }); if r.success { r.output } else { format!("Error: {}", r.error.unwrap_or_else(|| r.output)) } } Err(e) => { observer.record_event(&ObserverEvent::ToolCall { tool: 
call.name.clone(), duration: start.elapsed(), success: false, }); format!("Error executing {}: {e}", call.name) } } } else { format!("Unknown tool: {}", call.name) }; let _ = writeln!( tool_results, "\n{}\n", call.name, result ); } // Add assistant message with tool calls + tool results to history history.push(ChatMessage::assistant(&response)); history.push(ChatMessage::user(format!("[Tool results]\n{tool_results}"))); } anyhow::bail!("Agent exceeded maximum tool iterations ({MAX_TOOL_ITERATIONS})") } /// Build the tool instruction block for the system prompt so the LLM knows /// how to invoke tools. fn build_tool_instructions(tools_registry: &[Box]) -> String { let mut instructions = String::new(); instructions.push_str("\n## Tool Use Protocol\n\n"); instructions.push_str("To use a tool, wrap a JSON object in tags:\n\n"); instructions.push_str("```\n\n{\"name\": \"tool_name\", \"arguments\": {\"param\": \"value\"}}\n\n```\n\n"); instructions.push_str("You may use multiple tool calls in a single response. "); instructions.push_str("After tool execution, results appear in tags. 
"); instructions .push_str("Continue reasoning with the results until you can give a final answer.\n\n"); instructions.push_str("### Available Tools\n\n"); for tool in tools_registry { let _ = writeln!( instructions, "**{}**: {}\nParameters: `{}`\n", tool.name(), tool.description(), tool.parameters_schema() ); } instructions } #[allow(clippy::too_many_lines)] pub async fn run( config: Config, message: Option, provider_override: Option, model_override: Option, temperature: f64, ) -> Result<()> { // ── Wire up agnostic subsystems ────────────────────────────── let observer: Arc = Arc::from(observability::create_observer(&config.observability)); let runtime: Arc = Arc::from(runtime::create_runtime(&config.runtime)?); let security = Arc::new(SecurityPolicy::from_config( &config.autonomy, &config.workspace_dir, )); // ── Memory (the brain) ──────────────────────────────────────── let mem: Arc = Arc::from(memory::create_memory( &config.memory, &config.workspace_dir, config.api_key.as_deref(), )?); tracing::info!(backend = mem.name(), "Memory initialized"); // ── Tools (including memory tools) ──────────────────────────── let composio_key = if config.composio.enabled { config.composio.api_key.as_deref() } else { None }; let tools_registry = tools::all_tools_with_runtime( &security, runtime, mem.clone(), composio_key, &config.browser, &config.agents, config.api_key.as_deref(), ); // ── Resolve provider ───────────────────────────────────────── let provider_name = provider_override .as_deref() .or(config.default_provider.as_deref()) .unwrap_or("openrouter"); let model_name = model_override .as_deref() .or(config.default_model.as_deref()) .unwrap_or("anthropic/claude-sonnet-4-20250514"); let provider: Box = providers::create_routed_provider( provider_name, config.api_key.as_deref(), &config.reliability, &config.model_routes, model_name, )?; observer.record_event(&ObserverEvent::AgentStart { provider: provider_name.to_string(), model: model_name.to_string(), }); // ── Build 
system prompt from workspace MD files (OpenClaw framework) ── let skills = crate::skills::load_skills(&config.workspace_dir); let mut tool_descs: Vec<(&str, &str)> = vec![ ( "shell", "Execute terminal commands. Use when: running local checks, build/test commands, diagnostics. Don't use when: a safer dedicated tool exists, or command is destructive without approval.", ), ( "file_read", "Read file contents. Use when: inspecting project files, configs, logs. Don't use when: a targeted search is enough.", ), ( "file_write", "Write file contents. Use when: applying focused edits, scaffolding files, updating docs/code. Don't use when: side effects are unclear or file ownership is uncertain.", ), ( "memory_store", "Save to memory. Use when: preserving durable preferences, decisions, key context. Don't use when: information is transient/noisy/sensitive without need.", ), ( "memory_recall", "Search memory. Use when: retrieving prior decisions, user preferences, historical context. Don't use when: answer is already in current context.", ), ( "memory_forget", "Delete a memory entry. Use when: memory is incorrect/stale or explicitly requested for removal. Don't use when: impact is uncertain.", ), ]; tool_descs.push(( "screenshot", "Capture a screenshot of the current screen. Returns file path and base64-encoded PNG. Use when: visual verification, UI inspection, debugging displays.", )); tool_descs.push(( "image_info", "Read image file metadata (format, dimensions, size) and optionally base64-encode it. Use when: inspecting images, preparing visual data for analysis.", )); if config.browser.enabled { tool_descs.push(( "browser_open", "Open approved HTTPS URLs in Brave Browser (allowlist-only, no scraping)", )); } if config.composio.enabled { tool_descs.push(( "composio", "Execute actions on 1000+ apps via Composio (Gmail, Notion, GitHub, Slack, etc.). 
Use action='list' to discover, 'execute' to run, 'connect' to OAuth.", )); } if !config.agents.is_empty() { tool_descs.push(( "delegate", "Delegate a subtask to a specialized agent. Use when: a task benefits from a different model \ (e.g. fast summarization, deep reasoning, code generation). The sub-agent runs a single \ prompt and returns its response.", )); } let mut system_prompt = crate::channels::build_system_prompt( &config.workspace_dir, model_name, &tool_descs, &skills, Some(&config.identity), ); // Append structured tool-use instructions with schemas system_prompt.push_str(&build_tool_instructions(&tools_registry)); // ── Execute ────────────────────────────────────────────────── let start = Instant::now(); if let Some(msg) = message { // Auto-save user message to memory if config.memory.auto_save { let user_key = autosave_memory_key("user_msg"); let _ = mem .store(&user_key, &msg, MemoryCategory::Conversation) .await; } // Inject memory context into user message let context = build_context(mem.as_ref(), &msg).await; let enriched = if context.is_empty() { msg.clone() } else { format!("{context}{msg}") }; let mut history = vec![ ChatMessage::system(&system_prompt), ChatMessage::user(&enriched), ]; let response = agent_turn( provider.as_ref(), &mut history, &tools_registry, observer.as_ref(), model_name, temperature, ) .await?; println!("{response}"); // Auto-save assistant response to daily log if config.memory.auto_save { let summary = truncate_with_ellipsis(&response, 100); let response_key = autosave_memory_key("assistant_resp"); let _ = mem .store(&response_key, &summary, MemoryCategory::Daily) .await; } } else { println!("🦀 ZeroClaw Interactive Mode"); println!("Type /quit to exit.\n"); let (tx, mut rx) = tokio::sync::mpsc::channel(32); let cli = crate::channels::CliChannel::new(); // Spawn listener let listen_handle = tokio::spawn(async move { let _ = crate::channels::Channel::listen(&cli, tx).await; }); // Persistent conversation history across turns 
let mut history = vec![ChatMessage::system(&system_prompt)]; while let Some(msg) = rx.recv().await { // Auto-save conversation turns if config.memory.auto_save { let user_key = autosave_memory_key("user_msg"); let _ = mem .store(&user_key, &msg.content, MemoryCategory::Conversation) .await; } // Inject memory context into user message let context = build_context(mem.as_ref(), &msg.content).await; let enriched = if context.is_empty() { msg.content.clone() } else { format!("{context}{}", msg.content) }; history.push(ChatMessage::user(&enriched)); let response = match agent_turn( provider.as_ref(), &mut history, &tools_registry, observer.as_ref(), model_name, temperature, ) .await { Ok(resp) => resp, Err(e) => { eprintln!("\nError: {e}\n"); continue; } }; println!("\n{response}\n"); // Prevent unbounded history growth in long interactive sessions trim_history(&mut history); if config.memory.auto_save { let summary = truncate_with_ellipsis(&response, 100); let response_key = autosave_memory_key("assistant_resp"); let _ = mem .store(&response_key, &summary, MemoryCategory::Daily) .await; } } listen_handle.abort(); } let duration = start.elapsed(); observer.record_event(&ObserverEvent::AgentEnd { duration, tokens_used: None, }); Ok(()) } #[cfg(test)] mod tests { use super::*; use crate::memory::{Memory, MemoryCategory, SqliteMemory}; use tempfile::TempDir; #[test] fn parse_tool_calls_extracts_single_call() { let response = r#"Let me check that. 
{"name": "shell", "arguments": {"command": "ls -la"}} "#; let (text, calls) = parse_tool_calls(response); assert_eq!(text, "Let me check that."); assert_eq!(calls.len(), 1); assert_eq!(calls[0].name, "shell"); assert_eq!( calls[0].arguments.get("command").unwrap().as_str().unwrap(), "ls -la" ); } #[test] fn parse_tool_calls_extracts_multiple_calls() { let response = r#" {"name": "file_read", "arguments": {"path": "a.txt"}} {"name": "file_read", "arguments": {"path": "b.txt"}} "#; let (_, calls) = parse_tool_calls(response); assert_eq!(calls.len(), 2); assert_eq!(calls[0].name, "file_read"); assert_eq!(calls[1].name, "file_read"); } #[test] fn parse_tool_calls_returns_text_only_when_no_calls() { let response = "Just a normal response with no tools."; let (text, calls) = parse_tool_calls(response); assert_eq!(text, "Just a normal response with no tools."); assert!(calls.is_empty()); } #[test] fn parse_tool_calls_handles_malformed_json() { let response = r#" not valid json Some text after."#; let (text, calls) = parse_tool_calls(response); assert!(calls.is_empty()); assert!(text.contains("Some text after.")); } #[test] fn parse_tool_calls_text_before_and_after() { let response = r#"Before text. 
{"name": "shell", "arguments": {"command": "echo hi"}} After text."#; let (text, calls) = parse_tool_calls(response); assert!(text.contains("Before text.")); assert!(text.contains("After text.")); assert_eq!(calls.len(), 1); } #[test] fn parse_tool_calls_handles_openai_format() { // OpenAI-style response with tool_calls array let response = r#"{"content": "Let me check that for you.", "tool_calls": [{"type": "function", "function": {"name": "shell", "arguments": "{\"command\": \"ls -la\"}"}}]}"#; let (text, calls) = parse_tool_calls(response); assert_eq!(text, "Let me check that for you."); assert_eq!(calls.len(), 1); assert_eq!(calls[0].name, "shell"); assert_eq!( calls[0].arguments.get("command").unwrap().as_str().unwrap(), "ls -la" ); } #[test] fn parse_tool_calls_handles_openai_format_multiple_calls() { let response = r#"{"tool_calls": [{"type": "function", "function": {"name": "file_read", "arguments": "{\"path\": \"a.txt\"}"}}, {"type": "function", "function": {"name": "file_read", "arguments": "{\"path\": \"b.txt\"}"}}]}"#; let (_, calls) = parse_tool_calls(response); assert_eq!(calls.len(), 2); assert_eq!(calls[0].name, "file_read"); assert_eq!(calls[1].name, "file_read"); } #[test] fn parse_tool_calls_openai_format_without_content() { // Some providers don't include content field with tool_calls let response = r#"{"tool_calls": [{"type": "function", "function": {"name": "memory_recall", "arguments": "{}"}}]}"#; let (text, calls) = parse_tool_calls(response); assert!(text.is_empty()); // No content field assert_eq!(calls.len(), 1); assert_eq!(calls[0].name, "memory_recall"); } #[test] fn parse_tool_calls_handles_markdown_json_inside_tool_call_tag() { let response = r#" ```json {"name": "file_write", "arguments": {"path": "test.py", "content": "print('ok')"}} ``` "#; let (text, calls) = parse_tool_calls(response); assert!(text.is_empty()); assert_eq!(calls.len(), 1); assert_eq!(calls[0].name, "file_write"); assert_eq!( 
calls[0].arguments.get("path").unwrap().as_str().unwrap(), "test.py" ); } #[test] fn parse_tool_calls_handles_noisy_tool_call_tag_body() { let response = r#" I will now call the tool with this payload: {"name": "shell", "arguments": {"command": "pwd"}} "#; let (text, calls) = parse_tool_calls(response); assert!(text.is_empty()); assert_eq!(calls.len(), 1); assert_eq!(calls[0].name, "shell"); assert_eq!( calls[0].arguments.get("command").unwrap().as_str().unwrap(), "pwd" ); } #[test] fn parse_tool_calls_handles_raw_tool_json_without_tags() { let response = r#"Sure, creating the file now. {"name": "file_write", "arguments": {"path": "hello.py", "content": "print('hello')"}}"#; let (text, calls) = parse_tool_calls(response); assert!(text.contains("Sure, creating the file now.")); assert_eq!(calls.len(), 1); assert_eq!(calls[0].name, "file_write"); assert_eq!( calls[0].arguments.get("path").unwrap().as_str().unwrap(), "hello.py" ); } #[test] fn build_tool_instructions_includes_all_tools() { use crate::security::SecurityPolicy; let security = Arc::new(SecurityPolicy::from_config( &crate::config::AutonomyConfig::default(), std::path::Path::new("/tmp"), )); let tools = tools::default_tools(security); let instructions = build_tool_instructions(&tools); assert!(instructions.contains("## Tool Use Protocol")); assert!(instructions.contains("")); assert!(instructions.contains("shell")); assert!(instructions.contains("file_read")); assert!(instructions.contains("file_write")); } #[test] fn trim_history_preserves_system_prompt() { let mut history = vec![ChatMessage::system("system prompt")]; for i in 0..MAX_HISTORY_MESSAGES + 20 { history.push(ChatMessage::user(format!("msg {i}"))); } let original_len = history.len(); assert!(original_len > MAX_HISTORY_MESSAGES + 1); trim_history(&mut history); // System prompt preserved assert_eq!(history[0].role, "system"); assert_eq!(history[0].content, "system prompt"); // Trimmed to limit assert_eq!(history.len(), MAX_HISTORY_MESSAGES + 1); 
// +1 for system // Most recent messages preserved let last = &history[history.len() - 1]; assert_eq!(last.content, format!("msg {}", MAX_HISTORY_MESSAGES + 19)); } #[test] fn trim_history_noop_when_within_limit() { let mut history = vec![ ChatMessage::system("sys"), ChatMessage::user("hello"), ChatMessage::assistant("hi"), ]; trim_history(&mut history); assert_eq!(history.len(), 3); } #[test] fn autosave_memory_key_has_prefix_and_uniqueness() { let key1 = autosave_memory_key("user_msg"); let key2 = autosave_memory_key("user_msg"); assert!(key1.starts_with("user_msg_")); assert!(key2.starts_with("user_msg_")); assert_ne!(key1, key2); } #[tokio::test] async fn autosave_memory_keys_preserve_multiple_turns() { let tmp = TempDir::new().unwrap(); let mem = SqliteMemory::new(tmp.path()).unwrap(); let key1 = autosave_memory_key("user_msg"); let key2 = autosave_memory_key("user_msg"); mem.store(&key1, "I'm Paul", MemoryCategory::Conversation) .await .unwrap(); mem.store(&key2, "I'm 45", MemoryCategory::Conversation) .await .unwrap(); assert_eq!(mem.count().await.unwrap(), 2); let recalled = mem.recall("45", 5).await.unwrap(); assert!(recalled.iter().any(|entry| entry.content.contains("45"))); } }