This article shares a walkthrough of how Spark Streaming works internally. I found the material quite practical, so I am sharing it here; I hope you get something useful out of it.
Spark Streaming buffers incoming data through receivers distributed across the cluster's nodes, wraps that data into RDDs in a format Spark can process, and feeds them to Spark Streaming, which then submits jobs to the Spark cluster for execution.
The initialization process boils down to two steps:
1. Scheduler initialization.
The scheduler drives the execution of Spark Streaming, and it can be tuned through the relevant configuration parameters.
2. Converting the input streams' receivers into a form Spark can process as RDDs, distributing them across the cluster, and then starting every receiver in the receiver set.
Spark Streaming provides a different receiver for each kind of data source; each receiver, running on some node, can be thought of as a dedicated process that ingests one portion of the stream as its input.
First, look at part of the JavaStreamingContext source, which shows that it can receive socket text data and can also treat files as an input-stream data source:
class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {

  def this(master: String, appName: String, batchDuration: Duration) =
    this(new StreamingContext(master, appName, batchDuration, null, Nil, Map()))

  def socketTextStream(hostname: String, port: Int): JavaReceiverInputDStream[String] = {
    ssc.socketTextStream(hostname, port)
  }

  def textFileStream(directory: String): JavaDStream[String] = {
    ssc.textFileStream(directory)
  }
}
For example, when the following code runs:
val lines = ssc.socketTextStream("master", 9999)
it receives text data from the socket and returns a JavaReceiverInputDStream, an implementation of ReceiverInputDStream. It contains a Receiver that ingests the data and converts it into RDDs that Spark can process.
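To ground this, here is a minimal, self-contained driver sketch. The hostname, port, and one-second batch interval are illustrative assumptions (feed it locally with, e.g., nc -lk 9999):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    // local[2]: one core for the receiver, at least one left for processing.
    val conf = new SparkConf().setMaster("local[2]").setAppName("SocketWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Returns a receiver-based input DStream; a SocketReceiver will be
    // started on an executor once the context starts.
    val lines = ssc.socketTextStream("localhost", 9999)

    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.print() // an output operation; without one, no jobs are generated

    ssc.start()            // starts the receivers and the JobScheduler
    ssc.awaitTermination()
  }
}

Note the print() call: as we will see in the job-generation section at the end, output operations are what actually cause a job to be produced for each batch.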
Next, look at part of the JavaReceiverInputDStream source:
class JavaReceiverInputDStream[T](val receiverInputDStream: ReceiverInputDStream[T])
    (implicit override val classTag: ClassTag[T])
  extends JavaInputDStream[T](receiverInputDStream) {
}

object JavaReceiverInputDStream {
  implicit def fromReceiverInputDStream[T: ClassTag](
      receiverInputDStream: ReceiverInputDStream[T]): JavaReceiverInputDStream[T] = {
    new JavaReceiverInputDStream[T](receiverInputDStream)
  }
}
The source confirms that JavaReceiverInputDStream is an implementation of ReceiverInputDStream: it contains a Receiver that ingests data and converts it into RDDs. Part of ReceiverInputDStream follows (note the English comments):
abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)
  extends InputDStream[T](ssc_) {

  /**
   * Gets the receiver object that will be sent to the worker nodes
   * to receive data. This method needs to defined by any specific implementation
   * of a ReceiverInputDStream.
   */
  def getReceiver(): Receiver[T]

  // Nothing to start or stop as both taken care of by the ReceiverTracker.
  def start() {}

  /**
   * Generates RDDs with blocks received by the receiver of this stream.
   */
  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {
      if (validTime < graph.startTime) {
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }
}
When getReceiver() is called, the flow is as follows (part of the SocketInputDStream source):
private[streaming] class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
In effect it simply news up a SocketReceiver, passing through the parameters from before. The implementation:
private[streaming] class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while (!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      // ...
    }
  }
}
Every concrete input stream implements this method, and a worker node calls it to obtain the Receiver; this is what lets the work of receiving data be distributed onto the workers.
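The same getReceiver()/Receiver contract is the extension point for custom sources. Below is a sketch of a line-oriented receiver modeled on the SocketReceiver above; the class name and charset are my own choices, while Receiver, store(), restart(), and isStopped() are Spark's documented receiver API:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A minimal line-oriented socket receiver.
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Receive on a daemon thread so onStart() returns immediately.
    new Thread("Line Receiver") {
      setDaemon(true)
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {} // the receiving thread exits once isStopped() is true

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line) // hands the record to the ReceiverSupervisor
        line = reader.readLine()
      }
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException => restart("Could not connect", e)
      case t: Throwable                 => restart("Error receiving data", t)
    } finally {
      if (socket != null) socket.close()
    }
  }
}

It would be wired in with ssc.receiverStream(new LineReceiver("localhost", 9999)), which yields a ReceiverInputDStream just as socketTextStream does.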
Receivers of this kind end up distributed across the nodes of the cluster.
The "initialization and data reception" steps above briefly covered how the receiver set is turned into RDDs and receives the data stream in a distributed fashion across the cluster. Next, let's look at how a receiver actually ingests and processes that stream.
Receiver exposes a family of store() methods, e.g. (see the source for the full set):
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {

  def store(dataItem: T) {
    supervisor.pushSingle(dataItem)
  }

  /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
  def store(dataBuffer: ArrayBuffer[T]) {
    supervisor.pushArrayBuffer(dataBuffer, None, None)
  }
}
These methods are already implemented; the actual storing is carried out by the ReceiverSupervisor initialized on the worker node. The ReceiverSupervisor also monitors the Receiver: whether it has started, whether it has stopped, whether it needs to restart, error reporting, and so on. Part of ReceiverSupervisor, including its createBlockGenerator hook, follows:
private[streaming] abstract class ReceiverSupervisor(
    receiver: Receiver[_],
    conf: SparkConf
  ) extends Logging {

  /** Push a single data item to backend data store. */
  def pushSingle(data: Any)

  /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
  def pushArrayBuffer(
      arrayBuffer: ArrayBuffer[_],
      optionalMetadata: Option[Any],
      optionalBlockId: Option[StreamBlockId]
    )

  /**
   * Create a custom [[BlockGenerator]] that the receiver implementation can directly control
   * using their provided [[BlockGeneratorListener]].
   *
   * Note: Do not explicitly start or stop the `BlockGenerator`, the `ReceiverSupervisorImpl`
   * will take care of it.
   */
  def createBlockGenerator(blockGeneratorListener: BlockGeneratorListener): BlockGenerator

  /** Start receiver */
  def startReceiver(): Unit = synchronized {
    // ...
  }
}
The ReceiverSupervisor implements those storage methods on top of the BlockManager. Received data is stored as blocks that later back RDDs, with the placement strategy chosen by the StorageLevel. The default for received data is serialized in memory, spilling to the executor's disk when memory runs short; intermediate RDD results that get computed default to serialized, memory-only storage.
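The StorageLevel is fixed when the input DStream is created. A short sketch, reusing the ssc from the earlier driver example (I believe socketTextStream's overload defaults to MEMORY_AND_DISK_SER_2, but verify against your Spark version):

import org.apache.spark.storage.StorageLevel

// Default level: serialized, memory first, spill to disk, replicated twice.
val linesDefault = ssc.socketTextStream("localhost", 9999)

// Trade replication and spilling for less overhead: a single serialized
// in-memory copy (data is lost if the executor dies).
val linesCheap = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_ONLY_SER)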
The root cause is that store() is internally implemented by calling BlockGenerator's addData method, which ultimately stashes the record in currentBuffer, and currentBuffer is nothing more than an ArrayBuffer[Any]: currentBuffer += data. Note also the waitToPush() call in addData below: it throttles the receiver once it exceeds the configured ingest rate (the spark.streaming.receiver.maxRate limit). The addData source:
/**
 * Push a single data item into the buffer.
 */
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush()
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
So how does the buffered data get turned into blocks?
Internally, BlockGenerator actually runs two threads:
(1) One thread periodically swaps in a fresh buffer and wraps the previous buffer's contents into a block. "Periodically" here means every spark.streaming.blockInterval, which defaults to 200 ms. Source:
private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
(2) The other thread sends the generated blocks on to the BlockManager. (A simplified model of this two-thread design follows.)
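Before reading the real source below, it may help to see the shape of this design in isolation: a producer/consumer pair around a bounded queue. The following is a simplified stand-in I wrote for illustration; it is not Spark's code, just its skeleton:

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// A simplified stand-in for BlockGenerator: NOT Spark's code, just its shape.
class MiniBlockGenerator(blockIntervalMs: Long = 200, queueSize: Int = 10) {
  case class Block(id: Long, records: Seq[Any])

  @volatile private var running = true
  private var currentBuffer = new ArrayBuffer[Any]
  private val blocksForPushing = new ArrayBlockingQueue[Block](queueSize)

  // Called by the receiver thread for every record (cf. addData above).
  def addData(record: Any): Unit = synchronized { currentBuffer += record }

  // Thread 1: every block interval, swap the buffer and enqueue a sealed Block.
  private val timerThread = new Thread("mini-block-timer") {
    setDaemon(true)
    override def run(): Unit = while (running) {
      Thread.sleep(blockIntervalMs)
      val maybeBlock = MiniBlockGenerator.this.synchronized {
        if (currentBuffer.isEmpty) None
        else {
          val sealedBuffer = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          Some(Block(System.currentTimeMillis(), sealedBuffer.toList))
        }
      }
      maybeBlock.foreach(b => blocksForPushing.put(b)) // blocks while queue is full
    }
  }

  // Thread 2: drain the queue; the real code hands each block to the BlockManager.
  private val pusherThread = new Thread("mini-block-pusher") {
    setDaemon(true)
    override def run(): Unit = while (running || !blocksForPushing.isEmpty) {
      val block = blocksForPushing.poll(10, TimeUnit.MILLISECONDS)
      if (block != null) println(s"pushed block ${block.id}: ${block.records.size} records")
    }
  }

  def start(): Unit = { timerThread.start(); pusherThread.start() }
  def stop(): Unit = { running = false }
}

The real BlockGenerator adds rate limiting, listener callbacks, and careful shutdown on top of exactly this skeleton.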
The first thread:
The first thread periodically calls updateCurrentBuffer to package the data sitting in currentBuffer into a Block. The block boundaries come from a timer inside BlockGenerator (a RecurringTimer), which seals the current buffer's contents into a data block at the user-configured interval. Source:
private val blockIntervalTimer = new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
Each such batch of records, now a data block, is placed into blocksForPushing, a queue of type ArrayBlockingQueue[Block]. Source:
private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
The queue's capacity defaults to 10 and can be configured through the spark.streaming.blockQueueSize parameter (in many cases you will not need to touch it). When blocksForPushing has no room left, the thread blocks until space frees up for the newly generated Block. If your data volume is genuinely so large that blocksForPushing cannot absorb the blocks in time, that is when to consider raising spark.streaming.blockQueueSize.
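A sketch of tuning these knobs on the SparkConf before building the StreamingContext; the values are illustrative, not recommendations:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("ReceiverTuning")
  // Seal a block every 100 ms instead of the 200 ms default:
  // more, smaller blocks => more tasks per batch.
  .set("spark.streaming.blockInterval", "100ms")
  // Let up to 32 sealed blocks wait for the pushing thread (default 10).
  .set("spark.streaming.blockQueueSize", "32")
  // Cap each receiver at 10k records/sec; addData's waitToPush() enforces this.
  .set("spark.streaming.receiver.maxRate", "10000")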
updateCurrentBuffer is implemented as follows:
/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }
    if (newBlock != null) {
      blocksForPushing.put(newBlock) // put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}
The second thread:
The second thread repeatedly calls the keepPushingBlocks function to take generated Blocks off the blocksForPushing blocking queue, then calls the pushBlock method to store each Block into the BlockManager.
private val blockPushingThread = new Thread() {
  override def run() { keepPushingBlocks() }
}
keepPushingBlocks is implemented as follows:
/** Keep pushing blocks to the BlockManager. */
private def keepPushingBlocks() {
  logInfo("Started block pushing thread")

  def areBlocksBeingGenerated: Boolean = synchronized {
    state != StoppedGeneratingBlocks
  }

  try {
    // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
    while (areBlocksBeingGenerated) {
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
        case Some(block) => pushBlock(block)
        case None =>
      }
    }

    // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
    logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
    while (!blocksForPushing.isEmpty) {
      val block = blocksForPushing.take()
      logDebug(s"Pushing block $block")
      pushBlock(block)
      logInfo("Blocks left to push " + blocksForPushing.size())
    }
    logInfo("Stopped block pushing thread")
  } catch {
    case ie: InterruptedException =>
      logInfo("Block pushing thread was interrupted")
    case e: Exception =>
      reportError("Error in block pushing thread", e)
  }
}
pushBlock is implemented as follows:
private def pushBlock(block: Block) {
  listener.onPushBlock(block.id, block.buffer)
  logInfo("Pushed block " + block.id)
}
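The listener here is the hand-off point: in Spark it is supplied by ReceiverSupervisorImpl, whose onPushBlock stores the buffer through the BlockManager and then reports the block. Spark's BlockGeneratorListener trait is private[streaming], so the sketch below re-declares local stand-ins purely to show the shape of that contract; the method names mirror the real trait, but verify against your Spark version:

import scala.collection.mutable.ArrayBuffer

object ListenerSketch {
  // Local stand-ins; the real types live under org.apache.spark.streaming.receiver
  // and org.apache.spark.storage and are not public API.
  case class StreamBlockId(streamId: Int, uniqueId: Long)

  trait BlockGeneratorListener {
    def onAddData(data: Any, metadata: Any): Unit
    def onGenerateBlock(blockId: StreamBlockId): Unit
    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit
    def onError(message: String, throwable: Throwable): Unit
  }

  // Roughly what ReceiverSupervisorImpl's listener does with each callback.
  val listener: BlockGeneratorListener = new BlockGeneratorListener {
    def onAddData(data: Any, metadata: Any): Unit = ()     // e.g. track source offsets
    def onGenerateBlock(blockId: StreamBlockId): Unit = () // bookkeeping hook
    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit =
      println(s"store $blockId via BlockManager, then tell the ReceiverTracker")
    def onError(message: String, throwable: Throwable): Unit =
      Console.err.println(s"$message: $throwable")
  }
}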
Once the block is stored in the BlockManager, a ReceivedBlockStoreResult comes back; it carries the StreamBlockId under which the block was successfully stored.
The next step is to wrap that ReceivedBlockStoreResult into a ReceivedBlockInfo, which represents the newest not-yet-processed data, and then to notify the ReceiverTracker over RPC (Akka, in the Spark version this article is based on) that a new block has arrived. The ReceiverTracker calls its addBlock method to store the ReceivedBlockInfo into the streamIdToUnallocatedBlockQueues queue.
The ReceiverTracker files the stored block ids under the queue for the corresponding stream id, and tracks each receiver in a HashMap:
private val receiverTrackingInfos = new HashMap[Int, ReceiverTrackingInfo]
receiverTrackingInfos.put(streamId, receiverTrackingInfo)
The ReceiverTracker's addBlock method is implemented as follows:
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}
ReceivedBlockInfo is implemented as follows; note its key accessor, blockId:

/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
    streamId: Int,
    numRecords: Option[Long],
    metadataOption: Option[Any],
    blockStoreResult: ReceivedBlockStoreResult
  ) {

  require(numRecords.isEmpty || numRecords.get >= 0, "numRecords must not be negative")

  @volatile private var _isBlockIdValid = true

  def blockId: StreamBlockId = blockStoreResult.blockId
}
Finally, look at part of the ReceivedBlockStoreResult source:
private[streaming] trait ReceivedBlockStoreResult {
  // Any implementation of this trait will store a block id
  def blockId: StreamBlockId

  // Any implementation of this trait will have to return the number of records
  def numRecords: Option[Long]
}
Spark Streaming slices the data into RDDs by time interval and then triggers an action on each RDD to submit a job. The job lands in the job queue inside the JobScheduler, which schedules it and submits it on to Spark's own job scheduler; from there the job is translated into a large number of tasks and distributed to the Spark cluster for execution.
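Job generation is driven entirely by output operations: a batch with no registered output operation produces no job. A sketch, reusing the hypothetical counts DStream from the earlier driver example:

// print() registers a ForEachDStream; each batch interval,
// DStreamGraph.generateJobs turns it into one Job.
counts.print()

// foreachRDD exposes the same hook explicitly; `time` is the batch time
// that generateJobs(time) passes down.
counts.foreachRDD { (rdd, time) =>
  println(s"batch $time: ${rdd.count()} distinct words")
}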
With that picture in mind, let's look at the JobScheduler source. JobScheduler is the core component of all Spark Streaming scheduling.
Part of its source:
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
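One detail worth knowing: jobExecutor is a fixed-size daemon thread pool whose size comes from spark.streaming.concurrentJobs and defaults to 1, which is why batches normally run strictly one after another. The key is real but undocumented, and overlapping batches can break ordering assumptions, so treat the following as experimental:

import org.apache.spark.SparkConf

// Allow the jobs of up to two batches to run concurrently (default 1).
val conf = new SparkConf()
  .setAppName("ConcurrentBatches")
  .set("spark.streaming.concurrentJobs", "2")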
Now step into the graph's job-generation method; graph is an instance of the DStreamGraph class. Part of its source:
final private[streaming] class DStreamGraph extends Serializable with Logging {

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()

  def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }
}
The elements of outputStreams are DStreams, so look at DStream's generateJob:
For each time slice this effectively materializes one RDD and calls SparkContext's runJob to submit a Spark job. Note that the version below is the default in DStream, whose job body runs an empty function purely to force the RDD's computation; output operations such as foreachRDD register a ForEachDStream, which overrides generateJob so that the job body runs the user-supplied function on that batch's RDD instead.
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}
That wraps up this walkthrough of Spark Streaming's internals. I suspect several of these details come up in everyday work, and I hope the article taught you something new.