📌 内容摘要
- 从零构建一个生产可用的 Claude AI 后端,覆盖接口设计、流式输出、会话管理、速率限制、认证鉴权全套功能。
- 重点解决生产环境的三大痛点:并发处理(异步 + 连接池)、成本控制(token 计量 + 用量上限)、稳定性(重试 + 熔断 + 优雅降级)。
- 所有代码模块化设计,可按需组合,直接用于实际项目。
- 文末提供 Docker 容器化和 Nginx 反向代理配置,一键部署到生产环境。
一、项目架构设计
claude-api-backend/ ├── app/ │ ├── main.py # FastAPI 应用入口 │ ├── config.py # 配置管理(环境变量) │ ├── deps.py # 依赖注入(认证、限流) │ ├── routers/ │ │ ├── chat.py # 对话接口(普通 + 流式) │ │ ├── sessions.py # 会话管理接口 │ │ └── admin.py # 管理接口(用量统计) │ ├── services/ │ │ ├── claude.py # Claude API 封装层 │ │ ├── session.py # 会话存储服务 │ │ └── usage.py # 用量追踪服务 │ └── models.py # Pydantic 数据模型 ├── .env ├── requirements.txt ├── Dockerfile └── docker-compose.yml
二、依赖和配置
pip install fastapi uvicorn[standard] anthropic redis python-dotenv pydantic-settings slowapi
# .env ANTHROPIC_API_KEY=sk-ant-api03-... REDIS_URL=redis://localhost:6379 API_SECRET_KEY=your-secret-key-here DEFAULT_MODEL=claude-sonnet-4-6 MAX_TOKENS_PER_REQUEST=4096 MAX_TOKENS_PER_USER_DAY=100000 # 每用户每天最多消耗的 token 数 RATE_LIMIT_PER_MINUTE=30 # 每分钟最多请求数 DEBUG=false
# app/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
anthropic_api_key: str
redis_url: str = "redis://localhost:6379"
api_secret_key: str = "dev-secret"
default_model: str = "claude-sonnet-4-6"
max_tokens_per_request: int = 4096
max_tokens_per_user_day: int = 100_000
rate_limit_per_minute: int = 30
debug: bool = False
class Config:
env_file = ".env"
@lru_cache()
def get_settings() -> Settings:
return Settings()
settings = get_settings()
三、数据模型
# app/models.py
from pydantic import BaseModel, Field
from typing import Optional, Literal
from datetime import datetime
import uuid
class Message(BaseModel):
role: Literal["user", "assistant"]
content: str
class ChatRequest(BaseModel):
message: str = Field(..., min_length=1, max_length=10000)
session_id: Optional[str] = None # 不传则创建新会话
model: Optional[str] = None # 不传则用默认模型
system: Optional[str] = None # 可选的 system prompt 覆盖
max_tokens: Optional[int] = Field(None, ge=1, le=4096)
temperature:Optional[float] = Field(None, ge=0.0, le=1.0)
stream: bool = False
class ChatResponse(BaseModel):
session_id: str
message_id: str
content: str
model: str
input_tokens: int
output_tokens: int
created_at: datetime
class SessionInfo(BaseModel):
session_id: str
message_count:int
created_at: datetime
last_active: datetime
total_tokens: int
class UsageStats(BaseModel):
user_id: str
date: str
input_tokens: int
output_tokens: int
total_tokens: int
request_count: int
remaining_quota:int
四、Claude 服务层
# app/services/claude.py
import anthropic
import asyncio
from typing import AsyncGenerator
from app.config import settings
# 使用异步客户端
_async_client: anthropic.AsyncAnthropic | None = None
def get_claude_client() -> anthropic.AsyncAnthropic:
global _async_client
if _async_client is None:
_async_client = anthropic.AsyncAnthropic(
api_key=settings.anthropic_api_key,
max_retries=3, # SDK 内置重试
timeout=60.0, # 请求超时
)
return _async_client
DEFAULT_SYSTEM = """你是一个专业、友善的 AI 助手。
- 回答准确、简洁
- 遇到不确定的问题,明确说明不确定性
- 不提供可能有害的内容"""
async def chat_complete(
messages: list[dict],
model: str = None,
system: str = None,
max_tokens: int = None,
temperature: float = None,
) -> tuple[str, int, int]:
"""
普通对话(等待完整响应)
Returns:
(回复文本, 输入token数, 输出token数)
"""
client = get_claude_client()
kwargs = dict(
model = model or settings.default_model,
max_tokens = max_tokens or settings.max_tokens_per_request,
system = system or DEFAULT_SYSTEM,
messages = messages,
)
if temperature is not None:
kwargs["temperature"] = temperature
response = await client.messages.create(**kwargs)
content = response.content[0].text
return content, response.usage.input_tokens, response.usage.output_tokens
async def chat_stream(
messages: list[dict],
model: str = None,
system: str = None,
max_tokens: int = None,
temperature: float = None,
) -> AsyncGenerator[dict, None]:
"""
流式对话,逐块 yield
Yields:
{"type": "text", "data": "文字片段"}
{"type": "usage", "data": {"input": N, "output": M}}
{"type": "done", "data": None}
{"type": "error", "data": "错误信息"}
"""
client = get_claude_client()
kwargs = dict(
model = model or settings.default_model,
max_tokens = max_tokens or settings.max_tokens_per_request,
system = system or DEFAULT_SYSTEM,
messages = messages,
)
if temperature is not None:
kwargs["temperature"] = temperature
try:
async with client.messages.stream(**kwargs) as stream:
async for text in stream.text_stream:
yield {"type": "text", "data": text}
# 流结束后获取用量统计
final = await stream.get_final_message()
yield {
"type": "usage",
"data": {
"input": final.usage.input_tokens,
"output": final.usage.output_tokens,
}
}
yield {"type": "done", "data": None}
except anthropic.RateLimitError:
yield {"type": "error", "data": "API 速率限制,请稍后重试"}
except anthropic.APIStatusError as e:
yield {"type": "error", "data": f"API 错误:{e.status_code}"}
except Exception as e:
yield {"type": "error", "data": str(e)}
五、会话管理服务
# app/services/session.py
import json
import uuid
from datetime import datetime, timedelta
from redis.asyncio import Redis
from app.models import Message, SessionInfo
SESSION_TTL = 60 * 60 * 2 # 会话 2 小时无活动后过期
MAX_HISTORY = 20 # 每个会话最多保留 20 条消息(控制 token 用量)
class SessionService:
def __init__(self, redis: Redis):
self.redis = redis
def _key(self, session_id: str) -> str:
return f"session:{session_id}"
def _meta_key(self, session_id: str) -> str:
return f"session_meta:{session_id}"
async def get_or_create(
self,
session_id: str | None,
user_id: str,
system: str | None = None,
) -> tuple[str, list[dict]]:
"""
获取已有会话或创建新会话
Returns:
(session_id, messages_history)
"""
if session_id:
history = await self._load(session_id)
if history is not None:
await self._update_meta(session_id)
return session_id, history
# session_id 不存在或已过期,创建新会话
# 创建新会话
new_id = str(uuid.uuid4())
await self._save(new_id, [])
await self._create_meta(new_id, user_id, system)
return new_id, []
async def append(
self,
session_id: str,
role: str,
content: str,
tokens: int = 0,
):
"""追加消息到会话历史"""
history = await self._load(session_id) or []
history.append({"role": role, "content": content})
# 滑动窗口:超过上限时删除最早的消息(保留 system 相关的逻辑)
if len(history) > MAX_HISTORY:
history = history[-(MAX_HISTORY):]
await self._save(session_id, history)
# 更新 token 用量统计
if tokens > 0:
meta_key = self._meta_key(session_id)
await self.redis.hincrby(meta_key, "total_tokens", tokens)
await self.redis.hincrby(meta_key, "message_count", 1)
async def get_info(self, session_id: str) -> SessionInfo | None:
"""获取会话元信息"""
meta = await self.redis.hgetall(self._meta_key(session_id))
if not meta:
return None
return SessionInfo(
session_id = session_id,
message_count = int(meta.get(b"message_count", 0)),
created_at = datetime.fromisoformat(meta[b"created_at"].decode()),
last_active = datetime.fromisoformat(meta[b"last_active"].decode()),
total_tokens = int(meta.get(b"total_tokens", 0)),
)
async def delete(self, session_id: str):
await self.redis.delete(self._key(session_id))
await self.redis.delete(self._meta_key(session_id))
async def _load(self, session_id: str) -> list[dict] | None:
data = await self.redis.get(self._key(session_id))
return json.loads(data) if data else None
async def _save(self, session_id: str, history: list[dict]):
await self.redis.setex(
self._key(session_id),
SESSION_TTL,
json.dumps(history, ensure_ascii=False),
)
async def _create_meta(self, session_id: str, user_id: str, system: str | None):
now = datetime.utcnow().isoformat()
await self.redis.hset(self._meta_key(session_id), mapping={
"user_id": user_id,
"system": system or "",
"created_at": now,
"last_active": now,
"message_count": 0,
"total_tokens": 0,
})
await self.redis.expire(self._meta_key(session_id), SESSION_TTL)
async def _update_meta(self, session_id: str):
await self.redis.hset(
self._meta_key(session_id),
"last_active",
datetime.utcnow().isoformat(),
)
await self.redis.expire(self._key(session_id), SESSION_TTL)
await self.redis.expire(self._meta_key(session_id), SESSION_TTL)
六、依赖注入:认证与限流
# app/deps.py
from fastapi import Depends, HTTPException, Header, Request
from redis.asyncio import Redis, from_url
from app.config import settings
from app.services.session import SessionService
import time
# ── Redis 连接池 ──────────────────────────────────
_redis: Redis | None = None
async def get_redis() -> Redis:
global _redis
if _redis is None:
_redis = from_url(settings.redis_url, decode_responses=False)
return _redis
# ── 会话服务依赖 ──────────────────────────────────
async def get_session_service(
redis: Redis = Depends(get_redis),
) -> SessionService:
return SessionService(redis)
# ── API Key 认证 ──────────────────────────────────
async def verify_api_key(
x_api_key: str = Header(..., description="API 密钥"),
) -> str:
"""
简单的 API Key 认证
生产环境建议替换为数据库存储的多用户 Key 管理
"""
# 实际项目:从数据库查找 key 对应的用户
if x_api_key == settings.api_secret_key:
return "default_user"
# 支持多用户:key 格式 "user_id:secret"
if ":" in x_api_key:
user_id, secret = x_api_key.split(":", 1)
# TODO:从数据库验证
if len(user_id) > 0 and len(secret) >= 16:
return user_id
raise HTTPException(status_code=401, detail="无效的 API Key")
# ── 速率限制 ──────────────────────────────────────
async def check_rate_limit(
request: Request,
user_id: str = Depends(verify_api_key),
redis: Redis = Depends(get_redis),
) -> str:
"""
基于滑动窗口的速率限制
每个用户每分钟最多 N 次请求
"""
now = int(time.time())
window = 60 # 60 秒窗口
key = f"rate:{user_id}:{now // window}"
count = await redis.incr(key)
if count == 1:
await redis.expire(key, window * 2)
if count > settings.rate_limit_per_minute:
retry_after = window - (now % window)
raise HTTPException(
status_code=429,
detail=f"请求过于频繁,请 {retry_after} 秒后重试",
headers={"Retry-After": str(retry_after)},
)
return user_id
# ── Token 用量限制 ────────────────────────────────
async def check_token_quota(
user_id: str = Depends(verify_api_key),
redis: Redis = Depends(get_redis),
) -> str:
"""检查用户今日 token 用量是否超限"""
from datetime import date
today = date.today().isoformat()
key = f"usage:{user_id}:{today}"
current = await redis.get(key)
used = int(current) if current else 0
if used >= settings.max_tokens_per_user_day:
raise HTTPException(
status_code=429,
detail=f"今日 token 用量已达上限({settings.max_tokens_per_user_day:,}),明日重置",
)
return user_id
async def record_token_usage(
user_id: str,
tokens: int,
redis: Redis,
):
"""记录 token 用量(在响应后调用)"""
from datetime import date
today = date.today().isoformat()
key = f"usage:{user_id}:{today}"
await redis.incrby(key, tokens)
await redis.expire(key, 86400 * 2) # 保留2天
七、路由层:对话接口
# app/routers/chat.py
import uuid
import json
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from fastapi.responses import StreamingResponse
from app.models import ChatRequest, ChatResponse
from app.services.claude import chat_complete, chat_stream
from app.services.session import SessionService
from app.deps import (
check_rate_limit, check_token_quota,
get_session_service, get_redis, record_token_usage,
)
from redis.asyncio import Redis
router = APIRouter(prefix="/v1", tags=["对话"])
@router.post("/chat", response_model=ChatResponse)
async def chat(
req: ChatRequest,
background: BackgroundTasks,
user_id: str = Depends(check_rate_limit),
_quota_check: str = Depends(check_token_quota),
session_svc: SessionService = Depends(get_session_service),
redis: Redis = Depends(get_redis),
):
"""
普通对话接口(等待完整响应)
适合短对话和对延迟不敏感的场景
"""
# 获取或创建会话
session_id, history = await session_svc.get_or_create(
session_id=req.session_id,
user_id=user_id,
system=req.system,
)
# 构建消息列表
history.append({"role": "user", "content": req.message})
# 调用 Claude
try:
content, input_tokens, output_tokens = await chat_complete(
messages = history,
model = req.model,
system = req.system,
max_tokens = req.max_tokens,
temperature = req.temperature,
)
except Exception as e:
raise HTTPException(status_code=502, detail=f"AI 服务暂时不可用:{str(e)}")
# 保存会话历史
await session_svc.append(session_id, "user", req.message)
await session_svc.append(session_id, "assistant", content,
tokens=input_tokens + output_tokens)
# 异步记录用量(不阻塞响应)
total_tokens = input_tokens + output_tokens
background.add_task(record_token_usage, user_id, total_tokens, redis)
return ChatResponse(
session_id = session_id,
message_id = str(uuid.uuid4()),
content = content,
model = req.model or "claude-sonnet-4-6",
input_tokens = input_tokens,
output_tokens = output_tokens,
created_at = datetime.utcnow(),
)
@router.post("/chat/stream")
async def chat_stream_endpoint(
req: ChatRequest,
user_id: str = Depends(check_rate_limit),
_quota_check: str = Depends(check_token_quota),
session_svc: SessionService = Depends(get_session_service),
redis: Redis = Depends(get_redis),
):
"""
流式对话接口(SSE)
适合长内容生成和需要实时反馈的场景
"""
session_id, history = await session_svc.get_or_create(
session_id=req.session_id,
user_id=user_id,
system=req.system,
)
history.append({"role": "user", "content": req.message})
async def event_generator():
full_content = []
input_t = output_t = 0
# 发送 session_id(客户端用于后续请求)
yield f"data: {json.dumps({'type': 'session', 'session_id': session_id})}\n\n"
async for event in chat_stream(
messages = history,
model = req.model,
system = req.system,
max_tokens = req.max_tokens,
temperature = req.temperature,
):
if event["type"] == "text":
full_content.append(event["data"])
yield f"data: {json.dumps({'type': 'text', 'data': event['data']}, ensure_ascii=False)}\n\n"
elif event["type"] == "usage":
input_t = event["data"]["input"]
output_t = event["data"]["output"]
yield f"data: {json.dumps({'type': 'usage', 'input': input_t, 'output': output_t})}\n\n"
elif event["type"] == "error":
yield f"data: {json.dumps({'type': 'error', 'message': event['data']})}\n\n"
return
elif event["type"] == "done":
# 保存会话历史
content = "".join(full_content)
await session_svc.append(session_id, "user", req.message)
await session_svc.append(session_id, "assistant", content,
tokens=input_t + output_t)
await record_token_usage(user_id, input_t + output_t, redis)
yield "data: {\"type\": \"done\"}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
"Access-Control-Allow-Origin": "*",
},
)
八、应用入口
# app/main.py
import time
import uuid
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import logging
from app.config import settings
from app.routers import chat, sessions
# 日志配置
logging.basicConfig(
level=logging.DEBUG if settings.debug else logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用启动 / 关闭时的初始化和清理"""
logger.info("应用启动,初始化连接...")
# 可以在这里预热 Redis 连接、加载配置等
yield
logger.info("应用关闭,清理资源...")
# 清理全局连接
from app.deps import _redis
if _redis:
await _redis.close()
app = FastAPI(
title = "Claude AI Backend API",
description = "生产级 Claude AI 后端服务",
version = "1.0.0",
lifespan = lifespan,
docs_url = "/docs" if settings.debug else None, # 生产环境关闭 Swagger
redoc_url = None,
)
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins = ["*"], # 生产环境改为具体域名
allow_credentials = True,
allow_methods = ["*"],
allow_headers = ["*"],
)
# 请求 ID 和耗时日志中间件
@app.middleware("http")
async def request_logging(request: Request, call_next):
request_id = str(uuid.uuid4())[:8]
start = time.time()
request.state.request_id = request_id
response = await call_next(request)
duration = (time.time() - start) * 1000
logger.info(
f"[{request_id}] {request.method} {request.url.path} "
f"→ {response.status_code} ({duration:.0f}ms)"
)
response.headers["X-Request-ID"] = request_id
return response
# 全局异常处理
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
logger.error(f"未处理的异常:{exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={"detail": "内部服务器错误,请稍后重试"},
)
# 注册路由
app.include_router(chat.router)
# 健康检查
@app.get("/health", tags=["系统"])
async def health_check():
return {"status": "ok", "version": "1.0.0"}
@app.get("/", include_in_schema=False)
async def root():
return {"message": "Claude AI Backend API", "docs": "/docs"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"app.main:app",
host = "0.0.0.0",
port = 8000,
reload = settings.debug,
workers = 1 if settings.debug else 4,
)
九、Docker 部署
# Dockerfile FROM python:3.11-slim WORKDIR /app # 只复制依赖文件,利用 Docker 层缓存 COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # 复制应用代码 COPY app/ ./app/ # 非 root 用户运行(安全最佳实践) RUN useradd -m appuser && chown -R appuser /app USER appuser EXPOSE 8000 CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
# docker-compose.yml
version: "3.9"
services:
api:
build: .
ports:
- "8000:8000"
env_file: .env
depends_on:
redis:
condition: service_healthy
restart: unless-stopped
deploy:
resources:
limits:
cpus: "2"
memory: "512M"
redis:
image: redis:7-alpine
command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 3s
retries: 5
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
- api
restart: unless-stopped
# nginx.conf(关键片段)
upstream api_backend {
server api:8000;
keepalive 32;
}
server {
listen 80;
# 流式接口:关闭缓冲,确保 SSE 实时传输
location /v1/chat/stream {
proxy_pass http://api_backend;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_buffering off; # 关键:关闭 Nginx 缓冲
proxy_cache off;
proxy_read_timeout 300s; # 流式连接超时设长一些
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
# 普通接口
location /v1/chat {
proxy_pass http://api_backend;
proxy_http_version 1.1;
proxy_read_timeout 60s;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
location /health {
proxy_pass http://api_backend;
}
}
十、API 调用示例
import httpx
import json
BASE_URL = "http://localhost:8000"
HEADERS = {"X-API-Key": "your-secret-key-here"}
# 普通对话
response = httpx.post(
f"{BASE_URL}/v1/chat",
headers=HEADERS,
json={"message": "你好,介绍一下 FastAPI 的主要特性", "stream": False},
)
print(response.json())
# 多轮对话(复用 session_id)
first = httpx.post(
f"{BASE_URL}/v1/chat",
headers=HEADERS,
json={"message": "我叫张三,请记住我的名字"},
)
session_id = first.json()["session_id"]
second = httpx.post(
f"{BASE_URL}/v1/chat",
headers=HEADERS,
json={"message": "我叫什么名字?", "session_id": session_id},
)
print(second.json()["content"]) # 应输出:张三
# 流式对话
with httpx.stream(
"POST",
f"{BASE_URL}/v1/chat/stream",
headers=HEADERS,
json={"message": "写一首关于秋天的诗", "stream": True},
timeout=60,
) as resp:
for line in resp.iter_lines():
if line.startswith("data: "):
event = json.loads(line[6:])
if event["type"] == "text":
print(event["data"], end="", flush=True)
elif event["type"] == "done":
print("\n--- 完成 ---")
常见问题
Q:多个 Uvicorn worker 进程之间如何共享会话状态?
会话数据存在 Redis 里,所有 worker 进程共享同一个 Redis 实例,天然支持多进程共享。唯一需要注意的是 Redis 连接池:每个 worker 进程独立维护自己的连接池(全局 _redis 变量),这是正确的设计,不需要跨进程共享连接对象。
Q:流式接口在 Nginx 后面不生效(内容整块返回)怎么解决?
原因是 Nginx 默认会缓冲上游响应。关键配置是 proxy_buffering off——必须在流式接口的 location 块里设置,不能只在全局设置。同时确认 proxy_http_version 1.1 和 proxy_set_header Connection "" 也已配置,这两个是 HTTP/1.1 长连接所必需的。
Q:如何实现更精细的用户级别 API Key 管理?
将 deps.py 中的 verify_api_key 改为查询数据库(PostgreSQL/MySQL):Key 存储在数据库里,包含用户ID、权限级别、每日配额、过期时间等字段。每次请求查一次数据库,用 Redis 缓存查询结果(TTL 5分钟)减少数据库压力。这样就能实现多租户、细粒度的权限控制。
总结
生产级 AI 后端的核心是三件事:异步处理(FastAPI + 异步 Claude 客户端,支持高并发)、状态管理(Redis 存会话历史和用量计数,多进程安全)、防护机制(速率限制 + Token 配额 + 全局异常处理,保护下游 API 和控制成本)。流式接口是 AI 产品的体验关键,Nginx 的 proxy_buffering off 是绕不过去的配置细节。代码中的每个模块都可以独立替换——比如把 Redis 换成 PostgreSQL 做持久化会话,或者把简单 API Key 认证换成 JWT,不影响其他模块。