溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

spark性能調(diào)優(yōu)的方法是什么

發(fā)布時間:2021-12-16 14:58:15 來源:億速云 閱讀:133 作者:iii 欄目:云計算

這篇文章主要講解了“spark性能調(diào)優(yōu)的方法是什么”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“spark性能調(diào)優(yōu)的方法是什么”吧!

  • 分配哪些資源?executor、cpu per executor、memory per executor、driver memory

  • 在哪里分配這些資源?在我們在生產(chǎn)環(huán)境中,提交spark作業(yè)時,用的spark-submit shell腳本,里面調(diào)整對應(yīng)的參數(shù)

    /usr/local/spark/bin/spark-submit \
    --class cn.spark.sparktest.core.WordCountCluster \
    --num-executors 3 \  配置executor的數(shù)量
    --driver-memory 100m \  配置driver的內(nèi)存(影響很大)
    --executor-memory 100m \  配置每個executor的內(nèi)存大小
    --executor-cores 3 \  配置每個executor的cpu core數(shù)量
    /usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \


     

  • 調(diào)節(jié)到多大,算是最大呢?

    • 第一種,Spark Standalone,公司集群上,搭建了一套Spark集群,你心里應(yīng)該清楚每臺機器還能夠給你使用的,大概有多少內(nèi)存,多少cpu core;那么,設(shè)置的時候,就根據(jù)這個實際的情況,去調(diào)節(jié)每個spark作業(yè)的資源分配。比如說你的每臺機器能夠給你使用4G內(nèi)存,2個cpu core;20臺機器;executor,20;平均每個executor:4G內(nèi)存,2個cpu core。

    • 第二種,Yarn。資源隊列。資源調(diào)度。應(yīng)該去查看,你的spark作業(yè),要提交到的資源隊列,大概有多少資源?500G內(nèi)存,100個cpu core;executor,50;平均每個executor:10G內(nèi)存,2個cpu core。

    • 設(shè)置隊列名稱:spark.yarn.queue default

    • 一個原則,你能使用的資源有多大,就盡量去調(diào)節(jié)到最大的大小(executor的數(shù)量,幾十個到上百個不等;executor內(nèi)存;executor cpu core)

  • 為什么調(diào)節(jié)了資源以后,性能可以提升?

    • 增加executor:

      如果executor數(shù)量比較少,那么,能夠并行執(zhí)行的task數(shù)量就比較少,就意味著,我們的Application的并行執(zhí)行的能力就很弱。比如有3個executor,每個executor有2個cpu core,那么同時能夠并行執(zhí)行的task,就是6個。6個執(zhí)行完以后,再換下一批6個task。增加了executor數(shù)量以后,那么,就意味著,能夠并行執(zhí)行的task數(shù)量,也就變多了。比如原先是6個,現(xiàn)在可能可以并行執(zhí)行10個,甚至20個,100個。那么并行能力就比之前提升了數(shù)倍,數(shù)十倍。相應(yīng)的,性能(執(zhí)行的速度),也能提升數(shù)倍~數(shù)十倍。

      有時候數(shù)據(jù)量比較少,增加大量的task反而性能會降低,為什么?(想想就明白了,你用多了,別人用的就少了。。。。)

    • 增加每個executor的cpu core:

      也是增加了執(zhí)行的并行能力。原本20個executor,每個才2個cpu core。能夠并行執(zhí)行的task數(shù)量,就是40個task?,F(xiàn)在每個executor的cpu core,增加到了5個。能夠并行執(zhí)行的task數(shù)量,就是100個task。執(zhí)行的速度,提升了2.5倍。

      SparkContext,DAGScheduler,TaskScheduler,會將我們的算子,切割成大量的task,
      提交到Application的executor上面去執(zhí)行。

    • 增加每個executor的內(nèi)存量:

      增加了內(nèi)存量以后,對性能的提升,有三點:
      1、如果需要對RDD進(jìn)行cache,那么更多的內(nèi)存,就可以緩存更多的數(shù)據(jù),將更少的數(shù)據(jù)寫入磁盤,甚至不寫入磁盤。減少了磁盤IO。
      2、對于shuffle操作,reduce端,會需要內(nèi)存來存放拉取的數(shù)據(jù)并進(jìn)行聚合。如果內(nèi)存不夠,也會寫入磁盤。如果給executor分配更多內(nèi)存以后,就有更少的數(shù)據(jù),需要寫入磁盤,
      甚至不需要寫入磁盤。減少了磁盤IO,提升了性能。
      3、對于task的執(zhí)行,可能會創(chuàng)建很多對象。如果內(nèi)存比較小,可能會頻繁導(dǎo)致JVM堆內(nèi)存滿了,然后頻繁GC,垃圾回收,minor GC和full GC。(速度很慢)。內(nèi)存加大以后,帶來更少的GC,垃圾回收,避免了速度變慢,速度變快了。

Spark并行度指的是什么?

Spark作業(yè),Application,Jobs,action(collect)觸發(fā)一個job,1個job;每個job拆成多個stage,
發(fā)生shuffle的時候,會拆分出一個stage,reduceByKey。

stage0
val lines = sc.textFile("hdfs://")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_,1))
val wordCount = pairs.reduceByKey(_ + _)

stage1
val wordCount = pairs.reduceByKey(_ + _)
wordCount.collect()

reduceByKey,stage0的task,在最后,執(zhí)行到reduceByKey的時候,會為每個stage1的task,都創(chuàng)建一份文件(也可能是合并在少量的文件里面);每個stage1的task,會去各個節(jié)點上的各個task創(chuàng)建的屬于自己的那一份文件里面,拉取數(shù)據(jù);每個stage1的task,拉取到的數(shù)據(jù),一定是相同key對應(yīng)的數(shù)據(jù)。對相同的key,對應(yīng)的values,才能去執(zhí)行我們自定義的function操作(_ + _)

并行度:其實就是指的是,Spark作業(yè)中,各個stage的task數(shù)量,也就代表了Spark作業(yè)的在各個階段(stage)的并行度。

如果不調(diào)節(jié)并行度,導(dǎo)致并行度過低,會怎么樣?

  • task沒有設(shè)置,或者設(shè)置的很少,比如就設(shè)置了,100個task。50個executor,每個executor有3個cpu core,也就是說,你的Application任何一個stage運行的時候,都有總數(shù)在150個cpu core,可以并行運行。但是你現(xiàn)在,只有100個task,平均分配一下,每個executor分配到2個task,ok,那么同時在運行的task,只有100個,每個executor只會并行運行2個task。每個executor剩下的一個cpu core,就浪費掉了。

  • 你的資源雖然分配足夠了,但是問題是,并行度沒有與資源相匹配,導(dǎo)致你分配下去的資源都浪費掉了。合理的并行度的設(shè)置,應(yīng)該是要設(shè)置的足夠大,大到可以完全合理的利用你的集群資源;比如上面的例子,總共集群有150個cpu core,可以并行運行150個task。那么就應(yīng)該將你的Application的并行度,至少設(shè)置成150,才能完全有效的利用你的集群資源,讓150個task,并行執(zhí)行;而且task增加到150個以后,即可以同時并行運行,還可以讓每個task要處理的數(shù)據(jù)量變少;比如總共150G的數(shù)據(jù)要處理,如果是100個task,每個task計算1.5G的數(shù)據(jù);現(xiàn)在增加到150個task,可以并行運行,而且每個task主要處理1G的數(shù)據(jù)就可以。

  • 很簡單的道理,只要合理設(shè)置并行度,就可以完全充分利用你的集群計算資源,并且減少每個task要處理的數(shù)據(jù)量,最終,就是提升你的整個Spark作業(yè)的性能和運行速度。

    1. task數(shù)量,至少設(shè)置成與Spark application的總cpu core數(shù)量相同(最理想情況,比如總共150個cpu core,分配了150個task,一起運行,差不多同一時間運行完畢)

    2. 官方是推薦,task數(shù)量,設(shè)置成spark application總cpu core數(shù)量的2~3倍,比如150個cpu core,基本要設(shè)置task數(shù)量為300~500;實際情況,與理想情況不同的,有些task會運行的快一點,比如50s就完了,有些task,可能會慢一點,要1分半才運行完,所以如果你的task數(shù)量,剛好設(shè)置的跟cpu core數(shù)量相同,可能還是會導(dǎo)致資源的浪費,因為,比如150個task,10個先運行完了,剩余140個還在運行,但是這個時候,有10個cpu core就空閑出來了,就導(dǎo)致了浪費。那如果task數(shù)量設(shè)置成cpu core總數(shù)的2~3倍,那么一個task運行完了以后,另一個task馬上可以補上來,就盡量讓cpu core不要空閑,同時也是盡量提升spark作業(yè)運行的效率和速度,提升性能。

    3. 如何設(shè)置一個Spark Application的并行度?

      spark.default.parallelism 
      SparkConf conf = new SparkConf().set("spark.default.parallelism", "500")


       

默認(rèn)情況下,多次對一個RDD執(zhí)行算子,去獲取不同的RDD;都會對這個RDD以及之前的父RDD,全部重新計算一次;讀取HDFS->RDD1->RDD2-RDD4這種情況,是絕對絕對,一定要避免的,一旦出現(xiàn)一個RDD重復(fù)計算的情況,就會導(dǎo)致性能急劇降低。比如,HDFS->RDD1-RDD2的時間是15分鐘,那么此時就要走兩遍,變成30分鐘

  1. RDD架構(gòu)重構(gòu)與優(yōu)化盡量去復(fù)用RDD,差不多的RDD,可以抽取稱為一個共同的RDD,供后面的RDD計算時,反復(fù)使用。

  2. 公共RDD一定要實現(xiàn)持久化。就好比北方吃餃子,現(xiàn)包現(xiàn)煮。你人來了,要點一盤餃子。餡料+餃子皮+水->包好的餃子,對包好的餃子去煮,煮開了以后,才有你需要的熟的,熱騰騰的餃子?,F(xiàn)實生活中,餃子現(xiàn)包現(xiàn)煮,當(dāng)然是最好的了。但是Spark中,RDD要去“現(xiàn)包現(xiàn)煮”,那就是一場致命的災(zāi)難。對于要多次計算和使用的公共RDD,一定要進(jìn)行持久化。持久化,也就是說,將RDD的數(shù)據(jù)緩存到內(nèi)存中/磁盤中,(BlockManager),以后無論對這個RDD做多少次計算,那么都是直接取這個RDD的持久化的數(shù)據(jù),比如從內(nèi)存中或者磁盤中,直接提取一份數(shù)據(jù)。

  3. 持久化,是可以進(jìn)行序列化的如果正常將數(shù)據(jù)持久化在內(nèi)存中,那么可能會導(dǎo)致內(nèi)存的占用過大,這樣的話,也許,會導(dǎo)致OOM內(nèi)存溢出。當(dāng)純內(nèi)存無法支撐公共RDD數(shù)據(jù)完全存放的時候,就優(yōu)先考慮,使用序列化的方式在純內(nèi)存中存儲。將RDD的每個partition的數(shù)據(jù),序列化成一個大的字節(jié)數(shù)組,就一個對象;序列化后,大大減少內(nèi)存的空間占用。序列化的方式,唯一的缺點就是,在獲取數(shù)據(jù)的時候,需要反序列化。如果序列化純內(nèi)存方式,還是導(dǎo)致OOM,內(nèi)存溢出;就只能考慮磁盤的方式,內(nèi)存+磁盤的普通方式(無序列化)。內(nèi)存+磁盤,序列化。

  4. 為了數(shù)據(jù)的高可靠性,而且內(nèi)存充足,可以使用雙副本機制,進(jìn)行持久化持久化的雙副本機制,持久化后的一個副本,因為機器宕機了,副本丟了,就還是得重新計算一次;持久化的每個數(shù)據(jù)單元,存儲一份副本,放在其他節(jié)點上面;從而進(jìn)行容錯;一個副本丟了,不用重新計算,還可以使用另外一份副本。這種方式,僅僅針對你的內(nèi)存資源極度充足.

持久化,很簡單,就是對RDD調(diào)用persist()方法,并傳入一個持久化級別

  • 如果是persist(StorageLevel.MEMORY_ONLY()),純內(nèi)存,無序列化,那么就可以用cache()方法來替代

    • StorageLevel.MEMORY_ONLY_SER(),第二選擇

    • StorageLevel.MEMORY_AND_DISK(),第三選擇

    • StorageLevel.MEMORY_AND_DISK_SER(),第四選擇

    • StorageLevel.DISK_ONLY(),第五選擇

  • 如果內(nèi)存充足,要使用雙副本高可靠機制,選擇后綴帶_2的策略

    • StorageLevel.MEMORY_ONLY_2()

感謝各位的閱讀,以上就是“spark性能調(diào)優(yōu)的方法是什么”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對spark性能調(diào)優(yōu)的方法是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI