hashicorp raft 源码不多,抛除 boltdb 存储相关的只有 1万多行代码,不用一周时间就能读透所有细节。来看一下论文提到的几个问题如何实现
新的 Leader 当选后不知道 follower 接收了哪些 logs, AppendEntries
时一定报错,resp 里携带 follower 的 getLastIndex 加速重传, 参考函数 replicateTo
func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool) {
......
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Error("failed to appendEntries to", "peer", s.peer, "error", err)
s.failures++
return
}
appendStats(string(s.peer.ID), start, float32(len(req.Entries)))
// Check for a newer term, stop running
if resp.Term > req.Term {
r.handleStaleTerm(s)
return true
}
// Update the last contact
s.setLastContact()
// Update s based on success
if resp.Success {
......
} else {
atomic.StoreUint64(&s.nextIndex, max(min(s.nextIndex-1, resp.LastLog+1), 1))
if resp.NoRetryBackoff {
s.failures = 0
} else {
s.failures++
}
r.logger.Warn("appendEntries rejected, sending older logs", "peer", s.peer, "next", atomic.LoadUint64(&s.nextIndex))
}
......
如果 append 失败,那么根据 LastLog 来更新下一次待发送该 follower 的 nextIndex
当前 hashicorp raft 实现做不到 exactly once, 比如 client 发起了 apply 请求,当 fsm 应用完数据返回前,client 与 raft 断开了连接,client 此时收不到 resp, 重试的话就会造成请求重复,如果非幂等操作,一定会有问题
paper 5.4.2 提到不能 Committing entries from previous terms, 那么如何确保之前的数据己经提交 match 了呢?根据 log matching property, 只需新当选的 leader 使用最新的 term 提交一条 no-op 日志即可
func (r *Raft) runLeader() {
r.logger.Info("entering leader state", "leader", r)
metrics.IncrCounter([]string{"raft", "state", "leader"}, 1)
......
// Start a replication routine for each peer
r.startStopReplication()
// Dispatch a no-op log entry first. This gets this leader up to the latest
// possible commit index, even in the absence of client commands. This used
// to append a configuration entry instead of a noop. However, that permits
// an unbounded number of uncommitted configurations in the log. We now
// maintain that there exists at most one uncommitted configuration entry in
// any log, so we have to do proper no-ops here.
noop := &logFuture{
log: Log{
Type: LogNoop,
},
}
r.dispatchLogs([]*logFuture{noop})
// Sit in the leader loop until we step down
r.leaderLoop()
}
对应到代码里 runLeader
在开启 loop 前主动 dispatch 一个 noop logs
当网络分区时,某个 follower term 会一直增大,然后发起 election 到 raft 集群中,将更高的 term 传播到集群,leader 发现更高的 term 后退步成 follower, raft 是一个 CP 系统,频繁 leader 选举会造成集群无法做到稳定提供服务。
raft 作者 paper 提到了一种 Preventing disruptions when a server rejoins the cluster 就是 PreVote, 当节点无法收到 leader 心跳时,先做一次 PreVote, 并不递增 term, 如果收到大多数节点同意后,再真正的做 Vote. hashicorp raft 实现了 PreVote 嘛?并没有。解决了 high term disruptions 问题嘛?我们来看代码
// requestVote is invoked when we get an request vote RPC call.
func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
defer metrics.MeasureSince([]string{"raft", "rpc", "requestVote"}, time.Now())
r.observe(*req)
// Setup a response
resp := &RequestVoteResponse{
RPCHeader: r.getRPCHeader(),
Term: r.getCurrentTerm(),
Granted: false,
}
var rpcErr error
defer func() {
rpc.Respond(resp, rpcErr)
}()
// Check if we have an existing leader [who's not the candidate] and also
// check the LeadershipTransfer flag is set. Usually votes are rejected if
// there is a known leader. But if the leader initiated a leadership transfer,
// vote!
candidate := r.trans.DecodePeer(req.Candidate)
if leader := r.Leader(); leader != "" && leader != candidate && !req.LeadershipTransfer {
r.logger.Warn("rejecting vote request since we have a leader",
"from", candidate,
"leader", leader)
return
}
......
}
也就是说当不是强制 leader 转移时,如果 follower 的函数 requestVote
发现有 leader 了,那么会拒绝本次 candidate 的选举请求。此时 candidate 会退步成 follower, 如果此时真正的 leader 与 higher term follower 网络连通了,disruptions
现象一定会发生
什么是 Multi-Group
?拿 tikv 举例子,数据按照 key 的二进制排序是一个平坦的 (-∞,+∞) 区间,对这些 keys 空间按照 64MB 大小划分 shard, 每个 shard 三副本分部在不同物理机上,一组 shard 使用 raft 来做选举,也就是说一台机器上可能有成千上万个 raft group
hashicorp 是典型的 Single-Group
raft library, 如果直接用做 Multi-Group
场景,goroutine, 定时器满天飞,由于底层 transport 也不能复用,三台机器两两相连占用的 sockets 就会成千上万,性能相当的差
那么 Multi-Group
相对于 hashicorp 要做哪些优化呢?我们以后分析 dragonboat 和 tikv raft 后再看
关于什么是线性一致性网上有很多,大家可以搜一下,满足 linearizability 条件的分布式系统就像单个节点一样,永远不会读到过期 stale 数据(寄存器读),但是在 read 操作的 (invoke, resp) 时序区间如果有写入操作,此时可能读到不一样的数据,也符合线性一致性。
因为 raft 节点间可能有网络分区,产生多个 leader, 所以要用心跳来确认当前 leader 节点。此外在 read 发起时刻的 commitIndex 要确保 apply 到 fsm, 此时再从 fsm 中读数据才不是过期的,这个就叫做 readIndex
,当然 raft 论文还有优化版的 leaseRead, tikv 就采用这一方案,以后再讲这块。
先看 hashicorp 实现,consul[1] 有一段代码展示如何做到线性一致性读
// consistentRead is used to ensure we do not perform a stale
// read. This is done by verifying leadership before the read.
func (s *Server) consistentRead() error {
future := s.raft.VerifyLeader()
if err := future.Error(); err != nil {
return err //fail fast if leader verification fails
}
......
}
略去无关代码,实际上就是调用 VerifyLeader
, 然后 future.Error()
阻塞这里等待返回结果,如果没有报错,说明当前节点是 leader
func (r *Raft) VerifyLeader() Future {
metrics.IncrCounter([]string{"raft", "verify_leader"}, 1)
verifyFuture := &verifyFuture{}
verifyFuture.init()
select {
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.verifyCh <- verifyFuture:
return verifyFuture
}
}
简单的发送一个 verifyFuture
请求到 r.verifyCh
中,我们来看一下实现。
func (r *Raft) leaderLoop() {
for r.getState() == Leader {
select {
......
case v := <-r.verifyCh:
if v.quorumSize == 0 {
// Just dispatched, start the verification
r.verifyLeader(v)
} else if v.votes < v.quorumSize {
// Early return, means there must be a new leader
r.logger.Warn("new leader elected, stepping down")
r.setState(Follower)
delete(r.leaderState.notify, v)
for _, repl := range r.leaderState.replState {
repl.cleanNotify(v)
}
v.respond(ErrNotLeader)
} else {
// Quorum of members agree, we are still leader
delete(r.leaderState.notify, v)
for _, repl := range r.leaderState.replState {
repl.cleanNotify(v)
}
v.respond(nil)
}
......
runFollower
和 runCandidate
不接受 r.verifyCh
数据,直接返回报错,也就是说 hashicorp 的实现不允许 follower 读数据。所以直接看 runLeader
实现即可。
quorumSize
为 0, 说明是第一次接收请求,还没有获取 majority 数据,应该设置成 (n+1)/2. 调用 verifyLeader
开始验证votes
小于 quorumSize
, 那一定不是 leader, 可以提前返回了votes
大于等于 quorumSize
,那也可以提前确认是 leaderfunc (r *Raft) verifyLeader(v *verifyFuture) {
// Current leader always votes for self
v.votes = 1
// Set the quorum size, hot-path for single node
v.quorumSize = r.quorumSize()
if v.quorumSize == 1 {
v.respond(nil)
return
}
// Track this request
v.notifyCh = r.verifyCh
r.leaderState.notify[v] = struct{}{}
// Trigger immediate heartbeats
for _, repl := range r.leaderState.replState {
repl.notifyLock.Lock()
repl.notify[v] = struct{}{}
repl.notifyLock.Unlock()
asyncNotifyCh(repl.notifyCh)
}
}
verifyLeader
首先给自己投票,所以 votes 默认设置 1每个 follower 的 replicate
函数会启动一个心跳 goroutine, 除了定时发送以外,还会监听 notifyCh, 立刻触发心跳
func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
var failures uint64
req := AppendEntriesRequest{
RPCHeader: r.getRPCHeader(),
Term: s.currentTerm,
Leader: r.trans.EncodePeer(r.localID, r.localAddr),
}
var resp AppendEntriesResponse
for {
// Wait for the next heartbeat interval or forced notify
select {
case <-s.notifyCh:
case <-randomTimeout(r.conf.HeartbeatTimeout / 10):
case <-stopCh:
return
}
start := time.Now()
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
......
} else {
s.setLastContact()
failures = 0
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(s.peer.ID)}, start)
s.notifyAll(resp.Success)
}
}
}
// notifyAll is used to notify all the waiting verify futures
// if the follower believes we are still the leader.
func (s *followerReplication) notifyAll(leader bool) {
// Clear the waiting notifies minimizing lock time
s.notifyLock.Lock()
n := s.notify
s.notify = make(map[*verifyFuture]struct{})
s.notifyLock.Unlock()
// Submit our votes
for v := range n {
v.vote(leader)
}
}
AppendEntries
函数返回后,会调用 notifyAll
发起投票请求, s.notify
是一个 map, 所以一次心跳确认 leader, 会服务很多读请求。另外 follower 的 transport 会特殊处理心跳消息,参考函数 handleCommand
, 会直接调用 processHeartbeat
来验证心跳消息,不走 raft 状态机
func (v *verifyFuture) vote(leader bool) {
v.voteLock.Lock()
defer v.voteLock.Unlock()
// Guard against having notified already
if v.notifyCh == nil {
return
}
if leader {
v.votes++
if v.votes >= v.quorumSize {
v.notifyCh <- v
v.notifyCh = nil
}
} else {
v.notifyCh <- v
v.notifyCh = nil
}
}
获得的票数 votes++, 如果达到了 quorumSize, 把自己发送到 v.notifyCh 并置为 nil, 因为己经有结果了。另外如果被一个拒绝了,那么也可以提前返回证明不是 leader 了。最后触发 leaderLoop 的 r.verifyCh 的分支, 获得了 quorumSize 后 v.respond
解除客户端阻塞
看完了源码,心跳确认己经实现了,那么如何确保 readIndex 呢?issues-359[2] 有人问如何用 hashicorp 实现线性一致性,作者有回复
Another subtly: the leader has to get passed its write barrier: it has to have finished applying committed logs to it's FSM before it can proceed.
作者说还需要调用 Barrier()
来确保之前 committed logs apply 到 fsm 中,也就是发送类型 LogBarrier 的日志走一遍状态机流程。个人认为这是很慢的,因为只要确保发起请求时的 commitIndex apply 到 fsm 即满足线性一致性,如果再调用一次 Barrier()
虽然也保证了,但是会额外 apply 这中间堆积的的所有 uncommitted logs, 所以实现不高效
hashicorp 不允许 follower 读数据,如果允许的话,怎么实现 follower read Linearizability 呢?其实业界实现,还是得走 leader, commitIndex 也要从 leader 获取后才能在 follower 中读取。etcd 实现了 follower read, 以后分析 etcd 源码时再看
hashicorp raft 源码分析就到这了,总结一下吧,开箱即用,易懂。想学习 raft 的建议通读,如果想用到高并发工程上还有点距离,需要做下优化,至于 Multi-Group 那就更不适合了
[1]consul linearizability: https://github.com/hashicorp/consul/blob/89158c7a7665a48735ecf8541b72c83c860fe195/agent/consul/rpc.go#L524,
[2]issues-359: https://github.com/hashicorp/raft/issues/359,
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8