您好,登錄后才能下訂單哦!
本篇文章為大家展示了Django異步任務(wù)線程池實(shí)現(xiàn)原理,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。
請(qǐng)求任務(wù)異步處理的原理
使用python manage.py runserver模式啟動(dòng)的Django應(yīng)用只有一個(gè)進(jìn)程,對(duì)于每個(gè)請(qǐng)求,主線程會(huì)開啟一個(gè)子線程來(lái)處理請(qǐng)求。請(qǐng)求子線程向主線程申請(qǐng)一個(gè)新線程,然后把耗時(shí)的任務(wù)交給新線程,自身立即響應(yīng),這就是請(qǐng)求任務(wù)異步處理的原理。
可視化線程池
如果想要管理這批異步線程,知道他們是否在運(yùn)行中,可以使用線程池(ThreadPoolExecutor)。
線程池會(huì)先啟動(dòng)若干數(shù)量的線程,并讓這些線程都處于睡眠狀態(tài),當(dāng)向線程池submit一個(gè)任務(wù)后,會(huì)喚醒線程池中的某一個(gè)睡眠線程,讓它來(lái)處理這個(gè)任務(wù),當(dāng)處理完這個(gè)任務(wù),線程又處于睡眠狀態(tài)。
submit任務(wù)后會(huì)返回一個(gè)期程(future),這個(gè)對(duì)象可以查看線程池中執(zhí)行此任務(wù)的線程是否仍在處理中
因此可以構(gòu)建一個(gè)全局可視化線程池:
from concurrent.futures.thread import ThreadPoolExecutor class ThreadPool(object): def __init__(self): # 線程池 self.executor = ThreadPoolExecutor(20) # 用于存儲(chǔ)每個(gè)項(xiàng)目批量任務(wù)的期程 self.future_dict = {} # 檢查某個(gè)項(xiàng)目是否有正在運(yùn)行的批量任務(wù) def is_project_thread_running(self, project_id): future = self.future_dict.get(project_id, None) if future and future.running(): # 存在正在運(yùn)行的批量任務(wù) return True return False # 展示所有的異步任務(wù) def check_future(self): data = {} for project_id, future in self.future_dict.items(): data[project_id] = future.running() return data def __del__(self): self.executor.shutdown() # 主線程中的全局線程池 # global_thread_pool的生命周期是Django主線程運(yùn)行的生命周期 global_thread_pool = ThreadPool()
使用:
# 檢查異步任務(wù) if global_thread_pool.is_project_thread_running(project_id): raise exceptions.ValidationError(detail='存在正在處理的批量任務(wù),請(qǐng)稍后重試') # 提交一個(gè)異步任務(wù) future = global_thread_pool.executor.submit(self.batch_thread, project_id) global_thread_pool.future_dict[project_id] = future # 查看所有異步任務(wù) @login_required def check_future(request): data = global_thread_pool.check_future() return HttpResponse(status=status.HTTP_200_OK, content=json.dumps(data))
串行執(zhí)行
使用線程鎖
在全局線程池中初始化線程鎖
class ThreadPool(object): def __init__(self): self.executor = ThreadPoolExecutor(20) self.future_dict = {} self.lock = threading.Lock()
然后執(zhí)行線程前需要獲取鎖并再執(zhí)行結(jié)束后釋放鎖
def batch_thread(self): global_thread_pool.lock.acquire() try: ... global_thread_pool.lock.release() except Exception: trace_log = traceback.format_exc() logger.error('異步任務(wù)執(zhí)行失敗:\n %s' % trace_log) global_thread_pool.lock.release()
需要捕捉異常預(yù)防子線程出錯(cuò)而無(wú)法釋放鎖的情況
異步線程任務(wù)執(zhí)行前先檢查數(shù)據(jù)庫(kù)連接是否可用,然后關(guān)掉不可用連接
由于django的數(shù)據(jù)庫(kù)連接是保存到線程本地變量中的,通過(guò)ThreadPoolExecutor創(chuàng)建的線程會(huì)保存各自的數(shù)據(jù)庫(kù)連接。
當(dāng)連接被保存的時(shí)間超過(guò)mysql連接的最大超時(shí)時(shí)間,連接失效,但不會(huì)被線程釋放。
之后再調(diào)起線程執(zhí)行涉及到數(shù)據(jù)庫(kù)操作的異步任務(wù)時(shí),會(huì)用到失效的數(shù)據(jù)庫(kù)連接,導(dǎo)致報(bào)錯(cuò)“MySQL server has gone away”。
解決方案是在線程池的所有異步任務(wù)執(zhí)行前先檢查數(shù)據(jù)庫(kù)連接是否可用,然后關(guān)掉不可用連接
def batch_thread(self): for conn in connections.all(): conn.close_if_unusable_or_obsolete() ...
上述內(nèi)容就是Django異步任務(wù)線程池實(shí)現(xiàn)原理,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。