溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

(版本定制)第11課:Spark Streaming源碼解讀

發(fā)布時(shí)間:2020-06-05 21:41:05 來源:網(wǎng)絡(luò) 閱讀:286 作者:Spark_2016 欄目:大數(shù)據(jù)

本期內(nèi)容:

    1、ReceiverTracker的架構(gòu)設(shè)計(jì)

    2、消息循環(huán)系統(tǒng)

    3、ReceiverTracker具體實(shí)現(xiàn)


上節(jié)課講到了Receiver是如何不斷的接收數(shù)據(jù)的,并且接收到的數(shù)據(jù)的元數(shù)據(jù)會(huì)匯報(bào)給ReceiverTracker,下面我們看看ReceiverTracker具體的功能及實(shí)現(xiàn)。

ReceiverTracker主要的功能:

  1. 在Executor上啟動(dòng)Receivers。

  2. 停止Receivers 。

  3. 更新Receiver接收數(shù)據(jù)的速度(也就是限流)

  4. 不斷的等待Receivers的運(yùn)行狀態(tài),只要Receivers停止運(yùn)行,就重新啟動(dòng)Receiver,也就是Receiver的容錯(cuò)功能。

  5. 接受Receiver的注冊(cè)。

  6. 借助ReceivedBlockTracker來管理Receiver接收數(shù)據(jù)的元數(shù)據(jù)。

  7. 匯報(bào)Receiver發(fā)送過來的錯(cuò)誤信息


ReceiverTracker 管理了一個(gè)消息通訊體ReceiverTrackerEndpoint,用來與Receiver或者ReceiverTracker 進(jìn)行消息通信。

在ReceiverTracker的start方法中,實(shí)例化了ReceiverTrackerEndpoint,并且在Executor上啟動(dòng)Receivers。

啟動(dòng)Receivr,其實(shí)是ReceiverTracker給ReceiverTrackerEndpoint發(fā)送了一個(gè)本地消息,ReceiverTrackerEndpoint將Receiver封裝成RDD以job的方式提交給集群運(yùn)行。

Receiver啟動(dòng)后,會(huì)向ReceiverTracker注冊(cè),注冊(cè)成功才算正式啟動(dòng)了。

當(dāng)Receiver端接收到數(shù)據(jù),達(dá)到一定的條件需要將數(shù)據(jù)寫入BlockManager,并且將數(shù)據(jù)的元數(shù)據(jù)匯報(bào)給ReceiverTracker。

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
  ) {
val blockId = blockIdOption.getOrElse(nextBlockId)
val time = System.currentTimeMillis
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}

當(dāng)ReceiverTracker收到元數(shù)據(jù)后,會(huì)在線程池中啟動(dòng)一個(gè)線程來寫數(shù)據(jù)

case AddBlock(receivedBlockInfo) =>
if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
walBatchingThreadPool.execute(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
if (active) {
          context.reply(addBlock(receivedBlockInfo)) 
        } else {
throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }

數(shù)據(jù)的元數(shù)據(jù)是交由ReceivedBlockTracker管理的

數(shù)據(jù)最終被寫入到streamIdToUnallocatedBlockQueues中,一個(gè)流對(duì)應(yīng)一個(gè)數(shù)據(jù)塊信息的隊(duì)列。

每當(dāng)Streaming 觸發(fā)job時(shí),會(huì)將隊(duì)列中的數(shù)據(jù)分配成一個(gè)batch,并將數(shù)據(jù)寫入timeToAllocatedBlocks數(shù)據(jù)結(jié)構(gòu)。

下面是簡(jiǎn)單的流程圖:

(版本定制)第11課:Spark Streaming源碼解讀

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI