您好,登錄后才能下訂單哦!
一、concurrent模塊的介紹
concurrent.futures
模塊提供了高度封裝的異步調(diào)用接口
ThreadPoolExecutor
:線程池,提供異步調(diào)用
ProcessPoolExecutor
:進程池,提供異步調(diào)用
ProcessPoolExecutor
和 ThreadPoolExecutor
:兩者都實現(xiàn)相同的接口,該接口由抽象Executor類定義。
二、基本方法
submit(fn, *args, **kwargs)
:異步提交任務(wù)
map(func, *iterables, timeout=None, chunksize=1)
:取代for循環(huán)submit的操作
shutdown(wait=True)
:相當于進程池的pool.close()+pool.join()
操作
result(timeout=None)
:取得結(jié)果
add_done_callback(fn)
:回調(diào)函數(shù)
三、進程池和線程池
池的功能:限制進程數(shù)或線程數(shù).
什么時候限制: 當并發(fā)的任務(wù)數(shù)量遠遠大于計算機所能承受的范圍,即無法一次性開啟過多的任務(wù)數(shù)量 我就應(yīng)該考慮去限制我進程數(shù)或線程數(shù),從保證服務(wù)器不崩.
3.1 進程池
from concurrent.futures import ProcessPoolExecutor from multiprocessing import Process,current_process import time def task(i): print(f'{current_process().name} 在執(zhí)行任務(wù){(diào)i}') time.sleep(1) if __name__ == '__main__': pool = ProcessPoolExecutor(4) # 進程池里又4個進程 for i in range(20): # 20個任務(wù) pool.submit(task,i)# 進程池里當前執(zhí)行的任務(wù)i,池子里的4個進程一次一次執(zhí)行任務(wù)
3.2 線程池
from concurrent.futures import ThreadPoolExecutor from threading import Thread,currentThread import time def task(i): print(f'{currentThread().name} 在執(zhí)行任務(wù){(diào)i}') time.sleep(1) if __name__ == '__main__': pool = ThreadPoolExecutor(4) # 進程池里又4個線程 for i in range(20): # 20個任務(wù) pool.submit(task,i)# 線程池里當前執(zhí)行的任務(wù)i,池子里的4個線程一次一次執(zhí)行任務(wù)
四、Map的用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(20): # future=executor.submit(task,i) executor.map(task,range(1,21)) #map取代了for+submit
五、同步和異步
理解為提交任務(wù)的兩種方式
同步: 提交了一個任務(wù),必須等任務(wù)執(zhí)行完了(拿到返回值),才能執(zhí)行下一行代碼
異步: 提交了一個任務(wù),不要等執(zhí)行完了,可以直接執(zhí)行下一行代碼.
同步:相當于執(zhí)行任務(wù)的串行執(zhí)行
異步
from concurrent.futures import ProcessPoolExecutor from multiprocessing import Process,current_process import time n = 1 def task(i): global n print(f'{current_process().name} 在執(zhí)行任務(wù){(diào)i}') time.sleep(1) n += i return n if __name__ == '__main__': pool = ProcessPoolExecutor(4) # 進程池里又4個線程 pool_lis = [] for i in range(20): # 20個任務(wù) future = pool.submit(task,i)# 進程池里當前執(zhí)行的任務(wù)i,池子里的4個線程一次一次執(zhí)行任務(wù) # print(future.result()) # 這是在等待我執(zhí)行任務(wù)得到的結(jié)果,如果一直沒有結(jié)果,這里會導(dǎo)致我們所有任務(wù)編程了串行 # 在這里就引出了下面的pool.shutdown()方法 pool_lis.append(future) pool.shutdown(wait=True) # 關(guān)閉了池的入口,不允許在往里面添加任務(wù)了,會等帶所有的任務(wù)執(zhí)行完,結(jié)束阻塞 for p in pool_lis: print(p.result()) print(n)# 這里一開始肯定是拿到0的,因為我只是去告訴操作系統(tǒng)執(zhí)行子進程的任務(wù),代碼依然會繼續(xù)往下執(zhí)行 # 可以用join去解決,等待每一個進程結(jié)束后,拿到他的結(jié)果
六、回調(diào)函數(shù)
import time from threading import Thread,currentThread from concurrent.futures import ThreadPoolExecutor def task(i): print(f'{currentThread().name} 在執(zhí)行{i}') time.sleep(1) return i**2 # parse 就是一個回調(diào)函數(shù) def parse(future): # 處理拿到的結(jié)果 print(f'{currentThread().name} 結(jié)束了當前任務(wù)') print(future.result()) if __name__ == '__main__': pool = ThreadPoolExecutor(4) for i in range(20): future = pool.submit(task,i) ''' 給當前執(zhí)行的任務(wù)綁定了一個函數(shù),在當前任務(wù)結(jié)束的時候就會觸發(fā)這個函數(shù)(稱之為回調(diào)函數(shù)) 會把future對象作為參數(shù)傳給函數(shù) 注:這個稱為回調(diào)函數(shù),當前任務(wù)處理結(jié)束了,就回來調(diào)parse這個函數(shù) ''' future.add_done_callback(parse) # add_done_callback (parse) parse是一個回調(diào)函數(shù) # add_done_callback () 是對象的一個綁定方法,他的參數(shù)就是一個函數(shù)
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。