溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

揭秘Spark應(yīng)用性能調(diào)優(yōu)

發(fā)布時(shí)間:2020-08-01 02:41:40 來(lái)源:網(wǎng)絡(luò) 閱讀:3479 作者:博文視點(diǎn) 欄目:大數(shù)據(jù)

引言:在多臺(tái)機(jī)器上分布數(shù)據(jù)以及處理數(shù)據(jù)是Spark的核心能力,即我們所說(shuō)的大規(guī)模的數(shù)據(jù)集處理。為了充分利用Spark特性,應(yīng)該考慮一些調(diào)優(yōu)技術(shù)。本文每一小節(jié)都是關(guān)于調(diào)優(yōu)技術(shù)的,并給出了如何實(shí)現(xiàn)調(diào)優(yōu)的必要步驟。
本文選自《Spark GraphX實(shí)戰(zhàn)》。

1 用緩存和持久化來(lái)加速 Spark

  我們知道Spark 可以通過(guò) RDD 實(shí)現(xiàn)計(jì)算鏈的原理 :轉(zhuǎn)換函數(shù)包含在 RDD 鏈中,但僅在調(diào)用 action 函數(shù)后才會(huì)觸發(fā)實(shí)際的求值過(guò)程,執(zhí)行分布式運(yùn)算,返回運(yùn)算結(jié)果。要是在 同一 RDD 上重復(fù)調(diào)用 action 會(huì)發(fā)生什么?

RDD 持久化

  一般 RDD 不會(huì)保留運(yùn)算結(jié)果,如果再次調(diào)用 action 函數(shù),整個(gè) RDD 鏈會(huì)重新 運(yùn)算。有些情況下這不會(huì)有問(wèn)題,但是對(duì)于許多機(jī)器學(xué)習(xí)任務(wù)和圖處理任務(wù),這就 是很大的問(wèn)題了。通常需要多次迭代的算法,在同一個(gè) RDD 上執(zhí)行很多次,反復(fù) 地重新加載數(shù)據(jù)和重新計(jì)算會(huì)導(dǎo)致時(shí)間浪費(fèi)。更糟糕的是,這些算法通常需要很長(zhǎng) 的 RDD 鏈。
  看來(lái)我們需要另一種方式來(lái)充分利用集群可用內(nèi)存來(lái)保存 RDD 的運(yùn)算結(jié)果。 這就是 Spark 緩存(緩存也是 Spark 支持的一種持久化類型)。
  要在內(nèi)存中緩存一個(gè) RDD,可以調(diào)用 RDD 對(duì)象的 cache 函數(shù)。以下在 spark- shell 中執(zhí)行的代碼,會(huì)計(jì)算文件的總行數(shù),輸出文件內(nèi)容 :

val filename = "..."val rdd1 = sc.textFile(filename).cacherdd1.countrdd1.collect

如果不調(diào)用 cache 函數(shù),當(dāng) count 和 collect 這兩個(gè) action 函數(shù)被調(diào)用時(shí), 會(huì)導(dǎo)致執(zhí)行從存儲(chǔ)系統(tǒng)中讀文件兩次。調(diào)用了 cache 函數(shù),第一個(gè) action 函數(shù)(count 函數(shù))會(huì)把它的運(yùn)算結(jié)果保留在內(nèi)存中,在執(zhí)行第二個(gè) action 函數(shù)(collection 函數(shù))時(shí),會(huì)直接在使用緩存的數(shù)據(jù)上繼續(xù)運(yùn)算,而不需要重新計(jì)算整個(gè) RDD 鏈。 即使通過(guò)轉(zhuǎn)換緩存的 RDD,生成新的 RDD,緩存的數(shù)據(jù)仍然可用。下面的代碼會(huì)找出所有的注釋行(以 # 開(kāi)始的行數(shù)據(jù))。

val rdd2 =rdd1.filter(_.startsWith("#"))
rdd2.collect

  因?yàn)?rdd2 源于已緩存的 rdd1,rdd1 已經(jīng)把它的運(yùn)算結(jié)果緩存在內(nèi)存中了, 所以 rdd2 也就不需要重新從存儲(chǔ)系統(tǒng)中讀取數(shù)據(jù)。

注意:cache 方法作為一個(gè)標(biāo)志表示 RDD 應(yīng)當(dāng)緩存,但并不是立即緩存。 緩存發(fā)生在當(dāng)前 RDD 在下一次要被計(jì)算的時(shí)候。

持久化等級(jí)

  如上所述,緩存是其中一種持久化類型。下表列出了 Spark 支持的所有持久 化等級(jí)。
        揭秘Spark應(yīng)用性能調(diào)優(yōu)
  每個(gè)持久化等級(jí)都定義在單例對(duì)象 StorageLevel 中。例如,調(diào)用 rdd.persist(StorageLevel.MEMORY_AND_DISK)方法會(huì)把 RDD 設(shè)置成內(nèi)存和磁盤緩 存。 cache 方法內(nèi)部也是調(diào)用 rdd.persist(StorageLevel.MEMORY_ONLY)。

注意 :其他的持久化等級(jí),如 MEMORY_ONLY2、MEMORY_AND_ DISK2 等,也是可用的。它們會(huì)復(fù)制 RDD到集群的其他節(jié)點(diǎn)上,以便 提供容錯(cuò)能力。這些內(nèi)容超出了本書(shū)范圍,感興趣的讀者可以看看 Petar Zec' evic' 和 Marko
Bonac' i(Manning, 2016)的書(shū) Spark in Action,這本書(shū)更 深入地介紹了 Spark 容錯(cuò)方面的內(nèi)容。

圖的持久化

  無(wú)論什么時(shí)候,通過(guò) Graph 對(duì)象調(diào)用一些函數(shù)如 mapVertices 或 aggregateMessages, 這些操作都是基于下層的 RDD 實(shí)現(xiàn)的。
  Graph 對(duì)象提供了基于頂點(diǎn) RDD 和邊 RDD 方便的緩存和持久化方法。

在合適的時(shí)機(jī)反持久化

  雖然看起來(lái)緩存是一個(gè)應(yīng)該被到處使用的好東西,但是用得太多也會(huì)讓人過(guò)度依賴它。
  當(dāng)緩存越來(lái)越多的 RDD 后,可用的內(nèi)存就會(huì)減少。最終 Spark 會(huì)把分區(qū)數(shù)據(jù)從 內(nèi)存中逐出(使用最少最近使用算法,LRU)。同時(shí),緩存過(guò)多的 Java 對(duì)象,JVM 垃圾回收高耗是不可避免的。這就是為什么當(dāng)緩存不再被使用時(shí)很有必要調(diào)用 un- persist 方法。對(duì)迭代算法而言,在循環(huán)中常用下面的方法調(diào)用模式 :

  • 調(diào)用 Graph 的 cache 或 persist 方法。

  • 調(diào)用 Graph 的 action 函數(shù),觸發(fā) Graph 下面的 RDD 被緩存……

  • 執(zhí)行算法主體的其余部分。

  • 在循環(huán)體的最后部分,反持久化,即釋放緩存。

提示 :用Pregel API的好處是,它已經(jīng)在內(nèi)部做了緩存和釋放緩存的操作。

何時(shí)不用緩存

  不能盲目地在內(nèi)存中緩存 RDD。要考慮數(shù)據(jù)集會(huì)被訪問(wèn)多少次以及每次訪問(wèn)時(shí) 重計(jì)算和緩存的代價(jià)對(duì)比,重計(jì)算也可能比增加內(nèi)存的方式付出的代價(jià)小。
  毫無(wú)疑問(wèn),如果僅僅讀一次數(shù)據(jù)集,緩存 RDD 就毫無(wú)意義,這還會(huì)讓作業(yè)運(yùn) 行得更慢,特別是用了有序列化的持久化等級(jí)。

2.checkpointing

  圖算法中一個(gè)常用的模式是用每個(gè)迭代過(guò)程中運(yùn)算后的新數(shù)據(jù)更新圖。這意味 著,實(shí)際構(gòu)成圖的頂點(diǎn) RDD 亦或邊 RDD 的鏈會(huì)變得越來(lái)越長(zhǎng)。

定義 :當(dāng) RDD 由逐級(jí)繼承的祖先 RDD 鏈形成時(shí),我們說(shuō)從 RDD 到 根 RDD 的路徑是其譜系。

  下面清單所示的示例是一個(gè)簡(jiǎn)單的算法,可生成一個(gè)新頂點(diǎn)集并更新圖。這個(gè) 算法迭代的次數(shù)由變量 iterations 控制。
        揭秘Spark應(yīng)用性能調(diào)優(yōu)
  上述代碼每一次調(diào)用 joinVertices 都會(huì)增加一個(gè)新 RDD 到頂點(diǎn) RDD 鏈中。 顯然我們需要使用緩存來(lái)確保在每次迭代中避免重新計(jì)算 RDD 鏈,但這并不能改變一個(gè)事實(shí),那就是有一個(gè)不斷增長(zhǎng)的子 RDD 到父 RDD 的對(duì)象引用列表。
  這樣的后果是,如果運(yùn)行迭代次數(shù)過(guò)多,運(yùn)行的代碼中最終會(huì)爆出 Stack- OverflowError 棧溢出錯(cuò)誤。通常迭代 500次就會(huì)出現(xiàn)棧溢出。
  而由 RDD 提供并且被 Graph 繼承的一個(gè)特性 :checkpointing,能解決長(zhǎng) RDD 譜系問(wèn)題。下面清單中的代碼示范了如何使用 checkpointing,這樣就可以持續(xù)輸出 頂點(diǎn),更新結(jié)果圖。
              揭秘Spark應(yīng)用性能調(diào)優(yōu)
  一個(gè)標(biāo)記為 checkpointing 的 RDD 會(huì)把 RDD 保存到一個(gè) checkpoint 目錄,然 后指向父 RDD 的連接被切斷,即切斷了 lineage 譜系。一個(gè)標(biāo)記為 checkpointing 的 Graph 會(huì)導(dǎo)致下面的頂點(diǎn) RDD 和邊 RDD 做 checkpoint。
  調(diào)用 SparkContext.setCheckpointDir 來(lái)設(shè)置 checkpoint 目錄,指定一個(gè) 共享存儲(chǔ)系統(tǒng)的文件路徑,如 HDFS。
  如前面的代碼清單所示,必須在調(diào)用 RDD 任何方法之前調(diào)用 checkpoint,這 是因?yàn)?checkpointing 是一個(gè)相當(dāng)耗時(shí)的過(guò)程(畢竟需要把圖寫(xiě)入磁盤文件),通常 需要不斷地 checkpoint 避免棧溢出錯(cuò)誤,一般可以每 100 次迭代做一次 checkpoint。

注意 :一個(gè)加速 checkpointing 的選擇是 checkpoint 到 Tachyon(已 更名為 Alluxio),而不是checkpoint 到標(biāo)準(zhǔn)的文件系 統(tǒng)。Alluxio,來(lái)自 AMPLab,是一個(gè)“以內(nèi)存為中心的有容錯(cuò)能力的分布式文件系統(tǒng),它能讓Spark 這類集群框架加速訪問(wèn)共享在內(nèi)存中的文件”。

3 通過(guò)序列化降低內(nèi)存壓力

  內(nèi)存壓力(內(nèi)存不夠用)往往是 Spark 應(yīng)用性能差和容易出故障的主要原因 之一。這些問(wèn)題通常表現(xiàn)為頻繁的、耗時(shí)的 JVM 垃圾回收和“內(nèi)存不足”的錯(cuò) 誤。checkpointing 在這里也不能緩解內(nèi)存壓力。遇到這種問(wèn)題,首先要考慮序列化 Graph 對(duì)象。

定義 :數(shù)據(jù)序列化,Data serialization,是把 JVM 里表示的對(duì)象實(shí) 例轉(zhuǎn)換(序列化)成字節(jié)流 ;把字節(jié)流通過(guò)網(wǎng)絡(luò)傳輸?shù)搅硪粋€(gè) JVM 進(jìn)程 中 ;在另一個(gè) JVM 進(jìn)程中,字節(jié)流可以被“反序列化”為一個(gè)對(duì)象實(shí)例。Spark用序列化的方式,可以在網(wǎng)絡(luò)間傳輸對(duì)象,也可以把序列化后的字節(jié)流緩存在內(nèi)存中。

  要用序列化,可以選用 persist 中下面的 StorageLevels :

  • StorageLevel.MEMORY_ONLY_SER

  • StorageLevel.MEMORY_AND_DISK_SER

序列化節(jié)省了空間,同時(shí)序列化和反序列化也會(huì)增加 CPU 的開(kāi)銷。

使用 Kryo 序列化

  Spark 默認(rèn)使用 JavaSerializer 來(lái)序列化對(duì)象,這是一個(gè)低效的 Java 序列化框架,一個(gè)更好的選擇是選用 Kryo。Kryo 是一個(gè)開(kāi)源的 Java 序列化框架,提供了 快速高效的序列化能力。
  Spark 中使用 Kryo 序列 化,只需要設(shè)置 spark.serializer 參數(shù)為 org. apache.spark.serializer.KryoSerializer,如這樣設(shè)置命令行參數(shù) :

spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"

  要是每次都這樣設(shè)置參數(shù),會(huì)很煩瑣。可以在 $Spark_HOME/conf/spark-
defaults.conf 這個(gè)配置文件中,用標(biāo)準(zhǔn)的屬性文件語(yǔ)法(用 Tab 分隔作為一行),把 spark.serializer 等參數(shù)及其對(duì)應(yīng)的值寫(xiě)入這個(gè)配置文件,如下所示 :

spark.serializer org.apache.spark.serializer.KryoSerializer

  為保證性能最佳,Kryo 要求注冊(cè)要序列化的類,如果不注冊(cè),類名也會(huì)被序列 化在對(duì)象字節(jié)碼里,這樣對(duì)性能有較大影響。幸運(yùn)的是,Spark 對(duì)其框架里用到的 類做了自動(dòng)注冊(cè) ;但是,如果應(yīng)用程序代碼里有自定義的類,恰好這些自定義類也 要用 Kryo 序列化,那就需要調(diào)用 SparkConf.registerKryoClasses 函數(shù)來(lái)手 動(dòng)注冊(cè)。下面的清單展示了如何注冊(cè) Person 這個(gè)自定義類。
         揭秘Spark應(yīng)用性能調(diào)優(yōu)

檢查 RDD 大小

  在應(yīng)用程序調(diào)優(yōu)時(shí),常常需要知道 RDD 的大小。這就很棘手,因?yàn)槲募驍?shù) 據(jù)庫(kù)中對(duì)象的大小和 JVM 中對(duì)象占用多少內(nèi)存沒(méi)有太大關(guān)系。
  一個(gè)小技巧是,先將 RDD 緩存到內(nèi)存中,然后到 Spark UI 中的 Storage 選項(xiàng)卡, 這里記錄著 RDD 的大小。要衡量配置了序列化的效果,用這個(gè)方法也可以。

  本文選自《Spark GraphX實(shí)戰(zhàn)》,點(diǎn)此鏈接可在博文視點(diǎn)官網(wǎng)查看此書(shū)。
                     揭秘Spark應(yīng)用性能調(diào)優(yōu)
  想及時(shí)獲得更多精彩文章,可在微信中搜索“博文視點(diǎn)”或者掃描下方二維碼并關(guān)注。
                       揭秘Spark應(yīng)用性能調(diào)優(yōu)


向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI