溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark中兩個類似的api是什么

發(fā)布時間:2022-01-14 17:00:08 來源:億速云 閱讀:104 作者:iii 欄目:云計算

這篇“Spark中兩個類似的api是什么”文章的知識點大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Spark中兩個類似的api是什么”文章吧。

Spark 中有兩個類似的api,分別是 reduceByKey  和 groupByKey  。這兩個的功能類似,但底層實現(xiàn)卻有些不同,那么為什么要這樣設計呢?我們來從源碼的角度分析一下。

先看兩者的調(diào)用順序(都是使用默認的Partitioner,即defaultPartitioner)

所用 spark 版本:spark 2.1.0

#### 先看reduceByKey
Step1
```
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }
```
Setp2
```
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }
```
Setp3
```
def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }
```

姑且不去看方法里面的細節(jié),我們會只要知道最后調(diào)用的是 combineByKeyWithClassTag 這個方法。這個方法有兩個參數(shù)我們來重點看一下,
```
def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)
```
首先是 **partitioner** 參數(shù) ,這個即是 RDD 的分區(qū)設置。除了默認的 defaultPartitioner,Spark 還提供了 RangePartitioner 和 HashPartitioner 外,此外用戶也可以自定義 partitioner 。通過源碼可以發(fā)現(xiàn)如果是 HashPartitioner 的話,那么是會拋出一個錯誤的。

然后是 **mapSideCombine** 參數(shù) ,這個參數(shù)正是 reduceByKey 和 groupByKey 最大不同的地方,它決定是是否會先在節(jié)點上進行一次 Combine 操作,下面會有更具體的例子來介紹。

#### 然后是groupByKey
Step1
```
  def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(defaultPartitioner(self))
  }
```
Step2
```
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }
```
Setp3
```
def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }
```

結(jié)合上面 reduceByKey 的調(diào)用鏈,可以發(fā)現(xiàn)最終其實都是調(diào)用 combineByKeyWithClassTag 這個方法的,但調(diào)用的參數(shù)不同。
reduceByKey的調(diào)用
```
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
```
groupByKey的調(diào)用
```
combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
```
正是兩者不同的調(diào)用方式導致了兩個方法的差別,我們分別來看
- reduceByKey的泛型參數(shù)直接是[V],而groupByKey的泛型參數(shù)是[CompactBuffer[V]]。這直接導致了 reduceByKey 和 groupByKey 的返回值不同,前者是RDD[(K, V)],而后者是RDD[(K, Iterable[V])]

- 然后就是mapSideCombine = false 了,這個mapSideCombine 參數(shù)的默認是true的。這個值有什么用呢,上面也說了,這個參數(shù)的作用是控制要不要在map端進行初步合并(Combine)??梢钥纯聪旅婢唧w的例子。


<img src="https://cache.yisu.com/upload/information/20210523/355/698556.png" width="65%" />

<img src="https://cache.yisu.com/upload/information/20210523/355/698557.png" width="65%" />

從功能上來說,可以發(fā)現(xiàn) ReduceByKey 其實就是會在每個節(jié)點先進行一次**合并**的操作,而 groupByKey 沒有。

這么來看 ReduceByKey 的性能會比 groupByKey 好很多,因為有些工作在節(jié)點已經(jīng)處理了。

以上就是關于“Spark中兩個類似的api是什么”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對大家有幫助,若想了解更多相關的知識內(nèi)容,請關注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI