📌 内容摘要
- 流式输出让 Claude 的回答逐字显示,首字延迟从数秒降到毫秒级,大幅提升用户体验。
- 本文覆盖完整技术栈:SSE 原理 → Python 服务端 → Node.js 服务端 → React 前端 → 原生 JS 前端。
- 所有代码示例均可直接运行,包含错误处理、重连机制和连接中断检测。
- 附常见坑点:Nginx 反代配置、移动端 SSE 兼容性、流中断后如何恢复。
一、为什么需要流式输出?
默认情况下,Claude API 要等模型生成完全部内容后,才把整个响应一次性返回。对于短回复这没问题,但如果 Claude 需要写一篇文章或分析一段代码,用户可能要等待 5-30 秒才看到任何内容——这种体验很差。
流式输出(Streaming)改变了这一点:模型每生成一小段文字就立即推送给客户端,用户几乎立刻就能看到第一个字,之后内容持续滚动出现。这正是 claude.ai 网页版的工作方式。
| 对比维度 | 普通请求 | 流式输出 |
|---|---|---|
| 首字时间(TTFT) | 等全部生成完(5-30秒) | 毫秒级(约100-300ms) |
| 用户体验 | 长时间空白等待 | 内容逐字出现,有打字感 |
| 费用 | 完全相同 | 完全相同 |
| 适用场景 | 批处理、后台任务 | 聊天界面、实时展示 |
二、技术原理:SSE(Server-Sent Events)
Claude 的流式输出基于 SSE 协议——服务端通过一个长连接持续向客户端推送文本事件,每个事件格式如下:
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"你好"}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":",我是"}}
event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":42}}
event: message_stop
data: {"type":"message_stop"}
Anthropic SDK 已经封装好了这些细节,你通常不需要手动解析——但了解底层格式,有助于调试问题和构建自定义处理逻辑。
三、Python 流式输出
基础用法
import anthropic
client = anthropic.Anthropic()
# 方式一:stream 上下文管理器(推荐)
with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=2048,
messages=[{"role": "user", "content": "写一篇500字的科技简报"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True) # flush=True 确保立即输出
print() # 最后换行
# 流结束后获取完整统计
final = stream.get_final_message()
print(f"输入 tokens: {final.usage.input_tokens}")
print(f"输出 tokens: {final.usage.output_tokens}")
处理所有事件类型
with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": "写一首短诗"}]
) as stream:
for event in stream:
match event.type:
case "message_start":
print(f"[开始] 模型: {event.message.model}")
case "content_block_start":
print("[内容块开始]")
case "content_block_delta":
if event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
case "message_delta":
print(f"\n[结束] 原因: {event.delta.stop_reason}")
print(f"输出 tokens: {event.usage.output_tokens}")
case "message_stop":
print("[流已关闭]")
Python 异步流式输出
import asyncio
import anthropic
client = anthropic.AsyncAnthropic()
async def stream_response(prompt: str):
async with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=2048,
messages=[{"role": "user", "content": prompt}]
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)
print()
final = await stream.get_final_message()
return final
asyncio.run(stream_response("解释一下 Python 的 GIL"))
FastAPI 流式接口
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic
import json
app = FastAPI()
client = anthropic.AsyncAnthropic()
@app.post("/chat/stream")
async def chat_stream(body: dict):
messages = body.get("messages", [])
system = body.get("system", "你是一个有帮助的 AI 助手。")
async def generate():
async with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=2048,
system=system,
messages=messages,
) as stream:
async for text in stream.text_stream:
# SSE 格式:data: {...}\n\n
yield f"data: {json.dumps({'text': text}, ensure_ascii=False)}\n\n"
final = await stream.get_final_message()
yield f"data: {json.dumps({'done': True, 'usage': {'input': final.usage.input_tokens, 'output': final.usage.output_tokens}})}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲
}
)
四、Node.js 流式输出
基础用法
import Anthropic from "@anthropic-ai/sdk";
const client = new Anthropic();
const stream = await client.messages.stream({
model: "claude-sonnet-4-6",
max_tokens: 2048,
messages: [{ role: "user", content: "写一篇500字的科技简报" }],
});
// 逐块输出
for await (const chunk of stream) {
if (
chunk.type === "content_block_delta" &&
chunk.delta.type === "text_delta"
) {
process.stdout.write(chunk.delta.text);
}
}
console.log();
const final = await stream.finalMessage();
console.log(`输入 tokens: ${final.usage.input_tokens}`);
console.log(`输出 tokens: ${final.usage.output_tokens}`);
事件监听器方式
const stream = client.messages.stream({
model: "claude-sonnet-4-6",
max_tokens: 1024,
messages: [{ role: "user", content: "写一首短诗" }],
});
stream
.on("text", (text) => {
process.stdout.write(text);
})
.on("message", (msg) => {
console.log(`\n[完成] stop_reason: ${msg.stop_reason}`);
console.log(`tokens: ${msg.usage.input_tokens} in / ${msg.usage.output_tokens} out`);
})
.on("error", (err) => {
console.error("流错误:", err.message);
});
await stream.finalMessage();
Express SSE 流式接口(完整版)
import express from "express";
import Anthropic from "@anthropic-ai/sdk";
const app = express();
const client = new Anthropic();
app.use(express.json());
app.post("/api/stream", async (req, res) => {
const { messages, system = "你是一个有帮助的 AI 助手。" } = req.body;
// SSE 响应头
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", // 禁用 Nginx 缓冲(关键!)
"Access-Control-Allow-Origin": "*", // 跨域支持
});
// 检测客户端断开连接
let isClientConnected = true;
req.on("close", () => {
isClientConnected = false;
});
// 辅助函数:发送 SSE 事件
const sendEvent = (data: object) => {
if (isClientConnected) {
res.write(`data: ${JSON.stringify(data)}\n\n`);
}
};
try {
const stream = client.messages.stream({
model: "claude-sonnet-4-6",
max_tokens: 2048,
system,
messages,
});
stream.on("text", (text) => {
sendEvent({ type: "text", text });
});
const final = await stream.finalMessage();
sendEvent({
type: "done",
usage: {
input_tokens: final.usage.input_tokens,
output_tokens: final.usage.output_tokens,
},
});
} catch (error) {
if (error instanceof Anthropic.APIError) {
sendEvent({ type: "error", message: error.message, status: error.status });
} else {
sendEvent({ type: "error", message: "Internal server error" });
}
} finally {
res.end();
}
});
app.listen(3000);
五、前端接入
原生 JavaScript(fetch + ReadableStream)
async function streamChat(userMessage, onText, onDone) {
const response = await fetch("/api/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
messages: [{ role: "user", content: userMessage }],
}),
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}`);
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// 按行解析 SSE 数据
const lines = buffer.split("\n");
buffer = lines.pop() ?? ""; // 最后一行可能不完整,保留到下次
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
const raw = line.slice(6).trim();
if (!raw) continue;
try {
const data = JSON.parse(raw);
if (data.type === "text") onText(data.text);
if (data.type === "done") onDone(data.usage);
if (data.type === "error") throw new Error(data.message);
} catch {
// 忽略解析错误
}
}
}
}
// 使用示例
const output = document.getElementById("output");
output.textContent = "";
await streamChat(
"写一首关于程序员的短诗",
(text) => { output.textContent += text; },
(usage) => { console.log("完成,用量:", usage); }
);
React Hook 封装
import { useState, useCallback, useRef } from "react";
interface UseChatStreamOptions {
apiUrl?: string;
system?: string;
}
export function useChatStream({
apiUrl = "/api/stream",
system,
}: UseChatStreamOptions = {}) {
const [content, setContent] = useState("");
const [isLoading, setIsLoading] = useState(false);
const [error, setError] = useState(null);
const abortRef = useRef(null);
const send = useCallback(async (userMessage: string) => {
// 取消上一个请求
abortRef.current?.abort();
abortRef.current = new AbortController();
setContent("");
setError(null);
setIsLoading(true);
try {
const response = await fetch(apiUrl, {
method: "POST",
headers: { "Content-Type": "application/json" },
signal: abortRef.current.signal,
body: JSON.stringify({
messages: [{ role: "user", content: userMessage }],
system,
}),
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
try {
const data = JSON.parse(line.slice(6));
if (data.type === "text") {
setContent((prev) => prev + data.text);
}
if (data.type === "error") {
setError(data.message);
}
} catch { /* ignore */ }
}
}
} catch (err: unknown) {
if (err instanceof Error && err.name !== "AbortError") {
setError(err.message);
}
} finally {
setIsLoading(false);
}
}, [apiUrl, system]);
const stop = useCallback(() => {
abortRef.current?.abort();
setIsLoading(false);
}, []);
return { content, isLoading, error, send, stop };
}
// 组件使用示例
function ChatBox() {
const { content, isLoading, error, send, stop } = useChatStream();
const [input, setInput] = useState("");
return (
);
}
六、常见坑点与解决方案
坑一:Nginx 反代缓冲导致流式失效
在 Nginx 反向代理后面部署时,Nginx 默认会缓冲响应内容,导致流式输出变成”等全部内容后一次返回”。解决方案是在响应头中加入 X-Accel-Buffering: no,或在 Nginx 配置中加入:
location /api/stream {
proxy_pass http://backend;
proxy_buffering off; # 关闭缓冲
proxy_cache off;
proxy_set_header Connection ''; # 保持长连接
chunked_transfer_encoding on;
}
坑二:前端 SSE 解析不完整行
网络分片可能导致单个 SSE 事件被切断,在两次 read() 之间分割。必须用 buffer 拼接,确认遇到 \n\n 后再解析,本文代码已处理这一问题。
坑三:流中断后无提示
网络异常可能导致流静默中断(既没有 done 事件也没有报错)。建议在前端加超时检测:如果超过 N 秒没收到新内容,提示用户并允许重试。
坑四:移动端 Safari 兼容性
iOS Safari 对 SSE 的支持较好,但对 ReadableStream 的逐字节读取有细微差异。建议用 EventSource API 替代手动解析,或在生产环境引入 @microsoft/fetch-event-source 库处理边界情况。
常见问题
Q:流式输出和普通输出的费用一样吗?
完全一样。流式输出只改变数据传输方式,不影响 token 计费逻辑——输入 token 数 × 输入单价 + 输出 token 数 × 输出单价。
Q:用户中途停止生成,已经产生的 token 还会收费吗?
会。已经生成的 token 会计费,即使客户端断开连接,服务端仍会继续生成直到完成或触发 max_tokens。建议在服务端检测客户端断开后主动中止流(本文 Express 示例已处理)。
Q:流式输出可以和 Prompt Caching 同时使用吗?
可以,两者完全兼容。在流式请求中加入 cache_control 参数即可,缓存命中时输入 token 费用降低 90%,流式传输方式不受影响。
总结
流式输出是构建 AI 聊天产品的标配技术,核心是三层:Anthropic SDK 的 stream 方法(服务端生成)→ SSE/chunked HTTP(网络传输)→ ReadableStream(前端解析)。Python 用 FastAPI、Node.js 用 Express,前端用原生 fetch 或封装成 React Hook——本文所有代码都可以直接拿去用,按需组合即可。