您好,登錄后才能下訂單哦!
這篇文章主要介紹“Spark內(nèi)存調(diào)優(yōu)的方法是什么”,在日常操作中,相信很多人在Spark內(nèi)存調(diào)優(yōu)的方法是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Spark內(nèi)存調(diào)優(yōu)的方法是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
在分布式應(yīng)用中數(shù)據(jù)序列化扮演著至關(guān)重要的角色。序列化對象的速度很慢,或者消耗大量字節(jié)的格式,會大大降低計算速度。通常情況下,這將是你優(yōu)化Spark應(yīng)用時首先要調(diào)整的東西。Spark的目標是在易用性(允許你在操作中使用任何Java類型)和性能之間取得平衡。它提供了兩個序列化庫:
Java serialization:默認是這個,Java序列化很靈活,但往往相當慢,而且導致許多類的序列化格式很大。
Kryo serialization:Spark也可以使用Kyro庫更快地序列化對象。Kryo明顯比Java序列化更快、更緊湊(通常高達10倍),但不支持所有的Serializable類型,并要求你提前注冊你將在程序中使用的類以獲得最佳性能。
使用Kryo注冊并不是想象中十分晦澀難懂的操作,多數(shù)情況僅需一行代碼就行!
可以在 SparkConf 里設(shè)置conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
來初始化Kryo
。建議在網(wǎng)絡(luò)密集型應(yīng)用里使用Kyro序列化。從Spark2.0開始,在Shuffle RDD階段的一些簡單類型已經(jīng)自動使用了Kyro序列化。
想要注冊自定義類使用Kyro,只需如下操作:
val conf = new SparkConf().setMaster(...)...... conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf)
如果代碼對象很大,你需要增大spark.kryoserializer.buffer
配置。如果你沒有注冊你的自定義類,Kryo仍然會生效,但是它不得不隨對象存儲全類名,這很浪費資源。
這部分將首先概述Spark的內(nèi)存管理,然后討論用戶可以采取的具體策略,以便在我們的應(yīng)用程序中更有效地利用內(nèi)存。特別是,我們將描述如何確定你的對象的內(nèi)存使用情況,以及如何通過改變你的數(shù)據(jù)結(jié)構(gòu)改善它,或通過以序列化的格式存儲數(shù)據(jù)。然后,我們將介紹調(diào)整Spark的緩存大小和Java的垃圾收集器。
眾所周知的是Spark的內(nèi)存主要分為2大塊:執(zhí)行與存儲(execution and storage)。執(zhí)行內(nèi)存就是計算用的,如shuffle/join/sort這些,存儲內(nèi)存則用于緩存和跨集群傳遞數(shù)據(jù)。在Spark中這兩塊內(nèi)存是統(tǒng)一區(qū)域管理的,名為M。當沒有執(zhí)行內(nèi)存需求時,存儲內(nèi)存可以獲取全部內(nèi)存,反之亦然。執(zhí)行可以在必要時驅(qū)逐存儲占用的內(nèi)存空間,直到存儲內(nèi)存占用降低至某一界限R。換言之,R描述了M中的一個子區(qū)域,其中緩存的塊永遠不會被驅(qū)逐。由于執(zhí)行中的復(fù)雜性,存儲可能不會驅(qū)逐執(zhí)行內(nèi)存。
這種設(shè)計確保了幾個理想的特性。首先,不使用緩存的應(yīng)用程序可以使用整個空間來執(zhí)行,避免了不必要的磁盤溢出。其次,使用緩存的應(yīng)用程序可以保留一個最小的存儲空間(R),其數(shù)據(jù)塊不會被驅(qū)逐。最后,這種方法為各種工作負載提供了合理的開箱即用的性能,而不需要用戶對內(nèi)存的內(nèi)部劃分有專業(yè)認識。
雖然有兩個相關(guān)的配置,但一般用戶應(yīng)該不需要調(diào)整,因為默認值適用于大多數(shù)工作負載。
spark.memory.fraction
將M的大小表示為(JVM堆空間-300MB)的一部分(默認為0.6)。其余的空間(40%)被保留給用戶數(shù)據(jù)結(jié)構(gòu)、Spark的內(nèi)部元數(shù)據(jù),以及在記錄稀少和異常大的情況下對OOM錯誤的保護。
spark.memory.storageFraction
表示R占M多大一部分(默認為0.5)。R是M中的存儲空間,其中的緩存塊對執(zhí)行的驅(qū)逐免疫。
并沒有一個放之四海而皆準的公式告訴你RDD占用了多少內(nèi)存,對一個具體業(yè)務(wù)需要實踐出真知。
確定一個數(shù)據(jù)集所需的內(nèi)存消耗量的最佳方法是創(chuàng)建一個RDD,將其放入緩存,并查看Web UI中的 "Storage "頁面。該頁面將告訴你該RDD占用了多少內(nèi)存。
要估計一個特定對象的內(nèi)存消耗,可以使用SizeEstimator’s estimate
方法。這對于試驗不同的數(shù)據(jù)布局以修整內(nèi)存使用量,以及確定一個廣播變量在每個執(zhí)行器堆上所占用的空間是很有用的。
減少內(nèi)存消耗的第一個方法是避免那些增加開銷的Java特性,如基于指針的數(shù)據(jù)結(jié)構(gòu)和包裝對象。有幾種方法可以做到這一點。
將你的數(shù)據(jù)結(jié)構(gòu)設(shè)計成傾向于對象的數(shù)組和原始類型,而不是標準的Java或Scala集合類(例如HashMap
)fastutil庫為原始類型提供了方便的集合類,與Java標準庫兼容。
盡可能避免使用帶有大量小對象和指針的嵌套結(jié)構(gòu)。
考慮使用數(shù)字ID或枚舉對象而不是字符串作為鍵。
如果你的RAM少于32GiB,設(shè)置JVM標志-XX:+UseCompressedOops,使指針為四字節(jié)而不是八字節(jié)。你可以在 spark-env.sh
中添加這些選項。
當你的對象仍然太大,無法有效地存儲,盡管有這樣的調(diào)整,減少內(nèi)存使用的一個更簡單的方法是以序列化的形式存儲它們,使用RDD持久化API中的序列化存儲級別,如MEMORY_ONLY_SER
。然后,Spark將把每個RDD分區(qū)存儲為一個大的字節(jié)數(shù)組。以序列化形式存儲數(shù)據(jù)的唯一缺點是訪問時間較慢,因為必須在運行中對每個對象進行反序列化。如果你想以序列化的形式緩存數(shù)據(jù),我們強烈建議你使用Kryo
,因為它導致的大小比Java序列化小得多(當然也比原始Java對象?。?。
當你的程序所存儲的RDD有很大的 "流失 "時,JVM的垃圾回收可能是一個問題。(在只讀取一次RDD,然后對其進行許多操作的程序中,這通常不是一個問題)。當Java需要驅(qū)逐舊對象為新對象騰出空間時,它需要追蹤你所有的Java對象并找到未使用的對象。這里需要記住的要點是,垃圾收集的成本與Java對象的數(shù)量成正比,所以使用對象較少的數(shù)據(jù)結(jié)構(gòu)(例如,用Ints
數(shù)組代替LinkedList
)可以大大降低這一成本。一個更好的方法是以序列化的形式持久化對象,如上所述:現(xiàn)在每個RDD分區(qū)將只有一個對象(一個字節(jié)數(shù)組)。在嘗試其他技術(shù)之前,如果GC是一個問題,首先要嘗試的是使用序列化的緩存。
由于你的任務(wù)的工作內(nèi)存(運行任務(wù)所需的空間量)和你的節(jié)點上緩存的RDD之間的干擾,GC也可能成為一個問題。我們將討論如何控制分配給RDD緩存的空間以緩解這一問題。
GC調(diào)整的第一步是收集關(guān)于垃圾收集發(fā)生頻率和花費在GC上的時間的統(tǒng)計數(shù)據(jù)。這可以通過在Java選項中添加-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps來實現(xiàn)。下次運行Spark作業(yè)時,你會看到每次發(fā)生GC時,工作節(jié)點的日志中都會打印出信息。請注意,這些日志將出現(xiàn)在集群的工作節(jié)點上(在其工作目錄的stdout文件中),而不是在你的驅(qū)動程序上。
為了進一步調(diào)整GC,我們首先需要了解一些關(guān)于JVM中內(nèi)存管理的基本信息。
Java的堆空間被分為兩個區(qū)域 Young 和 Old。Young代是用來存放臨時的對象的,而Old代是用來存放壽命較長的對象的。
年輕一代又被劃分為三個區(qū)域[Eden, Survivor1, Survivor2]。
對GC行為的簡化描述:當Eden滿時,在Eden上運行一個小的GC,Eden和Survivor1中活著的對象被復(fù)制到Survivor2。Survivor區(qū)域被交換。如果一個對象足夠老或者Survivor2已經(jīng)滿了,它就會被移到Old。最后,當Old接近滿的時候,一個Full GC
被調(diào)用。
Spark中GC調(diào)整的目標是確保只有長期存在的RDD被存儲在Old一代,而Young一代有足夠的大小來存儲短期對象。這將有助于避免全面GC來清理任務(wù)執(zhí)行過程中創(chuàng)建的臨時對象。一些可能有用的步驟是。
通過收集GC統(tǒng)計信息,檢查是否有太多的垃圾收集。如果在一個任務(wù)完成之前多次調(diào)用Full GC
,這意味著沒有足夠的內(nèi)存可用于執(zhí)行任務(wù)。
如果有太多的小GC但沒有太多的大GC,為Eden分配更多的內(nèi)存會有幫助。你可以將Eden的大小設(shè)置為對每個任務(wù)所需內(nèi)存的高估值。如果Eden的大小被確定為E,那么你可以使用選項-Xmn
=4/3*E來設(shè)置Young generation的大小。(按4/3的比例增加是為了考慮幸存者區(qū)域所使用的空間)。
在打印的GC統(tǒng)計中,如果OldGen接近滿了,通過降低spark.memory.fraction
來減少用于緩存的內(nèi)存量;緩存更少的對象比減緩任務(wù)的執(zhí)行要好。另外,也可以考慮減少Young代的大小。這意味著降低-Xmn
,如果你已經(jīng)如上設(shè)置。如果沒有,可以嘗試改變JVM的NewRatio參數(shù)的值。許多JVM將其默認為2,這意味著老一代占據(jù)了2/3的堆。它應(yīng)該足夠大,以至于這個分數(shù)超過了spark.memory.fraction
。
嘗試設(shè)置-XX:+UseG1GC
來使用G1GC垃圾收集器。在垃圾收集是一個瓶頸的情況下,它可以提高性能。注意,對于大的執(zhí)行器堆大小,用-XX:G1HeapRegionSize
增加G1區(qū)域大小可能很重要。
舉個例子,如果你的任務(wù)是從HDFS讀取數(shù)據(jù),任務(wù)使用的內(nèi)存量可以用從HDFS讀取的數(shù)據(jù)塊的大小來估計。請注意,解壓后的塊的大小往往是塊的2或3倍。因此,如果我們希望有3或4個任務(wù)的工作空間,而HDFS塊的大小是128MiB,我們可以估計Eden的大小是43128MiB。
監(jiān)控垃圾收集的頻率和時間在新的設(shè)置下如何變化。
我們的經(jīng)驗表明,GC調(diào)整的效果取決于你的應(yīng)用程序和可用的內(nèi)存量。網(wǎng)上還描述了許多調(diào)優(yōu)選項,但在高層次上,管理完全GC發(fā)生的頻率可以幫助減少開銷。
可以通過在作業(yè)的配置中設(shè)置 spark.executor.defaultJavaOptions 或 spark.executor.extraJavaOptions 來指定執(zhí)行器的 GC 調(diào)整設(shè)置。
除非你把每個操作的并行度設(shè)置得足夠高,否則集群不會得到充分的利用。Spark會根據(jù)文件的大小自動設(shè)置在每個文件上運行的 map
任務(wù)的數(shù)量(當然你可以通過SparkContext.textFile等的可選參數(shù)來控制),而對于分布式的 "reduce "操作,比如groupByKey
和reduceByKey
,它會使用最大的父RDD的分區(qū)數(shù)量。你可以把并行程度作為第二個參數(shù)傳遞(見spark.PairRDDFunctions文檔),或者設(shè)置配置屬性spark.default.parallelism
來改變默認值。一般來說,我們建議在你的集群中每個CPU核有2-3個任務(wù)。
有時,當作業(yè)輸入有大量的目錄時,你可能還需要增加目錄列表的并行性,否則這個過程可能會花費很長的時間,特別是在針對S3這樣的對象存儲時。如果你的作業(yè)在具有Hadoop輸入格式的RDD上工作(例如,通過SparkContext.sequenceFile),則通過spark.hadoop.mapreduce.input.fileinputformat.list-status.num-reads(目前默認為1)控制并行性。
對于具有基于文件的數(shù)據(jù)源的Spark SQL,你可以調(diào)整spark.sql.sources.parallelPartitionDiscovery.threshold和spark.sql.sources.parallelPartitionDiscovery.parallelism,以提高列舉并行性。更多細節(jié)請參考Spark SQL性能調(diào)優(yōu)指南。
有時,你會得到OutOfMemoryError
,不是因為你的RDDs不適合在內(nèi)存中,而是因為你的某個任務(wù)的工作集,比如groupByKey
中的一個Reduce
任務(wù)太大。Spark的shuffle操作(sortByKey、groupByKey、reduceByKey、join等)在每個任務(wù)中建立一個哈希表來執(zhí)行分組,而這個哈希表往往會很大。這里最簡單的解決方法是提高并行化水平,使每個任務(wù)的輸入集更小。Spark可以有效地支持短至200毫秒的任務(wù),因為它在許多任務(wù)中重復(fù)使用一個執(zhí)行器JVM,而且它的任務(wù)啟動成本很低,所以你可以安全地將并行化水平提高到超過集群中的核心數(shù)量。
使用SparkContext
中的廣播功能可以大大減少每個序列化任務(wù)的大小,以及在集群中啟動作業(yè)的成本。如果你的任務(wù)中使用了驅(qū)動程序中的任何大型對象(例如靜態(tài)查詢表),可以考慮將其變成一個廣播變量。Spark在主程序上打印每個任務(wù)的序列化大小,所以你可以看一下,以決定你的任務(wù)是否太大;一般來說,大于20KiB的任務(wù)可能值得優(yōu)化。
數(shù)據(jù)位置可以對Spark作業(yè)的性能產(chǎn)生重大影響。如果數(shù)據(jù)和對其進行操作的代碼在一起,那么計算往往會很快。但如果代碼和數(shù)據(jù)是分開的,一個必須移動到另一個。通常情況下,將序列化的代碼從一個地方運送到另一個地方要比運送一大塊數(shù)據(jù)快,因為代碼的大小比數(shù)據(jù)小得多。Spark圍繞這個數(shù)據(jù)定位的一般原則建立了它的調(diào)度。
數(shù)據(jù)定位是指數(shù)據(jù)離處理它的代碼有多近。根據(jù)數(shù)據(jù)的當前位置,有幾個級別的定位。按照從最近到最遠的順序:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY。
Spark通常的做法是等待一下,希望有一個繁忙的CPU騰出手來。一旦超時,它就開始把數(shù)據(jù)從遠處移到空閑的CPU上。每個級別之間的回退等待超時可以單獨配置,也可以在一個參數(shù)中全部配置;詳見spark.locality
參數(shù)。如果你的任務(wù)很長,看到的定位性很差,你應(yīng)該增加這些設(shè)置,但默認值通常很好用。
到此,關(guān)于“Spark內(nèi)存調(diào)優(yōu)的方法是什么”的學習就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。