溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

raft理論與實踐[6]-lab3a-基于raft構(gòu)建分布式容錯kv服務

發(fā)布時間:2020-04-02 16:49:41 來源:網(wǎng)絡 閱讀:596 作者:jonson_jackson 欄目:編程語言

準備工作

  • 閱讀raft論文

  • 閱讀raft理論與實踐[1]-理論篇

  • 閱讀raft理論與實踐[2]-lab2a

  • 閱讀raft理論與實踐[3]-lab2a講解

  • 閱讀raft理論與實踐[4]-lab2b日志復制

  • 閱讀raft理論與實踐[5]-lab2c日志復制

  • 閱讀模擬RPC遠程過程調(diào)用

前言

  • 在之前的文章中,我們實現(xiàn)了raft算法的基本框架

  • 在本實驗中,我們將基于raft算法實現(xiàn)分布式容錯的kv服務器

  • 客戶端用于交互raft服務器

  • kvraft/client.go文件用于書寫我們的客戶端代碼,調(diào)用Clerk的Get/Put/Append方法為系統(tǒng)提供強一致性的保證

  • 這里的強一致性指的是,如果我們一個一個的調(diào)用(而不是并發(fā))Clerk的Get/Put/Append方法,那么我們的系統(tǒng)就好像是只有一個raft服務器存在一樣,并且調(diào)用是序列的,即后面的調(diào)用比前面的調(diào)用后執(zhí)行

  • 對于并發(fā)調(diào)用,最終狀態(tài)可能難以預料,但是必須與這些方法按某種順序序列化后執(zhí)行一次的結(jié)果相同

  • 如果調(diào)用在時間上重疊,則這些調(diào)用是并發(fā)的。例如,如果客戶端X調(diào)用Clerk.Put(),同時客戶端Y調(diào)用Clerk.Append()

  • 同時,后面的方法在執(zhí)行之前,必須保證已經(jīng)觀察到前面所有方法執(zhí)行后的狀態(tài)(技術(shù)上叫做線性化(linearizability))

  • 強一致性保證對應用程序很方便,因為這意味著所有客戶端都看到相同的最新狀態(tài)

  • 對于單個服務器,強一致性相對簡單。多臺的副本服務器卻相對困難,因為所有服務器必須為并發(fā)請求選擇相同的執(zhí)行順序,并且必須避免使用最新狀態(tài)來回復客戶端

本服務實現(xiàn)的功能

  • 本服務支持3種基本的操作,Put(key, value),?Append(key, arg), and?Get(key)

  • 維護著一個簡單的鍵/值對數(shù)據(jù)庫

  • Put(key, value)將數(shù)據(jù)庫中特定key的值綁定為value

  • Append(key, arg)添加,將arg與key對應。如果key的值不存在,則其行為類似于Put

  • Get(key)?獲取當前key的值

  • 在本實驗中,我們將實現(xiàn)服務具體的功能,而不必擔心Raft log日志會無限增長

實驗思路

  • 對lab2中的raft服務器架構(gòu)進行封裝,封裝上一些數(shù)據(jù)庫、數(shù)據(jù)庫快照、并會處理log的具體執(zhí)行邏輯。

  • 對于數(shù)據(jù)庫執(zhí)行的Get/Put/Append方法都對其進行序列化并放入到lab2 raft的體系中,這樣就能保證這些方法的一致性

獲取源代碼

  • 假設讀者已經(jīng)閱讀了準備工作中的一系列文章

  • 在此基礎上我們增加了本實驗的基本框架kvraft文件以及l(fā)inearizability文件

  • 讀者需要在kvraft文件夾中,實驗本實驗的具體功能

  • 獲取實驗代碼如下

git?clone?git@github.com:dreamerjackson/golang-deep-distributed-lab.git
git?reset?--hard???d345b34bc

客戶端

  • Clerk結(jié)構(gòu)體存儲了所有raft服務器的客戶端servers []*labrpc.ClientEnd,因此我們可以通過Clerk結(jié)構(gòu)體與所有raft服務器通信

  • 我們需要為Clerk結(jié)構(gòu)體實現(xiàn)Put(key, value),?Append(key, arg),?Get(key)方法

  • Clerk結(jié)構(gòu)體是我們連接raft服務器的橋梁

  • 注意Clerk必須將方法發(fā)送到當前的leader節(jié)點中,由于其可能并不會知道哪一個節(jié)點為leader,因此需要重試。但是記住保存上一個leader的id會加快這一過程,因為leader在穩(wěn)定的系統(tǒng)里面是不會變的。

  • 客戶端必須要等到此操作不僅為commit,而且已經(jīng)被完全應用后,才能夠返回,這才能夠保證下次get操作能夠得到最新的

  • 需要注意的是,如果raft服務器出現(xiàn)了分區(qū),可能會陷入一直等待,直到分區(qū)消失

補充Clerk

  • leader記錄最后一個leader的序號

  • seq 記錄rpc的序號

  • id記錄客戶端的唯一id

type?Clerk?struct?{
????...
????leader?int???//?remember?last?leader
????seq????int???//?RPC?sequence?number
????id?????int64?//?client?id
}

補充Get方法

  • Get方法會遍歷訪問每一個raft服務,直到找到leader

  • 調(diào)用時會陷入堵塞,等待rpc方法返回

  • 設置有超時時間,一旦超時,會重新發(fā)送

  • 為了保證Get方法到的數(shù)據(jù)是準確最新的,也必須要將其加入到raft算法中

  • 客戶端必須要等到此操作不僅為commit,而且已經(jīng)被完全應用后,才能夠返回,這才能夠保證下次get操作能夠得到最新的。

func?(ck?*Clerk)?Get(key?string)?string?{
????DPrintf("Clerk:?Get:?%q\n",?key)
????cnt?:=?len(ck.servers)
????for?{
????????args?:=?&GetArgs{Key:?key,?ClientID:?ck.id,?SeqNo:?ck.seq}
????????reply?:=?new(GetReply)

????????ck.leader?%=?cnt
????????done?:=?make(chan?bool,?1)
????????go?func()?{
????????????ok?:=?ck.servers[ck.leader].Call("KVServer.Get",?args,?reply)
????????????done?<-?ok
????????}()
????????select?{
????????case?<-time.After(200?*?time.Millisecond):?//?rpc?timeout:?200ms
????????????ck.leader++
????????????continue
????????case?ok?:=?<-done:
????????????if?ok?&&?!reply.WrongLeader?{
????????????????ck.seq++
????????????????if?reply.Err?==?OK?{
????????????????????return?reply.Value
????????????????}
????????????????return?""
????????????}
????????????ck.leader++
????????}
????}

????return?""
}

補充Append和Put方法

  • 調(diào)用同一個PutAppend方法,但是最后一個參數(shù)用于標識具體的操作

func?(ck?*Clerk)?Put(key?string,?value?string)?{
????ck.PutAppend(key,?value,?"Put")
}
func?(ck?*Clerk)?Append(key?string,?value?string)?{
????ck.PutAppend(key,?value,?"Append")
}
  • 和Get方法相似,遍歷訪問每一個raft服務,直到找到leader

  • 調(diào)用時會陷入堵塞,等待rpc方法返回

  • 設置有超時時間,一旦超時,會重新發(fā)送

  • 客戶端必須要等到此操作不僅為commit,而且已經(jīng)被完全應用后,才能夠返回,這才能夠保證下次get操作能夠得到最新的。

func?(ck?*Clerk)?PutAppend(key?string,?value?string,?op?string)?{
????//?You?will?have?to?modify?this?function.
????DPrintf("Clerk:?PutAppend:?%q?=>?(%q,%q)?from:?%d\n",?op,?key,?value,?ck.id)
????cnt?:=?len(ck.servers)
????for?{
????????args?:=?&PutAppendArgs{Key:?key,?Value:?value,?Op:?op,?ClientID:?ck.id,?SeqNo:?ck.seq}
????????reply?:=?new(PutAppendReply)

????????ck.leader?%=?cnt
????????done?:=?make(chan?bool,?1)
????????go?func()?{
????????????ok?:=?ck.servers[ck.leader].Call("KVServer.PutAppend",?args,?reply)
????????????done?<-?ok
????????}()
????????select?{
????????case?<-time.After(200?*?time.Millisecond):?//?rpc?timeout:?200ms
????????????ck.leader++
????????????continue
????????case?ok?:=?<-done:
????????????if?ok?&&?!reply.WrongLeader?&&?reply.Err?==?OK?{
????????????????ck.seq++
????????????????return
????????????}
????????????ck.leader++
????????}
????}
}

Server

  • kvraft/server.go文件用于書寫我們的客戶端代碼

  • KVServer結(jié)構(gòu)是對于之前書寫的raft架構(gòu)的封裝

  • applyCh chan raft.ApplyMsg?用于狀態(tài)虛擬機應用coommit log,執(zhí)行操作

  • db map[string]string?是模擬的一個數(shù)據(jù)庫

  • notifyChs map[int]chan struct{}?commandID => notify chan 狀態(tài)虛擬機應用此command后,會通知此通道

  • duplicate map[int64]*LatestReply?檢測重復請求

type?KVServer?struct?{
????...
????rf??????*raft.Raft
????applyCh?chan?raft.ApplyMsg
????//?Your?definitions?here.
????persist???????*raft.Persister
????db????????????map[string]string
????notifyChs?????map[int]chan?struct{}?//?per?log
????//?duplication?detection?table
????duplicate?map[int64]*LatestReply
}

完成PutAppend、Get方法

  • 下面以PutAppend為例,Get方法類似

  • 檢測當前是否leader狀態(tài)

  • 檢測是否重復請求

  • 將此command通過rf.Start(cmd)?放入raft中

  • select等待直到ch被激活,即command index被此kv服務器應用

  • ch被激活后,需要再次檢測當前節(jié)點是否為leader

    • 如果不是,說明leader更換,立即返回錯誤,這時由于如果不再是leader,那么雖然此kv服務器應用了此command index,但不一定是相同的command

    • 這個時候會堵塞直到序號為commandIndex的命令被應用,但是,如果leader更換,此commandIndex的命令不一定就是我們的當前的命令

    • 但是完全有可能新的leader已經(jīng)應用了此狀態(tài),我們這時候雖然仍然返回錯誤,希望客戶端重試,這是由于操作是冪等的并且重復操作無影響。

    • 優(yōu)化方案是為command指定一個唯一的標識,這樣就能夠明確此特定操作是否被應用


func?(kv?*KVServer)?PutAppend(args?*PutAppendArgs,?reply?*PutAppendReply)?{
????//?Your?code?here.
????//?not?leader
????if?_,?isLeader?:=?kv.rf.GetState();?!isLeader?{
????????reply.WrongLeader?=?true
????????reply.Err?=?""
????????return
????}

????DPrintf("[%d]:?leader?%d?receive?rpc:?PutAppend(%q?=>?(%q,%q),?(%d-%d).\n",?kv.me,?kv.me,
????????args.Op,?args.Key,?args.Value,?args.ClientID,?args.SeqNo)

????kv.mu.Lock()
????//?duplicate?put/append?request
????if?dup,?ok?:=?kv.duplicate[args.ClientID];?ok?{
????????//?filter?duplicate
????????if?args.SeqNo?<=?dup.Seq?{
????????????kv.mu.Unlock()
????????????reply.WrongLeader?=?false
????????????reply.Err?=?OK
????????????return
????????}
????}

????//?new?request
????cmd?:=?Op{Key:?args.Key,?Value:?args.Value,?Op:?args.Op,?ClientID:?args.ClientID,?SeqNo:?args.SeqNo}
????index,?term,?_?:=?kv.rf.Start(cmd)
????ch?:=?make(chan?struct{})
????kv.notifyChs[index]?=?ch
????kv.mu.Unlock()

????reply.WrongLeader?=?false
????reply.Err?=?OK

????//?wait?for?Raft?to?complete?agreement
????select?{
????case?<-ch:
????????//?lose?leadership
????????curTerm,?isLeader?:=?kv.rf.GetState()
????????if?!isLeader?||?term?!=?curTerm?{
????????????reply.WrongLeader?=?true
????????????reply.Err?=?""
????????????return
????????}
????case?<-kv.shutdownCh:
????????return
????}
}

完成對于log的應用操作

  • &lt;-kv.applyCh?是當log成為commit狀態(tài)時,狀態(tài)機對于log的應用操作

  • 本系列構(gòu)建的為kv-raft服務,根據(jù)不同的服務其應用操作的方式不同

  • 下面的操作是簡單的操作內(nèi)存map數(shù)據(jù)庫

  • 同時,將最后一個操作記錄下來,避免同一個log應用了兩次。

func?(kv?*KVServer)?applyDaemon()?{
????for?{
????????select?{
????????case?msg,?ok?:=?<-kv.applyCh:
????????????if?ok?{
????????????????//?have?client's?request??must?filter?duplicate?command
????????????????if?msg.Command?!=?nil?{
????????????????????cmd?:=?msg.Command.(Op)
????????????????????kv.mu.Lock()
????????????????????if?dup,?ok?:=?kv.duplicate[cmd.ClientID];?!ok?||?dup.Seq?<?cmd.SeqNo?{
????????????????????????switch?cmd.Op?{
????????????????????????case?"Get":
????????????????????????????kv.duplicate[cmd.ClientID]?=?&LatestReply{Seq:?cmd.SeqNo,
????????????????????????????????Reply:?GetReply{Value:?kv.db[cmd.Key],}}
????????????????????????case?"Put":
????????????????????????????kv.db[cmd.Key]?=?cmd.Value
????????????????????????????kv.duplicate[cmd.ClientID]?=?&LatestReply{Seq:?cmd.SeqNo,}
????????????????????????case?"Append":
????????????????????????????kv.db[cmd.Key]?+=?cmd.Value
????????????????????????????kv.duplicate[cmd.ClientID]?=?&LatestReply{Seq:?cmd.SeqNo,}
????????????????????????default:
????????????????????????????DPrintf("[%d]:?server?%d?receive?invalid?cmd:?%v\n",?kv.me,?kv.me,?cmd)
????????????????????????????panic("invalid?command?operation")
????????????????????????}
????????????????????????if?ok?{
????????????????????????????DPrintf("[%d]:?server?%d?apply?index:?%d,?cmd:?%v?(client:?%d,?dup?seq:?%d?<?%d)\n",
????????????????????????????????kv.me,?kv.me,?msg.CommandIndex,?cmd,?cmd.ClientID,?dup.Seq,?cmd.SeqNo)
????????????????????????}
????????????????????}
????????????????????//?notify?channel
????????????????????if?notifyCh,?ok?:=?kv.notifyChs[msg.CommandIndex];?ok?&&?notifyCh?!=?nil?{
????????????????????????close(notifyCh)
????????????????????????delete(kv.notifyChs,?msg.CommandIndex)
????????????????????}
????????????????????kv.mu.Unlock()
????????????????}
????????????}
????????}
????}
}

測試

>?go?test?-v?-run=3A
  • 注意,如果上面的測試出現(xiàn)錯誤也不一定是程序本身的問題,可能是單個進程運行多個測試程序帶來的影響

  • 同時,我們可以運行多次避免偶然的影響

  • 因此,如果出現(xiàn)了這種情況,我們可以為單個測試程序獨立的運行n次,保證正確性,下面是每10個測試程序獨立運行,運行n次的腳本

rm?-rf?res
mkdir?res
set?int?j?=?0
for?((i?=?0;?i?<?2;?i++))
do
????for?((c?=?$((i*10));?c?<?$((?(i+1)*10));?c++))
????do
?????????(go?test?-v?-run?TestPersistPartitionUnreliableLinearizable3A)?&>?./res/$c?&
????done

????sleep?40

????if?grep?-nr?"FAIL.*raft.*"?res;?then
????????echo?"fail"
????fi

done

總結(jié)

  • 在本實驗中,我們封裝了lab2a raft框架實現(xiàn)了容錯的kv服務

  • 如果出現(xiàn)了問題,需要仔細查看log,思考問題出現(xiàn)的原因

  • 下一個實驗中,我們將實現(xiàn)日志的壓縮

參考資料

  • 項目鏈接

  • [lab3實驗介紹]nil.csail.mit.edu/6.824)

  • 閱讀raft論文

  • 閱讀raft理論與實踐[1]-理論篇

  • 閱讀raft理論與實踐[2]-lab2a

  • 閱讀raft理論與實踐[3]-lab2a講解

  • 閱讀raft理論與實踐[4]-lab2b日志復制

  • 閱讀raft理論與實踐[5]-lab2c日志復制

  • 閱讀模擬RPC遠程過程調(diào)用


向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI