溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Spark中的RDD到底是什么

發(fā)布時(shí)間:2021-08-31 10:47:49 來(lái)源:億速云 閱讀:178 作者:chen 欄目:云計(jì)算

這篇文章主要講解了“Spark中的RDD到底是什么”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Spark中的RDD到底是什么”吧!

Spark是開(kāi)源的分布式計(jì)算引擎,基于RDD來(lái)構(gòu)造數(shù)據(jù)處理流程,并在集群間調(diào)度任務(wù),通過(guò)分區(qū)數(shù)據(jù)管理機(jī)制來(lái)劃分任務(wù)的并行度,并在任務(wù)之間交換分區(qū)數(shù)據(jù),實(shí)現(xiàn)分布式的數(shù)據(jù)處理。

RDD是Spark中最重要的概念,理解了RDD是什么,基本也就理解了一半Spark的內(nèi)部機(jī)密了。

1、RDD基類

RDD是Spark中表示數(shù)據(jù)集的基類,是可序列號(hào)的對(duì)象,因此RDD可在Spark節(jié)點(diǎn)中復(fù)制。RDD定義了數(shù)據(jù)迭代器來(lái)循環(huán)讀取數(shù)據(jù),以及在數(shù)據(jù)集上定義各類轉(zhuǎn)換操作,生成新的RDD。

RDD的各種算子會(huì)觸發(fā)生成新的RDD。如:

map操作生成MapPartitionsRDD。

filter操作也生成MapPartitionsRDD,filter操作其實(shí)是在之前的RDD迭代器上封裝了一層filter操作,其實(shí)還是第一個(gè)迭代器,只不過(guò)這個(gè)迭代器會(huì)拋棄掉一些不滿足的記錄。

RDD的計(jì)算過(guò)程是通過(guò)compute方法來(lái)觸發(fā)的。

1.1 RDD觸發(fā)任務(wù)

submit過(guò)程是提交spark程序到集群,這時(shí)候會(huì)觸發(fā)application事件和driver事件等,并通過(guò)master節(jié)點(diǎn)選擇對(duì)應(yīng)的node來(lái)創(chuàng)建app和driver,同時(shí)在node上執(zhí)行spark jar包里的main方法。但task的真正執(zhí)行要等到RDD的compute動(dòng)作來(lái)觸發(fā)的。

RDD通過(guò)compute觸發(fā)任務(wù),提交FinalStage給Dag執(zhí)行。如collect(),count()等方法都會(huì)觸發(fā)compute過(guò)程,間接提交任務(wù)。

RDD.compute()=> finalStage => dag.submitJob()=> submitMissingStage() .

dag.submitJob()=> scheduleImpl.launchTask()=>scheduleBackend => executorBackend=> executor.launchTask()=> executorBackend.taskComplete msg => scheduleBackend.taskCompleted=>dag.stageFinished()=> ...

上面是RDD提交任務(wù)的大致流程。Compute函數(shù)是觸發(fā)函數(shù),這會(huì)導(dǎo)致最后一個(gè)RDD被執(zhí)行,也是finalStage;finalStage調(diào)用DAG的submitJob函數(shù)提交stage,這里的stage就是finalStage。

Stage是從源頭到finalStage串起來(lái)的,執(zhí)行的時(shí)候是反向?qū)ふ业?,這句話要好好體會(huì),這個(gè)過(guò)程其實(shí)就是RDD的秘密了。

Spark中的RDD到底是什么

我們先看下RDD的經(jīng)典圖例。圖中中間的部分Transformation是RDD的計(jì)算過(guò)程,左邊的HDFS示意數(shù)據(jù)源,右邊的HDFS示意RDD的finalStage執(zhí)行的操作(圖中的操作是寫(xiě)入hdfs,當(dāng)然也可以是print操作等等,就看你怎么寫(xiě)了)。

Stage1和stage2是窄依賴,map和union都是窄依賴;stage3是寬依賴,這里是join操作。窄依賴的意思就是操作只依賴一個(gè)stage的數(shù)據(jù),寬依賴的意思是依賴于多個(gè)stage,對(duì)這多個(gè)stage的數(shù)據(jù)要做全連接操作。

1.2、RDD執(zhí)行示例

RDD通過(guò)runJob調(diào)用來(lái)獲得執(zhí)行,如下:

def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

Sc是SparkContext。

對(duì)每個(gè)分區(qū)執(zhí)行func操作,返回結(jié)果是一個(gè)長(zhǎng)度等于分區(qū)數(shù)的Array。

Sc.runJob再調(diào)dagScheduler.runJob方法。具體可以看DagScheduler的作業(yè)執(zhí)行步驟,這里先不說(shuō),看筆者的專門(mén)論述DagScheduler的文章。

1.3、迭代器

RDD實(shí)際執(zhí)行是通過(guò)迭代器讀取數(shù)據(jù)的。

RDD是抽象類,定義了幾個(gè)接口:

分別是getPartitions、compute、getPreferredLocations。RDD數(shù)據(jù)是分區(qū)存儲(chǔ),每一個(gè)分區(qū)可能分布在申請(qǐng)spark資源的任何位置。這三個(gè)接口可以描述RDD的全部信息,其中g(shù)etPreferredLocations這個(gè)方法是和計(jì)算本地化有關(guān)的,這里我們就先忽略它,不影響我們理解RDD的原理。

override protected def getPartitions: Array[Partition] = {}
override def compute(split: Partition, context: TaskContext): Iterator[java.lang.Integer] = new NextIterator[Integer] {}

getPartitions方法我們也不用太關(guān)注,它的作用是返回一個(gè)分區(qū)列表,表示這個(gè)RDD有幾個(gè)分區(qū),實(shí)際運(yùn)行的時(shí)候RDD的每個(gè)分區(qū)會(huì)被安排到單獨(dú)的節(jié)點(diǎn)上運(yùn)行,這樣來(lái)實(shí)現(xiàn)分布式計(jì)算的。

我們最關(guān)心的是compute的方法,這個(gè)方法返回一個(gè)迭代器,這個(gè)迭代器就是這個(gè)RDD的split這個(gè)分區(qū)的數(shù)據(jù)集。至于這個(gè)迭代器的數(shù)據(jù)是什么,是在compute方法體中寫(xiě)代碼來(lái)生成的。我們可以定義自己的RDD,只要寫(xiě)代碼實(shí)現(xiàn)這幾個(gè)方法就可以了!

自定義RDD有什么好處呢?最大的好處就是可以把自己的數(shù)據(jù)集納入到Spark的分布式計(jì)算體系中,幫助你實(shí)現(xiàn)數(shù)據(jù)分區(qū),任務(wù)分配,和其他RDD執(zhí)行全連接匯聚操作等。

言歸正傳,回到compute方法本身。

怎么獲得Iterator[T],對(duì)ShuffleRDD來(lái)說(shuō)是從BlockManager獲取迭代器Iterator[T]。這種迭代器是blockResult,是ShuffleMapTask執(zhí)行結(jié)果的保存格式;另一種就是直接獲得iter,這種是ResultTask的執(zhí)行結(jié)果的數(shù)據(jù)。

第一種情況,看BlockManager能否找到本RDD的partition的BlockResult??纯磄etOrElseUpdate方法還傳遞了一個(gè)函數(shù)作為最后一個(gè)入?yún)ⅲ绻淮嬖谥付ǖ腂lockResult,則返回入?yún)⒑瘮?shù)來(lái)計(jì)算得到iter,方法體定義如下:

() => {
  readCachedBlock = false
  computeOrReadCheckpoint(partition, context)
}

主要就是調(diào)用computeOrReadCheckpoint方法計(jì)算分區(qū)。

def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}

computeOrReadCheckpoint得到Iterator,如果是checkpoint的那么調(diào)用第一個(gè)父類的iterator方法得到Iterator,這里父類就是CheckpointRDD;否則就是調(diào)用compute方法得到Iterator。

所以,RDD的迭代器的實(shí)際獲取分成兩步:

首先,判斷是否存在該RDD指定partition的BlockResult,如果存在則將BlockResult作為Iterator結(jié)果,此時(shí)表示該RDD是shuffleRDD之類。

然后如果上述不滿足,則又分兩種情況,第一種這是checkpoint的RDD,則調(diào)用父RDD的iterator方法(此時(shí)父RDD就是CheckpointRDD);否則調(diào)用compute方法來(lái)獲得Iterator。

2、Stage劃分

我們知道RDD的提交Spark集群執(zhí)行是分階段劃分Stage提交的。從最后一個(gè)Stage開(kāi)始,依次循環(huán)遞歸判斷是否要調(diào)用依賴RDD的Stage,Stage的劃分是根據(jù)是否要Shuffle作為分界點(diǎn)的。

如果某個(gè)RDD的依賴(dep)是ShuffleDependency,則次RDD作為ShuffleMapTask任務(wù)提交,否則最后一個(gè)RDD作為ResultTask提交。

遞歸提交Stage,對(duì)ShuffleMapTask類型的RDD,會(huì)一直遞歸判斷該RDD是否存在前置的ShuffleDependency,如果存在則遞歸提交前依賴RDD。

整個(gè)Spark作業(yè)是RDD串接的,如果不存在Shuffle依賴,則提交最后一個(gè)RDD,并且只有這一個(gè)RDD被提交。在計(jì)算最后一個(gè)RDD的iterator時(shí),被調(diào)用到父RDD的iterator方法,此時(shí)父RDD一般都是MapPartitionsRDD。在MapPartitionsRDD中有進(jìn)一步敘述。

3、RDD子類

RDD含有多個(gè)子類,如MapPartitionRDD,HadoopRDD、CoGroupedRDD等等。筆者這里就找?guī)讉€(gè)例子簡(jiǎn)單說(shuō)明一下他們的內(nèi)部邏輯。 

3.1 MapPartitionsRDD

MapPartitionsRDD是RDD的子類,前面看到RDD的諸多算子都會(huì)生成新的MapPartitionRDD。

MapPartitionsRDD的構(gòu)造函數(shù)需要入?yún),它是一個(gè)函數(shù)抽象類或者叫做泛類。

f: (TaskContext, Int, Iterator[T]) => Iterator[U]

f的入?yún)⒂腥齻€(gè):

(1) TaskContext:是任務(wù)上下文

(2) Int:是分區(qū)編碼

(3) Iterator[T]是分區(qū)迭代器

f的輸出也是一個(gè)Iterator迭代器??梢钥闯觯琭是一個(gè)抽象的從一個(gè)迭代器生成另一個(gè)迭代器的自定義函數(shù)。對(duì)數(shù)據(jù)的處理邏輯就是體現(xiàn)在f上。

MapPartitionRDD中觸發(fā)計(jì)算的compute方法定義如下:

override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

這里的f是MapPartitionRDD的構(gòu)造函數(shù)中傳進(jìn)入的入?yún)?,是用戶自定義的map函數(shù)。這樣,通過(guò)RDD的map、flatmap等算子和MapPartitionRDD,可以將RDD上的一系列操作不停的串聯(lián)下去。

3.2 CoalescedRDD

CoalescedRDD將M個(gè)分區(qū)的RDD重新分成N個(gè)分區(qū),形成新的RDD。在計(jì)算過(guò)程中,會(huì)引起Shuffle工程。

首先CoalescedRDD需要一個(gè)重新分區(qū)算法,將M個(gè)分區(qū)如何劃分到N個(gè)分區(qū),這里M>N。重新分區(qū)的結(jié)果是N的每個(gè)分區(qū)對(duì)應(yīng)了M的多個(gè)分區(qū),用List<Int>來(lái)表示,List<Int>中每個(gè)Int表示父RDD中M個(gè)分區(qū)之一的編號(hào)。

如果CoalescedRDD沒(méi)有指定自己的重新分區(qū)算法,則用DefaultPartitionCoalescer來(lái)做重新分區(qū)計(jì)算。

CoalescedRDD的compute過(guò)程如下:

override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
  partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { 
    parentPartition => firstParent[T].iterator(parentPartition, context)
  }
}

partition.parents是指CoalescedRDD的第partition分區(qū)所對(duì)應(yīng)的父RDD的分區(qū)列表,對(duì)分區(qū)列表的每個(gè)分區(qū),執(zhí)行:

firstParent[T].iterator(parentPartition, context)

然后得到最終的Iterator[T]。這段應(yīng)該不難理解。

需要留意的是,這里得到的Iterator[T]最終是要寫(xiě)到Shuffle的,因?yàn)镃oalescedRDD對(duì)應(yīng)的ShuffleMapTask而不是ResultTask。

對(duì)于理解Spark計(jì)算流程來(lái)說(shuō),理解了Shuffle的過(guò)程,也就解決了一半的疑惑了。

感謝各位的閱讀,以上就是“Spark中的RDD到底是什么”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Spark中的RDD到底是什么這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI