📌 内容摘要
- Agent 不是聊天机器人升级版,而是能自主规划和执行多步骤任务的 AI 系统——理解这个区别是选对架构的前提。
- ReAct:每一步都”思考→行动→观察”,灵活但步骤多;Plan-and-Execute:先规划全局再执行,高效但需要提前想清楚。
- 两种模式各有适用场景,本文提供判断框架和完整 Python 实现,含工具调用、错误处理、循环终止三个关键机制。
- 文末附多 Agent 协作模式(Orchestrator-Worker)的设计思路,面向复杂任务场景。
一、什么是 Agent?和普通 LLM 调用的本质区别
普通的 LLM 调用是单次问答:你给一个输入,它返回一个输出,整个过程是线性的、一次性的。Agent 不同——它有一个目标,能自主决定需要做哪些步骤、用哪些工具、按什么顺序执行,直到目标达成。
最直观的类比:普通 LLM 调用像问一个专家”这个问题怎么解决”,Agent 像雇了一个助理”帮我把这件事做完”——助理会自己想办法、查资料、打电话、协调资源,而不是等你给每一步的指令。
| 维度 | 普通 LLM 调用 | Agent |
|---|---|---|
| 执行步骤 | 单次,固定 | 多步,动态决定 |
| 工具使用 | 无 | 可调用外部工具(API、数据库、代码执行等) |
| 状态管理 | 无状态 | 维护任务状态,根据结果调整后续步骤 |
| 适合任务 | 单问题回答、内容生成 | 需要查询、计算、多步操作的复杂任务 |
二、工具定义:Agent 的能力基础
在讲设计模式之前,先建立一套工具(Tool)——这是两种 Agent 模式共用的基础层:
import anthropic
import json
import requests
from datetime import datetime
from typing import Any
client = anthropic.Anthropic()
# ── 工具函数定义 ──────────────────────────────────
def search_web(query: str) -> str:
"""模拟网页搜索(实际项目替换为真实搜索 API)"""
# 示例:替换为 Serper、Brave Search 等 API
return f"搜索'{query}'的结果:[模拟返回相关搜索结果摘要]"
def get_weather(city: str) -> str:
"""获取城市天气"""
# 示例:替换为 OpenWeatherMap API
return json.dumps({
"city": city,
"temperature": 22,
"condition": "晴",
"humidity": 45
}, ensure_ascii=False)
def calculate(expression: str) -> str:
"""安全计算数学表达式"""
try:
# 只允许安全的数学操作
allowed = set("0123456789+-*/()., ")
if not all(c in allowed for c in expression):
return "错误:表达式包含不允许的字符"
result = eval(expression)
return str(result)
except Exception as e:
return f"计算错误:{e}"
def read_file(path: str) -> str:
"""读取文件内容"""
try:
with open(path, "r", encoding="utf-8") as f:
content = f.read()
return content[:5000] # 限制返回长度
except Exception as e:
return f"读取文件失败:{e}"
def write_file(path: str, content: str) -> str:
"""写入文件"""
try:
with open(path, "w", encoding="utf-8") as f:
f.write(content)
return f"文件已写入:{path}"
except Exception as e:
return f"写入失败:{e}"
def run_python(code: str) -> str:
"""执行 Python 代码并返回输出(生产环境需要沙箱隔离)"""
import io, sys
old_stdout = sys.stdout
sys.stdout = buffer = io.StringIO()
try:
exec(code, {})
output = buffer.getvalue()
return output if output else "代码执行完成,无输出"
except Exception as e:
return f"执行错误:{e}"
finally:
sys.stdout = old_stdout
# ── Claude API 工具定义 ───────────────────────────
TOOLS = [
{
"name": "search_web",
"description": "搜索互联网获取实时信息,适合查询新闻、事实、当前状态等",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "搜索关键词"}
},
"required": ["query"]
}
},
{
"name": "get_weather",
"description": "获取指定城市的当前天气信息",
"input_schema": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "城市名称"}
},
"required": ["city"]
}
},
{
"name": "calculate",
"description": "计算数学表达式,支持加减乘除和括号",
"input_schema": {
"type": "object",
"properties": {
"expression": {"type": "string", "description": "数学表达式,如 '(12 + 8) * 3'"}
},
"required": ["expression"]
}
},
{
"name": "read_file",
"description": "读取指定路径的文件内容",
"input_schema": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件路径"}
},
"required": ["path"]
}
},
{
"name": "write_file",
"description": "将内容写入指定路径的文件",
"input_schema": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件路径"},
"content": {"type": "string", "description": "要写入的内容"}
},
"required": ["path", "content"]
}
},
{
"name": "run_python",
"description": "执行 Python 代码片段并返回输出,适合数据处理和计算",
"input_schema": {
"type": "object",
"properties": {
"code": {"type": "string", "description": "要执行的 Python 代码"}
},
"required": ["code"]
}
}
]
# 工具名称到函数的映射
TOOL_FUNCTIONS = {
"search_web": search_web,
"get_weather": get_weather,
"calculate": calculate,
"read_file": read_file,
"write_file": write_file,
"run_python": run_python,
}
def execute_tool(tool_name: str, tool_input: dict) -> str:
"""统一的工具执行入口"""
if tool_name not in TOOL_FUNCTIONS:
return f"错误:未知工具 '{tool_name}'"
try:
result = TOOL_FUNCTIONS[tool_name](**tool_input)
return str(result)
except Exception as e:
return f"工具执行失败:{e}"
三、ReAct 模式:思考→行动→观察的循环
核心思路
ReAct(Reasoning + Acting)的核心是一个循环:在每一步,模型先思考(当前状态是什么,下一步应该做什么),再行动(调用工具),然观察结果,再进入下一轮思考。这个循环持续到任务完成。
ReAct 执行流程:
用户任务
↓
[思考] 我需要做什么?第一步是?
↓
[行动] 调用工具A(参数...)
↓
[观察] 工具A返回结果:...
↓
[思考] 基于结果,下一步是?
↓
[行动] 调用工具B(参数...)
↓
[观察] 工具B返回结果:...
↓
(重复,直到任务完成)
↓
[最终答案] 综合所有观察,给出完整回答
ReAct 完整实现
from dataclasses import dataclass, field
@dataclass
class ReActStep:
"""记录 ReAct 每一步的状态"""
step_num: int
thought: str = ""
action: str = ""
action_input:dict = field(default_factory=dict)
observation: str = ""
is_final: bool = False
final_answer:str = ""
class ReActAgent:
"""
ReAct Agent 实现
每步:思考当前状态 → 选择工具 → 执行 → 观察结果 → 继续
"""
def __init__(
self,
tools: list[dict],
model: str = "claude-sonnet-4-6",
max_steps: int = 10,
verbose: bool = True,
):
self.tools = tools
self.model = model
self.max_steps = max_steps
self.verbose = verbose
def run(self, task: str) -> tuple[str, list[ReActStep]]:
"""
执行任务,返回最终答案和执行轨迹
Returns:
(最终答案, 步骤列表)
"""
messages = []
steps = []
step_num = 0
system = """你是一个能够使用工具完成任务的 AI 助手。
工作方式:
1. 仔细理解用户的任务目标
2. 思考需要哪些信息或操作来完成任务
3. 选择合适的工具执行操作
4. 观察工具返回的结果
5. 根据结果决定下一步(继续使用工具或给出最终答案)
重要原则:
- 每次只调用一个工具,等待观察结果后再决定下一步
- 如果工具返回错误,思考如何调整后重试
- 确认任务完全完成后再给出最终答案
- 最终答案要直接回答用户的问题,不要说"我已经执行了..."这类过程描述"""
# 初始用户消息
messages.append({"role": "user", "content": task})
while step_num < self.max_steps:
step_num += 1
step = ReActStep(step_num=step_num)
if self.verbose:
print(f"\n{'─'*50}")
print(f"步骤 {step_num}")
print(f"{'─'*50}")
# 调用 Claude
response = client.messages.create(
model=self.model,
max_tokens=4096,
system=system,
tools=self.tools,
messages=messages,
)
# 处理响应
if response.stop_reason == "end_turn":
# 模型给出最终答案(不再调用工具)
final_answer = ""
for block in response.content:
if hasattr(block, "text"):
final_answer += block.text
step.is_final = True
step.final_answer = final_answer
steps.append(step)
if self.verbose:
print(f"✅ 最终答案:{final_answer[:200]}...")
return final_answer, steps
elif response.stop_reason == "tool_use":
# 模型决定调用工具
tool_results = []
for block in response.content:
if hasattr(block, "text") and block.text:
step.thought = block.text
if self.verbose:
print(f"💭 思考:{block.text[:150]}...")
elif block.type == "tool_use":
step.action = block.name
step.action_input = block.input
if self.verbose:
print(f"🔧 调用工具:{block.name}")
print(f" 参数:{json.dumps(block.input, ensure_ascii=False)[:200]}")
# 执行工具
result = execute_tool(block.name, block.input)
step.observation = result
if self.verbose:
print(f"👁️ 观察:{result[:200]}...")
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": result,
})
steps.append(step)
# 把工具结果加入消息历史
messages.append({"role": "assistant", "content": response.content})
messages.append({"role": "user", "content": tool_results})
else:
# 意外的停止原因
break
# 超过最大步骤数
return "任务超过最大步骤限制,未能完成", steps
# ── 使用示例 ─────────────────────────────────────
agent = ReActAgent(tools=TOOLS, max_steps=8, verbose=True)
# 示例1:需要多步操作的任务
answer, steps = agent.run(
"帮我查一下北京和上海今天的天气,然后计算两个城市温度的平均值"
)
print(f"\n最终答案:{answer}")
print(f"共执行了 {len(steps)} 步")
# 示例2:需要搜索和计算的任务
answer, steps = agent.run(
"搜索一下2026年人工智能行业的最新动态,然后把主要发现写入文件 ai_news.txt"
)
四、Plan-and-Execute 模式:先规划,再执行
核心思路
Plan-and-Execute 把任务分为两个明确的阶段:首先由规划器(Planner)生成完整的执行计划,然后执行器(Executor)按计划逐步执行。与 ReAct 的关键区别是:ReAct 边走边想,Plan-and-Execute 先想清楚再走。
Plan-and-Execute 执行流程:
用户任务
↓
【规划阶段】(Planner)
分析任务,生成完整步骤列表:
步骤1:搜索XXX
步骤2:基于结果计算YYY
步骤3:写入文件ZZZ
↓
【执行阶段】(Executor,逐步执行)
执行步骤1 → 结果1
执行步骤2(基于结果1)→ 结果2
执行步骤3(基于结果2)→ 结果3
↓
【汇总阶段】(可选:Replanner)
所有步骤完成后,汇总输出最终答案
Plan-and-Execute 完整实现
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class ExecutionPlan:
"""执行计划"""
steps: list[str] # 步骤描述列表
tools_hint: list[str] # 每步建议使用的工具
rationale: str # 规划逻辑说明
@dataclass
class StepResult:
"""单步执行结果"""
step_num: int
description: str
tool_used: str
tool_input: dict
output: str
success: bool
class PlanAndExecuteAgent:
"""
Plan-and-Execute Agent
分三个阶段:规划 → 执行 → 汇总
"""
def __init__(
self,
tools: list[dict],
planner_model: str = "claude-opus-4-6", # 规划用 Opus,推理更强
executor_model:str = "claude-sonnet-4-6", # 执行用 Sonnet,成本更优
max_replans: int = 2,
verbose: bool = True,
):
self.tools = tools
self.planner_model = planner_model
self.executor_model = executor_model
self.max_replans = max_replans
self.verbose = verbose
def plan(self, task: str, context: str = "") -> ExecutionPlan:
"""
规划阶段:生成执行计划
使用更强的模型(Opus)进行复杂规划
"""
tool_list = "\n".join(
f"- {t['name']}: {t['description']}"
for t in self.tools
)
prompt = f"""分析以下任务,生成详细的执行计划。
任务:{task}
{f'额外背景:{context}' if context else ''}
可用工具:
{tool_list}
请生成一个执行计划,以 JSON 格式返回:
{{
"rationale": "为什么这样规划(2-3句话)",
"steps": [
"步骤1的具体描述",
"步骤2的具体描述",
...
],
"tools_hint": [
"步骤1建议使用的工具名",
"步骤2建议使用的工具名",
...
]
}}
规划原则:
- 步骤要具体,每步只做一件事
- 考虑步骤间的依赖关系
- 步骤数控制在3-7步(太少可能遗漏,太多效率低)
- tools_hint 长度必须和 steps 相同
只返回 JSON,不要其他内容。"""
response = client.messages.create(
model=self.planner_model,
max_tokens=2048,
temperature=0, # 规划需要确定性
messages=[{"role": "user", "content": prompt}]
)
text = response.content[0].text.strip()
if text.startswith("```"):
text = "\n".join(text.split("\n")[1:-1]).strip()
data = json.loads(text)
plan = ExecutionPlan(
steps=data["steps"],
tools_hint=data.get("tools_hint", [""] * len(data["steps"])),
rationale=data.get("rationale", ""),
)
if self.verbose:
print(f"\n{'='*50}")
print(f"📋 执行计划(共 {len(plan.steps)} 步)")
print(f"规划逻辑:{plan.rationale}")
print(f"{'='*50}")
for i, (step, hint) in enumerate(zip(plan.steps, plan.tools_hint), 1):
print(f" 步骤{i}:{step}(工具:{hint})")
return plan
def execute_step(
self,
step_description: str,
step_num: int,
tool_hint: str,
context: str,
) -> StepResult:
"""
执行单个步骤
"""
system = f"""你是一个任务执行器。你的工作是执行具体的步骤,
使用合适的工具,并返回执行结果。
已完成的步骤和结果:
{context if context else '(这是第一步)'}
当前步骤:{step_description}
建议工具:{tool_hint}(仅供参考,根据实际情况选择)
执行原则:
- 专注执行当前步骤,不要跑偏
- 如果步骤已经可以从上下文中得到结果,直接使用,不要重复工具调用
- 执行完成后简要说明结果"""
messages = [{
"role": "user",
"content": f"请执行步骤 {step_num}:{step_description}"
}]
# 执行步骤(可能涉及工具调用)
tool_name = ""
tool_input = {}
output = ""
response = client.messages.create(
model=self.executor_model,
max_tokens=2048,
system=system,
tools=self.tools,
messages=messages,
)
# 处理工具调用
if response.stop_reason == "tool_use":
for block in response.content:
if block.type == "tool_use":
tool_name = block.name
tool_input = block.input
result = execute_tool(block.name, block.input)
# 把工具结果发回,让模型给出最终输出
messages.append({"role": "assistant", "content": response.content})
messages.append({
"role": "user",
"content": [{
"type": "tool_result",
"tool_use_id": block.id,
"content": result,
}]
})
# 获取最终输出
final_response = client.messages.create(
model=self.executor_model,
max_tokens=1024,
system=system,
tools=self.tools,
messages=messages,
)
for block in final_response.content:
if hasattr(block, "text"):
output += block.text
else:
# 不需要工具,直接输出
for block in response.content:
if hasattr(block, "text"):
output += block.text
success = not output.startswith("错误") and bool(output)
if self.verbose:
print(f"\n步骤 {step_num}:{step_description}")
if tool_name:
print(f" 🔧 使用工具:{tool_name}")
print(f" {'✅' if success else '❌'} 结果:{output[:200]}...")
return StepResult(
step_num=step_num,
description=step_description,
tool_used=tool_name,
tool_input=tool_input,
output=output,
success=success,
)
def summarize(self, task: str, results: list[StepResult]) -> str:
"""汇总所有步骤的结果,给出最终答案"""
steps_summary = "\n".join(
f"步骤{r.step_num}({r.description}):\n{r.output}"
for r in results
)
response = client.messages.create(
model=self.executor_model,
max_tokens=2048,
messages=[{
"role": "user",
"content": f"""原始任务:{task}
各步骤执行结果:
{steps_summary}
请基于以上执行结果,给出完整的最终答案。
要求:直接回答原始任务的问题,不要重复罗列步骤过程。"""
}]
)
return response.content[0].text
def run(self, task: str) -> tuple[str, ExecutionPlan, list[StepResult]]:
"""
完整执行流程:规划 → 执行 → 汇总
Returns:
(最终答案, 执行计划, 步骤结果列表)
"""
# 阶段1:规划
plan = self.plan(task)
# 阶段2:逐步执行
results = []
context_parts = []
for i, (step, hint) in enumerate(zip(plan.steps, plan.tools_hint), 1):
context = "\n".join(context_parts)
result = self.execute_step(step, i, hint, context)
results.append(result)
context_parts.append(f"步骤{i}({step})的结果:{result.output}")
# 阶段3:汇总
if self.verbose:
print(f"\n{'='*50}")
print("📝 汇总最终答案")
print(f"{'='*50}")
final_answer = self.summarize(task, results)
if self.verbose:
print(f"\n✅ 最终答案:\n{final_answer}")
return final_answer, plan, results
# ── 使用示例 ─────────────────────────────────────
agent = PlanAndExecuteAgent(tools=TOOLS, verbose=True)
answer, plan, results = agent.run(
"帮我分析一下:搜索近期 AI 编程工具的主要产品,"
"整理出3个最受开发者欢迎的工具及其特点,"
"写成一份简报保存到 ai_tools_report.txt"
)
五、两种模式的对比与选型
| 维度 | ReAct | Plan-and-Execute |
|---|---|---|
| 决策方式 | 每步实时决策 | 先全局规划,再逐步执行 |
| 灵活性 | 高,可随时根据结果调整 | 中,执行中调整计划成本高 |
| API 调用次数 | 多(每步一次) | 少(规划+执行+汇总) |
| 可解释性 | 中(步骤多但无全局视图) | 高(计划一目了然) |
| 适合任务类型 | 探索性、需要随时调整方向 | 步骤明确、可预先规划的任务 |
| 错误恢复 | 强(可立即调整) | 弱(需要重新规划) |
| 典型场景 | 调试代码、探索性研究、需要试错的任务 | 报告生成、数据处理流水线、结构化工作流 |
选型决策树:
任务步骤是否可以提前完全确定?
├── 否(探索性、依赖中间结果)→ ReAct
└── 是
├── 步骤数是否超过5步?
│ ├── 是 → Plan-and-Execute(执行效率更高)
│ └── 否 → ReAct 或 Plan-and-Execute 都可以
└── 是否需要向用户展示执行计划?
├── 是 → Plan-and-Execute(计划可视性更好)
└── 否 → 按偏好选择
六、多 Agent 协作:Orchestrator-Worker 模式
对于更复杂的任务,单个 Agent 可能难以胜任。Orchestrator-Worker 模式把任务分给多个专门的 Worker Agent,由 Orchestrator 协调:
from concurrent.futures import ThreadPoolExecutor
class OrchestratorAgent:
"""
编排多个专门化 Worker Agent 完成复杂任务
Orchestrator 负责任务分解和结果整合
Worker 各自专注一个子任务
"""
def __init__(self, workers: dict[str, callable]):
"""
workers: {"工作名称": worker_函数, ...}
例如:{
"research": research_agent.run,
"analysis": analysis_agent.run,
"writing": writing_agent.run,
}
"""
self.workers = workers
def decompose(self, task: str) -> list[dict]:
"""把主任务分解为子任务,分配给不同 Worker"""
worker_list = "\n".join(f"- {name}" for name in self.workers)
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1024,
temperature=0,
messages=[{
"role": "user",
"content": f"""把以下任务分解为子任务,分配给合适的 Worker。
主任务:{task}
可用 Worker:
{worker_list}
以 JSON 返回:
[
{{"worker": "worker名称", "subtask": "具体子任务描述", "depends_on": []}},
{{"worker": "worker名称", "subtask": "具体子任务描述", "depends_on": [0]}},
...
]
depends_on 是必须先完成的子任务序号(从0开始)。
只返回 JSON。"""
}]
)
text = response.content[0].text.strip()
if text.startswith("```"):
text = "\n".join(text.split("\n")[1:-1])
return json.loads(text)
def run(self, task: str) -> str:
"""协调所有 Worker 完成任务"""
subtasks = self.decompose(task)
results = {}
print(f"任务分解为 {len(subtasks)} 个子任务")
# 按依赖顺序执行
for i, subtask in enumerate(subtasks):
deps = subtask.get("depends_on", [])
worker = subtask["worker"]
# 构建包含依赖结果的上下文
context = ""
for dep_idx in deps:
if dep_idx in results:
context += f"前置结果(子任务{dep_idx}):{results[dep_idx]}\n"
full_task = f"{subtask['subtask']}\n\n{context}" if context else subtask["subtask"]
print(f"执行子任务 {i}({worker}):{subtask['subtask'][:60]}...")
if worker in self.workers:
results[i] = self.workers[worker](full_task)
else:
results[i] = f"未找到 Worker:{worker}"
# 汇总所有结果
all_results = "\n\n".join(
f"子任务{i}结果:{result}"
for i, result in results.items()
)
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=2048,
messages=[{
"role": "user",
"content": f"原始任务:{task}\n\n各子任务结果:\n{all_results}\n\n请整合以上结果,给出完整的最终答案。"
}]
)
return response.content[0].text
常见问题
Q:Agent 陷入无限循环怎么防止?
三层保护:一是设置 max_steps 硬上限(代码里已实现);二是在 System Prompt 里明确说明"如果同一个工具用相同参数调用超过2次,停下来重新思考";三是记录工具调用历史,检测重复调用并强制终止。生产环境建议三层都加。
Q:工具调用失败时 Agent 应该怎么处理?
ReAct 天然支持从工具失败中恢复——工具返回错误信息后,模型会在下一步思考如何调整(换参数重试、用其他工具、告知用户)。Plan-and-Execute 在执行阶段失败时,可以触发重新规划(Replanning):把失败信息传给 Planner,生成调整后的计划继续执行。代码中的 max_replans 参数控制最多重新规划几次。
Q:如何估算 Agent 任务的 API 成本?
ReAct 的成本取决于步骤数——每步都是一次 API 调用,而且每步都要携带完整的历史消息,越到后面 token 消耗越多。一个10步的 ReAct 任务,总 token 消耗通常是第1步的15-20倍。Plan-and-Execute 的总 token 更可控,规划+执行+汇总三个阶段相对独立。对于成本敏感的场景,Plan-and-Execute 通常更划算。
总结
ReAct 和 Plan-and-Execute 是两种互补而不是竞争的设计模式。ReAct 的核心优势是灵活——任务路径可以随观察结果实时调整,适合探索性和不确定性高的任务。Plan-and-Execute 的核心优势是高效且可解释——执行前就能看到完整计划,适合结构化、步骤可预先确定的任务。实际项目中,两种模式也可以组合:用 Plan-and-Execute 规划全局,每个步骤内部用 ReAct 处理细节。理解两种模式的适用边界,比死记实现代码更重要。