溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何分析Python并行分布式框架中的Celery

發(fā)布時(shí)間:2021-12-02 17:30:50 來源:億速云 閱讀:138 作者:柒染 欄目:云計(jì)算

如何分析Python并行分布式框架中的Celery,相信很多沒有經(jīng)驗(yàn)的人對(duì)此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。

Celery (芹菜)是基于Python開發(fā)的分布式任務(wù)隊(duì)列。它支持使用任務(wù)隊(duì)列的方式在分布的機(jī)器/進(jìn)程/線程上執(zhí)行任務(wù)調(diào)度。

架構(gòu)設(shè)計(jì)

Celery的架構(gòu)由三部分組成,消息中間件(message broker),任務(wù)執(zhí)行單元(worker)和任務(wù)執(zhí)行結(jié)果存儲(chǔ)(task result store)組成。

  • 消息中間件

    Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, RedisMongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ

  • 任務(wù)執(zhí)行單元

    Worker是Celery提供的任務(wù)執(zhí)行的單元,worker并發(fā)的運(yùn)行在分布式的系統(tǒng)節(jié)點(diǎn)中。

  • 任務(wù)結(jié)果存儲(chǔ)

    Task result store用來存儲(chǔ)Worker執(zhí)行的任務(wù)的結(jié)果,Celery支持以不同方式存儲(chǔ)任務(wù)的結(jié)果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache

另外, Celery還支持不同的并發(fā)和序列化的手段

  • 并發(fā)

    Prefork, Eventlet, gevent, threads/single threaded

  • 序列化

    picklejsonyamlmsgpackzlibbzip2 compression, Cryptographic message signing 等等

安裝和運(yùn)行

Celery的安裝過程略為復(fù)雜,下面的安裝過程是基于我的AWS EC2的Linux版本的安裝過程,不同的系統(tǒng)安裝過程可能會(huì)有差異。大家可以參考官方文檔。

首先我選擇RabbitMQ作為消息中間件,所以要先安裝RabbitMQ。作為安裝準(zhǔn)備,先更新YUM。

sudo yum -y update

RabbitMQ是基于erlang的,所以先安裝erlang

# Add and enable relevant application repositories:
# Note: We are also enabling third party remi package repositories.
wget http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
wget http://rpms.famillecollet.com/enterprise/remi-release-6.rpm
sudo rpm -Uvh remi-release-6*.rpm epel-release-6*.rpm

# Finally, download and install Erlang:
yum install -y erlang

然后安裝RabbitMQ

# Download the latest RabbitMQ package using wget:
wget  
# Add the necessary keys for verification:
rpm --import  
# Install the .RPM package using YUM:
yum install rabbitmq-server-3.2.2-1.noarch.rpm

啟動(dòng)RabbitMQ服務(wù)

rabbitmq-server start

RabbitMQ服務(wù)已經(jīng)準(zhǔn)備好了,然后安裝Celery, 假定你使用pip來管理你的python安裝包

pip install Celery

為了測(cè)試Celery是否工作,我們運(yùn)行一個(gè)最簡(jiǎn)單的任務(wù),編寫tasks.py

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

@app.task
def add(x, y):
    return x + y

在當(dāng)前目錄運(yùn)行一個(gè)worker,用來執(zhí)行這個(gè)加法的task

celery -A tasks worker --loglevel=info

其中-A參數(shù)表示的是Celery App的名字。注意這里我使用的是SQLAlchemy作為結(jié)果存儲(chǔ)。對(duì)應(yīng)的python包要事先安裝好。

worker日志中我們會(huì)看到這樣的信息

- ** ---------- [config]
- ** ---------- .> app:         tasks:0x1e68d50
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     db+sqlite:///results.sqlite
- *** --- * --- .> concurrency: 8 (prefork)

其中,我們可以看到worker缺省使用prefork來執(zhí)行并發(fā),并設(shè)置并發(fā)數(shù)為8

下面的任務(wù)執(zhí)行的客戶端代碼:

from tasks import add
import time
result = add.delay(4,4)

while not result.ready():
  print "not ready yet"
  time.sleep(5)

print result.get()

用python執(zhí)行這段客戶端代碼,在客戶端,結(jié)果如下

not ready   
8

Work日志顯示

[2015-03-12 02:54:07,973: INFO/MainProcess] Received task: tasks.add[34c4210f-1bc5-420f-a421-1500361b914f]
[2015-03-12 02:54:08,006: INFO/MainProcess] Task tasks.add[34c4210f-1bc5-420f-a421-1500361b914f] succeeded in 0.0309705100954s: 8

這里我們可以發(fā)現(xiàn),每一個(gè)task有一個(gè)唯一的ID,task異步執(zhí)行在worker上。

這里要注意的是,如果你運(yùn)行官方文檔中的例子,你是無法在客戶端得到結(jié)果的,這也是我為什么要使用SQLAlchemy來存儲(chǔ)任務(wù)執(zhí)行結(jié)果的原因。官方的例子使用AMPQ,有可能Worker在打印日志的時(shí)候取出了task的運(yùn)行結(jié)果顯示在worker日志中,然而AMPQ作為一個(gè)消息隊(duì)列,當(dāng)消息被取走后,隊(duì)列中就沒有了,于是客戶端總是無法得到任務(wù)的執(zhí)行結(jié)果。不知道為什么官方文檔對(duì)這樣的錯(cuò)誤視而不見。

看完上述內(nèi)容,你們掌握如何分析Python并行分布式框架中的Celery的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI