您好,登錄后才能下訂單哦!
同樣在services.py 中自定義需要實(shí)現(xiàn)消息協(xié)議、傳輸控制,并且實(shí)現(xiàn)客戶(hù)端存根clientStub和服務(wù)器端存根serverStub,服務(wù)器定義以及channel的定義。
此時(shí),添加DistributedChannel分布式的channel,并在ThreadServer中添加了registry方法。
```
import threading import random import struct import time from io import BytesIO import socket import json from kazoo.client import KazooClient class InvalidOperation(BaseException): def __init__(self, message = None): self.message = message or 'involid operation' class MethodProtocol(object): '''' 解讀方法名 ''' def __init__(self, connection): self.conn = connection def _read_all(self, size): """ 幫助我們讀取二進(jìn)制數(shù)據(jù) :param size: 想要讀取的二進(jìn)制數(shù)據(jù)大小 :return: 二進(jìn)制數(shù)據(jù)bytes """ # self.conn if isinstance(self.conn, BytesIO): buff = self.conn.read(size) return buff else: # 有時(shí)候長(zhǎng)度大于每次讀取的長(zhǎng)度 have = 0 buff = b'' while have < size: chunk = self.conn.recv(size - have) buff += chunk l = len(chunk) have += l if l == 0: # 表示客戶(hù)端已經(jīng)關(guān)閉了 raise EOFError return buff def get_method_name(self): # 讀取字符串長(zhǎng)度 buff = self._read_all(4) length = struct.unpack('!I',buff)[0] # 讀取字符串 buff = self._read_all(length) name = buff.decode() return name class DivideProtocol(object): """ divide過(guò)程消息協(xié)議轉(zhuǎn)換工具 """ def args_encode(self, num1, num2=1): """ 將原始調(diào)用的請(qǐng)求參數(shù)轉(zhuǎn)換打包成二進(jìn)制消息數(shù)據(jù) :param num1: int :param num2: int :return: bytes 二進(jìn)制消息數(shù)據(jù) """ name = 'divide' # 處理函數(shù)名 buff = struct.pack('!I', 6) # 無(wú)符號(hào)int buff += name.encode() # 處理參數(shù)1 buff2 = struct.pack('!B', 1) # 無(wú)符號(hào)byte buff2 += struct.pack('!i', num1) # 處理參數(shù)2 if num2 != 1: # 沒(méi)有傳參的時(shí)候 buff2 += struct.pack('!B', 2) buff2 += struct.pack('!i', num2) # 處理參數(shù)邊界和組合成完整數(shù)據(jù) buff += struct.pack('!I',len(buff2)) buff += buff2 return buff def _read_all(self, size): """ 幫助我們讀取二進(jìn)制數(shù)據(jù) :param size: 想要讀取的二進(jìn)制數(shù)據(jù)大小 :return: 二進(jìn)制數(shù)據(jù)bytes """ # self.conn if isinstance(self.conn, BytesIO): buff = self.conn.read(size) return buff else: # 有時(shí)候長(zhǎng)度大于每次讀取的長(zhǎng)度 have = 0 buff = b'' while have < size: chunk = self.conn.recv(size - have) buff += chunk l = len(chunk) have += l if l == 0: # 表示客戶(hù)端已經(jīng)關(guān)閉了 raise EOFError return buff def args_decode(self, connection): """ 接受調(diào)用請(qǐng)求數(shù)據(jù)病進(jìn)行解析 :param connection: 鏈接請(qǐng)求數(shù)據(jù) socket BytesIO :return: 因?yàn)橛卸鄠€(gè)參數(shù),定義為字典 """ param_len_map = { 1:4, 2:4, } param_fmt_map = { 1:'!i', 2:'!i', } param_name_map = { 1: 'num1', 2: 'num2', } # 保存用來(lái)返回的參數(shù)字典 args = {} self.conn = connection # 處理方法的名字,已經(jīng)提前被處理,稍后處理 # 處理消息邊界 # 1) 讀取二進(jìn)制數(shù)據(jù)----read , ------ByteIO.read # 2) 將二進(jìn)制數(shù)據(jù)轉(zhuǎn)換為python的數(shù)據(jù)類(lèi)型 buff = self._read_all(4) length = struct.unpack('!I',buff)[0] # 記錄已經(jīng)讀取的長(zhǎng)度值 have = 0 # 處理第一個(gè)參數(shù) # 解析參數(shù)序號(hào) buff = self._read_all(1) have += 1 param_seq = struct.unpack('!B', buff)[0] # 解析參數(shù)值 param_len = param_len_map[param_seq] buff = self._read_all(param_len) have += param_len param_fmt = param_fmt_map[param_seq] param = struct.unpack(param_fmt,buff)[0] # 設(shè)置解析后的字典 param_name = param_name_map[param_seq] args[param_name] = param if have >= length: return args # 處理第二個(gè)參數(shù) # 解析參數(shù)序號(hào) buff = self._read_all(1) param_seq = struct.unpack('!B', buff)[0] # 解析參數(shù)值 param_len = param_len_map[param_seq] buff = self._read_all(param_len) param_fmt = param_fmt_map[param_seq] param = struct.unpack(param_fmt, buff)[0] # 設(shè)置解析后的字典 param_name = param_name_map[param_seq] args[param_name] = param return args def result_encode(self, result): """ 將原始結(jié)果數(shù)據(jù)轉(zhuǎn)換為消息協(xié)議二進(jìn)制數(shù)據(jù) :param result: :return: """ if isinstance(result,float): # 處理返回值類(lèi)型 buff = struct.pack('!B', 1) buff += struct.pack('!f', result) return buff else: buff = struct.pack('!B', 2) # 處理返回值 length = len(result.message) # 處理字符串長(zhǎng)度 buff += struct.pack('!I', length) buff += result.message.encode() return buff def result_decode(self, connection): """ 將返回值消息數(shù)據(jù)轉(zhuǎn)換為原始返回值 :param connection: socket BytesIo :return: float InvalidOperation對(duì)象 """ self.conn = connection # 處理返回值類(lèi)型 buff = self._read_all(1) result_type = struct.unpack('!B', buff)[0] if result_type == 1: #正常情況 buff = self._read_all(4) val = struct.unpack('!f', buff)[0] return val else: buff = self._read_all(4) length = struct.unpack('!I', buff)[0] # 讀取字符串 buff = self._read_all(length) message = buff.decode() return InvalidOperation(message) class Channel(object): """ 用于客戶(hù)端建立網(wǎng)絡(luò)鏈接 """ def __init__(self, host, port): self.host = host self.port = port def get_connection(self): """ 獲取鏈接對(duì)象 :return: 與服務(wù)器通訊的socket """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((self.host, self.port)) return sock class DistributedChannel(object): """ 支持分布式的zookeeper的RPC客戶(hù)端鏈接工具 """ def __init__(self): # 創(chuàng)建kazoo對(duì)象,用來(lái)跟zookeeper鏈接,獲取信息 zk = KazooClient('127.0.0.1:2181') zk.start() self.zk = zk self._servers = [] self._get_servers() # 第一次,手動(dòng)開(kāi)啟 def _get_servers(self, event=None): """ 從zookeeper中獲取所有可用的RPC服務(wù)器的地址 :return: """ self._servers = [] # 每次重新獲取地址信息 # 從zookeeper中獲取/rpc節(jié)點(diǎn)下的所有可用的rpc服務(wù)器節(jié)點(diǎn) servers = self.zk.get_children('/rpc', watch=self._get_servers) # 監(jiān)視的回調(diào)函數(shù)為自身 for server in servers: addr_data = self.zk.get('/rpc/' + server)[0] addr = json.loads(addr_data.decode()) self._servers.append(addr) def _get_server(self): """ 從可用的服務(wù)器列表中選出一臺(tái)服務(wù)器 :return: {"host":xxx,"port":xxx} """ return random.choice(self._servers) def get_connection(self): """ 提供一個(gè)具體的與RPC服務(wù)器的鏈接socket :return: """ while True: addr = self._get_server() print(addr) try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((addr['host'], addr['port'])) except ConnectionRefusedError: time.sleep(1) continue else: return sock class ThreadServer(object): """ 多線(xiàn)成RPC服務(wù)器 """ def __init__(self, host, port, handlers): sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) # 地址復(fù)用 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.host = host self.port = port # 綁定地址 sock.bind((self.host, self.port)) # 因?yàn)樵趩?dòng)的方法中才開(kāi)啟監(jiān)聽(tīng),所以不在此處開(kāi)啟 # sock.listen(128) self.sock = sock self.handlers = handlers def serve(self): """ 開(kāi)啟服務(wù)器運(yùn)行,提供RPC服務(wù) :return: """ # 開(kāi)啟服務(wù)器的監(jiān)聽(tīng),等待客戶(hù)端的鏈接請(qǐng)求 self.sock.listen(128) print("服務(wù)器開(kāi)啟監(jiān)聽(tīng),ip地址為%s,port為%d..." % (self.host,self.port)) # 注冊(cè)到zookeeper self.register_zookeeper() while True: # 不斷的接收客戶(hù)端的鏈接請(qǐng)求 client_sock, client_addr = self.sock.accept() print("與客戶(hù)端%s建立連接" % str(client_addr)) t = threading.Thread(target= self.handle, args=(client_sock,)) t.start() def register_zookeeper(self): """ 在zookeeper中心注冊(cè)本服務(wù)器的地址信息 :return: """ # 創(chuàng)建kazoo的客戶(hù)端 zk = KazooClient('127.0.0.1:2181') # 建立與zookeeper的鏈接 zk.start() # 在zookeeper中創(chuàng)建節(jié)點(diǎn)保存數(shù)據(jù) zk.ensure_path('/rpc') data = json.dumps({'host':self.host,'port':self.port}) zk.create('/rpc/server', data.encode(), ephemeral=True, sequence=True) # 子線(xiàn)程函數(shù) def handle(self,client_sock): """ 子線(xiàn)程調(diào)用的方法,用來(lái)處理一個(gè)客戶(hù)段的請(qǐng)求 :return: """ # 交個(gè)ServerStub,完成客戶(hù)端的具體的RPC的調(diào)用請(qǐng)求 stub = ServerStub(client_sock, self.handlers) try: while True: # 不斷的接收 stub.process() except EOFError: # 表示客戶(hù)端關(guān)閉了連接 print('客戶(hù)端關(guān)閉了連接') client_sock.close() class Server(object): """ RPC服務(wù)器 """ def __init__(self, host, port, handlers): sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) # 地址復(fù)用 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.host = host self.port = port # 綁定地址 sock.bind((self.host, self.port)) # 因?yàn)樵趩?dòng)的方法中才開(kāi)啟監(jiān)聽(tīng),所以不在此處開(kāi)啟 # sock.listen(128) self.sock = sock self.handlers = handlers def serve(self): """ 開(kāi)啟服務(wù)器運(yùn)行,提供RPC服務(wù) :return: """ # 開(kāi)啟服務(wù)器的監(jiān)聽(tīng),等待客戶(hù)端的鏈接請(qǐng)求 self.sock.listen(128) print("服務(wù)器開(kāi)啟監(jiān)聽(tīng),ip地址為%s,port為%d..." % (self.host,self.port)) while True: # 不斷的接收客戶(hù)端的鏈接請(qǐng)求 client_sock, client_addr = self.sock.accept() print("與客戶(hù)端%s建立連接" % str(client_addr)) # 交個(gè)ServerStub,完成客戶(hù)端的具體的RPC的調(diào)用請(qǐng)求 stub = ServerStub(client_sock, self.handlers) try: while True: # 不斷的接收 stub.process() except EOFError: # 表示客戶(hù)端關(guān)閉了連接 print('客戶(hù)端關(guān)閉了連接') client_sock.close() class ClientStub(object): """ 用來(lái)幫助客戶(hù)端完成遠(yuǎn)程過(guò)程調(diào)用 RPC調(diào)用 stub = ClientStub() stub.divide(200, 100) """ def __init__(self, channel): self.channel = channel self.conn = self.channel.get_connection() def divide(self, num1, num2 = 1): # 將調(diào)用的參數(shù)打包成消息協(xié)議的數(shù)據(jù) proto = DivideProtocol() args = proto.args_encode(num1, num2) # 將消息數(shù)據(jù)通過(guò)網(wǎng)絡(luò)發(fā)送給服務(wù)器 self.conn.sendall(args) # 接受服務(wù)器返回的消息數(shù)據(jù),并進(jìn)行解析 result = proto.result_decode(self.conn) # 將結(jié)果之(正常float 或 異常InvalidOperation)返回給客戶(hù)端 if isinstance(result,float): return result else: raise result class ServerStub(object): """ 服務(wù)端存根 幫助服務(wù)端完成遠(yuǎn)端過(guò)程調(diào)用 """ def __init__(self, connection, handlers): """ :param connection: 與客戶(hù)端的鏈接 :param handlers: 真正的本地函數(shù)路由 此處不以map的形式處理,實(shí)現(xiàn)類(lèi)的形式 class Handler: @staticmethod def divide(): pass @staticmethod def add(): pass """ self.conn = connection self.method_proto = MethodProtocol(self.conn) self.process_map = { 'divide': self._process_divide, 'add': self._process_add } self.handlers = handlers def process(self): """ 當(dāng)服務(wù)端接受了客戶(hù)的鏈接,建立好鏈接后,完成遠(yuǎn)端調(diào)用的處理 :return: """ # 接收消息數(shù)據(jù),并解析方法的名字 name = self.method_proto.get_method_name() # 根據(jù)解析獲得的方法名,調(diào)用相應(yīng)的過(guò)程協(xié)議,接收并解析消息數(shù)據(jù) self.process_map[name]() def _process_divide(self): """ 處理除法過(guò)程調(diào)用 :return: """ proto = DivideProtocol() args = proto.args_decode(self.conn) # args = {'num1':xxx, 'num2':xxx} # 除法過(guò)程的本地調(diào)用------------------->>>>>>>>> # 將本地調(diào)用過(guò)程的返回值(包括可能的異常)打包成消息協(xié)議的數(shù)據(jù),通過(guò)網(wǎng)絡(luò)返回給客戶(hù)端 try: val = self.handlers.divide(**args) except InvalidOperation as e: ret_message = proto.result_encode(e) else: ret_message = proto.result_encode(val) self.conn.sendall(ret_message) def _process_add(self): """ 處理加法過(guò)程調(diào)用 此方法暫時(shí)不識(shí)閑 :return: """ pass if __name__ == '__main__': # 目的:消息協(xié)議測(cè)試,模擬網(wǎng)絡(luò)傳輸 # 構(gòu)造消息數(shù)據(jù) proto = DivideProtocol() # 測(cè)試一 # divide(200,100) # message = proto.args_encode(200,100) # 測(cè)試二 message = proto.args_encode(200) conn = BytesIO() conn.write(message) conn.seek(0) # 解析消息數(shù)據(jù) method_proto = MethodProtocal(conn) name = method_proto.get_method_name() print(name) args = proto.args_decode(conn) print(args)
```
接下來(lái),修改server.py文件
server.py
```
from services import InvalidOperation # from services import Server from services import ThreadServer import sys class Handlers: @staticmethod def divide(num1, num2 = 1): if num2 == 0: raise InvalidOperation('ck_god_err') val = num1/num2 return val if __name__ == '__main__': # 開(kāi)啟服務(wù)器 # _server = Server('127.0.0.1', 8000, Handlers) # _server.serve() # 從啟動(dòng)命令中提取服務(wù)器運(yùn)行的ip地址和端口號(hào),啟動(dòng)的多線(xiàn)程服務(wù)器 host = sys.argv[1] port = int(sys.argv[2]) _server = ThreadServer(host, port, Handlers) _server.serve()
```
最后,將 client.py文件也稍作修改。
```
import time from services import ClientStub # from services import Channel from services import DistributedChannel from services import InvalidOperation # 創(chuàng)建與服務(wù)器的連接 # channel = Channel('127.0.0.1', 8000) channel = DistributedChannel() # 進(jìn)行調(diào)用 for i in range(50): try: # 創(chuàng)建用于rpc調(diào)用的工具 stub = ClientStub(channel) # 初始化的時(shí)候才真正的創(chuàng)建連接了,所以放到里面 val = stub.divide(i * 100,100) # val = stub.divide(i * 100) # val = stub.divide( 100, 0) except InvalidOperation as e: print(e.message) else: print(val) time.sleep(1)
```
ctrl + shift + T在pycharm中打開(kāi)多個(gè)Terminal窗口
右鍵運(yùn)行客戶(hù)端,可以看到不斷地隨機(jī)切換服務(wù)器。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。