RAG 召回质量治理:用 Go 构建可调试的切片、检索与重排链路
RAG 召回质量治理:用 Go 构建可调试的切片、检索与重排链路
一、检索结果看似很多,答案却总是不准:RAG 落地的第一道坑
很多团队做企业知识库问答时,第一版 RAG 通常很快就能跑起来。文档丢进向量库,用户问题转成 Embedding,再取 TopK 拼进 Prompt,最后让大模型回答。演示时效果还行,真到业务部门试用,问题就开始变得扎心。
用户问“离职交接需要哪些材料”,系统召回了一堆 HR 制度。看上去都有点关系,答案却漏掉了最后的审批附件。用户问“合同超过 50 万怎么走审批”,召回结果里有合同模板,也有采购制度,但关键的额度阈值藏在另一份财务流程里。更麻烦的是,大模型会把这些残缺上下文缝起来,回答得很像那么回事。
这就是 RAG 的典型落地陷阱:召回“有相关性”,不等于召回“可回答”。如果只盯着向量相似度,系统很容易变成一个会说话的模糊搜索框。见证奇迹的时刻不是上线当天,而是业务同事拿着错误答案来问你“这玩意儿到底信不信得过”。
RAG 质量治理要解决的不是单点算法问题,而是一条链路问题。文档切片会影响语义完整性。Query 改写会影响召回方向。粗召回会影响候选范围。重排会影响最终证据顺序。Prompt 组装会影响模型是否忠实于上下文。任何一段偷懒,最后都会反映到答案质量上。
本文用 Go 设计一条可调试的 RAG 召回链路。重点放在三个生产问题上:如何切片才不把知识切碎,如何召回才不只看表面相似,如何重排才让证据真正可用。示例代码会实现文档切片、混合召回、交叉编码器重排的接口骨架,以及必要的超时、错误处理和可观测性埋点。
二、从文档到证据链:RAG 召回链路的底层机制
RAG 的核心不是“把向量库接上大模型”。更准确地说,RAG 是把用户问题映射到一组可信证据,再让大模型基于证据生成答案。向量库只是其中一个组件。
一条生产级链路通常会包含六个阶段。
flowchart TD A[原始文档] --> B[结构化解析] B --> C[语义切片与重叠窗口] C --> D[Embedding 写入向量索引] E[用户问题] --> F[Query 规范化与改写] F --> G[向量召回 + 关键词召回] G --> H[候选片段去重与过滤] H --> I[重排模型计算证据优先级] I --> J[Prompt 证据组装] J --> K[LLM 生成答案] K --> L[引用与质量日志]第一阶段是结构化解析。PDF、Word、Markdown、网页和表格的结构完全不同。如果把它们都粗暴转成纯文本,很容易丢掉标题层级、表格字段和章节边界。RAG 对结构很敏感。因为很多答案不是藏在某一句里,而是依赖标题、列表和表格上下文。
第二阶段是语义切片。切片太短,会把一个完整规则拆散。切片太长,会让 Embedding 表达变得浑浊。常见做法是按标题、段落和 token 长度综合切分,并保留一定 overlap。Overlap 的作用不是凑字数,而是避免跨段信息被截断。
第三阶段是索引构建。Embedding 模型会把文本映射为高维向量。向量检索通常使用近似最近邻算法,例如 HNSW。它的优势是快,代价是结果不是严格精确。工程上要接受这个权衡。要想提高召回质量,不能只调一个 TopK。
第四阶段是 Query 处理。用户问题往往很短,甚至有口语化省略。比如“这个怎么报销”没有明确对象,直接向量化会很飘。Query 改写可以补全意图,也可以生成多个检索子问题。但改写不能太自由,否则会把用户原意带偏。
第五阶段是混合召回。向量检索擅长语义相似,关键词检索擅长精确术语。企业制度里经常有编号、金额、产品名和字段名。单靠向量召回很容易漏掉这些硬条件。更稳的方案是向量召回加 BM25,再统一去重。
第六阶段是重排。粗召回拿到的是候选集合,不是最终证据。重排模型会同时看 Query 和 Chunk,判断“这段内容是否真的能回答问题”。这一步通常比向量相似度更准,但成本更高,所以只对 TopN 候选做。
这里有个很现实的原则:RAG 的每一步都要能被日志还原。用户问了什么,改写成了什么,召回了哪些片段,相似度是多少,重排分是多少,最终用了哪些证据。没有这些记录,RAG 调优就会变成玄学。技术人不能靠拍脑袋修水管,得先看哪里漏。
三、用 Go 实现可观测的召回与重排骨架
下面的代码不是玩具版“向量库查一下”。它把 RAG 链路拆成可替换接口,并在关键路径上处理超时、错误和调试信息。生产环境可以把VectorStore接到 Milvus、pgvector、Qdrant 或 Elasticsearch dense_vector,把KeywordStore接到 Elasticsearch 或 OpenSearch。
package rag import ( "context" "errors" "fmt" "sort" "strings" "time" ) type Chunk struct { ID string DocID string Title string Content string Metadata map[string]string } type Candidate struct { Chunk Chunk VectorScore float64 BM25Score float64 RerankScore float64 Source string } type VectorStore interface { Search(ctx context.Context, query string, topK int) ([]Candidate, error) } type KeywordStore interface { Search(ctx context.Context, query string, topK int) ([]Candidate, error) } type Reranker interface { Score(ctx context.Context, query string, chunks []Chunk) ([]float64, error) } type Logger interface { Info(msg string, fields map[string]any) Error(msg string, fields map[string]any) } type Retriever struct { vector VectorStore keyword KeywordStore reranker Reranker logger Logger vectorTopK int keywordTopK int finalTopK int } func NewRetriever(v VectorStore, k KeywordStore, r Reranker, l Logger) *Retriever { return &Retriever{ vector: v, keyword: k, reranker: r, logger: l, vectorTopK: 30, keywordTopK: 20, finalTopK: 8, } } func (r *Retriever) Retrieve(ctx context.Context, query string) ([]Candidate, error) { query = normalizeQuery(query) if query == "" { return nil, errors.New("empty query") } start := time.Now() searchCtx, cancel := context.WithTimeout(ctx, 1500*time.Millisecond) defer cancel() vectorCh := make(chan result, 1) keywordCh := make(chan result, 1) go func() { items, err := r.vector.Search(searchCtx, query, r.vectorTopK) vectorCh <- result{items: items, err: err, source: "vector"} }() go func() { items, err := r.keyword.Search(searchCtx, query, r.keywordTopK) keywordCh <- result{items: items, err: err, source: "keyword"} }() merged := make([]Candidate, 0, r.vectorTopK+r.keywordTopK) for i := 0; i < 2; i++ { select { case res := <-vectorCh: if res.err != nil { r.logger.Error("vector search failed", map[string]any{"error": res.err.Error()}) continue } merged = append(merged, markSource(res.items, res.source)...) case res := <-keywordCh: if res.err != nil { r.logger.Error("keyword search failed", map[string]any{"error": res.err.Error()}) continue } merged = append(merged, markSource(res.items, res.source)...) case <-searchCtx.Done(): return nil, fmt.Errorf("search timeout: %w", searchCtx.Err()) } } merged = dedupeCandidates(merged) if len(merged) == 0 { return nil, errors.New("no retrieval candidates") } final, err := r.rerank(ctx, query, merged) if err != nil { return nil, err } r.logger.Info("rag retrieve finished", map[string]any{ "query": query, "candidates": len(merged), "final": len(final), "latency_ms": time.Since(start).Milliseconds(), "top_chunk": final[0].Chunk.ID, "top_score": final[0].RerankScore, }) return final, nil } type result struct { items []Candidate err error source string } func (r *Retriever) rerank(ctx context.Context, query string, candidates []Candidate) ([]Candidate, error) { limit := 24 if len(candidates) < limit { limit = len(candidates) } // 粗排先融合向量分和关键词分,避免把明显无关的片段送进重排模型浪费成本。 sort.Slice(candidates, func(i, j int) bool { return hybridScore(candidates[i]) > hybridScore(candidates[j]) }) shortlist := candidates[:limit] chunks := make([]Chunk, 0, len(shortlist)) for _, item := range shortlist { chunks = append(chunks, item.Chunk) } rerankCtx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() scores, err := r.reranker.Score(rerankCtx, query, chunks) if err != nil { return nil, fmt.Errorf("rerank failed: %w", err) } if len(scores) != len(shortlist) { return nil, fmt.Errorf("rerank score size mismatch, got=%d want=%d", len(scores), len(shortlist)) } for i := range shortlist { shortlist[i].RerankScore = scores[i] } sort.Slice(shortlist, func(i, j int) bool { return shortlist[i].RerankScore > shortlist[j].RerankScore }) if len(shortlist) > r.finalTopK { shortlist = shortlist[:r.finalTopK] } return shortlist, nil } func normalizeQuery(query string) string { query = strings.TrimSpace(query) query = strings.Join(strings.Fields(query), " ") return query } func markSource(items []Candidate, source string) []Candidate { for i := range items { if items[i].Source == "" { items[i].Source = source } } return items } func dedupeCandidates(items []Candidate) []Candidate { seen := make(map[string]Candidate, len(items)) for _, item := range items { old, ok := seen[item.Chunk.ID] if !ok || hybridScore(item) > hybridScore(old) { seen[item.Chunk.ID] = item } } out := make([]Candidate, 0, len(seen)) for _, item := range seen { out = append(out, item) } return out } func hybridScore(c Candidate) float64 { // 两类分数通常不在同一分布。生产环境应做分位数归一化。 return 0.65*c.VectorScore + 0.35*c.BM25Score }这段代码有几个设计点值得单独说。
第一,向量召回和关键词召回并发执行,但共享同一个检索超时。这样可以避免某个索引慢查询拖住整体请求。RAG 的上游通常是在线问答,检索阶段不能无限等。
第二,召回失败不是立刻全链路失败。比如关键词索引临时异常,向量召回还能返回候选。这里要记录错误,但不要因为一个召回器失败就让用户完全不可用。反过来,如果两个召回器都没有候选,就应该明确返回错误,而不是把空上下文交给大模型瞎编。
第三,重排前先做粗排截断。交叉编码器重排质量更好,但成本更高。把 200 个候选全送进去,是预算不嫌多的写法。更稳的方式是用融合分数截到 20 到 30 个,再做精排。
第四,日志里必须记录候选数量、最终片段、耗时和分数。后续做离线评测时,这些字段能直接还原一次召回过程。没有这层记录,线上用户说“答错了”,你只能打开控制台干瞪眼。
切片也要单独治理。下面是一个按段落切片的简化实现。它会保留标题,并使用 overlap 避免上下文断裂。
func SplitDocument(docID, title, text string, maxChars, overlap int) ([]Chunk, error) { if maxChars <= 0 || overlap < 0 || overlap >= maxChars { return nil, errors.New("invalid split config") } paragraphs := strings.Split(text, "\n") var chunks []Chunk var buf strings.Builder index := 0 flush := func() { content := strings.TrimSpace(buf.String()) if content == "" { return } chunks = append(chunks, Chunk{ ID: fmt.Sprintf("%s_%04d", docID, index), DocID: docID, Title: title, Content: title + "\n" + content, Metadata: map[string]string{ "splitter": "paragraph_with_overlap", }, }) index++ if overlap > 0 && len(content) > overlap { buf.Reset() buf.WriteString(content[len(content)-overlap:]) buf.WriteString("\n") } else { buf.Reset() } } for _, p := range paragraphs { p = strings.TrimSpace(p) if p == "" { continue } if buf.Len()+len(p) > maxChars { flush() } buf.WriteString(p) buf.WriteString("\n") } flush() if len(chunks) == 0 { return nil, errors.New("document produced no chunks") } return chunks, nil }这个切片器不复杂,但体现了一个原则:宁可规则清楚,也不要一上来就搞玄学。生产里可以继续增强,比如识别 Markdown 标题、表格、编号列表和代码块。关键是每个 chunk 都要保留可追溯信息。否则答案引用无法定位,用户也无法核验。
四、调参、成本与质量之间的架构权衡
RAG 召回质量治理最难的地方,不是写代码,而是做取舍。
第一个取舍是 chunk 大小。chunk 越小,语义越聚焦,检索更容易命中细节。但上下文可能不完整,模型看到的证据会断。chunk 越大,上下文更完整,但向量表达会变得平均,容易召回一段“什么都沾点”的内容。经验上,制度类文档可以从 500 到 1000 中文字符开始试,技术文档可以按标题层级和代码块结构动态切。
第二个取舍是 TopK。TopK 太小,召回漏掉关键证据。TopK 太大,重排成本和 Prompt 成本都会上升。更糟糕的是,过多弱相关证据会干扰大模型,让它在一堆半相关内容里自由发挥。线上不要只看命中率,还要看最终答案引用的证据是否真的支持结论。
第三个取舍是 Query 改写。改写能提升召回,但也可能引入幻觉。比如用户问“报销上限是多少”,改写器如果擅自补成“差旅报销上限”,就会把问题带偏。比较稳的方案是保留原始 Query,同时生成 1 到 3 个检索子问题。最终日志里必须记录改写结果,方便回放。
第四个取舍是重排模型。重排模型越强,召回排序越准,但延迟和成本越高。对于低频知识库,可以用云端 reranker。对于高频客服,可以用轻量本地模型,或者只在粗排分数接近时触发重排。别为了理论最优,把每次问答都堆成成本怪物。
第五个取舍是可观测性成本。记录完整召回链路会占用存储,也可能涉及敏感文本。企业知识库尤其要注意权限和脱敏。日志里可以存 chunk ID、分数、文档标题和 hash。原文内容是否落日志,要看合规要求。能不裸奔就别裸奔,数据安全不是出事后才想起来的装饰品。
还有一个禁用场景要明确:如果业务需要严格事实一致性,但知识源本身没有结构化、没有版本控制、没有权限边界,先别急着上 RAG。先治理知识库。RAG 只能放大知识库的可用性,不能把一堆过期制度自动炼成真理。
五、总结
RAG 生产质量的核心,不是“接一个向量库”,而是把文档切片、混合召回、候选去重、重排排序和证据引用串成一条可观察、可回放、可调参的链路。
落地时可以按三步推进。
第一步,先做好结构化切片。保留标题、段落、表格和文档来源。每个 chunk 都必须能追溯到原文位置。
第二步,引入混合召回。向量召回负责语义相似,关键词召回负责术语和编号。两者合并后去重,再用粗排控制候选规模。
第三步,加上重排和质量日志。重排负责把“看起来相关”的片段变成“真正能回答”的证据。质量日志负责让每次错误都能复盘,而不是靠感觉改参数。
RAG 系统最终拼的不是谁的 Prompt 更花哨,而是谁能把证据链治理清楚。答案错了不可怕,可怕的是不知道为什么错。把召回链路做成可调试系统,后续无论换 Embedding、换向量库,还是换重排模型,都有明确抓手。这才是大模型应用落地里真正划算的 ROI。
