介绍
这是我学习MIT 6.824的笔记,代码是比较早之前写好的,写文章是为了复习。本人水平极差人还懒,边学边写肯定很多错误,如果有人能看到这篇文章,欢迎在评论指出我的问题。
实验要求
本实验要求补充raft/raft.go
中的代码,实现Leader的选举。实验提供的测试有两个:
第一个是创建三个正常的节点,看他们能否完成选举;
第二个会模拟通信故障,检查你的代码能否在失去部分节点后依然及时选举出Leader。
Raft是什么
首先要讲讲 replicated state machines 是什么。这是指一组服务器能够维持在相同的状态进行计算,并且在某些服务器故障后仍能正常运行。实现这样的状态机,我们可以在所有服务器上各自维护一个存放 操作(也就是Log) 的相同的队列(对非确定性的操作,比如获取当前时间
,由于网络延迟,必然在每台服务器上结果是不同的,因此需要特殊处理。Fault-Tolerant Virtual Machines中有提到)这样每台服务器根据这个队列依次进行计算,就可以达到相同的状态。所以,我们需要一个算法来让服务器互相沟通,并保持一个相同的队列,Raft就是其中一种。
Raft中LogEntries
以Leader
为准,成为Leader
的节点不断向其他节点广播自己的信息,follower
则据此更新自己的LogEntries
.
Raft将共识问题分为三个独立的子问题,分别Leader Eelection, Log Replication和Safety。这个LAB-2A中,我们要实现的就是Raft算法中的Leader Election。
选举过程
根据论文,我们可以得知,Raft中每个节点都在Follower, Candidate, Leader
三种状态中转换,实现的难点就在于搞清楚节点在不同状态下应有的行为,还有应何时转换状态。
服务器的变量定义
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
// 2A
leader int // 该节点认为的leader编号,-1表示不知道谁是leader
currentTerm int // 该节点所在的term,初始化为0
votedFor int // 本term中给谁投了票的index,-1表示没投过
electionTimeout time.Duration // 超时则进行选举,设置为300ms~500ms中随机取值
electionTimer time.Time // 上一次收到heartbeat的时间
state string // 3种状态
}
复制代码
RPC
Raft中节点间通信内容有两种类型,分别是AppendEntries
和RequestVote
。
AppendEntries RPC
AppendEntries RPC 由Leader
调用,用于通知其他节点存储 LogEntries
,当 LogEntries
为空时也可以用于发送心跳。在该实验中,只需要把该RPC看作发送心跳包即可,也就说只是用来检测Leader
与其他节点的通信情况,所以我们在这里结构比较简单。
// AppendEntries RPC arguments structure
type AppendEntriesArgs struct {
Term int // leader's term
LeaderId int
LogEntries []LogEntry // 该实验中只需为空
}
// AppendEntries RPC reply structure
type AppendEntriesReply struct {
Term int // currentTerm, for leader to update itself
Success bool
}
type LogEntry struct {
}
复制代码
RequestVote RPC
RequestVote RPC 由Candidate
调用,用于请求其他节点为该节点投票。
type RequestVoteArgs struct {
Term int // candidate’s term
CandidateId int // candidate requesting vote
}
type RequestVoteReply struct {
Term int // currentTerm, for candidate to update itself
VoteGranted bool // true means candidate received vote
}
复制代码
不同状态下的表现
接下来我们要梳理三种状态下节点应有的表现
通用规则:
- 遇到
Term
小于currentTerm
的请求可以直接拒绝。 - 遇到
Term
大于等于currentTerm
的请求,则转换为Follower
,并更新currentTerm
。
- Follower
- 只会被动地接收来自其他节点的请求,而不会主动去发请求
- 当一段时间没有收到心跳时,转换为
Candidate
- Candidate
- 进入该状态时,会将
currentTerm + 1
, 并发起一次选举 - 当拿到了大多数节点的投票时,就会转换为
Leader
- 当接收到了AppendEntries时,如果该请求的
Term
>=currentTerm
,则转换为Follower
, 更新currentTerm
,并记录发送者为Leader
- 当选举超时,即以上两种情况都没有发生时,将
currentTerm + 1
,并发起一次选举
- 进入该状态时,会将
- Leader
- 当进入该状态时(选举成功),立刻向其他节点发送AppendEntries(心跳)
- 会定期向所有节点发送AppendEntries(心跳)
实现骨架
可以看到三种状态都是会有主动做一些事的(包括Follower
,他会主动转换状态),所以我们可以创建一个函数,在里面无限循环加sleep
来实现定期任务
// run the raft server
func (rf *Raft) run() {
... // 定义一些变量,主要是用于统计选举结果的
for !rf.killed() {
rf.mu.Lock() // 没有考虑性能,所以锁的范围得比较大,可以多用原子变量来解决 我这个设计不好
switch rf.state {
case Follower:
...
case Leader:
...
case Candidate:
...
}
rf.mu.Unlock()
time.Sleep(time.Millisecond * 100)
}
}
复制代码
然后在Raft.Make
中初始化服务器后,启动一个新线程运行run()
go rf.run()
复制代码
遇到的难点
Candidate收集投票的过程
Candidate
在选举中只要能拿到超过1/2的票数就可以视为成功。因此,没有必要等待全部RequestVote
返回结果再判断,而是采用异步模型,每次循环中都判断当前拿到的票数是否已经超过1/2,如果满足要求,则立刻转换为Leader
,在这之后返回的结果可以忽视。
我的实现中,发起投票的代码如下,pVotesReceived
是一个int64
指针,用于记录本次选举收到的投票,这个变量会在每轮选举开始前被初始化。这里用go func(server int){...}
发起了多个线程去请求投票,返回结果时,会给pVotesReceived
的值加一。
args := &RequestVoteArgs{Term: rf.currentTerm, CandidateId: rf.me}
var v atomic.Value
v.Store(args)
for i := 0; i < len(rf.peers); i++ {
if i != rf.me {
go func(server int) {
reply := RequestVoteReply{}
rf.mu.Lock()
log.Printf("[Term: %v | %v | $%v]: send RequestVote { Term: %v } to $%v.", rf.currentTerm, rf.state, rf.me, args.Term, server)
rf.mu.Unlock()
ok := rf.sendRequestVote(server, v.Load().(*RequestVoteArgs), &reply)
rf.mu.Lock()
// 这里要比较Term,确保返回的结果是本次选举的
if ok && reply.VoteGranted && reply.Term == rf.currentTerm {
atomic.AddInt64(pVotesReceived, 1)
log.Printf("[Term: %v | %v | $%v]: receive vote from $%v. Votes Received: %v.", rf.currentTerm, rf.state, rf.me, server, atomic.LoadInt64(pVotesReceived))
} else {
log.Printf("[Term: %v | %v | $%v]: cannot receive vote from $%v.[network: %v] Votes Received: %v.", rf.currentTerm, rf.state, rf.me, server, ok, atomic.LoadInt64(pVotesReceived))
}
rf.mu.Unlock()
}(i)
}
}
复制代码
同时,在Candidate
的循环中,还会定期检查是否攒够票数,一旦满足条件立刻进行转换。
// If votes received from majority of servers: become leader
if int(atomic.LoadInt64(pVotesReceived)) > len(rf.peers)/2 {
rf.state = Leader
rf.leader = rf.me
log.Printf("[Term: %v | %v | $%v]: I am a Leader!!!", rf.currentTerm, rf.state, rf.me)
} else if electionStart.Add(rf.electionTimeout).Before(time.Now()) {
// 如果经过1个electionTimeout还没有成为Leader,则重新发起一次选举
rf.convertToCandidate(rf.currentTerm + 1)
rf.electionTimeout = time.Duration(250+30*rand.Intn(10)) * time.Millisecond // 重新设置间隔250ms~500ms
atomic.StoreInt64(pVotesReceived, 0)
log.Printf("[Term: %v | %v | $%v]: restart a election", rf.currentTerm, rf.state, rf.me)
}
复制代码
你可能会有疑问,如果转换为Leader
后,前面的RequestVote
才返回,修改了pVoteReceived
的值怎么办呢?这一点不用担心,因为只有在Candidate
状态下pVoteReceived
的值才有意义,其他状态不会使用这个值,因此即使被修改也没关系;而且当我们每次从其他状态进入Candidate
时都会重新初始化pVoteReceived
,注意此时currentTerm
也必然比之前的RequestVote
中的Term
高,所以只要像代码中那样限定条件
// 这里要比较Term,确保返回的结果是本次选举的
if ok && reply.VoteGranted && reply.Term == rf.currentTerm {
atomic.AddInt64(pVotesReceived, 1)
log.Printf("[Term: %v | %v | $%v]: receive vote from $%v. Votes Received: %v.", rf.currentTerm, rf.state, rf.me, server, atomic.LoadInt64(pVotesReceived))
} else {
log.Printf("[Term: %v | %v | $%v]: cannot receive vote from $%v.[network: %v] Votes Received: %v.", rf.currentTerm, rf.state, rf.me, server, ok, atomic.LoadInt64(pVotesReceived))
}
复制代码
就可以保证新一轮选举中的pVoteReceived
不会被之前选举的结果修改。
Split Votes
如果经常出现节点同时发起选举的情况,要注意代码中的electionTimeout
随机值设置是否合理,尽量让各个节点electionTimeout
的值相差大一点。还要注意当Candidate
选举超时,重新发起选举的时间是否也是随机值。这两个地方设置合理应该就可以解决这个问题。
总结
Raft论文的介绍非常详细,只要照做就好了。多线程下调试时,多用printf输出状态,如果程序没反应多找找各种临界区看看是不是死锁了,总得来说这个实验难度不高,感觉比LAB-1 MapReduce简单些。