📌 内容摘要

  • 流式输出让用户看到“逐字生成”而不是等待完整响应,是 AI 产品用户体验的关键设计。
  • 本文覆盖完整的全链路实现:Claude API 流式调用 → Python/Node.js 后端代理 → React/原生 JS 前端渲染。
  • 重点解决生产中最常见的三个问题:Nginx 缓冲导致流不出来、断线重连、并发流式连接的资源管理。
  • 附 EventSource vs fetch+ReadableStream 两种前端方案对比,以及移动端的注意事项。

一、流式输出的工作原理

理解流式输出之前,先理解 SSE(Server-Sent Events)协议——这是 Claude 流式输出的底层传输机制。

SSE 是一种 HTTP 长连接协议:客户端发起一个普通 HTTP 请求,服务器不立刻关闭连接,而是持续向客户端推送数据。每条数据以 data: 内容\n\n 格式发送。连接由服务器关闭或客户端主动断开。
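
直观起见,下面用几行 Python 示意一条 SSE 帧在网络上长什么样(sse_frame 是为演示虚构的辅助函数,不属于后文的后端代码):

import json

def sse_frame(payload: dict) -> str:
    # 单条 SSE 帧:以 "data: " 开头,以空行("\n\n")结尾
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"

print(sse_frame({"type": "text", "data": "你好"}), end="")
# 网络上实际传输的内容:
# data: {"type": "text", "data": "你好"}
# (后跟一个空行,标记这条事件结束)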

Claude 流式输出的完整数据流:

客户端                    你的后端                   Claude API
  │                          │                           │
  │──── POST /chat/stream ──►│                           │
  │                          │──── messages.stream() ──►│
  │                          │                           │
  │◄── data: {"type":"text"} │◄── event: text_delta ────│
  │◄── data: {"type":"text"} │◄── event: text_delta ────│
  │◄── data: {"type":"text"} │◄── event: text_delta ────│
  │◄── data: {"type":"done"} │◄── event: message_stop ──│
  │                          │                           │
  │(关闭连接)                 │(关闭连接)                  │

⚠️ 为什么不直接让前端调 Claude API?
不能把 Anthropic API Key 暴露给前端——任何人都能从浏览器 DevTools 里看到它。正确架构是:前端 → 你的后端(持有 API Key)→ Claude API。你的后端充当代理,把 Claude 的流式响应转发给前端。

二、Python 后端流式代理

2.1 FastAPI 基础流式接口

import anthropic
import json
import asyncio
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, AsyncGenerator

app    = FastAPI()
client = anthropic.AsyncAnthropic()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["POST", "GET"],
    allow_headers=["*"],
)

class ChatRequest(BaseModel):
    message:     str
    system:      Optional[str]   = None
    model:       Optional[str]   = "claude-sonnet-4-6"
    max_tokens:  Optional[int]   = 2048
    temperature: Optional[float] = None


async def claude_stream_generator(
    request: ChatRequest,
) -> AsyncGenerator[str, None]:
    """
    核心生成器:从 Claude API 获取流,转换为 SSE 格式发给前端

    SSE 格式要求:
    - 每条消息以 "data: " 开头
    - 每条消息以 "\n\n" 结尾
    - 可以有多行的消息,用 "\n" 分隔,最后 "\n\n" 结束
    """

    kwargs = dict(
        model      = request.model,
        max_tokens = request.max_tokens,
        system     = request.system or "你是一个专业的 AI 助手,回答简洁准确。",
        messages   = [{"role": "user", "content": request.message}],
    )
    if request.temperature is not None:
        kwargs["temperature"] = request.temperature

    try:
        async with client.messages.stream(**kwargs) as stream:
            # 发送开始事件
            yield f"data: {json.dumps({'type': 'start', 'model': request.model})}\n\n"

            # 逐块发送文本
            async for text in stream.text_stream:
                payload = json.dumps({"type": "text", "data": text}, ensure_ascii=False)
                yield f"data: {payload}\n\n"

            # 流结束:发送用量统计
            final   = await stream.get_final_message()
            usage   = {
                "input":       final.usage.input_tokens,
                "output":      final.usage.output_tokens,
                "stop_reason": final.stop_reason,
            }
            yield f"data: {json.dumps({'type': 'usage', 'data': usage})}\n\n"
            yield f"data: {json.dumps({'type': 'done'})}\n\n"

    except anthropic.RateLimitError:
        error_payload = json.dumps({"type": "error", "code": 429, "message": "API 速率限制,请稍后重试"})
        yield f"data: {error_payload}\n\n"

    except anthropic.APIStatusError as e:
        error_payload = json.dumps({"type": "error", "code": e.status_code, "message": str(e.message)})
        yield f"data: {error_payload}\n\n"

    except asyncio.CancelledError:
        # 客户端主动断开连接(正常情况,不需要报错)
        # 重新抛出,让事件循环正确完成任务取消
        raise

    except Exception:
        # 兜底:不把内部异常细节暴露给前端,只返回通用错误
        error_payload = json.dumps({"type": "error", "code": 500, "message": "服务器内部错误"})
        yield f"data: {error_payload}\n\n"


@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
    return StreamingResponse(
        claude_stream_generator(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control":              "no-cache",
            "X-Accel-Buffering":          "no",       # 关键:告诉 Nginx 不要缓冲
            "Access-Control-Allow-Origin":"*",
            "Connection":                 "keep-alive",
        },
    )
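
X-Accel-Buffering: no 只对单个响应生效。如果你能直接改 Nginx 配置,也可以在反向代理处显式关闭缓冲,一劳永逸地解决“流不出来”的问题。下面是一份示意配置(upstream 地址、超时时间按你的实际部署调整):

location /api/chat/stream {
    proxy_pass http://127.0.0.1:8000;    # 假设 FastAPI 监听 8000 端口
    proxy_http_version 1.1;
    proxy_set_header Connection "";      # 对 upstream 保持长连接
    proxy_buffering off;                 # 关键:关闭响应缓冲,SSE 才能逐块到达浏览器
    proxy_cache off;
    proxy_read_timeout 300s;             # 流式响应可能持续较久,放宽读超时
}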


# 普通(非流式)接口,方便对比
@app.post("/api/chat")
async def chat(request: ChatRequest):
    try:
        response = await client.messages.create(
            model      = request.model,
            max_tokens = request.max_tokens,
            system     = request.system or "你是一个专业的 AI 助手。",
            messages   = [{"role": "user", "content": request.message}],
        )
        return {
            "content":       response.content[0].text,
            "input_tokens":  response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
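
接口写好后,可以先不接前端,用一个最小的命令行客户端验证流式效果。下面是一个示意脚本,假设服务跑在 http://localhost:8000,依赖 httpx(pip install httpx):

import asyncio
import json
import httpx

async def main():
    async with httpx.AsyncClient(timeout=None) as http:
        async with http.stream(
            "POST",
            "http://localhost:8000/api/chat/stream",
            json={"message": "用一句话介绍 SSE"},
        ) as resp:
            # 逐行读取响应,只解析 "data: " 开头的 SSE 行
            async for line in resp.aiter_lines():
                if not line.startswith("data: "):
                    continue
                event = json.loads(line[len("data: "):])
                if event["type"] == "text":
                    print(event["data"], end="", flush=True)
                elif event["type"] in ("done", "error"):
                    print(f"\n[{event['type']}]")

asyncio.run(main())

如果文字是逐块打印出来的,说明后端到客户端的整条链路都没有缓冲。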

2.2 多轮对话的流式接口

from collections import defaultdict

# 内存会话存储(生产环境用 Redis)
_sessions: dict[str, list[dict]] = defaultdict(list)

class MultiTurnRequest(BaseModel):
    message:    str
    session_id: str
    system:     Optional[str] = None


async def multi_turn_stream_generator(
    request: MultiTurnRequest,
) -> AsyncGenerator[str, None]:
    """多轮对话流式生成器"""
    history = _sessions[request.session_id]

    # 追加用户消息
    history.append({"role": "user", "content": request.message})

    # 控制历史长度(最多保留 20 条)
    if len(history) > 20:
        history = history[-20:]
        # 截断后确保首条是 user 消息:Messages API 要求 user/assistant 交替且以 user 开头
        while history and history[0]["role"] != "user":
            history.pop(0)
        _sessions[request.session_id] = history

    full_response = []

    try:
        async with client.messages.stream(
            model      = "claude-sonnet-4-6",
            max_tokens = 2048,
            system     = request.system or "你是一个专业的 AI 助手。",
            messages   = history,
        ) as stream:
            yield f"data: {json.dumps({'type': 'start', 'session_id': request.session_id})}\n\n"

            async for text in stream.text_stream:
                full_response.append(text)
                payload = json.dumps({"type": "text", "data": text}, ensure_ascii=False)
                yield f"data: {payload}\n\n"

            # 流结束后把 assistant 回复存入历史
            assistant_content = "".join(full_response)
            history.append({"role": "assistant", "content": assistant_content})

            yield f"data: {json.dumps({'type': 'done', 'session_id': request.session_id})}\n\n"

    except asyncio.CancelledError:
        # 客户端断开:先把已生成的部分回复存入历史,再重新抛出取消
        if full_response:
            assistant_content = "".join(full_response) + "(已中断)"
            history.append({"role": "assistant", "content": assistant_content})
        raise

    except Exception as e:
        yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"


@app.post("/api/chat/multi-stream")
async def multi_turn_stream(request: MultiTurnRequest):
    return StreamingResponse(
        multi_turn_stream_generator(request),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
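
前面说过,_sessions 这个内存 dict 只适合单进程演示:多 worker 部署时各进程各存一份,服务重启即丢。生产环境换成 Redis 的一个最小示意如下(键名 chat:history:{session_id} 和 1 小时过期都是示例值,redis.asyncio 需要 redis-py 4.2+):

import json
import redis.asyncio as aioredis

r = aioredis.Redis(decode_responses=True)

async def load_history(session_id: str) -> list[dict]:
    raw = await r.get(f"chat:history:{session_id}")
    return json.loads(raw) if raw else []

async def save_history(session_id: str, history: list[dict]) -> None:
    # ex=3600:一小时过期,避免废弃会话无限堆积
    await r.set(
        f"chat:history:{session_id}",
        json.dumps(history, ensure_ascii=False),
        ex=3600,
    )

把生成器里对 _sessions 的读写换成 load_history / save_history 即可,其余流式逻辑不变。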

三、Node.js/TypeScript 后端流式代理

// server.ts
import Anthropic from "@anthropic-ai/sdk";
import express, { Request, Response } from "express";
import cors from "cors";

const app    = express();
const claude = new Anthropic();

app.use(cors());
app.use(express.json());


app.post("/api/chat/stream", async (req: Request, res: Response) => {
  const { message, system, model = "claude-sonnet-4-6", maxTokens = 2048 } = req.body;

  if (!message) {
    return res.status(400).json({ error: "message 不能为空" });
  }

  // 设置 SSE 响应头
  res.setHeader("Content-Type",               "text/event-stream");
  res.setHeader("Cache-Control",              "no-cache");
  res.setHeader("X-Accel-Buffering",          "no");        // 禁用 Nginx 缓冲
  res.setHeader("Access-Control-Allow-Origin","*");
  res.setHeader("Connection",                 "keep-alive");
  res.flushHeaders();    // 立即发送头部,建立长连接

  // 发送辅助函数
  const sendEvent = (data: object) => {
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  };

  // 监听客户端断开
  let isClosed = false;
  req.on("close", () => { isClosed = true; });

  try {
    sendEvent({ type: "start", model });

    // messages.stream() 同步返回 MessageStream,无需 await
    const stream = claude.messages.stream({
      model,
      max_tokens: maxTokens,
      system:     system || "你是一个专业的 AI 助手。",
      messages:   [{ role: "user", content: message }],
    });

    for await (const chunk of stream) {
      if (isClosed) {
        stream.controller.abort();   // 客户端断开:中止上游请求,停止消耗 token
        break;
      }

      if (
        chunk.type === "content_block_delta" &&
        chunk.delta.type === "text_delta"
      ) {
        sendEvent({ type: "text", data: chunk.delta.text });
      }
    }

    if (!isClosed) {
      const finalMsg = await stream.finalMessage();
      sendEvent({
        type:  "usage",
        data:  {
          input:        finalMsg.usage.input_tokens,
          output:       finalMsg.usage.output_tokens,
          stop_reason:  finalMsg.stop_reason,
        },
      });
      sendEvent({ type: "done" });
    }

  } catch (error: any) {
    if (!isClosed) {
      if (error.status === 429) {
        sendEvent({ type: "error", code: 429, message: "速率限制,请稍后重试" });
      } else {
        sendEvent({ type: "error", code: error.status || 500, message: error.message });
      }
    }
  } finally {
    if (!isClosed) {
      res.end();
    }
  }
});


// 健康检查
app.get("/health", (_, res) => res.json({ status: "ok" }));

app.listen(3001, () => console.log("服务启动:http://localhost:3001"));

四、前端实现:fetch + ReadableStream(推荐)

fetch + ReadableStream 是现代浏览器中处理流的最灵活方式:支持 POST 请求和自定义请求头(EventSource 只支持 GET、不能带自定义头,优点是浏览器自带断线自动重连):

// 原生 JS 版本,可在任何框架中使用

async function* readSSEStream(response) {
  /**
   * 把 Response body 解析为 SSE 事件流
   * 返回一个异步生成器,每次 yield 一个解析后的事件对象
   */
  const reader  = response.body.getReader();
  const decoder = new TextDecoder();
  let   buffer  = "";

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });

      // SSE 协议:每个事件以 \n\n 分隔
      const lines = buffer.split("\n\n");
      buffer = lines.pop() ?? "";   // 最后一段可能不完整,留到下次

      for (const line of lines) {
        if (!line.trim()) continue;

        // 一个 SSE 事件可能有多行(event:、data:、id: 等)
        // 我们只处理 data: 行
        for (const part of line.split("\n")) {
          if (part.startsWith("data: ")) {
            try {
              yield JSON.parse(part.slice(6));
            } catch {
              yield { type: "raw", data: part.slice(6) };
            }
          }
        }
      }
    }
  } finally {
    reader.releaseLock();
  }
}


async function streamChat({
  message,
  system     = "",
  onText,    // (text: string) => void
  onDone,    // (usage: object) => void
  onError,   // (error: object) => void
  signal,    // AbortSignal(用于取消)
}) {
  /**
   * 调用流式接口的完整封装
   *
   * 用法:
   * const controller = new AbortController();
   * await streamChat({
   *   message: "你好",
   *   onText:  (t) => console.log(t),
   *   onDone:  (u) => console.log("完成", u),
   *   signal:  controller.signal,
   * });
   * // 中途取消:controller.abort();
   */

  let response;
  try {
    response = await fetch("/api/chat/stream", {
      method:  "POST",
      headers: { "Content-Type": "application/json" },
      body:    JSON.stringify({ message, system }),
      signal,    // 支持 AbortController 取消
    });
  } catch (err) {
    if (err.name === "AbortError") return;
    onError?.({ code: 0, message: "网络连接失败" });
    return;
  }

  if (!response.ok) {
    onError?.({ code: response.status, message: `HTTP ${response.status}` });
    return;
  }

  let usageData = null;

  for await (const event of readSSEStream(response)) {
    if (signal?.aborted) break;

    switch (event.type) {
      case "text":
        onText?.(event.data);
        break;

      case "usage":
        usageData = event.data;
        break;

      case "done":
        onDone?.(usageData);
        break;

      case "error":
        onError?.(event);
        return;
    }
  }
}

五、React 完整组件

import { useState, useRef, useEffect, useCallback } from "react";

// 复用上面的 streamChat 和 readSSEStream 函数

function ChatMessage({ role, content, isStreaming }) {
  // 样式类名仅作示意,按你的项目实际样式调整
  return (
    <div className={`message ${role}`}>
      <span>{content}</span>
      {isStreaming && <span className="cursor">▍</span>}
    </div>
  );
}

export default function StreamingChat() {
  const [messages, setMessages]       = useState([]);
  const [input, setInput]             = useState("");
  const [isStreaming, setIsStreaming] = useState(false);
  const [usage, setUsage]             = useState(null);
  const [error, setError]             = useState("");

  const abortRef  = useRef(null);   // 存储 AbortController
  const bottomRef = useRef(null);   // 自动滚动到底部

  // 自动滚动
  useEffect(() => {
    bottomRef.current?.scrollIntoView({ behavior: "smooth" });
  }, [messages]);

  const send = useCallback(async () => {
    const text = input.trim();
    if (!text || isStreaming) return;

    setInput("");
    setError("");
    setUsage(null);

    // 添加用户消息
    setMessages(prev => [...prev, { role: "user", content: text, id: Date.now() }]);

    // 添加空的 assistant 消息(即将被流填充)
    const assistantId = Date.now() + 1;
    setMessages(prev => [...prev, { role: "assistant", content: "", id: assistantId, streaming: true }]);
    setIsStreaming(true);

    // 创建 AbortController
    const controller = new AbortController();
    abortRef.current = controller;

    await streamChat({
      message: text,
      signal:  controller.signal,

      onText(chunk) {
        setMessages(prev =>
          prev.map(msg =>
            msg.id === assistantId ? { ...msg, content: msg.content + chunk } : msg
          )
        );
      },

      onDone(usageData) {
        setMessages(prev =>
          prev.map(msg =>
            msg.id === assistantId ? { ...msg, streaming: false } : msg
          )
        );
        setUsage(usageData);
        setIsStreaming(false);
      },

      onError(err) {
        setMessages(prev =>
          prev.map(msg =>
            msg.id === assistantId
              ? { ...msg, content: "(生成失败,请重试)", streaming: false }
              : msg
          )
        );
        setError(`错误 ${err.code}:${err.message}`);
        setIsStreaming(false);
      },
    });
  }, [input, isStreaming]);

  const stop = useCallback(() => {
    abortRef.current?.abort();
    setIsStreaming(false);
    // 标记当前流式消息为已停止
    setMessages(prev =>
      prev.map(msg =>
        msg.streaming
          ? { ...msg, streaming: false, content: msg.content + "…(已停止)" }
          : msg
      )
    );
  }, []);

  return (
    <div className="chat">
      {/* 消息列表 */}
      <div className="messages">
        {messages.length === 0 && (
          <div className="empty">发送消息开始对话</div>
        )}
        {messages.map(msg => (
          <ChatMessage
            key={msg.id}
            role={msg.role}
            content={msg.content}
            isStreaming={msg.streaming}
          />
        ))}
        <div ref={bottomRef} />
      </div>

      {/* 错误提示 */}
      {error && <div className="error">⚠️ {error}</div>}

      {/* 用量统计 */}
      {usage && !isStreaming && (
        <div className="usage">
          输入 {usage.input} tokens · 输出 {usage.output} tokens
        </div>
      )}

      {/* 输入区 */}
      <div className="input-area">
        <input
          value={input}
          onChange={e => setInput(e.target.value)}
          // 中文输入法组合输入时按 Enter 不应触发发送
          onKeyDown={e => {
            if (e.key === "Enter" && !e.nativeEvent.isComposing) send();
          }}
          placeholder="输入消息…"
          disabled={isStreaming}
        />
        {isStreaming ? (
          <button onClick={stop}>停止</button>
        ) : (
          <button onClick={send} disabled={!input.trim()}>发送</button>
        )}
      </div>
    </div>
  );
}