您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“go同步協(xié)程的必備工具WaitGroup怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“go同步協(xié)程的必備工具WaitGroup怎么使用”吧!
本文將介紹 Go 語言中的 WaitGroup 并發(fā)原語,包括 WaitGroup 的基本使用方法、實(shí)現(xiàn)原理、使用注意事項(xiàng)以及常見的使用方式。能夠更好地理解和應(yīng)用 WaitGroup 來協(xié)調(diào)多個(gè) Goroutine 的執(zhí)行,提高 Go 并發(fā)編程的效率和穩(wěn)定性。
WaitGroup
是Go語言標(biāo)準(zhǔn)庫中的一個(gè)結(jié)構(gòu)體,它提供了一種簡(jiǎn)單的機(jī)制,用于同步多個(gè)協(xié)程的執(zhí)行。適用于需要并發(fā)執(zhí)行多個(gè)任務(wù)并等待它們?nèi)客瓿珊蟛拍芾^續(xù)執(zhí)行后續(xù)操作的場(chǎng)景。
首先主協(xié)程創(chuàng)建WaitGroup實(shí)例,然后在每個(gè)協(xié)程的開始處,調(diào)用Add(1)
方法,表示需要等待一個(gè)任務(wù)執(zhí)行完成,然后協(xié)程在任務(wù)執(zhí)行完成之后,調(diào)用Done
方法,表示任務(wù)已經(jīng)執(zhí)行完成了。
主協(xié)程中,需要調(diào)用Wait()
方法,等待所有協(xié)程完成任務(wù),示例如下:
func main(){ //首先主協(xié)程創(chuàng)建WaitGroup實(shí)例 var wg sync.WaitGroup // 開始時(shí)調(diào)用Add方法表示有個(gè)任務(wù)開始執(zhí)行 wg.Add(1) go func() { // 開始執(zhí)行... //完成之后,調(diào)用Done方法 wg.Done() }() // 調(diào)用Wait()方法,等待所有協(xié)程完成任務(wù) wg.Wait() // 執(zhí)行后續(xù)邏輯 }
package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup for i := 0; i < 5; i++ { wg.Add(1) go func(i int) { defer wg.Done() fmt.Printf("任務(wù)%d開始執(zhí)行\(zhòng)n", i) // 模擬協(xié)程任務(wù)執(zhí)行一段時(shí)間 time.Sleep(time.Duration(rand.Int() % 100)) // 線程任務(wù)執(zhí)行完成 fmt.Printf("任務(wù)%d執(zhí)行完畢\n", i) }(i) } fmt.Println("主協(xié)程開始等待所有任務(wù)執(zhí)行完成...") wg.Wait() fmt.Println("所有協(xié)程已經(jīng)執(zhí)行完畢...") }
在這個(gè)例子中,我們使用了sync.WaitGroup
來等待5個(gè)協(xié)程執(zhí)行完畢。在循環(huán)中,每創(chuàng)建一個(gè)任務(wù),我們調(diào)用一次wg.Add(1)
方法,然后啟動(dòng)一個(gè)協(xié)程去執(zhí)行任務(wù),當(dāng)協(xié)程完成任務(wù)后,調(diào)用wg.Done
方法,告知主協(xié)程任務(wù)已經(jīng)執(zhí)行完畢。然后主協(xié)程會(huì)在5個(gè)協(xié)程任務(wù)全部執(zhí)行完畢之后,才會(huì)繼續(xù)向下執(zhí)行。
WaitGroup
的設(shè)計(jì)初衷就是為了等待一組操作完成后再執(zhí)行下一步操作,通常會(huì)在一組協(xié)程中使用。
sync.WaitGroup
結(jié)構(gòu)體中的 state1
和 state2
字段是用于實(shí)現(xiàn) WaitGroup
功能的重要變量。
type WaitGroup struct { noCopy noCopy state1 uint64 state2 uint32 }
由于 WaitGroup
需要等待一組操作完成之后再執(zhí)行,因此需要等待所有操作完成之后才能繼續(xù)執(zhí)行。為了實(shí)現(xiàn)這個(gè)功能,WaitGroup 使用了一個(gè)計(jì)數(shù)器 counter
來記錄還有多少個(gè)操作沒有完成,如果 counter
的值為 0,則表示所有操作已經(jīng)完成。
同時(shí),WaitGroup
在所有任務(wù)都完成之后,需要喚醒所有處于等待的協(xié)程,此時(shí)需要知道有多少個(gè)協(xié)程處于等待狀態(tài)。為了實(shí)現(xiàn)這個(gè)功能,WaitGroup 使用了一個(gè)等待計(jì)數(shù)器 waiter
來記錄當(dāng)前有多少個(gè)協(xié)程正在等待操作完成。
這里WaitGroup
對(duì)于計(jì)數(shù)器和等待計(jì)數(shù)器的實(shí)現(xiàn),是通過一個(gè)64位無符號(hào)整數(shù)來實(shí)現(xiàn)的,也就是WaitGroup
結(jié)構(gòu)體中的state1,其中高32位保存了任務(wù)計(jì)數(shù)器counter
的值,低32位保存了等待計(jì)數(shù)器waiter
的值。當(dāng)我們創(chuàng)建一個(gè) WaitGroup
實(shí)例時(shí),該實(shí)例的任務(wù)計(jì)數(shù)器和等待計(jì)數(shù)器都被初始化為 0。
而且,等待協(xié)程需要等待所有任務(wù)完成之后才能繼續(xù)執(zhí)行,所以等待協(xié)程在任務(wù)未完成時(shí)會(huì)被阻塞,當(dāng)任務(wù)全部完成后,自動(dòng)被喚醒。WaitGroup
使用 state2
用于實(shí)現(xiàn)信號(hào)量機(jī)制。通過調(diào)用 runtime_Semacquire()
和 runtime_Semrelease()
函數(shù),可以在不阻塞線程的情況下進(jìn)行等待和通知操作。
調(diào)用 Add()
方法增加/減小counter
的值,delta的值可以是正數(shù),也可以是負(fù)數(shù),下面是Add
方法的源碼實(shí)現(xiàn):
func (wg *WaitGroup) Add(delta int) { // delta 的值可以為負(fù)數(shù),Done方法便是通過Add(-1)來實(shí)現(xiàn)的 // statep: 為state1的地址 semap: 為state2的地址 statep, semap := wg.state() // 高32位的值 加上 delta,增加任務(wù)計(jì)數(shù)器的值 state := atomic.AddUint64(statep, uint64(delta)<<32) // v: 取高32位數(shù)據(jù),獲取到待完成任務(wù)數(shù) v := int32(state >> 32) // 取低32位數(shù)據(jù),獲取到等待線程的值 w := uint32(state) // v > 0: 說明還有待完成的任務(wù)數(shù),此時(shí)不應(yīng)該喚醒等待協(xié)程 // w = 0: 說明沒有協(xié)程在等待,此時(shí)可以直接退出 if v > 0 || w == 0 { return } // 此時(shí)v = 0,所有任務(wù)都完成了,喚醒等待協(xié)程 *statep = 0 for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) } }
調(diào)用 Done()
方法表示完成了一個(gè)任務(wù),通過調(diào)用Add
方法,delta
值為-1,減少任務(wù)計(jì)數(shù)器counter
的值,當(dāng)其歸為0時(shí),便自動(dòng)喚醒所有處于等待的協(xié)程。
// Done decrements the WaitGroup counter by one. func (wg *WaitGroup) Done() { wg.Add(-1) }
調(diào)用Wait
方法,等待任務(wù)執(zhí)行完成,增加等待計(jì)數(shù)器Waiter
的值:
func (wg *WaitGroup) Wait() { // statep: 為state1的地址 semap: 為state2的地址 statep, semap := wg.state() for { // 加載state1的值 state := atomic.LoadUint64(statep) // v: 取高32位數(shù)據(jù),獲取到待完成任務(wù)數(shù) v := int32(state >> 32) // 沒有任務(wù)待執(zhí)行,全部都完成了 if v == 0 { return } // 增加waiter計(jì)數(shù)器的值 if atomic.CompareAndSwapUint64(statep, state, state+1) { // 等待被喚醒 runtime_Semacquire(semap) return } } }
Add
方法,Done
方法以及Wait
方法實(shí)現(xiàn)中,有一些異常場(chǎng)景的驗(yàn)證邏輯被我刪除掉了。當(dāng)出現(xiàn)異常場(chǎng)景時(shí),說明用戶使用方式和WaitGroup
的設(shè)計(jì)初衷相違背了,此時(shí)WaitGroup
就會(huì)直接panic。
下面通過說明使用的注意事項(xiàng),來間接介紹WaitGroup
的異常驗(yàn)證邏輯。
下面是一個(gè)Add方法和Done方法沒有成對(duì)出現(xiàn)的例子,此時(shí)Add方法調(diào)多了,此時(shí)計(jì)數(shù)器永遠(yuǎn)大于0,Wait 方法會(huì)一直阻塞等待。
package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() fmt.Println("Goroutine 1") }() go func() { fmt.Println("Goroutine 2") }() wg.Wait() fmt.Println("All goroutines finished") }
在上述代碼中,我們調(diào)用了wg.Add(2)
,但只調(diào)用了一次wg.Done()
。這會(huì)導(dǎo)致counter
的值大于0,因此調(diào)用wg.Wait()
會(huì)被永久阻塞,不會(huì)繼續(xù)向下繼續(xù)執(zhí)行。
還有另外一種情況時(shí)Done方法調(diào)用多了,此時(shí)任務(wù)計(jì)數(shù)器counter
的值為負(fù)數(shù),從WaitGroup
設(shè)計(jì)的語意來看,就是需要等待完成的任務(wù)數(shù)為負(fù)數(shù),這個(gè)不符合預(yù)期,此時(shí)將會(huì)直接panic
package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup wg.Add(1) go func() { fmt.Println("Goroutine 1 started") wg.Done() // 第一次調(diào)用Done方法 wg.Done() // 第二次調(diào)用Done方法 fmt.Println("Goroutine 1 completed") }() wg.Wait() fmt.Println("All goroutines completed") }
在上面的例子中,我們啟動(dòng)了一個(gè)goroutine,第一次調(diào)用Add
方法,counter的值變?yōu)?,在第14行調(diào)用Done
,此時(shí)計(jì)數(shù)器的值變?yōu)?,此時(shí)等待中的goroutine將會(huì)被喚醒。在第15行又調(diào)用了一次Done
方法,當(dāng)counter減小為0時(shí),再次調(diào)用Done
方法會(huì)導(dǎo)致panic,因?yàn)榇藭r(shí)waitGroup
的計(jì)數(shù)器已經(jīng)為0,再次減少將導(dǎo)致負(fù)數(shù)計(jì)數(shù),這是不被允許的。
所以在調(diào)用Done方法時(shí),需要保證每次調(diào)用都與Add方法的調(diào)用一一對(duì)應(yīng),否則會(huì)導(dǎo)致程序出現(xiàn)錯(cuò)誤。
WaitGroup
的設(shè)計(jì)初衷就是為了等待一組操作完成后再執(zhí)行下一步操作。所以,如果在所有任務(wù)添加之前,便調(diào)用Wait
方法進(jìn)行等待,此時(shí)有可能會(huì)導(dǎo)致等待協(xié)程提前被喚醒,執(zhí)行下一步操作,而尚未添加的任務(wù)則不會(huì)被等待,這違反了WaitGroup的設(shè)計(jì)初衷,也不符合預(yù)期。下面是一個(gè)簡(jiǎn)單的例子:
package main import ( "fmt" "sync" "time" ) func main() { var wg sync.WaitGroup for i := 1; i <= 3; i++ { go func(id int) { wg.Add(1) defer wg.Done() fmt.Printf("Goroutine %d started\n", id) time.Sleep(time.Duration(id) * time.Second) fmt.Printf("Goroutine %d finished\n", id) }(i) } // 不等待所有任務(wù)添加,就開始等待 wg.Wait() fmt.Println("All goroutines finished") time.Sleep(10 * time.Second) }
代碼執(zhí)行結(jié)果如下,等待協(xié)程被提前喚醒,執(zhí)行之后的操作,而子任務(wù)在等待協(xié)程喚醒后才開始執(zhí)行:
All goroutines finished
Goroutine 1 started
Goroutine 3 started
Goroutine 2 started
Goroutine 1 finished
Goroutine 2 finished
Goroutine 3 finished
在這個(gè)例子中,我們創(chuàng)建了三個(gè)協(xié)程并打印出它們開始和結(jié)束的消息。但是,我們沒有在任務(wù)開始前調(diào)用Add
方法添加任務(wù),而是在任務(wù)開始之后再調(diào)用Add
方法添加任務(wù)。
這可能會(huì)導(dǎo)致某些任務(wù)未被加入到WaitGroup
中,等待協(xié)程就調(diào)用了wg.Wait
方法,這樣就會(huì)導(dǎo)致一些任務(wù)未被加入WaitGrou
,從而導(dǎo)致等待協(xié)程不會(huì)等待這些任務(wù)執(zhí)行完成。如果這種情況發(fā)生了,我們會(huì)看到"All goroutines finished"被輸出,但實(shí)際上有一些協(xié)程還沒有完成。
因此,我們應(yīng)該在所有任務(wù)添加完畢之后再調(diào)用Wait
方法,以保證等待的正確性。
在函數(shù)或方法中使用,如果一個(gè)大任務(wù)可以拆分為多個(gè)獨(dú)立的子任務(wù),此時(shí)會(huì)將其進(jìn)行拆分,并使用多個(gè)協(xié)程來并發(fā)執(zhí)行這些任務(wù),提高執(zhí)行效率,同時(shí)使用WaitGroup
等待所有子任務(wù)執(zhí)行完成,完成協(xié)程間的同步。
下面來看go-redis中ClusterClient
結(jié)構(gòu)體中ForEachMaster
方法中對(duì)于WaitGroup
的使用。ForEachMaster
方法通常用于在 Redis 集群中執(zhí)行針對(duì)所有主節(jié)點(diǎn)的某種操作,例如在集群中添加或刪除鍵,或者執(zhí)行一些全局的診斷操作,具體執(zhí)行的操作由傳入?yún)?shù)fn
指定。
這里ForEachMaster
方法會(huì)對(duì)所有主節(jié)點(diǎn)執(zhí)行某種操作,這里的實(shí)現(xiàn)是對(duì)所有主節(jié)點(diǎn)執(zhí)行某種操作這個(gè)大任務(wù),拆分為多個(gè)獨(dú)立的子任務(wù),每個(gè)子任務(wù)完成對(duì)一個(gè)Master節(jié)點(diǎn)執(zhí)行指定操作,然后每個(gè)子任務(wù)啟動(dòng)一個(gè)協(xié)程去執(zhí)行,主協(xié)程使用WaitGroup
等待所有協(xié)程完成指定子任務(wù),ForEachMaster
也就完成了對(duì)所有主節(jié)點(diǎn)執(zhí)行某種操作的任務(wù)。具體實(shí)現(xiàn)如下:
func (c *ClusterClient) ForEachMaster( ctx context.Context, fn func(ctx context.Context, client *Client) error, ) error { // 重新加載集群狀態(tài),以確保狀態(tài)信息是最新的 state, err := c.state.ReloadOrGet(ctx) if err != nil { return err } var wg sync.WaitGroup // 用于協(xié)程間通信 errCh := make(chan error, 1) // 獲取到redis集群中所有的master節(jié)點(diǎn) for _, master := range state.Masters { // 啟動(dòng)一個(gè)協(xié)程來執(zhí)行該任務(wù) wg.Add(1) go func(node *clusterNode) { // 任務(wù)完成時(shí),調(diào)用Done告知WaitGroup任務(wù)已完成 defer wg.Done() err := fn(ctx, node.Client) if err != nil { select { case errCh <- err: default: } } }(master) } // 主協(xié)程等待所有任務(wù)完成 wg.Wait() return nil }
到此,相信大家對(duì)“go同步協(xié)程的必備工具WaitGroup怎么使用”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。