use crate::channels::{ Channel, DiscordChannel, MattermostChannel, SendMessage, SlackChannel, TelegramChannel, }; use crate::config::Config; use crate::cron::{ due_jobs, next_run_for_schedule, record_last_run, record_run, remove_job, reschedule_after_run, update_job, CronJob, CronJobPatch, DeliveryConfig, JobType, Schedule, SessionTarget, }; use crate::security::SecurityPolicy; use anyhow::Result; use chrono::{DateTime, Utc}; use futures_util::{stream, StreamExt}; use std::sync::Arc; use tokio::process::Command; use tokio::time::{self, Duration}; const MIN_POLL_SECONDS: u64 = 5; const SHELL_JOB_TIMEOUT_SECS: u64 = 120; pub async fn run(config: Config) -> Result<()> { let poll_secs = config.reliability.scheduler_poll_secs.max(MIN_POLL_SECONDS); let mut interval = time::interval(Duration::from_secs(poll_secs)); let security = Arc::new(SecurityPolicy::from_config( &config.autonomy, &config.workspace_dir, )); crate::health::mark_component_ok("scheduler"); loop { interval.tick().await; let jobs = match due_jobs(&config, Utc::now()) { Ok(jobs) => jobs, Err(e) => { crate::health::mark_component_error("scheduler", e.to_string()); tracing::warn!("Scheduler query failed: {e}"); continue; } }; process_due_jobs(&config, &security, jobs).await; } } pub async fn execute_job_now(config: &Config, job: &CronJob) -> (bool, String) { let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); execute_job_with_retry(config, &security, job).await } async fn execute_job_with_retry( config: &Config, security: &SecurityPolicy, job: &CronJob, ) -> (bool, String) { let mut last_output = String::new(); let retries = config.reliability.scheduler_retries; let mut backoff_ms = config.reliability.provider_backoff_ms.max(200); for attempt in 0..=retries { let (success, output) = match job.job_type { JobType::Shell => run_job_command(config, security, job).await, JobType::Agent => run_agent_job(config, job).await, }; last_output = output; if success { return (true, last_output); } if last_output.starts_with("blocked by security policy:") { // Deterministic policy violations are not retryable. return (false, last_output); } if attempt < retries { let jitter_ms = u64::from(Utc::now().timestamp_subsec_millis() % 250); time::sleep(Duration::from_millis(backoff_ms + jitter_ms)).await; backoff_ms = (backoff_ms.saturating_mul(2)).min(30_000); } } (false, last_output) } async fn process_due_jobs(config: &Config, security: &Arc, jobs: Vec) { let max_concurrent = config.scheduler.max_concurrent.max(1); let mut in_flight = stream::iter(jobs.into_iter().map(|job| { let config = config.clone(); let security = Arc::clone(security); async move { execute_and_persist_job(&config, security.as_ref(), &job).await } })) .buffer_unordered(max_concurrent); while let Some((job_id, success)) = in_flight.next().await { if !success { crate::health::mark_component_error("scheduler", format!("job {job_id} failed")); } } } async fn execute_and_persist_job( config: &Config, security: &SecurityPolicy, job: &CronJob, ) -> (String, bool) { crate::health::mark_component_ok("scheduler"); warn_if_high_frequency_agent_job(job); let started_at = Utc::now(); let (success, output) = execute_job_with_retry(config, security, job).await; let finished_at = Utc::now(); let success = persist_job_result(config, job, success, &output, started_at, finished_at).await; (job.id.clone(), success) } async fn run_agent_job(config: &Config, job: &CronJob) -> (bool, String) { let name = job.name.clone().unwrap_or_else(|| "cron-job".to_string()); let prompt = job.prompt.clone().unwrap_or_default(); let prefixed_prompt = format!("[cron:{} {name}] {prompt}", job.id); let model_override = job.model.clone(); let run_result = match job.session_target { SessionTarget::Main | SessionTarget::Isolated => { crate::agent::run( config.clone(), Some(prefixed_prompt), None, model_override, config.default_temperature, vec![], ) .await } }; match run_result { Ok(response) => ( true, if response.trim().is_empty() { "agent job executed".to_string() } else { response }, ), Err(e) => (false, format!("agent job failed: {e}")), } } async fn persist_job_result( config: &Config, job: &CronJob, mut success: bool, output: &str, started_at: DateTime, finished_at: DateTime, ) -> bool { let duration_ms = (finished_at - started_at).num_milliseconds(); if let Err(e) = deliver_if_configured(config, job, output).await { if job.delivery.best_effort { tracing::warn!("Cron delivery failed (best_effort): {e}"); } else { success = false; tracing::warn!("Cron delivery failed: {e}"); } } let _ = record_run( config, &job.id, started_at, finished_at, if success { "ok" } else { "error" }, Some(output), duration_ms, ); if is_one_shot_auto_delete(job) { if success { if let Err(e) = remove_job(config, &job.id) { tracing::warn!("Failed to remove one-shot cron job after success: {e}"); } } else { let _ = record_last_run(config, &job.id, finished_at, false, output); if let Err(e) = update_job( config, &job.id, CronJobPatch { enabled: Some(false), ..CronJobPatch::default() }, ) { tracing::warn!("Failed to disable failed one-shot cron job: {e}"); } } return success; } if let Err(e) = reschedule_after_run(config, job, success, output) { tracing::warn!("Failed to persist scheduler run result: {e}"); } success } fn is_one_shot_auto_delete(job: &CronJob) -> bool { job.delete_after_run && matches!(job.schedule, Schedule::At { .. }) } fn warn_if_high_frequency_agent_job(job: &CronJob) { if !matches!(job.job_type, JobType::Agent) { return; } let too_frequent = match &job.schedule { Schedule::Every { every_ms } => *every_ms < 5 * 60 * 1000, Schedule::Cron { .. } => { let now = Utc::now(); match ( next_run_for_schedule(&job.schedule, now), next_run_for_schedule(&job.schedule, now + chrono::Duration::seconds(1)), ) { (Ok(a), Ok(b)) => (b - a).num_minutes() < 5, _ => false, } } Schedule::At { .. } => false, }; if too_frequent { tracing::warn!( "Cron agent job '{}' is scheduled more frequently than every 5 minutes", job.id ); } } async fn deliver_if_configured(config: &Config, job: &CronJob, output: &str) -> Result<()> { let delivery: &DeliveryConfig = &job.delivery; if !delivery.mode.eq_ignore_ascii_case("announce") { return Ok(()); } let channel = delivery .channel .as_deref() .ok_or_else(|| anyhow::anyhow!("delivery.channel is required for announce mode"))?; let target = delivery .to .as_deref() .ok_or_else(|| anyhow::anyhow!("delivery.to is required for announce mode"))?; match channel.to_ascii_lowercase().as_str() { "telegram" => { let tg = config .channels_config .telegram .as_ref() .ok_or_else(|| anyhow::anyhow!("telegram channel not configured"))?; let channel = TelegramChannel::new(tg.bot_token.clone(), tg.allowed_users.clone()); channel.send(&SendMessage::new(output, target)).await?; } "discord" => { let dc = config .channels_config .discord .as_ref() .ok_or_else(|| anyhow::anyhow!("discord channel not configured"))?; let channel = DiscordChannel::new( dc.bot_token.clone(), dc.guild_id.clone(), dc.allowed_users.clone(), dc.listen_to_bots, dc.mention_only, ); channel.send(&SendMessage::new(output, target)).await?; } "slack" => { let sl = config .channels_config .slack .as_ref() .ok_or_else(|| anyhow::anyhow!("slack channel not configured"))?; let channel = SlackChannel::new( sl.bot_token.clone(), sl.channel_id.clone(), sl.allowed_users.clone(), ); channel.send(&SendMessage::new(output, target)).await?; } "mattermost" => { let mm = config .channels_config .mattermost .as_ref() .ok_or_else(|| anyhow::anyhow!("mattermost channel not configured"))?; let channel = MattermostChannel::new( mm.url.clone(), mm.bot_token.clone(), mm.channel_id.clone(), mm.allowed_users.clone(), mm.thread_replies.unwrap_or(false), ); channel.send(&SendMessage::new(output, target)).await?; } other => anyhow::bail!("unsupported delivery channel: {other}"), } Ok(()) } fn is_env_assignment(word: &str) -> bool { word.contains('=') && word .chars() .next() .is_some_and(|c| c.is_ascii_alphabetic() || c == '_') } fn strip_wrapping_quotes(token: &str) -> &str { token.trim_matches(|c| c == '"' || c == '\'') } fn forbidden_path_argument(security: &SecurityPolicy, command: &str) -> Option { let mut normalized = command.to_string(); for sep in ["&&", "||"] { normalized = normalized.replace(sep, "\x00"); } for sep in ['\n', ';', '|'] { normalized = normalized.replace(sep, "\x00"); } for segment in normalized.split('\x00') { let tokens: Vec<&str> = segment.split_whitespace().collect(); if tokens.is_empty() { continue; } // Skip leading env assignments and executable token. let mut idx = 0; while idx < tokens.len() && is_env_assignment(tokens[idx]) { idx += 1; } if idx >= tokens.len() { continue; } idx += 1; for token in &tokens[idx..] { let candidate = strip_wrapping_quotes(token); if candidate.is_empty() || candidate.starts_with('-') || candidate.contains("://") { continue; } let looks_like_path = candidate.starts_with('/') || candidate.starts_with("./") || candidate.starts_with("../") || candidate.starts_with("~/") || candidate.contains('/'); if looks_like_path && !security.is_path_allowed(candidate) { return Some(candidate.to_string()); } } } None } async fn run_job_command( config: &Config, security: &SecurityPolicy, job: &CronJob, ) -> (bool, String) { run_job_command_with_timeout( config, security, job, Duration::from_secs(SHELL_JOB_TIMEOUT_SECS), ) .await } async fn run_job_command_with_timeout( config: &Config, security: &SecurityPolicy, job: &CronJob, timeout: Duration, ) -> (bool, String) { if !security.can_act() { return ( false, "blocked by security policy: autonomy is read-only".to_string(), ); } if security.is_rate_limited() { return ( false, "blocked by security policy: rate limit exceeded".to_string(), ); } if !security.is_command_allowed(&job.command) { return ( false, format!( "blocked by security policy: command not allowed: {}", job.command ), ); } if let Some(path) = forbidden_path_argument(security, &job.command) { return ( false, format!("blocked by security policy: forbidden path argument: {path}"), ); } if !security.record_action() { return ( false, "blocked by security policy: action budget exhausted".to_string(), ); } let child = match Command::new("sh") .arg("-lc") .arg(&job.command) .current_dir(&config.workspace_dir) .kill_on_drop(true) .spawn() { Ok(child) => child, Err(e) => return (false, format!("spawn error: {e}")), }; match time::timeout(timeout, child.wait_with_output()).await { Ok(Ok(output)) => { let stdout = String::from_utf8_lossy(&output.stdout); let stderr = String::from_utf8_lossy(&output.stderr); let combined = format!( "status={}\nstdout:\n{}\nstderr:\n{}", output.status, stdout.trim(), stderr.trim() ); (output.status.success(), combined) } Ok(Err(e)) => (false, format!("spawn error: {e}")), Err(_) => ( false, format!("job timed out after {}s", timeout.as_secs_f64()), ), } } #[cfg(test)] mod tests { use super::*; use crate::config::Config; use crate::cron::{self, DeliveryConfig}; use crate::security::SecurityPolicy; use chrono::{Duration as ChronoDuration, Utc}; use tempfile::TempDir; fn test_config(tmp: &TempDir) -> Config { let config = Config { workspace_dir: tmp.path().join("workspace"), config_path: tmp.path().join("config.toml"), ..Config::default() }; std::fs::create_dir_all(&config.workspace_dir).unwrap(); config } fn test_job(command: &str) -> CronJob { CronJob { id: "test-job".into(), expression: "* * * * *".into(), schedule: crate::cron::Schedule::Cron { expr: "* * * * *".into(), tz: None, }, command: command.into(), prompt: None, name: None, job_type: JobType::Shell, session_target: SessionTarget::Isolated, model: None, enabled: true, delivery: DeliveryConfig::default(), delete_after_run: false, created_at: Utc::now(), next_run: Utc::now(), last_run: None, last_status: None, last_output: None, } } #[tokio::test] async fn run_job_command_success() { let tmp = TempDir::new().unwrap(); let config = test_config(&tmp); let job = test_job("echo scheduler-ok"); let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); let (success, output) = run_job_command(&config, &security, &job).await; assert!(success); assert!(output.contains("scheduler-ok")); assert!(output.contains("status=exit status: 0")); } #[tokio::test] async fn run_job_command_failure() { let tmp = TempDir::new().unwrap(); let config = test_config(&tmp); let job = test_job("ls definitely_missing_file_for_scheduler_test"); let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); let (success, output) = run_job_command(&config, &security, &job).await; assert!(!success); assert!(output.contains("definitely_missing_file_for_scheduler_test")); assert!(output.contains("status=exit status:")); } #[tokio::test] async fn run_job_command_times_out() { let tmp = TempDir::new().unwrap(); let mut config = test_config(&tmp); config.autonomy.allowed_commands = vec!["sleep".into()]; let job = test_job("sleep 1"); let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); let (success, output) = run_job_command_with_timeout(&config, &security, &job, Duration::from_millis(50)).await; assert!(!success); assert!(output.contains("job timed out after")); } #[tokio::test] async fn run_job_command_blocks_disallowed_command() { let tmp = TempDir::new().unwrap(); let mut config = test_config(&tmp); config.autonomy.allowed_commands = vec!["echo".into()]; let job = test_job("curl https://evil.example"); let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); let (success, output) = run_job_command(&config, &security, &job).await; assert!(!success); assert!(output.contains("blocked by security policy")); assert!(output.contains("command not allowed")); } #[tokio::test] async fn run_job_command_blocks_forbidden_path_argument() { let tmp = TempDir::new().unwrap(); let mut config = test_config(&tmp); config.autonomy.allowed_commands = vec!["cat".into()]; let job = test_job("cat /etc/passwd"); let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); let (success, output) = run_job_command(&config, &security, &job).await; assert!(!success); assert!(output.contains("blocked by security policy")); assert!(output.contains("forbidden path argument")); assert!(output.contains("/etc/passwd")); } #[tokio::test] async fn run_job_command_blocks_readonly_mode() { let tmp = TempDir::new().unwrap(); let mut config = test_config(&tmp); config.autonomy.level = crate::security::AutonomyLevel::ReadOnly; let job = test_job("echo should-not-run"); let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); let (success, output) = run_job_command(&config, &security, &job).await; assert!(!success); assert!(output.contains("blocked by security policy")); assert!(output.contains("read-only")); } #[tokio::test] async fn run_job_command_blocks_rate_limited() { let tmp = TempDir::new().unwrap(); let mut config = test_config(&tmp); config.autonomy.max_actions_per_hour = 0; let job = test_job("echo should-not-run"); let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); let (success, output) = run_job_command(&config, &security, &job).await; assert!(!success); assert!(output.contains("blocked by security policy")); assert!(output.contains("rate limit exceeded")); } #[tokio::test] async fn execute_job_with_retry_recovers_after_first_failure() { let tmp = TempDir::new().unwrap(); let mut config = test_config(&tmp); config.reliability.scheduler_retries = 1; config.reliability.provider_backoff_ms = 1; config.autonomy.allowed_commands = vec!["sh".into()]; let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); std::fs::write( config.workspace_dir.join("retry-once.sh"), "#!/bin/sh\nif [ -f retry-ok.flag ]; then\n echo recovered\n exit 0\nfi\ntouch retry-ok.flag\nexit 1\n", ) .unwrap(); let job = test_job("sh ./retry-once.sh"); let (success, output) = execute_job_with_retry(&config, &security, &job).await; assert!(success); assert!(output.contains("recovered")); } #[tokio::test] async fn execute_job_with_retry_exhausts_attempts() { let tmp = TempDir::new().unwrap(); let mut config = test_config(&tmp); config.reliability.scheduler_retries = 1; config.reliability.provider_backoff_ms = 1; let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); let job = test_job("ls always_missing_for_retry_test"); let (success, output) = execute_job_with_retry(&config, &security, &job).await; assert!(!success); assert!(output.contains("always_missing_for_retry_test")); } #[tokio::test] async fn run_agent_job_returns_error_without_provider_key() { let tmp = TempDir::new().unwrap(); let config = test_config(&tmp); let mut job = test_job(""); job.job_type = JobType::Agent; job.prompt = Some("Say hello".into()); let (success, output) = run_agent_job(&config, &job).await; assert!(!success, "Agent job without provider key should fail"); assert!( !output.is_empty(), "Expected non-empty error output from failed agent job" ); } #[tokio::test] async fn persist_job_result_records_run_and_reschedules_shell_job() { let tmp = TempDir::new().unwrap(); let config = test_config(&tmp); let job = cron::add_job(&config, "*/5 * * * *", "echo ok").unwrap(); let started = Utc::now(); let finished = started + ChronoDuration::milliseconds(10); let success = persist_job_result(&config, &job, true, "ok", started, finished).await; assert!(success); let runs = cron::list_runs(&config, &job.id, 10).unwrap(); assert_eq!(runs.len(), 1); let updated = cron::get_job(&config, &job.id).unwrap(); assert_eq!(updated.last_status.as_deref(), Some("ok")); } #[tokio::test] async fn persist_job_result_success_deletes_one_shot() { let tmp = TempDir::new().unwrap(); let config = test_config(&tmp); let at = Utc::now() + ChronoDuration::minutes(10); let job = cron::add_agent_job( &config, Some("one-shot".into()), crate::cron::Schedule::At { at }, "Hello", SessionTarget::Isolated, None, None, true, ) .unwrap(); let started = Utc::now(); let finished = started + ChronoDuration::milliseconds(10); let success = persist_job_result(&config, &job, true, "ok", started, finished).await; assert!(success); let lookup = cron::get_job(&config, &job.id); assert!(lookup.is_err()); } #[tokio::test] async fn persist_job_result_failure_disables_one_shot() { let tmp = TempDir::new().unwrap(); let config = test_config(&tmp); let at = Utc::now() + ChronoDuration::minutes(10); let job = cron::add_agent_job( &config, Some("one-shot".into()), crate::cron::Schedule::At { at }, "Hello", SessionTarget::Isolated, None, None, true, ) .unwrap(); let started = Utc::now(); let finished = started + ChronoDuration::milliseconds(10); let success = persist_job_result(&config, &job, false, "boom", started, finished).await; assert!(!success); let updated = cron::get_job(&config, &job.id).unwrap(); assert!(!updated.enabled); assert_eq!(updated.last_status.as_deref(), Some("error")); } #[tokio::test] async fn deliver_if_configured_handles_none_and_invalid_channel() { let tmp = TempDir::new().unwrap(); let config = test_config(&tmp); let mut job = test_job("echo ok"); assert!(deliver_if_configured(&config, &job, "x").await.is_ok()); job.delivery = DeliveryConfig { mode: "announce".into(), channel: Some("invalid".into()), to: Some("target".into()), best_effort: true, }; let err = deliver_if_configured(&config, &job, "x").await.unwrap_err(); assert!(err.to_string().contains("unsupported delivery channel")); } }