数仓数据质量管理脚本
一、数据质量管理概述
1、数据质量管理定义
数据质量管理(Data Quality Management),是指对数据从计划、获取、存储、共享、维护、应用、消亡生命周期的每个阶段里可能引发的各类数据质量问题,进行识别、度量、监控、预警等一系列管理活动,并通过改善和提高组织的管理水平使得数据质量获得进一步提高。
数据质量管理是循环管理过程,其终极目标是通过可靠的数据提升数据在使用中的价值,并最终为企业赢得经济效益
2、数据质量评价指标
数据质量管理的最终目标是改善,任何改善都是建立在评价的基础上。通常数据质量的评价标准包括以下内容
评价标准 | 描述 | 监控项 |
---|---|---|
唯一性 | 指主键保持唯一 | 字段唯一性检查 |
完整性 | 主要包括记录缺失和字段值缺失等方面 | 字段枚举值检查 |
字段记录数检查 | ||
字段空值检查 | ||
精确度 | 数据生成的正确性,数据在整个链路流转的正确性 | 波动阀值检查 |
合法性 | 主要包括格式、类型、域值的合法性 | 字段日期格式检查 |
字段长度检查 | ||
字段值域检查 | ||
时效性 | 主要包括数据处理的时效性 | 批处理是否按时完成 |
二、数据质量管理实操
1、需求分析
我们的数仓项目主要监控以下数据的指标(脚本可以参考,这里主要按照电商数仓来搭建的):
- ODS层数据量,每日环比和每周同比变化不能超过一定范围
- DIM层不能出现id空值,重复值;
- DWD层不能出现id空值,重复值;
表 | 检查项目 | 依据 | 异常值下限 | 异常值上限 |
---|---|---|---|---|
ods_order_info | 同比增长 | 数据总量 | -10% | 10% |
环比增长 | 数据总量 | -10% | 50% | |
值域检查 | final_amount | 0 | 100 | |
dwd_order_info | 空值检查 | id | 0 | 10 |
重复值检查 | id | 0 | 5 | |
dim_user_info | 空值检查 | id | 0 | 10 |
重复值检查 | id | 0 | 5 |
2、功能模块
3、开发环境准备
3.1 Python开发环境准备
IDEA中点击File,在下拉选择中点击Settings,点击“Plugins”,点击右上角的“Marketplace”,然后在搜索框中输入“python”,在搜索结果列表中找到Python插件,点击“Install”,安装插件
点击Idea中的“File”,在下列列表中点击“New”,在右侧弹出的列表中点击Project,在新建的工程中,点击“Python”,然后点击Next,首次创建Python项目,会提示无Python SDK,此处选择Yes,后续再添加SDK
添加Python SDK,为了保证测试和运行的Python环境一致,我们配置项目采用远程集群的Python环境执行本地代码,以下为具体配置步骤。第一步:点击“File”→Project Structure,增加Python SDK,然后点击“SSH Interpreter”,选择“Existing server configuration”,这样远程就可以连接了
3.2 初始化MySQL环境
1 | # 创建data_supervisor库 |
4、规则检测模块
4.1 单一规则检测脚本编写
检测规则脚本分为五类:分别是空id检查脚本、重复id检查脚本、值域检查脚本、数据量环比检查脚本和数据量同比检查脚本。下面分别给大家介绍一下五类检测脚本的具体编写。
空id检查脚本
在Idea中创建一个文件null_id.sh,在文件中编写如下内容: 实现的主要功能是:计算空值个数,并将结果和自己定义的阈值上下限,插入到MySQL表中
1 |
|
重复id检查脚本
在Idea中创建一个文件duplicate.sh,在文件中编写如下内容:实现的主要功能是:计算重复值个数,并将结果和自己定义的阈值上下限,插入到MySQL表中
1 |
|
值域检查脚本
在Idea中创建一个文件range.sh,在文件中编写如下内容:实现的主要功能是:计算超出规定值域的值的个数,并将结果和自己定义的阈值上下限,插入到MySQL表中
1 |
|
数据量环比检查脚本
day_on_day.sh,在文件中编写如下内容: 实现的主要功能是:计算数据量环比增长值,并将结果和自己定义的阈值上下限,插入到MySQL表中
1 |
|
数据量同比检查脚本
week_on_week.sh,在文件中编写如下内容: 实现的主要功能是:计算数据量同比增长值,并将结果和自己定义的阈值上下限,插入到MySQL表中
1 |
|
4.2 数仓各层检测脚本编写
将上一节编写的单一规则检测脚本按照数仓分层进行集成,分别编写ODS层检测脚本,DWD层检测脚本和DIM层检测脚本
ODS层
表 | 检查项目 | 依据 | 异常值下限 | 异常值上限 |
---|---|---|---|---|
ods_order_info | 同比增长 | 数据总量 | -10% | 10% |
环比增长 | 数据总量 | -10% | 50% | |
值域检查 | final_amount | 0 | 100 |
在Idea中创建一个文件check_ods.sh,在文件中编写如下内容:
1 |
|
DWD层
表 | 检查项目 | 依据 | 异常值下限 | 异常值上限 |
---|---|---|---|---|
dwd_order_info | 空值检查 | id | 0 | 10 |
重复值检查 | id | 0 | 5 |
在Idea中创建一个文件check_dwd.sh,在文件中编写如下内容:
1 |
|
DIM层
表 | 检查项目 | 依据 | 异常值下限 | 异常值上限 |
---|---|---|---|---|
dim_user_info | 空值检查 | id | 0 | 10 |
重复值检查 | id | 0 | 5 |
在Idea中创建一个文件check_dim.sh,在文件中编写如下内容:
1 |
|
5、告警集成模块
该模块主要用于检查MySQL中的检测结果的异常,若有异常出现就发送警告。警告方式可选择邮件或者集成第三方告警平台睿象云
1 | # 环境准备 |
新建python脚本用于查询数据监控结果表格并发送告警邮件,该脚本主要由三个函数组成
- read_table用于读取指标有问题的数据
- one_alert函数用于向睿象云发送告警
- mail_alert函数用于发送邮件告警
在Idea中创建一个文件check_notification.py,在文件中编写如下内容
1 | #!/usr/bin/env python |
6、调度模块
该模块的主要功能为调度数据质量监控流程。数据质量监控工作流也采用Azkaban进行调度。数据质量监控工作流必定依赖数据仓库工作流,此处为了解耦,利用Azkaban API主动监视数据仓库工作流的执行状态,进而触发数据质量监控工作流。
6.1 脚本编写
Azkaban REST API 封装脚本,该脚本主要是对Azkaban API的封装,主要有三个方法
- login函数可以登录Azkanban并返回session_id
- get_exec_id函数可以获取正在执行的工作流程的Execution ID
- wait_node可以等待指定Flow中某一结点执行完毕并判断其是否执行成功
在Idea中创建一个文件azclient.py,在文件中编写如下内容:
1 | #!/usr/bin/env python |
ODS层调度脚本
该脚本用于检查ODS层数据质量。在Idea中创建一个文件check_ods.py,在文件中编写如下内容:
1 | #!/usr/bin/env python |
DWD层调度脚本
该脚本用于检查DWD层数据质量。在Idea中创建一个文件check_dwd.py,在文件中编写如下内容
1 | #!/usr/bin/env python |
DIM层调度脚本
该脚本用于检查DIM层数据质量。在Idea中创建一个文件check_dim.py,在文件中编写如下内容
1 | #!/usr/bin/env python |
6.2 Azkaban工作流配置文件
在Idea中创建一个文件azkaban.project,在文件中编写如下内容:azkaban-flow-version: 2.0
;在Idea中创建一个文件data_supervisor.flow,在文件中编写如下内容:
1 | nodes: |
将所有文件打包成data_supervisor.zip文件,然后上传启动,填写参数运行
7、可视化模块
该模块的主要作用是对数据质量监控结果进行可视化展示。检测结果可以采用Superset进行可视化展示。具体配置步骤如下:
- 在Superset中新建数据库连接:注:mysql://root:000000@hadoop102:3306/data_supervisor?charset=utf8
- 点击“Datasets”,然后点击“+Dataset”,将所有数据表格都导入为dataset
- 新建一个dashboard
- 新建一张图表并保存到dashboard,在chart页面中选择新建chart
- 所有监控的指标创建图表,并保存到dashboard