溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Shuffle流程是怎樣的

發(fā)布時(shí)間:2021-12-21 17:38:14 來源:億速云 閱讀:270 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“Shuffle流程是怎樣的”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

在MapReduce框架中,shuffle是連接Map和Reduce之間的橋梁,Map的輸出要用到Reduce中必須經(jīng)過shuffle這個(gè)環(huán)節(jié),shuffle的性能高低直接影響了整個(gè)程序的性能和吞吐量。Spark作為MapReduce框架的一種實(shí)現(xiàn),也實(shí)現(xiàn)了shuffle的邏輯。

Shuffle

Shuffle是MapReduce框架中的一個(gè)特定的phase,介于Map phase和Reduce phase之間,當(dāng)Map的輸出結(jié)果要被Reduce使用時(shí),輸出結(jié)果需要按key哈希,并且分發(fā)到每一個(gè)Reducer上去,這個(gè)過程就是shuffle。由于shuffle涉及到了磁盤的讀寫和網(wǎng)絡(luò)的傳輸,因此shuffle性能的高低直接影響到了整個(gè)程序的運(yùn)行效率。

下面這幅圖清晰地描述了MapReduce算法的整個(gè)流程,其中shuffle phase是介于Map phase和Reduce phase之間。

Shuffle流程是怎樣的

概念上shuffle就是一個(gè)溝通數(shù)據(jù)連接的橋梁,那么實(shí)際上shuffle(partition)這一部分實(shí)現(xiàn)的機(jī)制如下。

1、Spark Shuffle

以圖為例簡單描述一下Spark中shuffle的整一個(gè)流程:

Shuffle流程是怎樣的

l 首先每一個(gè)Mapper會根據(jù)Reducer的數(shù)量創(chuàng)建出相應(yīng)的bucket,bucket的數(shù)量是M×RM×R,其中MM是Map的個(gè)數(shù),RR是Reduce的個(gè)數(shù)。

l 其次Mapper產(chǎn)生的結(jié)果會根據(jù)設(shè)置的partition算法填充到每個(gè)bucket中去。這里的partition算法是可以自定義的,當(dāng)然默認(rèn)的算法是根據(jù)key哈希到不同的bucket中去。

l 當(dāng)Reducer啟動時(shí),它會根據(jù)自己task的id和所依賴的Mapper的id從遠(yuǎn)端或是本地的block manager中取得相應(yīng)的bucket作為Reducer的輸入進(jìn)行處理。

這里的bucket是一個(gè)抽象概念,在實(shí)現(xiàn)中每個(gè)bucket可以對應(yīng)一個(gè)文件,可以對應(yīng)文件的一部分或是其他等。

Apache Spark 的 Shuffle 過程與 Apache Hadoop 的 Shuffle 過程有著諸多類似,一些概念可直接套用,例如,Shuffle 過程中,提供數(shù)據(jù)的一端,被稱作 Map 端,Map 端每個(gè)生成數(shù)據(jù)的任務(wù)稱為 Mapper,對應(yīng)的,接收數(shù)據(jù)的一端,被稱作 Reduce 端,Reduce 端每個(gè)拉取數(shù)據(jù)的任務(wù)稱為 Reducer,Shuffle 過程本質(zhì)上都是將 Map 端獲得的數(shù)據(jù)使用分區(qū)器進(jìn)行劃分,并將數(shù)據(jù)發(fā)送給對應(yīng)的 Reducer 的過程。

2、Shuffle Write

在Spark 0.6和0.7的版本中,對于shuffle數(shù)據(jù)的存儲是以文件的方式存儲在block manager中,與rdd.persist(StorageLevel.DISk_ONLY)采取相同的策略,可以參看:

Shuffle流程是怎樣的

可以看到Spark在每一個(gè)Mapper中為每個(gè)Reducer創(chuàng)建一個(gè)bucket,并將RDD計(jì)算結(jié)果放進(jìn)bucket中。需要注意的是每個(gè)bucket是一個(gè)ArrayBuffer,也就是說Map的輸出結(jié)果是會先存儲在內(nèi)存。

接著Spark會將ArrayBuffer中的Map輸出結(jié)果寫入block manager所管理的磁盤中,這里文件的命名方式為:shuffle_ + shuffle_id + "_" + map partition id + "_" + shuffle partition id。

早期的shuffle write有兩個(gè)比較大的問題:

l Map的輸出必須先全部存儲到內(nèi)存中,然后寫入磁盤。這對內(nèi)存是一個(gè)非常大的開銷,當(dāng)內(nèi)存不足以存儲所有的Map output時(shí)就會出現(xiàn)OOM。

l 每一個(gè)Mapper都會產(chǎn)生Reducer number個(gè)shuffle文件,如果Mapper個(gè)數(shù)是1k,Reducer個(gè)數(shù)也是1k,那么就會產(chǎn)生1M個(gè)shuffle文件,這對于文件系統(tǒng)是一個(gè)非常大的負(fù)擔(dān)。同時(shí)在shuffle數(shù)據(jù)量不大而shuffle文件又非常多的情況下,隨機(jī)寫也會嚴(yán)重降低IO的性能。

在Spark 0.8版本中,shuffle write采用了與RDD block write不同的方式,同時(shí)也為shuffle write單獨(dú)創(chuàng)建了ShuffleBlockManager,部分解決了0.6和0.7版本中遇到的問題。

首先來看一下Spark 0.8的具體實(shí)現(xiàn):

Shuffle流程是怎樣的

在這個(gè)版本中為shuffle write添加了一個(gè)新的類ShuffleBlockManager,由ShuffleBlockManager來分配和管理bucket。同時(shí)ShuffleBlockManager為每一個(gè)bucket分配一個(gè)DiskObjectWriter,每個(gè)write handler擁有默認(rèn)100KB的緩存,使用這個(gè)write handler將Map output寫入文件中。可以看到現(xiàn)在的寫入方式變?yōu)閎uckets.writers(bucketId).write(pair),也就是說Map output的key-value pair是逐個(gè)寫入到磁盤而不是預(yù)先把所有數(shù)據(jù)存儲在內(nèi)存中在整體flush到磁盤中去。

ShuffleBlockManager的代碼如下所示:

Shuffle流程是怎樣的

Spark 0.8顯著減少了shuffle的內(nèi)存壓力,現(xiàn)在Map output不需要先全部存儲在內(nèi)存中,再flush到硬盤,而是record-by-record寫入到磁盤中。同時(shí)對于shuffle文件的管理也獨(dú)立出新的ShuffleBlockManager進(jìn)行管理,而不是與rdd cache文件在一起了。

但是這一版Spark 0.8的shuffle write仍然有兩個(gè)大的問題沒有解決:

l 首先依舊是shuffle文件過多的問題,shuffle文件過多一是會造成文件系統(tǒng)的壓力過大,二是會降低IO的吞吐量。

l 其次雖然Map output數(shù)據(jù)不再需要預(yù)先在內(nèi)存中evaluate顯著減少了內(nèi)存壓力,但是新引入的DiskObjectWriter所帶來的buffer開銷也是一個(gè)不容小視的內(nèi)存開銷。假定有1k個(gè)Mapper和1k個(gè)Reducer,那么就會有1M個(gè)bucket,于此同時(shí)就會有1M個(gè)write handler,而每一個(gè)write handler默認(rèn)需要100KB內(nèi)存,那么總共需要100GB的內(nèi)存。這樣的話僅僅是buffer就需要這么多的內(nèi)存,內(nèi)存的開銷是驚人的。當(dāng)然實(shí)際情況下這1k個(gè)Mapper是分時(shí)運(yùn)行的話,所需的內(nèi)存就只有cores * reducer numbers * 100KB大小了。但是reducer數(shù)量很多的話,這個(gè)buffer的內(nèi)存開銷也是蠻厲害的。

為了解決shuffle文件過多的情況,Spark 0.8.1引入了新的shuffle consolidation,以期顯著減少shuffle文件的數(shù)量。

首先以圖例來介紹一下shuffle consolidation的原理。

Shuffle流程是怎樣的

假定該job有4個(gè)Mapper和4個(gè)Reducer,有2個(gè)core,也就是能并行運(yùn)行兩個(gè)task??梢运愠鯯park的shuffle write共需要16個(gè)bucket,也就有了16個(gè)write handler。在之前的Spark版本中,每一個(gè)bucket對應(yīng)的是一個(gè)文件,因此在這里會產(chǎn)生16個(gè)shuffle文件。

而在shuffle consolidation中每一個(gè)bucket并非對應(yīng)一個(gè)文件,而是對應(yīng)文件中的一個(gè)segment,同時(shí)shuffle consolidation所產(chǎn)生的shuffle文件數(shù)量與Spark core的個(gè)數(shù)也有關(guān)系。在上面的圖例中,job的4個(gè)Mapper分為兩批運(yùn)行,在第一批2個(gè)Mapper運(yùn)行時(shí)會申請8個(gè)bucket,產(chǎn)生8個(gè)shuffle文件;而在第二批Mapper運(yùn)行時(shí),申請的8個(gè)bucket并不會再產(chǎn)生8個(gè)新的文件,而是追加寫到之前的8個(gè)文件后面,這樣一共就只有8個(gè)shuffle文件,而在文件內(nèi)部這有16個(gè)不同的segment。因此從理論上講shuffle consolidation所產(chǎn)生的shuffle文件數(shù)量為C×R,其中C是Spark集群的core number,R是Reducer的個(gè)數(shù)。

需要注意的是當(dāng) M=C時(shí)shuffle consolidation所產(chǎn)生的文件數(shù)和之前的實(shí)現(xiàn)是一樣的。

Shuffle consolidation顯著減少了shuffle文件的數(shù)量,解決了之前版本一個(gè)比較嚴(yán)重的問題,但是writer handler的buffer開銷過大依然沒有減少,若要減少writer handler的buffer開銷,只能減少Reducer的數(shù)量,但是這又會引入新的問題,下文將會有詳細(xì)介紹。

3、Shuffle Fetch and Aggregator

Shuffle write寫出去的數(shù)據(jù)要被Reducer使用,就需要shuffle fetcher將所需的數(shù)據(jù)fetch過來,這里的fetch包括本地和遠(yuǎn)端,因?yàn)閟huffle數(shù)據(jù)有可能一部分是存儲在本地的。Spark對shuffle fetcher實(shí)現(xiàn)了兩套不同的框架:NIO通過socket連接去fetch數(shù)據(jù);OIO通過netty server去fetch數(shù)據(jù)。分別對應(yīng)的類是BasicBlockFetcherIterator和NettyBlockFetcherIterator。

在Spark 0.7和更早的版本中,只支持BasicBlockFetcherIterator,而BasicBlockFetcherIterator在shuffle數(shù)據(jù)量比較大的情況下performance始終不是很好,無法充分利用網(wǎng)絡(luò)帶寬,為了解決這個(gè)問題,添加了新的shuffle fetcher來試圖取得更好的性能。都知道在hadoop MapReduce的shuffle過程中,shuffle fetch過來的數(shù)據(jù)會進(jìn)行merge sort,使得相同key下的不同value按序歸并到一起供Reducer使用,這個(gè)過程可以參看下圖:

Shuffle流程是怎樣的

所有的merge sort都是在磁盤上進(jìn)行的,有效地控制了內(nèi)存的使用,但是代價(jià)是更多的磁盤IO。

那么Spark是否也有merge sort呢?

首先雖然Spark屬于MapReduce體系,但是對傳統(tǒng)的MapReduce算法進(jìn)行了一定的改變。Spark假定在大多數(shù)用戶的case中,shuffle數(shù)據(jù)的sort不是必須的,比如word count,強(qiáng)制地進(jìn)行排序只會使性能變差,因此Spark并不在Reducer端做merge sort。既然沒有merge sort那Spark是如何進(jìn)行reduce的呢?

在Spark中存在aggregator,aggregator本質(zhì)上是一個(gè)hashmap,它是以map output的key為key,以任意所要combine的類型為value的hashmap。當(dāng)在做word count reduce計(jì)算count值的時(shí)候,它會將shuffle fetch到的每一個(gè)key-value pair更新或是插入到hashmap中(若在hashmap中沒有查找到,則插入其中;若查找到則更新value值)。這樣就不需要預(yù)先把所有的key-value進(jìn)行merge sort,而是來一個(gè)處理一個(gè),省下了外部排序這一步驟。但同時(shí)需要注意的是reducer的內(nèi)存必須足以存放這個(gè)partition的所有key和count值,因此對內(nèi)存有一定的要求。

在上面word count的例子中,因?yàn)関alue會不斷地更新,而不需要將其全部記錄在內(nèi)存中,因此內(nèi)存的使用還是比較少的。考慮一下如果是group by key這樣的操作,Reducer需要得到key對應(yīng)的所有value。在Hadoop MapReduce中,由于有了merge sort,因此給予Reducer的數(shù)據(jù)已經(jīng)是group by key了,而Spark沒有這一步,因此需要將key和對應(yīng)的value全部存放在hashmap中,并將value合并成一個(gè)array??梢韵胂鬄榱四軌虼娣潘袛?shù)據(jù),用戶必須確保每一個(gè)partition足夠小到內(nèi)存能夠容納,這對于內(nèi)存是一個(gè)非常嚴(yán)峻的考驗(yàn)。因此Spark文檔中建議用戶涉及到這類操作的時(shí)候盡量增加partition,也就是增加Mapper和Reducer的數(shù)量。

增加Mapper和Reducer的數(shù)量固然可以減小partition的大小,使得內(nèi)存可以容納這個(gè)partition。但是在shuffle write中提到,bucket和對應(yīng)于bucket的write handler是由Mapper和Reducer的數(shù)量決定的,task越多,bucket就會增加的更多,由此帶來write handler所需的buffer也會更多。在一方面為了減少內(nèi)存的使用采取了增加task數(shù)量的策略,另一方面task數(shù)量增多又會帶來buffer開銷更大的問題,因此陷入了內(nèi)存使用的兩難境地。

為了減少內(nèi)存的使用,只能將aggregator的操作從內(nèi)存移到磁盤上進(jìn)行,Spark社區(qū)也意識到了Spark在處理數(shù)據(jù)規(guī)模遠(yuǎn)遠(yuǎn)大于內(nèi)存大小時(shí)所帶來的問題。因此PR303提供了外部排序的實(shí)現(xiàn)方案。

“Shuffle流程是怎樣的”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI