Initial implementation of vault-os

Complete implementation across all 13 phases:

- vault-core: types, YAML frontmatter parsing, entity classification,
  filesystem ops, config, prompt composition, validation, search
- vault-watch: filesystem watcher with daemon write filtering, event
  classification
- vault-scheduler: cron engine, process executor, task runner with
  retry logic and concurrency limiting
- vault-api: Axum REST API (15 route modules), WebSocket with broadcast,
  AI assistant proxy, validation, templates
- Dashboard: React + TypeScript + Tailwind v4 with kanban, CodeMirror
  editor, dynamic view system, AI chat sidebar
- Nix flake with dev shell and NixOS module
- Graceful shutdown, inotify overflow recovery, tracing instrumentation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Harald Hoyer 2026-03-03 01:21:17 +01:00
commit f820a72b04
123 changed files with 18288 additions and 0 deletions

View file

@ -0,0 +1,206 @@
use chrono::{DateTime, Utc};
use cron::Schedule;
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use vault_core::entity::VaultEntity;
use vault_core::error::VaultError;
use vault_core::filesystem;
use vault_core::frontmatter;
use vault_core::types::CronJob;
/// Errors produced by the cron scheduling engine.
#[derive(Debug, thiserror::Error)]
pub enum CronError {
    /// The `schedule` field of a cron entity could not be parsed by the
    /// `cron` crate (after the implicit seconds field is prepended).
    #[error("Invalid cron expression '{expr}': {reason}")]
    InvalidExpression { expr: String, reason: String },
    /// Any error bubbled up from vault-core (I/O, frontmatter parsing, ...).
    #[error("Vault error: {0}")]
    Vault(#[from] VaultError),
}
/// One pending fire of a cron job, used as the key type of the scheduling heap.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ScheduleEntry {
    /// Next UTC instant at which this cron should fire.
    next_fire: DateTime<Utc>,
    /// Path of the cron entity file under `crons/active/`.
    path: PathBuf,
}
impl PartialOrd for ScheduleEntry {
    /// Delegate to `Ord`; the ordering is total, so this is always `Some`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for ScheduleEntry {
    /// Order primarily by fire time (earliest first once wrapped in
    /// `Reverse` inside the heap), breaking ties on `path`.
    ///
    /// The tie-break keeps `Ord` consistent with the derived
    /// `PartialEq`/`Eq` (which compare BOTH fields): previously, two
    /// entries with equal `next_fire` but different paths compared
    /// `Ordering::Equal` via `cmp` while being `!=` via `==`, violating
    /// the `Ord` contract ("consistent with `PartialEq`").
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.next_fire
            .cmp(&other.next_fire)
            .then_with(|| self.path.cmp(&other.path))
    }
}
/// In-memory cron scheduler for a vault.
///
/// Holds a min-heap (via `Reverse`) of the next fire time for every enabled
/// cron entity found under `crons/active/`.
pub struct CronEngine {
    /// Root directory of the vault this engine schedules for.
    vault_root: PathBuf,
    /// Min-heap of pending fires; `peek()` yields the earliest `next_fire`.
    schedule: BinaryHeap<Reverse<ScheduleEntry>>,
}
impl CronEngine {
    /// Create an engine with an empty schedule rooted at `vault_root`.
    pub fn new(vault_root: PathBuf) -> Self {
        Self {
            vault_root,
            schedule: BinaryHeap::new(),
        }
    }

    /// Rebuild the entire schedule by scanning `crons/active/`.
    ///
    /// Invalid cron files are skipped with a warning rather than aborting
    /// the rebuild, so one bad entity cannot take down the scheduler.
    pub fn rebuild_schedule(&mut self) -> Result<(), CronError> {
        self.schedule.clear();
        let active_dir = self.vault_root.join("crons/active");
        let files = filesystem::list_md_files(&active_dir)?;
        for file in files {
            if let Err(e) = self.add_cron(&file) {
                tracing::warn!(?file, error = %e, "Skipping invalid cron");
            }
        }
        tracing::info!(count = self.schedule.len(), "Rebuilt cron schedule");
        Ok(())
    }

    /// Add or update a cron job in the schedule.
    // Remove-then-add guarantees at most one heap entry per path.
    pub fn upsert_cron(&mut self, path: &Path) -> Result<(), CronError> {
        self.remove_cron(path);
        self.add_cron(path)
    }

    /// Remove a cron job from the schedule.
    // BinaryHeap has no keyed removal, so drain everything and rebuild the
    // heap without the matching path (O(n log n) in scheduled crons).
    pub fn remove_cron(&mut self, path: &Path) {
        let entries: Vec<_> = self
            .schedule
            .drain()
            .filter(|Reverse(e)| e.path != path)
            .collect();
        self.schedule = entries.into_iter().collect();
    }

    /// Get the next fire time, if any crons are scheduled.
    pub fn next_fire_time(&self) -> Option<DateTime<Utc>> {
        self.schedule.peek().map(|Reverse(e)| e.next_fire)
    }

    /// Pop all crons that are due (fire time <= now).
    // Peek-then-pop so that only due entries leave the heap; the first
    // not-yet-due entry terminates the scan (heap order => all later
    // entries are even further in the future).
    pub fn pop_due(&mut self) -> Vec<PathBuf> {
        let now = Utc::now();
        let mut due = Vec::new();
        while let Some(Reverse(entry)) = self.schedule.peek() {
            if entry.next_fire <= now {
                let Reverse(entry) = self.schedule.pop().unwrap();
                due.push(entry.path);
            } else {
                break;
            }
        }
        due
    }

    /// Fire a cron: create an agent task in queued/, update cron frontmatter.
    /// Returns the path to the created agent task.
    ///
    /// Side-effect order matters here: every path is registered with the
    /// daemon write filter BEFORE it is written, so the filesystem watcher
    /// can distinguish daemon-originated writes from user edits.
    #[tracing::instrument(skip(self, write_filter), fields(cron = ?cron_path.file_name()))]
    pub fn fire_cron(
        &mut self,
        cron_path: &Path,
        write_filter: &vault_watch::write_filter::DaemonWriteFilter,
    ) -> Result<PathBuf, CronError> {
        let entity: VaultEntity<CronJob> = filesystem::read_entity(cron_path)?;
        let cron = &entity.frontmatter;
        // Create agent task
        let slug = filesystem::timestamped_slug(&cron.title);
        let task_path = self
            .vault_root
            .join("todos/agent/queued")
            .join(format!("{}.md", slug));
        let now = Utc::now();
        let agent_task = vault_core::types::AgentTask {
            title: cron.title.clone(),
            agent: cron.agent.clone(),
            priority: vault_core::types::Priority::Medium,
            task_type: Some("cron".into()),
            created: now,
            started: None,
            completed: None,
            retry: 0,
            // NOTE(review): cron-spawned tasks get max_retries: 0 (no
            // retries) — confirm this is intentional rather than something
            // that should be inherited from the cron entity.
            max_retries: 0,
            input: None,
            output: None,
            error: None,
        };
        // The cron entity's markdown body becomes the task's body/context.
        let task_entity = VaultEntity {
            path: task_path.clone(),
            frontmatter: agent_task,
            body: entity.body.clone(),
        };
        write_filter.register(task_path.clone());
        filesystem::write_entity(&task_entity)?;
        // Update cron frontmatter.
        // "success" here means the task was enqueued successfully, not that
        // the spawned task ran successfully.
        let content = std::fs::read_to_string(cron_path)
            .map_err(|e| VaultError::io(e, cron_path))?;
        let updates = serde_json::json!({
            "last_run": now.to_rfc3339(),
            "last_status": "success",
            "run_count": cron.run_count + 1,
        });
        let updated = frontmatter::update_frontmatter_fields(&content, cron_path, &updates)?;
        write_filter.register(cron_path.to_path_buf());
        std::fs::write(cron_path, updated).map_err(|e| VaultError::io(e, cron_path))?;
        // Re-schedule this cron for its next fire time.
        if let Err(e) = self.add_cron(cron_path) {
            tracing::warn!(?cron_path, error = %e, "Failed to reschedule cron");
        }
        tracing::info!(
            cron = %cron.title,
            agent = %cron.agent,
            task = ?task_path,
            "Fired cron job"
        );
        Ok(task_path)
    }

    /// Parse a cron entity and push its next fire time onto the heap.
    /// Disabled crons and crons with no future fire time are silently
    /// skipped (the latter can happen for year-bounded expressions).
    fn add_cron(&mut self, path: &Path) -> Result<(), CronError> {
        let entity: VaultEntity<CronJob> = filesystem::read_entity(path)?;
        let cron = &entity.frontmatter;
        if !cron.enabled {
            return Ok(());
        }
        // cron crate expects 6 or 7 fields (sec min hour dom month dow [year])
        // Standard 5-field cron: prepend "0 " for seconds
        // NOTE(review): assumes `cron.schedule` is always 5-field; a 6- or
        // 7-field expression would gain a bogus leading seconds column —
        // confirm upstream validation enforces 5 fields.
        let expr = format!("0 {}", cron.schedule);
        let schedule = Schedule::from_str(&expr).map_err(|e| CronError::InvalidExpression {
            expr: cron.schedule.clone(),
            reason: e.to_string(),
        })?;
        if let Some(next) = schedule.upcoming(Utc).next() {
            self.schedule.push(Reverse(ScheduleEntry {
                next_fire: next,
                path: path.to_path_buf(),
            }));
        }
        Ok(())
    }

    /// Number of crons currently scheduled.
    pub fn scheduled_count(&self) -> usize {
        self.schedule.len()
    }
}

View file

@ -0,0 +1,41 @@
use std::collections::HashMap;
use std::time::Duration;
/// Captured outcome of an execution that finished before its timeout.
#[derive(Debug, Clone)]
pub struct ExecutionResult {
    /// Everything the process wrote to stdout.
    pub stdout: String,
    /// Everything the process wrote to stderr.
    pub stderr: String,
    /// Process exit code; `None` when the OS reports no code
    /// (e.g. terminated by a signal on Unix).
    pub exit_code: Option<i32>,
    /// Wall-clock time the execution took.
    pub duration: Duration,
}
/// Ways an execution can fail.
#[derive(Debug, thiserror::Error)]
pub enum ExecutionError {
    /// The wall-clock timeout elapsed before the process finished.
    #[error("Execution timed out after {0:?}")]
    Timeout(Duration),
    /// The process could not be spawned (missing binary, permissions, ...).
    #[error("Process failed to start: {0}")]
    SpawnFailed(String),
    /// The process ran but exited non-zero; carries its captured stderr.
    #[error("Process exited with code {code}: {stderr}")]
    NonZeroExit { code: i32, stderr: String },
    /// Transport failure, reserved for HTTP-backed executors.
    #[error("HTTP error: {0}")]
    Http(String),
    /// Any other I/O failure (e.g. writing the prompt to the child's stdin).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
}
/// Abstraction over how an agent task is actually executed
/// (e.g. by spawning a CLI process).
#[async_trait::async_trait]
pub trait Executor: Send + Sync {
    /// Run one task to completion or failure.
    ///
    /// * `executable` — identifier of the backend to run.
    /// * `model` — optional model override for the backend.
    /// * `system_prompt` — composed system prompt for the agent.
    /// * `task_context` — task-specific context appended to the prompt.
    /// * `env` — extra environment variables for the execution.
    /// * `timeout` — hard wall-clock limit for the whole execution.
    async fn execute(
        &self,
        executable: &str,
        model: Option<&str>,
        system_prompt: &str,
        task_context: &str,
        env: &HashMap<String, String>,
        timeout: Duration,
    ) -> Result<ExecutionResult, ExecutionError>;
}

View file

@ -0,0 +1 @@
/// Generic process-spawning executor implementation.
pub mod process;

View file

@ -0,0 +1,132 @@
use crate::executor::{ExecutionError, ExecutionResult, Executor};
use std::collections::HashMap;
use std::time::{Duration, Instant};
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
/// Generic process executor: spawns a child process, pipes prompt to stdin,
/// captures stdout/stderr.
pub struct GenericProcessExecutor {
    /// Vault root, exported to the child process as `VAULT_PATH`.
    vault_path: std::path::PathBuf,
}

impl GenericProcessExecutor {
    /// Create an executor rooted at `vault_path`.
    pub fn new(vault_path: std::path::PathBuf) -> Self {
        Self { vault_path }
    }

    /// Expand `${VAR}` references in environment variable values using the
    /// daemon's own process environment.
    ///
    /// Unknown variables expand to the empty string; an unterminated `${`
    /// is left verbatim. Expansion is a single left-to-right pass: text
    /// produced by a substitution is NOT re-scanned. (The previous
    /// implementation restarted the scan from the start of the string after
    /// every substitution, so a variable whose value itself contained
    /// `${...}` — e.g. `X=${X}` — made the loop spin forever.)
    fn expand_env(value: &str) -> String {
        let mut result = String::with_capacity(value.len());
        // `rest` is the not-yet-scanned tail of the input.
        let mut rest = value;
        while let Some(start) = rest.find("${") {
            // Find the closing brace after the "${" opener.
            let rel_end = match rest[start + 2..].find('}') {
                Some(i) => i,
                // Unterminated reference: keep the remainder verbatim.
                None => break,
            };
            let var_name = &rest[start + 2..start + 2 + rel_end];
            // Copy the literal text before the reference, then the value.
            result.push_str(&rest[..start]);
            result.push_str(&std::env::var(var_name).unwrap_or_default());
            // Continue scanning strictly AFTER the '}' — never inside the
            // substituted value.
            rest = &rest[start + 2 + rel_end + 1..];
        }
        result.push_str(rest);
        result
    }
}
#[async_trait::async_trait]
impl Executor for GenericProcessExecutor {
    /// Spawn `executable` and run it to completion, enforcing `timeout`.
    ///
    /// For the special `"claude-code"` executable the prompt is passed as a
    /// CLI argument to `claude --print`; for any other executable the prompt
    /// is piped to the child's stdin.
    async fn execute(
        &self,
        executable: &str,
        model: Option<&str>,
        system_prompt: &str,
        task_context: &str,
        env: &HashMap<String, String>,
        timeout: Duration,
    ) -> Result<ExecutionResult, ExecutionError> {
        let start = Instant::now();
        // Build the full prompt: system prompt, optionally followed by the
        // task context under a "## Task" heading.
        let full_prompt = if task_context.is_empty() {
            system_prompt.to_string()
        } else {
            format!("{}\n\n## Task\n\n{}", system_prompt, task_context)
        };
        // Determine command and args based on executable type.
        // NOTE(review): passing the whole prompt as an argv entry can hit OS
        // argument-size limits for very large prompts — confirm expected
        // prompt sizes for the claude-code path.
        let (cmd, args) = if executable == "claude-code" {
            (
                "claude".to_string(),
                vec![
                    "--print".to_string(),
                    "--dangerously-skip-permissions".to_string(),
                    full_prompt.clone(),
                ],
            )
        } else {
            (executable.to_string(), vec![])
        };
        let mut command = Command::new(&cmd);
        command.args(&args);
        // FIX: without kill_on_drop, dropping the wait future on timeout
        // left the child running (leaked) past its deadline — tokio does not
        // kill a child on drop by default. With it, the timed-out branch's
        // drop of `child` terminates the process.
        command.kill_on_drop(true);
        // Set environment
        command.env("VAULT_PATH", &self.vault_path);
        for (key, value) in env {
            command.env(key, Self::expand_env(value));
        }
        if let Some(model) = model {
            command.env("MODEL", model);
        }
        // For non-claude executables, pipe prompt via stdin
        if executable != "claude-code" {
            command
                .stdin(std::process::Stdio::piped())
                .stdout(std::process::Stdio::piped())
                .stderr(std::process::Stdio::piped());
        } else {
            command
                .stdout(std::process::Stdio::piped())
                .stderr(std::process::Stdio::piped());
        }
        let mut child = command
            .spawn()
            .map_err(|e| ExecutionError::SpawnFailed(format!("{}: {}", cmd, e)))?;
        // Write prompt to stdin for non-claude executables, then drop the
        // handle so the child sees EOF.
        // NOTE(review): stdin is fully written before stdout is drained; a
        // child that fills its stdout pipe before consuming all of stdin
        // could deadlock on very large prompts — confirm acceptable.
        if executable != "claude-code" {
            if let Some(mut stdin) = child.stdin.take() {
                stdin.write_all(full_prompt.as_bytes()).await?;
                drop(stdin);
            }
        }
        // Wait with timeout; on timeout, `child` is dropped (and killed,
        // thanks to kill_on_drop above).
        let output = match tokio::time::timeout(timeout, child.wait_with_output()).await {
            Ok(result) => result.map_err(|e| ExecutionError::SpawnFailed(e.to_string()))?,
            Err(_) => {
                return Err(ExecutionError::Timeout(timeout));
            }
        };
        let duration = start.elapsed();
        let stdout = String::from_utf8_lossy(&output.stdout).to_string();
        let stderr = String::from_utf8_lossy(&output.stderr).to_string();
        let exit_code = output.status.code();
        if output.status.success() {
            Ok(ExecutionResult {
                stdout,
                stderr,
                exit_code,
                duration,
            })
        } else {
            // Non-zero exit surfaces as an error carrying stderr; -1 stands
            // in for "no exit code" (signal-terminated on Unix).
            Err(ExecutionError::NonZeroExit {
                code: exit_code.unwrap_or(-1),
                stderr,
            })
        }
    }
}

View file

@ -0,0 +1,5 @@
/// Cron expression parsing and the in-memory fire-time heap.
pub mod cron_engine;
/// The `Executor` trait and its result/error types.
pub mod executor;
/// Concrete `Executor` implementations.
pub mod executors;
/// Daemon runtime state persisted in `.vault/state.json`.
pub mod state;
/// Queued-task discovery, execution, retry, and crash recovery.
pub mod task_runner;

View file

@ -0,0 +1,36 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::path::Path;
use vault_core::error::VaultError;
/// Daemon runtime counters persisted across restarts in `.vault/state.json`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RuntimeState {
    /// When the daemon last started, if ever recorded.
    pub last_startup: Option<DateTime<Utc>>,
    /// When the daemon last shut down, if ever recorded.
    pub last_shutdown: Option<DateTime<Utc>>,
    /// Lifetime count of agent tasks executed.
    pub total_tasks_executed: u64,
    /// Lifetime count of cron fires.
    pub total_cron_fires: u64,
}
impl RuntimeState {
    /// Load persisted state from `<vault>/.vault/state.json`.
    ///
    /// A missing file yields the default state. A present-but-unparseable
    /// file also yields the default state (best-effort: state holds only
    /// counters, so resetting beats failing startup).
    pub fn load(vault_root: &Path) -> Result<Self, VaultError> {
        let state_path = vault_root.join(".vault/state.json");
        if !state_path.exists() {
            return Ok(Self::default());
        }
        let content = std::fs::read_to_string(&state_path)
            .map_err(|e| VaultError::io(e, &state_path))?;
        // NOTE(review): corrupt JSON silently resets all counters — consider
        // logging when this fallback triggers.
        let state: RuntimeState =
            serde_json::from_str(&content).unwrap_or_default();
        Ok(state)
    }

    /// Persist state to `<vault>/.vault/state.json` as pretty-printed JSON.
    ///
    /// Creates the `.vault` directory first if it does not exist, so saving
    /// works on a fresh vault; `std::fs::write` alone fails when the parent
    /// directory is missing, and nothing in this module created it before.
    pub fn save(&self, vault_root: &Path) -> Result<(), VaultError> {
        let state_path = vault_root.join(".vault/state.json");
        if let Some(parent) = state_path.parent() {
            std::fs::create_dir_all(parent).map_err(|e| VaultError::io(e, parent))?;
        }
        let content = serde_json::to_string_pretty(self)
            .map_err(|e| VaultError::InvalidEntity {
                path: state_path.clone(),
                reason: e.to_string(),
            })?;
        std::fs::write(&state_path, content).map_err(|e| VaultError::io(e, &state_path))
    }
}

View file

@ -0,0 +1,253 @@
use crate::executor::{ExecutionError, Executor};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::Semaphore;
use vault_core::entity::VaultEntity;
use vault_core::error::VaultError;
use vault_core::filesystem;
use vault_core::frontmatter;
use vault_core::types::{Agent, AgentTask};
use vault_watch::write_filter::DaemonWriteFilter;
/// Executes queued agent tasks with bounded concurrency, moving each task
/// file through the `queued/ -> running/ -> done|failed` lifecycle
/// directories under `todos/agent/`.
pub struct TaskRunner {
    /// Vault root containing `todos/agent/*` and `agents/`.
    vault_root: PathBuf,
    /// Caps the number of concurrently executing tasks.
    semaphore: Arc<Semaphore>,
    /// Backend that actually runs the agent (e.g. a spawned process).
    executor: Arc<dyn Executor>,
    /// Registers daemon-originated writes so the watcher can ignore them.
    write_filter: Arc<DaemonWriteFilter>,
}
impl TaskRunner {
    /// Create a runner; at most `max_parallel` tasks execute concurrently.
    pub fn new(
        vault_root: PathBuf,
        max_parallel: usize,
        executor: Arc<dyn Executor>,
        write_filter: Arc<DaemonWriteFilter>,
    ) -> Self {
        Self {
            vault_root,
            semaphore: Arc::new(Semaphore::new(max_parallel)),
            executor,
            write_filter,
        }
    }

    /// Process all currently queued tasks.
    ///
    /// Each task is spawned onto its own detached tokio task; the semaphore
    /// inside `execute_task` enforces the concurrency cap. Returns the paths
    /// that were spawned, not their outcomes.
    ///
    /// NOTE(review): the queued file is not moved/claimed until the spawned
    /// task acquires a permit inside `execute_task`; a second
    /// `process_queued` scan before that point would spawn the same task
    /// twice — confirm callers serialize scans.
    pub async fn process_queued(&self) -> Result<Vec<PathBuf>, VaultError> {
        let queued_dir = self.vault_root.join("todos/agent/queued");
        let files = filesystem::list_md_files(&queued_dir)?;
        let mut spawned = Vec::new();
        for file in files {
            spawned.push(file.clone());
            // Manual field-by-field clone (all fields are Arc or PathBuf) so
            // an owned runner can move into the 'static spawned future.
            let runner = TaskRunner {
                vault_root: self.vault_root.clone(),
                semaphore: self.semaphore.clone(),
                executor: self.executor.clone(),
                write_filter: self.write_filter.clone(),
            };
            tokio::spawn(async move {
                if let Err(e) = runner.execute_task(&file).await {
                    tracing::error!(task = ?file, error = %e, "Task execution failed");
                }
            });
        }
        Ok(spawned)
    }

    /// Execute a single agent task.
    ///
    /// Lifecycle: acquire concurrency permit -> move `queued/ -> running/`
    /// -> stamp `started` -> execute via the `Executor` -> on success move
    /// to `done/`; on failure either re-queue (retry budget left) or move to
    /// `failed/`. Every write/move registers the affected path with the
    /// daemon write filter FIRST, so the filesystem watcher ignores these
    /// self-inflicted events — that ordering is load-bearing.
    #[tracing::instrument(skip(self), fields(task = ?task_path.file_name()))]
    pub async fn execute_task(&self, task_path: &Path) -> Result<(), VaultError> {
        // Concurrency gate: the permit is held for the entire execution.
        let _permit = self
            .semaphore
            .acquire()
            .await
            .map_err(|e| VaultError::InvalidEntity {
                path: task_path.to_path_buf(),
                reason: format!("Semaphore closed: {}", e),
            })?;
        let task_entity: VaultEntity<AgentTask> = filesystem::read_entity(task_path)?;
        let agent_name = &task_entity.frontmatter.agent;
        // Load agent definition
        let agent_path = self.vault_root.join("agents").join(format!("{}.md", agent_name));
        let agent_entity: VaultEntity<Agent> = filesystem::read_entity(&agent_path)?;
        // Move queued -> running
        let running_path = self
            .vault_root
            .join("todos/agent/running")
            .join(task_path.file_name().unwrap());
        self.write_filter.register(running_path.clone());
        filesystem::move_file(task_path, &running_path)?;
        // Update started timestamp
        let content = std::fs::read_to_string(&running_path)
            .map_err(|e| VaultError::io(e, &running_path))?;
        let updates = serde_json::json!({
            "started": chrono::Utc::now().to_rfc3339(),
        });
        let updated = frontmatter::update_frontmatter_fields(&content, &running_path, &updates)?;
        self.write_filter.register(running_path.clone());
        std::fs::write(&running_path, updated).map_err(|e| VaultError::io(e, &running_path))?;
        // Compose prompt from the agent definition; the task body is the
        // task-specific context.
        let system_prompt =
            vault_core::prompt::compose_prompt(&self.vault_root, &agent_entity, None)?;
        let task_context = &task_entity.body;
        let timeout = std::time::Duration::from_secs(agent_entity.frontmatter.timeout);
        // Execute
        let result = self
            .executor
            .execute(
                &agent_entity.frontmatter.executable,
                agent_entity.frontmatter.model.as_deref(),
                &system_prompt,
                task_context,
                &agent_entity.frontmatter.env,
                timeout,
            )
            .await;
        match result {
            Ok(exec_result) => {
                // Move running -> done, recording completion time and output
                // in frontmatter before the move.
                let done_path = self
                    .vault_root
                    .join("todos/agent/done")
                    .join(running_path.file_name().unwrap());
                let content = std::fs::read_to_string(&running_path)
                    .map_err(|e| VaultError::io(e, &running_path))?;
                let updates = serde_json::json!({
                    "completed": chrono::Utc::now().to_rfc3339(),
                    "output": {
                        "stdout": exec_result.stdout,
                        "duration_secs": exec_result.duration.as_secs(),
                    },
                });
                let updated =
                    frontmatter::update_frontmatter_fields(&content, &running_path, &updates)?;
                self.write_filter.register(running_path.clone());
                std::fs::write(&running_path, updated)
                    .map_err(|e| VaultError::io(e, &running_path))?;
                self.write_filter.register(done_path.clone());
                filesystem::move_file(&running_path, &done_path)?;
                tracing::info!(task = ?done_path, "Task completed successfully");
            }
            Err(exec_error) => {
                // Re-read the on-disk entity: it now carries the `started`
                // stamp and any retry count from previous attempts.
                let task_entity: VaultEntity<AgentTask> = filesystem::read_entity(&running_path)?;
                let retry = task_entity.frontmatter.retry;
                let max_retries = task_entity.frontmatter.max_retries;
                if retry < max_retries {
                    // Re-queue with incremented retry count; `started` is
                    // cleared so the next attempt stamps it fresh.
                    let content = std::fs::read_to_string(&running_path)
                        .map_err(|e| VaultError::io(e, &running_path))?;
                    let updates = serde_json::json!({
                        "retry": retry + 1,
                        "started": null,
                        "error": format!("Attempt {}: {}", retry + 1, exec_error),
                    });
                    let updated =
                        frontmatter::update_frontmatter_fields(&content, &running_path, &updates)?;
                    self.write_filter.register(running_path.clone());
                    std::fs::write(&running_path, updated)
                        .map_err(|e| VaultError::io(e, &running_path))?;
                    let queued_path = self
                        .vault_root
                        .join("todos/agent/queued")
                        .join(running_path.file_name().unwrap());
                    self.write_filter.register(queued_path.clone());
                    filesystem::move_file(&running_path, &queued_path)?;
                    tracing::warn!(
                        task = ?queued_path,
                        retry = retry + 1,
                        max_retries,
                        "Task failed, re-queued"
                    );
                } else {
                    // Retry budget exhausted: move running -> failed with a
                    // human-readable error summary in frontmatter.
                    let failed_path = self
                        .vault_root
                        .join("todos/agent/failed")
                        .join(running_path.file_name().unwrap());
                    let content = std::fs::read_to_string(&running_path)
                        .map_err(|e| VaultError::io(e, &running_path))?;
                    let error_msg = match &exec_error {
                        ExecutionError::Timeout(d) => format!("Timed out after {:?}", d),
                        ExecutionError::NonZeroExit { code, stderr } => {
                            format!("Exit code {}: {}", code, stderr)
                        }
                        other => other.to_string(),
                    };
                    let updates = serde_json::json!({
                        "completed": chrono::Utc::now().to_rfc3339(),
                        "error": error_msg,
                    });
                    let updated =
                        frontmatter::update_frontmatter_fields(&content, &running_path, &updates)?;
                    self.write_filter.register(running_path.clone());
                    std::fs::write(&running_path, updated)
                        .map_err(|e| VaultError::io(e, &running_path))?;
                    self.write_filter.register(failed_path.clone());
                    filesystem::move_file(&running_path, &failed_path)?;
                    tracing::error!(
                        task = ?failed_path,
                        error = %exec_error,
                        "Task failed permanently"
                    );
                }
            }
        }
        Ok(())
    }

    /// On startup, recover tasks that were left in running/ (daemon crashed).
    /// Move them back to queued/ for re-execution.
    pub fn recover_running_tasks(&self) -> Result<Vec<PathBuf>, VaultError> {
        let running_dir = self.vault_root.join("todos/agent/running");
        let files = filesystem::list_md_files(&running_dir)?;
        let mut recovered = Vec::new();
        for file in &files {
            let queued_path = self
                .vault_root
                .join("todos/agent/queued")
                .join(file.file_name().unwrap());
            // Reset started timestamp
            let content =
                std::fs::read_to_string(file).map_err(|e| VaultError::io(e, file))?;
            let updates = serde_json::json!({
                "started": null,
            });
            // Best-effort: a stale `started` stamp is tolerable, losing the
            // task file is not — so frontmatter/write errors are ignored and
            // only the move below can fail the recovery.
            if let Ok(updated) = frontmatter::update_frontmatter_fields(&content, file, &updates) {
                self.write_filter.register(file.clone());
                let _ = std::fs::write(file, updated);
            }
            self.write_filter.register(queued_path.clone());
            filesystem::move_file(file, &queued_path)?;
            recovered.push(queued_path);
            tracing::info!(task = ?file, "Recovered running task");
        }
        if !recovered.is_empty() {
            tracing::info!(count = recovered.len(), "Recovered tasks from previous run");
        }
        Ok(recovered)
    }
}