一、昇腾异构架构1.1 为什么需要异构计算昇腾 NPU 采用的是 CPUNPU 分离架构CPU 负责控制流和预处理NPU 负责高性能矩阵运算。这种设计与 NVIDIA GPU 有明显区别——在 GPU 中 CUDA 核既是计算核心也是控制核心而在昇腾架构中控制逻辑和数据准备由更灵活的 CPU 处理NPU 专注计算。分离设计的好处在于CPU 更适合控制流分支判断、循环控制、内存管理灵活CPU 内存比 HBM 大得多、编译简单无需为控制流生成专用 GPU 代码。┌──────────────────────────────────────────────────┐ │ 昇腾 CPU NPU 异构架构 │ ├──────────────────────────────────────────────────┤ │ │ │ CPU │ │ ┌──────────────────────────────────────────┐ │ │ │ 数据准备 │ 控制流 │ 调度 │ 预处理 │ │ │ └──────────────────────────────────────────┘ │ │ ↓ 数据拷贝 ↑ 结果回传 │ │ ┌──────────────────────────────────────────┐ │ │ │ NPU │ │ │ │ Cube矩阵│ Vector向量│ Scalar │ │ │ │ 高性能矩阵运算 │ │ │ └──────────────────────────────────────────┘ │ │ │ └──────────────────────────────────────────────────┘1.2 Host 与 Device 的角色划分组件角色职责HostCPU控制与调度数据准备、模型构建、内存管理、异常处理DeviceNPU计算执行神经网络算子执行、矩阵运算、梯度计算CPU 适合的任务数据读取与预处理IO 密集Python 胶水代码控制流动态 shape 处理内存分配与回收调试与日志NPU 适合的任务矩阵乘加运算MatMul、Conv2d批量向量运算LayerNorm、Softmax梯度计算反向传播部署推理前向计算1.3 数据传输开销CPU 与 NPU 之间的数据传输是异构编程的主要开销操作延迟带宽优化方向HBM → Host100-200 ns25 GB/s减少传输次数/批量传输Host → HBM100-200 ns25 GB/s使用 pinned memoryNPU 内部 10 ns256 GB/s尽量在 NPU 内部完成优化原则最小化 Host-Device 交互尽量在 NPU 上完成计算减少数据回传。二、Host-Device 数据交互2.1 基础内存分配Host 内存分配#includeacl/acl.h// 分配 Host 内存void*host_ptr;size_t host_size1024*1024;// 1MBaclError retacl.rt.malloc_host(host_ptr,host_size);if(ret!ACL_SUCCESS){printf(Failed to allocate host memory\n);}// 释放retacl.rt.free_host(host_ptr);NPU 内存分配// 分配 Device 内存void*device_ptr;size_t device_size1024*1024;// 1MBretacl.rt.malloc(device_ptr,device_size);if(ret!ACL_SUCCESS){printf(Failed to allocate device memory\n);}// 释放retacl.rt.free(device_ptr);2.2 同步拷贝同步拷贝会阻塞等待数据复制完成8.1 及之前同步拷贝// 同步拷贝 Host → Deviceretacl.rt.memcpy(device_ptr,// 目标device_size,// 目标大小host_ptr,// 源host_size,// 源大小ACL_MEMCPY_HOST_TO_DEVICE// 方向);// 此时 Host 线程被阻塞// 同步拷贝 Device → Hostretacl.rt.memcpy(host_ptr,// 目标host_size,// 目标大小device_ptr,// 源device_size,// 源大小ACL_MEMCPY_DEVICE_TO_HOST);8.2 新增异步拷贝通过 Stream// 创建 StreamaclrtStream stream;retacl.rt.create_stream(stream);// 异步拷贝 Host → Device// 不阻塞提交后立即返回retacl.rt.memcpy_async(device_ptr,device_size,host_ptr,host_size,ACL_MEMCPY_HOST_TO_DEVICE,stream);// 在其他计算中使用异步拷贝// 计算和拷贝并行// 同步等待拷贝完成retacl.rt.stream_synchronize(stream);2.3 Pinned Memory 加速传输Pinned Memory锁页内存可以显著加速 Host-Device 传输// 分配 Pinned Memoryvoid*pinned_ptr;size_t pinned_size1024*1024;retacl.rt.malloc_host(pinned_ptr,pinned_size,ACL_MEM_MALLOC_HUGE_FIRST);if(ret!ACL_SUCCESS){// fallback to normal host memoryretacl.rt.malloc_host(pinned_ptr,pinned_size);}// Pinned Memory 拷贝更快retacl.rt.memcpy(device_ptr,device_size,pinned_ptr,pinned_size,ACL_MEMCPY_HOST_TO_DEVICE);// 释放retacl.rt.free_host(pinned_ptr);Pinned Memory vs 普通 Host 内存特性普通 Host 内存Pinned Memory延迟较高较低带宽受限于 PCIe接近 PCIe 上限内存占用无限制受系统限制适用场景一般数据频繁传输数据2.4 Python 接口的数据传输importtorchimportnumpyasnp# PyTorch Tensor 创建和拷贝# Host → NPUdata_npnp.random.randn(1024,1024).astype(np.float32)data_tensortorch.from_numpy(data_np)data_npudata_tensor.npu()# 自动拷贝到 NPU# NPU → Hostresult_npumodel(data_npu)result_cpuresult_npu.cpu()# 拷贝回 Host# Python 下的异步传输withtorch.npu.stream(stream):data_npudata_tensor.npu()# 异步提交# 后续计算可以和拷贝并行三、CPU-NPU 协同计算3.1 算子放置策略合理的算子放置是性能关键# 在 NPU 上执行计算密集算子classModel(nn.Module):defforward(self,x):# CPU 预处理xx.cpu()# 移回 CPUxself.preprocess_cpu(x)# CPU 处理xx.npu()# 移回 NPU# NPU 主计算xself.conv1.npu()(x)xself.conv2.npu()(x)xself.matmul.npu()(x)# CPU 后处理xx.cpu()xself.postprocess_cpu(x)returnx3.2 流水线并行通过流水线让 CPU 和 NPU 同时工作8.1 及之前串行处理# CPU 处理完才到 NPU效率低forbatchindataloader:datapreprocess_cpu(batch)# CPU 处理resultmodel_npu(data)# NPU 计算postprocess_cpu(result)# CPU 处理8.2 新增流水线并行importthreadingfromqueueimportQueueclassPipelineModel:def__init__(self,model,preprocess,postprocess):self.modelmodel.npu()self.preprocesspreprocess self.postprocesspostprocess self.input_queueQueue(maxsize4)self.output_queueQueue(maxsize4)# 启动流水线线程self.preprocess_threadthreading.Thread(targetself._preprocess_loop)self.postprocess_threadthreading.Thread(targetself._postprocess_loop)self.preprocess_thread.start()self.postprocess_thread.start()def_preprocess_loop(self):forbatchindataloader:dataself.preprocess(batch)self.input_queue.put(data)def_postprocess_loop(self):whileTrue:resultself.output_queue.get()self.postprocess(result)defpredict(self,batch):# 提交到 NPU 处理dataself.input_queue.get()resultself.model(data)self.output_queue.put(result)returnresult3.3 CPU 预处理优化预处理放在 CPU 执行可以减轻 NPU 负担defcpu_preprocess(image):CPU 预处理resize, normalize, transpose# 这些操作 CPU 更高效imagecv2.resize(image,(224,224))imageimage.astype(np.float32)/255.0# 归一化meannp.array([0.485,0.456,0.406])stdnp.array([0.229,0.224,0.225])image(image-mean)/std# 通道转换 HWC → CHWimagenp.transpose(image,(2,0,1))returnimageclassHybridModel(nn.Module):def__init__(self):self.modelMyModel().npu()defforward(self,images):# 批量预处理CPU 多核并行processed[cpu_preprocess(img)forimginimages]batchnp.stack(processed)# 拷贝到 NPUbatch_nputorch.from_numpy(batch).npu()# NPU 推理outputself.model(batch_npu)returnoutput3.4 结果回传优化减少不必要的 CPU 回传# 8.1 及之前频繁回传definference_old(images):results[]forimginimages:# 每次都回传 CPUresultmodel(img.npu()).cpu()results.append(result)returnresults# 8.2 新增批量回传definference_new(images):# 批量拷贝到 NPUbatchtorch.stack([img.npu()forimginimages])# 批量推理一次 NPU 调用resultsmodel(batch)# 一次性回传 CPUresults_cpuresults.cpu()returnresults_cpu四、异构场景实践4.1 视频处理流水线视频处理是典型的异构场景——解码用 CPU推理用 NPUimportcv2importthreadingfromqueueimportQueueclassVideoInferencePipeline:def__init__(self,model,max_queue_size8):self.modelmodel.npu()self.frame_queueQueue(maxsizemax_queue_size)self.result_queueQueue()self.runningTrue# 启动处理线程self.decode_threadthreading.Thread(targetself._decode_loop)self.inference_threadthreading.Thread(targetself._inference_loop)self.decode_thread.start()self.inference_thread.start()def_decode_loop(self):CPU 解码线程capcv2.VideoCapture(video.mp4)whileself.running:ret,framecap.read()ifnotret:break# CPU 预处理frame_processedself.preprocess_cpu(frame)# 加入推理队列阻塞直到队列有空位self.frame_queue.put(frame_processed,blockTrue)cap.release()self.frame_queue.put(None)# 结束标记def_inference_loop(self):NPU 推理线程whileTrue:frameself.frame_queue.get()ifframeisNone:break# NPU 推理frame_nputorch.from_numpy(frame).npu()resultself.model(frame_npu.unsqueeze(0))self.result_queue.put(result)defget_result(self):获取结果非阻塞try:returnself.result_queue.get_nowait()except:returnNonedefstop(self):self.runningFalseself.decode_thread.join()self.inference_thread.join()4.2 大批量数据分块处理内存受限时需要分块处理classChunkedInference:def__init__(self,model,chunk_size32):self.modelmodel.npu()self.chunk_sizechunk_sizedefpredict(self,data,labelsNone):大批量数据分块推理n_sampleslen(data)results[]foriinrange(0,n_samples,self.chunk_size):chunkdata[i:iself.chunk_size]# 分块拷贝到 NPUchunk_npuchunk.npu()# 分块推理chunk_resultself.model(chunk_npu)# 分块回传results.append(chunk_result.cpu())# 释放中间张量delchunk_npu,chunk_result torch.npu.empty_cache()returntorch.cat(results,dim0)defpredict_with_labels(self,data,labels,metric_fn):分块推理并计算指标内存友好n_sampleslen(data)metrics[]foriinrange(0,n_samples,self.chunk_size):chunkdata[i:iself.chunk_size]chunk_labelslabels[i:iself.chunk_size]chunk_npuchunk.npu()chunk_labels_npuchunk_labels.npu()outputself.model(chunk_npu)metricmetric_fn(output,chunk_labels_npu)metrics.append(metric.item())delchunk_npu,chunk_labels_npu,output torch.npu.empty_cache()returnnp.mean(metrics)4.3 动态 shape 处理动态 shape 场景下 CPU 处理更灵活classDynamicShapeModel:def__init__(self,model):self.modelmodel.npu()defpredict(self,input_dict):处理不同 shape 的输入# CPU 端解析不同格式的输入batch_dataself._cpu_preprocess(input_dict)# 动态 shape 处理max_lenmax(len(x)forxinbatch_data)batch_paddedself._pad_sequence(batch_data,max_len)# NPU 推理batch_npubatch_padded.npu()outputself.model(batch_npu)# CPU 端还原原始 shaperesultself._cpu_postprocess(output,batch_data)returnresultdef_cpu_preprocess(self,input_dict):CPU 端预处理ifisinstance(input_dict,dict):returninput_dict[texts]returninput_dictdef_pad_sequence(self,sequences,max_len):CPU 端填充padded[]forseqinsequences:iflen(seq)max_len:seqseq[0]*(max_len-len(seq))padded.append(seq)returntorch.LongTensor(padded)def_cpu_postprocess(self,output,original_data):CPU 端后处理returnoutput.cpu().numpy()五、性能调优5.1 数据传输瓶颈诊断importtimeclassDataTransferProfiler:defprofile(self,data):分析数据传输开销# CPU → NPUcopy_starttime.time()data_npudata.npu()copy_timetime.time()-copy_start# NPU 计算compute_starttime.time()resultself.model(data_npu)compute_timetime.time()-compute_start# NPU → CPUresult_cpuresult.cpu()result_timetime.time()-compute_start-copy_start totalcopy_timecompute_timeresult_timeprint(f 传输性能分析: CPU→NPU 拷贝:{copy_time*1000:.2f}ms ({copy_time/total*100:.1f}%) NPU 计算:{compute_time*1000:.2f}ms ({compute_time/total*100:.1f}%) NPU→CPU 回传:{result_time*1000:.2f}ms ({result_time/total*100:.1f}%) 总计:{total*1000:.2f}ms )return{copy:copy_time,compute:compute_time,result:result_time,total:total}5.2 优化策略总结策略说明收益批量处理一次拷贝多个样本减少固定开销流水线CPU/NPU 并行工作隐藏延迟pinned memory使用锁页内存传输提升带宽异步拷贝计算和传输 overlap隐藏传输开销减少回传只在必要时回传 CPU减少传输次数六、常见问题问题原因解决方案拷贝性能差使用普通 Host 内存使用 pinned memoryCPU 成为瓶颈预处理太慢使用流水线并行显存不足batch size 过大分块处理NPU 空闲数据供给不及时使用异步预取结果不对齐shape 不匹配检查预处理流程多线程不安全ACL 非线程安全使用线程锁或进程隔离相关仓库ascend-cl- ACL 异构编程接口 https://gitee.com/ascend/ascend-cltorch_npu- PyTorch NPU 协同接口 https://gitee.com/ascend/torch_npuascend-toolkit- Profiling 工具 https://gitee.com/ascend/ascend-toolkit