您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)Spark Remote Shuffle Service最佳實(shí)踐的示例分析,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
經(jīng)過近半年的上線、運(yùn)營,趣頭條大數(shù)據(jù)團(tuán)隊(duì)和阿里云 EMR 團(tuán)隊(duì)共同開發(fā)的 RSS 可以完美解決 Spark Shuffle 面臨的技術(shù)挑戰(zhàn),為集群的穩(wěn)定性和容器化的落地提供強(qiáng)有力的保證,其業(yè)務(wù)價值主要體現(xiàn)在以下方面:
降本增效效果明顯
SLA顯著提升
作業(yè)執(zhí)行效率顯著提升
架構(gòu)靈活性顯著提升
業(yè)務(wù)場景與現(xiàn)狀
當(dāng)前大數(shù)據(jù)平臺的挑戰(zhàn)和思考
近半年大數(shù)據(jù)平臺主要的業(yè)務(wù)指標(biāo)是降本增效,一方面業(yè)務(wù)方希望離線平臺每天能夠承載更多的作業(yè),另一方面我們自身有降本的需求,如何在降本的前提下支撐更多地業(yè)務(wù)量對于每個技術(shù)人都是非常大地挑戰(zhàn)。熟悉Spark的同學(xué)應(yīng)該非常清楚,在大規(guī)模集群場景下,Spark Shuffle在實(shí)現(xiàn)上有比較大的缺陷,體現(xiàn)在以下的幾個方面:
Spark Shuffle Fetch過程存在大量的網(wǎng)絡(luò)小包,現(xiàn)有的External Shuffle Service設(shè)計并沒有非常細(xì)致的處理這些RPC請求,大規(guī)模場景下會有很多connection reset發(fā)生,導(dǎo)致FetchFailed,從而導(dǎo)致stage重算。
Spark Shuffle Fetch過程存在大量的隨機(jī)讀,大規(guī)模高負(fù)載集群條件下,磁盤IO負(fù)載高、CPU滿載時常發(fā)生,極容易發(fā)生FetchFailed,從而導(dǎo)致stage重算。
重算過程會放大集群的繁忙程度,搶占機(jī)器資源,導(dǎo)致惡性循環(huán)嚴(yán)重,SLA完不成,需要運(yùn)維人員手動將作業(yè)跑在空閑的Label集群。
計算和Shuffle過程架構(gòu)不能拆開,不能把Shuffle限定在指定的集群內(nèi),不能利用部分SSD機(jī)器。
M*N次的shuffle過程:對于10K mapper,5K reducer級別的作業(yè),基本跑不完。
NodeManager和Spark Shuffle Service是同一進(jìn)程,Shuffle過程太重,經(jīng)常導(dǎo)致NodeManager重啟,從而影響Yarn調(diào)度穩(wěn)定性。
以上的這些問題對于Spark研發(fā)同學(xué)是非常痛苦的,好多作業(yè)每天運(yùn)行時長方差會非常大,而且總有一些無法完成的作業(yè),要么業(yè)務(wù)進(jìn)行拆分,要么跑到獨(dú)有的Yarn集群中。除了現(xiàn)有面臨的挑戰(zhàn)之外,我們也在積極構(gòu)建下一代基礎(chǔ)架構(gòu)設(shè)施,隨著云原生Kubernetes概念越來越火,Spark社區(qū)也提供了Spark on Kubernetes版本,相比較于Yarn來說,Kubernetes能夠更好的利用云原生的彈性,提供更加豐富的運(yùn)維、部署、隔離等特性。但是Spark on Kubernetes目前還存在很多問題沒有解決,包括容器內(nèi)的Shuffle方式、動態(tài)資源調(diào)度、調(diào)度性能有限等等。我們針對Kubernetes在趣頭條的落地,主要有以下幾個方面的需求:
實(shí)時集群、OLAP集群和Spark集群之前都是相互獨(dú)立的,怎樣能夠?qū)⑦@些資源形成統(tǒng)一大數(shù)據(jù)資源池。通過Kubernetes的天生隔離特性,更好的實(shí)現(xiàn)離線業(yè)務(wù)與實(shí)時業(yè)務(wù)混部,達(dá)到降本增效目的。
公司的在線業(yè)務(wù)都運(yùn)行在Kubernetes集群中,如何利用在線業(yè)務(wù)和大數(shù)據(jù)業(yè)務(wù)的不同特點(diǎn)進(jìn)行錯峰調(diào)度,達(dá)成ECS的總資源量最少。
希望能夠基于Kubernetes來包容在線服務(wù)、大數(shù)據(jù)、AI等基礎(chǔ)架構(gòu),做到運(yùn)維體系統(tǒng)一化。
因?yàn)槿ゎ^條的大數(shù)據(jù)業(yè)務(wù)目前全都部署在阿里云上,阿里云EMR團(tuán)隊(duì)和趣頭條的大數(shù)據(jù)團(tuán)隊(duì)進(jìn)行了深入技術(shù)共創(chuàng),共同研發(fā)了Remote Shuffle Service(以下簡稱RSS),旨在解決Spark on Yarn層面提到的所有問題,并為Spark跑在Kubernetes上提供Shuffle基礎(chǔ)組件。
Remote Shuffle Service設(shè)計與實(shí)現(xiàn)
基于上述背景,我們與阿里云EMR團(tuán)隊(duì)共同開發(fā)了Remote Shuffle Service。RSS可以提供以下的能力,完美的解決了Spark Shuffle面臨的技術(shù)挑戰(zhàn),為我們集群的穩(wěn)定性和容器化的落地提供了強(qiáng)有力的保證,主要體現(xiàn)在以下幾個方面:
高性能服務(wù)器的設(shè)計思路,不同于Spark原有Shuffle Service,RPC更輕量、通用和穩(wěn)定。
兩副本機(jī)制,能夠保證的Shuffle fetch極小概率(低于0.01%)失敗。
合并shuffle文件,從M*N次shuffle變成N次shuffle,順序讀HDD磁盤會顯著提升shuffle heavy作業(yè)性能。
減少Executor計算時內(nèi)存壓力,避免map過程中Shuffle Spill。
計算與存儲分離架構(gòu),可以將Shuffle Service部署到特殊硬件環(huán)境中,例如SSD機(jī)器,可以保證SLA極高的作業(yè)。
完美解決Spark on Kubernetes方案中對于本地磁盤的依賴。
Spark RSS架構(gòu)包含三個角色: Master, Worker, Client。Master和Worker構(gòu)成服務(wù)端,Client以不侵入的方式集成到Spark ShuffleManager里(RssShuffleManager實(shí)現(xiàn)了ShuffleManager接口)。
Master的主要職責(zé)是資源分配與狀態(tài)管理。
Worker的主要職責(zé)是處理和存儲Shuffle數(shù)據(jù)。
Client的主要職責(zé)是緩存和推送Shuffle數(shù)據(jù)。
整體流程如下所示(其中ResourceManager和MetaService是Master的組件),如圖2。
圖2 RSS整體架構(gòu)圖
3.2.2 實(shí)現(xiàn)流程
下面重點(diǎn)來講一下實(shí)現(xiàn)的流程:
RSS采用Push Style的shuffle模式,每個Mapper持有一個按Partition分界的緩存區(qū),Shuffle數(shù)據(jù)首先寫入緩存區(qū),每當(dāng)某個Partition的緩存滿了即觸發(fā)PushData。
Driver先和Master發(fā)生StageStart的請求,Master接受到該RPC后,會分配對應(yīng)的Worker Partition并返回給Driver,Shuffle Client得到這些元信息后,進(jìn)行后續(xù)的推送數(shù)據(jù)。
Client開始向主副本推送數(shù)據(jù)。主副本W(wǎng)orker收到請求后,把數(shù)據(jù)緩存到本地內(nèi)存,同時把該請求以Pipeline的方式轉(zhuǎn)發(fā)給從副本,從而實(shí)現(xiàn)了2副本機(jī)制。
為了不阻塞PushData的請求,Worker收到PushData請求后會以純異步的方式交由專有的線程池異步處理。根據(jù)該Data所屬的Partition拷貝到事先分配的buffer里,若buffer滿了則觸發(fā)flush。RSS支持多種存儲后端,包括DFS和Local。若后端是DFS,則主從副本只有一方會flush,依靠DFS的雙副本保證容錯;若后端是Local,則主從雙方都會flush。
在所有的Mapper都結(jié)束后,Driver會觸發(fā)StageEnd請求。Master接收到該RPC后,會向所有Worker發(fā)送CommitFiles請求,Worker收到后把屬于該Stage buffer里的數(shù)據(jù)flush到存儲層,close文件,并釋放buffer。Master收到所有響應(yīng)后,記錄每個partition對應(yīng)的文件列表。若CommitFiles請求失敗,則Master標(biāo)記此Stage為DataLost。
在Reduce階段,reduce task首先向Master請求該P(yáng)artition對應(yīng)的文件列表,若返回碼是DataLost,則觸發(fā)Stage重算或直接abort作業(yè)。若返回正常,則直接讀取文件數(shù)據(jù)。
總體來講,RSS的設(shè)計要點(diǎn)總結(jié)為3個層面:
采用PushStyle的方式做shuffle,避免了本地存儲,從而適應(yīng)了計算存儲分離架構(gòu)。
按照reduce做聚合,避免了小文件隨機(jī)讀寫和小數(shù)據(jù)量網(wǎng)絡(luò)請求。
做了2副本,提高了系統(tǒng)穩(wěn)定性。
對于RSS系統(tǒng),容錯性是至關(guān)重要的,我們分為以下幾個維度來實(shí)現(xiàn):
PushData失敗
當(dāng)PushData失敗次數(shù)(Worker掛了,網(wǎng)絡(luò)繁忙,CPU繁忙等)超過MaxRetry后,Client會給Master發(fā)消息請求新的Partition Location,此后本Client都會使用新的Location地址,該階段稱為Revive。
若Revive是因?yàn)镃lient端而非Worker的問題導(dǎo)致,則會產(chǎn)生同一個Partition數(shù)據(jù)分布在不同Worker上的情況,Master的Meta組件會正確處理這種情形。
若發(fā)生WorkerLost,則會導(dǎo)致大量PushData同時失敗,此時會有大量同一Partition的Revive請求打到Master。為了避免給同一個Partition分配過多的Location,Master保證僅有一個Revive請求真正得到處理,其余的請求塞到pending queue里,待Revive處理結(jié)束后返回同一個Location。
Worker宕機(jī)
當(dāng)發(fā)生WorkerLost時,對于該Worker上的副本數(shù)據(jù),Master向其peer發(fā)送CommitFile的請求,然后清理peer上的buffer。若Commit Files失敗,則記錄該Stage為DataLost;若成功,則后續(xù)的PushData通過Revive機(jī)制重新申請Location。
數(shù)據(jù)去重
Speculation task和task重算會導(dǎo)致數(shù)據(jù)重復(fù)。解決辦法是每個PushData的數(shù)據(jù)片里編碼了所屬的mapId,attemptId和batchId,并且Master為每個map task記錄成功commit的attemtpId。read端通過attemptId過濾不同的attempt數(shù)據(jù),并通過batchId過濾同一個attempt的重復(fù)數(shù)據(jù)。
多副本
RSS目前支持DFS和Local兩種存儲后端。
在DFS模式下,ReadPartition失敗會直接導(dǎo)致Stage重算或abort job。在Local模式,ReadPartition失敗會觸發(fā)從peer location讀,若主從都失敗則觸發(fā)Stage重算或abort job。
大家可以看到RSS的設(shè)計中Master是一個單點(diǎn),雖然Master的負(fù)載很小,不會輕易地掛掉,但是這對于線上穩(wěn)定性來說無疑是一個風(fēng)險點(diǎn)。在項(xiàng)目的最初上線階段,我們希望可以通過SubCluster的方式進(jìn)行workaround,即通過部署多套RSS來承載不同的業(yè)務(wù),這樣即使RSS Master宕機(jī),也只會影響有限的一部分業(yè)務(wù)。但是隨著系統(tǒng)的深入使用,我們決定直面問題,引進(jìn)高可用Master。主要的實(shí)現(xiàn)如下:
首先,Master目前的元數(shù)據(jù)比較多,我們可以將一部分與ApplD+ShuffleId本身相關(guān)的元數(shù)據(jù)下沉到Driver的ShuffleManager中,由于元數(shù)據(jù)并不會很多,Driver增加的內(nèi)存開銷非常有限。
另外,關(guān)于全局負(fù)載均衡的元數(shù)據(jù)和調(diào)度相關(guān)的元數(shù)據(jù),我們利用Raft實(shí)現(xiàn)了Master組件的高可用,這樣我們通過部署3或5臺Master,真正的實(shí)現(xiàn)了大規(guī)??蓴U(kuò)展的需求。
實(shí)際效果與分析
團(tuán)隊(duì)針對TeraSort,TPC-DS以及大量的內(nèi)部作業(yè)進(jìn)行了測試,在Reduce階段減少了隨機(jī)讀的開銷,任務(wù)的穩(wěn)定性和性能都有了大幅度提升。
圖3是TeraSort的benchmark,以10T Terasort為例,Shuffle量壓縮后大約5.6T??梢钥闯鲈摿考壍淖鳂I(yè)在RSS場景下,由于Shuffle read變?yōu)轫樞蜃x,性能會有大幅提升。
圖3 TeraSort性能測試(RSS性能更好)
圖4是一個線上實(shí)際脫敏后的Shuffle heavy大作業(yè),之前在混部集群中很小概率可以跑完,每天任務(wù)SLA不能按時達(dá)成,分析原因主要是由于大量的FetchFailed導(dǎo)致stage進(jìn)行重算。使用RSS之后每天可以穩(wěn)定的跑完,2.1T的shuffle也不會出現(xiàn)任何FetchFailed的場景。在更大的數(shù)據(jù)集性能和SLA表現(xiàn)都更為顯著。
圖4 實(shí)際業(yè)務(wù)的作業(yè)stage圖(使用RSS保障穩(wěn)定性和性能)
在大數(shù)據(jù)團(tuán)隊(duì)和阿里云EMR團(tuán)隊(duì)的共同努力下,經(jīng)過近半年的上線、運(yùn)營RSS,以及和業(yè)務(wù)部門的長時間測試,業(yè)務(wù)價值主要體現(xiàn)在以下方面:
降本增效效果明顯,在集群規(guī)模小幅下降的基礎(chǔ)上,支撐了更多的計算任務(wù),TCO成本下降20%。
SLA顯著提升,大規(guī)模Spark Shuffle任務(wù)從跑不完到能跑完,我們能夠?qū)⒉煌琒LA級別作業(yè)合并到同一集群,減小集群節(jié)點(diǎn)數(shù)量,達(dá)到統(tǒng)一管理,縮小成本的目的。原本業(yè)務(wù)方有一部分SLA比較高的作業(yè)在一個獨(dú)有的Yarn集群B中運(yùn)行,由于主Yarn集群A的負(fù)載非常高,如果跑到集群A中,會經(jīng)常的掛掉。利用RSS之后可以放心的將作業(yè)跑到主集群A中,從而釋放掉獨(dú)有Yarn集群B。
作業(yè)執(zhí)行效率顯著提升,跑的慢 -> 跑的快。我們比較了幾個典型的Shuffle heavy作業(yè),一個重要的業(yè)務(wù)線作業(yè)原本需要3小時,RSS版本需要1.6小時。抽取線上5~10個作業(yè),大作業(yè)的性能提升相當(dāng)明顯,不同作業(yè)平均下來有30%以上的性能提升,即使是shuffle量不大的作業(yè),由于比較穩(wěn)定不需要stage重算,長期運(yùn)行平均時間也會減少10%-20%。
架構(gòu)靈活性顯著提升,升級為計算與存儲分離架構(gòu)。Spark在容器中運(yùn)行的過程中,將RSS作為基礎(chǔ)組件,可以使得Spark容器化能夠大規(guī)模的落地,為離線在線統(tǒng)一資源、統(tǒng)一調(diào)度打下了基礎(chǔ)。
關(guān)于Spark Remote Shuffle Service最佳實(shí)踐的示例分析就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。