当前位置: 首页 > news >正文

从提示词到智能体:Prompt Engineering 驱动 Agent 工作流的工程化构建

从提示词到智能体:Prompt Engineering 驱动 Agent 工作流的工程化构建

一、Prompt 的天花板:为什么单次提示无法解决复杂任务

大模型的能力边界更多取决于 Prompt 设计,而非模型本身。一个精心设计的单次提示能让 GPT-4 写出好文章,但无法完成多步骤工作流——比如"调研竞品、分析差异、生成报告、发送邮件"。

问题在于单次 Prompt 缺乏状态。模型生成回答时无法回溯中间结果,不能根据上一步输出调整策略,遇到错误时也不会自动重试。对于需要多步推理、工具调用和条件分支的任务,单次 Prompt 的能力明显受限。

Agent 工作流把复杂任务拆成子步骤,每个步骤由独立 Prompt 驱动,通过状态传递和条件判断连接。这就像把"全能员工"变成"协作团队"——每人专注一件事,通过流程编排达成目标。

但工程化 Agent 工作流并不简单。Prompt 依赖如何管理?工具结果怎么可靠注入下一步?条件分支如何避免死循环?这些都需要系统方案。

二、Agent 工作流架构:从 Prompt 编排到执行引擎的全链路设计

生产级 Agent 工作流系统包含四个核心层:任务规划层、Prompt 编排层、工具执行层和状态管理层。

flowchart TB A[用户目标输入] --> B[任务规划器] B --> C[子任务拆解] C --> D[执行计划 DAG] D --> E[步骤1: Prompt 模板渲染] E --> F[步骤1: 大模型推理] F --> G[步骤1: 输出解析] G --> H{步骤1: 条件判断} H -->|需要工具调用| I[工具执行器] I --> J[工具结果注入] J --> K[步骤2: Prompt 模板渲染] H -->|直接通过| K K --> L[步骤2: 大模型推理] L --> M[步骤2: 输出解析] M --> N{步骤2: 条件判断} N -->|需要重试| F N -->|任务完成| O[结果聚合] O --> P[最终输出] subgraph 状态管理层 Q[全局上下文存储] R[步骤执行日志] S[工具调用记录] end E -.-> Q F -.-> R I -.-> S J -.-> Q subgraph Prompt 编排层 E K end subgraph 工具执行层 I J end

任务规划器是 Agent 的"大脑"。它接收用户目标,拆解为有序子任务列表,生成执行计划 DAG(有向无环图)。DAG 每个节点是子任务,边表示依赖关系。规划器本身也是 Prompt 驱动模块——系统提示要求模型输出结构化 JSON 计划,而非自由文本。

Prompt 编排层负责将每个子任务渲染为具体 Prompt。关键设计是"模板 + 上下文注入":每个子任务有预定义 Prompt 模板,模板变量在运行时从全局上下文动态填充。这样步骤 2 的 Prompt 能引用步骤 1 的输出,实现跨步骤信息传递。

工具执行层处理模型生成的工具调用请求。当模型判断需要调用外部工具(如搜索引擎、数据库查询、API 调用)时,输出结构化工具调用指令,工具执行器解析指令、执行调用、将结果注入回全局上下文。这个循环(模型推理 → 工具调用 → 结果注入 → 再次推理)是 Agent 的核心执行模式。

状态管理层维护全局上下文、执行日志和工具调用记录。它是所有步骤共享的"工作记忆",确保信息在步骤间可靠传递。同时,执行日志支持断点续跑——某个步骤失败时,可从上一步成功状态恢复,而非从头开始。

三、生产级 Agent 工作流引擎核心代码实现

以下代码基于 Python 构建,展示从任务规划到执行引擎的完整链路:

import json from enum import Enum from dataclasses import dataclass, field from typing import Any, Optional, Callable # ---- 数据结构定义 ---- class StepStatus(Enum): PENDING = "pending" RUNNING = "running" SUCCESS = "success" FAILED = "failed" SKIPPED = "skipped" class ToolCallStatus(Enum): PENDING = "pending" EXECUTED = "executed" FAILED = "failed" @dataclass class ToolCall: """工具调用记录""" tool_name: str arguments: dict result: Any = None status: ToolCallStatus = ToolCallStatus.PENDING @dataclass class StepResult: """单个步骤的执行结果""" step_id: str output: str tool_calls: list[ToolCall] = field(default_factory=list) status: StepStatus = StepStatus.SUCCESS metadata: dict = field(default_factory=dict) @dataclass class ExecutionContext: """全局执行上下文:所有步骤共享的工作记忆""" variables: dict = field(default_factory=dict) # 模板变量 step_results: dict[str, StepResult] = field(default_factory=dict) # 步骤结果 execution_log: list[dict] = field(default_factory=list) # 执行日志 def set_variable(self, key: str, value: Any): """设置模板变量,供后续步骤的 Prompt 引用""" self.variables[key] = value def get_variable(self, key: str, default: Any = None) -> Any: return self.variables.get(key, default) def log(self, step_id: str, event: str, detail: str = ""): """记录执行日志,支持断点续跑和调试""" self.execution_log.append({ "step_id": step_id, "event": event, "detail": detail, }) # ---- Prompt 模板引擎 ---- class PromptTemplate: """Prompt 模板:支持变量注入和上下文引用""" def __init__(self, system: str, user_template: str): self.system = system self.user_template = user_template def render(self, context: ExecutionContext) -> tuple[str, str]: """将模板中的变量占位符替换为上下文中的实际值""" rendered_user = self.user_template for key, value in context.variables.items(): placeholder = f"{{{{{key}}}}}" rendered_user = rendered_user.replace(placeholder, str(value)) return self.system, rendered_user # ---- 任务规划器 ---- class TaskPlanner: """将用户目标拆解为结构化的执行计划""" PLANNER_SYSTEM_PROMPT = ( "你是一个任务规划器。将用户的目标拆解为有序的子任务列表。" "每个子任务包含:step_id、description、depends_on(依赖的步骤ID列表)、" "tool_required(是否需要工具调用)。" "输出格式为 JSON 数组,不要输出其他内容。" ) def plan(self, goal: str) -> list[dict]: """生成执行计划(生产环境调用大模型)""" # 简化实现:返回预定义的计划结构 # 实际场景中,调用大模型生成动态计划 user_prompt = f"目标:{goal}\n\n请生成执行计划。" plan_json = call_llm_for_planning( self.PLANNER_SYSTEM_PROMPT, user_prompt ) return plan_json # ---- 工具注册表 ---- class ToolRegistry: """工具注册与执行:Agent 调用外部能力的统一入口""" def __init__(self): self._tools: dict[str, Callable] = {} def register(self, name: str, func: Callable, description: str = ""): """注册工具函数""" self._tools[name] = { "func": func, "description": description, } def execute(self, name: str, arguments: dict) -> Any: """执行工具调用,包含异常处理""" if name not in self._tools: raise ValueError(f"未注册的工具:{name}") tool = self._tools[name] try: result = tool["func"](**arguments) return result except Exception as e: # 工具调用失败时返回错误信息,而非抛出异常 # 让 Agent 有机会根据错误信息调整策略 return f"工具调用失败:{name},错误:{str(e)}" def get_tool_descriptions(self) -> str: """生成工具描述文本,供 Prompt 注入""" descriptions = [] for name, tool in self._tools.items(): descriptions.append(f"- {name}: {tool['description']}") return "\n".join(descriptions) # ---- Agent 执行引擎 ---- class AgentExecutor: """Agent 工作流执行引擎:编排步骤、管理状态、处理工具调用""" def __init__( self, tool_registry: ToolRegistry, max_retries: int = 2, ): self.tool_registry = tool_registry self.max_retries = max_retries def execute_plan( self, plan: list[dict], step_templates: dict[str, PromptTemplate], context: ExecutionContext, ) -> dict: """执行完整的 Agent 工作流计划""" completed_steps: set[str] = set() # 按依赖关系排序执行(拓扑排序的简化实现) remaining = list(plan) while remaining: # 找出所有依赖已满足的步骤 ready_steps = [ s for s in remaining if all( dep in completed_steps for dep in s.get("depends_on", []) ) ] if not ready_steps: # 死锁检测:存在无法满足依赖的步骤 context.log("system", "deadlock", "存在无法满足的依赖关系") break for step in ready_steps: step_id = step["step_id"] result = self._execute_step( step_id, step, step_templates, context ) context.step_results[step_id] = result if result.status == StepStatus.SUCCESS: completed_steps.add(step_id) # 将步骤输出注入到上下文变量中 context.set_variable( f"step_{step_id}_output", result.output ) elif result.status == StepStatus.FAILED: # 步骤失败,跳过依赖此步骤的后续步骤 context.log(step_id, "step_failed", result.output) remaining.remove(step) # 聚合所有成功步骤的输出 final_output = self._aggregate_results(context) return final_output def _execute_step( self, step_id: str, step: dict, templates: dict[str, PromptTemplate], context: ExecutionContext, ) -> StepResult: """执行单个步骤,支持重试和工具调用循环""" context.log(step_id, "step_started", step["description"]) template = templates.get(step_id) if not template: return StepResult( step_id=step_id, output="", status=StepStatus.FAILED, metadata={"error": f"未找到步骤模板:{step_id}"}, ) retries = 0 while retries <= self.max_retries: try: # 渲染 Prompt system, user = template.render(context) # 注入工具描述(如果步骤需要工具) if step.get("tool_required"): tool_desc = self.tool_registry.get_tool_descriptions() user = f"可用工具:\n{tool_desc}\n\n{user}" # 调用大模型 model_output = call_llm_for_step(system, user) # 解析工具调用(如果模型请求调用工具) tool_calls = self._parse_tool_calls(model_output) if tool_calls: for tc in tool_calls: tc.result = self.tool_registry.execute( tc.tool_name, tc.arguments ) tc.status = ( ToolCallStatus.EXECUTED if not isinstance(tc.result, str) or "失败" not in tc.result else ToolCallStatus.FAILED ) # 将工具结果注入上下文 context.set_variable( f"tool_{tc.tool_name}_result", tc.result ) # 工具调用后,再次调用模型生成最终输出 tool_results_str = json.dumps( [{"tool": tc.tool_name, "result": tc.result} for tc in tool_calls], ensure_ascii=False, ) context.set_variable("tool_results", tool_results_str) system2, user2 = template.render(context) model_output = call_llm_for_step(system2, user2) context.log(step_id, "step_completed", model_output[:200]) return StepResult( step_id=step_id, output=model_output, tool_calls=tool_calls, status=StepStatus.SUCCESS, ) except Exception as e: retries += 1 context.log( step_id, "step_retry", f"第 {retries} 次重试,错误:{str(e)}" ) if retries > self.max_retries: return StepResult( step_id=step_id, output=f"步骤执行失败:{str(e)}", status=StepStatus.FAILED, ) return StepResult( step_id=step_id, output="", status=StepStatus.FAILED ) def _parse_tool_calls(self, model_output: str) -> list[ToolCall]: """解析模型输出中的工具调用指令""" # 生产环境使用结构化输出(如 function calling) # 此处为简化实现,解析 JSON 格式的工具调用 tool_calls = [] try: # 尝试从输出中提取 JSON 工具调用 if "```tool_call" in model_output: parts = model_output.split("```tool_call")[1:] for part in parts: json_str = part.split("```")[0].strip() call_data = json.loads(json_str) tool_calls.append(ToolCall( tool_name=call_data["tool"], arguments=call_data.get("arguments", {}), )) except (json.JSONDecodeError, KeyError, IndexError): pass return tool_calls def _aggregate_results(self, context: ExecutionContext) -> dict: """聚合所有步骤的执行结果""" successful = { sid: r.output for sid, r in context.step_results.items() if r.status == StepStatus.SUCCESS } failed = { sid: r.output for sid, r in context.step_results.items() if r.status == StepStatus.FAILED } return { "results": successful, "failed_steps": failed, "total_steps": len(context.step_results), "success_rate": ( len(successful) / len(context.step_results) if context.step_results else 0 ), "execution_log": context.execution_log, } # ---- 辅助函数 ---- def call_llm_for_planning(system: str, user: str) -> list[dict]: """调用大模型生成执行计划——生产环境对接实际 API""" # 简化实现 return [ {"step_id": "research", "description": "调研相关信息", "depends_on": [], "tool_required": True}, {"step_id": "analyze", "description": "分析调研结果", "depends_on": ["research"], "tool_required": False}, {"step_id": "generate", "description": "生成最终输出", "depends_on": ["analyze"], "tool_required": False}, ] def call_llm_for_step(system: str, user: str) -> str: """调用大模型执行单步推理——生产环境对接实际 API""" return f"[步骤输出] System长度: {len(system)}, User长度: {len(user)}"

这段代码的设计核心是"模板驱动 + 上下文传递 + 工具循环"。PromptTemplate将每个步骤的 Prompt 参数化,运行时从ExecutionContext中动态填充变量,实现步骤间的信息传递。ToolRegistry提供统一的工具注册和执行入口,工具调用失败时返回错误信息而非抛出异常,让 Agent 有机会根据错误调整策略。AgentExecutor_execute_step方法实现了"模型推理 → 工具调用 → 结果注入 → 再次推理"的核心循环,并支持最大重试次数限制。

四、Agent 工作流的工程代价与适用边界

Agent 工作流在解决复杂任务的同时,引入了显著的工程复杂度和运行时开销。

执行延迟随步骤增加而累积。每个步骤都涉及一次大模型推理,加上可能的工具调用和网络往返。一个 5 步骤的 Agent 工作流,端到端延迟可能达到 15 ~ 30 秒。对于实时交互场景,这个延迟不可接受。缓解方案:对独立步骤做并行执行,减少串行等待时间;对高频工作流建立缓存,命中时跳过规划直接执行。

Prompt 级联错误的放大效应。步骤 1 的输出是步骤 2 的输入。如果步骤 1 的模型输出偏离预期(格式错误、信息遗漏),错误会级联放大到后续所有步骤。解决方案:在每个步骤的输出解析环节加入格式校验和重试机制;对关键步骤设置输出 schema 约束,使用结构化输出(如 JSON mode)降低格式错误率。

规划器的可靠性瓶颈。任务规划器本身是大模型驱动的,它的输出质量直接影响整个工作流的成败。当用户目标模糊或涉及领域知识时,规划器可能生成不合理的计划。工程上的折中:对规划器的输出做 schema 校验和合理性检查,不合理的计划要求重新生成;同时支持人工干预,允许用户修改自动生成的计划。

状态管理的内存压力。全局上下文随着步骤执行不断膨胀。如果工作流包含 10 个步骤,每个步骤的输出 500 tokens,加上工具调用结果,上下文可能累积到 5000+ tokens。当工作流更长时,上下文窗口可能溢出。解决方案:对历史步骤的输出做摘要压缩,只保留关键信息;设置上下文窗口的上限,超限时自动淘汰最早的步骤结果。

适用边界。Agent 工作流适用于多步骤、需要工具调用、有条件分支的复杂任务:数据分析管线、内容生成流水线、自动化运维流程。不适用于简单的单轮问答、对延迟极度敏感的实时场景、或步骤间无依赖关系的并行任务(后者应直接使用并行调用而非串行 Agent)。

五、总结

Prompt Engineering 驱动的 Agent 工作流,本质上是将大模型的"单次推理能力"扩展为"多步协作能力"。任务规划器负责拆解,Prompt 编排层负责渲染,工具执行层负责外部交互,状态管理层负责信息传递——四层协同,才能让 Agent 从"回答问题"进化为"完成任务"。

落地路线建议:第一步,定义工作流的步骤模板体系,每个步骤的 Prompt 模板明确输入变量和输出格式;第二步,构建工具注册表,统一管理外部工具的注册、调用和异常处理;第三步,实现 Agent 执行引擎,支持步骤的依赖排序、重试机制和工具调用循环;第四步,搭建全局上下文管理,确保步骤间的信息可靠传递,并支持上下文压缩和断点续跑;第五步,建立执行监控和日志系统,追踪每个步骤的延迟、成功率和工具调用情况,持续优化 Prompt 模板和工作流编排。

从一条 Prompt 到一个智能体,跨越的不是技术的鸿沟,而是工程体系的构建。每一层抽象都有代价,每一次编排都有风险,但正是这些严谨的工程实践,让 Agent 从概念走向了生产。

http://www.gsyq.cn/news/1614813.html

相关文章:

  • G5080,G6080,G7080,TS3440,MG3680,MG3660,MG6380,MG8180,报错5b00,5b02,5b04,1700,1702,p07,e08如何维修?只需2分钟修好了
  • Si5351A时钟发生器原理与应用指南
  • AI驱动未来|泛联新安携Omni可信智能开发体系亮相2026南京软件大会
  • 解锁QQ音乐数据宝库:Python解析工具让你的音乐收藏更自由
  • GPT-5真有“思维链跃迁”?DeepSeek V3的MoE稀疏激活机制拆解:附可复现的token级注意力热力图对比
  • 告警疲劳与信号丢失:云原生智能告警体系的构建之道
  • 基于51/STM32单片机智能婴儿监护系统 多功能婴儿床婴儿摇篮系统1(设计源文件+万字报告+讲解)(支持资料、图片参考_降重降ai)
  • 存在的内部结构空间区域
  • Markn:智能实时预览技术如何革命性提升Markdown文档编写效率
  • Metasploit渗透测试框架:从核心概念到实战演练的完整指南
  • WaveTools鸣潮工具箱:一键解锁游戏性能与数据管理的终极解决方案
  • WechatBakTool:微信聊天记录备份与恢复的终极指南
  • 解锁鸣潮游戏新体验:3分钟掌握WaveTools画质优化与抽卡管理
  • Mythos漏洞挖掘模型:可规模化自主发现RCE的AI安全新范式
  • MC74HC165A与PIC18F25J50实现高效数字输入扩展
  • NLP 算法落地实践:从 Tokenization 到语义理解的工程链路
  • 基于STM32与Si4731的可编程数字收音机开发实战
  • STM32与AD74413R的高精度信号采集与输出方案
  • 参考文献格式乱如麻?高校教授说用这几个AI论文写作软件
  • Retrofit:Square 出品的 HTTP 客户端,43k+ Star
  • 智能工具如何让你轻松获取Steam创意工坊模组:从困境到高效下载的转变
  • Android Studio中文界面五分钟速成指南:告别英文困扰,拥抱母语开发
  • Performance-Fish:让你的《环世界》从卡顿到流畅的终极优化方案
  • 别再试错了!2026年最稳、最快、最私密的AI工作流(已通过SOC2 Type II+GDPR双审计)
  • 终极免费SQLite数据库管理工具:DB Browser for SQLite完全指南
  • ChatGPT编程辅助正在淘汰“只会Ctrl+C/V”的开发者(内部培训PPT首度流出,仅限本周开放下载)
  • 终极指南:如何在Mac M芯片上完美运行Attu向量数据库管理工具
  • XiaoMusic技术解析:基于FastAPI的智能音箱音乐播放解决方案
  • 腾讯位置服务开发者征文大赛优秀作品回顾,官网投稿通道同步开启!
  • Codex 正在悄悄写穿你的 SSD:完整排查与修复指南