溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Spark 3.0 AQE及CBO的示例分析

發(fā)布時(shí)間:2021-12-17 09:40:09 來(lái)源:億速云 閱讀:363 作者:柒染 欄目:大數(shù)據(jù)

這篇文章給大家介紹Spark 3.0 AQE及CBO的示例分析,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。

Spark3.0已經(jīng)發(fā)布半年之久,這次大版本的升級(jí)主要是集中在性能優(yōu)化和文檔豐富上,其中46%的優(yōu)化都集中在Spark SQL上,SQL優(yōu)化里最引人注意的非Adaptive Query Execution莫屬了。

Spark 3.0 AQE及CBO的示例分析

Adaptive Query Execution(AQE)是英特爾大數(shù)據(jù)技術(shù)團(tuán)隊(duì)和百度大數(shù)據(jù)基礎(chǔ)架構(gòu)部工程師在Spark 社區(qū)版本的基礎(chǔ)上,改進(jìn)并實(shí)現(xiàn)的自適應(yīng)執(zhí)行引擎。近些年來(lái),Spark SQL 一直在針對(duì)CBO 特性進(jìn)行優(yōu)化,而且做得十分成功。

CBO基本原理

首先,我們先來(lái)介紹另一個(gè)基于規(guī)則優(yōu)化(Rule-Based Optimization,簡(jiǎn)稱(chēng)RBO)的優(yōu)化器,這是一種經(jīng)驗(yàn)式、啟發(fā)式的優(yōu)化思路,優(yōu)化規(guī)則都已經(jīng)預(yù)先定義好,只需要將SQL往這些規(guī)則上套就可以。簡(jiǎn)單的說(shuō),RBO就像是一個(gè)經(jīng)驗(yàn)豐富的老司機(jī),基本套路全都知道。

然而世界上有一種東西叫做 – 不按套路來(lái)。與其說(shuō)它不按套路來(lái),倒不如說(shuō)它本身并沒(méi)有什么套路。最典型的莫過(guò)于復(fù)雜Join算子優(yōu)化,對(duì)于這些Join來(lái)說(shuō),通常有兩個(gè)選擇題要做:

  1. Join應(yīng)該選擇哪種算法策略來(lái)執(zhí)行?BroadcastJoin or ShuffleHashJoin or SortMergeJoin?不同的執(zhí)行策略對(duì)系統(tǒng)的資源要求不同,執(zhí)行效率也有天壤之別,同一個(gè)SQL,選擇到合適的策略執(zhí)行可能只需要幾秒鐘,而如果沒(méi)有選擇到合適的執(zhí)行策略就可能會(huì)導(dǎo)致系統(tǒng)OOM。

  2. 對(duì)于雪花模型或者星型模型來(lái)講,多表Join應(yīng)該選擇什么樣的順序執(zhí)行?不同的Join順序意味著不同的執(zhí)行效率,比如A join B join C,A、B表都很大,C表很小,那A join B很顯然需要大量的系統(tǒng)資源來(lái)運(yùn)算,執(zhí)行時(shí)間必然不會(huì)短。而如果使用A join C join B的執(zhí)行順序,因?yàn)镃表很小,所以A join C會(huì)很快得到結(jié)果,而且結(jié)果集會(huì)很小,再使用小的結(jié)果集 join B,性能顯而易見(jiàn)會(huì)好于前一種方案。

大家想想,這有什么固定的優(yōu)化規(guī)則么?并沒(méi)有。說(shuō)白了,你需要知道更多關(guān)于表的基礎(chǔ)信息(表大小、表記錄總條數(shù)等),再通過(guò)一定規(guī)則代價(jià)評(píng)估才能從中選擇一條最優(yōu)的執(zhí)行計(jì)劃。所以,CBO 意為基于代價(jià)優(yōu)化策略,它需要計(jì)算所有可能執(zhí)行計(jì)劃的代價(jià),并挑選出代價(jià)最小的執(zhí)行計(jì)劃。

AQE對(duì)于整體的Spark SQL的執(zhí)行過(guò)程做了相應(yīng)的調(diào)整和優(yōu)化,它最大的亮點(diǎn)是可以根據(jù)已經(jīng)完成的計(jì)劃結(jié)點(diǎn)真實(shí)且精確的執(zhí)行統(tǒng)計(jì)結(jié)果來(lái)不停的反饋并重新優(yōu)化剩下的執(zhí)行計(jì)劃。

CBO這么難實(shí)現(xiàn),Spark怎么解決?

CBO 會(huì)計(jì)算一些和業(yè)務(wù)數(shù)據(jù)相關(guān)的統(tǒng)計(jì)數(shù)據(jù),來(lái)優(yōu)化查詢(xún),例如行數(shù)、去重后的行數(shù)、空值、最大最小值等。Spark會(huì)根據(jù)這些數(shù)據(jù),自動(dòng)選擇BHJ或者SMJ,對(duì)于多Join場(chǎng)景下的Cost-based Join Reorder,來(lái)達(dá)到優(yōu)化執(zhí)行計(jì)劃的目的。

但是,由于這些統(tǒng)計(jì)數(shù)據(jù)是需要預(yù)先處理的,會(huì)過(guò)時(shí),所以我們?cè)谟眠^(guò)時(shí)的數(shù)據(jù)進(jìn)行判斷,在某些情況下反而會(huì)變成負(fù)面效果,拉低了SQL執(zhí)行效率。

Spark3.0的AQE框架用了三招解決這個(gè)問(wèn)題:

  • 動(dòng)態(tài)合并shuffle分區(qū)(Dynamically coalescing shuffle partitions)

  • 動(dòng)態(tài)調(diào)整Join策略(Dynamically switching join strategies)

  • 動(dòng)態(tài)優(yōu)化數(shù)據(jù)傾斜Join(Dynamically optimizing skew joins)

下面我們來(lái)詳細(xì)介紹這三個(gè)特性。

動(dòng)態(tài)合并 shuffle 的分區(qū)

在我們處理的數(shù)據(jù)量級(jí)非常大時(shí),shuffle通常來(lái)說(shuō)是最影響性能的。因?yàn)閟huffle是一個(gè)非常耗時(shí)的算子,它需要通過(guò)網(wǎng)絡(luò)移動(dòng)數(shù)據(jù),分發(fā)給下游算子。 在shuffle中,partition的數(shù)量十分關(guān)鍵。partition的最佳數(shù)量取決于數(shù)據(jù),而數(shù)據(jù)大小在不同的query不同stage都會(huì)有很大的差異,所以很難去確定一個(gè)具體的數(shù)目:

  • 如果partition過(guò)少,每個(gè)partition數(shù)據(jù)量就會(huì)過(guò)多,可能就會(huì)導(dǎo)致大量數(shù)據(jù)要落到磁盤(pán)上,從而拖慢了查詢(xún)。

  • 如果partition過(guò)多,每個(gè)partition數(shù)據(jù)量就會(huì)很少,就會(huì)產(chǎn)生很多額外的網(wǎng)絡(luò)開(kāi)銷(xiāo),并且影響Spark task scheduler,從而拖慢查詢(xún)。

為了解決該問(wèn)題,我們?cè)谧铋_(kāi)始設(shè)置相對(duì)較大的shuffle partition個(gè)數(shù),通過(guò)執(zhí)行過(guò)程中shuffle文件的數(shù)據(jù)來(lái)合并相鄰的小partitions。 例如,假設(shè)我們執(zhí)行SELECT max(i) FROM tbl GROUP BY j,表tbl只有2個(gè)partition并且數(shù)據(jù)量非常小。我們將初始shuffle partition設(shè)為5,因此在分組后會(huì)出現(xiàn)5個(gè)partitions。若不進(jìn)行AQE優(yōu)化,會(huì)產(chǎn)生5個(gè)tasks來(lái)做聚合結(jié)果,事實(shí)上有3個(gè)partitions數(shù)據(jù)量是非常小的。

Spark 3.0 AQE及CBO的示例分析

然而在這種情況下,AQE只會(huì)生成3個(gè)reduce task。

Spark 3.0 AQE及CBO的示例分析

動(dòng)態(tài)切換join策略

Spark 支持許多 Join 策略,其中 broadcast hash join 通常是性能最好的,前提是參加 join 的一張表的數(shù)據(jù)能夠裝入內(nèi)存。由于這個(gè)原因,當(dāng) Spark 估計(jì)參加 join 的表數(shù)據(jù)量小于廣播大小的閾值時(shí),其會(huì)將 Join 策略調(diào)整為 broadcast hash join。但是,很多情況都可能導(dǎo)致這種大小估計(jì)出錯(cuò)——例如存在一個(gè)非常有選擇性的過(guò)濾器。

由于A(yíng)QE擁有精確的上游統(tǒng)計(jì)數(shù)據(jù),因此可以解決該問(wèn)題。比如下面這個(gè)例子,右表的實(shí)際大小為15M,而在該場(chǎng)景下,經(jīng)過(guò)filter過(guò)濾后,實(shí)際參與join的數(shù)據(jù)大小為8M,小于了默認(rèn)broadcast閾值10M,應(yīng)該被廣播。

Spark 3.0 AQE及CBO的示例分析

在我們執(zhí)行過(guò)程中轉(zhuǎn)化為BHJ的同時(shí),我們甚至可以將傳統(tǒng)shuffle優(yōu)化為本地shuffle(例如shuffle讀在mapper而不是基于reducer)來(lái)減小網(wǎng)絡(luò)開(kāi)銷(xiāo)。

動(dòng)態(tài)優(yōu)化數(shù)據(jù)傾斜

Join里如果出現(xiàn)某個(gè)key的數(shù)據(jù)傾斜問(wèn)題,那么基本上就是這個(gè)任務(wù)的性能殺手了。在A(yíng)QE之前,用戶(hù)沒(méi)法自動(dòng)處理Join中遇到的這個(gè)棘手問(wèn)題,需要借助外部手動(dòng)收集數(shù)據(jù)統(tǒng)計(jì)信息,并做額外的加鹽,分批處理數(shù)據(jù)等相對(duì)繁瑣的方法來(lái)應(yīng)對(duì)數(shù)據(jù)傾斜問(wèn)題。

數(shù)據(jù)傾斜本質(zhì)上是由于集群上數(shù)據(jù)在分區(qū)之間分布不均勻所導(dǎo)致的,它會(huì)拉慢join場(chǎng)景下整個(gè)查詢(xún)。AQE根據(jù)shuffle文件統(tǒng)計(jì)數(shù)據(jù)自動(dòng)檢測(cè)傾斜數(shù)據(jù),將那些傾斜的分區(qū)打散成小的子分區(qū),然后各自進(jìn)行join。

我們可以看下這個(gè)場(chǎng)景,Table A join Table B,其中Table A的partition A0數(shù)據(jù)遠(yuǎn)大于其他分區(qū)。

Spark 3.0 AQE及CBO的示例分析

AQE會(huì)將partition A0切分成2個(gè)子分區(qū),并且讓他們獨(dú)自和Table B的partition B0進(jìn)行join。

Spark 3.0 AQE及CBO的示例分析

如果不做這個(gè)優(yōu)化,SMJ將會(huì)產(chǎn)生4個(gè)tasks并且其中一個(gè)執(zhí)行時(shí)間遠(yuǎn)大于其他。經(jīng)優(yōu)化,這個(gè)join將會(huì)有5個(gè)tasks,但每個(gè)task執(zhí)行耗時(shí)差不多相同,因此個(gè)整個(gè)查詢(xún)帶來(lái)了更好的性能。

如何開(kāi)啟AQE

我們可以設(shè)置參數(shù)spark.sql.adaptive.enabled為true來(lái)開(kāi)啟AQE,在Spark 3.0中默認(rèn)是false,并滿(mǎn)足以下條件:

  • 非流式查詢(xún)

  • 包含至少一個(gè)exchange(如join、聚合、窗口算子)或者一個(gè)子查詢(xún)

AQE通過(guò)減少了對(duì)靜態(tài)統(tǒng)計(jì)數(shù)據(jù)的依賴(lài),成功解決了Spark CBO的一個(gè)難以處理的trade off(生成統(tǒng)計(jì)數(shù)據(jù)的開(kāi)銷(xiāo)和查詢(xún)耗時(shí))以及數(shù)據(jù)精度問(wèn)題。相比之前具有局限性的CBO,現(xiàn)在就顯得非常靈活。

Spark CBO源碼實(shí)現(xiàn)

Adaptive Execution 模式是在使用Spark物理執(zhí)行計(jì)劃注入生成的。在QueryExecution類(lèi)中有 preparations 一組優(yōu)化器來(lái)對(duì)物理執(zhí)行計(jì)劃進(jìn)行優(yōu)化, InsertAdaptiveSparkPlan 就是第一個(gè)優(yōu)化器。

InsertAdaptiveSparkPlan 使用 PlanAdaptiveSubqueries Rule對(duì)部分SubQuery處理后,將當(dāng)前 Plan 包裝成 AdaptiveSparkPlanExec 。

當(dāng)執(zhí)行 AdaptiveSparkPlanExec 的 collect() 或 take() 方法時(shí),全部會(huì)先執(zhí)行 getFinalPhysicalPlan() 方法生成新的SparkPlan,再執(zhí)行對(duì)應(yīng)的SparkPlan對(duì)應(yīng)的方法。

// QueryExecution類(lèi)
lazy val executedPlan: SparkPlan = {
    executePhase(QueryPlanningTracker.PLANNING) {
      QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
    }
  }

  protected def preparations: Seq[Rule[SparkPlan]] = {
    QueryExecution.preparations(sparkSession,
      Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))))
  }

  private[execution] def preparations(
      sparkSession: SparkSession,
      adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
    // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
    // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
    adaptiveExecutionRule.toSeq ++
    Seq(
      PlanDynamicPruningFilters(sparkSession),
      PlanSubqueries(sparkSession),
      EnsureRequirements(sparkSession.sessionState.conf),
      ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
        sparkSession.sessionState.columnarRules),
      CollapseCodegenStages(sparkSession.sessionState.conf),
      ReuseExchange(sparkSession.sessionState.conf),
      ReuseSubquery(sparkSession.sessionState.conf)
    )
  }


// InsertAdaptiveSparkPlan 
  override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false)

  private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {
   // ...some checking
    case _ if shouldApplyAQE(plan, isSubquery) =>
      if (supportAdaptive(plan)) {
        try {
          // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse.
          // Fall back to non-AQE mode if AQE is not supported in any of the sub-queries.
          val subqueryMap = buildSubqueryMap(plan)
          val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap)
          val preprocessingRules = Seq(
            planSubqueriesRule)
          // Run pre-processing rules.
          val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules)
          logDebug(s"Adaptive execution enabled for plan: $plan")
          AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery)
        } catch {
          case SubqueryAdaptiveNotSupportedException(subquery) =>
            logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
              s"but is not supported for sub-query: $subquery.")
            plan
        }
      } else {
        logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
          s"but is not supported for query: $plan.")
        plan
      }
    case _ => plan
  }

AQE對(duì)Stage 分階段提交執(zhí)行和優(yōu)化過(guò)程如下:

  private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
    // 第一次調(diào)用 getFinalPhysicalPlan方法時(shí)為false,等待該方法執(zhí)行完畢,全部Stage不會(huì)再改變,直接返回最終plan
    if (isFinalPlan) return currentPhysicalPlan

    // In case of this adaptive plan being executed out of `withActive` scoped functions, e.g.,
    // `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be
    // created in the middle of the execution.
    context.session.withActive {
      val executionId = getExecutionId
      var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
      var result = createQueryStages(currentPhysicalPlan)
      val events = new LinkedBlockingQueue[StageMaterializationEvent]()
      val errors = new mutable.ArrayBuffer[Throwable]()
      var stagesToReplace = Seq.empty[QueryStageExec]
      while (!result.allChildStagesMaterialized) {
        currentPhysicalPlan = result.newPlan
        // 接下來(lái)有哪些Stage要執(zhí)行,參考 createQueryStages(plan: SparkPlan) 方法
        if (result.newStages.nonEmpty) {
          stagesToReplace = result.newStages ++ stagesToReplace
          // onUpdatePlan 通過(guò)listener更新UI
          executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))

          // Start materialization of all new stages and fail fast if any stages failed eagerly
          result.newStages.foreach { stage =>
            try {
              // materialize() 方法對(duì)Stage的作為一個(gè)單獨(dú)的Job提交執(zhí)行,并返回 SimpleFutureAction 來(lái)接收?qǐng)?zhí)行結(jié)果
              // QueryStageExec: materialize() -> doMaterialize() ->
              // ShuffleExchangeExec: -> mapOutputStatisticsFuture -> ShuffleExchangeExec
              // SparkContext: -> submitMapStage(shuffleDependency)
              stage.materialize().onComplete { res =>
                if (res.isSuccess) {
                  events.offer(StageSuccess(stage, res.get))
                } else {
                  events.offer(StageFailure(stage, res.failed.get))
                }
              }(AdaptiveSparkPlanExec.executionContext)
            } catch {
              case e: Throwable =>
                cleanUpAndThrowException(Seq(e), Some(stage.id))
            }
          }
        }

        // Wait on the next completed stage, which indicates new stats are available and probably
        // new stages can be created. There might be other stages that finish at around the same
        // time, so we process those stages too in order to reduce re-planning.
        // 等待,直到有Stage執(zhí)行完畢
        val nextMsg = events.take()
        val rem = new util.ArrayList[StageMaterializationEvent]()
        events.drainTo(rem)
        (Seq(nextMsg) ++ rem.asScala).foreach {
          case StageSuccess(stage, res) =>
            stage.resultOption = Some(res)
          case StageFailure(stage, ex) =>
            errors.append(ex)
        }

        // In case of errors, we cancel all running stages and throw exception.
        if (errors.nonEmpty) {
          cleanUpAndThrowException(errors, None)
        }

        // Try re-optimizing and re-planning. Adopt the new plan if its cost is equal to or less
        // than that of the current plan; otherwise keep the current physical plan together with
        // the current logical plan since the physical plan's logical links point to the logical
        // plan it has originated from.
        // Meanwhile, we keep a list of the query stages that have been created since last plan
        // update, which stands for the "semantic gap" between the current logical and physical
        // plans. And each time before re-planning, we replace the corresponding nodes in the
        // current logical plan with logical query stages to make it semantically in sync with
        // the current physical plan. Once a new plan is adopted and both logical and physical
        // plans are updated, we can clear the query stage list because at this point the two plans
        // are semantically and physically in sync again.
        // 對(duì)前面的Stage替換為 LogicalQueryStage 節(jié)點(diǎn)
        val logicalPlan = replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace)
        // 再次調(diào)用optimizer 和planner 進(jìn)行優(yōu)化
        val (newPhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan)
        val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
        val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
        if (newCost < origCost ||
            (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
          logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan")
          cleanUpTempTags(newPhysicalPlan)
          currentPhysicalPlan = newPhysicalPlan
          currentLogicalPlan = newLogicalPlan
          stagesToReplace = Seq.empty[QueryStageExec]
        }
        // Now that some stages have finished, we can try creating new stages.
        // 進(jìn)入下一輪循環(huán),如果存在Stage執(zhí)行完畢, 對(duì)應(yīng)的resultOption 會(huì)有值,對(duì)應(yīng)的allChildStagesMaterialized 屬性 = true
        result = createQueryStages(currentPhysicalPlan)
      }

      // Run the final plan when there's no more unfinished stages.
      // 所有前置stage全部執(zhí)行完畢,根據(jù)stats信息優(yōu)化物理執(zhí)行計(jì)劃,確定最終的 physical plan
      currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules)
      isFinalPlan = true
      executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
      currentPhysicalPlan
    }
  }
// SparkContext
  /**
   * Submit a map stage for execution. This is currently an internal API only, but might be
   * promoted to DeveloperApi in the future.
   */
  private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C])
      : SimpleFutureAction[MapOutputStatistics] = {
    assertNotStopped()
    val callSite = getCallSite()
    var result: MapOutputStatistics = null
    val waiter = dagScheduler.submitMapStage(
      dependency,
      (r: MapOutputStatistics) => { result = r },
      callSite,
      localProperties.get)
    new SimpleFutureAction[MapOutputStatistics](waiter, result)
  }


// DAGScheduler
  def submitMapStage[K, V, C](
      dependency: ShuffleDependency[K, V, C],
      callback: MapOutputStatistics => Unit,
      callSite: CallSite,
      properties: Properties): JobWaiter[MapOutputStatistics] = {

    val rdd = dependency.rdd
    val jobId = nextJobId.getAndIncrement()
    if (rdd.partitions.length == 0) {
      throw new SparkException("Can't run submitMapStage on RDD with 0 partitions")
    }

    // We create a JobWaiter with only one "task", which will be marked as complete when the whole
    // map stage has completed, and will be passed the MapOutputStatistics for that stage.
    // This makes it easier to avoid race conditions between the user code and the map output
    // tracker that might result if we told the user the stage had finished, but then they queries
    // the map output tracker and some node failures had caused the output statistics to be lost.
    val waiter = new JobWaiter[MapOutputStatistics](
      this, jobId, 1,
      (_: Int, r: MapOutputStatistics) => callback(r))
    eventProcessLoop.post(MapStageSubmitted(
      jobId, dependency, callSite, waiter, Utils.cloneProperties(properties)))
    waiter
  }

當(dāng)前,AdaptiveSparkPlanExec 中對(duì)物理執(zhí)行的優(yōu)化器列表如下:

// AdaptiveSparkPlanExec
  @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
    ReuseAdaptiveSubquery(conf, context.subqueryCache),
    CoalesceShufflePartitions(context.session),
    // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
    // added by `CoalesceShufflePartitions`. So they must be executed after it.
    OptimizeSkewedJoin(conf),
    OptimizeLocalShuffleReader(conf),
    ApplyColumnarRulesAndInsertTransitions(conf, context.session.sessionState.columnarRules),
    CollapseCodegenStages(conf)
  )

其中 OptimizeSkewedJoin方法就是針對(duì)最容易出現(xiàn)數(shù)據(jù)傾斜的Join進(jìn)行的優(yōu)化:

AQE模式下,每個(gè)Stage執(zhí)行之前,前置依賴(lài)Stage已經(jīng)全部執(zhí)行完畢,那么就可以獲取到每個(gè)Stage的stats信息。 當(dāng)發(fā)現(xiàn)shuffle partition的輸出超過(guò)partition size的中位數(shù)的5倍,且partition的輸出大于 256M 會(huì)被判斷產(chǎn)生數(shù)據(jù)傾斜, 將partition 數(shù)據(jù)按照targetSize進(jìn)行切分為N份。 targetSize = max(64M, 非數(shù)據(jù)傾斜partition的平均大小)。

優(yōu)化前 shuffle 如下:

Spark 3.0 AQE及CBO的示例分析

優(yōu)化后 shuffle:

Spark 3.0 AQE及CBO的示例分析

Spark3.0AQE在FreeWheel的應(yīng)用與實(shí)踐

FreeWheel團(tuán)隊(duì)通過(guò)高效的敏捷開(kāi)發(fā)趕在 2020 年圣誕廣告季之前在生產(chǎn)環(huán)境順利發(fā)布上線(xiàn),整體性能提升高達(dá) 40%(對(duì)于大 batch)的數(shù)據(jù),AWS Cost 平均節(jié)省 25%~30%之間,大約每年至少能為公司節(jié)省百萬(wàn)成本。

主要升級(jí)改動(dòng)

打開(kāi) Spark 3.0 AQE 的新特性,主要配置如下:

  "spark.sql.adaptive.enabled": true,
  "spark.sql.adaptive.coalescePartitions.enabled": true,
  "spark.sql.adaptive.coalescePartitions.minPartitionNum": 1,
  "spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB"

需要注意的是,AQE 特性只是在 reducer 階段不用指定 reducer 的個(gè)數(shù),但并不代表你不再需要指定任務(wù)的并行度了。因?yàn)?map 階段仍然需要將數(shù)據(jù)劃分為合適的分區(qū)進(jìn)行處理,如果沒(méi)有指定并行度會(huì)使用默認(rèn)的 200,當(dāng)數(shù)據(jù)量過(guò)大時(shí),很容易出現(xiàn) OOM。建議還是按照任務(wù)之前的并行度設(shè)置來(lái)配置參數(shù)spark.sql.shuffle.partitions和spark.default.parallelism。

我們來(lái)仔細(xì)看一下為什么升級(jí)到 3.0 以后可以減少運(yùn)行時(shí)間,又能節(jié)省集群的成本。 以 Optimus 數(shù)據(jù)建模里的一張表的運(yùn)行情況為例:

  • 在 reduce 階段從沒(méi)有 AQE 的40320個(gè) tasks 銳減到4580個(gè) tasks,減少了一個(gè)數(shù)量級(jí)。

  • 下圖里下半部分是沒(méi)有 AQE 的 Spark 2.x 的 task 情況,上半部分是打開(kāi) AQE 特性后的 Spark 3.x 的情況。

Spark 3.0 AQE及CBO的示例分析

  • 從更詳細(xì)的運(yùn)行時(shí)間圖來(lái)看,shuffler reader后同樣的 aggregate 的操作等時(shí)間也從4.44h到2.56h,節(jié)省將近一半。

  • 左邊是 spark 2.x 的運(yùn)行指標(biāo)明細(xì),右邊是打開(kāi) AQE 后通過(guò)custom shuffler reader后的運(yùn)行指標(biāo)情況。

Spark 3.0 AQE及CBO的示例分析

性能提升

AQE性能

AQE對(duì)于整體的 Spark SQL 的執(zhí)行過(guò)程做了相應(yīng)的調(diào)整和優(yōu)化(如下圖),它最大的亮點(diǎn)是可以根據(jù)已經(jīng)完成的計(jì)劃結(jié)點(diǎn)真實(shí)且精確的執(zhí)行統(tǒng)計(jì)結(jié)果來(lái)不停的反饋并重新優(yōu)化剩下的執(zhí)行計(jì)劃。

Spark 3.0 AQE及CBO的示例分析

AQE 自動(dòng)調(diào)整 reducer 的數(shù)量,減小 partition 數(shù)量。Spark 任務(wù)的并行度一直是讓用戶(hù)比較困擾的地方。如果并行度太大的話(huà),會(huì)導(dǎo)致 task 過(guò)多,overhead 比較大,整體拉慢任務(wù)的運(yùn)行。而如果并行度太小的,數(shù)據(jù)分區(qū)會(huì)比較大,容易出現(xiàn) OOM 的問(wèn)題,并且資源也得不到合理的利用,并行運(yùn)行任務(wù)優(yōu)勢(shì)得不到最大的發(fā)揮。

而且由于 Spark Context 整個(gè)任務(wù)的并行度,需要一開(kāi)始設(shè)定好且沒(méi)法動(dòng)態(tài)修改,這就很容易出現(xiàn)任務(wù)剛開(kāi)始的時(shí)候數(shù)據(jù)量大需要大的并行度,而運(yùn)行的過(guò)程中通過(guò)轉(zhuǎn)化過(guò)濾可能最終的數(shù)據(jù)集已經(jīng)變得很小,最初設(shè)定的分區(qū)數(shù)就顯得過(guò)大了。AQE 能夠很好的解決這個(gè)問(wèn)題,在 reducer 去讀取數(shù)據(jù)時(shí),會(huì)根據(jù)用戶(hù)設(shè)定的分區(qū)數(shù)據(jù)的大小(spark.sql.adaptive.advisoryPartitionSizeInBytes)來(lái)自動(dòng)調(diào)整和合并(Coalesce)小的 partition,自適應(yīng)地減小 partition 的數(shù)量,以減少資源浪費(fèi)和 overhead,提升任務(wù)的性能。

由上面單張表可以看到,打開(kāi) AQE 的時(shí)候極大的降低了 task 的數(shù)量,除了減輕了 Driver 的負(fù)擔(dān),也減少啟動(dòng) task 帶來(lái)的 schedule,memory,啟動(dòng)管理等 overhead,減少 cpu 的占用,提升的 I/O 性能。

拿歷史 Data Pipelines 為例,同時(shí)會(huì)并行有三十多張表在 Spark 里運(yùn)行,每張表都有極大的性能提升,那么也使得其他的表能夠獲得資源更早更多,互相受益,那么最終整個(gè)的數(shù)據(jù)建模過(guò)程會(huì)自然而然有一個(gè)加速的結(jié)果。

大 batch(>200G)相對(duì)小 batch(< 100G )有比較大的提升,有高達(dá) 40%提升,主要是因?yàn)榇?batch 本身數(shù)據(jù)量大,需要機(jī)器數(shù)多,設(shè)置并發(fā)度也更大,那么 AQE 展現(xiàn)特性的時(shí)刻會(huì)更多更明顯。而小 batch 并發(fā)度相對(duì)較低,那么提升也就相對(duì)會(huì)少一些,不過(guò)也是有 27.5%左右的加速。

內(nèi)存優(yōu)化

除了因?yàn)?AQE 的打開(kāi),減少過(guò)碎的 task 對(duì)于 memory 的占用外,Spark 3.0 也在其他地方做了很多內(nèi)存方面的優(yōu)化,比如 Aggregate 部分指標(biāo)瘦身、Netty 的共享內(nèi)存 Pool 功能、Task Manager 死鎖問(wèn)題、避免某些場(chǎng)景下從網(wǎng)絡(luò)讀取 shuffle block等等,來(lái)減少內(nèi)存的壓力。一系列內(nèi)存的優(yōu)化加上 AQE 特性疊加從前文內(nèi)存實(shí)踐圖中可以看到集群的內(nèi)存使用同時(shí)有30%左右的下降。

實(shí)踐成果

升級(jí)主要的實(shí)踐成果如下:

性能提升明顯

  • 歷史數(shù)據(jù) Pipeline 對(duì)于大 batch 的數(shù)據(jù)(200~400G/每小時(shí))性能提升高達(dá)40%, 對(duì)于小 batch(小于 100G/每小時(shí))提升效果沒(méi)有大 batch 提升的那么明顯,每天所有 batches平均提升水平27.5%左右。

  • 預(yù)測(cè)數(shù)據(jù)性能平均提升30%。由于數(shù)據(jù)輸入源不一樣,目前是分別兩個(gè) pipelines 在跑歷史和預(yù)測(cè)數(shù)據(jù),產(chǎn)生的表的數(shù)目也不太一樣,因此做了分別的評(píng)估。

以歷史數(shù)據(jù)上線(xiàn)后的端到端到運(yùn)行時(shí)間為例(如下圖),肉眼可見(jiàn)上線(xiàn)后整體 pipeline 的運(yùn)行時(shí)間有了明顯的下降,能夠更快的輸出數(shù)據(jù)供下游使用。

Spark 3.0 AQE及CBO的示例分析

集群內(nèi)存使用降低

集群內(nèi)存使用對(duì)于大 batch 達(dá)降低30%左右,每天平均平均節(jié)省25%左右。

以歷史數(shù)據(jù)上線(xiàn)后的運(yùn)行時(shí)集群的 memory 在 ganglia 上的截圖為例(如下圖),整體集群的內(nèi)存使用從 41.2T 降到 30.1T,這意味著我們可以用更少的機(jī)器花更少的錢(qián)來(lái)跑同樣的 Spark 任務(wù)。

Spark 3.0 AQE及CBO的示例分析

AWS Cost 降低

Pipelines 做了自動(dòng)的 Scale In/Scale Out 策略: 在需要資源的時(shí)候擴(kuò)集群的 Task 結(jié)點(diǎn),在任務(wù)結(jié)束后自動(dòng)去縮集群的 Task 結(jié)點(diǎn),且會(huì)根據(jù)每次 batch 數(shù)據(jù)的大小通過(guò)算法學(xué)習(xí)得到最佳的機(jī)器數(shù)。通過(guò)升級(jí)到 Spark 3.0 后,由于現(xiàn)在任務(wù)跑的更快并且需要的機(jī)器更少,上線(xiàn)后統(tǒng)計(jì) AWS Cost 每天節(jié)省30%左右,大約一年能為公司節(jié)省百萬(wàn)成本。

關(guān)于Spark 3.0 AQE及CBO的示例分析就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI