溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

go同步協(xié)程的必備工具WaitGroup怎么使用

發(fā)布時(shí)間:2023-03-21 14:52:58 來源:億速云 閱讀:100 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容主要講解“go同步協(xié)程的必備工具WaitGroup怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“go同步協(xié)程的必備工具WaitGroup怎么使用”吧!

1. 簡(jiǎn)介

本文將介紹 Go 語言中的 WaitGroup 并發(fā)原語,包括 WaitGroup 的基本使用方法、實(shí)現(xiàn)原理、使用注意事項(xiàng)以及常見的使用方式。能夠更好地理解和應(yīng)用 WaitGroup 來協(xié)調(diào)多個(gè) Goroutine 的執(zhí)行,提高 Go 并發(fā)編程的效率和穩(wěn)定性。

2. 基本使用

2.1 定義

WaitGroup是Go語言標(biāo)準(zhǔn)庫中的一個(gè)結(jié)構(gòu)體,它提供了一種簡(jiǎn)單的機(jī)制,用于同步多個(gè)協(xié)程的執(zhí)行。適用于需要并發(fā)執(zhí)行多個(gè)任務(wù)并等待它們?nèi)客瓿珊蟛拍芾^續(xù)執(zhí)行后續(xù)操作的場(chǎng)景。

2.2 使用方式

首先主協(xié)程創(chuàng)建WaitGroup實(shí)例,然后在每個(gè)協(xié)程的開始處,調(diào)用Add(1)方法,表示需要等待一個(gè)任務(wù)執(zhí)行完成,然后協(xié)程在任務(wù)執(zhí)行完成之后,調(diào)用Done方法,表示任務(wù)已經(jīng)執(zhí)行完成了。

主協(xié)程中,需要調(diào)用Wait()方法,等待所有協(xié)程完成任務(wù),示例如下:

func main(){
    //首先主協(xié)程創(chuàng)建WaitGroup實(shí)例
    var wg sync.WaitGroup
    // 開始時(shí)調(diào)用Add方法表示有個(gè)任務(wù)開始執(zhí)行
    wg.Add(1)
    go func() {
        // 開始執(zhí)行...
        //完成之后,調(diào)用Done方法
        wg.Done()
    }()
    // 調(diào)用Wait()方法,等待所有協(xié)程完成任務(wù)
    wg.Wait()
    // 執(zhí)行后續(xù)邏輯
}

2.3 使用例子

package main
import (
    "fmt"
    "sync"
)
func main() {
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
       wg.Add(1)
       go func(i int) {
          defer wg.Done()
          fmt.Printf("任務(wù)%d開始執(zhí)行\(zhòng)n", i)
          // 模擬協(xié)程任務(wù)執(zhí)行一段時(shí)間
          time.Sleep(time.Duration(rand.Int() % 100))
          // 線程任務(wù)執(zhí)行完成
          fmt.Printf("任務(wù)%d執(zhí)行完畢\n", i)
       }(i)
    }
    fmt.Println("主協(xié)程開始等待所有任務(wù)執(zhí)行完成...")
    wg.Wait()
    fmt.Println("所有協(xié)程已經(jīng)執(zhí)行完畢...")
}

在這個(gè)例子中,我們使用了sync.WaitGroup來等待5個(gè)協(xié)程執(zhí)行完畢。在循環(huán)中,每創(chuàng)建一個(gè)任務(wù),我們調(diào)用一次wg.Add(1)方法,然后啟動(dòng)一個(gè)協(xié)程去執(zhí)行任務(wù),當(dāng)協(xié)程完成任務(wù)后,調(diào)用wg.Done方法,告知主協(xié)程任務(wù)已經(jīng)執(zhí)行完畢。然后主協(xié)程會(huì)在5個(gè)協(xié)程任務(wù)全部執(zhí)行完畢之后,才會(huì)繼續(xù)向下執(zhí)行。

3.實(shí)現(xiàn)原理

3.1 設(shè)計(jì)初衷

WaitGroup的設(shè)計(jì)初衷就是為了等待一組操作完成后再執(zhí)行下一步操作,通常會(huì)在一組協(xié)程中使用。

3.2 基本原理

sync.WaitGroup 結(jié)構(gòu)體中的 state1state2 字段是用于實(shí)現(xiàn) WaitGroup 功能的重要變量。

type WaitGroup struct {
   noCopy noCopy
   state1 uint64
   state2 uint32
}

由于 WaitGroup 需要等待一組操作完成之后再執(zhí)行,因此需要等待所有操作完成之后才能繼續(xù)執(zhí)行。為了實(shí)現(xiàn)這個(gè)功能,WaitGroup 使用了一個(gè)計(jì)數(shù)器 counter 來記錄還有多少個(gè)操作沒有完成,如果 counter 的值為 0,則表示所有操作已經(jīng)完成。

同時(shí),WaitGroup 在所有任務(wù)都完成之后,需要喚醒所有處于等待的協(xié)程,此時(shí)需要知道有多少個(gè)協(xié)程處于等待狀態(tài)。為了實(shí)現(xiàn)這個(gè)功能,WaitGroup 使用了一個(gè)等待計(jì)數(shù)器 waiter 來記錄當(dāng)前有多少個(gè)協(xié)程正在等待操作完成。

這里WaitGroup對(duì)于計(jì)數(shù)器和等待計(jì)數(shù)器的實(shí)現(xiàn),是通過一個(gè)64位無符號(hào)整數(shù)來實(shí)現(xiàn)的,也就是WaitGroup結(jié)構(gòu)體中的state1,其中高32位保存了任務(wù)計(jì)數(shù)器counter的值,低32位保存了等待計(jì)數(shù)器waiter的值。當(dāng)我們創(chuàng)建一個(gè) WaitGroup 實(shí)例時(shí),該實(shí)例的任務(wù)計(jì)數(shù)器等待計(jì)數(shù)器都被初始化為 0。

而且,等待協(xié)程需要等待所有任務(wù)完成之后才能繼續(xù)執(zhí)行,所以等待協(xié)程在任務(wù)未完成時(shí)會(huì)被阻塞,當(dāng)任務(wù)全部完成后,自動(dòng)被喚醒。WaitGroup使用 state2 用于實(shí)現(xiàn)信號(hào)量機(jī)制。通過調(diào)用 runtime_Semacquire()runtime_Semrelease() 函數(shù),可以在不阻塞線程的情況下進(jìn)行等待和通知操作。

3.3 代碼實(shí)現(xiàn)

3.3.1 Add方法

調(diào)用 Add() 方法增加/減小counter的值,delta的值可以是正數(shù),也可以是負(fù)數(shù),下面是Add方法的源碼實(shí)現(xiàn):

func (wg *WaitGroup) Add(delta int) {
   // delta 的值可以為負(fù)數(shù),Done方法便是通過Add(-1)來實(shí)現(xiàn)的
   // statep: 為state1的地址  semap: 為state2的地址
   statep, semap := wg.state()
   // 高32位的值 加上 delta,增加任務(wù)計(jì)數(shù)器的值
   state := atomic.AddUint64(statep, uint64(delta)<<32)
   // v: 取高32位數(shù)據(jù),獲取到待完成任務(wù)數(shù)
   v := int32(state >> 32)
   // 取低32位數(shù)據(jù),獲取到等待線程的值
   w := uint32(state)
   // v > 0: 說明還有待完成的任務(wù)數(shù),此時(shí)不應(yīng)該喚醒等待協(xié)程
   // w = 0: 說明沒有協(xié)程在等待,此時(shí)可以直接退出
   if v > 0 || w == 0 {
      return
   }     
   // 此時(shí)v = 0,所有任務(wù)都完成了,喚醒等待協(xié)程
   *statep = 0
   for ; w != 0; w-- {
      runtime_Semrelease(semap, false, 0)
   }
}
3.3.2 Done方法實(shí)現(xiàn)

調(diào)用 Done() 方法表示完成了一個(gè)任務(wù),通過調(diào)用Add方法,delta值為-1,減少任務(wù)計(jì)數(shù)器counter的值,當(dāng)其歸為0時(shí),便自動(dòng)喚醒所有處于等待的協(xié)程。

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
   wg.Add(-1)
}
3.3.3 Wait方法實(shí)現(xiàn)

調(diào)用Wait方法,等待任務(wù)執(zhí)行完成,增加等待計(jì)數(shù)器Waiter的值:

func (wg *WaitGroup) Wait() {
   // statep: 為state1的地址  semap: 為state2的地址
   statep, semap := wg.state()
   for {
      // 加載state1的值
      state := atomic.LoadUint64(statep)
      // v: 取高32位數(shù)據(jù),獲取到待完成任務(wù)數(shù)
      v := int32(state >> 32)
      // 沒有任務(wù)待執(zhí)行,全部都完成了
      if v == 0 {
         return
      }
      // 增加waiter計(jì)數(shù)器的值
      if atomic.CompareAndSwapUint64(statep, state, state+1) {
          // 等待被喚醒
         runtime_Semacquire(semap)
         return
      }
   }
}

3.4 實(shí)現(xiàn)補(bǔ)充

Add方法,Done方法以及Wait方法實(shí)現(xiàn)中,有一些異常場(chǎng)景的驗(yàn)證邏輯被我刪除掉了。當(dāng)出現(xiàn)異常場(chǎng)景時(shí),說明用戶使用方式和WaitGroup的設(shè)計(jì)初衷相違背了,此時(shí)WaitGroup就會(huì)直接panic。

下面通過說明使用的注意事項(xiàng),來間接介紹WaitGroup的異常驗(yàn)證邏輯。

4.使用注意事項(xiàng)

4.1 Add方法和Done方法需要成對(duì)出現(xiàn)

下面是一個(gè)Add方法和Done方法沒有成對(duì)出現(xiàn)的例子,此時(shí)Add方法調(diào)多了,此時(shí)計(jì)數(shù)器永遠(yuǎn)大于0,Wait 方法會(huì)一直阻塞等待。

package main
import (
    "fmt"
    "sync"
)
func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        fmt.Println("Goroutine 1")
    }()
    go func() {
        fmt.Println("Goroutine 2")
    }()
    wg.Wait()
    fmt.Println("All goroutines finished")
}

在上述代碼中,我們調(diào)用了wg.Add(2),但只調(diào)用了一次wg.Done()。這會(huì)導(dǎo)致counter的值大于0,因此調(diào)用wg.Wait()會(huì)被永久阻塞,不會(huì)繼續(xù)向下繼續(xù)執(zhí)行。

還有另外一種情況時(shí)Done方法調(diào)用多了,此時(shí)任務(wù)計(jì)數(shù)器counter的值為負(fù)數(shù),從WaitGroup設(shè)計(jì)的語意來看,就是需要等待完成的任務(wù)數(shù)為負(fù)數(shù),這個(gè)不符合預(yù)期,此時(shí)將會(huì)直接panic

package main
import (
    "fmt"
    "sync"
)
func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        fmt.Println("Goroutine 1 started")
        wg.Done() // 第一次調(diào)用Done方法
        wg.Done() // 第二次調(diào)用Done方法
        fmt.Println("Goroutine 1 completed")
    }()
    wg.Wait()
    fmt.Println("All goroutines completed")
}

在上面的例子中,我們啟動(dòng)了一個(gè)goroutine,第一次調(diào)用Add方法,counter的值變?yōu)?,在第14行調(diào)用Done,此時(shí)計(jì)數(shù)器的值變?yōu)?,此時(shí)等待中的goroutine將會(huì)被喚醒。在第15行又調(diào)用了一次Done方法,當(dāng)counter減小為0時(shí),再次調(diào)用Done方法會(huì)導(dǎo)致panic,因?yàn)榇藭r(shí)waitGroup的計(jì)數(shù)器已經(jīng)為0,再次減少將導(dǎo)致負(fù)數(shù)計(jì)數(shù),這是不被允許的。

所以在調(diào)用Done方法時(shí),需要保證每次調(diào)用都與Add方法的調(diào)用一一對(duì)應(yīng),否則會(huì)導(dǎo)致程序出現(xiàn)錯(cuò)誤。

4.2 在所有任務(wù)都已經(jīng)添加之后,才調(diào)用Wait方法進(jìn)行等待

WaitGroup的設(shè)計(jì)初衷就是為了等待一組操作完成后再執(zhí)行下一步操作。所以,如果在所有任務(wù)添加之前,便調(diào)用Wait方法進(jìn)行等待,此時(shí)有可能會(huì)導(dǎo)致等待協(xié)程提前被喚醒,執(zhí)行下一步操作,而尚未添加的任務(wù)則不會(huì)被等待,這違反了WaitGroup的設(shè)計(jì)初衷,也不符合預(yù)期。下面是一個(gè)簡(jiǎn)單的例子:

package main
import (
        "fmt"
        "sync"
        "time"
)
func main() {
        var wg sync.WaitGroup
        for i := 1; i <= 3; i++ {
           go func(id int) {
              wg.Add(1)
              defer wg.Done()
              fmt.Printf("Goroutine %d started\n", id)
              time.Sleep(time.Duration(id) * time.Second) 
              fmt.Printf("Goroutine %d finished\n", id)
           }(i)
        }
        // 不等待所有任務(wù)添加,就開始等待
        wg.Wait()
        fmt.Println("All goroutines finished")
        time.Sleep(10 * time.Second)
}

代碼執(zhí)行結(jié)果如下,等待協(xié)程被提前喚醒,執(zhí)行之后的操作,而子任務(wù)在等待協(xié)程喚醒后才開始執(zhí)行:

All goroutines finished
Goroutine 1 started
Goroutine 3 started
Goroutine 2 started
Goroutine 1 finished
Goroutine 2 finished
Goroutine 3 finished

在這個(gè)例子中,我們創(chuàng)建了三個(gè)協(xié)程并打印出它們開始和結(jié)束的消息。但是,我們沒有在任務(wù)開始前調(diào)用Add方法添加任務(wù),而是在任務(wù)開始之后再調(diào)用Add方法添加任務(wù)。

這可能會(huì)導(dǎo)致某些任務(wù)未被加入到WaitGroup中,等待協(xié)程就調(diào)用了wg.Wait方法,這樣就會(huì)導(dǎo)致一些任務(wù)未被加入WaitGrou,從而導(dǎo)致等待協(xié)程不會(huì)等待這些任務(wù)執(zhí)行完成。如果這種情況發(fā)生了,我們會(huì)看到"All goroutines finished"被輸出,但實(shí)際上有一些協(xié)程還沒有完成。

因此,我們應(yīng)該在所有任務(wù)添加完畢之后再調(diào)用Wait方法,以保證等待的正確性。

5. WaitGroup常見使用場(chǎng)景

在函數(shù)或方法中使用,如果一個(gè)大任務(wù)可以拆分為多個(gè)獨(dú)立的子任務(wù),此時(shí)會(huì)將其進(jìn)行拆分,并使用多個(gè)協(xié)程來并發(fā)執(zhí)行這些任務(wù),提高執(zhí)行效率,同時(shí)使用WaitGroup等待所有子任務(wù)執(zhí)行完成,完成協(xié)程間的同步。

下面來看go-redisClusterClient結(jié)構(gòu)體中ForEachMaster方法中對(duì)于WaitGroup的使用。ForEachMaster方法通常用于在 Redis 集群中執(zhí)行針對(duì)所有主節(jié)點(diǎn)的某種操作,例如在集群中添加或刪除鍵,或者執(zhí)行一些全局的診斷操作,具體執(zhí)行的操作由傳入?yún)?shù)fn指定。

這里ForEachMaster方法會(huì)對(duì)所有主節(jié)點(diǎn)執(zhí)行某種操作,這里的實(shí)現(xiàn)是對(duì)所有主節(jié)點(diǎn)執(zhí)行某種操作這個(gè)大任務(wù),拆分為多個(gè)獨(dú)立的子任務(wù),每個(gè)子任務(wù)完成對(duì)一個(gè)Master節(jié)點(diǎn)執(zhí)行指定操作,然后每個(gè)子任務(wù)啟動(dòng)一個(gè)協(xié)程去執(zhí)行,主協(xié)程使用WaitGroup等待所有協(xié)程完成指定子任務(wù),ForEachMaster也就完成了對(duì)所有主節(jié)點(diǎn)執(zhí)行某種操作的任務(wù)。具體實(shí)現(xiàn)如下:

func (c *ClusterClient) ForEachMaster(
   ctx context.Context,
   fn func(ctx context.Context, client *Client) error,
) error {
   // 重新加載集群狀態(tài),以確保狀態(tài)信息是最新的
   state, err := c.state.ReloadOrGet(ctx)
   if err != nil {
      return err
   }
   var wg sync.WaitGroup
   // 用于協(xié)程間通信
   errCh := make(chan error, 1)
    // 獲取到redis集群中所有的master節(jié)點(diǎn)
   for _, master := range state.Masters {
      // 啟動(dòng)一個(gè)協(xié)程來執(zhí)行該任務(wù)
      wg.Add(1)
      go func(node *clusterNode) {
         // 任務(wù)完成時(shí),調(diào)用Done告知WaitGroup任務(wù)已完成
         defer wg.Done()
         err := fn(ctx, node.Client)
         if err != nil {
            select {
            case errCh &lt;- err:
            default:
            }
         }
      }(master)
   }
   // 主協(xié)程等待所有任務(wù)完成
   wg.Wait()
   return nil
 }

到此,相信大家對(duì)“go同步協(xié)程的必備工具WaitGroup怎么使用”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI