您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)利用python怎么實(shí)現(xiàn)一個(gè)HTTP連接池,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對相關(guān)知識(shí)有一定的了解。
首先, HTTP連接是基于TCP連接的, 與服務(wù)器之間進(jìn)行HTTP通信, 本質(zhì)就是與服務(wù)器之間建立了TCP連接后, 相互收發(fā)基于HTTP協(xié)議的數(shù)據(jù)包. 因此, 如果我們需要頻繁地去請求某個(gè)服務(wù)器的資源, 我們就可以一直維持與個(gè)服務(wù)器的TCP連接不斷開, 然后在需要請求資源的時(shí)候, 把連接拿出來用就行了.
一個(gè)項(xiàng)目可能需要與服務(wù)器之間同時(shí)保持多個(gè)連接, 比如一個(gè)爬蟲項(xiàng)目, 有的線程需要請求服務(wù)器的網(wǎng)頁資源, 有的線程需要請求服務(wù)器的圖片等資源, 而這些請求都可以建立在同一條TCP連接上.
因此, 我們使用一個(gè)管理器來對這些連接進(jìn)行管理, 任何程序需要使用這些連接時(shí), 向管理器申請就可以了, 等到用完之后再將連接返回給管理器, 以供其他程序重復(fù)使用, 這個(gè)管理器就是連接池.
基于上一章的分析, 連接池應(yīng)該是一個(gè)收納連接的容器, 同時(shí)對這些連接有管理能力:
class HTTPConnectionPool: def __init__(self, host: str, port: int = None, max_size: int = None, idle_timeout: int = None) -> None: """ :param host: pass :param port: pass :param max_size: 同時(shí)存在的最大連接數(shù), 默認(rèn)None->連接數(shù)無限,沒了就創(chuàng)建 :param idle_timeout: 單個(gè)連接單次最長空閑時(shí)間,超時(shí)自動(dòng)關(guān)閉,默認(rèn)None->不限時(shí) """ self.host = host self.port = port self.max_size = max_size self.idle_timeout = idle_timeout self._lock = threading.Condition() self._pool = [] # 這里的conn_num指的是總連接數(shù),包括其它線程拿出去正在使用的連接 self.conn_num = 0 self.is_closed = False def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection: ... def release(self, conn: WrapperHTTPConnection) -> None: ...
因此, 我們定義這樣一個(gè)HTTPConnectionPool類, 使用一個(gè)列表來保存可用的連接. 對于外部來說, 只需要調(diào)用這個(gè)連接池對象的acquire和release方法就能取得和釋放連接.
對于線程池內(nèi)部來說, 至少需要三個(gè)關(guān)于連接的操作: 從連接池中取得連接, 將連接放回連接池, 以及創(chuàng)建一個(gè)連接:
def _get_connection(self) -> WrapperHTTPConnection: # 這個(gè)方法會(huì)把連接從_idle_conn移動(dòng)到_used_conn列表中,并返回這個(gè)連接 try: return self._pool.pop() except IndexError: raise EmptyPoolError def _put_connection(self, conn: WrapperHTTPConnection) -> None: self._pool.append(conn) def _create_connection(self) -> WrapperHTTPConnection: self.conn_num += 1 return WrapperHTTPConnection(self, HTTPConnection(self.host, self.port))
對于連接池外部來說, 主要有申請連接和釋放連接這兩個(gè)操作, 實(shí)際上這就是個(gè)簡單的生產(chǎn)者消費(fèi)者模型. 考慮到外部可能是多線程的環(huán)境, 我們使用threading.Condition來保證線程安全. 關(guān)于Condition的資料可以看這里.
def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection: if self.is_closed: raise ConnectionPoolClosed with self._lock: if self.max_size is None or not self.is_full(): # 在還能創(chuàng)建新連接的情況下,如果沒有空閑連接,直接創(chuàng)建一個(gè)就行了 if self.is_pool_empty(): self._put_connection(self._create_connection()) else: # 不能創(chuàng)建新連接的情況下,如果設(shè)置了blocking=False,沒連接就報(bào)錯(cuò) # 否則,就基于timeout進(jìn)行阻塞,直到超時(shí)或者有可用連接為止 if not blocking: if self.is_pool_empty(): raise EmptyPoolError elif timeout is None: while self.is_pool_empty(): self._lock.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: end_time = time.time() + timeout while self.is_pool_empty(): remaining = end_time - time.time() if remaining <= 0: raise EmptyPoolError self._lock.wait(remaining) # 走到這一步了,池子里一定有空閑連接 return self._get_connection() def release(self, conn: WrapperHTTPConnection) -> None: if self.is_closed: # 如果這個(gè)連接是在連接池關(guān)閉后才釋放的,那就不用回連接池了,直接放生 conn.close() return # 實(shí)際上,python列表的append操作是線程安全的,可以不加鎖 # 這里調(diào)用鎖是為了通過notify方法通知其它正在wait的線程:現(xiàn)在有連接可用了 with self._lock: if not conn.is_available: # 如果這個(gè)連接不可用了,就應(yīng)該創(chuàng)建一個(gè)新連接放進(jìn)去,因?yàn)榭赡苓€有其它線程在等著連接用 conn.close() self.conn_num -= 1 conn = self._create_connection() self._put_connection(conn) self._lock.notify()
我們首先看看acquire方法, 這個(gè)方法其實(shí)就是在申請到鎖之后調(diào)用內(nèi)部的_get_connection方法獲取連接, 這樣就線程安全了. 需要注意的是, 如果當(dāng)前的條件無法獲取連接, 就會(huì)調(diào)用條件變量的wait方法, 及時(shí)釋放鎖并阻塞住當(dāng)前線程. 然后, 當(dāng)其它線程作為生產(chǎn)者調(diào)用release方法釋放連接時(shí), 會(huì)觸發(fā)條件變量的notify方法, 從而喚醒一個(gè)阻塞在wait階段的線程, 即消費(fèi)者. 這個(gè)消費(fèi)者再從池中取出剛放回去的線程, 這樣整個(gè)生產(chǎn)者消費(fèi)者模型就運(yùn)轉(zhuǎn)起來了.
對于一個(gè)程序來說, 它使用連接池的形式是獲取連接->使用連接->釋放連接. 因此, 我們應(yīng)該通過with語句來管理這個(gè)連接, 以免在程序的最后遺漏掉釋放連接這一步驟.
基于這個(gè)原因, 我們通過一個(gè)WrapperHTTPConnection類來對HTTPConnection進(jìn)行封裝, 以實(shí)現(xiàn)上下文管理器的功能. HTTPConnection的代碼可以看《用python實(shí)現(xiàn)一個(gè)HTTP客戶端》這篇文章.
class WrapperHTTPConnection: def __init__(self, pool: 'HTTPConnectionPool', conn: HTTPConnection) -> None: self.pool = pool self.conn = conn self.response = None self.is_available = True def __enter__(self) -> 'WrapperHTTPConnection': return self def __exit__(self, *exit_info: Any) -> None: # 如果response沒讀完并且連接需要復(fù)用,就棄用這個(gè)連接 if not self.response.will_close and not self.response.is_closed(): self.close() self.pool.release(self) def request(self, *args: Any, **kwargs: Any) -> HTTPResponse: self.conn.request(*args, **kwargs) self.response = self.conn.get_response() return self.response def close(self) -> None: self.conn.close() self.is_available = False
同樣的, 連接池可能也需要關(guān)閉, 因此我們給連接池也加上上下文管理器的功能:
class HTTPConnectionPool: ... def close(self) -> None: if self.is_closed: return self.is_closed = True pool, self._pool = self._pool, None for conn in pool: conn.close() def __enter__(self) -> 'HTTPConnectionPool': return self def __exit__(self, *exit_info: Any) -> None: self.close()
這樣, 我們就可以通過with語句優(yōu)雅地管理連接池了:
with HTTPConnectionPool(**kwargs) as pool: with pool.acquire() as conn: res = conn.request('GET', '/') ...
如果一個(gè)連接池的所需連接數(shù)是隨時(shí)間變化的, 那么就會(huì)出現(xiàn)一種情況: 在高峰期, 我們創(chuàng)建了非常多的連接, 然后進(jìn)入低谷期之后, 連接過剩, 大量的連接處于空閑狀態(tài), 浪費(fèi)資源. 因此, 我們可以設(shè)置一個(gè)定時(shí)任務(wù), 定期清理空閑時(shí)間過長的連接, 減少連接池的資源占用.
首先, 我們需要為連接對象添加一個(gè)last_time屬性, 每當(dāng)連接釋放進(jìn)入連接池后, 就修改這個(gè)屬性的值為當(dāng)前時(shí)間, 這樣我們就能明確知道, 連接池內(nèi)的每個(gè)空閑連接空閑了多久:
class WrapperHTTPConnection: ... def __init__(self, pool: 'HTTPConnectionPool', conn: HTTPConnection) -> None: ... self.last_time = None class HTTPConnectionPool: ... def _put_connection(self, conn: WrapperHTTPConnection) -> None: conn.last_time = time.time() self._pool.append(conn)
然后, 我們通過threading.Timer來實(shí)現(xiàn)一個(gè)定時(shí)任務(wù):
def start_clear_conn(self) -> None: if self.idle_timeout is None: # 如果空閑連接的超時(shí)時(shí)間為無限,那么就不應(yīng)該清理連接 return self.clear_idle_conn() self._clearer = threading.Timer(self.idle_timeout, self.start_clear_conn) self._clearer.start() def stop_clear_conn(self) -> None: if self._clearer is not None: self._clearer.cancel()
threading.Timer只會(huì)執(zhí)行一次定時(shí)任務(wù), 因此, 我們需要在start_clear_conn中不斷地把自己設(shè)置為定時(shí)任務(wù). 這其實(shí)等同于新開了一個(gè)線程來執(zhí)行start_clear_conn方法, 因此并不會(huì)出現(xiàn)遞歸過深問題. 不過需要注意的是, threading.Timer雖然不會(huì)阻塞當(dāng)前線程, 但是卻會(huì)阻止當(dāng)前線程結(jié)束, 就算把它設(shè)置為守護(hù)線程都不行, 唯一可行的辦法就是調(diào)用stop_clear_conn方法取消這個(gè)定時(shí)任務(wù).
最后, 我們定義clear_idle_conn方法來清理閑置時(shí)間超時(shí)的連接:
def clear_idle_conn(self) -> None: if self.is_closed: raise ConnectionPoolClosed # 這里開一個(gè)新線程來清理空閑連接,避免了阻塞主線程導(dǎo)致的定時(shí)精度出錯(cuò) threading.Thread(target=self._clear_idle_conn).start() def _clear_idle_conn(self) -> None: if not self._lock.acquire(timeout=self.idle_timeout): # 因?yàn)槭敲扛魋elf.idle_timeout秒檢查一次 # 如果過了self.idle_timeout秒還沒申請到鎖,下一次都開始了,本次也就不用繼續(xù)了 return current_time = time.time() if self.is_pool_empty(): pass elif current_time - self._pool[-1].last_time >= self.idle_timeout: # 這里處理下面的二分法沒法處理的邊界情況,即所有連接都閑置超時(shí)的情況 self.conn_num -= len(self._pool) self._pool.clear() else: # 通過二分法找出從左往右第一個(gè)不超時(shí)的連接的指針 left, right = 0, len(self._pool) - 1 while left < right: mid = (left + right) // 2 if current_time - self._pool[mid].last_time >= self.idle_timeout: left = mid + 1 else: right = mid self._pool = self._pool[left:] self.conn_num -= left self._lock.release()
由于我們獲取和釋放連接都是從self._pool的尾部開始操作的, 因此self._pool這個(gè)容器是一個(gè)先進(jìn)后出隊(duì)列, 它里面放著的連接, 一定是越靠近頭部的閑置時(shí)間越長, 從頭到尾閑置時(shí)間依次遞減. 基于這個(gè)原因, 我們使用二分法來找出列表中第一個(gè)沒有閑置超時(shí)的連接, 然后把在它之前的連接一次性刪除, 這樣就能達(dá)到O(logN)的時(shí)間復(fù)雜度, 算是一種比較高效的方法. 需要注意的是, 如果連接池內(nèi)所有的連接都是超時(shí)的, 那么這種方法是刪不干凈的, 需要對這種邊界情況單獨(dú)處理.
這個(gè)連接池的完整代碼如下:
import threading import time from typing import Any from client import HTTPConnection, HTTPResponse class WrapperHTTPConnection: def __init__(self, pool: 'HTTPConnectionPool', conn: HTTPConnection) -> None: self.pool = pool self.conn = conn self.response = None self.last_time = time.time() self.is_available = True def __enter__(self) -> 'WrapperHTTPConnection': return self def __exit__(self, *exit_info: Any) -> None: # 如果response沒讀完并且連接需要復(fù)用,就棄用這個(gè)連接 if not self.response.will_close and not self.response.is_closed(): self.close() self.pool.release(self) def request(self, *args: Any, **kwargs: Any) -> HTTPResponse: self.conn.request(*args, **kwargs) self.response = self.conn.get_response() return self.response def close(self) -> None: self.conn.close() self.is_available = False class HTTPConnectionPool: def __init__(self, host: str, port: int = None, max_size: int = None, idle_timeout: int = None) -> None: """ :param host: pass :param port: pass :param max_size: 同時(shí)存在的最大連接數(shù), 默認(rèn)None->連接數(shù)無限,沒了就創(chuàng)建 :param idle_timeout: 單個(gè)連接單次最長空閑時(shí)間,超時(shí)自動(dòng)關(guān)閉,默認(rèn)None->不限時(shí) """ self.host = host self.port = port self.max_size = max_size self.idle_timeout = idle_timeout self._lock = threading.Condition() self._pool = [] # 這里的conn_num指的是總連接數(shù),包括其它線程拿出去正在使用的連接 self.conn_num = 0 self.is_closed = False self._clearer = None self.start_clear_conn() def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection: if self.is_closed: raise ConnectionPoolClosed with self._lock: if self.max_size is None or not self.is_full(): # 在還能創(chuàng)建新連接的情況下,如果沒有空閑連接,直接創(chuàng)建一個(gè)就行了 if self.is_pool_empty(): self._put_connection(self._create_connection()) else: # 不能創(chuàng)建新連接的情況下,如果設(shè)置了blocking=False,沒連接就報(bào)錯(cuò) # 否則,就基于timeout進(jìn)行阻塞,直到超時(shí)或者有可用連接為止 if not blocking: if self.is_pool_empty(): raise EmptyPoolError elif timeout is None: while self.is_pool_empty(): self._lock.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: end_time = time.time() + timeout while self.is_pool_empty(): remaining = end_time - time.time() if remaining <= 0: raise EmptyPoolError self._lock.wait(remaining) # 走到這一步了,池子里一定有空閑連接 return self._get_connection() def release(self, conn: WrapperHTTPConnection) -> None: if self.is_closed: # 如果這個(gè)連接是在連接池關(guān)閉后才釋放的,那就不用回連接池了,直接放生 conn.close() return # 實(shí)際上,python列表的append操作是線程安全的,可以不加鎖 # 這里調(diào)用鎖是為了通過notify方法通知其它正在wait的線程:現(xiàn)在有連接可用了 with self._lock: if not conn.is_available: # 如果這個(gè)連接不可用了,就應(yīng)該創(chuàng)建一個(gè)新連接放進(jìn)去,因?yàn)榭赡苓€有其它線程在等著連接用 conn.close() self.conn_num -= 1 conn = self._create_connection() self._put_connection(conn) self._lock.notify() def _get_connection(self) -> WrapperHTTPConnection: # 這個(gè)方法會(huì)把連接從_idle_conn移動(dòng)到_used_conn列表中,并返回這個(gè)連接 try: return self._pool.pop() except IndexError: raise EmptyPoolError def _put_connection(self, conn: WrapperHTTPConnection) -> None: conn.last_time = time.time() self._pool.append(conn) def _create_connection(self) -> WrapperHTTPConnection: self.conn_num += 1 return WrapperHTTPConnection(self, HTTPConnection(self.host, self.port)) def is_pool_empty(self) -> bool: # 這里指的是,空閑可用的連接是否為空 return len(self._pool) == 0 def is_full(self) -> bool: if self.max_size is None: return False return self.conn_num >= self.max_size def close(self) -> None: if self.is_closed: return self.is_closed = True self.stop_clear_conn() pool, self._pool = self._pool, None for conn in pool: conn.close() def clear_idle_conn(self) -> None: if self.is_closed: raise ConnectionPoolClosed # 這里開一個(gè)新線程來清理空閑連接,避免了阻塞主線程導(dǎo)致的定時(shí)精度出錯(cuò) threading.Thread(target=self._clear_idle_conn).start() def _clear_idle_conn(self) -> None: if not self._lock.acquire(timeout=self.idle_timeout): # 因?yàn)槭敲扛魋elf.idle_timeout秒檢查一次 # 如果過了self.idle_timeout秒還沒申請到鎖,下一次都開始了,本次也就不用繼續(xù)了 return current_time = time.time() if self.is_pool_empty(): pass elif current_time - self._pool[-1].last_time >= self.idle_timeout: # 這里處理下面的二分法沒法處理的邊界情況,即所有連接都閑置超時(shí)的情況 self.conn_num -= len(self._pool) self._pool.clear() else: # 通過二分法找出從左往右第一個(gè)不超時(shí)的連接的指針 left, right = 0, len(self._pool) - 1 while left < right: mid = (left + right) // 2 if current_time - self._pool[mid].last_time >= self.idle_timeout: left = mid + 1 else: right = mid self._pool = self._pool[left:] self.conn_num -= left self._lock.release() def start_clear_conn(self) -> None: if self.idle_timeout is None: # 如果空閑連接的超時(shí)時(shí)間為無限,那么就不應(yīng)該清理連接 return self.clear_idle_conn() self._clearer = threading.Timer(self.idle_timeout, self.start_clear_conn) self._clearer.start() def stop_clear_conn(self) -> None: if self._clearer is not None: self._clearer.cancel() def __enter__(self) -> 'HTTPConnectionPool': return self def __exit__(self, *exit_info: Any) -> None: self.close() class EmptyPoolError(Exception): pass class ConnectionPoolClosed(Exception): pass
首先, 這個(gè)連接池的核心就是對連接進(jìn)行管理, 而這包含取出連接和釋放連接兩個(gè)過程. 因此這東西的本質(zhì)就是一個(gè)生產(chǎn)者消費(fèi)者模型, 取出線程時(shí)是消費(fèi)者, 放入線程時(shí)是生產(chǎn)者, 使用threading自帶的Condition對象就能完美解決線程安全問題, 使二者協(xié)同合作.
解決獲取連接和釋放連接這個(gè)問題之后, 其實(shí)這個(gè)連接池就已經(jīng)能用了. 但是如果涉及到更多細(xì)節(jié)方面的東西, 比如判斷連接是否可用, 自動(dòng)釋放連接, 清理閑置連接等等, 就需要對這個(gè)連接進(jìn)行封裝, 為它添加更多的屬性和方法, 這就引入了WrapperHTTPConnection這個(gè)類. 實(shí)現(xiàn)它的__enter___和__exit__方法之后, 就能使用上下文管理器來自動(dòng)釋放連接. 至于清理閑置連接, 通過last_time屬性記錄每個(gè)連接的最后釋放時(shí)間, 然后在連接池中添加一個(gè)定時(shí)任務(wù)就行了.
關(guān)于利用python怎么實(shí)現(xiàn)一個(gè)HTTP連接池就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。