溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么在Go中實現(xiàn)并發(fā)控制context

發(fā)布時間:2021-06-01 17:09:07 來源:億速云 閱讀:149 作者:Leah 欄目:編程語言

怎么在Go中實現(xiàn)并發(fā)控制context?很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。

1. 前言

Golang context是Golang應(yīng)用開發(fā)常用的并發(fā)控制技術(shù),它與WaitGroup最大的不同點是context對于派生goroutine有更強(qiáng)的控制力,它可以控制多級的goroutine。

context翻譯成中文是"上下文",即它可以控制一組呈樹狀結(jié)構(gòu)的goroutine,每個goroutine擁有相同的上下文。

典型的使用場景如下圖所示:

怎么在Go中實現(xiàn)并發(fā)控制context

上圖中由于goroutine派生出子goroutine,而子goroutine又繼續(xù)派生新的goroutine,這種情況下使用WaitGroup就不太容易,因為子goroutine個數(shù)不容易確定。而使用context就可以很容易實現(xiàn)。

2. Context實現(xiàn)原理

context實際上只定義了接口,凡是實現(xiàn)該接口的類都可稱為是一種context,官方包中實現(xiàn)了幾個常用的context,分別可用于不同的場景。

2.1 接口定義

源碼包中src/context/context.go:Context 定義了該接口:

type Context interface {
  Deadline() (deadline time.Time, ok bool)

  Done() <-chan struct{}

  Err() error

  Value(key interface{}) interface{}
}

基礎(chǔ)的context接口只定義了4個方法,下面分別簡要說明一下:

2.1.1 Deadline()

該方法返回一個deadline和標(biāo)識是否已設(shè)置deadline的bool值,如果沒有設(shè)置deadline,則ok == false,此時deadline為一個初始值的time.Time值

2.1.2 Done()

該方法返回一個channel,需要在select-case語句中使用,如"case <-context.Done():"。

當(dāng)context關(guān)閉后,Done()返回一個被關(guān)閉的管道,關(guān)閉的管理仍然是可讀的,據(jù)此goroutine可以收到關(guān)閉請求;當(dāng)context還未關(guān)閉時,Done()返回nil。

2.1.3 Err()

該方法描述context關(guān)閉的原因。關(guān)閉原因由context實現(xiàn)控制,不需要用戶設(shè)置。比如Deadline context,關(guān)閉原因可能是因為deadline,也可能提前被主動關(guān)閉,那么關(guān)閉原因就會不同:

  • 因deadline關(guān)閉:“context deadline exceeded”;

  • 因主動關(guān)閉: "context canceled"。

當(dāng)context關(guān)閉后,Err()返回context的關(guān)閉原因;當(dāng)context還未關(guān)閉時,Err()返回nil;

2.1.3 Value()

有一種context,它不是用于控制呈樹狀分布的goroutine,而是用于在樹狀分布的goroutine間傳遞信息。

Value()方法就是用于此種類型的context,該方法根據(jù)key值查詢map中的value。具體使用后面示例說明。

2.2 空context

context包中定義了一個空的context, 名為emptyCtx,用于context的根節(jié)點,空的context只是簡單的實現(xiàn)了Context,本身不包含任何值,僅用于其他context的父節(jié)點。

emptyCtx類型定義如下代碼所示:

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return
}

func (*emptyCtx) Done() <-chan struct{} {
	return nil
}

func (*emptyCtx) Err() error {
	return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
	return nil
}

context包中定義了一個公用的emptCtx全局變量,名為background,可以使用context.Background()獲取它,實現(xiàn)代碼如下所示:

var background = new(emptyCtx)
func Background() Context {
	return background
}

context包提供了4個方法創(chuàng)建不同類型的context,使用這四個方法時如果沒有父context,都需要傳入backgroud,即backgroud作為其父節(jié)點:

  • WithCancel()

  • WithDeadline()

  • WithTimeout()

  • WithValue()

context包中實現(xiàn)Context接口的struct,除了emptyCtx外,還有cancelCtx、timerCtx和valueCtx三種,正是基于這三種context實例,實現(xiàn)了上述4種類型的context。

context包中各context類型之間的關(guān)系,如下圖所示:

怎么在Go中實現(xiàn)并發(fā)控制context

struct cancelCtx、valueCtx、valueCtx都繼承于Context,下面分別介紹這三個struct。

2.3 cancelCtx

源碼包中src/context/context.go:cancelCtx 定義了該類型context:

type cancelCtx struct {
	Context

	mu    sync.Mutex      // protects following fields
	done   chan struct{}     // created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err   error         // set to non-nil by the first cancel call
}

children中記錄了由此context派生的所有child,此context被cancle時會把其中的所有child都cancle掉。

cancelCtx與deadline和value無關(guān),所以只需要實現(xiàn)Done()和Err()接口外露接口即可。

2.3.1 Done()接口實現(xiàn)

按照Context定義,Done()接口只需要返回一個channel即可,對于cancelCtx來說只需要返回成員變量done即可。

這里直接看下源碼,非常簡單:

func (c *cancelCtx) Done() <-chan struct{} {
	c.mu.Lock()
	if c.done == nil {
		c.done = make(chan struct{})
	}
	d := c.done
	c.mu.Unlock()
	return d
}

由于cancelCtx沒有指定初始化函數(shù),所以cancelCtx.done可能還未分配,所以需要考慮初始化。
cancelCtx.done會在context被cancel時關(guān)閉,所以cancelCtx.done的值一般經(jīng)歷如三個階段:nil --> chan struct{} --> closed chan。

2.3.2 Err()接口實現(xiàn)

按照Context定義,Err()只需要返回一個error告知context被關(guān)閉的原因。對于cancelCtx來說只需要返回成員變量err即可。

還是直接看下源碼:

func (c *cancelCtx) Err() error {
	c.mu.Lock()
	err := c.err
	c.mu.Unlock()
	return err
}

cancelCtx.err默認(rèn)是nil,在context被cancel時指定一個error變量: var Canceled = errors.New("context canceled")。

2.3.3 cancel()接口實現(xiàn)

cancel()內(nèi)部方法是理解cancelCtx的最關(guān)鍵的方法,其作用是關(guān)閉自己和其后代,其后代存儲在cancelCtx.children的map中,其中key值即后代對象,value值并沒有意義,這里使用map只是為了方便查詢而已。

cancel方法實現(xiàn)偽代碼如下所示:

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
  c.mu.Lock()
	
  c.err = err	           //設(shè)置一個error,說明關(guān)閉原因
  close(c.done)           //將channel關(guān)閉,以此通知派生的context
	
  for child := range c.children {  //遍歷所有children,逐個調(diào)用cancel方法
    child.cancel(false, err)
  }
  c.children = nil
  c.mu.Unlock()

  if removeFromParent {      //正常情況下,需要將自己從parent刪除
    removeChild(c.Context, c)
  }
}

實際上,WithCancel()返回的第二個用于cancel context的方法正是此cancel()。

2.3.4 WithCancel()方法實現(xiàn)

WithCancel()方法作了三件事:

  • 初始化一個cancelCtx實例

  • 將cancelCtx實例添加到其父節(jié)點的children中(如果父節(jié)點也可以被cancel的話)

  • 返回cancelCtx實例和cancel()方法

其實現(xiàn)源碼如下所示:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)  //將自身添加到父節(jié)點
	return &c, func() { c.cancel(true, Canceled) }
}

這里將自身添加到父節(jié)點的過程有必要簡單說明一下:

  • 如果父節(jié)點也支持cancel,也就是說其父節(jié)點肯定有children成員,那么把新context添加到children里即可;

  • 如果父節(jié)點不支持cancel,就繼續(xù)向上查詢,直到找到一個支持cancel的節(jié)點,把新context添加到children里;

  • 如果所有的父節(jié)點均不支持cancel,則啟動一個協(xié)程等待父節(jié)點結(jié)束,然后再把當(dāng)前context結(jié)束。

2.3.5 典型使用案例

一個典型的使用cancel context的例子如下所示:

package main

import (
  "fmt"
  "time"
  "context"
)

func HandelRequest(ctx context.Context) {
  go WriteRedis(ctx)
  go WriteDatabase(ctx)
  for {
    select {
    case <-ctx.Done():
      fmt.Println("HandelRequest Done.")
      return
    default:
      fmt.Println("HandelRequest running")
      time.Sleep(2 * time.Second)
    }
  }
}

func WriteRedis(ctx context.Context) {
  for {
    select {
    case <-ctx.Done():
      fmt.Println("WriteRedis Done.")
      return
    default:
      fmt.Println("WriteRedis running")
      time.Sleep(2 * time.Second)
    }
  }
}

func WriteDatabase(ctx context.Context) {
  for {
    select {
    case <-ctx.Done():
      fmt.Println("WriteDatabase Done.")
      return
    default:
      fmt.Println("WriteDatabase running")
      time.Sleep(2 * time.Second)
    }
  }
}

func main() {
  ctx, cancel := context.WithCancel(context.Background())
  go HandelRequest(ctx)

  time.Sleep(5 * time.Second)
  fmt.Println("It's time to stop all sub goroutines!")
  cancel()

  //Just for test whether sub goroutines exit or not
  time.Sleep(5 * time.Second)
}

上面代碼中協(xié)程HandelRequest()用于處理某個請求,其又會創(chuàng)建兩個協(xié)程:WriteRedis()、WriteDatabase(),main協(xié)程創(chuàng)建創(chuàng)建context,并把context在各子協(xié)程間傳遞,main協(xié)程在適當(dāng)?shù)臅r機(jī)可以cancel掉所有子協(xié)程。

程序輸出如下所示:

HandelRequest running
WriteDatabase running
WriteRedis running
HandelRequest running
WriteDatabase running
WriteRedis running
HandelRequest running
WriteDatabase running
WriteRedis running
It's time to stop all sub goroutines!
WriteDatabase Done.
HandelRequest Done.
WriteRedis Done.

2.4 timerCtx

源碼包中src/context/context.go:timerCtx 定義了該類型context:

type timerCtx struct {
	cancelCtx
	timer *time.Timer // Under cancelCtx.mu.

	deadline time.Time
}

timerCtx在cancelCtx基礎(chǔ)上增加了deadline用于標(biāo)示自動cancel的最終時間,而timer就是一個觸發(fā)自動cancel的定時器。

由此,衍生出WithDeadline()和WithTimeout()。實現(xiàn)上這兩種類型實現(xiàn)原理一樣,只不過使用語境不一樣:

  • deadline: 指定最后期限,比如context將2018.10.20 00:00:00之時自動結(jié)束

  • timeout: 指定最長存活時間,比如context將在30s后結(jié)束。

對于接口來說,timerCtx在cancelCtx基礎(chǔ)上還需要實現(xiàn)Deadline()和cancel()方法,其中cancel()方法是重寫的。

2.4.1 Deadline()接口實現(xiàn)

Deadline()方法僅僅是返回timerCtx.deadline而矣。而timerCtx.deadline是WithDeadline()或WithTimeout()方法設(shè)置的。

2.4.2 cancel()接口實現(xiàn)

cancel()方法基本繼承cancelCtx,只需要額外把timer關(guān)閉。

timerCtx被關(guān)閉后,timerCtx.cancelCtx.err將會存儲關(guān)閉原因:

  • 如果deadline到來之前手動關(guān)閉,則關(guān)閉原因與cancelCtx顯示一致;

  • 如果deadline到來時自動關(guān)閉,則原因為:"context deadline exceeded"

2.4.3 WithDeadline()方法實現(xiàn)

WithDeadline()方法實現(xiàn)步驟如下:

  • 初始化一個timerCtx實例

  • 將timerCtx實例添加到其父節(jié)點的children中(如果父節(jié)點也可以被cancel的話)

  • 啟動定時器,定時器到期后會自動cancel本context

  • 返回timerCtx實例和cancel()方法

也就是說,timerCtx類型的context不僅支持手動cancel,也會在定時器到來后自動cancel。

2.4.4 WithTimeout()方法實現(xiàn)

WithTimeout()實際調(diào)用了WithDeadline,二者實現(xiàn)原理一致。

看代碼會非常清晰:

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
	return WithDeadline(parent, time.Now().Add(timeout))
}

2.4.5 典型使用案例

下面例子中使用WithTimeout()獲得一個context并在其了協(xié)程中傳遞:

package main

import (
  "fmt"
  "time"
  "context"
)

func HandelRequest(ctx context.Context) {
  go WriteRedis(ctx)
  go WriteDatabase(ctx)
  for {
    select {
    case <-ctx.Done():
      fmt.Println("HandelRequest Done.")
      return
    default:
      fmt.Println("HandelRequest running")
      time.Sleep(2 * time.Second)
    }
  }
}

func WriteRedis(ctx context.Context) {
  for {
    select {
    case <-ctx.Done():
      fmt.Println("WriteRedis Done.")
      return
    default:
      fmt.Println("WriteRedis running")
      time.Sleep(2 * time.Second)
    }
  }
}

func WriteDatabase(ctx context.Context) {
  for {
    select {
    case <-ctx.Done():
      fmt.Println("WriteDatabase Done.")
      return
    default:
      fmt.Println("WriteDatabase running")
      time.Sleep(2 * time.Second)
    }
  }
}

func main() {
  ctx, _ := context.WithTimeout(context.Background(), 5 * time.Second)
  go HandelRequest(ctx)

  time.Sleep(10 * time.Second)
}

主協(xié)程中創(chuàng)建一個10s超時的context,并將其傳遞給子協(xié)程,10s自動關(guān)閉context。程序輸出如下:

HandelRequest running
WriteRedis running
WriteDatabase running
HandelRequest running
WriteRedis running
WriteDatabase running
HandelRequest running
WriteRedis running
WriteDatabase running
HandelRequest Done.
WriteDatabase Done.
WriteRedis Done.

2.5 valueCtx

源碼包中src/context/context.go:valueCtx 定義了該類型context:

type valueCtx struct {
	Context
	key, val interface{}
}

valueCtx只是在Context基礎(chǔ)上增加了一個key-value對,用于在各級協(xié)程間傳遞一些數(shù)據(jù)。

由于valueCtx既不需要cancel,也不需要deadline,那么只需要實現(xiàn)Value()接口即可。

2.5.1 Value()接口實現(xiàn)

由valueCtx數(shù)據(jù)結(jié)構(gòu)定義可見,valueCtx.key和valueCtx.val分別代表其key和value值。 實現(xiàn)也很簡單:

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}

這里有個細(xì)節(jié)需要關(guān)注一下,即當(dāng)前context查找不到key時,會向父節(jié)點查找,如果查詢不到則最終返回interface{}。也就是說,可以通過子context查詢到父的value值。

2.5.2 WithValue()方法實現(xiàn)

WithValue()實現(xiàn)也是非常的簡單, 偽代碼如下:

func WithValue(parent Context, key, val interface{}) Context {
	if key == nil {
		panic("nil key")
	}
	return &valueCtx{parent, key, val}
}

2.5.3 典型使用案例

下面示例程序展示valueCtx的用法:

package main

import (
  "fmt"
  "time"
  "context"
)

func HandelRequest(ctx context.Context) {
  for {
    select {
    case <-ctx.Done():
      fmt.Println("HandelRequest Done.")
      return
    default:
      fmt.Println("HandelRequest running, parameter: ", ctx.Value("parameter"))
      time.Sleep(2 * time.Second)
    }
  }
}

func main() {
  ctx := context.WithValue(context.Background(), "parameter", "1")
  go HandelRequest(ctx)

  time.Sleep(10 * time.Second)
}

上例main()中通過WithValue()方法獲得一個context,需要指定一個父context、key和value。然后通將該context傳遞給子協(xié)程HandelRequest,子協(xié)程可以讀取到context的key-value。

注意:本例中子協(xié)程無法自動結(jié)束,因為context是不支持cancle的,也就是說<-ctx.Done()永遠(yuǎn)無法返回。如果需要返回,需要在創(chuàng)建context時指定一個可以cancel的context作為父節(jié)點,使用父節(jié)點的cancel()在適當(dāng)?shù)臅r機(jī)結(jié)束整個context。

總結(jié)

Context僅僅是一個接口定義,跟據(jù)實現(xiàn)的不同,可以衍生出不同的context類型;

cancelCtx實現(xiàn)了Context接口,通過WithCancel()創(chuàng)建cancelCtx實例;

timerCtx實現(xiàn)了Context接口,通過WithDeadline()和WithTimeout()創(chuàng)建timerCtx實例;

valueCtx實現(xiàn)了Context接口,通過WithValue()創(chuàng)建valueCtx實例;

三種context實例可互為父節(jié)點,從而可以組合成不同的應(yīng)用形式;

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI