当前位置: 首页 > news >正文

异步任务提交 + Redis 状态轮询模式实战指南

异步任务提交 + Redis 状态轮询模式实战指南

一、概述

当一个业务操作耗时较长(如拆单发货、大文件处理、复杂计算),如果同步执行会导致接口超时或用户长时间等待。解决方案是:接口只负责提交任务,立即返回一个任务ID,实际处理交给 MQ 消费者异步执行,处理结果写入 Redis,前端通过任务ID轮询查询结果。

本文以一个"订单拆分处理"场景为例,系统介绍这种模式的完整实现。


二、核心设计

2.1 两个接口的职责

接口职责执行方式返回
提交接口参数校验 → 初始化状态 → 发MQ同步(毫秒级)taskId
状态查询接口根据 taskId 读取 Redis同步(毫秒级)处理状态+结果

2.2 状态流转

提交接口写入 Redis: status=1(处理中) ↓ MQ 消费者处理成功: status=2(成功)+ result ↓ 或 MQ 消费者处理失败: status=0(失败)+ errorMsg + errorCode

2.3 数据流向

提交接口: 前端 → Controller → 写Redis(status=1) → 发MQ → 返回taskId

异步处理: MQ Consumer → 业务处理 → 更新Redis(status=2或0)

状态查询: 前端 → Controller → 读Redis → 返回结果


三、Redis 数据结构设计

@DatapublicclassTaskStatusDtoimplementsSerializable{/** 状态:1-处理中 2-成功 0-失败 */privateIntegerstatus;/** 错误码(失败时返回,用于前端特殊处理) */privateIntegererrorCode;/** 错误信息(失败时返回) */privateStringerrorMsg;/** 处理结果(成功时返回) */privateTaskResultDtoresult;}

Redis Key 设计

task:{memberId}_{orderCode}_{uuid} TTL: 1天

包含 memberId 和 orderCode 便于排查问题,UUID 保证唯一性。


注:

博客:

https://blog.csdn.net/badao_liumang_qizhi

四、完整示例

4.1 提交接口

@RestController@RequestMapping("/api/page/order")publicclassOrderSplitController{@ResourceprivateStringRedisTemplatestringRedisTemplate;@ResourceprivateOrderSplitMqSenderorderSplitMqSender;privatestaticfinalStringREDIS_KEY_FORMAT="order:split:%s";/** * 提交拆单请求. */@PostMapping("/submit-order-split")publicRestControllerResult<String>submitOrderSplit(@RequestBodyOrderSplitParamDtoparamDto){// 1. 参数校验if(CheckEmptyUtil.isEmpty(paramDto.getOrderCode())){thrownewBusinessException("订单编号不能为空");}// 2. 生成任务ID(Redis Key)IntegermemberId=UserContext.getMemberId();Stringuuid=memberId+"_"+paramDto.getOrderCode()+"_"+UUID.randomUUID().toString().replaceAll("-","");StringtaskId=String.format(REDIS_KEY_FORMAT,uuid);// 3. 初始化Redis状态(处理中)TaskStatusDtostatusDto=newTaskStatusDto();statusDto.setStatus(1);// 处理中stringRedisTemplate.opsForValue().set(taskId,JSON.toJSONString(statusDto),1,TimeUnit.DAYS);// 4. 补充参数paramDto.setTaskId(taskId);paramDto.setOperatorId(UserContext.getUserId());paramDto.setMemberId(memberId);// 5. 发送MQorderSplitMqSender.send(paramDto);// 6. 返回taskIdreturnRestControllerResult.success(taskId);}/** * 查询拆单状态. */@GetMapping("/order-split-status")publicRestControllerResult<TaskStatusDto>getOrderSplitStatus(@RequestParam("taskId")StringtaskId){if(CheckEmptyUtil.isEmpty(taskId)){thrownewBusinessException("taskId不能为空");}Stringjson=stringRedisTemplate.opsForValue().get(taskId);TaskStatusDtostatusDto;if(CheckEmptyUtil.isEmpty(json)){// Redis中无数据(可能已过期)statusDto=newTaskStatusDto();statusDto.setStatus(0);statusDto.setErrorCode(-1);statusDto.setErrorMsg("任务不存在或已过期");}else{statusDto=JSON.parseObject(json,TaskStatusDto.class);}returnRestControllerResult.success(statusDto);}}

4.2 MQ 生产者

@Slf4j@ComponentpublicclassOrderSplitMqSender{@ResourceprivateRabbitTemplaterabbitTemplate;privatestaticfinalStringEXCHANGE="order.split.exchange";privatestaticfinalStringROUTING_KEY="order.split.routing";publicvoidsend(OrderSplitParamDtoparamDto){Stringmessage=JSON.toJSONString(paramDto);rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,message);log.info("拆单任务MQ发送成功, orderCode={}, taskId={}",paramDto.getOrderCode(),paramDto.getTaskId());}}

4.3 MQ 消费者

@Slf4j@ComponentpublicclassOrderSplitMqConsumer{@ResourceprivateOrderSplitServiceorderSplitService;@ResourceprivateStringRedisTemplatestringRedisTemplate;@RabbitListener(queues="${mq.order.split.queue}")publicvoidconsume(Stringmessage){OrderSplitParamDtoparamDto=JSON.parseObject(message,OrderSplitParamDto.class);StringtaskId=paramDto.getTaskId();log.info("拆单任务消费开始, orderCode={}",paramDto.getOrderCode());try{// 执行拆单业务逻辑TaskResultDtoresult=orderSplitService.doSplit(paramDto);// 处理成功,更新RedisTaskStatusDtostatusDto=newTaskStatusDto();statusDto.setStatus(2);statusDto.setResult(result);stringRedisTemplate.opsForValue().set(taskId,JSON.toJSONString(statusDto));log.info("拆单任务处理成功, orderCode={}",paramDto.getOrderCode());}catch(Exceptione){log.warn("拆单任务处理失败, orderCode={}",paramDto.getOrderCode(),e);// 处理失败,更新Redis(区分错误码)TaskStatusDtostatusDto=newTaskStatusDto();statusDto.setStatus(0);if(einstanceofBusinessException){statusDto.setErrorCode(((BusinessException)e).getErrorCode());statusDto.setErrorMsg(e.getMessage());}else{statusDto.setErrorCode(-1);statusDto.setErrorMsg("处理异常,请稍后重试");}stringRedisTemplate.opsForValue().set(taskId,JSON.toJSONString(statusDto));}}}

4.4 业务处理Service(逐笔catch模式)

当任务内部包含多笔处理,且希望单笔失败不影响其他笔时:

@Slf4j@ServicepublicclassOrderSplitServiceImplimplementsOrderSplitService{@Override@Transactional(rollbackFor=Exception.class)publicTaskResultDtodoSplit(OrderSplitParamDtoparamDto){List<SplitItemDto>items=paramDto.getItems();intsuccessNum=0;interrorNum=0;interrorCode=0;List<FailItemDto>failList=newArrayList<>();for(SplitItemDtoitem:items){try{// 单笔处理逻辑(含各种校验)processSingleItem(paramDto,item);successNum++;}catch(Exceptione){log.warn("单笔拆单失败, itemCode={}",item.getItemCode(),e);errorNum++;// 记录失败信息FailItemDtofailDto=newFailItemDto();failDto.setItemCode(item.getItemCode());failDto.setErrorMsg(e.getMessage());failList.add(failDto);// 捕获自定义错误码if(einstanceofBusinessException){errorCode=((BusinessException)e).getErrorCode();}}}// 组装结果TaskResultDtoresult=newTaskResultDto();result.setTotalNum(items.size());result.setSuccessNum(successNum);result.setErrorNum(errorNum);result.setFailList(failList);// 如果有自定义错误码,设置到result中if(errorCode!=0){result.setErrorCode(errorCode);}returnresult;}}

五、错误码传递机制

5.1 问题

异步场景下,业务异常在 MQ 消费者中抛出,前端无法通过 HTTP 响应码直接感知。需要一种机制将错误码传递到前端。

5.2 两种传递路径

路径A:外层异常(整个任务失败)

异常抛出 → Consumer catch → 写入 TaskStatusDto.errorCode → Redis → 前端轮询获取

路径B:内层异常(单笔失败,任务整体成功)

异常抛出 → 内层 catch → 写入 TaskResultDto.errorCode → Redis → 前端从 result.errorCode 获取

5.3 前端判断逻辑

constpollResult=async(taskId)=>{constres=awaitapi.getStatus(taskId);if(res.status===1){// 处理中,继续轮询return;}if(res.status===0){// 整体失败if(res.errorCode===10001){showBillUnpaidDialog();// 跳转结算中心}else{showError(res.errorMsg);}return;}if(res.status===2){// 整体成功,但可能部分失败if(res.result.errorCode===10001){showBillUnpaidDialog();// 部分失败因为账单未支付}showResult(res.result);}};

六、关键设计点

6.1 Redis TTL 设置

// 提交时设置 TTL = 1天stringRedisTemplate.opsForValue().set(taskId,json,1,TimeUnit.DAYS);// 消费者更新时不重设 TTL(保持原有过期时间)stringRedisTemplate.opsForValue().set(taskId,json);

建议:

  • 提交时设 1 天 TTL
  • 消费者更新时不改 TTL
  • 超过 1 天未处理完的任务自动失效

6.2 分布式锁防重复处理

try(DistributedLocklock=lockProvider.getLock(lockKey,TimeUnit.MINUTES,20)){if(lock.tryLock(TimeUnit.MINUTES,10)){// 执行业务}}

防止同一任务被多个消费者实例重复处理。

6.3 事务与 Redis 写入的关系

@Transactional(rollbackFor=Exception.class)publicTaskResultDtodoSplit(OrderSplitParamDtoparamDto){// 业务处理...}

注意:如果方法标注了@Transactional,在方法内部写入 Redis 后如果事务回滚,Redis 中的数据不会回滚。建议:

  • 成功时:在事务提交后通过回调写入 Redis
  • 失败时:在 catch 中写入 Redis(此时事务已回滚)

6.4 前端轮询策略

letretryCount=0;constmaxRetry=60;// 最多轮询60次(2分钟)constinterval=2000;// 2秒一次consttimer=setInterval(async()=>{retryCount++;constres=awaitapi.getStatus(taskId);if(res.status!==1||retryCount>=maxRetry){clearInterval(timer);if(retryCount>=maxRetry&&res.status===1){showError("处理超时,请稍后在历史记录中查看");}else{handleResult(res);}}},interval);

七、异常处理分层

层级异常类型处理方式对前端的影响
提交接口参数校验失败直接抛异常,接口返回错误接口报错,不进入轮询
MQ消费者外层获取锁失败、未知异常写入 status=0 + errorMsg轮询拿到失败结果
MQ消费者内层单笔业务异常记录到 failList + errorCode轮询拿到部分成功结果

八、与纯同步接口的对比

维度同步接口异步提交+轮询
接口响应时间与处理时间成正比固定毫秒级
超时风险
错误码传递HTTP 响应中直接返回写入 Redis,轮询获取
用户体验页面卡住等待显示进度/加载状态
实现复杂度
适用场景处理时间 < 3秒处理时间 > 3秒

九、最佳实践清单

  1. 提交接口只做参数校验和发MQ,不执行业务逻辑
  2. Redis Key 包含业务标识(memberId、orderCode),便于排查
  3. 设置合理的 Redis TTL,避免数据堆积
  4. 分布式锁防重复消费
  5. 内层 catch 记录 errorCode,不仅记录 errorMsg
  6. 前端设置轮询上限,避免无限轮询
  7. Redis 数据不存在时返回明确状态(任务不存在或已过期)
  8. 区分整体失败和部分失败,给前端不同的处理依据
  9. 事务提交后再写 Redis,保证数据一致性
  10. 日志记录完整链路:提交时记录 taskId,消费时记录开始/成功/失败
http://www.gsyq.cn/news/1463323.html

相关文章:

  • 树莓派便携服务器DIY:从硬件组装到软件部署全攻略
  • 解锁WanVideo_comfy高级功能:LoRAs模型安装与应用技巧终极指南
  • 终极指南:如何在消费级GPU上快速部署Wan2.2-T2V-A14B视频模型
  • GLM-5.1实战指南:零改造接入VS Code/LangChain/Ollama
  • Qwen2.5-VL-72B-Instruct-quantized.w8a8极限优化:单GPU运行72B模型的实战技巧
  • MySQL性能屠龙刀:EXPLAIN与慢查询日志深度排查及优化终极指南
  • Linux 服务器安装 Nginx:从零到能用,5 分钟搞定
  • 保姆级教程:用D435i录制ROS Bag并转成BundleFusion能吃的.sens格式(附完整代码)
  • 快马AI助力:一分钟生成电商网站Playwright自动化测试原型
  • 别再只用SGD了!用PyTorch的RMSProp优化器解决梯度震荡,附完整代码对比
  • ai辅助开发新体验:让快马ai将你的自然语言变成xshell自动化脚本
  • 天津包车哪家靠谱?附真实价格与公司推荐==天津包车|企业团建年会展会研学正规用车 - 米米Ada
  • 钢件防腐技术条件
  • 从零搭建AI驱动的资产配置引擎,深度解析OpenBB+LangChain+QuantConnect三端协同架构
  • 如何用AceGPT-v2-32B解决阿拉伯语复杂任务?5个实战案例分享
  • bert-kachakacha揭秘:如何用这个94.65%准确率的BERT模型快速进行情感分析
  • Mermaid Live Editor技术架构深度解析:现代前端图表编辑器的实现原理
  • 录屏界面记录
  • PyTorch-NPU DBNet与GPU版本对比:性能差异与选择指南
  • Janus-Pro-1B模型部署完全指南:云端、本地与边缘计算环境配置
  • 气动单足机器人垂直跳跃动态特性的解析方案【附数据】
  • 武汉云克隆Luminex检测多因子精准评估骨转换状态,助力骨骼疾病研究突破
  • AI教材编写指南:低查重AI工具,10分钟生成25万字教材书稿!
  • 如何用AI多智能体系统快速搭建你的专业股票分析平台
  • 深入分析magnum-v2-4b数据集:训练数据的来源与质量评估终极指南
  • PDF补丁丁:免费高效的PDF批量处理终极解决方案
  • BitCPM-CANN-3B-unquantized完整部署教程:从训练到推理的完整流程
  • 2026年深圳婚礼策划推荐榜单:海外婚礼/目的地婚礼/草坪婚礼/户外婚礼/老钱风婚礼/秀场风婚礼品牌深度解析与高定服务优选 - 品牌企业推荐师(官方)
  • 为什么选择ChongqingAscend/distilbert-base-italian-cased?终极意大利语模型性能对比指南
  • Atcoder-460-D Repeatedly Repainting