溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Django Celery異步任務隊列的實現(xiàn)

發(fā)布時間:2020-08-25 21:42:49 來源:腳本之家 閱讀:140 作者:嚴北 欄目:開發(fā)技術(shù)

背景

在開發(fā)中,我們常常會遇到一些耗時任務,舉個例子:

上傳并解析一個 1w 條數(shù)據(jù)的 Excel 文件,最后持久化至數(shù)據(jù)庫。

在我的程序中,這個任務耗時大約 6s,對于用戶來說,6s 的等待已經(jīng)是個災難了。

比較好的處理方式是:

  1. 接收這個任務的請求
  2. 將這個任務添加到隊列中
  3. 立即返回「操作成功,正在后臺處理」的字樣
  4. 后臺消費這個隊列,執(zhí)行這個任務

我們按照這個思路,借助 Celery 進行實現(xiàn)。

實現(xiàn)

本文所使用的環(huán)境如下:

  • Python 3.6.7
  • RabbitMQ 3.8
  • Celery 4.3

使用 Docker 安裝 RabbitMQ

Celery 依賴一個消息后端,可選方案有 RabbitMQ, Redis 等,本文選用 RabbitMQ 。

同時為了安裝方便,RabbitMQ 我直接使用 Docker 安裝:

docker run -d --name anno-rabbit -p 5672:5672 rabbitmq:3

啟動成功后,即可通過 amqp://localhost 訪問該消息隊列。

安裝并配置 Celery

Celery 是 Python 實現(xiàn)的工具,安裝可以直接通過 Pip 完成:

pip install celery

同時假設當前我的項目文件夾為 proj ,項目名為 myproj ,應用名為 myapp

安裝完成后,在 proj/myproj/ 路徑下創(chuàng)建一個 celery.py 文件,用來初始化 Celery 實例:

proj/myproj/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproj.settings')

app = Celery('myproj',
       broker='amqp://localhost//',
       backend='amqp://localhost//')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.s
# - namespace='CELERY' means all celery-related configuration keys
#  should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

然后在 proj/myproj/__init__.py 中添加對 Celery 對象的引用,確保 Django 啟動后能夠初始化 Celery:

proj/myproj/__init__.py

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

無其他特殊配置的話,Celery 的基本配置就是這些。

編寫一個耗時任務

為了模擬一個耗時任務,我們直接創(chuàng)建一個方法,使其「睡」10s ,并將其設置為 Celery 的任務:

proj/myapp/tasks.py

import time
from myproj.celery import app as celery_app

@celery_app.task
def waste_time():
  time.sleep(10)
  return "Run function 'waste_time' finished."

啟動 Celery Worker

Celery 配置完成,并且任務創(chuàng)建成功后,我們以異步任務的模式啟動 Celery :

celery -A myproj worker -l info

注意到我強調(diào)了異步模式,是因為 Celery 除了支持異步任務,還支持定時任務,因此啟動時候要指明。

同時要注意,Celery 一旦啟動,對 Task(此處為 waste_time) 的修改必須重啟 Celery 才會生效。

任務調(diào)用

在請求處理的邏輯代碼中,調(diào)用上面創(chuàng)建好的任務:

proj/myapp/views.py

from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from .tasks import waste_time

@require_http_methods(["POST"])
def upload_files(request):
  waste_time.delay()
  # Status code 202: Accepted, 表示異步任務已接受,可能還在處理中
  return JsonResponse({"results": "操作成功,正在上傳,請稍候..."}, status=202)

調(diào)用 waste_time.delay() 方法后, waste_time 會被加入到任務隊列中,等待空閑的 Celery Worker 調(diào)用。

效果

當我們發(fā)送請求時,這個接口會直接返回 {"results": "操作成功,正在上傳,請稍候..."} 的響應內(nèi)容而非卡住十秒,用戶體驗要好許多。

總結(jié)

用 Celery 處理這種異步任務是 Python 常用的方法,雖然實際執(zhí)行成功耗時不變甚至有所增加(如 Worker 繁忙導致處理滯后),但是對于用戶體驗來說更容易接受,點擊上傳大文件后可以繼續(xù)處理其他事務,而不需要在頁面等待。
Celery 還有更多用法本文未介紹到,其文檔已經(jīng)非常詳盡,有需要可直接參考。

參考

http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html

https://hub.docker.com/_/rabbitmq

以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持億速云。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI