消费者在消费消息的时候,需要知道从Broker的哪一个消息队列中去获取消息。
所以,在消费者端必须要做负载均衡,即Broker端中多个消费队列分配给同一个消费者组中的哪些消费者消费。
在RocketMQ中,在消费者端有一个:Rebalance负载均衡组件。
消费者负载均衡
指为消费组下的每个消费者分配订阅主题下的消费队列,分配了消费队列消费者就可以知道去消费哪个消费队列上面的消息。
而集群模式一个消息队列同一时间只能分配给组内的一个消费者进行消费。
RocketMQ5.0以前是按照队列粒度进行负载均衡的,5.0提供了按消息粒度进行负载均衡。
队列粒度负载均衡
队列粒度负载均衡策略中,同一消费者组内的多个消费者将按照队列粒度消费消息,每个队列只能被其中一个消费者消费。
队列粒度负载均衡是在每个消费者端进行的,并不是由某个节点统一进行负载均衡之后将分配结果通知到每个消费者。
消费者增加或者减少会影响消息队列的分配,所以Broker需要感知消费者的上下线情况。
消费者在启动时会向所有的Broker发送心跳包进行注册,通知Broker消费者上线,下线的时候也会向Broker发送取消注册的请求。
Broker会维护消费者信息的注册信息,在消费者发生变更时会通知消费者进行负载均衡。
Rebalance触发时机
消费者启动时触发:
消费者在启动时会进行一次负载均衡,为自己分配消息队列。
Broker发现消费组变更时触发:
处于以下两种情况之一时会被判断为消费组发生了变化,需要进行负载均衡:
被判定为变化之后,会触发变更事件,向该消费者下的所有消费者发送发送变更请求,通知组下每个消费者进行负载均衡。
Broker收到消费者下线时触发:
如果有消费者向Broker发送UNREGISTER_CLIENT取消注册请求,并且开启了允许通知变更,会触发变更事件。
变更事件同上,Broker会通知该消费者组下的所有消费者进行一次负载均衡。
消费者定时触发:
消费者本身也会定时执行负载均衡,默认是20s执行一次。
消息粒度负载均衡
在RocketMQ5.0之后,增加了消息粒度负载均衡策略,默认且仅使用消息粒度负载均衡策略。
消息粒度负载均衡策略中,同一消费组内的多个消费者将按照消息粒度平均分摊主题中的所有消息。
消息粒度负载均衡策略保证同一个队列的消息可以被组内多个消费者共同处理。
但是该策略使用的消息分配算法结果是随机的,不能指定消息被哪一个特定的消费者处理。
所以多个消费者同时消费同一个消息队列中的消息,服务端也可以保证消息不会被多个消费者重复消费。
消息粒度负载均衡策略适用于绝大多数在线处理的业务场景,对于流式处理、聚合计算等场景,更适合队列粒度的负载均衡策略。
执行流程
负载均衡服务执行逻辑在doRebalance函数,里面会对每个消费者组执行负载均衡操作。
consumerTable这个map对象里存储了消费者组对应的的消费者实例。
private ConcurrentMapString MQConsumerInner consumerTable new ConcurrentHashMapString MQConsumerInner void doRebalance {//每个消费者组都有负载均衡 MapEntryString MQConsumerInner entry : thisconsumerTableentrySet {MQConsumerInner impl entrygetValue impl {try {impldoRebalance} catch Throwable e {logerror e}}}}
由于每个消费者组可能会消费很多topic,每个topic都有自己的不同队列,最终是按topic的维度进行负载均衡。
public void doRebalance(final boolean isOrder) {Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();try {//按topic维度执行负载均衡this.rebalanceByTopic(topic, isOrder);} catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}this.truncateMessageQueueNotMyTopic();}
最终负载均衡逻辑处理的实现在:
其中分为广播消息和集群消息模型两种情况处理。
负载均衡核心功能的主流程,主要做了4件事情:
负载均衡策略原理
负载均衡策略顶层接口:
interface AllocateMessageQueueStrategy {/*** Allocating by consumer id* 给消费者id分配消费队列*/ListMessageQueue allocatefinal String consumerGroup final String currentCID final ListMessageQueue mqAll final ListString cidAll }
他默认共有7种负载均衡策略实现。
最常用的两种平均分配算法:
AllocateMessageQueueAveragely
是用总数除以消费者个数,余数按消费者顺序分配给消费者。
AlocateMessageQueueAveragelyByCircle
轮流一个一个分配。
参考:
本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载者并注明出处:https://www.jmbhsh.com/yulebagua/31494.html