溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

spark的動(dòng)態(tài)分區(qū)裁剪怎么實(shí)現(xiàn)

發(fā)布時(shí)間:2021-12-09 16:46:39 來源:億速云 閱讀:294 作者:iii 欄目:大數(shù)據(jù)

本篇內(nèi)容主要講解“spark的動(dòng)態(tài)分區(qū)裁剪怎么實(shí)現(xiàn)”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“spark的動(dòng)態(tài)分區(qū)裁剪怎么實(shí)現(xiàn)”吧!

背景

本文基于delta 0.7.0 spark 3.0.1 spark 3.x引入了動(dòng)態(tài)分區(qū)裁剪

分析

直接定位到PartitionPruning.applyPartitionPruning是邏輯計(jì)劃的規(guī)則

override def apply(plan: LogicalPlan): LogicalPlan = plan match {
    // Do not rewrite subqueries.
    case s: Subquery if s.correlated => plan
    case _ if !SQLConf.get.dynamicPartitionPruningEnabled => plan
    case _ => prune(plan)
  }
  • 當(dāng)是該邏輯計(jì)劃是子查詢且該子查詢是相關(guān)的,則直接跳過,因?yàn)橄嚓P(guān)的子查詢將會(huì)被重寫到j(luò)oin條件中

  • 如果沒有開啟動(dòng)態(tài)分區(qū),則直接跳過

  • 其他條件則會(huì)跳到下一步 下一步的條件,則是會(huì)判斷是否是包含join操作,如果是join操作才會(huì)進(jìn)行后續(xù)的操作:

private def prune(plan: LogicalPlan): LogicalPlan = {
    plan transformUp {
      // skip this rule if there's already a DPP subquery on the LHS of a join
      case j @ Join(Filter(_: DynamicPruningSubquery, _), _, _, _, _) => j
      case j @ Join(_, Filter(_: DynamicPruningSubquery, _), _, _, _) => j
      case j @ Join(left, right, joinType, Some(condition), hint) =>

具體分析一下每一步: 1.

var newLeft = left
        var newRight = right

        // extract the left and right keys of the join condition
        val (leftKeys, rightKeys) = j match {
          case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, _, _) => (lkeys, rkeys)
          case _ => (Nil, Nil)
        }
        //ExtractEquiJoinKeys的unapply方法
        def unapply(join: Join): Option[ReturnType] = join match {
    case Join(left, right, joinType, condition, hint) =>
      logDebug(s"Considering join on: $condition")
      // Find equi-join predicates that can be evaluated before the join, and thus can be used
      // as join keys.
      val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil)
      val joinKeys = predicates.flatMap {
        case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => None
        case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Some((l, r))
        case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Some((r, l))
        // Replace null with default value for joining key, then those rows with null in it could
        // be joined together
        case EqualNullSafe(l, r) if canEvaluate(l, left) && canEvaluate(r, right) =>
          Seq((Coalesce(Seq(l, Literal.default(l.dataType))),
            Coalesce(Seq(r, Literal.default(r.dataType)))),
            (IsNull(l), IsNull(r))
          )
        case EqualNullSafe(l, r) if canEvaluate(l, right) && canEvaluate(r, left) =>
          Seq((Coalesce(Seq(r, Literal.default(r.dataType))),
            Coalesce(Seq(l, Literal.default(l.dataType)))),
            (IsNull(r), IsNull(l))
          )
        case other => None
      }

ExtractEquiJoinKeys用來提取and條件分隔的多個(gè)條件,之后只有條件滿足相等的才能進(jìn)行下一步處理:

  • 如果相等但是左邊或者右邊的表達(dá)式的為空,則不匹配

  • 如果相等而且有對(duì)應(yīng)的邏輯計(jì)劃能夠產(chǎn)生對(duì)應(yīng)的屬性值,則匹配

  • 如果是EqualNullsafe,且有相應(yīng)的邏輯能夠產(chǎn)生相應(yīng)的屬性值,則會(huì)轉(zhuǎn)換為Coalesce和isnull的判斷

  • 之后轉(zhuǎn)化為leftKeys和rightKeys表達(dá)式 如join的條件是:tableA.a1 = tableB.b2 AND tableA.a2=tableB.b2 則經(jīng)過該過程得到的結(jié)果為leftKey為:Seq(tableA.a1,tableA.a2) rightKeys為:Seq(tableB.b1,tableB.b2)


 splitConjunctivePredicates(condition).foreach {
          case EqualTo(a: Expression, b: Expression)
              if fromDifferentSides(a, b) =>
            val (l, r) = if (a.references.subsetOf(left.outputSet) &&
              b.references.subsetOf(right.outputSet)) {
              a -> b
            } else {
              b -> a
            }

            // there should be a partitioned table and a filter on the dimension table,
            // otherwise the pruning will not trigger
            var partScan = getPartitionTableScan(l, left)
            if (partScan.isDefined && canPruneLeft(joinType) &&
                hasPartitionPruningFilter(right)) {
              val hasBenefit = pruningHasBenefit(l, partScan.get, r, right)
              newLeft = insertPredicate(l, newLeft, r, right, rightKeys, hasBenefit)
            } else {
              partScan = getPartitionTableScan(r, right)
              if (partScan.isDefined && canPruneRight(joinType) &&
                  hasPartitionPruningFilter(left) ) {
                val hasBenefit = pruningHasBenefit(r, partScan.get, l, left)
                newRight = insertPredicate(r, newRight, l, left, leftKeys, hasBenefit)
              }
            }
          case _ =>
        }

對(duì)每一個(gè)Equals對(duì),先對(duì)左邊表達(dá)式進(jìn)行g(shù)etPartitionTableScan 操作,該方法的作用是:

  • 找到該表達(dá)式的最終邏輯計(jì)劃,并且返回

  • 只有該邏輯計(jì)劃是HadoopFsRelation類型且存在partition列的時(shí)候,才返回該邏輯計(jì)劃

如果join左邊邏輯計(jì)劃滿足getPartitionTableScan,且join的類型是innerjoin/leftSemi/RightOuter,且該join右邊邏輯計(jì)劃不是一個(gè)流且存在比如> <這種的filter, 才會(huì)在左邊邏輯計(jì)劃插入一個(gè)DynamicPruningSubquery的父節(jié)點(diǎn),但是插入該節(jié)點(diǎn)還有兩個(gè)條件是pruningHasBenefit或者SQLConf.get.exchangeReuseEnabled 滿足,默認(rèn)SQLConf.get.exchangeReuseEnabled是ture 對(duì)于右邊的邏輯計(jì)劃也是類似的處理方式。只不過join的類型要求為inner/LeftOuter
pruningHasBenefit方法的計(jì)算邏輯為: 如果filterRatio*getPartitionTableScan.stats.sizeInByte>該邏輯計(jì)劃涉及的所有葉子節(jié)點(diǎn).stats.sizeInByte 則可以添加DynamicPruningSubquery

  1. 返回整個(gè)新的join操作

 Join(newLeft, newRight, joinType, Some(condition), hint

到此,相信大家對(duì)“spark的動(dòng)態(tài)分區(qū)裁剪怎么實(shí)現(xiàn)”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI