您好,登錄后才能下訂單哦!
這篇文章運用簡單易懂的例子給大家介紹c語言中Raft的實現(xiàn),代碼非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
1. 簡介
本文介紹一個簡單的Raft實現(xiàn)。如果有看過Raft論文,那么看這個Raft實現(xiàn)會覺得比較輕松,因為Raft論文中把實現(xiàn)的細(xì)節(jié)描述的非常詳細(xì),工程實現(xiàn)基本上就是將Raft論文中的描述用編程語言重新表達一遍。這就是Raft相對于Paxos最大的優(yōu)點,即容易看懂并且容易實現(xiàn)。本文中介紹的Raft實現(xiàn)是用C語言碼成的,除了日志壓縮功能沒有實現(xiàn),其它特性都有實現(xiàn),成員變更機制也做的比較簡單,一次只支持一條配置更改。關(guān)于Raft的原理可以看Raft論文和《Raft理解》。
2.Raft基本概念
2.1 狀態(tài)
raft有三種狀態(tài):Leader,Candidate和Follower。這三種狀態(tài)的轉(zhuǎn)換如下圖所示。只有Leader具有處理客戶請求和向Follower復(fù)制日志的權(quán)利。Candidate是一種Follower向Leader轉(zhuǎn)換的中間狀態(tài),當(dāng)集群中沒有Leader的時候,F(xiàn)ollower進入Candidate狀態(tài),并向集群中發(fā)起投票,獲取到大多數(shù)投票的Follower會變成Leader。
2.2 消息
Raft為了提高協(xié)議的可理解性,消息類型的設(shè)定及其精簡,只有下面兩種請求。[indent]
requestVote 發(fā)起投票請求。Candidate發(fā)起投票時的請求。由集群中其它Follower和Candidate接收處理。
appendEntries 添加日志請求。Leader向Follower添加日志時發(fā)出的請求。[/indent]
2.3 任期號
Raft協(xié)議中使用任期號term來表明時間的新舊關(guān)系,這個term值在每個Leader的任期內(nèi)是不變的,在不同Leader的中是絕對不同且隨時間單調(diào)遞增的。如果一條請求A的term比另一個請求B要大,那么說明請求B是過時的。
3.Raft實現(xiàn)
3.1 協(xié)議
先介紹四個重要數(shù)據(jù)結(jié)構(gòu),對應(yīng)上面提到過的requestVote和appendEntries請求和回復(fù)。
/** requestVote 請求投票 * 競選者Candidate去競選Leader時發(fā)送給其它node的投票請求。 * 其它Leader或者Candidate收到term比自己大的投票請求時,會自動變成Follower*/ typedef struct { /** 當(dāng)前任期號,通過任期號的大小與其它Candidate競爭Leader */ int term; /** 競選者的id */ int candidate_id; /** 競選者本地保存的最新一條日志的index */ int last_log_idx; /** 競選者本地保存的最新一條日志的任期號*/ int last_log_term; } msg_requestvote_t; /** 投票請求的回復(fù)response. * 該response主要是給返回某個node是否接收了Candidate的投票請求. */ typedef struct { /** node的任期號,Candidate根據(jù)投票結(jié)果和node的任期號來更新自己的任期號 */ int term; /** 投票結(jié)果,如果node給Candidate投票則為true */ int vote_granted; } msg_requestvote_response_t; /** 添加日志請求. * Follower可以從該消息中知道哪些日志可以安全地提交到狀態(tài)機FSM中去。 * Leader可以將該消息作為心跳消息定期發(fā)送。 * 舊的Leader和Candidate收到該消息后可能會自動變成Follower */ typedef struct { /** Leader當(dāng)前的任期號 */ int term; /** 最新日志的前一條日志的index,用于Follower確認(rèn)與Leader的日志完全一致 */ int prev_log_idx; /** 最新日志的前一條日志的任期號term */ int prev_log_term; /** leader當(dāng)前已經(jīng)確認(rèn)提交到狀態(tài)機FSM的日志索引index,這意味著Follower也可以安全地將該索引index以前的日志提交 */ int leader_commit; /** 這條添加日志消息攜帶的日志條數(shù),該實現(xiàn)中最多只有一條 */ int n_entries; /** 這條添加日志消息中攜帶的日志數(shù)組 */ msg_entry_t* entries; } msg_appendentries_t; /** 添加日志回復(fù). * 舊的Leader或Candidate收到該消息會變成Follower */ typedef struct { /** 當(dāng)前任期號 */ int term; /** node成功添加日志時返回ture,即prev_log_index和prev_log_term都比對成功。否則返回false */ int success; /* 下面兩個字段不是Raft論文中規(guī)定的字段: /* 用來優(yōu)化日志追加過程,以加速日志的追加。Raft原文中的追加過程是一次只能追加一條日志*/ /** 處理添加日志請求后本地的最大日志索引 */ int current_idx; /** 從添加日志請求中接受的第一條日志索引 */ int first_idx; } msg_appendentries_response_t;
3.2 兩個重要的抽象
raft_server_private_t 該結(jié)構(gòu)體是Raft在實現(xiàn)中的抽象體,保存了Raft協(xié)議運行過程中狀態(tài)和需要的所有數(shù)據(jù)。
typedef struct { /* 所有服務(wù)器比較固定的狀態(tài): */ /* 服務(wù)器最后一次知道的任期號(初始化為 0,持續(xù)遞增) */ int current_term; /* 記錄在當(dāng)前分期內(nèi)給哪個Candidate投過票, */ int voted_for; /* 日志條目集;每一個條目包含一個用戶狀態(tài)機執(zhí)行的指令,和收到時的任期號 */ void* log; /* 變動比較頻繁的變量: */ /* 已知的最大的已經(jīng)被提交的日志條目的索引值 */ int commit_idx; /* 最后被應(yīng)用到狀態(tài)機的日志條目索引值(初始化為 0,持續(xù)遞增) */ int last_applied_idx; /* 三種狀態(tài):follower/leader/candidate */ int state; /* 計時器,周期函數(shù)每次執(zhí)行時會遞增改值 */ int timeout_elapsed; raft_node_t* nodes; int num_nodes; int election_timeout; int request_timeout; /* 保存Leader的信息,沒有Leader時為NULL */ raft_node_t* current_leader; /* callbacks,由調(diào)用該raft實現(xiàn)的調(diào)用者來實現(xiàn),網(wǎng)絡(luò)IO和持久存儲 * 都由調(diào)用者在callback中實現(xiàn) */ raft_cbs_t cb; void* udata; /* 自己的信息 */ raft_node_t* node; /* 該raft實現(xiàn)每次只進行一個服務(wù)器的配置更改,該變量記錄raft server * 是否正在進行配置更改*/ int voting_cfg_change_log_idx; } raft_server_private_t;
raft_node_private_t 集群中機器節(jié)點的抽象體,包含了raft協(xié)議運行過程中需要保存的其它機器上的信息
typedef struct { void* udata; /*一般保存與其它機器的連接信息,由使用者決定怎么實現(xiàn)連接*/ int next_idx; /*對于每一個服務(wù)器,需要發(fā)送給他的下一個日志條目的索引值(初始化為領(lǐng)導(dǎo)人最后索引值加一)*/ int match_idx; /*對于每一個服務(wù)器,已經(jīng)復(fù)制給他的日志的最高索引值*/ int flags; /*有三種取值,是相或的關(guān)系 1:該機器有給我投票 2:該機器有投票權(quán) 3: 該機器有最新的日志*/ int id; /*機器對應(yīng)的id值,這個每臺機器在全局都是唯一的*/ } raft_node_private_t;
3.3 Raft協(xié)議過程
周期函數(shù) Raft需要周期性地做一些事情,比如Leader需要周期性地給其它服務(wù)器append日志,以讓日志落后的服務(wù)器有機會追上來;所有服務(wù)器需要周期性地將已經(jīng)確認(rèn)提交的日志應(yīng)用到狀態(tài)機中去等等。
raft_periodic函數(shù)是該raft實現(xiàn)中被周期性調(diào)用的函數(shù),調(diào)用周期是1000ms。機器在不同狀態(tài)下會在這個函數(shù)中做不同的事情。Leader周期性地向Follower同步日志。而Follower周期性地檢測是否在特定的時間內(nèi)沒有收到過來自Leader的心跳包,如果是的話就變成Candidate開始發(fā)起投票競選Leader。不管是Leader還是Follower,都會周期性地將已經(jīng)提交的日志commit到狀態(tài)機FSM中去。
/** raft周期性執(zhí)行的函數(shù),實現(xiàn)raft中的定時器以及定期應(yīng)用日志到狀態(tài)機 */ int raft_periodic(raft_server_t* me_, int msec_since_last_period) { raft_server_private_t* me = (raft_server_private_t*)me_; /* 選舉計時器;Follower每次收到Leader的心跳后會重置清0,Leader每次發(fā)送日志也會清0 */ me->timeout_elapsed += msec_since_last_period; /* Leader周期性地向Follower同步日志 */ if (me->state == RAFT_STATE_LEADER) { if (me->request_timeout <= me->timeout_elapsed) raft_send_appendentries_all(me_); } /* Follower檢測選舉計時器是否超時 */ else if (me->election_timeout <= me->timeout_elapsed) { if (1 < me->num_nodes) raft_election_start(me_); } /* 周期性地將已經(jīng)確認(rèn)commit的日志應(yīng)用到狀態(tài)機FSM */ if (me->last_applied_idx < me->commit_idx) if (-1 == raft_apply_entry(me_)) return -1; return 0; }
成為競選者Candidate 集群中每個服務(wù)器都有一個競選計時器,當(dāng)一個服務(wù)器在計時器超時時間內(nèi)都沒有收到來自Leader的心跳,則認(rèn)為集群中不存在Leader或者是Leader掛了,該服務(wù)器就會變成Candidate,進而發(fā)起投票去競選Leader,下面raft_become_candidate函數(shù)就是服務(wù)器變成Candidate的函數(shù),函數(shù)中主要做這幾件事情:
自增當(dāng)前的任期號(currentTerm)
給自己投票
重置選舉超時計時器
發(fā)送請求投票的 RPC 給其他所有服務(wù)器
/** Follower成為Candidate執(zhí)行的函數(shù) */ void raft_become_candidate(raft_server_t* me_) { raft_server_private_t* me = (raft_server_private_t*)me_; int i; /*自增當(dāng)前的任期號;給自己投票,設(shè)置自己的狀態(tài)為CANDIDATE*/ raft_set_current_term(me_, raft_get_current_term(me_) + 1); for (i = 0; i < me->num_nodes; i++) raft_node_vote_for_me(me->nodes[i], 0); raft_vote(me_, me->node); me->current_leader = NULL; raft_set_state(me_, RAFT_STATE_CANDIDATE); /* 重置選舉超時計時器。為了防止多個Candidate競爭,將下一次發(fā)起投票的時間間隔設(shè)置成隨機值*/ /* TODO: this should probably be lower */ me->timeout_elapsed = rand() % me->election_timeout; /*發(fā)送請求投票的 RPC 給其他所有服務(wù)器*/ for (i = 0; i < me->num_nodes; i++) if (me->node != me->nodes[i] && raft_node_is_voting(me->nodes[i])) raft_send_requestvote(me_, me->nodes[i]); }
處理投票請求 處理投票請求的邏輯主要就是判斷是否要同意投票,判斷的依據(jù)就是請求中的任期號和日志信息的新舊程度,還有就是自己是否給其它相同任期號的服務(wù)器投過票,如果投過就不能再投,每人只有一票投票權(quán)。
如果term > currentTerm, 則轉(zhuǎn)為Follower模式。
這里收到投票請求的服務(wù)器有可能是一個網(wǎng)絡(luò)狀況不佳的Leader或者是一個還沒來得及發(fā)出投票請求的Candidate,他們收到任期號比自己要新的請求后,都要無條件變成Follower,以保證只有一個Leader存在
如果term < currentTerm返回false。請求中的term比自己的term還要小,說明是一個過時的請求,則不給它投票返回false。
如果 term == currentTerm,請求中的日志信息不比本地日志舊,并且尚未給其它Candidate投過票,那么就投票給他
/** 處理投票請求 */ int raft_recv_requestvote(raft_server_t* me_, raft_node_t* node, msg_requestvote_t* vr, msg_requestvote_response_t *r) { raft_server_private_t* me = (raft_server_private_t*)me_; /*如果請求中term > 本地currentTerm, 則轉(zhuǎn)為Follower模式*/ if (raft_get_current_term(me_) < vr->term) { raft_set_current_term(me_, vr->term); raft_become_follower(me_); } /*如果需要投票,則回復(fù)true,即將r->vote_granted = 1;*/ if (__should_grant_vote(me, vr)) { assert(!(raft_is_leader(me_) || raft_is_candidate(me_))); /*同意投票--本地記錄給哪個服務(wù)器投了票,并設(shè)置response中的vote_granted為1*/ raft_vote_for_nodeid(me_, vr->candidate_id); r->vote_granted = 1; /* there must be in an election. */ me->current_leader = NULL; me->timeout_elapsed = 0; } else r->vote_granted = 0; __log(me_, node, "node requested vote: %d replying: %s", node, r->vote_granted == 1 ? "granted" : "not granted"); /*更新本地保存的任期號,與請求中的保持一致*/ r->term = raft_get_current_term(me_); return 0; } /** 檢查是否滿足投票的條件 */ static int __should_grant_vote(raft_server_private_t* me, msg_requestvote_t* vr) { /**請求中的任期號term比本地term要小,不給投票*/ if (vr->term < raft_get_current_term((void*)me)) return 0; /*如果已經(jīng)投過票了,返回false*/ /* TODO: if voted for is candiate return 1 (if below checks pass) */ if (raft_already_voted((void*)me)) return 0; /* 下面代碼檢查請求中日志信息是否比本地日志新*/ /* 獲取本地最新的日志索引 */ int current_idx = raft_get_current_idx((void*)me); /* 本地日志為空,請求中的日志信息絕對比本地要新,返回true */ if (0 == current_idx) return 1; /* 如果本地最新日志中的任期號比請求中的last_log_term要小,則返回true */ raft_entry_t* e = raft_get_entry_from_idx((void*)me, current_idx); if (e->term < vr->last_log_term) return 1; /* 本地最新日志中的任期號與請求中的last_log_term相等,則比較日志索引,索引比較大的說明日志比較新*/ if (vr->last_log_term == e->term && current_idx <= vr->last_log_idx) return 1; /*果本地最新日志中的任期號比請求中的last_log_term要大,則返回false */ return 0; }
/** 處理投票恢復(fù) */ int raft_recv_requestvote_response(raft_server_t* me_, raft_node_t* node, msg_requestvote_response_t* r) { raft_server_private_t* me = (raft_server_private_t*)me_; __log(me_, node, "node responded to requestvote status: %s", r->vote_granted == 1 ? "granted" : "not granted"); /* Oh~我不是Candidate,直接返回 */ if (!raft_is_candidate(me_)) { return 0; } /* response中的任期號比自己的大,說明自己的term已經(jīng)過時,無條件轉(zhuǎn)為Follower */ else if (raft_get_current_term(me_) < r->term) { raft_set_current_term(me_, r->term); raft_become_follower(me_); return 0; } /* response中的任期號比自己小,說明收到了一個過時的response,忽略即可。 * 當(dāng)網(wǎng)絡(luò)比較差的時候容易出現(xiàn)這種情況 */ else if (raft_get_current_term(me_) != r->term) { return 0; } __log(me_, node, "node responded to requestvote: %d status: %s ct:%d rt:%d", node, r->vote_granted == 1 ? "granted" : "not granted", me->current_term, r->term); /* Yeah~給我投票了 */ if (1 == r->vote_granted) { /* 記錄給自己投票的服務(wù)器信息 */ if (node) raft_node_vote_for_me(node, 1); int votes = raft_get_nvotes_for_me(me_); /* 如果給自己投票的服務(wù)器超過了總數(shù)的一般,則成為Leader */ if (raft_votes_is_majority(me->num_nodes, votes)) raft_become_leader(me_); } return 0; }
添加日志請求 Leader除了在收到客戶端請求后會發(fā)起添加日志請求,還會在周期函數(shù)raft_periodic中發(fā)起添加日志請求。Leader維護了所有Follower的日志情況,如果Follower的日志比較舊,就會周期性地給它發(fā)送添加日志請求。關(guān)于日志怎么同步和保持一致性的原理,可以閱讀raft論文5.3節(jié)--日志復(fù)制。簡單地說就是,Leader在給Follower發(fā)送一條日志N時,會順帶將前一條日志M的信息也帶過去。Follower會檢查請求中前一條日志M的信息與本地相同索引的日志是否吻合,如果吻合說明本地在M以前的所有日志都是和Leader一致的(raft論文中使用遞歸法證明,因為所有日志都是按照同樣的規(guī)則添加的)。
/** 給某個Follower發(fā)送添加日志請求 */ int raft_send_appendentries(raft_server_t* me_, raft_node_t* node) { raft_server_private_t* me = (raft_server_private_t*)me_; assert(node); assert(node != me->node); /* callback函數(shù),實現(xiàn)網(wǎng)絡(luò)發(fā)送功能,由使用該raft實現(xiàn)的調(diào)用者實現(xiàn)網(wǎng)絡(luò)IO功能*/ if (!(me->cb.send_appendentries)) return -1; /* 初始化請求的參數(shù)-- 當(dāng)前任期號、最新日志索引 */ msg_appendentries_t ae; ae.term = me->current_term; ae.leader_commit = raft_get_commit_idx(me_); ae.prev_log_idx = 0; ae.prev_log_term = 0; ae.n_entries = 0; ae.entries = NULL; /* 根據(jù)記錄的Follower的日志信息,獲取要發(fā)給Follower的下一條日志索引 */ int next_idx = raft_node_get_next_idx(node); msg_entry_t mety; /* 添加下一條日志的內(nèi)容*/ raft_entry_t* ety = raft_get_entry_from_idx(me_, next_idx); if (ety) { mety.term = ety->term; mety.id = ety->id; mety.type = ety->type; mety.data.len = ety->data.len; mety.data.buf = ety->data.buf; ae.entries = &mety; // TODO: we want to send more than 1 at a time ae.n_entries = 1; } /* 添加要添加日志的前一條日志信息,用來做日志一致性檢查,關(guān)于怎么保證 * Leader和Follower日志的一致性,可參看raft論文第5.3節(jié)--日志復(fù)制*/ if (1 < next_idx) { raft_entry_t* prev_ety = raft_get_entry_from_idx(me_, next_idx - 1); ae.prev_log_idx = next_idx - 1; if (prev_ety) ae.prev_log_term = prev_ety->term; } __log(me_, node, "sending appendentries node: ci:%d t:%d lc:%d pli:%d plt:%d", raft_get_current_idx(me_), ae.term, ae.leader_commit, ae.prev_log_idx, ae.prev_log_term); /* 調(diào)用callback發(fā)送請求,callback由該raft實現(xiàn)的調(diào)用者來實現(xiàn)*/ me->cb.send_appendentries(me_, me->udata, node, &ae); return 0; }
處理添加日志請求 所有的服務(wù)器都有可能收到添加日志請求,比如過時的Leader和Candidate以及正常運行的Follower。處理添加日志請求的過程主要就是驗證請求中的日志是否比本地日志新的過程。
/* 1. 處理任期號的三種情況(大于等于和小于) 2. 處理prev log不一致的情況,返回包中告訴Leader自己目前的log情況 3. 處理添加日志成功的情況-- 保存新日志并更新current_idx和commit_idx */ int raft_recv_appendentries( raft_server_t* me_, raft_node_t* node, msg_appendentries_t* ae, msg_appendentries_response_t *r ) { raft_server_private_t* me = (raft_server_private_t*)me_; me->timeout_elapsed = 0; if (0 < ae->n_entries) __log(me_, node, "recvd appendentries from: %lx, t:%d ci:%d lc:%d pli:%d plt:%d #%d", node, ae->term, raft_get_current_idx(me_), ae->leader_commit, ae->prev_log_idx, ae->prev_log_term, ae->n_entries); r->term = me->current_term; /* 處理任期號 */ /* currentTerm == ae->term,當(dāng)自己是Candidate時收到term與自己相等的請求, * 說明已經(jīng)有其它Candidate成為了Leader,自己無條件變成Follower*/ if (raft_is_candidate(me_) && me->current_term == ae->term) { me->voted_for = -1; raft_become_follower(me_); } /* currentTerm < ae->term. 自己的任期號已經(jīng)落后Leader,無條件成為Follower,并且更新自己的term*/ else if (me->current_term < ae->term) { raft_set_current_term(me_, ae->term); r->term = ae->term; raft_become_follower(me_); } /* currentTerm > ae->term. 說明收到一個過時Leader的請求,直接回包告訴它最新的term */ else if (ae->term < me->current_term) { /* 1. Reply false if term < currentTerm (?§5.1) */ __log(me_, node, "AE term %d is less than current term %d", ae->term, me->current_term); goto fail_with_current_idx; } /* NOTE: the log starts at 1 */ /* 檢查請求中prev_log_idx日志的term與本地對應(yīng)索引的term是否一致 */ if (0 < ae->prev_log_idx) { raft_entry_t* e = raft_get_entry_from_idx(me_, ae->prev_log_idx); /* 本地在prev_log_idx位置還不存在日志,說明日志已經(jīng)落后Leader了,返回false * 并告訴leader自己當(dāng)前日志的位置,這樣Leader知道下一次該發(fā)哪條日志過來了*/ if (!e) { __log(me_, node, "AE no log at prev_idx %d", ae->prev_log_idx); goto fail_with_current_idx; } if (raft_get_current_idx(me_) < ae->prev_log_idx) goto fail_with_current_idx; /* 本地在prev_log_idx位置的日志的term與請求中的prev_log_term不一致, * 此時本地?zé)o條件刪除本地與請求不一致的日志,并向Leader返回刪除后的日志位置*/ if (e->term != ae->prev_log_term) { __log(me_, node, "AE term doesn't match prev_term (ie. %d vs %d) ci:%d pli:%d", e->term, ae->prev_log_term, raft_get_current_idx(me_), ae->prev_log_idx); assert(me->commit_idx < ae->prev_log_idx); /* Delete all the following log entries because they don't match */ log_delete(me->log, ae->prev_log_idx); r->current_idx = ae->prev_log_idx - 1; goto fail; } } /* 本地的日志比Leader要多。當(dāng)本地服務(wù)器曾經(jīng)是Leader,收到了很多客戶端請求 * 并還沒來得及同步時會出現(xiàn)這種情況。這時本地?zé)o條件刪除比Leader多的日志 */ if (ae->n_entries == 0 && 0 < ae->prev_log_idx && ae->prev_log_idx + 1 < raft_get_current_idx(me_)) { assert(me->commit_idx < ae->prev_log_idx + 1); log_delete(me->log, ae->prev_log_idx + 1); } r->current_idx = ae->prev_log_idx; /* 下面for循環(huán)跳過請求中已經(jīng)在本地添加過的日志*/ int i; for (i = 0; i < ae->n_entries; i++) { msg_entry_t* ety = &ae->entries[i]; int ety_index = ae->prev_log_idx + 1 + i; raft_entry_t* existing_ety = raft_get_entry_from_idx(me_, ety_index); r->current_idx = ety_index; if (existing_ety && existing_ety->term != ety->term) { assert(me->commit_idx < ety_index); log_delete(me->log, ety_index); break; } else if (!existing_ety) break; } /* 下面for循環(huán)將請求中確認(rèn)的新日志添加到本地 */ for (; i < ae->n_entries; i++) { int e = raft_append_entry(me_, &ae->entries[i]); if (-1 == e) goto fail_with_current_idx; r->current_idx = ae->prev_log_idx + 1 + i; } /* 4. 請求中攜帶了Leader已經(jīng)提交到狀態(tài)機的日志索引,本地同樣也更新這個索引,將其 * 設(shè)置為本地最大日志索引和leader_commit中的較小者*/ if (raft_get_commit_idx(me_) < ae->leader_commit) { int last_log_idx = max(raft_get_current_idx(me_), 1); raft_set_commit_idx(me_, min(last_log_idx, ae->leader_commit)); } /* 更新Leader信息 */ me->current_leader = node; r->success = 1; r->first_idx = ae->prev_log_idx + 1; return 0; fail_with_current_idx: r->current_idx = raft_get_current_idx(me_); fail: r->success = 0; r->first_idx = 0; return -1; }
處理添加日志請求回復(fù) Leader收到添加日志回復(fù)后,可以知道下面這些信息:
自己是不是已經(jīng)過時(current_term < response->term即為過時)
follower是否成功添加日志,如果添加失敗,則減小發(fā)給follower的日志索引nextIndex再重試;如果添加成功則更新本地記錄的follower日志信息,并檢查日志是否最新,如果不是最新則繼續(xù)發(fā)送添加日志請求。
新機器的日志添加,詳見3.4節(jié)-- 成員變更
/** 處理添加日志請求回復(fù) * / int raft_recv_appendentries_response(raft_server_t* me_, raft_node_t* node, msg_appendentries_response_t* r) { raft_server_private_t* me = (raft_server_private_t*)me_; __log(me_, node, "received appendentries response %s ci:%d rci:%d 1stidx:%d", r->success == 1 ? "SUCCESS" : "fail", raft_get_current_idx(me_), r->current_idx, r->first_idx); /* 過時的回復(fù) -- 忽略 */ if (r->current_idx != 0 && r->current_idx <= raft_node_get_match_idx(node)) return 0; /* oh~我不是Leader */ if (!raft_is_leader(me_)) return -1; /* 回復(fù)中的term比自己的要大,說明自己是一個過時的Leader,無條件轉(zhuǎn)為Follower */ if (me->current_term < r->term) { raft_set_current_term(me_, r->term); raft_become_follower(me_); return 0; } /* 過時的回復(fù),網(wǎng)絡(luò)狀況不好時會出現(xiàn) */ else if (me->current_term != r->term) return 0; /* stop processing, this is a node we don't have in our configuration */ if (!node) return 0; /* 由于日志不一致導(dǎo)致添加日志不成功*/ if (0 == r->success) { assert(0 <= raft_node_get_next_idx(node)); /* 將nextIdex減*/ int next_idx = raft_node_get_next_idx(node); assert(0 <= next_idx); /* Follower的日志數(shù)量還遠(yuǎn)遠(yuǎn)少于Leader,將nextIdex設(shè)為回復(fù)中的current_idx+1和Leader * 當(dāng)前索引中較小的一個,一般回復(fù)中的current_idx+1會比較小*/ if (r->current_idx < next_idx - 1) raft_node_set_next_idx(node, min(r->current_idx + 1, raft_get_current_idx(me_))); /* Follower的日志數(shù)量和Leader差不多,但是比對前一條日志時失敗,這種情況將next_idx減1 * 重試*/ else raft_node_set_next_idx(node, next_idx - 1); /* 使用更新后的nextIdx重新發(fā)送添加日志請求 */ raft_send_appendentries(me_, node); return 0; } assert(r->current_idx <= raft_get_current_idx(me_)); /* 下面處理添加日志請求的情況 */ /* 更新本地記錄的Follower的日志情況 */ raft_node_set_next_idx(node, r->current_idx + 1); raft_node_set_match_idx(node, r->current_idx); /* 如果是新加入的機器,則判斷它的日志是否是最新,如果達到了最新,則賦予它投票權(quán), * 這里邏輯的詳細(xì)解釋在第3.4節(jié) -- 成員變更*/ if (!raft_node_is_voting(node) && -1 == me->voting_cfg_change_log_idx && raft_get_current_idx(me_) <= r->current_idx + 1 && me->cb.node_has_sufficient_logs && 0 == raft_node_has_sufficient_logs(node) ) { raft_node_set_has_sufficient_logs(node); me->cb.node_has_sufficient_logs(me_, me->udata, node); } /* 如果一條日志回復(fù)成功的數(shù)量超過一半,則將日志提交commit,即允許應(yīng)用到狀態(tài)機 */ int votes = 1; /* include me */ int point = r->current_idx; int i; for (i = 0; i < me->num_nodes; i++) { if (me->node == me->nodes[i] || !raft_node_is_voting(me->nodes[i])) continue; int match_idx = raft_node_get_match_idx(me->nodes[i]); if (0 < match_idx) { raft_entry_t* ety = raft_get_entry_from_idx(me_, match_idx); /*如果follower已經(jīng)添加了索引大于等于r->current_idx的日志,則vote加1*/ if (ety->term == me->current_term && point <= match_idx) votes++; } } /* 投票數(shù)大于所有服務(wù)器的一半,則將日志提交 */ if (me->num_nodes / 2 < votes && raft_get_commit_idx(me_) < point) raft_set_commit_idx(me_, point); /* 如果follower的日志還沒有最新,那么繼續(xù)發(fā)送添加日志請求 */ if (raft_get_entry_from_idx(me_, raft_node_get_next_idx(node))) raft_send_appendentries(me_, node); /* periodic applies committed entries lazily */ return 0; }
3.3 成員變更
成員的變更都是以日志的形式下發(fā)的。添加的新成員分兩階段進行,第一階段中新成員沒有有投票權(quán),但是有接收日志的權(quán)力;當(dāng)它的日志同步到最新后就進入到第二階段,由Leader賦予投票權(quán),從而成為集群中完整的一員。刪除成員相對比較簡單,所有服務(wù)器收到刪除成員的日志后,立馬將該成員的信息從本地抹除。
添加成員過程
管理員向Leader發(fā)送添加成員命令
Leader添加一條 RAFT_LOGTYPE_ADD_NONVOTING_NODE日志,即添加沒有投票權(quán)的服務(wù)器。該日志與其它普通日志一樣同步給集群中其它服務(wù)器。收到該日志的服務(wù)器在本地保存該新成員的信息。
當(dāng)新成員的日志同步到最新后,Leader添加一條 RAFT_LOGTYPE_ADD_NODE日志,即有投票權(quán)的服務(wù)器,同樣地,該日志與其它普通日志一樣同步給集群中其它服務(wù)器。收到該日志的服務(wù)器在本地保存該新成員的信息,以后的投票活動會將新成員考慮進去。
刪除成員過程
管理員向Leader發(fā)送刪除成員命令。
Leader添加一條 RAFT_LOGTYPE_REMOVE_NODE 日志,并跟普通日志一樣同步給其它服務(wù)器。收到該日志的服務(wù)器立即將被成員信息從本地刪除。
關(guān)于c語言中Raft的實現(xiàn)就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。