您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)Yarn中如何實現(xiàn)ScheduleBackend,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
Yarn方式下的ScheduleBackend是用的啥?
在SparkContext中創(chuàng)建ScheduleBackend時,會根據(jù)指定的”master“參數(shù)的前綴決定創(chuàng)建哪種ScheduleBackend,對于"yarn://host:port"這樣的URL來說,如果是cluster模式,就是創(chuàng)建YarnClusterSchedulerBackend,如果是client模式,就是創(chuàng)建YarnClientSchedulerBackend。
我們還是先看看YarnClusterSchedulerBackend的代碼結(jié)構(gòu)把。
YarnClusterSchedulerBackend繼承了YarnSchedulerBackend,沒有太多的發(fā)揮代碼,我們直接看YarnSchedulerBackend把。估計client模式下也差不多。
YarnSchedulerBackend又繼承了CoarseGrainedSchedulerBackend,我們看看不同點在哪里。
覆寫了doRequestTotalExecutors和doKillExecutors方法,一個申請Executor,一個殺死Executor。
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = { yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal)) } override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = { yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds)) }
yarnSchedulerEndpointRef就是同一個文件里的endpoint端,看看具體的執(zhí)行代碼是什么:
case r: RequestExecutors => amEndpoint match { case Some(am) => am.ask[Boolean](r).andThen { case Success(b) => context.reply(b) case Failure(NonFatal(e)) => logError(s"Sending $r to AM was unsuccessful", e) context.sendFailure(e) }(ThreadUtils.sameThread) } case k: KillExecutors => amEndpoint match { case Some(am) => am.ask[Boolean](k).andThen { case Success(b) => context.reply(b) case Failure(NonFatal(e)) => logError(s"Sending $k to AM was unsuccessful", e) context.sendFailure(e) }(ThreadUtils.sameThread) }
我們看到它又將消息轉(zhuǎn)給了amEndpoint,就是轉(zhuǎn)給了yarn工程里的ApplicationManager。又要跳到ApplicationManager去看看里面的實現(xiàn)邏輯了,真是一波三折啊。
ApplicationManager里是怎么處理RequestExecutors和KillExecutors兩個消息的呢?
case r: RequestExecutors => Option(allocator) match { case Some(a) => if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal, r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) { resetAllocatorInterval() } context.reply(true) } case KillExecutors(executorIds) => Option(allocator) match { case Some(a) => executorIds.foreach(a.killExecutor) } context.reply(true)
調(diào)用allocator的killExecutor和requestTotalExecutorsWithPreferredLocalities方法。allocator又是啥?這里是不是類有的太多了啊。。
allocator = client.createAllocator( yarnConf, _sparkConf, appAttemptId, driverUrl, driverRef, securityMgr, localResources)
是client的createAllocator方法創(chuàng)建出來的,client是啥?是YarnRMClient,我們就要先看看YarnRMClient了,看名字就大概能猜到,YarnRMClient就是來向Yarn機(jī)器申請Executor和殺死Executor的。
createAllocator方法返回下面的YarnAllocator:
return new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, appAttemptId, securityMgr,
localResources, SparkRackResolver.get(conf))
來到Y(jié)arnAllocator。
YarnAllocator的killExecutor方法很好理解,就是釋放Yarn中的Container:
def killExecutor(executorId: String): Unit = synchronized { executorIdToContainer.get(executorId) match { case Some(container) if !releasedContainers.contains(container.getId) => internalReleaseContainer(container) runningExecutors.remove(executorId) case _ => logWarning(s"Attempted to kill unknown executor $executorId!") } }
申請Executor其實最終是在runAllocatedContainers方法中實現(xiàn)的。
核心代碼看一下把,完整的可以看源碼:
if (runningExecutors.size() < targetNumExecutors) { numExecutorsStarting.incrementAndGet() if (launchContainers) { launcherPool.execute(() => { try { new ExecutorRunnable( Some(container), conf, sparkConf, driverUrl, executorId, executorHostname, executorMemory, executorCores, appAttemptId.getApplicationId.toString, securityMgr, localResources ).run() updateInternalState() } catch { } }) }
申請targetNumExecutors個ExecutorRunner,這樣就和Standalone的申請Executor對應(yīng)起來了。好了,整個過程就是這樣了。
最終就會在Yarn集群中申請了所需數(shù)目的Container,并且在Container中啟動ExecutorRunner,來向Driver匯報成績。
這里的ExecutorRunner就是YarnCoarseGrainedExecutorBackend線程,在ExecutorRunner類中可以看到。
關(guān)于Yarn中如何實現(xiàn)ScheduleBackend就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。