This article explains how to enable Dynamic Resource Allocation (DRA) in Spark, first on YARN and then on Kubernetes, by walking through the code path that makes it work.
Spark on YARN has supported DynamicResourceAllocation since Spark 1.2.
對(duì)于spark熟悉的人都知道,如果我們要開啟DynamicResourceAllocation,就得有ExternalShuffleService服務(wù),
對(duì)于yarn來說ExternalShuffleService是作為輔助服務(wù)開啟的,具體配置如下:
```xml
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
  <name>spark.shuffle.service.port</name>
  <value>7337</value>
</property>
```
After restarting the NodeManagers, each NodeManager node runs a YarnShuffleService. Then set spark.dynamicAllocation.enabled to true in your Spark application, and resources will be allocated dynamically at runtime.
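For reference, a minimal Spark-side configuration for DRA on YARN might look like the following spark-defaults.conf fragment (the min/max values are illustrative, not recommendations):

```properties
# Tell executors to register with the NodeManager-side shuffle service
spark.shuffle.service.enabled        true
# Enable dynamic resource allocation (illustrative bounds below)
spark.dynamicAllocation.enabled      true
spark.dynamicAllocation.minExecutors 1
spark.dynamicAllocation.maxExecutors 10
```

Note that spark.shuffle.service.port must match the port configured for the YarnShuffleService above if you change it from the default 7337.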
Let's start from the creation of SparkEnv in CoarseGrainedExecutorBackend. Every executor launch goes through the CoarseGrainedExecutorBackend main method, and main is where the SparkEnv is created:
```scala
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId,
  arguments.bindAddress, arguments.hostname, arguments.cores,
  cfg.ioEncryptionKey, isLocal = false)
```
Creating the SparkEnv in turn creates the BlockManager. Following the code down, we eventually reach:
```scala
val blockTransferService = new NettyBlockTransferService(conf, securityManager,
  bindAddress, advertiseAddress, blockManagerPort, numUsableCores,
  blockManagerMaster.driverEndpoint)

val blockManager = new BlockManager(
  executorId,
  rpcEnv,
  blockManagerMaster,
  serializerManager,
  conf,
  memoryManager,
  mapOutputTracker,
  shuffleManager,
  blockTransferService,
  securityManager,
  externalShuffleClient)
```
In the BlockManager's initialize method, registerWithExternalShuffleServer is called:
```scala
// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
  registerWithExternalShuffleServer()
}
```
If the ExternalShuffleService is enabled (on YARN this is the YarnShuffleService), the current ExecutorShuffleInfo is registered with the ExternalShuffleService at host shuffleServerId.host and port shuffleServerId.port. The ExecutorShuffleInfo is built as follows:
```scala
val shuffleConfig = new ExecutorShuffleInfo(
  diskBlockManager.localDirsString,
  diskBlockManager.subDirsPerLocalDir,
  shuffleManager.getClass.getName)
```
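Why does ExecutorShuffleInfo carry localDirs and subDirsPerLocalDir? Because the shuffle service must locate block files on disk using the same deterministic hash-based directory layout the executor used when writing them. The Python sketch below illustrates the idea only; Spark's real logic lives in DiskBlockManager/ExecutorDiskUtils and uses the JVM string hash, so the exact paths produced here differ from Spark's:

```python
# Simplified sketch: recomputing a block file's path from
# (localDirs, subDirsPerLocalDir, filename) alone, as a shuffle
# service must. The hash function is a stand-in, not Spark's.

def non_negative_hash(s: str) -> int:
    # Deterministic stand-in for the JVM string hash.
    h = 0
    for ch in s:
        h = (h * 31 + ord(ch)) & 0xFFFFFFFF
    return h

def block_file_path(local_dirs, sub_dirs_per_local_dir, filename):
    h = non_negative_hash(filename)
    dir_id = h % len(local_dirs)                                   # pick a root local dir
    sub_dir_id = (h // len(local_dirs)) % sub_dirs_per_local_dir   # pick a subdirectory
    return f"{local_dirs[dir_id]}/{sub_dir_id:02x}/{filename}"

# Executor and shuffle service compute the same path independently,
# with no communication beyond the one-time ExecutorShuffleInfo registration:
dirs, sub_dirs = ["/data1/blockmgr", "/data2/blockmgr"], 64
p1 = block_file_path(dirs, sub_dirs, "shuffle_0_12_0.data")
p2 = block_file_path(dirs, sub_dirs, "shuffle_0_12_0.data")
assert p1 == p2
```

This is why losing the registration (as happens on Kubernetes, discussed below) breaks the external shuffle path: without localDirs and subDirsPerLocalDir, the service cannot reconstruct where any block lives.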
Let's focus on the following fragment of registerWithExternalShuffleServer:
```scala
// Synchronous and will throw an exception if we cannot connect.
blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer(
  shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
```
Here, shuffleServerId comes from:
```scala
shuffleServerId = if (externalShuffleServiceEnabled) {
  logInfo(s"external shuffle service port = $externalShuffleServicePort")
  BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
} else {
  blockManagerId
}
```
And blockTransferService.hostName is the advertiseAddress passed in when the SparkEnv was created, which ultimately comes from the hostname argument of the CoarseGrainedExecutorBackend main class. So how does it get there? See ExecutorRunnable's prepareCommand method:
```scala
val commands = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  javaOpts ++
  Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
    "--driver-url", masterAddress,
    "--executor-id", executorId,
    "--hostname", hostname,
    "--cores", executorCores.toString,
    "--app-id", appId,
    "--resourceProfileId", resourceProfileId.toString) ++
```
而這個(gè)hostname的值最終由YarnAllocator的方法runAllocatedContainers
```scala
val executorHostname = container.getNodeId.getHost
```
In other words, we end up with the host of the YARN node, i.e. the NodeManager's host. Each launched executor thus registers its ExecutorShuffleInfo with the YarnShuffleService on the NodeManager it runs on. With dynamic resource allocation enabled, the fetchBlocks path of ExternalBlockStoreClient then works much the same as NettyBlockTransferService's does without dynamic allocation.
What about Spark on Kubernetes? As covered in a previous article, the entrypoint script passes the hostname parameter when launching the executor:
```shell
executor)
  shift 1
  CMD=(
    ${JAVA_HOME}/bin/java
    "${SPARK_EXECUTOR_JAVA_OPTS[@]}"
    -Xms$SPARK_EXECUTOR_MEMORY
    -Xmx$SPARK_EXECUTOR_MEMORY
    -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"
    org.apache.spark.executor.CoarseGrainedExecutorBackend
    --driver-url $SPARK_DRIVER_URL
    --executor-id $SPARK_EXECUTOR_ID
    --cores $SPARK_EXECUTOR_CORES
    --app-id $SPARK_APPLICATION_ID
    --hostname $SPARK_EXECUTOR_POD_IP
  )
```
SPARK_EXECUTOR_POD_IP is the IP of the running pod; see this fragment of the BasicExecutorFeatureStep class:
```scala
Seq(new EnvVarBuilder()
  .withName(ENV_EXECUTOR_POD_IP)
  .withValueFrom(new EnvVarSourceBuilder()
    .withNewFieldRef("v1", "status.podIP")
    .build())
  .build())
```
Following the flow analyzed above, on Kubernetes the executor cannot register with an ExternalShuffleService on its node: the address it registers is the pod IP, not the node IP.
當(dāng)然spark社區(qū)早就提出了未開啟external shuffle service的動(dòng)態(tài)資源分配,且已經(jīng)合并到master分支. 具體配置,可以參照如下:
```properties
spark.dynamicAllocation.enabled                 true
spark.dynamicAllocation.shuffleTracking.enabled true
spark.dynamicAllocation.minExecutors            1
spark.dynamicAllocation.maxExecutors            4
spark.dynamicAllocation.executorIdleTimeout     60s
```
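These settings can also be passed directly at submission time. A sketch of such a spark-submit invocation on Kubernetes follows; the API server URL, container image, and application jar path are placeholders you would substitute for your own cluster:

```shell
spark-submit \
  --master k8s://https://<api-server>:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=<your-spark-image> \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=1 \
  --conf spark.dynamicAllocation.maxExecutors=4 \
  --conf spark.dynamicAllocation.executorIdleTimeout=60s \
  local:///opt/spark/examples/jars/<your-app>.jar
```

With shuffleTracking enabled, the driver tracks which executors hold shuffle data and keeps them alive until that data is no longer needed, which is what removes the dependency on an external shuffle service.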
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。