您好,登錄后才能下訂單哦!
如果大家對(duì)RPC有一些了解的話,或多或者都會(huì)聽(tīng)到過(guò)一些大名鼎鼎的RPC框架,比如Dobbo、gRPC。但是大部分人對(duì)于他們底層的實(shí)現(xiàn)原理其實(shí)不甚了解。
有一種比較好的學(xué)習(xí)方式:就是如果你想要了解一個(gè)框架的原理,你可以嘗試去寫(xiě)一個(gè)簡(jiǎn)易版的框架出來(lái),就比如如果你想理解Spring IOC的思想,最好的方式就是自己實(shí)現(xiàn)一個(gè)小型的IOC容器,自己慢慢體會(huì)。
所以本文嘗試帶領(lǐng)大家去設(shè)計(jì)一個(gè)小型的RPC框架,同時(shí)對(duì)于框架會(huì)保持一些拓展點(diǎn)。
通過(guò)閱讀本文,你可以收獲:
理解RPC框架最核心的理念
學(xué)習(xí)在設(shè)計(jì)框架的時(shí)候,如何保持拓展性
本文會(huì)依賴一些組件,他們是實(shí)現(xiàn)RPC框架必要的一些知識(shí),文中會(huì)盡量降低這些知識(shí)帶來(lái)的障礙。但是,最好期望讀者有以下知識(shí)基礎(chǔ):
Zookeeper基本入門
Netty基本入門
RPC框架應(yīng)該長(zhǎng)什么樣子
我們首先來(lái)看一下:一個(gè)RPC框架是什么東西?
我們最直觀的感覺(jué)就是:
集成了RPC框架之后,通過(guò)配置一個(gè)注冊(cè)中心的地址。一個(gè)應(yīng)用(稱為服務(wù)提供者)將某個(gè)接口(interface)“暴露”出去,另外一個(gè)應(yīng)用(稱為服務(wù)消費(fèi)者)通過(guò)“引用”這個(gè)接口(interface),然后調(diào)用了一下,就很神奇的可以調(diào)用到另外一個(gè)應(yīng)用的方法了
給我們的感覺(jué)就好像調(diào)用了一個(gè)本地方法一樣。即便兩個(gè)應(yīng)用不是在同一個(gè)JVM中甚至兩個(gè)應(yīng)用都不在同一臺(tái)機(jī)器中。
那他們是如何做到的呢?
其實(shí)啊,當(dāng)我們的服務(wù)消費(fèi)者調(diào)用某個(gè)RPC接口的方法之后,它的底層會(huì)通過(guò)動(dòng)態(tài)代理,然后經(jīng)過(guò)網(wǎng)絡(luò)調(diào)用,去到服務(wù)提供者的機(jī)器上,然后執(zhí)行對(duì)應(yīng)的方法。
接著方法的結(jié)果通過(guò)網(wǎng)絡(luò)傳輸返回到服務(wù)消費(fèi)者那里,然后就可以拿到結(jié)果了。
整個(gè)過(guò)程如下圖:
那么這個(gè)時(shí)候,可能有人會(huì)問(wèn)了:服務(wù)消費(fèi)者怎么知道服務(wù)提供者在哪臺(tái)機(jī)器的哪個(gè)端口呢?
這個(gè)時(shí)候,就需要“注冊(cè)中心”登場(chǎng)了,具體來(lái)說(shuō)是這樣子的:
服務(wù)提供者在啟動(dòng)的時(shí)候,將自己應(yīng)用所在機(jī)器的信息提交到注冊(cè)中心上面。
服務(wù)消費(fèi)者在啟動(dòng)的時(shí)候,將需要消費(fèi)的接口所在機(jī)器的信息抓回來(lái)
這樣一來(lái),服務(wù)消費(fèi)者就有了一份服務(wù)提供者所在的機(jī)器列表了
“服務(wù)消費(fèi)者”拿到了“服務(wù)提供者”的機(jī)器列表就可以通過(guò)網(wǎng)絡(luò)請(qǐng)求來(lái)發(fā)起請(qǐng)求了。
網(wǎng)絡(luò)客戶端,我們應(yīng)該采用什么呢?有幾種選擇:
使用JDK原生BIO(也就是ServerSocket那一套)。阻塞式IO方法,無(wú)法支撐高并發(fā)。
使用JDK原生NIO(Selector、SelectionKey那一套)。非阻塞式IO,可以支持高并發(fā),但是自己實(shí)現(xiàn)復(fù)雜,需要處理各種網(wǎng)絡(luò)問(wèn)題。
使用大名鼎鼎的NIO框架Netty,天然支持高并發(fā),封裝好,API易用。
作為一個(gè)有追求的程序員,我們要求開(kāi)發(fā)出來(lái)的框架要求支持高并發(fā)、又要求簡(jiǎn)單、還要快。當(dāng)然是選擇Netty來(lái)實(shí)現(xiàn)了,使用Netty的一些很基本的API就能滿足我們的需求。
網(wǎng)絡(luò)協(xié)議定義
當(dāng)然了,既然我們要使用網(wǎng)絡(luò)傳輸數(shù)據(jù)。我們首先要定義一套網(wǎng)絡(luò)協(xié)議出來(lái)。
你可能又要問(wèn)了,啥叫網(wǎng)絡(luò)協(xié)議?
網(wǎng)絡(luò)協(xié)議,通俗理解,意思就是說(shuō)我們的客戶端發(fā)送的數(shù)據(jù)應(yīng)該長(zhǎng)什么樣子,服務(wù)端可以去解析出來(lái)知道要做什么事情。話不多說(shuō),上代碼:
假設(shè)我們現(xiàn)在服務(wù)提供者有兩個(gè)類:
//?com.study.rpc.test.producer.HelloService public?interface?HelloService?{ ?String?sayHello(TestBean?testBean); } //?com.study.rpc.test.producer.TestBean public?class?TestBean?{ ?private?String?name; ?private?Integer?age; ?public?TestBean(String?name,?Integer?age)?{ ?this.name?=?name; ?this.age?=?age; ?} ?//?getter?setter }
現(xiàn)在我要調(diào)用HelloService.sayHello(TestBean testBean)這個(gè)方法
作為“服務(wù)消費(fèi)者”,應(yīng)該怎么定義我們的請(qǐng)求,從而讓服務(wù)端知道我是要調(diào)用這個(gè)方法呢?
這需要我們將這個(gè)接口信息產(chǎn)生一個(gè)唯一的標(biāo)識(shí): 這個(gè)標(biāo)識(shí)會(huì)記錄了接口名、具體是那個(gè)方法、然后具體參數(shù)是什么!
然后將這些信息組織起來(lái)發(fā)送給服務(wù)端,我這里的方式是將信息保存為一個(gè)JSON格式的字符串來(lái)傳輸。
比如上面的接口我們傳輸?shù)臄?shù)據(jù)大概是這樣的:
{ "interfaces":?"interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean", "requestId":?"3", "parameter":?{ "com.study.rpc.test.producer.TestBean":?{ "age":?20, "name":?"張三" } } }
嗯,我這里用一個(gè)JSON來(lái)標(biāo)識(shí)這次調(diào)用是調(diào)用哪個(gè)接口的哪個(gè)方法,其中interface標(biāo)識(shí)了唯一的類,parameter標(biāo)識(shí)了里面具體有哪些參數(shù), 其中key就是參數(shù)的類全限定名,value就是這個(gè)類的JSON信息。
可能看到這里,大家可能有意見(jiàn)了: 數(shù)據(jù)不一定用JSON格式傳輸啊,而且使用JSON也不一定性能最高啊。
你使用JDK的Serializable配合Netty的ObjectDecoder來(lái)實(shí)現(xiàn),這當(dāng)然也可以,其實(shí)這里是一個(gè)拓展點(diǎn),我們應(yīng)該要提供多種序列化方式來(lái)供用戶選擇
但是這里選擇了JSON的原因是因?yàn)樗容^直觀,對(duì)于寫(xiě)文章來(lái)說(shuō)比較合理。
開(kāi)發(fā)服務(wù)提供者
嗯,搞定了網(wǎng)絡(luò)協(xié)議之后,我們開(kāi)始開(kāi)發(fā)“服務(wù)提供者”了。對(duì)于服務(wù)提供者,因?yàn)槲覀冞@里是寫(xiě)一個(gè)簡(jiǎn)單版本的RPC框架,為了保持簡(jiǎn)潔。
我們不會(huì)引入類似Spring之類的容器框架,所以我們需要定義一個(gè)服務(wù)提供者的配置類,它用于定義這個(gè)服務(wù)提供者是什么接口,然后它具體的實(shí)例對(duì)象是什么:
public?class?ServiceConfig{ ?public?Class?type; ?public?T?instance; ?public?ServiceConfig(Classtype,?T?instance)?{ ?this.type?=?type; ?this.instance?=?instance; ?} ?public?ClassgetType()?{ ?return?type; ?} ?public?void?setType(Classtype)?{ ?this.type?=?type; ?} ?public?T?getInstance()?{ ?return?instance; ?} ?public?void?setInstance(T?instance)?{ ?this.instance?=?instance; ?} }
有了這個(gè)東西之后,我們就知道需要暴露哪些接口了。
為了框架有一個(gè)統(tǒng)一的入口,我定義了一個(gè)類叫做ApplicationContext,可以認(rèn)為這是一個(gè)應(yīng)用程序上下文,他的構(gòu)造函數(shù),接收2個(gè)參數(shù),代碼如下:
public?ApplicationContext(String?registryUrl,?ListserviceConfigs){ ?//?1.?保存需要暴露的接口配置 ?this.serviceConfigs?=?serviceConfigs?==?null???new?ArrayList<>()?:?serviceConfigs; ?//?step?2:?實(shí)例化注冊(cè)中心 ?initRegistry(registryUrl); ?//?step?3:?將接口注冊(cè)到注冊(cè)中心,從注冊(cè)中心獲取接口,初始化服務(wù)接口列表 ?RegistryInfo?registryInfo?=?null; ?InetAddress?addr?=?InetAddress.getLocalHost(); ?String?hostname?=?addr.getHostName(); ?String?hostAddress?=?addr.getHostAddress(); ?registryInfo?=?new?RegistryInfo(hostname,?hostAddress,?port); ?doRegistry(registryInfo); ? ? ?//?step?4:初始化Netty服務(wù)器,接受到請(qǐng)求,直接打到服務(wù)提供者的service方法中 ?if?(!this.serviceConfigs.isEmpty())?{ ?//?需要暴露接口才暴露 ?nettyServer?=?new?NettyServer(this.serviceConfigs,?interfaceMethods); ?nettyServer.init(port); ?} }
注冊(cè)中心設(shè)計(jì)
這里分為幾個(gè)步驟,首先保存了接口配置,接著初始化注冊(cè)中心,因?yàn)樽?cè)中心可能會(huì)提供多種來(lái)供用戶選擇,所以這里需要定義一個(gè)注冊(cè)中心的接口:
public?interface?Registry?{ ?/** ?*?將生產(chǎn)者接口注冊(cè)到注冊(cè)中心 ?* ?*?@param?clazz?類 ?*?@param?registryInfo?本機(jī)的注冊(cè)信息 ?*/ ?void?register(Class?clazz,?RegistryInfo?registryInfo)?throws?Exception; }
這里我們提供一個(gè)注冊(cè)的方法,這個(gè)方法的語(yǔ)義是將clazz對(duì)應(yīng)的接口注冊(cè)到注冊(cè)中心。接收兩個(gè)參數(shù),一個(gè)是接口的class對(duì)象,另一個(gè)是注冊(cè)信息,
里面包含了本機(jī)的一些基本信息,如下:
public?class?RegistryInfo?{ ?private?String?hostname; ?private?String?ip; ?private?Integer?port; ?public?RegistryInfo(String?hostname,?String?ip,?Integer?port)?{ ?this.hostname?=?hostname; ?this.ip?=?ip; ?this.port?=?port; ?} ?//?getter?setter }
好了,定義好注冊(cè)中心,回到之前的實(shí)例化注冊(cè)中心的地方,代碼如下:
/** ?*?注冊(cè)中心 ?*/ private?Registry?registry; private?void?initRegistry(String?registryUrl)?{ ?if?(registryUrl.startsWith("zookeeper://"))?{ ?registryUrl?=?registryUrl.substring(12); ?registry?=?new?ZookeeperRegistry(registryUrl); ?}?else?if?(registryUrl.startsWith("multicast://"))?{ ?registry?=?new?MulticastRegistry(registryUrl); ?} }
這里邏輯也非常簡(jiǎn)單,就是根據(jù)url的schema來(lái)判斷是那個(gè)注冊(cè)中心
注冊(cè)中心這里實(shí)現(xiàn)了2個(gè)實(shí)現(xiàn)類,分別使用zookeeper作為注冊(cè)中心,另外一個(gè)是使用廣播的方式作為注冊(cè)中心。
廣播注冊(cè)中心這里僅僅是做個(gè)示范,內(nèi)部沒(méi)有實(shí)現(xiàn)。我們主要是實(shí)現(xiàn)了zookeeper的注冊(cè)中心。
(當(dāng)然了,如果有興趣,可以實(shí)現(xiàn)更多的注冊(cè)中心供用戶選擇,比如redis之類的,這里只是為了保持“拓展點(diǎn)”)
那么實(shí)例化完注冊(cè)中心之后,回到上面的代碼:
注冊(cè)服務(wù)提供者
//?step?3:?將接口注冊(cè)到注冊(cè)中心,從注冊(cè)中心獲取接口,初始化服務(wù)接口列表 RegistryInfo?registryInfo?=?null; InetAddress?addr?=?InetAddress.getLocalHost(); String?hostname?=?addr.getHostName(); String?hostAddress?=?addr.getHostAddress(); registryInfo?=?new?RegistryInfo(hostname,?hostAddress,?port); doRegistry(registryInfo);
這里邏輯很簡(jiǎn)單,就是獲取本機(jī)的的基本信息構(gòu)造成RegistryInfo,然后調(diào)用了doRegistry方法:
/** ?*?接口方法對(duì)應(yīng)method對(duì)象 ?*/ private?MapinterfaceMethods?=?new?ConcurrentHashMap<>(); private?void?doRegistry(RegistryInfo?registryInfo)?throws?Exception?{ ?for?(ServiceConfig?config?:?serviceConfigs)?{ ?Class?type?=?config.getType(); ?registry.register(type,?registryInfo); ?Method[]?declaredMethods?=?type.getDeclaredMethods(); ?for?(Method?method?:?declaredMethods)?{ ?String?identify?=?InvokeUtils.buildInterfaceMethodIdentify(type,?method); ?interfaceMethods.put(identify,?method); ?} ?} }
這里做了2件事情:
將接口注冊(cè)到注冊(cè)中心中
對(duì)于每一個(gè)接口的每一個(gè)方法,生成一個(gè)唯一標(biāo)識(shí),保存在interfaceMethods集合中
下面分別分析這兩件事情,首先是注冊(cè)方法:
因?yàn)槲覀冇玫搅藌ookeeper,為了方便,引入了zookeeper的客戶端框架curator
<dependency> ?<groupId>org.apache.curatorgroupId> ?<artifactId>curator-recipesartifactId> ?<version>2.3.0version> dependency>
接著看代碼:
public?class?ZookeeperRegistry?implements?Registry?{ ?private?CuratorFramework?client; ?public?ZookeeperRegistry(String?connectString)?{ ?RetryPolicy?retryPolicy?=?new?ExponentialBackoffRetry(1000,?3); ?client?=?CuratorFrameworkFactory.newClient(connectString,?retryPolicy); ?client.start(); ?try?{ ?Stat?myRPC?=?client.checkExists().forPath("/myRPC"); ?if?(myRPC?==?null)?{ ?client.create() ?.creatingParentsIfNeeded() ?.forPath("/myRPC"); ?} ?System.out.println("Zookeeper?Client初始化完畢......"); ?}?catch?(Exception?e)?{ ?e.printStackTrace(); ?} ?} ?@Override ?public?void?register(Class?clazz,?RegistryInfo?registryInfo)?throws?Exception?{ ?//?1.?注冊(cè)的時(shí)候,先從zk中獲取數(shù)據(jù) ?//?2.?將自己的服務(wù)器地址加入注冊(cè)中心中 ?//?為每一個(gè)接口的每一個(gè)方法注冊(cè)一個(gè)臨時(shí)節(jié)點(diǎn),然后key為接口方法的唯一標(biāo)識(shí),data為服務(wù)地址列表 ?Method[]?declaredMethods?=?clazz.getDeclaredMethods(); ?for?(Method?method?:?declaredMethods)?{ ?String?key?=?InvokeUtils.buildInterfaceMethodIdentify(clazz,?method); ?String?path?=?"/myRPC/"?+?key; ?Stat?stat?=?client.checkExists().forPath(path); ?ListregistryInfos; ?if?(stat?!=?null)?{ ?//?如果這個(gè)接口已經(jīng)有人注冊(cè)過(guò)了,把數(shù)據(jù)拿回來(lái),然后將自己的信息保存進(jìn)去 ?byte[]?bytes?=?client.getData().forPath(path); ?String?data?=?new?String(bytes,?StandardCharsets.UTF_8); ?registryInfos?=?JSONArray.parseArray(data,?RegistryInfo.class); ?if?(registryInfos.contains(registryInfo))?{ ?//?正常來(lái)說(shuō),zk的臨時(shí)節(jié)點(diǎn),斷開(kāi)連接后,直接就沒(méi)了,但是重啟會(huì)經(jīng)常發(fā)現(xiàn)存在節(jié)點(diǎn),所以有了這樣的代碼 ?System.out.println("地址列表已經(jīng)包含本機(jī)【"?+?key?+?"】,不注冊(cè)了"); ?}?else?{ ?registryInfos.add(registryInfo); ?client.setData().forPath(path,?JSONArray.toJSONString(registryInfos).getBytes()); ?System.out.println("注冊(cè)到注冊(cè)中心,路徑為:【"?+?path?+?"】?信息為:"?+?registryInfo); ?} ?}?else?{ ?registryInfos?=?new?ArrayList<>(); ?registryInfos.add(registryInfo); ?client.create() ?.creatingParentsIfNeeded() ?//?臨時(shí)節(jié)點(diǎn),斷開(kāi)連接就關(guān)閉 ?.withMode(CreateMode.EPHEMERAL) ?.forPath(path,?JSONArray.toJSONString(registryInfos).getBytes()); ?System.out.println("注冊(cè)到注冊(cè)中心,路徑為:【"?+?path?+?"】?信息為:"?+?registryInfo); ?} ?} ?} }
zookeeper注冊(cè)中心在初始化的時(shí)候,會(huì)建立好連接。然后注冊(cè)的時(shí)候,針對(duì)clazz接口的每一個(gè)方法,都會(huì)生成一個(gè)唯一標(biāo)識(shí)
這里使用了InvokeUtils.buildInterfaceMethodIdentify方法:
public?static?String?buildInterfaceMethodIdentify(Class?clazz,?Method?method)?{ ?Map<String,?String>?map?=?new?LinkedHashMap<>(); ?map.put("interface",?clazz.getName()); ?map.put("method",?method.getName()); ?Parameter[]?parameters?=?method.getParameters(); ?if?(parameters.length?>?0)?{ ?StringBuilder?param?=?new?StringBuilder(); ?for?(int?i?=?0;?i?<?parameters.length;?i++)?{ ?Parameter?p?=?parameters[i]; ?param.append(p.getType().getName()); ?if?(i?<?parameters.length?-?1)?{ ?param.append(","); ?} ?} ?map.put("parameter",?param.toString()); ?} ?return?map2String(map); } public?static?String?map2String(Map<String,?String>?map)?{ ?StringBuilder?sb?=?new?StringBuilder(); ?Iterator<map.entry<string,?<=""?span="">String>>?iterator?=?map.entrySet().iterator(); ?while?(iterator.hasNext())?{ ?Map.Entry<String,?String>?entry?=?iterator.next(); ?sb.append(entry.getKey()?+?"="?+?entry.getValue()); ?if?(iterator.hasNext())?{ ?sb.append("&"); ?} ?} ?return?sb.toString(); }
其實(shí)就是對(duì)接口的方法使用他們的限定名和參數(shù)來(lái)組成一個(gè)唯一的標(biāo)識(shí),比如 HelloService#sayHello(TestBean)生成的大概是這樣的:
interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean
接下來(lái)的邏輯就簡(jiǎn)單了,在Zookeeper中的/myRPC路徑下面建立臨時(shí)節(jié)點(diǎn),節(jié)點(diǎn)名稱為我們上面的接口方法唯一標(biāo)識(shí),數(shù)據(jù)內(nèi)容為機(jī)器信息。
之所以采用臨時(shí)節(jié)點(diǎn)是因?yàn)椋喝绻麢C(jī)器宕機(jī)了,連接斷開(kāi)之后,消費(fèi)者可以通過(guò)zookeeper的watcher機(jī)制感知到
大概看起來(lái)是這樣的:
?/myRPC/interface=com.study.rpc.test.producer.HelloService&method=sayHello& ?parameter=com.study.rpc.test.producer.TestBean ?[ ?{ ?"hostname":peer1, ?"port":8080 ?}, ?{ ?"hostname":peer2, ?"port":8081 ?} ?]
通過(guò)這樣的方式,在服務(wù)消費(fèi)的時(shí)候就可以拿到這樣的注冊(cè)信息,然后知道可以調(diào)用那臺(tái)機(jī)器的那個(gè)端口。
好了,注冊(cè)中心弄完了之后,我們回到前面說(shuō)的注冊(cè)方法做的第二件事情,我們將每一個(gè)接口方法標(biāo)識(shí)的方法放入了一個(gè)map中:
/** ?*?接口方法對(duì)應(yīng)method對(duì)象 ?*/ private?Map<String,?Method>?interfaceMethods?=?new?ConcurrentHashMap<>();
這個(gè)的原因是因?yàn)?,我們?cè)谑盏骄W(wǎng)絡(luò)請(qǐng)求的時(shí)候,需要調(diào)用反射的方式調(diào)用method對(duì)象,所以存起來(lái)。
啟動(dòng)網(wǎng)絡(luò)服務(wù)端接受請(qǐng)求
接下來(lái)我們就可以看第四步了:
//?step?4:初始化Netty服務(wù)器,接受到請(qǐng)求,直接打到服務(wù)提供者的service方法中 if?(!this.serviceConfigs.isEmpty())?{ ?//?需要暴露接口才暴露 ?nettyServer?=?new?NettyServer(this.serviceConfigs,?interfaceMethods); ?nettyServer.init(port); }
因?yàn)檫@里使用Netty來(lái)做的所以需要引入Netty的依賴:
<dependency> ?<groupId>io.nettygroupId> ?<artifactId>netty-allartifactId> ?<version>4.1.30.Finalversion> dependency>
接著來(lái)分析:
public?class?NettyServer?{ ?/** ?*?負(fù)責(zé)調(diào)用方法的handler ?*/ ?private?RpcInvokeHandler?rpcInvokeHandler; ?public?NettyServer(ListserverConfigs,?MapinterfaceMethods)?throws?InterruptedException?{ ?this.rpcInvokeHandler?=?new?RpcInvokeHandler(serverConfigs,?interfaceMethods); ?} ?public?int?init(int?port)?throws?Exception?{ ?EventLoopGroup?bossGroup?=?new?NioEventLoopGroup(); ?EventLoopGroup?workerGroup?=?new?NioEventLoopGroup(); ?ServerBootstrap?b?=?new?ServerBootstrap(); ?b.group(bossGroup,?workerGroup) ?.channel(NioServerSocketChannel.class) ?.option(ChannelOption.SO_BACKLOG,?1024) ?.childHandler(new?ChannelInitializer()?{ ?@Override ?protected?void?initChannel(SocketChannel?ch)?throws?Exception?{ ?ByteBuf?delimiter?=?Unpooled.copiedBuffer("$$"); ?//?設(shè)置按照分隔符“&&”來(lái)切分消息,單條消息限制為?1MB ?ch.pipeline().addLast(new?DelimiterBasedFrameDecoder(1024?*?1024,?delimiter)); ?ch.pipeline().addLast(new?StringDecoder()); ?ch.pipeline().addLast().addLast(rpcInvokeHandler); ?} ?}); ?ChannelFuture?sync?=?b.bind(port).sync(); ?System.out.println("啟動(dòng)NettyService,端口為:"?+?port); ?return?port; ?} }
這部分主要的都是netty的api,我們不做過(guò)多的說(shuō)明,就簡(jiǎn)單的說(shuō)一下:
我們通過(guò)“&&”作為標(biāo)識(shí)符號(hào)來(lái)區(qū)分兩條信息,然后一條信息的最大長(zhǎng)度為1MB
所有邏輯都在RpcInvokeHandler中,這里面?zhèn)鬟M(jìn)去了配置的服務(wù)接口實(shí)例,以及服務(wù)接口實(shí)例每個(gè)接口方法唯一標(biāo)識(shí)對(duì)應(yīng)的Method對(duì)象的Map集合。
public?class?RpcInvokeHandler?extends?ChannelInboundHandlerAdapter?{ ?/** ?*?接口方法唯一標(biāo)識(shí)對(duì)應(yīng)的Method對(duì)象 ?*/ ?private?Map<String,?Method>?interfaceMethods; ?/** ?*?接口對(duì)應(yīng)的實(shí)現(xiàn)類 ?*/ ?private?Map<class,?Object>?interfaceToInstance; ?/** ?*?線程池,隨意寫(xiě)的,不要吐槽 ?*/ ?private?ThreadPoolExecutor?threadPoolExecutor?=?new?ThreadPoolExecutor(10, ?50,?60,?TimeUnit.SECONDS,?new?LinkedBlockingQueue<>(100), ?new?ThreadFactory()?{ ?AtomicInteger?m?=?new?AtomicInteger(0); ?@Override ?public?Thread?newThread(Runnable?r)?{ ?return?new?Thread(r,?"IO-thread-"?+?m.incrementAndGet()); ?} ?}); ?public?RpcInvokeHandler(ListserviceConfigList, ?Map<String,?Method>?interfaceMethods)?{ ?this.interfaceToInstance?=?new?ConcurrentHashMap<>(); ?this.interfaceMethods?=?interfaceMethods; ?for?(ServiceConfig?config?:?serviceConfigList)?{ ?interfaceToInstance.put(config.getType(),?config.getInstance()); ?} ?} ?@Override ?public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{ ?try?{ ?String?message?=?(String)?msg; ?//?這里拿到的是一串JSON數(shù)據(jù),解析為Request對(duì)象, ?//?事實(shí)上這里解析網(wǎng)絡(luò)數(shù)據(jù),可以用序列化方式,定一個(gè)接口,可以實(shí)現(xiàn)JSON格式序列化,或者其他序列化 ?//?但是demo版本就算了。 ?System.out.println("接收到消息:"?+?msg); ?RpcRequest?request?=?RpcRequest.parse(message,?ctx); ?threadPoolExecutor.execute(new?RpcInvokeTask(request)); ?}?finally?{ ?ReferenceCountUtil.release(msg); ?} ?} ?@Override ?public?void?channelReadComplete(ChannelHandlerContext?ctx)?throws?Exception?{ ?ctx.flush(); ?} ?@Override ?public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{ ?System.out.println("發(fā)生了異常..."?+?cause); ?cause.printStackTrace(); ?ctx.close(); ?} ?public?class?RpcInvokeTask?implements?Runnable?{ ?private?RpcRequest?rpcRequest; ?RpcInvokeTask(RpcRequest?rpcRequest)?{ ?this.rpcRequest?=?rpcRequest; ?} ?@Override ?public?void?run()?{ ?try?{ ?/* ?*?數(shù)據(jù)大概是這樣子的 ?*?{"interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello?meter=com ?*?.study.rpc.test.producer.TestBean","requestId":"3","parameter":{"com.study.rpc.test.producer ?*?.TestBean":{"age":20,"name":"張三"}}} ?*/ ?//?這里希望能拿到每一個(gè)服務(wù)對(duì)象的每一個(gè)接口的特定聲明 ?String?interfaceIdentity?=?rpcRequest.getInterfaceIdentity(); ?Method?method?=?interfaceMethods.get(interfaceIdentity); ?Map<String,?String>?map?=?string2Map(interfaceIdentity); ?String?interfaceName?=?map.get("interface"); ?Class?interfaceClass?=?Class.forName(interfaceName); ?Object?o?=?interfaceToInstance.get(interfaceClass); ?String?parameterString?=?map.get("parameter"); ?Object?result; ?if?(parameterString?!=?null)?{ ?String[]?parameterTypeClass?=?parameterString.split(","); ?Map<String,?Object>?parameterMap?=?rpcRequest.getParameterMap(); ?Object[]?parameterInstance?=?new?Object[parameterTypeClass.length]; ?for?(int?i?=?0;?i?<?parameterTypeClass.length;?i++)?{ ?String?parameterClazz?=?parameterTypeClass[i]; ?parameterInstance[i]?=?parameterMap.get(parameterClazz); ?} ?result?=?method.invoke(o,?parameterInstance); ?}?else?{ ?result?=?method.invoke(o); ?} ?//?寫(xiě)回響應(yīng) ?ChannelHandlerContext?ctx?=?rpcRequest.getCtx(); ?String?requestId?=?rpcRequest.getRequestId(); ?RpcResponse?response?=?RpcResponse.create(JSONObject.toJSONString(result),?interfaceIdentity, ?requestId); ?String?s?=?JSONObject.toJSONString(response)?+?"$$"; ?ByteBuf?byteBuf?=?Unpooled.copiedBuffer(s.getBytes()); ?ctx.writeAndFlush(byteBuf); ?System.out.println("響應(yīng)給客戶端:"?+?s); ?}?catch?(Exception?e)?{ ?e.printStackTrace(); ?} ?} ? ?public?static?Map<String,?String>?string2Map(String?str)?{ ?String[]?split?=?str.split("&"); ?Map<String,?String>?map?=?new?HashMap<>(16); ?for?(String?s?:?split)?{ ?String[]?split1?=?s.split("="); ?map.put(split1[0],?split1[1]); ?} ?return?map; ?} ?} }
這里說(shuō)明一下上面的邏輯:
channelRead方法用于接收消息,接收到的就是我們前面分析的那個(gè)JSON格式的數(shù)據(jù),接著我們將消息解析成RpcRequest
public?class?RpcRequest?{ ?private?String?interfaceIdentity; ?private?Map<String,?Object>?parameterMap?=?new?HashMap<>(); ?private?ChannelHandlerContext?ctx; ?private?String?requestId; ?public?static?RpcRequest?parse(String?message,?ChannelHandlerContext?ctx)?throws?ClassNotFoundException?{ ?/* ?*?{ ?*?"interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello2?meter=java.lang ?*?.String,com.study.rpc.test.producer.TestBean", ?*?"parameter":{ ?*?"java.lang.String":"haha", ?*?"com.study.rpc.test.producer.TestBean":{ ?*?"name":"小王", ?*?"age":20 ?*?} ?*?} ?*?} ?*/ ?JSONObject?jsonObject?=?JSONObject.parseObject(message); ?String?interfaces?=?jsonObject.getString("interfaces"); ?JSONObject?parameter?=?jsonObject.getJSONObject("parameter"); ?Set<String>?strings?=?parameter.keySet(); ?RpcRequest?request?=?new?RpcRequest(); ?request.setInterfaceIdentity(interfaces); ?Map<String,?Object>?parameterMap?=?new?HashMap<>(16); ?String?requestId?=?jsonObject.getString("requestId"); ?for?(String?key?:?strings)?{ ?if?(key.equals("java.lang.String"))?{ ?parameterMap.put(key,?parameter.getString(key)); ?}?else?{ ?Class?clazz?=?Class.forName(key); ?Object?object?=?parameter.getObject(key,?clazz); ?parameterMap.put(key,?object); ?} ?} ?request.setParameterMap(parameterMap); ?request.setCtx(ctx); ?request.setRequestId(requestId); ?return?request; ?} }
接著從request中解析出來(lái)需要調(diào)用的接口,然后通過(guò)反射調(diào)用對(duì)應(yīng)的接口,得到結(jié)果后我們將響應(yīng)封裝成PrcResponse寫(xiě)回給客戶端:
public?class?RpcResponse?{ ?private?String?result; ?private?String?interfaceMethodIdentify; ?private?String?requestId; ?public?String?getResult()?{ ?return?result; ?} ?public?void?setResult(String?result)?{ ?this.result?=?result; ?} ?public?static?RpcResponse?create(String?result,?String?interfaceMethodIdentify,?String?requestId)?{ ?RpcResponse?response?=?new?RpcResponse(); ?response.setResult(result); ?response.setInterfaceMethodIdentify(interfaceMethodIdentify); ?response.setRequestId(requestId); ?return?response; ?} }
里面包含了請(qǐng)求的結(jié)果JSON串,接口方法唯一標(biāo)識(shí),請(qǐng)求ID。數(shù)據(jù)大概看起來(lái)這個(gè)樣子:
{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean","requestId":"3", "result":"\"牛逼,我收到了消息:TestBean{name='張三',?age=20}\""}
通過(guò)這樣的信息,客戶端就可以通過(guò)響應(yīng)結(jié)果解析出來(lái)。
測(cè)試服務(wù)提供者
既然我們代碼寫(xiě)完了,現(xiàn)在需要測(cè)試一把:
首先我們先寫(xiě)一個(gè)HelloService的實(shí)現(xiàn)類出來(lái):
public?class?HelloServiceImpl?implements?HelloService?{ ?@Override ?public?String?sayHello(TestBean?testBean)?{ ?return?"牛逼,我收到了消息:"?+?testBean; ?} }
接著編寫(xiě)服務(wù)提供者代碼:
public?class?TestProducer?{ ?public?static?void?main(String[]?args)?throws?Exception?{ ?String?connectionString?=?"zookeeper://localhost1:2181,localhost2:2181,localhost3:2181"; ?HelloService?service?=?new?HelloServiceImpl(); ?ServiceConfig?config?=?new?ServiceConfig<>(HelloService.class,?service); ?ListserviceConfigList?=?new?ArrayList<>(); ?serviceConfigList.add(config); ?ApplicationContext?ctx?=?new?ApplicationContext(connectionString,?serviceConfigList, ?null,?50071); ?} }
接著啟動(dòng)起來(lái),看到日志:
Zookeeper?Client初始化完畢...... 注冊(cè)到注冊(cè)中心,路徑為:【/myRPC/interface=com.study.rpc.test.producer.HelloService& method=sayHello?meter=com.study.rpc.test.producer.TestBean】 信息為:RegistryInfo{hostname='localhost',?ip='192.168.16.7',?port=50071} 啟動(dòng)NettyService,端口為:50071
這個(gè)時(shí)候,我們期望用NettyClient發(fā)送請(qǐng)求:
{ "interfaces":?"interface=com.study.rpc.test.producer.HelloService& method=sayHello?meter=com.study.rpc.test.producer.TestBean", "requestId":?"3", "parameter":?{ "com.study.rpc.test.producer.TestBean":?{ "age":?20, "name":?"張三" } } }
得到的響應(yīng)應(yīng)該是:
{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean","requestId":"3", "result":"\"牛逼,我收到了消息:TestBean{name='張三',?age=20}\""}
那么,可以編寫(xiě)一個(gè)測(cè)試程序(這個(gè)程序僅僅用于中間測(cè)試用,讀者不必理解):
public class NettyClient {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new NettyClientHandler());
}
});
ChannelFuture sync = b.connect("127.0.0.1", 50071).sync();
sync.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
private static class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
JSONObject jsonObject = new JSONObject();
jsonObject.put("interfaces", "interface=com.study.rpc.test.producer" +
".HelloService&method=sayHello?meter=com.study.rpc.test.producer.TestBean");
JSONObject param = new JSONObject();
JSONObject bean = new JSONObject();
bean.put("age", 20);
bean.put("name", "張三");
param.put("com.study.rpc.test.producer.TestBean", bean);
jsonObject.put("parameter", param);
jsonObject.put("requestId", 3);
System.out.println("發(fā)送給服務(wù)端JSON為:" + jsonObject.toJSONString());
String msg = jsonObject.toJSONString() + "$$";
ByteBuf byteBuf = Unpooled.buffer(msg.getBytes().length);
byteBuf.writeBytes(msg.getBytes());
ctx.writeAndFlush(byteBuf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到消息:" + msg);
}
}
}
啟動(dòng)之后,看到控制臺(tái)輸出:
發(fā)送給服務(wù)端JSON為:{"interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello&
parameter=com.study.rpc.test.producer.TestBean","requestId":3,
"parameter":{"com.study.rpc.test.producer.TestBean":{"name":"張三","age":20}}}
收到消息:{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&
method=sayHello?meter=com.study.rpc.test.producer.TestBean","requestId":"3",
"result":"\"牛逼,我收到了消息:TestBean{name='張三', age=20}\""}
bingo,完美實(shí)現(xiàn)了RPC的服務(wù)提供者。接下來(lái)我們只需要實(shí)現(xiàn)服務(wù)消費(fèi)者就完成了。
開(kāi)發(fā)服務(wù)消費(fèi)者
服務(wù)消費(fèi)者是同樣的處理,我們同樣要定義一個(gè)消費(fèi)者的配置:
public?class?ReferenceConfig{ ?private?Class?type; ?public?ReferenceConfig(Classtype)?{ ?this.type?=?type; ?} ?public?ClassgetType()?{ ?return?type; ?} ?public?void?setType(Classtype)?{ ?this.type?=?type; ?} }
然后我們是統(tǒng)一入口,在ApplicationContext中修改代碼:
public?ApplicationContext(String?registryUrl,?ListserviceConfigs, ?ListreferenceConfigs,?int?port)?throws?Exception?{ ?//?step?1:?保存服務(wù)提供者和消費(fèi)者 ?this.serviceConfigs?=?serviceConfigs?==?null???new?ArrayList<>()?:?serviceConfigs; ?this.referenceConfigs?=?referenceConfigs?==?null???new?ArrayList<>()?:?referenceConfigs; ?//?.... ? } private?void?doRegistry(RegistryInfo?registryInfo)?throws?Exception?{ ?for?(ServiceConfig?config?:?serviceConfigs)?{ ?Class?type?=?config.getType(); ?registry.register(type,?registryInfo); ?Method[]?declaredMethods?=?type.getDeclaredMethods(); ?for?(Method?method?:?declaredMethods)?{ ?String?identify?=?InvokeUtils.buildInterfaceMethodIdentify(type,?method); ?interfaceMethods.put(identify,?method); ?} ?} ?for?(ReferenceConfig?config?:?referenceConfigs)?{ ?ListregistryInfos?=?registry.fetchRegistry(config.getType()); ?if?(registryInfos?!=?null)?{ ?interfacesMethodRegistryList.put(config.getType(),?registryInfos); ?initChannel(registryInfos); ?} ?} }
在注冊(cè)的時(shí)候,我們需要將需要消費(fèi)的接口,通過(guò)注冊(cè)中心抓取出來(lái),所以注冊(cè)中心要增加一個(gè)接口方法:
public?interface?Registry?{ ?/** ?*?將生產(chǎn)者接口注冊(cè)到注冊(cè)中心 ?* ?*?@param?clazz?類 ?*?@param?registryInfo?本機(jī)的注冊(cè)信息 ?*/ ?void?register(Class?clazz,?RegistryInfo?registryInfo)?throws?Exception; ?/** ?*?為服務(wù)提供者抓取注冊(cè)表 ?* ?*?@param?clazz?類 ?*?@return?服務(wù)提供者所在的機(jī)器列表 ?*/ ?ListfetchRegistry(Class?clazz)?throws?Exception; }
獲取服務(wù)提供者的機(jī)器列表
具體在Zookeeper中的實(shí)現(xiàn)如下:
@Override public?ListfetchRegistry(Class?clazz)?throws?Exception?{ ?Method[]?declaredMethods?=?clazz.getDeclaredMethods(); ?ListregistryInfos?=?null; ?for?(Method?method?:?declaredMethods)?{ ?String?key?=?InvokeUtils.buildInterfaceMethodIdentify(clazz,?method); ?String?path?=?"/myRPC/"?+?key; ?Stat?stat?=?client.checkExists() ?.forPath(path); ?if?(stat?==?null)?{ ?//?這里可以添加watcher來(lái)監(jiān)聽(tīng)變化,這里簡(jiǎn)化了,沒(méi)有做這個(gè)事情 ?System.out.println("警告:無(wú)法找到服務(wù)接口:"?+?path); ?continue; ?} ?if?(registryInfos?==?null)?{ ?byte[]?bytes?=?client.getData().forPath(path); ?String?data?=?new?String(bytes,?StandardCharsets.UTF_8); ?registryInfos?=?JSONArray.parseArray(data,?RegistryInfo.class); ?} ?} ?return?registryInfos; }
其實(shí)就是去zookeeper獲取節(jié)點(diǎn)中的數(shù)據(jù),得到接口所在的機(jī)器信息,獲取到的注冊(cè)信息諸侯,就會(huì)調(diào)用以下代碼:
if?(registryInfos?!=?null)?{ ?//?保存接口和服務(wù)地址 ?interfacesMethodRegistryList.put(config.getType(),?registryInfos); ?//?初始化網(wǎng)絡(luò)連接 ?initChannel(registryInfos); } private?void?initChannel(ListregistryInfos)?throws?InterruptedException?{ ?for?(RegistryInfo?info?:?registryInfos)?{ ?if?(!channels.containsKey(info))?{ ?System.out.println("開(kāi)始建立連接:"?+?info.getIp()?+?",?"?+?info.getPort()); ?NettyClient?client?=?new?NettyClient(info.getIp(),?info.getPort()); ?client.setMessageCallback(message?->?{ ?//?這里收單服務(wù)端返回的消息,先壓入隊(duì)列 ?RpcResponse?response?=?JSONObject.parseObject(message,?RpcResponse.class); ?responses.offer(response); ?synchronized?(ApplicationContext.this)?{ ?ApplicationContext.this.notifyAll(); ?} ?}); ?//?等待連接建立 ?ChannelHandlerContext?ctx?=?client.getCtx(); ?channels.put(info,?ctx); ?} ?} }
我們會(huì)針對(duì)每一個(gè)唯一的RegistryInfo建立一個(gè)連接,然后有這樣一段代碼:
client.setMessageCallback(message?->?{ ?//?這里收單服務(wù)端返回的消息,先壓入隊(duì)列 ?RpcResponse?response?=?JSONObject.parseObject(message,?RpcResponse.class); ?responses.offer(response); ?synchronized?(ApplicationContext.this)?{ ?ApplicationContext.this.notifyAll(); ?} });
設(shè)置一個(gè)callback,用于收到消息的時(shí)候,回調(diào)這里的代碼,這部分我們后面再分析。
然后在client.getCtx()的時(shí)候,同步阻塞直到連接完成,建立好連接后通過(guò),NettyClient的代碼如下:
public?class?NettyClient?{ ?private?ChannelHandlerContext?ctx; ?private?MessageCallback?messageCallback; ?public?NettyClient(String?ip,?Integer?port)?{ ?EventLoopGroup?group?=?new?NioEventLoopGroup(); ?try?{ ?Bootstrap?b?=?new?Bootstrap(); ?b.group(group) ?.channel(NioSocketChannel.class) ?.option(ChannelOption.TCP_NODELAY,?true) ?.handler(new?ChannelInitializer()?{ ?@Override ?protected?void?initChannel(SocketChannel?ch)?throws?Exception?{ ?ByteBuf?delimiter?=?Unpooled.copiedBuffer("$$".getBytes()); ?//?設(shè)置按照分隔符“&&”來(lái)切分消息,單條消息限制為?1MB ?ch.pipeline().addLast(new?DelimiterBasedFrameDecoder(1024?*?1024,?delimiter)); ?ch.pipeline().addLast(new?StringDecoder()); ?ch.pipeline().addLast(new?NettyClientHandler()); ?} ?}); ?ChannelFuture?sync?=?b.connect(ip,?port).sync(); ?}?catch?(Exception?e)?{ ?e.printStackTrace(); ?} ?} ?public?void?setMessageCallback(MessageCallback?callback)?{ ?this.messageCallback?=?callback; ?} ?public?ChannelHandlerContext?getCtx()?throws?InterruptedException?{ ?System.out.println("等待連接成功..."); ?if?(ctx?==?null)?{ ?synchronized?(this)?{ ?wait(); ?} ?} ?return?ctx; ?} ?private?class?NettyClientHandler?extends?ChannelInboundHandlerAdapter?{ ?@Override ?public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{ ?try?{ ?String?message?=?(String)?msg; ?if?(messageCallback?!=?null)?{ ?messageCallback.onMessage(message); ?} ?}?finally?{ ?ReferenceCountUtil.release(msg); ?} ?} ?@Override ?public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{ ?NettyClient.this.ctx?=?ctx; ?System.out.println("連接成功:"?+?ctx); ?synchronized?(NettyClient.this)?{ ?NettyClient.this.notifyAll(); ?} ?} ?@Override ?public?void?channelReadComplete(ChannelHandlerContext?ctx)?throws?Exception?{ ?ctx.flush(); ?} ?@Override ?public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{ ?cause.printStackTrace(); ?} ?} ?public?interface?MessageCallback?{ ?void?onMessage(String?message); ?} }
這里主要是用了wait()和notifyAll()來(lái)實(shí)現(xiàn)同步阻塞等待連接建立。
建立好連接后,我們保存到集合中:
//?等待連接建立 ChannelHandlerContext?ctx?=?client.getCtx(); channels.put(info,?ctx);
發(fā)送請(qǐng)求
好了,到了這里我們?yōu)槊恳粋€(gè)需要消費(fèi)的接口建立了網(wǎng)絡(luò)連接,接下來(lái)要做的事情就是提供一個(gè)接口給用戶獲取服務(wù)提供者實(shí)例:
我把這個(gè)方法寫(xiě)在ApplicationContext中:
/** ?*?負(fù)責(zé)生成requestId的類 ?*/ private?LongAdder?requestIdWorker?=?new?LongAdder(); /** ?*?獲取調(diào)用服務(wù) ?*/ @SuppressWarnings("unchecked") publicT?getService(Classclazz)?{ ?return?(T)?Proxy.newProxyInstance(getClass().getClassLoader(),?new?Class[]{clazz},?new?InvocationHandler()?{ ?@Override ?public?Object?invoke(Object?proxy,?Method?method,?Object[]?args)?throws?Throwable?{ ?String?methodName?=?method.getName(); ?if?("equals".equals(methodName)?||?"hashCode".equals(methodName))?{ ?throw?new?IllegalAccessException("不能訪問(wèn)"?+?methodName?+?"方法"); ?} ?if?("toString".equals(methodName))?{ ?return?clazz.getName()?+?"#"?+?methodName; ?} ?//?step?1:?獲取服務(wù)地址列表 ?ListregistryInfos?=?interfacesMethodRegistryList.get(clazz); ?if?(registryInfos?==?null)?{ ?throw?new?RuntimeException("無(wú)法找到服務(wù)提供者"); ?} ?//?step?2:?負(fù)載均衡 ?RegistryInfo?registryInfo?=?loadBalancer.choose(registryInfos); ?ChannelHandlerContext?ctx?=?channels.get(registryInfo); ?String?identify?=?InvokeUtils.buildInterfaceMethodIdentify(clazz,?method); ?String?requestId; ?synchronized?(ApplicationContext.this)?{ ?requestIdWorker.increment(); ?requestId?=?String.valueOf(requestIdWorker.longValue()); ?} ?Invoker?invoker?=?new?DefaultInvoker(method.getReturnType(),?ctx,?requestId,?identify); ?inProgressInvoker.put(identify?+?"#"?+?requestId,?invoker); ?return?invoker.invoke(args); ?} ?}); }
這里主要是通過(guò)動(dòng)態(tài)代理來(lái)實(shí)現(xiàn)的,首先通過(guò)class來(lái)獲取對(duì)應(yīng)的機(jī)器列表,接著通過(guò)loadBalancer來(lái)選擇一個(gè)機(jī)器。這個(gè)LoaderBalance是一個(gè)接口:
public?interface?LoadBalancer?{ ?/** ?*?選擇一個(gè)生產(chǎn)者 ?* ?*?@param?registryInfos?生產(chǎn)者列表 ?*?@return?選中的生產(chǎn)者 ?*/ ?RegistryInfo?choose(ListregistryInfos); }
在ApplicationContext初始化的時(shí)候可以選擇不同的實(shí)現(xiàn),我這里主要實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的隨機(jī)算法(后續(xù)可以拓展為其他的,比如RoundRobin之類的):
public?class?RandomLoadbalancer?implements?LoadBalancer?{ ?@Override ?public?RegistryInfo?choose(ListregistryInfos)?{ ?Random?random?=?new?Random(); ?int?index?=?random.nextInt(registryInfos.size()); ?return?registryInfos.get(index); ?} }
接著構(gòu)造接口方法的唯一標(biāo)識(shí)identify,還有一個(gè)requestId。
為什么需要一個(gè)requestId呢?
這是因?yàn)槲覀冊(cè)谔幚眄憫?yīng)的時(shí)候,需要找到某個(gè)響應(yīng)是對(duì)應(yīng)的哪個(gè)請(qǐng)求,但是僅僅使用identify是不行的
因?yàn)槲覀兺粋€(gè)應(yīng)用程序中可能會(huì)有多個(gè)線程同時(shí)調(diào)用同一個(gè)接口的同一個(gè)方法,這樣的identify是相同的。
所以我們需要用 identify + requestId的方式來(lái)判斷,reqeustId是一個(gè)自增的LongAddr。服務(wù)端在響應(yīng)的時(shí)候會(huì)將requestId返回。
接著我們構(gòu)造了一個(gè)Invoker,把它放入inProgressInvoker的集合中。調(diào)用了其invoke方法:
Invoker?invoker?=?new?DefaultInvoker(method.getReturnType(),?ctx,?requestId,?identify); inProgressInvoker.put(identify?+?"#"?+?requestId,?invoker); //?阻塞等待結(jié)果 return?invoker.invoke(args); public?class?DefaultInvokerimplements?Invoker{ ?private?ChannelHandlerContext?ctx; ?private?String?requestId; ?private?String?identify; ?private?Cla***eturnType; ?private?T?result; ?DefaultInvoker(Cla***eturnType,?ChannelHandlerContext?ctx,?String?requestId,?String?identify)?{ ?this.returnType?=?returnType; ?this.ctx?=?ctx; ?this.requestId?=?requestId; ?this.identify?=?identify; ?} ?@SuppressWarnings("unckecked") ?@Override ?public?T?invoke(Object[]?args)?{ ?JSONObject?jsonObject?=?new?JSONObject(); ?jsonObject.put("interfaces",?identify); ?JSONObject?param?=?new?JSONObject(); ?if?(args?!=?null)?{ ?for?(Object?obj?:?args)?{ ?param.put(obj.getClass().getName(),?obj); ?} ?} ?jsonObject.put("parameter",?param); ?jsonObject.put("requestId",?requestId); ?System.out.println("發(fā)送給服務(wù)端JSON為:"?+?jsonObject.toJSONString()); ?String?msg?=?jsonObject.toJSONString()?+?"$$"; ?ByteBuf?byteBuf?=?Unpooled.buffer(msg.getBytes().length); ?byteBuf.writeBytes(msg.getBytes()); ?ctx.writeAndFlush(byteBuf); ?waitForResult(); ?return?result; ?} ?@Override ?public?void?setResult(String?result)?{ ?synchronized?(this)?{ ?this.result?=?JSONObject.parseObject(result,?returnType); ?notifyAll(); ?} ?} ?private?void?waitForResult()?{ ?synchronized?(this)?{ ?try?{ ?wait(); ?}?catch?(InterruptedException?e)?{ ?e.printStackTrace(); ?} ?} ?} }
我們可以看到調(diào)用Invoker的invoke方法之后,會(huì)運(yùn)行到waitForResult()這里,這里已經(jīng)把請(qǐng)求通過(guò)網(wǎng)絡(luò)發(fā)送出去了,但是就會(huì)被卡住。
這是因?yàn)槲覀兊木W(wǎng)絡(luò)請(qǐng)求的結(jié)果不是同步返回的,有可能是客戶端同時(shí)發(fā)起很多個(gè)請(qǐng)求,所以我們不可能在這里讓他同步阻塞等待的。
接受響應(yīng)
那么對(duì)于服務(wù)消費(fèi)者而言,把請(qǐng)求發(fā)送出去但是卡住了,這個(gè)時(shí)候當(dāng)服務(wù)端處理完之后,會(huì)把消息返回給客戶端。返回的入口在
NettyClient的onChannelRead中:
@Override public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{ ?try?{ ?String?message?=?(String)?msg; ?if?(messageCallback?!=?null)?{ ?messageCallback.onMessage(message); ?} ?}?finally?{ ?ReferenceCountUtil.release(msg); ?} }
這里通過(guò)callback回調(diào)出去。是否還記的我們?cè)诔跏蓟疦ettyClient的時(shí)候,會(huì)設(shè)置一個(gè)callback?
/** ?*?響應(yīng)隊(duì)列 ?*/ private?ConcurrentLinkedQueueresponses?=?new?ConcurrentLinkedQueue<>(); client.setMessageCallback(message?->?{ ?//?這里收單服務(wù)端返回的消息,先壓入隊(duì)列 ?RpcResponse?response?=?JSONObject.parseObject(message,?RpcResponse.class); ?responses.offer(response); ?synchronized?(ApplicationContext.this)?{ ?ApplicationContext.this.notifyAll(); ?} });
這里接受消息之后,解析成為一個(gè)RpcResponse對(duì)象,然后壓入responses隊(duì)列中,這樣我們就把所有的請(qǐng)求響應(yīng)放入隊(duì)列中。
但是這樣一來(lái),我們應(yīng)該怎么把響應(yīng)結(jié)果返回給調(diào)用的地方呢?
我們可以這樣做:起一個(gè)或多個(gè)后臺(tái)線程,然后從隊(duì)列中拿出響應(yīng),然后根據(jù)響應(yīng)從我們之前保存的inProcessInvoker中找出對(duì)應(yīng)的Invoker,然后把結(jié)果返回回去
public?ApplicationContext(....){ ? ?//..... ? ?//?step?5:?jiǎn)?dòng)處理響應(yīng)的processor ?initProcessor(); ? } private?void?initProcessor()?{ ?//?事實(shí)上,這里可以通過(guò)配置文件讀取,啟動(dòng)多少個(gè)processor ?int?num?=?3; ?processors?=?new?ResponseProcessor[num]; ?for?(int?i?=?0;?i?<?3;?i++)?{ ?processors[i]?=?createProcessor(i); ?} } /** ?*?處理響應(yīng)的線程 ?*/ private?class?ResponseProcessor?extends?Thread?{ ?@Override ?public?void?run()?{ ?System.out.println("啟動(dòng)響應(yīng)處理線程:"?+?getName()); ?while?(true)?{ ?//?多個(gè)線程在這里獲取響應(yīng),只有一個(gè)成功 ?RpcResponse?response?=?responses.poll(); ?if?(response?==?null)?{ ?try?{ ?synchronized?(ApplicationContext.this)?{ ?//?如果沒(méi)有響應(yīng),先休眠 ?ApplicationContext.this.wait(); ?} ?}?catch?(InterruptedException?e)?{ ?e.printStackTrace(); ?} ?}?else?{ ?System.out.println("收到一個(gè)響應(yīng):"?+?response); ?String?interfaceMethodIdentify?=?response.getInterfaceMethodIdentify(); ?String?requestId?=?response.getRequestId(); ?String?key?=?interfaceMethodIdentify?+?"#"?+?requestId; ?Invoker?invoker?=?inProgressInvoker.remove(key); ?invoker.setResult(response.getResult()); ?} ?} ?} }
這里面如果從隊(duì)列中拿不到數(shù)據(jù),就會(huì)調(diào)用wait()方法等待
這里需要注意的是,在callbak中獲取到響應(yīng)的時(shí)候我們是會(huì)調(diào)用notifyAll()來(lái)喚醒這里的線程的:
responses.offer(response); synchronized?(ApplicationContext.this)?{ ?ApplicationContext.this.notifyAll(); }
這里被喚醒之后,就會(huì)有多個(gè)線程去爭(zhēng)搶那個(gè)響應(yīng),因?yàn)殛?duì)列是線程安全的,所以這里多個(gè)線程可以獲取到響應(yīng)結(jié)果。
接著拿到結(jié)果之后,通過(guò)identify + requestId構(gòu)造成唯一的請(qǐng)求標(biāo)識(shí),從inProgressInvoker中獲取對(duì)應(yīng)的invoker,然后通過(guò)setResult將結(jié)果設(shè)置進(jìn)去:
String?key?=?interfaceMethodIdentify?+?"#"?+?requestId; Invoker?invoker?=?inProgressInvoker.remove(key); invoker.setResult(response.getResult()); @Override public?void?setResult(String?result)?{ ?synchronized?(this)?{ ?this.result?=?JSONObject.parseObject(result,?returnType); ?notifyAll(); ?} }
這里設(shè)置進(jìn)去之后,就會(huì)將結(jié)果用json反序列化成為用戶需要的結(jié)果,然后調(diào)用其notifyAll方法喚醒invoke方法被阻塞的線程:
?@SuppressWarnings("unckecked") ?@Override ?public?T?invoke(Object[]?args)?{ ?JSONObject?jsonObject?=?new?JSONObject(); ?jsonObject.put("interfaces",?identify); ?JSONObject?param?=?new?JSONObject(); ?if?(args?!=?null)?{ ?for?(Object?obj?:?args)?{ ?param.put(obj.getClass().getName(),?obj); ?} ?} ?jsonObject.put("parameter",?param); ?jsonObject.put("requestId",?requestId); ?System.out.println("發(fā)送給服務(wù)端JSON為:"?+?jsonObject.toJSONString()); ?String?msg?=?jsonObject.toJSONString()?+?NettyServer.DELIMITER; ?ByteBuf?byteBuf?=?Unpooled.buffer(msg.getBytes().length); ?byteBuf.writeBytes(msg.getBytes()); ?ctx.writeAndFlush(byteBuf); ?//?這里被喚醒 ?waitForResult(); ?return?result; ?}
然后就可以返回結(jié)果了,返回的結(jié)果就會(huì)返回給用戶了。
整體測(cè)試
到了這里我們的生產(chǎn)者和消費(fèi)者的代碼都寫(xiě)完了,我們來(lái)整體測(cè)試一遍。生產(chǎn)者的代碼是和之前的一致的:
public?class?TestProducer?{ ?public?static?void?main(String[]?args)?throws?Exception?{ ?String?connectionString?=?"zookeeper://localhost1:2181,localhost2:2182,localhost3:2181"; ?HelloService?service?=?new?HelloServiceImpl(); ?ServiceConfig?config?=?new?ServiceConfig<>(HelloService.class,?service); ?ListserviceConfigList?=?new?ArrayList<>(); ?serviceConfigList.add(config); ?ApplicationContext?ctx?=?new?ApplicationContext(connectionString,?serviceConfigList,?null,?50071); ?} }
消費(fèi)者測(cè)試代碼:
public?class?TestConsumer?{ ?public?static?void?main(String[]?args)?throws?Exception?{ ?String?connectionString?=?"zookeeper://localhost1:2181,localhost2:2182,localhost3:2181"; ?ReferenceConfigconfig?=?new?ReferenceConfig<>(HelloService.class); ?ApplicationContext?ctx?=?new?ApplicationContext(connectionString,?null,?Collections.singletonList(config), ?50070); ?HelloService?helloService?=?ctx.getService(HelloService.class); ?System.out.println("sayHello(TestBean)結(jié)果為:"?+?helloService.sayHello(new?TestBean("張三",?20))); ?} }
接著啟動(dòng)生產(chǎn)者,然后啟動(dòng)消費(fèi)者:
生產(chǎn)者得到的日志如下:
Zookeeper?Client初始化完畢...... 注冊(cè)到注冊(cè)中心,路徑為:【/myRPC/interface=com.study.rpc.test.producer.HelloService& method=sayHello?meter=com.study.rpc.test.producer.TestBean】 信息為:RegistryInfo{hostname='localhost',?ip='192.168.16.7',?port=50071} 啟動(dòng)NettyService,端口為:50071 啟動(dòng)響應(yīng)處理線程:Response-processor-0 啟動(dòng)響應(yīng)處理線程:Response-processor-2 啟動(dòng)響應(yīng)處理線程:Response-processor-1 接收到消息:{"interfaces":"interface=com.study.rpc.test.producer.HelloService& method=sayHello?meter=com.study.rpc.test.producer.TestBean","requestId":"1", "parameter":{"com.study.rpc.test.producer.TestBean":{"age":20,"name":"張三"}}} 響應(yīng)給客戶端:{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService& method=sayHello?meter=com.study.rpc.test.producer.TestBean","requestId":"1", "result":"\"牛逼,我收到了消息:TestBean{name='張三',?age=20}\""}
消費(fèi)者得到的日志為:
Zookeeper Client初始化完畢......
開(kāi)始建立連接:192.168.16.7, 50071
等待連接成功...
啟動(dòng)響應(yīng)處理線程:Response-processor-1
啟動(dòng)響應(yīng)處理線程:Response-processor-0
啟動(dòng)響應(yīng)處理線程:Response-processor-2
連接成功:ChannelHandlerContext(NettyClient$NettyClientHandler#0,
[id: 0xb7a59701, L:/192.168.16.7:58354 - R:/192.168.16.7:50071])
發(fā)送給服務(wù)端JSON為:{"interfaces":"interface=com.study.rpc.test.producer.HelloService&
method=sayHello?meter=com.study.rpc.test.producer.TestBean","requestId":"1",
"parameter":{"com.study.rpc.test.producer.TestBean":{"age":20,"name":"張三"}}}
收到一個(gè)響應(yīng):RpcResponse{result='"牛逼,我收到了消息:TestBean{name='張三', age=20}"',
interfaceMethodIdentify='interface=com.study.rpc.test.producer.HelloService&
method=sayHello?meter=com.study.rpc.test.producer.TestBean', requestId='1'}
sayHello(TestBean)結(jié)果為:牛逼,我收到了消息:TestBean{name='張三', age=20}
總結(jié)
通過(guò)完成這個(gè)RPC框架,大家應(yīng)該會(huì)大致對(duì)RPC的實(shí)現(xiàn)原理有個(gè)感性的認(rèn)識(shí),這里總結(jié)一下特性:
支持多種注冊(cè)中心,可配置(雖然只實(shí)現(xiàn)了zookeeper,但是我們拓展是非常簡(jiǎn)單的)
支持負(fù)載均衡
當(dāng)然了還有非常多的不足之處,這是無(wú)可否認(rèn)的,隨意寫(xiě)出來(lái)的框架和工業(yè)級(jí)使用的框架比較還是不一樣
我這里列舉一些不完美的地方把(有興趣的可以搞搞):
實(shí)現(xiàn)序列化框架的拓展,多種序列化供用戶選擇
網(wǎng)絡(luò)請(qǐng)求錯(cuò)誤處理,這里實(shí)現(xiàn)非常簡(jiǎn)陋,健壯性很差
注冊(cè)中心不支持故障感知和自動(dòng)恢復(fù)
調(diào)用監(jiān)控,性能指標(biāo)
歡迎工作一到五年的Java工程師朋友們加入我的個(gè)人粉絲群(Java架構(gòu)技術(shù)棧:728987924)群內(nèi)提供免費(fèi)的Java架構(gòu)學(xué)習(xí)資料(里面有高可用、高并發(fā)、高性能及分布式、Jvm性能調(diào)優(yōu)、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個(gè)知識(shí)點(diǎn)的架構(gòu)資料)
合理利用自己每一分每一秒的時(shí)間來(lái)學(xué)習(xí)提升自己,不要再用"沒(méi)有時(shí)間“來(lái)掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來(lái)的自己一個(gè)交代!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。