溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

celery (芹菜) 異步任務(wù)

發(fā)布時間:2020-07-16 22:57:49 來源:網(wǎng)絡(luò) 閱讀:511 作者:大牙啊 欄目:編程語言

第1章?celery

1.1什么是celery?

Celery是一個簡單靈活,且可靠,處理大量消息的分布式系統(tǒng)

python寫的,用來執(zhí)行定時任務(wù)和異步任務(wù)的框架

1.2celery的組成

消息中間àbroker

任務(wù)執(zhí)行單元àworker

任務(wù)執(zhí)行結(jié)果存儲àstore

1.3使用場景

異步任務(wù):將耗時操作任務(wù)交給celery去異步執(zhí)行,比如發(fā)送短信/郵件,消息推送,音視頻處理等

定時任務(wù):定時執(zhí)行某件事情,比如每天的數(shù)據(jù)統(tǒng)計(jì)

第2章?celery的安裝配置

?

pip install celery

消息中間件:rabbitmq/redis,這里使用redis

docker run -p 6379:6379 --name=redis -d redis:latest redis-server

第3章?celery執(zhí)行異步任務(wù)

3.1基本使用

定義任務(wù):

from?celery?import?Celery
import?time

#?任務(wù)產(chǎn)生存儲
broker =?'redis://10.211.55.8:16379/1'
#?任務(wù)執(zhí)行結(jié)果存儲
backend =?'redis://10.211.55.8:16379/2'
#?第一個參數(shù)是別名,隨便寫即可
app = Celery('first_celery',?backend=backend,?broker=broker)

@
app.task
def?test_celery(x,?y):
????time.sleep(
2)
????
return?x +?y

添加任務(wù):

import?task

if?__name__ ==?'__main__':
????
# result不是函數(shù)的執(zhí)行結(jié)果,是一個對象
????
result = tasks.add.delay(2,3)
????
print(result.id)

使用celery命令開啟work進(jìn)程

celery –A tasks worker –l info

?

3.2多任務(wù)結(jié)構(gòu):

當(dāng)有多個任務(wù)需要執(zhí)行時,會有多個任務(wù)函數(shù),位了方便解藕,我們可以使用這種方式,方便管理

celery.py

from?celeryimport?Celery

broker =?
'redis://10.211.55.8:16379/2'
backend =?'redis://10.211.55.8:16379/3'
# include包含兩個任務(wù)的函數(shù),去相應(yīng)文件中找對應(yīng)的任務(wù)函數(shù)
cel = Celery('test',?broker=broker,?backend=backend,?include=['celery_task.task',?'celery_task.task2'])

#?時區(qū),有時候我們想在固定的時間來執(zhí)行相關(guān)任務(wù),就需要設(shè)置時區(qū)相關(guān)配置
cel.conf.timezone =?'Asia/Shanghai'
#?是否UTC
cel.conf.enable_utc =?False

task.py

from?.celeryimport?app

@
app.task
def?sum(x,?y):
????
returnx+y

task2.py

from?.celeryimport?app

@
app.task
def?less(x,?y):
????
returnx-y

celery_task目錄下創(chuàng)建添加任務(wù)的py文件

from?celery_taskimport?task
from?celery_task?import?task2

task.sum.delay(
1,?2)
task2.less.delay(
3,?1)

第4章?celery執(zhí)行定時任務(wù)

4.1設(shè)定時間讓celery執(zhí)行一個任務(wù)

from?celery_taskimport?task
from?celery_task?import?task2
from?datetime?import?datetime,?timedelta

# #?方式一:
# #
獲取時間對象
# time = datetime(2019, 8, 3, 15, 0, 0)
# print(time)
# #?
將時間轉(zhuǎn)換成UTC時間
# utc_time = datetime.utcfromtimestamp(time.timestamp())
# print(utc_time)
# #?
apply_async,args是函數(shù)參數(shù),eta是指定時間
# result = task.add.apply_async(args=[1, 2], eta=utc_time)

#?
方式二:
ctime = datetime.now()
#?把當(dāng)前時間轉(zhuǎn)換成UTC時間
utc_time = datetime.utcfromtimestamp(ctime.utcfromtimestamp())
#?當(dāng)前時間延遲十秒
time_delta = timedelta(seconds=10)
task_time = utc_time + time_delta
result = task.add.apply_async(
args=[2,?3],?eta=task_time)


# task.sum.delay(1, 2)
# task2.less.delay(3, 1)

4.2類似crontab的定時任務(wù)

?多任務(wù)結(jié)構(gòu)中celery.py文件如下

from?celeryimport?Celery
from?datetime?import?timedelta
from?celery.schedules?import?crontab

broker =?
'redis://10.211.55.8:16379/2'
backend =?'redis://10.211.55.8:16379/3'

# include包含兩個任務(wù)的函數(shù),去相應(yīng)文件中找對應(yīng)的任務(wù)函數(shù)
app = Celery('test',?broker=broker,?backend=backend,?include=['celery_task.task',?'celery_task.task2'])
#?時區(qū)
app.conf.timezone =?'Asia/Shanghai'
#?是否UTC
app.conf.enable_utc =?False

app.conf.beat_schedules = {
????
#別名,可隨意編寫
????
'add_crontab': {
????????
#執(zhí)行task下的add函數(shù)
????????
'task':?'celery_task.task.add',
????????
#每隔兩秒執(zhí)行
????????
'schedules': timedelta(seconds=2),

????????
#固定時間執(zhí)行,每年411號,8點(diǎn)42分執(zhí)行
????????#'schedules': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),

????????#
傳遞參數(shù)
????????
'args': [1,?2],
????
}
}

第5章?Django中使用celery

在項(xiàng)目的目錄下創(chuàng)建celeryconfig.py

import?djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
????
'app01.tasks',
)
#有些情況可以防止死鎖
CELERYD_FORCE_EXECV=True
#?設(shè)置并發(fā)worker數(shù)量
CELERYD_CONCURRENCY=4
#允許重試
CELERY_ACKS_LATE=True
#?每個worker最多執(zhí)行100個任務(wù)被銷毀,可以防止內(nèi)存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
#?超時時間
CELERYD_TASK_TIME_LIMIT=12*30

app目錄下創(chuàng)建tasks.py

from?celeryimport?task

@
task
def?add(x,?y):
????
returnx+y

試圖函數(shù)views中使用

from?django.shortcutsimport?render,HttpResponse
from?app01.tasks?import?add
from?datetime?import?datetime,?timedelta
def?test(request):
????
# result=add.delay(2,3)
????
ctime = datetime.now()
????
#?默認(rèn)用utc時間
????
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
????time_delay= timedelta(
seconds=5)
????task_time = utc_ctime +?time_delay
????result = add.apply_async(
args=[4,?3],?eta=task_time)
????
print(result.id)
????
returnHttpResponse('ok')

settings中注冊

INSTALLED_APPS= [
????
'django.contrib.admin',
????
'django.contrib.auth',
????
'django.contrib.contenttypes',
????
'django.contrib.sessions',
????
'django.contrib.messages',
????
'django.contrib.staticfiles',
????
'djcelery',
????
'app01',
]

from?djceleryimport?celeryconfig

#?默認(rèn)使用rabbitmqredis的話需要指定下
BROKER_BACKEND=
'redis'
BROKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'

?


向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI