📌 内容摘要

  • 从零构建一个生产可用的 Claude AI 后端,覆盖接口设计、流式输出、会话管理、速率限制、认证鉴权全套功能。
  • 重点解决生产环境的三大痛点:并发处理(异步 + 连接池)、成本控制(token 计量 + 用量上限)、稳定性(重试 + 熔断 + 优雅降级)。
  • 所有代码模块化设计,可按需组合,直接用于实际项目。
  • 文末提供 Docker 容器化和 Nginx 反向代理配置,一键部署到生产环境。

一、项目架构设计

claude-api-backend/
├── app/
│   ├── main.py            # FastAPI 应用入口
│   ├── config.py          # 配置管理(环境变量)
│   ├── deps.py            # 依赖注入(认证、限流)
│   ├── routers/
│   │   ├── chat.py        # 对话接口(普通 + 流式)
│   │   ├── sessions.py    # 会话管理接口
│   │   └── admin.py       # 管理接口(用量统计)
│   ├── services/
│   │   ├── claude.py      # Claude API 封装层
│   │   ├── session.py     # 会话存储服务
│   │   └── usage.py       # 用量追踪服务
│   └── models.py          # Pydantic 数据模型
├── .env
├── requirements.txt
├── Dockerfile
└── docker-compose.yml

二、依赖和配置

pip install fastapi uvicorn[standard] anthropic redis python-dotenv pydantic-settings slowapi
# .env
ANTHROPIC_API_KEY=sk-ant-api03-...
REDIS_URL=redis://localhost:6379
API_SECRET_KEY=your-secret-key-here
DEFAULT_MODEL=claude-sonnet-4-6
MAX_TOKENS_PER_REQUEST=4096
MAX_TOKENS_PER_USER_DAY=100000   # 每用户每天最多消耗的 token 数
RATE_LIMIT_PER_MINUTE=30         # 每分钟最多请求数
DEBUG=false
# app/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    """Application configuration, sourced from environment variables / .env."""

    anthropic_api_key: str
    redis_url: str = "redis://localhost:6379"
    api_secret_key: str = "dev-secret"
    default_model: str = "claude-sonnet-4-6"
    max_tokens_per_request: int = 4096
    max_tokens_per_user_day: int = 100_000
    rate_limit_per_minute: int = 30
    debug: bool = False

    class Config:
        # pydantic-settings falls back to this file for unset variables
        env_file = ".env"

@lru_cache
def get_settings() -> Settings:
    """Build the Settings object once and reuse it for the process lifetime."""
    return Settings()

# Module-level singleton used throughout the app.
settings = get_settings()

三、数据模型

# app/models.py
from pydantic import BaseModel, Field
from typing import Optional, Literal
from datetime import datetime
import uuid

class Message(BaseModel):
    # One conversation turn; roles mirror the Anthropic Messages API.
    role:    Literal["user", "assistant"]
    content: str

class ChatRequest(BaseModel):
    # Request body shared by /v1/chat and /v1/chat/stream.
    message:    str              = Field(..., min_length=1, max_length=10000)
    session_id: Optional[str]   = None    # omit to start a new session
    model:      Optional[str]   = None    # omit to use the configured default model
    system:     Optional[str]   = None    # optional system-prompt override
    max_tokens: Optional[int]   = Field(None, ge=1, le=4096)
    temperature:Optional[float] = Field(None, ge=0.0, le=1.0)
    stream:     bool            = False   # NOTE(review): neither router reads this — streaming is selected by route; confirm it is intentional

class ChatResponse(BaseModel):
    # Response body for the non-streaming /v1/chat endpoint.
    session_id:    str       # reuse in subsequent requests for multi-turn chat
    message_id:    str       # server-generated UUID for this reply
    content:       str
    model:         str
    input_tokens:  int
    output_tokens: int
    created_at:    datetime

class SessionInfo(BaseModel):
    # Session metadata as stored in the Redis session_meta:{id} hash.
    session_id:   str
    message_count:int
    created_at:   datetime
    last_active:  datetime
    total_tokens: int

class UsageStats(BaseModel):
    # Per-user, per-day token accounting (admin/reporting payload).
    user_id:        str
    date:           str
    input_tokens:   int
    output_tokens:  int
    total_tokens:   int
    request_count:  int
    remaining_quota:int

四、Claude 服务层

# app/services/claude.py
import anthropic
import asyncio
from typing import AsyncGenerator
from app.config import settings

# Module-level singleton: one async client shared by every request coroutine.
_async_client: anthropic.AsyncAnthropic | None = None

def get_claude_client() -> anthropic.AsyncAnthropic:
    """Return the shared AsyncAnthropic client, creating it on first use."""
    global _async_client
    if _async_client is not None:
        return _async_client
    _async_client = anthropic.AsyncAnthropic(
        api_key=settings.anthropic_api_key,
        max_retries=3,      # rely on the SDK's built-in retry logic
        timeout=60.0,       # per-request timeout in seconds
    )
    return _async_client


# Default system prompt (Chinese): professional and friendly assistant that is
# accurate and concise, flags uncertainty, and refuses harmful content.
# Callers may override it via the `system` parameter.
DEFAULT_SYSTEM = """你是一个专业、友善的 AI 助手。
- 回答准确、简洁
- 遇到不确定的问题,明确说明不确定性
- 不提供可能有害的内容"""


async def chat_complete(
    messages:    list[dict],
    model:       str | None   = None,
    system:      str | None   = None,
    max_tokens:  int | None   = None,
    temperature: float | None = None,
) -> tuple[str, int, int]:
    """Run a non-streaming chat completion and wait for the full response.

    Args:
        messages:    Conversation history in Anthropic Messages format.
        model:       Model name; falls back to ``settings.default_model``.
        system:      System prompt; falls back to ``DEFAULT_SYSTEM``.
        max_tokens:  Output cap; falls back to ``settings.max_tokens_per_request``.
        temperature: Sampling temperature; omitted from the request when ``None``
                     so the API default applies.

    Returns:
        (reply_text, input_tokens, output_tokens)
    """
    client = get_claude_client()

    kwargs = dict(
        model      = model or settings.default_model,
        max_tokens = max_tokens or settings.max_tokens_per_request,
        system     = system or DEFAULT_SYSTEM,
        messages   = messages,
    )
    if temperature is not None:
        kwargs["temperature"] = temperature

    response = await client.messages.create(**kwargs)

    # Fix: response.content is a list of content blocks and is not guaranteed
    # to hold exactly one text block — indexing [0].text raised IndexError /
    # AttributeError on empty or non-text content. Join all text blocks instead.
    content = "".join(
        block.text
        for block in response.content
        if getattr(block, "type", "") == "text"
    )
    return content, response.usage.input_tokens, response.usage.output_tokens


async def chat_stream(
    messages:    list[dict],
    model:       str | None   = None,
    system:      str | None   = None,
    max_tokens:  int | None   = None,
    temperature: float | None = None,
) -> AsyncGenerator[dict, None]:
    """Stream a chat completion, yielding incremental event dicts.

    Yields dicts shaped as:
        {"type": "text",  "data": "<text fragment>"}
        {"type": "usage", "data": {"input": N, "output": M}}
        {"type": "done",  "data": None}
        {"type": "error", "data": "<error message>"}

    Errors are reported as events rather than raised, so SSE consumers can
    forward them to the client inside the already-open stream.
    """
    client = get_claude_client()

    kwargs = dict(
        model      = model or settings.default_model,
        max_tokens = max_tokens or settings.max_tokens_per_request,
        system     = system or DEFAULT_SYSTEM,
        messages   = messages,
    )
    # Omit temperature entirely when None so the API default applies.
    if temperature is not None:
        kwargs["temperature"] = temperature

    try:
        async with client.messages.stream(**kwargs) as stream:
            async for text in stream.text_stream:
                yield {"type": "text", "data": text}

            # After the text stream ends, fetch the final message for usage stats.
            final = await stream.get_final_message()
            yield {
                "type": "usage",
                "data": {
                    "input":  final.usage.input_tokens,
                    "output": final.usage.output_tokens,
                }
            }
            yield {"type": "done", "data": None}

    except anthropic.RateLimitError:
        yield {"type": "error", "data": "API 速率限制,请稍后重试"}
    except anthropic.APIStatusError as e:
        yield {"type": "error", "data": f"API 错误:{e.status_code}"}
    except Exception as e:
        # Last-resort catch: surface the message instead of killing the stream.
        yield {"type": "error", "data": str(e)}

五、会话管理服务

# app/services/session.py
import json
import uuid
from datetime import datetime, timedelta
from redis.asyncio import Redis
from app.models import Message, SessionInfo

SESSION_TTL = 60 * 60 * 2       # 会话 2 小时无活动后过期
MAX_HISTORY = 20                  # 每个会话最多保留 20 条消息(控制 token 用量)


class SessionService:
    """Redis-backed conversation store.

    Two keys per session:
      - ``session:{id}``      — JSON list of ``{"role", "content"}`` messages
      - ``session_meta:{id}`` — hash with user_id, timestamps, and counters
    Both expire after SESSION_TTL seconds of inactivity.
    """

    def __init__(self, redis: Redis):
        self.redis = redis

    def _key(self, session_id: str) -> str:
        """Redis key holding the message history (JSON list)."""
        return f"session:{session_id}"

    def _meta_key(self, session_id: str) -> str:
        """Redis key holding the session metadata hash."""
        return f"session_meta:{session_id}"

    async def get_or_create(
        self,
        session_id:  str | None,
        user_id:     str,
        system:      str | None = None,
    ) -> tuple[str, list[dict]]:
        """Return ``(session_id, history)``; create a fresh session when the
        given id is missing, unknown, or expired."""
        if session_id:
            history = await self._load(session_id)
            if history is not None:
                await self._update_meta(session_id)
                return session_id, history
            # Unknown or expired id: fall through and create a new session.

        new_id = str(uuid.uuid4())
        await self._save(new_id, [])
        await self._create_meta(new_id, user_id, system)
        return new_id, []

    async def append(
        self,
        session_id: str,
        role:       str,
        content:    str,
        tokens:     int = 0,
    ):
        """Append one message to the history and bump usage counters.

        History is capped at MAX_HISTORY messages (sliding window) to bound
        per-request token consumption.
        """
        history = await self._load(session_id) or []
        history.append({"role": role, "content": content})

        if len(history) > MAX_HISTORY:
            history = history[-MAX_HISTORY:]
            # Fix: a trimmed window can start with an assistant message, which
            # the Anthropic Messages API rejects (conversations must begin with
            # a user turn). Drop leading non-user messages after trimming.
            while history and history[0]["role"] != "user":
                history.pop(0)

        await self._save(session_id, history)

        # Update token usage stats on the metadata hash.
        if tokens > 0:
            meta_key = self._meta_key(session_id)
            await self.redis.hincrby(meta_key, "total_tokens", tokens)
            await self.redis.hincrby(meta_key, "message_count", 1)

    async def get_info(self, session_id: str) -> SessionInfo | None:
        """Return session metadata, or None when the session does not exist."""
        meta = await self.redis.hgetall(self._meta_key(session_id))
        if not meta:
            return None
        # The Redis client is created with decode_responses=False, so hash
        # fields come back as bytes — hence the b"" keys and .decode() calls.
        return SessionInfo(
            session_id    = session_id,
            message_count = int(meta.get(b"message_count", 0)),
            created_at    = datetime.fromisoformat(meta[b"created_at"].decode()),
            last_active   = datetime.fromisoformat(meta[b"last_active"].decode()),
            total_tokens  = int(meta.get(b"total_tokens", 0)),
        )

    async def delete(self, session_id: str):
        """Remove both the history and metadata keys for a session."""
        await self.redis.delete(self._key(session_id))
        await self.redis.delete(self._meta_key(session_id))

    async def _load(self, session_id: str) -> list[dict] | None:
        """Load the message history, or None when absent/expired."""
        data = await self.redis.get(self._key(session_id))
        return json.loads(data) if data else None

    async def _save(self, session_id: str, history: list[dict]):
        """Write the history back and refresh its TTL."""
        await self.redis.setex(
            self._key(session_id),
            SESSION_TTL,
            json.dumps(history, ensure_ascii=False),
        )

    async def _create_meta(self, session_id: str, user_id: str, system: str | None):
        """Initialise the metadata hash for a brand-new session."""
        # NOTE(review): naive UTC timestamps (datetime.utcnow is deprecated in
        # Python 3.12). fromisoformat round-trips them, but consider
        # datetime.now(timezone.utc) when the module's imports can change.
        now = datetime.utcnow().isoformat()
        await self.redis.hset(self._meta_key(session_id), mapping={
            "user_id":       user_id,
            "system":        system or "",
            "created_at":    now,
            "last_active":   now,
            "message_count": 0,
            "total_tokens":  0,
        })
        await self.redis.expire(self._meta_key(session_id), SESSION_TTL)

    async def _update_meta(self, session_id: str):
        """Touch last_active and refresh the TTL on both session keys."""
        await self.redis.hset(
            self._meta_key(session_id),
            "last_active",
            datetime.utcnow().isoformat(),
        )
        await self.redis.expire(self._key(session_id),      SESSION_TTL)
        await self.redis.expire(self._meta_key(session_id), SESSION_TTL)

六、依赖注入:认证与限流

# app/deps.py
from fastapi import Depends, HTTPException, Header, Request
from redis.asyncio import Redis, from_url
from app.config import settings
from app.services.session import SessionService
import time

# ── Redis 连接池 ──────────────────────────────────

# Lazily-created, process-wide Redis client (one per worker process).
_redis: Redis | None = None

async def get_redis() -> Redis:
    """Return the shared Redis client, creating it on first use."""
    global _redis
    if _redis is not None:
        return _redis
    _redis = from_url(settings.redis_url, decode_responses=False)
    return _redis


# ── 会话服务依赖 ──────────────────────────────────

async def get_session_service(
    redis: Redis = Depends(get_redis),
) -> SessionService:
    # FastAPI dependency: a SessionService bound to the shared Redis client.
    return SessionService(redis)


# ── API Key 认证 ──────────────────────────────────

async def verify_api_key(
    x_api_key: str = Header(..., description="API 密钥"),
) -> str:
    """Resolve the X-API-Key header to a user id, or raise 401.

    Single-shared-secret scheme; production deployments should replace this
    with per-user keys stored in a database.

    Raises:
        HTTPException: 401 when the key matches no accepted scheme.
    """
    import hmac

    # Fix: constant-time comparison so the secret check does not leak matching
    # key prefixes through response timing.
    if hmac.compare_digest(x_api_key, settings.api_secret_key):
        return "default_user"

    # Multi-user convention: key formatted as "user_id:secret".
    if ":" in x_api_key:
        user_id, secret = x_api_key.split(":", 1)
        # TODO: validate against a database; the length check is a placeholder.
        if len(user_id) > 0 and len(secret) >= 16:
            return user_id

    raise HTTPException(status_code=401, detail="无效的 API Key")


# ── 速率限制 ──────────────────────────────────────

async def check_rate_limit(
    request:  Request,
    user_id:  str   = Depends(verify_api_key),
    redis:    Redis = Depends(get_redis),
) -> str:
    """Per-user request throttling: at most N requests per minute.

    NOTE(review): despite the original description, this is a *fixed-window*
    counter (keyed by ``now // 60``), not a sliding window — bursts of up to
    2N requests can slip through across a window boundary. Fine as coarse
    protection; use a sorted-set sliding log if exactness matters.
    """
    now     = int(time.time())
    window  = 60   # window size in seconds
    key     = f"rate:{user_id}:{now // window}"

    count = await redis.incr(key)
    if count == 1:
        # First hit in this window: set an expiry so stale counters vanish.
        await redis.expire(key, window * 2)

    if count > settings.rate_limit_per_minute:
        retry_after = window - (now % window)
        raise HTTPException(
            status_code=429,
            detail=f"请求过于频繁,请 {retry_after} 秒后重试",
            headers={"Retry-After": str(retry_after)},
        )
    return user_id


# ── Token 用量限制 ────────────────────────────────

async def check_token_quota(
    user_id: str   = Depends(verify_api_key),
    redis:   Redis = Depends(get_redis),
) -> str:
    """Reject the request with 429 once today's token spend reaches the cap."""
    from datetime import date

    quota_key = f"usage:{user_id}:{date.today().isoformat()}"
    raw       = await redis.get(quota_key)
    used      = int(raw) if raw else 0

    if used >= settings.max_tokens_per_user_day:
        raise HTTPException(
            status_code=429,
            detail=f"今日 token 用量已达上限({settings.max_tokens_per_user_day:,}),明日重置",
        )
    return user_id


async def record_token_usage(
    user_id: str,
    tokens:  int,
    redis:   Redis,
):
    """记录 token 用量(在响应后调用)"""
    from datetime import date
    today = date.today().isoformat()
    key   = f"usage:{user_id}:{today}"
    await redis.incrby(key, tokens)
    await redis.expire(key, 86400 * 2)   # 保留2天

七、路由层:对话接口

# app/routers/chat.py
import uuid
import json
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from fastapi.responses import StreamingResponse
from app.models import ChatRequest, ChatResponse
from app.services.claude import chat_complete, chat_stream
from app.services.session import SessionService
from app.deps import (
    check_rate_limit, check_token_quota,
    get_session_service, get_redis, record_token_usage,
)
from redis.asyncio import Redis

router = APIRouter(prefix="/v1", tags=["对话"])


@router.post("/chat", response_model=ChatResponse)
async def chat(
    req:             ChatRequest,
    background:      BackgroundTasks,
    user_id:         str            = Depends(check_rate_limit),
    _quota_check:    str            = Depends(check_token_quota),
    session_svc:     SessionService = Depends(get_session_service),
    redis:           Redis          = Depends(get_redis),
):
    """
    Non-streaming chat endpoint (waits for the full completion).

    Suitable for short exchanges and latency-insensitive callers.
    Side effects: persists both turns to the session store and schedules
    token-usage recording for the daily quota.
    """
    from app.config import settings  # for the configured default model name

    # Fetch existing history or start a fresh session.
    session_id, history = await session_svc.get_or_create(
        session_id=req.session_id,
        user_id=user_id,
        system=req.system,
    )

    # Add the new user turn locally; it is persisted only after a successful reply.
    history.append({"role": "user", "content": req.message})

    try:
        content, input_tokens, output_tokens = await chat_complete(
            messages    = history,
            model       = req.model,
            system      = req.system,
            max_tokens  = req.max_tokens,
            temperature = req.temperature,
        )
    except Exception as e:
        # Broad catch is intentional: any upstream failure maps to 502.
        raise HTTPException(status_code=502, detail=f"AI 服务暂时不可用:{str(e)}")

    # Persist the exchange (token count attached to the assistant turn).
    await session_svc.append(session_id, "user",      req.message)
    await session_svc.append(session_id, "assistant", content,
                             tokens=input_tokens + output_tokens)

    # Record usage after the response is sent (does not block the reply).
    total_tokens = input_tokens + output_tokens
    background.add_task(record_token_usage, user_id, total_tokens, redis)

    return ChatResponse(
        session_id    = session_id,
        message_id    = str(uuid.uuid4()),
        content       = content,
        # Fix: report the configured default instead of a hard-coded literal
        # that could silently drift from settings.default_model.
        model         = req.model or settings.default_model,
        input_tokens  = input_tokens,
        output_tokens = output_tokens,
        created_at    = datetime.utcnow(),
    )


@router.post("/chat/stream")
async def chat_stream_endpoint(
    req:          ChatRequest,
    user_id:      str            = Depends(check_rate_limit),
    _quota_check: str            = Depends(check_token_quota),
    session_svc:  SessionService = Depends(get_session_service),
    redis:        Redis          = Depends(get_redis),
):
    """
    Streaming chat endpoint (Server-Sent Events).

    Suited to long generations and UIs that render tokens as they arrive.
    Event order on success: session, text (repeated), usage, done.
    NOTE(review): on an upstream error the user turn is never persisted and
    no usage is recorded — confirm this is the intended failure semantics.
    """
    session_id, history = await session_svc.get_or_create(
        session_id=req.session_id,
        user_id=user_id,
        system=req.system,
    )

    # Append the new user turn locally; it is saved only on the "done" event.
    history.append({"role": "user", "content": req.message})

    async def event_generator():
        full_content = []
        input_t = output_t = 0

        # First event: the session_id, so the client can reuse it next turn.
        yield f"data: {json.dumps({'type': 'session', 'session_id': session_id})}\n\n"

        async for event in chat_stream(
            messages    = history,
            model       = req.model,
            system      = req.system,
            max_tokens  = req.max_tokens,
            temperature = req.temperature,
        ):
            if event["type"] == "text":
                full_content.append(event["data"])
                yield f"data: {json.dumps({'type': 'text', 'data': event['data']}, ensure_ascii=False)}\n\n"

            elif event["type"] == "usage":
                input_t  = event["data"]["input"]
                output_t = event["data"]["output"]
                yield f"data: {json.dumps({'type': 'usage', 'input': input_t, 'output': output_t})}\n\n"

            elif event["type"] == "error":
                # Forward the error inside the stream, then end it.
                yield f"data: {json.dumps({'type': 'error', 'message': event['data']})}\n\n"
                return

            elif event["type"] == "done":
                # Persist both turns and record usage (awaited inline, unlike
                # /chat which defers usage recording to a background task).
                content = "".join(full_content)
                await session_svc.append(session_id, "user",      req.message)
                await session_svc.append(session_id, "assistant", content,
                                         tokens=input_t + output_t)
                await record_token_usage(user_id, input_t + output_t, redis)
                yield "data: {\"type\": \"done\"}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control":      "no-cache",
            "X-Accel-Buffering":  "no",   # tell Nginx not to buffer this response
            "Access-Control-Allow-Origin": "*",
        },
    )

八、应用入口

# app/main.py
import time
import uuid
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import logging

from app.config import settings
from app.routers import chat, sessions

# Logging: DEBUG level when settings.debug is on, INFO otherwise.
logging.basicConfig(
    level=logging.DEBUG if settings.debug else logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup/shutdown hook: code before `yield` runs at startup, after it at shutdown."""
    logger.info("应用启动,初始化连接...")
    # Startup: a good place to warm the Redis connection, preload config, etc.
    yield
    logger.info("应用关闭,清理资源...")
    # Shutdown: close the lazily-created module-global Redis client.
    # NOTE(review): reaches into the private `_redis`; a public close helper
    # in app.deps would be cleaner.
    from app.deps import _redis
    if _redis:
        await _redis.close()


app = FastAPI(
    title       = "Claude AI Backend API",
    description = "生产级 Claude AI 后端服务",
    version     = "1.0.0",
    lifespan    = lifespan,
    docs_url    = "/docs" if settings.debug else None,   # hide Swagger UI in production
    redoc_url   = None,
)

# CORS
# NOTE(review): wildcard origins combined with allow_credentials=True is
# rejected by browsers' CORS rules — confirm the intended origin list.
app.add_middleware(
    CORSMiddleware,
    allow_origins     = ["*"],   # tighten to concrete domains in production
    allow_credentials = True,
    allow_methods     = ["*"],
    allow_headers     = ["*"],
)

# Middleware: tag each request with a short id and log method/path/status/latency.
@app.middleware("http")
async def request_logging(request: Request, call_next):
    rid     = str(uuid.uuid4())[:8]
    started = time.time()
    request.state.request_id = rid

    response = await call_next(request)

    elapsed_ms = (time.time() - started) * 1000
    logger.info(
        f"[{rid}] {request.method} {request.url.path} "
        f"→ {response.status_code} ({elapsed_ms:.0f}ms)"
    )
    response.headers["X-Request-ID"] = rid
    return response

# Catch-all handler: log the full traceback, return an opaque 500 to the client.
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    logger.error(f"未处理的异常:{exc}", exc_info=True)
    return JSONResponse(
        status_code=500,
        content={"detail": "内部服务器错误,请稍后重试"},
    )

# Route registration.
# NOTE(review): app.routers.sessions is imported at the top of this module but
# never registered — confirm whether include_router(sessions.router) is missing.
app.include_router(chat.router)

# Health probe for load balancers / container orchestration.
@app.get("/health", tags=["系统"])
async def health_check():
    return {"status": "ok", "version": "1.0.0"}

@app.get("/", include_in_schema=False)
async def root():
    # Friendly landing response; hidden from the OpenAPI schema.
    return {"message": "Claude AI Backend API", "docs": "/docs"}


if __name__ == "__main__":
    import uvicorn
    # Dev entry point; in production the container runs uvicorn directly (see Dockerfile).
    # NOTE(review): uvicorn ignores `workers` when reload=True — the two options
    # are mutually exclusive in practice; confirm the intended behavior.
    uvicorn.run(
        "app.main:app",
        host    = "0.0.0.0",
        port    = 8000,
        reload  = settings.debug,
        workers = 1 if settings.debug else 4,
    )

九、Docker 部署

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Copy only the dependency manifest first so Docker layer caching skips
# reinstalling packages when just the application code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Application code
COPY app/ ./app/

# Run as a non-root user (security best practice)
RUN useradd -m appuser && chown -R appuser /app
USER appuser

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
# docker-compose.yml
# NOTE(review): the top-level `version` key is obsolete in Compose v2+ and is
# ignored there — harmless to keep.
version: "3.9"

services:
  api:
    build: .
    ports:
      - "8000:8000"
    env_file: .env
    depends_on:
      redis:
        condition: service_healthy   # wait until Redis answers PING
    restart: unless-stopped
    # NOTE(review): `deploy.resources` is honored by Swarm and recent
    # `docker compose` releases, but classic docker-compose v1 ignores it —
    # confirm against the deployment toolchain.
    deploy:
      resources:
        limits:
          cpus:   "2"
          memory: "512M"

  redis:
    image: redis:7-alpine
    # Bounded memory with LRU eviction so session/usage keys degrade gracefully.
    command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru
    ports:
      - "6379:6379"
    healthcheck:
      test:     ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout:  3s
      retries:  5
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - api
    restart: unless-stopped
# nginx.conf (key excerpts)
upstream api_backend {
    server api:8000;
    keepalive 32;
}

server {
    listen 80;

    # Streaming endpoint: disable buffering so SSE chunks reach the client
    # immediately instead of being collected by Nginx.
    location /v1/chat/stream {
        proxy_pass         http://api_backend;
        proxy_http_version 1.1;
        proxy_set_header   Connection "";
        proxy_buffering    off;           # critical: no response buffering
        proxy_cache        off;
        proxy_read_timeout 300s;          # long timeout for long generations
        proxy_set_header   Host $host;
        proxy_set_header   X-Real-IP $remote_addr;
    }

    # Regular (non-streaming) endpoints
    location /v1/chat {
        proxy_pass         http://api_backend;
        proxy_http_version 1.1;
        proxy_read_timeout 60s;
        proxy_set_header   Host $host;
        proxy_set_header   X-Real-IP $remote_addr;
    }

    location /health {
        proxy_pass http://api_backend;
    }
}

十、API 调用示例

import httpx
import json

BASE_URL = "http://localhost:8000"
HEADERS  = {"X-API-Key": "your-secret-key-here"}

# Plain (non-streaming) chat
response = httpx.post(
    f"{BASE_URL}/v1/chat",
    headers=HEADERS,
    json={"message": "你好,介绍一下 FastAPI 的主要特性", "stream": False},
)
print(response.json())


# Multi-turn conversation (reuse the returned session_id)
first = httpx.post(
    f"{BASE_URL}/v1/chat",
    headers=HEADERS,
    json={"message": "我叫张三,请记住我的名字"},
)
session_id = first.json()["session_id"]

second = httpx.post(
    f"{BASE_URL}/v1/chat",
    headers=HEADERS,
    json={"message": "我叫什么名字?", "session_id": session_id},
)
print(second.json()["content"])  # expected to mention: 张三


# Streaming chat over SSE
with httpx.stream(
    "POST",
    f"{BASE_URL}/v1/chat/stream",
    headers=HEADERS,
    json={"message": "写一首关于秋天的诗", "stream": True},
    timeout=60,
) as resp:
    for line in resp.iter_lines():
        # SSE frames look like "data: {...}"; strip the 6-character prefix.
        if line.startswith("data: "):
            event = json.loads(line[6:])
            if event["type"] == "text":
                print(event["data"], end="", flush=True)
            elif event["type"] == "done":
                print("\n--- 完成 ---")

常见问题

Q:多个 Uvicorn worker 进程之间如何共享会话状态?
会话数据存在 Redis 里,所有 worker 进程共享同一个 Redis 实例,天然支持多进程共享。唯一需要注意的是 Redis 连接池:每个 worker 进程独立维护自己的连接池(全局 _redis 变量),这是正确的设计,不需要跨进程共享连接对象。

Q:流式接口在 Nginx 后面不生效(内容整块返回)怎么解决?
原因是 Nginx 默认会缓冲上游响应。关键配置是 proxy_buffering off——必须在流式接口的 location 块里设置,不能只在全局设置。同时确认 proxy_http_version 1.1 和 proxy_set_header Connection "" 也已配置,这两个是 HTTP/1.1 长连接所必需的。

Q:如何实现更精细的用户级别 API Key 管理?
deps.py 中的 verify_api_key 改为查询数据库(PostgreSQL/MySQL):Key 存储在数据库里,包含用户ID、权限级别、每日配额、过期时间等字段。每次请求查一次数据库,用 Redis 缓存查询结果(TTL 5分钟)减少数据库压力。这样就能实现多租户、细粒度的权限控制。

总结

生产级 AI 后端的核心是三件事:异步处理(FastAPI + 异步 Claude 客户端,支持高并发)、状态管理(Redis 存会话历史和用量计数,多进程安全)、防护机制(速率限制 + Token 配额 + 全局异常处理,保护下游 API 和控制成本)。流式接口是 AI 产品的体验关键,Nginx 的 proxy_buffering off 是绕不过去的配置细节。代码中的每个模块都可以独立替换——比如把 Redis 换成 PostgreSQL 做持久化会话,或者把简单 API Key 认证换成 JWT,不影响其他模块。