溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Golang?errgroup設計及實現(xiàn)原理是什么

發(fā)布時間:2022-08-29 16:28:32 來源:億速云 閱讀:143 作者:iii 欄目:開發(fā)技術

這篇文章主要介紹“Golang errgroup設計及實現(xiàn)原理是什么”,在日常操作中,相信很多人在Golang errgroup設計及實現(xiàn)原理是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Golang errgroup設計及實現(xiàn)原理是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

    errgroup 源碼拆解

    errgroup 定義在 golang.org/x/sync/errgroup,承載核心能力的結構體是 Group。

    Group

    type token struct{}
    // A Group is a collection of goroutines working on subtasks that are part of
    // the same overall task.
    //
    // A zero Group is valid, has no limit on the number of active goroutines,
    // and does not cancel on error.
    type Group struct {
    	cancel func()
    	wg sync.WaitGroup
    	sem chan token
    	errOnce sync.Once
    	err     error
    }

    Group 就是對我們上面提到的一堆子任務執(zhí)行計劃的抽象。每一個子任務都會有自己對應的 goroutine 來執(zhí)行。

    通過這個結構我們也可以看出來,errgroup 底層實現(xiàn)多個 goroutine 調度,等待的能力還是基于 sync.WaitGroup。

    WithContext

    我們可以調用 errgroup.WithContext 函數(shù)來創(chuàng)建一個 Group。

    // WithContext returns a new Group and an associated Context derived from ctx.
    //
    // The derived Context is canceled the first time a function passed to Go
    // returns a non-nil error or the first time Wait returns, whichever occurs
    // first.
    func WithContext(ctx context.Context) (*Group, context.Context) {
    	ctx, cancel := context.WithCancel(ctx)
    	return &Group{cancel: cancel}, ctx
    }

    這里可以看到,Group 的 cancel 函數(shù)本質就是為了支持 context 的 cancel 能力,初始化的 Group 只有一個 cancel 屬性,其他都是默認的。一旦有一個子任務返回錯誤,或者是 Wait 調用返回,這個新 Context 就會被 cancel。

    Wait

    本質上和 WaitGroup 的 Wait 方法語義一樣,既然我們是個 group task,就需要等待所有任務都執(zhí)行完,這個語義就由 Wait 方法提供。如果有多個子任務返回錯誤,它只會返回第一個出現(xiàn)的錯誤,如果所有的子任務都執(zhí)行成功,就返回 nil。

    // Wait blocks until all function calls from the Go method have returned, then
    // returns the first non-nil error (if any) from them.
    func (g *Group) Wait() error {
    	g.wg.Wait()
    	if g.cancel != nil {
    		g.cancel()
    	}
    	return g.err
    }

    Wait 的實現(xiàn)非常簡單。一個前置的 WaitGroup Wait,結束后只做了兩件事:

    • 如果對于公共的 Context 有 cancel 函數(shù),就將其 cancel,因為事情辦完了;

    • 返回公共的 Group 結構中的 err 給調用方。

    Go

    Group 的核心能力就在于能夠并發(fā)執(zhí)行多個子任務,從調用者的角度,我們只需要傳入要執(zhí)行的函數(shù),簽名為:func() error即可,非常通用。如果任務執(zhí)行成功,就返回 nil,否則就返回 error,并且會 cancel 那個新的 Context。底層的調度邏輯由 Group 的 Go 方法實現(xiàn):

    // Go calls the given function in a new goroutine.
    // It blocks until the new goroutine can be added without the number of
    // active goroutines in the group exceeding the configured limit.
    //
    // The first call to return a non-nil error cancels the group; its error will be
    // returned by Wait.
    func (g *Group) Go(f func() error) {
    	if g.sem != nil {
    		g.sem <- token{}
    	}
    	g.wg.Add(1)
    	go func() {
    		defer g.done()
    		if err := f(); err != nil {
    			g.errOnce.Do(func() {
    				g.err = err
    				if g.cancel != nil {
    					g.cancel()
    				}
    			})
    		}
    	}()
    }
    func (g *Group) done() {
    	if g.sem != nil {
    		<-g.sem
    	}
    	g.wg.Done()
    }

    我們重點來分析下 Go 這里發(fā)生了什么。

    WaitGroup 加 1 用作計數(shù);

    啟動一個新的 goroutine 執(zhí)行調用方傳入的 f() 函數(shù);

    • 若 err 為 nil 說明執(zhí)行正常;

    • 若 err 不為 nil,說明執(zhí)行出錯,此時將這個返回的 err 賦值給全局 Group 的變量 err,并將 context cancel 掉。注意,這里的處理在 once 分支中,也就是只有第一個來的錯誤會被處理。

    在 defer 語句中調用 Group 的 done 方法,底層依賴 WaitGroup 的 Done,說明這一個子任務結束。

    這里也可以看到,其實所謂 errgroup,我們并不是將所有子任務的 error 拼成一個字符串返回,而是直接在 Go 方法那里將第一個錯誤返回,底層依賴了 once 的能力。

    SetLimit

    其實看到這里,你有沒有覺得 errgroup 的功能有點雞肋?底層核心技術都是靠 WaitGroup 完成的,自己只不過是起了個 goroutine 執(zhí)行方法,err 還只能保留一個。而且 Group 里面的 sem 那個 chan 是用來干什么呢?

    這一節(jié)我們就來看看,Golang 對 errgroup 能力的一次擴充。

    到目前為止,errgroup 是可以做到一開始人們對它的期望的,即并發(fā)執(zhí)行子任務。但問題在于,這里是每一個子任務都開了個goroutine,如果是在高并發(fā)的環(huán)境里,頻繁創(chuàng)建大量goroutine 這樣的用法很容易對資源負載產生影響。開發(fā)者們提出,希望有辦法限制 errgroup 創(chuàng)建的 goroutine 數(shù)量,參照這個 proposal: #27837

    // SetLimit limits the number of active goroutines in this group to at most n.
    // A negative value indicates no limit.
    //
    // Any subsequent call to the Go method will block until it can add an active
    // goroutine without exceeding the configured limit.
    //
    // The limit must not be modified while any goroutines in the group are active.
    func (g *Group) SetLimit(n int) {
    	if n < 0 {
    		g.sem = nil
    		return
    	}
    	if len(g.sem) != 0 {
    		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
    	}
    	g.sem = make(chan token, n)
    }

    SetLimit 的參數(shù) n 就是我們對這個 Group 期望的最大 goroutine 數(shù)量,這里其實除去校驗邏輯,只干了一件事:g.sem = make(chan token, n),即創(chuàng)建一個長度為 n 的 channel,賦值給 sem。

    回憶一下我們在 Go 方法 defer 調用 done 中的表現(xiàn),是不是清晰了很多?我們來理一下:

    首先,Group 結構體就包含了 sem 變量,只作為通信,元素是空結構體,不包含實際意義:

    type Group struct {
    	cancel func()
    	wg sync.WaitGroup
    	sem chan token
    	errOnce sync.Once
    	err     error
    }

    如果你對整個 Group 的 Limit 沒有要求,which is fine,你就直接忽略這個 SetLimit,errgroup 的原有能力就可以支持你的訴求。

    但是,如果你希望保持 errgroup 的 goroutine 在一個上限之內,請在調用 Go 方法前,先 SetLimit,這樣 Group 的 sem 就賦值為一個長度為 n 的 channel。

    那么,當你調用 Go 方法時,注意下面的框架代碼,此時 g.sem 不為 nil,所以我們會塞一個 token 進去,作為占坑,語義上表示我申請一個 goroutine 用。

    func (g *Group) Go(f func() error) {
    	if g.sem != nil {
    		g.sem <- token{}
    	}
    	g.wg.Add(1)
    	go func() {
    		defer g.done()
                    ...
    	}()
    }

    當然,如果此時 goroutine 數(shù)量已經(jīng)達到上限,這里就會 block 住,直到別的 goroutine 干完活,sem 輸出了一個 token之后,才能繼續(xù)往里面塞。

    在每個 goroutine 執(zhí)行完畢后 defer 的 g.done 方法,則是完成了這一點:

    func (g *Group) done() {
    	if g.sem != nil {
    		<-g.sem
    	}
    	g.wg.Done()
    }

    這樣就把 sem 的用法串起來了。我們通過創(chuàng)建一個定長的channel來實現(xiàn)對于 goroutine 數(shù)量的管控,對于channel實際包含的元素并不關心,所以用一個空結構體省一省空間,這是非常優(yōu)秀的設計,大家平常也可以參考。

    TryGo

    TryGo 和 SetLimit 這套體系其實都是不久前歐長坤大佬提交到 errgroup 的能力。

    一如既往,所有帶 TryXXX的函數(shù),都不會阻塞。 其實辦的事情非常簡單,和 Go 方法一樣,傳進來一個 func() error來執(zhí)行。

    Go 方法的區(qū)別在于,如果判斷 limit 已經(jīng)不夠了,此時不再阻塞,而是直接 return false,代表執(zhí)行失敗。其他的部分完全一樣。

    // TryGo calls the given function in a new goroutine only if the number of
    // active goroutines in the group is currently below the configured limit.
    //
    // The return value reports whether the goroutine was started.
    func (g *Group) TryGo(f func() error) bool {
    	if g.sem != nil {
    		select {
    		case g.sem <- token{}:
    			// Note: this allows barging iff channels in general allow barging.
    		default:
    			return false
    		}
    	}
    	g.wg.Add(1)
    	go func() {
    		defer g.done()
    		if err := f(); err != nil {
    			g.errOnce.Do(func() {
    				g.err = err
    				if g.cancel != nil {
    					g.cancel()
    				}
    			})
    		}
    	}()
    	return true
    }

    使用方法

    這里我們先看一個最常見的用法,針對一組任務,每一個都單獨起 goroutine 執(zhí)行,最后獲取 Wait 返回的 err 作為整個 Group 的 err。

    package main
    import (
        "errors"
        "fmt"
        "time"
        "golang.org/x/sync/errgroup"
    )
    func main() {
        var g errgroup.Group
        // 啟動第一個子任務,它執(zhí)行成功
        g.Go(func() error {
            time.Sleep(5 * time.Second)
            fmt.Println("exec #1")
            return nil
        })
        // 啟動第二個子任務,它執(zhí)行失敗
        g.Go(func() error {
            time.Sleep(10 * time.Second)
            fmt.Println("exec #2")
            return errors.New("failed to exec #2")
        })
        // 啟動第三個子任務,它執(zhí)行成功
        g.Go(func() error {
            time.Sleep(15 * time.Second)
            fmt.Println("exec #3")
            return nil
        })
        // 等待三個任務都完成
        if err := g.Wait(); err == nil {
            fmt.Println("Successfully exec all")
        } else {
            fmt.Println("failed:", err)
        }
    }

    你會發(fā)現(xiàn),最后 err 打印出來就是第二個子任務的 err。

    當然,上面這個 case 是我們正好只有一個報錯,但是,如果實際有多個任務都掛了呢?

    從完備性來考慮,有沒有什么辦法能夠將多個任務的錯誤都返回呢?

    這一點其實 errgroup 庫并沒有提供非常好的支持,需要開發(fā)者自行做一些改造。因為 Group 中只有一個 err 變量,我們不可能基于 Group 來實現(xiàn)這一點。

    通常情況下,我們會創(chuàng)建一個 slice 來存儲 f() 執(zhí)行的 err。

    package main
    import (
        "errors"
        "fmt"
        "time"
        "golang.org/x/sync/errgroup"
    )
    func main() {
        var g errgroup.Group
        var result = make([]error, 3)
        // 啟動第一個子任務,它執(zhí)行成功
        g.Go(func() error {
            time.Sleep(5 * time.Second)
            fmt.Println("exec #1")
            result[0] = nil // 保存成功或者失敗的結果
            return nil
        })
        // 啟動第二個子任務,它執(zhí)行失敗
        g.Go(func() error {
            time.Sleep(10 * time.Second)
            fmt.Println("exec #2")
            result[1] = errors.New("failed to exec #2") // 保存成功或者失敗的結果
            return result[1]
        })
        // 啟動第三個子任務,它執(zhí)行成功
        g.Go(func() error {
            time.Sleep(15 * time.Second)
            fmt.Println("exec #3")
            result[2] = nil // 保存成功或者失敗的結果
            return nil
        })
        if err := g.Wait(); err == nil {
            fmt.Printf("Successfully exec all. result: %v\n", result)
        } else {
            fmt.Printf("failed: %v\n", result)
        }
    }

    可以看到,我們聲明了一個 result slice,長度為 3。這里不用擔心并發(fā)問題,因為每個 goroutine 讀寫的位置是確定唯一的。

    本質上可以理解為,我們把 f() 返回的 err 不僅僅給了 Group 一份,還自己存了一份,當出錯的時候,Wait 返回的錯誤我們不一定真的用,而是直接用自己錯的這一個 error slice。

    Go 官方文檔中的利用 errgroup 實現(xiàn) pipeline 的示例也很有參考意義:由一個子任務遍歷文件夾下的文件,然后把遍歷出的文件交給 20 個 goroutine,讓這些 goroutine 并行計算文件的 md5。

    這里貼出來簡化代碼學習一下.

    package main
    import (
    	"context"
    	"crypto/md5"
    	"fmt"
    	"io/ioutil"
    	"log"
    	"os"
    	"path/filepath"
    	"golang.org/x/sync/errgroup"
    )
    // Pipeline demonstrates the use of a Group to implement a multi-stage
    // pipeline: a version of the MD5All function with bounded parallelism from
    // https://blog.golang.org/pipelines.
    func main() {
    	m, err := MD5All(context.Background(), ".")
    	if err != nil {
    		log.Fatal(err)
    	}
    	for k, sum := range m {
    		fmt.Printf("%s:\t%x\n", k, sum)
    	}
    }
    type result struct {
    	path string
    	sum  [md5.Size]byte
    }
    // MD5All reads all the files in the file tree rooted at root and returns a map
    // from file path to the MD5 sum of the file's contents. If the directory walk
    // fails or any read operation fails, MD5All returns an error.
    func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
    	// ctx is canceled when g.Wait() returns. When this version of MD5All returns
    	// - even in case of error! - we know that all of the goroutines have finished
    	// and the memory they were using can be garbage-collected.
    	g, ctx := errgroup.WithContext(ctx)
    	paths := make(chan string)
    	g.Go(func() error {
    		defer close(paths)
    		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
    			if err != nil {
    				return err
    			}
    			if !info.Mode().IsRegular() {
    				return nil
    			}
    			select {
    			case paths <- path:
    			case <-ctx.Done():
    				return ctx.Err()
    			}
    			return nil
    		})
    	})
    	// Start a fixed number of goroutines to read and digest files.
    	c := make(chan result)
    	const numDigesters = 20
    	for i := 0; i < numDigesters; i++ {
    		g.Go(func() error {
    			for path := range paths {
    				data, err := ioutil.ReadFile(path)
    				if err != nil {
    					return err
    				}
    				select {
    				case c <- result{path, md5.Sum(data)}:
    				case <-ctx.Done():
    					return ctx.Err()
    				}
    			}
    			return nil
    		})
    	}
    	go func() {
    		g.Wait()
    		close(c)
    	}()
    	m := make(map[string][md5.Size]byte)
    	for r := range c {
    		m[r.path] = r.sum
    	}
    	// Check whether any of the goroutines failed. Since g is accumulating the
    	// errors, we don't need to send them (or check for them) in the individual
    	// results sent on the channel.
    	if err := g.Wait(); err != nil {
    		return nil, err
    	}
    	return m, nil
    }

    到此,關于“Golang errgroup設計及實現(xiàn)原理是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

    向AI問一下細節(jié)

    免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。

    AI