1. 首页 > 百货 > 母婴用品

Iceberg 在小米的应用 数据湖

Iceberg 是具有 SQL 行为的表的开放式标准,此定义由 Ryan Blue 提出。这个定义中包含了两点:

第一点,Iceberg 有 SQL 行为,意味着 Iceberg 是针对于结构化数据的,具有结构化数据的特性,如 Schema 等。

第二点,Iceberg 是一个开放性的标准,开放性标准体现在两方面。第一方面体现在设计上,Iceberg 支持多种文件格式,在存储介质上可以选择各种分布式存储或者云存储(如公有云),在上层应用上支持了 Flink、Spark、Hive 和 Trino 等多种查询引擎。第二方面则体现在社区上,目前已经有多家公司参与设计和建设。

接下来介绍 Iceberg 的几个特点。

1、Iceberg 可以避免意外发生

Iceberg 表可以放心使用,无需考虑太多不愉快的事情发生。

对表的任何操作都是原子性操作,同时使用多快照提供了读写分离的特性。

可以对 Iceberg 表进行 Schema 修改,比如字段类型提升、增加列、删除列、重命名列、调整列顺序等。这里需要说明的是,字段类型不是可以随意更改的,Iceberg 只支持字段类型提升。例如,int 改成 long,float 改成 double,或者精度增大等。

2、Iceberg 支持隐式分区

Iceberg 有多种分区函数供选择,如下图所示。当我们需要根据某个 timestamp 类型的字段提取出的年、月、日或者小时进行分区时,可以直接使用 Iceberg 提供的分区函数。Iceberg 还支持多级分区,在分区选择上具有更高的灵活性。

与 Hive 进行对比,隐式分区体现在:

(1)Iceberg 写入时,不需要像 Hive 一样指定分区,写入哪个分区是由 Iceberg 自动管理的。这样的好处在于,可以保证数据分区是正确的,防止用户错误导致数据分区错误。

(2)用户查询时,不需要考虑分区的物理结构。假如一张表使用 date 字段做了分区,用户查询时不需要考虑这个字段是进行了月的分区,还是天的分区,只需要按照这个字段进行查询即可,Iceberg 会自动生成查询计划,如下图所示。

(3)在目录结构上,Iceberg 具有元数据层,通过记录分区和文件地址之间的关系,实现了物理结构和逻辑结构的分离。这样,可以非常方便地进行 partition evolution 操作。

3、Iceberg 的行级更新的能力

Fomat version 2 中提供了行级更新的能力,在 Iceberg 中使用了两类文件进行标记删除。第一类是 position delete file,这类文件可以指定文件和行号进行删除。第二类是 equality delete file,这类文件记录了被删除记录的唯一键进行删除。Iceberg 只是规定了可以使用这两类文件进行删除,但具体由哪一类文件或两类文件共同使用以达到删除目的,是由引擎层来决定的。下图中是 Flink 引擎实现行级删除的模式,对当前事务写入的文件使用 position delete file,而对于之前的事务写入的文件会使用 equality delete file 进行删除。查询时,使用 Merge On Read 模式,可以得到已经删除成功的结果。

本节介绍 Iceberg 数据湖在小米的几个应用。

1、日志入湖场景

小米原有的日志入湖的数据链路如下图所示,用户会在 Client 端使用 MQ 的 SDK,将数据发送到 MQ 中。小米使用 Talos 作为 MQ,对标于业界的 Kafka,MQ 中没有 Schema。之后使用 Spark streaming 将文件直接 flush 到 HDFS 上,然后使用 add partition 挂载到 Hive 上。

(1)使用了旧版本的 Spark streaming,实现的是 at least once 语义,数据可能会出现重复。

(2)由于 MQ 当中没有 Schema,只能使用上报的时间进行分区。这样,会在凌晨的时候出现分区漂移的问题。

(3)直接 flush 文件到 Hive 上时,Hive 的 schema 与文件 schema 可能不匹配,导致历史数据读取时可能会出现问题。

针对以上问题,我们使用 Iceberg 对日志入湖的流程重新进行了设计,修改后的数据链路如下图所示。在 MQ 上配置 Schema,使用 Flink SQL 进行解析,然后写入到 Iceberg 中。

这个数据链路有以下几个特性:

(1)使用 Flink SQL 的 exactly once,保证数据的不丢不重。

(2)使用了 Iceberg 的隐式分区特性,保证数据分区的正确性,避免了分区漂移问题。

(3)Schema On Write 以及 schema evolution 特性,保证数据在 schema 演变过程中也永远是正确的。

链路在实际落地中,可能会出现数据丢失的问题。数据丢失的根本原因是链路上的数据不规范。Talos 使用的 Schema On Read 模式,用户将 Schema 附加到 MQ 上,在 MQ 到 Iceberg 的过程中,有一个 Schema 同步的过程。但由于 MQ 中的 Schema 人为配置可能延迟,会导致 MQ 的 SDK 发送的数据与 MQ 中 Schema 不一致,使得 FlinkSQL 解析的时候可能会丢掉一些列。最终用户角度看到的就是数据丢失。想要解决这个问题,要在流程中进行规范,首先定义 Schema,然后发送数据。

2、Flink+ Iceberg 构建的近实时数仓

小米有很多的 IOT 设备,在这些设备上打点有两个痛点问题:

(1)设备打点数据延迟上报问题非常严重。 假设一台设备的一批数据没有上报,然后关机,过了一个月数据才上报,那么数据开发工程师需要将过去一个月的数据进行重新计算和存储。由于 Hive 不支持事务性,那么在进行重新计算然后覆盖过去一个月的数据的过程中,可能会导致下游读取的异常。

(2)由于 Spark 离线任务通常都是 T+1 的,所以凌晨时会启动很多的 Spark 作业做指标拆分,将 ODS 的数据拆分到 DWD 层,这会导致集群的资源紧张,数据产出的延迟风险非常大。

针对这些问题,我们使用了 Flink+Iceberg 对链路进行重构,重构后的数据链路如下图。

这个链路具有以下特点:

(1)首先在入湖侧,Iceberg 的隐式分区可以保证打点延迟的数据能够正确分区,以刚才的例子,一个月之前的数据不需要覆盖写入,只需要将下游的数据进行回溯即可。

(2)结合 Iceberg 的灵活分区,使用 date+event_name 进行了二级分区。这样,下游进行指标拆分时,只需要指定二级分区就可以进行消费,这样可以大大减少数据的扫描量,进而节省计算资源。

(3)整个链路中使用 Flink 来替换 Spark,这对用户来说非常重要,因为它意味着凌晨的计算量可以平摊到全天,这样产出延迟的风险可以大大降低。分摊到全天并不意味着风险变高了,相反,Flink 的 checkpoint 只有十几分钟到半个小时。这样,即使作业失败,恢复的代价也会比较小。

3、离线场景下遇到的一些问题

Iceberg 的离线场景是比较完善的。但是,若需要数据链路稳定,仍然需要一些努力。

分区完备性校验,即如何感知到上游的 T-1 数据已经写入完成,从而开启下游作业。这里分成两个场景。

之前 Hive 表的校验逻辑是校验 success 文件。但是 Iceberg 写入并没有 success 文件。同时 Iceberg 表的分区散落在各元数据文件当中,而 list partition 操作非常耗时。针对这一问题,我们使用了任务依赖,不是使用数据依赖来依赖分区的检测,而是依赖于上游的任务。当上游任务写入完成之后,下游任务就可以进行调度。

Iceberg 表分区在写入第一条数据时就已经生成,这样也无法校验分区。并且,在实时场景下,经常会有数据延迟到达的问题。针对这个问题,我们参考了 Flink 的 watermark 机制,使用了 Iceberg 的 watermark,根据用户提供的时间列来生成一个时间戳,如下图所示,我们会在快照里增加一个时间戳,有一个单独的检查作业来对比分区和 watermark,当 watermark 超过分区时,即意味着分区写入完成,业界也称这种方式为流转批。

① 试图将 z-order 应用于 ETL,在实践中,z-order 在整个分区中执行的代价很高。而且,对于 ETL 底层的一些表(如 ODS,DWD),查询的次数比较少,z-order 带来的收益不大。因此,建议用户使用 local sort 进行排序写入的方式。

② 我们在内部实现了 parquet 的 page column index,相比 parquet 之前的谓词下推的方式时 row group 级别的,一个 row group 是 128M 或 256M,而 parquet 最小的可读单位其实是一个 page,大概是 2MB 左右,page column index 会对 page 建立一个 min-max 索引,查询时可以利用查询谓词和 page 的 min-max 索引来对数据进行有效过滤,最终读入更少量的 page 进行计算,如下图所示。

在小米内部 benchmark 场景中,效果还是不错的。最好的情况下,可以过滤 80% 的数据。但若查询的是非排序列,比如下图的 Q7 到 Q9,基本上没有什么改善效果。

(3)隐式分区在离线场景的问题

当我们将 Iceberg 引入到离线场景之后,由 Iceberg 自带的隐式分区和 dynamic overwrite 带来的结果与用户期望有所不同。例如,假设表结构中含有四个字段(如下图所示),我们使用 date 按天分区之后再使用 hour 按小时分区。

当我们使用语句 insert overwrite catalog.db.table_test values(1,‘a’,20230101,1),(2,‘b’,20230101,2) 进行覆盖写入后,会发现查询结果只覆盖了date=20230101/hour=1和date=20230101/hour=2分区,没有覆盖date=20230101/hour=3 的分区。这意味着 dynamic overwrite 对隐式分区操作时,不会覆盖所有的二级分区。此时,用户希望回归到 Hive 的使用方式,解决方法是使用 static overwrite 来指定分区进行覆盖。将覆盖语句修改为:

Iceberg 类型和多引擎类型的对齐上存在一些问题。如 Iceberg 当中的 timestamp 类型有两类,第一种是带有时区的 timestamptz,第二种是无时区的 timestamp。

而 Spark 的 timestamp 类型只有一类,即有时区的 timestamp 类型。这样就带来一个问题,如何使用 Spark DDL 来创建出 Iceberg 的无时区的 timestamp 类型呢?这时需要配置一个参数:

当使用Spark来读取Iceberg timestamp类型时,则需要配置另一个参数:

这时 Spark 会把无时区的当成有时区的进行处理,也就是说当时间戳是 UTC 的 0 点,那么 Spark 读出来的就已经加了 8 个小时了(这里假设系统时区为 UTC+8)。这样用起来似乎也没什么问题,但是与 Trino 比较起来就有问题了。当我们在平台上同时提供了 Spark 和 Trino 两种 adhoc 的查询方式,会发现结果是不同的。这个问题在 Spark 3.4 之后应该会有所改善,因为设计中会引入一个新的无时区的时间戳类型。

4、实时集成入湖

我们将 MySQL、TiDB、Oracle 等关系型数据库的 binlog 日志采集到 MQ 当中,再使用 Flink 写入到 Iceberg 的 format v2 上,如下图所示。

这种数据链路的特点包括:

(1)整个链路借助于 Flink 的 Exactly Once 和 Iceberg 的事务性,可以到达一个端到端的 exactly once 的语义。

(2)Iceberg 对实时支持可以达到分钟级别。

(3)Iceberg 自身的 merge on read 设计,需要后台定时执行 compaction 任务。Iceberg 的 compaction 是一种插件式设计,到目前还未实现在 Flink 当中。目前,当需要使用 Flink 进行类似于 HBase 的限流或写停等操作时,尚需自己开发。假如 Compaction 任务异常终止,写链路是感知不到的。会造成写入时没有问题,但是查询时速度很慢的现象。

此外,我们在 v2 中发现更多 Iceberg 存在的问题:

Iceberg 本身并没有明确说明在表中可以配置一个主键,而是将这个权利交给引擎层去处理。这张表是否可以保证唯一主键,完全取决于引擎及使用方式。即使使用了支持声明主键的引擎,也很难保证声明的主键的唯一性。除非默认开启 Upsert 方式,但这种方式代价比较高。

Iceberg 的文件组织实现方式的 Upsert 的代价比较高。因为 Iceberg 在设计时,希望数据尽可能入湖且没有索引,所以不会去校验这条数据是否已经存在了。Upsert 的实现方式为 delete+insert 方式,即写入两条记录,一条删除一条新增。当数据量比较大时,会导致 equality delete file 文件过多。解决方法有两种,一是增加 compaction 频次,二是通过 bloom filter 来过滤掉一些无用的 delete。

实时写入时,compaction 和写入会出现并发冲突,这往往是由于 compaction 过程中,有一条 position delete 数据写入了。这种方式下,Flink 是比较友好的,因为 position delete 只会指向一个新增的文件,不会对历史的文件进行引用。因此在校验时,可以对 position delete file 在快照中打标记,从而忽略由 position delete 带来的冲突进而导致 compaction 失败问题。

Iceberg 与 Hudi 或 Paimon 不同,没有专门的 changelog 供 Flink 直接消费。我们需要从文件组织中将 changelog 自行解析出来,这样的解析代价很高,并且可能出现由于 Upsert 操作而带来的 changelog 不准确。小米内部实现了单事务中解析出删除的数据和插入的数据,然后以顺序的方式提供给下游消费。但是若单个快照中,先删后写的操作过多时,会导致下游波动。Changelog 不准确(尤其在非主键聚合的场景下),是通过配置 changelog CDC 去重来解决的,依赖于 Flink 内部的state 撤回的机制来解决,配置语句为:set table.exec.source.cdc-event-duplicate=true。

5、列级数据加密

Iceberg 由于元数据层的设计,可以在 Iceberg 表上实现数据加密。列级数据加密主要是利用了 parquet 1.12.2 高版本的加密能力。之前,小米内部的数据加密是依赖于隐私集群,单独的 IDC 机房的隔离会造成运维成本高,以及数据孤岛的问题。因此我们参照社区在 Iceberg 上实现了一个数据加密,这个方案称为单层数据加密。

与直接数据加密方式不同,直接数据加密的每条数据的写入都会调用一次 KeyCenter 进行加密,然后写入。单层数据加密会在 Iceberg 表中保存加密之后的一个密钥,当写入程序写入时,会调用一次 KeyCenter,对加密的密钥进行一次解密以获取明文密钥 DEK,然后对数据进行加密写入。读取过程与写入过程类似,读取时会对 Iceberg 元数据中保存的加密密钥进行解密,进而 对数据进行解密处理。这里会涉及两个密钥,一个是 Iceberg 表自身保存的 DEK,另一个是对这个 DEK 加密的 KeyCenter 中的密钥。单层包裹的加密方案的优点是:

(1)parquet 列级数据加密,不需要对所有的列进行加密,用户可以选择需要加密的列。

(2)对 KeyCenter 压力较小,写入和读取时只需要对 KeyCenter 访问有限次数。

这个方案在小米内部实现的是简化版本,我们会对一个 Iceberg 表维护一个 DEK 密钥。而社区的方案中,密钥粒度比较细,可以是分区粒度的密钥,也可以是文件级别的密钥。

6、Hive 升级 Iceberg 的调研

可以使用社区提供的 migrate 原地升级的方案进行升级。社区提供了 Spark 的 procedure 语法,使用 CALL migrate 语法可以直接将 Hive 表升级为 Iceberg 表。下面的例子中,将 Spark_catalog.db.sample 表升级成了 Iceberg 表,同时将新增属性 foo 为 bar。

但这种方式在实际落地中存在一些问题:

① Iceberg 支持的文件只有 parquet/orc/avro 这三种格式,不支持 text、sequenceFile 等文件格式。导致一些 Hive 表无法支持升级为 Iceberg 表。

② 表下游消费离线作业的 Spark 必须是 2.4 以上的版本。而小米内部存在一些低版本的 HiveSQL 和低版本的 Spark 作业,因此这部分表是无法使用这个方案进行升级的。

出于减少下游作业的改动的目的,我们希望能够复用 Hive 的 location。写入的时候写入到 Iceberg 表,让 Iceberg 表和 Hive 表的存储地址相同。这样我们只需要升级上游作业,下游表在 catalog 层仍然存在,这样下游作业不需要改动,如下图所示。

这个想法是比较好的,但是实现过程有些取巧,因为 Iceberg 是多快照的,因此一个分区下,可能会有多个副本,而 Hive 是通过 list 目录来读取数据的。这样,Hive 在读取时,可能会读取到重复数据。若想要让 Hive 读取单快照,那只能及时清理 Iceberg 快照和残留文件。但是这样又使得 Iceberg 失去了事务性,而且受限于 Hive 下游消费作业,Iceberg 的一些特性(如 schema evolution)也都受到了限制。若是 Hive 的 parquet 版本和 Iceberg 的 parquet 版本不一致,那么改动会非常大。最终这个方案被放弃。

这是业界使用最多的方案,这个方案的思路是:创建一张相同的 Iceberg 表,将 Hive 的历史数据回溯到 Iceberg 当中,然后升级上游作业,随后测试验证和升级 Hive 的所有下游作业,让其消费 Iceberg。

为什么这个方案比较麻烦,但是用户愿意迁移呢?主要有两个原因:

① 我们在 Iceberg 上使用了 ZSTD 的压缩算法,得益于 ZSTD 更高的压缩率,使得存储成本可以降低 30%。

② 在回溯历史数据的时候,我们对大字符串进行了排序,这样可以提高数据的相似度,进一步提升压缩率。对一张表来说,存量数据在存储中占有更大的比例。若是能够对历史数据的存储空间减少 30%,用户还是可以接受改造的。

7、Iceberg 在小米的应用现状

目前有 1 万 4 千多张表,日新增已经超过了 Hive,总的数据量已经达到 30PB。

首先,我们将跟进物化视图的功能。在 OLAP 场景且没有谓词下推的情况下,我们期望通过预计算的方式来提高 Iceberg 的查询能力。

其次,我们将跟进 Iceberg 在 Spark3.3 上的 changelog view。这个功能使得 Spark 可以获取到 Iceberg 的 changelog,我们希望在离线场景下也可以进行增量读取和更新。

最后,小米会在海外集群上探索数据上云。小米内部都是 EBS 挂载,EBS 本身比较贵,而 HDFS 本身有 3 个副本,相比直接使用公有云成本较高。

Q1:为什么要 Spark streaming 切换为 Flink SQL,主要出于什么考虑?

A1:主要是内部架构考虑。第一是,Spark Streaming 的 2.3 版本的 At least once 语义会导致数据重复。第二是,引入 Flink 之后,开始积极向 Flink 方向靠拢,不再去维护 Spark streaming 的方向,在替换为 Flink SQL 之后,对整个数据链路进行了迭代。

Q2:watermark 是 Iceberg 已经存在的,还是业务自己加的?

A2:这个需要业务自己配置使用什么字段来作为 watermark 的生成字段,需要用户自己配置。然后 Flink 在写入时,会在快照中生成 watermark。

Q3:小米在强实时场景中用到了 Hudi 吗?

A3:没有,小米在强实时场景走的 MQ 那套数据链路。

Q4:选型上为什么是 Iceberg 而不是 Hudi?

A4:最初为使得 kappa 架构和 lambda 架构得到统一而调研了数据湖的组件,选择 Iceberg 的主要原因是 Iceberg 的开放性和多引擎支持。2021 年 4 月份,Iceberg 最先支持了 Flink。而当时,Hudi 和 Spark 还未解耦。我们出于使用 Flink 的角度而选择了 Iceberg。实践中,Iceberg 在实时数据的处理中,尤其在 CDC 处理方面,可能没有 Hudi 那么易用。我们也对 Iceberg 进行了二次开发,才把数据链路运行得稳定一些。

Q5:历史的离线作业仓库,数仓作业为 Hive 作业,如果切换到实时链路 Iceberg,如何做到无感知切换?比如说,SparkSQL 语法与 FlinkSQL 语法不同,以及 UDF 实现不同。

A5:目前没有办法做到无感知切换,SparkSQL 和 FlinkSQL 语义上就不大一样。若是切换到 Flink batch 还有可能,但若是想要离线切到实时,基本上要把整个逻辑的实现一遍。

Q6:目前实时数仓当中,append 模式和 Upsert 模式的数据延时可以做到几分钟?尽可能避免数据延迟到达。

A6:这两种模式,目前最低都是 1 分钟。我们约束了用户配置的 checkpoint 时长,最低不能低于 1 分钟。

Q7:如何使用 local sort 进行多列查询?

A7:这个可以写入时在算法上使用 z-order 排序替换默认的排序算法来实现。

Q8:切换 Iceberg 带来的切换成本是怎样的,业务需求是否很强烈?

A8:Iceberg 带来的事务性、隐式分区、多引擎支持的特性可以切实解决用户的问题。即使切换过程中有很大的成本,当数据湖方案确实可以解决用户的痛点时,用户也会想用这个新架构去替换。

本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载者并注明出处:https://www.jmbhsh.com/muyingyongpin/36600.html

联系我们

QQ号:***

微信号:***

工作日:9:30-18:30,节假日休息