您好,登錄后才能下訂單哦!
并發(fā):同一時(shí)間段內(nèi)執(zhí)行多個(gè)任務(wù)。
并行:同一時(shí)刻執(zhí)行多個(gè)任務(wù)。
Go語言的并發(fā)通過goroutine實(shí)現(xiàn)。goroutine類似于線程,屬于用戶態(tài)的線程,我們可以根據(jù)需要?jiǎng)?chuàng)建成千上萬個(gè)goroutine并發(fā)工作。goroutine是由go語言的運(yùn)行調(diào)度完成的,而線程是由操作系統(tǒng)調(diào)度完成的。
Go語言還提供了channel在多個(gè)goroutine間進(jìn)行通信。goroutine和channel是go語言秉承的CSP(Communicating Sequential Process)并發(fā)模式的重要實(shí)現(xiàn)基礎(chǔ)。
在Java/c++中我們要實(shí)現(xiàn)并發(fā)編程的時(shí)候,我們通常要自己維護(hù)一個(gè)線程池,并且需要自己去包裝一個(gè)又一個(gè)的任務(wù)和然后自己去調(diào)度線程執(zhí)行任務(wù)并維護(hù)上線文的切換,這一切通常會(huì)耗費(fèi)程序員的大量心智。能不能有一種機(jī)制,程序員只需要定義很多個(gè)任務(wù),讓系統(tǒng)去幫忙我們把這些任務(wù)分配到CPU上實(shí)現(xiàn)并發(fā)執(zhí)行呢? Go語言中的goroutine就是這樣一種機(jī)制,Go語言之所以能被稱為現(xiàn)代化的編程語言,就是因?yàn)樗谡Z言層面已經(jīng)內(nèi)置了調(diào)度和上下文切換的機(jī)制。
Go程序中使用go關(guān)鍵字為一個(gè)函數(shù)創(chuàng)建一個(gè)goroutine。一個(gè)函數(shù)可以被創(chuàng)建多個(gè)goroutine,一個(gè)goroutine必定對應(yīng)一個(gè)函數(shù)。
啟動(dòng)goroutine的方式非常簡單,只需要在調(diào)用的函數(shù)(普通函數(shù)和匿名函數(shù))前面加一個(gè)go 關(guān)鍵字。
舉個(gè)例子:
package main
import (
"fmt"
)
func hello() {
fmt.Println("hello goroutine")
}
func main() {
hello()
fmt.Println("main goroutine done!")
}
這個(gè)示例中hello函數(shù)和下面的語句是串行的,執(zhí)行的結(jié)果是打印完hello goroutine 后打印main goroutine done!
接下來我們在調(diào)用hello 函數(shù)前面加上go關(guān)鍵字,也就是啟動(dòng)一個(gè)goroutine區(qū)執(zhí)行hello這個(gè)函數(shù)。
func main() {
go hello()
fmt.Println("main goroutine done!")
}
這一次的執(zhí)行結(jié)果只打印了 main goroutine done! , 并沒有打印 hello goroutine,為什么呢?
在程序啟動(dòng)時(shí),Go程序就會(huì)為main()函數(shù)創(chuàng)建一個(gè)默認(rèn)的goroutine。當(dāng)main函數(shù)返回的時(shí)候該goroutine就結(jié)束了,所有在main()函數(shù)中啟動(dòng)的goroutine 會(huì)一同結(jié)束,main函數(shù)所在的goroutine就像是權(quán)利的游戲中的夜王,其他的goroutine就像是異鬼,夜王一死它轉(zhuǎn)化的那些異鬼也就全部GG了。
所以我們要想辦法讓main函數(shù)等一等hello函數(shù),最簡單粗暴的方式就是sleep了。
func main(){
go hello()
fmt.Println("main goroutine done!")
time.Sleep(time.Second)
}
執(zhí)行上面的代碼你會(huì)發(fā)現(xiàn),這一次先打印main goroutine done! ,然后緊接著打印Hello Goroutine!
首先為什么會(huì)打印main goroutine done! 是因?yàn)槲覀冊趧?chuàng)建新的goroutine 的時(shí)候需要花費(fèi)一些時(shí)間,而此時(shí)main函數(shù)所在的goroutine是繼續(xù)執(zhí)行的。
在代碼中生硬的使用time.sleep肯定是不合適的,Go語言中可以使用sync.WaitGroup來實(shí)現(xiàn)并發(fā)任務(wù)的同步。sync.WaitGroup有一下幾個(gè)方法:
方法名 功能
(wg WaitGroup)Add(delta int) 計(jì)數(shù)器+delta
(wg WaitGroup)Done() 計(jì)算器-1
(wg *WaitGroup)Wait() 阻塞直到計(jì)數(shù)器變?yōu)?
sync.WaitGroup內(nèi)部維護(hù)著一個(gè)計(jì)數(shù)器,計(jì)數(shù)器的值可以增加和減少。例如當(dāng)我們啟動(dòng)了N個(gè)并發(fā)任務(wù)時(shí),就將計(jì)數(shù)器的值增加N, 每個(gè)任務(wù)完成時(shí)通過調(diào)用Done()方法將計(jì)數(shù)器減1,通過調(diào)用Wait()來等待并發(fā)任務(wù)執(zhí)行完,當(dāng)計(jì)數(shù)器值為0時(shí),表示所有并發(fā)任務(wù)已經(jīng)完成。
我們利用sync.WaitGroup將上面的代碼優(yōu)化一下:
var wg sync.WaitGroup
func hello() {
defer wg.Done()
fmt.Println("hello 1")
}
func main() {
wg.Add(1)
go hello()
fmt.Println("main 2")
wg.Wait()
}
需要注意的是sync.WaitGroup是一個(gè)結(jié)構(gòu)體,傳遞的時(shí)候需要傳遞指針。
在go語言中實(shí)現(xiàn)并發(fā)就是這么簡單,我們還可以啟動(dòng)多個(gè)goroutine。讓我們再來一個(gè)例子:
var wg sync.WaitGroup
func hello(i int) {
defer wg.Done()
fmt.Println("hello ,", i)
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1)
go hello(i)
}
wg.Wait()
}
多次執(zhí)行上面的代碼,會(huì)發(fā)現(xiàn)每次打印的數(shù)字的順序都不一樣,這是因?yàn)?0個(gè)goroutine是并發(fā)執(zhí)行的,而goroutine的調(diào)度室隨機(jī)的。
OS線程一般都有固定的棧內(nèi)存(通常為2MB),一個(gè)goroutine的棧在其生命周期開始時(shí)只有很小的棧(典型情況下2KB),goroutine的棧不是固定的,他可以按需增大和縮小,goroutine的棧大小限制可以達(dá)到1GB,雖然極少會(huì)用到這么大。所以在go語言中一次創(chuàng)建十萬左右的goroutine也是可以的。
os線程是由os內(nèi)核來調(diào)度的,goroutine則是由go運(yùn)行時(shí)自己的調(diào)度器來調(diào)度的,這個(gè)調(diào)度器使用一個(gè)稱為m:n調(diào)度的技術(shù)(復(fù)用/調(diào)度m個(gè)goroutine到n個(gè)OS線程上)。goroutine的調(diào)度不需要切換內(nèi)核語境,所以調(diào)度一個(gè)goroutine比調(diào)度一個(gè)線程成本低很多。
go運(yùn)行時(shí)的調(diào)度器使用GOMAXPROCS 參數(shù)來確定需要使用多少個(gè)OS線程來同時(shí)執(zhí)行GO代碼,默認(rèn)值是機(jī)器上的CPU的核心數(shù),例如在一個(gè)8核的機(jī)器上,調(diào)度器會(huì)把go代碼同時(shí)調(diào)度到8個(gè)OS線程上。go語言中可以通過runtime.GOMAXPROCS()函數(shù)設(shè)置當(dāng)前程序并發(fā)時(shí)占用的CPU邏輯核心數(shù)。
go1.5版本之前,默認(rèn)使用的是單核心執(zhí)行,Go1.5版本之后,默認(rèn)使用全部的CPU邏輯核心數(shù)。
GO語言中的操作系統(tǒng)線程和goroutine的關(guān)系:
1.一個(gè)操作系統(tǒng)線程對應(yīng)用戶態(tài)多個(gè)goroutine。
2.go程序可以同時(shí)多個(gè)操作系統(tǒng)線程。
3.goroutine 和OS線程是多對多的關(guān)系,即m:n
單純的將函數(shù)并發(fā)執(zhí)行是沒有意義的,函數(shù)與函數(shù)間需要交換數(shù)據(jù)才能體現(xiàn)并發(fā)執(zhí)行函數(shù)的意義。
雖然可以使用共享內(nèi)存進(jìn)行數(shù)據(jù)交換,但是共享內(nèi)存存在不同的goroutine中容易發(fā)生的競態(tài)問題。為了保證數(shù)據(jù)交換的正確性,必須使用互斥量對內(nèi)存進(jìn)行加鎖,這種做法必然導(dǎo)致性能問題。
go語言的并發(fā)模型是CSP,提倡通過通信共享內(nèi)存,而不是通過共享內(nèi)存實(shí)現(xiàn)通信。
如果說goroutine是Go程序并發(fā)的執(zhí)行體,channel就是它們之間的連接,channel是可以讓一個(gè)goroutine發(fā)送特定值到另一個(gè)goroutine的通信機(jī)制。
go語言中的通道channel是一種特殊的類型,通道像一個(gè)傳送帶或者隊(duì)列,總是遵循先入先出的規(guī)則,保證收發(fā)數(shù)據(jù)的順序,每個(gè)通道都是一個(gè)具體類型的導(dǎo)管,也就是聲明channel的時(shí)候需要為其指定元素類型。
聲明通道類型的格式如下:
var 變量 chan 元素類型
舉幾個(gè)例子:
var ch2 chan int
var ch3 chan bool
var ch4 chan []int
通道是引用類型,通道類型的空值是nil。
var ch chan int
fmt.Println(ch) //<nil>
聲明的通道后需要使用make函數(shù)初始化之后才能使用。創(chuàng)建channel的格式如下:
make(chan 元素類型, [緩沖大小])
緩沖大小是可選的。
舉幾個(gè)例子:
cha4 := make(chan int)
cha5 := make(chan bool)
cha6 := make(chan []int)
通道有發(fā)送(send)、接收(receive)和關(guān)閉(close)三種操作。發(fā)送和接收都使用<- 符號(hào)。
現(xiàn)在我們先使用以下語句定義一個(gè)通道:
ch := make(chan int)
將一個(gè)值發(fā)送到通道中。
ch <- 10 //把10發(fā)送到ch中
從一個(gè)通道中接收值。
x := <- ch //從ch中接收值并賦值給變量x
<- ch // 從ch中接收值,忽略結(jié)果
我們通過調(diào)用內(nèi)置的close函數(shù)來關(guān)閉通道。
close(ch)
關(guān)于關(guān)閉通道需要注意的事情是,只有在通知接收方goroutine所有的數(shù)據(jù)都發(fā)送完畢的時(shí)候才需要關(guān)閉通道。通道是可以被垃圾回收機(jī)制回收的,他和關(guān)閉文件不一樣,在結(jié)束操作之后關(guān)閉文件是必須做的,但是關(guān)閉通道不是必須的。
關(guān)閉后的通道有以下特點(diǎn):
無緩沖的通道又稱為阻塞的通道。我們來看一下下面的代碼:
func main(){
ch := make(chan int)
ch <- 10
fmt.Println("發(fā)送成功了")
}
上面的代碼能夠通過編譯,但是執(zhí)行的時(shí)候會(huì)出現(xiàn)以下錯(cuò)誤:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/Users/huoshuaibing/gowork/src/github.com/studygolang/day12/main.go:9 +0x54
exit status 2
為什么會(huì)出現(xiàn)deadlock錯(cuò)誤呢?
因?yàn)槲覀兪褂胏h := make(chan int)創(chuàng)建的是無緩沖的通道,無緩沖的通道只有在有人接收值的時(shí)候才能發(fā)送值。就像你住的小區(qū)沒有代收點(diǎn)和快遞柜,快遞員給你打電話必須把這個(gè)物品送到你的手中,簡單來說就是無緩沖的通道必須有接收才能發(fā)送。
上面的代碼會(huì)阻塞在 ch <- 10 這一行代碼形成死鎖,那么如何解決這個(gè)問題呢?
一種方法是啟用一個(gè)goroutine去接收值,例如:
func recv(c chan int) {
ret := <-c
fmt.Println("接收成功", ret)
}
func main() {
ch := make(chan int)
go recv(ch)
ch <- 10
fmt.Println("發(fā)送成功")
}
無緩沖的通道上的發(fā)送操作會(huì)阻塞,直到另一個(gè)goroutine在該通道上執(zhí)行接收操作,這時(shí)值才能發(fā)送成功,兩個(gè)goroutine將繼續(xù)執(zhí)行;相反,如果接收操作先執(zhí)行,接收方的goroutine將阻塞,直到另一個(gè)goroiutine在該通道上發(fā)送一個(gè)值。
使用無緩沖的通道進(jìn)行通信將導(dǎo)致發(fā)送和接收的goroutine同步化,因此,無緩沖的通道也被稱為同步通道。
解決上面的問題的方法還有一種就是使用有緩沖區(qū)的通道。我們可以使用make函數(shù)初始通道的時(shí)候?yàn)槠渲付ㄍǖ赖娜萘?,例如?/p>
func main() {
ch := make(chan int, 1)
ch <- 10
fmt.Println("發(fā)送成功")
}
只要通道的容量大于零,那么該通道就是有緩沖的通道,通道的容量表示通道中能存放元素的數(shù)量。就像你小區(qū)的快遞柜只有那么多個(gè)格子,格子滿了就裝不下了,就阻塞了,等到別人取走一個(gè)快遞員才能往里面放一個(gè)。
我們可以使用內(nèi)置的len函數(shù)獲取通道內(nèi)元素的數(shù)量,使用cap函數(shù)獲取通道的容量。
當(dāng)通過通道發(fā)送有限的數(shù)據(jù)時(shí),我們可以通過close函數(shù)關(guān)閉通道來告知從該通道接收值的goroutine停止等待。當(dāng)通道關(guān)閉時(shí),往該通道發(fā)送值會(huì)引發(fā)panic,從該通道里接收的值一直都是類型零值。那如何判斷一個(gè)通道是否被關(guān)閉了呢?
我們來看下面的例子:
func main() {
ch2 := make(chan int)
ch3 := make(chan int)
go func() {
for i := 0; i < 100; i++ {
ch2 <- i
}
close(ch2)
}()
go func() {
for {
i, ok := <-ch2
if !ok {
break
}
ch3 <- i * i
}
close(ch3)
}()
for i := range ch3 {
fmt.Println(i)
}
}
從上面的例子中我們看到兩種方式在接收值得時(shí)候判斷通道是否被關(guān)閉,我們通常使用的是for range 的方式。
有時(shí)候我們會(huì)將通道作為參數(shù)在多個(gè)任務(wù)函數(shù)間傳遞,很多時(shí)候我們在不同的任務(wù)函數(shù)中使用通道都會(huì)對其進(jìn)行限制,比如只能發(fā)送或接收。Go語言中提供了單向通道來處理這種情況。例如,我們把上面的例子改造如下:
func counter(out chan<- int) {
for i := 0; i < 100; i++ {
out <- i
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for i := range in {
out <- i * i
}
close(out)
}
func printer(in <-chan int) {
for i := range in {
fmt.Println(i)
}
}
func main() {
ch2 := make(chan int)
ch3 := make(chan int)
go counter(ch2)
go squarer(ch3, ch2)
printer(ch3)
}
其中,chan<- int 是一個(gè)只能發(fā)送的通道,可以發(fā)送但是不能接收; <-chan int 是一個(gè)只能接收的通道,可以接收但是不能發(fā)送。在函數(shù)傳參及任何賦值操作中將雙向通道轉(zhuǎn)換為單向通道是可以的,但是反過來不可以的。
在某些場景下我們需要同時(shí)從多個(gè)通道接收數(shù)據(jù)。通道在接收數(shù)據(jù)時(shí),如果沒有數(shù)據(jù)可以接收將會(huì)發(fā)生阻塞。你也許會(huì)寫出如下代碼使用遍歷的方式來實(shí)現(xiàn):
for {
data,ok := <- ch2
data,ok := <- ch3
...
}
這種方式雖然可以實(shí)現(xiàn)從多個(gè)通道接收值的需求,但是運(yùn)行性能會(huì)差很多。為了應(yīng)對這種場景,GO內(nèi)置了select關(guān)鍵字,可以同時(shí)響應(yīng)多個(gè)通道的操作。select的使用類似于switch語句,它有一些列case分支和一個(gè)默認(rèn)分支,每個(gè)case會(huì)對應(yīng)一個(gè)通道的通信過程。select會(huì)一直等待,直到某個(gè)case的通信操作完成,就會(huì)執(zhí)行case分支對應(yīng)的語句。具體格式如下:
select {
case <- ch2:
...
case <-ch3
...
default:
默認(rèn)操作
}
舉個(gè)小例子來演示一下select 的使用:
package main
import (
"fmt"
)
func main() {
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
select {
case x := <-ch:
fmt.Println(x)
case ch <- i:
}
}
}
使用select語句能提高代碼的可讀性。如果多個(gè)case同時(shí)滿足,那么select會(huì)隨機(jī)挑選一個(gè)。對于沒有case的select{}會(huì)一直等待。
有時(shí)候在Go代碼中可能會(huì)存在多個(gè)goroutine同時(shí)操作一個(gè)資源(臨界區(qū)),這種情況會(huì)發(fā)生競態(tài)問題。類比生活中的例子有十字路口被各個(gè)方向的汽車競爭;還有火車上的衛(wèi)生間被車廂里面的人競爭。舉個(gè)例子:
var x int64
var wg sync.WaitGroup
func add() {
for i := 0; i < 100; i++ {
x = x + 1
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
上面的代碼我們開啟了兩個(gè)goroutine 去累加變量x的值,這兩個(gè)goroutine在訪問和修改x變量的時(shí)候就會(huì)存在數(shù)據(jù)競爭,導(dǎo)致最后的結(jié)果與期待的不符。
互斥鎖是一種常用的控制共享資源訪問的方法,他們能夠保證同時(shí)只有一個(gè)goroutine可以訪問共享資源。Go語言中使用sync包的Mutex來實(shí)現(xiàn)互斥鎖。使用互斥鎖來修復(fù)上面代碼的問題:
var x int64
var wg sync.WaitGroup
var lock sync.Mutex
func add() {
for i := 0; i < 100; i++ {
lock.Lock()
x = x + 1
lock.Unlock()
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
使用互斥鎖能夠保證同一時(shí)間有且只有一個(gè)goroutine進(jìn)入臨界區(qū),其他的goroutine 則在等待鎖;當(dāng)互斥鎖釋放后,等待goroutine才可以獲得鎖進(jìn)入臨界區(qū),多個(gè)goroutine同時(shí)等待一個(gè)鎖時(shí),喚醒的策略是隨機(jī)的。
互斥鎖時(shí)完全互斥的,實(shí)際情況是很多情景下是讀多寫少的,當(dāng)我們并發(fā)的去讀一個(gè)資源不涉及資源修改的時(shí)候是沒有必要加鎖的,這種場景下使用讀寫鎖是更好的一種選擇。讀寫鎖在Go語言中使用sync包的RWMutex 類型。
讀寫鎖分兩種:讀鎖和寫鎖。當(dāng)一個(gè)goroutine獲取讀鎖之后,其他的goroutine如果是獲取讀鎖會(huì)繼續(xù)獲得鎖,如果獲取的是寫鎖就會(huì)等待,當(dāng)一個(gè)goroutine獲取寫鎖之后,其他的goroutine無論是獲取讀鎖還是寫鎖都會(huì)等待。
讀寫鎖示例:
var (
x int64
wg sync.WaitGroup
lock sync.Mutex
rwlock sync.RWMutex
)
func write() {
// lock.Lock() // 加互斥鎖
rwlock.Lock() // 加寫鎖
x = x + 1
time.Sleep(10 * time.Millisecond) // 假設(shè)讀操作耗時(shí)10毫秒
rwlock.Unlock() // 解寫鎖
// lock.Unlock() // 解互斥鎖
wg.Done()
}
func read() {
// lock.Lock() // 加互斥鎖
rwlock.RLock() // 加讀鎖
time.Sleep(time.Millisecond) // 假設(shè)讀操作耗時(shí)1毫秒
rwlock.RUnlock() // 解讀鎖
// lock.Unlock() // 解互斥鎖
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}
wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}
說在前面:這是一個(gè)進(jìn)階知識(shí)點(diǎn)。延遲一個(gè)開銷很大的初始化操作到真正用到它的時(shí)候再執(zhí)行是一個(gè)很好的實(shí)踐。因?yàn)轭A(yù)先初始化一個(gè)變量(比如在init函數(shù)中完成初始化)會(huì)增加程序啟動(dòng)延時(shí),而且有可能實(shí)際執(zhí)行過程中這個(gè)變量沒有用上,那么這個(gè)初始化操作就不是必須要的。我們來看一個(gè)例子:
sync.Once其實(shí)內(nèi)部包含一個(gè)互斥鎖和一個(gè)布爾值,互斥鎖保證布爾值和數(shù)據(jù)的安全,而布爾值用來記錄初始化是否完成。這樣設(shè)計(jì)就能保證初始化操作的時(shí)候是并發(fā)安全的并且初始化操作也不會(huì)執(zhí)行多次。
Go語言中內(nèi)置的map不是并發(fā)安全的。請看下面的示例:
var m = make(map[string]int)
func get(key string) int {
return m[key]
}
func set(key string, value int) {
m[key] = value
}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
set(key, n)
fmt.Println("k=:%v, v:=%v\n", key, get(key))
wg.Done()
}(i)
}
wg.Wait()
}
上面的代碼開啟少量幾個(gè)goroutine 的時(shí)候沒有問題,當(dāng)并發(fā)對了之后,執(zhí)行就會(huì)報(bào)fatal error: concurrent map writes錯(cuò)誤。
像這種場景下就需要為map加鎖來保證并發(fā)的安全性,go語言的sync包中提供了一個(gè)開箱即用的并發(fā)安全版map-sync.Map。同時(shí)sync.Map 內(nèi)置了諸如Store、Load、LoadOrStore、Delete、Range等操作方法。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。