溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

MR編程模型及MR V1講解

發(fā)布時(shí)間:2020-06-06 14:37:36 來源:網(wǎng)絡(luò) 閱讀:498 作者:afeiye 欄目:大數(shù)據(jù)

MR編程模型

MR編程模型主要分為五個(gè)步驟:輸入、映射、分組、規(guī)約、輸出。

  1. 輸入(InputFormat):
    主要包含兩個(gè)步驟—數(shù)據(jù)分片、迭代輸入

    數(shù)據(jù)分片(getSplits):數(shù)據(jù)分為多少個(gè)splits,就有多少個(gè)map task; 
    單個(gè)split的大小,由設(shè)置的split.minsize和split.maxsize決定;
    公式為 max{minsize, min{maxsize, blocksize}}; 
    hadoop2.7.3之前blocksize默認(rèn)64M,之后默認(rèn)128M。
    
    決定了單個(gè)split大小之后,就是hosts選擇,一個(gè)split可能包含多個(gè)block(將minsize設(shè)置大于128M);
    而多個(gè)block可能分布在多個(gè)hosts節(jié)點(diǎn)上(一個(gè)block默認(rèn)3備份,如果4個(gè)block就可能在12個(gè)節(jié)點(diǎn)),getsplits會(huì)選擇包含數(shù)據(jù)最多的一部分hosts。
    由此可見,為了讓數(shù)據(jù)本地話更合理,最好是一個(gè)block一個(gè)task,也就是說split大小跟block大小一致。
    
    getSplits會(huì)產(chǎn)生兩個(gè)文件
    job.split:存儲(chǔ)的主要是每個(gè)分片對(duì)應(yīng)的HDFS文件路徑,和其在HDFS文件中的起始位置、長(zhǎng)度等信息(map task使用,獲取分片的具體位置);
    job.splitmetainfo:存儲(chǔ)的則是每個(gè)分片在分片數(shù)據(jù)文件job.split中的起始位置、分片大小和hosts等信息(主要是作業(yè)初始化時(shí)使用,用于map task的本地化)。
    
    迭代輸入:迭代輸入一條條的數(shù)據(jù),對(duì)于文本數(shù)據(jù)來說,key就是行號(hào)、value當(dāng)前行文本。
  2. 映射(map):正常的map操作,將一對(duì)kv映射成為另外一對(duì)kv
  3. 分組(partition):
    按照設(shè)置的reduce個(gè)數(shù)來進(jìn)行分組,getPartitions共三個(gè)參數(shù):k、v、partitionnum;
    默認(rèn)按照HashPartition,如果需要全排序,也可以設(shè)置TotalOrderPartitioner,它會(huì)采樣一部分?jǐn)?shù)據(jù)排序后設(shè)置R-1(R是reduce個(gè)數(shù))個(gè)分割點(diǎn),保證map task生成的R個(gè)文件的文件與文件之間的數(shù)據(jù)都是有序的,reduce只需要對(duì)單個(gè)文件內(nèi)部再排序即可。
  4. 規(guī)約(reduce):reduce做聚合處理。
  5. 輸出(OutputFormat):
    一件事情是檢查輸出目錄是否存在,如果存在則報(bào)錯(cuò);
    另一件事情是將數(shù)據(jù)輸出到臨時(shí)目錄。

作業(yè)提交及初始化

MR編程模型及MR V1講解

  1. 作業(yè)提交與初始化大概分為4個(gè)步驟:執(zhí)行提交、client上傳文件到hdfs、client與JobTracker通信提交任務(wù)、JobTracker通知TaskScheduler初始化任務(wù)。
  2. JobClient與JobTracker的通信過程如下兩所示MR編程模型及MR V1講解
  3. 作業(yè)提交時(shí)序圖MR編程模型及MR V1講解
    第一步:JobClient先跟JobTracker交互獲取到一個(gè)jobid;
    第二步:JobClient與HDFS交互創(chuàng)建輸出目錄;
    第三步:與HDFS交互上傳任務(wù)運(yùn)行所以來的文件(配置文件、jar包等)
    第四步:JobClient調(diào)用getSplits,與HDFS交互生成分片信息并寫到分片文件中;
    第五步:與jobtracker交互提交任務(wù)。
  4. JobTracker收到任務(wù)提交請(qǐng)求后會(huì)先生成一個(gè)JobInProgress對(duì)象,這個(gè)對(duì)象會(huì)管理和監(jiān)控這個(gè)job的整個(gè)運(yùn)行狀況;之后JobTracker再告訴TaskSchduler進(jìn)行作業(yè)初始化。
  5. 作業(yè)初始話大致過程如下MR編程模型及MR V1講解

JobTracker與TaskTracker

  1. JobTracker主要負(fù)責(zé)作業(yè)的運(yùn)行時(shí)管理,以三級(jí)樹的方式進(jìn)行管理:首先會(huì)給作業(yè)初始化一個(gè)對(duì)象JobInProgress,初始化后每個(gè)task有個(gè)TaskInProgress,每個(gè)task對(duì)應(yīng)多個(gè)TaskAtempt。其中一個(gè)TA成功則此TI成功,所有TI成功則此job成功MR編程模型及MR V1講解
    JobTracker將很多數(shù)據(jù)以KV形式存儲(chǔ)在map中,比如jobs存儲(chǔ)的是jobid和JobInProgress的映射;
    JobTracker通過接收TaskTracker的心跳請(qǐng)求,并發(fā)出應(yīng)答來監(jiān)控和管理作業(yè)運(yùn)行過程,在應(yīng)答中會(huì)下達(dá)各種命令:運(yùn)行新task、殺死task等等
  2. TaskTracker:在每臺(tái)機(jī)器上會(huì)啟動(dòng)一個(gè)TaskTracker進(jìn)程,不斷地向JobTracker發(fā)送心跳,匯報(bào)當(dāng)前節(jié)點(diǎn)的資源使用情況、當(dāng)前節(jié)點(diǎn)的task運(yùn)行情況,并根據(jù)JobTracker在應(yīng)答中的指令執(zhí)行具體命令MR編程模型及MR V1講解
    TaskTracker會(huì)為每個(gè)task啟動(dòng)一個(gè)JVM(可重用,但是僅限于重用同類型任務(wù))
    TaskTracker啟動(dòng)一個(gè)新任務(wù)
    第一步:先進(jìn)行作業(yè)本地化,某個(gè)作業(yè)在TaskTracker上的第一個(gè)task會(huì)進(jìn)行作業(yè)本地化,也就是把作業(yè)運(yùn)行依賴的文件、jar包從hdfs下載到本地。(為避免多個(gè)task同時(shí)進(jìn)行作業(yè)本地化,會(huì)對(duì)本地化操作加鎖);
    第二步:創(chuàng)建任務(wù)臨時(shí)目錄;
    第三步:?jiǎn)?dòng)JVM,并在JVM運(yùn)行任務(wù)(部分情況JVM可復(fù)用);MR編程模型及MR V1講解

Map Task內(nèi)部運(yùn)行過程

map task總共可以五個(gè)過程:read、map、collect、splill、conbine。
Read:從數(shù)據(jù)源讀入一條條數(shù)據(jù);
map:將數(shù)據(jù)傳給map函數(shù),變成另外一對(duì)KV
collect階段:
主要是map處理完的數(shù)據(jù),先放入內(nèi)存的環(huán)形緩沖區(qū)中,待環(huán)形緩沖區(qū)的值超過一定比例的時(shí)候再執(zhí)行下一步的spill到磁盤;
collect()內(nèi)部會(huì)調(diào)用getPartition來進(jìn)行分區(qū),而環(huán)形緩沖區(qū)則存儲(chǔ)的是K、V和partition號(hào)MR編程模型及MR V1講解
這里采用的兩級(jí)索引結(jié)構(gòu),主要是排序時(shí)在同一個(gè)partition內(nèi)排序,所以先排partition,再排partition內(nèi)部數(shù)據(jù)。
kvindices中記錄的分區(qū)號(hào)、key開始的位置、value開始的位置,也就是一對(duì)兒KV在kvindices中占用3個(gè)int,kvoffsets只記錄一對(duì)KV在kvindices中的偏移地址,所以只需要一個(gè)int,所以二者按1:3的大小分配內(nèi)存。
spill過程:
環(huán)形緩存區(qū)中內(nèi)存數(shù)據(jù)在超過一定閾值后會(huì)spill到磁盤上,在splill到磁盤上之前會(huì)先在內(nèi)存中進(jìn)行排序(快速排序);
之后按分區(qū)編號(hào)分別寫到臨時(shí)文件,同一個(gè)分區(qū)編號(hào)后面會(huì)有個(gè)數(shù)字,表示第幾次溢寫,conbine:對(duì)多個(gè)文件合并,多倫遞歸,沒輪合并最小的n個(gè)文件。

Reduce Task內(nèi)部運(yùn)行過程

reduce總共可分為以下幾個(gè)階段:shuffle、merge、sort、reduce、write
shuffle:從JobTracker中獲取已完成的map task列表以及輸出位置,通過http接口獲取數(shù)據(jù);
merge:shuffle拉去的數(shù)據(jù)線放入內(nèi)存,內(nèi)存不夠再放入磁盤,會(huì)有一個(gè)線程不斷地合并內(nèi)存和磁盤中的數(shù)據(jù)
sort:reduce從不同的map task中拉取到多個(gè)有序文件,然后再做一次歸并排序,則每個(gè)reduce獲取到文件就都是有序的了

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI