溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Spark2.x中如何進(jìn)行BlockManagerMaster源碼剖析

發(fā)布時(shí)間:2021-12-16 20:38:05 來(lái)源:億速云 閱讀:139 作者:柒染 欄目:大數(shù)據(jù)

今天就跟大家聊聊有關(guān)Spark2.x中如何進(jìn)行BlockManagerMaster源碼剖析,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

  1.BlockManagerMaster創(chuàng)建

    BlockManagerMaster要負(fù)責(zé)整個(gè)應(yīng)用程序在運(yùn)行期間block元數(shù)據(jù)的管理和維護(hù),以及向從節(jié)點(diǎn)發(fā)送指令執(zhí)行命令,它是在構(gòu)造SparkEnv的時(shí)候創(chuàng)建的,Driver端是創(chuàng)建SparkContext的時(shí)候創(chuàng)建SparkEnv,SparkEnv中對(duì)應(yīng)的初始化代碼如下:

    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(      BlockManagerMaster.DRIVER_ENDPOINT_NAME,      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),      conf, isDriver)

    這里可以看到在構(gòu)造blockManagerMaster時(shí),會(huì)創(chuàng)建一個(gè)BlockManagerMasterEndpoint實(shí)例并注冊(cè)到了rpcEnv中,Executor中的blockManager通過(guò)Driver端BlockManagerMasterEndpoint的引用BlockManagerMasterRef與blockManagerMaster進(jìn)行通信。

2.BlockManagerMaster成員函數(shù):

    1).removeExecutor()函數(shù),代碼如下:

  //向BlockManagerMasterEndpoint發(fā)送RemoveExecutor消息,移除掛掉的Exeutor  //這個(gè)函數(shù)只會(huì)在driver端調(diào)用  def removeExecutor(execId: String) {    tell(RemoveExecutor(execId))    logInfo("Removed " + execId + " successfully in removeExecutor")  }

    2).removeExecutorAsync()函數(shù),代碼如下:

  // 跟1)作用差不多,移除掛掉的Executor,這里是非阻塞的異步方法  def removeExecutorAsync(execId: String) {    driverEndpoint.ask[Boolean](RemoveExecutor(execId))    logInfo("Removal of executor " + execId + " requested")  }

  3).registerBlockManager()函數(shù),代碼如下:

 //Executor端的BlockManager啟動(dòng)會(huì),會(huì)向BlockManagerMaster進(jìn)行注冊(cè)// BlockManagerMaster會(huì)保存在master的blockManagerInfo中 def registerBlockManager(      blockManagerId: BlockManagerId,      maxOnHeapMemSize: Long,      maxOffHeapMemSize: Long,      slaveEndpoint: RpcEndpointRef): BlockManagerId = {    logInfo(s"Registering BlockManager $blockManagerId")    val updatedId = driverEndpoint.askSync[BlockManagerId](      RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))    logInfo(s"Registered BlockManager $updatedId")    updatedId  }

  3).updateBlockInfo()函數(shù),代碼如下:

  //更新block數(shù)據(jù)塊信息  def updateBlockInfo(      blockManagerId: BlockManagerId,      blockId: BlockId,      storageLevel: StorageLevel,      memSize: Long,      diskSize: Long): Boolean = {      //向BlockManagerMasterEndpoint發(fā)送UpdateBlockInfo消息,并且返回結(jié)果    val res = driverEndpoint.askSync[Boolean](      UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))    logDebug(s"Updated info of block $blockId")    res  }

  4).getLocations()函數(shù),代碼如下:

 //獲取block所在的BockManager節(jié)點(diǎn)信息,這里返回的是Seq集合, //如果block的Replication>1  一個(gè)block塊,可能會(huì)在多個(gè)blockmanager //節(jié)點(diǎn)上存在  def getLocations(blockId: BlockId): Seq[BlockManagerId] = {  //向BlockManagerMasterEndpoint發(fā)送GetLocations消息    driverEndpoint.askSync[Seq[BlockManagerId]](GetLocations(blockId))  }

  5).getPeers()函數(shù),代碼如下:

  //獲取參數(shù)blockManagerId之外的其他BlockManagerId,  //上面說(shuō)了一個(gè)block,可能會(huì)在多個(gè)blockmanager節(jié)點(diǎn)上存在  def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {   //向BlockManagerMasterEndpoint發(fā)送GetPeers消息    driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))  }

  6).getExecutorEndpointRef()函數(shù),代碼如下:

  //這里就是獲取BlockManagerMasterEndpoint的引用,與其進(jìn)行通信  private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {    for (      blockManagerId <- blockManagerIdByExecutor.get(executorId);      info <- blockManagerInfo.get(blockManagerId)    ) yield {      info.slaveEndpoint    }  }

  7).getBlockStatus()函數(shù),代碼如下:

//獲取一個(gè)Block的狀態(tài)信息,位置,占用內(nèi)存和磁盤大小def getBlockStatus(      blockId: BlockId,      askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {    val msg = GetBlockStatus(blockId, askSlaves)    val response = driverEndpoint.      askSync[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)    val (blockManagerIds, futures) = response.unzip    implicit val sameThread = ThreadUtils.sameThread    val cbf =      implicitly[        CanBuildFrom[Iterable[Future[Option[BlockStatus]]],        Option[BlockStatus],        Iterable[Option[BlockStatus]]]]    val blockStatus = timeout.awaitResult(      Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread))    if (blockStatus == null) {      throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)    }    blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>      status.map { s => (blockManagerId, s) }    }.toMap  }

BlockManagerMaster里面的各種函數(shù)處理其實(shí)都在 BlockManagerMasterEndpoint實(shí)例中,后面我們會(huì)詳細(xì)剖析BlockManagerMasterEndpoint類的各個(gè)消息的具體處理流程。

  

看完上述內(nèi)容,你們對(duì)Spark2.x中如何進(jìn)行BlockManagerMaster源碼剖析有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI