溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

python多進(jìn)程中的生產(chǎn)者和消費(fèi)者模型怎么實(shí)現(xiàn)

發(fā)布時(shí)間:2023-03-25 13:57:47 來(lái)源:億速云 閱讀:120 作者:iii 欄目:開(kāi)發(fā)技術(shù)

這篇文章主要介紹了python多進(jìn)程中的生產(chǎn)者和消費(fèi)者模型怎么實(shí)現(xiàn)的相關(guān)知識(shí),內(nèi)容詳細(xì)易懂,操作簡(jiǎn)單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇python多進(jìn)程中的生產(chǎn)者和消費(fèi)者模型怎么實(shí)現(xiàn)文章都會(huì)有所收獲,下面我們一起來(lái)看看吧。

Python生產(chǎn)者消費(fèi)者模型

一、消費(fèi)模式

生產(chǎn)者消費(fèi)者模式 是Controlnet網(wǎng)絡(luò)中特有的一種傳輸數(shù)據(jù)的模式。用于兩個(gè)CPU之間傳輸數(shù)據(jù),即使是不同類(lèi)型同一廠家的CPU也可以通過(guò)設(shè)置來(lái)使用。

二、傳輸原理

  • 類(lèi)似與點(diǎn)對(duì)點(diǎn)傳送,又略有不同,一個(gè)生產(chǎn)者可以對(duì)應(yīng)N個(gè)消費(fèi)者,但是一個(gè)消費(fèi)者只能對(duì)應(yīng)一個(gè)生產(chǎn)者;

  • 每個(gè)生產(chǎn)者消費(fèi)者對(duì)應(yīng)一個(gè)地址,占一個(gè)網(wǎng)絡(luò)節(jié)點(diǎn),屬于預(yù)定性數(shù)據(jù),在網(wǎng)絡(luò)中優(yōu)先級(jí)最高;

  • 此模式如果在網(wǎng)絡(luò)中設(shè)置過(guò)多會(huì)影響網(wǎng)絡(luò)傳輸速度,一般用在傳輸比較重要的信息上,比如設(shè)備的啟動(dòng)、停止、故障、急停等等;

  • 在Controlnet網(wǎng)絡(luò)中節(jié)點(diǎn)數(shù)是有限制的,最高節(jié)點(diǎn)數(shù)為99。

  • 如果兩個(gè)控制器之前建立了多個(gè)生產(chǎn)者消費(fèi)者的連接,只要一個(gè)失敗,則所有的均失敗,將數(shù)據(jù)整合到用戶(hù)自定義結(jié)構(gòu)或數(shù)組中 ,兩個(gè)控制器中只保留一個(gè)連接。

  • 生產(chǎn)者消費(fèi)者信息可以通過(guò)以太網(wǎng)和Controlnet傳輸,但是同時(shí)只能通過(guò)一種途徑傳輸;

  • 建立標(biāo)簽時(shí)必須建立在全局變量里面,不能建立在局部變量里標(biāo)簽的大小不能超過(guò)500B;

  • 如果生產(chǎn)者幾個(gè)數(shù)據(jù)傳輸?shù)降酵粋€(gè)控制器的的幾個(gè)消費(fèi)者中,將幾個(gè)數(shù)據(jù)合并在一個(gè)用戶(hù)自定義標(biāo)簽中,可以減少連接數(shù),但合并后的數(shù)據(jù)將會(huì)會(huì)用相同的RPI。

  • 生產(chǎn)者消費(fèi)者標(biāo)簽只能用DINT和REAL,或它們的數(shù)組,或用戶(hù)自定義結(jié)構(gòu)數(shù)據(jù),因?yàn)閷?duì)外操作數(shù)據(jù)必須是32位的,如果有SINT和INT的數(shù)據(jù)要傳輸,必須將它們組合在用戶(hù)自定義結(jié)構(gòu)中傳送,生產(chǎn)者和消費(fèi)者的標(biāo)簽數(shù)據(jù)格式必須一致,才能確保數(shù)據(jù)的準(zhǔn)確性,如果數(shù)據(jù)打包后超過(guò)了 32位,那么生產(chǎn)者和消費(fèi)者雙方必須使用一個(gè)復(fù)制緩沖指令,以獲得數(shù)據(jù)的同步,例如Control Logix中的CPS指令。

  • 如果生產(chǎn)者要發(fā)送的32位數(shù)據(jù),與非Control Logix的對(duì)方設(shè)備的數(shù)據(jù)結(jié)構(gòu)不匹配,例如對(duì)方是16位的數(shù)據(jù),為避免偏差,改為用戶(hù)自定義結(jié)構(gòu)。

  • 消費(fèi)者的 RPI必須大于等于網(wǎng)絡(luò)刷新時(shí)間NUT,如果幾個(gè)消費(fèi)者請(qǐng)求同一個(gè)生產(chǎn)者,則會(huì)以最小最快的RPI為準(zhǔn)。

python多進(jìn)程中的生產(chǎn)者和消費(fèi)者模型怎么實(shí)現(xiàn)

三、實(shí)現(xiàn)方式

方法一:

import threading,queue,time
# 創(chuàng)建一個(gè)隊(duì)列,隊(duì)列最大長(zhǎng)度為2
q = queue.Queue(maxsize=2)
def product():
    while True:
        # 生產(chǎn)者往隊(duì)列塞數(shù)據(jù)
        q.put('money')
        print('生產(chǎn)了money, 生產(chǎn)時(shí)間:', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
def consume():
    while True:
        time.sleep(0.5)
        # 消費(fèi)者取出數(shù)據(jù)
        data = q.get()
        print('消費(fèi)了%s, 消費(fèi)時(shí)間%s' % (data, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())))
t = threading.Thread(target=product)
t1 = threading.Thread(target=consume)
t.start()
t1.start()

缺點(diǎn):

實(shí)現(xiàn)了多少個(gè)消費(fèi)者consumer進(jìn)程,就需要在最后往隊(duì)列中添加多少個(gè)None標(biāo)識(shí),方便生產(chǎn)完畢結(jié)束消費(fèi)者consumer進(jìn)程。否則,p.get() 不到任務(wù)會(huì)阻塞子進(jìn)程,因?yàn)閣hile循環(huán),直到隊(duì)列q中有新的任務(wù)加進(jìn)來(lái),才會(huì)再次執(zhí)行。而我們的生產(chǎn)者只能生產(chǎn)這么多東西,所以相當(dāng)于程序卡死。

方法二:

from multiprocessing import JoinableQueue,Process
import time
def producer(q):
    for i in range(4):
        time.sleep(0.5)
        f = '生產(chǎn)者:已經(jīng)生產(chǎn)'
        q.put(f)
        print(f)
    q.join()  # 一直阻塞,等待消耗完所有的數(shù)據(jù)后才釋放
def consumer(name, q):
    while True:
        food = q.get()
        print('\033[消費(fèi)者:消費(fèi)了%s\033' % name)
        time.sleep(0.5)
        q.task_done()  # 每次消耗減1
if __name__ == '__main__':
    q = JoinableQueue()  # 創(chuàng)建隊(duì)列
    # 模擬生產(chǎn)者隊(duì)列
    p1 = Process(target=producer, args=(q, ))
    p1.start()
    # 模擬消費(fèi)者隊(duì)列
    c1 = Process(target=consumer, args=('money', q))
    c1.daemon = True  # 守護(hù)進(jìn)程:主進(jìn)程結(jié)束,子進(jìn)程也會(huì)結(jié)束
    c1.start()
    p1.join()  # 阻塞主進(jìn)程,等到p1子進(jìn)程結(jié)束才往下執(zhí)行

優(yōu)點(diǎn):

  • 使用JoinableQueue組件,是因?yàn)镴oinableQueue中有兩個(gè)方法:task_done()和join() 。首先說(shuō)join()和Process中的join()的效果類(lèi)似,都是阻塞當(dāng)前進(jìn)程,防止當(dāng)前進(jìn)程結(jié)束。但是JoinableQueue的join()是和task_down()配合使用的。

  • Process中的join()是等到子進(jìn)程中的代碼執(zhí)行完畢,就會(huì)執(zhí)行主進(jìn)程join()下面的代碼。而JoinableQueue中的join()是等到隊(duì)列中的任務(wù)數(shù)量為0的時(shí)候才會(huì)執(zhí)行q.join()下面的代碼,否則會(huì)一直阻塞。

  • task_down()方法是每獲取一次隊(duì)列中的任務(wù),就需要執(zhí)行一次。直到隊(duì)列中的任務(wù)數(shù)為0的時(shí)候,就會(huì)執(zhí)行JoinableQueue的join()后面的方法了。所以生產(chǎn)者生產(chǎn)完所有的數(shù)據(jù)后,會(huì)一直阻塞著。不讓p1和p2進(jìn)程結(jié)束。等到消費(fèi)者get()一次數(shù)據(jù),就會(huì)執(zhí)行一次task_down()方法,從而隊(duì)列中的任務(wù)數(shù)量減1,當(dāng)數(shù)量為0后,執(zhí)行JoinableQueue的join()后面代碼,從而p1和p2進(jìn)程結(jié)束。

  • 因?yàn)閜1和p2添加了join()方法,所以當(dāng)子進(jìn)程中的consumer方法執(zhí)行完后,才會(huì)往下執(zhí)行。從而主進(jìn)程結(jié)束。因?yàn)檫@里把消費(fèi)者進(jìn)程c1和c2 設(shè)置成了守護(hù)進(jìn)程,主進(jìn)程結(jié)束的同時(shí),c1和c2 進(jìn)程也會(huì)隨之結(jié)束,進(jìn)程都結(jié)束了。所以消費(fèi)者consumer方法也會(huì)結(jié)束。

關(guān)于“python多進(jìn)程中的生產(chǎn)者和消費(fèi)者模型怎么實(shí)現(xiàn)”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對(duì)“python多進(jìn)程中的生產(chǎn)者和消費(fèi)者模型怎么實(shí)現(xiàn)”知識(shí)都有一定的了解,大家如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI