溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

基于python如何實現rpc遠程過程調用

發(fā)布時間:2022-06-13 09:18:35 來源:億速云 閱讀:103 作者:iii 欄目:開發(fā)技術

這篇文章主要介紹“基于python如何實現rpc遠程過程調用”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“基于python如何實現rpc遠程過程調用”文章能幫助大家解決問題。

    一、主要內容

    所謂RPC,是遠程過程調用(Remote Procedure Call)的簡寫,網上解釋很多,簡單來說,就是在當前進程調用其他進程的函數時,體驗就像是調用本地寫的函數一般。
    本文實現的是在本地調用遠端的類class對象的接口,也就是本地的client不實例化類對象,調用的是server端的類對象接口。
    為了達到讓調用層無須關心底層實現,擁有絲滑般的體驗,就需要以下幾個部分:

    • 客戶端需要把類的接口提取出來,并將調用函數事件捕獲存儲起來;服務端需要把類的公有函數作為可遠程調用的接口。

    • 客戶端把調用函數的事件(調用的函數,參數)進行序列化并發(fā)送給服務端;服務端將客戶端的調用事件反序列化,并執(zhí)行相應的接口,將返回值發(fā)送給客戶端。

    • 客戶端與服務端通過某種方式(一般就是網絡socket)進行通信。

    在下面時序圖的灰色部分,對于調用方來說是透明的,它的執(zhí)行結果應該和執(zhí)行本地的函數時一致的。

    基于python如何實現rpc遠程過程調用

    二、實現步驟

    1. 進程間的通信

    本文采用了基于TCP的sokcet連接來進行進程之間的通信,更多實現細節(jié)可參考之前博客。
    在此需要注意:

    本文采用了select模塊來監(jiān)聽網絡事件,如果服務端未收到任何的網絡消息會一直阻塞在這兒。如果服務端除了提供rpc調用服務之外還需要執(zhí)行其他邏輯,那么應當采用非阻塞,輪詢socket的方式來判斷是否有新的網絡事件。

    # ServerBase.py
    def process(self):
        readable, writable, exceptional = select.select(self.inputs, self.outputs, self.conns.values())
        for conn in readable:
            if conn is self.socket:
                self._handle_conn()
            else:
                self._handle_recv(conn)
        for conn in writable:
            pass
        for conn in exceptional:
            self._handle_leave(conn)

    客戶端的網絡事件本文通過創(chuàng)建新的線程來監(jiān)聽的。并不會影響客戶端主線程的執(zhí)行,因此可以盡情的阻塞。部分代碼如下:

    # AsynCallback.py
    class AsyncTaskManager(object):
        _asy_events = dict()
    
        def __init__(self, loop, *args):
            super(AsyncTaskManager, self).__init__()
            self._loop_fun = loop
    
        def __call__(self, *args, **kwargs):
            proc = threading.Thread(target=self._exec_loop, args=args, kwargs=kwargs)
            proc.start()
    
        def _exec_loop(self, *args, **kwargs):
            while True:
                net_resp = self._loop_fun(*args, **kwargs)
                for resp in net_resp:
                    asy_event = self._asy_events.pop(resp.rid)
                    asy_event.set()
    # Client.py
    class Client(TaskHandle, ClientBase):
    
        @AsyncTaskManager
        def process(self):
            super(Client, self).process()
            _events = []
            while self.has_events:
                event = self.get_next_event()
                data = event[1]
                _events.append(self.unpack_respond(data))
            return _events

    序列化方式,本文采用了庫pickle進行序列化與反序列化,使用它的原因是可以將自定義類對象也進行序列化,非常之高級。

    2. 異步回調實現思路

    對于需要返回值的函數調用,處理起來比較簡單,只需要將主線程阻塞等待,直至超時或者接收到了對應函數的返回值即可。本文采用了threading.Event來阻塞與喚醒調用的函數,同時采用了裝飾器來實現這功能。若日后有更好的方法,可以輕易進行替換。相關示例代碼如下所示:

    @AsyncTaskManager.respond
    def _handle_response(self, tid):
        """ 處理有返回值的情況
        會阻塞線程直至收到返回值
        """
        task = self.pop_task(tid)
        if task.callback:
            task.callback()
        return self.pop_respond(tid)
    
    @staticmethod
    def respond(func):
        @wraps(func)
        def make_resp(handle, tid):
            """ 需要注意的是,和裝飾的函數參數含義需一致 """
            event = threading.Event()
            AsyncTaskManager._asy_events[tid] = event
            event.wait(timeout=TIME_OUT)
            return func(handle, tid)    # 這兒才是真正執(zhí)行_handle_response的地方
        return make_resp

    在實際的應用過程中,應有這樣的情況,服務端與客戶端都是獨立的應用,通過rpc函數進行通信和交互,而并不是某方為另外一方提供服務,那么此時返回值并不必要,只需要將要做的事通知另一方即可。對于此種情況,可以采用異步回調的方式來告知調用方對應函數執(zhí)行成功了。

    在文中依舊采用線程來完成該功能,客戶端調用函數之后創(chuàng)建一個新線程并阻塞住,等待服務端將執(zhí)行結果發(fā)回后再喚醒,如果有回調函數就執(zhí)行。示例代碼如下:

    @AsyncTaskManager.callback
    def _handle_call_back(self, tid):
        """ 處理有回調函數的調用
        callback會等tid事件調用成功之后 才會回調,且不會有返回值
        """
        task = self.pop_task(tid)
        if task.callback:
            task.callback()
            
    @staticmethod
    def callback(func):
        @wraps(func)
        def make_thread(event, *args, **kwargs):
            event.wait(timeout=TIME_OUT)
            func(*args, **kwargs)
    
        def make_async(handle, tid):
            """ 注意點同上 """
            event = threading.Event()
            AsyncTaskManager._asy_events[tid] = event
            _task = threading.Thread(target=lambda: make_thread(event, handle, tid))
    
        return make_async

    關于“基于python如何實現rpc遠程過程調用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識,可以關注億速云行業(yè)資訊頻道,小編每天都會為大家更新不同的知識點。

    向AI問一下細節(jié)

    免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI