什么是 Apache Doris?简单来说,Doris 是一款基于 MPP 架构的高性能实时的分析型数据库。
下图是 Doris 的发展历程。最早可以追溯到 2013 年。
它是百度内部自研的一个多维分析平台。经过了几年在百度内部大规模的应用和实践,2017 年的时候,正式开源到 Github 上。在 2018 年 Doris 进入到 Apache 孵化器,在孵化的过程中,不断发展社区,培养用户和开发者。到目前为止,在孵化期内发布了七个重要的版本,每月的活跃开发者接近一百位。在 2022 年,Doris 从 Apache 孵化器毕业,成为一个顶级项目。成为顶级项目之后,我们也快速的推动社区的发展。在 2022 年 12 月份发布了 Doris 1.2.0 版本。这一版本中有很多重要功能更新。其中很重要的一部分就是对数据服务能力的优化和支持。这也是本次分享的重点。
下图中可以看到 Doris 在整个数据流中的定位:
它的上游有一些 OLTP 系统,日志系统,埋点系统。经过一些流处理或者说批处理,比如 Sparquet,Hive,Flink 等等。经过加工和处理之后,会把处理后的结构化数据存储在 Doris 中。Doris 本身是一个拥有完备 MPP 架构的数据仓库。它可以直接对外提供报表分析、信息查询,以及数据库分析等功能。同时它也可以作为一个 SQL 引擎,对外部的数据源进行查询加速,包括 Hive ,Iceberg,Hudi 等等。也支持 MySQL,ElectricSearch 等外部数据源。同时,也提供了官方的 Flink Connector 和 Spark Connector。用户可以通过这两类 Connector,方便的去把 Doris 存储中的数据和其它数据源的数据进行联邦分析查询,保证 Doris 最终的数据不会成为数据孤岛。
这就是 Doris 在整个数据流中的定位,以及它是如何在企业数据流中发挥价值的。
接下来进入本文的重点,也就是 Doris 的数据湖分析内幕。
先来讲一下什么叫 数据湖分析 。其实 Doris 本身是一套完备的数据库管理系统,包括查询层和存储层。在我们正常使用 Doris 的时候,只需要把数据灌入到 Doris 中来,就可以在 Doris 内部对数据进行管理、存储和查询,我们叫做内置数据存储(Internal Storage)或者说自管理的数据存储(Self-Managed Storage)。在实际业务应用中,还会有大量数据存储在外部数据源中的,比如可能有很多历史数据,本身已经存在 Hive 系统中,或者是最近比较火的 Iceberg、Hudi 等数据湖格式中。如果用户要把这些系统中的数据通过导入操作导入到 Doris 中来,代价是非常大的,因为数据量可能是 TB 甚至 PB 级别的。把这些数据进行一次清理加工的计算量和存储开销都是非常大的。所以很多用户希望借助 Doris 的高速查询能力,直接对外部数据源的数据进行分析。这也是 Doris 数据湖分析、或者外部数据源加速分析的一个初衷。
在早期的 1.0.0 版本中,Doris 就已经支持了对外部数据源的一些访问,比如对 Hive 外表的创建,对 Iceberg 外表的创建,或者对 MySQL 外表的创建。但是在老版本中创建这些外部数据源的映射时,只能通过表级别的映射。这会带来一个问题,比如这些表可能多达几十上百张,甚至是上千张,如果采用这种方式,用户需要对每一张表通过 create external table 这种方式去单独地建立一个映射关系,这是一个很费时费力的工作。
所以在 Doris 的新版本中,通过引入 Catalog 的概念,简化这一操作,让用户可以通过一行命令就能快速开始对外部数据进行分析。
Catalog 是标准 SQL 定义中的三个层级之一,就是 Catalog-Database-Table。我们将 Catalog 分为两大类,一类是 Internal Catalog,另一类是 External Catalog。其中Internal Catalog是管理 Doris 的内部表。External Catalog可以直接映射到一个数据源。比如一个 Hive 集群、 ES 集群、 MySQL 数据库等等。通过数据源映射,Doris 内部会自动的把外部的>
下图是新版本中架构变动的 Metadata 全景图:
ernal Catalog 现已支持几种主要数据源和元数据中心。第一类就是 Hive Metastore 或者是兼容 Hive Metastore 的元数据中心。比如云上的 AWS Glue、阿里云的>第二个架构变动就是数据访问上的功能统一。
Doris 是一个极速 OLAP 数据库,拥有性能优异的分布式查询引擎。我们希望对外部数据源的查询加速,能够充分利用现有查询引擎的优势。在查询引擎中,上层计算节点的算子的优化,比如 join 的优化,聚合节点的优化、scan 调度的优化等等,与数据源本身是无关的,它本身是查询层的一些优化。所以我们对查询层进行了代码结构的重构,把一些公共部分提取出来,这些公共部分可以帮助我们去利用 Doris 完备的极速向量化引擎、基于代价的查询优化器、以及各类算子的优化。同时,也重构了底层的 scan 任务的调度,如 scan 的并发度、CPU 时间分片的调度等,以确保这些功能能够被内部数据和外部数据源共同使用。
在做完这些架构调整之后,对于外表查询或者数据湖上的数据查询,开发者只需要关注数据源自身的一些访问特性。比如对于 Hive 表查询,我们可以实现一个 FileScanNode 的,专注于对远端存储上的文件的扫描优化。对于特定的数据格式,我们只需要实现对于特定数据格式的 format reader。如此一来,开发者在接入一个数据源时,只需专注于处理数据,扫描相关的一些优化和数据源访问的一些特性的优化,而不需要去关心整个查询层的优化措施。通过这个架构调整,对一个新的数据源接入,只需要大概一周的时间就可以完成,同时可以利用到所有已经存在的优化能力去加速数据源的查询。
接下来看一下数据源访问的整体流程:
熟悉 Doris 的同学都知道,Doris 分为两个部分:FE 节点和 BE 节点。FE 节点是 java 写的,主要负责用户请求的接入,元数据管理,查询查询计划生成;BE 节点是 C++ 写的,负责数据的存储和查询计划的执行,它是一个高性能的分布式查询执行进程。
以 Hive 为例,当我们通过 Doris 去查询一张 Hive 表的时候,首先用户请求进入到 FE,第一步是通过 Hive Metastore 去获取 table 的 schema,接着获取 partition。获取到 partition ,FE 会根据 SQL 中的分区的值的谓词条件对分区进行裁剪,得到最终的分区列表。拿到分区列表,再去访问 Hive Metastore 去获取分区所对应的文件列表。获取到文件列表,第五步就是在 FE 中对文件扫描任务进行拆分,均匀分布到所有的 BE 节点上,保证一个查询任务,可以充分的利用整个集群的计算资源进行数据查询。任务分配完会下发给 BE,BE 的逻辑就比较简单,只需要对指定文件进行扫描、过滤和读取。第七步,它会直接去访问 HDFS 或者 S3 上的数据,进行数据的读取。接下来上层会有一些中文节点,agg 节点,join 节点等等的一些查询执行。最终把它的结果返回给用户。这就是 Doris 通过 Hive Metastore 去查询 Hive 外表的整体流程。
接下来介绍一下在整个流程中 Doris 有哪些优化。
第一点优化就是对元数据和数据访问的优化。一些表的元数据信息是非常大的,比如一张 Hive 表可能有十万个分区,如果把十万个分区信息在一开始的时候就全都同步到的 FE 节点,对 FE 节点内存压力会非常大。因为 Doris 中所有的元数据都是在内存存放的,如果把这些外部数据源的信息全都一次性同步过来的话,FE 的元数据压力会非常大。所以我们在 FE 上做了多种类型的 cache。
第一种是schema cache,对于外表的所有列信息的 cache。这些 cache 全都是按需加载的。比如我们有一千张表,只需要访问到其中的一张表的时候,我们才会把这张表的 schema 缓存到 FE 的缓存集中。这样可以保证内存中只有需要用到的 schema。
第二个是partition value cache。当查询一个外表时需要对分区进行裁剪。分区裁剪只需使用分区值。所以我们单独实现了一个 partition value cache 专门去缓存分区值的信息,用于分区裁剪。分区值的内存空间占用是非常少的。通过分区值裁剪,可以得到最终需要访问的分区列表。
当得到分区列表,就进入到第三步,即需要访问 partition cache 去拿到完整的分区信息。拿到这些信息,我们就来到第四步,就是 file cache。一个分区下面会有多个文件。我们拿到了分区的位置信息,就可以通过 file cache,去获取这个分区下的所有的文件的信息(文件路径)。拿到 文件路径后,我们在 FE 中会做任务的拆分。最终会把这些文件列表拆分,发给 BE。
BE 节点负责文件的读取和访问。这里我们也实现了两个大类的缓存的优化。
第一个是数据预读(prefetch buffer),在访问 HDFS 和 S3 时,本质是一个 RPC 请求。第一个优化点就是如何能够尽量减少 RPC 的开销。一次 RPC 开销本身的 overhead 很重。所以我们增加了一个预取缓存,把多个小的 Remote IO 请求合并成一个大的 IO 请求,把原先可能几个字节的请求,合并成 4KB 到 1MB 的数据请求一次性读取过来,在本地内存中形成一个缓存。后续的 IO 可以直接在内存缓存中去获取数据,极大的减少 remote IO 的次数。
第二个是文件块级别的缓存(file block cache)。在访问 HDFS 或者 S3 上的数据文件时,可能只需访问其中的一小部分。比如一个列存格式的 parquet 文件,如果只需要访问其中的三列数据,那么只会读取整个文件的一部分,Doris 会在第一次文件读取时,将读取的文件块缓存到本地磁盘。缓存文件的文件 名是文件的路径,加上读取偏移量的组合标识。之后如果有相同的文件访问,会先查看本地是否已经有缓存的数据文件。如果有,则直接去读本地文件,减少访问远端数据的开销。通过 file block cache,可以极大地提升一些热数据的访问效率。
第二个优化点就是native 的 file format reader。以 parquet 为例,在老版本的 Doris,是通过 Apache arrow 库内置的 parquet reader 完成读取的。这个 Reader 的实现会有一些额外的开销。比如它会多一层内存格式的转换。因为它在读取的时候,首先需要把远端的文件转换成内部的 arrow 的格式。然后再把 arrow 的格式转换成 Doris 的内部内存格式。第二是 Apache arrow 内置的 parquet reader 对一些新的 Parquet 特性是不支持的,比如不支持 page index、不支持 parquet 的 bloom filter 的读取、不支持这种更精细的字典编码的优化等等。
基于以上考虑,我们在 Doris 内部实现了一个 native 的 C++ 的 parquet reader。首先是能直接转换内部存储格式,对于读取到的数据,直接转为内部内存格式,减少一次内存格式的拷贝和转换开销;第二点,我们能够利用 bloom filter 进行更精确的数据过滤。用户写数据的时候,对某一列使用的 bloom filter,可以利用 bloom filter 去对数据进行过滤。其次我们也支持了基于字典编码的谓词过滤,可以把谓词下推到 parquet reader 中。把谓词中的,比如 “a=‘北京’” 这样的一个条件改成一个对应字典编码的值。比如 “a=‘100’”,后续用 ‘100’ 在文件内部进行数据过滤。因为整数型的过滤效果是比字符串的过滤效果要好很多的。过滤完了,在最终返回结果的时候,我们再把字典编码值转换成真正的数据的值。这样来达到加速的效果。
最后一点也是非常重要的一点,就是在 Parquet Reader 上支持了延迟物化。延迟物化是访问远端存储的时候,减少 IO 的一个非常重要的特性。尤其是在带谓词条件的宽表查询上。简单来说,Doris 会优先读取带谓词条件的列。读取完这些列,我们先对这些列进行过滤得到最终过滤后的行号集合,再去读取剩余的其他的列。这样就能保证剩余其他列都是只会去读取过滤后的数据。从而极大地减少从远端去读取数据的 IO 开销。
第三点 就是弹性计算节点(compute node)。当我们去访问外部数据源的时候,Doris 本身是不会去存储这些数据的,所以不需要 BE 节点本身的存储能力。一旦我们不再需要 BE 的存储能力,它就变成了一个无状态的节点。当一个节点是有状态的,删除节点或者添加节点时都要考虑到数据如何安全下线,数据如何迁移,重新 rebalance。而一个无状态节点可以非常方便的进行弹性扩缩容。所以我们在新的版本中给 BE 节点增加了两种类型:
第一种类型是 mix node,mix 就是标准的 BE 类型。既支持数据计算,也支持本地的文件存储;第二种类型叫 compute node,即计算节点,计算节点可以很方便的进行弹性伸缩。比如可以很快速地在新机器或者云上创建一些新的 compute node。这些 compute node 可以分担访问远端存储的一些计算的开销。通过这种无状态的 BE 节点,可以快速去承接外部数据源的计算负载。来达到弹性伸缩的目的。
下图是我们在版本发布之初做的一个测试。
可以看到在同一资源规格下,我们去查询 Iceberg TPCH 100G 的数据集。相比 Trino,Doris 有三到五倍的性能提升。
最后看一下当前 Doris 对数据湖的一些功能的支持程度:
is 支持三个主流的外部数据服务或者数据仓库。第一个就是Hive,支持 Managed table 和 External table。支持 parquet、orc 和 text 格式的读取。Iceberg完整的支持 V1 Format,V2 支持了 position delete。Hudi暂时只支持 MOR 的表,包括 COW Snapshot Query 以及 MOR Read Optimized Query。
三、Doris 社区发展以及后期开发规划
最后介绍一下我们在数据湖分析这块的一些规划。
第一个规划就是增量数据访问。增量数据也是 Iceberg,Hudi 这类新兴的数据库系统所提供的核心价值之一。它可以应用于 A/B Test,或者是用其 Time Travel 的能力、CDC 的能力来进行增量数据的处理和访问。Doris 在后续也要对这一类的功能进行支持。其次就是基于增量数据的视图查询。比如我们会通过基于增量数据的多表的物化视图的更新,或者逻辑视图的权限控制等等,来帮助用户很好的去管理数据湖上的数据,并且能够对数据进行很精细的访问。
第二点就是数据湖写入能力。刚才我们介绍这些功能时候,其实都是在介绍如何去访问和读取这些外部数据源的能力。如果用户想完整的访问管理这些数据源的话,必须在外部对接例如 Spark 这些系统进行数据写入。所以我们后续希望在 Doris 内部提供统一的操作入口,来消除用户操作数据的割裂感,来保证对数据库的写入操作和查询操作,都可以统一在 Doris 中完成。
最后一点是深入集成 Iceberg 的能力。希望以 Doris 本身作为 Iceberg 的元数据中心来提供托管 Iceberg 的能力,提升自身对于数据湖,或者说是对结构化、半结构化大规模数据的管理能力。
以上就是对 Doris 数据湖的一些介绍。最后简单介绍一下 Doris 社区现状和未来规划。
Apache Doris 是国内最活跃的开源社区之一。
累计贡献者人数已经超过了四百位,平均每月的活跃用户贡献者人数也超过了一百人。可以看到我们每个月所提交的 commit 量和 push 量都是相当可观的,发展也是非常快速的。也欢迎对分布式数据库或者对 MPP、OLAP 数据库感兴趣的同学加入到社区中来,我们可以一起去完善这样的一个数据库系统。
下图是 Doris 在 2022 年底到 2023 年初的一个大致规划:
首先我们在 2022 年的 Q4 季度,发布了 1.2.0 版本。在该版本中,实现了多元数据目录,其中就包括数据分析这部分的一些能力;其次我们还加入了半结构化数据的一些支持,包括 Array 和 Binary Json 格式的支持;我们也支持了新的 unique 模型,可以帮助用户在可变更的或者可更新的数据中依然能进行快速的数据访问;其次我们还支持了包括 JDBC 的 External table,以及 Java UDF 等一些新的特性。欢迎大家到官网去体验。
在 2023 年 Q1,我们会发布新的优化器的 preview 版本。新的优化器完全重构了现有的优化器的框架。实现了一个可插拔可扩展 CPU 的查询优化器。可以帮助用户解决复杂 SQL 的获取 best plan 的问题;其次我们也会发布 Pipeline 执行引擎的 preview 版本,使 Doris 能够更细力度的去规划 BE 节点的执行资源。保证 BE 节点可以充分利用我们的单机多核的特性,并且用户不再需要手动去设置查询并发度等等。比如在闲时,利用更多的 CPU 资源;在忙时,可以进行大小查询,这种动态的资源隔离。前文提到的 compute node,在 Q1 季度会发布完整的 release 版本。还有 Vertical conduction,解决大宽表场景下的后台数据merge的内存开销问题。
在 2023 年 Q2,会发布 2.0.0 版本。除了刚才提到的优化器和 Pipeline 达到 GA 状态以外,还会有一些新的特性,比如半结构化数据的一些查询,存算分离的一些架构演进等等。希望在未来的一年能够继续给我们的用户提供更便捷、统一的分析型数据库。
Q1:Doris 通过连接外部的 Hive,能不能自动监控 Hive 的表结构或数据的变化?
A1:现在提供了手动的 refresh,可以手动 refresh Catalog 级别,DB 级别,table 级别和 partition 级别。最新的 1.2.2 版本,也支持通过 Hive Metastore 的 Hook 机制来自动监听 Hive 的元数据变动,达到一个增量的元数据同步的效果。
Q2:Doris 和 Flink 的对接方式推荐哪种?
A2:建议用 Doris 官方提供的 Flink connector。在我们的官方库上可以找到对应的代码库下载链接和发布版本。
Q3:Doris 读对象存储数据湖对性能和时延的影响会怎么样?
A3:在之前也讲了 Doris 的一些优化点,包括它的 read,消除小的 IO,本地的 file block cache 等等。做这些功能的出发点都是为了尽量避免访问远端存储,以及避免大量的小 IO 远端访问。我们做这些优化的初衷,都是为了能够尽量的去把大块数据一次性的读过来,然后在本地进行处理。据我们的测试情况来看,经过我们的这些优化,Doris 对热数据的访问,几乎是可以达到和本地表一样的访问性能。
Q4:Doris 怎么处理高并发的请求?
A4:关于高并发请求,可以分为两部分,第一部分要解决单一查询所占用的资源开销的问题。比如一个查询需要发送到一百台机器去查询,它的扇出特别大,并发是肯定很低的。所以我们会通过分区裁剪,分桶裁剪等,尽量把一个查询限制在某一台机器上,甚至是某一个数据分片上。这样单个查询的资源开销足够的小,那整个集群的整体的并发支持就会很高。第二,如果是在数据湖上的这种高频发查询的话,其实本质上还是会回归到关于远端存储 IO 的一些问题上来。也就是尽量去减少小 IO 的远端查询或者通过缓存来解决热数据查询,因为 remote IO 的 overhead 是没法彻底根除的。它跟远端存储的网络,访问的特性都有关系。所以说本质上还是要通过一些 cache 和 buffer 特性来去消除这些远端存储的 IO 的次数以达到一个高并
本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载者并注明出处:https://www.jmbhsh.com/shenghuokepu/36632.html