- Fix doc_markdown warnings in WhatsApp channel - Fix needless_pass_by_value in cron, health, migration, service modules - Fix match_same_arms in migration.rs - Fix too_many_lines in skills/mod.rs - Fix manual_let_else in tools/file_write.rs - Apply cargo fmt formatting fixes All 435 tests pass, clippy clean.
297 lines
9.8 KiB
Rust
297 lines
9.8 KiB
Rust
use crate::config::Config;
|
|
use crate::cron::{due_jobs, reschedule_after_run, CronJob};
|
|
use crate::security::SecurityPolicy;
|
|
use anyhow::Result;
|
|
use chrono::Utc;
|
|
use tokio::process::Command;
|
|
use tokio::time::{self, Duration};
|
|
|
|
const MIN_POLL_SECONDS: u64 = 5;
|
|
|
|
pub async fn run(config: Config) -> Result<()> {
|
|
let poll_secs = config.reliability.scheduler_poll_secs.max(MIN_POLL_SECONDS);
|
|
let mut interval = time::interval(Duration::from_secs(poll_secs));
|
|
let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir);
|
|
|
|
crate::health::mark_component_ok("scheduler");
|
|
|
|
loop {
|
|
interval.tick().await;
|
|
|
|
let jobs = match due_jobs(&config, Utc::now()) {
|
|
Ok(jobs) => jobs,
|
|
Err(e) => {
|
|
crate::health::mark_component_error("scheduler", e.to_string());
|
|
tracing::warn!("Scheduler query failed: {e}");
|
|
continue;
|
|
}
|
|
};
|
|
|
|
for job in jobs {
|
|
crate::health::mark_component_ok("scheduler");
|
|
let (success, output) = execute_job_with_retry(&config, &security, &job).await;
|
|
|
|
if !success {
|
|
crate::health::mark_component_error("scheduler", format!("job {} failed", job.id));
|
|
}
|
|
|
|
if let Err(e) = reschedule_after_run(&config, &job, success, &output) {
|
|
crate::health::mark_component_error("scheduler", e.to_string());
|
|
tracing::warn!("Failed to persist scheduler run result: {e}");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn execute_job_with_retry(
|
|
config: &Config,
|
|
security: &SecurityPolicy,
|
|
job: &CronJob,
|
|
) -> (bool, String) {
|
|
let mut last_output = String::new();
|
|
let retries = config.reliability.scheduler_retries;
|
|
let mut backoff_ms = config.reliability.provider_backoff_ms.max(200);
|
|
|
|
for attempt in 0..=retries {
|
|
let (success, output) = run_job_command(config, security, job).await;
|
|
last_output = output;
|
|
|
|
if success {
|
|
return (true, last_output);
|
|
}
|
|
|
|
if last_output.starts_with("blocked by security policy:") {
|
|
// Deterministic policy violations are not retryable.
|
|
return (false, last_output);
|
|
}
|
|
|
|
if attempt < retries {
|
|
let jitter_ms = u64::from(Utc::now().timestamp_subsec_millis() % 250);
|
|
time::sleep(Duration::from_millis(backoff_ms + jitter_ms)).await;
|
|
backoff_ms = (backoff_ms.saturating_mul(2)).min(30_000);
|
|
}
|
|
}
|
|
|
|
(false, last_output)
|
|
}
|
|
|
|
fn is_env_assignment(word: &str) -> bool {
|
|
word.contains('=')
|
|
&& word
|
|
.chars()
|
|
.next()
|
|
.is_some_and(|c| c.is_ascii_alphabetic() || c == '_')
|
|
}
|
|
|
|
fn strip_wrapping_quotes(token: &str) -> &str {
|
|
token.trim_matches(|c| c == '"' || c == '\'')
|
|
}
|
|
|
|
fn forbidden_path_argument(security: &SecurityPolicy, command: &str) -> Option<String> {
|
|
let mut normalized = command.to_string();
|
|
for sep in ["&&", "||"] {
|
|
normalized = normalized.replace(sep, "\x00");
|
|
}
|
|
for sep in ['\n', ';', '|'] {
|
|
normalized = normalized.replace(sep, "\x00");
|
|
}
|
|
|
|
for segment in normalized.split('\x00') {
|
|
let tokens: Vec<&str> = segment.split_whitespace().collect();
|
|
if tokens.is_empty() {
|
|
continue;
|
|
}
|
|
|
|
// Skip leading env assignments and executable token.
|
|
let mut idx = 0;
|
|
while idx < tokens.len() && is_env_assignment(tokens[idx]) {
|
|
idx += 1;
|
|
}
|
|
if idx >= tokens.len() {
|
|
continue;
|
|
}
|
|
idx += 1;
|
|
|
|
for token in &tokens[idx..] {
|
|
let candidate = strip_wrapping_quotes(token);
|
|
if candidate.is_empty() || candidate.starts_with('-') || candidate.contains("://") {
|
|
continue;
|
|
}
|
|
|
|
let looks_like_path = candidate.starts_with('/')
|
|
|| candidate.starts_with("./")
|
|
|| candidate.starts_with("../")
|
|
|| candidate.starts_with("~/")
|
|
|| candidate.contains('/');
|
|
|
|
if looks_like_path && !security.is_path_allowed(candidate) {
|
|
return Some(candidate.to_string());
|
|
}
|
|
}
|
|
}
|
|
|
|
None
|
|
}
|
|
|
|
async fn run_job_command(
|
|
config: &Config,
|
|
security: &SecurityPolicy,
|
|
job: &CronJob,
|
|
) -> (bool, String) {
|
|
if !security.is_command_allowed(&job.command) {
|
|
return (
|
|
false,
|
|
format!(
|
|
"blocked by security policy: command not allowed: {}",
|
|
job.command
|
|
),
|
|
);
|
|
}
|
|
|
|
if let Some(path) = forbidden_path_argument(security, &job.command) {
|
|
return (
|
|
false,
|
|
format!("blocked by security policy: forbidden path argument: {path}"),
|
|
);
|
|
}
|
|
|
|
let output = Command::new("sh")
|
|
.arg("-lc")
|
|
.arg(&job.command)
|
|
.current_dir(&config.workspace_dir)
|
|
.output()
|
|
.await;
|
|
|
|
match output {
|
|
Ok(output) => {
|
|
let stdout = String::from_utf8_lossy(&output.stdout);
|
|
let stderr = String::from_utf8_lossy(&output.stderr);
|
|
let combined = format!(
|
|
"status={}\nstdout:\n{}\nstderr:\n{}",
|
|
output.status,
|
|
stdout.trim(),
|
|
stderr.trim()
|
|
);
|
|
(output.status.success(), combined)
|
|
}
|
|
Err(e) => (false, format!("spawn error: {e}")),
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use crate::config::Config;
|
|
use crate::security::SecurityPolicy;
|
|
use tempfile::TempDir;
|
|
|
|
fn test_config(tmp: &TempDir) -> Config {
|
|
let mut config = Config::default();
|
|
config.workspace_dir = tmp.path().join("workspace");
|
|
config.config_path = tmp.path().join("config.toml");
|
|
std::fs::create_dir_all(&config.workspace_dir).unwrap();
|
|
config
|
|
}
|
|
|
|
fn test_job(command: &str) -> CronJob {
|
|
CronJob {
|
|
id: "test-job".into(),
|
|
expression: "* * * * *".into(),
|
|
command: command.into(),
|
|
next_run: Utc::now(),
|
|
last_run: None,
|
|
last_status: None,
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn run_job_command_success() {
|
|
let tmp = TempDir::new().unwrap();
|
|
let config = test_config(&tmp);
|
|
let job = test_job("echo scheduler-ok");
|
|
let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir);
|
|
|
|
let (success, output) = run_job_command(&config, &security, &job).await;
|
|
assert!(success);
|
|
assert!(output.contains("scheduler-ok"));
|
|
assert!(output.contains("status=exit status: 0"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn run_job_command_failure() {
|
|
let tmp = TempDir::new().unwrap();
|
|
let config = test_config(&tmp);
|
|
let job = test_job("ls definitely_missing_file_for_scheduler_test");
|
|
let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir);
|
|
|
|
let (success, output) = run_job_command(&config, &security, &job).await;
|
|
assert!(!success);
|
|
assert!(output.contains("definitely_missing_file_for_scheduler_test"));
|
|
assert!(output.contains("status=exit status:"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn run_job_command_blocks_disallowed_command() {
|
|
let tmp = TempDir::new().unwrap();
|
|
let mut config = test_config(&tmp);
|
|
config.autonomy.allowed_commands = vec!["echo".into()];
|
|
let job = test_job("curl https://evil.example");
|
|
let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir);
|
|
|
|
let (success, output) = run_job_command(&config, &security, &job).await;
|
|
assert!(!success);
|
|
assert!(output.contains("blocked by security policy"));
|
|
assert!(output.contains("command not allowed"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn run_job_command_blocks_forbidden_path_argument() {
|
|
let tmp = TempDir::new().unwrap();
|
|
let mut config = test_config(&tmp);
|
|
config.autonomy.allowed_commands = vec!["cat".into()];
|
|
let job = test_job("cat /etc/passwd");
|
|
let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir);
|
|
|
|
let (success, output) = run_job_command(&config, &security, &job).await;
|
|
assert!(!success);
|
|
assert!(output.contains("blocked by security policy"));
|
|
assert!(output.contains("forbidden path argument"));
|
|
assert!(output.contains("/etc/passwd"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn execute_job_with_retry_recovers_after_first_failure() {
|
|
let tmp = TempDir::new().unwrap();
|
|
let mut config = test_config(&tmp);
|
|
config.reliability.scheduler_retries = 1;
|
|
config.reliability.provider_backoff_ms = 1;
|
|
config.autonomy.allowed_commands = vec!["sh".into()];
|
|
let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir);
|
|
|
|
std::fs::write(
|
|
config.workspace_dir.join("retry-once.sh"),
|
|
"#!/bin/sh\nif [ -f retry-ok.flag ]; then\n echo recovered\n exit 0\nfi\ntouch retry-ok.flag\nexit 1\n",
|
|
)
|
|
.unwrap();
|
|
let job = test_job("sh ./retry-once.sh");
|
|
|
|
let (success, output) = execute_job_with_retry(&config, &security, &job).await;
|
|
assert!(success);
|
|
assert!(output.contains("recovered"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn execute_job_with_retry_exhausts_attempts() {
|
|
let tmp = TempDir::new().unwrap();
|
|
let mut config = test_config(&tmp);
|
|
config.reliability.scheduler_retries = 1;
|
|
config.reliability.provider_backoff_ms = 1;
|
|
let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir);
|
|
|
|
let job = test_job("ls always_missing_for_retry_test");
|
|
|
|
let (success, output) = execute_job_with_retry(&config, &security, &job).await;
|
|
assert!(!success);
|
|
assert!(output.contains("always_missing_for_retry_test"));
|
|
}
|
|
}
|