溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

使用GO實(shí)現(xiàn)Paxos共識(shí)算法的方法

發(fā)布時(shí)間:2020-10-18 15:21:04 來源:腳本之家 閱讀:232 作者:蘋果沒有熟 欄目:開發(fā)技術(shù)

什么是Paxos共識(shí)算法

最初的服務(wù)往往都是通過單體架構(gòu)對(duì)外提供的,即單Server-單Database模式。隨著業(yè)務(wù)的不斷擴(kuò)展,用戶和請(qǐng)求數(shù)都在不斷上升,如何應(yīng)對(duì)大量的請(qǐng)求就成了每個(gè)服務(wù)都需要解決的問題,這也就是我們常說的高并發(fā)。為了解決單臺(tái)服務(wù)器面對(duì)高并發(fā)的蒼白無力,可以通過增加服務(wù)器數(shù)量來解決,即多Server-單Database(Master-Slave)模式,此時(shí)的壓力就來到了數(shù)據(jù)庫一方,數(shù)據(jù)庫的IO效率決定了整個(gè)服務(wù)的效率,繼續(xù)增加Server數(shù)量將無法提升服務(wù)性能。這就衍生出了當(dāng)前火熱的微服務(wù)架構(gòu)。當(dāng)用戶請(qǐng)求經(jīng)由負(fù)載均衡分配到某一服務(wù)實(shí)例上后,如何保證該服務(wù)的其他實(shí)例最終能夠得到相同的數(shù)據(jù)變化呢?這就要用到Paxos分布式共識(shí)協(xié)議,Paxos解決的就是共識(shí)問題,也就是一段時(shí)間后,無論get哪一個(gè)服務(wù)實(shí)例,都能獲取到相同的數(shù)據(jù)。目前國內(nèi)外的分布式產(chǎn)品很多都使用了Paxos協(xié)議,可以說Paxos幾乎就是共識(shí)協(xié)議的標(biāo)準(zhǔn)和代名詞。

Paxos有兩種協(xié)議,我們常常提到的其實(shí)是Basic Paxos,另一種叫Multi Paxos,如無特殊說明,本文中提到的Paxos協(xié)議均為Basic Paxos。

Paxos協(xié)議是由圖靈獎(jiǎng)獲得者Leslie Lamport于1998年在其論文《The Part-Time Parliament》中首次提出的,講述了一個(gè)希臘小島Paxos是如何通過決議的。但由于該論文晦澀艱深,當(dāng)時(shí)的計(jì)算機(jī)界大牛們也沒幾個(gè)人能理解。于是Lamport2001年再次發(fā)表了《Paxos Made Simple》,摘要部分是這么寫的:

The Paxos algorithm, when presented in plain English, is very simple.

翻譯過來就是:不會(huì)吧,不會(huì)吧,這么簡單的Paxos算法不會(huì)真的有人弄不懂吧?然而事實(shí)卻是很多人對(duì)Paxos都望而卻步,理解Paxos其實(shí)并不難,但是Paxos的難點(diǎn)在于工程化,如何利用Paxos協(xié)議寫出一個(gè)能過夠真正在生產(chǎn)環(huán)境中跑起來的服務(wù)才是Paxos最難的地方,關(guān)于Paxos的工程化可以參考微信后臺(tái)團(tuán)隊(duì)撰寫的《微信自研生產(chǎn)級(jí)paxos類庫PhxPaxos實(shí)現(xiàn)原理介紹》

Paxos如何保證一致性的

Paxos協(xié)議一共有兩個(gè)階段:Prepare和Propose,兩種角色:Proposer和Acceptor,每一個(gè)服務(wù)實(shí)例既是Proposer,同時(shí)也是Acceptor,Proposer負(fù)責(zé)提議,Acceptor決定是否接收來自Proposer的提議,一旦提議被多數(shù)接受,那么我們就可以宣稱對(duì)該提議包含的值達(dá)成了一致,而且不會(huì)再改變。

階段一:Prepare 準(zhǔn)備

  • Proposer生成全局唯一ProposalID(時(shí)間戳+ServerID)
  • Proposer向所有Acceptor(包括Proposer自己)發(fā)送Prepare(n = ProposalID)請(qǐng)求
  • Acceptor比較n和minProposal, if n > minProposal, minProposal = n,Acceptor返回已接受的提議(acceptedProposal, acceptedValue)
  • 承諾1:不再接受n <= minProposal的Prepare請(qǐng)求
  • 承諾2:不再接受n < minProposal的Propose請(qǐng)求
  • 應(yīng)答1:返回此前已接受的提議
  • 當(dāng)Proposer收到大于半數(shù)的返回后
  • Prepare請(qǐng)求被拒絕,重新生成ProposalID并發(fā)送Prepare請(qǐng)求
  • Prepare請(qǐng)求被接受且有已接受的提議,選擇最大的ProposalID對(duì)應(yīng)的值作為提議的值
  • Prepare請(qǐng)求被接受且沒有已接受的提議,可選擇任意提議值

    階段二:Propose 提議

  • Proposer向所有Acceptor(包括Proposer自己)發(fā)送Accept(n=ProposalID,value=ProposalValue)請(qǐng)求
  • Acceptor比較n和minProposal, if n >= minProposal, minProposal = n, acceptedValue = value,返回已接受的提議(minProposal,acceptedValue)
  • 當(dāng)Proposer收到大于半數(shù)的返回后
  • Propose請(qǐng)求被拒絕,重新生成ProposalID并發(fā)送Prepare請(qǐng)求
  • Propose請(qǐng)求被接受,則數(shù)據(jù)達(dá)成一致性

一旦提議被半數(shù)以上的服務(wù)接受,那么我們就可以宣稱整個(gè)服務(wù)集群在這一提議上達(dá)成了一致。

需要注意的是,在一個(gè)服務(wù)集群中以上兩個(gè)階段是很有可能同時(shí)發(fā)生的。 例如:實(shí)例A已完成Prepare階段,并發(fā)送了Propose請(qǐng)求。同時(shí)實(shí)例B開始了Prepare階段,并生成了更大的ProposalID發(fā)送Prepare請(qǐng)求,可能導(dǎo)致實(shí)例A的Propose請(qǐng)求被拒絕。 每個(gè)服務(wù)實(shí)例也是同時(shí)在扮演Proposer和Acceptor角色,向其他服務(wù)發(fā)送請(qǐng)求的同時(shí),可能也在處理別的服務(wù)發(fā)來的請(qǐng)求。

使用GO語言實(shí)現(xiàn)Paxos協(xié)議

服務(wù)注冊與發(fā)現(xiàn)

由于每個(gè)服務(wù)實(shí)例都是在執(zhí)行相同的代碼,那我們要如何知曉其他服務(wù)實(shí)例的入口呢(IP和端口號(hào))?方法之一就是寫死在代碼中,或者提供一份配置文件。服務(wù)啟動(dòng)后可以讀取該配置文件。但是這種方法不利于維護(hù),一旦我們需要移除或添加服務(wù)則需要在每個(gè)機(jī)器上重新休息配置文件。

除此之外,我們可以通過一個(gè)第三方服務(wù):服務(wù)的注冊與發(fā)現(xiàn)來注冊并獲知當(dāng)前集群的總服務(wù)實(shí)例數(shù),即將本地的配置文件改為線上的配置服務(wù)。

服務(wù)注冊:Register函數(shù),服務(wù)實(shí)例啟動(dòng)后通過調(diào)用這個(gè)RPC方法將自己注冊在服務(wù)管理中

func (s *Service) Register(args *RegisterArgs, reply *RegisterReply) error {
 s.mu.Lock()
 defer s.mu.Unlock()
 
 server := args.ServerInfo
 for _, server := range s.Servers {
  if server.IPAddress == args.ServerInfo.IPAddress && server.Port == args.ServerInfo.Port {
   reply.Succeed = false
   return nil
  }
 }
 reply.ServerID = len(s.Servers)
 reply.Succeed = true
 s.Servers = append(s.Servers, server)
 
 fmt.Printf("Current registerd servers:\n%v\n", s.Servers)
 
 return nil
}

服務(wù)發(fā)現(xiàn):GetServers函數(shù),服務(wù)通過調(diào)用該RPC方法獲取所有服務(wù)實(shí)例的信息(IP和端口號(hào))

func (s *Service) GetServers(args *GetServersArgs, reply *GetServersReply) error {
 // return all servers
 reply.ServerInfos = s.Servers
 
 return nil
}

Prepare階段

Proposer,向所有的服務(wù)發(fā)送Prepare請(qǐng)求,并等待直到半數(shù)以上的服務(wù)返回結(jié)果,這里也可以等待所有服務(wù)返回后再處理,但是Paxos協(xié)議可以容忍小于半數(shù)的服務(wù)宕機(jī),因此我們只等待大于N/2個(gè)返回即可。當(dāng)返回的結(jié)果有任何一個(gè)請(qǐng)求被拒絕,那Proposer即認(rèn)為這次的請(qǐng)求被拒絕,返回重新生成ProposalID并發(fā)送新一輪的Prepare請(qǐng)求。

func (s *Server) CallPrepare(allServers []ServerInfo, proposal Proposal) PrepareReply {
 returnedReplies := make([]PrepareReply, 0)
 for _, otherS := range allServers {
  // use a go routine to call every server
  go func(otherS ServerInfo) {
   delay := rand.Intn(10)
   time.Sleep(time.Second * time.Duration(delay))
   args := PrepareArgs{s.Info, proposal.ID}
   reply := PrepareReply{}
   fmt.Printf("【Prepare】Call Prepare on %v:%v with proposal id %v\n", otherS.IPAddress, otherS.Port, args.ProposalID)
   if Call(otherS, "Server.Prepare", &args, &reply) {
    if reply.HasAcceptedProposal {
     fmt.Printf("【Prepare】%v:%v returns accepted proposal: %v\n", otherS.IPAddress, otherS.Port, reply.AcceptedProposal)
    } else {
     fmt.Printf("【Prepare】%v:%v returns empty proposal\n", otherS.IPAddress, otherS.Port)
    }
    s.mu.Lock()
    returnedReplies = append(returnedReplies, reply)
    s.mu.Unlock()
   }
  }(otherS)
 }
 for {
  // wait for responses from majority
  if len(returnedReplies) > (len(allServers))/2.0 {
   checkReplies := returnedReplies
   // three possible response
   // 1. deny the prepare, and return an empty/accepted proposal
   // as the proposal id is not higher than minProposalID on server (proposal id <= server.minProposalID)
   // 2. accept the prepare, and return an empty proposal as the server has not accept any proposal yet
   // 3. accept the prepare, and return an accepted proposal
   // check responses from majority
   // find the response with max proposal id
   acceptedProposal := NewProposal()
   for _, r := range checkReplies {
    // if any response refused the prepare, this server should resend prepare
    if !r.PrepareAccepted {
     return r
    }
    if r.HasAcceptedProposal && r.AcceptedProposal.ID > acceptedProposal.ID {
     acceptedProposal = r.AcceptedProposal
    }
   }
   // if some other server has accepted proposal, return that proposal with max proposal id
   // if no other server has accepted proposal, return an empty proposal
   return PrepareReply{HasAcceptedProposal: !acceptedProposal.IsEmpty(), AcceptedProposal: acceptedProposal, PrepareAccepted: true}
  }
  //fmt.Printf("Waiting for response from majority...\n")
  time.Sleep(time.Second * 1)
 }
}

Acceptor,通過比較ProposalID和minProposal,如果ProposalID小于等于minProposal,則拒絕該P(yáng)repare請(qǐng)求,否則更新minProposal為ProposalID。最后返回已接受的提議

func (s *Server) Prepare(args *PrepareArgs, reply *PrepareReply) error {
 s.mu.Lock()
 defer s.mu.Unlock()
 // 2 promises and 1 response
 // Promise 1
 // do not accept prepare request which ProposalID <= minProposalID
 // Promise 2
 // do not accept propose request which ProposalID < minProposalID
 // Response 1
 // respond with accepted proposal if any
 if reply.PrepareAccepted = args.ProposalID > s.minProposalID; reply.PrepareAccepted {
  // ready to accept the proposal with Id s.minProposalID
  s.minProposalID = args.ProposalID
 }
 reply.HasAcceptedProposal = s.readAcceptedProposal()
 reply.AcceptedProposal = s.Proposal
 return nil
}

Propose階段

Proposer,同樣首先向所有的服務(wù)發(fā)送Propose請(qǐng)求,并等待知道半數(shù)以上的服務(wù)返回結(jié)果。如果返回的結(jié)果有任何一個(gè)請(qǐng)求被拒絕,則Proposer認(rèn)為這次的請(qǐng)求被拒絕,返回重新生成ProposalID并發(fā)送新一輪的Prepare請(qǐng)求

func (s *Server) CallPropose(allServers []ServerInfo, proposal Proposal) ProposeReply {
 returnedReplies := make([]ProposeReply, 0)
 for _, otherS := range allServers {
  go func(otherS ServerInfo) {
   delay := rand.Intn(5000)
   time.Sleep(time.Millisecond * time.Duration(delay))
   args := ProposeArgs{otherS, proposal}
   reply := ProposeReply{}
   fmt.Printf("【Propose】Call Propose on %v:%v with proposal: %v\n", otherS.IPAddress, otherS.Port, args.Proposal)
   if Call(otherS, "Server.Propose", &args, &reply) {
    fmt.Printf("【Propose】%v:%v returns: %v\n", otherS.IPAddress, otherS.Port, reply)
    s.mu.Lock()
    returnedReplies = append(returnedReplies, reply)
    s.mu.Unlock()
   }
  }(otherS)
 }
 for {
  // wait for responses from majority
  if len(returnedReplies) > (len(allServers))/2.0 {
   checkReplies := returnedReplies
   for _, r := range checkReplies {
    if !r.ProposeAccepted {
     return r
    }
   }
   return checkReplies[0]
  }
  time.Sleep(time.Second * 1)
 }
}

Acceptor,通過比較ProposalID和minProposal,如果ProposalID小于minProposal,則拒絕該P(yáng)ropose請(qǐng)求,否則更新minProposal為ProposalID,并將提議持久化到本地磁盤中。

func (s *Server) Propose(args *ProposeArgs, reply *ProposeReply) error {
 if s.minProposalID <= args.Proposal.ID {
  s.mu.Lock()
  s.minProposalID = args.Proposal.ID
  s.Proposal = args.Proposal
  s.SaveAcceptedProposal()
  s.mu.Unlock()
 
  reply.ProposeAccepted = true
 }
 
 reply.ProposalID = s.minProposalID
 
 return nil
}

運(yùn)行

運(yùn)行結(jié)果:

這里我一共開啟了3個(gè)服務(wù)實(shí)例,并在每次請(qǐng)求之前加入了隨機(jī)的延遲,模擬網(wǎng)絡(luò)通信中的延遲,因此每個(gè)服務(wù)的每個(gè)請(qǐng)求并不是同時(shí)發(fā)出的

動(dòng)圖一張:

使用GO實(shí)現(xiàn)Paxos共識(shí)算法的方法

靜態(tài)結(jié)果一張:

使用GO實(shí)現(xiàn)Paxos共識(shí)算法的方法

可以看到3個(gè)服務(wù)盡管一開始會(huì)嘗試以他們自己的端口號(hào)(5001,5002,5003)作為提議值,在Prepare/Propose失敗后,都會(huì)重新生成更大的ProposalID并開啟新一輪的提議過程(Prepare,Propose),且最后都以5003達(dá)成一致。

小結(jié)

至此,我們就用GO實(shí)現(xiàn)了Paxos協(xié)議的核心邏輯。但顯而易見的是,這段代碼仍然存在很多問題,完全無法滿足生產(chǎn)環(huán)境的需求

  • 通過channel而不是mutex鎖來共享數(shù)據(jù)
  • 如何處理服務(wù)實(shí)例的移除和增加
  • 如何避免陷入活鎖

到此這篇關(guān)于使用GO實(shí)現(xiàn)Paxos共識(shí)算法的文章就介紹到這了,更多相關(guān)GO實(shí)現(xiàn)Paxos共識(shí)算法內(nèi)容請(qǐng)搜索億速云以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持億速云!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI