溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

【Flume】TailDirSource源碼理解

發(fā)布時(shí)間:2020-06-15 18:58:01 來(lái)源:網(wǎng)絡(luò) 閱讀:6578 作者:巧克力黒 欄目:大數(shù)據(jù)

TaildirSource類(lèi)圖如下(列出主要類(lèi))
【Flume】TailDirSource源碼理解


TailDirSource類(lèi)
TailDirSource繼承了AbstractSource類(lèi),而AbstractSource類(lèi)中channelProcessor屬性負(fù)責(zé)將Source中的Event提交給Channel組件
TailDirSource類(lèi)通過(guò)配置參數(shù)匹配日志文件,獲取日志文件更新內(nèi)容并且將已經(jīng)讀取的偏移量記錄到特定的文件當(dāng)中(position file)

configure()方法:
1.判斷從配置文件加載的配置是否合法,其中包括了對(duì)filegroups,以及以filegroups為單位的文件路徑是否存在等條件。
2.對(duì)batchSize,skipToEnd,writePosInterval,idleTimeout等變量進(jìn)行初始化工作
batchSize定義了往Channel中發(fā)送Event的批量處理大小
skipToEnd定義了每次程序啟動(dòng),對(duì)文件進(jìn)行讀取的時(shí)候,是否從文件尾部開(kāi)始讀取數(shù)據(jù),或者從文件最開(kāi)始讀取。
writePosInterval,TaildirSource讀取每個(gè)監(jiān)控文件都在位置文件中記錄監(jiān)控文件的已經(jīng)讀取的偏移量,writePosInterval則是定義了更新位置文件的間隔。
idleTimeout日志文件在idleTimeout間隔時(shí)間,沒(méi)有被修改,文件將被關(guān)閉

start()方法:
通過(guò)configure()初始化后的變量創(chuàng)建了ReliableTaildirEventReader對(duì)象,同時(shí)創(chuàng)建兩個(gè)線程池idleFileChecker和positionWriter,分別用于監(jiān)控日志文件和記錄日志文件讀取的偏移量。
idleFileChecker實(shí)現(xiàn)一個(gè)Runnable接口,遍歷reader所有監(jiān)控的文件,檢查文件最后修改時(shí)間+idleTimeout是否小于當(dāng)前時(shí)間,說(shuō)明日志文件在idleTimeout時(shí)間內(nèi)沒(méi)有被修改,該文件將被關(guān)閉。

private class idleFileCheckerRunnable implements Runnable {
  @Override
  public void run() {
    try {
      long now = System.currentTimeMillis();
      for (TailFile tf : reader.getTailFiles().values()) {
        if (tf.getLastUpdated() + idleTimeout < now && tf.getRaf() != null) {
          idleInodes.add(tf.getInode());
        }
      }
    } catch (Throwable t) {
      logger.error("Uncaught exception in IdleFileChecker thread", t);
    }
  }
}

positionWriter主要作用是記錄日志文件讀取的偏移量,以json格式("inode", inode, "pos", tf.getPos(), "file", tf.getPath()),其中inode是linux系統(tǒng)中特有屬性,在適應(yīng)其他系統(tǒng)(Windows等)日志采集時(shí)ReliableTaildirEventReader.getInode()方法需要修改(注意:在利用Linux系統(tǒng)上inode實(shí)現(xiàn)上,文件是通過(guò)inode記錄日志讀取偏移量。所以即使文件名改變了,也不影響日志讀取,在我實(shí)現(xiàn)Window版本上,只采用了文件名對(duì)應(yīng)日志讀取偏移量,文件名改變影響日志讀取)。pos則是記錄的日志讀取的偏移量,file記錄了日志文件的路徑

process()方法:
process方法記錄了TailDirSource類(lèi)中主要的邏輯,獲取每個(gè)監(jiān)控的日志文件,調(diào)用tailFileProcess獲取每個(gè)日志文件的更新數(shù)據(jù),并將每條記錄轉(zhuǎn)換為Event(具體細(xì)節(jié)要看ReliableTaildirEventReader的readEvents方法)

public Status process() {
  Status status = Status.READY;
  try {
    existingInodes.clear();
    existingInodes.addAll(reader.updateTailFiles());
    for (long inode : existingInodes) {
      TailFile tf = reader.getTailFiles().get(inode);
      if (tf.needTail()) {
        tailFileProcess(tf, true);
      }
    }
    closeTailFiles();
    try {
      TimeUnit.MILLISECONDS.sleep(retryInterval);
    } catch (InterruptedException e) {
      logger.info("Interrupted while sleeping");
    }
  } catch (Throwable t) {
    logger.error("Unable to tail files", t);
    status = Status.BACKOFF;
  }
  return status;
}

ReliableTaildirEventReader類(lèi)
構(gòu)造ReliableTaildirEventReader對(duì)象的時(shí)候,首先會(huì)判斷各種必須參數(shù)是否合法等,然后加載position file獲取每個(gè)文件上次記錄的日志文件讀取的偏移量
loadPositionFile(String filePath) 不粘貼方法的具體代碼,主要就是獲取每個(gè)監(jiān)控日志文件的讀取偏移量
readEvents()的各個(gè)不同參數(shù)方法中,下面這個(gè)是最主要的,該方法獲取當(dāng)前日志文件的偏移量,調(diào)用TailFile.readEvents(numEvents, backoffWithoutNL, addByteOffset)方法將日志文件每行轉(zhuǎn)換為Flume的消息對(duì)象Event,并循環(huán)將每個(gè)event添加header信息。

public List<Event> readEvents(int numEvents, boolean backoffWithoutNL)
    throws IOException {
  if (!committed) {
    if (currentFile == null) {
      throw new IllegalStateException("current file does not exist. " + currentFile.getPath());
    }
    logger.info("Last read was never committed - resetting position");
    long lastPos = currentFile.getPos();
    currentFile.updateFilePos(lastPos);
  }
  List<Event> events = currentFile.readEvents(numEvents, backoffWithoutNL, addByteOffset);
  if (events.isEmpty()) {
    return events;
  }

  Map<String, String> headers = currentFile.getHeaders();
  if (annotateFileName || (headers != null && !headers.isEmpty())) {
    for (Event event : events) {
      if (headers != null && !headers.isEmpty()) {
        event.getHeaders().putAll(headers);
      }
      if (annotateFileName) {
        event.getHeaders().put(fileNameHeader, currentFile.getPath());
      }
    }
  }
  committed = false;
  return events;
}

openFile(File file, Map<String, String> headers, long inode, long pos) 方法根據(jù)日志文件對(duì)象,headers,inode和偏移量pos創(chuàng)建一個(gè)TailFile對(duì)象


TailFile類(lèi)
TaildirSource通過(guò)TailFile類(lèi)操作處理每個(gè)日志文件,包含了RandomAccessFile類(lèi),以及記錄日志文件偏移量pos,最新更新時(shí)間lastUpdated等屬性
RandomAccessFile完美的符合TaildirSource的應(yīng)用場(chǎng)景,RandomAccessFile支持使用seek()方法隨機(jī)訪問(wèn)文件,配合position file中記錄的日志文件讀取偏移量,能夠輕松簡(jiǎn)單的seek到文件偏移量,然后向后讀取日志內(nèi)容,并重新將新的偏移量記錄到position file中。

readEvent(boolean backoffWithoutNL, boolean addByteOffset)方法:
下圖描述了該方法的調(diào)用層級(jí),readEvent簡(jiǎn)單的理解就是將每行日志轉(zhuǎn)為Event消息體,方法最終調(diào)用的是readFile()方法。

【Flume】TailDirSource源碼理解

readLine()方法,有點(diǎn)難還在研究

public LineResult readLine() throws IOException {
  LineResult lineResult = null;
  while (true) {
    if (bufferPos == NEED_READING) {
      if (raf.getFilePointer() < raf.length()) {//當(dāng)文件指針位置小于文件總長(zhǎng)度的時(shí)候,就需要讀取指針位置到文件最后的數(shù)據(jù)
        readFile();
      } else {
        if (oldBuffer.length > 0) {
          lineResult = new LineResult(false, oldBuffer);
          oldBuffer = new byte[0];
          setLineReadPos(lineReadPos + lineResult.line.length);
        }
        break;
      }
    }
    for (int i = bufferPos; i < buffer.length; i++) {
      if (buffer[i] == BYTE_NL) {
        int oldLen = oldBuffer.length;
        // Don't copy last byte(NEW_LINE)
        int lineLen = i - bufferPos;
        // For windows, check for CR
        if (i > 0 && buffer[i - 1] == BYTE_CR) {
          lineLen -= 1;
        } else if (oldBuffer.length > 0 && oldBuffer[oldBuffer.length - 1] == BYTE_CR) {
          oldLen -= 1;
        }
        lineResult = new LineResult(true,
            concatByteArrays(oldBuffer, 0, oldLen, buffer, bufferPos, lineLen));
        setLineReadPos(lineReadPos + (oldBuffer.length + (i - bufferPos + 1)));
        oldBuffer = new byte[0];
        if (i + 1 < buffer.length) {
          bufferPos = i + 1;
        } else {
          bufferPos = NEED_READING;
        }
        break;
      }
    }
    if (lineResult != null) {
      break;
    }
    // NEW_LINE not showed up at the end of the buffer
    oldBuffer = concatByteArrays(oldBuffer, 0, oldBuffer.length,
                                 buffer, bufferPos, buffer.length - bufferPos);
    bufferPos = NEED_READING;
  }
  return lineResult;
}

readFile()按BUFFER_SIZE(默認(rèn)8KB)作為緩沖讀取日志文件數(shù)據(jù)

private void readFile() throws IOException {
  if ((raf.length() - raf.getFilePointer()) < BUFFER_SIZE) {
    buffer = new byte[(int) (raf.length() - raf.getFilePointer())];
  } else {
    buffer = new byte[BUFFER_SIZE];
  }
  raf.read(buffer, 0, buffer.length);
  bufferPos = 0;
}
向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI