溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

使用Celery和Docker處理Django中定期任務(wù)的方法

發(fā)布時(shí)間:2020-07-29 12:09:34 來源:億速云 閱讀:674 作者:小豬 欄目:服務(wù)器

這篇文章主要講解了使用Celery和Docker處理Django中定期任務(wù)的方法,內(nèi)容清晰明了,對(duì)此有興趣的小伙伴可以學(xué)習(xí)一下,相信大家閱讀完之后會(huì)有幫助。

在構(gòu)建和擴(kuò)展Django應(yīng)用程序時(shí),不可避免地需要定期在后臺(tái)自動(dòng)運(yùn)行某些任務(wù)。

一些例子:

生成定期報(bào)告

清除緩存

發(fā)送批量電子郵件通知

執(zhí)行每晚維護(hù)工作

這是構(gòu)建和擴(kuò)展不屬于Django核心的Web應(yīng)用程序所需的少數(shù)功能之一。幸運(yùn)的是,Celery提供了一個(gè)強(qiáng)大的解決方案,該解決方案非常容易實(shí)現(xiàn),稱為Celery Beat。

在下面的文章中,我們將向您展示如何使用Docker設(shè)置Django,Celery和Redis,以便通過Celery Beat定期運(yùn)行自定義Django Admin命令。

依存關(guān)系:

Django v3.0.5

Docker v19.03.8

Python v3.8.2

芹菜v4.4.1

Redis v5.0.8

Django + Celery系列:

Django和Celery的異步任務(wù)

使用Celery和Docker處理Django中的定期任務(wù)(本文!)

目標(biāo)

在本教程結(jié)束時(shí),您應(yīng)該能夠:

使用Docker容器化Django,Celery和Redis

將Celery集成到Django應(yīng)用中并創(chuàng)建任務(wù)

編寫自定義Django Admin命令

安排自定義Django Admin命令以通過Celery Beat定期運(yùn)行

項(xiàng)目設(shè)置

django-celery-beat存儲(chǔ)庫中克隆基礎(chǔ)項(xiàng)目,然后簽出基礎(chǔ)分支:

$ git clone
https://github.com/testdrivenio/django-celery-beat
--branch base --single-branch
$ cd django-celery-beat

由于我們總共需要管理四個(gè)流程(Django,Redis,worker和Scheduler),因此我們將使用Docker通過連接起來簡化它們的工作流程,從而使它們都可以通過一個(gè)命令從一個(gè)終端窗口運(yùn)行 。

從項(xiàng)目根目錄創(chuàng)建映像,并啟動(dòng)Docker容器:

$ docker-compose up -d --build
$ docker-compose exec web python manage.py migrate

構(gòu)建完成后,導(dǎo)航至http:// localhost:1337以確保該應(yīng)用程序能夠按預(yù)期運(yùn)行。 您應(yīng)該看到以下文本:

Orders
 No orders found!

項(xiàng)目結(jié)構(gòu):

├── .gitignore
├── docker-compose.yml
└── project
    ├── Dockerfile
    ├── core
    │   ├── __init__.py
    │   ├── asgi.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── entrypoint.sh
    ├── manage.py
    ├── orders
    │   ├── __init__.py
    │   ├── admin.py
    │   ├── apps.py
    │   ├── migrations
    │   │   ├── 0001_initial.py
    │   │   └── __init__.py
    │   ├── models.py
    │   ├── tests.py
    │   ├── urls.py
    │   └── views.py
    ├── requirements.txt
    └── templates
        └── orders
            └── order_list.html

Celery和Redis

現(xiàn)在,我們需要為Celery,Celery Beat和Redis添加容器。

首先,將依賴項(xiàng)添加到requirements.txt文件中:

Django==3.0.5
celery==4.4.1
redis==3.4.1

docker-compose.yml文件內(nèi)容:

redis:
 image: redis:alpine
celery:
 build: ./project
 command: celery -A core worker -l info
 volumes:
 - ./project/:/usr/src/app/
 environment:
 - DEBUG=1
 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
 - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
 depends_on:
 - redis
celery-beat:
 build: ./project
 command: celery -A core beat -l info
 volumes:
 - ./project/:/usr/src/app/
 environment:
 - DEBUG=1
 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
 - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
 depends_on:
 - redis

我們還需要更新Web服務(wù)的depends_on部分:

web:
 build: ./project
 command: python manage.py runserver 0.0.0.0:8000
 volumes:
 - ./project/:/usr/src/app/
 ports:
 - 1337:8000
 environment:
 - DEBUG=1
 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
 - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
 depends_on:
 - redis # NEW

完整的docker-compose文件如下:

version: '3.7'
 
services:
 web:
 build: ./project
 command: python manage.py runserver 0.0.0.0:8000
 volumes:
 - ./project/:/usr/src/app/
 ports:
 - 1337:8000
 environment:
 - DEBUG=1
 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
 - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
 depends_on:
 - redis
 redis:
 image: redis:alpine
 celery:
 build: ./project
 command: celery -A core worker -l info
 volumes:
 - ./project/:/usr/src/app/
 environment:
 - DEBUG=1
 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
 - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
 depends_on:
 - redis
 celery-beat:
 build: ./project
 command: celery -A core beat -l info
 volumes:
 - ./project/:/usr/src/app/
 environment:
 - DEBUG=1
 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
 - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
 depends_on:
 - redis

在構(gòu)建新容器之前,我們需要在Django應(yīng)用中配置Celery。

芹菜配置

設(shè)定

在“核心”目錄中,創(chuàng)建一個(gè)celery.py文件并添加以下代碼:

import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
 
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

這里發(fā)生了什么事?

首先,我們?yōu)镈JANGO_SETTINGS_MODULE環(huán)境變量設(shè)置一個(gè)默認(rèn)值,以便Celery知道如何找到Django項(xiàng)目。

接下來,我們創(chuàng)建了一個(gè)名稱為core的新Celery實(shí)例,并將該值分配給名為app的變量。

然后,我們從django.conf的settings對(duì)象中加載了celery配置值。 我們使用namespace =“ CELERY”來防止與其他Django設(shè)置發(fā)生沖突。 換句話說,Celery的所有配置設(shè)置必須以CELERY_為前綴。

最后,app.autodiscover_tasks()告訴Celery從settings.INSTALLED_APPS中定義的應(yīng)用程序中查找Celery任務(wù)。

將以下代碼添加到core / __ init__.py:

from .celery import app as celery_app
 
__all__ = ("celery_app",)

最后,使用以下Celery設(shè)置更新core / settings.py文件,使其可以連接到Redis:

CELERY_BROKER_URL = "redis://redis:6379"
CELERY_RESULT_BACKEND = "redis://redis:6379"

build:

$ docker-compose up -d --build

查看日志:

$ docker-compose logs 'web'
$ docker-compose logs 'celery'
$ docker-compose logs 'celery-beat'
$ docker-compose logs 'redis'

如果一切順利,我們現(xiàn)在有四個(gè)容器,每個(gè)容器提供不同的服務(wù)。

現(xiàn)在,我們準(zhǔn)備創(chuàng)建一個(gè)示例任務(wù),以查看其是否可以正常工作。

創(chuàng)建一個(gè)任務(wù)

創(chuàng)建一個(gè)新文件core / tasks.py并為僅打印到控制臺(tái)的示例任務(wù)添加以下代碼:

from celery import shared_task

@shared_task
def sample_task():
 print("The sample task just ran.")

安排任務(wù)

在settings.py文件的末尾,添加以下代碼,以使用Celery Beat將sample_task安排為每分鐘運(yùn)行一次:

CELERY_BEAT_SCHEDULE = {
 "sample_task": {
 "task": "core.tasks.sample_task",
 "schedule": crontab(minute="*/1"),
 },
}

在這里,我們使用CELERY_BEAT_SCHEDULE設(shè)置定義了定期任務(wù)。 我們給任務(wù)命名了sample_task,然后聲明了兩個(gè)設(shè)置:

任務(wù)聲明要運(yùn)行的任務(wù)。

時(shí)間表設(shè)置任務(wù)應(yīng)運(yùn)行的時(shí)間間隔。 這可以是整數(shù),時(shí)間增量或crontab。 我們?cè)谌蝿?wù)中使用了crontab模式,告訴它每分鐘運(yùn)行一次。 您可以在此處找到有關(guān)Celery日程安排的更多信息。

確保添加導(dǎo)入:

from celery.schedules import crontab
 
import core.tasks

重啟容器,應(yīng)用變更:

$ docker-compose up -d --build

查看日志:

$ docker-compose logs -f 'celery'
celery_1 | -------------- [queues]
celery_1 | .> celery exchange=celery(direct) key=celery
celery_1 |
celery_1 |
celery_1 | [tasks]
celery_1 | . core.tasks.sample_task

我們可以看到Celery獲得了示例任務(wù)core.tasks.sample_task。

每分鐘,您應(yīng)該在日志中看到一行以“示例任務(wù)剛剛運(yùn)行”結(jié)尾的行:

celery_1  | [2020-04-15 22:49:00,003: INFO/MainProcess]
              Received task: core.tasks.sample_task[8ee5a84f-c54b-4e41-945b-645765e7b20a]
celery_1  | [2020-04-15 22:49:00,007: WARNING/ForkPoolWorker-1] The sample task just ran.

自定義Django Admin命令

Django提供了許多內(nèi)置的django-admin命令,例如:

遷移

啟動(dòng)項(xiàng)目

startapp

轉(zhuǎn)儲(chǔ)數(shù)據(jù)

移民

除了內(nèi)置命令,Django還為我們提供了創(chuàng)建自己的自定義命令的選項(xiàng):

自定義管理命令對(duì)于運(yùn)行獨(dú)立腳本或從UNIX crontab或Windows計(jì)劃任務(wù)控制面板定期執(zhí)行的腳本特別有用。

因此,我們將首先配置一個(gè)新命令,然后使用Celery Beat自動(dòng)運(yùn)行它。

首先創(chuàng)建一個(gè)名為orders / management / commands / my_custom_command.py的新文件。 然后,添加運(yùn)行它所需的最少代碼:

from django.core.management.base import BaseCommand, CommandError
 
 
class Command(BaseCommand):
 help = "A description of the command"
 
 def handle(self, *args, **options):
 pass

BaseCommand有一些可以被覆蓋的方法,但是唯一需要的方法是handle。 handle是自定義命令的入口點(diǎn)。 換句話說,當(dāng)我們運(yùn)行命令時(shí),將調(diào)用此方法。

為了進(jìn)行測試,我們通常只添加一個(gè)快速打印語句。 但是,建議根據(jù)Django文檔使用stdout.write代替:

當(dāng)您使用管理命令并希望提供控制臺(tái)輸出時(shí),應(yīng)該寫入self.stdout和self.stderr,而不是直接打印到stdout和stderr。 通過使用這些代理,測試自定義命令變得更加容易。 另請(qǐng)注意,您無需以換行符結(jié)束消息,除非您指定結(jié)束參數(shù),否則它將自動(dòng)添加。

因此,添加一個(gè)self.stdout.write命令:

from django.core.management.base import BaseCommand, CommandError
 
 
class Command(BaseCommand):
 help = "A description of the command"
 
 def handle(self, *args, **options):
 self.stdout.write("My sample command just ran.") # NEW

測試:

$ docker-compose exec web python manage.py my_custom_command
My sample command just ran.

這樣,讓我們將所有內(nèi)容捆綁在一起!

使用Celery Beat安排自定義命令

現(xiàn)在我們已經(jīng)啟動(dòng)并運(yùn)行了容器,已經(jīng)過測試,可以安排任務(wù)定期運(yùn)行,并編寫了自定義的Django Admin示例命令,現(xiàn)在該進(jìn)行設(shè)置以定期運(yùn)行自定義命令了。

設(shè)定

在項(xiàng)目中,我們有一個(gè)非?;镜膽?yīng)用程序,稱為訂單。 它包含兩個(gè)模型,產(chǎn)品和訂單。 讓我們創(chuàng)建一個(gè)自定義命令,該命令從當(dāng)天發(fā)送確認(rèn)訂單的電子郵件報(bào)告。

首先,我們將通過此項(xiàng)目中包含的夾具將一些產(chǎn)品和訂單添加到數(shù)據(jù)庫中:

$ docker-compose exec web python manage.py loaddata products.json

創(chuàng)建超級(jí)用戶:

$ docker-compose exec web python manage.py createsuperuser

出現(xiàn)提示時(shí),請(qǐng)?zhí)顚懹脩裘?,電子郵件和密碼。 然后在您的Web瀏覽器中導(dǎo)航到http://127.0.0.1:1337/admin。 使用您剛創(chuàng)建的超級(jí)用戶登錄,并創(chuàng)建幾個(gè)訂單。 確保至少有一個(gè)日期為今天。

讓我們?yōu)槲覀兊碾娮余]件報(bào)告創(chuàng)建一個(gè)新的自定義命令。

創(chuàng)建一個(gè)名為orders / management / commands / email_report.py的文件:

from datetime import timedelta, time, datetime
 
from django.core.mail import mail_admins
from django.core.management import BaseCommand
from django.utils import timezone
from django.utils.timezone import make_aware
 
from orders.models import Order
 
today = timezone.now()
tomorrow = today + timedelta(1)
today_start = make_aware(datetime.combine(today, time()))
today_end = make_aware(datetime.combine(tomorrow, time()))
 
 
class Command(BaseCommand):
 help = "Send Today's Orders Report to Admins"
 
 def handle(self, *args, **options):
 orders = Order.objects.filter(confirmed_date__range=(today_start, today_end))
 
 if orders:
 message = ""
 
 for order in orders:
 message += f"{order} \n"
 
 subject = (
 f"Order Report for {today_start.strftime('%Y-%m-%d')} "
 f"to {today_end.strftime('%Y-%m-%d')}"
 )
 
 mail_admins(subject=subject, message=message, html_message=None)
 
 self.stdout.write("E-mail Report was sent.")
 else:
 self.stdout.write("No orders confirmed today.")

在代碼中,我們向數(shù)據(jù)庫查詢了日期為Confirmed_date的訂單,將訂單合并為電子郵件正文的單個(gè)消息,然后使用Django內(nèi)置的mail_admins命令將電子郵件發(fā)送給管理員。

添加一個(gè)虛擬管理員電子郵件,并將EMAIL_BACKEND設(shè)置為使用控制臺(tái)后端,以便將該電子郵件發(fā)送到設(shè)置文件中的stdout:

EMAIL_BACKEND = "django.core.mail.backends.console.EmailBackend"
DEFAULT_FROM_EMAIL = "noreply@email.com"
ADMINS = [("testuser", "test.user@email.com"), ]

運(yùn)行:

$ docker-compose exec web python manage.py email_report
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: [Django] Order Report for 2020-04-15 to 2020-04-16
From: root@localhost
To: test.user@email.com
Date: Wed, 15 Apr 2020 23:10:45 -0000
Message-ID: <158699224565.85.8278261495663971825@5ce6313185d3>
 
Order: 337ef21c-5f53-4761-9f81-07945de385ae - product: Rice
 
-------------------------------------------------------------------------------
E-mail Report was sent.

Celery Beat

現(xiàn)在,我們需要?jiǎng)?chuàng)建一個(gè)定期任務(wù)來每天運(yùn)行此命令。

向core / tasks.py添加一個(gè)新任務(wù):

from celery import shared_task
from django.core.management import call_command # NEW
 
 
@shared_task
def sample_task():
 print("The sample task just ran.")
 
 
# NEW
@shared_task
def send_email_report():
 call_command("email_report", )

因此,首先我們添加了一個(gè)call_command導(dǎo)入,該導(dǎo)入用于以編程方式調(diào)用django-admin命令。 在新任務(wù)中,然后將call_command與自定義命令的名稱一起用作參數(shù)。

要安排此任務(wù),請(qǐng)打開core / settings.py文件,并更新CELERY_BEAT_SCHEDULE設(shè)置以包括新任務(wù)。

CELERY_BEAT_SCHEDULE = {
 "sample_task": {
 "task": "core.tasks.sample_task",
 "schedule": crontab(minute="*/1"),
 },
 "send_email_report": {
 "task": "core.tasks.send_email_report",
 "schedule": crontab(hour="*/1"),
 },
}

在這里,我們向CELERY_BEAT_SCHEDULE添加了一個(gè)名為send_email_report的新條目。 正如我們對(duì)上一個(gè)任務(wù)所做的那樣,我們聲明了該任務(wù)應(yīng)運(yùn)行的任務(wù)-例如core.tasks.send_email_report-并使用crontab模式設(shè)置重復(fù)性。

重新啟動(dòng)容器,以確保新設(shè)置處于活動(dòng)狀態(tài):

$ docker-compose up -d --build
看日志:
$ docker-compose logs -f 'celery'
celery_1 | -------------- [queues]
celery_1 | .> celery exchange=celery(direct) key=celery
celery_1 |
celery_1 |
celery_1 | [tasks]
celery_1 | . core.tasks.sample_task
celery_1 | . core.tasks.send_email_report

一分鐘后郵件發(fā)出:

celery_1  | [2020-04-15 23:20:00,309: WARNING/ForkPoolWorker-1] Content-Type: text/plain; charset="utf-8"
celery_1  | MIME-Version: 1.0
celery_1  | Content-Transfer-Encoding: 7bit
celery_1  | Subject: [Django] Order Report for 2020-04-15 to 2020-04-16
celery_1  | From: root@localhost
celery_1  | To: test.user@email.com
celery_1  | Date: Wed, 15 Apr 2020 23:20:00 -0000
celery_1  | Message-ID: <158699280030.12.8934112422500683251@42481c198b77>
celery_1  |
celery_1  | Order: 337ef21c-5f53-4761-9f81-07945de385ae - product: Rice
celery_1  | [2020-04-15 23:20:00,310: WARNING/ForkPoolWorker-1] -------------------------------------------------------------------------------
celery_1  | [2020-04-15 23:20:00,312: WARNING/ForkPoolWorker-1] E-mail Report was sent.

結(jié)論

在本文中,我們指導(dǎo)您為Celery,Celery Beat和Redis設(shè)置Docker容器。 然后,我們展示了如何使用Celery Beat創(chuàng)建自定義Django Admin命令和定期任務(wù)以自動(dòng)運(yùn)行該命令。

看完上述內(nèi)容,是不是對(duì)使用Celery和Docker處理Django中定期任務(wù)的方法有進(jìn)一步的了解,如果還想學(xué)習(xí)更多內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI