您好,登錄后才能下訂單哦!
這篇文章主要介紹了Python如何實現(xiàn)的自定義多線程多進程類示例,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
最近經(jīng)常使用到對大量文件進行操作的程序以前每次寫的時候都要在函數(shù)中再寫一個多線程多進程的函數(shù),做了些重復的工作遇到新的任務時還要重寫,因此將多線程與多進程的一些簡單功能寫成一個類,方便使用。功能簡單只為以后方便使用。
使用中發(fā)現(xiàn)bug會再進行更新
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2017/5/10 12:47 # @Author : zhaowen.zhu # @Site : # @File : MultiThread.py # @Software: Python Idle import threading,time,sys,multiprocessing from multiprocessing import Pool class MyTMultithread(threading.Thread): ''''' 自定義的線程函數(shù), 功能:使用多線程運行函數(shù),函數(shù)的參數(shù)只有一個file,并且未實現(xiàn)結(jié)果值的返回 args: filelist 函數(shù)的參數(shù)為列表格式, funname 函數(shù)的名字為字符串,函數(shù)僅有一個參數(shù)為file delay 每個線程之間的延遲, max_threads 線程的最大值 ''' def __init__(self,filelist,delay,funname,max_threads = 50): threading.Thread.__init__(self) self.funname = funname self.filelist = filelist[:] self.delay = delay self.max_threads = max_threads def startrun(self): def runs(): time.sleep(self.delay) while True: try: file = self.filelist.pop() except IndexError as e: break else: self.funname(file) threads = [] while threads or self.filelist: for thread in threads: if not thread.is_alive(): threads.remove(thread) while len(threads) < self.max_threads and self.filelist: thread = threading.Thread(target = runs) thread.setDaemon(True) thread.start() threads.append(thread) class Mymultiprocessing (MyTMultithread): ''''' 多進程運行函數(shù),多進程多線程運行函數(shù) args: filelist 函數(shù)的參數(shù)為列表格式, funname 函數(shù)的名字為字符串,函數(shù)僅有一個參數(shù)為file delay 每個線程\進程之間的延遲, max_threads 最大的線程數(shù) max_multiprocess 最大的進程數(shù) ''' def __init__(self,filelist,delay,funname,max_multiprocess = 1,max_threads = 1): self.funname = funname self.filelist = filelist[:] self.delay = delay self.max_threads = max_threads self.max_multiprocess = max_multiprocess self.num_cpus = multiprocessing.cpu_count() def multiprocessingOnly(self): ''''' 只使用多進程 ''' num_process = min(self.num_cpus,self.max_multiprocess) processes = [] while processes or self.filelist: for p in processes: if not p.is_alive(): # print(p.pid,p.name,len(self.filelist)) processes.remove(p) while len(processes) < num_process and self.filelist: try: file = self.filelist.pop() except IndexError as e: break else: p = multiprocessing.Process(target=self.funname,args=(file,)) p.start() processes.append(p) def multiprocessingThreads(self): num_process = min(self.num_cpus,self.max_multiprocess) p = Pool(num_process) DATALISTS = [] tempmod = len(self.filelist) % (num_process) CD = int((len(self.filelist) + 1 + tempmod)/ (num_process)) for i in range(num_process): if i == num_process: DATALISTS.append(self.filelist[i*CD:-1]) DATALISTS.append(self.filelist[(i*CD):((i+1)*CD)]) try: processes = [] for i in range(num_process): #print('wait add process:',i+1,time.clock()) #print(eval(self.funname),DATALISTS[i]) MultThread = MyTMultithread(DATALISTS[i],self.delay,self.funname,self.max_threads) p = multiprocessing.Process(target=MultThread.startrun()) #print('pid & name:',p.pid,p.name) processes.append(p) for p in processes: print('wait join ') p.start() print('waite over') except Exception as e: print('error :',e) print ('end process') def func1(file): print(file) if __name__ == '__main__': a = list(range(0,97)) ''''' 測試使用5線程 ''' st = time.clock() asc = MyTMultithread(a,0,'func1',5) asc.startrun() end = time.clock() print('*'*50) print('多線程使用時間:',end-st) #測試使用5個進程 st = time.clock() asd = Mymultiprocessing(a,0,'func1',5) asd.multiprocessingOnly() end = time.clock() print('*'*50) print('多進程使用時間:',end-st) #測試使用5進程10線程 st = time.clock() multiPT = Mymultiprocessing(a,0,'func1',5,10) multiPT.multiprocessingThreads() end = time.clock() print('*'*50) print('多進程多線程使用時間:',end-st)
感謝你能夠認真閱讀完這篇文章,希望小編分享的“Python如何實現(xiàn)的自定義多線程多進程類示例”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業(yè)資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。