您好,登錄后才能下訂單哦!
Mapreduce中由于sort的存在,MapTask和ReduceTask直接是工作流的架構(gòu)。而不是數(shù)據(jù)流的架構(gòu)。在MapTask尚未結(jié)束,其輸出結(jié)果尚未排序及合并前,ReduceTask是又有數(shù)據(jù)輸入的,因此即使ReduceTask已經(jīng)創(chuàng)建也只能睡眠等待MapTask完成。從而可以從MapTask節(jié)點(diǎn)獲取數(shù)據(jù)。一個(gè)MapTask最終的數(shù)據(jù)輸出是一個(gè)合并的spill文件,可以通過Web地址訪問。所以reduceTask一般在MapTask快要完成的時(shí)候才啟動(dòng)。啟動(dòng)早了浪費(fèi)container資源。
ReduceTask是個(gè)線程,這個(gè)線程運(yùn)行在YarnChild的Java虛擬機(jī)上,我們從ReduceTask.run開始看Reduce階段。 獲取更多大數(shù)據(jù)視頻資料請(qǐng)加QQ群:947967114
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
if (isMapOrReduce()) {
/添加reduce過程需要經(jīng)過的幾個(gè)階段。以便通知TaskTracker目前運(yùn) 行的情況/
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
// start thread that will handle communication with parent
TaskReporter reporter = startReporter(umbilical);
// 設(shè)置并啟動(dòng)reporter進(jìn)程以便和TaskTracker進(jìn)行交流
boolean useNewApi = job.getUseNewReducer();
//在job client中初始化job時(shí),默認(rèn)就是用新的API,詳見Job.setUseNewAPI()方法
initialize(job, getJobID(), reporter, useNewApi);
/用來初始化任務(wù),主要是進(jìn)行一些和任務(wù)輸出相關(guān)的設(shè)置,比如創(chuàng)建commiter,設(shè)置工作目錄等/
// check if it is a cleanupJobTask
/以下4個(gè)if語句均是根據(jù)任務(wù)類型的不同進(jìn)行相應(yīng)的操作,這些方 法均是Task類的方法,所以與任務(wù)是MapTask還是ReduceTask無關(guān)/
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;//只是為了JobCleanup,做完就停
}
if () {
runJobSetupTask(umbilical, reporter);
return;
//主要是創(chuàng)建工作目錄的FileSystem對(duì)象
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
//設(shè)置任務(wù)目前所處的階段為結(jié)束階段,并且刪除工作目錄
}
下面才是真正要成為reducer
// Initialize the codec
codec = initCodec();
RawKeyValueIterator rIter = null;
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
Class combinerClass = conf.getCombinerClass();
CombineOutputCollector combineCollector =
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
//如果需要就創(chuàng)建combineCollector
Classextends ShuffleConsumerPlugin> clazz =
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
//配置文件找mapreduce.job.reduce.shuffle.consumer.plugin.class默認(rèn)是shuffle.class
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
//創(chuàng)建shuffle類對(duì)象
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);
//創(chuàng)建context對(duì)象,ShuffleConsumerPlugin.Context
shuffleConsumerPlugin.init(shuffleContext);
//這里調(diào)用的起始是shuffle的init函數(shù),重點(diǎn)摘要如下。
this.localMapFiles = context.getLocalMapFiles();
scheduler = new ShuffleSchedulerImpl(jobConf, taskStatus, reduceId,
this, copyPhase, context.getShuffledMapsCounter(),
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
//創(chuàng)建shuffle所需的調(diào)度器
merger = createMergeManager(context);
//創(chuàng)建shuffle內(nèi)部的merge,createMergeManager里面源碼:
return new MergeManagerImpl(reduceId, jobConf, context.getLocalFS(),
context.getLocalDirAllocator(), reporter, context.getCodec(),
context.getCombinerClass(), context.getCombineCollector(),
context.getSpilledRecordsCounter(),
context.getReduceCombineInputCounter(),
context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
context.getMapOutputFile());
//創(chuàng)建MergeMnagerImpl對(duì)象和Merge線程
rIter = shuffleConsumerPlugin.run();
//從各個(gè)Mapper復(fù)制其輸出文件,并加以合并排序,等待直到完成為止
// free up the data structures
mapOutputFilesOnDisk.clear();
sortPhase.complete();
//排序階段完成
setPhase(TaskStatus.Phase.REDUCE);
//進(jìn)入reduce階段
statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
RawComparator comparator = job.getOutputValueGroupingComparator();
//3.Reduce 1.Reduce任務(wù)的最后一個(gè)階段。它會(huì)準(zhǔn)備好Map的 keyClass("mapred.output.key.class""mapred.mapoutput.key.class"),valueClass("mapred.mapoutput.value.class"或"mapred.output.value.class")和 Comparator (“mapred.output.value.groupfn.class”或“mapred.output.key.comparator.class”)
if (useNewApi) {
//2.根據(jù)參數(shù)useNewAPI判斷執(zhí)行runNewReduce還是runOldReduce。分析潤runNewReduce
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
//0.像報(bào)告進(jìn)程書寫一些信息,1.獲得一個(gè)TaskAttemptContext對(duì)象。通過這個(gè)對(duì)象創(chuàng)建reduce、output及用于跟蹤的統(tǒng)計(jì)output的RecordWrit、最后創(chuàng)建用于收集reduce結(jié)果的Context,2.reducer.run(reducerContext)開始執(zhí)行reduce
} else {//老API
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
shuffleConsumerPlugin.close();
done(umbilical, reporter);
}
(1)reduce分為三個(gè)階段(copy就是遠(yuǎn)程拷貝Map的輸出數(shù)據(jù)、sort就是對(duì)所有的數(shù)據(jù)做排序、reduce做聚集就是我們自己寫的reducer),為這三個(gè)階段分別設(shè)置Progress,用來和TaskTracker通信報(bào)道狀態(tài)。
(2)上面代碼的15-40行和MapReduce的MapTask任務(wù)的運(yùn)行源碼級(jí)分析中對(duì)應(yīng)部分基本相同,可參考之;
(3)codec = initCodec()這句是檢查map的輸出是否是壓縮的,壓縮的則返回壓縮codec實(shí)例,否則返回null,這里討論不壓縮的;
(4)我們討論完全分布式的hadoop,即isLocal==false,然后構(gòu)造一個(gè)ReduceCopier對(duì)象reduceCopier,并調(diào)用reduceCopier.fetchOutputs()方法拷貝各個(gè)Mapper的輸出,到本地;
(5)然后copy階段完成,設(shè)置接下來的階段是sort階段,更新狀態(tài)信息;
(6)根據(jù)isLocal來選擇KV迭代器,完全分布式的會(huì)使用reduceCopier.createKVIterator(job, rfs, reporter)作為KV迭代器;
(7)sort階段完成,設(shè)置接下來的階段是reduce階段,更新狀態(tài)信息;
(8)然后獲取一些配置信息,并根據(jù)是否使用新API選擇不同的處理方式,這里是新的API,調(diào)用runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass)會(huì)執(zhí)行reducer;
(9)done(umbilical, reporter)這個(gè)方法用于做結(jié)束任務(wù)的一些清理工作:更新計(jì)數(shù)器updateCounters();如果任務(wù)需要提交,設(shè)置Taks狀態(tài)為COMMIT_PENDING,并利用TaskUmbilicalProtocol,匯報(bào)Task完成,等待提交,然后調(diào)用commit提交任務(wù);設(shè)置任務(wù)結(jié)束標(biāo)志位;結(jié)束Reporter通信線程;發(fā)送最后一次統(tǒng)計(jì)報(bào)告(通過sendLastUpdate方法);利用TaskUmbilicalProtocol報(bào)告結(jié)束狀態(tài)(通過sendDone方法)。
有些人將Reduce Task分為了5個(gè)階段:一、shuffle階段:也稱為Copy階段,就是從各個(gè)MapTask上遠(yuǎn)程拷貝一片數(shù)據(jù),如果大小超過一定閾值就寫到磁盤,否則放入內(nèi)存;二、Merge階段:在遠(yuǎn)程拷貝數(shù)據(jù)的同時(shí),Reduce Task啟動(dòng)了兩個(gè)后臺(tái)線程對(duì)內(nèi)存和磁盤上的文件進(jìn)行合并,防止內(nèi)存使用過多和磁盤文件過多;三、sort階段:用戶編寫的reduce方法的輸入數(shù)據(jù)是按key進(jìn)行聚集的,需要對(duì)copy過來的數(shù)據(jù)排序,這里用的是歸并排序,因?yàn)镸ap Task的結(jié)果是有序的;四、Reduce階段:將每組數(shù)據(jù)依次交給用戶編寫的Reduce方法處理;五、write階段:就是將結(jié)果寫入HDFS。
上面的5個(gè)階段分的比較細(xì)了,代碼里分為3個(gè)階段copy、sort、reduce,我們?cè)趀clipse運(yùn)行MR程序時(shí),控制臺(tái)看到的reduce階段的百分比就分為3個(gè)階段各占33.3%。
這里的shuffleConsumerPlugin是實(shí)現(xiàn)了ShuffleConsumerPlugin的某個(gè)類對(duì)象。具體可以通過配置文件mapreduce.job.reduce.shuffle.consumer.plugin.class選項(xiàng)設(shè)置,默認(rèn)情況下是使用shuffle。我們?cè)诖a中分析過完成shuffleConsumerPlugin.run,通常是shuffle.run,因?yàn)橛辛诉@個(gè)過程Mapper的合成的spill文件才能通過HTTP協(xié)議傳輸?shù)絉educer端。有了數(shù)據(jù)才能進(jìn)行runNewReducer或者runOldReducer。可以說shuffle對(duì)象就是MapTask的搬運(yùn)工。而且shuffle的搬運(yùn)方式不是一遍搬運(yùn)一遍Reducer處理,而是要把MapTask所有的數(shù)據(jù)都搬運(yùn)過來,并且進(jìn)行合并排序之后才開始提供給對(duì)應(yīng)的Reducer。
一般而言,MapTask和ReduceTask是多對(duì)多的關(guān)系,假如有M個(gè)Mapper有N個(gè)Reducer。我們知道N個(gè)Reducer對(duì)應(yīng)著N個(gè)partition,所以每個(gè)Mapper都會(huì)被劃分成N個(gè)Partition,每個(gè)Reducer承擔(dān)著一個(gè)Partition部分的操作。這樣每一個(gè)Reducer從每個(gè)不同的Mapper內(nèi)拿來屬于自己的那部分?jǐn)?shù)據(jù),這樣每個(gè)Reducer就有M份不同Mapper的數(shù)據(jù),把M份數(shù)據(jù)合并在一起就是一個(gè)最終完整的Partition,有必要還會(huì)進(jìn)行排序,這時(shí)候才成為了Reducer的具體輸入數(shù)據(jù)。這個(gè)數(shù)據(jù)搬運(yùn)和重組的過程被叫做shuffle過程。shuffle這個(gè)過程開銷頗大,會(huì)占用較大的網(wǎng)絡(luò)流量,因?yàn)樯婕暗酱罅繑?shù)據(jù)的傳輸,shuffle過程也會(huì)有延遲,因?yàn)镸個(gè)Mapper的計(jì)算有快有慢,但是shuffle要所有的Mapper完成才能開始,Reduce又必須等shuffle完成才能開始,當(dāng)然這種延遲不是shuffle造成的,如果Reducer不需要全部Partition數(shù)據(jù)到位并排序,就不用與最慢的Mapper同步,這是排序付出的代價(jià)。
所以shuffle在MapReduce框架中起著非常重要的作用。我們先看shuffle的摘要: 獲取更多大數(shù)據(jù)視頻資料請(qǐng)加QQ群:947967114
public class Shuffle implements ShuffleConsumerPlugin, ExceptionReporter
private ShuffleConsumerPlugin.Context context;
private TaskAttemptID reduceId;
private JobConf jobConf;
private TaskUmbilicalProtocol umbilical;
private ShuffleSchedulerImpl scheduler;
private MergeManager merger;
private Task reduceTask; //Used for status updates
private Map localMapFiles;
public void init(ShuffleConsumerPlugin.Context context)
public RawKeyValueIterator run() throws IOException, InterruptedException
在ReduceTask.run中看到調(diào)用了shuffle.init,在run理創(chuàng)建了ShuffleSchedulerImpl和MergeManagerImpl對(duì)象。后面會(huì)講解就是是做什么用的。
之后就是對(duì)shuffle.run的調(diào)用,shuffle雖然有一個(gè)run但是并非是一個(gè)線程,只是用了這個(gè)名字而已。
我們看:ReduceTask.run->Shuffle.run
public RawKeyValueIterator run() throws IOException, InterruptedException {
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
// Start the map-completion events fetcher thread
final EventFetcher eventFetcher =
new EventFetcher(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
eventFetcher.start();
//通過查看EventFetcher我們看到他繼承了Thread,所以他是一個(gè)線程
// Start the map-output fetcher threads
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher[] fetchers = new Fetcher[numFetchers];
//創(chuàng)建了一個(gè)線程池
if (isLocal) {
//如果Mapper和Reducer在同一臺(tái)機(jī)器上,就在本地fetche
fetchers[0] = new LocalFetcher(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
//LocalFetcher是對(duì)Fetcher的擴(kuò)展,也是線程。
fetchers[0].start();//本地Fecher只有一個(gè)
} else {
//Mapper集合Reducer不在同一個(gè)機(jī)器上,需要跨多個(gè)節(jié)點(diǎn)Fecher
for (int i=0; i < numFetchers; ++i) {
//啟動(dòng)所有的Fecher
fetchers[i] = new Fetcher(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
//創(chuàng)建Fecher線程
fetchers[i].start();
//跨節(jié)點(diǎn)的Fecher需要好多個(gè),都需要開啟
}
}
// Wait for shuffle to complete successfully
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
reporter.progress();
//等待所有的Fecher都完成,如果有超時(shí)情況就報(bào)告進(jìn)度
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
}
// Stop the event-fetcher thread
eventFetcher.shutDown();
//關(guān)閉eventFetcher,代表shuffle操作完成,所有的MapTask的數(shù)據(jù)都拷貝過來了
// Stop the map-output fetcher threads
for (Fetcher fetcher : fetchers) {
fetcher.shutDown();//關(guān)閉所有的fetcher。
}
// stop the scheduler
scheduler.close();
//也不需要shuffle的調(diào)度,所以關(guān)閉
copyPhase.complete(); // copy is already complete
//文件復(fù)制階段結(jié)束
以下就是Reduce階段的MergeSort了
taskStatus.setPhase(TaskStatus.Phase.SORT);
//完成排序
reduceTask.statusUpdate(umbilical);
//通過umbilical向MRAppMaster匯報(bào),更新狀態(tài)
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
kvIter = merger.close();
//合并和排序,完成后返回一個(gè)隊(duì)列kvIter 。
} catch (Throwable e) {
throw new ShuffleError("Error while doing final merge " , e);
}
// Sanity check
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
return kvIter;
}
數(shù)據(jù)從MapTask轉(zhuǎn)移到ReduceTask就兩種方式,一MapTask送,二ReduceTask取,hadoop采用的是第二種方式,就是文件的復(fù)制。在Shuffle進(jìn)入run之前,RduceTask.run調(diào)用過他的init函數(shù)shuffleConsumerPlugin.init(shuffleContext),在init里創(chuàng)建了scheduler和用于合并排序的merge,進(jìn)入run后又創(chuàng)建了EventFetcher線程和若干個(gè)Fetcher線程。Fetcher的作用就是拿取,向MapTask節(jié)點(diǎn)提取數(shù)據(jù)。但是我們要清楚EventFetcher雖然也是Fetcher,但是提取的是event,不是數(shù)據(jù)本身。我們可以認(rèn)為它只是對(duì)Fetcher過程的一個(gè)事件的控制。
Fetcher線程的數(shù)量也不一定,Uber模式下,MapTask和ReduceTask在同一個(gè)節(jié)點(diǎn)上,并且只有一個(gè)MapTask,所以只有一個(gè)Fetcher就能夠完成,而且這個(gè)Fetcher是localFetcher。如果不是Uber模式可能會(huì)有很多MapTask并且一般和ReduceTask不在同一個(gè)節(jié)點(diǎn)。這時(shí)Fetcher的數(shù)量可以進(jìn)行配置,默認(rèn)有5個(gè)。數(shù)組fetchers就相當(dāng)于Fetcher的線程池。
創(chuàng)建了EventFetcher和Fetcher線程池后,進(jìn)入了while循環(huán),但是while循環(huán)什么都不做,一直等待,所以實(shí)際的操作都是在線程完成的,也就是通過EventFetcher和若干的Fetcher完成。EventFetcher起到了非常關(guān)鍵的樞紐的作用。
我們查看EventFetcher的源代碼摘要,我們提取關(guān)鍵的東西:
class EventFetcher extends Thread {
private final TaskAttemptID reduce;
private final TaskUmbilicalProtocol umbilical;
private final ShuffleScheduler scheduler;
private final int maxEventsToFetch;
public void run() {
int failures = 0;
LOG.info(reduce + " Thread started: " + getName());
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {//線程沒有被打斷
try {
int numNewMaps = getMapCompletionEvents();
//獲取Map的完成的事件,接著我們看getMapCompletionEvents源代碼:
protected int getMapCompletionEvents()
throws IOException, InterruptedException {
int numNewMaps = 0;
TaskCompletionEvent events[] = null;
do {
MapTaskCompletionEventsUpdate update =
umbilical.getMapCompletionEvents(
(org.apache.hadoop.mapred.JobID)reduce.getJobID(),
fromEventIdx,
maxEventsToFetch,
(org.apache.hadoop.mapred.TaskAttemptID)reduce);
//匯報(bào)umbilical從MRAppMaster獲取Map完成的時(shí)間的報(bào)告
events = update.getMapTaskCompletionEvents();
//獲取有關(guān)具體的MapTask結(jié)束運(yùn)行的情況
LOG.debug("Got " + events.length + " map completion events from " +
fromEventIdx);
assert !update.shouldReset() : "Unexpected legacy state";
//做了一個(gè)斷言 獲取更多大數(shù)據(jù)視頻資料請(qǐng)加QQ群:947967114
// Update the last seen event ID
fromEventIdx += events.length;
// Process the TaskCompletionEvents:
// 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
// 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
// fetching from those maps.
// 3. Remove TIPFAILED maps from neededOutputs since we don't need their
// outputs at all.
for (TaskCompletionEvent event : events) {
//對(duì)于獲取的每個(gè)事件的報(bào)告
scheduler.resolve(event);
//這里使用了ShuffleSchedullerImpl.resolve函數(shù),源代碼如下:
public void resolve(TaskCompletionEvent event) {
switch (event.getTaskStatus()) {
case SUCCEEDED://如果成功
URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());//獲取其URI
addKnownMapOutput(u.getHost() + ":" + u.getPort(),
u.toString(),
event.getTaskAttemptId());
//記錄這個(gè)MapTask的節(jié)點(diǎn)主機(jī)記錄下來,供Fetcher使用,getBaseURI的源代碼:
static URI getBaseURI(TaskAttemptID reduceId, String url) {
StringBuffer baseUrl = new StringBuffer(url);
if (!url.endsWith("/")) {
baseUrl.append("/");
}
baseUrl.append("mapOutput?job=");
baseUrl.append(reduceId.getJobID());
baseUrl.append("&reduce=");
baseUrl.append(reduceId.getTaskID().getId());
baseUrl.append("&map=");
URI u = URI.create(baseUrl.toString());
return u;
獲取各種信息,然后添加都URI對(duì)象中。
}
回到源代碼
maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
//最大的嘗試時(shí)間
break;
case FAILED:
case KILLED:
case OBSOLETE://如果MapTask運(yùn)行失敗
obsoleteMapOutput(event.getTaskAttemptId());//獲取TaskId
LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
" map-task: '" + event.getTaskAttemptId() + "'");//寫日志
break;
case TIPFAILED://如果失敗
tipFailed(event.getTaskAttemptId().getTaskID());
LOG.info("Ignoring output of failed map TIP: '" +
event.getTaskAttemptId() + "'");//寫日志
break;
}
}
回到源代碼
if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) {//如果事件成功
++numNewMaps;//增加map數(shù)量
}
}
} while (events.length == maxEventsToFetch);
return numNewMaps;
}
回到源代碼
failures = 0;
if (numNewMaps > 0) {
LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");
}
LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(SLEEP_TIME);
}
} catch (InterruptedException e) {
LOG.info("EventFetcher is interrupted.. Returning");
return;
} catch (IOException ie) {
LOG.info("Exception in getting events", ie);
// check to see whether to abort
if (++failures >= MAX_RETRIES) {
throw new IOException("too many failures downloading events", ie);//失敗數(shù)量大于重試的數(shù)量
}
// sleep for a bit
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(RETRY_PERIOD);
}
}
}
} catch (InterruptedException e) {
return;
} catch (Throwable t) {
exceptionReporter.reportException(t);
return;
}
}
MapTask和ReduceTask沒有直接的關(guān)系,MapTask不知道ReduceTask在哪些節(jié)點(diǎn)上,它只是把進(jìn)度的時(shí)間報(bào)告給MRAppMaster。ReduceTask通過“臍帶”執(zhí)行g(shù)etMapCompletionEvents操作想MRAppMaster獲取MapTask結(jié)束運(yùn)行的時(shí)間報(bào)告。有個(gè)別的MapTask可能會(huì)失敗,但是絕大多數(shù)都會(huì)成功,只要成功的就通過Fetcher去索取輸出數(shù)據(jù),這個(gè)信息就是通過shcheduler完成的也就是ShuffleSchedulerImpl對(duì)象,ShuffleSchedulerImpl對(duì)象并不多,只是個(gè)普通的對(duì)象。
fetchers就像線程池,里面有若干線程(默認(rèn)有5個(gè)),這些線程等待EventFetcher的通知,一旦有MapTask完成就前往提取數(shù)據(jù)。
獲取更多大數(shù)據(jù)視頻資料請(qǐng)加QQ群:947967114
我們看Fetcher線程類的run方法:
public void run() {
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {
MapHost host = null;
try {
// If merge is on, block
merger.waitForResource();
// Get a host to shuffle from
host = scheduler.getHost();
//從scheduler獲取一個(gè)已經(jīng)成功完成的MapTask的節(jié)點(diǎn)。
metrics.threadBusy();
//線程變成繁忙狀態(tài)
// Shuffle
copyFromHost(host);
//開始復(fù)制這個(gè)節(jié)點(diǎn)的數(shù)據(jù)
} finally {
if (host != null) {//maphost還有運(yùn)行中的
scheduler.freeHost(host);
//狀態(tài)設(shè)置成空閑狀態(tài),等待其完成。
metrics.threadFree();
}
}
}
} catch (InterruptedException ie) {
return;
} catch (Throwable t) {
exceptionReporter.reportException(t);
}
}
這里的重點(diǎn)是copyFromHost獲取數(shù)據(jù)的函數(shù)。
protected void copyFromHost(MapHost host) throws IOException {
// reset retryStartTime for a new host
//這是在ReduceTask的節(jié)點(diǎn)上運(yùn)行的
retryStartTime = 0;
// Get completed maps on 'host'
List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
//獲取目標(biāo)節(jié)點(diǎn)上的MapTask集合。
// Sanity check to catch hosts with only 'OBSOLETE' maps,
// especially at the tail of large jobs
if (maps.size() == 0) {
return;//沒有完成的直接返回
}
if(LOG.isDebugEnabled()) {
LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
}
// List of maps to be fetched yet
Set remaining = new HashSet(maps);
//已經(jīng)完成、等待shuffle的MapTask集合。
// Construct the url and connect
DataInputStream input = null;
URL url = getMapOutputURL(host, maps);
//生成MapTask所在節(jié)點(diǎn)的URL,下面要看getMapOutputURL源碼:
private URL getMapOutputURL(MapHost host, Collection maps
) throws MalformedURLException {
// Get the base url
StringBuffer url = new StringBuffer(host.getBaseUrl());
boolean first = true;
for (TaskAttemptID mapId : maps) {
if (!first) {
url.append(",");
}
url.append(mapId);//在URL后面加上mapid
first = false;
}
LOG.debug("MapOutput URL for " + host + " -> " + url.toString());
//寫日志
return new URL(url.toString());
//返回URL
}
回到主代碼:
try {
setupConnectionsWithRetry(host, remaining, url);
//和對(duì)方主機(jī)建立HTTP連接,setupConnectionsWithRetry使用了openConnectionWithRetry函數(shù)打開鏈接。
openConnectionWithRetry(host, remaining, url);
這段源代碼有使用了openConnection(url);方式,繼續(xù)查看。
如下是鏈接的主要過程:
protected synchronized void openConnection(URL url)
throws IOException {
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
//使用的是HTTPURL進(jìn)行連接
if (sslShuffle) {//如果是有信任證書的
HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
//強(qiáng)轉(zhuǎn)conn類型
try {
httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());//添加一個(gè)證書socket的工廠
} catch (GeneralSecurityException ex) {
throw new IOException(ex);
}
httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
}
connection = conn;
}
在setupConnectionsWithRetry中繼續(xù)寫到:
setupShuffleConnection(encHash);
//建立了Shuffle鏈接
connect(connection, connectionTimeout);
// verify that the thread wasn't stopped during calls to connect
if (stopped) {
return;
}
verifyConnection(url, msgToEncode, encHash);
}
//至此連接通過。
if (stopped) {
abortConnect(host, remaining);
//這里邊是關(guān)閉連接,可以點(diǎn)進(jìn)去看一下,滿足列表和等待的兩個(gè)條件
return;
}
} catch (IOException ie) {
boolean connectExcpt = ie instanceof ConnectException;
ioErrs.increment(1);
LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
" map outputs", ie);
回到主代碼
input = new DataInputStream(connection.getInputStream());
//實(shí)例一個(gè)輸入流對(duì)象。
try {
// Loop through available map-outputs and fetch them
// On any error, faildTasks is not null and we exit
// after putting back the remaining maps to the
// yet_to_be_fetched list and marking the failed tasks.
TaskAttemptID[] failedTasks = null;
while (!remaining.isEmpty() && failedTasks == null) {
//如果需要fetcher的列表不空,并且失敗的task數(shù)量沒有
try {
failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
//復(fù)制數(shù)據(jù)出來copyMapOutput的源代碼如下:
try {
ShuffleHeader header = new ShuffleHeader();
header.readFields(input);
mapId = TaskAttemptID.forName(header.mapId);
//獲取mapID
compressedLength = header.compressedLength;
decompressedLength = header.uncompressedLength;
forReduce = header.forReduce;
} catch (IllegalArgumentException e) {
badIdErrs.increment(1);
LOG.warn("Invalid map id ", e);
//Don't know which one was bad, so consider all of them as bad
return remaining.toArray(new TaskAttemptID[remaining.size()]);
}
InputStream is = input;
is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);
compressedLength -= CryptoUtils.cryptoPadding(jobConf);
decompressedLength -= CryptoUtils.cryptoPadding(jobConf);
//如果需要解壓或解密
// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength, forReduce,
remaining, mapId)) {
return new TaskAttemptID[] {mapId};
}
if(LOG.isDebugEnabled()) {
LOG.debug("header: " + mapId + ", len: " + compressedLength +
", decomp len: " + decompressedLength);
}
try {
mapOutput = merger.reserve(mapId, decompressedLength, id);
//為merge預(yù)留一個(gè)MapOutput:是內(nèi)存還是磁盤上。
} catch (IOException ioe) {
// kill this reduce attempt
ioErrs.increment(1);
scheduler.reportLocalError(ioe);
//報(bào)告錯(cuò)誤
return EMPTY_ATTEMPT_ID_ARRAY;
}
// Check if we can shuffle now ...
if (mapOutput == null) {
LOG.info("fetcher#" + id + " - MergeManager returned status WAIT ...");
//Not an error but wait to process data.
return EMPTY_ATTEMPT_ID_ARRAY;
}
// The codec for lz0,lz4,snappy,bz2,etc. throw java.lang.InternalError
// on decompression failures. Catching and re-throwing as IOException
// to allow fetch failure logic to be processed
try {
// Go!
LOG.info("fetcher#" + id + " about to shuffle output of map "
mapOutput.getMapId() + " decomp: " + decompressedLength
mapOutput.shuffle(host, is, compressedLength, decompressedLength,
metrics, reporter);
//跨節(jié)點(diǎn)把Mapper的文件內(nèi)容拷貝到reduce的內(nèi)存或者磁盤上。
} catch (java.lang.InternalError e) {
LOG.warn("Failed to shuffle for fetcher#"+id, e);
throw new IOException(e);
}
// Inform the shuffle scheduler
long endTime = Time.monotonicNow();
// Reset retryStartTime as map task make progress if retried before.
retryStartTime = 0;
scheduler.copySucceeded(mapId, host, compressedLength,
startTime, endTime, mapOutput);
//告訴調(diào)度器完成了一個(gè)節(jié)點(diǎn)的Map輸出的文件拷貝。
remaining.remove(mapId);
//這個(gè)MapTask的輸出已經(jīng)shuffle完畢
metrics.successFetch();
return null;后面的異常失敗信息我們不管。
這里的mapOutput是用來容納MapTask輸出文件的存儲(chǔ)空間,根據(jù)輸出文件的內(nèi)容大小和內(nèi)存的情況,可以是內(nèi)存的Output也可以是DiskOutput。 如果是內(nèi)存需要預(yù)約,因?yàn)椴恢挂粋€(gè)Fetcher。我們以InMemoryMapOutput為例。
代碼結(jié)構(gòu);
Fetcher.run-->copyFromHost-->copyMapOutput-->merger.reserve(MergeManagerImpl.reserve)-->InmemoryMapOutput.shuffle
public void shuffle(MapHost host, InputStream input,
long compressedLength, long decompressedLength,
ShuffleClientMetrics metrics,
Reporter reporter) throws IOException {
//跨節(jié)點(diǎn)從Mapper拷貝spill文件
IFileInputStream checksumIn =
new IFileInputStream(input, compressedLength, conf);
//校驗(yàn)和的輸入流
input = checksumIn;
// Are map-outputs compressed?
if (codec != null) {
//如果涉及到了壓縮
decompressor.reset();
//重啟解壓器
input = codec.createInputStream(input, decompressor);
//加了解壓器的輸入流
}
try {
IOUtils.readFully(input, memory, 0, memory.length);
//從Mapper方把特定的Partition數(shù)據(jù)讀入Reducer的內(nèi)存緩沖區(qū)。
metrics.inputBytes(memory.length);
reporter.progress();//匯報(bào)進(jìn)度
LOG.info("Read " + memory.length + " bytes from map-output for " +
getMapId());
/**
We've gotten the amount of data we were expecting. Verify the
decompressor has nothing more to offer. This action also forces the
decompressor to read any trailing bytes that weren't critical
for decompression, which is necessary to keep the stream
*/
if (input.read() >= 0 ) {
throw new IOException("Unexpected extra bytes from input stream for " +
getMapId());
}
} catch (IOException ioe) {
// Close the streams
IOUtils.cleanup(LOG, input);
// Re-throw
throw ioe;
} finally {
CodecPool.returnDecompressor(decompressor);
//釋放解壓器
}
}
從對(duì)方把spill文件中屬于本partition數(shù)據(jù)復(fù)制過來,回到copyFromHost中,通過scheduler.copySuccessed告知scheduler,并把這個(gè)MapTask的ID從remaining集合中刪除,進(jìn)入下一個(gè)循環(huán),復(fù)制下一個(gè)MapTask數(shù)據(jù)。直到把所有的屬于本Partition的數(shù)據(jù)都復(fù)制過來。
以上是Reducer端Fetcher的過程,它向Mapper端發(fā)送HTTP GET請(qǐng)求,下載文件。在MapTask就有一個(gè)與之對(duì)應(yīng)的Server,這個(gè)網(wǎng)絡(luò)協(xié)議的源代碼不做深究,課下有興趣自己研究。 獲取更多大數(shù)據(jù)視頻資料請(qǐng)加QQ群:947967114
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。