📌 Summary

  • Network jitter, rate limits, and server overload on the Claude API each call for a different handling strategy: blind retries only make the problem worse.
  • This article lays out a complete error-classification scheme: which errors to retry immediately, which to wait on, and which to give up on.
  • Three progressively stronger mechanisms: SDK built-in retry → custom exponential backoff with jitter → production-grade monitoring, covering everything from getting started to production.
  • Complete code in both Python and Node.js, plus special retry handling for streaming output and a monitoring instrumentation scheme.

1. First, sort out which errors are worth retrying

Not every error deserves a retry: blind retries don't fix anything and can trigger even more rate limiting. Start by classifying the Claude API's errors:

HTTP status                          Meaning                                Strategy                  Wait time
429                                  Rate limited (RPM/TPM)                 Retry, but wait first     Honor the Retry-After header, else exponential backoff
500                                  Internal server error                  Retry                     Exponential backoff, at most 3 times
529                                  API overloaded (Anthropic-specific)    Retry, wait longer        At least 30s, with exponential backoff
502 / 503                            Gateway error / service unavailable    Retry                     Exponential backoff
Network timeout / connection reset   Transient network issue                Retry                     Brief wait, then retry
400                                  Invalid request parameters             Don't retry; fix the request
401                                  Invalid API key                        Don't retry; check the key
403                                  Insufficient permissions               Don't retry
⚠️ The most common mistake
Retrying a 429 rate limit immediately. That triggers even more rate limiting and creates a vicious cycle. The right move is to read the Retry-After field from the response headers and wait the indicated time before retrying.

2. SDK built-in retry (the simplest option)

The Anthropic SDK ships with basic retry support; for many scenarios it's all you need:

import anthropic
import httpx

# Built-in retry configuration in the Python SDK
client = anthropic.Anthropic(
    api_key     = "sk-ant-...",
    max_retries = 3,         # retry at most 3 times (the default is 2)
    timeout     = 60.0,      # 60s request timeout (the default of 600s is usually too long)
)

# The async client supports the same options
async_client = anthropic.AsyncAnthropic(
    api_key     = "sk-ant-...",
    max_retries = 3,
    timeout     = httpx.Timeout(
        connect = 5.0,    # connection establishment timeout
        read    = 60.0,   # response read timeout (streaming needs longer)
        write   = 10.0,   # request send timeout
        pool    = 5.0,    # connection pool wait timeout
    ),
)

# The SDK's built-in retry automatically handles:
# - 429 rate limits (honoring Retry-After)
# - 500/502/503/529 server errors
# - network timeouts and connection errors
# It does not retry 400/401/403 client errors.
// Built-in retry in the Node.js SDK
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic({
  apiKey:     process.env.ANTHROPIC_API_KEY,
  maxRetries: 3,
  timeout:    60_000,    // milliseconds
});

The limits of the SDK's built-in retry: the default policy doesn't fit every scenario (you may need longer waits, logging on each retry, or business logic deciding whether to retry at all). Below is a finer-grained custom implementation.

3. Custom retry: exponential backoff + jitter

import asyncio
import random
import time
import logging
from typing import TypeVar, Callable, Awaitable

import anthropic

logger = logging.getLogger(__name__)
T = TypeVar("T")

# ── Error classification ─────────────────────────

RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 529}

def is_retryable(error: Exception) -> bool:
    """Decide whether this error is worth retrying."""
    if isinstance(error, anthropic.RateLimitError):
        return True     # 429: rate limited, retry after waiting
    if isinstance(error, anthropic.InternalServerError):
        return True     # 500: server error, retry
    if isinstance(error, anthropic.APIStatusError):
        return error.status_code in RETRYABLE_STATUS_CODES
    if isinstance(error, (
        anthropic.APIConnectionError,     # connection failure
        anthropic.APITimeoutError,        # request timeout
    )):
        return True
    return False        # everything else (400/401/403, etc.): don't retry

def get_retry_after(error: Exception) -> float | None:
    """Read the suggested wait time from the error response."""
    if isinstance(error, anthropic.APIStatusError):
        retry_after = error.response.headers.get("Retry-After")
        if retry_after:
            try:
                return float(retry_after)
            except ValueError:
                pass
    return None


# ── Exponential backoff core ─────────────────────

def calc_backoff(
    attempt:   int,
    base:      float = 1.0,    # initial wait (seconds)
    factor:    float = 2.0,    # doubles each attempt
    max_wait:  float = 60.0,   # cap on the wait
    jitter:    bool  = True,   # random jitter, so concurrent clients don't retry in lockstep
) -> float:
    """
    Compute the wait before retry number `attempt`.

    Exponential backoff: 1s → 2s → 4s → 8s → ...
    With jitter: uniformly random in [wait*0.5, wait*1.5].

    Why jitter matters:
    if 100 requests fail at once and all wait the same amount of time,
    they retry at the same moment and hit the limit again (the
    "thundering herd"). Randomizing the wait spreads retries across a
    time window and avoids the stampede.
    """
    wait = min(base * (factor ** attempt), max_wait)
    if jitter:
        wait = wait * (0.5 + random.random())   # 50%–150% of the base wait
    return wait
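The 50%–150% scheme above is sometimes called "equal jitter". Another well-known variant is "full jitter", which draws uniformly from the entire window [0, backoff] and spreads retries even further apart. A sketch under that scheme (the function name is ours):

```python
import random

def full_jitter_backoff(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """"Full jitter": wait uniformly in [0, min(cap, base * 2**attempt)].

    Compared with equal jitter, the lower bound is 0 rather than half the
    backoff, trading a few very early retries for maximal spread."""
    return random.uniform(0.0, min(cap, base * (2 ** attempt)))

# The upper bound still grows exponentially while draws stay spread out:
for attempt in range(6):
    w = full_jitter_backoff(attempt)
    assert 0.0 <= w <= min(60.0, 2.0 ** attempt)
```

Either variant works with the retry wrapper below; the important part is having some randomness at all.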


async def with_retry(
    func:        Callable[[], Awaitable[T]],
    max_retries: int   = 3,
    base_wait:   float = 1.0,
    max_wait:    float = 60.0,
    on_retry:    Callable[[int, Exception, float], None] | None = None,
) -> T:
    """
    Generic async retry wrapper.

    Args:
        func:        the zero-argument async callable to retry
        max_retries: maximum number of retries
        base_wait:   initial wait (seconds)
        max_wait:    wait cap (seconds)
        on_retry:    callback invoked on each retry, as (attempt, error, wait_time)

    Usage:
        result = await with_retry(
            lambda: client.messages.create(...),
            max_retries=3,
        )
    """
    last_error = None

    for attempt in range(max_retries + 1):
        try:
            return await func()

        except Exception as error:
            last_error = error

            # Non-retryable errors propagate immediately
            if not is_retryable(error):
                logger.error(f"Non-retryable error: {type(error).__name__}: {error}")
                raise

            # Retry budget exhausted
            if attempt >= max_retries:
                logger.error(f"Still failing after {max_retries} retries: {error}")
                raise

            # Compute the wait time,
            # preferring the API's own Retry-After suggestion
            retry_after = get_retry_after(error)
            if retry_after:
                wait = min(retry_after, max_wait)
            else:
                wait = calc_backoff(attempt, base=base_wait, max_wait=max_wait)

            logger.warning(
                f"Attempt {attempt + 1} failed ({type(error).__name__}), "
                f"retrying in {wait:.1f}s..."
            )

            if on_retry:
                on_retry(attempt + 1, error, wait)

            await asyncio.sleep(wait)

    raise last_error


# ── Usage example ────────────────────────────────

async_client = anthropic.AsyncAnthropic(
    api_key     = "sk-ant-...",
    max_retries = 0,     # disable the SDK's built-in retry; ours takes over
    timeout     = 60.0,
)

async def resilient_complete(
    messages:   list[dict],
    system:     str  = "",
    model:      str  = "claude-sonnet-4-6",
    max_tokens: int  = 1024,
) -> str:
    """A Claude call with full retry logic."""

    def on_retry(attempt: int, error: Exception, wait: float):
        # Hook your monitoring system in here (Prometheus, DataDog, ...)
        logger.info(f"Retry #{attempt}, cause: {type(error).__name__}, waiting {wait:.1f}s")

    response = await with_retry(
        func = lambda: async_client.messages.create(
            model      = model,
            max_tokens = max_tokens,
            system     = system,
            messages   = messages,
        ),
        max_retries = 3,
        base_wait   = 1.0,
        max_wait    = 60.0,
        on_retry    = on_retry,
    )
    return response.content[0].text


# Quick test
async def main():
    result = await resilient_complete(
        messages=[{"role": "user", "content": "Hello"}],
    )
    print(result)

asyncio.run(main())

4. Fine-grained timeout settings

"""
超时设置指南:

connect timeout(连接超时):
- 建立 TCP 连接和 TLS 握手的时间
- 通常 3–10 秒够用,网络好的环境 5s,差的 10s

read timeout(读取超时):
- 从发出请求到收到完整响应的时间
- 普通请求:30–60s
- 流式请求:不要设 read timeout,或者设成很大的值(300s+)
  因为流式输出是持续接收数据,不是等待单次响应

write timeout(写入超时):
- 发送请求体的时间(通常很快)
- 10s 一般足够,除非你在上传大文件
"""

import anthropic
import httpx

# Timeout configuration for non-streaming requests
sync_client = anthropic.Anthropic(
    timeout = httpx.Timeout(
        connect = 5.0,
        read    = 60.0,     # wait at most 60s for the full response
        write   = 10.0,
        pool    = 5.0,
    ),
)

# Timeout configuration for streaming requests (read timeout must be long)
stream_client = anthropic.Anthropic(
    timeout = httpx.Timeout(
        connect = 5.0,
        read    = 300.0,    # a stream can run for several minutes
        write   = 10.0,
        pool    = 5.0,
    ),
)


# Per-request timeout override (takes precedence over the client default)
def complete_with_custom_timeout(
    messages:    list[dict],
    timeout_sec: float = 30.0,
) -> str:
    """Give a specific request its own timeout."""
    response = sync_client.messages.create(
        model      = "claude-sonnet-4-6",
        max_tokens = 1024,
        messages   = messages,
        timeout    = timeout_sec,    # overrides the client default
    )
    return response.content[0].text


# Graceful degradation after a timeout
def complete_with_fallback(
    messages:     list[dict],
    fast_timeout: float = 10.0,
    full_timeout: float = 60.0,
) -> dict:
    """
    Two-phase timeout:
    try quickly with a short timeout first; on failure, retry with a long one.
    Good fit when most requests finish fast and only a few need more time.
    """
    try:
        response = sync_client.messages.create(
            model      = "claude-haiku-4-5-20251001",   # fast model
            max_tokens = 512,
            messages   = messages,
            timeout    = fast_timeout,
        )
        return {
            "content": response.content[0].text,
            "model":   "haiku",
            "fast":    True,
        }
    except anthropic.APITimeoutError:
        # Fail fast, then retry on the slower but stronger model
        response = sync_client.messages.create(
            model      = "claude-sonnet-4-6",
            max_tokens = 1024,
            messages   = messages,
            timeout    = full_timeout,
        )
        return {
            "content": response.content[0].text,
            "model":   "sonnet",
            "fast":    False,
        }

5. Special-case retry handling for streaming output

from typing import AsyncGenerator

async def stream_with_retry(
    messages:    list[dict],
    system:      str  = "",
    max_tokens:  int  = 2048,
    max_retries: int  = 2,
) -> AsyncGenerator[str, None]:
    """
    Retry logic for streaming output.

    Note: a stream that breaks mid-transfer is harder to handle than a
    regular request:
    1. Failure before the connection is established → retry as usual.
    2. Disconnect mid-stream → must restart from scratch (no resume):
       - already-generated content gets regenerated (with slight differences)
       - you pay for the duplicated tokens
    """
    for attempt in range(max_retries + 1):
        collected_text = []
        stream_started = False

        try:
            async with async_client.messages.stream(
                model      = "claude-sonnet-4-6",
                max_tokens = max_tokens,
                system     = system,
                messages   = messages,
            ) as stream:
                stream_started = True
                async for text in stream.text_stream:
                    collected_text.append(text)
                    yield text
                return     # clean finish: exit the retry loop

        except anthropic.APIConnectionError as e:
            if stream_started:
                # Disconnected mid-stream: log what we had, then decide whether to retry
                logger.warning(
                    f"Stream dropped after {len(''.join(collected_text))} characters, "
                    f"retrying (attempt {attempt + 1})"
                )
                if attempt < max_retries:
                    # Tell the frontend the stream broke and a retry is coming
                    yield "\n\n[Connection lost, retrying...]\n\n"
                    await asyncio.sleep(calc_backoff(attempt))
                    collected_text = []   # discard and restart from scratch
                    continue
                else:
                    yield "\n\n[Connection dropped repeatedly, please refresh and try again]"
                    return
            else:
                # Failed before the stream started: normal retry logic
                if attempt < max_retries and is_retryable(e):
                    wait = calc_backoff(attempt)
                    await asyncio.sleep(wait)
                    continue
                raise

        except (anthropic.RateLimitError, anthropic.InternalServerError) as e:
            if attempt < max_retries:
                wait = get_retry_after(e) or calc_backoff(attempt, base=5.0)
                logger.warning(f"Rate limited / server error, retrying in {wait:.1f}s")
                yield f"\n\n[Rate limited, retrying automatically in {wait:.0f}s...]\n\n"
                await asyncio.sleep(wait)
                collected_text = []
                continue
            raise

6. Complete Node.js implementation

import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic({
  apiKey:     process.env.ANTHROPIC_API_KEY,
  maxRetries: 0,      // disable built-in retry; the custom logic takes over
  timeout:    60_000,
});

// ── Error classification ──────────────────────────

function isRetryable(error: unknown): boolean {
  if (error instanceof Anthropic.RateLimitError)      return true;
  if (error instanceof Anthropic.InternalServerError) return true;
  if (error instanceof Anthropic.APIConnectionError)  return true;
  if (error instanceof Anthropic.APIConnectionTimeoutError) return true;
  if (error instanceof Anthropic.APIStatusError) {
    return [429, 500, 502, 503, 529].includes(error.status);
  }
  return false;
}

function getRetryAfter(error: unknown): number | null {
  if (error instanceof Anthropic.APIStatusError) {
    const retryAfter = error.headers?.["retry-after"];
    if (retryAfter) return parseFloat(retryAfter) * 1000;  // convert to milliseconds
  }
  return null;
}

function calcBackoff(attempt: number, base = 1000, max = 60_000): number {
  const wait = Math.min(base * Math.pow(2, attempt), max);
  return wait * (0.5 + Math.random());   // ±50% jitter
}


// ── Generic retry wrapper ────────────────────────

async function withRetry<T>(
  fn:         () => Promise<T>,
  maxRetries: number = 3,
  baseWait:   number = 1000,
  onRetry?:   (attempt: number, error: unknown, wait: number) => void,
): Promise<T> {
  let lastError: unknown;

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;

      if (!isRetryable(error)) throw error;
      if (attempt >= maxRetries) throw error;

      const wait = getRetryAfter(error) ?? calcBackoff(attempt, baseWait);
      onRetry?.(attempt + 1, error, wait);

      console.warn(
        `Retry #${attempt + 1} (${(error as Error).constructor.name}), ` +
        `waiting ${(wait / 1000).toFixed(1)}s...`
      );

      await new Promise(r => setTimeout(r, wait));
    }
  }

  throw lastError;
}


// ── Usage example ────────────────────────────────

async function resilientComplete(
  messages:   Anthropic.MessageParam[],
  system?:    string,
  maxTokens?: number,
): Promise<string> {
  const response = await withRetry(
    () => client.messages.create({
      model:      "claude-sonnet-4-6",
      max_tokens: maxTokens ?? 1024,
      system:     system ?? "You are a professional AI assistant.",
      messages,
    }),
    3,          // at most 3 retries
    1000,       // initial wait of 1s
    (attempt, error, wait) => {
      // hook your monitoring system in here
      console.log(`[monitor] retry #${attempt}, error: ${(error as Error).message}, waiting: ${wait}ms`);
    }
  );

  return response.content[0].type === "text" ? response.content[0].text : "";
}


// ── Streaming retry (Node.js) ────────────────────

async function* streamWithRetry(
  messages:   Anthropic.MessageParam[],
  maxRetries: number = 2,
): AsyncGenerator<string> {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      const stream = client.messages.stream({
        model:      "claude-sonnet-4-6",
        max_tokens: 2048,
        messages,
      });

      for await (const chunk of stream) {
        if (
          chunk.type === "content_block_delta" &&
          chunk.delta.type === "text_delta"
        ) {
          yield chunk.delta.text;
        }
      }
      return;

    } catch (error) {
      if (!isRetryable(error) || attempt >= maxRetries) throw error;

      const wait = getRetryAfter(error) ?? calcBackoff(attempt, 2000);
      yield `\n\n[Retrying, please wait...]\n\n`;
      await new Promise(r => setTimeout(r, wait));
    }
  }
}


// Quick test
(async () => {
  const result = await resilientComplete([
    { role: "user", content: "Hello, please introduce yourself" }
  ]);
  console.log(result);
})();

7. Production-grade monitoring instrumentation

import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class RequestMetrics:
    """Metrics for a single request"""
    request_id:    str
    model:         str
    start_time:    float
    end_time:      float = 0.0
    attempts:      int   = 1
    success:       bool  = False
    error_type:    str   = ""
    input_tokens:  int   = 0
    output_tokens: int   = 0

    @property
    def latency_ms(self) -> float:
        return (self.end_time - self.start_time) * 1000


class APIMonitor:
    """
    Lightweight API-call monitor.
    In production, feed these metrics into Prometheus/Grafana or DataDog instead.
    """

    def __init__(self, window_size: int = 1000):
        self._metrics: deque = deque(maxlen=window_size)
        self._counters = defaultdict(int)

    def record(self, metrics: RequestMetrics):
        self._metrics.append(metrics)
        self._counters["total"] += 1
        if metrics.success:
            self._counters["success"] += 1
        else:
            self._counters[f"error_{metrics.error_type}"] += 1
        if metrics.attempts > 1:
            self._counters["retried"] += 1

    def report(self) -> dict:
        if not self._metrics:
            return {}

        recent    = list(self._metrics)
        total     = len(recent)
        successes = [m for m in recent if m.success]
        failures  = [m for m in recent if not m.success]
        retried   = [m for m in recent if m.attempts > 1]

        latencies = [m.latency_ms for m in successes]

        return {
            "total_requests":   total,
            "success_rate":     len(successes) / total if total else 0,
            "retry_rate":       len(retried)   / total if total else 0,
            "avg_latency_ms":   sum(latencies) / len(latencies) if latencies else 0,
            "p95_latency_ms":   sorted(latencies)[int(len(latencies) * 0.95)] if latencies else 0,
            "avg_attempts":     sum(m.attempts for m in recent) / total if total else 0,
            "error_breakdown":  dict(self._counters),
        }

    def should_alert(self) -> list[str]:
        """Check whether any alert thresholds are crossed."""
        alerts = []
        report = self.report()
        if not report:
            return alerts

        if report["success_rate"] < 0.95:
            alerts.append(f"⚠️ Success rate below 95%: {report['success_rate']:.1%}")
        if report["retry_rate"] > 0.2:
            alerts.append(f"⚠️ Retry rate above 20%: {report['retry_rate']:.1%}")
        if report["avg_latency_ms"] > 10_000:
            alerts.append(f"⚠️ Average latency above 10s: {report['avg_latency_ms']:.0f}ms")
        return alerts


monitor = APIMonitor()


async def monitored_complete(
    messages:    list[dict],
    system:      str  = "",
    model:       str  = "claude-sonnet-4-6",
    max_tokens:  int  = 1024,
    max_retries: int  = 3,
) -> str:
    """A complete call with monitoring instrumentation."""
    import uuid

    metrics = RequestMetrics(
        request_id = str(uuid.uuid4())[:8],
        model      = model,
        start_time = time.time(),
    )

    try:
        response = await with_retry(
            func = lambda: async_client.messages.create(
                model=model, max_tokens=max_tokens,
                system=system, messages=messages,
            ),
            max_retries = max_retries,
            on_retry    = lambda attempt, err, wait: setattr(metrics, "attempts", attempt + 1),
        )

        metrics.success       = True
        metrics.input_tokens  = response.usage.input_tokens
        metrics.output_tokens = response.usage.output_tokens
        return response.content[0].text

    except Exception as e:
        metrics.error_type = type(e).__name__
        raise

    finally:
        metrics.end_time = time.time()
        monitor.record(metrics)

        # Print a report periodically (in production, push to your monitoring system)
        if len(monitor._metrics) % 100 == 0:
            report = monitor.report()
            print(f"[monitor] success rate {report['success_rate']:.1%}, "
                  f"avg latency {report['avg_latency_ms']:.0f}ms, "
                  f"retry rate {report['retry_rate']:.1%}")
        for alert in monitor.should_alert():
            print(alert)

FAQ

Q: Can the SDK's built-in retry and a custom retry be stacked?
Better not. If the SDK is configured with max_retries=2 and you wrap it in 3 more retries of your own, each outer attempt can make up to 3 HTTP requests, so 4 outer attempts can produce up to 12 requests in total. The recommended approach: when using custom retry, set the SDK's max_retries to 0 and let your logic take over completely. The SDK's built-in retry is already solid; if you don't need customization (logging, monitoring, fallbacks), just use it and save yourself the work.
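The multiplication is easy to see in a toy simulation with no API calls at all (both retry layers here are simplified stand-ins, not the SDK's actual logic):

```python
def retry(fn, max_retries):
    # simplified retry loop: max_retries retries = max_retries + 1 attempts
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except TimeoutError:
            if attempt >= max_retries:
                raise

calls = {"n": 0}

def always_fails():
    calls["n"] += 1
    raise TimeoutError("simulated outage")

# inner layer stands in for the SDK (max_retries=2 → 3 attempts per call)
inner = lambda: retry(always_fails, max_retries=2)

try:
    retry(inner, max_retries=3)   # outer wrapper: 4 attempts
except TimeoutError:
    pass

print(calls["n"])  # 12 = 4 outer attempts x 3 inner attempts
```

During a real outage, those 12 requests also spend far longer in backoff waits than a single layer would, which is another reason to keep exactly one retry layer in charge.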

Q: If a stream drops mid-way, will the retry produce duplicated content?
Yes. Streaming doesn't support resuming from a breakpoint: after a disconnect you can only regenerate from scratch, and the client will see repeated or slightly different content. Ways to handle it: have the frontend clear the already-displayed content before showing the retried result, or only display once the stream has fully finished (no intermediate streaming state). For important long-form generation, consider generating the full response on the backend and sending it to the frontend in one piece instead of streaming.
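That last suggestion can be sketched with a stand-in stream (fake_stream and collect_then_send are illustrative names, not SDK APIs; in real code the source would be the SDK's text_stream):

```python
import asyncio
from typing import AsyncGenerator

async def fake_stream() -> AsyncGenerator[str, None]:
    # stand-in for the SDK's stream.text_stream
    for chunk in ["Hello", ", ", "world"]:
        await asyncio.sleep(0)
        yield chunk

async def collect_then_send(stream: AsyncGenerator[str, None]) -> str:
    """Buffer the entire stream server-side and return it only once
    complete, so a mid-stream retry never shows users partial text."""
    parts = []
    async for text in stream:
        parts.append(text)
    return "".join(parts)

print(asyncio.run(collect_then_send(fake_stream())))  # Hello, world
```

The trade-off is latency: the user waits for the whole response instead of seeing tokens as they arrive, so this fits correctness-critical output more than chat UIs.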

Q: Rate limits keep hitting; beyond retries, what else can be optimized?
Three directions: first, merge requests (combine several short messages into one batched call); second, use the Batch API (asynchronous, not subject to RPM limits, and 50% cheaper); third, add an application-level request queue to smooth out bursts. If rate limiting is a long-term problem, contact Anthropic to request a higher quota.
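The request-queue idea can be sketched as a small client-side throttle (the class name and default parameters are ours; tune them to your actual quota):

```python
import asyncio
import time

class RequestThrottle:
    """Cap in-flight requests and space out request starts.

    At most `max_concurrent` requests run at once, and successive request
    starts are at least `min_interval` seconds apart, which smooths bursts
    before they ever reach the API's rate limiter."""
    def __init__(self, max_concurrent: int = 5, min_interval: float = 0.2):
        self._sem = asyncio.Semaphore(max_concurrent)
        self._min_interval = min_interval
        self._last_start = 0.0
        self._lock = asyncio.Lock()

    async def run(self, coro_fn):
        async with self._sem:                   # concurrency cap
            async with self._lock:              # spacing between starts
                wait = self._min_interval - (time.monotonic() - self._last_start)
                if wait > 0:
                    await asyncio.sleep(wait)
                self._last_start = time.monotonic()
            return await coro_fn()

async def demo():
    throttle = RequestThrottle(max_concurrent=2, min_interval=0.05)

    async def fake_call(i: int) -> int:
        await asyncio.sleep(0.01)   # stand-in for an API request
        return i

    # gather preserves argument order, so results come back as 0..5
    return await asyncio.gather(*(throttle.run(lambda i=i: fake_call(i)) for i in range(6)))

print(asyncio.run(demo()))  # [0, 1, 2, 3, 4, 5]
```

In real use you would wrap the retrying client call, e.g. `throttle.run(lambda: resilient_complete(...))`, so bursty traffic queues up locally instead of burning your retry budget against 429s.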

Summary

The heart of timeouts and retries is matching the cure to the disease: different error types need different strategies. 400/401/403: don't retry, fix your code. 429: wait (honoring the Retry-After header), then retry. 500/502/503/529: retry with exponential backoff. Timeouts can fail fast first and then degrade gracefully. Exponential backoff plus jitter is the standard defense against the thundering herd: when many clients fail at once, randomized waits spread their retries apart. The SDK's built-in retry covers most scenarios; reach for this article's custom implementation when you need your own logging, monitoring instrumentation, or fallback strategy.