From 51707183067dedb6af06dd37ab6438020ed42eb1 Mon Sep 17 00:00:00 2001 From: Yeachan-Heo Date: Wed, 1 Apr 2026 04:30:29 +0000 Subject: [PATCH 1/5] wip: telemetry progress --- rust/Cargo.lock | 9 + rust/crates/api/Cargo.toml | 1 + rust/crates/api/src/client.rs | 182 ++++++- rust/crates/api/src/lib.rs | 6 + rust/crates/api/tests/client_integration.rs | 93 ++++ rust/crates/runtime/Cargo.toml | 1 + rust/crates/runtime/src/conversation.rs | 169 ++++++- rust/crates/runtime/src/hooks.rs | 66 +-- rust/crates/rusty-claude-cli/src/main.rs | 77 ++- rust/crates/rusty-claude-cli/src/render.rs | 4 +- rust/crates/telemetry/Cargo.toml | 13 + rust/crates/telemetry/src/lib.rs | 509 ++++++++++++++++++++ 12 files changed, 1060 insertions(+), 70 deletions(-) create mode 100644 rust/crates/telemetry/Cargo.toml create mode 100644 rust/crates/telemetry/src/lib.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 5507dca..b4dbf7e 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -25,6 +25,7 @@ dependencies = [ "runtime", "serde", "serde_json", + "telemetry", "tokio", ] @@ -1428,6 +1429,14 @@ dependencies = [ "yaml-rust", ] +[[package]] +name = "telemetry" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "thiserror" version = "2.0.18" diff --git a/rust/crates/api/Cargo.toml b/rust/crates/api/Cargo.toml index c5e152e..d3c4115 100644 --- a/rust/crates/api/Cargo.toml +++ b/rust/crates/api/Cargo.toml @@ -10,6 +10,7 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "rus runtime = { path = "../runtime" } serde = { version = "1", features = ["derive"] } serde_json = "1" +telemetry = { path = "../telemetry" } tokio = { version = "1", features = ["io-util", "macros", "net", "rt-multi-thread", "time"] } [lints] diff --git a/rust/crates/api/src/client.rs b/rust/crates/api/src/client.rs index 7ef7e83..d7dacbe 100644 --- a/rust/crates/api/src/client.rs +++ b/rust/crates/api/src/client.rs @@ -6,13 +6,15 @@ use runtime::{ OAuthTokenExchangeRequest, }; use serde::Deserialize; +use serde_json::{Map, Value}; +use telemetry::{AnthropicRequestProfile, ClientIdentity, SessionTracer}; use crate::error::ApiError; use crate::sse::SseParser; use crate::types::{MessageRequest, MessageResponse, StreamEvent}; const DEFAULT_BASE_URL: &str = "https://api.anthropic.com"; -const ANTHROPIC_VERSION: &str = "2023-06-01"; +const MESSAGES_PATH: &str = "/v1/messages"; const REQUEST_ID_HEADER: &str = "request-id"; const ALT_REQUEST_ID_HEADER: &str = "x-request-id"; const DEFAULT_INITIAL_BACKOFF: Duration = Duration::from_millis(200); @@ -108,6 +110,8 @@ pub struct AnthropicClient { max_retries: u32, initial_backoff: Duration, max_backoff: Duration, + request_profile: AnthropicRequestProfile, + session_tracer: Option, } impl AnthropicClient { @@ -120,6 +124,8 @@ impl AnthropicClient { max_retries: DEFAULT_MAX_RETRIES, initial_backoff: DEFAULT_INITIAL_BACKOFF, max_backoff: DEFAULT_MAX_BACKOFF, + request_profile: AnthropicRequestProfile::default(), + session_tracer: None, } } @@ -132,6 +138,8 @@ impl AnthropicClient { max_retries: DEFAULT_MAX_RETRIES, initial_backoff: DEFAULT_INITIAL_BACKOFF, max_backoff: DEFAULT_MAX_BACKOFF, + request_profile: AnthropicRequestProfile::default(), + session_tracer: None, } } @@ -176,6 +184,39 @@ impl AnthropicClient { self } + #[must_use] + pub fn with_request_profile(mut self, request_profile: AnthropicRequestProfile) -> Self { + self.request_profile = request_profile; + self + } + + #[must_use] + pub fn with_client_identity(mut self, client_identity: ClientIdentity) -> Self { + self.request_profile.client_identity = client_identity; + self + } + + #[must_use] + pub fn with_beta(mut self, beta: impl Into) -> Self { + let beta = beta.into(); + if !self.request_profile.betas.contains(&beta) { + self.request_profile.betas.push(beta); + } + self + } + + #[must_use] + pub fn with_extra_body_param(mut self, key: impl Into, value: Value) -> Self { + self.request_profile.extra_body.insert(key.into(), value); + self + } + + #[must_use] + pub fn with_session_tracer(mut self, session_tracer: SessionTracer) -> Self { + self.session_tracer = Some(session_tracer); + self + } + #[must_use] pub fn with_retry_policy( mut self, @@ -279,18 +320,30 @@ impl AnthropicClient { loop { attempts += 1; + self.record_request_started(request, attempts); match self.send_raw_request(request).await { Ok(response) => match expect_success(response).await { - Ok(response) => return Ok(response), + Ok(response) => { + self.record_request_succeeded(request, attempts, &response); + return Ok(response); + } Err(error) if error.is_retryable() && attempts <= self.max_retries + 1 => { + self.record_request_failed(request, attempts, &error); last_error = Some(error); } - Err(error) => return Err(error), + Err(error) => { + self.record_request_failed(request, attempts, &error); + return Err(error); + } }, Err(error) if error.is_retryable() && attempts <= self.max_retries + 1 => { + self.record_request_failed(request, attempts, &error); last_error = Some(error); } - Err(error) => return Err(error), + Err(error) => { + self.record_request_failed(request, attempts, &error); + return Err(error); + } } if attempts > self.max_retries { @@ -310,18 +363,131 @@ impl AnthropicClient { &self, request: &MessageRequest, ) -> Result { - let request_url = format!("{}/v1/messages", self.base_url.trim_end_matches('/')); - let request_builder = self + let request_url = format!("{}{}", self.base_url.trim_end_matches('/'), MESSAGES_PATH); + let mut request_builder = self .http .post(&request_url) - .header("anthropic-version", ANTHROPIC_VERSION) .header("content-type", "application/json"); + for (name, value) in self.request_profile.header_pairs() { + request_builder = request_builder.header(name, value); + } let mut request_builder = self.auth.apply(request_builder); - request_builder = request_builder.json(request); + let request_body = self.request_profile.render_json_body(request)?; + request_builder = request_builder.json(&request_body); request_builder.send().await.map_err(ApiError::from) } + fn record_request_started(&self, request: &MessageRequest, attempt: u32) { + if let Some(tracer) = &self.session_tracer { + tracer.record_http_request_started( + attempt, + "POST", + MESSAGES_PATH, + self.request_attributes(request), + ); + } + } + + fn record_request_succeeded( + &self, + request: &MessageRequest, + attempt: u32, + response: &reqwest::Response, + ) { + if let Some(tracer) = &self.session_tracer { + tracer.record_http_request_succeeded( + attempt, + "POST", + MESSAGES_PATH, + response.status().as_u16(), + request_id_from_headers(response.headers()), + self.request_attributes(request), + ); + } + } + + fn record_request_failed(&self, request: &MessageRequest, attempt: u32, error: &ApiError) { + if let Some(tracer) = &self.session_tracer { + tracer.record_http_request_failed( + attempt, + "POST", + MESSAGES_PATH, + error.to_string(), + error.is_retryable(), + self.error_attributes(request, error), + ); + } + } + + fn request_attributes(&self, request: &MessageRequest) -> Map { + let mut attributes = Map::new(); + attributes.insert("model".to_string(), Value::String(request.model.clone())); + attributes.insert("stream".to_string(), Value::Bool(request.stream)); + attributes.insert("max_tokens".to_string(), Value::from(request.max_tokens)); + attributes.insert( + "message_count".to_string(), + Value::from(u64::try_from(request.messages.len()).unwrap_or(u64::MAX)), + ); + attributes.insert( + "tool_count".to_string(), + Value::from( + u64::try_from(request.tools.as_ref().map_or(0, Vec::len)).unwrap_or(u64::MAX), + ), + ); + attributes.insert( + "beta_count".to_string(), + Value::from(u64::try_from(self.request_profile.betas.len()).unwrap_or(u64::MAX)), + ); + if !self.request_profile.extra_body.is_empty() { + attributes.insert( + "extra_body_keys".to_string(), + Value::Array( + self.request_profile + .extra_body + .keys() + .cloned() + .map(Value::String) + .collect(), + ), + ); + } + attributes + } + + fn error_attributes(&self, request: &MessageRequest, error: &ApiError) -> Map { + let mut attributes = self.request_attributes(request); + match error { + ApiError::Api { + status, + error_type, + message, + .. + } => { + attributes.insert("status".to_string(), Value::from(status.as_u16())); + if let Some(error_type) = error_type { + attributes.insert("error_type".to_string(), Value::String(error_type.clone())); + } + if let Some(message) = message { + attributes.insert("api_message".to_string(), Value::String(message.clone())); + } + } + ApiError::Http(_) => { + attributes.insert("error_type".to_string(), Value::String("http".to_string())); + } + ApiError::Json(_) => { + attributes.insert("error_type".to_string(), Value::String("json".to_string())); + } + _ => { + attributes.insert( + "error_type".to_string(), + Value::String("client".to_string()), + ); + } + } + attributes + } + fn backoff_for_attempt(&self, attempt: u32) -> Result { let Some(multiplier) = 1_u32.checked_shl(attempt.saturating_sub(1)) else { return Err(ApiError::BackoffOverflow { diff --git a/rust/crates/api/src/lib.rs b/rust/crates/api/src/lib.rs index 4108187..9c3b688 100644 --- a/rust/crates/api/src/lib.rs +++ b/rust/crates/api/src/lib.rs @@ -15,3 +15,9 @@ pub use types::{ MessageResponse, MessageStartEvent, MessageStopEvent, OutputContentBlock, StreamEvent, ToolChoice, ToolDefinition, ToolResultContentBlock, Usage, }; + +pub use telemetry::{ + AnalyticsEvent, AnthropicRequestProfile, ClientIdentity, JsonlTelemetrySink, + MemoryTelemetrySink, SessionTraceRecord, SessionTracer, TelemetryEvent, TelemetrySink, + DEFAULT_ANTHROPIC_VERSION, +}; diff --git a/rust/crates/api/tests/client_integration.rs b/rust/crates/api/tests/client_integration.rs index c37fa99..d95dba8 100644 --- a/rust/crates/api/tests/client_integration.rs +++ b/rust/crates/api/tests/client_integration.rs @@ -8,6 +8,7 @@ use api::{ StreamEvent, ToolChoice, ToolDefinition, }; use serde_json::json; +use telemetry::{ClientIdentity, MemoryTelemetrySink, SessionTracer, TelemetryEvent}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; use tokio::sync::Mutex; @@ -64,6 +65,14 @@ async fn send_message_posts_json_and_parses_response() { request.headers.get("authorization").map(String::as_str), Some("Bearer proxy-token") ); + assert_eq!( + request.headers.get("anthropic-version").map(String::as_str), + Some("2023-06-01") + ); + assert_eq!( + request.headers.get("user-agent").map(String::as_str), + Some("clawd-code/0.1.0 (rust)") + ); let body: serde_json::Value = serde_json::from_str(&request.body).expect("request body should be json"); assert_eq!( @@ -75,6 +84,90 @@ async fn send_message_posts_json_and_parses_response() { assert_eq!(body["tool_choice"]["type"], json!("auto")); } +#[tokio::test] +async fn send_message_applies_request_profile_and_records_telemetry() { + let state = Arc::new(Mutex::new(Vec::::new())); + let server = spawn_server( + state.clone(), + vec![http_response_with_headers( + "200 OK", + "application/json", + concat!( + "{", + "\"id\":\"msg_profile\",", + "\"type\":\"message\",", + "\"role\":\"assistant\",", + "\"content\":[{\"type\":\"text\",\"text\":\"ok\"}],", + "\"model\":\"claude-3-7-sonnet-latest\",", + "\"stop_reason\":\"end_turn\",", + "\"stop_sequence\":null,", + "\"usage\":{\"input_tokens\":1,\"output_tokens\":1}", + "}" + ), + &[("request-id", "req_profile_123")], + )], + ) + .await; + let sink = Arc::new(MemoryTelemetrySink::default()); + + let client = AnthropicClient::new("test-key") + .with_base_url(server.base_url()) + .with_client_identity(ClientIdentity::new("clawd-code", "9.9.9").with_runtime("rust-cli")) + .with_beta("tools-2026-04-01") + .with_extra_body_param("metadata", json!({"source": "clawd-code"})) + .with_session_tracer(SessionTracer::new("session-telemetry", sink.clone())); + + let response = client + .send_message(&sample_request(false)) + .await + .expect("request should succeed"); + + assert_eq!(response.request_id.as_deref(), Some("req_profile_123")); + + let captured = state.lock().await; + let request = captured.first().expect("server should capture request"); + assert_eq!( + request.headers.get("anthropic-beta").map(String::as_str), + Some("tools-2026-04-01") + ); + assert_eq!( + request.headers.get("user-agent").map(String::as_str), + Some("clawd-code/9.9.9 (rust-cli)") + ); + let body: serde_json::Value = + serde_json::from_str(&request.body).expect("request body should be json"); + assert_eq!(body["metadata"]["source"], json!("clawd-code")); + + let events = sink.events(); + assert_eq!(events.len(), 4); + assert!(matches!( + &events[0], + TelemetryEvent::HttpRequestStarted { + session_id, + attempt: 1, + method, + path, + .. + } if session_id == "session-telemetry" && method == "POST" && path == "/v1/messages" + )); + assert!(matches!( + &events[1], + TelemetryEvent::SessionTrace(trace) if trace.name == "http_request_started" + )); + assert!(matches!( + &events[2], + TelemetryEvent::HttpRequestSucceeded { + request_id, + status: 200, + .. + } if request_id.as_deref() == Some("req_profile_123") + )); + assert!(matches!( + &events[3], + TelemetryEvent::SessionTrace(trace) if trace.name == "http_request_succeeded" + )); +} + #[tokio::test] async fn stream_message_parses_sse_events_with_tool_use() { let state = Arc::new(Mutex::new(Vec::::new())); diff --git a/rust/crates/runtime/Cargo.toml b/rust/crates/runtime/Cargo.toml index 7ce7cd8..cf46f2d 100644 --- a/rust/crates/runtime/Cargo.toml +++ b/rust/crates/runtime/Cargo.toml @@ -11,6 +11,7 @@ glob = "0.3" regex = "1" serde = { version = "1", features = ["derive"] } serde_json = "1" +telemetry = { path = "../telemetry" } tokio = { version = "1", features = ["io-util", "macros", "process", "rt", "rt-multi-thread", "time"] } walkdir = "2" diff --git a/rust/crates/runtime/src/conversation.rs b/rust/crates/runtime/src/conversation.rs index 4ffbabc..ccaf064 100644 --- a/rust/crates/runtime/src/conversation.rs +++ b/rust/crates/runtime/src/conversation.rs @@ -1,6 +1,9 @@ use std::collections::BTreeMap; use std::fmt::{Display, Formatter}; +use serde_json::{Map, Value}; +use telemetry::SessionTracer; + use crate::compact::{ compact_session, estimate_session_tokens, CompactionConfig, CompactionResult, }; @@ -97,6 +100,7 @@ pub struct ConversationRuntime { max_iterations: usize, usage_tracker: UsageTracker, hook_runner: HookRunner, + session_tracer: Option, } impl ConversationRuntime @@ -118,7 +122,7 @@ where tool_executor, permission_policy, system_prompt, - RuntimeFeatureConfig::default(), + &RuntimeFeatureConfig::default(), ) } @@ -129,7 +133,7 @@ where tool_executor: T, permission_policy: PermissionPolicy, system_prompt: Vec, - feature_config: RuntimeFeatureConfig, + feature_config: &RuntimeFeatureConfig, ) -> Self { let usage_tracker = UsageTracker::from_session(&session); Self { @@ -140,7 +144,8 @@ where system_prompt, max_iterations: usize::MAX, usage_tracker, - hook_runner: HookRunner::from_feature_config(&feature_config), + hook_runner: HookRunner::from_feature_config(feature_config), + session_tracer: None, } } @@ -150,14 +155,22 @@ where self } + #[must_use] + pub fn with_session_tracer(mut self, session_tracer: SessionTracer) -> Self { + self.session_tracer = Some(session_tracer); + self + } + pub fn run_turn( &mut self, user_input: impl Into, mut prompter: Option<&mut dyn PermissionPrompter>, ) -> Result { + let user_input = user_input.into(); + self.record_turn_started(&user_input); self.session .messages - .push(ConversationMessage::user_text(user_input.into())); + .push(ConversationMessage::user_text(user_input)); let mut assistant_messages = Vec::new(); let mut tool_results = Vec::new(); @@ -166,16 +179,24 @@ where loop { iterations += 1; if iterations > self.max_iterations { - return Err(RuntimeError::new( + let error = RuntimeError::new( "conversation loop exceeded the maximum number of iterations", - )); + ); + self.record_turn_failed(iterations, &error); + return Err(error); } let request = ApiRequest { system_prompt: self.system_prompt.clone(), messages: self.session.messages.clone(), }; - let events = self.api_client.stream(request)?; + let events = match self.api_client.stream(request) { + Ok(events) => events, + Err(error) => { + self.record_turn_failed(iterations, &error); + return Err(error); + } + }; let (assistant_message, usage) = build_assistant_message(events)?; if let Some(usage) = usage { self.usage_tracker.record(usage); @@ -190,6 +211,7 @@ where _ => None, }) .collect::>(); + self.record_assistant_iteration(iterations, &assistant_message, pending_tool_uses.len()); self.session.messages.push(assistant_message.clone()); assistant_messages.push(assistant_message); @@ -199,6 +221,7 @@ where } for (tool_use_id, tool_name, input) in pending_tool_uses { + self.record_tool_started(iterations, &tool_name); let permission_outcome = if let Some(prompt) = prompter.as_mut() { self.permission_policy .authorize(&tool_name, &input, Some(*prompt)) @@ -249,17 +272,20 @@ where ConversationMessage::tool_result(tool_use_id, tool_name, reason, true) } }; + self.record_tool_finished(iterations, &result_message); self.session.messages.push(result_message.clone()); tool_results.push(result_message); } } - Ok(TurnSummary { + let summary = TurnSummary { assistant_messages, tool_results, iterations, usage: self.usage_tracker.cumulative_usage(), - }) + }; + self.record_turn_completed(&summary); + Ok(summary) } #[must_use] @@ -286,6 +312,125 @@ where pub fn into_session(self) -> Session { self.session } + + fn record_turn_started(&self, user_input: &str) { + if let Some(tracer) = &self.session_tracer { + let mut attributes = Map::new(); + attributes.insert( + "message_count_before".to_string(), + Value::from(u64::try_from(self.session.messages.len()).unwrap_or(u64::MAX)), + ); + attributes.insert( + "input_chars".to_string(), + Value::from(u64::try_from(user_input.chars().count()).unwrap_or(u64::MAX)), + ); + tracer.record("turn_started", attributes); + } + } + + fn record_assistant_iteration( + &self, + iteration: usize, + assistant_message: &ConversationMessage, + pending_tool_count: usize, + ) { + if let Some(tracer) = &self.session_tracer { + let mut attributes = Map::new(); + attributes.insert( + "iteration".to_string(), + Value::from(u64::try_from(iteration).unwrap_or(u64::MAX)), + ); + attributes.insert( + "block_count".to_string(), + Value::from(u64::try_from(assistant_message.blocks.len()).unwrap_or(u64::MAX)), + ); + attributes.insert( + "pending_tool_count".to_string(), + Value::from(u64::try_from(pending_tool_count).unwrap_or(u64::MAX)), + ); + tracer.record("assistant_iteration_completed", attributes); + } + } + + fn record_tool_started(&self, iteration: usize, tool_name: &str) { + if let Some(tracer) = &self.session_tracer { + let mut attributes = Map::new(); + attributes.insert( + "iteration".to_string(), + Value::from(u64::try_from(iteration).unwrap_or(u64::MAX)), + ); + attributes.insert("tool_name".to_string(), Value::String(tool_name.to_string())); + tracer.record("tool_execution_started", attributes); + } + } + + fn record_tool_finished(&self, iteration: usize, result_message: &ConversationMessage) { + let Some(tracer) = &self.session_tracer else { + return; + }; + let Some(ContentBlock::ToolResult { + tool_name, + is_error, + output, + .. + }) = result_message.blocks.first() + else { + return; + }; + let mut attributes = Map::new(); + attributes.insert( + "iteration".to_string(), + Value::from(u64::try_from(iteration).unwrap_or(u64::MAX)), + ); + attributes.insert("tool_name".to_string(), Value::String(tool_name.clone())); + attributes.insert("is_error".to_string(), Value::Bool(*is_error)); + attributes.insert( + "output_chars".to_string(), + Value::from(u64::try_from(output.chars().count()).unwrap_or(u64::MAX)), + ); + tracer.record("tool_execution_finished", attributes); + } + + fn record_turn_completed(&self, summary: &TurnSummary) { + if let Some(tracer) = &self.session_tracer { + let mut attributes = Map::new(); + attributes.insert( + "assistant_message_count".to_string(), + Value::from( + u64::try_from(summary.assistant_messages.len()).unwrap_or(u64::MAX), + ), + ); + attributes.insert( + "tool_result_count".to_string(), + Value::from(u64::try_from(summary.tool_results.len()).unwrap_or(u64::MAX)), + ); + attributes.insert( + "iterations".to_string(), + Value::from(u64::try_from(summary.iterations).unwrap_or(u64::MAX)), + ); + attributes.insert( + "total_input_tokens".to_string(), + Value::from(summary.usage.input_tokens), + ); + attributes.insert( + "total_output_tokens".to_string(), + Value::from(summary.usage.output_tokens), + ); + tracer.record("turn_completed", attributes); + } + } + + fn record_turn_failed(&self, iteration: usize, error: &RuntimeError) { + if let Some(tracer) = &self.session_tracer { + let mut attributes = Map::new(); + attributes.insert( + "iteration".to_string(), + Value::from(u64::try_from(iteration).unwrap_or(u64::MAX)), + ); + attributes.insert("error".to_string(), Value::String(error.to_string())); + tracer.record("turn_failed", attributes); + } + } } fn build_assistant_message( @@ -609,7 +754,7 @@ mod tests { }), PermissionPolicy::new(PermissionMode::DangerFullAccess), vec!["system".to_string()], - RuntimeFeatureConfig::default().with_hooks(RuntimeHookConfig::new( + &RuntimeFeatureConfig::default().with_hooks(RuntimeHookConfig::new( vec![shell_snippet("printf 'blocked by hook'; exit 2")], Vec::new(), )), @@ -675,7 +820,7 @@ mod tests { StaticToolExecutor::new().register("add", |_input| Ok("4".to_string())), PermissionPolicy::new(PermissionMode::DangerFullAccess), vec!["system".to_string()], - RuntimeFeatureConfig::default().with_hooks(RuntimeHookConfig::new( + &RuntimeFeatureConfig::default().with_hooks(RuntimeHookConfig::new( vec![shell_snippet("printf 'pre hook ran'")], vec![shell_snippet("printf 'post hook ran'")], )), @@ -697,7 +842,7 @@ mod tests { "post hook should preserve non-error result: {output:?}" ); assert!( - output.contains("4"), + output.contains('4'), "tool output missing value: {output:?}" ); assert!( diff --git a/rust/crates/runtime/src/hooks.rs b/rust/crates/runtime/src/hooks.rs index 36756a0..f218768 100644 --- a/rust/crates/runtime/src/hooks.rs +++ b/rust/crates/runtime/src/hooks.rs @@ -64,7 +64,7 @@ impl HookRunner { #[must_use] pub fn run_pre_tool_use(&self, tool_name: &str, tool_input: &str) -> HookRunResult { - self.run_commands( + Self::run_commands( HookEvent::PreToolUse, self.config.pre_tool_use(), tool_name, @@ -82,7 +82,7 @@ impl HookRunner { tool_output: &str, is_error: bool, ) -> HookRunResult { - self.run_commands( + Self::run_commands( HookEvent::PostToolUse, self.config.post_tool_use(), tool_name, @@ -93,7 +93,6 @@ impl HookRunner { } fn run_commands( - &self, event: HookEvent, commands: &[String], tool_name: &str, @@ -114,19 +113,19 @@ impl HookRunner { "tool_result_is_error": is_error, }) .to_string(); + let invocation = HookInvocation { + event, + tool_name, + tool_input, + tool_output, + is_error, + payload: &payload, + }; let mut messages = Vec::new(); for command in commands { - match self.run_command( - command, - event, - tool_name, - tool_input, - tool_output, - is_error, - &payload, - ) { + match Self::run_command(command, &invocation) { HookCommandOutcome::Allow { message } => { if let Some(message) = message { messages.push(message); @@ -149,29 +148,23 @@ impl HookRunner { HookRunResult::allow(messages) } - fn run_command( - &self, - command: &str, - event: HookEvent, - tool_name: &str, - tool_input: &str, - tool_output: Option<&str>, - is_error: bool, - payload: &str, - ) -> HookCommandOutcome { + fn run_command(command: &str, invocation: &HookInvocation<'_>) -> HookCommandOutcome { let mut child = shell_command(command); child.stdin(std::process::Stdio::piped()); child.stdout(std::process::Stdio::piped()); child.stderr(std::process::Stdio::piped()); - child.env("HOOK_EVENT", event.as_str()); - child.env("HOOK_TOOL_NAME", tool_name); - child.env("HOOK_TOOL_INPUT", tool_input); - child.env("HOOK_TOOL_IS_ERROR", if is_error { "1" } else { "0" }); - if let Some(tool_output) = tool_output { + child.env("HOOK_EVENT", invocation.event.as_str()); + child.env("HOOK_TOOL_NAME", invocation.tool_name); + child.env("HOOK_TOOL_INPUT", invocation.tool_input); + child.env( + "HOOK_TOOL_IS_ERROR", + if invocation.is_error { "1" } else { "0" }, + ); + if let Some(tool_output) = invocation.tool_output { child.env("HOOK_TOOL_OUTPUT", tool_output); } - match child.output_with_stdin(payload.as_bytes()) { + match child.output_with_stdin(invocation.payload.as_bytes()) { Ok(output) => { let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string(); let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); @@ -189,8 +182,9 @@ impl HookRunner { }, None => HookCommandOutcome::Warn { message: format!( - "{} hook `{command}` terminated by signal while handling `{tool_name}`", - event.as_str() + "{} hook `{command}` terminated by signal while handling `{}`", + invocation.event.as_str(), + invocation.tool_name ), }, } @@ -198,13 +192,23 @@ impl HookRunner { Err(error) => HookCommandOutcome::Warn { message: format!( "{} hook `{command}` failed to start for `{tool_name}`: {error}", - event.as_str() + invocation.event.as_str(), + tool_name = invocation.tool_name ), }, } } } +struct HookInvocation<'a> { + event: HookEvent, + tool_name: &'a str, + tool_input: &'a str, + tool_output: Option<&'a str>, + is_error: bool, + payload: &'a str, +} + enum HookCommandOutcome { Allow { message: Option }, Deny { message: Option }, diff --git a/rust/crates/rusty-claude-cli/src/main.rs b/rust/crates/rusty-claude-cli/src/main.rs index 5f8a7a6..b645a89 100644 --- a/rust/crates/rusty-claude-cli/src/main.rs +++ b/rust/crates/rusty-claude-cli/src/main.rs @@ -4,6 +4,7 @@ mod render; use std::collections::{BTreeMap, BTreeSet}; use std::env; +use std::fmt::Write as _; use std::fs; use std::io::{self, Read, Write}; use std::net::TcpListener; @@ -13,8 +14,9 @@ use std::time::{SystemTime, UNIX_EPOCH}; use api::{ resolve_startup_auth_source, AnthropicClient, AuthSource, ContentBlockDelta, InputContentBlock, - InputMessage, MessageRequest, MessageResponse, OutputContentBlock, - StreamEvent as ApiStreamEvent, ToolChoice, ToolDefinition, ToolResultContentBlock, + InputMessage, JsonlTelemetrySink, MessageRequest, MessageResponse, OutputContentBlock, + SessionTracer, StreamEvent as ApiStreamEvent, ToolChoice, ToolDefinition, + ToolResultContentBlock, }; use commands::{ @@ -44,6 +46,7 @@ fn max_tokens_for_model(model: &str) -> u32 { } const DEFAULT_DATE: &str = "2026-03-31"; const DEFAULT_OAUTH_CALLBACK_PORT: u16 = 4545; +const TELEMETRY_LOG_PATH_ENV: &str = "CLAW_TELEMETRY_LOG_PATH"; const VERSION: &str = env!("CARGO_PKG_VERSION"); const BUILD_TARGET: Option<&str> = option_env!("TARGET"); const GIT_SHA: Option<&str> = option_env!("GIT_SHA"); @@ -995,6 +998,7 @@ impl LiveCli { let session = create_managed_session_handle()?; let runtime = build_runtime( Session::new(), + &session.id, model.clone(), system_prompt.clone(), enable_tools, @@ -1086,6 +1090,7 @@ impl LiveCli { let session = self.runtime.session().clone(); let mut runtime = build_runtime( session, + &self.session.id, self.model.clone(), self.system_prompt.clone(), true, @@ -1275,6 +1280,7 @@ impl LiveCli { self.permission_mode = permission_mode_from_label(normalized); self.runtime = build_runtime( session, + &self.session.id, self.model.clone(), self.system_prompt.clone(), true, @@ -1300,6 +1306,7 @@ impl LiveCli { self.session = create_managed_session_handle()?; self.runtime = build_runtime( Session::new(), + &self.session.id, self.model.clone(), self.system_prompt.clone(), true, @@ -1335,6 +1342,7 @@ impl LiveCli { let message_count = session.messages.len(); self.runtime = build_runtime( session, + &self.session.id, self.model.clone(), self.system_prompt.clone(), true, @@ -1407,6 +1415,7 @@ impl LiveCli { let message_count = session.messages.len(); self.runtime = build_runtime( session, + &handle.id, self.model.clone(), self.system_prompt.clone(), true, @@ -1437,6 +1446,7 @@ impl LiveCli { let skipped = removed == 0; self.runtime = build_runtime( result.compacted_session, + &self.session.id, self.model.clone(), self.system_prompt.clone(), true, @@ -1914,6 +1924,7 @@ fn build_runtime_feature_config( fn build_runtime( session: Session, + session_id: &str, model: String, system_prompt: Vec, enable_tools: bool, @@ -1922,14 +1933,42 @@ fn build_runtime( permission_mode: PermissionMode, ) -> Result, Box> { - Ok(ConversationRuntime::new_with_features( + let session_tracer = build_session_tracer(session_id)?; + let api_client = match session_tracer.clone() { + Some(session_tracer) => AnthropicRuntimeClient::new( + model, + enable_tools, + emit_output, + allowed_tools.clone(), + )? + .with_session_tracer(session_tracer), + None => AnthropicRuntimeClient::new(model, enable_tools, emit_output, allowed_tools.clone())?, + }; + let runtime = ConversationRuntime::new_with_features( session, - AnthropicRuntimeClient::new(model, enable_tools, emit_output, allowed_tools.clone())?, + api_client, CliToolExecutor::new(allowed_tools, emit_output), permission_policy(permission_mode), system_prompt, - build_runtime_feature_config()?, - )) + &build_runtime_feature_config()?, + ); + Ok(match session_tracer { + Some(session_tracer) => runtime.with_session_tracer(session_tracer), + None => runtime, + }) +} + +fn build_session_tracer( + session_id: &str, +) -> Result, Box> { + let Some(path) = env::var_os(TELEMETRY_LOG_PATH_ENV) else { + return Ok(None); + }; + let sink = JsonlTelemetrySink::new(PathBuf::from(path))?; + Ok(Some(SessionTracer::new( + session_id.to_string(), + std::sync::Arc::new(sink), + ))) } struct CliPermissionPrompter { @@ -2004,6 +2043,11 @@ impl AnthropicRuntimeClient { allowed_tools, }) } + + fn with_session_tracer(mut self, session_tracer: SessionTracer) -> Self { + self.client = self.client.with_session_tracer(session_tracer); + self + } } fn resolve_cli_auth_source() -> Result> { @@ -2364,13 +2408,13 @@ fn format_bash_result(icon: &str, parsed: &serde_json::Value) -> String { .get("backgroundTaskId") .and_then(|value| value.as_str()) { - lines[0].push_str(&format!(" backgrounded ({task_id})")); + write!(&mut lines[0], " backgrounded ({task_id})").expect("write to string"); } else if let Some(status) = parsed .get("returnCodeInterpretation") .and_then(|value| value.as_str()) .filter(|status| !status.is_empty()) { - lines[0].push_str(&format!(" {status}")); + write!(&mut lines[0], " {status}").expect("write to string"); } if let Some(stdout) = parsed.get("stdout").and_then(|value| value.as_str()) { @@ -2392,15 +2436,15 @@ fn format_read_result(icon: &str, parsed: &serde_json::Value) -> String { let path = extract_tool_path(file); let start_line = file .get("startLine") - .and_then(|value| value.as_u64()) + .and_then(serde_json::Value::as_u64) .unwrap_or(1); let num_lines = file .get("numLines") - .and_then(|value| value.as_u64()) + .and_then(serde_json::Value::as_u64) .unwrap_or(0); let total_lines = file .get("totalLines") - .and_then(|value| value.as_u64()) + .and_then(serde_json::Value::as_u64) .unwrap_or(num_lines); let content = file .get("content") @@ -2426,8 +2470,7 @@ fn format_write_result(icon: &str, parsed: &serde_json::Value) -> String { let line_count = parsed .get("content") .and_then(|value| value.as_str()) - .map(|content| content.lines().count()) - .unwrap_or(0); + .map_or(0, |content| content.lines().count()); format!( "{icon} \x1b[1;32m✏️ {} {path}\x1b[0m \x1b[2m({line_count} lines)\x1b[0m", if kind == "create" { "Wrote" } else { "Updated" }, @@ -2458,7 +2501,7 @@ fn format_edit_result(icon: &str, parsed: &serde_json::Value) -> String { let path = extract_tool_path(parsed); let suffix = if parsed .get("replaceAll") - .and_then(|value| value.as_bool()) + .and_then(serde_json::Value::as_bool) .unwrap_or(false) { " (replace all)" @@ -2486,7 +2529,7 @@ fn format_edit_result(icon: &str, parsed: &serde_json::Value) -> String { fn format_glob_result(icon: &str, parsed: &serde_json::Value) -> String { let num_files = parsed .get("numFiles") - .and_then(|value| value.as_u64()) + .and_then(serde_json::Value::as_u64) .unwrap_or(0); let filenames = parsed .get("filenames") @@ -2510,11 +2553,11 @@ fn format_glob_result(icon: &str, parsed: &serde_json::Value) -> String { fn format_grep_result(icon: &str, parsed: &serde_json::Value) -> String { let num_matches = parsed .get("numMatches") - .and_then(|value| value.as_u64()) + .and_then(serde_json::Value::as_u64) .unwrap_or(0); let num_files = parsed .get("numFiles") - .and_then(|value| value.as_u64()) + .and_then(serde_json::Value::as_u64) .unwrap_or(0); let content = parsed .get("content") diff --git a/rust/crates/rusty-claude-cli/src/render.rs b/rust/crates/rusty-claude-cli/src/render.rs index 465c5a4..d8d8796 100644 --- a/rust/crates/rusty-claude-cli/src/render.rs +++ b/rust/crates/rusty-claude-cli/src/render.rs @@ -286,7 +286,7 @@ impl TerminalRenderer { ) { match event { Event::Start(Tag::Heading { level, .. }) => { - self.start_heading(state, level as u8, output) + Self::start_heading(state, level as u8, output); } Event::End(TagEnd::Paragraph) => output.push_str("\n\n"), Event::Start(Tag::BlockQuote(..)) => self.start_quote(state, output), @@ -426,7 +426,7 @@ impl TerminalRenderer { } } - fn start_heading(&self, state: &mut RenderState, level: u8, output: &mut String) { + fn start_heading(state: &mut RenderState, level: u8, output: &mut String) { state.heading_level = Some(level); if !output.is_empty() { output.push('\n'); diff --git a/rust/crates/telemetry/Cargo.toml b/rust/crates/telemetry/Cargo.toml new file mode 100644 index 0000000..d501850 --- /dev/null +++ b/rust/crates/telemetry/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "telemetry" +version.workspace = true +edition.workspace = true +license.workspace = true +publish.workspace = true + +[dependencies] +serde = { version = "1", features = ["derive"] } +serde_json = "1" + +[lints] +workspace = true diff --git a/rust/crates/telemetry/src/lib.rs b/rust/crates/telemetry/src/lib.rs new file mode 100644 index 0000000..e548f45 --- /dev/null +++ b/rust/crates/telemetry/src/lib.rs @@ -0,0 +1,509 @@ +use std::fmt::{Debug, Formatter}; +use std::fs::{File, OpenOptions}; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; + +pub const DEFAULT_ANTHROPIC_VERSION: &str = "2023-06-01"; +pub const DEFAULT_APP_NAME: &str = "clawd-code"; +pub const DEFAULT_RUNTIME: &str = "rust"; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ClientIdentity { + pub app_name: String, + pub app_version: String, + pub runtime: String, +} + +impl ClientIdentity { + #[must_use] + pub fn new(app_name: impl Into, app_version: impl Into) -> Self { + Self { + app_name: app_name.into(), + app_version: app_version.into(), + runtime: DEFAULT_RUNTIME.to_string(), + } + } + + #[must_use] + pub fn with_runtime(mut self, runtime: impl Into) -> Self { + self.runtime = runtime.into(); + self + } + + #[must_use] + pub fn user_agent(&self) -> String { + format!("{}/{} ({})", self.app_name, self.app_version, self.runtime) + } +} + +impl Default for ClientIdentity { + fn default() -> Self { + Self::new(DEFAULT_APP_NAME, env!("CARGO_PKG_VERSION")) + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct AnthropicRequestProfile { + pub anthropic_version: String, + pub client_identity: ClientIdentity, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub betas: Vec, + #[serde(default, skip_serializing_if = "Map::is_empty")] + pub extra_body: Map, +} + +impl AnthropicRequestProfile { + #[must_use] + pub fn new(client_identity: ClientIdentity) -> Self { + Self { + anthropic_version: DEFAULT_ANTHROPIC_VERSION.to_string(), + client_identity, + betas: Vec::new(), + extra_body: Map::new(), + } + } + + #[must_use] + pub fn with_beta(mut self, beta: impl Into) -> Self { + let beta = beta.into(); + if !self.betas.contains(&beta) { + self.betas.push(beta); + } + self + } + + #[must_use] + pub fn with_extra_body(mut self, key: impl Into, value: Value) -> Self { + self.extra_body.insert(key.into(), value); + self + } + + #[must_use] + pub fn header_pairs(&self) -> Vec<(String, String)> { + let mut headers = vec![ + ( + "anthropic-version".to_string(), + self.anthropic_version.clone(), + ), + ("user-agent".to_string(), self.client_identity.user_agent()), + ]; + if !self.betas.is_empty() { + headers.push(("anthropic-beta".to_string(), self.betas.join(","))); + } + headers + } + + pub fn render_json_body(&self, request: &T) -> Result { + let mut body = serde_json::to_value(request)?; + let object = body.as_object_mut().ok_or_else(|| { + serde_json::Error::io(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "request body must serialize to a JSON object", + )) + })?; + for (key, value) in &self.extra_body { + object.insert(key.clone(), value.clone()); + } + Ok(body) + } +} + +impl Default for AnthropicRequestProfile { + fn default() -> Self { + Self::new(ClientIdentity::default()) + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct AnalyticsEvent { + pub namespace: String, + pub action: String, + #[serde(default, skip_serializing_if = "Map::is_empty")] + pub properties: Map, +} + +impl AnalyticsEvent { + #[must_use] + pub fn new(namespace: impl Into, action: impl Into) -> Self { + Self { + namespace: namespace.into(), + action: action.into(), + properties: Map::new(), + } + } + + #[must_use] + pub fn with_property(mut self, key: impl Into, value: Value) -> Self { + self.properties.insert(key.into(), value); + self + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SessionTraceRecord { + pub session_id: String, + pub sequence: u64, + pub name: String, + pub timestamp_ms: u64, + #[serde(default, skip_serializing_if = "Map::is_empty")] + pub attributes: Map, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum TelemetryEvent { + HttpRequestStarted { + session_id: String, + attempt: u32, + method: String, + path: String, + #[serde(default, skip_serializing_if = "Map::is_empty")] + attributes: Map, + }, + HttpRequestSucceeded { + session_id: String, + attempt: u32, + method: String, + path: String, + status: u16, + #[serde(default, skip_serializing_if = "Option::is_none")] + request_id: Option, + #[serde(default, skip_serializing_if = "Map::is_empty")] + attributes: Map, + }, + HttpRequestFailed { + session_id: String, + attempt: u32, + method: String, + path: String, + error: String, + retryable: bool, + #[serde(default, skip_serializing_if = "Map::is_empty")] + attributes: Map, + }, + Analytics(AnalyticsEvent), + SessionTrace(SessionTraceRecord), +} + +pub trait TelemetrySink: Send + Sync { + fn record(&self, event: TelemetryEvent); +} + +#[derive(Default)] +pub struct MemoryTelemetrySink { + events: Mutex>, +} + +impl MemoryTelemetrySink { + #[must_use] + pub fn events(&self) -> Vec { + self.events + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .clone() + } +} + +impl TelemetrySink for MemoryTelemetrySink { + fn record(&self, event: TelemetryEvent) { + self.events + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .push(event); + } +} + +pub struct JsonlTelemetrySink { + path: PathBuf, + file: Mutex, +} + +impl Debug for JsonlTelemetrySink { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("JsonlTelemetrySink") + .field("path", &self.path) + .finish_non_exhaustive() + } +} + +impl JsonlTelemetrySink { + pub fn new(path: impl AsRef) -> Result { + let path = path.as_ref().to_path_buf(); + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent)?; + } + let file = OpenOptions::new().create(true).append(true).open(&path)?; + Ok(Self { + path, + file: Mutex::new(file), + }) + } + + #[must_use] + pub fn path(&self) -> &Path { + &self.path + } +} + +impl TelemetrySink for JsonlTelemetrySink { + fn record(&self, event: TelemetryEvent) { + let Ok(line) = serde_json::to_string(&event) else { + return; + }; + let mut file = self + .file + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let _ = writeln!(file, "{line}"); + let _ = file.flush(); + } +} + +#[derive(Clone)] +pub struct SessionTracer { + session_id: String, + sequence: Arc, + sink: Arc, +} + +impl Debug for SessionTracer { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SessionTracer") + .field("session_id", &self.session_id) + .finish_non_exhaustive() + } +} + +impl SessionTracer { + #[must_use] + pub fn new(session_id: impl Into, sink: Arc) -> Self { + Self { + session_id: session_id.into(), + sequence: Arc::new(AtomicU64::new(0)), + sink, + } + } + + #[must_use] + pub fn session_id(&self) -> &str { + &self.session_id + } + + pub fn record(&self, name: impl Into, attributes: Map) { + let record = SessionTraceRecord { + session_id: self.session_id.clone(), + sequence: self.sequence.fetch_add(1, Ordering::Relaxed), + name: name.into(), + timestamp_ms: current_timestamp_ms(), + attributes, + }; + self.sink.record(TelemetryEvent::SessionTrace(record)); + } + + pub fn record_http_request_started( + &self, + attempt: u32, + method: impl Into, + path: impl Into, + attributes: Map, + ) { + let method = method.into(); + let path = path.into(); + self.sink.record(TelemetryEvent::HttpRequestStarted { + session_id: self.session_id.clone(), + attempt, + method: method.clone(), + path: path.clone(), + attributes: attributes.clone(), + }); + self.record( + "http_request_started", + merge_trace_fields(method, path, attempt, attributes), + ); + } + + pub fn record_http_request_succeeded( + &self, + attempt: u32, + method: impl Into, + path: impl Into, + status: u16, + request_id: Option, + attributes: Map, + ) { + let method = method.into(); + let path = path.into(); + self.sink.record(TelemetryEvent::HttpRequestSucceeded { + session_id: self.session_id.clone(), + attempt, + method: method.clone(), + path: path.clone(), + status, + request_id: request_id.clone(), + attributes: attributes.clone(), + }); + let mut trace_attributes = merge_trace_fields(method, path, attempt, attributes); + trace_attributes.insert("status".to_string(), Value::from(status)); + if let Some(request_id) = request_id { + trace_attributes.insert("request_id".to_string(), Value::String(request_id)); + } + self.record("http_request_succeeded", trace_attributes); + } + + pub fn record_http_request_failed( + &self, + attempt: u32, + method: impl Into, + path: impl Into, + error: impl Into, + retryable: bool, + attributes: Map, + ) { + let method = method.into(); + let path = path.into(); + let error = error.into(); + self.sink.record(TelemetryEvent::HttpRequestFailed { + session_id: self.session_id.clone(), + attempt, + method: method.clone(), + path: path.clone(), + error: error.clone(), + retryable, + attributes: attributes.clone(), + }); + let mut trace_attributes = merge_trace_fields(method, path, attempt, attributes); + trace_attributes.insert("error".to_string(), Value::String(error)); + trace_attributes.insert("retryable".to_string(), Value::Bool(retryable)); + self.record("http_request_failed", trace_attributes); + } + + pub fn record_analytics(&self, event: AnalyticsEvent) { + let mut attributes = event.properties.clone(); + attributes.insert( + "namespace".to_string(), + Value::String(event.namespace.clone()), + ); + attributes.insert("action".to_string(), Value::String(event.action.clone())); + self.sink.record(TelemetryEvent::Analytics(event)); + self.record("analytics", attributes); + } +} + +fn merge_trace_fields( + method: String, + path: String, + attempt: u32, + mut attributes: Map, +) -> Map { + attributes.insert("method".to_string(), Value::String(method)); + attributes.insert("path".to_string(), Value::String(path)); + attributes.insert("attempt".to_string(), Value::from(attempt)); + attributes +} + +fn current_timestamp_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() + .try_into() + .unwrap_or(u64::MAX) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn request_profile_emits_headers_and_merges_body() { + let profile = AnthropicRequestProfile::new( + ClientIdentity::new("clawd-code", "1.2.3").with_runtime("rust-cli"), + ) + .with_beta("tools-2026-04-01") + .with_extra_body("metadata", serde_json::json!({"source": "test"})); + + assert_eq!( + profile.header_pairs(), + vec![ + ( + "anthropic-version".to_string(), + DEFAULT_ANTHROPIC_VERSION.to_string() + ), + ( + "user-agent".to_string(), + "clawd-code/1.2.3 (rust-cli)".to_string() + ), + ("anthropic-beta".to_string(), "tools-2026-04-01".to_string(),), + ] + ); + + let body = profile + .render_json_body(&serde_json::json!({"model": "claude-sonnet"})) + .expect("body should serialize"); + assert_eq!( + body["metadata"]["source"], + Value::String("test".to_string()) + ); + } + + #[test] + fn session_tracer_records_structured_events_and_trace_sequence() { + let sink = Arc::new(MemoryTelemetrySink::default()); + let tracer = SessionTracer::new("session-123", sink.clone()); + + tracer.record_http_request_started(1, "POST", "/v1/messages", Map::new()); + tracer.record_analytics( + AnalyticsEvent::new("cli", "prompt_sent") + .with_property("model", Value::String("claude-opus".to_string())), + ); + + let events = sink.events(); + assert!(matches!( + &events[0], + TelemetryEvent::HttpRequestStarted { + session_id, + attempt: 1, + method, + path, + .. + } if session_id == "session-123" && method == "POST" && path == "/v1/messages" + )); + assert!(matches!( + &events[1], + TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 0, name, .. }) + if name == "http_request_started" + )); + assert!(matches!(&events[2], TelemetryEvent::Analytics(_))); + assert!(matches!( + &events[3], + TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 1, name, .. }) + if name == "analytics" + )); + } + + #[test] + fn jsonl_sink_persists_events() { + let path = std::env::temp_dir().join(format!( + "telemetry-jsonl-{}.log", + current_timestamp_ms() + )); + let sink = JsonlTelemetrySink::new(&path).expect("sink should create file"); + + sink.record(TelemetryEvent::Analytics( + AnalyticsEvent::new("cli", "turn_completed") + .with_property("ok", Value::Bool(true)), + )); + + let contents = std::fs::read_to_string(&path).expect("telemetry log should be readable"); + assert!(contents.contains("\"type\":\"analytics\"")); + assert!(contents.contains("\"action\":\"turn_completed\"")); + + let _ = std::fs::remove_file(path); + } +} From e7e3ae287503f3f91eb1b822b7b81941f097e057 Mon Sep 17 00:00:00 2001 From: Yeachan-Heo Date: Wed, 1 Apr 2026 04:40:21 +0000 Subject: [PATCH 2/5] wip: telemetry progress --- rust/Cargo.lock | 1 + rust/crates/runtime/src/conversation.rs | 59 +++++++++++++++++++++--- rust/crates/rusty-claude-cli/src/main.rs | 19 ++++---- rust/crates/telemetry/src/lib.rs | 9 ++-- 4 files changed, 67 insertions(+), 21 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index b4dbf7e..1995038 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1097,6 +1097,7 @@ dependencies = [ "serde", "serde_json", "sha2", + "telemetry", "tokio", "walkdir", ] diff --git a/rust/crates/runtime/src/conversation.rs b/rust/crates/runtime/src/conversation.rs index ccaf064..d44fb90 100644 --- a/rust/crates/runtime/src/conversation.rs +++ b/rust/crates/runtime/src/conversation.rs @@ -161,6 +161,7 @@ where self } + #[allow(clippy::too_many_lines)] pub fn run_turn( &mut self, user_input: impl Into, @@ -197,7 +198,13 @@ where return Err(error); } }; - let (assistant_message, usage) = build_assistant_message(events)?; + let (assistant_message, usage) = match build_assistant_message(events) { + Ok(result) => result, + Err(error) => { + self.record_turn_failed(iterations, &error); + return Err(error); + } + }; if let Some(usage) = usage { self.usage_tracker.record(usage); } @@ -211,7 +218,11 @@ where _ => None, }) .collect::>(); - self.record_assistant_iteration(iterations, &assistant_message, pending_tool_uses.len()); + self.record_assistant_iteration( + iterations, + &assistant_message, + pending_tool_uses.len(), + ); self.session.messages.push(assistant_message.clone()); assistant_messages.push(assistant_message); @@ -359,7 +370,10 @@ where "iteration".to_string(), Value::from(u64::try_from(iteration).unwrap_or(u64::MAX)), ); - attributes.insert("tool_name".to_string(), Value::String(tool_name.to_string())); + attributes.insert( + "tool_name".to_string(), + Value::String(tool_name.to_string()), + ); tracer.record("tool_execution_started", attributes); } } @@ -396,9 +410,7 @@ where let mut attributes = Map::new(); attributes.insert( "assistant_message_count".to_string(), - Value::from( - u64::try_from(summary.assistant_messages.len()).unwrap_or(u64::MAX), - ), + Value::from(u64::try_from(summary.assistant_messages.len()).unwrap_or(u64::MAX)), ); attributes.insert( "tool_result_count".to_string(), @@ -554,6 +566,8 @@ mod tests { use crate::session::{ContentBlock, MessageRole, Session}; use crate::usage::TokenUsage; use std::path::PathBuf; + use std::sync::Arc; + use telemetry::{MemoryTelemetrySink, SessionTracer, TelemetryEvent}; struct ScriptedApiClient { call_count: usize, @@ -666,6 +680,39 @@ mod tests { )); } + #[test] + fn records_runtime_session_trace_events() { + let sink = Arc::new(MemoryTelemetrySink::default()); + let tracer = SessionTracer::new("session-runtime", sink.clone()); + let mut runtime = ConversationRuntime::new( + Session::new(), + ScriptedApiClient { call_count: 0 }, + StaticToolExecutor::new().register("add", |_input| Ok("4".to_string())), + PermissionPolicy::new(PermissionMode::WorkspaceWrite), + vec!["system".to_string()], + ) + .with_session_tracer(tracer); + + runtime + .run_turn("what is 2 + 2?", Some(&mut PromptAllowOnce)) + .expect("conversation loop should succeed"); + + let events = sink.events(); + let trace_names = events + .iter() + .filter_map(|event| match event { + TelemetryEvent::SessionTrace(trace) => Some(trace.name.as_str()), + _ => None, + }) + .collect::>(); + + assert!(trace_names.contains(&"turn_started")); + assert!(trace_names.contains(&"assistant_iteration_completed")); + assert!(trace_names.contains(&"tool_execution_started")); + assert!(trace_names.contains(&"tool_execution_finished")); + assert!(trace_names.contains(&"turn_completed")); + } + #[test] fn records_denied_tool_results_when_prompt_rejects() { struct RejectPrompter; diff --git a/rust/crates/rusty-claude-cli/src/main.rs b/rust/crates/rusty-claude-cli/src/main.rs index b645a89..0a4ea16 100644 --- a/rust/crates/rusty-claude-cli/src/main.rs +++ b/rust/crates/rusty-claude-cli/src/main.rs @@ -1237,6 +1237,7 @@ impl LiveCli { let message_count = session.messages.len(); self.runtime = build_runtime( session, + &self.session.id, model.clone(), self.system_prompt.clone(), true, @@ -1342,7 +1343,7 @@ impl LiveCli { let message_count = session.messages.len(); self.runtime = build_runtime( session, - &self.session.id, + &handle.id, self.model.clone(), self.system_prompt.clone(), true, @@ -1922,6 +1923,7 @@ fn build_runtime_feature_config( .clone()) } +#[allow(clippy::too_many_arguments)] fn build_runtime( session: Session, session_id: &str, @@ -1935,14 +1937,13 @@ fn build_runtime( { let session_tracer = build_session_tracer(session_id)?; let api_client = match session_tracer.clone() { - Some(session_tracer) => AnthropicRuntimeClient::new( - model, - enable_tools, - emit_output, - allowed_tools.clone(), - )? - .with_session_tracer(session_tracer), - None => AnthropicRuntimeClient::new(model, enable_tools, emit_output, allowed_tools.clone())?, + Some(session_tracer) => { + AnthropicRuntimeClient::new(model, enable_tools, emit_output, allowed_tools.clone())? + .with_session_tracer(session_tracer) + } + None => { + AnthropicRuntimeClient::new(model, enable_tools, emit_output, allowed_tools.clone())? + } }; let runtime = ConversationRuntime::new_with_features( session, diff --git a/rust/crates/telemetry/src/lib.rs b/rust/crates/telemetry/src/lib.rs index e548f45..e7bf6e2 100644 --- a/rust/crates/telemetry/src/lib.rs +++ b/rust/crates/telemetry/src/lib.rs @@ -489,15 +489,12 @@ mod tests { #[test] fn jsonl_sink_persists_events() { - let path = std::env::temp_dir().join(format!( - "telemetry-jsonl-{}.log", - current_timestamp_ms() - )); + let path = + std::env::temp_dir().join(format!("telemetry-jsonl-{}.log", current_timestamp_ms())); let sink = JsonlTelemetrySink::new(&path).expect("sink should create file"); sink.record(TelemetryEvent::Analytics( - AnalyticsEvent::new("cli", "turn_completed") - .with_property("ok", Value::Bool(true)), + AnalyticsEvent::new("cli", "turn_completed").with_property("ok", Value::Bool(true)), )); let contents = std::fs::read_to_string(&path).expect("telemetry log should be readable"); From 828597024e85518fc41ea1ead22945f5478b913d Mon Sep 17 00:00:00 2001 From: Yeachan-Heo Date: Wed, 1 Apr 2026 05:45:28 +0000 Subject: [PATCH 3/5] wip: telemetry claude code matching --- rust/crates/api/tests/client_integration.rs | 17 ++++++++++---- rust/crates/telemetry/src/lib.rs | 25 +++++++++++++++------ 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/rust/crates/api/tests/client_integration.rs b/rust/crates/api/tests/client_integration.rs index d95dba8..569c2fa 100644 --- a/rust/crates/api/tests/client_integration.rs +++ b/rust/crates/api/tests/client_integration.rs @@ -71,7 +71,11 @@ async fn send_message_posts_json_and_parses_response() { ); assert_eq!( request.headers.get("user-agent").map(String::as_str), - Some("clawd-code/0.1.0 (rust)") + Some("claude-code/0.1.0") + ); + assert_eq!( + request.headers.get("anthropic-beta").map(String::as_str), + Some("claude-code-20250219") ); let body: serde_json::Value = serde_json::from_str(&request.body).expect("request body should be json"); @@ -82,6 +86,7 @@ async fn send_message_posts_json_and_parses_response() { assert!(body.get("stream").is_none()); assert_eq!(body["tools"][0]["name"], json!("get_weather")); assert_eq!(body["tool_choice"]["type"], json!("auto")); + assert_eq!(body["betas"], json!(["claude-code-20250219"])); } #[tokio::test] @@ -112,7 +117,7 @@ async fn send_message_applies_request_profile_and_records_telemetry() { let client = AnthropicClient::new("test-key") .with_base_url(server.base_url()) - .with_client_identity(ClientIdentity::new("clawd-code", "9.9.9").with_runtime("rust-cli")) + .with_client_identity(ClientIdentity::new("claude-code", "9.9.9").with_runtime("rust-cli")) .with_beta("tools-2026-04-01") .with_extra_body_param("metadata", json!({"source": "clawd-code"})) .with_session_tracer(SessionTracer::new("session-telemetry", sink.clone())); @@ -128,15 +133,19 @@ async fn send_message_applies_request_profile_and_records_telemetry() { let request = captured.first().expect("server should capture request"); assert_eq!( request.headers.get("anthropic-beta").map(String::as_str), - Some("tools-2026-04-01") + Some("claude-code-20250219,tools-2026-04-01") ); assert_eq!( request.headers.get("user-agent").map(String::as_str), - Some("clawd-code/9.9.9 (rust-cli)") + Some("claude-code/9.9.9") ); let body: serde_json::Value = serde_json::from_str(&request.body).expect("request body should be json"); assert_eq!(body["metadata"]["source"], json!("clawd-code")); + assert_eq!( + body["betas"], + json!(["claude-code-20250219", "tools-2026-04-01"]) + ); let events = sink.events(); assert_eq!(events.len(), 4); diff --git a/rust/crates/telemetry/src/lib.rs b/rust/crates/telemetry/src/lib.rs index e7bf6e2..4a471cc 100644 --- a/rust/crates/telemetry/src/lib.rs +++ b/rust/crates/telemetry/src/lib.rs @@ -10,8 +10,9 @@ use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; pub const DEFAULT_ANTHROPIC_VERSION: &str = "2023-06-01"; -pub const DEFAULT_APP_NAME: &str = "clawd-code"; +pub const DEFAULT_APP_NAME: &str = "claude-code"; pub const DEFAULT_RUNTIME: &str = "rust"; +pub const DEFAULT_AGENTIC_BETA: &str = "claude-code-20250219"; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ClientIdentity { @@ -38,7 +39,7 @@ impl ClientIdentity { #[must_use] pub fn user_agent(&self) -> String { - format!("{}/{} ({})", self.app_name, self.app_version, self.runtime) + format!("{}/{}", self.app_name, self.app_version) } } @@ -64,7 +65,7 @@ impl AnthropicRequestProfile { Self { anthropic_version: DEFAULT_ANTHROPIC_VERSION.to_string(), client_identity, - betas: Vec::new(), + betas: vec![DEFAULT_AGENTIC_BETA.to_string()], extra_body: Map::new(), } } @@ -110,6 +111,12 @@ impl AnthropicRequestProfile { for (key, value) in &self.extra_body { object.insert(key.clone(), value.clone()); } + if !self.betas.is_empty() { + object.insert( + "betas".to_string(), + Value::Array(self.betas.iter().cloned().map(Value::String).collect()), + ); + } Ok(body) } } @@ -423,7 +430,7 @@ mod tests { #[test] fn request_profile_emits_headers_and_merges_body() { let profile = AnthropicRequestProfile::new( - ClientIdentity::new("clawd-code", "1.2.3").with_runtime("rust-cli"), + ClientIdentity::new("claude-code", "1.2.3").with_runtime("rust-cli"), ) .with_beta("tools-2026-04-01") .with_extra_body("metadata", serde_json::json!({"source": "test"})); @@ -435,11 +442,11 @@ mod tests { "anthropic-version".to_string(), DEFAULT_ANTHROPIC_VERSION.to_string() ), + ("user-agent".to_string(), "claude-code/1.2.3".to_string()), ( - "user-agent".to_string(), - "clawd-code/1.2.3 (rust-cli)".to_string() + "anthropic-beta".to_string(), + "claude-code-20250219,tools-2026-04-01".to_string(), ), - ("anthropic-beta".to_string(), "tools-2026-04-01".to_string(),), ] ); @@ -450,6 +457,10 @@ mod tests { body["metadata"]["source"], Value::String("test".to_string()) ); + assert_eq!( + body["betas"], + serde_json::json!(["claude-code-20250219", "tools-2026-04-01"]) + ); } #[test] From 1b42c6096cbeea0a126fa05d962e1a6af29c98c1 Mon Sep 17 00:00:00 2001 From: Yeachan-Heo Date: Wed, 1 Apr 2026 05:55:25 +0000 Subject: [PATCH 4/5] feat: anthropic SDK header matching + request profile --- rust/crates/api/src/client.rs | 13 +++++++++++++ rust/crates/api/tests/client_integration.rs | 15 +++++++++++---- rust/crates/telemetry/src/lib.rs | 15 ++++++++++++--- 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/rust/crates/api/src/client.rs b/rust/crates/api/src/client.rs index d7dacbe..6901de6 100644 --- a/rust/crates/api/src/client.rs +++ b/rust/crates/api/src/client.rs @@ -439,6 +439,19 @@ impl AnthropicClient { "beta_count".to_string(), Value::from(u64::try_from(self.request_profile.betas.len()).unwrap_or(u64::MAX)), ); + if !self.request_profile.betas.is_empty() { + attributes.insert( + "betas".to_string(), + Value::Array( + self.request_profile + .betas + .iter() + .cloned() + .map(Value::String) + .collect(), + ), + ); + } if !self.request_profile.extra_body.is_empty() { attributes.insert( "extra_body_keys".to_string(), diff --git a/rust/crates/api/tests/client_integration.rs b/rust/crates/api/tests/client_integration.rs index 569c2fa..4b78d0e 100644 --- a/rust/crates/api/tests/client_integration.rs +++ b/rust/crates/api/tests/client_integration.rs @@ -75,7 +75,7 @@ async fn send_message_posts_json_and_parses_response() { ); assert_eq!( request.headers.get("anthropic-beta").map(String::as_str), - Some("claude-code-20250219") + Some("claude-code-20250219,prompt-caching-scope-2026-01-05") ); let body: serde_json::Value = serde_json::from_str(&request.body).expect("request body should be json"); @@ -86,7 +86,10 @@ async fn send_message_posts_json_and_parses_response() { assert!(body.get("stream").is_none()); assert_eq!(body["tools"][0]["name"], json!("get_weather")); assert_eq!(body["tool_choice"]["type"], json!("auto")); - assert_eq!(body["betas"], json!(["claude-code-20250219"])); + assert_eq!( + body["betas"], + json!(["claude-code-20250219", "prompt-caching-scope-2026-01-05"]) + ); } #[tokio::test] @@ -133,7 +136,7 @@ async fn send_message_applies_request_profile_and_records_telemetry() { let request = captured.first().expect("server should capture request"); assert_eq!( request.headers.get("anthropic-beta").map(String::as_str), - Some("claude-code-20250219,tools-2026-04-01") + Some("claude-code-20250219,prompt-caching-scope-2026-01-05,tools-2026-04-01") ); assert_eq!( request.headers.get("user-agent").map(String::as_str), @@ -144,7 +147,11 @@ async fn send_message_applies_request_profile_and_records_telemetry() { assert_eq!(body["metadata"]["source"], json!("clawd-code")); assert_eq!( body["betas"], - json!(["claude-code-20250219", "tools-2026-04-01"]) + json!([ + "claude-code-20250219", + "prompt-caching-scope-2026-01-05", + "tools-2026-04-01" + ]) ); let events = sink.events(); diff --git a/rust/crates/telemetry/src/lib.rs b/rust/crates/telemetry/src/lib.rs index 4a471cc..6e369e1 100644 --- a/rust/crates/telemetry/src/lib.rs +++ b/rust/crates/telemetry/src/lib.rs @@ -13,6 +13,7 @@ pub const DEFAULT_ANTHROPIC_VERSION: &str = "2023-06-01"; pub const DEFAULT_APP_NAME: &str = "claude-code"; pub const DEFAULT_RUNTIME: &str = "rust"; pub const DEFAULT_AGENTIC_BETA: &str = "claude-code-20250219"; +pub const DEFAULT_PROMPT_CACHING_SCOPE_BETA: &str = "prompt-caching-scope-2026-01-05"; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ClientIdentity { @@ -65,7 +66,10 @@ impl AnthropicRequestProfile { Self { anthropic_version: DEFAULT_ANTHROPIC_VERSION.to_string(), client_identity, - betas: vec![DEFAULT_AGENTIC_BETA.to_string()], + betas: vec![ + DEFAULT_AGENTIC_BETA.to_string(), + DEFAULT_PROMPT_CACHING_SCOPE_BETA.to_string(), + ], extra_body: Map::new(), } } @@ -445,7 +449,8 @@ mod tests { ("user-agent".to_string(), "claude-code/1.2.3".to_string()), ( "anthropic-beta".to_string(), - "claude-code-20250219,tools-2026-04-01".to_string(), + "claude-code-20250219,prompt-caching-scope-2026-01-05,tools-2026-04-01" + .to_string(), ), ] ); @@ -459,7 +464,11 @@ mod tests { ); assert_eq!( body["betas"], - serde_json::json!(["claude-code-20250219", "tools-2026-04-01"]) + serde_json::json!([ + "claude-code-20250219", + "prompt-caching-scope-2026-01-05", + "tools-2026-04-01" + ]) ); } From 61b4def7bc7ef3c3fe811ff72d482e7ca90843a8 Mon Sep 17 00:00:00 2001 From: Yeachan-Heo Date: Wed, 1 Apr 2026 06:15:15 +0000 Subject: [PATCH 5/5] feat: telemetry progress --- rust/crates/api/src/client.rs | 76 ++++++++++++++++++++- rust/crates/api/src/types.rs | 69 ++++++++++++++++++- rust/crates/api/tests/client_integration.rs | 17 ++++- rust/crates/runtime/src/mcp_stdio.rs | 16 ++++- rust/crates/rusty-claude-cli/src/main.rs | 14 +--- rust/crates/tools/src/lib.rs | 16 +---- 6 files changed, 175 insertions(+), 33 deletions(-) diff --git a/rust/crates/api/src/client.rs b/rust/crates/api/src/client.rs index 6901de6..f8eb97a 100644 --- a/rust/crates/api/src/client.rs +++ b/rust/crates/api/src/client.rs @@ -2,12 +2,12 @@ use std::collections::VecDeque; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use runtime::{ - load_oauth_credentials, save_oauth_credentials, OAuthConfig, OAuthRefreshRequest, - OAuthTokenExchangeRequest, + format_usd, load_oauth_credentials, pricing_for_model, save_oauth_credentials, OAuthConfig, + OAuthRefreshRequest, OAuthTokenExchangeRequest, }; use serde::Deserialize; use serde_json::{Map, Value}; -use telemetry::{AnthropicRequestProfile, ClientIdentity, SessionTracer}; +use telemetry::{AnalyticsEvent, AnthropicRequestProfile, ClientIdentity, SessionTracer}; use crate::error::ApiError; use crate::sse::SseParser; @@ -252,6 +252,7 @@ impl AnthropicClient { if response.request_id.is_none() { response.request_id = request_id; } + self.record_response_usage(&response); Ok(response) } @@ -420,6 +421,75 @@ impl AnthropicClient { } } + fn record_response_usage(&self, response: &MessageResponse) { + let Some(tracer) = &self.session_tracer else { + return; + }; + + let cost = response.usage.estimated_cost_usd(&response.model); + let pricing_source = if pricing_for_model(&response.model).is_some() { + "model-specific" + } else { + "default-sonnet" + }; + + let mut properties = Map::new(); + properties.insert("model".to_string(), Value::String(response.model.clone())); + properties.insert( + "pricing_source".to_string(), + Value::String(pricing_source.to_string()), + ); + properties.insert( + "input_tokens".to_string(), + Value::from(response.usage.input_tokens), + ); + properties.insert( + "output_tokens".to_string(), + Value::from(response.usage.output_tokens), + ); + properties.insert( + "cache_creation_input_tokens".to_string(), + Value::from(response.usage.cache_creation_input_tokens), + ); + properties.insert( + "cache_read_input_tokens".to_string(), + Value::from(response.usage.cache_read_input_tokens), + ); + properties.insert( + "total_tokens".to_string(), + Value::from(response.usage.total_tokens()), + ); + properties.insert( + "estimated_cost_usd".to_string(), + Value::String(format_usd(cost.total_cost_usd())), + ); + properties.insert( + "estimated_input_cost_usd".to_string(), + Value::String(format_usd(cost.input_cost_usd)), + ); + properties.insert( + "estimated_output_cost_usd".to_string(), + Value::String(format_usd(cost.output_cost_usd)), + ); + properties.insert( + "estimated_cache_creation_cost_usd".to_string(), + Value::String(format_usd(cost.cache_creation_cost_usd)), + ); + properties.insert( + "estimated_cache_read_cost_usd".to_string(), + Value::String(format_usd(cost.cache_read_cost_usd)), + ); + if let Some(request_id) = &response.request_id { + properties.insert("request_id".to_string(), Value::String(request_id.clone())); + } + + tracer.record_analytics(AnalyticsEvent { + namespace: "api".to_string(), + action: "message_usage".to_string(), + properties, + }); + } + fn request_attributes(&self, request: &MessageRequest) -> Map { let mut attributes = Map::new(); attributes.insert("model".to_string(), Value::String(request.model.clone())); diff --git a/rust/crates/api/src/types.rs b/rust/crates/api/src/types.rs index 45d5c08..f05fbe5 100644 --- a/rust/crates/api/src/types.rs +++ b/rust/crates/api/src/types.rs @@ -1,3 +1,4 @@ +use runtime::{pricing_for_model, TokenUsage, UsageCostEstimate}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -150,7 +151,29 @@ pub struct Usage { impl Usage { #[must_use] pub const fn total_tokens(&self) -> u32 { - self.input_tokens + self.output_tokens + self.input_tokens + + self.output_tokens + + self.cache_creation_input_tokens + + self.cache_read_input_tokens + } + + #[must_use] + pub const fn token_usage(&self) -> TokenUsage { + TokenUsage { + input_tokens: self.input_tokens, + output_tokens: self.output_tokens, + cache_creation_input_tokens: self.cache_creation_input_tokens, + cache_read_input_tokens: self.cache_read_input_tokens, + } + } + + #[must_use] + pub fn estimated_cost_usd(&self, model: &str) -> UsageCostEstimate { + let usage = self.token_usage(); + pricing_for_model(model).map_or_else( + || usage.estimate_cost_usd(), + |pricing| usage.estimate_cost_usd_with_pricing(pricing), + ) } } @@ -210,3 +233,47 @@ pub enum StreamEvent { ContentBlockStop(ContentBlockStopEvent), MessageStop(MessageStopEvent), } + +#[cfg(test)] +mod tests { + use runtime::format_usd; + + use super::{MessageResponse, Usage}; + + #[test] + fn usage_total_tokens_includes_cache_tokens() { + let usage = Usage { + input_tokens: 10, + cache_creation_input_tokens: 2, + cache_read_input_tokens: 3, + output_tokens: 4, + }; + + assert_eq!(usage.total_tokens(), 19); + assert_eq!(usage.token_usage().total_tokens(), 19); + } + + #[test] + fn message_response_estimates_cost_from_model_usage() { + let response = MessageResponse { + id: "msg_cost".to_string(), + kind: "message".to_string(), + role: "assistant".to_string(), + content: Vec::new(), + model: "claude-sonnet-4-20250514".to_string(), + stop_reason: Some("end_turn".to_string()), + stop_sequence: None, + usage: Usage { + input_tokens: 1_000_000, + cache_creation_input_tokens: 100_000, + cache_read_input_tokens: 200_000, + output_tokens: 500_000, + }, + request_id: None, + }; + + let cost = response.usage.estimated_cost_usd(&response.model); + assert_eq!(format_usd(cost.total_cost_usd()), "$54.6750"); + assert_eq!(response.total_tokens(), 1_800_000); + } +} diff --git a/rust/crates/api/tests/client_integration.rs b/rust/crates/api/tests/client_integration.rs index 4b78d0e..6bb1c0f 100644 --- a/rust/crates/api/tests/client_integration.rs +++ b/rust/crates/api/tests/client_integration.rs @@ -109,7 +109,7 @@ async fn send_message_applies_request_profile_and_records_telemetry() { "\"model\":\"claude-3-7-sonnet-latest\",", "\"stop_reason\":\"end_turn\",", "\"stop_sequence\":null,", - "\"usage\":{\"input_tokens\":1,\"output_tokens\":1}", + "\"usage\":{\"input_tokens\":1,\"cache_creation_input_tokens\":2,\"cache_read_input_tokens\":3,\"output_tokens\":1}", "}" ), &[("request-id", "req_profile_123")], @@ -155,7 +155,7 @@ async fn send_message_applies_request_profile_and_records_telemetry() { ); let events = sink.events(); - assert_eq!(events.len(), 4); + assert_eq!(events.len(), 6); assert!(matches!( &events[0], TelemetryEvent::HttpRequestStarted { @@ -182,6 +182,19 @@ async fn send_message_applies_request_profile_and_records_telemetry() { &events[3], TelemetryEvent::SessionTrace(trace) if trace.name == "http_request_succeeded" )); + assert!(matches!( + &events[4], + TelemetryEvent::Analytics(event) + if event.namespace == "api" + && event.action == "message_usage" + && event.properties.get("request_id") == Some(&json!("req_profile_123")) + && event.properties.get("total_tokens") == Some(&json!(7)) + && event.properties.get("estimated_cost_usd") == Some(&json!("$0.0001")) + )); + assert!(matches!( + &events[5], + TelemetryEvent::SessionTrace(trace) if trace.name == "analytics" + )); } #[tokio::test] diff --git a/rust/crates/runtime/src/mcp_stdio.rs b/rust/crates/runtime/src/mcp_stdio.rs index 7e67d5d..b72b9dd 100644 --- a/rust/crates/runtime/src/mcp_stdio.rs +++ b/rust/crates/runtime/src/mcp_stdio.rs @@ -1144,8 +1144,20 @@ mod tests { } fn cleanup_script(script_path: &Path) { - fs::remove_file(script_path).expect("cleanup script"); - fs::remove_dir_all(script_path.parent().expect("script parent")).expect("cleanup dir"); + if let Err(error) = fs::remove_file(script_path) { + assert_eq!( + error.kind(), + std::io::ErrorKind::NotFound, + "cleanup script: {error}" + ); + } + if let Err(error) = fs::remove_dir_all(script_path.parent().expect("script parent")) { + assert_eq!( + error.kind(), + std::io::ErrorKind::NotFound, + "cleanup dir: {error}" + ); + } } fn manager_server_config( diff --git a/rust/crates/rusty-claude-cli/src/main.rs b/rust/crates/rusty-claude-cli/src/main.rs index 0a4ea16..41746b3 100644 --- a/rust/crates/rusty-claude-cli/src/main.rs +++ b/rust/crates/rusty-claude-cli/src/main.rs @@ -2154,12 +2154,7 @@ impl ApiClient for AnthropicRuntimeClient { } } ApiStreamEvent::MessageDelta(delta) => { - events.push(AssistantEvent::Usage(TokenUsage { - input_tokens: delta.usage.input_tokens, - output_tokens: delta.usage.output_tokens, - cache_creation_input_tokens: 0, - cache_read_input_tokens: 0, - })); + events.push(AssistantEvent::Usage(delta.usage.token_usage())); } ApiStreamEvent::MessageStop(_) => { saw_stop = true; @@ -2655,12 +2650,7 @@ fn response_to_events( } } - events.push(AssistantEvent::Usage(TokenUsage { - input_tokens: response.usage.input_tokens, - output_tokens: response.usage.output_tokens, - cache_creation_input_tokens: response.usage.cache_creation_input_tokens, - cache_read_input_tokens: response.usage.cache_read_input_tokens, - })); + events.push(AssistantEvent::Usage(response.usage.token_usage())); events.push(AssistantEvent::MessageStop); Ok(events) } diff --git a/rust/crates/tools/src/lib.rs b/rust/crates/tools/src/lib.rs index 8dcd33d..3526d2c 100644 --- a/rust/crates/tools/src/lib.rs +++ b/rust/crates/tools/src/lib.rs @@ -13,7 +13,7 @@ use runtime::{ edit_file, execute_bash, glob_search, grep_search, load_system_prompt, read_file, write_file, ApiClient, ApiRequest, AssistantEvent, BashCommandInput, ContentBlock, ConversationMessage, ConversationRuntime, GrepSearchInput, MessageRole, PermissionMode, PermissionPolicy, - RuntimeError, Session, TokenUsage, ToolError, ToolExecutor, + RuntimeError, Session, ToolError, ToolExecutor, }; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; @@ -1723,12 +1723,7 @@ impl ApiClient for AnthropicRuntimeClient { } } ApiStreamEvent::MessageDelta(delta) => { - events.push(AssistantEvent::Usage(TokenUsage { - input_tokens: delta.usage.input_tokens, - output_tokens: delta.usage.output_tokens, - cache_creation_input_tokens: 0, - cache_read_input_tokens: 0, - })); + events.push(AssistantEvent::Usage(delta.usage.token_usage())); } ApiStreamEvent::MessageStop(_) => { saw_stop = true; @@ -1874,12 +1869,7 @@ fn response_to_events(response: MessageResponse) -> Vec { } } - events.push(AssistantEvent::Usage(TokenUsage { - input_tokens: response.usage.input_tokens, - output_tokens: response.usage.output_tokens, - cache_creation_input_tokens: response.usage.cache_creation_input_tokens, - cache_read_input_tokens: response.usage.cache_read_input_tokens, - })); + events.push(AssistantEvent::Usage(response.usage.token_usage())); events.push(AssistantEvent::MessageStop); events }