溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何用dubbo源碼解析export 遠程服務(wù)

發(fā)布時間:2021-11-09 17:12:03 來源:億速云 閱讀:103 作者:柒染 欄目:大數(shù)據(jù)

本篇文章給大家分享的是有關(guān)如何用dubbo源碼解析export 遠程服務(wù),小編覺得挺實用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

服務(wù)注冊信息恢復(fù)com.alibaba.dubbo.registry.support.FailbackRegistry#recover

@Override    protected void recover() throws Exception {        // 獲取服務(wù)注冊的url集合        Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());        if (!recoverRegistered.isEmpty()) {            if (logger.isInfoEnabled()) {                logger.info("Recover register url " + recoverRegistered);            }            for (URL url : recoverRegistered) {//                保存注冊失敗的服務(wù)注冊url                failedRegistered.add(url);            }        }        // 獲取訂閱的服務(wù)url集合        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());        if (!recoverSubscribed.isEmpty()) {            if (logger.isInfoEnabled()) {                logger.info("Recover subscribe url " + recoverSubscribed.keySet());            }            for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {                URL url = entry.getKey();                for (NotifyListener listener : entry.getValue()) {//                    添加訂閱失敗的服務(wù)url                    addFailedSubscribed(url, listener);                }            }        }    }

進入這個方法com.alibaba.dubbo.registry.support.FailbackRegistry#register服務(wù)注冊

    @Override    public void register(URL url) {//        添加注冊服務(wù)url=》        super.register(url);        failedRegistered.remove(url);        failedUnregistered.remove(url);        try {            // Sending a registration request to the server side 向服務(wù)器端發(fā)送注冊請求=》ZookeeperRegistry            doRegister(url);        } catch (Exception e) {            Throwable t = e;            // If the startup detection is opened, the Exception is thrown directly. 如果打開啟動檢測,則直接拋出異常            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)                    && url.getParameter(Constants.CHECK_KEY, true)                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());            boolean skipFailback = t instanceof SkipFailbackWrapperException;            if (check || skipFailback) {                if (skipFailback) {                    t = t.getCause();                }                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);            } else {                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);            }            // Record a failed registration request to a failed list, retry regularly 將失敗的注冊請求記錄到失敗的列表中,定期重試            failedRegistered.add(url);        }    }

    進入這個方法com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister進行zk服務(wù)注冊,這里的邏輯就是創(chuàng)建zk臨時節(jié)點

      @Override    protected void doRegister(URL url) {        try {//            服務(wù)注冊,創(chuàng)建zk節(jié)點,如果dynamic配置的是true,創(chuàng)建的就是zk臨時節(jié)點=》            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));        } catch (Throwable e) {            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);        }    }

      這里默認(rèn)創(chuàng)建的是臨時節(jié)點,這也就是zk注冊的服務(wù)所在節(jié)點掛了之后其他客戶端節(jié)點本地的服務(wù)列表會更新的原因,不會調(diào)用到不存在的服務(wù),當(dāng)然也存在zk臨時節(jié)點刪除,通知其他訂閱這個節(jié)點的客戶端時候出現(xiàn)網(wǎng)絡(luò)抖動,zk會做處理確保一定能通知到,這種中間處理也能要業(yè)務(wù)邏輯要做處理了

      /dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo://172.28.84.147:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=com.alibaba.dubbo.demo.DemoService&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=76579&side=provider&timestamp=1569898563184 服務(wù)注冊的zk path是這樣的

      如果注冊失敗的話怎么辦呢,在創(chuàng)建com.alibaba.dubbo.registry.support.FailbackRegistry#FailbackRegistry對象的時候構(gòu)造方法邏輯中,重試參數(shù)retry.period 默認(rèn)值是每5秒鐘會做重試處理,這里也可以自定義修改

      public FailbackRegistry(URL url) {        super(url);//        每5秒中會重試注冊失敗的服務(wù)信息,可以修改這個參數(shù)retry.period        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {            @Override            public void run() {                // Check and connect to the registry                try {                    retry();                } catch (Throwable t) { // Defensive fault tolerance                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);                }            }        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);    }

      進入這個方法服務(wù)注冊失敗重試邏輯,com.alibaba.dubbo.registry.support.FailbackRegistry#retry

      protected void retry() {        if (!failedRegistered.isEmpty()) {            Set<URL> failed = new HashSet<URL>(failedRegistered);            if (failed.size() > 0) {                if (logger.isInfoEnabled()) {                    logger.info("Retry register " + failed);                }                try {                    for (URL url : failed) {                        try {//                            zk服務(wù)注冊                            doRegister(url);                            failedRegistered.remove(url);                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry 這里異常不做處理等待下次重試                            logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);                        }                    }                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                    logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);                }            }        }        if (!failedUnregistered.isEmpty()) {            Set<URL> failed = new HashSet<URL>(failedUnregistered);            if (!failed.isEmpty()) {                if (logger.isInfoEnabled()) {                    logger.info("Retry unregister " + failed);                }                try {                    for (URL url : failed) {                        try {//                            取消服務(wù)注冊失敗重試                            doUnregister(url);                            failedUnregistered.remove(url);                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                            logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);                        }                    }                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                    logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);                }            }        }        if (!failedSubscribed.isEmpty()) {            Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed);            for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {                if (entry.getValue() == null || entry.getValue().size() == 0) {                    failed.remove(entry.getKey());                }            }            if (failed.size() > 0) {                if (logger.isInfoEnabled()) {                    logger.info("Retry subscribe " + failed);                }                try {                    for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {                        URL url = entry.getKey();                        Set<NotifyListener> listeners = entry.getValue();                        for (NotifyListener listener : listeners) {                            try {//                                服務(wù)訂閱失敗的進行重新訂閱                                doSubscribe(url, listener);                                listeners.remove(listener);                            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                                logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);                            }                        }                    }                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                    logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);                }            }        }        if (!failedUnsubscribed.isEmpty()) {            Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed);            for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {                if (entry.getValue() == null || entry.getValue().isEmpty()) {                    failed.remove(entry.getKey());                }            }            if (failed.size() > 0) {                if (logger.isInfoEnabled()) {                    logger.info("Retry unsubscribe " + failed);                }                try {                    for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {                        URL url = entry.getKey();                        Set<NotifyListener> listeners = entry.getValue();                        for (NotifyListener listener : listeners) {                            try {//                                取消訂閱                                doUnsubscribe(url, listener);                                listeners.remove(listener);                            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                                logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);                            }                        }                    }                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                    logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);                }            }        }        if (!failedNotified.isEmpty()) {            Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>(failedNotified);            for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, List<URL>>>(failed).entrySet()) {                if (entry.getValue() == null || entry.getValue().size() == 0) {                    failed.remove(entry.getKey());                }            }            if (failed.size() > 0) {                if (logger.isInfoEnabled()) {                    logger.info("Retry notify " + failed);                }                try {                    for (Map<NotifyListener, List<URL>> values : failed.values()) {                        for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {                            try {                                NotifyListener listener = entry.getKey();                                List<URL> urls = entry.getValue();                                listener.notify(urls);                                values.remove(listener);                            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                                logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);                            }                        }                    }                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                    logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);                }            }        }    }

      com.alibaba.dubbo.registry.support.FailbackRegistry#failedRegistered對服務(wù)注冊失敗的重新注冊

      對服務(wù)取消注冊失敗的進行重新取消服務(wù)注冊com.alibaba.dubbo.registry.support.FailbackRegistry#failedUnregistered,進入這個方法com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry#doUnregister,這里的操作就是刪除刪除zk臨時節(jié)點,刪除zk臨時節(jié)點后其他訂閱服務(wù)的服務(wù)節(jié)點會收到zk的監(jiān)聽器重新刷新已經(jīng)生成的代理invoker對象,客戶端在進行負(fù)載均衡的時候是直接路由到具體的invoker

       @Override    protected void doUnregister(URL url) {        try {            zkClient.delete(toUrlPath(url));        } catch (Throwable e) {            throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);        }    }

        以上就是如何用dubbo源碼解析export 遠程服務(wù),小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。

        向AI問一下細節(jié)

        免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

        AI