当前位置: 首页 > news >正文

从零构建轻量级 DAG 编排引擎:处理大模型复杂工作流的实战

从零构建轻量级 DAG 编排引擎:处理大模型复杂工作流的实战

一、为什么简单的链式调用不够用

在真实业务里,单靠一个 Prompt 很难处理复杂的流程。开发者通常会把多个 LLM 调用、API 请求和数据清洗步骤串在一起。但一旦逻辑变复杂,这种线性调用就会出问题。

代码里开始出现大量的if-else嵌套,处理异步等待和节点依赖变得非常麻烦。这时候,有向无环图(DAG)是一个更稳妥的选择。它不仅能理清任务顺序,还能让没有依赖关系的节点并行执行,减少等待时间。


二、DAG 调度逻辑与拓扑排序

工作流里的每个步骤就是一个“节点”,节点间的依赖关系就是“边”。要执行这个图,核心是先做环路检测,再通过拓扑排序确定执行顺序。

下面的图展示了一个简单的流程:先清洗输入,然后并行做情感分类和关键词提取,最后汇总生成报告。

graph LR Start([启动]) --> NodeA[清洗输入] NodeA --> NodeB[情感分类] NodeA --> NodeC[关键词提取] NodeB --> NodeD[生成报告] NodeC --> NodeD NodeD --> End([结束]) style NodeB fill:#bbf,stroke:#333,stroke-width:2px style NodeD fill:#bfb,stroke:#333,stroke-width:2px

节点 B 和 C 都依赖 A,但它们之间没关系,所以引擎会让它们同时跑。


三、Node.js 轻量级实现

这是一个基于 JavaScript 的简单 DAG 引擎。它实现了拓扑排序来检查依赖,并支持异步并发执行。

class WorkflowNode { constructor(id, taskFunction) { this.id = id; this.taskFunction = taskFunction; this.dependencies = []; this.status = 'PENDING'; this.result = null; } addDependency(nodeId) { this.dependencies.push(nodeId); } } class DagEngine { constructor() { this.nodes = new Map(); } registerNode(node) { this.nodes.set(node.id, node); } // 拓扑排序:检查环路并决定顺序 resolveExecutionOrder() { const inDegree = new Map(); const adjList = new Map(); const order = []; for (const [id, node] of this.nodes) { inDegree.set(id, 0); adjList.set(id, []); } for (const [id, node] of this.nodes) { for (const depId of node.dependencies) { if (!this.nodes.has(depId)) { throw new Error(`节点 ${id} 依赖的 ${depId} 未注册`); } adjList.get(depId).push(id); inDegree.set(id, inDegree.get(id) + 1); } } const queue = []; for (const [id, degree] of inDegree) { if (degree === 0) queue.push(id); } while (queue.length > 0) { const currId = queue.shift(); order.push(currId); for (const nextId of adjList.get(currId)) { inDegree.set(nextId, inDegree.get(nextId) - 1); if (inDegree.get(nextId) === 0) { queue.push(nextId); } } } if (order.length !== this.nodes.size) { throw new Error("检测到循环依赖,无法执行"); } return order; } // 并发执行 async executeWorkflow(inputContext) { const completedResults = { ...inputContext }; const runningPromises = new Map(); while (true) { let hasPending = false; let progressed = false; for (const [id, node] of this.nodes) { if (node.status === 'COMPLETED' || node.status === 'FAILED') continue; hasPending = true; if (node.status === 'RUNNING') continue; // 检查依赖是否都完成了 const allDepsMet = node.dependencies.every(depId => { const depNode = this.nodes.get(depId); return depNode && depNode.status === 'COMPLETED'; }); if (allDepsMet) { node.status = 'RUNNING'; progressed = true; const promise = (async () => { try { const depData = {}; node.dependencies.forEach(depId => { depData[depId] = this.nodes.get(depId).result; }); node.result = await node.taskFunction(completedResults, depData); node.status = 'COMPLETED'; } catch (error) { node.status = 'FAILED'; throw error; } })(); runningPromises.set(id, promise); } } if (!hasPending) break; if (!progressed && runningPromises.size === 0) { throw new Error("死锁:没有节点能继续执行"); } await Promise.race(runningPromises.values()); for (const [id, promise] of runningPromises) { const node = this.nodes.get(id); if (node.status === 'COMPLETED' || node.status === 'FAILED') { runningPromises.delete(id); } } } const finalOutput = {}; for (const [id, node] of this.nodes) { finalOutput[id] = node.result; } return finalOutput; } } // 测试运行 (async () => { const engine = new DagEngine(); const nodeA = new WorkflowNode('CleanInput', async (context) => { return context.rawText.trim().replace(/[<>]/g, ''); }); const nodeB = new WorkflowNode('LlmClassify', async (context, depData) => { const text = depData.CleanInput; await new Promise(resolve => setTimeout(resolve, 500)); // 模拟 API 延迟 return text.includes("好") ? "POSITIVE" : "NEGATIVE"; }); nodeB.addDependency('CleanInput'); const nodeC = new WorkflowNode('ExtractKeywords', async (context, depData) => { const text = depData.CleanInput; return text.split(' ').filter(word => word.length > 1); }); nodeC.addDependency('CleanInput'); const nodeD = new WorkflowNode('GenerateReport', async (context, depData) => { const sentiment = depData.LlmClassify; const keywords = depData.ExtractKeywords; return `情感: ${sentiment}, 关键词: [${keywords.join(', ')}]`; }); nodeD.addDependency('LlmClassify'); nodeD.addDependency('ExtractKeywords'); engine.registerNode(nodeA); engine.registerNode(nodeB); engine.registerNode(nodeC); engine.registerNode(nodeD); const order = engine.resolveExecutionOrder(); console.log("执行顺序:", order.join(' -> ')); const result = await engine.executeWorkflow({ rawText: " 这个产品设计得非常 好,解决了我的痛点。 " }); console.log("结果:", result); })();

四、生产环境需要考虑的几个问题

上面的代码适合本地或简单场景,如果要上生产环境,还得考虑下面几点:

1. 内存 vs 持久化
内存里的调度很快,但服务器一挂,中间结果就没了。如果工作流跑了几分钟才失败,重头再来很浪费。生产环境通常要用 Redis 或像 Temporal 这样的状态机来存状态,但这会增加网络延迟。

2. 重试策略与成本
大模型 API 经常超时或限流,加重试机制是必须的。但要注意,如果上游节点因为超时一直重试,可能会在短时间内消耗大量 Token。给每个节点设置重试上限和超时时间是必要的。

3. 静态图 vs 动态分支
DAG 在运行前就定好了结构,容易校验。但 LLM 的输出是动态的,有时候需要根据结果决定下一步走哪条路。如果要支持这种动态分支,图的拓扑结构得在运行时变,这会大大增加调试难度。


五、小结

做智能工作流,核心是把杂乱的调用拆成清晰的节点和依赖。用拓扑排序处理并发,不需要复杂的框架,也能让多个模型任务协同工作。对于小团队来说,这种轻量级的方案既能控制成本,也能保证流程跑得通。

http://www.gsyq.cn/news/1529795.html

相关文章:

  • 微博图片批量下载终极指南:免登录高效获取用户相册
  • CCF-GESP三级C++真题解析:进制判断这道题,用‘最大字符法’5分钟搞定
  • PXD10 PDI接口解析:嵌入式视频同步与BT.656标准应用实战
  • WaveTools鸣潮工具箱抽卡记录完整指南:从数据同步到故障排查的终极解决方案
  • 开源小说下载器:200+网站一键离线保存的智能解决方案
  • 知识图谱事件流的增量学习:边看边学不遗忘的实时进化方案
  • 告别枯燥:用橙心主题让Typora写作体验焕然一新
  • 2026江诗丹顿回收人气榜:合扬领跑全场,六大优质商户全方位对比 - 开心测评
  • Spring Boot项目里,MybatisPlus的saveBatch批量插入到底该怎么配才有效?(附完整yml示例)
  • 后端开发中的日志管理与监控实战
  • 黄金变现拒绝隐形消费!上海本地五家实体门店测评:收的顶报价透明无套路 - 奢侈品回收评测
  • eDMA错误处理机制解析:从DMAES寄存器到实战调试
  • PXD10微控制器Flash操作全解析:从物理原理到实战编程
  • 北京二手名表回收手续怕麻烦?一文讲清全流程,收的顶无套路 - 奢侈品回收测评
  • B2B采购信任战:从“听我说”到“给我看”
  • 闲置黄金如何高价变现 长沙正规回收门店全解析 - 润富黄金回收
  • 2026年河南AI搜索推广与GEO优化服务商深度横评:开封郑州本地获客完全指南 - 年度推荐企业名录
  • AntiDupl终极指南:5步快速清理重复图片的免费开源神器
  • MPC866 PowerPC指令集实战:从架构原理到嵌入式编程优化
  • RTS5411T-GR,4 端口 USB3.2 Gen1 HUB 芯片,兼容 BC1.2 充电规范与多级低功耗
  • 深度学习工业实战五大断层点:从梯度计算到硬件约束
  • Python学习第85天:回归模型
  • 2026深圳艺体传媒特色高中盘点:文化课薄弱生的本科突围路径 - 品研笔录
  • ALC269Q-VC3,HDA 音频编解码 + D 类 BTL 功放一体化解决方案
  • 两轮充电桩帮铺公司怎么选?主流品牌性价比对比参考 - 速递信息
  • AList项目易主后,我的私人云存储方案还安全吗?聊聊替代品与数据迁移
  • 2026年长沙零基础学美业、美业创业培训机构深度评测与官方对接指南 - 企业名录优选推荐
  • 2026实木地板品牌排行榜:林昌地板凭什么稳坐榜首?这份选购指南请收好 - 936品牌测评网
  • G-Helper架构解析:华硕笔记本轻量级控制工具的技术实现与性能优化深度评测
  • 2026 成都黄金回收综合榜单更新,收的顶实力稳居前列 - 奢侈品回收评测