溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

利用Django-celery-beat怎么動(dòng)態(tài)添加一個(gè)周期性任務(wù)

發(fā)布時(shí)間:2020-11-27 14:59:07 來(lái)源:億速云 閱讀:447 作者:Leah 欄目:開(kāi)發(fā)技術(shù)

這篇文章將為大家詳細(xì)講解有關(guān)利用Django-celery-beat怎么動(dòng)態(tài)添加一個(gè)周期性任務(wù),文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。

前期準(zhǔn)備

1.beat插件安裝

pip3 install django-celery-beat

2.注冊(cè)APP

INSTALLED_APPS = [
....
'django_celery_beat',
]

3.數(shù)據(jù)庫(kù)變更

python3 manage.py migrate django_celery_beat

配置工作

目錄結(jié)構(gòu)請(qǐng)參考://www.jb51.net/article/200659.htm

1.配置celerypro.py

from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
from django.utils import timezone

# set the default Django settings module for the 'celery' program.
# 為celery設(shè)置環(huán)境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'voice_quality_assurance_configure.settings')
# 創(chuàng)建celery app
app = Celery('voice_quality_assurance_configure')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
# 從單獨(dú)的配置模塊中加載配置
app.config_from_object('voice_quality_assurance_configure.celeryconfig')
# 設(shè)置app自動(dòng)加載任務(wù)
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# 解決時(shí)區(qū)問(wèn)題,定時(shí)任務(wù)啟動(dòng)就循環(huán)輸出
app.now = timezone.now

2.配置celeryconfig.py

from __future__ import absolute_import
from kombu import Queue
from django.conf import settings

# 設(shè)置代理人broker
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2'
# 指定 Backend
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
# 指定時(shí)區(qū),默認(rèn)是 UTC
CELERY_TIMEZONE='Asia/Shanghai'
# celery 序列化與反序列化配置
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']
CELERY_IGNORE_RESULT = True
# celery 的啟動(dòng)工作數(shù)量設(shè)置
CELERY_WORKER_CONCURRENCY = 10
# 任務(wù)預(yù)取功能,會(huì)盡量多拿 n 個(gè),以保證獲取的通訊成本可以壓縮。
CELERYD_PREFETCH_MULTIPLIER = 20
# 有些情況下可以防止死鎖
CELERYD_FORCE_EXECV = True
# celery 的 worker 執(zhí)行多少個(gè)任務(wù)后進(jìn)行重啟操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制,如果網(wǎng)絡(luò)資源有限,不建議開(kāi)足馬力。
CELERY_DISABLE_RATE_LIMITS = True

# celery beat配置(周期性任務(wù)設(shè)置)
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = settings.TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

3.分別啟動(dòng)woker和beta

項(xiàng)目根目錄終端執(zhí)行(voice_quality_assurance_configure為項(xiàng)目名稱(chēng),簡(jiǎn)單來(lái)說(shuō),和manage.py文件同級(jí))

celery -A voice_quality_assurance_configure beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler #

啟動(dòng)beta 調(diào)度器使用數(shù)據(jù)庫(kù)

celery worker -A voice_quality_assurance_configure --loglevel=info -n worker1 #啟動(dòng)celery worker

4.創(chuàng)建周期性任務(wù)

from datetime import datetime, timedelta
import json
import os,django

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "voice_quality_assurance_configure.settings")# project_name 項(xiàng)目名稱(chēng)
django.setup()
from django_celery_beat.models import PeriodicTask, IntervalSchedule
schedule, created = IntervalSchedule.objects.get_or_create(every=10,period=IntervalSchedule.SECONDS,)

# 帶參數(shù)的創(chuàng)建方法,如下:
PeriodicTask.objects.create(
   interval=schedule,     # 上面創(chuàng)建10秒的間隔 interval 對(duì)象
   name='test_task',     # 設(shè)置任務(wù)的name值
   task='mission.tasks.my_task', # 指定需要周期性執(zhí)行的任務(wù)
   args=json.dumps([10, 2, 76]),
  expires=datetime.utcnow() + timedelta(seconds=30)
)

詳解創(chuàng)建周期性任務(wù)的方法

創(chuàng)建基于interval的周期性任務(wù)

第一步創(chuàng)建間隔對(duì)象

schedule, created = IntervalSchedule.objects.get_or_create(
  every=10,
  period=IntervalSchedule.SECONDS,
)

IntervalSchedule.DAYS 固定間隔天數(shù)
IntervalSchedule.HOURS 固定間隔小時(shí)數(shù)
IntervalSchedule.MINUTES 固定間隔分鐘數(shù)
IntervalSchedule.SECONDS 固定間隔秒數(shù)
IntervalSchedule.MICROSECONDS 固定間隔微秒

第二步創(chuàng)建任務(wù)

無(wú)參數(shù)的創(chuàng)建方法:

PeriodicTask.objects.create(
   interval=schedule,         # we created this above.
   name='test_task',     # simply describes this periodic task.
   task='app名.tasks.任務(wù)函數(shù)名', # name of task.)

有參數(shù)的創(chuàng)建方法:

PeriodicTask.objects.create(
   interval=schedule,         # we created this above.
   name='test'_task',     # simply describes this periodic task.
   task='app名.tasks.任務(wù)函數(shù)名', # name of task. 
   args=json.dumps(['arg1', 'arg2']), 
   kwargs=json.dumps({ 'be_careful': True, }), 
   expires=datetime.utcnow() + timedelta(seconds=30) )
class MonitorDeviceTask(object):
  """
  設(shè)備創(chuàng)建,增加周期性任務(wù)
  """

  def __init__(self, device_obj):
    self.device_obj = device_obj
    self.periodic_task = PeriodicTask.objects.create(
      interval=schedule,
      name='test_task',
      task='mission.tasks.my_task',
      args=json.dumps([self.device_obj.ip])
    )

  def starttask(self):
    """
    啟動(dòng)任務(wù)
    """
    self.periodic_task.enabled = True
    self.periodic_task.save()

  def stoptask(self):
    """
    停止任務(wù)
    """
    self.periodic_task.enabled = False
    self.periodic_task.save()

  def deltask(self):
    """
    刪除任務(wù)
    """
    self.periodic_task.delete()  
    self.periodic_task.save()

創(chuàng)建基于 crontab 的周期性任務(wù)

from django_celery_beat.models import CrontabSchedule, PeriodicTask
schedule, _ = CrontabSchedule.objects.get_or_create(
   minute='30',
   hour='*',
   day_of_week='*',
   day_of_month='*',
   month_of_year='*',
   timezone=pytz.timezone('Canada/Pacific')
)

關(guān)于利用Django-celery-beat怎么動(dòng)態(tài)添加一個(gè)周期性任務(wù)就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI