FastGPT实习:工作流运行机制与调试模式
工作流基本数据结构
- 节点 Node: 工作流的基本执行单元,包含输入、输出、执行逻辑
- 边 Edge: 连接节点的有向边,三种状态:
active
: 激活状态,表示数据流经此边skipped
: 跳过状态,表示此边被跳过waiting
: 等待状态,表示等待上游节点执行完成
- 连接点 Handle: 节点上的输入输出连接点,一个节点可能有多个handle
- 活跃节点队列 activeRunQueue: 存储准备执行的节点ID集合
- 跳过节点队列 skipNodeQueue: 存储需要跳过的节点及其子节点映射
- 调试数据结构:
debugNextStepRunNodes
: 调试模式下存储下一步要执行的节点debugNodeResponses
: 调试模式下记录每个节点的执行结果
补充概念
入口节点 (Entry Node)
- 工作流的起始执行点
- 通过
isEntry
属性标识 - 调试模式下,每次步骤的入口节点由前端动态设置
交互节点 (Interactive Node) (代码逻辑上:判断节点时单独判断,处理交互节点)
- 需要用户输入的节点(如用户选择、表单输入等)
- 执行后会暂停工作流,等待用户响应
- 调试模式下特别处理,保持入口状态
并发控制
- 通过
maxConcurrency
限制同时执行的节点数量 - 防止资源过载,提高系统稳定性
运行机制
采用双队列机制,有效避免深度递归
节点状态判断逻辑
- 节点状态由所有输入边的状态决定:
- 所有输入边均为
active
→ 节点状态为run
(执行) - 所有输入边均为
skipped
→ 节点状态为skip
(跳过) - 存在
waiting
边 → 节点状态为wait
(等待) - 混合状态(部分
active
,部分skipped
)→ 节点状态为skip
(跳过)
- 所有输入边均为
执行流程
- 从入口节点开始运行工作流
- 当节点状态为
run
时,进入活跃队列,检查运行条件(并发数限制),满足则立即执行 - 当节点状态为
skip
时,进入跳过队列,只有活跃队列全部执行完,才会执行跳过队列的节点- 执行跳过队列指的是更新节点与边的状态,而不是执行节点的具体逻辑
- 因为只有更新全部的边状态(所有边不为
waiting
),才能判断所有的节点的状态
- 每执行一次跳过节点,就要重新检测活跃队列是否产生新的活跃节点,有则停止执行跳过队列,去执行活跃队列
调试模式
核心数据结构
debugNextStepRunNodes
: 存储下一步要执行的节点,不立即执行,等待用户手动触发debugNodeResponses
: 记录每个节点的执行结果,包括运行状态、响应内容、交互响应等
调试模式特点
阶段性返回:
- 有下一步节点:立即返回,等待用户点击下一步
- 无下一步节点但有跳过节点:处理跳过节点
- 完全结束:返回最终结果
延迟执行:
- 运行模式:下游节点加入
activeRunQueue
,并立即执行 - 调试模式:下游节点加入
debugNextStepRunNodes
,等待用户手动触发下一步
- 运行模式:下游节点加入
动态设置入口节点:
- 前端点击下一步时,发送
isEntry = true
,后端接收到标识,把下一个节点标记为entry
- 保证了运行模式和调试模式的代码复用
- 前端点击下一步时,发送
数据收集:
- 每一次运行完一个节点后,使用
getDebugResponse
,收集该节点执行后的相关信息返回给前端 - 更新边状态,便于用户点击下一步
- 每一次运行完一个节点后,使用
前端API接口:
- 提供
/api/core/workflow/debug
接口,调用调试模式工作流引擎(即上文) - 前端封装
postWorkflowDebug
函数调用此接口
- 提供
前端交互逻辑:
onStartNodeDebug
: 启动调试,构建初始数据onNextNodeDebug
: 执行单步调试,不自动继续- 用户通过”下一步”按钮手动控制每一步的执行
前端实现(todo)
状态管理
workflowDebugData
: 存储当前调试状态- 节点状态更新:
debugResult
属性记录执行结果
用户交互
- “开始调试”按钮:调用
onStartNodeDebug
- “下一步”按钮:调用
onNextNodeDebug
- “停止调试”按钮:调用
onStopNodeDebug
页面渲染
- 实时显示节点执行状态
- 高亮显示当前执行的节点
- 显示节点执行结果和响应内容
- 支持查看变量值和边状态
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 Fish's Blog!