溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么寫一個RPC框架

發(fā)布時間:2021-10-28 15:41:02 來源:億速云 閱讀:129 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要講解了“怎么寫一個RPC框架”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“怎么寫一個RPC框架”吧!

RPC 框架應(yīng)該長什么樣子

我們首先來看一下:一個 RPC 框架是什么東西?我們最直觀的感覺就是:

集成了 RPC  框架之后,通過配置一個注冊中心的地址。一個應(yīng)用(稱為服務(wù)提供者)將某個接口(interface)“暴露”出去,另外一個應(yīng)用(稱為服務(wù)消費者)通過“引用”這個接口(interface),然后調(diào)用了一下,就很神奇的可以調(diào)用到另外一個應(yīng)用的方法了

給我們的感覺就好像調(diào)用了一個本地方法一樣。即便兩個應(yīng)用不是在同一個 JVM 中甚至兩個應(yīng)用都不在同一臺機(jī)器中。

那他們是如何做到的呢?當(dāng)我們的服務(wù)消費者調(diào)用某個 RPC  接口的方法之后,它的底層會通過動態(tài)代理,然后經(jīng)過網(wǎng)絡(luò)調(diào)用,去到服務(wù)提供者的機(jī)器上,然后執(zhí)行對應(yīng)的方法。

接著方法的結(jié)果通過網(wǎng)絡(luò)傳輸返回到服務(wù)消費者那里,然后就可以拿到結(jié)果了。

整個過程如下圖:

怎么寫一個RPC框架

那么這個時候,可能有人會問了:服務(wù)消費者怎么知道服務(wù)提供者在哪臺機(jī)器的哪個端口呢?

這個時候,就需要“注冊中心”登場了,具體來說是這樣子的:

  • 服務(wù)提供者在啟動的時候,將自己應(yīng)用所在機(jī)器的信息提交到注冊中心上面。

  • 服務(wù)消費者在啟動的時候,將需要消費的接口所在機(jī)器的信息抓回來。

這樣一來,服務(wù)消費者就有了一份服務(wù)提供者所在的機(jī)器列表了。

怎么寫一個RPC框架

“服務(wù)消費者”拿到了“服務(wù)提供者”的機(jī)器列表就可以通過網(wǎng)絡(luò)請求來發(fā)起請求了。

網(wǎng)絡(luò)客戶端,我們應(yīng)該采用什么呢?有幾種選擇:

  • 使用 JDK 原生 BIO(也就是 ServerSocket 那一套)。阻塞式 IO 方法,無法支撐高并發(fā)。

  • 使用 JDK 原生 NIO(Selector、SelectionKey 那一套)。非阻塞式  IO,可以支持高并發(fā),但是自己實現(xiàn)復(fù)雜,需要處理各種網(wǎng)絡(luò)問題。

  • 使用大名鼎鼎的 NIO 框架 Netty,天然支持高并發(fā),封裝好,API 易用。

“服務(wù)消費者”拿到了“服務(wù)提供者”的機(jī)器列表就可以通過網(wǎng)絡(luò)請求來發(fā)起請求了。

作為一個有追求的程序員,我們要求開發(fā)出來的框架要求支持高并發(fā)、又要求簡單、還要快。

當(dāng)然是選擇 Netty 來實現(xiàn)了,使用 Netty 的一些很基本的 API 就能滿足我們的需求。

網(wǎng)絡(luò)協(xié)議定義

當(dāng)然了,既然我們要使用網(wǎng)絡(luò)傳輸數(shù)據(jù)。我們首先要定義一套網(wǎng)絡(luò)協(xié)議出來。

你可能又要問了,啥叫網(wǎng)絡(luò)協(xié)議?網(wǎng)絡(luò)協(xié)議,通俗理解,意思就是說我們的客戶端發(fā)送的數(shù)據(jù)應(yīng)該長什么樣子,服務(wù)端可以去解析出來知道要做什么事情。

話不多說,上代碼,假設(shè)我們現(xiàn)在服務(wù)提供者有兩個類:

// com.study.rpc.test.producer.HelloService public interface HelloService {      String sayHello(TestBean testBean); }  // com.study.rpc.test.producer.TestBean public class TestBean {      private String name;     private Integer age;      public TestBean(String name, Integer age) {         this.name = name;         this.age = age;     }     // getter setter }

現(xiàn)在我要調(diào)用 HelloService.sayHello(TestBean testBean) 這個方法。

作為“服務(wù)消費者”,應(yīng)該怎么定義我們的請求,從而讓服務(wù)端知道我是要調(diào)用這個方法呢?

這需要我們將這個接口信息產(chǎn)生一個唯一的標(biāo)識:這個標(biāo)識會記錄了接口名、具體是那個方法、然后具體參數(shù)是什么!

然后將這些信息組織起來發(fā)送給服務(wù)端,我這里的方式是將信息保存為一個 JSON 格式的字符串來傳輸。

比如上面的接口我們傳輸?shù)臄?shù)據(jù)大概是這樣的:

{     "interfaces": "interface=com.study.rpc.test.producer.HelloService&method=sayHello&     parameter=com.study.rpc.test.producer.TestBean",     "requestId": "3",     "parameter": {         "com.study.rpc.test.producer.TestBean": {             "age": 20,             "name": "張三"         }     } }

嗯,我這里用一個 JSON 來標(biāo)識這次調(diào)用是調(diào)用哪個接口的哪個方法,其中 interface 標(biāo)識了唯一的類,parameter  標(biāo)識了里面具體有哪些參數(shù), 其中 key 就是參數(shù)的類全限定名,value 就是這個類的 JSON 信息。

可能看到這里,大家可能有意見了:數(shù)據(jù)不一定用 JSON 格式傳輸啊,而且使用 JSON 也不一定性能最高啊。

你使用 JDK 的 Serializable 配合 Netty 的 ObjectDecoder  來實現(xiàn),這當(dāng)然也可以,其實這里是一個拓展點,我們應(yīng)該要提供多種序列化方式來供用戶選擇。

但是這里選擇了 JSON 的原因是因為它比較直觀,對于寫文章來說比較合理。

開發(fā)服務(wù)提供者

嗯,搞定了網(wǎng)絡(luò)協(xié)議之后,我們開始開發(fā)“服務(wù)提供者”了。對于服務(wù)提供者,因為我們這里是寫一個簡單版本的 RPC 框架,為了保持簡潔。

我們不會引入類似 Spring  之類的容器框架,所以我們需要定義一個服務(wù)提供者的配置類,它用于定義這個服務(wù)提供者是什么接口,然后它具體的實例對象是什么:

public class ServiceConfig{      public Class type;      public T instance;      public ServiceConfig(Classtype, T instance) {         this.type = type;         this.instance = instance;     }      public ClassgetType() {         return type;     }      public void setType(Classtype) {         this.type = type;     }      public T getInstance() {         return instance;     }      public void setInstance(T instance) {         this.instance = instance;     } }

有了這個東西之后,我們就知道需要暴露哪些接口了。為了框架有一個統(tǒng)一的入口,我定義了一個類叫做  ApplicationContext,可以認(rèn)為這是一個應(yīng)用程序上下文,他的構(gòu)造函數(shù),接收 2 個參數(shù)。

代碼如下:

public ApplicationContext(String registryUrl, ListserviceConfigs){     // 1. 保存需要暴露的接口配置     this.serviceConfigs = serviceConfigs == null ? new ArrayList<>() : serviceConfigs;      // step 2: 實例化注冊中心     initRegistry(registryUrl);      // step 3: 將接口注冊到注冊中心,從注冊中心獲取接口,初始化服務(wù)接口列表     RegistryInfo registryInfo = null;     InetAddress addr = InetAddress.getLocalHost();     String hostname = addr.getHostName();     String hostAddress = addr.getHostAddress();     registryInfo = new RegistryInfo(hostname, hostAddress, port);     doRegistry(registryInfo);       // step 4:初始化Netty服務(wù)器,接受到請求,直接打到服務(wù)提供者的service方法中     if (!this.serviceConfigs.isEmpty()) {         // 需要暴露接口才暴露         nettyServer = new NettyServer(this.serviceConfigs, interfaceMethods);         nettyServer.init(port);     } }

注冊中心設(shè)計

這里分為幾個步驟,首先保存了接口配置,接著初始化注冊中心,因為注冊中心可能會提供多種來供用戶選擇,所以這里需要定義一個注冊中心的接口:

public interface Registry {     /**      * 將生產(chǎn)者接口注冊到注冊中心      *      * @param clazz        類      * @param registryInfo 本機(jī)的注冊信息      */     void register(Class clazz, RegistryInfo registryInfo) throws Exception; }

這里我們提供一個注冊的方法,這個方法的語義是將 clazz 對應(yīng)的接口注冊到注冊中心。

接收兩個參數(shù),一個是接口的 class 對象,另一個是注冊信息,里面包含了本機(jī)的一些基本信息,如下:

public class RegistryInfo {      private String hostname;     private String ip;     private Integer port;      public RegistryInfo(String hostname, String ip, Integer port) {         this.hostname = hostname;         this.ip = ip;         this.port = port;     }     // getter setter }

好了,定義好注冊中心,回到之前的實例化注冊中心的地方,代碼如下:

/**  * 注冊中心  */ private Registry registry;  private void initRegistry(String registryUrl) {     if (registryUrl.startsWith("zookeeper://")) {         registryUrl = registryUrl.substring(12);         registry = new ZookeeperRegistry(registryUrl);     } else if (registryUrl.startsWith("multicast://")) {         registry = new MulticastRegistry(registryUrl);     } }

這里邏輯也非常簡單,就是根據(jù) url 的 schema 來判斷是那個注冊中心,注冊中心這里實現(xiàn)了 2 個實現(xiàn)類,分別使用 Zookeeper  作為注冊中心,另外一個是使用廣播的方式作為注冊中心。

廣播注冊中心這里僅僅是做個示范,內(nèi)部沒有實現(xiàn)。我們主要是實現(xiàn)了 Zookeeper 的注冊中心。

當(dāng)然了,如果有興趣,可以實現(xiàn)更多的注冊中心供用戶選擇,比如 Redis 之類的,這里只是為了保持“拓展點”。

那么實例化完注冊中心之后,回到上面的代碼。

注冊服務(wù)提供者

// step 3: 將接口注冊到注冊中心,從注冊中心獲取接口,初始化服務(wù)接口列表 RegistryInfo registryInfo = null; InetAddress addr = InetAddress.getLocalHost(); String hostname = addr.getHostName(); String hostAddress = addr.getHostAddress(); registryInfo = new RegistryInfo(hostname, hostAddress, port); doRegistry(registryInfo);

這里邏輯很簡單,就是獲取本機(jī)的的基本信息構(gòu)造成 RegistryInfo,然后調(diào)用了 doRegistry 方法:

/**  * 接口方法對應(yīng)method對象  */ private MapinterfaceMethods = new ConcurrentHashMap<>();  private void doRegistry(RegistryInfo registryInfo) throws Exception {     for (ServiceConfig config : serviceConfigs) {         Class type = config.getType();         registry.register(type, registryInfo);         Method[] declaredMethods = type.getDeclaredMethods();         for (Method method : declaredMethods) {             String identify = InvokeUtils.buildInterfaceMethodIdentify(type, method);             interfaceMethods.put(identify, method);         }     } }

這里做了兩件事情:

  • 將接口注冊到注冊中心中。

  • 對于每一個接口的每一個方法,生成一個唯一標(biāo)識,保存在 interfaceMethods 集合中。

下面分別分析這兩件事情,首先是注冊方法:因為我們用到了 Zookeeper,為了方便,引入了 Zookeeper 的客戶端框架 Curator。

<dependency>     <groupId>org.apache.curatorgroupId>     <artifactId>curator-recipesartifactId>     <version>2.3.0version> dependency>

接著看代碼:

public class ZookeeperRegistry implements Registry {      private CuratorFramework client;      public ZookeeperRegistry(String connectString) {         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);         client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);         client.start();         try {             Stat myRPC = client.checkExists().forPath("/myRPC");             if (myRPC == null) {                 client.create()                         .creatingParentsIfNeeded()                         .forPath("/myRPC");             }             System.out.println("Zookeeper Client初始化完畢......");         } catch (Exception e) {             e.printStackTrace();         }     }       @Override     public void register(Class clazz, RegistryInfo registryInfo) throws Exception {         // 1. 注冊的時候,先從zk中獲取數(shù)據(jù)         // 2. 將自己的服務(wù)器地址加入注冊中心中          // 為每一個接口的每一個方法注冊一個臨時節(jié)點,然后key為接口方法的唯一標(biāo)識,data為服務(wù)地址列表          Method[] declaredMethods = clazz.getDeclaredMethods();         for (Method method : declaredMethods) {             String key = InvokeUtils.buildInterfaceMethodIdentify(clazz, method);             String path = "/myRPC/" + key;             Stat stat = client.checkExists().forPath(path);             ListregistryInfos;             if (stat != null) {                 // 如果這個接口已經(jīng)有人注冊過了,把數(shù)據(jù)拿回來,然后將自己的信息保存進(jìn)去                 byte[] bytes = client.getData().forPath(path);                 String data = new String(bytes, StandardCharsets.UTF_8);                 registryInfos = JSONArray.parseArray(data, RegistryInfo.class);                 if (registryInfos.contains(registryInfo)) {                     // 正常來說,zk的臨時節(jié)點,斷開連接后,直接就沒了,但是重啟會經(jīng)常發(fā)現(xiàn)存在節(jié)點,所以有了這樣的代碼                     System.out.println("地址列表已經(jīng)包含本機(jī)【" + key + "】,不注冊了");                 } else {                     registryInfos.add(registryInfo);                     client.setData().forPath(path, JSONArray.toJSONString(registryInfos).getBytes());                     System.out.println("注冊到注冊中心,路徑為:【" + path + "】 信息為:" + registryInfo);                 }             } else {                 registryInfos = new ArrayList<>();                 registryInfos.add(registryInfo);                 client.create()                         .creatingParentsIfNeeded()                         // 臨時節(jié)點,斷開連接就關(guān)閉                         .withMode(CreateMode.EPHEMERAL)                         .forPath(path, JSONArray.toJSONString(registryInfos).getBytes());                 System.out.println("注冊到注冊中心,路徑為:【" + path + "】 信息為:" + registryInfo);             }         }     } }

Zookeeper 注冊中心在初始化的時候,會建立好連接。然后注冊的時候,針對 clazz 接口的每一個方法,都會生成一個唯一標(biāo)識。

這里使用了InvokeUtils.buildInterfaceMethodIdentify方法:

public static String buildInterfaceMethodIdentify(Class clazz, Method method) {     Map<String, String> map = new LinkedHashMap<>();     map.put("interface", clazz.getName());     map.put("method", method.getName());     Parameter[] parameters = method.getParameters();     if (parameters.length > 0) {         StringBuilder param = new StringBuilder();         for (int i = 0; i < parameters.length; i++) {             Parameter p = parameters[i];             param.append(p.getType().getName());             if (i < parameters.length - 1) {                 param.append(",");             }         }         map.put("parameter", param.toString());     }     return map2String(map); }  public static String map2String(Map<String, String> map) {     StringBuilder sb = new StringBuilder();     Iterator<map.entry<string, <="" span="">String>> iterator = map.entrySet().iterator();     while (iterator.hasNext()) {         Map.Entry<String, String> entry = iterator.next();         sb.append(entry.getKey() + "=" + entry.getValue());         if (iterator.hasNext()) {             sb.append("&");         }     }     return sb.toString(); }

其實就是對接口的方法使用他們的限定名和參數(shù)來組成一個唯一的標(biāo)識,比如 HelloService#sayHello(TestBean)  生成的大概是這樣的:

interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean

接下來的邏輯就簡單了,在 Zookeeper 中的 /myRPC 路徑下面建立臨時節(jié)點,節(jié)點名稱為我們上面的接口方法唯一標(biāo)識,數(shù)據(jù)內(nèi)容為機(jī)器信息。

之所以采用臨時節(jié)點是因為:如果機(jī)器宕機(jī)了,連接斷開之后,消費者可以通過 Zookeeper 的 watcher 機(jī)制感知到。

大概看起來是這樣的:

/myRPC/interface=com.study.rpc.test.producer.HelloService&method=sayHello&    parameter=com.study.rpc.test.producer.TestBean    [        {            "hostname":peer1,            "port":8080        },        {            "hostname":peer2,            "port":8081        }    ]

通過這樣的方式,在服務(wù)消費的時候就可以拿到這樣的注冊信息,然后知道可以調(diào)用那臺機(jī)器的那個端口。

好了,注冊中心弄完了之后,我們回到前面說的注冊方法做的第二件事情,我們將每一個接口方法標(biāo)識的方法放入了一個 map 中:

/**  * 接口方法對應(yīng)method對象  */ private Map<String, Method> interfaceMethods = new ConcurrentHashMap<>();

這個的原因是因為,我們在收到網(wǎng)絡(luò)請求的時候,需要調(diào)用反射的方式調(diào)用 Method 對象,所以存起來。

啟動網(wǎng)絡(luò)服務(wù)端接受請求

接下來我們就可以看第四步了:

// step 4:初始化Netty服務(wù)器,接受到請求,直接打到服務(wù)提供者的service方法中 if (!this.serviceConfigs.isEmpty()) {     // 需要暴露接口才暴露     nettyServer = new NettyServer(this.serviceConfigs, interfaceMethods);     nettyServer.init(port); }

因為這里使用 Netty 來做的所以需要引入 Netty 的依賴:

<dependency>     <groupId>io.nettygroupId>     <artifactId>netty-allartifactId>     <version>4.1.30.Finalversion> dependency>

接著來分析:

public class NettyServer {      /**      * 負(fù)責(zé)調(diào)用方法的handler      */     private RpcInvokeHandler rpcInvokeHandler;      public NettyServer(ListserverConfigs, MapinterfaceMethods)throws InterruptedException {         this.rpcInvokeHandler = new RpcInvokeHandler(serverConfigs, interfaceMethods);     }      public int init(int port) throws Exception {         EventLoopGroup bossGroup = new NioEventLoopGroup();         EventLoopGroup workerGroup = new NioEventLoopGroup();         ServerBootstrap b = new ServerBootstrap();         b.group(bossGroup, workerGroup)                 .channel(NioServerSocketChannel.class)                 .option(ChannelOption.SO_BACKLOG, 1024)                 .childHandler(new ChannelInitializer(){                     @Override                     protected void initChannel(SocketChannel ch) throws Exception {                         ByteBuf delimiter = Unpooled.copiedBuffer("$$");                         // 設(shè)置按照分隔符“&&”來切分消息,單條消息限制為 1MB                         ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, delimiter));                         ch.pipeline().addLast(new StringDecoder());                         ch.pipeline().addLast().addLast(rpcInvokeHandler);                     }                 });         ChannelFuture sync = b.bind(port).sync();         System.out.println("啟動NettyService,端口為:" + port);         return port;     } }

這部分主要的都是 Netty 的 API,我們不做過多的說明,就簡單的說一下:

  • 我們通過“&&”作為標(biāo)識符號來區(qū)分兩條信息,然后一條信息的最大長度為 1MB。

  • 所有邏輯都在 RpcInvokeHandler 中,這里面?zhèn)鬟M(jìn)去了配置的服務(wù)接口實例,以及服務(wù)接口實例每個接口方法唯一標(biāo)識對應(yīng)的 Method 對象的  Map 集合。

public class RpcInvokeHandler extends ChannelInboundHandlerAdapter {      /**      * 接口方法唯一標(biāo)識對應(yīng)的Method對象      */     private Map<String, Method> interfaceMethods;     /**      * 接口對應(yīng)的實現(xiàn)類      */     private Map<class, Object> interfaceToInstance;      /**      * 線程池,隨意寫的,不要吐槽      */     private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10,             50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),             new ThreadFactory() {                 AtomicInteger m = new AtomicInteger(0);                  @Override                 public Thread newThread(Runnable r) {                     return new Thread(r, "IO-thread-" + m.incrementAndGet());                 }             });       public RpcInvokeHandler(ListserviceConfigList,                             Map<String, Method> interfaceMethods) {         this.interfaceToInstance = new ConcurrentHashMap<>();         this.interfaceMethods = interfaceMethods;         for (ServiceConfig config : serviceConfigList) {             interfaceToInstance.put(config.getType(), config.getInstance());         }     }      @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {         try {             String message = (String) msg;             // 這里拿到的是一串JSON數(shù)據(jù),解析為Request對象,             // 事實上這里解析網(wǎng)絡(luò)數(shù)據(jù),可以用序列化方式,定一個接口,可以實現(xiàn)JSON格式序列化,或者其他序列化             // 但是demo版本就算了。             System.out.println("接收到消息:" + msg);             RpcRequest request = RpcRequest.parse(message, ctx);             threadPoolExecutor.execute(new RpcInvokeTask(request));         } finally {             ReferenceCountUtil.release(msg);         }     }      @Override     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {         ctx.flush();     }      @Override     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {         System.out.println("發(fā)生了異常..." + cause);         cause.printStackTrace();         ctx.close();     }      public class RpcInvokeTask implements Runnable {          private RpcRequest rpcRequest;          RpcInvokeTask(RpcRequest rpcRequest) {             this.rpcRequest = rpcRequest;         }          @Override         public void run() {             try {                 /*                  * 數(shù)據(jù)大概是這樣子的                  * {"interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello&para;meter=com                  * .study.rpc.test.producer.TestBean","requestId":"3","parameter":{"com.study.rpc.test.producer                  * .TestBean":{"age":20,"name":"張三"}}}                  */                 // 這里希望能拿到每一個服務(wù)對象的每一個接口的特定聲明                 String interfaceIdentity = rpcRequest.getInterfaceIdentity();                 Method method = interfaceMethods.get(interfaceIdentity);                 Map<String, String> map = string2Map(interfaceIdentity);                 String interfaceName = map.get("interface");                 Class interfaceClass = Class.forName(interfaceName);                 Object o = interfaceToInstance.get(interfaceClass);                 String parameterString = map.get("parameter");                 Object result;                 if (parameterString != null) {                     String[] parameterTypeClass = parameterString.split(",");                     Map<String, Object> parameterMap = rpcRequest.getParameterMap();                     Object[] parameterInstance = new Object[parameterTypeClass.length];                     for (int i = 0; i < parameterTypeClass.length; i++) {                         String parameterClazz = parameterTypeClass[i];                         parameterInstance[i] = parameterMap.get(parameterClazz);                     }                     result = method.invoke(o, parameterInstance);                 } else {                     result = method.invoke(o);                 }                 // 寫回響應(yīng)                 ChannelHandlerContext ctx = rpcRequest.getCtx();                 String requestId = rpcRequest.getRequestId();                 RpcResponse response = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity,                         requestId);                 String s = JSONObject.toJSONString(response) + "$$";                 ByteBuf byteBuf = Unpooled.copiedBuffer(s.getBytes());                 ctx.writeAndFlush(byteBuf);                 System.out.println("響應(yīng)給客戶端:" + s);             } catch (Exception e) {                 e.printStackTrace();             }         }           public static Map<String, String> string2Map(String str) {             String[] split = str.split("&");             Map<String, String> map = new HashMap<>(16);             for (String s : split) {                 String[] split1 = s.split("=");                 map.put(split1[0], split1[1]);             }             return map;          }     } }

這里說明一下上面的邏輯:channelRead 方法用于接收消息,接收到的就是我們前面分析的那個 JSON 格式的數(shù)據(jù),接著我們將消息解析成  RpcRequest。

public class RpcRequest {      private String interfaceIdentity;      private Map<String, Object> parameterMap = new HashMap<>();      private ChannelHandlerContext ctx;      private String requestId;      public static RpcRequest parse(String message, ChannelHandlerContext ctx) throws ClassNotFoundException {         /*          * {          *   "interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello2&para;meter=java.lang          * .String,com.study.rpc.test.producer.TestBean",          *   "parameter":{          *      "java.lang.String":"haha",          *      "com.study.rpc.test.producer.TestBean":{          *              "name":"小王",          *              "age":20          *        }          *    }          * }          */         JSONObject jsonObject = JSONObject.parseObject(message);         String interfaces = jsonObject.getString("interfaces");          JSONObject parameter = jsonObject.getJSONObject("parameter");         Set<String> strings = parameter.keySet();         RpcRequest request = new RpcRequest();         request.setInterfaceIdentity(interfaces);         Map<String, Object> parameterMap = new HashMap<>(16);          String requestId = jsonObject.getString("requestId");          for (String key : strings) {             if (key.equals("java.lang.String")) {                 parameterMap.put(key, parameter.getString(key));             } else {                 Class clazz = Class.forName(key);                 Object object = parameter.getObject(key, clazz);                 parameterMap.put(key, object);             }         }         request.setParameterMap(parameterMap);         request.setCtx(ctx);         request.setRequestId(requestId);         return request;     } }

接著從 request 中解析出來需要調(diào)用的接口,然后通過反射調(diào)用對應(yīng)的接口,得到結(jié)果后我們將響應(yīng)封裝成 PrcResponse 寫回給客戶端:

public class RpcResponse {      private String result;      private String interfaceMethodIdentify;      private String requestId;      public String getResult() {         return result;     }      public void setResult(String result) {         this.result = result;     }      public static RpcResponse create(String result, String interfaceMethodIdentify, String requestId) {         RpcResponse response = new RpcResponse();         response.setResult(result);         response.setInterfaceMethodIdentify(interfaceMethodIdentify);         response.setRequestId(requestId);         return response;     } }

里面包含了請求的結(jié)果 JSON 串,接口方法唯一標(biāo)識,請求 ID。數(shù)據(jù)大概看起來這個樣子:

{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean","requestId":"3", "result":"\"牛逼,我收到了消息:TestBean{name='張三', age=20}\""}

通過這樣的信息,客戶端就可以通過響應(yīng)結(jié)果解析出來。

測試服務(wù)提供者

既然我們代碼寫完了,現(xiàn)在需要測試一把,首先我們先寫一個 HelloService 的實現(xiàn)類出來:

public class HelloServiceImpl implements HelloService {      @Override     public String sayHello(TestBean testBean) {         return "牛逼,我收到了消息:" + testBean;     } }

接著編寫服務(wù)提供者代碼:

public class TestProducer {      public static void main(String[] args) throws Exception {         String connectionString = "zookeeper://localhost1:2181,localhost2:2181,localhost3:2181";         HelloService service = new HelloServiceImpl();         ServiceConfig config = new ServiceConfig<>(HelloService.class, service);         ListserviceConfigList = new ArrayList<>();         serviceConfigList.add(config);         ApplicationContext ctx = new ApplicationContext(connectionString, serviceConfigList,         null, 50071);     } }

接著啟動起來,看到日志:

Zookeeper Client初始化完畢...... 注冊到注冊中心,路徑為:【/myRPC/interface=com.study.rpc.test.producer.HelloService& method=sayHello&para;meter=com.study.rpc.test.producer.TestBean】 信息為:RegistryInfo{hostname='localhost', ip='192.168.16.7', port=50071} 啟動NettyService,端口為:50071

這個時候,我們期望用 NettyClient 發(fā)送請求:

{     "interfaces": "interface=com.study.rpc.test.producer.HelloService&     method=sayHello&para;meter=com.study.rpc.test.producer.TestBean",     "requestId": "3",     "parameter": {         "com.study.rpc.test.producer.TestBean": {             "age": 20,             "name": "張三"         }     } }

得到的響應(yīng)應(yīng)該是:

{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean","requestId":"3", "result":"\"牛逼,我收到了消息:TestBean{name='張三', age=20}\""}

那么,可以編寫一個測試程序(這個程序僅僅用于中間測試用,讀者不必理解):

public class NettyClient {     public static void main(String[] args) {         EventLoopGroup group = new NioEventLoopGroup();         try {             Bootstrap b = new Bootstrap();             b.group(group)                     .channel(NioSocketChannel.class)                     .option(ChannelOption.TCP_NODELAY, true)                     .handler(new ChannelInitializer() {                         @Override                         protected void initChannel(SocketChannel ch) throws Exception {                             ch.pipeline().addLast(new StringDecoder());                             ch.pipeline().addLast(new NettyClientHandler());                         }                     });             ChannelFuture sync = b.connect("127.0.0.1", 50071).sync();             sync.channel().closeFuture().sync();         } catch (Exception e) {             e.printStackTrace();         } finally {             group.shutdownGracefully();         }     }      private static class NettyClientHandler extends ChannelInboundHandlerAdapter {          @Override         public void channelActive(ChannelHandlerContext ctx) throws Exception {             JSONObject jsonObject = new JSONObject();             jsonObject.put("interfaces", "interface=com.study.rpc.test.producer" +                     ".HelloService&method=sayHello&para;meter=com.study.rpc.test.producer.TestBean");             JSONObject param = new JSONObject();             JSONObject bean = new JSONObject();             bean.put("age", 20);             bean.put("name", "張三");             param.put("com.study.rpc.test.producer.TestBean", bean);             jsonObject.put("parameter", param);             jsonObject.put("requestId", 3);             System.out.println("發(fā)送給服務(wù)端JSON為:" + jsonObject.toJSONString());             String msg = jsonObject.toJSONString() + "$$";             ByteBuf byteBuf = Unpooled.buffer(msg.getBytes().length);             byteBuf.writeBytes(msg.getBytes());             ctx.writeAndFlush(byteBuf);         }          @Override         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {             System.out.println("收到消息:" + msg);         }     } }

啟動之后,看到控制臺輸出:

發(fā)送給服務(wù)端JSON為:{"interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean","requestId":3, "parameter":{"com.study.rpc.test.producer.TestBean":{"name":"張三","age":20}}}  收到消息:{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService& method=sayHello&para;meter=com.study.rpc.test.producer.TestBean","requestId":"3", "result":"\"牛逼,我收到了消息:TestBean{name='張三', age=20}\""}

Bingo,完美實現(xiàn)了 RPC 的服務(wù)提供者。接下來我們只需要實現(xiàn)服務(wù)消費者就完成了。

開發(fā)服務(wù)消費者

服務(wù)消費者是同樣的處理,我們同樣要定義一個消費者的配置:

public class ReferenceConfig{      private Class type;      public ReferenceConfig(Classtype) {         this.type = type;     }      public ClassgetType() {         return type;     }      public void setType(Classtype) {         this.type = type;     } }

然后我們是統(tǒng)一入口,在 ApplicationContext 中修改代碼:

public ApplicationContext(String registryUrl, ListserviceConfigs,                               ListreferenceConfigs, int port) throws Exception {         // step 1: 保存服務(wù)提供者和消費者         this.serviceConfigs = serviceConfigs == null ? new ArrayList<>() : serviceConfigs;         this.referenceConfigs = referenceConfigs == null ? new ArrayList<>() : referenceConfigs;         // ....  }  private void doRegistry(RegistryInfo registryInfo) throws Exception {     for (ServiceConfig config : serviceConfigs) {         Class type = config.getType();         registry.register(type, registryInfo);         Method[] declaredMethods = type.getDeclaredMethods();         for (Method method : declaredMethods) {             String identify = InvokeUtils.buildInterfaceMethodIdentify(type, method);             interfaceMethods.put(identify, method);         }     }     for (ReferenceConfig config : referenceConfigs) {         ListregistryInfos = registry.fetchRegistry(config.getType());         if (registryInfos != null) {             interfacesMethodRegistryList.put(config.getType(), registryInfos);             initChannel(registryInfos);         }     } }

在注冊的時候,我們需要將需要消費的接口,通過注冊中心抓取出來,所以注冊中心要增加一個接口方法:

public interface Registry {      /**      * 將生產(chǎn)者接口注冊到注冊中心      *      * @param clazz        類      * @param registryInfo 本機(jī)的注冊信息      */     void register(Class clazz, RegistryInfo registryInfo) throws Exception;      /**      * 為服務(wù)提供者抓取注冊表      *      * @param clazz 類      * @return 服務(wù)提供者所在的機(jī)器列表      */     ListfetchRegistry(Class clazz) throws Exception; }

獲取服務(wù)提供者的機(jī)器列表

具體在 Zookeeper 中的實現(xiàn)如下:

@Override public ListfetchRegistry(Class clazz) throws Exception {     Method[] declaredMethods = clazz.getDeclaredMethods();     ListregistryInfos = null;     for (Method method : declaredMethods) {         String key = InvokeUtils.buildInterfaceMethodIdentify(clazz, method);         String path = "/myRPC/" + key;         Stat stat = client.checkExists()                 .forPath(path);         if (stat == null) {             // 這里可以添加watcher來監(jiān)聽變化,這里簡化了,沒有做這個事情             System.out.println("警告:無法找到服務(wù)接口:" + path);             continue;         }         if (registryInfos == null) {             byte[] bytes = client.getData().forPath(path);             String data = new String(bytes, StandardCharsets.UTF_8);             registryInfos = JSONArray.parseArray(data, RegistryInfo.class);         }     }     return registryInfos; }

其實就是去 Zookeeper 獲取節(jié)點中的數(shù)據(jù),得到接口所在的機(jī)器信息,獲取到的注冊信息諸侯,就會調(diào)用以下代碼:

if (registryInfos != null) {     // 保存接口和服務(wù)地址     interfacesMethodRegistryList.put(config.getType(), registryInfos);     // 初始化網(wǎng)絡(luò)連接     initChannel(registryInfos); } private void initChannel(ListregistryInfos) throws InterruptedException {     for (RegistryInfo info : registryInfos) {         if (!channels.containsKey(info)) {             System.out.println("開始建立連接:" + info.getIp() + ", " + info.getPort());             NettyClient client = new NettyClient(info.getIp(), info.getPort());             client.setMessageCallback(message -> {                 // 這里收單服務(wù)端返回的消息,先壓入隊列                 RpcResponse response = JSONObject.parseObject(message, RpcResponse.class);                 responses.offer(response);                 synchronized (ApplicationContext.this) {                     ApplicationContext.this.notifyAll();                 }             });              // 等待連接建立             ChannelHandlerContext ctx = client.getCtx();             channels.put(info, ctx);         }     } }

我們會針對每一個唯一的 RegistryInfo 建立一個連接,然后有這樣一段代碼:

client.setMessageCallback(message -> {     // 這里收單服務(wù)端返回的消息,先壓入隊列     RpcResponse response = JSONObject.parseObject(message, RpcResponse.class);     responses.offer(response);     synchronized (ApplicationContext.this) {         ApplicationContext.this.notifyAll();     } });

設(shè)置一個 callback,用于收到消息的時候,回調(diào)這里的代碼,這部分我們后面再分析。

然后在 client.getCtx() 的時候,同步阻塞直到連接完成,建立好連接后通過,NettyClient 的代碼如下:

public class NettyClient {      private ChannelHandlerContext ctx;      private MessageCallback messageCallback;      public NettyClient(String ip, Integer port) {         EventLoopGroup group = new NioEventLoopGroup();         try {             Bootstrap b = new Bootstrap();             b.group(group)                     .channel(NioSocketChannel.class)                     .option(ChannelOption.TCP_NODELAY, true)                     .handler(new ChannelInitializer() {                         @Override                         protected void initChannel(SocketChannel ch) throws Exception {                             ByteBuf delimiter = Unpooled.copiedBuffer("$$".getBytes());                             // 設(shè)置按照分隔符“&&”來切分消息,單條消息限制為 1MB                             ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, delimiter));                             ch.pipeline().addLast(new StringDecoder());                             ch.pipeline().addLast(new NettyClientHandler());                         }                     });             ChannelFuture sync = b.connect(ip, port).sync();         } catch (Exception e) {             e.printStackTrace();         }     }      public void setMessageCallback(MessageCallback callback) {         this.messageCallback = callback;     }      public ChannelHandlerContext getCtx() throws InterruptedException {         System.out.println("等待連接成功...");         if (ctx == null) {             synchronized (this) {                 wait();             }         }         return ctx;     }     private class NettyClientHandler extends ChannelInboundHandlerAdapter {         @Override         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {             try {                 String message = (String) msg;                 if (messageCallback != null) {                     messageCallback.onMessage(message);                 }             } finally {                 ReferenceCountUtil.release(msg);             }         }         @Override         public void channelActive(ChannelHandlerContext ctx) throws Exception {             NettyClient.this.ctx = ctx;             System.out.println("連接成功:" + ctx);             synchronized (NettyClient.this) {                 NettyClient.this.notifyAll();             }         }         @Override         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {             ctx.flush();         }         @Override         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {             cause.printStackTrace();         }     }     public interface MessageCallback {         void onMessage(String message);     } }

這里主要是用了 wait() 和 notifyAll() 來實現(xiàn)同步阻塞等待連接建立。建立好連接后,我們保存到集合中:

// 等待連接建立 ChannelHandlerContext ctx = client.getCtx(); channels.put(info, ctx);

發(fā)送請求

好了,到了這里我們?yōu)槊恳粋€需要消費的接口建立了網(wǎng)絡(luò)連接,接下來要做的事情就是提供一個接口給用戶獲取服務(wù)提供者實例。

我把這個方法寫在 ApplicationContext 中:

/**  * 負(fù)責(zé)生成requestId的類  */ private LongAdder requestIdWorker = new LongAdder();  /**  * 獲取調(diào)用服務(wù)  */ @SuppressWarnings("unchecked") publicT getService(Classclazz){     return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{clazz}, new InvocationHandler() {         @Override         public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {             String methodName = method.getName();             if ("equals".equals(methodName) || "hashCode".equals(methodName)) {                 throw new IllegalAccessException("不能訪問" + methodName + "方法");             }             if ("toString".equals(methodName)) {                 return clazz.getName() + "#" + methodName;             }               // step 1: 獲取服務(wù)地址列表             ListregistryInfos = interfacesMethodRegistryList.get(clazz);              if (registryInfos == null) {                 throw new RuntimeException("無法找到服務(wù)提供者");             }              // step 2: 負(fù)載均衡             RegistryInfo registryInfo = loadBalancer.choose(registryInfos);               ChannelHandlerContext ctx = channels.get(registryInfo);             String identify = InvokeUtils.buildInterfaceMethodIdentify(clazz, method);             String requestId;             synchronized (ApplicationContext.this) {                 requestIdWorker.increment();                 requestId = String.valueOf(requestIdWorker.longValue());             }             Invoker invoker = new DefaultInvoker(method.getReturnType(), ctx, requestId, identify);             inProgressInvoker.put(identify + "#" + requestId, invoker);             return invoker.invoke(args);         }     }); }

這里主要是通過動態(tài)代理來實現(xiàn)的,首先通過 class 來獲取對應(yīng)的機(jī)器列表,接著通過 loadBalancer 來選擇一個機(jī)器。

這個 LoaderBalance 是一個接口:

public interface LoadBalancer {      /**      * 選擇一個生產(chǎn)者      *      * @param registryInfos 生產(chǎn)者列表      * @return 選中的生產(chǎn)者      */     RegistryInfo choose(ListregistryInfos);  }

在 ApplicationContext 初始化的時候可以選擇不同的實現(xiàn),我這里主要實現(xiàn)了一個簡單的隨機(jī)算法(后續(xù)可以拓展為其他的,比如  RoundRobin 之類的):

public class RandomLoadbalancer implements LoadBalancer {     @Override     public RegistryInfo choose(ListregistryInfos){         Random random = new Random();         int index = random.nextInt(registryInfos.size());         return registryInfos.get(index);     } }

接著構(gòu)造接口方法的唯一標(biāo)識 identify,還有一個 requestId。為什么需要一個 requestId 呢?

這是因為我們在處理響應(yīng)的時候,需要找到某個響應(yīng)是對應(yīng)的哪個請求,但是僅僅使用 identify  是不行的,因為我們同一個應(yīng)用程序中可能會有多個線程同時調(diào)用同一個接口的同一個方法,這樣的 identify 是相同的。

所以我們需要用 identify+requestId 的方式來判斷,reqeustId 是一個自增的 LongAddr。服務(wù)端在響應(yīng)的時候會將  requestId 返回。

接著我們構(gòu)造了一個 Invoker,把它放入 inProgressInvoker 的集合中。調(diào)用了其 invoke 方法:

Invoker invoker = new DefaultInvoker(method.getReturnType(), ctx, requestId, identify); inProgressInvoker.put(identify + "#" + requestId, invoker); // 阻塞等待結(jié)果 return invoker.invoke(args);    public class DefaultInvokerimplements Invoker{      private ChannelHandlerContext ctx;     private String requestId;     private String identify;     private ClassreturnType;      private T result;      DefaultInvoker(ClassreturnType, ChannelHandlerContext ctx, String requestId, String identify){         this.returnType = returnType;         this.ctx = ctx;         this.requestId = requestId;         this.identify = identify;     }      @SuppressWarnings("unckecked")     @Override     public T invoke(Object[] args) {         JSONObject jsonObject = new JSONObject();         jsonObject.put("interfaces", identify);         JSONObject param = new JSONObject();         if (args != null) {             for (Object obj : args) {                 param.put(obj.getClass().getName(), obj);             }         }         jsonObject.put("parameter", param);         jsonObject.put("requestId", requestId);         System.out.println("發(fā)送給服務(wù)端JSON為:" + jsonObject.toJSONString());         String msg = jsonObject.toJSONString() + "$$";         ByteBuf byteBuf = Unpooled.buffer(msg.getBytes().length);         byteBuf.writeBytes(msg.getBytes());         ctx.writeAndFlush(byteBuf);         waitForResult();         return result;     }      @Override     public void setResult(String result) {         synchronized (this) {             this.result = JSONObject.parseObject(result, returnType);             notifyAll();         }     }       private void waitForResult() {         synchronized (this) {             try {                 wait();             } catch (InterruptedException e) {                 e.printStackTrace();             }         }     } }

我們可以看到調(diào)用 Invoker 的 invoke 方法之后,會運行到 waitForResult()  這里,這里已經(jīng)把請求通過網(wǎng)絡(luò)發(fā)送出去了,但是就會被卡住。

這是因為我們的網(wǎng)絡(luò)請求的結(jié)果不是同步返回的,有可能是客戶端同時發(fā)起很多個請求,所以我們不可能在這里讓他同步阻塞等待的。

接受響應(yīng)

那么對于服務(wù)消費者而言,把請求發(fā)送出去但是卡住了,這個時候當(dāng)服務(wù)端處理完之后,會把消息返回給客戶端。

返回的入口在 NettyClient 的 onChannelRead 中:

@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {     try {         String message = (String) msg;         if (messageCallback != null) {             messageCallback.onMessage(message);         }     } finally {         ReferenceCountUtil.release(msg);     } }

這里通過 callback 回調(diào)出去。是否還記的我們在初始化 NettyClient 的時候,會設(shè)置一個 callback?

/**  * 響應(yīng)隊列  */ private ConcurrentLinkedQueueresponses = new ConcurrentLinkedQueue<>();   client.setMessageCallback(message -> {     // 這里收單服務(wù)端返回的消息,先壓入隊列     RpcResponse response = JSONObject.parseObject(message, RpcResponse.class);     responses.offer(response);     synchronized (ApplicationContext.this) {         ApplicationContext.this.notifyAll();     } });

這里接受消息之后,解析成為一個 RpcResponse 對象,然后壓入 responses 隊列中,這樣我們就把所有的請求響應(yīng)放入隊列中。

但是這樣一來,我們應(yīng)該怎么把響應(yīng)結(jié)果返回給調(diào)用的地方呢?

我們可以這樣做:起一個或多個后臺線程,然后從隊列中拿出響應(yīng),然后根據(jù)響應(yīng)從我們之前保存的 inProcessInvoker 中找出對應(yīng)的  Invoker,然后把結(jié)果返回回去。

public ApplicationContext(....){      //.....      // step 5:啟動處理響應(yīng)的processor     initProcessor();  }  private void initProcessor() {     // 事實上,這里可以通過配置文件讀取,啟動多少個processor     int num = 3;     processors = new ResponseProcessor[num];     for (int i = 0; i < 3; i++) {         processors[i] = createProcessor(i);     } }  /**  * 處理響應(yīng)的線程  */ private class ResponseProcessor extends Thread {     @Override     public void run() {         System.out.println("啟動響應(yīng)處理線程:" + getName());         while (true) {             // 多個線程在這里獲取響應(yīng),只有一個成功             RpcResponse response = responses.poll();             if (response == null) {                 try {                     synchronized (ApplicationContext.this) {                         // 如果沒有響應(yīng),先休眠                         ApplicationContext.this.wait();                     }                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             } else {                 System.out.println("收到一個響應(yīng):" + response);                 String interfaceMethodIdentify = response.getInterfaceMethodIdentify();                 String requestId = response.getRequestId();                 String key = interfaceMethodIdentify + "#" + requestId;                 Invoker invoker = inProgressInvoker.remove(key);                 invoker.setResult(response.getResult());             }         }     } }

這里面如果從隊列中拿不到數(shù)據(jù),就會調(diào)用 wait() 方法等待。這里需要注意的是,在 callbak 中獲取到響應(yīng)的時候我們是會調(diào)用  notifyAll() 來喚醒這里的線程的:

responses.offer(response); synchronized (ApplicationContext.this) {     ApplicationContext.this.notifyAll(); }

這里被喚醒之后,就會有多個線程去爭搶那個響應(yīng),因為隊列是線程安全的,所以這里多個線程可以獲取到響應(yīng)結(jié)果。

接著拿到結(jié)果之后,通過 identify+requestId 構(gòu)造成唯一的請求標(biāo)識,從 inProgressInvoker 中獲取對應(yīng)的  invoker,然后通過 setResult 將結(jié)果設(shè)置進(jìn)去:

String key = interfaceMethodIdentify + "#" + requestId; Invoker invoker = inProgressInvoker.remove(key); invoker.setResult(response.getResult());   @Override public void setResult(String result) {     synchronized (this) {         this.result = JSONObject.parseObject(result, returnType);         notifyAll();     } }

這里設(shè)置進(jìn)去之后,就會將結(jié)果用 json 反序列化成為用戶需要的結(jié)果,然后調(diào)用其 notifyAll 方法喚醒 invoke 方法被阻塞的線程:

@SuppressWarnings("unckecked")     @Override     public T invoke(Object[] args) {         JSONObject jsonObject = new JSONObject();         jsonObject.put("interfaces", identify);         JSONObject param = new JSONObject();         if (args != null) {             for (Object obj : args) {                 param.put(obj.getClass().getName(), obj);             }         }         jsonObject.put("parameter", param);         jsonObject.put("requestId", requestId);         System.out.println("發(fā)送給服務(wù)端JSON為:" + jsonObject.toJSONString());         String msg = jsonObject.toJSONString() + NettyServer.DELIMITER;         ByteBuf byteBuf = Unpooled.buffer(msg.getBytes().length);         byteBuf.writeBytes(msg.getBytes());         ctx.writeAndFlush(byteBuf);         // 這里被喚醒         waitForResult();         return result;     }

然后就可以返回結(jié)果了,返回的結(jié)果就會返回給用戶了。

整體測試

到了這里我們的生產(chǎn)者和消費者的代碼都寫完了,我們來整體測試一遍。生產(chǎn)者的代碼是和之前的一致的:

public class TestProducer {     public static void main(String[] args) throws Exception {         String connectionString = "zookeeper://localhost1:2181,localhost2:2182,localhost3:2181";         HelloService service = new HelloServiceImpl();         ServiceConfig config = new ServiceConfig<>(HelloService.class, service);         ListserviceConfigList = new ArrayList<>();         serviceConfigList.add(config);         ApplicationContext ctx = new ApplicationContext(connectionString, serviceConfigList, null, 50071);     }  }

消費者測試代碼:

public class TestConsumer {      public static void main(String[] args) throws Exception {         String connectionString = "zookeeper://localhost1:2181,localhost2:2182,localhost3:2181";         ReferenceConfigconfig = new ReferenceConfig<>(HelloService.class);         ApplicationContext ctx = new ApplicationContext(connectionString, null, Collections.singletonList(config),                 50070);         HelloService helloService = ctx.getService(HelloService.class);         System.out.println("sayHello(TestBean)結(jié)果為:" + helloService.sayHello(new TestBean("張三", 20)));     } }

接著啟動生產(chǎn)者,然后啟動消費者。生產(chǎn)者得到的日志如下:

Zookeeper Client初始化完畢...... 注冊到注冊中心,路徑為:【/myRPC/interface=com.study.rpc.test.producer.HelloService& method=sayHello&para;meter=com.study.rpc.test.producer.TestBean】 信息為:RegistryInfo{hostname='localhost', ip='192.168.16.7', port=50071} 啟動NettyService,端口為:50071 啟動響應(yīng)處理線程:Response-processor-0 啟動響應(yīng)處理線程:Response-processor-2 啟動響應(yīng)處理線程:Response-processor-1 接收到消息:{"interfaces":"interface=com.study.rpc.test.producer.HelloService& method=sayHello&para;meter=com.study.rpc.test.producer.TestBean","requestId":"1", "parameter":{"com.study.rpc.test.producer.TestBean":{"age":20,"name":"張三"}}}  響應(yīng)給客戶端:{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService& method=sayHello&para;meter=com.study.rpc.test.producer.TestBean","requestId":"1", "result":"\"牛逼,我收到了消息:TestBean{name='張三', age=20}\""}

消費者得到的日志為:

Zookeeper Client初始化完畢...... 開始建立連接:192.168.16.7, 50071 等待連接成功... 啟動響應(yīng)處理線程:Response-processor-1 啟動響應(yīng)處理線程:Response-processor-0 啟動響應(yīng)處理線程:Response-processor-2 連接成功:ChannelHandlerContext(NettyClient$NettyClientHandler#0, [id: 0xb7a59701, L:/192.168.16.7:58354 - R:/192.168.16.7:50071])  發(fā)送給服務(wù)端JSON為:{"interfaces":"interface=com.study.rpc.test.producer.HelloService& method=sayHello&para;meter=com.study.rpc.test.producer.TestBean","requestId":"1", "parameter":{"com.study.rpc.test.producer.TestBean":{"age":20,"name":"張三"}}}  收到一個響應(yīng):RpcResponse{result='"牛逼,我收到了消息:TestBean{name='張三', age=20}"', interfaceMethodIdentify='interface=com.study.rpc.test.producer.HelloService& method=sayHello&para;meter=com.study.rpc.test.producer.TestBean', requestId='1'} sayHello(TestBean)結(jié)果為:牛逼,我收到了消息:TestBean{name='張三', age=20}

感謝各位的閱讀,以上就是“怎么寫一個RPC框架”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對怎么寫一個RPC框架這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

rpc
AI