您好,登錄后才能下訂單哦!
這篇文章主要講解了“nacos中RaftPeerSet的原理和作用是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“nacos中RaftPeerSet的原理和作用是什么”吧!
本文主要研究一下nacos的RaftPeerSet
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java
@Component @DependsOn("serverListManager") public class RaftPeerSet implements ServerChangeListener, ApplicationContextAware { @Autowired private ServerListManager serverListManager; private ApplicationContext applicationContext; private AtomicLong localTerm = new AtomicLong(0L); private RaftPeer leader = null; private Map<String, RaftPeer> peers = new HashMap<>(); private Set<String> sites = new HashSet<>(); private boolean ready = false; public RaftPeerSet() { } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @PostConstruct public void init() { serverListManager.listen(this); } public RaftPeer getLeader() { if (STANDALONE_MODE) { return local(); } return leader; } public Set<String> allSites() { return sites; } public boolean isReady() { return ready; } public void remove(List<String> servers) { for (String server : servers) { peers.remove(server); } } public RaftPeer update(RaftPeer peer) { peers.put(peer.ip, peer); return peer; } public boolean isLeader(String ip) { if (STANDALONE_MODE) { return true; } if (leader == null) { Loggers.RAFT.warn("[IS LEADER] no leader is available now!"); return false; } return StringUtils.equals(leader.ip, ip); } public Set<String> allServersIncludeMyself() { return peers.keySet(); } public Set<String> allServersWithoutMySelf() { Set<String> servers = new HashSet<String>(peers.keySet()); // exclude myself servers.remove(local().ip); return servers; } public Collection<RaftPeer> allPeers() { return peers.values(); } public int size() { return peers.size(); } public RaftPeer decideLeader(RaftPeer candidate) { peers.put(candidate.ip, candidate); SortedBag ips = new TreeBag(); int maxApproveCount = 0; String maxApprovePeer = null; for (RaftPeer peer : peers.values()) { if (StringUtils.isEmpty(peer.voteFor)) { continue; } ips.add(peer.voteFor); if (ips.getCount(peer.voteFor) > maxApproveCount) { maxApproveCount = 
ips.getCount(peer.voteFor); maxApprovePeer = peer.voteFor; } } if (maxApproveCount >= majorityCount()) { RaftPeer peer = peers.get(maxApprovePeer); peer.state = RaftPeer.State.LEADER; if (!Objects.equals(leader, peer)) { leader = peer; applicationContext.publishEvent(new LeaderElectFinishedEvent(this, leader)); Loggers.RAFT.info("{} has become the LEADER", leader.ip); } } return leader; } public RaftPeer makeLeader(RaftPeer candidate) { if (!Objects.equals(leader, candidate)) { leader = candidate; applicationContext.publishEvent(new MakeLeaderEvent(this, leader)); Loggers.RAFT.info("{} has become the LEADER, local: {}, leader: {}", leader.ip, JSON.toJSONString(local()), JSON.toJSONString(leader)); } for (final RaftPeer peer : peers.values()) { Map<String, String> params = new HashMap<>(1); if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) { try { String url = RaftCore.buildURL(peer.ip, RaftCore.API_GET_PEER); HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler<Integer>() { @Override public Integer onCompleted(Response response) throws Exception { if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { Loggers.RAFT.error("[NACOS-RAFT] get peer failed: {}, peer: {}", response.getResponseBody(), peer.ip); peer.state = RaftPeer.State.FOLLOWER; return 1; } update(JSON.parseObject(response.getResponseBody(), RaftPeer.class)); return 0; } }); } catch (Exception e) { peer.state = RaftPeer.State.FOLLOWER; Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip); } } } return update(candidate); } public RaftPeer local() { RaftPeer peer = peers.get(NetUtils.localServer()); if (peer == null && SystemUtils.STANDALONE_MODE) { RaftPeer localPeer = new RaftPeer(); localPeer.ip = NetUtils.localServer(); localPeer.term.set(localTerm.get()); peers.put(localPeer.ip, localPeer); return localPeer; } if (peer == null) { throw new IllegalStateException("unable to find local peer: " + NetUtils.localServer() + ", 
all peers: " + Arrays.toString(peers.keySet().toArray())); } return peer; } public RaftPeer get(String server) { return peers.get(server); } public int majorityCount() { return peers.size() / 2 + 1; } public void reset() { leader = null; for (RaftPeer peer : peers.values()) { peer.voteFor = null; } } public void setTerm(long term) { localTerm.set(term); } public long getTerm() { return localTerm.get(); } public boolean contains(RaftPeer remote) { return peers.containsKey(remote.ip); } //...... }
RaftPeerSet提供了remove、update、isLeader、allServersIncludeMyself、allServersWithoutMySelf、allPeers、decideLeader、makeLeader、majorityCount、reset等方法
decideLeader方法會遍歷peers,然后使用TreeBag來統計peer.voteFor,當maxApproveCount大于等于majorityCount(),則將對應的peer的state標記為RaftPeer.State.LEADER,然后判斷leader是否變更,變更則發布LeaderElectFinishedEvent事件
makeLeader方法判斷candidate與當前leader是否一致,不一致則更新leader為candidate,發布MakeLeaderEvent事件,然后遍歷peers給非candidate且state是LEADER狀態的節點發送API_GET_PEER請求,然后更新該peer在本地的信息,如果請求失敗則更新其state為RaftPeer.State.FOLLOWER
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java
public class MasterElection implements Runnable { @Override public void run() { try { if (!peers.isReady()) { return; } RaftPeer local = peers.local(); local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS; if (local.leaderDueMs > 0) { return; } // reset timeout local.resetLeaderDue(); local.resetHeartbeatDue(); sendVote(); } catch (Exception e) { Loggers.RAFT.warn("[RAFT] error while master election {}", e); } } public void sendVote() { RaftPeer local = peers.get(NetUtils.localServer()); Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JSON.toJSONString(getLeader()), local.term); peers.reset(); local.term.incrementAndGet(); local.voteFor = local.ip; local.state = RaftPeer.State.CANDIDATE; Map<String, String> params = new HashMap<>(1); params.put("vote", JSON.toJSONString(local)); for (final String server : peers.allServersWithoutMySelf()) { final String url = buildURL(server, API_VOTE); try { HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() { @Override public Integer onCompleted(Response response) throws Exception { if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url); return 1; } RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class); Loggers.RAFT.info("received approve from peer: {}", JSON.toJSONString(peer)); peers.decideLeader(peer); return 0; } }); } catch (Exception e) { Loggers.RAFT.warn("error while sending vote to server: {}", server); } } } }
RaftCore.MasterElection的sendVote方法在請求成功時會執行peers.decideLeader(peer)方法來選舉leader
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java
@Component
public class RaftCore {

    //......

    /**
     * Processes a heartbeat received from a remote peer that claims to be leader.
     *
     * <p>Steps, in order: rebuild the remote peer from {@code beat["peer"]}; reject the beat if the
     * remote is not in LEADER state or its term is lower than the local term; step down to FOLLOWER
     * if needed; reset the local timers and record the remote as leader; then, unless
     * {@code switchDomain.isSendBeatOnly()} is set, compare the keys listed in {@code beat["datums"]}
     * with the local {@code datums}, fetch missing or older entries from the remote in batches, and
     * delete local keys that the beat did not mention.
     *
     * @param beat JSON with a {@code "peer"} object (ip, state, term, heartbeatDueMs, leaderDueMs,
     *             voteFor) and a {@code "datums"} array whose entries carry {@code "key"} and
     *             {@code "timestamp"}
     * @return the local peer
     * @throws IllegalArgumentException if the remote state is not LEADER or the remote term is
     *                                  lower than the local term
     * @throws Exception                declared by the signature; see the calls below for sources
     */
    public RaftPeer receivedBeat(JSONObject beat) throws Exception {
        final RaftPeer local = peers.local();
        // Rebuild the sender's peer state field by field from the "peer" object of the beat.
        final RaftPeer remote = new RaftPeer();
        remote.ip = beat.getJSONObject("peer").getString("ip");
        remote.state = RaftPeer.State.valueOf(beat.getJSONObject("peer").getString("state"));
        remote.term.set(beat.getJSONObject("peer").getLongValue("term"));
        remote.heartbeatDueMs = beat.getJSONObject("peer").getLongValue("heartbeatDueMs");
        remote.leaderDueMs = beat.getJSONObject("peer").getLongValue("leaderDueMs");
        remote.voteFor = beat.getJSONObject("peer").getString("voteFor");

        // Only a peer that reports itself as LEADER may send beats.
        if (remote.state != RaftPeer.State.LEADER) {
            Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}",
                remote.state, JSON.toJSONString(remote));
            throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
        }

        // A beat carrying a term lower than ours is stale.
        if (local.term.get() > remote.term.get()) {
            Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}"
                , remote.term.get(), local.term.get(), JSON.toJSONString(remote), local.leaderDueMs);
            throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get()
                + ", beat-to-term: " + local.term.get());
        }

        if (local.state != RaftPeer.State.FOLLOWER) {

            Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JSON.toJSONString(remote));
            // mk follower
            local.state = RaftPeer.State.FOLLOWER;
            local.voteFor = remote.ip;
        }

        final JSONArray beatDatums = beat.getJSONArray("datums");
        // A valid beat restarts both local countdowns and confirms the sender as leader.
        local.resetLeaderDue();
        local.resetHeartbeatDue();

        peers.makeLeader(remote);

        // Every locally held key starts at 0; keys seen in the beat are flipped to 1 below.
        // Whatever is still 0 at the end is treated as dead and deleted.
        Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());

        for (Map.Entry<String, Datum> entry : datums.entrySet()) {
            receivedKeysMap.put(entry.getKey(), 0);
        }

        // now check datums
        List<String> batch = new ArrayList<>();
        if (!switchDomain.isSendBeatOnly()) {
            int processedCount = 0;
            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
                    beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
            }
            for (Object object : beatDatums) {
                processedCount = processedCount + 1;

                JSONObject entry = (JSONObject) object;
                String key = entry.getString("key");
                final String datumKey;

                if (KeyBuilder.matchServiceMetaKey(key)) {
                    datumKey = KeyBuilder.detailServiceMetaKey(key);
                } else if (KeyBuilder.matchInstanceListKey(key)) {
                    datumKey = KeyBuilder.detailInstanceListkey(key);
                } else {
                    // ignore corrupted key:
                    continue;
                }

                long timestamp = entry.getLong("timestamp");

                receivedKeysMap.put(datumKey, 1);

                try {
                    // Local copy is at least as new and this is not the last entry: nothing to do yet.
                    // On the last entry we fall through so that a pending batch still gets flushed.
                    if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) {
                        continue;
                    }

                    // Queue the key only if it is missing locally or the local timestamp is older.
                    if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                        batch.add(datumKey);
                    }

                    // Keep accumulating until 50 keys are queued or the last entry is reached.
                    if (batch.size() < 50 && processedCount < beatDatums.size()) {
                        continue;
                    }

                    String keys = StringUtils.join(batch, ",");

                    // Reached on the last entry with nothing queued: no request needed.
                    if (batch.size() <= 0) {
                        continue;
                    }

                    Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}, datums' size is {}, RaftCore.datums' size is {}"
                        , getLeader().ip, batch.size(), processedCount, beatDatums.size(), datums.size());

                    // update datum entry
                    String url = buildURL(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");
                    HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                return 1;
                            }

                            List<JSONObject> datumList = JSON.parseObject(response.getResponseBody(), new TypeReference<List<JSONObject>>() {
                            });

                            for (JSONObject datumJson : datumList) {
                                // Each datum is applied under OPERATE_LOCK; the finally block below
                                // releases it on every path, including the continue statements.
                                OPERATE_LOCK.lock();
                                Datum newDatum = null;
                                try {

                                    Datum oldDatum = getDatum(datumJson.getString("key"));

                                    // Skip entries that are not strictly newer than the local copy.
                                    if (oldDatum != null && datumJson.getLongValue("timestamp") <= oldDatum.timestamp.get()) {
                                        Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
                                            datumJson.getString("key"), datumJson.getLongValue("timestamp"), oldDatum.timestamp);
                                        continue;
                                    }

                                    // The key pattern decides which value type the datum is parsed into.
                                    if (KeyBuilder.matchServiceMetaKey(datumJson.getString("key"))) {
                                        Datum<Service> serviceDatum = new Datum<>();
                                        serviceDatum.key = datumJson.getString("key");
                                        serviceDatum.timestamp.set(datumJson.getLongValue("timestamp"));
                                        serviceDatum.value =
                                            JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Service.class);
                                        newDatum = serviceDatum;
                                    }

                                    if (KeyBuilder.matchInstanceListKey(datumJson.getString("key"))) {
                                        Datum<Instances> instancesDatum = new Datum<>();
                                        instancesDatum.key = datumJson.getString("key");
                                        instancesDatum.timestamp.set(datumJson.getLongValue("timestamp"));
                                        instancesDatum.value =
                                            JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Instances.class);
                                        newDatum = instancesDatum;
                                    }

                                    if (newDatum == null || newDatum.value == null) {
                                        Loggers.RAFT.error("receive null datum: {}", datumJson);
                                        continue;
                                    }

                                    // Write to the store, then the in-memory map, then queue a CHANGE task.
                                    raftStore.write(newDatum);

                                    datums.put(newDatum.key, newDatum);
                                    notifier.addTask(newDatum.key, ApplyAction.CHANGE);

                                    local.resetLeaderDue();

                                    // If the remote term is less than 100 ahead of ours, adopt it for both
                                    // the leader and the local peer; otherwise advance the local term by 100.
                                    // NOTE(review): the rationale for the constant 100 is not visible here.
                                    if (local.term.get() + 100 > remote.term.get()) {
                                        getLeader().term.set(remote.term.get());
                                        local.term.set(getLeader().term.get());
                                    } else {
                                        local.term.addAndGet(100);
                                    }

                                    raftStore.updateTerm(local.term.get());

                                    Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
                                        newDatum.key, newDatum.timestamp, JSON.toJSONString(remote), local.term);

                                } catch (Throwable e) {
                                    Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e);
                                } finally {
                                    OPERATE_LOCK.unlock();
                                }
                            }
                            // Pauses 200 ms on the callback thread after each processed response.
                            TimeUnit.MILLISECONDS.sleep(200);
                            return 0;
                        }
                    });

                    // The request was issued with the joined key string, so the list can be reused.
                    batch.clear();

                } catch (Exception e) {
                    Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
                }

            }

            // Local keys that the beat never mentioned are removed.
            List<String> deadKeys = new ArrayList<>();
            for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
                if (entry.getValue() == 0) {
                    deadKeys.add(entry.getKey());
                }
            }

            for (String deadKey : deadKeys) {
                try {
                    deleteDatum(deadKey);
                } catch (Exception e) {
                    Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
                }
            }

        }

        return local;
    }

    //......
}
receivedBeat方法會調用peers.makeLeader(remote)來更新leader
RaftPeerSet提供了remove、update、isLeader、allServersIncludeMyself、allServersWithoutMySelf、allPeers、decideLeader、makeLeader、majorityCount、reset等方法
decideLeader方法會遍歷peers,然后使用TreeBag來統計peer.voteFor,當maxApproveCount大于等于majorityCount(),則將對應的peer的state標記為RaftPeer.State.LEADER,然后判斷leader是否變更,變更則發布LeaderElectFinishedEvent事件
makeLeader方法判斷candidate與當前leader是否一致,不一致則更新leader為candidate,發布MakeLeaderEvent事件,然后遍歷peers給非candidate且state是LEADER狀態的節點發送API_GET_PEER請求,然后更新該peer在本地的信息,如果請求失敗則更新其state為RaftPeer.State.FOLLOWER
感謝各位的閱讀,以上就是“nacos中RaftPeerSet的原理和作用是什么”的內容了,經過本文的學習后,相信大家對nacos中RaftPeerSet的原理和作用是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。