1. 首页 > 娱乐 > 娱乐八卦

爱奇艺数据湖平台建设实践

首先简单介绍一下爱奇艺 OLAP 的基本情况:

存储方面,OLAP 目前支持三类存储:

① 离线 HDFS: 用于离线分析、批处理等场景;

② 实时 Kafka: 用于实时分析、在线处理等场景;

③近实时 Iceberg: 分钟级延迟,是今天要重点介绍的数据湖产品。

存储之上是查询引擎,我们采用 SparkSQL 做 ETL 处理,采用 Trino 做 Ad-hoc 即席查询,ClickHouse 用于查询加速的场景。我们通过 Pilot 提供对外的统一查询,支持各类应用场景。

下面来介绍一下爱奇艺数据湖的建设背景。

1、数据湖技术加速数据流通

为什么要有数据湖?数据湖其实就是为了加速数据流通。

Pingback 是爱奇艺内部对端上埋点的习惯名称,每个公司都会有类似的服务。在经典的 Lambda 架构解决方案里,Pingback 数据在投递后,有离线和实时两个通路。

离线通路写到 HDFS 里,然后由离线开发平台构建离线数仓。离线数仓的优点是成本很低,支持的容量也很大。缺点是延迟大,可能要 1 小时或者 1 天。为了解决这个时效性问题,往往会再构建一个实时数仓。通常用 Kafka 作为存储,用 Flink 或者 Spark 这类的流计算任务处理 Kafka 数据,构建实时数仓。

实时数仓的延迟非常低,能做到秒级的延迟,但缺点是成本很高,只能放最近几个小时的数据,要基于 Kafka 做明细查询也是比较的困难的。

其实很多实时分析场景并不需要秒级的延迟,分钟级的延迟就足够了。譬如说广告、会员的运营场景,或者监控大盘等。数据湖产品提供了性价比很高,容量很大的分钟级延迟的解决方案。

爱奇艺的数据湖选型用的是 Iceberg,Iceberg 是一种新设计的开源表格式,用于大规模数据分析。

① Iceberg 本质上不是存储,因为它底层存储复用了 HDFS,或者对象存储。在存储之上构建了 Iceberg 表级抽象,对标 Hive 的表设计。

② 它也不是查询引擎或者流计算引擎,它支持各类计算引擎,比如 Hive、 Flink、 Spark,也支持各类的 SQL 查询引擎。

为什么有了 Hive 表格式还要引入 Iceberg 表格式?

一个经典的 Hive 表可能会有天级分区、小时级分区,或者进一步的子分区。其设计核心是用目录树去组织数据,能够很好地做分区级过滤。

但是它也有着以下缺点:

① 元数据统一存在 Metastore,通常底下是 MySQL,很容易成为瓶颈。

② 由于元信息是分区级别的,没有文件级别的信息,因而当发起一个查询时,制定执行计划需要拿到分区下的文件列表。拿到文件列表本质上是对每一个分区请求 NameNode 做 List 请求。举个例子,一天有 200 多个分区,查 7 天的数据,分区数就会非常多,会发起 O(N) 复杂度的 NameNode 的 List 请求调用,这个元数据的枚举过程会非常的慢。

③ 由于它的最小单位是分区级别的,最大的原子操作就是分区级别的覆盖,其他一些原子操作是不支持的。

Iceberg 新定义的表结构有元数据层和数据层。数据层就是数据文件。元数据层是它很重要的设计点,可以复用 Hive 的 MetaStore,指向最新的快照。元数据里面分多层,记录了具体的文件列表。

每次有新的 Commit,就会创建出新的快照,读请求可以访问旧的快照,写请求写新的。在写的过程中,新创建的数据文件读是不可见的,只有在提交后把最新的版本指过去,新写入的文件才可见。做到了读写分离。同时修改操作是原子的,能够支持细粒度的分区内部的修改。

简单比较一下 Hive 和 Iceberg:两者底层都采用 HDFS 或者对象存储,都是 PB 级的廉价存储方案。区别 Hive 元信息是分区级,Iceberg 是文件级。比如 Hive 分区原本有 100 个文件,加了 5 个文件,那么 Hive 下游任务就需要重新计算 Hive 分区下的全部数据。Iceberg 能够获取到修改的 5 个文件,可以做增量的下游计算。

时效性是 Iceberg 很明显的优势,能够做到近实时,比如 5 分钟级,如果每分钟提交一次则可以做到分钟级。

制定执行计划时,Iceberg 是常数级的,它只读取固定的元数据文件就能够拿到文件列表。

Iceberg 还支持文件级别的过滤,比如基于统计信息或者字典做过滤。

为了方便用户使用,爱奇艺在引入数据湖,首先要做平台化建设。

这是爱奇艺数据湖整体的产品架构图:

最底下是数据源,比如前面提到的 Pingback、用户 MySQL 的 Binlog 解析、日志和监控信息,会分别进到实时、离线和 Iceberg 通道。在 Iceberg 之上,通过 RCP 平台、Babel 平台分别做流式入湖和离线入湖。使用 Trino 和 Spark SQL 去做查询。同时我们开发了数据湖平台去完成元数据管理、权限管理等等。

爱奇艺通过实时计算平台,能够做到很简单的入湖。一个 Kafka 的数据只需要三步,就可以完成配置流任务:首先配置从哪个 Kafka 开始读;然后在里面做 Transform 逻辑,比如筛选、重命名,最后定义写到哪个 Iceberg。

入湖的下一步是查询,也就是出湖。目前 Iceberg 有两类文件格式,V1 格式支持 Append Only 数据,不支持行级修改。Iceberg 发布的最新版本 V2 格式能支持行级更新。

目前 V1 格式是通过 Trino 引擎查询,V2 格式通过 SparkSQL 查询。前端是通过 Pilot,我们的自研 SQL 引擎做分发,能够基于文件格式自动地选择引擎,支持各类用户场景。

下面介绍一些性能优化的工作。

说到数据湖,无论哪个产品都绕不开的一个问题就是小文件问题。Hive 可以批量,比如每小时做一次计算,可以写出很大的文件。在 Iceberg 中,由于需要做到近实时,每分钟或者每 5 分钟写文件,文件就比较小,必然会有小文件问题。我们主要通过两个方面去解决小文件问题:

根据表的生命周期做处理。比如一张表可能只需要保留一年,或者保留 30 天,历史的数据可以删除。

目前平台会限制用户建表必须配置生命周期,通过数据湖平台自动地完成清理逻辑。

清理用的是 Iceberg 官方提供的解决方案,Spark 的 Procedure,先是 Drop 分区,然后 Expire 历史的 Snapshot,再删除孤儿文件,最后重写元数据文件。

这套流程直接跑,有些环节是存在性能问题的,并不能够满足清理的效率:

① 第一: Spark 的使用模式,每次跑任务都需要提交一个 Spark 任务,需要先申请Yarn 资源,再启动 Application,跑完这个任务后这个 Application 就释放掉了。这里可以采用 Spark 的常驻模式,生命周期清理 SQL 可以跑得很快, 资源是不释放的,避免了申请和启动的耗时。

② 第二: 天级的目录删除,Iceberg 官方的实现是比较慢的。它用的是孤儿文件删除的策略,在文件数比较多的时候,扫描过程比较慢。我们做了改进,因为明确知道整个天级目录都不需要,可以直接删除整个目录。

③ 第三: 我们添加了回收站的机制,生命周期误删除时能有恢复的手段。

做了这些优化,线上大概几千个表,都能够按时完成生命周期的清理。比如 Venus 库原先可能有 2 亿个 iNode,清理完稳定在 0 万的数量级。

另外一个处理小文件问题的方式就是合并。最简单的就是配置一个定时合并。

人工配置定时合并比较大的问题是:定时策略比较难配置。比如,什么时机应该做合并,这次合并应该要合并什么范围的数据,如果让业务去配这些信息,每一个 Iceberg 用户就需要非常深入地去理解小文件产生的机理才能够比较好地控制合并的范围。

为了解决这个问题,我们参考了 Netflix 的文章,做了智能合并,它的核心思想是:

不再由用户指定合并行为,而是统计 Iceberg 表每个分区下面的文件数,计算均方差,再结合表的权重因子,算出来哪些表合并效果是最好的,添加到待合并的分区列表里面。然后由合并任务按照优先级完成合并过程,用户无需做配置。

有了智能合并,还要解决合并的性能优化问题,我们也一直跟随社区的发展。在使用过程中,最初 Iceberg 在文件合并这块做得还不是很好。最早的时候,有个问题,Delete File 在合并并没有被真正地删除,目前已经修复。举个例子,如果 Delete 马上有个 Rewrite> 还有一些大表合并任务经常失败。这里我们可以配置 Bucket 分区,将全表合并改为每次合并其中一个 Bucket 分区,减少单次合并的数据量。

还可以应用 Binpack 合并策略去控制合并选择的逻辑。应用 Bucket 分区和 Binpack合并策略, 如右上示意图体现的是文件数的变化, 可以判断这个文件数一直在增长,这个小的下降是小时级分区合并,到一定时间做全表合并,它的文件数据减少得比较多,存在周期性的震荡。

还有一个例子,我们发现在做合并的时候经常会和写入任务冲突,会报一个错误,要合并的这个文件有一个 Position Delete 在引用,其实是一个误判,因为在社区的默认的参数里面,去判断这个>

前文介绍了当小文件已经产生的时候如何优化,但我们更希望小文件最好不要产生,在写入的时候就把文件数控制住。我们需要去了解 Flink 任务写入的时候是怎么控制文件数量的。

左上角示意图中这个 Flink 任务有 100 个并行度,在默认参数 Distribution-mode = None 时每一个并行度都会往分区下写文件,就会写入 100 个文件,一分钟写 100 个文件每个数据文件都很小。

如果配置 Distribution-mode = Hash,如左下角的图中,在写入的时候会先做 Shuffle,基于 Partition Key Shuffle 到特定的 Sink,这个 Flink 任务会把数据都集中到一个 Sink,写到一个文件,就解决了小文件问题。

但又会引入新的问题,数据量比较大的时候,单个任务写文件的效率跟不上,就会造成 Flink 任务反压。这个时候我们用哈希策略结合 Bucket 分区。比如,可以控制 1 个 Hour 下面 10 个 Bucket,通过两者结合起来就可以很精确地去控制 1 个分区到底要生产多少个文件。一般建议写入文件大概在 100 MB 左右是比较合适的。上图的表格中列出了各个参数配置下的文件数量。

解决了小文件问题,接下来是查询的性能问题。在最初做 Iceberg 性能验证的时候,我们发现它的批量 Scan 性能是非常好的,但是点查询的性能就比较糟糕。

举个例子,在订单表中,用特定 ID,如订单 ID 或者用户 ID 去查询明细,简化后的SQL 就是 order_id = ‘555’。默认的情况下,Iceberg 会基于 MinMax 做过滤,但数据按照时间戳排序,MinMax 过滤其实是不生效的,比如 File A 的 MinMax 范围包含 555,File N MinMax 321 到 987 也包含 555,其实是过滤不掉的。因而点查询事实上就是全表扫描。

针对点查询场景,BloomFilter 是非常适用的。最初社区没有这个功能,Parquet 在 1.12 的时候支持 BloomFilter,Iceberg 的默认存储格式也是 Parquet,所以我们考虑修改 Iceberg 引入这一功能。

先介绍一下 BloomFilter 的作用,在这个架构图中,比如,针对 order_id 开启了 BloomFilter,为每一个数据文件构建 BloomFilter,将 order_id 进行哈希后映射到对应 bit,如果值存在就把对应的位设为 1,如果不存在对应的位默认是 0。在 Bloom Filter 里面,如果标志位为 1,这个值不一定存在,但如果标志位为 0,这个值一定不存在。通过努力,我们在 Iceberg 的内核里面添加了相应的支持。在 Spark 读取 Iceberg 和 Trino 读取的时候也添加了相应的能力。

BloomFilter 支持 Equals 和 In 过滤。如果标志位为 0 是一定能过滤的。不支持 not equals、not in、比较符等过滤条件。

示意图中 order_id = 555 这个条件,哈希后另外两个文件对应的标志位值都是 0,在查询的时候就可以很快地把其他文件过滤掉了,能够精确命中订单所在的数据文件。

经过测试,在 Spark SQL 中的订单 ID 查询,原来全表扫描需要将近 1000 秒,开启 BloomFilter 后只需要 10 秒钟。Trino 开启 BF 后,可以过滤 98.5% 的查询,CPU 消耗只有以前的 5%。

BloomFilter 会带来额外的空间开销。经过简单的测试,大概有 3% 的额外空间损耗。即 3% 的存储代价可以带来点查询 100 倍的提升。

查询优化另外一个工作是 缓存加速 ,如使用 Alluxio 做缓存加速。

这是爱奇艺 Trino 查数据湖的架构图。业务通过 Pilot 引擎分发到 Trino 网关,自动地选择使用哪个 Trino 集群执行查询。原本 Trino Worker 上面的 SSD 存储是浪费的,我们在之上混布了 Alluxio,复用了原本闲置的 SSD 存储,几乎没有什么额外机器开销。

以前去查 HDFS 可能会有性能抖动,比如,业务有一个大的批任务,导致 HDFS 性抖动,查询性能会降得很厉害,Alluxio 缓存能够很好地屏蔽这一点。经过测试 Venus 日志应用 Alluxio ,P90 从 18 秒可以降低到 1 秒。

在实际的使用过程中发现 Trino 查询有个意想不到的问题,元数据读取性能远比我们想象中的要慢。比如,读取一个 5 M 的元数据竟然要 3 秒钟,后面查数据可能只需要 1 秒,元数据反而更慢。

通过火焰图和阿里的 Arthas 做定位,发现 Read 的方法被调用了百万次,文件总共 5 M,读取 100 多万次是非常不合理的。进一步跟踪,定位原因是父类里面一个 Read 方法的默认实现会逐个 Byte 读取,Trino 这边没有覆盖这个方法的实现,就会降级到默认方法,每次读 1 个 Byte ,所以调用次数非常多,导致很慢,优化耗时缩短到了 0.5 秒。

最后来介绍业务落地的情况,在应用了上述优化后,业务能取得什么样的效果。

第一个例子是广告的流批一体场景。原来的实时链路中,实时数据通过 Kafka 写到 Kudu,离线数据同步到 Hive,通过 Impala 来统一查询,基于离线覆盖的进度将查询分发到 Kudu 和 Hive。

使用 Iceberg ,实时和离线数据都更新 Iceberg,不需要进度管理,直接查询 Iceberg 表即可。Iceberg 实现了两方面的统一,一是存储统一,不需要有两个类型的存储,查询不需要做拆分。二是任务开发统一为 SQL,原先离线是 HiveSQL,实时是 Spark Jar 包,统一为 SQL 开发。数据入湖后结合分布式改造,广告智能出价全链路由 35

Venus 是爱奇艺内部的日志分析平台。之前的架构中 Kafka 数据往 ElasticSearch 里面存储,如果业务流量较大就给它一个独立集群,小流量业务则用公共集群。这个方案存在一些问题:一是流量调度很难做,当集群流量有瓶颈时,需要把流量拆分走;二是 ES 的存储成本非常高。

存储改用 Iceberg 方案后,所有业务的流量都写到一个 Iceberg 集群,不需要拆分流量。Venus 接入层通过日志查询平台,数据存储的切换对用户是透明的。Iceberg 带来的好处包括:

① 成本显著下降。 不需要独立的 ES 集群了,Iceberg 和 Trino 都复用现有的资源,并没有什么额外的成本。

② 稳定性大幅提升。 因为 ES 的成本太贵,没有配副本,一旦单个磁盘或节点有问题,都会引发用户的报障。用 Iceberg ,写入带宽非常大而且稳定性很好,报障减少了 80% 以上。

接下来是爱奇艺内部的审核场景,审核场景需要对一些历史的行记录做修改。没有 Iceberg 以前,没有很好的技术方案支持行级更新。

原来解决方案里用 MongoDB 存全量的数据,做行级的更新,然后用 ES 构建二级索引,改用 Iceberg 两个存储都统一到 Iceberg 里面。对业务带来的好处是:

① 原本的监控告警要定期查 ES 做聚合,用 MySQL 开发报表,现在不需要了,报表直接查 Iceberg 就可以,能够支持实时告警。

②数据湖大幅提高业务的效率。原本分析任务开发非常复杂,要从 Mongo 里面导数非常不方便。有了数据湖可以统一为 SQL 查询。

最后是 CDC 类数据入湖,此处以订单为例。基于 MySQL 数据做大数据分析,有两类解决方案: 第一类 是每天导出一份到 Hive,缺点是每次导出都是全量,延迟很大,只能看一天以前的数据。另外全量导的性能也很差,对 MySQL 压力也比较大。 第二类 是实时解决方案,增量变更写在 Kudu 里面,Kudu 是一个成本很高的解决方案。如果 Kudu 写入带宽波动,同步任务负责人需要去做运维操作。

使用数据湖方案,爱奇艺实时计算平台,通过 Flink CDC 技术很方便地可以将 MySQL 数据入湖。 数据湖方案具备如下优势, 一是近实时,数据延迟在分钟级,远优于之前的离线方案;二是成本低,相比于 Kudu 无需独立节点,大幅降低机器成本;三是省运维,Iceberg 写入带宽大且稳定,大幅降低运维代价。

最后介绍一下未来规划。爱奇艺未来会在流批一体里面有更多的落地,包括广告的全面推广、Pingback 在 BI 场景的落地。另外,我们计划把数据湖落地在特征生产,可以由以前离线或者批的特征生产,变成近实时,能够支持晚到数据,支持样本的行级的修正。

在技术方面会尝试把 Iceberg 的 Puffin 统计信息用于查询加速的场景。还会对社区在做的 Branch 和 Tag 进行调研,寻找内部的落地场景。

本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载者并注明出处:https://www.jmbhsh.com/yulebagua/36590.html

联系我们

QQ号:***

微信号:***

工作日:9:30-18:30,节假日休息