一、增量更新和处理架构
1、设计增量更新架构的背景
当前数据业务场景日趋复杂,对于时效性要求低的单一全量数据处理场景,MaxCompute可以较好地满足需求。时效性要求很高的秒级实时数据处理或者流处理,需要使用实时系统、流系统来满足需求。
但对于大部份业务场景,通常并不要求秒级数据更新可见,更多的是分钟级或者小时级的增量数据处理场景,同时也会有海量数据的批处理场景。
对于此类业务场景,使用单一引擎或联邦多引擎都会存在一些劣势。如图所示,如果使用单一的 MaxCompute 离线批量处理链路,分钟级的数据和全量数据做处理和存储,会存在冗余的计算和存储成本,时效性也不能较好地得到满足。但如果单纯使用实时系统,资源消耗成本比较高,性价比较低,在处理大规模批处理时稳定性也不足。
因此,一般综合解决方案会采用Lambda架构去支持大数据的复杂业务,全量批处理使用 MaxCompute 链路,时效性要求高的使用增量处理实时链路。但该架构也存在大家所熟知的问题,如图中所示。
2、MaxCompute近实时增量更新和处理一体化业务架构实践
如图所示,对于各种数据源,我们提供丰富的数据源接入工具,来支持近实时的增量导入和离线批量数据的导入,内部使用统一的数据存储和优化服务管理数据,统一的计算引擎支持近实时增量处理链路和大规模离线批处理的整体链路,统一的元数据服务支撑事务和文件的元数据管理。该一体化整体架构优势显著,可解决Lambda的一系列问题,如节省了冗余的数据存储成本以及不同系统间的数据迁移成本,消除了多套系统处理差异导致的数据不一致问题,相对于单独使用实时/流处理的方式,性价比更高;并且既可以满足增量处理链路的时效性,也能满足批处理的高效性。
此外,该套架构还提供了upsert、timetravel等一系列实用的功能,扩展整体的业务场景,节省用户的资源成本,并提升用户体验。
3、MaxCompute近实时增量更新和处理整体技术架构
如图所示为MaxCompute近实时增量更新和处理的整体技术架构,主要分为五个模块进行改造,包括数据接入、计算引擎、数据优化服务,元数据管理,数据文件组织层;其它部分可直接复用 MaxCompute 已有的技术架构和实现。
为了支持增量更新,我们设计了一种新的表类型-Transactional Table 2.0(简称TT2)。对于建表操作只需在普通表基础上额外设置主键primary key (PK),以及表属性 Transactional 为 true 即可,无需其他额外配置。
其中,PK用于支持 upsert功能, PK值相同的多行记录最终会 merge合并成一行,以满足主键唯一性的约束;Transactional表属性表示支持 ACID属性及事务机制。
如图所示,TT2支持多种数据文件格式,主要支持Base File和Delta File。Delta File表示每次事务写入的增量数据文件,会保持每一行数据的中间状态,用于满足近实时的增量查询需求;Base File是由Delta File进行Compaction合并生成,不会保留中间状态,采用了列式压缩存储的格式,用户支持高效的全量数据查询场景。
为了进一步优化读写效率,TT2支持按照Bucket Index 对数据进行分桶存储,bucket 数量可通过配置表属性write.bucket.num指定;分桶后,大部分数据操作如数据写入、重排、优化等,可以按照bucket粒度进行并发处理,如果对bucket数据列查询过滤,可进行Bucket级别裁剪优化,提升查询的效率。
下面将会介绍如何将数据写入TT2表,主要分为批量写入和近实时写入;这里先描述如何设计高并发的近实时增量写入场景。
我们定制开发了 Flink Connector,以及Dataworks 数据集成及其他工具,都可以实现数据的增量写入,这些工具内部会调用MaxCompute的Tunnel通道服务提供的客户端SDK,数据便可以以分钟级并发写入存储。
数据写入接口,目前仅支持upsert和delete两种格式,未来会进行扩展;upsert 包含 insert/update 两种隐含语义,如数据行不存在就代表 insert,如已存在就代表 update。commit 接口代表原子提交这段时间写入的数据,如返回成功,则写入数据查询可见;如返回失败,则数据不可见,数据需要重写或重试。该操作满足Read Commit隔离级别。
下面将介绍批量写入。批量导入主要通过 SQL 进行操作。为了方便用户操作,我们实现了整套的DML语法。SQL 引擎内核模块包括 Compiler、Optimizer、Runtime 等都做了大量改造开发来支持新架构的功能,如针对 pk 列的去重操作,runtime 构造 upsert 格式数据的写入,以及并发写入等等。此处还涉及到DML操作过程中,Meta服务需要完整的事务机制来保证读写隔离、事务冲突检测等操作。
由于TT2事务表需要支持分钟级近实时增量数据导入,有可能会产生大量小文件,导致存储访问压力大、数据读写 IO 效率低、分析效率低等问题。因此,我们开发了Clustering服务来解决小文件合并的问题,该服务由之前提到的Storage Service来执行。
由左图所示可以了解Clustering 服务的整体操作过程。在t1到t9的时间段,可见产生了大量的delta文件,Clustering会周期性地分析数据文件情况,如果满足触发条件,就会以Bucket为粒度,并发地执行合并操作,为了满足不同业务场景的需求,合并的策略比较丰富,比如根据文件大小、数量、时序相关的多个维度,并按照不同层次进行合并,此外,对于超过一定大小的文件,做了一些优化,不会对其进行合并。
TT2还会写入upsert和delete格式的数据,可能会造成中间状态的冗余记录比较多,计算成本高且处理效率低下,因此我们设计了Compaction操作,对所有记录进行merge合并,消除上述中间状态。Compaction操作由Storage Service负责执行,即支持手动触发,也可以按照时间频率自动触发。
结合上图可大概了解 Compaction 服务的整体操作流程。t1 到 t3 时间段,一些 delta files 写入进来,触发 compaction 操作,同样会以 bucket 粒度并发执行,把所有的 delta files 进行 merge,然后生成新的 base file。之后 t4 和 t6 时间段,又写入了一批新的 delta files,再触发 compaction 操作,会把当前存在的 base file 和新增的 delta files 一起做 merge 操作,重新生成一个新的 base file。该过程会迭代进行,因此base文件可以实现加速全量快照查询的目的。
此处Timetravel 查询,主要用来查询历史版本的数据,主要用于有数据历史状态回溯需求的业务场景,或数据出错时恢复历史状态数据进行数据校验等。
通过一个简单的case进行讲解,例如上面创建了一张表,包含一些pk 列和val 列。左边图展示了数据变化过程,在 t2 和 t4 时刻分别执行了compaction操作,生成了两个base文件: b1和b2。
在t1时刻,只需读取 delta file (d1) 进行输出;如果用户查询 t2 时刻,当时通过Compaction生成了b1这个base文件,只需读取 base文件并输出对应记录即可。base文件会对d1和d2两个文件合并,生成了三条记录,消除了2a这个中间记录。如查询 t3 时刻,就会包含 base file ( b1) 加上 delta file (d3) 进行Merge合并输出,后续时刻的查询过程同上,不再赘述。
因此可以看出,Timetravel会找到要查询的历史版本前最新的base文件,以及后续的delta文件,一起进行Merge输出。对于base文件主要用于提高查询效率,用户可以根据自己的业务场景选择合适的频率进行Compaction操作。由于Compaction操作本身也会占用一定的存储和计算,因此不能盲目频繁地执行。
下面的表格是一个增量查询的场景,主要用于业务的近实时增量处理链路。查询的时间范围是一个左开右闭的区间,即 begin 是一个开区间,必须大于它,end 是一个闭区间。
如 begin 是 t1-1,end 是 t1,只读取 t1 时间段对应的 delta file (d1) 进行输出,如果 end 是 t2,会读取两个 delta files (d1 和 d2);如果 begin 是 t1,end 是 t5,即查询的时间范围为 [t2, t5],会查询所有的delta文件,即d2,d3,d4,d5,进行合并输出。这便是增量查询和Timetravel查询的区别。
此外,增量查询对一些专门的场景进行优化,例如Clustering合并小文件,从语义上对已有数据记录进行合并,因此增量查询时不会作为新增的数据查询出来。
作为一个新设计的架构,MaxCompute 会尽量去覆盖HUDI / Iceberg + Spark/Presto整体数据湖解决方案的业务场景,有助于有类似业务需求的用户进行数据和业务链路迁移。此外,MaxCompute 离线近实时增量处理一体化架构还具备一些独特的亮点:
使用过SQL的人基本都对物化视图有大概了解,其实就是将逻辑视图的结果物化下来,本质上就是存储数据的物理表。其作用主要是把耗时操作的计算结果保存下来,避免重复计算,从而达到整体的查询加速的目的。MaxCompute的物化视图也经历了一系列的演进过程。一开始我们就支持了比较丰富的SQL语法功能,比如聚簇,分区等。
对于分区物化视图,类似于分区表,数据是通过分区的粒度进行存储和管理的。在实际场景中,物化视图的分区和源表的分区不一定保持一致,例如源表增加新的分区,物化视图可能还没来得及更新,或者只更新部分分区的场景。如果用户要查询指定的分区,但物化视图只存了部分历史分区数据,MaxCompute支持了分区穿透的功能来优化此场景的查询。对于物化视图存在的分区,可以从物化视图中查询,对于物化视图不存在的分区,直接从源表中穿透读取。这样就可以利用物化视图的结果,还能保证结果和源表一致。
此外最普遍的场景就是计算逻辑和物化视图表达式计算逻辑相似,语义的输出结果是物化视图的子集。为了充分利用好物化视图的结果,支持在物化视图的数据集上,对数据进一步加工,获得用户查询的结果。例如查找值大于10的数据 ,在改写后便可以直接从物化视图中直接增加一个过滤条件>10,便可以搜索出大于10的结果,避免了查询源表全量数据的过程,查询改写功能可有效提升查询性能,降低资源消耗。对于图中展示的查询改写例子比较简单,MaxCompute已支持非常丰富的复杂操作,比如aggregate、join等,只要表达式等效,或查询的结果集是物化视图的子集,能够转换成对应的表达式,都可以进行改写。
由于源表和物化视图的数据存储在不同地方。当源表发生更新,但物化视图没有更新时,SQL查询无法利用物化视图的结果。需要整体回退,查询源表。为了更好地提高查询效率,我们在语法上支持定时触发操作,在一定的时间范围内保证物化视图和源表数据基本保持一致。
2、物化视图智能推荐机制
为了使用物化视图,用户需要非常了解物化视图的概念,运行原理,以及业务情况,才能达到较好的使用效果。但很多场景中,公司业务较为复杂,个人无法从全局了解公司的业务情况,因此无法从查询最优的角度来创建高效的物化视图。此外,用户对于创建物化视图前后的资源消耗情况,也难以评估。
为了加大物化视图的使用场景和推广,降低整体物化视图的使用门槛,MaxCompute引擎支持自动化地分析用户业务历史作业的运行情况,根据合理的策略筛选出效果比较好的物化视图,上图为简单的智能物化视图机制的运行原理。首先,引擎会对所有作业进行分析,抽取出所有符合要求的子表达式, 实际策略上会尽可能选择包含aggregate和join的子表达式做物化视图,最终查询优化的整体效果会更好。
其次,会对所有符合要求的子表达式进行格式统一的归一化处理。例如将所有算子的顺序进行排布整理等,随后会对归一化符合要求的子表达式进行合并,生成一些新的公共表达式,从而扩展应用场景。
最后,对所有筛选生成的表达式的执行效果进行评估,给出哪些表达式适合作为物化视图的候选。此处需要获取物化视图计算时需要的CPU、内存、存储等信息,从而做出相对准确的对比评估。
最后,会根据公共表达式的使用频率和执行占用的资源效果,整体评估物化视图优化应用的效果,按顺序展现给所有用户候选的公共表达式列表,因此即使是小白用户,也可以无脑的选择推荐排名靠前的物化视图进行使用和验证,可大大减少资源消耗,同时可以提高用户的业务性能。
该功能在MaxCompute公共云已经上线,效果非常好,预计可以节省14%的CU资源。
对比Spark到3.0版本才支持Adaptive执行框架,MaxCompute的SQL引擎一开始的定位就是多层次和多维度的动态Adaptive执行计算优化。
以图中所述的执行聚合聚合操作的SQL为例,SQL Optimizer模块会根据Compiler解析的SQL语法树,根据静态的Table或者分区级别的Stats信息结合RBO/CBO/HBO计算出执行代价较低的执行Plan,提交给Job Master执行,调度Runtime Worker进行数据计算处理。Runtime内部也会根据上游Worker的输入数据Stats进行Plan优化调整,此外,运行中的算子会根据实时流入的数据特征,动态切换最合适的算法进行计算。
同时,Job Master也会不断收集operator、work级别统计的数据Stats,回传给Job Master做一些汇总和分析,进一步做Stage级别的动态优化调整,比如并发度的调整等。
此外,在运行时,Job Master还会把Stage级别的数据Stats回传给Optimizer,它会根据这些实时Stats对还未执行的Plan重新进行优化,然后把新的Job Plan再次提交给Job Master继续执行。
由上述流程可知,MaxCompute的SQL引擎可以自适应地根据多维度的Stats执行多层次的Adaptive优化,这可以充分发挥和协调各个模块的能力。后面将会简单介绍下对每个层次的优化实践。
假如用户执行SQL: select * from t1 join t2 where t1.a=t2.b,上图展示一种Plan级别DAG的动态调整示例。对于Join的分布式实现,主要分为Shuffle Join和Map Join两种实现,Shuffle Join如左侧的Plan A所示,左右两张表都要进行一次Shuffle操作,主要用于左右表都很大的场景。对于右侧Plan B,会把右表的所有记录挪到左表的所有map实例中,避免了左表的Shuffle操作,适用于左右表一个很大一个很小的场景。
但优化器在执行的过程中,无法感知右表的大小,所以无法事先决定采用哪种join实现。针对这种场景,我们同时生成plan A和plan B两种计划,并把它们同时传给Job Master,Job Master会先执行右表,获取到右表的数据总size后,再决定采用plan A还是plan B,在Plan B生效的场景下,相对Plan A通常可极大节省大表的shuffle开销,提升几倍的性能。
下面介绍Stage级别的动态调整的两个场景。一个典型的场景就是Stage并发度的调整。当上游Stage完成之后,会按照预先设置的并发度计算出下游实例应该处理的数据量,如图所示,便可以动态调整并发数。size较小的实例可以进行合并,size较大的实例可以进行拆分,均匀分布每个新实例的处理量,从而避免长尾和资源碰撞,使整体资源使用价值最大化。
另一个是Shuffle Join Worker动态调整的场景。在运行的过程中,如果发现有些表的数据实例发生严重倾斜,大概率会出现长尾问题,引擎会动态将其拆分为n个实例, 比如图中左表数据量为60这个实例,会被拆分成3个数据量20的实例并发执行,另外一个实例会把数据broadcast分发到左表的三个实例中并发进行Join操作,从而避免长尾,缩短整体运行时间。
下面将介绍Worker内DAG执行的动态调整,如图所示是一个Shuffle Join的实例。在开始运行时,可以根据从上游Worker获取到左右表实例的size来决定走Hash Join还是Merge Join,如果符合Hash Join的DAG执行,就可以避免大表排序时发生大量的Spill落盘的操作,节省大量的IO资源,从而可以提升运行速度。
最后,worker内部的具体某个算子其实也可以动态执行。运行时会根据实时数据特征来Adaptive选择不同的算法进行执行。对于Partial Hash Aggregator,可以根据实时的聚合效果决定是否持续进行聚合;对于排序可以先拿一些数据样本做一下预排序,根据排序效果决定采用哪种排序处理后续的数据;压缩方面,也可以根据压缩效果决定是否压缩等。
物化视图和物理表有什么区别?
物化视图本质上可以理解为也是一张物理表,只不过多了一个源表的关联信息,源表更新时,物化视图需要同步更新。另外,物化视图支持智能推荐,也可用于预计算的cache,用户无需感知物化视图存在,在查询SQL时,如果对部分计算做了cache,可以直接从cache中读取数据,来避免重复耗时的计算,在具体存储上二者并没有太大的区别。
物化视图有没有设置过期时间的考虑?
物化视图会有一个生命周期,超过生命周期,物化视图也会被删除。
Hash Join和Merge Join有什么优劣,实际场景应该如何选择?
这其实是分布式中的一个典型场景,对于Hash Join在具体实现上其实分两种,一种是我们说的Map Join,Map Join会把小表全部广播到大表侧的每一个实例上,这样大表侧就无需做数据分布,可以直接从源表中读出一部分数据,跟broadcast过来的小表做一个Hash join进行输出即可,这样可以避免大表侧的shuffle数据重排操作。
Hash Join还有另外一种场景,也就是Shuffle Join,就是大表和小表同时做shuffle。在每个具体实例上,数据可以选择走Hash Join,还是走Merge Join,二者是存在算法上的不同。Hash Join是选择一个小表构建Hash表,大表会直接通过lookup进行输出,不涉及任何排序操作,只要内存中能放下小表即可,效率比较高。对于Merge Join,左右两张表都比较大的场景,无法从内存中放下一个Hash表,可以对左右表的数据进行排序,排序完的数据通过有序的join就无需通过Hash方式,而是可以在内存中通过流式的方式去判断两个group是否为同一个key即可。
此外,还有一种场景,就是左右两边的数据本来就是有序的,比如一些Cluster表的数据。这样可以直接应用Merge Join,效果也会更高。所以本质上一是跟左右表的大小相关,另外也跟算法的效率相关。
本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载者并注明出处:https://www.jmbhsh.com/xinwenzixun/35003.html