您好,登錄后才能下訂單哦!
這篇文章主要介紹“基于python如何實現rpc遠程過程調用”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“基于python如何實現rpc遠程過程調用”文章能幫助大家解決問題。
所謂RPC,是遠程過程調用(Remote Procedure Call)的簡寫,網上解釋很多,簡單來說,就是在當前進程調用其他進程的函數時,體驗就像是調用本地寫的函數一般。
本文實現的是在本地調用遠端的類class對象的接口,也就是本地的client不實例化類對象,調用的是server端的類對象接口。
為了達到讓調用層無須關心底層實現,擁有絲滑般的體驗,就需要以下幾個部分:
客戶端需要把類的接口提取出來,并將調用函數事件捕獲存儲起來;服務端需要把類的公有函數作為可遠程調用的接口。
客戶端把調用函數的事件(調用的函數,參數)進行序列化并發(fā)送給服務端;服務端將客戶端的調用事件反序列化,并執(zhí)行相應的接口,將返回值發(fā)送給客戶端。
客戶端與服務端通過某種方式(一般就是網絡socket)進行通信。
在下面時序圖的灰色部分,對于調用方來說是透明的,它的執(zhí)行結果應該和執(zhí)行本地的函數時一致的。
本文采用了基于TCP的sokcet連接來進行進程之間的通信,更多實現細節(jié)可參考之前博客。
在此需要注意:
本文采用了select模塊來監(jiān)聽網絡事件,如果服務端未收到任何的網絡消息會一直阻塞在這兒。如果服務端除了提供rpc調用服務之外還需要執(zhí)行其他邏輯,那么應當采用非阻塞,輪詢socket的方式來判斷是否有新的網絡事件。
# ServerBase.py def process(self): readable, writable, exceptional = select.select(self.inputs, self.outputs, self.conns.values()) for conn in readable: if conn is self.socket: self._handle_conn() else: self._handle_recv(conn) for conn in writable: pass for conn in exceptional: self._handle_leave(conn)
客戶端的網絡事件本文通過創(chuàng)建新的線程來監(jiān)聽的。并不會影響客戶端主線程的執(zhí)行,因此可以盡情的阻塞。部分代碼如下:
# AsynCallback.py class AsyncTaskManager(object): _asy_events = dict() def __init__(self, loop, *args): super(AsyncTaskManager, self).__init__() self._loop_fun = loop def __call__(self, *args, **kwargs): proc = threading.Thread(target=self._exec_loop, args=args, kwargs=kwargs) proc.start() def _exec_loop(self, *args, **kwargs): while True: net_resp = self._loop_fun(*args, **kwargs) for resp in net_resp: asy_event = self._asy_events.pop(resp.rid) asy_event.set()
# Client.py class Client(TaskHandle, ClientBase): @AsyncTaskManager def process(self): super(Client, self).process() _events = [] while self.has_events: event = self.get_next_event() data = event[1] _events.append(self.unpack_respond(data)) return _events
序列化方式,本文采用了庫pickle進行序列化與反序列化,使用它的原因是可以將自定義類對象也進行序列化,非常之高級。
對于需要返回值的函數調用,處理起來比較簡單,只需要將主線程阻塞等待,直至超時或者接收到了對應函數的返回值即可。本文采用了threading.Event來阻塞與喚醒調用的函數,同時采用了裝飾器來實現這功能。若日后有更好的方法,可以輕易進行替換。相關示例代碼如下所示:
@AsyncTaskManager.respond def _handle_response(self, tid): """ 處理有返回值的情況 會阻塞線程直至收到返回值 """ task = self.pop_task(tid) if task.callback: task.callback() return self.pop_respond(tid) @staticmethod def respond(func): @wraps(func) def make_resp(handle, tid): """ 需要注意的是,和裝飾的函數參數含義需一致 """ event = threading.Event() AsyncTaskManager._asy_events[tid] = event event.wait(timeout=TIME_OUT) return func(handle, tid) # 這兒才是真正執(zhí)行_handle_response的地方 return make_resp
在實際的應用過程中,應有這樣的情況,服務端與客戶端都是獨立的應用,通過rpc函數進行通信和交互,而并不是某方為另外一方提供服務,那么此時返回值并不必要,只需要將要做的事通知另一方即可。對于此種情況,可以采用異步回調的方式來告知調用方對應函數執(zhí)行成功了。
在文中依舊采用線程來完成該功能,客戶端調用函數之后創(chuàng)建一個新線程并阻塞住,等待服務端將執(zhí)行結果發(fā)回后再喚醒,如果有回調函數就執(zhí)行。示例代碼如下:
@AsyncTaskManager.callback def _handle_call_back(self, tid): """ 處理有回調函數的調用 callback會等tid事件調用成功之后 才會回調,且不會有返回值 """ task = self.pop_task(tid) if task.callback: task.callback() @staticmethod def callback(func): @wraps(func) def make_thread(event, *args, **kwargs): event.wait(timeout=TIME_OUT) func(*args, **kwargs) def make_async(handle, tid): """ 注意點同上 """ event = threading.Event() AsyncTaskManager._asy_events[tid] = event _task = threading.Thread(target=lambda: make_thread(event, handle, tid)) return make_async
關于“基于python如何實現rpc遠程過程調用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識,可以關注億速云行業(yè)資訊頻道,小編每天都會為大家更新不同的知識點。
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。