您好,登錄后才能下訂單哦!
這篇文章為大家分享有關(guān)Golang中g(shù)oroutine和channel的使用方法。文章涵蓋goroutine和channel以及select的使用方法,希望大家通過這篇文章能有所收獲。
并發(fā):同一時間段執(zhí)行多個任務(wù)(使用微信和多個朋友聊天)
并行:同一時刻執(zhí)行多個任務(wù)(windows中360在殺毒,同時你也在寫代碼)
Go語言的并發(fā)通過goroutine實現(xiàn)。goroutine類似于線程,屬于用戶態(tài)的線程,我們可以根據(jù)需要創(chuàng)建成千上萬個goroutine并發(fā)工作。
goroutine是由Go語言的運行時(runtime)調(diào)度完成,而線程是由操作系統(tǒng)調(diào)度完成。
Go語言還提供channel在多個goroutine間進行通信。goroutine和channel是Go語言秉承的CSP(Communication Sequential Process)并發(fā)模式的重要實現(xiàn)基礎(chǔ)。
在java/Python中,我們實現(xiàn)并發(fā)編程的時候,通常需要自己維護一個線程池,并且需要自己去包裝一個又一個的任務(wù),同時需要自己去調(diào)度線程執(zhí)行任務(wù)并維護上下文切換,這一切需要耗費很多。
Go語言中的goroutine,類似于線程,但goroutine是由Go的運行時(runtime)調(diào)度和管理的。Go程序能夠只能的將goroutine中的任務(wù)合理的分配到每個CPU。Go語言被稱為現(xiàn)代化語言的原因,就是因為Go在語言層面就已經(jīng)內(nèi)置了調(diào)度和上下文切換的機制。
在Go語言編程中,不需要自己寫進程、線程、協(xié)程,你的技能只有一個,就是goroutine。
Go語言中使用goroutine非常簡單,只需要在調(diào)用函數(shù)前面加上"go"關(guān)鍵字,就可以為一個函數(shù)創(chuàng)建一個goroutine。
一個goroutine必定對應(yīng)一個函數(shù),可以創(chuàng)建多個goroutine去執(zhí)行相同的函數(shù)。
//
package main
import (
"fmt"
)
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
hello()
fmt.Println("main goroutine done!")
}
結(jié)果:
Hello Goroutine!
main goroutine done!
Process finished with exit code 0
Mac系統(tǒng)上實驗
package main
import (
"fmt"
)
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
go hello()
fmt.Println("main goroutine done!")
}
結(jié)果1:
main goroutine done!
Process finished with exit code 0
結(jié)果2:
main goroutine done!
Hello Goroutine!
Process finished with exit code 0
結(jié)果3:
Hello Goroutine!
main goroutine done!
Process finished with exit code 0
會發(fā)現(xiàn),出現(xiàn)了只打印了main goroutine done的現(xiàn)象,是因為main函數(shù)也是一個goroutine,main函數(shù)執(zhí)行完了,整個程序就結(jié)束了。
Go語言中實現(xiàn)并發(fā)就是這么簡單,可以啟動多個goroutine。
這里使用sync.WaitGroup來實現(xiàn)goroutine的同步。
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func hello(i interface{}) {
defer wg.Done() //goroutine結(jié)束就登記-1
fmt.Println("Hello Goroutine! i:",i)
}
func main() {
for i:=0;i<10;i++{
wg.Add(1) //啟動一個goroutine就登記+1
go hello(i)
}
wg.Wait()//等待所有等級的goroutine都結(jié)束
}
結(jié)果:
Hello Goroutine! i: 9
Hello Goroutine! i: 7
Hello Goroutine! i: 2
Hello Goroutine! i: 0
Hello Goroutine! i: 3
Hello Goroutine! i: 5
Hello Goroutine! i: 1
Hello Goroutine! i: 6
Hello Goroutine! i: 4
Hello Goroutine! i: 8
Process finished with exit code 0
多次執(zhí)行上面的代碼,會發(fā)現(xiàn)每次打印的數(shù)字順序都不一樣。這是因為10個goroutine是并發(fā)執(zhí)行的,而goroutine的調(diào)度室隨機的。
OS線程(操作系統(tǒng)線程)一般都有固定的棧內(nèi)存(通常為2MB),一個goroutine的棧在其生命周期開始時只有很小的棧(典型情況下2KB),goroutine的棧不是固定的,他可以按需增大和縮小,goroutine的棧大小限制可以達到1GB,雖然很少會用到這么大。
GPM是Go語言運行時(runtime)層面的實現(xiàn),是go語言自己實現(xiàn)的一套調(diào)度系統(tǒng)。區(qū)別于操作系統(tǒng)調(diào)度OS線程。
G很好理解,就是個goroutine的,里面除了存放本goroutine信息外 還有與所在P的綁定等信息。
P管理著一組goroutine隊列,P里面會存儲當前goroutine運行的上下文環(huán)境(函數(shù)指針,堆棧地址及地址邊界),P會對自己管理的goroutine隊列做一些調(diào)度(比如把占用CPU時間較長的goroutine暫停、運行后續(xù)的goroutine等等)當自己的隊列消費完了就去全局隊列里取,如果全局隊列里也消費完了會去其他P的隊列里搶任務(wù)。
M(machine)是Go運行時(runtime)對操作系統(tǒng)內(nèi)核線程的虛擬, M與內(nèi)核線程一般是一一映射的關(guān)系, 一個groutine最終是要放到M上執(zhí)行的;
P與M一般也是一一對應(yīng)的。他們關(guān)系是: P管理著一組G掛載在M上運行。當一個G長久阻塞在一個M上時,runtime會新建一個M,阻塞G所在的P會把其他的G 掛載在新建的M上。當舊的G阻塞完成或者認為其已經(jīng)死掉時 回收舊的M。
P的個數(shù)是通過runtime.GOMAXPROCS設(shè)定(最大256),Go1.5版本之后默認為物理線程數(shù)。 在并發(fā)量大的時候會增加一些P和M,但不會太多,切換太頻繁的話得不償失。
單從線程調(diào)度講,Go語言相比起其他語言的優(yōu)勢在于OS線程是由OS內(nèi)核來調(diào)度的,goroutine則是由Go運行時(runtime)自己的調(diào)度器調(diào)度的,這個調(diào)度器使用一個稱為m:n調(diào)度的技術(shù)(復(fù)用/調(diào)度m個goroutine到n個OS線程)。
其一大特點是goroutine的調(diào)度是在用戶態(tài)下完成的, 不涉及內(nèi)核態(tài)與用戶態(tài)之間的頻繁切換,包括內(nèi)存的分配與釋放,都是在用戶態(tài)維護著一塊大的內(nèi)存池, 不直接調(diào)用系統(tǒng)的malloc函數(shù)(除非內(nèi)存池需要改變),成本比調(diào)度OS線程低很多。
另一方面充分利用了多核的硬件資源,近似的把若干goroutine均分在物理線程上, 再加上本身goroutine的超輕量,以上種種保證了go調(diào)度方面的性能。
Go運行時的調(diào)度器使用GOMAXPROCS參數(shù)來確定需要使用多少個OS線程來同時執(zhí)行Go代碼。默認值是機器上的CPU核心數(shù)。
例如在一個8個CPU的機器上,調(diào)度器會把Go代碼同時調(diào)度到8個OS線程上(GOMAXPROCS是m:n中的n)
Go語言通過runtime.GOMAXPROCS()函數(shù)設(shè)置當前程序并發(fā)時占用的CPU邏輯核心數(shù)。
我們可以通過將任務(wù)分配到不同的CPU邏輯核心上實現(xiàn)并行的效果,這里舉個例子:
設(shè)置GOMAXPROCS=1,goroutine啟動兩個任務(wù),此時是一個任務(wù)執(zhí)行完了才能執(zhí)行另一個任務(wù)
package main
import (
"fmt"
"runtime"
"time"
)
func a() {
for i := 1; i < 10; i++ {
fmt.Println("A:", i)
}
}
func b() {
for i := 1; i < 10; i++ {
fmt.Println("B:", i)
}
}
func main() {
runtime.GOMAXPROCS(1)
go a()
go b()
time.Sleep(time.Second)
}
結(jié)果:
A: 1
A: 2
A: 3
A: 4
A: 5
A: 6
A: 7
A: 8
A: 9
B: 1
B: 2
B: 3
B: 4
B: 5
B: 6
B: 7
B: 8
B: 9
Process finished with exit code 0
設(shè)置GOMAXPROCS=2,goroutine啟動兩個任務(wù),兩個任務(wù)同時執(zhí)行,出現(xiàn)兩個任務(wù)交互打印現(xiàn)象,要多試幾次,需要筆記本是多個CPU哦!我在mac上測試成功的。
package main
import (
"fmt"
"runtime"
"time"
)
func a() {
for i := 1; i < 10; i++ {
fmt.Println("A:", i)
}
}
func b() {
for i := 1; i < 10; i++ {
fmt.Println("B:", i)
}
}
func main() {
runtime.GOMAXPROCS(2)
go a()
go b()
time.Sleep(time.Second)
}
結(jié)果:
A: 1
B: 1
B: 2
B: 3
B: 4
B: 5
B: 6
B: 7
B: 8
B: 9
A: 2
A: 3
A: 4
A: 5
A: 6
A: 7
A: 8
A: 9
Process finished with exit code 0
//Go語言中的操作系統(tǒng)線程和goroutine的關(guān)系:
1.一個操作系統(tǒng)線程對應(yīng)用戶態(tài)多個goroutine。
2.go程序可以同時使用多個操作系統(tǒng)線程。
3.goroutine和OS線程是多對多的關(guān)系,即m:n。
單純地將函數(shù)并發(fā)執(zhí)行是沒有意義的。函數(shù)與函數(shù)間需要交換數(shù)據(jù)才能體現(xiàn)并發(fā)執(zhí)行函數(shù)的意義。
雖然可以使用共享內(nèi)存進行數(shù)據(jù)交換,但是共享內(nèi)存在不同的goroutine中容易發(fā)生競態(tài)問題。為了保證數(shù)據(jù)交換的正確性,必須使用互斥量對內(nèi)存進行加鎖,這種做法勢必造成性能問題。
Go語言的并發(fā)模型是CSP(Communicating Sequential Processes),提倡通過通信共享內(nèi)存而不是通過共享內(nèi)存而實現(xiàn)通信。
如果說goroutine是Go程序并發(fā)的執(zhí)行體,channel就是它們之間的連接。channel是可以讓一個goroutine發(fā)送特定值到另一個goroutine的通信機制。
Go 語言中的通道(channel)是一種特殊的類型。通道像一個傳送帶或者隊列,總是遵循先入先出(First In First Out)的規(guī)則,保證收發(fā)數(shù)據(jù)的順序。每一個通道都是一個具體類型的導管,也就是聲明channel的時候需要為其指定元素類型。
channel是一種類型,一種引用類型。生命通道類型的格式如下:
var 變量 chan 元素類型
package main
import (
"fmt"
)
func main() {
var ch2 chan int // 聲明一個傳遞整型的通道
var ch3 chan bool // 聲明一個傳遞布爾型的通道
var ch4 chan []int // 聲明一個傳遞int切片的通道
fmt.Printf("v:%v type:%T\n",ch2,ch2)
fmt.Printf("v:%v type:%T\n",ch3,ch3)
fmt.Printf("v:%v type:%T\n",ch4,ch4)
}
結(jié)果:
v:<nil> type:chan int
v:<nil> type:chan bool
v:<nil> type:chan []int
Process finished with exit code 0
通道是引用類型,通道類型的空值是nil。
var ch chan int
fmt.Println(ch) // <nil>
聲明的通道后需要使用make函數(shù)初始化后才能使用。
創(chuàng)建channel的格式如下:
make(chan 元素類型, [緩沖大小])
channel的緩沖大小是可選的。
package main
import (
"fmt"
)
func main() {
ch5 := make(chan int)
ch6 := make(chan bool)
ch7 := make(chan []int)
fmt.Printf("v:%v type:%T\n",ch5,ch5)
fmt.Printf("v:%v type:%T\n",ch6,ch6)
fmt.Printf("v:%v type:%T\n",ch7,ch7)
}
結(jié)果:
v:0xc000012060 type:chan int
v:0xc0000120c0 type:chan bool
v:0xc000012120 type:chan []int
Process finished with exit code 0
通道有發(fā)送(send)、接收(receive)和關(guān)閉(close)三種操作。
發(fā)送和接收都使用<-
定義通道:ch := make(chan int)
發(fā)送:將一個值發(fā)送到通道中。
ch <- 10 //把10發(fā)送到通道中
接收:從一個通道中接收值。
a := <- ch //從ch中接收值,并賦值給a
<- ch //從ch中接收值,忽略結(jié)果
關(guān)閉:關(guān)閉通道。
close(ch)
注意:
1.只有在通知接收方goroutine所有的數(shù)據(jù)都發(fā)送完畢的時候,才需要關(guān)閉通道。
2.通道是可以被垃圾回收機制回收的,與關(guān)閉文件不一樣,文件操作結(jié)束后文件是必須關(guān)閉的,但通道不是必須關(guān)閉的。
關(guān)閉后的通道有以下特點:
1.對一個關(guān)閉的通道再發(fā)送值會導致panic。
2.對一個關(guān)閉的通道進行接收值,會一直獲取值直到通道為空。
3.對一個關(guān)閉的并且沒有值得通道執(zhí)行接收操作,會得到對應(yīng)類型的零值。
4.關(guān)閉一個已經(jīng)關(guān)閉的通道會導致panic。
無緩沖通道稱為阻塞通道。無緩沖通道必須在發(fā)送數(shù)據(jù)的同時有人接收值,否則會阻塞在那里,直到報錯。
//無緩沖通道,只發(fā)送值不接收值的時候會出現(xiàn)deadlock錯誤。
package main
import "fmt"
func main() {
ch := make(chan int)
ch <- 10
fmt.Printf("發(fā)送成功!")
}
結(jié)果:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/Users/tongchao/Desktop/gopath/src/test/test.go:7 +0x54
Process finished with exit code 2
因為我們使用ch := make(chan int)創(chuàng)建的是無緩沖通道,無緩沖通道只有在有人接收值的時候才能發(fā)送值。
上買呢代碼會阻塞在ch <- 10,這一行代碼會形成死鎖。
解決方法:使用goroutine去接收值
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func recv(ch chan int) {
defer wg.Done()
i := <- ch
fmt.Println("接收的值是:",i)
}
func main() {
ch := make(chan int)
wg.Add(1)
go recv(ch)
ch <- 10
wg.Wait()
fmt.Printf("發(fā)送成功!\n")
}
結(jié)果:
接收的值是: 10
發(fā)送成功!
Process finished with exit code 0
無緩沖通道上的發(fā)送操作會阻塞,直到另一個goroutine在該通道上執(zhí)行接收操作,這時才能發(fā)送成功,兩個goroutine將繼續(xù)執(zhí)行。
如果接收操作限制性,接收方的goroutine將會阻塞,直到另一個goroutine在該通道上發(fā)送一個值。
//使用無緩沖通道進行通信,將會導致發(fā)送和接收的goroutine同步化。因此,無緩沖通道也被稱為同步通道。
解決上面問題的方法還有一種就是使用有緩沖的通道。我們可以在使用make函數(shù)初始化通道的時候為其制定通道的容量,只要通道的容量大于零,就是有緩沖的通道,通道的容量表示通道中能存放的元素的數(shù)量。
可以使用len()獲取通道內(nèi)元素的數(shù)量,使用cap函數(shù)獲取通道的容量。
package main
import (
"fmt"
)
func main() {
ch := make(chan int,1) //創(chuàng)建一個容量為1的有緩沖區(qū)通道
ch <- 10
fmt.Printf("發(fā)送成功!\n")
fmt.Println("len(ch):",len(ch))
fmt.Println("cap(ch)",cap(ch))
}
結(jié)果:
發(fā)送成功!
len(ch): 1
cap(ch) 1
Process finished with exit code 0
當向通道中發(fā)送完數(shù)據(jù)時,我們可以通過close函數(shù)關(guān)閉通道。
當通道被關(guān)閉時,再往該通道發(fā)送值會引發(fā)panic,從該通道里接收值一直都是類型0值。那么如何判斷一個通道是否被關(guān)閉了呢?
方法一:
i, ok := <-ch2 // 通道關(guān)閉后再取值ok=false
方法二:
for range遍歷通道,通道被關(guān)閉時就會退出for range。
package main
import "fmt"
func main() {
ch2 := make(chan int)
ch3 := make(chan int)
//開啟goroutine將0-100的數(shù)發(fā)送到ch2中
go func() {
for i:=0;i<101;i++{
ch2 <- i
}
close(ch2)
}()
//開啟goroutine從ch2中接收值,并將該值的平方發(fā)送到ch3中
go func() {
for{
i,ok := <- ch2 //通道關(guān)閉后再取值ok=false
if !ok{
break
}
ch3 <- i*i
}
close(ch3)
}()
//在主goroutine中從ch3中接收值打印
for i:= range ch3{//通道關(guān)閉后退出for range循環(huán)
fmt.Println(i)
}
}
結(jié)果:
0
1
4
9
16
25
...
9604
9801
10000
Process finished with exit code 0
有的時候我們會將通道作為參數(shù)在多個任務(wù)函數(shù)間傳遞,很多時候我們在不同的任務(wù)函數(shù)中使用通道都會對其進行限制,比如限制通道在函數(shù)中只能發(fā)送或只能接收。
chan <- int是一個只寫單向通道(只能對其寫入int類型值),可以對其進行發(fā)送操作但不能執(zhí)行接收操作;
<- chan int是一個只讀單向通道(只能從通道讀取int類型值),可以對其執(zhí)行接收操作但不能執(zhí)行發(fā)送操作。
在函數(shù)傳參及任何賦值操作中,可以將雙向通道轉(zhuǎn)換為單向通道,但反過來是不可以的。
package main
import "fmt"
func counter(out chan <- int) {
for i:=0;i<101;i++{
out <- i
}
close(out)
}
func squarer(out chan <- int,in <- chan int) {
for i:= range in{
out <- i*i
}
close(out)
}
func printer(in <- chan int) {
for i:= range in{
fmt.Println(i)
}
}
func main() {
ch2 := make(chan int)
ch3 := make(chan int)
go counter(ch2)
go squarer(ch3, ch2)
printer(ch3)
}
結(jié)果:
0
1
4
9
16
25
...
9604
9801
10000
Process finished with exit code 0
在工作中,我們通常會使用可以指定啟動的goroutine數(shù)量-worker pool 模式,控制go routine的數(shù)量,防止go routine泄露和暴漲。
一個簡單的work pool 示例代碼如下:
package main
import (
"fmt"
"time"
)
func worker(id int,jobs <- chan int,results chan <- int ) {
for j:= range jobs{
fmt.Printf("worker:%d start job:%d\n", id, j)
time.Sleep(time.Second)
fmt.Printf("worker:%d end job:%d\n", id, j)
results <- j * 2
}
}
func main() {
jobs := make(chan int,100)
results := make(chan int,100)
//開啟3個goroutine
for w:=1;w<=3;w++{
go worker(w,jobs,results)
}
//5個任務(wù)
for j:=1;j<=5;j++{
jobs <- j
}
close(jobs)
// 輸出結(jié)果
for a := 1; a <= 5; a++ {
<-results
}
}
結(jié)果:
worker:1 start job:2
worker:3 start job:1
worker:2 start job:3
worker:1 end job:2
worker:3 end job:1
worker:3 start job:4
worker:2 end job:3
worker:1 start job:5
worker:1 end job:5
worker:3 end job:4
Process finished with exit code 0
在某些場景下,我們需要同時從多個通道接收數(shù)據(jù)。通道在接收數(shù)據(jù)時,如果沒有數(shù)據(jù)可以接收將會發(fā)生阻塞。
//可以使用遍歷方式,實現(xiàn)同時從多個通道中獲取數(shù)據(jù)
package main
import (
"fmt"
)
var ch2 chan int
var ch3 chan int
func main() {
ch2 = make(chan int, 100)
ch3 = make(chan int, 100)
go func() {
ch2 <- 10
close(ch2)
}()
go func() {
ch3 <- 11
close(ch3)
}()
for{
//從ch2接收值
c1,ok := <- ch2
if !ok{
fmt.Println("ch2數(shù)據(jù)取完了")
}
if c1!=0{
fmt.Println(c1)
}
//從ch3接收值
c2,ok := <- ch3
if !ok{
fmt.Println("ch3數(shù)據(jù)取完了")
break
}
fmt.Println(c2)
}
fmt.Println("操作完成!")
}
結(jié)果:
10
11
ch2數(shù)據(jù)取完了
ch3數(shù)據(jù)取完了
操作完成!
Process finished with exit code 0
//可以使用goroutine實現(xiàn)同時從多個通道中接收數(shù)據(jù)
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
var ch2 chan int
var ch3 chan int
func getFromCh2() {
defer wg.Done()
c1 := <- ch2
fmt.Println(c1)
}
func getFromCh3() {
defer wg.Done()
c2 := <- ch3
fmt.Println(c2)
}
func main() {
ch2 = make(chan int, 100)
ch3 = make(chan int, 100)
wg.Add(2)
go getFromCh2()
go getFromCh3()
go func() {
ch2 <- 10
}()
go func() {
ch3 <- 11
}()
wg.Wait()
fmt.Println("操作完成!")
}
結(jié)果:
11
10
操作完成!
Process finished with exit code 0
使用select關(guān)鍵字實現(xiàn)多個通道接收值的需求。
select的使用類似于switch語句,他有一系列case分支和一個默認分支。每個case會對應(yīng)一個通道的通信(接收或發(fā)送)過程。select會一直等待,直到某個case的通信操作完成時,就會執(zhí)行case對應(yīng)的語句。
格式如下:
select{
case <-ch2:
...
case data := <-ch3:
...
case ch4<-data:
...
default:
默認操作
}
select語句能提高代碼的可讀性。
1.可處理一個或多個channel的發(fā)送/接收操作。
2.如果多個case同時滿足,select會隨機選擇一個。
3.對于沒有case的select,會一直等待,可用于阻塞main函數(shù)。
package main
import "fmt"
func main() {
ch2 := make(chan int,1)
ch3 := make(chan int,1)
for i:=0;i<10;i++{
select {
case x1 := <- ch2:
fmt.Printf("循環(huán)第%d次,ch2取出%d:\n",i,x1)
case ch2 <- i:
fmt.Printf("循環(huán)第%d次,ch2存入:%d\n",i,i)
case x2 := <- ch3:
fmt.Printf("循環(huán)第%d次,ch3取出%d:\n",i,x2)
case ch3 <- i:
fmt.Printf("循環(huán)第%d次,ch3存入:%d\n",i,i)
}
}
}
結(jié)果:
循環(huán)第0次,ch2存入:0
循環(huán)第1次,ch2取出0:
循環(huán)第2次,ch2存入:2
循環(huán)第3次,ch2取出2:
循環(huán)第4次,ch2存入:4
循環(huán)第5次,ch2取出4:
循環(huán)第6次,ch2存入:6
循環(huán)第7次,ch3存入:7
循環(huán)第8次,ch3取出7:
循環(huán)第9次,ch2取出6:
Process finished with exit code 0
//上面的結(jié)果完美的體現(xiàn)出了 多個case同時滿足時,select會隨機選擇一個執(zhí)行。
有時候在Go代碼中會存在多個goroutine同時操作一個資源(臨界區(qū)),這種情況會發(fā)生竟態(tài)問題(數(shù)據(jù)竟態(tài))。
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
var x int64
func add() {
for i:=0;i<5000;i++{
x=x+1
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
結(jié)果1:
7281
Process finished with exit code 0
結(jié)果2:
10000
Process finished with exit code 0
//上面的代碼中,我們開啟了兩個goroutine去累加變量x的值,這兩個goroutine在訪問和修改x變量的時候就會存在數(shù)據(jù)競爭,導致最后結(jié)果與期待的不符。
互斥鎖是一種常用的控制共享資源訪問的方法,它能夠保證同時只有一個goroutine可以訪問共享資源。
Go語言中使用sync包的Mutex類型來實現(xiàn)互斥鎖。使用互斥鎖來修復(fù)上面代碼的問題:
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
var lock sync.Mutex
var x int64
func add() {
for i:=0;i<5000;i++{
lock.Lock()//加鎖
x=x+1
lock.Unlock()//解鎖
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
結(jié)果:
10000
Process finished with exit code 0
使用互斥鎖能夠保證同一時間有且只有一個goroutine進入臨界區(qū),其他的goroutine則在等待鎖;
當互斥鎖釋放后,等待的goroutine才能獲取鎖進入臨界區(qū),多個goroutine同時等待一個鎖時,喚醒策略是隨機的。
互斥鎖是完全互斥的,但是有很多實際的場景下是讀多寫少的,當我們并發(fā)的去讀取一個資源不涉及資源修改的時候是沒有必要加鎖的。
這種場景下使用讀寫鎖時更好的一種選擇。
讀寫鎖分為兩種:讀鎖和寫鎖。
當一個goroutine獲取讀鎖后,其他的goroutine可以繼續(xù)獲取讀鎖,獲取寫鎖會等待;
當一個goroutine獲取寫鎖后,其他的goroutine獲取讀鎖,寫鎖都會等待。
讀寫鎖適合讀多寫少的場景,如果讀寫操作量差別不大,讀寫鎖的優(yōu)勢就發(fā)揮不出來了。
package main
import (
"fmt"
"sync"
"time"
)
var (
x int64
wg sync.WaitGroup
lock sync.Mutex
rwlock sync.RWMutex
)
func write() {
lock.Lock() // 加互斥鎖
//rwlock.Lock() // 加寫鎖
x = x + 1
time.Sleep(10 * time.Millisecond) // 假設(shè)讀操作耗時10毫秒
//rwlock.Unlock() // 解寫鎖
lock.Unlock() // 解互斥鎖
wg.Done()
}
func read() {
lock.Lock() // 加互斥鎖
//rwlock.RLock() // 加讀鎖
time.Sleep(time.Millisecond) // 假設(shè)讀操作耗時1毫秒
//rwlock.RUnlock() // 解讀鎖
lock.Unlock() // 解互斥鎖
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}
wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}
互斥鎖的時間:
1.404974744s
Process finished with exit code 0
讀寫互斥鎖的時間:
109.371376ms
Process finished with exit code 0
在代碼中生硬的使用time.Sleep是不合適的,Go語言中可以使用sync.WaitGoup來實現(xiàn)并發(fā)任務(wù)的同步。
sync.WaitGroup有以下幾個方法:
sync.WaitGroup內(nèi)部維護著一個計數(shù)器,計數(shù)器的值可以增加和減少。
例如
我們啟動了N個并發(fā)任務(wù)時,就使用Add(N)將計數(shù)器值增加N。
每個任務(wù)完成時,調(diào)用Done(),會將計數(shù)器減1。
調(diào)用Wait()來等待并發(fā)任務(wù)執(zhí)行完。
當計數(shù)器值為0時,表示所有并發(fā)任務(wù)已經(jīng)完成。
sync.WaitGroup是一個結(jié)構(gòu)體,傳遞的時候要傳遞指針。
var wg sync.WaitGroup
func hello() {
defer wg.Done()
fmt.Println("Hello Goroutine!")
}
func main() {
wg.Add(1)
go hello() // 啟動另外一個goroutine去執(zhí)行hello函數(shù)
fmt.Println("main goroutine done!")
wg.Wait()
}
在編程的很多場景下我們需要確保某些操作在高并發(fā)的場景下只執(zhí)行一次,例如:只加載一次配置文件、只關(guān)閉一次通道等。
Go語言中的sync包提供了一個針對只執(zhí)行一次場景的解決方案-sync.Once。
sync.Onece只有一個Do方法,
func (o *Once) Do(f func()) {}
如果要執(zhí)行的函數(shù)f需要傳遞參數(shù),需要搭配閉包來使用。
延遲一個開銷很大的初始化操作到真正用到它的時候在執(zhí)行是一個很好地實踐。
因為預(yù)先初始化一個變量(比如在Init函數(shù)中完成初始化)會增加程序的啟動耗時,而且有可能實際執(zhí)行過程中這個變量沒有用上,那么這個初始化操作就不是必須要做的。
看下面的例子:
var icons map[string]image.Image
func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Icon 被多個goroutine調(diào)用時不是并發(fā)安全的
func Icon(name string) image.Image {
if icons == nil {
loadIcons()
}
return icons[name]
}
多個goroutine并發(fā)調(diào)用Icon函數(shù)時不是并發(fā)安全的,現(xiàn)代的編譯器和CPU在保證每個goroutine都滿足串行一致的基礎(chǔ)上,自由的重排訪問內(nèi)存的順序。
loadIcons函數(shù)可能被重排為以下結(jié)果:
func loadIcons() {
icons = make(map[string]image.Image)
icons["left"] = loadIcon("left.png")
icons["up"] = loadIcon("up.png")
icons["right"] = loadIcon("right.png")
icons["down"] = loadIcon("down.png")
}
在這種情況下就會出現(xiàn),即使判斷了icons不是nil,也不意味著變量初始化完成了。
考慮到這種情況,我們能想到的辦法一:可以添加互斥鎖;方法二:使用sync.Once。
var icons map[string]image.Image
var loadIconsOnce sync.Once
func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Icon 是并發(fā)安全的
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}
package singleton
import (
"sync"
)
type singleton struct {}
var instance *singleton
var once sync.Once
func GetInstance() *singleton {
once.Do(func() {
instance = &singleton{}
})
return instance
}
sync.Once其實內(nèi)部包含一個互斥鎖和一個布爾值,互斥鎖保證布爾值和數(shù)據(jù)的安全,而布爾值用來記錄初始化是否完成。
這樣設(shè)計就能保證初始化操作的時候是并發(fā)安全的,并且初始化操作也不會被執(zhí)行多次。
Go語言中內(nèi)置的map不是并發(fā)安全的。
package main
import (
"fmt"
"strconv"
"sync"
)
var m = make(map[string]int)
func get(key string)int {
return m[key]
}
func set(key string,value int) {
m[key] = value
}
func main() {
wg := sync.WaitGroup{}
for i:=0;i<20;i++{
wg.Add(1)
go func(n int) {
key := strconv.Itoa(i)
set(key,i)
fmt.Printf("k=:%v,v:=%v\n", key, get(key))
wg.Done()
}(i)
}
wg.Wait()
}
結(jié)果:
fatal error: concurrent map writes
goroutine 6 [running]:
/usr/local/go/src/runtime/panic.go:617 +0x72 fp=0xc0000326b8 sp=0xc000032688 pc=0x1028282
runtime.mapassign_faststr(0x10aca40, 0xc000060180, 0x10cd3a2, 0x1, 0x0)
/usr/local/go/src/runtime/map_faststr.go:211 +0x42a fp=0xc000032720 sp=0xc0000326b8 pc=0x101031a
main.set(...)
/Users/tongchao/Desktop/gopath/src/test/test.go:15
main.main.func1(0xc000014080, 0xc000014070, 0x2)
/Users/tongchao/Desktop/gopath/src/test/test.go:23 +0x8e fp=0xc0000327c8 sp=0xc000032720 pc=0x1094fee
runtime.goexit()
/usr/local/go/src/runtime/asm_amd64.s:1337 +0x1 fp=0xc0000327d0 sp=0xc0000327c8 pc=0x1051451
created by main.main
/Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2
goroutine 1 [runnable]:
sync.(*WaitGroup).Add(0xc000014070, 0x1)
/usr/local/go/src/sync/waitgroup.go:53 +0x13c
main.main()
/Users/tongchao/Desktop/gopath/src/test/test.go:20 +0x6e
goroutine 4 [runnable]:
main.get(...)
/Users/tongchao/Desktop/gopath/src/test/test.go:12
main.main.func1(0xc000014080, 0xc000014070, 0x0)
/Users/tongchao/Desktop/gopath/src/test/test.go:24 +0xcc
created by main.main
/Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2
goroutine 5 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0x1)
/Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
/Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2
goroutine 7 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0x3)
/Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
/Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2
goroutine 8 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0x4)
/Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
/Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2
goroutine 9 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0x5)
/Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
/Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2
goroutine 10 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0x6)
/Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
/Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2
goroutine 11 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0x7)
/Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
/Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2
goroutine 12 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0x8)
/Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
/Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2
goroutine 13 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0x9)
/Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
/Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2
goroutine 14 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0xa)
/Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
/Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2
goroutine 15 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0xb)
/Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
/Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2
goroutine 16 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0xc)
/Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
/Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2
Process finished with exit code 2
Go語言的sync包中提供了一開箱即用的并發(fā)安全的map-sync.Map。
開箱即用表示不用像內(nèi)置的map一樣使用make函數(shù)初始化就能直接使用。
同時sync.Map內(nèi)置了諸如Store、Load、LoadOrStore、Delete、Range等操作方法。
package main
import (
"fmt"
"strconv"
"sync"
)
var m = sync.Map{}
func main() {
wg := sync.WaitGroup{}
for i:=0;i<20;i++{
wg.Add(1)
go func() {
key := strconv.Itoa(i)
m.Store(key,i)
value,_ := m.Load(key)
fmt.Printf("k=:%v,v:=%v\n", key, value)
wg.Done()
}()
}
wg.Wait()
}
結(jié)果:
k=:8,v:=8
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:8,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:8,v:=8
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
Process finished with exit code 0
代碼中的加鎖操作因為涉及內(nèi)核態(tài)的上下文切換會比較耗時、代價比較高。
針對"基本數(shù)據(jù)類型",我們可以使用原子操作來保證并發(fā)安全,因為原子操作是Go 語言提供的方法,在用戶態(tài)就可以完成,因此性能比加鎖操作更好。
Go語言中原子操作由內(nèi)置的標準庫sync/atomic提供。
atomic包提供了底層的原子級內(nèi)存操作,對于同步算法的實現(xiàn)很有用。這些函數(shù)必須謹慎地保證正確使用。除了某些特殊的底層應(yīng)用,使用通道或者sync包的函數(shù)/類型實現(xiàn)同步更好。
一個示例來比較下互斥鎖和原子操作的性能。
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
type Counter interface {
Inc()
Load() int64
}
// 普通版
type CommonCounter struct {
counter int64
}
func (c CommonCounter) Inc() {
c.counter++
}
func (c CommonCounter) Load() int64 {
return c.counter
}
// 互斥鎖版
type MutexCounter struct {
counter int64
lock sync.Mutex
}
func (m *MutexCounter) Inc() {
m.lock.Lock()
defer m.lock.Unlock()
m.counter++
}
func (m *MutexCounter) Load() int64 {
m.lock.Lock()
defer m.lock.Unlock()
return m.counter
}
// 原子操作版
type AtomicCounter struct {
counter int64
}
func (a *AtomicCounter) Inc() {
atomic.AddInt64(&a.counter, 1)
}
func (a *AtomicCounter) Load() int64 {
return atomic.LoadInt64(&a.counter)
}
func test(c Counter) {
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
c.Inc()
wg.Done()
}()
}
wg.Wait()
end := time.Now()
fmt.Println(c.Load(), end.Sub(start))
}
func main() {
c1 := CommonCounter{} // 非并發(fā)安全
test(c1)
c2 := MutexCounter{} // 使用互斥鎖實現(xiàn)并發(fā)安全
test(&c2)
c3 := AtomicCounter{} // 并發(fā)安全且比互斥鎖效率更高
test(&c3)
}
結(jié)果:
0 1.099595ms
1000 907.118μs
1000 456.326μs
Process finished with exit code 0
以上就是Golang中g(shù)oroutine和channel的使用方法介紹,看完之后是否有所收獲呢?如果想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。