use crate::config::Config; use anyhow::Result; use chrono::Utc; use std::future::Future; use std::path::PathBuf; use tokio::task::JoinHandle; use tokio::time::Duration; const STATUS_FLUSH_SECONDS: u64 = 5; pub async fn run(config: Config, host: String, port: u16) -> Result<()> { let initial_backoff = config.reliability.channel_initial_backoff_secs.max(1); let max_backoff = config .reliability .channel_max_backoff_secs .max(initial_backoff); crate::health::mark_component_ok("daemon"); if config.heartbeat.enabled { let _ = crate::heartbeat::engine::HeartbeatEngine::ensure_heartbeat_file(&config.workspace_dir) .await; } let mut handles: Vec> = vec![spawn_state_writer(config.clone())]; { let gateway_cfg = config.clone(); let gateway_host = host.clone(); handles.push(spawn_component_supervisor( "gateway", initial_backoff, max_backoff, move || { let cfg = gateway_cfg.clone(); let host = gateway_host.clone(); async move { crate::gateway::run_gateway(&host, port, cfg).await } }, )); } { if has_supervised_channels(&config) { let channels_cfg = config.clone(); handles.push(spawn_component_supervisor( "channels", initial_backoff, max_backoff, move || { let cfg = channels_cfg.clone(); async move { crate::channels::start_channels(cfg).await } }, )); } else { crate::health::mark_component_ok("channels"); tracing::info!("No real-time channels configured; channel supervisor disabled"); } } if config.heartbeat.enabled { let heartbeat_cfg = config.clone(); handles.push(spawn_component_supervisor( "heartbeat", initial_backoff, max_backoff, move || { let cfg = heartbeat_cfg.clone(); async move { run_heartbeat_worker(cfg).await } }, )); } { let scheduler_cfg = config.clone(); handles.push(spawn_component_supervisor( "scheduler", initial_backoff, max_backoff, move || { let cfg = scheduler_cfg.clone(); async move { crate::cron::scheduler::run(cfg).await } }, )); } println!("🧠 ZeroClaw daemon started"); println!(" Gateway: http://{host}:{port}"); println!(" Components: gateway, channels, heartbeat, scheduler"); println!(" Ctrl+C to stop"); tokio::signal::ctrl_c().await?; crate::health::mark_component_error("daemon", "shutdown requested"); for handle in &handles { handle.abort(); } for handle in handles { let _ = handle.await; } Ok(()) } pub fn state_file_path(config: &Config) -> PathBuf { config .config_path .parent() .map_or_else(|| PathBuf::from("."), PathBuf::from) .join("daemon_state.json") } fn spawn_state_writer(config: Config) -> JoinHandle<()> { tokio::spawn(async move { let path = state_file_path(&config); if let Some(parent) = path.parent() { let _ = tokio::fs::create_dir_all(parent).await; } let mut interval = tokio::time::interval(Duration::from_secs(STATUS_FLUSH_SECONDS)); loop { interval.tick().await; let mut json = crate::health::snapshot_json(); if let Some(obj) = json.as_object_mut() { obj.insert( "written_at".into(), serde_json::json!(Utc::now().to_rfc3339()), ); } let data = serde_json::to_vec_pretty(&json).unwrap_or_else(|_| b"{}".to_vec()); let _ = tokio::fs::write(&path, data).await; } }) } fn spawn_component_supervisor( name: &'static str, initial_backoff_secs: u64, max_backoff_secs: u64, mut run_component: F, ) -> JoinHandle<()> where F: FnMut() -> Fut + Send + 'static, Fut: Future> + Send + 'static, { tokio::spawn(async move { let mut backoff = initial_backoff_secs.max(1); let max_backoff = max_backoff_secs.max(backoff); loop { crate::health::mark_component_ok(name); match run_component().await { Ok(()) => { crate::health::mark_component_error(name, "component exited unexpectedly"); tracing::warn!("Daemon component '{name}' exited unexpectedly"); // Clean exit — reset backoff since the component ran successfully backoff = initial_backoff_secs.max(1); } Err(e) => { crate::health::mark_component_error(name, e.to_string()); tracing::error!("Daemon component '{name}' failed: {e}"); } } crate::health::bump_component_restart(name); tokio::time::sleep(Duration::from_secs(backoff)).await; // Double backoff AFTER sleeping so first error uses initial_backoff backoff = backoff.saturating_mul(2).min(max_backoff); } }) } async fn run_heartbeat_worker(config: Config) -> Result<()> { let observer: std::sync::Arc = std::sync::Arc::from(crate::observability::create_observer(&config.observability)); let engine = crate::heartbeat::engine::HeartbeatEngine::new( config.heartbeat.clone(), config.workspace_dir.clone(), observer, ); let interval_mins = config.heartbeat.interval_minutes.max(5); let mut interval = tokio::time::interval(Duration::from_secs(u64::from(interval_mins) * 60)); loop { interval.tick().await; let tasks = engine.collect_tasks().await?; if tasks.is_empty() { continue; } for task in tasks { let prompt = format!("[Heartbeat Task] {task}"); let temp = config.default_temperature; if let Err(e) = crate::agent::run(config.clone(), Some(prompt), None, None, temp, vec![]).await { crate::health::mark_component_error("heartbeat", e.to_string()); tracing::warn!("Heartbeat task failed: {e}"); } else { crate::health::mark_component_ok("heartbeat"); } } } } fn has_supervised_channels(config: &Config) -> bool { config.channels_config.telegram.is_some() || config.channels_config.discord.is_some() || config.channels_config.slack.is_some() || config.channels_config.imessage.is_some() || config.channels_config.matrix.is_some() || config.channels_config.whatsapp.is_some() || config.channels_config.email.is_some() } #[cfg(test)] mod tests { use super::*; use tempfile::TempDir; fn test_config(tmp: &TempDir) -> Config { let config = Config { workspace_dir: tmp.path().join("workspace"), config_path: tmp.path().join("config.toml"), ..Config::default() }; std::fs::create_dir_all(&config.workspace_dir).unwrap(); config } #[test] fn state_file_path_uses_config_directory() { let tmp = TempDir::new().unwrap(); let config = test_config(&tmp); let path = state_file_path(&config); assert_eq!(path, tmp.path().join("daemon_state.json")); } #[tokio::test] async fn supervisor_marks_error_and_restart_on_failure() { let handle = spawn_component_supervisor("daemon-test-fail", 1, 1, || async { anyhow::bail!("boom") }); tokio::time::sleep(Duration::from_millis(50)).await; handle.abort(); let _ = handle.await; let snapshot = crate::health::snapshot_json(); let component = &snapshot["components"]["daemon-test-fail"]; assert_eq!(component["status"], "error"); assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1); assert!(component["last_error"] .as_str() .unwrap_or("") .contains("boom")); } #[tokio::test] async fn supervisor_marks_unexpected_exit_as_error() { let handle = spawn_component_supervisor("daemon-test-exit", 1, 1, || async { Ok(()) }); tokio::time::sleep(Duration::from_millis(50)).await; handle.abort(); let _ = handle.await; let snapshot = crate::health::snapshot_json(); let component = &snapshot["components"]["daemon-test-exit"]; assert_eq!(component["status"], "error"); assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1); assert!(component["last_error"] .as_str() .unwrap_or("") .contains("component exited unexpectedly")); } #[test] fn detects_no_supervised_channels() { let config = Config::default(); assert!(!has_supervised_channels(&config)); } #[test] fn detects_supervised_channels_present() { let mut config = Config::default(); config.channels_config.telegram = Some(crate::config::TelegramConfig { bot_token: "token".into(), allowed_users: vec![], }); assert!(has_supervised_channels(&config)); } }