溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

sync.Pool的實現(xiàn)原理是什么

發(fā)布時間:2021-07-28 18:26:48 來源:億速云 閱讀:208 作者:chen 欄目:云計算

本篇內(nèi)容主要講解“sync.Pool的實現(xiàn)原理是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“sync.Pool的實現(xiàn)原理是什么”吧!

sync.Pool實現(xiàn)原理

對象的創(chuàng)建和銷毀會消耗一定的系統(tǒng)資源(內(nèi)存,gc等),過多的創(chuàng)建銷毀對象會帶來內(nèi)存不穩(wěn)定與更長的gc停頓,因為go的gc不存在分代,因而更加不擅長處理這種問題。因而go早早就推出Pool包用于緩解這種情況。Pool用于核心的功能就是Put和Get。當我們需要一個對象的時候通過Get獲取一個,創(chuàng)建的對象也可以Put放進池子里,通過這種方式可以反復利用現(xiàn)有對象,這樣gc就不用高頻的促發(fā)內(nèi)存gc了。

結構

    type Pool struct {
        noCopy noCopy

        local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
        localSize uintptr        // size of the local array

        // New optionally specifies a function to generate
        // a value when Get would otherwise return nil.
        // It may not be changed concurrently with calls to Get.
        New func() interface{}
    }

創(chuàng)建時候指定New方法用于創(chuàng)建默認對象,local,localSize會在隨后用到的時候生成. local是一個poolLocalInternal的切片指針。

    type poolLocalInternal struct {
        private interface{}   // Can be used only by the respective P.
        shared  []interface{} // Can be used by any P.
        Mutex                 // Protects shared.
    }

當不同的p調(diào)用Pool時,每個p都會在local上分配這樣一個poolLocal,索引值就是p的id。 private存放的對象只能由創(chuàng)建的p讀寫,shared則會在多個p之間共享。 sync.Pool的實現(xiàn)原理是什么

PUT

    // Put adds x to the pool.
    func (p *Pool) Put(x interface{}) {
        if x == nil {
            return
        }
        if race.Enabled {
            if fastrand()%4 == 0 {
                // Randomly drop x on floor.
                return
            }
            race.ReleaseMerge(poolRaceAddr(x))
            race.Disable()
        }
        l := p.pin()
        if l.private == nil {
            l.private = x
            x = nil
        }
        runtime_procUnpin()
        if x != nil {
            l.Lock()
            l.shared = append(l.shared, x)
            l.Unlock()
        }
        if race.Enabled {
            race.Enable()
        }
    }

Put先要通過pin函數(shù)獲取當前Pool對應的pid位置上的localPool,然后檢查private是否存在,存在則設置到private上,如果不存在就追加到shared尾部。

func (p *Pool) pin() *poolLocal {
	pid := runtime_procPin()
	// In pinSlow we store to localSize and then to local, here we load in opposite order.
	// Since we've disabled preemption, GC cannot happen in between.
	// Thus here we must observe local at least as large localSize.
	// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
	s := atomic.LoadUintptr(&p.localSize) // load-acquire
	l := p.local                          // load-consume
	if uintptr(pid) < s {                 // 這句話的意思是如果當前pool的localPool切片尚未創(chuàng)建,尚未創(chuàng)建這句話肯定是false的
		return indexLocal(l, pid)
	}
	return p.pinSlow()
}

pin函數(shù)先通過自旋加鎖(可以避免p自身發(fā)生并發(fā)),在檢查本地local切片的size,size大于當前pid則使用pid去本地local切片上索引到localpool對象,否則就要走pinSlow對象創(chuàng)建本地localPool切片了.

func (p *Pool) pinSlow() *poolLocal {
	// Retry under the mutex.
	// Can not lock the mutex while pinned.
	runtime_procUnpin()
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		return indexLocal(l, pid)
	}
	if p.local == nil {
		allPools = append(allPools, p)
	}
	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	atomic.StoreUintptr(&p.localSize, uintptr(size))         // store-release
	return &local[pid]
}

pinShow先要取消自旋鎖,因為后面的lock內(nèi)部也會嘗試自旋鎖,下面可能會操作allpool因而這里需要使用互斥鎖allPoolsMu,然后又加上自旋鎖,(這里注釋說不會發(fā)生poolCleanup,但是查看代碼gcstart只是查看了當前m的lock狀態(tài),然而避免不了其他m觸發(fā)的gc,尚存疑),這里會再次嘗試之前的操作,因為可能在unpin,pin之間有并發(fā)產(chǎn)生了poolocal,確認本地local切片是空的才會生成一個新的pool。后面是創(chuàng)建Pool上的localPool切片,runtime.GOMAXPROCS這里的作用是返回p的數(shù)量,用于確定pool的localpool的數(shù)量.

GET

    func (p *Pool) Get() interface{} {
        if race.Enabled {
            race.Disable()
        }
        l := p.pin()
        x := l.private
        l.private = nil
        runtime_procUnpin()
        if x == nil {
            l.Lock()
            last := len(l.shared) - 1
            if last >= 0 {
                x = l.shared[last]
                l.shared = l.shared[:last]
            }
            l.Unlock()
            if x == nil {
                x = p.getSlow()
            }
        }
        if race.Enabled {
            race.Enable()
            if x != nil {
                race.Acquire(poolRaceAddr(x))
            }
        }
        if x == nil && p.New != nil {
            x = p.New()
        }
        return x
    }

GET 先調(diào)用pin獲取本地local,這個具體流程和上面一樣了,如果當前private存在返回private上面的對象,如果不存在就從shared查找,存在返回尾部對象,反之就要從其他的p的localPool里面偷了。

    func (p *Pool) getSlow() (x interface{}) {
        // See the comment in pin regarding ordering of the loads.
        size := atomic.LoadUintptr(&p.localSize) // load-acquire
        local := p.local                         // load-consume
        // Try to steal one element from other procs.
        pid := runtime_procPin()
        runtime_procUnpin()
        for i := 0; i < int(size); i++ {
            l := indexLocal(local, (pid+i+1)%int(size))
            l.Lock()
            last := len(l.shared) - 1
            if last >= 0 {
                x = l.shared[last]
                l.shared = l.shared[:last]
                l.Unlock()
                break
            }
            l.Unlock()
        }
        return x
    }

首先就要獲取當前size,用于輪詢p的local,這里的查詢順序不是從0開始,而是是從當前p的位置往后查一圈。查到依次檢查每個p的shared上是否存在對象,如果存在就獲取末尾的值。 如果所有p的poollocal都是空的,那么初始化的New函數(shù)就起作用了,調(diào)用這個New函數(shù)創(chuàng)建一個新的對象出來。

清理

func poolCleanup() {
	// This function is called with the world stopped, at the beginning of a garbage collection.
	// It must not allocate and probably should not call any runtime functions.
	// Defensively zero out everything, 2 reasons:
	// 1. To prevent false retention of whole Pools.
	// 2. If GC happens while a goroutine works with l.shared in Put/Get,
	//    it will retain whole Pool. So next cycle memory consumption would be doubled.
	for i, p := range allPools {
		allPools[i] = nil
		for i := 0; i < int(p.localSize); i++ {
			l := indexLocal(p.local, i)
			l.private = nil
			for j := range l.shared {
				l.shared[j] = nil
			}
			l.shared = nil
		}
		p.local = nil
		p.localSize = 0
	}
	allPools = []*Pool{}
}

pool對象的清理是在每次gc之前清理,通過runtime_registerPoolCleanup函數(shù)注冊一個上面的poolCleanup對象,內(nèi)部會把這個函數(shù)設置到clearpool函數(shù)上面,然后每次gc之前會調(diào)用clearPool來取消所有pool的引用,重置所有的Pool。代碼很簡單就是輪詢一邊設置nil,然后取消所有poollocal,pool引用。方法簡單粗暴。由于clearPool是在STW中調(diào)用的,如果Pool存在大量對象會拉長STW的時間,在已經(jīng)有提案來修復這個問題了(CL 166961.)[https://go-review.googlesource.com/c/go/+/166961/]

到此,相信大家對“sync.Pool的實現(xiàn)原理是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關內(nèi)容可以進入相關頻道進行查詢,關注我們,繼續(xù)學習!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI