当前位置: 首页 > news >正文

mori通信库分析(一)——对称内存RDMA数据发送过程

前言

在高性能计算领域,对称内存(Symmetric Memory)是简化多进程/多GPU通信的关键抽象。AMD开源的MORI库(基于IBGDA)借助对称堆(Symmetric Heap)和虚拟内存管理(VMM),实现了高效的GPU间RDMA数据传输。本文简单分析设备端执行shmem_put时的数据发送路径。

主机端初始化:建立对称映射

注册对称对象, RegisterSymmMemObj:

// 1. 分配CPU侧对象SymmMemObj*cpuMemObj=newSymmMemObj();cpuMemObj->localPtr=localPtr;// 2. 通过Allgather交换本地基地址cpuMemObj->peerPtrs=calloc(worldSize,sizeof(uintptr_t));bootNet.Allgather(&localPtr,cpuMemObj->peerPtrs,sizeof(uintptr_t));// 现在 peerPtrs[pe] 是远端进程 pe 的本地堆基地址(虚拟地址)// 3. 处理P2P传输(同节点)cpuMemObj->p2pPeerPtrs=calloc(worldSize,sizeof(uintptr_t));// 交换IPC句柄并打开,得到可直接访问的映射地址hipIpcGetMemHandle(&handle,localPtr);bootNet.Allgather(&handle,ipcMemHandles,sizeof(hipIpcMemHandle_t));foreach peer i:if(CanUseP2P(i))hipIpcOpenMemHandle(&cpuMemObj->p2pPeerPtrs[i],ipcMemHandles[i],...);// 对于非RDMA传输(P2P/SDMA),覆盖peerPtrs为本地可访问地址if(transportType!=RDMA)cpuMemObj->peerPtrs[i]=cpuMemObj->p2pPeerPtrs[i];// 4. RDMA注册(仅当有RDMA peer)if(anyRdmaPeer){RdmaMemoryRegion mr=rdmaContext->RegisterRdmaMemoryRegion(localPtr,size);cpuMemObj->lkey=mr.lkey;cpuMemObj->peerRkeys[rank]=mr.rkey;}bootNet.Allgather(&cpuMemObj->peerRkeys[rank],cpuMemObj->peerRkeys,sizeof(uint32_t));

将cpuMemObj及其指针数组通过hipMemcpy拷贝到GPU,并通过globalGpuStates->heapObj供设备端内核访问。ConfigureHeapInfoForGpu
关键点:peerPtrs在RDMA模式下存储的是远端虚拟地址,在P2P模式下存储的是本进程映射的本地地址,但设备端代码统一通过peerPtrs[pe] + offset计算目标地址,无需区分底层传输。

设备端发送流程

PutNbi APIs

PutNbi APIs

// ============================================================================// PutNbi APIs - Address-based only// ============================================================================__device____attribute__((visibility("default")))intmori_shmem_putmem_nbi_thread(void*dest,constvoid*source,size_t bytes,intpe,intqpId){mori::shmem::ShmemPutMemNbiThread(dest,source,bytes,pe,qpId);return0;}// ============================================================================// PutNbi APIs - Warp Scope (Address-based only)// ============================================================================__device____attribute__((visibility("default")))intmori_shmem_putmem_nbi_warp(void*dest,constvoid*source,size_t bytes,intpe,intqpId){mori::shmem::ShmemPutMemNbiWarp(dest,source,bytes,pe,qpId);return0;}// ============================================================================// PutNbi APIs - Block Scope (Address-based only)// ============================================================================__device____attribute__((visibility("default")))intmori_shmem_putmem_nbi_block(void*dest,constvoid*source,size_t bytes,intpe,intqpId){mori::shmem::ShmemPutMemNbiBlock(dest,source,bytes,pe,qpId);return0;}

OpenSHMEM Style PutNbi APIs

#defineDEFINE_SHMEM_PUT_MEM_NBI_ADDR_API_TEMPLATE(Scope)\inline__device__voidShmemPutMemNbi##Scope(void*dest,constvoid*source,size_t bytes,\intpe,intqpId=0){\DISPATCH_TRANSPORT_TYPE(ShmemPutMemNbi##Scope##Kernel,pe,dest,source,bytes,pe,qpId);\}DEFINE_SHMEM_PUT_MEM_NBI_ADDR_API_TEMPLATE(Thread)DEFINE_SHMEM_PUT_MEM_NBI_ADDR_API_TEMPLATE(Warp)DEFINE_SHMEM_PUT_MEM_NBI_ADDR_API_TEMPLATE(Block)

DISPATCH_TRANSPORT_TYPE

#defineDISPATCH_TRANSPORT_TYPE(func,pe,...)\GpuStates*globalGpuStates=GetGlobalGpuStatesPtr();\application::TransportType transportType=globalGpuStates->transportTypes[pe];\if(transportType==application::TransportType::RDMA){\func<application::TransportType::RDMA>(__VA_ARGS__);\}elseif(transportType==application::TransportType::P2P){\func<application::TransportType::P2P>(__VA_ARGS__);\}elseif(transportType==application::TransportType::SDMA){\func<application::TransportType::SDMA>(__VA_ARGS__);\}else{\assert(false);\}

ShmemPutMemNbiThread调用的函数为ShmemPutMemNbiThreadKernel。

在多机集群场景中,本节点数据传输走p2p路径,跨节点数据传输走RDMA。如何区分?通过目标 PE(处理单元/进程)和预先协商好的传输类型(TransportType)进行动态路由。整个决策流程在初始化时建表,在运行时查表分支。在 Context::InitializePossibleTransports,Mori 会探测当前节点与所有其他 PE 之间的物理连接能力,并为每个 PE 标记一个 TransportType
DISPATCH_TRANSPORT_TYPE根据globalGpuStates->transportTypes[pe],分发到对应函数。

接下来我们分析RDMA路径下的数据传输过程。以ShmemPutMemNbiThreadKernel<application::TransportType::RDMA>为例。

template<>inline__device__voidShmemPutMemNbiThreadKernel<application::TransportType::RDMA>(constapplication::SymmMemObjPtr dest,size_t destOffset,constapplication::SymmMemObjPtr source,size_t sourceOffset,size_t bytes,intpe,intqpId){boolneed_turn{true};uint64_tturns=__ballot(need_turn);while(turns){uint8_tlane=__ffsll((unsignedlonglong)turns)-1;intpe_turn=__shfl(pe,lane);if(pe_turn==pe){DISPATCH_PROVIDER_TYPE_COMPILE_TIME(ShmemPutMemNbiThreadKernelImpl,dest,destOffset,source,sourceOffset,bytes,pe,qpId);need_turn=false;}turns=__ballot(need_turn);}}

ShmemPutMemNbiThreadKernelImpl

ShmemPutMemNbiThreadKernelImpl 是 MORI 库中 设备端非阻塞 Put 操作 的核心模板函数,负责将一个对称内存区域的数据通过 RDMA 发送到远端 PE。该函数具备以下关键特性:

  • 支持静态堆与 VMM 分块堆;
  • 支持多种网卡(MLX5/BNXT/PSD);
  • Warp 级协同预留 WQE 槽位,减少原子竞争;
  • 自动流控,当发送队列满时主动回收完成事件。

初始化与循环控制

boolneedsChunking=globalGpuStates->useVMMHeap;size_t currentOffset=0;size_t remaining=bytes;while(true){// 检查本线程是否还有剩余数据boolhas_remaining=(remaining>0);uint64_tactivemask=__ballot(has_remaining);if(activemask==0)break;...}

_ballot 同步 Warp 内所有线程,获得仍有数据待发送的线程掩码。
若掩码为 0,则所有线程已完成,退出循环。
只有掩码中的线程参与本轮发送,其他线程跳过。

计算传输大小与密钥
根据 needsChunking 分两种情况:
静态堆模式

  • lkey = source->lkey(全局统一
  • srcAddr = source->localPtr + sourceOffset + currentOffset
  • raddr = dest->peerPtrs[pe] + destOffset + currentOffset
  • rkey = dest->peerRkeys[pe]
  • transfer_size = remaining(一次性发送剩余全部)

VMM 堆模式

  • 查询源端 Chunk:VmmQueryLocalKey(srcAddr, remaining, lkey, src_chunk_size)
  • 查询目标端 Chunk:VmmQueryRemoteAddr(dstAddr, pe, remaining, raddr, rkey, dst_chunk_size)
  • transfer_size = min(src_chunk_size, dst_chunk_size, remaining)

注意:VmmQuery* 会返回当前地址所在 Chunk 的剩余大小及对应的 Key。
关键点:VMM 模式下,每次循环只传输一个 Chunk 内的数据,确保 lkey 和 rkey 有效。

对于 BNXT 网卡,需要额外计算每个线程的 PSN(包序列号)数量:

ifconstexpr(PrvdType==BNXT){psnCnt=(transfer_size+wq->mtuSize-1)/wq->mtuSize;my_psn_excl=WarpActivePsnPrefix(psnCnt,activemask,&warp_total_psn);}

WarpActivePsnPrefix 计算每个活跃线程之前的 PSN 总数,用于后续分配 PSN 偏移。

Warp 级 WQE 槽位预留
步骤说明:

  1. 确定本次需要预留的 WQE 数量 num_wqes:
  • MLX5/PSD:每个活跃线程 1 个 WQE → num_wqes = num_active_lanes
  • BNXT:每个线程需要的 WQE 数为 psnCnt(因为每个 WQE 可能携带多个 MTU 数据)→ num_wqes = warp_total_psn
  1. 由 Leader 线程(最后一个活跃线程)执行原子预留:
if(is_leader){ifconstexpr(MLX5)warp_sq_counter=atomicAdd(&wq->postIdx,num_active_lanes);elseifconstexpr(BNXT){// 同时增加 msn 和 psnatomic_add_packed_msn_and_psn(&wq->msnPack,num_active_lanes,warp_total_psn,...);warp_sq_counter=warp_msntbl_counter;// MSN 即 SQ 索引atomicMax(&wq->postIdx,warp_sq_counter+num_active_lanes);}}
  1. 广播起始槽位:warp_sq_counter = __shfl(warp_sq_counter, leader_phys_lane_id)

  2. 计算每个线程的私有 WQE 索引:
    MLX5:my_sq_counter = warp_sq_counter + my_logical_lane_id
    BNXT:my_sq_counter = warp_sq_counter + my_logical_lane_id(但 my_msntbl_counter 和 my_psn_counter 另有偏移)

设计思想:将多个线程的预留合并为一次原子操作,降低争用。

发送队列流控

while(true){uint64_tdb_touched=wq->dbTouchIdx;// 已提交硬件(但可能未完成)uint64_tdb_done=wq->doneIdx;// 已确认完成uint64_tactive=db_touched-db_done;uint64_tfree=wq->sqWqeNum-active;uint64_tneed_until_end=warp_sq_counter+num_wqes-db_touched;if(free>need_until_end)break;// 队列不够,主动等待完成ShmemQuietThreadKernelImpl<PrvdType>(pe,qpId);}

dbTouchIdx:网卡已读取的 WQE 最大索引(但可能未完成)。
doneIdx:已完成(CQE 已回收)的最大索引。
若空闲槽位不足以容纳本次预留,则调用 ShmemQuietThreadKernelImpl 尝试从 CQ 中回收完成事件,更新 doneIdx。
循环直至满足条件。

填充 WQE 与更新门铃
调用 core::PostWrite。该函数根据网卡类型在 SQ 内存中写入 WQE,返回一个门铃值(包含需要通知硬件的状态)。

uint64_tdbr_val;ifconstexpr(MLX5){dbr_val=PostWrite(wq,my_sq_counter,my_sq_counter,my_sq_counter,is_leader,qpn,srcAddr,lkey,raddr,rkey,transfer_size);}elseifconstexpr(BNXT){dbr_val=PostWrite(wq,my_sq_counter,my_msntbl_counter,my_psn_counter,is_leader,qpn,srcAddr,lkey,raddr,rkey,transfer_size);}elseifconstexpr(PSD){dbr_val=PostWrite(wq,my_sq_counter,my_sq_counter,my_sq_counter,is_leader,qpn,srcAddr,lkey,raddr,rkey,transfer_size);}

不同网卡需要不同的索引(MSN/PSN)。

is_leader 决定是否在 WQE 中设置“完成通知”标志(通常只有最后一个 WQE 需要产生 CQE,减少中断开销)。

内存屏障与门铃提交

__threadfence_system();// 确保所有 WQE 写入全局可见if(is_leader){// 等待 dbTouchIdx 追上预留起始位置(确保前面的 WQE 已被硬件读取)while(wq->dbTouchIdx!=warp_sq_counter){}// 更新网卡侧的 dbr 记录UpdateSendDbrRecord(wq->dbrRecAddr,warp_sq_counter+num_wqes);__threadfence_system();// 敲响门铃RingDoorbell(wq->dbrAddr,dbr_val);__threadfence_system();// 更新 needConsIdx 并设置 dbTouchIdxatomicAdd(&cq->needConsIdx,1);atomicStore(&wq->dbTouchIdx,warp_sq_counter+num_wqes);}

UpdateSendDbrRecord 更新网卡侧的内存指针,告诉硬件新的 WQE 范围。
RingDoorbell 写入 PCIe 门铃寄存器,触发硬件 DMA 读取 WQE 并执行。
dbTouchIdx 的更新让其他线程知道这些槽位已被提交。

特殊优化与细节

  1. 多线程协同预留
    只有 Leader 线程执行原子操作,其他线程通过 __shfl 获取索引,避免所有线程争抢同一个原子变量。这种做法充分利用了 Warp 内通信的低延迟优势。
  2. 流控与完成回收的原子锁
    ShmemQuietThreadKernelImpl 内部会尝试获取 CQ 锁(pollCqLock),只有获取到的线程执行 PollCq,其他线程自旋等待或返回(非最终静默场景)。这保证了 CQ 处理的串行化。

QueryRemoteAddr

ConcurrentPutImmThreadKernelPureAddr

// New API: Using pure addresses__global__voidConcurrentPutImmThreadKernelPureAddr(intmyPe,uint32_t*localBuff){constexprintsendPe=0;constexprintrecvPe=1;uint32_tval=42;intglobalTid=blockIdx.x*blockDim.x+threadIdx.x;if(myPe==sendPe){// Calculate destination addressuint32_t*dest=localBuff+globalTid;// Use pure address-based APIShmemPutSizeImmNbiThread(dest,&val,sizeof(uint32_t),recvPe);__threadfence_system();if(blockIdx.x==0){ShmemQuietThread();}}else{// Wait for data to arrivewhile(atomicAdd(localBuff+globalTid,0)!=val){}}}

在RDMA模式下,分析程序怎么把本地的dest地址转换为远程地址的。
ShmemPutSizeImmNbiThread

#defineSHMEM_PUT_SIZE_IMM_NBI_ADDR_API(Scope)\inline__device__voidShmemPutSizeImmNbi##Scope(void*dest,void*val,size_t bytes,intpe,\intqpId=0){\DISPATCH_TRANSPORT_TYPE(ShmemPutSizeImmNbi##Scope##Kernel,pe,dest,val,bytes,pe,qpId);\}SHMEM_PUT_SIZE_IMM_NBI_ADDR_API(Thread)SHMEM_PUT_SIZE_IMM_NBI_ADDR_API(Warp)

以ShmemPutSizeImmNbiThreadKernel为例,其内部调用函数ShmemPutSizeImmNbiThreadKernelAddrImpl。

// Convert addresses to remote addresses (supports both Static Heap and VMM Heap)uintptr_t raddr;uint32_trkey;QueryRemoteAddr(dest,pe,raddr,rkey);

QueryRemoteAddr

// Calculate offset within the symmetric heapsize_t offset=localAddrInt-globalGpuStates->heapBaseAddr;application::SymmMemObj*heapObj=globalGpuStates->heapObj;if(globalGpuStates->useVMMHeap){// VMM Heap: need to get chunk-specific rkeyVmmLookupRemote(localAddrInt,pe,out_raddr,out_rkey);}else{// Static Heap: direct rkey accessout_raddr=heapObj->peerPtrs[pe]+offset;out_rkey=heapObj->peerRkeys[pe];}

out_raddr = peerPtrs[pe] + offset。VmmLookupRemote中也是类似处理。

Reference

[1] rocm mori
[2] vLLM distributed inference with MoRI
[3] [ROCm][PD] add moriio kv connector
[4] nccl分析(二)——RDMA带外建链过程
[5] RDMA带外建联过程

http://www.gsyq.cn/news/1631342.html

相关文章:

  • TVA在具身智能商业化部署中的技术突破(15)
  • ClassLoader深度解剖:双亲委派、Tomcat类隔离、SPI与模块化
  • 【合作邀约】携手共创未来:专业试玩广告制作,赋能您的产品增长
  • 微信小程序开发学习文档(2026汇总版)
  • 02-01-原理篇-Unity原生AssetBundle原理深度解析
  • 大模型版本命名误区解析:GPT-4o与DeepSeek-V2的真实能力边界
  • 【每天认识一个国家 | 日本】
  • 中小团队研发效能提升实战:基于 GitLab CI/CD 的自动化测试与发布流水线搭建
  • PCB设计中孤铜现象的影响与AD18处理技巧
  • 批量压缩图片还在用在线工具?这款648KB小软件,画质不变体积暴减
  • 混凝土裂隙数据集 建筑物裂缝分割数据集 1000张yolo数据集
  • 设备单元级(L1)实施路径
  • 【AI编程代码审查黄金标准】:20年资深架构师亲授5大质量保障铁律,错过再等十年?
  • Scrum落地避坑指南:一个技术负责人踩过的5个流程管理深坑与解法
  • 高速PMSM无感控制三大难题与工程解决方案
  • ShadingModel与Lighting
  • C++语言基础1:作用域解析运算符“::”详细讲解
  • 人工智能赋能新型工业化实施路径方法论
  • 《唤醒你的AI同事:WorkBuddy从零上手》035:工作流程优化
  • 【共创季稿事节】鸿蒙原生 ArkTS 布局方式之 Stack 实现渐变背景与文字对比度提升
  • 成都月映长滩四层老旧别墅电梯落地:天井改造加装封闭式曳引电梯
  • 警惕AI技术谣言:GPT-5并不存在,理性看待大模型演进
  • AI赋能非技术行业实战:我用DeepSeek+混元整理了2026河北高考志愿填报完整指南
  • 27届二本!简历主项目烂大街,立刻放弃主攻开发岗
  • 【监控与可观测性】03-ELK日志体系搭建:从采集到告警的完整闭环
  • Codex 卡在 Reconnecting 5/5?手把手带您排查修复
  • Prompt Engineering进阶指南:从提示词工程到AI Agent工作流编排
  • 7自由度开源机械臂OpenArm 2.0:从实验室到生产环境的完整实战指南
  • 电子合同选型7大盲区,企业必看避坑指南
  • 智慧农业技术深耕:从单点赋能到全产业链升级,重构农业生产底层模式