go语言提供非常方便的创建轻量级的协程goroutine来并发处理任务,但是 协程过多影响程序性能,所以,这时goroutine池就登场了。本文简要介绍ants goroutine 池的基本的使用方法。
简介
ants是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。 ants就是一个很多大厂广泛使用的goroute池。
官网地址
功能
- 自动调度海量的 goroutines,复用 goroutines
- 定期清理过期的 goroutines,进一步节省资源
- 提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool
- 优雅处理 panic,防止程序崩溃
- 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能
- 非阻塞机制
quick start
使用 ants v1 版本:
go get -u github.com/panjf2000/ants
使用 ants v2 版本 (开启 GO111MODULE=on):
go get -u github.com/panjf2000/ants/v2
接下来看一下官方demo:实现一个计算大量整数和的程序
NewPool
NewPool生成ants池实例。
package main
import (
"fmt"
"sync"
"time"
"github.com/panjf2000/ants/v2"
)
var sum int32
func demoFunc() {
time.Sleep(10 * time.Millisecond)
fmt.Println("Hello World!")
}
func main() {
defer ants.Release()
runTimes := 1000
//使用公共池。
var wg sync.WaitGroup
syncCalculateSum := func() {
demoFunc()
wg.Done()
}
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = ants.Submit(syncCalculateSum)
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks.\n")
}
其中:
生成一个具有特定函数的ants池实例
- NewPool(size int, options …Option) (*PoolWithFunc, error)
args.size 即池容量,即池中最多有 10 个 goroutine。 arg.Option 定制化 goroutine pool.
- p.Submit向此池提交任务。
- ants.Release关闭此池并释放工作队列。
- defaultAntsPool 导入ants时初始化实例池。
NewPoolWithFunc
package main
import (
"fmt"
"github.com/panjf2000/ants/v2"
"sync"
"sync/atomic"
)
var sum int32
func myFunc(i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("run with %d\n", n)
}
func main() {
defer ants.Release()
runTimes := 1000
var wg sync.WaitGroup
//使用带有函数的池
//设置goroutine池的容量为10,过期时间为1秒。
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i)
wg.Done()
})
defer p.Release()
//逐个提交任务。
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = p.Invoke(int32(i))
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", p.Running())
fmt.Printf("finish all tasks, result is %d\n", sum)
}
OutPut:
run with 1
run with 5
run with 11
run with 12
run with 13
run with 14
run with 7
...
run with 789
run with 809
running goroutines: 10
finish all tasks, result is 499500
其中:
生成一个具有特定函数的ants池实例
- NewPoolWithFunc(size int, pf func(interface{}), options …Option) (*PoolWithFunc, error)
args.pf 即为执行任务的函数
优雅处理 panic
测试一些当任务触发panic的情况
func myFunc(i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
if n%2 == 0 {
panic(any(fmt.Sprintf("panic from task:%d", n)))
}
fmt.Printf("run with %d\n", n)
}
output:
run with 3
run with 13
...
run with 999
2022/10/21 21:41:05 worker with func exits from a panic: panic from task:6
2022/10/21 21:41:05 worker with func exits from a panic: panic from task:0
2022/10/21 21:41:07 worker with func exits from panic: goroutine 14 [running]:
可以看到,main routine 没有因此受影响。
Options
// Options包含实例化ants池时将应用的所有选项。
type Options struct {
// ExpiryDuration is a period for the scavenger goroutine to clean up those expired workers,
// the scavenger scans all workers every `ExpiryDuration` and clean up those workers that haven't been
// used for more than `ExpiryDuration`.
ExpiryDuration time.Duration
// PreAlloc indicates whether to make memory pre-allocation when initializing Pool.
PreAlloc bool
// Max number of goroutine blocking on pool.Submit.
// 0 (default value) means no such limit.
MaxBlockingTasks int
// When Nonblocking is true, Pool.Submit will never be blocked.
// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
// When Nonblocking is true, MaxBlockingTasks is inoperative.
Nonblocking bool
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
PanicHandler func(interface{})
// Logger is the customized logger for logging info, if it is not set,
// default standard logger from log package is used.
Logger Logger
}
比如 PanicHandler 遇到 panic会调用这里设置的处理函数,以上例中,我们遇到偶数会触发panic,修改NewPool函数
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i)
wg.Done()
}, ants.WithPanicHandler(func(i interface{}) {
fmt.Printf("panic recover %v", i)
}))
out:
panic recover panic from i:992run with 880
panic recover panic from i:880run with 972
panic recover panic from i:972run with 994
run with 991
还有更多关于 Benchmarks 性能报告,可参考https://github.com/panjf2000/ants