Implement cron job management tools and types
- Added `JobType`, `SessionTarget`, `Schedule`, `DeliveryConfig`, `CronJob`, `CronRun`, and `CronJobPatch` types in `src/cron/types.rs` for cron job configuration and management. - Introduced `CronAddTool`, `CronListTool`, `CronRemoveTool`, `CronRunTool`, `CronRunsTool`, and `CronUpdateTool` in `src/tools` for adding, listing, removing, running, and updating cron jobs. - Updated the `run` function in `src/daemon/mod.rs` to conditionally start the scheduler based on the cron configuration. - Modified command-line argument parsing in `src/lib.rs` and `src/main.rs` to support new cron job commands. - Enhanced the onboarding wizard in `src/onboard/wizard.rs` to include cron configuration. - Added tests for cron job tools to ensure functionality and error handling.
This commit is contained in:
parent
0ec46ac3d1
commit
fb2d1cea0b
24 changed files with 2682 additions and 638 deletions
|
|
@ -1,26 +1,21 @@
|
|||
use crate::channels::{Channel, DiscordChannel, SlackChannel, TelegramChannel};
|
||||
use crate::config::Config;
|
||||
use crate::cron::{due_jobs, reschedule_after_run, CronJob};
|
||||
use crate::cron::{
|
||||
due_jobs, next_run_for_schedule, record_last_run, record_run, remove_job, reschedule_after_run,
|
||||
update_job, CronJob, CronJobPatch, DeliveryConfig, JobType, Schedule, SessionTarget,
|
||||
};
|
||||
use crate::security::SecurityPolicy;
|
||||
use anyhow::Result;
|
||||
use chrono::Utc;
|
||||
use chrono::{DateTime, Utc};
|
||||
use tokio::process::Command;
|
||||
use tokio::time::{self, Duration};
|
||||
|
||||
const MIN_POLL_SECONDS: u64 = 5;
|
||||
|
||||
pub async fn run(config: Config) -> Result<()> {
|
||||
if !config.scheduler.enabled {
|
||||
tracing::info!("Scheduler disabled by config");
|
||||
crate::health::mark_component_ok("scheduler");
|
||||
loop {
|
||||
time::sleep(Duration::from_secs(3600)).await;
|
||||
}
|
||||
}
|
||||
|
||||
let poll_secs = config.reliability.scheduler_poll_secs.max(MIN_POLL_SECONDS);
|
||||
let mut interval = time::interval(Duration::from_secs(poll_secs));
|
||||
let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir);
|
||||
let max_concurrent = config.scheduler.max_concurrent.max(1);
|
||||
|
||||
crate::health::mark_component_ok("scheduler");
|
||||
|
||||
|
|
@ -36,22 +31,28 @@ pub async fn run(config: Config) -> Result<()> {
|
|||
}
|
||||
};
|
||||
|
||||
for job in jobs.into_iter().take(max_concurrent) {
|
||||
for job in jobs {
|
||||
crate::health::mark_component_ok("scheduler");
|
||||
warn_if_high_frequency_agent_job(&job);
|
||||
|
||||
let started_at = Utc::now();
|
||||
let (success, output) = execute_job_with_retry(&config, &security, &job).await;
|
||||
let finished_at = Utc::now();
|
||||
let success =
|
||||
persist_job_result(&config, &job, success, &output, started_at, finished_at).await;
|
||||
|
||||
if !success {
|
||||
crate::health::mark_component_error("scheduler", format!("job {} failed", job.id));
|
||||
}
|
||||
|
||||
if let Err(e) = reschedule_after_run(&config, &job, success, &output) {
|
||||
crate::health::mark_component_error("scheduler", e.to_string());
|
||||
tracing::warn!("Failed to persist scheduler run result: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn execute_job_now(config: &Config, job: &CronJob) -> (bool, String) {
|
||||
let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir);
|
||||
execute_job_with_retry(config, &security, job).await
|
||||
}
|
||||
|
||||
async fn execute_job_with_retry(
|
||||
config: &Config,
|
||||
security: &SecurityPolicy,
|
||||
|
|
@ -62,7 +63,10 @@ async fn execute_job_with_retry(
|
|||
let mut backoff_ms = config.reliability.provider_backoff_ms.max(200);
|
||||
|
||||
for attempt in 0..=retries {
|
||||
let (success, output) = run_job_command(config, security, job).await;
|
||||
let (success, output) = match job.job_type {
|
||||
JobType::Shell => run_job_command(config, security, job).await,
|
||||
JobType::Agent => run_agent_job(config, job).await,
|
||||
};
|
||||
last_output = output;
|
||||
|
||||
if success {
|
||||
|
|
@ -84,6 +88,185 @@ async fn execute_job_with_retry(
|
|||
(false, last_output)
|
||||
}
|
||||
|
||||
async fn run_agent_job(config: &Config, job: &CronJob) -> (bool, String) {
|
||||
let name = job.name.clone().unwrap_or_else(|| "cron-job".to_string());
|
||||
let prompt = job.prompt.clone().unwrap_or_default();
|
||||
let prefixed_prompt = format!("[cron:{} {name}] {prompt}", job.id);
|
||||
let model_override = job.model.clone();
|
||||
|
||||
let run_result = match job.session_target {
|
||||
SessionTarget::Main | SessionTarget::Isolated => {
|
||||
crate::agent::run(
|
||||
config.clone(),
|
||||
Some(prefixed_prompt),
|
||||
None,
|
||||
model_override,
|
||||
config.default_temperature,
|
||||
vec![],
|
||||
)
|
||||
.await
|
||||
}
|
||||
};
|
||||
|
||||
match run_result {
|
||||
Ok(response) => (
|
||||
true,
|
||||
if response.trim().is_empty() {
|
||||
"agent job executed".to_string()
|
||||
} else {
|
||||
response
|
||||
},
|
||||
),
|
||||
Err(e) => (false, format!("agent job failed: {e}")),
|
||||
}
|
||||
}
|
||||
|
||||
async fn persist_job_result(
|
||||
config: &Config,
|
||||
job: &CronJob,
|
||||
mut success: bool,
|
||||
output: &str,
|
||||
started_at: DateTime<Utc>,
|
||||
finished_at: DateTime<Utc>,
|
||||
) -> bool {
|
||||
let duration_ms = (finished_at - started_at).num_milliseconds();
|
||||
|
||||
if let Err(e) = deliver_if_configured(config, job, output).await {
|
||||
if job.delivery.best_effort {
|
||||
tracing::warn!("Cron delivery failed (best_effort): {e}");
|
||||
} else {
|
||||
success = false;
|
||||
tracing::warn!("Cron delivery failed: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
let _ = record_run(
|
||||
config,
|
||||
&job.id,
|
||||
started_at,
|
||||
finished_at,
|
||||
if success { "ok" } else { "error" },
|
||||
Some(output),
|
||||
duration_ms,
|
||||
);
|
||||
|
||||
if is_one_shot_auto_delete(job) {
|
||||
if success {
|
||||
if let Err(e) = remove_job(config, &job.id) {
|
||||
tracing::warn!("Failed to remove one-shot cron job after success: {e}");
|
||||
}
|
||||
} else {
|
||||
let _ = record_last_run(config, &job.id, finished_at, false, output);
|
||||
if let Err(e) = update_job(
|
||||
config,
|
||||
&job.id,
|
||||
CronJobPatch {
|
||||
enabled: Some(false),
|
||||
..CronJobPatch::default()
|
||||
},
|
||||
) {
|
||||
tracing::warn!("Failed to disable failed one-shot cron job: {e}");
|
||||
}
|
||||
}
|
||||
return success;
|
||||
}
|
||||
|
||||
if let Err(e) = reschedule_after_run(config, job, success, output) {
|
||||
tracing::warn!("Failed to persist scheduler run result: {e}");
|
||||
}
|
||||
|
||||
success
|
||||
}
|
||||
|
||||
fn is_one_shot_auto_delete(job: &CronJob) -> bool {
|
||||
job.delete_after_run && matches!(job.schedule, Schedule::At { .. })
|
||||
}
|
||||
|
||||
fn warn_if_high_frequency_agent_job(job: &CronJob) {
|
||||
if !matches!(job.job_type, JobType::Agent) {
|
||||
return;
|
||||
}
|
||||
let too_frequent = match &job.schedule {
|
||||
Schedule::Every { every_ms } => *every_ms < 5 * 60 * 1000,
|
||||
Schedule::Cron { .. } => {
|
||||
let now = Utc::now();
|
||||
match (
|
||||
next_run_for_schedule(&job.schedule, now),
|
||||
next_run_for_schedule(&job.schedule, now + chrono::Duration::seconds(1)),
|
||||
) {
|
||||
(Ok(a), Ok(b)) => (b - a).num_minutes() < 5,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
Schedule::At { .. } => false,
|
||||
};
|
||||
|
||||
if too_frequent {
|
||||
tracing::warn!(
|
||||
"Cron agent job '{}' is scheduled more frequently than every 5 minutes",
|
||||
job.id
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async fn deliver_if_configured(config: &Config, job: &CronJob, output: &str) -> Result<()> {
|
||||
let delivery: &DeliveryConfig = &job.delivery;
|
||||
if !delivery.mode.eq_ignore_ascii_case("announce") {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let channel = delivery
|
||||
.channel
|
||||
.as_deref()
|
||||
.ok_or_else(|| anyhow::anyhow!("delivery.channel is required for announce mode"))?;
|
||||
let target = delivery
|
||||
.to
|
||||
.as_deref()
|
||||
.ok_or_else(|| anyhow::anyhow!("delivery.to is required for announce mode"))?;
|
||||
|
||||
match channel.to_ascii_lowercase().as_str() {
|
||||
"telegram" => {
|
||||
let tg = config
|
||||
.channels_config
|
||||
.telegram
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("telegram channel not configured"))?;
|
||||
let channel = TelegramChannel::new(tg.bot_token.clone(), tg.allowed_users.clone());
|
||||
channel.send(output, target).await?;
|
||||
}
|
||||
"discord" => {
|
||||
let dc = config
|
||||
.channels_config
|
||||
.discord
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("discord channel not configured"))?;
|
||||
let channel = DiscordChannel::new(
|
||||
dc.bot_token.clone(),
|
||||
dc.guild_id.clone(),
|
||||
dc.allowed_users.clone(),
|
||||
dc.listen_to_bots,
|
||||
);
|
||||
channel.send(output, target).await?;
|
||||
}
|
||||
"slack" => {
|
||||
let sl = config
|
||||
.channels_config
|
||||
.slack
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("slack channel not configured"))?;
|
||||
let channel = SlackChannel::new(
|
||||
sl.bot_token.clone(),
|
||||
sl.channel_id.clone(),
|
||||
sl.allowed_users.clone(),
|
||||
);
|
||||
channel.send(output, target).await?;
|
||||
}
|
||||
other => anyhow::bail!("unsupported delivery channel: {other}"),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn is_env_assignment(word: &str) -> bool {
|
||||
word.contains('=')
|
||||
&& word
|
||||
|
|
@ -212,7 +395,9 @@ async fn run_job_command(
|
|||
mod tests {
|
||||
use super::*;
|
||||
use crate::config::Config;
|
||||
use crate::cron::{self, DeliveryConfig};
|
||||
use crate::security::SecurityPolicy;
|
||||
use chrono::{Duration as ChronoDuration, Utc};
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn test_config(tmp: &TempDir) -> Config {
|
||||
|
|
@ -229,12 +414,24 @@ mod tests {
|
|||
CronJob {
|
||||
id: "test-job".into(),
|
||||
expression: "* * * * *".into(),
|
||||
schedule: crate::cron::Schedule::Cron {
|
||||
expr: "* * * * *".into(),
|
||||
tz: None,
|
||||
},
|
||||
command: command.into(),
|
||||
prompt: None,
|
||||
name: None,
|
||||
job_type: JobType::Shell,
|
||||
session_target: SessionTarget::Isolated,
|
||||
model: None,
|
||||
enabled: true,
|
||||
delivery: DeliveryConfig::default(),
|
||||
delete_after_run: false,
|
||||
created_at: Utc::now(),
|
||||
next_run: Utc::now(),
|
||||
last_run: None,
|
||||
last_status: None,
|
||||
paused: false,
|
||||
one_shot: false,
|
||||
last_output: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -356,4 +553,103 @@ mod tests {
|
|||
assert!(!success);
|
||||
assert!(output.contains("always_missing_for_retry_test"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn run_agent_job_returns_error_without_provider_key() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let config = test_config(&tmp);
|
||||
let mut job = test_job("");
|
||||
job.job_type = JobType::Agent;
|
||||
job.prompt = Some("Say hello".into());
|
||||
|
||||
let (success, output) = run_agent_job(&config, &job).await;
|
||||
assert!(!success);
|
||||
assert!(output.contains("agent job failed:"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn persist_job_result_records_run_and_reschedules_shell_job() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let config = test_config(&tmp);
|
||||
let job = cron::add_job(&config, "*/5 * * * *", "echo ok").unwrap();
|
||||
let started = Utc::now();
|
||||
let finished = started + ChronoDuration::milliseconds(10);
|
||||
|
||||
let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
|
||||
assert!(success);
|
||||
|
||||
let runs = cron::list_runs(&config, &job.id, 10).unwrap();
|
||||
assert_eq!(runs.len(), 1);
|
||||
let updated = cron::get_job(&config, &job.id).unwrap();
|
||||
assert_eq!(updated.last_status.as_deref(), Some("ok"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn persist_job_result_success_deletes_one_shot() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let config = test_config(&tmp);
|
||||
let at = Utc::now() + ChronoDuration::minutes(10);
|
||||
let job = cron::add_agent_job(
|
||||
&config,
|
||||
Some("one-shot".into()),
|
||||
crate::cron::Schedule::At { at },
|
||||
"Hello",
|
||||
SessionTarget::Isolated,
|
||||
None,
|
||||
None,
|
||||
true,
|
||||
)
|
||||
.unwrap();
|
||||
let started = Utc::now();
|
||||
let finished = started + ChronoDuration::milliseconds(10);
|
||||
|
||||
let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
|
||||
assert!(success);
|
||||
let lookup = cron::get_job(&config, &job.id);
|
||||
assert!(lookup.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn persist_job_result_failure_disables_one_shot() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let config = test_config(&tmp);
|
||||
let at = Utc::now() + ChronoDuration::minutes(10);
|
||||
let job = cron::add_agent_job(
|
||||
&config,
|
||||
Some("one-shot".into()),
|
||||
crate::cron::Schedule::At { at },
|
||||
"Hello",
|
||||
SessionTarget::Isolated,
|
||||
None,
|
||||
None,
|
||||
true,
|
||||
)
|
||||
.unwrap();
|
||||
let started = Utc::now();
|
||||
let finished = started + ChronoDuration::milliseconds(10);
|
||||
|
||||
let success = persist_job_result(&config, &job, false, "boom", started, finished).await;
|
||||
assert!(!success);
|
||||
let updated = cron::get_job(&config, &job.id).unwrap();
|
||||
assert!(!updated.enabled);
|
||||
assert_eq!(updated.last_status.as_deref(), Some("error"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn deliver_if_configured_handles_none_and_invalid_channel() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let config = test_config(&tmp);
|
||||
let mut job = test_job("echo ok");
|
||||
|
||||
assert!(deliver_if_configured(&config, &job, "x").await.is_ok());
|
||||
|
||||
job.delivery = DeliveryConfig {
|
||||
mode: "announce".into(),
|
||||
channel: Some("invalid".into()),
|
||||
to: Some("target".into()),
|
||||
best_effort: true,
|
||||
};
|
||||
let err = deliver_if_configured(&config, &job, "x").await.unwrap_err();
|
||||
assert!(err.to_string().contains("unsupported delivery channel"));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue