CosyVoice 双向流式 streamingCall() — 前后端总体方案
CosyVoice 双向流式streamingCall()— 前后端总体方案
在保留现有LLM 流式type=content的前提下,把 TTS 从「整段call()+ OSS URL」升级为CosyVoice 双向 WebSocket + 音频帧直推前端,并保证语音失败不影响文字。
一、现状 vs 目标
| 维度 | 现状 | 目标 |
|---|---|---|
| 文字 | 百炼streamCall→type=content | 不变 |
| TTS API | call(整段)阻塞 | streamingCall(delta)+streamingComplete() |
| 分句 | 本地VoiceStreamingSegmentBuffer | CosyVoice 服务端自动分句(可去掉本地缓冲) |
| 推前端 | voiceChunk.voiceUrl(OSS) | 音频帧(Base64 或 WS Binary)+ 可选短 URL 降级 |
| 落库 | 段级 OSS + merge →ext.voiceUrl | 内存攒帧 → 结束 merge 上传一次 |
| complete | 等 TTS 队列finishAndAwait | 文字结束即发 complete,TTS 异步收尾 |
二、总体架构
原则
- 文字与语音解耦:content 先推;TTS 异常只 catch 打日志,不中断 LLM。
- 一问一 CosyVoice 连接:每问答一个
SpeechSynthesizer,禁止单例共享。 - 文本单线程喂 TTS:LLM delta 入队,单线程
streamingCall,避免多线程同实例。 - 出站串行:同一
WebSocketSession的 content / voiceFrame 走per-connection 发送队列。
三、后端方案
3.1 模块划分
| 模块 | 职责 |
|---|---|
AliyunAgentServiceImpl | 不变:推content+onContentDelta |
CosyVoiceStreamingSession(新) | 管理一条 WSS:streamingCall/streamingComplete/ callback / close |
VoiceSynthesisService | 新增openStreamingSession(voiceId, callback),配置 WSS 端点 |
ImStreamingVoicePushSession(重构) | 去掉call()+段级 OSS;改为转发 delta + 收帧推前端 + 攒帧 merge |
WebSocketOutboundQueue(新) | 同一连接 Text/Binary 串行 send |
MobileImController | 创建/销毁 session;LLM 结束后先发 complete,TTS 异步 finalize |
3.2 CosyVoice 会话生命周期
// 问答开始(有 doctorId + voiceId + 连接可用)SpeechSynthesisParamparam=builder().apiKey(...).model("cosyvoice-v2")// 与复刻音色一致.voice(voiceId).format(PCM_22050HZ_MONO_16BIT)// 或 MP3,流式推荐 PCM/MP3.build();SpeechSynthesizersynthesizer=newSpeechSynthesizer(param,callback);// LLM 每个 delta(onContentDelta,try-catch)textFeederQueue.offer(delta);// 单线程 consumer → synthesizer.streamingCall(delta)// LLM 结束synthesizer.streamingComplete();await onComplete/latch;// merge + 落库 + closesynthesizer.getDuplexApi().close(1000,"bye");配置新增
aliyun:voice:api-key:sk-xxxmodel:cosyvoice-v2websocket-url:wss://{workspaceId}.cn-beijing.maas.aliyuncs.com/api-ws/v1/inferenceSDK 建议≥ 2.22.0(getOutput()句子事件);当前 2.19.4 可先 PoC。
3.3 与 LLM 的衔接
onContentDelta(delta): 1. outboundQueue.send(content) // 已在 Agent 层完成 2. cosyVoiceSession.feedText(delta) // 非阻塞入队 chatStreamForWebSocket 返回后: 1. updateBotMessageAfterAiReply 2. outboundQueue.send({ type: "textComplete" }) // 或带 complete 3. cosyVoiceSession.finishAsync() // streamingComplete + 不阻塞主线程 4. 环信 / 告警 等(不依赖 TTS)3.4 推前端协议(建议)
文字(不变)
{"type":"content","content":"增量","messageId":"...","timestamp":123}音频帧(新增,推荐 JSON+Base64 便于小程序)
{"type":"voiceFrame","messageId":"429476821505122304","seq":12,"format":"pcm","sampleRate":22050,"channels":1,"bitDepth":16,"sentenceIndex":0,"event":"sentence-synthesis","data":"base64...","timestamp":123}可选:句子边界(来自result.getOutput())
{"type":"voiceSentence","messageId":"...","sentenceIndex":0,"text":"...","event":"sentence-end"}结束
{"type":"voiceComplete","messageId":"...","hasVoice":true,"voiceUrl":"https://.../merged.wav"}{"type":"complete","messageId":"...","timestamp":123}| 消息 | 时机 |
|---|---|
content | LLM 流式 |
voiceGenerating | TTS 连接建立(可选) |
voiceFrame | CosyVoiceonEvent有audioFrame |
textComplete | LLM 结束(不等 TTS) |
voiceComplete | TTSonComplete+ merge 落库后 |
complete | 与textComplete同发,或 voice 可选 |
降级:帧推送失败或小程序不支持流式播放时,保留短 MP3 分片 URL作voiceChunk兼容。
3.5 落库
onEvent: audioFrames.add(frame) onComplete: bytes = concat(frames) 或 decode PCM → WAV mergedUrl = upload OSS voice/merged ext: { hasVoice, voiceUrl, voiceFormat, voiceId }段级 OSS 可取消,只保留最终 merge 一次。
3.6 失败与隔离
| 失败 | 处理 |
|---|---|
| 无音色 / 无 doctorId | 不建 TTS,仅 content + complete |
streamingCall/ CosyVoice 报错 | 日志 +voiceComplete(hasVoice=false),不影响已推 content |
| 单帧推送失败 | 日志,继续后续帧 |
| merge/OSS 失败 | 无ext.voiceUrl,实时播放仍可能完整 |
| 23s 无新文本 | 超时关 TTS;LLM 若仍输出需续连或整问重建 session |
| 同连接连发两问 | in-flight 锁或拒绝第二问 |
3.7 并发
- 每用户每问:1× SpeechSynthesizer + 1× WSS
- 全局:有界 TTS 连接池(如 32~64),超出排队
- 禁止Spring 单例
SpeechSynthesizer WebSocketConnectionManager:增加sendBinary(connectionId, bytes)+ 与 Text 共用 outbound 队列
四、前端方案
4.1 状态机
IDLE → CONNECTED → AI_STREAMING → TEXT_DONE → VOICE_STREAMING → DONE ↓ content ↓ voiceFrame ↓ ↓ 追加播放队列- 文字:按序拼接
content - 语音:按
messageId+seq维护播放队列
4.2 播放(微信小程序)
| 方案 | 做法 | 适用 |
|---|---|---|
| A. Base64 → 临时文件 | 每句/每 N 帧攒成 WAV →wx.getFileSystemManager写 temp →InnerAudioContext.src | 改造小,延迟略高于 H5 |
| B. 句子级 WAV | 收sentence-end后拼帧写文件再播 | 与 CosyVoice 分句对齐,推荐 |
| C. 降级 URL | 仍收voiceChunk.voiceUrl | 兼容旧版 |
不建议小程序裸 PCM 逐帧直播(无 Web Audio,实现成本高)。
4.3 前端伪代码
consttextBuf={};constaudioQueue=[];// { messageId, seq, pcmChunks[] }letplaying=false;onMessage(msg){switch(msg.type){case'content':appendText(msg.messageId,msg.content);break;case'voiceFrame':enqueueFrame(msg);tryPlayNext();break;case'textComplete':markTextDone(msg.messageId);break;case'voiceComplete':case'complete':finishSession(msg.messageId);break;}}4.4 与旧协议兼容
- 检测首包:有
voiceFrame走流式;仅有voiceChunk走 URL 队列 - 版本号:连接时
?voiceProtocol=2或connected里带features: ["voiceStream"]
五、分阶段实施
| 阶段 | 内容 | 风险 |
|---|---|---|
| P0 | SDK 升级 +CosyVoiceStreamingSessionPoC;服务端收帧落日志 | 低 |
| P1 | voiceFrameBase64 推 WS;小程序句子级 temp 文件播放 | 中 |
| P2 | 去掉段级 OSS + 本地VoiceStreamingSegmentBuffer;complete 与 TTS 解耦 | 中 |
| P3 | outbound 队列、有界连接池、in-flight 锁 | 中 |
| P4 | 可选 Binary 帧、H5 Web Audio 低延迟路径 | 低 |
六、和现有voiceChunk对比
现在voiceChunk+ OSS | 双向streamingCall | |
|---|---|---|
| 首包延迟 | 整句合成 + 上传 | 更低(帧级) |
| 带宽 | 客户端拉 OSS | WS 直推(Base64 更大) |
| 小程序 | InnerAudio + URL成熟 | 需 temp 文件或句子 WAV |
| 服务端 | 简单 | WSS 长连接 + 队列 + 协议 |
| 历史回放 | 段级 + merge | 仅 merge 一次即可 |
七、推荐结论
推荐路径:LLM 文字协议不动 + CosyVoice 双向流式 +voiceFrame(句子边界拼 WAV)+ 结束 merge 一次 OSS + 文字结束立即complete+ 语音失败可降级无 voice。
- 后端核心:
CosyVoiceStreamingSession+ 文本单线程 feeder + outbound 串行队列 - 前端核心:按
messageId/seq攒帧,按句写 temp WAV 播放 - 兼容:保留
voiceChunk作降级开关
若要落地 P0/P1,切Agent 模式可从VoiceSynthesisService+ImStreamingVoicePushSession改造起笔。
