From d7cca4b150705c6e22d6c2ea9425688cc6b5cbdd Mon Sep 17 00:00:00 2001 From: Chummy Date: Mon, 16 Feb 2026 23:38:29 +0800 Subject: [PATCH] feat: unify scheduled tasks from #337 and #338 with security-first integration Unifies scheduled task capabilities and consolidates overlapping implementations from #337 and #338 into a single security-first integration path. Co-authored-by: Edvard Co-authored-by: stawky --- src/agent/loop_.rs | 5 + src/channels/mod.rs | 5 + src/config/mod.rs | 4 +- src/config/schema.rs | 43 ++++ src/cron/mod.rs | 420 +++++++++++++++++++++++++++------ src/cron/scheduler.rs | 13 +- src/gateway/mod.rs | 1 + src/lib.rs | 17 ++ src/main.rs | 17 ++ src/onboard/wizard.rs | 2 + src/tools/mod.rs | 25 +- src/tools/schedule.rs | 522 ++++++++++++++++++++++++++++++++++++++++++ 12 files changed, 1006 insertions(+), 68 deletions(-) create mode 100644 src/tools/schedule.rs diff --git a/src/agent/loop_.rs b/src/agent/loop_.rs index a8368c6..2558bfa 100644 --- a/src/agent/loop_.rs +++ b/src/agent/loop_.rs @@ -598,6 +598,7 @@ pub async fn run( &config.workspace_dir, &config.agents, config.api_key.as_deref(), + &config, ); // ── Resolve provider ───────────────────────────────────────── @@ -672,6 +673,10 @@ pub async fn run( "Execute actions on 1000+ apps via Composio (Gmail, Notion, GitHub, Slack, etc.). Use action='list' to discover, 'execute' to run, 'connect' to OAuth.", )); } + tool_descs.push(( + "schedule", + "Manage scheduled tasks (create/list/get/cancel/pause/resume). Supports recurring cron and one-shot delays.", + )); if !config.agents.is_empty() { tool_descs.push(( "delegate", diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 1acc502..21f99d0 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -730,6 +730,7 @@ pub async fn start_channels(config: Config) -> Result<()> { &config.workspace_dir, &config.agents, config.api_key.as_deref(), + &config, )); // Build system prompt from workspace identity files + skills @@ -776,6 +777,10 @@ pub async fn start_channels(config: Config) -> Result<()> { "Execute actions on 1000+ apps via Composio (Gmail, Notion, GitHub, Slack, etc.). Use action='list' to discover, 'execute' to run, 'connect' to OAuth.", )); } + tool_descs.push(( + "schedule", + "Manage scheduled tasks (create/list/get/cancel/pause/resume). Supports recurring cron and one-shot delays.", + )); if !config.agents.is_empty() { tool_descs.push(( "delegate", diff --git a/src/config/mod.rs b/src/config/mod.rs index d8980c0..a61c29c 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -6,8 +6,8 @@ pub use schema::{ DelegateAgentConfig, DiscordConfig, DockerRuntimeConfig, GatewayConfig, HeartbeatConfig, HttpRequestConfig, IMessageConfig, IdentityConfig, LarkConfig, MatrixConfig, MemoryConfig, ModelRouteConfig, ObservabilityConfig, ReliabilityConfig, ResourceLimitsConfig, RuntimeConfig, - SandboxBackend, SandboxConfig, SecretsConfig, SecurityConfig, SlackConfig, TelegramConfig, - TunnelConfig, WebhookConfig, + SandboxBackend, SandboxConfig, SchedulerConfig, SecretsConfig, SecurityConfig, SlackConfig, + TelegramConfig, TunnelConfig, WebhookConfig, }; #[cfg(test)] diff --git a/src/config/schema.rs b/src/config/schema.rs index bc27e4e..8d2ec55 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -34,6 +34,9 @@ pub struct Config { #[serde(default)] pub reliability: ReliabilityConfig, + #[serde(default)] + pub scheduler: SchedulerConfig, + /// Model routing rules — route `hint:` to specific provider+model combos. #[serde(default)] pub model_routes: Vec, @@ -697,6 +700,43 @@ impl Default for ReliabilityConfig { } } +// ── Scheduler ──────────────────────────────────────────────────── + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SchedulerConfig { + /// Enable the built-in scheduler loop. + #[serde(default = "default_scheduler_enabled")] + pub enabled: bool, + /// Maximum number of persisted scheduled tasks. + #[serde(default = "default_scheduler_max_tasks")] + pub max_tasks: usize, + /// Maximum tasks executed per scheduler polling cycle. + #[serde(default = "default_scheduler_max_concurrent")] + pub max_concurrent: usize, +} + +fn default_scheduler_enabled() -> bool { + true +} + +fn default_scheduler_max_tasks() -> usize { + 64 +} + +fn default_scheduler_max_concurrent() -> usize { + 4 +} + +impl Default for SchedulerConfig { + fn default() -> Self { + Self { + enabled: default_scheduler_enabled(), + max_tasks: default_scheduler_max_tasks(), + max_concurrent: default_scheduler_max_concurrent(), + } + } +} + // ── Model routing ──────────────────────────────────────────────── /// Route a task hint to a specific provider + model. @@ -1148,6 +1188,7 @@ impl Default for Config { autonomy: AutonomyConfig::default(), runtime: RuntimeConfig::default(), reliability: ReliabilityConfig::default(), + scheduler: SchedulerConfig::default(), model_routes: Vec::new(), heartbeat: HeartbeatConfig::default(), channels_config: ChannelsConfig::default(), @@ -1485,6 +1526,7 @@ mod tests { ..RuntimeConfig::default() }, reliability: ReliabilityConfig::default(), + scheduler: SchedulerConfig::default(), model_routes: Vec::new(), heartbeat: HeartbeatConfig { enabled: true, @@ -1578,6 +1620,7 @@ default_temperature = 0.7 autonomy: AutonomyConfig::default(), runtime: RuntimeConfig::default(), reliability: ReliabilityConfig::default(), + scheduler: SchedulerConfig::default(), model_routes: Vec::new(), heartbeat: HeartbeatConfig::default(), channels_config: ChannelsConfig::default(), diff --git a/src/cron/mod.rs b/src/cron/mod.rs index 444445f..4fe0c39 100644 --- a/src/cron/mod.rs +++ b/src/cron/mod.rs @@ -16,6 +16,8 @@ pub struct CronJob { pub next_run: DateTime, pub last_run: Option>, pub last_status: Option, + pub paused: bool, + pub one_shot: bool, } #[allow(clippy::needless_pass_by_value)] @@ -27,6 +29,7 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<( println!("No scheduled tasks yet."); println!("\nUsage:"); println!(" zeroclaw cron add '0 9 * * *' 'agent -m \"Good morning!\"'"); + println!(" zeroclaw cron once 30m 'echo reminder'"); return Ok(()); } @@ -36,13 +39,20 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<( .last_run .map_or_else(|| "never".into(), |d| d.to_rfc3339()); let last_status = job.last_status.unwrap_or_else(|| "n/a".into()); + let flags = match (job.paused, job.one_shot) { + (true, true) => " [paused, one-shot]", + (true, false) => " [paused]", + (false, true) => " [one-shot]", + (false, false) => "", + }; println!( - "- {} | {} | next={} | last={} ({})\n cmd: {}", + "- {} | {} | next={} | last={} ({}){}\n cmd: {}", job.id, job.expression, job.next_run.to_rfc3339(), last_run, last_status, + flags, job.command ); } @@ -59,19 +69,41 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<( println!(" Cmd : {}", job.command); Ok(()) } - crate::CronCommands::Remove { id } => remove_job(config, &id), + crate::CronCommands::Once { delay, command } => { + let job = add_once(config, &delay, &command)?; + println!("✅ Added one-shot task {}", job.id); + println!(" Runs at: {}", job.next_run.to_rfc3339()); + println!(" Cmd : {}", job.command); + Ok(()) + } + crate::CronCommands::Remove { id } => { + remove_job(config, &id)?; + println!("✅ Removed cron job {id}"); + Ok(()) + } + crate::CronCommands::Pause { id } => { + pause_job(config, &id)?; + println!("⏸️ Paused job {id}"); + Ok(()) + } + crate::CronCommands::Resume { id } => { + resume_job(config, &id)?; + println!("▶️ Resumed job {id}"); + Ok(()) + } } } pub fn add_job(config: &Config, expression: &str, command: &str) -> Result { + check_max_tasks(config)?; let now = Utc::now(); let next_run = next_run_for(expression, now)?; let id = Uuid::new_v4().to_string(); with_connection(config, |conn| { conn.execute( - "INSERT INTO cron_jobs (id, expression, command, created_at, next_run) - VALUES (?1, ?2, ?3, ?4, ?5)", + "INSERT INTO cron_jobs (id, expression, command, created_at, next_run, paused, one_shot) + VALUES (?1, ?2, ?3, ?4, ?5, 0, 0)", params![ id, expression, @@ -91,43 +123,169 @@ pub fn add_job(config: &Config, expression: &str, command: &str) -> Result, command: &str) -> Result { + add_one_shot_job_with_expression(config, run_at, command, "@once".to_string()) +} + +pub fn add_once(config: &Config, delay: &str, command: &str) -> Result { + let duration = parse_duration(delay)?; + let run_at = Utc::now() + duration; + add_one_shot_job_with_expression(config, run_at, command, format!("@once:{delay}")) +} + +pub fn add_once_at(config: &Config, at: DateTime, command: &str) -> Result { + add_one_shot_job_with_expression(config, at, command, format!("@at:{}", at.to_rfc3339())) +} + +fn add_one_shot_job_with_expression( + config: &Config, + run_at: DateTime, + command: &str, + expression: String, +) -> Result { + check_max_tasks(config)?; + let now = Utc::now(); + if run_at <= now { + anyhow::bail!("Scheduled time must be in the future"); + } + + let id = Uuid::new_v4().to_string(); + + with_connection(config, |conn| { + conn.execute( + "INSERT INTO cron_jobs (id, expression, command, created_at, next_run, paused, one_shot) + VALUES (?1, ?2, ?3, ?4, ?5, 0, 1)", + params![id, expression, command, now.to_rfc3339(), run_at.to_rfc3339()], + ) + .context("Failed to insert one-shot task")?; + Ok(()) + })?; + + Ok(CronJob { + id, + expression, + command: command.to_string(), + next_run: run_at, + last_run: None, + last_status: None, + paused: false, + one_shot: true, + }) +} + +pub fn get_job(config: &Config, id: &str) -> Result> { + with_connection(config, |conn| { + let mut stmt = conn.prepare( + "SELECT id, expression, command, next_run, last_run, last_status, paused, one_shot + FROM cron_jobs WHERE id = ?1", + )?; + + let mut rows = stmt.query_map(params![id], |row| Ok(parse_job_row(row)))?; + + match rows.next() { + Some(Ok(job_result)) => Ok(Some(job_result?)), + Some(Err(e)) => Err(e.into()), + None => Ok(None), + } + }) +} + +pub fn pause_job(config: &Config, id: &str) -> Result<()> { + let changed = with_connection(config, |conn| { + conn.execute("UPDATE cron_jobs SET paused = 1 WHERE id = ?1", params![id]) + .context("Failed to pause cron job") + })?; + + if changed == 0 { + anyhow::bail!("Cron job '{id}' not found"); + } + + Ok(()) +} + +pub fn resume_job(config: &Config, id: &str) -> Result<()> { + let changed = with_connection(config, |conn| { + conn.execute("UPDATE cron_jobs SET paused = 0 WHERE id = ?1", params![id]) + .context("Failed to resume cron job") + })?; + + if changed == 0 { + anyhow::bail!("Cron job '{id}' not found"); + } + + Ok(()) +} + +fn check_max_tasks(config: &Config) -> Result<()> { + let count = with_connection(config, |conn| { + let mut stmt = conn.prepare("SELECT COUNT(*) FROM cron_jobs")?; + let count: i64 = stmt.query_row([], |row| row.get(0))?; + usize::try_from(count).context("Unexpected negative task count") + })?; + + if count >= config.scheduler.max_tasks { + anyhow::bail!( + "Maximum number of scheduled tasks ({}) reached", + config.scheduler.max_tasks + ); + } + + Ok(()) +} + +fn parse_duration(input: &str) -> Result { + let input = input.trim(); + if input.is_empty() { + anyhow::bail!("Empty delay string"); + } + + let (num_str, unit) = if input.ends_with(|c: char| c.is_ascii_alphabetic()) { + let split = input.len() - 1; + (&input[..split], &input[split..]) + } else { + (input, "m") + }; + + let n: u64 = num_str + .trim() + .parse() + .with_context(|| format!("Invalid duration number: {num_str}"))?; + + let multiplier: u64 = match unit { + "s" => 1, + "m" => 60, + "h" => 3600, + "d" => 86400, + "w" => 604_800, + _ => anyhow::bail!("Unknown duration unit '{unit}', expected s/m/h/d/w"), + }; + + let secs = n + .checked_mul(multiplier) + .filter(|&s| i64::try_from(s).is_ok()) + .ok_or_else(|| anyhow::anyhow!("Duration value too large: {input}"))?; + + #[allow(clippy::cast_possible_wrap)] + Ok(chrono::Duration::seconds(secs as i64)) +} + pub fn list_jobs(config: &Config) -> Result> { with_connection(config, |conn| { let mut stmt = conn.prepare( - "SELECT id, expression, command, next_run, last_run, last_status + "SELECT id, expression, command, next_run, last_run, last_status, paused, one_shot FROM cron_jobs ORDER BY next_run ASC", )?; - let rows = stmt.query_map([], |row| { - let next_run_raw: String = row.get(3)?; - let last_run_raw: Option = row.get(4)?; - Ok(( - row.get::<_, String>(0)?, - row.get::<_, String>(1)?, - row.get::<_, String>(2)?, - next_run_raw, - last_run_raw, - row.get::<_, Option>(5)?, - )) - })?; + let rows = stmt.query_map([], |row| Ok(parse_job_row(row)))?; let mut jobs = Vec::new(); for row in rows { - let (id, expression, command, next_run_raw, last_run_raw, last_status) = row?; - jobs.push(CronJob { - id, - expression, - command, - next_run: parse_rfc3339(&next_run_raw)?, - last_run: match last_run_raw { - Some(raw) => Some(parse_rfc3339(&raw)?), - None => None, - }, - last_status, - }); + jobs.push(row??); } Ok(jobs) }) @@ -143,44 +301,21 @@ pub fn remove_job(config: &Config, id: &str) -> Result<()> { anyhow::bail!("Cron job '{id}' not found"); } - println!("✅ Removed cron job {id}"); Ok(()) } pub fn due_jobs(config: &Config, now: DateTime) -> Result> { with_connection(config, |conn| { let mut stmt = conn.prepare( - "SELECT id, expression, command, next_run, last_run, last_status - FROM cron_jobs WHERE next_run <= ?1 ORDER BY next_run ASC", + "SELECT id, expression, command, next_run, last_run, last_status, paused, one_shot + FROM cron_jobs WHERE next_run <= ?1 AND paused = 0 ORDER BY next_run ASC", )?; - let rows = stmt.query_map(params![now.to_rfc3339()], |row| { - let next_run_raw: String = row.get(3)?; - let last_run_raw: Option = row.get(4)?; - Ok(( - row.get::<_, String>(0)?, - row.get::<_, String>(1)?, - row.get::<_, String>(2)?, - next_run_raw, - last_run_raw, - row.get::<_, Option>(5)?, - )) - })?; + let rows = stmt.query_map(params![now.to_rfc3339()], |row| Ok(parse_job_row(row)))?; let mut jobs = Vec::new(); for row in rows { - let (id, expression, command, next_run_raw, last_run_raw, last_status) = row?; - jobs.push(CronJob { - id, - expression, - command, - next_run: parse_rfc3339(&next_run_raw)?, - last_run: match last_run_raw { - Some(raw) => Some(parse_rfc3339(&raw)?), - None => None, - }, - last_status, - }); + jobs.push(row??); } Ok(jobs) }) @@ -192,6 +327,15 @@ pub fn reschedule_after_run( success: bool, output: &str, ) -> Result<()> { + if job.one_shot { + with_connection(config, |conn| { + conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![job.id]) + .context("Failed to remove one-shot task after execution")?; + Ok(()) + })?; + return Ok(()); + } + let now = Utc::now(); let next_run = next_run_for(&job.expression, now)?; let status = if success { "ok" } else { "error" }; @@ -229,9 +373,7 @@ fn normalize_expression(expression: &str) -> Result { let field_count = expression.split_whitespace().count(); match field_count { - // standard crontab syntax: minute hour day month weekday 5 => Ok(format!("0 {expression}")), - // crate-native syntax includes seconds (+ optional year) 6 | 7 => Ok(expression.to_string()), _ => anyhow::bail!( "Invalid cron expression: {expression} (expected 5, 6, or 7 fields, got {field_count})" @@ -239,6 +381,31 @@ fn normalize_expression(expression: &str) -> Result { } } +fn parse_job_row(row: &rusqlite::Row<'_>) -> Result { + let id: String = row.get(0)?; + let expression: String = row.get(1)?; + let command: String = row.get(2)?; + let next_run_raw: String = row.get(3)?; + let last_run_raw: Option = row.get(4)?; + let last_status: Option = row.get(5)?; + let paused: bool = row.get(6)?; + let one_shot: bool = row.get(7)?; + + Ok(CronJob { + id, + expression, + command, + next_run: parse_rfc3339(&next_run_raw)?, + last_run: match last_run_raw { + Some(raw) => Some(parse_rfc3339(&raw)?), + None => None, + }, + last_status, + paused, + one_shot, + }) +} + fn parse_rfc3339(raw: &str) -> Result> { let parsed = DateTime::parse_from_rfc3339(raw) .with_context(|| format!("Invalid RFC3339 timestamp in cron DB: {raw}"))?; @@ -255,7 +422,6 @@ fn with_connection(config: &Config, f: impl FnOnce(&Connection) -> Result) let conn = Connection::open(&db_path) .with_context(|| format!("Failed to open cron DB: {}", db_path.display()))?; - // ── Production-grade PRAGMA tuning ────────────────────── conn.execute_batch( "PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL; @@ -274,12 +440,19 @@ fn with_connection(config: &Config, f: impl FnOnce(&Connection) -> Result) next_run TEXT NOT NULL, last_run TEXT, last_status TEXT, - last_output TEXT + last_output TEXT, + paused INTEGER NOT NULL DEFAULT 0, + one_shot INTEGER NOT NULL DEFAULT 0 ); CREATE INDEX IF NOT EXISTS idx_cron_jobs_next_run ON cron_jobs(next_run);", ) .context("Failed to initialize cron schema")?; + for column in ["paused", "one_shot"] { + let alter = format!("ALTER TABLE cron_jobs ADD COLUMN {column} INTEGER NOT NULL DEFAULT 0"); + let _ = conn.execute_batch(&alter); + } + f(&conn) } @@ -309,6 +482,8 @@ mod tests { assert_eq!(job.expression, "*/5 * * * *"); assert_eq!(job.command, "echo ok"); + assert!(!job.one_shot); + assert!(!job.paused); } #[test] @@ -335,18 +510,72 @@ mod tests { } #[test] - fn due_jobs_filters_by_timestamp() { + fn add_once_creates_one_shot_job() { let tmp = TempDir::new().unwrap(); let config = test_config(&tmp); - let _job = add_job(&config, "* * * * *", "echo due").unwrap(); + let job = add_once(&config, "30m", "echo once").unwrap(); + assert!(job.one_shot); + assert!(job.expression.starts_with("@once:")); + + let fetched = get_job(&config, &job.id).unwrap().unwrap(); + assert!(fetched.one_shot); + assert!(!fetched.paused); + } + + #[test] + fn add_once_at_rejects_past_timestamp() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let run_at = Utc::now() - ChronoDuration::minutes(1); + let err = add_once_at(&config, run_at, "echo past").unwrap_err(); + assert!(err.to_string().contains("future")); + } + + #[test] + fn get_job_found_and_missing() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let job = add_job(&config, "*/5 * * * *", "echo found").unwrap(); + let found = get_job(&config, &job.id).unwrap(); + assert!(found.is_some()); + assert_eq!(found.unwrap().id, job.id); + + let missing = get_job(&config, "nonexistent").unwrap(); + assert!(missing.is_none()); + } + + #[test] + fn pause_resume_roundtrip() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let job = add_job(&config, "*/5 * * * *", "echo pause").unwrap(); + pause_job(&config, &job.id).unwrap(); + assert!(get_job(&config, &job.id).unwrap().unwrap().paused); + + resume_job(&config, &job.id).unwrap(); + assert!(!get_job(&config, &job.id).unwrap().unwrap().paused); + } + + #[test] + fn due_jobs_filters_by_timestamp_and_skips_paused() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let active = add_job(&config, "* * * * *", "echo due").unwrap(); + let paused = add_job(&config, "* * * * *", "echo paused").unwrap(); + pause_job(&config, &paused.id).unwrap(); let due_now = due_jobs(&config, Utc::now()).unwrap(); - assert!(due_now.is_empty(), "new job should not be due immediately"); + assert!(due_now.is_empty(), "new jobs should not be due immediately"); let far_future = Utc::now() + ChronoDuration::days(365); let due_future = due_jobs(&config, far_future).unwrap(); - assert_eq!(due_future.len(), 1, "job should be due in far future"); + assert_eq!(due_future.len(), 1); + assert_eq!(due_future[0].id, active.id); } #[test] @@ -362,4 +591,67 @@ mod tests { assert_eq!(stored.last_status.as_deref(), Some("error")); assert!(stored.last_run.is_some()); } + + #[test] + fn reschedule_after_run_removes_one_shot_jobs() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let run_at = Utc::now() + ChronoDuration::minutes(1); + let job = add_one_shot_job(&config, run_at, "echo once").unwrap(); + reschedule_after_run(&config, &job, true, "ok").unwrap(); + + assert!(get_job(&config, &job.id).unwrap().is_none()); + } + + #[test] + fn scheduler_columns_migrate_from_old_schema() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let db_path = config.workspace_dir.join("cron").join("jobs.db"); + std::fs::create_dir_all(db_path.parent().unwrap()).unwrap(); + + { + let conn = rusqlite::Connection::open(&db_path).unwrap(); + conn.execute_batch( + "CREATE TABLE cron_jobs ( + id TEXT PRIMARY KEY, + expression TEXT NOT NULL, + command TEXT NOT NULL, + created_at TEXT NOT NULL, + next_run TEXT NOT NULL, + last_run TEXT, + last_status TEXT, + last_output TEXT + );", + ) + .unwrap(); + conn.execute( + "INSERT INTO cron_jobs (id, expression, command, created_at, next_run) + VALUES ('old-job', '* * * * *', 'echo old', '2025-01-01T00:00:00Z', '2030-01-01T00:00:00Z')", + [], + ) + .unwrap(); + } + + let jobs = list_jobs(&config).unwrap(); + assert_eq!(jobs.len(), 1); + assert_eq!(jobs[0].id, "old-job"); + assert!(!jobs[0].paused); + assert!(!jobs[0].one_shot); + } + + #[test] + fn max_tasks_limit_is_enforced() { + let tmp = TempDir::new().unwrap(); + let mut config = test_config(&tmp); + config.scheduler.max_tasks = 1; + + let _first = add_job(&config, "*/10 * * * *", "echo first").unwrap(); + let err = add_job(&config, "*/11 * * * *", "echo second").unwrap_err(); + assert!(err + .to_string() + .contains("Maximum number of scheduled tasks")); + } } diff --git a/src/cron/scheduler.rs b/src/cron/scheduler.rs index bab1965..bdb5f0b 100644 --- a/src/cron/scheduler.rs +++ b/src/cron/scheduler.rs @@ -9,9 +9,18 @@ use tokio::time::{self, Duration}; const MIN_POLL_SECONDS: u64 = 5; pub async fn run(config: Config) -> Result<()> { + if !config.scheduler.enabled { + tracing::info!("Scheduler disabled by config"); + crate::health::mark_component_ok("scheduler"); + loop { + time::sleep(Duration::from_secs(3600)).await; + } + } + let poll_secs = config.reliability.scheduler_poll_secs.max(MIN_POLL_SECONDS); let mut interval = time::interval(Duration::from_secs(poll_secs)); let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); + let max_concurrent = config.scheduler.max_concurrent.max(1); crate::health::mark_component_ok("scheduler"); @@ -27,7 +36,7 @@ pub async fn run(config: Config) -> Result<()> { } }; - for job in jobs { + for job in jobs.into_iter().take(max_concurrent) { crate::health::mark_component_ok("scheduler"); let (success, output) = execute_job_with_retry(&config, &security, &job).await; @@ -224,6 +233,8 @@ mod tests { next_run: Utc::now(), last_run: None, last_status: None, + paused: false, + one_shot: false, } } diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 8eaa57c..104d4de 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -267,6 +267,7 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { &config.workspace_dir, &config.agents, config.api_key.as_deref(), + &config, )); let skills = crate::skills::load_skills(&config.workspace_dir); let tool_descs: Vec<(&str, &str)> = tools_registry diff --git a/src/lib.rs b/src/lib.rs index 619190b..61a2bc6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -147,11 +147,28 @@ pub enum CronCommands { /// Command to run command: String, }, + /// Add a one-shot delayed task (e.g. "30m", "2h", "1d") + Once { + /// Delay duration + delay: String, + /// Command to run + command: String, + }, /// Remove a scheduled task Remove { /// Task ID id: String, }, + /// Pause a scheduled task + Pause { + /// Task ID + id: String, + }, + /// Resume a paused task + Resume { + /// Task ID + id: String, + }, } /// Integration subcommands diff --git a/src/main.rs b/src/main.rs index 426fdfd..3253594 100644 --- a/src/main.rs +++ b/src/main.rs @@ -234,11 +234,28 @@ enum CronCommands { /// Command to run command: String, }, + /// Add a one-shot delayed task (e.g. "30m", "2h", "1d") + Once { + /// Delay duration + delay: String, + /// Command to run + command: String, + }, /// Remove a scheduled task Remove { /// Task ID id: String, }, + /// Pause a scheduled task + Pause { + /// Task ID + id: String, + }, + /// Resume a paused task + Resume { + /// Task ID + id: String, + }, } #[derive(Subcommand, Debug)] diff --git a/src/onboard/wizard.rs b/src/onboard/wizard.rs index 0447d23..7fbcc44 100644 --- a/src/onboard/wizard.rs +++ b/src/onboard/wizard.rs @@ -110,6 +110,7 @@ pub fn run_wizard() -> Result { autonomy: AutonomyConfig::default(), runtime: RuntimeConfig::default(), reliability: crate::config::ReliabilityConfig::default(), + scheduler: crate::config::SchedulerConfig::default(), model_routes: Vec::new(), heartbeat: HeartbeatConfig::default(), channels_config, @@ -305,6 +306,7 @@ pub fn run_quick_setup( autonomy: AutonomyConfig::default(), runtime: RuntimeConfig::default(), reliability: crate::config::ReliabilityConfig::default(), + scheduler: crate::config::SchedulerConfig::default(), model_routes: Vec::new(), heartbeat: HeartbeatConfig::default(), channels_config: ChannelsConfig::default(), diff --git a/src/tools/mod.rs b/src/tools/mod.rs index 22e8d1a..b5cd67a 100644 --- a/src/tools/mod.rs +++ b/src/tools/mod.rs @@ -10,6 +10,7 @@ pub mod image_info; pub mod memory_forget; pub mod memory_recall; pub mod memory_store; +pub mod schedule; pub mod screenshot; pub mod shell; pub mod traits; @@ -26,6 +27,7 @@ pub use image_info::ImageInfoTool; pub use memory_forget::MemoryForgetTool; pub use memory_recall::MemoryRecallTool; pub use memory_store::MemoryStoreTool; +pub use schedule::ScheduleTool; pub use screenshot::ScreenshotTool; pub use shell::ShellTool; pub use traits::Tool; @@ -67,6 +69,7 @@ pub fn all_tools( workspace_dir: &std::path::Path, agents: &HashMap, fallback_api_key: Option<&str>, + config: &crate::config::Config, ) -> Vec> { all_tools_with_runtime( security, @@ -78,6 +81,7 @@ pub fn all_tools( workspace_dir, agents, fallback_api_key, + config, ) } @@ -93,6 +97,7 @@ pub fn all_tools_with_runtime( workspace_dir: &std::path::Path, agents: &HashMap, fallback_api_key: Option<&str>, + config: &crate::config::Config, ) -> Vec> { let mut tools: Vec> = vec![ Box::new(ShellTool::new(security.clone(), runtime)), @@ -101,6 +106,7 @@ pub fn all_tools_with_runtime( Box::new(MemoryStoreTool::new(memory.clone())), Box::new(MemoryRecallTool::new(memory.clone())), Box::new(MemoryForgetTool::new(memory)), + Box::new(ScheduleTool::new(security.clone(), config.clone())), Box::new(GitOperationsTool::new( security.clone(), workspace_dir.to_path_buf(), @@ -158,9 +164,17 @@ pub fn all_tools_with_runtime( #[cfg(test)] mod tests { use super::*; - use crate::config::{BrowserConfig, MemoryConfig}; + use crate::config::{BrowserConfig, Config, MemoryConfig}; use tempfile::TempDir; + fn test_config(tmp: &TempDir) -> Config { + Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..Config::default() + } + } + #[test] fn default_tools_has_three() { let security = Arc::new(SecurityPolicy::default()); @@ -186,6 +200,7 @@ mod tests { ..BrowserConfig::default() }; let http = crate::config::HttpRequestConfig::default(); + let cfg = test_config(&tmp); let tools = all_tools( &security, @@ -196,9 +211,11 @@ mod tests { tmp.path(), &HashMap::new(), None, + &cfg, ); let names: Vec<&str> = tools.iter().map(|t| t.name()).collect(); assert!(!names.contains(&"browser_open")); + assert!(names.contains(&"schedule")); } #[test] @@ -219,6 +236,7 @@ mod tests { ..BrowserConfig::default() }; let http = crate::config::HttpRequestConfig::default(); + let cfg = test_config(&tmp); let tools = all_tools( &security, @@ -229,6 +247,7 @@ mod tests { tmp.path(), &HashMap::new(), None, + &cfg, ); let names: Vec<&str> = tools.iter().map(|t| t.name()).collect(); assert!(names.contains(&"browser_open")); @@ -341,6 +360,7 @@ mod tests { let browser = BrowserConfig::default(); let http = crate::config::HttpRequestConfig::default(); + let cfg = test_config(&tmp); let mut agents = HashMap::new(); agents.insert( @@ -364,6 +384,7 @@ mod tests { tmp.path(), &agents, Some("sk-test"), + &cfg, ); let names: Vec<&str> = tools.iter().map(|t| t.name()).collect(); assert!(names.contains(&"delegate")); @@ -382,6 +403,7 @@ mod tests { let browser = BrowserConfig::default(); let http = crate::config::HttpRequestConfig::default(); + let cfg = test_config(&tmp); let tools = all_tools( &security, @@ -392,6 +414,7 @@ mod tests { tmp.path(), &HashMap::new(), None, + &cfg, ); let names: Vec<&str> = tools.iter().map(|t| t.name()).collect(); assert!(!names.contains(&"delegate")); diff --git a/src/tools/schedule.rs b/src/tools/schedule.rs new file mode 100644 index 0000000..43234b8 --- /dev/null +++ b/src/tools/schedule.rs @@ -0,0 +1,522 @@ +use super::traits::{Tool, ToolResult}; +use crate::config::Config; +use crate::cron; +use crate::security::SecurityPolicy; +use anyhow::Result; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use serde_json::json; +use std::sync::Arc; + +/// Tool that lets the agent manage recurring and one-shot scheduled tasks. +pub struct ScheduleTool { + security: Arc, + config: Config, +} + +impl ScheduleTool { + pub fn new(security: Arc, config: Config) -> Self { + Self { security, config } + } +} + +#[async_trait] +impl Tool for ScheduleTool { + fn name(&self) -> &str { + "schedule" + } + + fn description(&self) -> &str { + "Manage scheduled tasks. Actions: create/add/once/list/get/cancel/remove/pause/resume" + } + + fn parameters_schema(&self) -> serde_json::Value { + json!({ + "type": "object", + "properties": { + "action": { + "type": "string", + "enum": ["create", "add", "once", "list", "get", "cancel", "remove", "pause", "resume"], + "description": "Action to perform" + }, + "expression": { + "type": "string", + "description": "Cron expression for recurring tasks (e.g. '*/5 * * * *')." + }, + "delay": { + "type": "string", + "description": "Delay for one-shot tasks (e.g. '30m', '2h', '1d')." + }, + "run_at": { + "type": "string", + "description": "Absolute RFC3339 time for one-shot tasks (e.g. '2030-01-01T00:00:00Z')." + }, + "command": { + "type": "string", + "description": "Shell command to execute. Required for create/add/once." + }, + "id": { + "type": "string", + "description": "Task ID. Required for get/cancel/remove/pause/resume." + } + }, + "required": ["action"] + }) + } + + async fn execute(&self, args: serde_json::Value) -> Result { + let action = args + .get("action") + .and_then(|value| value.as_str()) + .ok_or_else(|| anyhow::anyhow!("Missing 'action' parameter"))?; + + match action { + "list" => self.handle_list(), + "get" => { + let id = args + .get("id") + .and_then(|value| value.as_str()) + .ok_or_else(|| anyhow::anyhow!("Missing 'id' parameter for get action"))?; + self.handle_get(id) + } + "create" | "add" | "once" => { + if let Some(blocked) = self.enforce_mutation_allowed(action) { + return Ok(blocked); + } + self.handle_create_like(action, &args) + } + "cancel" | "remove" => { + if let Some(blocked) = self.enforce_mutation_allowed(action) { + return Ok(blocked); + } + let id = args + .get("id") + .and_then(|value| value.as_str()) + .ok_or_else(|| anyhow::anyhow!("Missing 'id' parameter for cancel action"))?; + Ok(self.handle_cancel(id)) + } + "pause" => { + if let Some(blocked) = self.enforce_mutation_allowed(action) { + return Ok(blocked); + } + let id = args + .get("id") + .and_then(|value| value.as_str()) + .ok_or_else(|| anyhow::anyhow!("Missing 'id' parameter for pause action"))?; + Ok(self.handle_pause_resume(id, true)) + } + "resume" => { + if let Some(blocked) = self.enforce_mutation_allowed(action) { + return Ok(blocked); + } + let id = args + .get("id") + .and_then(|value| value.as_str()) + .ok_or_else(|| anyhow::anyhow!("Missing 'id' parameter for resume action"))?; + Ok(self.handle_pause_resume(id, false)) + } + other => Ok(ToolResult { + success: false, + output: String::new(), + error: Some(format!( + "Unknown action '{other}'. Use create/add/once/list/get/cancel/remove/pause/resume." + )), + }), + } + } +} + +impl ScheduleTool { + fn enforce_mutation_allowed(&self, action: &str) -> Option { + if !self.security.can_act() { + return Some(ToolResult { + success: false, + output: String::new(), + error: Some(format!( + "Security policy: read-only mode, cannot perform '{action}'" + )), + }); + } + + if !self.security.record_action() { + return Some(ToolResult { + success: false, + output: String::new(), + error: Some("Rate limit exceeded: action budget exhausted".to_string()), + }); + } + + None + } + + fn handle_list(&self) -> Result { + let jobs = cron::list_jobs(&self.config)?; + if jobs.is_empty() { + return Ok(ToolResult { + success: true, + output: "No scheduled jobs.".to_string(), + error: None, + }); + } + + let mut lines = Vec::with_capacity(jobs.len()); + for job in jobs { + let flags = match (job.paused, job.one_shot) { + (true, true) => " [paused, one-shot]", + (true, false) => " [paused]", + (false, true) => " [one-shot]", + (false, false) => "", + }; + let last_run = job + .last_run + .map_or_else(|| "never".to_string(), |value| value.to_rfc3339()); + let last_status = job.last_status.unwrap_or_else(|| "n/a".to_string()); + lines.push(format!( + "- {} | {} | next={} | last={} ({}){} | cmd: {}", + job.id, + job.expression, + job.next_run.to_rfc3339(), + last_run, + last_status, + flags, + job.command + )); + } + + Ok(ToolResult { + success: true, + output: format!("Scheduled jobs ({}):\n{}", lines.len(), lines.join("\n")), + error: None, + }) + } + + fn handle_get(&self, id: &str) -> Result { + match cron::get_job(&self.config, id)? { + Some(job) => { + let detail = json!({ + "id": job.id, + "expression": job.expression, + "command": job.command, + "next_run": job.next_run.to_rfc3339(), + "last_run": job.last_run.map(|value| value.to_rfc3339()), + "last_status": job.last_status, + "paused": job.paused, + "one_shot": job.one_shot, + }); + Ok(ToolResult { + success: true, + output: serde_json::to_string_pretty(&detail)?, + error: None, + }) + } + None => Ok(ToolResult { + success: false, + output: String::new(), + error: Some(format!("Job '{id}' not found")), + }), + } + } + + fn handle_create_like(&self, action: &str, args: &serde_json::Value) -> Result { + let command = args + .get("command") + .and_then(|value| value.as_str()) + .filter(|value| !value.trim().is_empty()) + .ok_or_else(|| anyhow::anyhow!("Missing or empty 'command' parameter"))?; + + let expression = args.get("expression").and_then(|value| value.as_str()); + let delay = args.get("delay").and_then(|value| value.as_str()); + let run_at = args.get("run_at").and_then(|value| value.as_str()); + + match action { + "add" => { + if expression.is_none() || delay.is_some() || run_at.is_some() { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("'add' requires 'expression' and forbids delay/run_at".into()), + }); + } + } + "once" => { + if expression.is_some() || (delay.is_none() && run_at.is_none()) { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("'once' requires exactly one of 'delay' or 'run_at'".into()), + }); + } + if delay.is_some() && run_at.is_some() { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("'once' supports either delay or run_at, not both".into()), + }); + } + } + _ => { + let count = [expression.is_some(), delay.is_some(), run_at.is_some()] + .into_iter() + .filter(|value| *value) + .count(); + if count != 1 { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some( + "Exactly one of 'expression', 'delay', or 'run_at' must be provided" + .into(), + ), + }); + } + } + } + + if let Some(value) = expression { + let job = cron::add_job(&self.config, value, command)?; + return Ok(ToolResult { + success: true, + output: format!( + "Created recurring job {} (expr: {}, next: {}, cmd: {})", + job.id, + job.expression, + job.next_run.to_rfc3339(), + job.command + ), + error: None, + }); + } + + if let Some(value) = delay { + let job = cron::add_once(&self.config, value, command)?; + return Ok(ToolResult { + success: true, + output: format!( + "Created one-shot job {} (runs at: {}, cmd: {})", + job.id, + job.next_run.to_rfc3339(), + job.command + ), + error: None, + }); + } + + let run_at_raw = run_at.ok_or_else(|| anyhow::anyhow!("Missing scheduling parameters"))?; + let run_at_parsed: DateTime = DateTime::parse_from_rfc3339(run_at_raw) + .map_err(|error| anyhow::anyhow!("Invalid run_at timestamp: {error}"))? + .with_timezone(&Utc); + + let job = cron::add_once_at(&self.config, run_at_parsed, command)?; + Ok(ToolResult { + success: true, + output: format!( + "Created one-shot job {} (runs at: {}, cmd: {})", + job.id, + job.next_run.to_rfc3339(), + job.command + ), + error: None, + }) + } + + fn handle_cancel(&self, id: &str) -> ToolResult { + match cron::remove_job(&self.config, id) { + Ok(()) => ToolResult { + success: true, + output: format!("Cancelled job {id}"), + error: None, + }, + Err(error) => ToolResult { + success: false, + output: String::new(), + error: Some(error.to_string()), + }, + } + } + + fn handle_pause_resume(&self, id: &str, pause: bool) -> ToolResult { + let operation = if pause { + cron::pause_job(&self.config, id) + } else { + cron::resume_job(&self.config, id) + }; + + match operation { + Ok(()) => ToolResult { + success: true, + output: if pause { + format!("Paused job {id}") + } else { + format!("Resumed job {id}") + }, + error: None, + }, + Err(error) => ToolResult { + success: false, + output: String::new(), + error: Some(error.to_string()), + }, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::security::AutonomyLevel; + use tempfile::TempDir; + + fn test_setup() -> (TempDir, Config, Arc) { + let tmp = TempDir::new().unwrap(); + let config = Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..Config::default() + }; + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + let security = Arc::new(SecurityPolicy::from_config( + &config.autonomy, + &config.workspace_dir, + )); + (tmp, config, security) + } + + #[test] + fn tool_name_and_schema() { + let (_tmp, config, security) = test_setup(); + let tool = ScheduleTool::new(security, config); + assert_eq!(tool.name(), "schedule"); + let schema = tool.parameters_schema(); + assert!(schema["properties"]["action"].is_object()); + } + + #[tokio::test] + async fn list_empty() { + let (_tmp, config, security) = test_setup(); + let tool = ScheduleTool::new(security, config); + + let result = tool.execute(json!({"action": "list"})).await.unwrap(); + assert!(result.success); + assert!(result.output.contains("No scheduled jobs")); + } + + #[tokio::test] + async fn create_get_and_cancel_roundtrip() { + let (_tmp, config, security) = test_setup(); + let tool = ScheduleTool::new(security, config); + + let create = tool + .execute(json!({ + "action": "create", + "expression": "*/5 * * * *", + "command": "echo hello" + })) + .await + .unwrap(); + assert!(create.success); + assert!(create.output.contains("Created recurring job")); + + let list = tool.execute(json!({"action": "list"})).await.unwrap(); + assert!(list.success); + assert!(list.output.contains("echo hello")); + + let id = create.output.split_whitespace().nth(3).unwrap(); + + let get = tool + .execute(json!({"action": "get", "id": id})) + .await + .unwrap(); + assert!(get.success); + assert!(get.output.contains("echo hello")); + + let cancel = tool + .execute(json!({"action": "cancel", "id": id})) + .await + .unwrap(); + assert!(cancel.success); + } + + #[tokio::test] + async fn once_and_pause_resume_aliases_work() { + let (_tmp, config, security) = test_setup(); + let tool = ScheduleTool::new(security, config); + + let once = tool + .execute(json!({ + "action": "once", + "delay": "30m", + "command": "echo delayed" + })) + .await + .unwrap(); + assert!(once.success); + + let add = tool + .execute(json!({ + "action": "add", + "expression": "*/10 * * * *", + "command": "echo recurring" + })) + .await + .unwrap(); + assert!(add.success); + + let id = add.output.split_whitespace().nth(3).unwrap(); + let pause = tool + .execute(json!({"action": "pause", "id": id})) + .await + .unwrap(); + assert!(pause.success); + + let resume = tool + .execute(json!({"action": "resume", "id": id})) + .await + .unwrap(); + assert!(resume.success); + } + + #[tokio::test] + async fn readonly_blocks_mutating_actions() { + let tmp = TempDir::new().unwrap(); + let config = Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + autonomy: crate::config::AutonomyConfig { + level: AutonomyLevel::ReadOnly, + ..Default::default() + }, + ..Config::default() + }; + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + let security = Arc::new(SecurityPolicy::from_config( + &config.autonomy, + &config.workspace_dir, + )); + + let tool = ScheduleTool::new(security, config); + + let blocked = tool + .execute(json!({ + "action": "create", + "expression": "* * * * *", + "command": "echo blocked" + })) + .await + .unwrap(); + assert!(!blocked.success); + assert!(blocked.error.as_deref().unwrap().contains("read-only")); + + let list = tool.execute(json!({"action": "list"})).await.unwrap(); + assert!(list.success); + } + + #[tokio::test] + async fn unknown_action_returns_failure() { + let (_tmp, config, security) = test_setup(); + let tool = ScheduleTool::new(security, config); + + let result = tool.execute(json!({"action": "explode"})).await.unwrap(); + assert!(!result.success); + assert!(result.error.as_deref().unwrap().contains("Unknown action")); + } +}