Today let's talk about CoarseGrainedSchedulerBackend and CoarseGrainedExecutorBackend. Many people don't know these two classes well, so this post summarizes what they do and how they fit into Spark's scheduling; hopefully you'll get something out of it.
CoarseGrainedSchedulerBackend is used on the Driver side; CoarseGrainedExecutorBackend is used on the Executor side. Both are Backends — so what is a Backend? A Backend is the component responsible for endpoint-to-endpoint communication, and these two CoarseGrained Backends handle the communication between the Driver and the Executors.
So what is the Driver?
The Driver is the Spark program we write; the code inside its main function is what runs as the Driver.
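For example, in a minimal application like the sketch below (the object name and input path are arbitrary), everything in main runs in the Driver process, while only the tasks generated by the actions run on Executors:

import org.apache.spark.{SparkConf, SparkContext}

// A minimal driver program: main() itself runs in the Driver process;
// the tasks produced by the RDD operations run on Executors.
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)
    val counts = sc.textFile("input.txt")   // read lines
      .flatMap(_.split(" "))                // split into words
      .map((_, 1))                          // pair each word with 1
      .reduceByKey(_ + _)                   // sum counts per word
    counts.collect().foreach(println)       // action: triggers tasks on Executors
    sc.stop()
  }
}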
And what is an Executor?
An Executor is where Spark tasks actually execute. When the Backend receives a LaunchTask message from the Driver, it calls the launchTask method of the Executor class to run the task.
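As a sketch, the Executor-side handling looks roughly like this (paraphrased from CoarseGrainedExecutorBackend.receive; the exact constructor arguments and message set vary across Spark versions):

// Paraphrased from CoarseGrainedExecutorBackend.receive (details vary by version):
override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    // The Driver accepted our registration: now create the real Executor.
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      val taskDesc = TaskDescription.decode(data.value)
      executor.launchTask(this, taskDesc) // hands the task to the executor's thread pool
    }

  // ... plus KillTask, StopExecutor, Shutdown, etc.
}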
The Driver starts a CoarseGrainedSchedulerBackend and uses it to request machines from the cluster so that Executors can be started. The cluster finds a machine and sends it a command to start an ExecutorRunner; the ExecutorRunner starts a CoarseGrainedExecutorBackend, which registers with the Driver and creates an Executor to handle the requests that the CoarseGrainedExecutorBackend receives. That is the flow under a Standalone deployment; on YARN it is mostly the same, except for the step of requesting machines from the cluster to start Executors, which is covered briefly below.
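The conversation between the two backends is just a handful of case-class messages; here is a trimmed sketch of the relevant ones from CoarseGrainedClusterMessages (the real definitions carry more fields):

// Trimmed from CoarseGrainedClusterMessages (field lists shortened):
case class RegisterExecutor(executorId: String, hostname: String, cores: Int) // Executor -> Driver
case object RegisteredExecutor                                                // Driver -> Executor
case class LaunchTask(data: SerializableBuffer)                               // Driver -> Executor
case class StatusUpdate(executorId: String, taskId: Long,
    state: TaskState, data: SerializableBuffer)                               // Executor -> Driver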
Yarn環(huán)境下,是通過(guò)spark-yarn工程里的幾個(gè)類一級(jí)yarn本身的功能來(lái)一起完成機(jī)器的部署和分區(qū)任務(wù)的分發(fā)。
spark-yarn contains two files: client.java and ApplicationMaster.java.
client.java's job is to request resources from YARN in order to run the ApplicationMaster.java code, so the part worth examining is what ApplicationMaster.java actually does.
The ApplicationMaster starts by doing two things: launching one "/bin/mesos-master" and several "/bin/mesos-slave" processes. Both are deployed by requesting resources from YARN and running on the containers it grants — that resource negotiation is the YARN part of the job. The "/bin/mesos-master" and "/bin/mesos-slave" binaries can be thought of as playing the same roles as the Master and Worker in a Standalone deployment. (This reflects an early Spark-on-YARN prototype that reused Mesos binaries; modern Spark on YARN no longer works this way.)
The launchContainer method starts a YARN container — this is where the "/bin/mesos-slave" mentioned above gets launched inside a container, and each mesos-slave then registers with the mesos-master. Once all of the required slave resources have been requested and started, startApplication() is called to begin running the Driver.
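For reference, launching a command inside a granted container generally goes through a ContainerLaunchContext; here is a minimal, hypothetical sketch using the modern YARN client API (not the article's exact code — the command line and helper name are invented for illustration):

import java.util.Collections
import org.apache.hadoop.yarn.api.records.{Container, ContainerLaunchContext}
import org.apache.hadoop.yarn.client.api.NMClient
import org.apache.hadoop.yarn.util.Records

// Hypothetical sketch: run one shell command in an allocated container,
// redirecting its output into the container's log directory.
def launchSlave(nmClient: NMClient, container: Container, masterUrl: String): Unit = {
  val ctx = Records.newRecord(classOf[ContainerLaunchContext])
  ctx.setCommands(Collections.singletonList(
    "/bin/mesos-slave " + masterUrl + " 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr"))
  nmClient.startContainer(container, ctx)
}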
The startApplication() method:
// Start the user's application
private void startApplication() throws IOException {
  try {
    String sparkClasspath = getSparkClasspath();
    String jobJar = new File("job.jar").getAbsolutePath();
    String javaArgs = "-Xms" + (masterMem - 128) + "m -Xmx" + (masterMem - 128) + "m";
    javaArgs += " -Djava.library.path=" + mesosHome + "/lib/java";
    String substitutedArgs = programArgs.replaceAll("\\[MASTER\\]", masterUrl);
    if (mainClass.equals("")) {
      javaArgs += " -cp " + sparkClasspath + " -jar " + jobJar + " " + substitutedArgs;
    } else {
      javaArgs += " -cp " + sparkClasspath + ":" + jobJar + " " + mainClass + " " + substitutedArgs;
    }
    String java = "java";
    if (System.getenv("JAVA_HOME") != null) {
      java = System.getenv("JAVA_HOME") + "/bin/java";
    }
    String bashCommand = java + " " + javaArgs +
        " 1>" + logDirectory + "/application.stdout" +
        " 2>" + logDirectory + "/application.stderr";
    LOG.info("Command: " + bashCommand);
    String[] command = new String[] {"bash", "-c", bashCommand};
    String[] env = new String[] {
        "SPARK_HOME=" + sparkHome,
        "MASTER=" + masterUrl,
        "SPARK_MEM=" + (slaveMem - 128) + "m"
    };
    application = Runtime.getRuntime().exec(command, env);
    new Thread("wait for user application") {
      public void run() {
        try {
          appExitCode = application.waitFor();
          appExited = true;
          LOG.info("User application exited with code " + appExitCode);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }.start();
  } catch (SparkClasspathException e) {
    unregister(false);
    System.exit(1);
    return;
  }
}
This is what launches the Driver. masterUrl is the address of "bin/mesos-master", and it is passed along in the MASTER environment variable. Under YARN the master address has the form "mesos://host:port"; under Standalone it is "spark://host:port".
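That master address is the same thing you pass to SparkConf.setMaster (or spark-submit --master); for instance (host/port values are placeholders):

// The same application code can run against different cluster managers,
// selected purely by the master URL:
val conf = new SparkConf().setAppName("demo")
  .setMaster("spark://master-host:7077")    // Standalone
  // .setMaster("mesos://master-host:5050") // Mesos
  // .setMaster("yarn")                     // YARN (modern Spark)
  // .setMaster("local[4]")                 // local mode with 4 threads
val sc = new SparkContext(conf)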
SparkContext then branches on the format of the master address; that code looks like this:
master match {
  case "local" =>
    checkResourcesPerTask(clusterMode = false, Some(1))
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
    scheduler.initialize(backend)
    (backend, scheduler)

  case LOCAL_N_REGEX(threads) =>
    def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
    // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
    val threadCount = if (threads == "*") localCpuCount else threads.toInt
    if (threadCount <= 0) {
      throw new SparkException(s"Asked to run locally with $threadCount threads")
    }
    checkResourcesPerTask(clusterMode = false, Some(threadCount))
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
    scheduler.initialize(backend)
    (backend, scheduler)

  case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
    def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
    // local[*, M] means the number of cores on the computer with M failures
    // local[N, M] means exactly N threads with M failures
    val threadCount = if (threads == "*") localCpuCount else threads.toInt
    checkResourcesPerTask(clusterMode = false, Some(threadCount))
    val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
    val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
    scheduler.initialize(backend)
    (backend, scheduler)

  case SPARK_REGEX(sparkUrl) =>
    checkResourcesPerTask(clusterMode = true, None)
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    (backend, scheduler)

  case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
    checkResourcesPerTask(clusterMode = true, Some(coresPerSlave.toInt))
    // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
    val memoryPerSlaveInt = memoryPerSlave.toInt
    if (sc.executorMemory > memoryPerSlaveInt) {
      throw new SparkException(
        "Asked to launch cluster with %d MiB RAM / worker but requested %d MiB/worker".format(
          memoryPerSlaveInt, sc.executorMemory))
    }
    val scheduler = new TaskSchedulerImpl(sc)
    val localCluster = new LocalSparkCluster(
      numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
    val masterUrls = localCluster.start()
    val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
      localCluster.stop()
    }
    (backend, scheduler)

  case masterUrl =>
    checkResourcesPerTask(clusterMode = true, None)
    val cm = getClusterManager(masterUrl) match {
      case Some(clusterMgr) => clusterMgr
      case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
    }
    try {
      val scheduler = cm.createTaskScheduler(sc, masterUrl)
      val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
      cm.initialize(scheduler, backend)
      (backend, scheduler)
    } catch {
      case se: SparkException => throw se
      case NonFatal(e) =>
        throw new SparkException("External scheduler cannot be instantiated", e)
    }
}
With YARN, execution falls through to the last case:
case masterUrl =>
  checkResourcesPerTask(clusterMode = true, None)
  val cm = getClusterManager(masterUrl) match {
    case Some(clusterMgr) => clusterMgr
    case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
  }
  try {
    val scheduler = cm.createTaskScheduler(sc, masterUrl)
    val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
    cm.initialize(scheduler, backend)
    (backend, scheduler)
  } catch {
    case se: SparkException => throw se
    case NonFatal(e) =>
      throw new SparkException("External scheduler cannot be instantiated", e)
  }
This brings in the ClusterManager class. What is that, now? This is exactly what makes Spark hard: there are so many concepts involved.
private def getClusterManager(url: String): Option[ExternalClusterManager] = {
  val loader = Utils.getContextOrSparkClassLoader
  val serviceLoaders =
    ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
  if (serviceLoaders.size > 1) {
    throw new SparkException(
      s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
  }
  serviceLoaders.headOption
}
It finds every ExternalClusterManager class (and subclasses) and picks the one whose canCreate method returns true for the given URL — here, the one that accepts "mesos://host:port".
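The contract being implemented is the ExternalClusterManager trait. Here is a trimmed sketch of the trait together with a hypothetical plug-in (MyClusterManager and its URL scheme are invented names):

// Trimmed from Spark's ExternalClusterManager trait:
trait ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(sc: SparkContext, masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}

// Hypothetical plug-in: claims URLs of the form "mycluster://host:port".
class MyClusterManager extends ExternalClusterManager {
  override def canCreate(masterURL: String): Boolean =
    masterURL.startsWith("mycluster://")
  // ... createTaskScheduler / createSchedulerBackend / initialize omitted
}

Because the lookup goes through java.util.ServiceLoader, the implementation class must also be listed in a META-INF/services/org.apache.spark.scheduler.ExternalClusterManager file on the classpath.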
That covers how CoarseGrainedSchedulerBackend and CoarseGrainedExecutorBackend fit together: the Driver-side backend acquires Executors and ships tasks to them, the Executor-side backend registers back and runs the tasks, and SparkContext picks the right scheduler backend for whatever master URL you give it.
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。