溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Django異步任務(wù)線程池實(shí)現(xiàn)原理

發(fā)布時(shí)間:2021-06-02 18:07:00 來(lái)源:億速云 閱讀:464 作者:Leah 欄目:開發(fā)技術(shù)

本篇文章為大家展示了Django異步任務(wù)線程池實(shí)現(xiàn)原理,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。

請(qǐng)求任務(wù)異步處理的原理

使用python manage.py runserver模式啟動(dòng)的Django應(yīng)用只有一個(gè)進(jìn)程,對(duì)于每個(gè)請(qǐng)求,主線程會(huì)開啟一個(gè)子線程來(lái)處理請(qǐng)求。請(qǐng)求子線程向主線程申請(qǐng)一個(gè)新線程,然后把耗時(shí)的任務(wù)交給新線程,自身立即響應(yīng),這就是請(qǐng)求任務(wù)異步處理的原理。

可視化線程池

如果想要管理這批異步線程,知道他們是否在運(yùn)行中,可以使用線程池(ThreadPoolExecutor)。

線程池會(huì)先啟動(dòng)若干數(shù)量的線程,并讓這些線程都處于睡眠狀態(tài),當(dāng)向線程池submit一個(gè)任務(wù)后,會(huì)喚醒線程池中的某一個(gè)睡眠線程,讓它來(lái)處理這個(gè)任務(wù),當(dāng)處理完這個(gè)任務(wù),線程又處于睡眠狀態(tài)。

submit任務(wù)后會(huì)返回一個(gè)期程(future),這個(gè)對(duì)象可以查看線程池中執(zhí)行此任務(wù)的線程是否仍在處理中

因此可以構(gòu)建一個(gè)全局可視化線程池:

from concurrent.futures.thread import ThreadPoolExecutor


class ThreadPool(object):
  def __init__(self):
    # 線程池
    self.executor = ThreadPoolExecutor(20)
    # 用于存儲(chǔ)每個(gè)項(xiàng)目批量任務(wù)的期程
    self.future_dict = {}

  # 檢查某個(gè)項(xiàng)目是否有正在運(yùn)行的批量任務(wù)
  def is_project_thread_running(self, project_id):
    future = self.future_dict.get(project_id, None)
    if future and future.running():
      # 存在正在運(yùn)行的批量任務(wù)
      return True
    return False

  # 展示所有的異步任務(wù)
  def check_future(self):
    data = {}
    for project_id, future in self.future_dict.items():
      data[project_id] = future.running()
    return data

  def __del__(self):
    self.executor.shutdown()

# 主線程中的全局線程池
# global_thread_pool的生命周期是Django主線程運(yùn)行的生命周期
global_thread_pool = ThreadPool()

使用:

# 檢查異步任務(wù)
if global_thread_pool.is_project_thread_running(project_id):
  raise exceptions.ValidationError(detail='存在正在處理的批量任務(wù),請(qǐng)稍后重試')

# 提交一個(gè)異步任務(wù)
future = global_thread_pool.executor.submit(self.batch_thread, project_id)
global_thread_pool.future_dict[project_id] = future

# 查看所有異步任務(wù)
@login_required
def check_future(request):
  data = global_thread_pool.check_future()
  return HttpResponse(status=status.HTTP_200_OK, content=json.dumps(data))

串行執(zhí)行

使用線程鎖

在全局線程池中初始化線程鎖

class ThreadPool(object):
  def __init__(self):
    self.executor = ThreadPoolExecutor(20)
    self.future_dict = {}
    self.lock = threading.Lock()

然后執(zhí)行線程前需要獲取鎖并再執(zhí)行結(jié)束后釋放鎖

def batch_thread(self):
  global_thread_pool.lock.acquire()
  try:
    ...
    global_thread_pool.lock.release()
  except Exception:
    trace_log = traceback.format_exc()
    logger.error('異步任務(wù)執(zhí)行失敗:\n %s' % trace_log)
    global_thread_pool.lock.release()

需要捕捉異常預(yù)防子線程出錯(cuò)而無(wú)法釋放鎖的情況

異步線程任務(wù)執(zhí)行前先檢查數(shù)據(jù)庫(kù)連接是否可用,然后關(guān)掉不可用連接

由于django的數(shù)據(jù)庫(kù)連接是保存到線程本地變量中的,通過(guò)ThreadPoolExecutor創(chuàng)建的線程會(huì)保存各自的數(shù)據(jù)庫(kù)連接。

當(dāng)連接被保存的時(shí)間超過(guò)mysql連接的最大超時(shí)時(shí)間,連接失效,但不會(huì)被線程釋放。

之后再調(diào)起線程執(zhí)行涉及到數(shù)據(jù)庫(kù)操作的異步任務(wù)時(shí),會(huì)用到失效的數(shù)據(jù)庫(kù)連接,導(dǎo)致報(bào)錯(cuò)“MySQL server has gone away”。

解決方案是在線程池的所有異步任務(wù)執(zhí)行前先檢查數(shù)據(jù)庫(kù)連接是否可用,然后關(guān)掉不可用連接

def batch_thread(self):
  for conn in connections.all():
    conn.close_if_unusable_or_obsolete()
  ...

上述內(nèi)容就是Django異步任務(wù)線程池實(shí)現(xiàn)原理,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI