fix(cron): add timeout and bounded execution for due jobs
This commit is contained in:
parent
5f5cb27690
commit
7de052c7d2
2 changed files with 172 additions and 26 deletions
|
|
@ -9,15 +9,21 @@ use crate::cron::{
|
||||||
use crate::security::SecurityPolicy;
|
use crate::security::SecurityPolicy;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
|
use futures_util::{stream, StreamExt};
|
||||||
|
use std::sync::Arc;
|
||||||
use tokio::process::Command;
|
use tokio::process::Command;
|
||||||
use tokio::time::{self, Duration};
|
use tokio::time::{self, Duration};
|
||||||
|
|
||||||
const MIN_POLL_SECONDS: u64 = 5;
|
const MIN_POLL_SECONDS: u64 = 5;
|
||||||
|
const SHELL_JOB_TIMEOUT_SECS: u64 = 120;
|
||||||
|
|
||||||
pub async fn run(config: Config) -> Result<()> {
|
pub async fn run(config: Config) -> Result<()> {
|
||||||
let poll_secs = config.reliability.scheduler_poll_secs.max(MIN_POLL_SECONDS);
|
let poll_secs = config.reliability.scheduler_poll_secs.max(MIN_POLL_SECONDS);
|
||||||
let mut interval = time::interval(Duration::from_secs(poll_secs));
|
let mut interval = time::interval(Duration::from_secs(poll_secs));
|
||||||
let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir);
|
let security = Arc::new(SecurityPolicy::from_config(
|
||||||
|
&config.autonomy,
|
||||||
|
&config.workspace_dir,
|
||||||
|
));
|
||||||
|
|
||||||
crate::health::mark_component_ok("scheduler");
|
crate::health::mark_component_ok("scheduler");
|
||||||
|
|
||||||
|
|
@ -33,20 +39,7 @@ pub async fn run(config: Config) -> Result<()> {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
for job in jobs {
|
process_due_jobs(&config, &security, jobs).await;
|
||||||
crate::health::mark_component_ok("scheduler");
|
|
||||||
warn_if_high_frequency_agent_job(&job);
|
|
||||||
|
|
||||||
let started_at = Utc::now();
|
|
||||||
let (success, output) = execute_job_with_retry(&config, &security, &job).await;
|
|
||||||
let finished_at = Utc::now();
|
|
||||||
let success =
|
|
||||||
persist_job_result(&config, &job, success, &output, started_at, finished_at).await;
|
|
||||||
|
|
||||||
if !success {
|
|
||||||
crate::health::mark_component_error("scheduler", format!("job {} failed", job.id));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -90,6 +83,38 @@ async fn execute_job_with_retry(
|
||||||
(false, last_output)
|
(false, last_output)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn process_due_jobs(config: &Config, security: &Arc<SecurityPolicy>, jobs: Vec<CronJob>) {
|
||||||
|
let max_concurrent = config.scheduler.max_concurrent.max(1);
|
||||||
|
let mut in_flight = stream::iter(jobs.into_iter().map(|job| {
|
||||||
|
let config = config.clone();
|
||||||
|
let security = Arc::clone(security);
|
||||||
|
async move { execute_and_persist_job(&config, security.as_ref(), &job).await }
|
||||||
|
}))
|
||||||
|
.buffer_unordered(max_concurrent);
|
||||||
|
|
||||||
|
while let Some((job_id, success)) = in_flight.next().await {
|
||||||
|
if !success {
|
||||||
|
crate::health::mark_component_error("scheduler", format!("job {job_id} failed"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn execute_and_persist_job(
|
||||||
|
config: &Config,
|
||||||
|
security: &SecurityPolicy,
|
||||||
|
job: &CronJob,
|
||||||
|
) -> (String, bool) {
|
||||||
|
crate::health::mark_component_ok("scheduler");
|
||||||
|
warn_if_high_frequency_agent_job(job);
|
||||||
|
|
||||||
|
let started_at = Utc::now();
|
||||||
|
let (success, output) = execute_job_with_retry(config, security, job).await;
|
||||||
|
let finished_at = Utc::now();
|
||||||
|
let success = persist_job_result(config, job, success, &output, started_at, finished_at).await;
|
||||||
|
|
||||||
|
(job.id.clone(), success)
|
||||||
|
}
|
||||||
|
|
||||||
async fn run_agent_job(config: &Config, job: &CronJob) -> (bool, String) {
|
async fn run_agent_job(config: &Config, job: &CronJob) -> (bool, String) {
|
||||||
let name = job.name.clone().unwrap_or_else(|| "cron-job".to_string());
|
let name = job.name.clone().unwrap_or_else(|| "cron-job".to_string());
|
||||||
let prompt = job.prompt.clone().unwrap_or_default();
|
let prompt = job.prompt.clone().unwrap_or_default();
|
||||||
|
|
@ -346,6 +371,21 @@ async fn run_job_command(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
security: &SecurityPolicy,
|
security: &SecurityPolicy,
|
||||||
job: &CronJob,
|
job: &CronJob,
|
||||||
|
) -> (bool, String) {
|
||||||
|
run_job_command_with_timeout(
|
||||||
|
config,
|
||||||
|
security,
|
||||||
|
job,
|
||||||
|
Duration::from_secs(SHELL_JOB_TIMEOUT_SECS),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_job_command_with_timeout(
|
||||||
|
config: &Config,
|
||||||
|
security: &SecurityPolicy,
|
||||||
|
job: &CronJob,
|
||||||
|
timeout: Duration,
|
||||||
) -> (bool, String) {
|
) -> (bool, String) {
|
||||||
if !security.can_act() {
|
if !security.can_act() {
|
||||||
return (
|
return (
|
||||||
|
|
@ -385,15 +425,19 @@ async fn run_job_command(
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let output = Command::new("sh")
|
let child = match Command::new("sh")
|
||||||
.arg("-lc")
|
.arg("-lc")
|
||||||
.arg(&job.command)
|
.arg(&job.command)
|
||||||
.current_dir(&config.workspace_dir)
|
.current_dir(&config.workspace_dir)
|
||||||
.output()
|
.kill_on_drop(true)
|
||||||
.await;
|
.spawn()
|
||||||
|
{
|
||||||
|
Ok(child) => child,
|
||||||
|
Err(e) => return (false, format!("spawn error: {e}")),
|
||||||
|
};
|
||||||
|
|
||||||
match output {
|
match time::timeout(timeout, child.wait_with_output()).await {
|
||||||
Ok(output) => {
|
Ok(Ok(output)) => {
|
||||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||||
let combined = format!(
|
let combined = format!(
|
||||||
|
|
@ -404,7 +448,11 @@ async fn run_job_command(
|
||||||
);
|
);
|
||||||
(output.status.success(), combined)
|
(output.status.success(), combined)
|
||||||
}
|
}
|
||||||
Err(e) => (false, format!("spawn error: {e}")),
|
Ok(Err(e)) => (false, format!("spawn error: {e}")),
|
||||||
|
Err(_) => (
|
||||||
|
false,
|
||||||
|
format!("job timed out after {}s", timeout.as_secs_f64()),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -478,6 +526,20 @@ mod tests {
|
||||||
assert!(output.contains("status=exit status:"));
|
assert!(output.contains("status=exit status:"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn run_job_command_times_out() {
|
||||||
|
let tmp = TempDir::new().unwrap();
|
||||||
|
let mut config = test_config(&tmp);
|
||||||
|
config.autonomy.allowed_commands = vec!["sleep".into()];
|
||||||
|
let job = test_job("sleep 1");
|
||||||
|
let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir);
|
||||||
|
|
||||||
|
let (success, output) =
|
||||||
|
run_job_command_with_timeout(&config, &security, &job, Duration::from_millis(50)).await;
|
||||||
|
assert!(!success);
|
||||||
|
assert!(output.contains("job timed out after"));
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn run_job_command_blocks_disallowed_command() {
|
async fn run_job_command_blocks_disallowed_command() {
|
||||||
let tmp = TempDir::new().unwrap();
|
let tmp = TempDir::new().unwrap();
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,9 @@ use chrono::{DateTime, Utc};
|
||||||
use rusqlite::{params, Connection};
|
use rusqlite::{params, Connection};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
const MAX_CRON_OUTPUT_BYTES: usize = 16 * 1024;
|
||||||
|
const TRUNCATED_OUTPUT_MARKER: &str = "\n...[truncated]";
|
||||||
|
|
||||||
pub fn add_job(config: &Config, expression: &str, command: &str) -> Result<CronJob> {
|
pub fn add_job(config: &Config, expression: &str, command: &str) -> Result<CronJob> {
|
||||||
let schedule = Schedule::Cron {
|
let schedule = Schedule::Cron {
|
||||||
expr: expression.to_string(),
|
expr: expression.to_string(),
|
||||||
|
|
@ -149,14 +152,19 @@ pub fn remove_job(config: &Config, id: &str) -> Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn due_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
|
pub fn due_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
|
||||||
|
let lim = i64::try_from(config.scheduler.max_tasks.max(1))
|
||||||
|
.context("Scheduler max_tasks overflows i64")?;
|
||||||
with_connection(config, |conn| {
|
with_connection(config, |conn| {
|
||||||
let mut stmt = conn.prepare(
|
let mut stmt = conn.prepare(
|
||||||
"SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
|
"SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
|
||||||
enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output
|
enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output
|
||||||
FROM cron_jobs WHERE enabled = 1 AND next_run <= ?1 ORDER BY next_run ASC",
|
FROM cron_jobs
|
||||||
|
WHERE enabled = 1 AND next_run <= ?1
|
||||||
|
ORDER BY next_run ASC
|
||||||
|
LIMIT ?2",
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let rows = stmt.query_map(params![now.to_rfc3339()], map_cron_job_row)?;
|
let rows = stmt.query_map(params![now.to_rfc3339(), lim], map_cron_job_row)?;
|
||||||
|
|
||||||
let mut jobs = Vec::new();
|
let mut jobs = Vec::new();
|
||||||
for row in rows {
|
for row in rows {
|
||||||
|
|
@ -243,12 +251,13 @@ pub fn record_last_run(
|
||||||
output: &str,
|
output: &str,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let status = if success { "ok" } else { "error" };
|
let status = if success { "ok" } else { "error" };
|
||||||
|
let bounded_output = truncate_cron_output(output);
|
||||||
with_connection(config, |conn| {
|
with_connection(config, |conn| {
|
||||||
conn.execute(
|
conn.execute(
|
||||||
"UPDATE cron_jobs
|
"UPDATE cron_jobs
|
||||||
SET last_run = ?1, last_status = ?2, last_output = ?3
|
SET last_run = ?1, last_status = ?2, last_output = ?3
|
||||||
WHERE id = ?4",
|
WHERE id = ?4",
|
||||||
params![finished_at.to_rfc3339(), status, output, job_id],
|
params![finished_at.to_rfc3339(), status, bounded_output, job_id],
|
||||||
)
|
)
|
||||||
.context("Failed to update cron last run fields")?;
|
.context("Failed to update cron last run fields")?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
@ -264,6 +273,7 @@ pub fn reschedule_after_run(
|
||||||
let now = Utc::now();
|
let now = Utc::now();
|
||||||
let next_run = next_run_for_schedule(&job.schedule, now)?;
|
let next_run = next_run_for_schedule(&job.schedule, now)?;
|
||||||
let status = if success { "ok" } else { "error" };
|
let status = if success { "ok" } else { "error" };
|
||||||
|
let bounded_output = truncate_cron_output(output);
|
||||||
|
|
||||||
with_connection(config, |conn| {
|
with_connection(config, |conn| {
|
||||||
conn.execute(
|
conn.execute(
|
||||||
|
|
@ -274,7 +284,7 @@ pub fn reschedule_after_run(
|
||||||
next_run.to_rfc3339(),
|
next_run.to_rfc3339(),
|
||||||
now.to_rfc3339(),
|
now.to_rfc3339(),
|
||||||
status,
|
status,
|
||||||
output,
|
bounded_output,
|
||||||
job.id
|
job.id
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
@ -292,6 +302,7 @@ pub fn record_run(
|
||||||
output: Option<&str>,
|
output: Option<&str>,
|
||||||
duration_ms: i64,
|
duration_ms: i64,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
let bounded_output = output.map(truncate_cron_output);
|
||||||
with_connection(config, |conn| {
|
with_connection(config, |conn| {
|
||||||
conn.execute(
|
conn.execute(
|
||||||
"INSERT INTO cron_runs (job_id, started_at, finished_at, status, output, duration_ms)
|
"INSERT INTO cron_runs (job_id, started_at, finished_at, status, output, duration_ms)
|
||||||
|
|
@ -301,7 +312,7 @@ pub fn record_run(
|
||||||
started_at.to_rfc3339(),
|
started_at.to_rfc3339(),
|
||||||
finished_at.to_rfc3339(),
|
finished_at.to_rfc3339(),
|
||||||
status,
|
status,
|
||||||
output,
|
bounded_output.as_deref(),
|
||||||
duration_ms,
|
duration_ms,
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
@ -324,6 +335,25 @@ pub fn record_run(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn truncate_cron_output(output: &str) -> String {
|
||||||
|
if output.len() <= MAX_CRON_OUTPUT_BYTES {
|
||||||
|
return output.to_string();
|
||||||
|
}
|
||||||
|
|
||||||
|
if MAX_CRON_OUTPUT_BYTES <= TRUNCATED_OUTPUT_MARKER.len() {
|
||||||
|
return TRUNCATED_OUTPUT_MARKER.to_string();
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut cutoff = MAX_CRON_OUTPUT_BYTES - TRUNCATED_OUTPUT_MARKER.len();
|
||||||
|
while cutoff > 0 && !output.is_char_boundary(cutoff) {
|
||||||
|
cutoff -= 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut truncated = output[..cutoff].to_string();
|
||||||
|
truncated.push_str(TRUNCATED_OUTPUT_MARKER);
|
||||||
|
truncated
|
||||||
|
}
|
||||||
|
|
||||||
pub fn list_runs(config: &Config, job_id: &str, limit: usize) -> Result<Vec<CronRun>> {
|
pub fn list_runs(config: &Config, job_id: &str, limit: usize) -> Result<Vec<CronRun>> {
|
||||||
with_connection(config, |conn| {
|
with_connection(config, |conn| {
|
||||||
let lim = i64::try_from(limit.max(1)).context("Run history limit overflow")?;
|
let lim = i64::try_from(limit.max(1)).context("Run history limit overflow")?;
|
||||||
|
|
@ -594,6 +624,21 @@ mod tests {
|
||||||
assert!(due_after_disable.is_empty());
|
assert!(due_after_disable.is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn due_jobs_respects_scheduler_max_tasks_limit() {
|
||||||
|
let tmp = TempDir::new().unwrap();
|
||||||
|
let mut config = test_config(&tmp);
|
||||||
|
config.scheduler.max_tasks = 2;
|
||||||
|
|
||||||
|
let _ = add_job(&config, "* * * * *", "echo due-1").unwrap();
|
||||||
|
let _ = add_job(&config, "* * * * *", "echo due-2").unwrap();
|
||||||
|
let _ = add_job(&config, "* * * * *", "echo due-3").unwrap();
|
||||||
|
|
||||||
|
let far_future = Utc::now() + ChronoDuration::days(365);
|
||||||
|
let due = due_jobs(&config, far_future).unwrap();
|
||||||
|
assert_eq!(due.len(), 2);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn reschedule_after_run_persists_last_status_and_last_run() {
|
fn reschedule_after_run_persists_last_status_and_last_run() {
|
||||||
let tmp = TempDir::new().unwrap();
|
let tmp = TempDir::new().unwrap();
|
||||||
|
|
@ -677,4 +722,43 @@ mod tests {
|
||||||
let runs = list_runs(&config, &job.id, 10).unwrap();
|
let runs = list_runs(&config, &job.id, 10).unwrap();
|
||||||
assert!(runs.is_empty());
|
assert!(runs.is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn record_run_truncates_large_output() {
|
||||||
|
let tmp = TempDir::new().unwrap();
|
||||||
|
let config = test_config(&tmp);
|
||||||
|
let job = add_job(&config, "*/5 * * * *", "echo trunc").unwrap();
|
||||||
|
let output = "x".repeat(MAX_CRON_OUTPUT_BYTES + 512);
|
||||||
|
|
||||||
|
record_run(
|
||||||
|
&config,
|
||||||
|
&job.id,
|
||||||
|
Utc::now(),
|
||||||
|
Utc::now(),
|
||||||
|
"ok",
|
||||||
|
Some(&output),
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let runs = list_runs(&config, &job.id, 1).unwrap();
|
||||||
|
let stored = runs[0].output.as_deref().unwrap_or_default();
|
||||||
|
assert!(stored.ends_with(TRUNCATED_OUTPUT_MARKER));
|
||||||
|
assert!(stored.len() <= MAX_CRON_OUTPUT_BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn reschedule_after_run_truncates_last_output() {
|
||||||
|
let tmp = TempDir::new().unwrap();
|
||||||
|
let config = test_config(&tmp);
|
||||||
|
let job = add_job(&config, "*/5 * * * *", "echo trunc").unwrap();
|
||||||
|
let output = "y".repeat(MAX_CRON_OUTPUT_BYTES + 1024);
|
||||||
|
|
||||||
|
reschedule_after_run(&config, &job, false, &output).unwrap();
|
||||||
|
|
||||||
|
let stored = get_job(&config, &job.id).unwrap();
|
||||||
|
let last_output = stored.last_output.as_deref().unwrap_or_default();
|
||||||
|
assert!(last_output.ends_with(TRUNCATED_OUTPUT_MARKER));
|
||||||
|
assert!(last_output.len() <= MAX_CRON_OUTPUT_BYTES);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue