diff --git a/rust/crates/runtime/src/lib.rs b/rust/crates/runtime/src/lib.rs index 75c1b9f..b3a43d5 100644 --- a/rust/crates/runtime/src/lib.rs +++ b/rust/crates/runtime/src/lib.rs @@ -17,6 +17,7 @@ pub mod sandbox; mod session; mod sse; pub mod task_registry; +pub mod team_cron_registry; mod usage; pub use bash::{execute_bash, BashCommandInput, BashCommandOutput}; diff --git a/rust/crates/runtime/src/team_cron_registry.rs b/rust/crates/runtime/src/team_cron_registry.rs new file mode 100644 index 0000000..2fc14cc --- /dev/null +++ b/rust/crates/runtime/src/team_cron_registry.rs @@ -0,0 +1,363 @@ +//! In-memory registries for Team and Cron lifecycle management. +//! +//! Provides TeamCreate/Delete and CronCreate/Delete/List runtime backing +//! to replace the stub implementations in the tools crate. + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use serde::{Deserialize, Serialize}; + +fn now_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} + +// ───────────────────────────────────────────── +// Team registry +// ───────────────────────────────────────────── + +/// A team groups multiple tasks for parallel execution. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Team { + pub team_id: String, + pub name: String, + pub task_ids: Vec, + pub status: TeamStatus, + pub created_at: u64, + pub updated_at: u64, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum TeamStatus { + Created, + Running, + Completed, + Deleted, +} + +impl std::fmt::Display for TeamStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Created => write!(f, "created"), + Self::Running => write!(f, "running"), + Self::Completed => write!(f, "completed"), + Self::Deleted => write!(f, "deleted"), + } + } +} + +/// Thread-safe team registry. +#[derive(Debug, Clone, Default)] +pub struct TeamRegistry { + inner: Arc>, +} + +#[derive(Debug, Default)] +struct TeamInner { + teams: HashMap, + counter: u64, +} + +impl TeamRegistry { + #[must_use] + pub fn new() -> Self { + Self::default() + } + + /// Create a new team with the given name and task IDs. + pub fn create(&self, name: &str, task_ids: Vec) -> Team { + let mut inner = self.inner.lock().expect("team registry lock poisoned"); + inner.counter += 1; + let ts = now_secs(); + let team_id = format!("team_{:08x}_{}", ts, inner.counter); + let team = Team { + team_id: team_id.clone(), + name: name.to_owned(), + task_ids, + status: TeamStatus::Created, + created_at: ts, + updated_at: ts, + }; + inner.teams.insert(team_id, team.clone()); + team + } + + /// Get a team by ID. + pub fn get(&self, team_id: &str) -> Option { + let inner = self.inner.lock().expect("team registry lock poisoned"); + inner.teams.get(team_id).cloned() + } + + /// List all teams. + pub fn list(&self) -> Vec { + let inner = self.inner.lock().expect("team registry lock poisoned"); + inner.teams.values().cloned().collect() + } + + /// Delete a team. + pub fn delete(&self, team_id: &str) -> Result { + let mut inner = self.inner.lock().expect("team registry lock poisoned"); + let team = inner + .teams + .get_mut(team_id) + .ok_or_else(|| format!("team not found: {team_id}"))?; + team.status = TeamStatus::Deleted; + team.updated_at = now_secs(); + Ok(team.clone()) + } + + /// Remove a team entirely from the registry. + pub fn remove(&self, team_id: &str) -> Option { + let mut inner = self.inner.lock().expect("team registry lock poisoned"); + inner.teams.remove(team_id) + } + + #[must_use] + pub fn len(&self) -> usize { + let inner = self.inner.lock().expect("team registry lock poisoned"); + inner.teams.len() + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +// ───────────────────────────────────────────── +// Cron registry +// ───────────────────────────────────────────── + +/// A cron entry schedules a prompt to run on a recurring schedule. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CronEntry { + pub cron_id: String, + pub schedule: String, + pub prompt: String, + pub description: Option, + pub enabled: bool, + pub created_at: u64, + pub updated_at: u64, + pub last_run_at: Option, + pub run_count: u64, +} + +/// Thread-safe cron registry. +#[derive(Debug, Clone, Default)] +pub struct CronRegistry { + inner: Arc>, +} + +#[derive(Debug, Default)] +struct CronInner { + entries: HashMap, + counter: u64, +} + +impl CronRegistry { + #[must_use] + pub fn new() -> Self { + Self::default() + } + + /// Create a new cron entry. + pub fn create(&self, schedule: &str, prompt: &str, description: Option<&str>) -> CronEntry { + let mut inner = self.inner.lock().expect("cron registry lock poisoned"); + inner.counter += 1; + let ts = now_secs(); + let cron_id = format!("cron_{:08x}_{}", ts, inner.counter); + let entry = CronEntry { + cron_id: cron_id.clone(), + schedule: schedule.to_owned(), + prompt: prompt.to_owned(), + description: description.map(str::to_owned), + enabled: true, + created_at: ts, + updated_at: ts, + last_run_at: None, + run_count: 0, + }; + inner.entries.insert(cron_id, entry.clone()); + entry + } + + /// Get a cron entry by ID. + pub fn get(&self, cron_id: &str) -> Option { + let inner = self.inner.lock().expect("cron registry lock poisoned"); + inner.entries.get(cron_id).cloned() + } + + /// List all cron entries, optionally filtered to enabled only. + pub fn list(&self, enabled_only: bool) -> Vec { + let inner = self.inner.lock().expect("cron registry lock poisoned"); + inner + .entries + .values() + .filter(|e| !enabled_only || e.enabled) + .cloned() + .collect() + } + + /// Delete (remove) a cron entry. + pub fn delete(&self, cron_id: &str) -> Result { + let mut inner = self.inner.lock().expect("cron registry lock poisoned"); + inner + .entries + .remove(cron_id) + .ok_or_else(|| format!("cron not found: {cron_id}")) + } + + /// Disable a cron entry without removing it. + pub fn disable(&self, cron_id: &str) -> Result<(), String> { + let mut inner = self.inner.lock().expect("cron registry lock poisoned"); + let entry = inner + .entries + .get_mut(cron_id) + .ok_or_else(|| format!("cron not found: {cron_id}"))?; + entry.enabled = false; + entry.updated_at = now_secs(); + Ok(()) + } + + /// Record a cron run. + pub fn record_run(&self, cron_id: &str) -> Result<(), String> { + let mut inner = self.inner.lock().expect("cron registry lock poisoned"); + let entry = inner + .entries + .get_mut(cron_id) + .ok_or_else(|| format!("cron not found: {cron_id}"))?; + entry.last_run_at = Some(now_secs()); + entry.run_count += 1; + entry.updated_at = now_secs(); + Ok(()) + } + + #[must_use] + pub fn len(&self) -> usize { + let inner = self.inner.lock().expect("cron registry lock poisoned"); + inner.entries.len() + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // ── Team tests ────────────────────────────────────── + + #[test] + fn creates_and_retrieves_team() { + let registry = TeamRegistry::new(); + let team = registry.create("Alpha Squad", vec!["task_001".into(), "task_002".into()]); + assert_eq!(team.name, "Alpha Squad"); + assert_eq!(team.task_ids.len(), 2); + assert_eq!(team.status, TeamStatus::Created); + + let fetched = registry.get(&team.team_id).expect("team should exist"); + assert_eq!(fetched.team_id, team.team_id); + } + + #[test] + fn lists_and_deletes_teams() { + let registry = TeamRegistry::new(); + let t1 = registry.create("Team A", vec![]); + let t2 = registry.create("Team B", vec![]); + + let all = registry.list(); + assert_eq!(all.len(), 2); + + let deleted = registry.delete(&t1.team_id).expect("delete should succeed"); + assert_eq!(deleted.status, TeamStatus::Deleted); + + // Team is still listable (soft delete) + let still_there = registry.get(&t1.team_id).unwrap(); + assert_eq!(still_there.status, TeamStatus::Deleted); + + // Hard remove + registry.remove(&t2.team_id); + assert_eq!(registry.len(), 1); + } + + #[test] + fn rejects_missing_team_operations() { + let registry = TeamRegistry::new(); + assert!(registry.delete("nonexistent").is_err()); + assert!(registry.get("nonexistent").is_none()); + } + + // ── Cron tests ────────────────────────────────────── + + #[test] + fn creates_and_retrieves_cron() { + let registry = CronRegistry::new(); + let entry = registry.create("0 * * * *", "Check status", Some("hourly check")); + assert_eq!(entry.schedule, "0 * * * *"); + assert_eq!(entry.prompt, "Check status"); + assert!(entry.enabled); + assert_eq!(entry.run_count, 0); + assert!(entry.last_run_at.is_none()); + + let fetched = registry.get(&entry.cron_id).expect("cron should exist"); + assert_eq!(fetched.cron_id, entry.cron_id); + } + + #[test] + fn lists_with_enabled_filter() { + let registry = CronRegistry::new(); + let c1 = registry.create("* * * * *", "Task 1", None); + let c2 = registry.create("0 * * * *", "Task 2", None); + registry + .disable(&c1.cron_id) + .expect("disable should succeed"); + + let all = registry.list(false); + assert_eq!(all.len(), 2); + + let enabled_only = registry.list(true); + assert_eq!(enabled_only.len(), 1); + assert_eq!(enabled_only[0].cron_id, c2.cron_id); + } + + #[test] + fn deletes_cron_entry() { + let registry = CronRegistry::new(); + let entry = registry.create("* * * * *", "To delete", None); + let deleted = registry + .delete(&entry.cron_id) + .expect("delete should succeed"); + assert_eq!(deleted.cron_id, entry.cron_id); + assert!(registry.get(&entry.cron_id).is_none()); + assert!(registry.is_empty()); + } + + #[test] + fn records_cron_runs() { + let registry = CronRegistry::new(); + let entry = registry.create("*/5 * * * *", "Recurring", None); + registry.record_run(&entry.cron_id).unwrap(); + registry.record_run(&entry.cron_id).unwrap(); + + let fetched = registry.get(&entry.cron_id).unwrap(); + assert_eq!(fetched.run_count, 2); + assert!(fetched.last_run_at.is_some()); + } + + #[test] + fn rejects_missing_cron_operations() { + let registry = CronRegistry::new(); + assert!(registry.delete("nonexistent").is_err()); + assert!(registry.disable("nonexistent").is_err()); + assert!(registry.record_run("nonexistent").is_err()); + assert!(registry.get("nonexistent").is_none()); + } +} diff --git a/rust/crates/tools/src/lib.rs b/rust/crates/tools/src/lib.rs index a9f65ad..34f32fd 100644 --- a/rust/crates/tools/src/lib.rs +++ b/rust/crates/tools/src/lib.rs @@ -12,15 +12,28 @@ use plugins::PluginTool; use reqwest::blocking::Client; use runtime::{ edit_file, execute_bash, glob_search, grep_search, load_system_prompt, read_file, - task_registry::TaskRegistry, write_file, ApiClient, ApiRequest, AssistantEvent, - BashCommandInput, ContentBlock, ConversationMessage, ConversationRuntime, GrepSearchInput, - MessageRole, PermissionMode, PermissionPolicy, PromptCacheEvent, RuntimeError, Session, - ToolError, ToolExecutor, + task_registry::TaskRegistry, + team_cron_registry::{CronRegistry, TeamRegistry}, + write_file, ApiClient, ApiRequest, AssistantEvent, BashCommandInput, ContentBlock, + ConversationMessage, ConversationRuntime, GrepSearchInput, MessageRole, PermissionMode, + PermissionPolicy, PromptCacheEvent, RuntimeError, Session, ToolError, ToolExecutor, }; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; /// Global task registry shared across tool invocations within a session. +fn global_team_registry() -> &'static TeamRegistry { + use std::sync::OnceLock; + static REGISTRY: OnceLock = OnceLock::new(); + REGISTRY.get_or_init(TeamRegistry::new) +} + +fn global_cron_registry() -> &'static CronRegistry { + use std::sync::OnceLock; + static REGISTRY: OnceLock = OnceLock::new(); + REGISTRY.get_or_init(CronRegistry::new) +} + fn global_task_registry() -> &'static TaskRegistry { use std::sync::OnceLock; static REGISTRY: OnceLock = OnceLock::new(); @@ -1007,59 +1020,86 @@ fn run_task_output(input: TaskIdInput) -> Result { #[allow(clippy::needless_pass_by_value)] fn run_team_create(input: TeamCreateInput) -> Result { - let team_id = format!( - "team_{:08x}", - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs() - ); + let task_ids: Vec = input + .tasks + .iter() + .filter_map(|t| t.get("task_id").and_then(|v| v.as_str()).map(str::to_owned)) + .collect(); + let team = global_team_registry().create(&input.name, task_ids); + // Register team assignment on each task + for task_id in &team.task_ids { + let _ = global_task_registry().assign_team(task_id, &team.team_id); + } to_pretty_json(json!({ - "team_id": team_id, - "name": input.name, - "task_count": input.tasks.len(), - "status": "created" + "team_id": team.team_id, + "name": team.name, + "task_count": team.task_ids.len(), + "task_ids": team.task_ids, + "status": team.status, + "created_at": team.created_at })) } #[allow(clippy::needless_pass_by_value)] fn run_team_delete(input: TeamDeleteInput) -> Result { - to_pretty_json(json!({ - "team_id": input.team_id, - "status": "deleted" - })) + match global_team_registry().delete(&input.team_id) { + Ok(team) => to_pretty_json(json!({ + "team_id": team.team_id, + "name": team.name, + "status": team.status, + "message": "Team deleted" + })), + Err(e) => Err(e), + } } #[allow(clippy::needless_pass_by_value)] fn run_cron_create(input: CronCreateInput) -> Result { - let cron_id = format!( - "cron_{:08x}", - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs() - ); + let entry = + global_cron_registry().create(&input.schedule, &input.prompt, input.description.as_deref()); to_pretty_json(json!({ - "cron_id": cron_id, - "schedule": input.schedule, - "prompt": input.prompt, - "description": input.description, - "status": "created" + "cron_id": entry.cron_id, + "schedule": entry.schedule, + "prompt": entry.prompt, + "description": entry.description, + "enabled": entry.enabled, + "created_at": entry.created_at })) } #[allow(clippy::needless_pass_by_value)] fn run_cron_delete(input: CronDeleteInput) -> Result { - to_pretty_json(json!({ - "cron_id": input.cron_id, - "status": "deleted" - })) + match global_cron_registry().delete(&input.cron_id) { + Ok(entry) => to_pretty_json(json!({ + "cron_id": entry.cron_id, + "schedule": entry.schedule, + "status": "deleted", + "message": "Cron entry removed" + })), + Err(e) => Err(e), + } } fn run_cron_list(_input: Value) -> Result { + let entries: Vec<_> = global_cron_registry() + .list(false) + .into_iter() + .map(|e| { + json!({ + "cron_id": e.cron_id, + "schedule": e.schedule, + "prompt": e.prompt, + "description": e.description, + "enabled": e.enabled, + "run_count": e.run_count, + "last_run_at": e.last_run_at, + "created_at": e.created_at + }) + }) + .collect(); to_pretty_json(json!({ - "crons": [], - "message": "No scheduled tasks found" + "crons": entries, + "count": entries.len() })) }