📌 内容摘要
- Claude API 的网络抖动、速率限制、服务器过载都需要不同的处理策略——盲目重试会让问题更严重。
- 本文给出完整的错误分类处理方案:哪类错误应该立即重试、哪类要等待、哪类直接放弃。
- 实现三种递进机制:简单重试 → 指数退避 → 熔断器,覆盖从入门到生产的全部场景。
- Python 和 Node.js 双语完整代码,附流式输出的重试特殊处理和监控埋点方案。
一、先搞清楚:哪些错误应该重试?
不是所有错误都值得重试——盲目重试不仅解决不了问题,还可能触发更多速率限制。先把 Claude API 的错误分类:
| HTTP 状态码 | 含义 | 策略 | 等待时间 |
|---|---|---|---|
| 429 | 速率限制(RPM/TPM) | 重试,但要等待 | 读 Retry-After 头,或指数退避 |
| 500 | 服务器内部错误 | 重试 | 指数退避,最多3次 |
| 529 | API 过载(Anthropic 专有) | 重试,等待更长 | 至少等 30s,指数退避 |
| 502 / 503 | 网关错误 / 服务不可用 | 重试 | 指数退避 |
| 网络超时 / 连接重置 | 网络抖动 | 重试 | 短暂等待后重试 |
| 400 | 请求参数错误 | 不重试,修复请求 | — |
| 401 | API Key 无效 | 不重试,检查 Key | — |
| 403 | 权限不足 | 不重试 | — |

⚠️ 不要对 429 速率限制立刻重试——这会触发更多的速率限制,形成恶性循环。正确做法是读取响应头里的 Retry-After 字段,等待指定时间后再重试。
二、SDK 内置重试(最简单)
Anthropic SDK 自带了基础重试功能,很多场景直接用这个就够了:
import anthropic

# Built-in retry configuration of the Python SDK.
client = anthropic.Anthropic(
    api_key = "sk-ant-...",
    max_retries = 3,  # retry up to 3 times (the SDK default is 2)
    timeout = 60.0,   # 60s request timeout (default is 600s, usually far too long)
)

# The async client supports the same options.
async_client = anthropic.AsyncAnthropic(
    api_key = "sk-ant-...",
    max_retries = 3,
    timeout = anthropic.Timeout(
        connect = 5.0,  # time allowed to establish the connection
        read = 60.0,    # time allowed to read the response (streaming needs longer)
        write = 10.0,   # time allowed to send the request body
        pool = 5.0,     # time allowed to wait for a pooled connection
    ),
)

# The SDK's built-in retry automatically handles:
# - 429 rate limits (honoring Retry-After)
# - 500/502/503/529 server errors
# - network timeouts and connection errors
# It does NOT retry 400/401/403 client errors.
// Built-in retry configuration of the Node.js SDK.
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
  maxRetries: 3,
  timeout: 60_000, // milliseconds
});
SDK 内置重试的局限:默认策略不一定适合所有场景(比如你需要更长的等待、或者需要在重试时记录日志、或者需要结合业务逻辑判断是否重试)。下面是更精细的自定义实现。
三、自定义重试:指数退避 + Jitter
import asyncio
import random
import time
import logging
from typing import TypeVar, Callable, Awaitable

import anthropic

logger = logging.getLogger(__name__)

# Generic return type for the retry wrapper below.
T = TypeVar("T")

# ── Error classification ──────────────────────────
# HTTP status codes that indicate a transient failure worth retrying.
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 529}
def is_retryable(error: Exception) -> bool:
    """Return True when the error is transient and worth retrying.

    Retryable: rate limits (429), server errors (500), connection
    failures, timeouts, and any status code in RETRYABLE_STATUS_CODES.
    Everything else (400/401/403, ...) is a caller problem: don't retry.
    """
    transient = (
        anthropic.RateLimitError,        # 429: rate limit, wait then retry
        anthropic.InternalServerError,   # 500: server-side error
        anthropic.APIConnectionError,    # connection failed
        anthropic.APITimeoutError,       # request timed out
    )
    if isinstance(error, transient):
        return True
    if isinstance(error, anthropic.APIStatusError):
        return error.status_code in RETRYABLE_STATUS_CODES
    return False
def get_retry_after(error: Exception) -> float | None:
    """Extract the server-suggested wait time (in seconds) from an error.

    Returns None when the error carries no usable Retry-After header.
    """
    if not isinstance(error, anthropic.APIStatusError):
        return None
    header = error.response.headers.get("Retry-After")
    if not header:
        return None
    try:
        return float(header)
    except ValueError:
        # Header present but not a plain number of seconds.
        return None
# ── Exponential backoff core ──────────────────────
def calc_backoff(
    attempt: int,
    base: float = 1.0,       # initial wait (seconds)
    factor: float = 2.0,     # multiplier applied per attempt
    max_wait: float = 60.0,  # hard upper bound on the returned wait
    jitter: bool = True,     # randomize to avoid synchronized retries ("thundering herd")
) -> float:
    """Compute the wait time (seconds) before retry number ``attempt``.

    Exponential backoff: 1s -> 2s -> 4s -> 8s -> ...
    With jitter the wait is drawn from [wait*0.5, wait*1.5], then clamped
    to ``max_wait``.

    Why jitter: if 100 requests fail at once and all wait the same time,
    they retry simultaneously and trip the rate limit again. Random
    jitter spreads the retries over a window and prevents the stampede.
    """
    wait = min(base * (factor ** attempt), max_wait)
    if jitter:
        wait = wait * (0.5 + random.random())  # 50%–150% of the nominal wait
        # Bug fix: jitter could previously return up to 1.5x max_wait,
        # violating the documented cap. Clamp after jittering.
        wait = min(wait, max_wait)
    return wait
async def with_retry(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_wait: float = 1.0,
    max_wait: float = 60.0,
    on_retry: Callable[[int, Exception, float], None] | None = None,
) -> T:
    """Run an async callable, retrying transient failures with backoff.

    Args:
        func: zero-argument async callable to execute.
        max_retries: maximum number of retries after the first attempt.
        base_wait: initial backoff wait in seconds.
        max_wait: upper bound on any single wait in seconds.
        on_retry: optional callback invoked as (attempt, error, wait_time)
            just before sleeping.

    Usage:
        result = await with_retry(
            lambda: client.messages.create(...),
            max_retries=3,
        )
    """
    attempt = 0
    while True:
        try:
            return await func()
        except Exception as error:
            # Non-retryable errors propagate immediately.
            if not is_retryable(error):
                logger.error(f"不可重试的错误:{type(error).__name__}: {error}")
                raise
            # Retry budget exhausted: give up with the last error.
            if attempt >= max_retries:
                logger.error(f"重试 {max_retries} 次后仍失败:{error}")
                raise
            # Prefer the server-provided Retry-After; otherwise back off
            # exponentially.
            retry_after = get_retry_after(error)
            if retry_after:
                wait = min(retry_after, max_wait)
            else:
                wait = calc_backoff(attempt, base=base_wait, max_wait=max_wait)
            logger.warning(
                f"第 {attempt + 1} 次失败({type(error).__name__}),"
                f"{wait:.1f}s 后重试..."
            )
            if on_retry:
                on_retry(attempt + 1, error, wait)
            await asyncio.sleep(wait)
            attempt += 1
# ── Usage example ─────────────────────────────────
async_client = anthropic.AsyncAnthropic(
    api_key = "sk-ant-...",
    max_retries = 0,  # disable the SDK's built-in retry; our wrapper takes over
    timeout = 60.0,
)
async def resilient_complete(
    messages: list[dict],
    system: str = "",
    model: str = "claude-sonnet-4-6",
    max_tokens: int = 1024,
) -> str:
    """Call Claude with full retry handling and return the text reply."""

    def on_retry(attempt: int, error: Exception, wait: float):
        # Hook point for a monitoring system (Prometheus, DataDog, ...).
        logger.info(f"重试 #{attempt},原因:{type(error).__name__},等待 {wait:.1f}s")

    async def _request():
        return await async_client.messages.create(
            model=model,
            max_tokens=max_tokens,
            system=system,
            messages=messages,
        )

    response = await with_retry(
        func=_request,
        max_retries=3,
        base_wait=1.0,
        max_wait=60.0,
        on_retry=on_retry,
    )
    return response.content[0].text
# Smoke test
import asyncio  # NOTE(review): already imported above; harmless re-import

async def main():
    result = await resilient_complete(
        messages=[{"role": "user", "content": "你好"}],
    )
    print(result)

asyncio.run(main())
四、超时的精细化设置
"""
超时设置指南:
connect timeout(连接超时):
- 建立 TCP 连接和 TLS 握手的时间
- 通常 3–10 秒够用,网络好的环境 5s,差的 10s
read timeout(读取超时):
- 从发出请求到收到完整响应的时间
- 普通请求:30–60s
- 流式请求:不要设 read timeout,或者设成很大的值(300s+)
因为流式输出是持续接收数据,不是等待单次响应
write timeout(写入超时):
- 发送请求体的时间(通常很快)
- 10s 一般足够,除非你在上传大文件
"""
import anthropic
# 非流式请求的超时配置
sync_client = anthropic.Anthropic(
timeout = anthropic.Timeout(
connect = 5.0,
read = 60.0, # 最长等 60s 收到完整响应
write = 10.0,
pool = 5.0,
),
)
# 流式请求的超时配置(read timeout 要长)
stream_client = anthropic.Anthropic(
timeout = anthropic.Timeout(
connect = 5.0,
read = 300.0, # 流式输出可能持续几分钟
write = 10.0,
pool = 5.0,
),
)
# Per-request timeout override (takes precedence over the client default).
def complete_with_custom_timeout(
    messages: list[dict],
    timeout_sec: float = 30.0,
) -> str:
    """Issue one request with its own timeout instead of the client default."""
    result = sync_client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=messages,
        timeout=timeout_sec,  # overrides the client-level default
    )
    return result.content[0].text
# Graceful degradation after a timeout.
def complete_with_fallback(
    messages: list[dict],
    fast_timeout: float = 10.0,
    full_timeout: float = 60.0,
) -> dict:
    """Two-phase timeout strategy.

    Try a fast model with a short timeout first; on timeout, fall back
    to a slower but stronger model with a long timeout. Works well when
    most requests finish quickly and only a few need more time.
    """
    try:
        quick = sync_client.messages.create(
            model="claude-haiku-4-5-20251001",  # fast model
            max_tokens=512,
            messages=messages,
            timeout=fast_timeout,
        )
        return {
            "content": quick.content[0].text,
            "model": "haiku",
            "fast": True,
        }
    except anthropic.APITimeoutError:
        # Fast path timed out: retry on the slower, stronger model.
        fallback = sync_client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=messages,
            timeout=full_timeout,
        )
        return {
            "content": fallback.content[0].text,
            "model": "sonnet",
            "fast": False,
        }
五、流式输出的重试特殊处理
from typing import AsyncGenerator

async def stream_with_retry(
    messages: list[dict],
    system: str = "",
    max_tokens: int = 2048,
    max_retries: int = 2,
) -> AsyncGenerator[str, None]:
    """Stream a Claude response, retrying on failure.

    Retrying a stream is trickier than retrying a plain request:
    1. Failure before the connection opens -> safe to retry fully.
    2. Disconnect mid-stream -> must restart from scratch (no resume):
       - already-generated content is regenerated (with slight variation)
       - token costs are incurred again
    """
    for attempt in range(max_retries + 1):
        collected_text = []      # text received so far during this attempt
        stream_started = False   # distinguishes mid-stream drops from connect failures
        try:
            async with async_client.messages.stream(
                model = "claude-sonnet-4-6",
                max_tokens = max_tokens,
                system = system,
                messages = messages,
            ) as stream:
                stream_started = True
                async for text in stream.text_stream:
                    collected_text.append(text)
                    yield text
                return  # clean finish: leave the retry loop
        except anthropic.APIConnectionError as e:
            if stream_started:
                # Dropped mid-stream: log how much we had, then decide
                # whether to retry.
                logger.warning(
                    f"流传输在 {len(''.join(collected_text))} 字符后断开,"
                    f"尝试重试(第 {attempt + 1} 次)"
                )
                if attempt < max_retries:
                    # Tell the frontend the stream broke; wait, then retry.
                    yield f"\n\n[连接断开,正在重试...]\n\n"
                    await asyncio.sleep(calc_backoff(attempt))
                    collected_text = []  # discard partial text; restart from scratch
                    continue
                else:
                    yield "\n\n[连接多次断开,请刷新页面重试]"
                    return
            else:
                # Failed before the stream opened: normal retry logic applies.
                if attempt < max_retries and is_retryable(e):
                    wait = calc_backoff(attempt)
                    await asyncio.sleep(wait)
                    continue
                raise
        except (anthropic.RateLimitError, anthropic.InternalServerError) as e:
            if attempt < max_retries:
                # Prefer the server's Retry-After; else back off (5s base).
                wait = get_retry_after(e) or calc_backoff(attempt, base=5.0)
                logger.warning(f"速率限制/服务错误,{wait:.1f}s 后重试")
                yield f"\n\n[请求被限速,{wait:.0f}s 后自动重试...]\n\n"
                await asyncio.sleep(wait)
                collected_text = []
                continue
            raise
六、Node.js 完整实现
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
  maxRetries: 0, // disable built-in retry; the custom wrapper below takes over
  timeout: 60_000,
});
// ── Error classification ──────────────────────────
function isRetryable(error: unknown): boolean {
  // Transient failures worth retrying: rate limits, server-side errors,
  // and connection-level problems.
  if (
    error instanceof Anthropic.RateLimitError ||
    error instanceof Anthropic.InternalServerError ||
    error instanceof Anthropic.APIConnectionError ||
    error instanceof Anthropic.APIConnectionTimeoutError
  ) {
    return true;
  }
  // Other HTTP status errors: retry only the transient status codes.
  if (error instanceof Anthropic.APIStatusError) {
    return [429, 500, 502, 503, 529].includes(error.status);
  }
  return false;
}
function getRetryAfter(error: unknown): number | null {
  // Honor the server's Retry-After hint (seconds) when present.
  if (!(error instanceof Anthropic.APIStatusError)) return null;
  const header = error.headers?.["retry-after"];
  return header ? parseFloat(header) * 1000 : null; // convert to milliseconds
}
// Exponential backoff with +/-50% jitter, hard-capped at `max` (all in ms).
function calcBackoff(attempt: number, base = 1000, max = 60_000): number {
  const nominal = Math.min(base * Math.pow(2, attempt), max);
  const jittered = nominal * (0.5 + Math.random()); // ±50% jitter
  // Bug fix: without this clamp, jitter could return up to 1.5x `max`.
  return Math.min(jittered, max);
}
// ── Generic retry wrapper ─────────────────────────
// Fix: the generic parameter had been stripped — bare `Promise` is not
// valid TypeScript; restore `<T>` so the wrapper type-checks.
async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries: number = 3,
  baseWait: number = 1000,
  onRetry?: (attempt: number, error: unknown, wait: number) => void,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      // Non-retryable (400/401/403, ...) or budget exhausted: give up.
      if (!isRetryable(error)) throw error;
      if (attempt >= maxRetries) throw error;
      // Prefer the server's Retry-After; fall back to exponential backoff.
      const wait = getRetryAfter(error) ?? calcBackoff(attempt, baseWait);
      onRetry?.(attempt + 1, error, wait);
      console.warn(
        `重试 #${attempt + 1}(${(error as Error).constructor.name}),` +
        `${(wait / 1000).toFixed(1)}s 后重试...`
      );
      await new Promise(r => setTimeout(r, wait));
    }
  }
  throw lastError;
}
// ── Usage example ─────────────────────────────────
// Fix: the return type argument had been stripped — annotate the actual
// `Promise<string>` so callers get a typed result.
async function resilientComplete(
  messages: Anthropic.MessageParam[],
  system?: string,
  maxTokens?: number,
): Promise<string> {
  const response = await withRetry(
    () => client.messages.create({
      model: "claude-sonnet-4-6",
      max_tokens: maxTokens ?? 1024,
      system: system ?? "你是一个专业的 AI 助手。",
      messages,
    }),
    3,    // retry at most 3 times
    1000, // initial wait of 1s
    (attempt, error, wait) => {
      // Plug your monitoring system in here.
      console.log(`[监控] 重试 #${attempt}, 错误: ${(error as Error).message}, 等待: ${wait}ms`);
    }
  );
  // Return the text block, or "" when the first block is not text.
  return response.content[0].type === "text" ? response.content[0].text : "";
}
// ── Streaming retry (Node.js) ─────────────────────
// Fix: the generator's type argument had been stripped — it yields text
// chunks, so annotate `AsyncGenerator<string>`.
async function* streamWithRetry(
  messages: Anthropic.MessageParam[],
  maxRetries: number = 2,
): AsyncGenerator<string> {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      const stream = client.messages.stream({
        model: "claude-sonnet-4-6",
        max_tokens: 2048,
        messages,
      });
      // Forward only the text deltas to the caller.
      for await (const chunk of stream) {
        if (
          chunk.type === "content_block_delta" &&
          chunk.delta.type === "text_delta"
        ) {
          yield chunk.delta.text;
        }
      }
      return; // stream finished cleanly
    } catch (error) {
      // NOTE: a mid-stream retry restarts generation from scratch, so the
      // caller may see duplicated content (streams cannot be resumed).
      if (!isRetryable(error) || attempt >= maxRetries) throw error;
      const wait = getRetryAfter(error) ?? calcBackoff(attempt, 2000);
      yield `\n\n[重试中,请等待...]\n\n`;
      await new Promise(r => setTimeout(r, wait));
    }
  }
}
// Smoke test
(async () => {
  const result = await resilientComplete([
    { role: "user", content: "你好,请介绍一下自己" }
  ]);
  console.log(result);
})();
七、生产级监控埋点
import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class RequestMetrics:
    """Metrics collected for one API request."""
    request_id: str         # short unique id for correlating logs
    model: str              # model name used for the request
    start_time: float       # epoch seconds when the request started
    end_time: float = 0.0   # epoch seconds when the request finished
    attempts: int = 1       # total attempts, including the first call
    success: bool = False   # True once the request completed successfully
    error_type: str = ""    # exception class name on failure, "" otherwise
    input_tokens: int = 0
    output_tokens: int = 0

    @property
    def latency_ms(self) -> float:
        """Wall-clock duration of the request in milliseconds."""
        return 1000.0 * (self.end_time - self.start_time)
class APIMonitor:
    """
    Lightweight in-process API-call monitor.

    Keeps a sliding window of recent request metrics plus lifetime
    counters. In production, prefer exporting to Prometheus/Grafana
    or DataDog.
    """

    def __init__(self, window_size: int = 1000):
        # Sliding window of the most recent request metrics.
        self._metrics: deque = deque(maxlen=window_size)
        # Lifetime counters keyed by event name (not window-limited).
        self._counters = defaultdict(int)

    def record(self, metrics: RequestMetrics):
        """Record one finished request (successful or not)."""
        self._metrics.append(metrics)
        self._counters["total"] += 1
        if metrics.success:
            self._counters["success"] += 1
        else:
            self._counters[f"error_{metrics.error_type}"] += 1
        if metrics.attempts > 1:
            self._counters["retried"] += 1

    def report(self) -> dict:
        """Aggregate the current window into summary statistics.

        Returns an empty dict when nothing has been recorded yet.
        Note: ``error_breakdown`` reflects lifetime counters, while the
        rates/latencies reflect only the sliding window.
        """
        if not self._metrics:
            return {}
        recent = list(self._metrics)
        total = len(recent)
        successes = [m for m in recent if m.success]
        retried = [m for m in recent if m.attempts > 1]
        # Fix: removed the unused `failures` list that was rebuilt on
        # every report for no purpose.
        # Latency is only meaningful for successful requests.
        latencies = [m.latency_ms for m in successes]
        return {
            "total_requests": total,
            "success_rate": len(successes) / total if total else 0,
            "retry_rate": len(retried) / total if total else 0,
            "avg_latency_ms": sum(latencies) / len(latencies) if latencies else 0,
            "p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)] if latencies else 0,
            "avg_attempts": sum(m.attempts for m in recent) / total if total else 0,
            "error_breakdown": dict(self._counters),
        }

    def should_alert(self) -> list[str]:
        """Return human-readable alerts for any breached thresholds."""
        alerts = []
        report = self.report()
        if not report:
            return alerts
        if report["success_rate"] < 0.95:
            alerts.append(f"⚠️ 成功率低于95%:{report['success_rate']:.1%}")
        if report["retry_rate"] > 0.2:
            alerts.append(f"⚠️ 重试率超过20%:{report['retry_rate']:.1%}")
        if report["avg_latency_ms"] > 10_000:
            alerts.append(f"⚠️ 平均延迟超过10s:{report['avg_latency_ms']:.0f}ms")
        return alerts
monitor = APIMonitor()

async def monitored_complete(
    messages: list[dict],
    system: str = "",
    model: str = "claude-sonnet-4-6",
    max_tokens: int = 1024,
    max_retries: int = 3,
) -> str:
    """Claude call instrumented with monitoring metrics.

    Records a RequestMetrics entry into the module-level `monitor` for
    every call — success or failure — via the finally block.
    """
    import uuid
    metrics = RequestMetrics(
        request_id = str(uuid.uuid4())[:8],
        model = model,
        start_time = time.time(),
    )
    try:
        response = await with_retry(
            func = lambda: async_client.messages.create(
                model=model, max_tokens=max_tokens,
                system=system, messages=messages,
            ),
            max_retries = max_retries,
            # with_retry passes the 1-based retry number, so total
            # attempts = retry number + 1 (the initial call).
            on_retry = lambda attempt, err, wait: setattr(metrics, "attempts", attempt + 1),
        )
        metrics.success = True
        metrics.input_tokens = response.usage.input_tokens
        metrics.output_tokens = response.usage.output_tokens
        return response.content[0].text
    except Exception as e:
        metrics.error_type = type(e).__name__
        raise
    finally:
        # Always record, whether the call succeeded or failed.
        metrics.end_time = time.time()
        monitor.record(metrics)
        # Periodically print a report (push to a real monitoring system
        # in production).
        if len(monitor._metrics) % 100 == 0:
            report = monitor.report()
            print(f"[监控] 成功率 {report['success_rate']:.1%}, "
                  f"平均延迟 {report['avg_latency_ms']:.0f}ms, "
                  f"重试率 {report['retry_rate']:.1%}")
            for alert in monitor.should_alert():
                print(alert)
常见问题
Q:SDK 的内置重试和自定义重试可以叠加吗?
不建议叠加——如果 SDK 设了 max_retries=2,你自己又包了一层3次重试,实际上可能重试 6 次(2×3)。推荐的做法是:使用自定义重试时,把 SDK 的 max_retries 设为 0 关闭内置重试,由你的逻辑完全接管。SDK 内置重试逻辑已经足够好,如果你不需要自定义(日志、监控、fallback),直接用内置的更省事。
Q:流式输出中途断开,重试后内容会重复吗?
会的。流式输出不支持断点续传,断开后只能从头重新生成,客户端会看到重复或略有不同的内容。处理方式:在前端先清除已显示的内容再展示重试结果,或者只在流完整结束后展示(不流式展示中间状态)。对于重要的长文内容生成,建议在后端完整生成后再一次性发给前端,而不是流式传输。
Q:rate limit 很频繁,除了重试还能怎么优化?
三个方向:一是把请求合并(多条短消息合成一次批量请求);二是用 Batch API(异步,不受 RPM 限制,还有 50% 折扣);三是在应用层做请求队列,平滑突发流量。如果 rate limit 是长期问题,联系 Anthropic 申请更高的配额。
总结
超时和重试的核心是"对症下药":不同的错误类型需要不同的处理策略。400/401/403 不重试(修代码);429 等待后重试(读 Retry-After 头);500/502/503/529 指数退避重试;超时可以先快速失败再降级。指数退避 + Jitter 是防止惊群效应的标准方案——多个客户端同时失败时,随机化的等待时间会把重试分散开。SDK 内置重试覆盖了大多数场景;需要自定义日志、监控埋点、fallback 策略时再用本文的自定义实现。