merge: resolve conflicts between feat/whatsapp-email-channels and main

- Keep main's WhatsApp implementation (webhook-based, simpler)
- Preserve email channel fixes from our branch
- Merge all main branch updates (daemon, cron, health, etc.)
- Resolve Cargo.lock conflicts
This commit is contained in:
argenis de la rosa 2026-02-14 14:59:16 -05:00
commit 4e6da51924
40 changed files with 6925 additions and 780 deletions

View file

@ -7,6 +7,7 @@ pub mod slack;
pub mod telegram;
pub mod whatsapp;
pub mod traits;
pub mod whatsapp;
pub use cli::CliChannel;
pub use discord::DiscordChannel;
@ -17,6 +18,7 @@ pub use telegram::TelegramChannel;
#[allow(unused_imports)]
pub use whatsapp::WhatsAppChannel;
pub use traits::Channel;
pub use whatsapp::WhatsAppChannel;
use crate::config::Config;
use crate::memory::{self, Memory};
@ -28,6 +30,46 @@ use std::time::Duration;
/// Maximum characters per injected workspace file (matches `OpenClaw` default).
const BOOTSTRAP_MAX_CHARS: usize = 20_000;
const DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS: u64 = 2;
const DEFAULT_CHANNEL_MAX_BACKOFF_SECS: u64 = 60;
fn spawn_supervised_listener(
ch: Arc<dyn Channel>,
tx: tokio::sync::mpsc::Sender<traits::ChannelMessage>,
initial_backoff_secs: u64,
max_backoff_secs: u64,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let component = format!("channel:{}", ch.name());
let mut backoff = initial_backoff_secs.max(1);
let max_backoff = max_backoff_secs.max(backoff);
loop {
crate::health::mark_component_ok(&component);
let result = ch.listen(tx.clone()).await;
if tx.is_closed() {
break;
}
match result {
Ok(()) => {
tracing::warn!("Channel {} exited unexpectedly; restarting", ch.name());
crate::health::mark_component_error(&component, "listener exited unexpectedly");
}
Err(e) => {
tracing::error!("Channel {} error: {e}; restarting", ch.name());
crate::health::mark_component_error(&component, e.to_string());
}
}
crate::health::bump_component_restart(&component);
tokio::time::sleep(Duration::from_secs(backoff)).await;
backoff = backoff.saturating_mul(2).min(max_backoff);
}
})
}
/// Load workspace identity files and build a system prompt.
///
/// Follows the `OpenClaw` framework structure:
@ -150,6 +192,38 @@ pub fn build_system_prompt(
}
}
/// Inject OpenClaw (markdown) identity files into the prompt
fn inject_openclaw_identity(prompt: &mut String, workspace_dir: &std::path::Path) {
#[allow(unused_imports)]
use std::fmt::Write;
prompt.push_str("## Project Context\n\n");
prompt
.push_str("The following workspace files define your identity, behavior, and context.\n\n");
let bootstrap_files = [
"AGENTS.md",
"SOUL.md",
"TOOLS.md",
"IDENTITY.md",
"USER.md",
"HEARTBEAT.md",
];
for filename in &bootstrap_files {
inject_workspace_file(prompt, workspace_dir, filename);
}
// BOOTSTRAP.md — only if it exists (first-run ritual)
let bootstrap_path = workspace_dir.join("BOOTSTRAP.md");
if bootstrap_path.exists() {
inject_workspace_file(prompt, workspace_dir, "BOOTSTRAP.md");
}
// MEMORY.md — curated long-term memory (main session only)
inject_workspace_file(prompt, workspace_dir, "MEMORY.md");
}
/// Inject a single workspace file into the prompt with truncation and missing-file markers.
fn inject_workspace_file(prompt: &mut String, workspace_dir: &std::path::Path, filename: &str) {
use std::fmt::Write;
@ -200,6 +274,7 @@ pub fn handle_command(command: super::ChannelCommands, config: &Config) -> Resul
("Webhook", config.channels_config.webhook.is_some()),
("iMessage", config.channels_config.imessage.is_some()),
("Matrix", config.channels_config.matrix.is_some()),
("WhatsApp", config.channels_config.whatsapp.is_some()),
] {
println!(" {} {name}", if configured { "" } else { "" });
}
@ -294,6 +369,18 @@ pub async fn doctor_channels(config: Config) -> Result<()> {
));
}
if let Some(ref wa) = config.channels_config.whatsapp {
channels.push((
"WhatsApp",
Arc::new(WhatsAppChannel::new(
wa.access_token.clone(),
wa.phone_number_id.clone(),
wa.verify_token.clone(),
wa.allowed_numbers.clone(),
)),
));
}
if channels.is_empty() {
println!("No real-time channels configured. Run `zeroclaw onboard` first.");
return Ok(());
@ -338,9 +425,10 @@ pub async fn doctor_channels(config: Config) -> Result<()> {
/// Start all configured channels and route messages to the agent
#[allow(clippy::too_many_lines)]
pub async fn start_channels(config: Config) -> Result<()> {
let provider: Arc<dyn Provider> = Arc::from(providers::create_provider(
let provider: Arc<dyn Provider> = Arc::from(providers::create_resilient_provider(
config.default_provider.as_deref().unwrap_or("openrouter"),
config.api_key.as_deref(),
&config.reliability,
)?);
let model = config
.default_model
@ -359,12 +447,30 @@ pub async fn start_channels(config: Config) -> Result<()> {
// Collect tool descriptions for the prompt
let mut tool_descs: Vec<(&str, &str)> = vec![
("shell", "Execute terminal commands"),
("file_read", "Read file contents"),
("file_write", "Write file contents"),
("memory_store", "Save to memory"),
("memory_recall", "Search memory"),
("memory_forget", "Delete a memory entry"),
(
"shell",
"Execute terminal commands. Use when: running local checks, build/test commands, diagnostics. Don't use when: a safer dedicated tool exists, or command is destructive without approval.",
),
(
"file_read",
"Read file contents. Use when: inspecting project files, configs, logs. Don't use when: a targeted search is enough.",
),
(
"file_write",
"Write file contents. Use when: applying focused edits, scaffolding files, updating docs/code. Don't use when: side effects are unclear or file ownership is uncertain.",
),
(
"memory_store",
"Save to memory. Use when: preserving durable preferences, decisions, key context. Don't use when: information is transient/noisy/sensitive without need.",
),
(
"memory_recall",
"Search memory. Use when: retrieving prior decisions, user preferences, historical context. Don't use when: answer is already in current context.",
),
(
"memory_forget",
"Delete a memory entry. Use when: memory is incorrect/stale or explicitly requested for removal. Don't use when: impact is uncertain.",
),
];
if config.browser.enabled {
@ -426,6 +532,15 @@ pub async fn start_channels(config: Config) -> Result<()> {
)));
}
if let Some(ref wa) = config.channels_config.whatsapp {
channels.push(Arc::new(WhatsAppChannel::new(
wa.access_token.clone(),
wa.phone_number_id.clone(),
wa.verify_token.clone(),
wa.allowed_numbers.clone(),
)));
}
if channels.is_empty() {
println!("No channels configured. Run `zeroclaw onboard` to set up channels.");
return Ok(());
@ -450,19 +565,29 @@ pub async fn start_channels(config: Config) -> Result<()> {
println!(" Listening for messages... (Ctrl+C to stop)");
println!();
crate::health::mark_component_ok("channels");
let initial_backoff_secs = config
.reliability
.channel_initial_backoff_secs
.max(DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS);
let max_backoff_secs = config
.reliability
.channel_max_backoff_secs
.max(DEFAULT_CHANNEL_MAX_BACKOFF_SECS);
// Single message bus — all channels send messages here
let (tx, mut rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(100);
// Spawn a listener for each channel
let mut handles = Vec::new();
for ch in &channels {
let ch = ch.clone();
let tx = tx.clone();
handles.push(tokio::spawn(async move {
if let Err(e) = ch.listen(tx).await {
tracing::error!("Channel {} error: {e}", ch.name());
}
}));
handles.push(spawn_supervised_listener(
ch.clone(),
tx.clone(),
initial_backoff_secs,
max_backoff_secs,
));
}
drop(tx); // Drop our copy so rx closes when all channels stop
@ -537,6 +662,8 @@ pub async fn start_channels(config: Config) -> Result<()> {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tempfile::TempDir;
fn make_workspace() -> TempDir {
@ -781,4 +908,55 @@ mod tests {
let state = classify_health_result(&result);
assert_eq!(state, ChannelHealthState::Timeout);
}
struct AlwaysFailChannel {
name: &'static str,
calls: Arc<AtomicUsize>,
}
#[async_trait::async_trait]
impl Channel for AlwaysFailChannel {
fn name(&self) -> &str {
self.name
}
async fn send(&self, _message: &str, _recipient: &str) -> anyhow::Result<()> {
Ok(())
}
async fn listen(
&self,
_tx: tokio::sync::mpsc::Sender<traits::ChannelMessage>,
) -> anyhow::Result<()> {
self.calls.fetch_add(1, Ordering::SeqCst);
anyhow::bail!("listen boom")
}
}
#[tokio::test]
async fn supervised_listener_marks_error_and_restarts_on_failures() {
let calls = Arc::new(AtomicUsize::new(0));
let channel: Arc<dyn Channel> = Arc::new(AlwaysFailChannel {
name: "test-supervised-fail",
calls: Arc::clone(&calls),
});
let (_tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(1);
let handle = spawn_supervised_listener(channel, _tx, 1, 1);
tokio::time::sleep(Duration::from_millis(80)).await;
drop(rx);
handle.abort();
let _ = handle.await;
let snapshot = crate::health::snapshot_json();
let component = &snapshot["components"]["channel:test-supervised-fail"];
assert_eq!(component["status"], "error");
assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
assert!(component["last_error"]
.as_str()
.unwrap_or("")
.contains("listen boom"));
assert!(calls.load(Ordering::SeqCst) >= 1);
}
}