溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Spark性能優(yōu)化中的開發(fā)調(diào)優(yōu)是怎么樣的呢

發(fā)布時(shí)間:2021-12-17 11:28:07 來源:億速云 閱讀:97 作者:柒染 欄目:大數(shù)據(jù)

這篇文章給大家介紹Spark性能優(yōu)化中的開發(fā)調(diào)優(yōu)是怎么樣的呢,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

1、前言

在大數(shù)據(jù)計(jì)算領(lǐng)域,Spark已經(jīng)成為了越來越流行、越來越受歡迎的計(jì)算平臺之一。Spark的功能涵蓋了大數(shù)據(jù)領(lǐng)域的離線批處理、SQL類處理、流式/實(shí)時(shí)計(jì)算、機(jī)器學(xué)習(xí)、圖計(jì)算等各種不同類型的計(jì)算操作,應(yīng)用范圍與前景非常廣泛。在美團(tuán)•大眾點(diǎn)評,已經(jīng)有很多同學(xué)在各種項(xiàng)目中嘗試使用Spark。大多數(shù)同學(xué)(包括筆者在內(nèi)),最初開始嘗試使用Spark的原因很簡單,主要就是為了讓大數(shù)據(jù)計(jì)算作業(yè)的執(zhí)行速度更快、性能更高。

然而,通過Spark開發(fā)出高性能的大數(shù)據(jù)計(jì)算作業(yè),并不是那么簡單的。如果沒有對Spark作業(yè)進(jìn)行合理的調(diào)優(yōu),Spark作業(yè)的執(zhí)行速度可能會很慢,這樣就完全體現(xiàn)不出Spark作為一種快速大數(shù)據(jù)計(jì)算引擎的優(yōu)勢來。因此,想要用好Spark,就必須對其進(jìn)行合理的性能優(yōu)化。

Spark的性能調(diào)優(yōu)實(shí)際上是由很多部分組成的,不是調(diào)節(jié)幾個(gè)參數(shù)就可以立竿見影提升作業(yè)性能的。我們需要根據(jù)不同的業(yè)務(wù)場景以及數(shù)據(jù)情況,對Spark作業(yè)進(jìn)行綜合性的分析,然后進(jìn)行多個(gè)方面的調(diào)節(jié)和優(yōu)化,才能獲得***性能。

筆者根據(jù)之前的Spark作業(yè)開發(fā)經(jīng)驗(yàn)以及實(shí)踐積累,總結(jié)出了一套Spark作業(yè)的性能優(yōu)化方案。整套方案主要分為開發(fā)調(diào)優(yōu)、資源調(diào)優(yōu)、數(shù)據(jù)傾斜調(diào)優(yōu)、shuffle調(diào)優(yōu)幾個(gè)部分。開發(fā)調(diào)優(yōu)和資源調(diào)優(yōu)是所有Spark作業(yè)都需要注意和遵循的一些基本原則,是高性能Spark作業(yè)的基礎(chǔ);數(shù)據(jù)傾斜調(diào)優(yōu),主要講解了一套完整的用來解決Spark作業(yè)數(shù)據(jù)傾斜的解決方案;shuffle調(diào)優(yōu),面向的是對Spark的原理有較深層次掌握和研究的同學(xué),主要講解了如何對Spark作業(yè)的shuffle運(yùn)行過程以及細(xì)節(jié)進(jìn)行調(diào)優(yōu)。

作為Spark性能優(yōu)化指南的基礎(chǔ)篇,下面主要講解開發(fā)調(diào)優(yōu)以及資源調(diào)優(yōu)。

2、開發(fā)調(diào)優(yōu)

Spark性能優(yōu)化的***步,就是要在開發(fā)Spark作業(yè)的過程中注意和應(yīng)用一些性能優(yōu)化的基本原則。開發(fā)調(diào)優(yōu),就是要讓大家了解以下一些Spark基本開發(fā)原則,包括:RDD  lineage設(shè)計(jì)、算子的合理使用、特殊操作的優(yōu)化等。在開發(fā)過程中,時(shí)時(shí)刻刻都應(yīng)該注意以上原則,并將這些原則根據(jù)具體的業(yè)務(wù)以及實(shí)際的應(yīng)用場景,靈活地運(yùn)用到自己的Spark作業(yè)中。Spark性能優(yōu)化的***步,就是要在開發(fā)Spark作業(yè)的過程中注意和應(yīng)用一些性能優(yōu)化的基本原則。開發(fā)調(diào)優(yōu),就是要讓大家了解以下一些Spark基本開發(fā)原則,包括:RDD  lineage設(shè)計(jì)、算子的合理使用、特殊操作的優(yōu)化等。在開發(fā)過程中,時(shí)時(shí)刻刻都應(yīng)該注意以上原則,并將這些原則根據(jù)具體的業(yè)務(wù)以及實(shí)際的應(yīng)用場景,靈活地運(yùn)用到自己的Spark作業(yè)中。

原則一:避免創(chuàng)建重復(fù)的RDD

通常來說,我們在開發(fā)一個(gè)Spark作業(yè)時(shí),首先是基于某個(gè)數(shù)據(jù)源(比如Hive表或HDFS文件)創(chuàng)建一個(gè)初始的RDD;接著對這個(gè)RDD執(zhí)行某個(gè)算子操作,然后得到下一個(gè)RDD;以此類推,循環(huán)往復(fù),直到計(jì)算出最終我們需要的結(jié)果。在這個(gè)過程中,多個(gè)RDD會通過不同的算子操作(比如map、reduce等)串起來,這個(gè)“RDD串”,就是RDD  lineage,也就是“RDD的血緣關(guān)系鏈”。

我們在開發(fā)過程中要注意:對于同一份數(shù)據(jù),只應(yīng)該創(chuàng)建一個(gè)RDD,不能創(chuàng)建多個(gè)RDD來代表同一份數(shù)據(jù)。

一些Spark初學(xué)者在剛開始開發(fā)Spark作業(yè)時(shí),或者是有經(jīng)驗(yàn)的工程師在開發(fā)RDD  lineage極其冗長的Spark作業(yè)時(shí),可能會忘了自己之前對于某一份數(shù)據(jù)已經(jīng)創(chuàng)建過一個(gè)RDD了,從而導(dǎo)致對于同一份數(shù)據(jù),創(chuàng)建了多個(gè)RDD。這就意味著,我們的Spark作業(yè)會進(jìn)行多次重復(fù)計(jì)算來創(chuàng)建多個(gè)代表相同數(shù)據(jù)的RDD,進(jìn)而增加了作業(yè)的性能開銷。

一個(gè)簡單的例子

//也就是說,需要對一份數(shù)據(jù)執(zhí)行兩次算子操作。 //錯(cuò)誤的做法:對于同一份數(shù)據(jù)執(zhí)行多次算子操作時(shí),創(chuàng)建多個(gè)RDD。 //這里執(zhí)行了兩次textFile方法,針對同一個(gè)HDFS文件,創(chuàng)建了兩個(gè)RDD出來, //然后分別對每個(gè)RDD都執(zhí)行了一個(gè)算子操作。 //這種情況下,Spark需要從HDFS上兩次加載hello.txt文件的內(nèi)容,并創(chuàng)建兩個(gè)單獨(dú)的RDD; //第二次加載HDFS文件以及創(chuàng)建RDD的性能開銷,很明顯是白白浪費(fèi)掉的。 val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt") rdd1.map(...) val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt") rdd2.reduce(...) //正確的用法:對于一份數(shù)據(jù)執(zhí)行多次算子操作時(shí),只使用一個(gè)RDD。 //這種寫法很明顯比上一種寫法要好多了,因?yàn)槲覀儗τ谕环輸?shù)據(jù)只創(chuàng)建了一個(gè)RDD, //然后對這一個(gè)RDD執(zhí)行了多次算子操作。 //但是要注意到這里為止優(yōu)化還沒有結(jié)束,由于rdd1被執(zhí)行了兩次算子操作,第二次執(zhí)行reduce操作的時(shí)候, //還會再次從源頭處重新計(jì)算一次rdd1的數(shù)據(jù),因此還是會有重復(fù)計(jì)算的性能開銷。 //要徹底解決這個(gè)問題,必須結(jié)合“原則三:對多次使用的RDD進(jìn)行持久化”, //才能保證一個(gè)RDD被多次使用時(shí)只被計(jì)算一次。 val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt") rdd1.map(...)

原則二:盡可能復(fù)用同一個(gè)RDD

除了要避免在開發(fā)過程中對一份完全相同的數(shù)據(jù)創(chuàng)建多個(gè)RDD之外,在對不同的數(shù)據(jù)執(zhí)行算子操作時(shí)還要盡可能地復(fù)用一個(gè)RDD。比如說,有一個(gè)RDD的數(shù)據(jù)格式是key-value類型的,另一個(gè)是單value類型的,這兩個(gè)RDD的value數(shù)據(jù)是完全一樣的。那么此時(shí)我們可以只使用key-value類型的那個(gè)RDD,因?yàn)槠渲幸呀?jīng)包含了另一個(gè)的數(shù)據(jù)。對于類似這種多個(gè)RDD的數(shù)據(jù)有重疊或者包含的情況,我們應(yīng)該盡量復(fù)用一個(gè)RDD,這樣可以盡可能地減少RDD的數(shù)量,從而盡可能減少算子執(zhí)行的次數(shù)。

一個(gè)簡單的例子

// 錯(cuò)誤的做法。   // 有一個(gè)<Long, String>格式的RDD,即rdd1。 // 接著由于業(yè)務(wù)需要,對rdd1執(zhí)行了一個(gè)map操作,創(chuàng)建了一個(gè)rdd2,而rdd2中的數(shù)據(jù)僅僅是rdd1中的value值而已,也就是說,rdd2是rdd1的子集。 JavaPairRDD<Long, String> rdd1 = ... JavaRDD<String> rdd2 = rdd1.map(...)   // 分別對rdd1和rdd2執(zhí)行了不同的算子操作。 rdd1.reduceByKey(...) rdd2.map(...)   // 正確的做法。   // 上面這個(gè)case中,其實(shí)rdd1和rdd2的區(qū)別無非就是數(shù)據(jù)格式不同而已,rdd2的數(shù)據(jù)完全就是rdd1的子集而已,卻創(chuàng)建了兩個(gè)rdd,并對兩個(gè)rdd都執(zhí)行了一次算子操作。 // 此時(shí)會因?yàn)閷dd1執(zhí)行map算子來創(chuàng)建rdd2,而多執(zhí)行一次算子操作,進(jìn)而增加性能開銷。 // 其實(shí)在這種情況下完全可以復(fù)用同一個(gè)RDD。 // 我們可以使用rdd1,既做reduceByKey操作,也做map操作。 // 在進(jìn)行第二個(gè)map操作時(shí),只使用每個(gè)數(shù)據(jù)的tuple._2,也就是rdd1中的value值,即可。 JavaPairRDD<Long, String> rdd1 = ... rdd1.reduceByKey(...) rdd1.map(tuple._2...)   // 第二種方式相較于***種方式而言,很明顯減少了一次rdd2的計(jì)算開銷。 // 但是到這里為止,優(yōu)化還沒有結(jié)束,對rdd1我們還是執(zhí)行了兩次算子操作,rdd1實(shí)際上還是會被計(jì)算兩次。 // 因此還需要配合“原則三:對多次使用的RDD進(jìn)行持久化”進(jìn)行使用,才能保證一個(gè)RDD被多次使用時(shí)只被計(jì)算一次。

原則三:對多次使用的RDD進(jìn)行持久化

當(dāng)你在Spark代碼中多次對一個(gè)RDD做了算子操作后,恭喜,你已經(jīng)實(shí)現(xiàn)Spark作業(yè)***步的優(yōu)化了,也就是盡可能復(fù)用RDD。此時(shí)就該在這個(gè)基礎(chǔ)之上,進(jìn)行第二步優(yōu)化了,也就是要保證對一個(gè)RDD執(zhí)行多次算子操作時(shí),這個(gè)RDD本身僅僅被計(jì)算一次。

Spark中對于一個(gè)RDD執(zhí)行多次算子的默認(rèn)原理是這樣的:每次你對一個(gè)RDD執(zhí)行一個(gè)算子操作時(shí),都會重新從源頭處計(jì)算一遍,計(jì)算出那個(gè)RDD來,然后再對這個(gè)RDD執(zhí)行你的算子操作。這種方式的性能是很差的。

因此對于這種情況,我們的建議是:對多次使用的RDD進(jìn)行持久化。此時(shí)Spark就會根據(jù)你的持久化策略,將RDD中的數(shù)據(jù)保存到內(nèi)存或者磁盤中。以后每次對這個(gè)RDD進(jìn)行算子操作時(shí),都會直接從內(nèi)存或磁盤中提取持久化的RDD數(shù)據(jù),然后執(zhí)行算子,而不會從源頭處重新計(jì)算一遍這個(gè)RDD,再執(zhí)行算子操作。

對多次使用的RDD進(jìn)行持久化的代碼示例

// 如果要對一個(gè)RDD進(jìn)行持久化,只要對這個(gè)RDD調(diào)用cache()和persist()即可。   // 正確的做法。 // cache()方法表示:使用非序列化的方式將RDD中的數(shù)據(jù)全部嘗試持久化到內(nèi)存中。 // 此時(shí)再對rdd1執(zhí)行兩次算子操作時(shí),只有在***次執(zhí)行map算子時(shí),才會將這個(gè)rdd1從源頭處計(jì)算一次。 // 第二次執(zhí)行reduce算子時(shí),就會直接從內(nèi)存中提取數(shù)據(jù)進(jìn)行計(jì)算,不會重復(fù)計(jì)算一個(gè)rdd。 val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache() rdd1.map(...) rdd1.reduce(...)   // persist()方法表示:手動選擇持久化級別,并使用指定的方式進(jìn)行持久化。 // 比如說,StorageLevel.MEMORY_AND_DISK_SER表示,內(nèi)存充足時(shí)優(yōu)先持久化到內(nèi)存中,內(nèi)存不充足時(shí)持久化到磁盤文件中。 // 而且其中的_SER后綴表示,使用序列化的方式來保存RDD數(shù)據(jù),此時(shí)RDD中的每個(gè)partition都會序列化成一個(gè)大的字節(jié)數(shù)組,然后再持久化到內(nèi)存或磁盤中。 // 序列化的方式可以減少持久化的數(shù)據(jù)對內(nèi)存/磁盤的占用量,進(jìn)而避免內(nèi)存被持久化數(shù)據(jù)占用過多,從而發(fā)生頻繁GC。 val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER) rdd1.map(...) rdd1.reduce(...)

對于persist()方法而言,我們可以根據(jù)不同的業(yè)務(wù)場景選擇不同的持久化級別。

Spark的持久化級別

Spark性能優(yōu)化中的開發(fā)調(diào)優(yōu)是怎么樣的呢

如何選擇一種最合適的持久化策略

  • 默認(rèn)情況下,性能***的當(dāng)然是MEMORY_ONLY,但前提是你的內(nèi)存必須足夠足夠大,可以綽綽有余地存放下整個(gè)RDD的所有數(shù)據(jù)。因?yàn)椴贿M(jìn)行序列化與反序列化操作,就避免了這部分的性能開銷;對這個(gè)RDD的后續(xù)算子操作,都是基于純內(nèi)存中的數(shù)據(jù)的操作,不需要從磁盤文件中讀取數(shù)據(jù),性能也很高;而且不需要復(fù)制一份數(shù)據(jù)副本,并遠(yuǎn)程傳送到其他節(jié)點(diǎn)上。但是這里必須要注意的是,在實(shí)際的生產(chǎn)環(huán)境中,恐怕能夠直接用這種策略的場景還是有限的,如果RDD中數(shù)據(jù)比較多時(shí)(比如幾十億),直接用這種持久化級別,會導(dǎo)致JVM的OOM內(nèi)存溢出異常。

  • 如果使用MEMORY_ONLY級別時(shí)發(fā)生了內(nèi)存溢出,那么建議嘗試使用MEMORY_ONLY_SER級別。該級別會將RDD數(shù)據(jù)序列化后再保存在內(nèi)存中,此時(shí)每個(gè)partition僅僅是一個(gè)字節(jié)數(shù)組而已,大大減少了對象數(shù)量,并降低了內(nèi)存占用。這種級別比MEMORY_ONLY多出來的性能開銷,主要就是序列化與反序列化的開銷。但是后續(xù)算子可以基于純內(nèi)存進(jìn)行操作,因此性能總體還是比較高的。此外,可能發(fā)生的問題同上,如果RDD中的數(shù)據(jù)量過多的話,還是可能會導(dǎo)致OOM內(nèi)存溢出的異常。

  • 如果純內(nèi)存的級別都無法使用,那么建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因?yàn)榧热坏搅诉@一步,就說明RDD的數(shù)據(jù)量很大,內(nèi)存無法完全放下。序列化后的數(shù)據(jù)比較少,可以節(jié)省內(nèi)存和磁盤的空間開銷。同時(shí)該策略會優(yōu)先盡量嘗試將數(shù)據(jù)緩存在內(nèi)存中,內(nèi)存緩存不下才會寫入磁盤。

  • 通常不建議使用DISK_ONLY和后綴為_2的級別:因?yàn)橥耆诖疟P文件進(jìn)行數(shù)據(jù)的讀寫,會導(dǎo)致性能急劇降低,有時(shí)還不如重新計(jì)算一次所有RDD。后綴為_2的級別,必須將所有數(shù)據(jù)都復(fù)制一份副本,并發(fā)送到其他節(jié)點(diǎn)上,數(shù)據(jù)復(fù)制以及網(wǎng)絡(luò)傳輸會導(dǎo)致較大的性能開銷,除非是要求作業(yè)的高可用性,否則不建議使用。

原則四:盡量避免使用shuffle類算子

如果有可能的話,要盡量避免使用shuffle類算子。因?yàn)镾park作業(yè)運(yùn)行過程中,最消耗性能的地方就是shuffle過程。shuffle過程,簡單來說,就是將分布在集群中多個(gè)節(jié)點(diǎn)上的同一個(gè)key,拉取到同一個(gè)節(jié)點(diǎn)上,進(jìn)行聚合或join等操作。比如reduceByKey、join等算子,都會觸發(fā)shuffle操作。

shuffle過程中,各個(gè)節(jié)點(diǎn)上的相同key都會先寫入本地磁盤文件中,然后其他節(jié)點(diǎn)需要通過網(wǎng)絡(luò)傳輸拉取各個(gè)節(jié)點(diǎn)上的磁盤文件中的相同key。而且相同key都拉取到同一個(gè)節(jié)點(diǎn)進(jìn)行聚合操作時(shí),還有可能會因?yàn)橐粋€(gè)節(jié)點(diǎn)上處理的key過多,導(dǎo)致內(nèi)存不夠存放,進(jìn)而溢寫到磁盤文件中。因此在shuffle過程中,可能會發(fā)生大量的磁盤文件讀寫的IO操作,以及數(shù)據(jù)的網(wǎng)絡(luò)傳輸操作。磁盤IO和網(wǎng)絡(luò)數(shù)據(jù)傳輸也是shuffle性能較差的主要原因。

因此在我們的開發(fā)過程中,能避免則盡可能避免使用reduceByKey、join、distinct、repartition等會進(jìn)行shuffle的算子,盡量使用map類的非shuffle算子。這樣的話,沒有shuffle操作或者僅有較少shuffle操作的Spark作業(yè),可以大大減少性能開銷。

Broadcast與map進(jìn)行join代碼示例

// 傳統(tǒng)的join操作會導(dǎo)致shuffle操作。 // 因?yàn)閮蓚€(gè)RDD中,相同的key都需要通過網(wǎng)絡(luò)拉取到一個(gè)節(jié)點(diǎn)上,由一個(gè)task進(jìn)行join操作。 val rdd3 = rdd1.join(rdd2)   // Broadcast+map的join操作,不會導(dǎo)致shuffle操作。 // 使用Broadcast將一個(gè)數(shù)據(jù)量較小的RDD作為廣播變量。 val rdd2Data = rdd2.collect() val rdd2DataBroadcast = sc.broadcast(rdd2Data)   // 在rdd1.map算子中,可以從rdd2DataBroadcast中,獲取rdd2的所有數(shù)據(jù)。 // 然后進(jìn)行遍歷,如果發(fā)現(xiàn)rdd2中某條數(shù)據(jù)的key與rdd1的當(dāng)前數(shù)據(jù)的key是相同的,那么就判定可以進(jìn)行join。 // 此時(shí)就可以根據(jù)自己需要的方式,將rdd1當(dāng)前數(shù)據(jù)與rdd2中可以連接的數(shù)據(jù),拼接在一起(String或Tuple)。 val rdd3 = rdd1.map(rdd2DataBroadcast...)   // 注意,以上操作,建議僅僅在rdd2的數(shù)據(jù)量比較少(比如幾百M(fèi),或者一兩G)的情況下使用。 // 因?yàn)槊總€(gè)Executor的內(nèi)存中,都會駐留一份rdd2的全量數(shù)據(jù)。

原則五:使用map-side預(yù)聚合的shuffle操作

如果因?yàn)闃I(yè)務(wù)需要,一定要使用shuffle操作,無法用map類的算子來替代,那么盡量使用可以map-side預(yù)聚合的算子。

所謂的map-side預(yù)聚合,說的是在每個(gè)節(jié)點(diǎn)本地對相同的key進(jìn)行一次聚合操作,類似于MapReduce中的本地combiner。map-side預(yù)聚合之后,每個(gè)節(jié)點(diǎn)本地就只會有一條相同的key,因?yàn)槎鄺l相同的key都被聚合起來了。其他節(jié)點(diǎn)在拉取所有節(jié)點(diǎn)上的相同key時(shí),就會大大減少需要拉取的數(shù)據(jù)數(shù)量,從而也就減少了磁盤IO以及網(wǎng)絡(luò)傳輸開銷。通常來說,在可能的情況下,建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子。因?yàn)閞educeByKey和aggregateByKey算子都會使用用戶自定義的函數(shù)對每個(gè)節(jié)點(diǎn)本地的相同key進(jìn)行預(yù)聚合。而groupByKey算子是不會進(jìn)行預(yù)聚合的,全量的數(shù)據(jù)會在集群的各個(gè)節(jié)點(diǎn)之間分發(fā)和傳輸,性能相對來說比較差。

比如如下兩幅圖,就是典型的例子,分別基于reduceByKey和groupByKey進(jìn)行單詞計(jì)數(shù)。其中***張圖是groupByKey的原理圖,可以看到,沒有進(jìn)行任何本地聚合時(shí),所有數(shù)據(jù)都會在集群節(jié)點(diǎn)之間傳輸;第二張圖是reduceByKey的原理圖,可以看到,每個(gè)節(jié)點(diǎn)本地的相同key數(shù)據(jù),都進(jìn)行了預(yù)聚合,然后才傳輸?shù)狡渌?jié)點(diǎn)上進(jìn)行全局聚合。

Spark性能優(yōu)化中的開發(fā)調(diào)優(yōu)是怎么樣的呢

Spark性能優(yōu)化中的開發(fā)調(diào)優(yōu)是怎么樣的呢

原則六:使用高性能的算子

除了shuffle相關(guān)的算子有優(yōu)化原則之外,其他的算子也都有著相應(yīng)的優(yōu)化原則。

使用reduceByKey/aggregateByKey替代groupByKey

詳情見“原則五:使用map-side預(yù)聚合的shuffle操作”。

使用mapPartitions替代普通map

mapPartitions類的算子,一次函數(shù)調(diào)用會處理一個(gè)partition所有的數(shù)據(jù),而不是一次函數(shù)調(diào)用處理一條,性能相對來說會高一些。但是有的時(shí)候,使用mapPartitions會出現(xiàn)OOM(內(nèi)存溢出)的問題。因?yàn)閱未魏瘮?shù)調(diào)用就要處理掉一個(gè)partition所有的數(shù)據(jù),如果內(nèi)存不夠,垃圾回收時(shí)是無法回收掉太多對象的,很可能出現(xiàn)OOM異常。所以使用這類操作時(shí)要慎重!

使用foreachPartitions替代foreach

原理類似于“使用mapPartitions替代map”,也是一次函數(shù)調(diào)用處理一個(gè)partition的所有數(shù)據(jù),而不是一次函數(shù)調(diào)用處理一條數(shù)據(jù)。在實(shí)踐中發(fā)現(xiàn),foreachPartitions類的算子,對性能的提升還是很有幫助的。比如在foreach函數(shù)中,將RDD中所有數(shù)據(jù)寫MySQL,那么如果是普通的foreach算子,就會一條數(shù)據(jù)一條數(shù)據(jù)地寫,每次函數(shù)調(diào)用可能就會創(chuàng)建一個(gè)數(shù)據(jù)庫連接,此時(shí)就勢必會頻繁地創(chuàng)建和銷毀數(shù)據(jù)庫連接,性能是非常低下;但是如果用foreachPartitions算子一次性處理一個(gè)partition的數(shù)據(jù),那么對于每個(gè)partition,只要創(chuàng)建一個(gè)數(shù)據(jù)庫連接即可,然后執(zhí)行批量插入操作,此時(shí)性能是比較高的。實(shí)踐中發(fā)現(xiàn),對于1萬條左右的數(shù)據(jù)量寫MySQL,性能可以提升30%以上。

使用filter之后進(jìn)行coalesce操作*

通常對一個(gè)RDD執(zhí)行filter算子過濾掉RDD中較多數(shù)據(jù)后(比如30%以上的數(shù)據(jù)),建議使用coalesce算子,手動減少RDD的partition數(shù)量,將RDD中的數(shù)據(jù)壓縮到更少的partition中去。因?yàn)閒ilter之后,RDD的每個(gè)partition中都會有很多數(shù)據(jù)被過濾掉,此時(shí)如果照常進(jìn)行后續(xù)的計(jì)算,其實(shí)每個(gè)task處理的partition中的數(shù)據(jù)量并不是很多,有一點(diǎn)資源浪費(fèi),而且此時(shí)處理的task越多,可能速度反而越慢。因此用coalesce減少partition數(shù)量,將RDD中的數(shù)據(jù)壓縮到更少的partition之后,只要使用更少的task即可處理完所有的partition。在某些場景下,對于性能的提升會有一定的幫助。

使用repartitionAndSortWithinPartitions替代repartition與sort類操作

repartitionAndSortWithinPartitions是Spark官網(wǎng)推薦的一個(gè)算子,官方建議,如果需要在repartition重分區(qū)之后,還要進(jìn)行排序,建議直接使用repartitionAndSortWithinPartitions算子。因?yàn)樵撍阕涌梢砸贿呥M(jìn)行重分區(qū)的shuffle操作,一邊進(jìn)行排序。shuffle與sort兩個(gè)操作同時(shí)進(jìn)行,比先shuffle再sort來說,性能可能是要高的。

原則七:廣播大變量

有時(shí)在開發(fā)過程中,會遇到需要在算子函數(shù)中使用外部變量的場景(尤其是大變量,比如100M以上的大集合),那么此時(shí)就應(yīng)該使用Spark的廣播(Broadcast)功能來提升性能。

在算子函數(shù)中使用到外部變量時(shí),默認(rèn)情況下,Spark會將該變量復(fù)制多個(gè)副本,通過網(wǎng)絡(luò)傳輸?shù)絫ask中,此時(shí)每個(gè)task都有一個(gè)變量副本。如果變量本身比較大的話(比如100M,甚至1G),那么大量的變量副本在網(wǎng)絡(luò)中傳輸?shù)男阅荛_銷,以及在各個(gè)節(jié)點(diǎn)的Executor中占用過多內(nèi)存導(dǎo)致的頻繁GC,都會極大地影響性能。

因此對于上述情況,如果使用的外部變量比較大,建議使用Spark的廣播功能,對該變量進(jìn)行廣播。廣播后的變量,會保證每個(gè)Executor的內(nèi)存中,只駐留一份變量副本,而Executor中的task執(zhí)行時(shí)共享該Executor中的那份變量副本。這樣的話,可以大大減少變量副本的數(shù)量,從而減少網(wǎng)絡(luò)傳輸?shù)男阅荛_銷,并減少對Executor內(nèi)存的占用開銷,降低GC的頻率。

廣播大變量的代碼示例

// 以下代碼在算子函數(shù)中,使用了外部的變量。 // 此時(shí)沒有做任何特殊操作,每個(gè)task都會有一份list1的副本。 val list1 = ... rdd1.map(list1...)   // 以下代碼將list1封裝成了Broadcast類型的廣播變量。 // 在算子函數(shù)中,使用廣播變量時(shí),首先會判斷當(dāng)前task所在Executor內(nèi)存中,是否有變量副本。 // 如果有則直接使用;如果沒有則從Driver或者其他Executor節(jié)點(diǎn)上遠(yuǎn)程拉取一份放到本地Executor內(nèi)存中。 // 每個(gè)Executor內(nèi)存中,就只會駐留一份廣播變量副本。 val list1 = ... val list1Broadcast = sc.broadcast(list1) rdd1.map(list1Broadcast...)

原則八:使用Kryo優(yōu)化序列化性能

在Spark中,主要有三個(gè)地方涉及到了序列化:

  • 在算子函數(shù)中使用到外部變量時(shí),該變量會被序列化后進(jìn)行網(wǎng)絡(luò)傳輸(見“原則七:廣播大變量”中的講解)。

  • 將自定義的類型作為RDD的泛型類型時(shí)(比如JavaRDD,Student是自定義類型),所有自定義類型對象,都會進(jìn)行序列化。因此這種情況下,也要求自定義的類必須實(shí)現(xiàn)Serializable接口。

  • 使用可序列化的持久化策略時(shí)(比如MEMORY_ONLY_SER),Spark會將RDD中的每個(gè)partition都序列化成一個(gè)大的字節(jié)數(shù)組。

對于這三種出現(xiàn)序列化的地方,我們都可以通過使用Kryo序列化類庫,來優(yōu)化序列化和反序列化的性能。Spark默認(rèn)使用的是Java的序列化機(jī)制,也就是ObjectOutputStream/ObjectInputStream  API來進(jìn)行序列化和反序列化。但是Spark同時(shí)支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高很多。官方介紹,Kryo序列化機(jī)制比Java序列化機(jī)制,性能高10倍左右。Spark之所以默認(rèn)沒有使用Kryo作為序列化類庫,是因?yàn)镵ryo要求***要注冊所有需要進(jìn)行序列化的自定義類型,因此對于開發(fā)者來說,這種方式比較麻煩。

以下是使用Kryo的代碼示例,我們只要設(shè)置序列化類,再注冊要序列化的自定義類型即可(比如算子函數(shù)中使用到的外部變量類型、作為RDD泛型類型的自定義類型等):

// 創(chuàng)建SparkConf對象。 val conf = new SparkConf().setMaster(...).setAppName(...) // 設(shè)置序列化器為KryoSerializer。 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注冊要序列化的自定義類型。 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

原則九:優(yōu)化數(shù)據(jù)結(jié)構(gòu)

Java中,有三種類型比較耗費(fèi)內(nèi)存:

  • 對象,每個(gè)Java對象都有對象頭、引用等額外的信息,因此比較占用內(nèi)存空間。

  • 字符串,每個(gè)字符串內(nèi)部都有一個(gè)字符數(shù)組以及長度等額外信息。

  • 集合類型,比如HashMap、LinkedList等,因?yàn)榧项愋蛢?nèi)部通常會使用一些內(nèi)部類來封裝集合元素,比如Map.Entry。

因此Spark官方建議,在Spark編碼實(shí)現(xiàn)中,特別是對于算子函數(shù)中的代碼,盡量不要使用上述三種數(shù)據(jù)結(jié)構(gòu),盡量使用字符串替代對象,使用原始類型(比如Int、Long)替代字符串,使用數(shù)組替代集合類型,這樣盡可能地減少內(nèi)存占用,從而降低GC頻率,提升性能。

但是在筆者的編碼實(shí)踐中發(fā)現(xiàn),要做到該原則其實(shí)并不容易。因?yàn)槲覀兺瑫r(shí)要考慮到代碼的可維護(hù)性,如果一個(gè)代碼中,完全沒有任何對象抽象,全部是字符串拼接的方式,那么對于后續(xù)的代碼維護(hù)和修改,無疑是一場巨大的災(zāi)難。同理,如果所有操作都基于數(shù)組實(shí)現(xiàn),而不使用HashMap、LinkedList等集合類型,那么對于我們的編碼難度以及代碼可維護(hù)性,也是一個(gè)極大的挑戰(zhàn)。因此筆者建議,在可能以及合適的情況下,使用占用內(nèi)存較少的數(shù)據(jù)結(jié)構(gòu),但是前提是要保證代碼的可維護(hù)性。

關(guān)于Spark性能優(yōu)化中的開發(fā)調(diào)優(yōu)是怎么樣的呢就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI