This article looks at how fault tolerance for the Driver is implemented in Spark Streaming. It follows the relevant source code step by step: first the write-ahead log (WAL) used for received-block metadata, then the checkpointing done by JobGenerator.
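· First, fault tolerance in ReceiverTracker. The metadata of each block that ReceiverTracker receives is written to the WAL. ReceiverTracker delegates this to the addBlock method of ReceivedBlockTracker, whose code is as follows: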
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}
The writeToLog method performs the actual WAL write. Its code is:
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    true
  }
}
It first checks whether the WAL is enabled, based on the value of isWriteAheadLogEnabled:
private[streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty
Next, look at writeAheadLogOption:
private val writeAheadLogOption = createWriteAheadLog()
Then look at the createWriteAheadLog() method:
private def createWriteAheadLog(): Option[WriteAheadLog] = {
  checkpointDirOption.map { checkpointDir =>
    val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
    WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
  }
}
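The log is created only when a checkpoint directory has been configured: checkpointDirOption is an Option, and the WAL directory is derived from that checkpoint directory via checkpointDirToLogDir. Without a checkpoint directory, writeAheadLogOption is empty, the WAL is disabled, and writeToLog simply returns true.
Only after the WAL write succeeds is receivedBlockInfo added to the in-memory queue returned by getReceivedBlockQueue.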
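The "log first, then update memory" ordering described above can be sketched in isolation. This is a minimal illustration, not Spark's implementation: InMemoryLog and SketchTracker are hypothetical names, blocks are reduced to plain strings, and the log is just an in-memory buffer that can be told to fail.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical stand-in for a write-ahead log: append-only, and it may fail.
final class InMemoryLog(failWrites: Boolean = false) {
  val records = mutable.ArrayBuffer.empty[String]
  def write(record: String): Unit = {
    if (failWrites) throw new java.io.IOException("simulated log failure")
    records += record
  }
}

// Hypothetical tracker mirroring the addBlock pattern: log first, then mutate memory.
final class SketchTracker(logOption: Option[InMemoryLog]) {
  private val queue = mutable.Queue.empty[String]

  private def writeToLog(record: String): Boolean = logOption match {
    case Some(log) =>
      try { log.write(record); true }
      catch { case NonFatal(_) => false }
    case None => true // WAL disabled: treat the write as successful
  }

  def addBlock(blockId: String): Boolean = synchronized {
    val written = writeToLog(s"add:$blockId")
    // The in-memory queue changes only if the log write succeeded.
    if (written) queue += blockId
    written
  }

  def queued: Seq[String] = synchronized { queue.toList }
}
```

With a failing log, addBlock returns false and the queue stays empty, so the in-memory state never gets ahead of what the log could replay.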
· Second, the allocateBlocksToBatch method of ReceivedBlockTracker. Its code is as follows:
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}
It first drains the block queue of every receiver (obtained through getReceivedBlockQueue) into streamIdToBlocks, then wraps the result:
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
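allocatedBlocks is the set of block metadata assigned to the job for the given batch time, which the job uses when it runs. Before the allocation is made visible in memory it is written to the WAL, so that after a failure and recovery the Driver knows which blocks had already been assigned to which batch: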
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
  timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
  lastAllocatedBatchTime = batchTime
} else {
  logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
}
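The allocation step can be sketched on its own. The following is a simplified illustration under stated assumptions, not the Spark code: SketchAllocator, enqueue, allocate and blocksFor are hypothetical names, block metadata is reduced to string IDs, batch times are plain Long milliseconds, and the log is passed in as a function that reports success or failure.

```scala
import scala.collection.mutable

// Hypothetical, simplified allocator: block IDs are strings, batch times are Longs.
final class SketchAllocator(streamIds: Seq[Int], logWrite: String => Boolean) {
  private val queues = mutable.HashMap.empty[Int, mutable.Queue[String]]
  private val timeToBlocks = mutable.HashMap.empty[Long, Map[Int, Seq[String]]]
  private var lastAllocated: Option[Long] = None

  def enqueue(streamId: Int, blockId: String): Unit = synchronized {
    queues.getOrElseUpdate(streamId, mutable.Queue.empty[String]) += blockId
  }

  def allocate(batchTime: Long): Unit = synchronized {
    // Only allocate for a batch time newer than the last allocated one.
    if (lastAllocated.forall(batchTime > _)) {
      // Drain every stream's queue into an immutable snapshot for this batch.
      val snapshot: Map[Int, Seq[String]] = streamIds.map { id =>
        val q = queues.getOrElseUpdate(id, mutable.Queue.empty[String])
        id -> q.dequeueAll(_ => true).toList
      }.toMap
      // Record the allocation in the log before exposing it in memory.
      if (logWrite(s"alloc:$batchTime:$snapshot")) {
        timeToBlocks.put(batchTime, snapshot)
        lastAllocated = Some(batchTime)
      }
    }
  }

  def blocksFor(batchTime: Long): Map[Int, Seq[String]] = synchronized {
    timeToBlocks.getOrElse(batchTime, Map.empty)
  }
}
```

Calling allocate twice with the same batch time leaves the first allocation untouched, which corresponds to the else branch in the original method that only logs a message.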
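· Third, the cleanupOldBatches method. It removes the metadata of batches that are no longer needed from memory and then deletes the corresponding WAL data. Before deleting anything, it also writes the list of batches to be removed to the WAL: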
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
  require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
  val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
  logInfo("Deleting batches " + timesToCleanup)
  if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
    timeToAllocatedBlocks --= timesToCleanup
    writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
  } else {
    logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
  }
}
· To summarise, the three WAL writes above correspond to the three events below. Together they make up the fault tolerance of ReceiverTracker:
/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
private[streaming] sealed trait ReceivedBlockTrackerLogEvent
private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
  extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
  extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchCleanupEvent(times: Seq[Time])
  extends ReceivedBlockTrackerLogEvent
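Because every state change is logged as one of these three events, the tracker's state can in principle be rebuilt by replaying the log in order. The sketch below illustrates that idea with hypothetical, simplified types (TrackerEvent, TrackerState and Replay are assumptions for illustration, with string block IDs and Long batch times). It is not the recovery code from Spark itself.

```scala
// Hypothetical, simplified event types modelled on the three log events above.
sealed trait TrackerEvent
final case class BlockAdded(streamId: Int, blockId: String) extends TrackerEvent
final case class BatchAllocated(time: Long, blocks: Map[Int, List[String]]) extends TrackerEvent
final case class BatchesCleaned(times: Seq[Long]) extends TrackerEvent

// State rebuilt from the log: unallocated blocks per stream, and blocks per batch time.
final case class TrackerState(
    queued: Map[Int, List[String]] = Map.empty,
    allocated: Map[Long, Map[Int, List[String]]] = Map.empty)

object Replay {
  // Apply a single logged event to the state.
  def step(state: TrackerState, event: TrackerEvent): TrackerState = event match {
    case BlockAdded(streamId, blockId) =>
      val current = state.queued.getOrElse(streamId, Nil)
      state.copy(queued = state.queued.updated(streamId, current :+ blockId))
    case BatchAllocated(time, blocks) =>
      // Allocation drained the queues, so the replayed queues are emptied too.
      state.copy(queued = Map.empty, allocated = state.allocated.updated(time, blocks))
    case BatchesCleaned(times) =>
      state.copy(allocated = state.allocated -- times)
  }

  // Rebuild the state by folding over the events in log order.
  def recover(events: Seq[TrackerEvent]): TrackerState =
    events.foldLeft(TrackerState())(step)
}
```

If the log ends with an allocation that has no matching cleanup, the replayed state still contains that batch, which is the situation the comment in allocateBlocksToBatch describes as a batch that may need to be processed again.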
· Next, the fault tolerance of the DStream graph (DStreamGraph) and JobGenerator, starting from the code below:
private def generateJobs(time: Time) {
  // ... SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // allocate received blocks to batch
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    // generate jobs using allocated block
    graph.generateJobs(time)
  } match {
    case Success(jobs) =>
      // fetch the input metadata for this batch
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      // submit the JobSet
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
Once the jobs have been generated, a DoCheckpoint message is posted, which eventually invokes the doCheckpoint method:
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}
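The condition in doCheckpoint means a checkpoint is written only for batch times that lie a whole number of checkpoint intervals after the graph's zero time. A small arithmetic sketch of that check follows. CheckpointTiming and its methods are hypothetical helpers, all values are plain milliseconds, and the shouldCheckpoint flag is left out.

```scala
// Hypothetical helper: all values are in milliseconds.
object CheckpointTiming {
  // True when the batch time is a whole number of checkpoint intervals after zero time.
  def isCheckpointTime(timeMs: Long, zeroTimeMs: Long, checkpointIntervalMs: Long): Boolean =
    (timeMs - zeroTimeMs) % checkpointIntervalMs == 0

  // Batch times (out of the first `batches` batches) at which a checkpoint would be written.
  def checkpointTimes(
      zeroTimeMs: Long,
      batchMs: Long,
      checkpointIntervalMs: Long,
      batches: Int): Seq[Long] =
    (1 to batches)
      .map(i => zeroTimeMs + i * batchMs)
      .filter(isCheckpointTime(_, zeroTimeMs, checkpointIntervalMs))
}
```

For example, with a 2-second batch interval and a 10-second checkpoint interval, only every fifth batch passes the check, so the other batches post DoCheckpoint without writing anything.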