您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)go如何實(shí)現(xiàn)無(wú)限buffer的channel方法,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。
總所周知,go
里面只有兩種 channel
,一種是 unbuffered channel, 其聲明方式為
ch := make(chan interface{})
另一種是 buffered channel,其聲明方式為
bufferSize := 5 ch := make(chan interface{},bufferSize)
對(duì)于一個(gè) buffered channel,無(wú)論它的 buffer 有多大,它終究是有極限的。這個(gè)極限就是該 channel 最初被 make 時(shí),所指定的 bufferSize 。
jojo,buffer channel 的大小是有極限的,我不做 channel 了。
一旦 channel
滿了的話,再往里面添加元素的話,將會(huì)阻塞。
so how can we make a infinite buffer channel?
本文參考了 medinum 上面的一篇文章,有興趣的同學(xué)可以直接閱讀原文。
首先當(dāng)然是建一個(gè) struct
,在百度翻譯的幫助下,我們將這個(gè) struct
取名為 InfiniteChannel
type InfiniteChannel struct { }
思考一下 channel
的核心行為,實(shí)際上就兩個(gè),一個(gè)流入(Fan in),一個(gè)流出(Fan out),因此我們添加如下幾個(gè) method。
func (c *InfiniteChannel) In(val interface{}) { // todo } func (c *InfiniteChannel) Out() interface{} { // todo }
通過 In()
接收的數(shù)據(jù),總得需要一個(gè)地方來存放。我們可以用一個(gè) slice
來存放,就算用 In()
往里面添加了很多元素,也可以通過 append()
來拓展 slice
,slice
的容量可以無(wú)限拓展下去(內(nèi)存足夠的話),所以 channel
也是 infinite
。 InfiniteChannel
的第一個(gè)成員就這么敲定下來的。
type InfiniteChannel struct { data []interface{} }
用戶調(diào)用 In()
和 Out()
時(shí),可能是并發(fā)的環(huán)境,在 go
中如何進(jìn)行并發(fā)編程,最容易想到的肯定是 channel
了,因此我們?cè)趦?nèi)部準(zhǔn)備兩個(gè) channel
,一個(gè) inChan
,一個(gè) outChan
,用 inChan
來接收數(shù)據(jù),用 outChan
來流出數(shù)據(jù)。
type InfiniteChannel struct { inChan chan interface{} outChan chan interface{} data []interface{} } func (c *InfiniteChannel) In(val interface{}) { c.inChan <- val } func (c *InfiniteChannel) Out() interface{} { return <-c.outChan }
其中, inChan
和 outChan
都是 unbuffered channel。
此外,也肯定是需要一個(gè) select
來處理來自 inChan
和 outChan
身上的事件。因此我們另起一個(gè)協(xié)程,在里面做 select
操作。
func (c *InfiniteChannel) background() { for true { select { case newVal := <-c.inChan: c.data = append(c.data, newVal) case c.outChan <- c.pop(): // pop() 將取出隊(duì)列的首個(gè)元素 } } } func NewInfiniteChannel() *InfiniteChannel { c := &InfiniteChannel{ inChan: make(chan interface{}), outChan: make(chan interface{}), } go c.background() // 注意這里另起了一個(gè)協(xié)程 return c }
ps:感覺這也算是 go 并發(fā)編程的一個(gè)套路了。即
在 new struct 的時(shí)候,順手 go 一個(gè) select 協(xié)程,select 協(xié)程內(nèi)執(zhí)行一個(gè) for 循環(huán),不停的 select,監(jiān)聽一個(gè)或者多個(gè) channel 的事件。
struct 對(duì)外提供的 method,只會(huì)操作 struct 內(nèi)的 channel(在本例中就是 inChan 和 outChan),不會(huì)操作 struct 內(nèi)的其他數(shù)據(jù)(在本例中,In() 和 Out() 都沒有直接操作 data)。
觸發(fā) channel 的事件后,由 select 協(xié)程進(jìn)行數(shù)據(jù)的更新(在本例中就是 data )。因?yàn)橹挥?select 協(xié)程對(duì)除 channel 外的數(shù)據(jù)成員進(jìn)行讀寫操作,且 go 保證了對(duì)于 channel 的并發(fā)讀寫是安全的,所以代碼是并發(fā)安全的。
如果 struct 是 exported ,用戶或許會(huì)越過 new ,直接手動(dòng) make 一個(gè) struct,可以考慮將 struct 設(shè)置為 unexported,把它的首字母小寫即可。
pop()
的實(shí)現(xiàn)也非常簡(jiǎn)單。
// 取出隊(duì)列的首個(gè)元素,如果隊(duì)列為空,將會(huì)返回一個(gè) nil func (c *InfiniteChannel) pop() interface{} { if len(c.data) == 0 { return nil } val := c.data[0] c.data = c.data[1:] return val }
用一個(gè)協(xié)程每秒鐘生產(chǎn)一條數(shù)據(jù),另一個(gè)協(xié)程每半秒消費(fèi)一條數(shù)據(jù),并打印。
func main() { c := NewInfiniteChannel() go func() { for i := 0; i < 20; i++ { c.In(i) time.Sleep(time.Second) } }() for i := 0; i < 50; i++ { val := c.Out() fmt.Print(val) time.Sleep(time.Millisecond * 500) } }
// out <nil>0<nil>1<nil>23<nil>4<nil><nil>5<nil>67<nil><nil>89<nil><nil>1011<nil>12<nil>13<nil>14<nil>15<nil>16<nil>17<nil><nil>1819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil> Process finished with the exit code 0
可以看到,將 InfiniteChannel
內(nèi)沒有數(shù)據(jù)可供消費(fèi)時(shí),調(diào)用 Out()
將會(huì)返回一個(gè) nil
,不過這也在我們的意料之中,原因是 pop()
在隊(duì)列為空時(shí),將會(huì)返回 nil。
目前 InfiniteChannel
的行為與標(biāo)準(zhǔn)的 channel
的行為是有出入的,go
中的 channel
,在沒有數(shù)據(jù)卻仍要取數(shù)據(jù)時(shí)會(huì)被阻塞,如何實(shí)現(xiàn)這個(gè)效果?
我認(rèn)為此處是是整篇文章最有技巧的地方,我第一次看到時(shí)忍不住拍案叫絕。
首先把原來的 background()
摘出來
func (c *InfiniteChannel) background() { for true { select { case newVal := <-c.inChan: c.data = append(c.data, newVal) case c.outChan <- c.pop(): } } }
對(duì) outChan
進(jìn)行一個(gè)簡(jiǎn)單封裝
func (c *InfiniteChannel) background() { for true { select { case newVal := <-c.inChan: c.data = append(c.data, newVal) case c.outChanWrapper() <- c.pop(): } } } func (c *InfiniteChannel) outChanWrapper() chan interface{} { return c.outChan }
目前為止,一切照舊。
點(diǎn)睛之筆來了:
func (c *InfiniteChannel) outChanWrapper() chan interface{} { if len(c.data) == 0 { return nil } return c.outChan }
在 c.data
為空的時(shí)候,返回一個(gè) nil
在 background()
中,當(dāng)執(zhí)行到 case c.outChan <- c.pop():
時(shí),實(shí)際上將會(huì)變成:
case nil <- nil:
在 go
中,是無(wú)法往一個(gè) nil
的 channel
中發(fā)送元素的。例如
func main() { var c chan interface{} select { case c <- 1: } } // fatal error: all goroutines are asleep - deadlock! func main() { var c chan interface{} select { case c <- 1: default: fmt.Println("hello world") } } // hello world
因此,對(duì)于
select { case newVal := <-c.inChan: c.data = append(c.data, newVal) case c.outChanWrapper() <- c.pop(): }
將會(huì)一直阻塞在 select
那里,直到 inChan
來了數(shù)據(jù)。
012345678910111213141516171819fatal error: all goroutines are asleep - deadlock!
最后,程序 panic
了,因?yàn)樗梨i了。
實(shí)際上 channel
除了 In()
和 Out()
外,還有一個(gè)行為,即 close()
,如果 channel close 后,依舊從其中取元素的話,將會(huì)取出該類型的默認(rèn)值。
func main() { c := make(chan interface{}) close(c) for true { v := <-c fmt.Println(v) time.Sleep(time.Second) } } // output // <nil> // <nil> // <nil> // <nil> func main() { c := make(chan interface{}) close(c) for true { v, isOpen := <-c fmt.Println(v, isOpen) time.Sleep(time.Second) } } // output // <nil> false // <nil> false // <nil> false // <nil> false
我們也需要實(shí)現(xiàn)相同的效果。
func (c *InfiniteChannel) Close() { close(c.inChan) } func (c *InfiniteChannel) background() { for true { select { case newVal, isOpen := <-c.inChan: if isOpen { c.data = append(c.data, newVal) } else { c.isOpen = false } case c.outChanWrapper() <- c.pop(): } } } func NewInfiniteChannel() *InfiniteChannel { c := &InfiniteChannel{ inChan: make(chan interface{}), outChan: make(chan interface{}), isOpen: true, } go c.background() return c } func (c *InfiniteChannel) outChanWrapper() chan interface{} { // 這里添加了對(duì) c.isOpen 的判斷 if c.isOpen && len(c.data) == 0 { return nil } return c.outChan }
再測(cè)試一下
func main() { c := NewInfiniteChannel() go func() { for i := 0; i < 20; i++ { c.In(i) time.Sleep(time.Second) } c.Close() // 這里調(diào)用了 Close }() for i := 0; i < 50; i++ { val := c.Out() fmt.Print(val) time.Sleep(time.Millisecond * 500) } }
// output 012345678910111213141516171819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil> Process finished with the exit code 0
符合預(yù)期
目前看上去已經(jīng)很完美了,但是和標(biāo)準(zhǔn)的 channel
相比,仍然有差距。因?yàn)闃?biāo)準(zhǔn)的 channel
是有這種用法的
v,isOpen := <- ch
可以通過 isOpen
變量來獲取 channel
的開閉情況。
因此 InfiniteChannel
也應(yīng)該提供一個(gè)類似的 method
func (c *InfiniteChannel) OutAndIsOpen() (interface{}, bool) { // todo }
可惜的是,要想得知 InfiniteChannel
是否是 Open
的,就必定要訪問 InfiniteChannel
內(nèi)的 isOpen
成員。
type InfiniteChannel struct { inChan chan interface{} outChan chan interface{} data []interface{} isOpen bool }
而 isOpen
并非 channel
類型,根據(jù)之前的套路,這種非 channel
類型的成員只應(yīng)該被 select
協(xié)程訪問。一旦有多個(gè)協(xié)程訪問,就會(huì)出現(xiàn)并發(fā)問題,除非加鎖。
我不能接受!所以干脆不提供這個(gè) method 了,嘿嘿。
完整代碼
func main() { c := NewInfiniteChannel() go func() { for i := 0; i < 20; i++ { c.In(i) time.Sleep(time.Second) } c.Close() }() for i := 0; i < 50; i++ { val := c.Out() fmt.Print(val) time.Sleep(time.Millisecond * 500) } } type InfiniteChannel struct { inChan chan interface{} outChan chan interface{} data []interface{} isOpen bool } func (c *InfiniteChannel) In(val interface{}) { c.inChan <- val } func (c *InfiniteChannel) Out() interface{} { return <-c.outChan } func (c *InfiniteChannel) Close() { close(c.inChan) } func (c *InfiniteChannel) background() { for true { select { case newVal, isOpen := <-c.inChan: if isOpen { c.data = append(c.data, newVal) } else { c.isOpen = false } case c.outChanWrapper() <- c.pop(): } } } func NewInfiniteChannel() *InfiniteChannel { c := &InfiniteChannel{ inChan: make(chan interface{}), outChan: make(chan interface{}), isOpen: true, } go c.background() return c } // 取出隊(duì)列的首個(gè)元素,如果隊(duì)列為空,將會(huì)返回一個(gè) nil func (c *InfiniteChannel) pop() interface{} { if len(c.data) == 0 { return nil } val := c.data[0] c.data = c.data[1:] return val } func (c *InfiniteChannel) outChanWrapper() chan interface{} { if c.isOpen && len(c.data) == 0 { return nil } return c.outChan }
關(guān)于“go如何實(shí)現(xiàn)無(wú)限buffer的channel方法”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。