您好,登錄后才能下訂單哦!
v mapreduce中,map階段處理的數(shù)據(jù)如何傳遞給reduce階段,是mapreduce框架中最關(guān)鍵的一個(gè)流程,這個(gè)流程就叫shuffle;
v shuffle: 洗牌、發(fā)牌——(核心機(jī)制:數(shù)據(jù)分區(qū),排序,緩存);
v 具體來說:就是將maptask輸出的處理結(jié)果數(shù)據(jù),分發(fā)給reducetask,并在分發(fā)的過程中,對(duì)數(shù)據(jù)按key進(jìn)行了分區(qū)和排序;
Shuffle緩存流程:
shuffle是MR處理流程中的一個(gè)過程,它的每一個(gè)處理步驟是分散在各個(gè)map task和reduce task節(jié)點(diǎn)上完成的,整體來看,分為3個(gè)操作:
1、分區(qū)partition
2、Sort根據(jù)key排序
3、Combiner進(jìn)行局部value的合并
1、 maptask收集我們的map()方法輸出的kv對(duì),放到內(nèi)存緩沖區(qū)中
2、 從內(nèi)存緩沖區(qū)不斷溢出本地磁盤文件,可能會(huì)溢出多個(gè)文件
3、 多個(gè)溢出文件會(huì)被合并成大的溢出文件
4、 在溢出過程中,及合并的過程中,都要調(diào)用partitoner進(jìn)行分組和針對(duì)key進(jìn)行排序
5、 reducetask根據(jù)自己的分區(qū)號(hào),去各個(gè)maptask機(jī)器上取相應(yīng)的結(jié)果分區(qū)數(shù)據(jù)
6、 reducetask會(huì)取到同一個(gè)分區(qū)的來自不同maptask的結(jié)果文件,reducetask會(huì)將這些文件再進(jìn)行合并(歸并排序)
7、 合并成大文件后,shuffle的過程也就結(jié)束了,后面進(jìn)入reducetask的邏輯運(yùn)算過程(從文件中取出一個(gè)一個(gè)的鍵值對(duì)group,調(diào)用用戶自定義的reduce()方法)
Shuffle中的緩沖區(qū)大小會(huì)影響到mapreduce程序的執(zhí)行效率,原則上說,緩沖區(qū)越大,磁盤io的次數(shù)越少,執(zhí)行速度就越快
緩沖區(qū)的大小可以通過參數(shù)調(diào)整, 參數(shù):io.sort.mb 默認(rèn)100M
Java的序列化是一個(gè)重量級(jí)序列化框架(Serializable),一個(gè)對(duì)象被序列化后,會(huì)附帶很多額外的信息(各種校驗(yàn)信息,header,繼承體系。。。。),不便于在網(wǎng)絡(luò)中高效傳輸;
所以,hadoop自己開發(fā)了一套序列化機(jī)制(Writable),精簡(jiǎn),高效
簡(jiǎn)單代碼驗(yàn)證兩種序列化機(jī)制的差別:
public class TestSeri { public static void main(String[] args) throws Exception { //定義兩個(gè)ByteArrayOutputStream,用來接收不同序列化機(jī)制的序列化結(jié)果 ByteArrayOutputStream ba = new ByteArrayOutputStream(); ByteArrayOutputStream ba2 = new ByteArrayOutputStream();
//定義兩個(gè)DataOutputStream,用于將普通對(duì)象進(jìn)行jdk標(biāo)準(zhǔn)序列化 DataOutputStream dout = new DataOutputStream(ba); DataOutputStream dout2 = new DataOutputStream(ba2); ObjectOutputStream obout = new ObjectOutputStream(dout2); //定義兩個(gè)bean,作為序列化的源對(duì)象 ItemBeanSer itemBeanSer = new ItemBeanSer(1000L, 89.9f); ItemBean itemBean = new ItemBean(1000L, 89.9f);
//用于比較String類型和Text類型的序列化差別 Text atext = new Text("a"); // atext.write(dout); itemBean.write(dout);
byte[] byteArray = ba.toByteArray();
//比較序列化結(jié)果 System.out.println(byteArray.length); for (byte b : byteArray) {
System.out.print(b); System.out.print(":"); }
System.out.println("-----------------------");
String astr = "a"; // dout2.writeUTF(astr); obout.writeObject(itemBeanSer);
byte[] byteArray2 = ba2.toByteArray(); System.out.println(byteArray2.length); for (byte b : byteArray2) { System.out.print(b); System.out.print(":"); } } } |
如果需要將自定義的bean放在key中傳輸,則還需要實(shí)現(xiàn)comparable接口,因?yàn)?/span>mapreduce框中的shuffle過程一定會(huì)對(duì)key進(jìn)行排序,此時(shí),自定義的bean實(shí)現(xiàn)的接口應(yīng)該是:
public class FlowBean implements WritableComparable<FlowBean>
需要自己實(shí)現(xiàn)的方法是:
/** * 反序列化的方法,反序列化時(shí),從流中讀取到的各個(gè)字段的順序應(yīng)該與序列化時(shí)寫出去的順序保持一致 */ @Override public void readFields(DataInput in) throws IOException { upflow = in.readLong(); dflow = in.readLong(); sumflow = in.readLong();
}
/** * 序列化的方法 */ @Override public void write(DataOutput out) throws IOException {
out.writeLong(upflow); out.writeLong(dflow); //可以考慮不序列化總流量,因?yàn)榭偭髁渴强梢酝ㄟ^上行流量和下行流量計(jì)算出來的 out.writeLong(sumflow);
} @Override public int compareTo(FlowBean o) { //實(shí)現(xiàn)按照sumflow的大小倒序排序 return sumflow>o.getSumflow()?-1:1; } |
Yarn是一個(gè)資源調(diào)度平臺(tái),負(fù)責(zé)為運(yùn)算程序提供服務(wù)器運(yùn)算資源,相當(dāng)于一個(gè)分布式的操作系統(tǒng)平臺(tái),而mapreduce等運(yùn)算程序則相當(dāng)于運(yùn)行于操作系統(tǒng)之上的應(yīng)用程序
1、 yarn并不清楚用戶提交的程序的運(yùn)行機(jī)制
2、 yarn只提供運(yùn)算資源的調(diào)度(用戶程序向yarn申請(qǐng)資源,yarn就負(fù)責(zé)分配資源)
3、 yarn中的主管角色叫ResourceManager
4、 yarn中具體提供運(yùn)算資源的角色叫NodeManager
5、 這樣一來,yarn其實(shí)就與運(yùn)行的用戶程序完全解耦,就意味著yarn上可以運(yùn)行各種類型的分布式運(yùn)算程序(mapreduce只是其中的一種),比如mapreduce、storm程序,spark程序,tez ……
6、 所以,spark、storm等運(yùn)算框架都可以整合在yarn上運(yùn)行,只要他們各自的框架中有符合yarn規(guī)范的資源請(qǐng)求機(jī)制即可
7、 Yarn就成為一個(gè)通用的資源調(diào)度平臺(tái),從此,企業(yè)中以前存在的各種運(yùn)算集群都可以整合在一個(gè)物理集群上,提高資源利用率,方便數(shù)據(jù)共享
mapreduce程序的調(diào)度過程,如下圖
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。