pub mod cli; pub mod discord; pub mod email_channel; pub mod imessage; pub mod matrix; pub mod slack; pub mod telegram; pub mod traits; pub mod whatsapp; pub use cli::CliChannel; pub use discord::DiscordChannel; pub use imessage::IMessageChannel; pub use matrix::MatrixChannel; pub use slack::SlackChannel; pub use telegram::TelegramChannel; pub use traits::Channel; pub use whatsapp::WhatsAppChannel; use crate::config::Config; use crate::memory::{self, Memory}; use crate::providers::{self, Provider}; use crate::util::truncate_with_ellipsis; use anyhow::Result; use std::sync::Arc; use std::time::Duration; /// Maximum characters per injected workspace file (matches `OpenClaw` default). const BOOTSTRAP_MAX_CHARS: usize = 20_000; const DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS: u64 = 2; const DEFAULT_CHANNEL_MAX_BACKOFF_SECS: u64 = 60; fn spawn_supervised_listener( ch: Arc, tx: tokio::sync::mpsc::Sender, initial_backoff_secs: u64, max_backoff_secs: u64, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let component = format!("channel:{}", ch.name()); let mut backoff = initial_backoff_secs.max(1); let max_backoff = max_backoff_secs.max(backoff); loop { crate::health::mark_component_ok(&component); let result = ch.listen(tx.clone()).await; if tx.is_closed() { break; } match result { Ok(()) => { tracing::warn!("Channel {} exited unexpectedly; restarting", ch.name()); crate::health::mark_component_error(&component, "listener exited unexpectedly"); } Err(e) => { tracing::error!("Channel {} error: {e}; restarting", ch.name()); crate::health::mark_component_error(&component, e.to_string()); } } crate::health::bump_component_restart(&component); tokio::time::sleep(Duration::from_secs(backoff)).await; backoff = backoff.saturating_mul(2).min(max_backoff); } }) } /// Load workspace identity files and build a system prompt. /// /// Follows the `OpenClaw` framework structure: /// 1. Tooling — tool list + descriptions /// 2. Safety — guardrail reminder /// 3. Skills — compact list with paths (loaded on-demand) /// 4. Workspace — working directory /// 5. Bootstrap files — AGENTS, SOUL, TOOLS, IDENTITY, USER, HEARTBEAT, BOOTSTRAP, MEMORY /// 6. Date & Time — timezone for cache stability /// 7. Runtime — host, OS, model /// /// Daily memory files (`memory/*.md`) are NOT injected — they are accessed /// on-demand via `memory_recall` / `memory_search` tools. pub fn build_system_prompt( workspace_dir: &std::path::Path, model_name: &str, tools: &[(&str, &str)], skills: &[crate::skills::Skill], ) -> String { use std::fmt::Write; let mut prompt = String::with_capacity(8192); // ── 1. Tooling ────────────────────────────────────────────── if !tools.is_empty() { prompt.push_str("## Tools\n\n"); prompt.push_str("You have access to the following tools:\n\n"); for (name, desc) in tools { let _ = writeln!(prompt, "- **{name}**: {desc}"); } prompt.push('\n'); } // ── 2. Safety ─────────────────────────────────────────────── prompt.push_str("## Safety\n\n"); prompt.push_str( "- Do not exfiltrate private data.\n\ - Do not run destructive commands without asking.\n\ - Do not bypass oversight or approval mechanisms.\n\ - Prefer `trash` over `rm` (recoverable beats gone forever).\n\ - When in doubt, ask before acting externally.\n\n", ); // ── 3. Skills (compact list — load on-demand) ─────────────── if !skills.is_empty() { prompt.push_str("## Available Skills\n\n"); prompt.push_str( "Skills are loaded on demand. Use `read` on the skill path to get full instructions.\n\n", ); prompt.push_str("\n"); for skill in skills { let _ = writeln!(prompt, " "); let _ = writeln!(prompt, " {}", skill.name); let _ = writeln!( prompt, " {}", skill.description ); let location = skill.location.clone().unwrap_or_else(|| { workspace_dir .join("skills") .join(&skill.name) .join("SKILL.md") }); let _ = writeln!(prompt, " {}", location.display()); let _ = writeln!(prompt, " "); } prompt.push_str("\n\n"); } // ── 4. Workspace ──────────────────────────────────────────── let _ = writeln!( prompt, "## Workspace\n\nWorking directory: `{}`\n", workspace_dir.display() ); // ── 5. Bootstrap files (injected into context) ────────────── prompt.push_str("## Project Context\n\n"); prompt .push_str("The following workspace files define your identity, behavior, and context.\n\n"); let bootstrap_files = [ "AGENTS.md", "SOUL.md", "TOOLS.md", "IDENTITY.md", "USER.md", "HEARTBEAT.md", ]; for filename in &bootstrap_files { inject_workspace_file(&mut prompt, workspace_dir, filename); } // BOOTSTRAP.md — only if it exists (first-run ritual) let bootstrap_path = workspace_dir.join("BOOTSTRAP.md"); if bootstrap_path.exists() { inject_workspace_file(&mut prompt, workspace_dir, "BOOTSTRAP.md"); } // MEMORY.md — curated long-term memory (main session only) inject_workspace_file(&mut prompt, workspace_dir, "MEMORY.md"); // ── 6. Date & Time ────────────────────────────────────────── let now = chrono::Local::now(); let tz = now.format("%Z").to_string(); let _ = writeln!(prompt, "## Current Date & Time\n\nTimezone: {tz}\n"); // ── 7. Runtime ────────────────────────────────────────────── let host = hostname::get().map_or_else(|_| "unknown".into(), |h| h.to_string_lossy().to_string()); let _ = writeln!( prompt, "## Runtime\n\nHost: {host} | OS: {} | Model: {model_name}\n", std::env::consts::OS, ); if prompt.is_empty() { "You are ZeroClaw, a fast and efficient AI assistant built in Rust. Be helpful, concise, and direct.".to_string() } else { prompt } } /// Inject a single workspace file into the prompt with truncation and missing-file markers. fn inject_workspace_file(prompt: &mut String, workspace_dir: &std::path::Path, filename: &str) { use std::fmt::Write; let path = workspace_dir.join(filename); match std::fs::read_to_string(&path) { Ok(content) => { let trimmed = content.trim(); if trimmed.is_empty() { return; } let _ = writeln!(prompt, "### {filename}\n"); if trimmed.len() > BOOTSTRAP_MAX_CHARS { prompt.push_str(&trimmed[..BOOTSTRAP_MAX_CHARS]); let _ = writeln!( prompt, "\n\n[... truncated at {BOOTSTRAP_MAX_CHARS} chars — use `read` for full file]\n" ); } else { prompt.push_str(trimmed); prompt.push_str("\n\n"); } } Err(_) => { // Missing-file marker (matches OpenClaw behavior) let _ = writeln!(prompt, "### {filename}\n\n[File not found: {filename}]\n"); } } } pub fn handle_command(command: crate::ChannelCommands, config: &Config) -> Result<()> { match command { crate::ChannelCommands::Start => { anyhow::bail!("Start must be handled in main.rs (requires async runtime)") } crate::ChannelCommands::Doctor => { anyhow::bail!("Doctor must be handled in main.rs (requires async runtime)") } crate::ChannelCommands::List => { println!("Channels:"); println!(" ✅ CLI (always available)"); for (name, configured) in [ ("Telegram", config.channels_config.telegram.is_some()), ("Discord", config.channels_config.discord.is_some()), ("Slack", config.channels_config.slack.is_some()), ("Webhook", config.channels_config.webhook.is_some()), ("iMessage", config.channels_config.imessage.is_some()), ("Matrix", config.channels_config.matrix.is_some()), ("WhatsApp", config.channels_config.whatsapp.is_some()), ] { println!(" {} {name}", if configured { "✅" } else { "❌" }); } println!("\nTo start channels: zeroclaw channel start"); println!("To check health: zeroclaw channel doctor"); println!("To configure: zeroclaw onboard"); Ok(()) } crate::ChannelCommands::Add { channel_type, config: _, } => { anyhow::bail!( "Channel type '{channel_type}' — use `zeroclaw onboard` to configure channels" ); } crate::ChannelCommands::Remove { name } => { anyhow::bail!("Remove channel '{name}' — edit ~/.zeroclaw/config.toml directly"); } } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum ChannelHealthState { Healthy, Unhealthy, Timeout, } fn classify_health_result( result: &std::result::Result, ) -> ChannelHealthState { match result { Ok(true) => ChannelHealthState::Healthy, Ok(false) => ChannelHealthState::Unhealthy, Err(_) => ChannelHealthState::Timeout, } } /// Run health checks for configured channels. pub async fn doctor_channels(config: Config) -> Result<()> { let mut channels: Vec<(&'static str, Arc)> = Vec::new(); if let Some(ref tg) = config.channels_config.telegram { channels.push(( "Telegram", Arc::new(TelegramChannel::new( tg.bot_token.clone(), tg.allowed_users.clone(), )), )); } if let Some(ref dc) = config.channels_config.discord { channels.push(( "Discord", Arc::new(DiscordChannel::new( dc.bot_token.clone(), dc.guild_id.clone(), dc.allowed_users.clone(), )), )); } if let Some(ref sl) = config.channels_config.slack { channels.push(( "Slack", Arc::new(SlackChannel::new( sl.bot_token.clone(), sl.channel_id.clone(), sl.allowed_users.clone(), )), )); } if let Some(ref im) = config.channels_config.imessage { channels.push(( "iMessage", Arc::new(IMessageChannel::new(im.allowed_contacts.clone())), )); } if let Some(ref mx) = config.channels_config.matrix { channels.push(( "Matrix", Arc::new(MatrixChannel::new( mx.homeserver.clone(), mx.access_token.clone(), mx.room_id.clone(), mx.allowed_users.clone(), )), )); } if let Some(ref wa) = config.channels_config.whatsapp { channels.push(( "WhatsApp", Arc::new(WhatsAppChannel::new( wa.access_token.clone(), wa.phone_number_id.clone(), wa.verify_token.clone(), wa.allowed_numbers.clone(), )), )); } if channels.is_empty() { println!("No real-time channels configured. Run `zeroclaw onboard` first."); return Ok(()); } println!("🩺 ZeroClaw Channel Doctor"); println!(); let mut healthy = 0_u32; let mut unhealthy = 0_u32; let mut timeout = 0_u32; for (name, channel) in channels { let result = tokio::time::timeout(Duration::from_secs(10), channel.health_check()).await; let state = classify_health_result(&result); match state { ChannelHealthState::Healthy => { healthy += 1; println!(" ✅ {name:<9} healthy"); } ChannelHealthState::Unhealthy => { unhealthy += 1; println!(" ❌ {name:<9} unhealthy (auth/config/network)"); } ChannelHealthState::Timeout => { timeout += 1; println!(" ⏱️ {name:<9} timed out (>10s)"); } } } if config.channels_config.webhook.is_some() { println!(" ℹ️ Webhook check via `zeroclaw gateway` then GET /health"); } println!(); println!("Summary: {healthy} healthy, {unhealthy} unhealthy, {timeout} timed out"); Ok(()) } /// Start all configured channels and route messages to the agent #[allow(clippy::too_many_lines)] pub async fn start_channels(config: Config) -> Result<()> { let provider: Arc = Arc::from(providers::create_resilient_provider( config.default_provider.as_deref().unwrap_or("openrouter"), config.api_key.as_deref(), &config.reliability, )?); // Warm up the provider connection pool (TLS handshake, DNS, HTTP/2 setup) // so the first real message doesn't hit a cold-start timeout. if let Err(e) = provider.warmup().await { tracing::warn!("Provider warmup failed (non-fatal): {e}"); } let model = config .default_model .clone() .unwrap_or_else(|| "anthropic/claude-sonnet-4-20250514".into()); let temperature = config.default_temperature; let mem: Arc = Arc::from(memory::create_memory( &config.memory, &config.workspace_dir, config.api_key.as_deref(), )?); // Build system prompt from workspace identity files + skills let workspace = config.workspace_dir.clone(); let skills = crate::skills::load_skills(&workspace); // Collect tool descriptions for the prompt let mut tool_descs: Vec<(&str, &str)> = vec![ ( "shell", "Execute terminal commands. Use when: running local checks, build/test commands, diagnostics. Don't use when: a safer dedicated tool exists, or command is destructive without approval.", ), ( "file_read", "Read file contents. Use when: inspecting project files, configs, logs. Don't use when: a targeted search is enough.", ), ( "file_write", "Write file contents. Use when: applying focused edits, scaffolding files, updating docs/code. Don't use when: side effects are unclear or file ownership is uncertain.", ), ( "memory_store", "Save to memory. Use when: preserving durable preferences, decisions, key context. Don't use when: information is transient/noisy/sensitive without need.", ), ( "memory_recall", "Search memory. Use when: retrieving prior decisions, user preferences, historical context. Don't use when: answer is already in current context.", ), ( "memory_forget", "Delete a memory entry. Use when: memory is incorrect/stale or explicitly requested for removal. Don't use when: impact is uncertain.", ), ]; if config.browser.enabled { tool_descs.push(( "browser_open", "Open approved HTTPS URLs in Brave Browser (allowlist-only, no scraping)", )); } let system_prompt = build_system_prompt(&workspace, &model, &tool_descs, &skills); if !skills.is_empty() { println!( " 🧩 Skills: {}", skills .iter() .map(|s| s.name.as_str()) .collect::>() .join(", ") ); } // Collect active channels let mut channels: Vec> = Vec::new(); if let Some(ref tg) = config.channels_config.telegram { channels.push(Arc::new(TelegramChannel::new( tg.bot_token.clone(), tg.allowed_users.clone(), ))); } if let Some(ref dc) = config.channels_config.discord { channels.push(Arc::new(DiscordChannel::new( dc.bot_token.clone(), dc.guild_id.clone(), dc.allowed_users.clone(), ))); } if let Some(ref sl) = config.channels_config.slack { channels.push(Arc::new(SlackChannel::new( sl.bot_token.clone(), sl.channel_id.clone(), sl.allowed_users.clone(), ))); } if let Some(ref im) = config.channels_config.imessage { channels.push(Arc::new(IMessageChannel::new(im.allowed_contacts.clone()))); } if let Some(ref mx) = config.channels_config.matrix { channels.push(Arc::new(MatrixChannel::new( mx.homeserver.clone(), mx.access_token.clone(), mx.room_id.clone(), mx.allowed_users.clone(), ))); } if let Some(ref wa) = config.channels_config.whatsapp { channels.push(Arc::new(WhatsAppChannel::new( wa.access_token.clone(), wa.phone_number_id.clone(), wa.verify_token.clone(), wa.allowed_numbers.clone(), ))); } if channels.is_empty() { println!("No channels configured. Run `zeroclaw onboard` to set up channels."); return Ok(()); } println!("🦀 ZeroClaw Channel Server"); println!(" 🤖 Model: {model}"); println!( " 🧠 Memory: {} (auto-save: {})", config.memory.backend, if config.memory.auto_save { "on" } else { "off" } ); println!( " 📡 Channels: {}", channels .iter() .map(|c| c.name()) .collect::>() .join(", ") ); println!(); println!(" Listening for messages... (Ctrl+C to stop)"); println!(); crate::health::mark_component_ok("channels"); let initial_backoff_secs = config .reliability .channel_initial_backoff_secs .max(DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS); let max_backoff_secs = config .reliability .channel_max_backoff_secs .max(DEFAULT_CHANNEL_MAX_BACKOFF_SECS); // Single message bus — all channels send messages here let (tx, mut rx) = tokio::sync::mpsc::channel::(100); // Spawn a listener for each channel let mut handles = Vec::new(); for ch in &channels { handles.push(spawn_supervised_listener( ch.clone(), tx.clone(), initial_backoff_secs, max_backoff_secs, )); } drop(tx); // Drop our copy so rx closes when all channels stop // Process incoming messages — call the LLM and reply while let Some(msg) = rx.recv().await { println!( " 💬 [{}] from {}: {}", msg.channel, msg.sender, truncate_with_ellipsis(&msg.content, 80) ); // Auto-save to memory if config.memory.auto_save { let _ = mem .store( &format!("{}_{}", msg.channel, msg.sender), &msg.content, crate::memory::MemoryCategory::Conversation, ) .await; } // Call the LLM with system prompt (identity + soul + tools) match provider .chat_with_system(Some(&system_prompt), &msg.content, &model, temperature) .await { Ok(response) => { println!( " 🤖 Reply: {}", truncate_with_ellipsis(&response, 80) ); // Find the channel that sent this message and reply for ch in &channels { if ch.name() == msg.channel { if let Err(e) = ch.send(&response, &msg.sender).await { eprintln!(" ❌ Failed to reply on {}: {e}", ch.name()); } break; } } } Err(e) => { eprintln!(" ❌ LLM error: {e}"); for ch in &channels { if ch.name() == msg.channel { let _ = ch.send(&format!("⚠️ Error: {e}"), &msg.sender).await; break; } } } } } // Wait for all channel tasks for h in handles { let _ = h.await; } Ok(()) } #[cfg(test)] mod tests { use super::*; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tempfile::TempDir; fn make_workspace() -> TempDir { let tmp = TempDir::new().unwrap(); // Create minimal workspace files std::fs::write(tmp.path().join("SOUL.md"), "# Soul\nBe helpful.").unwrap(); std::fs::write(tmp.path().join("IDENTITY.md"), "# Identity\nName: ZeroClaw").unwrap(); std::fs::write(tmp.path().join("USER.md"), "# User\nName: Test User").unwrap(); std::fs::write( tmp.path().join("AGENTS.md"), "# Agents\nFollow instructions.", ) .unwrap(); std::fs::write(tmp.path().join("TOOLS.md"), "# Tools\nUse shell carefully.").unwrap(); std::fs::write( tmp.path().join("HEARTBEAT.md"), "# Heartbeat\nCheck status.", ) .unwrap(); std::fs::write(tmp.path().join("MEMORY.md"), "# Memory\nUser likes Rust.").unwrap(); tmp } #[test] fn prompt_contains_all_sections() { let ws = make_workspace(); let tools = vec![("shell", "Run commands"), ("file_read", "Read files")]; let prompt = build_system_prompt(ws.path(), "test-model", &tools, &[]); // Section headers assert!(prompt.contains("## Tools"), "missing Tools section"); assert!(prompt.contains("## Safety"), "missing Safety section"); assert!(prompt.contains("## Workspace"), "missing Workspace section"); assert!( prompt.contains("## Project Context"), "missing Project Context" ); assert!( prompt.contains("## Current Date & Time"), "missing Date/Time" ); assert!(prompt.contains("## Runtime"), "missing Runtime section"); } #[test] fn prompt_injects_tools() { let ws = make_workspace(); let tools = vec![ ("shell", "Run commands"), ("memory_recall", "Search memory"), ]; let prompt = build_system_prompt(ws.path(), "gpt-4o", &tools, &[]); assert!(prompt.contains("**shell**")); assert!(prompt.contains("Run commands")); assert!(prompt.contains("**memory_recall**")); } #[test] fn prompt_injects_safety() { let ws = make_workspace(); let prompt = build_system_prompt(ws.path(), "model", &[], &[]); assert!(prompt.contains("Do not exfiltrate private data")); assert!(prompt.contains("Do not run destructive commands")); assert!(prompt.contains("Prefer `trash` over `rm`")); } #[test] fn prompt_injects_workspace_files() { let ws = make_workspace(); let prompt = build_system_prompt(ws.path(), "model", &[], &[]); assert!(prompt.contains("### SOUL.md"), "missing SOUL.md header"); assert!(prompt.contains("Be helpful"), "missing SOUL content"); assert!(prompt.contains("### IDENTITY.md"), "missing IDENTITY.md"); assert!( prompt.contains("Name: ZeroClaw"), "missing IDENTITY content" ); assert!(prompt.contains("### USER.md"), "missing USER.md"); assert!(prompt.contains("### AGENTS.md"), "missing AGENTS.md"); assert!(prompt.contains("### TOOLS.md"), "missing TOOLS.md"); assert!(prompt.contains("### HEARTBEAT.md"), "missing HEARTBEAT.md"); assert!(prompt.contains("### MEMORY.md"), "missing MEMORY.md"); assert!(prompt.contains("User likes Rust"), "missing MEMORY content"); } #[test] fn prompt_missing_file_markers() { let tmp = TempDir::new().unwrap(); // Empty workspace — no files at all let prompt = build_system_prompt(tmp.path(), "model", &[], &[]); assert!(prompt.contains("[File not found: SOUL.md]")); assert!(prompt.contains("[File not found: AGENTS.md]")); assert!(prompt.contains("[File not found: IDENTITY.md]")); } #[test] fn prompt_bootstrap_only_if_exists() { let ws = make_workspace(); // No BOOTSTRAP.md — should not appear let prompt = build_system_prompt(ws.path(), "model", &[], &[]); assert!( !prompt.contains("### BOOTSTRAP.md"), "BOOTSTRAP.md should not appear when missing" ); // Create BOOTSTRAP.md — should appear std::fs::write(ws.path().join("BOOTSTRAP.md"), "# Bootstrap\nFirst run.").unwrap(); let prompt2 = build_system_prompt(ws.path(), "model", &[], &[]); assert!( prompt2.contains("### BOOTSTRAP.md"), "BOOTSTRAP.md should appear when present" ); assert!(prompt2.contains("First run")); } #[test] fn prompt_no_daily_memory_injection() { let ws = make_workspace(); let memory_dir = ws.path().join("memory"); std::fs::create_dir_all(&memory_dir).unwrap(); let today = chrono::Local::now().format("%Y-%m-%d").to_string(); std::fs::write( memory_dir.join(format!("{today}.md")), "# Daily\nSome note.", ) .unwrap(); let prompt = build_system_prompt(ws.path(), "model", &[], &[]); // Daily notes should NOT be in the system prompt (on-demand via tools) assert!( !prompt.contains("Daily Notes"), "daily notes should not be auto-injected" ); assert!( !prompt.contains("Some note"), "daily content should not be in prompt" ); } #[test] fn prompt_runtime_metadata() { let ws = make_workspace(); let prompt = build_system_prompt(ws.path(), "claude-sonnet-4", &[], &[]); assert!(prompt.contains("Model: claude-sonnet-4")); assert!(prompt.contains(&format!("OS: {}", std::env::consts::OS))); assert!(prompt.contains("Host:")); } #[test] fn prompt_skills_compact_list() { let ws = make_workspace(); let skills = vec![crate::skills::Skill { name: "code-review".into(), description: "Review code for bugs".into(), version: "1.0.0".into(), author: None, tags: vec![], tools: vec![], prompts: vec!["Long prompt content that should NOT appear in system prompt".into()], location: None, }]; let prompt = build_system_prompt(ws.path(), "model", &[], &skills); assert!(prompt.contains(""), "missing skills XML"); assert!(prompt.contains("code-review")); assert!(prompt.contains("Review code for bugs")); assert!(prompt.contains("SKILL.md")); assert!( prompt.contains("loaded on demand"), "should mention on-demand loading" ); // Full prompt content should NOT be dumped assert!(!prompt.contains("Long prompt content that should NOT appear")); } #[test] fn prompt_truncation() { let ws = make_workspace(); // Write a file larger than BOOTSTRAP_MAX_CHARS let big_content = "x".repeat(BOOTSTRAP_MAX_CHARS + 1000); std::fs::write(ws.path().join("AGENTS.md"), &big_content).unwrap(); let prompt = build_system_prompt(ws.path(), "model", &[], &[]); assert!( prompt.contains("truncated at"), "large files should be truncated" ); assert!( !prompt.contains(&big_content), "full content should not appear" ); } #[test] fn prompt_empty_files_skipped() { let ws = make_workspace(); std::fs::write(ws.path().join("TOOLS.md"), "").unwrap(); let prompt = build_system_prompt(ws.path(), "model", &[], &[]); // Empty file should not produce a header assert!( !prompt.contains("### TOOLS.md"), "empty files should be skipped" ); } #[test] fn prompt_workspace_path() { let ws = make_workspace(); let prompt = build_system_prompt(ws.path(), "model", &[], &[]); assert!(prompt.contains(&format!("Working directory: `{}`", ws.path().display()))); } #[test] fn classify_health_ok_true() { let state = classify_health_result(&Ok(true)); assert_eq!(state, ChannelHealthState::Healthy); } #[test] fn classify_health_ok_false() { let state = classify_health_result(&Ok(false)); assert_eq!(state, ChannelHealthState::Unhealthy); } #[tokio::test] async fn classify_health_timeout() { let result = tokio::time::timeout(Duration::from_millis(1), async { tokio::time::sleep(Duration::from_millis(20)).await; true }) .await; let state = classify_health_result(&result); assert_eq!(state, ChannelHealthState::Timeout); } struct AlwaysFailChannel { name: &'static str, calls: Arc, } #[async_trait::async_trait] impl Channel for AlwaysFailChannel { fn name(&self) -> &str { self.name } async fn send(&self, _message: &str, _recipient: &str) -> anyhow::Result<()> { Ok(()) } async fn listen( &self, _tx: tokio::sync::mpsc::Sender, ) -> anyhow::Result<()> { self.calls.fetch_add(1, Ordering::SeqCst); anyhow::bail!("listen boom") } } #[tokio::test] async fn supervised_listener_marks_error_and_restarts_on_failures() { let calls = Arc::new(AtomicUsize::new(0)); let channel: Arc = Arc::new(AlwaysFailChannel { name: "test-supervised-fail", calls: Arc::clone(&calls), }); let (tx, rx) = tokio::sync::mpsc::channel::(1); let handle = spawn_supervised_listener(channel, tx, 1, 1); tokio::time::sleep(Duration::from_millis(80)).await; drop(rx); handle.abort(); let _ = handle.await; let snapshot = crate::health::snapshot_json(); let component = &snapshot["components"]["channel:test-supervised-fail"]; assert_eq!(component["status"], "error"); assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1); assert!(component["last_error"] .as_str() .unwrap_or("") .contains("listen boom")); assert!(calls.load(Ordering::SeqCst) >= 1); } }