企业级 Agent 产品:多租户隔离与资源配额的架构设计
企业级 Agent 产品:多租户隔离与资源配额的架构设计
一、共享集群的公平性危机:当大客户吃掉所有 GPU
企业级 Agent 产品通常采用多租户架构——多个客户共享同一套计算集群。当某个大客户发起大量并发推理请求时,可能占满所有 GPU 资源,导致其他客户的请求排队等待甚至超时。这种"资源抢占"问题不仅影响用户体验,还可能违反 SLA 合同中的延迟承诺。
更复杂的是,不同客户对资源的需求模式差异巨大:A 客户白天高峰期需要大量 GPU,B 客户则是夜间批量处理。静态分配资源会导致白天 A 浪费 B 的配额,夜间 B 浪费 A 的配额。需要一套动态的、可配置的资源隔离和配额管理系统。
flowchart TB subgraph 无隔离 R1[请求队列] --> GPU1[GPU 池] T1[租户A<br/>100并发] --> R1 T2[租户B<br/>5并发] --> R1 Note1[租户A占满GPU<br/>租户B饥饿等待] -.-> GPU1 end subgraph 多租户隔离 T3[租户A] --> Q1[租户A队列<br/>配额: 80并发] T4[租户B] --> Q2[租户B队列<br/>配额: 20并发] Q1 --> S[调度器] Q2 --> S S --> GPU2[GPU 池] Note2[按配额分配<br/>保证最低资源] -.-> S end二、多租户隔离的核心机制
2.1 三层隔离模型
多租户隔离分为三个层次:网络隔离(不同租户的流量互不可见)、计算隔离(GPU/CPU 资源按配额分配)、存储隔离(模型权重和数据分区存储)。对于 Agent 产品,计算隔离是核心——推理请求的 GPU 占用是最大的资源瓶颈。
2.2 令牌桶与优先级调度
令牌桶算法是资源配额的经典实现:每个租户有一个令牌桶,请求消耗令牌,令牌以固定速率补充。当令牌耗尽时,新请求被限流或排队。优先级调度则在资源竞争时决定谁先获得 GPU——高优先级租户的请求优先调度,低优先级租户使用剩余资源。
sequenceDiagram participant TA as 租户A participant TB as 租户B participant Scheduler as 调度器 participant Quota as 配额管理器 participant GPU as GPU池 TA->>Scheduler: 提交50个推理请求 Scheduler->>Quota: 查询租户A配额 Quota-->>Scheduler: 配额80,已用30,剩余50 Scheduler->>GPU: 调度50个请求 TB->>Scheduler: 提交30个推理请求 Scheduler->>Quota: 查询租户B配额 Quota-->>Scheduler: 配额20,已用0,剩余20 Scheduler->>GPU: 调度20个请求 Scheduler->>TB: 10个请求排队等待 Note over TB: 等待租户A释放资源<br/>或令牌补充三、生产级代码实现
3.1 配额管理器
import time import asyncio import logging from dataclasses import dataclass, field from typing import Dict, Optional from enum import Enum logger = logging.getLogger(__name__) class Priority(Enum): LOW = 0 NORMAL = 1 HIGH = 2 CRITICAL = 3 @dataclass class TenantQuota: """租户配额定义""" tenant_id: str max_concurrent: int = 10 # 最大并发推理数 tokens_per_minute: int = 100000 # 每分钟 Token 配额 priority: Priority = Priority.NORMAL gpu_share: float = 1.0 # GPU 份额(0.0-1.0) @dataclass class TokenBucket: """令牌桶:控制每分钟 Token 消耗速率""" capacity: int # 桶容量(最大突发量) tokens: float # 当前令牌数 refill_rate: float # 每秒补充速率 last_refill: float # 上次补充时间 def consume(self, amount: int) -> bool: """尝试消耗令牌,成功返回 True""" self._refill() if self.tokens >= amount: self.tokens -= amount return True return False def _refill(self) -> None: """补充令牌""" now = time.time() elapsed = now - self.last_refill self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate) self.last_refill = now class QuotaManager: """配额管理器:管理多租户的资源配额和令牌桶 设计考量: - 每个租户独立的令牌桶,防止一个租户耗尽全局 Token 配额 - 并发数硬限制:超过最大并发的请求直接排队 - 优先级调度:高优先级租户在资源竞争时优先获得 GPU """ def __init__(self): self._quotas: Dict[str, TenantQuota] = {} self._buckets: Dict[str, TokenBucket] = {} self._current_concurrent: Dict[str, int] = {} self._lock = asyncio.Lock() def register_tenant(self, quota: TenantQuota) -> None: """注册租户配额""" self._quotas[quota.tenant_id] = quota self._buckets[quota.tenant_id] = TokenBucket( capacity=quota.tokens_per_minute, tokens=quota.tokens_per_minute, refill_rate=quota.tokens_per_minute / 60.0, last_refill=time.time(), ) self._current_concurrent[quota.tenant_id] = 0 async def acquire( self, tenant_id: str, estimated_tokens: int = 1000, ) -> bool: """尝试获取资源配额 Returns: True 表示配额充足,可以执行推理 False 表示配额不足,需要排队或拒绝 """ async with self._lock: quota = self._quotas.get(tenant_id) if not quota: logger.error(f"未知租户: {tenant_id}") return False # 检查并发数限制 if self._current_concurrent[tenant_id] >= quota.max_concurrent: logger.info(f"租户 {tenant_id} 并发数已达上限: {quota.max_concurrent}") return False # 检查令牌桶 bucket = self._buckets[tenant_id] if not bucket.consume(estimated_tokens): logger.info(f"租户 {tenant_id} Token 配额不足") return False # 分配成功 self._current_concurrent[tenant_id] += 1 return True async def release(self, tenant_id: str, actual_tokens: int = 0) -> None: """释放资源配额 Args: actual_tokens: 实际消耗的 Token 数,用于修正令牌桶 """ async with self._lock: if tenant_id in self._current_concurrent: self._current_concurrent[tenant_id] = max( 0, self._current_concurrent[tenant_id] - 1 ) # 如果实际消耗与预估差异超过 20%,记录日志用于优化预估 if actual_tokens > 0: estimated = 1000 # 默认预估值 if abs(actual_tokens - estimated) / max(estimated, 1) > 0.2: logger.info( f"租户 {tenant_id} Token 估算偏差: " f"预估={estimated}, 实际={actual_tokens}" ) def get_priority(self, tenant_id: str) -> Priority: """获取租户优先级""" quota = self._quotas.get(tenant_id) return quota.priority if quota else Priority.NORMAL def get_status(self, tenant_id: str) -> Dict: """获取租户当前资源使用状态""" quota = self._quotas.get(tenant_id) bucket = self._buckets.get(tenant_id) return { "tenant_id": tenant_id, "concurrent": self._current_concurrent.get(tenant_id, 0), "max_concurrent": quota.max_concurrent if quota else 0, "tokens_remaining": int(bucket.tokens) if bucket else 0, "priority": quota.priority.value if quota else 0, }3.2 优先级调度器
@dataclass class InferenceRequest: """推理请求""" request_id: str tenant_id: str prompt: str estimated_tokens: int = 1000 submitted_at: float = field(default_factory=time.time) class PriorityScheduler: """优先级调度器:按租户优先级和提交时间排序请求 设计考量: - 同优先级按 FIFO 排序,保证公平性 - 高优先级请求可以插队,但不能抢占正在执行的请求 - 支持超时丢弃:等待超过阈值的请求自动取消 """ MAX_WAIT_SECONDS = 30 # 最大等待时间 def __init__(self, quota_manager: QuotaManager, max_gpu_slots: int = 8): self.quota_manager = quota_manager self.max_gpu_slots = max_gpu_slots self._active_slots = 0 self._queue: list[InferenceRequest] = [] self._lock = asyncio.Lock() self._condition = asyncio.Condition(self._lock) async def submit(self, request: InferenceRequest) -> bool: """提交推理请求""" async with self._condition: # 尝试直接获取配额 if self._active_slots < self.max_gpu_slots: if await self.quota_manager.acquire(request.tenant_id, request.estimated_tokens): self._active_slots += 1 return True # 配额不足或 GPU 满载,加入等待队列 self._queue.append(request) # 按优先级降序、提交时间升序排序 self._queue.sort( key=lambda r: ( -self.quota_manager.get_priority(r.tenant_id).value, r.submitted_at, ) ) return False async def wait_for_slot(self, request: InferenceRequest) -> bool: """等待可用槽位,超时返回 False""" deadline = request.submitted_at + self.MAX_WAIT_SECONDS while time.time() < deadline: async with self._condition: if await self.quota_manager.acquire(request.tenant_id, request.estimated_tokens): if request in self._queue: self._queue.remove(request) self._active_slots += 1 return True remaining = deadline - time.time() if remaining <= 0: break await asyncio.wait_for( self._condition.wait(), timeout=min(remaining, 1.0), ) # 超时,从队列移除 if request in self._queue: self._queue.remove(request) return False async def complete(self, tenant_id: str, actual_tokens: int = 0) -> None: """请求完成,释放槽位""" async with self._condition: self._active_slots -= 1 await self.quota_manager.release(tenant_id, actual_tokens) self._condition.notify_all()四、边界分析与架构权衡
4.1 令牌桶 vs 漏桶
令牌桶允许突发流量——桶中有足够令牌时,请求可以瞬间消耗。漏桶则以固定速率放行请求,不允许突发。对于 AI 推理场景,令牌桶更合适,因为用户的请求模式天然是突发的(如打开应用后连续提问)。但突发可能导致 GPU 瞬间过载,需要配合并发数硬限制使用。
4.2 优先级与公平性的矛盾
高优先级租户可以插队,这意味着低优先级租户可能长时间等待。极端情况下,如果高优先级租户持续发送请求,低优先级租户可能永远得不到服务。解决方案是设置"最低保障"——每个租户无论优先级如何,都保证获得最低比例的 GPU 时间。
4.3 配额估算的准确性
令牌桶的预估值(estimated_tokens)与实际消耗可能差异很大。如果预估偏低,租户可能消耗超出配额;如果预估偏高,配额利用率不足。更精确的做法是基于历史数据动态调整预估值,但这增加了系统复杂度。
五、总结
多租户隔离的核心是在共享集群上实现公平的资源分配。令牌桶控制速率,并发数限制控制峰值,优先级调度处理竞争。三层机制协同工作,既保证了大客户的资源需求,也防止了小客户被饿死。
落地路线建议:第一步,为每个租户配置基础配额(并发数 + Token 速率),先做硬限制;第二步,引入令牌桶,支持突发流量;第三步,添加优先级调度,为 VIP 客户提供资源保障;第四步,基于历史数据动态调整配额,提升集群资源利用率。
