深度解密 Go 语言中的 sync.Pool

时间:2021-05-22

最近在工作中碰到了 GC 的问题:项目中大量重复地创建许多对象,造成 GC 的工作量巨大,CPU 频繁掉底。准备使用 sync.Pool 来缓存对象,减轻 GC 的消耗。为了用起来更顺畅,我特地研究了一番,形成此文。本文从使用到源码解析,循序渐进,一一道来。

是什么

sync.Pool 是 sync 包下的一个组件,可以作为保存临时取还对象的一个“池子”。个人觉得它的名字有一定的误导性,因为 Pool 里装的对象可以被无通知地被回收,可能 sync.Cache 是一个更合适的名字。

有什么用

对于很多需要重复分配、回收内存的地方,sync.Pool 是一个很好的选择。频繁地分配、回收内存会给 GC 带来一定的负担,严重的时候会引起 CPU 的毛刺,而 sync.Pool 可以将暂时不用的对象缓存起来,待下次需要的时候直接使用,不用再次经过内存分配,复用对象的内存,减轻 GC 的压力,提升系统的性能。

怎么用

首先,sync.Pool 是协程安全的,这对于使用者来说是极其方便的。使用前,设置好对象的 New 函数,用于在 Pool 里没有缓存的对象时,创建一个。之后,在程序的任何地方、任何时候仅通过 Get()、Put() 方法就可以取、还对象了。

下面是 2018 年的时候,《Go 夜读》上关于 sync.Pool 的分享,关于适用场景:

当多个 goroutine 都需要创建同⼀个对象的时候,如果 goroutine 数过多,导致对象的创建数⽬剧增,进⽽导致 GC 压⼒增大。形成 “并发⼤-占⽤内存⼤-GC 缓慢-处理并发能⼒降低-并发更⼤”这样的恶性循环。

在这个时候,需要有⼀个对象池,每个 goroutine 不再⾃⼰单独创建对象,⽽是从对象池中获取出⼀个对象(如果池中已经有的话)。

因此关键思想就是对象的复用,避免重复创建、销毁,下面我们来看看如何使用。

简单的例子

首先来看一个简单的例子:

package mainimport ( "fmt" "sync")var pool *sync.Pooltype Person struct { Name string}func initPool() { pool = &sync.Pool { New: func()interface{} { fmt.Println("Creating a new Person") return new(Person) }, }}func main() { initPool() p := pool.Get().(*Person) fmt.Println("首次从 pool 里获取:", p) p.Name = "first" fmt.Printf("设置 p.Name = %s\n", p.Name) pool.Put(p) fmt.Println("Pool 里已有一个对象:&{first},调用 Get: ", pool.Get().(*Person)) fmt.Println("Pool 没有对象了,调用 Get: ", pool.Get().(*Person))}

运行结果:

Creating a new Person
首次从 pool 里获取: &{}
设置 p.Name = first
Pool 里已有一个对象:&{first},Get: &{first}
Creating a new Person
Pool 没有对象了,Get: &{}

首先,需要初始化 Pool,唯一需要的就是设置好 New 函数。当调用 Get 方法时,如果池子里缓存了对象,就直接返回缓存的对象。如果没有存货,则调用 New 函数创建一个新的对象。

另外,我们发现 Get 方法取出来的对象和上次 Put 进去的对象实际上是同一个,Pool 没有做任何“清空”的处理。但我们不应当对此有任何假设,因为在实际的并发使用场景中,无法保证这种顺序,最好的做法是在 Put 前,将对象清空。

fmt 包如何用

这部分主要看 fmt.Printf 如何使用:

func Printf(format string, a ...interface{}) (n int, err error) { return Fprintf(os.Stdout, format, a...)}

继续看 Fprintf:

func Fprintf(w io.Writer, format string, a ...interface{}) (n int, err error) { p := newPrinter() p.doPrintf(format, a) n, err = w.Write(p.buf) p.free() return}

Fprintf 函数的参数是一个 io.Writer,Printf 传的是 os.Stdout,相当于直接输出到标准输出。这里的 newPrinter 用的就是 Pool:

// newPrinter allocates a new pp struct or grabs a cached one.func newPrinter() *pp { p := ppFree.Get().(*pp) p.panicking = false p.erroring = false p.wrapErrs = false p.fmt.init(&p.buf) return p}var ppFree = sync.Pool{ New: func() interface{} { return new(pp) },}

回到 Fprintf 函数,拿到 pp 指针后,会做一些 format 的操作,并且将 p.buf 里面的内容写入 w。最后,调用 free 函数,将 pp 指针归还到 Pool 中:

// free saves used pp structs in ppFree; avoids an allocation per invocation.func (p *pp) free() { if cap(p.buf) > 64<<10 { return } p.buf = p.buf[:0] p.arg = nil p.value = reflect.Value{} p.wrappedErr = nil ppFree.Put(p)}

归还到 Pool 前将对象的一些字段清零,这样,通过 Get 拿到缓存的对象时,就可以安全地使用了。

pool_test

通过 test 文件学习源码是一个很好的途径,因为它代表了“官方”的用法。更重要的是,测试用例会故意测试一些“坑”,学习这些坑,也会让自己在使用的时候就能学会避免。

pool_test 文件里共有 7 个测试,4 个 BechMark。

TestPool 和 TestPoolNew 比较简单,主要是测试 Get/Put 的功能。我们来看下 TestPoolNew:

func TestPoolNew(t *testing.T) { // disable GC so we can control when it happens. defer debug.SetGCPercent(debug.SetGCPercent(-1)) i := 0 p := Pool{ New: func() interface{} { i++ return i }, } if v := p.Get(); v != 1 { t.Fatalf("got %v; want 1", v) } if v := p.Get(); v != 2 { t.Fatalf("got %v; want 2", v) } // Make sure that the goroutine doesn't migrate to another P // between Put and Get calls. Runtime_procPin() p.Put(42) if v := p.Get(); v != 42 { t.Fatalf("got %v; want 42", v) } Runtime_procUnpin() if v := p.Get(); v != 3 { t.Fatalf("got %v; want 3", v) }}

首先设置了 GC=-1,作用就是停止 GC。那为啥要用 defer?函数都跑完了,还要 defer 干啥。注意到,debug.SetGCPercent 这个函数被调用了两次,而且这个函数返回的是上一次 GC 的值。因此,defer 在这里的用途是还原到调用此函数之前的 GC 设置,也就是恢复现场。

接着,调置了 Pool 的 New 函数:直接返回一个 int,变且每次调用 New,都会自增 1。然后,连续调用了两次 Get 函数,因为这个时候 Pool 里没有缓存的对象,因此每次都会调用 New 创建一个,所以第一次返回 1,第二次返回 2。

然后,调用 Runtime_procPin() 防止 goroutine 被强占,目的是保护接下来的一次 Put 和 Get 操作,使得它们操作的对象都是同一个 P 的“池子”。并且,这次调用 Get 的时候并没有调用 New,因为之前有一次 Put 的操作。

最后,再次调用 Get 操作,因为没有“存货”,因此还是会再次调用 New 创建一个对象。

TestPoolGC 和 TestPoolRelease 则主要测试 GC 对 Pool 里对象的影响。这里用了一个函数,用于计数有多少对象会被 GC 回收:

runtime.SetFinalizer(v, func(vv *string) { atomic.AddUint32(&fin, 1)})

当垃圾回收检测到 v 是一个不可达的对象时,并且 v 又有一个关联的 Finalizer,就会另起一个 goroutine 调用设置的 finalizer 函数,也就是上面代码里的参数 func。这样,就会让对象 v 重新可达,从而在这次 GC 过程中不被回收。之后,解绑对象 v 和它所关联的 Finalizer,当下次 GC 再次检测到对象 v 不可达时,才会被回收。

TestPoolStress 从名字看,主要是想测一下“压力”,具体操作就是起了 10 个 goroutine 不断地向 Pool 里 Put 对象,然后又 Get 对象,看是否会出错。

TestPoolDequeue 和 TestPoolChain,都调用了 testPoolDequeue,这是具体干活的。它需要传入一个 PoolDequeue 接口:

// poolDequeue testing.type PoolDequeue interface { PushHead(val interface{}) bool PopHead() (interface{}, bool) PopTail() (interface{}, bool)}

PoolDequeue 是一个双端队列,可以从头部入队元素,从头部和尾部出队元素。调用函数时,前者传入 NewPoolDequeue(16),后者传入 NewPoolChain(),底层其实都是 poolDequeue 这个结构体。具体来看 testPoolDequeue 做了什么:

总共起了 10 个 goroutine:1 个生产者,9 个消费者。生产者不断地从队列头 pushHead 元素到双端队列里去,并且每 push 10 次,就 popHead 一次;消费者则一直从队列尾取元素。不论是从队列头还是从队列尾取元素,都会在 map 里做标记,最后检验每个元素是不是只被取出过一次。

剩下的就是 Benchmark 测试了。第一个 BenchmarkPool 比较简单,就是不停地 Put/Get,测试性能。

BenchmarkPoolSTW 函数会先关掉 GC,再向 pool 里 put 10 个对象,然后强制触发 GC,记录 GC 的停顿时间,并且做一个排序,计算 P50 和 P95 的 STW 时间。这个函数可以加入个人的代码库了:

func BenchmarkPoolSTW(b *testing.B) { // Take control of GC. defer debug.SetGCPercent(debug.SetGCPercent(-1)) var mstats runtime.MemStats var pauses []uint64 var p Pool for i := 0; i < b.N; i++ { // Put a large number of items into a pool. const N = 100000 var item interface{} = 42 for i := 0; i < N; i++ { p.Put(item) } // Do a GC. runtime.GC() // Record pause time. runtime.ReadMemStats(&mstats) pauses = append(pauses, mstats.PauseNs[(mstats.NumGC+255)%256]) } // Get pause time stats. sort.Slice(pauses, func(i, j int) bool { return pauses[i] < pauses[j] }) var total uint64 for _, ns := range pauses { total += ns } // ns/op for this benchmark is average STW time. b.ReportMetric(float64(total)/float64(b.N), "ns/op") b.ReportMetric(float64(pauses[len(pauses)*95/100]), "p95-ns/STW") b.ReportMetric(float64(pauses[len(pauses)*50/100]), "p50-ns/STW"

声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。

相关文章