溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

python celery 模塊

發(fā)布時間:2020-07-16 19:39:03 來源:網(wǎng)絡(luò) 閱讀:768 作者:weady 欄目:編程語言

Celery是基于Python開發(fā)的一個分布式任務(wù)隊列框架,支持使用任務(wù)隊列的方式在分布的機(jī)器/進(jìn)程/線程上執(zhí)行任務(wù)調(diào)度
python celery 模塊
Celery是典型的生產(chǎn)生-消費者模式,主要由三部分組成:broker(消息隊列)、workers(消費者:處理任務(wù))、backend(存儲結(jié)果)
1.編寫任務(wù)代碼task.py
from celery import Celery

app = Celery('tasks',broker='amqp://guest@localhost//', backend='redis://localhost:6379/0')

@app.task
def add(x, y):
return x + y

當(dāng)函數(shù)使用”@app.task”修飾后,即為可被Celery調(diào)度的任務(wù)
2.啟動workers 命令 celery worker -A tasks --loglevel=info --concurrency=5
3.調(diào)用任務(wù)

result=add.delay(2, 5)
result.ready()
result.get(timeout=1)

4.配置文件
單個參數(shù)配置:
app.conf.CELERY_BROKER_URL = 'amqp://guest@localhost//'
app.conf.CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
多個參數(shù)配置:
app.conf.update(
CELERY_BROKER_URL = 'amqp://guest@localhost//',
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
)

從配置文件中獲取:

先把配置存入配置文件中'celeryconfig.py'

BROKER_URL='amqp://guest@localhost//'
CELERY_RESULT_BACKEND='redis://localhost:6379/0'

導(dǎo)入到celery 對象中app.config_from_object('celeryconfig')
我們之前調(diào)用任務(wù)使用了”delay()”方法,它其實是對”apply_async()”方法的封裝,
使得你只要傳入任務(wù)所需的參數(shù)即可
關(guān)于序列化
Celery默認(rèn)序列化方式是”json”,指定序列化
app = Celery('tasks', broker='...', task_serializer='yaml')

app.conf.update(
CELERY_TASK_SERIALIZER='pickle',
CELERY_RESULT_SERIALIZER='json',
)

@app.task
def add(x, y):
...

add.apply_async((2, 5), serializer='json')


django + celery 實現(xiàn)任務(wù)的異步處理
1.Django Web中從一個http請求發(fā)起,到獲得響應(yīng)返回html頁面的流程大致如下:http請求發(fā)起 -- http handling(request解析) -- url mapping(url正則匹配找到對應(yīng)的View) -- 在View中進(jìn)行邏輯的處理、數(shù)據(jù)計算(包括調(diào)用Model類進(jìn)行數(shù)據(jù)庫的增刪改查)--將數(shù)據(jù)推送到template,返回對應(yīng)的template/response
同步請求:所有邏輯處理、數(shù)據(jù)計算任務(wù)在View中處理完畢后返回response。在View處理任務(wù)時用戶處于等待狀態(tài),直到頁面返回結(jié)果
異步請求:View中先返回response,再在后臺處理任務(wù)。用戶無需等待,可以繼續(xù)瀏覽網(wǎng)站。當(dāng)任務(wù)處理完成時,我們可以再告知用戶
2.建立消息隊列
消息隊列可以使用RabbitMQ、Redis 等
3.安裝django-celery
pip install celery django-celery
4.配置settings.py
import djcelery
djcelery.setup_loader()
BROKER_URL = 'django://' # 使用django做broker
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 定時任務(wù).
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' # 需要跟蹤任務(wù)的狀態(tài)時保存結(jié)果和狀態(tài)
CELERY_ENABLE_UTC = False # 不用UTC.
CELERY_TIMEZONE = 'Asia/Shanghai' # 指定上海時區(qū)
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml'] # 允許的格式
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_IGNORE_RESULT = True

INSTALLED_APPS = [
'djcelery',# 新增
'kombu.transport.django', # 新增kombu.transport.django則是基于Django的broker
]

其中,當(dāng)djcelery.setup_loader()運行時,Celery便會去查看INSTALLD_APPS下包含的所有app目錄中的tasks.py文件,找到標(biāo)記為task的方法,將它們注冊為celery task
5.在項目 mysite 下新建celery.py
from future import absolute_import
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')
from django.conf import settings # noqa
app = Celery('mysite')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))

6.在應(yīng)用celery_project下新建tasks.py

from future import absolute_import
from celery import shared_task
import time

@shared_task(track_started=True)
def add(x, y):
time.sleep(30)
return x + y

在tasks.py中我們就可以編碼實現(xiàn)我們需要執(zhí)行的任務(wù)邏輯,在開始處import task,然后在要執(zhí)行的任務(wù)方法開頭用上裝飾器@task。需要注意的是,與一般的.py中實現(xiàn)celery不同,tasks.py必須建在各app的根目錄下,且不能隨意命名
6.生產(chǎn)任務(wù)
在需要執(zhí)行該任務(wù)的View中,通過test.delay的方式來創(chuàng)建任務(wù),并送入消息隊列
def produce():
a =1
b =2
r = test.delay(a,b)
7.啟動work
#先啟動服務(wù)器 python manage.py runserver
#再啟動worker celery worker -A mysite -c 4 --loglevel=info

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI