溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Golang并發(fā)編程怎么應(yīng)用

發(fā)布時間:2023-05-08 15:21:15 來源:億速云 閱讀:78 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要講解了“Golang并發(fā)編程怎么應(yīng)用”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Golang并發(fā)編程怎么應(yīng)用”吧!

    1、通過通信共享

    并發(fā)編程是一個很大的主題,這里只提供一些特定于go的重點內(nèi)容。

    在許多環(huán)境中,實現(xiàn)對共享變量的正確訪問所需要的微妙之處使并發(fā)編程變得困難。Go鼓勵一種不同的方法,在這種方法中,共享值在通道中傳遞,實際上,從不由單獨的執(zhí)行線程主動共享。在任何給定時間,只有一個goroutine可以訪問該值。根據(jù)設(shè)計,數(shù)據(jù)競爭是不可能發(fā)生的。為了鼓勵這種思維方式,我們把它簡化為一句口號:

    Do not communicate by sharing memory; instead, share memory by communicating.

    不要通過共享內(nèi)存進行通信;相反,通過通信共享內(nèi)存。

    這種方法可能走得太遠(yuǎn)。例如,引用計數(shù)最好通過在整數(shù)變量周圍放置互斥來實現(xiàn)。但是作為一種高級方法,使用通道來控制訪問可以更容易地編寫清晰、正確的程序。

    考慮這個模型的一種方法是考慮一個典型的單線程程序運行在一個CPU上。它不需要同步原語?,F(xiàn)在運行另一個這樣的實例;它也不需要同步。現(xiàn)在讓這兩個程序通信;如果通信是同步器,則仍然不需要其他同步。例如,Unix管道就完美地符合這個模型。盡管Go的并發(fā)方法起源于Hoare的通信順序處理(communication Sequential Processes, CSP),但它也可以被視為Unix管道的類型安全的泛化。

    2、Goroutines

    它們之所以被稱為goroutine,是因為現(xiàn)有的術(shù)語——線程、協(xié)程、進程等等——傳達(dá)了不準(zhǔn)確的含義。goroutine有一個簡單的模型:它是一個與相同地址空間中的其他goroutine并發(fā)執(zhí)行的函數(shù)。它是輕量級的,比分配棧空間的成本高不了多少。而且棧開始時很小,所以它們很便宜,并通過根據(jù)需要分配(和釋放)堆存儲來增長。

    goroutine被多路復(fù)用到多個操作系統(tǒng)線程上,因此如果一個線程阻塞,比如在等待I/O時,其他線程繼續(xù)運行。它們的設(shè)計隱藏了線程創(chuàng)建和管理的許多復(fù)雜性。

    在函數(shù)或方法調(diào)用前加上go關(guān)鍵字以在新的 goroutine 中運行該調(diào)用。當(dāng)調(diào)用完成時,goroutine 將無聲地退出。(效果類似于Unix shell的&符號,用于在后臺運行命令。)

    go list.Sort() // run list.Sort concurrently; don't wait for it.

    function literal在goroutine調(diào)用中很方便。

    func Announce(message string, delay time.Duration) {
        go func() {
            time.Sleep(delay)
            fmt.Println(message)
        }()  // Note the parentheses - must call the function.
    }

    在Go中,函數(shù)字面量( function literals )是閉包: 實現(xiàn)確保函數(shù)引用的變量只要處于活動狀態(tài)就能存活。

    3、Channels

    與map一樣,通道也使用make進行分配,結(jié)果值作為對底層數(shù)據(jù)結(jié)構(gòu)的引用。如果提供了可選的整數(shù)參數(shù),它將設(shè)置通道的緩沖區(qū)大小。對于無緩沖通道或同步通道,默認(rèn)值為0。

    ci := make(chan int)            // unbuffered channel of integers
    cj := make(chan int, 0)         // unbuffered channel of integers
    cs := make(chan *os.File, 100)  // buffered channel of pointers to Files

    無緩沖通道將通信(值的交換)與同步結(jié)合起來,確保兩個計算(gorout例程)處于已知狀態(tài)。

    有很多使用通道的好習(xí)語。這是一個開始。在前一節(jié)中,我們在后臺啟動了排序。通道可以允許啟動goroutine等待排序完成。

    c := make(chan int)  // Allocate a channel.
    // Start the sort in a goroutine; when it completes, signal on the channel.
    go func() {
        list.Sort()
        c <- 1  // Send a signal; value does not matter.
    }()
    doSomethingForAWhile()
    <-c   // Wait for sort to finish; discard sent value.

    接收者總是阻塞,直到有數(shù)據(jù)接收。如果通道無緩沖,發(fā)送方將阻塞,直到接收方接收到該值。如果通道有緩沖區(qū),發(fā)送方只阻塞直到值被復(fù)制到緩沖區(qū);如果緩沖區(qū)已滿,這意味著需要等待到某個接收器接收到一個值。 (參考3.1)

    有緩沖通道可以像信號量(semaphore)一樣使用,例如限制吞吐量。在本例中,傳入的請求被傳遞給handle, handle將一個值發(fā)送到通道中,處理請求,然后從通道接收一個值,以便為下一個使用者準(zhǔn)備“信號量”。通道緩沖區(qū)的容量限制了要處理的同時調(diào)用的數(shù)量。

    var sem = make(chan int, MaxOutstanding)
    func handle(r *Request) {
        sem <- 1    // Wait for active queue to drain.
        process(r)  // May take a long time.
        <-sem       // Done; enable next request to run.
    }
    func Serve(queue chan *Request) {
        for {
            req := <-queue
            go handle(req)  // Don't wait for handle to finish.
        }
    }

    一旦MaxOutstanding處理程序正在執(zhí)行進程,試圖向已充滿的通道緩沖區(qū)發(fā)送的請求都將阻塞,直到現(xiàn)有的一個處理程序完成并從緩沖區(qū)接收。

    但是,這種設(shè)計有一個問題:Serve為每個傳入的請求創(chuàng)建一個新的goroutine ,盡管在任何時候, 只有MaxOutstanding多個可以運行。因此,如果請求來得太快,程序可能會消耗無限的資源。我們可以通過更改Serve來限制goroutines的創(chuàng)建來解決這個缺陷。這里有一個明顯的解決方案,但要注意它有一個bug,我們隨后會修復(fù):

    func Serve(queue chan *Request) {
        for req := range queue {
            sem <- 1
            go func() {
                process(req) // Buggy; see explanation below.
                <-sem
            }()
        }
    }

    bug 在于,在Go for循環(huán)中,循環(huán)變量在每次迭代中都被重用,因此req變量在所有g(shù)oroutine中共享。這不是我們想要的。我們需要確保每個goroutine的req是唯一的。這里有一種方法,在goroutine中將req的值作為參數(shù)傳遞給閉包:

    func Serve(queue chan *Request) {
        for req := range queue {
            sem <- 1
            go func(req *Request) {
                process(req)
                <-sem
            }(req)
        }
    }

    將此版本與前一個版本進行比較,查看閉包的聲明和運行方式的差異。另一個解決方案是創(chuàng)建一個同名的新變量,如下例所示:

    func Serve(queue chan *Request) {
        for req := range queue {
            req := req // Create new instance of req for the goroutine.
            sem <- 1
            go func() {
                process(req)
                <-sem
            }()
        }
    }

    這樣寫似乎有些奇怪

    req := req

    但在Go 中這樣做是合法的和慣用的。您將得到一個具有相同名稱的新變量,故意在局部掩蓋循環(huán)變量,但對每個goroutine都是惟一的。

    回到編寫服務(wù)器的一般問題,另一種很好地管理資源的方法是啟動固定數(shù)量的handle goroutines ,所有這些handle goroutines 都從請求通道讀取。goroutine的數(shù)量限制了process同時調(diào)用的數(shù)量。這個Serve函數(shù)還接受一個通道,它將被告知退出該通道;在啟動goroutines之后,它會阻止從該通道接收。

    func handle(queue chan *Request) {
        for r := range queue {
            process(r)
        }
    }
    func Serve(clientRequests chan *Request, quit chan bool) {
        // Start handlers
        for i := 0; i < MaxOutstanding; i++ {
            go handle(clientRequests)
        }
        <-quit  // Wait to be told to exit.
    }

    3.1 Channel都有哪些特性

    Go語言中的channel具有以下幾個特性:

    線程安全

    channel是線程安全的,多個協(xié)程可以同時讀寫一個channel,而不會發(fā)生數(shù)據(jù)競爭的問題。這是因為Go語言中的channel內(nèi)部實現(xiàn)了鎖機制,保證了多個協(xié)程之間對channel的訪問是安全的。

    阻塞式發(fā)送和接收

    當(dāng)一個協(xié)程向一個channel發(fā)送數(shù)據(jù)時,如果channel已經(jīng)滿了,發(fā)送操作會被阻塞,直到有其他協(xié)程從channel中取走了數(shù)據(jù)。同樣地,當(dāng)一個協(xié)程從一個channel中接收數(shù)據(jù)時,如果channel中沒有數(shù)據(jù)可供接收,接收操作會被阻塞,直到有其他協(xié)程向channel中發(fā)送了數(shù)據(jù)。這種阻塞式的機制可以保證協(xié)程之間的同步和通信。

    順序性

    通過channel發(fā)送的數(shù)據(jù)是按照發(fā)送的順序進行排列的。也就是說,如果協(xié)程A先向channel中發(fā)送了數(shù)據(jù)x,而協(xié)程B再向channel中發(fā)送了數(shù)據(jù)y,那么從channel中接收數(shù)據(jù)時,先接收到的一定是x,后接收到的一定是y。

    可以關(guān)閉

    通過關(guān)閉channel可以通知其他協(xié)程這個channel已經(jīng)不再使用了。關(guān)閉一個channel之后,其他協(xié)程仍然可以從中接收數(shù)據(jù),但是不能再向其中發(fā)送數(shù)據(jù)了。關(guān)閉channel的操作可以避免內(nèi)存泄漏等問題。

    緩沖區(qū)大小

    channel可以帶有一個緩沖區(qū),用于存儲一定量的數(shù)據(jù)。如果緩沖區(qū)已經(jīng)滿了,發(fā)送操作會被阻塞,直到有其他協(xié)程從channel中取走了數(shù)據(jù);如果緩沖區(qū)已經(jīng)空了,接收操作會被阻塞,直到有其他協(xié)程向channel中發(fā)送了數(shù)據(jù)。緩沖區(qū)的大小可以在創(chuàng)建channel時指定,例如:

    ch := make(chan int, 10)

    會panic的幾種情況

    1.向已經(jīng)關(guān)閉的channel發(fā)送數(shù)據(jù)

    2.關(guān)閉已經(jīng)關(guān)閉的channel

    3.關(guān)閉未初始化的nil channel

    會阻塞的情況:

    1.從未初始化 nil channel中讀數(shù)據(jù)

    2.向未初始化 nil channel中發(fā)數(shù)據(jù)

    3.在沒有讀取的groutine時,向無緩沖channel發(fā)數(shù)據(jù),

    有緩沖區(qū),但緩沖區(qū)已滿,發(fā)送數(shù)據(jù)時

    4.在沒有數(shù)據(jù)時,從無緩沖或者有緩沖channel讀數(shù)據(jù)

    返回零值:

    從已經(jīng)關(guān)閉的channe接收數(shù)據(jù)

    3.2 channel 的最佳實踐

    在使用channel時,應(yīng)該遵循以下幾個最佳實踐:

    避免死鎖

    使用channel時應(yīng)該注意避免死鎖的問題。如果一個協(xié)程向一個channel發(fā)送數(shù)據(jù),但是沒有其他協(xié)程從channel中取走數(shù)據(jù),那么發(fā)送操作就會一直被阻塞,從而導(dǎo)致死鎖。為了避免這種情況,可以使用select語句來同時監(jiān)聽多個channel,從而避免阻塞。

    避免泄漏

    在使用channel時應(yīng)該注意避免內(nèi)存泄漏的問題。如果一個channel沒有被關(guān)閉,而不再使用了,那么其中的數(shù)據(jù)就無法被釋放,從而導(dǎo)致內(nèi)存泄漏。為了避免這種情況,可以在協(xié)程結(jié)束時關(guān)閉channel。

    避免競爭

    在使用channel時應(yīng)該注意避免數(shù)據(jù)競爭的問題。如果多個協(xié)程同時讀寫一個channel,那么就可能會發(fā)生競爭條件,從而導(dǎo)致數(shù)據(jù)不一致的問題。為了避免這種情況,可以使用鎖機制或者使用單向channel來限制協(xié)程的訪問權(quán)限。

    避免過度使用

    在使用channel時應(yīng)該注意避免過度使用的問題。如果一個程序中使用了大量的channel,那么就可能會導(dǎo)致程序的性能下降。為了避免這種情況,可以使用其他的并發(fā)編程機制,例如鎖、條件變量等。

    4、Channels of channels

    Go最重要的屬性之一是通道是first-class值,可以像其他值一樣分配和傳遞。此屬性的常見用途是實現(xiàn)安全的并行多路解復(fù)用。

    在上一節(jié)的示例中,handle是請求的理想處理程序,但我們沒有定義它處理的類型。如果該類型包含要在其上回復(fù)的通道,則每個客戶機都可以為應(yīng)答提供自己的路徑。下面是Request類型的示意圖定義。

    type Request struct {
        args        []int
        f           func([]int) int
        resultChan  chan int
    }

    客戶端提供了一個函數(shù)及其參數(shù),以及請求對象內(nèi)用于接收answer的通道。

    func sum(a []int) (s int) {
        for _, v := range a {
            s += v
        }
        return
    }
    request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
    // Send request
    clientRequests <- request
    // Wait for response.
    fmt.Printf("answer: %d\n", <-request.resultChan)

    在服務(wù)器端,唯一需要更改的是處理程序函數(shù)。

    func handle(queue chan *Request) {
        for req := range queue {
            req.resultChan <- req.f(req.args)
        }
    }

    顯然,要實現(xiàn)它還有很多工作要做,但這段代碼是一個速率受限、并行、非阻塞RPC系統(tǒng)的框架,而且還沒有看到mutex 。

    5、并行(Parallelization)

    這些思想的另一個應(yīng)用是跨多個CPU核并行計算。如果計算可以被分解成可以獨立執(zhí)行的獨立部分,那么它就可以被并行化,并在每個部分完成時用一個通道發(fā)出信號。

    假設(shè)我們有一個昂貴的操作要對一個items的向量執(zhí)行,并且每個item的操作值是獨立的,就像在這個理想的例子中一樣。

    type Vector []float64
    // Apply the operation to v[i], v[i+1] ... up to v[n-1].
    func (v Vector) DoSome(i, n int, u Vector, c chan int) {
        for ; i < n; i++ {
            v[i] += u.Op(v[i])
        }
        c <- 1    // signal that this piece is done
    }

    我們在一個循環(huán)中獨立地啟動這些片段,每個CPU一個。它們可以按任何順序完成,但這沒有關(guān)系;我們只是在啟動所有的goroutine之后通過排泄通道來計算完成信號。

    const numCPU = 4 // number of CPU cores
    func (v Vector) DoAll(u Vector) {
        c := make(chan int, numCPU)  // Buffering optional but sensible.
        for i := 0; i < numCPU; i++ {
            go v.DoSome(i*len(v)/numCPU, (i+1)*len(v)/numCPU, u, c)
        }
        // Drain the channel.
        for i := 0; i < numCPU; i++ {
            <-c    // wait for one task to complete
        }
        // All done.
    }

    我們不需要為numCPU創(chuàng)建一個常量,而是可以詢問運行時哪個值是合適的。函數(shù)runtime.NumCPU返回機器中硬件CPU核數(shù),因此我們可以這樣寫

    還有一個函數(shù) runtime.GOMAXPROCS,它報告(或設(shè)置)用戶指定的Go程序可以同時運行的核數(shù)。默認(rèn)值為runtime.NumCPU,但可以通過設(shè)置類似命名的shell環(huán)境變量或調(diào)用帶有正數(shù)的函數(shù)來覆蓋。用0調(diào)用它只是查詢值。因此,如果我們想要滿足用戶的資源請求,我們應(yīng)該寫

    var numCPU = runtime.GOMAXPROCS(0)

    請務(wù)必不要混淆并發(fā)性(concurrency,將程序構(gòu)造為獨立執(zhí)行的組件)和并行性(parallelism, 在多個cpu上并行執(zhí)行計算以提高效率)這兩個概念。盡管Go的并發(fā)特性可以使一些問題很容易構(gòu)建為并行計算,但Go是一種并發(fā)語言,而不是并行語言,并且并不是所有的并行化問題都適合Go的模型。關(guān)于區(qū)別的討論,請參閱本文章中引用的談話。

    6、漏桶緩沖區(qū)(A leaky buffer)

    并發(fā)編程的工具甚至可以使非并發(fā)的想法更容易表達(dá)。下面是一個從RPC包中抽象出來的示例??蛻舳薵oroutine循環(huán)從某個源(可能是網(wǎng)絡(luò))接收數(shù)據(jù)。為了避免分配和釋放緩沖區(qū),它保留了一個空閑列表,并使用緩沖通道來表示它。如果通道為空,則分配一個新的緩沖區(qū)。一旦消息緩沖區(qū)準(zhǔn)備好了,它就被發(fā)送到serverChan上的服務(wù)器。

    var freeList = make(chan *Buffer, 100)
    var serverChan = make(chan *Buffer)
    func client() {
        for {
            var b *Buffer
            // Grab a buffer if available; allocate if not.
            select {
            case b = <-freeList:
                // Got one; nothing more to do.
            default:
                // None free, so allocate a new one.
                b = new(Buffer)
            }
            load(b)              // Read next message from the net.
            serverChan <- b      // Send to server.
        }
    }

    服務(wù)器循環(huán)從客戶端接收每條消息,處理它,并將緩沖區(qū)返回到空閑列表。

    func server() {
        for {
            b := <-serverChan    // Wait for work.
            process(b)
            // Reuse buffer if there's room.
            select {
            case freeList <- b:
                // Buffer on free list; nothing more to do.
            default:
                // Free list full, just carry on.
            }
        }
    }

    客戶端嘗試從freeList中檢索緩沖區(qū);如果沒有可用的,則分配一個新的。服務(wù)器發(fā)送給freeList的消息會將b放回空閑列表中,除非空閑列表已滿,在這種情況下,緩沖區(qū)將被丟棄在地板上,由垃圾收集器回收。(當(dāng)沒有其他case可用時,select語句中的default 子句將執(zhí)行,這意味著select語句永遠(yuǎn)不會阻塞。)此實現(xiàn)僅用幾行就構(gòu)建了一個漏桶列表,依賴于緩沖通道和垃圾收集器進行記賬。

    感謝各位的閱讀,以上就是“Golang并發(fā)編程怎么應(yīng)用”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對Golang并發(fā)編程怎么應(yīng)用這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

    向AI問一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI