您好,登錄后才能下訂單哦!
今天小編給大家分享一下hadoop之MapReduce框架原理是什么的相關(guān)知識點,內(nèi)容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。
MapReduce是分為兩個階段的,MapperTask階段,和ReduceTask階段。(中間有一個Shuffle階段)
Mapper階段,可以通過選擇什么方式(K,V的選擇對應(yīng)不同的方法)來讀取數(shù)據(jù),讀取后把數(shù)據(jù)交給Mapper來進行后續(xù)的業(yè)務(wù)邏輯(用戶寫),讓后進入Reduce階段通過Shuffle來拉取Mapper階段的數(shù)據(jù),讓后通過OutputFormat(等方法)來寫出(可以是ES,mysql,hbase,文件)
MapTask個數(shù),決定了并行度(相當于在生成map集合的過程中有幾個人在干活),**(不一定越多越好,當數(shù)據(jù)量小的時候可能開啟的眾多MapTask的時間用一個MapTask已經(jīng)計算完成)
數(shù)據(jù)塊:Block是HDFS物理上把數(shù)據(jù)分成一塊一塊。數(shù)據(jù)塊是HDFS存儲數(shù)據(jù)單位。
數(shù)據(jù)切片:數(shù)據(jù)切片只是在邏輯上對輸入進行分片,并不會在磁盤上將其切分成片進行存儲。數(shù)據(jù)切片是MapReduce程序計算輸入數(shù)據(jù)的單位,一個切片會對應(yīng)啟動一個MapTask。
因為我們找的job提交,所以在job提交函數(shù)哪里打個斷點,
步入函數(shù)后
ensureState(JobState.DEFINE); 是確保你的狀態(tài)是正確的(狀態(tài)不對或者running 都會拋異常)
setUseNewAPI(); 處理Hadoop不同版本之間的API兼容
connect(); 連接,(客戶端需要與集群或者本機連接)
checkSpecs(job); 校驗 校驗輸出路徑是否已經(jīng)創(chuàng)建,是否有參
return submitter.submitJobInternal(Job.this, cluster); 核心代碼 步入的時候需要點兩下,
第一個步入是步入的參數(shù)Job 第二個才步入此方法
這個方法是提交job(在集群模式下,提交的job包含(通過客戶端方式把jar包提交給集群),在本地不需要提交jar包,jar在本地是存在的)
還會進行切片,生成切片信息(幾個切片就有幾個MapTask)
還會 生成xml文件
綜上 job提交會交三樣東西(jar,xml文件,切片信息---》集群模式下)
最后會刪除所有的信息文件
**(切片是每一個文件單獨切片)
在本地是32m一塊,前邊說過,默認一塊對應(yīng)一個切片,但是有前提條件,再你減去32m的時候,余下最后一塊如果大于1.1倍就重新分配切片,但如果小于1.1,則不能更新分片
例子1:
已有一個32.1m的數(shù)據(jù) 物理分塊是(32m+0.1m)切片分布是(1個切片,因為32.1/32=1.003125<1.1 所以使用一個切片)
例子2:
已有一個100m的數(shù)據(jù)
100-32-32=36>32(36/32=1.125>1.1 所以最后36m需要分配兩個切片)
**塊的大小沒辦法改變,但是可以調(diào)切片大小(maxSize讓切片調(diào)?。╩inSize讓切片調(diào)大)
切片總結(jié):
(開一個MapTask 默認是占1g內(nèi)存+1個cpu)
思考:在運行MapReduce程序時,輸入的文件格式包括:基于行的日志文件、二進制格式文件、數(shù)據(jù)庫表等。那么,針對不同的數(shù)據(jù)類型,MapReduce是如何讀取這些數(shù)據(jù)的呢?
FileInputFormat常見的接口實現(xiàn)類包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定義InputFormat等。(應(yīng)用場景的不同選擇不同的接口實現(xiàn)類)
TextInputFormat是默認的FileInputFormat實現(xiàn)類。按行讀取每條記錄。鍵是存儲該行在整個文件中的起始字節(jié)偏移量, LongWritable類型。值是這行的內(nèi)容,不包括任何行終止符(換行符和回車符),Text類型。
CombineTextInputFormat用于小文件過多的場景,它可以將多個小文件從邏輯上規(guī)劃到一個切片中,這樣,多個小文件就可以交給一個MapTask處理。
將輸入目錄下所有文件大小,依次和設(shè)置的setMaxInputSplitSize(切片大?。┲当容^,如果不大于設(shè)置的最大值,邏輯上劃分一個塊。如果輸入文件大于設(shè)置的最大值且大于兩倍,那么以最大值切割一塊;當剩余數(shù)據(jù)大小超過設(shè)置的最大值且不大于最大值2倍,此時將文件均分成2個虛擬存儲塊(防止出現(xiàn)太小切片)。
測試:
再不使用CombineTextInputFormat情況下(默認TextInputFormat)
可以看到切片為4
添加代碼,設(shè)置實現(xiàn)類為CombineTextInputFormat 和 設(shè)置虛擬存儲切片大小
// 如果不設(shè)置InputFormat,它默認用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); //虛擬存儲切片最大值設(shè)置4m CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
可以看到,現(xiàn)在是3個切片
我們可以通過改變虛擬切片大小來改變調(diào)用的切片的數(shù)量
綜上:影響切片的數(shù)量的因素為:(1)數(shù)據(jù)量的大?。?)切片的大小(一般會自動調(diào)整)(3)文件格式(有些文件是不可切片的)
影響切片大小的因素: HDFS中塊的大?。ㄍㄟ^調(diào)maxsize,minsize與塊的大小進行比較來判斷)
shuffle階段是一個從mapper階段出來的后的階段,會寫入(k,v)一個環(huán)形緩沖區(qū)(緩沖區(qū)分為兩半,一半存儲索引,一半存儲數(shù)據(jù),默認100m,到達80%后會反向逆寫(減少時間消耗,提高效率,逆寫是因為不需要等待全部溢寫后在進行寫入操作)逆寫入文件前會進行分區(qū)(分區(qū)的個數(shù)與reduceTask的個數(shù)有關(guān))排序(對key進行排序,但是存儲位置并不發(fā)生改變,只改變索引的位置,改變存儲位置消耗資源較大))寫入文件后會進行歸并排序(在有序的情況下,歸并是最高效的))
排序可以自定義排序,舉例全排序:
自定義了一個Bean類,bean對象做為key傳輸,需要實現(xiàn)WritableComparable接口重寫compareTo方法,就可以實現(xiàn)排序。
并不滿足所有生產(chǎn)環(huán)境下,只有在不影響最終業(yè)務(wù)邏輯下才可以實現(xiàn)(求和就可以,算平均值就不可以)
combiner與reducetask區(qū)別如下:
(1)Copy階段:ReduceTask從各個MapTask上遠程拷貝一片數(shù)據(jù),并針對某一片數(shù)據(jù),如果其大小超過一定閾值,則寫到磁盤上,否則直接放到內(nèi)存中。
(2)Sort階段:在遠程拷貝數(shù)據(jù)的同時,ReduceTask啟動了兩個后臺線程對內(nèi)存和磁盤上的文件進行合并,以防止內(nèi)存使用過多或磁盤上文件過多。按照MapReduce語義,用戶編寫reduce()函數(shù)輸入數(shù)據(jù)是按key進行聚集的一組數(shù)據(jù)。為了將key相同的數(shù)據(jù)聚在一起,Hadoop采用了基于排序的策略。由于各個MapTask已經(jīng)實現(xiàn)對自己的處理結(jié)果進行了局部排序,因此,ReduceTask只需對所有數(shù)據(jù)進行一次歸并排序即可。
(3)Reduce階段:reduce()函數(shù)將計算結(jié)果寫到HDFS上。
ReduceTask的個數(shù)可以手動進行設(shè)置,設(shè)置幾就會產(chǎn)生幾個文件(分區(qū)同上)
簡述流程:
(1)自定義bean對象(序列化反序列化函數(shù)---implements Writable)
(2)寫mapper類 先重寫setup方法(因為本案例需要兩個文件,初始化(讀多個文 希望先獲取到文件名稱(多文件) 一個文件一個切片 setup方法是一個優(yōu)化手段 獲取文件名稱)
(3)寫reduce類(業(yè)務(wù)邏輯) 先創(chuàng)建一個集合(類型為bean類型)和bean對象用于存儲
用for循環(huán)遍歷value(key是一樣的 一樣的key才會進入同一個reduce方法)
獲取文件名判斷寫出不同的業(yè)務(wù)邏輯
"order"表:
先創(chuàng)建一個bean對象,用于存儲數(shù)據(jù),用于后續(xù)寫入集合
用到方法 BeanUtils.copyProperties(tmpOrderBean,value); 獲取原數(shù)據(jù)
讓后加入上述創(chuàng)建的集合 orderBeans.add(tmpOrderBean);
“pd”表:
BeanUtils.copyProperties(pdBean,value);直接獲取原數(shù)據(jù)
存儲結(jié)束,結(jié)合階段:
使用增強for
orderbean.setPname(pdBean.getPname());
使用set函數(shù)直接設(shè)置集合中的pname
讓后寫入
context.write(orderbean,NullWritable.get());
業(yè)務(wù)結(jié)束
Reduce Join的缺點:這種方式中,合并的操作是在Reduce階段完成,Reduce端的處理壓力太大,Map節(jié)點的運算負載則很低,資源利用率不高,且在Reduce階段極易產(chǎn)生數(shù)據(jù)傾斜。
使用場景
Map Join適用于一張表十分小、一張表很大的場景。
Map端實現(xiàn)數(shù)據(jù)合并就解決了Reduce Join的缺點(數(shù)據(jù)傾斜)
簡述流程:
在map類中
setup方法:將較小文件讀入緩存,將數(shù)據(jù)存儲到全局的map集合中,將緩存中的數(shù)據(jù)全部寫入
重寫的map方法中:
轉(zhuǎn)換成字符串在切割,通過切割后的數(shù)組獲取map集合中的pname
讓后重新設(shè)置輸出文件的格式進行寫出
以上就是“hadoop之MapReduce框架原理是什么”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關(guān)注億速云行業(yè)資訊頻道。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。