Introduction
In the Raft algorithm, the Leader and its Followers communicate via RPC. The message is defined as follows:
type Message struct {
	Type MessageType `protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type"`
	To   uint64      `protobuf:"varint,2,opt,name=to" json:"to"`
	From uint64      `protobuf:"varint,3,opt,name=from" json:"from"`
	Term uint64      `protobuf:"varint,4,opt,name=term" json:"term"`
	// logTerm is generally used for appending Raft logs to followers. For example,
	// (type=MsgApp,index=100,logTerm=5) means leader appends entries starting at
	// index=101, and the term of entry at index 100 is 5.
	// (type=MsgAppResp,reject=true,index=100,logTerm=5) means follower rejects some
	// entries from its leader as it already has an entry with term 5 at index 100.
	LogTerm    uint64   `protobuf:"varint,5,opt,name=logTerm" json:"logTerm"`
	Index      uint64   `protobuf:"varint,6,opt,name=index" json:"index"`
	Entries    []Entry  `protobuf:"bytes,7,rep,name=entries" json:"entries"`
	Commit     uint64   `protobuf:"varint,8,opt,name=commit" json:"commit"`
	Snapshot   Snapshot `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"`
	Reject     bool     `protobuf:"varint,10,opt,name=reject" json:"reject"`
	RejectHint uint64   `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"`
	Context    []byte   `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"`
}
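As the comment says, when the Leader sends a message to a Follower it carries a LogTerm and an Index. These are the Term and Index of the entry immediately preceding the Entries being sent. Normally the Follower should already have accepted that preceding entry, because the Leader tracks each Follower's progress and replicates the log according to that progress.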
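The relationship between the Leader's view of a Follower's progress and the (Index, LogTerm) pair can be sketched as follows. This is only an illustration: the entry and appendMsg types and the buildAppend helper are hypothetical names, not part of etcd, and the sketch assumes the log is a plain slice where position i holds the entry with index i+1 and that next is between 1 and len(log)+1.

```go
package main

import "fmt"

// entry is a simplified stand-in for pb.Entry (hypothetical).
type entry struct {
	Index, Term uint64
}

// appendMsg mirrors only the Message fields discussed above (hypothetical).
type appendMsg struct {
	Index   uint64  // index of the entry just before Entries
	LogTerm uint64  // term of that preceding entry
	Entries []entry // entries starting at Index+1
}

// buildAppend builds the message for a follower whose next expected index
// is next. Assumes log[i] holds index i+1 and 1 <= next <= len(log)+1.
func buildAppend(log []entry, next uint64) appendMsg {
	prev := next - 1
	var prevTerm uint64 // index 0 is the empty prefix, with term 0
	if prev > 0 {
		prevTerm = log[prev-1].Term
	}
	return appendMsg{Index: prev, LogTerm: prevTerm, Entries: log[prev:]}
}

func main() {
	log := []entry{{1, 1}, {2, 3}, {3, 3}, {4, 3}, {5, 5}}
	m := buildAppend(log, 4)
	fmt.Printf("prev=(index %d, term %d), sending %d entries\n",
		m.Index, m.LogTerm, len(m.Entries))
}
```

With next = 4, the message carries the entry at index 3 as the preceding entry and the entries at indexes 4 and 5 as the payload.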
When a Follower receives the message, it handles it as follows:
func (r *raft) handleAppendEntries(m pb.Message) {
	if m.Index < r.raftLog.committed {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
		return
	}
	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
	} else {
		r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
			r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
		// Return a hint to the leader about the maximum index and term that the
		// two logs could be divergent at. Do this by searching through the
		// follower's log for the maximum (index, term) pair with a term <= the
		// MsgApp's LogTerm and an index <= the MsgApp's Index. This can help
		// skip all indexes in the follower's uncommitted tail with terms
		// greater than the MsgApp's LogTerm.
		//
		// See the other caller for findConflictByTerm (in stepLeader) for a much
		// more detailed explanation of this mechanism.
		hintIndex := min(m.Index, r.raftLog.lastIndex())
		hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
		hintTerm, err := r.raftLog.term(hintIndex)
		if err != nil {
			panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err))
		}
		r.send(pb.Message{
			To:         m.From,
			Type:       pb.MsgAppResp,
			Index:      m.Index,
			Reject:     true,
			RejectHint: hintIndex,
			LogTerm:    hintTerm,
		})
	}
}
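The handler branches on whether to accept or reject the entries. The condition is whether the term the Leader sent for index matches the term the Follower has stored locally at that same index. If they match, the entries are accepted; if not, they are rejected. Of course, a mismatch cannot end with a bare rejection. The Follower has to return some useful information so that the Leader knows which entries to send next for the Follower to start accepting them.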
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
	if l.matchTerm(index, logTerm) {
		lastnewi = index + uint64(len(ents))
		ci := l.findConflict(ents)
		switch {
		case ci == 0:
		case ci <= l.committed:
			l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
		default:
			// The entries in ents have indexes index+1, index+2, index+3, ...
			// offset, ..., conflictIndex
			// -> ents[ci-offset:]
			offset := index + 1
			l.append(ents[ci-offset:]...)
		}
		l.commitTo(min(committed, lastnewi))
		return lastnewi, true
	}
	return 0, false
}
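The accept/reject decision and the conflict search can be reproduced on a toy in-memory log. The sketch below is an assumption-laden simplification, not the etcd implementation: toyLog and its methods are hypothetical, the log is a plain slice where position i holds index i+1, and the committed-index check and commitTo step are left out.

```go
package main

import "fmt"

// entry is a simplified stand-in for pb.Entry (hypothetical).
type entry struct{ Index, Term uint64 }

// toyLog stores the entry with index i+1 at position i (hypothetical type).
type toyLog struct{ ents []entry }

// term returns the term at index i; index 0 is the empty prefix with term 0.
func (l *toyLog) term(i uint64) (uint64, bool) {
	if i == 0 {
		return 0, true
	}
	if i > uint64(len(l.ents)) {
		return 0, false
	}
	return l.ents[i-1].Term, true
}

// matchTerm reports whether the log has an entry at index i with this term.
func (l *toyLog) matchTerm(i, term uint64) bool {
	t, ok := l.term(i)
	return ok && t == term
}

// findConflict returns the index of the first incoming entry that is
// missing locally or has a different term, or 0 if there is none.
func (l *toyLog) findConflict(ents []entry) uint64 {
	for _, e := range ents {
		if !l.matchTerm(e.Index, e.Term) {
			return e.Index
		}
	}
	return 0
}

// maybeAppend accepts ents only if (prevIndex, prevTerm) matches locally.
// On a conflict it drops the local tail from the conflict index onward.
func (l *toyLog) maybeAppend(prevIndex, prevTerm uint64, ents ...entry) (uint64, bool) {
	if !l.matchTerm(prevIndex, prevTerm) {
		return 0, false
	}
	if ci := l.findConflict(ents); ci != 0 {
		l.ents = append(l.ents[:ci-1], ents[ci-(prevIndex+1):]...)
	}
	return prevIndex + uint64(len(ents)), true
}

func main() {
	f := &toyLog{ents: []entry{{1, 1}, {2, 1}, {3, 1}, {4, 1}, {5, 2}, {6, 2}}}
	_, ok := f.maybeAppend(9, 5, entry{10, 5})
	fmt.Println("probe at (9,5) accepted:", ok)
	last, ok := f.maybeAppend(1, 1, entry{2, 3}, entry{3, 3}, entry{4, 3})
	fmt.Println("probe at (1,1) accepted:", ok, "last index:", last, "log length:", len(f.ents))
}
```

The first call is rejected because the toy Follower has no entry at index 9. The second is accepted, and the entries from index 2 onward are replaced by the incoming ones.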
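How the Follower handles a rejection
Step 1 is shown below. It decides which entry to start searching from for the conflict. The starting entry must be one the Follower actually has, which is why the smaller of the two values is taken.
hintIndex := min(m.Index, r.raftLog.lastIndex())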
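Step 2 is shown below and returns a hintIndex. Is this hintIndex the point where the two logs diverge? Can the Leader simply send entries starting from this hintIndex and expect the Follower to accept them?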
hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
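To answer that, first look at what findConflictByTerm does. Scanning backwards from the given index, it finds the first entry in the Follower's log whose term is less than or equal to the LogTerm sent by the Leader, and returns that entry's index.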
// findConflictByTerm takes an (index, term) pair (indicating a conflicting log
// entry on a leader/follower during an append) and finds the largest index in
// log l with a term <= `term` and an index <= `index`. If no such index exists
// in the log, the log's first index is returned.
//
// The index provided MUST be equal to or less than l.lastIndex(). Invalid
// inputs log a warning and the input index is returned.
func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 {
	if li := l.lastIndex(); index > li {
		// NB: such calls should not exist, but since there is a straightforward
		// way to recover, do it.
		//
		// It is tempting to also check something about the first index, but
		// there is odd behavior with peers that have no log, in which case
		// lastIndex will return zero and firstIndex will return one, which
		// leads to calls with an index of zero into this method.
		l.logger.Warningf("index(%d) is out of range [0, lastIndex(%d)] in findConflictByTerm",
			index, li)
		return index
	}
	for {
		logTerm, err := l.term(index)
		if logTerm <= term || err != nil {
			break
		}
		index--
	}
	return index
}
Step 3 is shown below. Attentive readers will notice that this lookup is redundant: findConflictByTerm already looked up the term of this index through l.term(index), which returns the term stored at a given index.
hintTerm, err := r.raftLog.term(hintIndex)
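To summarize the three steps: scanning backwards, the Follower finds the first entry in its log whose term is less than or equal to the m.LogTerm sent by the Leader, and returns that entry's index and term to the Leader.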
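The three steps can be tried out on a toy log. The sketch below is a simplification under stated assumptions: a log is modeled as a slice of terms where terms[i] is the term at index i and terms[0] is a dummy 0 for the empty prefix, and rejectHint is a hypothetical helper name, not an etcd function. The Follower's terms are taken from the example used later in this post.

```go
package main

import "fmt"

// findConflictByTerm walks backwards from index until it reaches an entry
// whose term is <= term. terms[i] is the term at index i; terms[0] is a
// dummy 0, so the loop always stops. Assumes index < len(terms).
func findConflictByTerm(terms []uint64, index, term uint64) uint64 {
	for index > 0 && terms[index] > term {
		index--
	}
	return index
}

// rejectHint (hypothetical helper) combines the three steps the Follower
// performs when it rejects a MsgApp carrying (msgIndex, msgLogTerm).
func rejectHint(terms []uint64, msgIndex, msgLogTerm uint64) (hintIndex, hintTerm uint64) {
	// Step 1: start from an index the Follower actually has.
	hintIndex = msgIndex
	if last := uint64(len(terms) - 1); hintIndex > last {
		hintIndex = last
	}
	// Step 2: skip the tail whose terms are greater than the Leader's LogTerm.
	hintIndex = findConflictByTerm(terms, hintIndex, msgLogTerm)
	// Step 3: look up the term stored at that index.
	return hintIndex, terms[hintIndex]
}

func main() {
	follower := []uint64{0, 1, 1, 1, 1, 2, 2} // indexes 1..6
	idx, term := rejectHint(follower, 9, 5)
	fmt.Printf("hintIndex=%d hintTerm=%d\n", idx, term)
}
```

For a probe at (idx=9, term=5) the Follower's last index is 6 and its term there is 2, which is already at most 5, so the hint is (6, 2).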
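Is the problem solved?
Can the Leader simply send entries starting from this hintIndex next? No. Take the scenario below: the Leader sends (idx=9, term=5) to the Follower, and the Follower returns (hintIndex=6, hintTerm=2). If the Leader then sends (idx=6, term=5), the Follower rejects again, because the term of its entry at index=6 is 2. From hintTerm=2 the Leader can work out that, for the message not to be rejected, it should send (idx=1, term=1).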
// For example, if the leader has:
//
//   idx        1 2 3 4 5 6 7 8 9
//              -----------------
//   term (L)   1 3 3 3 5 5 5 5 5
//   term (F)   1 1 1 1 2 2
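So how should the Leader handle a rejection once it receives one? The simple approach is to retry with index-1 over and over until a matching entry is found, but that is inefficient. Instead, the Leader can run findConflictByTerm on its own log to find the first index, scanning backwards from the hint, whose term is less than or equal to hintTerm, because any probe whose term is greater than hintTerm is bound to be rejected by the Follower again. This does not always land on the right position in one go either; the Leader and the Follower may need several exchanges before they find it.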
nextProbeIdx := m.RejectHint
if m.LogTerm > 0 {
	nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
}
if pr.MaybeDecrTo(m.Index, nextProbeIdx) {
	r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
	if pr.State == tracker.StateReplicate {
		pr.BecomeProbe()
	}
	r.sendAppend(m.From)
}
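To get a feel for the difference between retrying with index-1 and using the hint, here is a small simulation on the example logs. It is a sketch, not the etcd code path: logs are modeled as term slices with a dummy term 0 at index 0, probeNaive and probeWithHint are hypothetical helpers, and the progress tracking done by MaybeDecrTo is reduced to picking the next probe index.

```go
package main

import "fmt"

// findConflictByTerm walks backwards from index to the first entry whose
// term is <= term. terms[0] is a dummy 0, so the loop always stops.
func findConflictByTerm(terms []uint64, index, term uint64) uint64 {
	for index > 0 && terms[index] > term {
		index--
	}
	return index
}

// matches reports whether the log has this term at this index.
func matches(terms []uint64, index, term uint64) bool {
	return index < uint64(len(terms)) && terms[index] == term
}

// probeNaive retries with index-1 until the Follower accepts.
func probeNaive(leader, follower []uint64, prev uint64) (uint64, int) {
	for probes := 1; ; probes++ {
		if matches(follower, prev, leader[prev]) {
			return prev, probes
		}
		prev--
	}
}

// probeWithHint uses the Follower's (hintIndex, hintTerm) reply.
func probeWithHint(leader, follower []uint64, prev uint64) (uint64, int) {
	for probes := 1; ; probes++ {
		if matches(follower, prev, leader[prev]) {
			return prev, probes
		}
		// Follower side: compute the rejection hint.
		hintIndex := prev
		if last := uint64(len(follower) - 1); hintIndex > last {
			hintIndex = last
		}
		hintIndex = findConflictByTerm(follower, hintIndex, leader[prev])
		hintTerm := follower[hintIndex]
		// Leader side: skip its own entries with terms above hintTerm.
		next := hintIndex
		if hintTerm > 0 {
			next = findConflictByTerm(leader, hintIndex, hintTerm)
		}
		if next > prev-1 { // never probe at or above the rejected index
			next = prev - 1
		}
		prev = next
	}
}

func main() {
	leader := []uint64{0, 1, 3, 3, 3, 5, 5, 5, 5, 5} // indexes 1..9
	follower := []uint64{0, 1, 1, 1, 1, 2, 2}        // indexes 1..6
	i, n := probeNaive(leader, follower, 9)
	fmt.Printf("naive: matched at index %d after %d probes\n", i, n)
	i, n = probeWithHint(leader, follower, 9)
	fmt.Printf("hint:  matched at index %d after %d probes\n", i, n)
}
```

On these logs both strategies end up at index 1, but the naive one needs nine probes while the hint-based one needs two: the rejected probe at index 9, then the accepted probe at index 1.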
To illustrate the process more clearly, I drew the two diagrams below by hand.
Scenario 1:
Scenario 2:
Appending entries on the Follower
The Follower appends entries through the append function, shown below:
func (l *raftLog) append(ents ...pb.Entry) uint64 {
	if len(ents) == 0 {
		return l.lastIndex()
	}
	if after := ents[0].Index - 1; after < l.committed {
		l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
	}
	l.unstable.truncateAndAppend(ents)
	return l.lastIndex()
}
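As the name truncateAndAppend suggests, this is the method that deals with conflicts. The default branch is the conflict case: it keeps the Follower's unstable entries before index after, drops the ones from after onward, and then appends the new entries to entries.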
func (u *unstable) truncateAndAppend(ents []pb.Entry) {
	after := ents[0].Index
	switch {
	case after == u.offset+uint64(len(u.entries)):
		// after is the next index in the u.entries
		// directly append
		u.entries = append(u.entries, ents...)
	case after <= u.offset:
		u.logger.Infof("replace the unstable entries from index %d", after)
		// The log is being truncated to before our current offset
		// portion, so set the offset and replace the entries
		u.offset = after
		u.entries = ents
	default:
		// remove conflicted ents
		// truncate to after and copy to u.entries
		// then append
		u.logger.Infof("truncate the unstable entries before index %d", after)
		u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...)
		u.entries = append(u.entries, ents...)
	}
}
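The three branches are easier to follow on concrete data. The sketch below re-creates them on a toy type; toyUnstable and entry are hypothetical stand-ins for the real unstable and pb.Entry, logging is dropped, and offset is assumed to be the index of entries[0].

```go
package main

import "fmt"

// entry is a simplified stand-in for pb.Entry (hypothetical).
type entry struct{ Index, Term uint64 }

// toyUnstable mimics the unstable log: offset is the index of entries[0].
type toyUnstable struct {
	offset  uint64
	entries []entry
}

// truncateAndAppend mirrors the three branches shown above. Assumes ents
// is non-empty, as the caller guarantees in the original code.
func (u *toyUnstable) truncateAndAppend(ents []entry) {
	after := ents[0].Index
	switch {
	case after == u.offset+uint64(len(u.entries)):
		// Contiguous with the tail: plain append.
		u.entries = append(u.entries, ents...)
	case after <= u.offset:
		// Starts at or before everything held here: replace it all.
		u.offset = after
		u.entries = ents
	default:
		// Overlaps the tail: keep [offset, after), drop the rest, then append.
		kept := append([]entry{}, u.entries[:after-u.offset]...)
		u.entries = append(kept, ents...)
	}
}

func main() {
	u := &toyUnstable{offset: 5, entries: []entry{{5, 1}, {6, 1}, {7, 1}}}

	u.truncateAndAppend([]entry{{8, 2}}) // contiguous
	fmt.Println("after append:  ", u.offset, u.entries)

	u.truncateAndAppend([]entry{{7, 3}}) // conflict inside the tail
	fmt.Println("after truncate:", u.offset, u.entries)

	u.truncateAndAppend([]entry{{4, 3}}) // before the current offset
	fmt.Println("after replace: ", u.offset, u.entries)
}
```

In the second call the entries at indexes 7 and 8 are discarded and replaced by the single incoming entry at index 7, which is the conflict case the default branch exists for.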
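Summary
This is how the situation described in the Raft paper, where a Follower's log differs from the Leader's, is resolved so that the two logs become consistent again. It is a rather interesting mechanism.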