1 定义
1.1 基本概念
互斥锁(英语:Mutual exclusion,缩写 Mutex)是一种用于多线程编程中,防止两条线程同时对同一公共资源(比如全局变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区域(critical section)达成。临界区域指的是一块对公共资源进行访问的代码,并非一种机制或是算法。一个程序、进程、线程可以拥有多个临界区域,但是并不一定会应用互斥锁。
以上是维基百科对mutex的定义,golang的sync.mutex就是一种互斥锁的实现。
1.2 golang sync.mutex
效率优先 兼顾公平
golang中的mutex是一把公平的锁,在源码里可以看到如下的解释:
// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
复制代码而这段解释是整个mutex的核心(划重点,面试必考?),整个mutex的大体实现思路就是这样。翻译一下,大致如下
// 互斥锁的公平性。
//
// 互斥锁有两种运行模式:正常模式和饥饿模式。
// 在正常模式下,请求锁的goroutine会按 FIFO(先入先出)的顺序排队,依次被唤醒,但被唤醒的goroutine并不能直接获取锁,而是要与新请求锁的goroutines去争夺锁的所有权。
// 但是这其实是不公平的,因为新请求锁的goroutines有一个优势:他们正在cpu上运行且数量可能比较多,所以新唤醒的goroutine在这种情况下很难获取锁。在这种情况下,如果这个goroutine获取失败,会直接插入到队列的头部。
// 如果一个等待的goroutine超过1ms时仍未获取到锁,会将这把锁转换为饥饿模式。
//
// 在饥饿模式下,互斥锁的所有权会直接从解锁的goroutine转移到队首的goroutine。
// 并且新到达的goroutines不会尝试获取锁,会直接插入到队列的尾部。
//
// 如果一个等待的goroutine获得了锁的所有权,并且满足以下两个条件之一:
// (1) 它是队列中的最后一个goroutine
// (2) 它等待的时间少于 1 毫秒(hard code在代码里)
// 它会将互斥锁切回正常模式。
//
// 普通模式具有更好的性能,因为即使有很多阻塞的等待锁的goroutine,一个goroutine也可以尝试请求多次锁。
// 而饥饿模式则可以避免尾部延迟这种bad case。 
复制代码2 内存模型
实例中的代码为golang 1.16.3,传送门
2.1 数据结构
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
    state int32
    sema  uint32
}
复制代码sync.mutex的数据结构非常简单,只有两个字段,
- state字段就是这把mutex的状态,二进制低3位对应锁的状态,将state右移3位代表mutex的数量。
- sema(信号量)用来唤醒goroutine。
以上的两个注释非常重要
- 零值保证,sync.mutex符合golang里的0值语义,即sync.mutex的0值就是一把开启状态的互斥锁。
- mutex 数据结构禁止拷贝,拷贝mutex的结果是未定义的。
关键常量
const (
    mutexLocked = 1 << iota // mutex是一个加锁状态,对应0b001
    mutexWoken //是否有goroutine被唤醒or新来的goroutine,在尝试获取锁,  对应0b010
    mutexStarving //当前锁处于饥饿状态,             对应0b10
    mutexWaiterShift = iota //waiter的数量移位,通过mutex.state>> mutexWaiterShift可以得到waiter的数量。
    
    starvationThresholdNs = 1e6//(这里就是上文中提到的转换成饥饿模式的时间限制,在源码里写死为1e6 ns,也就是1ms)
)
复制代码可以看到源码中用到了iota去初始化各个常量(ps:这里吐槽一下,iota这玩意出现在语言的设计里挺离谱的,尤其是在const里各种iota搭配起来用,对读代码的人,确实是一项挑战)

2.2 接口
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
    Lock()
    Unlock()
}
复制代码sync.Mutex就实现了上述的这个接口(sync.Locker),值得一提的是后续会讲到的sync.RWMutex也实现了这个接口。
同时sync.Mutex又有自己的包内私有(小写开头)的函数,在下文会详细讲到。
func (m *Mutex) lockSlow() {}
func (m *Mutex) unlockSlow(new int32) {}
复制代码而mutex还有一些runtime的调用,这些函数的具体实现在runtime里,分别是如下几个函数。
//确定当前goroutine是否可以自旋(因为自旋是非常消耗cpu的,所以对自旋操作也有一定的次数限制)
func runtime_canSpin(i int) bool
//执行自旋操作
func runtime_doSpin()
//获取当前毫秒时间戳
func runtime_nanotime() int64
//信号量的实现,对应信号量的P原语操作,s的值代表信号量,包含一个fifo等待队列,如果lifo为true,则放入队首。skipframes只有在开启tracing时有效。
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
//信号量的实现,对应信号量的V原语操作,如果handoff为true,则会将锁直接转移给队首goroutine
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
//上述自旋锁、信号量、以及后文中会提到的原子操作,目前只需要知道函数的作用即可,后边我们还会有单独的章节去介绍
复制代码3 源码细节剖析
3.1 lock
先抢再排
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
   // fast path:这里使用了atomic的case操作,如果state的值为0(当前锁处于开启状态且无等待者),则可以直接加锁成功
   if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
      if race.Enabled {
         race.Acquire(unsafe.Pointer(m)) // load happens-before保证
      }
      return
   }
   // Slow path:需要执行各种操作,而mutex加锁的核心也在这里
   m.lockSlow()
}
复制代码func (m *Mutex) lockSlow() {
   //当前goroutine的等待时间
   var waitStartTime int64
   //当前goroutine是否饥饿
   starving := false
   //当前goroutine是否已被唤醒
   awoke := false
   //当前goroutine的自旋次数 
   iter := 0
   //当前锁的状态
   old := m.state
   for {
      // 如果锁处于饥饿状态下,就不需要自旋了,锁可以直接转移到队首的goroutine
      // 这里的if的意思是,如果锁的状态是锁定 && 
      // 处于非饥饿模式 && 
      // 当前goroutine可以进行自旋操作。
      // 则当前goroutine进行自旋操作
      if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
         //如果
         //  当前goroutine的状态为非唤醒 &&
         //  mutex的状态为非唤醒 && 
         //  waiter数不等于0 
         //则将自己的状态设置为唤醒,且将锁的状态也设置为唤醒,这样在解锁时就不需要去唤醒那些阻塞的goroutine
         if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
            atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            awoke = true
         }
         runtime_doSpin()
         iter++
         old = m.state
         continue
      }
      // 三种情况会走到这里:
      // 1. mutex处于非锁定状态
      // 2. mutex处于饥饿模式
      // 3. 当前goroutine已经不能自旋(iter有次数限制)
      new := old
      //开始设置期望状态,new在这里为期望设置的新状态(取锁),最后会通过cas操作来保证只有一个goroutine可以操作state
      
      //如果锁不是饥饿状态,则将期望的状态改为加锁,因为如果是饥饿状态,不应该尝试加锁,需要直接进入队列
      if old&mutexStarving == 0 {
         new |= mutexLocked
 }
      //如果锁的状态已经是锁定状态或者锁的状态是饥饿模式,则当前goroutine需要进入排队,waiter数量+1
      if old&(mutexLocked|mutexStarving) != 0 {
         new += 1 << mutexWaiterShift
 }
      // 当前goroutine已经处于饥饿状态,并且mutex是锁定状态,则期望将mutex转变为饥饿状态。
      // 但是如果锁是空闲态,也就不需要切换成饥饿模式了。Unlock 操作期望饥饿状态的mutex有waiter,但在这个case中不是这样的。
      // The current goroutine switches mutex to starvation mode.
     // But if the mutex is currently unlocked, don't do the switch.
     // Unlock expects that starving mutex has waiters, which will not be true in this case.
      if starving && old&mutexLocked != 0 {
         new |= mutexStarving
 }
      
      //如果goroutine的状态为唤醒,则需要将该状态重置,因为当前goroutine的状态一定会转变为sleep或已获取锁。
      if awoke {
         // The goroutine has been woken from sleep,
         // so we need to reset the flag in either case.
         if new&mutexWoken == 0 {
            throw("sync: inconsistent mutex state")
         }
         new &^= mutexWoken //按位置0
 }
      //开始cas,如果cas失败则需要重新开始循环
      if atomic.CompareAndSwapInt32(&m.state, old, new) {
         if old&(mutexLocked|mutexStarving) == 0 {
            break // locked the mutex with CAS
         }
         //如果wait的时间不为0则代表当前goroutine已经被唤醒过了,需要将当前goroutine放入等待队列的队首
         queueLifo := waitStartTime != 0
         
         //设置初始时间戳
         if waitStartTime == 0 {
            waitStartTime = runtime_nanotime()
         }
         //在这里goroutine进入sleep,使用信号量来获取锁
         runtime_SemacquireMutex(&m.sema, queueLifo, 1)
         
         //接下来sleep完了,当前goroutine被唤醒
         
         //如果当前goroutine的等待时间超过了1ms,则将当前goroutine标记为饥饿状态
         starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
 old = m.state
         //如果当前锁处于饥饿模式,则锁的状态是解锁态,当前goroutine被唤醒的方式为直接从上一个gorountine转移mutex,否则将当前goroutine状态标记为唤醒态,去与其他未进入队列的goroutine去竞争锁
         if old&mutexStarving != 0 {
            // If this goroutine was woken and mutex is in starvation mode,
            // ownership was handed off to us but mutex is in somewhat
            // inconsistent state: mutexLocked is not set and we are still
            // accounted as waiter. Fix that.
            if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
               throw("sync: inconsistent mutex state")
            }
            //当前routine已获取锁,将waiter数-1,状态更新为锁定
            delta := int32(mutexLocked - 1<<mutexWaiterShift)
            // mutexLocked + (-1<<mutexWaiterShift)
            //如果当前goroutine的等待时间小于1ms(非饥饿态)或者当前goroutine是队列中最后一个goroutine,则需要将锁的模式切换为常规模式
            if !starving || old>>mutexWaiterShift == 1 {
               delta -= mutexStarving
               // mutexLocked - mutexStarving + (-1<<mutexWaiterShift)
 }
            // 加锁、减计数、如果不饥饿则减饥饿
            atomic.AddInt32(&m.state, delta)
            break
         }
         awoke = true
         iter = 0 //自旋计数清0
      } else {
         old = m.state
      }
   }
   if race.Enabled {
      race.Acquire(unsafe.Pointer(m))
   }
}
复制代码3.2 unlock
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
   if race.Enabled {
      _ = m.state
      race.Release(unsafe.Pointer(m))
   }
   // 和加锁时基本同理,在无waiter且锁的模式为正常模式,会尝试直接解锁。
   new := atomic.AddInt32(&m.state, -mutexLocked)
   if new != 0 {
      // Outlined slow path to allow inlining the fast path.
      // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
      m.unlockSlow(new)
   }
}
复制代码//new标识锁的新状态,原状态减掉1
func (m *Mutex) unlockSlow(new int32) {
   //尝试去释放非锁定状态的mutex会导致进程直接崩溃。
   //这里对status先原子减lock,再用加法去判也是为了避免并发UnLock的问题。
   if (new+mutexLocked)&mutexLocked == 0 {
      throw("sync: unlock of unlocked mutex")
   }
   //如果mutex处于正常模式
   if new&mutexStarving == 0 {
      old := new
      for {
         // If there are no waiters or a goroutine has already
         // been woken or grabbed the lock, no need to wake anyone.
         // In starvation mode ownership is directly handed off from unlocking
         // goroutine to the next waiter. We are not part of this chain,
         // since we did not observe mutexStarving when we unlocked the mutex above.
         // So get off the way.
         // 如果没有waiter在等待mutex
         // 或者mutex已被其他goroutine获取,
         // 或者有goroutine已处于唤醒状态(在尝试获取锁)
         // 或者锁处于饥饿模式(mutex的所有权会直接转移到下一个waiter,而当前goroutine是不在这个队列中的,因为释放锁时,锁的模式为正常模式)
         //在上述四种情况下,直接返回
         if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
            return
         }
         // Grab the right to wake someone.
         //走到这一步,说明锁的状态是空闲态,且没有锁被唤醒,且锁处于正常模式
         // 那么期望的状态就是waiter个数-1,且设置锁的状态为唤醒
         new = (old - 1<<mutexWaiterShift) | mutexWoken
 if atomic.CompareAndSwapInt32(&m.state, old, new) {
            //唤醒队首的waiter去尝试获取锁
            runtime_Semrelease(&m.sema, false, 1)
            return
         }
         old = m.state
      }
   } else {
   //如果mutex处于饥饿模式,则将mutex的所有权直接转移到等待队列的队首
   
      // Starving mode: handoff mutex ownership to the next waiter, and yield
      // our time slice so that the next waiter can start to run immediately.
      // Note: mutexLocked is not set, the waiter will set it after wakeup.
      // But mutex is still considered locked if mutexStarving is set,
      // so new coming goroutines won't acquire it.
      runtime_Semrelease(&m.sema, true, 1)
   }
}
复制代码4 常见坑点
- sync.mutex不可重入
- 例如如果连续调用两次lock,会触发死锁,goroutine直接panic。
package main
import "sync"
func main() {
   m := sync.Mutex{}
   m.Lock()
   m.Lock()
}
//fatal error: all goroutines are asleep - deadlock!
复制代码- 如果A获取A+B,B获取B+A,会触发死锁
var lockA = &sync.Mutex{}
var lockB = &sync.Mutex{}
func TestA() {
    lockA.Lock()
    defer lockA.Unlock()
    time.Sleep(1 * time.Second)
    lockB.Lock()
    defer lockB.Unlock()
}
func TestB() {
    lockB.Lock()
    defer lockB.Unlock()
    time.Sleep(1 * time.Second)
    lockA.Lock()
    defer lockA.Unlock()
}
func main() {
    go TestA()
    TestB()
}
//fatal error: all goroutines are asleep - deadlock!
复制代码- 死锁不是触发panic的充分条件
package main
import (
    "sync"
    "time"
)
var lockA = &sync.Mutex{}
var lockB = &sync.Mutex{}
func TestA() {
    lockA.Lock()
    defer lockA.Unlock()
    time.Sleep(1 * time.Second)
    lockB.Lock()
    defer lockB.Unlock()
}
func TestB() {
    lockB.Lock()
    defer lockB.Unlock()
    time.Sleep(1 * time.Second)
    lockA.Lock()
    defer lockA.Unlock()
}
func main() {
    go TestA()
    go TestB()
    time.Sleep(3 * time.Second)
}
//正常return
复制代码- sync.mutex,尝试去unlock一把空闲的mutex,会导致panic。
package main
import (
   "fmt"
   "sync"
   "time"
)
func main() {
   m := sync.Mutex{}
   m.UnLock()
}
// fatal error: sync: unlock of unlocked mutex
复制代码- sync.mutex不与goroutine绑定,可由a goroutine获取锁,b goroutine释放锁。
package main
import (
   "fmt"
   "sync"
   "time"
)
func main() {
   m := sync.Mutex{}
   m.Lock()
   fmt.Println("a lock")
   go func() {
      m.Unlock()
      fmt.Println("b unlock")
   }()
   time.Sleep(time.Second)
}
//a lock
//b unlock
复制代码- 不要复制sync.Mutex,mutex做函数参数时,传参时使用指针。
5 参考文档
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END
    



















![[桜井宁宁]COS和泉纱雾超可爱写真福利集-一一网](https://www.proyy.com/skycj/data/images/2020-12-13/4d3cf227a85d7e79f5d6b4efb6bde3e8.jpg)

![[桜井宁宁] 爆乳奶牛少女cos写真-一一网](https://www.proyy.com/skycj/data/images/2020-12-13/d40483e126fcf567894e89c65eaca655.jpg)
