Broker端与客户端的心跳在Kafka中非常的重要,因为一旦在一个心跳过期周期内(默认10s),Broker端的消费组组协调器(GroupCoordinator)会把消费者从消费组中移除,从而触发重平衡。在2.4.x以下其版本中,消费组一旦进入重平衡状态,该消费组内所有消费者全部暂停消费,直到重平衡完成。
本文将来探讨Kafka的心跳机制的具体实现。本文的组织结构如下:
温馨提示:如果大家对源码阅读不感兴趣,可以直接跳到本文的第二部分,用流程图、数据结构图阐述心跳的实现机制。
在介绍源码分析之前介绍笔直的一条源码分析经验:找准入口,了解调用链路。故笔者会先寻找归纳出Kafka心跳处理的所有入口。
Kafka心跳包的处理流程如下图所示:
图的右边是kafka心跳在服务端的核心处理流程,而左边主要展示kafka中所有的心跳请求,根据上图得知Kafka触发心跳处理的主要请求分别如下:
如果__consumers_offsets主题中的各个分区Leader发生变化,与特定分区的组协调器需要重新选举,与此组协调器相关的消费者将触发重平衡。
上述任何一种请求,都能表明消费端是存活的,故能有效阻止服务端将客户端端心跳设置为过期,进入下一个心跳检测周期。
上述各个入口,特别是__consumers_offsets的ISR对消费组的影响,后续会专门展开研究,现在我们将重心转移到服务端是如何处理一个心跳包的。
从上面的流程图可以得出,Kafka收到一个心跳包后的处理入口为GroupCoordinator的completeAndScheduleNextExpiration方法,核心代码如下图所示:
在介绍该方法之前首先介绍一个该方法的入参含义:
Step1:为消费组设置唯一标识:groupId + "-" + memberId构成。
Step2:将hearbeatSatisfied设置为true,表示该消费者收到一个有效的心跳包。
Step3:收到一个有效的心跳包,通知定时调度器停止本次的心跳过期检测。
Step4:构建一个DelayedHearbeat,进入下一个心跳检测周期。
接下来将分别对Step3、Step4展开详细介绍。
在收到一个心跳包时,尝试将本次检测设置成功,具体的实现由DelayedOperation的checkAndComplete方法,代码如下:
Kafka使用一个数据结构来存储需要跟踪的所有消费者,在这里成为Watch机制。
实现要点:根据key获取WatchList,然后从获取的WatchList中内部的ConcurrentMap中再按照Key获取对应与当前消费者对应的Watch。
Watch的数据结构如下:
接下来重点关注Watches的tryCompleteWatched方法,该方法的详细调用代码如下图所示:
这边先重点介绍一下组协调器判断一次成功的心跳检测的三个标准中满足一个即可(GroupCoordinator的tryCompleteHeartbeat方法):- 如果消费组的状态处于Dead
上述代码的实现比较简单,这里就不一一罗列,其核心关键点如下:
为了方便大家阅读源码,其主要的调用时序图如下:
在接受到一个新的心跳包首先用于清除上一轮设置的延迟任务,然后需要开启一个新的延迟任务,接下来我们将来具体看看Kafka如何开启新一轮心跳检测机制,**其本质上是Kafka的延迟(定时)实现原理。**代码入口如下图所示:
开启下一轮调度时首先将Member的heartbeatSatisfied设置为false。
其核心思想是创建一个心跳延迟任务DelayedHeartbeat,并对其检测是否完成或者添加Watch,启动心跳延迟或者等待下一个心跳包的到来。
其实看到这里,我们应该能得到一个关于Kafka心跳检测机制的实现思路:
接下来我们详细探讨一下DelayedOperationPurgatory的tryCompleteElseWatch方法,其代码如下图所示:
Step1:尝试调用DelayedHeartbeat的tryComplete方法,判断是否可以判断完成,这里主要是消费组是否为重平衡或者状态为Dead,如果上述情况不满足,则会返回false,因为在发起下一轮心跳包时已将heartbeatSatisfied设置为false。
Step2:为该消费者添加到Watch中,表示kafka需要跟踪该消费者的心跳。
Step3:再次调用maybeTryComplete方法,再尝试判断是否该心跳检测完成。
Step4:如果没有完成,则该任务延迟任务(DelayedHeartbeat)添加到定时调度中。
接下来将进入到Kafka心跳的核心机制,即延迟任务的实现机制。
每一个待执行的延迟任务被封装在TimeTaskEntry中,这个一个典型的双链表,数据结构说明说明如下: 并持有一个关键字段:该定时任务的过期时间,等于系统当前时间+过期时间,在心跳检测场景中默认为10s。
继续跟踪SystemTimer的addTimerTaskEntry,其代码如下:
addTimerTaskEntry的核心实现如下:
尝试将延迟任务添加到时间轮,如果已经过期,则提交到线程池,触发心跳过期的逻辑,提交到线程后,DelayedOperation的run方法会被调用,最终onExpiration方法被调用。
接下来重点谈一下往时间轮中添加任务的具体实现,核心代码见下图所示:
核心实现要点:
Step1:如果任务已经被取消或者已过期,返回false。如果返回false,则会触发定时任务过期。
Step2根据过期时间,放入到时间轮中指定的位置,时间轮的数据结构如下:
每一个格代表一个时间间隔,例如200ms,当前指针指向的格子,代表该格子中的所有任务过期,例如现在要要插入一个700ms过期,从当前指针的下一格开始算起,放入第4格中。
另外时间轮的总格子有限,则该时间轮能计算的最大时间是有限的,例如一个8格的时间轮,每一格代表200ms,则如果要在2s后过期,显然这个时间轮无法存储,通常的解决方案是采用多级时间轮,另外一级的时间轮,其时间精度会更粗。
结合上述关于时间轮的原理,再去看上述代码,就显得容易看懂了。
Step3:就是处理第一级时间轮无法满足过期时间,则放入到第二级时间轮中。
基于时间轮算法,除了数据按找时间轮到方向、触发时间存储在合适的刻度量,还需要驱动时间轮指针。Kafka中的驱动时间轮入口为:
具体实现代码如下:
具体就是将指针处的所有任务全部拉取出来,执行addTimeTaskEntry,其中过期的任务将提交到线程池触发延迟任务的执行。
上述代码看起来比较简单,就不一一介绍,为了方便大家读懂上面的代码,我们只需要了解一下kafka采用时间轮的实际存储数据结构,即能很容易理解上述代码:
其核心特点:环形队列就是一个数组,每一个元素在Kafka中对应一个桶,每一个桶存储一个TimerTaskList(链表),每次指针指向的TimerTaskList,将该链表中的元素代表的任务全部执行。
读起源码来说或许比较枯燥,接下来给出Kafka心跳处理的图解,重点是阐述Kafka时间轮算法的核心数据结构。
如果这篇文章对您有所帮助,或者有所启发的话,帮忙关注一下,您的支持是我坚持写作最大的动力。
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8