溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么使用Celery Once來防止Celery重復執(zhí)行同一個任務

發(fā)布時間:2021-11-01 09:08:15 來源:億速云 閱讀:239 作者:小新 欄目:開發(fā)技術

這篇文章給大家分享的是有關怎么使用Celery Once來防止Celery重復執(zhí)行同一個任務的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

在使用 Celery 的時候發(fā)現(xiàn)有的時候 Celery 會將同一個任務執(zhí)行兩遍,我遇到的情況是相同的任務在不同的 worker 中被分別執(zhí)行,并且時間只相差幾毫秒。這問題我一直以為是自己哪里處理的邏輯有問題,后來發(fā)現(xiàn)其他人 也有類似的問題,然后基本上出問題的都是使用 Redis 作為 Broker 的,而我這邊一方面不想將 Redis 替換掉,就只能在 task 執(zhí)行的時候加分布式鎖了。

不過在 Celery 的 issue 中搜索了一下,有人使用 Redis 實現(xiàn)了分布式鎖,然后也有人使用了 Celery Once。 大致看了一下 Celery Once ,發(fā)現(xiàn)非常符合現(xiàn)在的情況,就用了下。

Celery Once 也是利用 Redis 加鎖來實現(xiàn), Celery Once 在 Task 類基礎上實現(xiàn)了 QueueOnce 類,該類提供了任務去重的功能,所以在使用時,我們自己實現(xiàn)的方法需要將 QueueOnce 設置為 base

@task(base=QueueOnce, once={'graceful': True})

后面的 once 參數(shù)表示,在遇到重復方法時的處理方式,默認 graceful 為 False,那樣 Celery 會拋出 AlreadyQueued 異常,手動設置為 True,則靜默處理。

另外如果要手動設置任務的 key,可以指定 keys 參數(shù)

@celery.task(base=QueueOnce, once={'keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

總得來說,分為幾步

第一步,安裝

pip install -U celery_once

第二步,增加配置

from celery import Celery
from celery_once import QueueOnce
from time import sleep

celery = Celery('tasks', broker='amqp://guest@localhost//')
celery.conf.ONCE = {
  'backend': 'celery_once.backends.Redis',
  'settings': {
    'url': 'redis://localhost:6379/0',
    'default_timeout': 60 * 60
  }
}

第三步,修改 delay 方法

example.delay(10)
# 修改為
result = example.apply_async(args=(10))

第四步,修改 task 參數(shù)

@celery.task(base=QueueOnce, once={'graceful': True, keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

感謝各位的閱讀!關于“怎么使用Celery Once來防止Celery重復執(zhí)行同一個任務”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。

AI