Pool用于存储不需要的对象,以供后续复用,来减轻GC垃圾收集器的压力。Pool是协程安全的,支持多个协程同时操作。Pool存储的临时对象,会被GC垃圾收集器自动收集;因此,Pool
不适合保存长连接等资源。
Pool对外提供三个可见接口New
函数变量、Get()
方法、Put()
方法。顾名思义,Get()
方法用于获取临时对象,Put()
方法用于向Pool
中存放临时对象。New
函数变量用于创建新的对象,当Get()
方法从空Pool
中获取对象时,会调用New
函数来创建新的临时对象。
type People struct {
Name string
Age int
}
// New指定对象创建方法(当Pool为空时,Get方法会调用New方法创建对象)
var PeoplePool = sync.Pool{New: func() interface{} {
return &People{
Name: "people",
Age: 18,
}
}}
func main() {
p1 := PeoplePool.Get().(*People)
fmt.Printf("address:%p, value:%+v\n", p1, p1)
PeoplePool.Put(p1)
p2 := PeoplePool.Get().(*People)
fmt.Printf("address:%p, value:%+v\n", p2, p2)
p3 := PeoplePool.Get().(*People)
fmt.Printf("address:%p, value:%+v\n", p3, p3)
}
/**
------ 执行结果 ------
address:0xc00008a020, value:&{Name:people Age:18} // 新建的People
address:0xc00008a020, value:&{Name:people Age:18} // Pool中缓存的People
address:0xc00008a080, value:&{Name:people Age:18} // 新建的People
*/
复制代码
应用场景
Pool
常用于保存不需要的对象,以减少新对象的申请,提升程序性能。
在下面的实例中,使用Pool
缓存不需要的Slice
切片;使用Pool
之后,程序在耗时、内存使用、内存分配次数上都有所提升。
/**
go test -bench=. -benchmem -benchtime=1s
方法名 循环次数 耗时 内存使用 内存分配次数
BenchmarkWithoutPool-8 7637380 151 ns/op 112 B/op 3 allocs/op
BenchmarkWithPool-8 22991926 49.8 ns/op 32 B/op 1 allocs/op
*/
// 每次都重新申请[]string
func BenchmarkWithoutPool(b *testing.B) {
for i := 0; i < b.N; i++ {
list := make([]string, 0)
for i := 0; i < 4; i++ {
list = append(list, "1")
}
}
}
// 优先使用Pool中的[]string,使用完后将[]string放入Pool
func BenchmarkWithPool(b *testing.B) {
var slicePool = sync.Pool{New: func() interface{} {
slice := make([]string, 0)
return &(slice)
}}
for i := 0; i < b.N; i++ {
list := *(slicePool.Get().(*[]string))
for i := 0; i < 4; i++ {
list = append(list, "1")
}
list = list[:0] // 重置切片的len
slicePool.Put(&list)
}
}
复制代码
使用Pool
缓存Slice
存在以下注意事项:
- 记得调用
slice[:0]
语句将len
长度变量置为0;不然,append
操作会在之前的位置进行插入操作。 - 随着不断往
Slice
中添加数据,Slice
的内存空间会不断增长,存在内存泄漏的风险。对此,在go内置的fmt
包中,fmt/print.go
限制了Pool
存放对象的大小,禁止过大的对象存入Pool
。
func (p *pp) free() {
// Proper usage of a sync.Pool requires each entry to have approximately
// the same memory cost. To obtain this property when the stored type
// contains a variably-sized buffer, we add a hard limit on the maximum buffer
// to place back in the pool.
//
// See https://golang.org/issue/23199
if cap(p.buf) > 64<<10 { // 对于过大的对象,直接抛弃
return
}
p.buf = p.buf[:0]
p.arg = nil
p.value = reflect.Value{}
p.wrappedErr = nil
ppFree.Put(p)
}
复制代码
源码解析
在Pool
中,每个处理器P有自己本地的poolLocal
,类似于CPU的高缓。在获取空闲对象时,优先从本地的poolLocal
获取,再从其他处理器对应的poolLocal
偷取。
同时,Pool
会定期进行清理操作;在GC垃圾收集前,Pool
会将local
或victim
存储的空闲对象引用清空,方便GC进行清理。
Pool结构体
Pool
由noCopy
、local
、victim
、New
四个变量组成,其中:
noCopy
用于防止Pool
被值复制使用,可使用go vet
工具进行Pool
值复制的检查;local
是[]poolLocal
类型的指针,用于保存临时对象;[]poolLocal
的长度等于处理器P
的长度,且每一个处理器P
对应一个poolLocal
;victim
也是[]poolLocal
类型的指针;Pool
定期会调用poolCleanUp()
进行清理操作,victim
会保存上一轮的local
;New
指定了新对象的创建方法;从空的Pool
中获取对象,会调用New
方法创建新对象。
poolLocal
类型由private
、shared
、pad
变量组成,其中:
private
保存了一个私有变量,只能被本地的处理器P获取;shared
是一个线程安全的共享队列,可以被任何处理器P访问;pad
是CacheLine的填充,用于解决“伪共享”问题。
type Pool struct {
noCopy noCopy // 不支持值复制,参考https://juejin.cn/post/6962180258179055624#heading-2
local unsafe.Pointer // []poolLocal指针类型,每一个处理器对应一个poolLocal
localSize uintptr // local的长度,len([]poolLocal)
victim unsafe.Pointer // []poolLocal指针类型,用于存储上一轮的local
victimSize uintptr // victim的长度,len([]poolLocal)
New func() interface{} // 创建新对象的方法
}
type poolLocal struct {
poolLocalInternal
// 通过对齐操作解决伪共享,高缓是以CacheLine为单位进行缓存的,存在“伪共享”问题;
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
type poolLocalInternal struct {
private interface{} // 保存一个处理器私有的临时对象,仅能被本地处理器访问
shared poolChain // 并发安全的双向链表,用于保存临时对象,可被所有处理器P访问
}
复制代码
Get()方法
Get()
方法用于从Pool
中获取空闲对象,其获取顺序如下:本地local的private
->本地local的shared
->其他处理器的local的shared
->victim
->New
。
func (p *Pool) Get() interface{} {
// 获取本地local的地址l和处理器P对应的pid,并将处理器设置为不可抢夺状态
l, pid := p.pin()
// 1.从本地local的private获取
x := l.private
l.private = nil
if x == nil {
// 2.从本地local的shared获取
x, _ = l.shared.popHead()
if x == nil {
x = p.getSlow(pid)
}
}
runtime_procUnpin()
// 6.创建新的对象
if x == nil && p.New != nil {
x = p.New()
}
return x
}
func (p *Pool) getSlow(pid int) interface{} {
size := atomic.LoadUintptr(&p.localSize)
locals := p.local
// 3.从其他处理器local的shared中偷取
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
// 4.从本地victim的private中获取
if x := l.private; x != nil {
l.private = nil
return x
}
// 5.从victim的shared中获取
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// victim没有对象可获取时,将victim的长度清零
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
复制代码
Put()方法
Put()
用于存放空闲对象,其存放顺序如下:本地local的private
->本地local的shared
。
func (p *Pool) Put(x interface{}) {
// Pool不存放nil对象
if x == nil {
return
}
// 获取本地local的地址,并将处理器设置为不可抢夺状态
l, _ := p.pin()
// 1.存放在本地local的private
if l.private == nil {
l.private = x
x = nil
}
// 2.存放在本地local的shared
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
}
复制代码
poolCleanUp()操作
程序内的所有Pool
对象都会保存在allPools
和oldPools
这两个全局变量中。其中,allPools
用于保存local
非空的Pool
对象,oldPools
用于保存local
为空、victim
非空的Pool
对象。
poolCleanup()
会对所有Pool
对象进行清理操作,poolCleanup()
的调用时刻在GC开始时。poolCleanup()
执行时,GC会进行STW操作,所以poolCleanup()
无需并发控制。
// 全局变量
var (
allPoolsMu Mutex
// allPools保存所有local非空的对象
// allPools的并发控制手段有:allPoolsMu或STW
allPools []*Pool
// oldPools保存所有local为空、victim非空的对象
// oldPools的并发控制手段有:STW
oldPools []*Pool
)
func poolCleanup() {
// 将oldPools的victim清空
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// 将allPools的local清空
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// 更换oldPools和allPools
// PS:进行Get或Put操作时,会触发local的创建操作,将local原本为空的Pool对象放回到allPools
oldPools, allPools = allPools, nil
}
复制代码
pin()操作
在执行Get()
方法和Put()
方法时,都会执行pin()
方法。pin()
方法主要进行了两个操作:
- 通过
runtime_procPin()
将协程对应的线程设置为不可抢占状态,防止协程被抢占; - 当
Pool
的local
需要被创建时,进行local
的创建。
// *poolLocal为处理器P对应的poolLocal地址
// int为处理器P的pid
func (p *Pool) pin() (*poolLocal, int) {
// 实现在runtime/proc.go,将协程对应的线程设置为不可抢占状态,防止协程被抢占
// 由于Pool的local是基于处理器P的切片,每一个处理器P都有对应的poolLocal;
// 当协程"被抢占再被调度"后,该协程被抢占前获取pid与被调度后所在处理器P不符,导致协程跨poolLocal访问,出现不符合预期的情况
pid := runtime_procPin()
s := atomic.LoadUintptr(&p.localSize)
l := p.local
if uintptr(pid) < s {
// 返回pid对应的poolLocal地址
return indexLocal(l, pid), pid
}
return p.pinSlow()
}
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}
func (p *Pool) pinSlow() (*poolLocal, int) {
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// 创建local,并将Pool对象放到allPools中
if p.local == nil {
allPools = append(allPools, p)
}
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
atomic.StoreUintptr(&p.localSize, uintptr(size))
return &local[pid], pid
}
复制代码