溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

python怎么基于celery實(shí)現(xiàn)異步任務(wù)周期任務(wù)定時(shí)任務(wù)

發(fā)布時(shí)間:2021-04-25 13:37:45 來源:億速云 閱讀:219 作者:小新 欄目:開發(fā)技術(shù)

小編給大家分享一下python怎么基于celery實(shí)現(xiàn)異步任務(wù)周期任務(wù)定時(shí)任務(wù),希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

Python主要用來做什么

Python主要應(yīng)用于:1、Web開發(fā);2、數(shù)據(jù)科學(xué)研究;3、網(wǎng)絡(luò)爬蟲;4、嵌入式應(yīng)用開發(fā);5、游戲開發(fā);6、桌面應(yīng)用開發(fā)。

首先是對celery的介紹,Celery其實(shí)是一個(gè)專注于實(shí)時(shí)處理和調(diào)度任務(wù)的分布式任務(wù)隊(duì)列,同時(shí)提供操作和維護(hù)分布式系統(tǒng)所需要的全部數(shù)據(jù), 因此可以用它提供的接口快速實(shí)現(xiàn)并管理一個(gè)分布式的任務(wù)隊(duì)列,它本身不是任務(wù)隊(duì)列,它是封裝了操作常見任務(wù)隊(duì)列的各種操作, 可以使用它快速進(jìn)行任務(wù)隊(duì)列的使用與管理.在Python中的組成部分是 1.用戶任務(wù) app 2.管道 broker 用于存儲(chǔ)任務(wù) 官方推薦的是 redis rabbitMQ / backend 用于存儲(chǔ)任務(wù)執(zhí)行結(jié)果的 3, 員工 worker 大致流程入下:

python怎么基于celery實(shí)現(xiàn)異步任務(wù)周期任務(wù)定時(shí)任務(wù)

最左邊的是用戶, 用戶發(fā)起1個(gè)請求給服務(wù)器, 要服務(wù)器執(zhí)行10個(gè)任務(wù),將這10個(gè)任務(wù)分給10個(gè)調(diào)度器,即開啟10個(gè)線程進(jìn)行任務(wù)處理,worker會(huì)一直監(jiān)聽調(diào)度器是否有任務(wù), 一旦發(fā)現(xiàn)有新的任務(wù), 就會(huì)立即執(zhí)行新任務(wù),一旦執(zhí)行完就會(huì)返回給調(diào)度器, 即backend, backend會(huì)將請求發(fā)送給服務(wù)器, 服務(wù)器將結(jié)果返回給用戶, 表現(xiàn)的結(jié)果就是,這10個(gè)任務(wù)同時(shí)完成,同時(shí)返回,,這就是Celery的整個(gè)工作流程, 其中的角色分別為,任務(wù)(app_work), 調(diào)度器(broker + backend), 將任務(wù)緩存的部分, 即將所有任務(wù)暫時(shí)存在的地方,相當(dāng)于生產(chǎn)者, 消費(fèi)者(worker 可以指定數(shù)量, 即在創(chuàng)建worker命令的時(shí)候可以指定數(shù)量), 在worker拿到任務(wù)后,人就控制不了了, 除非把worker殺死, 不然肯定會(huì)執(zhí)行完.

也即 任務(wù)來了以后, 調(diào)度器(broker)去緩存任務(wù), worker去執(zhí)行任務(wù), 完成后返回backend,接著返回,

還有就是關(guān)于定時(shí)任務(wù)和周期任務(wù)在linux上為什么不用自身所帶著的去做,是因?yàn)閘inux周期定時(shí)任務(wù)是不可控的, 不好管理, 返回值保存也是個(gè)麻煩事, 而celery只要開啟著調(diào)度器, 就可以隨時(shí)把人物結(jié)果獲取到,即使用celery控制起來是非常方便的.

接下來就是實(shí)例代碼:

workers.py

from celery import Celery
import time
# 創(chuàng)建一個(gè)Celery實(shí)例, 就是用戶的應(yīng)用app 第一個(gè)參數(shù)是任務(wù)名稱, 可以隨意起 后面的就是配置的broker和backend
diaoduqi= Celery("mytask", broker="redis://127.0.0.1:6379", backend="redis:127.0.0.1:6379")
# 接下來是為應(yīng)用創(chuàng)建任務(wù) ab
@diaoduqi.task
def ab(a,b):
  time.sleep(15)
  return a+b

brokers.py

from worker import ab

# 將任務(wù)交給Celery的Worker執(zhí)行
res = ab.delay(2,4)

#返回任務(wù)ID
print(res.id)

backends.py

from celery.result import AsyncResult
from worker import diaoduqi

# 異步獲取任務(wù)返回值
async_task = AsyncResult(id="31ec65e8-3995-4ee1-b3a8-1528400afd5a",app=diaoduqi)

# 判斷異步任務(wù)是否執(zhí)行成功
if async_task.successful():
  #獲取異步任務(wù)的返回值
  result = async_task.get()
  print(result)
else:
  print("任務(wù)還未執(zhí)行完成")

為了方便,現(xiàn)在直接將三個(gè)文件代表的部分命名在文件名稱中.首先是啟動(dòng)workers.py

啟動(dòng)方式是依據(jù)系統(tǒng)的不同來啟動(dòng)的, 對于linux下 celery worker -A workers -l INFO 也可以指定開啟的worker數(shù)量 即在后面添加的參數(shù)是 -c 5 表示指定5個(gè)worker 理論上指定的worker是無上限的,

在windows下需要安裝一個(gè)eventlet模塊進(jìn)行運(yùn)行, 不然不會(huì)運(yùn)行成功 pip install eventlet 可以開啟線程 不指定數(shù)量是默認(rèn)6個(gè)worker, 理論上worker的數(shù)量可以開啟無限個(gè),但是celery worker -A s1 -l INFO -P eventlet -c 5 使用eventlet 開啟5個(gè)worker 執(zhí)行

該命令后 處于就緒狀態(tài), 需要發(fā)布任務(wù), 即brokers.py進(jìn)行任務(wù)發(fā)布, 方法是使用delay的方式執(zhí)行異步任務(wù), 返回了一個(gè)任務(wù)id, 接著去backends.py中取這個(gè)任務(wù)id, 去查詢?nèi)蝿?wù)是否完成,判定條件即任務(wù).successful 判斷是否執(zhí)行完, 上面就是celery異步執(zhí)行任務(wù)的用法與解釋

接下來就是celery在項(xiàng)目中的應(yīng)用

在實(shí)際項(xiàng)目中應(yīng)用celery是有一定規(guī)則的, 即目錄結(jié)構(gòu)應(yīng)該如下.

python怎么基于celery實(shí)現(xiàn)異步任務(wù)周期任務(wù)定時(shí)任務(wù)

結(jié)構(gòu)說明 首先是創(chuàng)建一個(gè)CeleryTask的包,接著是在里面創(chuàng)建一個(gè)celery.py,必須是這個(gè)文件 關(guān)于重名的問題, 找尋模塊的順序是先從當(dāng)前目錄中去尋找, 根本找不到,接著是從內(nèi)置模塊中去找, 根本就找不到寫的這個(gè)celery這個(gè)文件,

celery.py

from celery import Celery
DDQ = Celery("DDQ",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379",
       include=["CeleryTask.TaskOne","CeleryTask.TaskTwo"])

TaskOne.py

import time
from CeleryTask.celery import DDQ
@DDQ.task
def one1(a,b):
  # time.sleep(3)
  return a+b
@DDQ.task
def one2():
  time.sleep(2)
  return "one2"

taskTwo.py

import time
from CeleryTask.celery import DDQ
@DDQ.task
def two1():
  time.sleep(2)
  return "two1"
@DDQ.task
def two2():
  time.sleep(3)
  return "two2"

getR.py

from CeleryTask.TaskOne import one1 as one

# one.delay(10,10)
# two.delay(20,20)

# 定時(shí)任務(wù)我們不在使用delay這個(gè)方法了,delay是立即交給task 去執(zhí)行
# 現(xiàn)在我們使用apply_async定時(shí)執(zhí)行

# 首先我們要先給task一個(gè)執(zhí)行任務(wù)的時(shí)間
import datetime, time

# 獲取當(dāng)前時(shí)間 此時(shí)間為東八區(qū)時(shí)間
ctime = time.time()
# 將當(dāng)前的東八區(qū)時(shí)間改為 UTC時(shí)間 注意這里一定是UTC時(shí)間,沒有其他說法
utc_time = datetime.datetime.utcfromtimestamp(ctime)
# 為當(dāng)前時(shí)間增加 10 秒
add_time = datetime.timedelta(seconds=10)
action_time = utc_time + add_time

# action_time 就是當(dāng)前時(shí)間未來10秒之后的時(shí)間
# 現(xiàn)在我們使用apply_async定時(shí)執(zhí)行
res = one.apply_async(args=(6, 10), eta=action_time)
res = one.apply_async(args=(6, 10), eta=action_time)
res = one.apply_async(args=(6, 10), eta=action_time)
res = one.apply_async(args=(6, 10), eta=action_time)
res = one.apply_async(args=(6, 10), eta=action_time)
res = one.apply_async(args=(6, 10), eta=action_time)
print(res.id)
# 這樣原本延遲5秒執(zhí)行的One函數(shù)現(xiàn)在就要在10秒鐘以后執(zhí)行了

接著是在命令行cd到與CeleryTask同級(jí)目錄下, 使用命令 celery worker -A CeleryTask -l INFO -P eventlet -c 50 這樣 就開啟了worker 接著去 發(fā)布任務(wù), 在定時(shí)任務(wù)中不再使用delay這個(gè)方法了,

delay是立即交給ttask去執(zhí)行, 在這里使用 apply_async定時(shí)執(zhí)行 指的是調(diào)度的時(shí)候去定時(shí)執(zhí)行

需要設(shè)置的是UTC時(shí)間, 以及定時(shí)的時(shí)間(多長時(shí)間以后執(zhí)行) 之后使用 celery worker -A CeleryTask -l INFO -P eventlet -c 50 命令開啟worker, 之后運(yùn)行 getR.py文件發(fā)布任務(wù), 可以看到在定義的時(shí)間以后執(zhí)行該任務(wù)

周期任務(wù)

周期任務(wù) 指的是在指定時(shí)間去執(zhí)行任務(wù) 需要導(dǎo)入的一個(gè)模塊有 crontab

文件結(jié)構(gòu)如下

python怎么基于celery實(shí)現(xiàn)異步任務(wù)周期任務(wù)定時(shí)任務(wù)

結(jié)構(gòu)同定時(shí)任務(wù)差不多,只不過需要變動(dòng)一下文件內(nèi)容 GetR文件已經(jīng)不需要了,可以刪除.

celery.py

from celery import Celery
from celery.schedules import crontab

DDQ = Celery("DDQ", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379",
       include=["CeleryTask.TaskOne", "CeleryTask.TaskTwo"])

# 我要要對beat任務(wù)生產(chǎn)做一個(gè)配置,這個(gè)配置的意思就是每10秒執(zhí)行一次Celery_task.task_one任務(wù)參數(shù)是(10,10)
DDQ.conf.beat_schedule = {
  "each20s_task": {
    "task": "CeleryTask.TaskOne.one1",
    "schedule": 10, # 每10秒鐘執(zhí)行一次
    "args": (10, 10)
  },
  "each2m_task": {
    "task": "CeleryTask.TaskOne.one2",
    "schedule": crontab(minute=1) # 每1分鐘執(zhí)行一次 也可以替換成 60 即 "schedule": 60
  }
}

TaskOne.py

import time
from CeleryTask.celery import DDQ
@DDQ.task
def one1(a,b):
  # time.sleep(3)
  return a+b
@DDQ.task
def one2():
  time.sleep(2)
  return "one2"

taskTwo.py

import time
from CeleryTask.celery import DDQ
@DDQ.task
def two1():
  time.sleep(2)
  return "two1"
@DDQ.task
def two2():
  time.sleep(3)
  return "two2"

以上配置完成以后,這時(shí)候就不能直接創(chuàng)建worker了,因?yàn)橐獔?zhí)行周期任務(wù),需要首先有一個(gè)任務(wù)的生產(chǎn)方, 即 celery beat -A CeleryTask, 用來產(chǎn)生創(chuàng)建者, 接著是創(chuàng)建worker worker的創(chuàng)建命令還是原來的命令, 即 celery worker -A CeleryTask -l INFO -P eventlet -c 50 , 創(chuàng)建完worker之后, 每10秒就會(huì)由beat創(chuàng)建一個(gè)任務(wù)給 worker去執(zhí)行.至此, celery創(chuàng)建異步任務(wù), 周期任務(wù),定時(shí)任務(wù)完畢, 伙伴們自己拿去測試吧.

看完了這篇文章,相信你對“python怎么基于celery實(shí)現(xiàn)異步任務(wù)周期任務(wù)定時(shí)任務(wù)”有了一定的了解,如果想了解更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI