1. 首页 > 百货 > 其他百货

阿里近实时增量处理技术架构解析

MaxCompute作为阿里云自研的海量大数据处理平台已经有十几年的发展历史,在规模和扩展性方面一直表现比较优秀。其依托阿里云飞天分布式操作系统,能够提供快速,完全托管的EB级数据仓库及数据湖解决方案,可经济高效的处理海量数据。目前,其承担着阿里集团绝大部分离线数据存储和计算力,是阿里云产品矩阵中最重要的自研核心平台之一。

MaxCompute发展之初,主要聚焦数仓方面的大数据处理业务场景,并且处理的数据源主要为格式化数据。随着数据处理场景的多样化和业界数据湖架构的兴起,加上阿里集团内部本身数据也非常多,支持多样化数据源也就成为了一个必选项。因此MaxCompute设计了完善的外表机制,可以读取存储在外部的多种格式的数据对象,例如Hadoop开源体系,OSS半结构化或非结构化数据,为此也尽可能设计开发统一的元数据处理架构,此阶段MaxCompute在湖仓一体化解决方案中迈出了重要一步,极大的扩展了数据处理的业务场景,有效的打破数据孤岛,联动各方面的数据进行综合分析来挖掘整体数据价值。但时效性不足,通常是T+1离线场景。

随着用户数和数据规模不断增加,很多业务场景也越加复杂,需要更加完善综合的整体解决方案。其中的关键环节之一就是数据需要更加高效的流转起来,为此MaxCompute进一步设计完善开放存储和计算架构,更好的去融合生态,让数据可流畅的进得来也出得去。此外,还有一个重要的业务场景是大规模批量处理和高时效高效率增量处理一体化解决方案,为简化用户数据处理链路,节省不同系统之间的数据迁移成本以及冗余计算和存储成本,我们设计开发了MaxCompute离线和近实时增量处理的一体化架构。总体来说,现阶段以及未来会基于统一的存储、统一的元数据、统一的计算引擎有效支撑湖仓一体的整体技术架构,让数据能够开放互通高效流转,并且计算和存储成本持续优化。

二、MaxCompute近实时增量处理技术架构简介

1、MaxCompte离线&近实时增量处理业务系统架构现状

随着当前数据处理的业务场景日趋复杂,对于时效性要求低的大规模数据全量批处理的场景,直接使用MaxCompute足以很好的满足业务需求,对于时效性要求很高的秒级实时数据处理或者流处理,则需要使用实时系统或流系统来满足需求。

但其实对于大部份业务场景,并不要求秒级数据更新可见,更多的是分钟级或者小时级的增量数据处理场景,并且叠加海量数据批处理场景。

对于这类业务场景的解决方案,如果使用单一的MaxCompute离线批量处理链路,为了计算的高效性,需要将用户各种复杂的一些链路和处理逻辑转化成T+1的批次处理,链路复杂度增加,也可能产生冗余的计算和存储成本,且时效性也较差。但如果使用单一的实时系统,资源消耗的成本比较高,性价比也较低,并且大规模数据批处理的稳定性也不足。因此当前比较典型的解决方案是Lambda架构,全量批处理使用MaxCompute链路,时效性要求比较高的增量处理使用实时系统链路,但该架构也存在大家所熟知的一些固有缺陷,比如多套处理和存储引擎引发的数据不一致问题,多份数据冗余存储和计算引入的额外成本,架构复杂以及开发周期长等。

针对这些问题近几年大数据开源生态也推出了各种解决方案,最流行的就是Spark/Flink/Presto开源数据处理引擎,深度集成开源数据湖Hudi、Delta Lake和Iceberg三剑客,来综合提供解决方案,解决Lamdba架构带来的一系列问题,而MaxCompute近一年自研开发的离线近实时增量处理一体化架构,同样是为了解决这些问题而设计,不仅仅具备分钟级的增全量数据读写以及数据处理的业务需求,也能提供Upsert,Timetravel等一系列实用功能,可大幅扩展业务场景,并且有效的节省数据计算,存储和迁移成本,切实提高用户体验。下文就将介绍该技术架构的一些典型的功能和设计。

2、MaxCompute近实时增量处理技术架构

MaxCompute近实时增量处理整体架构的设计改动主要集中在五个模块:数据接入、计算引擎、数据优化服务,元数据管理,数据文件组织。其他部份直接复用MaxCompute已有的架构和计算流程,比如数据的分布式存储直接集成了阿里云基础设施盘古服务。

1、统一的数据文件组织格式

要支持全量和增量处理一体化架构首先需要设计统一的表类型以及对应的数据组织格式,这里称为TransactionalTable2.0,简称TT2,基本可以支持普通表的所有功能,同时支持增量处理链路的新场景,包括timetravel查询、upsert操作等。

TT2要生效只需要在创建普通表时额外设置主键primary key(PK),以及表属性transactional为true即可。PK列用于支持Upsert链路功能,PK值相同的多行记录在查询或者Compaction会merge成一行数据,只保留最新状态。transactional属性则代表支持ACID事务机制,满足读写快照隔离,并且每行数据会绑定事务属性,比如事务timestamp,用来支持timetravel查询,过滤出正确数据版本的记录。此外TT2的tblproperties还可以设置其他的一些可选的表属性,比如write.bucket.num用来配置数据写入的并发度,acid.data.retain.hours用来配置历史数据的有效查询时间范围等。

TT2表数据文件存在多种组织格式用来支持丰富的读写场景。其中base file数据文件不保留Update/Delete中间状态,用来支撑全量批处理的读写效率,delta file增量数据文件会保存每行数据的中间状态,用于满足近实时增量读写需求。

为了进一步优化读写效率,TT2支持按照BucketIndex对数据进行切分存储,BucketIndex数据列默认复用PK列,bucket数量可通过配置表属性write.bucket.num指定,数据写入的高并发可通过bucket数量水平扩展,并且查询时,如果过滤条件为PK列,也可有效的进行Bucket裁剪查询优化。数据文件也可按照PK列进行排序,可有效提升MergeSort的效率,并有助于DataSkipping查询优化。数据文件会按照列式压缩存储,可有效减少存储的数据量,节省成本,也可有效的提升IO读写效率。

前面介绍了统一的数据组织格式,接下来需要考虑数据如何高效写入TT2。

数据流入主要分成近实时增量写入和批量写入两种场景。这里先描述如何设计高并发的近实时增量写入场景。用户的数据源丰富多样,可能存在数据库,日志系统或者其他消息队列等系统中,为了方便用户迁移数据写入TT2, MaxCompute定制开发了Flink Connector、Dataworks数据集成以及其它开源工具,并且针对TT2表做了很多专门的设计开发优化。这些工具内部会集成MaxCompute数据通道服务Tunnel提供的客户端SDK,支持分钟级高并发写入数据到Tunnel Server,由它高并发把数据写入到每个Bucket的数据文件中。

写入并发度可通过前面提及的表属性write.bucket.num来配置,因此写入速度可水平扩展。对同一张表或分区的数据,写入数据会按pk值对数据进行切分,相同pk值会落在同一个bucket桶中。此外,数据分桶的好处还有利于数据优化管理操作例如小文件clustering,compaction等都可以桶的粒度来并发计算,提高执行效率。分桶对于查询优化也非常有好处,可支持bucket裁剪、shuffle move等查询优化操作。

Tunnel SDK提供的数据写入接口目前支持upsert和delete两种数据格式,upsert包含insert / update两种隐含语义,如数据行不存在就代表insert,如已存在就代表update。commit接口代表原子提交这段时间写入的数据如返回成功就代表写入数据查询可见,满足读写快照隔离级别,如返回失败,数据需要重新写入。

批量导入主要通过SQL进行操作。为了方便用户操作,实现了操作TT2所有的DDL / DML语法。SQL引擎内核模块包括Compiler、Optimizer、Runtime等都做了大量改造开发以支持相关功能,包括特定语法的解析,特定算子的Planner优化,针对pk列的去重逻辑,以及runtime构造Upsert格式数据写入等。数据计算写入完成之后,会由Meta Service来原子性更新Meta信息,此外,也做了大量改造来支持完整的事务机制保证读写隔离、事务冲突检测等等。

由于TT2本身支持分钟级近实时增量数据导入,高流量场景下可能会导致增量小文件数量膨胀,从而引发存储访问压力大、成本高,并且大量的小文件还会引发meta更新以及分析执行慢,数据读写IO效率低下等问题,因此需要设计合理的小文件合并服务, 即Clustering服务来自动优化此类场景。

Clustering服务主要由MaxCompute 内部的Storage Service来负责执行,专门解决小文件合并的问题,需要注意的是,它并不会改变任何数据的历史中间状态,即不会消除数据的Update/Delete中间状态。

结合上图可大概了解Clustering服务的整体操作流程。Clustering策略制定主要根据一些典型的读写业务场景而设计,会周期性的根据数据文件大小,数量等多个维度来综合评估,进行分层次的合并。Level0到Level1主要针对原始写入的Delta小文件(图中蓝色数据文件)合并为中等大小的Delta文件(图中黄色数据文件),当中等大小的Delta文件达到一定规模后,会进一步触发Level1到Level2的合并,生成更大的Delta文件(图中橙色数据文件)。

对于一些超过一定大小的数据文件会进行专门的隔离处理,不会触发进一步合并,避免不必要的读写放大问题,如图中Bucket3的T8数据文件。超过一定时间跨度的文件也不会合并,因为时间跨度太大的数据合并在一起的话,当TimeTravel或者增量查询时,可能会读取大量不属于此次查询时间范围的历史数据,造成不必要的读放大问题。

由于数据是按照BucketIndex来切分存储的,因此Clustering服务会以bucket粒度来并发执行,大幅缩短整体运行时间。

Clustering服务需要和Meta Service进行交互,获取需要执行此操作的表或分区的列表,执行结束之后,会把新老数据文件的信息传入Meta Service,它负责Clustering操作的事务冲突检测,新老文件meta信息原子更新、老的数据文件回收等。

Clustering服务可以很好的解决大文件数量膨胀引发的一系列效率低下的读写问题,但不是频率越高越好,执行一次也会消耗计算和IO资源,至少数据都要全部读写一遍,存在一定的读写放大问题。因此执行策略的选择尤其重要,所以目前暂时不会开放给用户手动执行,而是引擎根据系统状态智能自动触发执行,可保障Clustering服务执行的高效率。

除了小文件膨胀问题需要解决外,依然还有一些典型场景存在其它问题。TT2支持update、delete格式的数据写入,如果存在大量此格式的数据写入,会造成中间状态的冗余记录太多,引发存储和计算成本增加,查询效率低下等问题。因此需要设计合理的数据文件compaction服务优化此类场景。

Compaction服务主要由MaxCompute 内部的Storage Service来负责执行,既支持用户手动执行SQL语句触发、也可通过配置表属性按照时间频率、Commit次数等维度自动触发。此服务会把选中的数据文件,包含base file和delta file,一起进行Merge,消除数据的Update / Delete中间状态,PK值相同的多行记录只保留最新状态的一行记录,最后生成新的只包含Insert格式的base file。

结合上图可大概了解Compaction服务的整体操作流程。t1到t3时间段,一些delta files写入进来,触发compaction操作,同样会以bucket粒度并发执行,把所有的delta files进行merge,然后生成新的base file。之后t4和t6时间段,又写入了一批新的delta files,再触发compaction操作,会把当前存在的base file和新增的delta files一起做merge操作,重新生成一个新的base file。

Compaction服务也需要和Meta Service进行交互,流程和Clustering类似,获取需要执行此操作的表或分区的列表,执行结束之后,会把新老数据文件的信息传入Meta Service,它负责Compaction操作的事务冲突检测,新老文件meta信息原子更新、老的数据文件回收等。

Compaction服务通过消除数据中间历史状态,可节省计算和存储成本,极大加速全量快照查询场景的效率,但也不是频率越高越好,首先执行一次也要读取一遍全量数据进行Merge,极大消耗计算和IO资源,并且生成的新base file也会占据额外的存储成本,而老的delta file文件可能需要用于支持timetravel查询,因此不能很快删除,依然会有存储成本,所以Compaction操作需要用户根据自己的业务场景和数据特征来合理选择执行的频率,通常来说,对于Update / Delete格式的记录较多,并且全量查询次数也较多的场景,可以适当增加compaction的频率来加速查询。

以上主要介绍了典型的数据更新操作,而它们的事务并发管理都会统一由Meta Service进行控制。

上面表格详细展示了各个具体操作并发执行的事物冲突规则。Meta服务采用了经典的MVCC模型来满足读写快照隔离,采用OCC模型进行乐观事务并发控制。对于一些高频的操作单独设计优化了事务冲突检测和重试机制,如clustering操作和insert into 并发执行,即使事务Start和Commit时间出现交叉也不会冲突失败,都能成功执行,即使在原子提交Meta信息更新时出现小概率失败也可在Meta层面进行事务重试,代价很低,不需要数据重新计算和读写。

此外,各种数据文件信息以及快照版本也需要有效的管理,其中包含数据版本、统计信息、历史数据、生命周期等等。对于TimeTravel和增量查询,Meta层面专门进行了设计开发优化,支持高效的查询历史版本和文件信息。

基于TT2,计算引擎可高效支持典型的业务场景TimeTravel查询,即查询历史版本的数据,可用于回溯历史状态的业务数据,或数据出错时,用来恢复历史状态数据进行数据纠正,当然也支持直接使用restore操作恢复到指定的历史版本。

对于TimeTravel查询,会首先找到要查询的历史数据版本之前最近的base file,再查找后面的delta files,进行合并输出,其中base file可以用来加速查询读取效率。

这里结合上图进一步描述一些具体的数据查询场景。比如创建一TT2表,schema包含一个pk列和一个val列。左边图展示了数据变化过程,在t2和t4时刻分别执行了compaction操作,生成了两个base file: b1和b2。b1中已经消除了历史中间状态记录(2,a),只保留最新状态的记录 (2,b)。

如查询t1时刻的历史数据,只需读取delta file (d1)进行输出; 如查询t2时刻,只需读取base file (b1) 输出其三条记录。如查询t3时刻,就会包含base file ( b1)加上delta file (d3)进行合并输出,可依此类推其他时刻的查询。

可见,base文件虽可用来加速查询,但需要触发较重的compaction操作,用户需要结合自己的业务场景选择合适的触发策略。

TimeTravel可根据timestamp和version两种版本形态进行查询,除了直接指定一些常量和常用函数外,我们还额外开发了get_latest_timestamp和get_latest_version两个函数,第二个参数代表它是最近第几次commit,方便用户获取我们内部的数据版本进行精准查询,提升用户体验。

此外,SQL增量查询也是重点设计开发的场景,主要用于一些业务的近实时增量处理链路,新增SQL语法采用between and关键字,查询的时间范围是左开右闭,即begin是一个开区间,必须大于它,end是一个闭区间。

增量查询不会读取任何base file,只会读取指定时间区间内的所有delta files,按照指定的策略进行Merge输出。

通过上诉表格可进一步了解细节,如begin是t1-1,end是t1,只读取t1时间段对应的delta file (d1)进行输出, 如果end是t2,会读取两个delta files (d1和d2);如果begin是t1,end是t2-1,即查询的时间范围为(t1, t2),这个时间段是没有任何增量数据插入的,会返回空行。

对于Clustering和Compaction操作也会产生新的数据文件,但并没有增加新的逻辑数据行,因此这些新文件都不会作为新增数据的语义,增量查询做了专门设计优化,会剔除掉这些文件,也比较贴合用户使用场景。

由于Timetravel和增量查询都会查询数据的历史状态,因此需要保存一定的时间,可通过表属性acid.data.retain.hours来配置保留的时间范围。如果历史状态数据存在的时间早于配置值,系统会开始自动回收清理,一旦清理完成,TimeTravel就查询不到对应的历史状态了。回收的数据主要包含操作日志和数据文件两部分。

同时,也会提供purge命令,用于特殊场景下手动触发强制清除历史数据。

10、数据接入生态集成现状

初期上线支持接入TT2的工具主要包括:

对其它一些接入工具,比如Kafka等,后续也在陆续规划支持中。

作为一个新设计的架构,我们会尽量去覆盖开源数据湖(HUDI / Iceberg)的一些通用功能,有助于类似业务场景的用户进行数据和业务链路迁移。此外,MaxCompute离线 &近实时增量处理一体化架构还具备一些独特的亮点:

四、应用实践与未来规划

1、离线 &近实时增量处理一体化业务架构实践

基于新架构,MaxCompute可重新构建离线 &近实时增量处理一体化的业务架构,即可以解决大部分的Lambda架构的痛点,也能节省使用单一离线或者实时系统架构带来的一些不可避免的计算和存储成本。各种数据源可以方便的通过丰富的接入工具实现增量和离线批量导入,由统一的存储和数据管理服务自动优化数据编排,使用统一的计算引擎支持近实时增量处理链路和大规模离线批量处理链路,而且由统一的元数据服务支持事务和文件元数据管理。它带来的优势非常显著,可有效避免纯离线系统处理增量数据导致的冗余计算和存储,也能解决纯实时系统高昂的资源消耗成本,也可消除多套系统的不一致问题和减少冗余多份存储成本以及系统间的数据迁移成本,其他的优势可以参考上图,就不一一列举了。总体而言,就是使用一套架构既可以满足增量处理链路的计算存储优化以及分钟级的时效性,又能保证批处理的整体高效性,还能有效节省资源使用成本。

最后再看一下未来一年内的规划:

Q1:Bucket数量的设置与commit间隔以及compaction间隔设置的最佳推荐是什么?

A1:Bucket数量与导入的数据量相关,数据量越大,建议设置的bucket数量多一些,在批量导入的场景,推荐每个bucket的数据量不要超过1G,在近实时增量导入场景,也要根据Tunnel的可用资源以及QPS流量情况来决定bucket数量。对于commit的间隔虽然支持分钟级数据可见,但如果数据规模较大,bucket数量较多,我们推荐间隔最好在五分钟以上,也需要考虑结合 Flink Connector的checkpoint机制来联动设置commit频率,以支持Exactly Once语义,流量不大的话,5~10分钟间隔是推荐值。Compaction间隔跟业务场景相关,它有很大的计算成本,也会引入额外的base file存储成本,如果对查询效率要求比较高且比较频繁,compaction需要考虑设置合理的频率,如果不设置,随着delta files和update记录的不断增加,查询效率会越来越差。

Q2:会不会因为commit太快,compaction跟不上?

A2:Commit频率和Compaction频率没有直接关系,Compaction会读取全量数据,所以频率要低一些,至少小时或者天级别,而Commit写入增量数据的频率是比较快的,通常是分钟级别。

Q3:是否需要专门的增量计算优化器?

A3:这个问题很好,确实需要有一些特定的优化规则,目前只是复用我们现有的SQL优化器,后续会持续规划针对一些特殊的场景进行增量计算的设计优化。

Q4:刚刚说会在一两个月邀测MaxCompute新架构,让大家去咨询。是全部替换为新的架构还是上线一部分的新架构去做些尝试,是要让用户去选择吗?还是怎样?

A4:新技术架构对用户来说是透明的,用户可以通过MaxCompute无缝接入使用,只需要创建新类型的表即可。针对有这个需求的新业务或者之前处理链路性价比不高的老业务,可以考虑慢慢切换到这条新链路尝试使用。

本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载者并注明出处:https://www.jmbhsh.com/qitabaihuo/36629.html

联系我们

QQ号:***

微信号:***

工作日:9:30-18:30,节假日休息