溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Flask如何實(shí)現(xiàn)異步執(zhí)行任務(wù)

發(fā)布時(shí)間:2022-03-01 10:18:20 來(lái)源:億速云 閱讀:247 作者:iii 欄目:開(kāi)發(fā)技術(shù)

這篇文章主要介紹了Flask如何實(shí)現(xiàn)異步執(zhí)行任務(wù)的相關(guān)知識(shí),內(nèi)容詳細(xì)易懂,操作簡(jiǎn)單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇Flask如何實(shí)現(xiàn)異步執(zhí)行任務(wù)文章都會(huì)有所收獲,下面我們一起來(lái)看看吧。

Flask 是 Python 中有名的輕量級(jí)同步 web 框架,在一些開(kāi)發(fā)中,可能會(huì)遇到需要長(zhǎng)時(shí)間處理的任務(wù),此時(shí)就需要使用異步的方式來(lái)實(shí)現(xiàn),讓長(zhǎng)時(shí)間任務(wù)在后臺(tái)運(yùn)行,先將本次請(qǐng)求的響應(yīng)狀態(tài)返回給前端,不讓前端界面「卡頓」,當(dāng)異步任務(wù)處理好后,如果需要返回狀態(tài),再將狀態(tài)返回。

怎么實(shí)現(xiàn)呢?

使用線程的方式

當(dāng)要執(zhí)行耗時(shí)任務(wù)時(shí),直接開(kāi)啟一個(gè)新的線程來(lái)執(zhí)行任務(wù),這種方式最為簡(jiǎn)單快速。

通過(guò) ThreadPoolExecutor 來(lái)實(shí)現(xiàn)

from flask import Flask
from time import sleep
from concurrent.futures import ThreadPoolExecutor
# DOCS https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
# 創(chuàng)建線程池執(zhí)行器
executor = ThreadPoolExecutor(2)
app = Flask(__name__)
@app.route('/jobs')
def run_jobs():
 # 交由線程去執(zhí)行耗時(shí)任務(wù)
 executor.submit(long_task, 'hello', 123)
 return 'long task running.'
# 耗時(shí)任務(wù)
def long_task(arg1, arg2):
 print("args: %s %s!" % (arg1, arg2))
 sleep(5)
 print("Task is done!")
if __name__ == '__main__':
 app.run()

當(dāng)要執(zhí)行一些比較簡(jiǎn)單的耗時(shí)任務(wù)時(shí)就可以使用這種方式,如發(fā)郵件、發(fā)短信驗(yàn)證碼等。

但這種方式有個(gè)問(wèn)題,就是前端無(wú)法得知任務(wù)執(zhí)行狀態(tài)。

如果想要前端知道,就需要設(shè)計(jì)一些邏輯,比如將任務(wù)執(zhí)行狀態(tài)存儲(chǔ)到 redis 中,通過(guò)唯一的任務(wù) id 進(jìn)行標(biāo)識(shí),然后再寫一個(gè)接口,通過(guò)任務(wù) id 去獲取任務(wù)的狀態(tài),然后讓前端定時(shí)去請(qǐng)求該接口,從而獲得任務(wù)狀態(tài)信息。

全部自己實(shí)現(xiàn)就顯得有些麻煩了,而 Celery 剛好實(shí)現(xiàn)了這樣的邏輯,來(lái)使用一下。

使用 Celery

為了滿足前端可以獲得任務(wù)狀態(tài)的需求,可以使用 Celery。

Celery 是實(shí)時(shí)任務(wù)處理與調(diào)度的分布式任務(wù)隊(duì)列,它常用于 web 異步任務(wù)、定時(shí)任務(wù)等,后面單獨(dú)寫一篇文章描述 Celery 的架構(gòu),這里不深入討論。

現(xiàn)在我想讓前端可以通過(guò)一個(gè)進(jìn)度條來(lái)判斷后端任務(wù)的執(zhí)行情況。使用 Celery 就很容易實(shí)現(xiàn),首先通過(guò) pip 安裝 Celery 與 redis,之所以要安裝 redis,是因?yàn)樽?Celery 選擇 redis 作為「消息代理 / 消息中間件」。

pip install celery
pip install redis

在 Flask 中使用 Celery 其實(shí)很簡(jiǎn)單,這里先簡(jiǎn)單的過(guò)一下 Flask 中使用 Celery 的整體流程,然后再去實(shí)現(xiàn)具體的項(xiàng)目

1.在 Flask 中初始化 Celery

from flask import Flask
from celery import Celery
app = Flask(__name__)
# 配置
# 配置消息代理的路徑,如果是在遠(yuǎn)程服務(wù)器上,則配置遠(yuǎn)程服務(wù)器中redis的URL
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
# 要存儲(chǔ) Celery 任務(wù)的狀態(tài)或運(yùn)行結(jié)果時(shí)就必須要配置
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
# 初始化Celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
# 將Flask中的配置直接傳遞給Celery
celery.conf.update(app.config)

上述代碼中,通過(guò) Celery 類初始化 celery 對(duì)象,傳入的應(yīng)用名稱與消息代理的連接 URL。

2.通過(guò) celery.task 裝飾器裝飾耗時(shí)任務(wù)對(duì)應(yīng)的函數(shù)

@celery.task
def long_task(arg1, arg2):
 # 耗時(shí)任務(wù)的邏輯
 return result

3.Flask 中定義接口通過(guò)異步的方式執(zhí)行耗時(shí)任務(wù)

@app.route('/', methods=['GET', 'POST'])
def index():
 task = long_task.delay(1, 2)
delay () 方法是 applyasync () 方法的快捷方式,applyasync () 參數(shù)更多,可以更加細(xì)致的控制耗時(shí)任務(wù),比如想要 long_task () 在一分鐘后再執(zhí)行
@app.route('/', methods=['GET', 'POST'])
def index():
 task = long_task.apply_async(args=[1, 2], countdown=60)

delay () 與 apply_async () 會(huì)返回一個(gè)任務(wù)對(duì)象,該對(duì)象可以獲取任務(wù)的狀態(tài)與各種相關(guān)信息。
通過(guò)這 3 步就可以使用 Celery 了。

接著就具體來(lái)實(shí)現(xiàn)「讓前端可以通過(guò)一個(gè)進(jìn)度條來(lái)判斷后端任務(wù)的執(zhí)行情況」的需求。

# bind為True,會(huì)傳入self給被裝飾的方法
@celery.task(bind=True)
def long_task(self):
 verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
 adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
 noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
 message = ''
 total = random.randint(10, 50)
 for i in range(total):
 if not message or random.random() < 0.25:
 # 隨機(jī)的獲取一些信息
 message = '{0} {1} {2}...'.format(random.choice(verb),
 random.choice(adjective),
 random.choice(noun))
 # 更新Celery任務(wù)狀態(tài)
 self.update_state(state='PROGRESS',
 meta={'current': i, 'total': total,
 'status': message})
 time.sleep(1)
 # 返回字典
 return {'current': 100, 'total': 100, 'status': 'Task completed!',
 'result': 42}

上述代碼中,celery.task () 裝飾器使用了 bind=True 參數(shù),這個(gè)參數(shù)會(huì)讓 Celery 將 Celery 本身傳入,可以用于記錄與更新任務(wù)狀態(tài)。

然后就是一個(gè) for 迭代,迭代的邏輯沒(méi)什么意義,就是隨機(jī)從 list 中抽取一些詞匯來(lái)模擬一些邏輯的運(yùn)行,為了表示這是耗時(shí)邏輯,通過(guò) time.sleep (1) 休眠一秒。

每次獲取一次詞匯,就通過(guò) self.update_state () 更新 Celery 任務(wù)的狀態(tài),Celery 包含一些內(nèi)置狀態(tài),如 SUCCESS、STARTED 等等,這里使用了自定義狀態(tài)「PROGRESS」,除了狀態(tài)外,還將本次循環(huán)的一些信息通過(guò) meta 參數(shù) (元數(shù)據(jù)) 以字典的形式存儲(chǔ)起來(lái)。有了這些數(shù)據(jù),前端就可以顯示進(jìn)度條了。

定義好耗時(shí)方法后,再定義一個(gè) Flask 接口方法來(lái)調(diào)用該耗時(shí)方法

@app.route('/longtask', methods=['POST'])
def longtask():
 # 異步調(diào)用
 task = long_task.apply_async()
 # 返回 202,與Location頭
 return jsonify({}), 202, {'Location': url_for('taskstatus',
 task_id=task.id)}

簡(jiǎn)單而言,前端通過(guò) POST 請(qǐng)求到 /longtask,讓后端開(kāi)始去執(zhí)行耗時(shí)任務(wù)。

返回的狀態(tài)碼為 202,202 通常表示一個(gè)請(qǐng)求正在進(jìn)行中,然后還在返回?cái)?shù)據(jù)包的包頭 (Header) 中添加了 Location 頭信息,前端可以通過(guò)讀取數(shù)據(jù)包中 Header 中的 Location 的信息來(lái)獲取任務(wù) id 對(duì)應(yīng)的完整 url。

前端有了任務(wù) id 對(duì)應(yīng)的 url 后,還需要提供一個(gè)接口給前端,讓前端可以通過(guò)任務(wù) id 去獲取當(dāng)前時(shí)刻任務(wù)的具體狀態(tài)。

@app.route('/status/<task_id>')
def taskstatus(task_id):
 task = long_task.AsyncResult(task_id)
 if task.state == 'PENDING': # 在等待
 response = {
 'state': task.state,
 'current': 0,
 'total': 1,
 'status': 'Pending...'
 }
 elif task.state != 'FAILURE': # 沒(méi)有失敗
 response = {
 'state': task.state, # 狀態(tài)
 # meta中的數(shù)據(jù),通過(guò)task.info.get()可以獲得
 'current': task.info.get('current', 0), # 當(dāng)前循環(huán)進(jìn)度
 'total': task.info.get('total', 1), # 總循環(huán)進(jìn)度
 'status': task.info.get('status', '')
 }
 if 'result' in task.info:
 response['result'] = task.info['result']
 else:
 # 后端執(zhí)行任務(wù)出現(xiàn)了一些問(wèn)題
 response = {
 'state': task.state,
 'current': 1,
 'total': 1,
 'status': str(task.info), # 報(bào)錯(cuò)的具體異常
 }
 return jsonify(response)

為了可以獲得任務(wù)對(duì)象中的信息,使用任務(wù) id 初始化 AsyncResult 類,獲得任務(wù)對(duì)象,然后就可以從任務(wù)對(duì)象中獲得當(dāng)前任務(wù)的信息。

該方法會(huì)返回一個(gè) JSON,其中包含了任務(wù)狀態(tài)以及 meta 中指定的信息,前端可以利用這些信息構(gòu)建一個(gè)進(jìn)度條。

如果任務(wù)在 PENDING 狀態(tài),表示該任務(wù)還沒(méi)有開(kāi)始,在這種狀態(tài)下,任務(wù)中是沒(méi)有什么信息的,這里人為的返回一些數(shù)據(jù)。如果任務(wù)執(zhí)行失敗,就返回 task.info 中包含的異常信息,此外就是正常執(zhí)行了,正常執(zhí)行可以通 task.info 獲得任務(wù)中具體的信息。

這樣,后端的邏輯就處理完成了,接著就來(lái)實(shí)現(xiàn)前端的邏輯,要實(shí)現(xiàn)圖形進(jìn)度條,可以直接使用 nanobar.js,簡(jiǎn)單兩句話就可以實(shí)現(xiàn)一個(gè)進(jìn)度條,其官網(wǎng)例子如下:

var options = {
 classname: 'my-class',
 id: 'my-id',
 // 進(jìn)度條要出現(xiàn)的位置
 target: document.getElementById('myDivId')
};
// 初始化進(jìn)度條對(duì)象
var nanobar = new Nanobar( options );
nanobar.go( 30 ); // 30% 進(jìn)度條
nanobar.go( 76 ); // 76% 進(jìn)度條
// 100% 進(jìn)度條,進(jìn)度條結(jié)束
nanobar.go(100);

有了 nanobar.js 就非常簡(jiǎn)單了。

先定義一個(gè)簡(jiǎn)單的 HTML 界面

<h3>Long running task with progress updates</h3>
<button id="start-bg-job">Start Long Calculation</button><br><br>
<div id="progress"></div>

通過(guò) JavaScript 實(shí)現(xiàn)對(duì)后臺(tái)的請(qǐng)求

// 按鈕點(diǎn)擊事件
$(function() {
 $('#start-bg-job').click(start_long_task);
 });
// 請(qǐng)求 longtask 接口
function start_long_task() {
 // 添加元素在html中
 div = $('<div class="progress"><div></div><div>0%</div><div>...</div><div> </div></div><hr>');
 $('#progress').append(div);
 // 創(chuàng)建進(jìn)度條對(duì)象
 var nanobar = new Nanobar({
 bg: '#44f',
 target: div[0].childNodes[0]
 });
 // ajax請(qǐng)求longtask
 $.ajax({
 type: 'POST',
 url: '/longtask',
 // 獲得數(shù)據(jù),從響應(yīng)頭中獲取Location
 success: function(data, status, request) {
 status_url = request.getResponseHeader('Location');
 // 調(diào)用 update_progress() 方法更新進(jìn)度條
 update_progress(status_url, nanobar, div[0]);
 },
 error: function() {
 alert('Unexpected error');
 }
 });
 }
// 更新進(jìn)度條
function update_progress(status_url, nanobar, status_div) {
 // getJSON()方法是JQuery內(nèi)置方法,這里向Location中對(duì)應(yīng)的url發(fā)起請(qǐng)求,即請(qǐng)求「/status/<task_id>」
 $.getJSON(status_url, function(data) {
 // 計(jì)算進(jìn)度
 percent = parseInt(data['current'] * 100 / data['total']);
 // 更新進(jìn)度條
 nanobar.go(percent);
 // 更新文字
 $(status_div.childNodes[1]).text(percent + '%');
 $(status_div.childNodes[2]).text(data['status']);
 if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
 if ('result' in data) {
 // 展示結(jié)果
 $(status_div.childNodes[3]).text('Result: ' + data['result']);
 }
 else {
 // 意料之外的事情發(fā)生
 $(status_div.childNodes[3]).text('Result: ' + data['state']);
 }
 }
 else {
 // 2秒后再次運(yùn)行
 setTimeout(function() {
 update_progress(status_url, nanobar, status_div);
 }, 2000);
 }
 }); 
 }

可以通過(guò)注釋閱讀代碼整體邏輯。

至此,需求實(shí)現(xiàn)完了,運(yùn)行一下。

首先運(yùn)行 Redis

redis-server

然后運(yùn)行 celery

celery worker -A app.celery --loglevel=info

最后運(yùn)行 Flask 項(xiàng)目

python app.py

關(guān)于“Flask如何實(shí)現(xiàn)異步執(zhí)行任務(wù)”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對(duì)“Flask如何實(shí)現(xiàn)異步執(zhí)行任務(wù)”知識(shí)都有一定的了解,大家如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI