📌 内容摘要

  • 向量数据库解决的核心问题:当文档数量超过 Claude 上下文窗口时,如何精准检索相关内容再送给模型。
  • 本文覆盖两大主流向量数据库:Pinecone(托管云服务,上手最快)和 Weaviate(开源自部署,功能最全)。
  • 完整实现 RAG(检索增强生成)架构,含文档切片、Embedding 生成、向量检索、上下文注入五个步骤。
  • 附生产级别的优化技巧:混合检索、重排序、缓存策略,以及 Pinecone vs Weaviate 的选型对比。

一、为什么需要向量数据库?

Claude Sonnet 4.6 和 Opus 4.6 支持 100 万 token 上下文,很多人会问:既然上下文这么大,还需要向量数据库吗?

答案是:取决于你的文档库规模。100 万 token 约等于 75 万汉字——听起来很多,但一个中型企业的知识库可能有几万篇文档、数十亿字。这时候向量数据库就是必须的:它负责从海量文档中找到最相关的几篇,再把这几篇送给 Claude 处理。

文档规模 推荐方案 理由
1-20 份文档 直接放入上下文 100 万 token 够用,不需要额外架构
20-200 份文档 视情况,Prompt Caching 或轻量 RAG Prompt Caching 可节省 90% 费用
200份以上 / 持续增长 向量数据库 + RAG 超出上下文限制,需要精准检索

二、RAG 架构全景

【离线阶段:文档入库】
原始文档(PDF/Word/网页)
    → 文本提取
    → 文档切片(Chunking)
    → Embedding 生成(文本 → 向量)
    → 写入向量数据库

【在线阶段:查询回答】
用户问题
    → 问题 Embedding
    → 向量相似度检索(Top-K 相关片段)
    → [可选] 重排序(Reranking)
    → 构建上下文 Prompt
    → Claude 生成回答
    → 返回用户

三、环境准备

pip install anthropic pinecone-client weaviate-client openai tiktoken
# .env
ANTHROPIC_API_KEY=sk-ant-...
PINECONE_API_KEY=pcsk_...
OPENAI_API_KEY=sk-...        # 用于生成 Embeddings(text-embedding-3-small)
WEAVIATE_URL=http://localhost:8080
💡 关于 Embedding 模型的选择
Claude 目前没有原生的 Embedding API,本文使用 OpenAI 的 text-embedding-3-small(1536 维,性价比最高)生成向量。也可以选择 Cohere Embed v3、BGE-M3 等其他模型,原理和代码结构完全相同,只需替换 Embedding 生成部分。

四、通用基础层:文档处理和 Embedding

import os
import re
import time
import hashlib
from typing import Optional
from dataclasses import dataclass, field
import tiktoken
from openai import OpenAI

openai_client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

# ── 数据结构 ──────────────────────────────────────

@dataclass
class Document:
    """原始文档"""
    id:       str
    content:  str
    metadata: dict = field(default_factory=dict)  # title, source, date 等

@dataclass
class Chunk:
    """切片后的文档片段"""
    id:          str
    doc_id:      str
    content:     str
    embedding:   Optional[list[float]] = None
    metadata:    dict = field(default_factory=dict)
    chunk_index: int = 0


# ── 文档切片 ──────────────────────────────────────

def chunk_document(
    doc:        Document,
    chunk_size: int = 512,      # 每片约 512 token
    overlap:    int = 64,       # 前后重叠 64 token,保证上下文连续性
    encoding:   str = "cl100k_base",
) -> list[Chunk]:
    """
    把文档切成固定大小的片段

    overlap 的作用:防止一个知识点恰好被切断在两个片段之间
    """
    enc = tiktoken.get_encoding(encoding)
    tokens = enc.encode(doc.content)
    chunks = []
    start = 0
    idx = 0

    while start < len(tokens):
        end = min(start + chunk_size, len(tokens))
        chunk_tokens = tokens[start:end]
        chunk_text = enc.decode(chunk_tokens)

        # 生成稳定的 chunk ID(基于内容哈希)
        chunk_id = hashlib.md5(
            f"{doc.id}_{idx}_{chunk_text[:50]}".encode()
        ).hexdigest()[:16]

        chunks.append(Chunk(
            id=chunk_id,
            doc_id=doc.id,
            content=chunk_text,
            chunk_index=idx,
            metadata={
                **doc.metadata,
                "doc_id":      doc.id,
                "chunk_index": idx,
                "char_count":  len(chunk_text),
            }
        ))

        start += chunk_size - overlap
        idx += 1

    return chunks


# ── Embedding 生成 ────────────────────────────────

def generate_embeddings(
    texts:      list[str],
    model:      str = "text-embedding-3-small",
    batch_size: int = 100,
) -> list[list[float]]:
    """
    批量生成 Embedding,自动处理速率限制

    text-embedding-3-small: 1536 维,$0.02/M tokens
    text-embedding-3-large: 3072 维,$0.13/M tokens(精度更高但成本高)
    """
    all_embeddings = []

    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        try:
            response = openai_client.embeddings.create(
                model=model,
                input=batch,
            )
            batch_embeddings = [item.embedding for item in response.data]
            all_embeddings.extend(batch_embeddings)
        except Exception as e:
            print(f"Embedding 生成失败(批次 {i//batch_size}):{e}")
            time.sleep(5)
            # 重试一次
            response = openai_client.embeddings.create(model=model, input=batch)
            all_embeddings.extend([item.embedding for item in response.data])

        if i + batch_size < len(texts):
            time.sleep(0.1)   # 避免速率限制

    return all_embeddings


def embed_chunks(chunks: list[Chunk]) -> list[Chunk]:
    """为文档片段批量生成 Embedding"""
    texts = [chunk.content for chunk in chunks]
    embeddings = generate_embeddings(texts)
    for chunk, embedding in zip(chunks, embeddings):
        chunk.embedding = embedding
    return chunks

五、Pinecone 集成

from pinecone import Pinecone, ServerlessSpec

class PineconeStore:
    """Pinecone 向量存储,封装索引操作"""

    def __init__(
        self,
        index_name:  str = "claude-rag",
        dimension:   int = 1536,       # text-embedding-3-small 的维度
        metric:      str = "cosine",
        cloud:       str = "aws",
        region:      str = "us-east-1",
    ):
        self.pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
        self.index_name = index_name

        # 如果索引不存在则创建
        existing = [idx.name for idx in self.pc.list_indexes()]
        if index_name not in existing:
            self.pc.create_index(
                name=index_name,
                dimension=dimension,
                metric=metric,
                spec=ServerlessSpec(cloud=cloud, region=region),
            )
            print(f"已创建索引:{index_name}")
            time.sleep(2)   # 等待索引就绪

        self.index = self.pc.Index(index_name)
        print(f"连接到索引:{index_name},统计:{self.index.describe_index_stats()}")

    def upsert_chunks(self, chunks: list[Chunk], batch_size: int = 100):
        """批量写入文档片段"""
        vectors = []
        for chunk in chunks:
            if chunk.embedding is None:
                continue
            vectors.append({
                "id":     chunk.id,
                "values": chunk.embedding,
                "metadata": {
                    **chunk.metadata,
                    "content": chunk.content,   # 把原文也存入 metadata
                }
            })

        # 批量 upsert
        for i in range(0, len(vectors), batch_size):
            batch = vectors[i:i + batch_size]
            self.index.upsert(vectors=batch)
            print(f"已写入 {min(i + batch_size, len(vectors))}/{len(vectors)} 个片段")

    def search(
        self,
        query_embedding: list[float],
        top_k:           int = 5,
        filter:          Optional[dict] = None,
        score_threshold: float = 0.6,
    ) -> list[dict]:
        """
        向量相似度搜索

        Args:
            filter: 元数据过滤,如 {"source": "技术文档"} 只搜索技术文档
            score_threshold: 相似度阈值,低于此值的结果不返回
        """
        result = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            filter=filter,
            include_metadata=True,
        )

        return [
            {
                "id":       match.id,
                "score":    match.score,
                "content":  match.metadata.get("content", ""),
                "metadata": {k: v for k, v in match.metadata.items()
                             if k != "content"},
            }
            for match in result.matches
            if match.score >= score_threshold
        ]

    def delete_by_doc(self, doc_id: str):
        """删除某个文档的所有片段"""
        self.index.delete(filter={"doc_id": {"$eq": doc_id}})

    def get_stats(self) -> dict:
        return self.index.describe_index_stats()

六、Weaviate 集成

import weaviate
import weaviate.classes as wvc

class WeaviateStore:
    """
    Weaviate 向量存储
    支持混合检索(向量 + 关键词 BM25)
    """

    COLLECTION_NAME = "DocumentChunk"

    def __init__(self, url: str = "http://localhost:8080"):
        self.client = weaviate.connect_to_local(
            host=url.replace("http://", "").split(":")[0],
            port=int(url.split(":")[-1]) if ":" in url else 8080,
        )
        self._ensure_collection()

    def _ensure_collection(self):
        """如果集合不存在则创建"""
        if not self.client.collections.exists(self.COLLECTION_NAME):
            self.client.collections.create(
                name=self.COLLECTION_NAME,
                vectorizer_config=wvc.config.Configure.Vectorizer.none(),  # 我们自己提供向量
                properties=[
                    wvc.config.Property(name="doc_id",      data_type=wvc.config.DataType.TEXT),
                    wvc.config.Property(name="content",     data_type=wvc.config.DataType.TEXT),
                    wvc.config.Property(name="chunk_index", data_type=wvc.config.DataType.INT),
                    wvc.config.Property(name="source",      data_type=wvc.config.DataType.TEXT),
                    wvc.config.Property(name="title",       data_type=wvc.config.DataType.TEXT),
                ],
            )
            print(f"已创建集合:{self.COLLECTION_NAME}")

    def upsert_chunks(self, chunks: list[Chunk]):
        """批量写入,使用 Weaviate 的批量导入"""
        collection = self.client.collections.get(self.COLLECTION_NAME)

        with collection.batch.dynamic() as batch:
            for chunk in chunks:
                if chunk.embedding is None:
                    continue
                batch.add_object(
                    properties={
                        "doc_id":      chunk.doc_id,
                        "content":     chunk.content,
                        "chunk_index": chunk.chunk_index,
                        "source":      chunk.metadata.get("source", ""),
                        "title":       chunk.metadata.get("title", ""),
                    },
                    vector=chunk.embedding,
                    uuid=chunk.id,
                )
        print(f"已写入 {len(chunks)} 个片段")

    def search_vector(
        self,
        query_embedding: list[float],
        top_k:           int = 5,
        filters:         Optional[wvc.query.Filter] = None,
    ) -> list[dict]:
        """纯向量相似度搜索"""
        collection = self.client.collections.get(self.COLLECTION_NAME)
        result = collection.query.near_vector(
            near_vector=query_embedding,
            limit=top_k,
            filters=filters,
            return_metadata=wvc.query.MetadataQuery(distance=True),
        )
        return [
            {
                "id":       str(obj.uuid),
                "score":    1 - obj.metadata.distance,   # 距离转相似度
                "content":  obj.properties["content"],
                "metadata": {k: v for k, v in obj.properties.items()
                             if k != "content"},
            }
            for obj in result.objects
        ]

    def search_hybrid(
        self,
        query_text:      str,
        query_embedding: list[float],
        top_k:           int = 5,
        alpha:           float = 0.75,   # 0=纯关键词,1=纯向量,0.75偏向语义
    ) -> list[dict]:
        """
        混合检索:向量语义 + BM25 关键词
        对于专业术语和专有名词,混合检索效果优于纯向量检索
        """
        collection = self.client.collections.get(self.COLLECTION_NAME)
        result = collection.query.hybrid(
            query=query_text,
            vector=query_embedding,
            limit=top_k,
            alpha=alpha,
            return_metadata=wvc.query.MetadataQuery(score=True),
        )
        return [
            {
                "id":       str(obj.uuid),
                "score":    obj.metadata.score,
                "content":  obj.properties["content"],
                "metadata": {k: v for k, v in obj.properties.items()
                             if k != "content"},
            }
            for obj in result.objects
        ]

    def close(self):
        self.client.close()

七、RAG 核心引擎

import anthropic
from typing import Union

claude = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

# 支持 Pinecone 或 Weaviate
VectorStore = Union[PineconeStore, WeaviateStore]

class RAGEngine:
    """
    RAG 核心引擎:连接向量检索和 Claude 生成
    """

    def __init__(
        self,
        store:        VectorStore,
        model:        str = "claude-sonnet-4-6",
        top_k:        int = 5,
        max_context:  int = 4000,   # 最多送给 Claude 的上下文字符数
    ):
        self.store       = store
        self.model       = model
        self.top_k       = top_k
        self.max_context = max_context

    def _build_context(self, chunks: list[dict]) -> str:
        """把检索到的片段组装成上下文"""
        context_parts = []
        total_chars = 0

        for i, chunk in enumerate(chunks, 1):
            source = chunk["metadata"].get("title") or chunk["metadata"].get("source", "未知来源")
            part = f"[片段 {i}] 来源:{source}\n{chunk['content']}"

            if total_chars + len(part) > self.max_context:
                break

            context_parts.append(part)
            total_chars += len(part)

        return "\n\n---\n\n".join(context_parts)

    def _rerank(self, query: str, chunks: list[dict]) -> list[dict]:
        """
        用 Claude 对检索结果重排序(可选,提升精度但增加延迟和成本)
        适合对精度要求高的场景
        """
        if len(chunks) <= 2:
            return chunks

        chunks_text = "\n\n".join(
            f"[{i}] {c['content'][:200]}..." for i, c in enumerate(chunks)
        )
        response = claude.messages.create(
            model="claude-haiku-4-5-20251001",    # 重排序用 Haiku 节省成本
            max_tokens=128,
            temperature=0,
            messages=[{
                "role": "user",
                "content": f"""用户问题:{query}

以下是检索到的文档片段,按与问题的相关程度从高到低排序,只输出片段编号,用逗号分隔:
{chunks_text}

只输出类似"2,0,4,1,3"这样的编号序列。"""
            }]
        )

        try:
            order = [int(n.strip()) for n in response.content[0].text.split(",")]
            reranked = [chunks[i] for i in order if i < len(chunks)]
            return reranked
        except Exception:
            return chunks   # 重排序失败时返回原始顺序

    def query(
        self,
        question:    str,
        use_rerank:  bool = False,
        filter:      Optional[dict] = None,
        stream:      bool = False,
    ) -> Union[str, any]:
        """
        执行 RAG 查询

        Args:
            question:   用户问题
            use_rerank: 是否用 Claude 重排序(提高精度,增加延迟)
            filter:     元数据过滤条件(Pinecone 格式)
            stream:     是否流式输出
        """
        # Step 1:生成问题的 Embedding
        query_embedding = generate_embeddings([question])[0]

        # Step 2:向量检索
        if isinstance(self.store, WeaviateStore):
            # Weaviate 优先使用混合检索
            chunks = self.store.search_hybrid(
                query_text=question,
                query_embedding=query_embedding,
                top_k=self.top_k,
            )
        else:
            chunks = self.store.search(
                query_embedding=query_embedding,
                top_k=self.top_k,
                filter=filter,
            )

        if not chunks:
            return "抱歉,未找到相关文档来回答这个问题。"

        # Step 3:可选重排序
        if use_rerank:
            chunks = self._rerank(question, chunks)

        # Step 4:构建上下文
        context = self._build_context(chunks)

        # Step 5:调用 Claude 生成回答
        system = """你是一个专业的知识库问答助手。
回答规则:
- 严格基于提供的文档内容回答,不要添加文档中没有的信息
- 如果文档中没有相关信息,明确说明"根据现有文档,无法回答此问题"
- 引用具体内容时说明来源片段编号
- 回答要简洁准确,避免冗余"""

        user_message = f"""请基于以下文档内容回答问题。

文档内容:
{context}

问题:{question}"""

        if stream:
            return claude.messages.stream(
                model=self.model,
                max_tokens=2048,
                system=system,
                messages=[{"role": "user", "content": user_message}]
            )
        else:
            response = claude.messages.create(
                model=self.model,
                max_tokens=2048,
                system=system,
                messages=[{"role": "user", "content": user_message}]
            )
            return response.content[0].text

    def add_documents(self, documents: list[Document]):
        """完整的文档入库流程:切片 → Embedding → 存储"""
        all_chunks = []
        for doc in documents:
            chunks = chunk_document(doc)
            all_chunks.extend(chunks)
            print(f"文档 '{doc.id}' 切分为 {len(chunks)} 个片段")

        print(f"正在生成 {len(all_chunks)} 个片段的 Embedding...")
        all_chunks = embed_chunks(all_chunks)

        print("正在写入向量数据库...")
        self.store.upsert_chunks(all_chunks)
        print(f"✅ 入库完成,共 {len(all_chunks)} 个片段")

八、完整使用示例

# ── 示例一:使用 Pinecone ────────────────────────

pinecone_store = PineconeStore(index_name="company-kb")
rag = RAGEngine(store=pinecone_store, model="claude-sonnet-4-6")

# 入库文档
documents = [
    Document(
        id="policy_001",
        content="""公司差旅报销政策(2026年版)
        
        一、国内出差标准
        住宿费用:一线城市(北京、上海、广州、深圳)不超过600元/晚,
        其他城市不超过400元/晚。...
        
        二、交通费用
        飞机:经济舱,提前7天购票可报销。...
        高铁:二等座,特殊情况一等座需提前申请。...""",
        metadata={"title": "差旅报销政策", "source": "HR 文档", "date": "2026-01"}
    ),
    Document(
        id="product_001",
        content="""产品使用手册 v3.2
        
        第一章:快速入门
        1.1 系统要求:操作系统 Windows 10+ 或 macOS 12+,内存 8GB 以上...
        
        第二章:功能详解
        2.1 数据导入:支持 Excel、CSV、JSON 格式...""",
        metadata={"title": "产品手册", "source": "产品文档", "date": "2026-03"}
    ),
]

rag.add_documents(documents)

# 查询
answer = rag.query("上海出差住宿费用上限是多少?")
print(answer)

# 带元数据过滤的查询(只搜索 HR 文档)
answer = rag.query(
    "报销需要哪些材料?",
    filter={"source": {"$eq": "HR 文档"}}
)
print(answer)

# 流式输出
print("\n流式回答:")
with rag.query("产品支持哪些导入格式?", stream=True) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)


# ── 示例二:使用 Weaviate ────────────────────────

weaviate_store = WeaviateStore(url="http://localhost:8080")
rag_weaviate = RAGEngine(store=weaviate_store)

rag_weaviate.add_documents(documents)

# 混合检索(Weaviate 自动使用,无需额外配置)
answer = rag_weaviate.query(
    question="Excel 文件怎么导入?",
    use_rerank=True,    # 开启重排序提高精度
)
print(answer)

weaviate_store.close()

九、Pinecone vs Weaviate 选型

维度 Pinecone Weaviate
部署方式 全托管云服务 开源,支持自部署 / 托管云
上手难度 ⭐⭐(最简单) ⭐⭐⭐(中等)
混合检索 需要额外配置 原生支持 BM25+向量
免费额度 Serverless 免费版(有限制) 开源版完全免费自部署
数据主权 数据在 Pinecone 云端 自部署完全掌控数据
多模态支持 纯向量 文本 + 图片 + 多模态
适合场景 快速原型,SaaS 产品,不想运维 企业内部部署,数据安全要求高,需要混合检索

十、生产级优化建议

优化一:查询 Embedding 缓存

import hashlib
from functools import lru_cache

# 简单的内存缓存(生产环境用 Redis)
_embedding_cache: dict[str, list[float]] = {}

def get_embedding_cached(text: str) -> list[float]:
    key = hashlib.md5(text.encode()).hexdigest()
    if key not in _embedding_cache:
        _embedding_cache[key] = generate_embeddings([text])[0]
    return _embedding_cache[key]

优化二:文档更新检测(避免重复入库)

def get_doc_hash(content: str) -> str:
    return hashlib.sha256(content.encode()).hexdigest()[:16]

def smart_upsert(rag: RAGEngine, doc: Document, hash_store: dict):
    """只在文档内容变化时重新入库"""
    new_hash = get_doc_hash(doc.content)
    if hash_store.get(doc.id) == new_hash:
        print(f"文档 {doc.id} 未变化,跳过")
        return
    rag.store.delete_by_doc(doc.id)
    rag.add_documents([doc])
    hash_store[doc.id] = new_hash
    print(f"文档 {doc.id} 已更新")

常见问题

Q:chunk_size 设多大合适?
没有通用最优值,取决于你的文档类型和查询方式。技术文档和 FAQ 建议 256-512 token(每个片段完整回答一个问题);叙事性文档如新闻、报告建议 512-1024 token(保留更多上下文)。建议用不同的 chunk_size 各跑一批测试,用 LLM-as-Judge 评估检索质量。

Q:什么时候需要开 use_rerank?
当检索结果的相关性不够精准时——典型场景是查询词和文档用词差异大(如用户问”报销要什么材料”,文档写的是”申请所需附件”)。重排序用 Haiku 实现,额外成本很低,但会增加约 0.5-1 秒延迟。对精度敏感的场景建议开启。

Q:向量数据库里存的内容能被 Claude 直接看到吗?
不能。向量数据库只存储向量和元数据,Claude 看不到向量数据库里的内容。RAG 的工作方式是:先用向量检索找到最相关的文本片段,再把这些文本片段作为上下文拼接进 Prompt 送给 Claude。Claude 看到的是检索后的文本,而不是向量数据库本身。

总结

向量数据库 + Claude 的 RAG 架构解决的核心问题是”海量文档中精准检索”。选型上:快速上手或 SaaS 产品用 Pinecone;数据安全要求高或需要混合检索用 Weaviate。生产环境的关键优化点是:合理的 chunk_size(直接影响检索质量)、Embedding 缓存(节省成本)、混合检索(提升专业术语命中率)。代码层面,本文提供的抽象层让两个数据库的切换只需改一行初始化代码,方便迁移和测试。