溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何設(shè)計(jì)一個(gè)短小精悍、可拓展的RPC框架?(含實(shí)現(xiàn)代碼)

發(fā)布時(shí)間:2020-04-09 20:40:59 來(lái)源:網(wǎng)絡(luò) 閱讀:1116 作者:Java技術(shù)箭 欄目:編程語(yǔ)言

簡(jiǎn)介

如果大家對(duì)RPC有一些了解的話,或多或者都會(huì)聽(tīng)到過(guò)一些大名鼎鼎的RPC框架,比如Dobbo、gRPC。但是大部分人對(duì)于他們底層的實(shí)現(xiàn)原理其實(shí)不甚了解。

有一種比較好的學(xué)習(xí)方式:就是如果你想要了解一個(gè)框架的原理,你可以嘗試去寫(xiě)一個(gè)簡(jiǎn)易版的框架出來(lái),就比如如果你想理解Spring IOC的思想,最好的方式就是自己實(shí)現(xiàn)一個(gè)小型的IOC容器,自己慢慢體會(huì)。

所以本文嘗試帶領(lǐng)大家去設(shè)計(jì)一個(gè)小型的RPC框架,同時(shí)對(duì)于框架會(huì)保持一些拓展點(diǎn)。

通過(guò)閱讀本文,你可以收獲:

  • 理解RPC框架最核心的理念

  • 學(xué)習(xí)在設(shè)計(jì)框架的時(shí)候,如何保持拓展性

本文會(huì)依賴一些組件,他們是實(shí)現(xiàn)RPC框架必要的一些知識(shí),文中會(huì)盡量降低這些知識(shí)帶來(lái)的障礙。但是,最好期望讀者有以下知識(shí)基礎(chǔ):

  • Zookeeper基本入門

  • Netty基本入門

RPC框架應(yīng)該長(zhǎng)什么樣子

我們首先來(lái)看一下:一個(gè)RPC框架是什么東西?

我們最直觀的感覺(jué)就是:

集成了RPC框架之后,通過(guò)配置一個(gè)注冊(cè)中心的地址。一個(gè)應(yīng)用(稱為服務(wù)提供者)將某個(gè)接口(interface)“暴露”出去,另外一個(gè)應(yīng)用(稱為服務(wù)消費(fèi)者)通過(guò)“引用”這個(gè)接口(interface),然后調(diào)用了一下,就很神奇的可以調(diào)用到另外一個(gè)應(yīng)用的方法了

給我們的感覺(jué)就好像調(diào)用了一個(gè)本地方法一樣。即便兩個(gè)應(yīng)用不是在同一個(gè)JVM中甚至兩個(gè)應(yīng)用都不在同一臺(tái)機(jī)器中。

那他們是如何做到的呢?

其實(shí)啊,當(dāng)我們的服務(wù)消費(fèi)者調(diào)用某個(gè)RPC接口的方法之后,它的底層會(huì)通過(guò)動(dòng)態(tài)代理,然后經(jīng)過(guò)網(wǎng)絡(luò)調(diào)用,去到服務(wù)提供者的機(jī)器上,然后執(zhí)行對(duì)應(yīng)的方法。

接著方法的結(jié)果通過(guò)網(wǎng)絡(luò)傳輸返回到服務(wù)消費(fèi)者那里,然后就可以拿到結(jié)果了。

整個(gè)過(guò)程如下圖:

如何設(shè)計(jì)一個(gè)短小精悍、可拓展的RPC框架?(含實(shí)現(xiàn)代碼)


那么這個(gè)時(shí)候,可能有人會(huì)問(wèn)了:服務(wù)消費(fèi)者怎么知道服務(wù)提供者在哪臺(tái)機(jī)器的哪個(gè)端口呢?

這個(gè)時(shí)候,就需要“注冊(cè)中心”登場(chǎng)了,具體來(lái)說(shuō)是這樣子的:

  • 服務(wù)提供者在啟動(dòng)的時(shí)候,將自己應(yīng)用所在機(jī)器的信息提交到注冊(cè)中心上面。

  • 服務(wù)消費(fèi)者在啟動(dòng)的時(shí)候,將需要消費(fèi)的接口所在機(jī)器的信息抓回來(lái)

這樣一來(lái),服務(wù)消費(fèi)者就有了一份服務(wù)提供者所在的機(jī)器列表了

如何設(shè)計(jì)一個(gè)短小精悍、可拓展的RPC框架?(含實(shí)現(xiàn)代碼)


“服務(wù)消費(fèi)者”拿到了“服務(wù)提供者”的機(jī)器列表就可以通過(guò)網(wǎng)絡(luò)請(qǐng)求來(lái)發(fā)起請(qǐng)求了。

網(wǎng)絡(luò)客戶端,我們應(yīng)該采用什么呢?有幾種選擇:

  • 使用JDK原生BIO(也就是ServerSocket那一套)。阻塞式IO方法,無(wú)法支撐高并發(fā)。

  • 使用JDK原生NIO(Selector、SelectionKey那一套)。非阻塞式IO,可以支持高并發(fā),但是自己實(shí)現(xiàn)復(fù)雜,需要處理各種網(wǎng)絡(luò)問(wèn)題。

  • 使用大名鼎鼎的NIO框架Netty,天然支持高并發(fā),封裝好,API易用。

作為一個(gè)有追求的程序員,我們要求開(kāi)發(fā)出來(lái)的框架要求支持高并發(fā)、又要求簡(jiǎn)單、還要快。當(dāng)然是選擇Netty來(lái)實(shí)現(xiàn)了,使用Netty的一些很基本的API就能滿足我們的需求。

網(wǎng)絡(luò)協(xié)議定義

當(dāng)然了,既然我們要使用網(wǎng)絡(luò)傳輸數(shù)據(jù)。我們首先要定義一套網(wǎng)絡(luò)協(xié)議出來(lái)。

你可能又要問(wèn)了,啥叫網(wǎng)絡(luò)協(xié)議?

網(wǎng)絡(luò)協(xié)議,通俗理解,意思就是說(shuō)我們的客戶端發(fā)送的數(shù)據(jù)應(yīng)該長(zhǎng)什么樣子,服務(wù)端可以去解析出來(lái)知道要做什么事情。話不多說(shuō),上代碼:

假設(shè)我們現(xiàn)在服務(wù)提供者有兩個(gè)類:

//?com.study.rpc.test.producer.HelloService
public?interface?HelloService?{
?String?sayHello(TestBean?testBean);
}
//?com.study.rpc.test.producer.TestBean
public?class?TestBean?{
?private?String?name;
?private?Integer?age;
?public?TestBean(String?name,?Integer?age)?{
?this.name?=?name;
?this.age?=?age;
?}
?//?getter?setter
}

現(xiàn)在我要調(diào)用HelloService.sayHello(TestBean testBean)這個(gè)方法

作為“服務(wù)消費(fèi)者”,應(yīng)該怎么定義我們的請(qǐng)求,從而讓服務(wù)端知道我是要調(diào)用這個(gè)方法呢?

這需要我們將這個(gè)接口信息產(chǎn)生一個(gè)唯一的標(biāo)識(shí): 這個(gè)標(biāo)識(shí)會(huì)記錄了接口名、具體是那個(gè)方法、然后具體參數(shù)是什么!

然后將這些信息組織起來(lái)發(fā)送給服務(wù)端,我這里的方式是將信息保存為一個(gè)JSON格式的字符串來(lái)傳輸。

比如上面的接口我們傳輸?shù)臄?shù)據(jù)大概是這樣的:

{
	"interfaces":?"interface=com.study.rpc.test.producer.HelloService&method=sayHello&
	parameter=com.study.rpc.test.producer.TestBean",
	"requestId":?"3",
	"parameter":?{
		"com.study.rpc.test.producer.TestBean":?{
			"age":?20,
			"name":?"張三"
		}
	}
}

嗯,我這里用一個(gè)JSON來(lái)標(biāo)識(shí)這次調(diào)用是調(diào)用哪個(gè)接口的哪個(gè)方法,其中interface標(biāo)識(shí)了唯一的類,parameter標(biāo)識(shí)了里面具體有哪些參數(shù), 其中key就是參數(shù)的類全限定名,value就是這個(gè)類的JSON信息。

可能看到這里,大家可能有意見(jiàn)了: 數(shù)據(jù)不一定用JSON格式傳輸啊,而且使用JSON也不一定性能最高啊。

你使用JDK的Serializable配合Netty的ObjectDecoder來(lái)實(shí)現(xiàn),這當(dāng)然也可以,其實(shí)這里是一個(gè)拓展點(diǎn),我們應(yīng)該要提供多種序列化方式來(lái)供用戶選擇

但是這里選擇了JSON的原因是因?yàn)樗容^直觀,對(duì)于寫(xiě)文章來(lái)說(shuō)比較合理。

開(kāi)發(fā)服務(wù)提供者

嗯,搞定了網(wǎng)絡(luò)協(xié)議之后,我們開(kāi)始開(kāi)發(fā)“服務(wù)提供者”了。對(duì)于服務(wù)提供者,因?yàn)槲覀冞@里是寫(xiě)一個(gè)簡(jiǎn)單版本的RPC框架,為了保持簡(jiǎn)潔。

我們不會(huì)引入類似Spring之類的容器框架,所以我們需要定義一個(gè)服務(wù)提供者的配置類,它用于定義這個(gè)服務(wù)提供者是什么接口,然后它具體的實(shí)例對(duì)象是什么:

public?class?ServiceConfig{
?public?Class?type;
?public?T?instance;
?public?ServiceConfig(Classtype,?T?instance)?{
?this.type?=?type;
?this.instance?=?instance;
?}
?public?ClassgetType()?{
?return?type;
?}
?public?void?setType(Classtype)?{
?this.type?=?type;
?}
?public?T?getInstance()?{
?return?instance;
?}
?public?void?setInstance(T?instance)?{
?this.instance?=?instance;
?}
}

有了這個(gè)東西之后,我們就知道需要暴露哪些接口了。

為了框架有一個(gè)統(tǒng)一的入口,我定義了一個(gè)類叫做ApplicationContext,可以認(rèn)為這是一個(gè)應(yīng)用程序上下文,他的構(gòu)造函數(shù),接收2個(gè)參數(shù),代碼如下:

public?ApplicationContext(String?registryUrl,?ListserviceConfigs){
?//?1.?保存需要暴露的接口配置
?this.serviceConfigs?=?serviceConfigs?==?null???new?ArrayList<>()?:?serviceConfigs;
?//?step?2:?實(shí)例化注冊(cè)中心
?initRegistry(registryUrl);
?//?step?3:?將接口注冊(cè)到注冊(cè)中心,從注冊(cè)中心獲取接口,初始化服務(wù)接口列表
?RegistryInfo?registryInfo?=?null;
?InetAddress?addr?=?InetAddress.getLocalHost();
?String?hostname?=?addr.getHostName();
?String?hostAddress?=?addr.getHostAddress();
?registryInfo?=?new?RegistryInfo(hostname,?hostAddress,?port);
?doRegistry(registryInfo);
?
?
?//?step?4:初始化Netty服務(wù)器,接受到請(qǐng)求,直接打到服務(wù)提供者的service方法中
?if?(!this.serviceConfigs.isEmpty())?{
?//?需要暴露接口才暴露
?nettyServer?=?new?NettyServer(this.serviceConfigs,?interfaceMethods);
?nettyServer.init(port);
?}
}

注冊(cè)中心設(shè)計(jì)

這里分為幾個(gè)步驟,首先保存了接口配置,接著初始化注冊(cè)中心,因?yàn)樽?cè)中心可能會(huì)提供多種來(lái)供用戶選擇,所以這里需要定義一個(gè)注冊(cè)中心的接口:

public?interface?Registry?{
?/**
?*?將生產(chǎn)者接口注冊(cè)到注冊(cè)中心
?*
?*?@param?clazz?類
?*?@param?registryInfo?本機(jī)的注冊(cè)信息
?*/
?void?register(Class?clazz,?RegistryInfo?registryInfo)?throws?Exception;
}

這里我們提供一個(gè)注冊(cè)的方法,這個(gè)方法的語(yǔ)義是將clazz對(duì)應(yīng)的接口注冊(cè)到注冊(cè)中心。接收兩個(gè)參數(shù),一個(gè)是接口的class對(duì)象,另一個(gè)是注冊(cè)信息,

里面包含了本機(jī)的一些基本信息,如下:

public?class?RegistryInfo?{
?private?String?hostname;
?private?String?ip;
?private?Integer?port;
?public?RegistryInfo(String?hostname,?String?ip,?Integer?port)?{
?this.hostname?=?hostname;
?this.ip?=?ip;
?this.port?=?port;
?}
?//?getter?setter
}

好了,定義好注冊(cè)中心,回到之前的實(shí)例化注冊(cè)中心的地方,代碼如下:

/**
?*?注冊(cè)中心
?*/
private?Registry?registry;
private?void?initRegistry(String?registryUrl)?{
?if?(registryUrl.startsWith("zookeeper://"))?{
?registryUrl?=?registryUrl.substring(12);
?registry?=?new?ZookeeperRegistry(registryUrl);
?}?else?if?(registryUrl.startsWith("multicast://"))?{
?registry?=?new?MulticastRegistry(registryUrl);
?}
}

這里邏輯也非常簡(jiǎn)單,就是根據(jù)url的schema來(lái)判斷是那個(gè)注冊(cè)中心

注冊(cè)中心這里實(shí)現(xiàn)了2個(gè)實(shí)現(xiàn)類,分別使用zookeeper作為注冊(cè)中心,另外一個(gè)是使用廣播的方式作為注冊(cè)中心。

廣播注冊(cè)中心這里僅僅是做個(gè)示范,內(nèi)部沒(méi)有實(shí)現(xiàn)。我們主要是實(shí)現(xiàn)了zookeeper的注冊(cè)中心。

(當(dāng)然了,如果有興趣,可以實(shí)現(xiàn)更多的注冊(cè)中心供用戶選擇,比如redis之類的,這里只是為了保持“拓展點(diǎn)”)

那么實(shí)例化完注冊(cè)中心之后,回到上面的代碼:

注冊(cè)服務(wù)提供者

//?step?3:?將接口注冊(cè)到注冊(cè)中心,從注冊(cè)中心獲取接口,初始化服務(wù)接口列表
RegistryInfo?registryInfo?=?null;
InetAddress?addr?=?InetAddress.getLocalHost();
String?hostname?=?addr.getHostName();
String?hostAddress?=?addr.getHostAddress();
registryInfo?=?new?RegistryInfo(hostname,?hostAddress,?port);
doRegistry(registryInfo);

這里邏輯很簡(jiǎn)單,就是獲取本機(jī)的的基本信息構(gòu)造成RegistryInfo,然后調(diào)用了doRegistry方法:

/**
?*?接口方法對(duì)應(yīng)method對(duì)象
?*/
private?MapinterfaceMethods?=?new?ConcurrentHashMap<>();
private?void?doRegistry(RegistryInfo?registryInfo)?throws?Exception?{
?for?(ServiceConfig?config?:?serviceConfigs)?{
?Class?type?=?config.getType();
?registry.register(type,?registryInfo);
?Method[]?declaredMethods?=?type.getDeclaredMethods();
?for?(Method?method?:?declaredMethods)?{
?String?identify?=?InvokeUtils.buildInterfaceMethodIdentify(type,?method);
?interfaceMethods.put(identify,?method);
?}
?}
}

這里做了2件事情:

  • 將接口注冊(cè)到注冊(cè)中心中

  • 對(duì)于每一個(gè)接口的每一個(gè)方法,生成一個(gè)唯一標(biāo)識(shí),保存在interfaceMethods集合中

下面分別分析這兩件事情,首先是注冊(cè)方法:

因?yàn)槲覀冇玫搅藌ookeeper,為了方便,引入了zookeeper的客戶端框架curator

<dependency>
?<groupId>org.apache.curatorgroupId>
?<artifactId>curator-recipesartifactId>
?<version>2.3.0version>
dependency>

接著看代碼:

public?class?ZookeeperRegistry?implements?Registry?{
?private?CuratorFramework?client;
?public?ZookeeperRegistry(String?connectString)?{
?RetryPolicy?retryPolicy?=?new?ExponentialBackoffRetry(1000,?3);
?client?=?CuratorFrameworkFactory.newClient(connectString,?retryPolicy);
?client.start();
?try?{
?Stat?myRPC?=?client.checkExists().forPath("/myRPC");
?if?(myRPC?==?null)?{
?client.create()
?.creatingParentsIfNeeded()
?.forPath("/myRPC");
?}
?System.out.println("Zookeeper?Client初始化完畢......");
?}?catch?(Exception?e)?{
?e.printStackTrace();
?}
?}
?@Override
?public?void?register(Class?clazz,?RegistryInfo?registryInfo)?throws?Exception?{
?//?1.?注冊(cè)的時(shí)候,先從zk中獲取數(shù)據(jù)
?//?2.?將自己的服務(wù)器地址加入注冊(cè)中心中
?//?為每一個(gè)接口的每一個(gè)方法注冊(cè)一個(gè)臨時(shí)節(jié)點(diǎn),然后key為接口方法的唯一標(biāo)識(shí),data為服務(wù)地址列表
?Method[]?declaredMethods?=?clazz.getDeclaredMethods();
?for?(Method?method?:?declaredMethods)?{
?String?key?=?InvokeUtils.buildInterfaceMethodIdentify(clazz,?method);
?String?path?=?"/myRPC/"?+?key;
?Stat?stat?=?client.checkExists().forPath(path);
?ListregistryInfos;
?if?(stat?!=?null)?{
?//?如果這個(gè)接口已經(jīng)有人注冊(cè)過(guò)了,把數(shù)據(jù)拿回來(lái),然后將自己的信息保存進(jìn)去
?byte[]?bytes?=?client.getData().forPath(path);
?String?data?=?new?String(bytes,?StandardCharsets.UTF_8);
?registryInfos?=?JSONArray.parseArray(data,?RegistryInfo.class);
?if?(registryInfos.contains(registryInfo))?{
?//?正常來(lái)說(shuō),zk的臨時(shí)節(jié)點(diǎn),斷開(kāi)連接后,直接就沒(méi)了,但是重啟會(huì)經(jīng)常發(fā)現(xiàn)存在節(jié)點(diǎn),所以有了這樣的代碼
?System.out.println("地址列表已經(jīng)包含本機(jī)【"?+?key?+?"】,不注冊(cè)了");
?}?else?{
?registryInfos.add(registryInfo);
?client.setData().forPath(path,?JSONArray.toJSONString(registryInfos).getBytes());
?System.out.println("注冊(cè)到注冊(cè)中心,路徑為:【"?+?path?+?"】?信息為:"?+?registryInfo);
?}
?}?else?{
?registryInfos?=?new?ArrayList<>();
?registryInfos.add(registryInfo);
?client.create()
?.creatingParentsIfNeeded()
?//?臨時(shí)節(jié)點(diǎn),斷開(kāi)連接就關(guān)閉
?.withMode(CreateMode.EPHEMERAL)
?.forPath(path,?JSONArray.toJSONString(registryInfos).getBytes());
?System.out.println("注冊(cè)到注冊(cè)中心,路徑為:【"?+?path?+?"】?信息為:"?+?registryInfo);
?}
?}
?}
}

zookeeper注冊(cè)中心在初始化的時(shí)候,會(huì)建立好連接。然后注冊(cè)的時(shí)候,針對(duì)clazz接口的每一個(gè)方法,都會(huì)生成一個(gè)唯一標(biāo)識(shí)

這里使用了InvokeUtils.buildInterfaceMethodIdentify方法:

public?static?String?buildInterfaceMethodIdentify(Class?clazz,?Method?method)?{
?Map<String,?String>?map?=?new?LinkedHashMap<>();
?map.put("interface",?clazz.getName());
?map.put("method",?method.getName());
?Parameter[]?parameters?=?method.getParameters();
?if?(parameters.length?>?0)?{
?StringBuilder?param?=?new?StringBuilder();
?for?(int?i?=?0;?i?<?parameters.length;?i++)?{
?Parameter?p?=?parameters[i];
?param.append(p.getType().getName());
?if?(i?<?parameters.length?-?1)?{
?param.append(",");
?}
?}
?map.put("parameter",?param.toString());
?}
?return?map2String(map);
}
public?static?String?map2String(Map<String,?String>?map)?{
?StringBuilder?sb?=?new?StringBuilder();
?Iterator<map.entry<string,?<=""?span="">String>>?iterator?=?map.entrySet().iterator();
?while?(iterator.hasNext())?{
?Map.Entry<String,?String>?entry?=?iterator.next();
?sb.append(entry.getKey()?+?"="?+?entry.getValue());
?if?(iterator.hasNext())?{
?sb.append("&");
?}
?}
?return?sb.toString();
}

其實(shí)就是對(duì)接口的方法使用他們的限定名和參數(shù)來(lái)組成一個(gè)唯一的標(biāo)識(shí),比如 HelloService#sayHello(TestBean)生成的大概是這樣的:

interface=com.study.rpc.test.producer.HelloService&method=sayHello&
parameter=com.study.rpc.test.producer.TestBean

接下來(lái)的邏輯就簡(jiǎn)單了,在Zookeeper中的/myRPC路徑下面建立臨時(shí)節(jié)點(diǎn),節(jié)點(diǎn)名稱為我們上面的接口方法唯一標(biāo)識(shí),數(shù)據(jù)內(nèi)容為機(jī)器信息。

之所以采用臨時(shí)節(jié)點(diǎn)是因?yàn)椋喝绻麢C(jī)器宕機(jī)了,連接斷開(kāi)之后,消費(fèi)者可以通過(guò)zookeeper的watcher機(jī)制感知到

大概看起來(lái)是這樣的:

?/myRPC/interface=com.study.rpc.test.producer.HelloService&method=sayHello&
?parameter=com.study.rpc.test.producer.TestBean
?[
?{
?"hostname":peer1,
?"port":8080
?},
?{
?"hostname":peer2,
?"port":8081
?}
?]

通過(guò)這樣的方式,在服務(wù)消費(fèi)的時(shí)候就可以拿到這樣的注冊(cè)信息,然后知道可以調(diào)用那臺(tái)機(jī)器的那個(gè)端口。

好了,注冊(cè)中心弄完了之后,我們回到前面說(shuō)的注冊(cè)方法做的第二件事情,我們將每一個(gè)接口方法標(biāo)識(shí)的方法放入了一個(gè)map中:

/**
?*?接口方法對(duì)應(yīng)method對(duì)象
?*/
private?Map<String,?Method>?interfaceMethods?=?new?ConcurrentHashMap<>();

這個(gè)的原因是因?yàn)?,我們?cè)谑盏骄W(wǎng)絡(luò)請(qǐng)求的時(shí)候,需要調(diào)用反射的方式調(diào)用method對(duì)象,所以存起來(lái)。

啟動(dòng)網(wǎng)絡(luò)服務(wù)端接受請(qǐng)求

接下來(lái)我們就可以看第四步了:

//?step?4:初始化Netty服務(wù)器,接受到請(qǐng)求,直接打到服務(wù)提供者的service方法中
if?(!this.serviceConfigs.isEmpty())?{
?//?需要暴露接口才暴露
?nettyServer?=?new?NettyServer(this.serviceConfigs,?interfaceMethods);
?nettyServer.init(port);
}

因?yàn)檫@里使用Netty來(lái)做的所以需要引入Netty的依賴:

<dependency>
?<groupId>io.nettygroupId>
?<artifactId>netty-allartifactId>
?<version>4.1.30.Finalversion>
dependency>

接著來(lái)分析:

public?class?NettyServer?{
?/**
?*?負(fù)責(zé)調(diào)用方法的handler
?*/
?private?RpcInvokeHandler?rpcInvokeHandler;
?public?NettyServer(ListserverConfigs,?MapinterfaceMethods)?throws?InterruptedException?{
?this.rpcInvokeHandler?=?new?RpcInvokeHandler(serverConfigs,?interfaceMethods);
?}
?public?int?init(int?port)?throws?Exception?{
?EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();
?EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();
?ServerBootstrap?b?=?new?ServerBootstrap();
?b.group(bossGroup,?workerGroup)
?.channel(NioServerSocketChannel.class)
?.option(ChannelOption.SO_BACKLOG,?1024)
?.childHandler(new?ChannelInitializer()?{
?@Override
?protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
?ByteBuf?delimiter?=?Unpooled.copiedBuffer("$$");
?//?設(shè)置按照分隔符“&&”來(lái)切分消息,單條消息限制為?1MB
?ch.pipeline().addLast(new?DelimiterBasedFrameDecoder(1024?*?1024,?delimiter));
?ch.pipeline().addLast(new?StringDecoder());
?ch.pipeline().addLast().addLast(rpcInvokeHandler);
?}
?});
?ChannelFuture?sync?=?b.bind(port).sync();
?System.out.println("啟動(dòng)NettyService,端口為:"?+?port);
?return?port;
?}
}

這部分主要的都是netty的api,我們不做過(guò)多的說(shuō)明,就簡(jiǎn)單的說(shuō)一下:

  • 我們通過(guò)“&&”作為標(biāo)識(shí)符號(hào)來(lái)區(qū)分兩條信息,然后一條信息的最大長(zhǎng)度為1MB

  • 所有邏輯都在RpcInvokeHandler中,這里面?zhèn)鬟M(jìn)去了配置的服務(wù)接口實(shí)例,以及服務(wù)接口實(shí)例每個(gè)接口方法唯一標(biāo)識(shí)對(duì)應(yīng)的Method對(duì)象的Map集合。

public?class?RpcInvokeHandler?extends?ChannelInboundHandlerAdapter?{
?/**
?*?接口方法唯一標(biāo)識(shí)對(duì)應(yīng)的Method對(duì)象
?*/
?private?Map<String,?Method>?interfaceMethods;
?/**
?*?接口對(duì)應(yīng)的實(shí)現(xiàn)類
?*/
?private?Map<class,?Object>?interfaceToInstance;
?/**
?*?線程池,隨意寫(xiě)的,不要吐槽
?*/
?private?ThreadPoolExecutor?threadPoolExecutor?=?new?ThreadPoolExecutor(10,
?50,?60,?TimeUnit.SECONDS,?new?LinkedBlockingQueue<>(100),
?new?ThreadFactory()?{
?AtomicInteger?m?=?new?AtomicInteger(0);
?@Override
?public?Thread?newThread(Runnable?r)?{
?return?new?Thread(r,?"IO-thread-"?+?m.incrementAndGet());
?}
?});
?public?RpcInvokeHandler(ListserviceConfigList,
?Map<String,?Method>?interfaceMethods)?{
?this.interfaceToInstance?=?new?ConcurrentHashMap<>();
?this.interfaceMethods?=?interfaceMethods;
?for?(ServiceConfig?config?:?serviceConfigList)?{
?interfaceToInstance.put(config.getType(),?config.getInstance());
?}
?}
?@Override
?public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{
?try?{
?String?message?=?(String)?msg;
?//?這里拿到的是一串JSON數(shù)據(jù),解析為Request對(duì)象,
?//?事實(shí)上這里解析網(wǎng)絡(luò)數(shù)據(jù),可以用序列化方式,定一個(gè)接口,可以實(shí)現(xiàn)JSON格式序列化,或者其他序列化
?//?但是demo版本就算了。
?System.out.println("接收到消息:"?+?msg);
?RpcRequest?request?=?RpcRequest.parse(message,?ctx);
?threadPoolExecutor.execute(new?RpcInvokeTask(request));
?}?finally?{
?ReferenceCountUtil.release(msg);
?}
?}
?@Override
?public?void?channelReadComplete(ChannelHandlerContext?ctx)?throws?Exception?{
?ctx.flush();
?}
?@Override
?public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
?System.out.println("發(fā)生了異常..."?+?cause);
?cause.printStackTrace();
?ctx.close();
?}
?public?class?RpcInvokeTask?implements?Runnable?{
?private?RpcRequest?rpcRequest;
?RpcInvokeTask(RpcRequest?rpcRequest)?{
?this.rpcRequest?=?rpcRequest;
?}
?@Override
?public?void?run()?{
?try?{
?/*
?*?數(shù)據(jù)大概是這樣子的
?*?{"interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello?meter=com
?*?.study.rpc.test.producer.TestBean","requestId":"3","parameter":{"com.study.rpc.test.producer
?*?.TestBean":{"age":20,"name":"張三"}}}
?*/
?//?這里希望能拿到每一個(gè)服務(wù)對(duì)象的每一個(gè)接口的特定聲明
?String?interfaceIdentity?=?rpcRequest.getInterfaceIdentity();
?Method?method?=?interfaceMethods.get(interfaceIdentity);
?Map<String,?String>?map?=?string2Map(interfaceIdentity);
?String?interfaceName?=?map.get("interface");
?Class?interfaceClass?=?Class.forName(interfaceName);
?Object?o?=?interfaceToInstance.get(interfaceClass);
?String?parameterString?=?map.get("parameter");
?Object?result;
?if?(parameterString?!=?null)?{
?String[]?parameterTypeClass?=?parameterString.split(",");
?Map<String,?Object>?parameterMap?=?rpcRequest.getParameterMap();
?Object[]?parameterInstance?=?new?Object[parameterTypeClass.length];
?for?(int?i?=?0;?i?<?parameterTypeClass.length;?i++)?{
?String?parameterClazz?=?parameterTypeClass[i];
?parameterInstance[i]?=?parameterMap.get(parameterClazz);
?}
?result?=?method.invoke(o,?parameterInstance);
?}?else?{
?result?=?method.invoke(o);
?}
?//?寫(xiě)回響應(yīng)
?ChannelHandlerContext?ctx?=?rpcRequest.getCtx();
?String?requestId?=?rpcRequest.getRequestId();
?RpcResponse?response?=?RpcResponse.create(JSONObject.toJSONString(result),?interfaceIdentity,
?requestId);
?String?s?=?JSONObject.toJSONString(response)?+?"$$";
?ByteBuf?byteBuf?=?Unpooled.copiedBuffer(s.getBytes());
?ctx.writeAndFlush(byteBuf);
?System.out.println("響應(yīng)給客戶端:"?+?s);
?}?catch?(Exception?e)?{
?e.printStackTrace();
?}
?}
?
?public?static?Map<String,?String>?string2Map(String?str)?{
?String[]?split?=?str.split("&");
?Map<String,?String>?map?=?new?HashMap<>(16);
?for?(String?s?:?split)?{
?String[]?split1?=?s.split("=");
?map.put(split1[0],?split1[1]);
?}
?return?map;
?}
?}
}

這里說(shuō)明一下上面的邏輯:

channelRead方法用于接收消息,接收到的就是我們前面分析的那個(gè)JSON格式的數(shù)據(jù),接著我們將消息解析成RpcRequest

public?class?RpcRequest?{
?private?String?interfaceIdentity;
?private?Map<String,?Object>?parameterMap?=?new?HashMap<>();
?private?ChannelHandlerContext?ctx;
?private?String?requestId;
?public?static?RpcRequest?parse(String?message,?ChannelHandlerContext?ctx)?throws?ClassNotFoundException?{
?/*
?*?{
?*?"interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello2?meter=java.lang
?*?.String,com.study.rpc.test.producer.TestBean",
?*?"parameter":{
?*?"java.lang.String":"haha",
?*?"com.study.rpc.test.producer.TestBean":{
?*?"name":"小王",
?*?"age":20
?*?}
?*?}
?*?}
?*/
?JSONObject?jsonObject?=?JSONObject.parseObject(message);
?String?interfaces?=?jsonObject.getString("interfaces");
?JSONObject?parameter?=?jsonObject.getJSONObject("parameter");
?Set<String>?strings?=?parameter.keySet();
?RpcRequest?request?=?new?RpcRequest();
?request.setInterfaceIdentity(interfaces);
?Map<String,?Object>?parameterMap?=?new?HashMap<>(16);
?String?requestId?=?jsonObject.getString("requestId");
?for?(String?key?:?strings)?{
?if?(key.equals("java.lang.String"))?{
?parameterMap.put(key,?parameter.getString(key));
?}?else?{
?Class?clazz?=?Class.forName(key);
?Object?object?=?parameter.getObject(key,?clazz);
?parameterMap.put(key,?object);
?}
?}
?request.setParameterMap(parameterMap);
?request.setCtx(ctx);
?request.setRequestId(requestId);
?return?request;
?}
}

接著從request中解析出來(lái)需要調(diào)用的接口,然后通過(guò)反射調(diào)用對(duì)應(yīng)的接口,得到結(jié)果后我們將響應(yīng)封裝成PrcResponse寫(xiě)回給客戶端:

public?class?RpcResponse?{
?private?String?result;
?private?String?interfaceMethodIdentify;
?private?String?requestId;
?public?String?getResult()?{
?return?result;
?}
?public?void?setResult(String?result)?{
?this.result?=?result;
?}
?public?static?RpcResponse?create(String?result,?String?interfaceMethodIdentify,?String?requestId)?{
?RpcResponse?response?=?new?RpcResponse();
?response.setResult(result);
?response.setInterfaceMethodIdentify(interfaceMethodIdentify);
?response.setRequestId(requestId);
?return?response;
?}
}

里面包含了請(qǐng)求的結(jié)果JSON串,接口方法唯一標(biāo)識(shí),請(qǐng)求ID。數(shù)據(jù)大概看起來(lái)這個(gè)樣子:

{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&method=sayHello&
parameter=com.study.rpc.test.producer.TestBean","requestId":"3",
"result":"\"牛逼,我收到了消息:TestBean{name='張三',?age=20}\""}

通過(guò)這樣的信息,客戶端就可以通過(guò)響應(yīng)結(jié)果解析出來(lái)。

測(cè)試服務(wù)提供者

既然我們代碼寫(xiě)完了,現(xiàn)在需要測(cè)試一把:

首先我們先寫(xiě)一個(gè)HelloService的實(shí)現(xiàn)類出來(lái):

public?class?HelloServiceImpl?implements?HelloService?{
?@Override
?public?String?sayHello(TestBean?testBean)?{
?return?"牛逼,我收到了消息:"?+?testBean;
?}
}

接著編寫(xiě)服務(wù)提供者代碼:

public?class?TestProducer?{
?public?static?void?main(String[]?args)?throws?Exception?{
?String?connectionString?=?"zookeeper://localhost1:2181,localhost2:2181,localhost3:2181";
?HelloService?service?=?new?HelloServiceImpl();
?ServiceConfig?config?=?new?ServiceConfig<>(HelloService.class,?service);
?ListserviceConfigList?=?new?ArrayList<>();
?serviceConfigList.add(config);
?ApplicationContext?ctx?=?new?ApplicationContext(connectionString,?serviceConfigList,
?null,?50071);
?}
}

接著啟動(dòng)起來(lái),看到日志:

Zookeeper?Client初始化完畢......
注冊(cè)到注冊(cè)中心,路徑為:【/myRPC/interface=com.study.rpc.test.producer.HelloService&
method=sayHello?meter=com.study.rpc.test.producer.TestBean】
信息為:RegistryInfo{hostname='localhost',?ip='192.168.16.7',?port=50071}
啟動(dòng)NettyService,端口為:50071

這個(gè)時(shí)候,我們期望用NettyClient發(fā)送請(qǐng)求:

{
	"interfaces":?"interface=com.study.rpc.test.producer.HelloService&
	method=sayHello?meter=com.study.rpc.test.producer.TestBean",
	"requestId":?"3",
	"parameter":?{
		"com.study.rpc.test.producer.TestBean":?{
			"age":?20,
			"name":?"張三"
		}
	}
}

得到的響應(yīng)應(yīng)該是:

{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&method=sayHello&
parameter=com.study.rpc.test.producer.TestBean","requestId":"3",
"result":"\"牛逼,我收到了消息:TestBean{name='張三',?age=20}\""}

那么,可以編寫(xiě)一個(gè)測(cè)試程序(這個(gè)程序僅僅用于中間測(cè)試用,讀者不必理解):

public class NettyClient {

public static void main(String[] args) {

EventLoopGroup group = new NioEventLoopGroup();

try {

Bootstrap b = new Bootstrap();

b.group(group)

.channel(NioSocketChannel.class)

.option(ChannelOption.TCP_NODELAY, true)

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(new StringDecoder());

ch.pipeline().addLast(new NettyClientHandler());

}

});

ChannelFuture sync = b.connect("127.0.0.1", 50071).sync();

sync.channel().closeFuture().sync();

} catch (Exception e) {

e.printStackTrace();

} finally {

group.shutdownGracefully();

}

}

private static class NettyClientHandler extends ChannelInboundHandlerAdapter {

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

JSONObject jsonObject = new JSONObject();

jsonObject.put("interfaces", "interface=com.study.rpc.test.producer" +

".HelloService&method=sayHello?meter=com.study.rpc.test.producer.TestBean");

JSONObject param = new JSONObject();

JSONObject bean = new JSONObject();

bean.put("age", 20);

bean.put("name", "張三");

param.put("com.study.rpc.test.producer.TestBean", bean);

jsonObject.put("parameter", param);

jsonObject.put("requestId", 3);

System.out.println("發(fā)送給服務(wù)端JSON為:" + jsonObject.toJSONString());

String msg = jsonObject.toJSONString() + "$$";

ByteBuf byteBuf = Unpooled.buffer(msg.getBytes().length);

byteBuf.writeBytes(msg.getBytes());

ctx.writeAndFlush(byteBuf);

}

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

System.out.println("收到消息:" + msg);

}

}

}

啟動(dòng)之后,看到控制臺(tái)輸出:

發(fā)送給服務(wù)端JSON為:{"interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello&

parameter=com.study.rpc.test.producer.TestBean","requestId":3,

"parameter":{"com.study.rpc.test.producer.TestBean":{"name":"張三","age":20}}}

收到消息:{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&

method=sayHello?meter=com.study.rpc.test.producer.TestBean","requestId":"3",

"result":"\"牛逼,我收到了消息:TestBean{name='張三', age=20}\""}

bingo,完美實(shí)現(xiàn)了RPC的服務(wù)提供者。接下來(lái)我們只需要實(shí)現(xiàn)服務(wù)消費(fèi)者就完成了。

開(kāi)發(fā)服務(wù)消費(fèi)者

服務(wù)消費(fèi)者是同樣的處理,我們同樣要定義一個(gè)消費(fèi)者的配置:

public?class?ReferenceConfig{
?private?Class?type;
?public?ReferenceConfig(Classtype)?{
?this.type?=?type;
?}
?public?ClassgetType()?{
?return?type;
?}
?public?void?setType(Classtype)?{
?this.type?=?type;
?}
}

然后我們是統(tǒng)一入口,在ApplicationContext中修改代碼:

public?ApplicationContext(String?registryUrl,?ListserviceConfigs,
?ListreferenceConfigs,?int?port)?throws?Exception?{
?//?step?1:?保存服務(wù)提供者和消費(fèi)者
?this.serviceConfigs?=?serviceConfigs?==?null???new?ArrayList<>()?:?serviceConfigs;
?this.referenceConfigs?=?referenceConfigs?==?null???new?ArrayList<>()?:?referenceConfigs;
?//?....
?
}
private?void?doRegistry(RegistryInfo?registryInfo)?throws?Exception?{
?for?(ServiceConfig?config?:?serviceConfigs)?{
?Class?type?=?config.getType();
?registry.register(type,?registryInfo);
?Method[]?declaredMethods?=?type.getDeclaredMethods();
?for?(Method?method?:?declaredMethods)?{
?String?identify?=?InvokeUtils.buildInterfaceMethodIdentify(type,?method);
?interfaceMethods.put(identify,?method);
?}
?}
?for?(ReferenceConfig?config?:?referenceConfigs)?{
?ListregistryInfos?=?registry.fetchRegistry(config.getType());
?if?(registryInfos?!=?null)?{
?interfacesMethodRegistryList.put(config.getType(),?registryInfos);
?initChannel(registryInfos);
?}
?}
}

在注冊(cè)的時(shí)候,我們需要將需要消費(fèi)的接口,通過(guò)注冊(cè)中心抓取出來(lái),所以注冊(cè)中心要增加一個(gè)接口方法:

public?interface?Registry?{
?/**
?*?將生產(chǎn)者接口注冊(cè)到注冊(cè)中心
?*
?*?@param?clazz?類
?*?@param?registryInfo?本機(jī)的注冊(cè)信息
?*/
?void?register(Class?clazz,?RegistryInfo?registryInfo)?throws?Exception;
?/**
?*?為服務(wù)提供者抓取注冊(cè)表
?*
?*?@param?clazz?類
?*?@return?服務(wù)提供者所在的機(jī)器列表
?*/
?ListfetchRegistry(Class?clazz)?throws?Exception;
}

獲取服務(wù)提供者的機(jī)器列表

具體在Zookeeper中的實(shí)現(xiàn)如下:

@Override
public?ListfetchRegistry(Class?clazz)?throws?Exception?{
?Method[]?declaredMethods?=?clazz.getDeclaredMethods();
?ListregistryInfos?=?null;
?for?(Method?method?:?declaredMethods)?{
?String?key?=?InvokeUtils.buildInterfaceMethodIdentify(clazz,?method);
?String?path?=?"/myRPC/"?+?key;
?Stat?stat?=?client.checkExists()
?.forPath(path);
?if?(stat?==?null)?{
?//?這里可以添加watcher來(lái)監(jiān)聽(tīng)變化,這里簡(jiǎn)化了,沒(méi)有做這個(gè)事情
?System.out.println("警告:無(wú)法找到服務(wù)接口:"?+?path);
?continue;
?}
?if?(registryInfos?==?null)?{
?byte[]?bytes?=?client.getData().forPath(path);
?String?data?=?new?String(bytes,?StandardCharsets.UTF_8);
?registryInfos?=?JSONArray.parseArray(data,?RegistryInfo.class);
?}
?}
?return?registryInfos;
}

其實(shí)就是去zookeeper獲取節(jié)點(diǎn)中的數(shù)據(jù),得到接口所在的機(jī)器信息,獲取到的注冊(cè)信息諸侯,就會(huì)調(diào)用以下代碼:

if?(registryInfos?!=?null)?{
?//?保存接口和服務(wù)地址
?interfacesMethodRegistryList.put(config.getType(),?registryInfos);
?//?初始化網(wǎng)絡(luò)連接
?initChannel(registryInfos);
}
private?void?initChannel(ListregistryInfos)?throws?InterruptedException?{
?for?(RegistryInfo?info?:?registryInfos)?{
?if?(!channels.containsKey(info))?{
?System.out.println("開(kāi)始建立連接:"?+?info.getIp()?+?",?"?+?info.getPort());
?NettyClient?client?=?new?NettyClient(info.getIp(),?info.getPort());
?client.setMessageCallback(message?->?{
?//?這里收單服務(wù)端返回的消息,先壓入隊(duì)列
?RpcResponse?response?=?JSONObject.parseObject(message,?RpcResponse.class);
?responses.offer(response);
?synchronized?(ApplicationContext.this)?{
?ApplicationContext.this.notifyAll();
?}
?});
?//?等待連接建立
?ChannelHandlerContext?ctx?=?client.getCtx();
?channels.put(info,?ctx);
?}
?}
}

我們會(huì)針對(duì)每一個(gè)唯一的RegistryInfo建立一個(gè)連接,然后有這樣一段代碼:

client.setMessageCallback(message?->?{
?//?這里收單服務(wù)端返回的消息,先壓入隊(duì)列
?RpcResponse?response?=?JSONObject.parseObject(message,?RpcResponse.class);
?responses.offer(response);
?synchronized?(ApplicationContext.this)?{
?ApplicationContext.this.notifyAll();
?}
});

設(shè)置一個(gè)callback,用于收到消息的時(shí)候,回調(diào)這里的代碼,這部分我們后面再分析。

然后在client.getCtx()的時(shí)候,同步阻塞直到連接完成,建立好連接后通過(guò),NettyClient的代碼如下:

public?class?NettyClient?{
?private?ChannelHandlerContext?ctx;
?private?MessageCallback?messageCallback;
?public?NettyClient(String?ip,?Integer?port)?{
?EventLoopGroup?group?=?new?NioEventLoopGroup();
?try?{
?Bootstrap?b?=?new?Bootstrap();
?b.group(group)
?.channel(NioSocketChannel.class)
?.option(ChannelOption.TCP_NODELAY,?true)
?.handler(new?ChannelInitializer()?{
?@Override
?protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
?ByteBuf?delimiter?=?Unpooled.copiedBuffer("$$".getBytes());
?//?設(shè)置按照分隔符“&&”來(lái)切分消息,單條消息限制為?1MB
?ch.pipeline().addLast(new?DelimiterBasedFrameDecoder(1024?*?1024,?delimiter));
?ch.pipeline().addLast(new?StringDecoder());
?ch.pipeline().addLast(new?NettyClientHandler());
?}
?});
?ChannelFuture?sync?=?b.connect(ip,?port).sync();
?}?catch?(Exception?e)?{
?e.printStackTrace();
?}
?}
?public?void?setMessageCallback(MessageCallback?callback)?{
?this.messageCallback?=?callback;
?}
?public?ChannelHandlerContext?getCtx()?throws?InterruptedException?{
?System.out.println("等待連接成功...");
?if?(ctx?==?null)?{
?synchronized?(this)?{
?wait();
?}
?}
?return?ctx;
?}
?private?class?NettyClientHandler?extends?ChannelInboundHandlerAdapter?{
?@Override
?public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{
?try?{
?String?message?=?(String)?msg;
?if?(messageCallback?!=?null)?{
?messageCallback.onMessage(message);
?}
?}?finally?{
?ReferenceCountUtil.release(msg);
?}
?}
?@Override
?public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
?NettyClient.this.ctx?=?ctx;
?System.out.println("連接成功:"?+?ctx);
?synchronized?(NettyClient.this)?{
?NettyClient.this.notifyAll();
?}
?}
?@Override
?public?void?channelReadComplete(ChannelHandlerContext?ctx)?throws?Exception?{
?ctx.flush();
?}
?@Override
?public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
?cause.printStackTrace();
?}
?}
?public?interface?MessageCallback?{
?void?onMessage(String?message);
?}
}

這里主要是用了wait()和notifyAll()來(lái)實(shí)現(xiàn)同步阻塞等待連接建立。

建立好連接后,我們保存到集合中:

//?等待連接建立
ChannelHandlerContext?ctx?=?client.getCtx();
channels.put(info,?ctx);

發(fā)送請(qǐng)求

好了,到了這里我們?yōu)槊恳粋€(gè)需要消費(fèi)的接口建立了網(wǎng)絡(luò)連接,接下來(lái)要做的事情就是提供一個(gè)接口給用戶獲取服務(wù)提供者實(shí)例:

我把這個(gè)方法寫(xiě)在ApplicationContext中:

/**
?*?負(fù)責(zé)生成requestId的類
?*/
private?LongAdder?requestIdWorker?=?new?LongAdder();
/**
?*?獲取調(diào)用服務(wù)
?*/
@SuppressWarnings("unchecked")
publicT?getService(Classclazz)?{
?return?(T)?Proxy.newProxyInstance(getClass().getClassLoader(),?new?Class[]{clazz},?new?InvocationHandler()?{
?@Override
?public?Object?invoke(Object?proxy,?Method?method,?Object[]?args)?throws?Throwable?{
?String?methodName?=?method.getName();
?if?("equals".equals(methodName)?||?"hashCode".equals(methodName))?{
?throw?new?IllegalAccessException("不能訪問(wèn)"?+?methodName?+?"方法");
?}
?if?("toString".equals(methodName))?{
?return?clazz.getName()?+?"#"?+?methodName;
?}
?//?step?1:?獲取服務(wù)地址列表
?ListregistryInfos?=?interfacesMethodRegistryList.get(clazz);
?if?(registryInfos?==?null)?{
?throw?new?RuntimeException("無(wú)法找到服務(wù)提供者");
?}
?//?step?2:?負(fù)載均衡
?RegistryInfo?registryInfo?=?loadBalancer.choose(registryInfos);
?ChannelHandlerContext?ctx?=?channels.get(registryInfo);
?String?identify?=?InvokeUtils.buildInterfaceMethodIdentify(clazz,?method);
?String?requestId;
?synchronized?(ApplicationContext.this)?{
?requestIdWorker.increment();
?requestId?=?String.valueOf(requestIdWorker.longValue());
?}
?Invoker?invoker?=?new?DefaultInvoker(method.getReturnType(),?ctx,?requestId,?identify);
?inProgressInvoker.put(identify?+?"#"?+?requestId,?invoker);
?return?invoker.invoke(args);
?}
?});
}

這里主要是通過(guò)動(dòng)態(tài)代理來(lái)實(shí)現(xiàn)的,首先通過(guò)class來(lái)獲取對(duì)應(yīng)的機(jī)器列表,接著通過(guò)loadBalancer來(lái)選擇一個(gè)機(jī)器。這個(gè)LoaderBalance是一個(gè)接口:

public?interface?LoadBalancer?{
?/**
?*?選擇一個(gè)生產(chǎn)者
?*
?*?@param?registryInfos?生產(chǎn)者列表
?*?@return?選中的生產(chǎn)者
?*/
?RegistryInfo?choose(ListregistryInfos);
}

在ApplicationContext初始化的時(shí)候可以選擇不同的實(shí)現(xiàn),我這里主要實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的隨機(jī)算法(后續(xù)可以拓展為其他的,比如RoundRobin之類的):

public?class?RandomLoadbalancer?implements?LoadBalancer?{
?@Override
?public?RegistryInfo?choose(ListregistryInfos)?{
?Random?random?=?new?Random();
?int?index?=?random.nextInt(registryInfos.size());
?return?registryInfos.get(index);
?}
}

接著構(gòu)造接口方法的唯一標(biāo)識(shí)identify,還有一個(gè)requestId。

為什么需要一個(gè)requestId呢?

這是因?yàn)槲覀冊(cè)谔幚眄憫?yīng)的時(shí)候,需要找到某個(gè)響應(yīng)是對(duì)應(yīng)的哪個(gè)請(qǐng)求,但是僅僅使用identify是不行的

因?yàn)槲覀兺粋€(gè)應(yīng)用程序中可能會(huì)有多個(gè)線程同時(shí)調(diào)用同一個(gè)接口的同一個(gè)方法,這樣的identify是相同的。

所以我們需要用 identify + requestId的方式來(lái)判斷,reqeustId是一個(gè)自增的LongAddr。服務(wù)端在響應(yīng)的時(shí)候會(huì)將requestId返回。

接著我們構(gòu)造了一個(gè)Invoker,把它放入inProgressInvoker的集合中。調(diào)用了其invoke方法:

Invoker?invoker?=?new?DefaultInvoker(method.getReturnType(),?ctx,?requestId,?identify);
inProgressInvoker.put(identify?+?"#"?+?requestId,?invoker);
//?阻塞等待結(jié)果
return?invoker.invoke(args);
public?class?DefaultInvokerimplements?Invoker{
?private?ChannelHandlerContext?ctx;
?private?String?requestId;
?private?String?identify;
?private?Cla***eturnType;
?private?T?result;
?DefaultInvoker(Cla***eturnType,?ChannelHandlerContext?ctx,?String?requestId,?String?identify)?{
?this.returnType?=?returnType;
?this.ctx?=?ctx;
?this.requestId?=?requestId;
?this.identify?=?identify;
?}
?@SuppressWarnings("unckecked")
?@Override
?public?T?invoke(Object[]?args)?{
?JSONObject?jsonObject?=?new?JSONObject();
?jsonObject.put("interfaces",?identify);
?JSONObject?param?=?new?JSONObject();
?if?(args?!=?null)?{
?for?(Object?obj?:?args)?{
?param.put(obj.getClass().getName(),?obj);
?}
?}
?jsonObject.put("parameter",?param);
?jsonObject.put("requestId",?requestId);
?System.out.println("發(fā)送給服務(wù)端JSON為:"?+?jsonObject.toJSONString());
?String?msg?=?jsonObject.toJSONString()?+?"$$";
?ByteBuf?byteBuf?=?Unpooled.buffer(msg.getBytes().length);
?byteBuf.writeBytes(msg.getBytes());
?ctx.writeAndFlush(byteBuf);
?waitForResult();
?return?result;
?}
?@Override
?public?void?setResult(String?result)?{
?synchronized?(this)?{
?this.result?=?JSONObject.parseObject(result,?returnType);
?notifyAll();
?}
?}
?private?void?waitForResult()?{
?synchronized?(this)?{
?try?{
?wait();
?}?catch?(InterruptedException?e)?{
?e.printStackTrace();
?}
?}
?}
}

我們可以看到調(diào)用Invoker的invoke方法之后,會(huì)運(yùn)行到waitForResult()這里,這里已經(jīng)把請(qǐng)求通過(guò)網(wǎng)絡(luò)發(fā)送出去了,但是就會(huì)被卡住。

這是因?yàn)槲覀兊木W(wǎng)絡(luò)請(qǐng)求的結(jié)果不是同步返回的,有可能是客戶端同時(shí)發(fā)起很多個(gè)請(qǐng)求,所以我們不可能在這里讓他同步阻塞等待的。

接受響應(yīng)

那么對(duì)于服務(wù)消費(fèi)者而言,把請(qǐng)求發(fā)送出去但是卡住了,這個(gè)時(shí)候當(dāng)服務(wù)端處理完之后,會(huì)把消息返回給客戶端。返回的入口在

NettyClient的onChannelRead中:

@Override
public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{
?try?{
?String?message?=?(String)?msg;
?if?(messageCallback?!=?null)?{
?messageCallback.onMessage(message);
?}
?}?finally?{
?ReferenceCountUtil.release(msg);
?}
}

這里通過(guò)callback回調(diào)出去。是否還記的我們?cè)诔跏蓟疦ettyClient的時(shí)候,會(huì)設(shè)置一個(gè)callback?

/**
?*?響應(yīng)隊(duì)列
?*/
private?ConcurrentLinkedQueueresponses?=?new?ConcurrentLinkedQueue<>();
client.setMessageCallback(message?->?{
?//?這里收單服務(wù)端返回的消息,先壓入隊(duì)列
?RpcResponse?response?=?JSONObject.parseObject(message,?RpcResponse.class);
?responses.offer(response);
?synchronized?(ApplicationContext.this)?{
?ApplicationContext.this.notifyAll();
?}
});

這里接受消息之后,解析成為一個(gè)RpcResponse對(duì)象,然后壓入responses隊(duì)列中,這樣我們就把所有的請(qǐng)求響應(yīng)放入隊(duì)列中。

但是這樣一來(lái),我們應(yīng)該怎么把響應(yīng)結(jié)果返回給調(diào)用的地方呢?

我們可以這樣做:起一個(gè)或多個(gè)后臺(tái)線程,然后從隊(duì)列中拿出響應(yīng),然后根據(jù)響應(yīng)從我們之前保存的inProcessInvoker中找出對(duì)應(yīng)的Invoker,然后把結(jié)果返回回去

public?ApplicationContext(....){
?
?//.....
?
?//?step?5:?jiǎn)?dòng)處理響應(yīng)的processor
?initProcessor();
?
}
private?void?initProcessor()?{
?//?事實(shí)上,這里可以通過(guò)配置文件讀取,啟動(dòng)多少個(gè)processor
?int?num?=?3;
?processors?=?new?ResponseProcessor[num];
?for?(int?i?=?0;?i?<?3;?i++)?{
?processors[i]?=?createProcessor(i);
?}
}
/**
?*?處理響應(yīng)的線程
?*/
private?class?ResponseProcessor?extends?Thread?{
?@Override
?public?void?run()?{
?System.out.println("啟動(dòng)響應(yīng)處理線程:"?+?getName());
?while?(true)?{
?//?多個(gè)線程在這里獲取響應(yīng),只有一個(gè)成功
?RpcResponse?response?=?responses.poll();
?if?(response?==?null)?{
?try?{
?synchronized?(ApplicationContext.this)?{
?//?如果沒(méi)有響應(yīng),先休眠
?ApplicationContext.this.wait();
?}
?}?catch?(InterruptedException?e)?{
?e.printStackTrace();
?}
?}?else?{
?System.out.println("收到一個(gè)響應(yīng):"?+?response);
?String?interfaceMethodIdentify?=?response.getInterfaceMethodIdentify();
?String?requestId?=?response.getRequestId();
?String?key?=?interfaceMethodIdentify?+?"#"?+?requestId;
?Invoker?invoker?=?inProgressInvoker.remove(key);
?invoker.setResult(response.getResult());
?}
?}
?}
}

這里面如果從隊(duì)列中拿不到數(shù)據(jù),就會(huì)調(diào)用wait()方法等待

這里需要注意的是,在callbak中獲取到響應(yīng)的時(shí)候我們是會(huì)調(diào)用notifyAll()來(lái)喚醒這里的線程的:

responses.offer(response);
synchronized?(ApplicationContext.this)?{
?ApplicationContext.this.notifyAll();
}

這里被喚醒之后,就會(huì)有多個(gè)線程去爭(zhēng)搶那個(gè)響應(yīng),因?yàn)殛?duì)列是線程安全的,所以這里多個(gè)線程可以獲取到響應(yīng)結(jié)果。

接著拿到結(jié)果之后,通過(guò)identify + requestId構(gòu)造成唯一的請(qǐng)求標(biāo)識(shí),從inProgressInvoker中獲取對(duì)應(yīng)的invoker,然后通過(guò)setResult將結(jié)果設(shè)置進(jìn)去:

String?key?=?interfaceMethodIdentify?+?"#"?+?requestId;
Invoker?invoker?=?inProgressInvoker.remove(key);
invoker.setResult(response.getResult());
@Override
public?void?setResult(String?result)?{
?synchronized?(this)?{
?this.result?=?JSONObject.parseObject(result,?returnType);
?notifyAll();
?}
}

這里設(shè)置進(jìn)去之后,就會(huì)將結(jié)果用json反序列化成為用戶需要的結(jié)果,然后調(diào)用其notifyAll方法喚醒invoke方法被阻塞的線程:

?@SuppressWarnings("unckecked")
?@Override
?public?T?invoke(Object[]?args)?{
?JSONObject?jsonObject?=?new?JSONObject();
?jsonObject.put("interfaces",?identify);
?JSONObject?param?=?new?JSONObject();
?if?(args?!=?null)?{
?for?(Object?obj?:?args)?{
?param.put(obj.getClass().getName(),?obj);
?}
?}
?jsonObject.put("parameter",?param);
?jsonObject.put("requestId",?requestId);
?System.out.println("發(fā)送給服務(wù)端JSON為:"?+?jsonObject.toJSONString());
?String?msg?=?jsonObject.toJSONString()?+?NettyServer.DELIMITER;
?ByteBuf?byteBuf?=?Unpooled.buffer(msg.getBytes().length);
?byteBuf.writeBytes(msg.getBytes());
?ctx.writeAndFlush(byteBuf);
?//?這里被喚醒
?waitForResult();
?return?result;
?}

然后就可以返回結(jié)果了,返回的結(jié)果就會(huì)返回給用戶了。

整體測(cè)試

到了這里我們的生產(chǎn)者和消費(fèi)者的代碼都寫(xiě)完了,我們來(lái)整體測(cè)試一遍。生產(chǎn)者的代碼是和之前的一致的:

public?class?TestProducer?{
?public?static?void?main(String[]?args)?throws?Exception?{
?String?connectionString?=?"zookeeper://localhost1:2181,localhost2:2182,localhost3:2181";
?HelloService?service?=?new?HelloServiceImpl();
?ServiceConfig?config?=?new?ServiceConfig<>(HelloService.class,?service);
?ListserviceConfigList?=?new?ArrayList<>();
?serviceConfigList.add(config);
?ApplicationContext?ctx?=?new?ApplicationContext(connectionString,?serviceConfigList,?null,?50071);
?}
}

消費(fèi)者測(cè)試代碼:

public?class?TestConsumer?{
?public?static?void?main(String[]?args)?throws?Exception?{
?String?connectionString?=?"zookeeper://localhost1:2181,localhost2:2182,localhost3:2181";
?ReferenceConfigconfig?=?new?ReferenceConfig<>(HelloService.class);
?ApplicationContext?ctx?=?new?ApplicationContext(connectionString,?null,?Collections.singletonList(config),
?50070);
?HelloService?helloService?=?ctx.getService(HelloService.class);
?System.out.println("sayHello(TestBean)結(jié)果為:"?+?helloService.sayHello(new?TestBean("張三",?20)));
?}
}

接著啟動(dòng)生產(chǎn)者,然后啟動(dòng)消費(fèi)者:

生產(chǎn)者得到的日志如下:

Zookeeper?Client初始化完畢......
注冊(cè)到注冊(cè)中心,路徑為:【/myRPC/interface=com.study.rpc.test.producer.HelloService&
method=sayHello?meter=com.study.rpc.test.producer.TestBean】
信息為:RegistryInfo{hostname='localhost',?ip='192.168.16.7',?port=50071}
啟動(dòng)NettyService,端口為:50071
啟動(dòng)響應(yīng)處理線程:Response-processor-0
啟動(dòng)響應(yīng)處理線程:Response-processor-2
啟動(dòng)響應(yīng)處理線程:Response-processor-1
接收到消息:{"interfaces":"interface=com.study.rpc.test.producer.HelloService&
method=sayHello?meter=com.study.rpc.test.producer.TestBean","requestId":"1",
"parameter":{"com.study.rpc.test.producer.TestBean":{"age":20,"name":"張三"}}}
響應(yīng)給客戶端:{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&
method=sayHello?meter=com.study.rpc.test.producer.TestBean","requestId":"1",
"result":"\"牛逼,我收到了消息:TestBean{name='張三',?age=20}\""}

消費(fèi)者得到的日志為:

Zookeeper Client初始化完畢......

開(kāi)始建立連接:192.168.16.7, 50071

等待連接成功...

啟動(dòng)響應(yīng)處理線程:Response-processor-1

啟動(dòng)響應(yīng)處理線程:Response-processor-0

啟動(dòng)響應(yīng)處理線程:Response-processor-2

連接成功:ChannelHandlerContext(NettyClient$NettyClientHandler#0,

[id: 0xb7a59701, L:/192.168.16.7:58354 - R:/192.168.16.7:50071])

發(fā)送給服務(wù)端JSON為:{"interfaces":"interface=com.study.rpc.test.producer.HelloService&

method=sayHello?meter=com.study.rpc.test.producer.TestBean","requestId":"1",

"parameter":{"com.study.rpc.test.producer.TestBean":{"age":20,"name":"張三"}}}

收到一個(gè)響應(yīng):RpcResponse{result='"牛逼,我收到了消息:TestBean{name='張三', age=20}"',

interfaceMethodIdentify='interface=com.study.rpc.test.producer.HelloService&

method=sayHello?meter=com.study.rpc.test.producer.TestBean', requestId='1'}

sayHello(TestBean)結(jié)果為:牛逼,我收到了消息:TestBean{name='張三', age=20}

總結(jié)

通過(guò)完成這個(gè)RPC框架,大家應(yīng)該會(huì)大致對(duì)RPC的實(shí)現(xiàn)原理有個(gè)感性的認(rèn)識(shí),這里總結(jié)一下特性:

  • 支持多種注冊(cè)中心,可配置(雖然只實(shí)現(xiàn)了zookeeper,但是我們拓展是非常簡(jiǎn)單的)

  • 支持負(fù)載均衡

當(dāng)然了還有非常多的不足之處,這是無(wú)可否認(rèn)的,隨意寫(xiě)出來(lái)的框架和工業(yè)級(jí)使用的框架比較還是不一樣

我這里列舉一些不完美的地方把(有興趣的可以搞搞):

  • 實(shí)現(xiàn)序列化框架的拓展,多種序列化供用戶選擇

  • 網(wǎng)絡(luò)請(qǐng)求錯(cuò)誤處理,這里實(shí)現(xiàn)非常簡(jiǎn)陋,健壯性很差

  • 注冊(cè)中心不支持故障感知和自動(dòng)恢復(fù)

  • 調(diào)用監(jiān)控,性能指標(biāo)

歡迎工作一到五年的Java工程師朋友們加入我的個(gè)人粉絲群(Java架構(gòu)技術(shù)棧:728987924)群內(nèi)提供免費(fèi)的Java架構(gòu)學(xué)習(xí)資料(里面有高可用、高并發(fā)、高性能及分布式、Jvm性能調(diào)優(yōu)、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個(gè)知識(shí)點(diǎn)的架構(gòu)資料)

合理利用自己每一分每一秒的時(shí)間來(lái)學(xué)習(xí)提升自己,不要再用"沒(méi)有時(shí)間“來(lái)掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來(lái)的自己一個(gè)交代!


向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI