diff --git a/Cargo.lock b/Cargo.lock index 93d2938..d33fee5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -462,6 +462,28 @@ dependencies = [ "windows-link", ] +[[package]] +name = "chrono-tz" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93698b29de5e97ad0ae26447b344c482a7284c737d9ddc5f9e52b74a336671bb" +dependencies = [ + "chrono", + "chrono-tz-build", + "phf", +] + +[[package]] +name = "chrono-tz-build" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c088aee841df9c3041febbb73934cfc39708749bf96dc827e3359cd39ef11b1" +dependencies = [ + "parse-zoneinfo", + "phf", + "phf_codegen", +] + [[package]] name = "chumsky" version = "0.9.3" @@ -2443,6 +2465,15 @@ dependencies = [ "windows-link", ] +[[package]] +name = "parse-zoneinfo" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f2a05b18d44e2957b88f96ba460715e295bc1d7510468a2f3d3b44535d26c24" +dependencies = [ + "regex", +] + [[package]] name = "parse_int" version = "0.9.0" @@ -2475,6 +2506,44 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "phf" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_codegen" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aef8048c789fa5e851558d709946d6d79a8ff88c0440c587967f8e94bfb1216a" +dependencies = [ + "phf_generator", + "phf_shared", +] + +[[package]] +name = "phf_generator" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" +dependencies = [ + "phf_shared", + "rand 0.8.5", +] + +[[package]] +name = "phf_shared" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project" version = "1.1.10" @@ -3367,6 +3436,12 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" +[[package]] +name = "siphasher" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e" + [[package]] name = "slab" version = "0.4.12" @@ -4821,6 +4896,7 @@ dependencies = [ "base64", "chacha20poly1305", "chrono", + "chrono-tz", "clap", "console 0.15.11", "cron", diff --git a/Cargo.toml b/Cargo.toml index be6deed..c5f14fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,6 +72,7 @@ async-trait = "0.1" # Memory / persistence rusqlite = { version = "0.38", features = ["bundled"] } chrono = { version = "0.4", default-features = false, features = ["clock", "std", "serde"] } +chrono-tz = "0.9" cron = "0.12" # Interactive CLI prompts diff --git a/src/agent/loop_.rs b/src/agent/loop_.rs index e645764..4495995 100644 --- a/src/agent/loop_.rs +++ b/src/agent/loop_.rs @@ -595,7 +595,7 @@ pub async fn run( model_override: Option, temperature: f64, peripheral_overrides: Vec, -) -> Result<()> { +) -> Result { // ── Wire up agnostic subsystems ────────────────────────────── let base_observer = observability::create_observer(&config.observability); let observer: Arc = Arc::from(base_observer); @@ -632,6 +632,7 @@ pub async fn run( (None, None) }; let mut tools_registry = tools::all_tools_with_runtime( + Arc::new(config.clone()), &security, runtime, mem.clone(), @@ -724,6 +725,24 @@ pub async fn run( "Delete a memory entry. Use when: memory is incorrect/stale or explicitly requested for removal. Don't use when: impact is uncertain.", ), ]; + tool_descs.push(( + "cron_add", + "Create a cron job. Supports schedule kinds: cron, at, every; and job types: shell or agent.", + )); + tool_descs.push(( + "cron_list", + "List all cron jobs with schedule, status, and metadata.", + )); + tool_descs.push(("cron_remove", "Remove a cron job by job_id.")); + tool_descs.push(( + "cron_update", + "Patch a cron job (schedule, enabled, command/prompt, model, delivery, session_target).", + )); + tool_descs.push(( + "cron_run", + "Force-run a cron job immediately and record a run history entry.", + )); + tool_descs.push(("cron_runs", "Show recent run history for a cron job.")); tool_descs.push(( "screenshot", "Capture a screenshot of the current screen. Returns file path and base64-encoded PNG. Use when: visual verification, UI inspection, debugging displays.", @@ -804,6 +823,8 @@ pub async fn run( // ── Execute ────────────────────────────────────────────────── let start = Instant::now(); + let mut final_output = String::new(); + if let Some(msg) = message { // Auto-save user message to memory if config.memory.auto_save { @@ -843,6 +864,7 @@ pub async fn run( false, ) .await?; + final_output = response.clone(); println!("{response}"); observer.record_event(&ObserverEvent::TurnComplete); @@ -912,6 +934,7 @@ pub async fn run( continue; } }; + final_output = response.clone(); println!("\n{response}\n"); observer.record_event(&ObserverEvent::TurnComplete); @@ -945,7 +968,7 @@ pub async fn run( tokens_used: None, }); - Ok(()) + Ok(final_output) } /// Process a single message through the full agent (with tools, peripherals, memory). @@ -974,6 +997,7 @@ pub async fn process_message(config: Config, message: &str) -> Result { (None, None) }; let mut tools_registry = tools::all_tools_with_runtime( + Arc::new(config.clone()), &security, runtime, mem.clone(), diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 2a1dcf9..1a161ad 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -800,6 +800,7 @@ pub async fn start_channels(config: Config) -> Result<()> { // Build system prompt from workspace identity files + skills let workspace = config.workspace_dir.clone(); let tools_registry = Arc::new(tools::all_tools_with_runtime( + Arc::new(config.clone()), &security, runtime, Arc::clone(&mem), diff --git a/src/config/mod.rs b/src/config/mod.rs index db620b2..bbb8d35 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -9,7 +9,7 @@ pub use schema::{ ModelRouteConfig, ObservabilityConfig, PeripheralBoardConfig, PeripheralsConfig, ReliabilityConfig, ResourceLimitsConfig, RuntimeConfig, SandboxBackend, SandboxConfig, SchedulerConfig, SecretsConfig, SecurityConfig, SlackConfig, TelegramConfig, TunnelConfig, - WebhookConfig, + WebhookConfig, CronConfig, }; #[cfg(test)] diff --git a/src/config/schema.rs b/src/config/schema.rs index 308f8e3..34be770 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -47,6 +47,9 @@ pub struct Config { #[serde(default)] pub heartbeat: HeartbeatConfig, + #[serde(default)] + pub cron: CronConfig, + #[serde(default)] pub channels_config: ChannelsConfig, @@ -1172,6 +1175,29 @@ impl Default for HeartbeatConfig { } } +// ── Cron ──────────────────────────────────────────────────────── + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CronConfig { + #[serde(default = "default_true")] + pub enabled: bool, + #[serde(default = "default_max_run_history")] + pub max_run_history: u32, +} + +fn default_max_run_history() -> u32 { + 50 +} + +impl Default for CronConfig { + fn default() -> Self { + Self { + enabled: true, + max_run_history: default_max_run_history(), + } + } +} + // ── Tunnel ────────────────────────────────────────────────────── #[derive(Debug, Clone, Serialize, Deserialize)] @@ -1579,6 +1605,7 @@ impl Default for Config { agent: AgentConfig::default(), model_routes: Vec::new(), heartbeat: HeartbeatConfig::default(), + cron: CronConfig::default(), channels_config: ChannelsConfig::default(), memory: MemoryConfig::default(), tunnel: TunnelConfig::default(), @@ -1863,6 +1890,38 @@ mod tests { assert_eq!(h.interval_minutes, 30); } + #[test] + fn cron_config_default() { + let c = CronConfig::default(); + assert!(c.enabled); + assert_eq!(c.max_run_history, 50); + } + + #[test] + fn cron_config_serde_roundtrip() { + let c = CronConfig { + enabled: false, + max_run_history: 100, + }; + let json = serde_json::to_string(&c).unwrap(); + let parsed: CronConfig = serde_json::from_str(&json).unwrap(); + assert!(!parsed.enabled); + assert_eq!(parsed.max_run_history, 100); + } + + #[test] + fn config_defaults_cron_when_section_missing() { + let toml_str = r#" +workspace_dir = "/tmp/workspace" +config_path = "/tmp/config.toml" +default_temperature = 0.7 +"#; + + let parsed: Config = toml::from_str(toml_str).unwrap(); + assert!(parsed.cron.enabled); + assert_eq!(parsed.cron.max_run_history, 50); + } + #[test] fn memory_config_default_hygiene_settings() { let m = MemoryConfig::default(); @@ -1918,6 +1977,7 @@ mod tests { enabled: true, interval_minutes: 15, }, + cron: CronConfig::default(), channels_config: ChannelsConfig { cli: true, telegram: Some(TelegramConfig { @@ -2041,6 +2101,7 @@ tool_dispatcher = "xml" scheduler: SchedulerConfig::default(), model_routes: Vec::new(), heartbeat: HeartbeatConfig::default(), + cron: CronConfig::default(), channels_config: ChannelsConfig::default(), memory: MemoryConfig::default(), tunnel: TunnelConfig::default(), diff --git a/src/cron/mod.rs b/src/cron/mod.rs index cddc134..8c412e1 100644 --- a/src/cron/mod.rs +++ b/src/cron/mod.rs @@ -1,24 +1,24 @@ use crate::config::Config; -use anyhow::{Context, Result}; -use chrono::{DateTime, Utc}; -use cron::Schedule; -use rusqlite::{params, Connection}; -use std::str::FromStr; -use uuid::Uuid; +use anyhow::Result; + +mod schedule; +mod store; +mod types; pub mod scheduler; -#[derive(Debug, Clone)] -pub struct CronJob { - pub id: String, - pub expression: String, - pub command: String, - pub next_run: DateTime, - pub last_run: Option>, - pub last_status: Option, - pub paused: bool, - pub one_shot: bool, -} +#[allow(unused_imports)] +pub use schedule::{ + next_run_for_schedule, normalize_expression, schedule_cron_expression, validate_schedule, +}; +#[allow(unused_imports)] +pub use store::{ + add_agent_job, add_job, add_shell_job, due_jobs, get_job, list_jobs, list_runs, record_last_run, + record_run, remove_job, reschedule_after_run, update_job, +}; +pub use types::{ + CronJob, CronJobPatch, CronRun, DeliveryConfig, JobType, Schedule, SessionTarget, +}; #[allow(clippy::needless_pass_by_value)] pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<()> { @@ -29,7 +29,6 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<( println!("No scheduled tasks yet."); println!("\nUsage:"); println!(" zeroclaw cron add '0 9 * * *' 'agent -m \"Good morning!\"'"); - println!(" zeroclaw cron once 30m 'echo reminder'"); return Ok(()); } @@ -39,629 +38,134 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<( .last_run .map_or_else(|| "never".into(), |d| d.to_rfc3339()); let last_status = job.last_status.unwrap_or_else(|| "n/a".into()); - let flags = match (job.paused, job.one_shot) { - (true, true) => " [paused, one-shot]", - (true, false) => " [paused]", - (false, true) => " [one-shot]", - (false, false) => "", - }; println!( - "- {} | {} | next={} | last={} ({}){}\n cmd: {}", + "- {} | {:?} | next={} | last={} ({})", job.id, - job.expression, + job.schedule, job.next_run.to_rfc3339(), last_run, last_status, - flags, - job.command ); + if !job.command.is_empty() { + println!(" cmd: {}", job.command); + } + if let Some(prompt) = &job.prompt { + println!(" prompt: {prompt}"); + } } Ok(()) } crate::CronCommands::Add { expression, + tz, command, } => { - let job = add_job(config, &expression, &command)?; + let schedule = Schedule::Cron { + expr: expression, + tz, + }; + let job = add_shell_job(config, None, schedule, &command)?; println!("✅ Added cron job {}", job.id); println!(" Expr: {}", job.expression); println!(" Next: {}", job.next_run.to_rfc3339()); println!(" Cmd : {}", job.command); Ok(()) } + crate::CronCommands::AddAt { at, command } => { + let at = chrono::DateTime::parse_from_rfc3339(&at) + .map_err(|e| anyhow::anyhow!("Invalid RFC3339 timestamp for --at: {e}"))? + .with_timezone(&chrono::Utc); + let schedule = Schedule::At { at }; + let job = add_shell_job(config, None, schedule, &command)?; + println!("✅ Added one-shot cron job {}", job.id); + println!(" At : {}", job.next_run.to_rfc3339()); + println!(" Cmd : {}", job.command); + Ok(()) + } + crate::CronCommands::AddEvery { every_ms, command } => { + let schedule = Schedule::Every { every_ms }; + let job = add_shell_job(config, None, schedule, &command)?; + println!("✅ Added interval cron job {}", job.id); + println!(" Every(ms): {every_ms}"); + println!(" Next : {}", job.next_run.to_rfc3339()); + println!(" Cmd : {}", job.command); + Ok(()) + } crate::CronCommands::Once { delay, command } => { let job = add_once(config, &delay, &command)?; - println!("✅ Added one-shot task {}", job.id); - println!(" Runs at: {}", job.next_run.to_rfc3339()); - println!(" Cmd : {}", job.command); - Ok(()) - } - crate::CronCommands::Remove { id } => { - remove_job(config, &id)?; - println!("✅ Removed cron job {id}"); + println!("✅ Added one-shot cron job {}", job.id); + println!(" At : {}", job.next_run.to_rfc3339()); + println!(" Cmd : {}", job.command); Ok(()) } + crate::CronCommands::Remove { id } => remove_job(config, &id), crate::CronCommands::Pause { id } => { pause_job(config, &id)?; - println!("⏸️ Paused job {id}"); + println!("⏸️ Paused cron job {id}"); Ok(()) } crate::CronCommands::Resume { id } => { resume_job(config, &id)?; - println!("▶️ Resumed job {id}"); + println!("▶️ Resumed cron job {id}"); Ok(()) } } } -pub fn add_job(config: &Config, expression: &str, command: &str) -> Result { - check_max_tasks(config)?; - let now = Utc::now(); - let next_run = next_run_for(expression, now)?; - let id = Uuid::new_v4().to_string(); - - with_connection(config, |conn| { - conn.execute( - "INSERT INTO cron_jobs (id, expression, command, created_at, next_run, paused, one_shot) - VALUES (?1, ?2, ?3, ?4, ?5, 0, 0)", - params![ - id, - expression, - command, - now.to_rfc3339(), - next_run.to_rfc3339() - ], - ) - .context("Failed to insert cron job")?; - Ok(()) - })?; - - Ok(CronJob { - id, - expression: expression.to_string(), - command: command.to_string(), - next_run, - last_run: None, - last_status: None, - paused: false, - one_shot: false, - }) -} - -pub fn add_one_shot_job(config: &Config, run_at: DateTime, command: &str) -> Result { - add_one_shot_job_with_expression(config, run_at, command, "@once".to_string()) -} - pub fn add_once(config: &Config, delay: &str, command: &str) -> Result { - let duration = parse_duration(delay)?; - let run_at = Utc::now() + duration; - add_one_shot_job_with_expression(config, run_at, command, format!("@once:{delay}")) + let duration = parse_delay(delay)?; + let at = chrono::Utc::now() + duration; + add_once_at(config, at, command) } -pub fn add_once_at(config: &Config, at: DateTime, command: &str) -> Result { - add_one_shot_job_with_expression(config, at, command, format!("@at:{}", at.to_rfc3339())) -} - -fn add_one_shot_job_with_expression( +pub fn add_once_at( config: &Config, - run_at: DateTime, + at: chrono::DateTime, command: &str, - expression: String, ) -> Result { - check_max_tasks(config)?; - let now = Utc::now(); - if run_at <= now { - anyhow::bail!("Scheduled time must be in the future"); - } + let schedule = Schedule::At { at }; + add_shell_job(config, None, schedule, command) +} - let id = Uuid::new_v4().to_string(); - - with_connection(config, |conn| { - conn.execute( - "INSERT INTO cron_jobs (id, expression, command, created_at, next_run, paused, one_shot) - VALUES (?1, ?2, ?3, ?4, ?5, 0, 1)", - params![id, expression, command, now.to_rfc3339(), run_at.to_rfc3339()], - ) - .context("Failed to insert one-shot task")?; - Ok(()) - })?; - - Ok(CronJob { +pub fn pause_job(config: &Config, id: &str) -> Result { + update_job( + config, id, - expression, - command: command.to_string(), - next_run: run_at, - last_run: None, - last_status: None, - paused: false, - one_shot: true, - }) + CronJobPatch { + enabled: Some(false), + ..CronJobPatch::default() + }, + ) } -pub fn get_job(config: &Config, id: &str) -> Result> { - with_connection(config, |conn| { - let mut stmt = conn.prepare( - "SELECT id, expression, command, next_run, last_run, last_status, paused, one_shot - FROM cron_jobs WHERE id = ?1", - )?; - - let mut rows = stmt.query_map(params![id], |row| Ok(parse_job_row(row)))?; - - match rows.next() { - Some(Ok(job_result)) => Ok(Some(job_result?)), - Some(Err(e)) => Err(e.into()), - None => Ok(None), - } - }) +pub fn resume_job(config: &Config, id: &str) -> Result { + update_job( + config, + id, + CronJobPatch { + enabled: Some(true), + ..CronJobPatch::default() + }, + ) } -pub fn pause_job(config: &Config, id: &str) -> Result<()> { - let changed = with_connection(config, |conn| { - conn.execute("UPDATE cron_jobs SET paused = 1 WHERE id = ?1", params![id]) - .context("Failed to pause cron job") - })?; - - if changed == 0 { - anyhow::bail!("Cron job '{id}' not found"); - } - - Ok(()) -} - -pub fn resume_job(config: &Config, id: &str) -> Result<()> { - let changed = with_connection(config, |conn| { - conn.execute("UPDATE cron_jobs SET paused = 0 WHERE id = ?1", params![id]) - .context("Failed to resume cron job") - })?; - - if changed == 0 { - anyhow::bail!("Cron job '{id}' not found"); - } - - Ok(()) -} - -fn check_max_tasks(config: &Config) -> Result<()> { - let count = with_connection(config, |conn| { - let mut stmt = conn.prepare("SELECT COUNT(*) FROM cron_jobs")?; - let count: i64 = stmt.query_row([], |row| row.get(0))?; - usize::try_from(count).context("Unexpected negative task count") - })?; - - if count >= config.scheduler.max_tasks { - anyhow::bail!( - "Maximum number of scheduled tasks ({}) reached", - config.scheduler.max_tasks - ); - } - - Ok(()) -} - -fn parse_duration(input: &str) -> Result { +fn parse_delay(input: &str) -> Result { let input = input.trim(); if input.is_empty() { - anyhow::bail!("Empty delay string"); + anyhow::bail!("delay must not be empty"); } - - let (num_str, unit) = if input.ends_with(|c: char| c.is_ascii_alphabetic()) { - let split = input.len() - 1; - (&input[..split], &input[split..]) - } else { - (input, "m") + let split = input + .find(|c: char| !c.is_ascii_digit()) + .unwrap_or(input.len()); + let (num, unit) = input.split_at(split); + let amount: i64 = num.parse()?; + let unit = if unit.is_empty() { "m" } else { unit }; + let duration = match unit { + "s" => chrono::Duration::seconds(amount), + "m" => chrono::Duration::minutes(amount), + "h" => chrono::Duration::hours(amount), + "d" => chrono::Duration::days(amount), + _ => anyhow::bail!("unsupported delay unit '{unit}', use s/m/h/d"), }; - - let n: u64 = num_str - .trim() - .parse() - .with_context(|| format!("Invalid duration number: {num_str}"))?; - - let multiplier: u64 = match unit { - "s" => 1, - "m" => 60, - "h" => 3600, - "d" => 86400, - "w" => 604_800, - _ => anyhow::bail!("Unknown duration unit '{unit}', expected s/m/h/d/w"), - }; - - let secs = n - .checked_mul(multiplier) - .filter(|&s| i64::try_from(s).is_ok()) - .ok_or_else(|| anyhow::anyhow!("Duration value too large: {input}"))?; - - #[allow(clippy::cast_possible_wrap)] - Ok(chrono::Duration::seconds(secs as i64)) -} - -pub fn list_jobs(config: &Config) -> Result> { - with_connection(config, |conn| { - let mut stmt = conn.prepare( - "SELECT id, expression, command, next_run, last_run, last_status, paused, one_shot - FROM cron_jobs ORDER BY next_run ASC", - )?; - - let rows = stmt.query_map([], |row| Ok(parse_job_row(row)))?; - - let mut jobs = Vec::new(); - for row in rows { - jobs.push(row??); - } - Ok(jobs) - }) -} - -pub fn remove_job(config: &Config, id: &str) -> Result<()> { - let changed = with_connection(config, |conn| { - conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![id]) - .context("Failed to delete cron job") - })?; - - if changed == 0 { - anyhow::bail!("Cron job '{id}' not found"); - } - - Ok(()) -} - -pub fn due_jobs(config: &Config, now: DateTime) -> Result> { - with_connection(config, |conn| { - let mut stmt = conn.prepare( - "SELECT id, expression, command, next_run, last_run, last_status, paused, one_shot - FROM cron_jobs WHERE next_run <= ?1 AND paused = 0 ORDER BY next_run ASC", - )?; - - let rows = stmt.query_map(params![now.to_rfc3339()], |row| Ok(parse_job_row(row)))?; - - let mut jobs = Vec::new(); - for row in rows { - jobs.push(row??); - } - Ok(jobs) - }) -} - -pub fn reschedule_after_run( - config: &Config, - job: &CronJob, - success: bool, - output: &str, -) -> Result<()> { - if job.one_shot { - with_connection(config, |conn| { - conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![job.id]) - .context("Failed to remove one-shot task after execution")?; - Ok(()) - })?; - return Ok(()); - } - - let now = Utc::now(); - let next_run = next_run_for(&job.expression, now)?; - let status = if success { "ok" } else { "error" }; - - with_connection(config, |conn| { - conn.execute( - "UPDATE cron_jobs - SET next_run = ?1, last_run = ?2, last_status = ?3, last_output = ?4 - WHERE id = ?5", - params![ - next_run.to_rfc3339(), - now.to_rfc3339(), - status, - output, - job.id - ], - ) - .context("Failed to update cron job run state")?; - Ok(()) - }) -} - -fn next_run_for(expression: &str, from: DateTime) -> Result> { - let normalized = normalize_expression(expression)?; - let schedule = Schedule::from_str(&normalized) - .with_context(|| format!("Invalid cron expression: {expression}"))?; - schedule - .after(&from) - .next() - .ok_or_else(|| anyhow::anyhow!("No future occurrence for expression: {expression}")) -} - -fn normalize_expression(expression: &str) -> Result { - let expression = expression.trim(); - let field_count = expression.split_whitespace().count(); - - match field_count { - 5 => Ok(format!("0 {expression}")), - 6 | 7 => Ok(expression.to_string()), - _ => anyhow::bail!( - "Invalid cron expression: {expression} (expected 5, 6, or 7 fields, got {field_count})" - ), - } -} - -fn parse_job_row(row: &rusqlite::Row<'_>) -> Result { - let id: String = row.get(0)?; - let expression: String = row.get(1)?; - let command: String = row.get(2)?; - let next_run_raw: String = row.get(3)?; - let last_run_raw: Option = row.get(4)?; - let last_status: Option = row.get(5)?; - let paused: bool = row.get(6)?; - let one_shot: bool = row.get(7)?; - - Ok(CronJob { - id, - expression, - command, - next_run: parse_rfc3339(&next_run_raw)?, - last_run: match last_run_raw { - Some(raw) => Some(parse_rfc3339(&raw)?), - None => None, - }, - last_status, - paused, - one_shot, - }) -} - -fn parse_rfc3339(raw: &str) -> Result> { - let parsed = DateTime::parse_from_rfc3339(raw) - .with_context(|| format!("Invalid RFC3339 timestamp in cron DB: {raw}"))?; - Ok(parsed.with_timezone(&Utc)) -} - -fn with_connection(config: &Config, f: impl FnOnce(&Connection) -> Result) -> Result { - let db_path = config.workspace_dir.join("cron").join("jobs.db"); - if let Some(parent) = db_path.parent() { - std::fs::create_dir_all(parent) - .with_context(|| format!("Failed to create cron directory: {}", parent.display()))?; - } - - let conn = Connection::open(&db_path) - .with_context(|| format!("Failed to open cron DB: {}", db_path.display()))?; - - // ── Production-grade PRAGMA tuning ────────────────────── - conn.execute_batch( - "PRAGMA journal_mode = WAL; - PRAGMA synchronous = NORMAL; - PRAGMA mmap_size = 8388608; - PRAGMA cache_size = -2000; - PRAGMA temp_store = MEMORY;", - ) - .context("Failed to set cron DB PRAGMAs")?; - - conn.execute_batch( - "PRAGMA journal_mode = WAL; - PRAGMA synchronous = NORMAL; - PRAGMA mmap_size = 8388608; - PRAGMA cache_size = -2000; - PRAGMA temp_store = MEMORY;", - ) - .context("Failed to set cron DB PRAGMAs")?; - - conn.execute_batch( - "CREATE TABLE IF NOT EXISTS cron_jobs ( - id TEXT PRIMARY KEY, - expression TEXT NOT NULL, - command TEXT NOT NULL, - created_at TEXT NOT NULL, - next_run TEXT NOT NULL, - last_run TEXT, - last_status TEXT, - last_output TEXT, - paused INTEGER NOT NULL DEFAULT 0, - one_shot INTEGER NOT NULL DEFAULT 0 - ); - CREATE INDEX IF NOT EXISTS idx_cron_jobs_next_run ON cron_jobs(next_run);", - ) - .context("Failed to initialize cron schema")?; - - for column in ["paused", "one_shot"] { - let alter = format!("ALTER TABLE cron_jobs ADD COLUMN {column} INTEGER NOT NULL DEFAULT 0"); - let _ = conn.execute_batch(&alter); - } - - f(&conn) -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::config::Config; - use chrono::Duration as ChronoDuration; - use tempfile::TempDir; - - fn test_config(tmp: &TempDir) -> Config { - let config = Config { - workspace_dir: tmp.path().join("workspace"), - config_path: tmp.path().join("config.toml"), - ..Config::default() - }; - std::fs::create_dir_all(&config.workspace_dir).unwrap(); - config - } - - #[test] - fn add_job_accepts_five_field_expression() { - let tmp = TempDir::new().unwrap(); - let config = test_config(&tmp); - - let job = add_job(&config, "*/5 * * * *", "echo ok").unwrap(); - - assert_eq!(job.expression, "*/5 * * * *"); - assert_eq!(job.command, "echo ok"); - assert!(!job.one_shot); - assert!(!job.paused); - } - - #[test] - fn add_job_rejects_invalid_field_count() { - let tmp = TempDir::new().unwrap(); - let config = test_config(&tmp); - - let err = add_job(&config, "* * * *", "echo bad").unwrap_err(); - assert!(err.to_string().contains("expected 5, 6, or 7 fields")); - } - - #[test] - fn add_list_remove_roundtrip() { - let tmp = TempDir::new().unwrap(); - let config = test_config(&tmp); - - let job = add_job(&config, "*/10 * * * *", "echo roundtrip").unwrap(); - let listed = list_jobs(&config).unwrap(); - assert_eq!(listed.len(), 1); - assert_eq!(listed[0].id, job.id); - - remove_job(&config, &job.id).unwrap(); - assert!(list_jobs(&config).unwrap().is_empty()); - } - - #[test] - fn add_once_creates_one_shot_job() { - let tmp = TempDir::new().unwrap(); - let config = test_config(&tmp); - - let job = add_once(&config, "30m", "echo once").unwrap(); - assert!(job.one_shot); - assert!(job.expression.starts_with("@once:")); - - let fetched = get_job(&config, &job.id).unwrap().unwrap(); - assert!(fetched.one_shot); - assert!(!fetched.paused); - } - - #[test] - fn add_once_at_rejects_past_timestamp() { - let tmp = TempDir::new().unwrap(); - let config = test_config(&tmp); - - let run_at = Utc::now() - ChronoDuration::minutes(1); - let err = add_once_at(&config, run_at, "echo past").unwrap_err(); - assert!(err.to_string().contains("future")); - } - - #[test] - fn get_job_found_and_missing() { - let tmp = TempDir::new().unwrap(); - let config = test_config(&tmp); - - let job = add_job(&config, "*/5 * * * *", "echo found").unwrap(); - let found = get_job(&config, &job.id).unwrap(); - assert!(found.is_some()); - assert_eq!(found.unwrap().id, job.id); - - let missing = get_job(&config, "nonexistent").unwrap(); - assert!(missing.is_none()); - } - - #[test] - fn pause_resume_roundtrip() { - let tmp = TempDir::new().unwrap(); - let config = test_config(&tmp); - - let job = add_job(&config, "*/5 * * * *", "echo pause").unwrap(); - pause_job(&config, &job.id).unwrap(); - assert!(get_job(&config, &job.id).unwrap().unwrap().paused); - - resume_job(&config, &job.id).unwrap(); - assert!(!get_job(&config, &job.id).unwrap().unwrap().paused); - } - - #[test] - fn due_jobs_filters_by_timestamp_and_skips_paused() { - let tmp = TempDir::new().unwrap(); - let config = test_config(&tmp); - - let active = add_job(&config, "* * * * *", "echo due").unwrap(); - let paused = add_job(&config, "* * * * *", "echo paused").unwrap(); - pause_job(&config, &paused.id).unwrap(); - - let due_now = due_jobs(&config, Utc::now()).unwrap(); - assert!(due_now.is_empty(), "new jobs should not be due immediately"); - - let far_future = Utc::now() + ChronoDuration::days(365); - let due_future = due_jobs(&config, far_future).unwrap(); - assert_eq!(due_future.len(), 1); - assert_eq!(due_future[0].id, active.id); - } - - #[test] - fn reschedule_after_run_persists_last_status_and_last_run() { - let tmp = TempDir::new().unwrap(); - let config = test_config(&tmp); - - let job = add_job(&config, "*/15 * * * *", "echo run").unwrap(); - reschedule_after_run(&config, &job, false, "failed output").unwrap(); - - let listed = list_jobs(&config).unwrap(); - let stored = listed.iter().find(|j| j.id == job.id).unwrap(); - assert_eq!(stored.last_status.as_deref(), Some("error")); - assert!(stored.last_run.is_some()); - } - - #[test] - fn reschedule_after_run_removes_one_shot_jobs() { - let tmp = TempDir::new().unwrap(); - let config = test_config(&tmp); - - let run_at = Utc::now() + ChronoDuration::minutes(1); - let job = add_one_shot_job(&config, run_at, "echo once").unwrap(); - reschedule_after_run(&config, &job, true, "ok").unwrap(); - - assert!(get_job(&config, &job.id).unwrap().is_none()); - } - - #[test] - fn scheduler_columns_migrate_from_old_schema() { - let tmp = TempDir::new().unwrap(); - let config = test_config(&tmp); - - let db_path = config.workspace_dir.join("cron").join("jobs.db"); - std::fs::create_dir_all(db_path.parent().unwrap()).unwrap(); - - { - let conn = rusqlite::Connection::open(&db_path).unwrap(); - conn.execute_batch( - "CREATE TABLE cron_jobs ( - id TEXT PRIMARY KEY, - expression TEXT NOT NULL, - command TEXT NOT NULL, - created_at TEXT NOT NULL, - next_run TEXT NOT NULL, - last_run TEXT, - last_status TEXT, - last_output TEXT - );", - ) - .unwrap(); - conn.execute( - "INSERT INTO cron_jobs (id, expression, command, created_at, next_run) - VALUES ('old-job', '* * * * *', 'echo old', '2025-01-01T00:00:00Z', '2030-01-01T00:00:00Z')", - [], - ) - .unwrap(); - } - - let jobs = list_jobs(&config).unwrap(); - assert_eq!(jobs.len(), 1); - assert_eq!(jobs[0].id, "old-job"); - assert!(!jobs[0].paused); - assert!(!jobs[0].one_shot); - } - - #[test] - fn max_tasks_limit_is_enforced() { - let tmp = TempDir::new().unwrap(); - let mut config = test_config(&tmp); - config.scheduler.max_tasks = 1; - - let _first = add_job(&config, "*/10 * * * *", "echo first").unwrap(); - let err = add_job(&config, "*/11 * * * *", "echo second").unwrap_err(); - assert!(err - .to_string() - .contains("Maximum number of scheduled tasks")); - } + Ok(duration) } diff --git a/src/cron/schedule.rs b/src/cron/schedule.rs new file mode 100644 index 0000000..d7206b7 --- /dev/null +++ b/src/cron/schedule.rs @@ -0,0 +1,114 @@ +use crate::cron::Schedule; +use anyhow::{Context, Result}; +use chrono::{DateTime, Duration as ChronoDuration, Utc}; +use cron::Schedule as CronExprSchedule; +use std::str::FromStr; + +pub fn next_run_for_schedule(schedule: &Schedule, from: DateTime) -> Result> { + match schedule { + Schedule::Cron { expr, tz } => { + let normalized = normalize_expression(expr)?; + let cron = CronExprSchedule::from_str(&normalized) + .with_context(|| format!("Invalid cron expression: {expr}"))?; + + if let Some(tz_name) = tz { + let timezone = chrono_tz::Tz::from_str(tz_name) + .with_context(|| format!("Invalid IANA timezone: {tz_name}"))?; + let localized_from = from.with_timezone(&timezone); + let next_local = cron.after(&localized_from).next().ok_or_else(|| { + anyhow::anyhow!("No future occurrence for expression: {expr}") + })?; + Ok(next_local.with_timezone(&Utc)) + } else { + cron.after(&from) + .next() + .ok_or_else(|| anyhow::anyhow!("No future occurrence for expression: {expr}")) + } + } + Schedule::At { at } => Ok(*at), + Schedule::Every { every_ms } => { + if *every_ms == 0 { + anyhow::bail!("Invalid schedule: every_ms must be > 0"); + } + let ms = i64::try_from(*every_ms).context("every_ms is too large")?; + let delta = ChronoDuration::milliseconds(ms); + from.checked_add_signed(delta) + .ok_or_else(|| anyhow::anyhow!("every_ms overflowed DateTime")) + } + } +} + +pub fn validate_schedule(schedule: &Schedule, now: DateTime) -> Result<()> { + match schedule { + Schedule::Cron { expr, .. } => { + let _ = normalize_expression(expr)?; + let _ = next_run_for_schedule(schedule, now)?; + Ok(()) + } + Schedule::At { at } => { + if *at <= now { + anyhow::bail!("Invalid schedule: 'at' must be in the future"); + } + Ok(()) + } + Schedule::Every { every_ms } => { + if *every_ms == 0 { + anyhow::bail!("Invalid schedule: every_ms must be > 0"); + } + Ok(()) + } + } +} + +pub fn schedule_cron_expression(schedule: &Schedule) -> Option { + match schedule { + Schedule::Cron { expr, .. } => Some(expr.clone()), + _ => None, + } +} + +pub fn normalize_expression(expression: &str) -> Result { + let expression = expression.trim(); + let field_count = expression.split_whitespace().count(); + + match field_count { + // standard crontab syntax: minute hour day month weekday + 5 => Ok(format!("0 {expression}")), + // crate-native syntax includes seconds (+ optional year) + 6 | 7 => Ok(expression.to_string()), + _ => anyhow::bail!( + "Invalid cron expression: {expression} (expected 5, 6, or 7 fields, got {field_count})" + ), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeZone; + + #[test] + fn next_run_for_schedule_supports_every_and_at() { + let now = Utc::now(); + let every = Schedule::Every { every_ms: 60_000 }; + let next = next_run_for_schedule(&every, now).unwrap(); + assert!(next > now); + + let at = now + ChronoDuration::minutes(10); + let at_schedule = Schedule::At { at }; + let next_at = next_run_for_schedule(&at_schedule, now).unwrap(); + assert_eq!(next_at, at); + } + + #[test] + fn next_run_for_schedule_supports_timezone() { + let from = Utc.with_ymd_and_hms(2026, 2, 16, 0, 0, 0).unwrap(); + let schedule = Schedule::Cron { + expr: "0 9 * * *".into(), + tz: Some("America/Los_Angeles".into()), + }; + + let next = next_run_for_schedule(&schedule, from).unwrap(); + assert_eq!(next, Utc.with_ymd_and_hms(2026, 2, 16, 17, 0, 0).unwrap()); + } +} diff --git a/src/cron/scheduler.rs b/src/cron/scheduler.rs index bdb5f0b..df771d6 100644 --- a/src/cron/scheduler.rs +++ b/src/cron/scheduler.rs @@ -1,26 +1,21 @@ +use crate::channels::{Channel, DiscordChannel, SlackChannel, TelegramChannel}; use crate::config::Config; -use crate::cron::{due_jobs, reschedule_after_run, CronJob}; +use crate::cron::{ + due_jobs, next_run_for_schedule, record_last_run, record_run, remove_job, reschedule_after_run, + update_job, CronJob, CronJobPatch, DeliveryConfig, JobType, Schedule, SessionTarget, +}; use crate::security::SecurityPolicy; use anyhow::Result; -use chrono::Utc; +use chrono::{DateTime, Utc}; use tokio::process::Command; use tokio::time::{self, Duration}; const MIN_POLL_SECONDS: u64 = 5; pub async fn run(config: Config) -> Result<()> { - if !config.scheduler.enabled { - tracing::info!("Scheduler disabled by config"); - crate::health::mark_component_ok("scheduler"); - loop { - time::sleep(Duration::from_secs(3600)).await; - } - } - let poll_secs = config.reliability.scheduler_poll_secs.max(MIN_POLL_SECONDS); let mut interval = time::interval(Duration::from_secs(poll_secs)); let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); - let max_concurrent = config.scheduler.max_concurrent.max(1); crate::health::mark_component_ok("scheduler"); @@ -36,22 +31,28 @@ pub async fn run(config: Config) -> Result<()> { } }; - for job in jobs.into_iter().take(max_concurrent) { + for job in jobs { crate::health::mark_component_ok("scheduler"); + warn_if_high_frequency_agent_job(&job); + + let started_at = Utc::now(); let (success, output) = execute_job_with_retry(&config, &security, &job).await; + let finished_at = Utc::now(); + let success = + persist_job_result(&config, &job, success, &output, started_at, finished_at).await; if !success { crate::health::mark_component_error("scheduler", format!("job {} failed", job.id)); } - - if let Err(e) = reschedule_after_run(&config, &job, success, &output) { - crate::health::mark_component_error("scheduler", e.to_string()); - tracing::warn!("Failed to persist scheduler run result: {e}"); - } } } } +pub async fn execute_job_now(config: &Config, job: &CronJob) -> (bool, String) { + let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); + execute_job_with_retry(config, &security, job).await +} + async fn execute_job_with_retry( config: &Config, security: &SecurityPolicy, @@ -62,7 +63,10 @@ async fn execute_job_with_retry( let mut backoff_ms = config.reliability.provider_backoff_ms.max(200); for attempt in 0..=retries { - let (success, output) = run_job_command(config, security, job).await; + let (success, output) = match job.job_type { + JobType::Shell => run_job_command(config, security, job).await, + JobType::Agent => run_agent_job(config, job).await, + }; last_output = output; if success { @@ -84,6 +88,185 @@ async fn execute_job_with_retry( (false, last_output) } +async fn run_agent_job(config: &Config, job: &CronJob) -> (bool, String) { + let name = job.name.clone().unwrap_or_else(|| "cron-job".to_string()); + let prompt = job.prompt.clone().unwrap_or_default(); + let prefixed_prompt = format!("[cron:{} {name}] {prompt}", job.id); + let model_override = job.model.clone(); + + let run_result = match job.session_target { + SessionTarget::Main | SessionTarget::Isolated => { + crate::agent::run( + config.clone(), + Some(prefixed_prompt), + None, + model_override, + config.default_temperature, + vec![], + ) + .await + } + }; + + match run_result { + Ok(response) => ( + true, + if response.trim().is_empty() { + "agent job executed".to_string() + } else { + response + }, + ), + Err(e) => (false, format!("agent job failed: {e}")), + } +} + +async fn persist_job_result( + config: &Config, + job: &CronJob, + mut success: bool, + output: &str, + started_at: DateTime, + finished_at: DateTime, +) -> bool { + let duration_ms = (finished_at - started_at).num_milliseconds(); + + if let Err(e) = deliver_if_configured(config, job, output).await { + if job.delivery.best_effort { + tracing::warn!("Cron delivery failed (best_effort): {e}"); + } else { + success = false; + tracing::warn!("Cron delivery failed: {e}"); + } + } + + let _ = record_run( + config, + &job.id, + started_at, + finished_at, + if success { "ok" } else { "error" }, + Some(output), + duration_ms, + ); + + if is_one_shot_auto_delete(job) { + if success { + if let Err(e) = remove_job(config, &job.id) { + tracing::warn!("Failed to remove one-shot cron job after success: {e}"); + } + } else { + let _ = record_last_run(config, &job.id, finished_at, false, output); + if let Err(e) = update_job( + config, + &job.id, + CronJobPatch { + enabled: Some(false), + ..CronJobPatch::default() + }, + ) { + tracing::warn!("Failed to disable failed one-shot cron job: {e}"); + } + } + return success; + } + + if let Err(e) = reschedule_after_run(config, job, success, output) { + tracing::warn!("Failed to persist scheduler run result: {e}"); + } + + success +} + +fn is_one_shot_auto_delete(job: &CronJob) -> bool { + job.delete_after_run && matches!(job.schedule, Schedule::At { .. }) +} + +fn warn_if_high_frequency_agent_job(job: &CronJob) { + if !matches!(job.job_type, JobType::Agent) { + return; + } + let too_frequent = match &job.schedule { + Schedule::Every { every_ms } => *every_ms < 5 * 60 * 1000, + Schedule::Cron { .. } => { + let now = Utc::now(); + match ( + next_run_for_schedule(&job.schedule, now), + next_run_for_schedule(&job.schedule, now + chrono::Duration::seconds(1)), + ) { + (Ok(a), Ok(b)) => (b - a).num_minutes() < 5, + _ => false, + } + } + Schedule::At { .. } => false, + }; + + if too_frequent { + tracing::warn!( + "Cron agent job '{}' is scheduled more frequently than every 5 minutes", + job.id + ); + } +} + +async fn deliver_if_configured(config: &Config, job: &CronJob, output: &str) -> Result<()> { + let delivery: &DeliveryConfig = &job.delivery; + if !delivery.mode.eq_ignore_ascii_case("announce") { + return Ok(()); + } + + let channel = delivery + .channel + .as_deref() + .ok_or_else(|| anyhow::anyhow!("delivery.channel is required for announce mode"))?; + let target = delivery + .to + .as_deref() + .ok_or_else(|| anyhow::anyhow!("delivery.to is required for announce mode"))?; + + match channel.to_ascii_lowercase().as_str() { + "telegram" => { + let tg = config + .channels_config + .telegram + .as_ref() + .ok_or_else(|| anyhow::anyhow!("telegram channel not configured"))?; + let channel = TelegramChannel::new(tg.bot_token.clone(), tg.allowed_users.clone()); + channel.send(output, target).await?; + } + "discord" => { + let dc = config + .channels_config + .discord + .as_ref() + .ok_or_else(|| anyhow::anyhow!("discord channel not configured"))?; + let channel = DiscordChannel::new( + dc.bot_token.clone(), + dc.guild_id.clone(), + dc.allowed_users.clone(), + dc.listen_to_bots, + ); + channel.send(output, target).await?; + } + "slack" => { + let sl = config + .channels_config + .slack + .as_ref() + .ok_or_else(|| anyhow::anyhow!("slack channel not configured"))?; + let channel = SlackChannel::new( + sl.bot_token.clone(), + sl.channel_id.clone(), + sl.allowed_users.clone(), + ); + channel.send(output, target).await?; + } + other => anyhow::bail!("unsupported delivery channel: {other}"), + } + + Ok(()) +} + fn is_env_assignment(word: &str) -> bool { word.contains('=') && word @@ -212,7 +395,9 @@ async fn run_job_command( mod tests { use super::*; use crate::config::Config; + use crate::cron::{self, DeliveryConfig}; use crate::security::SecurityPolicy; + use chrono::{Duration as ChronoDuration, Utc}; use tempfile::TempDir; fn test_config(tmp: &TempDir) -> Config { @@ -229,12 +414,24 @@ mod tests { CronJob { id: "test-job".into(), expression: "* * * * *".into(), + schedule: crate::cron::Schedule::Cron { + expr: "* * * * *".into(), + tz: None, + }, command: command.into(), + prompt: None, + name: None, + job_type: JobType::Shell, + session_target: SessionTarget::Isolated, + model: None, + enabled: true, + delivery: DeliveryConfig::default(), + delete_after_run: false, + created_at: Utc::now(), next_run: Utc::now(), last_run: None, last_status: None, - paused: false, - one_shot: false, + last_output: None, } } @@ -356,4 +553,103 @@ mod tests { assert!(!success); assert!(output.contains("always_missing_for_retry_test")); } + + #[tokio::test] + async fn run_agent_job_returns_error_without_provider_key() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + let mut job = test_job(""); + job.job_type = JobType::Agent; + job.prompt = Some("Say hello".into()); + + let (success, output) = run_agent_job(&config, &job).await; + assert!(!success); + assert!(output.contains("agent job failed:")); + } + + #[tokio::test] + async fn persist_job_result_records_run_and_reschedules_shell_job() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + let job = cron::add_job(&config, "*/5 * * * *", "echo ok").unwrap(); + let started = Utc::now(); + let finished = started + ChronoDuration::milliseconds(10); + + let success = persist_job_result(&config, &job, true, "ok", started, finished).await; + assert!(success); + + let runs = cron::list_runs(&config, &job.id, 10).unwrap(); + assert_eq!(runs.len(), 1); + let updated = cron::get_job(&config, &job.id).unwrap(); + assert_eq!(updated.last_status.as_deref(), Some("ok")); + } + + #[tokio::test] + async fn persist_job_result_success_deletes_one_shot() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + let at = Utc::now() + ChronoDuration::minutes(10); + let job = cron::add_agent_job( + &config, + Some("one-shot".into()), + crate::cron::Schedule::At { at }, + "Hello", + SessionTarget::Isolated, + None, + None, + true, + ) + .unwrap(); + let started = Utc::now(); + let finished = started + ChronoDuration::milliseconds(10); + + let success = persist_job_result(&config, &job, true, "ok", started, finished).await; + assert!(success); + let lookup = cron::get_job(&config, &job.id); + assert!(lookup.is_err()); + } + + #[tokio::test] + async fn persist_job_result_failure_disables_one_shot() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + let at = Utc::now() + ChronoDuration::minutes(10); + let job = cron::add_agent_job( + &config, + Some("one-shot".into()), + crate::cron::Schedule::At { at }, + "Hello", + SessionTarget::Isolated, + None, + None, + true, + ) + .unwrap(); + let started = Utc::now(); + let finished = started + ChronoDuration::milliseconds(10); + + let success = persist_job_result(&config, &job, false, "boom", started, finished).await; + assert!(!success); + let updated = cron::get_job(&config, &job.id).unwrap(); + assert!(!updated.enabled); + assert_eq!(updated.last_status.as_deref(), Some("error")); + } + + #[tokio::test] + async fn deliver_if_configured_handles_none_and_invalid_channel() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + let mut job = test_job("echo ok"); + + assert!(deliver_if_configured(&config, &job, "x").await.is_ok()); + + job.delivery = DeliveryConfig { + mode: "announce".into(), + channel: Some("invalid".into()), + to: Some("target".into()), + best_effort: true, + }; + let err = deliver_if_configured(&config, &job, "x").await.unwrap_err(); + assert!(err.to_string().contains("unsupported delivery channel")); + } } diff --git a/src/cron/store.rs b/src/cron/store.rs new file mode 100644 index 0000000..013ed55 --- /dev/null +++ b/src/cron/store.rs @@ -0,0 +1,668 @@ +use crate::config::Config; +use crate::cron::{ + next_run_for_schedule, schedule_cron_expression, validate_schedule, CronJob, CronJobPatch, + CronRun, DeliveryConfig, JobType, Schedule, SessionTarget, +}; +use anyhow::{Context, Result}; +use chrono::{DateTime, Utc}; +use rusqlite::{params, Connection}; +use uuid::Uuid; + +pub fn add_job(config: &Config, expression: &str, command: &str) -> Result { + let schedule = Schedule::Cron { + expr: expression.to_string(), + tz: None, + }; + add_shell_job(config, None, schedule, command) +} + +pub fn add_shell_job( + config: &Config, + name: Option, + schedule: Schedule, + command: &str, +) -> Result { + let now = Utc::now(); + validate_schedule(&schedule, now)?; + let next_run = next_run_for_schedule(&schedule, now)?; + let id = Uuid::new_v4().to_string(); + let expression = schedule_cron_expression(&schedule).unwrap_or_default(); + let schedule_json = serde_json::to_string(&schedule)?; + + with_connection(config, |conn| { + conn.execute( + "INSERT INTO cron_jobs ( + id, expression, command, schedule, job_type, prompt, name, session_target, model, + enabled, delivery, delete_after_run, created_at, next_run + ) VALUES (?1, ?2, ?3, ?4, 'shell', NULL, ?5, 'isolated', NULL, 1, ?6, 0, ?7, ?8)", + params![ + id, + expression, + command, + schedule_json, + name, + serde_json::to_string(&DeliveryConfig::default())?, + now.to_rfc3339(), + next_run.to_rfc3339(), + ], + ) + .context("Failed to insert cron shell job")?; + Ok(()) + })?; + + get_job(config, &id) +} + +#[allow(clippy::too_many_arguments)] +pub fn add_agent_job( + config: &Config, + name: Option, + schedule: Schedule, + prompt: &str, + session_target: SessionTarget, + model: Option, + delivery: Option, + delete_after_run: bool, +) -> Result { + let now = Utc::now(); + validate_schedule(&schedule, now)?; + let next_run = next_run_for_schedule(&schedule, now)?; + let id = Uuid::new_v4().to_string(); + let expression = schedule_cron_expression(&schedule).unwrap_or_default(); + let schedule_json = serde_json::to_string(&schedule)?; + let delivery = delivery.unwrap_or_default(); + + with_connection(config, |conn| { + conn.execute( + "INSERT INTO cron_jobs ( + id, expression, command, schedule, job_type, prompt, name, session_target, model, + enabled, delivery, delete_after_run, created_at, next_run + ) VALUES (?1, ?2, '', ?3, 'agent', ?4, ?5, ?6, ?7, 1, ?8, ?9, ?10, ?11)", + params![ + id, + expression, + schedule_json, + prompt, + name, + session_target.as_str(), + model, + serde_json::to_string(&delivery)?, + if delete_after_run { 1 } else { 0 }, + now.to_rfc3339(), + next_run.to_rfc3339(), + ], + ) + .context("Failed to insert cron agent job")?; + Ok(()) + })?; + + get_job(config, &id) +} + +pub fn list_jobs(config: &Config) -> Result> { + with_connection(config, |conn| { + let mut stmt = conn.prepare( + "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model, + enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output + FROM cron_jobs ORDER BY next_run ASC", + )?; + + let rows = stmt.query_map([], map_cron_job_row)?; + + let mut jobs = Vec::new(); + for row in rows { + jobs.push(row?); + } + Ok(jobs) + }) +} + +pub fn get_job(config: &Config, job_id: &str) -> Result { + with_connection(config, |conn| { + let mut stmt = conn.prepare( + "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model, + enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output + FROM cron_jobs WHERE id = ?1", + )?; + + let mut rows = stmt.query(params![job_id])?; + if let Some(row) = rows.next()? { + map_cron_job_row(row).map_err(Into::into) + } else { + anyhow::bail!("Cron job '{job_id}' not found") + } + }) +} + +pub fn remove_job(config: &Config, id: &str) -> Result<()> { + let changed = with_connection(config, |conn| { + conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![id]) + .context("Failed to delete cron job") + })?; + + if changed == 0 { + anyhow::bail!("Cron job '{id}' not found"); + } + + println!("✅ Removed cron job {id}"); + Ok(()) +} + +pub fn due_jobs(config: &Config, now: DateTime) -> Result> { + with_connection(config, |conn| { + let mut stmt = conn.prepare( + "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model, + enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output + FROM cron_jobs WHERE enabled = 1 AND next_run <= ?1 ORDER BY next_run ASC", + )?; + + let rows = stmt.query_map(params![now.to_rfc3339()], map_cron_job_row)?; + + let mut jobs = Vec::new(); + for row in rows { + jobs.push(row?); + } + Ok(jobs) + }) +} + +pub fn update_job(config: &Config, job_id: &str, patch: CronJobPatch) -> Result { + let mut job = get_job(config, job_id)?; + let mut schedule_changed = false; + + if let Some(schedule) = patch.schedule { + validate_schedule(&schedule, Utc::now())?; + job.schedule = schedule; + job.expression = schedule_cron_expression(&job.schedule).unwrap_or_default(); + schedule_changed = true; + } + if let Some(command) = patch.command { + job.command = command; + } + if let Some(prompt) = patch.prompt { + job.prompt = Some(prompt); + } + if let Some(name) = patch.name { + job.name = Some(name); + } + if let Some(enabled) = patch.enabled { + job.enabled = enabled; + } + if let Some(delivery) = patch.delivery { + job.delivery = delivery; + } + if let Some(model) = patch.model { + job.model = Some(model); + } + if let Some(target) = patch.session_target { + job.session_target = target; + } + if let Some(delete_after_run) = patch.delete_after_run { + job.delete_after_run = delete_after_run; + } + + if schedule_changed { + job.next_run = next_run_for_schedule(&job.schedule, Utc::now())?; + } + + with_connection(config, |conn| { + conn.execute( + "UPDATE cron_jobs + SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4, prompt = ?5, name = ?6, + session_target = ?7, model = ?8, enabled = ?9, delivery = ?10, delete_after_run = ?11, + next_run = ?12 + WHERE id = ?13", + params![ + job.expression, + job.command, + serde_json::to_string(&job.schedule)?, + job.job_type.as_str(), + job.prompt, + job.name, + job.session_target.as_str(), + job.model, + if job.enabled { 1 } else { 0 }, + serde_json::to_string(&job.delivery)?, + if job.delete_after_run { 1 } else { 0 }, + job.next_run.to_rfc3339(), + job.id, + ], + ) + .context("Failed to update cron job")?; + Ok(()) + })?; + + get_job(config, job_id) +} + +pub fn record_last_run( + config: &Config, + job_id: &str, + finished_at: DateTime, + success: bool, + output: &str, +) -> Result<()> { + let status = if success { "ok" } else { "error" }; + with_connection(config, |conn| { + conn.execute( + "UPDATE cron_jobs + SET last_run = ?1, last_status = ?2, last_output = ?3 + WHERE id = ?4", + params![finished_at.to_rfc3339(), status, output, job_id], + ) + .context("Failed to update cron last run fields")?; + Ok(()) + }) +} + +pub fn reschedule_after_run( + config: &Config, + job: &CronJob, + success: bool, + output: &str, +) -> Result<()> { + let now = Utc::now(); + let next_run = next_run_for_schedule(&job.schedule, now)?; + let status = if success { "ok" } else { "error" }; + + with_connection(config, |conn| { + conn.execute( + "UPDATE cron_jobs + SET next_run = ?1, last_run = ?2, last_status = ?3, last_output = ?4 + WHERE id = ?5", + params![ + next_run.to_rfc3339(), + now.to_rfc3339(), + status, + output, + job.id + ], + ) + .context("Failed to update cron job run state")?; + Ok(()) + }) +} + +pub fn record_run( + config: &Config, + job_id: &str, + started_at: DateTime, + finished_at: DateTime, + status: &str, + output: Option<&str>, + duration_ms: i64, +) -> Result<()> { + with_connection(config, |conn| { + conn.execute( + "INSERT INTO cron_runs (job_id, started_at, finished_at, status, output, duration_ms) + VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + params![ + job_id, + started_at.to_rfc3339(), + finished_at.to_rfc3339(), + status, + output, + duration_ms, + ], + ) + .context("Failed to insert cron run")?; + + let keep = i64::from(config.cron.max_run_history.max(1)); + conn.execute( + "DELETE FROM cron_runs + WHERE job_id = ?1 + AND id NOT IN ( + SELECT id FROM cron_runs + WHERE job_id = ?1 + ORDER BY started_at DESC, id DESC + LIMIT ?2 + )", + params![job_id, keep], + ) + .context("Failed to prune cron run history")?; + Ok(()) + }) +} + +pub fn list_runs(config: &Config, job_id: &str, limit: usize) -> Result> { + with_connection(config, |conn| { + let lim = i64::try_from(limit.max(1)).context("Run history limit overflow")?; + let mut stmt = conn.prepare( + "SELECT id, job_id, started_at, finished_at, status, output, duration_ms + FROM cron_runs + WHERE job_id = ?1 + ORDER BY started_at DESC, id DESC + LIMIT ?2", + )?; + + let rows = stmt.query_map(params![job_id, lim], |row| { + Ok(CronRun { + id: row.get(0)?, + job_id: row.get(1)?, + started_at: parse_rfc3339(&row.get::<_, String>(2)?) + .map_err(sql_conversion_error)?, + finished_at: parse_rfc3339(&row.get::<_, String>(3)?) + .map_err(sql_conversion_error)?, + status: row.get(4)?, + output: row.get(5)?, + duration_ms: row.get(6)?, + }) + })?; + + let mut runs = Vec::new(); + for row in rows { + runs.push(row?); + } + Ok(runs) + }) +} + +fn parse_rfc3339(raw: &str) -> Result> { + let parsed = DateTime::parse_from_rfc3339(raw) + .with_context(|| format!("Invalid RFC3339 timestamp in cron DB: {raw}"))?; + Ok(parsed.with_timezone(&Utc)) +} + +fn sql_conversion_error(err: anyhow::Error) -> rusqlite::Error { + rusqlite::Error::ToSqlConversionFailure(err.into()) +} + +fn map_cron_job_row(row: &rusqlite::Row<'_>) -> rusqlite::Result { + let expression: String = row.get(1)?; + let schedule_raw: Option = row.get(3)?; + let schedule = + decode_schedule(schedule_raw.as_deref(), &expression).map_err(sql_conversion_error)?; + + let delivery_raw: Option = row.get(10)?; + let delivery = decode_delivery(delivery_raw.as_deref()).map_err(sql_conversion_error)?; + + let next_run_raw: String = row.get(13)?; + let last_run_raw: Option = row.get(14)?; + let created_at_raw: String = row.get(12)?; + + Ok(CronJob { + id: row.get(0)?, + expression, + schedule, + command: row.get(2)?, + job_type: JobType::parse(&row.get::<_, String>(4)?), + prompt: row.get(5)?, + name: row.get(6)?, + session_target: SessionTarget::parse(&row.get::<_, String>(7)?), + model: row.get(8)?, + enabled: row.get::<_, i64>(9)? != 0, + delivery, + delete_after_run: row.get::<_, i64>(11)? != 0, + created_at: parse_rfc3339(&created_at_raw).map_err(sql_conversion_error)?, + next_run: parse_rfc3339(&next_run_raw).map_err(sql_conversion_error)?, + last_run: match last_run_raw { + Some(raw) => Some(parse_rfc3339(&raw).map_err(sql_conversion_error)?), + None => None, + }, + last_status: row.get(15)?, + last_output: row.get(16)?, + }) +} + +fn decode_schedule(schedule_raw: Option<&str>, expression: &str) -> Result { + if let Some(raw) = schedule_raw { + let trimmed = raw.trim(); + if !trimmed.is_empty() { + return serde_json::from_str(trimmed) + .with_context(|| format!("Failed to parse cron schedule JSON: {trimmed}")); + } + } + + if expression.trim().is_empty() { + anyhow::bail!("Missing schedule and legacy expression for cron job") + } + + Ok(Schedule::Cron { + expr: expression.to_string(), + tz: None, + }) +} + +fn decode_delivery(delivery_raw: Option<&str>) -> Result { + if let Some(raw) = delivery_raw { + let trimmed = raw.trim(); + if !trimmed.is_empty() { + return serde_json::from_str(trimmed) + .with_context(|| format!("Failed to parse cron delivery JSON: {trimmed}")); + } + } + Ok(DeliveryConfig::default()) +} + +fn add_column_if_missing(conn: &Connection, name: &str, sql_type: &str) -> Result<()> { + let mut stmt = conn.prepare("PRAGMA table_info(cron_jobs)")?; + let mut rows = stmt.query([])?; + while let Some(row) = rows.next()? { + let col_name: String = row.get(1)?; + if col_name == name { + return Ok(()); + } + } + + conn.execute( + &format!("ALTER TABLE cron_jobs ADD COLUMN {name} {sql_type}"), + [], + ) + .with_context(|| format!("Failed to add cron_jobs.{name}"))?; + Ok(()) +} + +fn with_connection(config: &Config, f: impl FnOnce(&Connection) -> Result) -> Result { + let db_path = config.workspace_dir.join("cron").join("jobs.db"); + if let Some(parent) = db_path.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("Failed to create cron directory: {}", parent.display()))?; + } + + let conn = Connection::open(&db_path) + .with_context(|| format!("Failed to open cron DB: {}", db_path.display()))?; + + conn.execute_batch( + "PRAGMA foreign_keys = ON; + CREATE TABLE IF NOT EXISTS cron_jobs ( + id TEXT PRIMARY KEY, + expression TEXT NOT NULL, + command TEXT NOT NULL, + schedule TEXT, + job_type TEXT NOT NULL DEFAULT 'shell', + prompt TEXT, + name TEXT, + session_target TEXT NOT NULL DEFAULT 'isolated', + model TEXT, + enabled INTEGER NOT NULL DEFAULT 1, + delivery TEXT, + delete_after_run INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL, + next_run TEXT NOT NULL, + last_run TEXT, + last_status TEXT, + last_output TEXT + ); + CREATE INDEX IF NOT EXISTS idx_cron_jobs_next_run ON cron_jobs(next_run); + + CREATE TABLE IF NOT EXISTS cron_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id TEXT NOT NULL, + started_at TEXT NOT NULL, + finished_at TEXT NOT NULL, + status TEXT NOT NULL, + output TEXT, + duration_ms INTEGER, + FOREIGN KEY (job_id) REFERENCES cron_jobs(id) ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS idx_cron_runs_job_id ON cron_runs(job_id); + CREATE INDEX IF NOT EXISTS idx_cron_runs_started_at ON cron_runs(started_at);", + ) + .context("Failed to initialize cron schema")?; + + add_column_if_missing(&conn, "schedule", "TEXT")?; + add_column_if_missing(&conn, "job_type", "TEXT NOT NULL DEFAULT 'shell'")?; + add_column_if_missing(&conn, "prompt", "TEXT")?; + add_column_if_missing(&conn, "name", "TEXT")?; + add_column_if_missing(&conn, "session_target", "TEXT NOT NULL DEFAULT 'isolated'")?; + add_column_if_missing(&conn, "model", "TEXT")?; + add_column_if_missing(&conn, "enabled", "INTEGER NOT NULL DEFAULT 1")?; + add_column_if_missing(&conn, "delivery", "TEXT")?; + add_column_if_missing(&conn, "delete_after_run", "INTEGER NOT NULL DEFAULT 0")?; + + f(&conn) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::Config; + use chrono::Duration as ChronoDuration; + use tempfile::TempDir; + + fn test_config(tmp: &TempDir) -> Config { + let config = Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..Config::default() + }; + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + config + } + + #[test] + fn add_job_accepts_five_field_expression() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let job = add_job(&config, "*/5 * * * *", "echo ok").unwrap(); + assert_eq!(job.expression, "*/5 * * * *"); + assert_eq!(job.command, "echo ok"); + assert!(matches!(job.schedule, Schedule::Cron { .. })); + } + + #[test] + fn add_list_remove_roundtrip() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let job = add_job(&config, "*/10 * * * *", "echo roundtrip").unwrap(); + let listed = list_jobs(&config).unwrap(); + assert_eq!(listed.len(), 1); + assert_eq!(listed[0].id, job.id); + + remove_job(&config, &job.id).unwrap(); + assert!(list_jobs(&config).unwrap().is_empty()); + } + + #[test] + fn due_jobs_filters_by_timestamp_and_enabled() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let job = add_job(&config, "* * * * *", "echo due").unwrap(); + + let due_now = due_jobs(&config, Utc::now()).unwrap(); + assert!(due_now.is_empty(), "new job should not be due immediately"); + + let far_future = Utc::now() + ChronoDuration::days(365); + let due_future = due_jobs(&config, far_future).unwrap(); + assert_eq!(due_future.len(), 1, "job should be due in far future"); + + let _ = update_job( + &config, + &job.id, + CronJobPatch { + enabled: Some(false), + ..CronJobPatch::default() + }, + ) + .unwrap(); + let due_after_disable = due_jobs(&config, far_future).unwrap(); + assert!(due_after_disable.is_empty()); + } + + #[test] + fn reschedule_after_run_persists_last_status_and_last_run() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let job = add_job(&config, "*/15 * * * *", "echo run").unwrap(); + reschedule_after_run(&config, &job, false, "failed output").unwrap(); + + let listed = list_jobs(&config).unwrap(); + let stored = listed.iter().find(|j| j.id == job.id).unwrap(); + assert_eq!(stored.last_status.as_deref(), Some("error")); + assert!(stored.last_run.is_some()); + assert_eq!(stored.last_output.as_deref(), Some("failed output")); + } + + #[test] + fn migration_falls_back_to_legacy_expression() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + with_connection(&config, |conn| { + conn.execute( + "INSERT INTO cron_jobs (id, expression, command, created_at, next_run) + VALUES (?1, ?2, ?3, ?4, ?5)", + params![ + "legacy-id", + "*/5 * * * *", + "echo legacy", + Utc::now().to_rfc3339(), + (Utc::now() + ChronoDuration::minutes(5)).to_rfc3339(), + ], + )?; + conn.execute( + "UPDATE cron_jobs SET schedule = NULL WHERE id = 'legacy-id'", + [], + )?; + Ok(()) + }) + .unwrap(); + + let job = get_job(&config, "legacy-id").unwrap(); + assert!(matches!(job.schedule, Schedule::Cron { .. })); + } + + #[test] + fn record_and_prune_runs() { + let tmp = TempDir::new().unwrap(); + let mut config = test_config(&tmp); + config.cron.max_run_history = 2; + let job = add_job(&config, "*/5 * * * *", "echo ok").unwrap(); + let base = Utc::now(); + + for idx in 0..3 { + let start = base + ChronoDuration::seconds(idx); + let end = start + ChronoDuration::milliseconds(100); + record_run(&config, &job.id, start, end, "ok", Some("done"), 100).unwrap(); + } + + let runs = list_runs(&config, &job.id, 10).unwrap(); + assert_eq!(runs.len(), 2); + } + + #[test] + fn remove_job_cascades_run_history() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + let job = add_job(&config, "*/5 * * * *", "echo ok").unwrap(); + let start = Utc::now(); + record_run( + &config, + &job.id, + start, + start + ChronoDuration::milliseconds(5), + "ok", + Some("ok"), + 5, + ) + .unwrap(); + + remove_job(&config, &job.id).unwrap(); + let runs = list_runs(&config, &job.id, 10).unwrap(); + assert!(runs.is_empty()); + } +} diff --git a/src/cron/types.rs b/src/cron/types.rs new file mode 100644 index 0000000..f6d3c66 --- /dev/null +++ b/src/cron/types.rs @@ -0,0 +1,140 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +#[serde(rename_all = "lowercase")] +pub enum JobType { + #[default] + Shell, + Agent, +} + +impl JobType { + pub(crate) fn as_str(&self) -> &'static str { + match self { + Self::Shell => "shell", + Self::Agent => "agent", + } + } + + pub(crate) fn parse(raw: &str) -> Self { + if raw.eq_ignore_ascii_case("agent") { + Self::Agent + } else { + Self::Shell + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +#[serde(rename_all = "lowercase")] +pub enum SessionTarget { + #[default] + Isolated, + Main, +} + +impl SessionTarget { + pub(crate) fn as_str(&self) -> &'static str { + match self { + Self::Isolated => "isolated", + Self::Main => "main", + } + } + + pub(crate) fn parse(raw: &str) -> Self { + if raw.eq_ignore_ascii_case("main") { + Self::Main + } else { + Self::Isolated + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "kind", rename_all = "lowercase")] +pub enum Schedule { + Cron { + expr: String, + #[serde(default)] + tz: Option, + }, + At { + at: DateTime, + }, + Every { + every_ms: u64, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct DeliveryConfig { + #[serde(default)] + pub mode: String, + #[serde(default)] + pub channel: Option, + #[serde(default)] + pub to: Option, + #[serde(default = "default_true")] + pub best_effort: bool, +} + +impl Default for DeliveryConfig { + fn default() -> Self { + Self { + mode: "none".to_string(), + channel: None, + to: None, + best_effort: true, + } + } +} + +fn default_true() -> bool { + true +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CronJob { + pub id: String, + pub expression: String, + pub schedule: Schedule, + pub command: String, + pub prompt: Option, + pub name: Option, + pub job_type: JobType, + pub session_target: SessionTarget, + pub model: Option, + pub enabled: bool, + pub delivery: DeliveryConfig, + pub delete_after_run: bool, + pub created_at: DateTime, + pub next_run: DateTime, + pub last_run: Option>, + pub last_status: Option, + pub last_output: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CronRun { + pub id: i64, + pub job_id: String, + pub started_at: DateTime, + pub finished_at: DateTime, + pub status: String, + pub output: Option, + pub duration_ms: Option, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct CronJobPatch { + pub schedule: Option, + pub command: Option, + pub prompt: Option, + pub name: Option, + pub enabled: Option, + pub delivery: Option, + pub model: Option, + pub session_target: Option, + pub delete_after_run: Option, +} diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index c7935ca..c2f4487 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -71,7 +71,7 @@ pub async fn run(config: Config, host: String, port: u16) -> Result<()> { )); } - { + if config.cron.enabled { let scheduler_cfg = config.clone(); handles.push(spawn_component_supervisor( "scheduler", @@ -82,6 +82,9 @@ pub async fn run(config: Config, host: String, port: u16) -> Result<()> { async move { crate::cron::scheduler::run(cfg).await } }, )); + } else { + crate::health::mark_component_ok("scheduler"); + tracing::info!("Cron disabled; scheduler supervisor not started"); } println!("🧠 ZeroClaw daemon started"); diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 2198cce..60b78a7 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -10,8 +10,12 @@ use crate::channels::{Channel, WhatsAppChannel}; use crate::config::Config; use crate::memory::{self, Memory, MemoryCategory}; +use crate::observability::{self, Observer}; use crate::providers::{self, Provider}; +use crate::runtime; use crate::security::pairing::{constant_time_eq, is_public_bind, PairingGuard}; +use crate::security::SecurityPolicy; +use crate::tools::{self, Tool}; use crate::util::truncate_with_ellipsis; use anyhow::Result; use axum::{ @@ -218,6 +222,56 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { &config.workspace_dir, config.api_key.as_deref(), )?); + let observer: Arc = + Arc::from(observability::create_observer(&config.observability)); + let runtime: Arc = + Arc::from(runtime::create_runtime(&config.runtime)?); + let security = Arc::new(SecurityPolicy::from_config( + &config.autonomy, + &config.workspace_dir, + )); + + let (composio_key, composio_entity_id) = if config.composio.enabled { + ( + config.composio.api_key.as_deref(), + Some(config.composio.entity_id.as_str()), + ) + } else { + (None, None) + }; + + let tools_registry = Arc::new(tools::all_tools_with_runtime( + Arc::new(config.clone()), + &security, + runtime, + Arc::clone(&mem), + composio_key, + composio_entity_id, + &config.browser, + &config.http_request, + &config.workspace_dir, + &config.agents, + config.api_key.as_deref(), + &config, + )); + let skills = crate::skills::load_skills(&config.workspace_dir); + let tool_descs: Vec<(&str, &str)> = tools_registry + .iter() + .map(|tool| (tool.name(), tool.description())) + .collect(); + + let mut system_prompt = crate::channels::build_system_prompt( + &config.workspace_dir, + &model, + &tool_descs, + &skills, + Some(&config.identity), + None, // bootstrap_max_chars — no compact context for gateway + ); + system_prompt.push_str(&crate::agent::loop_::build_tool_instructions( + tools_registry.as_ref(), + )); + let system_prompt = Arc::new(system_prompt); // Extract webhook secret for authentication let webhook_secret: Option> = config diff --git a/src/lib.rs b/src/lib.rs index cfde7a6..7f4ebb4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -147,6 +147,23 @@ pub enum CronCommands { Add { /// Cron expression expression: String, + /// Optional IANA timezone (e.g. America/Los_Angeles) + #[arg(long)] + tz: Option, + /// Command to run + command: String, + }, + /// Add a one-shot scheduled task at an RFC3339 timestamp + AddAt { + /// One-shot timestamp in RFC3339 format + at: String, + /// Command to run + command: String, + }, + /// Add a fixed-interval scheduled task + AddEvery { + /// Interval in milliseconds + every_ms: u64, /// Command to run command: String, }, diff --git a/src/main.rs b/src/main.rs index 4e808fd..dbc76ff 100644 --- a/src/main.rs +++ b/src/main.rs @@ -136,9 +136,9 @@ enum Commands { #[arg(long)] model: Option, - /// Temperature (0.0 - 2.0); defaults to config default_temperature - #[arg(short, long)] - temperature: Option, + /// Temperature (0.0 - 2.0) + #[arg(short, long, default_value = "0.7")] + temperature: f64, /// Attach a peripheral (board:path, e.g. nucleo-f401re:/dev/ttyACM0) #[arg(long)] @@ -250,6 +250,23 @@ enum CronCommands { Add { /// Cron expression expression: String, + /// Optional IANA timezone (e.g. America/Los_Angeles) + #[arg(long)] + tz: Option, + /// Command to run + command: String, + }, + /// Add a one-shot scheduled task at an RFC3339 timestamp + AddAt { + /// One-shot timestamp in RFC3339 format + at: String, + /// Command to run + command: String, + }, + /// Add a fixed-interval scheduled task + AddEvery { + /// Interval in milliseconds + every_ms: u64, /// Command to run command: String, }, @@ -412,10 +429,9 @@ async fn main() -> Result<()> { model, temperature, peripheral, - } => { - let temp = temperature.unwrap_or(config.default_temperature); - agent::run(config, message, provider, model, temp, peripheral).await - } + } => agent::run(config, message, provider, model, temperature, peripheral) + .await + .map(|_| ()), Commands::Gateway { port, host } => { if port == 0 { diff --git a/src/onboard/wizard.rs b/src/onboard/wizard.rs index 94305b6..20c3baa 100644 --- a/src/onboard/wizard.rs +++ b/src/onboard/wizard.rs @@ -117,6 +117,7 @@ pub fn run_wizard() -> Result { agent: crate::config::schema::AgentConfig::default(), model_routes: Vec::new(), heartbeat: HeartbeatConfig::default(), + cron: crate::config::CronConfig::default(), channels_config, memory: memory_config, // User-selected memory backend tunnel: tunnel_config, @@ -329,6 +330,7 @@ pub fn run_quick_setup( agent: crate::config::schema::AgentConfig::default(), model_routes: Vec::new(), heartbeat: HeartbeatConfig::default(), + cron: crate::config::CronConfig::default(), channels_config: ChannelsConfig::default(), memory: memory_config, tunnel: crate::config::TunnelConfig::default(), diff --git a/src/tools/cron_add.rs b/src/tools/cron_add.rs new file mode 100644 index 0000000..bd3abea --- /dev/null +++ b/src/tools/cron_add.rs @@ -0,0 +1,326 @@ +use super::traits::{Tool, ToolResult}; +use crate::config::Config; +use crate::cron::{self, DeliveryConfig, JobType, Schedule, SessionTarget}; +use crate::security::SecurityPolicy; +use async_trait::async_trait; +use serde_json::json; +use std::sync::Arc; + +pub struct CronAddTool { + config: Arc, + security: Arc, +} + +impl CronAddTool { + pub fn new(config: Arc, security: Arc) -> Self { + Self { config, security } + } +} + +#[async_trait] +impl Tool for CronAddTool { + fn name(&self) -> &str { + "cron_add" + } + + fn description(&self) -> &str { + "Create a scheduled cron job (shell or agent) with cron/at/every schedules" + } + + fn parameters_schema(&self) -> serde_json::Value { + json!({ + "type": "object", + "properties": { + "name": { "type": "string" }, + "schedule": { + "type": "object", + "description": "Schedule object: {kind:'cron',expr,tz?} | {kind:'at',at} | {kind:'every',every_ms}" + }, + "job_type": { "type": "string", "enum": ["shell", "agent"] }, + "command": { "type": "string" }, + "prompt": { "type": "string" }, + "session_target": { "type": "string", "enum": ["isolated", "main"] }, + "model": { "type": "string" }, + "delivery": { "type": "object" }, + "delete_after_run": { "type": "boolean" } + }, + "required": ["schedule"] + }) + } + + async fn execute(&self, args: serde_json::Value) -> anyhow::Result { + if !self.config.cron.enabled { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("cron is disabled by config (cron.enabled=false)".to_string()), + }); + } + + let schedule = match args.get("schedule") { + Some(v) => match serde_json::from_value::(v.clone()) { + Ok(schedule) => schedule, + Err(e) => { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some(format!("Invalid schedule: {e}")), + }); + } + }, + None => { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("Missing 'schedule' parameter".to_string()), + }); + } + }; + + let name = args + .get("name") + .and_then(serde_json::Value::as_str) + .map(str::to_string); + + let job_type = match args.get("job_type").and_then(serde_json::Value::as_str) { + Some("agent") => JobType::Agent, + Some("shell") => JobType::Shell, + Some(other) => { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some(format!("Invalid job_type: {other}")), + }); + } + None => { + if args.get("prompt").is_some() { + JobType::Agent + } else { + JobType::Shell + } + } + }; + + let default_delete_after_run = matches!(schedule, Schedule::At { .. }); + let delete_after_run = args + .get("delete_after_run") + .and_then(serde_json::Value::as_bool) + .unwrap_or(default_delete_after_run); + + let result = match job_type { + JobType::Shell => { + let command = match args.get("command").and_then(serde_json::Value::as_str) { + Some(command) if !command.trim().is_empty() => command, + _ => { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("Missing 'command' for shell job".to_string()), + }); + } + }; + + if !self.security.is_command_allowed(command) { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some(format!("Command blocked by security policy: {command}")), + }); + } + + cron::add_shell_job(&self.config, name, schedule, command) + } + JobType::Agent => { + let prompt = match args.get("prompt").and_then(serde_json::Value::as_str) { + Some(prompt) if !prompt.trim().is_empty() => prompt, + _ => { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("Missing 'prompt' for agent job".to_string()), + }); + } + }; + + let session_target = match args.get("session_target") { + Some(v) => match serde_json::from_value::(v.clone()) { + Ok(target) => target, + Err(e) => { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some(format!("Invalid session_target: {e}")), + }); + } + }, + None => SessionTarget::Isolated, + }; + + let model = args + .get("model") + .and_then(serde_json::Value::as_str) + .map(str::to_string); + + let delivery = match args.get("delivery") { + Some(v) => match serde_json::from_value::(v.clone()) { + Ok(cfg) => Some(cfg), + Err(e) => { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some(format!("Invalid delivery config: {e}")), + }); + } + }, + None => None, + }; + + cron::add_agent_job( + &self.config, + name, + schedule, + prompt, + session_target, + model, + delivery, + delete_after_run, + ) + } + }; + + match result { + Ok(job) => Ok(ToolResult { + success: true, + output: serde_json::to_string_pretty(&json!({ + "id": job.id, + "name": job.name, + "job_type": job.job_type, + "schedule": job.schedule, + "next_run": job.next_run, + "enabled": job.enabled + }))?, + error: None, + }), + Err(e) => Ok(ToolResult { + success: false, + output: String::new(), + error: Some(e.to_string()), + }), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::Config; + use crate::security::AutonomyLevel; + use tempfile::TempDir; + + fn test_config(tmp: &TempDir) -> Arc { + let config = Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..Config::default() + }; + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + Arc::new(config) + } + + fn test_security(cfg: &Config) -> Arc { + Arc::new(SecurityPolicy::from_config( + &cfg.autonomy, + &cfg.workspace_dir, + )) + } + + #[tokio::test] + async fn adds_shell_job() { + let tmp = TempDir::new().unwrap(); + let cfg = test_config(&tmp); + let tool = CronAddTool::new(cfg.clone(), test_security(&cfg)); + let result = tool + .execute(json!({ + "schedule": { "kind": "cron", "expr": "*/5 * * * *" }, + "job_type": "shell", + "command": "echo ok" + })) + .await + .unwrap(); + + assert!(result.success, "{:?}", result.error); + assert!(result.output.contains("next_run")); + } + + #[tokio::test] + async fn blocks_disallowed_shell_command() { + let tmp = TempDir::new().unwrap(); + let mut config = Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..Config::default() + }; + config.autonomy.allowed_commands = vec!["echo".into()]; + config.autonomy.level = AutonomyLevel::Supervised; + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + let cfg = Arc::new(config); + let tool = CronAddTool::new(cfg.clone(), test_security(&cfg)); + + let result = tool + .execute(json!({ + "schedule": { "kind": "cron", "expr": "*/5 * * * *" }, + "job_type": "shell", + "command": "curl https://example.com" + })) + .await + .unwrap(); + + assert!(!result.success); + assert!(result + .error + .unwrap_or_default() + .contains("blocked by security policy")); + } + + #[tokio::test] + async fn rejects_invalid_schedule() { + let tmp = TempDir::new().unwrap(); + let cfg = test_config(&tmp); + let tool = CronAddTool::new(cfg.clone(), test_security(&cfg)); + + let result = tool + .execute(json!({ + "schedule": { "kind": "every", "every_ms": 0 }, + "job_type": "shell", + "command": "echo nope" + })) + .await + .unwrap(); + + assert!(!result.success); + assert!(result + .error + .unwrap_or_default() + .contains("every_ms must be > 0")); + } + + #[tokio::test] + async fn agent_job_requires_prompt() { + let tmp = TempDir::new().unwrap(); + let cfg = test_config(&tmp); + let tool = CronAddTool::new(cfg.clone(), test_security(&cfg)); + + let result = tool + .execute(json!({ + "schedule": { "kind": "cron", "expr": "*/5 * * * *" }, + "job_type": "agent" + })) + .await + .unwrap(); + assert!(!result.success); + assert!(result + .error + .unwrap_or_default() + .contains("Missing 'prompt'")); + } +} diff --git a/src/tools/cron_list.rs b/src/tools/cron_list.rs new file mode 100644 index 0000000..0392370 --- /dev/null +++ b/src/tools/cron_list.rs @@ -0,0 +1,101 @@ +use super::traits::{Tool, ToolResult}; +use crate::config::Config; +use crate::cron; +use async_trait::async_trait; +use serde_json::json; +use std::sync::Arc; + +pub struct CronListTool { + config: Arc, +} + +impl CronListTool { + pub fn new(config: Arc) -> Self { + Self { config } + } +} + +#[async_trait] +impl Tool for CronListTool { + fn name(&self) -> &str { + "cron_list" + } + + fn description(&self) -> &str { + "List all scheduled cron jobs" + } + + fn parameters_schema(&self) -> serde_json::Value { + json!({ + "type": "object", + "properties": {}, + "additionalProperties": false + }) + } + + async fn execute(&self, _args: serde_json::Value) -> anyhow::Result { + if !self.config.cron.enabled { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("cron is disabled by config (cron.enabled=false)".to_string()), + }); + } + + match cron::list_jobs(&self.config) { + Ok(jobs) => Ok(ToolResult { + success: true, + output: serde_json::to_string_pretty(&jobs)?, + error: None, + }), + Err(e) => Ok(ToolResult { + success: false, + output: String::new(), + error: Some(e.to_string()), + }), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::Config; + use tempfile::TempDir; + + fn test_config(tmp: &TempDir) -> Arc { + let config = Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..Config::default() + }; + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + Arc::new(config) + } + + #[tokio::test] + async fn returns_empty_list_when_no_jobs() { + let tmp = TempDir::new().unwrap(); + let cfg = test_config(&tmp); + let tool = CronListTool::new(cfg); + + let result = tool.execute(json!({})).await.unwrap(); + assert!(result.success); + assert_eq!(result.output.trim(), "[]"); + } + + #[tokio::test] + async fn errors_when_cron_disabled() { + let tmp = TempDir::new().unwrap(); + let mut cfg = (*test_config(&tmp)).clone(); + cfg.cron.enabled = false; + let tool = CronListTool::new(Arc::new(cfg)); + + let result = tool.execute(json!({})).await.unwrap(); + assert!(!result.success); + assert!(result + .error + .unwrap_or_default() + .contains("cron is disabled")); + } +} diff --git a/src/tools/cron_remove.rs b/src/tools/cron_remove.rs new file mode 100644 index 0000000..01a70dc --- /dev/null +++ b/src/tools/cron_remove.rs @@ -0,0 +1,114 @@ +use super::traits::{Tool, ToolResult}; +use crate::config::Config; +use crate::cron; +use async_trait::async_trait; +use serde_json::json; +use std::sync::Arc; + +pub struct CronRemoveTool { + config: Arc, +} + +impl CronRemoveTool { + pub fn new(config: Arc) -> Self { + Self { config } + } +} + +#[async_trait] +impl Tool for CronRemoveTool { + fn name(&self) -> &str { + "cron_remove" + } + + fn description(&self) -> &str { + "Remove a cron job by id" + } + + fn parameters_schema(&self) -> serde_json::Value { + json!({ + "type": "object", + "properties": { + "job_id": { "type": "string" } + }, + "required": ["job_id"] + }) + } + + async fn execute(&self, args: serde_json::Value) -> anyhow::Result { + if !self.config.cron.enabled { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("cron is disabled by config (cron.enabled=false)".to_string()), + }); + } + + let job_id = match args.get("job_id").and_then(serde_json::Value::as_str) { + Some(v) if !v.trim().is_empty() => v, + _ => { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("Missing 'job_id' parameter".to_string()), + }); + } + }; + + match cron::remove_job(&self.config, job_id) { + Ok(()) => Ok(ToolResult { + success: true, + output: format!("Removed cron job {job_id}"), + error: None, + }), + Err(e) => Ok(ToolResult { + success: false, + output: String::new(), + error: Some(e.to_string()), + }), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::Config; + use tempfile::TempDir; + + fn test_config(tmp: &TempDir) -> Arc { + let config = Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..Config::default() + }; + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + Arc::new(config) + } + + #[tokio::test] + async fn removes_existing_job() { + let tmp = TempDir::new().unwrap(); + let cfg = test_config(&tmp); + let job = cron::add_job(&cfg, "*/5 * * * *", "echo ok").unwrap(); + let tool = CronRemoveTool::new(cfg.clone()); + + let result = tool.execute(json!({"job_id": job.id})).await.unwrap(); + assert!(result.success); + assert!(cron::list_jobs(&cfg).unwrap().is_empty()); + } + + #[tokio::test] + async fn errors_when_job_id_missing() { + let tmp = TempDir::new().unwrap(); + let cfg = test_config(&tmp); + let tool = CronRemoveTool::new(cfg); + + let result = tool.execute(json!({})).await.unwrap(); + assert!(!result.success); + assert!(result + .error + .unwrap_or_default() + .contains("Missing 'job_id'")); + } +} diff --git a/src/tools/cron_run.rs b/src/tools/cron_run.rs new file mode 100644 index 0000000..a4e5f75 --- /dev/null +++ b/src/tools/cron_run.rs @@ -0,0 +1,147 @@ +use super::traits::{Tool, ToolResult}; +use crate::config::Config; +use crate::cron; +use async_trait::async_trait; +use chrono::Utc; +use serde_json::json; +use std::sync::Arc; + +pub struct CronRunTool { + config: Arc, +} + +impl CronRunTool { + pub fn new(config: Arc) -> Self { + Self { config } + } +} + +#[async_trait] +impl Tool for CronRunTool { + fn name(&self) -> &str { + "cron_run" + } + + fn description(&self) -> &str { + "Force-run a cron job immediately and record run history" + } + + fn parameters_schema(&self) -> serde_json::Value { + json!({ + "type": "object", + "properties": { + "job_id": { "type": "string" } + }, + "required": ["job_id"] + }) + } + + async fn execute(&self, args: serde_json::Value) -> anyhow::Result { + if !self.config.cron.enabled { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("cron is disabled by config (cron.enabled=false)".to_string()), + }); + } + + let job_id = match args.get("job_id").and_then(serde_json::Value::as_str) { + Some(v) if !v.trim().is_empty() => v, + _ => { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("Missing 'job_id' parameter".to_string()), + }); + } + }; + + let job = match cron::get_job(&self.config, job_id) { + Ok(job) => job, + Err(e) => { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some(e.to_string()), + }); + } + }; + + let started_at = Utc::now(); + let (success, output) = cron::scheduler::execute_job_now(&self.config, &job).await; + let finished_at = Utc::now(); + let duration_ms = (finished_at - started_at).num_milliseconds(); + let status = if success { "ok" } else { "error" }; + + let _ = cron::record_run( + &self.config, + &job.id, + started_at, + finished_at, + status, + Some(&output), + duration_ms, + ); + let _ = cron::record_last_run(&self.config, &job.id, finished_at, success, &output); + + Ok(ToolResult { + success, + output: serde_json::to_string_pretty(&json!({ + "job_id": job.id, + "status": status, + "duration_ms": duration_ms, + "output": output + }))?, + error: if success { + None + } else { + Some("cron job execution failed".to_string()) + }, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::Config; + use tempfile::TempDir; + + fn test_config(tmp: &TempDir) -> Arc { + let config = Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..Config::default() + }; + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + Arc::new(config) + } + + #[tokio::test] + async fn force_runs_job_and_records_history() { + let tmp = TempDir::new().unwrap(); + let cfg = test_config(&tmp); + let job = cron::add_job(&cfg, "*/5 * * * *", "echo run-now").unwrap(); + let tool = CronRunTool::new(cfg.clone()); + + let result = tool.execute(json!({ "job_id": job.id })).await.unwrap(); + assert!(result.success, "{:?}", result.error); + + let runs = cron::list_runs(&cfg, &job.id, 10).unwrap(); + assert_eq!(runs.len(), 1); + } + + #[tokio::test] + async fn errors_for_missing_job() { + let tmp = TempDir::new().unwrap(); + let cfg = test_config(&tmp); + let tool = CronRunTool::new(cfg); + + let result = tool + .execute(json!({ "job_id": "missing-job-id" })) + .await + .unwrap(); + assert!(!result.success); + assert!(result.error.unwrap_or_default().contains("not found")); + } +} diff --git a/src/tools/cron_runs.rs b/src/tools/cron_runs.rs new file mode 100644 index 0000000..280baa1 --- /dev/null +++ b/src/tools/cron_runs.rs @@ -0,0 +1,175 @@ +use super::traits::{Tool, ToolResult}; +use crate::config::Config; +use crate::cron; +use async_trait::async_trait; +use serde::Serialize; +use serde_json::json; +use std::sync::Arc; + +const MAX_RUN_OUTPUT_CHARS: usize = 500; + +pub struct CronRunsTool { + config: Arc, +} + +impl CronRunsTool { + pub fn new(config: Arc) -> Self { + Self { config } + } +} + +#[derive(Serialize)] +struct RunView { + id: i64, + job_id: String, + started_at: chrono::DateTime, + finished_at: chrono::DateTime, + status: String, + output: Option, + duration_ms: Option, +} + +#[async_trait] +impl Tool for CronRunsTool { + fn name(&self) -> &str { + "cron_runs" + } + + fn description(&self) -> &str { + "List recent run history for a cron job" + } + + fn parameters_schema(&self) -> serde_json::Value { + json!({ + "type": "object", + "properties": { + "job_id": { "type": "string" }, + "limit": { "type": "integer" } + }, + "required": ["job_id"] + }) + } + + async fn execute(&self, args: serde_json::Value) -> anyhow::Result { + if !self.config.cron.enabled { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("cron is disabled by config (cron.enabled=false)".to_string()), + }); + } + + let job_id = match args.get("job_id").and_then(serde_json::Value::as_str) { + Some(v) if !v.trim().is_empty() => v, + _ => { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("Missing 'job_id' parameter".to_string()), + }); + } + }; + + let limit = args + .get("limit") + .and_then(serde_json::Value::as_u64) + .map_or(10, |v| usize::try_from(v).unwrap_or(10)); + + match cron::list_runs(&self.config, job_id, limit) { + Ok(runs) => { + let runs: Vec = runs + .into_iter() + .map(|run| RunView { + id: run.id, + job_id: run.job_id, + started_at: run.started_at, + finished_at: run.finished_at, + status: run.status, + output: run.output.map(|out| truncate(&out, MAX_RUN_OUTPUT_CHARS)), + duration_ms: run.duration_ms, + }) + .collect(); + + Ok(ToolResult { + success: true, + output: serde_json::to_string_pretty(&runs)?, + error: None, + }) + } + Err(e) => Ok(ToolResult { + success: false, + output: String::new(), + error: Some(e.to_string()), + }), + } + } +} + +fn truncate(input: &str, max_chars: usize) -> String { + if input.chars().count() <= max_chars { + return input.to_string(); + } + let mut out: String = input.chars().take(max_chars).collect(); + out.push_str("..."); + out +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::Config; + use chrono::{Duration as ChronoDuration, Utc}; + use tempfile::TempDir; + + fn test_config(tmp: &TempDir) -> Arc { + let config = Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..Config::default() + }; + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + Arc::new(config) + } + + #[tokio::test] + async fn lists_runs_with_truncation() { + let tmp = TempDir::new().unwrap(); + let cfg = test_config(&tmp); + let job = cron::add_job(&cfg, "*/5 * * * *", "echo ok").unwrap(); + + let long_output = "x".repeat(1000); + let now = Utc::now(); + cron::record_run( + &cfg, + &job.id, + now, + now + ChronoDuration::milliseconds(1), + "ok", + Some(&long_output), + 1, + ) + .unwrap(); + + let tool = CronRunsTool::new(cfg.clone()); + let result = tool + .execute(json!({ "job_id": job.id, "limit": 5 })) + .await + .unwrap(); + + assert!(result.success); + assert!(result.output.contains("...")); + } + + #[tokio::test] + async fn errors_when_job_id_missing() { + let tmp = TempDir::new().unwrap(); + let cfg = test_config(&tmp); + let tool = CronRunsTool::new(cfg); + let result = tool.execute(json!({})).await.unwrap(); + assert!(!result.success); + assert!(result + .error + .unwrap_or_default() + .contains("Missing 'job_id'")); + } +} diff --git a/src/tools/cron_update.rs b/src/tools/cron_update.rs new file mode 100644 index 0000000..c224b17 --- /dev/null +++ b/src/tools/cron_update.rs @@ -0,0 +1,177 @@ +use super::traits::{Tool, ToolResult}; +use crate::config::Config; +use crate::cron::{self, CronJobPatch}; +use crate::security::SecurityPolicy; +use async_trait::async_trait; +use serde_json::json; +use std::sync::Arc; + +pub struct CronUpdateTool { + config: Arc, + security: Arc, +} + +impl CronUpdateTool { + pub fn new(config: Arc, security: Arc) -> Self { + Self { config, security } + } +} + +#[async_trait] +impl Tool for CronUpdateTool { + fn name(&self) -> &str { + "cron_update" + } + + fn description(&self) -> &str { + "Patch an existing cron job (schedule, command, prompt, enabled, delivery, model, etc.)" + } + + fn parameters_schema(&self) -> serde_json::Value { + json!({ + "type": "object", + "properties": { + "job_id": { "type": "string" }, + "patch": { "type": "object" } + }, + "required": ["job_id", "patch"] + }) + } + + async fn execute(&self, args: serde_json::Value) -> anyhow::Result { + if !self.config.cron.enabled { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("cron is disabled by config (cron.enabled=false)".to_string()), + }); + } + + let job_id = match args.get("job_id").and_then(serde_json::Value::as_str) { + Some(v) if !v.trim().is_empty() => v, + _ => { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("Missing 'job_id' parameter".to_string()), + }); + } + }; + + let patch_val = match args.get("patch") { + Some(v) => v.clone(), + None => { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("Missing 'patch' parameter".to_string()), + }); + } + }; + + let patch = match serde_json::from_value::(patch_val) { + Ok(patch) => patch, + Err(e) => { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some(format!("Invalid patch payload: {e}")), + }); + } + }; + + if let Some(command) = &patch.command { + if !self.security.is_command_allowed(command) { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some(format!("Command blocked by security policy: {command}")), + }); + } + } + + match cron::update_job(&self.config, job_id, patch) { + Ok(job) => Ok(ToolResult { + success: true, + output: serde_json::to_string_pretty(&job)?, + error: None, + }), + Err(e) => Ok(ToolResult { + success: false, + output: String::new(), + error: Some(e.to_string()), + }), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::Config; + use tempfile::TempDir; + + fn test_config(tmp: &TempDir) -> Arc { + let config = Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..Config::default() + }; + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + Arc::new(config) + } + + fn test_security(cfg: &Config) -> Arc { + Arc::new(SecurityPolicy::from_config( + &cfg.autonomy, + &cfg.workspace_dir, + )) + } + + #[tokio::test] + async fn updates_enabled_flag() { + let tmp = TempDir::new().unwrap(); + let cfg = test_config(&tmp); + let job = cron::add_job(&cfg, "*/5 * * * *", "echo ok").unwrap(); + let tool = CronUpdateTool::new(cfg.clone(), test_security(&cfg)); + + let result = tool + .execute(json!({ + "job_id": job.id, + "patch": { "enabled": false } + })) + .await + .unwrap(); + + assert!(result.success, "{:?}", result.error); + assert!(result.output.contains("\"enabled\": false")); + } + + #[tokio::test] + async fn blocks_disallowed_command_updates() { + let tmp = TempDir::new().unwrap(); + let mut config = Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..Config::default() + }; + config.autonomy.allowed_commands = vec!["echo".into()]; + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + let cfg = Arc::new(config); + let job = cron::add_job(&cfg, "*/5 * * * *", "echo ok").unwrap(); + let tool = CronUpdateTool::new(cfg.clone(), test_security(&cfg)); + + let result = tool + .execute(json!({ + "job_id": job.id, + "patch": { "command": "curl https://example.com" } + })) + .await + .unwrap(); + assert!(!result.success); + assert!(result + .error + .unwrap_or_default() + .contains("blocked by security policy")); + } +} diff --git a/src/tools/mod.rs b/src/tools/mod.rs index fcf8fa5..07f29d8 100644 --- a/src/tools/mod.rs +++ b/src/tools/mod.rs @@ -1,6 +1,12 @@ pub mod browser; pub mod browser_open; pub mod composio; +pub mod cron_add; +pub mod cron_list; +pub mod cron_remove; +pub mod cron_run; +pub mod cron_runs; +pub mod cron_update; pub mod delegate; pub mod file_read; pub mod file_write; @@ -21,6 +27,12 @@ pub mod traits; pub use browser::{BrowserTool, ComputerUseConfig}; pub use browser_open::BrowserOpenTool; pub use composio::ComposioTool; +pub use cron_add::CronAddTool; +pub use cron_list::CronListTool; +pub use cron_remove::CronRemoveTool; +pub use cron_run::CronRunTool; +pub use cron_runs::CronRunsTool; +pub use cron_update::CronUpdateTool; pub use delegate::DelegateTool; pub use file_read::FileReadTool; pub use file_write::FileWriteTool; @@ -40,7 +52,7 @@ pub use traits::Tool; #[allow(unused_imports)] pub use traits::{ToolResult, ToolSpec}; -use crate::config::DelegateAgentConfig; +use crate::config::{Config, DelegateAgentConfig}; use crate::memory::Memory; use crate::runtime::{NativeRuntime, RuntimeAdapter}; use crate::security::SecurityPolicy; @@ -67,6 +79,7 @@ pub fn default_tools_with_runtime( /// Create full tool registry including memory tools and optional Composio #[allow(clippy::implicit_hasher, clippy::too_many_arguments)] pub fn all_tools( + config: Arc, security: &Arc, memory: Arc, composio_key: Option<&str>, @@ -76,9 +89,10 @@ pub fn all_tools( workspace_dir: &std::path::Path, agents: &HashMap, fallback_api_key: Option<&str>, - config: &crate::config::Config, + root_config: &crate::config::Config, ) -> Vec> { all_tools_with_runtime( + config, security, Arc::new(NativeRuntime::new()), memory, @@ -89,13 +103,14 @@ pub fn all_tools( workspace_dir, agents, fallback_api_key, - config, + root_config, ) } /// Create full tool registry including memory tools and optional Composio. #[allow(clippy::implicit_hasher, clippy::too_many_arguments)] pub fn all_tools_with_runtime( + config: Arc, security: &Arc, runtime: Arc, memory: Arc, @@ -106,16 +121,22 @@ pub fn all_tools_with_runtime( workspace_dir: &std::path::Path, agents: &HashMap, fallback_api_key: Option<&str>, - config: &crate::config::Config, + root_config: &crate::config::Config, ) -> Vec> { let mut tools: Vec> = vec![ Box::new(ShellTool::new(security.clone(), runtime)), Box::new(FileReadTool::new(security.clone())), Box::new(FileWriteTool::new(security.clone())), + Box::new(CronAddTool::new(config.clone(), security.clone())), + Box::new(CronListTool::new(config.clone())), + Box::new(CronRemoveTool::new(config.clone())), + Box::new(CronUpdateTool::new(config.clone(), security.clone())), + Box::new(CronRunTool::new(config.clone())), + Box::new(CronRunsTool::new(config.clone())), Box::new(MemoryStoreTool::new(memory.clone())), Box::new(MemoryRecallTool::new(memory.clone())), Box::new(MemoryForgetTool::new(memory)), - Box::new(ScheduleTool::new(security.clone(), config.clone())), + Box::new(ScheduleTool::new(security.clone(), root_config.clone())), Box::new(GitOperationsTool::new( security.clone(), workspace_dir.to_path_buf(), @@ -225,6 +246,7 @@ mod tests { let cfg = test_config(&tmp); let tools = all_tools( + Arc::new(Config::default()), &security, mem, None, @@ -262,6 +284,7 @@ mod tests { let cfg = test_config(&tmp); let tools = all_tools( + Arc::new(Config::default()), &security, mem, None, @@ -400,6 +423,7 @@ mod tests { ); let tools = all_tools( + Arc::new(Config::default()), &security, mem, None, @@ -431,6 +455,7 @@ mod tests { let cfg = test_config(&tmp); let tools = all_tools( + Arc::new(Config::default()), &security, mem, None, diff --git a/src/tools/schedule.rs b/src/tools/schedule.rs index 43234b8..96c3023 100644 --- a/src/tools/schedule.rs +++ b/src/tools/schedule.rs @@ -161,9 +161,11 @@ impl ScheduleTool { let mut lines = Vec::with_capacity(jobs.len()); for job in jobs { - let flags = match (job.paused, job.one_shot) { - (true, true) => " [paused, one-shot]", - (true, false) => " [paused]", + let paused = !job.enabled; + let one_shot = matches!(job.schedule, cron::Schedule::At { .. }); + let flags = match (paused, one_shot) { + (true, true) => " [disabled, one-shot]", + (true, false) => " [disabled]", (false, true) => " [one-shot]", (false, false) => "", }; @@ -191,8 +193,8 @@ impl ScheduleTool { } fn handle_get(&self, id: &str) -> Result { - match cron::get_job(&self.config, id)? { - Some(job) => { + match cron::get_job(&self.config, id) { + Ok(job) => { let detail = json!({ "id": job.id, "expression": job.expression, @@ -200,8 +202,8 @@ impl ScheduleTool { "next_run": job.next_run.to_rfc3339(), "last_run": job.last_run.map(|value| value.to_rfc3339()), "last_status": job.last_status, - "paused": job.paused, - "one_shot": job.one_shot, + "enabled": job.enabled, + "one_shot": matches!(job.schedule, cron::Schedule::At { .. }), }); Ok(ToolResult { success: true, @@ -209,7 +211,7 @@ impl ScheduleTool { error: None, }) } - None => Ok(ToolResult { + Err(_) => Ok(ToolResult { success: false, output: String::new(), error: Some(format!("Job '{id}' not found")), @@ -342,7 +344,7 @@ impl ScheduleTool { }; match operation { - Ok(()) => ToolResult { + Ok(_) => ToolResult { success: true, output: if pause { format!("Paused job {id}")