溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Python線程池如何使用

發(fā)布時(shí)間:2020-09-24 13:06:18 來(lái)源:億速云 閱讀:125 作者:Leah 欄目:編程語(yǔ)言

Python線程池如何使用?相信很多沒(méi)有經(jīng)驗(yàn)的人對(duì)此束手無(wú)策,為此本文總結(jié)了問(wèn)題出現(xiàn)的原因和解決方法,通過(guò)這篇文章希望你能解決這個(gè)問(wèn)題。

系統(tǒng)啟動(dòng)一個(gè)新線程的成本是比較高的,因?yàn)樗婕芭c操作系統(tǒng)的交互。在這種情形下,使用線程池可以很好地提升性能,尤其是當(dāng)程序中需要?jiǎng)?chuàng)建大量生存期很短暫的線程時(shí),更應(yīng)該考慮使用線程池。

線程池在系統(tǒng)啟動(dòng)時(shí)即創(chuàng)建大量空閑的線程,程序只要將一個(gè)函數(shù)提交給線程池,線程池就會(huì)啟動(dòng)一個(gè)空閑的線程來(lái)執(zhí)行它。當(dāng)該函數(shù)執(zhí)行結(jié)束后,該線程并不會(huì)死亡,而是再次返回到線程池中變成空閑狀態(tài),等待執(zhí)行下一個(gè)函數(shù)。

此外,使用線程池可以有效地控制系統(tǒng)中并發(fā)線程的數(shù)量。當(dāng)系統(tǒng)中包含有大量的并發(fā)線程時(shí),會(huì)導(dǎo)致系統(tǒng)性能急劇下降,甚至導(dǎo)致 Python 解釋器崩潰,而線程池的最大線程數(shù)參數(shù)可以控制系統(tǒng)中并發(fā)線程的數(shù)量不超過(guò)此數(shù)。

線程池的使用

線程池的基類(lèi)是 concurrent.futures 模塊中的 Executor,Executor 提供了兩個(gè)子類(lèi),即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于創(chuàng)建線程池,而 ProcessPoolExecutor 用于創(chuàng)建進(jìn)程池。

如果使用線程池/進(jìn)程池來(lái)管理并發(fā)編程,那么只要將相應(yīng)的 task 函數(shù)提交給線程池/進(jìn)程池,剩下的事情就由線程池/進(jìn)程池來(lái)搞定。

Exectuor 提供了如下常用方法:

submit(fn, *args, **kwargs):將 fn 函數(shù)提交給線程池。*args 代表傳給 fn 函數(shù)的參數(shù),*kwargs 代表以關(guān)鍵字參數(shù)的形式為 fn 函數(shù)傳入?yún)?shù)。

map(func, *iterables, timeout=None, chunksize=1):該函數(shù)類(lèi)似于全局函數(shù) map(func, *iterables),只是該函數(shù)將會(huì)啟動(dòng)多個(gè)線程,以異步方式立即對(duì) iterables 執(zhí)行 map 處理。

shutdown(wait=True):關(guān)閉線程池。

程序?qū)?task 函數(shù)提交(submit)給線程池后,submit 方法會(huì)返回一個(gè) Future 對(duì)象,F(xiàn)uture 類(lèi)主要用于獲取線程任務(wù)函數(shù)的返回值。由于線程任務(wù)會(huì)在新線程中以異步方式執(zhí)行,因此,線程執(zhí)行的函數(shù)相當(dāng)于一個(gè)“將來(lái)完成”的任務(wù),所以 Python 使用 Future 來(lái)代表。

Future 提供了如下方法:

cancel():取消該 Future 代表的線程任務(wù)。如果該任務(wù)正在執(zhí)行,不可取消,則該方法返回 False;否則,程序會(huì)取消該任務(wù),并返回 True。

cancelled():返回 Future 代表的線程任務(wù)是否被成功取消。

running():如果該 Future 代表的線程任務(wù)正在執(zhí)行、不可被取消,該方法返回 True。

done():如果該 Funture 代表的線程任務(wù)被成功取消或執(zhí)行完成,則該方法返回 True。

result(timeout=None):獲取該 Future 代表的線程任務(wù)最后返回的結(jié)果。如果 Future 代表的線程任務(wù)還未完成,該方法將會(huì)阻塞當(dāng)前線程,其中 timeout 參數(shù)指定最多阻塞多少秒。

exception(timeout=None):獲取該 Future 代表的線程任務(wù)所引發(fā)的異常。如果該任務(wù)成功完成,沒(méi)有異常,則該方法返回 None。

add_done_callback(fn):為該 Future 代表的線程任務(wù)注冊(cè)一個(gè)“回調(diào)函數(shù)”,當(dāng)該任務(wù)成功完成時(shí),程序會(huì)自動(dòng)觸發(fā)該 fn 函數(shù)。

在用完一個(gè)線程池后,應(yīng)該調(diào)用該線程池的 shutdown() 方法,該方法將啟動(dòng)線程池的關(guān)閉序列。調(diào)用 shutdown() 方法后的線程池不再接收新任務(wù),但會(huì)將以前所有的已提交任務(wù)執(zhí)行完成。當(dāng)線程池中的所有任務(wù)都執(zhí)行完成后,該線程池中的所有線程都會(huì)死亡。

使用線程池來(lái)執(zhí)行線程任務(wù)的步驟如下:

調(diào)用 ThreadPoolExecutor 類(lèi)的構(gòu)造器創(chuàng)建一個(gè)線程池。

定義一個(gè)普通函數(shù)作為線程任務(wù)。

調(diào)用 ThreadPoolExecutor 對(duì)象的 submit() 方法來(lái)提交線程任務(wù)。

當(dāng)不想提交任何任務(wù)時(shí),調(diào)用 ThreadPoolExecutor 對(duì)象的 shutdown() 方法來(lái)關(guān)閉線程池。

下面程序示范了如何使用線程池來(lái)執(zhí)行線程任務(wù):

from concurrent.futures import ThreadPoolExecutor
import threading
import time

# 定義一個(gè)準(zhǔn)備作為線程任務(wù)的函數(shù)
def action(max):
    my_sum = 0
    for i in range(max):
        print(threading.current_thread().name + '  ' + str(i))
        my_sum += i
    return my_sum
# 創(chuàng)建一個(gè)包含2條線程的線程池
pool = ThreadPoolExecutor(max_workers=2)
# 向線程池提交一個(gè)task, 50會(huì)作為action()函數(shù)的參數(shù)
future1 = pool.submit(action, 50)
# 向線程池再提交一個(gè)task, 100會(huì)作為action()函數(shù)的參數(shù)
future2 = pool.submit(action, 100)
# 判斷future1代表的任務(wù)是否結(jié)束
print(future1.done())
time.sleep(3)
# 判斷future2代表的任務(wù)是否結(jié)束
print(future2.done())
# 查看future1代表的任務(wù)返回的結(jié)果
print(future1.result())
# 查看future2代表的任務(wù)返回的結(jié)果
print(future2.result())
# 關(guān)閉線程池
pool.shutdown()

上面程序中,第 13 行代碼創(chuàng)建了一個(gè)包含兩個(gè)線程的線程池,接下來(lái)的兩行代碼只要將 action() 函數(shù)提交(submit)給線程池,該線程池就會(huì)負(fù)責(zé)啟動(dòng)線程來(lái)執(zhí)行 action() 函數(shù)。這種啟動(dòng)線程的方法既優(yōu)雅,又具有更高的效率。

當(dāng)程序把 action() 函數(shù)提交給線程池時(shí),submit() 方法會(huì)返回該任務(wù)所對(duì)應(yīng)的 Future 對(duì)象,程序立即判斷 futurel 的 done() 方法,該方法將會(huì)返回 False(表明此時(shí)該任務(wù)還未完成)。接下來(lái)主程序暫停 3 秒,然后判斷 future2 的 done() 方法,如果此時(shí)該任務(wù)已經(jīng)完成,那么該方法將會(huì)返回 True。

程序最后通過(guò) Future 的 result() 方法來(lái)獲取兩個(gè)異步任務(wù)返回的結(jié)果。

讀者可以自己運(yùn)行此代碼查看運(yùn)行結(jié)果,這里不再演示。

當(dāng)程序使用 Future 的 result() 方法來(lái)獲取結(jié)果時(shí),該方法會(huì)阻塞當(dāng)前線程,如果沒(méi)有指定 timeout 參數(shù),當(dāng)前線程將一直處于阻塞狀態(tài),直到 Future 代表的任務(wù)返回。

獲取執(zhí)行結(jié)果

前面程序調(diào)用了 Future 的 result() 方法來(lái)獲取線程任務(wù)的運(yùn)回值,但該方法會(huì)阻塞當(dāng)前主線程,只有等到錢(qián)程任務(wù)完成后,result() 方法的阻塞才會(huì)被解除。

如果程序不希望直接調(diào)用 result() 方法阻塞線程,則可通過(guò) Future 的 add_done_callback() 方法來(lái)添加回調(diào)函數(shù),該回調(diào)函數(shù)形如 fn(future)。當(dāng)線程任務(wù)完成后,程序會(huì)自動(dòng)觸發(fā)該回調(diào)函數(shù),并將對(duì)應(yīng)的 Future 對(duì)象作為參數(shù)傳給該回調(diào)函數(shù)。

下面程序使用 add_done_callback() 方法來(lái)獲取線程任務(wù)的返回值:

from concurrent.futures import ThreadPoolExecutor
import threading
import time

# 定義一個(gè)準(zhǔn)備作為線程任務(wù)的函數(shù)
def action(max):
    my_sum = 0
    for i in range(max):
        print(threading.current_thread().name + '  ' + str(i))
        my_sum += i
    return my_sum
# 創(chuàng)建一個(gè)包含2條線程的線程池
with ThreadPoolExecutor(max_workers=2) as pool:
    # 向線程池提交一個(gè)task, 50會(huì)作為action()函數(shù)的參數(shù)
    future1 = pool.submit(action, 50)
    # 向線程池再提交一個(gè)task, 100會(huì)作為action()函數(shù)的參數(shù)
    future2 = pool.submit(action, 100)
    def get_result(future):
        print(future.result())
    # 為future1添加線程完成的回調(diào)函數(shù)
    future1.add_done_callback(get_result)
    # 為future2添加線程完成的回調(diào)函數(shù)
    future2.add_done_callback(get_result)
    print('--------------')

上面主程序分別為 future1、future2 添加了同一個(gè)回調(diào)函數(shù),該回調(diào)函數(shù)會(huì)在線程任務(wù)結(jié)束時(shí)獲取其返回值。

主程序的最后一行代碼打印了一條橫線。由于程序并未直接調(diào)用 future1、future2 的 result() 方法,因此主線程不會(huì)被阻塞,可以立即看到輸出主線程打印出的橫線。接下來(lái)將會(huì)看到兩個(gè)新線程并發(fā)執(zhí)行,當(dāng)線程任務(wù)執(zhí)行完成后,get_result() 函數(shù)被觸發(fā),輸出線程任務(wù)的返回值。

另外,由于線程池實(shí)現(xiàn)了上下文管理協(xié)議(Context Manage Protocol),因此,程序可以使用 with 語(yǔ)句來(lái)管理線程池,這樣即可避免手動(dòng)關(guān)閉線程池,如上面的程序所示。

此外,Exectuor 還提供了一個(gè) map(func, *iterables, timeout=None, chunksize=1) 方法,該方法的功能類(lèi)似于全局函數(shù) map(),區(qū)別在于線程池的 map() 方法會(huì)為 iterables 的每個(gè)元素啟動(dòng)一個(gè)線程,以并發(fā)方式來(lái)執(zhí)行 func 函數(shù)。這種方式相當(dāng)于啟動(dòng) len(iterables) 個(gè)線程,井收集每個(gè)線程的執(zhí)行結(jié)果。

例如,如下程序使用 Executor 的 map() 方法來(lái)啟動(dòng)線程,并收集線程任務(wù)的返回值:

from concurrent.futures import ThreadPoolExecutor
import threading
import time

# 定義一個(gè)準(zhǔn)備作為線程任務(wù)的函數(shù)
def action(max):
    my_sum = 0
    for i in range(max):
        print(threading.current_thread().name + '  ' + str(i))
        my_sum += i
    return my_sum
# 創(chuàng)建一個(gè)包含4條線程的線程池
with ThreadPoolExecutor(max_workers=4) as pool:
    # 使用線程執(zhí)行map計(jì)算
    # 后面元組有3個(gè)元素,因此程序啟動(dòng)3條線程來(lái)執(zhí)行action函數(shù)
    results = pool.map(action, (50, 100, 150))
    print('--------------')
    for r in results:
        print(r)

上面程序使用 map() 方法來(lái)啟動(dòng) 3 個(gè)線程(該程序的線程池包含 4 個(gè)線程,如果繼續(xù)使用只包含兩個(gè)線程的線程池,此時(shí)將有一個(gè)任務(wù)處于等待狀態(tài),必須等其中一個(gè)任務(wù)完成,線程空閑出來(lái)才會(huì)獲得執(zhí)行的機(jī)會(huì)),map() 方法的返回值將會(huì)收集每個(gè)線程任務(wù)的返回結(jié)果。
運(yùn)行上面程序,同樣可以看到 3 個(gè)線程并發(fā)執(zhí)行的結(jié)果,最后通過(guò) results 可以看到 3 個(gè)線程任務(wù)的返回結(jié)果。
通過(guò)上面程序可以看出,使用 map() 方法來(lái)啟動(dòng)線程,并收集線程的執(zhí)行結(jié)果,不僅具有代碼簡(jiǎn)單的優(yōu)點(diǎn),而且雖然程序會(huì)以并發(fā)方式來(lái)執(zhí)行 action() 函數(shù),但最后收集的 action() 函數(shù)的執(zhí)行結(jié)果,依然與傳入?yún)?shù)的結(jié)果保持一致。也就是說(shuō),上面 results 的第一個(gè)元素是 action(50) 的結(jié)果,第二個(gè)元素是 action(100) 的結(jié)果,第三個(gè)元素是 action(150) 的結(jié)果。

看完上述內(nèi)容,你們掌握Python線程池如何使用的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI