您好,登錄后才能下訂單哦!
這篇文章主要介紹“Python進(jìn)程間的通信方式是什么”的相關(guān)知識(shí),小編通過實(shí)際案例向大家展示操作過程,操作方法簡單快捷,實(shí)用性強(qiáng),希望這篇“Python進(jìn)程間的通信方式是什么”文章能幫助大家解決問題。
這里舉一個(gè)例子接介紹通信的機(jī)制:通信 一詞大家并不陌生,比如一個(gè)人要給他的女友打電話。當(dāng)建立了通話之后,在這個(gè)通話的過程中就是建立了一條隱形的 隊(duì)列 (記住這個(gè)詞)。此時(shí)這個(gè)人就會(huì)通過對(duì)話的方式不停的將信息告訴女友,而這個(gè)人的女友也是在傾聽著。(嗯…我個(gè)人覺得大部分情況下可能是反著來的)。
這里可以將他們兩個(gè)人比作是兩個(gè)進(jìn)程,"這個(gè)人"的進(jìn)程需要將信息發(fā)送給"女友"的進(jìn)程,就需要一個(gè)隊(duì)列的幫助。而女友需要不停的接收隊(duì)列的信息,可以做一些其他的事情,所以兩個(gè)進(jìn)程之間的通信主要依賴于隊(duì)列。
這個(gè)隊(duì)列可以支持發(fā)送消息與接收消息,“這個(gè)人"負(fù)責(zé)發(fā)送消息,反之"女友” 負(fù)責(zé)的是接收消息。
既然隊(duì)列才是重點(diǎn),那么來看一下隊(duì)列要如何創(chuàng)建。
依然使用 multiprocessing 模塊,調(diào)用該模塊的 Queue 函數(shù)來實(shí)現(xiàn)隊(duì)列的創(chuàng)建。
函數(shù)名 | 介紹 | 參數(shù) | 返回值 |
---|---|---|---|
Queue | 隊(duì)列的創(chuàng)建 | mac_count | 隊(duì)列對(duì)象 |
Queue 函數(shù)功能介紹:調(diào)用 Queue 可以創(chuàng)建隊(duì)列;它有一個(gè)參數(shù) mac_count 代表隊(duì)列最大可以創(chuàng)建多少信息,如果不傳默認(rèn)是無限長度。實(shí)例化一個(gè)隊(duì)列對(duì)象之后,需要操作這個(gè)隊(duì)列的對(duì)象進(jìn)行放入與取出數(shù)據(jù)。
函數(shù)名 | 介紹 | 參數(shù) | 返回值 |
---|---|---|---|
put | 將消息放入隊(duì)列 | message | 無 |
get | 獲取隊(duì)列消息 | 無 | str |
put 函數(shù)功能介紹:將數(shù)據(jù)傳入。它有一個(gè)參數(shù) message ,是一個(gè)字符串類型。
get 函數(shù)功能介紹:用來接收隊(duì)列中的數(shù)據(jù)。(其實(shí)這里就是一個(gè)常用的json場景,有很多的數(shù)據(jù)傳輸都是 字符串 的,隊(duì)列的插入與獲取就是使用的字符串,所以 json 就非常適用這個(gè)場景。)
接下來就來練習(xí)一下 隊(duì)列的使用 。
代碼示例如下:
# coding:utf-8 import json import multiprocessing class Work(object): # 定義一個(gè) Work 類 def __init__(self, queue): # 構(gòu)造函數(shù)傳入一個(gè) '隊(duì)列對(duì)象' --> queue self.queue = queue def send(self, message): # 定義一個(gè) send(發(fā)送) 函數(shù),傳入 message # [這里有個(gè)隱藏的bug,就是只判斷了傳入的是否字符串類型;如果傳入的是函數(shù)、類、集合等依然會(huì)報(bào)錯(cuò)] if not isinstance(message, str): # 判斷傳入的 message 是否為字符串,若不是,則進(jìn)行 json 序列化 message = json.dumps(message) self.queue.put(message) # 利用 queue 的隊(duì)列實(shí)例化對(duì)象將 message 發(fā)送出去 def receive(self): # 定義一個(gè) receive(接收) 函數(shù),不需傳入?yún)?shù),但是因?yàn)榻邮帐且粋€(gè)源源不斷的過程,所以需要使用 while 循環(huán) while 1: result = self.queue.get() # 獲取 '隊(duì)列對(duì)象' --> queue 傳入的message # 由于我們接收的 message 可能不是一個(gè)字符串,所以要進(jìn)程異常的捕獲 try: # 如果傳入的 message 符合 JSON 格式將賦值給 res ;若不符合,則直接使用 result 賦值 res res = json.loads(result) except: res = result print('接收到的信息為:{}'.format(res)) if __name__ == '__main__': queue = multiprocessing.Queue() work = Work(queue) send = multiprocessing.Process(target=work.send, args=({'message': '這是一條測試的消息'},)) receive = multiprocessing.Process(target=work.receive) send.start() receive.start()
使用隊(duì)列建立進(jìn)程間通信遇到的異常
但是這里會(huì)出現(xiàn)一個(gè) 報(bào)錯(cuò),如下圖:
報(bào)錯(cuò)截圖示例如下:
這里的報(bào)錯(cuò)提示是 文件沒有被發(fā)現(xiàn)的意思 。其實(shí)這里是我們使用 隊(duì)列做 put() 和 get()的時(shí)候 有一把無形的鎖加了上去,就是上圖中圈中的 .SemLock 。我們不需要去關(guān)心造成這個(gè)錯(cuò)誤的具體原因,要解決這個(gè)問題其實(shí)也很簡單。
FileNotFoundError: [Errno 2] No such file or directory 異常的解決
我們只需要給 send 或者 receive 其中一個(gè)子進(jìn)程添加 join 阻塞進(jìn)程即可,理論上如此。但是我們的 receive子進(jìn)程是一個(gè) while循環(huán),它會(huì)一直執(zhí)行,所以只需要給 send 子進(jìn)程加上一個(gè) join 即可。
解決示意圖如下:
PS:雖然解決了報(bào)錯(cuò)問題,但是程序沒有正常退出。
實(shí)際上由于我們的 receive 進(jìn)程是個(gè) while循環(huán),并不知道要處理到什么時(shí)候,沒有辦法立刻終止。所以我們需要在 receive 進(jìn)程 使用 terminate() 函數(shù)終結(jié)接收端。
運(yùn)行結(jié)果如下:
新建一個(gè)函數(shù),寫入 for循環(huán) 模擬批量添加要發(fā)送的消息
然后再給這個(gè)模擬批量發(fā)送數(shù)據(jù)的函數(shù)添加一個(gè)線程。
示例代碼如下:
# coding:utf-8 import json import time import multiprocessing class Work(object): # 定義一個(gè) Work 類 def __init__(self, queue): # 構(gòu)造函數(shù)傳入一個(gè) '隊(duì)列對(duì)象' --> queue self.queue = queue def send(self, message): # 定義一個(gè) send(發(fā)送) 函數(shù),傳入 message # [這里有個(gè)隱藏的bug,就是只判斷了傳入的是否字符串類型;如果傳入的是函數(shù)、類、集合等依然會(huì)報(bào)錯(cuò)] if not isinstance(message, str): # 判斷傳入的 message 是否為字符串,若不是,則進(jìn)行 json 序列化 message = json.dumps(message) self.queue.put(message) # 利用 queue 的隊(duì)列實(shí)例化對(duì)象將 message 發(fā)送出去 def send_all(self): # 定義一個(gè) send_all(發(fā)送)函數(shù),然后通過for循環(huán)模擬批量發(fā)送的 message for i in range(20): self.queue.put('第 {} 次循環(huán),發(fā)送的消息為:{}'.format(i, i)) time.sleep(1) def receive(self): # 定義一個(gè) receive(接收) 函數(shù),不需傳入?yún)?shù),但是因?yàn)榻邮帐且粋€(gè)源源不斷的過程,所以需要使用 while 循環(huán) while 1: result = self.queue.get() # 獲取 '隊(duì)列對(duì)象' --> queue 傳入的message # 由于我們接收的 message 可能不是一個(gè)字符串,所以要進(jìn)程異常的捕獲 try: # 如果傳入的 message 符合 JSON 格式將賦值給 res ;若不符合,則直接使用 result 賦值 res res = json.loads(result) except: res = result print('接收到的信息為:{}'.format(res)) if __name__ == '__main__': queue = multiprocessing.Queue() work = Work(queue) send = multiprocessing.Process(target=work.send, args=({'message': '這是一條測試的消息'},)) receive = multiprocessing.Process(target=work.receive) send_all = multiprocessing.Process(target=work.send_all,) send_all.start() # 這里因?yàn)?nbsp;send 只執(zhí)行了1次,然后就結(jié)束了。而 send_all 卻要循環(huán)20次,它的執(zhí)行時(shí)間是最長的,信息也是發(fā)送的最多的 send.start() receive.start() # send.join() # 使用 send 的阻塞會(huì)造成 send_all 循環(huán)還未結(jié)束 ,receive.terminate() 函數(shù)接收端就會(huì)終結(jié)。 send_all.join() # 所以我們只需要阻塞最長使用率的進(jìn)程就可以了 receive.terminate()
運(yùn)行結(jié)果如下:
從上圖中我們可以看到 send 與 send_all 兩個(gè)進(jìn)程都可以通過 queue這個(gè)實(shí)例化的 Queue 對(duì)象發(fā)送消息,同樣的 receive接收函數(shù)也會(huì)將兩個(gè)進(jìn)程傳入的 message 打印輸出出來。
該章節(jié)我們通過隊(duì)列的方式實(shí)現(xiàn)了進(jìn)程間通信的方法,并且了解了隊(duì)列的使用方法。一個(gè)隊(duì)列中,有一端(這里我們演示的是 send端)通過 put方法實(shí)現(xiàn)添加相關(guān)的信息,另一端使用 get 方法獲取相關(guān)的信息;兩個(gè)進(jìn)程相互配合達(dá)到一個(gè)進(jìn)程通信的效果。
其實(shí)進(jìn)程之間的通信不僅僅只有隊(duì)列這一種方式,感興趣的話還可以通過 管道、信號(hào)量、共享內(nèi)存的方式來實(shí)現(xiàn)??梢宰孕型卣挂幌隆?/p>
python提供了多種進(jìn)程通信的方式,包括信號(hào),管道,消息隊(duì)列,信號(hào)量,共享內(nèi)存,socket等
主要Queue和Pipe這兩種方式,Queue用于多個(gè)進(jìn)程間實(shí)現(xiàn)通信,Pipe是兩個(gè)進(jìn)程的通信。
1.管道:分為匿名管道和命名管道
匿名管道:在內(nèi)核中申請(qǐng)一塊固定大小的緩沖區(qū),程序擁有寫入和讀取的權(quán)利,一般使用fock函數(shù)實(shí)現(xiàn)父子進(jìn)程的通信
命名管道:在內(nèi)存中申請(qǐng)一塊固定大小的緩沖區(qū),程序擁有寫入和讀取的權(quán)利,沒有血緣關(guān)系的進(jìn)程也可以進(jìn)程間通信
特點(diǎn):面向字節(jié)流;生命周期隨內(nèi)核;自帶同步互斥機(jī)制;半雙工,單向通信,兩個(gè)管道實(shí)現(xiàn)雙向通信
2.消息隊(duì)列:在內(nèi)核中創(chuàng)建一個(gè)隊(duì)列,隊(duì)列中每個(gè)元素是一個(gè)數(shù)據(jù)報(bào),不同的進(jìn)程可以通過句柄去訪問這個(gè)隊(duì)列。消息隊(duì)列提供了一個(gè)從一個(gè)進(jìn)程向另外一個(gè)進(jìn)程發(fā)送一塊數(shù)據(jù)的方法。每個(gè)數(shù)據(jù)塊都被認(rèn)為是有一個(gè)類型,接收者進(jìn)程接收的數(shù)據(jù)塊可以有不同的類型。消息隊(duì)列也有管道一樣的不足,就是每個(gè)消息的最大長度是有上限的,每個(gè)消息隊(duì)列的總的字節(jié)數(shù)是有上限的,系統(tǒng)上消息隊(duì)列的總數(shù)也有一個(gè)上限
特點(diǎn):消息隊(duì)列可以被認(rèn)為是一個(gè)全局的一個(gè)鏈表,鏈表節(jié)點(diǎn)中存放著數(shù)據(jù)報(bào)的類型和內(nèi)容,有消息隊(duì)列的標(biāo)識(shí)符進(jìn)行標(biāo)記;消息隊(duì)列允許一個(gè)或多個(gè)進(jìn)程寫入或讀取消息;消息隊(duì)列的生命周期隨內(nèi)核;消息隊(duì)列可實(shí)現(xiàn)雙向通信
3.信號(hào)量:在內(nèi)核中創(chuàng)建一個(gè)信號(hào)量集合(本質(zhì)上是數(shù)組),數(shù)組的元素(信號(hào)量)都是1,使用P操作進(jìn)行-1,使用V操作+1
P(sv):如果sv的值大于零,就給它減1;如果它的值為零,就掛起該程序的執(zhí)行
V(sv):如果有其他進(jìn)程因等待sv而被掛起,就讓它恢復(fù)運(yùn)行,如果沒有進(jìn)程因等待sv而掛起,就給它加1
PV操作用于同一個(gè)進(jìn)程,實(shí)現(xiàn)互斥;PV操作用于不同進(jìn)程,實(shí)現(xiàn)同步
功能:對(duì)臨界資源進(jìn)行保護(hù)
4.共享內(nèi)存:將同一塊物理內(nèi)存一塊映射到不同的進(jìn)程的虛擬地址空間中,實(shí)現(xiàn)不同進(jìn)程間對(duì)同一資源的共享。共享內(nèi)存可以說是最有用的進(jìn)程間通信方式,也是最快的IPC形式
特點(diǎn):不同從用戶態(tài)到內(nèi)核態(tài)的頻繁切換和拷貝數(shù)據(jù),直接從內(nèi)存中讀取就可以;共享內(nèi)存是臨界資源,所以需要操作時(shí)必須要保證原子性。使用信號(hào)量或者互斥鎖都可以.
關(guān)于“Python進(jìn)程間的通信方式是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí),可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會(huì)為大家更新不同的知識(shí)點(diǎn)。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。