溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Python生產(chǎn)者與消費(fèi)者模型中的優(yōu)勢(shì)有哪些

發(fā)布時(shí)間:2023-03-25 13:58:45 來(lái)源:億速云 閱讀:87 作者:iii 欄目:開(kāi)發(fā)技術(shù)

這篇文章主要介紹“Python生產(chǎn)者與消費(fèi)者模型中的優(yōu)勢(shì)有哪些”的相關(guān)知識(shí),小編通過(guò)實(shí)際案例向大家展示操作過(guò)程,操作方法簡(jiǎn)單快捷,實(shí)用性強(qiáng),希望這篇“Python生產(chǎn)者與消費(fèi)者模型中的優(yōu)勢(shì)有哪些”文章能幫助大家解決問(wèn)題。

生產(chǎn)者消費(fèi)者模型具體來(lái)講,就是在一個(gè)系統(tǒng)中,存在生產(chǎn)者和消費(fèi)者兩種角色,他們通過(guò)內(nèi)存緩沖區(qū)進(jìn)行通信,生產(chǎn)者生產(chǎn)消費(fèi)者需要的資料,消費(fèi)者把資料做成產(chǎn)品,從而消耗掉生產(chǎn)的數(shù)據(jù)。達(dá)到供需平衡,不能生產(chǎn)多了浪費(fèi),也不能需要消耗資源的時(shí)候沒(méi)有。

multiprocessing-Queue實(shí)現(xiàn)

from  multiprocessing import Process,Queue  #多進(jìn)程組件,隊(duì)列
import time,random  
#生產(chǎn)者方法
def producer(name,food,q):
    for i in  range(4):
        time.sleep(random.randint(1,3)) #模擬獲取數(shù)據(jù)時(shí)間
        f = '%s生產(chǎn)的%s%s'%(name,food,i)
        print(f)
        q.put(f) #添加進(jìn)隊(duì)列
#消費(fèi)者方法
def consumer(q,name):
    while True:
        food = q.get() #如果獲取不到,會(huì)一直阻塞進(jìn)程不會(huì)結(jié)束子進(jìn)程
        # 當(dāng)隊(duì)列中的數(shù)據(jù)是None的時(shí)候結(jié)束while循環(huán)
        if food is None:
            print('%s獲取到一個(gè)空'%name)
            break
        f = '\033[31m%s消費(fèi)了%s\033[0m' % (name, food)
        print(f)
        time.sleep(random.randint(1,3)) # 模擬消耗數(shù)據(jù)時(shí)間
if __name__ == '__main__':
    q = Queue()  # 創(chuàng)建隊(duì)列
    # 模擬生產(chǎn)者 生產(chǎn)數(shù)據(jù)
    p = Process(target=producer, args=('p', '包子', q)) #創(chuàng)建進(jìn)程
    p.start() #啟動(dòng)進(jìn)程
    p1 = Process(target=producer, args=('p1', '燒餅', q))
    p1.start()
   #模擬消費(fèi)者消費(fèi)數(shù)據(jù)
    c = Process(target=consumer, args=(q, 'c'))
    c.start()
    c1 = Process(target=consumer, args=(q, 'c1'))
    c1.start()
    p.join()#阻塞主進(jìn)程 直到p和p1 子進(jìn)程結(jié)束后才執(zhí)行q.put() 方法
    p1.join()#阻塞主進(jìn)程 直到p和p1 子進(jìn)程結(jié)束后才執(zhí)行q.put() 方法
    #為了確保生產(chǎn)者生產(chǎn)完所有數(shù)據(jù)后,
    #最后一個(gè)是None,方便結(jié)束子進(jìn)程中的while循環(huán),
    #否則會(huì)一直等待隊(duì)列中加入新數(shù)據(jù)。
    q.put(None)
    q.put(None)

使用Queue組件實(shí)現(xiàn)的缺點(diǎn)就是,實(shí)現(xiàn)了多少個(gè)消費(fèi)者consumer進(jìn)程,就需要在最后往隊(duì)列中添加多少個(gè)None標(biāo)識(shí),方便生產(chǎn)完畢結(jié)束消費(fèi)者consumer進(jìn)程。否則,p.get() 不到任務(wù)會(huì)阻塞子進(jìn)程,因?yàn)?code>while循環(huán),直到隊(duì)列q中有新的任務(wù)加進(jìn)來(lái),才會(huì)再次執(zhí)行。而我們的生產(chǎn)者只能生產(chǎn)這么多東西,所以相當(dāng)于程序卡死。

multiprocessing-JoinableQueue實(shí)現(xiàn)

from multiprocessing import JoinableQueue,Process
import time,random
#生產(chǎn)者方法
def producer(name,food,q):
    for i in range(4):
        time.sleep(random.randint(1, 2))
        f = '%s生產(chǎn)的%s%s'%(name,food,i)
        q.put(f)
        print(f)
    q.join()  #一直阻塞,等待消耗完所有的數(shù)據(jù)后才釋放
#消費(fèi)者方法
def consumer(name,q):
    while True:
        food = q.get()
        print('\033[31m%s消費(fèi)了%s\033[0m' % (name, food))
        time.sleep(random.randint(4,8))
        q.task_done() #每次消耗減1
if __name__ == '__main__':
    q = JoinableQueue()  #創(chuàng)建隊(duì)列
    #模擬生產(chǎn)者隊(duì)列
    p1 = Process(target=producer,args=('p1','包子',q))
    p1.start()
    p2 = Process(target=producer,args=('p2','燒餅',q))
    p2.start()
    #模擬消費(fèi)者隊(duì)列
    c1 = Process(target=consumer,args=('c1',q))
    c1.daemon = True #守護(hù)進(jìn)程:主進(jìn)程結(jié)束,子進(jìn)程也會(huì)結(jié)束
    c1.start()
    c2 = Process(target=consumer,args=('c2',q))
    c2.daemon = True
    c2.start()
    p1.join() #阻塞主進(jìn)程,等到p1子進(jìn)程結(jié)束才往下執(zhí)行
    p2.join()
    # q.task_done() 每次消耗隊(duì)列中的 任務(wù)數(shù)減1
    # q.join() 一直阻塞,等待隊(duì)列中的任務(wù)數(shù)消耗完才釋放
    # 因?yàn)橛?nbsp;q.join 所有一直會(huì)等待 c1,c2 消耗完畢。才會(huì)執(zhí)行 p.join 后面的代碼
    # 因?yàn)?nbsp;c1 c2 是守護(hù)進(jìn)程,所以到這一步主進(jìn)程代碼執(zhí)行完畢,主進(jìn)程會(huì)釋放死掉,
    # 所以 c1 c2 也會(huì)跟隨 主進(jìn)程釋放死掉。

使用JoinableQueue組件,是因?yàn)?code>JoinableQueue中有兩個(gè)方法:task_done()join() 。首先說(shuō)join()Process中的join()的效果類(lèi)似,都是阻塞當(dāng)前進(jìn)程,防止當(dāng)前進(jìn)程結(jié)束。但是JoinableQueuejoin()是和task_down()配合使用的。
Process中的join()是等到子進(jìn)程中的代碼執(zhí)行完畢,就會(huì)執(zhí)行主進(jìn)程join()下面的代碼。而JoinableQueue中的join()是等到隊(duì)列中的任務(wù)數(shù)量為0的時(shí)候才會(huì)執(zhí)行q.join()下面的代碼,否則會(huì)一直阻塞。
task_down()方法是每獲取一次隊(duì)列中的任務(wù),就需要執(zhí)行一次。直到隊(duì)列中的任務(wù)數(shù)為0的時(shí)候,就會(huì)執(zhí)行JoinableQueuejoin()后面的方法了。所以生產(chǎn)者生產(chǎn)完所有的數(shù)據(jù)后,會(huì)一直阻塞著。不讓p1p2進(jìn)程結(jié)束。等到消費(fèi)者get()一次數(shù)據(jù),就會(huì)執(zhí)行一次task_down()方法,從而隊(duì)列中的任務(wù)數(shù)量減1,當(dāng)數(shù)量為0后,執(zhí)行JoinableQueuejoin()后面代碼,從而p1p2進(jìn)程結(jié)束。
因?yàn)?code>p1和p2添加了join()方法,所以當(dāng)子進(jìn)程中的consumer方法執(zhí)行完后,才會(huì)往下執(zhí)行。從而主進(jìn)程結(jié)束。因?yàn)檫@里把消費(fèi)者進(jìn)程c1c2 設(shè)置成了守護(hù)進(jìn)程,主進(jìn)程結(jié)束的同時(shí),c1c2 進(jìn)程也會(huì)隨之結(jié)束,進(jìn)程都結(jié)束了。所以消費(fèi)者consumer方法也會(huì)結(jié)束。

關(guān)于“Python生產(chǎn)者與消費(fèi)者模型中的優(yōu)勢(shì)有哪些”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí),可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會(huì)為大家更新不同的知識(shí)點(diǎn)。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI