您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“如何理解Go中的Channel源碼”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
package main import "fmt" func main() { c := make(chan int) go func() { c <- 1 // send to channel }() x := <-c // recv from channel fmt.Println(x) }
我們可以這樣查看匯編結(jié)果:
go tool compile -N -l -S hello.go -N表示禁用優(yōu)化 -l禁用內(nèi)聯(lián) -S打印結(jié)果
通過上面這樣的方式,我們可以直到chan是調(diào)用的哪些函數(shù):
type hchan struct { qcount uint // 循環(huán)列表元素個(gè)數(shù) dataqsiz uint // 循環(huán)隊(duì)列的大小 buf unsafe.Pointer // 循環(huán)隊(duì)列的指針 elemsize uint16 // chan中元素的大小 closed uint32 // 是否已close elemtype *_type // chan中元素類型 sendx uint // send在buffer中的索引 recvx uint // recv在buffer中的索引 recvq waitq // receiver的等待隊(duì)列 sendq waitq // sender的等待隊(duì)列 // 互拆鎖 lock mutex }
qcount代表chan 中已經(jīng)接收但還沒被取走的元素的個(gè)數(shù),函數(shù) len 可以返回這個(gè)字段的值;
dataqsiz和buf分別代表隊(duì)列buffer的大小,cap函數(shù)可以返回這個(gè)字段的值以及隊(duì)列buffer的指針,是一個(gè)定長的環(huán)形數(shù)組;
elemtype 和 elemsiz表示chan 中元素的類型和 元素的大小;
sendx:發(fā)送數(shù)據(jù)的指針在 buffer中的位置;
recvx:接收請求時(shí)的指針在 buffer 中的位置;
recvq和sendq分別表示等待接收數(shù)據(jù)的 goroutine 與等待發(fā)送數(shù)據(jù)的 goroutine;
sendq和recvq的類型是waitq的結(jié)構(gòu)體:
type waitq struct { first *sudog last *sudog }
waitq里面連接的是一個(gè)sudog雙向鏈表,保存的是等待的goroutine 。整個(gè)chan的圖例大概是這樣:
下面看一下創(chuàng)建chan,我們通過匯編結(jié)果也可以查看到make(chan int)
這句代碼會調(diào)用到runtime的makechan函數(shù)中:
const ( maxAlign = 8 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) ) func makechan(t *chantype, size int) *hchan { elem := t.elem // 略去檢查代碼 ... //計(jì)算需要分配的buf空間 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } var c *hchan switch { case mem == 0: // chan的size或者元素的size是0,不必創(chuàng)建buf c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector c.buf = c.raceaddr() case elem.ptrdata == 0: // 元素不是指針,分配一塊連續(xù)的內(nèi)存給hchan數(shù)據(jù)結(jié)構(gòu)和buf c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) // 表示hchan后面在內(nèi)存里緊跟著就是buf c.buf = add(unsafe.Pointer(c), hchanSize) default: // 元素包含指針,那么單獨(dú)分配buf c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) return c }
首先我們可以看到計(jì)算hchanSize:
maxAlign = 8 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
maxAlign是8,那么maxAlign-1的二進(jìn)制就是111,然后和int(unsafe.Sizeof(hchan{}))取與就是取它的低三位,hchanSize就得到的是8的整數(shù)倍,做對齊使用。
這里switch有三種情況,第一種情況是緩沖區(qū)所需大小為 0,那么在為 hchan 分配內(nèi)存時(shí),只需要分配 sizeof(hchan) 大小的內(nèi)存;
第二種情況是緩沖區(qū)所需大小不為 0,而且數(shù)據(jù)類型不包含指針,那么就分配連續(xù)的內(nèi)存。注意的是,我們在創(chuàng)建channel的時(shí)候可以指定類型為指針類型:
//chan里存入的是int的指針 c := make(chan *int) //chan里存入的是int的值 c := make(chan int)
第三種情況是緩沖區(qū)所需大小不為 0,而且數(shù)據(jù)類型包含指針,那么就不使用add的方式讓hchan和buf放在一起了,而是單獨(dú)的為buf申請一塊內(nèi)存。
在看發(fā)送數(shù)據(jù)的代碼之前,我們先看一下什么是channel的阻塞和非阻塞。
一般情況下,傳入的參數(shù)都是 block=true
,即阻塞調(diào)用,一個(gè)往 channel 中插入數(shù)據(jù)的 goroutine 會阻塞到插入成功為止。
非阻塞是只這種情況:
select { case c <- v: ... foo default: ... bar }
編譯器會將其改為:
if selectnbsend(c, v) { ... foo } else { ... bar }
selectnbsend方法傳入的block就是false:
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { return chansend(c, elem, false, getcallerpc()) }
向通道發(fā)送數(shù)據(jù)我們通過匯編結(jié)果可以發(fā)現(xiàn)是在runtime 中通過 chansend 實(shí)現(xiàn)的,方法比較長下面我們分段來進(jìn)行理解:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { // 對于非阻塞的發(fā)送,直接返回 if !block { return false } // 對于阻塞的通道,將 goroutine 掛起 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } ... }
這里會對chan做一個(gè)判斷,如果它是空的,那么對于非阻塞的發(fā)送,直接返回 false;對于阻塞的通道,將 goroutine 掛起,并且永遠(yuǎn)不會返回。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 非阻塞的情況下,如果通道沒有關(guān)閉,滿足以下一條: // 1.沒有緩沖區(qū)并且當(dāng)前沒有接收者 // 2.緩沖區(qū)不為0,并且已滿 if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false } ... }
需要注意的是這里是沒有加鎖的,go雖然在使用指針讀取單個(gè)值的時(shí)候原子性的,但是讀取多個(gè)值并不能保證,所以在判斷完closed雖然是沒有關(guān)閉的,那么在讀取完之后依然可能在這一瞬間從未關(guān)閉狀態(tài)轉(zhuǎn)變成關(guān)閉狀態(tài)。那么就有兩種可能:
通道沒有關(guān)閉,而且已經(jīng)滿了,那么需要返回false,沒有問題;
通道關(guān)閉,而且已經(jīng)滿了,但是在非阻塞的發(fā)送中返回false,也沒有問題;
有關(guān)go的一致性原語,可以看這篇:The Go Memory Model。
上面的這些判斷被稱為 fast path,因?yàn)榧渔i的操作是一個(gè)很重的操作,所以能夠在加鎖之前返回的判斷就在加鎖之前做好是最好的。
下面接著看看加鎖部分的代碼:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... //加鎖 lock(&c.lock) // 是否關(guān)閉的判斷 if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } // 從 recvq 中取出一個(gè)接收者 if sg := c.recvq.dequeue(); sg != nil { // 如果接收者存在,直接向該接收者發(fā)送數(shù)據(jù),繞過buffer send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } ... }
進(jìn)入了lock區(qū)域之后還需要再判斷以下close的狀態(tài),然后從recvq 中取出一個(gè)接收者,如果已經(jīng)有接收者,那么就向第一個(gè)接收者發(fā)送當(dāng)前enqueue的消息。這里需要注意的是如果有接收者在隊(duì)列中等待,則說明此時(shí)的緩沖區(qū)是空的。
既然是一行行分析代碼,那么我們再進(jìn)入到send看一下實(shí)現(xiàn):
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { ... if sg.elem != nil { // 直接把要發(fā)送的數(shù)據(jù)copy到reciever的棧空間 sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } // 喚醒對應(yīng)的 goroutine goready(gp, skip+1) }
在send方法里,sg就是goroutine打包好的對象,ep是對應(yīng)要發(fā)送數(shù)據(jù)的指針,sendDirect方法會調(diào)用memmove進(jìn)行數(shù)據(jù)的內(nèi)存拷貝。然后goready函數(shù)會喚醒對應(yīng)的 goroutine進(jìn)行調(diào)度。
回到chansend方法,繼續(xù)往下看:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 如果緩沖區(qū)沒有滿,直接將要發(fā)送的數(shù)據(jù)復(fù)制到緩沖區(qū) if c.qcount < c.dataqsiz { // 找到buf要填充數(shù)據(jù)的索引位置 qp := chanbuf(c, c.sendx) ... // 將數(shù)據(jù)拷貝到 buffer 中 typedmemmove(c.elemtype, qp, ep) // 數(shù)據(jù)索引前移,如果到了末尾,又從0開始 c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } // 元素個(gè)數(shù)加1,釋放鎖并返回 c.qcount++ unlock(&c.lock) return true } ... }
這里會判斷buf緩沖區(qū)有沒有滿,如果沒有滿,那么就找到buf要填充數(shù)據(jù)的索引位置,調(diào)用typedmemmove方法將數(shù)據(jù)拷貝到buf中,然后重新設(shè)值sendx偏移量。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 緩沖區(qū)沒有空間了,所以對于非阻塞調(diào)用直接返回 if !block { unlock(&c.lock) return false } // 創(chuàng)建 sudog 對象 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil // 將sudog 對象入隊(duì) c.sendq.enqueue(mysg) // 進(jìn)入等待狀態(tài) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) ... }
這里會做兩部分的操作,對于非阻塞的調(diào)用會直接返回;對于阻塞的調(diào)用會創(chuàng)建sudog 對象,然后將sudog對象入隊(duì)之后gopark將 goroutine 轉(zhuǎn)入 waiting 狀態(tài),并解鎖。調(diào)用gopark之后,在使用者看來該向 channel 發(fā)送數(shù)據(jù)的代碼語句會進(jìn)行阻塞。
這里也需要注意一下,如果緩沖區(qū)為0,那么也會進(jìn)入到這里,會調(diào)用到gopark立馬阻塞,所以在使用的時(shí)候需要記得接收數(shù)據(jù),防止向chan發(fā)送數(shù)據(jù)的那一端永遠(yuǎn)阻塞,如:
func process(timeout time.Duration) bool { ch := make(chan bool) go func() { // 模擬處理耗時(shí)的業(yè)務(wù) time.Sleep((timeout + time.Second)) ch <- true // block fmt.Println("exit goroutine") }() select { case result := <-ch: return result case <-time.After(timeout): return false } }
如果這里在select的時(shí)候直接timeout返回了,而沒有調(diào)用 result := <-ch
,那么goroutine 就會永遠(yuǎn)阻塞。
到這里發(fā)送的代碼就講解完了,整個(gè)流程大致如下:
比如我要執(zhí)行:ch<-10
檢查 recvq 是否為空,如果不為空,則從 recvq 頭部取一個(gè) goroutine,將數(shù)據(jù)發(fā)送過去;
如果 recvq 為空,,并且buf沒有滿,則將數(shù)據(jù)放入到 buf中;
如果 buf已滿,則將要發(fā)送的數(shù)據(jù)和當(dāng)前 goroutine 打包成sudog,然后入隊(duì)到sendq隊(duì)列中,并將當(dāng)前 goroutine 置為 waiting 狀態(tài)進(jìn)行阻塞。
從chan獲取數(shù)據(jù)實(shí)現(xiàn)函數(shù)為 chanrecv。下面我們看一下代碼實(shí)現(xiàn):
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... if c == nil { // 如果 c 為空且是非阻塞調(diào)用,那么直接返回 (false,false) if !block { return } // 阻塞調(diào)用直接等待 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // 對于非阻塞的情況,并且沒有關(guān)閉的情況 // 如果是無緩沖chan或者是chan中沒有數(shù)據(jù),那么直接返回 (false,false) if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 { return } // 上鎖 lock(&c.lock) // 如果已經(jīng)關(guān)閉,并且chan中沒有數(shù)據(jù),返回 (true,false) if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } ... }
chanrecv方法和chansend方法是一樣的,首先也是做非空判斷,如果chan沒有初始化,那么如果是非阻塞調(diào)用,那么直接返回 (false,false),阻塞調(diào)用會直接等待;
下面的兩個(gè)if判斷我放在一起來進(jìn)行講解,因?yàn)檫@里和chansend是不一樣的,chanrecv要根據(jù)不同條件需要返回不同的結(jié)果。
在上鎖之前的判斷是邊界條件的判斷:如果是非阻塞調(diào)用會判斷chan沒有發(fā)送方(dataqsiz為空且發(fā)送隊(duì)列為空),或chan的緩沖為空(dataqsiz>0 并且qcount==0)并且chan是沒有close,那么需要返回 (false,false);而chan已經(jīng)關(guān)閉了,并且buf中沒有數(shù)據(jù),需要返回 (true,false);
為了實(shí)現(xiàn)這個(gè)需求,所以在chanrecv方法里面邊界條件的判斷都使用atomic方法進(jìn)行了獲取。
因?yàn)樾枰_的得到chan已關(guān)閉,并且 buf 空會返回 (true, false),而不是 (false,false),所以在lock上鎖之前需要使用atomic來獲取參數(shù)防止重排序(Happens Before),因此必須使此處的 qcount 和 closed 的讀取操作的順序通過原子操作得到順序保障。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... // 從發(fā)送者隊(duì)列獲取數(shù)據(jù) if sg := c.sendq.dequeue(); sg != nil { // 發(fā)送者隊(duì)列不為空,直接從發(fā)送者那里提取數(shù)據(jù) recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } ... } func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 如果是無緩沖區(qū)chan if c.dataqsiz == 0 { ... if ep != nil { // 直接從發(fā)送者拷貝數(shù)據(jù) recvDirect(c.elemtype, sg, ep) } // 有緩沖區(qū)chan } else { // 獲取buf的存放數(shù)據(jù)指針 qp := chanbuf(c, c.recvx) ... // 直接從緩沖區(qū)拷貝數(shù)據(jù)給接收者 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 從發(fā)送者拷貝數(shù)據(jù)到緩沖區(qū) typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } // 將發(fā)送者喚醒 goready(gp, skip+1) }
在這里如果有發(fā)送者在隊(duì)列等待,那么直接從發(fā)送者那里提取數(shù)據(jù),并且喚醒這個(gè)發(fā)送者。需要注意的是由于有發(fā)送者在等待,所以如果有緩沖區(qū),那么緩沖區(qū)一定是滿的。
在喚醒發(fā)送者之前需要對緩沖區(qū)做判斷,如果是無緩沖區(qū),那么直接從發(fā)送者那里提取數(shù)據(jù);如果有緩沖區(qū)首先會獲取recvx的指針,然后將從緩沖區(qū)拷貝數(shù)據(jù)給接收者,再將發(fā)送者數(shù)據(jù)拷貝到緩沖區(qū)。
然后將recvx加1,相當(dāng)于將新的數(shù)據(jù)移到了隊(duì)尾,再將recvx的值賦值給sendx,最后調(diào)用goready將發(fā)送者喚醒,這里有些繞,我們通過圖片來展示:
這里展示的是在chansend中將數(shù)據(jù)拷貝到緩沖區(qū)中,當(dāng)數(shù)據(jù)滿的時(shí)候會將sendx的指針置為0,所以當(dāng)buf環(huán)形隊(duì)列是滿的時(shí)候sendx等于recvx。
然后再來看看chanrecv中發(fā)送者隊(duì)列有數(shù)據(jù)的時(shí)候移交緩沖區(qū)的數(shù)據(jù)是怎么做的:
這里會將recvx為0處的數(shù)據(jù)直接從緩存區(qū)拷貝數(shù)據(jù)給接收者,然后將發(fā)送者拷貝數(shù)據(jù)到緩沖區(qū)recvx指針處,然后將recvx指針加1并將recvx賦值給sendx,由于是滿的所以用recvx加1的效果實(shí)現(xiàn)了將新加入的數(shù)據(jù)入庫到隊(duì)尾的操作。
接著往下看:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... // 如果緩沖區(qū)中有數(shù)據(jù) if c.qcount > 0 { qp := chanbuf(c, c.recvx) ... // 從緩沖區(qū)復(fù)制數(shù)據(jù)到 ep if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) // 接收數(shù)據(jù)的指針前移 c.recvx++ // 環(huán)形隊(duì)列,如果到了末尾,再從0開始 if c.recvx == c.dataqsiz { c.recvx = 0 } // 緩沖區(qū)中現(xiàn)存數(shù)據(jù)減一 c.qcount-- unlock(&c.lock) return true, true } ... }
到了這里,說明緩沖區(qū)中有數(shù)據(jù),但是發(fā)送者隊(duì)列沒有數(shù)據(jù),那么將數(shù)據(jù)拷貝到接收數(shù)據(jù)的協(xié)程,然后將接收數(shù)據(jù)的指針前移,如果已經(jīng)到了隊(duì)尾,那么就從0開始,最后將緩沖區(qū)中現(xiàn)存數(shù)據(jù)減一并解鎖。
下面就是緩沖區(qū)中沒有數(shù)據(jù)的情況:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... // 非阻塞,直接返回 if !block { unlock(&c.lock) return false, false } // 創(chuàng)建sudog gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil // 將sudog添加到接收隊(duì)列中 c.recvq.enqueue(mysg) // 阻塞住goroutine,等待被喚醒 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) ... }
如果是非阻塞調(diào)用,直接返回;阻塞調(diào)用會將當(dāng)前goroutine 封裝成sudog,然后將sudog添加到接收隊(duì)列中,調(diào)用gopark阻塞住goroutine,等待被喚醒。
關(guān)閉通道會調(diào)用到closechan方法:
func closechan(c *hchan) { // 1. 校驗(yàn)chan是否已初始化 if c == nil { panic(plainError("close of nil channel")) } // 加鎖 lock(&c.lock) // 如果已關(guān)閉了,那么不能被再次關(guān)閉 if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } ... // 設(shè)置chan已關(guān)閉 c.closed = 1 // 申明一個(gè)存放g的list,用于存放在等待隊(duì)列中的groutine var glist gList // 2. 獲取所有接收者 for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } // 加入隊(duì)列中 glist.push(gp) } // 獲取所有發(fā)送者 for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } // 加入隊(duì)列中 glist.push(gp) } unlock(&c.lock) // 3.喚醒所有的glist中的goroutine for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } }
這個(gè)方法首先會校驗(yàn)chan是否已被初始化,然后加鎖之后再校驗(yàn)是否已被關(guān)閉過,如果校驗(yàn)都通過了,那么將closed字段設(shè)值為1;
遍歷所有的接收者和發(fā)送者,并將其goroutine 加入到glist中;
將所有g(shù)list中的goroutine加入調(diào)度隊(duì)列,等待被喚醒,這里需要注意的是發(fā)送者在被喚醒之后會panic;
“如何理解Go中的Channel源碼”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。