From 736069f1ab45a4e90703130188732b7e5ac13620 Mon Sep 17 00:00:00 2001 From: Jobdori Date: Sat, 4 Apr 2026 19:37:57 +0900 Subject: [PATCH] feat(worker_boot): classify session completion failures (P2.13) Add WorkerFailureKind::Provider variant and observe_completion() method to classify degraded session completions as structured failures. - Detects finish='unknown' + zero tokens as provider failure - Detects finish='error' as provider failure - Normal completions transition to Finished state - 2 new tests verify classification behavior This closes the gap where sessions complete but produce no output, and the failure mode wasn't machine-readable for recovery policy. ROADMAP P2.13 backlog item added. --- ROADMAP.md | 1 + rust/crates/runtime/src/worker_boot.rs | 105 ++++++++++++++++++ .../crates/runtime/tests/integration_tests.rs | 34 +++--- 3 files changed, 122 insertions(+), 18 deletions(-) diff --git a/ROADMAP.md b/ROADMAP.md index 19c11cb..c003550 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -287,6 +287,7 @@ Priority order: P0 = blocks CI/green state, P1 = blocks integration wiring, P2 = 10. MCP structured degraded-startup reporting 11. Structured task packet format 12. Lane board / machine-readable status API +13. **Session completion failure classification** — sessions that complete with `finish: "unknown"` or zero output (provider errors, context exhaustion, etc.) are not classified by `WorkerFailureKind`; worker boot state machine cannot surface these as structured failures for recovery policy **P3 — Swarm efficiency** 13. Swarm branch-lock protocol — detect same-module/same-branch collision before parallel workers drift into duplicate implementation diff --git a/rust/crates/runtime/src/worker_boot.rs b/rust/crates/runtime/src/worker_boot.rs index d276779..08b5af5 100644 --- a/rust/crates/runtime/src/worker_boot.rs +++ b/rust/crates/runtime/src/worker_boot.rs @@ -52,6 +52,7 @@ pub enum WorkerFailureKind { TrustGate, PromptDelivery, Protocol, + Provider, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -390,6 +391,60 @@ impl WorkerRegistry { ); Ok(worker.clone()) } + + /// Classify session completion and transition worker to appropriate terminal state. + /// Detects degraded completions (finish="unknown" with zero tokens) as provider failures. + pub fn observe_completion( + &self, + worker_id: &str, + finish_reason: &str, + tokens_output: u64, + ) -> Result { + let mut inner = self.inner.lock().expect("worker registry lock poisoned"); + let worker = inner + .workers + .get_mut(worker_id) + .ok_or_else(|| format!("worker not found: {worker_id}"))?; + + // Detect degraded completion: finish=unknown with zero output is provider failure + let is_provider_failure = + (finish_reason == "unknown" && tokens_output == 0) || finish_reason == "error"; + + if is_provider_failure { + let message = if finish_reason == "unknown" && tokens_output == 0 { + "session completed with finish='unknown' and zero output — provider degraded or context exhausted".to_string() + } else { + format!("session failed with finish='{finish_reason}' — provider error") + }; + + worker.last_error = Some(WorkerFailure { + kind: WorkerFailureKind::Provider, + message, + created_at: now_secs(), + }); + worker.status = WorkerStatus::Failed; + push_event( + worker, + WorkerEventKind::Failed, + WorkerStatus::Failed, + Some("provider failure classified".to_string()), + ); + } else { + // Normal completion + worker.status = WorkerStatus::Finished; + worker.last_error = None; + push_event( + worker, + WorkerEventKind::Finished, + WorkerStatus::Finished, + Some(format!( + "session completed: finish='{finish_reason}', tokens={tokens_output}" + )), + ); + } + + Ok(worker.clone()) + } } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -729,4 +784,54 @@ mod tests { .iter() .any(|event| event.kind == WorkerEventKind::Finished)); } + + #[test] + fn observe_completion_classifies_provider_failure_on_unknown_finish_zero_tokens() { + let registry = WorkerRegistry::new(); + let worker = registry.create("/tmp/repo-f", &[], true); + registry + .observe(&worker.worker_id, "Ready for input\n>") + .expect("ready observe should succeed"); + registry + .send_prompt(&worker.worker_id, Some("Run tests")) + .expect("prompt send should succeed"); + + // Simulate degraded completion: finish="unknown", zero output + let failed = registry + .observe_completion(&worker.worker_id, "unknown", 0) + .expect("completion observe should succeed"); + + assert_eq!(failed.status, WorkerStatus::Failed); + let error = failed.last_error.expect("provider error should exist"); + assert_eq!(error.kind, WorkerFailureKind::Provider); + assert!(error.message.contains("provider degraded")); + assert!(failed + .events + .iter() + .any(|event| event.kind == WorkerEventKind::Failed)); + } + + #[test] + fn observe_completion_accepts_normal_finish_with_tokens() { + let registry = WorkerRegistry::new(); + let worker = registry.create("/tmp/repo-g", &[], true); + registry + .observe(&worker.worker_id, "Ready for input\n>") + .expect("ready observe should succeed"); + registry + .send_prompt(&worker.worker_id, Some("Run tests")) + .expect("prompt send should succeed"); + + // Normal completion with output + let finished = registry + .observe_completion(&worker.worker_id, "stop", 150) + .expect("completion observe should succeed"); + + assert_eq!(finished.status, WorkerStatus::Finished); + assert!(finished.last_error.is_none()); + assert!(finished + .events + .iter() + .any(|event| event.kind == WorkerEventKind::Finished)); + } } diff --git a/rust/crates/runtime/tests/integration_tests.rs b/rust/crates/runtime/tests/integration_tests.rs index 030ff4c..3b5cddd 100644 --- a/rust/crates/runtime/tests/integration_tests.rs +++ b/rust/crates/runtime/tests/integration_tests.rs @@ -5,12 +5,12 @@ use std::time::Duration; +use runtime::green_contract::{GreenContract, GreenContractOutcome, GreenLevel}; use runtime::{ - apply_policy, BranchFreshness, DiffScope, LaneBlocker, - LaneContext, PolicyAction, PolicyCondition, PolicyEngine, PolicyRule, - ReconcileReason, ReviewStatus, StaleBranchAction, StaleBranchPolicy, + apply_policy, BranchFreshness, DiffScope, LaneBlocker, LaneContext, PolicyAction, + PolicyCondition, PolicyEngine, PolicyRule, ReconcileReason, ReviewStatus, StaleBranchAction, + StaleBranchPolicy, }; -use runtime::green_contract::{GreenLevel, GreenContract, GreenContractOutcome}; /// stale_branch + policy_engine integration: /// When a branch is detected stale, does it correctly flow through @@ -211,7 +211,7 @@ fn end_to_end_stale_lane_gets_merge_forward_action() { // when: build context and evaluate policy let context = LaneContext::new( "lane-9411", - 3, // Workspace green + 3, // Workspace green Duration::from_secs(5 * 60 * 60), // 5 hours stale, definitely over threshold LaneBlocker::None, ReviewStatus::Approved, @@ -260,7 +260,7 @@ fn end_to_end_stale_lane_gets_merge_forward_action() { fn fresh_approved_lane_gets_merge_action() { let context = LaneContext::new( "fresh-approved-lane", - 3, // Workspace green + 3, // Workspace green Duration::from_secs(30 * 60), // 30 min — under 1 hour threshold = fresh LaneBlocker::None, ReviewStatus::Approved, @@ -268,18 +268,16 @@ fn fresh_approved_lane_gets_merge_action() { false, ); - let engine = PolicyEngine::new(vec![ - PolicyRule::new( - "merge-if-green-approved-not-stale", - PolicyCondition::And(vec![ - PolicyCondition::GreenAt { level: 3 }, - PolicyCondition::ReviewPassed, - // NOT PolicyCondition::StaleBranch — fresh lanes bypass this - ]), - PolicyAction::MergeToDev, - 5, - ), - ]); + let engine = PolicyEngine::new(vec![PolicyRule::new( + "merge-if-green-approved-not-stale", + PolicyCondition::And(vec![ + PolicyCondition::GreenAt { level: 3 }, + PolicyCondition::ReviewPassed, + // NOT PolicyCondition::StaleBranch — fresh lanes bypass this + ]), + PolicyAction::MergeToDev, + 5, + )]); let actions = engine.evaluate(&context); assert_eq!(actions, vec![PolicyAction::MergeToDev]);