溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

MapReduce階段源碼分析以及shuffle過程詳解

發(fā)布時間:2020-06-13 17:23:39 來源:網(wǎng)絡(luò) 閱讀:2301 作者:原生zzy 欄目:大數(shù)據(jù)

MapReducer工作流程圖:
MapReduce階段源碼分析以及shuffle過程詳解

1. MapReduce階段源碼分析

1)客戶端提交源碼分析

MapReduce階段源碼分析以及shuffle過程詳解
解釋
   - 判斷是否打印日志
   - 判斷是否使用新的API,檢查連接
   - 在檢查連接時,檢查輸入輸出路徑,計算切片,將jar、配置文件復(fù)制到HDFS
   - 計算切片時,計算最小切片數(shù)(默認(rèn)為1,可自定義)和最大切片數(shù)(默認(rèn)是long的最大值,可以自定義)
   - 查看給定的是否是文件,如果是否目錄計算目錄下所有文件的切片
   - 通過block大小和最小切片數(shù)、最大切片數(shù)計算出切片大小
   - 過切片大小,計算出map的數(shù)量以及分發(fā)到的節(jié)點(diǎn)
   - 提交job給yarn,進(jìn)行MapReduce計算

2)map階段源碼分析源碼分析(Map 的input階段)

MapReduce階段源碼分析以及shuffle過程詳解
解釋
   - 首先Map Task任務(wù),調(diào)用run()方法,run()方法會經(jīng)過以下幾個階段
   - 初始化taskcontext對象
   - 對mapper對象的初始化,此處包括一個默認(rèn)值的判斷,如果沒有自定義mapper類,默認(rèn)用系統(tǒng)的Mapper
   - 對文件輸入的格式化,此處包括一個默認(rèn)值的判斷,如果沒有自定義inputFormat類,默認(rèn)用系統(tǒng)的TextinputFormat
   - 創(chuàng)建input對象,創(chuàng)建具體的文件讀取類,通過lineReader(),默認(rèn)每次迭代讀取一行,此處實(shí)現(xiàn)一個迭代的判斷的nextKeyVaule(),并在nextKeyVaule實(shí)現(xiàn)時初始化key和value
   - Input初始化:計算打開位置,讀取文件內(nèi)容,(放棄第一行)
   - 調(diào)用mapper的run方法循環(huán)讀取,直到末尾,多讀一行,start放棄第一行的數(shù)據(jù)被上一個切片讀到,注意這里的run方法中就會調(diào)用我們編寫的Mapper類中的setup、map、cleanup方法

3)map階段源碼分析源碼分析(Map 的output階段)

MapReduce階段源碼分析以及shuffle過程詳解
解釋
   - 由newOutCollector創(chuàng)建output對象
   - newOutCollector中需要準(zhǔn)備collector和partitions計算reduce數(shù)量,會將map端輸出的K,V,P(分區(qū)號)寫入collector中
   - 在準(zhǔn)備collector實(shí)際上是準(zhǔn)備MpaOutputBuffer,這是一特別復(fù)雜的過程,這里向大致的解釋一下,就是先將收集的KV,P寫入一個環(huán)形的緩沖區(qū),然后在經(jīng)過排序和分區(qū)將數(shù)據(jù)寫入到文件中。(具體過程會在下面的shuffle中講解)
   - 最后mapOut結(jié)束之后,會調(diào)用close方法關(guān)閉output,在關(guān)閉時,會將剩余在buffer環(huán)的數(shù)據(jù)緩沖出去,并且將所有一些的小文件進(jìn)行排序然后合并成一個大文件。

2. shuffle過程詳解

MapReduce階段源碼分析以及shuffle過程詳解
過程介紹

  • 假如在hdfs中存儲一個300M文件,每個block的大小默認(rèn)為128M,而且默認(rèn)的切片大小也是128M,因此,每一個MapTask任務(wù)會處理一個split,則是有三個MapTask并行處理。
  • 每一個MapTask任務(wù)處理完成后,會通過收集器,將輸出的結(jié)果存入一個環(huán)形緩沖區(qū)中,寫入的過程會經(jīng)過簡單的排序,這個環(huán)形緩沖區(qū)的默認(rèn)是100M,當(dāng)環(huán)形緩沖區(qū)的大小使用超過80%,一個后臺線程就會啟動把環(huán)形緩沖區(qū)中的數(shù)據(jù)寫入到磁盤文件,同時Map會繼續(xù)向環(huán)形緩沖區(qū)中寫入數(shù)據(jù)。
  • 環(huán)形緩沖去的工作原理:
    • 環(huán)形緩沖區(qū)的大小默認(rèn)為100M(可以配置mapred-site.xml:mapreduce.task.io.sort.mb)
    • 環(huán)形緩沖區(qū)的閾值為:80%((mapred-site.xml:mapreduce.map.sort.spill.percent,默認(rèn)80%)
    • 在環(huán)形緩沖區(qū)中,存儲了兩種數(shù)據(jù),一個是元數(shù)據(jù):分區(qū)號,map的key的起始位置,map的value的起始位置,map的value的長度(每一個元數(shù)據(jù)長度為4個int長度,長度固定)
    • 一種是原始數(shù)據(jù):存放map的key和value
    • 在存儲原始數(shù)據(jù)和元數(shù)據(jù)的時候,會將元數(shù)據(jù)和原始數(shù)據(jù)中間建立一個赤道,分割二者,然后不斷的向兩端寫入數(shù)據(jù),在環(huán)形緩沖區(qū)的數(shù)據(jù)寫入到80%的時候,將這些數(shù)據(jù)鎖定,然后向硬盤中溢寫成小文件,同時環(huán)形緩沖區(qū)的剩下的部分仍然可以寫數(shù)據(jù),直到溢寫結(jié)束,鎖定釋放,繼續(xù)可以將元數(shù)據(jù)和原始數(shù)據(jù)寫入緩沖區(qū)中。
  • 緩沖區(qū)溢寫小文件:在溢寫小文件的時候,會對緩沖區(qū)中的元數(shù)據(jù)根據(jù)分區(qū)號和key進(jìn)行排序,然后根據(jù)排序好的元數(shù)據(jù),溢寫相應(yīng)的原始數(shù)據(jù)(這是因?yàn)樵獢?shù)據(jù)的大小是固定的,比直接排序原始數(shù)據(jù)更容易),這樣最后就會溢寫出多個已經(jīng)根據(jù)分區(qū)和key排序好的小文件(這里可以加入conbiner)
  • 對溢寫后的小文件進(jìn)行歸并:此時會將溢寫后的小文件進(jìn)行歸并成一個大文件(使用歸并排序),此時合并的大文件已經(jīng)按照分區(qū)和key排好序,
  • reduce拉取相應(yīng)的數(shù)據(jù):Reducer 中的一個線程定期向MRAppMaster詢問Mapper輸出結(jié)果文件位置,mapper結(jié)束后會向MRAppMaster匯報信息,從而 Reducer 得知 Mapper 狀態(tài),得到 map 結(jié)果文件目錄;reduce會相應(yīng)的拉取相同分區(qū)的小文件到本地
  • 然后會將拉取得到的相應(yīng)的相同分區(qū)的小文件,進(jìn)行歸并排序合并成為一個有序的大文件(相同的key在一起)。
  • 然后根據(jù)分組規(guī)則,相同的key為一組調(diào)用一次reduce方法,處理數(shù)據(jù)
  • 最終將結(jié)果數(shù)據(jù)根據(jù)分區(qū)寫入到不同的分區(qū)文件中。
向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI