溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

詳解分布式任務(wù)隊(duì)列Celery使用說(shuō)明

發(fā)布時(shí)間:2020-08-27 07:08:40 來(lái)源:腳本之家 閱讀:465 作者:棲遲于一丘 欄目:開發(fā)技術(shù)

起步

Celery 是一個(gè)簡(jiǎn)單、靈活且可靠的,處理大量消息的分布式系統(tǒng),并且提供維護(hù)這樣一個(gè)系統(tǒng)的必需工具。它是一個(gè)專注于實(shí)時(shí)處理的任務(wù)隊(duì)列,同時(shí)也支持任務(wù)調(diào)度。

運(yùn)行模式是生產(chǎn)者消費(fèi)者模式:

詳解分布式任務(wù)隊(duì)列Celery使用說(shuō)明

任務(wù)隊(duì)列:任務(wù)隊(duì)列是一種在線程或機(jī)器間分發(fā)任務(wù)的機(jī)制。

消息隊(duì)列:消息隊(duì)列的輸入是工作的一個(gè)單元,稱為任務(wù),獨(dú)立的職程(Worker)進(jìn)程持續(xù)監(jiān)視隊(duì)列中是否有需要處理的新任務(wù)。

Celery 用消息通信,通常使用中間人(Broker)在客戶端和職程間斡旋。這個(gè)過(guò)程從客戶端向隊(duì)列添加消息開始,之后中間人把消息派送給職程,職程對(duì)消息進(jìn)行處理。

Celery的架構(gòu)由三部分組成,消息中間件(message broker),任務(wù)執(zhí)行單元(worker)和任務(wù)執(zhí)行結(jié)果存儲(chǔ)(task result store)組成。

消息中間件:Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成,包括,RabbitMQ, Redis, MongoDB等,本文使用 redis 。

任務(wù)執(zhí)行單元:Worker是Celery提供的任務(wù)執(zhí)行的單元,worker并發(fā)的運(yùn)行在分布式的系統(tǒng)節(jié)點(diǎn)中

任務(wù)結(jié)果存儲(chǔ):Task result store用來(lái)存儲(chǔ)Worker執(zhí)行的任務(wù)的結(jié)果,Celery支持以不同方式存儲(chǔ)任務(wù)的結(jié)果,包括Redis,MongoDB,Django ORM,AMQP等,這里我先不去看它是如何存儲(chǔ)的,就先選用Redis來(lái)存儲(chǔ)任務(wù)執(zhí)行結(jié)果。

安裝

通過(guò) pip 命令即可安裝:

pip install celery

本文使用 redis 做消息中間件,所以需要在安裝:

pip install redis

redis軟件也要安裝,官網(wǎng)只提供了 linux 版本的下載:https://redis.io/download,windows 的可以到 https://github.com/MicrosoftArchive/redis 下載 exe 安裝包。

簡(jiǎn)單的demo

為了運(yùn)行一個(gè)簡(jiǎn)單的任務(wù),從中說(shuō)明 celery 的使用方式。在項(xiàng)目文件夾內(nèi)創(chuàng)建 app.py 和 tasks.py 。tasks.py 用來(lái)定義任務(wù):

# tasks.py
import time
from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('my_tasks', broker=broker, backend=backend)

@app.task
def add(x, y):
  print('enter task')
  time.sleep(3)
  return x + y

這些代碼做了什么事。 broker 指定任務(wù)隊(duì)列的消息中間件,backend 指定了任務(wù)執(zhí)行結(jié)果的存儲(chǔ)。app 就是我們創(chuàng)建的 Celery 對(duì)象。通過(guò) app.task 修飾器將 add 函數(shù)變成一個(gè)一部的任務(wù)。

# app.py
from tasks import add

if __name__ == '__main__':
  print('start task')
  result = add.delay(2, 18)
  print('end task')
  print(result)

add.delay 函數(shù)將任務(wù)序列化發(fā)送到消息中間件。終端執(zhí)行 python app.py 可以看到輸出一個(gè)任務(wù)的唯一識(shí)別:

start task
end task
79ef4736-1ecb-4afd-aa5e-b532657acd43

這個(gè)只是將任務(wù)推送到 redis,任務(wù)還沒(méi)被消費(fèi),任務(wù)會(huì)在 celery 隊(duì)列中。

開啟 celery woker 可以將任務(wù)進(jìn)行消費(fèi):

celery worker -A tasks -l info  # -A 后是模塊名

A 參數(shù)指定了celery 對(duì)象的位置,l 參數(shù)指定woker的日志級(jí)別。

如果此命令在終端報(bào)錯(cuò):

  File "e:\workspace\.env\lib\site-packages\celery\app\trace.py", line 537, in _fast_trace_task
    tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)

這是win 10 在使用 Celery 4.x 的時(shí)候會(huì)有這個(gè)問(wèn)題,解決方式可以是改用 Celery 3.x 版本,或者按照 Unable to run tasks under Windows 上提供的方式,該issue提供了兩種方式解決,一種是安裝 eventlet 擴(kuò)展:

pip install eventlet
celery -A <mymodule> worker -l info -P eventlet

另一種方式是添加個(gè) FORKED_BY_MULTIPROCESSING = 1 的環(huán)境變量(推薦這種方式):

import os
os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')

如果一切順利,woker 正常啟動(dòng),就能在終端看到任務(wù)被消費(fèi)了:

[2018-11-27 13:59:27,830: INFO/MainProcess] Received task: tasks.add[745e5be7-4675-4f84-9d57-3f5e91c33a19]
[2018-11-27 13:59:27,831: WARNING/SpawnPoolWorker-2] enter task
[2018-11-27 13:59:30,835: INFO/SpawnPoolWorker-2] Task tasks.add[745e5be7-4675-4f84-9d57-3f5e91c33a19] succeeded in 3.0s: 20

說(shuō)明我們的demo已經(jīng)成功了。

使用配置文件
在上面的demo中,是將broker和backend直接寫在代碼中的,而 Celery 還有其他配置,最好是寫出配置文件的形式,基本配置項(xiàng)有:

  • CELERY_DEFAULT_QUEUE:默認(rèn)隊(duì)列
  • BROKER_URL  : 代理人的網(wǎng)址
  • CELERY_RESULT_BACKEND:結(jié)果存儲(chǔ)地址
  • CELERY_TASK_SERIALIZER:任務(wù)序列化方式
  • CELERY_RESULT_SERIALIZER:任務(wù)執(zhí)行結(jié)果序列化方式
  • CELERY_TASK_RESULT_EXPIRES:任務(wù)過(guò)期時(shí)間
  • CELERY_ACCEPT_CONTENT:指定任務(wù)接受的內(nèi)容序列化類型(序列化),一個(gè)列表;

整理一下目錄結(jié)構(gòu),將我們的任務(wù)封裝成包:

詳解分布式任務(wù)隊(duì)列Celery使用說(shuō)明

內(nèi)容如下:

# __init__.py
import os
from celery import Celery

os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
app = Celery('demo')

# 通過(guò) Celery 實(shí)例加載配置模塊
app.config_from_object('celery_app.celery_config')

# celery_config.py
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'

# UTC
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Asia/Shanghai'

# 導(dǎo)入指定的任務(wù)模塊
CELERY_IMPORTS = (
  'celery_app.task1',
  'celery_app.task2',
)

# task1.py
import time
from celery_app import app

@app.task
def add(x, y):
  print('enter task')
  time.sleep(3)
  return x + y

# task2.py
import time
from celery_app import app

@app.task
def mul(x, y):
  print('enter task')
  time.sleep(4)
  return x * y

# app.py
from celery_app import task1

if __name__ == '__main__':
  pass
  print('start task')
  result = task1.add.delay(2, 18)
  print('end task')
  print(result)

提交任務(wù)與啟動(dòng)worker:

$ python app.py
$ celery worker -A celery_app -l info

result = task1.add.delay(2, 18) 返回的是一個(gè)任務(wù)對(duì)象,通過(guò) delay 函數(shù)的方式可以發(fā)現(xiàn)這個(gè)過(guò)程是非阻塞的,這個(gè)任務(wù)對(duì)象有一個(gè)方法:

r.ready()   # 查看任務(wù)狀態(tài),返回布爾值, 任務(wù)執(zhí)行完成, 返回 True, 否則返回 False.
r.wait()   # 等待任務(wù)完成, 返回任務(wù)執(zhí)行結(jié)果,很少使用;
r.get(timeout=1)    # 獲取任務(wù)執(zhí)行結(jié)果,可以設(shè)置等待時(shí)間
r.result   # 任務(wù)執(zhí)行結(jié)果.
r.state    # PENDING, START, SUCCESS,任務(wù)當(dāng)前的狀態(tài)
r.status   # PENDING, START, SUCCESS,任務(wù)當(dāng)前的狀態(tài)
r.successful # 任務(wù)成功返回true
r.traceback # 如果任務(wù)拋出了一個(gè)異常,你也可以獲取原始的回溯信息

定時(shí)任務(wù)

定時(shí)任務(wù)的功能類似 crontab,可以完成每日統(tǒng)計(jì)任務(wù)等。首先我們需要配置一下 schedule,通過(guò)改造上面的配置文件,添加 CELERYBEAT_SCHEDULE 配置:

import datetime
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
  'task1-every-1-min': {
    'task': 'celery_app.task1.add',
    'schedule': datetime.timedelta(seconds=60),
    'args': (2, 15),
  },
  'task2-once-a-day': {
    'task': 'celery_app.task2.mul',
    'schedule': crontab(hour=15, minute=23),
    'args': (3, 6),
  }
}

task 指定要執(zhí)行的任務(wù);schedule 表示計(jì)劃的時(shí)間,datetime.timedelta(seconds=60) 表示間隔一分鐘,這里其實(shí)也可以是 crontab(minute='*/1') 來(lái)替換;args 表示要傳遞的參數(shù)。

啟動(dòng) celery beat:

$ celery worker -A celery_app -l info

詳解分布式任務(wù)隊(duì)列Celery使用說(shuō)明

我們目前是用兩個(gè)窗口來(lái)執(zhí)行 woker 和 beat 。當(dāng)然也可以只使用一個(gè)窗口來(lái)運(yùn)行(僅限linux系統(tǒng)):

$ celery -B -A celery_app worker -l info

celery.task 裝飾器

@celery.task()
def name():
  pass

task() 方法將任務(wù)修飾成異步, name 可以顯示指定的任務(wù)名字;serializer 指定序列化的方式;bind 一個(gè)bool值,若為True,則task實(shí)例會(huì)作為第一個(gè)參數(shù)傳遞到任務(wù)方法中,可以訪問(wèn)task實(shí)例的所有的屬性,即前面反序列化中那些屬性。

@task(bind=True) # 第一個(gè)參數(shù)是self,使用self.request訪問(wèn)相關(guān)的屬性
def add(self, x, y):
  logger.info(self.request.id)

base 可以指定任務(wù)積累,可以用來(lái)定義回調(diào)函數(shù):

import celery

class MyTask(celery.Task):
  # 任務(wù)失敗時(shí)執(zhí)行
  def on_failure(self, exc, task_id, args, kwargs, einfo):
    print('{0!r} failed: {1!r}'.format(task_id, exc))
  # 任務(wù)成功時(shí)執(zhí)行
  def on_success(self, retval, task_id, args, kwargs):
    pass
  # 任務(wù)重試時(shí)執(zhí)行
  def on_retry(self, exc, task_id, args, kwargs, einfo):
    pass

@task(base=MyTask)
def add(x, y):
  raise KeyError()

exc:失敗時(shí)的錯(cuò)誤的類型;
task_id:任務(wù)的id;
args:任務(wù)函數(shù)的參數(shù);
kwargs:參數(shù);
einfo:失敗時(shí)的異常詳細(xì)信息;
retval:任務(wù)成功執(zhí)行的返回值;

總結(jié)

網(wǎng)上找了一份比較常用的配置文件,需要的時(shí)候可以參考下:

# 注意,celery4版本后,CELERY_BROKER_URL改為BROKER_URL
BROKER_URL = 'amqp://username:passwd@host:port/虛擬主機(jī)名'
# 指定結(jié)果的接受地址
CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db'
# 指定任務(wù)序列化方式
CELERY_TASK_SERIALIZER = 'msgpack' 
# 指定結(jié)果序列化方式
CELERY_RESULT_SERIALIZER = 'msgpack'
# 任務(wù)過(guò)期時(shí)間,celery任務(wù)執(zhí)行結(jié)果的超時(shí)時(shí)間
CELERY_TASK_RESULT_EXPIRES = 60 * 20  
# 指定任務(wù)接受的序列化類型.
CELERY_ACCEPT_CONTENT = ["msgpack"]  
# 任務(wù)發(fā)送完成是否需要確認(rèn),這一項(xiàng)對(duì)性能有一點(diǎn)影響   
CELERY_ACKS_LATE = True 
# 壓縮方案選擇,可以是zlib, bzip2,默認(rèn)是發(fā)送沒(méi)有壓縮的數(shù)據(jù)
CELERY_MESSAGE_COMPRESSION = 'zlib' 
# 規(guī)定完成任務(wù)的時(shí)間
CELERYD_TASK_TIME_LIMIT = 5 # 在5s內(nèi)完成任務(wù),否則執(zhí)行該任務(wù)的worker將被殺死,任務(wù)移交給父進(jìn)程
# celery worker的并發(fā)數(shù),默認(rèn)是服務(wù)器的內(nèi)核數(shù)目,也是命令行-c參數(shù)指定的數(shù)目
CELERYD_CONCURRENCY = 4 
# celery worker 每次去rabbitmq預(yù)取任務(wù)的數(shù)量
CELERYD_PREFETCH_MULTIPLIER = 4 
# 每個(gè)worker執(zhí)行了多少任務(wù)就會(huì)死掉,默認(rèn)是無(wú)限的
CELERYD_MAX_TASKS_PER_CHILD = 40 
# 這是使用了django-celery默認(rèn)的數(shù)據(jù)庫(kù)調(diào)度模型,任務(wù)執(zhí)行周期都被存在你指定的orm數(shù)據(jù)庫(kù)中
# CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
# 設(shè)置默認(rèn)的隊(duì)列名稱,如果一個(gè)消息不符合其他的隊(duì)列就會(huì)放在默認(rèn)隊(duì)列里面,如果什么都不設(shè)置的話,數(shù)據(jù)都會(huì)發(fā)送到默認(rèn)的隊(duì)列中
CELERY_DEFAULT_QUEUE = "default"
# 設(shè)置詳細(xì)的隊(duì)列
CELERY_QUEUES = {
  "default": { # 這是上面指定的默認(rèn)隊(duì)列
    "exchange": "default",
    "exchange_type": "direct",
    "routing_key": "default"
  },
  "topicqueue": { # 這是一個(gè)topic隊(duì)列 凡是topictest開頭的routing key都會(huì)被放到這個(gè)隊(duì)列
    "routing_key": "topic.#",
    "exchange": "topic_exchange",
    "exchange_type": "topic",
  },
  "task_eeg": { # 設(shè)置扇形交換機(jī)
    "exchange": "tasks",
    "exchange_type": "fanout",
    "binding_key": "tasks",
  },

}

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI