跳转到内容

第 25 课:Long-running Task Engine

设计支持后台任务、状态流转、取消、日志、重试、结果归档和任务队列的 Codex-like task engine。

Long-running Task Engine 是把一次 agent prompt 升级成可管理任务的外层系统。它不只是“后台跑一个进程”,而是要维护任务状态、队列顺序、取消信号、日志流、重试策略、结果归档和人工审核点。

Pi 的设计哲学明确把 background bash 和任务系统留给 extension、外部工具或宿主来实现。课程要做的是利用 Pi 的 AgentSession、事件订阅、abort、retry、session JSONL、HTML export 和工具队列能力,在外层搭一个 Codex-like task queue。

当用户输入:

Terminal window
forge-code task create "fix flaky test"

系统不能只启动一个异步函数然后忘掉它。它至少要回答:

  1. 任务现在是 queued、running、needs-review、done、failed 还是 cancelled?
  2. 用户如何查看实时日志和最后结果?
  3. 用户取消时,模型请求、工具执行、bash 命令和重试等待是否都会停下?
  4. 失败后什么时候自动重试,什么时候进入人工审核?
  5. 任务完成后,diff、测试证据、review findings 和 session trace 存在哪里?

这些问题合起来,就是 Long-running Task Engine。

把任务引擎拆成七层:

  • task record:任务 id、prompt、status、attempts、cwd、session file、trace path。
  • queue:决定哪个任务先运行,以及同一 repo 是否允许并行。
  • worker:从队列取任务,创建 worktree/session,调用 agent。
  • cancellation:用 AbortController 或 runtime abort 把取消传播给模型、工具和重试等待。
  • log stream:把 agent event、命令输出、状态变更写成 jsonl。
  • retry policy:区分 transient failure、测试失败、权限拒绝和用户取消。
  • archive:任务结束后保存 diff、PR body、review report、HTML/session export。

任务引擎的难点在状态,而不是启动进程。每个状态转移都要可解释、可恢复、可查询。

课程 starter /Users/ryanchen/codespace/pi-agent-course-lab/src/15-task-engine.ts 已经定义了最小状态:

type TaskStatus = "queued" | "running" | "needs-review" | "done" | "failed" | "cancelled";

这比一个 boolean done 更真实,因为 agent 任务通常不是直接成功:它可能跑完实现但还需要 reviewer 审查,也可能测试失败但可以重试。

starter 的 transition(task, event) 是本课关键。所有状态变化都应该经过它,而不是到处写:

task.status = "done";

原因很简单:状态机是恢复和审计的核心。只有 transition 集中,系统才能拒绝非法变化,例如 queued -> donecancelled -> running

队列不只是数组。至少要考虑:

  • 同一 repo 是否只能跑一个写任务。
  • 只读 review 任务能否并行。
  • 高优先级任务是否能插队。
  • worker 崩溃后 running 任务如何恢复或标记 failed。

课堂版本可以先做内存队列:

const queue: AgentTask[] = [];
const running = new Map<string, AbortController>();

生产版本再换成 SQLite、Postgres、Redis 或文件锁。

Pi 的 AgentSession.abort() 会中止 agent 并等待 idle,abortRetry()abortCompaction()abortBash() 分别处理重试、压缩和 bash。任务引擎应该把用户取消映射成一个统一事件:

task cancel -> controller.abort() -> session.abort() -> transition(cancel)

取消不是失败。用户主动取消的任务不应该自动重试。

长任务要有两个层次的日志:

  • runtime log:状态变化、worker 启停、重试、取消、错误栈。
  • agent trace:AgentSession.subscribe() 看到的 message、tool execution、queue update、retry event、compaction event。

如果日志只打印在终端,后台任务结束后就丢了。至少应该写到 tasks/<id>/events.jsonl

Pi 的 AgentSession 已有 auto-retry 逻辑,用于 provider transient error、rate limit、网络问题等。任务引擎外层还需要一层任务级 retry:

  • provider transient error:可自动重试。
  • 测试失败:通常不自动重跑同一个实现,而是让 agent 修复后进入下一 attempt。
  • 权限拒绝:等待用户审批,不应该无限重试。
  • 用户取消:不重试。

重试次数进入 attempts,每次 attempt 都要有单独日志和结果。

任务结束时不要只保存最后一句回答。应归档:

  • diff.patch
  • test-evidence.json
  • review-findings.json
  • pr-body.md
  • session.jsonl 或 HTML export
  • summary.md

Pi 的 SessionManagerexportSessionToHtml() 已经提供 session 持久化与导出思路。任务引擎可以把这些产物放进同一个 task artifact 目录。

本课实践入口:

/Users/ryanchen/codespace/pi-agent-course-lab/src/15-task-engine.ts

starter 演示了状态转移:

const task: AgentTask = {
id: "t1",
prompt: "fix flaky test",
status: "queued",
attempts: 0,
};
const running = transition(task, "start");
const review = transition(running, "finish");

变量流要看清楚:

  • task.status 决定允许哪些 event。
  • start 会把 attempts 加一,说明一次新的执行开始。
  • finish 不直接变 done,而是进入 needs-review
  • approve 才把 needs-review 变成 done
  • cancel 可以从任何阶段进入 cancelled,但真实系统里应记录取消来源。

课堂上把任务引擎推演成一个 worker loop:

async function runWorker(queue: TaskQueue) {
while (true) {
const task = await queue.take();
const controller = new AbortController();
queue.update(transition(task, "start"));
log(task.id, { type: "task_start", attempt: task.attempts + 1 });
try {
const session = await createTaskSession(task, controller.signal);
session.subscribe((event) => log(task.id, { type: "agent_event", event }));
await session.prompt(task.prompt);
await runEvidenceCommands(task);
archiveResult(task);
queue.update(transition(task, "finish"));
} catch (error) {
if (controller.signal.aborted) {
queue.update(transition(task, "cancel"));
} else if (shouldRetry(error, task.attempts)) {
queue.requeue({ ...task, status: "queued" });
} else {
queue.update(transition(task, "fail"));
}
}
}
}

这段代码有四个控制点:

第一,controller 是取消信号的源头,任务取消要能传到 session、bash、测试命令和 retry sleep。

第二,session.subscribe() 把 agent event 变成持久日志,后续 task trace <id> 才有内容可看。

第三,finish 进入 needs-review,因为后台任务完成实现不等于可以合并。

第四,archiveResult(task) 要在状态变成 needs-review 前完成,否则用户看到“等待审查”但没有可审查材料。

  • /Users/ryanchen/codespace/pi-agent-course-lab/src/15-task-engine.ts:本课 lab starter,包含 TaskStatusAgentTasktransition()
  • /Users/ryanchen/codespace/external-sources/pi/packages/coding-agent/src/core/agent-session.ts at 61babc2AgentSession 定义 queue_updateauto_retry_start/endabort()abortRetry()abortCompaction()abortBash() 等长任务需要监听和调用的控制点。
  • /Users/ryanchen/codespace/external-sources/pi/packages/coding-agent/test/suite/agent-session-queue.test.ts at 61babc2:覆盖 steering/follow-up queue 行为,适合理解 agent 内部消息队列与任务队列的区别。
  • /Users/ryanchen/codespace/external-sources/pi/packages/coding-agent/test/agent-session-auto-compaction-queue.test.ts at 61babc2:展示 auto-compaction 与队列消息的交互,说明长任务恢复时不能忽略 queued messages。
  • /Users/ryanchen/codespace/external-sources/pi/packages/coding-agent/src/core/session-manager.ts at 61babc2:session JSONL、appendMessage()appendCustomEntry()forkFrom() 是任务结果归档和恢复的基础。
  • /Users/ryanchen/codespace/external-sources/pi/packages/coding-agent/src/core/export-html/index.ts at 61babc2exportSessionToHtml()exportFromFile() 可把任务 session 导出成可审查 artifact。
  • /Users/ryanchen/codespace/external-sources/pi/packages/coding-agent/src/core/tools/file-mutation-queue.ts at 61babc2withFileMutationQueue() 展示同一文件写操作串行、不同文件并行的队列设计,可迁移到任务并发控制。
  • /Users/ryanchen/codespace/external-sources/pi/packages/coding-agent/docs/usage.md at 61babc2:说明 Pi 核心不内置 background bash,任务引擎应作为外层 workflow 实现。

学完本课后,你应该能做到:

  • 画出 queued、running、needs-review、done、failed、cancelled 的状态机。
  • 解释任务队列和 AgentSession 内部 steering/follow-up queue 的区别。
  • 设计取消信号如何传递到模型、工具、bash、测试命令和 retry。
  • 区分 provider transient retry、测试失败修复、权限等待和用户取消。
  • 设计一个任务 artifact 目录,能保存 diff、测试证据、review findings、PR body 和 session trace。
  1. 实践题:扩展 /Users/ryanchen/codespace/pi-agent-course-lab/src/15-task-engine.ts,新增 retry event,并限制最多 3 次 attempt;当任务处于 cancelleddone 时拒绝任何新的状态转移。
  2. 思考题:为什么长任务完成实现后更适合进入 needs-review,而不是直接进入 done?请结合测试证据、PR 审查和用户确认点回答。