您好,登錄后才能下訂單哦!
這篇文章給大家介紹Shuffle原理及對應的Consolidation優(yōu)化機制是怎樣的,內(nèi)容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
一、什么是Shuffle?
Shuffle是連接MapTask和ReduceTask過程的橋梁,MapTask的輸出必須要經(jīng)過Shuffle過程才能變成ReduceTask的輸入,在分布式集群中,ReduceTask需要跨節(jié)點去拉取MapTask的輸出結(jié)果,這中間涉及到數(shù)據(jù)的網(wǎng)絡傳輸和磁盤IO,所以Shuffle的好壞將直接影響整個應用程序的性能,通常我們將Shuffle過程分成兩部分:MapTask端的結(jié)果輸出成為ShuffleWrite,ReduceTask端的數(shù)據(jù)拉取稱為ShuffleRead。
Spark 的 Shuffle 過程與MapReduce的 Shuffle 過程原理基本相似,一些概念可直接套用,例如,Shuffle 過程中,提供數(shù)據(jù)的一端,被稱作 Map 端,Map 端每個生成數(shù)據(jù)的任務稱為 Mapper,對應的,接收數(shù)據(jù)的一端,被稱作 Reduce 端,Reduce 端每個拉取數(shù)據(jù)的任務稱為 Reducer,Shuffle 過程本質(zhì)上都是將 Map 端獲得的數(shù)據(jù)使用分區(qū)器進行劃分,并將數(shù)據(jù)發(fā)送給對應的 Reducer 的過程。
前面的文章我們已經(jīng)講過了Spark任務中Stage的劃分依據(jù)是RDD的寬窄依賴;在窄依賴中父RDD和子RDD partition之間的關(guān)系是一對一的?;蛘吒窻DD一個partition只對應一個子RDD的partition情況下的父RDD和子RDD partition關(guān)系是多對一的。不會有shuffle的產(chǎn)生。父RDD的一個分區(qū)去到子RDD的一個分區(qū)。而在寬依賴中,父RDD與子RDD partition之間的關(guān)系是一對多。會有shuffle的產(chǎn)生。父RDD的一個分區(qū)的數(shù)據(jù)去到子RDD的不同分區(qū)里面。
在現(xiàn)實場景中,百分之九十的調(diào)優(yōu)情況都是發(fā)生在shuffle階段,所以此類調(diào)優(yōu)非常重要。
二、普通的Spark HashShuffle原理
這里先看一張圖,普通的Shuffle原理是怎么樣的運行的,下面我會結(jié)合這個圖給大家講解基本的Shuffle原理:
普通Shuffle執(zhí)行過程:
1.上面有三個ShuffleMapTask和兩個ResultTask,ShuffleMapTask會根據(jù)ResultTask的數(shù)量創(chuàng)建出相應的bucket,bucket的數(shù)量是3×3。
2. 其次ShuffleMapTask產(chǎn)生的結(jié)果會根據(jù)設置的partition算法填充到每個bucket中去。這里的partition算法是可以自定義的,當然默認的算法是根據(jù)key哈希到不同的bucket中去,最后落地的話就是ShuffleBlockFIle。
3.ShuffleMapTask的輸出的ShuffleBlockFIle位置信息作作為MapStatus發(fā)送到DAGScheduler的MapOutputTracker的Master中。
3.當ShuffleMapTask啟動時,它會根據(jù)自己task的id和所依賴的ShuffleMapTask的id,然后去MapOutputTracker中讀取ShuffleBlockFIle位置信息,最后從遠端或是本地的blockmanager中取得相應的ShuffleBlockFIle作為ResultTask的輸入進行處理。
如果ShuffleMapTask和ResultTask數(shù)量過多就會產(chǎn)生N*N的小文件,導致ShuffleWrite要耗費大量性能在磁盤文件的創(chuàng)建以及磁盤的IO上,給系統(tǒng)造成很大壓力,上面我畫的圖有點不太好,這里我文字表述下:
一種情形A: 如果有4個ShuffleMapTask和4個ResultTask,我的機器只有2個cpu核數(shù),而每個task默認是需要一個cpu來運行的,這樣我的4個ShuffleMapTask就分了兩個批次運行,同時只有兩個Task運行,第一批Task會生成2*4個ShuffleBlockFIle文件,第二批Task運行仍然會生成2*4的ShuffleBlockFIle文件,這樣會產(chǎn)生16個小文件。
另一種情形B:我還是有4個ShuffleMapTask和4個ResultTask,我的機器只有4個cpu或者更多的cpu核數(shù),我的4個ShuffleMapTask就會在同一個批次運行,還是會產(chǎn)生4*4=16個小文件。
存在的問題:
1.Shuffle前在磁盤上會產(chǎn)生海量的小文件,分布式模式ResultTask去拉取數(shù)據(jù)時,會產(chǎn)生大量會有過多的小文件創(chuàng)建和磁盤IO操作。
2.可能導致OOM,大量耗時低效的 IO 操作 ,導致寫磁盤時的對象過多,讀磁盤時候的對象也過多,這些對象存儲在堆內(nèi)存中,會導致堆內(nèi)存不足,相應會導致頻繁的GC,GC會導致OOM。由于內(nèi)存中需要保存海量文件操作句柄和臨時信息,如果數(shù)據(jù)處理的規(guī)模比較龐大的話,內(nèi)存不可承受,會出現(xiàn) OOM 等問題。
二、開啟Consolidation機制的Spark HashShuffle原理
鑒于上面基本Shuffle存在的不足, 在后面的Spark0.81版本開始就引入了Consolidation機制,由參數(shù)spark.shuffle.consolidateFiles控制。將其設置為true即可開啟優(yōu)化機制,下面我們就看下優(yōu)化后的Shuffle是如何處理的:
優(yōu)化的Shuffle原理:
相當于對于上面的“情形B”做了優(yōu)化,把在同一core上運行的多個ShuffleMapTask輸出的合并到同一個文件,這樣文件數(shù)目就變成了 cores*ResultTask個ShuffleBlockFile文件了,這里一定要注意同一個批次運行的ShuffleMapTask一定是寫的不同的文件,只有不同批次的ShuffleMapTask才會寫相同的文件,當?shù)谝慌鶶huffleMapTask運行完成后,后面在同一個cpu core上運行的TShuffleMapTask才會去寫上一個在這個cpu core上運行ShuffleMapTask寫的那個ShuffleBlockFile文件。
至此Spark HashShuffle原理及其Consolidation機制講解完畢,但是如果 Reducer 端的并行任務或者是數(shù)據(jù)分片過多的話則 Core * Reducer Task 依舊過大,也會產(chǎn)生很多小文件。
上面的原理都是基于的HashShuffleManager。而Spark1.2.x以后,HashShuffleManager不再是Spark默認的Shuffle Manager,Spark1.2.x以后,Spark默認的Shuffle Manager是SortShuffleManager。在Spark2.0以后 HashShuffleManager已經(jīng)被棄用。
關(guān)于Shuffle原理及對應的Consolidation優(yōu)化機制是怎樣的就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。