溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark RDD的彈性指的是什么

發(fā)布時間:2021-12-16 20:40:15 來源:億速云 閱讀:194 作者:柒染 欄目:大數(shù)據(jù)

Spark RDD的彈性指的是什么,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

RDD(Resiliennt Distributed Datasets)抽象彈性分布式數(shù)據(jù)集對于Spark來說的彈性計算到底提現(xiàn)在什么地方?

自動進行內(nèi)存和磁盤數(shù)據(jù)這兩種存儲方式的切換

Spark 可以使用 persist 和 cache 方法將任意 RDD 緩存到內(nèi)存或者磁盤文件系統(tǒng)中。數(shù)據(jù)會優(yōu)先存儲到內(nèi)存中,當內(nèi)存不足以存放RDD中的數(shù)據(jù)的時候,就會持久化到磁盤上。這樣,就可以最大化的利益內(nèi)存以達到最高的計算效率;同時又有磁盤作為兜底存儲方案以確保計算結果的正確性。

基于Linage的高效容錯機制

Linage是用于記錄RDD的父子依賴關系,子RDD會記錄父RDD,且各個分片之間的數(shù)據(jù)互不影響。當出現(xiàn)錯誤的時候,只需要恢復單個Split的特定部分即可。常規(guī)容錯方式有兩種:第一種是數(shù)據(jù)Check Poin檢查點;第二個是記錄數(shù)據(jù)的更新。一般意義上CheckPoin的基本工作方式是通過數(shù)據(jù)中心的網(wǎng)絡鏈接到不同的機器上,然后每次操作的時候都要復制數(shù)據(jù)集。相當于每個更新都對應一個記錄且同步到分布式集群中的各個節(jié)點上。由此集群間網(wǎng)絡和磁盤資源耗損比較大。但是Spark的RDD只有在Action操作的時候才會真正觸發(fā)計算,而Transform操作是惰性的,所以期間只有在Action操作的時候才會記錄到CheckPoint中。

Task失敗自動重試

缺省情況下,會自動重試4次。也可以在spark-submit的時候指定spark.task.maxFailures參數(shù)來修改缺省值

Spark RDD的彈性指的是什么

private[spark] class TaskSchedulerImpl(
   val sc: SparkContext,
   val maxTaskFailures: Int,
   isLocal: Boolean = false)
 extends TaskScheduler with Logging {

 import TaskSchedulerImpl._

 def this(sc: SparkContext) = {
   this(sc, sc.conf.get(config.TASK_MAX_FAILURES))
 }
 
 ...

源碼:https://github.com/apache/spark/blob/e1ea806b3075d279b5f08a29fe4c1ad6d3c4191a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 而其中的config.TASK_MAX_FAILURES來自:https://github.com/apache/spark/blob/e1ea806b3075d279b5f08a29fe4c1ad6d3c4191a/core/src/main/scala/org/apache/spark/internal/config/package.scala

  private[spark] val TASK_MAX_FAILURES =
   ConfigBuilder("spark.task.maxFailures")
     .intConf
     .createWithDefault(4)

TaskScheduler從每一個Stage的DAGScheduler中獲取TaskSet,運行并校驗是否存在故障。如果存在故障則會重試指定測試

Stage失敗自動重試

Stage對象可以記錄并跟蹤多個StageInfo,缺省的重試次數(shù)也是4次。且可以直接運行計算失敗的階段,值計算失敗的數(shù)據(jù)分片。Stage是Spark Job運行時均有相同邏輯功能和并行計算任務的一個基本單元。其中的所有任務都依賴同樣的Shuffle,每個DAG任務都是通過DAGScheduler在Stage的邊界處發(fā)生Shuffle形成Stage,然后DAGScheduler按照這些拓撲結構執(zhí)行

  /**
  * Number of consecutive stage attempts allowed before a stage is aborted.
  */
 private[scheduler] val maxConsecutiveStageAttempts =
   sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
     DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)

源碼:https://github.com/apache/spark/blob/094563384478a402c36415edf04ee7b884a34fc9/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Checkpoint和Persist

Checkpoin和Persist都可以由我們自己來調(diào)用。checkpoint是對RDD進行標記,并且產(chǎn)生對應的一系列文件,且所有父依賴都會被刪除,是整個Linage的終點。persist工作的主體RDD會把計算的分片結果保存在內(nèi)存或磁盤上,以確保下次針對同一個RDD的調(diào)用可以重用。兩種的區(qū)別如下:

  • Persist將RDD緩存之后,其Linage關系任然存在,在節(jié)點宕機或RDD部分緩存丟失的時候,RDD任然可以根據(jù)Linage關系來重新運算;而Checkpoin將RDD寫入到文件系統(tǒng)之后,將不再維護Linage

  • rdd.persist即使調(diào)用的是DISK_ONLY操作,也就是只寫入文件系統(tǒng),該寫入rdd是由BlockManager管理,executor程序停止后BlockManager也就停止了,所以其持久化到磁盤中的數(shù)據(jù)也會被清理掉;而checkpoint持久化到文件系統(tǒng)(HDFS文件或者是本地文件系統(tǒng)),不會被刪除,還可以供其他程序調(diào)用。

Spark RDD的彈性指的是什么

數(shù)據(jù)調(diào)度彈性

上面所提到的DAGScheduler和TaskScheduler和資源無關。Spark將執(zhí)行模型抽象為DAG,可以讓多個Stage任務串聯(lián)或者并行執(zhí)行,而無需將中間結果寫入到HDFS中。這樣當某個節(jié)點故障的時候,可以由其他節(jié)點來執(zhí)行出錯的任務。

Spark RDD的彈性指的是什么

數(shù)據(jù)分片coalesce

Spark進行數(shù)據(jù)分片的時候,默認將數(shù)據(jù)存放在內(nèi)存中,當內(nèi)存不夠的時候,會將一部分存放到磁盤上。如經(jīng)過分片后,某個Partition非常小,就可以合并多個小的partition來計算,而不用每個partition都起一個線程。這樣就提高了效率,也不會因為大量的線程而導致OOMSpark RDD的彈性指的是什么

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業(yè)資訊頻道,感謝您對億速云的支持。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI