1. 首页 > 百货

Kafka放弃Zookeeper后如何存储主题与消费组呢

由于笔者公司目前使用的kafka版本是2.2.1,故当下关于kafka的内核研究目前主要是基于该版本,当然该专栏还会继续关注Kafka3.0。

我在使用kafka时发现客户端可以不依赖Zookeeper的情况下完成消息发送、消息消费,众所周知早期的Kafka,所有的元信息(topic、消费组、集群)等信息都存储在Zookeeper中,原先的消息发送客户端、消息消费客户端都需要依赖Zookeeper。

温馨提示:Kafka逐步开启了去zookeeper化,到kafka2.8之前实现了消息发送者、消息消费者的去zookeeper化,从2.8版之后broker也支持去zookeeper。

那kafka2.2.1版本中,主题的路由信息、消费组信息分别是存储在什么地方呢?消息发送端、消息消费端是如何感知的呢?

温馨提示:如果大家对Kafka有基本的了解,不防停留片刻,稍作思考。

1.主题元数据存储在Zookeeper中

进入到Kafka Broker连接的Zookeeper集群,我们不难发现在/{namespace}/brokers/topics节点下存在该集群中所有的主题信息,展开某一个具体的主题,如下图所示:

关于主题的元信息,其实主要包括如下信息:

controller_epoch 控制器当前的选举版本。

leader 该分区的Leader所在的Broker节点ID。

version 当前的存储格式版本,默认为1。

leader_epoch 分区Leader的选举版本。

isr 分区的ISR集合。

主题的路由信息是存储在Zookeeper中,那为什么客户端只需要Broker的地址,就可以获取到主题的路由信息呢?

1.1 主题路由寻址

查找路由信息在Kafka2.1版本中是发送ApiKeys.METADATA请求,该请求的响应逻辑定义在Broker中,那客户端是如何对Broker进行路由,Broker中的路由信息又是从何而来呢?

消息发送者首次发送METADATA定位Broker机制:首次发送请求会从KafkaProducer的bootstrap.servers中设置的broker列表中选择当前最空闲的Broker,后续能感知所有的Broker。

消息消费者发送METADATA定位Broker机制:发送到当前消费组的组协调所在的Broker。

根据查阅KafkaApis的handleTopicMetadataRequest方法,进行一些ACL校验后进入其核心方法:

关键点:

在/brokers/topics节点下创建子节点,子节点名称为topic的名称。

根据当前kafka分区的机架信息,分区数、副本数,broker节点数,进行分配,主要尽量将主分区不放在同一个机架、存储在主题的节点信息中,例如{"version":1,"partitions":{"4":[2,0,1],"5":[0,1,2],"1":[2,1,0],"0":[1,0,2],"2":[0,2,1],"3":[1,2,0]}},其中key为分区名称,值为副本所在的brokerId,其中排在第一位是倾向性Leader,主题中存储的值是静态数据,具体还会触发选举,选举算法会参考这个分配。

控制器还会注册调用registerPartitionModificationsHandlers方法,监听主题信息的变化,从而触发后续流程,启动分区的真正创建(各个分区的Leader选举等)。

温馨提示:Kafka开启自动创建主题,分区数量取自kafkabroker中的num.partitions参数,默认为1,副本因子则取决于default.replication.factor参数,默认为1。

1.2 路由信息同步机制

MetadataCache,元信息缓存,那这里的数据又是从何而来呢?MetadataCache中路由信息的更新调用链如下图所示:

Kafka的KafkaController(后续统称控制器)首先会听/brokers/topics/{topicName}节点内容的变化,一旦有新主题创建或主题信息变更,topic变更事件就会触发,此时TopicChange的process方法会调用,最终调用updatePartitionReplicaAssignment,也就是一旦主题的信息发生变更,控制器会向所有Broker节点发送ApiKeys.UPDATE_METADATA,各个Broker在到该请求后,会更新各个Broker中的内存缓存,供消息发送者查找topic路由信息。

即Kafka2.2版本中,topic的元信息存储在Zookeeper中,同时KafkaController会监听zookeeper中相关节点,从而感知信息变更,从而将路由信息通过RPC发送到集群内所有的Broker中,故每一个Broker的内存中都存储一份相同的路由信息。

Kafka2.8版本开始尝试去Zookeeper化。

思考题:为什么各个Broker不都监听zookeeper,从而感知topic变化,更新本地内存呢?欢迎各位留言讨论或私信dingwpmz,共同交流。

2.消费组存储在位点主题中

在较低版本中,启动Kafka消费组需要指定zookeeper集群的地址,因为在低版本中消费组的元信息存储在zookeeper中,具体路径为/consumers,但后续版本中消费端的启动已经不需指定zookeeper,而是指定broker的地址列表即可,那这个时候,消费组的信息是存储在哪呢?

在前面介绍Kafka故障解决相关的文章中我们常常看到消费组组协调器,内部持有一个消费组元数据管理器GroupMetadataManager,相关的代码截图如下所示:

在GroupMetadataManager对象中持有一个Map结构的缓存,其键为消费组的名称,值为GroupMetadata对象,内部记录消费组的状态,消费组的成员列表,位点信息。

内存的特点:访问高效,但随着Broker进程的退出而丢失,消费组存储在内存中显然不行,但又不在zookeeper中,那消费组的定义信息存储在什么地方呢?

2.1消费组元信息存储

消费组的定义信息存储在系统主题__consumer_offsets中,什么,这个主题不是用来存储消费位点的吗?

原来__consumer_offsets不仅存储消费组的位点信息,还存储消费组的元信息,具体代码入口:GroupMetadataManager#storeGroup,部分代码截图如下所示:

即消费组元信息当成一条消息写入到__consumer_offsets,一条消费组元信息存储的value值,由GroupMetadataManager的groupMetadataValue方法定义,具体代码如下:

随着Kafka的不断演化,存储格式进行了多次修改,对应的版本如下:

消费组元信息存储的格式为Json,具体存储的内容:

client_id 客户端ID。

client_host 客户端ip地址。

rebalance_timeout 重平衡时间,默认为300000,5分钟。

session_timeout 会话超时时间,默认为10s。

subscription元信息,取自AbstractCoordinator的抽象方法metadata(),消费组的实现类为ConsumerCoordinator,主要是遍历负载算法,每一个负载算法根据订阅信息计算元信息。

assignment

各个消费者的队列负载情况。

温馨提示:GroupMetadataManager的storeGroup方法的调用时间是在消费组进行重平衡时,具体是重平衡第二阶段(SYNC_GROUP)与完成重平衡。

2.2加载消息组元信息

消费组元信息是存储在 __consumer_offsets主题中,在什么时候会从该主题中加载到内存中呢?

在__consumer_offsets的分区发生Leader选举时会触发将对应分区中的数据加载到内存,具体的处理入口在KafkaApis的handleLeaderAndIsrRequest方法,简易调用链如下图所示:

3.总结

本文主要介绍了Kafka主题与消费组的持久化机制,在Kafka2.8版本开始,官方逐步去除对Zookeeper的依赖,那kafka3.x之后,又会是如何存储消费组、主题的信息呢?

本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载者并注明出处:https://www.jmbhsh.com/baihuo725/36016.html

联系我们

QQ号:***

微信号:***

工作日:9:30-18:30,节假日休息