溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Hadoop和spark為何要對key進行排序

發(fā)布時間:2021-09-18 14:50:53 來源:億速云 閱讀:103 作者:chen 欄目:大數(shù)據(jù)

本篇內(nèi)容介紹了“Hadoop和spark為何要對key進行排序”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

1.思考

只要對hadoopmapreduce的原理清楚的都熟知下面的整個流程運行原理,其中涉及到至少三次排序,分別是溢寫快速排序,溢寫歸并排序,reduce拉取歸并排序,而且排序是默認的,即天然排序的,那么為什么要這么做的,設(shè)計原因是什么。先給個結(jié)論,為了整體更穩(wěn)定,輸出滿足多數(shù)需求,前者體現(xiàn)在不是采用hashShuffle而是sortShuffle ,后者體現(xiàn)在預(yù)計算,要知道排序后的數(shù)據(jù),在后續(xù)數(shù)據(jù)使用時的會方便很多,比如體現(xiàn)索引的地方,如reduce拉取數(shù)據(jù)時候。

Hadoop和spark為何要對key進行排序

2.MapReduce原理分析

在分析設(shè)計原因之前,先理解一下整個過程,在map階段,根據(jù)預(yù)先定義的partition規(guī)則進行分區(qū),map首先將輸出寫到緩存中,當緩存內(nèi)容達到閾值時,將結(jié)果spill到硬盤,每一次spill都會在硬盤產(chǎn)生一個spill文件,因此一個map task可能會產(chǎn)生多個spill文件,其中在每次spill的時候會對key進行排序。接下來進入shuffle階段,當map寫出最后一個輸出,需要在map端進行一次merge操作,按照partitionpartition內(nèi)的key進行歸并排序(合并+排序),此時每個partition內(nèi)按照key值整體有序。然后開始第二次merge,這次是在reduce端,在此期間數(shù)據(jù)在內(nèi)存和磁盤上都有,其實這個階段的merge并不是嚴格意義上的排序,也是跟前面類似的合并+排序,只是將多個整體有序的文件merge成一個大的文件,最終完成排序工作。分析完整個過程后,是不是覺得如果自己實現(xiàn)MapReduce框架的話,考慮用HashMap 輸出map內(nèi)容即可。

2.1 MapTask運行機制詳解

整個流程圖如下:

Hadoop和spark為何要對key進行排序

詳細步驟:

  1. 首先,讀取數(shù)據(jù)組件InputFormat(默認TextInputFormat)會通過getSplits方法對輸?入?目錄中文件進行邏輯切?片規(guī)劃得到splits,有多少個split就對應(yīng)啟動多少個MapTask。splitblock的對應(yīng)關(guān)系默認是?對?。

  2. 將輸入文件切分為splits之后,由RecordReader對象(默認LineRecordReader)進行讀取,以\n作為分隔符,讀取?行數(shù)據(jù),返回<key,value>Key表示每?行行?首字符偏移值,value表示這?行文本內(nèi)容。

  3. 讀取split返回<key,value>,進?入?用戶自己繼承的Mapper類中,執(zhí)行用戶重寫的map函數(shù)。RecordReader讀取?行這里調(diào)用一次。

  4. map邏輯完之后,將map的每條結(jié)果通過context.write進?行行collect數(shù)據(jù)收集。在collect中,會先對其進行分區(qū)處理,默認使用HashPartitioner。MapReduce提供Partitioner接口,它的作用就是根據(jù)keyvaluereduce的數(shù)量來決定當前的這對輸出數(shù)據(jù)最終應(yīng)該交由哪個reduce task處理。默認對key hash后再以reduce task數(shù)量量取模。默認的取模方式只是為了平均reduce的處理能力,如果用戶自己對Partitioner有需求,可以訂制并設(shè)置到job上。

  5. 接下來,會將數(shù)據(jù)寫入內(nèi)存,內(nèi)存中這?片區(qū)域叫做環(huán)形緩沖區(qū),緩沖區(qū)的作用是批量量收集map結(jié)果,減少磁盤IO的影響。我們的key/value對以及Partition的結(jié)果都會被寫?入緩沖區(qū)。當然寫?入之前,keyvalue值都會被序列列化成字節(jié)數(shù)組

    • 環(huán)形緩沖區(qū)其實是一個數(shù)組,數(shù)組中存放著key、value的序列化數(shù)據(jù)和keyvalue的元數(shù)據(jù)信息,包括partition、key的起始位置、value的起始位置以及value的長度。環(huán)形結(jié)構(gòu)是一個抽象概念。

    • 緩沖區(qū)是有大小限制,默認是100MB。當map task的輸出結(jié)果很多時,就可能會撐爆內(nèi)存,所以需要在一定條件下將緩沖區(qū)中的數(shù)據(jù)臨時寫?入磁盤,然后重新利利?用這塊緩沖區(qū)。這個從內(nèi)存往磁盤寫數(shù)據(jù)的過程被稱為Spill,中文可譯為溢寫。這個溢寫是由單獨線程來完成,不影響往緩沖區(qū)寫map結(jié)果的線程。溢寫線程啟動時不不應(yīng)該阻?map的結(jié)果輸出,所以整個緩沖區(qū)有個溢寫的?比例例spill.percent。這個?比例例默認是0.8,也就是當緩沖區(qū)的數(shù)據(jù)已經(jīng)達到閾值(buffer size * spillpercent = 100MB * 0.8 = 80MB),溢寫線程啟動,鎖定這80MB的內(nèi)存,執(zhí)行溢寫過程Maptask的輸出結(jié)果還可以往剩下的20MB內(nèi)存中寫,互不不影響、

  6. 當溢寫線程啟動后,需要對這80MB空間內(nèi)的key做排序(Sort)。排序是MapReduce模型默認的?行行為!

    • 如果job設(shè)置過Combiner,那么現(xiàn)在就是使?用Combiner的時候了了。將有相同keykey/value對的value加起來,減少溢寫到磁盤的數(shù)據(jù)量量。Combiner會優(yōu)化MapReduce的中間結(jié)果,所以它在整個模型中會多次使用。

    • 那哪些場景才能使?用Combiner呢?從這?里里分析,Combiner的輸出是Reducer的輸?,Combiner絕不不能改變最終的計算結(jié)果。Combiner只應(yīng)該?用于那種Reduce的輸入key/value與輸出key/value類型完全一致,且不不影響最終結(jié)果的場景。?比如累加,最?大值等。Combiner的使?用一定得慎重如果用的好,它對job執(zhí)?行行效率有幫助,反之會影響reduce的最終結(jié)果

  7. 合并溢寫文件:每次溢寫會在磁盤上生成一個臨時文件(寫之前判斷是否有combiner),如果map的輸出結(jié)果真的很大,有多次這樣的溢寫發(fā)生,磁盤上相應(yīng)的就會有多個臨時文件存在。當整個數(shù)據(jù)處理理結(jié)束之后開始對磁盤中的臨時文件進?行行merge合并,因為最終文件只有一個,寫?磁盤,并且為這個文件提供了一個索文件,以記錄每個reduce對應(yīng)數(shù)據(jù)的偏移量量。

2.2 ReduceTask運行機制詳解

Hadoop和spark為何要對key進行排序

Reduce?大致分為copy、sort、reduce三個階段,重點在前兩個階段。copy階段包含?一個 eventFetcher來獲取已完成的map列列表,由Fetcher線程去copy數(shù)據(jù),在此過程中會啟動兩個merge線程,分別為inMemoryMergeronDiskMerger,分別將內(nèi)存中的數(shù)據(jù)merge到磁盤和將磁盤中的數(shù)據(jù)進?merge。待數(shù)據(jù)copy完成之后,copy階段就完成了,開始進?行行sort階段,sort階段主要是執(zhí)?finalMerge操作,純粹的sort階段,完成之后就是reduce階段,調(diào)?用?用戶定義的reduce函數(shù)進?處理。 詳細步驟

2.2.1 Copy階段

簡單地拉取數(shù)據(jù)。Reduce進程啟動一些數(shù)據(jù)copy線程(Fetcher),通過HTTP方式請求maptask獲取屬于自己的文件。

2.2.2 Merge階段

Merge階段。這?里里的mergemap端的merge動作,只是數(shù)組中存放的是不不同mapcopy來的數(shù)值。Copy過來的數(shù)據(jù)會先放入內(nèi)存緩沖區(qū)中,這?里里的緩沖區(qū)大小要?比map端的更更為靈活。merge有三種形式:內(nèi)存到內(nèi)存;內(nèi)存到磁盤;磁盤到磁盤。默認情況下第?一種形式不不啟?用。當內(nèi)存中的數(shù)據(jù)量量到達一定閾值,就啟動內(nèi)存到磁盤的merge。與map 端類似,這也是溢寫的過程,這個過程中如果你設(shè)置有Combiner,也是會啟?用的,然后在磁盤中生成了了眾多的溢寫文件。第二種merge方式?一直在運?行行,直到?jīng)]有map端的數(shù)據(jù)時才結(jié)束,然后啟動第三種磁盤到磁盤的merge方式生成最終的文件。

2.2.3 合并排序

把分散的數(shù)據(jù)合并成一個?大的數(shù)據(jù)后,還會再對合并后的數(shù)據(jù)排序。對排序后的鍵值對調(diào)?用reduce方法,鍵相等的鍵值對調(diào)?用一次reduce方法,每次調(diào)?用會產(chǎn)生零個或者多個鍵值對,最后把這些輸出的鍵值對寫入到HDFS文件中。

“Hadoop和spark為何要對key進行排序”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI