queue 二、隊(duì)列(Queue) ..."/>
您好,登錄后才能下訂單哦!
一、進(jìn)程間通信
IPC(Inter-Process Communication)
IPC機(jī)制:實(shí)現(xiàn)進(jìn)程之間通訊
管道:pipe 基于共享的內(nèi)存空間
隊(duì)列:pipe+鎖的概念--->queue
二、隊(duì)列(Queue)
2.1 概念-----multiProcess.Queue
創(chuàng)建共享的進(jìn)程隊(duì)列,Queue是多進(jìn)程安全的隊(duì)列,可以使用Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。
Queue([maxsize])
創(chuàng)建共享的進(jìn)程隊(duì)列。
參數(shù) :maxsize是隊(duì)列中允許的最大項(xiàng)數(shù)。如果省略此參數(shù),則無(wú)大小限制。
底層隊(duì)列使用管道和鎖定實(shí)現(xiàn)。
2.2 Queue方法使用
2.2.1 q.get的使用:
是從隊(duì)列里面取值并且把隊(duì)列面的取出來的值刪掉,沒有參數(shù)的情況下就是是默認(rèn)一直等著取值
就算是隊(duì)列里面沒有可取的值的時(shí)候,程序也不會(huì)結(jié)束,就會(huì)卡在哪里,一直等著
from multiprocessing import Queue q = Queue() # 生成一個(gè)隊(duì)列對(duì)象 # put方法是往隊(duì)列里面放值 q.put('Cecilia陳') q.put('xuchen') q.put('喜陳') # get方法是從隊(duì)列里面取值 print(q.get()) print(q.get()) print(q.get()) q.put(5) q.put(6) print(q.get())
Cecilia陳
xuchen
喜陳
5
2.2.2 Queue(參數(shù)) +參數(shù)的使用:
Queue加參數(shù)以后,參數(shù)是數(shù)值
參數(shù)實(shí)幾就表示實(shí)例化的這個(gè)Queue隊(duì)列可以放幾個(gè)值
當(dāng)隊(duì)列已經(jīng)滿的時(shí)候,再放值,程序會(huì)阻塞,但不會(huì)結(jié)束
from multiprocessing import Queue q = Queue(3) q.put('Cecilia陳') q.put('xuchen') q.put('喜陳') print(q.full()) # 判斷隊(duì)列是否滿了 返回的是True/False q.put(2) # 當(dāng)隊(duì)列已經(jīng)滿的時(shí)候,再放值,程序會(huì)阻塞,但不會(huì)結(jié)束
True 隊(duì)列已經(jīng)滿了
2.2.3 q.put(參數(shù)1,參數(shù)2,參數(shù)3,參數(shù)4):
q.put(self, obj, block=True, timeout=None)
self :put就相當(dāng)于是Queue里的一個(gè)方法,這個(gè)時(shí)候q.put就相當(dāng)于是隊(duì)列對(duì)象q來調(diào)用對(duì)象的綁定方法,這個(gè)參數(shù)可以省略即可
obj:是我們需要往隊(duì)列里面放的值
block=True :隊(duì)列如果滿了的話,再往隊(duì)列里放值的話會(huì)等待,程序不會(huì)結(jié)束
timeout=None:是再block這個(gè)參數(shù)的基礎(chǔ)上的,當(dāng)block的值為真的時(shí)候,timeout是用來等待多少秒,如果再這個(gè)時(shí)間里,隊(duì)列一直是滿的,那么程序就會(huì)報(bào)錯(cuò)并結(jié)束(Queue.Full異常)
from multiprocessing import Queue q = Queue(3) q.put('zhao',block=True,timeout=2) q.put('zhao',block=True,timeout=2) q.put('zhao',block=True,timeout=2) q.put('zhao',block=True,timeout=5) # 此時(shí)程序?qū)?duì)等待5秒以后報(bào)錯(cuò)了
2.2.4 q.get(參數(shù)1,參數(shù)2,參數(shù)3,參數(shù)4):
q.get(self,block=True, timeout=None)
self :get就相當(dāng)于是Queue里的一個(gè)方法,這個(gè)時(shí)候q.get就相當(dāng)于是隊(duì)列對(duì)象q來調(diào)用對(duì)象的綁定方法,這個(gè)參數(shù)可以省略即可
block=True :從隊(duì)列q對(duì)象里面取值,如果娶不到值的話,程序不會(huì)結(jié)束
timeout=None:是再block這個(gè)參數(shù)的基礎(chǔ)上的,當(dāng)block的值為真的時(shí)候,timeout是用來等待多少秒,如果再這個(gè)時(shí)間里,get取不到隊(duì)列里面的值的話,那么程序就會(huì)報(bào)錯(cuò)并結(jié)束(queue.Empty異常)
from multiprocessing import Queue q = Queue() q.put('Cecilia陳') print(q.get()) q.get(block=True,timeout=2) # 此時(shí)程序會(huì)等待2秒后,報(bào)錯(cuò)了,隊(duì)列里面沒有值了
2.2.5 block=False:
如果block的值是False的話,那么put方法再隊(duì)列是滿的情況下,不會(huì)等待阻塞,程序直接報(bào)錯(cuò)(Queue.Full異常)結(jié)束
如果block的值是False的話,那么get方法再隊(duì)列里面沒有值的情況下,再去取的時(shí)候,不會(huì)等待阻塞,程序直接報(bào)錯(cuò)(queue.Empty異常)結(jié)束
1.put()的block=False
from multiprocessing import Queue q = Queue(2) q.put('Cecilia陳') q.put('喜陳') print(q.full()) q.put('xichen',block=False) # 隊(duì)列已經(jīng)滿了,我不等待了,直接報(bào)錯(cuò)
2.get()的block=Flase
from multiprocessing import Queue q = Queue(2) q.put('Cecilia陳') q.put('喜陳') print(q.get()) print(q.get()) print(q.get(block=False)) # 隊(duì)列已經(jīng)沒有值了,我不等待了,直接報(bào)錯(cuò)
2.2.6 put_nowait()/get_nowait()
1.put_nowait() 相當(dāng)于bolok=False,隊(duì)列滿的時(shí)候,再放值的時(shí)候,程序不等待,不阻塞,直接報(bào)錯(cuò)
from multiprocessing import Queue q = Queue(2) q.put('Cecilia陳') q.put('喜陳') print(q.full()) q.put_nowait('xichen') # 程序不等待,不阻塞,直接報(bào)錯(cuò)
2.get_nowait() 相當(dāng)于bolok=False,當(dāng)隊(duì)列里沒有值的時(shí)候,再取值的時(shí)候,程序不等待,不阻塞,程序直接報(bào)錯(cuò)
from multiprocessing import Queue q = Queue(2) q.put('Cecilia陳') q.put('喜陳') print(q.get()) print(q.get()) print(q.full()) q.get_nowait()# 再取值的時(shí)候,程序不等待,不阻塞,程序直接報(bào)錯(cuò)
三、代碼實(shí)例
3.1 單看隊(duì)列的存取數(shù)據(jù)用法
這個(gè)例子還沒有加入進(jìn)程通信,只是先來看看隊(duì)列為我們提供的方法,以及這些方法的使用和現(xiàn)象。
''' multiprocessing模塊支持進(jìn)程間通信的兩種主要形式:管道和隊(duì)列 都是基于消息傳遞實(shí)現(xiàn)的,但是隊(duì)列接口 ''' from multiprocessing import Queue q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) # q.put(3) # 如果隊(duì)列已經(jīng)滿了,程序就會(huì)停在這里,等待數(shù)據(jù)被別人取走,再將數(shù)據(jù)放入隊(duì)列。 # 如果隊(duì)列中的數(shù)據(jù)一直不被取走,程序就會(huì)永遠(yuǎn)停在這里。 try: q.put_nowait(3) # 可以使用put_nowait,如果隊(duì)列滿了不會(huì)阻塞,但是會(huì)因?yàn)殛?duì)列滿了而報(bào)錯(cuò)。 except: # 因此我們可以用一個(gè)try語(yǔ)句來處理這個(gè)錯(cuò)誤。這樣程序不會(huì)一直阻塞下去,但是會(huì)丟掉這個(gè)消息。 print('隊(duì)列已經(jīng)滿了') # 因此,我們?cè)俜湃霐?shù)據(jù)之前,可以先看一下隊(duì)列的狀態(tài),如果已經(jīng)滿了,就不繼續(xù)put了。 print(q.full()) #滿了 print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 同put方法一樣,如果隊(duì)列已經(jīng)空了,那么繼續(xù)取就會(huì)出現(xiàn)阻塞。 try: q.get_nowait(3) # 可以使用get_nowait,如果隊(duì)列滿了不會(huì)阻塞,但是會(huì)因?yàn)闆]取到值而報(bào)錯(cuò)。 except: # 因此我們可以用一個(gè)try語(yǔ)句來處理這個(gè)錯(cuò)誤。這樣程序不會(huì)一直阻塞下去。 print('隊(duì)列已經(jīng)空了') print(q.empty()) #空了
3.2 子進(jìn)程向父進(jìn)程發(fā)送數(shù)據(jù)
這是一個(gè)queue的簡(jiǎn)單應(yīng)用,使用隊(duì)列q對(duì)象調(diào)用get函數(shù)來取得隊(duì)列中最先進(jìn)入的數(shù)據(jù)。
from multiprocessing import Process, Queue def f(q,name,age): q.put(name,age) #調(diào)用主函數(shù)中p進(jìn)程傳遞過來的進(jìn)程參數(shù) put函數(shù)為向隊(duì)列中添加一條數(shù)據(jù)。 if __name__ == '__main__': q = Queue() #創(chuàng)建一個(gè)Queue對(duì)象 p = Process(target=f, args=(q,'Cecilia陳',18)) #創(chuàng)建一個(gè)進(jìn)程 p.start() print(q.get()) p.join()
['Cecilia陳', 18]
四、生產(chǎn)者消費(fèi)者模型
生產(chǎn)者: 生產(chǎn)數(shù)據(jù)的任務(wù)
消費(fèi)者: 處理數(shù)據(jù)的任務(wù)
生產(chǎn)者--隊(duì)列(盆)-->消費(fèi)者
生產(chǎn)者可以不停的生產(chǎn),達(dá)到了自己最大的生產(chǎn)效率,消費(fèi)者可以不停的消費(fèi),也達(dá)到了自己最大的消費(fèi)效率.
生產(chǎn)者消費(fèi)者模型大大提高了生產(chǎn)者生產(chǎn)的效率和消費(fèi)者消費(fèi)的效率.
補(bǔ)充: queue不適合傳大文件,通產(chǎn)傳一些消息.
在并發(fā)編程中使用生產(chǎn)者和消費(fèi)者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費(fèi)線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。
4.1 為什么要使用生產(chǎn)者和消費(fèi)者模型
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式。
4.2 什么是生產(chǎn)者消費(fèi)者模型
生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。
4.3 基于Queue隊(duì)列實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者模型
from multiprocessing import Queue,Process # 生產(chǎn)者 def producer(q,name,food): for i in range(3): print(f'{name}生產(chǎn)了{(lán)food}{i}') res = f'{food}{i}' q.put(res) # 消費(fèi)者 def consumer(q,name): while True: res = q.get(timeout=5) print(f'{name}吃了{(lán)res}') if __name__ == '__main__': q = Queue() # 為的是讓生產(chǎn)者和消費(fèi)者使用同一個(gè)隊(duì)列,使用同一個(gè)隊(duì)列進(jìn)行通訊 p1 = Process(target=producer,args=(q,'Cecilia陳','巧克力')) c1 = Process(target=consumer,args=(q,'Tom')) p1.start() c1.start()
此時(shí)的問題是主進(jìn)程永遠(yuǎn)不會(huì)結(jié)束,原因是:生產(chǎn)者p在生產(chǎn)完后就結(jié)束了,但是消費(fèi)者c在取空了q之后,則一直處于死循環(huán)中且卡在q.get()這一步。
解決方式無(wú)非是讓生產(chǎn)者在生產(chǎn)完畢后,往隊(duì)列中再發(fā)一個(gè)結(jié)束信號(hào),這樣消費(fèi)者在接收到結(jié)束信號(hào)后就可以break出死循環(huán)。
4.4 改良版----生產(chǎn)者消費(fèi)者模型
注意:結(jié)束信號(hào)None,不一定要由生產(chǎn)者發(fā),主進(jìn)程里同樣可以發(fā),但主進(jìn)程需要等生產(chǎn)者結(jié)束后才應(yīng)該發(fā)送該信號(hào)
from multiprocessing import Queue,Process def producer(q,name,food): for i in range(3): print(f'{name}生產(chǎn)了{(lán)food}{i}') res = f'{food}{i}' q.put(res) q.put(None) # 當(dāng)生產(chǎn)者結(jié)束生產(chǎn)的的時(shí)候,我們?cè)訇?duì)列的最后再做一個(gè)表示,告訴消費(fèi)者,生產(chǎn)者已經(jīng)不生產(chǎn)了,讓消費(fèi)者不要再去隊(duì)列里拿東西了 def consumer(q,name): while True: res = q.get(timeout=5) if res == None:break # 判斷隊(duì)列拿出的是不是生產(chǎn)者放的結(jié)束生產(chǎn)的標(biāo)識(shí),如果是則不取,直接退出,結(jié)束程序 print(f'{name}吃了{(lán)res}') if __name__ == '__main__': q = Queue() # 為的是讓生產(chǎn)者和消費(fèi)者使用同一個(gè)隊(duì)列,使用同一個(gè)隊(duì)列進(jìn)行通訊 p1 = Process(target=producer,args=(q,'Cecilia陳','巧克力')) c1 = Process(target=consumer,args=(q,'Tom')) p1.start() c1.start()
4.5 主進(jìn)程在生產(chǎn)者生產(chǎn)結(jié)束以后,發(fā)送結(jié)束信號(hào)
使用這個(gè)方法的話,是很low的,有幾個(gè)消費(fèi)者就要在主進(jìn)程中向隊(duì)列中put幾個(gè)結(jié)束信號(hào)
from multiprocessing import Queue,Process import time,random def producer(q,name,food): for i in range(3): print(f'{name}生產(chǎn)了{(lán)food}{i}') time.sleep((random.randint(1,3))) res = f'{food}{i}' q.put(res) # q.put(None) # 當(dāng)生產(chǎn)者結(jié)束生產(chǎn)的的時(shí)候,我們?cè)訇?duì)列的最后再做一個(gè)表示,告訴消費(fèi)者,生產(chǎn)者已經(jīng)不生產(chǎn)了,讓消費(fèi)者不要再去隊(duì)列里拿東西了 def consumer(q,name): while True: res = q.get(timeout=5) if res == None:break # 判斷隊(duì)列拿出的是不是生產(chǎn)者放的結(jié)束生產(chǎn)的標(biāo)識(shí),如果是則不取,直接退出,結(jié)束程序 time.sleep((random.randint(1, 3))) print(f'{name}吃了{(lán)res}') if __name__ == '__main__': q = Queue() # 為的是讓生產(chǎn)者和消費(fèi)者使用同一個(gè)隊(duì)列,使用同一個(gè)隊(duì)列進(jìn)行通訊 # 多個(gè)生產(chǎn)者進(jìn)程 p1 = Process(target=producer,args=(q,'Cecilia陳','巧克力')) p2 = Process(target=producer,args=(q,'xichen','冰激凌')) p3 = Process(target=producer,args=(q,'喜陳','可樂')) # 多個(gè)消費(fèi)者進(jìn)程 c1 = Process(target=consumer,args=(q,'Tom')) c2 = Process(target=consumer,args=(q,'jack')) # 告訴操作系統(tǒng)啟動(dòng)生產(chǎn)者進(jìn)程 p1.start() p2.start() p3.start() # 告訴操作系統(tǒng)啟動(dòng)消費(fèi)者進(jìn)程 c1.start() c2.start() p1.join() p2.join() p3.join() q.put(None) # 幾個(gè)消費(fèi)者put幾次 q.put(None)
五、JoinableQueue方法
創(chuàng)建可連接的共享進(jìn)程隊(duì)列。這就像是一個(gè)Queue對(duì)象,但隊(duì)列允許項(xiàng)目的使用者通知生產(chǎn)者項(xiàng)目已經(jīng)被成功處理。通知進(jìn)程是使用共享的信號(hào)和條件變量來實(shí)現(xiàn)的。
5.1 方法介紹
JoinableQueue的實(shí)例p除了與Queue對(duì)象相同的方法之外,還具有以下方法:
q.task_done():使用者使用此方法發(fā)出信號(hào),表示q.get()返回的項(xiàng)目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊(duì)列中刪除的項(xiàng)目數(shù)量,將引發(fā)ValueError異常。
q.join():生產(chǎn)者將使用此方法進(jìn)行阻塞,直到隊(duì)列中所有項(xiàng)目均被處理。阻塞將持續(xù)到為隊(duì)列中的每個(gè)項(xiàng)目均調(diào)用q.task_done()方法為止。
5.2 joinableQueue隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
from multiprocessing import Queue,Process,JoinableQueue import time,random def producer(q,name,food): for i in range(3): print(f'{name}生產(chǎn)了{(lán)food}{i}') # time.sleep((random.randint(1,3))) res = f'{food}{i}' q.put(res) # q.put(None) # 當(dāng)生產(chǎn)者結(jié)束生產(chǎn)的的時(shí)候,我們?cè)訇?duì)列的最后再做一個(gè)表示,告訴消費(fèi)者,生產(chǎn)者已經(jīng)不生產(chǎn)了,讓消費(fèi)者不要再去隊(duì)列里拿東西了 q.join() def consumer(q,name): while True: res = q.get(timeout=5) # if res == None:break # 判斷隊(duì)列拿出的是不是生產(chǎn)者放的結(jié)束生產(chǎn)的標(biāo)識(shí),如果是則不取,直接退出,結(jié)束程序 # time.sleep((random.randint(1, 3))) print(f'{name}吃了{(lán)res}') q.task_done()#向q.join()發(fā)送一次信號(hào),證明一個(gè)數(shù)據(jù)已經(jīng)被取走了 if __name__ == '__main__': q = JoinableQueue() # 為的是讓生產(chǎn)者和消費(fèi)者使用同一個(gè)隊(duì)列,使用同一個(gè)隊(duì)列進(jìn)行通訊 # 多個(gè)生產(chǎn)者進(jìn)程 p1 = Process(target=producer,args=(q,'Cecilia陳','巧克力')) p2 = Process(target=producer,args=(q,'xichen','冰激凌')) p3 = Process(target=producer,args=(q,'喜陳','可樂')) # 多個(gè)消費(fèi)者進(jìn)程 c1 = Process(target=consumer,args=(q,'Tom')) c2 = Process(target=consumer,args=(q,'jack')) # 告訴操作系統(tǒng)啟動(dòng)生產(chǎn)者進(jìn)程 p1.start() p2.start() p3.start() # 把生產(chǎn)者設(shè)為守護(hù)進(jìn)程 c1.daemon = True c2.daemon = True # 告訴操作系統(tǒng)啟動(dòng)消費(fèi)者進(jìn)程 c1.start() c2.start() p1.join() p2.join() p3.join() # 等待生產(chǎn)者生產(chǎn)完畢 print('主進(jìn)程') ### 分析 # 生產(chǎn)者生產(chǎn)完畢--這是主進(jìn)程最后一行代碼結(jié)束--q.join()消費(fèi)者已經(jīng)取干凈了,沒有存在的意義了 # 這是主進(jìn)程最后一行代碼結(jié)束,消費(fèi)者已經(jīng)取干凈了,沒有存在的意義了.守護(hù)進(jìn)程的概念.
5.3 測(cè)試joinableQueue
from multiprocessing import Process,Queue,JoinableQueue q = JoinableQueue() q.put('zhao') # 放隊(duì)列里一個(gè)任務(wù) q.put('qian') print(q.get()) q.task_done() # 完成了一次任務(wù) print(q.get()) q.task_done() # 完成了一次任務(wù) q.join() #計(jì)數(shù)器不為0的時(shí)候 阻塞等待計(jì)數(shù)器為0后通過 # 想象成一個(gè)計(jì)數(shù)器 :put +1 task_done -1
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。