ants是一个广泛使用的goroute池,可以有效控制协程数量,防止协程过多影响程序性能。
为什么需要协程池
goroute是一种轻量级的用户态线程,可以在线程的基础上实现分时复用,适用于IO密集型应用。goroute具有轻量、资源占用少、切换开销小的优势。但是受资源限制,协程创建的数量也是有限的。过多创建协程,会导致程序消耗过多的系统资源,从而影响程序的正常运行。
接下来,我们以两个实例来说明过多创建协程的影响。
- 无限开启协程,导致标准输出被过多并发操作,从而产生panic。
func main() {
var wg sync.WaitGroup
for i := 0; i < math.MaxInt32; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
fmt.Println(i)
time.Sleep(time.Second * 100)
}(i)
}
wg.Wait()
}
panic: too many concurrent operations on a single file or socket (max 1048575)
goroutine 1980639 [running]:
internal/poll.(*fdMutex).rwlock(0xc000094060, 0xc0ee9bd600, 0x1097c75)
/usr/local/go/src/internal/poll/fd_mutex.go:147 +0x13f
...
复制代码
- 无限开启协程,导致程序占用过多内存,存在内存不足而崩溃的风险。
func main() {
var wg sync.WaitGroup
for i := 0; i < math.MaxInt32; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
time.Sleep(time.Second * 100)
}(i)
}
wg.Wait()
}
PID COMMAND %CPU TIME #TH #WQ #PORTS MEM PURG CMPRS
29954 ___go_build_ 241.1 03:33.68 18/5 0 27 24G+ 0B 17G+
复制代码
快速使用
ants协程池使用起来非常简单,支持默认协程池defaultAntsPool
、自定义协程池NewPool(size,options)
、指定方法的协程池NewPoolWithFunc
。
// 使用默认的协程池
_ = ants.Submit(func() {
fmt.Println("hello")
})
// 使用自定义协程池
p, _ := ants.NewPool(1000)
_ = p.Submit(func() {
fmt.Println("hello")
})
// 使用指定方法的协程池
p, _ := ants.NewPoolWithFunc(1000, func(i interface{}) {
fmt.Println(i)
})
_ = p.Invoke(Param)
复制代码
实现原理
在antsPool协程池中,每一个worker对应一个goroute。在worker的初始化阶段会通过go关键字创建一个goroute,然后这个goroute会不断监听并执行taskChan里面的task,类似生产者消费者模式。
antsPool协程池通过workerArray管理worker。workerArray按照worker的入队时间有序存放worker,具有FILO的特点。同时antsPool具有定时清理过期worker的特性,会定期从workerArray中查找过期的worker,将其放入workerPool sync.Pool
中等待GC。
源码解析
接下来,以ants.Submit(task func())
为例,进行分析ants的源码。ants.Submit()
方法使用了defaultAntsPool
默认协程池进行任务的执行。
const DefaultAntsPoolSize = math.MaxInt32
var (
defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
)
// Submit submits a task to pool.
func Submit(task func()) error {
return defaultAntsPool.Submit(task)
}
复制代码
协程池的结构
ants协程池的结构相对简单,这里不做过多的介绍。
// 协程池的结构
type Pool struct {
// 协程池的容量
capacity int32
// 正在运行中的协程数量
running int32
// lock用于保护workerArray
lock sync.Locker
// 保存可以使用的worker
workers workerArray
// 标记协程池是否关闭
state int32
// cond用于等待空闲的worker
cond *sync.Cond
// 暂存过期的worker,如果没有再次使用会被GC
workerCache sync.Pool
// 阻塞协程数量,指提交任务的协程
blockingNum int
// 协程池的配置,包括过期时间、是否支持预分配、最大阻塞数量、panic处理、日志等
options *Options
}
// worker的结构
type goWorker struct {
// 用于记录当前worker属于哪一个协程池
pool *Pool
// 接受任务的chan,多核环境chan大小为1,单核环境chan大小为0(借鉴fastHttp的实现)
task chan func()
// worker进入队列的时间
recycleTime time.Time
}
复制代码
协程池的创建
ants 协程池的创建主要进行了以下操作:
- 按照定义的option进行个性化配置;
- 指定
workerCache sync.Pool
创建worker的方法; - 进行workerArray的初始化;在ants中有两种实现workerArray的方式,使用预分配的情况下采用
loopQueue
循环队列实现,不使用预分配采用workerStack
栈实现; - 开启一个协程,定时清理workerArray中的worker;
func NewPool(size int, options ...Option) (*Pool, error) {
// 按照option进行配置
opts := loadOptions(options...)
// 校验协程池容量
if size <= 0 {
size = -1
}
// 过期时间设置
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = DefaultCleanIntervalTime
}
// 日志输出设置
if opts.Logger == nil {
opts.Logger = defaultLogger
}
// New一个Pool实例
p := &Pool{
capacity: int32(size),
lock: internal.NewSpinLock(),
options: opts,
}
// 指定sync.Pool创建worker的方法
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
// 根据预分配标志,使用不同的workerArray的实现方式
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
p.workers = newWorkerArray(loopQueueType, size)
} else {
p.workers = newWorkerArray(stackType, 0)
}
p.cond = sync.NewCond(p.lock)
// 开启一个协程周期清理过期的worker
go p.purgePeriodically()
return p, nil
}
复制代码
任务提交
任务提交首先从协程池中获取空闲的worker,然后向worker的taskChan中提交任务,等待worker消费任务。
func (p *Pool) Submit(task func()) error {
if p.IsClosed() {
return ErrPoolClosed
}
var w *goWorker
if w = p.retrieveWorker(); w == nil {
return ErrPoolOverload
}
w.task <- task
return nil
}
复制代码
空闲worker的获取采用优先级策略,其优先级如下:
- 优先从workerArray中获取可用的worker
- 如果当前运行的协程未达到协程池的容量,从workerCache中获取并启动一个worker
- 协程池支持非阻塞,直接返回一个空worker
- 协程池不支持非阻塞,则阻塞等待可用的worker
// retrieveWorker returns a available worker to run the tasks.
func (p *Pool) retrieveWorker() (w *goWorker) {
spawnWorker := func() {
w = p.workerCache.Get().(*goWorker)
w.run()
}
p.lock.Lock()
// 1. 优先从workerArray中获取worker
w = p.workers.detach()
if w != nil {
p.lock.Unlock()
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
// 2. 可以扩容,从workerCache中获取
p.lock.Unlock()
spawnWorker()
} else {
// 支持非阻塞, 直接返回nil的worker
if p.options.Nonblocking {
p.lock.Unlock()
return
}
// 阻塞等待可用的worker
retry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return
}
p.blockingNum++
p.cond.Wait()
p.blockingNum--
var nw int
if nw = p.Running(); nw == 0 {
p.lock.Unlock()
if !p.IsClosed() {
spawnWorker()
}
return
}
if w = p.workers.detach(); w == nil {
if nw < capacity {
p.lock.Unlock()
spawnWorker()
return
}
goto retry
}
p.lock.Unlock()
}
return
}
复制代码
Worker的创建
当无法从workerArray和workerCache中获取worker时,协程池会创建一个新的worker,并调用worker.run()
启动worker。worker启动之后,会开启一个goroute监听并执行taskChan中任务。直到worker接收到终止信号nil
或协程池已满无法放回协程池时,worker会退出taskChan的监听,进入清理环节。
func (p *Pool) retrieveWorker() (w *goWorker) {
spawnWorker := func() {
// 实例化worker
w = p.workerCache.Get().(*goWorker)
// 启动worker
w.run()
}
// ......
}
func (w *goWorker) run() {
w.pool.incRunning() // 正在运行的协程数+1
go func() {
defer func() { // 清理相关
w.pool.decRunning() // 正在运行的协程数-1
w.pool.workerCache.Put(w) // 将worker放入workerCache中等待GC
if p := recover(); p != nil { // panic捕获
if ph := w.pool.options.PanicHandler; ph != nil {
ph(p)
} else {
w.pool.options.Logger.Printf("worker exits from a panic: %v\n", p)
var buf [4096]byte
n := runtime.Stack(buf[:], false)
w.pool.options.Logger.Printf("worker exits from panic: %s\n", string(buf[:n]))
}
}
w.pool.cond.Signal() // 唤醒获取worker的协程
}()
// 不断消费taskChan中的任务
for f := range w.task {
if f == nil { // 接收到终止信号nil时,退出循环进入清理环节
return
}
f() // 执行任务
if ok := w.pool.revertWorker(w); !ok { // 将worker放回到workerArray中
return
}
}
}()
}
复制代码
协程池的清理
由于workerArray按照worker的插入时间排序,在获取过期worker时仅需要通过二分查找就可以轻松找出过期的worker。找到过期的worker后,会向过期的worker发送终止信号nil,并清空过期worker的引用,以方便worker被GC。
func (p *Pool) purgePeriodically() {
heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer heartbeat.Stop()
for range heartbeat.C {
// 如果协程池已经被关闭,就退出清理的定时任务
if p.IsClosed() {
break
}
// 从workers中获取过期的worker
p.lock.Lock()
expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
p.lock.Unlock()
// 清理过期的worker
for i := range expiredWorkers {
// 向worker的taskChan发送终止信号;当worker接收到nil的任务时,会进入workerCache等待GC
expiredWorkers[i].task <- nil
// 清空worker的引用,方便GC
expiredWorkers[i] = nil
}
// 唤醒获取worker的协程
if p.Running() == 0 {
p.cond.Broadcast()
}
}
}
复制代码