您好,登錄后才能下訂單哦!
這篇文章主要介紹“DAG任務(wù)分解和Shuffle RDD怎么使用”,在日常操作中,相信很多人在DAG任務(wù)分解和Shuffle RDD怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”DAG任務(wù)分解和Shuffle RDD怎么使用”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習吧!
1、DagScheduler分析
DagScheduler功能主要是負責RDD的各個stage的分解和任務(wù)提交。Stage分解是從觸發(fā)任務(wù)調(diào)度過程的finalStage開始倒推尋找父stage,如果父stage沒有提交任務(wù)則循環(huán)提交缺失的父stage。每個stage有一個父RDD的概念,根據(jù)分區(qū)數(shù)的多少創(chuàng)建多個任務(wù)(Task)。
Task的調(diào)度實際是通過TaskSchedulerImp來完成的,TaskSchedulerImp里根據(jù)環(huán)境部署的不同又會使用不同的Backend,比如Yarn集群、獨立集群等其Backend是不一樣的,這里先有個概念,先不深究Backend。
這里先看看DagScheduler的核心邏輯把。里面首先要研究的一個方法:
def submitMissingTasks(stage: Stage, jobId: Int)
該方法就是提交stage執(zhí)行,為什么叫這個名稱呢?說明這里的stage是需先需要提交執(zhí)行的,沒有其他依賴的stage還未執(zhí)行了。
submitMissingTasks方法會根據(jù)RDD的依賴關(guān)系創(chuàng)建兩種task,ResultTask和ShuffleMapTask。
一步步來,只看關(guān)鍵代碼,因為整體代碼太多了不利于理解關(guān)鍵邏輯。
1.1 生成序列化的taskBinary
taskBinaryBytes = stage match { case stage: ShuffleMapStage => JavaUtils.bufferToArray( closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) case stage: ResultStage => JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) }
taskBinaryBytes待會是要封裝成對像分發(fā)到遠端Executor上執(zhí)行的,所以必須是可序列化的。
兩者最主要區(qū)別就是:ShuffleMapStage的入?yún)⑹且蕾嚨膕huffleDep;而ResultStage的入?yún)⑹呛瘮?shù)的定義func。
1.2 生成task
現(xiàn)在有了taskBinaryBytes,下一步就是生成Task了。
val tasks: Seq[Task[_]] = try { stage match { case stage: ShuffleMapStage => stage.pendingPartitions.clear() partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = partitions(id) stage.pendingPartitions += id new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier()) } case stage: ResultStage => partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, id, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier()) } } } catch { case NonFatal(e) => abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e)) runningStages -= stage return }
兩種Task類型:ShuffleMapTask和ResultTask。這里要主要的是對Task而言,有多少分區(qū)(partition)就會生成多少個Task,Task是到分區(qū)維度的,而不是到RDD維度的,這個概念一定要明確。
1.3 提交Task
最后一步就是提交任務(wù)執(zhí)行。這里就要用到taskScheduler了,當然了,這里的taskScheduler目前就是指TaskSchedulerImp。
taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
DagScheduler里還有一個方法這里可以提一下,就是:
submitWaitingChildStages(stage)
這個方法是提交等待當前stage執(zhí)行的等待stage,這樣DAG的整個調(diào)度過程就完整了。
2、Task執(zhí)行
兩種Task類型:ShuffleMapTask和ResultTask。
2.1 ResultTask
我們先看ResultTask的執(zhí)行,它相對比較簡單,核心方式是runTask,核心代碼:
override def runTask(context: TaskContext): U = { val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) func(context, rdd.iterator(partition, context)) }
反序列化出來RDD和func,然后執(zhí)行rdd的iterator方法獲取數(shù)據(jù)集,并在此數(shù)據(jù)集上執(zhí)行func函數(shù),要注意實際上這是一次迭代過程而不是多次迭代過程。
2.2 ShuffleMapTask
ShuffleMapTask任務(wù)的執(zhí)行相對復(fù)雜些。
核心方法還是runTask,核心代碼:
override def runTask(context: TaskContext): MapStatus = { val ser = SparkEnv.get.closureSerializer.newInstance() val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) val rdd = rddAndDep._1 val dep = rddAndDep._2 dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition) }
首先反序列化出RDD和依賴項ShuffleDependency。然后用ShuffleWriterProcessor寫數(shù)據(jù)到RDD。
這里的dep其實沒太大意義,主要就是來判斷是否要進行合并使用的,不影響理解整個shuffle流程,所以我們可以先不要管dep:
dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
這里的rdd實際就是ShuffleMapTask所要生成的數(shù)據(jù)集。這句代碼到底是什么意思呢? ShuffleWriterProcessor實際上是將數(shù)據(jù)集寫到了BlockManager上去的,先看看ShuffleWriterProcessor的含義。
2.3 ShuffleWriterProcessor
ShuffleWriterProcessor的關(guān)鍵方法的定義先看一下。
def write(rdd: RDD[_],dep: ShuffleDependency[_, _, _], partitionId: Int, context: TaskContext,partition: Partition): MapStatus = { var writer: ShuffleWriter[Any, Any] = null val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any]( dep.shuffleHandle, partitionId, context, createMetricsReporter(context)) writer.write( rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get }
ShuffleManager實際上就是BlockManager,管理塊空間的。
Write是Shuffle寫入器,寫到BlockManager去;rdd.iterator(partition, context)就是當前Shuffle類型的RDD定義的數(shù)據(jù)集,dep是rdd計算數(shù)據(jù)集時依賴的RDD(這里的dep沒多大意思先不管)。
這段代碼的作用就是將shuffle rdd數(shù)據(jù)集輸出到BlockManager上,在讀取RDD的數(shù)據(jù)時,如果該RDD是shuffle類型,則需要到BlockManager上去讀取,這里就是這個作用。
2.4 Shuffle RDD的相關(guān)概念
Shuffle類的RDD是指這類RDD的compute方法是依賴于其他RDD的,這里的其他RDD可以是多個。執(zhí)行shuffle的RDD的計算過程的時候,是將一到多個依賴RDD的迭代器的輸出作為數(shù)據(jù)源迭代器,在此之上執(zhí)行自己的操作。所以shuffle RDD的compute方法里一定會用到依賴RDD的iterator方法。
可以看看CoGroupedRDD的源碼,就能很快的理解shuffle的含義。
到此,關(guān)于“DAG任務(wù)分解和Shuffle RDD怎么使用”的學(xué)習就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習,快去試試吧!若想繼續(xù)學(xué)習更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。