From 7de052c7d2c26571938a434160eb7c69c1e1cbeb Mon Sep 17 00:00:00 2001 From: fettpl <38704082+fettpl@users.noreply.github.com> Date: Tue, 17 Feb 2026 23:20:32 +0100 Subject: [PATCH] fix(cron): add timeout and bounded execution for due jobs --- src/cron/scheduler.rs | 104 +++++++++++++++++++++++++++++++++--------- src/cron/store.rs | 94 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 172 insertions(+), 26 deletions(-) diff --git a/src/cron/scheduler.rs b/src/cron/scheduler.rs index e50ef78..cbf44bc 100644 --- a/src/cron/scheduler.rs +++ b/src/cron/scheduler.rs @@ -9,15 +9,21 @@ use crate::cron::{ use crate::security::SecurityPolicy; use anyhow::Result; use chrono::{DateTime, Utc}; +use futures_util::{stream, StreamExt}; +use std::sync::Arc; use tokio::process::Command; use tokio::time::{self, Duration}; const MIN_POLL_SECONDS: u64 = 5; +const SHELL_JOB_TIMEOUT_SECS: u64 = 120; pub async fn run(config: Config) -> Result<()> { let poll_secs = config.reliability.scheduler_poll_secs.max(MIN_POLL_SECONDS); let mut interval = time::interval(Duration::from_secs(poll_secs)); - let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); + let security = Arc::new(SecurityPolicy::from_config( + &config.autonomy, + &config.workspace_dir, + )); crate::health::mark_component_ok("scheduler"); @@ -33,20 +39,7 @@ pub async fn run(config: Config) -> Result<()> { } }; - for job in jobs { - crate::health::mark_component_ok("scheduler"); - warn_if_high_frequency_agent_job(&job); - - let started_at = Utc::now(); - let (success, output) = execute_job_with_retry(&config, &security, &job).await; - let finished_at = Utc::now(); - let success = - persist_job_result(&config, &job, success, &output, started_at, finished_at).await; - - if !success { - crate::health::mark_component_error("scheduler", format!("job {} failed", job.id)); - } - } + process_due_jobs(&config, &security, jobs).await; } } @@ -90,6 +83,38 @@ async fn execute_job_with_retry( (false, last_output) } +async fn process_due_jobs(config: &Config, security: &Arc, jobs: Vec) { + let max_concurrent = config.scheduler.max_concurrent.max(1); + let mut in_flight = stream::iter(jobs.into_iter().map(|job| { + let config = config.clone(); + let security = Arc::clone(security); + async move { execute_and_persist_job(&config, security.as_ref(), &job).await } + })) + .buffer_unordered(max_concurrent); + + while let Some((job_id, success)) = in_flight.next().await { + if !success { + crate::health::mark_component_error("scheduler", format!("job {job_id} failed")); + } + } +} + +async fn execute_and_persist_job( + config: &Config, + security: &SecurityPolicy, + job: &CronJob, +) -> (String, bool) { + crate::health::mark_component_ok("scheduler"); + warn_if_high_frequency_agent_job(job); + + let started_at = Utc::now(); + let (success, output) = execute_job_with_retry(config, security, job).await; + let finished_at = Utc::now(); + let success = persist_job_result(config, job, success, &output, started_at, finished_at).await; + + (job.id.clone(), success) +} + async fn run_agent_job(config: &Config, job: &CronJob) -> (bool, String) { let name = job.name.clone().unwrap_or_else(|| "cron-job".to_string()); let prompt = job.prompt.clone().unwrap_or_default(); @@ -346,6 +371,21 @@ async fn run_job_command( config: &Config, security: &SecurityPolicy, job: &CronJob, +) -> (bool, String) { + run_job_command_with_timeout( + config, + security, + job, + Duration::from_secs(SHELL_JOB_TIMEOUT_SECS), + ) + .await +} + +async fn run_job_command_with_timeout( + config: &Config, + security: &SecurityPolicy, + job: &CronJob, + timeout: Duration, ) -> (bool, String) { if !security.can_act() { return ( @@ -385,15 +425,19 @@ async fn run_job_command( ); } - let output = Command::new("sh") + let child = match Command::new("sh") .arg("-lc") .arg(&job.command) .current_dir(&config.workspace_dir) - .output() - .await; + .kill_on_drop(true) + .spawn() + { + Ok(child) => child, + Err(e) => return (false, format!("spawn error: {e}")), + }; - match output { - Ok(output) => { + match time::timeout(timeout, child.wait_with_output()).await { + Ok(Ok(output)) => { let stdout = String::from_utf8_lossy(&output.stdout); let stderr = String::from_utf8_lossy(&output.stderr); let combined = format!( @@ -404,7 +448,11 @@ async fn run_job_command( ); (output.status.success(), combined) } - Err(e) => (false, format!("spawn error: {e}")), + Ok(Err(e)) => (false, format!("spawn error: {e}")), + Err(_) => ( + false, + format!("job timed out after {}s", timeout.as_secs_f64()), + ), } } @@ -478,6 +526,20 @@ mod tests { assert!(output.contains("status=exit status:")); } + #[tokio::test] + async fn run_job_command_times_out() { + let tmp = TempDir::new().unwrap(); + let mut config = test_config(&tmp); + config.autonomy.allowed_commands = vec!["sleep".into()]; + let job = test_job("sleep 1"); + let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); + + let (success, output) = + run_job_command_with_timeout(&config, &security, &job, Duration::from_millis(50)).await; + assert!(!success); + assert!(output.contains("job timed out after")); + } + #[tokio::test] async fn run_job_command_blocks_disallowed_command() { let tmp = TempDir::new().unwrap(); diff --git a/src/cron/store.rs b/src/cron/store.rs index 9c68d05..2b49640 100644 --- a/src/cron/store.rs +++ b/src/cron/store.rs @@ -8,6 +8,9 @@ use chrono::{DateTime, Utc}; use rusqlite::{params, Connection}; use uuid::Uuid; +const MAX_CRON_OUTPUT_BYTES: usize = 16 * 1024; +const TRUNCATED_OUTPUT_MARKER: &str = "\n...[truncated]"; + pub fn add_job(config: &Config, expression: &str, command: &str) -> Result { let schedule = Schedule::Cron { expr: expression.to_string(), @@ -149,14 +152,19 @@ pub fn remove_job(config: &Config, id: &str) -> Result<()> { } pub fn due_jobs(config: &Config, now: DateTime) -> Result> { + let lim = i64::try_from(config.scheduler.max_tasks.max(1)) + .context("Scheduler max_tasks overflows i64")?; with_connection(config, |conn| { let mut stmt = conn.prepare( "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model, enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output - FROM cron_jobs WHERE enabled = 1 AND next_run <= ?1 ORDER BY next_run ASC", + FROM cron_jobs + WHERE enabled = 1 AND next_run <= ?1 + ORDER BY next_run ASC + LIMIT ?2", )?; - let rows = stmt.query_map(params![now.to_rfc3339()], map_cron_job_row)?; + let rows = stmt.query_map(params![now.to_rfc3339(), lim], map_cron_job_row)?; let mut jobs = Vec::new(); for row in rows { @@ -243,12 +251,13 @@ pub fn record_last_run( output: &str, ) -> Result<()> { let status = if success { "ok" } else { "error" }; + let bounded_output = truncate_cron_output(output); with_connection(config, |conn| { conn.execute( "UPDATE cron_jobs SET last_run = ?1, last_status = ?2, last_output = ?3 WHERE id = ?4", - params![finished_at.to_rfc3339(), status, output, job_id], + params![finished_at.to_rfc3339(), status, bounded_output, job_id], ) .context("Failed to update cron last run fields")?; Ok(()) @@ -264,6 +273,7 @@ pub fn reschedule_after_run( let now = Utc::now(); let next_run = next_run_for_schedule(&job.schedule, now)?; let status = if success { "ok" } else { "error" }; + let bounded_output = truncate_cron_output(output); with_connection(config, |conn| { conn.execute( @@ -274,7 +284,7 @@ pub fn reschedule_after_run( next_run.to_rfc3339(), now.to_rfc3339(), status, - output, + bounded_output, job.id ], ) @@ -292,6 +302,7 @@ pub fn record_run( output: Option<&str>, duration_ms: i64, ) -> Result<()> { + let bounded_output = output.map(truncate_cron_output); with_connection(config, |conn| { conn.execute( "INSERT INTO cron_runs (job_id, started_at, finished_at, status, output, duration_ms) @@ -301,7 +312,7 @@ pub fn record_run( started_at.to_rfc3339(), finished_at.to_rfc3339(), status, - output, + bounded_output.as_deref(), duration_ms, ], ) @@ -324,6 +335,25 @@ pub fn record_run( }) } +fn truncate_cron_output(output: &str) -> String { + if output.len() <= MAX_CRON_OUTPUT_BYTES { + return output.to_string(); + } + + if MAX_CRON_OUTPUT_BYTES <= TRUNCATED_OUTPUT_MARKER.len() { + return TRUNCATED_OUTPUT_MARKER.to_string(); + } + + let mut cutoff = MAX_CRON_OUTPUT_BYTES - TRUNCATED_OUTPUT_MARKER.len(); + while cutoff > 0 && !output.is_char_boundary(cutoff) { + cutoff -= 1; + } + + let mut truncated = output[..cutoff].to_string(); + truncated.push_str(TRUNCATED_OUTPUT_MARKER); + truncated +} + pub fn list_runs(config: &Config, job_id: &str, limit: usize) -> Result> { with_connection(config, |conn| { let lim = i64::try_from(limit.max(1)).context("Run history limit overflow")?; @@ -594,6 +624,21 @@ mod tests { assert!(due_after_disable.is_empty()); } + #[test] + fn due_jobs_respects_scheduler_max_tasks_limit() { + let tmp = TempDir::new().unwrap(); + let mut config = test_config(&tmp); + config.scheduler.max_tasks = 2; + + let _ = add_job(&config, "* * * * *", "echo due-1").unwrap(); + let _ = add_job(&config, "* * * * *", "echo due-2").unwrap(); + let _ = add_job(&config, "* * * * *", "echo due-3").unwrap(); + + let far_future = Utc::now() + ChronoDuration::days(365); + let due = due_jobs(&config, far_future).unwrap(); + assert_eq!(due.len(), 2); + } + #[test] fn reschedule_after_run_persists_last_status_and_last_run() { let tmp = TempDir::new().unwrap(); @@ -677,4 +722,43 @@ mod tests { let runs = list_runs(&config, &job.id, 10).unwrap(); assert!(runs.is_empty()); } + + #[test] + fn record_run_truncates_large_output() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + let job = add_job(&config, "*/5 * * * *", "echo trunc").unwrap(); + let output = "x".repeat(MAX_CRON_OUTPUT_BYTES + 512); + + record_run( + &config, + &job.id, + Utc::now(), + Utc::now(), + "ok", + Some(&output), + 1, + ) + .unwrap(); + + let runs = list_runs(&config, &job.id, 1).unwrap(); + let stored = runs[0].output.as_deref().unwrap_or_default(); + assert!(stored.ends_with(TRUNCATED_OUTPUT_MARKER)); + assert!(stored.len() <= MAX_CRON_OUTPUT_BYTES); + } + + #[test] + fn reschedule_after_run_truncates_last_output() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + let job = add_job(&config, "*/5 * * * *", "echo trunc").unwrap(); + let output = "y".repeat(MAX_CRON_OUTPUT_BYTES + 1024); + + reschedule_after_run(&config, &job, false, &output).unwrap(); + + let stored = get_job(&config, &job.id).unwrap(); + let last_output = stored.last_output.as_deref().unwrap_or_default(); + assert!(last_output.ends_with(TRUNCATED_OUTPUT_MARKER)); + assert!(last_output.len() <= MAX_CRON_OUTPUT_BYTES); + } }