溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐

發(fā)布時(shí)間:2021-12-17 13:43:58 來(lái)源:億速云 閱讀:164 作者:柒染 欄目:大數(shù)據(jù)

本篇文章給大家分享的是有關(guān)如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐,小編覺(jué)得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話(huà)不多說(shuō),跟著小編一起來(lái)看看吧。

Spark SQL是Apache Spark最廣泛使用的一個(gè)組件,它提供了非常友好的接口來(lái)分布式處理結(jié)構(gòu)化數(shù)據(jù),在很多應(yīng)用領(lǐng)域都有成功的生產(chǎn)實(shí)踐,但是在超大規(guī)模集群和數(shù)據(jù)集上,Spark SQL仍然遇到不少易用性和可擴(kuò)展性的挑戰(zhàn)。為了應(yīng)對(duì)這些挑戰(zhàn),英特爾大數(shù)據(jù)技術(shù)團(tuán)隊(duì)和百度大數(shù)據(jù)基礎(chǔ)架構(gòu)部工程師在Spark 社區(qū)版本的基礎(chǔ)上,改進(jìn)并實(shí)現(xiàn)了自適應(yīng)執(zhí)行引擎。本文首先討論Spark SQL在大規(guī)模數(shù)據(jù)集上遇到的挑戰(zhàn),然后介紹自適應(yīng)執(zhí)行的背景和基本架構(gòu),以及自適應(yīng)執(zhí)行如何應(yīng)對(duì)Spark SQL這些問(wèn)題,最后我們將比較自適應(yīng)執(zhí)行和現(xiàn)有的社區(qū)版本Spark SQL在100 TB 規(guī)模TPC-DS基準(zhǔn)測(cè)試碰到的挑戰(zhàn)和性能差異,以及自適應(yīng)執(zhí)行在Baidu Big SQL平臺(tái)的使用情況。

挑戰(zhàn)1:關(guān)于shuffle partition數(shù)

在Spark SQL中, shufflepartition數(shù)可以通過(guò)參數(shù)spark.sql.shuffle.partition來(lái)設(shè)置,默認(rèn)值是200。這個(gè)參數(shù)決定了SQL作業(yè)每個(gè)reduce階段任務(wù)數(shù)量,對(duì)整個(gè)查詢(xún)性能有很大影響。假設(shè)一個(gè)查詢(xún)運(yùn)行前申請(qǐng)了E個(gè)Executor,每個(gè)Executor包含C個(gè)core(并發(fā)執(zhí)行線(xiàn)程數(shù)),那么該作業(yè)在運(yùn)行時(shí)可以并行執(zhí)行的任務(wù)數(shù)就等于E x C個(gè),或者說(shuō)該作業(yè)的并發(fā)數(shù)是E x C。假設(shè)shuffle partition個(gè)數(shù)為P,除了map stage的任務(wù)數(shù)和原始數(shù)據(jù)的文件數(shù)量以及大小相關(guān),后續(xù)的每個(gè)reduce stage的任務(wù)數(shù)都是P。由于Spark作業(yè)調(diào)度是搶占式的,E x C個(gè)并發(fā)任務(wù)執(zhí)行單元會(huì)搶占執(zhí)行P個(gè)任務(wù),“能者多勞”,直至所有任務(wù)完成,則進(jìn)入到下一個(gè)Stage。但這個(gè)過(guò)程中,如果有任務(wù)因?yàn)樘幚頂?shù)據(jù)量過(guò)大(例如:數(shù)據(jù)傾斜導(dǎo)致大量數(shù)據(jù)被劃分到同一個(gè)reducer partition)或者其它原因造成該任務(wù)執(zhí)行時(shí)間過(guò)長(zhǎng),一方面會(huì)導(dǎo)致整個(gè)stage執(zhí)行時(shí)間變長(zhǎng),另一方面E x C個(gè)并發(fā)執(zhí)行單元大部分可能都處于空閑等待狀態(tài),集群資源整體利用率急劇下降。

那么spark.sql.shuffle.partition參數(shù)究竟是多少比較合適?如果設(shè)置過(guò)小,分配給每一個(gè)reduce任務(wù)處理的數(shù)據(jù)量就越多,在內(nèi)存大小有限的情況下,不得不溢寫(xiě)(spill)到計(jì)算節(jié)點(diǎn)本地磁盤(pán)上。Spill會(huì)導(dǎo)致額外的磁盤(pán)讀寫(xiě),影響整個(gè)SQL查詢(xún)的性能,更差的情況還可能導(dǎo)致嚴(yán)重的GC問(wèn)題甚至是OOM。相反,如果shuffle partition設(shè)置過(guò)大。第一,每一個(gè)reduce任務(wù)處理的數(shù)據(jù)量很小并且很快結(jié)束,進(jìn)而導(dǎo)致Spark任務(wù)調(diào)度負(fù)擔(dān)變大。第二,每一個(gè)mapper任務(wù)必須把自己的shuffle輸出數(shù)據(jù)分成P個(gè)hash bucket,即確定數(shù)據(jù)屬于哪一個(gè)reduce partition,當(dāng)shuffle partition數(shù)量太多時(shí),hash bucket里數(shù)據(jù)量會(huì)很小,在作業(yè)并發(fā)數(shù)很大時(shí),reduce任務(wù)shuffle拉取數(shù)據(jù)會(huì)造成一定程度的隨機(jī)小數(shù)據(jù)讀操作,當(dāng)使用機(jī)械硬盤(pán)作為shuffle數(shù)據(jù)臨時(shí)存取的時(shí)候性能下降會(huì)更加明顯。最后,當(dāng)最后一個(gè)stage保存數(shù)據(jù)時(shí)會(huì)寫(xiě)出P個(gè)文件,也可能會(huì)造成HDFS文件系統(tǒng)中大量的小文件。

從上,shuffle partition的設(shè)置既不能太小也不能太大。為了達(dá)到最佳的性能,往往需要經(jīng)多次試驗(yàn)才能確定某個(gè)SQL查詢(xún)最佳的shuffle partition值。然而在生產(chǎn)環(huán)境中,往往SQL以定時(shí)作業(yè)的方式處理不同時(shí)間段的數(shù)據(jù),數(shù)據(jù)量大小可能變化很大,我們也無(wú)法為每一個(gè)SQL查詢(xún)?nèi)プ龊臅r(shí)的人工調(diào)優(yōu),這也意味這些SQL作業(yè)很難以最佳的性能方式運(yùn)行。

Shuffle partition的另外一個(gè)問(wèn)題是,同一個(gè)shuffle partition數(shù)設(shè)置將應(yīng)用到所有的stage。Spark在執(zhí)行一個(gè)SQL作業(yè)時(shí),會(huì)劃分成多個(gè)stage。通常情況下,每個(gè)stage的數(shù)據(jù)分布和大小可能都不太一樣,全局的shuffle partition設(shè)置最多只能對(duì)某個(gè)或者某些stage最優(yōu),沒(méi)有辦法做到全局所有的stage設(shè)置最優(yōu)。

這一系列關(guān)于shufflepartition的性能和易用性挑戰(zhàn),促使我們思考新的方法:我們能否根據(jù)運(yùn)行時(shí)獲取的shuffle數(shù)據(jù)量信息,例如數(shù)據(jù)塊大小,記錄行數(shù)等等,自動(dòng)為每一個(gè)stage設(shè)置合適的shuffle partition值?

挑戰(zhàn)2:Spark SQL最佳執(zhí)行計(jì)劃

Spark SQL在執(zhí)行SQL之前,會(huì)將SQL或者Dataset程序解析成邏輯計(jì)劃,然后經(jīng)歷一系列的優(yōu)化,最后確定一個(gè)可執(zhí)行的物理計(jì)劃。最終選擇的物理計(jì)劃的不同對(duì)性能有很大的影響。如何選擇最佳的執(zhí)行計(jì)劃,這便是Spark SQL的Catalyst優(yōu)化器的核心工作。Catalyst早期主要是基于規(guī)則的優(yōu)化器(RBO),在Spark 2.2中又加入了基于代價(jià)的優(yōu)化(CBO)。目前執(zhí)行計(jì)劃的確定是在計(jì)劃階段,一旦確認(rèn)以后便不再改變。然而在運(yùn)行期間,當(dāng)我們獲取到更多運(yùn)行時(shí)信息時(shí),我們將有可能得到一個(gè)更佳的執(zhí)行計(jì)劃。

以join操作為例,在Spark中最常見(jiàn)的策略是BroadcastHashJoin和SortMergeJoin。BroadcastHashJoin屬于map side join,其原理是當(dāng)其中一張表存儲(chǔ)空間大小小于broadcast閾值時(shí),Spark選擇將這張小表廣播到每一個(gè)Executor上,然后在map階段,每一個(gè)mapper讀取大表的一個(gè)分片,并且和整張小表進(jìn)行join,整個(gè)過(guò)程中避免了把大表的數(shù)據(jù)在集群中進(jìn)行shuffle。而SortMergeJoin在map階段2張數(shù)據(jù)表都按相同的分區(qū)方式進(jìn)行shuffle寫(xiě),reduce階段每個(gè)reducer將兩張表屬于對(duì)應(yīng)partition的數(shù)據(jù)拉取到同一個(gè)任務(wù)中做join。RBO根據(jù)數(shù)據(jù)的大小,盡可能把join操作優(yōu)化成BroadcastHashJoin。Spark中使用參數(shù)spark.sql.autoBroadcastJoinThreshold來(lái)控制選擇BroadcastHashJoin的閾值,默認(rèn)是10MB。然而對(duì)于復(fù)雜的SQL查詢(xún),它可能使用中間結(jié)果來(lái)作為join的輸入,在計(jì)劃階段,Spark并不能精確地知道join中兩表的大小或者會(huì)錯(cuò)誤地估計(jì)它們的大小,以致于錯(cuò)失了使用BroadcastHashJoin策略來(lái)優(yōu)化join執(zhí)行的機(jī)會(huì)。但是在運(yùn)行時(shí),通過(guò)從shuffle寫(xiě)得到的信息,我們可以動(dòng)態(tài)地選用BroadcastHashJoin。以下是一個(gè)例子,join一邊的輸入大小只有600K,但Spark仍然規(guī)劃成SortMergeJoin。

如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐

圖1

這促使我們思考第二個(gè)問(wèn)題:我們能否通過(guò)運(yùn)行時(shí)收集到的信息,來(lái)動(dòng)態(tài)地調(diào)整執(zhí)行計(jì)劃?

挑戰(zhàn)3:數(shù)據(jù)傾斜

數(shù)據(jù)傾斜是常見(jiàn)的導(dǎo)致Spark SQL性能變差的問(wèn)題。數(shù)據(jù)傾斜是指某一個(gè)partition的數(shù)據(jù)量遠(yuǎn)遠(yuǎn)大于其它partition的數(shù)據(jù),導(dǎo)致個(gè)別任務(wù)的運(yùn)行時(shí)間遠(yuǎn)遠(yuǎn)大于其它任務(wù),因此拖累了整個(gè)SQL的運(yùn)行時(shí)間。在實(shí)際SQL作業(yè)中,數(shù)據(jù)傾斜很常見(jiàn),join key對(duì)應(yīng)的hash bucket總是會(huì)出現(xiàn)記錄數(shù)不太平均的情況,在極端情況下,相同join key對(duì)應(yīng)的記錄數(shù)特別多,大量的數(shù)據(jù)必然被分到同一個(gè)partition因而造成數(shù)據(jù)嚴(yán)重傾斜。如圖2,可以看到大部分任務(wù)3秒左右就完成了,而最慢的任務(wù)卻花了4分鐘,它處理的數(shù)據(jù)量卻是其它任務(wù)的若干倍。

如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐

圖2

目前,處理join時(shí)數(shù)據(jù)傾斜的一些常見(jiàn)手段有: (1)增加shuffle partition數(shù)量,期望原本分在同一個(gè)partition中的數(shù)據(jù)可以被分散到多個(gè)partition中,但是對(duì)于同key的數(shù)據(jù)沒(méi)有作用。(2)調(diào)大BroadcastHashJoin的閾值,在某些場(chǎng)景下可以把SortMergeJoin轉(zhuǎn)化成BroadcastHashJoin而避免shuffle產(chǎn)生的數(shù)據(jù)傾斜。(3)手動(dòng)過(guò)濾傾斜的key,并且對(duì)這些數(shù)據(jù)加入隨機(jī)的前綴,在另一張表中這些key對(duì)應(yīng)的數(shù)據(jù)也相應(yīng)的膨脹處理,然后再做join。綜上,這些手段都有各自的局限性并且涉及很多的人為處理?;诖耍覀兯伎剂说谌齻€(gè)問(wèn)題:Spark能否在運(yùn)行時(shí)自動(dòng)地處理join中的數(shù)據(jù)傾斜?

自適應(yīng)執(zhí)行背景和簡(jiǎn)介

早在2015年,Spark社區(qū)就提出了自適應(yīng)執(zhí)行的基本想法,在Spark的DAGScheduler中增加了提交單個(gè)map stage的接口,并且在實(shí)現(xiàn)運(yùn)行時(shí)調(diào)整shuffle partition數(shù)量上做了嘗試。但目前該實(shí)現(xiàn)有一定的局限性,在某些場(chǎng)景下會(huì)引入更多的shuffle,即更多的stage,對(duì)于三表在同一個(gè)stage中做join等情況也無(wú)法很好的處理。所以該功能一直處于實(shí)驗(yàn)階段,配置參數(shù)也沒(méi)有在官方文檔中提及。

基于這些社區(qū)的工作,英特爾大數(shù)據(jù)技術(shù)團(tuán)隊(duì)對(duì)自適應(yīng)執(zhí)行做了重新的設(shè)計(jì),實(shí)現(xiàn)了一個(gè)更為靈活的自適性執(zhí)行框架。在這個(gè)框架下面,我們可以添加額外的規(guī)則,來(lái)實(shí)現(xiàn)更多的功能。目前,已實(shí)現(xiàn)的特性包括:自動(dòng)設(shè)置shuffle partition數(shù),動(dòng)態(tài)調(diào)整執(zhí)行計(jì)劃,動(dòng)態(tài)處理數(shù)據(jù)傾斜等等。

自適應(yīng)執(zhí)行架構(gòu)

在Spark SQL中,當(dāng)Spark確定最后的物理執(zhí)行計(jì)劃后,根據(jù)每一個(gè)operator對(duì)RDD的轉(zhuǎn)換定義,它會(huì)生成一個(gè)RDD的DAG圖。之后Spark基于DAG圖靜態(tài)劃分stage并且提交執(zhí)行,所以一旦執(zhí)行計(jì)劃確定后,在運(yùn)行階段無(wú)法再更新。自適應(yīng)執(zhí)行的基本思路是在執(zhí)行計(jì)劃中事先劃分好stage,然后按stage提交執(zhí)行,在運(yùn)行時(shí)收集當(dāng)前stage的shuffle統(tǒng)計(jì)信息,以此來(lái)優(yōu)化下一個(gè)stage的執(zhí)行計(jì)劃,然后再提交執(zhí)行后續(xù)的stage。

如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐

圖3

從圖3中我們可以看出自適應(yīng)執(zhí)行的工作方法,首先以Exchange節(jié)點(diǎn)作為分界將執(zhí)行計(jì)劃這棵樹(shù)劃分成多個(gè)QueryStage(Exchange節(jié)點(diǎn)在Spark SQL中代表shuffle)。每一個(gè)QueryStage都是一棵獨(dú)立的子樹(shù),也是一個(gè)獨(dú)立的執(zhí)行單元。在加入QueryStage的同時(shí),我們也加入一個(gè)QueryStageInput的葉子節(jié)點(diǎn),作為父親QueryStage的輸入。例如對(duì)于圖中兩表join的執(zhí)行計(jì)劃來(lái)說(shuō)我們會(huì)創(chuàng)建3個(gè)QueryStage。最后一個(gè)QueryStage中的執(zhí)行計(jì)劃是join本身,它有2個(gè)QueryStageInput代表它的輸入,分別指向2個(gè)孩子的QueryStage。在執(zhí)行QueryStage時(shí),我們首先提交它的孩子stage,并且收集這些stage運(yùn)行時(shí)的信息。當(dāng)這些孩子stage運(yùn)行完畢后,我們可以知道它們的大小等信息,以此來(lái)判斷QueryStage中的計(jì)劃是否可以?xún)?yōu)化更新。例如當(dāng)我們獲知某一張表的大小是5M,它小于broadcast的閾值時(shí),我們可以將SortMergeJoin轉(zhuǎn)化成BroadcastHashJoin來(lái)優(yōu)化當(dāng)前的執(zhí)行計(jì)劃。我們也可以根據(jù)孩子stage產(chǎn)生的shuffle數(shù)據(jù)量,來(lái)動(dòng)態(tài)地調(diào)整該stage的reducer個(gè)數(shù)。在完成一系列的優(yōu)化處理后,最終我們?yōu)樵換ueryStage生成RDD的DAG圖,并且提交給DAG Scheduler來(lái)執(zhí)行。

自動(dòng)設(shè)置reducer個(gè)數(shù)

假設(shè)我們?cè)O(shè)置的shufflepartition個(gè)數(shù)為5,在map stage結(jié)束之后,我們知道每一個(gè)partition的大小分別是70MB,30MB,20MB,10MB和50MB。假設(shè)我們?cè)O(shè)置每一個(gè)reducer處理的目標(biāo)數(shù)據(jù)量是64MB,那么在運(yùn)行時(shí),我們可以實(shí)際使用3個(gè)reducer。第一個(gè)reducer處理partition 0 (70MB),第二個(gè)reducer處理連續(xù)的partition 1 到3,共60MB,第三個(gè)reducer處理partition 4 (50MB),如圖4所示。

如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐

圖4

在自適應(yīng)執(zhí)行的框架中,因?yàn)槊總€(gè)QueryStage都知道自己所有的孩子stage,因此在調(diào)整reducer個(gè)數(shù)時(shí),可以考慮到所有的stage輸入。另外,我們也可以將記錄條數(shù)作為一個(gè)reducer處理的目標(biāo)值。因?yàn)閟huffle的數(shù)據(jù)往往都是經(jīng)過(guò)壓縮的,有時(shí)partition的數(shù)據(jù)量并不大,但解壓后記錄條數(shù)確遠(yuǎn)遠(yuǎn)大于其它partition,造成數(shù)據(jù)不均。所以同時(shí)考慮數(shù)據(jù)大小和記錄條數(shù)可以更好地決定reducer的個(gè)數(shù)。

動(dòng)態(tài)調(diào)整執(zhí)行計(jì)劃

目前我們支持在運(yùn)行時(shí)動(dòng)態(tài)調(diào)整join的策略,在滿(mǎn)足條件的情況下,即一張表小于Broadcast閾值,可以將SortMergeJoin轉(zhuǎn)化成BroadcastHashJoin。由于SortMergeJoin和BroadcastHashJoin輸出的partition情況并不相同,隨意轉(zhuǎn)換可能在下一個(gè)stage引入額外的shuffle操作。因此我們?cè)趧?dòng)態(tài)調(diào)整join策略時(shí),遵循一個(gè)規(guī)則,即在不引入額外shuffle的前提下才進(jìn)行轉(zhuǎn)換。

將SortMergeJoin轉(zhuǎn)化成BroadcastHashJoin有哪些好處呢?因?yàn)閿?shù)據(jù)已經(jīng)shuffle寫(xiě)到磁盤(pán)上,我們?nèi)匀恍枰猻huffle讀取這些數(shù)據(jù)。我們可以看看圖5的例子,假設(shè)A表和B表join,map階段2張表各有2個(gè)map任務(wù),并且shuffle partition個(gè)數(shù)為5。如果做SortMergeJoin,在reduce階段需要啟動(dòng)5個(gè)reducer,每個(gè)reducer通過(guò)網(wǎng)絡(luò)shuffle讀取屬于自己的數(shù)據(jù)。然而,當(dāng)我們?cè)谶\(yùn)行時(shí)發(fā)現(xiàn)B表可以broadcast,并且將其轉(zhuǎn)換成BroadcastHashJoin之后,我們只需要啟動(dòng)2個(gè)reducer,每一個(gè)reducer讀取一個(gè)mapper的整個(gè)shuffle output文件。當(dāng)我們調(diào)度這2個(gè)reducer任務(wù)時(shí),可以?xún)?yōu)先將其調(diào)度在運(yùn)行mapper的Executor上,因此整個(gè)shuffle讀變成了本地讀取,沒(méi)有數(shù)據(jù)通過(guò)網(wǎng)絡(luò)傳輸。并且讀取一個(gè)文件這樣的順序讀,相比原先shuffle時(shí)隨機(jī)的小文件讀,效率也更勝一籌。另外,SortMergeJoin過(guò)程中往往會(huì)出現(xiàn)不同程度的數(shù)據(jù)傾斜問(wèn)題,拖慢整體的運(yùn)行時(shí)間。而轉(zhuǎn)換成BroadcastHashJoin后,數(shù)據(jù)量一般比較均勻,也就避免了傾斜,我們可以在下文實(shí)驗(yàn)結(jié)果中看到更具體的信息。

如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐

圖5

動(dòng)態(tài)處理數(shù)據(jù)傾斜

在自適應(yīng)執(zhí)行的框架下,我們可以在運(yùn)行時(shí)很容易地檢測(cè)出有數(shù)據(jù)傾斜的partition。當(dāng)執(zhí)行某個(gè)stage時(shí),我們收集該stage每個(gè)mapper 的shuffle數(shù)據(jù)大小和記錄條數(shù)。如果某一個(gè)partition的數(shù)據(jù)量或者記錄條數(shù)超過(guò)中位數(shù)的N倍,并且大于某個(gè)預(yù)先配置的閾值,我們就認(rèn)為這是一個(gè)數(shù)據(jù)傾斜的partition,需要進(jìn)行特殊的處理。

如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐

圖6

假設(shè)我們A表和B表做inner join,并且A表中第0個(gè)partition是一個(gè)傾斜的partition。一般情況下,A表和B表中partition 0的數(shù)據(jù)都會(huì)shuffle到同一個(gè)reducer中進(jìn)行處理,由于這個(gè)reducer需要通過(guò)網(wǎng)絡(luò)拉取大量的數(shù)據(jù)并且進(jìn)行處理,它會(huì)成為一個(gè)最慢的任務(wù)拖慢整體的性能。在自適應(yīng)執(zhí)行框架下,一旦我們發(fā)現(xiàn)A表的partition 0發(fā)生傾斜,我們隨后使用N個(gè)任務(wù)去處理該partition。每個(gè)任務(wù)只讀取若干個(gè)mapper的shuffle 輸出文件,然后讀取B表partition 0的數(shù)據(jù)做join。最后,我們將N個(gè)任務(wù)join的結(jié)果通過(guò)Union操作合并起來(lái)。為了實(shí)現(xiàn)這樣的處理,我們對(duì)shuffle read的接口也做了改變,允許它只讀取部分mapper中某一個(gè)partition的數(shù)據(jù)。在這樣的處理中,B表的partition 0會(huì)被讀取N次,雖然這增加了一定的額外代價(jià),但是通過(guò)N個(gè)任務(wù)處理傾斜數(shù)據(jù)帶來(lái)的收益仍然大于這樣的代價(jià)。如果B表中partition 0也發(fā)生傾斜,對(duì)于inner join來(lái)說(shuō)我們也可以將B表的partition 0分成若干塊,分別與A表的partition 0進(jìn)行join,最終union起來(lái)。但對(duì)于其它的join類(lèi)型例如Left Semi Join我們暫時(shí)不支持將B表的partition 0拆分。

自適應(yīng)執(zhí)行和Spark SQL在100TB上的性能比較

我們使用99臺(tái)機(jī)器搭建了一個(gè)集群,使用Spark2.2在TPC-DS 100TB的數(shù)據(jù)集進(jìn)行了實(shí)驗(yàn),比較原版Spark和自適應(yīng)執(zhí)行的性能。以下是集群的詳細(xì)信息:

如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐

如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐

圖7

實(shí)驗(yàn)結(jié)果顯示,在自適應(yīng)執(zhí)行模式下,103條SQL中有92條都得到了明顯的性能提升,其中47條SQL的性能提升超過(guò)10%,最大的性能提升達(dá)到了3.8倍,并且沒(méi)有出現(xiàn)性能下降的情況。另外在原版Spark中,有5條SQL因?yàn)镺OM等原因無(wú)法順利運(yùn)行,在自適應(yīng)模式下我們也對(duì)這些問(wèn)題做了優(yōu)化,使得103條SQL在TPC-DS 100TB數(shù)據(jù)集上全部成功運(yùn)行。以下是具體的性能提升比例和性能提升最明顯的幾條SQL。

如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐

圖8

如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐

圖9

通過(guò)仔細(xì)分析了這些性能提升的SQL,我們可以看到自適應(yīng)執(zhí)行帶來(lái)的好處。首先是自動(dòng)設(shè)置reducer個(gè)數(shù),原版Spark使用10976作為shuffle partition數(shù),在自適應(yīng)執(zhí)行時(shí),以下SQL的reducer個(gè)數(shù)自動(dòng)調(diào)整為1064和1079,可以明顯看到執(zhí)行時(shí)間上也提升了很多。這正是因?yàn)闇p少了調(diào)度的負(fù)擔(dān)和任務(wù)啟動(dòng)的時(shí)間,以及減少了磁盤(pán)IO請(qǐng)求。

原版Spark:

如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐

圖10

自適應(yīng)執(zhí)行:

如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐

圖11

在運(yùn)行時(shí)動(dòng)態(tài)調(diào)整執(zhí)行計(jì)劃,將SortMergeJoin轉(zhuǎn)化成BroadcastHashJoin在某些SQL中也帶來(lái)了很大的提升。例如在以下的例子中,原本使用SortMergeJoin因?yàn)閿?shù)據(jù)傾斜等問(wèn)題花費(fèi)了2.5分鐘。在自適應(yīng)執(zhí)行時(shí),因?yàn)槠渲幸粡埍淼拇笮≈挥?.5k所以在運(yùn)行時(shí)轉(zhuǎn)化成了BroadcastHashJoin,執(zhí)行時(shí)間縮短為10秒。


原版Spark:

如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐

圖12

自適應(yīng)執(zhí)行:

如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐

圖13

100 TB的挑戰(zhàn)及優(yōu)化

成功運(yùn)行TPC-DS 100 TB數(shù)據(jù)集中的所有SQL,對(duì)于Apache Spark來(lái)說(shuō)也是一大挑戰(zhàn)。雖然SparkSQL官方表示支持TPC-DS所有的SQL,但這是基于小數(shù)據(jù)集。在100TB這個(gè)量級(jí)上,Spark暴露出了一些問(wèn)題導(dǎo)致有些SQL執(zhí)行效率不高,甚至無(wú)法順利執(zhí)行。在做實(shí)驗(yàn)的過(guò)程中,我們?cè)谧赃m應(yīng)執(zhí)行框架的基礎(chǔ)上,對(duì)Spark也做了其它的優(yōu)化改進(jìn),來(lái)確保所有SQL在100TB數(shù)據(jù)集上可以成功運(yùn)行。以下是一些典型的問(wèn)題。

統(tǒng)計(jì)map端輸出數(shù)據(jù)時(shí)driver單點(diǎn)瓶頸的優(yōu)化(SPARK-22537)

在每個(gè)map任務(wù)結(jié)束后,會(huì)有一個(gè)表示每個(gè)partition大小的數(shù)據(jù)結(jié)構(gòu)(即下面提到的CompressedMapStatus或HighlyCompressedMapStatus)返回給driver。而在自適應(yīng)執(zhí)行中,當(dāng)一次shuffle的map stage結(jié)束后,driver會(huì)聚合每個(gè)mapper給出的partition大小信息,得到在各個(gè)partition上所有mapper輸出的數(shù)據(jù)總大小。該統(tǒng)計(jì)由單線(xiàn)程完成,如果mapper的數(shù)量是M,shuffle partition的數(shù)量為S,那么統(tǒng)計(jì)的時(shí)間復(fù)雜度在O(M x S) ~ O (M x S x log(M x S)) 之間,當(dāng)CompressedMapStatus被使用時(shí),復(fù)雜度為這個(gè)區(qū)間的下限,當(dāng)HighlyCompressedMapStatus被使用時(shí),空間有所節(jié)省,時(shí)間會(huì)更長(zhǎng),在幾乎所有的partition數(shù)據(jù)都為空時(shí),復(fù)雜度會(huì)接近該區(qū)間的上限。

在M x S增大時(shí),我們會(huì)遇到driver上的單點(diǎn)瓶頸,一個(gè)明顯的表現(xiàn)是UI上map stage和reduce stage之間的停頓。為了解決這個(gè)單點(diǎn)瓶頸,我們將任務(wù)盡量均勻地劃分給多個(gè)線(xiàn)程,線(xiàn)程之間不相交地為scala Array中的不同元素賦聚合值。

在這項(xiàng)優(yōu)化中,新的spark.shuffle.mapOutput.parallelAggregationThreshold(簡(jiǎn)稱(chēng)threshold)被引入,用于配置使用多線(xiàn)程聚合的閾值,聚合的并行度由JVM中可用core數(shù)和M * S / threshold + 1中的小值決定。

Shuffle讀取連續(xù)partition時(shí)的優(yōu)化 (SPARK-9853)

在自適應(yīng)執(zhí)行的模式下,一個(gè)reducer可能會(huì)從一個(gè)mapoutput文件中讀取諾干個(gè)連續(xù)的數(shù)據(jù)塊。目前的實(shí)現(xiàn)中,它需要拆分成許多獨(dú)立的getBlockData調(diào)用,每次調(diào)用分別從硬盤(pán)讀取一小塊數(shù)據(jù),這樣就需要很多的磁盤(pán)IO。我們對(duì)這樣的場(chǎng)景做了優(yōu)化,使得Spark可以一次性地把這些連續(xù)數(shù)據(jù)塊都讀上來(lái),這樣就大大減少了磁盤(pán)的IO。在小的基準(zhǔn)測(cè)試程序中,我們發(fā)現(xiàn)shuffle read的性能可以提升3倍。

BroadcastHashJoin中避免不必要的partition讀的優(yōu)化

自適應(yīng)執(zhí)行可以為現(xiàn)有的operator提供更多優(yōu)化的可能。在SortMergeJoin中有一個(gè)基本的設(shè)計(jì):每個(gè)reducetask會(huì)先讀取左表中的記錄,如果左表的 partition為空,則右表中的數(shù)據(jù)我們無(wú)需關(guān)注(對(duì)于非anti join的情況),這樣的設(shè)計(jì)在左表有一些partition為空時(shí)可以節(jié)省不必要的右表讀取,在SortMergeJoin中這樣的實(shí)現(xiàn)很自然。

BroadcastHashJoin中不存在按照join key分區(qū)的過(guò)程,所以缺失了這項(xiàng)優(yōu)化。然而在自適應(yīng)執(zhí)行的一些情況中,利用stage間的精確統(tǒng)計(jì)信息,我們可以找回這項(xiàng)優(yōu)化:如果SortMergeJoin在運(yùn)行時(shí)被轉(zhuǎn)換成了BroadcastHashJoin,且我們能得到各個(gè)partition key對(duì)應(yīng)partition的精確大小,則新轉(zhuǎn)換成的BroadcastHashJoin將被告知:無(wú)需去讀那些小表中為空的partition,因?yàn)椴粫?huì)join出任何結(jié)果。

Baidu真實(shí)產(chǎn)品線(xiàn)試用情況

我們將自適應(yīng)執(zhí)行優(yōu)化應(yīng)用在Baidu內(nèi)部基于Spark SQL的即席查詢(xún)服務(wù)BaiduBig SQL之上,做了進(jìn)一步的落地驗(yàn)證,通過(guò)選取單日全天真實(shí)用戶(hù)查詢(xún),按照原有執(zhí)行順序回放重跑和分析,得到如下幾點(diǎn)結(jié)論:

1. 對(duì)于秒級(jí)的簡(jiǎn)單查詢(xún),自適應(yīng)版本的性能提升并不明顯,這主要是因?yàn)樗鼈兊钠款i和主要耗時(shí)集中在了IO上面,而這不是自適應(yīng)執(zhí)行的優(yōu)化點(diǎn)。

2. 按照查詢(xún)復(fù)雜度維度考量測(cè)試結(jié)果發(fā)現(xiàn):查詢(xún)中迭代次數(shù)越多,多表join場(chǎng)景越復(fù)雜的情況下自適應(yīng)執(zhí)行效果越好。我們簡(jiǎn)單按照group by, sort, join, 子查詢(xún)等操作個(gè)數(shù)來(lái)將查詢(xún)分類(lèi),如上關(guān)鍵詞大于3的查詢(xún)有明顯的性能提升,優(yōu)化比從50%~200%不等,主要優(yōu)化點(diǎn)來(lái)源于shuffle的動(dòng)態(tài)并發(fā)數(shù)調(diào)整及join優(yōu)化。

3. 從業(yè)務(wù)使用角度來(lái)分析,前文所述SortMergeJoin轉(zhuǎn)BroadcastHashJoin的優(yōu)化在Big SQL場(chǎng)景中命中了多種典型的業(yè)務(wù)SQL模板,試考慮如下計(jì)算需求:用戶(hù)期望從兩張不同維度的計(jì)費(fèi)信息中撈取感興趣的user列表在兩個(gè)維度的整體計(jì)費(fèi)。收入信息原表大小在百T級(jí)別,用戶(hù)列表只包含對(duì)應(yīng)用戶(hù)的元信息,大小在10M以?xún)?nèi)。兩張計(jì)費(fèi)信息表字段基本一致,所以我們將兩張表與用戶(hù)列表做inner join后union做進(jìn)一步分析,SQL表達(dá)如下:

select t.c1, t.id, t.c2, t.c3, t.c4,  sum(t.num1), sum(t.num2), sum(t.num3) from

(

select  c1, t1.id as id, c2, c3, c4, sum(num1s) as num1, sum(num2) as num2, sum(num3)  as num3 from basedata.shitu_a t1 INNER JOIN basedata.user_82_1512023432000 t2  ON (t1.id = t2.id)  where  (event_day=20171107)  and flag !=  'true'  group by c1, t1.id, c2, c3, c4

union  all

select  c1, t1.id as id, c2, c3, c4, sum(num1s) as num1, sum(num2) as num2, sum(num3)  as num3 from basedata.shitu_b t1 INNER JOIN basedata.user_82_1512023432000 t2  ON (t1.id = t2.id)  where  (event_day=20171107)  and flag !=  'true'  group by c1, t1.id, c2, c3, c4

) t group by t.c1, t.id, t.c2, t.c3, c4

 

對(duì)應(yīng)的原版Spark執(zhí)行計(jì)劃如下:

如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐

圖14

針對(duì)于此類(lèi)用戶(hù)場(chǎng)景,可以全部命中自適應(yīng)執(zhí)行的join優(yōu)化邏輯,執(zhí)行過(guò)程中多次SortMergeJoin轉(zhuǎn)為BroadcastHashJoin,減少了中間內(nèi)存消耗及多輪sort,得到了近200%的性能提升。

結(jié)合上述3點(diǎn),下一步自適應(yīng)執(zhí)行在Baidu內(nèi)部的優(yōu)化落地工作將進(jìn)一步集中在大數(shù)據(jù)量、復(fù)雜查詢(xún)的例行批量作業(yè)之上,并考慮與用戶(hù)查詢(xún)復(fù)雜度關(guān)聯(lián)進(jìn)行動(dòng)態(tài)的開(kāi)關(guān)控制。對(duì)于數(shù)千臺(tái)的大規(guī)模集群上運(yùn)行的復(fù)雜查詢(xún),自適應(yīng)執(zhí)行可以動(dòng)態(tài)調(diào)整計(jì)算過(guò)程中的并行度,可以幫助大幅提升集群的資源利用率。另外,自適應(yīng)執(zhí)行可以獲取到多輪stage之間更完整的統(tǒng)計(jì)信息,下一步我們也考慮將對(duì)應(yīng)數(shù)據(jù)及Strategy接口開(kāi)放給Baidu Spark平臺(tái)上層用戶(hù),針對(duì)特殊作業(yè)進(jìn)行進(jìn)一步的定制化Strategy策略編寫(xiě)。

以上就是如何進(jìn)行Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見(jiàn)到或用到的。希望你能通過(guò)這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI