溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Go連接池設(shè)計與實現(xiàn)的方法是什么

發(fā)布時間:2023-05-08 14:49:04 來源:億速云 閱讀:211 作者:zzz 欄目:開發(fā)技術(shù)

這篇“Go連接池設(shè)計與實現(xiàn)的方法是什么”文章的知識點大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細(xì),步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Go連接池設(shè)計與實現(xiàn)的方法是什么”文章吧。

為什么需要連接池

如果不用連接池,而是每次請求都創(chuàng)建一個連接是比較昂貴的,因此需要完成3次tcp握手

同時在高并發(fā)場景下,由于沒有連接池的最大連接數(shù)限制,可以創(chuàng)建無數(shù)個連接,耗盡文件描述符

連接池就是為了復(fù)用這些創(chuàng)建好的連接

連接池設(shè)計

基本上連接池都會設(shè)計以下幾個參數(shù):

初始連接數(shù):在初始化連接池時就會預(yù)先創(chuàng)建好的連接數(shù)量,如果設(shè)置得:

  • 過大:可能造成浪費

  • 過小:請求到來時需要新建連接

最大空閑連接數(shù)maxIdle:池中最大緩存的連接個數(shù),如果設(shè)置得:

  • 過大:造成浪費,自己不用還把持著連接。因為數(shù)據(jù)庫整體的連接數(shù)是有限的,當(dāng)前進程占用多了,其他進程能獲取的就少了

  • 過?。簾o法應(yīng)對突發(fā)流量

最大連接數(shù)maxCap

  • 如果已經(jīng)用了maxCap個連接,要申請第maxCap+1個連接時,一般會阻塞在那里,直到超時或者別人歸還一個連接

最大空閑時間idleTimeout:當(dāng)發(fā)現(xiàn)某連接空閑超過這個時間時,會將其關(guān)閉,重新去獲取連接

避免連接長時間沒用,自動失效的問題

連接池對外提供兩個方法,Get:獲取一個連接,Put:歸還一個連接

大部分連接池的實現(xiàn)大同小異,基本流程如下:

Get

Go連接池設(shè)計與實現(xiàn)的方法是什么

需要注意:

  • 當(dāng)有空閑連接時,需要進一步判斷連接是否有過期(超過最大空閑時間idleTimeout)

    • 這些連接有可能很久沒用過了,在數(shù)據(jù)庫層面已經(jīng)過期。如果貿(mào)然使用可能出現(xiàn)錯誤,因此最好檢查下是否超時

  • 當(dāng)陷入阻塞時,最好設(shè)置超時時間,避免一直沒等到有人歸還連接而一直阻塞

Put

Go連接池設(shè)計與實現(xiàn)的方法是什么

歸還連接時:

  • 先看有沒有阻塞的獲取連接的請求,如果有轉(zhuǎn)交連接,并喚醒阻塞請求

  • 否則看能否放回去空閑隊列,如果不能直接關(guān)閉請求

總結(jié)

根據(jù)上面總結(jié)的流程,連接池還需要維護另外兩個結(jié)構(gòu):

  • 空閑隊列

  • 阻塞請求的隊列

Go連接池設(shè)計與實現(xiàn)的方法是什么

開源實現(xiàn)

數(shù)據(jù)結(jié)構(gòu):

// channelPool 存放連接信息
type channelPool struct {
   mu                       sync.RWMutex
   // 空閑連接
   conns                    chan *idleConn
   // 產(chǎn)生新連接的方法
   factory                  func() (interface{}, error)
   // 關(guān)閉連接的方法
   close                    func(interface{}) error
   ping                     func(interface{}) error
   // 最大空閑時間,最大阻塞等待時間(實際沒用到)
   idleTimeout, waitTimeOut time.Duration
   // 最大連接數(shù)
   maxActive                int
   openingConns             int
   // 阻塞的請求
   connReqs                 []chan connReq
}

可以看出,silenceper/pool

  • 用channel實現(xiàn)了空閑連接隊列conns

  • 為每個阻塞的請求創(chuàng)建一個channel,加入connReqs中。這樣請求會阻塞在自己的channel上

Get:

func (c *channelPool) Get() (interface{}, error) {
   conns := c.getConns()
   if conns == nil {
      return nil, ErrClosed
   }
   for {
      select {
      // 如果有空閑連接
      case wrapConn := <-conns:
         if wrapConn == nil {
            return nil, ErrClosed
         }
         //判斷是否超時,超時則丟棄
         if timeout := c.idleTimeout; timeout > 0 {
            if wrapConn.t.Add(timeout).Before(time.Now()) {
               //丟棄并關(guān)閉該連接
               c.Close(wrapConn.conn)
               continue
            }
         }
         //判斷是否失效,失效則丟棄,如果用戶沒有設(shè)定 ping 方法,就不檢查
         if c.ping != nil {
            if err := c.Ping(wrapConn.conn); err != nil {
               c.Close(wrapConn.conn)
               continue
            }
         }
         return wrapConn.conn, nil
      // 沒有空閑連接
      default:
         c.mu.Lock()
         log.Debugf("openConn %v %v", c.openingConns, c.maxActive)
         if c.openingConns >= c.maxActive {
            // 連接數(shù)已經(jīng)達(dá)到上線,不能再創(chuàng)建連接
            req := make(chan connReq, 1)
            c.connReqs = append(c.connReqs, req)
            c.mu.Unlock()
            // 將自己阻塞在channel上
            ret, ok := <-req
            if !ok {
               return nil, ErrMaxActiveConnReached
            }
            // 再檢查一次是否超時
            if timeout := c.idleTimeout; timeout > 0 {
               if ret.idleConn.t.Add(timeout).Before(time.Now()) {
                  //丟棄并關(guān)閉該連接
                  c.Close(ret.idleConn.conn)
                  continue
               }
            }
            return ret.idleConn.conn, nil
         }
         
         // 沒有超過最大連接數(shù),創(chuàng)建一個新的連接
         if c.factory == nil {
            c.mu.Unlock()
            return nil, ErrClosed
         }
         conn, err := c.factory()
         if err != nil {
            c.mu.Unlock()
            return nil, err
         }
         c.openingConns++
         c.mu.Unlock()
         return conn, nil
      }
   }
}

這段代碼基本符合上面介紹的Get流程,應(yīng)該很好理解

需要注意:

  • 當(dāng)收到別人歸還的連接狗,這里再檢查了一次是否超時。但我認(rèn)為這次檢查是沒必要的,因為別人剛用完,一般不可能超時

  • 雖然在pool的數(shù)據(jù)結(jié)構(gòu)定義中有waitTimeOut字段,但實際沒有使用,即阻塞獲取可能無限期阻塞,這是一個優(yōu)化點

Put:

// Put 將連接放回pool中
func (c *channelPool) Put(conn interface{}) error {
   if conn == nil {
      return errors.New("connection is nil. rejecting")
   }

   c.mu.Lock()

   if c.conns == nil {
      c.mu.Unlock()
      return c.Close(conn)
   }

   // 如果有請求在阻塞獲取連接
   if l := len(c.connReqs); l > 0 {
      req := c.connReqs[0]
      copy(c.connReqs, c.connReqs[1:])
      c.connReqs = c.connReqs[:l-1]
      // 將連接轉(zhuǎn)交
      req <- connReq{
         idleConn: &idleConn{conn: conn, t: time.Now()},
      }
      c.mu.Unlock()
      return nil
   } else {
      // 否則嘗試是否能放回空閑連接隊列
      select {
      case c.conns <- &idleConn{conn: conn, t: time.Now()}:
         c.mu.Unlock()
         return nil
      default:
         c.mu.Unlock()
         //連接池已滿,直接關(guān)閉該連接
         return c.Close(conn)
      }
   }
}

值得注意的是:

put方法喚醒阻塞請求時,從隊頭開始喚醒,這樣先阻塞的請求先被喚醒,保證了公平性

sql.DB

Go在官方庫sql中就實現(xiàn)了連接池,這樣的好處在于:

  • 對于開發(fā):就不用像java一樣,需要自己找第三方的連接池實現(xiàn)

  • 對于driver的實現(xiàn):只用關(guān)心怎么和數(shù)據(jù)庫交互,不用考慮連接池的問題

sql.DB中和連接池相關(guān)的字段如下:

type DB struct {
   /**
   ...
   */
   
   // 空閑連接隊列
   freeConn     []*driverConn
   // 阻塞請求的隊列
   connRequests map[uint64]chan connRequest
   
   // 已經(jīng)打開的連接
   numOpen      int    // number of opened and pending open connections
   // 最大空閑連接
   maxIdle           int                    // zero means defaultMaxIdleConns; negative means 0
   // 最大連接數(shù)
   maxOpen           int                    // <= 0 means unlimited
   // ...
}

繼續(xù)看獲取連接:

func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
   // 檢測連接池是否被關(guān)閉
   db.mu.Lock()
   if db.closed {
      db.mu.Unlock()
      return nil, errDBClosed
   }

   select {
   default:
   // 檢測ctx是否超時
   case <-ctx.Done():
      db.mu.Unlock()
      return nil, ctx.Err()
   }
   lifetime := db.maxLifetime

   
   
   db.numOpen++ // optimistically
   db.mu.Unlock()
   ci, err := db.connector.Connect(ctx)
   if err != nil {
      db.mu.Lock()
      db.numOpen-- // correct for earlier optimism
      db.maybeOpenNewConnections()
      db.mu.Unlock()
      return nil, err
   }
   db.mu.Lock()
   dc := &driverConn{
      db:        db,
      createdAt: nowFunc(),
      ci:        ci,
      inUse:     true,
   }
   db.addDepLocked(dc, dc)
   db.mu.Unlock()
   return dc, nil
}

接下來檢測是否有空閑連接:

  numFree := len(db.freeConn)
   // 如果有空閑連接
   if strategy == cachedOrNewConn && numFree > 0 {
      // 從隊頭取一個
      conn := db.freeConn[0]
      copy(db.freeConn, db.freeConn[1:])
      db.freeConn = db.freeConn[:numFree-1]
      conn.inUse = true
      db.mu.Unlock()
      if conn.expired(lifetime) {
         conn.Close()
         return nil, driver.ErrBadConn
      }

      // Reset the session if required.
      if err := conn.resetSession(ctx); err == driver.ErrBadConn {
         conn.Close()
         return nil, driver.ErrBadConn
      }

      return conn, nil
   }

以上代碼是1.14版本,但是到了1.18以后,獲取空閑連接的方式發(fā)生了變化:

last := len(db.freeConn) - 1
if strategy == cachedOrNewConn && last >= 0 {
   // 從最后一個位置獲取連接
   conn := db.freeConn[last]
   db.freeConn = db.freeConn[:last]
   conn.inUse = true
   if conn.expired(lifetime) {
      db.maxLifetimeClosed++
      db.mu.Unlock()
      conn.Close()
      return nil, driver.ErrBadConn
   }

可以看出,1.14版本從隊首獲取,1.18改成從隊尾獲取連接

為啥從隊尾拿連接?

因為隊尾的連接是才放進去的,該連接過期概率比隊首連接

繼續(xù)看:

   // 如果已經(jīng)達(dá)到最大連接數(shù)
   if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
      req := make(chan connRequest, 1)
      reqKey := db.nextRequestKeyLocked()
      db.connRequests[reqKey] = req
      db.waitCount++
      db.mu.Unlock()

      waitStart := time.Now()
      // 阻塞當(dāng)前請求,要么ctx超時,要么別人歸還了連接
      select {
      case <-ctx.Done():
         db.mu.Lock()
         // 把自己從阻塞隊列中刪除
         delete(db.connRequests, reqKey)
         db.mu.Unlock()

         atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

         select {
         default:
         case ret, ok := <-req:
            if ok && ret.conn != nil {
               db.putConn(ret.conn, ret.err, false)
            }
         }
         return nil, ctx.Err()
      case ret, ok := <-req:
         // 別人歸還連接
         atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

         if !ok {
            return nil, errDBClosed
         }
         if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
            ret.conn.Close()
            return nil, driver.ErrBadConn
         }
         if ret.conn == nil {
            return nil, ret.err
         }

         return ret.conn, ret.err
      }
   }

這里需要注意,在ctx超時分支中:

  • 首先把自己從阻塞隊列中刪除

  • 再檢查一下req中是否有連接,如果有,將連接放回連接池

奇怪的是為啥把自己刪除后,req還可能收到連接呢?

因為put連接時,會先拿出一個阻塞連接的req,如果這里刪除req在put拿出req:

  • 之前:那沒問題,put不可能再放該req發(fā)送連接

  • 之后:那有可能put往該req發(fā)送了連接,因此需要再檢查下req中是否有連接,如果有歸還

也解釋了為啥阻塞隊列要用map

  • 用于快速找到自己的req,并刪除

最后看看put:

func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
   if db.closed {
      return false
   }
   if db.maxOpen > 0 && db.numOpen > db.maxOpen {
      return false
   }
   
   // 有阻塞的請求,轉(zhuǎn)移連接
   if c := len(db.connRequests); c > 0 {
      var req chan connRequest
      var reqKey uint64
      for reqKey, req = range db.connRequests {
         break
      }
      delete(db.connRequests, reqKey) // Remove from pending requests.
      if err == nil {
         dc.inUse = true
      }
      req <- connRequest{
         conn: dc,
         err:  err,
      }
      return true
      
      
   // 判斷能否放回空閑隊列   
   } else if err == nil && !db.closed {
      if db.maxIdleConnsLocked() > len(db.freeConn) {
         db.freeConn = append(db.freeConn, dc)
         db.startCleanerLocked()
         return true
      }
      db.maxIdleClosed++
   }
   return false
}

以上就是關(guān)于“Go連接池設(shè)計與實現(xiàn)的方法是什么”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對大家有幫助,若想了解更多相關(guān)的知識內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

go
AI