当前位置: 首页 > news >正文

[MAF工作流框架揭秘-10]基于Open-Telemetry的调用链跟踪

可观测行已经成为部署于生产环境的应用必需具备的能力。而提到可观测行,Open-Telemetry无疑是目前最流行的开源工具之一。基于Open-Telemetry的调用链跟踪和性能监控被应用到MAF的方方面面,在如下的两篇文章中,我详细介绍了基于ChatClient和Agent的调用链跟踪和性能监控:

  • OpenTelemetryChatClient-实现链路跟踪和性能监控
  • 基于Agent的调用链跟踪和性能监控

就调用链跟踪来说,起始Wokflow更有必要。原因很简单,Worflow往往具有复杂的网络拓扑结构,具体的执行路径由消息路由决定。每次调用都可以因为输入、当前外部状态以及LLM的输出的不同而走不同的路径。基于Open-Telemetry的调用链跟踪可以帮助我们清晰地看到每次调用的具体执行路径,进而分析和优化Workflow的设计是否合理,并在出错的时候快速定位问题所在的环节。

1. 跟踪Workflow的执行路径

在正是介绍Workflow针对Open-Telemetry的调用链跟踪的设计和实现原理之前,我们先来看一个简单的示例程序,来看看Workflow的调用链跟踪在实际中的表现。为了能够收集跟踪数据并可视化呈现调用的完整链条,我们本机执行如下的命令安装了Jaeger:

dockerrun-d--namejaeger\-p16686:16686\-p4317:4317\jaegertracing/all-in-one:latest

在如下的演示程序中,我们构建了一个简单的Workflow,包含4个节点,分别是FooBarBazQux,其中Foo节点的输出同时作为BarBaz节点的输入,而Qux节点则需要等待BarBaz节点都完成后才能执行。我们在Workflow构建的时候调用了WithOpenTelemetry方法来启用Open-Telemetry的调用链跟踪功能,并且提供了一个ActivitySource对象来收集跟踪数据。在每个节点的实现中,我们模拟了一个随机的执行时间来模拟操作的耗时。

usingMicrosoft.Agents.AI.Workflows;usingOpenTelemetry;usingOpenTelemetry.Metrics;usingOpenTelemetry.Resources;usingOpenTelemetry.Trace;usingSystem.Diagnostics;varserviceName="maf.workflow";varservceVersion="1.0.0";varrandom=newRandom();varfoo=CreateExecutor("Foo");varbar=CreateExecutor("Bar");varbaz=CreateExecutor("Baz");varqux=CreateExecutor("Qux");using(Sdk.CreateTracerProviderBuilder().SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName,serviceVersion:servceVersion)).AddSource(serviceName).AddConsoleExporter().AddOtlpExporter(options=>{options.Endpoint=newUri("http://localhost:4317");options.Protocol=OpenTelemetry.Exporter.OtlpExportProtocol.Grpc;}).Build()){while(true){awaitusingvarrun=awaitInProcessExecution.Default.RunStreamingAsync(BuildWorkflow(),"start");awaitrun.RunToCompletionAsync();}}ExecutorBindingCreateExecutor(stringid)=>newFunc<string,ValueTask<string>>(asyncinput=>{awaitTask.Delay(random.Next(100,500));returnid;}).BindAsExecutor(id);WorkflowBuildWorkflow(){returnnewWorkflowBuilder(foo).AddFanOutEdge(foo,[bar,baz]).AddFanInBarrierEdge([bar,baz],qux).WithOutputFrom(qux).WithOpenTelemetry(options=>options.EnableSensitiveData=true,newActivitySource(serviceName)).Build();}

FooBarBazQux四个节点构成的Workflow具有如下的结构:

Jaeger呈现的调用具有与之匹配的结构,我们可以清晰地看到每次调用的具体执行路径:

从外到内,调用链记录下如下描述执行操作的Span:

  • workflow.session: 代表一个可以涵盖多次调用的Workflow Session,一个Session绑定一个StreamingRun或者Run对象;
  • workflow_invoke: 代表在Session内针对Workflow的一次调用;
  • executor.process: 代表Workflow中一个Executor节点的一次执行;
  • message_send: 代表节点将消息发送给下一个节点;
  • edge_group.process: 代表针对始于某个节点的多条边的处理;

除了执行Workflow的调用链跟踪,Workflow的构建过程同样被记录在另一个调用链中。

![Alternative Text][1782049741740]

虽然这个调用只有一个单一的Span,但是这个Span利用一个名为workflow.definition的Tag记录了Workflow的定义信息,这个信息可能帮助我们确定构建的Workflow是否符合预期。在我们这个例子中,构建的Workflow被描述为如下这个JSON:

{"executors":{"Foo":{"executorType":{"assemblyName":"Microsoft.Agents.AI.Workflows, Version=1.10.0.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3","typeName":"Microsoft.Agents.AI.Workflows.FunctionExecutor`2[[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]]"},"executorId":"Foo"},"Bar":{"executorType":{"assemblyName":"Microsoft.Agents.AI.Workflows, Version=1.10.0.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3","typeName":"Microsoft.Agents.AI.Workflows.FunctionExecutor`2[[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]]"},"executorId":"Bar"},"Baz":{"executorType":{"assemblyName":"Microsoft.Agents.AI.Workflows, Version=1.10.0.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3","typeName":"Microsoft.Agents.AI.Workflows.FunctionExecutor`2[[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]]"},"executorId":"Baz"},"Qux":{"executorType":{"assemblyName":"Microsoft.Agents.AI.Workflows, Version=1.10.0.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3","typeName":"Microsoft.Agents.AI.Workflows.FunctionExecutor`2[[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]]"},"executorId":"Qux"}},"edges":{"Foo":[{"$type":1,"hasAssigner":false,"kind":1,"connection":{"sourceIds":["Foo"],"sinkIds":["Bar","Baz"]}}],"Bar":[{"$type":2,"kind":2,"connection":{"sourceIds":["Bar","Baz"],"sinkIds":["Qux"]}}],"Baz":[{"$type":2,"kind":2,"connection":{"sourceIds":["Bar","Baz"],"sinkIds":["Qux"]}}]},"requestPorts":[],"startExecutorId":"Foo","outputExecutorIds":{"Qux":[]}}

2. 将Agent和LLM调用的操作纳入调用链跟踪

调用链的完整性决定了其价值。如果Workflow具有一个或者多个基于AIAgent的节点,那么将Agent和LLM调用的操作纳入调用链跟踪是非常有必要的。我们可以按照开篇提到的两篇文章中介绍的方式启用Agent和LLM调用的操作的调用链跟踪。在如下这个演示程序中,我们创建的Workflow只有一个基于AIAgent的节点。在构建这个AIAgent管道时,我们调用了UseOpenTelemetry方法来启用基于Agent调用的调用链跟踪。内部的ChatClient管道同样调用了UseOpenTelemetry方法来启用基于LLM调用的调用链跟踪。

usingAzure;usingMicrosoft.Agents.AI;usingMicrosoft.Agents.AI.Workflows;usingMicrosoft.Extensions.AI;usingOpenAI;usingOpenTelemetry;usingOpenTelemetry.Metrics;usingOpenTelemetry.Resources;usingOpenTelemetry.Trace;usingSystem.ComponentModel;usingSystem.Diagnostics;dotenv.net.DotEnv.Load();varserviceName="maf.workflow";varservceVersion="1.0.0";varendpoint=Environment.GetEnvironmentVariable("OPENAI_URL")!;varmodel=Environment.GetEnvironmentVariable("MODEL")!;varapiKey=Environment.GetEnvironmentVariable("API_KEY")!;varagent=newOpenAIClient(credential:newAzureKeyCredential(apiKey),options:newOpenAIClientOptions{Endpoint=newUri(endpoint)}).GetChatClient(model:model).AsIChatClient().AsBuilder().UseOpenTelemetry(sourceName:serviceName).Build().AsAIAgent(name:"MyAgent",tools:[AIFunctionFactory.Create(GetWeather,nameof(GetWeather))]).AsBuilder().Use(inner=>newOpenTelemetryAgent(inner,serviceName,true)).Build();using(Sdk.CreateTracerProviderBuilder().SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName,serviceVersion:servceVersion)).AddSource(serviceName).AddOtlpExporter(options=>{options.Endpoint=newUri("http://localhost:4317");options.Protocol=OpenTelemetry.Exporter.OtlpExportProtocol.Grpc;}).Build()){while(true){varworkflow=newWorkflowBuilder(agent).WithOutputFrom(agent).WithOpenTelemetry(options=>options.EnableSensitiveData=true,newActivitySource(serviceName)).Build();awaitusingvarrun=awaitInProcessExecution.Default.RunAsync(workflow,"根据目前苏州天气,提供着装建议。");foreach(varresultinrun.NewEvents.OfType<WorkflowOutputEvent>())Console.Write(result.Data);awaitTask.Delay(5000);}}[Description("获取指定城市天气信息")]staticstringGetWeather([Description("城市名称")]stringcity)=>$"{city}目前天气晴,气温25摄氏度。";

在Jaeger中,我们同样可以清晰地看到Agent、工具和LLM调用的操作被纳入了调用链跟踪:

3. WorkflowTelemetryContext

基于Open-Telemetry调用链跟踪在.NET是通过ActivityActivitySourceActivityListener这三个核心类型实现的,代表每个跟踪操作的Span对应一个Activity对象。我们前面利用Jaeger展示了Workflow构建和调用的执行路径,涉及的每个操作对应的Activity都是通过WorkflowTelemetryContext这个类来创建的。

internalsealedclassWorkflowTelemetryContext{publicActivity?StartWorkflowBuildActivity();publicActivity?StartWorkflowSessionActivity();publicActivity?StartWorkflowRunActivity();publicActivity?StartExecutorProcessActivity(stringexecutorId,string?executorType,stringmessageType,object?message);publicActivity?StartEdgeGroupProcessActivity();publicActivity?StartMessageSendActivity(stringsourceId,string?targetId,object?message);}

WorkflowTelemetryContext用于创建Activity的方法说明如下:

  • StartWorkflowBuildActivity: 代表Workflow的构建操作,名称为workflow.build
  • StartWorkflowSessionActivity: 代表Workflow Session的开始,名称为workflow.session
  • StartWorkflowRunActivity: 代表Workflow Run的开始,名称为workflow.invoke
  • StartExecutorProcessActivity: 代表Workflow中一个Executor节点的执行操作,名称为executor.process
  • StartEdgeGroupProcessActivity: 代表针对始于某个节点的多条边的处理操作,名称为edge_group.process
  • StartMessageSendActivity: 代表节点将消息发送给下一个节点的操作,名称为message.send

对于Tracing这种对性能影响极大的高频操作,一般都需要一个显式的开关。对于Worflow来说,只有WorkflowTelemetryContextIsEnabled属性为true,我们才会创建对应的Activity来记录调用链跟踪数据。静态属性Disabled提供了一个默认了一个IsEnabledfalseWorkflowTelemetryContext实例。只有在启用的情况下,WorkflowTelemetryContext才会创建一个真正的ActivitySource来创建Activity对象。如果没有显式指定ActivitySource,那么WorkflowTelemetryContext会默认创建一个命名为Microsoft.Agents.AI.WorkflowsActivitySource

internalsealedclassWorkflowTelemetryContext{publicboolIsEnabled{get;}publicstaticWorkflowTelemetryContextDisabled{get;}=newWorkflowTelemetryContext();publicWorkflowTelemetryOptionsOptions{get;}publicActivitySourceActivitySource{get;}publicWorkflowTelemetryContext(WorkflowTelemetryOptionsoptions,ActivitySource?activitySource=null);}

4. 跟踪实现

WorkflowBuilder定义了如下这个名为_telemetryContext的字段来持有WorkflowTelemetryContext对象,其默认值来源于WorkflowTelemetryContext的静态属性Disabled。它同时提供了一个SetTelemetryContext方法来设置这个字段的值。WithOpenTelemetry扩展方法会创建一个新的WorkflowTelemetryContext实例,并通过调用SetTelemetryContext方法将其设置到WorkflowBuilder中。

publicclassWorkflowBuilder{privateWorkflowTelemetryContext_telemetryContext=WorkflowTelemetryContext.Disabled;internalvoidSetTelemetryContext(WorkflowTelemetryContextcontext)=>_telemetryContext=context;}publicstaticWorkflowBuilderWithOpenTelemetry(thisWorkflowBuilderbuilder,Action<WorkflowTelemetryOptions>?configure=null,ActivitySource?activitySource=null){WorkflowTelemetryOptionsworkflowTelemetryOptions=newWorkflowTelemetryOptions();configure?.Invoke(workflowTelemetryOptions);WorkflowTelemetryContexttelemetryContext=newWorkflowTelemetryContext(workflowTelemetryOptions,activitySource);builder.SetTelemetryContext(telemetryContext);returnbuilder;}

WithOpenTelemetry扩展方法除了提供一个可选的ActivitySource参数用来指定构建WorkflowTelemetryContext实例时使用的ActivitySource之外,还提供了一个可选的configure参数用来配置WorkflowTelemetryOptions对象。

publicsealedclassWorkflowTelemetryOptions{publicboolEnableSensitiveData{get;set;}publicboolDisableWorkflowBuild{get;set;}publicboolDisableWorkflowRun{get;set;}publicboolDisableExecutorProcess{get;set;}publicboolDisableEdgeGroupProcess{get;set;}publicboolDisableMessageSend{get;set;}}

WorkflowTelemetryOptions提供的配置选项说明如下:

  • EnableSensitiveData: 是否在调用链跟踪中包含敏感数据,比如消息内容等,默认值为false;
  • DisableWorkflowBuild: 是否禁用Workflow构建操作的调用链跟踪,默认值为false;
  • DisableWorkflowRun: 是否禁用Workflow调用操作的调用链跟踪,默认值为false;
  • DisableExecutorProcess: 是否禁用Workflow中Executor节点执行操作的调用链跟踪,默认值为false;
  • DisableEdgeGroupProcess: 是否禁用针对始于某个节点的多条边的处理操作的调用链跟踪,默认值为false;
  • DisableMessageSend: 是否禁用节点将消息发送给下一个节点的操作的调用链跟踪,默认值为false。

当我们调用WorkflowBuilderBuild方法创建Workflow对象时,内部会调用WorkflowTelemetryContextStartWorkflowBuildActivity方法来创建一个代表Workflow构建操作的Activity对象,并设置以Workflow定义为核心的Tag。于此同时,WorkflowBuilder会将持有的WorkflowTelemetryContext对象传递给Workflow的构造函数,并由Workflow来持有这个对象。Workflow在执行过程中会根据需要调用WorkflowTelemetryContext的方法来创建Activity对象,从而实现基于Open-Telemetry的调用链跟踪。

publicclassWorkflow{internalWorkflowTelemetryContextTelemetryContext{get;}}
http://www.gsyq.cn/news/1640892.html

相关文章:

  • 零基础可视化看板搭建:从交互到下钻全流程
  • 智谱 GLM-5.2 凌晨上新,Code Arena 全球第一意味着什么?
  • AI 导出鸭实操指南:智谱清言生成 word 文档指令落地使用技巧
  • CSUR:城市天际线道路系统的终极解决方案,告别单调道路设计
  • 阴极发光在 SEM 分析中的应用
  • AI果蔬清洗分拣工段智能控制系统
  • Claude 怎么把表格导出|AI 导出鸭一站式表格导出操作全教程
  • 发送http请求的自定义函数库文件
  • 【关注可白嫖源码】--课程设计--毕业设计--springboot微博客户端[编号:project34944](案例分析)
  • FlexASIO终极指南:让普通音频设备拥有专业级ASIO性能
  • 如何快速配置开源Android电视播放器:VLC电视版完整操作指南
  • 【关注可白嫖源码】--课程设计+毕业设计+springbootDream car车辆租赁系统[编号:project37878](案例分析)
  • 5个理由告诉你为什么VIA是机械键盘配置的终极选择
  • 终极Wand-Enhancer完全指南:5分钟解锁游戏修改器完整高级功能
  • ZLMediaKit 9.0版本下载编译
  • AWS、微软、谷歌和 Anthropic 悄悄做了同一件事:Session 正在取代请求,成为 Agent 的新计算单元
  • 不同进程的线程切换**不一定引起进程切换**,但**必然涉及进程上下文切换(即进程切换)**——这里需要明确概念辨析
  • 2026年7月亲测,汽修引流这样干超有效!
  • 从0到1用C#开发ABB机器人上位机:PC SDK通信+运动控制+状态监控
  • TVA在具身智能商业化部署中的技术突破(10)
  • 论文学习:2.Semi-Supervised Classification with Graph Convolutional Networks(1)
  • Python练习题2
  • merge、concat、join:三张表合并搞崩你的不是语法是逻辑
  • TPA3128D2音频放大器与PIC18F4458微控制器的集成应用
  • Cangaroo:免费开源CAN总线分析软件的完整指南
  • TVA对具身智能领域的核心技术支撑(19)
  • 跨平台macOS组件下载神器:gibMacOS完全指南
  • 基于计算机视觉的课堂行为分析:从姿态估计到专注度评估实战
  • Py-GCMS 与 FTIR 的性能比较
  • TVA对具身智能领域“莫拉维克悖论“的挑战(10)