您好,登錄后才能下訂單哦!
首先DUBBO本質(zhì)上是一款RPC框架,不可缺少的2個(gè)角色:服務(wù)提供者和服務(wù)消費(fèi)者。
已經(jīng)剖析了服務(wù)端暴露端口過(guò)程。本文簡(jiǎn)單說(shuō)下注冊(cè)中心。
1.注冊(cè)中心是什么玩意
這是官網(wǎng)提供的圖例
通過(guò)圖例,可以看出消費(fèi)者和提供者并不是直接通信的,中間有個(gè)第三者,就是注冊(cè)中心。這種結(jié)構(gòu),可以實(shí)現(xiàn)消費(fèi)者和提供者兩者的依賴,和參數(shù)信息的集群化。所以這帶來(lái)了幾個(gè)問(wèn)題。
服務(wù)注冊(cè)
服務(wù)發(fā)現(xiàn)
服務(wù)訂閱
服務(wù)通知
2. 服務(wù)暴露及服務(wù)注冊(cè)過(guò)程
《Dubbo點(diǎn)滴(4)之暴露服務(wù)解析》已經(jīng)剖析了dubbo協(xié)議具體打開(kāi)網(wǎng)絡(luò)端口過(guò)程。本節(jié)內(nèi)容會(huì)隱去這部分內(nèi)容。因?yàn)橐粋€(gè)完整的服務(wù)暴露,主要涉及2部分內(nèi)容,1)打開(kāi)端口等待消費(fèi)者連接;2)將服務(wù)信息登記到注冊(cè)中心,以告知消費(fèi)者可以連接了。
有3點(diǎn)需要說(shuō)明:
1)首先,根據(jù)條件判斷會(huì)暴露一個(gè)injvm本地服務(wù)(step 6);
InjvmProtocol協(xié)議完成,主要供同一JVM種的消費(fèi)者調(diào)用,提供RPC效率。
2) 為服務(wù)暴露一個(gè)dubbo服務(wù)(step 12),一般為DubboProtocol完成
3)step 12提供的的服務(wù),注冊(cè)到注冊(cè)中心(step 13-step 23)。這一步是本文的剖析重點(diǎn)。
3.認(rèn)識(shí)注冊(cè)中心
該圖是DUBBO的總體結(jié)構(gòu)圖。重點(diǎn)停留在Resistry層。比較重要的是幾個(gè)組件
ZookeeperRegistry :負(fù)責(zé)與zookeeper進(jìn)行交互
RegistryProtocol :從注冊(cè)中心獲取可用服務(wù),或者將服務(wù)注冊(cè)到zookeeper,然后提供服務(wù)或者提供調(diào)用代理。
RegistryDirectory :維護(hù)著所有可用的遠(yuǎn)程Invoker或者本地的Invoker。這個(gè)類(lèi)實(shí)現(xiàn)了NotifyListner。
NotifyListener :負(fù)責(zé)RegistryDirectory和ZookeeperRegistry的通信。
FailbackRegistry:繼承自Registry,實(shí)現(xiàn)了失敗重試機(jī)制。
4. 注冊(cè)中心數(shù)據(jù)模型
流程說(shuō)明:
服務(wù)提供者啟動(dòng)時(shí)
向/dubbo/com.foo.BarService/providers目錄下寫(xiě)入自己的URL地址。
服務(wù)消費(fèi)者啟動(dòng)時(shí)
訂閱/dubbo/com.foo.BarService/providers目錄下的提供者URL地址。
并向/dubbo/com.foo.BarService/consumers目錄下寫(xiě)入自己的URL地址。
監(jiān)控中心啟動(dòng)時(shí)
訂閱/dubbo/com.foo.BarService目錄下的所有提供者和消費(fèi)者URL地址。
4.Registry 結(jié)構(gòu)樹(shù)
ZookeeperRegistry是常見(jiàn)的注冊(cè)中心實(shí)現(xiàn)方案,由ZookeeperRegistryFactory負(fù)責(zé)構(gòu)造。
AbstractRegistry這個(gè)類(lèi)主要存儲(chǔ)的是已經(jīng)注冊(cè)的服務(wù)接口,已經(jīng)訂閱的服務(wù)接口和已經(jīng)收到通知的接口的URL,不能直接調(diào)用。
public abstract class AbstractRegistry implements Registry { // 本地磁盤(pán)緩存,其中特殊的key值.registies記錄注冊(cè)中心列表,其它均為notified服務(wù)提供者列表 private final Properties properties = new Properties(); // 文件緩存定時(shí)寫(xiě)入 private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); //是否是同步保存文件 private final boolean syncSaveFile ; private final AtomicLong lastCacheChanged = new AtomicLong(); private final Set<URL> registered = new ConcurrentHashSet<URL>(); private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>(); ... }
FailbackRegistry 繼承自AbstractRegistry ,實(shí)現(xiàn)了失敗重試機(jī)制。
public abstract class FailbackRegistry extends AbstractRegistry { // 定時(shí)任務(wù)執(zhí)行器 private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true)); // 失敗重試定時(shí)器,定時(shí)檢查是否有請(qǐng)求失敗,如有,無(wú)限次重試 private final ScheduledFuture<?> retryFuture; private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>(); private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>(); private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>(); ... }
5.服務(wù)提供者啟動(dòng)的注冊(cè)過(guò)程
服務(wù)提供者啟動(dòng)時(shí) 向/dubbo/com.foo.BarService/providers目錄下寫(xiě)入自己的URL地址。接下來(lái),找尋下代碼執(zhí)行路徑。RegistryProtocol 作為注冊(cè)中心的核心組件,作為代碼的入口,還要明確一點(diǎn),這是個(gè)注冊(cè)登記過(guò)程。
//入口 public class RegistryProtocol implements Protocol { final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); registry.register(registedProviderUrl); } public abstract class AbstractRegistry implements Registry { public void register(URL url) { if (url == null) { throw new IllegalArgumentException("register url == null"); } if (logger.isInfoEnabled()){ logger.info("Register: " + url); } registered.add(url); } ... } public abstract class FailbackRegistry extends AbstractRegistry { public void register(URL url) { super.register(url); failedRegistered.remove(url); failedUnregistered.remove(url); try { // 向服務(wù)器端發(fā)送注冊(cè)請(qǐng)求 doRegister(url); } catch (Exception e) { ... } // 將失敗的注冊(cè)請(qǐng)求記錄到失敗列表,定時(shí)重試 failedRegistered.add(url); } protected abstract void doRegister(URL url); protected abstract void doUnregister(URL url); protected abstract void doSubscribe(URL url, NotifyListener listener); protected abstract void doUnsubscribe(URL url, NotifyListener listener); ... } public class ZookeeperRegistry extends FailbackRegistry { public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); //如果provider的url是“0.0.0.0”或者在參數(shù)中帶anyHost=true則拋出異常注冊(cè)地址不存在 if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } //服務(wù)分組(默認(rèn)“dubbo”) String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (! group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } //服務(wù)分組根地址 this.root = group; zkClient = zookeeperTransporter.connect(url); //添加狀態(tài)監(jiān)聽(tīng)器 zkClient.addStateListener(new StateListener() { public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); } protected void doRegister(URL url) { try { //連接注冊(cè)中心注冊(cè),在zk上創(chuàng)建節(jié)點(diǎn) zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } }
6. 注冊(cè)中心關(guān)系類(lèi)圖
重要概念:
ZookeeperRegistry :負(fù)責(zé)與zookeeper進(jìn)行交互
RegistryProtocol :從注冊(cè)中心獲取可用服務(wù),或者將服務(wù)注冊(cè)到zookeeper,然后提供服務(wù)或者提供調(diào)用代理。
RegistryDirectory :維護(hù)著所有可用的遠(yuǎn)程Invoker或者本地的Invoker。這個(gè)類(lèi)實(shí)現(xiàn)了NotifyListner。
NotifyListener :負(fù)責(zé)RegistryDirectory和ZookeeperRegistry的通信。
FailbackRegistry:繼承自Registry,實(shí)現(xiàn)了失敗重試機(jī)制。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。