溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

hadoop中map如何輸出

發(fā)布時(shí)間:2021-11-24 14:27:49 來(lái)源:億速云 閱讀:193 作者:小新 欄目:大數(shù)據(jù)

這篇文章主要為大家展示了“hadoop中map如何輸出”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“hadoop中map如何輸出”這篇文章吧。

Mapper  的輸入官方文檔如下

The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.

mapper的輸出是已經(jīng)排序并且針對(duì)每個(gè)reducer劃分開的,那么hadoop代碼是如何劃分的,這里將跟從代碼分析。

還是根據(jù)官方示例WordCount的示例

第一次分析為了簡(jiǎn)化map的輸出復(fù)雜情況,

只分析一個(gè)文檔,并且其中只有10個(gè)'單詞',分別為“J", .."c", "b",  "a" ( 這里10個(gè)字母最好是亂序的,后面會(huì)看到其排序),

注釋掉設(shè)置combine class的代碼。

1. 單步跟蹤map中的context.write(生產(chǎn)kvbuffer 和kvmeta)

可以追蹤到最終實(shí)際是由org.apache.hadoop.mapred.MapTask.MapOutputBuffer.collect(K, V, int)

這里因?yàn)槲覀兊膐utput 只有10個(gè)Record 且每個(gè)大小都比較小,所以跳過(guò)了spill了處理以及combine處理,主要代碼如下,

public synchronized void collect(K key, V value, final int partition  ) throws IOException {

{

     ...

     keySerializer.serialize(key);

     ...

     valSerializer.serialize(value);

     ....        kvmeta.put(kvindex + PARTITION, partition);        kvmeta.put(kvindex + KEYSTART, keystart);

        kvmeta.put(kvindex + VALSTART, valstart);

        kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));     ...

}

這里實(shí)際是將(K,V) 序列化到了byte數(shù)組org.apache.hadoop.mapred.MapTask.MapOutputBuffer.kvbuffer 中,

并將(K,V)在內(nèi)存中的位置信息 以及 其partition(相同partition的record由同一個(gè)reducer處理) 消息 存在 kvmeta 中.

到此map的輸出都存在了內(nèi)存中

2. 通過(guò)查找kvmeta的代碼索引, 找到消費(fèi)kvbuffer和kvmeta代碼,生產(chǎn)spillRecv到indexCacheList

可以找到在 org.apache.hadoop.mapred.MapTask.MapOutputBuffer.sortAndSpill() 中找到有使用,設(shè)置斷點(diǎn),看到如下,

private void sortAndSpill() throws IOException, ClassNotFoundException,    InterruptedException {     ...

sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);

     ...

     for (int i = 0; i < partitions; ++i) {

     ...

          if (combinerRunner == null) {

              // spill directly              DataInputBuffer key = new DataInputBuffer();

              while (spindex < mend &&

                  kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {

                ....

                 writer.append(key, value);

                ++spindex;

              }

           }      ...

          spillRec.putIndex(rec, i);

     }

     ...

indexCacheList.add(spillRec);

     ...}

這里有三個(gè)操作,

1. Sorter.sort :是以partition  和key  來(lái)排序的,目的是聚合相同partition的record, 并以key的順序排列。

2. writer.append :  將序列化的record 寫入輸出流,這里寫入到文件spill0.out

3. indexCacheList.add :  每個(gè)spillRec記錄某個(gè)spill out文件中包含的partition信息。

3. 查找消費(fèi)indexCacheList的代碼,org.apache.hadoop.mapred.MapTask.MapOutputBuffer.mergeParts()

在此設(shè)置斷點(diǎn),可以看到這里我們只有一個(gè)spill文件,不需要merge,

這里只是唯一的spillRec 寫入到到文件中, file.out.index

將spill0.out 重命名為file.out, 可以vim打開這個(gè)文件看到里面存在順序號(hào)的字符。

    private void mergeParts() throws IOException, InterruptedException,     ClassNotFoundException {

...

sameVolRename(filename[0],

            mapOutputFile.getOutputFileForWriteInVolume(filename[0]));...

     indexCacheList.get(0).writeToFile(

            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);

...}

總結(jié)如下:

1. map的輸出首先序列化到內(nèi)存中kvbuffer,kvmeta

2. sortAndSpill 會(huì)將內(nèi)存中的record寫入到文件中

3. merge將spill出的文件merge問(wèn)一個(gè)文件file.out,并將每個(gè)文件中partition的信息寫入file.out.index

還沒(méi)分析的情況:

map 輸出大量數(shù)據(jù),出現(xiàn)多個(gè)spill 文件的復(fù)雜情況的細(xì)節(jié)(1. 異步spill, 2. merge 多個(gè)文件)

hadoop中map如何輸出

以上是“hadoop中map如何輸出”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI