万字长文解析kafka分区工作机制(上篇)

407次阅读  |  发布于2年以前

大家好, 《RocketMQ技术内幕》作者、RocketMQ社区首席布道师、极客时间《中间件核心技术与实战》专栏作者、中通快递基础架构资深架构师,越努力越幸运,唯有坚持不懈,与大家共勉。

Kafka的消息发送与消息消费与分区关联密切,我们从这篇文章开始讲点学习分区相关的知识,本篇文章将重点介绍分区内部的工作机制,即分区状态机运转机制。

1、Kafka分区状态

Kafka内部分区的运转机制具体实现为PartitionStateMachine,从这个类的注释上来看可以得知Kafka分区的状态共有四个,它们分别是:

关于分区的状态变如下所示:

2、Kafka分区状态机

接下来本文的行为思路,将会通过源码阅读的方式,深入PartitionStateMachine的实现细节,从而提炼出分区变更实现要点,帮助我们更好的运维kafka。

2.1 状态机启动流程

状态机的启动流程定义在PartitionStateMachine的startup方法,该方法的调用时机一个新的Broker通过控制器选举成为新的Controller时会被调用

该方法的声明如下:

状态机的启动主要包括两个步骤:

接下来将详细探讨实现细节。

2.1.1 分区状态初始化

首先我们来看一下分区的初始化流程,具体代码如下所示:

该方法的实现要点:

值得注意的是,调用changeStateTo方法改变分区的状态,仅仅只是在内存中更新状态,其具体实现如图所示:

具体的做好是将需要更新的状态存储到Map[TopicPartition, PartitionState] 中。

2.1.2 分区状态运转机制

在内存中根据当前维护的LeaderAndISR信息后将状态存储到本地内存后,接下来就是将分区状态向Online状态转换,具体的代码实现见PartitionStateMachine的triggerOnlinePartitionStateChange方法,代码如下所示:

该方法的实现要点是在内存缓存中(Map[TopicPartition, PartitionState] )挑选出状态处于OfflinePartition与NewPartition并且未被删除的分区,驱动状态机,调用handleStateChanges方法尝试向OnlinePartition分区转化。

该方法主要做如下两件事情:

要想清晰而全面的了解分区状态的变更,我还给出了Kafka中所有调用handleStateChanges的调用入口,在后续深入研究Kafka相关机制时会再次一一提及,调用链如下图所示:

由于篇幅的问题,分区信息在其他Broker中的状态同步将在下一篇文章中介绍。

PartitionStateMachine的doHandleStateChanges方法在上一篇中已经详细介绍,[尴尬,在Kafka生产实践中又出问题了] 中详细介绍过,在这里我稍微总结提炼一下

目标状态为NewPartition、OfflinePartition、NonExistentPartition 这三个状态并没有什么复杂的实现逻辑,只是更新内存中的状态,并在state-change.log文件中将输出状态变更日志,只有目标状态为OnlinePartition时才会详细的处理逻辑。

但或许你有一个疑问,状态变更为NewPartition,什么时候会向OnlinePartition状态转换呢?其实通过调用doHandleStateChanges将目标方法设置为NewPartition后,会紧接着调用triggerOnlinePartitionStateChange等方法,将状态进一步向OnlinePartition状态转化。

由于在[尴尬,在Kafka生产实践中又出问题了] 这篇文章中详细介绍了OfflinePartition向OnlinePartition的转化流程,故本篇文章就将重点放在了NewPartition状态向OnlinePartition的转化处理逻辑,其实也就是分区创建的流程,这块的代码入口如下所示:

由于PartitionStateMachine的initializeLeaderAndIsrForPartitions方法比较长,接下来将分步讲解。

2.1.3 分区初始化流程

接下来我们详细探讨PartitionStateMachine的initializeLeaderAndIsrForPartitions方法。

Step1:首先获取所有分区对应的在线副本,Seq< Map< TopicPartition, Seq< Int>> > liveReplicasPerPartition 来表示,类比Java的数据结构为List< Map< TopicPartition, List< Interger> >,代码如下所示:

在Kafka中创建一个主题时,kafka首先会根据集群节点的负载情况,根据主题的分区数、副本数,物理机架等信息,生成静态负载情况,存储在/brokers/topics/{topicName},其数据如下图所示:

而liveReplicasPerPartition是在这个数据结构的基础上筛选出在线的broker,例如如果id为4的broker已下线,那么liveReplicasPerPartition中的值就可能如下所示:

["0":[0,1,2],"1":[1,2],"2":[2,0],"3":[0,1],"4":[0,2],"5":[1,0],"6":[0,2,1],"7":[1,0,2]]

Step2:如果一个分区所有预分配的分片都不在线,则打印错误日志,代码如下所示:

Step3:为分区创建leaderIsrAndControllerEpoch信息,代码如下所示:

这里的实现比较简单,值得注意的是初始化时分区的Leader则为ISR列表中的第一个分区

Step4:将分区的状态信息 leaderIsrAndControllerEpoch(leader,isr,LeaderEpoch、ControllerEpoch)写入到zookeeper中,具体代码如下;

具体就是在zookeeper中创建/broker/topics/{topicName}/partitions/{分区序号}/state,并将leaderIsrAndControllerEpoch写入到上述节点,具体效果如下图所示:

Step5:对zookeeper写入结果进行处理,对应的代码如下所示:

如果在zookeeper中创建成功,将leaderIsrAndControllerEpoch信息缓存到内存中(Map< TopicPartition, leaderIsrAndControllerEpoch>)中,并将信息放入到controllerBrokerRequestBatch,Kafka Broker控制将信息同步到集群的其他Broker上,同时会在state-change.log日志文件中记录状态成功变更日志;如果创建失败,则在state-change.log中输出对应的错误日志。

当然:为了尽量保证上述过程成功创建,Zookeeper的写入过程引入来重试机制来保证最终执行成功,除非一些类似AUTH_FAILED等不可恢复的异常。

分区的信息写入到zookeeper的/broker/topics/{topicName}/partitions/{分区序号}/state文件路径后,会再次调用changeTo方法,在内存中将分区的状态变更为OnlineParttion。

那在什么时候触发真正创建分区相关的文件夹呢?

原来在将分区信息写入到zookeeper指定文件后,由于Kafka Controller订阅了/broker/topics/{topicName}相关节点,故节点的创建会实时告知Kafka Controller,从而执行分区的选择,具体的代码如下所示:

通过Zookeeper的事件监听机制,kafka就这样巧妙的实现了分区状态机的切换。

3、总结

通过上面的学习,我们对分区的理解应该更加深刻了,从这里我们至少能得出如下结论:

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8