当前位置: 首页 > news >正文

狼人杀 AI 对局:后端如何用 SSE 流式推送到前端?

一、为什么要流式,而不是等一局跑完再返回 JSON?

九人局里,一个阶段可能连续触发多次 LLM 调用:狼人讨论、白天发言、逐人投票……单次推理往往要几秒到十几秒。

如果后端等 LangGraph 整段跑完再return {"state": ...},前端只能转圈等待,用户看不到:

Bot 正在想什么,发言是否正在生成,投票是否一人一人公布。

所以我们采用 SSE(Server-Sent Events):在一次 HTTP 连接里,后端持续推送多条 JSON 事件,前端边收边更新 UI;连接结束时再推一条done: true的最终快照。

二、总体架构

核心思想是一个队列,多路生产者

  1. 每个对局用thread_id区分,对应一个asyncio.Queue
  2. LangGraph 在后台 task里跑,不阻塞 SSE 写出
  3. 图节点里产生的流式事件,都投进同一个队列
  4. SSE 生成器只负责:queue.get()yield "data: ...\n\n"

三、后端入口:所有推进游戏的接口都走同一条流

/start/advance/action/speak/action/vote等,最终都调用run_and_stream,返回:

return StreamingResponse(event_stream(), media_type="text/event-stream")

event_stream的逻辑:

async def run_and_stream(input_data, thread_id): thought_queue = asyncio.Queue() register_thought_stream(thread_id, thought_queue, loop) async def graph_producer(): async for event in graph.astream_events(input_data, cfg(thread_id), version="v1"): if event["event"] == "on_chat_model_stream": chunk = event["data"]["chunk"] if chunk.content: await thought_queue.put({"type": "token", "content": chunk.content}) await thought_queue.put(SENTINEL) # 图跑完 graph_task = asyncio.create_task(graph_producer()) while True: item = await thought_queue.get() if item is SENTINEL: break yield f"data: {json.dumps(item)}\n\n" await graph_task s = graph.get_state(cfg(thread_id)) final_data = {"state": pick_values(s.values), "next": s.next, "done": True} yield f"data: {json.dumps(final_data)}\n\n"

为什么用astream_events


LangGraph 对 LangChain 模型调用会发出on_chat_model_stream事件,可以拿到 LangChain 路径下的 token,推给前端做消息打字机。

为什么最后还要get_state


流式过程中前端只做预览/增量;节点结束时 state 可能还有 patch(如__waiting_for__等人机交互)。最终以 checkpoint 快照为准,避免前后端状态漂移。

四、同步 DSPy 节点怎么往异步 SSE 里推 token?

LangGraph 里很多节点调 DSPy(同步),而 SSE 消费在 asyncio 事件循环里。直接阻塞会卡死整个服务。

stream_context.py:线程本地 callback + 跨线程入队

# 全局:thread_id → (Queue, EventLoop) _registries: dict[str, tuple] = {} def setup_node_streaming(thread_id): queue, loop = _registries[thread_id] def callback(token: str): loop.call_soon_threadsafe(queue.put_nowait, { "type": "thought_token", "content": token, }) _local.thought_callback = callback # threading.local

DSPy 自定义 LM(ai_dspy/__init__.py)在__call__里检测 callback,有则stream=True调 OpenAI API,每个 token 回调:

callback = get_thought_callback() if callback is not None: for chunk in stream: token = chunk.choices[0].delta.content callback(token) # → 安全投进 asyncio.Queue

game_logic.py:异步节点里用asyncio.to_thread包 DSPy

async def _run_dspy_streamed(thread_id, label, func, *args, **kwargs): emit_thought_event(thread_id, {"type": "thought_start", "label": label}) def _run(): setup_node_streaming(thread_id) try: return func(*args, **kwargs) finally: teardown_node_streaming() return await asyncio.to_thread(_run) # 同步 DSPy 不阻塞事件循环

白天发言等路径则用_run_llm_streamed:同样to_thread+setup_node_streaming,直接流式调 OpenAI-compatible API。

五、为什么不用 WebSocket?

SSE 单向推送足够(服务端 → 客户端);操作仍用 POST。实现简单,和 FastAPIStreamingResponse天然契合,Demo 阶段性价比最高。

http://www.gsyq.cn/news/1589557.html

相关文章:

  • 2026年微信小程序开发平台哪家好?主流工具功能和费用对比
  • LLaMA泄露事件:基础大模型治理的临界点与实践启示
  • Web测试入门:从手工到自动化,构建你的测试知识体系与实战项目
  • ReACT智能体:让大模型真正做事的推理-行动闭环框架
  • KMS_VL_ALL_AIO:智能激活脚本的完整技术解析与实战指南
  • AMD Ryzen终极调试工具SMUDebugTool:硬件性能深度掌控实战指南
  • 3种颠覆式部署方案:如何高效搭建Elasticsearch监控平台?
  • 计算机毕业设计之“花遇” 线上鲜花销售系统设计与实现
  • 承德去天津打工:天津鸿泰劳务的对比评测与风险揭示
  • OpenHarmony学习笔记【总篇:从入门到放弃】
  • WatermarkRemover:三步实现视频水印批量清除的终极解决方案
  • 自动化标签打印软件,落地实施思路,供应链协同标签打印软件
  • 分类模型评估实战:从混淆矩阵到业务指标的深度转化
  • QueryExcel终极指南:快速免费解决多Excel文件批量查询难题
  • Obsidian Excel表格转换:3分钟解决Markdown格式混乱难题
  • 2026 跨境电商卖家必备工具清单:从选品到运营,一套搞定全链路
  • Graph SLAM入门:从因子图建模到g2o实战
  • 终极指南:如何使用Translumo实现Windows游戏与视频实时屏幕翻译
  • 3大价值维度+5级能力跃迁:Chat2DB从开源工具到企业级数据管理平台的演进路径
  • Mythos架构解析:大模型长程推理的可编程能力范式
  • AI能力地图:从新闻到工作流的动态技术落地指南
  • 广州激光点焊机哪个公司技术强
  • NVIDIA算力帝国:硬件、CUDA生态与AI基础设施权力结构解析
  • 重塑Mac窗口管理体验:用Topit实现多任务智能置顶
  • 3步掌握文档下载:彻底解决30+平台付费限制难题
  • 东西方时尚审美差异量化程序,分别统计男女消费者对中西服饰偏好打分。
  • PianoPlayer深度解析:基于动态规划算法的钢琴指法生成技术实现
  • 拆解 musl libc 启动流程:从 __libc_start_main 到 main() 到底发生了什么?
  • 2026年重庆山三云企售后跟进的技术解析与工作要点说明
  • 现代gpu编程系统教程(一) ------- 概述