溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么進行spark.streaming.concurrentJobs參數(shù)解密的分析

發(fā)布時間:2021-12-16 18:42:37 來源:億速云 閱讀:117 作者:柒染 欄目:云計算

這篇文章將為大家詳細講解有關(guān)怎么進行spark.streaming.concurrentJobs參數(shù)解密的分析,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

最近,在spark streaming 調(diào)優(yōu)時,發(fā)現(xiàn)個增加job并行度的參數(shù)spark.streaming.concurrentJobs,spark 默認值為1,當增加為2時(在spark-default中配置),如遇到處理速度慢 streaming application UI 中會有兩個Active Jobs(默認值時為1),也就是在同一時刻可以執(zhí)行兩個批次的streaming job,下文分析這個參數(shù)是如何影響streaming 的執(zhí)行的。 ##參數(shù)引入 在spark streaming 的JobScheduler line 47,讀取了該參數(shù):

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

使用concurrentJobs參數(shù)初始化jobExecutor線程池,也就是這個參數(shù)直接影響了job executor線程池中的線程數(shù)目。

job executor

job executor 線程池用來execute JobHandler線程;在jobSchedule中有個job容器jobSets:

private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]

用來保存不同的時間點生成的JobSet,而JobSet中包含多個Job; JobSet submit邏輯:

  def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }

不難看出jobExecutor的容量決定了池子中同時可以被處理的JobHandler線程數(shù),JobHandler是job的執(zhí)行線程,因此決定了可以被同時被提交的Job數(shù)目。

使用方法

可以通過集中方法為streaming job配置此參數(shù)。

  • spark-default中修改 全局性修改,所有的streaming job都會受到影響。

  • 提交streaming job是 --conf 參數(shù)添加(推薦) 在提交job時,可以使用--conf 參數(shù)為該job添加個性化的配置。例如: bin/spark-submit --master yarn --conf spark.streaming.concurrentJobs=5 設(shè)置該streaming job的job executor 線程池大小為5,在資源充足的情況下可以同時執(zhí)行5個batch job。

  • 代碼設(shè)置 在代碼中通過sparkConf設(shè)置: sparkConf.set("spark.streaming.concurrentJobs", "5"); 或者 System.setProperty("spark.streaming.concurrentJobs", "5");

scheduler mode的使用建議

在配置多個concurrentJob時,多個批次job被同時提交到集群中,也就需要更多的計算資源;當沒有更多的計算資源(Executor)被分配個該streaming job時,可將schedul 調(diào)整為FAIR(公平調(diào)度)來達到被提交的多個job可公平的共享計算資源。 當調(diào)整為公平調(diào)度時,job可以共享計算資源,而job的提交仍然是有時間順序的(雖然時間間隔很?。?,容易造成task在executor間分配的傾斜,拉長job的整體執(zhí)行時間。 當使用fifo調(diào)度方式,先到的job優(yōu)先獲得計算資源,當executor數(shù)目不足時,job會等待executor被釋放,task數(shù)目反而不易傾斜。 在實際使用時,如果executor數(shù)目足夠,建議使用FIFO模式,如在concurrentJob為默認配置時,executor分配數(shù)目為m,則當concurrentJobs配置為n時,executor建議分配為 n*m。

關(guān)于怎么進行spark.streaming.concurrentJobs參數(shù)解密的分析就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI