溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

7.spark core之?dāng)?shù)據(jù)分區(qū)

發(fā)布時(shí)間:2020-07-25 13:43:08 來源:網(wǎng)絡(luò) 閱讀:1427 作者:菲立思教育 欄目:大數(shù)據(jù)

簡(jiǎn)介

??spark一個(gè)最重要的特性就是對(duì)數(shù)據(jù)集在各個(gè)節(jié)點(diǎn)的分區(qū)進(jìn)行控制??刂茢?shù)據(jù)分布可以減少網(wǎng)絡(luò)開銷,極大地提升整體性能。

??只有Pair RDD才有分區(qū),非Pair RDD分區(qū)的值是None。如果RDD只被掃描一次,沒必要預(yù)先分區(qū)處理;如果RDD多次在諸如連接這種基于鍵的操作中使用時(shí),分區(qū)才有作用。

分區(qū)器

??分區(qū)器決定了RDD的分區(qū)個(gè)數(shù)及每條數(shù)據(jù)最終屬于哪個(gè)分區(qū)。

??spark提供了兩個(gè)分區(qū)器:HashPartitioner和RangePartitioner,它們都繼承于org.apache.spark.Partitioner類并實(shí)現(xiàn)三個(gè)方法。

  • numPartitions: Int: 指定分區(qū)數(shù)
  • getPartition(key: Any): Int: 分區(qū)編號(hào)(0~numPartitions-1)
  • equals(): 檢查分區(qū)器對(duì)象是否和其他分區(qū)器實(shí)例相同,判斷兩個(gè)RDD分區(qū)方式是否一樣。

HashPartitioner分區(qū)

??HashPartitioner分區(qū)執(zhí)行原理:對(duì)于給定的key,計(jì)算其hashCode,再除以分區(qū)數(shù)取余,最后的值就是這個(gè)key所屬的分區(qū)ID。實(shí)現(xiàn)如下:

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

RangePartitioner分區(qū)

??HashPartitioner分區(qū)可能導(dǎo)致每個(gè)分區(qū)中數(shù)據(jù)量的不均勻。而RangePartitioner分區(qū)則盡量保證每個(gè)分區(qū)中數(shù)據(jù)量的均勻,將一定范圍內(nèi)的數(shù)映射到某一個(gè)分區(qū)內(nèi)。分區(qū)與分區(qū)之間數(shù)據(jù)是有序的,但分區(qū)內(nèi)的元素是不能保證順序的。

??RangePartitioner分區(qū)執(zhí)行原理:

  • 計(jì)算總體的數(shù)據(jù)抽樣大小sampleSize,計(jì)算規(guī)則是:至少每個(gè)分區(qū)抽取20個(gè)數(shù)據(jù)或者最多1M的數(shù)據(jù)量。
  • 根據(jù)sampleSize和分區(qū)數(shù)量計(jì)算每個(gè)分區(qū)的數(shù)據(jù)抽樣樣本數(shù)量sampleSizePrePartition
  • 調(diào)用RangePartitioner的sketch函數(shù)進(jìn)行數(shù)據(jù)抽樣,計(jì)算出每個(gè)分區(qū)的樣本。
  • 計(jì)算樣本的整體占比以及數(shù)據(jù)量過多的數(shù)據(jù)分區(qū),防止數(shù)據(jù)傾斜。
  • 對(duì)于數(shù)據(jù)量比較多的RDD分區(qū)調(diào)用RDD的sample函數(shù)API重新進(jìn)行數(shù)據(jù)抽取。
  • 將最終的樣本數(shù)據(jù)通過RangePartitoner的determineBounds函數(shù)進(jìn)行數(shù)據(jù)排序分配,計(jì)算出rangeBounds。
class RangePartitioner[K: Ordering : ClassTag, V](
                partitions: Int,
                rdd: RDD[_ <: Product2[K, V]],
                private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  // 獲取RDD中K類型數(shù)據(jù)的排序器
  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      // 如果給定的分區(qū)數(shù)小于等于1的情況下,直接返回一個(gè)空的集合,表示數(shù)據(jù)不進(jìn)行分區(qū)
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      // 給定總的數(shù)據(jù)抽樣大小,最多1M的數(shù)據(jù)量(10^6),最少20倍的RDD分區(qū)數(shù)量,也就是每個(gè)RDD分區(qū)至少抽取20條數(shù)據(jù)
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      // RDD各分區(qū)中的數(shù)據(jù)量可能會(huì)出現(xiàn)傾斜的情況,乘于3的目的就是保證數(shù)據(jù)量小的分區(qū)能夠采樣到足夠的數(shù)據(jù),而對(duì)于數(shù)據(jù)量大的分區(qū)會(huì)進(jìn)行第二次采樣
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
      // 從rdd中抽取數(shù)據(jù),返回值:(總rdd數(shù)據(jù)量, Array[分區(qū)id,當(dāng)前分區(qū)的數(shù)據(jù)量,當(dāng)前分區(qū)抽取的數(shù)據(jù)])
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        // 如果總的數(shù)據(jù)量為0(RDD為空),那么直接返回一個(gè)空的數(shù)組
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        // 計(jì)算總樣本數(shù)量和總記錄數(shù)的占比,占比最大為1.0
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        // 保存樣本數(shù)據(jù)的集合buffer
        val candidates = ArrayBuffer.empty[(K, Float)]
        // 保存數(shù)據(jù)分布不均衡的分區(qū)id(數(shù)據(jù)量超過fraction比率的分區(qū))
        val imbalancedPartitions = mutable.Set.empty[Int]
        // 計(jì)算抽取出來的樣本數(shù)據(jù)
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            // 如果fraction乘以當(dāng)前分區(qū)中的數(shù)據(jù)量大于之前計(jì)算的每個(gè)分區(qū)的抽象數(shù)據(jù)大小,那么表示當(dāng)前分區(qū)抽取的數(shù)據(jù)太少了,該分區(qū)數(shù)據(jù)分布不均衡,需要重新抽取
            imbalancedPartitions += idx
          } else {
            // 當(dāng)前分區(qū)不屬于數(shù)據(jù)分布不均衡的分區(qū),計(jì)算占比權(quán)重,并添加到candidates集合中
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.size).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }

        // 對(duì)于數(shù)據(jù)分布不均衡的RDD分區(qū),重新進(jìn)行數(shù)據(jù)抽樣
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          // 獲取數(shù)據(jù)分布不均衡的RDD分區(qū),并構(gòu)成RDD
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          // 隨機(jī)種子
          val seed = byteswap32(-rdd.id - 1)
          // 利用rdd的sample抽樣函數(shù)API進(jìn)行數(shù)據(jù)抽樣
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }

        // 將最終的抽樣數(shù)據(jù)計(jì)算出rangeBounds出來
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }
  }

  // 下一個(gè)RDD的分區(qū)數(shù)量是rangeBounds數(shù)組中元素?cái)?shù)量+ 1個(gè)
  def numPartitions: Int = rangeBounds.length + 1

  // 二分查找器,內(nèi)部使用java中的Arrays類提供的二分查找方法
  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  // 根據(jù)RDD的key值返回對(duì)應(yīng)的分區(qū)id。從0開始
  def getPartition(key: Any): Int = {
    // 強(qiáng)制轉(zhuǎn)換key類型為RDD中原本的數(shù)據(jù)類型
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      // 如果分區(qū)數(shù)據(jù)小于等于128個(gè),那么直接本地循環(huán)尋找當(dāng)前k所屬的分區(qū)下標(biāo)
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      // 如果分區(qū)數(shù)量大于128個(gè),那么使用二分查找方法尋找對(duì)應(yīng)k所屬的下標(biāo);
      // 但是如果k在rangeBounds中沒有出現(xiàn),實(shí)質(zhì)上返回的是一個(gè)負(fù)數(shù)(范圍)或者是一個(gè)超過rangeBounds大小的數(shù)(最后一個(gè)分區(qū),比所有數(shù)據(jù)都大)
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition - 1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }

    // 根據(jù)數(shù)據(jù)排序是升序還是降序進(jìn)行數(shù)據(jù)的排列,默認(rèn)為升序
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

影響分區(qū)的算子操作

??影響分區(qū)的算子操作有:cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、partitionBy()、repartition()、coalesce()、sort()、mapValues()(如果父RDD有分區(qū)方式)、flatMapValues()(如果父RDD有分區(qū)方式)。

??對(duì)于執(zhí)行兩個(gè)RDD的算子操作,輸出數(shù)據(jù)的分區(qū)方式取決于父RDD的分區(qū)方式。默認(rèn)情況下,結(jié)果會(huì)采用哈希分區(qū),分區(qū)的數(shù)量和操作的并行度一樣。不過,如果其中一個(gè)父RDD設(shè)置過分區(qū)方式,結(jié)果就采用那種分區(qū)方式;如果兩個(gè)父RDD都設(shè)置過分區(qū)方式,結(jié)果RDD采用第一個(gè)父RDD的分區(qū)方式。

repartition和partitionBy的區(qū)別

??repartition 和 partitionBy 都是對(duì)數(shù)據(jù)進(jìn)行重新分區(qū),默認(rèn)都是使用 HashPartitioner。但是二者之間的區(qū)別有:

  • partitionBy只能用于Pair RDD
  • 都作用于Pair RDD時(shí),結(jié)果也不一樣
    7.spark core之?dāng)?shù)據(jù)分區(qū)

??其實(shí)partitionBy的結(jié)果才是我們所預(yù)期的。repartition 其實(shí)使用了一個(gè)隨機(jī)生成的數(shù)來當(dāng)作 key,而不是使用原來的key。

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
}

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions).values
    } else {
      new CoalescedRDD(this, numPartitions)
    }
}

repartition和coalesce的區(qū)別

??兩個(gè)算子都是對(duì)RDD的分區(qū)進(jìn)行重新劃分,repartition只是coalesce接口中shuffle為true的簡(jiǎn)易實(shí)現(xiàn),(假設(shè)RDD有N個(gè)分區(qū),需要重新劃分成M個(gè)分區(qū))

  • N<M。一般情況下N個(gè)分區(qū)有數(shù)據(jù)分布不均勻的狀況,利用HashPartitioner函數(shù)將數(shù)據(jù)重新分區(qū)為M個(gè),這時(shí)需要將shuffle設(shè)置為true。
  • 如果N>M并且N和M相差不多(假如N是1000,M是100),這時(shí)可以將shuffle設(shè)置為false,不進(jìn)行shuffle過程,父RDD和子RDD之間是窄依賴關(guān)系。在shuffle為false的情況下,如果N<M時(shí),coalesce是無效的。
  • 如果N>M并且兩者相差懸殊,這時(shí)如果將shuffle設(shè)置為false,父子RDD是窄依賴關(guān)系,同處在一個(gè)Stage中,就可能造成spark程序的并行度不夠,從而影響性能。如果在M為1的時(shí)候,為了使coalesce之前的操作有更好的并行度,可以將shuffle設(shè)置為true。

實(shí)例分析

需求

??統(tǒng)計(jì)用戶訪問其未訂閱主題頁(yè)面的情況。

  • 用戶信息表:由(UserID,UserInfo)組成的RDD,UserInfo包含該用戶所訂閱的主題列表。
  • 事件表:由(UserID,LinkInfo)組成的RDD,存放著每五分鐘內(nèi)網(wǎng)站各用戶訪問情況。

代碼實(shí)現(xiàn)

val sc = new SparkContext()
val userData = sc.sequenceFile[UserID,LinkInfo]("hdfs://...").persist
def processNewLogs(logFileName:String){
    val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
    //RDD of (UserID,(UserInfo,LinkInfo)) pairs
    val joined = usersData.join(events)
    val offTopicVisits = joined.filter {
        // Expand the tuple into its components
        case (userId, (userInfo, linkInfo)) => 
            !userInfo.topics.contains(linkInfo.topic)
    }.count()
    println("Number of visits to non-subscribed opics: " + offTopicVisits)
}

缺點(diǎn)

??連接操作會(huì)將兩個(gè)數(shù)據(jù)集中的所有鍵的哈希值都求出來,將哈希值相同的記錄通過網(wǎng)絡(luò)傳到同一臺(tái)機(jī)器上,然后再對(duì)所有鍵相同的記錄進(jìn)行連接操作。userData表數(shù)據(jù)量很大,所以這樣進(jìn)行哈希計(jì)算和跨節(jié)點(diǎn)數(shù)據(jù)混洗非常耗時(shí)。

改進(jìn)代碼實(shí)現(xiàn)

val userData = sc.sequenceFile[UserID,LinkInfo]("hdfs://...")
.partionBy(new HashPartiotioner(100))
.persist()

優(yōu)點(diǎn)

??userData表進(jìn)行了重新分區(qū),將鍵相同的數(shù)據(jù)都放在一個(gè)分區(qū)中。然后調(diào)用persist持久化結(jié)果數(shù)據(jù),不用每次都計(jì)算哈希和跨節(jié)點(diǎn)混洗。程序運(yùn)行速度顯著提升。


忠于技術(shù),熱愛分享。歡迎關(guān)注公眾號(hào):java大數(shù)據(jù)編程,了解更多技術(shù)內(nèi)容。

7.spark core之?dāng)?shù)據(jù)分區(qū)

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI