📌 内容摘要
- 向量数据库解决的核心问题:当文档数量超过 Claude 上下文窗口时,如何精准检索相关内容再送给模型。
- 本文覆盖两大主流向量数据库:Pinecone(托管云服务,上手最快)和 Weaviate(开源自部署,功能最全)。
- 完整实现 RAG(检索增强生成)架构,含文档切片、Embedding 生成、向量检索、上下文注入五个步骤。
- 附生产级别的优化技巧:混合检索、重排序、缓存策略,以及 Pinecone vs Weaviate 的选型对比。
一、为什么需要向量数据库?
Claude Sonnet 4.6 和 Opus 4.6 支持 100 万 token 上下文,很多人会问:既然上下文这么大,还需要向量数据库吗?
答案是:取决于你的文档库规模。100 万 token 约等于 75 万汉字——听起来很多,但一个中型企业的知识库可能有几万篇文档、数十亿字。这时候向量数据库就是必须的:它负责从海量文档中找到最相关的几篇,再把这几篇送给 Claude 处理。
| 文档规模 | 推荐方案 | 理由 |
|---|---|---|
| 1-20 份文档 | 直接放入上下文 | 100 万 token 够用,不需要额外架构 |
| 20-200 份文档 | 视情况,Prompt Caching 或轻量 RAG | Prompt Caching 可节省 90% 费用 |
| 200份以上 / 持续增长 | 向量数据库 + RAG | 超出上下文限制,需要精准检索 |
二、RAG 架构全景
【离线阶段:文档入库】
原始文档(PDF/Word/网页)
→ 文本提取
→ 文档切片(Chunking)
→ Embedding 生成(文本 → 向量)
→ 写入向量数据库
【在线阶段:查询回答】
用户问题
→ 问题 Embedding
→ 向量相似度检索(Top-K 相关片段)
→ [可选] 重排序(Reranking)
→ 构建上下文 Prompt
→ Claude 生成回答
→ 返回用户
三、环境准备
pip install anthropic pinecone-client weaviate-client openai tiktoken
# .env ANTHROPIC_API_KEY=sk-ant-... PINECONE_API_KEY=pcsk_... OPENAI_API_KEY=sk-... # 用于生成 Embeddings(text-embedding-3-small) WEAVIATE_URL=http://localhost:8080
Claude 目前没有原生的 Embedding API,本文使用 OpenAI 的
text-embedding-3-small(1536 维,性价比最高)生成向量。也可以选择 Cohere Embed v3、BGE-M3 等其他模型,原理和代码结构完全相同,只需替换 Embedding 生成部分。
四、通用基础层:文档处理和 Embedding
import os
import re
import time
import hashlib
from typing import Optional
from dataclasses import dataclass, field
import tiktoken
from openai import OpenAI
openai_client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
# ── 数据结构 ──────────────────────────────────────
@dataclass
class Document:
"""原始文档"""
id: str
content: str
metadata: dict = field(default_factory=dict) # title, source, date 等
@dataclass
class Chunk:
"""切片后的文档片段"""
id: str
doc_id: str
content: str
embedding: Optional[list[float]] = None
metadata: dict = field(default_factory=dict)
chunk_index: int = 0
# ── 文档切片 ──────────────────────────────────────
def chunk_document(
doc: Document,
chunk_size: int = 512, # 每片约 512 token
overlap: int = 64, # 前后重叠 64 token,保证上下文连续性
encoding: str = "cl100k_base",
) -> list[Chunk]:
"""
把文档切成固定大小的片段
overlap 的作用:防止一个知识点恰好被切断在两个片段之间
"""
enc = tiktoken.get_encoding(encoding)
tokens = enc.encode(doc.content)
chunks = []
start = 0
idx = 0
while start < len(tokens):
end = min(start + chunk_size, len(tokens))
chunk_tokens = tokens[start:end]
chunk_text = enc.decode(chunk_tokens)
# 生成稳定的 chunk ID(基于内容哈希)
chunk_id = hashlib.md5(
f"{doc.id}_{idx}_{chunk_text[:50]}".encode()
).hexdigest()[:16]
chunks.append(Chunk(
id=chunk_id,
doc_id=doc.id,
content=chunk_text,
chunk_index=idx,
metadata={
**doc.metadata,
"doc_id": doc.id,
"chunk_index": idx,
"char_count": len(chunk_text),
}
))
start += chunk_size - overlap
idx += 1
return chunks
# ── Embedding 生成 ────────────────────────────────
def generate_embeddings(
texts: list[str],
model: str = "text-embedding-3-small",
batch_size: int = 100,
) -> list[list[float]]:
"""
批量生成 Embedding,自动处理速率限制
text-embedding-3-small: 1536 维,$0.02/M tokens
text-embedding-3-large: 3072 维,$0.13/M tokens(精度更高但成本高)
"""
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
try:
response = openai_client.embeddings.create(
model=model,
input=batch,
)
batch_embeddings = [item.embedding for item in response.data]
all_embeddings.extend(batch_embeddings)
except Exception as e:
print(f"Embedding 生成失败(批次 {i//batch_size}):{e}")
time.sleep(5)
# 重试一次
response = openai_client.embeddings.create(model=model, input=batch)
all_embeddings.extend([item.embedding for item in response.data])
if i + batch_size < len(texts):
time.sleep(0.1) # 避免速率限制
return all_embeddings
def embed_chunks(chunks: list[Chunk]) -> list[Chunk]:
"""为文档片段批量生成 Embedding"""
texts = [chunk.content for chunk in chunks]
embeddings = generate_embeddings(texts)
for chunk, embedding in zip(chunks, embeddings):
chunk.embedding = embedding
return chunks
五、Pinecone 集成
from pinecone import Pinecone, ServerlessSpec
class PineconeStore:
"""Pinecone 向量存储,封装索引操作"""
def __init__(
self,
index_name: str = "claude-rag",
dimension: int = 1536, # text-embedding-3-small 的维度
metric: str = "cosine",
cloud: str = "aws",
region: str = "us-east-1",
):
self.pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
self.index_name = index_name
# 如果索引不存在则创建
existing = [idx.name for idx in self.pc.list_indexes()]
if index_name not in existing:
self.pc.create_index(
name=index_name,
dimension=dimension,
metric=metric,
spec=ServerlessSpec(cloud=cloud, region=region),
)
print(f"已创建索引:{index_name}")
time.sleep(2) # 等待索引就绪
self.index = self.pc.Index(index_name)
print(f"连接到索引:{index_name},统计:{self.index.describe_index_stats()}")
def upsert_chunks(self, chunks: list[Chunk], batch_size: int = 100):
"""批量写入文档片段"""
vectors = []
for chunk in chunks:
if chunk.embedding is None:
continue
vectors.append({
"id": chunk.id,
"values": chunk.embedding,
"metadata": {
**chunk.metadata,
"content": chunk.content, # 把原文也存入 metadata
}
})
# 批量 upsert
for i in range(0, len(vectors), batch_size):
batch = vectors[i:i + batch_size]
self.index.upsert(vectors=batch)
print(f"已写入 {min(i + batch_size, len(vectors))}/{len(vectors)} 个片段")
def search(
self,
query_embedding: list[float],
top_k: int = 5,
filter: Optional[dict] = None,
score_threshold: float = 0.6,
) -> list[dict]:
"""
向量相似度搜索
Args:
filter: 元数据过滤,如 {"source": "技术文档"} 只搜索技术文档
score_threshold: 相似度阈值,低于此值的结果不返回
"""
result = self.index.query(
vector=query_embedding,
top_k=top_k,
filter=filter,
include_metadata=True,
)
return [
{
"id": match.id,
"score": match.score,
"content": match.metadata.get("content", ""),
"metadata": {k: v for k, v in match.metadata.items()
if k != "content"},
}
for match in result.matches
if match.score >= score_threshold
]
def delete_by_doc(self, doc_id: str):
"""删除某个文档的所有片段"""
self.index.delete(filter={"doc_id": {"$eq": doc_id}})
def get_stats(self) -> dict:
return self.index.describe_index_stats()
六、Weaviate 集成
import weaviate
import weaviate.classes as wvc
class WeaviateStore:
"""
Weaviate 向量存储
支持混合检索(向量 + 关键词 BM25)
"""
COLLECTION_NAME = "DocumentChunk"
def __init__(self, url: str = "http://localhost:8080"):
self.client = weaviate.connect_to_local(
host=url.replace("http://", "").split(":")[0],
port=int(url.split(":")[-1]) if ":" in url else 8080,
)
self._ensure_collection()
def _ensure_collection(self):
"""如果集合不存在则创建"""
if not self.client.collections.exists(self.COLLECTION_NAME):
self.client.collections.create(
name=self.COLLECTION_NAME,
vectorizer_config=wvc.config.Configure.Vectorizer.none(), # 我们自己提供向量
properties=[
wvc.config.Property(name="doc_id", data_type=wvc.config.DataType.TEXT),
wvc.config.Property(name="content", data_type=wvc.config.DataType.TEXT),
wvc.config.Property(name="chunk_index", data_type=wvc.config.DataType.INT),
wvc.config.Property(name="source", data_type=wvc.config.DataType.TEXT),
wvc.config.Property(name="title", data_type=wvc.config.DataType.TEXT),
],
)
print(f"已创建集合:{self.COLLECTION_NAME}")
def upsert_chunks(self, chunks: list[Chunk]):
"""批量写入,使用 Weaviate 的批量导入"""
collection = self.client.collections.get(self.COLLECTION_NAME)
with collection.batch.dynamic() as batch:
for chunk in chunks:
if chunk.embedding is None:
continue
batch.add_object(
properties={
"doc_id": chunk.doc_id,
"content": chunk.content,
"chunk_index": chunk.chunk_index,
"source": chunk.metadata.get("source", ""),
"title": chunk.metadata.get("title", ""),
},
vector=chunk.embedding,
uuid=chunk.id,
)
print(f"已写入 {len(chunks)} 个片段")
def search_vector(
self,
query_embedding: list[float],
top_k: int = 5,
filters: Optional[wvc.query.Filter] = None,
) -> list[dict]:
"""纯向量相似度搜索"""
collection = self.client.collections.get(self.COLLECTION_NAME)
result = collection.query.near_vector(
near_vector=query_embedding,
limit=top_k,
filters=filters,
return_metadata=wvc.query.MetadataQuery(distance=True),
)
return [
{
"id": str(obj.uuid),
"score": 1 - obj.metadata.distance, # 距离转相似度
"content": obj.properties["content"],
"metadata": {k: v for k, v in obj.properties.items()
if k != "content"},
}
for obj in result.objects
]
def search_hybrid(
self,
query_text: str,
query_embedding: list[float],
top_k: int = 5,
alpha: float = 0.75, # 0=纯关键词,1=纯向量,0.75偏向语义
) -> list[dict]:
"""
混合检索:向量语义 + BM25 关键词
对于专业术语和专有名词,混合检索效果优于纯向量检索
"""
collection = self.client.collections.get(self.COLLECTION_NAME)
result = collection.query.hybrid(
query=query_text,
vector=query_embedding,
limit=top_k,
alpha=alpha,
return_metadata=wvc.query.MetadataQuery(score=True),
)
return [
{
"id": str(obj.uuid),
"score": obj.metadata.score,
"content": obj.properties["content"],
"metadata": {k: v for k, v in obj.properties.items()
if k != "content"},
}
for obj in result.objects
]
def close(self):
self.client.close()
七、RAG 核心引擎
import anthropic
from typing import Union
claude = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
# 支持 Pinecone 或 Weaviate
VectorStore = Union[PineconeStore, WeaviateStore]
class RAGEngine:
"""
RAG 核心引擎:连接向量检索和 Claude 生成
"""
def __init__(
self,
store: VectorStore,
model: str = "claude-sonnet-4-6",
top_k: int = 5,
max_context: int = 4000, # 最多送给 Claude 的上下文字符数
):
self.store = store
self.model = model
self.top_k = top_k
self.max_context = max_context
def _build_context(self, chunks: list[dict]) -> str:
"""把检索到的片段组装成上下文"""
context_parts = []
total_chars = 0
for i, chunk in enumerate(chunks, 1):
source = chunk["metadata"].get("title") or chunk["metadata"].get("source", "未知来源")
part = f"[片段 {i}] 来源:{source}\n{chunk['content']}"
if total_chars + len(part) > self.max_context:
break
context_parts.append(part)
total_chars += len(part)
return "\n\n---\n\n".join(context_parts)
def _rerank(self, query: str, chunks: list[dict]) -> list[dict]:
"""
用 Claude 对检索结果重排序(可选,提升精度但增加延迟和成本)
适合对精度要求高的场景
"""
if len(chunks) <= 2:
return chunks
chunks_text = "\n\n".join(
f"[{i}] {c['content'][:200]}..." for i, c in enumerate(chunks)
)
response = claude.messages.create(
model="claude-haiku-4-5-20251001", # 重排序用 Haiku 节省成本
max_tokens=128,
temperature=0,
messages=[{
"role": "user",
"content": f"""用户问题:{query}
以下是检索到的文档片段,按与问题的相关程度从高到低排序,只输出片段编号,用逗号分隔:
{chunks_text}
只输出类似"2,0,4,1,3"这样的编号序列。"""
}]
)
try:
order = [int(n.strip()) for n in response.content[0].text.split(",")]
reranked = [chunks[i] for i in order if i < len(chunks)]
return reranked
except Exception:
return chunks # 重排序失败时返回原始顺序
def query(
self,
question: str,
use_rerank: bool = False,
filter: Optional[dict] = None,
stream: bool = False,
) -> Union[str, any]:
"""
执行 RAG 查询
Args:
question: 用户问题
use_rerank: 是否用 Claude 重排序(提高精度,增加延迟)
filter: 元数据过滤条件(Pinecone 格式)
stream: 是否流式输出
"""
# Step 1:生成问题的 Embedding
query_embedding = generate_embeddings([question])[0]
# Step 2:向量检索
if isinstance(self.store, WeaviateStore):
# Weaviate 优先使用混合检索
chunks = self.store.search_hybrid(
query_text=question,
query_embedding=query_embedding,
top_k=self.top_k,
)
else:
chunks = self.store.search(
query_embedding=query_embedding,
top_k=self.top_k,
filter=filter,
)
if not chunks:
return "抱歉,未找到相关文档来回答这个问题。"
# Step 3:可选重排序
if use_rerank:
chunks = self._rerank(question, chunks)
# Step 4:构建上下文
context = self._build_context(chunks)
# Step 5:调用 Claude 生成回答
system = """你是一个专业的知识库问答助手。
回答规则:
- 严格基于提供的文档内容回答,不要添加文档中没有的信息
- 如果文档中没有相关信息,明确说明"根据现有文档,无法回答此问题"
- 引用具体内容时说明来源片段编号
- 回答要简洁准确,避免冗余"""
user_message = f"""请基于以下文档内容回答问题。
文档内容:
{context}
问题:{question}"""
if stream:
return claude.messages.stream(
model=self.model,
max_tokens=2048,
system=system,
messages=[{"role": "user", "content": user_message}]
)
else:
response = claude.messages.create(
model=self.model,
max_tokens=2048,
system=system,
messages=[{"role": "user", "content": user_message}]
)
return response.content[0].text
def add_documents(self, documents: list[Document]):
"""完整的文档入库流程:切片 → Embedding → 存储"""
all_chunks = []
for doc in documents:
chunks = chunk_document(doc)
all_chunks.extend(chunks)
print(f"文档 '{doc.id}' 切分为 {len(chunks)} 个片段")
print(f"正在生成 {len(all_chunks)} 个片段的 Embedding...")
all_chunks = embed_chunks(all_chunks)
print("正在写入向量数据库...")
self.store.upsert_chunks(all_chunks)
print(f"✅ 入库完成,共 {len(all_chunks)} 个片段")
八、完整使用示例
# ── 示例一:使用 Pinecone ────────────────────────
pinecone_store = PineconeStore(index_name="company-kb")
rag = RAGEngine(store=pinecone_store, model="claude-sonnet-4-6")
# 入库文档
documents = [
Document(
id="policy_001",
content="""公司差旅报销政策(2026年版)
一、国内出差标准
住宿费用:一线城市(北京、上海、广州、深圳)不超过600元/晚,
其他城市不超过400元/晚。...
二、交通费用
飞机:经济舱,提前7天购票可报销。...
高铁:二等座,特殊情况一等座需提前申请。...""",
metadata={"title": "差旅报销政策", "source": "HR 文档", "date": "2026-01"}
),
Document(
id="product_001",
content="""产品使用手册 v3.2
第一章:快速入门
1.1 系统要求:操作系统 Windows 10+ 或 macOS 12+,内存 8GB 以上...
第二章:功能详解
2.1 数据导入:支持 Excel、CSV、JSON 格式...""",
metadata={"title": "产品手册", "source": "产品文档", "date": "2026-03"}
),
]
rag.add_documents(documents)
# 查询
answer = rag.query("上海出差住宿费用上限是多少?")
print(answer)
# 带元数据过滤的查询(只搜索 HR 文档)
answer = rag.query(
"报销需要哪些材料?",
filter={"source": {"$eq": "HR 文档"}}
)
print(answer)
# 流式输出
print("\n流式回答:")
with rag.query("产品支持哪些导入格式?", stream=True) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
# ── 示例二:使用 Weaviate ────────────────────────
weaviate_store = WeaviateStore(url="http://localhost:8080")
rag_weaviate = RAGEngine(store=weaviate_store)
rag_weaviate.add_documents(documents)
# 混合检索(Weaviate 自动使用,无需额外配置)
answer = rag_weaviate.query(
question="Excel 文件怎么导入?",
use_rerank=True, # 开启重排序提高精度
)
print(answer)
weaviate_store.close()
九、Pinecone vs Weaviate 选型
| 维度 | Pinecone | Weaviate |
|---|---|---|
| 部署方式 | 全托管云服务 | 开源,支持自部署 / 托管云 |
| 上手难度 | ⭐⭐(最简单) | ⭐⭐⭐(中等) |
| 混合检索 | 需要额外配置 | 原生支持 BM25+向量 |
| 免费额度 | Serverless 免费版(有限制) | 开源版完全免费自部署 |
| 数据主权 | 数据在 Pinecone 云端 | 自部署完全掌控数据 |
| 多模态支持 | 纯向量 | 文本 + 图片 + 多模态 |
| 适合场景 | 快速原型,SaaS 产品,不想运维 | 企业内部部署,数据安全要求高,需要混合检索 |
十、生产级优化建议
优化一:查询 Embedding 缓存
import hashlib
from functools import lru_cache
# 简单的内存缓存(生产环境用 Redis)
_embedding_cache: dict[str, list[float]] = {}
def get_embedding_cached(text: str) -> list[float]:
key = hashlib.md5(text.encode()).hexdigest()
if key not in _embedding_cache:
_embedding_cache[key] = generate_embeddings([text])[0]
return _embedding_cache[key]
优化二:文档更新检测(避免重复入库)
def get_doc_hash(content: str) -> str:
return hashlib.sha256(content.encode()).hexdigest()[:16]
def smart_upsert(rag: RAGEngine, doc: Document, hash_store: dict):
"""只在文档内容变化时重新入库"""
new_hash = get_doc_hash(doc.content)
if hash_store.get(doc.id) == new_hash:
print(f"文档 {doc.id} 未变化,跳过")
return
rag.store.delete_by_doc(doc.id)
rag.add_documents([doc])
hash_store[doc.id] = new_hash
print(f"文档 {doc.id} 已更新")
常见问题
Q:chunk_size 设多大合适?
没有通用最优值,取决于你的文档类型和查询方式。技术文档和 FAQ 建议 256-512 token(每个片段完整回答一个问题);叙事性文档如新闻、报告建议 512-1024 token(保留更多上下文)。建议用不同的 chunk_size 各跑一批测试,用 LLM-as-Judge 评估检索质量。
Q:什么时候需要开 use_rerank?
当检索结果的相关性不够精准时——典型场景是查询词和文档用词差异大(如用户问”报销要什么材料”,文档写的是”申请所需附件”)。重排序用 Haiku 实现,额外成本很低,但会增加约 0.5-1 秒延迟。对精度敏感的场景建议开启。
Q:向量数据库里存的内容能被 Claude 直接看到吗?
不能。向量数据库只存储向量和元数据,Claude 看不到向量数据库里的内容。RAG 的工作方式是:先用向量检索找到最相关的文本片段,再把这些文本片段作为上下文拼接进 Prompt 送给 Claude。Claude 看到的是检索后的文本,而不是向量数据库本身。
总结
向量数据库 + Claude 的 RAG 架构解决的核心问题是”海量文档中精准检索”。选型上:快速上手或 SaaS 产品用 Pinecone;数据安全要求高或需要混合检索用 Weaviate。生产环境的关键优化点是:合理的 chunk_size(直接影响检索质量)、Embedding 缓存(节省成本)、混合检索(提升专业术语命中率)。代码层面,本文提供的抽象层让两个数据库的切换只需改一行初始化代码,方便迁移和测试。