From ec2d5cc93d95c387c7c99ed2aa1c6d3bddf88858 Mon Sep 17 00:00:00 2001
From: argenis de la rosa
Date: Sat, 14 Feb 2026 11:28:39 -0500
Subject: [PATCH 1/9] feat: enhance agent personality, tool guidance, and
memory hygiene
- Expand communication style presets (professional, expressive, custom)
- Enrich SOUL.md with human-like tone and emoji-awareness guidance
- Add crash recovery and sub-task scoping guidance to AGENTS.md scaffold
- Add 'Use when / Don't use when' guidance to TOOLS.md and runtime prompts
- Implement memory hygiene system with configurable archiving and retention
- Add MemoryConfig options: hygiene_enabled, archive_after_days, purge_after_days, conversation_retention_days
- Archive old daily memory and session files to archive subdirectories
- Purge old archives and prune stale SQLite conversation rows
- Add comprehensive tests for new features
---
.tmp_todo_probe | 0
Cargo.lock | 28 ++
Cargo.toml | 3 +-
README.md | 93 ++++++-
src/agent/loop_.rs | 39 ++-
src/channels/mod.rs | 150 ++++++++++-
src/channels/telegram.rs | 42 ++-
src/config/mod.rs | 4 +-
src/config/schema.rs | 118 +++++++-
src/cron/mod.rs | 350 +++++++++++++++++++++++-
src/cron/scheduler.rs | 169 ++++++++++++
src/daemon/mod.rs | 287 ++++++++++++++++++++
src/doctor/mod.rs | 123 +++++++++
src/gateway/mod.rs | 6 +-
src/health/mod.rs | 105 ++++++++
src/heartbeat/engine.rs | 13 +-
src/main.rs | 109 +++++++-
src/memory/hygiene.rs | 538 ++++++++++++++++++++++++++++++++++++
src/memory/mod.rs | 6 +
src/migration.rs | 553 ++++++++++++++++++++++++++++++++++++++
src/onboard/mod.rs | 2 +-
src/onboard/wizard.rs | 191 +++++++++++--
src/providers/mod.rs | 66 +++++
src/providers/reliable.rs | 229 ++++++++++++++++
src/runtime/mod.rs | 58 ++--
src/security/policy.rs | 10 +-
src/service/mod.rs | 284 ++++++++++++++++++++
src/tools/file_read.rs | 59 +++-
src/tools/file_write.rs | 81 +++++-
29 files changed, 3600 insertions(+), 116 deletions(-)
create mode 100644 .tmp_todo_probe
create mode 100644 src/cron/scheduler.rs
create mode 100644 src/daemon/mod.rs
create mode 100644 src/doctor/mod.rs
create mode 100644 src/health/mod.rs
create mode 100644 src/memory/hygiene.rs
create mode 100644 src/migration.rs
create mode 100644 src/providers/reliable.rs
create mode 100644 src/service/mod.rs
diff --git a/.tmp_todo_probe b/.tmp_todo_probe
new file mode 100644
index 0000000..e69de29
diff --git a/Cargo.lock b/Cargo.lock
index 00da71f..0a9ecff 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -293,6 +293,17 @@ dependencies = [
"libc",
]
+[[package]]
+name = "cron"
+version = "0.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07"
+dependencies = [
+ "chrono",
+ "nom",
+ "once_cell",
+]
+
[[package]]
name = "crypto-common"
version = "0.1.7"
@@ -925,6 +936,12 @@ version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79"
+[[package]]
+name = "minimal-lexical"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
+
[[package]]
name = "mio"
version = "1.1.1"
@@ -936,6 +953,16 @@ dependencies = [
"windows-sys 0.61.2",
]
+[[package]]
+name = "nom"
+version = "7.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
+dependencies = [
+ "memchr",
+ "minimal-lexical",
+]
+
[[package]]
name = "nu-ansi-term"
version = "0.50.3"
@@ -2368,6 +2395,7 @@ dependencies = [
"chrono",
"clap",
"console",
+ "cron",
"dialoguer",
"directories",
"futures-util",
diff --git a/Cargo.toml b/Cargo.toml
index 08f75b0..147c9b7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,7 +15,7 @@ categories = ["command-line-utilities", "api-bindings"]
clap = { version = "4.5", features = ["derive"] }
# Async runtime - feature-optimized for size
-tokio = { version = "1.42", default-features = false, features = ["rt-multi-thread", "macros", "time", "net", "io-util", "sync", "process", "io-std", "fs"] }
+tokio = { version = "1.42", default-features = false, features = ["rt-multi-thread", "macros", "time", "net", "io-util", "sync", "process", "io-std", "fs", "signal"] }
# HTTP client - minimal features
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls", "blocking"] }
@@ -49,6 +49,7 @@ async-trait = "0.1"
# Memory / persistence
rusqlite = { version = "0.32", features = ["bundled"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
+cron = "0.12"
# Interactive CLI prompts
dialoguer = { version = "0.11", features = ["fuzzy-select"] }
diff --git a/README.md b/README.md
index 5efbbf7..8076dd4 100644
--- a/README.md
+++ b/README.md
@@ -12,12 +12,19 @@
-The fastest, smallest, fully autonomous AI assistant — deploy anywhere, swap anything.
+Fast, small, and fully autonomous AI assistant infrastructure — deploy anywhere, swap anything.
```
~3.4MB binary · <10ms startup · 1,017 tests · 22+ providers · 8 traits · Pluggable everything
```
+### Why teams pick ZeroClaw
+
+- **Lean by default:** small Rust binary, fast startup, low memory footprint.
+- **Secure by design:** pairing, strict sandboxing, explicit allowlists, workspace scoping.
+- **Fully swappable:** core systems are traits (providers, channels, tools, memory, tunnels).
+- **No lock-in:** OpenAI-compatible provider support + pluggable custom endpoints.
+
## Benchmark Snapshot (ZeroClaw vs OpenClaw)
Local machine quick benchmark (macOS arm64, Feb 2026), same host, 3 runs each.
@@ -30,7 +37,17 @@ Local machine quick benchmark (macOS arm64, Feb 2026), same host, 3 runs each.
| `--help` max RSS observed | **~7.3 MB** | **~394 MB** |
| `status` max RSS observed | **~7.8 MB** | **~1.52 GB** |
-> Notes: measured with `/usr/bin/time -l`; first run includes cold-start effects. OpenClaw results include `pnpm install` + `pnpm build` before execution.
+> Notes: measured with `/usr/bin/time -l`; first run includes cold-start effects. OpenClaw results were measured after `pnpm install` + `pnpm build`.
+
+Reproduce ZeroClaw numbers locally:
+
+```bash
+cargo build --release
+ls -lh target/release/zeroclaw
+
+/usr/bin/time -l target/release/zeroclaw --help
+/usr/bin/time -l target/release/zeroclaw status
+```
## Quick Start
@@ -38,34 +55,48 @@ Local machine quick benchmark (macOS arm64, Feb 2026), same host, 3 runs each.
git clone https://github.com/theonlyhennygod/zeroclaw.git
cd zeroclaw
cargo build --release
+cargo install --path . --force
# Quick setup (no prompts)
-cargo run --release -- onboard --api-key sk-... --provider openrouter
+zeroclaw onboard --api-key sk-... --provider openrouter
# Or interactive wizard
-cargo run --release -- onboard --interactive
+zeroclaw onboard --interactive
+
+# Or quickly repair channels/allowlists only
+zeroclaw onboard --channels-only
# Chat
-cargo run --release -- agent -m "Hello, ZeroClaw!"
+zeroclaw agent -m "Hello, ZeroClaw!"
# Interactive mode
-cargo run --release -- agent
+zeroclaw agent
# Start the gateway (webhook server)
-cargo run --release -- gateway # default: 127.0.0.1:8080
-cargo run --release -- gateway --port 0 # random port (security hardened)
+zeroclaw gateway # default: 127.0.0.1:8080
+zeroclaw gateway --port 0 # random port (security hardened)
+
+# Start full autonomous runtime
+zeroclaw daemon
# Check status
-cargo run --release -- status
+zeroclaw status
+
+# Run system diagnostics
+zeroclaw doctor
# Check channel health
-cargo run --release -- channel doctor
+zeroclaw channel doctor
# Get integration setup details
-cargo run --release -- integrations info Telegram
+zeroclaw integrations info Telegram
+
+# Manage background service
+zeroclaw service install
+zeroclaw service status
```
-> **Tip:** Run `cargo install --path .` to install `zeroclaw` globally, then use `zeroclaw` instead of `cargo run --release --`.
+> **Dev fallback (no global install):** prefix commands with `cargo run --release --` (example: `cargo run --release -- status`).
## Architecture
@@ -82,13 +113,20 @@ Every subsystem is a **trait** — swap implementations with a config change, ze
| **Memory** | `Memory` | SQLite with hybrid search (FTS5 + vector cosine similarity), Markdown | Any persistence backend |
| **Tools** | `Tool` | shell, file_read, file_write, memory_store, memory_recall, memory_forget, browser_open (Brave + allowlist), composio (optional) | Any capability |
| **Observability** | `Observer` | Noop, Log, Multi | Prometheus, OTel |
-| **Runtime** | `RuntimeAdapter` | Native (Mac/Linux/Pi) | Docker, WASM |
+| **Runtime** | `RuntimeAdapter` | Native (Mac/Linux/Pi) | Docker, WASM (planned; unsupported kinds fail fast) |
| **Security** | `SecurityPolicy` | Gateway pairing, sandbox, allowlists, rate limits, filesystem scoping, encrypted secrets | — |
| **Tunnel** | `Tunnel` | None, Cloudflare, Tailscale, ngrok, Custom | Any tunnel binary |
| **Heartbeat** | Engine | HEARTBEAT.md periodic tasks | — |
| **Skills** | Loader | TOML manifests + SKILL.md instructions | Community skill packs |
| **Integrations** | Registry | 50+ integrations across 9 categories | Plugin system |
+### Runtime support (current)
+
+- ✅ Supported today: `runtime.kind = "native"`
+- 🚧 Planned, not implemented yet: Docker / WASM / edge runtimes
+
+When an unsupported `runtime.kind` is configured, ZeroClaw now exits with a clear error instead of silently falling back to native.
+
### Memory System (Full-Stack Search Engine)
All custom, zero external dependencies — no Pinecone, no Elasticsearch, no LangChain:
@@ -124,7 +162,7 @@ ZeroClaw enforces security at **every layer** — not just the sandbox. It passe
|---|------|--------|-----|
| 1 | **Gateway not publicly exposed** | ✅ | Binds `127.0.0.1` by default. Refuses `0.0.0.0` without tunnel or explicit `allow_public_bind = true`. |
| 2 | **Pairing required** | ✅ | 6-digit one-time code on startup. Exchange via `POST /pair` for bearer token. All `/webhook` requests require `Authorization: Bearer <token>`. |
-| 3 | **Filesystem scoped (no /)** | ✅ | `workspace_only = true` by default. 14 system dirs + 4 sensitive dotfiles blocked. Null byte injection blocked. Symlink escape detection via canonicalization. |
+| 3 | **Filesystem scoped (no /)** | ✅ | `workspace_only = true` by default. 14 system dirs + 4 sensitive dotfiles blocked. Null byte injection blocked. Symlink escape detection via canonicalization + resolved-path workspace checks in file read/write tools. |
| 4 | **Access via tunnel only** | ✅ | Gateway refuses public bind without active tunnel. Supports Tailscale, Cloudflare, ngrok, or any custom tunnel. |
> **Run your own nmap:** `nmap -p 1-65535 <host>` — ZeroClaw binds to localhost only, so nothing is exposed unless you explicitly configure a tunnel.
@@ -139,6 +177,26 @@ Inbound sender policy is now consistent:
This keeps accidental exposure low by default.
+Recommended low-friction setup (secure + fast):
+
+- **Telegram:** allowlist your own `@username` (without `@`) and/or your numeric Telegram user ID.
+- **Discord:** allowlist your own Discord user ID.
+- **Slack:** allowlist your own Slack member ID (usually starts with `U`).
+- Use `"*"` only for temporary open testing.
+
+If you're not sure which identity to use:
+
+1. Start channels and send one message to your bot.
+2. Read the warning log to see the exact sender identity.
+3. Add that value to the allowlist and rerun channels-only setup.
+
+If you hit authorization warnings in logs (for example: `ignoring message from unauthorized user`),
+rerun channel setup only:
+
+```bash
+zeroclaw onboard --channels-only
+```
+
## Configuration
Config: `~/.zeroclaw/config.toml` (created by `onboard`)
@@ -166,6 +224,9 @@ workspace_only = true # default: true — scoped to workspace
allowed_commands = ["git", "npm", "cargo", "ls", "cat", "grep"]
forbidden_paths = ["/etc", "/root", "/proc", "/sys", "~/.ssh", "~/.gnupg", "~/.aws"]
+[runtime]
+kind = "native" # only supported value right now; unsupported kinds fail fast
+
[heartbeat]
enabled = false
interval_minutes = 30
@@ -198,10 +259,14 @@ enabled = false # opt-in: 1000+ OAuth apps via composio.dev
|---------|-------------|
| `onboard` | Quick setup (default) |
| `onboard --interactive` | Full interactive 7-step wizard |
+| `onboard --channels-only` | Reconfigure channels/allowlists only (fast repair flow) |
| `agent -m "..."` | Single message mode |
| `agent` | Interactive chat mode |
| `gateway` | Start webhook server (default: `127.0.0.1:8080`) |
| `gateway --port 0` | Random port mode |
+| `daemon` | Start long-running autonomous runtime |
+| `service install/start/stop/status/uninstall` | Manage user-level background service |
+| `doctor` | Diagnose daemon/scheduler/channel freshness |
| `status` | Show full system status |
| `channel doctor` | Run health checks for configured channels |
| `integrations info <name>` | Show setup/status details for one integration |
diff --git a/src/agent/loop_.rs b/src/agent/loop_.rs
index 57e0182..0f611d7 100644
--- a/src/agent/loop_.rs
+++ b/src/agent/loop_.rs
@@ -39,7 +39,7 @@ pub async fn run(
// ── Wire up agnostic subsystems ──────────────────────────────
let observer: Arc<dyn Observer> =
Arc::from(observability::create_observer(&config.observability));
- let _runtime = runtime::create_runtime(&config.runtime);
+ let _runtime = runtime::create_runtime(&config.runtime)?;
let security = Arc::new(SecurityPolicy::from_config(
&config.autonomy,
&config.workspace_dir,
@@ -72,8 +72,11 @@ pub async fn run(
.or(config.default_model.as_deref())
.unwrap_or("anthropic/claude-sonnet-4-20250514");
- let provider: Box<dyn Provider> =
- providers::create_provider(provider_name, config.api_key.as_deref())?;
+ let provider: Box<dyn Provider> = providers::create_resilient_provider(
+ provider_name,
+ config.api_key.as_deref(),
+ &config.reliability,
+ )?;
observer.record_event(&ObserverEvent::AgentStart {
provider: provider_name.to_string(),
@@ -83,12 +86,30 @@ pub async fn run(
// ── Build system prompt from workspace MD files (OpenClaw framework) ──
let skills = crate::skills::load_skills(&config.workspace_dir);
let mut tool_descs: Vec<(&str, &str)> = vec![
- ("shell", "Execute terminal commands"),
- ("file_read", "Read file contents"),
- ("file_write", "Write file contents"),
- ("memory_store", "Save to memory"),
- ("memory_recall", "Search memory"),
- ("memory_forget", "Delete a memory entry"),
+ (
+ "shell",
+ "Execute terminal commands. Use when: running local checks, build/test commands, diagnostics. Don't use when: a safer dedicated tool exists, or command is destructive without approval.",
+ ),
+ (
+ "file_read",
+ "Read file contents. Use when: inspecting project files, configs, logs. Don't use when: a targeted search is enough.",
+ ),
+ (
+ "file_write",
+ "Write file contents. Use when: applying focused edits, scaffolding files, updating docs/code. Don't use when: side effects are unclear or file ownership is uncertain.",
+ ),
+ (
+ "memory_store",
+ "Save to memory. Use when: preserving durable preferences, decisions, key context. Don't use when: information is transient/noisy/sensitive without need.",
+ ),
+ (
+ "memory_recall",
+ "Search memory. Use when: retrieving prior decisions, user preferences, historical context. Don't use when: answer is already in current context.",
+ ),
+ (
+ "memory_forget",
+ "Delete a memory entry. Use when: memory is incorrect/stale or explicitly requested for removal. Don't use when: impact is uncertain.",
+ ),
];
if config.browser.enabled {
tool_descs.push((
diff --git a/src/channels/mod.rs b/src/channels/mod.rs
index 7252f7d..32e47e7 100644
--- a/src/channels/mod.rs
+++ b/src/channels/mod.rs
@@ -24,6 +24,46 @@ use std::time::Duration;
/// Maximum characters per injected workspace file (matches `OpenClaw` default).
const BOOTSTRAP_MAX_CHARS: usize = 20_000;
+const DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS: u64 = 2;
+const DEFAULT_CHANNEL_MAX_BACKOFF_SECS: u64 = 60;
+
+fn spawn_supervised_listener(
+ ch: Arc<dyn Channel>,
+ tx: tokio::sync::mpsc::Sender<ChannelMessage>,
+ initial_backoff_secs: u64,
+ max_backoff_secs: u64,
+) -> tokio::task::JoinHandle<()> {
+ tokio::spawn(async move {
+ let component = format!("channel:{}", ch.name());
+ let mut backoff = initial_backoff_secs.max(1);
+ let max_backoff = max_backoff_secs.max(backoff);
+
+ loop {
+ crate::health::mark_component_ok(&component);
+ let result = ch.listen(tx.clone()).await;
+
+ if tx.is_closed() {
+ break;
+ }
+
+ match result {
+ Ok(()) => {
+ tracing::warn!("Channel {} exited unexpectedly; restarting", ch.name());
+ crate::health::mark_component_error(&component, "listener exited unexpectedly");
+ }
+ Err(e) => {
+ tracing::error!("Channel {} error: {e}; restarting", ch.name());
+ crate::health::mark_component_error(&component, e.to_string());
+ }
+ }
+
+ crate::health::bump_component_restart(&component);
+ tokio::time::sleep(Duration::from_secs(backoff)).await;
+ backoff = backoff.saturating_mul(2).min(max_backoff);
+ }
+ })
+}
+
/// Load workspace identity files and build a system prompt.
///
/// Follows the `OpenClaw` framework structure:
@@ -334,9 +374,10 @@ pub async fn doctor_channels(config: Config) -> Result<()> {
/// Start all configured channels and route messages to the agent
#[allow(clippy::too_many_lines)]
pub async fn start_channels(config: Config) -> Result<()> {
- let provider: Arc<dyn Provider> = Arc::from(providers::create_provider(
+ let provider: Arc<dyn Provider> = Arc::from(providers::create_resilient_provider(
config.default_provider.as_deref().unwrap_or("openrouter"),
config.api_key.as_deref(),
+ &config.reliability,
)?);
let model = config
.default_model
@@ -355,12 +396,30 @@ pub async fn start_channels(config: Config) -> Result<()> {
// Collect tool descriptions for the prompt
let mut tool_descs: Vec<(&str, &str)> = vec![
- ("shell", "Execute terminal commands"),
- ("file_read", "Read file contents"),
- ("file_write", "Write file contents"),
- ("memory_store", "Save to memory"),
- ("memory_recall", "Search memory"),
- ("memory_forget", "Delete a memory entry"),
+ (
+ "shell",
+ "Execute terminal commands. Use when: running local checks, build/test commands, diagnostics. Don't use when: a safer dedicated tool exists, or command is destructive without approval.",
+ ),
+ (
+ "file_read",
+ "Read file contents. Use when: inspecting project files, configs, logs. Don't use when: a targeted search is enough.",
+ ),
+ (
+ "file_write",
+ "Write file contents. Use when: applying focused edits, scaffolding files, updating docs/code. Don't use when: side effects are unclear or file ownership is uncertain.",
+ ),
+ (
+ "memory_store",
+ "Save to memory. Use when: preserving durable preferences, decisions, key context. Don't use when: information is transient/noisy/sensitive without need.",
+ ),
+ (
+ "memory_recall",
+ "Search memory. Use when: retrieving prior decisions, user preferences, historical context. Don't use when: answer is already in current context.",
+ ),
+ (
+ "memory_forget",
+ "Delete a memory entry. Use when: memory is incorrect/stale or explicitly requested for removal. Don't use when: impact is uncertain.",
+ ),
];
if config.browser.enabled {
@@ -446,19 +505,29 @@ pub async fn start_channels(config: Config) -> Result<()> {
println!(" Listening for messages... (Ctrl+C to stop)");
println!();
+ crate::health::mark_component_ok("channels");
+
+ let initial_backoff_secs = config
+ .reliability
+ .channel_initial_backoff_secs
+ .max(DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS);
+ let max_backoff_secs = config
+ .reliability
+ .channel_max_backoff_secs
+ .max(DEFAULT_CHANNEL_MAX_BACKOFF_SECS);
+
// Single message bus — all channels send messages here
let (tx, mut rx) = tokio::sync::mpsc::channel::<ChannelMessage>(100);
// Spawn a listener for each channel
let mut handles = Vec::new();
for ch in &channels {
- let ch = ch.clone();
- let tx = tx.clone();
- handles.push(tokio::spawn(async move {
- if let Err(e) = ch.listen(tx).await {
- tracing::error!("Channel {} error: {e}", ch.name());
- }
- }));
+ handles.push(spawn_supervised_listener(
+ ch.clone(),
+ tx.clone(),
+ initial_backoff_secs,
+ max_backoff_secs,
+ ));
}
drop(tx); // Drop our copy so rx closes when all channels stop
@@ -533,6 +602,8 @@ pub async fn start_channels(config: Config) -> Result<()> {
#[cfg(test)]
mod tests {
use super::*;
+ use std::sync::atomic::{AtomicUsize, Ordering};
+ use std::sync::Arc;
use tempfile::TempDir;
fn make_workspace() -> TempDir {
@@ -777,4 +848,55 @@ mod tests {
let state = classify_health_result(&result);
assert_eq!(state, ChannelHealthState::Timeout);
}
+
+ struct AlwaysFailChannel {
+ name: &'static str,
+ calls: Arc,
+ }
+
+ #[async_trait::async_trait]
+ impl Channel for AlwaysFailChannel {
+ fn name(&self) -> &str {
+ self.name
+ }
+
+ async fn send(&self, _message: &str, _recipient: &str) -> anyhow::Result<()> {
+ Ok(())
+ }
+
+ async fn listen(
+ &self,
+ _tx: tokio::sync::mpsc::Sender,
+ ) -> anyhow::Result<()> {
+ self.calls.fetch_add(1, Ordering::SeqCst);
+ anyhow::bail!("listen boom")
+ }
+ }
+
+ #[tokio::test]
+ async fn supervised_listener_marks_error_and_restarts_on_failures() {
+ let calls = Arc::new(AtomicUsize::new(0));
+ let channel: Arc<dyn Channel> = Arc::new(AlwaysFailChannel {
+ name: "test-supervised-fail",
+ calls: Arc::clone(&calls),
+ });
+
+ let (_tx, rx) = tokio::sync::mpsc::channel::<ChannelMessage>(1);
+ let handle = spawn_supervised_listener(channel, _tx, 1, 1);
+
+ tokio::time::sleep(Duration::from_millis(80)).await;
+ drop(rx);
+ handle.abort();
+ let _ = handle.await;
+
+ let snapshot = crate::health::snapshot_json();
+ let component = &snapshot["components"]["channel:test-supervised-fail"];
+ assert_eq!(component["status"], "error");
+ assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
+ assert!(component["last_error"]
+ .as_str()
+ .unwrap_or("")
+ .contains("listen boom"));
+ assert!(calls.load(Ordering::SeqCst) >= 1);
+ }
}
diff --git a/src/channels/telegram.rs b/src/channels/telegram.rs
index 56f8a3c..0147c8d 100644
--- a/src/channels/telegram.rs
+++ b/src/channels/telegram.rs
@@ -25,6 +25,13 @@ impl TelegramChannel {
fn is_user_allowed(&self, username: &str) -> bool {
self.allowed_users.iter().any(|u| u == "*" || u == username)
}
+
+ fn is_any_user_allowed<'a, I>(&self, identities: I) -> bool
+ where
+ I: IntoIterator<Item = &'a str>,
+ {
+ identities.into_iter().any(|id| self.is_user_allowed(id))
+ }
}
#[async_trait]
@@ -95,15 +102,28 @@ impl Channel for TelegramChannel {
continue;
};
- let username = message
+ let username_opt = message
.get("from")
.and_then(|f| f.get("username"))
- .and_then(|u| u.as_str())
- .unwrap_or("unknown");
+ .and_then(|u| u.as_str());
+ let username = username_opt.unwrap_or("unknown");
- if !self.is_user_allowed(username) {
+ let user_id = message
+ .get("from")
+ .and_then(|f| f.get("id"))
+ .and_then(serde_json::Value::as_i64);
+ let user_id_str = user_id.map(|id| id.to_string());
+
+ let mut identities = vec![username];
+ if let Some(ref id) = user_id_str {
+ identities.push(id.as_str());
+ }
+
+ if !self.is_any_user_allowed(identities.iter().copied()) {
tracing::warn!(
- "Telegram: ignoring message from unauthorized user: {username}"
+ "Telegram: ignoring message from unauthorized user: username={username}, user_id={}. \
+Allowlist Telegram @username or numeric user ID, then run `zeroclaw onboard --channels-only`.",
+ user_id_str.as_deref().unwrap_or("unknown")
);
continue;
}
@@ -211,4 +231,16 @@ mod tests {
assert!(ch.is_user_allowed("bob"));
assert!(ch.is_user_allowed("anyone"));
}
+
+ #[test]
+ fn telegram_user_allowed_by_numeric_id_identity() {
+ let ch = TelegramChannel::new("t".into(), vec!["123456789".into()]);
+ assert!(ch.is_any_user_allowed(["unknown", "123456789"]));
+ }
+
+ #[test]
+ fn telegram_user_denied_when_none_of_identities_match() {
+ let ch = TelegramChannel::new("t".into(), vec!["alice".into(), "987654321".into()]);
+ assert!(!ch.is_any_user_allowed(["unknown", "123456789"]));
+ }
}
diff --git a/src/config/mod.rs b/src/config/mod.rs
index 9af098c..4632486 100644
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ -3,6 +3,6 @@ pub mod schema;
pub use schema::{
AutonomyConfig, BrowserConfig, ChannelsConfig, ComposioConfig, Config, DiscordConfig,
GatewayConfig, HeartbeatConfig, IMessageConfig, MatrixConfig, MemoryConfig,
- ObservabilityConfig, RuntimeConfig, SecretsConfig, SlackConfig, TelegramConfig, TunnelConfig,
- WebhookConfig,
+ ObservabilityConfig, ReliabilityConfig, RuntimeConfig, SecretsConfig, SlackConfig,
+ TelegramConfig, TunnelConfig, WebhookConfig,
};
diff --git a/src/config/schema.rs b/src/config/schema.rs
index 49a9d59..006d120 100644
--- a/src/config/schema.rs
+++ b/src/config/schema.rs
@@ -25,6 +25,9 @@ pub struct Config {
#[serde(default)]
pub runtime: RuntimeConfig,
+ #[serde(default)]
+ pub reliability: ReliabilityConfig,
+
#[serde(default)]
pub heartbeat: HeartbeatConfig,
@@ -143,6 +146,18 @@ pub struct MemoryConfig {
pub backend: String,
/// Auto-save conversation context to memory
pub auto_save: bool,
+ /// Run memory/session hygiene (archiving + retention cleanup)
+ #[serde(default = "default_hygiene_enabled")]
+ pub hygiene_enabled: bool,
+ /// Archive daily/session files older than this many days
+ #[serde(default = "default_archive_after_days")]
+ pub archive_after_days: u32,
+ /// Purge archived files older than this many days
+ #[serde(default = "default_purge_after_days")]
+ pub purge_after_days: u32,
+ /// For sqlite backend: prune conversation rows older than this many days
+ #[serde(default = "default_conversation_retention_days")]
+ pub conversation_retention_days: u32,
/// Embedding provider: "none" | "openai" | "custom:URL"
#[serde(default = "default_embedding_provider")]
pub embedding_provider: String,
@@ -169,6 +184,18 @@ pub struct MemoryConfig {
fn default_embedding_provider() -> String {
"none".into()
}
+fn default_hygiene_enabled() -> bool {
+ true
+}
+fn default_archive_after_days() -> u32 {
+ 7
+}
+fn default_purge_after_days() -> u32 {
+ 30
+}
+fn default_conversation_retention_days() -> u32 {
+ 30
+}
fn default_embedding_model() -> String {
"text-embedding-3-small".into()
}
@@ -193,6 +220,10 @@ impl Default for MemoryConfig {
Self {
backend: "sqlite".into(),
auto_save: true,
+ hygiene_enabled: default_hygiene_enabled(),
+ archive_after_days: default_archive_after_days(),
+ purge_after_days: default_purge_after_days(),
+ conversation_retention_days: default_conversation_retention_days(),
embedding_provider: default_embedding_provider(),
embedding_model: default_embedding_model(),
embedding_dimensions: default_embedding_dims(),
@@ -281,7 +312,9 @@ impl Default for AutonomyConfig {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeConfig {
- /// "native" | "docker" | "cloudflare"
+ /// Runtime kind (currently supported: "native").
+ ///
+ /// Reserved values (not implemented yet): "docker", "cloudflare".
pub kind: String,
}
@@ -293,6 +326,71 @@ impl Default for RuntimeConfig {
}
}
+// ── Reliability / supervision ────────────────────────────────────
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ReliabilityConfig {
+ /// Retries per provider before failing over.
+ #[serde(default = "default_provider_retries")]
+ pub provider_retries: u32,
+ /// Base backoff (ms) for provider retry delay.
+ #[serde(default = "default_provider_backoff_ms")]
+ pub provider_backoff_ms: u64,
+ /// Fallback provider chain (e.g. `["anthropic", "openai"]`).
+ #[serde(default)]
+ pub fallback_providers: Vec<String>,
+ /// Initial backoff for channel/daemon restarts.
+ #[serde(default = "default_channel_backoff_secs")]
+ pub channel_initial_backoff_secs: u64,
+ /// Max backoff for channel/daemon restarts.
+ #[serde(default = "default_channel_backoff_max_secs")]
+ pub channel_max_backoff_secs: u64,
+ /// Scheduler polling cadence in seconds.
+ #[serde(default = "default_scheduler_poll_secs")]
+ pub scheduler_poll_secs: u64,
+ /// Max retries for cron job execution attempts.
+ #[serde(default = "default_scheduler_retries")]
+ pub scheduler_retries: u32,
+}
+
+fn default_provider_retries() -> u32 {
+ 2
+}
+
+fn default_provider_backoff_ms() -> u64 {
+ 500
+}
+
+fn default_channel_backoff_secs() -> u64 {
+ 2
+}
+
+fn default_channel_backoff_max_secs() -> u64 {
+ 60
+}
+
+fn default_scheduler_poll_secs() -> u64 {
+ 15
+}
+
+fn default_scheduler_retries() -> u32 {
+ 2
+}
+
+impl Default for ReliabilityConfig {
+ fn default() -> Self {
+ Self {
+ provider_retries: default_provider_retries(),
+ provider_backoff_ms: default_provider_backoff_ms(),
+ fallback_providers: Vec::new(),
+ channel_initial_backoff_secs: default_channel_backoff_secs(),
+ channel_max_backoff_secs: default_channel_backoff_max_secs(),
+ scheduler_poll_secs: default_scheduler_poll_secs(),
+ scheduler_retries: default_scheduler_retries(),
+ }
+ }
+}
+
// ── Heartbeat ────────────────────────────────────────────────────
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -463,6 +561,7 @@ impl Default for Config {
observability: ObservabilityConfig::default(),
autonomy: AutonomyConfig::default(),
runtime: RuntimeConfig::default(),
+ reliability: ReliabilityConfig::default(),
heartbeat: HeartbeatConfig::default(),
channels_config: ChannelsConfig::default(),
memory: MemoryConfig::default(),
@@ -558,6 +657,17 @@ mod tests {
assert_eq!(h.interval_minutes, 30);
}
+ #[test]
+ fn memory_config_default_hygiene_settings() {
+ let m = MemoryConfig::default();
+ assert_eq!(m.backend, "sqlite");
+ assert!(m.auto_save);
+ assert!(m.hygiene_enabled);
+ assert_eq!(m.archive_after_days, 7);
+ assert_eq!(m.purge_after_days, 30);
+ assert_eq!(m.conversation_retention_days, 30);
+ }
+
#[test]
fn channels_config_default() {
let c = ChannelsConfig::default();
@@ -591,6 +701,7 @@ mod tests {
runtime: RuntimeConfig {
kind: "docker".into(),
},
+ reliability: ReliabilityConfig::default(),
heartbeat: HeartbeatConfig {
enabled: true,
interval_minutes: 15,
@@ -650,6 +761,10 @@ default_temperature = 0.7
assert_eq!(parsed.runtime.kind, "native");
assert!(!parsed.heartbeat.enabled);
assert!(parsed.channels_config.cli);
+ assert!(parsed.memory.hygiene_enabled);
+ assert_eq!(parsed.memory.archive_after_days, 7);
+ assert_eq!(parsed.memory.purge_after_days, 30);
+ assert_eq!(parsed.memory.conversation_retention_days, 30);
}
#[test]
@@ -669,6 +784,7 @@ default_temperature = 0.7
observability: ObservabilityConfig::default(),
autonomy: AutonomyConfig::default(),
runtime: RuntimeConfig::default(),
+ reliability: ReliabilityConfig::default(),
heartbeat: HeartbeatConfig::default(),
channels_config: ChannelsConfig::default(),
memory: MemoryConfig::default(),
diff --git a/src/cron/mod.rs b/src/cron/mod.rs
index 8f52701..572670d 100644
--- a/src/cron/mod.rs
+++ b/src/cron/mod.rs
@@ -1,25 +1,353 @@
use crate::config::Config;
-use anyhow::Result;
+use anyhow::{Context, Result};
+use chrono::{DateTime, Utc};
+use cron::Schedule;
+use rusqlite::{params, Connection};
+use std::str::FromStr;
+use uuid::Uuid;
-pub fn handle_command(command: super::CronCommands, _config: Config) -> Result<()> {
+pub mod scheduler;
+
+#[derive(Debug, Clone)]
+pub struct CronJob {
+ pub id: String,
+ pub expression: String,
+ pub command: String,
+ pub next_run: DateTime<Utc>,
+ pub last_run: Option<DateTime<Utc>>,
+ pub last_status: Option<String>,
+}
+
+pub fn handle_command(command: super::CronCommands, config: Config) -> Result<()> {
match command {
super::CronCommands::List => {
- println!("No scheduled tasks yet.");
- println!("\nUsage:");
- println!(" zeroclaw cron add '0 9 * * *' 'agent -m \"Good morning!\"'");
+ let jobs = list_jobs(&config)?;
+ if jobs.is_empty() {
+ println!("No scheduled tasks yet.");
+ println!("\nUsage:");
+ println!(" zeroclaw cron add '0 9 * * *' 'agent -m \"Good morning!\"'");
+ return Ok(());
+ }
+
+ println!("🕒 Scheduled jobs ({}):", jobs.len());
+ for job in jobs {
+ let last_run = job
+ .last_run
+ .map(|d| d.to_rfc3339())
+ .unwrap_or_else(|| "never".into());
+ let last_status = job.last_status.unwrap_or_else(|| "n/a".into());
+ println!(
+ "- {} | {} | next={} | last={} ({})\n cmd: {}",
+ job.id,
+ job.expression,
+ job.next_run.to_rfc3339(),
+ last_run,
+ last_status,
+ job.command
+ );
+ }
Ok(())
}
super::CronCommands::Add {
expression,
command,
} => {
- println!("Cron scheduling coming soon!");
- println!(" Expression: {expression}");
- println!(" Command: {command}");
+ let job = add_job(&config, &expression, &command)?;
+ println!("✅ Added cron job {}", job.id);
+ println!(" Expr: {}", job.expression);
+ println!(" Next: {}", job.next_run.to_rfc3339());
+ println!(" Cmd : {}", job.command);
Ok(())
}
- super::CronCommands::Remove { id } => {
- anyhow::bail!("Remove task '{id}' not yet implemented");
- }
+ super::CronCommands::Remove { id } => remove_job(&config, &id),
+ }
+}
+
+pub fn add_job(config: &Config, expression: &str, command: &str) -> Result<CronJob> {
+ let now = Utc::now();
+ let next_run = next_run_for(expression, now)?;
+ let id = Uuid::new_v4().to_string();
+
+ with_connection(config, |conn| {
+ conn.execute(
+ "INSERT INTO cron_jobs (id, expression, command, created_at, next_run)
+ VALUES (?1, ?2, ?3, ?4, ?5)",
+ params![
+ id,
+ expression,
+ command,
+ now.to_rfc3339(),
+ next_run.to_rfc3339()
+ ],
+ )
+ .context("Failed to insert cron job")?;
+ Ok(())
+ })?;
+
+ Ok(CronJob {
+ id,
+ expression: expression.to_string(),
+ command: command.to_string(),
+ next_run,
+ last_run: None,
+ last_status: None,
+ })
+}
+
+pub fn list_jobs(config: &Config) -> Result<Vec<CronJob>> {
+ with_connection(config, |conn| {
+ let mut stmt = conn.prepare(
+ "SELECT id, expression, command, next_run, last_run, last_status
+ FROM cron_jobs ORDER BY next_run ASC",
+ )?;
+
+ let rows = stmt.query_map([], |row| {
+ let next_run_raw: String = row.get(3)?;
+ let last_run_raw: Option<String> = row.get(4)?;
+ Ok((
+ row.get::<_, String>(0)?,
+ row.get::<_, String>(1)?,
+ row.get::<_, String>(2)?,
+ next_run_raw,
+ last_run_raw,
+ row.get::<_, Option<String>>(5)?,
+ ))
+ })?;
+
+ let mut jobs = Vec::new();
+ for row in rows {
+ let (id, expression, command, next_run_raw, last_run_raw, last_status) = row?;
+ jobs.push(CronJob {
+ id,
+ expression,
+ command,
+ next_run: parse_rfc3339(&next_run_raw)?,
+ last_run: match last_run_raw {
+ Some(raw) => Some(parse_rfc3339(&raw)?),
+ None => None,
+ },
+ last_status,
+ });
+ }
+ Ok(jobs)
+ })
+}
+
+pub fn remove_job(config: &Config, id: &str) -> Result<()> {
+ let changed = with_connection(config, |conn| {
+ conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![id])
+ .context("Failed to delete cron job")
+ })?;
+
+ if changed == 0 {
+ anyhow::bail!("Cron job '{id}' not found");
+ }
+
+ println!("✅ Removed cron job {id}");
+ Ok(())
+}
+
+pub fn due_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
+ with_connection(config, |conn| {
+ let mut stmt = conn.prepare(
+ "SELECT id, expression, command, next_run, last_run, last_status
+ FROM cron_jobs WHERE next_run <= ?1 ORDER BY next_run ASC",
+ )?;
+
+ let rows = stmt.query_map(params![now.to_rfc3339()], |row| {
+ let next_run_raw: String = row.get(3)?;
+ let last_run_raw: Option<String> = row.get(4)?;
+ Ok((
+ row.get::<_, String>(0)?,
+ row.get::<_, String>(1)?,
+ row.get::<_, String>(2)?,
+ next_run_raw,
+ last_run_raw,
+ row.get::<_, Option<String>>(5)?,
+ ))
+ })?;
+
+ let mut jobs = Vec::new();
+ for row in rows {
+ let (id, expression, command, next_run_raw, last_run_raw, last_status) = row?;
+ jobs.push(CronJob {
+ id,
+ expression,
+ command,
+ next_run: parse_rfc3339(&next_run_raw)?,
+ last_run: match last_run_raw {
+ Some(raw) => Some(parse_rfc3339(&raw)?),
+ None => None,
+ },
+ last_status,
+ });
+ }
+ Ok(jobs)
+ })
+}
+
+pub fn reschedule_after_run(
+ config: &Config,
+ job: &CronJob,
+ success: bool,
+ output: &str,
+) -> Result<()> {
+ let now = Utc::now();
+ let next_run = next_run_for(&job.expression, now)?;
+ let status = if success { "ok" } else { "error" };
+
+ with_connection(config, |conn| {
+ conn.execute(
+ "UPDATE cron_jobs
+ SET next_run = ?1, last_run = ?2, last_status = ?3, last_output = ?4
+ WHERE id = ?5",
+ params![
+ next_run.to_rfc3339(),
+ now.to_rfc3339(),
+ status,
+ output,
+ job.id
+ ],
+ )
+ .context("Failed to update cron job run state")?;
+ Ok(())
+ })
+}
+
+fn next_run_for(expression: &str, from: DateTime<Utc>) -> Result<DateTime<Utc>> {
+ let normalized = normalize_expression(expression)?;
+ let schedule = Schedule::from_str(&normalized)
+ .with_context(|| format!("Invalid cron expression: {expression}"))?;
+ schedule
+ .after(&from)
+ .next()
+ .ok_or_else(|| anyhow::anyhow!("No future occurrence for expression: {expression}"))
+}
+
+fn normalize_expression(expression: &str) -> Result<String> {
+ let expression = expression.trim();
+ let field_count = expression.split_whitespace().count();
+
+ match field_count {
+ // standard crontab syntax: minute hour day month weekday
+ 5 => Ok(format!("0 {expression}")),
+ // crate-native syntax includes seconds (+ optional year)
+ 6 | 7 => Ok(expression.to_string()),
+ _ => anyhow::bail!(
+ "Invalid cron expression: {expression} (expected 5, 6, or 7 fields, got {field_count})"
+ ),
+ }
+}
+
+fn parse_rfc3339(raw: &str) -> Result<DateTime<Utc>> {
+ let parsed = DateTime::parse_from_rfc3339(raw)
+ .with_context(|| format!("Invalid RFC3339 timestamp in cron DB: {raw}"))?;
+ Ok(parsed.with_timezone(&Utc))
+}
+
+fn with_connection<T>(config: &Config, f: impl FnOnce(&Connection) -> Result<T>) -> Result<T> {
+ let db_path = config.workspace_dir.join("cron").join("jobs.db");
+ if let Some(parent) = db_path.parent() {
+ std::fs::create_dir_all(parent)
+ .with_context(|| format!("Failed to create cron directory: {}", parent.display()))?;
+ }
+
+ let conn = Connection::open(&db_path)
+ .with_context(|| format!("Failed to open cron DB: {}", db_path.display()))?;
+
+ conn.execute_batch(
+ "CREATE TABLE IF NOT EXISTS cron_jobs (
+ id TEXT PRIMARY KEY,
+ expression TEXT NOT NULL,
+ command TEXT NOT NULL,
+ created_at TEXT NOT NULL,
+ next_run TEXT NOT NULL,
+ last_run TEXT,
+ last_status TEXT,
+ last_output TEXT
+ );
+ CREATE INDEX IF NOT EXISTS idx_cron_jobs_next_run ON cron_jobs(next_run);",
+ )
+ .context("Failed to initialize cron schema")?;
+
+ f(&conn)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::config::Config;
+ use chrono::Duration as ChronoDuration;
+ use tempfile::TempDir;
+
+ fn test_config(tmp: &TempDir) -> Config {
+ let mut config = Config::default();
+ config.workspace_dir = tmp.path().join("workspace");
+ config.config_path = tmp.path().join("config.toml");
+ std::fs::create_dir_all(&config.workspace_dir).unwrap();
+ config
+ }
+
+ #[test]
+ fn add_job_accepts_five_field_expression() {
+ let tmp = TempDir::new().unwrap();
+ let config = test_config(&tmp);
+
+ let job = add_job(&config, "*/5 * * * *", "echo ok").unwrap();
+
+ assert_eq!(job.expression, "*/5 * * * *");
+ assert_eq!(job.command, "echo ok");
+ }
+
+ #[test]
+ fn add_job_rejects_invalid_field_count() {
+ let tmp = TempDir::new().unwrap();
+ let config = test_config(&tmp);
+
+ let err = add_job(&config, "* * * *", "echo bad").unwrap_err();
+ assert!(err.to_string().contains("expected 5, 6, or 7 fields"));
+ }
+
+ #[test]
+ fn add_list_remove_roundtrip() {
+ let tmp = TempDir::new().unwrap();
+ let config = test_config(&tmp);
+
+ let job = add_job(&config, "*/10 * * * *", "echo roundtrip").unwrap();
+ let listed = list_jobs(&config).unwrap();
+ assert_eq!(listed.len(), 1);
+ assert_eq!(listed[0].id, job.id);
+
+ remove_job(&config, &job.id).unwrap();
+ assert!(list_jobs(&config).unwrap().is_empty());
+ }
+
+ #[test]
+ fn due_jobs_filters_by_timestamp() {
+ let tmp = TempDir::new().unwrap();
+ let config = test_config(&tmp);
+
+ let _job = add_job(&config, "* * * * *", "echo due").unwrap();
+
+ let due_now = due_jobs(&config, Utc::now()).unwrap();
+ assert!(due_now.is_empty(), "new job should not be due immediately");
+
+ let far_future = Utc::now() + ChronoDuration::days(365);
+ let due_future = due_jobs(&config, far_future).unwrap();
+ assert_eq!(due_future.len(), 1, "job should be due in far future");
+ }
+
+ #[test]
+ fn reschedule_after_run_persists_last_status_and_last_run() {
+ let tmp = TempDir::new().unwrap();
+ let config = test_config(&tmp);
+
+ let job = add_job(&config, "*/15 * * * *", "echo run").unwrap();
+ reschedule_after_run(&config, &job, false, "failed output").unwrap();
+
+ let listed = list_jobs(&config).unwrap();
+ let stored = listed.iter().find(|j| j.id == job.id).unwrap();
+ assert_eq!(stored.last_status.as_deref(), Some("error"));
+ assert!(stored.last_run.is_some());
}
}
diff --git a/src/cron/scheduler.rs b/src/cron/scheduler.rs
new file mode 100644
index 0000000..459fe59
--- /dev/null
+++ b/src/cron/scheduler.rs
@@ -0,0 +1,169 @@
+use crate::config::Config;
+use crate::cron::{due_jobs, reschedule_after_run, CronJob};
+use anyhow::Result;
+use chrono::Utc;
+use tokio::process::Command;
+use tokio::time::{self, Duration};
+
+const MIN_POLL_SECONDS: u64 = 5;
+
+pub async fn run(config: Config) -> Result<()> {
+ let poll_secs = config.reliability.scheduler_poll_secs.max(MIN_POLL_SECONDS);
+ let mut interval = time::interval(Duration::from_secs(poll_secs));
+
+ crate::health::mark_component_ok("scheduler");
+
+ loop {
+ interval.tick().await;
+
+ let jobs = match due_jobs(&config, Utc::now()) {
+ Ok(jobs) => jobs,
+ Err(e) => {
+ crate::health::mark_component_error("scheduler", e.to_string());
+ tracing::warn!("Scheduler query failed: {e}");
+ continue;
+ }
+ };
+
+ for job in jobs {
+ crate::health::mark_component_ok("scheduler");
+ let (success, output) = execute_job_with_retry(&config, &job).await;
+
+ if !success {
+ crate::health::mark_component_error("scheduler", format!("job {} failed", job.id));
+ }
+
+ if let Err(e) = reschedule_after_run(&config, &job, success, &output) {
+ crate::health::mark_component_error("scheduler", e.to_string());
+ tracing::warn!("Failed to persist scheduler run result: {e}");
+ }
+ }
+ }
+}
+
+async fn execute_job_with_retry(config: &Config, job: &CronJob) -> (bool, String) {
+ let mut last_output = String::new();
+ let retries = config.reliability.scheduler_retries;
+ let mut backoff_ms = config.reliability.provider_backoff_ms.max(200);
+
+ for attempt in 0..=retries {
+ let (success, output) = run_job_command(config, job).await;
+ last_output = output;
+
+ if success {
+ return (true, last_output);
+ }
+
+ if attempt < retries {
+ let jitter_ms = (Utc::now().timestamp_subsec_millis() % 250) as u64;
+ time::sleep(Duration::from_millis(backoff_ms + jitter_ms)).await;
+ backoff_ms = (backoff_ms.saturating_mul(2)).min(30_000);
+ }
+ }
+
+ (false, last_output)
+}
+
+async fn run_job_command(config: &Config, job: &CronJob) -> (bool, String) {
+ let output = Command::new("sh")
+ .arg("-lc")
+ .arg(&job.command)
+ .current_dir(&config.workspace_dir)
+ .output()
+ .await;
+
+ match output {
+ Ok(output) => {
+ let stdout = String::from_utf8_lossy(&output.stdout);
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ let combined = format!(
+ "status={}\nstdout:\n{}\nstderr:\n{}",
+ output.status,
+ stdout.trim(),
+ stderr.trim()
+ );
+ (output.status.success(), combined)
+ }
+ Err(e) => (false, format!("spawn error: {e}")),
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::config::Config;
+ use tempfile::TempDir;
+
+ fn test_config(tmp: &TempDir) -> Config {
+ let mut config = Config::default();
+ config.workspace_dir = tmp.path().join("workspace");
+ config.config_path = tmp.path().join("config.toml");
+ std::fs::create_dir_all(&config.workspace_dir).unwrap();
+ config
+ }
+
+ fn test_job(command: &str) -> CronJob {
+ CronJob {
+ id: "test-job".into(),
+ expression: "* * * * *".into(),
+ command: command.into(),
+ next_run: Utc::now(),
+ last_run: None,
+ last_status: None,
+ }
+ }
+
+ #[tokio::test]
+ async fn run_job_command_success() {
+ let tmp = TempDir::new().unwrap();
+ let config = test_config(&tmp);
+ let job = test_job("echo scheduler-ok");
+
+ let (success, output) = run_job_command(&config, &job).await;
+ assert!(success);
+ assert!(output.contains("scheduler-ok"));
+ assert!(output.contains("status=exit status: 0"));
+ }
+
+ #[tokio::test]
+ async fn run_job_command_failure() {
+ let tmp = TempDir::new().unwrap();
+ let config = test_config(&tmp);
+ let job = test_job("echo scheduler-fail 1>&2; exit 7");
+
+ let (success, output) = run_job_command(&config, &job).await;
+ assert!(!success);
+ assert!(output.contains("scheduler-fail"));
+ assert!(output.contains("status=exit status: 7"));
+ }
+
+ #[tokio::test]
+ async fn execute_job_with_retry_recovers_after_first_failure() {
+ let tmp = TempDir::new().unwrap();
+ let mut config = test_config(&tmp);
+ config.reliability.scheduler_retries = 1;
+ config.reliability.provider_backoff_ms = 1;
+
+ let job = test_job(
+ "if [ -f retry-ok.flag ]; then echo recovered; exit 0; else touch retry-ok.flag; echo first-fail 1>&2; exit 1; fi",
+ );
+
+ let (success, output) = execute_job_with_retry(&config, &job).await;
+ assert!(success);
+ assert!(output.contains("recovered"));
+ }
+
+ #[tokio::test]
+ async fn execute_job_with_retry_exhausts_attempts() {
+ let tmp = TempDir::new().unwrap();
+ let mut config = test_config(&tmp);
+ config.reliability.scheduler_retries = 1;
+ config.reliability.provider_backoff_ms = 1;
+
+ let job = test_job("echo still-bad 1>&2; exit 1");
+
+ let (success, output) = execute_job_with_retry(&config, &job).await;
+ assert!(!success);
+ assert!(output.contains("still-bad"));
+ }
+}
diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs
new file mode 100644
index 0000000..db374bc
--- /dev/null
+++ b/src/daemon/mod.rs
@@ -0,0 +1,287 @@
+use crate::config::Config;
+use anyhow::Result;
+use chrono::Utc;
+use std::future::Future;
+use std::path::PathBuf;
+use tokio::task::JoinHandle;
+use tokio::time::Duration;
+
+const STATUS_FLUSH_SECONDS: u64 = 5;
+
+pub async fn run(config: Config, host: String, port: u16) -> Result<()> {
+ let initial_backoff = config.reliability.channel_initial_backoff_secs.max(1);
+ let max_backoff = config
+ .reliability
+ .channel_max_backoff_secs
+ .max(initial_backoff);
+
+ crate::health::mark_component_ok("daemon");
+
+ if config.heartbeat.enabled {
+ let _ =
+ crate::heartbeat::engine::HeartbeatEngine::ensure_heartbeat_file(&config.workspace_dir)
+ .await;
+ }
+
+ let mut handles: Vec<JoinHandle<()>> = vec![spawn_state_writer(config.clone())];
+
+ {
+ let gateway_cfg = config.clone();
+ let gateway_host = host.clone();
+ handles.push(spawn_component_supervisor(
+ "gateway",
+ initial_backoff,
+ max_backoff,
+ move || {
+ let cfg = gateway_cfg.clone();
+ let host = gateway_host.clone();
+ async move { crate::gateway::run_gateway(&host, port, cfg).await }
+ },
+ ));
+ }
+
+ {
+ if has_supervised_channels(&config) {
+ let channels_cfg = config.clone();
+ handles.push(spawn_component_supervisor(
+ "channels",
+ initial_backoff,
+ max_backoff,
+ move || {
+ let cfg = channels_cfg.clone();
+ async move { crate::channels::start_channels(cfg).await }
+ },
+ ));
+ } else {
+ crate::health::mark_component_ok("channels");
+ tracing::info!("No real-time channels configured; channel supervisor disabled");
+ }
+ }
+
+ if config.heartbeat.enabled {
+ let heartbeat_cfg = config.clone();
+ handles.push(spawn_component_supervisor(
+ "heartbeat",
+ initial_backoff,
+ max_backoff,
+ move || {
+ let cfg = heartbeat_cfg.clone();
+ async move { run_heartbeat_worker(cfg).await }
+ },
+ ));
+ }
+
+ {
+ let scheduler_cfg = config.clone();
+ handles.push(spawn_component_supervisor(
+ "scheduler",
+ initial_backoff,
+ max_backoff,
+ move || {
+ let cfg = scheduler_cfg.clone();
+ async move { crate::cron::scheduler::run(cfg).await }
+ },
+ ));
+ }
+
+ println!("🧠 ZeroClaw daemon started");
+ println!(" Gateway: http://{host}:{port}");
+ println!(" Components: gateway, channels, heartbeat, scheduler");
+ println!(" Ctrl+C to stop");
+
+ tokio::signal::ctrl_c().await?;
+ crate::health::mark_component_error("daemon", "shutdown requested");
+
+ for handle in &handles {
+ handle.abort();
+ }
+ for handle in handles {
+ let _ = handle.await;
+ }
+
+ Ok(())
+}
+
+pub fn state_file_path(config: &Config) -> PathBuf {
+ config
+ .config_path
+ .parent()
+ .map_or_else(|| PathBuf::from("."), PathBuf::from)
+ .join("daemon_state.json")
+}
+
+fn spawn_state_writer(config: Config) -> JoinHandle<()> {
+ tokio::spawn(async move {
+ let path = state_file_path(&config);
+ if let Some(parent) = path.parent() {
+ let _ = tokio::fs::create_dir_all(parent).await;
+ }
+
+ let mut interval = tokio::time::interval(Duration::from_secs(STATUS_FLUSH_SECONDS));
+ loop {
+ interval.tick().await;
+ let mut json = crate::health::snapshot_json();
+ if let Some(obj) = json.as_object_mut() {
+ obj.insert(
+ "written_at".into(),
+ serde_json::json!(Utc::now().to_rfc3339()),
+ );
+ }
+ let data = serde_json::to_vec_pretty(&json).unwrap_or_else(|_| b"{}".to_vec());
+ let _ = tokio::fs::write(&path, data).await;
+ }
+ })
+}
+
+fn spawn_component_supervisor(
+ name: &'static str,
+ initial_backoff_secs: u64,
+ max_backoff_secs: u64,
+ mut run_component: F,
+) -> JoinHandle<()>
+where
+ F: FnMut() -> Fut + Send + 'static,
+ Fut: Future