您好,登錄后才能下訂單哦!
這篇“Go連接池設(shè)計與實現(xiàn)的方法是什么”文章的知識點大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細(xì),步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Go連接池設(shè)計與實現(xiàn)的方法是什么”文章吧。
如果不用連接池,而是每次請求都創(chuàng)建一個連接是比較昂貴的,因此需要完成3次tcp握手
同時在高并發(fā)場景下,由于沒有連接池的最大連接數(shù)限制,可以創(chuàng)建無數(shù)個連接,耗盡文件描述符
連接池就是為了復(fù)用這些創(chuàng)建好的連接
基本上連接池都會設(shè)計以下幾個參數(shù):
初始連接數(shù):在初始化連接池時就會預(yù)先創(chuàng)建好的連接數(shù)量,如果設(shè)置得:
過大:可能造成浪費
過小:請求到來時需要新建連接
最大空閑連接數(shù)maxIdle:池中最大緩存的連接個數(shù),如果設(shè)置得:
過大:造成浪費,自己不用還把持著連接。因為數(shù)據(jù)庫整體的連接數(shù)是有限的,當(dāng)前進程占用多了,其他進程能獲取的就少了
過?。簾o法應(yīng)對突發(fā)流量
最大連接數(shù)maxCap:
如果已經(jīng)用了maxCap個連接,要申請第maxCap+1個連接時,一般會阻塞在那里,直到超時或者別人歸還一個連接
最大空閑時間idleTimeout:當(dāng)發(fā)現(xiàn)某連接空閑超過這個時間時,會將其關(guān)閉,重新去獲取連接
避免連接長時間沒用,自動失效的問題
連接池對外提供兩個方法,Get
:獲取一個連接,Put
:歸還一個連接
大部分連接池的實現(xiàn)大同小異,基本流程如下:
需要注意:
當(dāng)有空閑連接時,需要進一步判斷連接是否有過期(超過最大空閑時間idleTimeout)
這些連接有可能很久沒用過了,在數(shù)據(jù)庫層面已經(jīng)過期。如果貿(mào)然使用可能出現(xiàn)錯誤,因此最好檢查下是否超時
當(dāng)陷入阻塞時,最好設(shè)置超時時間,避免一直沒等到有人歸還連接而一直阻塞
歸還連接時:
先看有沒有阻塞的獲取連接的請求,如果有轉(zhuǎn)交連接,并喚醒阻塞請求
否則看能否放回去空閑隊列,如果不能直接關(guān)閉請求
根據(jù)上面總結(jié)的流程,連接池還需要維護另外兩個結(jié)構(gòu):
空閑隊列
阻塞請求的隊列
數(shù)據(jù)結(jié)構(gòu):
// channelPool 存放連接信息 type channelPool struct { mu sync.RWMutex // 空閑連接 conns chan *idleConn // 產(chǎn)生新連接的方法 factory func() (interface{}, error) // 關(guān)閉連接的方法 close func(interface{}) error ping func(interface{}) error // 最大空閑時間,最大阻塞等待時間(實際沒用到) idleTimeout, waitTimeOut time.Duration // 最大連接數(shù) maxActive int openingConns int // 阻塞的請求 connReqs []chan connReq }
可以看出,silenceper/pool
:
用channel實現(xiàn)了空閑連接隊列conns
為每個阻塞的請求創(chuàng)建一個channel,加入connReqs
中。這樣請求會阻塞在自己的channel上
func (c *channelPool) Get() (interface{}, error) { conns := c.getConns() if conns == nil { return nil, ErrClosed } for { select { // 如果有空閑連接 case wrapConn := <-conns: if wrapConn == nil { return nil, ErrClosed } //判斷是否超時,超時則丟棄 if timeout := c.idleTimeout; timeout > 0 { if wrapConn.t.Add(timeout).Before(time.Now()) { //丟棄并關(guān)閉該連接 c.Close(wrapConn.conn) continue } } //判斷是否失效,失效則丟棄,如果用戶沒有設(shè)定 ping 方法,就不檢查 if c.ping != nil { if err := c.Ping(wrapConn.conn); err != nil { c.Close(wrapConn.conn) continue } } return wrapConn.conn, nil // 沒有空閑連接 default: c.mu.Lock() log.Debugf("openConn %v %v", c.openingConns, c.maxActive) if c.openingConns >= c.maxActive { // 連接數(shù)已經(jīng)達(dá)到上線,不能再創(chuàng)建連接 req := make(chan connReq, 1) c.connReqs = append(c.connReqs, req) c.mu.Unlock() // 將自己阻塞在channel上 ret, ok := <-req if !ok { return nil, ErrMaxActiveConnReached } // 再檢查一次是否超時 if timeout := c.idleTimeout; timeout > 0 { if ret.idleConn.t.Add(timeout).Before(time.Now()) { //丟棄并關(guān)閉該連接 c.Close(ret.idleConn.conn) continue } } return ret.idleConn.conn, nil } // 沒有超過最大連接數(shù),創(chuàng)建一個新的連接 if c.factory == nil { c.mu.Unlock() return nil, ErrClosed } conn, err := c.factory() if err != nil { c.mu.Unlock() return nil, err } c.openingConns++ c.mu.Unlock() return conn, nil } } }
這段代碼基本符合上面介紹的Get流程,應(yīng)該很好理解
需要注意:
當(dāng)收到別人歸還的連接狗,這里再檢查了一次是否超時。但我認(rèn)為這次檢查是沒必要的,因為別人剛用完,一般不可能超時
雖然在pool的數(shù)據(jù)結(jié)構(gòu)定義中有waitTimeOut
字段,但實際沒有使用,即阻塞獲取可能無限期阻塞,這是一個優(yōu)化點
// Put 將連接放回pool中 func (c *channelPool) Put(conn interface{}) error { if conn == nil { return errors.New("connection is nil. rejecting") } c.mu.Lock() if c.conns == nil { c.mu.Unlock() return c.Close(conn) } // 如果有請求在阻塞獲取連接 if l := len(c.connReqs); l > 0 { req := c.connReqs[0] copy(c.connReqs, c.connReqs[1:]) c.connReqs = c.connReqs[:l-1] // 將連接轉(zhuǎn)交 req <- connReq{ idleConn: &idleConn{conn: conn, t: time.Now()}, } c.mu.Unlock() return nil } else { // 否則嘗試是否能放回空閑連接隊列 select { case c.conns <- &idleConn{conn: conn, t: time.Now()}: c.mu.Unlock() return nil default: c.mu.Unlock() //連接池已滿,直接關(guān)閉該連接 return c.Close(conn) } } }
值得注意的是:
put方法喚醒阻塞請求時,從隊頭開始喚醒,這樣先阻塞的請求先被喚醒,保證了公平性
Go在官方庫sql中就實現(xiàn)了連接池,這樣的好處在于:
對于開發(fā):就不用像java一樣,需要自己找第三方的連接池實現(xiàn)
對于driver的實現(xiàn):只用關(guān)心怎么和數(shù)據(jù)庫交互,不用考慮連接池的問題
sql.DB
中和連接池相關(guān)的字段如下:
type DB struct { /** ... */ // 空閑連接隊列 freeConn []*driverConn // 阻塞請求的隊列 connRequests map[uint64]chan connRequest // 已經(jīng)打開的連接 numOpen int // number of opened and pending open connections // 最大空閑連接 maxIdle int // zero means defaultMaxIdleConns; negative means 0 // 最大連接數(shù) maxOpen int // <= 0 means unlimited // ... }
繼續(xù)看獲取連接:
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) { // 檢測連接池是否被關(guān)閉 db.mu.Lock() if db.closed { db.mu.Unlock() return nil, errDBClosed } select { default: // 檢測ctx是否超時 case <-ctx.Done(): db.mu.Unlock() return nil, ctx.Err() } lifetime := db.maxLifetime db.numOpen++ // optimistically db.mu.Unlock() ci, err := db.connector.Connect(ctx) if err != nil { db.mu.Lock() db.numOpen-- // correct for earlier optimism db.maybeOpenNewConnections() db.mu.Unlock() return nil, err } db.mu.Lock() dc := &driverConn{ db: db, createdAt: nowFunc(), ci: ci, inUse: true, } db.addDepLocked(dc, dc) db.mu.Unlock() return dc, nil }
接下來檢測是否有空閑連接:
numFree := len(db.freeConn) // 如果有空閑連接 if strategy == cachedOrNewConn && numFree > 0 { // 從隊頭取一個 conn := db.freeConn[0] copy(db.freeConn, db.freeConn[1:]) db.freeConn = db.freeConn[:numFree-1] conn.inUse = true db.mu.Unlock() if conn.expired(lifetime) { conn.Close() return nil, driver.ErrBadConn } // Reset the session if required. if err := conn.resetSession(ctx); err == driver.ErrBadConn { conn.Close() return nil, driver.ErrBadConn } return conn, nil }
以上代碼是1.14版本,但是到了1.18以后,獲取空閑連接的方式發(fā)生了變化:
last := len(db.freeConn) - 1 if strategy == cachedOrNewConn && last >= 0 { // 從最后一個位置獲取連接 conn := db.freeConn[last] db.freeConn = db.freeConn[:last] conn.inUse = true if conn.expired(lifetime) { db.maxLifetimeClosed++ db.mu.Unlock() conn.Close() return nil, driver.ErrBadConn }
可以看出,1.14版本從隊首獲取,1.18改成從隊尾獲取連接
為啥從隊尾拿連接?
因為隊尾的連接是才放進去的,該連接過期的概率比隊首連接小
繼續(xù)看:
// 如果已經(jīng)達(dá)到最大連接數(shù) if db.maxOpen > 0 && db.numOpen >= db.maxOpen { req := make(chan connRequest, 1) reqKey := db.nextRequestKeyLocked() db.connRequests[reqKey] = req db.waitCount++ db.mu.Unlock() waitStart := time.Now() // 阻塞當(dāng)前請求,要么ctx超時,要么別人歸還了連接 select { case <-ctx.Done(): db.mu.Lock() // 把自己從阻塞隊列中刪除 delete(db.connRequests, reqKey) db.mu.Unlock() atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) select { default: case ret, ok := <-req: if ok && ret.conn != nil { db.putConn(ret.conn, ret.err, false) } } return nil, ctx.Err() case ret, ok := <-req: // 別人歸還連接 atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) if !ok { return nil, errDBClosed } if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) { ret.conn.Close() return nil, driver.ErrBadConn } if ret.conn == nil { return nil, ret.err } return ret.conn, ret.err } }
這里需要注意,在ctx超時分支中:
首先把自己從阻塞隊列中刪除
再檢查一下req中是否有連接,如果有,將連接放回連接池
奇怪的是為啥把自己刪除后,req
還可能收到連接呢?
因為put
連接時,會先拿出一個阻塞連接的req,如果這里刪除req在put拿出req:
之前:那沒問題,put不可能再放該req發(fā)送連接
之后:那有可能put往該req發(fā)送了連接,因此需要再檢查下req中是否有連接,如果有歸還
也解釋了為啥阻塞隊列要用map
:
用于快速找到自己的req,并刪除
最后看看put:
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool { if db.closed { return false } if db.maxOpen > 0 && db.numOpen > db.maxOpen { return false } // 有阻塞的請求,轉(zhuǎn)移連接 if c := len(db.connRequests); c > 0 { var req chan connRequest var reqKey uint64 for reqKey, req = range db.connRequests { break } delete(db.connRequests, reqKey) // Remove from pending requests. if err == nil { dc.inUse = true } req <- connRequest{ conn: dc, err: err, } return true // 判斷能否放回空閑隊列 } else if err == nil && !db.closed { if db.maxIdleConnsLocked() > len(db.freeConn) { db.freeConn = append(db.freeConn, dc) db.startCleanerLocked() return true } db.maxIdleClosed++ } return false }
以上就是關(guān)于“Go連接池設(shè)計與實現(xiàn)的方法是什么”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對大家有幫助,若想了解更多相關(guān)的知識內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。