溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

django+celery 實(shí)現(xiàn)分布式任務(wù)

發(fā)布時(shí)間:2020-06-28 00:45:16 來(lái)源:網(wǎng)絡(luò) 閱讀:3328 作者:lianlongfei 欄目:編程語(yǔ)言

想用django做一個(gè)自動(dòng)運(yùn)維平臺(tái),利用netsnmp來(lái)獲取交換機(jī)及服務(wù)器信息,但是snmpget任務(wù)需要在后臺(tái)實(shí)時(shí)運(yùn)行,為了不影響html響應(yīng),利用celery來(lái)結(jié)合django做異步任務(wù)隊(duì)列。

一、環(huán)境準(zhǔn)備
1.首先安裝celery
pip3 install celery
2.安裝djcelery
pip3 install django-celery
3.安裝一個(gè)broker
我們必須擁有一個(gè)broker消息隊(duì)列用于發(fā)送和接收消息。Celery官網(wǎng)給出了多個(gè)broker的備選方案:RabbitMQ、Redis、Database(不推薦)以及其他的消息中間件。本次我們利用redis

sudo apt-get install redis

redis-server
啟動(dòng)redis服務(wù), 端口假設(shè)為6379
redis-cli 查看redis 狀態(tài)
root@ubuntu:~/Desktop/webserver/myserver# redis-cli
127.0.0.1:6379>

二、django 配置
配置settings.py
首先,在Django工程的settings.py文件中加入如下配置代碼:

celery 配置信息 start

#############################
import djcelery

celery 配置

djcelery.setup_loader()
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERY_IMPORTS = ('serverapp.task')
#############################

celery 配置信息 end

#############################

當(dāng)djcelery.setup_loader()運(yùn)行時(shí),Celery便會(huì)去查看INSTALLD_APPS下包含的所有app目錄中的tasks.py文件,找到標(biāo)記為task的方法,將它們注冊(cè)為celery task

? BROKER_URL:broker是代理人,它負(fù)責(zé)分發(fā)任務(wù)給worker去執(zhí)行。我使用的是Redis作為broker

? 沒(méi)有設(shè)置 CELERY_RESULT_BACKEND,默認(rèn)沒(méi)有配置,此時(shí)Django會(huì)使用默認(rèn)的數(shù)據(jù)庫(kù)(也是你指定的orm數(shù)據(jù)庫(kù))。

CELERY_IMPORTS:是導(dǎo)入目標(biāo)任務(wù)文件

CELERYBEAT_SCHEDULER:使用了django-celery默認(rèn)的數(shù)據(jù)庫(kù)調(diào)度模型,任務(wù)執(zhí)行周期都被存在默認(rèn)指定的orm數(shù)據(jù)庫(kù)中.

CELERYBEAT_SCHEDULE:設(shè)置定時(shí)的時(shí)間配置, 可以精確到秒,分鐘,小時(shí),天,周等。

3.在主工程的配置文件settings.py 中應(yīng)用注冊(cè)表INSTALLED_APPS中加入 djcelery

INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'djcelery', #加入djcelery
]

4.(3)創(chuàng)建應(yīng)用實(shí)例

? 在主工程目錄添加celery.py, 添加自動(dòng)檢索django工程tasks任務(wù)

? vim artproject/celery.py

#目的是拒絕隱士引入,celery.py和celery沖突。
from future import absolute_import

import os

from celery import Celery, platforms
platforms.C_FORCE_ROOT = True

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myserver.settings')
#Specifying the settings here means the celery command line program will know where your Django project is.
#This statement must always appear before the app instance is created, which is what we do next:
from django.conf import settings

app = Celery('serverapp')

app.config_from_object('django.conf:settings')
#This means that you don’t have to use multiple configuration files, and instead configure Celery directly from the Django settings.
#You can pass the object directly here, but using a string is better since then the worker doesn’t have to serialize the object.

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
#With the line above Celery will automatically discover tasks in reusable apps if you define all tasks in a separate tasks.py module.
#The tasks.py should be in dir which is added to INSTALLED_APP in settings.py.
#So you do not have to manually add the individual modules to the CELERY_IMPORT in settings.py.

@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request)) #dumps its own request information

5.(4) 創(chuàng)建任務(wù) tasks

每個(gè)任務(wù)本質(zhì)上就是一個(gè)函數(shù),在tasks.py中,寫(xiě)入你想要執(zhí)行的函數(shù)即可。

在應(yīng)用art中添加我們需要提供的異步服務(wù)和定時(shí)服務(wù)

vim art/tasks.py

#!/usr/bin/env python

encoding: utf-8

from future import absolute_import
import time
from django.core.mail import send_mail
from celery.utils.log import get_task_logger
from artproject.celery import app
?
from art.utils.send_mail import pack_html, send_email?
@app.task
br/>?
@app.task
url = "http://1000phone.com"
receiver = 'zhouguangyou@1000phone.com'
content = pack_html(receiver, url)

content = 'this is email content.'

send_email(receiver, content)
print('send email ok!')?
?
@app.task
br/>?
?
@app.task
return x+y
6.遷移生成celery需要的數(shù)據(jù)表

python manage.py migrate
此時(shí)數(shù)據(jù)庫(kù)表結(jié)構(gòu)多出了幾個(gè)

7.
8.
9.啟動(dòng)服務(wù),測(cè)試
我們可以采用 python manage.py help 發(fā)現(xiàn)多出了 celery 相關(guān)選項(xiàng)。

(1)啟動(dòng)django celery 服務(wù)

啟動(dòng)服務(wù):

python manage.py celery worker --loglevel=info

此時(shí)異步處理和定時(shí)處理服務(wù)都已經(jīng)啟動(dòng)了

(2)web端接口觸發(fā)異步任務(wù)處理

我們?cè)趙eb端加入一個(gè)入口,觸發(fā)異步任務(wù)處理add函數(shù)

在應(yīng)用art的urls.py 中加入如下對(duì)應(yīng)關(guān)系

from art.views import add_handler
?
?
url(r'^add', add_handler),

art/views.py 中加入處理邏輯

def add_handler(request):
x = request.GET.get('x', '1')
y = request.GET.get('y', '1')
from .tasks import add
add.delay(int(x), int(y))
res = {'code':200, 'message':'ok', 'data':[{'x':x, 'y':y}]}
return HttpResponse(json.dumps(res))

啟動(dòng)web服務(wù),通過(guò)url傳入的參數(shù),通過(guò)handler的add.delay(x, y)計(jì)算并存入mysql

http://127.0.0.1:8000/art/add?x=188&y=22

(4) 測(cè)試定時(shí)器,發(fā)送郵件

在終端輸入 python manage.py celerybeat -l info

會(huì)自動(dòng)觸發(fā)每隔30s執(zhí)行一次tsend_email定時(shí)器函數(shù),發(fā)送郵件:

CELERYBEAT_SCHEDULE = { #定時(shí)器策略
#定時(shí)任務(wù)一: 每隔30s運(yùn)行一次
u'測(cè)試定時(shí)器1': {
"task": "art.tasks.tsend_email",
#"schedule": crontab(minute='*/2'), # or 'schedule': timedelta(seconds=3),
"schedule":timedelta(seconds=30),
"args": (),
},
}
具體發(fā)送郵件服務(wù)程序見(jiàn)下面的第4節(jié)

4 郵件發(fā)送服務(wù)
項(xiàng)目中經(jīng)常會(huì)有定時(shí)發(fā)送郵件的情形,比如發(fā)送數(shù)據(jù)報(bào)告,發(fā)送異常服務(wù)報(bào)告等。

可以編輯文件 art/utils/send_mail.py, 內(nèi)容編輯如下:

#!/usr/bin/env python
#-- coding:utf-8 --
#written by zhouguangyou
#發(fā)送郵件(wd_email_check123賬號(hào)用于內(nèi)部測(cè)試使用,不要用于其他用途)
?
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.header import Header
import time
?
sender = 'wwwwww@163.com'
subject = u'api開(kāi)放平臺(tái)郵箱驗(yàn)證'
smtpserver = 'smtp.163.com'
username = 'wwwwww'
password = 'wwwww1234'
mail_postfix="163.com"
?
def send_email(receiver, content):
try:
me = username+"<"+username+"@"+mail_postfix+">"
msg = MIMEText(content, 'html', 'utf-8')
msg['Subject'] = subject
msg['From'] = sender
msg['To'] = receiver
smtp = smtplib.SMTP()
smtp.connect(smtpserver)
smtp.login(username, password)
smtp.sendmail(sender, receiver, msg.as_string())
smtp.quit()
return True
except Exception as e:
print('send_email has error with : ' + str(e))
return False
?
?
def pack_html(receiver, url):
html_content = u"<html><div>尊敬的用戶<font color='#0066FF'>%s</font> 您好!</div><br>" \
"<div>感謝您關(guān)注我們的平臺(tái) ,我們將為您提供最貼心的服務(wù),祝您購(gòu)物愉快。</div><br>" \
"<div>點(diǎn)擊以下鏈接,即可完成郵箱安全驗(yàn)證:</div><br>" \
"<div><a href='%s'>%s</a></div><br>" \
"<div>為保障您的帳號(hào)安全,請(qǐng)?jiān)?4小時(shí)內(nèi)點(diǎn)擊該鏈接; </div><br>" \
"<div>若您沒(méi)有申請(qǐng)過(guò)驗(yàn)證郵箱 ,請(qǐng)您忽略此郵件,由此給您帶來(lái)的不便請(qǐng)諒解。</div>" \
"</html>" % (receiver, url, url)
html_content = html_content
return html_content
?
?
if name == "main":
url = "http://xxxx.com"
receiver = 'xxx@126.com'
#content = pack_html(receiver, url)
content = 'this is email content. at %s.'%int(time.time())
send_email(receiver, content)

至此,在celery ui界面可以看到兩類,定時(shí)器處理和異步處理。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI