溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Mapper輸出緩沖區(qū)MapOutputBuffer怎么理解

發(fā)布時(shí)間:2021-12-31 16:23:05 來(lái)源:億速云 閱讀:146 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“Mapper輸出緩沖區(qū)MapOutputBuffer怎么理解”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Mapper輸出緩沖區(qū)MapOutputBuffer怎么理解”吧!

Mapper的輸出緩沖區(qū)MapOutputBuffer

現(xiàn)在我們知道了Map的輸入端,緊接著我們看map的輸出,這里重點(diǎn)就是context.write這個(gè)語(yǔ)句的內(nèi)涵。

搞清Mapper作為參數(shù)傳給map的context,這里我們看Mapper的run被調(diào)用的時(shí)候作為了參數(shù)傳遞下來(lái)。調(diào)用Mapper.run的是MapTask. runNewMapper。到這里我們深究一下runNewMapper。我們看MapTask的run方法:我們重點(diǎn)看runNewMapper

public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)

throws IOException, ClassNotFoundException, InterruptedException {

this.umbilical = umbilical;

if (isMapTask()) {

  // If there are no reducers then there won't be any sort. Hence the map

  // phase will govern the entire attempt's progress.

  if (conf.getNumReduceTasks() == 0) {

    mapPhase = getProgress().addPhase("map", 1.0f);

  } else {

    // If there are reducers then the entire attempt's progress will be

    // split between the map phase (67%) and the sort phase (33%).

    mapPhase = getProgress().addPhase("map", 0.667f);

    sortPhase  = getProgress().addPhase("sort", 0.333f);

  }

}

TaskReporter reporter = startReporter(umbilical);獲取視頻中文檔資料及完整視頻的伙伴請(qǐng)加QQ群:947967114

boolean useNewApi = job.getUseNewMapper();

initialize(job, getJobID(), reporter, useNewApi);

// check if it is a cleanupJobTask

if (jobCleanup) {

  runJobCleanupTask(umbilical, reporter);

  return;

}

if (jobSetup) {

  runJobSetupTask(umbilical, reporter);

  return;

}

if (taskCleanup) {

  runTaskCleanupTask(umbilical, reporter);

  return;

}

if (useNewApi) {

  runNewMapper(job, splitMetaInfo, umbilical, reporter);

} else {

  runOldMapper(job, splitMetaInfo, umbilical, reporter);

}

done(umbilical, reporter);

}

當(dāng)我們點(diǎn)runNewMapper的時(shí)候就能進(jìn)入真正實(shí)現(xiàn):

private <INKEY,INVALUE,OUTKEY,OUTVALUE>

void runNewMapper(final JobConf job,final TaskSplitIndex splitIndex,final TaskUmbilicalProtocol umbilical,TaskReporter reporter) throws IOException, ClassNotFoundException,

InterruptedException {

// make a task context so we can get the classes

org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =

  new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,                                                             getTaskID(),reporter);

// make a mapper    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =  (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)

ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

//確定該用哪一種具體的Mapper,然后創(chuàng)建。獲取視頻中文檔資料及完整視頻的伙伴請(qǐng)加QQ群:947967114

org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =

  (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)

ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);

//確定輸入的文件格式

// rebuild the input split

org.apache.hadoop.mapreduce.InputSplit split = null;

split = getSplitDetails(new Path(splitIndex.getSplitLocation()),splitIndex.getStartOffset());//確定這個(gè)Mapper所用的輸入是哪一個(gè)split

LOG.info("Processing split: " + split);

org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =

  new NewTrackingRecordReader<INKEY,INVALUE>

    (split, inputFormat, reporter, taskContext);

//創(chuàng)建和InputFormat相稱(chēng)的RecordReader

job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

org.apache.hadoop.mapreduce.RecordWriter output = null;

// get an output object

if (job.getNumReduceTasks() == 0) {

//如果設(shè)置的reduce個(gè)數(shù)是0,就直接輸出。

  output =

    new NewDirectOutputCollector(taskContext, job, umbilical, reporter);

} else {

  output = new NewOutputCollector(taskContext, job, umbilical, reporter);

}

接下來(lái)我們看一下NewOutputCollector源碼  獲取視頻中文檔資料及完整視頻的伙伴請(qǐng)加QQ群:947967114

NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,

                   JobConf job,

                   TaskUmbilicalProtocol umbilical,

                   TaskReporter reporter

                   ) throws IOException, ClassNotFoundException {

  collector = createSortingCollector(job, reporter);

//創(chuàng)建通向排序階段的collecter

  partitions = jobContext.getNumReduceTasks();

//通過(guò)獲取Reduce數(shù)量來(lái)獲得partitions數(shù)量。兩個(gè)數(shù)量一一對(duì)應(yīng)

  if (partitions > 1) {

//獲取的partitions 數(shù)量大于1

    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)

  ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);

//ReflectionUtils.newInstance創(chuàng)建用戶(hù)設(shè)置的Partitioner,里邊的參數(shù)jobContext.getPartitionerClass()是對(duì)抽象類(lèi)的某種擴(kuò)充,表示自己可以書(shū)寫(xiě)一個(gè)Partitioner類(lèi),通過(guò)這個(gè)方法來(lái)獲取,如果沒(méi)有自己寫(xiě),就是用默認(rèn)的HashPartitioner

  } else {

    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {

      @Override

      public int getPartition(K key, V value, int numPartitions) {

        return partitions - 1;

      }//只有一個(gè)partition就動(dòng)態(tài)擴(kuò)充抽象類(lèi)Partitioner類(lèi)

    };

  }

}

回到runNewMapper源碼:

org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>

mapContext =

  new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output, committer, reporter, split);

//創(chuàng)建一個(gè)用于Mapper的Context。

org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context

    mapperContext =

      new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(mapContext);

//把上邊創(chuàng)建的mapContext通過(guò)getMapContext獲取過(guò)來(lái)最終傳遞給mapperContext ,我們繼續(xù)看getMapContext源碼

public Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context

getMapContext(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {

return new Context(mapContext);

}

//這里返回了Context對(duì)象,在查看Context對(duì)象。獲取視頻中文檔資料及完整視頻的伙伴請(qǐng)加QQ群:947967114

public Context(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {

  this.mapContext = mapContext;

}

//我們看到獲取了mapContext 的值。所以我們知道WrappedMapper-->Context-->mapContext是一個(gè)MapContextImpl。

try {

  input.initialize(split, mapperContext);

//初始化input,input是recordReader對(duì)象,split和mapperContext作為參數(shù)

mapper.run(mapperContext);

//我們知道這個(gè)run方法運(yùn)行的是Mapper的run方法,所以看一下這個(gè)run

public void run(Context context) throws IOException, InterruptedException {

setup(context);

//獲取context

try {

  while (context.nextKeyValue()) {

//通過(guò)nextKeyValue來(lái)控制運(yùn)行

    map(context.getCurrentKey(), context.getCurrentValue(), context);

//運(yùn)行了map方法,給了recordReader提供過(guò)來(lái)的鍵值對(duì)。

  }

} finally {

  cleanup(context);

}

}

回到MapTask源碼

  mapPhase.complete();

//上鎖

  setPhase(TaskStatus.Phase.SORT);

//所有的task結(jié)果進(jìn)行排序

  statusUpdate(umbilical);

//更新runNewMapper狀態(tài)。

  input.close();

//關(guān)閉輸入流

  input = null;

  output.close(mapperContext);

//關(guān)閉輸出流

  output = null;

} finally {

  closeQuietly(input);

  closeQuietly(output, mapperContext);

}

}

對(duì)于輸入格式和分片以前已經(jīng)詳細(xì)說(shuō)過(guò)了,需要注意NewTrackingRecordReader。我們知道有了InputFormat之后需要?jiǎng)?chuàng)建與他對(duì)應(yīng)的RecordReader。但是在RecordReader上是用NewTrackingRecordReader。不同之處在于Tracking,是一個(gè)跟蹤,對(duì)RecordReader的跟蹤,他這里有一個(gè)參數(shù)reporter,就是用來(lái)上報(bào)跟蹤結(jié)果的,RecordReader則沒(méi)有這個(gè)功能。

和輸出有關(guān)的是collecter,是輸出數(shù)據(jù)的收集器,context.write最后就通過(guò)RecodWriter落實(shí)到collector.collect上。RecordWriter和RecordReader是同一個(gè)層次。RecodWriter是hadoop定義個(gè)一個(gè)抽象類(lèi),具體的RecodWriter就是對(duì)這個(gè)抽象類(lèi)的擴(kuò)充。用于maptask的就是NewDrictDoutputCollecter和NewOutputCollecter。

這兩個(gè)類(lèi)叫做OutputCollecter,實(shí)際上都是RecordWriter。Collecter只是一種語(yǔ)意的描述。從Mapper的角度看是Writer,是輸出。從框架或下游的角度看是Collect,是收集。

如果reducer數(shù)量是0,就是沒(méi)有reducer,Mapper的輸出就是整個(gè)MR的輸出,這個(gè)時(shí)候用RecordWriter的NewDrictDoutputCollecter,直接輸出。相反至少有一個(gè)Reducer,那么使用的就是RecordWriter的NewOutputCollecter。這是我們注重的重點(diǎn)內(nèi)容。我們看NewOutputCollecter源碼。定義了幾個(gè)內(nèi)容:

  collector = createSortingCollector(job, reporter);

//實(shí)現(xiàn)MapOutputCollector

  partitions = jobContext.getNumReduceTasks();

//負(fù)責(zé)Mapper輸出的分區(qū)

    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)

//分發(fā)目標(biāo)的個(gè)數(shù),也就是Reducer的個(gè)數(shù)。

@Override

public void write(K key, V value) throws IOException, InterruptedException {

  collector.collect(key, value,

                    partitioner.getPartition(key, value, partitions));

}

//write只寫(xiě)不讀。

@Override

public void close(TaskAttemptContext context

                  ) throws IOException,InterruptedException {

  try {

    collector.flush();

  } catch (ClassNotFoundException cnf) {

    throw new IOException("can't find class ", cnf);

  }

  collector.close();

}

}

NewOutputCollector分成兩部分,一個(gè)是collecter還有一個(gè)是partitioner。collecter負(fù)責(zé)實(shí)際收集Mapper輸出并交付給Reducer的工作,partitioner負(fù)責(zé)決定把具體的輸出交給哪一個(gè)Reducer。

有多個(gè)Reducer存在,MR框架需要把每個(gè)Mapper的每項(xiàng)輸出,也就是收集到的所有的KV對(duì)。按照某種條件(就是Partioner的實(shí)現(xiàn)方式,默認(rèn)就是HashPartitioner)輸出到不同的Reducer。這樣就把Mapper的輸出劃分成了多個(gè)分區(qū)(Partition),有幾個(gè)Reducer就把每個(gè)Mapper還分成幾個(gè)Partition,Partitioner就是起到劃分的作用。hash的方式。。。。。。。。。。。。

所以在創(chuàng)建NewOutputCollector的構(gòu)造函數(shù)中,就要把具體的collector和partitioner創(chuàng)建好。

hadoop的源碼中定義了MapOutputCollector。凡是實(shí)現(xiàn)了這個(gè)類(lèi),除了init和close方法外,還必須提供collect和flush這兩個(gè)函數(shù),從NewOutputCollector知道這兩個(gè)函數(shù)的調(diào)用者是collector,創(chuàng)建collector的方式是通過(guò)createSortingCollector來(lái)完成的。并且還實(shí)現(xiàn)了對(duì)KV對(duì)的排序。從屬關(guān)系如下:

YarnChild.main->PrivilegeExceptionAction.run->Maptask.run-->RunNewMapper->NewOutputCollector->MapTask.createSortingCollector

那么我們來(lái)看一下createSortingCollector源碼。獲取視頻中文檔資料及完整視頻的伙伴請(qǐng)加QQ群:947967114

private <KEY, VALUE> MapOutputCollector<KEY, VALUE>

      createSortingCollector(JobConf job, TaskReporter reporter)

throws IOException, ClassNotFoundException {

MapOutputCollector.Context context =

  new MapOutputCollector.Context(this, job, reporter);

Class<?>[] collectorClasses = job.getClasses(

  JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);

//如果沒(méi)有添加設(shè)置就默認(rèn)使用MapOutputBuffer.class

int remainingCollectors = collectorClasses.length;

for (Class clazz : collectorClasses) {

//逐一實(shí)驗(yàn)設(shè)置的collectorClasses

  try {

    if (!MapOutputCollector.class.isAssignableFrom(clazz)) {

      throw new IOException("Invalid output collector class: " + clazz.getName() +

        " (does not implement MapOutputCollector)");

//這里告訴我們必須實(shí)現(xiàn)MapOutputCollector.class

    }

    Class<? extends MapOutputCollector> subclazz =

      clazz.asSubclass(MapOutputCollector.class);

    LOG.debug("Trying map output collector class: " + subclazz.getName());

//獲取日志

    MapOutputCollector<KEY, VALUE> collector =

      ReflectionUtils.newInstance(subclazz, job);

//創(chuàng)建collector對(duì)象。

    collector.init(context);

//初始化collector,實(shí)際上初始化的是MapOutputBuffer對(duì)象

    LOG.info("Map output collector class = " + collector.getClass().getName());

    return collector;

//沒(méi)有異常就成功了。

  } catch (Exception e) {

    String msg = "Unable to initialize MapOutputCollector " + clazz.getName();

    if (--remainingCollectors > 0) {

      msg += " (" + remainingCollectors + " more collector(s) to try)";

    }

    LOG.warn(msg, e);

  }

}

throw new IOException("Unable to initialize any output collector");

}

具體采用什么collector是可以在配置文件mapred-default.xml中設(shè)置的,這里的MAP_OUTPUT_COLLECTOR_CLASS_ATTR即mapreduce.job.output.collector.class.如果文件中沒(méi)有設(shè)置就使用默認(rèn)的MapOutputBuffer。所以實(shí)際創(chuàng)建的collcter就是Mapask的MapOutputBuffer。這個(gè)類(lèi)是Maptask的內(nèi)部類(lèi),實(shí)現(xiàn)了MapOutputCollector。

可想而知,如果我們另寫(xiě)一個(gè)實(shí)現(xiàn)了MapOutputCollectior的Collector,并修改配置文件mapred-default.xml中隊(duì)配置項(xiàng)的設(shè)置。那么就可以創(chuàng)建不是MapTask.MapOutputBuffer。那樣createSortingCollector創(chuàng)建的就是一個(gè)沒(méi)有排序功能的collector。我們知道MapReduce框架之所以是工作流不是數(shù)據(jù)流的原因就是因?yàn)镸apper和Reducer之間的排序。因?yàn)镾ort只有在所有數(shù)據(jù)到來(lái)之后才能完成。sort完之后所有數(shù)據(jù)才被Rducer拉取。那么沒(méi)有了sort之后代表數(shù)據(jù)可以不斷的流入而不是一次性的填充,MR給我們提供了這種可能性,就是通過(guò)寫(xiě)一個(gè)不排序的Collector來(lái)替代MapOutputBuffer。我們接下來(lái)還是把注意力放到runNewMapper上。

當(dāng)創(chuàng)建了collector和partitioner之后就是Context,MapTask在調(diào)用mapper.run時(shí)作為參數(shù)的是mapperContext,這個(gè)對(duì)象的類(lèi)型是WrappedMapper.Context,整個(gè)過(guò)程是MapContextImpl創(chuàng)建了mapContext對(duì)象,通過(guò)WrappedMapper對(duì)象(是對(duì)Mapper的擴(kuò)充,根據(jù)名字就可以知道是對(duì)Mapper的包裝區(qū)別就是在內(nèi)部定義了Context類(lèi)),把一個(gè)擴(kuò)充的Mapper.Context包裝在Mapper內(nèi)部,這就是WrappedMapper.Context類(lèi)對(duì)象。下面是部分代碼;

public class WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

public Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context

getMapContext(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {

return new Context(mapContext);

}

@InterfaceStability.Evolving

public class Context

  extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context {

protected MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext;

//MapContext類(lèi)。被MapContextImpl實(shí)現(xiàn)

public Context(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {

  this.mapContext = mapContext;

}

/**

 * Get the input split for this map.

 */

public InputSplit getInputSplit() {

  return mapContext.getInputSplit();

}

@Override

public KEYIN getCurrentKey() throws IOException, InterruptedException {

  return mapContext.getCurrentKey();

}

@Override

public VALUEIN getCurrentValue() throws IOException, InterruptedException {

  return mapContext.getCurrentValue();

}

@Override

public boolean nextKeyValue() throws IOException, InterruptedException {

  return mapContext.nextKeyValue();

}

WrappedMapper.Context是對(duì)Mapper.Context的擴(kuò)充。內(nèi)部mapContext,它的構(gòu)造函數(shù)Context中的this.mapContext就設(shè)置成這個(gè)MapContextImpl類(lèi)對(duì)象mapContext。WrappedMapper.Context擴(kuò)充了Mapper.Context的write、getCurrentKey、nextKeyValue等。

傳給mapper.run的context就是WrappedMapper.Context對(duì)象。里面的mapContext是MapContextImpl對(duì)象。

我們繼續(xù)看Mapper.map的context.write

關(guān)系是:MapTask.run->runNewMapper->Mapper.run->Mapper.map

按照這個(gè)關(guān)系找到了一個(gè)沒(méi)有做任何事的方法。

public void write(KEYOUT key, VALUEOUT value)

  throws IOException, InterruptedException;

我們需要找一個(gè)實(shí)現(xiàn),這里找到的就是WrappedMapper.Context.write

就是這一段:

public void write(KEYOUT key, VALUEOUT value) throws IOException,

    InterruptedException {

  mapContext.write(key, value);

}

這里的調(diào)用的其實(shí)是MapContextImpl.write。所以我們找到MapContextImpl。當(dāng)我們看到MapContextImpl源碼是看到繼承了TaskInputOutputContextImpl我們找到了

public void write(KEYOUT key, VALUEOUT value

                ) throws IOException, InterruptedException {

output.write(key, value);

}

找到這里我們還是沒(méi)有找到真正的實(shí)現(xiàn),這里的witer實(shí)際上調(diào)用的是,NewOutputCollector.writer。

public void write(K key, V value) throws IOException, InterruptedException {

  collector.collect(key, value,

                    partitioner.getPartition(key, value, partitions));

}

繞了一大圈之后我們發(fā)現(xiàn)最終回到了NewOutputCollector,這里的write和之前的有明顯區(qū)別是collect實(shí)現(xiàn)的,里面有了分區(qū)。我們找的目的是一定要找到write中真正實(shí)現(xiàn)了分區(qū)寫(xiě)。

我們知道context是個(gè)WrappedMappe.Context對(duì)象,所以context.write其實(shí)就是就是Wrapped.Context.write,這個(gè)函數(shù)轉(zhuǎn)而調(diào)用內(nèi)部成分mapContext的write函數(shù),而mapContext是個(gè)MapContextImpl對(duì)象,所以實(shí)際調(diào)用的是MoapCntextImpl.write。然而MapContextImpl中沒(méi)有提供write函數(shù),但是我們看到這個(gè)類(lèi)繼承了TaskInputOutputContextImpl。所以就繼承他的write方法,然后這個(gè)write函數(shù)調(diào)用的是output的write,我們知道這個(gè)output參數(shù)類(lèi)型是一個(gè)RecordReader,實(shí)際上這個(gè)output就是MapTask中定義的output,這個(gè)output是一個(gè)NewOutputCollector,也就是說(shuō)是調(diào)用的NewOutputCollector的write方法,在這個(gè)write中我們看到調(diào)用了collector的collect,這個(gè)collecter就是Maptask.MapOutputBuffer。

在調(diào)用Maptask.MapOutputBuffer的collect時(shí)增加了一個(gè)參數(shù)partition,是指明KV去向的,這個(gè)值是有job.setPartitionerClass指定的,沒(méi)有設(shè)置就使用了hashPartitioner。下面所有的工作就是由MapTask的MapOutputBuffer來(lái)完成了。

感謝各位的閱讀,以上就是“Mapper輸出緩沖區(qū)MapOutputBuffer怎么理解”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Mapper輸出緩沖區(qū)MapOutputBuffer怎么理解這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI