
Driver容錯(cuò)安全性怎么實(shí)現(xiàn)

發(fā)布時(shí)間:2021-12-16 16:32:55 來源:億速云 閱讀:119 作者:iii 欄目:云計(jì)算

This article explains how Driver fault-tolerance safety is implemented in Spark Streaming, walking through the relevant source code step by step. If you have been unsure how the driver recovers safely from failures, the walkthrough below should clear it up.

    ·  First, look at ReceiverTracker's fault tolerance. The key point is that the block metadata ReceiverTracker receives is written to the WAL (write ahead log). The actual work is done by the addBlock method of ReceivedBlockTracker, to which ReceiverTracker delegates; the code is shown below:

    def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
      try {
        val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
        if (writeResult) {
          synchronized {
            getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
          }
          logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
            s"block ${receivedBlockInfo.blockStoreResult.blockId}")
        } else {
          logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
            s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
        }
        writeResult
      } catch {
        case NonFatal(e) =>
          logError(s"Error adding block $receivedBlockInfo", e)
          false
      }
    }

    The writeToLog method performs the actual WAL write; its code is:

    private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
      if (isWriteAheadLogEnabled) {
        logTrace(s"Writing record: $record")
        try {
          writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
            clock.getTimeMillis())
          true
        } catch {
          case NonFatal(e) =>
            logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
            false
        }
      } else {
        true
      }
    }

    It first checks whether the WAL is enabled, based on the following isWriteAheadLogEnabled value:

    private[streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty

    Next, look at writeAheadLogOption:

    private val writeAheadLogOption = createWriteAheadLog()

    And then the createWriteAheadLog() method:

    private def createWriteAheadLog(): Option[WriteAheadLog] = {
      checkpointDirOption.map { checkpointDir =>
        val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
        WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
      }
    }

    根據(jù)checkpoint的配置,獲取checkpoint的目錄,這里可以看出,checkpoint可以有多個(gè)目錄。
    寫完WAL才將receivedBlockInfo放到內(nèi)存隊(duì)列g(shù)etReceivedBlockQueue中
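    For context, turning this on in an application might look roughly like the following sketch. The app name, master, checkpoint path, and batch interval are illustrative placeholders, not taken from the article; spark.streaming.receiver.writeAheadLog.enable is the documented switch for the receiver-side WAL, shown here because it is commonly paired with checkpointing for end-to-end durability.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Minimal sketch: all values below are placeholders.
    val conf = new SparkConf()
      .setAppName("wal-demo")
      .setMaster("local[2]") // local master only for illustration
      // Documented switch for the receiver-side write ahead log.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Setting a checkpoint directory is what makes checkpointDirOption non-empty
    // in createWriteAheadLog above, so the driver-side WAL can be created.
    ssc.checkpoint("hdfs://namenode:8020/spark/checkpoint")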

    ·  Second, look at ReceivedBlockTracker's allocateBlocksToBatch method, shown below:

    def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
      if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
        val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
        }.toMap
        val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
        if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
          timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
          lastAllocatedBatchTime = batchTime
        } else {
          logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
        }
      } else {
        // This situation occurs when:
        // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
        // possibly processed batch job or half-processed batch job need to be processed again,
        // so the batchTime will be equal to lastAllocatedBatchTime.
        // 2. Slow checkpointing makes recovered batch time older than WAL recovered
        // lastAllocatedBatchTime.
        // This situation will only occurs in recovery time.
        logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
      }
    }

    It first drains each receiver's ReceivedBlockQueue (via getReceivedBlockQueue) and assigns the result to streamIdToBlocks, then wraps it:

    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)

    allocatedBlocks就是根據(jù)時(shí)間獲取的一批元數(shù)據(jù),交給對(duì)應(yīng)batchDuration的job,job在執(zhí)行的時(shí)候就可以使用,在使用前先進(jìn)行WAL,如果job出錯(cuò)恢復(fù)后,可以知道數(shù)據(jù)計(jì)算到什么位置

    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
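    To make the ordering explicit, here is a minimal, self-contained sketch of the same log-before-commit idea. The names (SimpleWal, Allocation, allocate) and the file path are hypothetical, not Spark classes; only the ordering mirrors the code above.

    import java.io.{File, FileWriter}
    import scala.collection.mutable
    import scala.util.control.NonFatal

    // Hypothetical stand-ins, only to illustrate the ordering.
    case class Allocation(batchTime: Long, blockIds: Seq[String])

    class SimpleWal(file: File) {
      // Append one record durably before any in-memory state is touched.
      def write(record: String): Boolean = {
        try {
          val out = new FileWriter(file, true)
          try { out.write(record + "\n"); true } finally { out.close() }
        } catch { case NonFatal(_) => false }
      }
    }

    val wal = new SimpleWal(new File("/tmp/driver-wal.log"))
    val timeToAllocated = mutable.Map[Long, Allocation]()

    def allocate(batchTime: Long, blockIds: Seq[String]): Unit = {
      val allocation = Allocation(batchTime, blockIds)
      // Same ordering as allocateBlocksToBatch above: log first, then
      // update the in-memory map only if the log write succeeded.
      if (wal.write(allocation.toString)) {
        timeToAllocated(batchTime) = allocation
      }
    }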

    ·  Third, look at the cleanupOldBatches method. Its job is to remove the metadata of batches that are no longer needed from memory and then delete the corresponding WAL data; before deleting anything, the information about the batches to be cleaned up is itself written to the WAL:

    def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
      require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
      val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
      logInfo("Deleting batches " + timesToCleanup)
      if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
        timeToAllocatedBlocks --= timesToCleanup
        writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
      } else {
        logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
      }
    }

    ·  總結(jié)一下上面的三種WAL,對(duì)應(yīng)下面的三種事件,這就是ReceiverTracker的容錯(cuò)

    /** Trait representing any event in the ReceivedBlockTracker that updates its state. */
    private[streaming] sealed trait ReceivedBlockTrackerLogEvent

    private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
      extends ReceivedBlockTrackerLogEvent
    private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
      extends ReceivedBlockTrackerLogEvent
    private[streaming] case class BatchCleanupEvent(times: Seq[Time])
      extends ReceivedBlockTrackerLogEvent
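    On restart, ReceivedBlockTracker rebuilds its state by reading these events back from the WAL and replaying them in order. The following is a simplified, self-contained sketch of that replay idea, using simplified event and state types rather than the actual Spark classes:

    import scala.collection.mutable

    // Simplified stand-ins for the Spark types, just to show the replay idea.
    sealed trait TrackerEvent
    case class BlockAddition(streamId: Int, blockId: String) extends TrackerEvent
    case class BatchAllocation(time: Long, blocks: Map[Int, Seq[String]]) extends TrackerEvent
    case class BatchCleanup(times: Seq[Long]) extends TrackerEvent

    val blockQueues = mutable.Map[Int, mutable.Queue[String]]()
    val timeToAllocatedBlocks = mutable.Map[Long, Map[Int, Seq[String]]]()

    // Replaying the log reproduces the tracker's in-memory state: additions
    // refill the per-stream queues, allocations drain them into the per-batch
    // map, and cleanups drop batches that were already handled.
    def replay(events: Seq[TrackerEvent]): Unit = events.foreach {
      case BlockAddition(streamId, blockId) =>
        blockQueues.getOrElseUpdate(streamId, mutable.Queue[String]()) += blockId
      case BatchAllocation(time, blocks) =>
        blocks.keys.foreach(id => blockQueues.get(id).foreach(_.clear()))
        timeToAllocatedBlocks(time) = blocks
      case BatchCleanup(times) =>
        timeToAllocatedBlocks --= times
    }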

    ·  Now look at the fault tolerance of the DStream graph and the JobGenerator, starting from the generateJobs method:

    private def generateJobs(time: Time) {
      // Set the SparkEnv in this thread, so that job generation code can access the environment
      // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
      // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
      SparkEnv.set(ssc.env)
      Try {
        // allocate received blocks to this batch
        jobScheduler.receiverTracker.allocateBlocksToBatch(time)
        // generate jobs using the allocated blocks
        graph.generateJobs(time)
      } match {
        case Success(jobs) =>
          // fetch the input metadata for this batch
          val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
          // submit the JobSet
          jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
        case Failure(e) =>
          jobScheduler.reportError("Error generating jobs for time " + time, e)
      }
      eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
    }

    jobs生成完成后發(fā)送DoCheckpoint消息,最終調(diào)用doCheckpoint方法,代碼如下

    private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
      if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
        logInfo("Checkpointing graph for time " + time)
        ssc.graph.updateCheckpointData(time)
        checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
      }
    }
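    The checkpoints written here are what allow a restarted driver to recover its DStream graph and generator state. A common way to wire that up in an application is StreamingContext.getOrCreate; the sketch below uses a placeholder checkpoint path, a hypothetical createContext factory, and a throwaway socket stream only so the context has an output operation:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Placeholder path; replace with the application's real checkpoint directory.
    val checkpointDir = "hdfs://namenode:8020/spark/checkpoint"

    // Hypothetical factory: builds a fresh context (and its DStream graph) when
    // no usable checkpoint exists yet.
    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("driver-recovery-demo").setMaster("local[2]")
      val ssc = new StreamingContext(conf, Seconds(5))
      ssc.checkpoint(checkpointDir)
      // Illustrative input and output operation; a real app defines its own here.
      ssc.socketTextStream("localhost", 9999).print()
      ssc
    }

    // On restart after a driver failure, getOrCreate rebuilds the context from
    // the Checkpoint written by doCheckpoint above instead of calling
    // createContext again.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()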

     

This concludes the walkthrough of how Driver fault-tolerance safety is implemented; hopefully it has resolved your doubts. Pairing the theory with hands-on practice is the best way to make it stick, so give it a try. For more practical articles on related topics, keep an eye on the 億速云 site.

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI