当前位置: 首页 > news >正文

AI 流式响应实战:从同步等待到实时推送

AI 流式响应实战:从同步等待到实时推送

在 IM 系统中集成 AI 时,流式响应能显著提升性能。本文介绍 AQChat 如何实现 AI 流式响应,从同步等待到实时推送。

一、为什么需要流式响应?

同步等待的问题

传统同步方式的问题

// ❌ 同步方式:用户需要等待AI完整响应StringaiResponse=aiService.getAnswer(userMessage);// 如果AI响应需要10秒,用户就要等待10秒sendMessage(aiResponse);

问题:

  1. 等待时间长:AI 生成可能需要 5-10 秒,用户长时间等待
  2. 体验差:无法看到生成过程,感觉卡顿
  3. 资源占用:长时间占用连线和线程

流式响应的优势

  1. 实时反馈:逐字显示,用户可立即看到内容
  2. 体验更好:类似 ChatGPT 的打字机效果
  3. 资源利用:边生成边推送,不阻塞

对比

方式首字延迟完整响应时间用户体验
同步等待10秒10秒
流式响应1-2秒10秒

回调函数模式的设计

统一接口设计

定义统一的 AI 服务接口

publicinterfaceIAiService{/** * 流式调用AI服务 * @param userMsg 用户消息 * @param consumer 回调函数,处理每个数据块 */voidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer);/** * 多轮对话 */defaultvoidchat(Stringmessage,List<MessageRecord>messages,Consumer<AIResult>consumer){}}

关键点

  • 使用Consumer<AIResult>作为回调
  • 每个数据块通过回调处理
  • 支持多轮对话

AIResult 设计

publicinterfaceAIResult{StringgetContent();// 当前数据块的内容intgetStatus();// 状态:WAIT(0-进行中)、END(1-结束)、FAIL(2-失败)}

状态枚举

publicenumAIMessageStatusEnum{WAIT(0,"wait"),// 流式响应进行中END(1,"end"),// 流式响应结束FAIL(2,"fail");// 流式响应失败}

三、WebSocket 实时推送的实现

整体流程

用户发送消息 ↓RocketMQ异步处理 ↓ AI服务流式调用 ↓ 回调函数处理每个数据块 ↓ 封装为 STREAM_MSG_NOTIFY ↓WebSocket实时推送

代码实现

  1. RocketMQ 消费者接收消息
@ComponentpublicclassAIHelperReceiverimplementsInitializingBean{@ResourceprivateIAiServiceaiService;@ResourceprivateGlobalChannelHolderglobalChannelHolder;publicvoidinitConsumer(){defaultMQPushConsumer.setMessageListener((MessageListenerConcurrently)(messageExtList,context)->{for(MessageExtmessageExt:messageExtList){MessageDtomessageDto=JSONObject.parseObject(msgStr,MessageDto.class);// 提交到独立线程池,不阻塞MQ消费线程threadPoolUtil.submitTask(()->{StringBuilderfullContent=newStringBuilder();try{// 流式调用AI服务aiService.streamCallWithMessage(messageDto.getMessageContent(),aiResult->{// 回调函数:处理每个数据块AIMessageDtoaiMessageDto=newAIMessageDto();aiMessageDto.setMessageId(messageDto.getMessageId());aiMessageDto.setRoomId(messageDto.getRoomId());aiMessageDto.setContent(aiResult.getContent());aiMessageDto.setStatus(aiResult.getStatus());// 实时推送globalChannelHolder.sendBroadcastAIMessage(aiMessageDto,AQBusinessConstant.AI_HELPER_ID);// 累积完整内容fullContent.append(aiResult.getContent());});}catch(Exceptione){// 错误处理LOGGER.error("AI助手处理消息失败",e);AIMessageDtofailMessage=newAIMessageDto();failMessage.setStatus(AIMessageStatusEnum.FAIL.getCode());globalChannelHolder.sendBroadcastAIMessage(failMessage,AQBusinessConstant.AI_HELPER_ID);}finally{// 流式响应结束后,持久化完整消息MessageDtostoreMessage=buildStoreMessage(messageDto,fullContent);messageService.saveMessage(storeMessage);}});}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;});}}
  1. 封装流式消息并推送
@ComponentpublicclassGlobalChannelHolder{publicvoidsendBroadcastAIMessage(AIMessageDtoaiMessageDto,StringaiId){// 1. 获取AI助手信息UserGlobalInfoDtouserInfo=userHolder.getUserInfo(aiId);// 2. 构建流式消息AQChatMsgProtocol.StreamMsgNotifystreamMsgNotify=AQChatMsgProtocol.StreamMsgNotify.newBuilder().setUser(userBuilder).setMsgId(aiMessageDto.getMessageId()).setRoomId(aiMessageDto.getRoomId()).setContent(aiMessageDto.getContent()==null?"":aiMessageDto.getContent()).setStreamType(aiMessageDto.getStatus())// 0-进行中,1-结束,2-失败.build();// 3. 广播到房间内所有用户messageBroadcaster.broadcast(aiMessageDto.getRoomId(),streamMsgNotify);}}
  1. 消息广播
@ComponentpublicclassMessageBroadcaster{privatefinalMap<String,ChannelGroup>channelGroupMap=newConcurrentHashMap<>();public<TextendsGeneratedMessageV3>voidbroadcast(StringroomId,Tmsg){ChannelGroupchannelGroup=channelGroupMap.get(roomId);if(channelGroup!=null){// 批量发送,高效channelGroup.writeAndFlush(msg);}}}

四、流式消息的封装(STREM_MSG_NOTIFY)

Protobuf 消息定义

// 流式消息通知messageStreamMsgNotify{string roomId=1;// 房间IDstring msgId=2;// 消息IDUseruser=3;// AI助手信息int32 streamType=4;// 流类型:0-进行中,1-结束,2-失败string content=5;// 当前数据块内容}

消息类型

enumMsgCommand{// ...STREAM_MSG_NOTIFY=32;// 流式消息通知// ...}

消息状态流转

用户发送消息 ↓ STREAM_MSG_NOTIFY(streamType=0,content="你")← 第一个数据块 ↓ STREAM_MSG_NOTIFY(streamType=0,content="好")← 第二个数据块 ↓ STREAM_MSG_NOTIFY(streamType=0,content=",")← 第三个数据块 ↓...↓ STREAM_MSG_NOTIFY(streamType=1,content="")← 结束标志

前端处理示例(伪代码)

websocket.onmessage=(event)=>{constmessage=JSON.parse(event.data);if(message.command==='STREAM_MSG_NOTIFY'){if(message.streamType===0){// 进行中:追加内容appendContent(message.content);}elseif(message.streamType===1){// 结束:显示完整消息showCompleteMessage();}elseif(message.streamType===2){// 失败:显示错误提示showErrorMessage();}}};

五、多 AI 平台集成的统一接口设计

问题:不同 AI 平台的 API 不同

  • 阿里百炼:使用Flowable<GenerationResult>
  • Gitee AI:使用MessageHandler<String>
  • 其他平台:可能有不同的流式接口

解决方案:统一接口 + 适配器模式

  1. 统一接口定义
publicinterfaceIAiService{/** * 流式调用,统一使用 Consumer<AIResult> 回调 */voidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer);}
  1. 阿里百炼实现
@Service@PrimarypublicclassQWAiServiceimplementsIAiService{@OverridepublicvoidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer){Generationgen=newGeneration();Messagemessage=Message.builder().role(Role.USER.getValue()).content(userMsg).build();// 调用阿里百炼流式APIFlowable<GenerationResult>result=gen.streamCall(generationParam);// 转换为统一格式result.blockingForEach(r->{Stringcontent=r.getOutput().getChoices().get(0).getMessage().getContent();StringfinishReason=r.getOutput().getChoices().get(0).getFinishReason();QWResultqwResult=newQWResult();qwResult.setContent(content);// 判断是否结束qwResult.setStatus("stop".equals(finishReason)?AIMessageStatusEnum.END.getCode():AIMessageStatusEnum.WAIT.getCode());// 调用统一回调consumer.accept(qwResult);});}}
  1. Gitee AI 实现
@ServicepublicclassGiteeAIServiceimplementsIAiService{@ResourceprivateGiteeAIClientgiteeAIClient;@OverridepublicvoidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer){// 调用Gitee AI流式APIgiteeAIClient.streamChat(message,messageList,data->{JSONObjectparse=JSONObject.parseObject(data);JSONArraychoices=parse.getJSONArray("choices");JSONObjectchoicesIn=choices.getJSONObject(0);StringfinishReason=choicesIn.getString("finish_reason");if(finishReason!=null&&finishReason.equals("stop")){// 结束GiteeResultgiteeResult=newGiteeResult();giteeResult.setStatus(AIMessageStatusEnum.END.getCode());consumer.accept(giteeResult);return;}// 进行中JSONObjectdelta=choicesIn.getJSONObject("delta");Stringcontent=delta.getString("content");if(content!=null&&!content.isEmpty()){GiteeResultgiteeResult=newGiteeResult();giteeResult.setContent(content);giteeResult.setStatus(AIMessageStatusEnum.WAIT.getCode());consumer.accept(giteeResult);}});}}

统一接口的优势

  1. 业务代码无需关心具体平台
  2. 易于扩展新平台
  3. 便于切换平台(通过@Prime注解)

使用示例

// 业务代码只需要调用统一接口@ResourceprivateIAiServiceaiService;// Spring会自动注入@Primary的实现aiService.streamCallWithMessage(userMsg,aiResult->{// 处理流式响应,不关心是哪个AI平台sendBroadcastAIMessage(aiResult);});

六、性能优化

  1. 独立线程池
// AI处理在独立线程池中执行,不阻塞MQ消费线程threadPoolUtil.submitTask(()->{aiService.streamCallWithMessage(userMsg,consumer);});

优势

  • 不阻塞 RocketMQ 消费线程
  • AI 处理失败不影响其他消息
  • 可控制并发数
  1. 异步处理
// 消息发送到RocketMQ,异步处理mqSendingAgent.aiHelper(messageDto);// 立即返回,不等待AI响应

优势

  • 用户发送消息后立即返回
  • AI 响应通过 WebSocket 实时推送
  • 提升响应速度

七、总结

关键点

  1. 流式响应:使用回调函数模式,实时推送每个数据块
  2. 统一接口:IAiservice统一不同 AI 平台的接口
  3. WebSocket 推送:通过STREAM_MSG_NOTIFY实时推送
  4. 异步处理:使用 RocketMQ + 独立线程池,不阻塞主流程

优化效果

指标同步流式响应提升
首字延迟10秒1-2秒5-10倍
用户体验显著提升
资源占用降低

经验总结

  1. 流式响应能显著提升性能
  2. 统一接口便于多平台集成
  3. 异步处理避免阻塞
  4. 回调函数模式适合流式场景

通过以上实现,AQChat 实现了类似 ChatGPT 的流式响应效果,提升了用户体验。

http://www.gsyq.cn/news/127124.html

相关文章:

  • 揭秘Open-AutoGLM容器化难题:5步实现稳定高效部署
  • ModelScope镜像部署难题,99%人都忽略的3个关键配置细节
  • Open-AutoGLM环境搭建全攻略:手把手教你10分钟完成Python依赖部署
  • Hugging Face下载Open-AutoGLM总失败?你必须知道的5个关键技巧
  • 为什么你的Open-AutoGLM无法在非root下运行?深入剖析权限机制盲区
  • 黑客和程序员谁更胜一筹?从技术实力、就业范围到赚钱潜力的全方位对比
  • 研究生必备:9款免费AI论文神器,10分钟搞定全学科初稿 - 麟书学长
  • 【Open-AutoGLM vLLM推理配置终极指南】:手把手教你构建高效大模型推理系统
  • 【物联网设备配网新突破】:Open-AutoGLM WiFi直连技术全曝光
  • 红队大佬私藏的漏洞检测工具:Z0SCAN,一键扫出隐蔽漏洞,看完果断收藏!
  • 【开题答辩过程】以《基于uni-app框架的助学管理系统的设计与实现》为例,不知道这个选题怎么做的,不知道这个选题怎么开题答辩的可以进来看看
  • Flutter 应用迁移至鸿蒙HarmonyOS
  • 揭秘Open-AutoGLM虚拟机部署难题:99%新手都会忽略的3个关键细节
  • 揭秘Open-AutoGLM镜像加速技巧:如何将模型加载速度提升300%
  • Spring Bean生命周期与循环依赖全解析
  • Paperzz:当AI“指纹”成为学术新敌人,我们选择用“思想的呼吸”来化解
  • 无线调试革命来了,Open-AutoGLM如何彻底改变你的开发流程?
  • Open-AutoGLM WiFi配置(从入门到精通,仅需这一篇)
  • 当开题报告遇上 paperzz:把 “头秃时刻” 变成 “一杯咖啡的事儿”
  • Open-AutoGLM Docker部署实战(专家级配置全公开)
  • 非root环境下如何部署Open-AutoGLM?5个必知的安全避坑方案
  • 2025索尼相机适配存储卡推荐榜-专业影像创作的存储选择 - 真知灼见33
  • Open-AutoGLM如何秒连WiFi?:工程师不会告诉你的4种高效方案
  • 从单机到分布式:大数据计算模式的演进之路
  • 高质量软件测试的核心要素
  • 揭秘Open-AutoGLM与安卓13兼容性问题:3个你必须立即更新的配置文件
  • Java中的JVM(虚拟机)是什么,新书小白带你入门,收藏这篇就够了
  • 面试官问Redis主从延迟导致脏数据读怎么解决?
  • Playwright 文件上传与下载完成判断全指南
  • 2025年广东十大广告公司实力排行榜,服务大品牌的广告大型公司推荐精选优质厂家 - 品牌推荐师