您好,登錄后才能下訂單哦!
在web開(kāi)發(fā)中我們經(jīng)常會(huì)遇到一些耗時(shí)的操作,比如發(fā)送郵件/短信,執(zhí)行各種任務(wù)等等,這時(shí)我們會(huì)采取異步的方式去執(zhí)行這些任務(wù),而celery就是這樣的一個(gè)異步的分布式任務(wù)處理框架,官方文檔
今天,我們的主題是celery如何與flask一起工作,我們都知道,flask是一個(gè)非常小巧的web框架,有許許多多的擴(kuò)展,celery也不例外,我們先看下目前常用的幾個(gè)flask-celery的擴(kuò)展:
除這些擴(kuò)展之外,其實(shí)flask的官方文檔中已經(jīng)給出了在flask中使用celery的方式,不過(guò),那是一個(gè)單文件中運(yùn)行flask的demo,在實(shí)際項(xiàng)目中使用,還是有許多需要注意的地方,接下來(lái),我們就一起探究下如何在flask項(xiàng)目中使用celery。
├── celery_task # celery任務(wù)相關(guān)
│ ├── __init__.py
│ ├── tasks.py
│ └── test.py
├── manage.py # celery worker實(shí)例
├── requirements.txt # 依賴包
└── test_api # flask 項(xiàng)目
├── api # 藍(lán)本相關(guān)
│ ├── __init__.py
│ └── v1
│ ├── __init__.py
│ └── views.py
├── extensions.py # 擴(kuò)展初始化
├── __init__.py # flask app
├── models.py # 模型文件
└── settings.py # 配置文件
本項(xiàng)目中沒(méi)有使用擴(kuò)展,只是基于官方文檔中的示例做進(jìn)一步的應(yīng)用。
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
這是一個(gè)celery的工廠函數(shù),使用flask app中的配置設(shè)置celery相關(guān)的屬性,并且更改了celery對(duì)象的Task,使其能夠使用flask的應(yīng)用上下文,這一點(diǎn)非常重要。我們將這段代碼放置到flask項(xiàng)目初始化文件中去也就是testapi/__init_\.py
celerytask/__init_\.py
rom test_api import create_app, make_celery
app = create_app()
celery = make_celery(app)
class MyTask(celery.Task): # celery 基類
def on_success(self, retval, task_id, args, kwargs):
# 執(zhí)行成功的操作
print('MyTasks 基類回調(diào),任務(wù)執(zhí)行成功')
return super(MyTask, self).on_success(retval, task_id, args, kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
# 執(zhí)行失敗的操作
# 任務(wù)執(zhí)行失敗,可以調(diào)用接口進(jìn)行失敗報(bào)警等操作
print('MyTasks 基類回調(diào),任務(wù)執(zhí)行失敗')
return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)
這里我對(duì)Task做了進(jìn)一步的定制,用于添加一些任務(wù)信息。
import datetime
import time
import os
import random
from flask import current_app
from test_api.models import User
from test_api.extensions import db
from celery_task import celery, MyTask
@celery.task(bind=True, base=MyTask)
def apptask(self):
print(current_app.config)
print("==============%s " % current_app.config["SQLALCHEMY_DATABASE_URI"])
print("++++++++++++++%s " % os.getenv("DATABASE_URL"))
time.sleep(5)
user = User(username="user%s" % random.randint(1,100))
db.session.add(user)
db.session.commit()
return 'success'
這個(gè)任務(wù)很簡(jiǎn)單,使用User模型類異步向數(shù)據(jù)庫(kù)中添加數(shù)據(jù),為了體現(xiàn)耗時(shí)操作,使用sleep函數(shù)模擬。
test_api/api/v1/views.py
from flask import jsonify
from celery_task.tasks import apptask
from test_api.api.v1 import api_v1
from test_api.extensions import db
from flask import current_app
@api_v1.route("/", methods=["GET"])
def index():
r = apptask.apply_async()
return jsonify({"status": "success"})
視圖函數(shù)非常的簡(jiǎn)單,只做了提交任務(wù)的操作。
為了避免循環(huán)導(dǎo)入問(wèn)題,我們?cè)陧?xiàng)目根目錄下新建manage.py
from test_api import create_app, make_celery
app = create_app()
celery = make_celery(app)
if __name__ == '__main__':
app.run()
這個(gè)文件只用來(lái)啟動(dòng)celery,啟動(dòng)命令如下:
# celery worker -A manage:celery -l debug
看到如下輸出,表明啟動(dòng)成功:
-------------- celery@test-3 v4.4.0 (cliffs)
--- ***** -----
-- ******* ---- Linux-3.10.0-693.2.2.el7.x86_64-x86_64-with-centos-7.4.1708-Core 2020-03-03 21:14:13
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: test_api:0x7f87c31a4e48
- ** ---------- .> transport: redis://127.0.0.1:6379/3
- ** ---------- .> results: redis://127.0.0.1:6379/4
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery.accumulate
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap
. celery_task.tasks.apptask
[2020-03-03 21:14:13,632: DEBUG/MainProcess] | Worker: Starting Hub
[2020-03-03 21:14:13,632: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:13,632: DEBUG/MainProcess] | Worker: Starting Pool
[2020-03-03 21:14:13,690: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:13,691: DEBUG/MainProcess] | Worker: Starting Consumer
[2020-03-03 21:14:13,691: DEBUG/MainProcess] | Consumer: Starting Connection
[2020-03-03 21:14:13,708: INFO/MainProcess] Connected to redis://127.0.0.1:6379/3
[2020-03-03 21:14:13,708: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:13,708: DEBUG/MainProcess] | Consumer: Starting Events
[2020-03-03 21:14:13,718: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:13,718: DEBUG/MainProcess] | Consumer: Starting Mingle
[2020-03-03 21:14:13,718: INFO/MainProcess] mingle: searching for neighbors
[2020-03-03 21:14:14,743: INFO/MainProcess] mingle: all alone
[2020-03-03 21:14:14,743: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:14,744: DEBUG/MainProcess] | Consumer: Starting Gossip
[2020-03-03 21:14:14,748: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:14,748: DEBUG/MainProcess] | Consumer: Starting Heart
[2020-03-03 21:14:14,750: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:14,750: DEBUG/MainProcess] | Consumer: Starting Tasks
[2020-03-03 21:14:14,756: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:14,756: DEBUG/MainProcess] | Consumer: Starting Control
[2020-03-03 21:14:14,759: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:14,759: DEBUG/MainProcess] | Consumer: Starting event loop
[2020-03-03 21:14:14,759: DEBUG/MainProcess] | Worker: Hub.register Pool...
[2020-03-03 21:14:14,760: INFO/MainProcess] celery@test-3 ready.
[2020-03-03 21:14:14,760: DEBUG/MainProcess] basic.qos: prefetch_count->8
啟動(dòng)flask:
# flask run
* Serving Flask app "test_api" (lazy loading)
* Environment: development
* Debug mode: on
* Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)
* Restarting with stat
* Debugger is active!
* Debugger PIN: 237-492-852
調(diào)試接口:
# curl http://127.0.0.1:5000/api/v1/
{
"status": "success"
}
查看celery日志:
[2020-03-03 21:17:31,330: WARNING/ForkPoolWorker-2]
[2020-03-03 21:17:31,330: DEBUG/MainProcess] Task accepted: celery_task.tasks.apptask[5f27a148-161f-4485-931f-17d94637168e] pid:2341
[2020-03-03 21:17:36,391: WARNING/ForkPoolWorker-2] MyTasks 基類回調(diào),任務(wù)執(zhí)行成功
[2020-03-03 21:17:36,392: INFO/ForkPoolWorker-2] Task celery_task.tasks.apptask[5f27a148-161f-4485-931f-17d94637168e] succeeded in 5.0624741315841675s: 'success'
任務(wù)執(zhí)行成功,查看數(shù)據(jù)庫(kù)數(shù)據(jù):
mysql> select * from user order by id;
+----+----------+
| id | username |
+----+----------+
| 1 | user26 |
| 2 | user69 |
| 3 | user71 |
| 4 | user35 |
| 5 | user13 |
| 6 | user54 |
| 7 | user88 |
| 8 | user63 |
| 9 | user87 |
| 10 | user90 |
| 11 | user3 |
| 12 | user18 |
| 13 | user65 |
+----+----------+
數(shù)據(jù)已被插入,實(shí)驗(yàn)成功!
有幾個(gè)坑希望大家注意下
出錯(cuò)文件: testapi/__init_\.py
import os
import click
from flask import Flask, jsonify
from test_api.api.v1 import api_v1 # 藍(lán)圖在上方導(dǎo)入,循環(huán)報(bào)錯(cuò)產(chǎn)生
from test_api.settings import config
from test_api.models import User
from celery import Celery
def make_celery(app):
...
def create_app(config_name=None):
if config_name is None:
config_name = os.getenv('FLASK_ENV', 'development')
app = Flask('test_api')
app.config.from_object(config[config_name])
register_extensions(app)
register_blueprints(app)
register_commands(app)
register_errors(app)
return app
# 注冊(cè)藍(lán)圖函數(shù)
def register_blueprints(app):
app.register_blueprint(api_v1, url_prefix='/api/v1')
啟動(dòng)celery和請(qǐng)求接口時(shí)均會(huì)報(bào)錯(cuò),錯(cuò)誤堆棧如下:
from test_api import create_app, make_celery
File "/tmp/test/test_api/__init__.py", line 5, in <module>
from test_api.api.v1 import api_v1
File "/tmp/test/test_api/api/v1/__init__.py", line 9, in <module>
from test_api.api.v1 import views
File "/tmp/test/test_api/api/v1/views.py", line 2, in <module>
from celery_task.tasks import apptask
File "/tmp/test/celery_task/__init__.py", line 1, in <module>
from test_api import create_app, make_celery
ImportError: cannot import name 'create_app'
將藍(lán)圖的導(dǎo)入下放置藍(lán)圖注冊(cè)函數(shù)中testapi/__init_\.py:
...
def register_blueprints(app):
from test_api.api.v1 import api_v1
app.register_blueprint(api_v1, url_prefix='/api/v1')
...
提交任務(wù),celery報(bào)錯(cuò)如下:
...
options = self.get_options(sa_url, echo)
File "/tmp/py3/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 575, in get_options
self._sa.apply_driver_hacks(self._app, sa_url, options)
File "/tmp/py3/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 877, in apply_driver_hacks
if sa_url.drivername.startswith('mysql'):
AttributeError: 'NoneType' object has no attribute 'drivername'
通過(guò)調(diào)試我發(fā)現(xiàn),flask的app的配置是可以拿到的,因?yàn)槲覀冊(cè)诠S函數(shù)中推送了應(yīng)用上下文,我的數(shù)據(jù)庫(kù)配置信息是以鍵值的形式寫(xiě)在了.env文件中,這也是目前flask推薦的方式。那為什么celery取不到數(shù)據(jù)庫(kù)連接配置呢?其實(shí),啟動(dòng)celery的app和我們web服務(wù)所用app是兩個(gè)獨(dú)立的app,celery無(wú)法通過(guò).env中的環(huán)境變量取到相應(yīng)的值,這里有三種解決辦法:
不使用環(huán)境變量的方式,直接將相關(guān)信息寫(xiě)在配置文件中例如: SQLALCHEMY_DATABASE_URI = "mysql+pymysql://xxx:xxx@127.0.0.1:3306/test?charset=utf8"
相比之下,方案三是采納比較多的,于是我們?cè)趖est_api/settings.py文件中加入如下代碼:
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())
find_dotenv函數(shù)會(huì)在當(dāng)前以及父目錄中搜尋.env文件,load_dotenv函數(shù)則負(fù)責(zé)加載環(huán)境變量。如此,大功告成。我們可以繼續(xù)愉快擼代碼啦。
附:項(xiàng)目源碼
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。