溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何理解go-zero對Go中g(shù)oroutine支持的并發(fā)組件 ?

發(fā)布時(shí)間:2021-10-13 09:52:17 來源:億速云 閱讀:286 作者:iii 欄目:編程語言

本篇內(nèi)容主要講解“如何理解go-zero對Go中g(shù)oroutine支持的并發(fā)組件 ”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“如何理解go-zero對Go中g(shù)oroutine支持的并發(fā)組件 ”吧!

threading

雖然 go func() 已經(jīng)很方便,但是有幾個(gè)問題:

  • 如果協(xié)程異常退出,無法追蹤異常棧

  • 某個(gè)異常請求觸發(fā)panic,應(yīng)該做故障隔離,而不是整個(gè)進(jìn)程退出,容易被攻擊

我們看看 core/threading 包提供了哪些額外選擇:

func GoSafe(fn func()) {
	go RunSafe(fn)
}

func RunSafe(fn func()) {
	defer rescue.Recover()
	fn()
}

func Recover(cleanups ...func()) {
	for _, cleanup := range cleanups {
		cleanup()
	}

	if p := recover(); p != nil {
		logx.ErrorStack(p)
	}
}

GoSafe

threading.GoSafe() 就幫你解決了這個(gè)問題。開發(fā)者可以將自己在協(xié)程中需要完成邏輯,以閉包的方式傳入,由 GoSafe() 內(nèi)部 go func();

當(dāng)開發(fā)者的函數(shù)出現(xiàn)異常退出時(shí),會在 Recover() 中打印異常棧,以便讓開發(fā)者更快確定異常發(fā)生點(diǎn)和調(diào)用棧。

NewWorkerGroup

我們再看第二個(gè):WaitGroup。日常開發(fā),其實(shí) WaitGroup 沒什么好說的,你需要 N 個(gè)協(xié)程協(xié)作 :wg.Add(N) ,等待全部協(xié)程完成任務(wù):wg.Wait(),同時(shí)完成一個(gè)任務(wù)需要手動 wg.Done()。

可以看的出來,在任務(wù)開始 -> 結(jié)束 -> 等待,整個(gè)過程需要開發(fā)者關(guān)注任務(wù)的狀態(tài)然后手動修改狀態(tài)。

NewWorkerGroup 就幫開發(fā)者減輕了負(fù)擔(dān),開發(fā)者只需要關(guān)注:

  1. 任務(wù)邏輯【函數(shù)】

  2. 任務(wù)數(shù)【workers

然后啟動 WorkerGroup.Start(),對應(yīng)任務(wù)數(shù)就會啟動:

func (wg WorkerGroup) Start() {
  // 包裝了sync.WaitGroup
	group := NewRoutineGroup()
	for i := 0; i < wg.workers; i++ {
    // 內(nèi)部維護(hù)了 wg.Add(1) wg.Done()
    // 同時(shí)也是 goroutine 安全模式下進(jìn)行的
		group.RunSafe(wg.job)
	}
	group.Wait()
}

worker 的狀態(tài)會自動管理,可以用來固定數(shù)量的 worker 來處理消息隊(duì)列的任務(wù),用法如下:

func main() {
  group := NewWorkerGroup(func() {
    // process tasks
	}, runtime.NumCPU())
	group.Start()
}

Pool

這里的 Pool 不是 sync.Pool。sync.Pool 有個(gè)不方便的地方是它池化的對象可能會被垃圾回收掉,這個(gè)就讓開發(fā)者疑惑了,不知道自己創(chuàng)建并存入的對象什么時(shí)候就沒了。

go-zero 中的 pool

  1. pool 中的對象會根據(jù)使用時(shí)間做懶銷毀;

  2. 使用 cond 做對象消費(fèi)和生產(chǎn)的通知以及阻塞;

  3. 開發(fā)者可以自定義自己的生產(chǎn)函數(shù),銷毀函數(shù);

那我來看看生產(chǎn)對象,和消費(fèi)對象在 pool 中時(shí)怎么實(shí)現(xiàn)的:

func (p *Pool) Get() interface{} {
  // 調(diào)用 cond.Wait 時(shí)必須要持有c.L的鎖
	p.lock.Lock()
	defer p.lock.Unlock()

	for {
    // 1. pool中對象池是一個(gè)用鏈表連接的nodelist
		if p.head != nil {
			head := p.head
			p.head = head.next
      // 1.1 如果當(dāng)前節(jié)點(diǎn):當(dāng)前時(shí)間 >= 上次使用時(shí)間+對象最大存活時(shí)間
			if p.maxAge > 0 && head.lastUsed+p.maxAge < timex.Now() {
				p.created--
        // 說明當(dāng)前節(jié)點(diǎn)已經(jīng)過期了 -> 銷毀節(jié)點(diǎn)對應(yīng)的對象,然后繼續(xù)尋找下一個(gè)節(jié)點(diǎn)
        // 【??:不是銷毀節(jié)點(diǎn),而是銷毀節(jié)點(diǎn)對應(yīng)的對象】
				p.destroy(head.item)
				continue
			} else {
				return head.item
			}
		}
		// 2. 對象池是懶加載的,get的時(shí)候才去創(chuàng)建對象鏈表
		if p.created < p.limit {
			p.created++
      // 由開發(fā)者自己傳入:生產(chǎn)函數(shù)
			return p.create()
		}
		
		p.cond.Wait()
	}
}
func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	// 互斥訪問 pool 中nodelist
	p.lock.Lock()
	defer p.lock.Unlock()

	p.head = &node{
		item:     x,
		next:     p.head,
		lastUsed: timex.Now(),
	}
  // 放入head,通知其他正在get的協(xié)程【極為關(guān)鍵】
	p.cond.Signal()
}

上述就是 go-zeroCond 的使用??梢灶惐?生產(chǎn)者-消費(fèi)者模型,只是在這里沒有使用 channel 做通信,而是用 Cond 。這里有幾個(gè)特性:

  • Cond和一個(gè)Locker關(guān)聯(lián),可以利用這個(gè)Locker對相關(guān)的依賴條件更改提供保護(hù)。

  • Cond可以同時(shí)支持 SignalBroadcast 方法,而 Channel 只能同時(shí)支持其中一種。

到此,相信大家對“如何理解go-zero對Go中g(shù)oroutine支持的并發(fā)組件 ”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI