溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Golang?channel如何應(yīng)用

發(fā)布時(shí)間:2022-10-26 09:53:55 來(lái)源:億速云 閱讀:112 作者:iii 欄目:開(kāi)發(fā)技術(shù)

這篇文章主要介紹“Golang channel如何應(yīng)用”的相關(guān)知識(shí),小編通過(guò)實(shí)際案例向大家展示操作過(guò)程,操作方法簡(jiǎn)單快捷,實(shí)用性強(qiáng),希望這篇“Golang channel如何應(yīng)用”文章能幫助大家解決問(wèn)題。

前言

channel是用于 goroutine 之間的同步、通信的數(shù)據(jù)結(jié)構(gòu)

channel 的底層是通過(guò) mutex 來(lái)控制并發(fā)的,但它為程序員提供了更高一層次的抽象,封裝了更多的功能,這樣并發(fā)編程變得更加容易和安全,得以讓程序員把注意力留到業(yè)務(wù)上去,提升開(kāi)發(fā)效率

channel的用途包括但不限于以下幾點(diǎn):

  • 協(xié)程間通信,同步

  • 定時(shí)任務(wù):和timer結(jié)合

  • 解耦生產(chǎn)方和消費(fèi)方,實(shí)現(xiàn)阻塞隊(duì)列

  • 控制并發(fā)數(shù)

整體結(jié)構(gòu)

Go channel的數(shù)據(jù)結(jié)構(gòu)如下所示:

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    lock mutex
}

qcount:已經(jīng)存儲(chǔ)了多少個(gè)元素

dataqsie:最多存儲(chǔ)多少個(gè)元素,即緩沖區(qū)容量

buf:指向緩沖區(qū)的位置,實(shí)際上是一個(gè)數(shù)組

elemsize:每個(gè)元素占多大空間

closed:channel能夠關(guān)閉,這里記錄其關(guān)閉狀態(tài)

elemtype:保存數(shù)據(jù)的類(lèi)型信息,用于go運(yùn)行時(shí)使用

sendx,recvx:

  • 記錄下一個(gè)要發(fā)送到的位置,下一次從哪里還是接收

  • 這里用數(shù)組模擬隊(duì)列,這兩個(gè)變量即表示隊(duì)列的隊(duì)頭,隊(duì)尾

  • 因此channel的緩沖也被稱(chēng)為環(huán)形緩沖區(qū)

recvq,sendq:

當(dāng)發(fā)送個(gè)接收不能立即完成時(shí),需要讓協(xié)程在channel上等待,所以有兩個(gè)等待隊(duì)列,分別針對(duì)接收和發(fā)送

lock:channel支持協(xié)程間并發(fā)訪問(wèn),因此需要一把鎖來(lái)保護(hù)

創(chuàng)建

創(chuàng)建channel會(huì)被編譯器編譯為調(diào)用makechan函數(shù)

// 無(wú)緩沖通道
ch2 := make(chan int)
// 有緩沖通道
ch3 := make(chan int, 10)

會(huì)根據(jù)創(chuàng)建的是帶緩存,還是無(wú)緩沖,決定第二個(gè)參數(shù)size的值

可以看出,創(chuàng)建出來(lái)的是hchan指針,這樣就能在函數(shù)間直接傳遞 channel,而不用傳遞 channel 的指針

func makechan(t *chantype, size int) *hchan {
   elem := t.elem
    
   // mem:緩沖區(qū)大小 
   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError( "makechan: size out of range" ))
   }

   var c *hchan
   switch {
   // 緩沖區(qū)大小為空,只申請(qǐng)hchanSize大小的內(nèi)存
   case mem == 0:
       c = (*hchan)(mallocgc(hchanSize, nil, true))
       c.buf = c.raceaddr()
   // 元素類(lèi)型不包含指針,一次性分配hchanSize+mem大小的內(nèi)存
   case elem.ptrdata == 0:
       c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
       c.buf = add(unsafe.Pointer(c), hchanSize)
   // 否則就是帶緩存,且有指針,分配兩次內(nèi)存
   default:
      // Elements contain pointers.
       c = new(hchan)
       c.buf = mallocgc(mem, elem, true)
   }
    
   // 保存元素類(lèi)型,元素大小,容量
   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)
   
   return c
}

發(fā)送

執(zhí)行以下代碼時(shí):

ch <- 3

編譯器會(huì)轉(zhuǎn)化為對(duì)chansend的調(diào)用

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   // 如果channel是空
   if c == nil {
      // 非阻塞,直接返回
      if !block {
         return  false
      }
      // 否則阻塞當(dāng)前協(xié)程
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw( "unreachable" )
   }

   // 非阻塞,沒(méi)有關(guān)閉,且容量滿了,無(wú)法發(fā)送,直接返回
   if !block && c.closed == 0 && full(c) {
      return  false
   }

   // 加鎖
   lock(&c.lock)

   // 如果已經(jīng)關(guān)閉,無(wú)法發(fā)送,直接panic
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError( "send on closed channel" ))
   }

   // 從接收隊(duì)列彈出一個(gè)協(xié)程的包裝結(jié)構(gòu)sudog
   if sg := c.recvq.dequeue(); sg != nil {
      // 如果能彈出,即有等到接收的協(xié)程,說(shuō)明:
      // 該channel要么是無(wú)緩沖,要么緩沖區(qū)為空,不然不可能有協(xié)程在等待
      // 將要發(fā)送的數(shù)據(jù)拷貝到該協(xié)程的接收指針上
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return  true
}

   // 緩沖區(qū)還有空間
   if c.qcount < c.dataqsiz {
      // qp:計(jì)算要發(fā)送到的位置的地址
      qp := chanbuf(c, c.sendx)
      // 將數(shù)據(jù)從ep拷貝到qp
      typedmemmove(c.elemtype, qp, ep)
      // 待發(fā)送位置移動(dòng)
      c.sendx++
      // 由于是數(shù)組模擬隊(duì)列,sendx到頂了需要?dú)w零
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      // 緩沖區(qū)數(shù)量++
      c.qcount++
      unlock(&c.lock)
      return  true
}

   // 往下就是緩沖區(qū)無(wú)數(shù)據(jù),也沒(méi)有等到接收協(xié)程的情況了
   
   // 如果是非阻塞模式,直接返回
   if !block {
      unlock(&c.lock)
      return  false
    }

   // 將當(dāng)前協(xié)程包裝成sudog,阻塞到channel上
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
  
   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   
   // 當(dāng)前協(xié)程進(jìn)入發(fā)送等待隊(duì)列
   c.sendq.enqueue(mysg)
   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
   
 // 被喚醒后從這里開(kāi)始執(zhí)行
   
   KeepAlive(ep)

   if mysg != gp.waiting {
      throw( "G waiting list is corrupted" )
   }
   gp.waiting = nil
   gp.activeStackChans = false
   closed := !mysg.success
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   // 被喚醒后發(fā)現(xiàn)channel關(guān)閉了,panic
   if closed {
      if c.closed == 0 {
         throw( "chansend: spurious wakeup" )
      }
      panic(plainError( "send on closed channel" ))
   }
   return  true
}

整體流程為:

如果當(dāng)前操作為非阻塞,channel沒(méi)有關(guān)閉,且容量滿了,無(wú)法發(fā)送,直接返回

從接收隊(duì)列彈出一個(gè)協(xié)程的包裝結(jié)構(gòu)sudog,如果能彈出,即有等到接收的協(xié)程,說(shuō)明:

  • 該channel要么是無(wú)緩沖,要么緩沖區(qū)為空,不然不可能有協(xié)程在等待

  • 將要發(fā)送的數(shù)據(jù)拷貝到該協(xié)程的接收指針上,返回

  • 這里直接從發(fā)送者拷貝到接收者的內(nèi)存,而不是先把數(shù)據(jù)拷貝到緩沖區(qū),再?gòu)木彌_區(qū)拷貝到接收者,節(jié)約了一次內(nèi)存拷貝

否則看看緩沖區(qū)還有空間,如果有,將數(shù)據(jù)拷貝到緩沖區(qū)上,也返回

接下來(lái)就是既沒(méi)有接收者等待,緩沖區(qū)也為空的情況,就需要將當(dāng)前協(xié)程包裝成sudog,阻塞到channel上

將協(xié)程阻塞到channel的等待隊(duì)列時(shí),將其包裝成了sudog結(jié)構(gòu):

type sudog struct {
   // 協(xié)程
   g *g
   // 前一個(gè),后一個(gè)指針
   next *sudog
   prev *sudog
   // 等到發(fā)送的數(shù)據(jù)在哪,等待從哪個(gè)位置接收數(shù)據(jù)
   elem unsafe.Pointer
   acquiretime int64
   releasetime int64
   ticket      uint32
   isSelect bool
   success bool

   parent   *sudog // semaRoot binary tree
   waitlink *sudog // g.waiting list or semaRoot
   waittail *sudog // semaRoot
   // 在哪個(gè)channel上等待
   c        *hchan // channel
}

其目的是:

  • g本身沒(méi)有存儲(chǔ)前一個(gè),后一個(gè)指針,需要用sudog結(jié)構(gòu)包裝才能加入隊(duì)列

  • elem字段存儲(chǔ)等到發(fā)送的數(shù)據(jù)在哪,等待從哪個(gè)位置接收數(shù)據(jù),用于從數(shù)據(jù)能從協(xié)程到協(xié)程的直接拷貝

來(lái)看看一些子函數(shù):

1.判斷channel是否是滿的

func full(c *hchan) bool {
   // 無(wú)緩沖
   if c.dataqsiz == 0 {
      // 并且沒(méi)有其他協(xié)程在等待
      return c.recvq.first == nil
   }
   // 有緩沖,但容量裝滿了
   return c.qcount == c.dataqsiz
}

2.send方法:

/**
c:要操作的channel
sg:彈出的接收者協(xié)程
ep:要發(fā)送的數(shù)據(jù)在的位置
*/
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   // 如果接收者指針不為空,直接把數(shù)據(jù)從ep拷貝到sg.elem
   if sg.elem != nil {
      sendDirect(c.elemtype, sg, ep)
      sg.elem = nil
   }
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   // 喚醒該接收者協(xié)程
   goready(gp, skip+1)
}

接收

從channel中接收數(shù)據(jù)有幾種寫(xiě)法:

  • 帶不帶ok

  • 接不接收返回值

根據(jù)帶不帶ok,決定用下面哪個(gè)方法

func chanrecv1(c *hchan, elem unsafe.Pointer) {
        chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
        _, received = chanrecv(c, elem, true)
        return
}

根據(jù)接不接收返回值,決定elem是不是nil

最終都會(huì)調(diào)用chanrecv方法:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 如果channel為nil,根據(jù)參數(shù)中是否阻塞來(lái)決定是否阻塞
   if c == nil {
      if !block {
         return
   }
      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      throw( "unreachable" )
   }

   // 非阻塞,并且channel為空
   if !block && empty(c) {
      // 如果還沒(méi)關(guān)閉,直接返回
   if atomic.Load(&c.closed) == 0 {
      return
   }
      // 否則已經(jīng)關(guān)閉,
      // 如果為空,返回該類(lèi)型的零值
   if empty(c) {
     if ep != nil {
        typedmemclr(c.elemtype, ep)
     }
     return  true, false
       }
   }

   lock(&c.lock)
   
   // 同樣,如果channel已經(jīng)關(guān)閉,且緩沖區(qū)沒(méi)有元素,返回該類(lèi)型零值
   if c.closed != 0 && c.qcount == 0 {
      unlock(&c.lock)
      if ep != nil {
         typedmemclr(c.elemtype, ep)
      }
      return  true, false
}
    
   // 如果有發(fā)送者正在阻塞,說(shuō)明:
   // 1.無(wú)緩沖
   // 2.有緩沖,但緩沖區(qū)滿了。因?yàn)橹挥芯彌_區(qū)滿了,才可能有發(fā)送者在等待
   if sg := c.sendq.dequeue(); sg != nil {
      // 將數(shù)據(jù)從緩沖區(qū)拷貝到ep,再將sg的數(shù)據(jù)拷貝到緩沖區(qū),該函數(shù)詳細(xì)流程可看下文
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return  true, true
}
    
   // 如果緩存區(qū)有數(shù)據(jù), 
   if c.qcount > 0 {
      // qp為緩沖區(qū)中下一次接收的位置
      qp := chanbuf(c, c.recvx)
      // 將數(shù)據(jù)從qp拷貝到ep
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return  true, true
}

   // 接下來(lái)就是既沒(méi)有發(fā)送者在等待,也緩沖區(qū)也沒(méi)數(shù)據(jù)
   if !block {
      unlock(&c.lock)
      return  false, false
}

   // 將當(dāng)前協(xié)程包裝成sudog,阻塞到channel中
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // 記錄接收地址
   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.param = nil
   c.recvq.enqueue(mysg)

   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive,        traceEvGoBlockRecv, 2)

   // 從這里喚醒
   if mysg != gp.waiting {
      throw( "G waiting list is corrupted" )
   }
   gp.waiting = nil
   gp.activeStackChans = false
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   success := mysg.success
   gp.param = nil
   mysg.c = nil
   releaseSudog(mysg)
   return  true, success
}

接收流程如為:

如果channel為nil,根據(jù)參數(shù)中是否阻塞來(lái)決定是否阻塞

如果channel已經(jīng)關(guān)閉,且緩沖區(qū)沒(méi)有元素,返回該類(lèi)型零值

如果有發(fā)送者正在阻塞,說(shuō)明:

  • 要么是無(wú)緩沖

  • 有緩沖,但緩沖區(qū)滿了。因?yàn)?strong>只有緩沖區(qū)滿了,才可能有發(fā)送者在等待

  • 將數(shù)據(jù)從緩沖區(qū)拷貝到ep,再將發(fā)送者的數(shù)據(jù)拷貝到緩沖區(qū),并喚該發(fā)送者

如果緩存區(qū)有數(shù)據(jù), 則從緩沖區(qū)將數(shù)據(jù)復(fù)制到ep,返回

接下來(lái)就是既沒(méi)有發(fā)送者在等待,也緩沖區(qū)也沒(méi)數(shù)據(jù)的情況:

將當(dāng)前協(xié)程包裝成sudog,阻塞到channel中

來(lái)看其中的子函數(shù)recv():

/**
c:操作的channel
sg:阻塞的發(fā)送協(xié)程
ep:接收者接收數(shù)據(jù)的地址
*/
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   // 如果是無(wú)緩沖channel,直接將數(shù)據(jù)從發(fā)送者sg拷貝到ep
   if c.dataqsiz == 0 {
      if ep != nil {
         recvDirect(c.elemtype, sg, ep)
      }
   // 接下來(lái)是有緩沖,且緩沖區(qū)滿的情況   
   } else {
      // qp為channel緩沖區(qū)中,接收者下一次接收的地址
   qp := chanbuf(c, c.recvx)
      // 將數(shù)據(jù)從qp拷貝到ep
   if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
    }
    // 將發(fā)送者的數(shù)據(jù)從sg.elem拷貝到qp
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
       c.recvx = 0
    }
    // 由于一接收已發(fā)送,緩沖區(qū)還是滿的,因此 c.sendx = c.recvx
    c.sendx = c.recvx 
}
   sg.elem = nil
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   // 喚醒發(fā)送者
   goready(gp, skip+1)
}

關(guān)閉

func closechan(c *hchan) {
   // 不能關(guān)閉空channel
   if c == nil {
      panic(plainError( "close of nil channel" ))
   }

   lock(&c.lock)
   // 不能重復(fù)關(guān)閉
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError( "close of closed channel" ))
   }

   // 修改關(guān)閉狀態(tài)
   c.closed = 1

   var glist gList

   // 釋放所有的接收者協(xié)程,并為它們賦予零值
 for {
      sg := c.recvq.dequeue()
      if sg == nil {
         break
      }
      if sg.elem != nil {
         typedmemclr(c.elemtype, sg.elem)
         sg.elem = nil
      }
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = unsafe.Pointer(sg)
      sg.success = false
      glist.push(gp)
   }

   // 釋放所有的發(fā)送者協(xié)程
 for {
      sg := c.sendq.dequeue()
      if sg == nil {
         break
     }
      sg.elem = nil
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = unsafe.Pointer(sg)
      sg.success = false
      glist.push(gp)
   }
   unlock(&c.lock)

   // 執(zhí)行喚醒操作
 for !glist.empty() {
      gp := glist.pop()
      gp.schedlink = 0
      goready(gp, 3)
   }
}

關(guān)閉的流程比較簡(jiǎn)單,可以看出:

不能關(guān)閉空channel,不能重復(fù)關(guān)閉channel

先上一把大鎖,接著把所有掛在這個(gè) channel 上的 sender 和 receiver 全都連成一個(gè) sudog 鏈表,再解鎖。最后,再將所有的 sudog 全都喚醒:

接收者:會(huì)收到該類(lèi)型的零值

這里返回零值沒(méi)有問(wèn)題,因?yàn)橹赃@些接收者會(huì)阻塞,就是因?yàn)榫彌_區(qū)沒(méi)有數(shù)據(jù),因此channel關(guān)閉后該接收者收到零值也符合邏輯

發(fā)送者:會(huì)被喚醒,然后panic

因此不能在有多個(gè)sender的時(shí)候貿(mào)然關(guān)閉channel

關(guān)于“Golang channel如何應(yīng)用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí),可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會(huì)為大家更新不同的知識(shí)點(diǎn)。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI