溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

分布式RPC服務(wù)器(容災(zāi)和服務(wù)器識(shí)別機(jī)制的實(shí)現(xiàn),借助zookeeper)最終完整代碼

發(fā)布時(shí)間:2020-08-11 03:57:09 來(lái)源:網(wǎng)絡(luò) 閱讀:661 作者:ck_god 欄目:編程語(yǔ)言

同樣在services.py 中自定義需要實(shí)現(xiàn)消息協(xié)議、傳輸控制,并且實(shí)現(xiàn)客戶(hù)端存根clientStub和服務(wù)器端存根serverStub,服務(wù)器定義以及channel的定義。

此時(shí),添加DistributedChannel分布式的channel,并在ThreadServer中添加了registry方法。

```

import threading
import random
import struct
import time
from io import BytesIO
import socket
import json
from kazoo.client import KazooClient
class InvalidOperation(BaseException):
    def __init__(self, message = None):
        self.message = message or 'involid operation'
class MethodProtocol(object):
    ''''
    解讀方法名
    '''
    def __init__(self, connection):
        self.conn = connection
    def _read_all(self, size):
        """
        幫助我們讀取二進(jìn)制數(shù)據(jù)
        :param  size: 想要讀取的二進(jìn)制數(shù)據(jù)大小
        :return:  二進(jìn)制數(shù)據(jù)bytes
        """
        # self.conn
        if isinstance(self.conn, BytesIO):
            buff = self.conn.read(size)
            return buff
        else:
            # 有時(shí)候長(zhǎng)度大于每次讀取的長(zhǎng)度
            have = 0
            buff = b''
            while have < size:
                chunk = self.conn.recv(size - have)
                buff += chunk
                l = len(chunk)
                have += l
                if l == 0:
                    # 表示客戶(hù)端已經(jīng)關(guān)閉了
                    raise EOFError
            return buff
    def get_method_name(self):
        # 讀取字符串長(zhǎng)度
        buff = self._read_all(4)
        length = struct.unpack('!I',buff)[0]
        # 讀取字符串
        buff = self._read_all(length)
        name = buff.decode()
        return name
class DivideProtocol(object):
    """
    divide過(guò)程消息協(xié)議轉(zhuǎn)換工具
    """
    def args_encode(self, num1, num2=1):
        """
        將原始調(diào)用的請(qǐng)求參數(shù)轉(zhuǎn)換打包成二進(jìn)制消息數(shù)據(jù)
        :param num1: int
        :param num2: int
        :return: bytes 二進(jìn)制消息數(shù)據(jù)
        """
        name = 'divide'
        # 處理函數(shù)名
        buff = struct.pack('!I', 6) # 無(wú)符號(hào)int
        buff += name.encode()
        # 處理參數(shù)1
        buff2 = struct.pack('!B', 1) # 無(wú)符號(hào)byte
        buff2 += struct.pack('!i', num1)
        # 處理參數(shù)2
        if num2 != 1:
            # 沒(méi)有傳參的時(shí)候
            buff2 += struct.pack('!B', 2)
            buff2 += struct.pack('!i', num2)
        # 處理參數(shù)邊界和組合成完整數(shù)據(jù)
        buff += struct.pack('!I',len(buff2))
        buff += buff2
        return buff
    def _read_all(self, size):
        """
        幫助我們讀取二進(jìn)制數(shù)據(jù)
        :param  size: 想要讀取的二進(jìn)制數(shù)據(jù)大小
        :return:  二進(jìn)制數(shù)據(jù)bytes
        """
        # self.conn
        if isinstance(self.conn, BytesIO):
            buff = self.conn.read(size)
            return buff
        else:
            # 有時(shí)候長(zhǎng)度大于每次讀取的長(zhǎng)度
            have = 0
            buff = b''
            while have < size:
                chunk = self.conn.recv(size - have)
                buff += chunk
                l = len(chunk)
                have +=  l
                if l == 0:
                    # 表示客戶(hù)端已經(jīng)關(guān)閉了
                    raise EOFError
            return buff
    def args_decode(self, connection):
        """
        接受調(diào)用請(qǐng)求數(shù)據(jù)病進(jìn)行解析
        :param connection: 鏈接請(qǐng)求數(shù)據(jù) socket  BytesIO
        :return: 因?yàn)橛卸鄠€(gè)參數(shù),定義為字典
        """
        param_len_map = {
            1:4,
            2:4,
        }
        param_fmt_map = {
            1:'!i',
            2:'!i',
        }
        param_name_map = {
            1: 'num1',
            2: 'num2',
        }
        # 保存用來(lái)返回的參數(shù)字典
        args = {}
        self.conn = connection
        # 處理方法的名字,已經(jīng)提前被處理,稍后處理
        # 處理消息邊界
        # 1) 讀取二進(jìn)制數(shù)據(jù)----read  , ------ByteIO.read
        # 2) 將二進(jìn)制數(shù)據(jù)轉(zhuǎn)換為python的數(shù)據(jù)類(lèi)型
        buff = self._read_all(4)
        length = struct.unpack('!I',buff)[0]
        # 記錄已經(jīng)讀取的長(zhǎng)度值
        have = 0
        # 處理第一個(gè)參數(shù)
        # 解析參數(shù)序號(hào)
        buff = self._read_all(1)
        have += 1
        param_seq = struct.unpack('!B', buff)[0]
        # 解析參數(shù)值
        param_len = param_len_map[param_seq]
        buff = self._read_all(param_len)
        have += param_len
        param_fmt = param_fmt_map[param_seq]
        param = struct.unpack(param_fmt,buff)[0]
        # 設(shè)置解析后的字典
        param_name = param_name_map[param_seq]
        args[param_name] = param
        if have >= length:
            return args
        # 處理第二個(gè)參數(shù)
        # 解析參數(shù)序號(hào)
        buff = self._read_all(1)
        param_seq = struct.unpack('!B', buff)[0]
        # 解析參數(shù)值
        param_len = param_len_map[param_seq]
        buff = self._read_all(param_len)
        param_fmt = param_fmt_map[param_seq]
        param = struct.unpack(param_fmt, buff)[0]
        # 設(shè)置解析后的字典
        param_name = param_name_map[param_seq]
        args[param_name] = param
        return args
    def result_encode(self, result):
        """
        將原始結(jié)果數(shù)據(jù)轉(zhuǎn)換為消息協(xié)議二進(jìn)制數(shù)據(jù)
        :param result:
        :return:
        """
        if  isinstance(result,float):
            # 處理返回值類(lèi)型
            buff = struct.pack('!B', 1)
            buff += struct.pack('!f', result)
            return buff
        else:
            buff = struct.pack('!B', 2)
            # 處理返回值
            length = len(result.message)
            # 處理字符串長(zhǎng)度
            buff += struct.pack('!I', length)
            buff += result.message.encode()
            return buff
    def result_decode(self, connection):
        """
        將返回值消息數(shù)據(jù)轉(zhuǎn)換為原始返回值
        :param connection: socket BytesIo
        :return: float InvalidOperation對(duì)象
        """
        self.conn = connection
        # 處理返回值類(lèi)型
        buff = self._read_all(1)
        result_type = struct.unpack('!B', buff)[0]
        if result_type == 1:
            #正常情況
            buff = self._read_all(4)
            val = struct.unpack('!f', buff)[0]
            return val
        else:
            buff = self._read_all(4)
            length = struct.unpack('!I', buff)[0]
            # 讀取字符串
            buff = self._read_all(length)
            message = buff.decode()
            return InvalidOperation(message)
class Channel(object):
    """
    用于客戶(hù)端建立網(wǎng)絡(luò)鏈接
    """
    def __init__(self, host, port):
        self.host = host
        self.port = port
    def get_connection(self):
        """
        獲取鏈接對(duì)象
        :return: 與服務(wù)器通訊的socket
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((self.host, self.port))
        return sock
class DistributedChannel(object):
    """
    支持分布式的zookeeper的RPC客戶(hù)端鏈接工具
    """
    def __init__(self):
        # 創(chuàng)建kazoo對(duì)象,用來(lái)跟zookeeper鏈接,獲取信息
        zk = KazooClient('127.0.0.1:2181')
        zk.start()
        self.zk = zk
        self._servers = []
        self._get_servers() # 第一次,手動(dòng)開(kāi)啟
    def _get_servers(self, event=None):
        """
        從zookeeper中獲取所有可用的RPC服務(wù)器的地址
        :return:
        """
        self._servers = [] # 每次重新獲取地址信息
        # 從zookeeper中獲取/rpc節(jié)點(diǎn)下的所有可用的rpc服務(wù)器節(jié)點(diǎn)
        servers = self.zk.get_children('/rpc', watch=self._get_servers) # 監(jiān)視的回調(diào)函數(shù)為自身
        for server in servers:
            addr_data = self.zk.get('/rpc/' + server)[0]
            addr = json.loads(addr_data.decode())
            self._servers.append(addr)
    def _get_server(self):
        """
        從可用的服務(wù)器列表中選出一臺(tái)服務(wù)器
        :return: {"host":xxx,"port":xxx}
        """
        return random.choice(self._servers)
    def get_connection(self):
        """
        提供一個(gè)具體的與RPC服務(wù)器的鏈接socket
        :return:
        """
        while True:
            addr = self._get_server()
            print(addr)
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect((addr['host'], addr['port']))
            except ConnectionRefusedError:
                time.sleep(1)
                continue
            else:
                return sock
class ThreadServer(object):
    """
    多線(xiàn)成RPC服務(wù)器
    """
    def __init__(self, host, port, handlers):
        sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        # 地址復(fù)用
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.host = host
        self.port = port
        # 綁定地址
        sock.bind((self.host, self.port))
        #  因?yàn)樵趩?dòng)的方法中才開(kāi)啟監(jiān)聽(tīng),所以不在此處開(kāi)啟
        # sock.listen(128)
        self.sock = sock
        self.handlers = handlers
    def serve(self):
        """
        開(kāi)啟服務(wù)器運(yùn)行,提供RPC服務(wù)
        :return:
        """
        # 開(kāi)啟服務(wù)器的監(jiān)聽(tīng),等待客戶(hù)端的鏈接請(qǐng)求
        self.sock.listen(128)
        print("服務(wù)器開(kāi)啟監(jiān)聽(tīng),ip地址為%s,port為%d..." % (self.host,self.port))
        # 注冊(cè)到zookeeper
        self.register_zookeeper()
        while True:
            # 不斷的接收客戶(hù)端的鏈接請(qǐng)求
            client_sock, client_addr = self.sock.accept()
            print("與客戶(hù)端%s建立連接" % str(client_addr))
            t = threading.Thread(target= self.handle, args=(client_sock,))
            t.start()
    def  register_zookeeper(self):
        """
        在zookeeper中心注冊(cè)本服務(wù)器的地址信息
        :return:
        """
        # 創(chuàng)建kazoo的客戶(hù)端
        zk = KazooClient('127.0.0.1:2181')
        # 建立與zookeeper的鏈接
        zk.start()
        # 在zookeeper中創(chuàng)建節(jié)點(diǎn)保存數(shù)據(jù)
        zk.ensure_path('/rpc')
        data = json.dumps({'host':self.host,'port':self.port})
        zk.create('/rpc/server', data.encode(), ephemeral=True, sequence=True)
    # 子線(xiàn)程函數(shù)
    def handle(self,client_sock):
        """
        子線(xiàn)程調(diào)用的方法,用來(lái)處理一個(gè)客戶(hù)段的請(qǐng)求
        :return:
        """
        # 交個(gè)ServerStub,完成客戶(hù)端的具體的RPC的調(diào)用請(qǐng)求
        stub = ServerStub(client_sock, self.handlers)
        try:
            while True:
                # 不斷的接收
                stub.process()
        except EOFError:
            # 表示客戶(hù)端關(guān)閉了連接
            print('客戶(hù)端關(guān)閉了連接')
            client_sock.close()
class Server(object):
    """
    RPC服務(wù)器
    """
    def __init__(self, host, port, handlers):
        sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        # 地址復(fù)用
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.host = host
        self.port = port
        # 綁定地址
        sock.bind((self.host, self.port))
        #  因?yàn)樵趩?dòng)的方法中才開(kāi)啟監(jiān)聽(tīng),所以不在此處開(kāi)啟
        # sock.listen(128)
        self.sock = sock
        self.handlers = handlers
    def serve(self):
        """
        開(kāi)啟服務(wù)器運(yùn)行,提供RPC服務(wù)
        :return:
        """
        # 開(kāi)啟服務(wù)器的監(jiān)聽(tīng),等待客戶(hù)端的鏈接請(qǐng)求
        self.sock.listen(128)
        print("服務(wù)器開(kāi)啟監(jiān)聽(tīng),ip地址為%s,port為%d..." % (self.host,self.port))
        while True:
            # 不斷的接收客戶(hù)端的鏈接請(qǐng)求
            client_sock, client_addr = self.sock.accept()
            print("與客戶(hù)端%s建立連接" % str(client_addr))
            # 交個(gè)ServerStub,完成客戶(hù)端的具體的RPC的調(diào)用請(qǐng)求
            stub = ServerStub(client_sock, self.handlers)
            try:
                while True:
                    # 不斷的接收
                    stub.process()
            except EOFError:
                # 表示客戶(hù)端關(guān)閉了連接
                print('客戶(hù)端關(guān)閉了連接')
                client_sock.close()
class ClientStub(object):
    """
    用來(lái)幫助客戶(hù)端完成遠(yuǎn)程過(guò)程調(diào)用 RPC調(diào)用
    stub = ClientStub()
    stub.divide(200, 100)
    """
    def __init__(self, channel):
        self.channel = channel
        self.conn = self.channel.get_connection()
    def divide(self, num1, num2 = 1):
        # 將調(diào)用的參數(shù)打包成消息協(xié)議的數(shù)據(jù)
        proto = DivideProtocol()
        args = proto.args_encode(num1, num2)
        # 將消息數(shù)據(jù)通過(guò)網(wǎng)絡(luò)發(fā)送給服務(wù)器
        self.conn.sendall(args)
        # 接受服務(wù)器返回的消息數(shù)據(jù),并進(jìn)行解析
        result = proto.result_decode(self.conn)
        # 將結(jié)果之(正常float 或 異常InvalidOperation)返回給客戶(hù)端
        if isinstance(result,float):
            return result
        else:
            raise result
class ServerStub(object):
    """
    服務(wù)端存根
    幫助服務(wù)端完成遠(yuǎn)端過(guò)程調(diào)用
    """
    def __init__(self, connection, handlers):
        """
        :param connection: 與客戶(hù)端的鏈接
        :param handlers: 真正的本地函數(shù)路由
        此處不以map的形式處理,實(shí)現(xiàn)類(lèi)的形式
        class Handler:
            @staticmethod
            def divide():
                pass
            @staticmethod
            def add():
                pass
        """
        self.conn = connection
        self.method_proto = MethodProtocol(self.conn)
        self.process_map = {
            'divide': self._process_divide,
            'add': self._process_add
        }
        self.handlers = handlers
    def process(self):
        """
        當(dāng)服務(wù)端接受了客戶(hù)的鏈接,建立好鏈接后,完成遠(yuǎn)端調(diào)用的處理
        :return:
        """
        # 接收消息數(shù)據(jù),并解析方法的名字
        name = self.method_proto.get_method_name()
        # 根據(jù)解析獲得的方法名,調(diào)用相應(yīng)的過(guò)程協(xié)議,接收并解析消息數(shù)據(jù)
        self.process_map[name]()
    def _process_divide(self):
        """
        處理除法過(guò)程調(diào)用
        :return:
        """
        proto = DivideProtocol()
        args = proto.args_decode(self.conn)
        # args = {'num1':xxx, 'num2':xxx}
        # 除法過(guò)程的本地調(diào)用------------------->>>>>>>>>
        # 將本地調(diào)用過(guò)程的返回值(包括可能的異常)打包成消息協(xié)議的數(shù)據(jù),通過(guò)網(wǎng)絡(luò)返回給客戶(hù)端
        try:
            val = self.handlers.divide(**args)
        except InvalidOperation as e:
            ret_message = proto.result_encode(e)
        else:
            ret_message = proto.result_encode(val)
        self.conn.sendall(ret_message)
    def _process_add(self):
        """
        處理加法過(guò)程調(diào)用
        此方法暫時(shí)不識(shí)閑
        :return:
        """
        pass
if __name__ == '__main__':
    # 目的:消息協(xié)議測(cè)試,模擬網(wǎng)絡(luò)傳輸
    # 構(gòu)造消息數(shù)據(jù)
    proto = DivideProtocol()
    # 測(cè)試一
    # divide(200,100)
    # message = proto.args_encode(200,100)
    # 測(cè)試二
    message = proto.args_encode(200)
    conn = BytesIO()
    conn.write(message)
    conn.seek(0)
    # 解析消息數(shù)據(jù)
    method_proto = MethodProtocal(conn)
    name = method_proto.get_method_name()
    print(name)
    args = proto.args_decode(conn)
    print(args)

```

接下來(lái),修改server.py文件

server.py

```

from services import InvalidOperation
# from services import Server
from services import ThreadServer
import sys
class Handlers:
    @staticmethod
    def divide(num1, num2 = 1):
        if num2 == 0:
            raise InvalidOperation('ck_god_err')
        val = num1/num2
        return val
if __name__ == '__main__':
    # 開(kāi)啟服務(wù)器
    # _server = Server('127.0.0.1', 8000, Handlers)
    # _server.serve()
    # 從啟動(dòng)命令中提取服務(wù)器運(yùn)行的ip地址和端口號(hào),啟動(dòng)的多線(xiàn)程服務(wù)器
    host = sys.argv[1]
    port = int(sys.argv[2])
    _server = ThreadServer(host, port, Handlers)
    _server.serve()

```


最后,將 client.py文件也稍作修改。

```

import time
from services import ClientStub
# from services import Channel
from services import DistributedChannel
from services import InvalidOperation
# 創(chuàng)建與服務(wù)器的連接
# channel = Channel('127.0.0.1', 8000)
channel = DistributedChannel()
# 進(jìn)行調(diào)用
for i in range(50):
    try:
        # 創(chuàng)建用于rpc調(diào)用的工具
        stub = ClientStub(channel) # 初始化的時(shí)候才真正的創(chuàng)建連接了,所以放到里面
        val = stub.divide(i * 100,100)
        # val = stub.divide(i * 100)
        # val = stub.divide( 100, 0)
    except InvalidOperation as e:
        print(e.message)
    else:
        print(val)
    time.sleep(1)

```

ctrl + shift + T在pycharm中打開(kāi)多個(gè)Terminal窗口


分布式RPC服務(wù)器(容災(zāi)和服務(wù)器識(shí)別機(jī)制的實(shí)現(xiàn),借助zookeeper)最終完整代碼

分布式RPC服務(wù)器(容災(zāi)和服務(wù)器識(shí)別機(jī)制的實(shí)現(xiàn),借助zookeeper)最終完整代碼




右鍵運(yùn)行客戶(hù)端,可以看到不斷地隨機(jī)切換服務(wù)器。

分布式RPC服務(wù)器(容災(zāi)和服務(wù)器識(shí)別機(jī)制的實(shí)現(xiàn),借助zookeeper)最終完整代碼


向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI