溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Golang中sync.Pool的作用是什么

發(fā)布時間:2021-08-03 15:35:33 來源:億速云 閱讀:210 作者:Leah 欄目:編程語言

這篇文章將為大家詳細(xì)講解有關(guān)Golang中sync.Pool的作用是什么,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。


Pool介紹

總所周知Go 是一個自動垃圾回收的編程語言,采用三色并發(fā)標(biāo)記算法標(biāo)記對象并回收。如果你想使用 Go 開發(fā)一個高性能的應(yīng)用程序的話,就必須考慮垃圾回收給性能帶來的影響。因?yàn)镚o 在垃圾回收的時候會有一個STW(stop-the-world,程序暫停)的時間,并且如果對象太多,做標(biāo)記也需要時間。

所以如果采用對象池來創(chuàng)建對象,增加對象的重復(fù)利用率,使用的時候就不必在堆上重新創(chuàng)建對象可以節(jié)省開銷。

在Go中,sync.Pool提供了對象池的功能。它對外提供了三個方法:New、Get 和 Put。下面用一個簡短的例子來說明一下Pool使用:

var pool *sync.Pool
type Person struct {
	Name string
}

func init() {
	pool = &sync.Pool{
		New: func() interface{}{
			fmt.Println("creating a new person")
			return new(Person)
		},
	}
}

func main() {

	person := pool.Get().(*Person)
	fmt.Println("Get Pool Object:", person)

	person.Name = "first"
	pool.Put(person)

	fmt.Println("Get Pool Object:",pool.Get().(*Person))
	fmt.Println("Get Pool Object:",pool.Get().(*Person))

}

結(jié)果:

creating a new person
Get Pool Object: &{}
Get Pool Object: &{first}
creating a new person
Get Pool Object: &{}

這里我用了init方法初始化了一個pool,然后get了三次,put了一次到pool中,如果pool中沒有對象,那么會調(diào)用New函數(shù)創(chuàng)建一個新的對象,否則會重put進(jìn)去的對象中獲取。

源碼分析

type Pool struct {
	noCopy noCopy 
	local     unsafe.Pointer  
	localSize uintptr 
	victim     unsafe.Pointer 
	victimSize uintptr 
	New func() interface{}
}

Pool結(jié)構(gòu)體里面noCopy代表這個結(jié)構(gòu)體是禁止拷貝的,它可以在我們使用 go vet 工具的時候生效;

local是一個poolLocal數(shù)組的指針,localSize代表這個數(shù)組的大??;同樣victim也是一個poolLocal數(shù)組的指針,每次垃圾回收的時候,Pool 會把 victim 中的對象移除,然后把 local 的數(shù)據(jù)給 victim;local和victim的邏輯我們下面會詳細(xì)介紹到。

New函數(shù)是在創(chuàng)建pool的時候設(shè)置的,當(dāng)pool沒有緩存對象的時候,會調(diào)用New方法生成一個新的對象。

下面我們對照著pool的結(jié)構(gòu)圖往下講,避免找不到北:

Golang中sync.Pool的作用是什么

type poolLocal struct {
	poolLocalInternal 
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

local字段存儲的是一個poolLocal數(shù)組的指針,poolLocal數(shù)組大小是goroutine中P的數(shù)量,訪問時,P的id對應(yīng)poolLocal數(shù)組下標(biāo)索引,所以Pool的最大個數(shù)runtime.GOMAXPROCS(0)。

通過這樣的設(shè)計(jì),每個P都有了自己的本地空間,多個 goroutine 使用同一個 Pool 時,減少了競爭,提升了性能。如果對goroutine的P、G、M有疑惑的同學(xué)不妨看看這篇文章:The Go scheduler。

poolLocal里面有一個pad數(shù)組用來占位用,防止在 cache line 上分配多個 poolLocalInternal從而造成false sharing,有關(guān)于false sharing可以看看這篇文章:

What’s false sharing and how to solve it ,文中對于false sharing的定義:

That’s what false sharing is: one core update a variable would force other cores to update cache either.

type poolLocalInternal struct {
	private interface{} // Can be used only by the respective P.
	shared  poolChain   // Local P can pushHead/popHead; any P can popTail.
}

poolLocalInternal包含兩個字段private和shared。

private代表緩存的一個元素,只能由相應(yīng)的一個 P 存取。因?yàn)橐粋€ P 同時只能執(zhí)行一個 goroutine,所以不會有并發(fā)的問題;

shared則可以由任意的 P 訪問,但是只有本地的 P 才能 pushHead/popHead,其它 P 可以 popTail。

type poolChain struct { 
	head *poolChainElt 
	tail *poolChainElt
}

type poolChainElt struct {
	poolDequeue 
	next, prev *poolChainElt
}

type poolDequeue struct { 
	headTail uint64 
	vals []eface
}

poolChain是一個雙端隊(duì)列,里面的head和tail分別指向隊(duì)列頭尾;poolDequeue里面存放真正的數(shù)據(jù),是一個單生產(chǎn)者、多消費(fèi)者的固定大小的無鎖的環(huán)狀隊(duì)列,headTail是環(huán)狀隊(duì)列的首位位置的指針,可以通過位運(yùn)算解析出首尾的位置,生產(chǎn)者可以從 head 插入、head 刪除,而消費(fèi)者僅可從 tail 刪除。

這個雙端隊(duì)列的模型大概是這個樣子:

Golang中sync.Pool的作用是什么

poolDequeue里面的環(huán)狀隊(duì)列大小是固定的,后面分析源碼我們會看到,當(dāng)環(huán)狀隊(duì)列滿了的時候會創(chuàng)建一個size是原來兩倍大小的環(huán)狀隊(duì)列。大家這張圖好好體會一下,會反復(fù)用到。

Get方法

func (p *Pool) Get() interface{} {
	...
    //1.把當(dāng)前goroutine綁定在當(dāng)前的P上
	l, pid := p.pin()
    //2.優(yōu)先從local的private中獲取
	x := l.private
	l.private = nil
	if x == nil { 
        //3,private沒有,那么從shared的頭部獲取
		x, _ = l.shared.popHead()
        //4. 如果都沒有,那么去別的local上去偷一個
		if x == nil {
			x = p.getSlow(pid)
		}
	}
    //解除搶占
	runtime_procUnpin()
	...
    //5. 如果沒有獲取到,嘗試使用New函數(shù)生成一個新的
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}
  • 這一段代碼首先會將當(dāng)前goroutine綁定在當(dāng)前的P上返回對應(yīng)的local,然后嘗試從local的private中獲取,然后需要把private字段置空,因?yàn)橐呀?jīng)拿到了想要的對象;

  • private中獲取不到,那么就去shared的頭部獲取;

  • shared也沒有,那么嘗試遍歷所有的 local,嘗試從它們的 shared 彈出一個元素;

  • 最后如果還是沒有,那么就直接調(diào)用預(yù)先設(shè)置好的 New 函數(shù),創(chuàng)建一個出來。

pin
func (p *Pool) pin() (*poolLocal, int) {
	pid := runtime_procPin() 
	s := atomic.LoadUintptr(&p.localSize) // load-acquire
	l := p.local                          // load-consume
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	return p.pinSlow()
}

pin方法里面首先會調(diào)用runtime_procPin方法會先獲取當(dāng)前goroutine,然后綁定到對應(yīng)的M上,然后返回M目前綁定的P的id,因?yàn)檫@個pid后面會用到,防止在使用途中P被搶占,具體的細(xì)節(jié)可以看這篇:https://zhuanlan.zhihu.com/p/99710992。

接下來會使用原子操作取出localSize,如果當(dāng)前pid大于localSize,那么就表示Pool還沒創(chuàng)建對應(yīng)的poolLocal,那么調(diào)用pinSlow進(jìn)行創(chuàng)建工作,否則調(diào)用indexLocal取出pid對應(yīng)的poolLocal返回。

func indexLocal(l unsafe.Pointer, i int) *poolLocal {
	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
	return (*poolLocal)(lp)
}

indexLocal里面是使用了地址操作,傳入的i是數(shù)組的index值,所以需要獲取poolLocal{}的size做一下地址的位移操作,然后再轉(zhuǎn)成轉(zhuǎn)成poolLocal地址返回。

pinSlow
func (p *Pool) pinSlow() (*poolLocal, int) { 
	// 解除pin
	runtime_procUnpin()
	// 加上全局鎖
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	// pin住
	pid := runtime_procPin() 
	s := p.localSize
	l := p.local
	// 重新對pid進(jìn)行檢查
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	// 初始化local前會將pool放入到allPools數(shù)組中
	if p.local == nil {
		allPools = append(allPools, p)
	} 
	// 當(dāng)前P的數(shù)量
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))  
	atomic.StoreUintptr(&p.localSize, uintptr(size))         
	return &local[pid], pid
}

因?yàn)閍llPoolsMu是一個全局Mutex鎖,因此上鎖會比較慢可能被阻塞,所以上鎖前調(diào)用runtime_procUnpin方法解除pin的操作;

在解除綁定后,pinSlow 可能被其他的線程調(diào)用過了,p.local 可能會發(fā)生變化。因此這時候需要再次對 pid 進(jìn)行檢查。

最后初始化local,并使用原子操作對local和localSize設(shè)值,返回當(dāng)前P對應(yīng)的local。

到這里pin方法終于講完了。畫一個簡單的圖描述一下這整個流程:

Golang中sync.Pool的作用是什么

下面我們再回到Get方法中往下走,代碼我再貼一遍,以便閱讀:

func (p *Pool) Get() interface{} {
	...
    //2.優(yōu)先從local的private中獲取
	x := l.private
	l.private = nil
	if x == nil { 
        //3,private沒有,那么從shared的頭部獲取
		x, _ = l.shared.popHead()
        //4. 如果都沒有,那么去別的local上去偷一個
		if x == nil {
			x = p.getSlow(pid)
		}
	}
    ...
	return x
}

如果private中沒有值,那么會調(diào)用shared的popHead方法獲取值。

popHead
func (c *poolChain) popHead() (interface{}, bool) {
	// 這里頭部是一個poolChainElt
	d := c.head
	// 遍歷poolChain鏈表
	for d != nil {
		// 從poolChainElt的環(huán)狀列表中獲取值
		if val, ok := d.popHead(); ok {
			return val, ok
		} 
		// load poolChain下一個對象
		d = loadPoolChainElt(&d.prev)
	}
	return nil, false
}

popHead方法里面會獲取到poolChain的頭結(jié)點(diǎn),不記得poolChain數(shù)據(jù)結(jié)構(gòu)的同學(xué)建議往上面翻一下再回來。

接著有個for循環(huán)會挨個從poolChain的頭結(jié)點(diǎn)往下遍歷,直到獲取對象返回。

func (d *poolDequeue) popHead() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		// headTail的高32位為head,低32位為tail
		head, tail := d.unpack(ptrs)
		// 首尾相等,那么這個隊(duì)列就是空的
		if tail == head { 
			return nil, false
		} 
		// 這里需要head--之后再獲取slot
		head--
		ptrs2 := d.pack(head, tail)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { 
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
	} 
	val := *(*interface{})(unsafe.Pointer(slot))
	// 說明沒取到緩存的對象,返回 nil
	if val == dequeueNil(nil) {
		val = nil
	} 
	// 重置slot 
	*slot = eface{}
	return val, true
}
  • poolDequeue的popHead方法首先會獲取到headTail的值,然后調(diào)用unpack解包,headTail是一個64位的值,高32位表示head,低32位表示tail。

  • 判斷head和tail是否相等,相等那么這個隊(duì)列就是空的;

  • 如果隊(duì)列不是空的,那么將head減一之后再使用,因?yàn)閔ead當(dāng)前指的位置是空值,表示下一個新對象存放的位置;

  • CAS重新設(shè)值新的headTail,成功之后獲取slot,這里因?yàn)関als大小是2的n 次冪,因此len(d.vals)-1)之后低n位全是1,和head取與之后可以獲取到head的低n位的值;

  • 如果slot所對應(yīng)的對象是dequeueNil,那么表示是空值,直接返回,否則將slot指針對應(yīng)位置的值置空,返回val。

如果shared的popHead方法也沒獲取到值,那么就需要調(diào)用getSlow方法獲取了。

getSlow
func (p *Pool) getSlow(pid int) interface{} { 
	size := atomic.LoadUintptr(&p.localSize) // load-acquire
	locals := p.local                        // load-consume 
	// 遍歷locals列表,從其他的local的shared列表尾部獲取對象
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i+1)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	} 
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
	// victim的private不為空則返回
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	//  遍歷victim對應(yīng)的locals列表,從其他的local的shared列表尾部獲取對象
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	} 
	// 獲取不到,將victimSize置為0
	atomic.StoreUintptr(&p.victimSize, 0)
	return nil
}

getSlow方法會遍歷locals列表,這里需要注意的是,遍歷是從索引為 pid+1 的 poolLocal 處開始,嘗試調(diào)用shared的popTail方法獲取對象;如果沒有拿到,則從 victim 里找。如果都沒找到,那么就將victimSize置為0,下次就不找victim了。

poolChain&popTail
func (c *poolChain) popTail() (interface{}, bool) {
	d := loadPoolChainElt(&c.tail)
	// 如果最后一個節(jié)點(diǎn)是空的,那么直接返回
	if d == nil {
		return nil, false
	}

	for { 
		// 這里獲取的是next節(jié)點(diǎn),與一般的雙向鏈表是相反的
		d2 := loadPoolChainElt(&d.next)
		// 獲取尾部對象
		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil { 
			return nil, false
		} 
		// 因?yàn)閐已經(jīng)沒有數(shù)據(jù)了,所以重置tail為d2,并刪除d2的上一個節(jié)點(diǎn)
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}
  • 判斷poolChain,如果最后一個節(jié)點(diǎn)是空的,那么直接返回;

  • 進(jìn)入for循環(huán),獲取tail的next節(jié)點(diǎn),這里需要注意的是這個雙向鏈表與一般的鏈表是反向的,不清楚的可以再去看看第一張圖;

  • 調(diào)用popTail獲取poolDequeue列表的對象,有對象直接返回;

  • d2為空則表示已經(jīng)遍歷完整個poolChain雙向列表了,都為空,那么直接返回;

  • 通過CAS將tail重置為d2,因?yàn)閐已經(jīng)沒有數(shù)據(jù)了,并將d2的prev節(jié)點(diǎn)置為nil,然后將d置為d2,進(jìn)入下一個循環(huán);

poolDequeue&popTail
func (d *poolDequeue) popTail() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		// 和pophead一樣,將headTail解包
		head, tail := d.unpack(ptrs)
		// 首位相等,表示列表中沒有數(shù)據(jù),返回
		if tail == head { 
			return nil, false
		} 
		ptrs2 := d.pack(head, tail+1)
		// CAS重置tail位置
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { 
			// 獲取tail位置對象
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	} 
	val := *(*interface{})(unsafe.Pointer(slot))
	// 判斷對象是不是為空
	if val == dequeueNil(nil) {
		val = nil
	} 
	// 將slot置空
	slot.val = nil
	atomic.StorePointer(&slot.typ, nil) 
	return val, true
}

如果看懂了popHead,那么這個popTail方法是和它非常的相近的。

popTail簡單來說也是從隊(duì)列尾部移除一個元素,如果隊(duì)列為空,返回 false。但是需要注意的是,這個popTail可能會被多個消費(fèi)者調(diào)用,所以需要循環(huán)CAS獲取對象;在poolDequeue環(huán)狀列表中tail是有數(shù)據(jù)的,不必像popHead中head--

最后,需要將slot置空。

大家可以再對照一下圖回顧一下代碼:

Golang中sync.Pool的作用是什么

Put方法

func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	...
	l, _ := p.pin()
	if l.private == nil {
		l.private = x
		x = nil
	}
	if x != nil {
		l.shared.pushHead(x)
	}
    runtime_procUnpin()
	...
}

看完了Get方法,看Put方法就容易多了。同樣Put方法首先會去Pin住當(dāng)前goroutine和P,然后嘗試將 x 賦值給 private 字段。如果private不為空,那么就調(diào)用pushHead將其放入到shared隊(duì)列中。

poolChain&pushHead
func (c *poolChain) pushHead(val interface{}) {
	d := c.head
	// 頭節(jié)點(diǎn)沒有初始化,那么設(shè)值一下
	if d == nil {
		const initSize = 8 // Must be a power of 2
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		c.head = d
		storePoolChainElt(&c.tail, d)
	}
	// 將對象加入到環(huán)狀隊(duì)列中
	if d.pushHead(val) {
		return
	}
	newSize := len(d.vals) * 2
	// 這里做了限制,單個環(huán)狀隊(duì)列不能超過2的30次方大小
	if newSize >= dequeueLimit {
		newSize = dequeueLimit
	}
	// 初始化新的環(huán)狀列表,大小是d的兩倍
	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	c.head = d2
	storePoolChainElt(&d.next, d2)
	// push到新的隊(duì)列中
	d2.pushHead(val)
}

如果頭節(jié)點(diǎn)為空,那么需要創(chuàng)建一個新的poolChainElt對象作為頭節(jié)點(diǎn),大小為8;然后調(diào)用pushHead放入到環(huán)狀隊(duì)列中;

如果放置失敗,那么創(chuàng)建一個 poolChainElt 節(jié)點(diǎn),并且雙端隊(duì)列的長度翻倍,當(dāng)然長度也不能超過dequeueLimit,即2的30次方;

然后將新的節(jié)點(diǎn)d2和d互相綁定一下,并將d2設(shè)值為頭節(jié)點(diǎn),將傳入的對象push到d2中;

poolDequeue&pushHead
func (d *poolDequeue) pushHead(val interface{}) bool {
	ptrs := atomic.LoadUint64(&d.headTail)
	// 解包headTail
	head, tail := d.unpack(ptrs)
	// 判斷隊(duì)列是否已滿
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { 
		return false
	}
	// 找到head的槽位
	slot := &d.vals[head&uint32(len(d.vals)-1)] 
	// 檢查slot是否和popTail有沖突
	typ := atomic.LoadPointer(&slot.typ)
	if typ != nil { 
		return false
	} 
	if val == nil {
		val = dequeueNil(nil)
	}
	// 將 val 賦值到 slot,并將 head 指針值加 1
	*(*interface{})(unsafe.Pointer(slot)) = val 
	atomic.AddUint64(&d.headTail, 1<<dequeueBits)
	return true
}

首先通過位運(yùn)算判斷隊(duì)列是否已滿,也就是將尾部指針加上 len(d.vals) ,因?yàn)閔ead指向的是將要被填充的位置,所以head和tail位置是相隔len(d.vals),然后再取低 31 位,看它是否和 head 相等。如果隊(duì)列滿了,直接返回 false;

然后找到找到head的槽位slot,并判斷typ是否為空,因?yàn)閜opTail 是先設(shè)置 val,再將 typ 設(shè)置為 nil,所以如果有沖突,那么直接返回;

最后設(shè)值slot,并將head加1返回;

GC

在pool.go文件的 init 函數(shù)里,注冊了 GC 發(fā)生時,如何清理 Pool 的函數(shù):

func init() {
	runtime_registerPoolCleanup(poolCleanup)
}

func poolCleanup() { 
	for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	} 
	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	} 
	oldPools, allPools = allPools, nil
}

poolCleanup 會在 STW 階段被調(diào)用。主要是將 local 和 victim 作交換,那么不至于GC 把所有的 Pool 都清空了,而是需要兩個 GC 周期才會被釋放。如果 sync.Pool 的獲取、釋放速度穩(wěn)定,那么就不會有新的池對象進(jìn)行分配。

關(guān)于Golang中sync.Pool的作用是什么就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI