溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

為什么python ThreadPoolExecutor 出現(xiàn)線(xiàn)程池異常捕獲的問(wèn)題

發(fā)布時(shí)間:2021-03-10 15:36:46 來(lái)源:億速云 閱讀:350 作者:TREX 欄目:開(kāi)發(fā)技術(shù)

本篇內(nèi)容介紹了“為什么python ThreadPoolExecutor 出現(xiàn)線(xiàn)程池異常捕獲的問(wèn)題”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

問(wèn)題

最近寫(xiě)了涉及線(xiàn)程池及線(xiàn)程的 python 腳本,運(yùn)行過(guò)程中發(fā)現(xiàn)一個(gè)有趣的現(xiàn)象,線(xiàn)程池中的工作線(xiàn)程出現(xiàn)問(wèn)題,引發(fā)了異常,但是主線(xiàn)程沒(méi)有捕獲異常,還在發(fā)現(xiàn) BUG 之前一度以為線(xiàn)程池代碼正常返回。

先說(shuō)重點(diǎn)

這里主要想介紹 python concurrent.futuresthread.ThreadPoolExecutor 線(xiàn)程池中的 worker 引發(fā)異常的時(shí)候,并不會(huì)直接向上拋起異常,而是需要主線(xiàn)程通過(guò)調(diào)用concurrent.futures.Future.exception(timeout=None) 方法主動(dòng)獲取 worker 的異常。

問(wèn)題重現(xiàn)及解決

引子

問(wèn)題主要由這樣一段代碼引起的:

def thread_executor():
 logger.info("I am slave. I am working. I am going to sleep 3s")
 sleep(3)
 logger.info("Exit thread executor")


def main():
 thread_obj = threading.Thread(target=thread_executor)
 while True:
  logger.info("Master starts thread worker")

  try:
   # 工作線(xiàn)程由于某種異常而結(jié)束并退出了,想重啟工作線(xiàn)程的工作,但又不想重復(fù)創(chuàng)建線(xiàn)程
   thread_obj.start() # 這一行會(huì)報(bào)錯(cuò),同一線(xiàn)程不能重復(fù)啟動(dòng)
  except Exception as e:
   logger.error("Master start thread error", exc_info=True)
   raise e

  logger.info("Master is going to sleep 5s")
  sleep(5)

上面這段代碼的功能如注釋中解釋的,主要要實(shí)現(xiàn)類(lèi)似生產(chǎn)者消費(fèi)者的功能,工作線(xiàn)程一直去生產(chǎn)資源,主線(xiàn)程去消費(fèi)工作線(xiàn)程生產(chǎn)的資源。但是工作線(xiàn)程由于異常推出了,想重新啟動(dòng)生產(chǎn)工作。顯然,這個(gè)代碼會(huì)報(bào)錯(cuò)。

運(yùn)行結(jié)果:

thread: MainThread [INFO] Master starts thread worker
thread: Thread-1 [INFO] I am slave. I am working. I am going to sleep 3s
thread: MainThread [INFO] Master is going to sleep 5s
thread: Thread-1 [INFO] Exit thread executor because of some exception
thread: MainThread [INFO] Master starts thread worker
thread: MainThread [ERROR] Master start thread error
Traceback (most recent call last):
File "xxx.py", line 47, in main
 thread_obj.start()
File "E:\anaconda\lib\threading.py", line 843, in start
 raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once
Traceback (most recent call last):
File "xxx.py", line 56, in <module>
 main()
File "xxx.py", line 50, in main
 raise e
File "xxx.py", line 47, in main
 thread_obj.start()
File "E:\anaconda\lib\threading.py", line 843, in start
 raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once

切入正題

然而腳本還有其他業(yè)務(wù)代碼要運(yùn)行,所以需要把上面的資源生產(chǎn)和消費(fèi)的代碼放到一個(gè)線(xiàn)程里完成,所以引入線(xiàn)程池來(lái)執(zhí)行這段代碼:

def thread_executor():
 while True:
  logger.info("I am slave. I am working. I am going to sleep 3s")
  sleep(3)
  logger.info("Exit thread executor because of some exception")
  break


def main():
 thread_obj = threading.Thread(target=thread_executor)
 while True:
  logger.info("Master starts thread worker")

  # 工作線(xiàn)程由于某種異常而結(jié)束并退出了,想重啟工作線(xiàn)程的工作,但又不想重復(fù)創(chuàng)建線(xiàn)程
  # 沒(méi)有想到這里會(huì)有異常
  thread_obj.start() # 這一行會(huì)報(bào)錯(cuò),同一線(xiàn)程不能重復(fù)啟動(dòng)

  logger.info("Master is going to sleep 5s")
  sleep(5)


def thread_pool_main():
 thread_obj = ThreadPoolExecutor(max_workers=1, thread_name_prefix="WorkExecutor")
 logger.info("Master ThreadPool Executor starts thread worker")
 thread_obj.submit(main)

 while True:
  logger.info("Master ThreadPool Executor is going to sleep 5s")
  sleep(5)

if __name__ == '__main__':
 thread_pool_main()

代碼運(yùn)行結(jié)果如下:

INFO [thread: MainThread] Master ThreadPool Executor starts thread worker
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: Thread-1] I am slave. I am working. I am going to sleep 3s
INFO [thread: WorkExecutor_0] Master is going to sleep 5s
INFO [thread: Thread-1] Exit thread executor because of some exception
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s

... ...

顯然,由上面的結(jié)果,在線(xiàn)程池 worker 執(zhí)行到 INFO [thread: WorkExecutor_0] Master starts thread worker 的時(shí)候,是會(huì)有異常產(chǎn)生的,但是整個(gè)代碼并沒(méi)有拋棄任何異常。

解決方法

發(fā)現(xiàn)上面的 bug 后,想在線(xiàn)程池 worker 出錯(cuò)的時(shí)候,把異常記錄到日志。查閱資料,要獲取線(xiàn)程池的異常信息,需要調(diào)用 concurrent.futures.Future.exception(timeout=None) 方法,為了記錄日志,這里加了線(xiàn)程池執(zhí)行結(jié)束的回調(diào)函數(shù)。同時(shí),日志中記錄異常信息,用了 logging.exception() 方法。

def thread_executor():
 while True:
  logger.info("I am slave. I am working. I am going to sleep 3s")
  sleep(3)
  logger.info("Exit thread executor because of some exception")
  break


def main():
 thread_obj = threading.Thread(target=thread_executor)
 while True:
  logger.info("Master starts thread worker")

  # 工作線(xiàn)程由于某種異常而結(jié)束并退出了,想重啟工作線(xiàn)程的工作,但又不想重復(fù)創(chuàng)建線(xiàn)程
  # 沒(méi)有想到這里會(huì)有異常
  thread_obj.start() # 這一行會(huì)報(bào)錯(cuò),同一線(xiàn)程不能重復(fù)啟動(dòng)

  logger.info("Master is going to sleep 5s")
  sleep(5)


def thread_pool_callback(worker):
 logger.info("called thread pool executor callback function")
 worker_exception = worker.exception()
 if worker_exception:
  logger.exception("Worker return exception: {}".format(worker_exception))


def thread_pool_main():
 thread_obj = ThreadPoolExecutor(max_workers=1, thread_name_prefix="WorkExecutor")
 logger.info("Master ThreadPool Executor starts thread worker")
 thread_pool_exc = thread_obj.submit(main)
 thread_pool_exc.add_done_callback(thread_pool_callback)
 # logger.info("thread pool exception: {}".format(thread_pool_exc.exception()))

 while True:
  logger.info("Master ThreadPool Executor is going to sleep 5s")
  sleep(5)


if __name__ == '__main__':
 thread_pool_main()

代碼運(yùn)行結(jié)果:

INFO [thread: MainThread] Master ThreadPool Executor starts thread worker
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: Thread-1] I am slave. I am working. I am going to sleep 3s
INFO [thread: WorkExecutor_0] Master is going to sleep 5s
INFO [thread: Thread-1] Exit thread executor because of some exception
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: WorkExecutor_0] called thread pool executor callback function
ERROR [thread: WorkExecutor_0] Worker return exception: threads can only be started once
Traceback (most recent call last):
File "E:\anaconda\lib\concurrent\futures\thread.py", line 57, in run
 result = self.fn(*self.args, **self.kwargs)
File "xxxx.py", line 46, in main
 thread_obj.start() # 這一行會(huì)報(bào)錯(cuò),同一線(xiàn)程不能重復(fù)啟動(dòng)
File "E:\anaconda\lib\threading.py", line 843, in start
 raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
... ...

最終的寫(xiě)法

其實(shí),上面寫(xiě)法中,想重復(fù)利用一個(gè)線(xiàn)程去實(shí)現(xiàn)生產(chǎn)者線(xiàn)程的實(shí)現(xiàn)方法是有問(wèn)題的,在此處,一般情況下,線(xiàn)程執(zhí)行結(jié)束后,線(xiàn)程資源會(huì)被會(huì)被操作系統(tǒng),所以線(xiàn)程不能被重復(fù)調(diào)用 start() 。

為什么python ThreadPoolExecutor 出現(xiàn)線(xiàn)程池異常捕獲的問(wèn)題

為什么python ThreadPoolExecutor 出現(xiàn)線(xiàn)程池異常捕獲的問(wèn)題

一種可行的實(shí)現(xiàn)方式就是,用線(xiàn)程池替代。當(dāng)然,這樣做得注意上面提到的線(xiàn)程池執(zhí)行體的異常捕獲問(wèn)題。

def thread_executor():
 while True:
  logger.info("I am slave. I am working. I am going to sleep 3s")
  sleep(3)
  logger.info("Exit thread executor because of some exception")
  break

def executor_callback(worker):
 logger.info("called worker callback function")
 worker_exception = worker.exception()
 if worker_exception:
  logger.exception("Worker return exception: {}".format(worker_exception))
  # raise worker_exception


def main():
 slave_thread_pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix="SlaveExecutor")
 restart_flag = False
 while True:
  logger.info("Master starts thread worker")

  if not restart_flag:
   restart_flag = not restart_flag
   logger.info("Restart Slave work")
  slave_thread_pool.submit(thread_executor).add_done_callback(executor_callback)

  logger.info("Master is going to sleep 5s")
  sleep(5)

總結(jié)

這個(gè)問(wèn)題主要還是因?yàn)閷?duì) Python 的 concurrent.futuresthread.ThreadPoolExecutor 不夠了解導(dǎo)致的,接觸這個(gè)包是在書(shū)本上,但是書(shū)本沒(méi)完全介紹包的全部 API 及用法,所以代碼產(chǎn)生異常情況后,DEBUG 了許久在真正找到問(wèn)題所在。查閱 python docs 后才對(duì)其完整用法有所認(rèn)識(shí),所以,以后學(xué)習(xí)新的 python 包的時(shí)候還是可以查一查官方文檔的。

參考資料

英文版: docs of python concurrent.futures

中文版: python docs concurrent.futures — 啟動(dòng)并行任務(wù)

exception(timeout=None)

返回由調(diào)用引發(fā)的異常。如果調(diào)用還沒(méi)完成那么這個(gè)方法將等待 timeout 秒。如果在 timeout 秒內(nèi)沒(méi)有執(zhí)行完成,concurrent.futures.TimeoutError 將會(huì)被觸發(fā)。timeout 可以是整數(shù)或浮點(diǎn)數(shù)。如果 timeout 沒(méi)有指定或?yàn)?None,那么等待時(shí)間就沒(méi)有限制。

如果 futrue 在完成前被取消則 CancelledError 將被觸發(fā)。

如果調(diào)用正常完成那么返回 None。

add_done_callback(fn)

附加可調(diào)用 fn 到期程。當(dāng)期程被取消或完成運(yùn)行時(shí),將會(huì)調(diào)用 fn,而這個(gè)期程將作為它唯一的參數(shù)。

加入的可調(diào)用對(duì)象總被屬于添加它們的進(jìn)程中的線(xiàn)程按加入的順序調(diào)用。如果可調(diào)用對(duì)象引發(fā)一個(gè) Exception 子類(lèi),它會(huì)被記錄下來(lái)并被忽略掉。如果可調(diào)用對(duì)象引發(fā)一個(gè) BaseException 子類(lèi),這個(gè)行為沒(méi)有定義。

如果期程已經(jīng)完成或已取消,fn 會(huì)被立即調(diào)用。


“為什么python ThreadPoolExecutor 出現(xiàn)線(xiàn)程池異常捕獲的問(wèn)題”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI