溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

MAPREDUCE原理篇(2)

發(fā)布時(shí)間:2020-06-28 10:04:22 來源:網(wǎng)絡(luò) 閱讀:788 作者:yushiwh 欄目:大數(shù)據(jù)

3.1 mapreduceshuffle機(jī)制

3.1.1 概述:

mapreduce中,map階段處理的數(shù)據(jù)如何傳遞給reduce階段,是mapreduce框架中最關(guān)鍵的一個(gè)流程,這個(gè)流程就叫shuffle;

shuffle: 洗牌、發(fā)牌——(核心機(jī)制:數(shù)據(jù)分區(qū),排序,緩存);

具體來說:就是將maptask輸出的處理結(jié)果數(shù)據(jù),分發(fā)給reducetask,并在分發(fā)的過程中,對(duì)數(shù)據(jù)按key進(jìn)行了分區(qū)和排序;

 

3.1.2 主要流程:

Shuffle緩存流程:

MAPREDUCE原理篇(2) 

shuffleMR處理流程中的一個(gè)過程,它的每一個(gè)處理步驟是分散在各個(gè)map taskreduce task節(jié)點(diǎn)上完成的,整體來看,分為3個(gè)操作:

1、分區(qū)partition

2、Sort根據(jù)key排序

3、Combiner進(jìn)行局部value的合并

 

3.1.3 詳細(xì)流程

1、 maptask收集我們的map()方法輸出的kv對(duì),放到內(nèi)存緩沖區(qū)中

2、 從內(nèi)存緩沖區(qū)不斷溢出本地磁盤文件,可能會(huì)溢出多個(gè)文件

3、 多個(gè)溢出文件會(huì)被合并成大的溢出文件

4、 在溢出過程中,及合并的過程中,都要調(diào)用partitoner進(jìn)行分組和針對(duì)key進(jìn)行排序

5、 reducetask根據(jù)自己的分區(qū)號(hào),去各個(gè)maptask機(jī)器上相應(yīng)的結(jié)果分區(qū)數(shù)據(jù)

6、 reducetask會(huì)取到同一個(gè)分區(qū)的來自不同maptask的結(jié)果文件,reducetask會(huì)將這些文件再進(jìn)行合并(歸并排序)

7、 合并成大文件后,shuffle的過程也就結(jié)束了,后面進(jìn)入reducetask的邏輯運(yùn)算過程(從文件中取出一個(gè)一個(gè)的鍵值對(duì)group,調(diào)用用戶自定義的reduce()方法)

 

Shuffle中的緩沖區(qū)大小會(huì)影響到mapreduce程序的執(zhí)行效率,原則上說,緩沖區(qū)越大,磁盤io的次數(shù)越少,執(zhí)行速度就越快

緩沖區(qū)的大小可以通過參數(shù)調(diào)整,  參數(shù):io.sort.mb  默認(rèn)100M

 

 

 

3.1.4 詳細(xì)流程示意圖

MAPREDUCE原理篇(2) 


3.2. MAPREDUCE中的序列化

3.2.1 概述

Java的序列化是一個(gè)重量級(jí)序列化框架(Serializable),一個(gè)對(duì)象被序列化后,會(huì)附帶很多額外的信息(各種校驗(yàn)信息,header,繼承體系。。。。),不便于在網(wǎng)絡(luò)中高效傳輸;

所以,hadoop自己開發(fā)了一套序列化機(jī)制(Writable),精簡(jiǎn),高效

 

3.2.2 Jdk序列化和MR序列化之間的比較

簡(jiǎn)單代碼驗(yàn)證兩種序列化機(jī)制的差別:

public class TestSeri {

public static void main(String[] args) throws Exception {

//定義兩個(gè)ByteArrayOutputStream,用來接收不同序列化機(jī)制的序列化結(jié)果

ByteArrayOutputStream ba = new ByteArrayOutputStream();

ByteArrayOutputStream ba2 = new ByteArrayOutputStream();

 

//定義兩個(gè)DataOutputStream,用于將普通對(duì)象進(jìn)行jdk標(biāo)準(zhǔn)序列化

DataOutputStream dout = new DataOutputStream(ba);

DataOutputStream dout2 = new DataOutputStream(ba2);

ObjectOutputStream obout = new ObjectOutputStream(dout2);

//定義兩個(gè)bean,作為序列化的源對(duì)象

ItemBeanSer itemBeanSer = new ItemBeanSer(1000L, 89.9f);

ItemBean itemBean = new ItemBean(1000L, 89.9f);

 

//用于比較String類型和Text類型的序列化差別

Text atext = new Text("a");

// atext.write(dout);

itemBean.write(dout);

 

byte[] byteArray = ba.toByteArray();

 

//比較序列化結(jié)果

System.out.println(byteArray.length);

for (byte b : byteArray) {

 

System.out.print(b);

System.out.print(":");

}

 

System.out.println("-----------------------");

 

String astr = "a";

// dout2.writeUTF(astr);

obout.writeObject(itemBeanSer);

 

byte[] byteArray2 = ba2.toByteArray();

System.out.println(byteArray2.length);

for (byte b : byteArray2) {

System.out.print(b);

System.out.print(":");

}

}

}

 

 

 

3.2.3 自定義對(duì)象實(shí)現(xiàn)MR中的序列化接口

如果需要將自定義的bean放在key中傳輸,則還需要實(shí)現(xiàn)comparable接口,因?yàn)?/span>mapreduce框中的shuffle過程一定會(huì)對(duì)key進(jìn)行排序,此時(shí),自定義的bean實(shí)現(xiàn)的接口應(yīng)該是:

public  class  FlowBean  implements  WritableComparable<FlowBean>

需要自己實(shí)現(xiàn)的方法是:

/**

 * 反序列化的方法,反序列化時(shí),從流中讀取到的各個(gè)字段的順序應(yīng)該與序列化時(shí)寫出去的順序保持一致

 */

@Override

public void readFields(DataInput in) throws IOException {


upflow = in.readLong();

dflow = in.readLong();

sumflow = in.readLong();


 

}

 

/**

 * 序列化的方法

 */

@Override

public void write(DataOutput out) throws IOException {

 

out.writeLong(upflow);

out.writeLong(dflow);

//可以考慮不序列化總流量,因?yàn)榭偭髁渴强梢酝ㄟ^上行流量和下行流量計(jì)算出來的

out.writeLong(sumflow);

 

}


@Override

public int compareTo(FlowBean o) {


//實(shí)現(xiàn)按照sumflow的大小倒序排序

return sumflow>o.getSumflow()?-1:1;

}

 

 

 

3.3. MapReduceYARN

3.3.1 YARN概述

Yarn是一個(gè)資源調(diào)度平臺(tái),負(fù)責(zé)為運(yùn)算程序提供服務(wù)器運(yùn)算資源,相當(dāng)于一個(gè)分布式的操作系統(tǒng)平臺(tái),而mapreduce等運(yùn)算程序則相當(dāng)于運(yùn)行于操作系統(tǒng)之上的應(yīng)用程序

3.3.2 YARN的重要概念

1、 yarn并不清楚用戶提交的程序的運(yùn)行機(jī)制

2、 yarn只提供運(yùn)算資源的調(diào)度(用戶程序向yarn申請(qǐng)資源,yarn就負(fù)責(zé)分配資源)

3、 yarn中的主管角色叫ResourceManager

4、 yarn中具體提供運(yùn)算資源的角色叫NodeManager

5、 這樣一來,yarn其實(shí)就與運(yùn)行的用戶程序完全解耦,就意味著yarn上可以運(yùn)行各種類型的分布式運(yùn)算程序(mapreduce只是其中的一種),比如mapreducestorm程序,spark程序,tez ……

6、 所以,spark、storm等運(yùn)算框架都可以整合在yarn上運(yùn)行,只要他們各自的框架中有符合yarn規(guī)范的資源請(qǐng)求機(jī)制即可

7、 Yarn就成為一個(gè)通用的資源調(diào)度平臺(tái),從此,企業(yè)中以前存在的各種運(yùn)算集群都可以整合在一個(gè)物理集群上,提高資源利用率,方便數(shù)據(jù)共享

 

3.3.3 Yarn中運(yùn)行運(yùn)算程序的示例

mapreduce程序的調(diào)度過程,如下圖

 

MAPREDUCE原理篇(2) 



向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI