一、Lambda架构与实时数仓开发痛点
1、小红书的数据平台概览
首先来整体介绍一下小红书的数据平台。
首先在最底层是一个个 Cloud,包括计算、存储等。在这一基础之上,是数据采集层,采集一些原始数据,比如用户行为日志数据、RDBMS 关系型数据库的增量日志数据,以及其他一些文件系统等。
然后基于源头数据层(ODS 层)之上是数据存储和加工层,主要分为两大块:一是偏离线的部分,主要使用 Hive、Spark 计算,使用 AWS S3 存储;二是偏实时的部分,主要使用 Flink 计算,使用 Kafka 存储。
再往上是一个数据共享层,我们把一些聚合数据、Join 数据和宽表数据写入数据共享的一些分析引擎中,比如 ClickHouse、StarRocks、TiDB、HBase 等等。这些都是作为数据共享层数据存储的底座,以及计算分析引擎的一个入口。
最上面是应用层,我们基于这一层做报表、即时查询等,还会对数据做封装,打造一些统一的数据产品。
2、典型的 Lambda 架构在小红书的实践现状
小红书采用的是典型的 Lambda 架构。实时链路主要使用 Flink 和 Kafka;离线链路主要使用 S3、Spark 和 Hive。Lambda 的特点就是两条链路互相独立建设,互不影响。
Lambda 架构的痛点可以总结为三个方面:
① 实时和离线数据不一致 ,造成数据不一致的原因主要有三点:计算引擎不一致,相同 SQL 定义也容易产生不同结果;作业不同,开发人员需要维护两套代码,技术门槛高;数据 TTL 不同,Join 分析天然误差。
② Kafka 缺乏数据检索能力 ,对用户来说 Kafka 更像一个黑盒。不管 Kafka 中数据存储的是一些类似 protobuf 的数据还是 json 格式的数据,在做检索的时候都非常困难。如果用户想要根据某个条件去检索数据,这个数据很难被查找。KSQL 产品更像是一个 streaming 的处理,更注重的是实时流处理能力,用来做离线大规模检索并不适合。
③ 流存储存数据有限,回溯效率低。 这一点最大的原因是成本高,数据不能无限存。而且如果要去回溯读,从历史上去回追数据,它读的性能也不及批量读。
二、流批统一存储架构介绍
基于 Lambda 带来的痛点,我们萌生了去开发一个流批存储的产品的想法来解决 Lambda 的痛点。下面就来介绍一些设计细节。
1、流批统一存储架构介绍
如下是流批统一存储的整体架构:
我们的流批统一存储叫 Morphing Server,对用户提供的 API 还是跟 Kafka 完全兼容,都是使用流式的方式去写入和消费,这些接口都没有变,所以用户的使用方式不会有任何变化。
区别在于用户写入数据到 Kafka,Kafka 内部会有一个线程,异步将数据同步到数据湖中。我们的数据湖是采用的 Iceberg,当数据写入到 Kafka 中,内部线程会去抓取 Leader 数据,经过一些 Schema 数据解析转换为 Table Format 格式写入到 Iceberg 中,这个过程是异步的,对用户来说是无感的。
Kafka 的数据会被其他 Flink 作业消费,消费完之后可以写到下一个 Kafka 中,在下一个 Kafka 依然是以异步的形式将数据落地到数据湖中。数据湖中的数据就可以提供批读取和批存储的能力。对于 Iceberg 中的数据如何去读取的问题,我们会根据实际情况选取一些高性能的分析引擎,比如 StarRocks、小红书自研的 RedCK 等来读取离线数据。
这里我们总结了 6 点流批统一存储所提供的能力。
① 流批统一: 同时提供流存储和批存储的读写能力,构建多种应用场景。
② 无感写入: 对外提供的写入接口为原生 Kafka API,用户无需关注落数据湖过程,自动异步写湖。
③ Schema 解析: 数据在落湖前会提前进行 Schema 解析,以结构化、半结构化的 Table 形式提供查询。
④ 高速分析: 借助 StarRocks 引擎的强大湖上查询能力,能够提供向量化、CBO 等高速查询能力。
⑤ Exactly-Once: 流、批数据实现 Exactly-Once 语义,数据一致性高。
⑥ 支持 Rollback: 支持批数据的 Rollback 能力,在 Schema 变更不及时下,回溯修复数据。
接下去,我们介绍一下技术选项是如何去考量?关于技术选项分为两个部分:自动落湖的过程如何选择;对于数据湖中的数据如何选取合适的引擎去更加高效读取
对于自动落湖过程我们考虑了两种形式,Builtin(内嵌)和 Extension(外挂插件),这两种形式其实都是可以的。
在 Builtin 的形式下,我们看到只有一个独立的进程,在里面处理落日志之外,还会有一个异步的线程叫 Iceberg Syncer 去不断拉取日志中的数据,然后写入湖中,这种方式有优势也有劣势。
优势如下:
①产品形态完整,统一入口。
② 不需要额外维护外部组件。
③ 资源利用率高,共享进程。
劣势如下:
① 企业内生成集群版本难以升级,在企业中有一些集群并没有流批一体的功能,在升级中会非常困难。
② 进程隔离性弱,如果在异步线程中产生 bug,可能影响 Kafka 正常的读写功能。
针对 Builtin 形式的一些劣势,我们当初考虑了另外一种选项 - Extension,这个方式相对更加直观。
Extension 形式,也存在着一些优势和劣势。
优势如下:
① 接入灵活,集群不需要升级,我们把 Kafka 落湖进程摘取到 Kafka 进程之外,是一个单独的进程,这是最大的一个好处。
② 流存储可替换,并不局限于 Kafka,可以替换成其他引擎。
劣势如下:
① 运维成本高,组件依赖过多,需要维护两套组件。
② 产品体验稍差,整体性弱。
目前我们落地的是 Builtin 的方式,所面介绍的一些细节方案都是基于 Builtin 方式的。
接下来介绍查询分析引擎的选型。我们希望找到一款 OLAP 产品,具备以下 特点:
① MPP 架构、向量化和 CBO 来提高分析性能。
② 支持多场景,能够在各种场景下满足我们的需求。
③ 大规模,离线分析数量大,数据种类多的情况下,在大规模数据量下性能不退化。
基于这些考量,有两大类选择:左边的是 Apache Doris 和 StarRocks 为代表的 OLAP 分析引擎;右边是 ClickHouse 和小红书基于 ClickHouse 自研的 RedCK 分析引擎。
左边的分析引擎对分布式支持更好,对 SQL 协议兼容性高,提供更加一站式的查询平台。右边的分析引擎对单表性能更加优秀,在超大规模下的数据承载能力更强,特别是我们在 RedCK 上做了一些深度的定制化自研去满足更多应用场景。
下面介绍我们在分布式引擎上选择的 StarRocks。
StarRocks 支持湖上分析能力。它本身支持读数据湖,不需要将数据以任何形式同步到 StarRocks 上,更像一种外表的形式,可以通过 Iceberg 的 Catalog 去查询数据,还会做一些 Cache 缓存来加速查询。
(2)StarRocks vs Persto 在流批一体(Iceberg)上的查询对比
我们对 StarRocks 和 Presto 在流批一体上做了查询性能的对比,主要分为两大类,四小类的 SQL 进行比对。
左边主要是 Scan 全表扫描相关,在这一方面 Presto 的性能更加优越,但是两者差距不大。右边主要是 GroupBy 相关的聚合场景,具有 MPP 架构的 StarRocks 在性能上明显更加优于 Presto。这也是我们选择 StarRocks 的原因。因为在这个应用场景下 Join 使用较少,所以这里没有进行对比。
还有一类分析引擎就是之前提到的 ClickHouse 和 RedCK,如何去更好的分析湖上的数据,这里介绍一下我们自研的 RedCK。
它是一个存算分离的架构,主要分为三个模块:Service、Query Processing 和 Storage。
Service主要提供 Gateway 网关和 Service Discovery 服务发现,能够让业务更好的接入;Query Processing是计算层,可以去解析 SQL 生成执行计划,分派这些任务去读写;Storage是存储层,支持文件存储比如 HDFS 和 Juice FS,还支持对象存储比如 OBS 和 COS。
接下来看一下 RedCK 和流批存储是如何结合的。
RedCK 通过 MergeTree 的格式跟其他查询引擎打通,比如 Spark、Flink 等计算可以直接读写 MergeTree 上的数据,然后通过 RedCK 在 MergeTree 上做 OLAP 分析。这样的好处是使用 Spark 在写数据的时候可以有一个更好的性能,做到了读和写两种引擎的解耦。
基于这个考虑,我们在 Kafka 流批一体的引擎在落湖的过程中,原本只支持传统的 Parquet 现在也支持写 MergeTree 格式,同时也去提交一些和 RedCK 相兼容的元数据信息。这样 RedCK 可以根据元数据信息直接找到 MergeTree 去做一些分析。
整体上,落湖分为两大块:Commit 模块和 Broker 模块。
Broker 模块主要负责的是数据湖写入,利用 Kafka 本身的 Fetch 机制,将 Leader 上的最新数据进行解析并且不断写入,按照 Partition 维度来做单独的线程写入数据。
Broker 的设计主要包括如下内容:
① Replica Leader:Kafka 原生部分,处理 Produce 请求和 Consume 请求。
② ReplicaRemoteFetcherThread:主要工作线程,异步 Fetch Leader 数据,经过 Schema 解析,写入 Iceberg。
③ DefultSchemaTransform:Schema 解析模块,提供写入 Schema Server 变更。
④ IcebergRemoteLogStorageManager:封装 Iceberg 接口,提供写入 Iceberg 的 API 集合。
② 与 Broker 交互:发送 Checkpoint 请求,协调各 Broker Checkpoint 信息。
Exactly-once 语义主要依托于两阶段提交来实现数据不丢不重,具体如下:
① 第一步,Committer 向所有 Broker 发起一个 RPC 请求,也就是 Checkpoint 请求。
② 第二步,Broker 在接受到 Broker 请求之后将目前为止还没 Flush 的数据 Flush 到 Iceberg,完成之后将 Checkpoint 信息记录到 Checkpoint Storage 中。
③ 第三步,Broker 向 Commiter 返回一个 ACK,告诉 Commiter 已经完成 Flush 工作。
④ 第四步,Commiter 等到所有 Broker 返回的 ACK 信息之后,发起第一阶段提交并且记录到 Checkpoint Storage 中,实际上做一个 Commiter 和 CheckpointID 关联。
⑤ 最后一步,等第一阶段完成之后,发起第二阶段提交,发出一个 Commit 提交告诉 Iceberg 可以落盘。
实际生产中,常会出现一些故障。接下来介绍各种故障情况下,如何保证数据的不丢不重。
故障情况大概分为如下几类:
① Broker 故障: 比如突然宕机,其实这个故障没有太大关系,因为 Kafka 本身有 Leader 切换能力,Leader 切换到其他 Broker 之后,会在新的 Broker 拉起异步线程写 Iceberg。它会从 Checkpoint Storage 中读取上一次 Checkpoint,从上一次的 Checkpoint 恢复这些数据去重新写操作。在一次 Checkpoint 数据向 Iceberg 的数据,因为是 committer 还没有进行第二阶段提交,对于 Iceberg 来说是不可见的,可以直接丢弃这些不可见的数据。
② Controller 故障: 在第一阶段提交的时候失败,会被自动切换到别的机器上面去再起一个 Commiter 线程,会发现第一阶段还没完成,那么会重新向所有 Broker 发起一轮新的 RPC 请求,重新做一次 Checkpoint,这一次其它 Broker 在接受到 RPC 请求之后会发现不需要做 flush 操作,就会立刻返回 ACK。在收到所有 ACK 之后,会重新做一次第一阶段提交;第一阶段提交之后成功了,但是在第二阶段提交的时候失败了,那么 Controller 切换到另外的一个机器首先会去 Checkpoint Storage 中查询,如果第一阶段提交信息已经存在就会直接发起第二阶段提交工作。
③ Object Store 故障/HMS 故障: 我们会做一个无限重试,并且将一些告警信息发送出来。
三、流批统一存储应用实践
流批统一存储在公司内部落地之后,可以解决一些 Lambda 架构带来的问题,下面将从四个方面来介绍。
在流批一体之前,开发同学去检索 Kafka 数据比较复杂,如左图显示:第一步需要去申请一个 topic,按照需要写数仓作业;第二步找 DBA 申请一个 OLAP 表;第三步再去写 Flink JOB 去解析 topic 数据写到刚刚申请的 OLAP 表中,这个表纯粹是用来查询和排障,整个链路比较长。在使用流批一体之后,开发同学申请一个 Topic,然后往 Topic 中写作业,这个时候开发同学可以直接查询流批统一存储。
流批统一的存储,可作为数仓 ODS 层,建设下游链路。因为流批统一存储是 Excatly-once 语义,所以可以做到实时和离线存储完全匹配,可以避免双链路带来的数据不一致问题。
3、批量分区回刷,提升Backfill效率
结合 Flink 提供的流批统一的计算能力,同时从批存储和流存储回刷数据,极大提升回刷性能。与 Kafka 相比,批存储提供更长的数据生命周期,数据 SLA 更有保障。
利用 StarRocks 良好的湖上分析能力,充分发挥向量化引擎和 CBO 优势,在统一的计算引擎上实现多业务多维分析。例如用户行为分析、用户画像、自助报表、跨域分析等多种分析场景,都可以在一站式平台上去完成。
本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载者并注明出处:https://www.jmbhsh.com/baobaofuzhuang/36616.html