Mutex互斥锁用于实现协程对互斥资源的访问。本文将Mutex源码拆解为三个部分,分别从“互斥”、“增加运行中的协程获取锁的机会”、“解决饥饿问题”三个方面进行Mutex原理的讲解。
互斥锁
Mutex互斥锁用于实现协程对互斥资源的访问,包含Lock()
和Unlock()
两个方法。其中,Lock()
用于争夺锁资源,同一时刻会有多个协程执行Lock()
争夺锁资源;Unlock()
用于释放锁资源并唤醒等待队列中的协程。在正常的使用情况下(如代码所示),只有进入临界区的协程会执行Unlock()
。因此,同一时刻只有一个协程可以执行Unlock()
方法。
var lock sync.Mutex
func Hello1(){
lock.Lock()
fmt.Println("Hello")
lock.Unlock()
}
func Hello2(){
lock.Lock()
defer lock.Unlock()
fmt.Println("Hello")
}
复制代码
等待队列
当协程调用Lock()
方法争夺锁资源失败时,该协程会进入等待队列并切换到阻塞状态;当持有锁资源的协程调用Unlock()
方法释放锁资源的时候,会从等待队列中唤醒锁对应的协程。Mutex互斥锁的等待队列是通过平衡树实现,其源码位置在$GOROOT/src/runtime/sema.go
。等待队列采用信号量sema
区分不同的互斥锁,支持FIFO和LIFO两种进入队列的形式
// 进入等待队列并将阻塞当前协程,lifo=true放到队首,lifo=false放到队尾
func runtime_SemacquireMutex(sema *uint32, lifo bool, skipframes int)
// 从等待队列中唤醒一个协程
func runtime_Semrelease(sema *uint32, handoff bool, skipframes int)
复制代码
源码解析
Mutex互斥锁在解决互斥问题的基础上,添加了“增加运行中的协程获取锁的机会”和“解决饥饿问题”的特性。接下来,本文将对Mutex进行分解,分别从“互斥”、“增加运行中的协程获取锁的机会”、“解决饥饿问题”三个方面进行介绍。
Mutex结构体
Mutex互斥锁有state
状态、sema
信号量两个字段。其中,state
字段表示锁的状态;sema
信号量可以理解为互斥锁的唯一标识,用于将协程放入等待队列和从等待队列中唤醒协程。
type Mutex struct {
state int32 // 状态变量,表示锁的状态(等待协程数、饥饿状态、是否存在运行中的协程、是否被持有)
sema uint32 // 信号量,用于从等待队列中唤醒协程
}
const (
// 锁标识位(state的最后一位)
// Mutex.state&mutexLocked==1表示已经上锁;Mutex.state&mutexLocked==0表示已经未锁
mutexLocked = 1 << iota
// 是否存在运行中的协程(state的倒数第二位)
// Mutex.state&mutexWoken==1表示存在运行中的协程;Mutex.state&mutexWoken==0表示不存在运行中的协程
mutexWoken
// 饥饿状态位(state的倒数第三位)
// Mutex.state&mutexStarving==1表示饥饿;Mutex.state&mutexStarving==0表示不饥饿
mutexStarving
// 等待者数量偏移量(值为3)
// Mutex.state右移3位表示等待队列中阻塞协程的数量,即有多少协程由于该锁阻塞
mutexWaiterShift = iota
)
复制代码
互斥
锁的互斥通过mutexLocked
标志位实现。当mutexLocked
标志位为1时,表示锁已经被持有,其他协程需要进入等待队列;当mutexLocked
为0时,表示锁没有被持有,协程可通过CAS操作将mutexLocked
标志位设置为1来夺取锁资源。
Lock()
方法的流程如下:
- 读取锁的状态;
- 通过CAS操作对锁的状态进行更新;CAS操作失败返回步骤1,成功进入步骤3;
- 判断当前协程是否成功持有锁资源;
- 成功 -> 退出
Lock()
方法,执行临界区的代码; - 失败 -> 进入等待队列,等待被持有锁资源的协程唤醒;唤醒后重新执行步骤1;
- 成功 -> 退出
// Lock 争夺锁资源
func (m *Mutex) Lock() {
// 幸运case:当前锁没有被任何协程持有,并且上锁成功,直接返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// 读取锁的状态
old := m.state
for {
new := old
// 将锁的新状态设为被持有
new |= mutexLocked
// 如果锁已经被持有,将等待协程数+1
if old&mutexLocked != 0 {
new += 1 << mutexWaiterShift
}
// CAS操作
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 锁的原始状态为没有被持有,则当前协程争夺锁资源成功,退出for循环
if old&(mutexLocked) == 0 {
break
}
// 锁已经被其他协程持有,当前协程进入等待队列
runtime_SemacquireMutex(&m.sema, false, 1)
// 从等待队列中唤醒的协程,重新读取锁的status
old := m.state
} else {
// CAS失败,重新竞争锁
old = m.state
}
}
}
复制代码
Unlock()
方法的操作如下:
- 通过
atomic.AddInt32(&m.state, -mutexLocked)
原子操作将锁的状态切换到未被持有状态; - 判断是否需要唤醒等待队列中的协程。当锁资源已经被持有或者已经没有等待者的时候,不会从等待队列中唤醒协程;其他情况下,会通过CAS操作从等待队列中唤醒协程。
// Unlock 释放锁资源
func (m *Mutex) Unlock() {
//将锁置为未被持有状态
new := atomic.AddInt32(&m.state, -mutexLocked)
// 幸运case:没有协程等待当前锁资源, 直接返回
if new == 0 {
return
}
// 存在协程等待当前锁资源,需要唤醒其中一个协程
old := new
for {
// 锁资源已经没有等待者 -> 无需释放等待协程
// 锁资源已经上锁 -> 不用再释放等待协程 抢夺已经被持有的锁资源
if old>>mutexWaiterShift == 0 || old&(mutexLocked) != 0 {
return
}
// 将等待当前锁资源的协程数据-1
new = (old - 1<<mutexWaiterShift)
if atomic.CompareAndSwapInt32(&m.state, old, new) { // CAS操作
// 唤醒一个等待当前锁资源的协程
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
}
复制代码
增加运行中的协程获取锁的机会
从等待队列中唤醒协程需要上下文切换的成本,为了减少上下文的切换,Mutex互斥锁在设计上增加了运行中的协程获取锁的机会。通过mutexWoken
标志位实现,mutexWoken
标志位为1,表示当前存在运行中的协程争夺锁资源,这时Unlock()
方法就不会再从等待队列中唤醒协程来争夺锁。
Lock()
方法的流程如下:
- 读取锁的状态;
- 通过自旋操作更新
mutexWoken
标志位(告诉Unlock()
方法已经存在运行中的协程竞争锁,不要再唤醒协程); - 通过CAS操作对锁的状态进行更新;CAS操作失败返回步骤1,成功进入步骤4;
- 判断当前协程是否成功持有锁资源;
- 成功 -> 退出
Lock()
方法,执行临界区的代码; - 失败 -> 进入等待队列,等待被持有锁资源的协程唤醒;唤醒后重新执行步骤1;
- 成功 -> 退出
func (m *Mutex) Lock() {
// 幸运case:当前锁没有被任何协程持有,并且上锁成功,直接返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// awoke变量表示当前协程是否设置了status中的mutexWoken标志位
awoke := false
// 迭代次数,用于判断是否可以进行自旋操作
iter := 0
old := m.state
for {
// 当mutex被其他协程持有,通过自旋的方式更新status的mutexWoken标志位
// 设置mutexWoken标志位后,unlock方法不会再唤醒阻塞中的协程,减少上下文切换的成本
if old&mutexLocked == mutexLocked && runtime_canSpin(iter) {
// mutexWoken标志位没有被设置,并且锁的等待协程数不为0时;采用CAS操作设置mutexWoken标志位;
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
// mutexWoken设置成功后,将awoke设置为true,表示mutexWoken表示位是当前协程设置的
awoke = true
}
runtime_doSpin()
// 自旋的迭代次数+1
iter++
old = m.state
continue
}
new := old
// 当前锁资源已经被其他协程持有,锁资源协程等待数+1
if old&(mutexLocked) != 0 {
new += 1 << mutexWaiterShift
}
// 当前协程设置了mutexWoken标志位,则清除mutexWoken标志位
if awoke {
new &^= mutexWoken
}
// CAS操作获取锁
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 锁的原始状态为没有被持有,则当前协程争夺锁资源成功,退出for循环
if old&mutexLocked == 0 {
break
}
// 当前协程被阻塞,进入等待队列
runtime_SemacquireMutex(&m.sema, false, 1)
// 从等待队列中唤醒的协程,重新读取锁的status
old = m.state
// 在unlock操作中设置了mutexWoken标志位,等价于被唤醒的协程设置了mutexWoken标志位,将awoke设置为true
awoke = true
// 将被唤醒的协程,自旋迭代次数清零
iter = 0
} else {
old = m.state
}
}
}
复制代码
Unlock()
方法的操作如下:
- 通过
atomic.AddInt32(&m.state, -mutexLocked)
原子操作将锁的状态切换到未被持有状态; - 判断是否需要唤醒等待队列中的协程。当“锁资源已经被持有”或“已经没有等待者”或“已经有运行中的协程抢夺锁资源”的时候,不会从等待队列中唤醒协程;其他情况下,会通过CAS操作从等待队列中唤醒协程。
func (m *Mutex) Unlock() {
//将锁置为未被持有状态
new := atomic.AddInt32(&m.state, -mutexLocked)
// 幸运case:没有协程等待当前锁资源, 直接返回
if new == 0 {
return
}
old := new
for {
// 锁资源已经没有等待者 -> 无需释放等待协程
// 锁资源已经上锁 -> 不用再释放等待协程 抢夺已经被持有的锁资源
// 已经有运行中的协程抢夺锁资源 -> 无需释放等待协程,减少上下文切换的成本
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
// 唤醒等待队列中的协程,同时将等待协程数-1,并将锁标记为“已经有运行中的协程抢夺锁资源”
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
}
复制代码
饥饿问题
每次都给运行中的协程获取锁的机会,会导致已经进入等待队列的协程一直无法获取锁资源,出现饥饿问题。对此,Mutex互斥锁会基于协程的等待时间判断是否需要进入饥饿状态;进入饥饿状态后,新协程不再竞争锁,直接进入等待队列的尾部,增加旧协程获取锁资源的机会;当等待队列首部处于饥饿状态的协程都成功获取锁资源后,锁会退出饥饿状态。
Lock()
方法的流程如下:
- 读取锁的状态;
- 判断锁是否处于饥饿状态
mutexStarving
:- 不是饥饿状态:通过自旋操作更新
mutexWoken
标志位 - 饥饿状态:不更新
mutexWoken
标志位;将mutexWaiterShift
等待者数量+1,不竞争锁资源;
- 不是饥饿状态:通过自旋操作更新
- 通过CAS操作对锁的状态进行更新;CAS操作失败返回步骤1,成功进入步骤4;
- 判断当前协程是否成功持有锁资源;
- 成功 -> 退出
Lock()
方法,执行临界区的代码; - 失败 -> 判断协程是否是第一次竞争锁资源:
- 第一次:记录协程的第一次竞争锁的时间
waitStartTime
;进入等待队列的尾部; - 非第一次:进入等待队列的尾部
- 第一次:记录协程的第一次竞争锁的时间
- 当协程从等待队列被唤醒的时候,会判断是否需要切换到饥饿状态;如果锁已经处于饥饿状态,被唤醒的协程不再通过CAS操作竞争锁,直接采用atomic操作获取锁(因为同一时间段只存在一个被唤醒的协程,所以这个操作是安全的。)
- 成功 -> 退出
func (m *Mutex) Lock() {
// 幸运case:当前锁没有被任何协程持有,并且上锁成功,直接返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// waitStartTime 记录协程第一次竞争锁的时间
var waitStartTime int64
// starving 饥饿模式标志
starving := false
awoke := false
iter := 0
old := m.state
for {
// 锁处于饥饿状态,不在自旋的设置锁的唤醒标志mutexWoken
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// ...
}
new := old
// 当锁处于被持有状态或者处于饥饿状态,当前协程不获取锁,直接进入等待状态
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 设置饥饿状态
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
new &^= mutexWoken
}
// CAS操作
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// mutexLocked:锁的原始状态处于未被持有状态 -> 获取锁成功break
// mutexStarving:锁的原始状态处于饥饿状态 -> 获取锁失败,当前协程进入等待队列
if old&(mutexLocked|mutexStarving) == 0 {
break
}
// waitStartTime为0,表示协程是第一次竞争锁资源,需要记录协程进入等待的时间waitStartTime
// waitStartTime不为0,表示协程是进入等待队列之后,被唤醒再次争夺锁资源,将queueLifo设为true,进入等待队列的队首位置(LIFO,后进先出)
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 进入等待队列, 协程进入阻塞状态
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 协程被唤醒后,基于等待时间判断协程是否处于饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 锁处于饥饿状态,被唤醒的协程直接获取锁资源
if old&mutexStarving != 0 {
// mutexLocked: 上锁
// (- 1<<mutexWaiterShift): 因为处于饥饿状态的锁直接唤醒协程,unlock操作的时候没有进行status的更新,这里需要对waiter数量进行更新操作
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// !starving: 等待队列中前面都是饥饿状态的协程(因为设置了queueLifo),当被唤醒的协程不是饥饿的,表示其后面都不是饥饿的,需要清除饥饿标志
// old>>mutexWaiterShift == 1: 当前协程是等待队列中最后一个协程时,需要清除饥饿标志;其原因是,处于饥饿状态时,运行中的协程直接进入等待队列,只有被唤醒的协程能获取锁;当等待队列为空时,没有及时清除饥饿标识,会导致后续的协程直接进入等待队列,同时也没有协程执行unlock操作,所有协程都进入阻塞状态。
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}
复制代码
Unlock()
方法的操作如下:
- 通过
atomic.AddInt32(&m.state, -mutexLocked)
原子操作将锁的状态切换到未被持有状态; - 判断是否处于饥饿状态:
- 不是饥饿状态:判断是否需要唤醒等待队列中的协程。当“锁资源已经被持有”或“已经没有等待者”或“已经有运行中的协程抢夺锁资源”或“处于饥饿状态”的时候,不会从等待队列中唤醒协程;其他情况下,会通过CAS操作从等待队列中唤醒协程。
- 饥饿状态:直接从等待队列首部唤醒协程。
func (m *Mutex) Unlock() {
//将锁置为未被持有状态
new := atomic.AddInt32(&m.state, -mutexLocked)
// 幸运case:没有协程等待当前锁资源, 直接返回
if new == 0 {
return
}
if new&mutexStarving == 0 {
// 非饥饿模式
// unlock操作唤醒的协程需要与正在运行的协程进行锁竞争,所以这里使用CAS操作进行锁状态的判断
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 锁处于饥饿状态,直接唤醒等待队列中的第一个协程
// 饥饿状态下,非第一次竞争锁的协程在等待队列的首部,第一次竞争锁的协程在等待队列的尾部
runtime_Semrelease(&m.sema, true, 1)
}
}
复制代码
最后
Mutex源码位置在$GOROOT/src/sync/mutex.go
。为了更好的讲解Mutex的原理,本文代码对Mutex源码有所删减。