溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Python如何通過隊(duì)列實(shí)現(xiàn)進(jìn)程間通信

發(fā)布時(shí)間:2022-06-17 13:49:07 來源:億速云 閱讀:127 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容主要講解“Python如何通過隊(duì)列實(shí)現(xiàn)進(jìn)程間通信”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“Python如何通過隊(duì)列實(shí)現(xiàn)進(jìn)程間通信”吧!

一、前言

在多進(jìn)程中,每個(gè)進(jìn)程之間是什么關(guān)系呢?其實(shí)每個(gè)進(jìn)程都有自己的地址空間、內(nèi)存、數(shù)據(jù)棧以及其他記錄其運(yùn)行狀態(tài)的輔助數(shù)據(jù)。下面通過一個(gè)例子,驗(yàn)證一下進(jìn)程之間能否直接共享信息。

定義一個(gè)全局變量g_num,分別創(chuàng)建2個(gè)子進(jìn)程對(duì)g_num執(zhí)行不同的操作,并輸出操作后的結(jié)果。

代碼如下:

# _*_ coding:utf-8 _*_
from multiprocessing import Process
def plus():
    print("-------子進(jìn)程1開始----------")
    global g_num
    g_num += 50
    print("g_num is %d" % g_num)
    print("-------子進(jìn)程1結(jié)束----------")
def minus():
    print("-------子進(jìn)程2開始----------")
    global g_num
    g_num -= 50
    print("g_num is %d" % g_num)
    print("-------子進(jìn)程2結(jié)束----------")
g_num = 100  # 定義一個(gè)全局變量
if __name__ == "__main__":
    print("-------主進(jìn)程開始----------")
    print("g_num is %d" % g_num)
    p1 = Process(target=plus)  # 實(shí)例化進(jìn)程p1
    p2 = Process(target=minus)  # 實(shí)例化進(jìn)程p2
    p1.start()  # 開啟p1進(jìn)程
    p2.start()  # 開啟p2進(jìn)程
    p1.join()  # 等待p1進(jìn)程結(jié)束
    p2.join()  # 等待p2進(jìn)程結(jié)束
    print("-------主進(jìn)程結(jié)束----------")

運(yùn)行結(jié)果如圖所示:

Python如何通過隊(duì)列實(shí)現(xiàn)進(jìn)程間通信

上述代碼中,分別創(chuàng)建了2個(gè)子進(jìn)程,一個(gè)子進(jìn)程中令g_num加上50,另一個(gè)子進(jìn)程令g_num減去50。但是從運(yùn)行結(jié)果可以看出來,g_num在父進(jìn)程和2個(gè)子進(jìn)程中的初始值都是100。也就是全局變量g_num在一個(gè)進(jìn)程中的結(jié)果,沒有傳到下一個(gè)進(jìn)程中,即進(jìn)程之間沒有共享信息。

進(jìn)程間示意圖如圖所示:

Python如何通過隊(duì)列實(shí)現(xiàn)進(jìn)程間通信

要如何才能實(shí)現(xiàn)進(jìn)程間的通信呢?Python的multiprocessing模塊包裝了底層的機(jī)制,提供了Queue(隊(duì)列)、Pipes(管道)等多種方式來交換數(shù)據(jù)。本文將講解通過隊(duì)列(Queue)來實(shí)現(xiàn)進(jìn)程間的通信。

二、隊(duì)列簡(jiǎn)介

隊(duì)列(Queue)就是模型仿現(xiàn)實(shí)中的排隊(duì)。例如學(xué)生在食堂排隊(duì)買飯。新來的學(xué)生排隊(duì)到隊(duì)伍最后,最前面的學(xué)生買完飯走開,后面的學(xué)生跟上。

可以看出隊(duì)列有兩個(gè)特點(diǎn):

  • 新來的學(xué)生都排在隊(duì)尾。

  • 最前的學(xué)生完成后離隊(duì),后面一個(gè)跟上。

根據(jù)以上特點(diǎn),可以歸納出隊(duì)列的結(jié)構(gòu)如圖所示:

Python如何通過隊(duì)列實(shí)現(xiàn)進(jìn)程間通信

三、多進(jìn)程隊(duì)列的使用

進(jìn)程之間有時(shí)需要通信,操作系統(tǒng)提供了很多機(jī)制來實(shí)現(xiàn)進(jìn)程間的通信??梢允褂胢ultiprocessing模塊的Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。Queue本身是一個(gè)消息隊(duì)列程序,下面介紹一下Queue的使用。

初始化Queue()對(duì)象時(shí)(例如:q=Queue(num)),若括號(hào)中沒有指定最大可接收的消息數(shù)量,或數(shù)量為負(fù)值,那么就代表可接收的消息數(shù)量沒有上限(直到內(nèi)存的盡頭)。

Queue的常用方法如下:

Queue.qsize():返回當(dāng)前隊(duì)列包含的消息數(shù)量。Queue.empty():如果隊(duì)列為空,返回True;返之返回False。Queue.full():如果隊(duì)列滿了,返回True;反之返回False。Queue.get(block[,timeout]):獲取隊(duì)列中的一條信息,然后將其從隊(duì)列中移除,block默認(rèn)值為True。

如果block使用默認(rèn)值,且沒有設(shè)置timeout(單位秒),消息隊(duì)列為空,此時(shí)程序?qū)⒈蛔枞ㄍT谧x取狀態(tài)),直到從消息隊(duì)列讀到消息為止。如果設(shè)置了timeout,則會(huì)等待timeout秒,若還沒有讀取任何消息,則拋出“Queue.Empty”異常。

如果block值為False,消息隊(duì)列為空,則會(huì)立刻拋出“Queue.Empty”異常。

Queue.get_nowait():相當(dāng)于Queue.get(False)。Queue.put(item,[block[,timeout]]):將item消息寫入隊(duì)列,block默認(rèn)值為True。

如果block使用默認(rèn)值,且沒有設(shè)置timeout(單位秒),消息隊(duì)列如果已經(jīng)沒有空間可以寫入,此時(shí)程序?qū)⒈蛔枞?停在寫入狀態(tài)),直到從消息隊(duì)列騰出空間為止,如果設(shè)置了timeout,則會(huì)等待timeout秒,若還沒有空間,則拋出“Queue.Full”異常。

如果block值為False,消息隊(duì)列沒有空間可寫入,則會(huì)立刻拋出“Queue.Full”異常

Queue.put_nowait(item):相當(dāng)Queue.put(item,False)。

下面,通過一個(gè)例子學(xué)習(xí)一下如何使用processing.Queue。

代碼如下:

# _*_ coding:utf-8 _*_
from multiprocessing import Queue
if __name__ == "__main__":
    q = Queue(3)
    q.put("消息1")
    q.put("消息2")
    print(q.full())  # 返回False
    q.put("消息3")
    print(q.full())  # 返回True
    # 因?yàn)橄㈥?duì)列已滿,下面的try都會(huì)拋出異常
    # 第一個(gè)try會(huì)等待2秒再拋出異常,第二個(gè)try會(huì)立刻拋出異常
    try:
        q.put("消息4", True, 2)
    except:
        print("消息隊(duì)列已滿,現(xiàn)有消息數(shù)量:%s" % q.qsize())

    try:
        q.put_nowait("消息4")
    except:
        print("消息隊(duì)列已滿,現(xiàn)有消息數(shù)量:%s" % q.qsize())

    # 讀取消息時(shí),先判斷消息隊(duì)列是否為空,再讀取
    if not q.empty():
        print("-----從隊(duì)列中獲取消息-------")
        for i in range(q.qsize()):
            print(q.get_nowait())
    # 先判讀消息隊(duì)列是否已滿,再寫入:
    if not q.full():
        q.put_nowait("消息4")

運(yùn)行結(jié)果如圖所示:

Python如何通過隊(duì)列實(shí)現(xiàn)進(jìn)程間通信

四、使用隊(duì)列在進(jìn)程間通信

我們知道使用multiprocessing.Process可以創(chuàng)建多進(jìn)程,使用multiprocessing.Queue可以實(shí)現(xiàn)隊(duì)列的操作。接下來,通過一個(gè)示例結(jié)合Process和Queue實(shí)現(xiàn)進(jìn)程間的通信。

創(chuàng)建2個(gè)子進(jìn)程,一個(gè)子進(jìn)程負(fù)責(zé)向隊(duì)列中寫入數(shù)據(jù),另外一個(gè)子進(jìn)程負(fù)責(zé)從隊(duì)列中讀取數(shù)據(jù)。為了保證能夠正確從隊(duì)列中讀取數(shù)據(jù),設(shè)置讀取數(shù)據(jù)的進(jìn)程等待時(shí)間為2秒。如果2秒后乃然無法讀取數(shù)據(jù),則拋出異常。

代碼如下:

# _*_ coding:utf-8 _*_
from multiprocessing import Process, Queue
import time
# 向隊(duì)列中寫入數(shù)據(jù)
def write_task(q):
    if not q.full():
        for i in range(5):
            message = "消息" + str(i)
            q.put(message)
            print("寫入:%s" % message)
# 從隊(duì)列中讀取數(shù)據(jù)
def read_task(q):
    time.sleep(1)  # 休眠1秒
    while not q.empty():
        print("讀取:%s" % q.get(True, 2))  # 等待2秒中,如果沒有讀取到任何信息,則拋出異常
if __name__ == "__main__":
    print("--------父進(jìn)程開始---------")
    q = Queue()  # 父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程
    pw = Process(target=write_task, args=(q,))  # 實(shí)例化寫入隊(duì)列的子進(jìn)程,并傳遞給隊(duì)列
    pr = Process(target=read_task, args=(q,))  # 實(shí)例化讀取隊(duì)列的子進(jìn)程,并傳遞給隊(duì)列
    pw.start()  # 啟動(dòng)子進(jìn)程pw,寫入
    pr.start()  # 啟動(dòng)子進(jìn)程pr,讀取
    pw.join()  # 等待pw結(jié)束
    pr.join()  # 等待pr結(jié)束
    print("-------父進(jìn)程結(jié)束-----------")

運(yùn)行結(jié)果如下:

Python如何通過隊(duì)列實(shí)現(xiàn)進(jìn)程間通信

到此,相信大家對(duì)“Python如何通過隊(duì)列實(shí)現(xiàn)進(jìn)程間通信”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI