1. 首页 > 百货 > 母婴用品

RocketMQ Apache 批处理模型演进之路

RocketMQ 的目标,是致力于打造一个消息、事件、流一体的超融合处理平台。这意味着它需要满足各个场景下各式各样的要求,而批量处理则是流计算领域对于极致吞吐量要求的经典解法,这当然也意味着 RocketMQ 也有一套属于自己风格的批处理模型。

至于什么样的批量模型才叫“属于自己风格”呢,且听我娓娓道来。

首先,既然谈 RocketMQ 的批处理模型,那就得聊聊什么是“批处理”,以及为什么批处理是极致吞吐量要求下的经典解法。在我看来,批处理是一种泛化的方法论,它处在各个系统的方方面面,无论是传统工业还是互联网,甚至在日常生活中,都能看到它的身影。

批处理的核心思想是将多个任务或数据集合在一起,进行统一处理。这种方法的优势在于可以充分利用系统资源,减少任务切换带来的开销,从而提高整体效率。比如在工业制造中,工厂通常会将相同类型的零部件批量生产,以降低生产成本和提高生产速度。在互联网领域,批处理则表现为批量数据的存储、传输和处理,以优化性能和提升系统吞吐量。

批处理在极致吞吐量需求下的应用,更加显著。例如,在大数据分析中,海量的数据需要集中处理才能得出有意义的结果。如果逐条处理数据,不仅效率低下,还可能造成系统瓶颈。通过批处理,可以将数据划分为若干批次,在预定的时间窗口内统一处理,从而提高系统的并行处理能力,提升整体吞吐量。

此外,批处理其实并不意味着牺牲延时,就比如在 CPU Cache中,对单个字节的操作无论如何时间上都是会优于多个字节,但是这样的比较并没有意义,因为延时的感知并不是无穷小的,用户常常并不关心 CPU执行一条指令需要花多长时间,而是执行完单个“任务/作业”需要多久,在宏观的概念上,反而批处理具有更低的延时。

接下来我们看看,RocketMQ 与批处理的“如胶似漆、形影相随”吧,其实在 RocketMQ 的诞生之初,就已经埋下了批处理的种子,这颗种子,我们暂且叫它——早期的批处理模型。

下图,是作为用户视角上感知比较强的老三样,分别是 Producer、Consumer、Broker:

而早期批处理模型,实际上只和 Producer、Broker 有关,在这条链路上会有批量消息的概念,当消息到达 Broker 后这个概念就会消失。

基于这点我们来看具体是怎么回事。首先批量消息的源头实际上就是 Producer 端的 Send 接口,在大部分场景下,我们发送一条消息都会使用以下的形式去操作:

SendResult send(Message msg);

非常地简明扼要,将一条消息发送到 Broker,如果我们要使用上早期的批处理模型,也只需要稍作修改:

SendResult send(Collection<Message> msgs)

可以看到,将多条消息串成一个集合,然后依旧是调用 send 接口,就可以完成早期批处理模型的使用了(从用户侧视角看就已经 ok 了),就像下图一样,两军交战,谁火力更猛高下立判~

那么真就到此为止了吗?当然不是,首先这里的集合是有讲究的,并不是随意将多条消息放在一起,就可以 send 出去的,它需要满足一些约束条件:

这些约束条件暂时先不展开,因为就如同它字面意思一样浅显易懂,但是这也意味着它的使用并不是随心所欲的,有一定的学习成本,也有一定的开发要求,使用前需要根据这些约束条件自行分类,然后再装进“大炮”中点火发射。

这里可能有人会问,这不是为难我胖虎吗?为什么要加这么多约束?是不是故意的?实际上并非如此,我们可以想象一下,假如我们是商家:

很显然,第二个场景很可能会收到快递小哥一个大大的白眼,这种事情理所应当的做不了,这也是为什么属于同一个 Collection<Message> 的消息必须要满足各种各样的约束条件了,在 Broker 实际收到一个“批量消息”时,会做以下处理:

首先它会根据这一批消息的某些属性,挑选出对应的队列,也就是上图中最底下的「p1、p2......」,在选定好队列之后,就可以进行后续的写入等操作了,这也是为什么必须要求相同 Topic,因为不同的 Topic 是没法选定同一个队列的。

接下来就到了上图所示流程,可以看到这里分别来了三个消息,分别是 《四条消息》《一条消息》《三条消息》,接下来他们会依次进入 unPack流程,这个流程有点像序列化过程,因为从客户端发送上来的消息都是内存结构的,距离实际存储在文件系统中的结构还有一些不同。在 unPack过程中,会分别解包成:四条消息、一条消息、三条消息;此时和连续 Send八条消息是没有任何区别的,也就是在这一刻,批量消息的生命周期就走到了尽头,此刻往后,“众生平等、不分你我”。

也正是这个机制,Consumer 其实并不知道 Producer发送的时候“到底是发射弓箭,还是点燃大炮”。这么做有个非常好的优点,那就是有着最高的兼容性,一切的一切好像和单条消息 Send的经典用法没有任何区别,在这种情况下,每条消息都有最高的自由度,例如各自独立的 tag、独立的 keys、唯一的 msgId等等,而基于这些所衍生出来的生态(例如消息轨迹)都是无缝衔接的。也就是说: 只需要更换发送者使用的 Send 接口,就可以获得极大的发送性能提升,而消费者端无需任何改动。

我一向用词都非常的严谨,可以看到上一段的结尾:“获得极大的发送性能提升”,至于为什么这么讲,是因为距离整体系统的提升还有一些距离,也就是这一段的标题“索引构建流水线改造”。

首先我们要有一个共识,那就是对于消息队列这种系统,整体性能上限比值“消费/生产”应该要满足至少大于等于一,因为大部分情况下,我们的生产出来的消息至少应该被消费一次(否则直接都不用 Send 了岂不美哉)。

其实在以往,发送性能没有被拔高之前,它就是整个生产到消费链路上的短板,也就是说消费速率可以轻松超过生产速率,整个过程也就非常协调。but!在使用早期批处理模型后,生产速率的大幅度提升就暴露了另外一个问题,也就是会出现消费速率跟不上生产的情况,这种情况下,去谈整个系统的性能都是“无稽之谈”。

而出现消费速率短板的原因,还要从索引构建讲起。由于消费是要找到具体的消息位置,那就必须依赖于索引,也就是说, 一条消息的索引构建完成之前,是无法被消费到的。 下图就是索引构建流程的简易图:

这是整个直接决定消费速率上限的流程。通过一个叫 ReputMessageService 的线程,顺序扫描 CommitLog文件,将其分割为一条一条的消息,再对这些消息进行校验等行为,将其转换成一条条的索引信息,并写入对应分区的 ConsumeQueue 文件。

整个过程是完全串行的,从分割消息,到转换索引,到写入文件,每一条消息都要经过这么一次流转。因为一开始是串行实现,所以改造起来也非常的自然,那就是通过流水线改造,提高它的并发度,这里面有几个需要解决的问题:

针对这几个难点,在设计中也引入了“批量处理”的思路,其实大到架构设计、小到实现细节,处处都体现了这一理念,下图就是改造后的流程:

由于 CommitLog扫描过程很难并行化处理,那就干脆不做并行化改造了,就使用单线程去顺序扫描,但是扫描的时候会进行一个简单的批处理,扫描出来的消息并不是单条的,而是尽可能凑齐一个较大的 buffer 块,默认是 4MB,这个由多条消息构成的 buffer 块我们不妨将其称为一个 batch msg。

然后就是对这些 batch msg 进行并行解析,将 batch msg 以单条消息的粒度扫描出来,并构建对应的 DispatchRequest结构,最终依次落盘到 ConsumeQueue 文件中。其中的关键点在于 batch msg 的顺序如何保证,以及DispatchRequest 在流转时怎么保证顺序和效率。为此我专门实现了一个轻量级的队列 DispatchRequestOrderlyQueue,这个 Queue 采用环状结构,可以随着顺序标号不断递进,并且能做到 “无序入队,有序出队” ,详细设计和实现均在开源 RocketMQ 仓库中,这里就不多赘述。

在经过改造后,索引构建流程不再成为扯后腿的一员,从原本眼中钉的角色美美隐身了~

经过上述索引构建流水线改造后,整个系统也就实现了最基本的批处理模型,可以在最小修改、最高兼容性的情况下让性能获得质的飞跃。

但是这并不够!因为早期的模型出于兼容性等考虑,所以依旧束手束脚的,于是 BatchCQ 模型诞生了,主要原因分为两个维度:

那 BatchCQ 又是如何改进上述的问题的呢?其实也非常地直观,那就是“见字如面”,将 ConsumeQueue 也批量化。这个模型去掉 Broker 端写入前的解包行为,索引也只进行一次构建:

// 发送端开启 AutoBatch 能力rmqProducer.setAutoBatch(true);

本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载者并注明出处:https://www.jmbhsh.com/muyingyongpin/36399.html

联系我们

QQ号:***

微信号:***

工作日:9:30-18:30,节假日休息