How ReceiverSupervisorImpl Is Instantiated

Published: 2021-12-16 16:39:16  Source: 億速云  Author: iii  Category: Cloud Computing

This article explains how ReceiverSupervisorImpl is instantiated in Spark Streaming by walking through the relevant source code: first the parent class ReceiverSupervisor, then ReceiverSupervisorImpl itself, and finally the BlockGenerator it creates.

First, recall what the function executed on the Executor actually does:

  1. It instantiates ReceiverSupervisorImpl.

  2. It calls start() and then blocks in awaitTermination().

// ReceiverTracker.scala line 564
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
  (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    if (TaskContext.get().attemptNumber() == 0) {
      val receiver = iterator.next()
      assert(iterator.hasNext == false)
      val supervisor = new ReceiverSupervisorImpl(
        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
      supervisor.start()
      supervisor.awaitTermination()
    } else {
      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
    }
  }

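Start with the constructor of ReceiverSupervisor, the parent class of ReceiverSupervisorImpl.

It assigns the member variables and attaches the current supervisor to the receiver (receiver.attachSupervisor(this)).

The class comment is clear as well: the supervisor is responsible for supervising a Receiver on the worker, and it provides all the interfaces needed to handle the data received by the receiver.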

// ReceiverSupervisor.scala line 31
/**
 * Abstract class that is responsible for supervising a Receiver in the worker.
 * It provides all the necessary interfaces for handling the data received by the receiver.
 */
private[streaming] abstract class ReceiverSupervisor(
    receiver: Receiver[_],
    conf: SparkConf
  ) extends Logging {

  /** Enumeration to identify current state of the Receiver */
  object ReceiverState extends Enumeration {
    type CheckpointState = Value
    val Initialized, Started, Stopped = Value
  }
  import ReceiverState._

  // Attach the supervisor to the receiver
  receiver.attachSupervisor(this)               // associates the receiver with this supervisor

  private val futureExecutionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("receiver-supervisor-future", 128))

  /** Receiver id */
  protected val streamId = receiver.streamId

  /** Has the receiver been marked for stop. */
  private val stopLatch = new CountDownLatch(1)

  /** Time between a receiver is stopped and started again */
  private val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000)

  /** The current maximum rate limit for this receiver. */
  private[streaming] def getCurrentRateLimit: Long = Long.MaxValue

  /** Exception associated with the stopping of the receiver */
  @volatile protected var stoppingError: Throwable = null

  /** State of the receiver */
  @volatile private[streaming] var receiverState = Initialized
  // ... some methods, which are in effect the data-handling interfaces
}
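The two constructor-time details above, attaching the supervisor to the receiver and creating a stop latch, can be sketched in isolation. This is a minimal sketch, not Spark's code: MiniReceiver, MiniSupervisor and MiniSupervisorDemo are hypothetical names, and because the listing does not show the body of awaitTermination, having it wait on the latch is an assumption made for illustration.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for a receiver; not Spark's Receiver class.
class MiniReceiver(val streamId: Int) {
  @volatile private var supervisor: Option[MiniSupervisor] = None

  def attachSupervisor(s: MiniSupervisor): Unit = { supervisor = Some(s) }

  def isAttached: Boolean = supervisor.isDefined
}

// Hypothetical stand-in for a supervisor; not Spark's ReceiverSupervisor.
class MiniSupervisor(receiver: MiniReceiver) {
  // Runs as part of construction, like the attachSupervisor call in the listing.
  receiver.attachSupervisor(this)

  // A latch with count 1: it opens exactly once, when stop() is called.
  private val stopLatch = new CountDownLatch(1)

  def stop(): Unit = stopLatch.countDown()

  // Assumption: blocks the calling thread until stop() has been called.
  def awaitTermination(): Unit = stopLatch.await()

  def isStopped: Boolean = stopLatch.getCount == 0L
}

object MiniSupervisorDemo {
  // Returns (attached right after construction, stopped after the wait returns).
  def demo(): (Boolean, Boolean) = {
    val receiver = new MiniReceiver(0)
    val supervisor = new MiniSupervisor(receiver)
    val attached = receiver.isAttached

    // Another thread plays the role of whoever requests the stop.
    val stopper = new Thread(new Runnable {
      override def run(): Unit = { Thread.sleep(50); supervisor.stop() }
    })
    stopper.start()

    supervisor.awaitTermination() // blocks here until the stopper thread runs
    stopper.join()
    (attached, supervisor.isStopped)
  }
}
```

Under that assumption, the thread that calls awaitTermination() stays parked for as long as the supervisor is running, which matches the start-then-wait order in startReceiverFunc.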

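Instantiating ReceiverSupervisorImpl

  1. It instantiates a ReceivedBlockHandler. With the write-ahead log disabled (the default), this is a BlockManagerBasedBlockHandler, which sends the data to the BlockManager.

  2. It instantiates an RpcEndpoint.

  3. It instantiates a BlockGenerator.

  4. It instantiates a BlockGeneratorListener.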

// ReceiverSupervisorImpl.scala line 43
/**
 * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
 * which provides all the necessary functionality for handling the data received by
 * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]]
 * object that is used to divide the received data stream into blocks of data.
 */
private[streaming] class ReceiverSupervisorImpl(
    receiver: Receiver[_],
    env: SparkEnv,
    hadoopConf: Configuration,
    checkpointDirOption: Option[String]
  ) extends ReceiverSupervisor(receiver, env.conf) with Logging {

  private val host = SparkEnv.get.blockManager.blockManagerId.host
  private val executorId = SparkEnv.get.blockManager.blockManagerId.executorId

  private val receivedBlockHandler: ReceivedBlockHandler = {
    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {  // disabled by default
      if (checkpointDirOption.isEmpty) {
        throw new SparkException(
          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
            "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
            "See documentation for more details.")
      }
      new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
    } else {
      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)  
    }
  }

  /** Remote RpcEndpointRef for the ReceiverTracker */
  private val trackerEndpoint = RpcUtils.makeDriverRef("ReceiverTracker", env.conf, env.rpcEnv)

  /** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
  private val endpoint = env.rpcEnv.setupEndpoint(
    "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
      override val rpcEnv: RpcEnv = env.rpcEnv

      override def receive: PartialFunction[Any, Unit] = {
        case StopReceiver =>
          logInfo("Received stop signal")
          ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
        case CleanupOldBlocks(threshTime) =>
          logDebug("Received delete old batch signal")
          cleanupOldBlocks(threshTime)
        case UpdateRateLimit(eps) =>
          logInfo(s"Received a new rate limit: $eps.")
          registeredBlockGenerators.foreach { bg =>
            bg.updateRate(eps)
          }
      }
    })

  /** Unique block ids if one wants to add blocks directly */
  private val newBlockId = new AtomicLong(System.currentTimeMillis())

  private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator] // mixes in a trait (stackable trait pattern)
    with mutable.SynchronizedBuffer[BlockGenerator]

  /** Divides received data records into data blocks for pushing in BlockManager. */
  private val defaultBlockGeneratorListener = new BlockGeneratorListener {
    def onAddData(data: Any, metadata: Any): Unit = { }

    def onGenerateBlock(blockId: StreamBlockId): Unit = { }

    def onError(message: String, throwable: Throwable) {
      reportError(message, throwable)
    }

    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
      pushArrayBuffer(arrayBuffer, None, Some(blockId))
    }
  }
  private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)
  // ... some methods
  /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
  def pushArrayBuffer(
      arrayBuffer: ArrayBuffer[_],
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
  }

  /** Store block and report it to driver */
  def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = blockIdOption.getOrElse(nextBlockId)
    val time = System.currentTimeMillis
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
    val numRecords = blockStoreResult.numRecords
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    logDebug(s"Reported block $blockId")
  }

}
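The choice of block handler in the listing comes down to one flag and one optional directory. The sketch below isolates that decision with plain Scala types so it can be run without Spark. BlockHandlerChoice, BlockManagerBased, WriteAheadLogBased and HandlerSelection are hypothetical names, and the walEnabled parameter stands in for whatever WriteAheadLogUtils.enableReceiverLog reads from the configuration.

```scala
// Hypothetical, simplified stand-ins; none of these are Spark classes.
sealed trait BlockHandlerChoice
case object BlockManagerBased extends BlockHandlerChoice
final case class WriteAheadLogBased(checkpointDir: String) extends BlockHandlerChoice

object HandlerSelection {
  // Mirrors the branching in the listing: the write-ahead log path
  // requires a checkpoint directory, the default path does not.
  def choose(walEnabled: Boolean, checkpointDir: Option[String]): BlockHandlerChoice =
    if (walEnabled) {
      checkpointDir match {
        case Some(dir) => WriteAheadLogBased(dir)
        case None =>
          throw new IllegalStateException(
            "Cannot enable receiver write-ahead log without checkpoint directory set.")
      }
    } else {
      BlockManagerBased
    }
}
```

For example, HandlerSelection.choose(walEnabled = false, None) yields BlockManagerBased, while enabling the flag without a directory fails fast at construction time rather than when the first block arrives.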

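Now look at BlockGenerator.

Its comment is clear: the class starts two threads.

  1. One periodically turns the previous batch of data into a block and starts a new batch. This is the RecurringTimer class, which wraps a Thread internally.

  2. The other pushes the blocks to the BlockManager.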

//
/**
 * Generates batches of objects received by a
 * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
 * named blocks at regular intervals. This class starts two threads,
 * one to periodically start a new batch and prepare the previous batch of as a block,
 * the other to push the blocks into the block manager.
 *
 * Note: Do not create BlockGenerator instances directly inside receivers. Use
 * `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
 */
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging{

private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])

/**
 * The BlockGenerator can be in 5 possible states, in the order as follows.
 *
 *  - Initialized: Nothing has been started
 *  - Active: start() has been called, and it is generating blocks on added data.
 *  - StoppedAddingData: stop() has been called, the adding of data has been stopped,
 *                       but blocks are still being generated and pushed.
 *  - StoppedGeneratingBlocks: Generating of blocks has been stopped, but
 *                             they are still being pushed.
 *  - StoppedAll: Everything has stopped, and the BlockGenerator object can be GCed.
 */
private object GeneratorState extends Enumeration {
  type GeneratorState = Value
  val Initialized, Active, StoppedAddingData, StoppedGeneratingBlocks, StoppedAll = Value
}
import GeneratorState._

private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")

private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")  // the periodic timer thread
private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } // the thread that pushes the blocks

@volatile private var currentBuffer = new ArrayBuffer[Any]
@volatile private var state = Initialized
//...
}
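The two-thread design can be illustrated with a small, self-contained sketch. It is not Spark's implementation: MiniBlockGenerator and MiniBlockGeneratorDemo are hypothetical, a ScheduledExecutorService stands in for RecurringTimer, and the stop sequence is a simplified assumption rather than the five-state shutdown described in the comment above.

```scala
import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer

// Hypothetical miniature of the two-thread design; not Spark's BlockGenerator.
class MiniBlockGenerator(blockIntervalMs: Long, queueSize: Int, onPush: List[Any] => Unit) {
  private val blocksForPushing = new ArrayBlockingQueue[List[Any]](queueSize)
  private var currentBuffer = new ArrayBuffer[Any] // guarded by this.synchronized
  @volatile private var stopped = false

  // Thread 1: a scheduled executor plays the role of the recurring timer.
  private val timer = Executors.newSingleThreadScheduledExecutor()

  // Thread 2: drains the queue and hands each block to the callback.
  private val pusher = new Thread(new Runnable {
    override def run(): Unit = keepPushingBlocks()
  })

  def addData(item: Any): Unit = synchronized { currentBuffer += item }

  // Swaps in a fresh buffer and enqueues the old one as a block.
  private def updateCurrentBuffer(): Unit = {
    val block = synchronized {
      val old = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      old
    }
    if (block.nonEmpty) blocksForPushing.put(block.toList)
  }

  // Keeps polling until stop was requested and the queue is drained.
  private def keepPushingBlocks(): Unit = {
    while (!stopped || !blocksForPushing.isEmpty) {
      val block = blocksForPushing.poll(10, TimeUnit.MILLISECONDS)
      if (block != null) onPush(block)
    }
  }

  def start(): Unit = {
    timer.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = updateCurrentBuffer()
    }, blockIntervalMs, blockIntervalMs, TimeUnit.MILLISECONDS)
    pusher.start()
  }

  def stop(): Unit = {
    timer.shutdown()                            // no further timer ticks
    timer.awaitTermination(1, TimeUnit.SECONDS) // let an in-flight tick finish
    updateCurrentBuffer()                       // flush the last partial buffer
    stopped = true
    pusher.join()                               // wait until the queue is drained
  }
}

object MiniBlockGeneratorDemo {
  // Adds five records and returns how many were pushed in total.
  def pushedRecordCount(): Int = {
    val pushed = new AtomicInteger(0)
    val generator = new MiniBlockGenerator(20L, 10, block => { pushed.addAndGet(block.size); () })
    generator.start()
    (1 to 5).foreach(i => generator.addData(i))
    generator.stop()
    pushed.get()
  }
}
```

How the five records are split into blocks depends on timing, but the total pushed is always five, because stop() flushes the current buffer before the pushing thread is allowed to exit. The bounded queue also gives back-pressure: if pushing falls behind, put blocks the timer thread instead of letting blocks pile up.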

At this point the instantiation of ReceiverSupervisorImpl is complete. Note, however, that the Receiver itself has not been started yet.
