feat: b5-slash-help — batch 5 upstream parity

This commit is contained in:
YeonGyu-Kim
2026-04-07 14:51:12 +09:00
parent d509f16b5a
commit 5bcbc86a2b
3 changed files with 193 additions and 9 deletions

View File

@@ -1,4 +1,5 @@
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -22,9 +23,9 @@ use crate::types::{MessageDeltaEvent, MessageRequest, MessageResponse, StreamEve
pub const DEFAULT_BASE_URL: &str = "https://api.anthropic.com";
const REQUEST_ID_HEADER: &str = "request-id";
const ALT_REQUEST_ID_HEADER: &str = "x-request-id";
const DEFAULT_INITIAL_BACKOFF: Duration = Duration::from_millis(200);
const DEFAULT_MAX_BACKOFF: Duration = Duration::from_secs(2);
const DEFAULT_MAX_RETRIES: u32 = 2;
const DEFAULT_INITIAL_BACKOFF: Duration = Duration::from_secs(1);
const DEFAULT_MAX_BACKOFF: Duration = Duration::from_secs(128);
const DEFAULT_MAX_RETRIES: u32 = 8;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AuthSource {
@@ -453,7 +454,7 @@ impl AnthropicClient {
break;
}
tokio::time::sleep(self.backoff_for_attempt(attempts)?).await;
tokio::time::sleep(self.jittered_backoff_for_attempt(attempts)?).await;
}
Err(ApiError::RetriesExhausted {
@@ -569,6 +570,40 @@ impl AnthropicClient {
.checked_mul(multiplier)
.map_or(self.max_backoff, |delay| delay.min(self.max_backoff)))
}
/// Backoff for `attempt` with additive random jitter applied.
///
/// Returns the exponential delay from [`Self::backoff_for_attempt`] plus a
/// random extra delay in `[0, base]`, so concurrent clients retrying after
/// the same failure do not stampede the API in lockstep.
///
/// # Errors
/// Propagates whatever error [`Self::backoff_for_attempt`] reports for the
/// given attempt number.
fn jittered_backoff_for_attempt(&self, attempt: u32) -> Result<Duration, ApiError> {
    let base = self.backoff_for_attempt(attempt)?;
    // saturating_add: a pathologically large configured max_backoff (near
    // Duration::MAX) clamps instead of panicking on Duration overflow.
    Ok(base.saturating_add(jitter_for_base(base)))
}
}
/// Monotonically increasing tick folded into every jitter sample so that
/// back-to-back retries stay distinct even when the system clock is coarser
/// than consecutive retry sleeps.
static JITTER_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Computes an additive retry jitter drawn from `[0, base]` (inclusive) to
/// decorrelate retries issued by multiple concurrent clients.
///
/// Entropy is the nanosecond wall clock combined with a process-wide counter,
/// scrambled by the splitmix64 finalizer. Deliberately non-cryptographic:
/// retry jitter only needs decorrelation, not unpredictability.
fn jitter_for_base(base: Duration) -> Duration {
    let span_nanos = u64::try_from(base.as_nanos()).unwrap_or(u64::MAX);
    if span_nanos == 0 {
        return Duration::ZERO;
    }
    let clock_nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX),
        Err(_) => 0,
    };
    let tick = JITTER_COUNTER.fetch_add(1, Ordering::Relaxed);
    // splitmix64 finalizer: spreads the entropy across all 64 bits, so even
    // multi-second bases see jitter over their full range rather than being
    // limited to the sub-second nanosecond residue.
    let mut word = clock_nanos
        .wrapping_add(tick)
        .wrapping_add(0x9E37_79B9_7F4A_7C15);
    word = (word ^ (word >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    word = (word ^ (word >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    word ^= word >> 31;
    // `saturating_add(1)` makes the modulus inclusive of `base` itself.
    Duration::from_nanos(word % span_nanos.saturating_add(1))
}
impl AuthSource {
@@ -1250,6 +1285,58 @@ mod tests {
);
}
#[test]
fn jittered_backoff_stays_within_additive_bounds_and_varies() {
    let client = AnthropicClient::new("test-key").with_retry_policy(
        8,
        Duration::from_secs(1),
        Duration::from_secs(128),
    );
    // The un-jittered delay for a fixed attempt is deterministic, so it is
    // computed once and reused as the bound for every sample.
    let base = client.backoff_for_attempt(3).expect("base attempt 3");
    let samples: Vec<Duration> = (0..64)
        .map(|_| {
            client
                .jittered_backoff_for_attempt(3)
                .expect("jittered attempt 3")
        })
        .collect();
    for &jittered in &samples {
        assert!(
            jittered >= base,
            "jittered delay {jittered:?} must be at least the base {base:?}"
        );
        assert!(
            jittered <= base * 2,
            "jittered delay {jittered:?} must not exceed base*2 {:?}",
            base * 2
        );
    }
    let distinct: std::collections::HashSet<_> = samples.iter().collect();
    assert!(
        distinct.len() > 1,
        "jitter should produce varied delays across samples, got {samples:?}"
    );
}
#[test]
fn default_retry_policy_matches_exponential_schedule() {
    let client = AnthropicClient::new("test-key");
    // Defaults: 1s initial backoff, doubling per attempt, capped at 128s.
    let schedule: [(u32, u64, &str); 4] = [
        (1, 1, "attempt 1"),
        (2, 2, "attempt 2"),
        (3, 4, "attempt 3"),
        (8, 128, "attempt 8"),
    ];
    for (attempt, secs, label) in schedule {
        assert_eq!(
            client.backoff_for_attempt(attempt).expect(label),
            Duration::from_secs(secs)
        );
    }
}
#[test]
fn retryable_statuses_are_detected() {
assert!(super::is_retryable_status(

View File

@@ -1,5 +1,6 @@
use std::collections::{BTreeMap, VecDeque};
use std::time::Duration;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use serde::Deserialize;
use serde_json::{json, Value};
@@ -19,9 +20,9 @@ pub const DEFAULT_XAI_BASE_URL: &str = "https://api.x.ai/v1";
pub const DEFAULT_OPENAI_BASE_URL: &str = "https://api.openai.com/v1";
const REQUEST_ID_HEADER: &str = "request-id";
const ALT_REQUEST_ID_HEADER: &str = "x-request-id";
const DEFAULT_INITIAL_BACKOFF: Duration = Duration::from_millis(200);
const DEFAULT_MAX_BACKOFF: Duration = Duration::from_secs(2);
const DEFAULT_MAX_RETRIES: u32 = 2;
const DEFAULT_INITIAL_BACKOFF: Duration = Duration::from_secs(1);
const DEFAULT_MAX_BACKOFF: Duration = Duration::from_secs(128);
const DEFAULT_MAX_RETRIES: u32 = 8;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OpenAiCompatConfig {
@@ -191,7 +192,7 @@ impl OpenAiCompatClient {
break retryable_error;
}
tokio::time::sleep(self.backoff_for_attempt(attempts)?).await;
tokio::time::sleep(self.jittered_backoff_for_attempt(attempts)?).await;
};
Err(ApiError::RetriesExhausted {
@@ -227,6 +228,37 @@ impl OpenAiCompatClient {
.checked_mul(multiplier)
.map_or(self.max_backoff, |delay| delay.min(self.max_backoff)))
}
/// Backoff for `attempt` with additive random jitter applied.
///
/// Returns the exponential delay from [`Self::backoff_for_attempt`] plus a
/// random extra delay in `[0, base]`, so concurrent clients retrying after
/// the same failure do not stampede the API in lockstep.
///
/// # Errors
/// Propagates whatever error [`Self::backoff_for_attempt`] reports for the
/// given attempt number.
fn jittered_backoff_for_attempt(&self, attempt: u32) -> Result<Duration, ApiError> {
    let base = self.backoff_for_attempt(attempt)?;
    // saturating_add: a pathologically large configured max_backoff (near
    // Duration::MAX) clamps instead of panicking on Duration overflow.
    Ok(base.saturating_add(jitter_for_base(base)))
}
}
/// Monotonically increasing tick folded into every jitter sample so that
/// back-to-back retries stay distinct even when the system clock is coarser
/// than consecutive retry sleeps.
static JITTER_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Computes an additive retry jitter drawn from `[0, base]` (inclusive) to
/// decorrelate retries issued by multiple concurrent clients.
///
/// Entropy is the nanosecond wall clock combined with a process-wide counter,
/// scrambled by the splitmix64 finalizer. Deliberately non-cryptographic:
/// retry jitter only needs decorrelation, not unpredictability.
fn jitter_for_base(base: Duration) -> Duration {
    let span_nanos = u64::try_from(base.as_nanos()).unwrap_or(u64::MAX);
    if span_nanos == 0 {
        return Duration::ZERO;
    }
    let clock_nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX),
        Err(_) => 0,
    };
    let tick = JITTER_COUNTER.fetch_add(1, Ordering::Relaxed);
    // splitmix64 finalizer: spreads the entropy across all 64 bits, so even
    // multi-second bases see jitter over their full range rather than being
    // limited to the sub-second nanosecond residue.
    let mut word = clock_nanos
        .wrapping_add(tick)
        .wrapping_add(0x9E37_79B9_7F4A_7C15);
    word = (word ^ (word >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    word = (word ^ (word >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    word ^= word >> 31;
    // `saturating_add(1)` makes the modulus inclusive of `base` itself.
    Duration::from_nanos(word % span_nanos.saturating_add(1))
}
impl Provider for OpenAiCompatClient {

View File

@@ -545,6 +545,71 @@ async fn surfaces_retry_exhaustion_for_persistent_retryable_errors() {
}
}
#[tokio::test]
async fn retries_multiple_retryable_failures_with_exponential_backoff_and_jitter() {
    let captured = Arc::new(Mutex::new(Vec::<CapturedRequest>::new()));
    // Five retryable failures (mixed 429/500/503) followed by a success: an
    // 8-retry policy must absorb all of them and surface the final 200.
    let fixtures: [(&str, &str); 6] = [
        (
            "429 Too Many Requests",
            "{\"type\":\"error\",\"error\":{\"type\":\"rate_limit_error\",\"message\":\"slow down\"}}",
        ),
        (
            "500 Internal Server Error",
            "{\"type\":\"error\",\"error\":{\"type\":\"api_error\",\"message\":\"boom\"}}",
        ),
        (
            "503 Service Unavailable",
            "{\"type\":\"error\",\"error\":{\"type\":\"overloaded_error\",\"message\":\"busy\"}}",
        ),
        (
            "429 Too Many Requests",
            "{\"type\":\"error\",\"error\":{\"type\":\"rate_limit_error\",\"message\":\"slow down again\"}}",
        ),
        (
            "503 Service Unavailable",
            "{\"type\":\"error\",\"error\":{\"type\":\"overloaded_error\",\"message\":\"still busy\"}}",
        ),
        (
            "200 OK",
            "{\"id\":\"msg_exp_retry\",\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"text\",\"text\":\"Recovered after 5\"}],\"model\":\"claude-3-7-sonnet-latest\",\"stop_reason\":\"end_turn\",\"stop_sequence\":null,\"usage\":{\"input_tokens\":3,\"output_tokens\":2}}",
        ),
    ];
    let responses: Vec<_> = fixtures
        .into_iter()
        .map(|(status, body)| http_response(status, "application/json", body))
        .collect();
    let server = spawn_server(captured.clone(), responses).await;
    let client = ApiClient::new("test-key")
        .with_base_url(server.base_url())
        .with_retry_policy(8, Duration::from_millis(1), Duration::from_millis(4));
    let started_at = std::time::Instant::now();
    let response = client
        .send_message(&sample_request(false))
        .await
        .expect("8-retry policy should absorb 5 retryable failures");
    let elapsed = started_at.elapsed();
    assert_eq!(response.total_tokens(), 5);
    assert_eq!(
        captured.lock().await.len(),
        6,
        "client should issue 1 original + 5 retry requests before the 200"
    );
    // Jittered sleeps are bounded by 2 * max_backoff per retry (base + jitter),
    // so 5 sleeps fit comfortably below this upper bound with generous slack.
    assert!(
        elapsed < Duration::from_secs(5),
        "retries should complete promptly, took {elapsed:?}"
    );
}
#[tokio::test]
#[allow(clippy::await_holding_lock)]
async fn send_message_reuses_recent_completion_cache_entries() {