📌 内容摘要
- 流式输出让用户看到“逐字生成”而不是等待完整响应,是 AI 产品用户体验的关键设计。
- 本文覆盖完整的全链路实现:Claude API 流式调用 → Python/Node.js 后端代理 → React/原生 JS 前端渲染。
- 重点解决生产中最常见的三个问题:Nginx 缓冲导致流不出来、断线重连、并发流式连接的资源管理。
- 附 EventSource vs fetch+ReadableStream 两种前端方案对比,以及移动端的注意事项。
一、流式输出的工作原理
理解流式输出之前,先理解 SSE(Server-Sent Events)协议——这是 Claude 流式输出的底层传输机制。
SSE 是一种 HTTP 长连接协议:客户端发起一个普通 HTTP 请求,服务器不立刻关闭连接,而是持续向客户端推送数据。每条数据以 data: 内容\n\n 格式发送。连接由服务器关闭或客户端主动断开。
Claude 流式输出的完整数据流:
客户端 你的后端 Claude API
│ │ │
│──── POST /chat/stream ──►│ │
│ │──── messages.stream() ──►│
│ │ │
│◄── data: {"type":"text"} │◄── event: text_delta ────│
│◄── data: {"type":"text"} │◄── event: text_delta ────│
│◄── data: {"type":"text"} │◄── event: text_delta ────│
│◄── data: {"type":"done"} │◄── event: message_stop ──│
│ │ │
│(关闭连接) │(关闭连接) │
不能把 Anthropic API Key 暴露给前端——任何人都能从浏览器 DevTools 里看到它。正确架构是:前端 → 你的后端(持有 API Key)→ Claude API。你的后端充当代理,把 Claude 的流式响应转发给前端。
二、Python 后端流式代理
2.1 FastAPI 基础流式接口
import anthropic
import json
import asyncio
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, AsyncGenerator
# FastAPI application plus one shared async Anthropic client for all requests.
app = FastAPI()
client = anthropic.AsyncAnthropic()  # reads ANTHROPIC_API_KEY from the environment
# CORS is wide open for demo purposes — restrict allow_origins in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["POST", "GET"],
    allow_headers=["*"],
)
class ChatRequest(BaseModel):
    """Request body for the single-turn chat endpoints."""
    message: str                                # the user's message text
    system: Optional[str] = None                # optional system-prompt override
    model: Optional[str] = "claude-sonnet-4-6"  # Claude model id
    max_tokens: Optional[int] = 2048            # generation length cap
    temperature: Optional[float] = None         # None -> use the API default
async def claude_stream_generator(
    request: ChatRequest,
) -> AsyncGenerator[str, None]:
    """
    Core generator: consume the Claude API stream and re-emit it as SSE
    frames for the browser.

    SSE framing rules:
    - every message starts with "data: "
    - every message ends with "\n\n"
    - a message may span several lines separated by "\n", terminated by "\n\n"
    """
    kwargs = dict(
        model=request.model,
        max_tokens=request.max_tokens,
        system=request.system or "你是一个专业的 AI 助手,回答简洁准确。",
        messages=[{"role": "user", "content": request.message}],
    )
    # Only forward temperature when the caller set one, so the API default applies.
    if request.temperature is not None:
        kwargs["temperature"] = request.temperature
    try:
        async with client.messages.stream(**kwargs) as stream:
            # Opening event so the client can show "generating…" immediately.
            yield f"data: {json.dumps({'type': 'start', 'model': request.model})}\n\n"
            # Relay each text delta as its own SSE frame.
            async for text in stream.text_stream:
                payload = json.dumps({"type": "text", "data": text}, ensure_ascii=False)
                yield f"data: {payload}\n\n"
            # Stream finished: report token usage, then the terminal event.
            final = await stream.get_final_message()
            usage = {
                "input": final.usage.input_tokens,
                "output": final.usage.output_tokens,
                "stop_reason": final.stop_reason,
            }
            yield f"data: {json.dumps({'type': 'usage', 'data': usage})}\n\n"
            yield f"data: {json.dumps({'type': 'done'})}\n\n"
    except anthropic.RateLimitError:
        error_payload = json.dumps({"type": "error", "code": 429, "message": "API 速率限制,请稍后重试"})
        yield f"data: {error_payload}\n\n"
    except anthropic.APIStatusError as e:
        error_payload = json.dumps({"type": "error", "code": e.status_code, "message": str(e.message)})
        yield f"data: {error_payload}\n\n"
    except asyncio.CancelledError:
        # Client disconnected. BUG FIX: the original swallowed CancelledError,
        # which breaks asyncio's cancellation machinery — always re-raise it.
        raise
    except Exception:
        # Deliberately generic: never leak internal details to the browser.
        error_payload = json.dumps({"type": "error", "code": 500, "message": "服务器内部错误"})
        yield f"data: {error_payload}\n\n"
@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
    """SSE endpoint: stream a single-turn chat completion to the client."""
    sse_headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",  # critical: tell Nginx not to buffer the stream
        "Access-Control-Allow-Origin": "*",
        "Connection": "keep-alive",
    }
    return StreamingResponse(
        claude_stream_generator(request),
        media_type="text/event-stream",
        headers=sse_headers,
    )
# Plain (non-streaming) endpoint, kept here for comparison with the SSE one.
@app.post("/api/chat")
async def chat(request: ChatRequest):
    """Return the complete reply in a single JSON response (no streaming)."""
    try:
        response = await client.messages.create(
            model = request.model,
            max_tokens = request.max_tokens,
            system = request.system or "你是一个专业的 AI 助手。",
            messages = [{"role": "user", "content": request.message}],
        )
        return {
            "content": response.content[0].text,
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
        }
    except Exception as e:
        # NOTE(review): str(e) can leak provider internals to callers —
        # consider returning a generic message and logging the detail instead.
        raise HTTPException(status_code=500, detail=str(e))
2.2 多轮对话的流式接口
from collections import defaultdict
# In-memory session store (use Redis in production).
# Maps session_id -> ordered list of {"role": ..., "content": ...} messages.
_sessions: dict[str, list[dict]] = defaultdict(list)
class MultiTurnRequest(BaseModel):
    """Request body for the multi-turn streaming endpoint."""
    message: str                  # the new user message for this turn
    session_id: str               # key into the server-side session store
    system: Optional[str] = None  # optional system-prompt override
async def multi_turn_stream_generator(
    request: MultiTurnRequest,
) -> AsyncGenerator[str, None]:
    """Multi-turn streaming generator: replays the session history to Claude
    and appends the assistant reply to the session once the stream finishes."""
    history = _sessions[request.session_id]
    # Append the new user message.
    history.append({"role": "user", "content": request.message})
    # Cap history at the last 20 messages.
    if len(history) > 20:
        history = history[-20:]
        # BUG FIX: a blind [-20:] slice can leave the window starting with an
        # assistant message, which the Messages API rejects (the conversation
        # must begin with a user turn). Drop leading non-user messages.
        while history and history[0]["role"] != "user":
            history.pop(0)
        _sessions[request.session_id] = history
    full_response = []
    try:
        async with client.messages.stream(
            model = "claude-sonnet-4-6",
            max_tokens = 2048,
            system = request.system or "你是一个专业的 AI 助手。",
            messages = history,
        ) as stream:
            yield f"data: {json.dumps({'type': 'start', 'session_id': request.session_id})}\n\n"
            async for text in stream.text_stream:
                full_response.append(text)
                payload = json.dumps({"type": "text", "data": text}, ensure_ascii=False)
                yield f"data: {payload}\n\n"
            # Persist the assistant reply after the stream completes.
            assistant_content = "".join(full_response)
            history.append({"role": "assistant", "content": assistant_content})
            yield f"data: {json.dumps({'type': 'done', 'session_id': request.session_id})}\n\n"
    except asyncio.CancelledError:
        # Client disconnected: keep whatever was generated so far in history.
        if full_response:
            assistant_content = "".join(full_response) + "(已中断)"
            history.append({"role": "assistant", "content": assistant_content})
        elif history and history[-1]["role"] == "user":
            # Nothing was generated — drop the dangling user turn so the
            # next request doesn't send two consecutive user messages.
            history.pop()
        # BUG FIX: re-raise — swallowing CancelledError breaks asyncio cancellation.
        raise
    except Exception as e:
        # BUG FIX: remove the dangling user message on failure; otherwise the
        # next turn would submit two consecutive "user" messages.
        if history and history[-1]["role"] == "user":
            history.pop()
        yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
@app.post("/api/chat/multi-stream")
async def multi_turn_stream(request: MultiTurnRequest):
    """SSE endpoint for session-based, multi-turn streaming chat."""
    anti_buffer_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return StreamingResponse(
        multi_turn_stream_generator(request),
        headers=anti_buffer_headers,
        media_type="text/event-stream",
    )
三、Node.js/TypeScript 后端流式代理
// server.ts — Express proxy that relays Claude's stream to the browser as SSE.
import Anthropic from "@anthropic-ai/sdk";
import express, { Request, Response } from "express";
import cors from "cors";

const app = express();
const claude = new Anthropic(); // reads ANTHROPIC_API_KEY from the environment
app.use(cors());
app.use(express.json());

app.post("/api/chat/stream", async (req: Request, res: Response) => {
  const { message, system, model = "claude-sonnet-4-6", maxTokens = 2048 } = req.body;
  if (!message) {
    return res.status(400).json({ error: "message 不能为空" });
  }
  // SSE response headers
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("X-Accel-Buffering", "no"); // disable Nginx buffering
  res.setHeader("Access-Control-Allow-Origin","*");
  res.setHeader("Connection", "keep-alive");
  res.flushHeaders(); // send headers immediately to establish the long-lived connection
  // Helper: write a single SSE frame.
  const sendEvent = (data: object) => {
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  };
  // Track client disconnects so we can stop relaying.
  // NOTE(review): on some Node versions "close" also fires after a normal end;
  // here the flag only matters while the stream is still being relayed.
  let isClosed = false;
  req.on("close", () => { isClosed = true; });
  try {
    sendEvent({ type: "start", model });
    const stream = await claude.messages.stream({
      model,
      max_tokens: maxTokens,
      system: system || "你是一个专业的 AI 助手。",
      messages: [{ role: "user", content: message }],
    });
    for await (const chunk of stream) {
      if (isClosed) break; // client went away — stop forwarding
      if (
        chunk.type === "content_block_delta" &&
        chunk.delta.type === "text_delta"
      ) {
        sendEvent({ type: "text", data: chunk.delta.text });
      }
    }
    if (!isClosed) {
      // Usage stats are only available once the full message has been generated.
      const finalMsg = await stream.finalMessage();
      sendEvent({
        type: "usage",
        data: {
          input: finalMsg.usage.input_tokens,
          output: finalMsg.usage.output_tokens,
          stop_reason: finalMsg.stop_reason,
        },
      });
      sendEvent({ type: "done" });
    }
  } catch (error: any) {
    if (!isClosed) {
      if (error.status === 429) {
        sendEvent({ type: "error", code: 429, message: "速率限制,请稍后重试" });
      } else {
        sendEvent({ type: "error", code: error.status || 500, message: error.message });
      }
    }
  } finally {
    if (!isClosed) {
      res.end();
    }
  }
});

// Health check
app.get("/health", (_, res) => res.json({ status: "ok" }));

app.listen(3001, () => console.log("服务启动:http://localhost:3001"));
四、前端实现:fetch + ReadableStream(推荐)
fetch + ReadableStream 是现代浏览器中处理流的最灵活方式,支持 POST 请求(EventSource 只支持 GET):
// Vanilla JS — usable from any framework.
async function* readSSEStream(response) {
  /**
   * Parse a fetch Response body as a stream of SSE events.
   * Returns an async generator that yields one parsed event object at a time.
   */
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // stream:true keeps multi-byte UTF-8 sequences split across chunks intact
      buffer += decoder.decode(value, { stream: true });
      // SSE protocol: events are separated by a blank line (\n\n)
      const lines = buffer.split("\n\n");
      buffer = lines.pop() ?? ""; // the tail may be incomplete — keep it for the next read
      for (const line of lines) {
        if (!line.trim()) continue;
        // One SSE event may span several lines (event:, data:, id:, …);
        // we only process data: lines.
        for (const part of line.split("\n")) {
          if (part.startsWith("data: ")) {
            try {
              yield JSON.parse(part.slice(6));
            } catch {
              // Not JSON — surface the raw payload rather than dropping it.
              yield { type: "raw", data: part.slice(6) };
            }
          }
        }
      }
    }
  } finally {
    reader.releaseLock();
  }
}
async function streamChat({
  message,
  system = "",
  onText, // (text: string) => void
  onDone, // (usage: object) => void
  onError, // (error: object) => void
  signal, // AbortSignal (for cancellation)
}) {
  /**
   * Complete wrapper around the streaming endpoint.
   *
   * Usage:
   *   const controller = new AbortController();
   *   await streamChat({
   *     message: "你好",
   *     onText: (t) => console.log(t),
   *     onDone: (u) => console.log("完成", u),
   *     signal: controller.signal,
   *   });
   *   // cancel mid-stream: controller.abort();
   *
   * Note: failures are reported via onError; this function resolves normally
   * instead of throwing.
   */
  let response;
  try {
    response = await fetch("/api/chat/stream", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ message, system }),
      signal, // AbortController support
    });
  } catch (err) {
    if (err.name === "AbortError") return; // user cancelled — not an error
    onError?.({ code: 0, message: "网络连接失败" });
    return;
  }
  if (!response.ok) {
    onError?.({ code: response.status, message: `HTTP ${response.status}` });
    return;
  }
  let usageData = null;
  for await (const event of readSSEStream(response)) {
    if (signal?.aborted) break;
    switch (event.type) {
      case "text":
        onText?.(event.data);
        break;
      case "usage":
        usageData = event.data; // arrives just before "done"
        break;
      case "done":
        onDone?.(usageData);
        break;
      case "error":
        onError?.(event);
        return;
    }
  }
}
五、React 完整组件
import { useState, useRef, useEffect, useCallback } from "react";
// 复用上面的 streamChat 和 readSSEStream 函数
// NOTE(review): the original JSX markup was lost during extraction (the return
// body was syntactically invalid). Reconstructed with minimal, unstyled tags —
// confirm against the original article's markup.
function ChatMessage({ role, content, isStreaming }) {
  return (
    <div className={`chat-message chat-message--${role}`}>
      <span>{content}</span>
      {/* blinking cursor while this message is still streaming */}
      {isStreaming && <span className="chat-cursor">▍</span>}
    </div>
  );
}
// NOTE(review): the JSX in the original return was lost during extraction
// (bare expressions without enclosing tags). The hook logic below is kept
// unchanged; the markup is a minimal reconstruction built from the strings
// that survived — confirm against the original article.
export default function StreamingChat() {
  const [messages, setMessages] = useState([]);
  const [input, setInput] = useState("");
  const [isStreaming, setIsStreaming] = useState(false);
  const [usage, setUsage] = useState(null);
  const [error, setError] = useState("");
  const abortRef = useRef(null);  // holds the current AbortController
  const bottomRef = useRef(null); // anchor element for auto-scroll

  // Auto-scroll to the newest message.
  useEffect(() => {
    bottomRef.current?.scrollIntoView({ behavior: "smooth" });
  }, [messages]);

  const send = useCallback(async () => {
    const text = input.trim();
    if (!text || isStreaming) return;
    setInput("");
    setError("");
    setUsage(null);
    // Append the user message.
    setMessages(prev => [...prev, { role: "user", content: text, id: Date.now() }]);
    // Append an empty assistant message that the stream will fill in.
    const assistantId = Date.now() + 1;
    setMessages(prev => [...prev, { role: "assistant", content: "", id: assistantId, streaming: true }]);
    setIsStreaming(true);
    // Fresh AbortController per request so "stop" only cancels this stream.
    const controller = new AbortController();
    abortRef.current = controller;
    await streamChat({
      message: text,
      signal: controller.signal,
      onText(chunk) {
        setMessages(prev =>
          prev.map(msg =>
            msg.id === assistantId
              ? { ...msg, content: msg.content + chunk }
              : msg
          )
        );
      },
      onDone(usageData) {
        setMessages(prev =>
          prev.map(msg =>
            msg.id === assistantId
              ? { ...msg, streaming: false }
              : msg
          )
        );
        setUsage(usageData);
        setIsStreaming(false);
      },
      onError(err) {
        setMessages(prev =>
          prev.map(msg =>
            msg.id === assistantId
              ? { ...msg, content: "(生成失败,请重试)", streaming: false }
              : msg
          )
        );
        setError(`错误 ${err.code}:${err.message}`);
        setIsStreaming(false);
      },
    });
  }, [input, isStreaming]);

  const stop = useCallback(() => {
    abortRef.current?.abort();
    setIsStreaming(false);
    // Mark the in-flight assistant message as stopped.
    setMessages(prev =>
      prev.map(msg =>
        msg.streaming ? { ...msg, streaming: false, content: msg.content + "…(已停止)" } : msg
      )
    );
  }, []);

  return (
    <div className="chat">
      {/* message list */}
      <div className="chat-messages">
        {messages.length === 0 && (
          <div className="chat-empty">发送消息开始对话</div>
        )}
        {messages.map(msg => (
          <ChatMessage
            key={msg.id}
            role={msg.role}
            content={msg.content}
            isStreaming={msg.streaming}
          />
        ))}
        <div ref={bottomRef} />
      </div>
      {/* error banner */}
      {error && (
        <div className="chat-error">⚠️ {error}</div>
      )}
      {/* usage statistics */}
      {usage && !isStreaming && (
        <div className="chat-usage">输入 {usage.input} tokens · 输出 {usage.output} tokens</div>
      )}
      {/* input area */}
      <div className="chat-input">
        <textarea
          value={input}
          onChange={e => setInput(e.target.value)}
          disabled={isStreaming}
        />
        {isStreaming
          ? <button onClick={stop}>停止</button>
          : <button onClick={send} disabled={!input.trim()}>发送</button>}
      </div>
    </div>
  );
}
六、EventSource 方案(GET 请求)
EventSource API 是浏览器原生的 SSE 支持,但只支持 GET 请求。适合只需要传少量参数的场景:
// EventSource 方案(GET 请求,参数通过 query string 传递)
// 后端接口
@app.get("/api/chat/stream-get")
async def chat_stream_get(message: str, model: str = "claude-sonnet-4-6"):
    """GET variant of the streaming endpoint for EventSource clients.

    EventSource can only issue GET requests, so parameters travel in the
    query string; the request is adapted into a ChatRequest and reuses the
    same SSE generator as the POST endpoint.
    """
    return StreamingResponse(
        claude_stream_generator(ChatRequest(message=message, model=model)),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
// Frontend usage
function useEventSourceStream(message) {
  /**
   * React hook wrapping the browser-native EventSource (SSE) API.
   * GET-only: parameters travel in the query string.
   *
   * NOTE(review): the hook's `message` argument is unused — the message is
   * supplied via `start(msg)` instead; confirm whether the parameter should
   * be removed at the call sites.
   */
  const [output, setOutput] = useState("");
  const [done, setDone] = useState(false);
  const esRef = useRef(null); // current EventSource connection
  const start = useCallback((msg) => {
    // Close any existing connection first.
    esRef.current?.close();
    setOutput("");
    setDone(false);
    const params = new URLSearchParams({ message: msg });
    const es = new EventSource(`/api/chat/stream-get?${params}`);
    esRef.current = es;
    es.onmessage = (event) => {
      const data = JSON.parse(event.data);
      if (data.type === "text") {
        setOutput(prev => prev + data.data);
      } else if (data.type === "done") {
        setDone(true);
        // Close explicitly, otherwise EventSource would auto-reconnect.
        es.close();
      } else if (data.type === "error") {
        console.error("流式错误:", data);
        es.close();
      }
    };
    es.onerror = () => {
      console.error("EventSource 连接错误");
      es.close();
    };
  }, []);
  const stop = useCallback(() => {
    esRef.current?.close();
    setDone(true);
  }, []);
  return { output, done, start, stop };
}
| 方案 | 请求方式 | 优点 | 缺点 |
|---|---|---|---|
| fetch + ReadableStream | GET / POST | 支持 POST、可传复杂 body、支持 AbortController | 需要手动解析 SSE 格式 |
| EventSource | 仅 GET | 浏览器原生支持、自动重连 | 只支持 GET、参数只能放 URL、无法自定义请求头 |
推荐使用 fetch + ReadableStream——灵活性更高,支持 POST 传复杂参数(如对话历史),且支持 AbortController 取消。
七、Nginx 配置(关键细节)
# /etc/nginx/sites-enabled/your-app.conf
upstream api_backend {
    server 127.0.0.1:8000;
}
server {
    listen 80;
    server_name your-domain.com;

    # ── Dedicated settings for the streaming endpoints ─────────
    location ~ ^/api/chat/(stream|multi-stream) {
        proxy_pass http://api_backend;
        proxy_http_version 1.1;
        # Clear the upgrade headers (SSE needs no WebSocket upgrade)
        proxy_set_header Upgrade "";
        proxy_set_header Connection "";
        # ⚠️ These three lines are what make streaming work at all
        proxy_buffering off;         # disable Nginx buffering (most important)
        proxy_cache off;             # disable caching
        proxy_read_timeout 300s;     # streaming connections need a longer timeout
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        # gzip is useless for SSE and can break the stream
        gzip off;
    }

    # ── Regular API endpoints ──────────────────────────────────
    location /api/ {
        proxy_pass http://api_backend;
        proxy_http_version 1.1;
        proxy_read_timeout 60s;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }

    # ── Frontend static files ──────────────────────────────────
    location / {
        root /var/www/html;
        try_files $uri $uri/ /index.html;
    }
}
八、生产级问题处理
断线重连
async function streamWithRetry({
  message,
  onText,
  onDone,
  onError,
  maxRetries = 3,
  retryDelay = 1000,
}) {
  /**
   * streamChat with retry on transient failures.
   *
   * BUG FIX: streamChat reports failures via onError and resolves normally —
   * it never throws — so the original try/catch retry loop was dead code and
   * every failure went straight to the caller's onError. We intercept onError
   * instead, and retry only on transient errors (network / 429 / 5xx).
   * We also never retry once partial text has been delivered: re-streaming
   * from scratch would duplicate output in the UI.
   */
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    let failure = null;
    let receivedText = false;
    await streamChat({
      message,
      onText: (t) => { receivedText = true; onText?.(t); },
      onDone,
      onError: (err) => { failure = err; }, // capture; decide below whether to retry
    });
    if (!failure) return; // stream completed successfully
    const transient = failure.code === 0 || failure.code === 429 || failure.code >= 500;
    if (!transient || receivedText || attempt === maxRetries) {
      onError?.(failure); // give up: surface the last error to the caller
      return;
    }
    const delay = retryDelay * Math.pow(2, attempt); // exponential backoff
    await new Promise(r => setTimeout(r, delay));
  }
}
并发流式连接数限制
# Backend: cap the number of concurrent streaming connections per user.
from collections import defaultdict
import asyncio

_active_streams: dict[str, int] = defaultdict(int)
MAX_CONCURRENT_STREAMS = 3

@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest, user_id: str = "default"):
    """Stream a chat reply while limiting each user to MAX_CONCURRENT_STREAMS."""
    if _active_streams[user_id] >= MAX_CONCURRENT_STREAMS:
        raise HTTPException(status_code=429, detail="并发连接数超限")
    _active_streams[user_id] += 1

    async def _counted_stream():
        # BUG FIX: the original decremented the counter in a try/finally
        # wrapped around `return StreamingResponse(...)`. That finally runs as
        # soon as the response OBJECT is constructed — long before the stream
        # actually finishes — so the counter never reflected active streams.
        # Decrementing in the generator's own finally runs when the stream is
        # exhausted or closed (including on client disconnect).
        try:
            async for chunk in claude_stream_generator(request):
                yield chunk
        finally:
            _active_streams[user_id] -= 1

    return StreamingResponse(
        _counted_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
常见问题
Q:流式输出中途中断了怎么恢复?
SSE 协议原生支持断点续传,通过 Last-Event-ID 请求头传递上次收到的事件 ID。但 Claude API 不支持从中途恢复生成——中断了就只能重新开始。实际上,大多数应用的做法是:把已生成的文本展示给用户,提供“继续”按钮,让用户手动触发继续生成。
Q:移动端 Safari 有什么特殊注意事项?
Safari(尤其是 iOS)对长连接有一些限制:后台标签页会暂停连接,从后台切回时可能需要重新建立连接。建议在页面 visibility change 时检测状态,如果流式连接中断了,展示“连接已断开,点击继续”的提示,而不是默默失败。另外 Safari 对 TextDecoder 的 stream: true 参数支持从 iOS 16 开始,旧版本需要 polyfill。
Q:流式输出时,token 用量什么时候能拿到?
流结束之后——整个响应生成完毕,Claude 才会返回 usage 信息。在本文的实现里,用量通过 type: "usage" 事件在 type: "done" 之前发给前端。如果需要在流式过程中实时估算成本,可以用规则近似计算:输入 token 数在请求开始时可以预估(1个英文单词≈1.3 token,1个汉字≈0.7 token),输出 token 数则按生成的字符数实时估算。
总结
流式输出全链路的关键点总结:后端用 StreamingResponse + AsyncGenerator,每条数据必须以 data: ...\n\n 格式发送;Nginx 必须设置 proxy_buffering off,这是最容易被忽视的坑;前端推荐 fetch + ReadableStream 方案而不是 EventSource,因为支持 POST 和 AbortController;生产环境要做并发连接数限制和断线重连。四层的代码(Claude API → Python/Node.js 代理 → SSE 传输 → React 渲染)各自独立,可以按需替换其中任意一层。