您好,登錄后才能下訂單哦!
一、線程池
1、為什么需要使用線程池
1.1 創(chuàng)建/銷毀線程伴隨著系統(tǒng)開(kāi)銷,過(guò)于頻繁的創(chuàng)建/銷毀線程,會(huì)很大程度上影響處理效率。
記創(chuàng)建線程消耗時(shí)間T1,執(zhí)行任務(wù)消耗時(shí)間T2,銷毀線程消耗時(shí)間T3,如果T1+T3>T2,那說(shuō)明開(kāi)啟一個(gè)線程來(lái)執(zhí)行這個(gè)任務(wù)太不劃算了!在線程池緩存線程可用已有的閑置線程來(lái)執(zhí)行新任務(wù),避免了創(chuàng)建/銷毀帶來(lái)的系統(tǒng)開(kāi)銷。
1.2 線程并發(fā)數(shù)量過(guò)多,搶占系統(tǒng)資源從而導(dǎo)致阻塞。
線程能共享系統(tǒng)資源,如果同時(shí)執(zhí)行的線程過(guò)多,就有可能導(dǎo)致系統(tǒng)資源不足而產(chǎn)生阻塞的情況。
1.3 對(duì)線程進(jìn)行一些簡(jiǎn)單的管理。
比如:延時(shí)執(zhí)行、定時(shí)循環(huán)執(zhí)行的策略等,運(yùn)用線程池都能進(jìn)行很好的實(shí)現(xiàn)。
2、Python中建立線程池的方法
2.1 使用threadpool模塊,這是個(gè)python的第三方模塊,支持python2和python3
2.2 使用concurrent.futures模塊,這個(gè)模塊是python3中自帶的模塊,python2.7以上版本也可以安裝使用
2.3 自己構(gòu)建一個(gè)線程池
二、隊(duì)列(queue)
Queue模塊提供的隊(duì)列(FIFO)適用于多線程編程,在生產(chǎn)者(producer)和消費(fèi)者(consumer)之間線程安全(thread-safe)地傳遞消息或其它數(shù)據(jù),因此多個(gè)線程可以共用同一個(gè)Queue實(shí)例。常用方法:
Queue.qsize():返回queue的大小。
Queue.empty():判斷隊(duì)列是否為空,通常不太靠譜。
Queue.full():判斷是否滿了。
Queue.put(item, block=True, timeout=None): 往隊(duì)列里放數(shù)據(jù)。
Queue.put_nowait(item):往隊(duì)列里存放元素,不等待
Queue.get(item, block=True, timeout=None): 從隊(duì)列里取數(shù)據(jù)。
Queue.get_nowait(item):從隊(duì)列里取元素,不等待
Queue.task_done():表示隊(duì)列中某個(gè)元素是否的使用情況,使用結(jié)束會(huì)發(fā)送信息。
Queue.join():一直阻塞直到隊(duì)列中的所有元素都執(zhí)行完畢。
三、使用threading+Queue處理多任務(wù)
假設(shè)有十個(gè)任務(wù)需要處理,打算在后臺(tái)開(kāi)啟五個(gè)線程,簡(jiǎn)化后的模型
import Queue import threading import time queue = Queue.Queue() class ThreadNum(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): while True: #消費(fèi)者端,從隊(duì)列中獲取num num = self.queue.get() print("Retrieved", num) time.sleep(1) #在完成這項(xiàng)工作之后,使用 queue.task_done() 函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào) self.queue.task_done() print("Consumer Finished") def main(): #產(chǎn)生一個(gè) threads pool, 并把消息傳遞給thread函數(shù)進(jìn)行處理,這里開(kāi)啟10個(gè)并發(fā) for i in range(5): t = ThreadNum(queue) t.setDaemon(True) t.start() #往隊(duì)列中填數(shù)據(jù) for num in range(10): queue.put(num) #wait on the queue until everything has been processed queue.join() if __name__ == '__main__': main() time.sleep(500)
輸出為:
('Retrieved', 0) ('Retrieved', 1)('Retrieved', 2) ('Retrieved', 3) ('Retrieved', 4) ('Retrieved', 5)('Retrieved', 6) ('Retrieved', 7) ('Retrieved', 8) ('Retrieved', 9)
具體工作步驟描述如下:
1、創(chuàng)建一個(gè) Queue.Queue() 的實(shí)例,然后使用數(shù)據(jù)對(duì)它進(jìn)行填充。
2、將經(jīng)過(guò)填充數(shù)據(jù)的實(shí)例傳遞給線程類,后者是通過(guò)繼承 threading.Thread 的方式創(chuàng)建的。
3、生成守護(hù)線程池。
4、每次從隊(duì)列中取出一個(gè)項(xiàng)目,并使用該線程中的數(shù)據(jù)和 run 方法以執(zhí)行相應(yīng)的工作。
5、在完成這項(xiàng)工作之后,使用 queue.task_done() 函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào)。
6、對(duì)隊(duì)列執(zhí)行 join 操作,實(shí)際上意味著等到隊(duì)列為空,再退出主程序。
在使用這個(gè)模式時(shí)需要注意一點(diǎn):通過(guò)將守護(hù)線程設(shè)置為 true,程序運(yùn)行完自動(dòng)退出。好處是在退出之前,可以對(duì)隊(duì)列執(zhí)行 join 操作、或者等到隊(duì)列為空。
注意運(yùn)行main函數(shù)后繼續(xù)執(zhí)行time.sleep(500),可以觀察到主線程未結(jié)束的情況下ThreadNum(queue)生成的線程還在運(yùn)行。如果需要停止線程的話可以對(duì)以上代碼加以修改。
import Queue import threading import time queue = Queue.Queue() class ThreadNum(threading.Thread): """沒(méi)打印一個(gè)數(shù)字等待1秒,并發(fā)打印10個(gè)數(shù)字需要多少秒?""" def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): done = False while not done: #消費(fèi)者端,從隊(duì)列中獲取num num = self.queue.get() if num is None: done = True else: print("Retrieved", num) time.sleep(1) #在完成這項(xiàng)工作之后,使用 queue.task_done() 函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào) self.queue.task_done() print("Consumer Finished") def main(): #產(chǎn)生一個(gè) threads pool, 并把消息傳遞給thread函數(shù)進(jìn)行處理,這里開(kāi)啟10個(gè)并發(fā) for i in range(5): t = ThreadNum(queue) t.setDaemon(True) t.start() #往隊(duì)列中填錯(cuò)數(shù)據(jù) for num in range(10): queue.put(num) queue.join() time.sleep(100) for i in range(10): queue.put(None) print('None') time.sleep(200) if __name__ == '__main__': start = time.time() main() print"Elapsed Time: %s" % (time.time() - start)
main函數(shù)執(zhí)行完后隊(duì)列向線程發(fā)送None消息,觸發(fā)線程的停止標(biāo)識(shí),這樣就可以動(dòng)態(tài)管理線程池了。
以上這篇Python 使用threading+Queue實(shí)現(xiàn)線程池示例就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持億速云。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。