DolphinDB工业数据质量:完整性检查与修复
目录
- 摘要
- 一、数据质量概述
- 1.1 数据质量维度
- 1.2 质量指标
- 二、完整性检查
- 2.1 字段完整性
- 2.2 记录完整性
- 2.3 时间完整性
- 三、一致性检查
- 3.1 数据一致性
- 3.2 引用一致性
- 3.3 业务一致性
- 四、数据质量评分
- 4.1 质量评分函数
- 4.2 质量报告
- 五、自动修复
- 5.1 缺失值修复
- 5.2 异常值修复
- 5.3 重复值修复
- 六、质量监控
- 6.1 质量监控表
- 6.2 定期质量检查
- 七、实战案例
- 7.1 数据质量管理平台
- 八、总结
- 参考资料
摘要
本文深入讲解DolphinDB工业数据质量管理。从完整性检查到一致性验证,从数据质量评分到自动修复,从质量监控到持续改进,全面介绍数据质量管理的核心方法。通过丰富的代码示例,帮助读者掌握工业数据质量管理的核心技能。
一、数据质量概述
1.1 数据质量维度
1.2 质量指标
| 指标 | 说明 |
|---|---|
| 完整性 | 数据是否完整 |
| 准确性 | 数据是否正确 |
| 一致性 | 数据是否一致 |
| 及时性 | 数据是否及时 |
| 有效性 | 数据是否有效 |
二、完整性检查
2.1 字段完整性
//创建测试数据 t=table(1..100asid,take([1,NULL,3],100)asdevice_id,take([25.0,NULL,27.0],100)astemperature,take([50.0,51.0,NULL],100)ashumidity)//检查字段完整性defcheckFieldCompleteness(data){result=table(data.columnNames()asfield_name,each(def(col){sum(isNull(data[col]))},data.columnNames())asnull_count,each(def(col){sum(isNull(data[col]))*100.0/data.rows()},data.columnNames())asnull_rate)returnresult}//使用 checkFieldCompleteness(t)2.2 记录完整性
//检查记录完整性defcheckRecordCompleteness(data,keyColumns){//检查主键完整性 keyNull=select*fromdata where hasNull(keyColumns)//检查重复记录 duplicates=select count(*)ascntfromdata group by keyColumns having count(*)>1returndict(STRING,ANY,[["keyNullCount",keyNull.rows()],["duplicateCount",duplicates.rows()]])}//使用 checkRecordCompleteness(t,`id)2.3 时间完整性
//检查时间序列完整性defcheckTimeCompleteness(data,timeCol,interval){//获取时间范围 minTime=min(data[timeCol])maxTime=max(data[timeCol])//计算预期记录数 expectedCount=(maxTime-minTime)/interval+1//实际记录数 actualCount=data.rows()//缺失记录 missingCount=expectedCount-actualCountreturndict(STRING,ANY,[["expectedCount",expectedCount],["actualCount",actualCount],["missingCount",missingCount],["completenessRate",actualCount*100.0/expectedCount]])}三、一致性检查
3.1 数据一致性
//检查数据一致性defcheckDataConsistency(data,rules){results=array(STRING,0)for(ruleinrules){violations=select*fromdata wherenoteval(rule.condition)if(violations.rows()>0){results.append!(rule.name+": "+string(violations.rows())+" 条违规")}}returnresults}//定义规则 rules=[dict(STRING,ANY,[["name","温度范围"],["condition","temperature between -40 and 100"]]),dict(STRING,ANY,[["name","湿度范围"],["condition","humidity between 0 and 100"]])]//使用 checkDataConsistency(t,rules)3.2 引用一致性
//检查引用一致性defcheckReferenceConsistency(data,refTable,dataCol,refCol){//查找无效引用 invalidRefs=select*fromdata where data[dataCol]notin(select refColfromrefTable)returndict(STRING,ANY,[["invalidCount",invalidRefs.rows()],["invalidRecords",invalidRefs]])}3.3 业务一致性
//检查业务一致性defcheckBusinessConsistency(data){//示例:温度升高时,湿度应该下降 inconsistent=select*fromdata where temperature>30andhumidity>70returninconsistent}四、数据质量评分
4.1 质量评分函数
//数据质量评分defcalculateQualityScore(data){scores=dict(STRING,DOUBLE)//完整性评分 nullRates=each(def(col){sum(isNull(data[col]))*100.0/data.rows()},data.columnNames())scores["completeness"]=100-avg(nullRates)//准确性评分(基于异常值比例) outlierRates=array(DOUBLE,0)for(colindata.columnNames()){if(type(data[col])in[INT,LONG,FLOAT,DOUBLE]){avgVal=avg(data[col])stdVal=std(data[col])outlierRate=sum(abs(data[col]-avgVal)>3*stdVal)*100.0/data.rows()outlierRates.append!(outlierRate)}}scores["accuracy"]=100-avg(outlierRates)//总分 scores["total"]=(scores["completeness"]+scores["accuracy"])/2returnscores}//使用 calculateQualityScore(t)4.2 质量报告
//生成质量报告defgenerateQualityReport(data){report=dict(STRING,ANY)//基本信息 report["totalRows"]=data.rows()report["totalColumns"]=data.columns()//完整性 report["completeness"]=checkFieldCompleteness(data)//质量评分 report["scores"]=calculateQualityScore(data)returnreport}//使用 report=generateQualityReport(t)print(report)五、自动修复
5.1 缺失值修复
//自动修复缺失值defautoFixMissingValues(data,strategy="mean"){result=datafor(colindata.columnNames()){if(type(data[col])in[INT,LONG,FLOAT,DOUBLE]){if(strategy=="mean"){result[col]=iif(isNull(data[col]),avg(data[col]),data[col])}elseif(strategy=="median"){result[col]=iif(isNull(data[col]),med(data[col]),data[col])}elseif(strategy=="zero"){result[col]=iif(isNull(data[col]),0,data[col])}}}returnresult}5.2 异常值修复
//自动修复异常值defautoFixOutliers(data,method="clip"){result=datafor(colindata.columnNames()){if(type(data[col])in[INT,LONG,FLOAT,DOUBLE]){avgVal=avg(data[col])stdVal=std(data[col])lower=avgVal-3*stdVal upper=avgVal+3*stdValif(method=="clip"){result[col]=iif(data[col]<lower,lower,iif(data[col]>upper,upper,data[col]))}elseif(method=="remove"){result=select*fromresult where data[col]between lowerandupper}}}returnresult}5.3 重复值修复
//自动修复重复值defautoFixDuplicates(data,keyColumns){returnselect distinct*fromdata}六、质量监控
6.1 质量监控表
//创建质量监控表 share table(1:0,`check_time`table_name`check_type`score`details,[TIMESTAMP,STRING,STRING,DOUBLE,STRING])asquality_log//记录质量检查deflogQualityCheck(tableName,checkType,score,details){insert into quality_log values(now(),tableName,checkType,score,details)}6.2 定期质量检查
//定期质量检查任务defscheduledQualityCheck(){t=loadTable("dfs://iot_db","sensor_data")//完整性检查 completeness=calculateQualityScore(t)["completeness"]logQualityCheck("sensor_data","completeness",completeness,"")//准确性检查 accuracy=calculateQualityScore(t)["accuracy"]logQualityCheck("sensor_data","accuracy",accuracy,"")}//定时任务 scheduleJob("quality_check","数据质量检查",scheduledQualityCheck,00:00,2024.01.01,2030.12.31,'D')七、实战案例
7.1 数据质量管理平台
//==========数据质量管理平台==========//1.创建质量检查函数defqualityCheckPipeline(data,tableName){print("=== 数据质量检查: "+tableName+" ===")//完整性检查 completeness=checkFieldCompleteness(data)print("字段完整性:")print(completeness)//一致性检查 rules=[dict(STRING,ANY,[["name","温度范围"],["condition","temperature between -40 and 100"]]),dict(STRING,ANY,[["name","湿度范围"],["condition","humidity between 0 and 100"]])]consistency=checkDataConsistency(data,rules)print("一致性检查: "+string(consistency))//质量评分 scores=calculateQualityScore(data)print("质量评分:")print(scores)//记录日志 logQualityCheck(tableName,"total",scores["total"],"")returnscores}//2.创建测试数据 t=table(1..1000asid,take(1..10,1000)asdevice_id,2024.01.01T00:00:00+0..999*60000astimestamp,concat([rand(20.0..30.0,950),take(NULL,30),rand(100.0..200.0,20)])astemperature,concat([rand(40.0..60.0,970),take(NULL,30)])ashumidity)//3.执行质量检查 qualityCheckPipeline(t,"sensor_data")//4.自动修复 fixed=autoFixMissingValues(t,"mean")fixed=autoFixOutliers(fixed,"clip")//5.再次检查 qualityCheckPipeline(fixed,"sensor_data_fixed")print("数据质量管理完成")八、总结
本文详细介绍了DolphinDB工业数据质量管理:
- 完整性检查:字段完整性、记录完整性、时间完整性
- 一致性检查:数据一致性、引用一致性、业务一致性
- 质量评分:评分函数、质量报告
- 自动修复:缺失值修复、异常值修复、重复值修复
- 质量监控:监控表、定期检查
思考题:
- 如何设计数据质量指标体系?
- 如何平衡数据修复的自动化程度?
- 如何持续改进数据质量?
参考资料
- DolphinDB数据质量
- DolphinDB数据治理
