AI Agent 双层记忆系统:从理论到落地
前言
做过 AI 对话应用的人都遇到过这个问题:用户昨天说"叫我小王",今天问"我是谁",模型一脸茫然。
这就是记忆系统要解决的核心问题。在 Deep Research 项目中,我设计了一套双层记忆架构——短期记忆负责当前对话上下文,长期记忆负责跨会话的用户画像与历史知识。本文完整记录设计思路与关键代码。
记忆模型
参考人脑记忆机制,将 AI 记忆分为三层:
| 层级 | 类比 | 职责 | 生命周期 | 后端 |
|---|---|---|---|---|
| 短期记忆 | 工作记忆 | 当前对话的上下文 | 7天TTL | Redis / PostgreSQL / 内存 |
| 长期-语义记忆 | 百科知识 | 用户画像、偏好、事实 | 永久 | PostgreSQL / SQLite + Milvus |
| 长期-情节记忆 | 经历记忆 | 历史任务、对话记录 | 永久 | PostgreSQL / SQLite + Milvus |
架构图
用户提问 │ ▼ MemoryManager.build_personalized_prompt_context() │ ├── 短期记忆(最近N条消息 + 对话摘要) ├── 长期语义记忆(Milvus向量检索 → 用户画像/偏好/事实) └── 长期情节记忆(Milvus向量检索 → 相似历史任务) │ ▼ 拼接为 [跨会话记忆] 注入 System Prompt │ ▼ Agent 处理后,persist_turn() 持久化本轮对话
短期记忆
存储结构
支持三种后端,根据环境自动选择:
def add_short_term_message(self, thread_id, message, user_id, tenant_id): payload = {"role": "human"|"ai", "content": "..."} if self.short_term_backend == "redis": key = f"ma:short:{tenant_id}:{user_id}:{thread_id}" self._redis_client.rpush(key, json.dumps(payload)) self._redis_client.expire(key, self.short_term_ttl) elif self.short_term_backend == "postgres": # INSERT INTO short_term_messages (id, tenant_id, user_id, thread_id, role, content, created_at) ... else: # 内存 dict self.short_term.add_message(thread_id, message)摘要压缩
消息数量超阈值时自动触发 LLM 摘要压缩:
def _compress_pg_thread(self, tenant_id, user_id, thread_id): history = self._get_pg_short_term_messages(tenant_id, user_id, thread_id) if len(history) <= self.short_term_max_messages: return # 还没到阈值 # 保留最近的消息,压缩旧消息 split_at = len(history) - self.short_term_summary_threshold to_summarize = history[:split_at] keep_messages = history[split_at:] existing_summary = self._get_pg_short_term_summary(...) new_summary = self._summarize_text(existing_summary, to_summarize) # 原子替换:删旧消息 + 保留新消息 + 更新摘要 DELETE ...; INSERT keep_messages; UPSERT summary
LLM 摘要 Prompt 设计:
def _summarize_text(self, existing_summary, history_slice): prompt = ( "你是对话压缩引擎。请在保留事实、偏好、结论、待办和约束的前提下进行递归摘要。\n" f"已有摘要:{existing_summary or '无'}\n" f"新增历史:\n{history_text}\n" "输出要求:100-300字,中文,结构紧凑。" ) response = self._summary_llm.invoke([HumanMessage(content=prompt)]) return str(response.content).strip()关键点:摘要不是一次性生成,而是递归式——每次合并已有摘要 + 新增消息,类似 git rebase。
长期记忆
Milvus 向量检索
长期记忆的入口是向量检索。用户的提问 embedding 在 Milvus 中找最相似的历史记忆:
def _search_milvus(self, tenant_id, user_id, query, memory_type, limit=5): docs = self._milvus_store.similarity_search(query, k=max(limit * 4, 20)) entries = [] for doc in docs: metadata = doc.metadata or {} # 多租户过滤 if metadata.get("tenant_id") != tenant_id: continue if metadata.get("user_id") != user_id: continue # 类型过滤 if memory_type and metadata.get("memory_type") != memory_type: continue entries.append(MemoryEntry(...)) if len(entries) >= limit: break return entries注意这里k = limit * 4——从 Milvus 多取一些,因为多租户和类型过滤会筛掉大量结果。如果只取 limit 个,过滤后可能一条不剩。
记忆保存
def save_fact(self, user_id, fact, category, tenant_id): memory_id = str(uuid4()) # 1. 写 PostgreSQL(结构化的元数据 + 全文搜索能力) self._insert_memory_pg(entry, summary=fact[:500]) # 2. 写 Milvus(向量语义搜索) self._index_memory_milvus( text=fact, metadata={ "tenant_id": tenant_id, "user_id": user_id, "memory_id": memory_id, "memory_type": "semantic", "namespace": f"facts/{category}", }, ) return memory_idPostgreSQL 和 Milvus双写:PG 支撑精确匹配和 ILIKE 模糊搜索,Milvus 支撑语义相似搜索。任一挂了不影响另一路。
自动记忆提取
在persist_turn中检测用户是否触发了"记住我"意图:
def persist_turn(self, tenant_id, user_id, thread_id, query, answer): # 1. 保存短期记忆 self.add_short_term_messages(...) # 2. 检测记忆触发词 remember_markers = ["记住", "我叫", "我的偏好", "remember", "my name is", ...] if any(marker in query.lower() for marker in remember_markers): extracted = extract_memory_from_messages([user_message]) # 提取 facts 和 preferences,分别存储 for fact in extracted["facts"]: self.save_fact(user_id, fact, category="user_fact") for pref in extracted["preferences"]: self.save_user_profile(user_id, {"preferences": [pref]}, merge=True)上下文注入
每次用户提问前,build_personalized_prompt_context组装记忆上下文:
def build_personalized_prompt_context(self, user_id, thread_id, query, max_memories=8): context = self.get_context_for_agent(user_id, thread_id, query, max_memories) sections = [] if profile_text := context.get("user_profile"): sections.append(f"## 用户画像\n{profile_text}") if recent_text := context.get("recent_messages"): sections.append(f"## 最近对话\n{recent_text}") if summary_text := context.get("conversation_summary"): sections.append(f"## 对话摘要\n{summary_text}") if memory_text := context.get("memory_text"): sections.append(memory_text) return "\n\n".join(sections)注入到每个节点的 prompt 中:
def with_memory_context(state, user_prompt): memory_context = state.get("memory_context", "").strip() if not memory_context: return user_prompt return f"{user_prompt}\n\n[跨会话记忆]\n{memory_context}"追踪与调试
每次记忆注入后记录完整 trace,方便排查"为什么给我注入了这段记忆":
self._last_trace = { "query": query, "memory_count": len(memory_entries), "source_count": {"milvus": 3, "postgres": 1}, # 来自哪个检索源 "items": [ {"id": "xxx", "type": "semantic", "source": "milvus", "snippet": "..."}, ], "milvus_raw_hits": [...], # Milvus 原始命中(含被过滤的) "injected_chars": len(injected), }用户输入/memory-trace即可查看最近一次的注入详情。
总结
短期记忆:消息累积 + 递归 LLM 摘要压缩,多后端自动降级
长期记忆:PG + Milvus 双写,向量检索为主、全文搜索兜底
自动提取:触发词检测 + LLM 信息抽取,零手动标注
可观测性:每次注入记录完整 trace,排错不靠猜
这个架构的可扩展性不错——如果想接入对话推荐、情感分析、知识图谱,只需要在新的 Agent 节点中读取memory_context即可。
