当前位置: 首页 > news >正文

CANN 异步推理:隐藏推理延迟提升吞吐量的完整方案

一、同步 vs 异步推理1.1 执行模型对比同步推理 (Synchronous): 请求 → 等待推理 → 返回结果 延迟 预处理 推理 后处理 特点: 简单但 CPU 空闲等待 NPU 异步推理 (Asynchronous): 请求 → 提交推理 → 立即返回 结果就绪 → 回调通知 / 轮询获取 延迟感知 ≈ 预处理 (推理在后台) 特点: 复杂但 CPU/NPU 并行工作 ┌─────────────────────────────────┐ │ 同步: [CPU][NPU][CPU][NPU] │ │ 异步: [CPU][CPU][CPU][CPU] │ │ [NPU ][NPU ][NPU ] │ └─────────────────────────────────┘1.2 适用场景同步推理适用: - 单次推理对延迟敏感 - 简单应用不需要高吞吐 - 调试阶段 异步推理适用: - 高并发服务 - 流水线推理 - 多模型串行执行 - CPU/NPU 异构协同二、CANN 异步 API2.1 基础异步推理importtorch.npuimportthreadingclassAsyncInferenceEngine:异步推理引擎def__init__(self,model):self.modelmodel self.streamtorch.npu.Stream()self.lockthreading.Lock()definfer_async(self,input_data,callbackNone):异步推理withtorch.npu.stream(self.stream):outputself.model(input_data)ifcallback:# 注册回调 (Stream 完成后执行)eventtorch.npu.Event()event.record(self.stream)callback_threadthreading.Thread(targetself._wait_and_callback,args(event,callback,output))callback_thread.start()returnoutputdef_wait_and_callback(self,event,callback,output):等待完成并执行回调event.synchronize()callback(output)# 使用示例engineAsyncInferenceEngine(model)defon_complete(output):print(f推理完成:{output.shape})# 异步推理outputengine.infer_async(input_data,callbackon_complete)# CPU 继续其他工作process_other_tasks()2.2 Future 模式importconcurrent.futuresclassFutureInferenceEngine:Future 模式异步推理def__init__(self,model):self.modelmodel self.executorconcurrent.futures.ThreadPoolExecutor(max_workers4)self.streamtorch.npu.Stream()definfer(self,input_data):提交异步推理任务futureself.executor.submit(self._run_inference,input_data)returnfuturedef_run_inference(self,input_data):实际推理执行withtorch.npu.stream(self.stream):outputself.model(input_data)returnoutputdefinfer_batch(self,input_list):批量异步推理futures[self.infer(inp)forinpininput_list]results[f.result()forfinfutures]returnresults# 使用示例engineFutureInferenceEngine(model)# 提交多个推理任务future1engine.infer(input1)future2engine.infer(input2)future3engine.infer(input3)# 处理其他任务process_other_tasks()# 获取结果result1future1.result(timeout5.0)result2future2.result(timeout5.0)result3future3.result(timeout5.0)print(f结果:{result1.shape},{result2.shape},{result3.shape})三、生产者-消费者模型3.1 异步推理队列importqueueimportthreadingclassAsyncInferenceQueue:异步推理队列def__init__(self,model,max_queue_size100):self.modelmodel self.request_queuequeue.Queue(maxsizemax_queue_size)self.result_store{}self.lockthreading.Lock()self.runningFalseself.worker_threadNonedefstart(self):启动推理工作线程self.runningTrueself.worker_threadthreading.Thread(targetself._worker,daemonTrue)self.worker_thread.start()print(异步推理队列已启动)defstop(self):停止推理工作线程self.runningFalseifself.worker_thread:self.worker_thread.join(timeout5.0)print(异步推理队列已停止)defsubmit(self,request_id,input_data):提交推理请求self.request_queue.put({id:request_id,input:input_data,submitted_at:time.time()})withself.lock:self.result_store[request_id]{status:pending,submitted_at:time.time()}returnrequest_iddefget_result(self,request_id,timeout10.0):获取推理结果start_timetime.time()whiletime.time()-start_timetimeout:withself.lock:ifrequest_idinself.result_store:resultself.result_store[request_id]ifresult[status]completed:returnresult[output]elifresult[status]failed:raiseRuntimeError(f推理失败:{result.get(error)})time.sleep(0.01)raiseTimeoutError(f推理超时:{request_id})def_worker(self):推理工作线程whileself.running:try:requestself.request_queue.get(timeout1.0)request_idrequest[id]input_datarequest[input]try:# 执行推理outputself.model(input_data)# 存储结果withself.lock:self.result_store[request_id]{status:completed,output:output,completed_at:time.time()}exceptExceptionase:withself.lock:self.result_store[request_id]{status:failed,error:str(e)}exceptqueue.Empty:continue# 使用示例queue_engineAsyncInferenceQueue(model)queue_engine.start()# 提交请求req_idqueue_engine.submit(req_001,input_data)# 处理其他任务process_other_tasks()# 获取结果resultqueue_engine.get_result(req_id)print(f结果:{result.shape})# 停止queue_engine.stop()3.2 多消费者模型classMultiConsumerInference:多消费者异步推理def__init__(self,model,num_consumers4):self.modelmodel self.request_queuequeue.Queue(maxsize1000)self.result_store{}self.lockthreading.Lock()self.consumers[]self.num_consumersnum_consumersdefstart(self):启动多个消费者foriinrange(self.num_consumers):consumerthreading.Thread(targetself._consumer_worker,args(i,),daemonTrue)self.consumers.append(consumer)consumer.start()print(f已启动{self.num_consumers}个消费者)def_consumer_worker(self,consumer_id):消费者工作线程streamtorch.npu.Stream()whileTrue:try:requestself.request_queue.get(timeout1.0)request_idrequest[id]input_datarequest[input]try:withtorch.npu.stream(stream):outputself.model(input_data)withself.lock:self.result_store[request_id]{status:completed,output:output,consumer_id:consumer_id}exceptExceptionase:withself.lock:self.result_store[request_id]{status:failed,error:str(e),consumer_id:consumer_id}exceptqueue.Empty:continuedefsubmit(self,request_id,input_data):提交请求self.request_queue.put({id:request_id,input:input_data})withself.lock:self.result_store[request_id]{status:pending}returnrequest_iddefget_result(self,request_id,timeout10.0):获取结果start_timetime.time()whiletime.time()-start_timetimeout:withself.lock:ifrequest_idinself.result_store:resultself.result_store[request_id]ifresult[status]in[completed,failed]:returnresult time.sleep(0.01)raiseTimeoutError(f推理超时:{request_id})# 使用示例multi_consumerMultiConsumerInference(model,num_consumers4)multi_consumer.start()# 提交多个请求foriinrange(100):multi_consumer.submit(freq_{i},input_data)# 获取结果foriinrange(100):resultmulti_consumer.get_result(freq_{i})print(freq_{i}:{result[status]})四、流水线推理架构4.1 三阶段流水线classInferencePipeline:三阶段推理流水线def__init__(self,preprocessor,model,postprocessor):self.preprocessorpreprocessor self.modelmodel self.postprocessorpostprocessor# 三个阶段各自的 Streamself.preprocess_streamtorch.npu.Stream()self.inference_streamtorch.npu.Stream()self.postprocess_streamtorch.npu.Stream()# 事件同步self.preprocess_donetorch.npu.Event()self.inference_donetorch.npu.Event()definfer(self,raw_data):流水线推理# 阶段 1: 预处理withtorch.npu.stream(self.preprocess_stream):preprocessedself.preprocessor(raw_data)self.preprocess_done.record(self.preprocess_stream)# 阶段 2: 推理 (等待预处理完成)withtorch.npu.stream(self.inference_stream):self.inference_stream.wait_event(self.preprocess_done)outputself.model(preprocessed)self.inference_done.record(self.inference_stream)# 阶段 3: 后处理 (等待推理完成)withtorch.npu.stream(self.postprocess_stream):self.postprocess_stream.wait_event(self.inference_done)resultself.postprocessor(output)returnresultdefinfer_batch(self,raw_data_list):批量流水线推理results[]forraw_datainraw_data_list:resultself.infer(raw_data)results.append(result)torch.npu.synchronize()returnresults# 使用示例pipelineInferencePipeline(preprocessor,model,postprocessor)resultspipeline.infer_batch(raw_data_list)五、错误处理与超时控制5.1 超时控制classTimeoutInferenceEngine:带超时控制的异步推理def__init__(self,model,default_timeout5.0):self.modelmodel self.default_timeoutdefault_timeout self.streamtorch.npu.Stream()definfer_with_timeout(self,input_data,timeoutNone):带超时的推理timeouttimeoutorself.default_timeout futureself._submit_inference(input_data)try:resultfuture.result(timeouttimeout)returnresultexceptconcurrent.futures.TimeoutError:# 超时处理print(f推理超时 ({timeout}s))returnNonedef_submit_inference(self,input_data):提交推理任务executorconcurrent.futures.ThreadPoolExecutor(max_workers1)futureexecutor.submit(self._run_inference,input_data)returnfuturedef_run_inference(self,input_data):实际推理withtorch.npu.stream(self.stream):outputself.model(input_data)returnoutput# 使用示例engineTimeoutInferenceEngine(model,default_timeout5.0)resultengine.infer_with_timeout(input_data,timeout3.0)5.2 重试机制classRetryInferenceEngine:带重试机制的异步推理def__init__(self,model,max_retries3,retry_delay1.0):self.modelmodel self.max_retriesmax_retries self.retry_delayretry_delay self.streamtorch.npu.Stream()definfer_with_retry(self,input_data):带重试的推理forattemptinrange(self.max_retries):try:withtorch.npu.stream(self.stream):outputself.model(input_data)returnoutputexceptExceptionase:ifattemptself.max_retries-1:print(f推理失败 (尝试{attempt1}/{self.max_retries}):{e})time.sleep(self.retry_delay)else:raiseRuntimeError(f推理失败已重试{self.max_retries}次:{e})# 使用示例engineRetryInferenceEngine(model,max_retries3)resultengine.infer_with_retry(input_data)六、常见问题问题原因解决方案异步结果获取失败Stream 未同步使用 Event 同步内存泄漏异步任务未清理及时清理过期任务推理顺序错乱未使用请求 ID使用请求 ID 跟踪超时不生效超时设置太长调整超时参数重试风暴重试间隔太短增加退避策略相关仓库ascend-cl- 异步推理接口 https://gitee.com/ascend/ascend-cltorch_npu- Stream 管理 https://gitee.com/ascend/torch_nputorch_npu- Event 同步 https://gitee.com/ascend/torch_npu
http://www.gsyq.cn/news/1355978.html

相关文章:

  • Python EXE逆向工程实战指南:3步高效提取源代码的完整教程
  • OpCore-Simplify:三步搞定OpenCore EFI配置的终极解决方案
  • 如何用SUMO-RL构建智能交通信号系统:强化学习实战指南
  • 华硕笔记本性能优化终极指南:三步搞定轻量级控制神器GHelper
  • 甲言Jiayan:5大功能让文言文处理变得如此简单
  • PDF补丁丁:免费开源PDF工具箱,一键解决书签合并旋转等所有难题
  • Nodejs 后端服务如何集成多模型能力处理用户提问
  • 洛雪音乐音源:如何免费畅享全网无损音乐的终极指南
  • SolveSpace参数化CAD设计:5大核心功能深度解析与实战指南
  • Vue3与Element Plus在企业级后台系统中的架构设计与深度实践
  • 革命性macOS窗口管理:Topit智能窗口置顶工具的深度解析与实战指南
  • MATLAB机器人工具箱终极指南:从零到精通的快速入门完整教程
  • 全网最实用的网页完整保存手册:再也不怕点击才显示的内容消失了
  • 使用 Python 和 Taotoken 官方风格 SDK 实现你的第一个 AI 对话应用
  • 深度解析Python SECS/GEM协议实现:secsgem库的现代架构设计
  • 5个关键步骤:使用SUMO-RL构建城市智能交通信号控制系统
  • 洛雪音乐音源配置终极指南:5分钟打造你的专属音乐库
  • STM32开发实战:CubeMX与Visual Studio环境搭建的两种高效路径
  • 不止于调试:用Jetson Xavier NX的UART连接传感器与Arduino,打造边缘计算小项目
  • 手把手教你用ESP32C3驱动WS2812灯带:从RMT底层配置到彩虹灯效实现
  • AI Agent Runtime重构:Session事件日志如何解决上下文溢出顽疾
  • 华为交换机VRRP实战:如何用主备网关实现市场部与技术部的网络负载分担?
  • Claude学术写作辅助应用:今天不部署,下周组会PPT将暴露你仍用Word手动调格式
  • 仅限本周开放|Midjourney水效果渲染私藏参数库(含8个失效规避checklist+实时渲染反馈诊断表)
  • STM32F103C8T6+TJA1042+UTA0403:一个CAN通讯新手踩过的所有坑(附完整接线图与代码)
  • 扩散图神经网络在机器人嗅觉导航中的应用与优化
  • 企业级应用如何通过Taotoken统一管理多个AI模型API密钥
  • Kibana 将 dashboard 加载时间最高缩短 25% —— 其背后的 polling 策略揭秘
  • 从点灯到按键:用STM32CubeMX 6.7.0 + HAL库完成你的第一个嵌入式交互项目
  • 告别玄学调试:用HyperLynx快速评估DDR4 T型拓扑与Stub长度的信号影响