您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“Spark Streaming運(yùn)行流程是怎樣的”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
通過下面的一個(gè)簡(jiǎn)單的例子來理解spark streaming
object OnlineForeachRDD2DB { def main(args: Array[String]){ /* * 第1步:創(chuàng)建Spark的配置對(duì)象SparkConf,設(shè)置Spark程序的運(yùn)行時(shí)的配置信息, * 例如說通過setMaster來設(shè)置程序要鏈接的Spark集群的Master的URL,如果設(shè)置 * 為local,則代表Spark程序在本地運(yùn)行,特別適合于機(jī)器配置條件非常差(例如 * 只有1G的內(nèi)存)的初學(xué)者 * */ val conf = new SparkConf() //創(chuàng)建SparkConf對(duì)象 conf.setAppName("OnlineForeachRDD") //設(shè)置應(yīng)用程序的名稱,在程序運(yùn)行的監(jiān)控界面可以看到名稱 // conf.setMaster("spark://Master:7077") //此時(shí),程序在Spark集群 conf.setMaster("local[6]") //設(shè)置batchDuration時(shí)間間隔來控制Job生成的頻率并且創(chuàng)建Spark Streaming執(zhí)行的入口 val ssc = new StreamingContext(conf, Seconds(5)) val lines = ssc.socketTextStream("Master", 9999) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => { // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => { val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")" val stmt = connection.createStatement(); stmt.executeUpdate(sql); }) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } } } /** * 在StreamingContext調(diào)用start方法的內(nèi)部其實(shí)是會(huì)啟動(dòng)JobScheduler的Start方法,進(jìn)行消息循環(huán),在JobScheduler * 的start內(nèi)部會(huì)構(gòu)造JobGenerator和ReceiverTacker,并且調(diào)用JobGenerator和ReceiverTacker的start方法: * 1,JobGenerator啟動(dòng)后會(huì)不斷的根據(jù)batchDuration生成一個(gè)個(gè)的Job * 2,ReceiverTracker啟動(dòng)后首先在Spark Cluster中啟動(dòng)Receiver(其實(shí)是在Executor中先啟動(dòng)ReceiverSupervisor),在Receiver收到 * 數(shù)據(jù)后會(huì)通過ReceiverSupervisor存儲(chǔ)到Executor并且把數(shù)據(jù)的Metadata信息發(fā)送給Driver中的ReceiverTracker,在ReceiverTracker * 內(nèi)部會(huì)通過ReceivedBlockTracker來管理接受到的元數(shù)據(jù)信息 * 每個(gè)BatchInterval會(huì)產(chǎn)生一個(gè)具體的Job,其實(shí)這里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD * 的DAG而已,從Java角度講,相當(dāng)于Runnable接口實(shí)例,此時(shí)要想運(yùn)行Job需要提交給JobScheduler,在JobScheduler中通過線程池的方式找到一個(gè) * 單獨(dú)的線程來提交Job到集群運(yùn)行(其實(shí)是在線程中基于RDD的Action觸發(fā)真正的作業(yè)的運(yùn)行),為什么使用線程池呢? * 1,作業(yè)不斷生成,所以為了提升效率,我們需要線程池;這和在Executor中通過線程池執(zhí)行Task有異曲同工之妙; * 2,有可能設(shè)置了Job的FAIR公平調(diào)度的方式,這個(gè)時(shí)候也需要多線程的支持; * */ ssc.start() ssc.awaitTermination() } }
“Spark Streaming運(yùn)行流程是怎樣的”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。