溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

zookeeper ZAB協(xié)議的詳細介紹

發(fā)布時間:2020-06-17 14:50:05 來源:億速云 閱讀:396 作者:元一 欄目:編程語言

ZAB概念

Zookeeper 使用 Zookeeper Atomic Broadcast (ZAB) 協(xié)議來保障分布式數據一致性。

ZAB是一種支持崩潰恢復的消息廣播協(xié)議,采用類似2PC的廣播模式保證正常運行時性能,并使用基于 Paxos 的策略保證崩潰恢復時的一致性。

有的Follower服務器分發(fā)Commit消息,要求其將前一個Proposal進行提交。

ZAB一些包括兩種基本的模式:崩潰恢復和消息廣播。

1、當整個服務框架啟動過程中或Leader服務器出現(xiàn)網絡中斷、崩潰退出與重啟等異常情況時,ZAB協(xié)議就會進入恢復模式并選舉產生新的Leader服務器。當選舉產生了新的Leader服務器,同時集群中已經有過半的機器與該Leader服務器完成了狀態(tài)同步之后,ZAB協(xié)議就會退出恢復模式,狀態(tài)同步是指數據同步,用來保證集群在過半的機器能夠和Leader服務器的數據狀態(tài)保持一致。

2、當集群中已經有過半的Follower服務器完成了和Leader服務器的狀態(tài)同步,那么整個服務框架就可以進入消息廣播模式,當一臺同樣遵守ZAB協(xié)議的服務器啟動后加入到集群中,如果此時集群中已經存在一個Leader服務器在負責進行消息廣播,那么加入的服務器就會自覺地進入數據恢復模式:找到Leader所在的服務器,并與其進行數據同步,然后一起參與到消息廣播流程中去。Zookeeper只允許唯一的一個Leader服務器來進行事務請求的處理,Leader服務器在接收到客戶端的事務請求后,會生成對應的事務提議并發(fā)起一輪廣播協(xié)議,而如果集群中的其他機器收到客戶端的事務請求后,那么這些非Leader服務器會首先將這個事務請求轉發(fā)給Leader服務器。

3、當Leader服務器出現(xiàn)崩潰或者機器重啟、集群中已經不存在過半的服務器與Leader服務器保持正常通信時,那么在重新開始新的一輪的原子廣播事務操作之前,所有進程首先會使用崩潰恢復協(xié)議來使彼此到達一致狀態(tài),于是整個ZAB流程就會從消息廣播模式進入到崩潰恢復模式。一個機器要成為新的Leader,必須獲得過半機器的支持,同時由于每個機器都有可能會崩潰,因此,ZAB協(xié)議運行過程中,前后會出現(xiàn)多個Leader,并且每臺機器也有可能會多次成為Leader,進入崩潰恢復模式后,只要集群中存在過半的服務器能夠彼此進行正常通信,那么就可以產生一個新的Leader并再次進入消息廣播模式。如一個由三臺機器組成的ZAB服務,通常由一個Leader、2個Follower服務器組成,某一個時刻,加入其中一個Follower掛了,整個ZAB集群是不會中斷服務的。

ZAB協(xié)議中節(jié)點存在四種狀態(tài):

  • Leading: 當前節(jié)點為集群 Leader,負責協(xié)調事務

  • Following: 當前節(jié)點為 Follower 在 Leader 協(xié)調下執(zhí)行事務

  • Looking: 集群沒有正在運行的 Leader, 正處于選舉過程

  • Observing: 節(jié)點跟隨 Leader 保存系統(tǒng)最新的狀態(tài)提供讀服務,但不參與選舉和事務投票

① 消息廣播

Zab協(xié)議消息廣播有以下4個步驟組成:

  1. Leader發(fā)送PROPOSAL給集群中所有的節(jié)點。
  2. 節(jié)點在收到PROPOSAL之后,把PROPOSAL落盤,發(fā)送一個ACK給Leader。
  3. Leader在收到大多數節(jié)點的ACK之后,發(fā)送COMMIT給集群中所有的Follower節(jié)點。
  4. 如果存在Observer節(jié)點,Leader同時發(fā)送INFORM信息給Observer服務節(jié)點同步數據,Observer只接收Leader的INFORM消息同步數據,不參與Leader選舉和事務提交。

zookeeper ZAB協(xié)議的詳細介紹

② 崩潰恢復

在Leader服務器出現(xiàn)崩潰,或者由于網絡原因導致Leader服務器失去了與過半Follower的聯(lián)系,那么就會進入崩潰恢復模式,在ZAB協(xié)議中,為了保證程序的正確運行,整個恢復過程結束后需要選舉出一個新的Leader服務器,因此,ZAB協(xié)議需要一個高效且可靠的Leader選舉算法,從而保證能夠快速地選舉出新的Leader,同時,Leader選舉算法不僅僅需要讓Leader自身知道已經被選舉為Leader,同時還需要讓集群中的所有其他機器也能夠快速地感知到選舉產生的新的Leader服務器。

③ ZAB基本特性

ZAB協(xié)議的基本原則

3.1、ZAB協(xié)議需要確保那些已經在Leader服務器上提交的事務最終被所有服務器都提交

假設一個事務在Leader服務器上被提交了,并且已經得到了過半Follower服務器的Ack反饋,但是在它Commit消息發(fā)送給所有Follower機器之前,Leader服務掛了。如下圖所示:

zookeeper ZAB協(xié)議的詳細介紹

在集群正常運行過程中的某一個時刻,Server1是Leader服務器,其先后廣播了P1、P2、C1、P3、C2(C2是Commit Of Proposal2的縮寫),其中,當Leader服務器發(fā)出C2后就立即崩潰退出了,針對這種情況,ZAB協(xié)議就需要確保事務Proposal2最終能夠在所有的服務器上都被提交成功,否則將出現(xiàn)不一致。

3.2、ZAB協(xié)議需要確保丟棄那些只在Leader服務器上被提出的事務。

如果在崩潰恢復過程中出現(xiàn)一個需要被丟棄的提議,那么在崩潰恢復結束后需要跳過該事務Proposal,如下圖所示:

zookeeper ZAB協(xié)議的詳細介紹

假設初始的Leader服務器Server1在提出一個事務Proposal3之后就崩潰退出了,從而導致集群中的其他服務器都沒有收到這個事務Proposal,于是,當Server1恢復過來再次加入到集群中的時候,ZAB協(xié)議需要確保丟棄Proposal3這個事務。

3.3、ZAB協(xié)議必須的Leader選舉算法

能夠確保提交已經被Leader提交的事務的Proposal,同時丟棄已經被跳過的事務Proposal。如果讓Leader選舉算法能夠保證新選舉出來的Leader服務器擁有集群中所有機器最高編號(ZXID最大)的事務Proposal,那么就可以保證這個新選舉出來的Leader一定具有所有已經提交的提議,更為重要的是如果讓具有最高編號事務的Proposal機器稱為Leader,就可以省去Leader服務器查詢Proposal的提交和丟棄工作這一步驟了。

3.4、數據同步,一致性

完成Leader選舉后,在正式開始工作前,Leader服務器首先會確認日志中的所有Proposal是否都已經被集群中的過半機器提交了,即是否完成了數據同步。Leader服務器需要確所有的Follower服務器都能夠接收到每一條事務Proposal,并且能夠正確地將所有已經提交了的事務Proposal應用到內存數據庫中。Leader服務器會為每個Follower服務器維護一個隊列,并將那些沒有被各Follower服務器同步的事務以Proposal消息的形式逐個發(fā)送給Follower服務器,并在每一個Proposal消息后面緊接著再發(fā)送一個Commit消息,以表示該事務已經被提交,等到Follower服務器將所有其尚未同步的事務Proposal都從Leader服務器上同步過來并成功應用到本地數據庫后,Leader服務器就會將該Follower服務器加入到真正的可用Follower列表并開始之后的其他流程。

④ ZAB總結

1、 發(fā)現(xiàn),選舉產生Leader,產生最新的epoch(每次選舉產生新Leader的同時產生新epoch)。

2、 同步,各Follower和Leader完成數據同步。

3、廣播,Leader處理客戶端的寫操作,并將狀態(tài)變更廣播至Follower,F(xiàn)ollower多數通過之后Leader發(fā)起將狀態(tài)變更落地Commit。

在正常運行過程中,ZAB協(xié)議會一直運行于階段三來反復進行消息廣播流程,如果出現(xiàn)崩潰或其他原因導致Leader缺失,那么此時ZAB協(xié)議會再次進入發(fā)現(xiàn)階段,選舉新的Leader。

源碼分析

1、Leader發(fā)送PROPOSAL

ProposalRequestProcessor.proce***equest()方法發(fā)送PROPOSAL 給每一個節(jié)點。它調用Leader.propose()方法把PROPOSAL
入隊到各個follower的queuedPackets,然后直接把PROPOSAL提交給leader節(jié)點自己的SyncRequestProcessor 。

以下是大概的代碼路徑:

ProposalRequestProcessor.proce***equest(request)
  zks.getLeader().propose(request)
        sendPacket(pp)
            for f in forwardingFollowers
                f.queuePacket(qp) 
                    queuedPackets.add(p)
  syncProcessor.proce***equest(request)

2、Leader處理PROPOSAL

SyncRequestProcessor先處理

SyncRequestProcessor.run() 
    zks.getZKDatabase().append(si) 
    flush(toFlush)
        zks.getZKDatabase().commit() 
            while (!toFlush.isEmpty())
                Request i = toFlush.remove()
                if (nextProcessor != null)
                    nextProcessor.proce***equest(i)

然后是Leader的ACK處理器處理,返回給Leader自己ACK結果

AckRequestProcessor.proce***equest()
    proce***equest()
        leader.processAck(self.getId(), request.zxid, null)

zookeeper ZAB協(xié)議的詳細介紹

3、Follower處理PROPOSAL

Follower. followLeader()方法處理接收到的QuorumPacket, case Leader.PROPOSAL分支處理的就是PROPOSAL。

Follower.followLeader() 
    loop
    readPacket(qp)
      leaderIs.readRecord(pp, "packet")
        processPacket(qp) 
            case Leader.PROPOSAL
                Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr)
                fzk.logRequest(hdr, txn)
                    syncProcessor.proce***equest(request) 
            case Leader.COMMIT:
                    fzk.commit(qp.getZxid())
                        commitProcessor.commit(request)

SyncRequestProcessor的處理邏輯

SyncRequestProcessor.run() 
    zks.getZKDatabase().append(si) 
    flush(toFlush)
        zks.getZKDatabase().commit()
        while (!toFlush.isEmpty())
            Request i = toFlush.remove() 
            if (nextProcessor != null)
                nextProcessor.proce***equest(i)
                    QuorumPacket qp = new QuorumPacket(Leader.ACK) 
                    learner.writePacket(qp, false)
                         leaderOs.writeRecord(pp, "packet")
         ((Flushable)nextProcessor).flush()
                learner.writePacket(null, true) 
                    bufferedOutput.flush()

zookeeper ZAB協(xié)議的詳細介紹

4、Leader的ACK處理

Leader的processAck()處理ACK消息,如果收到大多數節(jié)點的ACK,發(fā)送COMMIT給所有的follower節(jié)點,并調用leader自己 的CommitProcessor。 processAck()有兩個調用入口:1. LeaderHandler的run()方法處理來自follower的ACK。2. AckRequestProcessor的proce***equest方法處理leader自己的ACK。

Leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()) 
    Proposal p = outstandingProposals.get(zxid)
    p.addAck(sid)
    tryToCommit(p, zxid, followerAddr)
        if !p.hasAllQuorums() 
            return false;
        // Commit on all followers
        commit(zxid)
            QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null)
            sendPacket(qp)
        // Commit on Leader 
        zk.commitProcessor.commit(p.request)

5、Leader的COMMIT處理

CommitProcessor.run()
    request = queuedRequests.poll() 
    processCommitted()
        sendToNextProcessor(pending)

已經提交的請求,交給ToBeAppliedRequestProcessor準備應用到內存數據庫

ToBeAppliedRequestProcessor.proce***equest()
    next.proce***equest(request)

最后交給FinalRequestProcessor,返回響應結果

zookeeper ZAB協(xié)議的詳細介紹

6、Follower的COMMIT處理

CommitProcessor.run()
    request = queuedRequests.poll() 
    processCommitted()
        sendToNextProcessor(pending) 
//返回響應結果        
FinalRequestProcessor.proce***equest()

zookeeper ZAB協(xié)議的詳細介紹

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI