当前位置: 首页 > news >正文

字段与指标检索构建

字段向量索引要解决的问题是:怎么让字段信息具备语义检索能力。

字段向量索引的建立:

async def _save_column_info_to_qdrant(self, column_infos: list[ColumnInfo]):"""把字段元数据继续推进成可语义检索的 Qdrant 向量点"""await self.column_qdrant_repository.ensure_collection()points: list[dict] = []for column_info in column_infos:# 一个字段不会只生成一个向量点,而是把名字 描述 别名都拆开建立语义入口points.append({"id": uuid.uuid4(),"embedding_text": column_info.name,"payload": asdict(column_info),})points.append({"id": uuid.uuid4(),"embedding_text": column_info.description,"payload": asdict(column_info),})for alia in column_info.alias:points.append({"id": uuid.uuid4(),"embedding_text": alia,"payload": asdict(column_info),})# 先把待向量化文本抽出来,再分批调用 Embedding 服务# 这样更容易控制单次请求大小embeddings: list[list[float]] = []embedding_texts = [point["embedding_text"] for point in points]embedding_batch_size = 20for i in range(0, len(embedding_texts), embedding_batch_size):batch_embedding_texts = embedding_texts[i : i + embedding_batch_size]batch_embeddings = await self.embedding_client.aembed_documents(batch_embedding_texts)embeddings.extend(batch_embeddings)ids = [point["id"] for point in points]payloads = [point["payload"] for point in points]await self.column_qdrant_repository.upsert(ids, embeddings, payloads)

这段代码做了 4 件事:

  1. 先确保 Qdrant 中用于存字段向量的 collection 存在
  2. 把每个字段拆成多个待向量化的 point
  3. 调用 Embedding 服务批量生成向量
  4. 把 idembeddingpayload 一起写入 Qdrant

为什么一个字段不会生成一个point

  字段向量索引的核心设计:一个字段不会只建一个点,而是要围绕这个字段拆出多个语义入口.

例如一个字段:

  • 字段名:order_amount
  • 描述:订单金额
  • 别名:销售额、成交金额、订单总额

所以当前实现采用的是:

  • 字段名建一个 point
  • 字段描述建一个 point
  • 每一个字段别名再各建一个 point

这样做的目的,就是尽量让不同的自然语言表达都能把同一个字段召回来

这个过程写得再具体一点,一个 ColumnInfo 往往会先被展开成这样一组中间数据:

# 同一个字段会被拆成多个语义入口,但 payload 都指向同一个 ColumnInfo
[{# 字段名入口"id": "随机 point id 1","embedding_text": "order_amount","payload": {...完整 column_info...},},{# 字段描述入口"id": "随机 point id 2","embedding_text": "订单金额","payload": {...完整 column_info...},},{# 字段别名入口"id": "随机 point id 3","embedding_text": "销售额","payload": {...完整 column_info...},},{# 另一个字段别名入口"id": "随机 point id 4","embedding_text": "成交金额","payload": {...完整 column_info...},},
]

 

复习Qdrant的四个概念

  1. collection: 可以类比成向量库里的一张集合表
  2. point: 是真正写进 Qdrant 的一条记录
  3. vector: 是某段文本经过向量模型编码后的数值表示
  4. payload: 则是和向量一起保存的业务数据

如果把这 4 个概念重新串成一句更贴近代码的话,就是:

一个字段-> 拆成多个 point-> 每个 point 先有一段 embedding_text-> embedding_text 被编码成 vector-> vector 和 payload 一起写入同一个 collection

ponit和payload

point 是整条向量记录,payload 是这条向量记录里附带的业务信息。也就是说,payload 是 point 的一部分,而不是和 point 并列的东西。

如果把一个真正写入 Qdrant 的点简化画出来,大致会像这样:

PointStruct(# point 自己的存储 ID,不等同于字段 IDid="0b7f...",# embedding_text 编码后的向量,检索时主要比较它vector=[0.12, -0.08, ...],# 命中后返回给业务层的字段上下文payload={"id": "fact_order.order_amount","name": "order_amount","description": "订单金额","table_id": "fact_order",...},
)

向量检索真正比较的是 vector,但命中之后对业务最有价值的,是 payload 里带回来的字段上下文。

为什么point的id用uuid

原因很简单:一个字段会拆成多个 point。如果你直接用 column_info.id,那同一个字段名、字段描述、字段别名就会冲突

所以更合理的做法是:字段本身保持一个稳定的字段 ID,而每个 point 再单独生成自己的唯一 point ID.这里有两套不同的标识:column_info.id 表示业务字段自己的稳定身份,point.id 表示某一个向量点自己的存储身份

 

payload为什么直接保存整个ColumnInfo

这里不是只保存字段名,也不是只保存某一个小片段,而是直接保存整个 ColumnInfo 的字典表示。

好处是:后面向量检索命中之后,系统可以直接拿回完整字段上下文,而不需要再专门回别的地方拼装字段描述、字段角色、所属表等信息。

项目明显优先选择了另一件事:查询链路简单。因为后面一旦命中某个 point,系统立刻就能拿到完整字段信息,而不用再做“先查到 point,再根据字段 id 回 MySQL 二次补全”的额外动作。

 

ColumnQdrantRespostiry负责做什么

class ColumnQdrantRepository:# 字段向量统一写入这个 collectioncollection_name = "column_info_collection"def __init__(self, client: AsyncQdrantClient):self.client = clientasync def ensure_collection(self):if not await self.client.collection_exists(self.collection_name):await self.client.create_collection(collection_name=self.collection_name,vectors_config=VectorParams(# 必须和 Embedding 模型输出维度一致size=app_config.qdrant.embedding_size,# 用余弦距离衡量文本语义相似度# 说明后面做相似度检索时,主要按余弦相似度来比较“两个向量方向是否接近”distance=Distance.COSINE,),)async def upsert(self,ids: list[str],embeddings: list[list[float]],payloads: list[dict],batch_size: int = 10,):# 三个列表必须保持同一顺序,才能正确拼成 Qdrant 需要的向量记录points: list[PointStruct] = [PointStruct(id=id, vector=embedding, payload=payload)for id, embedding, payload in zip(ids, embeddings, payloads)]for i in range(0, len(points), batch_size):await self.client.upsert(collection_name=self.collection_name,# 分批写入,降低单次 upsert 压力points=points[i : i + batch_size],)

这个仓储层主要负责两件事:

  1. 确保字段 collection 已经建好
  2. 把已经准备好的向量点稳定写入 Qdrant

这里的职责边界也很清楚:Service 决定一个字段该拆成哪些 point,Repository 负责 collection 创建和批量落库

这条链路按顺序理解会更顺:

中间结构 points-> 抽出 embedding_texts 批量做向量化-> 得到 embeddings-> 再和 ids / payloads 按相同顺序拼回完整 point-> PointStruct-> Qdrant

为什么embedding和upsert要分批

这里有两个“分批”动作:一个发生在向量化阶段,一个发生在向量入库阶段。前者是为了减轻模型服务的调用压力,后者是为了降低向量库的写入压力,两者合在一起能让构建链路更稳定。

 

字段值全文索引构建:

字段值全文索引要解决的问题是:怎么让系统知道字段里有哪些真实值可以匹配。

async def _save_value_info_to_es(self, meta_config: MetaConfig, column_infos: list[ColumnInfo]
):await self.value_es_repository.ensure_index()# 把配置整理成“字段 id -> 是否同步真实值”的快速查询表column2sync: dict[str, bool] = {}for table in meta_config.tables:for column in table.columns:# 字段 id 要和 ColumnInfo.id 的拼法保持一致column2sync[f"{table.name}.{column.name}"] = column.syncvalue_infos: list[ValueInfo] = []for column_info in column_infos:# 根据字段 id 判断该字段是否需要同步取值sync = column2sync[column_info.id]if sync:# 从数仓查询该字段的 distinct 真实值,作为全文索引的数据来源current_column_values = await self.dw_mysql_repository.get_column_values(column_info.table_id, column_info.name, 100000)current_values_infos = [ValueInfo(# 字段 id + 字段值,组成一条值记录的唯一 idid=f"{column_info.id}.{current_column_value}",# 真正参与 ES 全文检索的文本value=current_column_value,# 记录这个值属于哪个字段,方便命中后反查字段上下文column_id=column_info.id,)for current_column_value in current_column_values]value_infos.extend(current_values_infos)# Repository 负责把 ValueInfo 转成 ES bulk 写入格式await self.value_es_repository.index(value_infos)

这段代码做了 5 件事:

  1. 先确保 Elasticsearch 中用于存字段值的索引存在
  2. 读取配置,找出哪些字段允许同步真实取值
  3. 去 DW 查询这些字段的真实值集合
  4. 把每个真实值封装成 ValueInfo
  5. 批量写入 Elasticsearch
http://www.gsyq.cn/news/1377500.html

相关文章:

  • 福州黄金回收上门与到店对比指南,高效选择建议 - 奢侈品回收测评
  • 2026年最新叠彩区黄金回收白银回收铂金回收靠谱店铺权威排行榜TOP5:纯金+金条+银条+钯金 门店地址联系方式推荐 - 莘州文化
  • 告别EasyConnect兼容性烦恼:一份给Ubuntu/WSL2用户的终极配置备忘录
  • 从‘调参苦手’到‘一击即中’:实战解读glmnet中lambda.min与lambda.1se到底怎么选
  • 雷电9模拟器上Frida失效?Lamda绕过方案详解
  • 2026太原市黄金回收白银回收铂金回收店铺哪家好 实力靠谱门店排行榜推荐及联系方式 - 亦辰小黄鸭
  • 如何让MacBook Touch Bar在Windows系统发挥完整潜能:三步解锁隐藏功能
  • WarcraftHelper终极指南:三步解决魔兽争霸III现代适配难题
  • 量子机器学习实战:从QSVM到QNN的构建、优化与避坑指南
  • 机器学习预测细菌耐药性:从全基因组数据到公共卫生预警
  • 2026年最新乐业县黄金回收白银回收铂金回收靠谱店铺权威排行榜TOP5:纯金+金条+银条+钯金 门店地址联系方式推荐 - 莘州文化
  • 基于OpenFHE与CKKS方案实现隐私保护SVM加密推理的实践与性能分析
  • 3个简单技巧:零基础搞定抖音直播回放批量下载
  • OpenCore Legacy Patcher终极指南:3步让老Mac焕发新生,完美运行最新macOS
  • 基于循环不变性CNN与特征工程的等离子体湍流热通量预测
  • 免费WiFi热点软件:Virtual Router将Windows电脑变成无线路由器的完整指南
  • 对比直接采购与使用Taotoken Token Plan的成本体感差异
  • Niagara引力模块避坑指南:Point/Line Attraction Force常见问题与性能优化
  • 2026年最新华池县黄金回收白银回收铂金回收靠谱店铺权威排行榜TOP5:纯金+金条+银条+钯金 门店地址联系方式推荐 - 莘州文化
  • 基于堆叠集成学习的脑膜炎早期预警模型:从EHR数据挖掘到临床决策支持
  • Unity Find Reference2 2.5.2版本深度解析与正确接入指南
  • 欧米茄全国维修服务体系2026焕新:最新热线与地址全公布 - 博客万
  • Deceive终极指南:如何在英雄联盟中隐身而不失联
  • 2026年最新临桂区黄金回收白银回收铂金回收靠谱店铺权威排行榜TOP5:纯金+金条+银条+钯金 门店地址联系方式推荐 - 莘州文化
  • 2026年最新华亭市黄金回收白银回收铂金回收靠谱店铺权威排行榜TOP5:纯金+金条+银条+钯金 门店地址联系方式推荐 - 莘州文化
  • Python之python-divvy包语法、参数和实际应用案例
  • Unity真实水流动效果实现:从波动方程到GPU仿真
  • AutoDL租GPU跑YOLOv5,从上传数据集到训练完成,保姆级避坑指南
  • MOOTDX:Python通达信数据接口的优雅解决方案与量化投资实践指南
  • Topit:macOS窗口置顶神器,彻底解决多任务窗口遮挡问题