This article explains how FastLeaderElection, the fast leader election in ZooKeeper (zk), is implemented.
Concepts involved in the election
Server state
Vote
How is a vote chosen?
Protocol
Election
How is the election carried out?
epoch
Sender
Receiver
Send queue
Receive queue
Server state
public enum ServerState {
    // Looking for a leader: in this state the ensemble has no leader,
    // so the server has to enter leader election.
    LOOKING,
    // Follower: this server is currently a follower.
    FOLLOWING,
    // Leader: this server is currently the leader.
    LEADING,
    // Observer.
    OBSERVING
}
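Vote
Field | Meaning |
id | sid of the proposed leader |
zxid | transaction id (zxid) of the proposed leader |
electionEpoch | Tells whether several votes belong to the same election round. It is a self-incrementing sequence on each server and is increased by 1 every time a new round of voting starts |
peerEpoch | epoch of the proposed leader |
state | current state of the server |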
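To make the field list concrete, here is a minimal sketch of a value class that carries the same five fields. `VoteSketch` and its equality rule are assumptions made for illustration; this is not ZooKeeper's actual `Vote` class, whose constructors and `equals` behave differently.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for a vote; not ZooKeeper's actual Vote class.
final class VoteSketch {
    enum State { LOOKING, FOLLOWING, LEADING, OBSERVING }

    final long id;            // sid of the proposed leader
    final long zxid;          // transaction id of the proposed leader
    final long electionEpoch; // election round in which this vote was cast
    final long peerEpoch;     // epoch of the proposed leader
    final State state;        // state of the server that sent the vote

    VoteSketch(long id, long zxid, long electionEpoch, long peerEpoch, State state) {
        this.id = id;
        this.zxid = zxid;
        this.electionEpoch = electionEpoch;
        this.peerEpoch = peerEpoch;
        this.state = state;
    }

    // Assumption of this sketch: two votes are equal when all five fields match.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof VoteSketch)) {
            return false;
        }
        VoteSketch v = (VoteSketch) o;
        return id == v.id && zxid == v.zxid && electionEpoch == v.electionEpoch
                && peerEpoch == v.peerEpoch && state == v.state;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, zxid, electionEpoch, peerEpoch, state);
    }

    @Override
    public String toString() {
        return "(" + id + ", 0x" + Long.toHexString(zxid) + ", " + peerEpoch + ")";
    }

    public static void main(String[] args) {
        VoteSketch a = new VoteSketch(3L, 0x100000005L, 1L, 1L, State.LOOKING);
        VoteSketch b = new VoteSketch(3L, 0x100000005L, 1L, 1L, State.LOOKING);
        System.out.println(a + " equals " + b + ": " + a.equals(b));
    }
}
```

Keeping the fields final makes a vote an immutable snapshot, which is convenient when votes are stored in maps keyed by server id.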
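Inner classes
FastLeaderElection has the inner classes Messenger, ToSend and Notification.
A Notification lets the other peers know that a given peer has changed its vote, either because it has joined the election or because it has seen a vote with a higher zxid, or with the same zxid and a higher server id.
ToSend wraps a message that is about to be sent.
Messenger consists of
WorkerReceiver and WorkerSender.
Their main job is to serve these two queues: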
LinkedBlockingQueue<ToSend> sendqueue;
LinkedBlockingQueue<Notification> recvqueue;
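The role of the two queues can be sketched as follows: the election logic only writes to `sendqueue` and reads from `recvqueue`, while worker code moves messages between the queues and the network. The sketch below has no network, so the hypothetical `deliverOne` method simply loops an outgoing message back as an incoming notification. The `ToSend` and `Notification` classes here are reduced to two fields and are assumptions, not the real inner classes.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the send/receive queue pair; there is no real network here.
final class QueueSketch {
    // Reduced message types, assumed for illustration only.
    static final class ToSend {
        final long targetSid; // server the message is addressed to
        final long leader;    // proposed leader carried by the message

        ToSend(long targetSid, long leader) {
            this.targetSid = targetSid;
            this.leader = leader;
        }
    }

    static final class Notification {
        final long sid;    // server that sent the notification
        final long leader; // leader proposed by that server

        Notification(long sid, long leader) {
            this.sid = sid;
            this.leader = leader;
        }
    }

    final LinkedBlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<Notification> recvqueue = new LinkedBlockingQueue<>();

    // Stand-in for the worker pair: takes one outgoing message and loops it
    // back as a notification from fromSid. Returns false if nothing was queued.
    boolean deliverOne(long fromSid) {
        ToSend m = sendqueue.poll();
        if (m == null) {
            return false;
        }
        // offer() always succeeds on an unbounded LinkedBlockingQueue.
        return recvqueue.offer(new Notification(fromSid, m.leader));
    }

    public static void main(String[] args) {
        QueueSketch q = new QueueSketch();
        q.sendqueue.offer(new ToSend(2L, 3L)); // election logic proposes leader 3
        q.deliverOne(1L);                      // worker moves the message
        Notification n = q.recvqueue.poll();   // election logic reads the result
        System.out.println("from sid " + n.sid + ", proposed leader " + n.leader);
    }
}
```

Decoupling the election loop from I/O through blocking queues is what lets the loop wait on `recvqueue.poll(timeout, unit)` without touching sockets.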
    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }

        self.start_fle = Time.currentElapsedTime();
        try {
            Map<Long, Vote> recvset = new HashMap<Long, Vote>();
            Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = minNotificationInterval;

            synchronized (this) {
                logicalclock.incrementAndGet();
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id = " + self.getId()
                    + ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            sendNotifications();

            SyncedLearnerTracker voteSet;

            /*
             * Loop in which we exchange notifications until we find a leader
             */
            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                if (n == null) {
                    if (manager.haveDelivered()) {
                        sendNotifications();
                    } else {
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    int tmpTimeOut = notTimeout * 2;
                    notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                } else if (validVoter(n.sid) && validVoter(n.leader)) {
                    /*
                     * Only proceed if the vote comes from a replica in the current or next
                     * voting view for a replica in the current or next voting view.
                     */
                    switch (n.state) {
                    case LOOKING:
                        if (getInitLastLoggedZxid() == -1) {
                            LOG.debug("Ignoring notification as our zxid is -1");
                            break;
                        }
                        if (n.zxid == -1) {
                            LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                            break;
                        }
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock.get()) {
                            logicalclock.set(n.electionEpoch);
                            recvset.clear();
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                            }
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(
                                    "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                        + Long.toHexString(n.electionEpoch)
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                            }
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Adding vote: from=" + n.sid
                                    + ", proposed leader=" + n.leader
                                    + ", proposed zxid=0x" + Long.toHexString(n.zxid)
                                    + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }

                        // don't care about the version if it's in LOOKING state
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        voteSet = getVoteTracker(recvset,
                                new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                        if (voteSet.hasAllQuorums()) {

                            // Verify if there is any change in the proposed leader
                            while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)) {
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {
                                setPeerState(proposedLeader, voteSet);
                                Vote endVote = new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: {}", n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        if (n.electionEpoch == logicalclock.get()) {
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                            voteSet = getVoteTracker(recvset,
                                    new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                                setPeerState(n.leader, voteSet);
                                Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify that
                         * a majority are following the same leader.
                         */
                        outofelection.put(n.sid,
                                new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        voteSet = getVoteTracker(outofelection,
                                new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                        if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            synchronized (this) {
                                logicalclock.set(n.electionEpoch);
                                setPeerState(n.leader, voteSet);
                            }
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecoginized: " + n.state + " (n.state), " + n.sid + " (n.sid)");
                        break;
                    }
                } else {
                    if (!validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            try {
                if (self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
        }
    }

Vote-related functions

Updating the vote fields

    synchronized void updateProposal(long leader, long zxid, long epoch) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Updating proposal: " + leader + " (newleader), 0x"
                    + Long.toHexString(zxid) + " (newzxid), " + proposedLeader
                    + " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)");
        }
        proposedLeader = leader;
        proposedZxid = zxid;
        proposedEpoch = epoch;
    }

Building the current vote

    public synchronized Vote getVote() {
        return new Vote(proposedLeader, proposedZxid, proposedEpoch);
    }

Getting the state to move to after the election

    private ServerState learningState() {
        if (self.getLearnerType() == LearnerType.PARTICIPANT) {
            LOG.debug("I am a participant: {}", self.getId());
            return ServerState.FOLLOWING;
        } else {
            LOG.debug("I am an observer: {}", self.getId());
            return ServerState.OBSERVING;
        }
    }

Getting the id of this server if it takes part in voting

    private long getInitId() {
        if (self.getQuorumVerifier().getVotingMembers().containsKey(self.getId())) {
            return self.getId();
        } else {
            return Long.MIN_VALUE;
        }
    }

Getting the latest logged transaction id

    private long getInitLastLoggedZxid() {
        if (self.getLearnerType() == LearnerType.PARTICIPANT) {
            return self.getLastLoggedZxid();
        } else {
            return Long.MIN_VALUE;
        }
    }

Getting the current epoch stored on disk (this method is defined in QuorumPeer)

    public long getCurrentEpoch() throws IOException {
        if (currentEpoch == -1) {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        }
        return currentEpoch;
    }

Election-related functions

Checking whether a new vote (server id, zxid, epoch) beats the current vote. In short, the vote with the higher epoch wins; if the epochs are equal, the higher zxid wins; if those are equal too, the higher server id wins. A new vote for a server with weight 0 never wins.

    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                          long curId, long curZxid, long curEpoch) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("id: " + newId + ", proposed id: " + curId
                    + ", zxid: 0x" + Long.toHexString(newZxid)
                    + ", proposed zxid: 0x" + Long.toHexString(curZxid));
        }
        if (self.getQuorumVerifier().getWeight(newId) == 0) {
            return false;
        }

        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */
        return ((newEpoch > curEpoch)
                || ((newEpoch == curEpoch)
                    && ((newZxid > curZxid)
                        || ((newZxid == curZxid) && (newId > curId)))));
    }

Checking whether the proposed server really is the leader; the cases in which this server is not the leader are handled separately.

    protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch) {

        boolean predicate = true;

        /*
         * If everyone else thinks I'm the leader, I must be the leader.
         * The other two checks are just for the case in which I'm not the
         * leader. If I'm not the leader and I haven't received a message
         * from leader stating that it is leading, then predicate is false.
         */
        if (leader != self.getId()) {
            if (votes.get(leader) == null) {
                predicate = false;
            } else if (votes.get(leader).getState() != ServerState.LEADING) {
                predicate = false;
            }
        } else if (logicalclock.get() != electionEpoch) {
            predicate = false;
        }

        return predicate;
    }
Starting a new election round
public Vote lookForLeader() throws InterruptedException
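Inside lookForLeader, whenever no notification arrives before the timeout, the wait is doubled up to a maximum ("Exponential backoff" in the source comments). The sketch below isolates that rule. `BackoffSketch` and `nextTimeout` are hypothetical names, and the values 200 ms and 60000 ms are used only as illustrative inputs, not as a statement about any particular ZooKeeper version.

```java
// Hypothetical sketch of the capped doubling used for the notification timeout.
final class BackoffSketch {
    // Doubles the current timeout but never returns more than max.
    static int nextTimeout(int current, int max) {
        int doubled = current * 2;
        return Math.min(doubled, max);
    }

    public static void main(String[] args) {
        int timeout = 200;       // illustrative starting interval in ms (assumption)
        final int max = 60000;   // illustrative cap in ms (assumption)
        for (int i = 0; i < 10; i++) {
            System.out.println("round " + i + ": wait " + timeout + " ms");
            timeout = nextTimeout(timeout, max);
        }
    }
}
```

The cap keeps an isolated server from waiting ever longer between retries, while the doubling avoids flooding the ensemble with notifications.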
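Data structures involved in the election
Difference between electionEpoch and peerEpoch
electionEpoch is the election round. It is used to tell whether two votes belong to the same election round and is counted up from 0.
peerEpoch is the current epoch of the proposed leader.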
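How electionEpoch is used when a notification arrives from a server in the LOOKING state can be sketched as a three-way decision against the local logical clock. This mirrors the branches of lookForLeader, but `ElectionEpochSketch`, `Action` and `classify` are hypothetical names introduced for illustration.

```java
// Hypothetical sketch of the three-way comparison on electionEpoch.
final class ElectionEpochSketch {
    enum Action {
        JOIN_NEWER_ROUND, // sender is in a later round: adopt its round, drop collected votes
        IGNORE_STALE,     // sender is in an earlier round: ignore the notification
        COMPARE_VOTES     // same round: compare the proposed vote with our own
    }

    static Action classify(long notificationEpoch, long logicalClock) {
        if (notificationEpoch > logicalClock) {
            return Action.JOIN_NEWER_ROUND;
        } else if (notificationEpoch < logicalClock) {
            return Action.IGNORE_STALE;
        } else {
            return Action.COMPARE_VOTES;
        }
    }

    public static void main(String[] args) {
        long logicalClock = 5L; // local election round (illustrative value)
        for (long epoch = 4L; epoch <= 6L; epoch++) {
            System.out.println("notification round " + epoch + " -> " + classify(epoch, logicalClock));
        }
    }
}
```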
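Rules for comparing two votes
peerEpoch, zxid and sid are compared in that order.
peerEpoch is the epoch the server is in; the larger it is, the newer the vote.
When peerEpoch is equal, zxid identifies the transaction reached within that epoch; the larger it is, the newer the vote.
When both peerEpoch and zxid are equal, the vote with the larger sid wins.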
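The ordering above can be tried out with a small standalone sketch. It reproduces only the three-level comparison and leaves out the weight check that the real totalOrderPredicate performs first; `VoteOrderSketch` and `newVoteWins` are hypothetical names.

```java
// Hypothetical sketch of the (peerEpoch, zxid, sid) ordering; the quorum
// weight check of the real method is intentionally left out.
final class VoteOrderSketch {
    static boolean newVoteWins(long newId, long newZxid, long newEpoch,
                               long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch; // 1. higher epoch wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;   // 2. same epoch: higher zxid wins
        }
        return newId > curId;           // 3. same epoch and zxid: higher sid wins
    }

    public static void main(String[] args) {
        // Same epoch and zxid, so the larger sid (2 versus 1) decides.
        System.out.println(newVoteWins(2L, 5L, 1L, 1L, 5L, 1L));
        // Larger sid but lower zxid in the same epoch, so the new vote loses.
        System.out.println(newVoteWins(3L, 4L, 1L, 1L, 5L, 1L));
        // Higher epoch beats a higher zxid and a higher sid.
        System.out.println(newVoteWins(1L, 0L, 2L, 3L, 9L, 1L));
    }
}
```

Because an identical vote never beats itself, the comparison is a strict total order, which is what allows every server to converge on the same proposal.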