您好,登錄后才能下訂單哦!
Spark2.x中如何進(jìn)行Shuffle相關(guān)參數(shù)優(yōu)化,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。
一、概述
這里也結(jié)合網(wǎng)絡(luò)上的一些調(diào)優(yōu)建議,把整個Shuffle過程常用的優(yōu)化參數(shù)進(jìn)行了整理。注意這里是基于Spark2.x版本的優(yōu)化。
二、相關(guān)參數(shù)及優(yōu)化建議
1.spark.shuffle.file.buffer
默認(rèn)值:
32KB
參數(shù)說明:
該參數(shù)用于設(shè)置shuffle write task的BufferedOutputStream的buffer緩沖大小。將數(shù)據(jù)寫到磁盤文件之前,會先寫入buffer緩沖區(qū)中,待緩沖區(qū)寫滿之后,才會溢寫到磁盤。
調(diào)優(yōu)建議:
如果作業(yè)可用的內(nèi)存資源較為充足的話,可以適當(dāng)增加這個參數(shù)的大小(比如64KB),從而減少shuffle write過程中溢寫磁盤文件的次數(shù),也就可以減少磁盤IO次數(shù),進(jìn)而提升性能。在實踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù),性能會有1%~5%的提升。
2.spark.reducer.maxSizeInFlight
默認(rèn)值:
48MB
參數(shù)說明:
該參數(shù)用于設(shè)置shuffle read task的buffer緩沖大小,而這個buffer緩沖決定了每次能夠拉取多少數(shù)據(jù)。
調(diào)優(yōu)建議:
如果作業(yè)可用的內(nèi)存資源較為充足的話,可以適當(dāng)增加這個參數(shù)的大小(比如96MB),從而減少拉取數(shù)據(jù)的次數(shù),也就可以減少網(wǎng)絡(luò)傳輸?shù)拇螖?shù),進(jìn)而提升性能。在實踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù),性能會有1%~5%的提升。
常見問題:shuffle read task階段出現(xiàn)OOM
shuffle read task去拉取shuffle writer task輸出的數(shù)據(jù),它是一邊拉取一邊進(jìn)行聚合,shuffle read task有一塊聚合內(nèi)存,大小為executor memory * spark.shuffle.memoryFraction。
解決辦法:
a.增加reduce 聚合的內(nèi)存的比例,設(shè)置spark.shuffle.memoryFraction 0.3
b.增加executor memory的大小 --executor-memory 4G
c.減少reduce task每次拉取的數(shù)據(jù)量 設(shè)置spark.reducer.maxSizeInFlight 24MB
3.spark.shuffle.io.maxRetyies
默認(rèn)值:
3次
參數(shù)說明:
shuffle read task訪問shuffle write task的重試次數(shù),如果超過該次數(shù),read端的task就會被kill掉。
調(diào)優(yōu)建議:
調(diào)大該數(shù)值,提高重試次數(shù),防止write端在進(jìn)行full gc或其他操作read被kill影響整個程序的正常執(zhí)行
4.spark.shuffle.io.retryWait
默認(rèn)值:
5s
參數(shù)說明:
shuffle read task訪問shuffle write task數(shù)據(jù)的等待時間間隔
調(diào)優(yōu)建議:
調(diào)大改參數(shù),效果同參數(shù)3
5.spark.shuffle.memoryFraction
默認(rèn)值:
0.2
參數(shù)說明:
該參數(shù)代表了Executor內(nèi)存中,分配給shuffle read task進(jìn)行聚合操作的內(nèi)存比例,默認(rèn)是20%。
調(diào)優(yōu)建議:
如果聚合內(nèi)存充足,而且很少使用持久化操作建議調(diào)高這個比例,給shuffle read的聚合操作更多內(nèi)存,以避免由于內(nèi)存不足導(dǎo)致聚合過程中頻繁讀寫磁盤。在實踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù)可以將性能提升10%左右。
6.spark.shuffle.sort.bypassMergeThreshold
默認(rèn)值:
200
參數(shù)說明:
當(dāng)ShuffleManager為SortShuffleManager時,如果shuffle read task的數(shù)量小于這個閾值(默認(rèn)是200),則shuffle write過程中不會進(jìn)行排序操作,而是直接按照未經(jīng)優(yōu)化的HashShuffleManager的方式去寫數(shù)據(jù),但是最后會將每個task產(chǎn)生的所有臨時磁盤文件都合并成一個文件,并會創(chuàng)建單獨(dú)的索引文件。
調(diào)優(yōu)建議:
當(dāng)你使用SortShuffleManager時,如果的確不需要排序操作,那么建議將這個參數(shù)調(diào)大一些,大于shuffle read task的數(shù)量。那么此時就會自動啟用bypass機(jī)制,map-side就不會進(jìn)行排序了,減少了排序的性能開銷。但是這種方式下,依然會產(chǎn)生大量的磁盤文件,因此shuffle write性能有待提高
7.spark.shuffle.consolidateFiles(spark2.x不需要,只spark1.x需要)
默認(rèn)值:
false
參數(shù)說明:
如果使用HashShuffleManager,該參數(shù)有效。如果設(shè)置為true,那么就會開啟consolidate機(jī)制,會大幅度合并shuffle write的輸出文件,對于shuffle read task數(shù)量特別多的情況下,這種方法可以極大地減少磁盤IO開銷,提升性能。
調(diào)優(yōu)建議:
如果的確不需要SortShuffleManager的排序機(jī)制,那么除了使用bypass機(jī)制,還可以嘗試將spark.shffle.manager參數(shù)手動指定為hash,使用HashShuffleManager,同時開啟consolidate機(jī)制。在實踐中嘗試過,發(fā)現(xiàn)其性能比開啟了bypass機(jī)制的SortShuffleManager要高出10%~30%。
8.spark.shuffle.compress和 spark.shuffle.spill.compress
默認(rèn)值
這兩個參數(shù)的默認(rèn)配置都是true。
參數(shù)說明:
spark.shuffle.compress和spark.shuffle.spill.compress都是用來設(shè)置Shuffle過程中是否對Shuffle數(shù)據(jù)進(jìn)行壓縮;其中前者針對最終寫入本地文件系統(tǒng)的輸出文件,后者針對在處理過程需要spill到外部存儲的中間數(shù)據(jù),后者針對最終的shuffle輸出文件。
調(diào)優(yōu)建議:
對于參數(shù)spark.shuffle.compress,如果下游的Task通過網(wǎng)絡(luò)獲取上游Shuffle Map Task的結(jié)果的網(wǎng)絡(luò)IO成為瓶頸,那么就需要考慮將它設(shè)置為true:通過壓縮數(shù)據(jù)來減少網(wǎng)絡(luò)IO。由于上游Shuffle Map Task和下游的Task現(xiàn)階段是不會并行處理的,即上游Shuffle Map Task處理完成,然后下游的Task才會開始執(zhí)行。因此如果需要壓縮的時間消耗就是Shuffle MapTask壓縮數(shù)據(jù)的時間 + 網(wǎng)絡(luò)傳輸?shù)臅r間 + 下游Task解壓的時間;而不需要壓縮的時間消耗僅僅是網(wǎng)絡(luò)傳輸?shù)臅r間。因此需要評估壓縮解壓時間帶來的時間消耗和因為數(shù)據(jù)壓縮帶來的時間節(jié)省。如果網(wǎng)絡(luò)成為瓶頸,比如集群普遍使用的是千兆網(wǎng)絡(luò),那么可能將這個選項設(shè)置為true是合理的;如果計算是CPU密集型的,那么可能將這個選項設(shè)置為false才更好。
對于參數(shù)spark.shuffle.spill.compress,如果設(shè)置為true,代表處理的中間結(jié)果在spill到本地硬盤時都會進(jìn)行壓縮,在將中間結(jié)果取回進(jìn)行merge的時候,要進(jìn)行解壓。因此要綜合考慮CPU由于引入壓縮解壓的消耗時間和Disk IO因為壓縮帶來的節(jié)省時間的比較。在Disk IO成為瓶頸的場景下,這個被設(shè)置為true可能比較合適;如果本地硬盤是SSD,那么這個設(shè)置為false可能比較合適。
9.spark.shuffle.service.enabled
默認(rèn)值
false
參數(shù)說明:
是否啟用External shuffle Service服務(wù),Spark系統(tǒng)在運(yùn)行含shuffle過程的應(yīng)用時,Executor進(jìn)程除了運(yùn)行task,還要負(fù)責(zé)寫shuffle數(shù)據(jù),給其他Executor提供shuffle數(shù)據(jù)。當(dāng)Executor進(jìn)程任務(wù)過重,導(dǎo)致GC而不能為其他Executor提供shuffle數(shù)據(jù)時,會影響任務(wù)運(yùn)行。External shuffle Service是長期存在于NodeManager進(jìn)程中的一個輔助服務(wù)。通過該服務(wù)來抓取shuffle數(shù)據(jù),減少了Executor的壓力,在Executor GC的時候也不會影響其他Executor的任務(wù)運(yùn)行
優(yōu)化建議:
啟用外部shuffle服務(wù),這個服務(wù)會安全地保存shuffle過程中,executor寫的磁盤文件,因此executor即使掛掉也不要緊,必須配合spark.dynamicAllocation.enabled屬性設(shè)置為true,才能生效,而且外部shuffle服務(wù)必須進(jìn)行安裝和啟動,才能啟用這個屬性。
看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。