溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么用django-celery-beat搭建定時任務(wù)

發(fā)布時間:2023-03-21 10:20:20 來源:億速云 閱讀:138 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容主要講解“怎么用django-celery-beat搭建定時任務(wù)”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“怎么用django-celery-beat搭建定時任務(wù)”吧!

    一、創(chuàng)建django項目和app

    1、安裝定時任務(wù)第三方包

    pip install django-celery-beat # 插件用來動態(tài)配置定時任務(wù),一般會配合 django_celery_results 一起使用,所以一起安裝 django_celery_results

    pip install django_celery_results
    pip install eventlet # windows下運(yùn)行celery 4以后版本,還需額外安裝eventlet庫

    2、創(chuàng)建django項目并創(chuàng)建一個使用定時任務(wù)的app

    1.1創(chuàng)建django項目并創(chuàng)建app

    創(chuàng)建的過程省略,不在這里展開,需要注意的是setting文件注冊app的配置如下:

    INSTALLED_APPS = [
        ......
        'myapp',  # 剛創(chuàng)建的使用定時任務(wù)的app
        'django_celery_beat',  # 插件用來動態(tài)配置定時任務(wù),只要進(jìn)行了第一步pip安裝就可以直接注冊了
        'django_celery_results',
    ]

    1.2 創(chuàng)建定時任務(wù)數(shù)據(jù)庫
    依次執(zhí)行: python manage.py makemigrations 和 python manage.py migrate
    打開數(shù)據(jù)庫發(fā)現(xiàn),自動創(chuàng)建了一些表如下:

    怎么用django-celery-beat搭建定時任務(wù)

    這些都是定時任務(wù)需要的表格,自動創(chuàng)建不需要手動管理

    django_celery_beat.models.ClockedSchedule # 特定時刻任務(wù)
    django_celery_beat.models.CrontabSchedule # 特定時間表任務(wù),例如每周1運(yùn)行的計劃
    django_celery_beat.models.IntervalSchedule # 以特定間隔(例如,每5秒)運(yùn)行的計劃。
    django_celery_beat.models.PeriodicTask # 此模型定義要運(yùn)行的單個周期性任務(wù)。
    django_celery_beat.models.PeriodicTasks # 此模型僅用作索引以跟蹤計劃何時更改
    django_celery_beat.models.SolarSchedule # 定制任務(wù)

    如果安裝注冊了django_celery_results 還會有另外三個表:

    怎么用django-celery-beat搭建定時任務(wù)

    2、新建一個celery.py文件

    文件的作用是指定django環(huán)境、創(chuàng)建Celery app和指定Celery配置文件的啟動位置,類似與wsgi.py或asgi.py,因此也建議文件創(chuàng)建位置與wsgi.py或asgi.py同級。
    文件內(nèi)容如下:

    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery, platforms
    
    # 設(shè)置django環(huán)境
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lc_manage.settings.settings')
    
    # 創(chuàng)建一個Celery app
    app = Celery('djangotask')
    platforms.C_FORCE_ROOT = True   # 如果配置沒有生效需要在啟動時設(shè)置  export C_FORCE_ROOT="true"
    
    #  使用CELERY_ 作為前綴,在celeryconfig.py中寫配置
    app.config_from_object('lc_manage.celeryconfig', namespace="CELERY")
    # app.config_from_object('lc_manage.celeryconfig')
    
    # 發(fā)現(xiàn)任務(wù)文件每個app下的tasks.py
    app.autodiscover_tasks()

    3、創(chuàng)建配置文件config.py

    這個文件里的內(nèi)容可以寫到項目的 settings.py里面,因為上面的celery.py 中可以指定配置文件位置;不過內(nèi)容比較多,還是建議單獨(dú)創(chuàng)建一個配置文件celeryconfig.py,可以與celery.py 同級目錄,文件內(nèi)容如下:

    from __future__ import absolute_import
    
    # broker 設(shè)置 指定中間代理人將任務(wù)存到哪里,這里是redis的11號庫
    CELERY_BROKER_URL = 'redis://:123456@127.0.0.1:6379/11'
    # 指定 Backend 儲存結(jié)果的地方,可以使用django數(shù)據(jù)庫(django-db),也可以使用redis,
    # 使用django數(shù)據(jù)庫(django-db),以后運(yùn)行worker就會保存到數(shù)據(jù)庫中,可以通過ORM進(jìn)行訪問
    # CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
    CELERY_RESULT_BACKEND = 'django-db'
    
    # 使用django_celery_beat插件用來動態(tài)配置任務(wù)
    CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
    
    # 指定時區(qū),默認(rèn)是 UTC
    CELERY_TIMEZONE = 'Asia/Shanghai'
    
    # celery 序列化與反序列化配置
    CELERY_TASK_SERIALIZER = 'pickle'
    CELERY_RESULT_SERIALIZER = 'pickle'
    CELERY_ACCEPT_CONTENT = ['pickle', 'json']
    CELERY_TASK_IGNORE_RESULT = True
    
    # 有些情況下可以防止死鎖  非常重要!
    CELERYD_FORCE_EXECV = True
    # 為存儲結(jié)果設(shè)置過期日期,默認(rèn)1天過期。如果beat開啟,Celery每天會自動清除。
    # 設(shè)為0,存儲結(jié)果永不過期
    # CELERY_RESULT_EXPIRES = xx
    # CELERY_TASK_RESULT_EXPIRES = 60*60*24  # 后端存儲的任務(wù)超過一天時,自動刪除數(shù)據(jù)庫中的任務(wù)數(shù)據(jù),單位秒
    CELERY_MAX_TASKS_PER_CHILD = 1000  # 每個worker執(zhí)行1000次任務(wù)后,自動重啟worker,防止任務(wù)占用太多內(nèi)存導(dǎo)致內(nèi)存泄漏
    # 禁用所有速度限制,如果網(wǎng)絡(luò)資源有限,不建議開足馬力。
    CELERY_DISABLE_RATE_LIMITS = True
    # celery beat配置(周期性任務(wù)設(shè)置)
    CELERY_ENABLE_UTC = False
    
    # 官方用來修復(fù)CELERY_ENABLE_UTC=False and USE_TZ = False 時時間比較錯誤的問題;
    # 詳情見:https://github.com/celery/django-celery-beat/pull/216/files
    DJANGO_CELERY_BEAT_TZ_AWARE = False

    這里需要注意的是,如果在celery.py中配置指定了confiig配置文件使用CELERY前綴:app.config_from_object(‘lc_manage.celeryconfig’, namespace=“CELERY”),那么celeryconfig.py配置文件的參數(shù)都應(yīng)加:CELERY_,當(dāng)然你也可以不用第二個參數(shù),namespace=“CELERY"寫成app.config_from_object(‘lc_manage.celeryconfig’”), 那么celeryconfig.py中就不需要加CELERY_ 前綴,注意一定要統(tǒng)一?。?!否則可能 會報錯:

    consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.

    4、加載celery.py

    我們自己創(chuàng)建的celery.py雖然與wsgi.py 或者 asgi.py等同級,但是 不會像他們一樣自動加載,需要我們通過本級文件下的__init__.py 把celery.py 加載進(jìn)來,打開__init__.py文件,添加如下內(nèi)容:

     from __future__ import absolute_import, unicode_literals
     from .celery import app as celery_app
    # 使得django啟動時加載celery的app
    __all__ = ('celery_app',)

    5、創(chuàng)建定時任務(wù)執(zhí)行內(nèi)容

    經(jīng)過上面的配置,django-celery_beta 會自動去掃描每個app目錄下是否有 tasks.py 文件,需要創(chuàng)建定時任務(wù)的app下我們可以手動創(chuàng)建tasks.py,定時任務(wù)就寫在這個文件上:

    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    
    @shared_task
    def add(x, y):
        print("x + y  =  ", x + y)
        return x + y
    
    @shared_task
    def mul(x, y):
        print("x * y  =  ", x * y)
        return x * y

    二、定時器創(chuàng)建和定時任務(wù)添加

    1、時間和周期控制:IntervalSchedule、ClockedSchedule和CrontabSchedule

    其他帖子都把上面三個稱為定時任務(wù)與PeriodicTask放一起結(jié)束,但是個人理解以上三個都是定時任務(wù)時控制時間和周期執(zhí)行的控制器,并非創(chuàng)建定時任務(wù),真正創(chuàng)建定時任務(wù)的只有PeriodicTask,所以這里個人把這三個稱為定時任務(wù)的“時間和頻率控制器”。

    • IntervalSchedule 按時間間隔頻率執(zhí)行定時任務(wù)的控制器,(例:每間隔1H/1M/…執(zhí)行一次)

    • ClockedSchedule 指定某個時刻執(zhí)行定時任務(wù)的控制器, (例:2018年8月8號 8:00這個時刻執(zhí)行)

    • CrontabSchedule 指定某個時間執(zhí)行定時任務(wù)的控制器 (例:每年的12月星期一的8:30)

    導(dǎo)入這四個模塊:

    from django_celery_beat.models import CrontabSchedule, PeriodicTask, IntervalSchedule,ClockedSchedule

    1.1 IntervalSchedule 時間間隔控制器

    參數(shù):every 間隔數(shù),period 間隔單位

    schedule, created = IntervalSchedule.objects.update_or_create(
        every=1,
        period=IntervalSchedule.MINUTES)   # 按分鐘間隔執(zhí)行

    第二個參數(shù)可選

    • IntervalSchedule.DAYS 固定間隔天數(shù)

    • IntervalSchedule.HOURS 固定間隔小時數(shù)

    • IntervalSchedule.MINUTES 固定間隔分鐘數(shù)

    • IntervalSchedule.SECONDS 固定間隔秒數(shù)

    • IntervalSchedule.MICROSECONDS 固定間隔微秒

    返回值可直接解包,其實只有第一個參數(shù)schedule有用,在PeriodicTask創(chuàng)建定時任務(wù)時作為時間和周期控制參數(shù)傳入。解包獲得的第二個數(shù)據(jù)created 可以直接用下劃線取代

    1.2 ClockedSchedule特定時刻定時器

    參數(shù):clocked_time 指定時間

    clocked, _ = ClockedSchedule.objects.update_or_create(
        clocked_time =datetime.strptime("2020-08-18 16:58:46","%Y-%m-%d %H:%M:%S"))   # 按指定時間執(zhí)行

    這里第二個參數(shù)直接用下劃線取代。特定時刻控制一般時執(zhí)行一次,適合在view中調(diào)用執(zhí)行一次性計劃。

    1.3、周期性任務(wù) CrontabSchedule

    參數(shù):

    • month_of_year # 幾月執(zhí)行

    • day_of_month # 幾號執(zhí)行

    • day_of_week # 周幾執(zhí)行

    • hour # 幾點(diǎn)執(zhí)行

    • minute # 幾分執(zhí)行

    • timezone # 時區(qū)

    crontab, _ = CrontabSchedule.objects.update_or_create(
        minute="00",
        hour="23",   
        day_of_week="*",   # 周幾執(zhí)行
        day_of_month='1',  # 幾點(diǎn)執(zhí)行
        month_of_year='*',    
        timezone=pytz.timezone("Asia/Shanghai"),
    )

    上面的星號代表不使用,上面的配置即: 每月1日的23點(diǎn)執(zhí)行一次。如果day_of_month=“*”則代表每天23點(diǎn)執(zhí)行。

    2、動態(tài)添加任務(wù) PeriodicTask

    參數(shù):

    • name:任務(wù)名

    • task:指定的任務(wù)

    • interval:時間間隔

    • crontab:時間控制器

    • clocked:指定時刻控制器

    • expires:有效日期

    • one_off:啟用狀態(tài)(如果為True,調(diào)度將只運(yùn)行該任務(wù)一次)

    • start_time:開始時間

    • enabled:啟用

    • last_run_at:最后運(yùn)行時間

    • total_run_count:運(yùn)行總次數(shù)

    • date_changed:最后的更改時間

    • description:描述

    • args: 傳參

    注意:

    1、interval、crontab、clocked三選一,不可同時使用。參數(shù)值分別對應(yīng)interval:schedule(IntervalSchedule的第一個返回值);crontab:crontab(CrontabSchedule的第一個返回值);clocked:clocked(ClockedSchedule返回值)
    2、name :任務(wù)名,確保其唯一性!?。。?br/>3、task:后面是以字符串形式調(diào)用定時任務(wù)的具體工作內(nèi)容函數(shù),即第一項的第5條用@shared_task裝飾器創(chuàng)建的方法。
    4、enabled:是否啟用,這里動態(tài)創(chuàng)建的話一般設(shè)為true
    5、args: 傳參(task中指定的任務(wù)需要傳參時使用),注意需要json序列化 json.dumps(…)

    其他參數(shù)均為非必填項。

    PeriodicTask.objects.update_or_create(
        name=working_point_record_id + 'working_time_1',
        defaults={
            "task": "apps.tasks.add",
            "crontab": crontab,
            "enabled": True,
            "args": json.dumps([working_point_record_id])
        },

    4、封裝方法

    一般情況下,可以把IntervalSchedule、ClockedSchedule和CrontabSchedule根據(jù)業(yè)務(wù)需求單獨(dú)封裝一個文件,而PeriodicTask.objects直接在對應(yīng)view視圖中調(diào)用即可。

    三、啟動定時任務(wù)beat

    定時任務(wù)是獨(dú)立與django項目運(yùn)行的,django只是定時任務(wù)的入口和操作數(shù)據(jù)庫的入口,而這前提是django-celery-beat 已經(jīng)獨(dú)立啟動,django-celery-beat的啟動分兩步,一是生產(chǎn)者單獨(dú)啟動(beat),而是工作者單獨(dú)啟動(worker),這里啟動順序需要注意一點(diǎn),建議先啟動worker 再啟動beat ,曾經(jīng)遇到先啟動beat有可能beat會啟動失敗。

    1、啟動工作者worker

    此時仍然需要重新打開一個命令窗口,進(jìn)入django項目的根目錄(manage.py同級目錄)下:

    # Linux下測試,啟動Celery
     Celery -A 【項目名稱】worker -l info  
     
     # Windows下測試,啟動Celery
     Celery -A 【項目名稱】worker -l info -P eventlet
     
     # 如果Windows下Celery不工作,輸入如下命令
     Celery -A 【項目名稱】worker -l info --pool=solo

    2、啟動生產(chǎn)者beat

    beat 是一個生產(chǎn)者角色,是單獨(dú)運(yùn)行,生產(chǎn)者完全不依賴django,需要與django在一個不同的命令窗口運(yùn)行,但啟動時需要先進(jìn)入django項目的根目錄,即與manage.py在同一目錄下:

    celery  -A 【項目名稱】 beat -l info

    四、定時任務(wù)創(chuàng)建

    這里要劃重點(diǎn)了,其實看完上面的已經(jīng)可以使用了,下面的屬于優(yōu)化選擇性理解,看個人理解能力和需求:

    一般情況下,定時器創(chuàng)建和定時任務(wù)添加 PeriodicTask會在view視圖中創(chuàng)建,但是如果view視圖比較頻繁,那么每次執(zhí)行view都創(chuàng)建一個定時任務(wù)的話會有無數(shù)個,會比較占用內(nèi)存資源,當(dāng)然可以在celeryconfig.py設(shè)置執(zhí)行多少次后重啟任務(wù),高并發(fā)下view會不會導(dǎo)致頻繁重啟,這里需要根據(jù)業(yè)務(wù)邏輯把@shared_task包裹下的任務(wù)處理方法做成批量處理放到單獨(dú)文件中(不需要在view視圖中調(diào)用,定時調(diào)用批處理,那么就是一個任務(wù)批量處理數(shù)據(jù)而已),在間隔固定時間下執(zhí)行,單純語言難以表達(dá)清楚,理解的就理解了,不理解的就慢慢體會吧。這里需要畫重點(diǎn)的是:如果你理解了單個文件寫@shared_task批處理方法,那么你的問題重點(diǎn)就是:我如何啟動它呢?難道每次在啟動beat后在命令行啟動嗎?雖然也可以,但是如果項目需要創(chuàng)建的定時任務(wù)很多怎么辦,不用多,幾百行可以了,每次在命令行復(fù)制粘貼執(zhí)行命令 嗎?當(dāng)然不要,其實直接創(chuàng)建一個單獨(dú)的python文件,然后再根目錄下的url中導(dǎo)入即可,因為url在項目啟動的時候會自動加載一次,也就相當(dāng)于啟動django項目的時候就自動創(chuàng)建了一次定時任務(wù)!

    到此,相信大家對“怎么用django-celery-beat搭建定時任務(wù)”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

    向AI問一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI