
Service Registration and Data Synchronization with the Raft Protocol in Nacos CP Mode

Published: 2021-06-23 14:28:01  Source: 億速云  Views: 935  Author: chen  Category: Programming Languages

This article explains service registration and data synchronization under the Raft protocol in Nacos CP mode by following the relevant source code step by step.

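The Raft protocol:

Every node is in one of three states: Follower, Candidate, or Leader.

Leader election:

Each node waits for a randomized timeout.
The node whose timeout expires first votes for itself,

then sends a vote request to the other nodes.
(If several nodes time out at the same moment and split the vote, the election starts over.)

All nodes start as Followers. A node becomes a Candidate before it requests votes; if more than half of the nodes grant the vote, the requesting node becomes the Leader.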
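
The election steps above can be sketched in a few lines of Java. This is a toy, single-process model and not Nacos code: the names `ElectionSketch`, `randomTimeoutMs`, and `wins`, as well as the 150 to 300 ms timeout range, are assumptions chosen for illustration.

```java
import java.util.Random;

final class ElectionSketch {
    /** Randomized wait before a node may start an election (the range is an assumption). */
    static long randomTimeoutMs(Random random) {
        return 150 + random.nextInt(150);
    }

    /** A candidate wins once it holds a strict majority of votes, its own included. */
    static boolean wins(int votesIncludingSelf, int clusterSize) {
        return votesIncludingSelf >= clusterSize / 2 + 1;
    }

    public static void main(String[] args) {
        Random random = new Random();
        int clusterSize = 3;
        int first = 0;
        long[] timeouts = new long[clusterSize];
        for (int i = 0; i < clusterSize; i++) {
            timeouts[i] = randomTimeoutMs(random);
            if (timeouts[i] < timeouts[first]) {
                first = i; // the node with the shortest timeout wakes up first
            }
        }
        // The first node to wake becomes a candidate and votes for itself.
        int votes = 1;
        // In this toy model every other node grants its vote.
        for (int i = 0; i < clusterSize; i++) {
            if (i != first) {
                votes++;
            }
        }
        System.out.println("node " + first + " woke first, becomes leader: " + wins(votes, clusterSize));
    }
}
```

With three nodes, two votes are enough; with four nodes, three are required, which is why clusters are usually sized with an odd number of members.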

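Data synchronization:

All writes go through the Leader.

When a write reaches the Leader, the Leader records it locally in an uncommitted state.

The Leader then sends the write to the other nodes. Once a majority of nodes have acknowledged it, the Leader marks the entry as committed and tells the other nodes to apply it.

The Leader sends periodic heartbeats to the Followers; a Follower that finds its data is out of date pulls the missing data from the Leader.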
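
The uncommitted-then-committed transition can be modelled as follows. This is a minimal sketch under stated assumptions, not the Nacos implementation: `CommitSketch` and its methods are hypothetical names, and acknowledgements are simulated by direct method calls rather than network replies.

```java
import java.util.HashSet;
import java.util.Set;

final class CommitSketch {
    private final int clusterSize;
    private final Set<String> acks = new HashSet<>();
    private boolean committed;

    CommitSketch(int clusterSize, String leaderId) {
        this.clusterSize = clusterSize;
        // The leader's own (still uncommitted) write counts as the first acknowledgement.
        acks.add(leaderId);
    }

    /** Records an acknowledgement; returns true once a majority has acknowledged. */
    boolean ack(String nodeId) {
        acks.add(nodeId); // a duplicate ack from the same node is counted only once
        if (acks.size() >= clusterSize / 2 + 1) {
            committed = true;
        }
        return committed;
    }

    boolean isCommitted() {
        return committed;
    }

    public static void main(String[] args) {
        CommitSketch entry = new CommitSketch(3, "leader");
        System.out.println("committed before acks: " + entry.isCommitted());
        System.out.println("committed after follower-1 acks: " + entry.ack("follower-1"));
    }
}
```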

Raft vs. ZAB: in ZAB every node can start voting and the ballots are then compared, whereas in Raft the node whose timeout expires first starts the vote.

Raft visualization: http://thesecretlivesofdata.com/raft/

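The previous post, "Nacos Source Code Analysis (Registration and Discovery, Cluster Sync, Heartbeat, Comparison with Eureka)", showed that addInstance, the method that adds an instance, calls consistencyService.put. Which ConsistencyService handles the call is decided by whether the key matches "ephemeral.". That post covered AP mode; this one walks through service registration in CP mode.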

            consistencyService.put(key, instances);

    @Override
    public void put(String key, Record value) throws NacosException {
        mapConsistencyService(key).put(key, value);
    }

    private ConsistencyService mapConsistencyService(String key) {
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }
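
The routing decision can be illustrated with a simplified stand-in. This sketch assumes, as the text describes, that the check comes down to whether the key contains "ephemeral."; the real KeyBuilder.matchEphemeralKey may apply a stricter prefix test, and the example keys below are hypothetical.

```java
final class RoutingSketch {
    static final String EPHEMERAL_MARKER = "ephemeral.";

    /** Simplified stand-in for the key check: ephemeral keys go to AP, all others to CP. */
    static String route(String key) {
        return key.contains(EPHEMERAL_MARKER) ? "AP (ephemeral)" : "CP (persistent, Raft)";
    }

    public static void main(String[] args) {
        // Hypothetical keys, for illustration only.
        System.out.println(route("iplist.ephemeral.public##demo-service"));
        System.out.println(route("iplist.public##demo-service"));
    }
}
```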

com.alibaba.nacos.naming.consistency.persistent.raft.RaftConsistencyServiceImpl#put:

            raftCore.signalPublish(key, value);

The core of this method is the call to signalPublish:

    public void signalPublish(String key, Record value) throws Exception {
        if (stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        // If this node is not the leader
        if (!isLeader()) {
            ObjectNode params = JacksonUtils.createEmptyJsonNode();
            params.put("key", key);
            params.replace("value", JacksonUtils.transferToJsonNode(value));
            Map<String, String> parameters = new HashMap<>(1);
            parameters.put("key", key);
            
            final RaftPeer leader = getLeader();
            // Build a POST request and forward this registration to the leader
            raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
            return;
        }
        
        OPERATE_LOCK.lock();
        try {
            final long start = System.currentTimeMillis();
            final Datum datum = new Datum();
            datum.key = key;
            datum.value = value;
            if (getDatum(key) == null) {
                datum.timestamp.set(1L);
            } else {
                datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
            }
            
            ObjectNode json = JacksonUtils.createEmptyJsonNode();
            json.replace("datum", JacksonUtils.transferToJsonNode(datum));
            json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
            // Write the instance data to memory and to the on-disk file
            onPublish(datum, peers.local());
            
            final String content = json.toString();
            // Build a CountDownLatch sized to a majority: (number of nodes / 2) + 1
            final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
            // Iterate over all nodes, including this one
            for (final String server : peers.allServersIncludeMyself()) {
                // The leader counts itself down and moves on
                if (isLeader(server)) {
                    latch.countDown();
                    continue;
                }
                // Other nodes are called asynchronously at /raft/datum/commit
                final String url = buildUrl(server, API_ON_PUB);
                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                    @Override
                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            Loggers.RAFT
                                    .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                            datum.key, server, result.getCode());
                            return;
                        }
                        // Call succeeded: count down the latch
                        latch.countDown();
                    }
                    
                    @Override
                    public void onError(Throwable throwable) {
                        Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                    }
                    
                    @Override
                    public void onCancel() {
                    
                    }
                });
                
            }
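            // Throw if a majority has not acknowledged within the timeout (5 s).
            // Note that onPublish above has already changed the local data, so the exception does not undo it.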
            if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                // only majority servers return success can we consider this update success
                Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
                throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
            }
            
            long end = System.currentTimeMillis();
            Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
        } finally {
            OPERATE_LOCK.unlock();
        }
    }
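
The majority wait in signalPublish can be isolated into a small, runnable sketch. It assumes simulated follower replies (a list of booleans) in place of real HTTP callbacks; `MajorityLatchSketch` and `publish` are hypothetical names, not part of Nacos.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class MajorityLatchSketch {
    /** Returns true if a majority (leader included) acknowledged within timeoutMs. */
    static boolean publish(List<Boolean> followerReplies, long timeoutMs) {
        int clusterSize = followerReplies.size() + 1; // followers plus the leader
        CountDownLatch latch = new CountDownLatch(clusterSize / 2 + 1);
        latch.countDown(); // the leader counts itself
        for (boolean ok : followerReplies) {
            // Each simulated reply arrives asynchronously; only successes count down.
            CompletableFuture.runAsync(() -> {
                if (ok) {
                    latch.countDown();
                }
            });
        }
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("one of two followers ok: " + publish(Arrays.asList(true, false), 1000));
        System.out.println("no follower ok: " + publish(Arrays.asList(false, false), 200));
    }
}
```

Failed replies never count the latch down, so a publish that cannot reach a majority is detected only when the timeout expires.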

The logic that updates the registered instance data:

    public void onPublish(Datum datum, RaftPeer source) throws Exception {
        if (stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        RaftPeer local = peers.local();
        if (datum.value == null) {
            Loggers.RAFT.warn("received empty datum");
            throw new IllegalStateException("received empty datum");
        }
        
        if (!peers.isLeader(source.ip)) {
            Loggers.RAFT
                    .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
                            JacksonUtils.toJson(getLeader()));
            throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
        }
        
        if (source.term.get() < local.term.get()) {
            Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
                    JacksonUtils.toJson(local));
            throw new IllegalStateException(
                    "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
        }
        
        local.resetLeaderDue();
        
        // if data should be persisted, usually this is true:
        if (KeyBuilder.matchPersistentKey(datum.key)) {
            // Write the instance data to disk under NACOS_HOME/data/naming/data
            raftStore.write(datum);
        }
        
        datums.put(datum.key, datum);
        
        if (isLeader()) {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        } else {
            if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
                //set leader term:
                getLeader().term.set(source.term.get());
                local.term.set(getLeader().term.get());
            } else {
                local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
            }
        }
        raftStore.updateTerm(local.term.get());
        // Publish a ValueChangeEvent
        NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
        Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
    }
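
The term bookkeeping at the end of onPublish is easy to misread, so here it is as a pure function. This is a sketch: `TermSketch` and `nextTerm` are hypothetical names, and the increment of 100 is an assumption taken from the literal used in the heartbeat listing later in this article.

```java
final class TermSketch {
    // Stands in for PUBLISH_TERM_INCREASE_COUNT; the value 100 is an assumption.
    static final long INCREASE = 100;

    /** Computes the local term after a publish, following the branches in onPublish. */
    static long nextTerm(boolean isLeader, long localTerm, long sourceTerm) {
        if (isLeader) {
            return localTerm + INCREASE;
        }
        // A follower adopts the leader's term when its own bumped term would overtake it.
        return (localTerm + INCREASE > sourceTerm) ? sourceTerm : localTerm + INCREASE;
    }

    public static void main(String[] args) {
        System.out.println("leader: " + nextTerm(true, 5, 5));
        System.out.println("follower close to leader: " + nextTerm(false, 50, 105));
        System.out.println("follower far behind: " + nextTerm(false, 0, 500));
    }
}
```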

com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier#onEvent receives the event and calls the notify method:

    @Override
    public void onEvent(ValueChangeEvent event) {
        notify(event.getKey(), event.getAction(), find.apply(event.getKey()));
    }

In the end, listener.onChange(key, value) is called to refresh the in-memory registration data.

That covers service registration in CP mode. Next comes leader election.

com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore#init:

    @PostConstruct
    public void init() throws Exception {
        Loggers.RAFT.info("initializing Raft sub-system");
        final long start = System.currentTimeMillis();
        // Load the persisted data from disk
        raftStore.loadDatums(notifier, datums);
        
        setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
        
        Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
        
        initialized = true;
        
        Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
        // Register the leader election task on the scheduled thread pool, every 500 ms
        masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
        // Heartbeat task, every 500 ms
        heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
        
        versionJudgement.registerObserver(isAllNewVersion -> {
            stopWork = isAllNewVersion;
            if (stopWork) {
                try {
                    shutdown();
                    raftListener.removeOldRaftMetadata();
                } catch (NacosException e) {
                    throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
                }
            }
        }, 100);
        
        NotifyCenter.registerSubscriber(notifier);
        
        Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
                GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
    }

The leader election task:

        @Override
        public void run() {
            try {
                if (stopWork) {
                    return;
                }
                if (!peers.isReady()) {
                    return;
                }
                // Count down before electing: return immediately while leaderDueMs is still greater than 0
                RaftPeer local = peers.local();
                local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                
                if (local.leaderDueMs > 0) {
                    return;
                }
                
                // Reset the election timeout and the heartbeat timeout
                local.resetLeaderDue();
                local.resetHeartbeatDue();
                // Send the vote requests
                sendVote();
            } catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while master election {}", e);
            }
            
        }
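
The countdown in this task can be simulated without a scheduler. The sketch below assumes a 500 ms tick (as stated in the comments above) and, as labeled assumptions, a 15 s base timeout plus up to 5 s of random jitter; `ElectionTimerSketch` and its members are hypothetical names.

```java
import java.util.concurrent.ThreadLocalRandom;

final class ElectionTimerSketch {
    static final long TICK_PERIOD_MS = 500;       // tick length, per the comments in the listing
    static final long LEADER_TIMEOUT_MS = 15_000; // assumption for illustration
    static final long RANDOM_MS = 5_000;          // assumption for illustration

    long leaderDueMs;
    int electionsStarted;

    ElectionTimerSketch(long initialDueMs) {
        this.leaderDueMs = initialDueMs;
    }

    /** Resets the countdown to the base timeout plus random jitter. */
    void resetLeaderDue() {
        leaderDueMs = LEADER_TIMEOUT_MS + ThreadLocalRandom.current().nextLong(0, RANDOM_MS);
    }

    /** One scheduler tick: count down, and start an election only when the countdown expires. */
    void tick() {
        leaderDueMs -= TICK_PERIOD_MS;
        if (leaderDueMs > 0) {
            return;
        }
        resetLeaderDue();
        electionsStarted++; // stands in for sendVote()
    }

    public static void main(String[] args) {
        ElectionTimerSketch timer = new ElectionTimerSketch(1_000);
        timer.tick();
        timer.tick();
        System.out.println("elections started after two ticks: " + timer.electionsStarted);
    }
}
```

Because a valid heartbeat or publish from the leader also resets leaderDueMs, a healthy follower never reaches zero and therefore never campaigns.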

The logic that sends the vote requests:

        private void sendVote() {
            
            RaftPeer local = peers.get(NetUtils.localServer());
            Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
                    local.term);
            
            peers.reset();
            
            local.term.incrementAndGet(); // term + 1
            local.voteFor = local.ip; // vote for itself
            local.state = RaftPeer.State.CANDIDATE; // become a candidate
            
            Map<String, String> params = new HashMap<>(1);
            params.put("vote", JacksonUtils.toJson(local));
            // Iterate over every node except this one
            for (final String server : peers.allServersWithoutMySelf()) {
                // Call /raft/vote asynchronously
                final String url = buildUrl(server, API_VOTE);
                try {
                    HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {
                        @Override
                        public void onReceive(RestResult<String> result) {
                            if (!result.ok()) {
                                Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);
                                return;
                            }
                            
                            RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);
                            
                            Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
                            // Parse the peer's reply and decide the leader:
                            // if the approvals received plus this node's own vote exceed half, this node becomes leader
                            peers.decideLeader(peer);
                            
                        }
                        
                        @Override
                        public void onError(Throwable throwable) {
                            Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);
                        }
                        
                        @Override
                        public void onCancel() {
                        
                        }
                    });
                } catch (Exception e) {
                    Loggers.RAFT.warn("error while sending vote to server: {}", server);
                }
            }
        }
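
The listing does not show decideLeader itself. As a rough illustration of the tally the comments describe, the following sketch counts each peer's voteFor value and declares a leader only at a majority. It is an assumption about the general shape of such a tally, not the Nacos implementation; `VoteTallySketch` is a hypothetical name and the IP addresses are made up.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class VoteTallySketch {
    /** Returns the IP holding a majority of votes, or null if nobody has one yet. */
    static String decideLeader(List<String> voteForByPeer, int clusterSize) {
        Map<String, Integer> counts = new HashMap<>();
        String best = null;
        int bestCount = 0;
        for (String votedFor : voteForByPeer) {
            if (votedFor == null || votedFor.isEmpty()) {
                continue; // this peer has not voted yet
            }
            int count = counts.merge(votedFor, 1, Integer::sum);
            if (count > bestCount) {
                best = votedFor;
                bestCount = count;
            }
        }
        return bestCount >= clusterSize / 2 + 1 ? best : null;
    }

    public static void main(String[] args) {
        List<String> votes = Arrays.asList("10.0.0.1", "10.0.0.1", "10.0.0.2");
        System.out.println("leader: " + decideLeader(votes, 3));
    }
}
```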

How a node handles an incoming vote request:

    public synchronized RaftPeer receivedVote(RaftPeer remote) {
        if (stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        if (!peers.contains(remote)) {
            throw new IllegalStateException("can not find peer: " + remote.ip);
        }
        // Get the local peer
        RaftPeer local = peers.get(NetUtils.localServer());
        // If the candidate's term is less than or equal to the local term, reject the vote
        if (remote.term.get() <= local.term.get()) {
            String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;
            
            Loggers.RAFT.info(msg);
            // If no vote has been cast yet, vote for itself (this node considers itself the better leader)
            if (StringUtils.isEmpty(local.voteFor)) {
                local.voteFor = local.ip;
            }
            
            return local;
        }
        // Reset the election timeout and set voteFor to the candidate's IP, i.e. grant the vote
        local.resetLeaderDue();
        
        local.state = RaftPeer.State.FOLLOWER;
        local.voteFor = remote.ip;
        local.term.set(remote.term.get());
        
        Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
        
        return local;
    }
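
Stripped of logging and peer lookup, the vote decision reduces to a term comparison. The sketch below mirrors the branches of receivedVote with a hypothetical `Peer` class; it leaves out the state change and the timeout reset, and the IP addresses are made up.

```java
final class ReceivedVoteSketch {
    static final class Peer {
        final String ip;
        long term;
        String voteFor;

        Peer(String ip, long term) {
            this.ip = ip;
            this.term = term;
        }
    }

    /** Returns true if the local node grants its vote to the candidate. */
    static boolean grant(Peer local, Peer candidate) {
        if (candidate.term <= local.term) {
            // Stale or equal term: reject, and vote for itself if it has not voted yet.
            if (local.voteFor == null || local.voteFor.isEmpty()) {
                local.voteFor = local.ip;
            }
            return false;
        }
        // Newer term: follow the candidate and adopt its term.
        local.voteFor = candidate.ip;
        local.term = candidate.term;
        return true;
    }

    public static void main(String[] args) {
        Peer local = new Peer("10.0.0.1", 3);
        Peer candidate = new Peer("10.0.0.2", 4);
        System.out.println("granted: " + grant(local, candidate) + ", voteFor=" + local.voteFor);
    }
}
```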

Next, the heartbeat logic:

        @Override
        public void run() {
            try {
                if (stopWork) {
                    return;
                }
                if (!peers.isReady()) {
                    return;
                }
                
                RaftPeer local = peers.local();
                local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                if (local.heartbeatDueMs > 0) {
                    return;
                }
                
                local.resetHeartbeatDue();
                
                sendBeat();
            } catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
            }
            
        }

The start mirrors the election task: count down, then reset the timer. The part worth reading is sendBeat:

        private void sendBeat() throws IOException, InterruptedException {
            RaftPeer local = peers.local();
            // Only the leader sends heartbeats; return immediately in standalone mode or if this node is not the leader
            if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
                return;
            }
            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
            }
            
            local.resetLeaderDue();
            
            // build data
            ObjectNode packet = JacksonUtils.createEmptyJsonNode();
            packet.replace("peer", JacksonUtils.transferToJsonNode(local));
            
            ArrayNode array = JacksonUtils.createEmptyArrayNode();
            
            if (switchDomain.isSendBeatOnly()) {
                Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());
            }
            
            if (!switchDomain.isSendBeatOnly()) {
                // Read the registration data from memory, wrap each key and timestamp in an element, and add it to the array
                for (Datum datum : datums.values()) {
                    
                    ObjectNode element = JacksonUtils.createEmptyJsonNode();
                    
                    if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                        element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
                    } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                        element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
                    }
                    element.put("timestamp", datum.timestamp.get());
                    
                    array.add(element);
                }
            }
            // Assemble the request parameters
            packet.replace("datums", array);
            // broadcast
            Map<String, String> params = new HashMap<String, String>(1);
            params.put("beat", JacksonUtils.toJson(packet));
            
            String content = JacksonUtils.toJson(params);
            
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            GZIPOutputStream gzip = new GZIPOutputStream(out);
            gzip.write(content.getBytes(StandardCharsets.UTF_8));
            gzip.close();
            
            byte[] compressedBytes = out.toByteArray();
            String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
            
            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(),
                        compressedContent.length());
            }
            // Iterate over every node except this one and call /raft/beat asynchronously to send the heartbeat
            for (final String server : peers.allServersWithoutMySelf()) {
                try {
                    final String url = buildUrl(server, API_BEAT);
                    if (Loggers.RAFT.isDebugEnabled()) {
                        Loggers.RAFT.debug("send beat to server " + server);
                    }
                    HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {
                        @Override
                        public void onReceive(RestResult<String> result) {
                            if (!result.ok()) {
                                Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);
                                MetricsMonitor.getLeaderSendBeatFailedException().increment();
                                return;
                            }
                            
                            peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
                            if (Loggers.RAFT.isDebugEnabled()) {
                                Loggers.RAFT.debug("receive beat response from: {}", url);
                            }
                        }
                        
                        @Override
                        public void onError(Throwable throwable) {
                            Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server,
                                    throwable);
                            MetricsMonitor.getLeaderSendBeatFailedException().increment();
                        }
                        
                        @Override
                        public void onCancel() {
                        
                        }
                    });
                } catch (Exception e) {
                    Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
                    MetricsMonitor.getLeaderSendBeatFailedException().increment();
                }
            }
            
        }
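
The heartbeat body is gzip-compressed before it is sent and decompressed on the receiving side. The round trip can be sketched with the JDK alone; `BeatGzipSketch` is a hypothetical helper, not the Nacos utility, and the sample payload is made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class BeatGzipSketch {
    /** Compresses a UTF-8 string with gzip. */
    static byte[] compress(String content) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Closing the gzip stream writes the trailer, so read the bytes only afterwards.
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    /** Decompresses gzip bytes back into a UTF-8 string. */
    static String decompress(byte[] compressed) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = gzip.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String beat = "{\"beat\":\"{\\\"datums\\\":[]}\"}"; // made-up payload
        byte[] compressed = compress(beat);
        System.out.println("round trip ok: " + beat.equals(decompress(compressed)));
    }
}
```

The compressed payload should travel as raw bytes, as the listing does with compressedBytes. Decoding gzip output as a UTF-8 string is lossy; in the listing, compressedContent is only used to log a size.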

How the server receives a heartbeat request:

com.alibaba.nacos.naming.controllers.RaftController#beat

    @PostMapping("/beat")
    public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
        if (versionJudgement.allMemberIsNewVersion()) {
            throw new IllegalStateException("old raft protocol already stop");
        }
        String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);
        String value = URLDecoder.decode(entity, "UTF-8");
        value = URLDecoder.decode(value, "UTF-8");
        // Parse the request body
        JsonNode json = JacksonUtils.toObj(value);
        // Handle the heartbeat
        RaftPeer peer = raftCore.receivedBeat(JacksonUtils.toObj(json.get("beat").asText()));
        
        return JacksonUtils.transferToJsonNode(peer);
    }

The heartbeat handling logic (excerpts from receivedBeat):

        // Throw if the heartbeat does not come from a leader
        if (remote.state != RaftPeer.State.LEADER) {
            Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state,
                    JacksonUtils.toJson(remote));
            throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
        }

        // If this node is not a FOLLOWER when the heartbeat arrives, make it one (a leader exists, so a campaigning node has no reason to keep voting)
        if (local.state != RaftPeer.State.FOLLOWER) {
            
            Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));
            // mk follower
            local.state = RaftPeer.State.FOLLOWER;
            local.voteFor = remote.ip;
        }
            // Build receivedKeysMap from the keys held in local memory, each with value 0 (0 marks local data)
            Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
            
            for (Map.Entry<String, Datum> entry : datums.entrySet()) {
                receivedKeysMap.put(entry.getKey(), 0);
            }

            // Iterate over the datums carried in the heartbeat
            for (Object object : beatDatums) {
                processedCount = processedCount + 1;
                
                JsonNode entry = (JsonNode) object;
                String key = entry.get("key").asText();
                final String datumKey;
                
                if (KeyBuilder.matchServiceMetaKey(key)) {
                    datumKey = KeyBuilder.detailServiceMetaKey(key);
                } else if (KeyBuilder.matchInstanceListKey(key)) {
                    datumKey = KeyBuilder.detailInstanceListkey(key);
                } else {
                    // ignore corrupted key:
                    continue;
                }
                
                long timestamp = entry.get("timestamp").asLong();
                // Mark the key as 1 in receivedKeysMap, meaning the leader sent it; keys left at 0 are deleted further down
                receivedKeysMap.put(datumKey, 1);
                
                try {
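                    // Skip if the key already exists locally with a timestamp at least as new as the leader's (unless this is the last entry, so the final batch is still flushed)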
                    if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp
                            && processedCount < beatDatums.size()) {
                        continue;
                    }
                    // If the key is missing locally or the local timestamp is older, add it to the batch
                    if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                        batch.add(datumKey);
                    }
                    // Keep accumulating while the batch holds fewer than 50 keys and more entries remain (so keys are pulled in batches)
                    if (batch.size() < 50 && processedCount < beatDatums.size()) {
                        continue;
                    }
                    
                    String keys = StringUtils.join(batch, ",");
                    
                    if (batch.size() <= 0) {
                        continue;
                    }
                    
                 
                    // update datum entry
                    // Call /raft/datum on the leader with the batched keys to pull the data
                    String url = buildUrl(remote.ip, API_GET);
                    Map<String, String> queryParam = new HashMap<>(1);
                    queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
                    HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {
                        @Override
                        public void onReceive(RestResult<String> result) {
                            if (!result.ok()) {
                                return;
                            }
                            
                            List<JsonNode> datumList = JacksonUtils
                                    .toObj(result.getData(), new TypeReference<List<JsonNode>>() {
                                    });
                            // Iterate over the result, write each datum to the local file with raftStore.write,
                            // and call notifier.notify to update the in-memory data
                            for (JsonNode datumJson : datumList) {
                                Datum newDatum = null;
                                OPERATE_LOCK.lock();
                                try {
                                    
                                    Datum oldDatum = getDatum(datumJson.get("key").asText());
                                    
                                    if (oldDatum != null && datumJson.get("timestamp").asLong() <= oldDatum.timestamp
                                            .get()) {
                                        continue;
                                    }
                                    
                                    if (KeyBuilder.matchServiceMetaKey(datumJson.get("key").asText())) {
                                        Datum<Service> serviceDatum = new Datum<>();
                                        serviceDatum.key = datumJson.get("key").asText();
                                        serviceDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                        serviceDatum.value = JacksonUtils
                                                .toObj(datumJson.get("value").toString(), Service.class);
                                        newDatum = serviceDatum;
                                    }
                                    
                                    if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) {
                                        Datum<Instances> instancesDatum = new Datum<>();
                                        instancesDatum.key = datumJson.get("key").asText();
                                        instancesDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                        instancesDatum.value = JacksonUtils
                                                .toObj(datumJson.get("value").toString(), Instances.class);
                                        newDatum = instancesDatum;
                                    }
                                    
                                    if (newDatum == null || newDatum.value == null) {
                                        Loggers.RAFT.error("receive null datum: {}", datumJson);
                                        continue;
                                    }
                                    
                                    raftStore.write(newDatum);
                                    
                                    datums.put(newDatum.key, newDatum);
                                    notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value);
                                    
                                    local.resetLeaderDue();
                                    
                                    if (local.term.get() + 100 > remote.term.get()) {
                                        getLeader().term.set(remote.term.get());
                                        local.term.set(getLeader().term.get());
                                    } else {
                                        local.term.addAndGet(100);
                                    }
                                    
                                    raftStore.updateTerm(local.term.get());
                                    
                                    Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
                                            newDatum.key, newDatum.timestamp, JacksonUtils.toJson(remote), local.term);
                                    
                                } catch (Throwable e) {
                                    Loggers.RAFT
                                            .error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum,
                                                    e);
                                } finally {
                                    OPERATE_LOCK.unlock();
                                }
                            }
                            try {
                                TimeUnit.MILLISECONDS.sleep(200);
                            } catch (InterruptedException e) {
                                Loggers.RAFT.error("[RAFT-BEAT] Interrupted error ", e);
                            }
                            return;
                        }
                        
                        @Override
                        public void onError(Throwable throwable) {
                            Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader", throwable);
                        }
                        
                        @Override
                        public void onCancel() {
                        
                        }
                        
                    });
                    
                    batch.clear();
                    
                } catch (Exception e) {
                    Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
                }
                
            }
            // Collect the keys still at 0 in receivedKeysMap into deadKeys
            List<String> deadKeys = new ArrayList<>();
            for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
                if (entry.getValue() == 0) {
                    deadKeys.add(entry.getKey());
                }
            }
            // Remove the deleted entries from memory and from disk
            for (String deadKey : deadKeys) {
                try {
                    deleteDatum(deadKey);
                } catch (Exception e) {
                    Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
                }
            }
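
The follower's reconciliation boils down to two set computations: which keys to pull from the leader, and which local keys to delete. The sketch below models both with plain maps of key to timestamp; it ignores batching, HTTP, and locking, and `BeatSyncSketch` with its sample keys is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BeatSyncSketch {
    /** Keys to pull: missing locally, or held locally with an older timestamp. */
    static List<String> keysToPull(Map<String, Long> local, Map<String, Long> fromLeader) {
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Long> entry : fromLeader.entrySet()) {
            Long localTimestamp = local.get(entry.getKey());
            if (localTimestamp == null || localTimestamp < entry.getValue()) {
                batch.add(entry.getKey());
            }
        }
        return batch;
    }

    /** Dead keys: present locally but absent from the leader's heartbeat. */
    static List<String> deadKeys(Map<String, Long> local, Map<String, Long> fromLeader) {
        List<String> dead = new ArrayList<>();
        for (String key : local.keySet()) {
            if (!fromLeader.containsKey(key)) {
                dead.add(key);
            }
        }
        return dead;
    }

    public static void main(String[] args) {
        Map<String, Long> local = new LinkedHashMap<>();
        local.put("service-a", 1L);
        local.put("service-b", 5L);
        local.put("service-c", 2L);
        Map<String, Long> leader = new LinkedHashMap<>();
        leader.put("service-a", 2L);
        leader.put("service-b", 5L);
        leader.put("service-d", 1L);
        System.out.println("pull: " + keysToPull(local, leader));
        System.out.println("delete: " + deadKeys(local, leader));
    }
}
```

In this example service-a is stale and service-d is missing, so both are pulled, while service-c no longer exists on the leader and is deleted locally.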

