溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Python進(jìn)程間的通信方式是什么

發(fā)布時(shí)間:2022-04-11 10:54:15 來源:億速云 閱讀:377 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹“Python進(jìn)程間的通信方式是什么”的相關(guān)知識(shí),小編通過實(shí)際案例向大家展示操作過程,操作方法簡單快捷,實(shí)用性強(qiáng),希望這篇“Python進(jìn)程間的通信方式是什么”文章能幫助大家解決問題。

什么是進(jìn)程的通信

這里舉一個(gè)例子接介紹通信的機(jī)制:通信 一詞大家并不陌生,比如一個(gè)人要給他的女友打電話。當(dāng)建立了通話之后,在這個(gè)通話的過程中就是建立了一條隱形的 隊(duì)列 (記住這個(gè)詞)。此時(shí)這個(gè)人就會(huì)通過對(duì)話的方式不停的將信息告訴女友,而這個(gè)人的女友也是在傾聽著。(嗯…我個(gè)人覺得大部分情況下可能是反著來的)。

這里可以將他們兩個(gè)人比作是兩個(gè)進(jìn)程,"這個(gè)人"的進(jìn)程需要將信息發(fā)送給"女友"的進(jìn)程,就需要一個(gè)隊(duì)列的幫助。而女友需要不停的接收隊(duì)列的信息,可以做一些其他的事情,所以兩個(gè)進(jìn)程之間的通信主要依賴于隊(duì)列。

這個(gè)隊(duì)列可以支持發(fā)送消息與接收消息,“這個(gè)人"負(fù)責(zé)發(fā)送消息,反之"女友” 負(fù)責(zé)的是接收消息。

既然隊(duì)列才是重點(diǎn),那么來看一下隊(duì)列要如何創(chuàng)建。

隊(duì)列的創(chuàng)建 - multiprocessing

依然使用 multiprocessing 模塊,調(diào)用該模塊的 Queue 函數(shù)來實(shí)現(xiàn)隊(duì)列的創(chuàng)建。

函數(shù)名介紹參數(shù)返回值
Queue隊(duì)列的創(chuàng)建mac_count隊(duì)列對(duì)象

Queue 函數(shù)功能介紹:調(diào)用 Queue 可以創(chuàng)建隊(duì)列;它有一個(gè)參數(shù) mac_count 代表隊(duì)列最大可以創(chuàng)建多少信息,如果不傳默認(rèn)是無限長度。實(shí)例化一個(gè)隊(duì)列對(duì)象之后,需要操作這個(gè)隊(duì)列的對(duì)象進(jìn)行放入與取出數(shù)據(jù)。

進(jìn)程之間通信的方法

函數(shù)名介紹參數(shù)返回值
put將消息放入隊(duì)列message
get獲取隊(duì)列消息str

put 函數(shù)功能介紹:將數(shù)據(jù)傳入。它有一個(gè)參數(shù) message ,是一個(gè)字符串類型。

get 函數(shù)功能介紹:用來接收隊(duì)列中的數(shù)據(jù)。(其實(shí)這里就是一個(gè)常用的json場景,有很多的數(shù)據(jù)傳輸都是 字符串 的,隊(duì)列的插入與獲取就是使用的字符串,所以 json 就非常適用這個(gè)場景。)

接下來就來練習(xí)一下 隊(duì)列的使用 。

進(jìn)程間的通信 - 隊(duì)列演示案例

代碼示例如下:

# coding:utf-8


import json
import multiprocessing


class Work(object):     # 定義一個(gè) Work 類
    def __init__(self, queue):      # 構(gòu)造函數(shù)傳入一個(gè) '隊(duì)列對(duì)象' --> queue
            self.queue = queue

    def send(self, message):        # 定義一個(gè) send(發(fā)送) 函數(shù),傳入 message
                                    # [這里有個(gè)隱藏的bug,就是只判斷了傳入的是否字符串類型;如果傳入的是函數(shù)、類、集合等依然會(huì)報(bào)錯(cuò)]
        if not isinstance(message, str):    # 判斷傳入的 message 是否為字符串,若不是,則進(jìn)行 json 序列化
            message = json.dumps(message)
        self.queue.put(message)     # 利用 queue 的隊(duì)列實(shí)例化對(duì)象將 message 發(fā)送出去

    def receive(self):      # 定義一個(gè) receive(接收) 函數(shù),不需傳入?yún)?shù),但是因?yàn)榻邮帐且粋€(gè)源源不斷的過程,所以需要使用 while 循環(huán)
        while 1:
            result = self.queue.get()   # 獲取 '隊(duì)列對(duì)象' --> queue 傳入的message
                                        # 由于我們接收的 message 可能不是一個(gè)字符串,所以要進(jìn)程異常的捕獲
            try:                        # 如果傳入的 message 符合 JSON 格式將賦值給 res ;若不符合,則直接使用 result 賦值 res
                res = json.loads(result)
            except:
                res = result
            print('接收到的信息為:{}'.format(res))


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    work = Work(queue)
    send = multiprocessing.Process(target=work.send, args=({'message': '這是一條測試的消息'},))
    receive = multiprocessing.Process(target=work.receive)

    send.start()
    receive.start()

使用隊(duì)列建立進(jìn)程間通信遇到的異常

但是這里會(huì)出現(xiàn)一個(gè) 報(bào)錯(cuò),如下圖:

報(bào)錯(cuò)截圖示例如下:

Python進(jìn)程間的通信方式是什么

這里的報(bào)錯(cuò)提示是 文件沒有被發(fā)現(xiàn)的意思 。其實(shí)這里是我們使用 隊(duì)列做 put() 和 get()的時(shí)候 有一把無形的鎖加了上去,就是上圖中圈中的 .SemLock 。我們不需要去關(guān)心造成這個(gè)錯(cuò)誤的具體原因,要解決這個(gè)問題其實(shí)也很簡單。

FileNotFoundError: [Errno 2] No such file or directory 異常的解決

我們只需要給 send 或者 receive 其中一個(gè)子進(jìn)程添加 join 阻塞進(jìn)程即可,理論上如此。但是我們的 receive子進(jìn)程是一個(gè) while循環(huán),它會(huì)一直執(zhí)行,所以只需要給 send 子進(jìn)程加上一個(gè) join 即可。

解決示意圖如下:

Python進(jìn)程間的通信方式是什么

PS:雖然解決了報(bào)錯(cuò)問題,但是程序沒有正常退出。

實(shí)際上由于我們的 receive 進(jìn)程是個(gè) while循環(huán),并不知道要處理到什么時(shí)候,沒有辦法立刻終止。所以我們需要在 receive 進(jìn)程 使用 terminate() 函數(shù)終結(jié)接收端。

運(yùn)行結(jié)果如下:

Python進(jìn)程間的通信方式是什么

批量給 send 函數(shù)加入數(shù)據(jù)

新建一個(gè)函數(shù),寫入 for循環(huán) 模擬批量添加要發(fā)送的消息

然后再給這個(gè)模擬批量發(fā)送數(shù)據(jù)的函數(shù)添加一個(gè)線程。

示例代碼如下:

# coding:utf-8


import json
import time
import multiprocessing


class Work(object):     # 定義一個(gè) Work 類
    def __init__(self, queue):      # 構(gòu)造函數(shù)傳入一個(gè) '隊(duì)列對(duì)象' --> queue
            self.queue = queue

    def send(self, message):        # 定義一個(gè) send(發(fā)送) 函數(shù),傳入 message
                                    # [這里有個(gè)隱藏的bug,就是只判斷了傳入的是否字符串類型;如果傳入的是函數(shù)、類、集合等依然會(huì)報(bào)錯(cuò)]
        if not isinstance(message, str):    # 判斷傳入的 message 是否為字符串,若不是,則進(jìn)行 json 序列化
            message = json.dumps(message)
        self.queue.put(message)     # 利用 queue 的隊(duì)列實(shí)例化對(duì)象將 message 發(fā)送出去


    def send_all(self):             # 定義一個(gè) send_all(發(fā)送)函數(shù),然后通過for循環(huán)模擬批量發(fā)送的 message
        for i in range(20):
            self.queue.put('第 {} 次循環(huán),發(fā)送的消息為:{}'.format(i, i))
            time.sleep(1)



    def receive(self):      # 定義一個(gè) receive(接收) 函數(shù),不需傳入?yún)?shù),但是因?yàn)榻邮帐且粋€(gè)源源不斷的過程,所以需要使用 while 循環(huán)
        while 1:
            result = self.queue.get()   # 獲取 '隊(duì)列對(duì)象' --> queue 傳入的message
                                        # 由于我們接收的 message 可能不是一個(gè)字符串,所以要進(jìn)程異常的捕獲
            try:                        # 如果傳入的 message 符合 JSON 格式將賦值給 res ;若不符合,則直接使用 result 賦值 res
                res = json.loads(result)
            except:
                res = result
            print('接收到的信息為:{}'.format(res))


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    work = Work(queue)
    send = multiprocessing.Process(target=work.send, args=({'message': '這是一條測試的消息'},))
    receive = multiprocessing.Process(target=work.receive)
    send_all = multiprocessing.Process(target=work.send_all,)


    send_all.start()    # 這里因?yàn)?nbsp;send 只執(zhí)行了1次,然后就結(jié)束了。而 send_all 卻要循環(huán)20次,它的執(zhí)行時(shí)間是最長的,信息也是發(fā)送的最多的
    send.start()
    receive.start()

    # send.join()       # 使用 send 的阻塞會(huì)造成 send_all 循環(huán)還未結(jié)束 ,receive.terminate() 函數(shù)接收端就會(huì)終結(jié)。
    send_all.join()     # 所以我們只需要阻塞最長使用率的進(jìn)程就可以了
    receive.terminate()

運(yùn)行結(jié)果如下:

Python進(jìn)程間的通信方式是什么

從上圖中我們可以看到 send 與 send_all 兩個(gè)進(jìn)程都可以通過 queue這個(gè)實(shí)例化的 Queue 對(duì)象發(fā)送消息,同樣的 receive接收函數(shù)也會(huì)將兩個(gè)進(jìn)程傳入的 message 打印輸出出來。

小節(jié)

該章節(jié)我們通過隊(duì)列的方式實(shí)現(xiàn)了進(jìn)程間通信的方法,并且了解了隊(duì)列的使用方法。一個(gè)隊(duì)列中,有一端(這里我們演示的是 send端)通過 put方法實(shí)現(xiàn)添加相關(guān)的信息,另一端使用 get 方法獲取相關(guān)的信息;兩個(gè)進(jìn)程相互配合達(dá)到一個(gè)進(jìn)程通信的效果。

其實(shí)進(jìn)程之間的通信不僅僅只有隊(duì)列這一種方式,感興趣的話還可以通過 管道、信號(hào)量、共享內(nèi)存的方式來實(shí)現(xiàn)??梢宰孕型卣挂幌隆?/p>

進(jìn)程間通信的其他方式 - 補(bǔ)充

python提供了多種進(jìn)程通信的方式,包括信號(hào),管道,消息隊(duì)列,信號(hào)量,共享內(nèi)存,socket等

主要Queue和Pipe這兩種方式,Queue用于多個(gè)進(jìn)程間實(shí)現(xiàn)通信,Pipe是兩個(gè)進(jìn)程的通信。

1.管道:分為匿名管道和命名管道

匿名管道:在內(nèi)核中申請(qǐng)一塊固定大小的緩沖區(qū),程序擁有寫入和讀取的權(quán)利,一般使用fock函數(shù)實(shí)現(xiàn)父子進(jìn)程的通信

命名管道:在內(nèi)存中申請(qǐng)一塊固定大小的緩沖區(qū),程序擁有寫入和讀取的權(quán)利,沒有血緣關(guān)系的進(jìn)程也可以進(jìn)程間通信

特點(diǎn):面向字節(jié)流;生命周期隨內(nèi)核;自帶同步互斥機(jī)制;半雙工,單向通信,兩個(gè)管道實(shí)現(xiàn)雙向通信

2.消息隊(duì)列:在內(nèi)核中創(chuàng)建一個(gè)隊(duì)列,隊(duì)列中每個(gè)元素是一個(gè)數(shù)據(jù)報(bào),不同的進(jìn)程可以通過句柄去訪問這個(gè)隊(duì)列。消息隊(duì)列提供了一個(gè)從一個(gè)進(jìn)程向另外一個(gè)進(jìn)程發(fā)送一塊數(shù)據(jù)的方法。每個(gè)數(shù)據(jù)塊都被認(rèn)為是有一個(gè)類型,接收者進(jìn)程接收的數(shù)據(jù)塊可以有不同的類型。消息隊(duì)列也有管道一樣的不足,就是每個(gè)消息的最大長度是有上限的,每個(gè)消息隊(duì)列的總的字節(jié)數(shù)是有上限的,系統(tǒng)上消息隊(duì)列的總數(shù)也有一個(gè)上限

特點(diǎn):消息隊(duì)列可以被認(rèn)為是一個(gè)全局的一個(gè)鏈表,鏈表節(jié)點(diǎn)中存放著數(shù)據(jù)報(bào)的類型和內(nèi)容,有消息隊(duì)列的標(biāo)識(shí)符進(jìn)行標(biāo)記;消息隊(duì)列允許一個(gè)或多個(gè)進(jìn)程寫入或讀取消息;消息隊(duì)列的生命周期隨內(nèi)核;消息隊(duì)列可實(shí)現(xiàn)雙向通信

3.信號(hào)量:在內(nèi)核中創(chuàng)建一個(gè)信號(hào)量集合(本質(zhì)上是數(shù)組),數(shù)組的元素(信號(hào)量)都是1,使用P操作進(jìn)行-1,使用V操作+1

P(sv):如果sv的值大于零,就給它減1;如果它的值為零,就掛起該程序的執(zhí)行

V(sv):如果有其他進(jìn)程因等待sv而被掛起,就讓它恢復(fù)運(yùn)行,如果沒有進(jìn)程因等待sv而掛起,就給它加1

PV操作用于同一個(gè)進(jìn)程,實(shí)現(xiàn)互斥;PV操作用于不同進(jìn)程,實(shí)現(xiàn)同步

功能:對(duì)臨界資源進(jìn)行保護(hù)

4.共享內(nèi)存:將同一塊物理內(nèi)存一塊映射到不同的進(jìn)程的虛擬地址空間中,實(shí)現(xiàn)不同進(jìn)程間對(duì)同一資源的共享。共享內(nèi)存可以說是最有用的進(jìn)程間通信方式,也是最快的IPC形式

特點(diǎn):不同從用戶態(tài)到內(nèi)核態(tài)的頻繁切換和拷貝數(shù)據(jù),直接從內(nèi)存中讀取就可以;共享內(nèi)存是臨界資源,所以需要操作時(shí)必須要保證原子性。使用信號(hào)量或者互斥鎖都可以.

關(guān)于“Python進(jìn)程間的通信方式是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí),可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會(huì)為大家更新不同的知識(shí)點(diǎn)。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI