溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何剖析具體實現(xiàn)

發(fā)布時間:2021-11-15 15:13:29 來源:億速云 閱讀:90 作者:柒染 欄目:大數(shù)據(jù)

這篇文章將為大家詳細(xì)講解有關(guān)如何剖析具體實現(xiàn),文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

一、概述

這里我們從源碼角度剖析BypassMergeSortShuffleWriter實現(xiàn)策略的原理和具體的實現(xiàn)細(xì)節(jié)。

BypassMergeSortShuffleWriter具體的實現(xiàn)都在對應(yīng)類的write()函數(shù)中,我們直接看源碼進行剖析

1.先看構(gòu)造函數(shù)初始化

 BypassMergeSortShuffleWriter(      BlockManager blockManager,      IndexShuffleBlockResolver shuffleBlockResolver,      BypassMergeSortShuffleHandle<K, V> handle,      int mapId,      TaskContext taskContext,      SparkConf conf) {    // 獲取spark.shuffle.file.buffer參數(shù)值,默認(rèn)32k,這里是一個比較重要的條有參數(shù),    // 該參數(shù)用于設(shè)置shuffle write task的BufferedOutputStream的buffer緩沖大小。    // 將數(shù)據(jù)寫到磁盤文件之前,會先寫入buffer緩沖中,待緩沖寫滿之后,才會溢寫到磁盤    //如果作業(yè)可用的內(nèi)存資源較為充足的話,可以適當(dāng)增加這個參數(shù)的大小(比如64k),從而減少shuffle write過程中溢寫磁盤文件的次數(shù),    // 也就可以減少磁盤IO次數(shù),進而提升性能    this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;   // 是否采用NIO的從文件到文件流的復(fù)制方式,默認(rèn)值是true 一般不用修改    this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);    this.blockManager = blockManager;    // 獲取shufflehandle中的ShuffleDependency對象,通過該對象得到分區(qū)器和分區(qū)個數(shù)等數(shù)據(jù)。    final ShuffleDependency<K, V, V> dep = handle.dependency();    this.mapId = mapId;    this.shuffleId = dep.shuffleId();    this.partitioner = dep.partitioner();    this.numPartitions = partitioner.numPartitions();    this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();    //設(shè)置序列化工具對象,和shuffleBlockResolver對象,    // 該對象用來創(chuàng)建和維護shuffle的數(shù)據(jù)的邏輯塊和物理文件位置之間的映射的對象    this.serializer = dep.serializer();    this.shuffleBlockResolver = shuffleBlockResolver;  }

2.再看write()函數(shù),源碼如下:

   //這里大體意思是 為每個分區(qū)在磁盤創(chuàng)建臨時文件  并給每一個writer

上面代碼的大體思路如下:

a.確定分區(qū)數(shù),然后為每個分區(qū)創(chuàng)建DiskBlockObjectWriter和臨時文件

b.循環(huán)將record通過Partitioner進行分區(qū),并寫入對應(yīng)分區(qū)臨時文件

c. 將分區(qū)數(shù)據(jù)刷到磁盤

d.根據(jù)shuffleId和mapId,構(gòu)建ShuffleDataBlockId,創(chuàng)建合并文件data和合并文件的臨時文件,文件格式為:

shuffle_{shuffleId}_{mapId}_{reduceId}.data

e.將分區(qū)文件合并到一個總的臨時文件,合并后會重命名為最終輸出文件名,并返回一個對應(yīng)分區(qū)文件長度的數(shù)組

f.創(chuàng)建索引文件index和索引臨時文件,每一個分區(qū)的長度和offset寫入索引文件等;并且重命名臨時data文件和臨時index文件

g.將一些信息封裝到MapStatus返回

存在問題:

    這種Writer會為每個分區(qū)創(chuàng)建一個臨時文件,如果分區(qū)過多時,會創(chuàng)建很多的output輸出流和臨時文件對象,占用資源過多,性能會下降。

重點關(guān)注:

參數(shù):spark.shuffle.file.buffer   默認(rèn)值32k

    默認(rèn)情況下,shuffle的map task,輸出到磁盤文件的時候,統(tǒng)一都會先寫入到每個task自己關(guān)聯(lián)的一個內(nèi)存緩沖區(qū),每一次當(dāng)內(nèi)存緩沖區(qū)滿溢后,然后才會進行溢寫到磁盤中。如果內(nèi)存沖突可適當(dāng)調(diào)大這個參數(shù),從而減少shuffle write過程中溢寫磁盤文件的次數(shù),也就可以減少磁盤IO次數(shù),進而提升性能。在實踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù),性能會有1%~5%的提升。

關(guān)于如何剖析具體實現(xiàn)就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI