溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark性能優(yōu)化的方法是什么

發(fā)布時間:2021-12-16 15:10:29 來源:億速云 閱讀:117 作者:iii 欄目:云計算

本篇內(nèi)容介紹了“Spark性能優(yōu)化的方法是什么”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

#ShuffleManager發(fā)展概述
在Spark的源碼中,負責shuffle過程的執(zhí)行、計算和處理的組件主要就是ShuffleManager,也即shuffle管理器。而隨著Spark的版本的發(fā)展,ShuffleManager也在不斷迭代,變得越來越先進。

在Spark 1.2以前,默認的shuffle計算引擎是HashShuffleManager。該ShuffleManager而HashShuffleManager有著一個非常嚴重的弊端,就是會產(chǎn)生大量的中間磁盤文件,進而由大量的磁盤IO操作影響了性能。

因此在Spark 1.2以后的版本中,默認的ShuffleManager改成了SortShuffleManager。SortShuffleManager相較于HashShuffleManager來說,有了一定的改進。主要就在于,每個Task在進行shuffle操作時,雖然也會產(chǎn)生較多的臨時磁盤文件,但是最后會將所有的臨時文件合并(merge)成一個磁盤文件,因此每個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取自己的數(shù)據(jù)時,只要根據(jù)索引讀取每個磁盤文件中的部分數(shù)據(jù)即可。

下面我們詳細分析一下HashShuffleManager和SortShuffleManager的原理。

#HashShuffleManager運行原理

##未經(jīng)優(yōu)化的HashShuffleManager

下圖說明了未經(jīng)優(yōu)化的HashShuffleManager的原理。這里我們先明確一個假設(shè)前提:每個Executor只有1個CPU core,也就是說,無論這個Executor上分配多少個task線程,同一時間都只能執(zhí)行一個task線程。

我們先從shuffle write開始說起。shuffle write階段,主要就是在一個stage結(jié)束計算之后,為了下一個stage可以執(zhí)行shuffle類的算子(比如reduceByKey),而將每個task處理的數(shù)據(jù)按key進行“分類”。所謂“分類”,就是對相同的key執(zhí)行hash算法,從而將相同key都寫入同一個磁盤文件中,而每一個磁盤文件都只屬于下游stage的一個task。在將數(shù)據(jù)寫入磁盤之前,會先將數(shù)據(jù)寫入內(nèi)存緩沖中,當內(nèi)存緩沖填滿之后,才會溢寫到磁盤文件中去。

那么每個執(zhí)行shuffle write的task,要為下一個stage創(chuàng)建多少個磁盤文件呢?很簡單,下一個stage的task有多少個,當前stage的每個task就要創(chuàng)建多少份磁盤文件。比如下一個stage總共有100個task,那么當前stage的每個task都要創(chuàng)建100份磁盤文件。如果當前stage有50個task,總共有10個Executor,每個Executor執(zhí)行5個Task,那么每個Executor上總共就要創(chuàng)建500個磁盤文件,所有Executor上會創(chuàng)建5000個磁盤文件。由此可見,未經(jīng)優(yōu)化的shuffle write操作所產(chǎn)生的磁盤文件的數(shù)量是極其驚人的。

接著我們來說說shuffle read。shuffle read,通常就是一個stage剛開始時要做的事情。此時該stage的每一個task就需要將上一個stage的計算結(jié)果中的所有相同key,從各個節(jié)點上通過網(wǎng)絡都拉取到自己所在的節(jié)點上,然后進行key的聚合或連接等操作。由于shuffle write的過程中,task給下游stage的每個task都創(chuàng)建了一個磁盤文件,因此shuffle read的過程中,每個task只要從上游stage的所有task所在節(jié)點上,拉取屬于自己的那一個磁盤文件即可。

shuffle read的拉取過程是一邊拉取一邊進行聚合的。每個shuffle read task都會有一個自己的buffer緩沖,每次都只能拉取與buffer緩沖相同大小的數(shù)據(jù),然后通過內(nèi)存中的一個Map進行聚合等操作。聚合完一批數(shù)據(jù)后,再拉取下一批數(shù)據(jù),并放到buffer緩沖中進行聚合操作。以此類推,直到最后將所有數(shù)據(jù)到拉取完,并得到最終的結(jié)果。 Spark性能優(yōu)化的方法是什么

##優(yōu)化后的HashShuffleManager

下圖說明了優(yōu)化后的HashShuffleManager的原理。這里說的優(yōu)化,是指我們可以設(shè)置一個參數(shù),spark.shuffle.consolidateFiles。該參數(shù)默認值為false,將其設(shè)置為true即可開啟優(yōu)化機制。通常來說,如果我們使用HashShuffleManager,那么都建議開啟這個選項。

開啟consolidate機制之后,在shuffle write過程中,task就不是為下游stage的每個task創(chuàng)建一個磁盤文件了。此時會出現(xiàn)shuffleFileGroup的概念,每個shuffleFileGroup會對應一批磁盤文件,磁盤文件的數(shù)量與下游stage的task數(shù)量是相同的。一個Executor上有多少個CPU core,就可以并行執(zhí)行多少個task。而第一批并行執(zhí)行的每個task都會創(chuàng)建一個shuffleFileGroup,并將數(shù)據(jù)寫入對應的磁盤文件內(nèi)。

當Executor的CPU core執(zhí)行完一批task,接著執(zhí)行下一批task時,下一批task就會復用之前已有的shuffleFileGroup,包括其中的磁盤文件。也就是說,此時task會將數(shù)據(jù)寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。因此,consolidate機制允許不同的task復用同一批磁盤文件,這樣就可以有效將多個task的磁盤文件進行一定程度上的合并,從而大幅度減少磁盤文件的數(shù)量,進而提升shuffle write的性能。

假設(shè)第二個stage有100個task,第一個stage有50個task,總共還是有10個Executor,每個Executor執(zhí)行5個task。那么原本使用未經(jīng)優(yōu)化的HashShuffleManager時,每個Executor會產(chǎn)生500個磁盤文件,所有Executor會產(chǎn)生5000個磁盤文件的。但是此時經(jīng)過優(yōu)化之后,每個Executor創(chuàng)建的磁盤文件的數(shù)量的計算公式為:CPU core的數(shù)量 * 下一個stage的task數(shù)量。也就是說,每個Executor此時只會創(chuàng)建100個磁盤文件,所有Executor只會創(chuàng)建1000個磁盤文件。 Spark性能優(yōu)化的方法是什么

#SortShuffleManager運行原理

SortShuffleManager的運行機制主要分成兩種,一種是普通運行機制,另一種是bypass運行機制。當shuffle read task的數(shù)量小于等于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值時(默認為200),就會啟用bypass機制。

##普通運行機制

下圖說明了普通的SortShuffleManager的原理。在該模式下,數(shù)據(jù)會先寫入一個內(nèi)存數(shù)據(jù)結(jié)構(gòu)中,此時根據(jù)不同的shuffle算子,可能選用不同的數(shù)據(jù)結(jié)構(gòu)。如果是reduceByKey這種聚合類的shuffle算子,那么會選用Map數(shù)據(jù)結(jié)構(gòu),一邊通過Map進行聚合,一邊寫入內(nèi)存;如果是join這種普通的shuffle算子,那么會選用Array數(shù)據(jù)結(jié)構(gòu),直接寫入內(nèi)存。接著,每寫一條數(shù)據(jù)進入內(nèi)存數(shù)據(jù)結(jié)構(gòu)之后,就會判斷一下,是否達到了某個臨界閾值。如果達到臨界閾值的話,那么就會嘗試將內(nèi)存數(shù)據(jù)結(jié)構(gòu)中的數(shù)據(jù)溢寫到磁盤,然后清空內(nèi)存數(shù)據(jù)結(jié)構(gòu)。

在溢寫到磁盤文件之前,會先根據(jù)key對內(nèi)存數(shù)據(jù)結(jié)構(gòu)中已有的數(shù)據(jù)進行排序。排序過后,會分批將數(shù)據(jù)寫入磁盤文件。默認的batch數(shù)量是10000條,也就是說,排序好的數(shù)據(jù),會以每批1萬條數(shù)據(jù)的形式分批寫入磁盤文件。寫入磁盤文件是通過Java的BufferedOutputStream實現(xiàn)的。BufferedOutputStream是Java的緩沖輸出流,首先會將數(shù)據(jù)緩沖在內(nèi)存中,當內(nèi)存緩沖滿溢之后再一次寫入磁盤文件中,這樣可以減少磁盤IO次數(shù),提升性能。

一個task將所有數(shù)據(jù)寫入內(nèi)存數(shù)據(jù)結(jié)構(gòu)的過程中,會發(fā)生多次磁盤溢寫操作,也就會產(chǎn)生多個臨時文件。最后會將之前所有的臨時磁盤文件都進行合并,這就是merge過程,此時會將之前所有臨時磁盤文件中的數(shù)據(jù)讀取出來,然后依次寫入最終的磁盤文件之中。此外,由于一個task就只對應一個磁盤文件,也就意味著該task為下游stage的task準備的數(shù)據(jù)都在這一個文件中,因此還會單獨寫一份索引文件,其中標識了下游各個task的數(shù)據(jù)在文件中的start offset與end offset。

SortShuffleManager由于有一個磁盤文件merge的過程,因此大大減少了文件數(shù)量。比如第一個stage有50個task,總共有10個Executor,每個Executor執(zhí)行5個task,而第二個stage有100個task。由于每個task最終只有一個磁盤文件,因此此時每個Executor上只有5個磁盤文件,所有Executor只有50個磁盤文件。
Spark性能優(yōu)化的方法是什么

##bypass運行機制 下圖說明了bypass SortShuffleManager的原理。bypass運行機制的觸發(fā)條件如下:

  • shuffle map task數(shù)量小于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值。

  • 不是聚合類的shuffle算子(比如reduceByKey)。

此時task會為每個下游task都創(chuàng)建一個臨時磁盤文件,并將數(shù)據(jù)按key進行hash然后根據(jù)key的hash值,將key寫入對應的磁盤文件之中。當然,寫入磁盤文件時也是先寫入內(nèi)存緩沖,緩沖寫滿之后再溢寫到磁盤文件的。最后,同樣會將所有臨時磁盤文件都合并成一個磁盤文件,并創(chuàng)建一個單獨的索引文件。

該過程的磁盤寫機制其實跟未經(jīng)優(yōu)化的HashShuffleManager是一模一樣的,因為都要創(chuàng)建數(shù)量驚人的磁盤文件,只是在最后會做一個磁盤文件的合并而已。因此少量的最終磁盤文件,也讓該機制相對未經(jīng)優(yōu)化的HashShuffleManager來說,shuffle read的性能會更好。

而該機制與普通SortShuffleManager運行機制的不同在于:第一,磁盤寫機制不同;第二,不會進行排序。也就是說,啟用該機制的最大好處在于,shuffle write過程中,不需要進行數(shù)據(jù)的排序操作,也就節(jié)省掉了這部分的性能開銷。
Spark性能優(yōu)化的方法是什么

#shuffle相關(guān)參數(shù)調(diào)優(yōu)
以下是Shffule過程中的一些主要參數(shù),這里詳細講解了各個參數(shù)的功能、默認值以及基于實踐經(jīng)驗給出的調(diào)優(yōu)建議。
spark.shuffle.file.buffer

  • 默認值:32k

  • 參數(shù)說明:該參數(shù)用于設(shè)置shuffle write task的BufferedOutputStream的buffer緩沖大小。將數(shù)據(jù)寫到磁盤文件之前,會先寫入buffer緩沖中,待緩沖寫滿之后,才會溢寫到磁盤。

  • 調(diào)優(yōu)建議:如果作業(yè)可用的內(nèi)存資源較為充足的話,可以適當增加這個參數(shù)的大?。ū热?4k),從而減少shuffle write過程中溢寫磁盤文件的次數(shù),也就可以減少磁盤IO次數(shù),進而提升性能。在實踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù),性能會有1%~5%的提升。

spark.reducer.maxSizeInFlight

  • 默認值:48m

  • 參數(shù)說明:該參數(shù)用于設(shè)置shuffle read task的buffer緩沖大小,而這個buffer緩沖決定了每次能夠拉取多少數(shù)據(jù)。

  • 調(diào)優(yōu)建議:如果作業(yè)可用的內(nèi)存資源較為充足的話,可以適當增加這個參數(shù)的大?。ū热?6m),從而減少拉取數(shù)據(jù)的次數(shù),也就可以減少網(wǎng)絡傳輸?shù)拇螖?shù),進而提升性能。在實踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù),性能會有1%~5%的提升。

spark.shuffle.io.maxRetries

  • 默認值:3

  • 參數(shù)說明:shuffle read task從shuffle write task所在節(jié)點拉取屬于自己的數(shù)據(jù)時,如果因為網(wǎng)絡異常導致拉取失敗,是會自動進行重試的。該參數(shù)就代表了可以重試的最大次數(shù)。如果在指定次數(shù)之內(nèi)拉取還是沒有成功,就可能會導致作業(yè)執(zhí)行失敗。

  • 調(diào)優(yōu)建議:對于那些包含了特別耗時的shuffle操作的作業(yè),建議增加重試最大次數(shù)(比如60次),以避免由于JVM的full gc或者網(wǎng)絡不穩(wěn)定等因素導致的數(shù)據(jù)拉取失敗。在實踐中發(fā)現(xiàn),對于針對超大數(shù)據(jù)量(數(shù)十億~上百億)的shuffle過程,調(diào)節(jié)該參數(shù)可以大幅度提升穩(wěn)定性。

spark.shuffle.io.retryWait

  • 默認值:5s

  • 參數(shù)說明:具體解釋同上,該參數(shù)代表了每次重試拉取數(shù)據(jù)的等待間隔,默認是5s。

  • 調(diào)優(yōu)建議:建議加大間隔時長(比如60s),以增加shuffle操作的穩(wěn)定性。

spark.shuffle.memoryFraction

  • 默認值:0.2

  • 參數(shù)說明:該參數(shù)代表了Executor內(nèi)存中,分配給shuffle read task進行聚合操作的內(nèi)存比例,默認是20%。

  • 調(diào)優(yōu)建議:在資源參數(shù)調(diào)優(yōu)中講解過這個參數(shù)。如果內(nèi)存充足,而且很少使用持久化操作,建議調(diào)高這個比例,給shuffle read的聚合操作更多內(nèi)存,以避免由于內(nèi)存不足導致聚合過程中頻繁讀寫磁盤。在實踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù)可以將性能提升10%左右。

spark.shuffle.manager

  • 默認值:sort

  • 參數(shù)說明:該參數(shù)用于設(shè)置ShuffleManager的類型。Spark 1.5以后,有三個可選項:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默認選項,但是Spark 1.2以及之后的版本默認都是SortShuffleManager了。tungsten-sort與sort類似,但是使用了tungsten計劃中的堆外內(nèi)存管理機制,內(nèi)存使用效率更高。

  • 調(diào)優(yōu)建議:由于SortShuffleManager默認會對數(shù)據(jù)進行排序,因此如果你的業(yè)務邏輯中需要該排序機制的話,則使用默認的SortShuffleManager就可以;而如果你的業(yè)務邏輯不需要對數(shù)據(jù)進行排序,那么建議參考后面的幾個參數(shù)調(diào)優(yōu),通過bypass機制或優(yōu)化的HashShuffleManager來避免排序操作,同時提供較好的磁盤讀寫性能。這里要注意的是,tungsten-sort要慎用,因為之前發(fā)現(xiàn)了一些相應的bug。

spark.shuffle.sort.bypassMergeThreshold

  • 默認值:200

  • 參數(shù)說明:當ShuffleManager為SortShuffleManager時,如果shuffle read task的數(shù)量小于這個閾值(默認是200),則shuffle write過程中不會進行排序操作,而是直接按照未經(jīng)優(yōu)化的HashShuffleManager的方式去寫數(shù)據(jù),但是最后會將每個task產(chǎn)生的所有臨時磁盤文件都合并成一個文件,并會創(chuàng)建單獨的索引文件。

  • 調(diào)優(yōu)建議:當你使用SortShuffleManager時,如果的確不需要排序操作,那么建議將這個參數(shù)調(diào)大一些,大于shuffle read task的數(shù)量。那么此時就會自動啟用bypass機制,map-side就不會進行排序了,減少了排序的性能開銷。但是這種方式下,依然會產(chǎn)生大量的磁盤文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles

  • 默認值:false

  • 參數(shù)說明:如果使用HashShuffleManager,該參數(shù)有效。如果設(shè)置為true,那么就會開啟consolidate機制,會大幅度合并shuffle write的輸出文件,對于shuffle read task數(shù)量特別多的情況下,這種方法可以極大地減少磁盤IO開銷,提升性能。

  • 調(diào)優(yōu)建議:如果的確不需要SortShuffleManager的排序機制,那么除了使用bypass機制,還可以嘗試將spark.shffle.manager參數(shù)手動指定為hash,使用HashShuffleManager,同時開啟consolidate機制。在實踐中嘗試過,發(fā)現(xiàn)其性能比開啟了bypass機制的SortShuffleManager要高出10%~30%。

“Spark性能優(yōu)化的方法是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI