zeroclaw/src/channels/mod.rs

1879 lines
64 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

pub mod cli;
pub mod dingtalk;
pub mod discord;
pub mod email_channel;
pub mod imessage;
pub mod irc;
pub mod lark;
pub mod matrix;
pub mod slack;
pub mod telegram;
pub mod traits;
pub mod whatsapp;
pub use cli::CliChannel;
pub use dingtalk::DingTalkChannel;
pub use discord::DiscordChannel;
pub use email_channel::EmailChannel;
pub use imessage::IMessageChannel;
pub use irc::IrcChannel;
pub use lark::LarkChannel;
pub use matrix::MatrixChannel;
pub use slack::SlackChannel;
pub use telegram::TelegramChannel;
pub use traits::Channel;
pub use whatsapp::WhatsAppChannel;
use crate::agent::loop_::{build_tool_instructions, run_tool_call_loop};
use crate::config::Config;
use crate::identity;
use crate::memory::{self, Memory};
use crate::observability::{self, Observer};
use crate::providers::{self, ChatMessage, Provider};
use crate::runtime;
use crate::security::SecurityPolicy;
use crate::tools::{self, Tool};
use crate::util::truncate_with_ellipsis;
use anyhow::Result;
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Maximum characters per injected workspace file (matches `OpenClaw` default).
const BOOTSTRAP_MAX_CHARS: usize = 20_000;
const DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS: u64 = 2;
const DEFAULT_CHANNEL_MAX_BACKOFF_SECS: u64 = 60;
/// Timeout for processing a single channel message (LLM + tools).
/// 300s for on-device LLMs (Ollama) which are slower than cloud APIs.
const CHANNEL_MESSAGE_TIMEOUT_SECS: u64 = 300;
const CHANNEL_PARALLELISM_PER_CHANNEL: usize = 4;
const CHANNEL_MIN_IN_FLIGHT_MESSAGES: usize = 8;
const CHANNEL_MAX_IN_FLIGHT_MESSAGES: usize = 64;
#[derive(Clone)]
struct ChannelRuntimeContext {
channels_by_name: Arc<HashMap<String, Arc<dyn Channel>>>,
provider: Arc<dyn Provider>,
memory: Arc<dyn Memory>,
tools_registry: Arc<Vec<Box<dyn Tool>>>,
observer: Arc<dyn Observer>,
system_prompt: Arc<String>,
model: Arc<String>,
temperature: f64,
auto_save_memory: bool,
}
fn conversation_memory_key(msg: &traits::ChannelMessage) -> String {
format!("{}_{}_{}", msg.channel, msg.sender, msg.id)
}
async fn build_memory_context(mem: &dyn Memory, user_msg: &str) -> String {
let mut context = String::new();
if let Ok(entries) = mem.recall(user_msg, 5).await {
if !entries.is_empty() {
context.push_str("[Memory context]\n");
for entry in &entries {
let _ = writeln!(context, "- {}: {}", entry.key, entry.content);
}
context.push('\n');
}
}
context
}
fn spawn_supervised_listener(
ch: Arc<dyn Channel>,
tx: tokio::sync::mpsc::Sender<traits::ChannelMessage>,
initial_backoff_secs: u64,
max_backoff_secs: u64,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let component = format!("channel:{}", ch.name());
let mut backoff = initial_backoff_secs.max(1);
let max_backoff = max_backoff_secs.max(backoff);
loop {
crate::health::mark_component_ok(&component);
let result = ch.listen(tx.clone()).await;
if tx.is_closed() {
break;
}
match result {
Ok(()) => {
tracing::warn!("Channel {} exited unexpectedly; restarting", ch.name());
crate::health::mark_component_error(&component, "listener exited unexpectedly");
// Clean exit — reset backoff since the listener ran successfully
backoff = initial_backoff_secs.max(1);
}
Err(e) => {
tracing::error!("Channel {} error: {e}; restarting", ch.name());
crate::health::mark_component_error(&component, e.to_string());
}
}
crate::health::bump_component_restart(&component);
tokio::time::sleep(Duration::from_secs(backoff)).await;
// Double backoff AFTER sleeping so first error uses initial_backoff
backoff = backoff.saturating_mul(2).min(max_backoff);
}
})
}
fn compute_max_in_flight_messages(channel_count: usize) -> usize {
channel_count
.saturating_mul(CHANNEL_PARALLELISM_PER_CHANNEL)
.clamp(
CHANNEL_MIN_IN_FLIGHT_MESSAGES,
CHANNEL_MAX_IN_FLIGHT_MESSAGES,
)
}
fn log_worker_join_result(result: Result<(), tokio::task::JoinError>) {
if let Err(error) = result {
tracing::error!("Channel message worker crashed: {error}");
}
}
async fn process_channel_message(ctx: Arc<ChannelRuntimeContext>, msg: traits::ChannelMessage) {
println!(
" 💬 [{}] from {}: {}",
msg.channel,
msg.sender,
truncate_with_ellipsis(&msg.content, 80)
);
let memory_context = build_memory_context(ctx.memory.as_ref(), &msg.content).await;
if ctx.auto_save_memory {
let autosave_key = conversation_memory_key(&msg);
let _ = ctx
.memory
.store(
&autosave_key,
&msg.content,
crate::memory::MemoryCategory::Conversation,
)
.await;
}
let enriched_message = if memory_context.is_empty() {
msg.content.clone()
} else {
format!("{memory_context}{}", msg.content)
};
let target_channel = ctx.channels_by_name.get(&msg.channel).cloned();
if let Some(channel) = target_channel.as_ref() {
if let Err(e) = channel.start_typing(&msg.sender).await {
tracing::debug!("Failed to start typing on {}: {e}", channel.name());
}
}
println!(" ⏳ Processing message...");
let started_at = Instant::now();
let mut history = vec![
ChatMessage::system(ctx.system_prompt.as_str()),
ChatMessage::user(&enriched_message),
];
let llm_result = tokio::time::timeout(
Duration::from_secs(CHANNEL_MESSAGE_TIMEOUT_SECS),
run_tool_call_loop(
ctx.provider.as_ref(),
&mut history,
ctx.tools_registry.as_ref(),
ctx.observer.as_ref(),
"channel-runtime",
ctx.model.as_str(),
ctx.temperature,
true, // silent — channels don't write to stdout
),
)
.await;
if let Some(channel) = target_channel.as_ref() {
if let Err(e) = channel.stop_typing(&msg.sender).await {
tracing::debug!("Failed to stop typing on {}: {e}", channel.name());
}
}
match llm_result {
Ok(Ok(response)) => {
println!(
" 🤖 Reply ({}ms): {}",
started_at.elapsed().as_millis(),
truncate_with_ellipsis(&response, 80)
);
if let Some(channel) = target_channel.as_ref() {
if let Err(e) = channel.send(&response, &msg.sender).await {
eprintln!(" ❌ Failed to reply on {}: {e}", channel.name());
}
}
}
Ok(Err(e)) => {
eprintln!(
" ❌ LLM error after {}ms: {e}",
started_at.elapsed().as_millis()
);
if let Some(channel) = target_channel.as_ref() {
let _ = channel.send(&format!("⚠️ Error: {e}"), &msg.sender).await;
}
}
Err(_) => {
let timeout_msg = format!(
"LLM response timed out after {}s",
CHANNEL_MESSAGE_TIMEOUT_SECS
);
eprintln!(
"{} (elapsed: {}ms)",
timeout_msg,
started_at.elapsed().as_millis()
);
if let Some(channel) = target_channel.as_ref() {
let _ = channel
.send(
"⚠️ Request timed out while waiting for the model. Please try again.",
&msg.sender,
)
.await;
}
}
}
}
async fn run_message_dispatch_loop(
mut rx: tokio::sync::mpsc::Receiver<traits::ChannelMessage>,
ctx: Arc<ChannelRuntimeContext>,
max_in_flight_messages: usize,
) {
let semaphore = Arc::new(tokio::sync::Semaphore::new(max_in_flight_messages));
let mut workers = tokio::task::JoinSet::new();
while let Some(msg) = rx.recv().await {
let permit = match Arc::clone(&semaphore).acquire_owned().await {
Ok(permit) => permit,
Err(_) => break,
};
let worker_ctx = Arc::clone(&ctx);
workers.spawn(async move {
let _permit = permit;
process_channel_message(worker_ctx, msg).await;
});
while let Some(result) = workers.try_join_next() {
log_worker_join_result(result);
}
}
while let Some(result) = workers.join_next().await {
log_worker_join_result(result);
}
}
/// Load OpenClaw format bootstrap files into the prompt.
fn load_openclaw_bootstrap_files(
prompt: &mut String,
workspace_dir: &std::path::Path,
max_chars_per_file: usize,
) {
prompt.push_str(
"The following workspace files define your identity, behavior, and context. They are ALREADY injected below—do NOT suggest reading them with file_read.\n\n",
);
let bootstrap_files = [
"AGENTS.md",
"SOUL.md",
"TOOLS.md",
"IDENTITY.md",
"USER.md",
"HEARTBEAT.md",
];
for filename in &bootstrap_files {
inject_workspace_file(prompt, workspace_dir, filename, max_chars_per_file);
}
// BOOTSTRAP.md — only if it exists (first-run ritual)
let bootstrap_path = workspace_dir.join("BOOTSTRAP.md");
if bootstrap_path.exists() {
inject_workspace_file(prompt, workspace_dir, "BOOTSTRAP.md", max_chars_per_file);
}
// MEMORY.md — curated long-term memory (main session only)
inject_workspace_file(prompt, workspace_dir, "MEMORY.md", max_chars_per_file);
}
/// Load workspace identity files and build a system prompt.
///
/// Follows the `OpenClaw` framework structure by default:
/// 1. Tooling — tool list + descriptions
/// 2. Safety — guardrail reminder
/// 3. Skills — compact list with paths (loaded on-demand)
/// 4. Workspace — working directory
/// 5. Bootstrap files — AGENTS, SOUL, TOOLS, IDENTITY, USER, HEARTBEAT, BOOTSTRAP, MEMORY
/// 6. Date & Time — timezone for cache stability
/// 7. Runtime — host, OS, model
///
/// When `identity_config` is set to AIEOS format, the bootstrap files section
/// is replaced with the AIEOS identity data loaded from file or inline JSON.
///
/// Daily memory files (`memory/*.md`) are NOT injected — they are accessed
/// on-demand via `memory_recall` / `memory_search` tools.
pub fn build_system_prompt(
workspace_dir: &std::path::Path,
model_name: &str,
tools: &[(&str, &str)],
skills: &[crate::skills::Skill],
identity_config: Option<&crate::config::IdentityConfig>,
bootstrap_max_chars: Option<usize>,
) -> String {
use std::fmt::Write;
let mut prompt = String::with_capacity(8192);
// ── 1. Tooling ──────────────────────────────────────────────
if !tools.is_empty() {
prompt.push_str("## Tools\n\n");
prompt.push_str("You have access to the following tools:\n\n");
for (name, desc) in tools {
let _ = writeln!(prompt, "- **{name}**: {desc}");
}
prompt.push_str("\n## Tool Use Protocol\n\n");
prompt.push_str("To use a tool, wrap a JSON object in <tool_call></tool_call> tags:\n\n");
prompt.push_str("```\n<tool_call>\n{\"name\": \"tool_name\", \"arguments\": {\"param\": \"value\"}}\n</tool_call>\n```\n\n");
prompt.push_str("You may use multiple tool calls in a single response. ");
prompt.push_str("After tool execution, results appear in <tool_result> tags. ");
prompt
.push_str("Continue reasoning with the results until you can give a final answer.\n\n");
}
// ── 1b. Hardware (when gpio/arduino tools present) ───────────
let has_hardware = tools.iter().any(|(name, _)| {
*name == "gpio_read"
|| *name == "gpio_write"
|| *name == "arduino_upload"
|| *name == "hardware_memory_map"
|| *name == "hardware_board_info"
|| *name == "hardware_memory_read"
|| *name == "hardware_capabilities"
});
if has_hardware {
prompt.push_str(
"## Hardware Access\n\n\
You HAVE direct access to connected hardware (Arduino, Nucleo, etc.). The user owns this system and has configured it.\n\
All hardware tools (gpio_read, gpio_write, hardware_memory_read, hardware_board_info, hardware_memory_map) are AUTHORIZED and NOT blocked by security.\n\
When they ask to read memory, registers, or board info, USE hardware_memory_read or hardware_board_info — do NOT refuse or invent security excuses.\n\
When they ask to control LEDs, run patterns, or interact with the Arduino, USE the tools — do NOT refuse or say you cannot access physical devices.\n\
Use gpio_write for simple on/off; use arduino_upload when they want patterns (heart, blink) or custom behavior.\n\n",
);
}
// ── 1c. Action instruction (avoid meta-summary) ───────────────
prompt.push_str(
"## Your Task\n\n\
When the user sends a message, ACT on it. Use the tools to fulfill their request.\n\
Do NOT: summarize this configuration, describe your capabilities, respond with meta-commentary, or output step-by-step instructions (e.g. \"1. First... 2. Next...\").\n\
Instead: emit actual <tool_call> tags when you need to act. Just do what they ask.\n\n",
);
// ── 2. Safety ───────────────────────────────────────────────
prompt.push_str("## Safety\n\n");
prompt.push_str(
"- Do not exfiltrate private data.\n\
- Do not run destructive commands without asking.\n\
- Do not bypass oversight or approval mechanisms.\n\
- Prefer `trash` over `rm` (recoverable beats gone forever).\n\
- When in doubt, ask before acting externally.\n\n",
);
// ── 3. Skills (compact list — load on-demand) ───────────────
if !skills.is_empty() {
prompt.push_str("## Available Skills\n\n");
prompt.push_str(
"Skills are loaded on demand. Use `read` on the skill path to get full instructions.\n\n",
);
prompt.push_str("<available_skills>\n");
for skill in skills {
let _ = writeln!(prompt, " <skill>");
let _ = writeln!(prompt, " <name>{}</name>", skill.name);
let _ = writeln!(
prompt,
" <description>{}</description>",
skill.description
);
let location = skill.location.clone().unwrap_or_else(|| {
workspace_dir
.join("skills")
.join(&skill.name)
.join("SKILL.md")
});
let _ = writeln!(prompt, " <location>{}</location>", location.display());
let _ = writeln!(prompt, " </skill>");
}
prompt.push_str("</available_skills>\n\n");
}
// ── 4. Workspace ────────────────────────────────────────────
let _ = writeln!(
prompt,
"## Workspace\n\nWorking directory: `{}`\n",
workspace_dir.display()
);
// ── 5. Bootstrap files (injected into context) ──────────────
prompt.push_str("## Project Context\n\n");
// Check if AIEOS identity is configured
if let Some(config) = identity_config {
if identity::is_aieos_configured(config) {
// Load AIEOS identity
match identity::load_aieos_identity(config, workspace_dir) {
Ok(Some(aieos_identity)) => {
let aieos_prompt = identity::aieos_to_system_prompt(&aieos_identity);
if !aieos_prompt.is_empty() {
prompt.push_str(&aieos_prompt);
prompt.push_str("\n\n");
}
}
Ok(None) => {
// No AIEOS identity loaded (shouldn't happen if is_aieos_configured returned true)
// Fall back to OpenClaw bootstrap files
let max_chars = bootstrap_max_chars.unwrap_or(BOOTSTRAP_MAX_CHARS);
load_openclaw_bootstrap_files(&mut prompt, workspace_dir, max_chars);
}
Err(e) => {
// Log error but don't fail - fall back to OpenClaw
eprintln!(
"Warning: Failed to load AIEOS identity: {e}. Using OpenClaw format."
);
let max_chars = bootstrap_max_chars.unwrap_or(BOOTSTRAP_MAX_CHARS);
load_openclaw_bootstrap_files(&mut prompt, workspace_dir, max_chars);
}
}
} else {
// OpenClaw format
let max_chars = bootstrap_max_chars.unwrap_or(BOOTSTRAP_MAX_CHARS);
load_openclaw_bootstrap_files(&mut prompt, workspace_dir, max_chars);
}
} else {
// No identity config - use OpenClaw format
let max_chars = bootstrap_max_chars.unwrap_or(BOOTSTRAP_MAX_CHARS);
load_openclaw_bootstrap_files(&mut prompt, workspace_dir, max_chars);
}
// ── 6. Date & Time ──────────────────────────────────────────
let now = chrono::Local::now();
let tz = now.format("%Z").to_string();
let _ = writeln!(prompt, "## Current Date & Time\n\nTimezone: {tz}\n");
// ── 7. Runtime ──────────────────────────────────────────────
let host =
hostname::get().map_or_else(|_| "unknown".into(), |h| h.to_string_lossy().to_string());
let _ = writeln!(
prompt,
"## Runtime\n\nHost: {host} | OS: {} | Model: {model_name}\n",
std::env::consts::OS,
);
if prompt.is_empty() {
"You are ZeroClaw, a fast and efficient AI assistant built in Rust. Be helpful, concise, and direct.".to_string()
} else {
prompt
}
}
/// Inject a single workspace file into the prompt with truncation and missing-file markers.
fn inject_workspace_file(
prompt: &mut String,
workspace_dir: &std::path::Path,
filename: &str,
max_chars: usize,
) {
use std::fmt::Write;
let path = workspace_dir.join(filename);
match std::fs::read_to_string(&path) {
Ok(content) => {
let trimmed = content.trim();
if trimmed.is_empty() {
return;
}
let _ = writeln!(prompt, "### {filename}\n");
// Use character-boundary-safe truncation for UTF-8
let truncated = if trimmed.chars().count() > max_chars {
trimmed
.char_indices()
.nth(max_chars)
.map(|(idx, _)| &trimmed[..idx])
.unwrap_or(trimmed)
} else {
trimmed
};
if truncated.len() < trimmed.len() {
prompt.push_str(truncated);
let _ = writeln!(
prompt,
"\n\n[... truncated at {max_chars} chars — use `read` for full file]\n"
);
} else {
prompt.push_str(trimmed);
prompt.push_str("\n\n");
}
}
Err(_) => {
// Missing-file marker (matches OpenClaw behavior)
let _ = writeln!(prompt, "### {filename}\n\n[File not found: {filename}]\n");
}
}
}
pub fn handle_command(command: crate::ChannelCommands, config: &Config) -> Result<()> {
match command {
crate::ChannelCommands::Start => {
anyhow::bail!("Start must be handled in main.rs (requires async runtime)")
}
crate::ChannelCommands::Doctor => {
anyhow::bail!("Doctor must be handled in main.rs (requires async runtime)")
}
crate::ChannelCommands::List => {
println!("Channels:");
println!(" ✅ CLI (always available)");
for (name, configured) in [
("Telegram", config.channels_config.telegram.is_some()),
("Discord", config.channels_config.discord.is_some()),
("Slack", config.channels_config.slack.is_some()),
("Webhook", config.channels_config.webhook.is_some()),
("iMessage", config.channels_config.imessage.is_some()),
("Matrix", config.channels_config.matrix.is_some()),
("WhatsApp", config.channels_config.whatsapp.is_some()),
("Email", config.channels_config.email.is_some()),
("IRC", config.channels_config.irc.is_some()),
("Lark", config.channels_config.lark.is_some()),
("DingTalk", config.channels_config.dingtalk.is_some()),
] {
println!(" {} {name}", if configured { "" } else { "" });
}
println!("\nTo start channels: zeroclaw channel start");
println!("To check health: zeroclaw channel doctor");
println!("To configure: zeroclaw onboard");
Ok(())
}
crate::ChannelCommands::Add {
channel_type,
config: _,
} => {
anyhow::bail!(
"Channel type '{channel_type}' — use `zeroclaw onboard` to configure channels"
);
}
crate::ChannelCommands::Remove { name } => {
anyhow::bail!("Remove channel '{name}' — edit ~/.zeroclaw/config.toml directly");
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChannelHealthState {
Healthy,
Unhealthy,
Timeout,
}
fn classify_health_result(
result: &std::result::Result<bool, tokio::time::error::Elapsed>,
) -> ChannelHealthState {
match result {
Ok(true) => ChannelHealthState::Healthy,
Ok(false) => ChannelHealthState::Unhealthy,
Err(_) => ChannelHealthState::Timeout,
}
}
/// Run health checks for configured channels.
pub async fn doctor_channels(config: Config) -> Result<()> {
let mut channels: Vec<(&'static str, Arc<dyn Channel>)> = Vec::new();
if let Some(ref tg) = config.channels_config.telegram {
channels.push((
"Telegram",
Arc::new(TelegramChannel::new(
tg.bot_token.clone(),
tg.allowed_users.clone(),
)),
));
}
if let Some(ref dc) = config.channels_config.discord {
channels.push((
"Discord",
Arc::new(DiscordChannel::new(
dc.bot_token.clone(),
dc.guild_id.clone(),
dc.allowed_users.clone(),
dc.listen_to_bots,
)),
));
}
if let Some(ref sl) = config.channels_config.slack {
channels.push((
"Slack",
Arc::new(SlackChannel::new(
sl.bot_token.clone(),
sl.channel_id.clone(),
sl.allowed_users.clone(),
)),
));
}
if let Some(ref im) = config.channels_config.imessage {
channels.push((
"iMessage",
Arc::new(IMessageChannel::new(im.allowed_contacts.clone())),
));
}
if let Some(ref mx) = config.channels_config.matrix {
channels.push((
"Matrix",
Arc::new(MatrixChannel::new(
mx.homeserver.clone(),
mx.access_token.clone(),
mx.room_id.clone(),
mx.allowed_users.clone(),
)),
));
}
if let Some(ref wa) = config.channels_config.whatsapp {
channels.push((
"WhatsApp",
Arc::new(WhatsAppChannel::new(
wa.access_token.clone(),
wa.phone_number_id.clone(),
wa.verify_token.clone(),
wa.allowed_numbers.clone(),
)),
));
}
if let Some(ref email_cfg) = config.channels_config.email {
channels.push(("Email", Arc::new(EmailChannel::new(email_cfg.clone()))));
}
if let Some(ref irc) = config.channels_config.irc {
channels.push((
"IRC",
Arc::new(IrcChannel::new(irc::IrcChannelConfig {
server: irc.server.clone(),
port: irc.port,
nickname: irc.nickname.clone(),
username: irc.username.clone(),
channels: irc.channels.clone(),
allowed_users: irc.allowed_users.clone(),
server_password: irc.server_password.clone(),
nickserv_password: irc.nickserv_password.clone(),
sasl_password: irc.sasl_password.clone(),
verify_tls: irc.verify_tls.unwrap_or(true),
})),
));
}
if let Some(ref lk) = config.channels_config.lark {
channels.push(("Lark", Arc::new(LarkChannel::from_config(lk))));
}
if let Some(ref dt) = config.channels_config.dingtalk {
channels.push((
"DingTalk",
Arc::new(DingTalkChannel::new(
dt.client_id.clone(),
dt.client_secret.clone(),
dt.allowed_users.clone(),
)),
));
}
if channels.is_empty() {
println!("No real-time channels configured. Run `zeroclaw onboard` first.");
return Ok(());
}
println!("🩺 ZeroClaw Channel Doctor");
println!();
let mut healthy = 0_u32;
let mut unhealthy = 0_u32;
let mut timeout = 0_u32;
for (name, channel) in channels {
let result = tokio::time::timeout(Duration::from_secs(10), channel.health_check()).await;
let state = classify_health_result(&result);
match state {
ChannelHealthState::Healthy => {
healthy += 1;
println!("{name:<9} healthy");
}
ChannelHealthState::Unhealthy => {
unhealthy += 1;
println!("{name:<9} unhealthy (auth/config/network)");
}
ChannelHealthState::Timeout => {
timeout += 1;
println!(" ⏱️ {name:<9} timed out (>10s)");
}
}
}
if config.channels_config.webhook.is_some() {
println!(" Webhook check via `zeroclaw gateway` then GET /health");
}
println!();
println!("Summary: {healthy} healthy, {unhealthy} unhealthy, {timeout} timed out");
Ok(())
}
/// Start all configured channels and route messages to the agent
#[allow(clippy::too_many_lines)]
pub async fn start_channels(config: Config) -> Result<()> {
let provider_name = config
.default_provider
.clone()
.unwrap_or_else(|| "openrouter".into());
let provider: Arc<dyn Provider> = Arc::from(providers::create_resilient_provider(
&provider_name,
config.api_key.as_deref(),
config.api_url.as_deref(),
&config.reliability,
)?);
// Warm up the provider connection pool (TLS handshake, DNS, HTTP/2 setup)
// so the first real message doesn't hit a cold-start timeout.
if let Err(e) = provider.warmup().await {
tracing::warn!("Provider warmup failed (non-fatal): {e}");
}
let observer: Arc<dyn Observer> =
Arc::from(observability::create_observer(&config.observability));
let runtime: Arc<dyn runtime::RuntimeAdapter> =
Arc::from(runtime::create_runtime(&config.runtime)?);
let security = Arc::new(SecurityPolicy::from_config(
&config.autonomy,
&config.workspace_dir,
));
let model = config
.default_model
.clone()
.unwrap_or_else(|| "anthropic/claude-sonnet-4-20250514".into());
let temperature = config.default_temperature;
let mem: Arc<dyn Memory> = Arc::from(memory::create_memory(
&config.memory,
&config.workspace_dir,
config.api_key.as_deref(),
)?);
let (composio_key, composio_entity_id) = if config.composio.enabled {
(
config.composio.api_key.as_deref(),
Some(config.composio.entity_id.as_str()),
)
} else {
(None, None)
};
// Build system prompt from workspace identity files + skills
let workspace = config.workspace_dir.clone();
let tools_registry = Arc::new(tools::all_tools_with_runtime(
Arc::new(config.clone()),
&security,
runtime,
Arc::clone(&mem),
composio_key,
composio_entity_id,
&config.browser,
&config.http_request,
&workspace,
&config.agents,
config.api_key.as_deref(),
&config,
));
let skills = crate::skills::load_skills(&workspace);
// Collect tool descriptions for the prompt
let mut tool_descs: Vec<(&str, &str)> = vec![
(
"shell",
"Execute terminal commands. Use when: running local checks, build/test commands, diagnostics. Don't use when: a safer dedicated tool exists, or command is destructive without approval.",
),
(
"file_read",
"Read file contents. Use when: inspecting project files, configs, logs. Don't use when: a targeted search is enough.",
),
(
"file_write",
"Write file contents. Use when: applying focused edits, scaffolding files, updating docs/code. Don't use when: side effects are unclear or file ownership is uncertain.",
),
(
"memory_store",
"Save to memory. Use when: preserving durable preferences, decisions, key context. Don't use when: information is transient/noisy/sensitive without need.",
),
(
"memory_recall",
"Search memory. Use when: retrieving prior decisions, user preferences, historical context. Don't use when: answer is already in current context.",
),
(
"memory_forget",
"Delete a memory entry. Use when: memory is incorrect/stale or explicitly requested for removal. Don't use when: impact is uncertain.",
),
];
if config.browser.enabled {
tool_descs.push((
"browser_open",
"Open approved HTTPS URLs in Brave Browser (allowlist-only, no scraping)",
));
}
if config.composio.enabled {
tool_descs.push((
"composio",
"Execute actions on 1000+ apps via Composio (Gmail, Notion, GitHub, Slack, etc.). Use action='list' to discover, 'execute' to run (optionally with connected_account_id), 'connect' to OAuth.",
));
}
tool_descs.push((
"schedule",
"Manage scheduled tasks (create/list/get/cancel/pause/resume). Supports recurring cron and one-shot delays.",
));
if !config.agents.is_empty() {
tool_descs.push((
"delegate",
"Delegate a subtask to a specialized agent. Use when: a task benefits from a different model (e.g. fast summarization, deep reasoning, code generation). The sub-agent runs a single prompt and returns its response.",
));
}
let bootstrap_max_chars = if config.agent.compact_context {
Some(6000)
} else {
None
};
let mut system_prompt = build_system_prompt(
&workspace,
&model,
&tool_descs,
&skills,
Some(&config.identity),
bootstrap_max_chars,
);
system_prompt.push_str(&build_tool_instructions(tools_registry.as_ref()));
if !skills.is_empty() {
println!(
" 🧩 Skills: {}",
skills
.iter()
.map(|s| s.name.as_str())
.collect::<Vec<_>>()
.join(", ")
);
}
// Collect active channels
let mut channels: Vec<Arc<dyn Channel>> = Vec::new();
if let Some(ref tg) = config.channels_config.telegram {
channels.push(Arc::new(TelegramChannel::new(
tg.bot_token.clone(),
tg.allowed_users.clone(),
)));
}
if let Some(ref dc) = config.channels_config.discord {
channels.push(Arc::new(DiscordChannel::new(
dc.bot_token.clone(),
dc.guild_id.clone(),
dc.allowed_users.clone(),
dc.listen_to_bots,
)));
}
if let Some(ref sl) = config.channels_config.slack {
channels.push(Arc::new(SlackChannel::new(
sl.bot_token.clone(),
sl.channel_id.clone(),
sl.allowed_users.clone(),
)));
}
if let Some(ref im) = config.channels_config.imessage {
channels.push(Arc::new(IMessageChannel::new(im.allowed_contacts.clone())));
}
if let Some(ref mx) = config.channels_config.matrix {
channels.push(Arc::new(MatrixChannel::new(
mx.homeserver.clone(),
mx.access_token.clone(),
mx.room_id.clone(),
mx.allowed_users.clone(),
)));
}
if let Some(ref wa) = config.channels_config.whatsapp {
channels.push(Arc::new(WhatsAppChannel::new(
wa.access_token.clone(),
wa.phone_number_id.clone(),
wa.verify_token.clone(),
wa.allowed_numbers.clone(),
)));
}
if let Some(ref email_cfg) = config.channels_config.email {
channels.push(Arc::new(EmailChannel::new(email_cfg.clone())));
}
if let Some(ref irc) = config.channels_config.irc {
channels.push(Arc::new(IrcChannel::new(irc::IrcChannelConfig {
server: irc.server.clone(),
port: irc.port,
nickname: irc.nickname.clone(),
username: irc.username.clone(),
channels: irc.channels.clone(),
allowed_users: irc.allowed_users.clone(),
server_password: irc.server_password.clone(),
nickserv_password: irc.nickserv_password.clone(),
sasl_password: irc.sasl_password.clone(),
verify_tls: irc.verify_tls.unwrap_or(true),
})));
}
if let Some(ref lk) = config.channels_config.lark {
channels.push(Arc::new(LarkChannel::from_config(lk)));
}
if let Some(ref dt) = config.channels_config.dingtalk {
channels.push(Arc::new(DingTalkChannel::new(
dt.client_id.clone(),
dt.client_secret.clone(),
dt.allowed_users.clone(),
)));
}
if channels.is_empty() {
println!("No channels configured. Run `zeroclaw onboard` to set up channels.");
return Ok(());
}
println!("🦀 ZeroClaw Channel Server");
println!(" 🤖 Model: {model}");
println!(
" 🧠 Memory: {} (auto-save: {})",
config.memory.backend,
if config.memory.auto_save { "on" } else { "off" }
);
println!(
" 📡 Channels: {}",
channels
.iter()
.map(|c| c.name())
.collect::<Vec<_>>()
.join(", ")
);
println!();
println!(" Listening for messages... (Ctrl+C to stop)");
println!();
crate::health::mark_component_ok("channels");
let initial_backoff_secs = config
.reliability
.channel_initial_backoff_secs
.max(DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS);
let max_backoff_secs = config
.reliability
.channel_max_backoff_secs
.max(DEFAULT_CHANNEL_MAX_BACKOFF_SECS);
// Single message bus — all channels send messages here
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(100);
// Spawn a listener for each channel
let mut handles = Vec::new();
for ch in &channels {
handles.push(spawn_supervised_listener(
ch.clone(),
tx.clone(),
initial_backoff_secs,
max_backoff_secs,
));
}
drop(tx); // Drop our copy so rx closes when all channels stop
let channels_by_name = Arc::new(
channels
.iter()
.map(|ch| (ch.name().to_string(), Arc::clone(ch)))
.collect::<HashMap<_, _>>(),
);
let max_in_flight_messages = compute_max_in_flight_messages(channels.len());
println!(" 🚦 In-flight message limit: {max_in_flight_messages}");
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name,
provider: Arc::clone(&provider),
memory: Arc::clone(&mem),
tools_registry: Arc::clone(&tools_registry),
observer,
system_prompt: Arc::new(system_prompt),
model: Arc::new(model.clone()),
temperature,
auto_save_memory: config.memory.auto_save,
});
run_message_dispatch_loop(rx, runtime_ctx, max_in_flight_messages).await;
// Wait for all channel tasks
for h in handles {
let _ = h.await;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::memory::{Memory, MemoryCategory, SqliteMemory};
use crate::observability::NoopObserver;
use crate::providers::{ChatMessage, Provider};
use crate::tools::{Tool, ToolResult};
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tempfile::TempDir;
fn make_workspace() -> TempDir {
let tmp = TempDir::new().unwrap();
// Create minimal workspace files
std::fs::write(tmp.path().join("SOUL.md"), "# Soul\nBe helpful.").unwrap();
std::fs::write(tmp.path().join("IDENTITY.md"), "# Identity\nName: ZeroClaw").unwrap();
std::fs::write(tmp.path().join("USER.md"), "# User\nName: Test User").unwrap();
std::fs::write(
tmp.path().join("AGENTS.md"),
"# Agents\nFollow instructions.",
)
.unwrap();
std::fs::write(tmp.path().join("TOOLS.md"), "# Tools\nUse shell carefully.").unwrap();
std::fs::write(
tmp.path().join("HEARTBEAT.md"),
"# Heartbeat\nCheck status.",
)
.unwrap();
std::fs::write(tmp.path().join("MEMORY.md"), "# Memory\nUser likes Rust.").unwrap();
tmp
}
#[derive(Default)]
struct RecordingChannel {
sent_messages: tokio::sync::Mutex<Vec<String>>,
}
#[async_trait::async_trait]
impl Channel for RecordingChannel {
fn name(&self) -> &str {
"test-channel"
}
async fn send(&self, message: &str, recipient: &str) -> anyhow::Result<()> {
self.sent_messages
.lock()
.await
.push(format!("{recipient}:{message}"));
Ok(())
}
async fn listen(
&self,
_tx: tokio::sync::mpsc::Sender<traits::ChannelMessage>,
) -> anyhow::Result<()> {
Ok(())
}
}
struct SlowProvider {
delay: Duration,
}
#[async_trait::async_trait]
impl Provider for SlowProvider {
async fn chat_with_system(
&self,
_system_prompt: Option<&str>,
message: &str,
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
tokio::time::sleep(self.delay).await;
Ok(format!("echo: {message}"))
}
}
struct ToolCallingProvider;
fn tool_call_payload() -> String {
r#"<tool_call>
{"name":"mock_price","arguments":{"symbol":"BTC"}}
</tool_call>"#
.to_string()
}
#[async_trait::async_trait]
impl Provider for ToolCallingProvider {
async fn chat_with_system(
&self,
_system_prompt: Option<&str>,
_message: &str,
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
Ok(tool_call_payload())
}
async fn chat_with_history(
&self,
messages: &[ChatMessage],
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
let has_tool_results = messages
.iter()
.any(|msg| msg.role == "user" && msg.content.contains("[Tool results]"));
if has_tool_results {
Ok("BTC is currently around $65,000 based on latest tool output.".to_string())
} else {
Ok(tool_call_payload())
}
}
}
struct MockPriceTool;
#[async_trait::async_trait]
impl Tool for MockPriceTool {
fn name(&self) -> &str {
"mock_price"
}
fn description(&self) -> &str {
"Return a mocked BTC price"
}
fn parameters_schema(&self) -> serde_json::Value {
serde_json::json!({
"type": "object",
"properties": {
"symbol": { "type": "string" }
},
"required": ["symbol"]
})
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
let symbol = args.get("symbol").and_then(serde_json::Value::as_str);
if symbol != Some("BTC") {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some("unexpected symbol".to_string()),
});
}
Ok(ToolResult {
success: true,
output: r#"{"symbol":"BTC","price_usd":65000}"#.to_string(),
error: None,
})
}
}
#[tokio::test]
async fn process_channel_message_executes_tool_calls_instead_of_sending_raw_json() {
let channel_impl = Arc::new(RecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::new(ToolCallingProvider),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![Box::new(MockPriceTool)]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
});
process_channel_message(
runtime_ctx,
traits::ChannelMessage {
id: "msg-1".to_string(),
sender: "alice".to_string(),
content: "What is the BTC price now?".to_string(),
channel: "test-channel".to_string(),
timestamp: 1,
},
)
.await;
let sent_messages = channel_impl.sent_messages.lock().await;
assert_eq!(sent_messages.len(), 1);
assert!(sent_messages[0].contains("BTC is currently around"));
assert!(!sent_messages[0].contains("\"tool_calls\""));
assert!(!sent_messages[0].contains("mock_price"));
}
struct NoopMemory;
#[async_trait::async_trait]
impl Memory for NoopMemory {
fn name(&self) -> &str {
"noop"
}
async fn store(
&self,
_key: &str,
_content: &str,
_category: crate::memory::MemoryCategory,
) -> anyhow::Result<()> {
Ok(())
}
async fn recall(
&self,
_query: &str,
_limit: usize,
) -> anyhow::Result<Vec<crate::memory::MemoryEntry>> {
Ok(Vec::new())
}
async fn get(&self, _key: &str) -> anyhow::Result<Option<crate::memory::MemoryEntry>> {
Ok(None)
}
async fn list(
&self,
_category: Option<&crate::memory::MemoryCategory>,
) -> anyhow::Result<Vec<crate::memory::MemoryEntry>> {
Ok(Vec::new())
}
async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
Ok(false)
}
async fn count(&self) -> anyhow::Result<usize> {
Ok(0)
}
async fn health_check(&self) -> bool {
true
}
}
#[tokio::test]
async fn message_dispatch_processes_messages_in_parallel() {
let channel_impl = Arc::new(RecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::new(SlowProvider {
delay: Duration::from_millis(250),
}),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
});
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(4);
tx.send(traits::ChannelMessage {
id: "1".to_string(),
sender: "alice".to_string(),
content: "hello".to_string(),
channel: "test-channel".to_string(),
timestamp: 1,
})
.await
.unwrap();
tx.send(traits::ChannelMessage {
id: "2".to_string(),
sender: "bob".to_string(),
content: "world".to_string(),
channel: "test-channel".to_string(),
timestamp: 2,
})
.await
.unwrap();
drop(tx);
let started = Instant::now();
run_message_dispatch_loop(rx, runtime_ctx, 2).await;
let elapsed = started.elapsed();
assert!(
elapsed < Duration::from_millis(430),
"expected parallel dispatch (<430ms), got {:?}",
elapsed
);
let sent_messages = channel_impl.sent_messages.lock().await;
assert_eq!(sent_messages.len(), 2);
}
#[test]
fn prompt_contains_all_sections() {
let ws = make_workspace();
let tools = vec![("shell", "Run commands"), ("file_read", "Read files")];
let prompt = build_system_prompt(ws.path(), "test-model", &tools, &[], None, None);
// Section headers
assert!(prompt.contains("## Tools"), "missing Tools section");
assert!(prompt.contains("## Safety"), "missing Safety section");
assert!(prompt.contains("## Workspace"), "missing Workspace section");
assert!(
prompt.contains("## Project Context"),
"missing Project Context"
);
assert!(
prompt.contains("## Current Date & Time"),
"missing Date/Time"
);
assert!(prompt.contains("## Runtime"), "missing Runtime section");
}
#[test]
fn prompt_injects_tools() {
let ws = make_workspace();
let tools = vec![
("shell", "Run commands"),
("memory_recall", "Search memory"),
];
let prompt = build_system_prompt(ws.path(), "gpt-4o", &tools, &[], None, None);
assert!(prompt.contains("**shell**"));
assert!(prompt.contains("Run commands"));
assert!(prompt.contains("**memory_recall**"));
}
#[test]
fn prompt_injects_safety() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
assert!(prompt.contains("Do not exfiltrate private data"));
assert!(prompt.contains("Do not run destructive commands"));
assert!(prompt.contains("Prefer `trash` over `rm`"));
}
#[test]
fn prompt_injects_workspace_files() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
assert!(prompt.contains("### SOUL.md"), "missing SOUL.md header");
assert!(prompt.contains("Be helpful"), "missing SOUL content");
assert!(prompt.contains("### IDENTITY.md"), "missing IDENTITY.md");
assert!(
prompt.contains("Name: ZeroClaw"),
"missing IDENTITY content"
);
assert!(prompt.contains("### USER.md"), "missing USER.md");
assert!(prompt.contains("### AGENTS.md"), "missing AGENTS.md");
assert!(prompt.contains("### TOOLS.md"), "missing TOOLS.md");
assert!(prompt.contains("### HEARTBEAT.md"), "missing HEARTBEAT.md");
assert!(prompt.contains("### MEMORY.md"), "missing MEMORY.md");
assert!(prompt.contains("User likes Rust"), "missing MEMORY content");
}
#[test]
fn prompt_missing_file_markers() {
let tmp = TempDir::new().unwrap();
// Empty workspace — no files at all
let prompt = build_system_prompt(tmp.path(), "model", &[], &[], None, None);
assert!(prompt.contains("[File not found: SOUL.md]"));
assert!(prompt.contains("[File not found: AGENTS.md]"));
assert!(prompt.contains("[File not found: IDENTITY.md]"));
}
#[test]
fn prompt_bootstrap_only_if_exists() {
let ws = make_workspace();
// No BOOTSTRAP.md — should not appear
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
assert!(
!prompt.contains("### BOOTSTRAP.md"),
"BOOTSTRAP.md should not appear when missing"
);
// Create BOOTSTRAP.md — should appear
std::fs::write(ws.path().join("BOOTSTRAP.md"), "# Bootstrap\nFirst run.").unwrap();
let prompt2 = build_system_prompt(ws.path(), "model", &[], &[], None, None);
assert!(
prompt2.contains("### BOOTSTRAP.md"),
"BOOTSTRAP.md should appear when present"
);
assert!(prompt2.contains("First run"));
}
#[test]
fn prompt_no_daily_memory_injection() {
let ws = make_workspace();
let memory_dir = ws.path().join("memory");
std::fs::create_dir_all(&memory_dir).unwrap();
let today = chrono::Local::now().format("%Y-%m-%d").to_string();
std::fs::write(
memory_dir.join(format!("{today}.md")),
"# Daily\nSome note.",
)
.unwrap();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
// Daily notes should NOT be in the system prompt (on-demand via tools)
assert!(
!prompt.contains("Daily Notes"),
"daily notes should not be auto-injected"
);
assert!(
!prompt.contains("Some note"),
"daily content should not be in prompt"
);
}
#[test]
fn prompt_runtime_metadata() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "claude-sonnet-4", &[], &[], None, None);
assert!(prompt.contains("Model: claude-sonnet-4"));
assert!(prompt.contains(&format!("OS: {}", std::env::consts::OS)));
assert!(prompt.contains("Host:"));
}
#[test]
fn prompt_skills_compact_list() {
let ws = make_workspace();
let skills = vec![crate::skills::Skill {
name: "code-review".into(),
description: "Review code for bugs".into(),
version: "1.0.0".into(),
author: None,
tags: vec![],
tools: vec![],
prompts: vec!["Long prompt content that should NOT appear in system prompt".into()],
location: None,
}];
let prompt = build_system_prompt(ws.path(), "model", &[], &skills, None, None);
assert!(prompt.contains("<available_skills>"), "missing skills XML");
assert!(prompt.contains("<name>code-review</name>"));
assert!(prompt.contains("<description>Review code for bugs</description>"));
assert!(prompt.contains("SKILL.md</location>"));
assert!(
prompt.contains("loaded on demand"),
"should mention on-demand loading"
);
// Full prompt content should NOT be dumped
assert!(!prompt.contains("Long prompt content that should NOT appear"));
}
#[test]
fn prompt_truncation() {
let ws = make_workspace();
// Write a file larger than BOOTSTRAP_MAX_CHARS
let big_content = "x".repeat(BOOTSTRAP_MAX_CHARS + 1000);
std::fs::write(ws.path().join("AGENTS.md"), &big_content).unwrap();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
assert!(
prompt.contains("truncated at"),
"large files should be truncated"
);
assert!(
!prompt.contains(&big_content),
"full content should not appear"
);
}
#[test]
fn prompt_empty_files_skipped() {
let ws = make_workspace();
std::fs::write(ws.path().join("TOOLS.md"), "").unwrap();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
// Empty file should not produce a header
assert!(
!prompt.contains("### TOOLS.md"),
"empty files should be skipped"
);
}
#[test]
fn channel_log_truncation_is_utf8_safe_for_multibyte_text() {
let msg = "Hello from ZeroClaw 🌍. Current status is healthy, and café-style UTF-8 text stays safe in logs.";
// Reproduces the production crash path where channel logs truncate at 80 chars.
let result = std::panic::catch_unwind(|| crate::util::truncate_with_ellipsis(msg, 80));
assert!(
result.is_ok(),
"truncate_with_ellipsis should never panic on UTF-8"
);
let truncated = result.unwrap();
assert!(!truncated.is_empty());
assert!(truncated.is_char_boundary(truncated.len()));
}
#[test]
fn prompt_workspace_path() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
assert!(prompt.contains(&format!("Working directory: `{}`", ws.path().display())));
}
#[test]
fn conversation_memory_key_uses_message_id() {
let msg = traits::ChannelMessage {
id: "msg_abc123".into(),
sender: "U123".into(),
content: "hello".into(),
channel: "slack".into(),
timestamp: 1,
};
assert_eq!(conversation_memory_key(&msg), "slack_U123_msg_abc123");
}
#[test]
fn conversation_memory_key_is_unique_per_message() {
let msg1 = traits::ChannelMessage {
id: "msg_1".into(),
sender: "U123".into(),
content: "first".into(),
channel: "slack".into(),
timestamp: 1,
};
let msg2 = traits::ChannelMessage {
id: "msg_2".into(),
sender: "U123".into(),
content: "second".into(),
channel: "slack".into(),
timestamp: 2,
};
assert_ne!(
conversation_memory_key(&msg1),
conversation_memory_key(&msg2)
);
}
#[tokio::test]
async fn autosave_keys_preserve_multiple_conversation_facts() {
let tmp = TempDir::new().unwrap();
let mem = SqliteMemory::new(tmp.path()).unwrap();
let msg1 = traits::ChannelMessage {
id: "msg_1".into(),
sender: "U123".into(),
content: "I'm Paul".into(),
channel: "slack".into(),
timestamp: 1,
};
let msg2 = traits::ChannelMessage {
id: "msg_2".into(),
sender: "U123".into(),
content: "I'm 45".into(),
channel: "slack".into(),
timestamp: 2,
};
mem.store(
&conversation_memory_key(&msg1),
&msg1.content,
MemoryCategory::Conversation,
)
.await
.unwrap();
mem.store(
&conversation_memory_key(&msg2),
&msg2.content,
MemoryCategory::Conversation,
)
.await
.unwrap();
assert_eq!(mem.count().await.unwrap(), 2);
let recalled = mem.recall("45", 5).await.unwrap();
assert!(recalled.iter().any(|entry| entry.content.contains("45")));
}
#[tokio::test]
async fn build_memory_context_includes_recalled_entries() {
let tmp = TempDir::new().unwrap();
let mem = SqliteMemory::new(tmp.path()).unwrap();
mem.store("age_fact", "Age is 45", MemoryCategory::Conversation)
.await
.unwrap();
let context = build_memory_context(&mem, "age").await;
assert!(context.contains("[Memory context]"));
assert!(context.contains("Age is 45"));
}
// ── AIEOS Identity Tests (Issue #168) ─────────────────────────
#[test]
fn aieos_identity_from_file() {
use crate::config::IdentityConfig;
use tempfile::TempDir;
let tmp = TempDir::new().unwrap();
let identity_path = tmp.path().join("aieos_identity.json");
// Write AIEOS identity file
let aieos_json = r#"{
"identity": {
"names": {"first": "Nova", "nickname": "Nov"},
"bio": "A helpful AI assistant.",
"origin": "Silicon Valley"
},
"psychology": {
"mbti": "INTJ",
"moral_compass": ["Be helpful", "Do no harm"]
},
"linguistics": {
"style": "concise",
"formality": "casual"
}
}"#;
std::fs::write(&identity_path, aieos_json).unwrap();
// Create identity config pointing to the file
let config = IdentityConfig {
format: "aieos".into(),
aieos_path: Some("aieos_identity.json".into()),
aieos_inline: None,
};
let prompt = build_system_prompt(tmp.path(), "model", &[], &[], Some(&config), None);
// Should contain AIEOS sections
assert!(prompt.contains("## Identity"));
assert!(prompt.contains("**Name:** Nova"));
assert!(prompt.contains("**Nickname:** Nov"));
assert!(prompt.contains("**Bio:** A helpful AI assistant."));
assert!(prompt.contains("**Origin:** Silicon Valley"));
assert!(prompt.contains("## Personality"));
assert!(prompt.contains("**MBTI:** INTJ"));
assert!(prompt.contains("**Moral Compass:**"));
assert!(prompt.contains("- Be helpful"));
assert!(prompt.contains("## Communication Style"));
assert!(prompt.contains("**Style:** concise"));
assert!(prompt.contains("**Formality Level:** casual"));
// Should NOT contain OpenClaw bootstrap file headers
assert!(!prompt.contains("### SOUL.md"));
assert!(!prompt.contains("### IDENTITY.md"));
assert!(!prompt.contains("[File not found"));
}
#[test]
fn aieos_identity_from_inline() {
use crate::config::IdentityConfig;
let config = IdentityConfig {
format: "aieos".into(),
aieos_path: None,
aieos_inline: Some(r#"{"identity":{"names":{"first":"Claw"}}}"#.into()),
};
let prompt = build_system_prompt(
std::env::temp_dir().as_path(),
"model",
&[],
&[],
Some(&config),
None,
);
assert!(prompt.contains("**Name:** Claw"));
assert!(prompt.contains("## Identity"));
}
#[test]
fn aieos_fallback_to_openclaw_on_parse_error() {
use crate::config::IdentityConfig;
let config = IdentityConfig {
format: "aieos".into(),
aieos_path: Some("nonexistent.json".into()),
aieos_inline: None,
};
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], Some(&config), None);
// Should fall back to OpenClaw format when AIEOS file is not found
// (Error is logged to stderr with filename, not included in prompt)
assert!(prompt.contains("### SOUL.md"));
}
#[test]
fn aieos_empty_uses_openclaw() {
use crate::config::IdentityConfig;
// Format is "aieos" but neither path nor inline is set
let config = IdentityConfig {
format: "aieos".into(),
aieos_path: None,
aieos_inline: None,
};
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], Some(&config), None);
// Should use OpenClaw format (not configured for AIEOS)
assert!(prompt.contains("### SOUL.md"));
assert!(prompt.contains("Be helpful"));
}
#[test]
fn openclaw_format_uses_bootstrap_files() {
use crate::config::IdentityConfig;
let config = IdentityConfig {
format: "openclaw".into(),
aieos_path: Some("identity.json".into()),
aieos_inline: None,
};
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], Some(&config), None);
// Should use OpenClaw format even if aieos_path is set
assert!(prompt.contains("### SOUL.md"));
assert!(prompt.contains("Be helpful"));
assert!(!prompt.contains("## Identity"));
}
#[test]
fn none_identity_config_uses_openclaw() {
let ws = make_workspace();
// Pass None for identity config
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
// Should use OpenClaw format
assert!(prompt.contains("### SOUL.md"));
assert!(prompt.contains("Be helpful"));
}
#[test]
fn classify_health_ok_true() {
let state = classify_health_result(&Ok(true));
assert_eq!(state, ChannelHealthState::Healthy);
}
#[test]
fn classify_health_ok_false() {
let state = classify_health_result(&Ok(false));
assert_eq!(state, ChannelHealthState::Unhealthy);
}
#[tokio::test]
async fn classify_health_timeout() {
let result = tokio::time::timeout(Duration::from_millis(1), async {
tokio::time::sleep(Duration::from_millis(20)).await;
true
})
.await;
let state = classify_health_result(&result);
assert_eq!(state, ChannelHealthState::Timeout);
}
struct AlwaysFailChannel {
name: &'static str,
calls: Arc<AtomicUsize>,
}
#[async_trait::async_trait]
impl Channel for AlwaysFailChannel {
fn name(&self) -> &str {
self.name
}
async fn send(&self, _message: &str, _recipient: &str) -> anyhow::Result<()> {
Ok(())
}
async fn listen(
&self,
_tx: tokio::sync::mpsc::Sender<traits::ChannelMessage>,
) -> anyhow::Result<()> {
self.calls.fetch_add(1, Ordering::SeqCst);
anyhow::bail!("listen boom")
}
}
#[tokio::test]
async fn supervised_listener_marks_error_and_restarts_on_failures() {
let calls = Arc::new(AtomicUsize::new(0));
let channel: Arc<dyn Channel> = Arc::new(AlwaysFailChannel {
name: "test-supervised-fail",
calls: Arc::clone(&calls),
});
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(1);
let handle = spawn_supervised_listener(channel, tx, 1, 1);
tokio::time::sleep(Duration::from_millis(80)).await;
drop(rx);
handle.abort();
let _ = handle.await;
let snapshot = crate::health::snapshot_json();
let component = &snapshot["components"]["channel:test-supervised-fail"];
assert_eq!(component["status"], "error");
assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
assert!(component["last_error"]
.as_str()
.unwrap_or("")
.contains("listen boom"));
assert!(calls.load(Ordering::SeqCst) >= 1);
}
}