溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

hadoop中的recordreader和split以及block的關(guān)系是怎樣的

發(fā)布時間:2021-12-09 17:28:56 來源:億速云 閱讀:137 作者:iii 欄目:云計算

這篇文章主要講解了“hadoop中的recordreader和split以及block的關(guān)系是怎樣的”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“hadoop中的recordreader和split以及block的關(guān)系是怎樣的”吧!

recordreader的作用不言而喻。

通常來講,Inputformat會為沒有一個split產(chǎn)生一個recordreader來提供給maptask使用,進而,MapTask能夠讀取屬于自己管轄處理的那部分split。

這里面,我們以linerecordreader為例子進行講解:

幾個核心的方法

hadoop中的recordreader和split以及block的關(guān)系是怎樣的

定義了linerecordreader基本的作用,即,是否有下一對kv,獲得下一個key,獲得下一個value。

而這個三個方法的使用地方如下。

暫時忽略...


因為文件在hdfs上分塊存放的,那么split和block什么鬼?為啥不直接按照block去處理就行了唄。原因呢,是block中的數(shù)據(jù)可能不是連續(xù)的??赡苣硞€重要的信息被兩個block分隔了。因此,我們使用邏輯上的概念,即split來處理。

而split并不是真的將文件split了...而是邏輯上的標記下start,length,filepath等即可。

hadoop中的recordreader和split以及block的關(guān)系是怎樣的

根據(jù)Path,可以過得到FileSystem

  final Path file = split.getPath();
  // open the file and seek to the start of the split
   final FileSystem fs = file.getFileSystem(job);


每個maptask呢,都會使用使用一個linerecordreader,處理對應的split,中間通過了

private FSDataInputStream fileIn;

來維護一個流。

切記:這里的流并不是只針對這個split的,我們之前說過,split只是標記而已,沒有分隔。

因此,這個流fileIn其實是指向整個文件的。

并且呢,這個流呢,會實現(xiàn)jdk中標準的方法,啥read啊之類的。讀取到緩沖區(qū)中,但是如果涉及到不同的block呢,這個流會自動幫我們?nèi)フ覍腷lock的,這個太復雜。反正記住fileIn屏蔽了頂層的不同block之前的切換,對我們來講就像處理一個大的文件一樣。


既然是流,那么就能夠定位了,因此,不同的maptask就可以根據(jù)自己的split中的start位置,通過fileIn流直接定位到要處理文件的那個地方。

fileIn.seek(start);
in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
filePosition = fileIn;



可以看到其中的in對象,是借助fileIn生成,相比,in內(nèi)部一定借助了這個fileIn流來實現(xiàn)某個功能。


典型的,readLine, 

in對象負責一行的讀取邏輯,,而fileIn則負責從文件讀取字符到byte緩沖區(qū)。

readline函數(shù),最終會有一個這樣的抵用,可以看到

 bufferLength = fillBuffer(in, buffer, prevCharCR);

調(diào)用fillbuffer函數(shù),從in.read()中讀取東西到buffer中,

private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
  throws IOException {
    /* We're reading data from in, but the head of the stream may be
     * already buffered in buffer, so we have several cases:
     * 1. No newline characters are in the buffer, so we need to copy
     *    everything and read another buffer from the stream.
     * 2. An unambiguously terminated line is in buffer, so we just
     *    copy to str.
     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
     *    in CR.  In this case we copy everything up to CR to str, but
     *    we also need to see what follows CR: if it's LF, then we
     *    need consume LF as well, so next call to readLine will read
     *    from after that.
     * We use a flag prevCharCR to signal if previous character was CR
     * and, if it happens to be at the end of the buffer, delay
     * consuming it until we have a chance to look at the char that
     * follows.
     */
    str.clear();
    int txtLength = 0; //tracks str.getLength(), as an optimization
    int newlineLength = 0; //length of terminating newline
    boolean prevCharCR = false; //true of prev char was CR
    long bytesConsumed = 0;
    do {
      int startPosn = bufferPosn; //starting from where we left off the last time
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        if (prevCharCR) {
          ++bytesConsumed; //account for CR from previous read
        }
        bufferLength = fillBuffer(in, buffer, prevCharCR);
        if (bufferLength <= 0) {
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
        if (buffer[bufferPosn] == LF) {
          newlineLength = (prevCharCR) ? 2 : 1;
          ++bufferPosn; // at next invocation proceed from following byte
          break;
        }
        if (prevCharCR) { //CR + notLF, we are at notLF
          newlineLength = 1;
          break;
        }
        prevCharCR = (buffer[bufferPosn] == CR);
      }
      int readLength = bufferPosn - startPosn;
      if (prevCharCR && newlineLength == 0) {
        --readLength; //CR at the end of the buffer
      }
      bytesConsumed += readLength;
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

    if (bytesConsumed > Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before newline: " + bytesConsumed);
    }
    return (int)bytesConsumed;
  }



OK,那之后的linerecordreader三個主要的方法就簡答了,讀取就行了。略屌。

但是,有一個問題還沒說。即一行信息如果被某個block分隔了咋辦。

或者這個問題,這樣說,我們知道Inputformat中的getSplit方法呢,就是根據(jù)文件的length等屬性直接劃分split的。

參照FileInputformat的getSplits方法

hadoop中的recordreader和split以及block的關(guān)系是怎樣的


那么一行數(shù)據(jù),可能在不同的splits中,也可能在不同的block中。

在不同的block中呢,這個有fileIn對象幫我們處理的了,主要是讀取read到緩沖區(qū),屬于物理上的問題,不是考慮的地方。

處于不同的split呢?這個情況有些問題,因為不同的split就是不同的劃分,并且由不同的map task執(zhí)行。

那么我們recordreader如何解決這個問題呢?

解決辦法便是,突破split的start和end限制。

linerecordreader的解決辦法:

只不start指向的位置不是文件的第一行,則默認的過濾掉一行(start位置可能是一行中的某一個位置)。

initialize()方法

if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
this.pos = start;


在nextKeyvalue方法中,多讀取一些數(shù)據(jù),補充完整的一行。

while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      if (pos == 0) {
        newSize = skipUtfByteOrderMark();
      } else {
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
        pos += newSize;
      }

      if ((newSize == 0) || (newSize < maxLineLength)) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }

OK,通過過濾掉一行,和多讀取一行,就能保證被split分隔的一行,能夠完成的讀取,同時也不會重復處理一些數(shù)據(jù)。因為,所有的mapTask的linerecordreader都遵循這個方法。

感謝各位的閱讀,以上就是“hadoop中的recordreader和split以及block的關(guān)系是怎樣的”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對hadoop中的recordreader和split以及block的關(guān)系是怎樣的這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI