feat(worker_boot): classify session completion failures (P2.13)

Add WorkerFailureKind::Provider variant and observe_completion() method
to classify degraded session completions as structured failures.

- Detects finish='unknown' + zero tokens as provider failure
- Detects finish='error' as provider failure
- Normal completions transition to Finished state
- 2 new tests verify classification behavior

This closes the gap where a session completes but produces no output,
yet the failure mode is not machine-readable for recovery policy.

ROADMAP P2.13 backlog item added.
This commit is contained in:
Jobdori
2026-04-04 19:37:57 +09:00
parent 69b9232acf
commit 736069f1ab
3 changed files with 122 additions and 18 deletions

View File

@@ -287,6 +287,7 @@ Priority order: P0 = blocks CI/green state, P1 = blocks integration wiring, P2 =
10. MCP structured degraded-startup reporting
11. Structured task packet format
12. Lane board / machine-readable status API
13. **Session completion failure classification** — sessions that complete with `finish: "unknown"` or zero output (provider errors, context exhaustion, etc.) are not classified by `WorkerFailureKind`; worker boot state machine cannot surface these as structured failures for recovery policy
**P3 — Swarm efficiency**
13. Swarm branch-lock protocol — detect same-module/same-branch collision before parallel workers drift into duplicate implementation

View File

@@ -52,6 +52,7 @@ pub enum WorkerFailureKind {
TrustGate,
PromptDelivery,
Protocol,
Provider,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -390,6 +391,60 @@ impl WorkerRegistry {
);
Ok(worker.clone())
}
/// Record how a worker's session ended and move the worker into a terminal state.
///
/// A completion counts as a provider failure when `finish_reason` is `"error"`
/// (whatever the token count), or when it is `"unknown"` and `tokens_output` is
/// zero. In that case the worker is marked `Failed`, `last_error` is set to a
/// `WorkerFailureKind::Provider` failure, and a `Failed` event is pushed. Any
/// other completion marks the worker `Finished`, clears `last_error`, and pushes
/// a `Finished` event. A clone of the updated worker is returned.
///
/// # Errors
///
/// Returns `Err` when no worker is registered under `worker_id`.
///
/// # Panics
///
/// Panics if the registry lock is poisoned.
pub fn observe_completion(
    &self,
    worker_id: &str,
    finish_reason: &str,
    tokens_output: u64,
) -> Result<Worker, String> {
    let mut inner = self.inner.lock().expect("worker registry lock poisoned");
    let worker = match inner.workers.get_mut(worker_id) {
        Some(found) => found,
        None => return Err(format!("worker not found: {worker_id}")),
    };

    // Classify once: `Some(message)` means provider failure, `None` means a normal finish.
    let failure_message = match (finish_reason, tokens_output) {
        ("unknown", 0) => Some(
            "session completed with finish='unknown' and zero output — provider degraded or context exhausted".to_string(),
        ),
        ("error", _) => Some(format!(
            "session failed with finish='{finish_reason}' — provider error"
        )),
        _ => None,
    };

    match failure_message {
        Some(message) => {
            // Degraded completion: keep the structured failure on the worker before
            // the event is pushed.
            worker.last_error = Some(WorkerFailure {
                kind: WorkerFailureKind::Provider,
                message,
                created_at: now_secs(),
            });
            worker.status = WorkerStatus::Failed;
            push_event(
                worker,
                WorkerEventKind::Failed,
                WorkerStatus::Failed,
                Some("provider failure classified".to_string()),
            );
        }
        None => {
            // Normal completion: clear any earlier error and record the finish details.
            worker.status = WorkerStatus::Finished;
            worker.last_error = None;
            let detail =
                format!("session completed: finish='{finish_reason}', tokens={tokens_output}");
            push_event(
                worker,
                WorkerEventKind::Finished,
                WorkerStatus::Finished,
                Some(detail),
            );
        }
    }

    Ok(worker.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -729,4 +784,54 @@ mod tests {
.iter()
.any(|event| event.kind == WorkerEventKind::Finished));
}
#[test]
fn observe_completion_classifies_provider_failure_on_unknown_finish_zero_tokens() {
    // given: a worker that reached the ready prompt and was handed a task
    let registry = WorkerRegistry::new();
    let spawned = registry.create("/tmp/repo-f", &[], true);
    let id = &spawned.worker_id;
    registry
        .observe(id, "Ready for input\n>")
        .expect("ready observe should succeed");
    registry
        .send_prompt(id, Some("Run tests"))
        .expect("prompt send should succeed");

    // when: the session ends degraded (finish="unknown", no output tokens)
    let outcome = registry
        .observe_completion(id, "unknown", 0)
        .expect("completion observe should succeed");

    // then: the worker is failed with a structured provider error and a Failed event
    assert_eq!(outcome.status, WorkerStatus::Failed);
    let provider_error = outcome.last_error.expect("provider error should exist");
    assert_eq!(provider_error.kind, WorkerFailureKind::Provider);
    assert!(provider_error.message.contains("provider degraded"));
    let saw_failed_event = outcome
        .events
        .iter()
        .any(|recorded| recorded.kind == WorkerEventKind::Failed);
    assert!(saw_failed_event);
}
#[test]
fn observe_completion_accepts_normal_finish_with_tokens() {
    // given: a worker that reached the ready prompt and was handed a task
    let registry = WorkerRegistry::new();
    let spawned = registry.create("/tmp/repo-g", &[], true);
    let id = &spawned.worker_id;
    registry
        .observe(id, "Ready for input\n>")
        .expect("ready observe should succeed");
    registry
        .send_prompt(id, Some("Run tests"))
        .expect("prompt send should succeed");

    // when: the session ends normally and produced output
    let outcome = registry
        .observe_completion(id, "stop", 150)
        .expect("completion observe should succeed");

    // then: the worker is finished, has no error, and a Finished event was logged
    assert_eq!(outcome.status, WorkerStatus::Finished);
    assert!(outcome.last_error.is_none());
    let saw_finished_event = outcome
        .events
        .iter()
        .any(|recorded| recorded.kind == WorkerEventKind::Finished);
    assert!(saw_finished_event);
}
}

View File

@@ -5,12 +5,12 @@
use std::time::Duration;
use runtime::green_contract::{GreenContract, GreenContractOutcome, GreenLevel};
use runtime::{
apply_policy, BranchFreshness, DiffScope, LaneBlocker,
LaneContext, PolicyAction, PolicyCondition, PolicyEngine, PolicyRule,
ReconcileReason, ReviewStatus, StaleBranchAction, StaleBranchPolicy,
apply_policy, BranchFreshness, DiffScope, LaneBlocker, LaneContext, PolicyAction,
PolicyCondition, PolicyEngine, PolicyRule, ReconcileReason, ReviewStatus, StaleBranchAction,
StaleBranchPolicy,
};
use runtime::green_contract::{GreenLevel, GreenContract, GreenContractOutcome};
/// stale_branch + policy_engine integration:
/// When a branch is detected stale, does it correctly flow through
@@ -211,7 +211,7 @@ fn end_to_end_stale_lane_gets_merge_forward_action() {
// when: build context and evaluate policy
let context = LaneContext::new(
"lane-9411",
3, // Workspace green
3, // Workspace green
Duration::from_secs(5 * 60 * 60), // 5 hours stale, definitely over threshold
LaneBlocker::None,
ReviewStatus::Approved,
@@ -260,7 +260,7 @@ fn end_to_end_stale_lane_gets_merge_forward_action() {
fn fresh_approved_lane_gets_merge_action() {
let context = LaneContext::new(
"fresh-approved-lane",
3, // Workspace green
3, // Workspace green
Duration::from_secs(30 * 60), // 30 min — under 1 hour threshold = fresh
LaneBlocker::None,
ReviewStatus::Approved,
@@ -268,18 +268,16 @@ fn fresh_approved_lane_gets_merge_action() {
false,
);
let engine = PolicyEngine::new(vec![
PolicyRule::new(
"merge-if-green-approved-not-stale",
PolicyCondition::And(vec![
PolicyCondition::GreenAt { level: 3 },
PolicyCondition::ReviewPassed,
// NOT PolicyCondition::StaleBranch — fresh lanes bypass this
]),
PolicyAction::MergeToDev,
5,
),
]);
let engine = PolicyEngine::new(vec![PolicyRule::new(
"merge-if-green-approved-not-stale",
PolicyCondition::And(vec![
PolicyCondition::GreenAt { level: 3 },
PolicyCondition::ReviewPassed,
// NOT PolicyCondition::StaleBranch — fresh lanes bypass this
]),
PolicyAction::MergeToDev,
5,
)]);
let actions = engine.evaluate(&context);
assert_eq!(actions, vec![PolicyAction::MergeToDev]);