溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Dubbo點(diǎn)滴(5)之服務(wù)注冊(cè)中心

發(fā)布時(shí)間:2020-06-16 21:25:17 來(lái)源:網(wǎng)絡(luò) 閱讀:11607 作者:randy_shandong 欄目:開(kāi)發(fā)技術(shù)

首先DUBBO本質(zhì)上是一款RPC框架,不可缺少的2個(gè)角色:服務(wù)提供者和服務(wù)消費(fèi)者。

已經(jīng)剖析了服務(wù)端暴露端口過(guò)程。本文簡(jiǎn)單說(shuō)下注冊(cè)中心。

1.注冊(cè)中心是什么玩意

這是官網(wǎng)提供的圖例

Dubbo點(diǎn)滴(5)之服務(wù)注冊(cè)中心

通過(guò)圖例,可以看出消費(fèi)者和提供者并不是直接通信的,中間有個(gè)第三者,就是注冊(cè)中心。這種結(jié)構(gòu),可以實(shí)現(xiàn)消費(fèi)者和提供者兩者的依賴,和參數(shù)信息的集群化。所以這帶來(lái)了幾個(gè)問(wèn)題。

  1. 服務(wù)注冊(cè)

  2. 服務(wù)發(fā)現(xiàn)

  3. 服務(wù)訂閱

  4. 服務(wù)通知

2. 服務(wù)暴露及服務(wù)注冊(cè)過(guò)程

《Dubbo點(diǎn)滴(4)之暴露服務(wù)解析》已經(jīng)剖析了dubbo協(xié)議具體打開(kāi)網(wǎng)絡(luò)端口過(guò)程。本節(jié)內(nèi)容會(huì)隱去這部分內(nèi)容。因?yàn)橐粋€(gè)完整的服務(wù)暴露,主要涉及2部分內(nèi)容,1)打開(kāi)端口等待消費(fèi)者連接;2)將服務(wù)信息登記到注冊(cè)中心,以告知消費(fèi)者可以連接了。

Dubbo點(diǎn)滴(5)之服務(wù)注冊(cè)中心

有3點(diǎn)需要說(shuō)明:

1)首先,根據(jù)條件判斷會(huì)暴露一個(gè)injvm本地服務(wù)(step 6);

InjvmProtocol協(xié)議完成,主要供同一JVM種的消費(fèi)者調(diào)用,提供RPC效率。

2) 為服務(wù)暴露一個(gè)dubbo服務(wù)(step 12),一般為DubboProtocol完成

3)step 12提供的的服務(wù),注冊(cè)到注冊(cè)中心(step 13-step 23)。這一步是本文的剖析重點(diǎn)。

3.認(rèn)識(shí)注冊(cè)中心

Dubbo點(diǎn)滴(5)之服務(wù)注冊(cè)中心

該圖是DUBBO的總體結(jié)構(gòu)圖。重點(diǎn)停留在Resistry層。比較重要的是幾個(gè)組件

ZookeeperRegistry :負(fù)責(zé)與zookeeper進(jìn)行交互

RegistryProtocol :從注冊(cè)中心獲取可用服務(wù),或者將服務(wù)注冊(cè)到zookeeper,然后提供服務(wù)或者提供調(diào)用代理。

RegistryDirectory :維護(hù)著所有可用的遠(yuǎn)程Invoker或者本地的Invoker。這個(gè)類(lèi)實(shí)現(xiàn)了NotifyListner。

NotifyListener :負(fù)責(zé)RegistryDirectory和ZookeeperRegistry的通信。

FailbackRegistry:繼承自Registry,實(shí)現(xiàn)了失敗重試機(jī)制。

4. 注冊(cè)中心數(shù)據(jù)模型

Dubbo點(diǎn)滴(5)之服務(wù)注冊(cè)中心

流程說(shuō)明:

  • 服務(wù)提供者啟動(dòng)時(shí)

    • 向/dubbo/com.foo.BarService/providers目錄下寫(xiě)入自己的URL地址。

  • 服務(wù)消費(fèi)者啟動(dòng)時(shí)

    • 訂閱/dubbo/com.foo.BarService/providers目錄下的提供者URL地址。

    • 并向/dubbo/com.foo.BarService/consumers目錄下寫(xiě)入自己的URL地址。

  • 監(jiān)控中心啟動(dòng)時(shí)

    • 訂閱/dubbo/com.foo.BarService目錄下的所有提供者和消費(fèi)者URL地址。

4.Registry 結(jié)構(gòu)樹(shù)

Dubbo點(diǎn)滴(5)之服務(wù)注冊(cè)中心

ZookeeperRegistry是常見(jiàn)的注冊(cè)中心實(shí)現(xiàn)方案,由ZookeeperRegistryFactory負(fù)責(zé)構(gòu)造。

AbstractRegistry這個(gè)類(lèi)主要存儲(chǔ)的是已經(jīng)注冊(cè)的服務(wù)接口,已經(jīng)訂閱的服務(wù)接口和已經(jīng)收到通知的接口的URL,不能直接調(diào)用。

public abstract class AbstractRegistry implements Registry {

    // 本地磁盤(pán)緩存,其中特殊的key值.registies記錄注冊(cè)中心列表,其它均為notified服務(wù)提供者列表
    private final Properties properties = new Properties();
    // 文件緩存定時(shí)寫(xiě)入
    private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
    //是否是同步保存文件
    private final boolean syncSaveFile ;
    private final AtomicLong lastCacheChanged = new AtomicLong();
    private final Set<URL> registered = new ConcurrentHashSet<URL>();
    private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
    ...
    }

FailbackRegistry 繼承自AbstractRegistry ,實(shí)現(xiàn)了失敗重試機(jī)制。

public abstract class FailbackRegistry extends AbstractRegistry {
// 定時(shí)任務(wù)執(zhí)行器
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));

// 失敗重試定時(shí)器,定時(shí)檢查是否有請(qǐng)求失敗,如有,無(wú)限次重試
private final ScheduledFuture<?> retryFuture;

private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();

private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();

private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
...
}


5.服務(wù)提供者啟動(dòng)的注冊(cè)過(guò)程

服務(wù)提供者啟動(dòng)時(shí) 向/dubbo/com.foo.BarService/providers目錄下寫(xiě)入自己的URL地址。接下來(lái),找尋下代碼執(zhí)行路徑。RegistryProtocol 作為注冊(cè)中心的核心組件,作為代碼的入口,還要明確一點(diǎn),這是個(gè)注冊(cè)登記過(guò)程。

//入口
public class RegistryProtocol implements Protocol {
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
}
public abstract class AbstractRegistry implements Registry {
    public void register(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("register url == null");
        }
        if (logger.isInfoEnabled()){
            logger.info("Register: " + url);
        }
        registered.add(url);
    }
...
}
public abstract class FailbackRegistry extends AbstractRegistry {
public void register(URL url) {
    super.register(url);
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // 向服務(wù)器端發(fā)送注冊(cè)請(qǐng)求
        doRegister(url);
    } catch (Exception e) {
    ...
    }

        // 將失敗的注冊(cè)請(qǐng)求記錄到失敗列表,定時(shí)重試
        failedRegistered.add(url);
    }
    protected abstract void doRegister(URL url);
    
    protected abstract void doUnregister(URL url);
    
    protected abstract void doSubscribe(URL url, NotifyListener listener);
    
    protected abstract void doUnsubscribe(URL url, NotifyListener listener);
...
}   
 
 public class ZookeeperRegistry extends FailbackRegistry {
     public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        //如果provider的url是“0.0.0.0”或者在參數(shù)中帶anyHost=true則拋出異常注冊(cè)地址不存在
        if (url.isAnyHost()) {
          throw new IllegalStateException("registry address == null");
       }
        //服務(wù)分組(默認(rèn)“dubbo”)
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (! group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        //服務(wù)分組根地址
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        //添加狀態(tài)監(jiān)聽(tīng)器
        zkClient.addStateListener(new StateListener() {
            public void stateChanged(int state) {
               if (state == RECONNECTED) {
               try {
          recover();
       } catch (Exception e) {
          logger.error(e.getMessage(), e);
       }
               }
            }
        });
    }
 
     protected void doRegister(URL url) {
            try {
                //連接注冊(cè)中心注冊(cè),在zk上創(chuàng)建節(jié)點(diǎn)
               zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
            } catch (Throwable e) {
                throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
 }

6. 注冊(cè)中心關(guān)系類(lèi)圖

Dubbo點(diǎn)滴(5)之服務(wù)注冊(cè)中心

重要概念:

ZookeeperRegistry :負(fù)責(zé)與zookeeper進(jìn)行交互

RegistryProtocol :從注冊(cè)中心獲取可用服務(wù),或者將服務(wù)注冊(cè)到zookeeper,然后提供服務(wù)或者提供調(diào)用代理。

RegistryDirectory :維護(hù)著所有可用的遠(yuǎn)程Invoker或者本地的Invoker。這個(gè)類(lèi)實(shí)現(xiàn)了NotifyListner。

NotifyListener :負(fù)責(zé)RegistryDirectory和ZookeeperRegistry的通信。

FailbackRegistry:繼承自Registry,實(shí)現(xiàn)了失敗重試機(jī)制。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI