溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

SingleFlight模式的Go并發(fā)編程實例分析

發(fā)布時間:2022-04-27 09:29:36 來源:億速云 閱讀:188 作者:iii 欄目:開發(fā)技術

這篇文章主要介紹了SingleFlight模式的Go并發(fā)編程實例分析的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇SingleFlight模式的Go并發(fā)編程實例分析文章都會有所收獲,下面我們一起來看看吧。

go-zeroSingleFlight的作用是:將并發(fā)請求合并成一個請求,以減少對下層服務的壓力。

應用場景

  • 查詢緩存時,合并請求,提升服務性能。 假設有一個 IP 查詢的服務,每次用戶請求先在緩存中查詢一個 IP 的歸屬地,如果緩存中有結果則直接返回,不存在則進行 IP 解析操作。

SingleFlight模式的Go并發(fā)編程實例分析

如上圖所示,n 個用戶請求查詢同一個 IP(8.8.8.8)就會對應 n 個 Redis 的查詢,在高并發(fā)場景下,如果能將 n 個 Redis 查詢合并成一個 Redis 查詢,那么性能肯定會提升很多,而 SingleFlight就是用來實現請求合并的,效果如下:

SingleFlight模式的Go并發(fā)編程實例分析

  • 防止緩存擊穿。

緩存擊穿問題是指:在高并發(fā)的場景中,大量的請求同時查詢一個 key ,如果這個 key 正好過期失效了,就會導致大量的請求都打到數據庫,導致數據庫的連接增多,負載上升。

SingleFlight模式的Go并發(fā)編程實例分析

通過SingleFlight可以將對同一個Key的并發(fā)請求進行合并,只讓其中一個請求到數據庫進行查詢,其他請求共享同一個結果,可以很大程度提升并發(fā)能力。

應用方式

直接上代碼:

func main() {
  round := 10
  var wg sync.WaitGroup
  barrier := syncx.NewSingleFlight()
  wg.Add(round)
  for i := 0; i < round; i++ {
    go func() {
      defer wg.Done()
      // 啟用10個協(xié)程模擬獲取緩存操作
      val, err := barrier.Do("get_rand_int", func() (interface{}, error) {
        time.Sleep(time.Second)
        return rand.Int(), nil
      })
      if err != nil {
        fmt.Println(err)
      } else {
        fmt.Println(val)
      }
    }()
  }
  wg.Wait()
}

以上代碼,模擬 10 個協(xié)程請求 Redis 獲取一個 key 的內容,代碼很簡單,就是執(zhí)行Do()方法。其中,接收兩個參數,第一個參數是獲取資源的標識,可以是 redis 中緩存的 key,第二個參數就是一個匿名函數,封裝好要做的業(yè)務邏輯。最終獲得的結果如下:

5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410

從上看出,10個協(xié)程都獲得了同一個結果,也就是只有一個協(xié)程真正執(zhí)行了rand.Int()獲取了隨機數,其他的協(xié)程都共享了這個結果。

源碼解析

先看代碼結構:

type (
  // 定義接口,有2個方法 Do 和 DoEx,其實邏輯是一樣的,DoEx 多了一個標識,主要看Do的邏輯就夠了
  SingleFlight interface {
    Do(key string, fn func() (interface{}, error)) (interface{}, error)
    DoEx(key string, fn func() (interface{}, error)) (interface{}, bool, error)
  }
  // 定義 call 的結構
  call struct {
    wg  sync.WaitGroup // 用于實現通過1個 call,其他 call 阻塞
    val interface{}    // 表示 call 操作的返回結果
    err error          // 表示 call 操作發(fā)生的錯誤
  }
  // 總控結構,實現 SingleFlight 接口
  flightGroup struct {
    calls map[string]*call // 不同的 call 對應不同的 key
    lock  sync.Mutex       // 利用鎖控制請求
  }
)

然后看最核心的Do方法做了什么事情:

func (g *flightGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
  c, done := g.createCall(key)
  if done {
    return c.val, c.err
  }
  g.makeCall(c, key, fn)
  return c.val, c.err
}

代碼很簡潔,利用g.createCall(key)對 key 發(fā)起 call 請求(其實就是做一件事情),如果此時已經有其他協(xié)程已經在發(fā)起 call 請求就阻塞?。╠one 為 true 的情況),等待拿到結果后直接返回。如果 done 是 false,說明當前協(xié)程是第一個發(fā)起 call 的協(xié)程,那么就執(zhí)行g.makeCall(c, key, fn)真正地發(fā)起 call 請求(此后的其他協(xié)程就阻塞在了g.createCall(key))。 

SingleFlight模式的Go并發(fā)編程實例分析

從上圖可知,其實關鍵就兩步:

  • 判斷是第一個請求的協(xié)程(利用map)

  • 阻塞住其他所有協(xié)程(利用 sync.WaitGroup)

來看下g.createCall(key)如何實現的:

func (g *flightGroup) createCall(key string) (c *call, done bool) {
  g.lock.Lock()
  if c, ok := g.calls[key]; ok {
    g.lock.Unlock()
    c.wg.Wait()
    return c, true
  }
  c = new(call)
  c.wg.Add(1)
  g.calls[key] = c
  g.lock.Unlock()
  return c, false
}

先看第一步:判斷是第一個請求的協(xié)程(利用map)

g.lock.Lock()
if c, ok := g.calls[key]; ok {
  g.lock.Unlock()
  c.wg.Wait()
  return c, true
}

此處判斷 map 中的 key 是否存在,如果已經存在,說明已經有其他協(xié)程在請求了,當前這個協(xié)程只需要等待,等待是利用了sync.WaitGroupWait()方法實現的,此處還是很巧妙的。要注意的是,map 在 Go 中是非并發(fā)安全的,所以需要加鎖。

再看第二步:阻塞住其他所有協(xié)程(利用 sync.WaitGroup)

c = new(call)
c.wg.Add(1)
g.calls[key] = c

因為是第一個發(fā)起 call 的協(xié)程,所以需要 new 這個 call,然后將wg.Add(1),這樣就對應了上面的wg.Wait(),阻塞剩下的協(xié)程。隨后將 new 的 call 放入 map 中,注意此時只是完成了初始化,并沒有真正去執(zhí)行call請求,真正的處理邏輯在 g.makeCall(c, key, fn)中。

func (g *flightGroup) makeCall(c *call, key string, fn func() (interface{}, error)) {
  defer func() {
    g.lock.Lock()
    delete(g.calls, key)
    g.lock.Unlock()
    c.wg.Done()
  }()
  c.val, c.err = fn()
}

這個方法中做的事情很簡單,就是執(zhí)行了傳遞的匿名函數fn()(也就是真正call請求要做的事情)。最后處理收尾的事情(通過defer),也是分成兩步:

  • 刪除 map 中的 key,使得下次發(fā)起請求可以獲取新的值。

  • 調用wg.Done(),讓之前阻塞的協(xié)程全部獲得結果并返回。

關于“SingleFlight模式的Go并發(fā)編程實例分析”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“SingleFlight模式的Go并發(fā)編程實例分析”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI