diff --git a/rust/crates/api/src/providers/anthropic.rs b/rust/crates/api/src/providers/anthropic.rs
index fb2b316..3180169 100644
--- a/rust/crates/api/src/providers/anthropic.rs
+++ b/rust/crates/api/src/providers/anthropic.rs
@@ -1,4 +1,5 @@
 use std::collections::VecDeque;
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -22,9 +23,9 @@ use crate::types::{MessageDeltaEvent, MessageRequest, MessageResponse, StreamEve
 pub const DEFAULT_BASE_URL: &str = "https://api.anthropic.com";
 const REQUEST_ID_HEADER: &str = "request-id";
 const ALT_REQUEST_ID_HEADER: &str = "x-request-id";
-const DEFAULT_INITIAL_BACKOFF: Duration = Duration::from_millis(200);
-const DEFAULT_MAX_BACKOFF: Duration = Duration::from_secs(2);
-const DEFAULT_MAX_RETRIES: u32 = 2;
+const DEFAULT_INITIAL_BACKOFF: Duration = Duration::from_secs(1);
+const DEFAULT_MAX_BACKOFF: Duration = Duration::from_secs(128);
+const DEFAULT_MAX_RETRIES: u32 = 8;
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum AuthSource {
@@ -453,7 +454,7 @@ impl AnthropicClient {
                 break;
             }
 
-            tokio::time::sleep(self.backoff_for_attempt(attempts)?).await;
+            tokio::time::sleep(self.jittered_backoff_for_attempt(attempts)?).await;
         }
 
         Err(ApiError::RetriesExhausted {
@@ -569,6 +570,40 @@
             .checked_mul(multiplier)
             .map_or(self.max_backoff, |delay| delay.min(self.max_backoff)))
     }
+
+    fn jittered_backoff_for_attempt(&self, attempt: u32) -> Result<Duration, ApiError> {
+        let base = self.backoff_for_attempt(attempt)?;
+        Ok(base + jitter_for_base(base))
+    }
+}
+
+/// Process-wide counter that guarantees distinct jitter samples even when
+/// the system clock resolution is coarser than consecutive retry sleeps.
+static JITTER_COUNTER: AtomicU64 = AtomicU64::new(0);
+
+/// Returns a random additive jitter in `[0, base]` to decorrelate retries
+/// from multiple concurrent clients.
Entropy is drawn from the nanosecond
+/// wall clock mixed with a monotonic counter and run through a splitmix64
+/// finalizer; adequate for retry jitter (no cryptographic requirement).
+fn jitter_for_base(base: Duration) -> Duration {
+    let base_nanos = u64::try_from(base.as_nanos()).unwrap_or(u64::MAX);
+    if base_nanos == 0 {
+        return Duration::ZERO;
+    }
+    let raw_nanos = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .map(|elapsed| u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX))
+        .unwrap_or(0);
+    let tick = JITTER_COUNTER.fetch_add(1, Ordering::Relaxed);
+    // splitmix64 finalizer — mixes the low bits so large bases still see
+    // jitter across their full range instead of being clamped to subsec nanos.
+    let mut mixed = raw_nanos.wrapping_add(tick).wrapping_add(0x9E37_79B9_7F4A_7C15);
+    mixed = (mixed ^ (mixed >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
+    mixed = (mixed ^ (mixed >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
+    mixed ^= mixed >> 31;
+    // Inclusive upper bound: jitter may equal `base`, matching "up to base".
+    let jitter_nanos = mixed % base_nanos.saturating_add(1);
+    Duration::from_nanos(jitter_nanos)
 }
 
 impl AuthSource {
@@ -1250,6 +1285,58 @@ mod tests {
         );
     }
 
+    #[test]
+    fn jittered_backoff_stays_within_additive_bounds_and_varies() {
+        let client = AnthropicClient::new("test-key").with_retry_policy(
+            8,
+            Duration::from_secs(1),
+            Duration::from_secs(128),
+        );
+        let mut samples = Vec::with_capacity(64);
+        for _ in 0..64 {
+            let base = client.backoff_for_attempt(3).expect("base attempt 3");
+            let jittered = client
+                .jittered_backoff_for_attempt(3)
+                .expect("jittered attempt 3");
+            assert!(
+                jittered >= base,
+                "jittered delay {jittered:?} must be at least the base {base:?}"
+            );
+            assert!(
+                jittered <= base * 2,
+                "jittered delay {jittered:?} must not exceed base*2 {:?}",
+                base * 2
+            );
+            samples.push(jittered);
+        }
+        let distinct: std::collections::HashSet<_> = samples.iter().collect();
+        assert!(
+            distinct.len() > 1,
+            "jitter should produce varied delays across samples, got {samples:?}"
+        );
+    }
+
+    #[test]
+    fn default_retry_policy_matches_exponential_schedule() {
+        let client = AnthropicClient::new("test-key");
+        assert_eq!(
+            client.backoff_for_attempt(1).expect("attempt 1"),
+            Duration::from_secs(1)
+        );
+        assert_eq!(
+            client.backoff_for_attempt(2).expect("attempt 2"),
+            Duration::from_secs(2)
+        );
+        assert_eq!(
+            client.backoff_for_attempt(3).expect("attempt 3"),
+            Duration::from_secs(4)
+        );
+        assert_eq!(
+            client.backoff_for_attempt(8).expect("attempt 8"),
+            Duration::from_secs(128)
+        );
+    }
+
     #[test]
     fn retryable_statuses_are_detected() {
         assert!(super::is_retryable_status(
diff --git a/rust/crates/api/src/providers/openai_compat.rs b/rust/crates/api/src/providers/openai_compat.rs
index 0afe4ff..57d9ef2 100644
--- a/rust/crates/api/src/providers/openai_compat.rs
+++ b/rust/crates/api/src/providers/openai_compat.rs
@@ -1,5 +1,6 @@
 use std::collections::{BTreeMap, VecDeque};
-use std::time::Duration;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 use serde::Deserialize;
 use serde_json::{json, Value};
@@ -19,9 +20,9 @@ pub const DEFAULT_XAI_BASE_URL: &str = "https://api.x.ai/v1";
 pub const DEFAULT_OPENAI_BASE_URL: &str = "https://api.openai.com/v1";
 const REQUEST_ID_HEADER: &str = "request-id";
 const ALT_REQUEST_ID_HEADER: &str = "x-request-id";
-const DEFAULT_INITIAL_BACKOFF: Duration = Duration::from_millis(200);
-const DEFAULT_MAX_BACKOFF: Duration = Duration::from_secs(2);
-const DEFAULT_MAX_RETRIES: u32 = 2;
+const DEFAULT_INITIAL_BACKOFF: Duration = Duration::from_secs(1);
+const DEFAULT_MAX_BACKOFF: Duration = Duration::from_secs(128);
+const DEFAULT_MAX_RETRIES: u32 = 8;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub struct OpenAiCompatConfig {
@@ -191,7 +192,7 @@ impl OpenAiCompatClient {
                 break retryable_error;
             }
 
-            tokio::time::sleep(self.backoff_for_attempt(attempts)?).await;
+            tokio::time::sleep(self.jittered_backoff_for_attempt(attempts)?).await;
         };
 
         Err(ApiError::RetriesExhausted {
@@ -227,6 +228,37 @@
             .checked_mul(multiplier)
             .map_or(self.max_backoff, |delay| delay.min(self.max_backoff)))
     }
+
+    fn jittered_backoff_for_attempt(&self, attempt: u32) -> Result<Duration, ApiError> {
+        let base = self.backoff_for_attempt(attempt)?;
+        Ok(base + jitter_for_base(base))
+    }
+}
+
+/// Process-wide counter that guarantees distinct jitter samples even when
+/// the system clock resolution is coarser than consecutive retry sleeps.
+static JITTER_COUNTER: AtomicU64 = AtomicU64::new(0);
+
+/// Returns a random additive jitter in `[0, base]` to decorrelate retries
+/// from multiple concurrent clients. Entropy is drawn from the nanosecond
+/// wall clock mixed with a monotonic counter and run through a splitmix64
+/// finalizer; adequate for retry jitter (no cryptographic requirement).
+fn jitter_for_base(base: Duration) -> Duration {
+    let base_nanos = u64::try_from(base.as_nanos()).unwrap_or(u64::MAX);
+    if base_nanos == 0 {
+        return Duration::ZERO;
+    }
+    let raw_nanos = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .map(|elapsed| u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX))
+        .unwrap_or(0);
+    let tick = JITTER_COUNTER.fetch_add(1, Ordering::Relaxed);
+    let mut mixed = raw_nanos.wrapping_add(tick).wrapping_add(0x9E37_79B9_7F4A_7C15);
+    mixed = (mixed ^ (mixed >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
+    mixed = (mixed ^ (mixed >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
+    mixed ^= mixed >> 31;
+    let jitter_nanos = mixed % base_nanos.saturating_add(1);
+    Duration::from_nanos(jitter_nanos)
 }
 
 impl Provider for OpenAiCompatClient {
diff --git a/rust/crates/api/tests/client_integration.rs b/rust/crates/api/tests/client_integration.rs
index 2f42a79..9d45cc7 100644
--- a/rust/crates/api/tests/client_integration.rs
+++ b/rust/crates/api/tests/client_integration.rs
@@ -545,6 +545,71 @@ async fn surfaces_retry_exhaustion_for_persistent_retryable_errors() {
     }
 }
 
+#[tokio::test]
+async fn retries_multiple_retryable_failures_with_exponential_backoff_and_jitter() {
+    let state = Arc::new(Mutex::new(Vec::new()));
+    let server = spawn_server(
+        state.clone(),
+        vec![
+            http_response(
+                "429 Too Many Requests",
+                "application/json",
+                "{\"type\":\"error\",\"error\":{\"type\":\"rate_limit_error\",\"message\":\"slow down\"}}",
+            ),
+            http_response(
+                "500 Internal Server Error",
+                "application/json",
+                "{\"type\":\"error\",\"error\":{\"type\":\"api_error\",\"message\":\"boom\"}}",
+            ),
+            http_response(
+                "503 Service Unavailable",
+                "application/json",
+                "{\"type\":\"error\",\"error\":{\"type\":\"overloaded_error\",\"message\":\"busy\"}}",
+            ),
+            http_response(
+                "429 Too Many Requests",
+                "application/json",
+                "{\"type\":\"error\",\"error\":{\"type\":\"rate_limit_error\",\"message\":\"slow down again\"}}",
+            ),
+            http_response(
+                "503 Service
Unavailable",
+                "application/json",
+                "{\"type\":\"error\",\"error\":{\"type\":\"overloaded_error\",\"message\":\"still busy\"}}",
+            ),
+            http_response(
+                "200 OK",
+                "application/json",
+                "{\"id\":\"msg_exp_retry\",\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"text\",\"text\":\"Recovered after 5\"}],\"model\":\"claude-3-7-sonnet-latest\",\"stop_reason\":\"end_turn\",\"stop_sequence\":null,\"usage\":{\"input_tokens\":3,\"output_tokens\":2}}",
+            ),
+        ],
+    )
+    .await;
+
+    let client = ApiClient::new("test-key")
+        .with_base_url(server.base_url())
+        .with_retry_policy(8, Duration::from_millis(1), Duration::from_millis(4));
+    let started_at = std::time::Instant::now();
+
+    let response = client
+        .send_message(&sample_request(false))
+        .await
+        .expect("8-retry policy should absorb 5 retryable failures");
+
+    let elapsed = started_at.elapsed();
+    assert_eq!(response.total_tokens(), 5);
+    assert_eq!(
+        state.lock().await.len(),
+        6,
+        "client should issue 1 original + 5 retry requests before the 200"
+    );
+    // Jittered sleeps are bounded by 2 * max_backoff per retry (base + jitter),
+    // so 5 sleeps fit comfortably below this upper bound with generous slack.
+    assert!(
+        elapsed < Duration::from_secs(5),
+        "retries should complete promptly, took {elapsed:?}"
+    );
+}
+
 #[tokio::test]
 #[allow(clippy::await_holding_lock)]
 async fn send_message_reuses_recent_completion_cache_entries() {