溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

用python實(shí)現(xiàn)的線程池實(shí)例代碼

發(fā)布時(shí)間:2020-10-05 17:08:52 來(lái)源:腳本之家 閱讀:144 作者:Orisun 欄目:開發(fā)技術(shù)

python3標(biāo)準(zhǔn)庫(kù)里自帶線程池ThreadPoolExecutor和進(jìn)程池ProcessPoolExecutor。

如果你用的是python2,那可以下載一個(gè)模塊,叫threadpool,這是線程池。對(duì)于進(jìn)程池可以使用python自帶的multiprocessing.Pool。

當(dāng)然也可以自己寫一個(gè)threadpool。

# coding:utf-8
 
import Queue
import threading
import sys
import time
import math
 
 
class WorkThread(threading.Thread):
 
  def __init__(self, task_queue):
    threading.Thread.__init__(self)
    self.setDaemon(True)
    self.task_queue = task_queue
    self.start()
    self.idle = True
 
  def run(self):
    sleep_time = 0.01 # 第1次無(wú)任務(wù)可做時(shí)休息10毫秒
    multiply = 0
    while True:
      try:
        # 從隊(duì)列中取一個(gè)任務(wù)
        func, args, kwargs = self.task_queue.get(block=False)
        self.idle = False
        multiply = 0
        # 執(zhí)行之
        func(*args, **kwargs)
      except Queue.Empty:
        time.sleep(sleep_time * math.pow(2, multiply))
        self.idle = True
        multiply += 1
        continue
      except:
        print sys.exc_info()
        raise
 
 
class ThreadPool:
 
  def __init__(self, thread_num=10, max_queue_len=1000):
    self.max_queue_len = max_queue_len
    self.task_queue = Queue.Queue(max_queue_len) # 任務(wù)等待隊(duì)列
    self.threads = []
    self.__create_pool(thread_num)
 
  def __create_pool(self, thread_num):
    for i in xrange(thread_num):
      thread = WorkThread(self.task_queue)
      self.threads.append(thread)
 
  def add_task(self, func, *args, **kwargs):
    '''添加一個(gè)任務(wù),返回任務(wù)等待隊(duì)列的長(zhǎng)度
      調(diào)用該方法前最后先調(diào)用isSafe()判斷一下等待的任務(wù)是不是很多,以防止提交的任務(wù)被拒絕
    '''
    try:
      self.task_queue.put((func, args, kwargs))
    except Queue.Full:
      raise # 隊(duì)列已滿時(shí)直接拋出異常,不給執(zhí)行
    return self.task_queue.qsize()
 
  def isSafe(self):
    '''等待的任務(wù)數(shù)量離警界線還比較遠(yuǎn)
    '''
    return self.task_queue.qsize() < 0.9 * self.max_queue_len
 
  def wait_for_complete(self):
    '''等待提交到線程池的所有任務(wù)都執(zhí)行完畢
    '''
    #首先任務(wù)等待隊(duì)列要變成空
    while not self.task_queue.empty():
      time.sleep(1)
    # 其次,所以計(jì)算線程要變成idle狀態(tài)
    while True:
      all_idle = True
      for th in self.threads:
        if not th.idle:
          all_idle = False
          break
      if all_idle:
        break
      else:
        time.sleep(1)
 
 
if __name__ == '__main__':
  def foo(a, b):
    print a + b
    time.sleep(0.01)
  thread_pool = ThreadPool(10, 100)
  '''在Windows上測(cè)試不通過(guò),Windows上Queue.Queue不是線程安全的'''
  size = 0
  for i in xrange(10000):
    try:
      size = thread_pool.add_task(foo, i, 2 * i)
    except Queue.Full:
      print 'queue full, queue size is ', size
  time.sleep(2)

總結(jié)

以上就是本文關(guān)于用python實(shí)現(xiàn)的線程池實(shí)例代碼的全部?jī)?nèi)容,希望對(duì)大家有所幫助。感興趣的朋友可以繼續(xù)參閱本站其他相關(guān)專題,如有不足之處,歡迎留言指出。感謝朋友們對(duì)本站的支持!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI