StreamingToolExecutor流式工具执行解析
StreamingToolExecutor流式工具执行解析
1. 这份文档回答什么问题
这份文档聚焦:
为什么 Claude Code 能在 assistant 还在流式输出时,就提前开始执行一部分工具。
2. 核心定位
实现文件是:
src/services/tools/StreamingToolExecutor.ts
它不是普通的工具调度器,而是:
专门服务于流式场景的工具执行器。
它解决的是这样一个问题:
- assistant 正在边生成边吐
tool_use - 如果等整条 assistant 消息完全结束再执行工具,会浪费时间
所以它允许:
一边接收流式 tool_use,一边开始执行工具。
3. 基本心智模型
StreamingToolExecutor 内部维护了一组 TrackedTool。
每个工具调用都会有:
queuedexecutingcompletedyielded
这说明它本质上是一个:
带状态的执行队列。
4. addTool 不是“立刻执行”,而是“进队列”
当 query.ts 在流式输出里识别到某个 tool_use 后,会调用:
streamingToolExecutor.addTool(block, assistantMessage)
这一步会:
- 找工具定义
- 计算该工具是否并发安全
- 把它封装成
TrackedTool - 塞进内部队列
- 然后触发
processQueue()
所以 addTool() 的本质是:
注册一个新的流式工具任务,并尝试看看它现在能不能开始跑。
5. processQueue 在干什么
processQueue() 的逻辑和 toolOrchestration.ts 很像,但它是“流式增量版”。
它会遍历内部队列:
- 如果某个
queued工具当前满足执行条件,就启动它 - 如果遇到不允许并发的工具,就停止往后推进
这里的重点是:
- 队列是随流式输出不断增长的
- 执行判定也会被反复重新检查
所以它不是一次性批处理,而是:
随着模型持续吐工具调用,动态推进执行窗口。
6. 并发规则
核心判断函数是:
canExecuteTool(isConcurrencySafe)
规则是:
- 如果当前没有任何工具在执行,可以启动
- 如果当前正在执行的工具全部都也是并发安全的,而且新工具也并发安全,可以一起跑
- 否则就必须等待
所以它同样遵守:
不安全工具独占,安全工具可并发。
7. 为什么它能“边流边执行”
因为在 query.ts 的 streaming 循环里:
- assistant 消息一到
- 其中的
tool_use立刻被识别出来 - 立刻
addTool(...) - 执行器开始排队和执行
- 后续再通过
getCompletedResults()或getRemainingResults()把结果吐回去
这套设计的关键不是“工具更快”,而是:
把工具执行时间藏进了模型继续流式输出的时间里。
8. 中断和失败处理是这个类的重点
StreamingToolExecutor 最复杂的地方,不是排队,而是失败和中断。
它要处理三种特殊情况:
8.1 streaming fallback
如果模型流式过程中触发 fallback:
- 旧的执行结果必须全部丢弃
- 否则就会把旧
tool_use_id对应的结果漏到新的 fallback 流里
为此它有:
discard()
一旦进入 discard 状态,后续工具会生成 synthetic error,而不是继续当正常结果回传。
8.2 sibling error
如果并发执行的一组工具里,有一个关键工具错误了,其他兄弟工具可能要立即停止。
这里用了:
siblingAbortController
它是 toolUseContext.abortController 的子控制器。
作用是:
- 可以只中止并行工具组
- 不直接把整个 query turn 立刻打死
这是一个很细的运行时控制点。
8.3 user interrupt
如果用户在工具执行期间发起中断:
- 某些工具应该取消
- 某些工具则不该取消
这取决于工具自己的:
interruptBehavior()
所以流式工具执行器并不是“一中断就全杀”,而是:
按工具定义决定哪些可以 cancel,哪些必须 block。
9. synthetic error message 的意义
当工具因为:
- sibling error
- user interrupt
- streaming fallback
等原因被取消时,执行器不会简单静默结束,而是会主动生成:
- synthetic tool_result error message
这样做是为了维持 transcript 合法性:
- 前面已经有
tool_use - 后面必须有对应的
tool_result
否则消息链就会断。
10. 它和普通 runTools 的区别
可以这样区分:
10.1 runTools(...)
- assistant 本轮输出结束后再统一执行
- 更像批处理
10.2 StreamingToolExecutor
- assistant 还在流式输出时就开始执行
- 更像实时执行队列
两者最终都会给 query.ts 产出统一的结果流,但执行时机不同。
11. 一句话总结
StreamingToolExecutor 的核心价值不是“换一种工具执行器”,而是:
把工具执行前移到 assistant 流式输出过程中,让工具延迟与模型输出延迟重叠,同时通过严格的中断、并发和 synthetic error 机制保证 transcript 不失真。