大家好, 《RocketMQ技术内幕》作者、RocketMQ社区首席布道师、极客时间《中间件核心技术与实战》专栏作者、中通快递基础架构资深架构师,越努力越幸运,唯有坚持不懈,与大家共勉。
Kafka的消息发送与消息消费与分区关联密切,我们从这篇文章开始讲点学习分区相关的知识,本篇文章将重点介绍分区内部的工作机制,即分区状态机运转机制。
Kafka内部分区的运转机制具体实现为PartitionStateMachine,从这个类的注释上来看可以得知Kafka分区的状态共有四个,它们分别是:
NonExistentPartition 表示分区不存在,通常是该分区从未创建过或者创建后被删除。
NewPartition 分区已创建,即分配完成了副本,但还未进行分区Leader选举,即还不存在Leader分区与ISR集合,前一个有效状态为NonExistentPartition。
OnlinePartition 分区处于在线时的状态,表示已经完成了分区选举,成功选举出Leader,此时可以进行消息发送与消息消费,前一个有效状态为NewPartition/OfflinePartition。
OfflinePartition
分区处于离线时状态,表示选举出来的Leader失效了,例如Leader所在的Broker宕机,前一个有效状态为NewPartition/OnlinePartition。
关于分区的状态变如下所示:
接下来本文的行为思路,将会通过源码阅读的方式,深入PartitionStateMachine的实现细节,从而提炼出分区变更实现要点,帮助我们更好的运维kafka。
状态机的启动流程定义在PartitionStateMachine的startup方法,该方法的调用时机:一个新的Broker通过控制器选举成为新的Controller时会被调用。
该方法的声明如下:
状态机的启动主要包括两个步骤:
接下来将详细探讨实现细节。
首先我们来看一下分区的初始化流程,具体代码如下所示:
该方法的实现要点:
值得注意的是,调用changeStateTo方法改变分区的状态,仅仅只是在内存中更新状态,其具体实现如图所示:
具体的做好是将需要更新的状态存储到Map[TopicPartition, PartitionState] 中。
在内存中根据当前维护的LeaderAndISR信息后将状态存储到本地内存后,接下来就是将分区状态向Online状态转换,具体的代码实现见PartitionStateMachine的triggerOnlinePartitionStateChange方法,代码如下所示:
该方法的实现要点是在内存缓存中(Map[TopicPartition, PartitionState] )挑选出状态处于OfflinePartition与NewPartition并且未被删除的分区,驱动状态机,调用handleStateChanges方法尝试向OnlinePartition分区转化。
该方法主要做如下两件事情:
要想清晰而全面的了解分区状态的变更,我还给出了Kafka中所有调用handleStateChanges的调用入口,在后续深入研究Kafka相关机制时会再次一一提及,调用链如下图所示:
由于篇幅的问题,分区信息在其他Broker中的状态同步将在下一篇文章中介绍。
PartitionStateMachine的doHandleStateChanges方法在上一篇中已经详细介绍,[尴尬,在Kafka生产实践中又出问题了] 中详细介绍过,在这里我稍微总结提炼一下:
目标状态为NewPartition、OfflinePartition、NonExistentPartition 这三个状态并没有什么复杂的实现逻辑,只是更新内存中的状态,并在state-change.log文件中将输出状态变更日志,只有目标状态为OnlinePartition时才会详细的处理逻辑。
但或许你有一个疑问,状态变更为NewPartition,什么时候会向OnlinePartition状态转换呢?其实通过调用doHandleStateChanges将目标方法设置为NewPartition后,会紧接着调用triggerOnlinePartitionStateChange等方法,将状态进一步向OnlinePartition状态转化。
由于在[尴尬,在Kafka生产实践中又出问题了] 这篇文章中详细介绍了OfflinePartition向OnlinePartition的转化流程,故本篇文章就将重点放在了NewPartition状态向OnlinePartition的转化处理逻辑,其实也就是分区创建的流程,这块的代码入口如下所示:
由于PartitionStateMachine的initializeLeaderAndIsrForPartitions方法比较长,接下来将分步讲解。
接下来我们详细探讨PartitionStateMachine的initializeLeaderAndIsrForPartitions方法。
Step1:首先获取所有分区对应的在线副本,Seq< Map< TopicPartition, Seq< Int>> > liveReplicasPerPartition 来表示,类比Java的数据结构为List< Map< TopicPartition, List< Interger> >,代码如下所示:
在Kafka中创建一个主题时,kafka首先会根据集群节点的负载情况,根据主题的分区数、副本数,物理机架等信息,生成静态负载情况,存储在/brokers/topics/{topicName},其数据如下图所示:
而liveReplicasPerPartition是在这个数据结构的基础上筛选出在线的broker,例如如果id为4的broker已下线,那么liveReplicasPerPartition中的值就可能如下所示:
["0":[0,1,2],"1":[1,2],"2":[2,0],"3":[0,1],"4":[0,2],"5":[1,0],"6":[0,2,1],"7":[1,0,2]]
Step2:如果一个分区所有预分配的分片都不在线,则打印错误日志,代码如下所示:
Step3:为分区创建leaderIsrAndControllerEpoch信息,代码如下所示:
这里的实现比较简单,值得注意的是初始化时分区的Leader则为ISR列表中的第一个分区。
Step4:将分区的状态信息 leaderIsrAndControllerEpoch(leader,isr,LeaderEpoch、ControllerEpoch)写入到zookeeper中,具体代码如下;
具体就是在zookeeper中创建/broker/topics/{topicName}/partitions/{分区序号}/state,并将leaderIsrAndControllerEpoch写入到上述节点,具体效果如下图所示:
Step5:对zookeeper写入结果进行处理,对应的代码如下所示:
如果在zookeeper中创建成功,将leaderIsrAndControllerEpoch信息缓存到内存中(Map< TopicPartition, leaderIsrAndControllerEpoch>)中,并将信息放入到controllerBrokerRequestBatch,Kafka Broker控制将信息同步到集群的其他Broker上,同时会在state-change.log日志文件中记录状态成功变更日志;如果创建失败,则在state-change.log中输出对应的错误日志。
当然:为了尽量保证上述过程成功创建,Zookeeper的写入过程引入来重试机制来保证最终执行成功,除非一些类似AUTH_FAILED等不可恢复的异常。
分区的信息写入到zookeeper的/broker/topics/{topicName}/partitions/{分区序号}/state文件路径后,会再次调用changeTo方法,在内存中将分区的状态变更为OnlineParttion。
那在什么时候触发真正创建分区相关的文件夹呢?
原来在将分区信息写入到zookeeper指定文件后,由于Kafka Controller订阅了/broker/topics/{topicName}相关节点,故节点的创建会实时告知Kafka Controller,从而执行分区的选择,具体的代码如下所示:
通过Zookeeper的事件监听机制,kafka就这样巧妙的实现了分区状态机的切换。
通过上面的学习,我们对分区的理解应该更加深刻了,从这里我们至少能得出如下结论:
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8