溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

spark任務(wù)運(yùn)行過程的源碼分析

發(fā)布時(shí)間:2020-06-09 21:15:15 來源:網(wǎng)絡(luò) 閱讀:1261 作者:原生zzy 欄目:大數(shù)據(jù)

spark任務(wù)運(yùn)行的源碼分析

在整個(gè)spark任務(wù)的編寫、提交、執(zhí)行分三個(gè)部分:
① 編寫程序和提交任務(wù)到集群中
②sparkContext的初始化
③觸發(fā)action算子中的runJob方法,執(zhí)行任務(wù)

(1)編程程序并提交到集群:

①編程spark程序的代碼
②打成jar包到集群中運(yùn)行
③使用spark-submit命令提交任務(wù)
在提交任務(wù)時(shí),需要指定 --class 程序的入口(有main方法的類),
1) spark-submit --class xxx
2) ${SPARK_HOME}/bin/spark-class org.apache.spark.deploy.SparkSubmit $@
3) org.apache.spark.launcher.Main
submit(appArgs, uninitLog)
doRunMain()
runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
childMainClass:…./.WordCount (自己編寫的代碼的主類)
mainClass = Utils.classForName(childMainClass)
val app: SparkApplication = if() {} else {new JavaMainApplication(mainClass)}
app.start(childArgs.toArray, sparkConf) // 通過反射調(diào)用mainClass執(zhí)行
// 到此為止,相當(dāng)于調(diào)用了我們的自己編寫的任務(wù)類的main方法執(zhí)行了。?。?!
val mainMethod = klass.getMethod("main", new ArrayString.getClass)
mainMethod.invoke(null, args)
④開始執(zhí)行自己編寫的代碼

(2)初始化sparkContext:

當(dāng)自己編寫的程序運(yùn)行到:new SparkContext()時(shí),就開始了精妙而細(xì)致的sparkContext的初始化。
sparkContext的相關(guān)介紹:sparkContext是用戶通往spark集群的唯一入口,可以用來在spark集群中創(chuàng)建RDD、累加器和廣播變量。sparkContext也是整個(gè)spark應(yīng)用程序的一個(gè)至關(guān)重要的對(duì)象,是整個(gè)應(yīng)用程序運(yùn)行調(diào)度的核心(不是資源調(diào)度的核心)。在初始化sparkContext時(shí),同時(shí)的會(huì)初始化DAGScheduler、TaskScheduler和SchedulerBackend,這些至關(guān)重要的對(duì)象。
sparkContext的構(gòu)建過程
spark任務(wù)運(yùn)行過程的源碼分析

1)Driver端執(zhí)行的代碼:

初始化 TaskScheduler
 初始化 SchedulerBackend
 初始化 DAGScheduler

spark任務(wù)運(yùn)行過程的源碼分析

2)worker和master端執(zhí)行的代碼:

driver向master注冊(cè)申請(qǐng)資源。
  Worker負(fù)責(zé)啟動(dòng)executor。

spark任務(wù)運(yùn)行過程的源碼分析

(3)觸發(fā)action算子中的runJob方法:

spark任務(wù)運(yùn)行過程的源碼分析

spark任務(wù)運(yùn)行總結(jié):

  • 將編寫的程序打成jar包
  • 調(diào)用spark-submit提交任務(wù)到集群上運(yùn)行
  • 運(yùn)行sparkSubmit 的main方法,在這個(gè)方法中通過反射的方式創(chuàng)建我們編寫的主類的實(shí)例對(duì)象,然后調(diào)用該對(duì)象的main方法,開始執(zhí)行我們編寫的代碼
  • 當(dāng)代碼運(yùn)行到new SparkContext對(duì)象的的時(shí)候,就開始了復(fù)雜和精致的sparkContext對(duì)象的初始化
  • 在初始化SparkContext對(duì)象的時(shí)候,會(huì)創(chuàng)建兩個(gè)特別重要的對(duì)象,分別是:DAGScheduler 和 TaskScheduler,其中【DAGScheduler 的作用】將RDD的依賴切成一個(gè)一個(gè)的stage,然后stage作為taskSet提交給Taskscheduler。
  • 在構(gòu)建TaskScheduler的同時(shí),會(huì)創(chuàng)建兩個(gè)非常重要的對(duì)象,分別是 DriverActor 和 ClientActor,DriverActor負(fù)責(zé)接收executor的反向注冊(cè),將任務(wù)提交給executor運(yùn)行,clientActor是負(fù)責(zé)向master注冊(cè)并提交任務(wù)
  • 當(dāng)clientActor啟動(dòng)時(shí),會(huì)將用戶提交的任務(wù)相關(guān)的參數(shù)分裝到applicationDescription對(duì)象中去,然后提交給master進(jìn)行任務(wù)注冊(cè)
  • 當(dāng)master接收到clientActor提交的任務(wù)請(qǐng)求時(shí),會(huì)將請(qǐng)求的參數(shù)進(jìn)行分析,并封裝成application,然后將其持久化,然后將其加入到任務(wù)隊(duì)列waitingApps中。
  • 當(dāng)輪到我們提交任務(wù)的時(shí)候,就開始執(zhí)行schedule(),進(jìn)行任務(wù)資源的調(diào)度
  • worker接收到master發(fā)送來的launchExecutor 時(shí),會(huì)將其解壓并封裝到ExecutorRunner中,然后調(diào)用這個(gè)對(duì)象的start方法,啟動(dòng)executor
  • executor啟動(dòng)后會(huì)向driver反向注冊(cè)
  • driver會(huì)發(fā)送注冊(cè)成功信息,給executor
  • executor接收到driver actor注冊(cè)成功信息后,就會(huì)創(chuàng)建一個(gè)線程池,用于執(zhí)行driveractor發(fā)送過來的任務(wù)
  • 當(dāng)屬于這個(gè)任務(wù)的所有的 Executor 啟動(dòng)并反向注冊(cè)成功后,就意味著運(yùn)行這個(gè)任務(wù)的 環(huán)境已經(jīng)準(zhǔn)備好了,driver 會(huì)結(jié)束 SparkContext 對(duì)象的初始化,也就意味著 new SparkContext 這句代碼運(yùn)行完成
  • 當(dāng)sparkContext初始化完成之后,就會(huì)繼續(xù)運(yùn)行我們的代碼,直到運(yùn)行到action算子時(shí),也就意味著觸發(fā)了一個(gè)job的提交
  • driver 會(huì)將這個(gè) job 提交給 DAGScheduler
  • DAGScheduler將接收到的job,從最后一個(gè)算子開始推導(dǎo),將DAG根據(jù)依賴關(guān)系劃分成為一個(gè)個(gè)stage,然后將stage封裝成一個(gè)taskSet,并將taskSet中的task提交給taskScheduler
  • taskScheduler接收到DAGScheduler發(fā)送過來的task,會(huì)拿到一個(gè)序列化器,對(duì)task進(jìn)行序列化,然后將序列化好的task封裝到launchTask中,然后將launchTask發(fā)送給指定的executor中運(yùn)行
  • executor接收到了DriverActor 發(fā)送過來的launchTask 時(shí),會(huì)拿到一個(gè)反序列化器,對(duì)launchTask 進(jìn)行反序列化,封裝到一個(gè)TaskRunner 中,然后從executor這個(gè)線程池中獲取一個(gè)線程,將反序列化好的任務(wù)中的算子作用在RDD對(duì)應(yīng)的分區(qū)上。
  • 最終當(dāng)所有的task任務(wù)完成之后,整個(gè)application執(zhí)行完成,關(guān)閉sparkContext對(duì)象。
向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI