|
数据成本量化
319 数据成本治理是数据平台建设中至关重要的一个环节,做好数据成本治理能减少企业成本,保障企业走提效降本的可持续发展道路。数据成本治理涉及了对数据存储、处理、传输和使用等过程中所产生成本的管理和优化,本文主要阐述了一种在治理过程中量化存储和计算资源的实现方式。首先明确我们是基于任务进行治理,并且在公司的数据开发平台(idata)上任务和业务输出表是一对一的,所以我们在量化成本的时候我们要收集到每个任务的基本信息是"属于 A 业务的 B 任务占用了 C 大小存储、消耗了 D 时间长度的多少 CPU、大约每个月要花多少(存储和计算的)钱"。下面介绍如何分别实现任务级别的存储统计和计算统计。存储统计我们的数仓部门绝大部分是基于 Spark On Hive 进行作业开发的,小部分基于 StarRocks 内部表。所以存储统计关键在于统计各张 Spark On Hive 表和分区信息,信息包括最后一次访问时间、最后一次修改时间、对应的业务 ID、预估的月存储费用等。1、统计表的最后一次访问和修改时间是为后续治理中下线无效表、过期表提供线索 2、(输出)表和任务的绑定关系维护在数据平台(IData)的作业配置中,所以在 IData 业务库内做输出表和作业表的 Join 即可获取关联关系 3、统计到分区粒度的原因是可以治理"热表冷分区"的数据(只会频繁访问某张表的特定分区),实际情况中存在一些业务表一天的分区数据就超过很多表的总数据量,所以很有治理的价值那么如何统计各张 Hive 表及其分区信息呢?方法一:ANALYZE TABLE有一种简单的方法:1、首先通过 JDBC 直连或数据迁移方式获取 Hive 元数据库的TBLS 表和 ARTITIONS 表所有的表和分区信息。2、然后基于 ANALYZE TABLE 命令轮训解析每张表中的 total_size 属性即可。该命令也可以解析到包含 row_num、raw_data_size 等关键属性。但是这种方法有一个严重缺陷,在执行ANALYZE TABLE 命令时,该命令需要扫描大量数据,那么会占用计算资源,导致其他用户的查询速度变慢。因此,只建议在低峰期执行ANALYZE TABLE命令。但实际情况是低峰期并不固定,在没算上轮训分区的情况下就要轮询几千张表调用该命令,会严重降低 Hive 查询服务对外可用性甚至不可用。所以该方法仅适用于轮训数据表/分区少且对查询稳定性要求不高的场景。方法二:元数据库 SQL 查询既然实时扫描数据的方式不行,那么我们在上面思路的第一步查 Hive 元数据库数据的同时并直接把所有数据捞出来是不是就可以了?这种方式更简单而且也不会影响到服务性能。这里的 SQL 不做展开讨论,简单来说就是关联 SDS 表、TBLS 表、PARTITION_PARAMS 表、PARTITION 表、DBS 表、TABLE_PARAMS 表获取到表和分区的 row_num、raw_data_size、numFiles、total_size、last_modified_time等信息。这个方式在实际应用中,我发现元数据库中 ARTITION_PARAMS和TABLE_PARAMS这两张表的部分数据并不可靠,存在某张表实际有数据存在但是元数据库内该张表的信息是 0 rown_num,0 total_size 的情况。可能的原因是:数据库中的元数据缺失或不一致,导致查询结果不准确。数据扫描过程中出现错误或延迟,导致部分数据未被统计在内。数据库中存在重复的元数据记录,导致查询结果出现重复计算的情况。数据可靠性比较低的情况下,另外这种方式还获取不到最近访问时间(无法区分冷热表),所以此方案也不满足要求。既然无法通过直接解析 Hive 元数据达到我们的效果,那么我们就从 HDFS 下手(Hive 底层存储是基于 HDFS 的)。如果离线作业的方式可以解析 HDFS 的 FSIMAGE 文件,那么即保证数据完整、可靠、也不会影响到服务。FSIMAGE 存储了 Hadoop 分布式文件系统中所有文件和目录的元数据信息,包括文件和目录的创建、修改和删除时间戳,文件和目录的访问权限和拥有者等信息。 FSIMAGE 不包含文件本身的数据,只存储文件系统的元数据信息。 FSIMAGE 文件是一个二进制文件,由 NameNode 生成并定期自动保存。在 NameNode 发生故障时,可以使用 FSIMAGE 文件来恢复文件系统的状态。在做 FsImage 解析之前,我简单介绍下 FsImage 的定位。如果将 HDFS 中 NameNode 比做 Redis 的话,我们知道 Redis 的数据恢复方式之一是基于 RDB + AOF 持久化的方式,这里的 RDB 就相当于 FsImage,NameNode 还有个 EditLog 角色,它相当于 Redis 中的 AOF,虽然具体实现有差异,但设计方式相似。那么这里又有一个疑问,Redis 的 RDB 数据并不是 Redis 内完整的数据,它是有时间差的,还有一部分在 AOF 内,那只解析 FsImage 数据是不是也有时间差,是不是也不准?是的,FsImage 也有这个情况。但是结合我们的数据治理,此处的延迟产生的数据差异是可以忽略不计的, Fsimage 默认是每隔 1 小时合并一次,我们在治理数据时候是以一个很长的时间段来观察的,1 小时可以忽略不计。如果你真的有此需求,你可以调整 NameNode 相关参数,控制它每隔 X 时间或者每到 Y 数据量就触发 FsImage 和 EditLog 合并(满足其中之一即触发)。或者你可以解析 EditLog 然后再做数据合并来实现,此方案数据会更完整。方法三:解析 FsImage1、第一步我们将 FsImage 二进制文件转换成易于解析的文件,基于 hdfs oiv 命令可将镜像文件转化为 XML 或 CSV 等格式。考虑到自身业务情况我选择了 XML。原始 XML 可提取的信息如下:属性类型描述例子idbigint文件的唯一标识namestring文件名称typestring数据类型, FILE/DIRECTORYblocksstruct>blocks:包含多少数据块【文件被切成数据块】block:1. 内部的 id 表示是块 id 2. genstamp 是一个唯一编号 3. numBytes 表示当前数据块的实际大小,fileSize = SUM(numBytes){"block":[{"genstamp":1009,"id":1073741833,"numBytes":134217728},{"genstamp":1010,"id":1073741834,"numBytes":100828269}]}preferredblocksizebigint推荐每一个数据块的大小134217728(128M)replicationbigint副本数atimebigint最近一次访问时间1645099637705mtimebigint最近一次修改时间1645099637705nsquotabiginttype=DIRECTORY 时有效,名称配额 限制指定目录下允许的文件和目录的数量-1dsquotabiginttype=DIRECTORY 时有效,空间配额 限制该目录下允许的字节数-1permissionstring权限root:supergroup:0666storagepolicyidbigint访问策略xattrsstruct额外属性{"xattr":{"name":"hdfs.erasurecoding.policy","ns":"SYSTEM","val":"\\0000;\\0000;\\0000;\\000b;replication"}}XML 中还有个元素标签,可以获取所有文件块的相对关系属性描述id文件的唯一标识pid对应的父文件的唯一标识2、第二步将 XML 上传至 HDFS,并定时拉取更新,定时脚本以 Linux Cron 方式触发执行#!/bin/bash# 获取 fsimage 镜像 并存放到/tmp 目录下hdfs dfsadmin -fetchImage /opt/hdfs_parse# 获取将镜像的文件名,取 fsimage_0*序列最大的那个command_result=`ls -l /opt/hdfs_parse/ | grep fsimage_0 | awk '{print $9}'| sort -nr | head -n 1`echo "command result is $command_result"# 使用 oiv 将镜像文件转化成 xmlhdfs oiv -p XML -i /opt/hdfs_parse/$command_result -o /opt/hdfs_parse/fsimage.xml# 覆盖方式上传到 hdfshdfs dfs -put -f /opt/hdfs_parse/fsimage.xml /files/fsimagerm -f /opt/hdfs_parse/$command_result3、开发 Spark 作业借助 Spark-xml 解析 XML开发的作业还有一点要注意:如果你有通过文件块父子关系获取文件绝对路径的需求,在开发过程中要考虑下自己的 FsImage 大小。如果比较小可以按照我们单节点方式根据父子关系拼成一棵树,然后深度遍历(DFS)获取。但如果 FsImage 体量较大且运行 Spark 作业的 Driver 端配置也没有特别高,是无法正常运行得到结果的,这时候需要以分布式的思路去开发解析获取,要利用上各个 Executor 的计算资源。此处代码不做赘述,思路就是先找出最长路径确定 for 循环次数,然后利用 Spark Sql 一遍遍 join,每次 join 后过滤掉已经无父级 id 的数据。以上,我们获取了各个文件块的存储信息,相对于 Hive 表它的粒度更细,文件块统计的范围不仅仅是 Hive 相关的存储。另外每个 Bolck 块大小最大是 128M,如果某个分区或者某张表大小超过了,是存在块和表多对一的关系的,那么文件夹、文件块该怎么和 Hive 表和分区关联上呢?这点其实很简单,根据路径和它生成的规则即可建立映射关系,Hive 的根路径在 HDFS 上的目录是确定的,表和库的路径规范也是有迹可循的,库文件夹在 Hive 目录下的第一层(不包括 hive default 库),以 .db 结尾。表是在库文件夹下的第一层,表之下如果文件夹包含"="即为分区目录,另外考虑到多分区的情况,目录层级会更深,最后一层就是该表(或分区)对应的文件块信息,经过聚合函数统计可获取上面相关 Hive 表(分区)信息。下面是实现过程中的数据模型数据加工模型Step1:经过 Spark 作业提取 xml 文件、加工后,输出表信息如下:create table `ods`.`ods_fsimage_hdfs_file` ( `atime` BIGINT comment '最近一次访问时间', `blocks` STRING comment '块信息(raw)', `blocknum` INT comment '块数量', `filesize` BIGINT comment '文件大小', `dsquota` BIGINT comment 'type=DIRECTORY 时有效,空间配额 限制该目录下允许的字节数', `id` BIGINT comment '文件唯一 id', `pid` BIGINT comment '父级 id', `mtime` BIGINT comment '最近一次修改时间', `name` STRING comment '文件名称', `nsquota` BIGINT comment 'type=DIRECTORY 时有效,名称配额 限制指定目录下允许的文件和目录的数量', `permission` STRING comment '权限', `preferredblocksize` BIGINT comment '推荐每一个数据块的大小', `replication` BIGINT comment '备份数', `storagepolicyid` BIGINT comment '访问策略', `type` STRING comment '数据类型, FILE/DIRECTORY', `xattrs` STRING comment 'xattrs', `path` STRING comment '文件完整路径', `pathlevel` INT comment '文件路径深度', `hivedbname` STRING comment 'hive 库', `hivetablename` STRING comment 'hive 表', `hivepartitionname` STRING comment '分区名称', `execute_time` TIMESTAMP comment '该统计作业的执行时间即该记录创建时间' ) comment '镜像文件分析表' stored as orcStep2:解析 Orc 文件行数,通过 Hive 元数据库中的 TABLE_PARAM 获取各个表存储类型(Text、Parquet、Orc),基于路径关联到对应的 HDFS 文件块,通过开发解析 Orc File Footer 的作业获取 Orc 文件的行数 row_num,其他暂时通过 Presto SQL 查询方式获取:create table `ods`.`ods_file_row_num_inc_d` ( `path` STRING comment '路径', `row_num` BIGINT comment '行数', `execute_time` TIMESTAMP comment '执行时间' ) comment '文件行数表' partitioned by ( `pt` STRING comment 'pt') stored as orcStep3:结合数据平台维护的作业内容和配置以及 Hive 元数据库内容信息,以库-表-分区为维度加工聚合,输出表中存储相关的信息如下:-- 该表已在 hive 中创建create table `dwd`.`dwd_fsimage_hive_info` ( `path` STRING comment 'HDFS 路径', `db_name` STRING comment '库名称', `tbl_id` BIGINT comment '表 id', `tbl_name` STRING comment '表名称', `tbl_type` STRING comment '表类型', `part_id` BIGINT comment '分区 id', `part_name` STRING comment '分区名称', `row_num` BIGINT comment '行数', `raw_data_size` BIGINT comment '原始数据的大小', `raw_data_size_str` STRING comment '原始数据的大小(文本形式)', `total_size` BIGINT comment '占用 HDFS 存储空间大小', `total_size_str` STRING comment '占用 HDFS 存储空间大小(文本形式)', `fsimage_file_size` BIGINT comment '镜像文件解析出的文件大小,单位 B', `fsimage_file_size_str` STRING comment '镜像文件解析出的文件大小(文本形式)', `execute_time` TIMESTAMP comment 'execute_time', `atime` TIMESTAMP comment '最近访问时间', `mtime` TIMESTAMP comment '最近修改时间', `blocknum` INT comment '块数量', `replication` INT comment '副本数', `atime_level` INT comment '-1:无访问时间,1:0-30 天,2:30-90 天,3:90-180 天,4:180-365 天,5:大于 365', `file_size_level` INT comment '-1:无大小,0:空,1:0-1MB,2:1-64M,3:64-128M,4:128-1G,5:大于 1G', `create_time` TIMESTAMP comment '表/分区创建时间', `file_count` INT comment '文件数量', `avg_fsimage_file_size` STRING comment '平均文件大小', `avg_fsimage_file_size_str` STRING comment '平均文件大小', `business_id` BIGINT comment 'idata 中 jobId', `input_format` STRING comment '输入格式' ) comment 'hive 详情表' stored as orcStep4:以库-表为维度加工聚合,输出表信息如下:-- 该表已在 hive 中创建create table `dws`.`dws_fsimage_hive_info` ( `db_name` STRING comment '库名', `tbl_name` STRING comment '表名', `path` STRING comment '路径', `env` STRING comment '环境', `file_size` STRING comment '文件大小', `latest_write_time` STRING comment '最近写入时间', `latest_read_time` STRING comment '最近访问时间', `part_num` BIGINT comment '分区数量', `row_num` BIGINT comment '行数', `execute_day` STRING comment '记录创建时间', `replication_num` INT comment '副本数量', `business_id` STRING comment '业务 id,idata 中的 jobId' ) comment 'dws 维度表' stored as orc存储成本预估计算至此,我们已经获取了任务对应的表的存储情况,为了更直观点的表现出存储成本,我们会做一个经费计算预估。按照磁盘 1G/月 0.45 元的标准即可预估其费用。任务每月的存储成本(元) = 对应输出表大小(G) * 表副本数 * 0.45G/月/元计算统计计算资源方式有两种1、第一种是根据各个作业类型(Spark/MR/Flink/Sqoop/...)设计不同计算模型分别统计。比如 MR 任务 cpu = map 个数 * 时间 + reduce 个数 * 时间,Spark 任务 cpu = task 个数 * 时间。2、第二种是依赖 Yarn 组件资源管理 API 直接获取对应每次应用任务的 CPU 和内存数据,根据 /ws/v1/cluster/apps 获取以下三个字段返回字段数据类型备注memorySecondsint所有的 container 消耗的内存总和vcoreSecondstring所有的 container 消耗的 cpu core 总和elapsedTimelong应用程序消耗时间(finished-start),单位毫秒我们数据平台的计算执行引擎的资源调度都是 Yarn 的方式,第二种方式是可行的,另外考虑到准确性和实现简易程度,使用了第二种方式。我们目前可以拿到提交到 Yarn 上各个应用的计算信息了,那么怎么关联到任务呢?我们在提交作业的时候,对 applicationName 有做统一命名规范(任务类型-离线/实时-任务 ID 的命名方式),如下图 "SparkSQL-p-5371": ID 为 5371 的离线跑批的 SparkSQL 任务,在拉取 Yarn Application 列表的时候解析该 Name 即可关联上相关任务。计算成本预估计算运行时长1、针对实时作业按 30 天算2、针对小时任务,时长按单次时长 * 每天次数 * 30 天算3、针对天任务,按单次时长 * 30 天算预估任务每月的计算成本(元) = 预估任务每月的内存成本 + 预估任务每月的 Cpu Core 成本Cpu Core 按 155 元/核/月算,内存按 30 元/G/月算,天任务的计算成本公式如下:预估任务每月的内存成本(元) = memorySeconds / 1024 / 60 / 60 / 24 * 30预估任务每月的 Cpu Core 成本(元)= vcoreSecond / 60 / 60 / 24 * 155最后数仓同学目前已基于这些数据提供的线索进行任务治理,同时也直观地量化了治理成效,包含前端、后端、测试、UED 等,Base 在风景如画的杭州,一个富有激情和技术匠心精神的成长型团队。前端,隶属于研发部。团队现有 90 余个前端小伙伴,平均年龄 27 岁,近 3 成是全栈工程师,妥妥的青年风暴团。成员构成既有来自于阿里、网易的“老”兵,也有浙大、中科大、杭电等校的应届新人。团队在日常的业务对接之外,还在物料体系、工程平台、搭建平台、性能体验、云端应用、数据分析及可视化等方向进行技术探索和实战,推动并落地了一系列的内部技术产品,持续探索前端技术体系的新边界。如果你想改变一直被事折腾,希望开始能折腾事;如果你想改变一直被告诫需要多些想法,却无从破局;如果你想改变你有能力去做成那个结果,却不需要你;如果你想改变你想做成的事需要一个团队去支撑,但没你带人的位置;如果你想改变既定的节奏,将会是“5 年工作时间 3 年工作经验”;如果你想改变本来悟性不错,但总是有那一层窗户纸的模糊… 如果你相信相信的力量,相信平凡人能成就非凡事,相信能遇到更好的自己。
|
|