您好,登錄后才能下訂單哦!
這篇文章主要介紹了Golang如何實現(xiàn)連接池,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
因為TCP的三只握手等等原因,建立一個連接是一件成本比較高的行為。所以在一個需要多次與特定實體交互的程序中,就需要維持一個連接池,里面有可以復(fù)用的連接可供重復(fù)使用。
而維持一個連接池,最基本的要求就是要做到:thread safe(線程安全),尤其是在Golang這種特性是goroutine的語言中。
type Pool struct { m sync.Mutex // 保證多個goroutine訪問時候,closed的線程安全 res chan io.Closer //連接存儲的chan factory func() (io.Closer,error) //新建連接的工廠方法 closed bool //連接池關(guān)閉標(biāo)志 }
這個簡單的連接池,我們利用chan來存儲池里的連接。而新建結(jié)構(gòu)體的方法也比較簡單:
func New(fn func() (io.Closer, error), size uint) (*Pool, error) { if size <= 0 { return nil, errors.New("size的值太小了。") } return &Pool{ factory: fn, res: make(chan io.Closer, size), }, nil }
只需要提供對應(yīng)的工廠函數(shù)和連接池的大小就可以了。
獲取連接
那么我們要怎么從中獲取資源呢?因為我們內(nèi)部存儲連接的結(jié)構(gòu)是chan,所以只需要簡單的select就可以保證線程安全:
//從資源池里獲取一個資源 func (p *Pool) Acquire() (io.Closer,error) { select { case r,ok := <-p.res: log.Println("Acquire:共享資源") if !ok { return nil,ErrPoolClosed } return r,nil default: log.Println("Acquire:新生成資源") return p.factory() } }
我們先從連接池的res這個chan里面獲取,如果沒有的話我們就利用我們早已經(jīng)準(zhǔn)備好的工廠函數(shù)進(jìn)行構(gòu)造連接。同時我們在從res獲取連接的時候利用ok先確定了這個連接池是否已經(jīng)關(guān)閉。如果已經(jīng)關(guān)閉的話我們就返回早已經(jīng)準(zhǔn)備好的連接已關(guān)閉錯誤。
關(guān)閉連接池
那么既然提到關(guān)閉連接池,我們是怎么樣關(guān)閉連接池的呢?
//關(guān)閉資源池,釋放資源 func (p *Pool) Close() { p.m.Lock() defer p.m.Unlock() if p.closed { return } p.closed = true //關(guān)閉通道,不讓寫入了 close(p.res) //關(guān)閉通道里的資源 for r:=range p.res { r.Close() } }
這邊我們需要先進(jìn)行p.m.Lock()上鎖操作,這么做是因為我們需要對結(jié)構(gòu)體里面的closed進(jìn)行讀寫。需要先把這個標(biāo)志位設(shè)定后,關(guān)閉res這個chan,使得Acquire方法無法再獲取新的連接。我們再對res這個chan里面的連接進(jìn)行Close操作。
釋放連接
釋放連接首先得有個前提,就是連接池還沒有關(guān)閉。如果連接池已經(jīng)關(guān)閉再往res里面送連接的話就好觸發(fā)panic。
func (p *Pool) Release(r io.Closer){ //保證該操作和Close方法的操作是安全的 p.m.Lock() defer p.m.Unlock() //資源池都關(guān)閉了,就省這一個沒有釋放的資源了,釋放即可 if p.closed { r.Close() return } select { case p.res <- r: log.Println("資源釋放到池子里了") default: log.Println("資源池滿了,釋放這個資源吧") r.Close() } }
以上就是一個簡單且線程安全的連接池實現(xiàn)方式了。我們可以看到的是,現(xiàn)在連接池雖然已經(jīng)實現(xiàn)了,但是還有幾個小缺點:
我們對連接最大的數(shù)量沒有限制,如果線程池空的話都我們默認(rèn)就直接新建一個連接返回了。一旦并發(fā)量高的話將會不斷新建連接,很容易(尤其是MySQL)造成too many connections的報錯發(fā)生。
既然我們需要保證最大可獲取連接數(shù)量,那么我們就不希望數(shù)量定的太死。希望空閑的時候可以維護(hù)一定的空閑連接數(shù)量idleNum,但是又希望我們能限制最大可獲取連接數(shù)量maxNum。
第一種情況是并發(fā)過多的情況,那么如果并發(fā)量過少呢?現(xiàn)在我們在新建一個連接并且歸還后,我們很長一段時間不再使用這個連接。那么這個連接很有可能在幾個小時甚至更長時間之前就已經(jīng)建立的了。長時間閑置的連接我們并沒有辦法保證它的可用性。便有可能我們下次獲取的連接是已經(jīng)失效的連接。
那么我們可以從已經(jīng)成熟使用的MySQL連接池庫和Redis連接池庫中看看,它們是怎么解決這些問題的。
Golang的連接池實現(xiàn)在標(biāo)準(zhǔn)庫database/sql/sql.go下。當(dāng)我們運(yùn)行:
db, err := sql.Open("mysql", "xxxx")
的時候,就會打開一個連接池。我們可以看看返回的db的結(jié)構(gòu)體:
type DB struct { waitDuration int64 // Total time waited for new connections. mu sync.Mutex // protects following fields freeConn []*driverConn connRequests map[uint64]chan connRequest nextRequest uint64 // Next key to use in connRequests. numOpen int // number of opened and pending open connections // Used to signal the need for new connections // a goroutine running connectionOpener() reads on this chan and // maybeOpenNewConnections sends on the chan (one send per needed connection) // It is closed during db.Close(). The close tells the connectionOpener // goroutine to exit. openerCh chan struct{} closed bool maxIdle int // zero means defaultMaxIdleConns; negative means 0 maxOpen int // <= 0 means unlimited maxLifetime time.Duration // maximum amount of time a connection may be reused cleanerCh chan struct{} waitCount int64 // Total number of connections waited for. maxIdleClosed int64 // Total number of connections closed due to idle. maxLifetimeClosed int64 // Total number of connections closed due to max free limit. }
上面省去了一些暫時不需要關(guān)注的field。我們可以看的,DB這個連接池內(nèi)部存儲連接的結(jié)構(gòu)freeConn,并不是我們之前使用的chan,而是**[]driverConn**,一個連接切片。同時我們還可以看到,里面有maxIdle等相關(guān)變量來控制空閑連接數(shù)量。值得注意的是,DB的初始化函數(shù)Open函數(shù)并沒有新建數(shù)據(jù)庫連接。而新建連接在哪個函數(shù)呢?我們可以在Query方法一路往回找,我們可以看到這個函數(shù):func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error)。而我們從連接池獲取連接的方法,就從這里開始:
獲取連接
// conn returns a newly-opened or cached *driverConn. func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) { // 先判斷db是否已經(jīng)關(guān)閉。 db.mu.Lock() if db.closed { db.mu.Unlock() return nil, errDBClosed } // 注意檢測context是否已經(jīng)被超時等原因被取消。 select { default: case <-ctx.Done(): db.mu.Unlock() return nil, ctx.Err() } lifetime := db.maxLifetime // 這邊如果在freeConn這個切片有空閑連接的話,就left pop一個出列。注意的是,這邊因為是切片操作,所以需要前面需要加鎖且獲取后進(jìn)行解鎖操作。同時判斷返回的連接是否已經(jīng)過期。 numFree := len(db.freeConn) if strategy == cachedOrNewConn && numFree > 0 { conn := db.freeConn[0] copy(db.freeConn, db.freeConn[1:]) db.freeConn = db.freeConn[:numFree-1] conn.inUse = true db.mu.Unlock() if conn.expired(lifetime) { conn.Close() return nil, driver.ErrBadConn } // Lock around reading lastErr to ensure the session resetter finished. conn.Lock() err := conn.lastErr conn.Unlock() if err == driver.ErrBadConn { conn.Close() return nil, driver.ErrBadConn } return conn, nil } // 這邊就是等候獲取連接的重點了。當(dāng)空閑的連接為空的時候,這邊將會新建一個request(的等待連接 的請求)并且開始等待 if db.maxOpen > 0 && db.numOpen >= db.maxOpen { // 下面的動作相當(dāng)于往connRequests這個map插入自己的號碼牌。 // 插入號碼牌之后這邊就不需要阻塞等待繼續(xù)往下走邏輯。 req := make(chan connRequest, 1) reqKey := db.nextRequestKeyLocked() db.connRequests[reqKey] = req db.waitCount++ db.mu.Unlock() waitStart := time.Now() // Timeout the connection request with the context. select { case <-ctx.Done(): // context取消操作的時候,記得從connRequests這個map取走自己的號碼牌。 db.mu.Lock() delete(db.connRequests, reqKey) db.mu.Unlock() atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) select { default: case ret, ok := <-req: // 這邊值得注意了,因為現(xiàn)在已經(jīng)被context取消了。但是剛剛放了自己的號碼牌進(jìn)去排隊里面。意思是說不定已經(jīng)發(fā)了連接了,所以得注意歸還! if ok && ret.conn != nil { db.putConn(ret.conn, ret.err, false) } } return nil, ctx.Err() case ret, ok := <-req: // 下面是已經(jīng)獲得連接后的操作了。檢測一下獲得連接的狀況。因為有可能已經(jīng)過期了等等。 atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) if !ok { return nil, errDBClosed } if ret.err == nil && ret.conn.expired(lifetime) { ret.conn.Close() return nil, driver.ErrBadConn } if ret.conn == nil { return nil, ret.err } ret.conn.Lock() err := ret.conn.lastErr ret.conn.Unlock() if err == driver.ErrBadConn { ret.conn.Close() return nil, driver.ErrBadConn } return ret.conn, ret.err } } // 下面就是如果上面說的限制情況不存在,可以創(chuàng)建先連接時候,要做的創(chuàng)建連接操作了。 db.numOpen++ // optimistically db.mu.Unlock() ci, err := db.connector.Connect(ctx) if err != nil { db.mu.Lock() db.numOpen-- // correct for earlier optimism db.maybeOpenNewConnections() db.mu.Unlock() return nil, err } db.mu.Lock() dc := &driverConn{ db: db, createdAt: nowFunc(), ci: ci, inUse: true, } db.addDepLocked(dc, dc) db.mu.Unlock() return dc, nil }
簡單來說,DB結(jié)構(gòu)體除了用的是slice來存儲連接,還加了一個類似排隊機(jī)制的connRequests來解決獲取等待連接的過程。同時在判斷連接健康性都有很好的兼顧。那么既然有了排隊機(jī)制,歸還連接的時候是怎么做的呢?
釋放連接
我們可以直接找到func (db *DB) putConnDBLocked(dc *driverConn, err error) bool這個方法。就像注釋說的,這個方法主要的目的是:
Satisfy a connRequest or put the driverConn in the idle pool and return true or return false.
我們主要來看看里面重點那幾行:
... // 如果已經(jīng)超過最大打開數(shù)量了,就不需要在回歸pool了 if db.maxOpen > 0 && db.numOpen > db.maxOpen { return false } // 這邊是重點了,基本來說就是從connRequest這個map里面隨機(jī)抽一個在排隊等著的請求。取出來后發(fā)給他。就不用歸還池子了。 if c := len(db.connRequests); c > 0 { var req chan connRequest var reqKey uint64 for reqKey, req = range db.connRequests { break } delete(db.connRequests, reqKey) // 刪除這個在排隊的請求。 if err == nil { dc.inUse = true } // 把連接給這個正在排隊的連接。 req <- connRequest{ conn: dc, err: err, } return true } else if err == nil && !db.closed { // 既然沒人排隊,就看看到了最大連接數(shù)目沒有。沒到就歸還給freeConn。 if db.maxIdleConnsLocked() > len(db.freeConn) { db.freeConn = append(db.freeConn, dc) db.startCleanerLocked() return true } db.maxIdleClosed++ } ...
我們可以看到,當(dāng)歸還連接時候,如果有在排隊輪候的請求就不歸還給池子直接發(fā)給在輪候的人了。
現(xiàn)在基本就解決前面說的小問題了。不會出現(xiàn)連接太多導(dǎo)致無法控制too many connections的情況。也很好了維持了連接池的最小數(shù)量。同時也做了相關(guān)對于連接健康性的檢查操作。
值得注意的是,作為標(biāo)準(zhǔn)庫的代碼,相關(guān)注釋和代碼都非常完美,真的可以看的神清氣爽。
這個Golang實現(xiàn)的Redis客戶端,是怎么實現(xiàn)連接池的。這邊的思路非常奇妙,還是能學(xué)習(xí)到不少好思路。當(dāng)然了,由于代碼注釋比較少,啃起來第一下還是有點迷糊的。相關(guān)代碼地址在https://github.com/go-redis/redis/blob/master/internal/pool/pool.go 可以看到。
而它的連接池結(jié)構(gòu)如下
type ConnPool struct { ... queue chan struct{} connsMu sync.Mutex conns []*Conn idleConns []*Conn poolSize int idleConnsLen int stats Stats _closed uint32 // atomic closedCh chan struct{} }
我們可以看到里面存儲連接的結(jié)構(gòu)還是slice。但是我們可以重點看看queue,conns,idleConns這幾個變量,后面會提及到。但是值得注意的是!我們可以看到,這里有兩個**[]Conn**結(jié)構(gòu):conns、idleConns,那么問題來了:
到底連接存在哪里?
新建連接池連接
我們先從新建連接池連接開始看:
func NewConnPool(opt *Options) *ConnPool { .... p.checkMinIdleConns() if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { go p.reaper(opt.IdleCheckFrequency) } .... }
初始化連接池的函數(shù)有個和前面兩個不同的地方。
checkMinIdleConns方法,在連接池初始化的時候就會往連接池填滿空閑的連接。
go p.reaper(opt.IdleCheckFrequency)則會在初始化連接池的時候就會起一個go程,周期性的淘汰連接池里面要被淘汰的連接。
獲取連接
func (p *ConnPool) Get(ctx context.Context) (*Conn, error) { if p.closed() { return nil, ErrClosed } //這邊和前面sql獲取連接函數(shù)的流程先不同。sql是先看看連接池有沒有空閑連接,有的話先獲取不到再排隊。這邊是直接先排隊獲取令牌,排隊函數(shù)后面會分析。 err := p.waitTurn(ctx) if err != nil { return nil, err } //前面沒出error的話,就已經(jīng)排隊輪候到了。接下來就是獲取的流程。 for { p.connsMu.Lock() //從空閑連接里面先獲取一個空閑連接。 cn := p.popIdle() p.connsMu.Unlock() if cn == nil { // 沒有空閑連接時候直接跳出循環(huán)。 break } // 判斷是否已經(jīng)過時,是的話close掉了然后繼續(xù)取出。 if p.isStaleConn(cn) { _ = p.CloseConn(cn) continue } atomic.AddUint32(&p.stats.Hits, 1) return cn, nil } atomic.AddUint32(&p.stats.Misses, 1) // 如果沒有空閑連接的話,這邊就直接新建連接了。 newcn, err := p.newConn(ctx, true) if err != nil { // 歸還令牌。 p.freeTurn() return nil, err } return newcn, nil }
我們可以試著回答開頭那個問題:連接到底存在哪里?答案是從cn := p.popIdle()這句話可以看出,獲取連接這個動作,是從idleConns里面獲取的,而里面的函數(shù)也證明了這一點。但是,真的是這樣的嘛?我們后面再看看。
同時我的理解是:
sql的排隊意味著我對連接池申請連接后,把自己的編號告訴連接池。連接那邊一看到有空閑了,就叫我的號。我答應(yīng)了一聲,然后連接池就直接給個連接給我。我如果不歸還,連接池就一直不叫下一個號。
redis這邊的意思是,我去和連接池申請的不是連接而是令牌。我就一直排隊等著,連接池給我令牌了,我才去倉庫里面找空閑連接或者自己新建一個連接。用完了連接除了歸還連接外,還得歸還令牌。當(dāng)然了,如果我自己新建連接出錯了,我哪怕拿不到連接回家,我也得把令牌給回連接池,不然連接池的令牌數(shù)少了,最大連接數(shù)也會變小。
而:
func (p *ConnPool) freeTurn() { <-p.queue } func (p *ConnPool) waitTurn(ctx context.Context) error { ... case p.queue <- struct{}{}: return nil ... }
就是在靠queue這個chan來維持令牌數(shù)量。
那么conns的作用是什么呢?我們可以來看看新建連接這個函數(shù):
新建連接
func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) { cn, err := p.dialConn(ctx, pooled) if err != nil { return nil, err } p.connsMu.Lock() p.conns = append(p.conns, cn) if pooled { // 如果連接池滿了,會在后面移除。 if p.poolSize >= p.opt.PoolSize { cn.pooled = false } else { p.poolSize++ } } p.connsMu.Unlock() return cn, nil }
基本邏輯出來了。就是如果新建連接的話,我并不會直接放在idleConns里面,而是先放conns里面。同時先看池子滿了沒有。滿的話后面歸還的時候會標(biāo)記,后面會刪除。那么這個后面會刪除,指的是什么時候呢?那就是下面說的歸還連接的時候了。
歸還連接
func (p *ConnPool) Put(cn *Conn) { if cn.rd.Buffered() > 0 { internal.Logger.Printf("Conn has unread data") p.Remove(cn, BadConnError{}) return } //這就是我們剛剛說的后面了,前面標(biāo)記過不要入池的,這邊就刪除了。當(dāng)然了,里面也會進(jìn)行freeTurn操作。 if !cn.pooled { // 這個方法就是前面的標(biāo)志位,判斷里面可以知道,前面標(biāo)志不要池化的,這里會將它刪除。 p.Remove(cn, nil) return } p.connsMu.Lock() p.idleConns = append(p.idleConns, cn) p.idleConnsLen++ p.connsMu.Unlock() //我們可以看到很明顯的這個歸還號碼牌的動作。 p.freeTurn() }
答案就是,所有的連接其實是存放在conns這個切片里面。如果這個連接是空閑等待的狀態(tài)的話,那就在idleConns里面加一個自己的指針!
其實歸還的過程,就是檢查一下我打算還的這個連接,是不是超售的產(chǎn)物,如果是就沒必要池化了,直接刪除就可以了。不是的話,就是把連接自身(一個指針)在idleConns也append一下。
等等,上面的邏輯似乎有點不對?我們來理一下獲取連接流程:
先waitTurn,拿到令牌。而令牌數(shù)量是根據(jù)pool里面的queue決定的。
拿到令牌了,去庫房idleConns里面拿空閑的連接。沒有的話就自己newConn一個,并且把他記錄到conns里面。
用完了,就調(diào)用put歸還:也就是從conns添加這個連接的指針到idleConns。歸還的時候就檢查在newConn時候是不是已經(jīng)做了超賣標(biāo)記了。是的話就不轉(zhuǎn)移到idleConns。
我當(dāng)時疑惑了好久,既然始終都需要獲得令牌才能得到連接,令牌數(shù)量是定的。為什么還會超賣呢?翻了一下源碼,我的答案是:
雖然Get方法獲取連接是newConn這個私用方法,受到令牌管制導(dǎo)致不會出現(xiàn)超賣。但是這個方法接受傳參:pooled bool。所以我猜是擔(dān)心其他人調(diào)用這個方法時候,不管三七二十一就傳了true,導(dǎo)致poolSize越來越大。
總的來說,redis這個連接池的連接數(shù)控制,還是在queue這個我稱為令牌的chan進(jìn)行操作。
上面可以看到,連接池的最基本的保證,就是獲取連接時候的線程安全。但是在實現(xiàn)諸多額外特性時候卻又從不同角度來實現(xiàn)。還是非常有意思的。但是不管存儲結(jié)構(gòu)是用chan還是還是slice,都可以很好的實現(xiàn)這一點。如果像sql或者redis那樣用slice來存儲連接,就得維護(hù)一個結(jié)構(gòu)來表示排隊等候的效果。
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“Golang如何實現(xiàn)連接池”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。