溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

(版本定制)第10課:Spark Streaming源碼解讀

發(fā)布時(shí)間:2020-07-16 02:18:28 來(lái)源:網(wǎng)絡(luò) 閱讀:426 作者:Spark_2016 欄目:大數(shù)據(jù)

本期內(nèi)容:

    1、數(shù)據(jù)接收架構(gòu)設(shè)計(jì)模式

    2、數(shù)據(jù)接收源碼徹底研究


1、Receiver接受數(shù)據(jù)的過(guò)程類似于MVC模式:

Receiver,ReceiverSupervisor和Driver的關(guān)系相當(dāng)于Model,Control,View,也就是MVC。

Model就是Receiver,存儲(chǔ)數(shù)據(jù)Control,就是ReceiverSupervisor,Driver是獲得元數(shù)據(jù),也就是View。

(版本定制)第10課:Spark Streaming源碼解讀

2、數(shù)據(jù)的位置信息會(huì)被封裝到RDD里面。

3、Receiver接受數(shù)據(jù),交給ReceiverSupervisor去存儲(chǔ)數(shù)據(jù)。

4、ReceiverTracker是通過(guò)發(fā)送一個(gè)又一個(gè)的Job,每個(gè)Job只有一個(gè)Task,每個(gè)Task里面就只有一個(gè)ReceiverSupervisor,用這個(gè)函數(shù)啟動(dòng)每一個(gè)Receiver。


下面我們簡(jiǎn)單的看下Receiver啟動(dòng)流程,應(yīng)用程序首先通過(guò)JobScheduler的start方法來(lái)啟動(dòng)receiverTracker的start方法:

def start(): Unit = synchronized {
if (eventLoop != null) return // scheduler has already been started

logDebug("Starting JobScheduler")
eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
eventLoop.start()

// attach rate controllers of input streams to receive batch completion updates
for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)

listenerBus.start(ssc.sparkContext)
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
receiverTracker.start() //receiver啟動(dòng)
jobGenerator.start()
  logInfo("Started JobScheduler")
}

通過(guò)調(diào)用receiverTracker.start()方法來(lái)進(jìn)行一系列的操作:

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
if (isTrackerStarted) {
throw new SparkException("ReceiverTracker already started")
  }

if (!receiverInputStreams.isEmpty) {
endpoint = ssc.env.rpcEnv.setupEndpoint(
"ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) //Rpc消息通信,獲取receiver的狀態(tài)
if (!skipReceiverLaunch) launchReceivers() //啟動(dòng)receiver
    logInfo("ReceiverTracker started")
trackerState = Started
}
}

下面通過(guò)畫圖簡(jiǎn)單的描述下Receiver啟動(dòng)的內(nèi)部機(jī)制:

(版本定制)第10課:Spark Streaming源碼解讀


參考博客:http://blog.csdn.net/hanburgud/article/details/51471047

                 http://lqding.blog.51cto.com/9123978/1774426

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI