溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Python 使用threading+Queue實(shí)現(xiàn)線程池示例

發(fā)布時(shí)間:2020-08-21 03:15:36 來(lái)源:腳本之家 閱讀:570 作者:lx59 欄目:開(kāi)發(fā)技術(shù)

一、線程池

1、為什么需要使用線程池

1.1 創(chuàng)建/銷毀線程伴隨著系統(tǒng)開(kāi)銷,過(guò)于頻繁的創(chuàng)建/銷毀線程,會(huì)很大程度上影響處理效率。

記創(chuàng)建線程消耗時(shí)間T1,執(zhí)行任務(wù)消耗時(shí)間T2,銷毀線程消耗時(shí)間T3,如果T1+T3>T2,那說(shuō)明開(kāi)啟一個(gè)線程來(lái)執(zhí)行這個(gè)任務(wù)太不劃算了!在線程池緩存線程可用已有的閑置線程來(lái)執(zhí)行新任務(wù),避免了創(chuàng)建/銷毀帶來(lái)的系統(tǒng)開(kāi)銷。

1.2 線程并發(fā)數(shù)量過(guò)多,搶占系統(tǒng)資源從而導(dǎo)致阻塞。

線程能共享系統(tǒng)資源,如果同時(shí)執(zhí)行的線程過(guò)多,就有可能導(dǎo)致系統(tǒng)資源不足而產(chǎn)生阻塞的情況。

1.3 對(duì)線程進(jìn)行一些簡(jiǎn)單的管理。

比如:延時(shí)執(zhí)行、定時(shí)循環(huán)執(zhí)行的策略等,運(yùn)用線程池都能進(jìn)行很好的實(shí)現(xiàn)。

2、Python中建立線程池的方法

2.1 使用threadpool模塊,這是個(gè)python的第三方模塊,支持python2和python3

2.2 使用concurrent.futures模塊,這個(gè)模塊是python3中自帶的模塊,python2.7以上版本也可以安裝使用

2.3 自己構(gòu)建一個(gè)線程池

二、隊(duì)列(queue)

Queue模塊提供的隊(duì)列(FIFO)適用于多線程編程,在生產(chǎn)者(producer)和消費(fèi)者(consumer)之間線程安全(thread-safe)地傳遞消息或其它數(shù)據(jù),因此多個(gè)線程可以共用同一個(gè)Queue實(shí)例。常用方法:

Queue.qsize():返回queue的大小。

Queue.empty():判斷隊(duì)列是否為空,通常不太靠譜。

Queue.full():判斷是否滿了。

Queue.put(item, block=True, timeout=None): 往隊(duì)列里放數(shù)據(jù)。

Queue.put_nowait(item):往隊(duì)列里存放元素,不等待

Queue.get(item, block=True, timeout=None): 從隊(duì)列里取數(shù)據(jù)。

Queue.get_nowait(item):從隊(duì)列里取元素,不等待

Queue.task_done():表示隊(duì)列中某個(gè)元素是否的使用情況,使用結(jié)束會(huì)發(fā)送信息。

Queue.join():一直阻塞直到隊(duì)列中的所有元素都執(zhí)行完畢。

三、使用threading+Queue處理多任務(wù)

假設(shè)有十個(gè)任務(wù)需要處理,打算在后臺(tái)開(kāi)啟五個(gè)線程,簡(jiǎn)化后的模型

import Queue
import threading
import time
 
queue = Queue.Queue()
 
class ThreadNum(threading.Thread):
  def __init__(self, queue):
    threading.Thread.__init__(self)
    self.queue = queue
 
  def run(self):
    while True:
      #消費(fèi)者端,從隊(duì)列中獲取num
      num = self.queue.get()
      print("Retrieved", num)
      time.sleep(1) 
      #在完成這項(xiàng)工作之后,使用 queue.task_done() 函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào)
      self.queue.task_done()
    
    print("Consumer Finished")
 
def main():
  #產(chǎn)生一個(gè) threads pool, 并把消息傳遞給thread函數(shù)進(jìn)行處理,這里開(kāi)啟10個(gè)并發(fā)
  for i in range(5):
    t = ThreadNum(queue)
    t.setDaemon(True)
    t.start()
  
  #往隊(duì)列中填數(shù)據(jù) 
  for num in range(10):
    queue.put(num)
    #wait on the queue until everything has been processed
  
  queue.join()
   
if __name__ == '__main__':
  main()
  time.sleep(500)

輸出為:

('Retrieved', 0)
 ('Retrieved', 1)('Retrieved', 2)
('Retrieved', 3)
('Retrieved', 4)
('Retrieved', 5)('Retrieved', 6)
('Retrieved', 7)
('Retrieved', 8)
 ('Retrieved', 9)

具體工作步驟描述如下:

1、創(chuàng)建一個(gè) Queue.Queue() 的實(shí)例,然后使用數(shù)據(jù)對(duì)它進(jìn)行填充。

2、將經(jīng)過(guò)填充數(shù)據(jù)的實(shí)例傳遞給線程類,后者是通過(guò)繼承 threading.Thread 的方式創(chuàng)建的。

3、生成守護(hù)線程池。

4、每次從隊(duì)列中取出一個(gè)項(xiàng)目,并使用該線程中的數(shù)據(jù)和 run 方法以執(zhí)行相應(yīng)的工作。

5、在完成這項(xiàng)工作之后,使用 queue.task_done() 函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào)。

6、對(duì)隊(duì)列執(zhí)行 join 操作,實(shí)際上意味著等到隊(duì)列為空,再退出主程序。

在使用這個(gè)模式時(shí)需要注意一點(diǎn):通過(guò)將守護(hù)線程設(shè)置為 true,程序運(yùn)行完自動(dòng)退出。好處是在退出之前,可以對(duì)隊(duì)列執(zhí)行 join 操作、或者等到隊(duì)列為空。

注意運(yùn)行main函數(shù)后繼續(xù)執(zhí)行time.sleep(500),可以觀察到主線程未結(jié)束的情況下ThreadNum(queue)生成的線程還在運(yùn)行。如果需要停止線程的話可以對(duì)以上代碼加以修改。

import Queue
import threading
import time
 
queue = Queue.Queue()
 
class ThreadNum(threading.Thread):
  """沒(méi)打印一個(gè)數(shù)字等待1秒,并發(fā)打印10個(gè)數(shù)字需要多少秒?"""
  def __init__(self, queue):
    threading.Thread.__init__(self)
    self.queue = queue
 
  def run(self):
    done = False
    while not done:
      #消費(fèi)者端,從隊(duì)列中獲取num
      num = self.queue.get()
      if num is None:
        done = True
      else:
        print("Retrieved", num)
      time.sleep(1) 
      #在完成這項(xiàng)工作之后,使用 queue.task_done() 函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào)
      self.queue.task_done()
    
    print("Consumer Finished")
def main():
  #產(chǎn)生一個(gè) threads pool, 并把消息傳遞給thread函數(shù)進(jìn)行處理,這里開(kāi)啟10個(gè)并發(fā)
  for i in range(5):
    t = ThreadNum(queue)
    t.setDaemon(True)
    t.start()
  
  #往隊(duì)列中填錯(cuò)數(shù)據(jù) 
  for num in range(10):
    queue.put(num)
  
  queue.join()
  time.sleep(100)
  for i in range(10):
    queue.put(None)
    print('None')
  time.sleep(200)
   
if __name__ == '__main__':
  start = time.time()
  main()
  print"Elapsed Time: %s" % (time.time() - start)

main函數(shù)執(zhí)行完后隊(duì)列向線程發(fā)送None消息,觸發(fā)線程的停止標(biāo)識(shí),這樣就可以動(dòng)態(tài)管理線程池了。

以上這篇Python 使用threading+Queue實(shí)現(xiàn)線程池示例就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持億速云。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI