溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

spark(二):spark架構(gòu)及物理執(zhí)行圖

發(fā)布時(shí)間:2020-08-05 12:00:29 來(lái)源:網(wǎng)絡(luò) 閱讀:851 作者:afeiye 欄目:大數(shù)據(jù)

spark(二):spark架構(gòu)及物理執(zhí)行圖
上圖是一個(gè)job的提交流程圖,job提交的具體步驟如下

  1. 一旦有action,就會(huì)觸發(fā)DagScheduler.runJob來(lái)提交任務(wù),主要是先生成邏輯執(zhí)行圖DAG,然后調(diào)用 finalStage = newStage() 來(lái)劃分 stage。
  2. new Stage() 的時(shí)候會(huì)調(diào)用 finalRDD 的 getParentStages();
  3. getParentStages() 從 finalRDD 出發(fā),反向 visit 邏輯執(zhí)行圖,遇到 NarrowDependency 就將依賴的 RDD 加入到 stage,遇到 ShuffleDependency 切開(kāi) stage,并遞歸到 ShuffleDepedency 依賴的 stage。
  4. 一個(gè) ShuffleMapStage(不是最后形成 result 的 stage)形成后,會(huì)將該 stage 最后一個(gè) RDD 注冊(cè)到MapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size),這一步很重要,因?yàn)?shuffle 過(guò)程需要 MapOutputTrackerMaster 來(lái)指示 ShuffleMapTask 輸出數(shù)據(jù)的位置。
  5. 之后是submitStage(finalStage)
  6. 先確定該 stage 的 missingParentStages,使用getMissingParentStages(stage)。如果 parentStages 都可能已經(jīng)執(zhí)行過(guò)了,那么就為空了。
  7. 如果 missingParentStages 不為空,那么先遞歸提交 missing 的 parent stages,并將自己加入到 waitingStages 里面,等到 parent stages 執(zhí)行結(jié)束后,會(huì)觸發(fā)提交 waitingStages 里面的 stage。
  8. 如果 missingParentStages 為空,說(shuō)明該 stage 可以立即執(zhí)行,那么就調(diào)用submitMissingTasks(stage, jobId)來(lái)生成和提交具體的 task。如果 stage 是 ShuffleMapStage,那么 new 出來(lái)與該 stage 最后一個(gè) RDD 的 partition 數(shù)相同的 ShuffleMapTasks。如果 stage 是 ResultStage,那么 new 出來(lái)與 stage 最后一個(gè) RDD 的 partition 個(gè)數(shù)相同的 ResultTasks。一個(gè) stage 里面的 task 組成一個(gè) TaskSet,最后調(diào)用taskScheduler.submitTasks(taskSet)來(lái)提交一整個(gè) taskSet。
  9. taskScheduler會(huì)把task發(fā)給DriverActor進(jìn)程,DriverActor序列話之后發(fā)給exector真正執(zhí)行。

spark(二):spark架構(gòu)及物理執(zhí)行圖
上圖是task執(zhí)行流程,具體執(zhí)行過(guò)程如下

  1. Worker 端接收到 tasks 后,executor 將 task 包裝成 taskRunner,并從線程池中抽取出一個(gè)空閑線程運(yùn)行 task。
  2. Executor 收到 serialized 的 task 后,先 deserialize 出正常的 task,然后運(yùn)行 task 得到其執(zhí)行結(jié)果 directResult,這個(gè)結(jié)果要送回到 driver 那里。
  3. 如果 result 比較大(比如 groupByKey 的 result)先把 result 存放到本地的“內(nèi)存+磁盤”上,由 blockManager 來(lái)管理,只把存儲(chǔ)位置信息(indirectResult)發(fā)送給 driver。
  4. ShuffleMapTask 生成的是 MapStatus,MapStatus 包含兩項(xiàng)內(nèi)容:一是該 task 所在的 BlockManager 的 BlockManagerId(實(shí)際是 executorId + host, port, nettyPort),二是 task 輸出的每個(gè) FileSegment 大小。
  5. ResultTask 生成的 result 的是 func 在 partition 上的執(zhí)行結(jié)果。**比如 count() 的 func 就是統(tǒng)計(jì) partition 中 records 的個(gè)數(shù)。
  6. Driver 收到 task 的執(zhí)行結(jié)果 result 后會(huì)進(jìn)行一系列的操作:
  7. a,首先告訴 taskScheduler 這個(gè) task 已經(jīng)執(zhí)行完,然后去分析 result。
  8. b,如果是 ResultTask 的 result,那么可以使用 ResultHandler 對(duì) result 進(jìn)行 driver 端的計(jì)算(比如 count() 會(huì)對(duì)所有 ResultTask 的 result 作 sum)
  9. c,如果 result 是 ShuffleMapTask 的 MapStatus,那么需要將 MapStatus(ShuffleMapTask 輸出的 FileSegment 的位置和大小信息)存放到 mapOutputTrackerMaster 中的 mapStatuses 數(shù)據(jù)結(jié)構(gòu)中以便以后 reducer shuffle 的時(shí)候查詢
  10. d,如果 driver 收到的 task 是該 stage 中的最后一個(gè) task,那么可以 submit 下一個(gè) stage,如果該 stage 已經(jīng)是最后一個(gè) stage,那么告訴 dagScheduler job 已經(jīng)完成
向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI