一、前情提要
上一篇我们介绍了Mutex锁资源的模式及源码分析。mutex处于正常状态时,申请锁时与自旋状态的goroutine抢占锁失败时,会将对应的goroutine放在等待队列的头或者尾(若是唤醒的goroutine则是队头,若是新创建的goroutine则是队尾)等待唤醒;mutex处于饥饿状态时,申请锁时若没有锁资源则直接放在队尾。
// 加锁
func (m *Mutex) lockSlow() {
....
// 竞争失败,使用runtime_SemacquireMutex信号量,保证不会有2个goutine获取
// 既然未能获取到锁, 那么就使用sleep原语阻塞本goroutine
// 如果是新来的goroutine,queueLifo=false, 加入到等待队列的尾部,耐心等待
// 如果是唤醒的goroutine, queueLifo=true, 加入到等待队列的头部
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
...
}
复制代码
二、底层源码理解
相当于操作系统中P一个锁一个资源
// SemacquireMutex is like Semacquire, but for profiling contended Mutexes.
// If lifo is true, queue waiter at the head of wait queue.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_SemacquireMutex's caller.
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
复制代码
底层调用semacquire1,也是主要的处理接口。该接口的功能是:获取 sudog 和 semaRoot ,其中 sudog 是 g 放在等待队列里的包装对象,sudog 里会有 g 的信息和一些其他的参数, semaRoot 则是队列结构体,内部是二插平衡树,把和当前 g 关联的 sudog 放到 semaRoot 里,然后把 g 的状态改为等待,调用调度器执行别的 g,此时当前 g 就停止执行了。一直到被调度器重新调度执行,会首先释放 sudog 然后再去执行别的代码逻辑
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}
复制代码
1、sudog
sudog运行时用来存放处于阻塞状态的 goroutine 的一个上层抽象,是用来实现用户态信号量的主要机制之一
例如当一个 goroutine 因为等待 channel 的数据需要进行阻塞时,sudog 会将 goroutine 及其用于等待数据的位置进行记录。阻塞的sudog组成一个树堆
type sudog struct {
// 当前goroutine
g *g
// 表示g正在参与一个select
isSelect bool
// 下一个goroutine
next *sudog
// 上一个goroutine
prev *sudog
// 数据元素
elem unsafe.Pointer // data element (may point to stack)
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32 // 随机优先级概率
parent *sudog // semaRoot 二叉树
waitlink *sudog // g.waiting 列表 or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
复制代码
2、semaRoot
一个 semaRoot 持有不同地址的 sudog 的树堆。每一个 sudog 可能反过来指向等待在同一个地址上的 sudog 的列表
type semaRoot struct {
lock mutex
treap *sudog // 平衡树的root
nwait uint32 // Number of waiters. Read w/o the lock. waiter 的数量
}
复制代码
3、Treap
Treap是一种平衡树,它较普通二叉查找树而言,每个节点被赋予了一个新的属性:优先级(没错就是类似优先队列的优先),对于Treap中的每个结点,除了它的权值满足二叉查找树的性质外,它的优先级还满足堆性质,也就是结点的优先级小于它所有孩子的优先级,也就是一个最小堆
所以从权值上看,Treap是一个二叉查找树;从优先级上看,Treap是一个堆。所以Treap=Tree+Heap
我们发现普通BST会不平衡是因为有序的数据会使查找路径退化成链,而随机数据使其退化的概率非常小。因此我们在Treap中赋予的这个优先级的值采用随机生成的办法,这样Treap的结构就趋于平衡了
如下图:黑色字体为优先级,旋转之后与原来的平衡树是一致的(3,7,9,11,16)。但是按照优先级进行旋转就会变成最小堆
在sudog中ticket作为优先级的字段,按照ticket进行旋转。
s.ticket = fastrand() | 1 // ticket值随机生成
复制代码
4、semacquire1具体流程
- 获取一个sudog,并添加goroutine的基本信息
- 获取一个semroot,也就是Treap的根。待将sudog插入该树中,后经过旋转达到新的平衡。
- 将当前goroutine设置为等待状态,与当前m解绑,让m执行其他goroutine
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}
// Easy case.
// 抢占到信号量直接返回 CAS抢占
if cansemacquire(addr) {
return
}
// 没有抢到信号量
// Harder case:
// increment waiter count
// try cansemacquire one more time, return if succeeded
// enqueue itself as a waiter
// sleep
// (waiter descriptor is dequeued by signaler)
// 获取一个sudog
s := acquireSudog()
// 获取一个semaRoot
root := semroot(addr)
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
// 阻塞
for {
// 加锁
lock(&root.lock)
// nwait+1
atomic.Xadd(&root.nwait, 1)
// Check cansemacquire to avoid missed wakeup.
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
// Any semrelease after the cansemacquire knows we're waiting
// (we set nwait above), so go to sleep.
// 加到 semaRoot treap 上
root.queue(addr, s, lifo)
// 解锁,并且把当前 g 的状态改为等待,然后让当前的 m 调用其他的 g 执行,当前 g 相当于等待
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3+skipframes)
}
releaseSudog(s)
}
复制代码
接下来看其中几个接口。
5、如何获得一个sudog
这里涉及到一个全局缓存池和 per-P 的缓存池。分配的时候per-P缓存池优先优先分配,当使用完毕后又再次归还给缓存池。 其遵循策略:
- 优先从 per-P 缓存中获取,如果 per-P 缓存为空,则从全局池抓取一半;
- 优先归还到 per-P 缓存,如果 per-P 缓存已满,则将 per-P 缓存的一半归还到全局池。
func acquireSudog() *sudog {
mp := acquirem() // 获取当前g对应的m
pp := mp.p.ptr() // 获取当前g对应的p
if len(pp.sudogcache) == 0 { // 检查 per-P sudogcache 池是否存在可复用的 sudog
lock(&sched.sudoglock)
// per-P没有,则从全局池取到当前per-P的一半
for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
s := sched.sudogcache // 全局池
sched.sudogcache = s.next
s.next = nil
pp.sudogcache = append(pp.sudogcache, s) // 添加sudog
}
unlock(&sched.sudoglock)
// 这个时候实际是全局池为空,则新生成一个
if len(pp.sudogcache) == 0 {
pp.sudogcache = append(pp.sudogcache, new(sudog))
}
}
n := len(pp.sudogcache)
s := pp.sudogcache[n-1] // 取出一个
pp.sudogcache[n-1] = nil
pp.sudogcache = pp.sudogcache[:n-1]
if s.elem != nil {
throw("acquireSudog: found s.elem != nil in cache")
}
releasem(mp) // m.lock--
return s
}
复制代码
6、在semaRoot添加sudog节点
func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
s.g = getg() // 一个sudog指向当前goroutine
s.elem = unsafe.Pointer(addr)
s.next = nil
s.prev = nil
var last *sudog
pt := &root.treap
for t := *pt; t != nil; t = *pt {
if t.elem == unsafe.Pointer(addr) {
// lifo是true,放在队头
if lifo {
// 用s将t替换掉
// Substitute s in t's place in treap.
*pt = s
s.ticket = t.ticket
s.acquiretime = t.acquiretime
s.parent = t.parent
s.prev = t.prev
s.next = t.next
if s.prev != nil {
s.prev.parent = s
}
if s.next != nil {
s.next.parent = s
}
// Add t first in s's wait list.
// 将t放入到s的等待链表的第一位
s.waitlink = t
s.waittail = t.waittail
if s.waittail == nil {
s.waittail = t
}
t.parent = nil
t.prev = nil
t.next = nil
t.waittail = nil
} else { // 放在队尾
// Add s to end of t's wait list.
if t.waittail == nil {
t.waitlink = s
} else {
t.waittail.waitlink = s
}
t.waittail = s
s.waitlink = nil
}
return
}
last = t
if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) {
pt = &t.prev
} else {
pt = &t.next
}
}
s.ticket = fastrand() | 1
s.parent = last
*pt = s
// 根据ticket随机值作为优先级进行旋转
for s.parent != nil && s.parent.ticket > s.ticket {
if s.parent.prev == s {
root.rotateRight(s.parent)
} else {
if s.parent.next != s {
panic("semaRoot queue")
}
root.rotateLeft(s.parent)
}
}
}
复制代码
至此,阻塞申请一个锁资源的流程大致就是如此。当前goroutine已于对应的M解绑,相当于放在一个缓存池里面,等待有资源释放。
三、总结
我们可以看到上面的一通操作相当于操作系统的sleep信号,但是我们并没有陷入系统调用,浪费CPU等资源。而是在用户态利用缓存池屏蔽内部调度器的存在,创建基于goroutine抽象的信号量。例如,当用户态代码使用互斥锁发生竞争时,能够让用户态代码依附的 goroutine 进行 sleep,将其进行集中存放,并在可用时候被 wakeup,并被重新调度。
这就是为什么golang性能可以这么好
下一篇:释放一个资源的流程