当前位置: 首页 > news >正文

工业级大模型学习之路027:LangGraph 高级特性与单 Agent 优化

一、基础 Agent 的局限性与高级特性价值1.1 基础 ReAct Agent 的三大致命缺陷之前基础的 ReAct Agent但它距离生产环境使用还有很大差距存在以下无法忽视的问题无法人工干预一旦启动就会自动执行到底无法在关键步骤暂停等待人类确认或修正代码复用性差所有逻辑都在一个文件中复杂 Agent 会变得难以维护和扩展执行效率低只能串行调用工具无法同时执行多个独立的工具调用无法回溯历史只能看到最终结果无法查看和回滚到之前的执行状态错误处理能力弱工具调用失败后没有自动重试和降级机制缺乏安全控制无法限制 Agent 的操作范围存在安全风险1.2 LangGraph 高级特性全景LangGraph 提供了一整套高级特性完美解决了基础 Agent 的所有问题高级特性解决的问题核心价值Checkpoint 高级操作无法回溯历史、无法恢复状态状态持久化、历史回溯、断点续跑、多会话管理人类介入Human-in-the-loop无法人工干预关键步骤审核、错误修正、信息补充、安全控制子图Subgraph代码复用性差、难以维护模块化设计、代码复用、分而治之、团队协作并行工具调用执行效率低同时执行多个独立工具大幅缩短任务完成时间状态回溯与回滚无法撤销错误操作回滚到任意历史状态、修正错误、重新执行高级错误处理错误处理能力弱自动重试、降级处理、异常捕获、优雅失败1.3 工业级 Agent 的技术要求一个真正能在生产环境使用的 Agent 必须满足以下要求可观测性能追踪每一步执行过程查看所有输入输出可控制性能在任意步骤暂停、继续、终止执行可维护性代码模块化、结构清晰、易于修改和扩展可靠性完善的错误处理和重试机制99.9% 以上的可用性安全性严格的权限控制和操作限制防止滥用和安全事故交互性支持与人类的自然交互能在需要时请求人类帮助1.4 今日学习路线图Checkpoint高级操作 → 人类介入机制 → 子图模块化 → 并行工具调用 → 状态回溯与回滚 → 错误处理与重试 → 性能优化 → 项目整合二、LangGraph 高级 API 详解2.1 Checkpoint 深度解析工业级状态管理的基石2.1.1 Checkpoint 底层原理Checkpoint 是 LangGraph 最核心的创新之一它的本质是智能体执行状态的快照。每次节点执行完成后LangGraph 会自动将当前状态保存到 Checkpoint 存储中。Checkpoint 核心组成Channel Values状态的当前值如 messages、iteration_countVersions每个状态字段的版本号用于冲突检测Checkpoint ID每个快照的唯一标识符Thread ID会话的唯一标识符用于隔离不同用户的会话增量 Checkpoint 机制LangGraph 1.2 采用增量 Checkpoint只保存变化的状态字段而不是完整状态大幅减少了存储占用和 I/O 开销。2.1.2 生产级 Checkpoint 存储后端对比SQLite 只适合开发和测试环境生产环境必须使用支持高并发和持久化的存储后端存储后端适用场景优点缺点SQLite开发测试、单用户无需部署、零配置不支持高并发、不支持分布式PostgreSQL生产环境、企业级支持高并发、ACID 事务、分布式需要部署和维护Redis高并发、低延迟速度极快、支持集群数据持久化能力弱DynamoDB云原生、无服务器完全托管、无限扩展成本较高、云厂商锁定生产级推荐PostgreSQL 15支持 JSONB 类型性能和功能都能满足绝大多数企业需求。2.1.3 实战PostgreSQL Checkpoint 配置与高级操作第一步安装依赖pip install langgraph-checkpoint-postgres psycopg2-binary第二步配置 PostgreSQL Checkpoint# core/checkpoint.py from langgraph.checkpoint.postgres import PostgresSaver from config.settings import settings def get_postgres_checkpointer(): 获取PostgreSQL Checkpoint存储生产级 connection_string fpostgresql://{settings.db_user}:{settings.db_password}{settings.db_host}:{settings.db_port}/{settings.db_name} checkpointer PostgresSaver.from_conn_string(connection_string) # 初始化数据库表只需要运行一次 checkpointer.setup() return checkpointer # 全局单例 postgres_checkpointer get_postgres_checkpointer()第三步Checkpoint 高级操作实战from core.checkpoint import postgres_checkpointer from langchain_core.messages import HumanMessage # 1. 列出所有活跃会话 def list_all_sessions(): 列出所有用户会话 sessions [] for thread in postgres_checkpointer.list_threads(): sessions.append({ thread_id: thread[thread_id], user_id: thread[metadata].get(user_id, unknown), created_at: thread[created_at], updated_at: thread[updated_at], last_message: thread[channel_values][messages][-1].content[:100] if thread[channel_values][messages] else }) return sessions # 2. 获取指定会话的所有Checkpoint历史 def get_session_checkpoints(thread_id: str): 获取会话的所有状态快照 checkpoints [] for checkpoint in postgres_checkpointer.list(thread_id): checkpoints.append({ checkpoint_id: checkpoint[id], created_at: checkpoint[created_at], step: checkpoint[step], messages_count: len(checkpoint[channel_values][messages]), iteration_count: checkpoint[channel_values][iteration_count] }) return checkpoints # 3. 断点续跑实战程序崩溃后恢复执行 def resume_session(thread_id: str): 从上次中断的地方恢复会话执行 config {configurable: {thread_id: thread_id}} # 获取当前状态 state react_agent.get_state(config) if state.next: # 有未完成的节点继续执行 print(f恢复会话{thread_id}继续执行节点{state.next}) result react_agent.invoke(None, configconfig) return result[messages][-1].content else: # 会话已完成 return 会话已完成无需恢复 # 4. 会话归档与清理 def archive_old_sessions(days: int 30): 归档30天未活动的会话 import datetime cutoff datetime.datetime.now() - datetime.timedelta(daysdays) archived_count 0 for thread in postgres_checkpointer.list_threads(): if thread[updated_at] cutoff: postgres_checkpointer.delete_thread(thread[thread_id]) archived_count 1 print(f已归档{archived_count}个过期会话) return archived_count2.1.4 Checkpoint 最佳实践与避坑指南永远不要手动修改 Checkpoint 数据库直接修改数据库可能导致状态不一致应该使用 LangGraph 提供的 API设置合理的会话过期时间定期清理过期会话避免数据库无限增长不要在状态中存储大量数据状态应该只存储必要的元数据和上下文大量数据应该存储在外部数据库使用事务保证原子性PostgreSQL Checkpoint 支持事务确保状态更新的原子性监控 Checkpoint 性能监控数据库的读写延迟和连接数及时扩容2.2 人类介入Human-in-the-loop工业级安全的核心2.2.1 人类介入底层原理人类介入的本质是在图的执行流程中插入中断点当执行到中断点时LangGraph 会暂停执行并保存当前状态等待人类输入或确认后再继续执行。人类介入的三种核心模式审核模式Agent 执行高风险操作前等待人类批准信息补充模式Agent 缺少必要信息时向人类询问修正模式人类发现 Agent 的错误后修正状态并重新执行2.2.2 工业级人类介入完整流程一个完整的工业级人类介入流程包括以下步骤Agent执行到中断点 → 保存状态 → 发送通知给人类 → 人类审核/补充/修正 → 更新状态 → 继续执行 → 记录审核日志2.2.3 实战三级审核流程实现我们将实现一个支持工具调用审核→报告生成审核→最终发布审核的三级审核流程这是企业中最常见的审批模式。第一步定义审核状态from enum import Enum class ApprovalStatus(str, Enum): PENDING pending APPROVED approved REJECTED rejected class AgentState(BaseModel): 增强版Agent状态支持审核流程 messages: Annotated[List[BaseMessage], operator.add] Field(default_factorylist) iteration_count: int Field(default0) max_iterations: int Field(default5) user_id: str Field(defaultdefault_user) # 审核相关字段 approval_status: ApprovalStatus Field(defaultApprovalStatus.PENDING) approval_level: int Field(default0) # 0: 无需审核, 1: 一级审核, 2: 二级审核, 3: 三级审核 approver: str Field(default) approval_comment: str Field(default)第二步定义多级中断点def build_approval_agent() - StateGraph: 构建支持三级审核的Agent builder StateGraph(AgentState) # 添加节点 builder.add_node(agent, agent_think) builder.add_node(tools, ToolNode(PRODUCTION_TOOLS)) builder.add_node(generate_report, generate_report_node) builder.add_node(publish_report, publish_report_node) # 添加边 builder.add_edge(tools, agent) builder.add_edge(generate_report, agent) builder.add_edge(publish_report, END) # 路由函数 def router(state: AgentState) - str: last_message state.messages[-1] if last_message.tool_calls: # 工具调用需要一级审核 return tools elif 生成报告 in last_message.content: # 报告生成需要二级审核 return generate_report elif 发布报告 in last_message.content: # 报告发布需要三级审核 return publish_report else: return END builder.add_conditional_edges(agent, router) builder.set_entry_point(agent) # 配置多级中断点 graph builder.compile( checkpointerpostgres_checkpointer, interrupt_before[tools, generate_report, publish_report] ) logger.info(✅ 三级审核Agent构建完成) return graph # 全局单例 approval_agent build_approval_agent()第三步审核流程 API 实现# core/approval_service.py from core.react_agent import approval_agent, ApprovalStatus from langchain_core.messages import HumanMessage from utils.logger import logger class ApprovalService: 审核服务 staticmethod def submit_task(question: str, user_id: str) - dict: 提交任务进入审核流程 config {configurable: {thread_id: ftask_{user_id}_{int(time.time())}}} result approval_agent.invoke( {messages: [HumanMessage(contentquestion)], user_id: user_id}, configconfig ) state approval_agent.get_state(config) if state.next: return { task_id: config[configurable][thread_id], status: pending_approval, next_step: state.next[0], message: f任务已提交等待{state.next[0]}审核 } else: return { task_id: config[configurable][thread_id], status: completed, answer: result[messages][-1].content } staticmethod def approve_task(task_id: str, approver: str, comment: str ) - dict: 批准任务继续执行 config {configurable: {thread_id: task_id}} # 更新审核状态 approval_agent.update_state( config, { approval_status: ApprovalStatus.APPROVED, approver: approver, approval_comment: comment } ) # 继续执行 result approval_agent.invoke(None, configconfig) state approval_agent.get_state(config) if state.next: return { task_id: task_id, status: pending_approval, next_step: state.next[0], message: f一级审核通过等待{state.next[0]}审核 } else: return { task_id: task_id, status: completed, answer: result[messages][-1].content } staticmethod def reject_task(task_id: str, approver: str, reason: str) - dict: 拒绝任务终止执行 config {configurable: {thread_id: task_id}} # 更新审核状态 approval_agent.update_state( config, { approval_status: ApprovalStatus.REJECTED, approver: approver, approval_comment: reason, messages: [HumanMessage(contentf任务被拒绝原因{reason})] } ) logger.info(f任务{task_id}被{approver}拒绝原因{reason}) return { task_id: task_id, status: rejected, reason: reason }2.2.4 人类介入最佳实践与避坑指南只在必要时使用人类介入过多的人类介入会降低效率应该只在高风险操作和信息不足时使用提供清晰的上下文给审核人员提供足够的上下文信息包括 Agent 的思考过程和工具调用详情设置审核超时时间如果审核人员在规定时间内没有响应应该自动拒绝或升级审核记录完整的审核日志记录所有审核操作包括审核人、审核时间、审核意见便于审计和追溯支持批量审核对于大量的低风险审核任务提供批量审核功能提高效率2.3 子图Subgraph复杂 Agent 的模块化解决方案2.3.1 子图底层原理子图的本质是一个完整的 StateGraph可以作为节点嵌入到另一个 StateGraph 中。子图拥有自己的状态、节点和边但可以与主图共享状态和 Checkpoint。子图的核心优势模块化设计将复杂系统拆分为多个独立的模块每个模块负责一个单一功能代码复用通用子图可以在多个项目中复用避免重复开发团队协作不同的团队可以并行开发不同的子图提高开发效率易于测试每个子图可以单独测试和调试降低测试难度2.3.2 子图高级用法输入输出映射与嵌套子图LangGraph 支持子图的输入输出映射可以将主图的状态字段映射到子图的状态字段反之亦然。这使得子图可以完全独立于主图设计提高了复用性。实战通用工具执行子图与嵌套子图# core/subgraphs/tool_subgraph.py from langgraph.graph import StateGraph from langgraph.prebuilt import ToolNode from core.tools import PRODUCTION_TOOLS from core.react_agent import AgentState from utils.logger import logger # 定义工具子图的状态可以与主图不同 class ToolSubgraphState(BaseModel): tool_calls: list Field(default_factorylist) tool_results: list Field(default_factorylist) def build_tool_subgraph() - StateGraph: 构建通用工具执行子图可复用 def execute_tools(state: ToolSubgraphState) - dict: 执行工具调用 tool_node ToolNode(PRODUCTION_TOOLS) results tool_node.invoke({tool_calls: state.tool_calls}) return {tool_results: results[messages]} builder StateGraph(ToolSubgraphState) builder.add_node(execute_tools, execute_tools) builder.set_entry_point(execute_tools) builder.add_edge(execute_tools, END) return builder.compile() # core/subgraphs/report_subgraph.py from langgraph.graph import StateGraph from core.react_agent import AgentState from core.llm_factory import LLMFactory llm LLMFactory.get_llm() def build_report_subgraph() - StateGraph: 构建报告生成子图嵌套子图示例 def generate_outline(state: AgentState) - dict: 生成报告大纲 outline llm.invoke(f根据以下信息生成报告大纲{state.messages[-1].content}) return {messages: [outline]} def generate_content(state: AgentState) - dict: 生成报告内容 content llm.invoke(f根据以下大纲生成报告内容{state.messages[-1].content}) return {messages: [content]} builder StateGraph(AgentState) builder.add_node(generate_outline, generate_outline) builder.add_node(generate_content, generate_content) builder.add_edge(generate_outline, generate_content) builder.add_edge(generate_content, END) builder.set_entry_point(generate_outline) return builder.compile() # 主图中使用子图 def build_modular_agent() - StateGraph: 构建模块化Agent builder StateGraph(AgentState) # 添加子图作为节点 builder.add_node(agent, agent_think) builder.add_node(tool_subgraph, build_tool_subgraph()) builder.add_node(report_subgraph, build_report_subgraph()) # 子图输入输出映射 # 将主图的tool_calls映射到子图的tool_calls # 将子图的tool_results映射到主图的messages builder.add_edge( agent, tool_subgraph, input_maplambda state: {tool_calls: state.messages[-1].tool_calls}, output_maplambda result: {messages: result[tool_results]} ) builder.add_edge(tool_subgraph, agent) builder.add_edge(report_subgraph, END) def router(state: AgentState) - str: last_message state.messages[-1] if last_message.tool_calls: return tool_subgraph elif 生成报告 in last_message.content: return report_subgraph else: return END builder.add_conditional_edges(agent, router) builder.set_entry_point(agent) graph builder.compile(checkpointerpostgres_checkpointer) logger.info(✅ 模块化Agent构建完成) return graph2.3.3 子图最佳实践与避坑指南单一职责原则每个子图只负责一个单一的功能不要让子图变得过于复杂明确的接口定义子图的输入输出应该清晰明确使用 Pydantic 模型定义独立测试每个子图都应该有自己的单元测试确保在集成到主图之前能够正常工作避免过深的嵌套子图的嵌套深度不要超过 3 层否则会增加调试难度版本管理对子图进行版本管理当子图发生变化时能够方便地回滚到旧版本2.4 并行工具调用大幅提升 Agent 执行效率2.4.1 并行工具调用底层原理LangGraph 1.2 原生支持并行工具调用当 LLM 返回多个工具调用时ToolNode会自动使用线程池并行执行所有工具调用然后将结果合并返回。并行工具调用的前提条件工具调用之间没有依赖关系工具是线程安全的系统有足够的资源支持并行执行2.4.2 实战并行工具调用与依赖处理第一步支持并行工具调用的思考节点def agent_think_with_parallel(state: AgentState) - dict: 思考节点支持并行工具调用 logger.info(fAgent思考中迭代次数{state.iteration_count}/{state.max_iterations}) messages [system_message] state.messages # 明确告诉LLM可以同时调用多个工具 messages.append(SystemMessage(content 你可以同时调用多个独立的工具来完成任务。 如果多个工具之间没有依赖关系请同时调用它们以提高效率。 工具调用之间用逗号分隔。 )) response llm_with_tools.invoke(messages) if state.iteration_count state.max_iterations: logger.warning(Agent超过最大迭代次数强制结束) return { messages: [response], iteration_count: state.iteration_count 1 } # 记录并行工具调用数量 if response.tool_calls: logger.info(fAgent决定并行调用{len(response.tool_calls)}个工具{[tc[name] for tc in response.tool_calls]}) return { messages: [response], iteration_count: state.iteration_count 1 }第二步处理有依赖的工具调用有些工具调用之间存在依赖关系必须串行执行。我们可以通过条件边实现依赖处理def build_agent_with_dependency() - StateGraph: 构建支持依赖工具调用的Agent builder StateGraph(AgentState) builder.add_node(agent, agent_think_with_parallel) builder.add_node(tools, ToolNode(PRODUCTION_TOOLS)) def router(state: AgentState) - str: last_message state.messages[-1] if not last_message.tool_calls: return END # 检查工具调用之间是否有依赖 # 例如get_user_id必须在get_user_order之前执行 tool_names [tc[name] for tc in last_message.tool_calls] if get_user_id in tool_names and get_user_order in tool_names: # 有依赖先执行get_user_id return execute_get_user_id else: # 无依赖并行执行所有工具 return tools # 单独的节点执行有依赖的工具 def execute_get_user_id(state: AgentState) - dict: 执行get_user_id工具 tool_call next(tc for tc in state.messages[-1].tool_calls if tc[name] get_user_id) result TOOL_MAP[get_user_id].invoke(tool_call[args]) return {messages: [ToolMessage(contentresult, tool_call_idtool_call[id])]} builder.add_node(execute_get_user_id, execute_get_user_id) builder.add_edge(tools, agent) builder.add_edge(execute_get_user_id, agent) builder.add_conditional_edges(agent, router) builder.set_entry_point(agent) graph builder.compile(checkpointerpostgres_checkpointer) return graph第三步并行工具调用的错误处理当并行执行多个工具时可能会出现部分成功部分失败的情况。我们需要处理这种情况def execute_tools_with_error_handling(state: AgentState) - dict: 执行工具调用处理部分失败的情况 last_message state.messages[-1] tool_results [] # 使用线程池并行执行工具 from concurrent.futures import ThreadPoolExecutor, as_completed with ThreadPoolExecutor(max_workers5) as executor: futures {} for tool_call in last_message.tool_calls: future executor.submit(TOOL_MAP[tool_call[name]].invoke, tool_call[args]) futures[future] tool_call for future in as_completed(futures): tool_call futures[future] try: result future.result() tool_results.append(ToolMessage( contentresult, tool_call_idtool_call[id], nametool_call[name] )) except Exception as e: logger.error(f工具{tool_call[name]}调用失败{e}) tool_results.append(ToolMessage( contentf工具调用失败{str(e)}请尝试其他方法, tool_call_idtool_call[id], nametool_call[name] )) return {messages: tool_results}2.4.4 并行工具调用最佳实践与避坑指南限制并行度设置合理的最大并行数建议不超过 5避免资源耗尽处理部分失败不要因为一个工具调用失败而终止整个任务应该让 Agent 根据失败结果调整策略避免并行执行有副作用的工具对于修改数据库、发送邮件等有副作用的工具尽量串行执行监控并行性能监控工具调用的执行时间和成功率及时发现性能瓶颈使用异步工具对于 I/O 密集型工具使用异步实现可以进一步提高并行效率2.5 状态回溯与回滚系统容错性的终极保障2.5.1 状态回溯与回滚底层原理状态回溯是指查看历史上任意时刻的系统状态状态回滚是指将系统状态恢复到历史上的某个时刻然后重新执行。回滚的两种核心策略硬回滚删除当前状态之后的所有 Checkpoint系统状态完全恢复到目标 Checkpoint软回滚创建一个新的 Checkpoint将状态设置为目标 Checkpoint 的值保留所有历史记录工业级推荐软回滚保留所有历史记录便于审计和调试。2.5.2 实战完整的状态回溯与回滚系统from core.checkpoint import postgres_checkpointer from core.react_agent import react_agent from langchain_core.messages import HumanMessage from utils.logger import logger class RollbackService: 状态回溯与回滚服务 staticmethod def get_session_history(thread_id: str) - list: 获取会话的完整历史记录 checkpoints list(postgres_checkpointer.list(thread_id)) history [] for checkpoint in checkpoints: state checkpoint[channel_values] history.append({ checkpoint_id: checkpoint[id], step: checkpoint[step], timestamp: checkpoint[created_at], messages: [ {role: msg.type, content: msg.content} for msg in state[messages] ], iteration_count: state[iteration_count] }) return history staticmethod def rollback_to_checkpoint(thread_id: str, checkpoint_id: str, reason: str) - dict: 软回滚到指定的Checkpoint 保留所有历史记录创建一个新的Checkpoint config {configurable: {thread_id: thread_id}} # 获取目标Checkpoint的状态 target_checkpoint postgres_checkpointer.get(thread_id, checkpoint_id) if not target_checkpoint: raise ValueError(fCheckpoint {checkpoint_id}不存在) # 创建回滚消息 rollback_message HumanMessage( contentf系统已回滚到步骤{target_checkpoint[step]}原因{reason}。请从这里继续执行。 ) # 更新状态到目标Checkpoint并添加回滚消息 react_agent.update_state( config, { messages: target_checkpoint[channel_values][messages] [rollback_message], iteration_count: target_checkpoint[channel_values][iteration_count] } ) logger.info(f会话{thread_id}已回滚到Checkpoint {checkpoint_id}原因{reason}) return { thread_id: thread_id, rollback_to_checkpoint_id: checkpoint_id, reason: reason, current_step: target_checkpoint[step] } staticmethod def reexecute_from_checkpoint(thread_id: str, checkpoint_id: str) - str: 从指定的Checkpoint重新执行 # 先回滚到目标Checkpoint RollbackService.rollback_to_checkpoint(thread_id, checkpoint_id, 重新执行) # 继续执行 config {configurable: {thread_id: thread_id}} result react_agent.invoke(None, configconfig) return result[messages][-1].content staticmethod def undo_last_action(thread_id: str) - dict: 撤销上一步操作 checkpoints list(postgres_checkpointer.list(thread_id)) if len(checkpoints) 2: raise ValueError(没有可撤销的操作) # 回滚到上上个Checkpoint target_checkpoint_id checkpoints[-2][id] return RollbackService.rollback_to_checkpoint(thread_id, target_checkpoint_id, 用户撤销操作)2.5.3 状态回溯与回滚最佳实践与避坑指南优先使用软回滚保留所有历史记录便于审计和问题排查明确回滚的边界回滚只能撤销 Agent 的内部状态无法撤销已经执行的外部操作如发送邮件、修改数据库处理外部系统的一致性对于有外部副作用的操作回滚时需要手动补偿如发送取消邮件、回滚数据库事务记录回滚日志记录所有回滚操作包括回滚时间、回滚人、回滚原因限制回滚权限只有授权用户才能执行回滚操作防止滥用三、项目整合升级 RAGService 支持所有高级特性3.1 升级 RAGService代码位置core/rag_service.pyfrom core.react_agent import react_agent, approval_agent from core.checkpoint import postgres_checkpointer from core.approval_service import ApprovalService from core.rollback_service import RollbackService class RAGService: 完整的RAGAgent问答服务第8天高级特性版 def __init__(self): self.retriever RAGRetriever() self.prompt PromptService() self.approval_service ApprovalService() self.rollback_service RollbackService() logger.info(✅ RAGAgent问答服务初始化完成支持所有高级特性) # 原有query和stream_query方法保持不变 # 审核相关方法 def submit_approval_task(self, question: str, user_id: str) - dict: 提交需要审核的任务 return self.approval_service.submit_task(question, user_id) def approve_task(self, task_id: str, approver: str, comment: str ) - dict: 批准任务 return self.approval_service.approve_task(task_id, approver, comment) def reject_task(self, task_id: str, approver: str, reason: str) - dict: 拒绝任务 return self.approval_service.reject_task(task_id, approver, reason) # 会话管理相关方法 def list_all_sessions(self) - list: 列出所有会话 return list_all_sessions() def get_session_history(self, thread_id: str) - list: 获取会话历史 return self.rollback_service.get_session_history(thread_id) def delete_session(self, thread_id: str) - bool: 删除会话 try: postgres_checkpointer.delete_thread(thread_id) return True except Exception as e: logger.error(f删除会话失败{e}) return False # 回滚相关方法 def rollback_to_checkpoint(self, thread_id: str, checkpoint_id: str, reason: str) - dict: 回滚到指定Checkpoint return self.rollback_service.rollback_to_checkpoint(thread_id, checkpoint_id, reason) def undo_last_action(self, thread_id: str) - dict: 撤销上一步操作 return self.rollback_service.undo_last_action(thread_id) def reexecute_from_checkpoint(self, thread_id: str, checkpoint_id: str) - str: 从指定Checkpoint重新执行 return self.rollback_service.reexecute_from_checkpoint(thread_id, checkpoint_id)3.2 新增高级特性 API 接口代码位置main8.pyfrom dotenv import load_dotenv load_dotenv() from core.rag_service import RAGService from fastapi import FastAPI, HTTPException from pydantic import BaseModel import uvicorn app FastAPI(title企业级RAGAgent APIDay8, version1.0.0) rag_service RAGService() class QueryRequest(BaseModel): question: str user_id: str default_user use_agent: bool False class ToolCallApprovalRequest(BaseModel): thread_id: str approver: str admin reason: str class ApprovalRequest(BaseModel): thread_id: str approver: str admin reason: str class RollbackRequest(BaseModel): thread_id: str checkpoint_id: str reason: str 用户操作 class QueryResponse(BaseModel): answer: str mode: str app.post(/api/query, response_modelQueryResponse) async def query(request: QueryRequest): 通用问答接口支持RAG和Agent两种模式 try: answer rag_service.query( questionrequest.question, user_idrequest.user_id, use_agentrequest.use_agent ) return QueryResponse( answeranswer, modeagent if request.use_agent else rag ) except Exception as e: raise HTTPException(status_code500, detailstr(e)) app.post(/api/agent/approval/submit) async def submit_approval_task(request: QueryRequest): 提交需要审核的任务 try: return rag_service.submit_approval_task( questionrequest.question, user_idrequest.user_id ) except Exception as e: raise HTTPException(status_code500, detailstr(e)) app.post(/api/agent/approval/approve) async def approve_approval(request: ApprovalRequest): 批准当前步骤的审核继续执行 try: return rag_service.approve_task( task_idrequest.thread_id, approverrequest.approver, commentrequest.reason ) except Exception as e: raise HTTPException(status_code500, detailstr(e)) app.post(/api/agent/approval/reject) async def reject_approval(request: ApprovalRequest): 拒绝当前步骤的审核终止任务 try: return rag_service.reject_task( task_idrequest.thread_id, approverrequest.approver, reasonrequest.reason ) except Exception as e: raise HTTPException(status_code500, detailstr(e)) app.post(/api/agent/rollback) async def rollback_to_checkpoint(request: RollbackRequest): 回滚到指定Checkpoint try: return rag_service.rollback_to_checkpoint( thread_idrequest.thread_id, checkpoint_idrequest.checkpoint_id, reasonrequest.reason ) except Exception as e: raise HTTPException(status_code500, detailstr(e)) app.post(/api/agent/undo) async def undo_last_action(thread_id: str): 撤销上一步操作 try: return rag_service.undo_last_action(thread_id) except Exception as e: raise HTTPException(status_code500, detailstr(e)) app.post(/api/agent/reexecute) async def reexecute_from_checkpoint(request: RollbackRequest): 从指定Checkpoint重新执行 try: answer rag_service.reexecute_from_checkpoint( thread_idrequest.thread_id, checkpoint_idrequest.checkpoint_id ) return {answer: answer} except Exception as e: raise HTTPException(status_code500, detailstr(e)) app.get(/api/health) async def health_check(): 健康检查接口 return {status: healthy, service: rag-agent-api-day8} if __name__ __main__: print( 第8天Agent增强功能审核、回滚、撤销) print(API服务启动中访问 http://localhost:8000/docs 查看接口文档) uvicorn.run(app, host0.0.0.0, port8000)这里还有个问题重复调用工具
http://www.gsyq.cn/news/1397075.html

相关文章:

  • 20 + 维度全景透视:数据驱动下的品牌 GEO 健康度实战报告
  • 中文文献管理难题如何破解?Jasminum为Zotero带来智能化解决方案
  • 无细胞表达技术助力腾讯AI Lab在Nature子刊发文,实现蛋白设计闭环
  • 创业公司如何利用taotoken的token plan套餐,精细化控制ai模型调用成本
  • 【信息系统项目管理师-选择真题】2026上半年(第二批)综合知识答案和详解(回忆版)
  • CentOS 7 上保姆级安装NUMECA Fine 10.1:从依赖检查到License配置的完整避坑指南
  • 2026年喜利得胶/植筋胶/结构胶/加固胶/锚固胶厂家推荐:耐高温耐腐蚀环氧树脂,注射式高强粘结力专业品牌榜单深度解析 - 企业推荐官【官方】
  • 如何免费解锁WeMod专业版功能:完整三步终极指南
  • 终极指南:XXMI启动器 - 一站式多游戏模组管理平台免费使用教程
  • 3分钟搞定中文文献管理:Zotero茉莉花插件终极指南
  • 告别黑窗口!用Xmanager 5在Windows上丝滑操作远程CentOS图形界面
  • 【多智能体】基于多智能体多视角三维空间定位的神经动力学方法附Matlab代码
  • 告别Windows音量弹窗:用HideVolumeOSD重获纯净桌面体验
  • 2026年5月川内钢模板企业实测评测:附近钢钢模板、隧道钢模板、塑料模板价格、塑料模板多少钱一张、建筑塑料模板批发选择指南 - 优质品牌商家
  • 思维导图笔记:大模型幻觉问题
  • 深度解析RAGFlow:超越基础架构图的实战级生产级RAG引擎全解
  • NSSM服务管理避坑指南:除了install/start,这些set命令让你的服务更稳定
  • 基于双曲深度学习与增强SPICE模型的SiC MOSFET阈值电压智能监测
  • 从一次排障经历说起:Ubuntu服务器telnet服务起不来?教你一步步诊断和解决(openbsd-inetd重启无效的看这里)
  • CNND-BRT:基于动态图神经网络的软件缺陷自动分派框架
  • 基于大语言模型与提示词工程构建AI创业想法评估工具
  • Akagi V3:从麻将新手到高手的智能进化之路
  • 抖音去水印下载哪个工具好用?2026配音无印vs司马去水印实测 - 科技大爆炸
  • 影刀RPA店群自动化:脚本智能调参与自适应等待策略工程实践
  • sklearn实战:从混淆矩阵到AUC,详解roc_auc_score()在二分类模型评估中的正确用法
  • PLC串口转网口数据采集网关有什么功能应用
  • 深度学习钓鱼攻击检测:从URL分析到混合特征模型的实战解析
  • Lovable平台搭建必须掌握的6类核心CRD定义,错过将导致边缘自治能力归零
  • SGEformer:基于Transformer的电池健康预测模型解析与实践
  • 全球仅37家认证伙伴掌握的PlayAI多语种术语一致性校验秘技(含自研TermGuard工具链)