把 Claude 4.6 接入 Agent 系统处理长时间任务,开发者迟早都会遇到同一个问题:任务跑着跑着,Claude 开始偏离最初的目标。轻则输出风格漂移,重则完全忘记了任务的核心约束,甚至开始重复已经完成的步骤。

这不是 Claude 能力不够,而是大语言模型在超长推理链中的固有特性——上下文越长,早期信息的权重越低,”注意力”自然向近期内容倾斜。一个设计良好的 Agent 架构,应该从根本上解决这个问题,而不是寄希望于模型自己保持一致。

本文由 Claude Ai中文官网 整理,从上下文管理、状态持久化、任务分解到检查点设计,系统说明如何让 Claude 4.6 驱动的 Agent 在超过 30 小时的长任务中保持目标一致性和执行稳定性。

本文面向通过 Anthropic API 构建 Agent 系统的开发者,内容以 Claude 4.6 系列为基础,部分架构模式同样适用于其他长上下文模型。具体 API 参数以 Claude Ai中文官网 最新文档为准。

一、理解问题根源:为什么 Agent 任务会”跑偏”

在设计解决方案之前,先把问题的根源说清楚。Claude 4.6 在 Agent 长任务中出现注意力漂移,通常来自三个独立的机制,需要分别应对:

机制 1:上下文窗口的位置偏差

即使是 200K Token 的上下文窗口,模型对不同位置内容的处理权重也不均等。随着任务执行,早期的任务说明和约束条件被越来越多的中间步骤推向上下文前端,模型对这部分的关注度逐渐降低。当累积的执行历史超过几万 Token 后,最初定义的核心目标开始变得”模糊”。

机制 2:错误积累和方向漂移

Agent 的每一步决策都基于之前所有步骤的输出。如果某个中间步骤引入了细微的偏差,后续步骤会把这个偏差当作事实继续推进,错误会以指数级放大。在 30 小时的任务中,即使每小时只有 0.5% 的漂移,累积效果也会让最终结果面目全非。

机制 3:工具调用结果的噪音积累

Agent 调用外部工具(搜索、数据库查询、API 调用)时,返回的结果会进入上下文。随着任务进行,大量的工具调用结果积累在上下文中,其中很多是已经处理过的中间信息。这些”已完成的工作记录”会稀释当前真正需要关注的信息密度。

二、核心架构:把”记忆”从上下文里独立出来

解决以上三个问题的核心思路只有一个:把 Agent 的状态管理从模型上下文中独立出来,用外部存储承载长期记忆,让每次 Claude 调用保持在一个”干净”的、信息密度高的上下文中。

这个架构模式通常被称为”状态-上下文分离”,是构建长任务 Agent 的基础设计原则。

架构示意

┌─────────────────────────────────────────────────┐
│                  Agent 调度层                     │
│  ┌──────────┐  ┌──────────┐  ┌──────────────┐   │
│  │ 任务状态  │  │ 进度跟踪  │  │  错误处理    │   │
│  └──────────┘  └──────────┘  └──────────────┘   │
└─────────────────────────────────────────────────┘
         │                          │
         ▼                          ▼
┌─────────────────┐      ┌─────────────────────┐
│   外部状态存储   │      │   Claude 4.6 调用    │
│  (Redis / DB)   │      │  [精简后的上下文]    │
│  - 任务定义      │      │  - 核心目标(固定)  │
│  - 完成步骤      │      │  - 当前子任务        │
│  - 中间结果      │      │  - 相关历史摘要      │
│  - 检查点        │      │  - 工具调用结果      │
└─────────────────┘      └─────────────────────┘

关键设计原则

  • 每次 Claude 调用的上下文长度应该是受控的:无论任务运行了多久,每次调用 Claude 时传入的上下文都应该维持在一个合理的范围内(通常 20K–50K Token),而不是把所有历史都堆进去
  • 核心目标永远在上下文最前面:任务定义、核心约束、不可违背的规则,这些内容在每次调用时都应该是上下文的第一个内容块,且通过 Prompt Caching 缓存
  • 状态存在外部,摘要进上下文:完整的执行历史存在数据库或 Redis,进入 Claude 上下文的只是经过压缩的摘要

三、实现:上下文管理的具体方法

方法 1:固定核心目标区块 + Prompt Caching

把任务的核心目标和约束条件设计为一个固定的、不随任务进行而变化的内容块,放在系统提示词的最前面,并标记为可缓存:

import anthropic
from dataclasses import dataclass
from typing import Optional

client = anthropic.Anthropic()

@dataclass
class AgentTask:
    task_id: str
    core_objective: str       # 核心目标(不变)
    constraints: list[str]    # 约束条件(不变)
    success_criteria: str     # 成功标准(不变)

def build_claude_context(
    task: AgentTask,
    current_step: str,
    progress_summary: str,
    relevant_history: str,
    current_tool_results: Optional[str] = None
) -> list[dict]:
    """
    构建每次调用 Claude 时的上下文。
    核心目标区块固定且缓存,其余内容随任务状态动态构建。
    """

    constraints_text = "\n".join(
        f"- {c}" for c in task.constraints
    )

    # 系统提示词:核心目标区块(固定,缓存)
    system = [
        {
            "type": "text",
            "text": f"""你是一个执行长期任务的 AI Agent。

【核心目标】(在整个任务过程中始终优先遵守)
{task.core_objective}

【不可违背的约束】
{constraints_text}

【成功标准】
{task.success_criteria}

每次响应前,先确认你的行动是否符合核心目标和约束条件。
如果发现当前方向与核心目标偏离,立即说明并修正方向。""",
            "cache_control": {"type": "ephemeral"}  # 核心目标缓存
        }
    ]

    # 动态上下文:进度摘要 + 当前步骤 + 相关历史
    user_content = f"""【当前进度摘要】
{progress_summary}

【当前需要执行的步骤】
{current_step}

【相关历史信息】
{relevant_history}
"""

    if current_tool_results:
        user_content += f"\n【工具调用结果】\n{current_tool_results}"

    user_content += "\n\n请基于以上信息执行当前步骤,并说明下一步计划。"

    return system, [{"role": "user", "content": user_content}]

方法 2:滚动摘要——压缩历史,保留关键

每隔一定步骤数,对已完成的任务历史做一次压缩摘要,用摘要替代原始执行记录:

import json
from datetime import datetime

class AgentMemoryManager:
    """管理 Agent 的长期记忆,定期压缩历史,防止上下文膨胀"""

    def __init__(self, compress_every_n_steps: int = 10):
        self.steps: list[dict] = []
        self.compressed_summaries: list[str] = []
        self.compress_threshold = compress_every_n_steps

    def add_step(self, step_description: str, result: str, tools_used: list[str]):
        """记录一个执行步骤"""
        self.steps.append({
            "timestamp": datetime.now().isoformat(),
            "step": step_description,
            "result": result,
            "tools": tools_used
        })

        # 达到压缩阈值时,自动压缩旧步骤
        if len(self.steps) >= self.compress_threshold:
            self._compress_old_steps()

    def _compress_old_steps(self):
        """调用 Claude 对旧步骤做摘要压缩"""
        steps_to_compress = self.steps[:self.compress_threshold]
        steps_text = "\n".join([
            f"步骤 {i+1}:{s['step']}\n结果:{s['result']}"
            for i, s in enumerate(steps_to_compress)
        ])

        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=500,
            messages=[{
                "role": "user",
                "content": f"""请将以下执行步骤压缩为简洁摘要。
保留:已完成的关键操作、重要发现、需要后续步骤参考的信息。
去除:过程细节、工具调用的原始输出、已经不再需要的中间状态。
摘要不超过 200 字。

{steps_text}"""
            }]
        )

        summary = response.content[0].text
        self.compressed_summaries.append(summary)

        # 保留最近的步骤,清除已压缩的旧步骤
        self.steps = self.steps[self.compress_threshold:]

    def get_context_for_claude(self, max_recent_steps: int = 5) -> str:
        """获取适合放入 Claude 上下文的进度信息"""
        parts = []

        # 加入压缩摘要(通常只需要最近 2-3 个摘要)
        if self.compressed_summaries:
            recent_summaries = self.compressed_summaries[-3:]
            parts.append("【历史进度摘要】\n" + "\n\n".join(recent_summaries))

        # 加入最近几步的详细记录
        if self.steps:
            recent = self.steps[-max_recent_steps:]
            recent_text = "\n".join([
                f"- {s['step']}:{s['result'][:100]}..."
                for s in recent
            ])
            parts.append(f"【最近执行步骤】\n{recent_text}")

        return "\n\n".join(parts)

方法 3:目标锚定检查——定期强制对齐

每隔固定的步骤数,强制插入一次”目标对齐检查”,让 Claude 显式确认当前方向是否符合核心目标:

def goal_alignment_check(
    task: AgentTask,
    current_direction: str,
    completed_steps_summary: str,
    check_interval: int = 15
) -> dict:
    """
    定期执行目标对齐检查。
    让 Claude 显式评估当前执行方向是否偏离核心目标。
    """

    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=800,
        system=f"""你是一个严格的任务监督者。
你的职责是检查 Agent 的执行方向是否偏离了核心目标。

核心目标:{task.core_objective}
约束条件:{chr(10).join(task.constraints)}
成功标准:{task.success_criteria}""",
        messages=[{
            "role": "user",
            "content": f"""请评估以下执行情况:

当前执行方向:
{current_direction}

已完成工作摘要:
{completed_steps_summary}

请回答:
1. 当前方向是否与核心目标一致?(是/否/部分一致)
2. 如果有偏差,具体偏差在哪里?
3. 建议的修正方向是什么?(如果需要修正)
4. 继续当前方向是否有风险?(高/中/低)

请用 JSON 格式返回,键名为:aligned、deviation、correction、risk_level"""
        }]
    )

    try:
        result = json.loads(response.content[0].text)
        return result
    except json.JSONDecodeError:
        # 如果 JSON 解析失败,返回保守的结果触发人工检查
        return {
            "aligned": False,
            "deviation": "无法解析对齐检查结果",
            "correction": "建议人工检查",
            "risk_level": "高"
        }

四、任务分解:把 30 小时任务变成可管理的子任务链

另一个根本性的应对策略是从任务设计层面解决问题:不要把 30 小时的任务当作一个整体交给 Agent,而是在任务开始时就把它分解成有明确边界的子任务,每个子任务独立执行、独立验证、独立存档。

任务分解的原则

  • 每个子任务有明确的完成标准:不是”做这件事”,而是”做这件事,完成的判断标准是X”
  • 子任务之间有明确的依赖关系:哪些必须按顺序,哪些可以并行
  • 每个子任务完成后立即验证:在进入下一个子任务之前,确认当前子任务的输出符合预期
  • 子任务结果持久化:每个子任务的输出存储到外部系统,不依赖上下文保存
import sqlite3
from enum import Enum
from typing import Callable

class SubtaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    NEEDS_REVIEW = "needs_review"

class LongTaskOrchestrator:
    """
    长任务编排器:把大任务分解为子任务链,
    每个子任务独立执行和验证。
    """

    def __init__(self, task: AgentTask, db_path: str = "agent_state.db"):
        self.task = task
        self.db_path = db_path
        self.memory_manager = AgentMemoryManager(compress_every_n_steps=10)
        self._init_db()

    def _init_db(self):
        """初始化状态存储数据库"""
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS subtasks (
                id INTEGER PRIMARY KEY,
                task_id TEXT,
                sequence INTEGER,
                description TEXT,
                success_criteria TEXT,
                status TEXT DEFAULT 'pending',
                result TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                completed_at TIMESTAMP
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS checkpoints (
                id INTEGER PRIMARY KEY,
                task_id TEXT,
                checkpoint_name TEXT,
                state_snapshot TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.commit()
        conn.close()

    def decompose_task(self) -> list[dict]:
        """让 Claude 把大任务分解为子任务清单"""
        response = client.messages.create(
            model="claude-opus-4-6",  # 任务分解用最强模型
            max_tokens=3000,
            messages=[{
                "role": "user",
                "content": f"""请将以下长期任务分解为具体的子任务清单。

核心目标:{self.task.core_objective}
约束条件:{chr(10).join(self.task.constraints)}
成功标准:{self.task.success_criteria}

分解要求:
1. 每个子任务应该在 1-3 小时内可以完成
2. 每个子任务有明确、可验证的完成标准
3. 标注子任务之间的依赖关系
4. 按执行顺序排列

请用 JSON 数组返回,每个元素包含:
- sequence: 执行顺序编号
- description: 子任务描述
- success_criteria: 完成标准(具体可验证)
- depends_on: 依赖的子任务编号列表(空列表表示无依赖)
- estimated_hours: 预计执行时间"""
            }]
        )

        subtasks = json.loads(response.content[0].text)

        # 持久化子任务清单
        conn = sqlite3.connect(self.db_path)
        for subtask in subtasks:
            conn.execute(
                "INSERT INTO subtasks (task_id, sequence, description, success_criteria) VALUES (?, ?, ?, ?)",
                (self.task.task_id, subtask["sequence"],
                 subtask["description"], subtask["success_criteria"])
            )
        conn.commit()
        conn.close()

        return subtasks

    def execute_subtask(
        self,
        subtask: dict,
        validation_fn: Optional[Callable] = None
    ) -> bool:
        """执行单个子任务并验证结果"""

        print(f"\n开始执行子任务 {subtask['sequence']}: {subtask['description']}")

        # 更新状态为运行中
        self._update_subtask_status(subtask["sequence"], SubtaskStatus.RUNNING)

        # 构建此子任务的上下文
        progress_summary = self.memory_manager.get_context_for_claude()

        system, messages = build_claude_context(
            task=self.task,
            current_step=f"子任务 {subtask['sequence']}: {subtask['description']}\n完成标准: {subtask['success_criteria']}",
            progress_summary=progress_summary,
            relevant_history=self._get_relevant_completed_results(subtask)
        )

        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=4096,
            system=system,
            messages=messages
        )

        result = response.content[0].text

        # 验证结果
        is_valid = True
        if validation_fn:
            is_valid = validation_fn(result, subtask["success_criteria"])

        if is_valid:
            # 持久化结果
            self._save_subtask_result(subtask["sequence"], result)
            self.memory_manager.add_step(
                subtask["description"],
                result[:200],  # 只记录摘要
                tools_used=[]
            )
            print(f"子任务 {subtask['sequence']} 完成")

            # 每完成 5 个子任务,做一次目标对齐检查
            if subtask["sequence"] % 5 == 0:
                self._run_alignment_check()

            return True
        else:
            self._update_subtask_status(subtask["sequence"], SubtaskStatus.NEEDS_REVIEW)
            print(f"子任务 {subtask['sequence']} 需要人工审查")
            return False

    def _run_alignment_check(self):
        """运行目标对齐检查"""
        current_direction = self.memory_manager.get_context_for_claude(max_recent_steps=3)
        completed_summary = "\n".join(self.memory_manager.compressed_summaries[-2:])

        result = goal_alignment_check(
            self.task, current_direction, completed_summary
        )

        if result.get("risk_level") == "高" or not result.get("aligned"):
            print(f"⚠️ 对齐检查发现风险!偏差:{result.get('deviation')}")
            print(f"建议修正:{result.get('correction')}")
            # 在实际系统中,这里应该触发告警或暂停任务等待人工干预

    def _get_relevant_completed_results(self, current_subtask: dict) -> str:
        """获取与当前子任务相关的已完成子任务结果"""
        depends_on = current_subtask.get("depends_on", [])
        if not depends_on:
            return "(无前置依赖)"

        conn = sqlite3.connect(self.db_path)
        results = []
        for seq in depends_on:
            row = conn.execute(
                "SELECT description, result FROM subtasks WHERE task_id=? AND sequence=?",
                (self.task.task_id, seq)
            ).fetchone()
            if row:
                results.append(f"子任务 {seq}({row[0]})的结果:\n{row[1][:300]}...")
        conn.close()

        return "\n\n".join(results) if results else "(前置子任务结果未找到)"

    def _update_subtask_status(self, sequence: int, status: SubtaskStatus):
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            "UPDATE subtasks SET status=? WHERE task_id=? AND sequence=?",
            (status.value, self.task.task_id, sequence)
        )
        conn.commit()
        conn.close()

    def _save_subtask_result(self, sequence: int, result: str):
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            "UPDATE subtasks SET status='completed', result=?, completed_at=CURRENT_TIMESTAMP WHERE task_id=? AND sequence=?",
            (result, self.task.task_id, sequence)
        )
        conn.commit()
        conn.close()

五、检查点设计:任务可恢复是基本要求

30 小时的任务,任何环节都可能出现网络中断、服务异常、或需要人工干预的情况。一个没有恢复机制的 Agent,遇到意外就意味着从头开始——这在生产环境中是不可接受的。

检查点的三个层次

  • 子任务级检查点:每个子任务完成后自动创建,包含该子任务的完整结果和执行状态
  • 阶段级检查点:每完成一个大的任务阶段(通常由多个子任务组成),创建包含当前整体状态的快照
  • 时间级检查点:无论任务进度如何,每隔固定时间(如每小时)自动保存一次状态
import pickle
import hashlib
from pathlib import Path

class CheckpointManager:
    """管理 Agent 任务的检查点,支持任意位置恢复"""

    def __init__(self, task_id: str, checkpoint_dir: str = "./checkpoints"):
        self.task_id = task_id
        self.checkpoint_dir = Path(checkpoint_dir) / task_id
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)

    def save_checkpoint(
        self,
        checkpoint_name: str,
        state: dict,
        metadata: Optional[dict] = None
    ) -> str:
        """保存检查点,返回检查点 ID"""

        checkpoint_data = {
            "task_id": self.task_id,
            "checkpoint_name": checkpoint_name,
            "timestamp": datetime.now().isoformat(),
            "state": state,
            "metadata": metadata or {}
        }

        # 生成唯一的检查点 ID
        checkpoint_id = hashlib.md5(
            f"{self.task_id}-{checkpoint_name}-{datetime.now().isoformat()}".encode()
        ).hexdigest()[:8]

        checkpoint_path = self.checkpoint_dir / f"{checkpoint_id}-{checkpoint_name}.pkl"

        with open(checkpoint_path, 'wb') as f:
            pickle.dump(checkpoint_data, f)

        print(f"检查点已保存:{checkpoint_name} ({checkpoint_id})")
        return checkpoint_id

    def load_checkpoint(self, checkpoint_id: str) -> Optional[dict]:
        """从检查点恢复状态"""
        matching = list(self.checkpoint_dir.glob(f"{checkpoint_id}-*.pkl"))

        if not matching:
            print(f"检查点不存在:{checkpoint_id}")
            return None

        with open(matching[0], 'rb') as f:
            checkpoint_data = pickle.load(f)

        print(f"从检查点恢复:{checkpoint_data['checkpoint_name']} "
              f"({checkpoint_data['timestamp']})")
        return checkpoint_data

    def list_checkpoints(self) -> list[dict]:
        """列出所有可用的检查点"""
        checkpoints = []
        for path in sorted(self.checkpoint_dir.glob("*.pkl")):
            with open(path, 'rb') as f:
                data = pickle.load(f)
                checkpoints.append({
                    "id": path.stem.split('-')[0],
                    "name": data["checkpoint_name"],
                    "timestamp": data["timestamp"],
                    "metadata": data.get("metadata", {})
                })
        return checkpoints

    def resume_from_latest(self) -> Optional[dict]:
        """从最新的检查点恢复(任务中断后自动恢复)"""
        checkpoints = self.list_checkpoints()
        if not checkpoints:
            return None

        latest = max(checkpoints, key=lambda x: x["timestamp"])
        return self.load_checkpoint(latest["id"])

六、人工监督接口:长任务不能完全自动化

30 小时的完全自动化任务听起来很诱人,但在实际生产中,没有人工监督接口的 Agent 是一个风险极高的设计。以下是几个必须要有的监督机制:

异常触发的暂停机制

class AgentSupervisionGate:
    """
    Agent 执行过程中的人工监督门控。
    在满足以下条件时,自动暂停任务等待人工确认:
    - 对齐检查发现高风险偏差
    - 连续失败超过阈值
    - 涉及不可逆操作(删除、发布等)
    - 置信度低于阈值
    """

    def __init__(
        self,
        max_consecutive_failures: int = 3,
        alignment_risk_threshold: str = "高",
        notification_fn: Optional[Callable] = None
    ):
        self.max_failures = max_consecutive_failures
        self.risk_threshold = alignment_risk_threshold
        self.notify = notification_fn or print
        self.consecutive_failures = 0
        self.paused = False

    def check_and_maybe_pause(
        self,
        subtask_result: dict,
        alignment_result: dict,
        is_irreversible_operation: bool = False
    ) -> bool:
        """
        检查是否需要暂停。
        返回 True 表示可以继续,False 表示需要暂停。
        """

        # 连续失败检查
        if not subtask_result.get("success"):
            self.consecutive_failures += 1
            if self.consecutive_failures >= self.max_failures:
                self.paused = True
                self.notify(
                    f"⚠️ 任务暂停:连续失败 {self.consecutive_failures} 次,需要人工介入"
                )
                return False
        else:
            self.consecutive_failures = 0

        # 对齐风险检查
        if alignment_result.get("risk_level") == self.risk_threshold:
            self.paused = True
            self.notify(
                f"⚠️ 任务暂停:检测到高风险偏差\n"
                f"偏差:{alignment_result.get('deviation')}\n"
                f"建议修正:{alignment_result.get('correction')}"
            )
            return False

        # 不可逆操作检查
        if is_irreversible_operation:
            self.paused = True
            self.notify("⚠️ 任务暂停:即将执行不可逆操作,需要人工确认")
            return False

        return True

    def resume(self, human_instruction: Optional[str] = None):
        """人工确认后恢复任务"""
        self.paused = False
        self.consecutive_failures = 0
        if human_instruction:
            print(f"收到人工指令:{human_instruction}")
        print("任务已恢复")

实时进度报告

每完成一定数量的子任务,生成一份人类可读的进度报告,发送给任务负责人:

def generate_progress_report(
    task: AgentTask,
    orchestrator: LongTaskOrchestrator,
    elapsed_hours: float
) -> str:
    """生成人类可读的任务进度报告"""

    conn = sqlite3.connect(orchestrator.db_path)

    total = conn.execute(
        "SELECT COUNT(*) FROM subtasks WHERE task_id=?",
        (task.task_id,)
    ).fetchone()[0]

    completed = conn.execute(
        "SELECT COUNT(*) FROM subtasks WHERE task_id=? AND status='completed'",
        (task.task_id,)
    ).fetchone()[0]

    failed = conn.execute(
        "SELECT COUNT(*) FROM subtasks WHERE task_id=? AND status IN ('failed', 'needs_review')",
        (task.task_id,)
    ).fetchone()[0]

    conn.close()

    progress_pct = (completed / total * 100) if total > 0 else 0
    estimated_remaining = (elapsed_hours / completed * (total - completed)) if completed > 0 else 0

    report = f"""
=== 任务进度报告 ===
任务:{task.task_id}
核心目标:{task.core_objective[:100]}...

进度:{completed}/{total} 子任务完成({progress_pct:.1f}%)
失败/待审:{failed} 个子任务
已运行时间:{elapsed_hours:.1f} 小时
预计剩余时间:{estimated_remaining:.1f} 小时

最近完成的工作:
{orchestrator.memory_manager.get_context_for_claude(max_recent_steps=3)}
"""
    return report

七、实际运行:把所有组件组合起来

import time
from datetime import datetime

def run_long_task(
    task: AgentTask,
    alignment_check_interval: int = 5,
    progress_report_interval: int = 10
):
    """
    执行长任务的主循环。
    集成了子任务分解、状态管理、检查点、对齐检查和人工监督。
    """

    orchestrator = LongTaskOrchestrator(task, db_path=f"{task.task_id}_state.db")
    checkpoint_mgr = CheckpointManager(task.task_id)
    supervision = AgentSupervisionGate(
        max_consecutive_failures=3,
        notification_fn=lambda msg: print(f"\n{'='*50}\n{msg}\n{'='*50}\n")
    )

    start_time = datetime.now()

    # 尝试从检查点恢复
    latest_checkpoint = checkpoint_mgr.resume_from_latest()
    if latest_checkpoint:
        print(f"从检查点恢复:{latest_checkpoint['checkpoint_name']}")
        # 在实际系统中,这里需要恢复 orchestrator 的状态
    else:
        print("开始新任务,分解子任务...")
        subtasks = orchestrator.decompose_task()
        print(f"任务已分解为 {len(subtasks)} 个子任务")

        # 保存初始检查点
        checkpoint_mgr.save_checkpoint(
            "task_decomposed",
            {"subtasks": subtasks, "task": task.__dict__}
        )

    # 获取待执行的子任务
    conn = sqlite3.connect(orchestrator.db_path)
    pending_subtasks = conn.execute(
        "SELECT sequence, description, success_criteria FROM subtasks "
        "WHERE task_id=? AND status='pending' ORDER BY sequence",
        (task.task_id,)
    ).fetchall()
    conn.close()

    # 主执行循环
    for i, (seq, description, criteria) in enumerate(pending_subtasks):

        # 检查是否需要暂停
        if supervision.paused:
            print("任务已暂停,等待人工干预...")
            input("按 Enter 键继续...")
            supervision.resume()

        subtask = {
            "sequence": seq,
            "description": description,
            "success_criteria": criteria,
            "depends_on": []
        }

        # 执行子任务
        success = orchestrator.execute_subtask(subtask)

        # 检查是否继续
        elapsed = (datetime.now() - start_time).total_seconds() / 3600
        alignment = {"risk_level": "低", "aligned": True}  # 从对齐检查获取

        if not supervision.check_and_maybe_pause(
            {"success": success},
            alignment
        ):
            print(f"任务在第 {seq} 个子任务后暂停")
            break

        # 定期保存检查点
        if seq % 5 == 0:
            checkpoint_mgr.save_checkpoint(
                f"after_subtask_{seq}",
                {"completed_up_to": seq, "elapsed_hours": elapsed}
            )

        # 定期生成进度报告
        if seq % progress_report_interval == 0:
            report = generate_progress_report(task, orchestrator, elapsed)
            print(report)

        time.sleep(1)  # 避免触发速率限制

    elapsed_total = (datetime.now() - start_time).total_seconds() / 3600
    print(f"\n任务执行完成。总耗时:{elapsed_total:.1f} 小时")


# 使用示例
if __name__ == "__main__":
    task = AgentTask(
        task_id="market-research-2026",
        core_objective="对 2026 年中国新能源汽车市场进行全面调研,产出包含竞品分析、用户需求和技术趋势的结构化报告",
        constraints=[
            "所有数据必须来自公开可靠来源",
            "不得包含任何推测性内容,所有结论需有数据支撑",
            "报告语言为中文,目标读者是决策层管理者",
            "不涉及任何公司内部保密信息"
        ],
        success_criteria="完整的市场报告,包含:市场规模数据、主要竞品分析(至少 5 家)、用户需求调研结论、未来 2 年技术趋势预测"
    )

    run_long_task(task)

八、总结:让 Agent 保持注意力的核心原则

把所有策略浓缩成几条可以立即落地的原则:

  • 核心目标永远在最前面:每次调用 Claude 时,核心目标和约束条件必须是上下文的第一个内容块,通过 Prompt Caching 缓存,确保它在整个任务过程中权重最高
  • 状态存在外部,摘要进上下文:完整执行历史存数据库,进入 Claude 上下文的只是压缩摘要,控制每次调用的 Token 消耗
  • 定期强制对齐检查:不要期望模型自己发现偏差,主动每隔固定步骤进行一次显式的目标对齐验证
  • 子任务化是根本解决方案:把大任务分解成独立可验证的子任务,每个子任务完成后立即验证并持久化,降低错误积累的风险
  • 检查点是基本要求:任何超过 1 小时的 Agent 任务,都必须有完整的检查点机制,支持从任意位置恢复
  • 保留人工干预接口:在关键决策点、不可逆操作前、对齐检查发现风险时,自动暂停等待人工确认

这些原则不是孤立的技巧,而是构成一套完整的长任务 Agent 稳定性架构。实现其中任何一个都有帮助,但只有组合使用,才能真正让 Claude 4.6 在 30 小时以上的复杂任务中保持可靠的执行质量。

更多关于 Claude API 的 Agent 开发最佳实践和最新功能说明,欢迎访问 Claude Ai中文官网 查阅持续更新的中文开发者文档。

Agent 的稳定性不是靠模型能力撑起来的,而是靠架构设计撑起来的。把状态管理、任务分解、对齐检查做好,Claude 4.6 就能成为你可以信赖的长任务执行引擎。