您好,登錄后才能下訂單哦!
起步
Celery 是一個(gè)簡(jiǎn)單、靈活且可靠的,處理大量消息的分布式系統(tǒng),并且提供維護(hù)這樣一個(gè)系統(tǒng)的必需工具。它是一個(gè)專注于實(shí)時(shí)處理的任務(wù)隊(duì)列,同時(shí)也支持任務(wù)調(diào)度。
運(yùn)行模式是生產(chǎn)者消費(fèi)者模式:
任務(wù)隊(duì)列:任務(wù)隊(duì)列是一種在線程或機(jī)器間分發(fā)任務(wù)的機(jī)制。
消息隊(duì)列:消息隊(duì)列的輸入是工作的一個(gè)單元,稱為任務(wù),獨(dú)立的職程(Worker)進(jìn)程持續(xù)監(jiān)視隊(duì)列中是否有需要處理的新任務(wù)。
Celery 用消息通信,通常使用中間人(Broker)在客戶端和職程間斡旋。這個(gè)過(guò)程從客戶端向隊(duì)列添加消息開始,之后中間人把消息派送給職程,職程對(duì)消息進(jìn)行處理。
Celery的架構(gòu)由三部分組成,消息中間件(message broker),任務(wù)執(zhí)行單元(worker)和任務(wù)執(zhí)行結(jié)果存儲(chǔ)(task result store)組成。
消息中間件:Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成,包括,RabbitMQ, Redis, MongoDB等,本文使用 redis 。
任務(wù)執(zhí)行單元:Worker是Celery提供的任務(wù)執(zhí)行的單元,worker并發(fā)的運(yùn)行在分布式的系統(tǒng)節(jié)點(diǎn)中
任務(wù)結(jié)果存儲(chǔ):Task result store用來(lái)存儲(chǔ)Worker執(zhí)行的任務(wù)的結(jié)果,Celery支持以不同方式存儲(chǔ)任務(wù)的結(jié)果,包括Redis,MongoDB,Django ORM,AMQP等,這里我先不去看它是如何存儲(chǔ)的,就先選用Redis來(lái)存儲(chǔ)任務(wù)執(zhí)行結(jié)果。
安裝
通過(guò) pip 命令即可安裝:
pip install celery
本文使用 redis 做消息中間件,所以需要在安裝:
pip install redis
redis軟件也要安裝,官網(wǎng)只提供了 linux 版本的下載:https://redis.io/download,windows 的可以到 https://github.com/MicrosoftArchive/redis 下載 exe 安裝包。
簡(jiǎn)單的demo
為了運(yùn)行一個(gè)簡(jiǎn)單的任務(wù),從中說(shuō)明 celery 的使用方式。在項(xiàng)目文件夾內(nèi)創(chuàng)建 app.py 和 tasks.py 。tasks.py 用來(lái)定義任務(wù):
# tasks.py import time from celery import Celery broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' app = Celery('my_tasks', broker=broker, backend=backend) @app.task def add(x, y): print('enter task') time.sleep(3) return x + y
這些代碼做了什么事。 broker 指定任務(wù)隊(duì)列的消息中間件,backend 指定了任務(wù)執(zhí)行結(jié)果的存儲(chǔ)。app 就是我們創(chuàng)建的 Celery 對(duì)象。通過(guò) app.task 修飾器將 add 函數(shù)變成一個(gè)一部的任務(wù)。
# app.py from tasks import add if __name__ == '__main__': print('start task') result = add.delay(2, 18) print('end task') print(result)
add.delay 函數(shù)將任務(wù)序列化發(fā)送到消息中間件。終端執(zhí)行 python app.py 可以看到輸出一個(gè)任務(wù)的唯一識(shí)別:
start task
end task
79ef4736-1ecb-4afd-aa5e-b532657acd43
這個(gè)只是將任務(wù)推送到 redis,任務(wù)還沒(méi)被消費(fèi),任務(wù)會(huì)在 celery 隊(duì)列中。
開啟 celery woker 可以將任務(wù)進(jìn)行消費(fèi):
celery worker -A tasks -l info # -A 后是模塊名
A 參數(shù)指定了celery 對(duì)象的位置,l 參數(shù)指定woker的日志級(jí)別。
如果此命令在終端報(bào)錯(cuò):
File "e:\workspace\.env\lib\site-packages\celery\app\trace.py", line 537, in _fast_trace_task
tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
這是win 10 在使用 Celery 4.x 的時(shí)候會(huì)有這個(gè)問(wèn)題,解決方式可以是改用 Celery 3.x 版本,或者按照 Unable to run tasks under Windows 上提供的方式,該issue提供了兩種方式解決,一種是安裝 eventlet 擴(kuò)展:
pip install eventlet celery -A <mymodule> worker -l info -P eventlet
另一種方式是添加個(gè) FORKED_BY_MULTIPROCESSING = 1 的環(huán)境變量(推薦這種方式):
import os os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
如果一切順利,woker 正常啟動(dòng),就能在終端看到任務(wù)被消費(fèi)了:
[2018-11-27 13:59:27,830: INFO/MainProcess] Received task: tasks.add[745e5be7-4675-4f84-9d57-3f5e91c33a19]
[2018-11-27 13:59:27,831: WARNING/SpawnPoolWorker-2] enter task
[2018-11-27 13:59:30,835: INFO/SpawnPoolWorker-2] Task tasks.add[745e5be7-4675-4f84-9d57-3f5e91c33a19] succeeded in 3.0s: 20
說(shuō)明我們的demo已經(jīng)成功了。
使用配置文件
在上面的demo中,是將broker和backend直接寫在代碼中的,而 Celery 還有其他配置,最好是寫出配置文件的形式,基本配置項(xiàng)有:
整理一下目錄結(jié)構(gòu),將我們的任務(wù)封裝成包:
內(nèi)容如下:
# __init__.py import os from celery import Celery os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1') app = Celery('demo') # 通過(guò) Celery 實(shí)例加載配置模塊 app.config_from_object('celery_app.celery_config') # celery_config.py BROKER_URL = 'redis://127.0.0.1:6379/1' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2' # UTC CELERY_ENABLE_UTC = True CELERY_TIMEZONE = 'Asia/Shanghai' # 導(dǎo)入指定的任務(wù)模塊 CELERY_IMPORTS = ( 'celery_app.task1', 'celery_app.task2', ) # task1.py import time from celery_app import app @app.task def add(x, y): print('enter task') time.sleep(3) return x + y # task2.py import time from celery_app import app @app.task def mul(x, y): print('enter task') time.sleep(4) return x * y # app.py from celery_app import task1 if __name__ == '__main__': pass print('start task') result = task1.add.delay(2, 18) print('end task') print(result)
提交任務(wù)與啟動(dòng)worker:
$ python app.py $ celery worker -A celery_app -l info
result = task1.add.delay(2, 18) 返回的是一個(gè)任務(wù)對(duì)象,通過(guò) delay 函數(shù)的方式可以發(fā)現(xiàn)這個(gè)過(guò)程是非阻塞的,這個(gè)任務(wù)對(duì)象有一個(gè)方法:
r.ready() # 查看任務(wù)狀態(tài),返回布爾值, 任務(wù)執(zhí)行完成, 返回 True, 否則返回 False. r.wait() # 等待任務(wù)完成, 返回任務(wù)執(zhí)行結(jié)果,很少使用; r.get(timeout=1) # 獲取任務(wù)執(zhí)行結(jié)果,可以設(shè)置等待時(shí)間 r.result # 任務(wù)執(zhí)行結(jié)果. r.state # PENDING, START, SUCCESS,任務(wù)當(dāng)前的狀態(tài) r.status # PENDING, START, SUCCESS,任務(wù)當(dāng)前的狀態(tài) r.successful # 任務(wù)成功返回true r.traceback # 如果任務(wù)拋出了一個(gè)異常,你也可以獲取原始的回溯信息
定時(shí)任務(wù)
定時(shí)任務(wù)的功能類似 crontab,可以完成每日統(tǒng)計(jì)任務(wù)等。首先我們需要配置一下 schedule,通過(guò)改造上面的配置文件,添加 CELERYBEAT_SCHEDULE 配置:
import datetime from celery.schedules import crontab CELERYBEAT_SCHEDULE = { 'task1-every-1-min': { 'task': 'celery_app.task1.add', 'schedule': datetime.timedelta(seconds=60), 'args': (2, 15), }, 'task2-once-a-day': { 'task': 'celery_app.task2.mul', 'schedule': crontab(hour=15, minute=23), 'args': (3, 6), } }
task 指定要執(zhí)行的任務(wù);schedule 表示計(jì)劃的時(shí)間,datetime.timedelta(seconds=60) 表示間隔一分鐘,這里其實(shí)也可以是 crontab(minute='*/1') 來(lái)替換;args 表示要傳遞的參數(shù)。
啟動(dòng) celery beat:
$ celery worker -A celery_app -l info
我們目前是用兩個(gè)窗口來(lái)執(zhí)行 woker 和 beat 。當(dāng)然也可以只使用一個(gè)窗口來(lái)運(yùn)行(僅限linux系統(tǒng)):
$ celery -B -A celery_app worker -l info
celery.task 裝飾器
@celery.task() def name(): pass
task() 方法將任務(wù)修飾成異步, name 可以顯示指定的任務(wù)名字;serializer 指定序列化的方式;bind 一個(gè)bool值,若為True,則task實(shí)例會(huì)作為第一個(gè)參數(shù)傳遞到任務(wù)方法中,可以訪問(wèn)task實(shí)例的所有的屬性,即前面反序列化中那些屬性。
@task(bind=True) # 第一個(gè)參數(shù)是self,使用self.request訪問(wèn)相關(guān)的屬性 def add(self, x, y): logger.info(self.request.id)
base 可以指定任務(wù)積累,可以用來(lái)定義回調(diào)函數(shù):
import celery class MyTask(celery.Task): # 任務(wù)失敗時(shí)執(zhí)行 def on_failure(self, exc, task_id, args, kwargs, einfo): print('{0!r} failed: {1!r}'.format(task_id, exc)) # 任務(wù)成功時(shí)執(zhí)行 def on_success(self, retval, task_id, args, kwargs): pass # 任務(wù)重試時(shí)執(zhí)行 def on_retry(self, exc, task_id, args, kwargs, einfo): pass @task(base=MyTask) def add(x, y): raise KeyError() exc:失敗時(shí)的錯(cuò)誤的類型; task_id:任務(wù)的id; args:任務(wù)函數(shù)的參數(shù); kwargs:參數(shù); einfo:失敗時(shí)的異常詳細(xì)信息; retval:任務(wù)成功執(zhí)行的返回值;
總結(jié)
網(wǎng)上找了一份比較常用的配置文件,需要的時(shí)候可以參考下:
# 注意,celery4版本后,CELERY_BROKER_URL改為BROKER_URL BROKER_URL = 'amqp://username:passwd@host:port/虛擬主機(jī)名' # 指定結(jié)果的接受地址 CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db' # 指定任務(wù)序列化方式 CELERY_TASK_SERIALIZER = 'msgpack' # 指定結(jié)果序列化方式 CELERY_RESULT_SERIALIZER = 'msgpack' # 任務(wù)過(guò)期時(shí)間,celery任務(wù)執(zhí)行結(jié)果的超時(shí)時(shí)間 CELERY_TASK_RESULT_EXPIRES = 60 * 20 # 指定任務(wù)接受的序列化類型. CELERY_ACCEPT_CONTENT = ["msgpack"] # 任務(wù)發(fā)送完成是否需要確認(rèn),這一項(xiàng)對(duì)性能有一點(diǎn)影響 CELERY_ACKS_LATE = True # 壓縮方案選擇,可以是zlib, bzip2,默認(rèn)是發(fā)送沒(méi)有壓縮的數(shù)據(jù) CELERY_MESSAGE_COMPRESSION = 'zlib' # 規(guī)定完成任務(wù)的時(shí)間 CELERYD_TASK_TIME_LIMIT = 5 # 在5s內(nèi)完成任務(wù),否則執(zhí)行該任務(wù)的worker將被殺死,任務(wù)移交給父進(jìn)程 # celery worker的并發(fā)數(shù),默認(rèn)是服務(wù)器的內(nèi)核數(shù)目,也是命令行-c參數(shù)指定的數(shù)目 CELERYD_CONCURRENCY = 4 # celery worker 每次去rabbitmq預(yù)取任務(wù)的數(shù)量 CELERYD_PREFETCH_MULTIPLIER = 4 # 每個(gè)worker執(zhí)行了多少任務(wù)就會(huì)死掉,默認(rèn)是無(wú)限的 CELERYD_MAX_TASKS_PER_CHILD = 40 # 這是使用了django-celery默認(rèn)的數(shù)據(jù)庫(kù)調(diào)度模型,任務(wù)執(zhí)行周期都被存在你指定的orm數(shù)據(jù)庫(kù)中 # CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 設(shè)置默認(rèn)的隊(duì)列名稱,如果一個(gè)消息不符合其他的隊(duì)列就會(huì)放在默認(rèn)隊(duì)列里面,如果什么都不設(shè)置的話,數(shù)據(jù)都會(huì)發(fā)送到默認(rèn)的隊(duì)列中 CELERY_DEFAULT_QUEUE = "default" # 設(shè)置詳細(xì)的隊(duì)列 CELERY_QUEUES = { "default": { # 這是上面指定的默認(rèn)隊(duì)列 "exchange": "default", "exchange_type": "direct", "routing_key": "default" }, "topicqueue": { # 這是一個(gè)topic隊(duì)列 凡是topictest開頭的routing key都會(huì)被放到這個(gè)隊(duì)列 "routing_key": "topic.#", "exchange": "topic_exchange", "exchange_type": "topic", }, "task_eeg": { # 設(shè)置扇形交換機(jī) "exchange": "tasks", "exchange_type": "fanout", "binding_key": "tasks", }, }
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。