溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark2.x中如何用源碼剖析SortShuffleWriter具體實現(xiàn)

發(fā)布時間:2021-11-15 15:14:24 來源:億速云 閱讀:158 作者:柒染 欄目:大數(shù)據(jù)

這篇文章給大家介紹Spark2.x中如何用源碼剖析SortShuffleWriter具體實現(xiàn),內(nèi)容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

 一、概述

這里講解Spark Shuffle Write的第三種實現(xiàn)SortShuffleWriter,在ShuffleWrite階段,如果不滿足UnsafeShuffleWriter、BypassMergeSortShuffleWriter兩種條件,最后代碼執(zhí)行SortShuffleWriter,這里來看看他的具體實現(xiàn):

二、具體實現(xiàn)

    這里直接看Write()函數(shù),代碼如下:

 /** Write a bunch of records to this task's output */  override def write(records: Iterator[Product2[K, V]]): Unit = {    // 根據(jù)是否在map端進行數(shù)據(jù)合并初始化ExternalSorter    //ExternalSorter初始化對應參數(shù)的含義    // aggregator:在RDD shuffle時,map/reduce-side使用的aggregator    // partitioner:對shuffle的輸出,使用哪種partitioner對數(shù)據(jù)做分區(qū),比如hashPartitioner或者rangePartitioner    // ordering:根據(jù)哪個key做排序    // serializer:使用哪種序列化,如果沒有顯示指定,默認使用spark.serializer參數(shù)值    sorter = if (dep.mapSideCombine) {      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")      new ExternalSorter[K, V, C](        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)    } else {      // 如果沒有map-side聚合,那么創(chuàng)建sorter對象時候,aggregator和ordering將不傳入對應的值      new ExternalSorter[K, V, V](        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)    }    //通過insertAll方法先寫數(shù)據(jù)到buffer    sorter.insertAll(records)
   // 構造最終的輸出文件實例,其中文件名為(reduceId為0):    // "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId;    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)    //在輸出文件名后加上uuid用于標識文件正在寫入,結束后重命名    val tmp = Utils.tempFileWith(output)
   try {      val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
     //將排序后的record寫入輸出文件      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)      //生成index文件,也就是每個reduce通過該index文件得知它哪些是屬于它的數(shù)據(jù)      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)      //構造MapStatus返回結果,里面含有ShuffleWriter輸出結果的位置信息      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)    } finally {      if (tmp.exists() && !tmp.delete()) {        logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")      }    }  }      

    其中ExternalSorter是SortShuffleWriter一個排序類,這個類用于對一些(K, V)類型的key-value對進行排序,如果需要就進行merge,生的結果是一些(K, C)類型的key-combiner對。combiner就是對同樣key的value進行合并的結果。它首先使用一個Partitioner來把key分到不同的partition,然后,如果有必要的話,就把每個partition內(nèi)部的key按照一個特定的Comparator來進行排序。它可以輸出只一個分區(qū)了的文件,其中不同的partition位于這個文件的不同區(qū)域(在字節(jié)層面上每個分區(qū)是連續(xù)的),這樣就適用于shuffle時對數(shù)據(jù)的抓取。

2.這里接著看上面代碼第14行的 sorter.insertAll(records)函數(shù),里面其實干了很多事情,代碼如下:

 def insertAll(records: Iterator[Product2[K, V]]): Unit = {       //這里獲取Map是否聚合標識    val shouldCombine = aggregator.isDefined    //根據(jù)是否進行Map端聚合,來決定使用map還是buffer,    // 如果需要通過key做map-side聚合,則使用PartitionedAppendOnlyMap;    // 如果不需要,則使用PartitionedPairBuffer    if (shouldCombine) {      // 使用AppendOnlyMap優(yōu)先在內(nèi)存中進行combine      // 獲取aggregator的mergeValue函數(shù),用于merge新的值到聚合記錄      val mergeValue = aggregator.get.mergeValue       // 獲取aggregator的createCombiner函數(shù),用于創(chuàng)建聚合的初始值      val createCombiner = aggregator.get.createCombiner      var kv: Product2[K, V] = null      val update = (hadValue: Boolean, oldValue: C) => {      //創(chuàng)建update函數(shù),如果有值進行mergeValue,如果沒有則createCombiner        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)      }      while (records.hasNext) {        addElementsRead()        kv = records.next()        ////通過key計算partition ID,通過partition ID對數(shù)據(jù)進行排序        //這里的partitionID其實就是Reduce個數(shù)        // 對key計算分區(qū),然后開始進行merge        map.changeValue((getPartition(kv._1), kv._1), update)         // 如果需要溢寫內(nèi)存數(shù)據(jù)到磁盤        maybeSpillCollection(usingMap = true)      }    } else {      // Stick values into our buffer      while (records.hasNext) {        addElementsRead()        val kv = records.next()        //通過key計算partition ID,通過partition ID對數(shù)據(jù)進行排序        //這里的partitionID其實就是Reduce個數(shù)        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])        // 當buffer達到內(nèi)存限制時(buffer默認大小32k,由spark.shuffle.file.buffer參數(shù)決定),會將buffer中的數(shù)據(jù)spill到文件中        maybeSpillCollection(usingMap = false)      }    }  }

3.下面繼續(xù)跟蹤maybeSpillCollection()函數(shù),如何對內(nèi)存數(shù)據(jù)溢寫的,代碼如下:

private def maybeSpillCollection(usingMap: Boolean): Unit = {    var estimatedSize = 0L   // 如果是map ,也就是Map端需要聚合的情況    if (usingMap) {      //這里預估一個值,根據(jù)預估值判斷是否需要溢寫,      // 如果需要,溢寫完成后重新初始化一個map      estimatedSize = map.estimateSize()      if (maybeSpill(map, estimatedSize)) {        map = new PartitionedAppendOnlyMap[K, C]      }     // 這里執(zhí)行的map不需要聚合的情況    } else {      //這里預估一個值,根據(jù)預估值判斷是否需要溢寫,      // 如果需要,溢寫完成后重新初始化一個buffer       estimatedSize = buffer.estimateSize()      if (maybeSpill(buffer, estimatedSize)) {        buffer = new PartitionedPairBuffer[K, C]      }    }    if (estimatedSize > _peakMemoryUsedBytes) {      _peakMemoryUsedBytes = estimatedSize    }  }

4.上面涉及到溢寫判斷函數(shù)maybeSpill,我們看下他是如何進行判斷的,代碼如下:

// maybeSpill函數(shù)判斷大體分了三步// 1.為當前線程嘗試獲取amountToRequest大小的內(nèi)存(amountToRequest = 2 * currentMemory - myMemoryThreshold)。// 2.如果獲得的內(nèi)存依然不足(myMemoryThreshold <= currentMemory),則調(diào)用spill,執(zhí)行溢出操作。內(nèi)存不足可能是申請到的內(nèi)存為0或者已經(jīng)申請得到的內(nèi)存大小超過了myMemoryThreshold。// 3.溢出后續(xù)處理,如elementsRead歸零,已溢出內(nèi)存字節(jié)數(shù)(memoryBytesSpilled)增加線程當前內(nèi)存大小(currentMemory),釋放當前線程占用的內(nèi)存。 protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {    var shouldSpill = false    //其中內(nèi)存閾值myMemoryThreshold  由參數(shù)spark.shuffle.spill.initialMemoryThreshold控制,默認是5M    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {      // Claim up to double our current memory from the shuffle memory pool      val amountToRequest = 2 * currentMemory - myMemoryThreshold      //底層調(diào)用TaskMemoryManager的acquireExecutionMemory方法分配內(nèi)存      val granted = acquireMemory(amountToRequest)      // 更新現(xiàn)在內(nèi)存閥值      myMemoryThreshold += granted     //再次判斷當前內(nèi)存是否大于閥值,如果還是大于閥值則spill      shouldSpill = currentMemory >= myMemoryThreshold    }    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold    // Actually spill    if (shouldSpill) {      _spillCount += 1      logSpillage(currentMemory)      //進行spill,這了溢寫肯定先寫到緩沖區(qū),后寫到磁盤,      //有個比較重要的參數(shù)spark.shuffle.file.buffer  默認32k, 優(yōu)化時常進行調(diào)整      spill(collection)      _elementsRead = 0      _memoryBytesSpilled += currentMemory      releaseMemory()    }    shouldSpill  }

    里面還有更深層次的代碼,這里就不再跟蹤了,只要是了解了整個大體思路即可,有興趣的自己去跟蹤看下即可。

   為方便大家理解,下面給大家畫了下SorteShuffleWriter執(zhí)行的流程圖,BypassMergeSortShuffleWriter和UnsafeShuffleWriter的處理流程與這個流程基本一致,只是具體的實現(xiàn)稍有差異,水平有限,僅供參考:

Spark2.x中如何用源碼剖析SortShuffleWriter具體實現(xiàn)

  

關于Spark2.x中如何用源碼剖析SortShuffleWriter具體實現(xiàn)就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI