您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關(guān)python中怎么利用Paxos實現(xiàn)分布式系統(tǒng),可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
一致性算法解決的問題:分布式系統(tǒng)中數(shù)據(jù)不能存在單個節(jié)點(主機)上,否則可能出現(xiàn)單點故障;多個節(jié)點(主機)需要保證具有相同的數(shù)據(jù)。
什么是一致性:一致性就是數(shù)據(jù)保持一致,在分布式系統(tǒng)中,可以理解為多個節(jié)點中數(shù)據(jù)的值是一致的。
一致性模型分類:一般分為強一致性和弱一致性,強一致性保證系統(tǒng)改變提交以后立即改變集群的狀態(tài)。常見模型包括:Paxos,Raft(muti-paxos),ZAB(muti-paxos); 弱一致性也叫最終一致性,系統(tǒng)不保證改變提交以后立即改變集群的狀態(tài),但是隨著時間的推移最終狀態(tài)一致的。常見模型包括:DNS系統(tǒng),Gossip協(xié)議
一致性算法使用案例: Google的Chubby分布式鎖服務,采用了Paxos算法;etcd分布式鍵值數(shù)據(jù)庫,采用了Raft算法;ZooKeeper分布式應用協(xié)調(diào)服務以及Chubby的開源實現(xiàn),采用ZAB算法
simple-paxos就單個靜態(tài)值達一致性本身并不實用,我們需要實現(xiàn)的集群系統(tǒng)(銀行賬戶服務)希望就隨時間變化的特定狀態(tài)(賬戶余額)達成一致。所以需要使用Paxos就每個操作達成一致,將每個修改視為狀態(tài)機轉(zhuǎn)換。
Multi-Paxos實際上是simple Paxos實例(插槽)的序列,每個實例都按順序編號。每個狀態(tài)轉(zhuǎn)換都被賦予一個“插槽編號”,集群的每個成員都以嚴格的數(shù)字順序執(zhí)行轉(zhuǎn)換。為了更改群集的狀態(tài)(例如,處理傳輸操作),我們嘗試在下一個插槽中就該操作達成一致性。具體來說,這意味著向每個消息添加一個插槽編號,并在每個插槽的基礎(chǔ)上跟蹤所有協(xié)議狀態(tài)。
為每個插槽運行Paxos,至少兩次往返會太慢。Multi-Paxos通過對所有插槽使用相同的選票編號集進行優(yōu)化,并同時對所有插槽執(zhí)行Prepare/Promise。
Client Proposer Acceptor Learner | | | | | | | --- First Request --- X-------->| | | | | | Request | X--------->|->|->| | | Prepare(N) | |<---------X--X--X | | Promise(N,I,{Va,Vb,Vc}) | X--------->|->|->| | | Accept!(N,I,V) | |<---------X--X--X------>|->| Accepted(N,I,V) |<---------------------------------X--X Response | | | | | | |
在實用軟件中實現(xiàn)Multi-Paxos是出了名的困難,催生了許多論文如"Paxos Made Simple",“Paxos Made Practical”
首先,multi-poposer在繁忙的環(huán)境中可能會成為問題,因為每個群集成員都試圖在每個插槽中決定其狀態(tài)機操作。解決方法是選舉一名“l(fā)eader”,負責為每個時段提交選票。所有其他群集節(jié)點將新操作發(fā)送到領(lǐng)導者執(zhí)行。因此,在只有一名領(lǐng)導人的正常運作中,不會發(fā)生投票沖突。
Prepare/Promise階段可以作為一種leader選舉:無論哪個集群成員擁有最近承諾的選票號碼,都被視為leader。leader后續(xù)可以自由地直接執(zhí)行Accept/Accepted階段,而不重復第一階段。我們將在下文看到的,leader選舉實際上是相當復雜的。
雖然simple Paxos保證集群不會達成沖突的決定,但它不能保證會做出任何決定。例如,如果初始的Prepare消息丟失,并且沒有到達接受者,則提議者將等待永遠不會到達的Promise消息。解決這個問題需要精心設計的重新傳輸:足以最終取得進展,但不會群集產(chǎn)生數(shù)據(jù)包風暴。
另一個問題是決定的傳播。在正常情況下,簡單地廣播Decision信息就可以解決這個問題。但是,如果消息丟失,節(jié)點可能會永遠不知道該決定,并且無法為以后的插槽應用狀態(tài)機轉(zhuǎn)換。所以實現(xiàn)需要一些機制來共享有關(guān)已決定提案的信息。
使用分布式狀態(tài)機帶來了另一個挑戰(zhàn):當新節(jié)點啟動時,它需要獲取群集的現(xiàn)有狀態(tài)。
雖然可以通過趕上第一個插槽以來的所有插槽的決策來做到這一點,但在一個大的集群中,這可能涉及數(shù)百萬個插槽。此外,我們需要一些方法來初始化一個新的群集。
前面都是理論介紹,下面我們使用python來實現(xiàn)一個簡化的Multi-Paxos
我們以簡單的銀行賬戶管理服務的場景作為案例。在這個服務中,每一個賬戶都有一個當前余額,同時每個賬戶都有自己的賬號。用戶可以對賬戶進行“存款”、“轉(zhuǎn)賬”、“查詢當前余額”等操作?!稗D(zhuǎn)賬”操作同時涉及了兩個賬戶:轉(zhuǎn)出賬戶和轉(zhuǎn)入賬戶,如果賬戶余額不足,轉(zhuǎn)賬操作必須被駁回。
如果這個服務僅僅在一個服務器上部署,很容易就能夠?qū)崿F(xiàn):使用一個操作鎖來確保“轉(zhuǎn)賬”操作不會同時進行,同時對轉(zhuǎn)出賬戶的進行校驗。然而,銀行不可能僅僅依賴于一個服務器來儲存賬戶余額這樣的關(guān)鍵信息,通常,這些服務都是被分布在多個服務器上的,每一個服務器各自運行著相同代碼的實例。用戶可以通過任何一個服務器來操作賬戶。
在一個簡單的分布式處理系統(tǒng)的實現(xiàn)中,每個服務器都會保存一份賬戶余額的副本。它會處理任何收到的操作,并且將賬戶余額的更新發(fā)送給其他的服務器。但是這種方法有一個嚴重的問題:如果兩個服務器同時對一個賬戶進行操作,哪一個新的賬戶余額是正確的?即使服務器不共享余額而是共享操作,對一個賬戶同時進行轉(zhuǎn)賬操作也可能造成透支。
從根本上來說,這些錯誤的發(fā)生都是由于服務器使用它們本地狀態(tài)來響應操作,而不是首先確保本地狀態(tài)與其他服務器相匹配。比如,想象服務器A接到了從賬號101向賬號202轉(zhuǎn)賬的操作指令,而此時服務器B已經(jīng)處理了另一個把賬號101的錢都轉(zhuǎn)到賬號202的請求,卻沒有通知服務器A。這樣,服務器A的本地狀態(tài)與服務器B不一樣,即使會造成賬戶101透支,服務器A依然允許從賬號101進行轉(zhuǎn)賬操作。
為了防止上述情況發(fā)生我們采用了一種叫做“分布式狀態(tài)機”的工具。它的思路是對每個同樣的輸入,每個服務器都運行同樣的對應的狀態(tài)機。由于狀態(tài)機的特性,對于同樣的輸入每個服務器的輸出都是一樣的。對于像“轉(zhuǎn)賬”、“查詢當前余額”等操作,賬號和余額也都是狀態(tài)機的輸入。
這個應用的狀態(tài)機比較簡單:
def execute_operation(state, operation): if operation.name == 'deposit': if not verify_signature(operation.deposit_signature): return state, False state.accounts[operation.destination_account] += operation.amount return state, True elif operation.name == 'transfer': if state.accounts[operation.source_account] < operation.amount: return state, False state.accounts[operation.source_account] -= operation.amount state.accounts[operation.destination_account] += operation.amount return state, True elif operation.name == 'get-balance': return state, state.accounts[operation.account]
值得注意的是,運行“查詢當前余額”操作時雖然并不會改變當前狀態(tài),但是我們依然把它當做一個狀態(tài)變化操作來實現(xiàn)。這確保了返回的余額是分布式系統(tǒng)中的最新信息,并且不是基于一個服務器上的本地狀態(tài)來進行返回的。
這可能跟你在計算機課程中學習到的典型的狀態(tài)機不太一樣。傳統(tǒng)的狀態(tài)機是一系列有限個狀態(tài)的集合,每個狀態(tài)都與一個標記的轉(zhuǎn)移行為相對應,而在本文中,狀態(tài)機的狀態(tài)是賬戶余額的集合,因此存在無窮多個可能的狀態(tài)。但是,狀態(tài)機的基本規(guī)則同樣適用于本文的狀態(tài)機:對于同樣的初始狀態(tài),同樣的輸入總是有同樣的輸出。
因此,分布式狀態(tài)機確保了對于同樣的操作,每個主機都會有同樣的相應。但是,為了確保每個服務器都允許狀態(tài)機的輸入,前文中提到的問題依然存在。這是一個一致性問題,為了解決它我們采用了一種派生的Paxos算法。
可以為較大的應用程序提供一致性服務: 我們用一個Cluster庫來實現(xiàn)簡化的Multi-Paxos
正確性是這個庫最重要的能力,因此結(jié)構(gòu)化代碼是很重要的,以便我們可以看到并測試它與規(guī)范的對應關(guān)系。
復雜的協(xié)議可能會出現(xiàn)復雜的故障,因此我們將構(gòu)建對復現(xiàn)和調(diào)試不常見的故障的支持。
我們會實現(xiàn)POC代碼:足以證明核心概念是實用的,代碼的結(jié)構(gòu)化是為了后續(xù)添加此功能對核心實現(xiàn)的更改最小
我們開始coding吧。
cluster中的協(xié)議需要使用15不同的消息類型,每種消息類型使用collection中的namedturple定義:
Accepted = namedtuple('Accepted', ['slot', 'ballot_num']) Accept = namedtuple('Accept', ['slot', 'ballot_num', 'proposal']) Decision = namedtuple('Decision', ['slot', 'proposal']) Invoked = namedtuple('Invoked', ['client_id', 'output']) Invoke = namedtuple('Invoke', ['caller', 'client_id', 'input_value']) Join = namedtuple('Join', []) Active = namedtuple('Active', []) Prepare = namedtuple('Prepare', ['ballot_num']) Promise = namedtuple('Promise', ['ballot_num', 'accepted_proposals']) Propose = namedtuple('Propose', ['slot', 'proposal']) Welcome = namedtuple('Welcome', ['state', 'slot', 'decisions']) Decided = namedtuple('Decided', ['slot']) Preempted = namedtuple('Preempted', ['slot', 'preempted_by']) Adopted = namedtuple('Adopted', ['ballot_num', 'accepted_proposals']) Accepting = namedtuple('Accepting', ['leader'])
使用命名元組描述每種消息類型可以保持代碼的clean,并有助于避免一些簡單的錯誤。如果命名元組構(gòu)造函數(shù)沒有被賦予正確的屬性,則它將引發(fā)異常,從而使錯誤變得明顯。元組在日志消息中k可以很好地格式化,不會像字典那樣使用那么多的內(nèi)存。
創(chuàng)建消息:
msg = Accepted(slot=10, ballot_num=30)
訪問消息:
got_ballot_num = msg.ballot_num
后面我們會了解這些消息的含義。
代碼還引入了一些常量,其中大多數(shù)常量定義了各種消息的超時:
JOIN_RETRANSMIT = 0.7 CATCHUP_INTERVAL = 0.6 ACCEPT_RETRANSMIT = 1.0 PREPARE_RETRANSMIT = 1.0 INVOKE_RETRANSMIT = 0.5 LEADER_TIMEOUT = 1.0 NULL_BALLOT = Ballot(-1, -1) # sorts before all real ballots NOOP_PROPOSAL = Proposal(None, None, None) # no-op to fill otherwise empty slots
最后我們需要定義協(xié)議中的Proposal和Ballot
Proposal = namedtuple('Proposal', ['caller', 'client_id', 'input']) Ballot = namedtuple('Ballot', ['n', 'leader'])
實現(xiàn)multi-paxos的核心組件包括Role和Node。
為了保證可測試性并保持代碼的可讀性,我們將Cluster分解為與協(xié)議中描述的角色相對應的幾個類。每個都是Role的子類。
class Role(object): def __init__(self, node): self.node = node self.node.register(self) self.running = True self.logger = node.logger.getChild(type(self).__name__) def set_timer(self, seconds, callback): return self.node.network.set_timer(self.node.address, seconds, lambda: self.running and callback()) def stop(self): self.running = False self.node.unregister(self)
群集節(jié)點的角色由Node類粘在一起,該類代表網(wǎng)絡上的單個節(jié)點。在程序過程中角色將添加到節(jié)點中,并從節(jié)點中刪除。
到達節(jié)點的消息將中繼到所有活動角色,調(diào)用以消息類型命名的方法,前綴為do_。 這些do_方法接收消息的屬性作為關(guān)鍵字參數(shù),以便于訪問。Node``類還提供了``send方法作為方便,使用functools.partial為Network類的相同方法提供一些參數(shù)。
class Node(object): unique_ids = itertools.count() def __init__(self, network, address): self.network = network self.address = address or 'N%d' % self.unique_ids.next() self.logger = SimTimeLogger( logging.getLogger(self.address), {'network': self.network}) self.logger.info('starting') self.roles = [] self.send = functools.partial(self.network.send, self) def register(self, roles): self.roles.append(roles) def unregister(self, roles): self.roles.remove(roles) def receive(self, sender, message): handler_name = 'do_%s' % type(message).__name__ for comp in self.roles[:]: if not hasattr(comp, handler_name): continue comp.logger.debug("received %s from %s", message, sender) fn = getattr(comp, handler_name) fn(sender=sender, **message._asdict())
每個集群成員上都會創(chuàng)建并啟動一個Member對象,提供特定于應用程序的狀態(tài)機和對等項列表。如果成員對象正在加入現(xiàn)有集群,則該成員對象向該節(jié)點添加bootstrap角色,如果正在創(chuàng)建新集群,則該成員對象添加seed。再用Network.run在單獨的線程中運行協(xié)議。
應用程序通過該invoke方法與集群進行交互,從而啟動了狀態(tài)轉(zhuǎn)換, 確定該提議并運行狀態(tài)機后,invoke將返回狀態(tài)機的輸出。該方法使用簡單的同步Queue來等待協(xié)議線程的結(jié)果。
class Member(object): def __init__(self, state_machine, network, peers, seed=None, seed_cls=Seed, bootstrap_cls=Bootstrap): self.network = network self.node = network.new_node() if seed is not None: self.startup_role = seed_cls(self.node, initial_state=seed, peers=peers, execute_fn=state_machine) else: self.startup_role = bootstrap_cls(self.node, execute_fn=state_machine, peers=peers) self.requester = None def start(self): self.startup_role.start() self.thread = threading.Thread(target=self.network.run) self.thread.start() def invoke(self, input_value, request_cls=Requester): assert self.requester is None q = Queue.Queue() self.requester = request_cls(self.node, input_value, q.put) self.requester.start() output = q.get() self.requester = None return output
Paxos協(xié)議中的角色包括:client, acceptor, proposer, learner, and leader。在典型的實現(xiàn)中,單個processor可以同時扮演一個或多個角色。這不會影響協(xié)議的正確性,通常會合并角色以改善協(xié)議中的延遲和/或消息數(shù)量。
下面逐一實現(xiàn)每個角色類
Acceptor 類實現(xiàn)的是Paxos中的 acceptor角色,所以必須存儲最近promise的選票編號,以及每個時段接受的各個slot的proposal,同時需要相應Prepare和Accept消息。 這里的POC實現(xiàn)是一個和協(xié)議可以直接對應的短類,對于acceptor來說Multi-paxos看起來像是簡單的Paxos,只是在message中添加了slot number。
class Acceptor(Role): def __init__(self, node): super(Acceptor, self).__init__(node) self.ballot_num = NULL_BALLOT self.accepted_proposals = {} # {slot: (ballot_num, proposal)} def do_Prepare(self, sender, ballot_num): if ballot_num > self.ballot_num: self.ballot_num = ballot_num # we've heard from a scout, so it might be the next leader self.node.send([self.node.address], Accepting(leader=sender)) self.node.send([sender], Promise( ballot_num=self.ballot_num, accepted_proposals=self.accepted_proposals )) def do_Accept(self, sender, ballot_num, slot, proposal): if ballot_num >= self.ballot_num: self.ballot_num = ballot_num acc = self.accepted_proposals if slot not in acc or acc[slot][0] < ballot_num: acc[slot] = (ballot_num, proposal) self.node.send([sender], Accepted( slot=slot, ballot_num=self.ballot_num))
Replica類是Role類最復雜的子類,對應協(xié)議中的Learner和Proposal角色,它的主要職責是:提出新的proposal;在決定proposal時調(diào)用本地狀態(tài)機;跟蹤當前Leader;以及將新啟動的節(jié)點添加到集群中。
Replica創(chuàng)建新的proposal以響應來自客戶端的“invoke”消息,選擇它認為是未使用的插槽,并向當前l(fā)eader發(fā)送“Propose”消息。如果選定插槽的共識是針對不同proposal,則replica必須使用新插槽re-propose。
下圖顯示Replica的角色控制流程:
Requester Local Rep Current Leader X---------->| | Invoke | X------------>| Propose | |<------------X Decision |<----------X | Decision | | |
Decision消息表示集群已達成共識的插槽, Replica類存儲新的決定并運行狀態(tài)機,直到到達未確定的插槽。Replica從本地狀態(tài)機已處理的提交的slot識別出集群已同意的已決定的slot。如果slot出現(xiàn)亂序,提交的提案可能會滯后,等待下一個空位被決定。提交slot后,每個replica會將操作結(jié)果發(fā)送回一條Invoked消息給請求者。
在某些情況下slot可能沒有有效的提案,也沒有決策,需要狀態(tài)機一個接一個地執(zhí)行slot,因此群集必須就填充slot的內(nèi)容達成共識。為了避免這種可能性,Replica在遇到插槽時會提出“no-op”的proposal。如果最終決定了這樣的proposal,則狀態(tài)機對該slot不執(zhí)行任何操作。
同樣,同一proposal有可能被Decision兩次。對于任何此類重復的proposal,Replica將跳過調(diào)用狀態(tài)機,而不會對該slot執(zhí)行任何狀態(tài)轉(zhuǎn)換。
Replicas需要知道哪個節(jié)點是active leader才能向其發(fā)送Propose消息, 要實現(xiàn)這一目標,每個副本都使用三個信息源跟蹤active leader。
當leader 的角色轉(zhuǎn)換為active時,它會向同一節(jié)點上的副本發(fā)送一條Adopted消息(下圖):
Leader Local Repplica X----------->| Admopted
當acceptor角色向Promise新的leader發(fā)送Accepting消息時,它將消息發(fā)送到其本地副本(下圖)。
Acceptor Local Repplica X----------->| Accepting
active leader將以心跳的形式發(fā)送Active消息。如果在LEADER_TIMEOUT到期之前沒有此類消息到達,則Replica將假定該Leader已死,并轉(zhuǎn)向下一個Leader。在這種情況下,重要的是所有副本都選擇相同的新領(lǐng)導者,我們可以通過對成員進行排序并在列表中選擇下一個leader。
當節(jié)點加入網(wǎng)絡時,Bootstrap將發(fā)送一條Join消息(下圖)。Replica以一條Welcome包含其最新狀態(tài)的消息作為響應,從而使新節(jié)點能夠快速啟用。
BootStrap Replica Replica Replica X---------->| | | Join |<----------X X | Welcome X------------------------>| | Join |<------------------------X | Welcome X-------------------------------------->| Join |<--------------------------------------X Welcome class Replica(Role): def __init__(self, node, execute_fn, state, slot, decisions, peers): super(Replica, self).__init__(node) self.execute_fn = execute_fn self.state = state self.slot = slot self.decisions = decisions self.peers = peers self.proposals = {} # next slot num for a proposal (may lead slot) self.next_slot = slot self.latest_leader = None self.latest_leader_timeout = None # making proposals def do_Invoke(self, sender, caller, client_id, input_value): proposal = Proposal(caller, client_id, input_value) slot = next((s for s, p in self.proposals.iteritems() if p == proposal), None) # propose, or re-propose if this proposal already has a slot self.propose(proposal, slot) def propose(self, proposal, slot=None): """Send (or resend, if slot is specified) a proposal to the leader""" if not slot: slot, self.next_slot = self.next_slot, self.next_slot + 1 self.proposals[slot] = proposal # find a leader we think is working - either the latest we know of, or # ourselves (which may trigger a scout to make us the leader) leader = self.latest_leader or self.node.address self.logger.info( "proposing %s at slot %d to leader %s" % (proposal, slot, leader)) self.node.send([leader], Propose(slot=slot, proposal=proposal)) # handling decided proposals def do_Decision(self, sender, slot, proposal): assert not self.decisions.get(self.slot, None), \ "next slot to commit is already decided" if slot in self.decisions: assert self.decisions[slot] == proposal, \ "slot %d already decided with %r!" % (slot, self.decisions[slot]) return self.decisions[slot] = proposal self.next_slot = max(self.next_slot, slot + 1) # re-propose our proposal in a new slot if it lost its slot and wasn't a no-op our_proposal = self.proposals.get(slot) if (our_proposal is not None and our_proposal != proposal and our_proposal.caller): self.propose(our_proposal) # execute any pending, decided proposals while True: commit_proposal = self.decisions.get(self.slot) if not commit_proposal: break # not decided yet commit_slot, self.slot = self.slot, self.slot + 1 self.commit(commit_slot, commit_proposal) def commit(self, slot, proposal): """Actually commit a proposal that is decided and in sequence""" decided_proposals = [p for s, p in self.decisions.iteritems() if s < slot] if proposal in decided_proposals: self.logger.info( "not committing duplicate proposal %r, slot %d", proposal, slot) return # duplicate self.logger.info("committing %r at slot %d" % (proposal, slot)) if proposal.caller is not None: # perform a client operation self.state, output = self.execute_fn(self.state, proposal.input) self.node.send([proposal.caller], Invoked(client_id=proposal.client_id, output=output)) # tracking the leader def do_Adopted(self, sender, ballot_num, accepted_proposals): self.latest_leader = self.node.address self.leader_alive() def do_Accepting(self, sender, leader): self.latest_leader = leader self.leader_alive() def do_Active(self, sender): if sender != self.latest_leader: return self.leader_alive() def leader_alive(self): if self.latest_leader_timeout: self.latest_leader_timeout.cancel() def reset_leader(): idx = self.peers.index(self.latest_leader) self.latest_leader = self.peers[(idx + 1) % len(self.peers)] self.logger.debug("leader timed out; tring the next one, %s", self.latest_leader) self.latest_leader_timeout = self.set_timer(LEADER_TIMEOUT, reset_leader) # adding new cluster members def do_Join(self, sender): if sender in self.peers: self.node.send([sender], Welcome( state=self.state, slot=self.slot, decisions=self.decisions))
Leader的主要任務是接受Propose要求新投票的消息并做出決定。成功完成協(xié)議的Prepare/Promise部分后Leader將處于“Active狀態(tài)” ?;钴S的Leader可以立即發(fā)送Accept消息以響應Propose。
與按角色分類的模型保持一致,Leader會委派scout和Commander角色來執(zhí)行協(xié)議的每個部分。
class Leader(Role): def __init__(self, node, peers, commander_cls=Commander, scout_cls=Scout): super(Leader, self).__init__(node) self.ballot_num = Ballot(0, node.address) self.active = False self.proposals = {} self.commander_cls = commander_cls self.scout_cls = scout_cls self.scouting = False self.peers = peers def start(self): # reminder others we're active before LEADER_TIMEOUT expires def active(): if self.active: self.node.send(self.peers, Active()) self.set_timer(LEADER_TIMEOUT / 2.0, active) active() def spawn_scout(self): assert not self.scouting self.scouting = True self.scout_cls(self.node, self.ballot_num, self.peers).start() def do_Adopted(self, sender, ballot_num, accepted_proposals): self.scouting = False self.proposals.update(accepted_proposals) # note that we don't re-spawn commanders here; if there are undecided # proposals, the replicas will re-propose self.logger.info("leader becoming active") self.active = True def spawn_commander(self, ballot_num, slot): proposal = self.proposals[slot] self.commander_cls(self.node, ballot_num, slot, proposal, self.peers).start() def do_Preempted(self, sender, slot, preempted_by): if not slot: # from the scout self.scouting = False self.logger.info("leader preempted by %s", preempted_by.leader) self.active = False self.ballot_num = Ballot((preempted_by or self.ballot_num).n + 1, self.ballot_num.leader) def do_Propose(self, sender, slot, proposal): if slot not in self.proposals: if self.active: self.proposals[slot] = proposal self.logger.info("spawning commander for slot %d" % (slot,)) self.spawn_commander(self.ballot_num, slot) else: if not self.scouting: self.logger.info("got PROPOSE when not active - scouting") self.spawn_scout() else: self.logger.info("got PROPOSE while scouting; ignored") else: self.logger.info("got PROPOSE for a slot already being proposed")
Leader想要變?yōu)榛顒訝顟B(tài)時會創(chuàng)建一個Scout角色,以響應Propose在其處于非活動狀態(tài)時收到消息(下圖),Scout發(fā)送(并在必要時重新發(fā)送)Prepare消息,并收集Promise響應,直到聽到消息為止。多數(shù)同行或直到被搶占為止。在通過Adopted或Preempted回復給Leader。
Leader Scout Acceptor Acceptor Acceptor | | | | | | X--------->| | | Prepare | |<---------X | | Promise | X---------------------->| | Prepare | |<----------------------X | Promise | X---------------------------------->| Prepare | |<----------------------------------X Promise |<---------X | | | Adopted class Scout(Role):
def __init__(self, node, ballot_num, peers): super(Scout, self).__init__(node) self.ballot_num = ballot_num self.accepted_proposals = {} self.acceptors = set([]) self.peers = peers self.quorum = len(peers) / 2 + 1 self.retransmit_timer = None def start(self): self.logger.info("scout starting") self.send_prepare() def send_prepare(self): self.node.send(self.peers, Prepare(ballot_num=self.ballot_num)) self.retransmit_timer = self.set_timer(PREPARE_RETRANSMIT, self.send_prepare) def update_accepted(self, accepted_proposals): acc = self.accepted_proposals for slot, (ballot_num, proposal) in accepted_proposals.iteritems(): if slot not in acc or acc[slot][0] < ballot_num: acc[slot] = (ballot_num, proposal) def do_Promise(self, sender, ballot_num, accepted_proposals): if ballot_num == self.ballot_num: self.logger.info("got matching promise; need %d" % self.quorum) self.update_accepted(accepted_proposals) self.acceptors.add(sender) if len(self.acceptors) >= self.quorum: # strip the ballot numbers from self.accepted_proposals, now that it # represents a majority accepted_proposals = \ dict((s, p) for s, (b, p) in self.accepted_proposals.iteritems()) # We're adopted; note that this does *not* mean that no other # leader is active. # Any such conflicts will be handled by the # commanders. self.node.send([self.node.address], Adopted(ballot_num=ballot_num, accepted_proposals=accepted_proposals)) self.stop() else: # this acceptor has promised another leader a higher ballot number, # so we've lost self.node.send([self.node.address], Preempted(slot=None, preempted_by=ballot_num)) self.stop()
Leader為每個有active proposal的slot創(chuàng)建一個Commander角色(下圖)。像Scout一樣,Commander發(fā)送和重新發(fā)送Accept消息,并等待大多數(shù)接受者的回復Accepted或搶占消息。接受建議后,Commander將Decision消息廣播到所有節(jié)點。它用Decided或Preempted響應Leader。
Leader Commander Acceptor Acceptor Acceptor | | | | | | X--------->| | | Accept | |<---------X | | Accepted | X---------------------->| | Accept | |<----------------------X | Accepted | X---------------------------------->| Accept | |<----------------------------------X Accepted |<---------X | | | Decided class Commander(Role):
def __init__(self, node, ballot_num, slot, proposal, peers): super(Commander, self).__init__(node) self.ballot_num = ballot_num self.slot = slot self.proposal = proposal self.acceptors = set([]) self.peers = peers self.quorum = len(peers) / 2 + 1 def start(self): self.node.send(set(self.peers) - self.acceptors, Accept( slot=self.slot, ballot_num=self.ballot_num, proposal=self.proposal)) self.set_timer(ACCEPT_RETRANSMIT, self.start) def finished(self, ballot_num, preempted): if preempted: self.node.send([self.node.address], Preempted(slot=self.slot, preempted_by=ballot_num)) else: self.node.send([self.node.address], Decided(slot=self.slot)) self.stop() def do_Accepted(self, sender, slot, ballot_num): if slot != self.slot: return if ballot_num == self.ballot_num: self.acceptors.add(sender) if len(self.acceptors) < self.quorum: return self.node.send(self.peers, Decision( slot=self.slot, proposal=self.proposal)) self.finished(ballot_num, False) else: self.finished(ballot_num, True)
有一個問題是后續(xù)會介紹的網(wǎng)絡模擬器甚至在節(jié)點內(nèi)的消息上也引入了數(shù)據(jù)包丟失。當所有 Decision消息丟失時,該協(xié)議無法繼續(xù)進行。Replica繼續(xù)重新傳輸Propose消息,但是Leader忽略了這些消息,因為它已經(jīng)對該slot提出了proposal,由于沒有Replica收到Decision所以Replica的catch過程找不到結(jié)果,解決方案是像實際網(wǎng)絡堆棧以西洋確保本地消息始終傳遞成功。
node加入cluster時必須獲取當前的cluster狀態(tài), Bootstrap role循環(huán)每個節(jié)點發(fā)送join消息,知道收到Welcome, Bootstrap的時序圖如下所示:
如果在每個role(replica,leader,acceptor)中實現(xiàn)啟動過程,并等待welcome消息,會把初始化邏輯分散到每個role,測試起來會非常麻煩,最終,我們決定添加bootstrap role,一旦啟動完成,就給node添加每個role,并且將初始狀態(tài)傳遞給他們的構(gòu)造函數(shù)。
class Bootstrap(Role): def __init__(self, node, peers, execute_fn, replica_cls=Replica, acceptor_cls=Acceptor, leader_cls=Leader, commander_cls=Commander, scout_cls=Scout): super(Bootstrap, self).__init__(node) self.execute_fn = execute_fn self.peers = peers self.peers_cycle = itertools.cycle(peers) self.replica_cls = replica_cls self.acceptor_cls = acceptor_cls self.leader_cls = leader_cls self.commander_cls = commander_cls self.scout_cls = scout_cls def start(self): self.join() def join(self): self.node.send([next(self.peers_cycle)], Join()) self.set_timer(JOIN_RETRANSMIT, self.join) def do_Welcome(self, sender, state, slot, decisions): self.acceptor_cls(self.node) self.replica_cls(self.node, execute_fn=self.execute_fn, peers=self.peers, state=state, slot=slot, decisions=decisions) self.leader_cls(self.node, peers=self.peers, commander_cls=self.commander_cls, scout_cls=self.scout_cls).start() self.stop()
看完上述內(nèi)容,你們對python中怎么利用Paxos實現(xiàn)分布式系統(tǒng)有進一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。