zeroclaw/src/channels/mod.rs
Chummy b442a07530
fix(memory): prevent autosave key collisions across runtime flows
Fixes #221 - SQLite Memory Override bug.

This PR resolves memory overwrite behavior in autosave paths by replacing fixed memory keys with unique keys, and improves short-horizon recall quality in channel runtime.

**Root Cause**
SQLite memory uses a unique constraint on `memories.key` and writes with `ON CONFLICT(key) DO UPDATE`.
Several autosave paths reused fixed keys (or sender-stable keys), so newer messages overwrote earlier conversation entries.

**Changes**
- Channel runtime: autosave key changed from `channel_sender` to `channel_sender_messageId`
- Added memory-context injection before provider calls (aligned with agent loop behavior)
- Agent loop: autosave keys changed from fixed `user_msg`/`assistant_resp` to UUID-suffixed keys
- Gateway: Webhook/WhatsApp autosave keys changed to UUID-suffixed keys

All CI checks passing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 22:55:52 -05:00

1344 lines
46 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

pub mod cli;
pub mod discord;
pub mod email_channel;
pub mod imessage;
pub mod irc;
pub mod matrix;
pub mod slack;
pub mod telegram;
pub mod traits;
pub mod whatsapp;
pub use cli::CliChannel;
pub use discord::DiscordChannel;
pub use email_channel::EmailChannel;
pub use imessage::IMessageChannel;
pub use irc::IrcChannel;
pub use matrix::MatrixChannel;
pub use slack::SlackChannel;
pub use telegram::TelegramChannel;
pub use traits::Channel;
pub use whatsapp::WhatsAppChannel;
use crate::config::Config;
use crate::identity;
use crate::memory::{self, Memory};
use crate::providers::{self, Provider};
use crate::util::truncate_with_ellipsis;
use anyhow::Result;
use std::fmt::Write;
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Maximum characters per injected workspace file (matches `OpenClaw` default).
const BOOTSTRAP_MAX_CHARS: usize = 20_000;
const DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS: u64 = 2;
const DEFAULT_CHANNEL_MAX_BACKOFF_SECS: u64 = 60;
const CHANNEL_MESSAGE_TIMEOUT_SECS: u64 = 90;
fn conversation_memory_key(msg: &traits::ChannelMessage) -> String {
format!("{}_{}_{}", msg.channel, msg.sender, msg.id)
}
async fn build_memory_context(mem: &dyn Memory, user_msg: &str) -> String {
let mut context = String::new();
if let Ok(entries) = mem.recall(user_msg, 5).await {
if !entries.is_empty() {
context.push_str("[Memory context]\n");
for entry in &entries {
let _ = writeln!(context, "- {}: {}", entry.key, entry.content);
}
context.push('\n');
}
}
context
}
fn spawn_supervised_listener(
ch: Arc<dyn Channel>,
tx: tokio::sync::mpsc::Sender<traits::ChannelMessage>,
initial_backoff_secs: u64,
max_backoff_secs: u64,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let component = format!("channel:{}", ch.name());
let mut backoff = initial_backoff_secs.max(1);
let max_backoff = max_backoff_secs.max(backoff);
loop {
crate::health::mark_component_ok(&component);
let result = ch.listen(tx.clone()).await;
if tx.is_closed() {
break;
}
match result {
Ok(()) => {
tracing::warn!("Channel {} exited unexpectedly; restarting", ch.name());
crate::health::mark_component_error(&component, "listener exited unexpectedly");
// Clean exit — reset backoff since the listener ran successfully
backoff = initial_backoff_secs.max(1);
}
Err(e) => {
tracing::error!("Channel {} error: {e}; restarting", ch.name());
crate::health::mark_component_error(&component, e.to_string());
}
}
crate::health::bump_component_restart(&component);
tokio::time::sleep(Duration::from_secs(backoff)).await;
// Double backoff AFTER sleeping so first error uses initial_backoff
backoff = backoff.saturating_mul(2).min(max_backoff);
}
})
}
/// Load OpenClaw format bootstrap files into the prompt.
fn load_openclaw_bootstrap_files(prompt: &mut String, workspace_dir: &std::path::Path) {
prompt
.push_str("The following workspace files define your identity, behavior, and context.\n\n");
let bootstrap_files = [
"AGENTS.md",
"SOUL.md",
"TOOLS.md",
"IDENTITY.md",
"USER.md",
"HEARTBEAT.md",
];
for filename in &bootstrap_files {
inject_workspace_file(prompt, workspace_dir, filename);
}
// BOOTSTRAP.md — only if it exists (first-run ritual)
let bootstrap_path = workspace_dir.join("BOOTSTRAP.md");
if bootstrap_path.exists() {
inject_workspace_file(prompt, workspace_dir, "BOOTSTRAP.md");
}
// MEMORY.md — curated long-term memory (main session only)
inject_workspace_file(prompt, workspace_dir, "MEMORY.md");
}
/// Load workspace identity files and build a system prompt.
///
/// Follows the `OpenClaw` framework structure by default:
/// 1. Tooling — tool list + descriptions
/// 2. Safety — guardrail reminder
/// 3. Skills — compact list with paths (loaded on-demand)
/// 4. Workspace — working directory
/// 5. Bootstrap files — AGENTS, SOUL, TOOLS, IDENTITY, USER, HEARTBEAT, BOOTSTRAP, MEMORY
/// 6. Date & Time — timezone for cache stability
/// 7. Runtime — host, OS, model
///
/// When `identity_config` is set to AIEOS format, the bootstrap files section
/// is replaced with the AIEOS identity data loaded from file or inline JSON.
///
/// Daily memory files (`memory/*.md`) are NOT injected — they are accessed
/// on-demand via `memory_recall` / `memory_search` tools.
pub fn build_system_prompt(
workspace_dir: &std::path::Path,
model_name: &str,
tools: &[(&str, &str)],
skills: &[crate::skills::Skill],
identity_config: Option<&crate::config::IdentityConfig>,
) -> String {
use std::fmt::Write;
let mut prompt = String::with_capacity(8192);
// ── 1. Tooling ──────────────────────────────────────────────
if !tools.is_empty() {
prompt.push_str("## Tools\n\n");
prompt.push_str("You have access to the following tools:\n\n");
for (name, desc) in tools {
let _ = writeln!(prompt, "- **{name}**: {desc}");
}
prompt.push('\n');
}
// ── 2. Safety ───────────────────────────────────────────────
prompt.push_str("## Safety\n\n");
prompt.push_str(
"- Do not exfiltrate private data.\n\
- Do not run destructive commands without asking.\n\
- Do not bypass oversight or approval mechanisms.\n\
- Prefer `trash` over `rm` (recoverable beats gone forever).\n\
- When in doubt, ask before acting externally.\n\n",
);
// ── 3. Skills (compact list — load on-demand) ───────────────
if !skills.is_empty() {
prompt.push_str("## Available Skills\n\n");
prompt.push_str(
"Skills are loaded on demand. Use `read` on the skill path to get full instructions.\n\n",
);
prompt.push_str("<available_skills>\n");
for skill in skills {
let _ = writeln!(prompt, " <skill>");
let _ = writeln!(prompt, " <name>{}</name>", skill.name);
let _ = writeln!(
prompt,
" <description>{}</description>",
skill.description
);
let location = skill.location.clone().unwrap_or_else(|| {
workspace_dir
.join("skills")
.join(&skill.name)
.join("SKILL.md")
});
let _ = writeln!(prompt, " <location>{}</location>", location.display());
let _ = writeln!(prompt, " </skill>");
}
prompt.push_str("</available_skills>\n\n");
}
// ── 4. Workspace ────────────────────────────────────────────
let _ = writeln!(
prompt,
"## Workspace\n\nWorking directory: `{}`\n",
workspace_dir.display()
);
// ── 5. Bootstrap files (injected into context) ──────────────
prompt.push_str("## Project Context\n\n");
// Check if AIEOS identity is configured
if let Some(config) = identity_config {
if identity::is_aieos_configured(config) {
// Load AIEOS identity
match identity::load_aieos_identity(config, workspace_dir) {
Ok(Some(aieos_identity)) => {
let aieos_prompt = identity::aieos_to_system_prompt(&aieos_identity);
if !aieos_prompt.is_empty() {
prompt.push_str(&aieos_prompt);
prompt.push_str("\n\n");
}
}
Ok(None) => {
// No AIEOS identity loaded (shouldn't happen if is_aieos_configured returned true)
// Fall back to OpenClaw bootstrap files
load_openclaw_bootstrap_files(&mut prompt, workspace_dir);
}
Err(e) => {
// Log error but don't fail - fall back to OpenClaw
eprintln!(
"Warning: Failed to load AIEOS identity: {e}. Using OpenClaw format."
);
load_openclaw_bootstrap_files(&mut prompt, workspace_dir);
}
}
} else {
// OpenClaw format
load_openclaw_bootstrap_files(&mut prompt, workspace_dir);
}
} else {
// No identity config - use OpenClaw format
load_openclaw_bootstrap_files(&mut prompt, workspace_dir);
}
// ── 6. Date & Time ──────────────────────────────────────────
let now = chrono::Local::now();
let tz = now.format("%Z").to_string();
let _ = writeln!(prompt, "## Current Date & Time\n\nTimezone: {tz}\n");
// ── 7. Runtime ──────────────────────────────────────────────
let host =
hostname::get().map_or_else(|_| "unknown".into(), |h| h.to_string_lossy().to_string());
let _ = writeln!(
prompt,
"## Runtime\n\nHost: {host} | OS: {} | Model: {model_name}\n",
std::env::consts::OS,
);
if prompt.is_empty() {
"You are ZeroClaw, a fast and efficient AI assistant built in Rust. Be helpful, concise, and direct.".to_string()
} else {
prompt
}
}
/// Inject a single workspace file into the prompt with truncation and missing-file markers.
fn inject_workspace_file(prompt: &mut String, workspace_dir: &std::path::Path, filename: &str) {
use std::fmt::Write;
let path = workspace_dir.join(filename);
match std::fs::read_to_string(&path) {
Ok(content) => {
let trimmed = content.trim();
if trimmed.is_empty() {
return;
}
let _ = writeln!(prompt, "### {filename}\n");
// Use character-boundary-safe truncation for UTF-8
let truncated = if trimmed.chars().count() > BOOTSTRAP_MAX_CHARS {
trimmed
.char_indices()
.nth(BOOTSTRAP_MAX_CHARS)
.map(|(idx, _)| &trimmed[..idx])
.unwrap_or(trimmed)
} else {
trimmed
};
if truncated.len() < trimmed.len() {
prompt.push_str(truncated);
let _ = writeln!(
prompt,
"\n\n[... truncated at {BOOTSTRAP_MAX_CHARS} chars — use `read` for full file]\n"
);
} else {
prompt.push_str(trimmed);
prompt.push_str("\n\n");
}
}
Err(_) => {
// Missing-file marker (matches OpenClaw behavior)
let _ = writeln!(prompt, "### {filename}\n\n[File not found: {filename}]\n");
}
}
}
pub fn handle_command(command: crate::ChannelCommands, config: &Config) -> Result<()> {
match command {
crate::ChannelCommands::Start => {
anyhow::bail!("Start must be handled in main.rs (requires async runtime)")
}
crate::ChannelCommands::Doctor => {
anyhow::bail!("Doctor must be handled in main.rs (requires async runtime)")
}
crate::ChannelCommands::List => {
println!("Channels:");
println!(" ✅ CLI (always available)");
for (name, configured) in [
("Telegram", config.channels_config.telegram.is_some()),
("Discord", config.channels_config.discord.is_some()),
("Slack", config.channels_config.slack.is_some()),
("Webhook", config.channels_config.webhook.is_some()),
("iMessage", config.channels_config.imessage.is_some()),
("Matrix", config.channels_config.matrix.is_some()),
("WhatsApp", config.channels_config.whatsapp.is_some()),
("Email", config.channels_config.email.is_some()),
("IRC", config.channels_config.irc.is_some()),
] {
println!(" {} {name}", if configured { "" } else { "" });
}
println!("\nTo start channels: zeroclaw channel start");
println!("To check health: zeroclaw channel doctor");
println!("To configure: zeroclaw onboard");
Ok(())
}
crate::ChannelCommands::Add {
channel_type,
config: _,
} => {
anyhow::bail!(
"Channel type '{channel_type}' — use `zeroclaw onboard` to configure channels"
);
}
crate::ChannelCommands::Remove { name } => {
anyhow::bail!("Remove channel '{name}' — edit ~/.zeroclaw/config.toml directly");
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChannelHealthState {
Healthy,
Unhealthy,
Timeout,
}
fn classify_health_result(
result: &std::result::Result<bool, tokio::time::error::Elapsed>,
) -> ChannelHealthState {
match result {
Ok(true) => ChannelHealthState::Healthy,
Ok(false) => ChannelHealthState::Unhealthy,
Err(_) => ChannelHealthState::Timeout,
}
}
/// Run health checks for configured channels.
pub async fn doctor_channels(config: Config) -> Result<()> {
let mut channels: Vec<(&'static str, Arc<dyn Channel>)> = Vec::new();
if let Some(ref tg) = config.channels_config.telegram {
channels.push((
"Telegram",
Arc::new(TelegramChannel::new(
tg.bot_token.clone(),
tg.allowed_users.clone(),
)),
));
}
if let Some(ref dc) = config.channels_config.discord {
channels.push((
"Discord",
Arc::new(DiscordChannel::new(
dc.bot_token.clone(),
dc.guild_id.clone(),
dc.allowed_users.clone(),
)),
));
}
if let Some(ref sl) = config.channels_config.slack {
channels.push((
"Slack",
Arc::new(SlackChannel::new(
sl.bot_token.clone(),
sl.channel_id.clone(),
sl.allowed_users.clone(),
)),
));
}
if let Some(ref im) = config.channels_config.imessage {
channels.push((
"iMessage",
Arc::new(IMessageChannel::new(im.allowed_contacts.clone())),
));
}
if let Some(ref mx) = config.channels_config.matrix {
channels.push((
"Matrix",
Arc::new(MatrixChannel::new(
mx.homeserver.clone(),
mx.access_token.clone(),
mx.room_id.clone(),
mx.allowed_users.clone(),
)),
));
}
if let Some(ref wa) = config.channels_config.whatsapp {
channels.push((
"WhatsApp",
Arc::new(WhatsAppChannel::new(
wa.access_token.clone(),
wa.phone_number_id.clone(),
wa.verify_token.clone(),
wa.allowed_numbers.clone(),
)),
));
}
if let Some(ref email_cfg) = config.channels_config.email {
channels.push(("Email", Arc::new(EmailChannel::new(email_cfg.clone()))));
}
if let Some(ref irc) = config.channels_config.irc {
channels.push((
"IRC",
Arc::new(IrcChannel::new(
irc.server.clone(),
irc.port,
irc.nickname.clone(),
irc.username.clone(),
irc.channels.clone(),
irc.allowed_users.clone(),
irc.server_password.clone(),
irc.nickserv_password.clone(),
irc.sasl_password.clone(),
irc.verify_tls.unwrap_or(true),
)),
));
}
if channels.is_empty() {
println!("No real-time channels configured. Run `zeroclaw onboard` first.");
return Ok(());
}
println!("🩺 ZeroClaw Channel Doctor");
println!();
let mut healthy = 0_u32;
let mut unhealthy = 0_u32;
let mut timeout = 0_u32;
for (name, channel) in channels {
let result = tokio::time::timeout(Duration::from_secs(10), channel.health_check()).await;
let state = classify_health_result(&result);
match state {
ChannelHealthState::Healthy => {
healthy += 1;
println!("{name:<9} healthy");
}
ChannelHealthState::Unhealthy => {
unhealthy += 1;
println!("{name:<9} unhealthy (auth/config/network)");
}
ChannelHealthState::Timeout => {
timeout += 1;
println!(" ⏱️ {name:<9} timed out (>10s)");
}
}
}
if config.channels_config.webhook.is_some() {
println!(" Webhook check via `zeroclaw gateway` then GET /health");
}
println!();
println!("Summary: {healthy} healthy, {unhealthy} unhealthy, {timeout} timed out");
Ok(())
}
/// Start all configured channels and route messages to the agent
#[allow(clippy::too_many_lines)]
pub async fn start_channels(config: Config) -> Result<()> {
let provider: Arc<dyn Provider> = Arc::from(providers::create_resilient_provider(
config.default_provider.as_deref().unwrap_or("openrouter"),
config.api_key.as_deref(),
&config.reliability,
)?);
// Warm up the provider connection pool (TLS handshake, DNS, HTTP/2 setup)
// so the first real message doesn't hit a cold-start timeout.
if let Err(e) = provider.warmup().await {
tracing::warn!("Provider warmup failed (non-fatal): {e}");
}
let model = config
.default_model
.clone()
.unwrap_or_else(|| "anthropic/claude-sonnet-4-20250514".into());
let temperature = config.default_temperature;
let mem: Arc<dyn Memory> = Arc::from(memory::create_memory(
&config.memory,
&config.workspace_dir,
config.api_key.as_deref(),
)?);
// Build system prompt from workspace identity files + skills
let workspace = config.workspace_dir.clone();
let skills = crate::skills::load_skills(&workspace);
// Collect tool descriptions for the prompt
let mut tool_descs: Vec<(&str, &str)> = vec![
(
"shell",
"Execute terminal commands. Use when: running local checks, build/test commands, diagnostics. Don't use when: a safer dedicated tool exists, or command is destructive without approval.",
),
(
"file_read",
"Read file contents. Use when: inspecting project files, configs, logs. Don't use when: a targeted search is enough.",
),
(
"file_write",
"Write file contents. Use when: applying focused edits, scaffolding files, updating docs/code. Don't use when: side effects are unclear or file ownership is uncertain.",
),
(
"memory_store",
"Save to memory. Use when: preserving durable preferences, decisions, key context. Don't use when: information is transient/noisy/sensitive without need.",
),
(
"memory_recall",
"Search memory. Use when: retrieving prior decisions, user preferences, historical context. Don't use when: answer is already in current context.",
),
(
"memory_forget",
"Delete a memory entry. Use when: memory is incorrect/stale or explicitly requested for removal. Don't use when: impact is uncertain.",
),
];
if config.browser.enabled {
tool_descs.push((
"browser_open",
"Open approved HTTPS URLs in Brave Browser (allowlist-only, no scraping)",
));
}
let system_prompt = build_system_prompt(
&workspace,
&model,
&tool_descs,
&skills,
Some(&config.identity),
);
if !skills.is_empty() {
println!(
" 🧩 Skills: {}",
skills
.iter()
.map(|s| s.name.as_str())
.collect::<Vec<_>>()
.join(", ")
);
}
// Collect active channels
let mut channels: Vec<Arc<dyn Channel>> = Vec::new();
if let Some(ref tg) = config.channels_config.telegram {
channels.push(Arc::new(TelegramChannel::new(
tg.bot_token.clone(),
tg.allowed_users.clone(),
)));
}
if let Some(ref dc) = config.channels_config.discord {
channels.push(Arc::new(DiscordChannel::new(
dc.bot_token.clone(),
dc.guild_id.clone(),
dc.allowed_users.clone(),
)));
}
if let Some(ref sl) = config.channels_config.slack {
channels.push(Arc::new(SlackChannel::new(
sl.bot_token.clone(),
sl.channel_id.clone(),
sl.allowed_users.clone(),
)));
}
if let Some(ref im) = config.channels_config.imessage {
channels.push(Arc::new(IMessageChannel::new(im.allowed_contacts.clone())));
}
if let Some(ref mx) = config.channels_config.matrix {
channels.push(Arc::new(MatrixChannel::new(
mx.homeserver.clone(),
mx.access_token.clone(),
mx.room_id.clone(),
mx.allowed_users.clone(),
)));
}
if let Some(ref wa) = config.channels_config.whatsapp {
channels.push(Arc::new(WhatsAppChannel::new(
wa.access_token.clone(),
wa.phone_number_id.clone(),
wa.verify_token.clone(),
wa.allowed_numbers.clone(),
)));
}
if let Some(ref email_cfg) = config.channels_config.email {
channels.push(Arc::new(EmailChannel::new(email_cfg.clone())));
}
if let Some(ref irc) = config.channels_config.irc {
channels.push(Arc::new(IrcChannel::new(
irc.server.clone(),
irc.port,
irc.nickname.clone(),
irc.username.clone(),
irc.channels.clone(),
irc.allowed_users.clone(),
irc.server_password.clone(),
irc.nickserv_password.clone(),
irc.sasl_password.clone(),
irc.verify_tls.unwrap_or(true),
)));
}
if channels.is_empty() {
println!("No channels configured. Run `zeroclaw onboard` to set up channels.");
return Ok(());
}
println!("🦀 ZeroClaw Channel Server");
println!(" 🤖 Model: {model}");
println!(
" 🧠 Memory: {} (auto-save: {})",
config.memory.backend,
if config.memory.auto_save { "on" } else { "off" }
);
println!(
" 📡 Channels: {}",
channels
.iter()
.map(|c| c.name())
.collect::<Vec<_>>()
.join(", ")
);
println!();
println!(" Listening for messages... (Ctrl+C to stop)");
println!();
crate::health::mark_component_ok("channels");
let initial_backoff_secs = config
.reliability
.channel_initial_backoff_secs
.max(DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS);
let max_backoff_secs = config
.reliability
.channel_max_backoff_secs
.max(DEFAULT_CHANNEL_MAX_BACKOFF_SECS);
// Single message bus — all channels send messages here
let (tx, mut rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(100);
// Spawn a listener for each channel
let mut handles = Vec::new();
for ch in &channels {
handles.push(spawn_supervised_listener(
ch.clone(),
tx.clone(),
initial_backoff_secs,
max_backoff_secs,
));
}
drop(tx); // Drop our copy so rx closes when all channels stop
// Process incoming messages — call the LLM and reply
while let Some(msg) = rx.recv().await {
println!(
" 💬 [{}] from {}: {}",
msg.channel,
msg.sender,
truncate_with_ellipsis(&msg.content, 80)
);
let memory_context = build_memory_context(mem.as_ref(), &msg.content).await;
// Auto-save to memory
if config.memory.auto_save {
let autosave_key = conversation_memory_key(&msg);
let _ = mem
.store(
&autosave_key,
&msg.content,
crate::memory::MemoryCategory::Conversation,
)
.await;
}
let enriched_message = if memory_context.is_empty() {
msg.content.clone()
} else {
format!("{memory_context}{}", msg.content)
};
let target_channel = channels.iter().find(|ch| ch.name() == msg.channel);
// Show typing indicator while processing
if let Some(ch) = target_channel {
if let Err(e) = ch.start_typing(&msg.sender).await {
tracing::debug!("Failed to start typing on {}: {e}", ch.name());
}
}
// Call the LLM with system prompt (identity + soul + tools)
println!(" ⏳ Processing message...");
let started_at = Instant::now();
let llm_result = tokio::time::timeout(
Duration::from_secs(CHANNEL_MESSAGE_TIMEOUT_SECS),
provider.chat_with_system(Some(&system_prompt), &enriched_message, &model, temperature),
)
.await;
// Stop typing before sending the response
if let Some(ch) = target_channel {
if let Err(e) = ch.stop_typing(&msg.sender).await {
tracing::debug!("Failed to stop typing on {}: {e}", ch.name());
}
}
match llm_result {
Ok(Ok(response)) => {
println!(
" 🤖 Reply ({}ms): {}",
started_at.elapsed().as_millis(),
truncate_with_ellipsis(&response, 80)
);
if let Some(ch) = target_channel {
if let Err(e) = ch.send(&response, &msg.sender).await {
eprintln!(" ❌ Failed to reply on {}: {e}", ch.name());
}
}
}
Ok(Err(e)) => {
eprintln!(
" ❌ LLM error after {}ms: {e}",
started_at.elapsed().as_millis()
);
if let Some(ch) = target_channel {
let _ = ch.send(&format!("⚠️ Error: {e}"), &msg.sender).await;
}
}
Err(_) => {
let timeout_msg = format!(
"LLM response timed out after {}s",
CHANNEL_MESSAGE_TIMEOUT_SECS
);
eprintln!(
"{} (elapsed: {}ms)",
timeout_msg,
started_at.elapsed().as_millis()
);
if let Some(ch) = target_channel {
let _ = ch
.send(
"⚠️ Request timed out while waiting for the model. Please try again.",
&msg.sender,
)
.await;
}
}
}
}
// Wait for all channel tasks
for h in handles {
let _ = h.await;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::memory::{Memory, MemoryCategory, SqliteMemory};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tempfile::TempDir;
fn make_workspace() -> TempDir {
let tmp = TempDir::new().unwrap();
// Create minimal workspace files
std::fs::write(tmp.path().join("SOUL.md"), "# Soul\nBe helpful.").unwrap();
std::fs::write(tmp.path().join("IDENTITY.md"), "# Identity\nName: ZeroClaw").unwrap();
std::fs::write(tmp.path().join("USER.md"), "# User\nName: Test User").unwrap();
std::fs::write(
tmp.path().join("AGENTS.md"),
"# Agents\nFollow instructions.",
)
.unwrap();
std::fs::write(tmp.path().join("TOOLS.md"), "# Tools\nUse shell carefully.").unwrap();
std::fs::write(
tmp.path().join("HEARTBEAT.md"),
"# Heartbeat\nCheck status.",
)
.unwrap();
std::fs::write(tmp.path().join("MEMORY.md"), "# Memory\nUser likes Rust.").unwrap();
tmp
}
#[test]
fn prompt_contains_all_sections() {
let ws = make_workspace();
let tools = vec![("shell", "Run commands"), ("file_read", "Read files")];
let prompt = build_system_prompt(ws.path(), "test-model", &tools, &[], None);
// Section headers
assert!(prompt.contains("## Tools"), "missing Tools section");
assert!(prompt.contains("## Safety"), "missing Safety section");
assert!(prompt.contains("## Workspace"), "missing Workspace section");
assert!(
prompt.contains("## Project Context"),
"missing Project Context"
);
assert!(
prompt.contains("## Current Date & Time"),
"missing Date/Time"
);
assert!(prompt.contains("## Runtime"), "missing Runtime section");
}
#[test]
fn prompt_injects_tools() {
let ws = make_workspace();
let tools = vec![
("shell", "Run commands"),
("memory_recall", "Search memory"),
];
let prompt = build_system_prompt(ws.path(), "gpt-4o", &tools, &[], None);
assert!(prompt.contains("**shell**"));
assert!(prompt.contains("Run commands"));
assert!(prompt.contains("**memory_recall**"));
}
#[test]
fn prompt_injects_safety() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None);
assert!(prompt.contains("Do not exfiltrate private data"));
assert!(prompt.contains("Do not run destructive commands"));
assert!(prompt.contains("Prefer `trash` over `rm`"));
}
#[test]
fn prompt_injects_workspace_files() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None);
assert!(prompt.contains("### SOUL.md"), "missing SOUL.md header");
assert!(prompt.contains("Be helpful"), "missing SOUL content");
assert!(prompt.contains("### IDENTITY.md"), "missing IDENTITY.md");
assert!(
prompt.contains("Name: ZeroClaw"),
"missing IDENTITY content"
);
assert!(prompt.contains("### USER.md"), "missing USER.md");
assert!(prompt.contains("### AGENTS.md"), "missing AGENTS.md");
assert!(prompt.contains("### TOOLS.md"), "missing TOOLS.md");
assert!(prompt.contains("### HEARTBEAT.md"), "missing HEARTBEAT.md");
assert!(prompt.contains("### MEMORY.md"), "missing MEMORY.md");
assert!(prompt.contains("User likes Rust"), "missing MEMORY content");
}
#[test]
fn prompt_missing_file_markers() {
let tmp = TempDir::new().unwrap();
// Empty workspace — no files at all
let prompt = build_system_prompt(tmp.path(), "model", &[], &[], None);
assert!(prompt.contains("[File not found: SOUL.md]"));
assert!(prompt.contains("[File not found: AGENTS.md]"));
assert!(prompt.contains("[File not found: IDENTITY.md]"));
}
#[test]
fn prompt_bootstrap_only_if_exists() {
let ws = make_workspace();
// No BOOTSTRAP.md — should not appear
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None);
assert!(
!prompt.contains("### BOOTSTRAP.md"),
"BOOTSTRAP.md should not appear when missing"
);
// Create BOOTSTRAP.md — should appear
std::fs::write(ws.path().join("BOOTSTRAP.md"), "# Bootstrap\nFirst run.").unwrap();
let prompt2 = build_system_prompt(ws.path(), "model", &[], &[], None);
assert!(
prompt2.contains("### BOOTSTRAP.md"),
"BOOTSTRAP.md should appear when present"
);
assert!(prompt2.contains("First run"));
}
#[test]
fn prompt_no_daily_memory_injection() {
let ws = make_workspace();
let memory_dir = ws.path().join("memory");
std::fs::create_dir_all(&memory_dir).unwrap();
let today = chrono::Local::now().format("%Y-%m-%d").to_string();
std::fs::write(
memory_dir.join(format!("{today}.md")),
"# Daily\nSome note.",
)
.unwrap();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None);
// Daily notes should NOT be in the system prompt (on-demand via tools)
assert!(
!prompt.contains("Daily Notes"),
"daily notes should not be auto-injected"
);
assert!(
!prompt.contains("Some note"),
"daily content should not be in prompt"
);
}
#[test]
fn prompt_runtime_metadata() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "claude-sonnet-4", &[], &[], None);
assert!(prompt.contains("Model: claude-sonnet-4"));
assert!(prompt.contains(&format!("OS: {}", std::env::consts::OS)));
assert!(prompt.contains("Host:"));
}
#[test]
fn prompt_skills_compact_list() {
let ws = make_workspace();
let skills = vec![crate::skills::Skill {
name: "code-review".into(),
description: "Review code for bugs".into(),
version: "1.0.0".into(),
author: None,
tags: vec![],
tools: vec![],
prompts: vec!["Long prompt content that should NOT appear in system prompt".into()],
location: None,
}];
let prompt = build_system_prompt(ws.path(), "model", &[], &skills, None);
assert!(prompt.contains("<available_skills>"), "missing skills XML");
assert!(prompt.contains("<name>code-review</name>"));
assert!(prompt.contains("<description>Review code for bugs</description>"));
assert!(prompt.contains("SKILL.md</location>"));
assert!(
prompt.contains("loaded on demand"),
"should mention on-demand loading"
);
// Full prompt content should NOT be dumped
assert!(!prompt.contains("Long prompt content that should NOT appear"));
}
#[test]
fn prompt_truncation() {
let ws = make_workspace();
// Write a file larger than BOOTSTRAP_MAX_CHARS
let big_content = "x".repeat(BOOTSTRAP_MAX_CHARS + 1000);
std::fs::write(ws.path().join("AGENTS.md"), &big_content).unwrap();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None);
assert!(
prompt.contains("truncated at"),
"large files should be truncated"
);
assert!(
!prompt.contains(&big_content),
"full content should not appear"
);
}
#[test]
fn prompt_empty_files_skipped() {
let ws = make_workspace();
std::fs::write(ws.path().join("TOOLS.md"), "").unwrap();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None);
// Empty file should not produce a header
assert!(
!prompt.contains("### TOOLS.md"),
"empty files should be skipped"
);
}
#[test]
fn prompt_workspace_path() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None);
assert!(prompt.contains(&format!("Working directory: `{}`", ws.path().display())));
}
#[test]
fn conversation_memory_key_uses_message_id() {
let msg = traits::ChannelMessage {
id: "msg_abc123".into(),
sender: "U123".into(),
content: "hello".into(),
channel: "slack".into(),
timestamp: 1,
};
assert_eq!(conversation_memory_key(&msg), "slack_U123_msg_abc123");
}
#[test]
fn conversation_memory_key_is_unique_per_message() {
let msg1 = traits::ChannelMessage {
id: "msg_1".into(),
sender: "U123".into(),
content: "first".into(),
channel: "slack".into(),
timestamp: 1,
};
let msg2 = traits::ChannelMessage {
id: "msg_2".into(),
sender: "U123".into(),
content: "second".into(),
channel: "slack".into(),
timestamp: 2,
};
assert_ne!(
conversation_memory_key(&msg1),
conversation_memory_key(&msg2)
);
}
#[tokio::test]
async fn autosave_keys_preserve_multiple_conversation_facts() {
let tmp = TempDir::new().unwrap();
let mem = SqliteMemory::new(tmp.path()).unwrap();
let msg1 = traits::ChannelMessage {
id: "msg_1".into(),
sender: "U123".into(),
content: "I'm Paul".into(),
channel: "slack".into(),
timestamp: 1,
};
let msg2 = traits::ChannelMessage {
id: "msg_2".into(),
sender: "U123".into(),
content: "I'm 45".into(),
channel: "slack".into(),
timestamp: 2,
};
mem.store(
&conversation_memory_key(&msg1),
&msg1.content,
MemoryCategory::Conversation,
)
.await
.unwrap();
mem.store(
&conversation_memory_key(&msg2),
&msg2.content,
MemoryCategory::Conversation,
)
.await
.unwrap();
assert_eq!(mem.count().await.unwrap(), 2);
let recalled = mem.recall("45", 5).await.unwrap();
assert!(recalled.iter().any(|entry| entry.content.contains("45")));
}
#[tokio::test]
async fn build_memory_context_includes_recalled_entries() {
let tmp = TempDir::new().unwrap();
let mem = SqliteMemory::new(tmp.path()).unwrap();
mem.store("age_fact", "Age is 45", MemoryCategory::Conversation)
.await
.unwrap();
let context = build_memory_context(&mem, "age").await;
assert!(context.contains("[Memory context]"));
assert!(context.contains("Age is 45"));
}
// ── AIEOS Identity Tests (Issue #168) ─────────────────────────
#[test]
fn aieos_identity_from_file() {
use crate::config::IdentityConfig;
use tempfile::TempDir;
let tmp = TempDir::new().unwrap();
let identity_path = tmp.path().join("aieos_identity.json");
// Write AIEOS identity file
let aieos_json = r#"{
"identity": {
"names": {"first": "Nova", "nickname": "Nov"},
"bio": "A helpful AI assistant.",
"origin": "Silicon Valley"
},
"psychology": {
"mbti": "INTJ",
"moral_compass": ["Be helpful", "Do no harm"]
},
"linguistics": {
"style": "concise",
"formality": "casual"
}
}"#;
std::fs::write(&identity_path, aieos_json).unwrap();
// Create identity config pointing to the file
let config = IdentityConfig {
format: "aieos".into(),
aieos_path: Some("aieos_identity.json".into()),
aieos_inline: None,
};
let prompt = build_system_prompt(tmp.path(), "model", &[], &[], Some(&config));
// Should contain AIEOS sections
assert!(prompt.contains("## Identity"));
assert!(prompt.contains("**Name:** Nova"));
assert!(prompt.contains("**Nickname:** Nov"));
assert!(prompt.contains("**Bio:** A helpful AI assistant."));
assert!(prompt.contains("**Origin:** Silicon Valley"));
assert!(prompt.contains("## Personality"));
assert!(prompt.contains("**MBTI:** INTJ"));
assert!(prompt.contains("**Moral Compass:**"));
assert!(prompt.contains("- Be helpful"));
assert!(prompt.contains("## Communication Style"));
assert!(prompt.contains("**Style:** concise"));
assert!(prompt.contains("**Formality Level:** casual"));
// Should NOT contain OpenClaw bootstrap file headers
assert!(!prompt.contains("### SOUL.md"));
assert!(!prompt.contains("### IDENTITY.md"));
assert!(!prompt.contains("[File not found"));
}
#[test]
fn aieos_identity_from_inline() {
use crate::config::IdentityConfig;
let config = IdentityConfig {
format: "aieos".into(),
aieos_path: None,
aieos_inline: Some(r#"{"identity":{"names":{"first":"Claw"}}}"#.into()),
};
let prompt = build_system_prompt(
std::env::temp_dir().as_path(),
"model",
&[],
&[],
Some(&config),
);
assert!(prompt.contains("**Name:** Claw"));
assert!(prompt.contains("## Identity"));
}
#[test]
fn aieos_fallback_to_openclaw_on_parse_error() {
use crate::config::IdentityConfig;
let config = IdentityConfig {
format: "aieos".into(),
aieos_path: Some("nonexistent.json".into()),
aieos_inline: None,
};
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], Some(&config));
// Should fall back to OpenClaw format when AIEOS file is not found
// (Error is logged to stderr with filename, not included in prompt)
assert!(prompt.contains("### SOUL.md"));
}
#[test]
fn aieos_empty_uses_openclaw() {
use crate::config::IdentityConfig;
// Format is "aieos" but neither path nor inline is set
let config = IdentityConfig {
format: "aieos".into(),
aieos_path: None,
aieos_inline: None,
};
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], Some(&config));
// Should use OpenClaw format (not configured for AIEOS)
assert!(prompt.contains("### SOUL.md"));
assert!(prompt.contains("Be helpful"));
}
#[test]
fn openclaw_format_uses_bootstrap_files() {
use crate::config::IdentityConfig;
let config = IdentityConfig {
format: "openclaw".into(),
aieos_path: Some("identity.json".into()),
aieos_inline: None,
};
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], Some(&config));
// Should use OpenClaw format even if aieos_path is set
assert!(prompt.contains("### SOUL.md"));
assert!(prompt.contains("Be helpful"));
assert!(!prompt.contains("## Identity"));
}
#[test]
fn none_identity_config_uses_openclaw() {
let ws = make_workspace();
// Pass None for identity config
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None);
// Should use OpenClaw format
assert!(prompt.contains("### SOUL.md"));
assert!(prompt.contains("Be helpful"));
}
#[test]
fn classify_health_ok_true() {
let state = classify_health_result(&Ok(true));
assert_eq!(state, ChannelHealthState::Healthy);
}
#[test]
fn classify_health_ok_false() {
let state = classify_health_result(&Ok(false));
assert_eq!(state, ChannelHealthState::Unhealthy);
}
#[tokio::test]
async fn classify_health_timeout() {
let result = tokio::time::timeout(Duration::from_millis(1), async {
tokio::time::sleep(Duration::from_millis(20)).await;
true
})
.await;
let state = classify_health_result(&result);
assert_eq!(state, ChannelHealthState::Timeout);
}
struct AlwaysFailChannel {
name: &'static str,
calls: Arc<AtomicUsize>,
}
#[async_trait::async_trait]
impl Channel for AlwaysFailChannel {
fn name(&self) -> &str {
self.name
}
async fn send(&self, _message: &str, _recipient: &str) -> anyhow::Result<()> {
Ok(())
}
async fn listen(
&self,
_tx: tokio::sync::mpsc::Sender<traits::ChannelMessage>,
) -> anyhow::Result<()> {
self.calls.fetch_add(1, Ordering::SeqCst);
anyhow::bail!("listen boom")
}
}
#[tokio::test]
async fn supervised_listener_marks_error_and_restarts_on_failures() {
let calls = Arc::new(AtomicUsize::new(0));
let channel: Arc<dyn Channel> = Arc::new(AlwaysFailChannel {
name: "test-supervised-fail",
calls: Arc::clone(&calls),
});
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(1);
let handle = spawn_supervised_listener(channel, tx, 1, 1);
tokio::time::sleep(Duration::from_millis(80)).await;
drop(rx);
handle.abort();
let _ = handle.await;
let snapshot = crate::health::snapshot_json();
let component = &snapshot["components"]["channel:test-supervised-fail"];
assert_eq!(component["status"], "error");
assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
assert!(component["last_error"]
.as_str()
.unwrap_or("")
.contains("listen boom"));
assert!(calls.load(Ordering::SeqCst) >= 1);
}
}