溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Hadoop和Spark的Shuffle過程有什么不同

發(fā)布時間:2021-12-09 16:00:09 來源:億速云 閱讀:167 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“Hadoop和Spark的Shuffle過程有什么不同”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Hadoop和Spark的Shuffle過程有什么不同”吧!

一、前言

對于基于MapReduce編程范式的分布式計算來說,本質(zhì)上而言,就是在計算數(shù)據(jù)的交、并、差、聚合、排序等過程。而分布式計算分而治之的思想,讓每個節(jié)點只計算部分數(shù)據(jù),也就是只處理一個分片,那么要想求得某個key對應的全量數(shù)據(jù),那就必須把相同key的數(shù)據(jù)匯集到同一個Reduce任務節(jié)點來處理,那么Mapreduce范式定義了一個叫做Shuffle的過程來實現(xiàn)這個效果。

二、編寫本文的目

本文旨在剖析Hadoop和Spark的Shuffle過程,并對比兩者Shuffle的差異。

三、Hadoop的Shuffle過程

Shuffle描述的是數(shù)據(jù)從Map端到Reduce端的過程,大致分為排序(sort)、溢寫(spill)、合并(merge)、拉取拷貝(Copy)、合并排序(merge  sort)這幾個過程,大體流程如下:

![image](https://yqfile.alicdn.com/e4ccedfb6ccaaa0d3c0ad5b3b7ab83d96dd9fed2.png)

上圖的Map的輸出的文件被分片為紅綠藍三個分片,這個分片的就是根據(jù)Key為條件來分片的,分片算法可以自己實現(xiàn),例如Hash、Range等,最終Reduce任務只拉取對應顏色的數(shù)據(jù)來進行處理,就實現(xiàn)把相同的Key拉取到相同的Reduce節(jié)點處理的功能。下面分開來說Shuffle的的各個過程。

Map端做了下圖所示的操作:

1、Map端sort

Map端的輸出數(shù)據(jù),先寫環(huán)形緩存區(qū)kvbuffer,當環(huán)形緩沖區(qū)到達一個閥值(可以通過配置文件設置,默認80),便要開始溢寫,但溢寫之前會有一個sort操作,這個sort操作先把Kvbuffer中的數(shù)據(jù)按照partition值和key兩個關鍵字來排序,移動的只是索引數(shù)據(jù),排序結果是Kvmeta中數(shù)據(jù)按照partition為單位聚集在一起,同一partition內(nèi)的按照key有序。

2、spill(溢寫)  當排序完成,便開始把數(shù)據(jù)刷到磁盤,刷磁盤的過程以分區(qū)為單位,一個分區(qū)寫完,寫下一個分區(qū),分區(qū)內(nèi)數(shù)據(jù)有序,最終實際上會多次溢寫,然后生成多個文件  3、merge(合并)  spill會生成多個小文件,對于Reduce端拉取數(shù)據(jù)是相當?shù)托У模敲催@時候就有了merge的過程,合并的過程也是同分片的合并成一個片段(segment),最終所有的segment組裝成一個最終文件,那么合并過程就完成了,如下圖所示

Hadoop和Spark的Shuffle過程有什么不同

至此,Map的操作就已經(jīng)完成,Reduce端操作即將登場

Reduce操作

總體過程如下圖的紅框處:

![image](https://yqfile.alicdn.com/71a52ed4799d3dbbde4552028f3aea05bc1c98c0.png)  1、拉取拷貝(fetch copy)

Reduce任務通過向各個Map任務拉取對應分片。這個過程都是以Http協(xié)議完成,每個Map節(jié)點都會啟動一個常駐的HTTP  server服務,Reduce節(jié)點會請求這個Http Server拉取數(shù)據(jù),這個過程完全通過網(wǎng)絡傳輸,所以是一個非常重量級的操作。

2、合并排序

Reduce端,拉取到各個Map節(jié)點對應分片的數(shù)據(jù)之后,會進行再次排序,排序完成,結果丟給Reduce函數(shù)進行計算。

四、總結

至此整個shuffle過程完成,***總結幾點:

  1. shuffle過程就是為了對key進行全局聚合

  2. 排序操作伴隨著整個shuffle過程,所以Hadoop的shuffle是sort-based的

Spark shuffle相對來說更簡單,因為不要求全局有序,所以沒有那么多排序合并的操作。Spark  shuffle分為write和read兩個過程。我們先來看shuffle write。

  • 一、shuffle write

shuffle  write的處理邏輯會放到該ShuffleMapStage的***(因為spark以shuffle發(fā)生與否來劃分stage,也就是寬依賴),final  RDD的每一條記錄都會寫到對應的分區(qū)緩存區(qū)bucket,如下圖所示:

Hadoop和Spark的Shuffle過程有什么不同

說明:

  1. 上圖有2個CPU,可以同時運行兩個ShuffleMapTask

  2. 每個task將寫一個buket緩沖區(qū),緩沖區(qū)的數(shù)量和reduce任務的數(shù)量相等

  3. 每個buket緩沖區(qū)會生成一個對應ShuffleBlockFile

  4. ShuffleMapTask  如何決定數(shù)據(jù)被寫到哪個緩沖區(qū)呢?這個就是跟partition算法有關系,這個分區(qū)算法可以是hash的,也可以是range的

  5. 最終產(chǎn)生的ShuffleBlockFile會有多少呢?就是ShuffleMapTask 數(shù)量乘以reduce的數(shù)量,這個是非常巨大的

那么有沒有辦法解決生成文件過多的問題呢?有,開啟FileConsolidation即可,開啟FileConsolidation之后的shuffle過程如下:

Hadoop和Spark的Shuffle過程有什么不同

在同一核CPU執(zhí)行先后執(zhí)行的ShuffleMapTask可以共用一個bucket緩沖區(qū),然后寫到同一份ShuffleFile里去,上圖所示的ShuffleFile實際上是用多個ShuffleBlock構成,那么,那么每個worker最終生成的文件數(shù)量,變成了cpu核數(shù)乘以reduce任務的數(shù)量,大大縮減了文件量。

  • 二、Shuffle read

Shuffle write過程將數(shù)據(jù)分片寫到對應的分片文件,這時候萬事具備,只差去拉取對應的數(shù)據(jù)過來計算了。

那么Shuffle Read發(fā)送的時機是什么?是要等所有ShuffleMapTask執(zhí)行完,再去fetch數(shù)據(jù)嗎?理論上,只要有一個  ShuffleMapTask執(zhí)行完,就可以開始fetch數(shù)據(jù)了,實際上,spark必須等到父stage執(zhí)行完,才能執(zhí)行子stage,所以,必須等到所有  ShuffleMapTask執(zhí)行完畢,才去fetch數(shù)據(jù)。fetch過來的數(shù)據(jù),先存入一個Buffer緩沖區(qū),所以這里一次性fetch的FileSegment不能太大,當然如果fetch過來的數(shù)據(jù)大于每一個閥值,也是會spill到磁盤的。

fetch的過程過來一個buffer的數(shù)據(jù),就可以開始聚合了,這里就遇到一個問題,每次fetch部分數(shù)據(jù),怎么能實現(xiàn)全局聚合呢?以word  count的reduceByKey(《Spark RDD操作之ReduceByKey  》)為例,假設單詞hello有十個,但是一次fetch只拉取了2個,那么怎么全局聚合呢?Spark的做法是用HashMap,聚合操作實際上是map.put(key,map.get(key)+1),將map中的聚合過的數(shù)據(jù)get出來相加,然后put回去,等到所有數(shù)據(jù)fetch完,也就完成了全局聚合。

感謝各位的閱讀,以上就是“Hadoop和Spark的Shuffle過程有什么不同”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對Hadoop和Spark的Shuffle過程有什么不同這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI