溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

大數(shù)據(jù)計算框架Spark的任務(wù)調(diào)度是怎么實現(xiàn)的

發(fā)布時間:2021-12-17 11:10:34 來源:億速云 閱讀:122 作者:柒染 欄目:大數(shù)據(jù)

大數(shù)據(jù)計算框架Spark的任務(wù)調(diào)度是怎么實現(xiàn)的,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。

Spark有幾種資源調(diào)度設(shè)施。每個Spark Application(SparkContext實例)獨立地運行在一組executor進(jìn)程內(nèi)。cluster  manager為應(yīng)用間的調(diào)度提供設(shè)施。在每個Spark應(yīng)用內(nèi),如果將多個job(多個spark action)提交給不同的線程,那么他們會并行運行。

1 Application間的資源調(diào)度

集群上,每個Spark application獲得獨立的一組executor JVM,這組executor  JVM只為那個application運行task和存儲數(shù)據(jù)。如果多個用戶要共享集群,有不同的策略管理資源分配,這取決于使用的cluster  manager。

資源的靜態(tài)分區(qū)(static partitioning)可被所有的cluster  manager獲得,這樣每個application在他的生命周期內(nèi)都可獲得他能使用的最多資源。standalone、YARN、coarse-grained  Mesos mode這三種模式使用的就是這種方式。

1.1控制資源使用

集群類型下,如下配置資源分配:

  1. Standalone mode:application提交到standalone  mode集群,將會以FIFO的順序運行,每個application會盡可能地使用所有可用節(jié)點,配置spark.cores.max來限制application使用節(jié)點的數(shù)目,或者設(shè)置spark.deploy.defaultCores。除了可以設(shè)置application可用內(nèi)核數(shù),還可以設(shè)置spark.executor.memory來控制內(nèi)存的使用。

  2. Mesos:為了使用靜態(tài)分區(qū)(static  partitioning)在Mesos集群上,spark.mesos.coarse=true,可以通過設(shè)置spark.cores.max來限制每個application的資源共享,通過設(shè)置spark.executor.memory來控制executor內(nèi)存的使用。

  3. YARN:通過設(shè)置--num-executors選項,spark  YARN客戶端可控制集群上有多少executor被分配(對應(yīng)的配置屬性為spark.executor.instances),--executor-memory(對應(yīng)的配置屬性spark.executor.memory)和--executor-cores(對應(yīng)的配置屬性spark.executor.cores)控制了分配給每個executor的資源。

應(yīng)用之間無法共享內(nèi)存。

1.2動態(tài)資源分配

Spark提供了依據(jù)應(yīng)用的工作量動態(tài)調(diào)整資源的機(jī)制。這意味著你的application不在使用的資源會返還給集群,當(dāng)需要的時候再申請分配資源,這種特性對于多應(yīng)用共享集群特別有用。

這個特性默認(rèn)失效,但在所有coarse-grained cluster manager上都可用,如:standalone mode, YARN mode,  和Mesos coarse-grained mode。

使用這個特性有兩個要求。首先用于必須設(shè)置spark.dynamicAllocation.enabled=true,其次要設(shè)置external  shuffle service在集群上的每個worker  node并設(shè)置spark.shuffle.service.enabled=true。設(shè)置external shuffle  service目的是executor可被移除但是不刪除他們生成的shuffle文件。

設(shè)置這個變量的方式為:

  • 在standalone模式:設(shè)置spark.shuffle.service.enabled=true

  • Mesos  coarse-grained模式:在所有從節(jié)點運行$SPARK_HOME/sbin/start-mesos-shuffle-service.sh設(shè)置spark.shuffle.service.enabled=true

  • YARN:詳見運行spark與YARN

1.3資源分配策略

當(dāng)Spark不再使用executor時就出讓它,需要的時候再獲取它。因為沒有一個確定的方式預(yù)測將要被移除的executor是否在不久的將來會被使用,或者一個將要被添加的新executor實際上是否是空閑的,所以我們需要一系列試探來確定是移除executor(可能會移除多個)還是請求executor(可能會請求多個)。

請求策略

開啟Spark application動態(tài)分配資源特性,當(dāng)pending task等待被調(diào)度時,Spark  application會請求額外的executor。這就意味著,當(dāng)前的這些executor無法同時滿足所有的task,這些task已經(jīng)被提交,但是還沒有執(zhí)行完。

Spark輪流請求executor。當(dāng)task等待的時間大于spark.dynamicAllocation.schedulerBacklogTimeout時,真正的請求(申請executor的請求)被觸發(fā),之后,如果未完成task隊列存在,那么每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout秒請求被觸發(fā)一次。每一輪請求的executor數(shù)量以指數(shù)級增長。例如,***輪請求一個executor,第二輪請求2個,第三,四輪分別請求4,8個。

按指數(shù)形式增長的動機(jī)有兩個,首先,起初應(yīng)用應(yīng)該慎重地請求executor,以防只需幾個executor就能滿足需求,這和TCP慢啟動類似。其次,當(dāng)應(yīng)用確實需要更多的executor時,應(yīng)用應(yīng)該能夠及時地增加資源的使用。

移除策略

當(dāng)executor閑置超過spark.dynamicAllocation.executorIdleTimeout秒時,就將他移除。注意,大多數(shù)情況下,executor的移除條件和請求條件是互斥的,這樣如果仍然有待調(diào)度的task的情況下executor是不會被移除的。

executor優(yōu)雅地退役

非動態(tài)分配資源情況下,一個Spark  executor或者是由于失敗而退出,或者是因相關(guān)application退出而退出。這兩種情況下,不在需要與executor相關(guān)聯(lián)的狀態(tài)并且這些狀態(tài)可以被安全地丟棄。動態(tài)分配資源的情況下,當(dāng)executor被明確移除時,application仍然在運行。如果application要想使用這些由executor存儲和寫下的狀態(tài),就必須重新計算狀態(tài)。這樣就需要一種優(yōu)雅的退役機(jī)制,即在executor退役前保留他的狀態(tài)。

這個機(jī)制對于shuffles特別重要。shuffle期間,executor自己的map輸出寫入本地磁盤。當(dāng)其他的executor要獲取這些文件的時候,這個executor充當(dāng)了文件服務(wù)器的角色。對于那些落后的executor,他們的task執(zhí)行時間比同輩要長,在shuffle完成之前,動態(tài)資源分配可能移除了一個executor,這種情形下,那個executor寫入本地的文件(即executor的狀態(tài))不必重新計算。

保留shuffle文件的辦法就是使用外部的shuffle服務(wù),這是在Spark  1.2中引入的。這個外部的shuffle服務(wù)指的是長時間運行的進(jìn)程,它運行與集群的每個節(jié)點上,獨立于application和executor。如果這個服務(wù)可用,executor就從這個服務(wù)獲shuffle  file,而不是彼此之間獲取shuffle  file。這意味著executor生成的任何shuffle文件都可能被服務(wù)包含,即使在executor生命周期之外也是如此。

executor除了寫shuffle  文件到本地硬盤,還緩存數(shù)據(jù)到硬盤或內(nèi)存中。但是,當(dāng)executor被移除后,緩存到內(nèi)存中的數(shù)據(jù)將不可用。為了解決這一問題,默認(rèn)地緩存數(shù)據(jù)到內(nèi)存的executor永遠(yuǎn)不會被刪除??梢酝ㄟ^spark.dynamicAllocation.cachedExecutorIdleTimeout配置這一行為,

2 Application內(nèi)的資源調(diào)度

概述

給定的application內(nèi)部(SparkContext  實例),如果多個并行的job被提交到不同的線程上,那么這些job可以同時執(zhí)行。這里的job指的是Spark action及Spark  action觸發(fā)的計算task。Spark scheduler是線程安全的,支持spark application服務(wù)于多個請求。

默認(rèn)地Spark  scheduler以FIFO的順序執(zhí)行job,每個job被切分為一到多個stage(例如,map和reduce),當(dāng)***個job的stage的task啟動后,這個job優(yōu)先獲得所有可用資源,然后才是第二,三個job......。如果隊頭的job不必使用整個集群,之后的job就能立即啟動。如果隊頭的job較大,那么之后的job啟動延遲會比較明顯。

從Spark  0.8開始,也可以通過配置實現(xiàn)隊列間的公平調(diào)度。Job間的task資源分配采用單循環(huán)的方式。所有job都會獲得大致相同的集群資源。這就意味著,當(dāng)有長job存在時,提交的短job可以立即獲得資源啟動運行而不必等到長job執(zhí)行完畢??梢栽O(shè)置spark.scheduler.mode為FAIR

val conf = new SparkConf().setMaster(...).setAppName(...)  conf.set("spark.scheduler.mode", "FAIR")  val sc = new SparkContext(conf)

公平調(diào)度池(可能多個)

公平調(diào)度器也支持在池中對job分組并給每個池配置不同的選項。這有助于為更重要的job設(shè)置高優(yōu)先級池,例如把每個用戶的job分到一組,并且給這些用戶相等的資源不論有多少并行task,而不是給每個job相等的資源。

不需要任何干預(yù),新job會進(jìn)入默認(rèn)池,但是可以使用spark.scheduler.pool設(shè)置job池。

sc.setLocalProperty("spark.scheduler.pool", "pool1")

設(shè)置完后,這個線程(通過調(diào)用RDD.save, count,  collect)提交的所有job都會使用這個資源池的名稱。設(shè)置是針對每一個線程的,這樣更容易實現(xiàn)一個線程運行一個用戶的多個job。如果想清除與一個線程相關(guān)的池,調(diào)用:sc.setLocalProperty("spark.scheduler.pool",  null)

池默認(rèn)行為

默認(rèn)地每個池都能獲得相等的資源(在默認(rèn)池中每個job都能獲得相等的資源),但在每個池內(nèi)部,job以FIFO  的順序運行。例如如果為每一個用戶創(chuàng)建一個池,這就意味著每一個用戶將獲得相等的資源,并且每個用戶的查詢都會按順序運行而不會出現(xiàn)后來的查詢搶占了前面查詢的資源

配置池屬性

可以通過修改配置文件改變池屬性。每個池都支持三種屬性:

  • schedulingMode:可以是FIFO或FAIR,控制池中的job排隊等候或公平地分享集群資源。

  • weight:控制資源分配的比例。默認(rèn)所有池分配資源比重都是1。如果指定一個池的比重為2,那么他獲得的資源是其他池的2倍。如果將一個池的比重設(shè)的很高,比如1000,那么不論他是否有活躍的job,他總是***個開始執(zhí)行task。

  • minShare:除了設(shè)置總體的占比之外,還可以對每個池設(shè)定一個最小資源分配(例如CPU核數(shù))。在根據(jù)比重重新分配資源之前,公平調(diào)度器總是試圖滿足所有活躍池的最小資源需求。minShare屬性能以另一種方式確保一個池快速地獲得一定數(shù)量的資源(10個核)而不必給他更高的優(yōu)先級。默認(rèn)地minShare=0。

調(diào)用SparkConf.set,可以通過XML文件配置池屬性:

conf.set("spark.scheduler.allocation.file", "/path/to/file")

每個池一個,在XML文件中沒有配置的池使用默認(rèn)配置(調(diào)度模式 FIFO, weight 1, minShare 0),例如:

<?xml version="1.0"?><allocations>  <pool name="production">  <schedulingMode>FAIR</schedulingMode>  <weight>1</weight>  <minShare>2</minShare>  </pool>  <pool name="test">  <schedulingMode>FIFO</schedulingMode>  <weight>2</weight>  <minShare>3</minShare>  </pool></allocations>

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI