zeroclaw/src/daemon/mod.rs
bhagwan 55f2637cfe feat(channel): add Signal channel via signal-cli JSON-RPC daemon
Adds a new Signal messaging channel that connects to a running
signal-cli daemon's native HTTP API (JSON-RPC + SSE).

  [channels_config.signal]
  http_url = "http://127.0.0.1:8686"
  account = "+1234567890"
  group_id = "group_id"  # optional, omit for all
  allowed_from = ["+1111111111"]
  ignore_attachments = true
  ignore_stories = true

Implementation:
- SSE listener at /api/v1/events for incoming messages
- JSON-RPC sends via /api/v1/rpc (method: send)
- Health check via /api/v1/check
- Typing indicators via sendTyping RPC
- Supports DMs and group messages (room_id filtering)
- Allowlist-based sender filtering (E.164 or wildcard)
- Optional attachment/story filtering
- Fixed has_supervised_channels() to include signal + irc/lark/dingtalk

Registered in channel list, doctor, start, integrations registry, and
daemon supervisor gate. Includes unit tests for config serde, sender
filtering, room matching, envelope processing, and deserialization.

No new dependencies (uses existing uuid, futures-util, reqwest).
2026-02-17 22:35:33 +08:00

302 lines
9.8 KiB
Rust

use crate::config::Config;
use anyhow::Result;
use chrono::Utc;
use std::future::Future;
use std::path::PathBuf;
use tokio::task::JoinHandle;
use tokio::time::Duration;
/// Seconds between flushes of the health snapshot to `daemon_state.json`.
const STATUS_FLUSH_SECONDS: u64 = 5;
/// Run the daemon: spawn the state writer plus supervised components
/// (gateway, channels, heartbeat, scheduler) and block until Ctrl+C.
///
/// Each component runs under [`spawn_component_supervisor`], which restarts
/// it with exponential backoff bounded by the reliability settings in
/// `config`. Components that are disabled by configuration are immediately
/// marked healthy so they do not linger in an unknown state in the health
/// snapshot.
///
/// # Errors
/// Returns an error only if waiting for the Ctrl+C signal fails.
pub async fn run(config: Config, host: String, port: u16) -> Result<()> {
    // Backoff bounds: never below 1s, and the cap never below the initial value.
    let initial_backoff = config.reliability.channel_initial_backoff_secs.max(1);
    let max_backoff = config
        .reliability
        .channel_max_backoff_secs
        .max(initial_backoff);
    crate::health::mark_component_ok("daemon");
    if config.heartbeat.enabled {
        // Best-effort: a failure to pre-create the heartbeat file is not fatal.
        let _ =
            crate::heartbeat::engine::HeartbeatEngine::ensure_heartbeat_file(&config.workspace_dir)
                .await;
    }
    let mut handles: Vec<JoinHandle<()>> = vec![spawn_state_writer(config.clone())];
    // Track which supervised components actually start so the banner below
    // reflects reality instead of a hard-coded list.
    let mut components: Vec<&str> = vec!["gateway"];
    {
        let gateway_cfg = config.clone();
        let gateway_host = host.clone();
        handles.push(spawn_component_supervisor(
            "gateway",
            initial_backoff,
            max_backoff,
            move || {
                let cfg = gateway_cfg.clone();
                let host = gateway_host.clone();
                async move { crate::gateway::run_gateway(&host, port, cfg).await }
            },
        ));
    }
    if has_supervised_channels(&config) {
        let channels_cfg = config.clone();
        components.push("channels");
        handles.push(spawn_component_supervisor(
            "channels",
            initial_backoff,
            max_backoff,
            move || {
                let cfg = channels_cfg.clone();
                async move { crate::channels::start_channels(cfg).await }
            },
        ));
    } else {
        crate::health::mark_component_ok("channels");
        tracing::info!("No real-time channels configured; channel supervisor disabled");
    }
    if config.heartbeat.enabled {
        let heartbeat_cfg = config.clone();
        components.push("heartbeat");
        handles.push(spawn_component_supervisor(
            "heartbeat",
            initial_backoff,
            max_backoff,
            move || {
                let cfg = heartbeat_cfg.clone();
                async move { run_heartbeat_worker(cfg).await }
            },
        ));
    } else {
        // Mirror the channels/scheduler branches: a disabled heartbeat is
        // healthy by definition, not missing.
        crate::health::mark_component_ok("heartbeat");
        tracing::info!("Heartbeat disabled; heartbeat supervisor not started");
    }
    if config.cron.enabled {
        let scheduler_cfg = config.clone();
        components.push("scheduler");
        handles.push(spawn_component_supervisor(
            "scheduler",
            initial_backoff,
            max_backoff,
            move || {
                let cfg = scheduler_cfg.clone();
                async move { crate::cron::scheduler::run(cfg).await }
            },
        ));
    } else {
        crate::health::mark_component_ok("scheduler");
        tracing::info!("Cron disabled; scheduler supervisor not started");
    }
    println!("🧠 ZeroClaw daemon started");
    println!(" Gateway: http://{host}:{port}");
    println!(" Components: {}", components.join(", "));
    println!(" Ctrl+C to stop");
    tokio::signal::ctrl_c().await?;
    crate::health::mark_component_error("daemon", "shutdown requested");
    // Abort every background task first, then await each handle so the
    // aborts have settled before we return.
    for handle in &handles {
        handle.abort();
    }
    for handle in handles {
        let _ = handle.await;
    }
    Ok(())
}
/// Path of the daemon's periodically flushed state file.
///
/// The file (`daemon_state.json`) lives next to the configuration file; if
/// the config path has no parent directory, the current directory is used.
pub fn state_file_path(config: &Config) -> PathBuf {
    let base = match config.config_path.parent() {
        Some(dir) => PathBuf::from(dir),
        None => PathBuf::from("."),
    };
    base.join("daemon_state.json")
}
/// Spawn a background task that writes the health snapshot to
/// `daemon_state.json` every `STATUS_FLUSH_SECONDS` seconds.
///
/// All filesystem errors are deliberately ignored: the state file is
/// advisory, and a failed flush simply leaves the previous contents.
fn spawn_state_writer(config: Config) -> JoinHandle<()> {
    tokio::spawn(async move {
        let state_path = state_file_path(&config);
        if let Some(dir) = state_path.parent() {
            // Best-effort; if this fails, the write below fails and is ignored.
            let _ = tokio::fs::create_dir_all(dir).await;
        }
        let mut ticker = tokio::time::interval(Duration::from_secs(STATUS_FLUSH_SECONDS));
        loop {
            ticker.tick().await;
            let mut snapshot = crate::health::snapshot_json();
            // Stamp the snapshot so readers can tell how fresh it is.
            if let Some(map) = snapshot.as_object_mut() {
                map.insert(
                    "written_at".into(),
                    serde_json::json!(Utc::now().to_rfc3339()),
                );
            }
            let bytes = serde_json::to_vec_pretty(&snapshot).unwrap_or_else(|_| b"{}".to_vec());
            let _ = tokio::fs::write(&state_path, bytes).await;
        }
    })
}
/// Supervise a long-running daemon component.
///
/// Runs `run_component` in an endless loop. Every exit — clean or failed —
/// is recorded as an error in the health registry, bumps the restart
/// counter, and is followed by a sleep. The sleep grows exponentially
/// (doubling, capped at `max_backoff_secs`) across consecutive failures and
/// resets to the initial value after a clean exit.
fn spawn_component_supervisor<F, Fut>(
    name: &'static str,
    initial_backoff_secs: u64,
    max_backoff_secs: u64,
    mut run_component: F,
) -> JoinHandle<()>
where
    F: FnMut() -> Fut + Send + 'static,
    Fut: Future<Output = Result<()>> + Send + 'static,
{
    tokio::spawn(async move {
        // Clamp the bounds so the delay is always >= 1s and floor <= ceiling.
        let floor = initial_backoff_secs.max(1);
        let ceiling = max_backoff_secs.max(floor);
        let mut delay = floor;
        loop {
            crate::health::mark_component_ok(name);
            match run_component().await {
                Ok(()) => {
                    // Components are expected to run forever, so even a clean
                    // return is flagged — but it resets the backoff because
                    // the component did run successfully.
                    crate::health::mark_component_error(name, "component exited unexpectedly");
                    tracing::warn!("Daemon component '{name}' exited unexpectedly");
                    delay = floor;
                }
                Err(e) => {
                    crate::health::mark_component_error(name, e.to_string());
                    tracing::error!("Daemon component '{name}' failed: {e}");
                }
            }
            crate::health::bump_component_restart(name);
            tokio::time::sleep(Duration::from_secs(delay)).await;
            // Grow the delay only after sleeping so the first failure waits
            // the initial backoff, not double it.
            delay = delay.saturating_mul(2).min(ceiling);
        }
    })
}
/// Periodically collect heartbeat tasks and run each one through the agent.
///
/// The tick interval is clamped to a 5-minute minimum. Failures of
/// individual tasks are logged and recorded in the health registry without
/// stopping the worker; an error while *collecting* tasks propagates out so
/// the supervisor restarts this worker with backoff.
async fn run_heartbeat_worker(config: Config) -> Result<()> {
    let observer: std::sync::Arc<dyn crate::observability::Observer> =
        std::sync::Arc::from(crate::observability::create_observer(&config.observability));
    let engine = crate::heartbeat::engine::HeartbeatEngine::new(
        config.heartbeat.clone(),
        config.workspace_dir.clone(),
        observer,
    );
    // Floor of 5 minutes to avoid hammering the agent with tiny intervals.
    let minutes = config.heartbeat.interval_minutes.max(5);
    let mut ticker = tokio::time::interval(Duration::from_secs(u64::from(minutes) * 60));
    loop {
        ticker.tick().await;
        // Iterating an empty task list is a no-op, so no emptiness guard needed.
        for task in engine.collect_tasks().await? {
            let prompt = format!("[Heartbeat Task] {task}");
            let temp = config.default_temperature;
            match crate::agent::run(config.clone(), Some(prompt), None, None, temp, vec![]).await {
                Ok(_) => crate::health::mark_component_ok("heartbeat"),
                Err(e) => {
                    crate::health::mark_component_error("heartbeat", e.to_string());
                    tracing::warn!("Heartbeat task failed: {e}");
                }
            }
        }
    }
}
/// Returns `true` when at least one real-time messaging channel is
/// configured, i.e. the channel supervisor has work to do.
fn has_supervised_channels(config: &Config) -> bool {
    let channels = &config.channels_config;
    [
        channels.telegram.is_some(),
        channels.discord.is_some(),
        channels.slack.is_some(),
        channels.imessage.is_some(),
        channels.matrix.is_some(),
        channels.signal.is_some(),
        channels.whatsapp.is_some(),
        channels.email.is_some(),
        channels.irc.is_some(),
        channels.lark.is_some(),
        channels.dingtalk.is_some(),
    ]
    .contains(&true)
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a minimal `Config` rooted in a temp dir, with the workspace
    /// directory pre-created so components that touch it don't fail.
    fn test_config(tmp: &TempDir) -> Config {
        let config = Config {
            workspace_dir: tmp.path().join("workspace"),
            config_path: tmp.path().join("config.toml"),
            ..Config::default()
        };
        std::fs::create_dir_all(&config.workspace_dir).unwrap();
        config
    }

    /// The state file must live next to the config file.
    #[test]
    fn state_file_path_uses_config_directory() {
        let tmp = TempDir::new().unwrap();
        let config = test_config(&tmp);
        let path = state_file_path(&config);
        assert_eq!(path, tmp.path().join("daemon_state.json"));
    }

    /// A component that returns `Err` should be marked errored with the
    /// failure message recorded and the restart counter bumped.
    /// NOTE(review): relies on the process-global health registry and a 50ms
    /// sleep to let the supervisor loop run at least once.
    #[tokio::test]
    async fn supervisor_marks_error_and_restart_on_failure() {
        let handle = spawn_component_supervisor("daemon-test-fail", 1, 1, || async {
            anyhow::bail!("boom")
        });
        tokio::time::sleep(Duration::from_millis(50)).await;
        handle.abort();
        let _ = handle.await;
        let snapshot = crate::health::snapshot_json();
        let component = &snapshot["components"]["daemon-test-fail"];
        assert_eq!(component["status"], "error");
        assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
        assert!(component["last_error"]
            .as_str()
            .unwrap_or("")
            .contains("boom"));
    }

    /// Even a *clean* `Ok(())` return is treated as a fault, since
    /// components are expected to run forever.
    #[tokio::test]
    async fn supervisor_marks_unexpected_exit_as_error() {
        let handle = spawn_component_supervisor("daemon-test-exit", 1, 1, || async { Ok(()) });
        tokio::time::sleep(Duration::from_millis(50)).await;
        handle.abort();
        let _ = handle.await;
        let snapshot = crate::health::snapshot_json();
        let component = &snapshot["components"]["daemon-test-exit"];
        assert_eq!(component["status"], "error");
        assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
        assert!(component["last_error"]
            .as_str()
            .unwrap_or("")
            .contains("component exited unexpectedly"));
    }

    /// Default config has no channels, so no supervisor should start.
    #[test]
    fn detects_no_supervised_channels() {
        let config = Config::default();
        assert!(!has_supervised_channels(&config));
    }

    /// Any single configured channel (telegram here) is enough.
    #[test]
    fn detects_supervised_channels_present() {
        let mut config = Config::default();
        config.channels_config.telegram = Some(crate::config::TelegramConfig {
            bot_token: "token".into(),
            allowed_users: vec![],
        });
        assert!(has_supervised_channels(&config));
    }
}