溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

zookeeper的Leader選舉機(jī)制是什么

發(fā)布時間:2023-03-31 16:11:26 來源:億速云 閱讀:184 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容主要講解“zookeeper的Leader選舉機(jī)制是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“zookeeper的Leader選舉機(jī)制是什么”吧!

    zookeeper

    一個分布式服務(wù)框架,主要解決分布式應(yīng)用中常見的多種數(shù)據(jù)問題,例如集群管理,狀態(tài)同步等。為解決這些問題zookeeper需要Leader選舉進(jìn)行保障數(shù)據(jù)的強(qiáng)一致性機(jī)制和穩(wěn)定性。

    01Leader選舉機(jī)制

    Leader選舉機(jī)制采用半數(shù)選舉算法。

    每一個zookeeper服務(wù)端稱之為一個節(jié)點,每個節(jié)點都有投票權(quán),把其選票投向每一個有選舉權(quán)的節(jié)點,當(dāng)其中一個節(jié)點選舉出票數(shù)過半,這個節(jié)點就會成為Leader,其它節(jié)點成為Follower。

    02Leader選舉集群配置

    • 重命名zoo_sample.cfg文件為zoo1.cfg ,zoo2.cfg,zoo3.cfg,zoo4.cfg

    • 修改zoo.cfg文件,修改值如下:

    【plain】
    zoo1.cfg文件內(nèi)容:
    dataDir=/export/data/zookeeper-1
    clientPort=2181
    server.1=127.0.0.1:2001:3001
    server.2=127.0.0.1:2002:3002:participant
    server.3=127.0.0.1:2003:3003:participant
    server.4=127.0.0.1:2004:3004:observer
    zoo2.cfg文件內(nèi)容:
    dataDir=/export/data/zookeeper-2
    clientPort=2182
    server.1=127.0.0.1:2001:3001
    server.2=127.0.0.1:2002:3002:participant
    server.3=127.0.0.1:2003:3003:participant
    server.4=127.0.0.1:2004:3004:observer
    zoo3.cfg文件內(nèi)容:
    dataDir=/export/data/zookeeper-3
    clientPort=2183
    server.1=127.0.0.1:2001:3001
    server.2=127.0.0.1:2002:3002:participant
    server.3=127.0.0.1:2003:3003:participant
    server.4=127.0.0.1:2004:3004:observer
    zoo4.cfg文件內(nèi)容:
    dataDir=/export/data/zookeeper-4
    clientPort=2184
    server.1=127.0.0.1:2001:3001
    server.2=127.0.0.1:2002:3002:participant
    server.3=127.0.0.1:2003:3003:participant
    server.4=127.0.0.1:2004:3004:observer
    • server.第幾號服務(wù)器(對應(yīng)myid文件內(nèi)容)=ip:數(shù)據(jù)同步端口:選舉端口:選舉標(biāo)識

    • participant默認(rèn)參與選舉標(biāo)識,可不寫. observer不參與選舉

    4.在/export/data/zookeeper-1,/export/data/zookeeper-2,/export/data/zookeeper-3,/export/data/zookeeper-4目錄下創(chuàng)建myid文件,文件內(nèi)容分別寫1 ,2,3,4,用于標(biāo)識sid(全稱:Server ID)賦值。

    • 啟動三個zookeeper實例:

    • bin/zkServer.sh start conf/zoo1.cfg

    • bin/zkServer.sh start conf/zoo2.cfg

    • bin/zkServer.sh start conf/zoo3.cfg

    • 每啟動一個實例,都會讀取啟動參數(shù)配置zoo.cfg文件,這樣實例就可以知道其作為服務(wù)端身份信息sid以及集群中有多少個實例參與選舉。

    03Leader選舉流程

    zookeeper的Leader選舉機(jī)制是什么

    圖1 第一輪到第二輪投票流程

    前提:

    設(shè)定票據(jù)數(shù)據(jù)格式vote(sid,zxid,epoch)

    • sid是Server ID每臺服務(wù)的唯一標(biāo)識,是myid文件內(nèi)容;

    • zxid是數(shù)據(jù)事務(wù)id號;

    • epoch為選舉周期,為方便理解下面講解內(nèi)容暫定為1初次選舉,不寫入下面內(nèi)容里。

    按照順序啟動sid=1,sid=2節(jié)點

    第一輪投票:

    • sid=1節(jié)點:初始選票為自己,將選票vote(1,0)發(fā)送給sid=2節(jié)點;

    • sid=2節(jié)點:初始選票為自己,將選票vote(2,0)發(fā)送給sid=1節(jié)點;

    • sid=1節(jié)點:收到sid=2節(jié)點選票vote(2,0)和當(dāng)前自己的選票vote(1,0),首先比對zxid值,zxid越大代表數(shù)據(jù)最新,優(yōu)先選擇zxid最大的選票,如果zxid相同,選舉最大sid。當(dāng)前投票選舉結(jié)果為vote(2,0),sid=1節(jié)點的選票變?yōu)関ote(2,0);

    • sid=2節(jié)點:收到sid=1節(jié)點選票vote(1,0)和當(dāng)前自己的選票vote(2,0),參照上述選舉方式,選舉結(jié)果為vote(2,0),sid=2節(jié)點的選票不變;

    • 第一輪投票選舉結(jié)束。

    第二輪投票:

    • sid=1節(jié)點:當(dāng)前自己的選票為vote(2,0),將選票vote(2,0)發(fā)送給sid=2節(jié)點;

    • sid=2節(jié)點:當(dāng)前自己的選票為vote(2,0),將選票vote(2,0)發(fā)送給sid=1節(jié)點;

    • sid=1節(jié)點:收到sid=2節(jié)點選票vote(2,0)和自己的選票vote(2,0), 按照半數(shù)選舉算法,總共3個節(jié)點參與選舉,已有2個節(jié)點選舉出相同選票,推舉sid=2節(jié)點為Leader,自己角色變?yōu)镕ollower;

    • sid=2節(jié)點:收到sid=1節(jié)點選票vote(2,0)和自己的選票vote(2,0),按照半數(shù)選舉算法推舉sid=2節(jié)點為Leader,自己角色變?yōu)長eader。

    這時啟動sid=3節(jié)點后,集群里已經(jīng)選舉出leader,sid=1和sid=2節(jié)點會將自己的leader選票發(fā)回給sid=3節(jié)點,通過半數(shù)選舉結(jié)果還是sid=2節(jié)點為leader。

    3.1 Leader選舉采用多層隊列架構(gòu)

    zookeeper選舉底層主要分為選舉應(yīng)用層和消息傳輸隊列層,第一層應(yīng)用層隊列統(tǒng)一接收和發(fā)送選票,而第二層傳輸層隊列,是按照服務(wù)端sid分成了多個隊列,是為了避免給每臺服務(wù)端發(fā)送消息互相影響。比如對某臺機(jī)器發(fā)送不成功不會影響正常服務(wù)端的發(fā)送。

    zookeeper的Leader選舉機(jī)制是什么

    圖2 多層隊列上下關(guān)系交互流程圖

    04解析代碼入口類

    通過查看zkServer.sh文件內(nèi)容找到服務(wù)啟動類:

    org.apache.zookeeper.server.quorum.QuorumPeerMain

    05選舉流程代碼解析

    • 加載配置文件QuorumPeerConfig.parse(path);

    針對 Leader選舉關(guān)鍵配置信息如下:

    • 讀取dataDir目錄找到myid文件內(nèi)容,設(shè)置當(dāng)前應(yīng)用sid標(biāo)識,做為投票人身份信息。下面遇到myid變量為當(dāng)前節(jié)點自己sid標(biāo)識。


      • 設(shè)置peerType當(dāng)前應(yīng)用是否參與選舉

    • new QuorumMaj()解析server.前綴加載集群成員信息,加載allMembers所有成員,votingMembers參與選舉成員,observingMembers觀察者成員,設(shè)置half值votingMembers.size()/2.

    【Java】
    public QuorumMaj(Properties props) throws ConfigException {
            for (Entry<Object, Object> entry : props.entrySet()) {
                String key = entry.getKey().toString();
                String value = entry.getValue().toString();
                //讀取集群配置文件中的server.開頭的應(yīng)用實例配置信息
                if (key.startsWith("server.")) {
                    int dot = key.indexOf('.');
                    long sid = Long.parseLong(key.substring(dot + 1));
                    QuorumServer qs = new QuorumServer(sid, value);
                    allMembers.put(Long.valueOf(sid), qs);
                    if (qs.type == LearnerType.PARTICIPANT)
    //應(yīng)用實例綁定的角色為PARTICIPANT意為參與選舉
                        votingMembers.put(Long.valueOf(sid), qs);
                    else {
                        //觀察者成員
                        observingMembers.put(Long.valueOf(sid), qs);
                    }
                } else if (key.equals("version")) {
                    version = Long.parseLong(value, 16);
                }
            }
            //過半基數(shù)
            half = votingMembers.size() / 2;
        }

    QuorumPeerMain.runFromConfig(config) 啟動服務(wù);

    QuorumPeer.startLeaderElection() 開啟選舉服務(wù);

    • 設(shè)置當(dāng)前選票new Vote(sid,zxid,epoch)

    【plain】
    synchronized public void startLeaderElection(){
    try {
               if (getPeerState() == ServerState.LOOKING) {
                   //首輪:當(dāng)前節(jié)點默認(rèn)投票對象為自己
                   currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
               }
           } catch(IOException e) {
               RuntimeException re = new RuntimeException(e.getMessage());
               re.setStackTrace(e.getStackTrace());
               throw re;
           }
    //........
    }
    • 創(chuàng)建選舉管理類:QuorumCnxnManager;

    • 初始化recvQueue<Message(sid,ByteBuffer)>接收投票隊列(第二層傳輸隊列);

    • 初始化queueSendMap<sid,queue>按sid發(fā)送投票隊列(第二層傳輸隊列);

    • 初始化senderWorkerMap<sid,SendWorker>發(fā)送投票工作線程容器,表示著與sid投票節(jié)點已連接;

    • 初始化選舉監(jiān)聽線程類QuorumCnxnManager.Listener。

    【Java】
    //QuorumPeer.createCnxnManager()
    public QuorumCnxManager(QuorumPeer self,
                            final long mySid,
                            Map&lt;Long,QuorumPeer.QuorumServer&gt; view,
                            QuorumAuthServer authServer,
                            QuorumAuthLearner authLearner,
                            int socketTimeout,
                            boolean listenOnAllIPs,
                            int quorumCnxnThreadsSize,
                            boolean quorumSaslAuthEnabled) {
        //接收投票隊列(第二層傳輸隊列)
        this.recvQueue = new ArrayBlockingQueue&lt;Message&gt;(RECV_CAPACITY);
        //按sid發(fā)送投票隊列(第二層傳輸隊列)
        this.queueSendMap = new ConcurrentHashMap&lt;Long, ArrayBlockingQueue&lt;ByteBuffer&gt;&gt;();
        //發(fā)送投票工作線程容器,表示著與sid投票節(jié)點已連接 
        this.senderWorkerMap = new ConcurrentHashMap&lt;Long, SendWorker&gt;();
        this.lastMessageSent = new ConcurrentHashMap&lt;Long, ByteBuffer&gt;();
        String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
        if(cnxToValue != null){
            this.cnxTO = Integer.parseInt(cnxToValue);
        }
        this.self = self;
        this.mySid = mySid;
        this.socketTimeout = socketTimeout;
        this.view = view;
        this.listenOnAllIPs = listenOnAllIPs;
        initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
                quorumSaslAuthEnabled);
        // Starts listener thread that waits for connection requests 
        //創(chuàng)建選舉監(jiān)聽線程 接收選舉投票請求
        listener = new Listener();
        listener.setName("QuorumPeerListener");
    }
    //QuorumPeer.createElectionAlgorithm
    protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;
        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
        case 0:
            le = new LeaderElection(this);
            break;
        case 1:
            le = new AuthFastLeaderElection(this);
            break;
        case 2:
            le = new AuthFastLeaderElection(this, true);
            break;
        case 3:
            qcm = createCnxnManager();// new QuorumCnxManager(... new Listener())
            QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
                listener.start();//啟動選舉監(jiān)聽線程
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                fle.start();
                le = fle;
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
    return le;}
    • 開啟選舉監(jiān)聽線程QuorumCnxnManager.Listener;

    • 創(chuàng)建ServerSockket等待大于自己sid節(jié)點連接,連接信息存儲到senderWorkerMap<sid,SendWorker>;

    • sid>self.sid才可以連接過來。

    【Java】
    //上面的listener.start()執(zhí)行后,選擇此方法
    public void run() {
        int numRetries = 0;
        InetSocketAddress addr;
        Socket client = null;
        while((!shutdown) && (numRetries < 3)){
            try {
                ss = new ServerSocket();
                ss.setReuseAddress(true);
                if (self.getQuorumListenOnAllIPs()) {
                    int port = self.getElectionAddress().getPort();
                    addr = new InetSocketAddress(port);
                } else {
                    // Resolve hostname for this server in case the
                    // underlying ip address has changed.
                    self.recreateSocketAddresses(self.getId());
                    addr = self.getElectionAddress();
                }
                LOG.info("My election bind port: " + addr.toString());
                setName(addr.toString());
                ss.bind(addr);
                while (!shutdown) {
                    client = ss.accept();
                    setSockOpts(client);
                    LOG.info("Received connection request "
                            + client.getRemoteSocketAddress());
                    // Receive and handle the connection request
                    // asynchronously if the quorum sasl authentication is
                    // enabled. This is required because sasl server
                    // authentication process may take few seconds to finish,
                    // this may delay next peer connection requests.
                    if (quorumSaslAuthEnabled) {
                        receiveConnectionAsync(client);
                    } else {
    //接收連接信息
                        receiveConnection(client);
                    }
                    numRetries = 0;
                }
            } catch (IOException e) {
                if (shutdown) {
                    break;
                }
                LOG.error("Exception while listening", e);
                numRetries++;
                try {
                    ss.close();
                    Thread.sleep(1000);
                } catch (IOException ie) {
                    LOG.error("Error closing server socket", ie);
                } catch (InterruptedException ie) {
                    LOG.error("Interrupted while sleeping. " +
                        "Ignoring exception", ie);
                }
                closeSocket(client);
            }
        }
        LOG.info("Leaving listener");
        if (!shutdown) {
            LOG.error("As I'm leaving the listener thread, "
                    + "I won't be able to participate in leader "
                    + "election any longer: "
                    + self.getElectionAddress());
        } else if (ss != null) {
            // Clean up for shutdown.
            try {
                ss.close();
            } catch (IOException ie) {
                // Don't log an error for shutdown.
                LOG.debug("Error closing server socket", ie);
            }
        }
    }
    //代碼執(zhí)行路徑:receiveConnection()->handleConnection(...)
    private void handleConnection(Socket sock, DataInputStream din)
                throws IOException {
    //...省略
         if (sid < self.getId()) {
                /*
                 * This replica might still believe that the connection to sid is
                 * up, so we have to shut down the workers before trying to open a
                 * new connection.
                 */
                SendWorker sw = senderWorkerMap.get(sid);
                if (sw != null) {
                    sw.finish();
                }
                /*
                 * Now we start a new connection
                 */
                LOG.debug("Create new connection to server: {}", sid);
                closeSocket(sock);
                if (electionAddr != null) {
                    connectOne(sid, electionAddr);
                } else {
                    connectOne(sid);
                }
            } else { // Otherwise start worker threads to receive data.
                SendWorker sw = new SendWorker(sock, sid);
                RecvWorker rw = new RecvWorker(sock, din, sid, sw);
                sw.setRecv(rw);
                SendWorker vsw = senderWorkerMap.get(sid);
                if (vsw != null) {
                    vsw.finish();
                }
      //存儲連接信息<sid,SendWorker>
                senderWorkerMap.put(sid, sw);
                queueSendMap.putIfAbsent(sid,
                        new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
                sw.start();
                rw.start();
         }
    }
    • 創(chuàng)建FastLeaderElection快速選舉服務(wù);

    • 初始選票發(fā)送隊列sendqueue(第一層隊列)

    • 初始選票接收隊列recvqueue(第一層隊列)

    • 創(chuàng)建線程WorkerSender

    • 創(chuàng)建線程WorkerReceiver

    【Java】
    //FastLeaderElection.starter
    private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;
        proposedLeader = -1;
        proposedZxid = -1;
        //發(fā)送隊列sendqueue(第一層隊列)
        sendqueue = new LinkedBlockingQueue<ToSend>();
        //接收隊列recvqueue(第一層隊列)
        recvqueue = new LinkedBlockingQueue<Notification>();
        this.messenger = new Messenger(manager);
    }
    //new Messenger(manager)
    Messenger(QuorumCnxManager manager) {
        //創(chuàng)建線程WorkerSender
        this.ws = new WorkerSender(manager);
        this.wsThread = new Thread(this.ws,
                "WorkerSender[myid=" + self.getId() + "]");
        this.wsThread.setDaemon(true);
        //創(chuàng)建線程WorkerReceiver
        this.wr = new WorkerReceiver(manager);
        this.wrThread = new Thread(this.wr,
                "WorkerReceiver[myid=" + self.getId() + "]");
        this.wrThread.setDaemon(true);
    }
    • 開啟WorkerSender和WorkerReceiver線程。

    WorkerSender線程自旋獲取sendqueue第一層隊列元素

    • sendqueue隊列元素內(nèi)容為相關(guān)選票信息詳見ToSend類;

    • 首先判斷選票sid是否和自己sid值相同,相等直接放入到recvQueue隊列中;

    • 不相同將sendqueue隊列元素轉(zhuǎn)儲到queueSendMap<sid,queue>第二層傳輸隊列中。

    【Java】//FastLeaderElection.Messenger.WorkerSenderclass WorkerSender extends ZooKeeperThread{
    //...
      public void run() {
        while (!stop) {
            try {
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if(m == null) continue;
      //將投票信息發(fā)送出去
                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
        LOG.info("WorkerSender is down");
      }
    }
    //QuorumCnxManager#toSend
    public void toSend(Long sid, ByteBuffer b) {
        /*
         * If sending message to myself, then simply enqueue it (loopback).
         */
        if (this.mySid == sid) {
             b.position(0);
             addToRecvQueue(new Message(b.duplicate(), sid));
            /*
             * Otherwise send to the corresponding thread to send.
             */
        } else {
             /*
              * Start a new connection if doesn't have one already.
              */
             ArrayBlockingQueue&lt;ByteBuffer&gt; bq = new ArrayBlockingQueue&lt;ByteBuffer&gt;(
                SEND_CAPACITY);
             ArrayBlockingQueue&lt;ByteBuffer&gt; oldq = queueSendMap.putIfAbsent(sid, bq);
             //轉(zhuǎn)儲到queueSendMap&lt;sid,queue&gt;第二層傳輸隊列中
             if (oldq != null) {
                 addToSendQueue(oldq, b);
             } else {
                 addToSendQueue(bq, b);
             }
             connectOne(sid);     
        }
    }

    WorkerReceiver線程自旋獲取recvQueue第二層傳輸隊列元素轉(zhuǎn)存到recvqueue第一層隊列中。

    【Java】
    //WorkerReceiver
    public void run() {
        Message response;
        while (!stop) {
          // Sleeps on receive
          try {
              //自旋獲取recvQueue第二層傳輸隊列元素
              response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
              if(response == null) continue;
              // The current protocol and two previous generations all send at least 28 bytes
              if (response.buffer.capacity() &lt; 28) {
                  LOG.error("Got a short response: " + response.buffer.capacity());
                  continue;
              }
              //...
      if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
             //第二層傳輸隊列元素轉(zhuǎn)存到recvqueue第一層隊列中
             recvqueue.offer(n);
             //...
          }
        }
    //...
    }

    06選舉核心邏輯

    • 啟動線程QuorumPeer

    開始Leader選舉投票makeLEStrategy().lookForLeader();

    sendNotifications()向其它節(jié)點發(fā)送選票信息,選票信息存儲到sendqueue隊列中。sendqueue隊列由WorkerSender線程處理。

    【plain】
    //QuorunPeer.run
    //...
    try {
       reconfigFlagClear();
        if (shuttingDownLE) {
           shuttingDownLE = false;
           startLeaderElection();
           }
        //makeLEStrategy().lookForLeader() 發(fā)送投票
        setCurrentVote(makeLEStrategy().lookForLeader());
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
        setPeerState(ServerState.LOOKING);
    }  
    //...
    //FastLeaderElection.lookLeader
    public Vote lookForLeader() throws InterruptedException {
    //...
      //向其他應(yīng)用發(fā)送投票
    sendNotifications();
    //...
    }
    private void sendNotifications() {
        //獲取應(yīng)用節(jié)點
        for (long sid : self.getCurrentAndNextConfigVoters()) {
            QuorumVerifier qv = self.getQuorumVerifier();
            ToSend notmsg = new ToSend(ToSend.mType.notification,
                    proposedLeader,
                    proposedZxid,
                    logicalclock.get(),
                    QuorumPeer.ServerState.LOOKING,
                    sid,
                    proposedEpoch, qv.toString().getBytes());
            if(LOG.isDebugEnabled()){
                LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                      Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                      " (n.round), " + sid + " (recipient), " + self.getId() +
                      " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
            }
            //儲存投票信息
            sendqueue.offer(notmsg);
        }
    }
    class WorkerSender extends ZooKeeperThread {
        //...
        public void run() {
        while (!stop) {
            try {
    //提取已儲存的投票信息
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if(m == null) continue;
                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
        LOG.info("WorkerSender is down");
      }
    //...
    }

    自旋recvqueue隊列元素獲取投票過來的選票信息:

    【Java】
    public Vote lookForLeader() throws InterruptedException {
    //...
    /*
     * Loop in which we exchange notifications until we find a leader
     */
    while ((self.getPeerState() == ServerState.LOOKING) &amp;&amp;
            (!stop)){
        /*
         * Remove next notification from queue, times out after 2 times
         * the termination time
         */
        //提取投遞過來的選票信息
        Notification n = recvqueue.poll(notTimeout,
                TimeUnit.MILLISECONDS);
    /*
     * Sends more notifications if haven't received enough.
     * Otherwise processes new notification.
     */
    if(n == null){
        if(manager.haveDelivered()){
            //已全部連接成功,并且前一輪投票都完成,需要再次發(fā)起投票
            sendNotifications();
        } else {
            //如果未收到選票信息,manager.contentAll()自動連接其它socket節(jié)點
            manager.connectAll();
        }
        /*
         * Exponential backoff
         */
        int tmpTimeOut = notTimeout*2;
        notTimeout = (tmpTimeOut &lt; maxNotificationInterval?
                tmpTimeOut : maxNotificationInterval);
        LOG.info("Notification time out: " + notTimeout);
             }
         //....
        }
      //...
    }
    【Java】
    //manager.connectAll()-&gt;connectOne(sid)-&gt;initiateConnection(...)-&gt;startConnection(...)
    private boolean startConnection(Socket sock, Long sid)
            throws IOException {
        DataOutputStream dout = null;
        DataInputStream din = null;
        try {
            // Use BufferedOutputStream to reduce the number of IP packets. This is
            // important for x-DC scenarios.
            BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
            dout = new DataOutputStream(buf);
            // Sending id and challenge
            // represents protocol version (in other words - message type)
            dout.writeLong(PROTOCOL_VERSION);
            dout.writeLong(self.getId());
            String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();
            byte[] addr_bytes = addr.getBytes();
            dout.writeInt(addr_bytes.length);
            dout.write(addr_bytes);
            dout.flush();
            din = new DataInputStream(
                    new BufferedInputStream(sock.getInputStream()));
        } catch (IOException e) {
            LOG.warn("Ignoring exception reading or writing challenge: ", e);
            closeSocket(sock);
            return false;
        }
        // authenticate learner
        QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
        if (qps != null) {
            // TODO - investigate why reconfig makes qps null.
            authLearner.authenticate(sock, qps.hostname);
        }
        // If lost the challenge, then drop the new connection
        //保證集群中所有節(jié)點之間只有一個通道連接
        if (sid &gt; self.getId()) {
            LOG.info("Have smaller server identifier, so dropping the " +
                    "connection: (" + sid + ", " + self.getId() + ")");
            closeSocket(sock);
            // Otherwise proceed with the connection
        } else {
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);
            SendWorker vsw = senderWorkerMap.get(sid);
            if(vsw != null)
                vsw.finish();
            senderWorkerMap.put(sid, sw);
            queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue&lt;ByteBuffer&gt;(
                    SEND_CAPACITY));
            sw.start();
            rw.start();
            return true;
        }
        return false;
    }

    如上述代碼中所示,sid>self.sid才可以創(chuàng)建連接Socket和SendWorker,RecvWorker線程,存儲到senderWorkerMap<sid,SendWorker>中。對應(yīng)第2步中的sid<self.sid邏輯,保證集群中所有節(jié)點之間只有一個通道連接。

    zookeeper的Leader選舉機(jī)制是什么

    節(jié)點之間連接方式

    【Java】
    public Vote lookForLeader() throws InterruptedException {
    //...
        if (n.electionEpoch > logicalclock.get()) {
            //當(dāng)前選舉周期小于選票周期,重置recvset選票池
            //大于當(dāng)前周期更新當(dāng)前選票信息,再次發(fā)送投票
            logicalclock.set(n.electionEpoch);
            recvset.clear();
            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                updateProposal(n.leader, n.zxid, n.peerEpoch);
            } else {
                updateProposal(getInitId(),
                        getInitLastLoggedZxid(),
                        getPeerEpoch());
            }
            sendNotifications();
        } else if (n.electionEpoch < logicalclock.get()) {
            if(LOG.isDebugEnabled()){
                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                        + Long.toHexString(n.electionEpoch)
                        + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
            }
            break;
        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                proposedLeader, proposedZxid, proposedEpoch)) {//相同選舉周期
            //接收的選票與當(dāng)前選票PK成功后,替換當(dāng)前選票
            updateProposal(n.leader, n.zxid, n.peerEpoch);
            sendNotifications();
        }
    //...
    }

    在上代碼中,自旋從recvqueue隊列中獲取到選票信息。開始進(jìn)行選舉:

    • 判斷當(dāng)前選票和接收過來的選票周期是否一致

    • 大于當(dāng)前周期更新當(dāng)前選票信息,再次發(fā)送投票

    • 周期相等:當(dāng)前選票信息和接收的選票信息進(jìn)行PK

    【Java】
    //接收的選票與當(dāng)前選票PK
    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
            LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
                    Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
            if(self.getQuorumVerifier().getWeight(newId) == 0){
                return false;
            }
            /*
             * We return true if one of the following three cases hold:
             * 1- New epoch is higher
             * 2- New epoch is the same as current epoch, but new zxid is higher
             * 3- New epoch is the same as current epoch, new zxid is the same
             *  as current zxid, but server id is higher.
             */
            return ((newEpoch > curEpoch) ||
                    ((newEpoch == curEpoch) &&
                    ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));wId > curId)))));
      }

    在上述代碼中的totalOrderPredicate方法邏輯如下:

    • 競選周期大于當(dāng)前周期為true

    • 競選周期相等,競選zxid大于當(dāng)前zxid為true

    • 競選周期相等,競選zxid等于當(dāng)前zxid,競選sid大于當(dāng)前sid為true

    • 經(jīng)過上述條件判斷為true將當(dāng)前選票信息替換為競選成功的選票,同時再次將新的選票投出去。

    【Java】
    public Vote lookForLeader() throws InterruptedException {
    //...
       //存儲節(jié)點對應(yīng)的選票信息
        // key:選票來源sid  value:選票推舉的Leader sid
        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
        //半數(shù)選舉開始
        if (termPredicate(recvset,
                new Vote(proposedLeader, proposedZxid,
                        logicalclock.get(), proposedEpoch))) {
            // Verify if there is any change in the proposed leader
            while((n = recvqueue.poll(finalizeWait,
                    TimeUnit.MILLISECONDS)) != null){
                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                        proposedLeader, proposedZxid, proposedEpoch)){
                    recvqueue.put(n);
                    break;
                }
            }
            /*WorkerSender
             * This predicate is true once we don't read any new
             * relevant message from the reception queue
             */
            if (n == null) {
                //已選舉出leader 更新當(dāng)前節(jié)點是否為leader 
                self.setPeerState((proposedLeader == self.getId()) ?
                        ServerState.LEADING: learningState());
                Vote endVote = new Vote(proposedLeader,
                        proposedZxid, proposedEpoch);
                leaveInstance(endVote);
                return endVote;
            }
        }
    //...
    }
    /**
         * Termination predicate. Given a set of votes, determines if have
         * sufficient to declare the end of the election round.
         *
         * @param votes
         *            Set of votes
         * @param vote
         *            Identifier of the vote received last  PK后的選票
         */
    private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
        SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
        voteSet.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier() != null
                && self.getLastSeenQuorumVerifier().getVersion() > self
                        .getQuorumVerifier().getVersion()) {
            voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }
        /*
         * First make the views consistent. Sometimes peers will have different
         * zxids for a server depending on timing.
         */
        //votes 來源于recvset 存儲各個節(jié)點推舉出來的選票信息
        for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
    //選舉出的sid和其它節(jié)點選擇的sid相同存儲到voteSet變量中。
            if (vote.equals(entry.getValue())) {
    //保存推舉出來的sid
                voteSet.addAck(entry.getKey());
            }
        }
        //判斷選舉出來的選票數(shù)量是否過半
        return voteSet.hasAllQuorums();
    }
    //QuorumMaj#containsQuorum
    public boolean containsQuorum(Set<Long> ackSet) {
        return (ackSet.size() > half);
       }

    在上述代碼中:recvset是存儲每個sid推舉的選票信息。

    第一輪 sid1:vote(1,0,1) ,sid2:vote(2,0,1);

    第二輪 sid1:vote(2,0,1) ,sid2:vote(2,0,1)。

    最終經(jīng)過選舉信息vote(2,0,1)為推薦leader,并用推薦leader在recvset選票池里比對持相同票數(shù)量為2個。因為總共有3個節(jié)點參與選舉,sid1和sid2都選舉sid2為leader,滿足票數(shù)過半要求,故確認(rèn)sid2為leader。

    • setPeerState更新當(dāng)前節(jié)點角色;

    • proposedLeader選舉出來的sid和自己sid相等,設(shè)置為Leader;

    • 上述條件不相等,設(shè)置為Follower或Observing;

    • 更新currentVote當(dāng)前選票為Leader的選票vote(2,0,1)。

    到此,相信大家對“zookeeper的Leader選舉機(jī)制是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

    向AI問一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI