1. 首页 > 百货

腾讯大数据实时湖仓智能优化实践

一、湖仓架构

腾讯大数据的湖仓架构如下图所示:

这里分为三个部分,分别是数据湖计算、数据湖管理和数据湖存储。

数据湖计算部分,Spark 作为 ETL Batch 任务的主要批处理引擎,Flink 作为准实时计算的流处理引擎,StarRocks 和 Presto 作为即席查询的 OLAP 引擎。数据湖管理层以 Iceberg 为核心,同时开放了一些简单的 API,支持用户通过 SDK 的方式去调用。在 Iceberg 之上构建了一套 Auto Optimize Service 服务,帮助用户在使用 Iceberg 的过程中实现查询性能的提升和存储成本的降低。数据湖底层存储基于 HDFS 和 COS,COS 是腾讯云的云对象存储,可以满足云上用户的大规模结构化/非结构化存储需求,在上层计算框架和底层存储系统之间,也会引入 Alluxio 构建了一个统一的存储 Cache 层,进行数据缓存提速。本次分享的重点主要是围绕智能优化服务(Auto Optimize Service)展开。

智能优化服务主要由六个部分组成,分别是:Compaction Service(合并小文件)、Expiration Service(淘汰过期快照)、Cleaning Service(生命周期管理和孤儿文件清理)、Clustering Service(数据重分布)、Index Service(二级索引推荐)和 Auto Engine Service(自动引擎加速)。以下就各模块近期做的重点工作展开介绍。

小文件合并有读和写两个阶段,由于 Iceberg 主要以 PARQUET/ORC 列存格式为主,读写列存面临着两次行列转换和编解码,开销非常大。针对这个痛点,我们对 Parquet 存储模型进行了分析,主要由 RowGroup、Column Chunk、Page 以及 Footer 组成,相对位置如下图所示,不同列的最小存储单元以 Page 级别组织,数据水平方向上以 RowGroup 大小划分数据块,以便上层引擎按照 RowGroup 级别分配 task 加载数据。

基于存储模型的特点,我们针对性地采用了 RowGroup Level 和 Page Level 两种拷贝优化,对于大文件合并大文件且仅涉及重新压缩、仅涉及列裁剪的场景,使用 RowGroup Copy;对于小文件合并大文件、不涉及列变化、不涉及 BloomFilter 的场景,使用 Page Copy。

下面是我们内部全部升级优化之后的落地效果,合并时间&资源减少 5 倍多。

我们还增强了 Delete Files 合并优化和增量 Rewrite 策略。

在大规模 Update 的场景下,会产生大量的 Delete Files,数据读取时会频繁地进行 Delete File Apply> 增量 Rewrite 优化会通过在>

Iceberg 较 Hive 增加了 min-max 索引,记录了>

对于专注于业务开发的用户来说,索引的选择往往是比较困难的,如何精准的判断是不是需要索引,需要什么索引,索引是否有效,索引是否会带来副作用等,往往需要经过一些额外的任务来进行分析,如果靠用户自己的决策选择,获得大规模的适配收益很难。基于这个想法,我们做了智能推荐索引的支持,而智能的推荐,首先是需要一套 metrics 框架的支持,能够记录表的 Scan,Filter 等各种事件,收集 Partition Status 信息,然后对这些事件进行分析,统计列的查询频次,过滤条件,根据规则区分高/低基数列等。最后根据分析结果,进行 Index 的推荐。

整个端到端的 Index Service 流程如下图:1)首先是 SQL 提取,由于我们获取到的 SQL 是引擎优化后的,并不是原始 SQL,所以需要进行 SQL 重构。2)是索引粗筛,根据拿到的信息,比如列和分区的查询频度,初步判断怎么建立索引是有效的。3)开始尝试构建索引,支持构建分区级别增量索引。4)在用户无感知的情况下进行任务双跑。5)根据双跑结果进行索引优化的效果评估。6)将索引优化数据输出给用户,推荐用户使用。7)由于索引构建是复杂的,一个表会被多任务引用,一个任务也会去访问多张表,我们提供任务级别和表级别的索引构建,尽可能实现表级和任务级的同步优化。

由于 Iceberg 的 min-max 索引在随机写的情况下是普遍失效的,导致>

实际业务中,Data Clustering 和>

相对于 OLAP 引擎来讲,Iceberg 表,Hudi 表都是外表,这些外表基本都是 TB 级别,使用 StarRocks,Doris 查询外表并不能发挥 OLAP 的查询优势。AutoEngine Service 通过收集 OLAP 引擎的 Event Message,对相应的分区进行加热,也就是将相关分区数据路由到 StarRocks 集群,上层引擎可以在 StarRocks 集群中发现该分区的元数据,由此实现基于存储计算引擎的选择优化。

关于多流拼接,这里举个例子简单说明, 如图所示,有两个 MQ 同时往下游写数据,MQ1 更新列>

那在 Iceberg 层面是怎么优化的呢?由于 Iceberg 本身支持事务和列级的更新删除操作,类似于代码仓库的 Branch 概念,因此可以通过打 tag 的方式去标记状态。具体实现是,初始化阶段,数据写入主流程,同时多流往其他 Merged Branch 去写入,写完之后的话会有一个异步的 Compaction 任务,定期和主流程合并,当用户在读的时候,直接读取 Merged Branch。

通过多流 Join 的实现方法依赖 Compaction Service 的调度性能,当数据规模不断增加,多流 join 聚合计算更新的拼接方式可能存在性能瓶颈,所以我们也引入主键表作为行级更新的另一种实现方式。比如这里我们根据 id 分成四个桶,存在多个任务往一个桶去写数据,一个桶内的数据是有序的,那么下游在读取桶数据的时候会更轻松。但是当 id 的基数很大的时候,比如当 id 为 4/8/16 的时候,都会往一个桶内写数,会产生>

由于对数据湖的高阶特性能力的需要,很多业务做了架构的升级,同时也面临着存量 Thive(腾讯自研 Hive)和 Hive 的数据迁移到 Iceberg。这里需要重点支持的工作包括:存储数据的迁移,计算任务的迁移。

首先存储数据的迁移,我们提供了> 其次是计算任务的迁移支持, 我们改进支持了新的 Name Mapping 机制,增强支持了 Identity partition pruning 能力,使得对于场景的 built-in functions 裁剪能力取得数量级性能提升,优化实现如下:

Iceberg Table Spec 是开发性的实现,可以支持多种语言 API 接入,AI生态圈数据科学等主要以 Python 环境为主,要求高性能 Native 解码,对 JVM 环境无强依赖,PySpark 虽然具备接入 Iceberg 的能力,但是太重了。我们可以直接利用 PyIceberg 能力,无JVM 依赖,加载解码一次即可,提供广泛的机器学习类库的优势,拓展 Python的技术栈到 Iceberg 元数据层面,构造 Pandas,Tensorflow,Pytorch 等不同的>

未来还将从以下方面着手,进行实时湖仓的优化:

拓展 Deletion Vector,解决谓词下推必须联合去重的性能问题

本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载者并注明出处:https://jmbhsh.com/baihuo725/36595.html

联系我们

QQ号:***

微信号:***

工作日:9:30-18:30,节假日休息