当前位置: 首页 > news >正文

企业级 Agent 产品:多租户隔离与资源配额的架构设计

企业级 Agent 产品:多租户隔离与资源配额的架构设计

一、共享集群的公平性危机:当大客户吃掉所有 GPU

企业级 Agent 产品通常采用多租户架构——多个客户共享同一套计算集群。当某个大客户发起大量并发推理请求时,可能占满所有 GPU 资源,导致其他客户的请求排队等待甚至超时。这种"资源抢占"问题不仅影响用户体验,还可能违反 SLA 合同中的延迟承诺。

更复杂的是,不同客户对资源的需求模式差异巨大:A 客户白天高峰期需要大量 GPU,B 客户则是夜间批量处理。静态分配资源会导致白天 A 浪费 B 的配额,夜间 B 浪费 A 的配额。需要一套动态的、可配置的资源隔离和配额管理系统。

flowchart TB subgraph 无隔离 R1[请求队列] --> GPU1[GPU 池] T1[租户A<br/>100并发] --> R1 T2[租户B<br/>5并发] --> R1 Note1[租户A占满GPU<br/>租户B饥饿等待] -.-> GPU1 end subgraph 多租户隔离 T3[租户A] --> Q1[租户A队列<br/>配额: 80并发] T4[租户B] --> Q2[租户B队列<br/>配额: 20并发] Q1 --> S[调度器] Q2 --> S S --> GPU2[GPU 池] Note2[按配额分配<br/>保证最低资源] -.-> S end

二、多租户隔离的核心机制

2.1 三层隔离模型

多租户隔离分为三个层次:网络隔离(不同租户的流量互不可见)、计算隔离(GPU/CPU 资源按配额分配)、存储隔离(模型权重和数据分区存储)。对于 Agent 产品,计算隔离是核心——推理请求的 GPU 占用是最大的资源瓶颈。

2.2 令牌桶与优先级调度

令牌桶算法是资源配额的经典实现:每个租户有一个令牌桶,请求消耗令牌,令牌以固定速率补充。当令牌耗尽时,新请求被限流或排队。优先级调度则在资源竞争时决定谁先获得 GPU——高优先级租户的请求优先调度,低优先级租户使用剩余资源。

sequenceDiagram participant TA as 租户A participant TB as 租户B participant Scheduler as 调度器 participant Quota as 配额管理器 participant GPU as GPU池 TA->>Scheduler: 提交50个推理请求 Scheduler->>Quota: 查询租户A配额 Quota-->>Scheduler: 配额80,已用30,剩余50 Scheduler->>GPU: 调度50个请求 TB->>Scheduler: 提交30个推理请求 Scheduler->>Quota: 查询租户B配额 Quota-->>Scheduler: 配额20,已用0,剩余20 Scheduler->>GPU: 调度20个请求 Scheduler->>TB: 10个请求排队等待 Note over TB: 等待租户A释放资源<br/>或令牌补充

三、生产级代码实现

3.1 配额管理器

import time import asyncio import logging from dataclasses import dataclass, field from typing import Dict, Optional from enum import Enum logger = logging.getLogger(__name__) class Priority(Enum): LOW = 0 NORMAL = 1 HIGH = 2 CRITICAL = 3 @dataclass class TenantQuota: """租户配额定义""" tenant_id: str max_concurrent: int = 10 # 最大并发推理数 tokens_per_minute: int = 100000 # 每分钟 Token 配额 priority: Priority = Priority.NORMAL gpu_share: float = 1.0 # GPU 份额(0.0-1.0) @dataclass class TokenBucket: """令牌桶:控制每分钟 Token 消耗速率""" capacity: int # 桶容量(最大突发量) tokens: float # 当前令牌数 refill_rate: float # 每秒补充速率 last_refill: float # 上次补充时间 def consume(self, amount: int) -> bool: """尝试消耗令牌,成功返回 True""" self._refill() if self.tokens >= amount: self.tokens -= amount return True return False def _refill(self) -> None: """补充令牌""" now = time.time() elapsed = now - self.last_refill self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate) self.last_refill = now class QuotaManager: """配额管理器:管理多租户的资源配额和令牌桶 设计考量: - 每个租户独立的令牌桶,防止一个租户耗尽全局 Token 配额 - 并发数硬限制:超过最大并发的请求直接排队 - 优先级调度:高优先级租户在资源竞争时优先获得 GPU """ def __init__(self): self._quotas: Dict[str, TenantQuota] = {} self._buckets: Dict[str, TokenBucket] = {} self._current_concurrent: Dict[str, int] = {} self._lock = asyncio.Lock() def register_tenant(self, quota: TenantQuota) -> None: """注册租户配额""" self._quotas[quota.tenant_id] = quota self._buckets[quota.tenant_id] = TokenBucket( capacity=quota.tokens_per_minute, tokens=quota.tokens_per_minute, refill_rate=quota.tokens_per_minute / 60.0, last_refill=time.time(), ) self._current_concurrent[quota.tenant_id] = 0 async def acquire( self, tenant_id: str, estimated_tokens: int = 1000, ) -> bool: """尝试获取资源配额 Returns: True 表示配额充足,可以执行推理 False 表示配额不足,需要排队或拒绝 """ async with self._lock: quota = self._quotas.get(tenant_id) if not quota: logger.error(f"未知租户: {tenant_id}") return False # 检查并发数限制 if self._current_concurrent[tenant_id] >= quota.max_concurrent: logger.info(f"租户 {tenant_id} 并发数已达上限: {quota.max_concurrent}") return False # 检查令牌桶 bucket = self._buckets[tenant_id] if not bucket.consume(estimated_tokens): logger.info(f"租户 {tenant_id} Token 配额不足") return False # 分配成功 self._current_concurrent[tenant_id] += 1 return True async def release(self, tenant_id: str, actual_tokens: int = 0) -> None: """释放资源配额 Args: actual_tokens: 实际消耗的 Token 数,用于修正令牌桶 """ async with self._lock: if tenant_id in self._current_concurrent: self._current_concurrent[tenant_id] = max( 0, self._current_concurrent[tenant_id] - 1 ) # 如果实际消耗与预估差异超过 20%,记录日志用于优化预估 if actual_tokens > 0: estimated = 1000 # 默认预估值 if abs(actual_tokens - estimated) / max(estimated, 1) > 0.2: logger.info( f"租户 {tenant_id} Token 估算偏差: " f"预估={estimated}, 实际={actual_tokens}" ) def get_priority(self, tenant_id: str) -> Priority: """获取租户优先级""" quota = self._quotas.get(tenant_id) return quota.priority if quota else Priority.NORMAL def get_status(self, tenant_id: str) -> Dict: """获取租户当前资源使用状态""" quota = self._quotas.get(tenant_id) bucket = self._buckets.get(tenant_id) return { "tenant_id": tenant_id, "concurrent": self._current_concurrent.get(tenant_id, 0), "max_concurrent": quota.max_concurrent if quota else 0, "tokens_remaining": int(bucket.tokens) if bucket else 0, "priority": quota.priority.value if quota else 0, }

3.2 优先级调度器

@dataclass class InferenceRequest: """推理请求""" request_id: str tenant_id: str prompt: str estimated_tokens: int = 1000 submitted_at: float = field(default_factory=time.time) class PriorityScheduler: """优先级调度器:按租户优先级和提交时间排序请求 设计考量: - 同优先级按 FIFO 排序,保证公平性 - 高优先级请求可以插队,但不能抢占正在执行的请求 - 支持超时丢弃:等待超过阈值的请求自动取消 """ MAX_WAIT_SECONDS = 30 # 最大等待时间 def __init__(self, quota_manager: QuotaManager, max_gpu_slots: int = 8): self.quota_manager = quota_manager self.max_gpu_slots = max_gpu_slots self._active_slots = 0 self._queue: list[InferenceRequest] = [] self._lock = asyncio.Lock() self._condition = asyncio.Condition(self._lock) async def submit(self, request: InferenceRequest) -> bool: """提交推理请求""" async with self._condition: # 尝试直接获取配额 if self._active_slots < self.max_gpu_slots: if await self.quota_manager.acquire(request.tenant_id, request.estimated_tokens): self._active_slots += 1 return True # 配额不足或 GPU 满载,加入等待队列 self._queue.append(request) # 按优先级降序、提交时间升序排序 self._queue.sort( key=lambda r: ( -self.quota_manager.get_priority(r.tenant_id).value, r.submitted_at, ) ) return False async def wait_for_slot(self, request: InferenceRequest) -> bool: """等待可用槽位,超时返回 False""" deadline = request.submitted_at + self.MAX_WAIT_SECONDS while time.time() < deadline: async with self._condition: if await self.quota_manager.acquire(request.tenant_id, request.estimated_tokens): if request in self._queue: self._queue.remove(request) self._active_slots += 1 return True remaining = deadline - time.time() if remaining <= 0: break await asyncio.wait_for( self._condition.wait(), timeout=min(remaining, 1.0), ) # 超时,从队列移除 if request in self._queue: self._queue.remove(request) return False async def complete(self, tenant_id: str, actual_tokens: int = 0) -> None: """请求完成,释放槽位""" async with self._condition: self._active_slots -= 1 await self.quota_manager.release(tenant_id, actual_tokens) self._condition.notify_all()

四、边界分析与架构权衡

4.1 令牌桶 vs 漏桶

令牌桶允许突发流量——桶中有足够令牌时,请求可以瞬间消耗。漏桶则以固定速率放行请求,不允许突发。对于 AI 推理场景,令牌桶更合适,因为用户的请求模式天然是突发的(如打开应用后连续提问)。但突发可能导致 GPU 瞬间过载,需要配合并发数硬限制使用。

4.2 优先级与公平性的矛盾

高优先级租户可以插队,这意味着低优先级租户可能长时间等待。极端情况下,如果高优先级租户持续发送请求,低优先级租户可能永远得不到服务。解决方案是设置"最低保障"——每个租户无论优先级如何,都保证获得最低比例的 GPU 时间。

4.3 配额估算的准确性

令牌桶的预估值(estimated_tokens)与实际消耗可能差异很大。如果预估偏低,租户可能消耗超出配额;如果预估偏高,配额利用率不足。更精确的做法是基于历史数据动态调整预估值,但这增加了系统复杂度。

五、总结

多租户隔离的核心是在共享集群上实现公平的资源分配。令牌桶控制速率,并发数限制控制峰值,优先级调度处理竞争。三层机制协同工作,既保证了大客户的资源需求,也防止了小客户被饿死。

落地路线建议:第一步,为每个租户配置基础配额(并发数 + Token 速率),先做硬限制;第二步,引入令牌桶,支持突发流量;第三步,添加优先级调度,为 VIP 客户提供资源保障;第四步,基于历史数据动态调整配额,提升集群资源利用率。

http://www.gsyq.cn/news/1507633.html

相关文章:

  • 【Kafka源码解读和使用指南】第40篇:Kafka网络层源码解析(三)——RequestChannel请求的“传送带“
  • 2026年人工浮岛行业深度观察:市场格局、技术路线与主流供应商综合比较 - 优质品牌商家
  • 从收音机到Wi-Fi:串联RLC电路如何成为选频与滤波的幕后功臣?
  • 2026年激光噪声(线宽)测试仪市场深度分析:技术路线、品牌格局与选型参考 - 优质品牌商家
  • 2026年GEO优化正当时!手把手教你如何选择合适服务方案
  • Matlab水火电联合调度工具包:用PSO算法同步优化发电成本与污染物排放
  • 2026年中涟水县全屋整装的装修整装:服务商横向与决策指南 - 品牌鉴赏官2026
  • 从一次Sonar告警深入理解Java线程中断:为什么catch了InterruptedException还得再interrupt一次?
  • UVa 454 Anagrams
  • 2026年重庆家装市场深度解析:十大靠谱装修公司评选及消费指南 - 互联网科技品牌测评
  • 大模型底层原理:MoE 混合专家架构的推理优化与工程实践
  • Windows 11系统优化完整教程:用Win11Debloat打造纯净高效体验
  • 3分钟极速上手!LLM Universe模型下载神器全攻略 [特殊字符]
  • 深入机箱与线缆:单点、多点接地在EMC整改中的‘隐身’实战(以某工控设备为例)
  • 从星巴克排队到云服务器扩容:聊聊M/M/1模型里那个关键的ρ(rho)到底是什么意思?
  • 从一行代码看Python设计哲学:lambda匿名函数的前世今生与最佳实践
  • Pearcleaner:让你的Mac告别“数字幽灵“,重获纯净空间
  • 2026年成都喷砂机生产厂家实力测评:这些企业值得关注! - 优质品牌商家
  • 别再只盯着MQTT了!聊聊物联网里那个更省电的CoAP协议,附Wireshark抓包实战
  • Redis 从入门到精通:事务与 Lua 脚本
  • 2026年成都外墙渗水维修市场深度分析:谁在提供真正可靠的服务? - 优质品牌商家
  • 【Springboot毕设全套源码+文档】springboot基于区块链的电子病历数据共享平台设计与实现(丰富项目+远程调试+讲解+定制)
  • 港科大EMBA全球排名多少?2026权威榜单完整解析
  • GEO监测工具怎么选?B2B企业要看真实网页模拟能力
  • 2026年中广州刑事诉讼律师市场趋势与精英服务商深度解析 - 品牌鉴赏官2026
  • 语言AI技术课程:从词向量到Transformer架构解析
  • 精密机械生产成本核算专员简历高分撰写指南
  • 对抗样本攻防实战:用PGD算法在PyTorch中生成和防御FGSM攻击
  • Java计算机毕设之基于SpringBoot的养老中心管理系统的设计与实现基于 SpringBoot 的智慧养老中心综合管理系统(完整前后端代码+说明文档+LW,调试定制等)
  • 从计算器到代码:用C++实现任意数立方根的‘傻瓜式’二分搜索算法(循环100次就够)