溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么掌握Flink on YARN應用啟動流程

發(fā)布時間:2021-11-08 15:56:23 來源:億速云 閱讀:110 作者:iii 欄目:關(guān)系型數(shù)據(jù)庫

本篇內(nèi)容介紹了“怎么掌握Flink on YARN應用啟動流程”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

Flink on YARN 流程圖

Flink on YARN集群部署模式涉及YARN和Flink兩大開源框架,應用啟動流程的很多環(huán)節(jié)交織在一起,為了便于大家理解,在一張圖上畫出了Flink on YARN基礎架構(gòu)和應用啟動全流程,并對關(guān)鍵角色和流程進行了介紹說明,整個啟動流程又被劃分成客戶端提交(流程標注為紫色)、Flink Cluster啟動和Job提交運行(流程標注為橙色)兩個階段分別闡述,由于分支和細節(jié)太多,本文會忽略掉一些,只介紹關(guān)鍵流程(基于Flink開源1.9版本源碼整理)。

客戶端提交流程

1.執(zhí)行命令:bin/flink run -d -m yarn-cluster ...或bin/yarn-session.sh ...來提交per-job運行模式或session運行模式的應用;

2.解析命令參數(shù)項并初始化,啟動指定運行模式,如果是per-job運行模式將根據(jù)命令行參數(shù)指定的Job主類創(chuàng)建job graph;

  • 如果可以從命令行參數(shù)(-yid )或YARN properties臨時文件(${java.io.tmpdir}/.yarn-properties-${user.name})中獲取應用ID,向指定的應用提交Job;

  • 否則當命令行參數(shù)中包含 -d(表示detached模式)和 -m yarn-cluster(表示指定YARN集群模式),啟動per-job運行模式;

  • 否則當命令行參數(shù)項不包含 -yq(表示查詢YARN集群可用資源)時,啟動session運行模式;

3.獲取YARN集群信息、新應用ID并啟動運行前檢查;

  • 通過YarnClient向YARN ResourceManager(下文縮寫為:YARN RM,YARN Master節(jié)點,負責整個集群資源的管理和調(diào)度)請求創(chuàng)建一個新應用(YARN RM收到創(chuàng)建應用請求后生成新應用ID和container申請的資源上限后返回),并且獲取YARN Slave節(jié)點報告(YARN RM返回全部slave節(jié)點的ID、狀態(tài)、rack、http地址、總資源、已使用資源等信息);

  • 運行前檢查:(1) 簡單驗證YARN集群能否訪問;(2) 最大node資源能否滿足flink JobManager/TaskManager vcores資源申請需求;(3) 指定queue是否存在(不存在也只是打印WARN信息,后續(xù)向YARN提交時排除異常并退出);(4)當預期應用申請的Container資源會超出YARN資源限制時拋出異常并退出;(5) 當預期應用申請不能被滿足時(例如總資源超出YARN集群可用資源總量、Container申請資源超出NM可用資源最大值等)提供一些參考信息。

4.將應用配置(flink-conf.yaml、logback.xml、log4j.properties)和相關(guān)文件(flink jars、ship files、user jars、job graph等)上傳至分布式存儲(例如HDFS)的應用暫存目錄(/user/${user.name}/.flink/);

5.準備應用提交上下文(ApplicationSubmissionContext,包括應用的名稱、類型、隊列、標簽等信息和應用Master的container的環(huán)境變量、classpath、資源大小等),注冊處理部署失敗的shutdown hook(清理應用對應的HDFS目錄),然后通過YarnClient向YARN RM提交應用;

6.循環(huán)等待直到應用狀態(tài)為RUNNING,包含兩個階段:

  • 循環(huán)等待應用提交成功(SUBMITTED):默認每隔200ms通過YarnClient獲取應用報告,如果應用狀態(tài)不是NEW和NEW_SAVING則認為提交成功并退出循環(huán),每循環(huán)10次會將當前的應用狀態(tài)輸出至日志:"Application submission is not finished, submitted application is still in ",提交成功后輸出日志:"Submitted application "

  • 循環(huán)等待應用正常運行(RUNNING):每隔250ms通過YarnClient獲取應用報告,每輪循環(huán)也會將當前的應用狀態(tài)輸出至日志:"Deploying cluster, current state "。應用狀態(tài)成功變?yōu)镽UNNING后將輸出日志"YARN application has been deployed successfully." 并退出循環(huán),如果等到的是非預期狀態(tài)如FAILED/FINISHED/KILLED,就會在輸出YARN返回的診斷信息("The YARN application unexpectedly switched to state during deployment. Diagnostics from YARN: ...")之后拋出異常并退出。

Flink Cluster啟動流程

1.YARN RM中的ClientRMService(為普通用戶提供的RPC服務組件,處理來自客戶端的各種RPC請求,比如查詢YARN集群信息,提交、終止應用等)接收到應用提交請求,簡單校驗后將請求轉(zhuǎn)交給RMAppManager(YARN RM內(nèi)部管理應用生命周期的組件);

2.RMAppManager根據(jù)應用提交上下文內(nèi)容創(chuàng)建初始狀態(tài)為NEW的應用,將應用狀態(tài)持久化到RM狀態(tài)存儲服務(例如ZooKeeper集群,RM狀態(tài)存儲服務用來保證RM重啟、HA切換或發(fā)生故障后集群應用能夠正常恢復,后續(xù)流程中的涉及狀態(tài)存儲時不再贅述),應用狀態(tài)變?yōu)镹EW_SAVING;

3.應用狀態(tài)存儲完成后,應用狀態(tài)變?yōu)镾UBMITTED;RMAppManager開始向ResourceScheduler(YARN RM可拔插資源調(diào)度器,YARN自帶三種調(diào)度器FifoScheduler/FairScheduler/CapacityScheduler,其中CapacityScheduler支持功能最多使用最廣泛,F(xiàn)ifoScheduler功能最簡單基本不可用,今年社區(qū)已明確不再繼續(xù)支持FairScheduler,建議已有用戶遷至CapacityScheduler)提交應用,如果無法正常提交(例如隊列不存在、不是葉子隊列、隊列已停用、超出隊列最大應用數(shù)限制等)則拋出拒絕該應用,應用狀態(tài)先變?yōu)镕INAL_SAVING觸發(fā)應用狀態(tài)存儲流程并在完成后變?yōu)镕AILED;如果提交成功,應用狀態(tài)變?yōu)锳CCEPTED;

4.開始創(chuàng)建應用運行實例(ApplicationAttempt,由于一次運行實例中最重要的組件是ApplicationMaster,下文簡稱AM,它的狀態(tài)代表了ApplicationAttempt的當前狀態(tài),所以ApplicationAttempt實際也代表了AM),初始狀態(tài)為NEW;

5.初始化應用運行實例信息,并向ApplicationMasterService(AM&RM協(xié)議接口服務,處理來自AM的請求,主要包括注冊和心跳)注冊,應用實例狀態(tài)變?yōu)镾UBMITTED;

6.RMAppManager維護的應用實例開始初始化AM資源申請信息并重新校驗隊列,然后向ResourceScheduler申請AM Container(Container是YARN中資源的抽象,包含了內(nèi)存、CPU等多維度資源),應用實例狀態(tài)變?yōu)锳CCEPTED;

7.ResourceScheduler會根據(jù)優(yōu)先級(隊列/應用/請求每個維度都有優(yōu)先級配置)從根隊列開始層層遞進,先后選擇當前優(yōu)先級最高的子隊列、應用直至具體某個請求,然后結(jié)合集群資源分布等情況作出分配決策,AM Container分配成功后,應用實例狀態(tài)變?yōu)锳LLOCATED_SAVING,并觸發(fā)應用實例狀態(tài)存儲流程,存儲成功后應用實例狀態(tài)變?yōu)锳LLOCATED;

8.RMAppManager維護的應用實例開始通知ApplicationMasterLauncher(AM生命周期管理服務,負責啟動或清理AM container)啟動AM container,ApplicationMasterLauncher與YARN NodeManager(下文簡稱YARN NM,與YARN RM保持通信,負責管理單個節(jié)點上的全部資源、Container生命周期、附屬服務等,監(jiān)控節(jié)點健康狀況和Container資源使用)建立通信并請求啟動AM container;

9.ContainerManager(YARN NM核心組件,管理所有Container的生命周期)接收到AM container啟動請求,YARN NM開始校驗Container Token及資源文件,創(chuàng)建應用實例和Container實例并存儲至本地,結(jié)果返回后應用實例狀態(tài)變?yōu)長AUNCHED;

10.ResourceLocalizationService(資源本地化服務,負責Container所需資源的本地化。它能夠按照描述從HDFS上下載Container所需的文件資源,并盡量將它們分攤到各個磁盤上以防止出現(xiàn)訪問熱點)初始化各種服務組件、創(chuàng)建工作目錄、從HDFS下載運行所需的各種資源至Container工作目錄(路徑為: ${yarn.nodemanager.local-dirs}/usercache/${user}/appcache//);

11.ContainersLauncher(負責container的具體操作,包括啟動、重啟、恢復和清理等)將待運行Container所需的環(huán)境變量和運行命令寫到Container工作目錄下的launch_container.sh腳本中,然后運行該腳本啟動Container;

12.Container進程加載并運行ClusterEntrypoint(Flink JobManager入口類,每種集群部署模式和應用運行模式都有相應的實現(xiàn),例如在YARN集群部署模式下,per-job應用運行模式實現(xiàn)類是YarnJobClusterEntrypoint,session應用運行模式實現(xiàn)類是YarnSessionClusterEntrypoint),首先初始化相關(guān)運行環(huán)境:

  • 輸出各軟件版本及運行環(huán)境信息、命令行參數(shù)項、classpath等信息;

  • 注冊處理各種SIGNAL的handler:記錄到日志

  • 注冊JVM關(guān)閉保障的shutdown hook:避免JVM退出時被其他shutdown hook阻塞

  • 打印YARN運行環(huán)境信息:用戶名

  • 從運行目錄中加載flink conf

  • 初始化文件系統(tǒng)

  • 創(chuàng)建并啟動各類內(nèi)部服務(包括RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore等)

  • 將RPC address和port更新到flink conf配置

13.啟動ResourceManager(Flink資源管理核心組件,包含YarnResourceManager和SlotManager兩個子組件,YarnResourceManager負責外部資源管理,與YARN RM建立通信并保持心跳,申請或釋放TaskManager資源,注銷應用等;SlotManager則負責內(nèi)部資源管理,維護全部Slot信息和狀態(tài))及相關(guān)服務,創(chuàng)建異步AMRMClient,開始注冊AM,注冊成功后每隔一段時間(心跳間隔配置項:${yarn.heartbeat.interval},默認5s)向YARN RM發(fā)送心跳來發(fā)送資源更新請求和接受資源變更結(jié)果。YARN RM內(nèi)部該應用和應用運行實例的狀態(tài)都變?yōu)镽UNNING,并通知AMLivelinessMonitor服務監(jiān)控AM是否存活狀態(tài),當心跳超過一定時間(默認10分鐘)觸發(fā)AM failover流程;

14.啟動Dispatcher(負責接收用戶提供的作業(yè),并且負責為這個新提交的作業(yè)拉起一個新的 JobManager)及相關(guān)服務(包括REST endpoint等),在per-job運行模式下,Dispatcher將直接從Container工作目錄加載JobGrap文件;在session運行模式下,Dispatcher將在接收客戶端提交的Job(_通過BlockServer接收job grap文件)后再進行后續(xù)流程;

15.根據(jù)JobGraph啟動JobManager(負責作業(yè)調(diào)度、管理Job和Task的生命周期),構(gòu)建ExecutionGraph(JobGraph的并行化版本,調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu));

16.JobManager開始執(zhí)行ExecutionGraph,向ResourceManager申請資源;

17.ResourceManager將資源請求加入等待請求隊列,并通過心跳向YARN RM申請新的Container資源來啟動TaskManager進程;后續(xù)流程如果有空閑Slot資源,SlotManager將其分配給等待請求隊列中匹配的請求,不用再通過18. YarnResourceManager申請新的TaskManager;

**18.YARN ApplicationMasterService接收到資源請求后,解析出新的資源請求并更新應用請求信息; **

19.YARN ResourceScheduler成功為該應用分配資源后更新應用信息,ApplicationMasterService接收到Flink JobManager的下一次心跳時返回新分配資源信息;

20.Flink ResourceManager接收到新分配的Container資源后,準備好TaskManager啟動上下文(ContainerLauncherContext,生成TaskManager配置并上傳至分布式存儲,配置其他依賴和環(huán)境變量等),然后向YARN NM申請啟動TaskManager進程,YARN NM啟動Container的流程與AM Container啟動流程基本類似,區(qū)別在于應用實例在NM上已存在并未RUNNING狀態(tài)時則跳過應用實例初始化流程,這里不再贅述;

21.TaskManager進程加載并運行YarnTaskExecutorRunner(Flink TaskManager入口類),初始化流程完成后啟動TaskExecutor(負責執(zhí)行Task相關(guān)操作);

22.TaskExecutor啟動后先向ResourceManager注冊,成功后再向SlotManager匯報自己的Slot資源與狀態(tài); SlotManager接收到Slot空閑資源后主動觸發(fā)Slot分配,從等待請求隊列中選出合適的資源請求后,向 TaskManager請求該Slot資源

23.TaskManager收到請求后檢查該Slot是否可分配(不存在則返回異常信息)、Job是否已注冊(沒有則先注冊再分配Slot),檢查通過后將Slot分配給JobManager;

24.JobManager檢查Slot分配是否重復,通過后通知Execution執(zhí)行部署task流程,向TaskExecutor提交task; TaskExecutor啟動新的線程運行Task。

“怎么掌握Flink on YARN應用啟動流程”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI