zeroclaw/src/channels/mod.rs
argenis de la rosa 1862c18d10 fix: address PR #37 review issues
- Add missing EmailConfig struct with serde derives and defaults
- Register email_channel module in mod.rs with exports
- Fix IMAP tag reuse (RFC 3501 violation) using incrementing counter
- Fix email sender validation logic (clearer domain vs full email matching)
- Fix mail_parser API usage (MessageParser::default().parse())
- Fix WhatsApp allowlist matching (normalize phone numbers)
- Fix WhatsApp health_check (don't treat 404 as healthy)
- Fix WhatsApp listen() to keep task alive (prevent channel bus closing)
- Add missing dependencies: lettre, mail-parser, rustls-pki-types, tokio-rustls, webpki-roots
- Remove unused imports

All 665 tests pass.
2026-02-14 14:39:43 -05:00

782 lines
26 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

pub mod cli;
pub mod discord;
pub mod email_channel;
pub mod imessage;
pub mod matrix;
pub mod slack;
pub mod telegram;
pub mod whatsapp;
pub mod traits;
pub use cli::CliChannel;
pub use discord::DiscordChannel;
pub use imessage::IMessageChannel;
pub use matrix::MatrixChannel;
pub use slack::SlackChannel;
pub use telegram::TelegramChannel;
pub use traits::Channel;
use crate::config::Config;
use crate::memory::{self, Memory};
use crate::providers::{self, Provider};
use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
/// Maximum characters per injected workspace file (matches `OpenClaw` default).
const BOOTSTRAP_MAX_CHARS: usize = 20_000;
/// Load workspace identity files and build a system prompt.
///
/// Follows the `OpenClaw` framework structure:
/// 1. Tooling — tool list + descriptions
/// 2. Safety — guardrail reminder
/// 3. Skills — compact list with paths (loaded on-demand)
/// 4. Workspace — working directory
/// 5. Bootstrap files — AGENTS, SOUL, TOOLS, IDENTITY, USER, HEARTBEAT, BOOTSTRAP, MEMORY
/// 6. Date & Time — timezone for cache stability
/// 7. Runtime — host, OS, model
///
/// Daily memory files (`memory/*.md`) are NOT injected — they are accessed
/// on-demand via `memory_recall` / `memory_search` tools.
pub fn build_system_prompt(
workspace_dir: &std::path::Path,
model_name: &str,
tools: &[(&str, &str)],
skills: &[crate::skills::Skill],
) -> String {
use std::fmt::Write;
let mut prompt = String::with_capacity(8192);
// ── 1. Tooling ──────────────────────────────────────────────
if !tools.is_empty() {
prompt.push_str("## Tools\n\n");
prompt.push_str("You have access to the following tools:\n\n");
for (name, desc) in tools {
let _ = writeln!(prompt, "- **{name}**: {desc}");
}
prompt.push('\n');
}
// ── 2. Safety ───────────────────────────────────────────────
prompt.push_str("## Safety\n\n");
prompt.push_str(
"- Do not exfiltrate private data.\n\
- Do not run destructive commands without asking.\n\
- Do not bypass oversight or approval mechanisms.\n\
- Prefer `trash` over `rm` (recoverable beats gone forever).\n\
- When in doubt, ask before acting externally.\n\n",
);
// ── 3. Skills (compact list — load on-demand) ───────────────
if !skills.is_empty() {
prompt.push_str("## Available Skills\n\n");
prompt.push_str(
"Skills are loaded on demand. Use `read` on the skill path to get full instructions.\n\n",
);
prompt.push_str("<available_skills>\n");
for skill in skills {
let _ = writeln!(prompt, " <skill>");
let _ = writeln!(prompt, " <name>{}</name>", skill.name);
let _ = writeln!(
prompt,
" <description>{}</description>",
skill.description
);
let location = workspace_dir
.join("skills")
.join(&skill.name)
.join("SKILL.md");
let _ = writeln!(prompt, " <location>{}</location>", location.display());
let _ = writeln!(prompt, " </skill>");
}
prompt.push_str("</available_skills>\n\n");
}
// ── 4. Workspace ────────────────────────────────────────────
let _ = writeln!(
prompt,
"## Workspace\n\nWorking directory: `{}`\n",
workspace_dir.display()
);
// ── 5. Bootstrap files (injected into context) ──────────────
prompt.push_str("## Project Context\n\n");
prompt
.push_str("The following workspace files define your identity, behavior, and context.\n\n");
let bootstrap_files = [
"AGENTS.md",
"SOUL.md",
"TOOLS.md",
"IDENTITY.md",
"USER.md",
"HEARTBEAT.md",
];
for filename in &bootstrap_files {
inject_workspace_file(&mut prompt, workspace_dir, filename);
}
// BOOTSTRAP.md — only if it exists (first-run ritual)
let bootstrap_path = workspace_dir.join("BOOTSTRAP.md");
if bootstrap_path.exists() {
inject_workspace_file(&mut prompt, workspace_dir, "BOOTSTRAP.md");
}
// MEMORY.md — curated long-term memory (main session only)
inject_workspace_file(&mut prompt, workspace_dir, "MEMORY.md");
// ── 6. Date & Time ──────────────────────────────────────────
let now = chrono::Local::now();
let tz = now.format("%Z").to_string();
let _ = writeln!(prompt, "## Current Date & Time\n\nTimezone: {tz}\n");
// ── 7. Runtime ──────────────────────────────────────────────
let host =
hostname::get().map_or_else(|_| "unknown".into(), |h| h.to_string_lossy().to_string());
let _ = writeln!(
prompt,
"## Runtime\n\nHost: {host} | OS: {} | Model: {model_name}\n",
std::env::consts::OS,
);
if prompt.is_empty() {
"You are ZeroClaw, a fast and efficient AI assistant built in Rust. Be helpful, concise, and direct.".to_string()
} else {
prompt
}
}
/// Inject a single workspace file into the prompt with truncation and missing-file markers.
fn inject_workspace_file(prompt: &mut String, workspace_dir: &std::path::Path, filename: &str) {
use std::fmt::Write;
let path = workspace_dir.join(filename);
match std::fs::read_to_string(&path) {
Ok(content) => {
let trimmed = content.trim();
if trimmed.is_empty() {
return;
}
let _ = writeln!(prompt, "### {filename}\n");
if trimmed.len() > BOOTSTRAP_MAX_CHARS {
prompt.push_str(&trimmed[..BOOTSTRAP_MAX_CHARS]);
let _ = writeln!(
prompt,
"\n\n[... truncated at {BOOTSTRAP_MAX_CHARS} chars — use `read` for full file]\n"
);
} else {
prompt.push_str(trimmed);
prompt.push_str("\n\n");
}
}
Err(_) => {
// Missing-file marker (matches OpenClaw behavior)
let _ = writeln!(prompt, "### {filename}\n\n[File not found: {filename}]\n");
}
}
}
pub fn handle_command(command: super::ChannelCommands, config: &Config) -> Result<()> {
match command {
super::ChannelCommands::Start => {
// Handled in main.rs (needs async), this is unreachable
unreachable!("Start is handled in main.rs")
}
super::ChannelCommands::Doctor => {
// Handled in main.rs (needs async), this is unreachable
unreachable!("Doctor is handled in main.rs")
}
super::ChannelCommands::List => {
println!("Channels:");
println!(" ✅ CLI (always available)");
for (name, configured) in [
("Telegram", config.channels_config.telegram.is_some()),
("Discord", config.channels_config.discord.is_some()),
("Slack", config.channels_config.slack.is_some()),
("Webhook", config.channels_config.webhook.is_some()),
("iMessage", config.channels_config.imessage.is_some()),
("Matrix", config.channels_config.matrix.is_some()),
] {
println!(" {} {name}", if configured { "" } else { "" });
}
println!("\nTo start channels: zeroclaw channel start");
println!("To check health: zeroclaw channel doctor");
println!("To configure: zeroclaw onboard");
Ok(())
}
super::ChannelCommands::Add {
channel_type,
config: _,
} => {
anyhow::bail!(
"Channel type '{channel_type}' — use `zeroclaw onboard` to configure channels"
);
}
super::ChannelCommands::Remove { name } => {
anyhow::bail!("Remove channel '{name}' — edit ~/.zeroclaw/config.toml directly");
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChannelHealthState {
Healthy,
Unhealthy,
Timeout,
}
fn classify_health_result(
result: &std::result::Result<bool, tokio::time::error::Elapsed>,
) -> ChannelHealthState {
match result {
Ok(true) => ChannelHealthState::Healthy,
Ok(false) => ChannelHealthState::Unhealthy,
Err(_) => ChannelHealthState::Timeout,
}
}
/// Run health checks for configured channels.
pub async fn doctor_channels(config: Config) -> Result<()> {
let mut channels: Vec<(&'static str, Arc<dyn Channel>)> = Vec::new();
if let Some(ref tg) = config.channels_config.telegram {
channels.push((
"Telegram",
Arc::new(TelegramChannel::new(
tg.bot_token.clone(),
tg.allowed_users.clone(),
)),
));
}
if let Some(ref dc) = config.channels_config.discord {
channels.push((
"Discord",
Arc::new(DiscordChannel::new(
dc.bot_token.clone(),
dc.guild_id.clone(),
dc.allowed_users.clone(),
)),
));
}
if let Some(ref sl) = config.channels_config.slack {
channels.push((
"Slack",
Arc::new(SlackChannel::new(
sl.bot_token.clone(),
sl.channel_id.clone(),
sl.allowed_users.clone(),
)),
));
}
if let Some(ref im) = config.channels_config.imessage {
channels.push((
"iMessage",
Arc::new(IMessageChannel::new(im.allowed_contacts.clone())),
));
}
if let Some(ref mx) = config.channels_config.matrix {
channels.push((
"Matrix",
Arc::new(MatrixChannel::new(
mx.homeserver.clone(),
mx.access_token.clone(),
mx.room_id.clone(),
mx.allowed_users.clone(),
)),
));
}
if channels.is_empty() {
println!("No real-time channels configured. Run `zeroclaw onboard` first.");
return Ok(());
}
println!("🩺 ZeroClaw Channel Doctor");
println!();
let mut healthy = 0_u32;
let mut unhealthy = 0_u32;
let mut timeout = 0_u32;
for (name, channel) in channels {
let result = tokio::time::timeout(Duration::from_secs(10), channel.health_check()).await;
let state = classify_health_result(&result);
match state {
ChannelHealthState::Healthy => {
healthy += 1;
println!("{name:<9} healthy");
}
ChannelHealthState::Unhealthy => {
unhealthy += 1;
println!("{name:<9} unhealthy (auth/config/network)");
}
ChannelHealthState::Timeout => {
timeout += 1;
println!(" ⏱️ {name:<9} timed out (>10s)");
}
}
}
if config.channels_config.webhook.is_some() {
println!(" Webhook check via `zeroclaw gateway` then GET /health");
}
println!();
println!("Summary: {healthy} healthy, {unhealthy} unhealthy, {timeout} timed out");
Ok(())
}
/// Start all configured channels and route messages to the agent
#[allow(clippy::too_many_lines)]
pub async fn start_channels(config: Config) -> Result<()> {
let provider: Arc<dyn Provider> = Arc::from(providers::create_provider(
config.default_provider.as_deref().unwrap_or("openrouter"),
config.api_key.as_deref(),
)?);
let model = config
.default_model
.clone()
.unwrap_or_else(|| "anthropic/claude-sonnet-4-20250514".into());
let temperature = config.default_temperature;
let mem: Arc<dyn Memory> = Arc::from(memory::create_memory(
&config.memory,
&config.workspace_dir,
config.api_key.as_deref(),
)?);
// Build system prompt from workspace identity files + skills
let workspace = config.workspace_dir.clone();
let skills = crate::skills::load_skills(&workspace);
// Collect tool descriptions for the prompt
let mut tool_descs: Vec<(&str, &str)> = vec![
("shell", "Execute terminal commands"),
("file_read", "Read file contents"),
("file_write", "Write file contents"),
("memory_store", "Save to memory"),
("memory_recall", "Search memory"),
("memory_forget", "Delete a memory entry"),
];
if config.browser.enabled {
tool_descs.push((
"browser_open",
"Open approved HTTPS URLs in Brave Browser (allowlist-only, no scraping)",
));
}
let system_prompt = build_system_prompt(&workspace, &model, &tool_descs, &skills);
if !skills.is_empty() {
println!(
" 🧩 Skills: {}",
skills
.iter()
.map(|s| s.name.as_str())
.collect::<Vec<_>>()
.join(", ")
);
}
// Collect active channels
let mut channels: Vec<Arc<dyn Channel>> = Vec::new();
if let Some(ref tg) = config.channels_config.telegram {
channels.push(Arc::new(TelegramChannel::new(
tg.bot_token.clone(),
tg.allowed_users.clone(),
)));
}
if let Some(ref dc) = config.channels_config.discord {
channels.push(Arc::new(DiscordChannel::new(
dc.bot_token.clone(),
dc.guild_id.clone(),
dc.allowed_users.clone(),
)));
}
if let Some(ref sl) = config.channels_config.slack {
channels.push(Arc::new(SlackChannel::new(
sl.bot_token.clone(),
sl.channel_id.clone(),
sl.allowed_users.clone(),
)));
}
if let Some(ref im) = config.channels_config.imessage {
channels.push(Arc::new(IMessageChannel::new(im.allowed_contacts.clone())));
}
if let Some(ref mx) = config.channels_config.matrix {
channels.push(Arc::new(MatrixChannel::new(
mx.homeserver.clone(),
mx.access_token.clone(),
mx.room_id.clone(),
mx.allowed_users.clone(),
)));
}
if channels.is_empty() {
println!("No channels configured. Run `zeroclaw onboard` to set up channels.");
return Ok(());
}
println!("🦀 ZeroClaw Channel Server");
println!(" 🤖 Model: {model}");
println!(
" 🧠 Memory: {} (auto-save: {})",
config.memory.backend,
if config.memory.auto_save { "on" } else { "off" }
);
println!(
" 📡 Channels: {}",
channels
.iter()
.map(|c| c.name())
.collect::<Vec<_>>()
.join(", ")
);
println!();
println!(" Listening for messages... (Ctrl+C to stop)");
println!();
// Single message bus — all channels send messages here
let (tx, mut rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(100);
// Spawn a listener for each channel
let mut handles = Vec::new();
for ch in &channels {
let ch = ch.clone();
let tx = tx.clone();
handles.push(tokio::spawn(async move {
if let Err(e) = ch.listen(tx).await {
tracing::error!("Channel {} error: {e}", ch.name());
}
}));
}
drop(tx); // Drop our copy so rx closes when all channels stop
// Process incoming messages — call the LLM and reply
while let Some(msg) = rx.recv().await {
println!(
" 💬 [{}] from {}: {}",
msg.channel,
msg.sender,
if msg.content.len() > 80 {
format!("{}...", &msg.content[..80])
} else {
msg.content.clone()
}
);
// Auto-save to memory
if config.memory.auto_save {
let _ = mem
.store(
&format!("{}_{}", msg.channel, msg.sender),
&msg.content,
crate::memory::MemoryCategory::Conversation,
)
.await;
}
// Call the LLM with system prompt (identity + soul + tools)
match provider
.chat_with_system(Some(&system_prompt), &msg.content, &model, temperature)
.await
{
Ok(response) => {
println!(
" 🤖 Reply: {}",
if response.len() > 80 {
format!("{}...", &response[..80])
} else {
response.clone()
}
);
// Find the channel that sent this message and reply
for ch in &channels {
if ch.name() == msg.channel {
if let Err(e) = ch.send(&response, &msg.sender).await {
eprintln!(" ❌ Failed to reply on {}: {e}", ch.name());
}
break;
}
}
}
Err(e) => {
eprintln!(" ❌ LLM error: {e}");
for ch in &channels {
if ch.name() == msg.channel {
let _ = ch.send(&format!("⚠️ Error: {e}"), &msg.sender).await;
break;
}
}
}
}
}
// Wait for all channel tasks
for h in handles {
let _ = h.await;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
fn make_workspace() -> TempDir {
let tmp = TempDir::new().unwrap();
// Create minimal workspace files
std::fs::write(tmp.path().join("SOUL.md"), "# Soul\nBe helpful.").unwrap();
std::fs::write(tmp.path().join("IDENTITY.md"), "# Identity\nName: ZeroClaw").unwrap();
std::fs::write(tmp.path().join("USER.md"), "# User\nName: Test User").unwrap();
std::fs::write(
tmp.path().join("AGENTS.md"),
"# Agents\nFollow instructions.",
)
.unwrap();
std::fs::write(tmp.path().join("TOOLS.md"), "# Tools\nUse shell carefully.").unwrap();
std::fs::write(
tmp.path().join("HEARTBEAT.md"),
"# Heartbeat\nCheck status.",
)
.unwrap();
std::fs::write(tmp.path().join("MEMORY.md"), "# Memory\nUser likes Rust.").unwrap();
tmp
}
#[test]
fn prompt_contains_all_sections() {
let ws = make_workspace();
let tools = vec![("shell", "Run commands"), ("file_read", "Read files")];
let prompt = build_system_prompt(ws.path(), "test-model", &tools, &[]);
// Section headers
assert!(prompt.contains("## Tools"), "missing Tools section");
assert!(prompt.contains("## Safety"), "missing Safety section");
assert!(prompt.contains("## Workspace"), "missing Workspace section");
assert!(
prompt.contains("## Project Context"),
"missing Project Context"
);
assert!(
prompt.contains("## Current Date & Time"),
"missing Date/Time"
);
assert!(prompt.contains("## Runtime"), "missing Runtime section");
}
#[test]
fn prompt_injects_tools() {
let ws = make_workspace();
let tools = vec![
("shell", "Run commands"),
("memory_recall", "Search memory"),
];
let prompt = build_system_prompt(ws.path(), "gpt-4o", &tools, &[]);
assert!(prompt.contains("**shell**"));
assert!(prompt.contains("Run commands"));
assert!(prompt.contains("**memory_recall**"));
}
#[test]
fn prompt_injects_safety() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[]);
assert!(prompt.contains("Do not exfiltrate private data"));
assert!(prompt.contains("Do not run destructive commands"));
assert!(prompt.contains("Prefer `trash` over `rm`"));
}
#[test]
fn prompt_injects_workspace_files() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[]);
assert!(prompt.contains("### SOUL.md"), "missing SOUL.md header");
assert!(prompt.contains("Be helpful"), "missing SOUL content");
assert!(prompt.contains("### IDENTITY.md"), "missing IDENTITY.md");
assert!(
prompt.contains("Name: ZeroClaw"),
"missing IDENTITY content"
);
assert!(prompt.contains("### USER.md"), "missing USER.md");
assert!(prompt.contains("### AGENTS.md"), "missing AGENTS.md");
assert!(prompt.contains("### TOOLS.md"), "missing TOOLS.md");
assert!(prompt.contains("### HEARTBEAT.md"), "missing HEARTBEAT.md");
assert!(prompt.contains("### MEMORY.md"), "missing MEMORY.md");
assert!(prompt.contains("User likes Rust"), "missing MEMORY content");
}
#[test]
fn prompt_missing_file_markers() {
let tmp = TempDir::new().unwrap();
// Empty workspace — no files at all
let prompt = build_system_prompt(tmp.path(), "model", &[], &[]);
assert!(prompt.contains("[File not found: SOUL.md]"));
assert!(prompt.contains("[File not found: AGENTS.md]"));
assert!(prompt.contains("[File not found: IDENTITY.md]"));
}
#[test]
fn prompt_bootstrap_only_if_exists() {
let ws = make_workspace();
// No BOOTSTRAP.md — should not appear
let prompt = build_system_prompt(ws.path(), "model", &[], &[]);
assert!(
!prompt.contains("### BOOTSTRAP.md"),
"BOOTSTRAP.md should not appear when missing"
);
// Create BOOTSTRAP.md — should appear
std::fs::write(ws.path().join("BOOTSTRAP.md"), "# Bootstrap\nFirst run.").unwrap();
let prompt2 = build_system_prompt(ws.path(), "model", &[], &[]);
assert!(
prompt2.contains("### BOOTSTRAP.md"),
"BOOTSTRAP.md should appear when present"
);
assert!(prompt2.contains("First run"));
}
#[test]
fn prompt_no_daily_memory_injection() {
let ws = make_workspace();
let memory_dir = ws.path().join("memory");
std::fs::create_dir_all(&memory_dir).unwrap();
let today = chrono::Local::now().format("%Y-%m-%d").to_string();
std::fs::write(
memory_dir.join(format!("{today}.md")),
"# Daily\nSome note.",
)
.unwrap();
let prompt = build_system_prompt(ws.path(), "model", &[], &[]);
// Daily notes should NOT be in the system prompt (on-demand via tools)
assert!(
!prompt.contains("Daily Notes"),
"daily notes should not be auto-injected"
);
assert!(
!prompt.contains("Some note"),
"daily content should not be in prompt"
);
}
#[test]
fn prompt_runtime_metadata() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "claude-sonnet-4", &[], &[]);
assert!(prompt.contains("Model: claude-sonnet-4"));
assert!(prompt.contains(&format!("OS: {}", std::env::consts::OS)));
assert!(prompt.contains("Host:"));
}
#[test]
fn prompt_skills_compact_list() {
let ws = make_workspace();
let skills = vec![crate::skills::Skill {
name: "code-review".into(),
description: "Review code for bugs".into(),
version: "1.0.0".into(),
author: None,
tags: vec![],
tools: vec![],
prompts: vec!["Long prompt content that should NOT appear in system prompt".into()],
}];
let prompt = build_system_prompt(ws.path(), "model", &[], &skills);
assert!(prompt.contains("<available_skills>"), "missing skills XML");
assert!(prompt.contains("<name>code-review</name>"));
assert!(prompt.contains("<description>Review code for bugs</description>"));
assert!(prompt.contains("SKILL.md</location>"));
assert!(
prompt.contains("loaded on demand"),
"should mention on-demand loading"
);
// Full prompt content should NOT be dumped
assert!(!prompt.contains("Long prompt content that should NOT appear"));
}
#[test]
fn prompt_truncation() {
let ws = make_workspace();
// Write a file larger than BOOTSTRAP_MAX_CHARS
let big_content = "x".repeat(BOOTSTRAP_MAX_CHARS + 1000);
std::fs::write(ws.path().join("AGENTS.md"), &big_content).unwrap();
let prompt = build_system_prompt(ws.path(), "model", &[], &[]);
assert!(
prompt.contains("truncated at"),
"large files should be truncated"
);
assert!(
!prompt.contains(&big_content),
"full content should not appear"
);
}
#[test]
fn prompt_empty_files_skipped() {
let ws = make_workspace();
std::fs::write(ws.path().join("TOOLS.md"), "").unwrap();
let prompt = build_system_prompt(ws.path(), "model", &[], &[]);
// Empty file should not produce a header
assert!(
!prompt.contains("### TOOLS.md"),
"empty files should be skipped"
);
}
#[test]
fn prompt_workspace_path() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[]);
assert!(prompt.contains(&format!("Working directory: `{}`", ws.path().display())));
}
#[test]
fn classify_health_ok_true() {
let state = classify_health_result(&Ok(true));
assert_eq!(state, ChannelHealthState::Healthy);
}
#[test]
fn classify_health_ok_false() {
let state = classify_health_result(&Ok(false));
assert_eq!(state, ChannelHealthState::Unhealthy);
}
#[tokio::test]
async fn classify_health_timeout() {
let result = tokio::time::timeout(Duration::from_millis(1), async {
tokio::time::sleep(Duration::from_millis(20)).await;
true
})
.await;
let state = classify_health_result(&result);
assert_eq!(state, ChannelHealthState::Timeout);
}
}