溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

發(fā)布時間:2023-04-13 11:58:55 來源:億速云 閱讀:159 作者:iii 欄目:編程語言

本篇內(nèi)容介紹了“Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

什么是 celery

這次我們來介紹一下 Python 的一個第三方模塊 celery,那么 celery 是什么呢?

  • celery 是一個靈活且可靠的,處理大量消息的分布式系統(tǒng),可以在多個節(jié)點之間處理某個任務(wù);

  • celery 是一個專注于實時處理的任務(wù)隊列,支持任務(wù)調(diào)度;

  • celery 是開源的,有很多的使用者;

  • celery 完全基于 Python 語言編寫;

所以 celery 本質(zhì)上就是一個任務(wù)調(diào)度框架,類似于 Apache 的 airflow,當(dāng)然 airflow 也是基于 Python 語言編寫。

不過有一點需要注意,celery 是用來調(diào)度任務(wù)的,但它本身并不具備存儲任務(wù)的功能,而調(diào)度任務(wù)的時候肯定是要把任務(wù)存起來的。因此要使用 celery 的話,還需要搭配一些具備存儲、訪問功能的工具,比如:消息隊列、Redis緩存、數(shù)據(jù)庫等等。官方推薦的是消息隊列 RabbitMQ,個人認(rèn)為有些時候使用 Redis 也是不錯的選擇,當(dāng)然我們都會介紹。

那么 celery 都可以在哪些場景中使用呢?

  • 異步任務(wù):一些耗時的操作可以交給celery異步執(zhí)行,而不用等著程序處理完才知道結(jié)果。比如:視頻轉(zhuǎn)碼、郵件發(fā)送、消息推送等等;

  • 定時任務(wù):比如定時推送消息、定時爬取數(shù)據(jù)、定時統(tǒng)計數(shù)據(jù)等等;

celery 的架構(gòu)

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

我們看一下 celery 的架構(gòu):

  • producer:生產(chǎn)者,專門用來生產(chǎn)任務(wù)(task);

  • celery beat:任務(wù)調(diào)度器,調(diào)度器進(jìn)程會讀取配置文件的內(nèi)容,周期性地將配置文件里面到期需要執(zhí)行的任務(wù)發(fā)送給消息隊列,說白了就是生產(chǎn)定時任務(wù);

  • broker:任務(wù)隊列,用于存放生產(chǎn)者和調(diào)度器生產(chǎn)的任務(wù)。一般使用消息隊列或者 Redis 來存儲,當(dāng)然具有存儲功能的數(shù)據(jù)庫也是可以的。這一部分是 celery 所不提供的,需要依賴第三方。作用就是接收任務(wù),存進(jìn)隊列;

  • worker:任務(wù)的執(zhí)行單元,會將任務(wù)從隊列中順序取出并執(zhí)行;

  • backend:用于在任務(wù)結(jié)束之后保存狀態(tài)信息和結(jié)果,以便查詢,一般是數(shù)據(jù)庫,當(dāng)然只要具備存儲功能都可以作為 backend;

下面我們來安裝 celery,安裝比較簡單,直接 pip install celery 即可。這里我本地的 celery 版本是 5.2.7,Python 版本是 3.8.10。

另外,由于 celery 本身不提供任務(wù)存儲的功能,所以這里我們使用 Redis 作為消息隊列,負(fù)責(zé)存儲任務(wù)。因此你還要在機(jī)器上安裝 Redis,我這里有一臺云服務(wù)器,已經(jīng)安裝好了。

后續(xù) celery 就會將任務(wù)存到 broker 里面,當(dāng)然要想實現(xiàn)這一點,就必須還要有能夠操作相應(yīng) broker 的驅(qū)動。Python 操作 Redis 的驅(qū)動也叫 redis,操作 RabbitMQ 的驅(qū)動叫 pika,直接 pip install ... 安裝即可。

celery 實現(xiàn)異步任務(wù)

我們新建一個工程,就叫 celery_demo,然后在里面新建一個 app.py 文件。

# 文件名:app.py
import time
# 這個 Celery 就類似于 flask.Flask
# 然后實例化得到一個app
from celery import Celery
# 指定一個 name、以及 broker 的地址、backend 的地址
app = Celery(
 "satori",
 # 這里使用我服務(wù)器上的 Redis
 # broker 用 1 號庫, backend 用 2 號庫
 broker="redis://:maverick@82.157.146.194:6379/1",
 backend="redis://:maverick@82.157.146.194:6379/2")
# 這里通過 @app.task 對函數(shù)進(jìn)行裝飾
# 那么之后我們便可調(diào)用 task.delay 創(chuàng)建一個任務(wù)
@app.task
def task(name, age):
 print("準(zhǔn)備執(zhí)行任務(wù)啦")
 time.sleep(3)
 return f"name is {name}, age is {age}"

我們說執(zhí)行任務(wù)的對象是 worker,那么我們是不是需要創(chuàng)建一個 worker 呢?顯然是需要的,而創(chuàng)建 worker 可以使用如下命令創(chuàng)建:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

注意:在 5.0 之前我們可以寫成 celery worker -A app ...,也就是把所有的參數(shù)都放在子命令 celery worker 的后面。但從 5.0 開始這種做法就不允許了,必須寫成 celery -A app worker ...,因為 -A 變成了一個全局參數(shù),所以它不應(yīng)該放在 worker 的后面,而是要放在 worker 的前面。

下面執(zhí)行該命令:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

以上就前臺啟動了一個 worker,正在等待從隊列中獲取任務(wù),圖中也顯示了相應(yīng)的信息。然而此時隊列中并沒有任務(wù),所以我們需要在另一個文件中創(chuàng)建任務(wù)并發(fā)送到隊列里面去。

import time
from app import task
# 從 app 導(dǎo)入 task, 創(chuàng)建任務(wù), 但是注意: 不要直接調(diào)用 task
# 因為那樣的話就在本地執(zhí)行了, 我們的目的是將任務(wù)發(fā)送到隊列里面去
# 然后讓監(jiān)聽隊列的 worker 從隊列里面取任務(wù)并執(zhí)行
# 而 task 被 @app.task 裝飾, 所以它不再是原來的 task 了
# 我們需要調(diào)用它的 delay 方法
# 調(diào)用 delay 之后, 就會創(chuàng)建一個任務(wù)
# 然后發(fā)送到隊列里面去, 也就是我們這里的 Redis
# 至于參數(shù), 普通調(diào)用的時候怎么傳, 在 delay 里面依舊怎么傳
start = time.perf_counter()
task.delay("古明地覺", 17)
print(
 time.perf_counter() - start
)# 0.11716766700000003

然后執(zhí)行該文件,發(fā)現(xiàn)只用了 0.12 秒,而 task 里面明明 sleep 了 3 秒。所以說明這一步是不會阻塞的,調(diào)用 task.delay 只是創(chuàng)建一個任務(wù)并發(fā)送至隊列。我們再看一下 worker 的輸出信息:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

可以看到任務(wù)已經(jīng)被消費者接收并且消費了,而且調(diào)用 delay 方法是不會阻塞的,花費的那 0.12 秒是用在了其它地方,比如連接 Redis 發(fā)送任務(wù)等等。

另外需要注意,函數(shù)被 @app.task 裝飾之后,可以理解為它就變成了一個任務(wù)工廠,因為被裝飾了嘛,然后調(diào)用任務(wù)工廠的 delay 方法即可創(chuàng)建任務(wù)并發(fā)送到隊列里面。我們也可以創(chuàng)建很多個任務(wù)工廠,但是這些任務(wù)工廠必須要讓 worker 知道,否則不會生效。所以如果修改了某個任務(wù)工廠、或者添加、刪除了某個任務(wù)工廠,那么一定要讓 worker 知道,而做法就是先停止 celery worker 進(jìn)程,然后再重新啟動。

如果我們新建了一個任務(wù)工廠,然后在沒有重啟 worker 的情況下,就用調(diào)用它的 delay 方法創(chuàng)建任務(wù)、并發(fā)送到隊列的話,那么會拋出一個 KeyError,提示找不到相應(yīng)的任務(wù)工廠。

  • 其實很好理解,因為代碼已經(jīng)加載到內(nèi)存里面了,光修改了源文件而不重啟是沒用的。因為加載到內(nèi)存里面的還是原來的代碼,不是修改過后的。

然后我們再來看看 Redis 中存儲的信息,1 號庫用作 broker,負(fù)責(zé)存儲任務(wù);2 號庫用作 backend,負(fù)責(zé)存儲執(zhí)行結(jié)果。我們來看 2 號庫:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

以上我們就啟動了一個 worker 并成功消費了隊列中的任務(wù),并且還從 Redis 里面拿到了執(zhí)行信息。當(dāng)然啦,如果只能通過查詢 backend 才能拿到信息的話,那 celery 就太不智能了。我們也可以直接從程序中獲取。

直接查詢?nèi)蝿?wù)執(zhí)行信息

Redis(backend)里面存儲了很多關(guān)于任務(wù)的信息,這些信息我們可以直接在程序中獲取。

from app import task
res = task.delay("古明地覺", 17)
print(type(res))
""""""
# 直接打印,顯示任務(wù)的 id
print(res)
"""
4bd48a6d-1f0e-45d6-a225-6884067253c3
"""
# 獲取狀態(tài), 顯然此刻沒有執(zhí)行完
# 因此結(jié)果是PENDING, 表示等待狀態(tài)
print(res.status)
"""
PENDING
"""
# 獲取 id,兩種方式均可
print(res.task_id)
print(res.id)
"""
4bd48a6d-1f0e-45d6-a225-6884067253c3
4bd48a6d-1f0e-45d6-a225-6884067253c3
"""
# 獲取任務(wù)執(zhí)行結(jié)束時的時間
# 任務(wù)還沒有結(jié)束, 所以返回None
print(res.date_done)
"""
None
"""
# 獲取任務(wù)的返回值, 可以通過 result 或者 get()
# 注意: 如果是 result, 那么任務(wù)還沒有執(zhí)行完的話會直接返回 None
# 如果是 get(), 那么會阻塞直到任務(wù)完成
print(res.result)
print(res.get())
"""
None
name is 古明地覺, age is 17
"""
# 再次查看狀態(tài)和執(zhí)行結(jié)束時的時間
# 發(fā)現(xiàn) status 變成SUCCESS
# date_done 變成了執(zhí)行結(jié)束時的時間
print(res.status)
# 但顯示的是 UTC 時間
print(res.date_done)
"""
SUCCESS
2022-09-08 06:40:34.525492
"""

另外我們說結(jié)果需要存儲在 backend 中,如果沒有配置 backend,那么獲取結(jié)果的時候會報錯。至于 backend,因為它是存儲結(jié)果的,所以一般會保存在數(shù)據(jù)庫中,因為要持久化。我這里為了方便,就還是保存在 Redis 中。

celery.result.AsyncResult 對象

調(diào)用完任務(wù)工廠的 delay 方法之后,會創(chuàng)建一個任務(wù)并發(fā)送至隊列,同時返回一個 AsyncResult 對象,基于此對象我們可以拿到任務(wù)執(zhí)行時的所有信息。但是 AsyncResult 對象我們也可以手動構(gòu)造,舉個例子:

import time
# 我們不光要導(dǎo)入 task, 還要導(dǎo)入里面的 app
from app import app, task
# 導(dǎo)入 AsyncResult 這個類
from celery.result import AsyncResult
# 發(fā)送任務(wù)到隊列當(dāng)中
res = task.delay("古明地覺", 17)
# 傳入任務(wù)的 id 和 app, 創(chuàng)建 AsyncResult 對象
async_result = AsyncResult(res.id, app=app)
# 此時的這個 res 和 async_result 之間是等價的
# 兩者都是 AsyncResult 對象, 它們所擁有的方法也是一樣的
# 下面用誰都可以
while True:
 # 等價于async_result.state == "SUCCESS"
 if async_result.successful():
 print(async_result.get())
 break
 # 等價于async_result.state == "FAILURE"
 elif async_result.failed():
 print("任務(wù)執(zhí)行失敗")
 elif async_result.status == "PENDING":
 print("任務(wù)正在被執(zhí)行")
 elif async_result.status == "RETRY":
 print("任務(wù)執(zhí)行異常正在重試")
 elif async_result.status == "REJECTED":
 print("任務(wù)被拒絕接收")
 elif async_result.status == "REVOKED":
 print("任務(wù)被取消")
 else:
 print("其它的一些狀態(tài)")
 time.sleep(0.8)
"""
任務(wù)正在被執(zhí)行
任務(wù)正在被執(zhí)行
任務(wù)正在被執(zhí)行
任務(wù)正在被執(zhí)行
name is 古明地覺, age is 17
"""

以上就是任務(wù)可能出現(xiàn)的一些狀態(tài),通過輪詢的方式,我們也可以查看任務(wù)是否已經(jīng)執(zhí)行完畢。當(dāng)然 AsyncResult 還有一些別的方法,我們來看一下:

from app import task
res = task.delay("古明地覺", 17)
# 1. ready():查看任務(wù)狀態(tài),返回布爾值。
# 任務(wù)執(zhí)行完成返回 True,否則為 False
# 那么問題來了,它和 successful() 有什么區(qū)別呢?
# successful() 是在任務(wù)執(zhí)行成功之后返回 True, 否則返回 False
# 而 ready() 只要是任務(wù)沒有處于阻塞狀態(tài)就會返回 True
# 比如執(zhí)行成功、執(zhí)行失敗、被 worker 拒收都看做是已經(jīng) ready 了
print(res.ready())
"""
False
"""
# 2. wait():和之前的 get 一樣, 因為在源碼中寫了: wait = get
# 所以調(diào)用哪個都可以, 不過 wait 可能會被移除,建議直接用 get 就行
print(res.wait())
print(res.get())
"""
name is 古明地覺, age is 17
name is 古明地覺, age is 17
"""
# 3. trackback:如果任務(wù)拋出了一個異常,可以獲取原始的回溯信息
# 執(zhí)行成功就是 None
print(res.traceback)
"""
None
"""

以上就是獲取任務(wù)執(zhí)行結(jié)果相關(guān)的部分。

celery 的配置

celery 的配置不同,所表現(xiàn)出來的性能也不同,比如序列化的方式、連接隊列的方式,單線程、多線程、多進(jìn)程等等。那么 celery 都有那些配置呢?

  • broker_url:broker 的地址,就是類 Celery 里面?zhèn)魅氲?broker 參數(shù)。

  • result_backend:存儲結(jié)果地址,就是類 Celery 里面?zhèn)魅氲?backend 參數(shù)。

  • task_serializer:任務(wù)序列化方式,支持以下幾種:

  • binary:二進(jìn)制序列化方式,pickle 模塊默認(rèn)的序列化方法;

  • json:支持多種語言,可解決多語言的問題,但通用性不高;

  • xml:標(biāo)簽語言,和 json 定位相似;

  • msgpack:二進(jìn)制的類 json 序列化,但比 json 更小、更快;

  • yaml:表達(dá)能力更強(qiáng)、支持的類型更多,但是在 Python里面的性能不如 json;

  • 根據(jù)情況,選擇合適的類型。如果不是跨語言的話,直接選擇 binary 即可,默認(rèn)是 json。

  • result_serializer:任務(wù)執(zhí)行結(jié)果序列化方式,支持的方式和任務(wù)序列化方式一致。

  • result_expires:任務(wù)結(jié)果的過期時間,單位是秒。

  • accept_content:指定任務(wù)接受的內(nèi)容序列化類型(序列化),一個列表,比如:["msgpack", "binary", "json"]。

  • timezone:時區(qū),默認(rèn)是 UTC 時區(qū)。

  • enable_utc:是否開啟 UTC 時區(qū),默認(rèn)為 True;如果為 False,則使用本地時區(qū)。

  • task_publish_retry:發(fā)送消息失敗時是否重試,默認(rèn)為 True。

  • worker_concurrency:并發(fā)的 worker 數(shù)量。

  • worker_prefetch_multiplier:每次 worker 從任務(wù)隊列中獲取的任務(wù)數(shù)量。

  • worker_max_tasks_per_child:每個 worker 執(zhí)行多少次就會被殺掉,默認(rèn)是無限的。

  • task_time_limit:單個任務(wù)執(zhí)行的最大時間,單位是秒。

  • task_default_queue :設(shè)置默認(rèn)的隊列名稱,如果一個消息不符合其它的隊列規(guī)則,就會放在默認(rèn)隊列里面。如果什么都不設(shè)置的話,數(shù)據(jù)都會發(fā)送到默認(rèn)的隊列中。

  • task_queues :設(shè)置詳細(xì)的隊列

# 將 RabbitMQ 作為 broker 時需要使用
task_queues = {
 # 這是指定的默認(rèn)隊列
"default": {
 "exchange": "default",
 "exchange_type": "direct",
 "routing_key": "default"
 },
 # 凡是 topic 開頭的 routing key
 # 都會被放到這個隊列
"topicqueue": {
 "routing_key": "topic.#",
 "exchange": "topic_exchange",
 "exchange_type": "topic",
 },
 "task_eeg": { # 設(shè)置扇形交換機(jī)
 "exchange": "tasks",
 "exchange_type": "fanout",
 "binding_key": "tasks",
 },
}

celery 的配置非常多,不止我們上面說的那些,更多配置可以查看官網(wǎng),寫的比較詳細(xì)。

  • https://docs.celeryq.dev/en/latest/userguide/configuration.html#general-settings

值得一提的是,在 5.0 之前配置項都是大寫的,而從 5.0 開始配置項改成小寫了。不過老的寫法目前仍然支持,只是啟動的時候會拋警告,并且在 6.0 的時候不再兼容老的寫法。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

官網(wǎng)也很貼心地將老版本的配置和新版本的配置羅列了出來,盡管配置有很多,但并不是每一個都要用,可以根據(jù)自身的業(yè)務(wù)合理選擇。

然后下面我們就根據(jù)配置文件的方式啟動 celery,當(dāng)前目錄結(jié)構(gòu)如下:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

celery_demo/config.py

broker_url = "redis://:maverick@82.157.146.194:6379/1"
result_backend = "redis://:maverick@82.157.146.194:6379"
# 寫倆就完事了

celery_demo/tasks/task1.py

celery 可以支持非常多的定時任務(wù),而不同種類的定時任務(wù)我們一般都會寫在不同的模塊中(當(dāng)然這里目前只有一個),然后再將這些模塊組織在一個單獨的目錄中。

當(dāng)前只有一個 task1.py,我們隨便往里面寫點東西,當(dāng)然你也可以創(chuàng)建更多的文件。

def add(x, y):
return x + y
def sub(x, y):
 return x - y
def mul(x, y):
return x * y
def div(x, y):
 return x / y

celery_demo/app.py

from celery import Celery
import config
from tasks.task1 import (
 add, sub, mul, div
)
# 指定一個 name 即可
app = Celery("satori")
# 其它參數(shù)通過加載配置文件的方式指定
# 和 flask 非常類似
app.config_from_object(config)
# 創(chuàng)建任務(wù)工廠,有了任務(wù)工廠才能創(chuàng)建任務(wù)
# 這種方式和裝飾器的方式是等價的
add = app.task(add)
sub = app.task(sub)
mul = app.task(mul)
div = app.task(div)

然后重新啟動 worker:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

輸出結(jié)果顯示,任務(wù)工廠都已經(jīng)被加載進(jìn)來了,然后我們創(chuàng)建任務(wù)并發(fā)送至隊列。

# 在 celery_demo 目錄下
# 將 app.py 里面的任務(wù)工廠導(dǎo)入進(jìn)來
>>> from app import add, sub, mul, div
# 然后創(chuàng)建任務(wù)發(fā)送至隊列,并等待結(jié)果
>>> add.delay(3, 4).get()
7
>>> sub.delay(3, 4).get()
-1
>>> mul.delay(3, 4).get()
12
>>> div.delay(3, 4).get()
0.75
>>>

結(jié)果正常返回了,再來看看 worker 的輸出,

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

多個任務(wù)都被執(zhí)行了。

發(fā)送任務(wù)時指定參數(shù)

我們在發(fā)送任務(wù)到隊列的時候,使用的是 delay 方法,里面直接傳遞函數(shù)所需的參數(shù)即可,那么除了函數(shù)需要的參數(shù)之外,還有沒有其它參數(shù)呢?

首先 delay 方法實際上是調(diào)用的 apply_async 方法,并且 delay 方法里面只接收函數(shù)的參數(shù),但是 apply_async 接收的參數(shù)就很多了,我們先來看看它們的函數(shù)原型:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

delay 方法的 *args 和 **kwargs 就是函數(shù)的參數(shù),它會傳遞給 apply_async 的 args 和 kwargs。而其它的參數(shù)就是發(fā)送任務(wù)時所設(shè)置的一些參數(shù),我們這里重點介紹一下 apply_async 的其它參數(shù)。

  • countdown:倒計時,表示任務(wù)延遲多少秒之后再執(zhí)行,參數(shù)為整型;

  • eta:任務(wù)的開始時間,datetime 類型,如果指定了 countdown,那么這個參數(shù)就不應(yīng)該再指定;

  • expires:datetime 或者整型,如果到規(guī)定時間、或者未來的多少秒之內(nèi),任務(wù)還沒有發(fā)送到隊列被 worker 執(zhí)行,那么該任務(wù)將被丟棄;

  • shadow:重新指定任務(wù)的名稱,覆蓋 app.py 創(chuàng)建任務(wù)時日志上所指定的名字;

  • retry:任務(wù)失敗之后是否重試,bool 類型;

  • retry_policy:重試所采用的策略,如果指定這個參數(shù),那么 retry 必須要為 True。參數(shù)類型是一個字典,里面參數(shù)如下:

  • max_retries : 最大重試次數(shù),默認(rèn)為 3 次;

  • interval_start : 重試等待的時間間隔秒數(shù),默認(rèn)為 0,表示直接重試不等待;

  • interval_step : 每次重試讓重試間隔增加的秒數(shù),可以是數(shù)字或浮點數(shù),默認(rèn)為 0.2;

  • interval_max : 重試間隔最大的秒數(shù),即通過 interval_step 增大到多少秒之后, 就不在增加了, 可以是數(shù)字或者浮點數(shù);

  • routing_key:自定義路由鍵,針對 RabbitMQ;

  • queue:指定發(fā)送到哪個隊列,針對 RabbitMQ;

  • exchange:指定發(fā)送到哪個交換機(jī),針對 RabbitMQ;

  • priority:任務(wù)隊列的優(yōu)先級,0-9 之間,對于 RabbitMQ 而言,0是最高級;

  • serializer:任務(wù)序列化方法,通常不設(shè)置;

  • compression:壓縮方案,通常有zlib、bzip2;

  • headers:為任務(wù)添加額外的消息頭;

  • link:任務(wù)成功執(zhí)行后的回調(diào)方法,是一個signature對象,可以用作關(guān)聯(lián)任務(wù);

  • link_error: 任務(wù)失敗后的回調(diào)方法,是一個signature對象;

我們隨便挑幾個舉例說明:

>>> from app import add
# 使用 apply_async,要注意參數(shù)的傳遞
# 位置參數(shù)使用元組或者列表,關(guān)鍵字參數(shù)使用字典
# 因為是args和kwargs, 不是 *args和 **kwargs
>>> add.apply_async([3], {"y": 4},
... task_,
... countdown=5).get()
7
>>>

查看一下 worker 的輸出:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

注意左邊的時間,16:25:16 收到的消息,但 5 秒后才執(zhí)行完畢,因為我們將 countdown 參數(shù)設(shè)置為 5。并且任務(wù)的 id 也被我們修改了。

另外還需要注意一下那些接收時間的參數(shù),比如 eta。如果我們手動指定了eta,那么一定要注意時區(qū)的問題,要保證 celery 所使用的時區(qū)和你傳遞的 datetime 的時區(qū)是統(tǒng)一的。

其它的參數(shù)可以自己手動測試一下,這里不細(xì)說了,根據(jù)自身的業(yè)務(wù)選擇合適的參數(shù)即可。

創(chuàng)建任務(wù)工廠的另一種方式

之前在創(chuàng)建任務(wù)工廠的時候,是將函數(shù)導(dǎo)入到 app.py 中,然后通過 add = app.task(add) 的方式手動裝飾,因為有哪些任務(wù)工廠必須要讓 worker 知道,所以一定要在 app.py 里面出現(xiàn)。但是這顯然不夠優(yōu)雅,那么可不可以這么做呢?

# celery_demo/tasks/task1.py
from app import app
# celery_demo 所在路徑位于 sys.path 中
# 因此這里可以直接 from app import app
@app.task
def add(x, y):
 return x + y
@app.task
def sub(x, y):
 return x - y
# celery_demo/app.py
from tasks.task1 import add, sub

按照上面這種做法,理想上可以,但現(xiàn)實不行,因為會發(fā)生循環(huán)導(dǎo)入。

所以 celery 提供了一個辦法,我們依舊在 task1.py 中 import app,但在 app.py 中不再使用 import,而是通過 include 加載的方式,我們看一下:

# celery_demo/tasks/task1.py
from app import app
@app.task
def add(x, y):
 return x + y
@app.task
def sub(x, y):
 return x - y
# celery_demo/app.py
from celery import Celery
import config
# 通過 include 指定存放任務(wù)的 py 文件
# 注意它和 worker 啟動路徑之間的關(guān)系
# 我們是在 celery_demo 目錄下啟動的 worker
# 所以應(yīng)該寫成 "tasks.task1"
# 如果是在 celery_demo 的上一級目錄啟動 worker
# 那么這里就要指定為 "celery_demo.tasks.task1"
# 當(dāng)然啟動時的 -A app 也要換成 -A celery_demo.app
app = Celery(__name__, include=["tasks.task1"])
# 如果還有其它文件,比如 task2.py, task3.py
# 那么就把 "tasks.task2", "tasks.task3" 加進(jìn)去
app.config_from_object(config)

在 celery_demo 目錄下重新啟動 worker。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

為了方便,我們只保留了兩個任務(wù)工廠??梢钥吹酱藭r就成功啟動了,并且也更加方便和優(yōu)雅一些。之前是在 task1.py 中定義函數(shù),然后再把 task1.py 中的函數(shù)導(dǎo)入到 app.py 里面,然后手動進(jìn)行裝飾。雖然這么做是沒問題的,但很明顯這種做法不適合管理。

所以還是要將 app.py 中的 app 導(dǎo)入到 task1.py 中直接創(chuàng)建任務(wù)工廠,但如果再將 task1.py 中的任務(wù)工廠導(dǎo)入到 app.py 中就會發(fā)生循環(huán)導(dǎo)入。于是 celery 提供了一個 include 參數(shù),可以在創(chuàng)建 app 的時候自動將里面所有的任務(wù)工廠加載進(jìn)來,然后啟動并告訴 worker。

我們來測試一下:

# 通過 tasks.task1 導(dǎo)入任務(wù)工廠
# 然后創(chuàng)建任務(wù),發(fā)送至隊列
>>> from tasks.task1 import add, sub
>>> add.delay(11, 22).get()
33
>>> sub.delay(11, 22).get()
-11
>>>

查看一下 worker 的輸出:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

結(jié)果一切正常。

Task 對象

我們之前通過對一個函數(shù)使用 @app.task 即可將其變成一個任務(wù)工廠,而這個任務(wù)工廠就是一個 Task 實例對象。而我們在使用 @app.task 的時候,其實是可以加上很多的參數(shù)的,常用參數(shù)如下:

name:默認(rèn)的任務(wù)名是一個uuid,我們可以通過 name 參數(shù)指定任務(wù)名,當(dāng)然這個 name 就是 apply_async 的參數(shù) name。如果在 apply_async 中指定了,那么以 apply_async 指定的為準(zhǔn);

  • bind:一個 bool 值,表示是否和任務(wù)工廠進(jìn)行綁定。如果綁定,任務(wù)工廠會作為參數(shù)傳遞到方法中;

  • base:定義任務(wù)的基類,用于定義回調(diào)函數(shù),當(dāng)任務(wù)到達(dá)某個狀態(tài)時觸發(fā)不同的回調(diào)函數(shù),默認(rèn)是 Task,所以我們一般會自己寫一個類然后繼承 Task;

  • default_retry_delay:設(shè)置該任務(wù)重試的延遲機(jī)制,當(dāng)任務(wù)執(zhí)行失敗后,會自動重試,單位是秒,默認(rèn)是3分鐘;

  • serializer:指定序列化的方法;

當(dāng)然 app.task 還有很多不常用的參數(shù),這里就不說了,有興趣可以去查看官網(wǎng)或源碼,我們演示一下幾個常用的參數(shù):

# celery_demo/tasks/task1.py
from app import app
@app.task(name="你好")
def add(x, y):
 return x + y
@app.task(name="我不好", bind=True)
def sub(self, x, y):
 """
 如果 bind=True,則需要多指定一個 self
 這個 self 就是對應(yīng)的任務(wù)工廠
 """
 # self.request 是一個 celery.task.Context 對象
 # 獲取它的屬性字典,即可拿到該任務(wù)的所有屬性
 print(self.request.__dict__)
 return x - y

其它代碼不變,我們重新啟動 worker:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

然后創(chuàng)建任務(wù)發(fā)送至隊列,再由 worker 取出執(zhí)行:

>>> from tasks.task1 import add, sub
>>> add.delay(111, 222).get()
333
>>> sub.delay(111, 222).get()
-111
>>>

執(zhí)行沒有問題,然后看看 worker 的輸出:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

創(chuàng)建任務(wù)工廠時,如果指定了 bind=True,那么執(zhí)行任務(wù)時會將任務(wù)工廠本身作為第一個參數(shù)傳過去。任務(wù)工廠本質(zhì)上就是 Task 實例對象,調(diào)用它的 delay 方法即可創(chuàng)建任務(wù)。

所以如果我們在 sub 內(nèi)部繼續(xù)調(diào)用 self.delay(11, 22),會有什么后果呢?沒錯,worker 會進(jìn)入無限遞歸。因為執(zhí)行任務(wù)的時候,在任務(wù)的內(nèi)部又創(chuàng)建了任務(wù),所以會死循環(huán)下去。

當(dāng)然 self 還有很多其它屬性和方法,具體有哪些可以通過 Task 這個類來查看。這里面比較重要的是 self.request,它包含了某個具體任務(wù)的相關(guān)信息,而且信息非常多。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

比如當(dāng)前傳遞的參數(shù)是什么,就可以通過 self.request 拿到。當(dāng)然啦,self.request 是一個 Context 對象,因為不同任務(wù)獲取 self.request 的結(jié)果肯定是不同的,但 self(任務(wù)工廠)卻只有一個,所以要基于 Context 進(jìn)行隔離。

我們可以通過 __dict__ 拿到 Context 對象的屬性字典,然后再進(jìn)行操作。

最后再來說一說 @app.task 里面的 base 參數(shù)。

# celery_demo/tasks/task1.py
from celery import app
from app import Task
class MyTask(Task):
 """
 自定義一個類,繼承自celery.Task
 exc: 失敗時的錯誤的類型;
 task_id: 任務(wù)的id;
 args: 任務(wù)函數(shù)的位置參數(shù);
 kwargs: 任務(wù)函數(shù)的關(guān)鍵字參數(shù);
 einfo: 失敗時的異常詳細(xì)信息;
 retval: 任務(wù)成功執(zhí)行的返回值;
 """
 def on_failure(self, exc, task_id, args, kwargs, einfo):
 """任務(wù)失敗時執(zhí)行"""
 def on_success(self, retval, task_id, args, kwargs):
 """任務(wù)成功時執(zhí)行"""
 print("任務(wù)執(zhí)行成功")
 def on_retry(self, exc, task_id, args, kwargs, einfo):
 """任務(wù)重試時執(zhí)行"""
# 使用 @app.task 的時候,指定 base 即可
# 然后任務(wù)在執(zhí)行的時候,會觸發(fā) MyTask 里面的回調(diào)函數(shù)
@app.task(name="地靈殿", base=MyTask)
def add(x, y):
 print("加法計算")
 return x + y

重新啟動 worker,然后創(chuàng)建任務(wù)。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

指定了 base,任務(wù)在執(zhí)行的時候會根據(jù)執(zhí)行狀態(tài)的不同,觸發(fā) MyTask 里面的不同方法。

自定義任務(wù)流

有時候我們也可以將執(zhí)行的多個任務(wù),劃分到一個組中。

# celery_demo/tasks/task1.py
from app import app
@app.task()
def add(x, y):
 print("加法計算")
 return x + y
@app.task()
def sub(x, y):
 print("減法計算")
 return x - y
@app.task()
def mul(x, y):
 print("乘法計算")
 return x * y
@app.task()
def div(x, y):
 print("除法計算")
 return x // y

老規(guī)矩,重啟 worker,因為我們修改了任務(wù)工廠。

然后來導(dǎo)入它們,創(chuàng)建任務(wù),并將這些任務(wù)劃分到一個組中。

>>> from tasks.task1 import add, sub, mul, div
>>> from celery import group
# 調(diào)用 signature 方法,得到 signature 對象
# 此時 t1.delay() 和 add.delay(2, 3) 是等價的
>>> t1 = add.signature(args=(2, 3))
>>> t2 = sub.signature(args=(2, 3))
>>> t3 = mul.signature(args=(2, 3))
>>> t4 = div.signature(args=(4, 2))
# 但是變成 signature 對象之后,
# 我們可以將其放到一個組里面
>>> gp = group(t1, t2, t3, t4)
# 執(zhí)行組任務(wù)
# 返回 celery.result.GroupResult 對象
>>> res = gp()
# 每個組也有一個唯一 id
>>> print("組id:", res.id)
組id: 65f32cc4-b8ce-4bf8-916b-f5cc359a901a
# 調(diào)用 get 方法也會阻塞,知道組里面任務(wù)全部完成
>>> print("組結(jié)果:", res.get())
組結(jié)果: [5, -1, 6, 2]
>>>

可以看到整個組也是有唯一 id 的,另外 signature 也可以寫成 subtask 或者 s,在源碼里面這幾個是等價的。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

我們觀察一下 worker 的輸出,任務(wù)是并發(fā)執(zhí)行的,所以哪個先完成不好說。但是調(diào)用組的 get 方法時,里面的返回值順序一定和任務(wù)添加時候的順序保持一致。

除此之外,celery 還支持將多個任務(wù)像鏈子一樣串起來,第一個任務(wù)的輸出會作為第二個任務(wù)的輸入,傳遞給下一個任務(wù)的第一個參數(shù)。

# celery_demo/tasks/task1.py
from app import app
@app.task
def task1():
 l = []
 return l
@app.task
# task1 的返回值會傳遞給這里的 task1_return
def task2(task1_return, value):
 task1_return.append(value)
 return task1_return
@app.task
# task2 的返回值會傳遞給這里的 task2_return
def task3(task2_return, num):
 return [i + num for i in task2_return]
@app.task
# task3 的返回值會傳遞給這里的 task3_return
def task4(task3_return):
 return sum(task3_return)

然后我們看怎么將這些任務(wù)像鏈子一樣串起來。

>>> from tasks.task1 import *
>>> from celery import chain
# 將多個 signature 對象進(jìn)行與運算
# 當(dāng)然內(nèi)部肯定重寫了 __or__ 這個魔法方法
>>> my_chain = chain(
... task1.s() | task2.s(123) | task3.s(5) | task4.s())
>>>
# 執(zhí)行任務(wù)鏈
>>> res = my_chain()
# 獲取返回值
>>> print(res.get())
128
>>>

這種鏈?zhǔn)教幚淼膱鼍胺浅3R?,比?MapReduce。

celery 實現(xiàn)定時任務(wù)

既然是定時任務(wù),那么就意味著 worker 要后臺啟動,否則一旦遠(yuǎn)程連接斷開,就停掉了。因此 celery 是支持我們后臺啟動的,并且可以啟動多個。

# 啟動 worker
celery multi start w1 -A app -l info
# 可以同時啟動多個
celery multi start w2 w3 -A app -l info
# 以上我們就啟動了 3 個 worker
# 如果想停止的話
celery multi stop w1 w2 w3 -A app -l info

但是注意,這種啟動方式在 Windows 上面不支持,因為 celery 會默認(rèn)創(chuàng)建兩個目錄,分別是 /var/log/celery 和 /var/run/celery,顯然這是類 Unix 系統(tǒng)的目錄結(jié)構(gòu)。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

顯然啟動和關(guān)閉是沒有問題的,不過為了更好地觀察到輸出,我們還是用之前的方式,選擇前臺啟動。

然后回顧一下 celery 的架構(gòu),里面除了 producer 之外還有一個 celery beat,也就是調(diào)度器。我們調(diào)用任務(wù)工廠的 delay 方法,手動將任務(wù)發(fā)送到隊列,此時就相當(dāng)于 producer。如果是設(shè)置定時任務(wù),那么會由調(diào)度器自動將任務(wù)添加到隊列。

我們在 tasks 目錄里面再創(chuàng)建一個 period_task1.py 文件。

# celery_demo/tasks/period_task1.py
from celery.schedules import crontab
from app import app
from .task1 import task1, task2, task3, task4
@app.on_after_configure.connect
def period_task(sender, **kwargs):
 # 第一個參數(shù)為 schedule,可以是 float,或者 crontab
 # crontab 后面會說,第二個參數(shù)是任務(wù),第三個參數(shù)是名字
 sender.add_periodic_task(10.0, task1.s(),
name="每10秒執(zhí)行一次")
 sender.add_periodic_task(15.0, task2.s("task2"),
name="每15秒執(zhí)行一次")
 sender.add_periodic_task(20.0, task3.s(),
name="每20秒執(zhí)行一次")
 sender.add_periodic_task(
 crontab(hour=18, minute=5, day_of_week=0),
 task4.s("task4"),
 name="每個星期天的18:05運行一次"
 )
# celery_demo/tasks/task1.py
from app import app
@app.task
def task1():
 print("我是task1")
 return "task1你好"
@app.task
def task2(name):
 print(f"我是{name}")
 return f"{name}你好"
@app.task
def task3():
 print("我是task3")
 return "task3你好"
@app.task
def task4(name):
 print(f"我是{name}")
 return f"{name}你好"

既然使用了定時任務(wù),那么一定要設(shè)置時區(qū)。

# celery_demo/config.py
broker_url = "redis://:maverick@82.157.146.194:6379/1"
result_backend = "redis://:maverick@82.157.146.194:6379/2"
# 之前說過,celery 默認(rèn)使用 utc 時間
# 其實我們是可以手動禁用的,然后手動指定時區(qū)
enable_utc = False
timezone = "Asia/Shanghai"

最后是修改 app.py,將定時任務(wù)加進(jìn)去。

from celery import Celery
import config
app = Celery(
 __name__,
 include=["tasks.task1", "tasks.period_task1"])
app.config_from_object(config)

下面就來啟動任務(wù),先來啟動 worker,生產(chǎn)上應(yīng)該后臺啟動,這里為了看到信息,選擇前臺啟動。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

tasks.task1 里面的 4 個任務(wù)工廠都被添加進(jìn)來了,然后再來啟動調(diào)度器。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

調(diào)度器啟動之后會自動檢測定時任務(wù),如果到時間了,就發(fā)送到隊列。而啟動調(diào)度器的命令如下:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

根據(jù)調(diào)度器的輸出內(nèi)容,我們知道定時任務(wù)執(zhí)行完了,但很明顯定時任務(wù)本質(zhì)上也是任務(wù),只不過有定時功能,但也要發(fā)到隊列里面。然后 worker 從隊列里面取出任務(wù),并執(zhí)行,那么 worker 必然會有信息輸出。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

調(diào)度器啟動到現(xiàn)在已經(jīng)有一段時間了,worker 在終端中輸出了非常多的信息。

此時我們就成功實現(xiàn)了定時任務(wù),并且是通過定義函數(shù)、打上裝飾器的方式實現(xiàn)的。除此之外,我們還可以通過配置的方式實現(xiàn)。

# celery_demo/tasks/period_task1.py
from celery.schedules import crontab
from app import app
# 此時也不需顯式導(dǎo)入任務(wù)工廠了
# 直接以字符串的方式指定即可
app.conf.beat_schedule = {
 # 參數(shù)通過 args 和 kwargs 指定
 "每10秒執(zhí)行一次": {"task": "tasks.task1.task1",
"schedule": 10.0},
 "每15秒執(zhí)行一次": {"task": "tasks.task1.task2",
"schedule": 15.0,
"args": ("task2",)},
 "每20秒執(zhí)行一次": {"task": "tasks.task1.task3",
"schedule": 20.0},
 "每個星期天的18:05運行一次": {"task": "tasks.task1.task4",
 "schedule": crontab(hour=18,
 minute=5,
 day_of_week=0),
 "args": ("task4",)}
}

需要注意:雖然我們不用顯式導(dǎo)入任務(wù)工廠,但其實是 celery 自動幫我們導(dǎo)入。由于這些任務(wù)工廠都位于 celery_demo/tasks/task1.py 里面,而 worker 也是在 celery_demo 目錄下啟動的,所以需要指定為 tasks.task1.task{1234}。

這種啟動方式也是可以成功的,貌似還更方便一些,但是會多出一個文件,用來存儲配置信息。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

crontab 參數(shù)

定時任務(wù)除了指定一個浮點數(shù)之外(表示每隔多少秒執(zhí)行一次),還可以指定 crontab。關(guān)于 crontab 應(yīng)該都知道是什么,我們在 Linux 上想啟動定時任務(wù)的話,直接 crontab -e 然后添加即可。

而 celery 的 crontab 和 Linux 高度相似,我們看一下函數(shù)原型就知道了。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

簡單解釋一下:

  • minute:0-59,表示第幾分鐘觸發(fā),* 表示每分鐘觸發(fā)一次;

  • hour:0-23,表示第幾個小時觸發(fā),* 表示每小時都會觸發(fā)。比如 minute=2, hour=*,表示每小時的第二分鐘觸發(fā)一次;

  • day_of_week:0-6,表示一周的第幾天觸發(fā),0 是星期天,1-6 分別是星期一到星期六,不習(xí)慣的話也可以用字符串 mon,tue,wed,thu,fri,sat,sun 表示;

  • month_of_year:當(dāng)前年份的第幾個月;

以上就是這些參數(shù)的含義,并且參數(shù)接收的值還可以是一些特殊的通配符:

  • *:所有,比如 minute=*,表示每分鐘觸發(fā);

  • */a:所有可被 a 整除的時候觸發(fā);

  • a-b:a 到 b范圍內(nèi)觸發(fā);

  • a-b/c:范圍 a-b 且能夠被 c 整除的時候觸發(fā);

  • 2,10,40:比如 minute=2,10,40 表示第 2、10、40 分鐘的時候觸發(fā);

通配符之間是可以自由組合的,比如 */3,8-17 就表示能被 3 整除,或范圍處于 8-17 的時候觸發(fā)。

除此之外,還可以根據(jù)天色來設(shè)置定時任務(wù)(有點離譜)。

from celery.schedules import solar
app.conf.beat_schedule = {
"日落": {"task": "task1",
 "schedule": solar("sunset",
 -37.81753,
144.96715)
 },
}

solar 里面接收三個參數(shù),分別是 event、lat、lon,后兩個比較簡單,表示觀測者所在的緯度和經(jīng)度。值大于 0,則對應(yīng)東經(jīng)/北緯,小于 0,則對應(yīng)西經(jīng)/南緯。

我們重點看第一個參數(shù) event,可選值如下:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

比如代碼中的 "sunset", -37.81753, 144.96715 就表示,當(dāng)站在南緯 37.81753、東經(jīng) 144.96715 的地方觀察,發(fā)現(xiàn)傍晚太陽的上邊緣消失在西方地平線上的時候,觸發(fā)任務(wù)執(zhí)行。

“Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI