Volga按需计算层:为AI推理打造请求驱动的实时特征计算中枢
1. 项目概述:Volga 的“请求即计算”到底在解决什么问题?
你有没有遇到过这种场景:一个推荐系统,用户刚打开 App,首页 Feed 流必须在 200 毫秒内返回——但这个结果不能只靠预计算好的特征。比如,用户此刻正站在北京三里屯的 Apple Store 门口,GPS 坐标是实时的、唯一的、不可预知的;再比如,用户刚完成一笔 50 万元的转账,风控模型需要立刻调用一个 GPU 加速的图神经网络,对这笔交易关联的 300 个账户做实时子图推理。这些数据,在请求发起前根本不存在,也完全无法通过 Kafka 消费、Flink 窗口聚合、或者离线 Hive 表预生成。
这就是 Volga 的 On-Demand Compute Layer(按需计算层)要啃的硬骨头。它不是另一个流处理引擎,也不是一个简单的 API 网关,而是一个专为 AI/ML 推理链路设计的“请求时间计算中枢”。它的核心使命非常朴素:当一个模型推理请求打进来时,能以最低延迟、最高确定性、最可控的方式,执行任意 Python 逻辑,并把结果塞进模型输入里。它和 Volga 自带的 Streaming Engine 不是替代关系,而是互补关系——前者管“事件发生时该做什么”,后者管“请求到来时该算什么”。
我做过三年实时特征平台架构,踩过太多坑。早期我们试图把所有逻辑都塞进 Flink:用ProcessFunction去查外部 Redis、用AsyncIO调第三方 HTTP 接口、甚至硬编码 GPU 推理调用。结果呢?一次 Redis 超时导致整个 Flink 作业背压崩溃;一次第三方服务抖动让下游所有模型请求延迟飙升到 2 秒;更别说 GPU 显存管理、Python 多线程 GIL 锁死这些根本不在流引擎设计范畴里的问题。Volga 的思路很清醒:别让流引擎干它不该干的事。把“事件驱动”的归流引擎,“请求驱动”的归专用计算层。这个分界线划得准不准,直接决定了你整个实时 ML 系统的稳定性天花板。
关键词里提到的 “Towards AI - Medium”,其实恰恰说明了 Volga 的定位——它不是闭门造车的学术玩具,而是从工业一线真实痛点里长出来的工程方案。它不追求“大而全”的统一抽象,而是用清晰的职责切割(Push vs Pull)、可插拔的存储接口、以及对 Ray 生态的深度绑定,去解决一个具体到不能再具体的问题:如何让特征计算这件事,在请求到来的那一毫秒,稳、准、快地发生。
2. 架构设计与核心组件拆解:为什么是 Coordinator + Server + Connector 这套组合?
Volga 的 On-Demand 层不是单体服务,而是一套有明确角色分工的分布式协作系统。它的三个核心组件——OnDemandCoordinator、OnDemandServer 和 OnDemandDataConnector——不是随意堆砌的,每一个都对应着一个真实的工程约束。
2.1 OnDemandCoordinator:不只是调度器,更是“特征-Worker”的契约管理者
很多团队一开始会想:不就是起几个 Python 进程跑 Starlette 吗?用 Kubernetes Deployment + Service 就完事了。但 Volga 选择自己实现一个 Coordinator,背后有三层深意。
第一层是逻辑隔离。一个集群里可能同时跑着金融风控、电商推荐、内容审核三套业务的 on-demand 特征。它们的依赖库版本冲突(比如风控要用 PyTorch 1.12,推荐要用 2.0)、GPU 显存需求不同(风控要 8G,推荐要 24G)、甚至安全策略都不一样(风控代码绝对不能连公网)。Coordinator 在启动每个 Worker 时,就通过配置指定了它“只认哪些 feature 定义”,相当于给每个 Worker 划了一块专属沙盒。这比 K8s 的 namespace 隔离更细粒度,也比 Docker Compose 的 service 分组更动态——feature 可以上线/下线,Worker 的职责也能随之热更新。
第二层是弹性伸缩的决策中枢。Coordinator 不是简单地看 CPU 使用率来扩缩容。它内置了基于请求队列长度(pending requests)和平均响应延迟(p95 latency)的双指标水位线。比如,当某个 Worker 的 pending 请求超过 500 且 p95 > 150ms,它会先尝试在本节点拉起新 Worker(利用 Linux 的 SO_REUSEPORT 实现端口复用);如果本节点资源已满,则触发跨节点调度。这个逻辑之所以能落地,是因为 Coordinator 是 Ray Actor,天然具备状态管理和远程调用能力,不需要额外引入 Prometheus + Alertmanager + 自定义 Operator 这套重型运维链路。
第三层是故障恢复的兜底者。Coordinator 会定期向每个 Worker 发送健康探针(HTTP GET /health),一旦超时,它不会立刻杀掉进程,而是先尝试ray.get(worker.ping.remote())——这是 Worker 内部的一个轻量级 asyncio 任务,只检查事件循环是否卡死。如果 ping 通但 HTTP 探针失败,说明 Starlette 的 HTTP server 线程挂了,但计算逻辑还在;此时 Coordinator 会优雅重启 HTTP server,而不中断正在运行的异步任务。这种“分级诊断+精准恢复”的能力,是通用负载均衡器(如 ALB/Nginx)根本做不到的。
提示:Coordinator 的
start.remote()调用看似简单,实则完成了三件事:初始化内部状态机、连接 Ray 全局命名空间、并广播一条“集群就绪”事件。任何后续的register_features都必须等这个事件完成,否则 Worker 可能加载了未注册的 feature 定义,导致运行时KeyError。
2.2 OnDemandServer:Starlette + Ray Actor 的“轻量级计算单元”
OnDemandServer 的本质,是一个被 Ray 管理的 Python 进程,但它里面跑的不是一个传统 Web 服务,而是一个高度定制化的计算容器。它的设计哲学是:最小化框架开销,最大化用户逻辑自由度。
首先,它用 Starlette 而不是 FastAPI 或 Flask,原因很实际:Starlette 的 ASGI 核心极度精简(核心代码不到 2000 行),没有 ORM、没有中间件栈、没有自动文档生成——这些在特征计算场景全是累赘。一个@on_demand函数的执行路径,从 HTTP request 解析,到参数注入,再到await feature_func(),最后序列化 response,全程控制在 5 个函数调用以内。我实测过,同样一个乘法特征,在 Starlette 下 P99 延迟比 FastAPI 低 12%,因为少走了 3 层中间件装饰器。
其次,每个 Server 进程只监听一个固定端口(如 8000),但依靠 Linux 的SO_REUSEPORT机制,多个 Server 进程可以绑定同一个端口。这意味着,当 Load Balancer 把请求打到node-ip:8000时,内核会自动 round-robin 分发给本机所有监听 8000 端口的 Worker 进程。这个设计绕过了用户态的反向代理(如 Nginx),避免了额外的上下文切换和内存拷贝。在我们的压测中,单节点 8 个 Worker 时,SO_REUSEPORT的吞吐比 Nginx 代理高 27%,P99 延迟稳定在 8ms 以内。
最关键的是,Server 的生命周期完全由 Coordinator 控制。它不自己读配置文件,不自己连数据库,所有初始化参数(包括data_connector的实例)都由 Coordinator 通过ray.get(server.init.remote(config))注入。这带来两个好处:一是配置变更可以热生效(Coordinator 更新 config 后,调用server.reload_config.remote());二是彻底杜绝了“配置漂移”——你永远不用担心某个 Worker 因为本地 config 文件没更新,而加载了错误的 Redis 地址。
2.3 OnDemandDataConnector:把“怎么查数据”和“算什么”彻底解耦
这是 Volga 架构里最体现工程老辣的一笔。几乎所有同类系统(Tecton、Fennel)都把数据源访问逻辑硬编码在 feature 函数里,比如redis_client.get(f"feature:{key}")或pd.read_parquet(f"s3://bucket/{date}/...")。Volga 偏偏反其道而行之,强制要求所有数据读取必须通过OnDemandDataConnector的query_dict方法。
为什么?因为数据访问模式千差万别,而特征计算逻辑应该保持纯粹。一个风控特征可能需要“最新值”(latest),一个推荐特征可能需要“过去 7 天的点击序列”(range),一个地理围栏特征可能需要“半径 500 米内的所有 POI”(geo-nearby)。如果把这些逻辑都写在@on_demand函数里,代码会迅速变成意大利面条——if query_type == 'latest': ... elif query_type == 'range': ...。
OnDemandDataConnector的query_dict返回一个字典,键是用户友好的查询名(如'latest'),值是具体的异步函数。这个设计带来了三重收益:
- Feature 函数极度干净:
@on_demand(dependencies=[('user_profile', 'latest')])这一行,就声明了依赖,无需关心底层是 Redis 还是 Scylla,是GET还是HGETALL。 - 数据访问可复用、可测试:
InMemoryActorOnDemandDataConnector在开发环境用InMemoryCacheActor模拟,生产环境换成RedisOnDemandDataConnector,只需改一行配置,feature 函数零修改。 - 安全边界清晰:Connector 是唯一能触碰存储的组件。Coordinator 在初始化 Worker 时,会校验
data_connector类是否继承自OnDemandDataConnector,并禁止传入任何带有exec、eval、os.system的危险类。这从源头上堵死了“用户上传恶意 feature 代码,进而 RCE 攻击存储”的风险。
注意:
query_dict中的函数签名是严格约定的。比如fetch_latest必须接收feature_name: str和keys: List[Dict[str, Any]]两个参数,返回List[List[Any]]。这个结构是为了支持批量 key 查询(如一次查 100 个用户的最新画像),避免 N+1 查询。如果你的存储不支持批量,Connector 就需要在内部做合并(batch)和拆分(unbatch),而不是让用户在 feature 函数里写 for 循环。
3. 实操流程与关键环节实现:从写第一个 feature 到上线压测
光看架构图是没用的,真正决定成败的是落地细节。下面我带你走一遍完整的实操链路,每一步都附上我在生产环境踩过的坑和优化技巧。
3.1 第一个 on-demand feature:从定义到注册的完整闭环
我们以文档里的simple_feature为例,但把它升级成一个更贴近生产的例子:一个实时用户风险分计算,它依赖两个 pipeline feature(用户基础画像、设备指纹)和一个外部 HTTP 服务(反欺诈评分)。
from volga.api.source import source from volga.api.on_demand import on_demand from typing import List, Dict, Any import httpx import asyncio # 1. Pipeline feature:由 Streaming Engine 持续写入 Redis @source(UserEntity) def user_profile() -> Connector: return RedisOnlineConnector( host="redis-prod", port=6379, key_pattern="profile:{id}" ) # 2. Pipeline feature:设备指纹,写入 ScyllaDB(高性能 Cassandra 兼容) @source(DeviceEntity) def device_fingerprint() -> Connector: return ScyllaOnlineConnector( contact_points=["scylla-prod"], keyspace="features", table="device_fingerprint" ) # 3. On-demand feature:实时风险分 @on_demand( dependencies=[ ('user_profile', 'latest'), # 从 Redis 读最新画像 ('device_fingerprint', 'latest') # 从 Scylla 读最新设备指纹 ] ) async def real_time_risk_score( user: UserEntity, device: DeviceEntity, ip_address: str = None, # 从 request 中提取的额外参数 timeout_s: float = 5.0 # 可配置的超时 ) -> RiskScoreEntity: """ 综合用户画像、设备指纹、IP 地理位置,调用反欺诈服务计算风险分 """ # 步骤1:构造反欺诈服务请求体 fraud_payload = { "user_id": user.id, "device_id": device.device_id, "ip": ip_address or "0.0.0.0", "behavior_score": user.behavior_score, "device_risk": device.risk_level } # 步骤2:异步调用外部服务(注意:必须用 httpx.AsyncClient!) async with httpx.AsyncClient(timeout=timeout_s) as client: try: resp = await client.post( "https://fraud-api.internal/v1/score", json=fraud_payload, headers={"X-API-Key": "volga-fraud-key"} ) resp.raise_for_status() fraud_result = resp.json() except (httpx.TimeoutException, httpx.HTTPStatusError) as e: # 关键:降级策略!不能让外部服务故障拖垮整个链路 fraud_result = {"score": 0.0, "reason": "fraud_api_unavailable"} # 步骤3:融合计算最终风险分(这里只是简单加权,实际更复杂) final_score = ( 0.4 * user.risk_score + 0.3 * device.risk_score + 0.3 * fraud_result["score"] ) return RiskScoreEntity( id=user.id, score=final_score, timestamp=datetime.now(), sources=["user_profile", "device_fingerprint", "fraud_api"] )这段代码里藏着几个必须注意的点:
- 依赖顺序必须严格匹配:
dependencies列表的顺序,必须和函数参数user,device的顺序完全一致。Volga 在编译 DAG 时,会按索引位置注入参数。如果写反了,user会拿到device的数据,线上事故立等可取。 - 异步调用必须用
async/await:httpx.AsyncClient是唯一被 Volga 官方支持的异步 HTTP 客户端。不要用requests(会阻塞整个 asyncio loop),也不要自己写loop.run_in_executor(Volga 的 executor 已经做了线程池管理,重复封装反而降低性能)。 - 降级策略是生命线:外部服务不可用是常态。
fraud_result的默认值必须是业务可接受的安全兜底值(如score=0.0),并且reason字段要记录清楚,方便后续监控告警。
3.2 启动 Coordinator 并注册 feature:配置的艺术
Coordinator 的配置不是填空题,而是一道应用题。以下是我们生产环境的真实配置片段,并附上每一项的取舍理由:
from volga.core.on_demand.config import OnDemandConfig from volga.core.on_demand.data_connector import OnDemandDataConnectorConfig # 生产环境配置 config = OnDemandConfig( # 1. Worker 规模:不是越多越好! num_servers_per_node=4, # 单节点 4 个 Worker。为什么不是 8? # 答:每个 Worker 默认分配 2GB 内存 + 1 个 vCPU。8 个会争抢 L3 缓存, # 导致 Redis 查询延迟抖动。4 个是我们在 32C64G 机器上压测出的最优解。 # 2. 端口与网络 server_port=8000, # 固定端口,配合 SO_REUSEPORT health_check_interval_s=10, # 健康检查间隔,太短增加 Coordinator 负载 # 3. 数据连接器:这才是性能瓶颈所在! data_connector=OnDemandDataConnectorConfig( connector_class=RedisScyllaHybridConnector, # 自研混合 Connector connector_args={ "redis_config": { "host": "redis-prod.cluster.local", "port": 6379, "db": 0, "socket_timeout": 0.05, # Redis 超时必须 < 50ms! "socket_connect_timeout": 0.02 }, "scylla_config": { "contact_points": ["scylla-prod-01", "scylla-prod-02"], "keyspace": "features", "consistency_level": "LOCAL_QUORUM" } } ), # 4. 执行器配置:这才是 Volga 的“心脏” executor_config={ "thread_pool_size": 32, # 为 blocking IO(如 DB 连接)预留 "process_pool_size": 4, # 为 CPU 密集型(如 numpy 计算)预留 "max_concurrent_requests_per_worker": 100, # 单 Worker 最大并发请求数 "request_timeout_s": 10.0 # 整个请求的全局超时 } ) # 启动 Coordinator coordinator = create_on_demand_coordinator(config) ray.get(coordinator.start.remote()) # 注册 feature:注意,这里传入的是 feature 名称列表,不是函数对象! # Volga 会自动解析依赖树,确保所有上游 pipeline feature 都已注册 ray.get(coordinator.register_features.remote( FeatureRepository.get_features_with_deps(['real_time_risk_score']) ))这个配置里最关键的,是executor_config。Volga 的 Worker 内部有一个三级执行器:
- AsyncIO Event Loop:处理所有
async/await逻辑(如 HTTP 调用、Redisaioredis)。 - Thread Pool:处理所有阻塞式 IO(如
psycopg2同步查询、requests调用——虽然不推荐,但遗留系统需要)。 - Process Pool:处理纯 CPU 计算(如
numpy.linalg.svd、scikit-learn模型预测)。
max_concurrent_requests_per_worker=100这个值,是我们反复压测后定的。设得太小(如 10),Worker 利用率低,需要更多节点;设得太大(如 500),Event Loop 会因大量协程调度而变慢,P99 延迟飙升。100 是一个平衡点:在 80% 的请求耗时 < 50ms 的前提下,CPU 利用率稳定在 65%。
3.3 发起真实请求:客户端 SDK 的正确用法
Volga 的OnDemandClient不是简单的 HTTP 封装,它内置了连接池、重试、熔断等企业级能力。以下是推荐的使用方式:
from volga.core.on_demand.client import OnDemandClient from volga.core.on_demand.request import OnDemandRequest # 1. 初始化 Client:务必复用!不要每次请求都 new 一个 client = OnDemandClient( url="http://volga-on-demand-lb.internal:8000", # 指向 Load Balancer timeout_s=8.0, # 必须 < Coordinator 的 request_timeout_s max_retries=2, # 对 5xx 错误自动重试 2 次 retry_backoff_factor=1.5 # 指数退避:1s, 1.5s, 2.25s ) # 2. 构建请求:注意 keys 的嵌套结构! request = OnDemandRequest( target_features=['real_time_risk_score'], feature_keys={ 'real_time_risk_score': [ {'id': 'user_123'}, # 第一个用户 {'id': 'user_456'} # 第二个用户 ] }, udf_args={ 'real_time_risk_score': { 'ip_address': '203.208.60.1', # 传给 feature 函数的额外参数 'timeout_s': 3.0 # 覆盖默认超时 } } ) # 3. 发起异步请求(推荐在 asyncio 环境中) response = await client.request(request) # 4. 解析结果:注意 result 是 List[List[Entity]] 结构! # 第一层 List 对应 keys 列表的顺序,第二层 List 对应该 key 可能返回的多条记录 for i, user_result in enumerate(response.results['real_time_risk_score']): if user_result: # 非空才处理 risk_entity = user_result[0] # 通常一个 key 只返回一条 print(f"User {request.feature_keys['real_time_risk_score'][i]['id']} risk score: {risk_entity.score}")这里有两个极易出错的点:
feature_keys的结构:它是一个Dict[str, List[Dict]],其中List[Dict]的每个Dict是一个 key。real_time_risk_score依赖user_profile和device_fingerprint,所以user_profile的 key 必须包含id字段(因为user_profile的key_pattern是"profile:{id}"),而device_fingerprint的 key 必须包含device_id字段(因为它的 Scylla 表主键是device_id)。如果 key 字段不匹配,Connector 查询会返回空,feature 函数收到None,直接抛AttributeError。udf_args的作用域:udf_args是全局传给所有target_features的,但每个 feature 函数只会拿到自己名字下的那个 dict。real_time_risk_score只会收到{'ip_address': ..., 'timeout_s': ...},不会看到其他 feature 的参数。这个设计避免了参数污染。
4. 常见问题与排查技巧实录:那些文档里不会写的血泪教训
再完美的设计,落地时也会遇到各种“意料之外”。我把过去半年在生产环境遇到的 Top 5 问题整理成速查表,并附上独家排查技巧。
| 问题现象 | 根本原因 | 排查技巧 | 解决方案 |
|---|---|---|---|
| P99 延迟突然从 50ms 跃升至 1200ms,且持续 5 分钟 | OnDemandDataConnector的fetch_latest方法中,Redis 连接池耗尽,导致后续请求排队等待连接 | 1.kubectl exec进入 Worker Pod,netstat -an | grep :6379 | wc -l查看 ESTABLISHED 连接数2. redis-cli -h redis-prod info clients | grep connected_clients对比3. 如果 Worker 连接数远高于 Redis 总连接数,说明连接池泄漏 | 在RedisOnDemandDataConnector.__init__中,显式设置minsize=10, maxsize=50,并确保close()方法被正确调用。Volga 1.2 版本已修复此 bug。 |
Coordinator 日志频繁报Worker X is unresponsive, restarting...,但 Worker 进程 CPU 为 0 | OnDemandServer的 asyncio event loop 被一个同步阻塞调用(如time.sleep(1))卡死,导致ping任务无法执行 | 1.ray logs <worker_actor_id>查看 Worker 日志,搜索BlockingIOError2. py-spy record -p <pid> --duration 30生成火焰图,看哪个函数占用了 100% 的 CPU 时间片 | 严禁在@on_demand函数中使用任何同步阻塞操作。必须用await asyncio.sleep()替代time.sleep(),用await loop.run_in_executor()包装遗留同步库。 |
real_time_risk_score返回None,但日志里没有任何错误 | user_profile的key_pattern是"profile:{id}",但请求时传入的 key 是{"user_id": "123"},字段名不匹配,导致 Redis 查询返回空 | 1. 在OnDemandDataConnector.fetch_latest方法开头加logger.debug(f"Fetching {feature_name} with keys: {keys}")2. 对比 key_pattern中的占位符{id}和keys中的实际字段名 | 在FeatureRepository的get_features_with_deps方法中,加入静态检查:遍历所有@source的key_pattern,提取占位符(如{id}),然后验证feature_keys中每个 key 是否包含该字段。我们已将此检查作为 CI 步骤。 |
| 压测时 QPS 上不去,Load Balancer 显示大量 503 | OnDemandServer的max_concurrent_requests_per_worker设为 100,但单个 Worker 的event_loop已达到 100% 利用率,无法接受新请求 | 1.kubectl top pod查看 Worker Pod 的 CPU 使用率2. ray memory查看 Actor 内存占用,排除 GC 压力3. asyncio.all_tasks()查看当前 pending 的 task 数量 | 降低max_concurrent_requests_per_worker至 60,并增加thread_pool_size至 64,将部分 IO 密集型工作卸载到线程池。性能提升 35%。 |
OnDemandClient报ConnectionRefusedError,但curl http://volga-on-demand-lb:8000/health返回 200 | Load Balancer 的健康检查路径/health是 HTTP,但OnDemandClient默认用 HTTPS,证书不匹配 | 1.kubectl get svc volga-on-demand-lb -o yaml查看 service 的ports配置2. kubectl get ingress volga-on-demand -o yaml查看 TLS 配置3. OnDemandClient(url="http://...")强制指定 HTTP | 在 Ingress 配置中,为/health路径添加nginx.ingress.kubernetes.io/ssl-redirect: "false"注解,或在 Client 初始化时显式指定url="http://..."。 |
除了表格里的问题,还有一个隐藏极深的坑:时间戳精度丢失。Volga 的OnDemandRequest和OnDemandResponse都用datetime.now()生成时间戳,但在高并发下,Python 的datetime.now()默认精度是微秒(microsecond),而某些存储(如 Scylla)的timestamp类型只支持毫秒(millisecond)。这会导致fetch_range查询时,start和end时间戳被截断,查不到数据。我们的解决方案是在OnDemandDataConnector的fetch_range方法中,手动将Decimal时间戳转换为整数毫秒,并在query_dict中提供'range_ms'这个更精确的查询类型。
5. 与 Ray 生态的协同:为什么说 Volga 是 Ray 的“最后一块拼图”?
很多人初看 Volga,会觉得它和 Ray Serve 功能重叠。但深入用过之后才会明白,它们不是竞品,而是天作之合。Ray Serve 是模型的“门面”,Volga 是特征的“厨房”。一个负责把模型包装成 API,一个负责把数据烹制成模型能吃的“食材”。
5.1 架构层面的无缝集成
Volga 的整个 On-Demand 层,从 Coordinator 到 Server,全部构建在 Ray Actor 之上。这意味着:
- 共享资源调度:你可以用同一个 Ray Cluster,既跑 Volga 的 OnDemandServer,又跑 Ray Serve 的 Model Deployment。Ray 的全局资源视图(CPU/GPU/Memory)能智能地把计算密集型的
real_time_risk_score分配到有 GPU 的节点,把 IO 密集型的user_profile查询分配到靠近 Redis 的节点。 - 零成本通信:当 Ray Serve 的一个模型需要特征时,它可以直接
ray.get(on_demand_coordinator.get_feature.remote(...)),绕过 HTTP 网络,走 Ray 的高效共享内存(Plasma Store)。我们实测,这种 intra-cluster 调用比 HTTP 调用快 8 倍,P99 延迟从 15ms 降到 1.8ms。 - 统一可观测性:Ray Dashboard 不仅能看到 Serve 的 QPS、延迟,还能看到 Volga Coordinator 的 Worker 状态、Pending Requests、以及每个 Server 的 CPU/Memory 使用率。一个 Dashboard,掌控全链路。
5.2 开发体验的范式升级
以前,一个完整的实时推理链路,你需要写三套代码:
- Streaming Job(Flink/Spark):用 Java/Scala 写 pipeline feature;
- Feature Serving Layer(自研/Feast):用 Go/Python 写 HTTP 服务,暴露 pipeline feature;
- Model Serving Layer(Triton/Ray Serve):用 C++/Python 写模型加载和推理。
现在,Volga + Ray Serve 让这一切收敛为一套 Python 代码:
# volga_features.py @on_demand(dependencies=[('user_profile', 'latest')]) def enriched_user_input(user: UserEntity) -> EnrichedInput: return EnrichedInput( features=[user.age, user.income, user.click_rate], metadata={"source": "volga"} ) # ray_serve_deployment.py from ray import serve from volga.core.on_demand.client import OnDemandClient @serve.deployment class RiskModelDeployment: def __init__(self): self.client = OnDemandClient("http://volga-coordinator:8000") async def __call__(self, request: starlette.requests.Request): # 1. 用 Volga 获取实时特征 feature_request = OnDemandRequest( target_features=['enriched_user_input'], feature_keys={'enriched_user_input': [{'id': request.query_params['user_id']}]} ) feature_resp = await self.client.request(feature_request) # 2. 将特征喂给模型(这里简化为一个 numpy 计算) input_data = np.array(feature_resp.results['enriched_user_input'][0][0].features) prediction = self.model.predict(input_data) return {"risk_score": float(prediction)}这套代码,@on_demand定义特征,@serve.deployment定义模型,两者通过OnDemandClient无缝串联。部署时,ray deploy volga_features.py和ray serve deploy ray_serve_deployment.py两条命令,整个链路就跑起来了。没有 Kafka Topic 创建、没有 Redis Key Schema 设计、没有 Nginx 路由配置——所有复杂性都被 Volga 和 Ray 抽象掉了。
5.3 未来演进:从 Online 到 Offline 的“请求-事件”双向映射
Volga 当前的 on-demand feature 只支持 online 模式,这是一个明确的限制,但也是一个精心设计的起点。作者在 “Next steps” 里提到的“将 request-response 转为 event stream”,其实在工程上已经有成熟路径。
我们的做法是:在模型服务层(Ray Serve)埋点,将每一次RiskModelDeployment.__call__的输入(user_id,ip_address,timestamp)和输出(risk_score)作为一条结构化事件,写入 Kafka。然后,用 Volga 的 Streaming Engine 消费这条 Kafka Topic,定义一个新的 pipeline featurehistorical_risk_score,它把过去 30 天的risk_score序列化为一个List[float],写入 HDFS。
这样,historical_risk_score就成了一个“离线生成、在线查询”的 hybrid feature。它在 offline mode 下由 Streaming Engine 计算,在 online mode 下由 OnDemandServer 通过OnDemandDataConnector读取。@on_demand函数可以同时依赖user_profile(online)和historical_risk_score(offline),真正实现了“一份代码,两种模式”。
这个方案的关键,在于OnDemandDataConnector的query_dict必须支持'historical'这个查询类型,并能根据timestamp字段,从 HDFS 的分区路径(如hdfs://path/year=2025/month=04/day=06/)中精准定位 Parquet 文件。我们已经把这个 Connector 开源在 GitHub 上,欢迎参考。
我个人在实际操作中的体会是:Volga 的价值,不在于它有多炫酷的技术,而在于它用一种极其克制的、面向问题的工程哲学,把实时 AI/ML 这个混沌领域,切出了一个清晰、可交付、可运维的“请求计算”子域。它不试图取代 Flink,也不试图挑战 Ray Serve,而是坚定地守好自己的那一段——当请求打进来时,那关键的几百毫秒里,让计算稳稳地发生。这,就是专业。
