您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“Job動(dòng)態(tài)生成方法是什么”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
在spark stream程序中的一條關(guān)鍵的語(yǔ)句就是:ssc.start()
1,跟蹤進(jìn)入StreamingContext的start 方法,有一句非常關(guān)鍵的語(yǔ)句scheduler.start(),是個(gè)JobScheduler(spark stream用來(lái)job調(diào)度的)
進(jìn)行job調(diào)度的入口!
2,計(jì)入JobScheduler 的start方法。
在這個(gè)方法中幾個(gè)關(guān)鍵的點(diǎn)是:
eventLoop.start() 一個(gè)事件循環(huán)器,用于響應(yīng)其它組件發(fā)來(lái)的事件(包括job的啟動(dòng),完成,以及錯(cuò)誤報(bào)告)。
receiverTracker.start() 控制了整個(gè)receiver的生成,與數(shù)據(jù)的接受
jobGenerator.start() 真正開始進(jìn)行job的生成
在這個(gè)方法中也維護(hù)了一個(gè)事件處理的循環(huán)器eventLoop,用于處理各種事件
其中最為關(guān)鍵的事件是GenerateJobs(time),這個(gè)事件是進(jìn)行生成job的事件?。?/p>
跟蹤計(jì)入generateJobs(time)
jobScheduler.receiverTracker.allocateBlocksToBatch(time) 為當(dāng)前的bath分發(fā)收到的數(shù)據(jù)Blocks。
graph.generateJobs(time):根據(jù)當(dāng)前編寫的程序的output動(dòng)作生成相應(yīng)的job并封裝進(jìn)入集合中。
最終通過
提交作業(yè)到executor
在回去看看jobGenerator.start()中的startFirstTime()
private def startFirstTime() { val startTime = new Time(timer.getStartTime()) graph.start(startTime - graph.batchDuration) timer.start(startTime.milliseconds) logInfo("Started JobGenerator at " + startTime) }
第一次啟動(dòng)會(huì)啟動(dòng)一個(gè)定時(shí)器,該定時(shí)器會(huì)根基duration bath 不斷的的給jobGenerator中的消息循環(huán)體!
在jobGenerator中的消息循環(huán)體就會(huì)不斷的去除消息進(jìn)行處理
“Job動(dòng)態(tài)生成方法是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。