How fast leader election (FastLeaderElection) is implemented in zk

Published: 2021-06-30 15:34:19  Source: Yisu Cloud (億速云)  Views: 130  Author: chen  Category: Big Data

This article explains how fast leader election (FastLeaderElection) is implemented in zk (ZooKeeper). It first lists the concepts involved and then walks through the relevant source code.

Concepts involved in the election

Server states

Votes

How is a vote chosen?

Protocol

Election

How is the election carried out?

epoch

Sender

Receiver

Send queue

Receive queue

Server states

public enum ServerState {

    // Looking for a leader: the cluster currently has no leader, so the server must enter leader election.
    LOOKING,

    // Follower: this server is currently a follower.
    FOLLOWING,

    // Leader: this server is currently the leader.
    LEADING,

    // Observer: this server is currently an observer.
    OBSERVING

}
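As a rough illustration of how these four states can drive what a peer does next, here is a minimal sketch. The class name ServerStateSketch and the helper roleFor are hypothetical and are not part of ZooKeeper; only the enum constants come from the code above.

```java
final class ServerStateSketch {
    // Same four states as the ServerState enum shown above.
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    // Hypothetical helper: maps a state to the role the peer plays while in it.
    static String roleFor(ServerState state) {
        switch (state) {
            case LOOKING:
                return "run leader election";
            case FOLLOWING:
                return "follow the leader";
            case LEADING:
                return "lead the ensemble";
            case OBSERVING:
                return "observe the leader without voting";
            default:
                throw new IllegalArgumentException("unknown state: " + state);
        }
    }

    public static void main(String[] args) {
        for (ServerState s : ServerState.values()) {
            System.out.println(s + " -> " + roleFor(s));
        }
    }
}
```

Only LOOKING leads into the election code discussed in the rest of this article.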

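Vote

id

The sid of the proposed leader.

zxid

The transaction id (zxid) of the proposed leader.

electionEpoch

Used to decide whether several votes belong to the same election round. Each server keeps it as a self-incrementing counter and adds 1 to it on entering a new round of voting.

peerEpoch

The epoch of the proposed leader.

state

The current state of the server.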
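The five fields can be pictured as a small immutable value class. The following is only a sketch: VoteSketch and sameRound are hypothetical names, and leaving state out of equals is a simplification chosen for this example, not a statement about how the real Vote class compares votes.

```java
import java.util.Objects;

final class VoteSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    final long id;            // sid of the proposed leader
    final long zxid;          // transaction id of the proposed leader
    final long electionEpoch; // election round this vote belongs to
    final long peerEpoch;     // epoch of the proposed leader
    final ServerState state;  // state of the server that cast the vote

    VoteSketch(long id, long zxid, long electionEpoch, long peerEpoch, ServerState state) {
        this.id = id;
        this.zxid = zxid;
        this.electionEpoch = electionEpoch;
        this.peerEpoch = peerEpoch;
        this.state = state;
    }

    // Hypothetical helper: two votes are comparable only within the same round.
    boolean sameRound(VoteSketch other) {
        return electionEpoch == other.electionEpoch;
    }

    // Simplification for this sketch: the sender's state is not part of equality.
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof VoteSketch)) {
            return false;
        }
        VoteSketch v = (VoteSketch) o;
        return id == v.id && zxid == v.zxid
                && electionEpoch == v.electionEpoch && peerEpoch == v.peerEpoch;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, zxid, electionEpoch, peerEpoch);
    }
}
```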

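Inner classes

FastLeaderElection has three inner classes: Messenger, ToSend and Notification.

A Notification lets the other nodes know that a given node's vote has changed, either because that node has joined the election or because it learned of a vote with a higher zxid, or with the same zxid and a higher server id.

ToSend wraps a message that is to be sent.

Messenger is split into WorkerReceiver and WorkerSender.

Its main job is to set up these two objects, which work with the following queues: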

LinkedBlockingQueue<ToSend> sendqueue;

LinkedBlockingQueue<Notification> recvqueue;
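To make the role of the two queues concrete, here is a minimal sketch of a sender worker draining sendqueue. It assumes a loopback "network": each outgoing message is turned straight into a Notification on recvqueue. MessengerSketch, its simplified ToSend and Notification classes, and the helpers send and awaitNotification are all hypothetical; the real classes carry more fields and talk to a connection manager.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class MessengerSketch {
    // Simplified stand-ins for the real inner classes (assumption).
    static final class ToSend {
        final long targetSid;
        final long leader;
        final long zxid;
        ToSend(long targetSid, long leader, long zxid) {
            this.targetSid = targetSid;
            this.leader = leader;
            this.zxid = zxid;
        }
    }

    static final class Notification {
        final long sid;
        final long leader;
        final long zxid;
        Notification(long sid, long leader, long zxid) {
            this.sid = sid;
            this.leader = leader;
            this.zxid = zxid;
        }
    }

    final LinkedBlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<Notification> recvqueue = new LinkedBlockingQueue<>();
    private final long mySid;

    MessengerSketch(long mySid) {
        this.mySid = mySid;
    }

    // Daemon thread that blocks on sendqueue; the loopback stands in for the network.
    Thread startLoopbackSender() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    ToSend m = sendqueue.take();
                    recvqueue.put(new Notification(mySid, m.leader, m.zxid));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop when interrupted
            }
        }, "WorkerSender-sketch");
        t.setDaemon(true);
        t.start();
        return t;
    }

    // The queue is unbounded, so offer always succeeds.
    void send(ToSend m) {
        sendqueue.offer(m);
    }

    // Waits up to timeoutMillis for a notification; returns null on timeout.
    Notification awaitNotification(long timeoutMillis) {
        try {
            return recvqueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

The election code only ever touches the two queues, so it stays independent of how messages actually travel between servers.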

public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }

    self.start_fle = Time.currentElapsedTime();
    try {
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();

        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = minNotificationInterval;

        synchronized (this) {
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id =  " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        SyncedLearnerTracker voteSet;

        /*
         * Loop in which we exchange notifications until we find a leader
         */

        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if (n == null) {
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout * 2;
                notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view for a replica in the current or next voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    if (getInitLastLoggedZxid() == -1) {
                        LOG.debug("Ignoring notification as our zxid is -1");
                        break;
                    }
                    if (n.zxid == -1) {
                        LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                        break;
                    }
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(
                                "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch)
                                + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid
                                  + ", proposed leader=" + n.leader
                                  + ", proposed zxid=0x" + Long.toHexString(n.zxid)
                                  + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    // don't care about the version if it's in LOOKING state
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                    if (voteSet.hasAllQuorums()) {

                        // Verify if there is any change in the proposed leader
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: {}", n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if (n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            setPeerState(n.leader, voteSet);
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                    voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            setPeerState(n.leader, voteSet);
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecoginized: " + n.state + " (n.state), " + n.sid + " (n.sid)");
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
    }
}
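The call to voteSet.hasAllQuorums() in the loop above decides whether enough servers back the current proposal. The sketch below shows the idea for a plain majority quorum only. It is an assumption-laden simplification: QuorumTallySketch and hasMajority are hypothetical names, the received votes are reduced to "sid -> proposed leader", and weighted or hierarchical quorums are ignored.

```java
import java.util.HashMap;
import java.util.Map;

final class QuorumTallySketch {
    // recvset maps each voter's sid to the leader it currently proposes.
    // Returns true when strictly more than half of the ensemble backs `candidate`.
    static boolean hasMajority(Map<Long, Long> recvset, long candidate, int ensembleSize) {
        int supporters = 0;
        for (long votedFor : recvset.values()) {
            if (votedFor == candidate) {
                supporters++;
            }
        }
        return supporters > ensembleSize / 2;
    }

    public static void main(String[] args) {
        Map<Long, Long> recvset = new HashMap<>();
        recvset.put(1L, 3L);
        recvset.put(2L, 3L);
        System.out.println(hasMajority(recvset, 3L, 5)); // 2 of 5 is not a majority
        recvset.put(3L, 3L);
        System.out.println(hasMajority(recvset, 3L, 5)); // 3 of 5 is a majority
    }
}
```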


Vote-related functions

Updating the proposal fields

synchronized void updateProposal(long leader, long zxid, long epoch) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Updating proposal: " + leader
                  + " (newleader), 0x" + Long.toHexString(zxid)
                  + " (newzxid), " + proposedLeader
                  + " (oldleader), 0x" + Long.toHexString(proposedZxid)
                  + " (oldzxid)");
    }
    proposedLeader = leader;
    proposedZxid = zxid;
    proposedEpoch = epoch;
}

Building a vote from the current proposal
public synchronized Vote getVote() {
    return new Vote(proposedLeader, proposedZxid, proposedEpoch);
}


Getting the state to enter once the election is over
private ServerState learningState() {
    if (self.getLearnerType() == LearnerType.PARTICIPANT) {
        LOG.debug("I am a participant: {}", self.getId());
        return ServerState.FOLLOWING;
    } else {
        LOG.debug("I am an observer: {}", self.getId());
        return ServerState.OBSERVING;
    }
}


Getting the id of a server that takes part in voting
private long getInitId() {
    if (self.getQuorumVerifier().getVotingMembers().containsKey(self.getId())) {
        return self.getId();
    } else {
        return Long.MIN_VALUE;
    }
}

Getting the latest logged transaction id (zxid)
private long getInitLastLoggedZxid() {
    if (self.getLearnerType() == LearnerType.PARTICIPANT) {
        return self.getLastLoggedZxid();
    } else {
        return Long.MIN_VALUE;
    }
}



Getting the current epoch stored in a file
public long getCurrentEpoch() throws IOException {
    if (currentEpoch == -1) {
        currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
    }
    return currentEpoch;
}
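The body of readLongFromFile is not shown above. A minimal sketch of the same lazy-load pattern, assuming the file simply holds the epoch as decimal text, could look like this. EpochFileSketch and roundTrip are hypothetical names, and the on-disk format is an assumption of this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class EpochFileSketch {
    private final Path file;
    private long currentEpoch = -1; // -1 means "not loaded yet", as in getCurrentEpoch above

    EpochFileSketch(Path file) {
        this.file = file;
    }

    // Reads the file only on first use, then serves the cached value.
    long getCurrentEpoch() throws IOException {
        if (currentEpoch == -1) {
            String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            try {
                currentEpoch = Long.parseLong(text);
            } catch (NumberFormatException e) {
                throw new IOException("Found '" + text + "' in " + file, e);
            }
        }
        return currentEpoch;
    }

    // Hypothetical demo helper: writes an epoch to a temp file and reads it back.
    static long roundTrip(long epoch) {
        try {
            Path tmp = Files.createTempFile("currentEpoch", ".tmp");
            try {
                Files.write(tmp, Long.toString(epoch).getBytes(StandardCharsets.UTF_8));
                return new EpochFileSketch(tmp).getCurrentEpoch();
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```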



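Election-related functions

totalOrderPredicate checks whether a new (server id, zxid) pair beats the current vote. In short, the new vote and the current vote are compared by epoch first, then by zxid, then by server id, and the larger one wins; a candidate whose weight is 0 never wins.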

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("id: " + newId
                  + ", proposed id: " + curId
                  + ", zxid: 0x" + Long.toHexString(newZxid)
                  + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    }

    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }

    /*
     * We return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */

    return ((newEpoch > curEpoch)
            || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                    || ((newZxid == curZxid)
                        && (newId > curId)))));
}


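checkLeader checks that the elected server really is the leader; the cases where this server is not the leader itself are handled separately.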
protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch) {

    boolean predicate = true;

    /*
     * If everyone else thinks I'm the leader, I must be the leader.
     * The other two checks are just for the case in which I'm not the
     * leader. If I'm not the leader and I haven't received a message
     * from leader stating that it is leading, then predicate is false.
     */

    if (leader != self.getId()) {
        if (votes.get(leader) == null) {
            predicate = false;
        } else if (votes.get(leader).getState() != ServerState.LEADING) {
            predicate = false;
        }
    } else if (logicalclock.get() != electionEpoch) {
        predicate = false;
    }

    return predicate;
}
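The decision made by checkLeader can be exercised in isolation. The sketch below restates the same branching with everything passed in as parameters; CheckLeaderSketch, myId and myRound are hypothetical stand-ins for self.getId() and logicalclock.get(), and the vote map is reduced to "sid -> reported state".

```java
import java.util.HashMap;
import java.util.Map;

final class CheckLeaderSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    // states: what each server (by sid) last reported about itself.
    static boolean checkLeader(Map<Long, ServerState> states, long leader,
                               long electionEpoch, long myId, long myRound) {
        if (leader != myId) {
            // Someone else is proposed: accept only if that server itself says it is LEADING.
            return states.get(leader) == ServerState.LEADING;
        }
        // I am proposed: accept only if the vote is from my current election round.
        return myRound == electionEpoch;
    }

    public static void main(String[] args) {
        Map<Long, ServerState> states = new HashMap<>();
        states.put(2L, ServerState.LEADING);
        states.put(3L, ServerState.FOLLOWING);
        System.out.println(checkLeader(states, 2L, 5L, 1L, 5L)); // leader confirmed itself
        System.out.println(checkLeader(states, 3L, 5L, 1L, 5L)); // sid 3 is only following
    }
}
```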

Starting a new round of leader election

public Vote lookForLeader() throws InterruptedException
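While lookForLeader waits on recvqueue, it doubles the poll timeout each time nothing arrives, up to maxNotificationInterval. The sketch below isolates that backoff rule. NotificationBackoffSketch, nextTimeout and schedule are hypothetical names, and the values 200 and 1000 are illustrative only, not ZooKeeper's defaults.

```java
import java.util.ArrayList;
import java.util.List;

final class NotificationBackoffSketch {
    // Same rule as in lookForLeader: double the timeout, but never exceed max.
    static int nextTimeout(int current, int max) {
        int doubled = current * 2;
        return doubled < max ? doubled : max;
    }

    // Lists the timeouts used for the first `steps` empty polls, starting at min.
    static List<Integer> schedule(int min, int max, int steps) {
        List<Integer> out = new ArrayList<>();
        int t = min;
        for (int i = 0; i < steps; i++) {
            out.add(t);
            t = nextTimeout(t, max);
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints [200, 400, 800, 1000, 1000]
        System.out.println(schedule(200, 1000, 5));
    }
}
```

Capping the timeout keeps a server that is cut off from the ensemble from waiting ever longer between retries.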


Data-structure classes involved in the election


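Difference between electionEpoch and peerEpoch

electionEpoch is the election round. It is used to decide whether two votes belong to the same election round, and it counts up from 0.

peerEpoch is the current epoch of the proposed leader.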
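How a server reacts to the electionEpoch carried by an incoming notification can be sketched as a three-way comparison against its own logical clock, mirroring the LOOKING branch of lookForLeader. ElectionRoundSketch, its Action enum and the method names are hypothetical, and the sketch leaves out the vote handling that follows each branch.

```java
import java.util.concurrent.atomic.AtomicLong;

final class ElectionRoundSketch {
    enum Action { ADOPT_NEWER_ROUND, IGNORE_STALE, COMPARE_VOTES }

    private final AtomicLong logicalclock = new AtomicLong(); // starts at 0

    // Entering a new election bumps the local round by 1.
    long startElection() {
        return logicalclock.incrementAndGet();
    }

    long currentRound() {
        return logicalclock.get();
    }

    Action onNotification(long electionEpoch) {
        if (electionEpoch > logicalclock.get()) {
            // The sender is in a later round: jump to it and start over.
            logicalclock.set(electionEpoch);
            return Action.ADOPT_NEWER_ROUND;
        } else if (electionEpoch < logicalclock.get()) {
            // The sender is behind: its vote is out of date.
            return Action.IGNORE_STALE;
        }
        // Same round: the votes themselves can now be compared.
        return Action.COMPARE_VOTES;
    }
}
```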

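Rules for comparing two votes

peerEpoch, zxid and sid are compared in that order:

The vote with the larger peerEpoch is newer, since peerEpoch is the epoch the candidate is in.

When peerEpoch is equal, the vote with the larger zxid is newer, since zxid records the transactions within an epoch.

When both peerEpoch and zxid are equal, the vote with the larger sid wins.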
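The three rules amount to a lexicographic ordering on (peerEpoch, zxid, sid). Here is a small worked sketch using a Comparator. VoteOrderSketch, Candidate and beats are hypothetical names, the sample values are made up, and the weight check performed by totalOrderPredicate is deliberately left out.

```java
import java.util.Comparator;

final class VoteOrderSketch {
    static final class Candidate {
        final long sid;
        final long zxid;
        final long peerEpoch;
        Candidate(long sid, long zxid, long peerEpoch) {
            this.sid = sid;
            this.zxid = zxid;
            this.peerEpoch = peerEpoch;
        }
    }

    // Compare by peerEpoch first, then zxid, then sid.
    static final Comparator<Candidate> ORDER =
            Comparator.comparingLong((Candidate c) -> c.peerEpoch)
                    .thenComparingLong(c -> c.zxid)
                    .thenComparingLong(c -> c.sid);

    // True when `challenger` should replace `current` as the proposed leader.
    static boolean beats(Candidate challenger, Candidate current) {
        return ORDER.compare(challenger, current) > 0;
    }

    public static void main(String[] args) {
        Candidate a = new Candidate(1L, 5L, 1L);
        Candidate b = new Candidate(2L, 3L, 1L);
        Candidate c = new Candidate(3L, 5L, 1L);
        Candidate d = new Candidate(1L, 1L, 2L);
        System.out.println(beats(a, b)); // same epoch, higher zxid
        System.out.println(beats(c, a)); // same epoch and zxid, higher sid
        System.out.println(beats(d, c)); // higher epoch wins despite lower zxid and sid
    }
}
```

All three calls in main evaluate to true, one for each rule.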

That concludes this look at how FastLeaderElection is implemented in zk.
