How to enable DRA (Dynamic Resource Allocation) in Spark

This article explains how to enable DRA in Spark, first on YARN and then on Kubernetes. The walkthrough is straightforward and easy to follow, so let's dig in step by step.

DynamicResourceAllocation in Spark on YARN

Spark on YARN has supported DynamicResourceAllocation since Spark 1.2.
Anyone familiar with Spark knows that enabling DynamicResourceAllocation requires the ExternalShuffleService.
On YARN, the ExternalShuffleService is started as a NodeManager auxiliary service, configured as follows:

<property>
   <name>yarn.nodemanager.aux-services</name>
   <value>spark_shuffle</value>
</property>

<property>
   <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
   <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>

<property>
   <name>spark.shuffle.service.port</name>
   <value>7337</value>
</property>

Restart the NodeManagers, and a YarnShuffleService will start on every NodeManager node (note that the spark-<version>-yarn-shuffle.jar must be on the NodeManager's classpath for the service class to load). After that, set spark.dynamicAllocation.enabled to true in your Spark application, and you get dynamic resource allocation at runtime.
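
For example, a minimal set of application properties might look like this (a sketch; the executor counts and idle timeout are placeholder values to tune for your cluster):

spark.shuffle.service.enabled                true
spark.dynamicAllocation.enabled              true
spark.dynamicAllocation.minExecutors         1
spark.dynamicAllocation.maxExecutors         10
spark.dynamicAllocation.executorIdleTimeout  60s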

Let's start from the creation of SparkEnv in CoarseGrainedExecutorBackend. Every executor launch necessarily goes through CoarseGrainedExecutorBackend's main method, and main involves creating the SparkEnv:

 val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
       arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

Creating the SparkEnv in turn creates the BlockManager. Following the code down, we eventually arrive at:

val blockTransferService =
     new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
       blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)
val blockManager = new BlockManager(
     executorId,
     rpcEnv,
     blockManagerMaster,
     serializerManager,
     conf,
     memoryManager,
     mapOutputTracker,
     shuffleManager,
     blockTransferService,
     securityManager,
     externalShuffleClient)

In the BlockManager's initialize method, registerWithExternalShuffleServer is invoked:

 // Register Executors' configuration with the local shuffle service, if one should exist.
   if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
     registerWithExternalShuffleServer()
   }

If we have enabled the ExternalShuffleService (on YARN, that is the YarnShuffleService), the current executor's ExecutorShuffleInfo is registered with the ExternalShuffleService at host shuffleServerId.host and port shuffleServerId.port. The ExecutorShuffleInfo is built like this:

val shuffleConfig = new ExecutorShuffleInfo(
     diskBlockManager.localDirsString,
     diskBlockManager.subDirsPerLocalDir,
     shuffleManager.getClass.getName)
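
To see what the shuffle service actually receives, here is a hypothetical instance (the local dirs and sub-dir count below are invented for illustration; 64 is merely the default of spark.diskStore.subDirectories): the shuffle service needs exactly this much to serve an executor's shuffle files after the executor is gone.

new ExecutorShuffleInfo(
  Array("/yarn/local/usercache/someuser/appcache/application_x/blockmgr-y"), // executor's local dirs
  64,                                                                        // sub-dirs per local dir
  "org.apache.spark.shuffle.sort.SortShuffleManager")                        // shuffle manager class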

Here I want to focus on the following fragment of the registerWithExternalShuffleServer method:

// Synchronous and will throw an exception if we cannot connect.
       blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer(
         shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)

The shuffleServerId in this code comes from:

shuffleServerId = if (externalShuffleServiceEnabled) {
     logInfo(s"external shuffle service port = $externalShuffleServicePort")
     BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
   } else {
     blockManagerId
   }
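
To make the distinction concrete, here is a schematic sketch (BlockManagerId is Spark-internal, and the executor id, host, and ports below are invented for illustration):

// With the external shuffle service enabled, the registration target is the
// NodeManager host on the shuffle service port configured above (7337),
// so shuffle files stay reachable after the executor is removed.
val withEss    = BlockManagerId("1", "nm-host-01", 7337)   // -> YarnShuffleService
// Without it, the id points at the executor's own block transfer port.
val withoutEss = BlockManagerId("1", "nm-host-01", 43121)  // -> executor itself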

blockTransferService.hostName is the advertiseAddress passed in when we created the SparkEnv,
which ultimately comes from the hostname argument of the CoarseGrainedExecutorBackend main class. So how does it get there? See ExecutorRunnable's prepareCommand method:

val commands = prefixEnv ++
     Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
     javaOpts ++
     Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
       "--driver-url", masterAddress,
       "--executor-id", executorId,
       "--hostname", hostname,
       "--cores", executorCores.toString,
       "--app-id", appId,
       "--resourceProfileId", resourceProfileId.toString) ++

And the value of this hostname is ultimately passed in from YarnAllocator's runAllocatedContainers method:

val executorHostname = container.getNodeId.getHost

That is, we ultimately get hold of the YARN node, i.e. the NodeManager's host. Each launched executor therefore registers its ExecutorShuffleInfo with the YarnShuffleService on its own NodeManager. With dynamic resource allocation enabled,
the fetchBlocks flow of ExternalBlockStoreClient then differs little from that of NettyBlockTransferService without dynamic allocation.

DynamicResourceAllocation in Spark on K8s (Kubernetes)

As covered in an earlier article, the entrypoint script passes the hostname parameter when launching the executor:

executor)
    shift 1
    CMD=(
      ${JAVA_HOME}/bin/java
      "${SPARK_EXECUTOR_JAVA_OPTS[@]}"
      -Xms$SPARK_EXECUTOR_MEMORY
      -Xmx$SPARK_EXECUTOR_MEMORY
      -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"
      org.apache.spark.executor.CoarseGrainedExecutorBackend
      --driver-url $SPARK_DRIVER_URL
      --executor-id $SPARK_EXECUTOR_ID
      --cores $SPARK_EXECUTOR_CORES
      --app-id $SPARK_APPLICATION_ID
      --hostname $SPARK_EXECUTOR_POD_IP
    )

And SPARK_EXECUTOR_POD_IP is the IP of the running pod; see this fragment of the BasicExecutorFeatureStep class:

Seq(new EnvVarBuilder()
          .withName(ENV_EXECUTOR_POD_IP)
          .withValueFrom(new EnvVarSourceBuilder()
            .withNewFieldRef("v1", "status.podIP")
            .build())
          .build())
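
In pod-spec terms, this builder emits a standard Kubernetes downward-API entry; the generated YAML looks roughly like the following sketch (ENV_EXECUTOR_POD_IP resolves to the SPARK_EXECUTOR_POD_IP variable used in the entrypoint above):

env:
  - name: SPARK_EXECUTOR_POD_IP
    valueFrom:
      fieldRef:
        apiVersion: v1
        fieldPath: status.podIP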

Following the same flow analyzed above,
the executor cannot register with an ExternalShuffleService on the K8s node either, because the host it registers is the pod IP, not the node IP.
Of course, the Spark community has long since proposed dynamic resource allocation without an external shuffle service, and it has been merged to the master branch. The configuration looks like this:

spark.dynamicAllocation.enabled                  true
spark.dynamicAllocation.shuffleTracking.enabled  true
spark.dynamicAllocation.minExecutors             1
spark.dynamicAllocation.maxExecutors             4
spark.dynamicAllocation.executorIdleTimeout      60s
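
For concreteness, a hypothetical spark-submit for Spark on K8s with these settings (the API server address, container image, and application jar path are placeholders):

spark-submit \
  --master k8s://https://<api-server>:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=<your-spark-image> \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=1 \
  --conf spark.dynamicAllocation.maxExecutors=4 \
  --conf spark.dynamicAllocation.executorIdleTimeout=60s \
  local:///path/to/app.jar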

Thanks for reading! That wraps up how to enable DRA in Spark. Hopefully this walkthrough has given you a deeper feel for the question; the specifics are still best verified through hands-on practice.
