Temporal 服务器源码架构分析
基于 temporalio/temporal 仓库源码深度分析
仓库地址:https://github.com/temporalio/temporal
目录
- 概述
- 项目结构
- 四大核心服务
- 事件溯源与 Workflow 历史
- 持久化层架构
- History Service 深度分析
- 6.1 History 分片
- 6.2 Mutable State
- 6.3 状态转换
- 6.4 队列处理
- 6.5 一致性保证
- Matching Service
- 7.1 Task Queue 分区
- 7.2 转发机制
- Frontend Service
- Worker Service 与内部 Worker
- 9.1 架构定位
- 9.2 源码结构
- 9.3 WorkerComponent 注册机制
- 9.4 各组件职责
- 9.5 为什么需要内部 SDK Worker
- 补充机制
- 10.1 任务调度框架
- 10.2 CHASM 框架
- 10.3 重试与容错
- 10.4 Speculative Workflow Task & Workflow Update
- 10.5 Nexus RPC
- Temporal Web UI
- 11.1 数据获取架构
- 11.2 调用的 API
- 11.3 Workflow 列表
- 11.4 Workflow 详情页
- 11.5 Schedule 查看
- 11.6 操作能力
- 11.7 与任务看板的区别
- Temporal + AI Agent 实践
- 12.1 为什么 Temporal 适合 Agent 编排
- 12.2 架构模式:Agent-as-Workflow
- 12.3 主流 Agent 框架集成生态
- 12.4 业界成功实践
- 12.5 Temporal AI Cookbook
- 12.6 与 Hermes Agent 的类比
- 总结
- 13.1 架构模式对照表
- 13.2 关键技术指标
- 13.3 演进趋势
1. 概述
Temporal 是一个持久化执行平台(Durable Execution Platform),源自 Uber 的 Cadence 项目分支,由 Temporal Technologies 公司开发维护。它允许开发者以代码方式定义 Workflow,平台自动处理间歇性故障、重试失败操作,保证 Workflow 在进程崩溃、网络中断等场景下仍能正确执行。
核心设计决策:
| 维度 | 决策 |
|---|---|
| 持久化模型 | 事件溯源(Event Sourcing)—— 每个 Workflow 执行维护一个只增的 History Event 序列 |
| 用户代码隔离 | Workflow 代码必须确定性(Deterministic),Activity 代码要求幂等 |
| 代码执行位置 | 用户代码在用户自有的 Worker 进程中执行,而非服务器 |
| 服务架构 | 微服务:Frontend / History / Matching / Worker 四大服务 |
| 编程语言 | Go(服务器),SDK 支持 Go/Java/Python/TypeScript 等 |
| 许可证 | MIT |
2. 项目结构
temporal/
├── api/ # Protobuf 定义和生成的 Go 代码
│ ├── adminservice/ # 管理服务 API
│ ├── workflowservice/ # 面向用户的 Workflow API
│ └── ...
├── chasm/ # CHASM 框架(新的 ASM 框架,用于 Scheduler 等)
├── client/ # 服务间通信的 gRPC 客户端
├── cmd/ # CLI 入口 ── cmd/server/main.go
├── common/ # 跨服务共享模块
│ ├── persistence/ # 持久化层抽象和实现(Cassandra/SQL/SQLite)
│ ├── tasks/ # 通用任务调度框架
│ ├── dynamicconfig/ # 动态配置系统
│ ├── membership/ # 集群成员管理(Ringpop)
│ ├── metrics/ # 指标定义和收集
│ └── namespace/ # Namespace 缓存和工具
├── components/ # 组件化服务(nexusoperations, callbacks)
├── config/ # 配置文件和模板
├── docs/ # 文档(含 architecture/ 核心架构文档)
├── proto/ # 内部服务 Protobuf 定义
├── schema/ # 数据库 Schema(Cassandra/SQL)
├── service/ # 四大核心服务
│ ├── frontend/ # 前端服务——用户入口
│ ├── history/ # 历史服务——Workflow 核心引擎
│ │ └── api/ # 70+ gRPC handler(每 API 独立子目录)
│ ├── matching/ # 匹配服务——Task Queue 管理
│ └── worker/ # 内部 Worker
├── temporal/ # 服务器启动和生命周期
└── temporaltest/ # 测试工具
关键技术栈:
| 技术 | 用途 |
|---|---|
| Go 1.26 | 主开发语言(模块路径 go.temporal.io/server) |
| gRPC + Protobuf | 服务间通信和 API 定义(temporal/api 和 proto/internal) |
| uber-go/fx | 依赖注入框架(各 Service 用 fx.Module 组装) |
| Cassandra / MySQL / PostgreSQL / SQLite | 四种持久化后端 |
| Ringpop | 集群成员管理和分片协调 |
| OpenTelemetry | 分布式追踪 + Prometheus 指标导出 |
| gorilla/mux | HTTP 路由(Nexus/Callback 端点) |
3. 四大核心服务
Temporal 服务器由四个独立部署的微服务组成,由 temporal/temporal.go 通过 fx 统一组装启动:
┌──────────────────┐│ Temporal CLI │── cmd/server/main.go└────────┬─────────┘│┌────────▼─────────┐│ temporal.New() │── uber-go/fx DI 容器└────────┬─────────┘│┌───────────────────┼───────────────────┐▼ ▼ ▼┌──────────┐ ┌──────────┐ ┌──────────┐│ Frontend │◄─────▶│ History │◄─────▶│ Matching ││ Service │ │ Service │ │ Service │└────┬─────┘ └────┬─────┘ └────┬─────┘│ │ │└──────────────────┴──────────────────┘│┌───────┴───────┐│ Persistence │└───────────────┘┌──────────────────┐│ User Worker │ (用户自有进程)│ (SDK Runtime) │└──────────────────┘
| 服务 | 职责 | 关键文件 | 源码规模 |
|---|---|---|---|
| Frontend | 用户入口,接收 gRPC/HTTP 请求,路由到 History/Matching | service/frontend/handler.go |
单一 handler,~2600 行 |
| History | Workflow 执行引擎,管理状态和事件序列,驱动完整生命周期 | service/history/handler.go |
最大模块,80+ 子目录 |
| Matching | 管理 Task Queue,接收 Frontend 转发的 Worker Poll 请求并匹配任务 | service/matching/ |
中等规模 |
| Worker | 内部系统 Worker(归档、可见性等),非用户 Worker | service/worker/ |
最小模块 |
完整请求流(Start → 执行 Activity → Complete):
用户 ──StartWorkflowExecution──▶ Frontend│ shard 路由▼History: 初始化事件 + Transfer Task│ QueueProcessor 异步消费▼Matching: AddWorkflowTask│Worker ──PollWorkflowTask──▶ Frontend└── gRPC → MatchingWorker ◀── WorkflowTask ─── Frontend└── Matching 响应Worker ──RespondWorkflowTaskCompleted(ScheduleActivity)──▶ History│ 追加事件 + Transfer Task▼Matching: AddActivityTaskWorker ──PollActivityTask──▶ Frontend└── gRPC → MatchingWorker ◀── ActivityTask ─── Frontend└── Matching 响应Worker 执行 ActivityWorker ──RespondActivityTaskCompleted──▶ History → 循环直至 Complete
3.1 依赖注入架构
整个服务器使用 uber-go/fx 组装:
temporal/temporal.go 中的 Server:
├── resource.Module # 共享资源(日志、指标、成员管理)
├── persistenceClient.Module # 持久化客户端
├── frontend.Module # Frontend gRPC + HTTP 服务
├── history.Module # History 服务(含 shard、队列、cache)
├── matching.Module # Matching 服务
└── worker.Module # Worker 服务
每个 Service Module 内部进一步拆分(以 History 为例):
var Module = fx.Options(resource.Module,fx.Provide(hsm.NewRegistry), // HSM 状态机注册表shard.Module, // 分片管理events.Module, // 事件缓存cache.Module, // Workflow 缓存archival.Module, // 归档ChasmEngineModule, // CHASM 引擎fx.Provide(ConfigProvider),fx.Provide(workflow.NewCommandHandlerRegistry),// ... 多个 Interceptor Provider
)
4. 事件溯源与 Workflow 历史
Temporal 最核心的设计模式是事件溯源(Event Sourcing)。每个 Workflow 执行维护一个只增的 History Event 序列,所有所需状态都可以通过重放(Replay)这个历史来重建。
4.1 History Event
Event 类型定义在 temporal/api/enums/v1/event_type.proto,常见的包括:
| Event 类型 | 触发场景 |
|---|---|
WorkflowExecutionStarted |
Workflow 启动 |
WorkflowTaskScheduled / Started / Completed |
Workflow Task 生命周期 |
ActivityTaskScheduled / Started / Completed |
Activity 生命周期 |
TimerStarted / TimerFired |
定时器触发 |
WorkflowExecutionCompleted / Failed / TimedOut |
Workflow 结束 |
WorkflowTaskFailed / ActivityTaskFailed |
任务失败 |
注意:Temporal 中的 "event" 特指 Workflow History Event,不同于事件驱动架构中的"事件"——它是服务器收到外部输入后内部计算出的 Workflow 执行状态精确描述,而非系统间异步消息。
4.2 三种持久化策略
| 策略 | 描述 | 场景 |
|---|---|---|
| History Events | 不可变事件序列,追加写入 | 所有状态变更 |
| Mutable State | 从事件计算的当前状态快照 | 快速读取(Activity 列表、Timer 等) |
| Tasks | 待处理的异步工作单元 | Transfer、Timer、Replication 等 |
Event Sourcing 的三个核心优势:
- 持久化容错:所有状态可从历史重建
- 确定性重放:Worker 通过重放历史恢复 Workflow 状态
- 审计追踪:完整的不可变执行记录
5. 持久化层架构
在深入具体服务之前,先理解存储层——这是所有服务依赖的基础设施。持久化层位于 common/persistence/,提供统一的接口抽象,支持四种后端。
5.1 接口分层
应用层(History/Matching Service)│▼
接口层(data_interfaces.go 定义接口)│┌───────┬────────┬────────┐▼ ▼ ▼ ▼Cassandra MySQL PostgreSQL SQLite(gocql) (mysql) (pgx) (modernc)
核心接口文件:
common/persistence/
├── data_interfaces.go # ExecutionManager / TaskManager / HistoryManager 等接口
├── execution_manager.go # CreateWorkflowExecution / UpdateWorkflowExecution
├── history_manager.go # AppendHistoryNodes / ReadHistoryBranch
├── shard.go # GetShard / UpdateShard
├── task_store.go # CreateTask / GetTasks / CompleteTask
├── cluster_metadata.go # 集群元数据
├── nexus_endpoint_store.go # Nexus Endpoint 存储
├── cassandra/ # Cassandra 实现
├── sql/ # SQL 实现(含 factory 按 driver 类型创建)
│ └── sqlplugin/ # mysql/ postgresql/ sqlite 插件
└── client/ # 客户端封装(重试、缓存)
5.2 关键数据库表(Cassandra 为例)
-- executions 表:Mutable State + Shard 元数据
schema/cassandra/temporal/schema.cql:7
-- 核心列: shard_id, type, namespace_id, workflow_id, run_id, data, data_encoding-- history_node / history_tree:History Event 序列
schema/cassandra/temporal/schema.cql:56,70
-- history_node: tree_id, branch_id, node_id, prev_txn_id, data, data_encoding
-- history_tree: shard_id, tree_id, branch_id, data-- 可见性存储:Workflow 搜索属性
schema/cassandra/visibility/schema.cql
5.3 主存储接口设计
// data_interfaces.go - 核心接口
type ExecutionManager interface {CreateWorkflowExecution(ctx, request) (response, error)UpdateWorkflowExecution(ctx, request) (response, error) // 核心写入路径GetWorkflowExecution(ctx, request) (response, error)
}type HistoryManager interface {AppendHistoryNodes(ctx, request) (response, error)ReadHistoryBranch(ctx, request) (response, error)ForkHistoryBranch(ctx, request) (response, error) // Reset/分支DeleteHistoryBranch(ctx, request) error
}
6. History Service 深度分析
History Service 是 Temporal 最核心的组件,负责管理每个 Workflow 执行的生命周期。其架构围绕 Shard → Engine → QueueProcessor → Executor 层层展开。
6.1 History 分片
目的:支撑水平扩展,管理数百万并发 Workflow 执行。
Temporal Cluster│├── History Host 1 ──── Shard 0, Shard 1, Shard 3├── History Host 2 ──── Shard 2, Shard 4, Shard 5└── History Host 3 ──── Shard 6, Shard 7, ...
- Shard 总数在集群创建时固定(不可更改)
- 分片所有权由
ShardController+ Ringpop(Uber 开源的 SWIM 协议实现)协调 - 每个 History 进程启动时调用
historyEngine.Start()(service/history/history_engine.go:288),为每个所属 Shard 启动队列处理器
Shard 核心元数据:
| 字段 | 说明 |
|---|---|
RangeID |
单调递增的世代号,用于 fencing(防脑裂) |
| 队列状态 | 各内部队列的已确认/已处理位置(ack level) |
在 Cassandra 中,Shard 对应 executions 表的一个分区(schema/cassandra/temporal/schema.cql:52)。
Shard 所有权的获取流程(源码跟踪):
History Service 启动└─ shard.NewController() # service/history/shard/controller.go└─ controller.Start() # 加入 Ringpop 环└─ ringpop.Lookup(key) # 一致性哈希确定 owner└─ AcquireShard() # 用 RangeID 做乐观锁获取└─ historyEngine.Start() # 启动队列处理器
当 Shard 所有权丢失(如节点宕机),AcquireShard 返回 ShardOwnershipLostError,触发重新获取。
6.2 Mutable State
Mutable State 是对 Workflow 当前状态的汇总快照,虽然理论上可以从 History 重新计算,但为了提高性能而持久化缓存。定义在 service/history/workflow/mutable_state_impl.go:112。
包含的信息:
- 正在进行中的 Activity 列表(ActivityID、TaskQueue、超时时间)
- 活跃的 Timer(TimerID、过期时间)
- Child Workflow 状态
- 待处理的 Signal
- 版本信息(Worker 版本化兼容)
- Update 信息
// 核心接口定义(简化)
type MutableState interface {GetWorkflowState() enumspb.WorkflowExecutionStateGetActivityInfo(activityID string) (*persistencespb.ActivityInfo, bool)GetTimerInfo(timerID string) (*persistencespb.TimerInfo, bool)AddActivityTaskScheduledEvent(...) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error)AddTimerStartedEvent(...) (*historypb.HistoryEvent, *persistencespb.TimerInfo, error)IsWorkflowExecutionRunning() boolHasPendingTasks() bool
}
6.3 状态转换
状态转换是 History Service 的核心操作模式,对所有类型的输入使用统一的代码路径。
通用状态转换函数(核心入口):
// service/history/api/update_workflow_util.go:37
func GetAndUpdateWorkflowWithNew(ctx, shardContext, workflowConsistencyChecker, workflowKey, updateAction,
) error
封装了完整的"读锁 → 刷新 Mutable State → 执行转换 → 持久化 → 释放锁"流程。
状态转换的通用形式:
输入 ──▶ [原子事务]├── 新 Mutable State├── 新的 History Event(s)└── 新的 History Task(s)
可触发状态转换的四种输入:
- 来自用户应用的 RPC(Start / Signal / Update / Query / Cancel / Reset)
- 来自 Worker 的 RPC(WorkflowTask / ActivityTask 完成)
- 定时器触发(Timer Task 到期)
- 其他 Workflow 执行(StartChildWorkflow / SignalExternalWorkflow)
三层调用链:
gRPC Handler (service/history/api/*/api.go)└─ GetAndUpdateWorkflowWithNew()└─ updateAction (具体业务逻辑)└─ MutableState.Add*Event() → 创建 Event + 更新状态└─ TaskGenerator.*() → 创建 Transfer/Timer Task└─ UpdateWorkflowExecutionAsActive()└─ persistence.ExecutionManager.UpdateWorkflowExecution() → 事务提交
关键代码入口:
| 触发源 | 入口函数 | 文件 |
|---|---|---|
| 用户 Signal | Invoke |
service/history/api/signalworkflow/api.go:38 |
| Worker 完成 WFT | Invoke |
service/history/api/respondworkflowtaskcompleted/api.go:110 |
| Timer 触发 | executeUserTimerTimeoutTask |
service/history/timer_queue_active_task_executor.go:136 |
| Activity 超时 | executeActivityTimeoutTask |
service/history/timer_queue_active_task_executor.go |
6.4 队列处理
每个 History Shard 管理多个内部任务队列,通过 QueueProcessor 异步消费。所有队列处理器在 historyEngine.Start() 时启动(service/history/history_engine.go:303)。
队列类型:
| 队列 | 调度类型 | 用途 |
|---|---|---|
| Transfer Task Queue | 立即执行 | 向 Matching 发送 RPC(创建 Workflow/Activity Task) |
| Timer Task Queue | 定时执行 | Timer 到期触发状态转换 |
| Visibility Task Queue | 立即执行 | 更新可见性存储(Elasticsearch/SQL) |
| Archival Task Queue | 立即执行 | 归档历史到长期存储(S3/GCS) |
| Outbound Queue | 立即执行(隔离) | Nexus/Callback 出站 HTTP 请求(新增) |
Transfer Task 处理流程:
QueueProcessor 轮询读取 Persistence└─ Execute() 分发(transfer_queue_active_task_executor.go:114)├─ processActivityTask() → Matching.AddActivityTask()├─ processWorkflowTask() → Matching.AddWorkflowTask()└─ ...其他类型
Timer Task 类型:
| Timer 类型 | 触发后行为 |
|---|---|
UserTimerTimeout |
写 TimerFired 事件 + 创建 WFT Transfer Task |
ActivityTimeout |
写 ActivityTaskTimedOut 事件 |
WorkflowTaskTimeout |
写 WorkflowTaskTimedOut 事件 |
ActivityRetryTimer |
直接调 Matching 添加 Activity Task |
6.5 一致性保证
Temporal 在三个层次保证一致性:
-
Mutable State ↔ History Task —— 数据库事务
UpdateWorkflowExecutionAsActive()→persistence.ExecutionManager.UpdateWorkflowExecution()- SQL 用
BEGIN ... COMMIT,Cassandra 用LWT(Lightweight Transaction)
-
Mutable State ↔ History Events —— Mutable State 中记录
LastEventID- 每次持久化后刷新,不一致时重新从 Persistence 加载
-
History Service ↔ Matching Service —— Transactional Outbox 模式
- History Shard 写入 Transfer Task → QueueProcessor 异步读取 → RPC 调 Matching
- 即使 RPC 失败,QueueProcessor 重试直至成功
7. Matching Service
Matching Service 负责管理 Task Queue(Workflow/Activity/Nexus),处理来自 Frontend 转发的 Worker 长轮询请求,被 History Service 的 Transfer Queue Processor 调用(AddWorkflowTask/AddActivityTask)。
7.1 Task Queue 分区
Task Queue "my-queue"│┌───┴───┐│ Root │ (分区 0)└───┬───┘│┌────┴────┐│ │┌─┴─┐ ┌─┴─┐│ P1│ │ P2│ (子分区,默认 4 分区)└───┘ └───┘
| 属性 | 说明 |
|---|---|
| 默认分区数 | 4,可配置 |
| 分区所有权 | 可重新分配 |
| 负载/卸载 | 分区元数据和任务积压可按需加载/卸载 |
7.2 转发机制
Poller → Poll 分区 P1(空)├── 有任务 → 返回└── 无任务 → ForwardToParent → Root 分区Task → 写入分区 P1├── 有 Poller → 立即同步匹配└── 无 Poller → ForwardToParent → Root 分区
- 子分区空时,Poller/Task 转发到父分区,提高匹配效率
- 如果根分区被加载,强制加载该 Task Queue 的所有子分区
8. Frontend Service
Frontend 是用户和系统的唯一入口,暴露三类 gRPC 服务 + HTTP 端点。用户的所有 SDK 请求(Start/Signal/Query/Update)都先到达 Frontend,再由它路由到 History 或 Matching。
| 服务 | 接口 | 说明 |
|---|---|---|
WorkflowService |
workflowservice.v1 |
Start/Signal/Query/Update/Describe/Reset 等 |
OperatorService |
operatorservice.v1 |
Namespace CRUD、Nexus Endpoint 管理 |
AdminService |
adminservice.v1 |
跨集群复制、重新同步等运维操作 |
关键代码结构:
service/frontend/
├── handler.go # 主 WorkflowService gRPC handler(~2600 行)
├── namespace_handler.go # Namespace 管理逻辑
├── nexus_handler.go # Nexus HTTP handler
├── nexus_completion_http_handler.go # Nexus Callback 回调处理
├── http_api_server.go # HTTP API 服务器(gorilla/mux)
└── fx.go # DI 模块
请求处理流程(以 StartWorkflow 为例):
用户 gRPC 请求 → Frontend 验证(Namespace 存在性、请求校验)→ 一致性哈希选择 History Shard→ gRPC 调用 History Service→ 等待 Response 返回用户
Frontend 还通过 gorilla/mux 运行 HTTP 服务器(默认 7243 端口),提供 Nexus 端点:
POST /namespaces/{namespace}/task-queues/{tq}/nexus-services # 分发 Nexus 任务
POST /nexus/endpoints/{endpoint}/services # 通过 Endpoint 分发
POST /namespaces/{namespace}/nexus/callback # Nexus 回调
9. Worker Service 与内部 Worker
Worker Service 是 Temporal 服务器内部自带的 Temporal SDK Worker——它使用 Temporal Go SDK 在服务器进程中启动 Worker,执行系统级 Workflow 和 Activity。相当于"Temporal 运行在 Temporal 之上"。
9.1 架构定位
┌─────────────────────────────────────────────────┐
│ Temporal Server │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Frontend │ │ History │ │ Matching │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ ┌──────────────────────────────────────────┐ │
│ │ Worker Service │ │
│ │ │ │
│ │ ┌──────────────────────────────────┐ │ │
│ │ │ SDK Worker (系统内部 Worker) │ │ │
│ │ │ ├── Replicator Workflow │ │ │
│ │ │ ├── Scanner Workflow │ │ │
│ │ │ ├── Archiver Workflow/Activity │ │ │
│ │ │ ├── Batcher Workflow │ │ │
│ │ │ └── ParentClosePolicy Workflow │ │ │
│ │ └──────────────────────────────────┘ │ │
│ └──────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘│ 通过 SDK 客户端调用自身 gRPC API▼(Frontend / History / Matching)
Worker Service 与用户 Worker 的关键区别:
| 对比 | Worker Service(内部) | 用户 Worker(外部) |
|---|---|---|
| 运行位置 | Temporal 服务器进程中 | 用户自己的进程中 |
| 执行的任务 | 系统级 Workflow(复制、扫描、归档) | 用户的业务 Workflow/Activity |
| 启动方式 | fx.Module 由服务器自动启动 |
用户手动启动 temporal worker start |
| SDK | Temporal Go SDK(go.temporal.io/sdk/worker) |
任意语言 SDK |
| 任务队列 | 系统内部 Task Queue(temporal-system) |
用户指定的 Task Queue |
9.2 源码结构
service/worker/
├── worker.go # workerManager:管理多个 SDK Worker 实例
├── service.go # Service:Worker Service 的 gRPC + 生命周期
├── fx.go # DI 模块
│
├── replicator/ # 跨集群复制
│ ├── replicator.go # Replicator:消费复制任务并应用到本集群
│ └── replication_message_processor.go
│
├── scanner/ # 后台扫描器
│ ├── scanner.go # Scanner:定时健康检查
│ ├── executions/ # 检查 Workflow 执行完整性
│ ├── build_ids/ # 清理过期 Build ID
│ ├── taskqueue/ # 检查 Task Queue 健康
│ ├── scheduleinvariants/ # Schedule 变更检测
│ └── history/ # 历史数据扫描
│
├── batcher/ # 批量操作
│ ├── workflow.go # 批量操作 Workflow 定义
│ └── activities.go # 批量操作具体执行逻辑
│
├── parentclosepolicy/ # 子 Workflow 关闭策略
├── addsearchattributes/ # 添加搜索属性
├── deletenamespace/ # Namespace 删除
├── scheduler/ # 内部 Schedule 管理
├── workflowdeployment/ # Worker Deployment 版本管理
├── migration/ # 数据迁移
└── common/ # 共享组件接口(WorkerComponent)
9.3 WorkerComponent 注册机制
每个系统级功能实现 WorkerComponent 接口,注册其 Workflow 和 Activity:
// service/worker/common/ - WorkerComponent 接口
type WorkerComponent interface {RegisterWorkflow(worker sdkworker.Worker)RegisterActivities(worker sdkworker.Worker)DedicatedWorkflowWorkerOptions() *DedicatedWorkerOptions // nil = 用默认 WorkerDedicatedActivityWorkerOptions() *DedicatedWorkerOptions
}
workerManager.Start() 遍历所有 WorkerComponent,注册到 SDK Worker:
func (wm *workerManager) Start() {// 创建默认 SDK Worker(监听 temporal-system Task Queue)defaultWorker := wm.sdkClientFactory.NewWorker(sdkClient, "temporal-system", ...)for _, wc := range wm.workerComponents {if needDedicatedWorker(wc) {dedicatedWorker := wm.sdkClientFactory.NewWorker(sdkClient, wc.TaskQueue(), ...)wc.RegisterWorkflow(dedicatedWorker)wm.workers = append(wm.workers, dedicatedWorker)} else {wc.RegisterWorkflow(defaultWorker)}wc.RegisterActivities(defaultWorker)}
}
9.4 各组件职责
| 组件 | 职责 | 运行方式 |
|---|---|---|
| Replicator | 从远程集群接收复制任务,应用到本集群(跨集群容灾的核心) | 持续运行的 Workflow |
| Scanner | 定时扫描系统健康状态——检测孤儿 Execution、清理过期 Build ID、校验 Task Queue、检查 Schedule 一致性 | 定时触发的 Workflow(cron: * * * * *) |
| Batcher | 执行批量操作(如批量给大量 Workflow 发 Signal、批量 Terminate) | 按需启动的 Workflow |
| Archiver | 将已完成 Workflow 的历史转移到 S3/GCS 等长期存储 | Worker 中注册的 Activity |
| ParentClosePolicy | 父 Workflow 关闭时,根据 Policy 自动处理子 Workflow | Activity |
| AddSearchAttributes | 向 Elasticsearch/SQL 中添加新的搜索属性字段 | Workflow |
| DeleteNamespace | 删除 Namespace 及其关联数据 | Workflow |
| Scheduler | 管理内部 Schedule 的触发和执行 | Workflow |
| WorkerDeployment | 管理 Worker 版本化的部署生命周期 | Workflow |
其中 Scanner 最有意思——它像数据库的 AutoVacuum,通过周期性 Workflow 自我修复:
Scanner Workflow(每小时触发)├─ Executions Scan: 检查是否有丢失的 Workflow 执行记录├─ Build IDs Scan: 清理不再使用的 Worker Build ID 版本├─ Task Queue Scan: 检测积压或异常的任务队列└─ Schedule Invariants: 检查 Schedule 元数据一致性
9.5 为什么需要内部 SDK Worker
Temporal 的 Worker Service 选择用自己 SDK 启动内部 Worker(而非直接函数调用),原因:
- 利用自身能力:系统级任务也需要持久化、重试、定时——正是 Temporal 自身的强项
- 统一编程模型:内部系统 Workflow 和用户 Workflow 用同一套 SDK API 编写
- 解耦与隔离:每个系统组件独立注册,可以单独配置自己的 Task Queue、重试策略
- 利用现有基础设施:系统 Workflow 也走完整的 Frontend → History → Matching 路径,获得一致的可观测性
10. 补充机制
10.1 任务调度框架
common/tasks/ 提供通用的 Go 协程调度框架,被 History 的 QueueProcessor 使用。所有 Scheduler 统一实现 Scheduler 接口,接收 Executable 任务,执行后回调 Ack/Nack。
common/tasks/
├── fifo_scheduler.go # FIFO 顺序调度
├── sequential_scheduler.go # 每任务队列保序
├── dynamic_worker_pool_scheduler.go # 动态按需创建/销毁 goroutine
├── group_by_scheduler.go # 按源 NS + 目标分组(Outbound 用)
├── rate_limited_scheduler.go # 限速
└── interleaved_weighted_round_robin.go # 加权轮询
10.2 CHASM 框架
CHASM(Coordinated Heterogeneous Application State Machines)是 Temporal 引入的新型框架(chasm/ 目录),将 Workflow 视为众多 ASM 中的一种。
| 概念 | 说明 |
|---|---|
| ASM | Application State Machine:注册的状态机类型 |
| Component | 定义了状态(Fields)和行为的类型 |
| Execution | ASM 的运行时实例 |
| Field | 框架管理的持久化状态容器(Field[T], Map[K,T], ParentPtr) |
已有 Library:workflow、scheduler(已迁移)、nexusoperation。CHASM 的意义在于用更轻量的 ASM 替代传统 Workflow 做简单状态机逻辑,直接在 History Service 内部运行。
10.3 重试与容错
三层重试架构:
Frontend gRPC Handler Retry (NewRetryableInterceptor, server-side)
Frontend gRPC Client Retry (NewRetryableClient, can switch node)
History gRPC Handler Retry (NewRetryableInterceptor, server-side)
错误体系(common/backoff/):
backoff.ThrottleRetryContext(ctx, operation, policy, isRetryable)
ServiceError interface { error; Status() *status.Status }// 关键错误类型
NotFound | Unavailable | NamespaceNotActive | ShardOwnershipLost | TaskAlreadyStarted
10.4 Speculative Workflow Task & Workflow Update
Speculative Workflow Task 是一种优化:对 Workflow Update 请求,服务器第一次尝试不写数据库,乐观假设成功。失败则像从未存在过一样丢弃。
三种 WFT 对比:
| 类型 | 持久化 | 失败处理 |
|---|---|---|
| Normal | 写 DB | 写 Failure 事件 → 重试 |
| Transient | 不写 S/S 事件(临时) | 增加尝试计数 |
| Speculative | 完全不写 DB | 直接丢弃 |
Workflow Update 是 "Signal + Query" 的结合体:可被 Workflow 拒绝且拒绝不产生事件,接受后 API 调用者可立即获知结果。依赖四个基础机制:Speculative WFT、Message Protocol、In-memory Timer Queue、effect Package。
10.5 Nexus RPC
Nexus 是 Temporal 的跨 Namespace、跨集群通信协议。核心组件:
| 组件 | 角色 |
|---|---|
| Nexus Endpoint Registry | 集群全局 Endpoint 注册表(UUID + 名称唯一) |
| Nexus Operations | Operation 生命周期状态机 |
| Callbacks | Workflow 完成时的回调机制 |
| Outbound Queue | 出站 HTTP 请求队列(Circuit Breaker + 分组隔离) |
11. Temporal Web UI
独立项目:
temporalio/web,不属于temporalio/temporal仓库,通过 gRPC-web 调用 Frontend Service 的 API。
Temporal Web UI 是一个用 TypeScript + React 编写的调试和观测工具。temporal server start-dev 启动后默认监听 http://localhost:8233。
11.1 数据获取架构
浏览器 (React) Temporal Server│ ││ HTTP/gRPC-web ││──────────────── 轮询 ───────────────────▶│ Frontend Service│◀─────────────── 响应 ────────────────────│ ││ │ ├── History → History Events│ │ ├── Describe → Mutable State│ │ ├── Query → Worker 栈追踪│ │ └── List → Visibility Store
Web UI 没有 WebSocket 或任何推送通道,所有数据通过定时轮询获取。
11.2 调用的 API
| 页面/功能 | gRPC API | 数据来源 | 刷新方式 |
|---|---|---|---|
| Workflow 列表 | ListWorkflowExecutions |
Visibility 存储(ES/SQL) | 每 5-10s 自动轮询 |
| Workflow 计数 | CountWorkflowExecutions |
Visibility 存储 | 同上 |
| 详情-基本面板 | DescribeWorkflowExecution |
Mutable State(History Service) | 手动/自动刷新 |
| 详情-事件时间线 | GetWorkflowExecutionHistory |
History Event 存储 | 每 5s 轮询 |
| 详情-栈追踪 | QueryWorkflow("__stack_trace") |
Worker 运行时 | 手动点击触发 |
| 操作(Signal/Terminate/Reset) | 对应 Mutate API | History Service | 操作完成自动刷新 |
11.3 Workflow 列表
┌─────────────────────────────────────────────────────────────┐
│ temporal │ Namespace: default │ Status: Running ▼ │
├─────────────────────────────────────────────────────────────┤
│ Workflow ID │ Type │ Status │ Start Time │
│───────────────────┼───────────────┼─────────┼───────────────│
│ rental-car-abc │ BookingWf │ Running│ 2 min ago │
│ order-xyz-789 │ OrderWf │ Compld │ 5 min ago │
│ payment-failed │ PaymentWf │ Failed │ 10 min ago │
└─────────────────────────────────────────────────────────────┘
11.4 Workflow 详情页
基本信息面板:
┌──────────────────────────────────────────┐
│ Status: ● Running │
│ Workflow ID: rental-car-abc123 │
│ Run ID: 1111-2222-3333-4444 │
│ Type: BookingWorkflow │
│ Task Queue: booking-tasks │
│ Start Time: 2026-06-18 10:00:00 │
│ Execution Time: 2m 34s │
│ Attempt: 1 (max 5) │
├──────────────────────────────────────────┤
│ [ Signal ] [ Terminate ] [ Reset ] │
│ [ Stack Trace ] │
└──────────────────────────────────────────┘
History Event 时间线(核心功能): 每行一个事件,按 ID 递增排列。点击任一行展开显示完整 Protobuf JSON,包含 Payload 数据。对 AI Agent 场景,可直接看到每次 LLM 调用的请求/响应 Payload。
Pending Activities 面板:
Activity ID │ Type │ Status │ Attempt │ Timeout
─────────────┼─────────────────┼─────────────┼──────────┼─────────5 │ ChargeCard │ Started │ 1 │ 50s left
11.5 Schedule 查看
Schedule 触发日历和执行记录,显示下次触发时间、历史完成/失败记录。
11.6 操作能力
| 操作 | 对应 API |
|---|---|
| Signal | SignalWorkflowExecution |
| Terminate | TerminateWorkflowExecution |
| Reset | ResetWorkflowExecution |
| Cancel | RequestCancelWorkflowExecution |
所有操作调用 Frontend Service 的 gRPC API,与 SDK 调用没有区别。
11.7 与任务看板的区别
| 维度 | Temporal Web UI | 任务看板(Jira/Trello) |
|---|---|---|
| 定位 | 调试 & 可观测性 | 任务分配 & 进度管理 |
| 数据源 | Visibility + History Events | 用户自定义字段 |
| 操作 | Signal / Terminate / Reset | 拖拽卡片、编辑状态 |
| 状态 | 固定的事件类型链 | 自定义阶段列 |
| 刷新 | 定时轮询(5-10s) | 实时或近实时 |
| 受众 | 开发者 | 团队成员 |
12. Temporal + AI Agent:持久化执行平台驱动智能代理
Temporal 的持久化执行模型天然适合 AI Agent 的编排。Agent 的典型运行模式——多步推理、工具调用、LLM 回退重试、人工审批循环——与 Temporal Workflow 的容错、状态持久化、重试机制高度匹配。
12.1 为什么 Temporal 适合 Agent 编排
| Agent 痛点 | Temporal 解决方式 |
|---|---|
| LLM API 不稳定(超时、限速、返回异常) | 自动重试 + Backoff 策略,Activity 级别的重试配置 |
| Agent 运行几分钟到几小时,中途崩溃丢状态 | Durable Execution,状态由事件溯源持久化,崩溃后自动恢复 |
| 多步推理需要中间状态持久化 | Mutable State 自动维护,无需手动保存/恢复上下文 |
| 工具调用结果不可靠 | 每个工具调用作为 Activity,独立重试、超时控制 |
| 需要人工介入审批/验证 | Workflow 天然支持 Signal/Update 式 Human-in-the-Loop |
| 多 Agent 协作复杂 | Workflow 编排子 Workflow,确定性的编排逻辑保证一致性 |
| 调试困难、不可观测 | Temporal Web UI 可查看每一步的输入输出、重放历史 |
12.2 架构模式:Agent-as-Workflow
┌──────────────────────────────────────────────────────┐
│ Temporal Cluster │
│ │
│ ┌───────────────── Workflow ──────────────────────┐ │
│ │ Agent Workflow (每个 Agent 实例是一个 Workflow) │ │
│ │ │ │
│ │ Step 1: 接收用户请求 (Signal/Update/Start) │ │
│ │ ↓ │ │
│ │ Step 2: LLM 推理 (Activity: 调用 OpenAI/Gemini) │ │
│ │ ↓ (自动重试 3次, 指数退避, 应对 429) │ │
│ │ Step 3: 工具调用循环 (Activity Loop) │ │
│ │ ├── 工具 A: 搜索 (Activity) │ │
│ │ ├── 工具 B: 读取文件 (Activity) │ │
│ │ └── 工具 C: 执行代码 (Activity) │ │
│ │ ↓ │ │
│ │ Step 4: 人工审批 (Signal/Update, 等待 Human) │ │
│ │ ↓ │ │
│ │ Step 5: 返回最终结果 (Complete) │ │
│ └───────────────────────────────────────────────────┘ │
│ │
│ ┌───────────── Worker ──────────────┐ │
│ │ Activity 执行 LLM 调用 + 工具 │ │
│ └───────────────────────────────────┘ │
└──────────────────────────────────────────────────────┘
关键代码模式(Python SDK 示意):
from temporalio import workflow, activity@activity.defn
async def llm_infer(prompt: str) -> str:return await openai_client.chat.completions.create(...)@activity.defn
async def call_tool(tool: str, args: dict) -> str:return await execute_tool(tool, args)@workflow.defn
class AgentWorkflow:@workflow.runasync def run(self, user_request: str) -> str:result = await workflow.execute_activity(llm_infer, user_request,schedule_to_close_timeout=timedelta(minutes=2),retry_policy=RetryPolicy(maximum_attempts=3))while need_tool_call(result):tool, args = parse_tool_call(result)tool_result = await workflow.execute_activity(call_tool, tool, args,start_to_close_timeout=timedelta(seconds=30))result = await workflow.execute_activity(llm_infer, f"{result}\nTool Result: {tool_result}")await workflow.wait_condition(lambda: self.approved,timeout=timedelta(hours=24))return result
12.3 主流 Agent 框架集成生态
| 框架/平台 | 集成方式 | 说明 |
|---|---|---|
| OpenAI Agents SDK | 官方集成 | Temporal 作为 Durable Execution 后端,自动保存 Agent 状态 |
| LangChain / LangGraph | 社区 + 官方 | LangGraph Checkpoint 可被 Temporal 替代 |
| Google ADK | 官方集成 | Google Agent Development Kit 原生集成 Temporal |
| Mastra | 官方指南 | Mastra 框架提供 Temporal 部署指南 |
| CrewAI | 社区集成 | Crew/Agent → Workflow/Activity |
| AutoGen | 社区集成 | 多 Agent 对话用 Workflow 编排 |
| MCP (Model Context Protocol) | Temporal 官方 | 构建 Durable MCP Server |
OpenAI Agents SDK + Temporal(2025年7月发布,Public Preview):
openai-agents-sdk└── Temporal Durable Execution Runner├── Agent Run → Workflow├── Handoff → Child Workflow├── Tool Call → Activity(自动重试 + 超时)└── Guardrail → Workflow 检查点
12.4 业界成功实践
Replit Agent
| 维度 | 详情 |
|---|---|
| 规模 | 数百万次 Agent 运行/天 |
| 迁移 | 2024 年底,2 周内完成迁移 |
| 架构 | 每个 Agent 实例 = 一个 Temporal Workflow |
| 效果 | 零重大事故 |
核心场景:Agent 控制平面(Workflow ID 唯一性)、Agent 生命周期编排、Human-in-the-Loop 确认弹窗、多产品复用。
"Temporal gives us a lot more confidence to build the product and know that it's not going to have lots of edge cases." —— Connor Brewster, Lead Engineer, Replit
Retool Agents
仅 10 个工程师支撑 Agent + 基础设施,几个月内上线,每天数千次 Agent 运行。
"Without Temporal, we probably would've missed the deadline... or we would've had to hire up a big team." —— Lizzie Siegrist, Product Manager, Retool
Gorgias
"All LLM use cases are workflows." —— Romain Niveau, Senior Engineering Manager, Gorgias
12.5 Temporal AI Cookbook
官方开源 AI Cookbook 提供完整示例:Hello World (LiteLLM)、Agentic Loop with Tool Calling、Tool Calling Agent、Durable MCP Server。
12.6 与 Hermes Agent 的类比
Hermes Agent (本对话的 CLI Agent) Temporal Agent 模型
────────────────────────────── ─────────────────
单条消息执行(无状态) Workflow(有状态持久化)
异步任务委托(delegate_task) Child Workflow / Activity
Memory 存储事实 Mutable State + History Events
技能/Skill 复用 Activity 定义复用
会话搜索(session_search) Temporal Web UI 历史查询
手动重试(用户介入) 自动 Retry Policy
对于需要长期运行、多步协调、容错恢复的 Agent 场景,Temporal 提供的是 Agent 框架之下的持久化编排底座——不替代 Agent 框架,而是保证任何 Agent 框架运行时的可靠性。
13. 总结
13.1 架构模式对照表
| 模式 | Temporal 实现 |
|---|---|
| 事件溯源 | Workflow 执行历史作为不可变事件序列 |
| CQRS | 读写分离——历史写入 vs 查询 |
| Transactional Outbox | Transfer Task 队列确保与 Matching 的一致 |
| 分片 | History Shard + Ringpop 协调 |
| Fencing | RangeID 单调递增防脑裂 |
| 状态机 | Update、Nexus Operation、Scheduler 均用状态机驱动 |
| 重试 + Backoff | 三层重试体系(Handler + Client + Handler) |
| 熔断器 | Outbound Queue 的 gobreaker Circuit Breaker |
| Saga | Workflow 的补偿逻辑 |
| 依赖注入 | uber-go/fx 管理模块依赖 |
13.2 关键技术指标
| 指标 | 说明 |
|---|---|
| 编程语言 | Go(服务器)+ 多语言 SDK |
| 持久化后端 | Cassandra、MySQL、PostgreSQL、SQLite |
| 服务间通信 | gRPC + Protobuf |
| HTTP 端点 | gorilla/mux(Nexus/Callback) |
| 配置系统 | 静态 YAML + 动态配置 |
| 集群协调 | Ringpop |
| 追踪/指标 | OpenTelemetry + Prometheus |
| 框架 | uber-go/fx 依赖注入 |
| 代码规模 | ~80 万行 Go(含 api/ 生成代码) |
13.3 演进趋势
- CHASM 化:Scheduler 已迁移到 CHASM,Workflow/NexusOperation 也在迁移,逐步替代传统的直接状态管理方式
- 组件化:Nexus Operations 和 Callbacks 以组件形式存在,取代直接嵌入 History Service
- Speculative 执行:零写入拒绝模式从 Update 扩展到更多场景,减少不必要的持久化
- Nexus 生态:跨集群、跨 Namespace 通信能力,使 Temporal 连接更广泛的服务网格
- Worker Command:服务器通过 Nexus 主动向 Worker 推送指令(取消活动等),不再依赖轮询
- AI Agent 底座:与 OpenAI、Google ADK、Mastra 等 Agent 框架集成,成为 Agent 持久化编排的标准层
本文基于 temporalio/temporal 仓库源码和官方架构文档撰写
仓库版本:main 分支(commit 截至 2026-06-18)
