📌 Summary

  • Streaming shows Claude's reply as it is generated, cutting time to first token from seconds to milliseconds and markedly improving the user experience.
  • This article covers the full stack: SSE fundamentals → Python server → Node.js server → React frontend → vanilla-JS frontend.
  • Every code sample is runnable and includes error handling, reconnection, and client-disconnect detection.
  • Common pitfalls are covered as well: Nginx reverse-proxy configuration, SSE compatibility on mobile, and recovering an interrupted stream.

1. Why Streaming?

By default, the Claude API waits until the model has generated the entire response, then returns it all at once. That is fine for short replies, but when Claude is asked to write an article or analyze a block of code, the user may stare at a blank screen for 5-30 seconds before anything appears, which is a poor experience.

Streaming changes this: each small piece of text is pushed to the client as soon as the model produces it. The user sees the first characters almost immediately, and the rest keeps scrolling in. This is exactly how the claude.ai web app works.

| Dimension | Regular request | Streaming |
|---|---|---|
| Time to first token (TTFT) | Waits for full generation (5-30 s) | Milliseconds (~100-300 ms) |
| User experience | Long blank wait | Text appears incrementally, typewriter feel |
| Cost | Identical | Identical |
| Best for | Batch jobs, background tasks | Chat UIs, live display |

2. How It Works: SSE (Server-Sent Events)

Claude's streaming is built on the SSE protocol: the server holds a long-lived connection open and keeps pushing text events to the client. Each event looks like this:

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"你好"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":",我是"}}

event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":42}}

event: message_stop
data: {"type":"message_stop"}

The Anthropic SDKs wrap these details for you, so you normally never parse them by hand, but knowing the wire format helps when you debug problems or build custom handling.
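To make the format concrete, here is a minimal hand-rolled parser for such frames. It is a simplified sketch for illustration only, not the SDK's actual implementation (real SSE also allows comment lines, multi-line `data:` fields, and `id:`/`retry:` lines), and `parse_sse_frames` is a name invented here:

```python
import json

def parse_sse_frames(raw: str):
    """Parse a raw SSE stream into (event_name, payload) tuples.

    Simplified sketch: assumes one `event:` line followed by one
    `data:` line per frame, with frames separated by blank lines.
    """
    frames = []
    for block in raw.split("\n\n"):
        event, data = None, None
        for line in block.splitlines():
            if line.startswith("event: "):
                event = line[len("event: "):]
            elif line.startswith("data: "):
                data = json.loads(line[len("data: "):])
        if event and data is not None:
            frames.append((event, data))
    return frames

raw = (
    'event: content_block_delta\n'
    'data: {"type":"content_block_delta","index":0,'
    '"delta":{"type":"text_delta","text":"Hello"}}\n'
    '\n'
    'event: message_stop\n'
    'data: {"type":"message_stop"}\n'
)
frames = parse_sse_frames(raw)
# Concatenate only the text deltas to reassemble the reply
text = "".join(
    d["delta"]["text"] for e, d in frames if e == "content_block_delta"
)
```

Feeding the two sample frames above through this parser yields `text == "Hello"` plus a final `message_stop` frame, which mirrors what the SDK does internally.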

3. Streaming in Python

Basic usage

import anthropic

client = anthropic.Anthropic()

# Option 1: the stream context manager (recommended)
with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=2048,
    messages=[{"role": "user", "content": "Write a 500-word tech briefing"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)  # flush=True makes output appear immediately

    # Fetch the complete message (with usage stats) while the stream is still open
    final = stream.get_final_message()

print()  # trailing newline

print(f"Input tokens: {final.usage.input_tokens}")
print(f"Output tokens: {final.usage.output_tokens}")

Handling every event type

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a short poem"}]
) as stream:
    for event in stream:
        match event.type:
            case "message_start":
                print(f"[start] model: {event.message.model}")

            case "content_block_start":
                print("[content block start]")

            case "content_block_delta":
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)

            case "message_delta":
                print(f"\n[end] reason: {event.delta.stop_reason}")
                print(f"Output tokens: {event.usage.output_tokens}")

            case "message_stop":
                print("[stream closed]")

Async streaming in Python

import asyncio
import anthropic

client = anthropic.AsyncAnthropic()

async def stream_response(prompt: str):
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

        # Fetch the accumulated message while the stream is still open
        final = await stream.get_final_message()

    print()
    return final

asyncio.run(stream_response("Explain Python's GIL"))
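A practical pattern worth knowing here, related to the "recovering an interrupted stream" pitfall: if the connection drops mid-stream, you can resend the request with the text received so far appended as a trailing assistant message, so the model continues that text instead of regenerating from scratch (assistant prefill). The sketch below is illustrative; `build_resume_messages` and the retry policy shown in comments are inventions of this article, not SDK features:

```python
def build_resume_messages(messages: list[dict], partial_text: str) -> list[dict]:
    """Rebuild the request messages after a dropped stream.

    Appending the partial output as a trailing assistant message asks
    Claude to continue that text rather than start over. Note the API
    rejects a final assistant message ending in whitespace, so you may
    need to strip trailing whitespace before resending.
    """
    if not partial_text:
        return list(messages)
    return list(messages) + [{"role": "assistant", "content": partial_text}]

# A retry loop around the async example above would look roughly like:
#
#   received = ""
#   for attempt in range(3):
#       try:
#           async with client.messages.stream(
#               model="claude-sonnet-4-6",
#               max_tokens=2048,
#               messages=build_resume_messages(base_messages, received),
#           ) as stream:
#               async for text in stream.text_stream:
#                   received += text
#           break
#       except anthropic.APIConnectionError:
#           continue  # retry, resuming from `received`

msgs = build_resume_messages(
    [{"role": "user", "content": "Explain Python's GIL"}], "The GIL is"
)
```

On retry, the final response is then `partial_text` plus whatever new text streams in, so remember to concatenate the two on the client.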

A streaming endpoint with FastAPI

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic
import json

app = FastAPI()
client = anthropic.AsyncAnthropic()

@app.post("/chat/stream")
async def chat_stream(body: dict):
    messages = body.get("messages", [])
    system = body.get("system", "You are a helpful AI assistant.")

    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            system=system,
            messages=messages,
        ) as stream:
            async for text in stream.text_stream:
                # SSE format: data: {...}\n\n
                yield f"data: {json.dumps({'text': text}, ensure_ascii=False)}\n\n"

            final = await stream.get_final_message()
            yield f"data: {json.dumps({'done': True, 'usage': {'input': final.usage.input_tokens, 'output': final.usage.output_tokens}})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable Nginx buffering
        }
    )
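On the consuming side, the `data: {...}` lines this endpoint emits can be decoded with a small generator like the sketch below. It is pure over any iterable of text lines, so you could feed it, say, `httpx`'s `iter_lines()` when calling the endpoint; the helper name `iter_stream_events` is an invention of this article:

```python
import json

def iter_stream_events(lines):
    """Yield the JSON payloads of `data: {...}` SSE lines.

    Accepts any iterable of text lines, skipping blanks and
    non-data lines. Illustrative helper for the endpoint above.
    """
    for line in lines:
        if not line.startswith("data: "):
            continue
        raw = line[len("data: "):].strip()
        if raw:
            yield json.loads(raw)

# Simulated endpoint output, matching the FastAPI handler's format
sample = [
    'data: {"text": "Hello"}',
    '',
    'data: {"text": ", world"}',
    '',
    'data: {"done": true, "usage": {"input": 12, "output": 5}}',
]
events = list(iter_stream_events(sample))
answer = "".join(e["text"] for e in events if "text" in e)
```

Running this over the simulated lines reassembles `"Hello, world"` and surfaces the final usage record, which is exactly what a real client would do with the live stream.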

4. Streaming in Node.js

Basic usage

import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

const stream = await client.messages.stream({
  model: "claude-sonnet-4-6",
  max_tokens: 2048,
  messages: [{ role: "user", content: "Write a 500-word tech briefing" }],
});

// Print chunks as they arrive
for await (const chunk of stream) {
  if (
    chunk.type === "content_block_delta" &&
    chunk.delta.type === "text_delta"
  ) {
    process.stdout.write(chunk.delta.text);
  }
}

console.log();
const final = await stream.finalMessage();
console.log(`Input tokens: ${final.usage.input_tokens}`);
console.log(`Output tokens: ${final.usage.output_tokens}`);

Event-listener style

const stream = client.messages.stream({
  model: "claude-sonnet-4-6",
  max_tokens: 1024,
  messages: [{ role: "user", content: "Write a short poem" }],
});

stream
  .on("text", (text) => {
    process.stdout.write(text);
  })
  .on("message", (msg) => {
    console.log(`\n[done] stop_reason: ${msg.stop_reason}`);
    console.log(`tokens: ${msg.usage.input_tokens} in / ${msg.usage.output_tokens} out`);
  })
  .on("error", (err) => {
    console.error("Stream error:", err.message);
  });

await stream.finalMessage();

A complete Express SSE streaming endpoint

import express from "express";
import Anthropic from "@anthropic-ai/sdk";

const app = express();
const client = new Anthropic();

app.use(express.json());

app.post("/api/stream", async (req, res) => {
  const { messages, system = "You are a helpful AI assistant." } = req.body;

  // SSE response headers
  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache, no-transform",
    "Connection": "keep-alive",
    "X-Accel-Buffering": "no",          // disable Nginx buffering (critical!)
    "Access-Control-Allow-Origin": "*",  // CORS support
  });

  // Detect client disconnects
  let isClientConnected = true;
  req.on("close", () => {
    isClientConnected = false;
  });

  // Helper: emit one SSE event
  const sendEvent = (data: object) => {
    if (isClientConnected) {
      res.write(`data: ${JSON.stringify(data)}\n\n`);
    }
  };

  try {
    const stream = client.messages.stream({
      model: "claude-sonnet-4-6",
      max_tokens: 2048,
      system,
      messages,
    });

    stream.on("text", (text) => {
      sendEvent({ type: "text", text });
    });

    const final = await stream.finalMessage();

    sendEvent({
      type: "done",
      usage: {
        input_tokens: final.usage.input_tokens,
        output_tokens: final.usage.output_tokens,
      },
    });

  } catch (error) {
    if (error instanceof Anthropic.APIError) {
      sendEvent({ type: "error", message: error.message, status: error.status });
    } else {
      sendEvent({ type: "error", message: "Internal server error" });
    }
  } finally {
    res.end();
  }
});

app.listen(3000);

5. Frontend Integration

Vanilla JavaScript (fetch + ReadableStream)

async function streamChat(userMessage, onText, onDone) {
  const response = await fetch("/api/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      messages: [{ role: "user", content: userMessage }],
    }),
  });

  if (!response.ok) {
    throw new Error(`HTTP ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });

    // Parse the SSE data line by line
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? "";  // the last line may be incomplete; keep it for the next read

    for (const line of lines) {
      if (!line.startsWith("data: ")) continue;
      const raw = line.slice(6).trim();
      if (!raw) continue;

      let data;
      try {
        data = JSON.parse(raw);
      } catch {
        continue;  // skip malformed lines
      }
      // Handle events outside the try/catch so thrown errors are not swallowed
      if (data.type === "text") onText(data.text);
      if (data.type === "done") onDone(data.usage);
      if (data.type === "error") throw new Error(data.message);
    }
  }
}

// Usage example
const output = document.getElementById("output");
output.textContent = "";

await streamChat(
  "Write a short poem about programmers",
  (text) => { output.textContent += text; },
  (usage) => { console.log("Done. Usage:", usage); }
);

Wrapping it in a React hook

import { useState, useCallback, useRef } from "react";

interface UseChatStreamOptions {
  apiUrl?: string;
  system?: string;
}

export function useChatStream({
  apiUrl = "/api/stream",
  system,
}: UseChatStreamOptions = {}) {
  const [content, setContent] = useState("");
  const [isLoading, setIsLoading] = useState(false);
  const [error, setError] = useState<string | null>(null);
  const abortRef = useRef<AbortController | null>(null);

  const send = useCallback(async (userMessage: string) => {
    // Cancel the previous request, if any
    abortRef.current?.abort();
    abortRef.current = new AbortController();

    setContent("");
    setError(null);
    setIsLoading(true);

    try {
      const response = await fetch(apiUrl, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        signal: abortRef.current.signal,
        body: JSON.stringify({
          messages: [{ role: "user", content: userMessage }],
          system,
        }),
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }

      const reader = response.body!.getReader();
      const decoder = new TextDecoder();
      let buffer = "";

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split("\n");
        buffer = lines.pop() ?? "";

        for (const line of lines) {
          if (!line.startsWith("data: ")) continue;
          try {
            const data = JSON.parse(line.slice(6));
            if (data.type === "text") {
              setContent((prev) => prev + data.text);
            }
            if (data.type === "error") {
              setError(data.message);
            }
          } catch { /* ignore */ }
        }
      }
    } catch (err: unknown) {
      if (err instanceof Error && err.name !== "AbortError") {
        setError(err.message);
      }
    } finally {
      setIsLoading(false);
    }
  }, [apiUrl, system]);

  const stop = useCallback(() => {
    abortRef.current?.abort();
    setIsLoading(false);
  }, []);

  return { content, isLoading, error, send, stop };
}

// Component usage example
function ChatBox() {
  const { content, isLoading, error, send, stop } = useChatStream();
  const [input, setInput] = useState("");

  return (