溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Celery-一個(gè)會(huì)做異步任務(wù),定時(shí)任務(wù)的芹菜

發(fā)布時(shí)間:2020-08-31 08:46:19 來源:網(wǎng)絡(luò) 閱讀:2322 作者:wangfeng7399 欄目:編程語言

Celery 分布式任務(wù)隊(duì)列

同步與異步

比如說你要去一個(gè)餐廳吃飯,你點(diǎn)完菜以后假設(shè)服務(wù)員告訴你,你點(diǎn)的菜,要兩個(gè)小時(shí)才能做完,這個(gè)時(shí)候你可以有兩個(gè)選擇

  • 一直在餐廳等著飯菜上桌
  • 你可以回家等著,這個(gè)時(shí)候你就可以把你的電話留給服務(wù)員,告訴服務(wù)員等什么時(shí)候你的飯菜上桌了,在給你打電話

? 所謂同步就是一個(gè)任務(wù)的完成需要依賴另外一個(gè)任務(wù)時(shí),只有等待被依賴的任務(wù)完成后,依賴的任務(wù)才能算完成,這是一種可靠的任務(wù)序列。要么成功都成功,失敗都失敗,兩個(gè)任務(wù)的狀態(tài)可以保持一致。

? 所謂異步是不需要等待被依賴的任務(wù)完成,只是通知被依賴的任務(wù)要完成什么工作,依賴的任務(wù)也立即執(zhí)行,只要自己完成了整個(gè)任務(wù)就算完成了至于被依賴的任務(wù)最終是否真正完成,依賴它的任務(wù)無法確定,所以它是不可靠的任務(wù)序列。

阻塞與非阻塞

繼續(xù)上面的例子

  • 不管你的在餐廳等著還是回家等著,這個(gè)期間你的都不能干別的事,那么該機(jī)制就是阻塞的,表現(xiàn)在程序中,也就是該程序一直阻塞在該函數(shù)調(diào)用處不能繼續(xù)往下執(zhí)行。
  • 你回家以后就可以去做別的事了,一遍做別的事,一般去等待服務(wù)員的電話,這樣的狀態(tài)就是非阻塞的,因?yàn)槟?等待者)沒有阻塞在這個(gè)消息通知上,而是一邊做自己的事情一邊等待。

? 阻塞和非阻塞這兩個(gè)概念與程序(線程)等待消息通知(無所謂同步或者異步)時(shí)的狀態(tài)有關(guān)。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時(shí)的狀態(tài)角度來說的

同步/異步與阻塞/非阻塞

同步阻塞形式

  效率最低。拿上面的例子來說,就是你專心的在餐館等著,什么別的事都不做。

異步阻塞形式

  在家里等待的過程中,你一直盯著手機(jī),不去做其它的事情,那么很顯然,你被阻塞在了這個(gè)等待的操作上面;

  異步操作是可以被阻塞住的,只不過它不是在處理消息時(shí)阻塞,而是在等待消息通知時(shí)被阻塞。

同步非阻塞形式

  實(shí)際上是效率低下的。

  想象一下你如果害怕服務(wù)員忘記給你打電話通知你,你過一會(huì)就要去餐廳看一下你的飯菜好了沒有,沒好 ,在回家等待,過一會(huì)再去看一眼,沒好再回家等著,那么效率可想而知是低下的。

異步非阻塞形式

? 比如說你回家以后就直接看電視了,把手機(jī)放在一邊,等什么時(shí)候電話響了,你在去接電話.這就是異步非阻塞形式,大家想一下這樣是不是效率是最高的  

? 那么同步一定是阻塞的嗎?異步一定是非阻塞的嗎?

生產(chǎn)者消費(fèi)者模型

在實(shí)際的軟件開發(fā)過程中,經(jīng)常會(huì)碰到如下場景:某個(gè)模塊負(fù)責(zé)產(chǎn)生數(shù)據(jù),這些數(shù)據(jù)由另一個(gè)模塊來負(fù)責(zé)處理(此處的模塊是廣義的,可以是類、函數(shù)、線程、進(jìn)程等)。產(chǎn)生數(shù)據(jù)的模塊,就形象地稱為生產(chǎn)者;而處理數(shù)據(jù)的模塊,就稱為消費(fèi)者。

單單抽象出生產(chǎn)者和消費(fèi)者,還夠不上是生產(chǎn)者消費(fèi)者模式。該模式還需要有一個(gè)緩沖區(qū)處于生產(chǎn)者和消費(fèi)者之間,作為一個(gè)中介。生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū),而消費(fèi)者從緩沖區(qū)取出數(shù)據(jù),如下圖所示:

Celery-一個(gè)會(huì)做異步任務(wù),定時(shí)任務(wù)的芹菜

生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過消息隊(duì)列(緩沖區(qū))來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給消息隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從消息隊(duì)列里取,消息隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。這個(gè)消息隊(duì)列就是用來給生產(chǎn)者和消費(fèi)者解耦的。------------->這里又有一個(gè)問題,什么叫做解耦?

解耦:假設(shè)生產(chǎn)者和消費(fèi)者分別是兩個(gè)類。如果讓生產(chǎn)者直接調(diào)用消費(fèi)者的某個(gè)方法,那么生產(chǎn)者對于消費(fèi)者就會(huì)產(chǎn)生依賴(也就是耦合)。將來如果消費(fèi)者的代碼發(fā)生變化,可能會(huì)影響到生產(chǎn)者。而如果兩者都依賴于某個(gè)緩沖區(qū),兩者之間不直接依賴,耦合也就相應(yīng)降低了。生產(chǎn)者直接調(diào)用消費(fèi)者的某個(gè)方法,還有另一個(gè)弊端。由于函數(shù)調(diào)用是同步的(或者叫阻塞的),在消費(fèi)者的方法沒有返回之前,生產(chǎn)者只好一直等在那邊。萬一消費(fèi)者處理數(shù)據(jù)很慢,生產(chǎn)者就會(huì)白白糟蹋大好時(shí)光。緩沖區(qū)還有另一個(gè)好處。如果制造數(shù)據(jù)的速度時(shí)快時(shí)慢,緩沖區(qū)的好處就體現(xiàn)出來了。當(dāng)數(shù)據(jù)制造快的時(shí)候,消費(fèi)者來不及處理,未處理的數(shù)據(jù)可以暫時(shí)存在緩沖區(qū)中。等生產(chǎn)者的制造速度慢下來,消費(fèi)者再慢慢處理掉。

因?yàn)樘橄?,看過網(wǎng)上的說明之后,通過我的理解,我舉了個(gè)例子:吃包子。

假如你非常喜歡吃包子(吃起來根本停不下來),今天,你媽媽(生產(chǎn)者)在蒸包子,廚房有張桌子(緩沖區(qū)),你媽媽將蒸熟的包子盛在盤子(消息)里,然后放到桌子上,你正在看巴西奧運(yùn)會(huì),看到蒸熟的包子放在廚房桌子上的盤子里,你就把盤子取走,一邊吃包子一邊看奧運(yùn)。在這個(gè)過程中,你和你媽媽使用同一個(gè)桌子放置盤子和取走盤子,這里桌子就是一個(gè)共享對象。生產(chǎn)者添加食物,消費(fèi)者取走食物。桌子的好處是,你媽媽不用直接把盤子給你,只是負(fù)責(zé)把包子裝在盤子里放到桌子上,如果桌子滿了,就不再放了,等待。而且生產(chǎn)者還有其他事情要做,消費(fèi)者吃包子比較慢,生產(chǎn)者不能一直等消費(fèi)者吃完包子把盤子放回去再去生產(chǎn),因?yàn)槌园拥娜擞泻芏啵绻@期間你好朋友來了,和你一起吃包子,生產(chǎn)者不用關(guān)注是哪個(gè)消費(fèi)者去桌子上拿盤子,而消費(fèi)者只去關(guān)注桌子上有沒有放盤子,如果有,就端過來吃盤子中的包子,沒有的話就等待。對應(yīng)關(guān)系如下圖:

Celery-一個(gè)會(huì)做異步任務(wù),定時(shí)任務(wù)的芹菜

celery

生產(chǎn)者消費(fèi)者模型

消費(fèi)者
from  celery import Celery

task=Celery('task',broker="redis://10.211.55.19:6379") #task可以是任何名稱,后面跟的是隊(duì)列的緩存者,celery中一般稱為中間人,如果要是密碼訪問的話,需要是redis://:{pass}@IP地址:端口

@task.task
def add(a,b):
    return a+b

啟動(dòng) celery從4.0版本以后就不在支持windows了,如果想在windows環(huán)境下使用的話,需要安裝eventlet這個(gè)包,啟動(dòng)的時(shí)候需要指定-P eventlet

celery worker -A c -l info 

生產(chǎn)者

from c import add
for i in range(10):
    add.delay(1,2)

模擬兩個(gè)消費(fèi)者

在不同的位置在啟動(dòng)一個(gè)worker既可以了
celery worker -A c -l info 

生產(chǎn)者消費(fèi)者模型升級

消費(fèi)者
from  celery import Celery

task=Celery('task',broker="redis://10.211.55.19:6379/0",backend="redis://10.211.55.19:6379/2")#broker和backend可以是不同的隊(duì)列,這里使用redis不同的庫來模擬不同的隊(duì)列,當(dāng)然也可以一樣

@task.task
def add(a,b):

    return a+b

啟動(dòng)過程跟上面一樣

生產(chǎn)者
from c import add

for i in range(10):
    t=add.delay(i,2)
    print(t.get()) #獲取結(jié)果
登錄redis查看信息
redis-cli
127.0.0.1:6379[1]> SELECT 2
127.0.0.1:6379[2]> KEYS *
127.0.0.1:6379[2]> get celery-task-meta-6c8dda5e-c8a3-4f5f-8c2b-e7541aafdc42
"{\"status\": \"SUCCESS\", \"result\": 3, \"traceback\": null, \"children\": [], \"task_id\": \"6c8dda5e-c8a3-4f5f-8c2b-e7541aafdc42\"}"
## 解析數(shù)據(jù)
d="{\"status\": \"SUCCESS\", \"result\": 3, \"traceback\": null, \"children\": [], \"task_id\": \"6c8dda5e-c8a3-4f5f-8c2b-e7541aafdc42\"}"
import json
print(json.loads(d))

獲取執(zhí)行狀態(tài)

倘若任務(wù)拋出了一個(gè)異常, get() 會(huì)重新拋出異常, 但你可以指定 propagate 參數(shù)來覆蓋這一行為:

result.get(propagate=False)

如果任務(wù)拋出了一個(gè)異常,你也可以獲取原始的回溯信息:

result.traceback…
print(t)
print(t.ready())
print(t.get())
print(t.ready())

定時(shí)任務(wù)

apply_async

t=add.apply_async((1,2),countdown=5) #表示延遲5秒鐘執(zhí)行任務(wù)
print(t)
print(t.get())
問題:是延遲5秒發(fā)送還是立即發(fā)送,消費(fèi)者延遲5秒在執(zhí)行那?

支持的參數(shù) :

  • countdown : 等待一段時(shí)間再執(zhí)行.

    add.apply_async((2,3), countdown=5)
  • eta : 定義任務(wù)的開始時(shí)間.這里的時(shí)間是UTC時(shí)間,這里有坑

    add.apply_async((2,3), eta=now+tiedelta(second=10))
  • expires : 設(shè)置超時(shí)時(shí)間.

    add.apply_async((2,3), expires=60)
  • retry : 定時(shí)如果任務(wù)失敗后, 是否重試.

    add.apply_async((2,3), retry=False)
  • retry_policy : 重試策略.

    • max_retries : 最大重試次數(shù), 默認(rèn)為 3 次.
    • interval_start : 重試等待的時(shí)間間隔秒數(shù), 默認(rèn)為 0 , 表示直接重試不等待.
    • interval_step : 每次重試讓重試間隔增加的秒數(shù), 可以是數(shù)字或浮點(diǎn)數(shù), 默認(rèn)為 0.2
    • interval_max : 重試間隔最大的秒數(shù), 即 通過 interval_step 增大到多少秒之后, 就不在增加了, 可以是數(shù)字或者浮點(diǎn)數(shù), 默認(rèn)為 0.2 .

周期任務(wù)

from c import task
task.conf.beat_schedule={
    timezone='Asia/Shanghai',
    "each20s_task":{
        "task":"c.add",
        "schedule":3, # 每3秒鐘執(zhí)行一次
        "args":(10,10)
    },

}

其實(shí)celery也支持linux里面的crontab格式的書寫的

from celery.schedules import crontab
task.conf.beat_schedule={
     timezone='Asia/Shanghai',
    "each4m_task":{
        "task":"c.add",
        "schedule":crontab(minute=3), #每小時(shí)的第3分鐘執(zhí)行
        "args":(10,10)
    },
     "each4m_task":{
        "task":"c.add",
        "schedule":crontab(minute=*/3), #每小時(shí)的第3分鐘執(zhí)行
        "args":(10,10)
    },
}

后臺(tái)啟動(dòng)

worker:
    celery multi start worker1 \
    -A c \
    --pidfile="$HOME/run/celery/%n.pid" \
    --logfile="$HOME/log/celery/%n%I.log"

    celery multi restart worker1 \
    -A proj \
    --logfile="$HOME/log/celery/%n%I.log" \
    --pidfile="$HOME/run/celery/%n.pid

    celery multi stopwait worker1 --pidfile="$HOME/run/celery/%n.pid"

beat:
    celery -A d beat --detach -l info -f beat.log

與django結(jié)合

1.執(zhí)行異步任務(wù)

1.1 在生成的目錄文件中添加celery文件,內(nèi)容如下
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tests.settings') #與項(xiàng)目關(guān)聯(lián)

app = Celery('tests',backend='redis://10.211.55.19/3',broker='redis://10.211.55.19/4')
#創(chuàng)建celery對象
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
#在django中創(chuàng)建celery的命名空間
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
#自動(dòng)加載任務(wù)
1.2編輯settings.py同級目錄的init.py
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ['celery_app']
1.3 在項(xiàng)目中添加tasks文件,用來保存tasks的文件
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def xsum(numbers):
    return sum(numbers)
1.4添加views文件內(nèi)容
from .tasks import add

def index(request):
    result = add.delay(2, 3)
    return HttpResponse('返回?cái)?shù)據(jù){}'.format(result.get()))
1.5 啟動(dòng)worker
celery -A tests  worker -l info
1.6添加url并調(diào)用

2.執(zhí)行周期性任務(wù)

2.1需要安裝一個(gè)django的組件來完成這個(gè)事情
pip install django-celery-beat
2.2將django-celery-beat添加到INSTALLED_APPS里面
INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)
2.3刷新到數(shù)據(jù)庫
python3 manage.py makemigrations #不執(zhí)行這個(gè)會(huì)有問題
python3 manage.py migrate
2.4 admin配置
2.5啟動(dòng)beat
celery -A tests beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
2.6 啟動(dòng)worker
celery -A tests worker -l info 
向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI