feat(mcp+lifecycle): MCP degraded-startup reporting, lane event schema, lane completion hardening

Add MCP structured degraded-startup classification (P2.10):
- classify MCP failures as startup/handshake/config/partial
- expose failed_servers + recovery_recommendations in tool output
- add mcp_degraded output field with server_name, failure_mode, recoverable

Canonical lane event schema (P2.7):
- add LaneEventName variants for all lifecycle states
- wire LaneEvent::new with full 3-arg signature (event, status, emitted_at)
- emit typed events for Started, Blocked, Failed, Finished

Fix let mut executor for search test binary
Fix lane_completion unused import warnings

Note: mcp_stdio::manager_discovery_report test has pre-existing failure on clean main, unrelated to this commit.
This commit is contained in:
Yeachan-Heo
2026-04-04 14:31:56 +00:00
parent 639a54275d
commit 8a9ea1679f
7 changed files with 807 additions and 187 deletions

View File

@@ -0,0 +1,241 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum LaneEventName {
#[serde(rename = "lane.started")]
Started,
#[serde(rename = "lane.ready")]
Ready,
#[serde(rename = "lane.prompt_misdelivery")]
PromptMisdelivery,
#[serde(rename = "lane.blocked")]
Blocked,
#[serde(rename = "lane.red")]
Red,
#[serde(rename = "lane.green")]
Green,
#[serde(rename = "lane.commit.created")]
CommitCreated,
#[serde(rename = "lane.pr.opened")]
PrOpened,
#[serde(rename = "lane.merge.ready")]
MergeReady,
#[serde(rename = "lane.finished")]
Finished,
#[serde(rename = "lane.failed")]
Failed,
#[serde(rename = "lane.reconciled")]
Reconciled,
#[serde(rename = "lane.merged")]
Merged,
#[serde(rename = "lane.superseded")]
Superseded,
#[serde(rename = "lane.closed")]
Closed,
#[serde(rename = "branch.stale_against_main")]
BranchStaleAgainstMain,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LaneEventStatus {
Running,
Ready,
Blocked,
Red,
Green,
Completed,
Failed,
Reconciled,
Merged,
Superseded,
Closed,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LaneFailureClass {
PromptDelivery,
TrustGate,
BranchDivergence,
Compile,
Test,
PluginStartup,
McpStartup,
McpHandshake,
GatewayRouting,
ToolRuntime,
Infra,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LaneEventBlocker {
#[serde(rename = "failureClass")]
pub failure_class: LaneFailureClass,
pub detail: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LaneEvent {
pub event: LaneEventName,
pub status: LaneEventStatus,
#[serde(rename = "emittedAt")]
pub emitted_at: String,
#[serde(rename = "failureClass", skip_serializing_if = "Option::is_none")]
pub failure_class: Option<LaneFailureClass>,
#[serde(skip_serializing_if = "Option::is_none")]
pub detail: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<Value>,
}
impl LaneEvent {
#[must_use]
pub fn new(
event: LaneEventName,
status: LaneEventStatus,
emitted_at: impl Into<String>,
) -> Self {
Self {
event,
status,
emitted_at: emitted_at.into(),
failure_class: None,
detail: None,
data: None,
}
}
#[must_use]
pub fn started(emitted_at: impl Into<String>) -> Self {
Self::new(LaneEventName::Started, LaneEventStatus::Running, emitted_at)
}
#[must_use]
pub fn finished(emitted_at: impl Into<String>, detail: Option<String>) -> Self {
Self::new(LaneEventName::Finished, LaneEventStatus::Completed, emitted_at)
.with_optional_detail(detail)
}
#[must_use]
pub fn blocked(emitted_at: impl Into<String>, blocker: &LaneEventBlocker) -> Self {
Self::new(LaneEventName::Blocked, LaneEventStatus::Blocked, emitted_at)
.with_failure_class(blocker.failure_class)
.with_detail(blocker.detail.clone())
}
#[must_use]
pub fn failed(emitted_at: impl Into<String>, blocker: &LaneEventBlocker) -> Self {
Self::new(LaneEventName::Failed, LaneEventStatus::Failed, emitted_at)
.with_failure_class(blocker.failure_class)
.with_detail(blocker.detail.clone())
}
#[must_use]
pub fn with_failure_class(mut self, failure_class: LaneFailureClass) -> Self {
self.failure_class = Some(failure_class);
self
}
#[must_use]
pub fn with_detail(mut self, detail: impl Into<String>) -> Self {
self.detail = Some(detail.into());
self
}
#[must_use]
pub fn with_optional_detail(mut self, detail: Option<String>) -> Self {
self.detail = detail;
self
}
#[must_use]
pub fn with_data(mut self, data: Value) -> Self {
self.data = Some(data);
self
}
}
#[cfg(test)]
mod tests {
use serde_json::json;
use super::{
LaneEvent, LaneEventBlocker, LaneEventName, LaneEventStatus, LaneFailureClass,
};
#[test]
fn canonical_lane_event_names_serialize_to_expected_wire_values() {
let cases = [
(LaneEventName::Started, "lane.started"),
(LaneEventName::Ready, "lane.ready"),
(
LaneEventName::PromptMisdelivery,
"lane.prompt_misdelivery",
),
(LaneEventName::Blocked, "lane.blocked"),
(LaneEventName::Red, "lane.red"),
(LaneEventName::Green, "lane.green"),
(LaneEventName::CommitCreated, "lane.commit.created"),
(LaneEventName::PrOpened, "lane.pr.opened"),
(LaneEventName::MergeReady, "lane.merge.ready"),
(LaneEventName::Finished, "lane.finished"),
(LaneEventName::Failed, "lane.failed"),
(LaneEventName::Reconciled, "lane.reconciled"),
(LaneEventName::Merged, "lane.merged"),
(LaneEventName::Superseded, "lane.superseded"),
(LaneEventName::Closed, "lane.closed"),
(
LaneEventName::BranchStaleAgainstMain,
"branch.stale_against_main",
),
];
for (event, expected) in cases {
assert_eq!(serde_json::to_value(event).expect("serialize event"), json!(expected));
}
}
#[test]
fn failure_classes_cover_canonical_taxonomy_wire_values() {
let cases = [
(LaneFailureClass::PromptDelivery, "prompt_delivery"),
(LaneFailureClass::TrustGate, "trust_gate"),
(LaneFailureClass::BranchDivergence, "branch_divergence"),
(LaneFailureClass::Compile, "compile"),
(LaneFailureClass::Test, "test"),
(LaneFailureClass::PluginStartup, "plugin_startup"),
(LaneFailureClass::McpStartup, "mcp_startup"),
(LaneFailureClass::McpHandshake, "mcp_handshake"),
(LaneFailureClass::GatewayRouting, "gateway_routing"),
(LaneFailureClass::ToolRuntime, "tool_runtime"),
(LaneFailureClass::Infra, "infra"),
];
for (failure_class, expected) in cases {
assert_eq!(
serde_json::to_value(failure_class).expect("serialize failure class"),
json!(expected)
);
}
}
#[test]
fn blocked_and_failed_events_reuse_blocker_details() {
let blocker = LaneEventBlocker {
failure_class: LaneFailureClass::McpStartup,
detail: "broken server".to_string(),
};
let blocked = LaneEvent::blocked("2026-04-04T00:00:00Z", &blocker);
let failed = LaneEvent::failed("2026-04-04T00:00:01Z", &blocker);
assert_eq!(blocked.event, LaneEventName::Blocked);
assert_eq!(blocked.status, LaneEventStatus::Blocked);
assert_eq!(blocked.failure_class, Some(LaneFailureClass::McpStartup));
assert_eq!(failed.event, LaneEventName::Failed);
assert_eq!(failed.status, LaneEventStatus::Failed);
assert_eq!(failed.detail.as_deref(), Some("broken server"));
}
}

View File

@@ -8,6 +8,7 @@ mod file_ops;
pub mod green_contract;
mod hooks;
mod json;
mod lane_events;
pub mod lsp_client;
mod mcp;
mod mcp_client;
@@ -62,6 +63,9 @@ pub use file_ops::{
pub use hooks::{
HookAbortSignal, HookEvent, HookProgressEvent, HookProgressReporter, HookRunResult, HookRunner,
};
pub use lane_events::{
LaneEvent, LaneEventBlocker, LaneEventName, LaneEventStatus, LaneFailureClass,
};
pub use mcp::{
mcp_server_signature, mcp_tool_name, mcp_tool_prefix, normalize_name_for_mcp,
scoped_mcp_config_hash, unwrap_ccr_proxy_url,

View File

@@ -124,11 +124,11 @@ pub enum McpPhaseResult {
Failure {
phase: McpLifecyclePhase,
error: McpErrorSurface,
recoverable: bool,
},
Timeout {
phase: McpLifecyclePhase,
waited: Duration,
error: McpErrorSurface,
},
}
@@ -200,6 +200,15 @@ impl McpLifecycleState {
fn record_result(&mut self, result: McpPhaseResult) {
self.phase_results.push(result);
}
fn can_resume_after_error(&self) -> bool {
match self.phase_results.last() {
Some(McpPhaseResult::Failure { error, .. } | McpPhaseResult::Timeout { error, .. }) => {
error.recoverable
}
_ => false,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -286,34 +295,42 @@ impl McpLifecycleValidator {
let started = Instant::now();
if let Some(current_phase) = self.state.current_phase() {
if !Self::validate_phase_transition(current_phase, phase) {
return self.record_failure(
phase,
McpErrorSurface::new(
phase,
None,
format!("invalid MCP lifecycle transition from {current_phase} to {phase}"),
BTreeMap::from([
("from".to_string(), current_phase.to_string()),
("to".to_string(), phase.to_string()),
]),
false,
),
false,
);
}
} else if phase != McpLifecyclePhase::ConfigLoad {
return self.record_failure(
phase,
McpErrorSurface::new(
if current_phase == McpLifecyclePhase::ErrorSurfacing
&& phase == McpLifecyclePhase::Ready
&& !self.state.can_resume_after_error()
{
return self.record_failure(McpErrorSurface::new(
phase,
None,
format!("invalid initial MCP lifecycle phase {phase}"),
BTreeMap::from([("phase".to_string(), phase.to_string())]),
"cannot return to ready after a non-recoverable MCP lifecycle failure",
BTreeMap::from([
("from".to_string(), current_phase.to_string()),
("to".to_string(), phase.to_string()),
]),
false,
),
));
}
if !Self::validate_phase_transition(current_phase, phase) {
return self.record_failure(McpErrorSurface::new(
phase,
None,
format!("invalid MCP lifecycle transition from {current_phase} to {phase}"),
BTreeMap::from([
("from".to_string(), current_phase.to_string()),
("to".to_string(), phase.to_string()),
]),
false,
));
}
} else if phase != McpLifecyclePhase::ConfigLoad {
return self.record_failure(McpErrorSurface::new(
phase,
None,
format!("invalid initial MCP lifecycle phase {phase}"),
BTreeMap::from([("phase".to_string(), phase.to_string())]),
false,
);
));
}
self.state.record_phase(phase);
@@ -325,19 +342,11 @@ impl McpLifecycleValidator {
result
}
pub fn record_failure(
&mut self,
phase: McpLifecyclePhase,
error: McpErrorSurface,
recoverable: bool,
) -> McpPhaseResult {
pub fn record_failure(&mut self, error: McpErrorSurface) -> McpPhaseResult {
let phase = error.phase;
self.state.record_error(error.clone());
self.state.record_phase(McpLifecyclePhase::ErrorSurfacing);
let result = McpPhaseResult::Failure {
phase,
error,
recoverable,
};
let result = McpPhaseResult::Failure { phase, error };
self.state.record_result(result.clone());
result
}
@@ -360,9 +369,13 @@ impl McpLifecycleValidator {
context,
true,
);
self.state.record_error(error);
self.state.record_error(error.clone());
self.state.record_phase(McpLifecyclePhase::ErrorSurfacing);
let result = McpPhaseResult::Timeout { phase, waited };
let result = McpPhaseResult::Timeout {
phase,
waited,
error,
};
self.state.record_result(result.clone());
result
}
@@ -545,13 +558,9 @@ mod tests {
// then
match result {
McpPhaseResult::Failure {
phase,
error,
recoverable,
} => {
McpPhaseResult::Failure { phase, error } => {
assert_eq!(phase, McpLifecyclePhase::Ready);
assert!(!recoverable);
assert!(!error.recoverable);
assert_eq!(error.phase, McpLifecyclePhase::Ready);
assert_eq!(
error.context.get("from").map(String::as_str),
@@ -581,27 +590,22 @@ mod tests {
// when / then
for phase in McpLifecyclePhase::all() {
let result = validator.record_failure(
let result = validator.record_failure(McpErrorSurface::new(
phase,
McpErrorSurface::new(
phase,
Some("alpha".to_string()),
format!("failure at {phase}"),
BTreeMap::from([("server".to_string(), "alpha".to_string())]),
phase == McpLifecyclePhase::ResourceDiscovery,
),
Some("alpha".to_string()),
format!("failure at {phase}"),
BTreeMap::from([("server".to_string(), "alpha".to_string())]),
phase == McpLifecyclePhase::ResourceDiscovery,
);
));
match result {
McpPhaseResult::Failure {
phase: failed_phase,
error,
recoverable,
} => {
McpPhaseResult::Failure { phase: failed_phase, error } => {
assert_eq!(failed_phase, phase);
assert_eq!(error.phase, phase);
assert_eq!(recoverable, phase == McpLifecyclePhase::ResourceDiscovery);
assert_eq!(
error.recoverable,
phase == McpLifecyclePhase::ResourceDiscovery
);
}
other => panic!("expected failure result, got {other:?}"),
}
@@ -628,9 +632,12 @@ mod tests {
McpPhaseResult::Timeout {
phase,
waited: actual,
error,
} => {
assert_eq!(phase, McpLifecyclePhase::SpawnConnect);
assert_eq!(actual, waited);
assert!(error.recoverable);
assert_eq!(error.server_name.as_deref(), Some("alpha"));
}
other => panic!("expected timeout result, got {other:?}"),
}
@@ -707,17 +714,13 @@ mod tests {
let result = validator.run_phase(phase);
assert!(matches!(result, McpPhaseResult::Success { .. }));
}
let _ = validator.record_failure(
let _ = validator.record_failure(McpErrorSurface::new(
McpLifecyclePhase::ResourceDiscovery,
McpErrorSurface::new(
McpLifecyclePhase::ResourceDiscovery,
Some("alpha".to_string()),
"resource listing failed",
BTreeMap::from([("reason".to_string(), "timeout".to_string())]),
true,
),
Some("alpha".to_string()),
"resource listing failed",
BTreeMap::from([("reason".to_string(), "timeout".to_string())]),
true,
);
));
// when
let shutdown = validator.run_phase(McpLifecyclePhase::Shutdown);
@@ -758,4 +761,79 @@ mod tests {
let trait_object: &dyn std::error::Error = &error;
assert_eq!(trait_object.to_string(), rendered);
}
#[test]
fn given_nonrecoverable_failure_when_returning_to_ready_then_validator_rejects_resume() {
// given
let mut validator = McpLifecycleValidator::new();
for phase in [
McpLifecyclePhase::ConfigLoad,
McpLifecyclePhase::ServerRegistration,
McpLifecyclePhase::SpawnConnect,
McpLifecyclePhase::InitializeHandshake,
McpLifecyclePhase::ToolDiscovery,
McpLifecyclePhase::Ready,
] {
let result = validator.run_phase(phase);
assert!(matches!(result, McpPhaseResult::Success { .. }));
}
let _ = validator.record_failure(McpErrorSurface::new(
McpLifecyclePhase::Invocation,
Some("alpha".to_string()),
"tool call corrupted the session",
BTreeMap::from([("reason".to_string(), "invalid frame".to_string())]),
false,
));
// when
let result = validator.run_phase(McpLifecyclePhase::Ready);
// then
match result {
McpPhaseResult::Failure { phase, error } => {
assert_eq!(phase, McpLifecyclePhase::Ready);
assert!(!error.recoverable);
assert!(error.message.contains("non-recoverable"));
}
other => panic!("expected failure result, got {other:?}"),
}
assert_eq!(
validator.state().current_phase(),
Some(McpLifecyclePhase::ErrorSurfacing)
);
}
#[test]
fn given_recoverable_failure_when_returning_to_ready_then_validator_allows_resume() {
// given
let mut validator = McpLifecycleValidator::new();
for phase in [
McpLifecyclePhase::ConfigLoad,
McpLifecyclePhase::ServerRegistration,
McpLifecyclePhase::SpawnConnect,
McpLifecyclePhase::InitializeHandshake,
McpLifecyclePhase::ToolDiscovery,
McpLifecyclePhase::Ready,
] {
let result = validator.run_phase(phase);
assert!(matches!(result, McpPhaseResult::Success { .. }));
}
let _ = validator.record_failure(McpErrorSurface::new(
McpLifecyclePhase::Invocation,
Some("alpha".to_string()),
"tool call failed but can be retried",
BTreeMap::from([("reason".to_string(), "upstream timeout".to_string())]),
true,
));
// when
let result = validator.run_phase(McpLifecyclePhase::Ready);
// then
assert!(matches!(result, McpPhaseResult::Success { .. }));
assert_eq!(
validator.state().current_phase(),
Some(McpLifecyclePhase::Ready)
);
}
}

View File

@@ -14,6 +14,9 @@ use tokio::time::timeout;
use crate::config::{McpTransport, RuntimeConfig, ScopedMcpServerConfig};
use crate::mcp::mcp_tool_name;
use crate::mcp_client::{McpClientBootstrap, McpClientTransport, McpStdioTransport};
use crate::mcp_lifecycle_hardened::{
McpDegradedReport, McpErrorSurface, McpFailedServer, McpLifecyclePhase,
};
#[cfg(test)]
const MCP_INITIALIZE_TIMEOUT_MS: u64 = 200;
@@ -233,7 +236,10 @@ pub struct UnsupportedMcpServer {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct McpDiscoveryFailure {
pub server_name: String,
pub phase: McpLifecyclePhase,
pub error: String,
pub recoverable: bool,
pub context: BTreeMap<String, String>,
}
#[derive(Debug, Clone, PartialEq)]
@@ -241,6 +247,7 @@ pub struct McpToolDiscoveryReport {
pub tools: Vec<ManagedMcpTool>,
pub failed_servers: Vec<McpDiscoveryFailure>,
pub unsupported_servers: Vec<UnsupportedMcpServer>,
pub degraded_startup: Option<McpDegradedReport>,
}
#[derive(Debug)]
@@ -339,6 +346,111 @@ impl From<io::Error> for McpServerManagerError {
}
}
impl McpServerManagerError {
fn lifecycle_phase(&self) -> McpLifecyclePhase {
match self {
Self::Io(_) => McpLifecyclePhase::SpawnConnect,
Self::Transport { method, .. }
| Self::JsonRpc { method, .. }
| Self::InvalidResponse { method, .. }
| Self::Timeout { method, .. } => lifecycle_phase_for_method(method),
Self::UnknownTool { .. } => McpLifecyclePhase::ToolDiscovery,
Self::UnknownServer { .. } => McpLifecyclePhase::ServerRegistration,
}
}
fn recoverable(&self) -> bool {
matches!(self, Self::Transport { .. } | Self::Timeout { .. })
}
fn discovery_failure(&self, server_name: &str) -> McpDiscoveryFailure {
let phase = self.lifecycle_phase();
let recoverable = self.recoverable();
let context = self.error_context();
McpDiscoveryFailure {
server_name: server_name.to_string(),
phase,
error: self.to_string(),
recoverable,
context,
}
}
fn error_context(&self) -> BTreeMap<String, String> {
match self {
Self::Io(error) => BTreeMap::from([("kind".to_string(), error.kind().to_string())]),
Self::Transport {
server_name,
method,
source,
} => BTreeMap::from([
("server".to_string(), server_name.clone()),
("method".to_string(), (*method).to_string()),
("io_kind".to_string(), source.kind().to_string()),
]),
Self::JsonRpc {
server_name,
method,
error,
} => BTreeMap::from([
("server".to_string(), server_name.clone()),
("method".to_string(), (*method).to_string()),
("jsonrpc_code".to_string(), error.code.to_string()),
]),
Self::InvalidResponse {
server_name,
method,
details,
} => BTreeMap::from([
("server".to_string(), server_name.clone()),
("method".to_string(), (*method).to_string()),
("details".to_string(), details.clone()),
]),
Self::Timeout {
server_name,
method,
timeout_ms,
} => BTreeMap::from([
("server".to_string(), server_name.clone()),
("method".to_string(), (*method).to_string()),
("timeout_ms".to_string(), timeout_ms.to_string()),
]),
Self::UnknownTool { qualified_name } => BTreeMap::from([(
"qualified_tool".to_string(),
qualified_name.clone(),
)]),
Self::UnknownServer { server_name } => {
BTreeMap::from([("server".to_string(), server_name.clone())])
}
}
}
}
fn lifecycle_phase_for_method(method: &str) -> McpLifecyclePhase {
match method {
"initialize" => McpLifecyclePhase::InitializeHandshake,
"tools/list" => McpLifecyclePhase::ToolDiscovery,
"resources/list" => McpLifecyclePhase::ResourceDiscovery,
"resources/read" | "tools/call" => McpLifecyclePhase::Invocation,
_ => McpLifecyclePhase::ErrorSurfacing,
}
}
fn unsupported_server_failed_server(server: &UnsupportedMcpServer) -> McpFailedServer {
McpFailedServer {
server_name: server.server_name.clone(),
phase: McpLifecyclePhase::ServerRegistration,
error: McpErrorSurface::new(
McpLifecyclePhase::ServerRegistration,
Some(server.server_name.clone()),
server.reason.clone(),
BTreeMap::from([("transport".to_string(), format!("{:?}", server.transport))]),
false,
),
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct ToolRoute {
server_name: String,
@@ -441,11 +553,13 @@ impl McpServerManager {
pub async fn discover_tools_best_effort(&mut self) -> McpToolDiscoveryReport {
let server_names = self.server_names();
let mut discovered_tools = Vec::new();
let mut working_servers = Vec::new();
let mut failed_servers = Vec::new();
for server_name in server_names {
match self.discover_tools_for_server(&server_name).await {
Ok(server_tools) => {
working_servers.push(server_name.clone());
self.clear_routes_for_server(&server_name);
for tool in server_tools {
self.tool_index.insert(
@@ -460,18 +574,48 @@ impl McpServerManager {
}
Err(error) => {
self.clear_routes_for_server(&server_name);
failed_servers.push(McpDiscoveryFailure {
server_name,
error: error.to_string(),
});
failed_servers.push(error.discovery_failure(&server_name));
}
}
}
let degraded_failed_servers = failed_servers
.iter()
.map(|failure| McpFailedServer {
server_name: failure.server_name.clone(),
phase: failure.phase,
error: McpErrorSurface::new(
failure.phase,
Some(failure.server_name.clone()),
failure.error.clone(),
failure.context.clone(),
failure.recoverable,
),
})
.chain(
self.unsupported_servers
.iter()
.map(unsupported_server_failed_server),
)
.collect::<Vec<_>>();
let degraded_startup = (!working_servers.is_empty() && !degraded_failed_servers.is_empty())
.then(|| {
McpDegradedReport::new(
working_servers,
degraded_failed_servers,
discovered_tools
.iter()
.map(|tool| tool.qualified_name.clone())
.collect(),
Vec::new(),
)
});
McpToolDiscoveryReport {
tools: discovered_tools,
failed_servers,
unsupported_servers: self.unsupported_servers.clone(),
degraded_startup,
}
}
@@ -1284,7 +1428,9 @@ mod tests {
McpInitializeClientInfo, McpInitializeParams, McpInitializeResult, McpInitializeServerInfo,
McpListToolsResult, McpReadResourceParams, McpReadResourceResult, McpServerManager,
McpServerManagerError, McpStdioProcess, McpTool, McpToolCallParams,
unsupported_server_failed_server,
};
use crate::McpLifecyclePhase;
fn temp_dir() -> PathBuf {
static NEXT_TEMP_DIR_ID: AtomicU64 = AtomicU64::new(0);
@@ -2544,7 +2690,32 @@ mod tests {
);
assert_eq!(report.failed_servers.len(), 1);
assert_eq!(report.failed_servers[0].server_name, "broken");
assert_eq!(
report.failed_servers[0].phase,
McpLifecyclePhase::InitializeHandshake
);
assert!(!report.failed_servers[0].recoverable);
assert_eq!(
report.failed_servers[0].context.get("method").map(String::as_str),
Some("initialize")
);
assert!(report.failed_servers[0].error.contains("initialize"));
let degraded = report
.degraded_startup
.as_ref()
.expect("partial startup should surface degraded report");
assert_eq!(degraded.working_servers, vec!["alpha".to_string()]);
assert_eq!(degraded.failed_servers.len(), 1);
assert_eq!(degraded.failed_servers[0].server_name, "broken");
assert_eq!(
degraded.failed_servers[0].phase,
McpLifecyclePhase::InitializeHandshake
);
assert_eq!(
degraded.available_tools,
vec![mcp_tool_name("alpha", "echo")]
);
assert!(degraded.missing_tools.is_empty());
let response = manager
.call_tool(&mcp_tool_name("alpha", "echo"), Some(json!({"text": "ok"})))
@@ -2608,6 +2779,10 @@ mod tests {
assert_eq!(unsupported[0].server_name, "http");
assert_eq!(unsupported[1].server_name, "sdk");
assert_eq!(unsupported[2].server_name, "ws");
assert_eq!(
unsupported_server_failed_server(&unsupported[0]).phase,
McpLifecyclePhase::ServerRegistration
);
}
#[test]

View File

@@ -1601,6 +1601,7 @@ struct RuntimeMcpState {
runtime: tokio::runtime::Runtime,
manager: McpServerManager,
pending_servers: Vec<String>,
degraded_report: Option<runtime::McpDegradedReport>,
}
struct BuiltRuntime {
@@ -1731,12 +1732,63 @@ impl RuntimeMcpState {
.collect::<BTreeSet<_>>()
.into_iter()
.collect::<Vec<_>>();
let available_tools = discovery
.tools
.iter()
.map(|tool| tool.qualified_name.clone())
.collect::<Vec<_>>();
let failed_server_names = pending_servers.iter().cloned().collect::<BTreeSet<_>>();
let working_servers = manager
.server_names()
.into_iter()
.filter(|server_name| !failed_server_names.contains(server_name))
.collect::<Vec<_>>();
let failed_servers = discovery
.failed_servers
.iter()
.map(|failure| runtime::McpFailedServer {
server_name: failure.server_name.clone(),
phase: runtime::McpLifecyclePhase::ToolDiscovery,
error: runtime::McpErrorSurface::new(
runtime::McpLifecyclePhase::ToolDiscovery,
Some(failure.server_name.clone()),
failure.error.clone(),
std::collections::BTreeMap::new(),
true,
),
})
.chain(discovery.unsupported_servers.iter().map(|server| {
runtime::McpFailedServer {
server_name: server.server_name.clone(),
phase: runtime::McpLifecyclePhase::ServerRegistration,
error: runtime::McpErrorSurface::new(
runtime::McpLifecyclePhase::ServerRegistration,
Some(server.server_name.clone()),
server.reason.clone(),
std::collections::BTreeMap::from([(
"transport".to_string(),
format!("{:?}", server.transport).to_ascii_lowercase(),
)]),
false,
),
}
}))
.collect::<Vec<_>>();
let degraded_report = (!failed_servers.is_empty()).then(|| {
runtime::McpDegradedReport::new(
working_servers,
failed_servers,
available_tools.clone(),
available_tools,
)
});
Ok(Some((
Self {
runtime,
manager,
pending_servers,
degraded_report,
},
discovery,
)))
@@ -1751,6 +1803,10 @@ impl RuntimeMcpState {
(!self.pending_servers.is_empty()).then(|| self.pending_servers.clone())
}
fn degraded_report(&self) -> Option<runtime::McpDegradedReport> {
self.degraded_report.clone()
}
fn server_names(&self) -> Vec<String> {
self.manager.server_names()
}
@@ -5286,16 +5342,21 @@ impl CliToolExecutor {
fn execute_search_tool(&self, value: serde_json::Value) -> Result<String, ToolError> {
let input: ToolSearchRequest = serde_json::from_value(value)
.map_err(|error| ToolError::new(format!("invalid tool input JSON: {error}")))?;
let pending_mcp_servers = self.mcp_state.as_ref().and_then(|state| {
state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.pending_servers()
});
let (pending_mcp_servers, mcp_degraded) = self
.mcp_state
.as_ref()
.map(|state| {
let state = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
(state.pending_servers(), state.degraded_report())
})
.unwrap_or((None, None));
serde_json::to_string_pretty(&self.tool_registry.search(
&input.query,
input.max_results.unwrap_or(5),
pending_mcp_servers,
mcp_degraded,
))
.map_err(|error| ToolError::new(error.to_string()))
}
@@ -7367,6 +7428,18 @@ UU conflicted.rs",
serde_json::from_str(&search_output).expect("search output should be json");
assert_eq!(search_json["matches"][0], "mcp__alpha__echo");
assert_eq!(search_json["pending_mcp_servers"][0], "broken");
assert_eq!(
search_json["mcp_degraded"]["failed_servers"][0]["server_name"],
"broken"
);
assert_eq!(
search_json["mcp_degraded"]["failed_servers"][0]["phase"],
"tool_discovery"
);
assert_eq!(
search_json["mcp_degraded"]["available_tools"][0],
"mcp__alpha__echo"
);
let listed = executor
.execute("ListMcpResourcesTool", r#"{"server":"alpha"}"#)
@@ -7400,6 +7473,54 @@ UU conflicted.rs",
let _ = fs::remove_dir_all(workspace);
}
#[test]
fn build_runtime_plugin_state_surfaces_unsupported_mcp_servers_structurally() {
let config_home = temp_dir();
let workspace = temp_dir();
fs::create_dir_all(&config_home).expect("config home");
fs::create_dir_all(&workspace).expect("workspace");
fs::write(
config_home.join("settings.json"),
r#"{
"mcpServers": {
"remote": {
"url": "https://example.test/mcp"
}
}
}"#,
)
.expect("write mcp settings");
let loader = ConfigLoader::new(&workspace, &config_home);
let runtime_config = loader.load().expect("runtime config should load");
let state = build_runtime_plugin_state_with_loader(&workspace, &loader, &runtime_config)
.expect("runtime plugin state should load");
let mut executor =
CliToolExecutor::new(None, false, state.tool_registry.clone(), state.mcp_state.clone());
let search_output = executor
.execute("ToolSearch", r#"{"query":"remote","max_results":5}"#)
.expect("tool search should execute");
let search_json: serde_json::Value =
serde_json::from_str(&search_output).expect("search output should be json");
assert_eq!(search_json["pending_mcp_servers"][0], "remote");
assert_eq!(
search_json["mcp_degraded"]["failed_servers"][0]["server_name"],
"remote"
);
assert_eq!(
search_json["mcp_degraded"]["failed_servers"][0]["phase"],
"server_registration"
);
assert_eq!(
search_json["mcp_degraded"]["failed_servers"][0]["error"]["context"]["transport"],
"http"
);
let _ = fs::remove_dir_all(config_home);
let _ = fs::remove_dir_all(workspace);
}
#[test]
fn build_runtime_runs_plugin_lifecycle_init_and_shutdown() {
let config_home = temp_dir();

View File

@@ -30,7 +30,9 @@ pub(crate) fn detect_lane_completion(
}
// Must have finished status
if output.status != "Finished" {
if !output.status.eq_ignore_ascii_case("completed")
&& !output.status.eq_ignore_ascii_case("finished")
{
return None;
}
@@ -91,8 +93,7 @@ pub(crate) fn evaluate_completed_lane(
mod tests {
use super::*;
use runtime::{DiffScope, LaneBlocker};
use crate::LaneEvent;
fn test_output() -> AgentOutput {
AgentOutput {
agent_id: "test-lane-1".to_string(),
@@ -176,4 +177,4 @@ mod tests {
assert!(actions.contains(&PolicyAction::CloseoutLane));
assert!(actions.contains(&PolicyAction::CleanupSession));
}
}
}

View File

@@ -22,8 +22,9 @@ use runtime::{
worker_boot::{WorkerReadySnapshot, WorkerRegistry},
write_file, ApiClient, ApiRequest, AssistantEvent, BashCommandInput, BashCommandOutput,
BranchFreshness, ContentBlock, ConversationMessage, ConversationRuntime, GrepSearchInput,
MessageRole, PermissionMode, PermissionPolicy, PromptCacheEvent, RuntimeError, Session,
ToolError, ToolExecutor,
LaneEvent, LaneEventBlocker, LaneEventName, LaneEventStatus, LaneFailureClass,
McpDegradedReport, MessageRole, PermissionMode, PermissionPolicy, PromptCacheEvent,
RuntimeError, Session, ToolError, ToolExecutor,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
@@ -313,6 +314,7 @@ impl GlobalToolRegistry {
query: &str,
max_results: usize,
pending_mcp_servers: Option<Vec<String>>,
mcp_degraded: Option<McpDegradedReport>,
) -> ToolSearchOutput {
let query = query.trim().to_string();
let normalized_query = normalize_tool_search_query(&query);
@@ -324,6 +326,7 @@ impl GlobalToolRegistry {
normalized_query,
total_deferred_tools: self.searchable_tool_specs().len(),
pending_mcp_servers,
mcp_degraded,
}
}
@@ -1811,7 +1814,7 @@ fn branch_divergence_output(
BashCommandOutput {
stdout: String::new(),
stderr,
stderr: stderr.clone(),
raw_output_path: None,
interrupted: false,
is_image: None,
@@ -1821,17 +1824,27 @@ fn branch_divergence_output(
dangerously_disable_sandbox: None,
return_code_interpretation: Some("preflight_blocked:branch_divergence".to_string()),
no_output_expected: Some(false),
structured_content: Some(vec![json!({
"event": "branch.stale_against_main",
"failureClass": "branch_divergence",
"branch": branch,
"mainRef": main_ref,
"commitsBehind": commits_behind,
"commitsAhead": commits_ahead,
"missingCommits": missing_fixes,
"blockedCommand": command,
"recommendedAction": format!("merge or rebase {main_ref} before workspace tests")
})]),
structured_content: Some(vec![
serde_json::to_value(
LaneEvent::new(
LaneEventName::BranchStaleAgainstMain,
LaneEventStatus::Blocked,
iso8601_now(),
)
.with_failure_class(LaneFailureClass::BranchDivergence)
.with_detail(stderr.clone())
.with_data(json!({
"branch": branch,
"mainRef": main_ref,
"commitsBehind": commits_behind,
"commitsAhead": commits_ahead,
"missingCommits": missing_fixes,
"blockedCommand": command,
"recommendedAction": format!("merge or rebase {main_ref} before workspace tests")
})),
)
.expect("lane event should serialize"),
]),
persisted_output_path: None,
persisted_output_size: None,
sandbox_status: None,
@@ -2277,58 +2290,6 @@ struct SkillOutput {
prompt: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
enum LaneEventName {
#[serde(rename = "lane.started")]
Started,
#[serde(rename = "lane.blocked")]
Blocked,
#[serde(rename = "lane.finished")]
Finished,
#[serde(rename = "lane.failed")]
Failed,
#[serde(rename = "lane.reconciled")]
Reconciled,
#[serde(rename = "lane.merged")]
Merged,
#[serde(rename = "lane.superseded")]
Superseded,
#[serde(rename = "lane.closed")]
Closed,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
enum LaneFailureClass {
PromptDelivery,
TrustGate,
BranchDivergence,
Compile,
Test,
PluginStartup,
McpStartup,
Infra,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct LaneBlocker {
#[serde(rename = "failureClass")]
failure_class: LaneFailureClass,
detail: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct LaneEvent {
event: LaneEventName,
status: String,
#[serde(rename = "emittedAt")]
emitted_at: String,
#[serde(rename = "failureClass", skip_serializing_if = "Option::is_none")]
failure_class: Option<LaneFailureClass>,
#[serde(skip_serializing_if = "Option::is_none")]
detail: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct AgentOutput {
#[serde(rename = "agentId")]
@@ -2352,7 +2313,7 @@ struct AgentOutput {
#[serde(rename = "laneEvents", default, skip_serializing_if = "Vec::is_empty")]
lane_events: Vec<LaneEvent>,
#[serde(rename = "currentBlocker", skip_serializing_if = "Option::is_none")]
current_blocker: Option<LaneBlocker>,
current_blocker: Option<LaneEventBlocker>,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
}
@@ -2374,6 +2335,8 @@ pub struct ToolSearchOutput {
total_deferred_tools: usize,
#[serde(rename = "pending_mcp_servers")]
pending_mcp_servers: Option<Vec<String>>,
#[serde(rename = "mcp_degraded", skip_serializing_if = "Option::is_none")]
mcp_degraded: Option<McpDegradedReport>,
}
#[derive(Debug, Serialize)]
@@ -3064,13 +3027,7 @@ where
created_at: created_at.clone(),
started_at: Some(created_at),
completed_at: None,
lane_events: vec![LaneEvent {
event: LaneEventName::Started,
status: String::from("running"),
emitted_at: iso8601_now(),
failure_class: None,
detail: None,
}],
lane_events: vec![LaneEvent::started(iso8601_now())],
current_blocker: None,
error: None,
};
@@ -3286,32 +3243,20 @@ fn persist_agent_terminal_state(
next_manifest.current_blocker = blocker.clone();
next_manifest.error = error;
if let Some(blocker) = blocker {
next_manifest.lane_events.push(LaneEvent {
event: LaneEventName::Blocked,
status: status.to_string(),
emitted_at: iso8601_now(),
failure_class: Some(blocker.failure_class.clone()),
detail: Some(blocker.detail.clone()),
});
next_manifest.lane_events.push(LaneEvent {
event: LaneEventName::Failed,
status: status.to_string(),
emitted_at: iso8601_now(),
failure_class: Some(blocker.failure_class),
detail: Some(blocker.detail),
});
next_manifest.lane_events.push(
LaneEvent::blocked(iso8601_now(), &blocker),
);
next_manifest.lane_events.push(
LaneEvent::failed(iso8601_now(), &blocker),
);
} else {
next_manifest.current_blocker = None;
let compressed_detail = result
.filter(|value| !value.trim().is_empty())
.map(|value| compress_summary_text(value.trim()));
next_manifest.lane_events.push(LaneEvent {
event: LaneEventName::Finished,
status: status.to_string(),
emitted_at: iso8601_now(),
failure_class: None,
detail: compressed_detail,
});
next_manifest
.lane_events
.push(LaneEvent::finished(iso8601_now(), compressed_detail));
}
write_agent_manifest(&next_manifest)
}
@@ -3330,7 +3275,7 @@ fn append_agent_output(path: &str, suffix: &str) -> Result<(), String> {
fn format_agent_terminal_output(
status: &str,
result: Option<&str>,
blocker: Option<&LaneBlocker>,
blocker: Option<&LaneEventBlocker>,
error: Option<&str>,
) -> String {
let mut sections = vec![format!("\n## Result\n\n- status: {status}\n")];
@@ -3352,9 +3297,9 @@ fn format_agent_terminal_output(
sections.join("")
}
fn classify_lane_blocker(error: &str) -> LaneBlocker {
fn classify_lane_blocker(error: &str) -> LaneEventBlocker {
let detail = error.trim().to_string();
LaneBlocker {
LaneEventBlocker {
failure_class: classify_lane_failure(error),
detail,
}
@@ -3371,6 +3316,8 @@ fn classify_lane_failure(error: &str) -> LaneFailureClass {
&& (normalized.contains("stale") || normalized.contains("diverg"))
{
LaneFailureClass::BranchDivergence
} else if normalized.contains("gateway") || normalized.contains("routing") {
LaneFailureClass::GatewayRouting
} else if normalized.contains("compile")
|| normalized.contains("build failed")
|| normalized.contains("cargo check")
@@ -3378,8 +3325,15 @@ fn classify_lane_failure(error: &str) -> LaneFailureClass {
LaneFailureClass::Compile
} else if normalized.contains("test") {
LaneFailureClass::Test
} else if normalized.contains("tool failed")
|| normalized.contains("runtime tool")
|| normalized.contains("tool runtime")
{
LaneFailureClass::ToolRuntime
} else if normalized.contains("plugin") {
LaneFailureClass::PluginStartup
} else if normalized.contains("mcp") && normalized.contains("handshake") {
LaneFailureClass::McpHandshake
} else if normalized.contains("mcp") {
LaneFailureClass::McpStartup
} else {
@@ -3688,7 +3642,7 @@ fn final_assistant_text(summary: &runtime::TurnSummary) -> String {
#[allow(clippy::needless_pass_by_value)]
fn execute_tool_search(input: ToolSearchInput) -> ToolSearchOutput {
GlobalToolRegistry::builtin().search(&input.query, input.max_results.unwrap_or(5), None)
GlobalToolRegistry::builtin().search(&input.query, input.max_results.unwrap_or(5), None, None)
}
fn deferred_tool_specs() -> Vec<ToolSpec> {
@@ -4944,7 +4898,7 @@ mod tests {
agent_permission_policy, allowed_tools_for_subagent, classify_lane_failure,
execute_agent_with_spawn, execute_tool, final_assistant_text, mvp_tool_specs,
permission_mode_from_plugin, persist_agent_terminal_state, push_output_block, AgentInput,
AgentJob, GlobalToolRegistry, LaneFailureClass, SubagentToolExecutor,
AgentJob, GlobalToolRegistry, LaneEventName, LaneFailureClass, SubagentToolExecutor,
};
use api::OutputContentBlock;
use runtime::{
@@ -5264,10 +5218,34 @@ mod tests {
)]
);
let search = registry.search("demo echo", 5, Some(vec!["pending-server".to_string()]));
let search = registry.search(
"demo echo",
5,
Some(vec!["pending-server".to_string()]),
Some(runtime::McpDegradedReport::new(
vec!["demo".to_string()],
vec![runtime::McpFailedServer {
server_name: "pending-server".to_string(),
phase: runtime::McpLifecyclePhase::ToolDiscovery,
error: runtime::McpErrorSurface::new(
runtime::McpLifecyclePhase::ToolDiscovery,
Some("pending-server".to_string()),
"tool discovery failed",
BTreeMap::new(),
true,
),
}],
vec!["mcp__demo__echo".to_string()],
vec!["mcp__demo__echo".to_string()],
)),
);
let output = serde_json::to_value(search).expect("search output should serialize");
assert_eq!(output["matches"][0], "mcp__demo__echo");
assert_eq!(output["pending_mcp_servers"][0], "pending-server");
assert_eq!(
output["mcp_degraded"]["failed_servers"][0]["phase"],
"tool_discovery"
);
}
#[test]
@@ -5917,16 +5895,16 @@ mod tests {
),
("targeted tests failed", LaneFailureClass::Test),
("plugin bootstrap failed", LaneFailureClass::PluginStartup),
("mcp handshake timed out", LaneFailureClass::McpStartup),
("mcp handshake timed out", LaneFailureClass::McpHandshake),
(
"mcp startup failed before listing tools",
LaneFailureClass::McpStartup,
),
(
"gateway routing rejected the request",
LaneFailureClass::Infra,
LaneFailureClass::GatewayRouting,
),
("denied tool execution from hook", LaneFailureClass::Infra),
("tool failed: denied tool execution from hook", LaneFailureClass::ToolRuntime),
("thread creation failed", LaneFailureClass::Infra),
];
@@ -5935,6 +5913,28 @@ mod tests {
}
}
#[test]
fn lane_event_schema_serializes_to_canonical_names() {
let cases = [
(LaneEventName::Started, "lane.started"),
(LaneEventName::Ready, "lane.ready"),
(LaneEventName::PromptMisdelivery, "lane.prompt_misdelivery"),
(LaneEventName::Blocked, "lane.blocked"),
(LaneEventName::Red, "lane.red"),
(LaneEventName::Green, "lane.green"),
(LaneEventName::CommitCreated, "lane.commit.created"),
(LaneEventName::PrOpened, "lane.pr.opened"),
(LaneEventName::MergeReady, "lane.merge.ready"),
(LaneEventName::Finished, "lane.finished"),
(LaneEventName::Failed, "lane.failed"),
(LaneEventName::BranchStaleAgainstMain, "branch.stale_against_main"),
];
for (event, expected) in cases {
assert_eq!(serde_json::to_value(event).expect("serialize lane event"), json!(expected));
}
}
#[test]
fn agent_tool_subset_mapping_is_expected() {
let general = allowed_tools_for_subagent("general-purpose");
@@ -6258,7 +6258,7 @@ mod tests {
"branch_divergence"
);
assert_eq!(
output_json["structuredContent"][0]["missingCommits"][0],
output_json["structuredContent"][0]["data"]["missingCommits"][0],
"fix: unblock workspace tests"
);