溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

3、MapReduce詳解與源碼剖析

發(fā)布時間:2020-02-28 15:05:08 來源:網(wǎng)絡 閱讀:355 作者:victor19901114 欄目:大數(shù)據(jù)

@[TOC]
3、MapReduce詳解與源碼剖析

1 Split階段

?????首先,接到hdf文件輸入,在mapreduce中的map task開始之前,將文件按照指定的大小切割成若干個部分,每一部分稱為一個split,默認是split的大小與block的大小相等,均為128MB。split大小由minSize、maxSize、blocksize決定,以wordcount代碼為例,以下是main()方法
3、MapReduce詳解與源碼剖析
進入waitForCompletion(true)方法,進入submit()方法
3、MapReduce詳解與源碼剖析
找到 return submitter .submitJobInternal(Job.this, cluster);
進入,找到 int maps = writeSplits(job, submitJobDir);
3、MapReduce詳解與源碼剖析
3、MapReduce詳解與源碼剖析
進入writeNewSplits()方法
3、MapReduce詳解與源碼剖析
?????進入writeNewSplits()方法,可以看出該方法首先獲取splits數(shù)組信息后,排序,將會優(yōu)先處理大文件。最終返回mapper數(shù)量。這其中又分為兩部分:確定切片數(shù)量 和 寫入切片信息。確定切片數(shù)量的任務交由FileInputFormat的getSplits(job)完成,寫入切片信息的任務交由JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array)方法,該方法會將切片信息和SplitMetaInfo都寫入HDFS中,return array.length;返回的是map任務數(shù),默認map的數(shù)量是: default_num = total_size / block_size;
?????實際的mapper數(shù)量就是輸入切片的數(shù)量,而切片的數(shù)量又由使用的輸入格式?jīng)Q定,默認為TextInputFormat,該類為FileInputFormat的子類。確定切片數(shù)量的任務交由FileInputFormat的getSplits(job)完成。FileInputFormat繼承自抽象類InputFormat,該類定義了MapReduce作業(yè)的輸入規(guī)范,其中的抽象方法List<InputSplit> getSplits(JobContext context)定義了如何將輸入分割為InputSplit,不同的輸入有不同的分隔邏輯,而分隔得到的每個InputSplit交由不同的mapper處理,因此該方法的返回值確定了mapper的數(shù)量。

2 Map階段

?????每個map task都有一個內(nèi)存緩沖區(qū), map的輸出結(jié)果先寫到內(nèi)存中的環(huán)形緩沖區(qū),緩沖區(qū)為100M,不斷的向緩沖區(qū)力寫數(shù)據(jù),當達到80M時,需要將緩沖區(qū)中的數(shù)據(jù)以一個臨時文件的方式存到磁盤,當整個map task結(jié)束后再對磁盤中這個map task所產(chǎn)生的所有臨時文件做合并,生成最終的輸出文件。最后,等待reduce task來拉取數(shù)據(jù)。當然,如果map task的結(jié)果不大,能夠完全存儲到內(nèi)存緩沖區(qū),且未達到內(nèi)存緩沖區(qū)的閥值,那么就不會有寫臨時文件到磁盤的操作,也不會有后面的合并。在寫入的過程中會進行分區(qū)、排序、combine操作。
?????環(huán)形緩沖區(qū):是使用指針機制把內(nèi)存中的地址首尾相接形成一個存儲中間數(shù)據(jù)的緩存區(qū)域,默認100MB;80M閾值,20M緩沖區(qū),是為了解決寫入環(huán)形緩沖區(qū)數(shù)據(jù)的速度大于寫出到spill文件的速度是數(shù)據(jù)的不丟失;Spill文件:spill文件是環(huán)形緩沖區(qū)到達閾值后寫入到磁盤的單個文件.這些文件在map階段計算結(jié)束時,會合成分好區(qū)的一個merge文件供給給reduce任務抓取;spill文件過小的時候,就不會浪費io資源合并merge;默認情況下3個以下spill文件不合并;對于在環(huán)形緩沖區(qū)中的數(shù)據(jù),最終達不到80m但是數(shù)據(jù)已經(jīng)計算完畢的情況,map任務將會調(diào)用flush將緩沖區(qū)中的數(shù)據(jù)強行寫出spill文件。

?????經(jīng)過map類處理后,輸出到內(nèi)存緩沖區(qū)(默認大小100M),超過一定大小后,文件溢寫到磁盤上,按照key分類
3、MapReduce詳解與源碼剖析
按照key合并成大文件,減少網(wǎng)絡開銷
3、MapReduce詳解與源碼剖析

2.1分區(qū)

看一下MapReduce自帶的分區(qū)器HashPartitioner
3、MapReduce詳解與源碼剖析
假設有聽個reduce任務,則分區(qū)的計算如下:
3、MapReduce詳解與源碼剖析

2.2排序

在對map結(jié)果進行分區(qū)之后,對于落在相同的分區(qū)中的鍵值對,要進行排序。

3 Shuffle階段

?????Shuffle過程是MapReduce的核心,描述著數(shù)據(jù)從map task輸出到reduce task輸入的這段過程。reducetask根據(jù)自己的分區(qū)號,去各個maptask分區(qū)機器上取相應的結(jié)果分區(qū)數(shù)據(jù),reducetask會將這些文件再進行合并(歸并排序)。
?????所有相同key的數(shù)據(jù)匯集到一個partition
3、MapReduce詳解與源碼剖析
?????將相同的key value匯聚到一起, 但不計算
3、MapReduce詳解與源碼剖析

4 Reduce階段

reduce階段分三個步驟:
抓取,合并,排序
?????1 reduce 任務會創(chuàng)建并行的抓取線程(fetcher)負責從完成的map任務中獲取結(jié)果文件,是否完成是通過rpc心跳監(jiān)聽,通過http協(xié)議抓取;默認是5個抓取線程,可調(diào),為了是整體并行,在map任務量大,分區(qū)多的時候,抓取線程調(diào)大;
?????2 抓取過來的數(shù)據(jù)會先保存在內(nèi)存中,如果內(nèi)存過大也溢出,不可見,不可調(diào),但是單位是每個merge文件,不會切分數(shù)據(jù);每個merge文件都會被封裝成一個segment的對象,這個對象控制著這個merge文件的讀取記錄操作,有兩種情況出現(xiàn):在內(nèi)存中有merge數(shù)據(jù) ?在溢寫之后存到磁盤上的數(shù)據(jù) ?通過構(gòu)造函數(shù)的區(qū)分,來分別創(chuàng)建對應的segment對象
?????3 這種segment對象會放到一個內(nèi)存隊列中MergerQueue,對內(nèi)存和磁盤上的數(shù)據(jù)分別進行合并,內(nèi)存中的merge對應的segment直接合并,磁盤中的合并與一個叫做合并因子的factor有關(guān)(默認是10)
?????4 排序問題,MergerQueue繼承輪換排序的接口,每一個segment 是排好序的,而且按照key的值大小邏輯(和真的大小沒關(guān)系);每一個segment的第一個key都是邏輯最小,而所有的segment的排序是按照第一個key大小排序的,最小的在前面,這種邏輯總能保證第一個segment的第一個key值是所有key的邏輯最小文件合并之后,最終交給reduce函數(shù)計算的,是MergeQueue隊列,每次計算的提取數(shù)據(jù)邏輯都是提取第一個segment的第一個key和value數(shù)據(jù),一旦segment被調(diào)用了提取key的方法,MergeQueue隊列將會整體重新按照最小key對segment排序,最終形成整體有序的計算結(jié)果;
3、MapReduce詳解與源碼剖析
3、MapReduce詳解與源碼剖析
partition 、Reduce、輸出文件數(shù)量相等
3、MapReduce詳解與源碼剖析
Reduce任務數(shù)量
在大數(shù)據(jù)量的情況下,如果只設置1個Reduce任務,其他節(jié)點將被閑置,效率底下 所以將Reduce設置成一個較大的值(max:72).調(diào)節(jié)Reduce任務數(shù)量的方法 一個節(jié)點的Reduce任務數(shù)并不像Map任務數(shù)那樣受多個因素制約

通過參數(shù)調(diào)節(jié)mapred.reduce.tasks(在配置文件中)
在代碼中調(diào)用job.setNumReduceTasks(int n)方法(在code中)
向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI