当前位置: 首页 > news >正文

DolphinDB工业数据质量:完整性检查与修复

目录

    • 摘要
    • 一、数据质量概述
      • 1.1 数据质量维度
      • 1.2 质量指标
    • 二、完整性检查
      • 2.1 字段完整性
      • 2.2 记录完整性
      • 2.3 时间完整性
    • 三、一致性检查
      • 3.1 数据一致性
      • 3.2 引用一致性
      • 3.3 业务一致性
    • 四、数据质量评分
      • 4.1 质量评分函数
      • 4.2 质量报告
    • 五、自动修复
      • 5.1 缺失值修复
      • 5.2 异常值修复
      • 5.3 重复值修复
    • 六、质量监控
      • 6.1 质量监控表
      • 6.2 定期质量检查
    • 七、实战案例
      • 7.1 数据质量管理平台
    • 八、总结
    • 参考资料

摘要

本文深入讲解DolphinDB工业数据质量管理。从完整性检查到一致性验证,从数据质量评分到自动修复,从质量监控到持续改进,全面介绍数据质量管理的核心方法。通过丰富的代码示例,帮助读者掌握工业数据质量管理的核心技能。


一、数据质量概述

1.1 数据质量维度

数据质量维度

完整性

高质量数据

准确性

一致性

及时性

有效性

1.2 质量指标

指标说明
完整性数据是否完整
准确性数据是否正确
一致性数据是否一致
及时性数据是否及时
有效性数据是否有效

二、完整性检查

2.1 字段完整性

//创建测试数据 t=table(1..100asid,take([1,NULL,3],100)asdevice_id,take([25.0,NULL,27.0],100)astemperature,take([50.0,51.0,NULL],100)ashumidity)//检查字段完整性defcheckFieldCompleteness(data){result=table(data.columnNames()asfield_name,each(def(col){sum(isNull(data[col]))},data.columnNames())asnull_count,each(def(col){sum(isNull(data[col]))*100.0/data.rows()},data.columnNames())asnull_rate)returnresult}//使用 checkFieldCompleteness(t)

2.2 记录完整性

//检查记录完整性defcheckRecordCompleteness(data,keyColumns){//检查主键完整性 keyNull=select*fromdata where hasNull(keyColumns)//检查重复记录 duplicates=select count(*)ascntfromdata group by keyColumns having count(*)>1returndict(STRING,ANY,[["keyNullCount",keyNull.rows()],["duplicateCount",duplicates.rows()]])}//使用 checkRecordCompleteness(t,`id)

2.3 时间完整性

//检查时间序列完整性defcheckTimeCompleteness(data,timeCol,interval){//获取时间范围 minTime=min(data[timeCol])maxTime=max(data[timeCol])//计算预期记录数 expectedCount=(maxTime-minTime)/interval+1//实际记录数 actualCount=data.rows()//缺失记录 missingCount=expectedCount-actualCountreturndict(STRING,ANY,[["expectedCount",expectedCount],["actualCount",actualCount],["missingCount",missingCount],["completenessRate",actualCount*100.0/expectedCount]])}

三、一致性检查

3.1 数据一致性

//检查数据一致性defcheckDataConsistency(data,rules){results=array(STRING,0)for(ruleinrules){violations=select*fromdata wherenoteval(rule.condition)if(violations.rows()>0){results.append!(rule.name+": "+string(violations.rows())+" 条违规")}}returnresults}//定义规则 rules=[dict(STRING,ANY,[["name","温度范围"],["condition","temperature between -40 and 100"]]),dict(STRING,ANY,[["name","湿度范围"],["condition","humidity between 0 and 100"]])]//使用 checkDataConsistency(t,rules)

3.2 引用一致性

//检查引用一致性defcheckReferenceConsistency(data,refTable,dataCol,refCol){//查找无效引用 invalidRefs=select*fromdata where data[dataCol]notin(select refColfromrefTable)returndict(STRING,ANY,[["invalidCount",invalidRefs.rows()],["invalidRecords",invalidRefs]])}

3.3 业务一致性

//检查业务一致性defcheckBusinessConsistency(data){//示例:温度升高时,湿度应该下降 inconsistent=select*fromdata where temperature>30andhumidity>70returninconsistent}

四、数据质量评分

4.1 质量评分函数

//数据质量评分defcalculateQualityScore(data){scores=dict(STRING,DOUBLE)//完整性评分 nullRates=each(def(col){sum(isNull(data[col]))*100.0/data.rows()},data.columnNames())scores["completeness"]=100-avg(nullRates)//准确性评分(基于异常值比例) outlierRates=array(DOUBLE,0)for(colindata.columnNames()){if(type(data[col])in[INT,LONG,FLOAT,DOUBLE]){avgVal=avg(data[col])stdVal=std(data[col])outlierRate=sum(abs(data[col]-avgVal)>3*stdVal)*100.0/data.rows()outlierRates.append!(outlierRate)}}scores["accuracy"]=100-avg(outlierRates)//总分 scores["total"]=(scores["completeness"]+scores["accuracy"])/2returnscores}//使用 calculateQualityScore(t)

4.2 质量报告

//生成质量报告defgenerateQualityReport(data){report=dict(STRING,ANY)//基本信息 report["totalRows"]=data.rows()report["totalColumns"]=data.columns()//完整性 report["completeness"]=checkFieldCompleteness(data)//质量评分 report["scores"]=calculateQualityScore(data)returnreport}//使用 report=generateQualityReport(t)print(report)

五、自动修复

5.1 缺失值修复

//自动修复缺失值defautoFixMissingValues(data,strategy="mean"){result=datafor(colindata.columnNames()){if(type(data[col])in[INT,LONG,FLOAT,DOUBLE]){if(strategy=="mean"){result[col]=iif(isNull(data[col]),avg(data[col]),data[col])}elseif(strategy=="median"){result[col]=iif(isNull(data[col]),med(data[col]),data[col])}elseif(strategy=="zero"){result[col]=iif(isNull(data[col]),0,data[col])}}}returnresult}

5.2 异常值修复

//自动修复异常值defautoFixOutliers(data,method="clip"){result=datafor(colindata.columnNames()){if(type(data[col])in[INT,LONG,FLOAT,DOUBLE]){avgVal=avg(data[col])stdVal=std(data[col])lower=avgVal-3*stdVal upper=avgVal+3*stdValif(method=="clip"){result[col]=iif(data[col]<lower,lower,iif(data[col]>upper,upper,data[col]))}elseif(method=="remove"){result=select*fromresult where data[col]between lowerandupper}}}returnresult}

5.3 重复值修复

//自动修复重复值defautoFixDuplicates(data,keyColumns){returnselect distinct*fromdata}

六、质量监控

6.1 质量监控表

//创建质量监控表 share table(1:0,`check_time`table_name`check_type`score`details,[TIMESTAMP,STRING,STRING,DOUBLE,STRING])asquality_log//记录质量检查deflogQualityCheck(tableName,checkType,score,details){insert into quality_log values(now(),tableName,checkType,score,details)}

6.2 定期质量检查

//定期质量检查任务defscheduledQualityCheck(){t=loadTable("dfs://iot_db","sensor_data")//完整性检查 completeness=calculateQualityScore(t)["completeness"]logQualityCheck("sensor_data","completeness",completeness,"")//准确性检查 accuracy=calculateQualityScore(t)["accuracy"]logQualityCheck("sensor_data","accuracy",accuracy,"")}//定时任务 scheduleJob("quality_check","数据质量检查",scheduledQualityCheck,00:00,2024.01.01,2030.12.31,'D')

七、实战案例

7.1 数据质量管理平台

//==========数据质量管理平台==========//1.创建质量检查函数defqualityCheckPipeline(data,tableName){print("=== 数据质量检查: "+tableName+" ===")//完整性检查 completeness=checkFieldCompleteness(data)print("字段完整性:")print(completeness)//一致性检查 rules=[dict(STRING,ANY,[["name","温度范围"],["condition","temperature between -40 and 100"]]),dict(STRING,ANY,[["name","湿度范围"],["condition","humidity between 0 and 100"]])]consistency=checkDataConsistency(data,rules)print("一致性检查: "+string(consistency))//质量评分 scores=calculateQualityScore(data)print("质量评分:")print(scores)//记录日志 logQualityCheck(tableName,"total",scores["total"],"")returnscores}//2.创建测试数据 t=table(1..1000asid,take(1..10,1000)asdevice_id,2024.01.01T00:00:00+0..999*60000astimestamp,concat([rand(20.0..30.0,950),take(NULL,30),rand(100.0..200.0,20)])astemperature,concat([rand(40.0..60.0,970),take(NULL,30)])ashumidity)//3.执行质量检查 qualityCheckPipeline(t,"sensor_data")//4.自动修复 fixed=autoFixMissingValues(t,"mean")fixed=autoFixOutliers(fixed,"clip")//5.再次检查 qualityCheckPipeline(fixed,"sensor_data_fixed")print("数据质量管理完成")

八、总结

本文详细介绍了DolphinDB工业数据质量管理:

  1. 完整性检查:字段完整性、记录完整性、时间完整性
  2. 一致性检查:数据一致性、引用一致性、业务一致性
  3. 质量评分:评分函数、质量报告
  4. 自动修复:缺失值修复、异常值修复、重复值修复
  5. 质量监控:监控表、定期检查

思考题

  1. 如何设计数据质量指标体系?
  2. 如何平衡数据修复的自动化程度?
  3. 如何持续改进数据质量?

参考资料

  • DolphinDB数据质量
  • DolphinDB数据治理

http://www.gsyq.cn/news/1590187.html

相关文章:

  • 动图魔方技术拆解 10:GIF 多帧重编辑的 ImageSource 与 PixelMapList 实践
  • 铁电MEMS突触技术:神经形态计算新突破
  • MuleSoft企业级AI编排:LLM安全接入核心系统的实战方法论
  • 2026实测:两款主流AI编程工具全流程vibe coding体验对比
  • LSTM股票方向预测:分类建模与置信度输出实战
  • VMware虚拟机从入门到精通:完整安装指南
  • 用pytest构建AI应用测试体系:从语义断言到CI/CD集成
  • 线性代数直觉:用Python形状思维打通机器学习矩阵运算
  • 深度学习图像去重算法:3大技术方案实现高效重复图片检测
  • 模板驱动文档自动化:结构化内容注入与四层引擎设计
  • 如何深度解析QQ数据库加密机制:专业级跨平台解密实战指南
  • Android性能测试实战:Monkey与SoloPi工具组合使用指南
  • 企业级应用SQL注入漏洞深度剖析:从原理到实战复现
  • ROS TurtleBot RViz可视化环境从零搭建指南
  • 单变量异常检测:业务语义驱动的阈值设计与工程落地
  • 智能图像去重革命:ImageDedup让你的图片库焕然一新
  • Hugging Face Transformers:从模型加载到AI流水线的框架级实践
  • 加密流量分析实战指南:从TLS元数据到机器学习分类
  • LarkMidTable数据中台:10分钟搭建你的企业级数据集成平台
  • A-59F多功能语音模组:扩音防啸叫+双波束,智能对讲全场景解决方案
  • CVE-2023-49371漏洞剖析:MyBatis中${}占位符滥用引发的SQL注入风险与修复实践
  • 深度剖析chromatic:Chromium/V8广谱注入的5个实战突破技巧
  • OpenSSL三行命令快速定位CVE-2026-0947漏洞节点
  • SimCLRv2:工业级自监督预训练落地实践指南
  • 基于NXP PCA8539的VA-LCD驱动开发与OM13503评估板实战指南
  • iPhone本地大模型部署实战:Gemma 2 2B+Core ML优化指南
  • Azure Functions 部署 AutoGen 多智能体实战指南
  • PHP反序列化漏洞实战:CVE-2016-7124绕过__wakeup()详解
  • 中国人工智能专业大学完整排名(2026 双参考:软科本科专业 + CSRankings 学术科研,分 4 大梯队)
  • Explainable Boosting Machines:可解释梯度提升模型实战指南