溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Python自定義線程池實現(xiàn)方法分析

發(fā)布時間:2020-09-19 14:10:06 來源:腳本之家 閱讀:144 作者:蒼松 欄目:開發(fā)技術(shù)

本文實例講述了Python自定義線程池實現(xiàn)方法。分享給大家供大家參考,具體如下:

關(guān)于python的多線程,由與GIL的存在被廣大群主所詬病,說python的多線程不是真正的多線程。但多線程處理IO密集的任務(wù)效率還是可以杠杠的。

我實現(xiàn)的這個線程池其實是根據(jù)銀角的思路來實現(xiàn)的。

主要思路:

任務(wù)獲取和執(zhí)行:

1、任務(wù)加入隊列,等待線程來獲取并執(zhí)行。
2、按需生成線程,每個線程循環(huán)取任務(wù)。

線程銷毀:

1、獲取任務(wù)是終止符時,線程停止。
2、線程池close()時,向任務(wù)隊列加入和已生成線程等量的終止符。
3、線程池terminate()時,設(shè)置線程下次任務(wù)取到為終止符。

流程概要設(shè)計:

Python自定義線程池實現(xiàn)方法分析

詳細代碼:

import threading
import contextlib
from Queue import Queue
import time
class ThreadPool(object):
  def __init__(self, max_num):
    self.StopEvent = 0#線程任務(wù)終止符,當(dāng)線程從隊列獲取到StopEvent時,代表此線程可以銷毀。可設(shè)置為任意與任務(wù)有區(qū)別的值。
    self.q = Queue()
    self.max_num = max_num #最大線程數(shù)
    self.terminal = False  #是否設(shè)置線程池強制終止
    self.created_list = [] #已創(chuàng)建線程的線程列表
    self.free_list = [] #空閑線程的線程列表
    self.Deamon=False #線程是否是后臺線程
  def run(self, func, args, callback=None):
    """
    線程池執(zhí)行一個任務(wù)
    :param func: 任務(wù)函數(shù)
    :param args: 任務(wù)函數(shù)所需參數(shù)
    :param callback:
    :return: 如果線程池已經(jīng)終止,則返回True否則None
    """
    if len(self.free_list) == 0 and len(self.created_list) < self.max_num:
      self.create_thread()
    task = (func, args, callback,)
    self.q.put(task)
  def create_thread(self):
    """
    創(chuàng)建一個線程
    """
    t = threading.Thread(target=self.call)
    t.setDaemon(self.Deamon)
    t.start()
    self.created_list.append(t)#將當(dāng)前線程加入已創(chuàng)建線程列表created_list
  def call(self):
    """
    循環(huán)去獲取任務(wù)函數(shù)并執(zhí)行任務(wù)函數(shù)
    """
    current_thread = threading.current_thread()  #獲取當(dāng)前線程對象·
    event = self.q.get()  #從任務(wù)隊列獲取任務(wù)
    while event != self.StopEvent:  #判斷獲取到的任務(wù)是否是終止符
      func, arguments, callback = event#從任務(wù)中獲取函數(shù)名、參數(shù)、和回調(diào)函數(shù)名
      try:
        result = func(*arguments)
        func_excute_status =True#func執(zhí)行成功狀態(tài)
      except Exception as e:
        func_excute_status = False
        result =None
        print '函數(shù)執(zhí)行產(chǎn)生錯誤', e#打印錯誤信息
      if func_excute_status:#func執(zhí)行成功后才能執(zhí)行回調(diào)函數(shù)
        if callback is not None:#判斷回調(diào)函數(shù)是否是空的
          try:
            callback(result)
          except Exception as e:
            print '回調(diào)函數(shù)執(zhí)行產(chǎn)生錯誤', e # 打印錯誤信息
      with self.worker_state(self.free_list,current_thread):
        #執(zhí)行完一次任務(wù)后,將線程加入空閑列表。然后繼續(xù)去取任務(wù),如果取到任務(wù)就將線程從空閑列表移除
        if self.terminal:#判斷線程池終止命令,如果需要終止,則使下次取到的任務(wù)為StopEvent。
          event = self.StopEvent
        else: #否則繼續(xù)獲取任務(wù)
          event = self.q.get() # 當(dāng)線程等待任務(wù)時,q.get()方法阻塞住線程,使其持續(xù)等待
    else:#若線程取到的任務(wù)是終止符,就銷毀線程
      #將當(dāng)前線程從已創(chuàng)建線程列表created_list移除
      self.created_list.remove(current_thread)
  def close(self):
    """
    執(zhí)行完所有的任務(wù)后,所有線程停止
    """
    full_size = len(self.created_list)#按已創(chuàng)建的線程數(shù)量往線程隊列加入終止符。
    while full_size:
      self.q.put(self.StopEvent)
      full_size -= 1
  def terminate(self):
    """
    無論是否還有任務(wù),終止線程
    """
    self.terminal = True
    while self.created_list:
      self.q.put(self.StopEvent)
    self.q.queue.clear()#清空任務(wù)隊列
  def join(self):
    """
    阻塞線程池上下文,使所有線程執(zhí)行完后才能繼續(xù)
    """
    for t in self.created_list:
      t.join()
  @contextlib.contextmanager#上下文處理器,使其可以使用with語句修飾
  def worker_state(self, state_list, worker_thread):
    """
    用于記錄線程中正在等待的線程數(shù)
    """
    state_list.append(worker_thread)
    try:
      yield
    finally:
      state_list.remove(worker_thread)
if __name__ == '__main__':
  def Foo(arg):
    return arg
    # time.sleep(0.1)
  def Bar(res):
    print res
  pool=ThreadPool(5)
  # pool.Deamon=True#需在pool.run之前設(shè)置
  for i in range(1000):
    pool.run(func=Foo,args=(i,),callback=Bar)
  pool.close()
  pool.join()
  # pool.terminate()
  print "任務(wù)隊列里任務(wù)數(shù)%s" %pool.q.qsize()
  print "當(dāng)前存活子線程數(shù)量:%d" % threading.activeCount()
  print "當(dāng)前線程創(chuàng)建列表:%s" %pool.created_list
  print "當(dāng)前線程創(chuàng)建列表:%s" %pool.free_list

關(guān)于上下文處理:

來個簡單例子說明:

下面的代碼手動自定義了一個myopen方法,模擬我們常見的with open() as f:語句。具體的contextlib模塊使用,會單獨開章來將。

# coding:utf-8
import contextlib
@contextlib.contextmanager#定義該函數(shù)支持上下文with語句
def myopen(filename,mode):
  f=open(filename,mode)
  try:
    yield f.readlines()#正常執(zhí)行返回f.readlines()
  except Exception as e:
    print e
  finally:
    f.close()#最后在with代碼快執(zhí)行完畢后返回執(zhí)行finally下的f.close()實現(xiàn)關(guān)閉文件
if __name__ == '__main__':
  with myopen(r'c:\ip1.txt','r') as f:
    for line in f:
      print line

更多關(guān)于Python相關(guān)內(nèi)容感興趣的讀者可查看本站專題:《Python進程與線程操作技巧總結(jié)》、《Python Socket編程技巧總結(jié)》、《Python數(shù)據(jù)結(jié)構(gòu)與算法教程》、《Python函數(shù)使用技巧總結(jié)》、《Python字符串操作技巧匯總》、《Python入門與進階經(jīng)典教程》及《Python文件與目錄操作技巧匯總》

希望本文所述對大家Python程序設(shè)計有所幫助。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI