把 Claude 4.6 接入 Agent 系统处理长时间任务,开发者迟早都会遇到同一个问题:任务跑着跑着,Claude 开始偏离最初的目标。轻则输出风格漂移,重则完全忘记了任务的核心约束,甚至开始重复已经完成的步骤。
这不是 Claude 能力不够,而是大语言模型在超长推理链中的固有特性——上下文越长,早期信息的权重越低,”注意力”自然向近期内容倾斜。一个设计良好的 Agent 架构,应该从根本上解决这个问题,而不是寄希望于模型自己保持一致。
本文由 Claude Ai中文官网 整理,从上下文管理、状态持久化、任务分解到检查点设计,系统说明如何让 Claude 4.6 驱动的 Agent 在超过 30 小时的长任务中保持目标一致性和执行稳定性。
本文面向通过 Anthropic API 构建 Agent 系统的开发者,内容以 Claude 4.6 系列为基础,部分架构模式同样适用于其他长上下文模型。具体 API 参数以 Claude Ai中文官网 最新文档为准。
一、理解问题根源:为什么 Agent 任务会”跑偏”
在设计解决方案之前,先把问题的根源说清楚。Claude 4.6 在 Agent 长任务中出现注意力漂移,通常来自三个独立的机制,需要分别应对:
机制 1:上下文窗口的位置偏差
即使是 200K Token 的上下文窗口,模型对不同位置内容的处理权重也不均等。随着任务执行,早期的任务说明和约束条件被越来越多的中间步骤推向上下文前端,模型对这部分的关注度逐渐降低。当累积的执行历史超过几万 Token 后,最初定义的核心目标开始变得”模糊”。
机制 2:错误积累和方向漂移
Agent 的每一步决策都基于之前所有步骤的输出。如果某个中间步骤引入了细微的偏差,后续步骤会把这个偏差当作事实继续推进,错误会以指数级放大。在 30 小时的任务中,即使每小时只有 0.5% 的漂移,累积效果也会让最终结果面目全非。
机制 3:工具调用结果的噪音积累
Agent 调用外部工具(搜索、数据库查询、API 调用)时,返回的结果会进入上下文。随着任务进行,大量的工具调用结果积累在上下文中,其中很多是已经处理过的中间信息。这些”已完成的工作记录”会稀释当前真正需要关注的信息密度。
二、核心架构:把”记忆”从上下文里独立出来
解决以上三个问题的核心思路只有一个:把 Agent 的状态管理从模型上下文中独立出来,用外部存储承载长期记忆,让每次 Claude 调用保持在一个”干净”的、信息密度高的上下文中。
这个架构模式通常被称为”状态-上下文分离”,是构建长任务 Agent 的基础设计原则。
架构示意
┌─────────────────────────────────────────────────┐
│ Agent 调度层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │
│ │ 任务状态 │ │ 进度跟踪 │ │ 错误处理 │ │
│ └──────────┘ └──────────┘ └──────────────┘ │
└─────────────────────────────────────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────────┐
│ 外部状态存储 │ │ Claude 4.6 调用 │
│ (Redis / DB) │ │ [精简后的上下文] │
│ - 任务定义 │ │ - 核心目标(固定) │
│ - 完成步骤 │ │ - 当前子任务 │
│ - 中间结果 │ │ - 相关历史摘要 │
│ - 检查点 │ │ - 工具调用结果 │
└─────────────────┘ └─────────────────────┘
关键设计原则
- 每次 Claude 调用的上下文长度应该是受控的:无论任务运行了多久,每次调用 Claude 时传入的上下文都应该维持在一个合理的范围内(通常 20K–50K Token),而不是把所有历史都堆进去
- 核心目标永远在上下文最前面:任务定义、核心约束、不可违背的规则,这些内容在每次调用时都应该是上下文的第一个内容块,且通过 Prompt Caching 缓存
- 状态存在外部,摘要进上下文:完整的执行历史存在数据库或 Redis,进入 Claude 上下文的只是经过压缩的摘要
三、实现:上下文管理的具体方法
方法 1:固定核心目标区块 + Prompt Caching
把任务的核心目标和约束条件设计为一个固定的、不随任务进行而变化的内容块,放在系统提示词的最前面,并标记为可缓存:
import anthropic
from dataclasses import dataclass
from typing import Optional
client = anthropic.Anthropic()
@dataclass
class AgentTask:
task_id: str
core_objective: str # 核心目标(不变)
constraints: list[str] # 约束条件(不变)
success_criteria: str # 成功标准(不变)
def build_claude_context(
task: AgentTask,
current_step: str,
progress_summary: str,
relevant_history: str,
current_tool_results: Optional[str] = None
) -> list[dict]:
"""
构建每次调用 Claude 时的上下文。
核心目标区块固定且缓存,其余内容随任务状态动态构建。
"""
constraints_text = "\n".join(
f"- {c}" for c in task.constraints
)
# 系统提示词:核心目标区块(固定,缓存)
system = [
{
"type": "text",
"text": f"""你是一个执行长期任务的 AI Agent。
【核心目标】(在整个任务过程中始终优先遵守)
{task.core_objective}
【不可违背的约束】
{constraints_text}
【成功标准】
{task.success_criteria}
每次响应前,先确认你的行动是否符合核心目标和约束条件。
如果发现当前方向与核心目标偏离,立即说明并修正方向。""",
"cache_control": {"type": "ephemeral"} # 核心目标缓存
}
]
# 动态上下文:进度摘要 + 当前步骤 + 相关历史
user_content = f"""【当前进度摘要】
{progress_summary}
【当前需要执行的步骤】
{current_step}
【相关历史信息】
{relevant_history}
"""
if current_tool_results:
user_content += f"\n【工具调用结果】\n{current_tool_results}"
user_content += "\n\n请基于以上信息执行当前步骤,并说明下一步计划。"
return system, [{"role": "user", "content": user_content}]
方法 2:滚动摘要——压缩历史,保留关键
每隔一定步骤数,对已完成的任务历史做一次压缩摘要,用摘要替代原始执行记录:
import json
from datetime import datetime
class AgentMemoryManager:
"""管理 Agent 的长期记忆,定期压缩历史,防止上下文膨胀"""
def __init__(self, compress_every_n_steps: int = 10):
self.steps: list[dict] = []
self.compressed_summaries: list[str] = []
self.compress_threshold = compress_every_n_steps
def add_step(self, step_description: str, result: str, tools_used: list[str]):
"""记录一个执行步骤"""
self.steps.append({
"timestamp": datetime.now().isoformat(),
"step": step_description,
"result": result,
"tools": tools_used
})
# 达到压缩阈值时,自动压缩旧步骤
if len(self.steps) >= self.compress_threshold:
self._compress_old_steps()
def _compress_old_steps(self):
"""调用 Claude 对旧步骤做摘要压缩"""
steps_to_compress = self.steps[:self.compress_threshold]
steps_text = "\n".join([
f"步骤 {i+1}:{s['step']}\n结果:{s['result']}"
for i, s in enumerate(steps_to_compress)
])
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=500,
messages=[{
"role": "user",
"content": f"""请将以下执行步骤压缩为简洁摘要。
保留:已完成的关键操作、重要发现、需要后续步骤参考的信息。
去除:过程细节、工具调用的原始输出、已经不再需要的中间状态。
摘要不超过 200 字。
{steps_text}"""
}]
)
summary = response.content[0].text
self.compressed_summaries.append(summary)
# 保留最近的步骤,清除已压缩的旧步骤
self.steps = self.steps[self.compress_threshold:]
def get_context_for_claude(self, max_recent_steps: int = 5) -> str:
"""获取适合放入 Claude 上下文的进度信息"""
parts = []
# 加入压缩摘要(通常只需要最近 2-3 个摘要)
if self.compressed_summaries:
recent_summaries = self.compressed_summaries[-3:]
parts.append("【历史进度摘要】\n" + "\n\n".join(recent_summaries))
# 加入最近几步的详细记录
if self.steps:
recent = self.steps[-max_recent_steps:]
recent_text = "\n".join([
f"- {s['step']}:{s['result'][:100]}..."
for s in recent
])
parts.append(f"【最近执行步骤】\n{recent_text}")
return "\n\n".join(parts)
方法 3:目标锚定检查——定期强制对齐
每隔固定的步骤数,强制插入一次”目标对齐检查”,让 Claude 显式确认当前方向是否符合核心目标:
def goal_alignment_check(
task: AgentTask,
current_direction: str,
completed_steps_summary: str,
check_interval: int = 15
) -> dict:
"""
定期执行目标对齐检查。
让 Claude 显式评估当前执行方向是否偏离核心目标。
"""
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=800,
system=f"""你是一个严格的任务监督者。
你的职责是检查 Agent 的执行方向是否偏离了核心目标。
核心目标:{task.core_objective}
约束条件:{chr(10).join(task.constraints)}
成功标准:{task.success_criteria}""",
messages=[{
"role": "user",
"content": f"""请评估以下执行情况:
当前执行方向:
{current_direction}
已完成工作摘要:
{completed_steps_summary}
请回答:
1. 当前方向是否与核心目标一致?(是/否/部分一致)
2. 如果有偏差,具体偏差在哪里?
3. 建议的修正方向是什么?(如果需要修正)
4. 继续当前方向是否有风险?(高/中/低)
请用 JSON 格式返回,键名为:aligned、deviation、correction、risk_level"""
}]
)
try:
result = json.loads(response.content[0].text)
return result
except json.JSONDecodeError:
# 如果 JSON 解析失败,返回保守的结果触发人工检查
return {
"aligned": False,
"deviation": "无法解析对齐检查结果",
"correction": "建议人工检查",
"risk_level": "高"
}
四、任务分解:把 30 小时任务变成可管理的子任务链
另一个根本性的应对策略是从任务设计层面解决问题:不要把 30 小时的任务当作一个整体交给 Agent,而是在任务开始时就把它分解成有明确边界的子任务,每个子任务独立执行、独立验证、独立存档。
任务分解的原则
- 每个子任务有明确的完成标准:不是”做这件事”,而是”做这件事,完成的判断标准是X”
- 子任务之间有明确的依赖关系:哪些必须按顺序,哪些可以并行
- 每个子任务完成后立即验证:在进入下一个子任务之前,确认当前子任务的输出符合预期
- 子任务结果持久化:每个子任务的输出存储到外部系统,不依赖上下文保存
import sqlite3
from enum import Enum
from typing import Callable
class SubtaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
NEEDS_REVIEW = "needs_review"
class LongTaskOrchestrator:
"""
长任务编排器:把大任务分解为子任务链,
每个子任务独立执行和验证。
"""
def __init__(self, task: AgentTask, db_path: str = "agent_state.db"):
self.task = task
self.db_path = db_path
self.memory_manager = AgentMemoryManager(compress_every_n_steps=10)
self._init_db()
def _init_db(self):
"""初始化状态存储数据库"""
conn = sqlite3.connect(self.db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS subtasks (
id INTEGER PRIMARY KEY,
task_id TEXT,
sequence INTEGER,
description TEXT,
success_criteria TEXT,
status TEXT DEFAULT 'pending',
result TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS checkpoints (
id INTEGER PRIMARY KEY,
task_id TEXT,
checkpoint_name TEXT,
state_snapshot TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
conn.close()
def decompose_task(self) -> list[dict]:
"""让 Claude 把大任务分解为子任务清单"""
response = client.messages.create(
model="claude-opus-4-6", # 任务分解用最强模型
max_tokens=3000,
messages=[{
"role": "user",
"content": f"""请将以下长期任务分解为具体的子任务清单。
核心目标:{self.task.core_objective}
约束条件:{chr(10).join(self.task.constraints)}
成功标准:{self.task.success_criteria}
分解要求:
1. 每个子任务应该在 1-3 小时内可以完成
2. 每个子任务有明确、可验证的完成标准
3. 标注子任务之间的依赖关系
4. 按执行顺序排列
请用 JSON 数组返回,每个元素包含:
- sequence: 执行顺序编号
- description: 子任务描述
- success_criteria: 完成标准(具体可验证)
- depends_on: 依赖的子任务编号列表(空列表表示无依赖)
- estimated_hours: 预计执行时间"""
}]
)
subtasks = json.loads(response.content[0].text)
# 持久化子任务清单
conn = sqlite3.connect(self.db_path)
for subtask in subtasks:
conn.execute(
"INSERT INTO subtasks (task_id, sequence, description, success_criteria) VALUES (?, ?, ?, ?)",
(self.task.task_id, subtask["sequence"],
subtask["description"], subtask["success_criteria"])
)
conn.commit()
conn.close()
return subtasks
def execute_subtask(
self,
subtask: dict,
validation_fn: Optional[Callable] = None
) -> bool:
"""执行单个子任务并验证结果"""
print(f"\n开始执行子任务 {subtask['sequence']}: {subtask['description']}")
# 更新状态为运行中
self._update_subtask_status(subtask["sequence"], SubtaskStatus.RUNNING)
# 构建此子任务的上下文
progress_summary = self.memory_manager.get_context_for_claude()
system, messages = build_claude_context(
task=self.task,
current_step=f"子任务 {subtask['sequence']}: {subtask['description']}\n完成标准: {subtask['success_criteria']}",
progress_summary=progress_summary,
relevant_history=self._get_relevant_completed_results(subtask)
)
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=4096,
system=system,
messages=messages
)
result = response.content[0].text
# 验证结果
is_valid = True
if validation_fn:
is_valid = validation_fn(result, subtask["success_criteria"])
if is_valid:
# 持久化结果
self._save_subtask_result(subtask["sequence"], result)
self.memory_manager.add_step(
subtask["description"],
result[:200], # 只记录摘要
tools_used=[]
)
print(f"子任务 {subtask['sequence']} 完成")
# 每完成 5 个子任务,做一次目标对齐检查
if subtask["sequence"] % 5 == 0:
self._run_alignment_check()
return True
else:
self._update_subtask_status(subtask["sequence"], SubtaskStatus.NEEDS_REVIEW)
print(f"子任务 {subtask['sequence']} 需要人工审查")
return False
def _run_alignment_check(self):
"""运行目标对齐检查"""
current_direction = self.memory_manager.get_context_for_claude(max_recent_steps=3)
completed_summary = "\n".join(self.memory_manager.compressed_summaries[-2:])
result = goal_alignment_check(
self.task, current_direction, completed_summary
)
if result.get("risk_level") == "高" or not result.get("aligned"):
print(f"⚠️ 对齐检查发现风险!偏差:{result.get('deviation')}")
print(f"建议修正:{result.get('correction')}")
# 在实际系统中,这里应该触发告警或暂停任务等待人工干预
def _get_relevant_completed_results(self, current_subtask: dict) -> str:
"""获取与当前子任务相关的已完成子任务结果"""
depends_on = current_subtask.get("depends_on", [])
if not depends_on:
return "(无前置依赖)"
conn = sqlite3.connect(self.db_path)
results = []
for seq in depends_on:
row = conn.execute(
"SELECT description, result FROM subtasks WHERE task_id=? AND sequence=?",
(self.task.task_id, seq)
).fetchone()
if row:
results.append(f"子任务 {seq}({row[0]})的结果:\n{row[1][:300]}...")
conn.close()
return "\n\n".join(results) if results else "(前置子任务结果未找到)"
def _update_subtask_status(self, sequence: int, status: SubtaskStatus):
conn = sqlite3.connect(self.db_path)
conn.execute(
"UPDATE subtasks SET status=? WHERE task_id=? AND sequence=?",
(status.value, self.task.task_id, sequence)
)
conn.commit()
conn.close()
def _save_subtask_result(self, sequence: int, result: str):
conn = sqlite3.connect(self.db_path)
conn.execute(
"UPDATE subtasks SET status='completed', result=?, completed_at=CURRENT_TIMESTAMP WHERE task_id=? AND sequence=?",
(result, self.task.task_id, sequence)
)
conn.commit()
conn.close()
五、检查点设计:任务可恢复是基本要求
30 小时的任务,任何环节都可能出现网络中断、服务异常、或需要人工干预的情况。一个没有恢复机制的 Agent,遇到意外就意味着从头开始——这在生产环境中是不可接受的。
检查点的三个层次
- 子任务级检查点:每个子任务完成后自动创建,包含该子任务的完整结果和执行状态
- 阶段级检查点:每完成一个大的任务阶段(通常由多个子任务组成),创建包含当前整体状态的快照
- 时间级检查点:无论任务进度如何,每隔固定时间(如每小时)自动保存一次状态
import pickle
import hashlib
from pathlib import Path
class CheckpointManager:
"""管理 Agent 任务的检查点,支持任意位置恢复"""
def __init__(self, task_id: str, checkpoint_dir: str = "./checkpoints"):
self.task_id = task_id
self.checkpoint_dir = Path(checkpoint_dir) / task_id
self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
def save_checkpoint(
self,
checkpoint_name: str,
state: dict,
metadata: Optional[dict] = None
) -> str:
"""保存检查点,返回检查点 ID"""
checkpoint_data = {
"task_id": self.task_id,
"checkpoint_name": checkpoint_name,
"timestamp": datetime.now().isoformat(),
"state": state,
"metadata": metadata or {}
}
# 生成唯一的检查点 ID
checkpoint_id = hashlib.md5(
f"{self.task_id}-{checkpoint_name}-{datetime.now().isoformat()}".encode()
).hexdigest()[:8]
checkpoint_path = self.checkpoint_dir / f"{checkpoint_id}-{checkpoint_name}.pkl"
with open(checkpoint_path, 'wb') as f:
pickle.dump(checkpoint_data, f)
print(f"检查点已保存:{checkpoint_name} ({checkpoint_id})")
return checkpoint_id
def load_checkpoint(self, checkpoint_id: str) -> Optional[dict]:
"""从检查点恢复状态"""
matching = list(self.checkpoint_dir.glob(f"{checkpoint_id}-*.pkl"))
if not matching:
print(f"检查点不存在:{checkpoint_id}")
return None
with open(matching[0], 'rb') as f:
checkpoint_data = pickle.load(f)
print(f"从检查点恢复:{checkpoint_data['checkpoint_name']} "
f"({checkpoint_data['timestamp']})")
return checkpoint_data
def list_checkpoints(self) -> list[dict]:
"""列出所有可用的检查点"""
checkpoints = []
for path in sorted(self.checkpoint_dir.glob("*.pkl")):
with open(path, 'rb') as f:
data = pickle.load(f)
checkpoints.append({
"id": path.stem.split('-')[0],
"name": data["checkpoint_name"],
"timestamp": data["timestamp"],
"metadata": data.get("metadata", {})
})
return checkpoints
def resume_from_latest(self) -> Optional[dict]:
"""从最新的检查点恢复(任务中断后自动恢复)"""
checkpoints = self.list_checkpoints()
if not checkpoints:
return None
latest = max(checkpoints, key=lambda x: x["timestamp"])
return self.load_checkpoint(latest["id"])
六、人工监督接口:长任务不能完全自动化
30 小时的完全自动化任务听起来很诱人,但在实际生产中,没有人工监督接口的 Agent 是一个风险极高的设计。以下是几个必须要有的监督机制:
异常触发的暂停机制
class AgentSupervisionGate:
"""
Agent 执行过程中的人工监督门控。
在满足以下条件时,自动暂停任务等待人工确认:
- 对齐检查发现高风险偏差
- 连续失败超过阈值
- 涉及不可逆操作(删除、发布等)
- 置信度低于阈值
"""
def __init__(
self,
max_consecutive_failures: int = 3,
alignment_risk_threshold: str = "高",
notification_fn: Optional[Callable] = None
):
self.max_failures = max_consecutive_failures
self.risk_threshold = alignment_risk_threshold
self.notify = notification_fn or print
self.consecutive_failures = 0
self.paused = False
def check_and_maybe_pause(
self,
subtask_result: dict,
alignment_result: dict,
is_irreversible_operation: bool = False
) -> bool:
"""
检查是否需要暂停。
返回 True 表示可以继续,False 表示需要暂停。
"""
# 连续失败检查
if not subtask_result.get("success"):
self.consecutive_failures += 1
if self.consecutive_failures >= self.max_failures:
self.paused = True
self.notify(
f"⚠️ 任务暂停:连续失败 {self.consecutive_failures} 次,需要人工介入"
)
return False
else:
self.consecutive_failures = 0
# 对齐风险检查
if alignment_result.get("risk_level") == self.risk_threshold:
self.paused = True
self.notify(
f"⚠️ 任务暂停:检测到高风险偏差\n"
f"偏差:{alignment_result.get('deviation')}\n"
f"建议修正:{alignment_result.get('correction')}"
)
return False
# 不可逆操作检查
if is_irreversible_operation:
self.paused = True
self.notify("⚠️ 任务暂停:即将执行不可逆操作,需要人工确认")
return False
return True
def resume(self, human_instruction: Optional[str] = None):
"""人工确认后恢复任务"""
self.paused = False
self.consecutive_failures = 0
if human_instruction:
print(f"收到人工指令:{human_instruction}")
print("任务已恢复")
实时进度报告
每完成一定数量的子任务,生成一份人类可读的进度报告,发送给任务负责人:
def generate_progress_report(
task: AgentTask,
orchestrator: LongTaskOrchestrator,
elapsed_hours: float
) -> str:
"""生成人类可读的任务进度报告"""
conn = sqlite3.connect(orchestrator.db_path)
total = conn.execute(
"SELECT COUNT(*) FROM subtasks WHERE task_id=?",
(task.task_id,)
).fetchone()[0]
completed = conn.execute(
"SELECT COUNT(*) FROM subtasks WHERE task_id=? AND status='completed'",
(task.task_id,)
).fetchone()[0]
failed = conn.execute(
"SELECT COUNT(*) FROM subtasks WHERE task_id=? AND status IN ('failed', 'needs_review')",
(task.task_id,)
).fetchone()[0]
conn.close()
progress_pct = (completed / total * 100) if total > 0 else 0
estimated_remaining = (elapsed_hours / completed * (total - completed)) if completed > 0 else 0
report = f"""
=== 任务进度报告 ===
任务:{task.task_id}
核心目标:{task.core_objective[:100]}...
进度:{completed}/{total} 子任务完成({progress_pct:.1f}%)
失败/待审:{failed} 个子任务
已运行时间:{elapsed_hours:.1f} 小时
预计剩余时间:{estimated_remaining:.1f} 小时
最近完成的工作:
{orchestrator.memory_manager.get_context_for_claude(max_recent_steps=3)}
"""
return report
七、实际运行:把所有组件组合起来
import time
from datetime import datetime
def run_long_task(
task: AgentTask,
alignment_check_interval: int = 5,
progress_report_interval: int = 10
):
"""
执行长任务的主循环。
集成了子任务分解、状态管理、检查点、对齐检查和人工监督。
"""
orchestrator = LongTaskOrchestrator(task, db_path=f"{task.task_id}_state.db")
checkpoint_mgr = CheckpointManager(task.task_id)
supervision = AgentSupervisionGate(
max_consecutive_failures=3,
notification_fn=lambda msg: print(f"\n{'='*50}\n{msg}\n{'='*50}\n")
)
start_time = datetime.now()
# 尝试从检查点恢复
latest_checkpoint = checkpoint_mgr.resume_from_latest()
if latest_checkpoint:
print(f"从检查点恢复:{latest_checkpoint['checkpoint_name']}")
# 在实际系统中,这里需要恢复 orchestrator 的状态
else:
print("开始新任务,分解子任务...")
subtasks = orchestrator.decompose_task()
print(f"任务已分解为 {len(subtasks)} 个子任务")
# 保存初始检查点
checkpoint_mgr.save_checkpoint(
"task_decomposed",
{"subtasks": subtasks, "task": task.__dict__}
)
# 获取待执行的子任务
conn = sqlite3.connect(orchestrator.db_path)
pending_subtasks = conn.execute(
"SELECT sequence, description, success_criteria FROM subtasks "
"WHERE task_id=? AND status='pending' ORDER BY sequence",
(task.task_id,)
).fetchall()
conn.close()
# 主执行循环
for i, (seq, description, criteria) in enumerate(pending_subtasks):
# 检查是否需要暂停
if supervision.paused:
print("任务已暂停,等待人工干预...")
input("按 Enter 键继续...")
supervision.resume()
subtask = {
"sequence": seq,
"description": description,
"success_criteria": criteria,
"depends_on": []
}
# 执行子任务
success = orchestrator.execute_subtask(subtask)
# 检查是否继续
elapsed = (datetime.now() - start_time).total_seconds() / 3600
alignment = {"risk_level": "低", "aligned": True} # 从对齐检查获取
if not supervision.check_and_maybe_pause(
{"success": success},
alignment
):
print(f"任务在第 {seq} 个子任务后暂停")
break
# 定期保存检查点
if seq % 5 == 0:
checkpoint_mgr.save_checkpoint(
f"after_subtask_{seq}",
{"completed_up_to": seq, "elapsed_hours": elapsed}
)
# 定期生成进度报告
if seq % progress_report_interval == 0:
report = generate_progress_report(task, orchestrator, elapsed)
print(report)
time.sleep(1) # 避免触发速率限制
elapsed_total = (datetime.now() - start_time).total_seconds() / 3600
print(f"\n任务执行完成。总耗时:{elapsed_total:.1f} 小时")
# 使用示例
if __name__ == "__main__":
task = AgentTask(
task_id="market-research-2026",
core_objective="对 2026 年中国新能源汽车市场进行全面调研,产出包含竞品分析、用户需求和技术趋势的结构化报告",
constraints=[
"所有数据必须来自公开可靠来源",
"不得包含任何推测性内容,所有结论需有数据支撑",
"报告语言为中文,目标读者是决策层管理者",
"不涉及任何公司内部保密信息"
],
success_criteria="完整的市场报告,包含:市场规模数据、主要竞品分析(至少 5 家)、用户需求调研结论、未来 2 年技术趋势预测"
)
run_long_task(task)
八、总结:让 Agent 保持注意力的核心原则
把所有策略浓缩成几条可以立即落地的原则:
- 核心目标永远在最前面:每次调用 Claude 时,核心目标和约束条件必须是上下文的第一个内容块,通过 Prompt Caching 缓存,确保它在整个任务过程中权重最高
- 状态存在外部,摘要进上下文:完整执行历史存数据库,进入 Claude 上下文的只是压缩摘要,控制每次调用的 Token 消耗
- 定期强制对齐检查:不要期望模型自己发现偏差,主动每隔固定步骤进行一次显式的目标对齐验证
- 子任务化是根本解决方案:把大任务分解成独立可验证的子任务,每个子任务完成后立即验证并持久化,降低错误积累的风险
- 检查点是基本要求:任何超过 1 小时的 Agent 任务,都必须有完整的检查点机制,支持从任意位置恢复
- 保留人工干预接口:在关键决策点、不可逆操作前、对齐检查发现风险时,自动暂停等待人工确认
这些原则不是孤立的技巧,而是构成一套完整的长任务 Agent 稳定性架构。实现其中任何一个都有帮助,但只有组合使用,才能真正让 Claude 4.6 在 30 小时以上的复杂任务中保持可靠的执行质量。
更多关于 Claude API 的 Agent 开发最佳实践和最新功能说明,欢迎访问 Claude Ai中文官网 查阅持续更新的中文开发者文档。
Agent 的稳定性不是靠模型能力撑起来的,而是靠架构设计撑起来的。把状态管理、任务分解、对齐检查做好,Claude 4.6 就能成为你可以信赖的长任务执行引擎。