您好,登錄后才能下訂單哦!
本期內(nèi)容:
1. ReceiverBlockTracker容錯安全性
2. DStream和JobGenerator容錯安全性
一:容錯安全性
1. ReceivedBlockTracker負責管理Spark Streaming運行程序的元數(shù)據(jù)。數(shù)據(jù)層面
2. DStream和JobGenerator是作業(yè)調(diào)度的核心層面,也就是具體調(diào)度到什么程度了,從運行的考慮的。DStream是邏輯層面。
3. 作業(yè)生存層面,JobGenerator是Job調(diào)度層面,具體調(diào)度到什么程度了。從運行的角度的。
談Driver容錯你要考慮Driver中有那些需要維持狀態(tài)的運行。
1. ReceivedBlockTracker跟蹤了數(shù)據(jù),因此需要容錯。通過WAL方式容錯。
2. DStreamGraph表達了依賴關系,恢復狀態(tài)的時候需要根據(jù)DStream恢復計算邏輯級別的依賴關系。通過checkpoint方式容錯。
3. JobGenerator表面你是怎么基于ReceiverBlockTracker中的數(shù)據(jù),以及DStream構(gòu)成的依賴關系不斷的產(chǎn)生Job的過程。你消費了那些數(shù)據(jù),進行到什么程度了。
總結(jié)如下:
ReceivedBlockTracker:
1. ReceivedBlockTracker會管理Spark Streaming運行過程中所有的數(shù)據(jù)。并且把數(shù)據(jù)分配給需要的batches,所有的動作都會被WAL寫入到Log中,Driver失敗的話,就可以根據(jù)歷史恢復tracker狀態(tài),在ReceivedBlockTracker創(chuàng)建的時候,使用checkpoint保存歷史目錄。
下面就從Receiver收到數(shù)據(jù)之后,怎么處理的開始。
2. ReceiverBlockTracker.addBlock源碼如下:
Receiver接收到數(shù)據(jù),把元數(shù)據(jù)信息匯報上來,然后通過ReceiverSupervisorImpl就將數(shù)據(jù)匯報上來,就直接通過WAL進行容錯.
當Receiver的管理者,ReceiverSupervisorImpl把元數(shù)據(jù)信息匯報給Driver的時候,正在處理是交給ReceiverBlockTracker. ReceiverBlockTracker將數(shù)據(jù)寫進WAL文件中,然后才會寫進內(nèi)存中,被當前的Spark Streaming程序的調(diào)度器使用的,也就是JobGenerator使用的。JobGenerator不可能直接使用WAL。WAL的數(shù)據(jù)在磁盤中,這里JobGenerator使用的內(nèi)存中緩存的數(shù)據(jù)結(jié)構(gòu)
/** Add received block. This event will get written to the write ahead log (if enabled). */ def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { try { val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) //接收數(shù)據(jù)后,先進行WAL if (writeResult) { synchronized { getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo //當WAL成功后,將Block Info元數(shù)據(jù)信息加入到Block Queue中 } logDebug(s"Stream ${receivedBlockInfo.streamId} received " + s"block ${receivedBlockInfo.blockStoreResult.blockId}") } else { logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " + s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.") } writeResult } catch { case NonFatal(e) => logError(s"Error adding block $receivedBlockInfo", e) false } }
Driver端接收到的數(shù)據(jù)保存在streamIdToUnallocatedBlockQueues中,具體結(jié)構(gòu)如下:
private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo] private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
allocateBlocksToBatch把接收到的數(shù)據(jù)分配給batch,根據(jù)streamId取出Block,由此就知道Spark Streaming處理數(shù)據(jù)的時候可以有不同數(shù)據(jù)來源
那到底什么是batchTime?
batchTime是上一個Job分配完數(shù)據(jù)之后,開始再接收到的數(shù)據(jù)的時間。
/** * Allocate all unallocated blocks to the given batch. * This event will get written to the write ahead log (if enabled). */ def allocateBlocksToBatch(batchTime: Time): Unit = synchronized { if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) { val streamIdToBlocks = streamIds.map { streamId => (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true)) //根據(jù)StreamId獲取Block信息 }.toMap val allocatedBlocks = AllocatedBlocks(streamIdToBlocks) if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) { timeToAllocatedBlocks.put(batchTime, allocatedBlocks) lastAllocatedBatchTime = batchTime //這里有對batchTime進行賦值,就是上一個job分配完數(shù)據(jù)后,開始在接收到數(shù)據(jù)的時間 } else { logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery") } } else { logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery") } }
隨著時間的推移,會不斷產(chǎn)生RDD,這時就需要清理掉一些歷史數(shù)據(jù),可以通過cleanupOldBatches方法來清理歷史數(shù)據(jù)
/** * Clean up block information of old batches. If waitForCompletion is true, this method * returns only after the files are cleaned up. */ def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized { require(cleanupThreshTime.milliseconds < clock.getTimeMillis()) val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq logInfo("Deleting batches " + timesToCleanup) if (writeToLog(BatchCleanupEvent(timesToCleanup))) { timeToAllocatedBlocks --= timesToCleanup writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion)) } else { logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.") } }
以上幾個方法都進行了WAL動作
(record: ReceivedBlockTrackerLogEvent): = { (isWriteAheadLogEnabled) { logTrace(record) { .get.write(ByteBuffer.(Utils.(record))clock.getTimeMillis()) } { (e) => logWarning(recorde) } } { } }
總結(jié):
WAL對數(shù)據(jù)的管理包括數(shù)據(jù)的生成,數(shù)據(jù)的銷毀和消費。上述在操作之后都要先寫入到WAL的文件中.
JobGenerator:
Checkpoint會有時間間隔Batch Duractions,Batch執(zhí)行前和執(zhí)行后都會進行checkpoint。
doCheckpoint被調(diào)用的前后流程:
1、簡單看下generateJobs
/** Generate jobs and perform checkpoint for the given `time`. */ private def generateJobs(time: Time) { // Set the SparkEnv in this thread, so that job generation code can access the environment // Example: BlockRDDs are created in this thread, and it needs to access BlockManager // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed. SparkEnv.set(ssc.env) Try { jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch graph.generateJobs(time) // generate jobs using allocated block } match { case Success(jobs) => val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) //job完成后就需要進行checkpoint動作 }
2、processEvent接收到消息事件
/** Processes all events */ private def processEvent(event: JobGeneratorEvent) { logDebug("Got event " + event) event match { case GenerateJobs(time) => generateJobs(time) case ClearMetadata(time) => clearMetadata(time) case DoCheckpoint(time, clearCheckpointDataLater) => doCheckpoint(time, clearCheckpointDataLater) // 調(diào)用doCheckpoint方法 case ClearCheckpointData(time) => clearCheckpointData(time) } }
3、doCheckpoint源碼如下:
/** Perform checkpoint for the give `time`. */ private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) { if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) { logInfo("Checkpointing graph for time " + time) ssc.graph.updateCheckpointData(time) //最終是進行RDD的Checkpoint checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater) } }
4、DStream中的updateCheckpointData源碼如下:最終導致RDD的Checkpoint
/** * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of * this stream. This is an internal method that should not be called directly. This is * a default implementation that saves only the file names of the checkpointed RDDs to * checkpointData. Subclasses of DStream (especially those of InputDStream) may override * this method to save custom checkpoint data. */ private[streaming] def updateCheckpointData(currentTime: Time) { logDebug("Updating checkpoint data for time " + currentTime) checkpointData.update(currentTime) dependencies.foreach(_.updateCheckpointData(currentTime)) logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData) }
JobGenerator容錯安全性如下圖:
參考博客:http://blog.csdn.net/snail_gesture/article/details/51492873#comments
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。