queue 二、隊(duì)列(Queue) ..."/>
溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Python進(jìn)程間通信 multiProcessing Queue隊(duì)列實(shí)現(xiàn)詳解

發(fā)布時(shí)間:2020-10-04 10:11:18 來源:腳本之家 閱讀:1829 作者:Cecilia喜陳 欄目:開發(fā)技術(shù)

一、進(jìn)程間通信

IPC(Inter-Process Communication)

IPC機(jī)制:實(shí)現(xiàn)進(jìn)程之間通訊

管道:pipe 基于共享的內(nèi)存空間

隊(duì)列:pipe+鎖的概念--->queue

二、隊(duì)列(Queue)

2.1 概念-----multiProcess.Queue

創(chuàng)建共享的進(jìn)程隊(duì)列,Queue是多進(jìn)程安全的隊(duì)列,可以使用Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。

Queue([maxsize])創(chuàng)建共享的進(jìn)程隊(duì)列。

參數(shù) :maxsize是隊(duì)列中允許的最大項(xiàng)數(shù)。如果省略此參數(shù),則無(wú)大小限制。

底層隊(duì)列使用管道和鎖定實(shí)現(xiàn)。

2.2 Queue方法使用

2.2.1 q.get的使用:

是從隊(duì)列里面取值并且把隊(duì)列面的取出來的值刪掉,沒有參數(shù)的情況下就是是默認(rèn)一直等著取值

就算是隊(duì)列里面沒有可取的值的時(shí)候,程序也不會(huì)結(jié)束,就會(huì)卡在哪里,一直等著

from multiprocessing import Queue
q = Queue() # 生成一個(gè)隊(duì)列對(duì)象
# put方法是往隊(duì)列里面放值
q.put('Cecilia陳')
q.put('xuchen')
q.put('喜陳')

# get方法是從隊(duì)列里面取值
print(q.get())
print(q.get())
print(q.get())

q.put(5)
q.put(6)
print(q.get())

Cecilia陳

xuchen

喜陳

5

2.2.2 Queue(參數(shù)) +參數(shù)的使用:

Queue加參數(shù)以后,參數(shù)是數(shù)值

參數(shù)實(shí)幾就表示實(shí)例化的這個(gè)Queue隊(duì)列可以放幾個(gè)值

當(dāng)隊(duì)列已經(jīng)滿的時(shí)候,再放值,程序會(huì)阻塞,但不會(huì)結(jié)束

from multiprocessing import Queue
q = Queue(3)
q.put('Cecilia陳')
q.put('xuchen')
q.put('喜陳')
print(q.full()) # 判斷隊(duì)列是否滿了 返回的是True/False
q.put(2) # 當(dāng)隊(duì)列已經(jīng)滿的時(shí)候,再放值,程序會(huì)阻塞,但不會(huì)結(jié)束

True 隊(duì)列已經(jīng)滿了

2.2.3 q.put(參數(shù)1,參數(shù)2,參數(shù)3,參數(shù)4):

q.put(self, obj, block=True, timeout=None)

self :put就相當(dāng)于是Queue里的一個(gè)方法,這個(gè)時(shí)候q.put就相當(dāng)于是隊(duì)列對(duì)象q來調(diào)用對(duì)象的綁定方法,這個(gè)參數(shù)可以省略即可

obj:是我們需要往隊(duì)列里面放的值

block=True :隊(duì)列如果滿了的話,再往隊(duì)列里放值的話會(huì)等待,程序不會(huì)結(jié)束

timeout=None:是再block這個(gè)參數(shù)的基礎(chǔ)上的,當(dāng)block的值為真的時(shí)候,timeout是用來等待多少秒,如果再這個(gè)時(shí)間里,隊(duì)列一直是滿的,那么程序就會(huì)報(bào)錯(cuò)并結(jié)束(Queue.Full異常)

from multiprocessing import Queue
q = Queue(3)
q.put('zhao',block=True,timeout=2)
q.put('zhao',block=True,timeout=2)
q.put('zhao',block=True,timeout=2)
q.put('zhao',block=True,timeout=5) # 此時(shí)程序?qū)?duì)等待5秒以后報(bào)錯(cuò)了

2.2.4 q.get(參數(shù)1,參數(shù)2,參數(shù)3,參數(shù)4):

q.get(self,block=True, timeout=None)

self :get就相當(dāng)于是Queue里的一個(gè)方法,這個(gè)時(shí)候q.get就相當(dāng)于是隊(duì)列對(duì)象q來調(diào)用對(duì)象的綁定方法,這個(gè)參數(shù)可以省略即可

block=True :從隊(duì)列q對(duì)象里面取值,如果娶不到值的話,程序不會(huì)結(jié)束

timeout=None:是再block這個(gè)參數(shù)的基礎(chǔ)上的,當(dāng)block的值為真的時(shí)候,timeout是用來等待多少秒,如果再這個(gè)時(shí)間里,get取不到隊(duì)列里面的值的話,那么程序就會(huì)報(bào)錯(cuò)并結(jié)束(queue.Empty異常)

from multiprocessing import Queue
q = Queue()
q.put('Cecilia陳')
print(q.get())
q.get(block=True,timeout=2) # 此時(shí)程序會(huì)等待2秒后,報(bào)錯(cuò)了,隊(duì)列里面沒有值了

2.2.5 block=False:

如果block的值是False的話,那么put方法再隊(duì)列是滿的情況下,不會(huì)等待阻塞,程序直接報(bào)錯(cuò)(Queue.Full異常)結(jié)束

如果block的值是False的話,那么get方法再隊(duì)列里面沒有值的情況下,再去取的時(shí)候,不會(huì)等待阻塞,程序直接報(bào)錯(cuò)(queue.Empty異常)結(jié)束

1.put()的block=False

from multiprocessing import Queue
q = Queue(2)
q.put('Cecilia陳')
q.put('喜陳')
print(q.full())
q.put('xichen',block=False) # 隊(duì)列已經(jīng)滿了,我不等待了,直接報(bào)錯(cuò)

2.get()的block=Flase

from multiprocessing import Queue
q = Queue(2)
q.put('Cecilia陳')
q.put('喜陳')
print(q.get())
print(q.get())
print(q.get(block=False)) # 隊(duì)列已經(jīng)沒有值了,我不等待了,直接報(bào)錯(cuò)

2.2.6 put_nowait()/get_nowait()

1.put_nowait() 相當(dāng)于bolok=False,隊(duì)列滿的時(shí)候,再放值的時(shí)候,程序不等待,不阻塞,直接報(bào)錯(cuò)

from multiprocessing import Queue
q = Queue(2)
q.put('Cecilia陳')
q.put('喜陳')
print(q.full())

q.put_nowait('xichen') # 程序不等待,不阻塞,直接報(bào)錯(cuò)

2.get_nowait() 相當(dāng)于bolok=False,當(dāng)隊(duì)列里沒有值的時(shí)候,再取值的時(shí)候,程序不等待,不阻塞,程序直接報(bào)錯(cuò)

from multiprocessing import Queue
q = Queue(2)
q.put('Cecilia陳')
q.put('喜陳')
print(q.get())
print(q.get())
print(q.full())
q.get_nowait()# 再取值的時(shí)候,程序不等待,不阻塞,程序直接報(bào)錯(cuò)

三、代碼實(shí)例

3.1 單看隊(duì)列的存取數(shù)據(jù)用法

這個(gè)例子還沒有加入進(jìn)程通信,只是先來看看隊(duì)列為我們提供的方法,以及這些方法的使用和現(xiàn)象。

'''
multiprocessing模塊支持進(jìn)程間通信的兩種主要形式:管道和隊(duì)列
都是基于消息傳遞實(shí)現(xiàn)的,但是隊(duì)列接口
'''

from multiprocessing import Queue
q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3)  # 如果隊(duì)列已經(jīng)滿了,程序就會(huì)停在這里,等待數(shù)據(jù)被別人取走,再將數(shù)據(jù)放入隊(duì)列。
      # 如果隊(duì)列中的數(shù)據(jù)一直不被取走,程序就會(huì)永遠(yuǎn)停在這里。
try:
  q.put_nowait(3) # 可以使用put_nowait,如果隊(duì)列滿了不會(huì)阻塞,但是會(huì)因?yàn)殛?duì)列滿了而報(bào)錯(cuò)。
except: # 因此我們可以用一個(gè)try語(yǔ)句來處理這個(gè)錯(cuò)誤。這樣程序不會(huì)一直阻塞下去,但是會(huì)丟掉這個(gè)消息。
  print('隊(duì)列已經(jīng)滿了')

# 因此,我們?cè)俜湃霐?shù)據(jù)之前,可以先看一下隊(duì)列的狀態(tài),如果已經(jīng)滿了,就不繼續(xù)put了。
print(q.full()) #滿了
print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法一樣,如果隊(duì)列已經(jīng)空了,那么繼續(xù)取就會(huì)出現(xiàn)阻塞。
try:
  q.get_nowait(3) # 可以使用get_nowait,如果隊(duì)列滿了不會(huì)阻塞,但是會(huì)因?yàn)闆]取到值而報(bào)錯(cuò)。
except: # 因此我們可以用一個(gè)try語(yǔ)句來處理這個(gè)錯(cuò)誤。這樣程序不會(huì)一直阻塞下去。
  print('隊(duì)列已經(jīng)空了')

print(q.empty()) #空了

3.2 子進(jìn)程向父進(jìn)程發(fā)送數(shù)據(jù)

這是一個(gè)queue的簡(jiǎn)單應(yīng)用,使用隊(duì)列q對(duì)象調(diào)用get函數(shù)來取得隊(duì)列中最先進(jìn)入的數(shù)據(jù)。

from multiprocessing import Process, Queue
def f(q,name,age):
  q.put(name,age) #調(diào)用主函數(shù)中p進(jìn)程傳遞過來的進(jìn)程參數(shù) put函數(shù)為向隊(duì)列中添加一條數(shù)據(jù)。
if __name__ == '__main__':
  q = Queue() #創(chuàng)建一個(gè)Queue對(duì)象
  p = Process(target=f, args=(q,'Cecilia陳',18)) #創(chuàng)建一個(gè)進(jìn)程
  p.start()
  print(q.get())
  p.join()

['Cecilia陳', 18]

四、生產(chǎn)者消費(fèi)者模型

生產(chǎn)者: 生產(chǎn)數(shù)據(jù)的任務(wù)

消費(fèi)者: 處理數(shù)據(jù)的任務(wù)

生產(chǎn)者--隊(duì)列(盆)-->消費(fèi)者

生產(chǎn)者可以不停的生產(chǎn),達(dá)到了自己最大的生產(chǎn)效率,消費(fèi)者可以不停的消費(fèi),也達(dá)到了自己最大的消費(fèi)效率.

生產(chǎn)者消費(fèi)者模型大大提高了生產(chǎn)者生產(chǎn)的效率和消費(fèi)者消費(fèi)的效率.

補(bǔ)充: queue不適合傳大文件,通產(chǎn)傳一些消息.

在并發(fā)編程中使用生產(chǎn)者和消費(fèi)者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費(fèi)線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。

4.1 為什么要使用生產(chǎn)者和消費(fèi)者模型

在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式。

4.2 什么是生產(chǎn)者消費(fèi)者模型

生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。

4.3 基于Queue隊(duì)列實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者模型

from multiprocessing import Queue,Process
# 生產(chǎn)者
def producer(q,name,food):
  for i in range(3):
    print(f'{name}生產(chǎn)了{(lán)food}{i}')
    res = f'{food}{i}'
    q.put(res)
# 消費(fèi)者
def consumer(q,name):
  while True:
    res = q.get(timeout=5)
    print(f'{name}吃了{(lán)res}')
if __name__ == '__main__':
  q = Queue() # 為的是讓生產(chǎn)者和消費(fèi)者使用同一個(gè)隊(duì)列,使用同一個(gè)隊(duì)列進(jìn)行通訊
  p1 = Process(target=producer,args=(q,'Cecilia陳','巧克力'))
  c1 = Process(target=consumer,args=(q,'Tom'))
  p1.start()
  c1.start()

此時(shí)的問題是主進(jìn)程永遠(yuǎn)不會(huì)結(jié)束,原因是:生產(chǎn)者p在生產(chǎn)完后就結(jié)束了,但是消費(fèi)者c在取空了q之后,則一直處于死循環(huán)中且卡在q.get()這一步。

解決方式無(wú)非是讓生產(chǎn)者在生產(chǎn)完畢后,往隊(duì)列中再發(fā)一個(gè)結(jié)束信號(hào),這樣消費(fèi)者在接收到結(jié)束信號(hào)后就可以break出死循環(huán)。

4.4 改良版----生產(chǎn)者消費(fèi)者模型

注意:結(jié)束信號(hào)None,不一定要由生產(chǎn)者發(fā),主進(jìn)程里同樣可以發(fā),但主進(jìn)程需要等生產(chǎn)者結(jié)束后才應(yīng)該發(fā)送該信號(hào)

from multiprocessing import Queue,Process
def producer(q,name,food):
  for i in range(3):
    print(f'{name}生產(chǎn)了{(lán)food}{i}')
    res = f'{food}{i}'
    q.put(res)
  q.put(None) # 當(dāng)生產(chǎn)者結(jié)束生產(chǎn)的的時(shí)候,我們?cè)訇?duì)列的最后再做一個(gè)表示,告訴消費(fèi)者,生產(chǎn)者已經(jīng)不生產(chǎn)了,讓消費(fèi)者不要再去隊(duì)列里拿東西了
def consumer(q,name):
  while True:
    res = q.get(timeout=5)
    if res == None:break # 判斷隊(duì)列拿出的是不是生產(chǎn)者放的結(jié)束生產(chǎn)的標(biāo)識(shí),如果是則不取,直接退出,結(jié)束程序
    print(f'{name}吃了{(lán)res}')
if __name__ == '__main__':
  q = Queue() # 為的是讓生產(chǎn)者和消費(fèi)者使用同一個(gè)隊(duì)列,使用同一個(gè)隊(duì)列進(jìn)行通訊
  p1 = Process(target=producer,args=(q,'Cecilia陳','巧克力'))
  c1 = Process(target=consumer,args=(q,'Tom'))
  p1.start()
  c1.start()

4.5 主進(jìn)程在生產(chǎn)者生產(chǎn)結(jié)束以后,發(fā)送結(jié)束信號(hào)

使用這個(gè)方法的話,是很low的,有幾個(gè)消費(fèi)者就要在主進(jìn)程中向隊(duì)列中put幾個(gè)結(jié)束信號(hào)

from multiprocessing import Queue,Process
import time,random

def producer(q,name,food):
  for i in range(3):
    print(f'{name}生產(chǎn)了{(lán)food}{i}')
    time.sleep((random.randint(1,3)))
    res = f'{food}{i}'
    q.put(res)
  # q.put(None) # 當(dāng)生產(chǎn)者結(jié)束生產(chǎn)的的時(shí)候,我們?cè)訇?duì)列的最后再做一個(gè)表示,告訴消費(fèi)者,生產(chǎn)者已經(jīng)不生產(chǎn)了,讓消費(fèi)者不要再去隊(duì)列里拿東西了



def consumer(q,name):
  while True:
    res = q.get(timeout=5)
    if res == None:break # 判斷隊(duì)列拿出的是不是生產(chǎn)者放的結(jié)束生產(chǎn)的標(biāo)識(shí),如果是則不取,直接退出,結(jié)束程序
    time.sleep((random.randint(1, 3)))
    print(f'{name}吃了{(lán)res}')

if __name__ == '__main__':
  q = Queue() # 為的是讓生產(chǎn)者和消費(fèi)者使用同一個(gè)隊(duì)列,使用同一個(gè)隊(duì)列進(jìn)行通訊
  # 多個(gè)生產(chǎn)者進(jìn)程
  p1 = Process(target=producer,args=(q,'Cecilia陳','巧克力'))
  p2 = Process(target=producer,args=(q,'xichen','冰激凌'))
  p3 = Process(target=producer,args=(q,'喜陳','可樂'))
  # 多個(gè)消費(fèi)者進(jìn)程
  c1 = Process(target=consumer,args=(q,'Tom'))
  c2 = Process(target=consumer,args=(q,'jack'))


  # 告訴操作系統(tǒng)啟動(dòng)生產(chǎn)者進(jìn)程
  p1.start()
  p2.start()
  p3.start()

  # 告訴操作系統(tǒng)啟動(dòng)消費(fèi)者進(jìn)程
  c1.start()
  c2.start()

  p1.join()
  p2.join()
  p3.join()

  q.put(None) # 幾個(gè)消費(fèi)者put幾次
  q.put(None)

五、JoinableQueue方法

創(chuàng)建可連接的共享進(jìn)程隊(duì)列。這就像是一個(gè)Queue對(duì)象,但隊(duì)列允許項(xiàng)目的使用者通知生產(chǎn)者項(xiàng)目已經(jīng)被成功處理。通知進(jìn)程是使用共享的信號(hào)和條件變量來實(shí)現(xiàn)的。

5.1 方法介紹

JoinableQueue的實(shí)例p除了與Queue對(duì)象相同的方法之外,還具有以下方法:

q.task_done():使用者使用此方法發(fā)出信號(hào),表示q.get()返回的項(xiàng)目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊(duì)列中刪除的項(xiàng)目數(shù)量,將引發(fā)ValueError異常。

q.join():生產(chǎn)者將使用此方法進(jìn)行阻塞,直到隊(duì)列中所有項(xiàng)目均被處理。阻塞將持續(xù)到為隊(duì)列中的每個(gè)項(xiàng)目均調(diào)用q.task_done()方法為止。

5.2 joinableQueue隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型

from multiprocessing import Queue,Process,JoinableQueue
import time,random

def producer(q,name,food):
  for i in range(3):
    print(f'{name}生產(chǎn)了{(lán)food}{i}')
    # time.sleep((random.randint(1,3)))
    res = f'{food}{i}'
    q.put(res)
  # q.put(None) # 當(dāng)生產(chǎn)者結(jié)束生產(chǎn)的的時(shí)候,我們?cè)訇?duì)列的最后再做一個(gè)表示,告訴消費(fèi)者,生產(chǎn)者已經(jīng)不生產(chǎn)了,讓消費(fèi)者不要再去隊(duì)列里拿東西了
  q.join()


def consumer(q,name):
  while True:
    res = q.get(timeout=5)
    # if res == None:break # 判斷隊(duì)列拿出的是不是生產(chǎn)者放的結(jié)束生產(chǎn)的標(biāo)識(shí),如果是則不取,直接退出,結(jié)束程序
    # time.sleep((random.randint(1, 3)))
    print(f'{name}吃了{(lán)res}')
    q.task_done()#向q.join()發(fā)送一次信號(hào),證明一個(gè)數(shù)據(jù)已經(jīng)被取走了


if __name__ == '__main__':
  q = JoinableQueue() # 為的是讓生產(chǎn)者和消費(fèi)者使用同一個(gè)隊(duì)列,使用同一個(gè)隊(duì)列進(jìn)行通訊
  # 多個(gè)生產(chǎn)者進(jìn)程
  p1 = Process(target=producer,args=(q,'Cecilia陳','巧克力'))
  p2 = Process(target=producer,args=(q,'xichen','冰激凌'))
  p3 = Process(target=producer,args=(q,'喜陳','可樂'))
  # 多個(gè)消費(fèi)者進(jìn)程
  c1 = Process(target=consumer,args=(q,'Tom'))
  c2 = Process(target=consumer,args=(q,'jack'))


  # 告訴操作系統(tǒng)啟動(dòng)生產(chǎn)者進(jìn)程
  p1.start()
  p2.start()
  p3.start()

  # 把生產(chǎn)者設(shè)為守護(hù)進(jìn)程
  c1.daemon = True
  c2.daemon = True
  # 告訴操作系統(tǒng)啟動(dòng)消費(fèi)者進(jìn)程
  c1.start()
  c2.start()

  p1.join()
  p2.join()
  p3.join() # 等待生產(chǎn)者生產(chǎn)完畢

  print('主進(jìn)程')

  ### 分析
  # 生產(chǎn)者生產(chǎn)完畢--這是主進(jìn)程最后一行代碼結(jié)束--q.join()消費(fèi)者已經(jīng)取干凈了,沒有存在的意義了
  # 這是主進(jìn)程最后一行代碼結(jié)束,消費(fèi)者已經(jīng)取干凈了,沒有存在的意義了.守護(hù)進(jìn)程的概念.

5.3 測(cè)試joinableQueue

from multiprocessing import Process,Queue,JoinableQueue
q = JoinableQueue()
q.put('zhao') # 放隊(duì)列里一個(gè)任務(wù)
q.put('qian')
print(q.get())
q.task_done() # 完成了一次任務(wù)
print(q.get())
q.task_done() # 完成了一次任務(wù)
q.join() #計(jì)數(shù)器不為0的時(shí)候 阻塞等待計(jì)數(shù)器為0后通過

# 想象成一個(gè)計(jì)數(shù)器 :put +1  task_done -1

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI