之前使用 waitgroup
,确实是可以并发控制任务执行,而且可以保证任务执行,但是任务执行是否成功,如果失败了,错误信息会丢失在协程运行栈上,这个导致我们很难追踪错误。
这个时候我们就需要 errgroup
。
分析
其实想想很简单,毕竟也是并发执行,最大的特点是可以把 err
吐出来。
type Group struct {
cancel func()
wg sync.WaitGroup
errOnce sync.Once
err error
}
func (g *Group) Go(f func() error) {
// 还是 waitgroup 在起效果
g.wg.Add(1)
go func() {
// 不管错误与否,都得关闭
defer g.wg.Done()
if err := f(); err != nil {
// 只会执行一次,也就是只会记录第一次的错误
g.errOnce.Do(func() {
// 出现错误赋值
g.err = err
if g.cancel != nil {
// 如果有错误,则记录第一次错误的消息,然后关闭上下文
g.cancel()
}
})
}
}()
}
复制代码
Group
中使用 waitGroup
来控制 goroutine
的并发,成员变量err来记录运行中发生的错误,这里只记录第一次返回的错误值。【注意,只有出现第一次错误的时候记录。sync.Once
只会执行一次】
如何使用?
// 创建分两种
eg1 := new(errgroup.Group)
eg2, _ := errgroup.WithContent(context.Background())
// 执行
for _,v := range urls {
eg2.Go(func()error{
resp,err := http.Get(v)
if err != nil {
resp.Body.Close()
}
...
return err
})
}
if err := eg2.Wait();err != nil {
fmt.Println(err)
}
复制代码
创建的时候其实用 new
或者是 withContext
都一样,只是 WithContent
自带一个 ctx
的 cancel
。
举一个极端的例子,有个任务有10000个子任务,也就意味着要开 10000个 goroutine
,这个对服务来说是一个不小的损耗,原生的问题就在这:不能控制并发数。
所以来看看这个该怎么控制:
func (g *Group) GOMAXPROCS(n int) {
if n <= 0 {
panic("errgroup: GOMAXPROCS must great than 0")
}
// 一次执行即可
g.workerOnce.Do(func() {
g.ch = make(chan func(context.Context) error, n)
for i := 0; i < n; i++ {
go func() {
for f := range g.ch {
// 真正做执行的地方
g.do(f)
}
}()
}
})
}
复制代码
- 保证协程控制只执行一次,不然
channel
控制就混乱了 - 用
channel
控制任务的执行 - 任何中始终携带上层的
ctx
综上,新的 Group
设计出来啦:
type Group struct {
err error
wg sync.WaitGroup
errOnce sync.Once
workerOnce sync.Once
// 协程并发控制
ch chan func(ctx context.Context) error
// 阻塞协程中要执行的函数,缓存在此,有机会拿出来再执行
chs []func(ctx context.Context) error
ctx context.Context
cancel func()
}
复制代码
上面还有一个 chs
,作为缓存未被执行的函数,既然是未被执行,要执行的话,会在最后的 Wait()
兜底执行:
func (g *Group) Wait() error {
if g.ch != nil {
// 从缓存中拉出来,会在 GOMAXPROCS 中被消耗
for _, f := range g.chs {
g.ch <- f
}
}
// 等待全部的任务都被消耗完
g.wg.Wait()
// 关闭channel,取消上下文
if g.ch != nil {
close(g.ch) // let all receiver exit
}
if g.cancel != nil {
g.cancel()
}
return g.err
}
复制代码
当然了,如何使用呢?尤其是 GOMAXPROCS()
的调用:
func main() {
g := Group{}
g.GOMAXPROCS(5)
g.Go(a)
g.Go(b)
g.Go(c)
g.Go(d)
g.Wait()
}
func a() (context.Context) error {
time.Sleep(time.Second)
return nil
}
复制代码
- 先手动设置并发执行数
- 每一个子任务执行都应该携带
context
,这个context
可以由上面携带下来
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END