LAB-2A Raft Leader Election

介绍

这是我学习MIT 6.824的笔记,代码是比较早之前写好的,写文章是为了复习。本人水平极差人还懒,边学边写肯定很多错误,如果有人能看到这篇文章,欢迎在评论指出我的问题。

实验要求

本实验要求补充raft/raft.go中的代码,实现Leader的选举。实验提供的测试有两个:

第一个是创建三个正常的节点,看他们能否完成选举;

第二个会模拟通信故障,检查你的代码能否在失去部分节点后依然及时选举出Leader。

Raft是什么

首先要讲讲 replicated state machines 是什么。这是指一组服务器能够维持在相同的状态进行计算,并且在某些服务器故障后仍能正常运行。实现这样的状态机,我们可以在所有服务器上各自维护一个存放 操作(也就是Log) 的相同的队列(对非确定性的操作,比如获取当前时间,由于网络延迟,必然在每台服务器上结果是不同的,因此需要特殊处理。Fault-Tolerant Virtual Machines中有提到)这样每台服务器根据这个队列依次进行计算,就可以达到相同的状态。所以,我们需要一个算法来让服务器互相沟通,并保持一个相同的队列,Raft就是其中一种。

image.png

Raft中LogEntriesLeader为准,成为Leader的节点不断向其他节点广播自己的信息,follower则据此更新自己的LogEntries.

这个网站用动画表现了Raft的运行过程

Raft将共识问题分为三个独立的子问题,分别Leader Eelection, Log Replication和Safety。这个LAB-2A中,我们要实现的就是Raft算法中的Leader Election。

选举过程

根据论文,我们可以得知,Raft中每个节点都在Follower, Candidate, Leader三种状态中转换,实现的难点就在于搞清楚节点在不同状态下应有的行为,还有应何时转换状态。

服务器的变量定义

type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.

	// 2A
	leader          int           // 该节点认为的leader编号,-1表示不知道谁是leader
	currentTerm     int           // 该节点所在的term,初始化为0
	votedFor        int           // 本term中给谁投了票的index,-1表示没投过
	electionTimeout time.Duration // 超时则进行选举,设置为300ms~500ms中随机取值
	electionTimer   time.Time     // 上一次收到heartbeat的时间
	state           string        // 3种状态
}
复制代码

RPC

Raft中节点间通信内容有两种类型,分别是AppendEntriesRequestVote

AppendEntries RPC

AppendEntries RPC 由Leader调用,用于通知其他节点存储 LogEntries,当 LogEntries 为空时也可以用于发送心跳。在该实验中,只需要把该RPC看作发送心跳包即可,也就说只是用来检测Leader与其他节点的通信情况,所以我们在这里结构比较简单。

// AppendEntries RPC arguments structure
type AppendEntriesArgs struct {
	Term       int // leader's term
	LeaderId   int
	LogEntries []LogEntry // 该实验中只需为空
}

// AppendEntries RPC reply structure
type AppendEntriesReply struct {
	Term    int // currentTerm, for leader to update itself
	Success bool
}

type LogEntry struct {
}
复制代码

RequestVote RPC

RequestVote RPC 由Candidate调用,用于请求其他节点为该节点投票。

type RequestVoteArgs struct {
	Term        int // candidate’s term
	CandidateId int // candidate requesting vote
}

type RequestVoteReply struct {
	Term        int  // currentTerm, for candidate to update itself
	VoteGranted bool // true means candidate received vote
}
复制代码

不同状态下的表现

接下来我们要梳理三种状态下节点应有的表现

通用规则:

  1. 遇到Term小于currentTerm的请求可以直接拒绝。
  2. 遇到Term大于等于currentTerm的请求,则转换为Follower,并更新currentTerm
  • Follower
    • 只会被动地接收来自其他节点的请求,而不会主动去发请求
    • 当一段时间没有收到心跳时,转换为Candidate
  • Candidate
    • 进入该状态时,会将currentTerm + 1, 并发起一次选举
    • 当拿到了大多数节点的投票时,就会转换为Leader
    • 当接收到了AppendEntries时,如果该请求的Term>=currentTerm,则转换为Follower, 更新currentTerm,并记录发送者为Leader
    • 当选举超时,即以上两种情况都没有发生时,将currentTerm + 1,并发起一次选举
  • Leader
    • 当进入该状态时(选举成功),立刻向其他节点发送AppendEntries(心跳)
    • 会定期向所有节点发送AppendEntries(心跳)

实现骨架

可以看到三种状态都是会有主动做一些事的(包括Follower,他会主动转换状态),所以我们可以创建一个函数,在里面无限循环加sleep来实现定期任务

// run the raft server
func (rf *Raft) run() {
	... // 定义一些变量,主要是用于统计选举结果的
	for !rf.killed() {
		rf.mu.Lock() // 没有考虑性能,所以锁的范围得比较大,可以多用原子变量来解决 我这个设计不好
		switch rf.state {
		case Follower:
			...
		case Leader:
			...
		case Candidate:
                        ...
		}
		rf.mu.Unlock()
		time.Sleep(time.Millisecond * 100)
	}
}
复制代码

然后在Raft.Make中初始化服务器后,启动一个新线程运行run()

go rf.run()
复制代码

遇到的难点

Candidate收集投票的过程

Candidate在选举中只要能拿到超过1/2的票数就可以视为成功。因此,没有必要等待全部RequestVote返回结果再判断,而是采用异步模型,每次循环中都判断当前拿到的票数是否已经超过1/2,如果满足要求,则立刻转换为Leader,在这之后返回的结果可以忽视。

我的实现中,发起投票的代码如下,pVotesReceived是一个int64指针,用于记录本次选举收到的投票,这个变量会在每轮选举开始前被初始化。这里用go func(server int){...}发起了多个线程去请求投票,返回结果时,会给pVotesReceived的值加一。

args := &RequestVoteArgs{Term: rf.currentTerm, CandidateId: rf.me}
var v atomic.Value
v.Store(args)
for i := 0; i < len(rf.peers); i++ {
	if i != rf.me {
		go func(server int) {
			reply := RequestVoteReply{}
			rf.mu.Lock()
			log.Printf("[Term: %v | %v | $%v]:	send RequestVote { Term: %v } to $%v.", rf.currentTerm, rf.state, rf.me, args.Term, server)
			rf.mu.Unlock()
			ok := rf.sendRequestVote(server, v.Load().(*RequestVoteArgs), &reply)
			rf.mu.Lock()
			// 这里要比较Term,确保返回的结果是本次选举的
			if ok && reply.VoteGranted && reply.Term == rf.currentTerm {
				atomic.AddInt64(pVotesReceived, 1)
				log.Printf("[Term: %v | %v | $%v]:	receive vote from $%v. Votes Received: %v.", rf.currentTerm, rf.state, rf.me, server, atomic.LoadInt64(pVotesReceived))
			} else {
				log.Printf("[Term: %v | %v | $%v]:	cannot receive vote from $%v.[network: %v] Votes Received: %v.", rf.currentTerm, rf.state, rf.me, server, ok, atomic.LoadInt64(pVotesReceived))
			}
			rf.mu.Unlock()
		}(i)
	}
}
复制代码

同时,在Candidate的循环中,还会定期检查是否攒够票数,一旦满足条件立刻进行转换。

// If votes received from majority of servers: become leader
if int(atomic.LoadInt64(pVotesReceived)) > len(rf.peers)/2 {
	rf.state = Leader
	rf.leader = rf.me
	log.Printf("[Term: %v | %v | $%v]:	I am a Leader!!!", rf.currentTerm, rf.state, rf.me)
} else if electionStart.Add(rf.electionTimeout).Before(time.Now()) {
	// 如果经过1个electionTimeout还没有成为Leader,则重新发起一次选举
	rf.convertToCandidate(rf.currentTerm + 1)
	rf.electionTimeout = time.Duration(250+30*rand.Intn(10)) * time.Millisecond // 重新设置间隔250ms~500ms
	atomic.StoreInt64(pVotesReceived, 0)
	log.Printf("[Term: %v | %v | $%v]:	restart a election", rf.currentTerm, rf.state, rf.me)
}
复制代码

你可能会有疑问,如果转换为Leader后,前面的RequestVote才返回,修改了pVoteReceived的值怎么办呢?这一点不用担心,因为只有在Candidate状态下pVoteReceived的值才有意义,其他状态不会使用这个值,因此即使被修改也没关系;而且当我们每次从其他状态进入Candidate时都会重新初始化pVoteReceived,注意此时currentTerm也必然比之前的RequestVote中的Term高,所以只要像代码中那样限定条件

// 这里要比较Term,确保返回的结果是本次选举的
if ok && reply.VoteGranted && reply.Term == rf.currentTerm {
	atomic.AddInt64(pVotesReceived, 1)
	log.Printf("[Term: %v | %v | $%v]:	receive vote from $%v. Votes Received: %v.", rf.currentTerm, rf.state, rf.me, server, atomic.LoadInt64(pVotesReceived))
} else {
	log.Printf("[Term: %v | %v | $%v]:	cannot receive vote from $%v.[network: %v] Votes Received: %v.", rf.currentTerm, rf.state, rf.me, server, ok, atomic.LoadInt64(pVotesReceived))
}
复制代码

就可以保证新一轮选举中的pVoteReceived不会被之前选举的结果修改。

Split Votes

如果经常出现节点同时发起选举的情况,要注意代码中的electionTimeout随机值设置是否合理,尽量让各个节点electionTimeout的值相差大一点。还要注意当Candidate选举超时,重新发起选举的时间是否也是随机值。这两个地方设置合理应该就可以解决这个问题。

总结

Raft论文的介绍非常详细,只要照做就好了。多线程下调试时,多用printf输出状态,如果程序没反应多找找各种临界区看看是不是死锁了,总得来说这个实验难度不高,感觉比LAB-1 MapReduce简单些。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享