您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“Python如何通過隊(duì)列實(shí)現(xiàn)進(jìn)程間通信”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“Python如何通過隊(duì)列實(shí)現(xiàn)進(jìn)程間通信”吧!
在多進(jìn)程中,每個(gè)進(jìn)程之間是什么關(guān)系呢?其實(shí)每個(gè)進(jìn)程都有自己的地址空間、內(nèi)存、數(shù)據(jù)棧以及其他記錄其運(yùn)行狀態(tài)的輔助數(shù)據(jù)。下面通過一個(gè)例子,驗(yàn)證一下進(jìn)程之間能否直接共享信息。
定義一個(gè)全局變量g_num,分別創(chuàng)建2個(gè)子進(jìn)程對(duì)g_num執(zhí)行不同的操作,并輸出操作后的結(jié)果。
代碼如下:
# _*_ coding:utf-8 _*_ from multiprocessing import Process def plus(): print("-------子進(jìn)程1開始----------") global g_num g_num += 50 print("g_num is %d" % g_num) print("-------子進(jìn)程1結(jié)束----------") def minus(): print("-------子進(jìn)程2開始----------") global g_num g_num -= 50 print("g_num is %d" % g_num) print("-------子進(jìn)程2結(jié)束----------") g_num = 100 # 定義一個(gè)全局變量 if __name__ == "__main__": print("-------主進(jìn)程開始----------") print("g_num is %d" % g_num) p1 = Process(target=plus) # 實(shí)例化進(jìn)程p1 p2 = Process(target=minus) # 實(shí)例化進(jìn)程p2 p1.start() # 開啟p1進(jìn)程 p2.start() # 開啟p2進(jìn)程 p1.join() # 等待p1進(jìn)程結(jié)束 p2.join() # 等待p2進(jìn)程結(jié)束 print("-------主進(jìn)程結(jié)束----------")
運(yùn)行結(jié)果如圖所示:
上述代碼中,分別創(chuàng)建了2個(gè)子進(jìn)程,一個(gè)子進(jìn)程中令g_num加上50,另一個(gè)子進(jìn)程令g_num減去50。但是從運(yùn)行結(jié)果可以看出來,g_num在父進(jìn)程和2個(gè)子進(jìn)程中的初始值都是100。也就是全局變量g_num在一個(gè)進(jìn)程中的結(jié)果,沒有傳到下一個(gè)進(jìn)程中,即進(jìn)程之間沒有共享信息。
進(jìn)程間示意圖如圖所示:
要如何才能實(shí)現(xiàn)進(jìn)程間的通信呢?Python的multiprocessing模塊包裝了底層的機(jī)制,提供了Queue(隊(duì)列)、Pipes(管道)等多種方式來交換數(shù)據(jù)。本文將講解通過隊(duì)列(Queue)來實(shí)現(xiàn)進(jìn)程間的通信。
隊(duì)列(Queue)就是模型仿現(xiàn)實(shí)中的排隊(duì)。例如學(xué)生在食堂排隊(duì)買飯。新來的學(xué)生排隊(duì)到隊(duì)伍最后,最前面的學(xué)生買完飯走開,后面的學(xué)生跟上。
可以看出隊(duì)列有兩個(gè)特點(diǎn):
新來的學(xué)生都排在隊(duì)尾。
最前的學(xué)生完成后離隊(duì),后面一個(gè)跟上。
根據(jù)以上特點(diǎn),可以歸納出隊(duì)列的結(jié)構(gòu)如圖所示:
進(jìn)程之間有時(shí)需要通信,操作系統(tǒng)提供了很多機(jī)制來實(shí)現(xiàn)進(jìn)程間的通信??梢允褂胢ultiprocessing模塊的Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。Queue本身是一個(gè)消息隊(duì)列程序,下面介紹一下Queue的使用。
初始化Queue()對(duì)象時(shí)(例如:q=Queue(num)),若括號(hào)中沒有指定最大可接收的消息數(shù)量,或數(shù)量為負(fù)值,那么就代表可接收的消息數(shù)量沒有上限(直到內(nèi)存的盡頭)。
Queue的常用方法如下:
Queue.qsize():返回當(dāng)前隊(duì)列包含的消息數(shù)量。Queue.empty():如果隊(duì)列為空,返回True;返之返回False。Queue.full():如果隊(duì)列滿了,返回True;反之返回False。Queue.get(block[,timeout]):獲取隊(duì)列中的一條信息,然后將其從隊(duì)列中移除,block默認(rèn)值為True。
如果block使用默認(rèn)值,且沒有設(shè)置timeout(單位秒),消息隊(duì)列為空,此時(shí)程序?qū)⒈蛔枞ㄍT谧x取狀態(tài)),直到從消息隊(duì)列讀到消息為止。如果設(shè)置了timeout,則會(huì)等待timeout秒,若還沒有讀取任何消息,則拋出“Queue.Empty”異常。
如果block值為False,消息隊(duì)列為空,則會(huì)立刻拋出“Queue.Empty”異常。
Queue.get_nowait():相當(dāng)于Queue.get(False)。Queue.put(item,[block[,timeout]]):將item消息寫入隊(duì)列,block默認(rèn)值為True。
如果block使用默認(rèn)值,且沒有設(shè)置timeout(單位秒),消息隊(duì)列如果已經(jīng)沒有空間可以寫入,此時(shí)程序?qū)⒈蛔枞?停在寫入狀態(tài)),直到從消息隊(duì)列騰出空間為止,如果設(shè)置了timeout,則會(huì)等待timeout秒,若還沒有空間,則拋出“Queue.Full”異常。
如果block值為False,消息隊(duì)列沒有空間可寫入,則會(huì)立刻拋出“Queue.Full”異常
Queue.put_nowait(item):相當(dāng)Queue.put(item,False)。
下面,通過一個(gè)例子學(xué)習(xí)一下如何使用processing.Queue。
代碼如下:
# _*_ coding:utf-8 _*_ from multiprocessing import Queue if __name__ == "__main__": q = Queue(3) q.put("消息1") q.put("消息2") print(q.full()) # 返回False q.put("消息3") print(q.full()) # 返回True # 因?yàn)橄㈥?duì)列已滿,下面的try都會(huì)拋出異常 # 第一個(gè)try會(huì)等待2秒再拋出異常,第二個(gè)try會(huì)立刻拋出異常 try: q.put("消息4", True, 2) except: print("消息隊(duì)列已滿,現(xiàn)有消息數(shù)量:%s" % q.qsize()) try: q.put_nowait("消息4") except: print("消息隊(duì)列已滿,現(xiàn)有消息數(shù)量:%s" % q.qsize()) # 讀取消息時(shí),先判斷消息隊(duì)列是否為空,再讀取 if not q.empty(): print("-----從隊(duì)列中獲取消息-------") for i in range(q.qsize()): print(q.get_nowait()) # 先判讀消息隊(duì)列是否已滿,再寫入: if not q.full(): q.put_nowait("消息4")
運(yùn)行結(jié)果如圖所示:
我們知道使用multiprocessing.Process可以創(chuàng)建多進(jìn)程,使用multiprocessing.Queue可以實(shí)現(xiàn)隊(duì)列的操作。接下來,通過一個(gè)示例結(jié)合Process和Queue實(shí)現(xiàn)進(jìn)程間的通信。
創(chuàng)建2個(gè)子進(jìn)程,一個(gè)子進(jìn)程負(fù)責(zé)向隊(duì)列中寫入數(shù)據(jù),另外一個(gè)子進(jìn)程負(fù)責(zé)從隊(duì)列中讀取數(shù)據(jù)。為了保證能夠正確從隊(duì)列中讀取數(shù)據(jù),設(shè)置讀取數(shù)據(jù)的進(jìn)程等待時(shí)間為2秒。如果2秒后乃然無法讀取數(shù)據(jù),則拋出異常。
代碼如下:
# _*_ coding:utf-8 _*_ from multiprocessing import Process, Queue import time # 向隊(duì)列中寫入數(shù)據(jù) def write_task(q): if not q.full(): for i in range(5): message = "消息" + str(i) q.put(message) print("寫入:%s" % message) # 從隊(duì)列中讀取數(shù)據(jù) def read_task(q): time.sleep(1) # 休眠1秒 while not q.empty(): print("讀取:%s" % q.get(True, 2)) # 等待2秒中,如果沒有讀取到任何信息,則拋出異常 if __name__ == "__main__": print("--------父進(jìn)程開始---------") q = Queue() # 父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程 pw = Process(target=write_task, args=(q,)) # 實(shí)例化寫入隊(duì)列的子進(jìn)程,并傳遞給隊(duì)列 pr = Process(target=read_task, args=(q,)) # 實(shí)例化讀取隊(duì)列的子進(jìn)程,并傳遞給隊(duì)列 pw.start() # 啟動(dòng)子進(jìn)程pw,寫入 pr.start() # 啟動(dòng)子進(jìn)程pr,讀取 pw.join() # 等待pw結(jié)束 pr.join() # 等待pr結(jié)束 print("-------父進(jìn)程結(jié)束-----------")
運(yùn)行結(jié)果如下:
到此,相信大家對(duì)“Python如何通過隊(duì)列實(shí)現(xiàn)進(jìn)程間通信”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。