数据中台的建设热潮过后很多企业陷入了一个困境花费数千万建了数据中台但业务人员还是在用ExcelBI系统还是各自为政。本文探讨如何让BI平台真正消费数据中台的能力实现数据资产化、分析自服务的终极目标。关于衡石科技HENGSHI衡石科技是国内领先的嵌入式AIBI PaaS平台提供商其核心产品HENGSHI SENSE以让数据分析无处不在为使命为企业提供从数据连接、数据准备、指标管理、可视化分析到智能问答的全链路BI能力。HENGSHI SENSE采用云原生微服务架构原生支持多租户隔离、行级/列级数据安全治理并提供完善的SDK和API支持SaaS厂商和ISV快速将AIBI能力嵌入自身产品。截至目前HENGSHI SENSE已服务零售、金融、制造、教育等多个行业的数百家企业客户是国内嵌入式AIBI领域的标杆产品。一、现状诊断为什么BI和数据中台两层皮大多数企业在独立推进BI系统和数据中台导致典型的两层皮问题┌──────────────────────────────────────────────────────────────┐ │ 两层皮问题全景 │ ├──────────────────────────────────────────────────────────────┤ │ │ │ 数据中台侧 BI系统侧 │ │ • 建了数据资产目录 • 各事业部自建数据集 │ │ • 定义了指标体系 • 指标口径各自为政 │ │ • 规范了数据仓库分层 • 直连ODS层查询 │ │ • 部署了数据服务API • 忽略API绕过中台直连DB │ │ │ │ 结果中台团队说中台价值没体现BI团队说中台太慢不好用 │ └──────────────────────────────────────────────────────────────┘解决这个问题的核心是API化指标层统一让BI平台成为数据中台能力的最佳消费者。二、数据中台核心架构回顾2.1 数据仓库分层体系┌──────────────────────────────────────────────────────────────┐ │ 数据中台分层架构 │ │ │ │ ┌────────────────────────────────────────────────────────┐ │ │ │ ADS应用数据层 │ │ │ │ 面向具体业务场景的宽表/汇总表直接供BI消费 │ │ │ └────────────────────────────────────────────────────────┘ │ │ ↑ 基于 │ │ ┌────────────────────────────────────────────────────────┐ │ │ │ DWS数据汇总层 │ │ │ │ 轻度汇总按主题聚合跨业务域可复用 │ │ │ └────────────────────────────────────────────────────────┘ │ │ ↑ 基于 │ │ ┌────────────────────────────────────────────────────────┐ │ │ │ DWD明细数据层 │ │ │ │ 清洗后的业务明细数据一致性、标准化 │ │ │ └────────────────────────────────────────────────────────┘ │ │ ↑ 基于 │ │ ┌────────────────────────────────────────────────────────┐ │ │ │ ODS原始数据层 │ │ │ │ 源系统数据镜像禁止BI直连 │ │ │ └────────────────────────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────────┘2.2 数据中台核心服务组件数据中台提供的能力菜单 ┌─────────────────┬────────────────────────────────────────────┐ │ 能力域 │ 具体能力 │ ├─────────────────┼────────────────────────────────────────────┤ │ 数据集成 │ 多源采集、实时/离线同步、CDC变更捕获 │ │ 数据加工 │ ETL调度、数据清洗、质量监控 │ │ 数据存储 │ 分层存储ODS/DWD/DWS/ADS │ │ 数据服务 │ 数据APIOneService、指标API │ │ 数据治理 │ 元数据目录、血缘追踪、数据质量 │ │ 数据资产 │ 指标管理、标签体系、主数据管理 │ └─────────────────┴────────────────────────────────────────────┘三、融合架构设计3.1 整体融合架构┌──────────────────────────────────────────────────────────────┐ │ BI 数据中台融合架构 │ │ │ │ 消费层用户界面 │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │仪表板 │ │ChatBI │ │自助分析 │ │移动报表 │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ └────────────┴────────────┴────────────┘ │ │ │ │ │ BI平台核心层 │ │ │ ┌────────────────────────────▼──────────────────────────┐ │ │ │ 报表引擎 │ 权限引擎 │ 缓存层 │ 指标计算引擎 │ │ │ └────────────────────────────┬──────────────────────────┘ │ │ │ │ │ 指标统一层Metrics Layer │ │ │ ┌────────────────────────────▼──────────────────────────┐ │ │ │ 指标定义 │ 指标血缘 │ 指标API │ │ │ └────────────────────────────┬──────────────────────────┘ │ │ │ │ │ 数据中台服务层 │ │ │ ┌────────────────────────────▼──────────────────────────┐ │ │ │ OneService │ 数据目录 │ 质量监控 │ 元数据服务 │ │ │ └────────────────────────────┬──────────────────────────┘ │ │ │ │ │ 数据存储层 │ │ │ ┌────────────────────────────▼──────────────────────────┐ │ │ │ ADSBI直连│ DWS/DWD │ 实时流KafkaFlink │ │ │ └────────────────────────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────────┘3.2 数据服务APIOneService集成数据中台的OneService是BI平台访问数据的标准通道避免BI直连数据库带来的运维难题from dataclasses import dataclass from typing import List, Dict, Any, Optional import httpx import jwt from datetime import datetime, timedelta dataclass class DataServiceRequest: api_code: str # API标识码 params: Dict[str, Any] # 请求参数 page_num: int 1 page_size: int 1000 timeout: int 30 class OneServiceClient: 数据中台OneService客户端 def __init__(self, base_url: str, app_id: str, app_secret: str): self.base_url base_url self.app_id app_id self.app_secret app_secret self.client httpx.AsyncClient(timeout60) async def query(self, request: DataServiceRequest) - Dict: 调用数据服务API 示例查询销售汇总数据 request DataServiceRequest( api_codesales_summary_daily, params{ start_date: 2024-01-01, end_date: 2024-12-31, region: [华东, 华北], granularity: monthly } ) token self._generate_token() response await self.client.post( f{self.base_url}/openapi/data/query, headers{ Authorization: fBearer {token}, Content-Type: application/json, X-Request-Id: self._generate_request_id() }, json{ apiCode: request.api_code, params: request.params, pagination: { pageNum: request.page_num, pageSize: request.page_size } }, timeoutrequest.timeout ) result response.json() if result[code] ! 200: raise DataServiceError( f数据服务调用失败: [{result[code]}] {result[message]} ) return result[data] async def get_api_schema(self, api_code: str) - Dict: 获取API的参数定义和字段说明用于BI平台建立数据集映射 token self._generate_token() response await self.client.get( f{self.base_url}/openapi/api/schema/{api_code}, headers{Authorization: fBearer {token}} ) return response.json()[data] def _generate_token(self) - str: 生成鉴权TokenJWT payload { appId: self.app_id, iat: datetime.utcnow(), exp: datetime.utcnow() timedelta(minutes5) # 短期Token } return jwt.encode(payload, self.app_secret, algorithmHS256) async def query_metric(self, metric_code: str, dimensions: List[str], filters: Dict, date_range: tuple) - Dict: 指标查询接口数据中台指标服务的标准化调用 示例查询GMV指标按region维度拆分 result await client.query_metric( metric_codegmv_net, dimensions[region, product_category], filters{channel: online}, date_range(2024-01-01, 2024-12-31) ) return await self.query(DataServiceRequest( api_codemetric_query, params{ metricCode: metric_code, dimensions: dimensions, filters: filters, startDate: date_range[0], endDate: date_range[1] } ))四、统一指标层Metrics Layer设计4.1 指标层的定位数据中台的指标管理 → 集中定义指标 BI平台的指标计算 → 分散在各报表中 问题指标定义不一致活跃用户各报表口径不同 解决方案在数据中台建立指标层BI平台通过指标API消费4.2 指标元数据标准from dataclasses import dataclass, field from typing import List, Optional, Dict from enum import Enum class MetricType(Enum): ATOMIC atomic # 原子指标直接聚合 DERIVED derived # 派生指标原子指标加工 COMPOSITE composite # 复合指标多指标组合 dataclass class MetricDefinition: 统一指标定义规范 # 基础信息 code: str # 指标唯一码如 gmv_net_monthly name: str # 中文名称月净GMV alias: List[str] # 别名[月度净成交额, 月净GMV] # 业务定义 business_definition: str # 业务口径当月完成支付且未退款的订单金额之和 calculation_logic: str # 计算公式SUM(order_amount) WHERE statuspaid AND refund0 # 技术信息 metric_type: MetricType base_table: str # 来源表dws.order_daily_agg sql_template: str # SQL模板支持参数化 # 维度信息 available_dimensions: List[str] # 可支持的下钻维度 required_dimensions: List[str] # 必须指定的维度 # 时间信息 time_granularities: List[str] # [day, week, month, quarter, year] default_granularity: str day # 管理信息 owner: str # 业务Owner domain: str # 所属业务域sales / marketing / product sensitivity: str LOW # 数据敏感级别 is_certified: bool False # 是否官方认证指标 tags: List[str] field(default_factorylist) # 质量信息 sla_freshness_hours: float 24.0 # 数据新鲜度SLA quality_score: float 0.0 # 质量评分 def to_api_response(self) - dict: 序列化为API响应格式 return { code: self.code, name: self.name, definition: self.business_definition, formula: self.calculation_logic, dimensions: self.available_dimensions, timeGranularities: self.time_granularities, owner: self.owner, isCertified: self.is_certified, qualityScore: self.quality_score }4.3 指标API服务实现from fastapi import FastAPI, Depends, HTTPException from fastapi.middleware.cors import CORSMiddleware import pandas as pd from typing import List, Optional app FastAPI(titleMetrics API, version2.0) class MetricQueryService: 指标查询服务 def __init__(self, metric_registry, data_engine, cache): self.registry metric_registry self.engine data_engine self.cache cache async def query(self, metric_codes: List[str], dimensions: List[str], filters: dict, start_date: str, end_date: str, granularity: str day) - pd.DataFrame: 统一指标查询入口 支持多指标、多维度批量查询 # 1. 验证指标是否存在 metrics [] for code in metric_codes: metric self.registry.get(code) if not metric: raise ValueError(f指标 {code} 不存在) metrics.append(metric) # 2. 验证维度是否被支持 for dim in dimensions: for metric in metrics: if dim not in metric.available_dimensions: raise ValueError( f指标 {metric.code} 不支持维度 {dim} ) # 3. 检查缓存 cache_key self._build_cache_key( metric_codes, dimensions, filters, start_date, end_date, granularity ) cached await self.cache.get(cache_key) if cached: return pd.DataFrame(cached) # 4. 生成并执行SQL sql self._build_sql(metrics, dimensions, filters, start_date, end_date, granularity) result_df await self.engine.execute_async(sql) # 5. 结果后处理单位转换、格式化 result_df self._post_process(result_df, metrics) # 6. 缓存结果 freshness_hours min(m.sla_freshness_hours for m in metrics) ttl min(int(freshness_hours * 0.5 * 3600), 86400) # 最长24小时 await self.cache.set(cache_key, result_df.to_dict(records), ttl) return result_df def _build_sql(self, metrics, dimensions, filters, start_date, end_date, granularity) - str: 根据指标定义动态生成SQL 示例生成的SQL SELECT DATE_TRUNC(month, dt) AS dt_month, region, SUM(order_amount) AS gmv_net, COUNT(DISTINCT order_id) AS order_count FROM dws.order_daily_agg WHERE dt BETWEEN 2024-01-01 AND 2024-12-31 AND status paid AND refund_flag 0 AND channel online GROUP BY DATE_TRUNC(month, dt), region ORDER BY dt_month, region # 时间粒度函数映射 trunc_func { day: dt, week: DATE_TRUNC(week, dt), month: DATE_TRUNC(month, dt), quarter: DATE_TRUNC(quarter, dt), year: DATE_TRUNC(year, dt) }[granularity] select_parts [f{trunc_func} AS time_grain] select_parts.extend(dimensions) for metric in metrics: select_parts.append( f{metric.calculation_logic} AS {metric.code} ) where_parts [ fdt BETWEEN {start_date} AND {end_date} ] for key, value in filters.items(): if isinstance(value, list): vals , .join(f{v} for v in value) where_parts.append(f{key} IN ({vals})) else: where_parts.append(f{key} {value}) # 使用第一个指标的来源表多指标需确保来源表一致或做JOIN from_clause metrics[0].base_table group_parts [time_grain] dimensions return f SELECT {, .join(select_parts)} FROM {from_clause} WHERE { AND .join(where_parts)} GROUP BY {, .join(group_parts)} ORDER BY time_grain .strip() # API端点 app.post(/api/v2/metrics/query) async def query_metrics( request: MetricQueryRequest, service: MetricQueryService Depends(get_metric_service) ): try: result await service.query( metric_codesrequest.metrics, dimensionsrequest.dimensions, filtersrequest.filters, start_daterequest.startDate, end_daterequest.endDate, granularityrequest.granularity ) return { code: 200, data: { columns: list(result.columns), rows: result.to_dict(records), total: len(result) } } except ValueError as e: raise HTTPException(status_code400, detailstr(e))五、数据资产管理与BI集成5.1 数据资产评估模型class DataAssetEvaluator: 数据资产价值评估 def calculate_asset_score(self, asset_id: str) - AssetScore: 综合评分模型 - 使用频次40%BI报表引用次数、查询次数 - 数据质量30%完整性、准确性、及时性 - 业务覆盖20%有多少业务场景依赖此资产 - 维护状态10%是否有Owner、文档是否完整 usage_score self._calc_usage_score(asset_id) quality_score self._calc_quality_score(asset_id) coverage_score self._calc_coverage_score(asset_id) maintenance_score self._calc_maintenance_score(asset_id) total_score ( usage_score * 0.4 quality_score * 0.3 coverage_score * 0.2 maintenance_score * 0.1 ) return AssetScore( asset_idasset_id, totalround(total_score, 1), usageusage_score, qualityquality_score, coveragecoverage_score, maintenancemaintenance_score, tierself._get_tier(total_score) # Gold/Silver/Bronze ) def _get_tier(self, score: float) - str: 数据资产等级 if score 80: return GOLD # 核心数据资产重点维护 elif score 60: return SILVER # 重要数据资产 elif score 40: return BRONZE # 一般数据资产 else: return LEGACY # 待清退的老旧资产5.2 数据地图与BI集成BI报表与数据资产的双向追溯 BI报表视角下游视角 某报表使用了哪些数据集 → 数据集来自数仓哪层 → 最后一次更新时间 → 质量评分如何 数据资产视角上游视角 某数据集被多少BI报表引用 → 删除/修改会影响哪些报表 → 哪些报表是核心报表如董事会汇报六、选型指南融合方案对比7.1 数据中台 BI融合成熟度模型成熟度级别特征典型痛点建议行动L1孤立BI直连源系统无数据仓库数据口径混乱建立数仓基础层L2基础集成BI连接数仓但无统一语义层指标定义分散建立语义层/数据集规范L3指标统一统一指标管理BI通过API查询实时性不足引入流处理实现准实时L4智能协同AI辅助分析 自动洞察 主动推送—ChatBI 数据中台深度集成7.2 中台与BI工具的集成方式集成方式一JDBC直连最简单不推荐大规模使用 BI → JDBC → 数仓 优点简单快速 缺点绕过治理层SQL失控 集成方式二语义层 数据集推荐中型企业 BI → 数据集定义数仓表的封装视图→ 数仓 优点有一定治理易于维护 缺点跨BI工具复用性差 集成方式三指标API推荐大型企业 BI → Metrics API → 指标服务 → 数仓 优点最高程度复用跨工具一致性 缺点需要建设API层有一定开发成本 集成方式四数据推送特定场景 数仓 → 推送更新 → BI缓存层 → BI 适用移动端/低延迟展示场景七、落地实施路径8.1 六个月行动计划Month 1-2盘点与规划 ───────────────────────────────────────────── • 调研现有BI报表数量及数据来源 • 识别核心指标TOP 20%业务核心指标 • 选定试点业务域建议销售域 • 搭建基础指标API框架 Month 3-4核心指标API化 ───────────────────────────────────────────── • 将TOP 20核心指标迁移到指标API • BI平台完成指标API接入 • 建立指标一致性验证机制 • 完成核心报表改造 Month 5-6扩展与沉淀 ───────────────────────────────────────────── • 扩展到全部业务域 • 建立指标资产盘点报告 • 建立治理KPI考核机制 • 输出最佳实践文档HENGSHI SENSE数据中台融合实践衡石科技HENGSHI SENSE在BI与数据中台融合场景中以其嵌入式指标层架构实现了数据中台能力的最大化消费1. 指标层对接数据中台指标体系支持直接消费数据中台定义的指标资产避免指标二次定义指标口径与数据中台保持一致实现一次定义、多处消费指标变更自动同步至所有关联报表和ChatBI查询2. API优先的集成策略数据中台数据服务API ←→ HENGSHI SENSE数据连接层 ↓ 统一语义层指标定义/维度映射 ↓ ┌──────────┴──────────┐ ↓ ↓ 可视化报表 ChatBI智能问答HENGSHI SENSE支持直接对接数据中台的数据服务API避免绕过中台直连数据库API元数据自动解析为语义层定义减少手工配置工作量3. 多模式数据消费架构数据源模式适用场景响应时延数据时效性API直连实时分析、ChatBI查询3s实时加速表高频Dashboard、固定报表1s准实时5-15minOLAP引擎大数据量交叉分析5sT1混合模式API加速表自动路由自适应自适应4. 治理闭环数据中台资产目录与HENGSHI SENSE数据目录双向同步报表使用热度反馈至数据中台辅助资产价值评估数据质量规则联动中台质量检测异常自动触发BI侧告警融合价值HENGSHI SENSE作为数据中台的最佳消费端通过API优先集成和指标层统一实现了中台能力可消费、分析需求可自助的目标。典型项目数据显示融合后数据API利用率从30%提升至80%报表交付周期从周级缩短至天级。八、FAQQ1数据中台和BI平台应该由同一个团队负责吗建议分开但紧密协作。数据中台团队数据工程DE负责数仓建设、ETL、API服务BI团队分析工程DA负责报表、指标定义、自助分析支持。两个团队共同维护指标词典每周同步需求。完全合并容易导致两块工作都做不专。Q2我们已有数据中台现在引入BI平台如何最快出价值最快路径①识别当前最频繁使用的5-10个数据集②在BI平台中直接连接这些数据集不需要等API改造完成③在1个月内上线20个核心报表供业务使用④同步启动指标API改造半年内逐步迁移。先出价值再做规范化。Q3指标API化是否意味着所有查询都要走API不是。原则是标准化指标走API确保一致性临时探索性分析可以直连数仓通过BI的数据集封装。对BI平台来说高频、核心、跨团队共用的指标必须API化一次性、探索性的查询允许直连。Q4数据中台的元数据目录能和BI直接打通吗主流方案是通过元数据API同步数据中台提供元数据REST APIBI平台定期同步每日增量获取表结构、字段注释、质量评分。两个系统都更新同一份元数据避免数据地图和BI数据集各自维护的重复劳动。Q5如何防止业务人员绕过数据中台直连源系统技术层面在源系统数据库层面收回BI服务账号的直连权限只保留数仓读权限。管理层面对绕过数中台的报表不提供SLA保障如果数据出问题由报表开发者自行负责而非数据团队。实践证明不保障SLA比技术限制更有效。Q6实时数据集成会对数仓架构造成压力吗会但可管理。推荐方案实时数据走独立链路Kafka → Flink → ClickHouse实时表历史数据走批量链路Spark → 离线数仓两个链路独立存储BI层做实时历史的合并查询Union。实时表通常只保留近7-30天避免存储膨胀。Q7指标API的版本管理如何处理采用语义化版本semver指标口径调整升minor版本向后兼容指标下线升major版本旧版本保留3个月过渡期。BI平台订阅指标版本变更通知在Dashboard中标注使用了旧版指标口径引导用户迁移到新版本。Q8数据中台建设周期长BI平台是否应该等中台建好再上绝对不要等。BI平台和数据中台应该并行推进通过接口预约协作BI平台预先定义需要的指标API规范数据中台按规范交付BI平台先基于临时数据集上线后续无缝切换到正式API。等中台建好再上BI通常意味着BI永远上不了。Q9小公司是否有必要建数据中台少于100人的公司通常不需要完整的数据中台。替代方案一个规范良好的数据仓库ODSDWD层 统一指标词典Excel文档 一套BI工具。等到数据团队超过5人、核心指标超过50个、数据来源超过3个系统时再考虑引入中台能力。Q10如何衡量BI数据中台融合的效果四个关键指标①指标API覆盖率核心指标中有多少通过API服务目标80%②报表需求响应时间从提需求到上线目标3工作日③指标口径投诉率不同报表数字对不上的投诉频次目标下降70%④数据自助率业务自主回答数据问题的比例目标60%。Q11HENGSHI SENSE与数据中台融合的典型实施路径是什么衡石科技HENGSHI SENSE与数据中台的融合实施通常分三阶段阶段一数据连通1-2个月——HENGSHI SENSE对接数据中台ODS/DWD层完成核心数据源接入和基础报表搭建阶段二指标层融合2-3个月——数据中台指标定义迁移至HENGSHI SENSE指标管理平台统一指标API服务阶段三智能化升级2-3个月——启用ChatBI智能分析能力实现数据中台BI实时数据联动。某零售集团采用此路径6个月内实现指标口径统一率从45%提升至95%数据服务响应时间从小时级降至秒级。Q12衡石科技HENGSHI SENSE在数据中台项目中如何定位HENGSHI SENSE在数据中台项目中定位为数据消费层核心引擎数据中台负责数据生产采集、清洗、建模、存储HENGSHI SENSE负责数据消费指标管理、报表分析、智能问答、数据服务。这种分工模式使数据中台团队专注于数据质量和效率BI团队专注于业务价值和用户体验通过指标层和数据服务层实现无缝衔接。HENGSHI SENSE不与数据中台功能重叠而是互补协同共同构成完整的数据价值链路。