溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

spark shuffle調(diào)優(yōu)的方法是什么

發(fā)布時間:2021-12-16 14:58:42 來源:億速云 閱讀:163 作者:iii 欄目:云計算

這篇文章主要介紹“spark shuffle調(diào)優(yōu)的方法是什么”,在日常操作中,相信很多人在spark shuffle調(diào)優(yōu)的方法是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”spark shuffle調(diào)優(yōu)的方法是什么”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

什么情況下會發(fā)生shuffle,然后shuffle的原理是什么?

  • 在spark中,主要是以下幾個算子:groupByKey、reduceByKey、countByKey、join,等等。

  • groupByKey,要把分布在集群各個節(jié)點(diǎn)上的數(shù)據(jù)中的同一個key,對應(yīng)的values,都給集中到一塊兒,集中到集群中同一個節(jié)點(diǎn)上,更嚴(yán)密一點(diǎn)說,就是集中到一個節(jié)點(diǎn)的一個executor的一個task中。然后呢,集中一個key對應(yīng)的values之后,才能交給我們來進(jìn)行處理,<key, Iterable<value>>;reduceByKey,算子函數(shù)去對values集合進(jìn)行reduce操作,最后變成一個value;countByKey,需要在一個task中,獲取到一個key對應(yīng)的所有的value,然后進(jìn)行計數(shù),統(tǒng)計總共有多少個value;join,RDD<key, value>,RDD<key, value>,只要是兩個RDD中,key相同對應(yīng)的2個value,都能到一個節(jié)點(diǎn)的executor的task中,給我們進(jìn)行處理。

  • 問題在于,同一個單詞,比如說(hello, 1),可能散落在不同的節(jié)點(diǎn)上;對每個單詞進(jìn)行累加計數(shù),就必須讓所有單詞都跑到同一個節(jié)點(diǎn)的一個task中,給一個task來進(jìn)行處理;

  • 每一個shuffle的前半部分stage的task,每個task都會創(chuàng)建下一個stage的task數(shù)量相同的文件,比如下一個stage會有100個task,那么當(dāng)前stage每個task都會創(chuàng)建100份文件;會將同一個key對應(yīng)的values,一定是寫入同一個文件中的;

  • shuffle的后半部分stage的task,每個task都會從各個節(jié)點(diǎn)上的task寫的屬于自己的那一份文件中,拉取key, value對;然后task會有一個內(nèi)存緩沖區(qū),然后會用HashMap,進(jìn)行key, values的匯聚;(key ,values);

  • task會用我們自己定義的聚合函數(shù),比如reduceByKey(_+_),把所有values進(jìn)行一對一的累加,聚合出來最終的值。就完成了shuffle;

  • shuffle,一定是分為兩個stage來完成的。因為這其實是個逆向的過程,不是stage決定shuffle,是shuffle決定stage。

  • reduceByKey(_+_),在某個action觸發(fā)job的時候,DAGScheduler,會負(fù)責(zé)劃分job為多個stage。劃分的依據(jù),就是,如果發(fā)現(xiàn)有會觸發(fā)shuffle操作的算子,比如reduceByKey,就將這個操作的前半部分,以及之前所有的RDD和transformation操作,劃分為一個stage;shuffle操作的后半部分,以及后面的,直到action為止的RDD和transformation操作,劃分為另外一個stage;

  • shuffle前半部分的task在寫入數(shù)據(jù)到磁盤文件之前,都會先寫入一個一個的內(nèi)存緩沖,內(nèi)存緩沖滿溢之后,再spill溢寫到磁盤文件中。

如果不合并map端輸出文件的話,會怎么樣?

  1. 減少網(wǎng)絡(luò)傳輸、disk io、減少reduce端內(nèi)存緩沖

    • 實際生產(chǎn)環(huán)境的條件:

    • 100個節(jié)點(diǎn)(每個節(jié)點(diǎn)一個executor):100個executor,每個executor:2個cpu core,總共1000個task:每個executor平均10個task,上游1000個task,下游1000個task,每個節(jié)點(diǎn),10個task,每個節(jié)點(diǎn)或者說每一個executor會輸出多少份map端文件?10 * 1000=1萬個文件(M*R)

    • 總共有多少份map端輸出文件?100 * 10000 = 100萬。

    • 問題來了:默認(rèn)的這種shuffle行為,對性能有什么樣的惡劣影響呢?

    • shuffle中的寫磁盤的操作,基本上就是shuffle中性能消耗最為嚴(yán)重的部分。

    • 通過上面的分析,一個普通的生產(chǎn)環(huán)境的spark job的一個shuffle環(huán)節(jié),會寫入磁盤100萬個文件。

    • 磁盤IO對性能和spark作業(yè)執(zhí)行速度的影響,是極其驚人和嚇人的。

    • 基本上,spark作業(yè)的性能,都消耗在shuffle中了,雖然不只是shuffle的map端輸出文件這一個部分,但是這里也是非常大的一個性能消耗點(diǎn)。

      new SparkConf().set("spark.shuffle.consolidateFiles", "true")


       

  2. 開啟shuffle map端輸出文件合并的機(jī)制;默認(rèn)情況下,是不開啟的,就是會發(fā)生如上所述的大量map端輸出文件的操作,嚴(yán)重影響性能。

  • 開啟了map端輸出文件的合并機(jī)制之后:

    • 第一個stage,同時就運(yùn)行cpu core個task,比如cpu core是2個,并行運(yùn)行2個task;

    • 每個task都創(chuàng)建下一個stage的task數(shù)量個文件;

    • 第一個stage,并行運(yùn)行的2個task執(zhí)行完以后,就會執(zhí)行另外兩個task;

    • 另外2個task不會再重新創(chuàng)建輸出文件;而是復(fù)用之前的task創(chuàng)建的map端輸出文件,將數(shù)據(jù)寫入上一批task的輸出文件中;

    • 第二個stage,task在拉取數(shù)據(jù)的時候,就不會去拉取上一個stage每一個task為自己創(chuàng)建的那份輸出文件了;


提醒一下(map端輸出文件合并):

  • 只有并行執(zhí)行的task會去創(chuàng)建新的輸出文件;

  • 下一批并行執(zhí)行的task,就會去復(fù)用之前已有的輸出文件;

  • 但是有一個例外,比如2個task并行在執(zhí)行,但是此時又啟動要執(zhí)行2個task(不是同一批次);

  • 那么這個時候的話,就無法去復(fù)用剛才的2個task創(chuàng)建的輸出文件了;

  • 而是還是只能去創(chuàng)建新的輸出文件。

要實現(xiàn)輸出文件的合并的效果,必須是一批task先執(zhí)行,然后下一批task再執(zhí)行,
才能復(fù)用之前的輸出文件;負(fù)責(zé)多批task同時起來執(zhí)行,還是做不到復(fù)用的。

開啟了map端輸出文件合并機(jī)制之后,生產(chǎn)環(huán)境上的例子,會有什么樣的變化?

實際生產(chǎn)環(huán)境的條件:
100個節(jié)點(diǎn)(每個節(jié)點(diǎn)一個executor):100個executor
每個executor:2個cpu core
總共1000個task:每個executor平均10個task
上游1000個task,下游1000個task


每個節(jié)點(diǎn),2個cpu core,有多少份輸出文件呢?2 * 1000 = 2000個(C*R)
總共100個節(jié)點(diǎn),總共創(chuàng)建多少份輸出文件呢?100 * 2000 = 20萬個文件

相比較開啟合并機(jī)制之前的情況,100萬個

map端輸出文件,在生產(chǎn)環(huán)境中,立減5倍!

合并map端輸出文件,對咱們的spark的性能有哪些方面的影響呢?

  1. map task寫入磁盤文件的IO,減少:100萬文件 -> 20萬文件

  2. 第二個stage,原本要拉取第一個stage的task數(shù)量份文件,1000個task,第二個stage的每個task,都要拉取1000份文件,走網(wǎng)絡(luò)傳輸;合并以后,100個節(jié)點(diǎn),每個節(jié)點(diǎn)2個cpu core,第二個stage的每個task,主要拉取1000 * 2 = 2000個文件即可;網(wǎng)絡(luò)傳輸?shù)男阅芟氖遣皇且泊蟠鬁p少分享一下,實際在生產(chǎn)環(huán)境中,使用了spark.shuffle.consolidateFiles機(jī)制以后,實際的性能調(diào)優(yōu)的效果:對于上述的這種生產(chǎn)環(huán)境的配置,性能的提升,還是相當(dāng)?shù)目陀^的。

spark作業(yè),5個小時 -> 2~3個小時。

大家不要小看這個map端輸出文件合并機(jī)制。實際上,在數(shù)據(jù)量比較大,你自己本身做了前面的性能調(diào)優(yōu),
executor上去->cpu core上去->并行度(task數(shù)量)上去,shuffle沒調(diào)優(yōu),shuffle就很糟糕了;
大量的map端輸出文件的產(chǎn)生。對性能有比較惡劣的影響。

這個時候,去開啟這個機(jī)制,可以很有效的提升性能。
spark.shuffle.manager hash M*R 個小文件
spark.shuffle.manager sort   C*R 個小文件  (默認(rèn)的shuffle管理機(jī)制

spark.shuffle.file.buffer,默認(rèn)32k
spark.shuffle.memoryFraction,0.2

        默認(rèn)情況下,shuffle的map task,輸出到磁盤文件的時候,統(tǒng)一都會先寫入每個task自己關(guān)聯(lián)的一個內(nèi)存緩沖區(qū)。這個緩沖區(qū)大小,默認(rèn)是32kb。每一次,當(dāng)內(nèi)存緩沖區(qū)滿溢之后,才會進(jìn)行spill操作,溢寫操作,溢寫到磁盤文件中去reduce端task,在拉取到數(shù)據(jù)之后,會用hashmap的數(shù)據(jù)格式,來對各個key對應(yīng)的values進(jìn)行匯聚。針對每個key對應(yīng)的values,執(zhí)行我們自定義的聚合函數(shù)的代碼,比如_ + _(把所有values累加起來)reduce task,在進(jìn)行匯聚、聚合等操作的時候,實際上,使用的就是自己對應(yīng)的executor的內(nèi)存,executor(jvm進(jìn)程,堆),默認(rèn)executor內(nèi)存中劃分給reduce task進(jìn)行聚合的比例,是0.2。問題來了,因為比例是0.2,所以,理論上,很有可能會出現(xiàn),拉取過來的數(shù)據(jù)很多,那么在內(nèi)存中,放不下;這個時候,默認(rèn)的行為,就是說,將在內(nèi)存放不下的數(shù)據(jù),都spill(溢寫)到磁盤文件中去。

原理說完之后,來看一下,默認(rèn)情況下,不調(diào)優(yōu),可能會出現(xiàn)什么樣的問題?

默認(rèn),map端內(nèi)存緩沖是每個task,32kb。
默認(rèn),reduce端聚合內(nèi)存比例,是0.2,也就是20%。

如果map端的task,處理的數(shù)據(jù)量比較大,但是呢,你的內(nèi)存緩沖大小是固定的。
可能會出現(xiàn)什么樣的情況?

每個task就處理320kb,32kb,總共會向磁盤溢寫320 / 32 = 10次。
每個task處理32000kb,32kb,總共會向磁盤溢寫32000 / 32 = 1000次。

在map task處理的數(shù)據(jù)量比較大的情況下,而你的task的內(nèi)存緩沖默認(rèn)是比較小的,32kb??赡軙斐啥啻蔚膍ap端往磁盤文件的spill溢寫操作,發(fā)生大量的磁盤IO,從而降低性能。

reduce端聚合內(nèi)存,占比。默認(rèn)是0.2。如果數(shù)據(jù)量比較大,reduce task拉取過來的數(shù)據(jù)很多,那么就會頻繁發(fā)生reduce端聚合內(nèi)存不夠用,頻繁發(fā)生spill操作,溢寫到磁盤上去。而且最要命的是,磁盤上溢寫的數(shù)據(jù)量越大,后面在進(jìn)行聚合操作的時候,很可能會多次讀取磁盤中的數(shù)據(jù),進(jìn)行聚合。

默認(rèn)不調(diào)優(yōu),在數(shù)據(jù)量比較大的情況下,可能頻繁地發(fā)生reduce端的磁盤文件的讀寫。

這兩個點(diǎn)之所以放在一起講,是因為他們倆是有關(guān)聯(lián)的。數(shù)據(jù)量變大,map端肯定會出點(diǎn)問題;
reduce端肯定也會出點(diǎn)問題;出的問題是一樣的,都是磁盤IO頻繁,變多,影響性能。

調(diào)優(yōu):

調(diào)節(jié)map task內(nèi)存緩沖:spark.shuffle.file.buffer,默認(rèn)32k(spark 1.3.x不是這個參數(shù),
后面還有一個后綴,kb;spark 1.5.x以后,變了,就是現(xiàn)在這個參數(shù))
調(diào)節(jié)reduce端聚合內(nèi)存占比:spark.shuffle.memoryFraction,0.2

在實際生產(chǎn)環(huán)境中,我們在什么時候來調(diào)節(jié)兩個參數(shù)?

看Spark UI,如果你的公司是決定采用standalone模式,那么很簡單,你的spark跑起來,會顯示一個Spark UI的地址,4040的端口,進(jìn)去看,依次點(diǎn)擊進(jìn)去,可以看到,你的每個stage的詳情,有哪些executor,有哪些task,每個task的shuffle write和shuffle read的量,shuffle的磁盤和內(nèi)存,讀寫的數(shù)據(jù)量;如果是用的yarn模式來提交,課程最前面,從yarn的界面進(jìn)去,點(diǎn)擊對應(yīng)的application,進(jìn)入Spark UI,查看詳情。

如果發(fā)現(xiàn)shuffle 磁盤的write和read,很大,可以調(diào)節(jié)這兩個參數(shù)

調(diào)節(jié)上面說的那兩個參數(shù)。調(diào)節(jié)的時候的原則。spark.shuffle.file.buffer,每次擴(kuò)大一倍,然后看看效果,64,128;spark.shuffle.memoryFraction,每次提高0.1,看看效果。不能調(diào)節(jié)的太大,太大了以后過猶不及,因為內(nèi)存資源是有限的,你這里調(diào)節(jié)的太大了,其他環(huán)節(jié)的內(nèi)存使用就會有問題了。

調(diào)節(jié)了以后,效果?map task內(nèi)存緩沖變大了,減少spill到磁盤文件的次數(shù);reduce端聚合內(nèi)存變大了,
減少spill到磁盤的次數(shù),而且減少了后面聚合讀取磁盤文件的數(shù)量。
 

到此,關(guān)于“spark shuffle調(diào)優(yōu)的方法是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI