diff --git a/rust/Cargo.lock b/rust/Cargo.lock index c64d4d0..33855b7 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -25,6 +25,7 @@ dependencies = [ "runtime", "serde", "serde_json", + "telemetry", "tokio", ] @@ -1107,6 +1108,7 @@ dependencies = [ "serde", "serde_json", "sha2", + "telemetry", "tokio", "walkdir", ] @@ -1440,6 +1442,14 @@ dependencies = [ "yaml-rust", ] +[[package]] +name = "telemetry" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "thiserror" version = "2.0.18" diff --git a/rust/crates/api/src/lib.rs b/rust/crates/api/src/lib.rs index d702c1b..e48510f 100644 --- a/rust/crates/api/src/lib.rs +++ b/rust/crates/api/src/lib.rs @@ -21,3 +21,9 @@ pub use types::{ MessageResponse, MessageStartEvent, MessageStopEvent, OutputContentBlock, StreamEvent, ToolChoice, ToolDefinition, ToolResultContentBlock, Usage, }; + +pub use telemetry::{ + AnalyticsEvent, AnthropicRequestProfile, ClientIdentity, JsonlTelemetrySink, + MemoryTelemetrySink, SessionTraceRecord, SessionTracer, TelemetryEvent, TelemetrySink, + DEFAULT_ANTHROPIC_VERSION, +}; diff --git a/rust/crates/api/src/types.rs b/rust/crates/api/src/types.rs index c060be6..ec70d40 100644 --- a/rust/crates/api/src/types.rs +++ b/rust/crates/api/src/types.rs @@ -1,3 +1,4 @@ +use runtime::{pricing_for_model, TokenUsage, UsageCostEstimate}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -159,7 +160,29 @@ pub struct Usage { impl Usage { #[must_use] pub const fn total_tokens(&self) -> u32 { - self.input_tokens + self.output_tokens + self.input_tokens + + self.output_tokens + + self.cache_creation_input_tokens + + self.cache_read_input_tokens + } + + #[must_use] + pub const fn token_usage(&self) -> TokenUsage { + TokenUsage { + input_tokens: self.input_tokens, + output_tokens: self.output_tokens, + cache_creation_input_tokens: self.cache_creation_input_tokens, + cache_read_input_tokens: self.cache_read_input_tokens, + } + } + + #[must_use] + pub fn estimated_cost_usd(&self, model: &str) -> UsageCostEstimate { + let usage = self.token_usage(); + pricing_for_model(model).map_or_else( + || usage.estimate_cost_usd(), + |pricing| usage.estimate_cost_usd_with_pricing(pricing), + ) } } @@ -221,3 +244,47 @@ pub enum StreamEvent { ContentBlockStop(ContentBlockStopEvent), MessageStop(MessageStopEvent), } + +#[cfg(test)] +mod tests { + use runtime::format_usd; + + use super::{MessageResponse, Usage}; + + #[test] + fn usage_total_tokens_includes_cache_tokens() { + let usage = Usage { + input_tokens: 10, + cache_creation_input_tokens: 2, + cache_read_input_tokens: 3, + output_tokens: 4, + }; + + assert_eq!(usage.total_tokens(), 19); + assert_eq!(usage.token_usage().total_tokens(), 19); + } + + #[test] + fn message_response_estimates_cost_from_model_usage() { + let response = MessageResponse { + id: "msg_cost".to_string(), + kind: "message".to_string(), + role: "assistant".to_string(), + content: Vec::new(), + model: "claude-sonnet-4-20250514".to_string(), + stop_reason: Some("end_turn".to_string()), + stop_sequence: None, + usage: Usage { + input_tokens: 1_000_000, + cache_creation_input_tokens: 100_000, + cache_read_input_tokens: 200_000, + output_tokens: 500_000, + }, + request_id: None, + }; + + let cost = response.usage.estimated_cost_usd(&response.model); + assert_eq!(format_usd(cost.total_cost_usd()), "$54.6750"); + assert_eq!(response.total_tokens(), 1_800_000); + } +} diff --git a/rust/crates/api/tests/client_integration.rs b/rust/crates/api/tests/client_integration.rs index dcaf4f6..bdccbdd 100644 --- a/rust/crates/api/tests/client_integration.rs +++ b/rust/crates/api/tests/client_integration.rs @@ -8,6 +8,7 @@ use api::{ OutputContentBlock, ProviderClient, StreamEvent, ToolChoice, ToolDefinition, }; use serde_json::json; +use telemetry::{ClientIdentity, MemoryTelemetrySink, SessionTracer, TelemetryEvent}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; use tokio::sync::Mutex; @@ -64,6 +65,18 @@ async fn send_message_posts_json_and_parses_response() { request.headers.get("authorization").map(String::as_str), Some("Bearer proxy-token") ); + assert_eq!( + request.headers.get("anthropic-version").map(String::as_str), + Some("2023-06-01") + ); + assert_eq!( + request.headers.get("user-agent").map(String::as_str), + Some("claude-code/0.1.0") + ); + assert_eq!( + request.headers.get("anthropic-beta").map(String::as_str), + Some("claude-code-20250219,prompt-caching-scope-2026-01-05") + ); let body: serde_json::Value = serde_json::from_str(&request.body).expect("request body should be json"); assert_eq!( @@ -73,6 +86,115 @@ async fn send_message_posts_json_and_parses_response() { assert!(body.get("stream").is_none()); assert_eq!(body["tools"][0]["name"], json!("get_weather")); assert_eq!(body["tool_choice"]["type"], json!("auto")); + assert_eq!( + body["betas"], + json!(["claude-code-20250219", "prompt-caching-scope-2026-01-05"]) + ); +} + +#[tokio::test] +async fn send_message_applies_request_profile_and_records_telemetry() { + let state = Arc::new(Mutex::new(Vec::::new())); + let server = spawn_server( + state.clone(), + vec![http_response_with_headers( + "200 OK", + "application/json", + concat!( + "{", + "\"id\":\"msg_profile\",", + "\"type\":\"message\",", + "\"role\":\"assistant\",", + "\"content\":[{\"type\":\"text\",\"text\":\"ok\"}],", + "\"model\":\"claude-3-7-sonnet-latest\",", + "\"stop_reason\":\"end_turn\",", + "\"stop_sequence\":null,", + "\"usage\":{\"input_tokens\":1,\"cache_creation_input_tokens\":2,\"cache_read_input_tokens\":3,\"output_tokens\":1}", + "}" + ), + &[("request-id", "req_profile_123")], + )], + ) + .await; + let sink = Arc::new(MemoryTelemetrySink::default()); + + let client = AnthropicClient::new("test-key") + .with_base_url(server.base_url()) + .with_client_identity(ClientIdentity::new("claude-code", "9.9.9").with_runtime("rust-cli")) + .with_beta("tools-2026-04-01") + .with_extra_body_param("metadata", json!({"source": "clawd-code"})) + .with_session_tracer(SessionTracer::new("session-telemetry", sink.clone())); + + let response = client + .send_message(&sample_request(false)) + .await + .expect("request should succeed"); + + assert_eq!(response.request_id.as_deref(), Some("req_profile_123")); + + let captured = state.lock().await; + let request = captured.first().expect("server should capture request"); + assert_eq!( + request.headers.get("anthropic-beta").map(String::as_str), + Some("claude-code-20250219,prompt-caching-scope-2026-01-05,tools-2026-04-01") + ); + assert_eq!( + request.headers.get("user-agent").map(String::as_str), + Some("claude-code/9.9.9") + ); + let body: serde_json::Value = + serde_json::from_str(&request.body).expect("request body should be json"); + assert_eq!(body["metadata"]["source"], json!("clawd-code")); + assert_eq!( + body["betas"], + json!([ + "claude-code-20250219", + "prompt-caching-scope-2026-01-05", + "tools-2026-04-01" + ]) + ); + + let events = sink.events(); + assert_eq!(events.len(), 6); + assert!(matches!( + &events[0], + TelemetryEvent::HttpRequestStarted { + session_id, + attempt: 1, + method, + path, + .. + } if session_id == "session-telemetry" && method == "POST" && path == "/v1/messages" + )); + assert!(matches!( + &events[1], + TelemetryEvent::SessionTrace(trace) if trace.name == "http_request_started" + )); + assert!(matches!( + &events[2], + TelemetryEvent::HttpRequestSucceeded { + request_id, + status: 200, + .. + } if request_id.as_deref() == Some("req_profile_123") + )); + assert!(matches!( + &events[3], + TelemetryEvent::SessionTrace(trace) if trace.name == "http_request_succeeded" + )); + assert!(matches!( + &events[4], + TelemetryEvent::Analytics(event) + if event.namespace == "api" + && event.action == "message_usage" + && event.properties.get("request_id") == Some(&json!("req_profile_123")) + && event.properties.get("total_tokens") == Some(&json!(7)) + && event.properties.get("estimated_cost_usd") == Some(&json!("$0.0001")) + )); + assert!(matches!( + &events[5], + TelemetryEvent::SessionTrace(trace) if trace.name == "analytics" + )); } #[tokio::test] diff --git a/rust/crates/runtime/src/conversation.rs b/rust/crates/runtime/src/conversation.rs index 15f3262..14a8133 100644 --- a/rust/crates/runtime/src/conversation.rs +++ b/rust/crates/runtime/src/conversation.rs @@ -1,6 +1,9 @@ use std::collections::BTreeMap; use std::fmt::{Display, Formatter}; +use serde_json::{Map, Value}; +use telemetry::SessionTracer; + use crate::compact::{ compact_session, estimate_session_tokens, CompactionConfig, CompactionResult, }; @@ -132,7 +135,7 @@ where tool_executor, permission_policy, system_prompt, - RuntimeFeatureConfig::default(), + &RuntimeFeatureConfig::default(), ) } @@ -144,7 +147,7 @@ where tool_executor: T, permission_policy: PermissionPolicy, system_prompt: Vec, - feature_config: RuntimeFeatureConfig, + feature_config: &RuntimeFeatureConfig, ) -> Self { let usage_tracker = UsageTracker::from_session(&session); Self { @@ -266,6 +269,8 @@ where user_input: impl Into, mut prompter: Option<&mut dyn PermissionPrompter>, ) -> Result { + let user_input = user_input.into(); + self.record_turn_started(&user_input); self.session .push_user_text(user_input.into()) .map_err(|error| RuntimeError::new(error.to_string()))?; @@ -277,17 +282,31 @@ where loop { iterations += 1; if iterations > self.max_iterations { - return Err(RuntimeError::new( + let error = RuntimeError::new( "conversation loop exceeded the maximum number of iterations", - )); + ); + self.record_turn_failed(iterations, &error); + return Err(error); } let request = ApiRequest { system_prompt: self.system_prompt.clone(), messages: self.session.messages.clone(), }; - let events = self.api_client.stream(request)?; - let (assistant_message, usage) = build_assistant_message(events)?; + let events = match self.api_client.stream(request) { + Ok(events) => events, + Err(error) => { + self.record_turn_failed(iterations, &error); + return Err(error); + } + }; + let (assistant_message, usage) = match build_assistant_message(events) { + Ok(result) => result, + Err(error) => { + self.record_turn_failed(iterations, &error); + return Err(error); + } + }; if let Some(usage) = usage { self.usage_tracker.record(usage); } @@ -301,6 +320,11 @@ where _ => None, }) .collect::>(); + self.record_assistant_iteration( + iterations, + &assistant_message, + pending_tool_uses.len(), + ); self.session .push_message(assistant_message.clone()) @@ -720,6 +744,39 @@ mod tests { )); } + #[test] + fn records_runtime_session_trace_events() { + let sink = Arc::new(MemoryTelemetrySink::default()); + let tracer = SessionTracer::new("session-runtime", sink.clone()); + let mut runtime = ConversationRuntime::new( + Session::new(), + ScriptedApiClient { call_count: 0 }, + StaticToolExecutor::new().register("add", |_input| Ok("4".to_string())), + PermissionPolicy::new(PermissionMode::WorkspaceWrite), + vec!["system".to_string()], + ) + .with_session_tracer(tracer); + + runtime + .run_turn("what is 2 + 2?", Some(&mut PromptAllowOnce)) + .expect("conversation loop should succeed"); + + let events = sink.events(); + let trace_names = events + .iter() + .filter_map(|event| match event { + TelemetryEvent::SessionTrace(trace) => Some(trace.name.as_str()), + _ => None, + }) + .collect::>(); + + assert!(trace_names.contains(&"turn_started")); + assert!(trace_names.contains(&"assistant_iteration_completed")); + assert!(trace_names.contains(&"tool_execution_started")); + assert!(trace_names.contains(&"tool_execution_finished")); + assert!(trace_names.contains(&"turn_completed")); + } + #[test] fn records_denied_tool_results_when_prompt_rejects() { struct RejectPrompter; @@ -808,7 +865,7 @@ mod tests { }), PermissionPolicy::new(PermissionMode::DangerFullAccess), vec!["system".to_string()], - RuntimeFeatureConfig::default().with_hooks(RuntimeHookConfig::new( + &RuntimeFeatureConfig::default().with_hooks(RuntimeHookConfig::new( vec![shell_snippet("printf 'blocked by hook'; exit 2")], Vec::new(), Vec::new(), @@ -875,7 +932,7 @@ mod tests { StaticToolExecutor::new().register("add", |_input| Ok("4".to_string())), PermissionPolicy::new(PermissionMode::DangerFullAccess), vec!["system".to_string()], - RuntimeFeatureConfig::default().with_hooks(RuntimeHookConfig::new( + &RuntimeFeatureConfig::default().with_hooks(RuntimeHookConfig::new( vec![shell_snippet("printf 'pre hook ran'")], vec![shell_snippet("printf 'post hook ran'")], Vec::new(), diff --git a/rust/crates/runtime/src/hooks.rs b/rust/crates/runtime/src/hooks.rs index 739065d..77f8471 100644 --- a/rust/crates/runtime/src/hooks.rs +++ b/rust/crates/runtime/src/hooks.rs @@ -476,6 +476,15 @@ impl HookRunner { } } +struct HookInvocation<'a> { + event: HookEvent, + tool_name: &'a str, + tool_input: &'a str, + tool_output: Option<&'a str>, + is_error: bool, + payload: &'a str, +} + enum HookCommandOutcome { Allow { parsed: ParsedHookOutput }, Deny { parsed: ParsedHookOutput }, diff --git a/rust/crates/runtime/src/mcp_stdio.rs b/rust/crates/runtime/src/mcp_stdio.rs index 7e67d5d..b72b9dd 100644 --- a/rust/crates/runtime/src/mcp_stdio.rs +++ b/rust/crates/runtime/src/mcp_stdio.rs @@ -1144,8 +1144,20 @@ mod tests { } fn cleanup_script(script_path: &Path) { - fs::remove_file(script_path).expect("cleanup script"); - fs::remove_dir_all(script_path.parent().expect("script parent")).expect("cleanup dir"); + if let Err(error) = fs::remove_file(script_path) { + assert_eq!( + error.kind(), + std::io::ErrorKind::NotFound, + "cleanup script: {error}" + ); + } + if let Err(error) = fs::remove_dir_all(script_path.parent().expect("script parent")) { + assert_eq!( + error.kind(), + std::io::ErrorKind::NotFound, + "cleanup dir: {error}" + ); + } } fn manager_server_config( diff --git a/rust/crates/rusty-claude-cli/src/main.rs b/rust/crates/rusty-claude-cli/src/main.rs index e6d24ca..68c849f 100644 --- a/rust/crates/rusty-claude-cli/src/main.rs +++ b/rust/crates/rusty-claude-cli/src/main.rs @@ -17,8 +17,9 @@ use std::time::{Duration, Instant, UNIX_EPOCH}; use api::{ resolve_startup_auth_source, AnthropicClient, AuthSource, ContentBlockDelta, InputContentBlock, - InputMessage, MessageRequest, MessageResponse, OutputContentBlock, - StreamEvent as ApiStreamEvent, ToolChoice, ToolDefinition, ToolResultContentBlock, + InputMessage, JsonlTelemetrySink, MessageRequest, MessageResponse, OutputContentBlock, + SessionTracer, StreamEvent as ApiStreamEvent, ToolChoice, ToolDefinition, + ToolResultContentBlock, }; use commands::{ @@ -51,6 +52,7 @@ fn max_tokens_for_model(model: &str) -> u32 { } const DEFAULT_DATE: &str = "2026-03-31"; const DEFAULT_OAUTH_CALLBACK_PORT: u16 = 4545; +const TELEMETRY_LOG_PATH_ENV: &str = "CLAW_TELEMETRY_LOG_PATH"; const VERSION: &str = env!("CARGO_PKG_VERSION"); const BUILD_TARGET: Option<&str> = option_env!("TARGET"); const GIT_SHA: Option<&str> = option_env!("GIT_SHA"); @@ -1489,6 +1491,7 @@ impl LiveCli { let message_count = session.messages.len(); self.runtime = build_runtime( session, + &self.session.id, model.clone(), self.system_prompt.clone(), true, @@ -1533,6 +1536,7 @@ impl LiveCli { self.permission_mode = permission_mode_from_label(normalized); self.runtime = build_runtime( session, + &self.session.id, self.model.clone(), self.system_prompt.clone(), true, @@ -1597,6 +1601,7 @@ impl LiveCli { let session_id = session.session_id.clone(); self.runtime = build_runtime( session, + &handle.id, self.model.clone(), self.system_prompt.clone(), true, @@ -1686,6 +1691,7 @@ impl LiveCli { let session_id = session.session_id.clone(); self.runtime = build_runtime( session, + &handle.id, self.model.clone(), self.system_prompt.clone(), true, @@ -1785,6 +1791,7 @@ impl LiveCli { let skipped = removed == 0; self.runtime = build_runtime( result.compacted_session, + &self.session.id, self.model.clone(), self.system_prompt.clone(), true, @@ -3127,6 +3134,7 @@ fn describe_tool_progress(name: &str, input: &str) -> String { #[allow(clippy::too_many_arguments)] fn build_runtime( session: Session, + session_id: &str, model: String, system_prompt: Vec, enable_tools: bool, @@ -3271,6 +3279,11 @@ impl AnthropicRuntimeClient { progress_reporter, }) } + + fn with_session_tracer(mut self, session_tracer: SessionTracer) -> Self { + self.client = self.client.with_session_tracer(session_tracer); + self + } } fn resolve_cli_auth_source() -> Result> { @@ -3380,12 +3393,7 @@ impl ApiClient for AnthropicRuntimeClient { } } ApiStreamEvent::MessageDelta(delta) => { - events.push(AssistantEvent::Usage(TokenUsage { - input_tokens: delta.usage.input_tokens, - output_tokens: delta.usage.output_tokens, - cache_creation_input_tokens: 0, - cache_read_input_tokens: 0, - })); + events.push(AssistantEvent::Usage(delta.usage.token_usage())); } ApiStreamEvent::MessageStop(_) => { saw_stop = true; @@ -3977,12 +3985,7 @@ fn response_to_events( } } - events.push(AssistantEvent::Usage(TokenUsage { - input_tokens: response.usage.input_tokens, - output_tokens: response.usage.output_tokens, - cache_creation_input_tokens: response.usage.cache_creation_input_tokens, - cache_read_input_tokens: response.usage.cache_read_input_tokens, - })); + events.push(AssistantEvent::Usage(response.usage.token_usage())); events.push(AssistantEvent::MessageStop); Ok(events) } diff --git a/rust/crates/telemetry/Cargo.toml b/rust/crates/telemetry/Cargo.toml new file mode 100644 index 0000000..d501850 --- /dev/null +++ b/rust/crates/telemetry/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "telemetry" +version.workspace = true +edition.workspace = true +license.workspace = true +publish.workspace = true + +[dependencies] +serde = { version = "1", features = ["derive"] } +serde_json = "1" + +[lints] +workspace = true diff --git a/rust/crates/telemetry/src/lib.rs b/rust/crates/telemetry/src/lib.rs new file mode 100644 index 0000000..6e369e1 --- /dev/null +++ b/rust/crates/telemetry/src/lib.rs @@ -0,0 +1,526 @@ +use std::fmt::{Debug, Formatter}; +use std::fs::{File, OpenOptions}; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; + +pub const DEFAULT_ANTHROPIC_VERSION: &str = "2023-06-01"; +pub const DEFAULT_APP_NAME: &str = "claude-code"; +pub const DEFAULT_RUNTIME: &str = "rust"; +pub const DEFAULT_AGENTIC_BETA: &str = "claude-code-20250219"; +pub const DEFAULT_PROMPT_CACHING_SCOPE_BETA: &str = "prompt-caching-scope-2026-01-05"; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ClientIdentity { + pub app_name: String, + pub app_version: String, + pub runtime: String, +} + +impl ClientIdentity { + #[must_use] + pub fn new(app_name: impl Into, app_version: impl Into) -> Self { + Self { + app_name: app_name.into(), + app_version: app_version.into(), + runtime: DEFAULT_RUNTIME.to_string(), + } + } + + #[must_use] + pub fn with_runtime(mut self, runtime: impl Into) -> Self { + self.runtime = runtime.into(); + self + } + + #[must_use] + pub fn user_agent(&self) -> String { + format!("{}/{}", self.app_name, self.app_version) + } +} + +impl Default for ClientIdentity { + fn default() -> Self { + Self::new(DEFAULT_APP_NAME, env!("CARGO_PKG_VERSION")) + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct AnthropicRequestProfile { + pub anthropic_version: String, + pub client_identity: ClientIdentity, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub betas: Vec, + #[serde(default, skip_serializing_if = "Map::is_empty")] + pub extra_body: Map, +} + +impl AnthropicRequestProfile { + #[must_use] + pub fn new(client_identity: ClientIdentity) -> Self { + Self { + anthropic_version: DEFAULT_ANTHROPIC_VERSION.to_string(), + client_identity, + betas: vec![ + DEFAULT_AGENTIC_BETA.to_string(), + DEFAULT_PROMPT_CACHING_SCOPE_BETA.to_string(), + ], + extra_body: Map::new(), + } + } + + #[must_use] + pub fn with_beta(mut self, beta: impl Into) -> Self { + let beta = beta.into(); + if !self.betas.contains(&beta) { + self.betas.push(beta); + } + self + } + + #[must_use] + pub fn with_extra_body(mut self, key: impl Into, value: Value) -> Self { + self.extra_body.insert(key.into(), value); + self + } + + #[must_use] + pub fn header_pairs(&self) -> Vec<(String, String)> { + let mut headers = vec![ + ( + "anthropic-version".to_string(), + self.anthropic_version.clone(), + ), + ("user-agent".to_string(), self.client_identity.user_agent()), + ]; + if !self.betas.is_empty() { + headers.push(("anthropic-beta".to_string(), self.betas.join(","))); + } + headers + } + + pub fn render_json_body(&self, request: &T) -> Result { + let mut body = serde_json::to_value(request)?; + let object = body.as_object_mut().ok_or_else(|| { + serde_json::Error::io(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "request body must serialize to a JSON object", + )) + })?; + for (key, value) in &self.extra_body { + object.insert(key.clone(), value.clone()); + } + if !self.betas.is_empty() { + object.insert( + "betas".to_string(), + Value::Array(self.betas.iter().cloned().map(Value::String).collect()), + ); + } + Ok(body) + } +} + +impl Default for AnthropicRequestProfile { + fn default() -> Self { + Self::new(ClientIdentity::default()) + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct AnalyticsEvent { + pub namespace: String, + pub action: String, + #[serde(default, skip_serializing_if = "Map::is_empty")] + pub properties: Map, +} + +impl AnalyticsEvent { + #[must_use] + pub fn new(namespace: impl Into, action: impl Into) -> Self { + Self { + namespace: namespace.into(), + action: action.into(), + properties: Map::new(), + } + } + + #[must_use] + pub fn with_property(mut self, key: impl Into, value: Value) -> Self { + self.properties.insert(key.into(), value); + self + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SessionTraceRecord { + pub session_id: String, + pub sequence: u64, + pub name: String, + pub timestamp_ms: u64, + #[serde(default, skip_serializing_if = "Map::is_empty")] + pub attributes: Map, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum TelemetryEvent { + HttpRequestStarted { + session_id: String, + attempt: u32, + method: String, + path: String, + #[serde(default, skip_serializing_if = "Map::is_empty")] + attributes: Map, + }, + HttpRequestSucceeded { + session_id: String, + attempt: u32, + method: String, + path: String, + status: u16, + #[serde(default, skip_serializing_if = "Option::is_none")] + request_id: Option, + #[serde(default, skip_serializing_if = "Map::is_empty")] + attributes: Map, + }, + HttpRequestFailed { + session_id: String, + attempt: u32, + method: String, + path: String, + error: String, + retryable: bool, + #[serde(default, skip_serializing_if = "Map::is_empty")] + attributes: Map, + }, + Analytics(AnalyticsEvent), + SessionTrace(SessionTraceRecord), +} + +pub trait TelemetrySink: Send + Sync { + fn record(&self, event: TelemetryEvent); +} + +#[derive(Default)] +pub struct MemoryTelemetrySink { + events: Mutex>, +} + +impl MemoryTelemetrySink { + #[must_use] + pub fn events(&self) -> Vec { + self.events + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .clone() + } +} + +impl TelemetrySink for MemoryTelemetrySink { + fn record(&self, event: TelemetryEvent) { + self.events + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .push(event); + } +} + +pub struct JsonlTelemetrySink { + path: PathBuf, + file: Mutex, +} + +impl Debug for JsonlTelemetrySink { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("JsonlTelemetrySink") + .field("path", &self.path) + .finish_non_exhaustive() + } +} + +impl JsonlTelemetrySink { + pub fn new(path: impl AsRef) -> Result { + let path = path.as_ref().to_path_buf(); + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent)?; + } + let file = OpenOptions::new().create(true).append(true).open(&path)?; + Ok(Self { + path, + file: Mutex::new(file), + }) + } + + #[must_use] + pub fn path(&self) -> &Path { + &self.path + } +} + +impl TelemetrySink for JsonlTelemetrySink { + fn record(&self, event: TelemetryEvent) { + let Ok(line) = serde_json::to_string(&event) else { + return; + }; + let mut file = self + .file + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let _ = writeln!(file, "{line}"); + let _ = file.flush(); + } +} + +#[derive(Clone)] +pub struct SessionTracer { + session_id: String, + sequence: Arc, + sink: Arc, +} + +impl Debug for SessionTracer { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SessionTracer") + .field("session_id", &self.session_id) + .finish_non_exhaustive() + } +} + +impl SessionTracer { + #[must_use] + pub fn new(session_id: impl Into, sink: Arc) -> Self { + Self { + session_id: session_id.into(), + sequence: Arc::new(AtomicU64::new(0)), + sink, + } + } + + #[must_use] + pub fn session_id(&self) -> &str { + &self.session_id + } + + pub fn record(&self, name: impl Into, attributes: Map) { + let record = SessionTraceRecord { + session_id: self.session_id.clone(), + sequence: self.sequence.fetch_add(1, Ordering::Relaxed), + name: name.into(), + timestamp_ms: current_timestamp_ms(), + attributes, + }; + self.sink.record(TelemetryEvent::SessionTrace(record)); + } + + pub fn record_http_request_started( + &self, + attempt: u32, + method: impl Into, + path: impl Into, + attributes: Map, + ) { + let method = method.into(); + let path = path.into(); + self.sink.record(TelemetryEvent::HttpRequestStarted { + session_id: self.session_id.clone(), + attempt, + method: method.clone(), + path: path.clone(), + attributes: attributes.clone(), + }); + self.record( + "http_request_started", + merge_trace_fields(method, path, attempt, attributes), + ); + } + + pub fn record_http_request_succeeded( + &self, + attempt: u32, + method: impl Into, + path: impl Into, + status: u16, + request_id: Option, + attributes: Map, + ) { + let method = method.into(); + let path = path.into(); + self.sink.record(TelemetryEvent::HttpRequestSucceeded { + session_id: self.session_id.clone(), + attempt, + method: method.clone(), + path: path.clone(), + status, + request_id: request_id.clone(), + attributes: attributes.clone(), + }); + let mut trace_attributes = merge_trace_fields(method, path, attempt, attributes); + trace_attributes.insert("status".to_string(), Value::from(status)); + if let Some(request_id) = request_id { + trace_attributes.insert("request_id".to_string(), Value::String(request_id)); + } + self.record("http_request_succeeded", trace_attributes); + } + + pub fn record_http_request_failed( + &self, + attempt: u32, + method: impl Into, + path: impl Into, + error: impl Into, + retryable: bool, + attributes: Map, + ) { + let method = method.into(); + let path = path.into(); + let error = error.into(); + self.sink.record(TelemetryEvent::HttpRequestFailed { + session_id: self.session_id.clone(), + attempt, + method: method.clone(), + path: path.clone(), + error: error.clone(), + retryable, + attributes: attributes.clone(), + }); + let mut trace_attributes = merge_trace_fields(method, path, attempt, attributes); + trace_attributes.insert("error".to_string(), Value::String(error)); + trace_attributes.insert("retryable".to_string(), Value::Bool(retryable)); + self.record("http_request_failed", trace_attributes); + } + + pub fn record_analytics(&self, event: AnalyticsEvent) { + let mut attributes = event.properties.clone(); + attributes.insert( + "namespace".to_string(), + Value::String(event.namespace.clone()), + ); + attributes.insert("action".to_string(), Value::String(event.action.clone())); + self.sink.record(TelemetryEvent::Analytics(event)); + self.record("analytics", attributes); + } +} + +fn merge_trace_fields( + method: String, + path: String, + attempt: u32, + mut attributes: Map, +) -> Map { + attributes.insert("method".to_string(), Value::String(method)); + attributes.insert("path".to_string(), Value::String(path)); + attributes.insert("attempt".to_string(), Value::from(attempt)); + attributes +} + +fn current_timestamp_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() + .try_into() + .unwrap_or(u64::MAX) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn request_profile_emits_headers_and_merges_body() { + let profile = AnthropicRequestProfile::new( + ClientIdentity::new("claude-code", "1.2.3").with_runtime("rust-cli"), + ) + .with_beta("tools-2026-04-01") + .with_extra_body("metadata", serde_json::json!({"source": "test"})); + + assert_eq!( + profile.header_pairs(), + vec![ + ( + "anthropic-version".to_string(), + DEFAULT_ANTHROPIC_VERSION.to_string() + ), + ("user-agent".to_string(), "claude-code/1.2.3".to_string()), + ( + "anthropic-beta".to_string(), + "claude-code-20250219,prompt-caching-scope-2026-01-05,tools-2026-04-01" + .to_string(), + ), + ] + ); + + let body = profile + .render_json_body(&serde_json::json!({"model": "claude-sonnet"})) + .expect("body should serialize"); + assert_eq!( + body["metadata"]["source"], + Value::String("test".to_string()) + ); + assert_eq!( + body["betas"], + serde_json::json!([ + "claude-code-20250219", + "prompt-caching-scope-2026-01-05", + "tools-2026-04-01" + ]) + ); + } + + #[test] + fn session_tracer_records_structured_events_and_trace_sequence() { + let sink = Arc::new(MemoryTelemetrySink::default()); + let tracer = SessionTracer::new("session-123", sink.clone()); + + tracer.record_http_request_started(1, "POST", "/v1/messages", Map::new()); + tracer.record_analytics( + AnalyticsEvent::new("cli", "prompt_sent") + .with_property("model", Value::String("claude-opus".to_string())), + ); + + let events = sink.events(); + assert!(matches!( + &events[0], + TelemetryEvent::HttpRequestStarted { + session_id, + attempt: 1, + method, + path, + .. + } if session_id == "session-123" && method == "POST" && path == "/v1/messages" + )); + assert!(matches!( + &events[1], + TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 0, name, .. }) + if name == "http_request_started" + )); + assert!(matches!(&events[2], TelemetryEvent::Analytics(_))); + assert!(matches!( + &events[3], + TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 1, name, .. }) + if name == "analytics" + )); + } + + #[test] + fn jsonl_sink_persists_events() { + let path = + std::env::temp_dir().join(format!("telemetry-jsonl-{}.log", current_timestamp_ms())); + let sink = JsonlTelemetrySink::new(&path).expect("sink should create file"); + + sink.record(TelemetryEvent::Analytics( + AnalyticsEvent::new("cli", "turn_completed").with_property("ok", Value::Bool(true)), + )); + + let contents = std::fs::read_to_string(&path).expect("telemetry log should be readable"); + assert!(contents.contains("\"type\":\"analytics\"")); + assert!(contents.contains("\"action\":\"turn_completed\"")); + + let _ = std::fs::remove_file(path); + } +} diff --git a/rust/crates/tools/src/lib.rs b/rust/crates/tools/src/lib.rs index 3e970cd..b4acf0a 100644 --- a/rust/crates/tools/src/lib.rs +++ b/rust/crates/tools/src/lib.rs @@ -14,7 +14,7 @@ use runtime::{ edit_file, execute_bash, glob_search, grep_search, load_system_prompt, read_file, write_file, ApiClient, ApiRequest, AssistantEvent, BashCommandInput, ContentBlock, ConversationMessage, ConversationRuntime, GrepSearchInput, MessageRole, PermissionMode, PermissionPolicy, - RuntimeError, Session, TokenUsage, ToolError, ToolExecutor, + RuntimeError, Session, ToolError, ToolExecutor, }; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; @@ -1891,12 +1891,7 @@ impl ApiClient for ProviderRuntimeClient { } } ApiStreamEvent::MessageDelta(delta) => { - events.push(AssistantEvent::Usage(TokenUsage { - input_tokens: delta.usage.input_tokens, - output_tokens: delta.usage.output_tokens, - cache_creation_input_tokens: 0, - cache_read_input_tokens: 0, - })); + events.push(AssistantEvent::Usage(delta.usage.token_usage())); } ApiStreamEvent::MessageStop(_) => { saw_stop = true; @@ -2045,12 +2040,7 @@ fn response_to_events(response: MessageResponse) -> Vec { } } - events.push(AssistantEvent::Usage(TokenUsage { - input_tokens: response.usage.input_tokens, - output_tokens: response.usage.output_tokens, - cache_creation_input_tokens: response.usage.cache_creation_input_tokens, - cache_read_input_tokens: response.usage.cache_read_input_tokens, - })); + events.push(AssistantEvent::Usage(response.usage.token_usage())); events.push(AssistantEvent::MessageStop); events }