溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Spring Cloud Eureka服務(wù)注冊與取消方法是什么

發(fā)布時(shí)間:2021-11-16 16:39:18 來源:億速云 閱讀:185 作者:iii 欄目:大數(shù)據(jù)

本篇內(nèi)容主要講解“Spring Cloud Eureka服務(wù)注冊與取消方法是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“Spring Cloud Eureka服務(wù)注冊與取消方法是什么”吧!

關(guān)于服務(wù)注冊

開啟/關(guān)閉服務(wù)注冊配置:eureka.client.register-with-eureka = true (默認(rèn))

什么時(shí)候注冊?

  1. 應(yīng)用第一次啟動(dòng)時(shí),初始化EurekaClient時(shí),應(yīng)用狀態(tài)改變:從STARTING變?yōu)閁P會(huì)觸發(fā)這個(gè)Listener,調(diào)用instanceInfoReplicator.onDemandUpdate(); 可以推測出,實(shí)例狀態(tài)改變時(shí),也會(huì)通過注冊接口更新實(shí)例狀態(tài)信息

statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
            // log at warn level if DOWN was involved
            logger.warn("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();
    }
};
  1. 定時(shí)任務(wù),如果InstanceInfo發(fā)生改變,也會(huì)通過注冊接口更新信息

public void run() {
    try {
        discoveryClient.refreshInstanceInfo();
        //如果實(shí)例信息發(fā)生改變,則需要調(diào)用register更新InstanceInfo
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
  1. 在定時(shí)renew時(shí),如果renew接口返回404(代表這個(gè)實(shí)例在EurekaServer上面找不到),可能是之前注冊失敗或者注冊過期導(dǎo)致的。這時(shí)需要調(diào)用register重新注冊

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        //如果renew接口返回404(代表這個(gè)實(shí)例在EurekaServer上面找不到),可能是之前注冊失敗或者注冊過期導(dǎo)致的
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}

向Eureka發(fā)送注冊請(qǐng)求EurekaServer發(fā)生了什么?

主要有兩個(gè)存儲(chǔ),一個(gè)是之前提到過的registry,還有一個(gè)最近變化隊(duì)列,后面我們會(huì)知道,這個(gè)最近變化隊(duì)列里面就是客戶端獲取增量實(shí)例信息的內(nèi)容:

# 整體注冊信息緩存
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
# 最近變化隊(duì)列
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();

EurekaServer收到實(shí)例注冊主要分兩步:

  • 調(diào)用父類方法注冊

  • 同步到其他EurekaServer實(shí)例

public void register(InstanceInfo info, boolean isReplication) {
    int leaseDuration = 90;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    //調(diào)用父類方法注冊
    super.register(info, leaseDuration, isReplication);
    //同步到其他EurekaServer實(shí)例
    this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceStatus)null, isReplication);
}

我們先看同步到其他EurekaServer實(shí)例

其實(shí)就是,注冊到的EurekaServer再依次調(diào)用其他集群內(nèi)的EurekaServer的Register方法將實(shí)例信息同步過去

private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

然后看看調(diào)用父類方法注冊:

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        //register雖然看上去好像是修改,但是這里用的是讀鎖,后面會(huì)解釋
        read.lock();
        //從registry中查看這個(gè)app是否存在
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        //不存在就創(chuàng)建
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        //查看這個(gè)app的這個(gè)實(shí)例是否已存在
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        
        if (existingLease != null && (existingLease.getHolder() != null)) {
            //如果已存在,對(duì)比時(shí)間戳,保留比較新的實(shí)例信息......
        } else {
            // 如果不存在,證明是一個(gè)新的實(shí)例
            //更新自我保護(hù)監(jiān)控變量的值的代碼.....
            
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        //放入registry
        gMap.put(registrant.getId(), lease);
        
        //加入最近修改的記錄隊(duì)列
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        //初始化狀態(tài),記錄時(shí)間等相關(guān)代碼......
        
        //主動(dòng)讓Response緩存失效
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
    } finally {
        read.unlock();
    }
}

總結(jié)起來,就是主要三件事:

1.將實(shí)例注冊信息放入或者更新registry

2.將實(shí)例注冊信息加入最近修改的記錄隊(duì)列

3.主動(dòng)讓Response緩存失效

我們來類比下服務(wù)取消

服務(wù)取消CANCEL

protected boolean internalCancel(String appName, String id, boolean isReplication) {
    try {
        //cancel雖然看上去好像是修改,但是這里用的是讀鎖,后面會(huì)解釋
        read.lock();
        
        //從registry中剔除這個(gè)實(shí)例
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            leaseToCancel = gMap.remove(id);
        }
        if (leaseToCancel == null) {
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            //改變狀態(tài),記錄狀態(tài)修改時(shí)間等相關(guān)代碼......
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                //加入最近修改的記錄隊(duì)列
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
            }
            //主動(dòng)讓Response緩存失效
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            return true;
        }
    } finally {
        read.unlock();
    }
}

總結(jié)起來,也是主要三件事:

1.從registry中剔除這個(gè)實(shí)例

2.將實(shí)例注冊信息加入最近修改的記錄隊(duì)列

3.主動(dòng)讓Response緩存失效

這里我們注意到了這個(gè)最近修改隊(duì)列,我們來詳細(xì)看看

最近修改隊(duì)列

這個(gè)最近修改隊(duì)列和消費(fèi)者定時(shí)獲取服務(wù)實(shí)例列表有著密切的關(guān)系

private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {

        @Override
        public void run() {
            Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
                if (it.next().getLastUpdateTime() <
                        System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    it.remove();
                } else {
                    break;
                }
            }
        }

    };
}

這個(gè)RetentionTimeInMSInDeltaQueue默認(rèn)是180s(配置是eureka.server.retention-time-in-m-s-in-delta-queue,默認(rèn)是180s,官網(wǎng)寫錯(cuò)了),可以看出這個(gè)隊(duì)列是一個(gè)長度為180s的滑動(dòng)窗口,保存最近180s以內(nèi)的應(yīng)用實(shí)例信息修改,后面我們會(huì)看到,客戶端調(diào)用獲取增量信息,實(shí)際上就是從這個(gè)queue中讀取,所以可能一段時(shí)間內(nèi)讀取到的信息都是一樣的。

到此,相信大家對(duì)“Spring Cloud Eureka服務(wù)注冊與取消方法是什么”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI