
Big Data: The Hadoop Reduce Phase


Because of the sort step, the MapTask and ReduceTask in MapReduce form a workflow architecture rather than a dataflow architecture. Until a MapTask finishes and its output has been sorted and merged, the ReduceTask has no input; even if the ReduceTask has already been created, it can only sleep and wait for the MapTasks to complete, and only then fetch data from the MapTask nodes. A MapTask's final output is a single merged spill file, served over a web address. For this reason a ReduceTask is normally launched only once the MapTasks are close to finishing; launching it earlier wastes container resources.
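That launch timing is controlled by the slow-start setting. As a hedged illustration (the property name below is the stock Hadoop 2.x one, mapreduce.job.reduce.slowstart.completedmaps, with a default of 0.05; check your distribution), you can delay reducer launch until most maps are done so containers are not held idle:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SlowStartDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Do not schedule ReduceTasks until 80% of the MapTasks have finished
    // (with the 0.05 default, reducers may be scheduled almost immediately).
    conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.80f);
    Job job = Job.getInstance(conf, "slow-start-demo");
    // ... set mapper, reducer, input and output paths as usual ...
  }
}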

ReduceTask is a thread that runs in the YarnChild JVM. We start reading the reduce phase from ReduceTask.run.

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)

throws IOException, InterruptedException, ClassNotFoundException {

job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

if (isMapOrReduce()) {

/* Register the phases the reduce side passes through, so the current progress can be reported to the TaskTracker. */

copyPhase = getProgress().addPhase("copy");

sortPhase = getProgress().addPhase("sort");

reducePhase = getProgress().addPhase("reduce");

}

// start thread that will handle communication with parent

TaskReporter reporter = startReporter(umbilical);

// set up and start the reporter thread for communicating with the TaskTracker

boolean useNewApi = job.getUseNewReducer();

// when the job client initializes the job, the new API is the default; see Job.setUseNewAPI()

initialize(job, getJobID(), reporter, useNewApi);

/* Initialize the task: mainly output-related setup, such as creating the committer and setting the working directory. */

// check if it is a cleanupJobTask

/* The following if statements branch on the task type. They are methods of Task itself, so they apply whether the task is a MapTask or a ReduceTask. */

if (jobCleanup) {

runJobCleanupTask(umbilical, reporter);

return; // this attempt exists only for job cleanup; stop once it is done

}

if (jobSetup) {

runJobSetupTask(umbilical, reporter);

return;

// mainly creates the FileSystem object for the working directory

}

if (taskCleanup) {

runTaskCleanupTask(umbilical, reporter);

return;

// marks the task as being in its finishing phase and deletes the working directory

}

Only below this point does the task actually become a reducer.

// Initialize the codec

codec = initCodec();

RawKeyValueIterator rIter = null;

ShuffleConsumerPlugin shuffleConsumerPlugin = null;

Class combinerClass = conf.getCombinerClass();

CombineOutputCollector combineCollector =

(null != combinerClass) ?

new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

// create the combineCollector only if a combiner is configured

Class<? extends ShuffleConsumerPlugin> clazz =

job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);

// look up mapreduce.job.reduce.shuffle.consumer.plugin.class in the configuration; the default is Shuffle.class

shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);

// instantiate the shuffle plugin

LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

ShuffleConsumerPlugin.Context shuffleContext =

new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,

super.lDirAlloc, reporter, codec,

combinerClass, combineCollector,

spilledRecordsCounter, reduceCombineInputCounter,

shuffledMapsCounter,

reduceShuffleBytes, failedShuffleCounter,

mergedMapOutputsCounter,

taskStatus, copyPhase, sortPhase, this,

mapOutputFile, localMapFiles);

// build the ShuffleConsumerPlugin.Context object

shuffleConsumerPlugin.init(shuffleContext);

// this ends up calling Shuffle.init; its key lines are excerpted below:

this.localMapFiles = context.getLocalMapFiles();

scheduler = new ShuffleSchedulerImpl(jobConf, taskStatus, reduceId,

this, copyPhase, context.getShuffledMapsCounter(),

context.getReduceShuffleBytes(), context.getFailedShuffleCounter());

// create the scheduler that drives the shuffle

merger = createMergeManager(context);

// create the shuffle-internal merger; inside createMergeManager:

return new MergeManagerImpl(reduceId, jobConf, context.getLocalFS(),

context.getLocalDirAllocator(), reporter, context.getCodec(),

context.getCombinerClass(), context.getCombineCollector(),

context.getSpilledRecordsCounter(),

context.getReduceCombineInputCounter(),

context.getMergedMapOutputsCounter(), this, context.getMergePhase(),

context.getMapOutputFile());

// creates the MergeManagerImpl object and the merge threads

rIter = shuffleConsumerPlugin.run();

// copy the output files from every mapper, merge-sort them, and wait here until it is all done

// free up the data structures

mapOutputFilesOnDisk.clear();

sortPhase.complete();

// the sort phase is complete

setPhase(TaskStatus.Phase.REDUCE);

// enter the reduce phase

statusUpdate(umbilical);

Class keyClass = job.getMapOutputKeyClass();

Class valueClass = job.getMapOutputValueClass();

RawComparator comparator = job.getOutputValueGroupingComparator();

// 3. The last stage of the reduce task. It prepares the map output keyClass ("mapred.mapoutput.key.class", falling back to "mapred.output.key.class"), the valueClass ("mapred.mapoutput.value.class" or "mapred.output.value.class"), and the grouping Comparator ("mapred.output.value.groupfn.class" or "mapred.output.key.comparator.class")
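The grouping comparator fetched here decides which consecutive keys are fed to one reduce() call. A minimal sketch of supplying one, assuming a hypothetical composite Text key of the form "natural#secondary" (the class below is invented for illustration; only Job.setGroupingComparatorClass is the real hook):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class NaturalKeyGroupingComparator extends WritableComparator {
  protected NaturalKeyGroupingComparator() {
    super(Text.class, true); // true: create key instances so compare() can deserialize
  }
  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    // Group on the part before '#', so one reduce() call sees every value
    // sharing the natural key, whatever the secondary part is.
    String ka = a.toString().split("#", 2)[0];
    String kb = b.toString().split("#", 2)[0];
    return ka.compareTo(kb);
  }
}

It would be wired up with job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class), which fills the grouping-comparator configuration slot quoted above.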

if (useNewApi) {

// 2. useNewApi decides between runNewReducer and runOldReducer; we analyze runNewReducer

runNewReducer(job, umbilical, reporter, rIter, comparator,

keyClass, valueClass);

// 0. write some info via the progress reporter; 1. obtain a TaskAttemptContext and use it to create the reducer, the output RecordWriter (with statistics tracking of the output), and the Context that collects the reduce results; 2. reducer.run(reducerContext) then executes the reduce

} else { // old API

runOldReducer(job, umbilical, reporter, rIter, comparator,

keyClass, valueClass);

}

shuffleConsumerPlugin.close();

done(umbilical, reporter);

}

(1) Reduce has three phases (copy remotely fetches the map output data; sort orders all the data; reduce does the aggregation, i.e. the reducer we write ourselves). A Progress is registered for each of the three, used to report status to the TaskTracker.

(2) Lines 15-40 of the code above are essentially the same as the corresponding part of the source-level analysis of the MapTask; refer to that write-up.

(3) codec = initCodec() checks whether the map output is compressed; if so it returns a codec instance, otherwise null. We discuss the uncompressed case here.

(4) We discuss fully distributed Hadoop, i.e. isLocal == false: a ReduceCopier object reduceCopier is constructed, and reduceCopier.fetchOutputs() is called to copy each mapper's output to the local node.

(5) Once the copy phase completes, the next phase is set to sort and the status is updated.

(6) The KV iterator is chosen according to isLocal; the fully distributed case uses reduceCopier.createKVIterator(job, rfs, reporter) as the KV iterator.

(7) When sort completes, the next phase is set to reduce and the status is updated.

(8) Some configuration is then read and the code branches on whether the new API is in use; here it is the new API, so runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass) executes the reducer.

(9) done(umbilical, reporter) performs end-of-task cleanup: it updates the counters via updateCounters(); if the task needs to commit, it sets the task state to COMMIT_PENDING, reports completion over TaskUmbilicalProtocol, waits for permission, then calls commit; it sets the task-done flag, stops the reporter thread, sends one last statistics update (sendLastUpdate), and reports the final state over TaskUmbilicalProtocol (sendDone).

Some people divide the reduce task into five stages. 1. Shuffle, also called the copy stage: a slice of data is fetched remotely from each MapTask, written to disk if it exceeds a size threshold, kept in memory otherwise. 2. Merge: while data is being copied, the reduce task runs two background threads that merge in-memory and on-disk files, to keep memory usage and the number of disk files under control. 3. Sort: the input to the user's reduce method must be grouped by key, so the copied data is sorted; a merge sort is used, since each MapTask's output is already sorted. 4. Reduce: each group of data is handed in turn to the user's reduce method. 5. Write: the results are written to HDFS.

That five-way split is finer-grained than the code, which has three phases: copy, sort and reduce. When you run an MR job in Eclipse, the reduce percentage shown on the console is likewise split into three phases of 33.3% each.

The shuffleConsumerPlugin here is an instance of some class implementing ShuffleConsumerPlugin. It can be chosen through the configuration option mapreduce.job.reduce.shuffle.consumer.plugin.class; by default Shuffle is used. The shuffleConsumerPlugin.run we walked through above, normally Shuffle.run, is what lets the mappers' merged spill files travel to the reducer side over HTTP; only once that data has arrived can runNewReducer or runOldReducer proceed. The shuffle object is, so to speak, the MapTasks' porter. And it does not move data one piece at a time while the reducer processes it: it must fetch all of the MapTask data and merge-sort it before anything is handed to the corresponding Reducer.
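As a minimal sketch of swapping the plugin (com.example.MyShufflePlugin is a hypothetical class implementing ShuffleConsumerPlugin; the property name is the one quoted above):

import org.apache.hadoop.conf.Configuration;

public class ShufflePluginDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // By default this resolves to org.apache.hadoop.mapreduce.task.reduce.Shuffle.
    conf.set("mapreduce.job.reduce.shuffle.consumer.plugin.class",
        "com.example.MyShufflePlugin"); // hypothetical implementation
    System.out.println(conf.get("mapreduce.job.reduce.shuffle.consumer.plugin.class"));
  }
}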

In general, MapTasks and ReduceTasks are in a many-to-many relationship: say there are M mappers and N reducers. The N reducers correspond to N partitions, so each mapper's output is split into N partitions, and each reducer handles one partition. Every reducer thus pulls its own slice from each of the different mappers, ending up with M pieces of data, one per mapper; merged together, and sorted where necessary, those M pieces form one complete partition, and only then does this become the reducer's actual input. This process of moving and regrouping the data is called the shuffle. The shuffle is expensive: it consumes a lot of network bandwidth, since large amounts of data are transferred, and it adds latency, because the M mappers finish at different speeds, yet the shuffle can only complete after all of them do, and the reduce must wait for the shuffle. Strictly speaking the delay is not the shuffle's fault: if a reducer did not need its full partition, sorted, before starting, it would not have to synchronize with the slowest mapper. That is the price paid for sorting.
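On the map side, which partition a record lands in is decided by the job's Partitioner. As a hedged reminder, the logic below mirrors the stock HashPartitioner that MapReduce uses by default (a restatement for illustration, not code from this article):

import org.apache.hadoop.mapreduce.Partitioner;

// With N reducers, each mapper's output is split into N partitions, and
// reducer i later fetches partition i from all M mappers.
public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
  @Override
  public int getPartition(K key, V value, int numReduceTasks) {
    // Mask the sign bit so the modulus is never negative.
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}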

Shuffle therefore plays a very important role in the MapReduce framework. Let us first look at an outline of Shuffle:

public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionReporter

private ShuffleConsumerPlugin.Context context;

private TaskAttemptID reduceId;

private JobConf jobConf;

private TaskUmbilicalProtocol umbilical;

private ShuffleSchedulerImpl scheduler;

private MergeManager<K, V> merger;

private Task reduceTask; //Used for status updates

private Map<TaskAttemptID, MapOutputFile> localMapFiles;

public void init(ShuffleConsumerPlugin.Context context)

public RawKeyValueIterator run() throws IOException, InterruptedException

We saw ReduceTask.run call shuffle.init, and inside that init the ShuffleSchedulerImpl and MergeManagerImpl objects were created; what they are for is explained later.

After that comes the call to shuffle.run. Although Shuffle has a run method, it is not a thread; it merely borrows the name.

Let us follow ReduceTask.run -> Shuffle.run:

public RawKeyValueIterator run() throws IOException, InterruptedException {

int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,

MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());

int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);

// Start the map-completion events fetcher thread

final EventFetcher eventFetcher =

new EventFetcher(reduceId, umbilical, scheduler, this,

maxEventsToFetch);

eventFetcher.start();

// EventFetcher extends Thread, so it is a real thread

// Start the map-output fetcher threads

boolean isLocal = localMapFiles != null;

final int numFetchers = isLocal ? 1 :

jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);

Fetcher[] fetchers = new Fetcher[numFetchers];

// effectively a small pool of fetcher threads

if (isLocal) {

// if the mappers and the reducer are on the same machine, fetch locally

fetchers[0] = new LocalFetcher(jobConf, reduceId, scheduler,

merger, reporter, metrics, this, reduceTask.getShuffleSecret(),

localMapFiles);

// LocalFetcher extends Fetcher and is also a thread

fetchers[0].start(); // there is only one local fetcher

} else {

// the mappers and the reducer are not on the same machine; fetch across nodes

for (int i=0; i < numFetchers; ++i) {

// start all the fetchers

fetchers[i] = new Fetcher(jobConf, reduceId, scheduler, merger,

reporter, metrics, this,

reduceTask.getShuffleSecret());

// create a fetcher thread

fetchers[i].start();

// several cross-node fetchers are needed, and all of them must be started

}

}

// Wait for shuffle to complete successfully

while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {

reporter.progress();

// wait for all fetchers to finish, reporting progress on every timeout

synchronized (this) {

if (throwable != null) {

throw new ShuffleError("error in shuffle in " + throwingThreadName,

throwable);

}

}

}

// Stop the event-fetcher thread

eventFetcher.shutDown();

// shutting down eventFetcher means the shuffle is done: all MapTask data has been copied over

// Stop the map-output fetcher threads

for (Fetcher fetcher : fetchers) {

fetcher.shutDown(); // shut down every fetcher

}

// stop the scheduler

scheduler.close();

// the shuffle no longer needs scheduling, so close the scheduler

copyPhase.complete(); // copy is already complete

// the file-copy phase is over

What follows is the merge-sort of the reduce phase:

taskStatus.setPhase(TaskStatus.Phase.SORT);

// mark the task as entering the sort phase

reduceTask.statusUpdate(umbilical);

// report to the MRAppMaster through the umbilical and update the status

// Finish the on-going merges...

RawKeyValueIterator kvIter = null;

try {

kvIter = merger.close();

// final merge and sort; when done it returns the iterator kvIter

} catch (Throwable e) {

throw new ShuffleError("Error while doing final merge " , e);

}

// Sanity check

synchronized (this) {

if (throwable != null) {

throw new ShuffleError("error in shuffle in " + throwingThreadName,

throwable);

}

}

return kvIter;

}

There are only two ways data could move from MapTask to ReduceTask: the MapTask pushes it, or the ReduceTask pulls it. Hadoop uses the second, i.e. file copying. Before Shuffle.run is entered, ReduceTask.run has already called the plugin's init, shuffleConsumerPlugin.init(shuffleContext), which created the scheduler and the merger used for merge-sorting; inside run, the EventFetcher thread and several Fetcher threads are then created. A Fetcher's job is to pull, that is, to fetch data from the MapTask nodes. Be clear that although EventFetcher is also a Fetcher by name, it fetches events, not the data itself; think of it as the event-based control of the fetch process.

The number of Fetcher threads is not fixed either. In Uber mode the MapTask and ReduceTask are on the same node and there is only one MapTask, so a single fetcher suffices, and it is a LocalFetcher. Outside Uber mode there may be many MapTasks, generally not on the same node as the ReduceTask; the number of fetchers is then configurable and defaults to 5. The fetchers array serves as the Fetcher thread pool.
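That default of 5 is the fallback in the jobConf.getInt call above. A minimal sketch of raising it, assuming MRJobConfig.SHUFFLE_PARALLEL_COPIES resolves to mapreduce.reduce.shuffle.parallelcopies as in stock Hadoop 2.x:

import org.apache.hadoop.conf.Configuration;

public class ParallelCopiesDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Ten fetcher threads per reducer instead of the default five.
    conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);
    System.out.println(conf.getInt("mapreduce.reduce.shuffle.parallelcopies", 5));
  }
}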

After the EventFetcher and the fetcher pool are created, execution enters the while loop, which does nothing but wait; all the real work happens in the threads, that is, in the EventFetcher and the fetchers. The EventFetcher plays the pivotal role of the hub.

Here is an excerpt of the EventFetcher source, keeping only the key parts:

class EventFetcher extends Thread {

private final TaskAttemptID reduce;

private final TaskUmbilicalProtocol umbilical;

private final ShuffleScheduler scheduler;

private final int maxEventsToFetch;

public void run() {

int failures = 0;

LOG.info(reduce + " Thread started: " + getName());

try {

while (!stopped && !Thread.currentThread().isInterrupted()) { // while the thread has not been interrupted

try {

int numNewMaps = getMapCompletionEvents();

// fetch map-completion events; next, the getMapCompletionEvents source:

protected int getMapCompletionEvents()

throws IOException, InterruptedException {

int numNewMaps = 0;

TaskCompletionEvent events[] = null;

do {

MapTaskCompletionEventsUpdate update =

umbilical.getMapCompletionEvents(

(org.apache.hadoop.mapred.JobID)reduce.getJobID(),

fromEventIdx,

maxEventsToFetch,

(org.apache.hadoop.mapred.TaskAttemptID)reduce);

// ask the MRAppMaster over the umbilical for map-completion event reports

events = update.getMapTaskCompletionEvents();

// extract the details of which MapTask attempts have finished

LOG.debug("Got " + events.length + " map completion events from " +

fromEventIdx);

assert !update.shouldReset() : "Unexpected legacy state";

// a sanity assertion

// Update the last seen event ID

fromEventIdx += events.length;

// Process the TaskCompletionEvents:

// 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.

// 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop

// fetching from those maps.

// 3. Remove TIPFAILED maps from neededOutputs since we don't need their

// outputs at all.

for (TaskCompletionEvent event : events) {

// for each reported event

scheduler.resolve(event);

// this calls ShuffleSchedulerImpl.resolve; its source follows:

public void resolve(TaskCompletionEvent event) {

switch (event.getTaskStatus()) {

case SUCCEEDED: // the map attempt succeeded

URI u = getBaseURI(reduceId, event.getTaskTrackerHttp()); // build its URI

addKnownMapOutput(u.getHost() + ":" + u.getPort(),

u.toString(),

event.getTaskAttemptId());

// record the host of this MapTask for the fetchers to use; the getBaseURI source:

static URI getBaseURI(TaskAttemptID reduceId, String url) {

StringBuffer baseUrl = new StringBuffer(url);

if (!url.endsWith("/")) {

baseUrl.append("/");

}

baseUrl.append("mapOutput?job=");

baseUrl.append(reduceId.getJobID());

baseUrl.append("&reduce=");

baseUrl.append(reduceId.getTaskID().getId());

baseUrl.append("&map=");

URI u = URI.create(baseUrl.toString());

return u;

// the pieces are assembled into the URL and returned as a URI

}

Back in resolve:

maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());

// track the longest map runtime seen so far

break;

case FAILED:

case KILLED:

case OBSOLETE: // the map attempt failed or its output is obsolete

obsoleteMapOutput(event.getTaskAttemptId()); // mark this attempt's output as obsolete

LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +

" map-task: '" + event.getTaskAttemptId() + "'");//寫日志

break;

case TIPFAILED: // the task as a whole failed

tipFailed(event.getTaskAttemptId().getTaskID());

LOG.info("Ignoring output of failed map TIP: '" +

event.getTaskAttemptId() + "'"); // log it

break;

}

}

Back in getMapCompletionEvents:

if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) { // the event reports success

++numNewMaps; // one more newly completed map

}

}

} while (events.length == maxEventsToFetch);

return numNewMaps;

}

Back in EventFetcher.run:

failures = 0;

if (numNewMaps > 0) {

LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");

}

LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);

if (!Thread.currentThread().isInterrupted()) {

Thread.sleep(SLEEP_TIME);

}

} catch (InterruptedException e) {

LOG.info("EventFetcher is interrupted.. Returning");

return;

} catch (IOException ie) {

LOG.info("Exception in getting events", ie);

// check to see whether to abort

if (++failures >= MAX_RETRIES) {

throw new IOException("too many failures downloading events", ie); // more failures than retries allowed

}

// sleep for a bit

if (!Thread.currentThread().isInterrupted()) {

Thread.sleep(RETRY_PERIOD);

}

}

}

} catch (InterruptedException e) {

return;

} catch (Throwable t) {

exceptionReporter.reportException(t);

return;

}

}

MapTask and ReduceTask have no direct relationship: a MapTask does not know which nodes the ReduceTasks run on; it only reports its progress events to the MRAppMaster. The ReduceTask issues getMapCompletionEvents over the umbilical to ask the MRAppMaster for reports of finished MapTasks. The odd MapTask attempt may fail, but the great majority succeed, and every success prompts a Fetcher to go and collect the output. This bookkeeping is handled by the scheduler, i.e. the ShuffleSchedulerImpl object, which despite its importance is just an ordinary object, not a thread.

fetchers behaves like a thread pool holding several threads (5 by default); they wait on the EventFetcher's notifications, and as soon as a MapTask finishes they go and fetch its data.
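A minimal sketch of that producer-consumer handoff (an illustration of the pattern only, not Hadoop's ShuffleSchedulerImpl; the class and methods below are invented):

import java.util.ArrayDeque;
import java.util.Queue;

class ToyShuffleScheduler {
  // Hosts with finished map output, published by the event thread.
  private final Queue<String> pendingHosts = new ArrayDeque<>();

  public synchronized void addKnownMapOutput(String hostUrl) {
    pendingHosts.add(hostUrl);
    notifyAll(); // wake any fetcher blocked in getHost()
  }

  public synchronized String getHost() throws InterruptedException {
    while (pendingHosts.isEmpty()) {
      wait(); // a fetcher sleeps here until a host is published
    }
    return pendingHosts.poll();
  }
}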


Now look at the run method of the Fetcher thread class:

public void run() {

try {

while (!stopped && !Thread.currentThread().isInterrupted()) {

MapHost host = null;

try {

// If merge is on, block

merger.waitForResource();

// Get a host to shuffle from

host = scheduler.getHost();

// get from the scheduler a host that has successfully completed MapTasks

metrics.threadBusy();

// mark this thread busy

// Shuffle

copyFromHost(host);

// start copying this host's data

} finally {

if (host != null) { // the host may still have map outputs pending

scheduler.freeHost(host);

// return the host to the idle state so it can be scheduled again

metrics.threadFree();

}

}

}

} catch (InterruptedException ie) {

return;

} catch (Throwable t) {

exceptionReporter.reportException(t);

}

}

The key piece here is copyFromHost, the function that actually fetches the data.

protected void copyFromHost(MapHost host) throws IOException {

// reset retryStartTime for a new host

// this runs on the ReduceTask's node

retryStartTime = 0;

// Get completed maps on 'host'

List<TaskAttemptID> maps = scheduler.getMapsForHost(host);

// the set of completed MapTasks on the target host

// Sanity check to catch hosts with only 'OBSOLETE' maps,

// especially at the tail of large jobs

if (maps.size() == 0) {

return; // nothing completed there, so return at once

}

if(LOG.isDebugEnabled()) {

LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "

  • maps);

}

// List of maps to be fetched yet

Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);

// the MapTasks that are complete but still waiting to be shuffled

// Construct the url and connect

DataInputStream input = null;

URL url = getMapOutputURL(host, maps);

// build the URL of the node the MapTasks ran on; the getMapOutputURL source follows:

private URL getMapOutputURL(MapHost host, Collection<TaskAttemptID> maps

) throws MalformedURLException {

// Get the base url

StringBuffer url = new StringBuffer(host.getBaseUrl());

boolean first = true;

for (TaskAttemptID mapId : maps) {

if (!first) {

url.append(",");

}

url.append(mapId); // append each map id to the URL

first = false;

}

LOG.debug("MapOutput URL for " + host + " -> " + url.toString());

// log it

return new URL(url.toString());

// return the URL

}
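For intuition, here is what a resulting URL might look like; the host, job and attempt IDs below are made up for illustration, and 13562 is the stock ShuffleHandler port in Hadoop 2.x:

http://node01:13562/mapOutput?job=job_1591330000000_0001&reduce=3&map=attempt_1591330000000_0001_m_000000_0,attempt_1591330000000_0001_m_000002_0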

Back in copyFromHost:

try {

setupConnectionsWithRetry(host, remaining, url);

// establish the HTTP connection to the remote host; setupConnectionsWithRetry opens it via openConnectionWithRetry:

openConnectionWithRetry(host, remaining, url);

That code in turn uses openConnection(url); let us keep going down.

The core of establishing the connection:

protected synchronized void openConnection(URL url)

throws IOException {

HttpURLConnection conn = (HttpURLConnection) url.openConnection();

// the connection is a plain HttpURLConnection

if (sslShuffle) { // if the shuffle runs over SSL with trusted certificates

HttpsURLConnection httpsConn = (HttpsURLConnection) conn;

// downcast conn to the HTTPS connection type

try {

httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory()); // install the SSL socket factory

} catch (GeneralSecurityException ex) {

throw new IOException(ex);

}

httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());

}

connection = conn;

}

setupConnectionsWithRetry then goes on:

setupShuffleConnection(encHash);

// set up the shuffle connection

connect(connection, connectionTimeout);

// verify that the thread wasn't stopped during calls to connect

if (stopped) {

return;

}

verifyConnection(url, msgToEncode, encHash);

}

// at this point the connection has been verified

if (stopped) {

abortConnect(host, remaining);

// aborts the connection and puts the remaining maps back so they can be fetched later; worth stepping into

return;

}

} catch (IOException ie) {

boolean connectExcpt = ie instanceof ConnectException;

ioErrs.increment(1);

LOG.warn("Failed to connect to " + host + " with " + remaining.size() +

" map outputs", ie);

Back in copyFromHost:

input = new DataInputStream(connection.getInputStream());

// wrap the connection's input stream in a DataInputStream

try {

// Loop through available map-outputs and fetch them

// On any error, faildTasks is not null and we exit

// after putting back the remaining maps to the

// yet_to_be_fetched list and marking the failed tasks.

TaskAttemptID[] failedTasks = null;

while (!remaining.isEmpty() && failedTasks == null) {

// while maps remain to be fetched and no task has failed yet

try {

failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);

// copy one map output; the copyMapOutput source follows:

try {

ShuffleHeader header = new ShuffleHeader();

header.readFields(input);

mapId = TaskAttemptID.forName(header.mapId);

// parse the map id from the shuffle header

compressedLength = header.compressedLength;

decompressedLength = header.uncompressedLength;

forReduce = header.forReduce;

} catch (IllegalArgumentException e) {

badIdErrs.increment(1);

LOG.warn("Invalid map id ", e);

//Don't know which one was bad, so consider all of them as bad

return remaining.toArray(new TaskAttemptID[remaining.size()]);

}

InputStream is = input;

is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);

compressedLength -= CryptoUtils.cryptoPadding(jobConf);

decompressedLength -= CryptoUtils.cryptoPadding(jobConf);

// wrap the stream for decryption if needed and correct the lengths for crypto padding

// Do some basic sanity verification

if (!verifySanity(compressedLength, decompressedLength, forReduce,

remaining, mapId)) {

return new TaskAttemptID[] {mapId};

}

if(LOG.isDebugEnabled()) {

LOG.debug("header: " + mapId + ", len: " + compressedLength +

", decomp len: " + decompressedLength);

}

try {

mapOutput = merger.reserve(mapId, decompressedLength, id);

// reserve a MapOutput for the merge: either in memory or on disk

} catch (IOException ioe) {

// kill this reduce attempt

ioErrs.increment(1);

scheduler.reportLocalError(ioe);

// report the error

return EMPTY_ATTEMPT_ID_ARRAY;

}

// Check if we can shuffle now ...

if (mapOutput == null) {

LOG.info("fetcher#" + id + " - MergeManager returned status WAIT ...");

//Not an error but wait to process data.

return EMPTY_ATTEMPT_ID_ARRAY;

}

// The codec for lzo,lz4,snappy,bz2,etc. throw java.lang.InternalError

// on decompression failures. Catching and re-throwing as IOException

// to allow fetch failure logic to be processed

try {

// Go!

LOG.info("fetcher#" + id + " about to shuffle output of map "

  • mapOutput.getMapId() + " decomp: " + decompressedLength

  • " len: " + compressedLength + " to " + mapOutput.getDescription());

mapOutput.shuffle(host, is, compressedLength, decompressedLength,

metrics, reporter);

// copy the mapper's file contents across the network into the reducer's memory or disk

} catch (java.lang.InternalError e) {

LOG.warn("Failed to shuffle for fetcher#"+id, e);

throw new IOException(e);

}

// Inform the shuffle scheduler

long endTime = Time.monotonicNow();

// Reset retryStartTime as map task make progress if retried before.

retryStartTime = 0;

scheduler.copySucceeded(mapId, host, compressedLength,

startTime, endTime, mapOutput);

// tell the scheduler that one map output has been copied successfully

remaining.remove(mapId);

// this MapTask's output has been fully shuffled

metrics.successFetch();

return null;

We will not concern ourselves with the failure-handling code that follows.

The mapOutput here is the storage that receives the MapTask's output; depending on the size of the output file and the memory situation, it is either an in-memory output or a disk output. Memory must be reserved, because more than one Fetcher is running. We take InMemoryMapOutput as the example.
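A sketch of the reserve decision (a paraphrase of MergeManagerImpl.reserve under simplifying assumptions, not Hadoop's code; the two limits correspond roughly to the shuffle input buffer and mapreduce.reduce.shuffle.memory.limit.percent):

// Simplified memory-vs-disk choice: outputs larger than the single-shuffle
// limit go to disk; if the buffer is full the fetcher is told to wait.
class ToyMergeManager {
  private final long memoryLimit;            // total in-memory shuffle buffer
  private final long maxSingleShuffleLimit;  // largest output kept in memory
  private long usedMemory = 0;

  ToyMergeManager(long memoryLimit, double singleLimitPercent) {
    this.memoryLimit = memoryLimit;
    this.maxSingleShuffleLimit = (long) (memoryLimit * singleLimitPercent);
  }

  /** Returns "DISK", "MEMORY", or null, meaning "WAIT and retry". */
  synchronized String reserve(long requestedSize) {
    if (requestedSize > maxSingleShuffleLimit) {
      return "DISK"; // too large for the in-memory buffer
    }
    if (usedMemory + requestedSize > memoryLimit) {
      return null;   // matches the "MergeManager returned status WAIT" log above
    }
    usedMemory += requestedSize;
    return "MEMORY";
  }

  synchronized void unreserve(long size) {
    usedMemory -= size;
    notifyAll(); // let stalled fetchers retry
  }
}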

The call chain:

Fetcher.run --> copyFromHost --> copyMapOutput --> merger.reserve (MergeManagerImpl.reserve) --> InMemoryMapOutput.shuffle

public void shuffle(MapHost host, InputStream input,

long compressedLength, long decompressedLength,

ShuffleClientMetrics metrics,

Reporter reporter) throws IOException {

// copy the spill file from the mapper across nodes

IFileInputStream checksumIn =

new IFileInputStream(input, compressedLength, conf);

// checksum-verifying input stream

input = checksumIn;

// Are map-outputs compressed?

if (codec != null) {

// if the map outputs are compressed

decompressor.reset();

// reset the decompressor

input = codec.createInputStream(input, decompressor);

// wrap the input stream with the decompressor

}

try {

IOUtils.readFully(input, memory, 0, memory.length);

// read this partition's data from the mapper into the reducer's in-memory buffer

metrics.inputBytes(memory.length);

reporter.progress(); // report progress

LOG.info("Read " + memory.length + " bytes from map-output for " +

getMapId());

/**
 * We've gotten the amount of data we were expecting. Verify the
 * decompressor has nothing more to offer. This action also forces the
 * decompressor to read any trailing bytes that weren't critical
 * for decompression, which is necessary to keep the stream
 * in sync.
 */

if (input.read() >= 0 ) {

throw new IOException("Unexpected extra bytes from input stream for " +

getMapId());

}

} catch (IOException ioe) {

// Close the streams

IOUtils.cleanup(LOG, input);

// Re-throw

throw ioe;

} finally {

CodecPool.returnDecompressor(decompressor);

// return the decompressor to the pool

}

}

This copies the data belonging to our partition out of the remote spill file. Back in copyFromHost, scheduler.copySucceeded is informed and this MapTask's ID is removed from the remaining set; the loop then moves on to copy the next MapTask's data, until every piece belonging to this partition has been fetched.

The above is the fetch process on the reducer side: it sends HTTP GET requests to the mapper side and downloads the files. On the MapTask side there is a matching server; we will not dig into that network protocol here, but it is worth studying on your own if you are interested.
