溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

spark shuffle如何理解

發(fā)布時間:2021-12-16 21:21:21 來源:億速云 閱讀:170 作者:柒染 欄目:大數(shù)據(jù)

這期內(nèi)容當(dāng)中小編將會給大家?guī)碛嘘P(guān)spark shuffle如何理解,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

shuffle概覽      

一個spark的RDD有一組固定的分區(qū)組成,每個分區(qū)有一系列的記錄組成。對于由窄依賴變換(例如map和filter)返回的RDD,會延續(xù)父RDD的分區(qū)信息,以pipeline的形式計算。每個對象僅依賴于父RDD中的單個對象。諸如coalesce之類的操作可能導(dǎo)致任務(wù)處理多個輸入分區(qū),但轉(zhuǎn)換仍然被認(rèn)為是窄依賴的,因為一個父RDD的分區(qū)只會被一個子RDD分區(qū)繼承。

Spark還支持寬依賴的轉(zhuǎn)換,例如groupByKey和reduceByKey。在這些依賴項中,計算單個分區(qū)中的記錄所需的數(shù)據(jù)可以來自于父數(shù)據(jù)集的許多分區(qū)中。要執(zhí)行這些轉(zhuǎn)換,具有相同key的所有元組必須最終位于同一分區(qū)中,由同一任務(wù)處理。為了滿足這一要求,Spark產(chǎn)生一個shuffle,它在集群內(nèi)部傳輸數(shù)據(jù),并產(chǎn)生一個帶有一組新分區(qū)的新stage。

可以看下面的代碼片段:

sc.textFile("someFile.txt").map(mapFunc).flatMap(flatMapFunc).filter(filterFunc).count()

上面的代碼片段只有一個action操作,count,從輸入textfile到action經(jīng)過了三個轉(zhuǎn)換操作。這段代碼只會在一個stage中運行,因為,三個轉(zhuǎn)換操作沒有shuffle,也即是三個轉(zhuǎn)換操作的每個分區(qū)都是只依賴于它的父RDD的單個分區(qū)。

但是,下面的單詞統(tǒng)計就跟上面有很大區(qū)別:

val tokenized = sc.textFile(args(0)).flatMap(_.split(' '))
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 >= 1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)
charCounts.collect()

這段代碼里有兩個reducebykey操作,三個stage。

下面圖更復(fù)雜,因為有一個join操作:

spark shuffle如何理解

粉框圈住的就是整個DAG的stage劃分。

spark shuffle如何理解

在每個stage的邊界,父stage的task會將數(shù)據(jù)寫入磁盤,子stage的task會將數(shù)據(jù)通過網(wǎng)絡(luò)讀取。由于它們會導(dǎo)致很高的磁盤和網(wǎng)絡(luò)IO,所以shuffle代價相當(dāng)高,應(yīng)該盡量避免。父stage的數(shù)據(jù)分區(qū)往往和子stage的分區(qū)數(shù)不同。觸發(fā)shuffle的操作算子往往可以指定分區(qū)數(shù)的,也即是numPartitions代表下個stage會有多少個分區(qū)。就像mr任務(wù)中reducer的數(shù)據(jù)是非常重要的一個參數(shù)一樣,shuffle的時候指定分區(qū)數(shù)也將在很大程度上決定一個應(yīng)用程序的性能。

   
優(yōu)化shuffle      

通常情況可以選擇使用產(chǎn)生相同結(jié)果的action和transform相互替換。但是并不是產(chǎn)生相同結(jié)果的算子就會有相同的性能。通常避免常見的陷阱并選擇正確的算子可以顯著提高應(yīng)用程序的性能。

當(dāng)選擇轉(zhuǎn)換操作的時候,應(yīng)最小化shuffle次數(shù)和shuffle的數(shù)據(jù)量。shuffle是非常消耗性能的操作。所有的shuffle數(shù)據(jù)都會被寫入磁盤,然后通過網(wǎng)絡(luò)傳輸。repartition , join, cogroup, 和  *By 或者 *ByKey 類型的操作都會產(chǎn)生shuffle。我們可以對一下幾個操作算子進行優(yōu)化:

1. groupByKey某些情況下可以被reducebykey代替。

2. reduceByKey某些情況下可以被 aggregatebykey代替。

3. flatMap-join-groupBy某些情況下可以被cgroup代替。

no shuffle      

在某些情況下,前面描述的轉(zhuǎn)換操作不會導(dǎo)致shuffle。當(dāng)先前的轉(zhuǎn)換操作已經(jīng)使用了和shuffle相同的分區(qū)器分區(qū)數(shù)據(jù)的時候,spark就不會產(chǎn)生shuffle。

舉個例子:

rdd1 = someRdd.reduceByKey(...)

rdd2 = someOtherRdd.reduceByKey(...)

rdd3 = rdd1.join(rdd2)

由于使用redcuebykey的時候沒有指定分區(qū)器,所以都是使用的默認(rèn)分區(qū)器,會導(dǎo)致rdd1和rdd2都采用的是hash分區(qū)器。兩個reducebykey操作會產(chǎn)生兩個shuffle過程。如果,數(shù)據(jù)集有相同的分區(qū)數(shù),執(zhí)行join操作的時候就不需要進行額外的shuffle。由于數(shù)據(jù)集的分區(qū)相同,因此rdd1的任何單個分區(qū)中的key集合只能出現(xiàn)在rdd2的單個分區(qū)中。 因此,rdd3的任何單個輸出分區(qū)的內(nèi)容僅取決于rdd1中單個分區(qū)的內(nèi)容和rdd2中的單個分區(qū),并且不需要第三個shuffle。

例如,如果someRdd有四個分區(qū),someOtherRdd有兩個分區(qū),而reduceByKeys都使用三個分區(qū),運行的任務(wù)集如下所示:

spark shuffle如何理解

如果rdd1和rdd2使用不同的分區(qū)器或者相同的分區(qū)器不同的分區(qū)數(shù),僅僅一個數(shù)據(jù)集在join的過程中需要重新shuffle

spark shuffle如何理解

在join的過程中為了避免shuffle,可以使用廣播變量。當(dāng)executor內(nèi)存可以存儲數(shù)據(jù)集,在driver端可以將其加載到一個hash表中,然后廣播到executor。然后,map轉(zhuǎn)換可以引用哈希表來執(zhí)行查找。

   
增加shuffle      

有時候需要打破最小化shuffle次數(shù)的規(guī)則。

當(dāng)增加并行度的時候,額外的shuffle是有利的。例如,數(shù)據(jù)中有一些文件是不可分割的,那么該大文件對應(yīng)的分區(qū)就會有大量的記錄,而不是說將數(shù)據(jù)分散到盡可能多的分區(qū)內(nèi)部來使用所有已經(jīng)申請cpu。在這種情況下,使用reparition重新產(chǎn)生更多的分區(qū)數(shù),以滿足后面轉(zhuǎn)換算子所需的并行度,這會提升很大性能。

使用reduce和aggregate操作將數(shù)據(jù)聚合到driver端,也是修改區(qū)數(shù)的很好的例子。

在對大量分區(qū)執(zhí)行聚合的時候,在driver的單線程中聚合會成為瓶頸。要減driver的負(fù)載,可以首先使用reducebykey或者aggregatebykey執(zhí)行一輪分布式聚合,同時將結(jié)果數(shù)據(jù)集分區(qū)數(shù)減少。實際思路是首先在每個分區(qū)內(nèi)部進行初步聚合,同時減少分區(qū)數(shù),然后再將聚合的結(jié)果發(fā)到driver端實現(xiàn)最終聚合。典型的操作是treeReduce 和 treeAggregate。

當(dāng)聚合已經(jīng)按照key進行分組時,此方法特別適用。例如,假如一個程序計算語料庫中每個單詞出現(xiàn)的次數(shù),并將結(jié)果使用map返回到driver。一種方法是可以使用聚合操作完成在每個分區(qū)計算局部map,然后在driver中合并map??梢杂胊ggregateByKey以完全分布的方式進行統(tǒng)計,然后簡單的用collectAsMap將結(jié)果返回到driver。

上述就是小編為大家分享的spark shuffle如何理解了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI