溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

SparkStreaming的實戰(zhàn)案例

發(fā)布時間:2020-07-07 19:45:48 來源:網(wǎng)絡(luò) 閱讀:2311 作者:原生zzy 欄目:大數(shù)據(jù)

廢話不多說,直接上干貨?。?!
相關(guān)依賴

  <properties>
    <project.build.sourceEncoding>UTF8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.3.2</spark.version>
    <hadoop.version>2.7.6</hadoop.version>
    <scala.compat.version>2.11</scala.compat.version>
  </properties>
      <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- sparkStreaming -->
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc -->
    <dependency>
      <groupId>org.scalikejdbc</groupId>
      <artifactId>scalikejdbc_2.11</artifactId>
      <version>3.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>2.8.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>

(1)spark streaminging無狀態(tài)計算的WordCount

編程架構(gòu)
SparkStreaming的實戰(zhàn)案例
在某個節(jié)點上中啟動nc -lk 9999,然后用作數(shù)據(jù)源。編寫程序?qū)崿F(xiàn)網(wǎng)絡(luò)的wordcount。
代碼實現(xiàn)

object NetWordCount {
    /**
      * 編程套路:
      * 1.獲取編程入口,StreamingContext
      * 2.通過StreamingContext構(gòu)建第一個DStream
      * 3.對DStream進行各種的transformation操作
      * 4.對于數(shù)據(jù)結(jié)果進行output操作
      * 5.提交sparkStreaming應用程序
      */
    def main(args: Array[String]): Unit = {
        //屏蔽日志
        Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
        Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN)
        Logger.getLogger("org.apache.hive").setLevel(Level.WARN)

        //1.獲取編程入口,StreamingContext
       val conf= new SparkConf().setMaster("local[2]")
            .setAppName("NetWordCount")
        //第二個參數(shù),表示批處理時長
        val ssc=new StreamingContext(conf,Seconds(2))

        /**
          * 2.通過StreamingContext構(gòu)建第一個DStream(通過網(wǎng)絡(luò)去讀數(shù)據(jù))
          * 第一個參數(shù):主機名
          * 第二個參數(shù):端口號
          */
        val ReceiverInputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("test",9999)

        //3.對DStream進行各種的transformation操作
        val wordDS: DStream[String] = ReceiverInputDStream.flatMap(msg => {
            msg.split("\\s+")
        })
        val wordCountDS: DStream[(String, Int)] = wordDS.map(word=>(word,1)).reduceByKey(_+_)
        //4.對于數(shù)據(jù)結(jié)果進行output操作,這里是打印輸出
        wordCountDS.print()
        //5.提交sparkStreaming應用程序
        ssc.start()
        ssc.awaitTermination()
    }
}

使用nc -lk 9999在相應的節(jié)點上發(fā)出消息(每隔一個批處理時間發(fā)送一次),查看控制臺打?。?br/>batch2
SparkStreaming的實戰(zhàn)案例
batch3
SparkStreaming的實戰(zhàn)案例
結(jié)果發(fā)現(xiàn):由于現(xiàn)在的操作時無狀態(tài)的,所以每隔2s處理一次,但是每次的單詞數(shù)不會統(tǒng)計,也就是說,只會統(tǒng)計當前批處理的單詞,之前輸入的則不會統(tǒng)計。


(2)spark streaminging有狀態(tài)計算的WordCount

同樣是wordCounte,這次要實現(xiàn)的效果是:到現(xiàn)在為止,統(tǒng)計過去時間段內(nèi)的所有單詞的個數(shù)。
代碼

object UpdateStateBykeyWordCount {
    /**
      * 編程套路:
      * 1.獲取編程入口,StreamingContext
      * 2.通過StreamingContext構(gòu)建第一個DStream
      * 3.對DStream進行各種的transformation操作
      * 4.對于數(shù)據(jù)結(jié)果進行output操作
      * 5.提交sparkStreaming應用程序
      */
    def main(args: Array[String]): Unit = {
        //屏蔽日志
        Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
        Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN)
        Logger.getLogger("org.apache.hive").setLevel(Level.WARN)

        //1.獲取編程入口,StreamingContext
        val conf = new SparkConf().setMaster("local[2]")
            .setAppName("NetWordCount")
        //第二個參數(shù),表示批處理時長
        val ssc = new StreamingContext(conf, Seconds(2))
        ssc.checkpoint("C:\\z_data\\checkPoint\\checkPoint_1")
        /**
          * 2.通過StreamingContext構(gòu)建第一個DStream(通過網(wǎng)絡(luò)去讀數(shù)據(jù))
          * 第一個參數(shù):主機名
          * 第二個參數(shù):端口號
          */
        val ReceiverInputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("test", 9999)

        //3.對DStream進行各種的transformation操作
        val wordDS: DStream[(String,Int)] = ReceiverInputDStream.flatMap(msg => {
            msg.split("\\s+")
        }).map(word=>(word,1))
        /**
          * updateStateByKey是狀態(tài)更新函數(shù),
          * updateFunc: (Seq[V], Option[S]) => Option[S]
          * (U,C)=>C
          * values:Seq[Int],state:Option[Int]==>Option[Int]
 *
          * @param values :新值
          * @param state :狀態(tài)值
          * @return
          */
        val updateDS: DStream[(String, Int)] = wordDS.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
            Option(values.sum + state.getOrElse(0))
        })
        //4.對于數(shù)據(jù)結(jié)果進行output操作,這里是打印輸出
        updateDS.print()
        //5.提交sparkStreaming應用程序
        ssc.start()
        ssc.awaitTermination()
    }
}

使用 nc -kl 9999
SparkStreaming的實戰(zhàn)案例
觀察控制臺:
batch2
SparkStreaming的實戰(zhàn)案例
batch3
SparkStreaming的實戰(zhàn)案例
發(fā)現(xiàn):兩次批處理的結(jié)果,進行了聚合,也就是所謂的有狀態(tài)的計算。
注意
ssc.checkpoint("C:\\z_data\\checkPoint\\checkPoint_1")
上面這句代碼一定要加,他會將上一次的批處理計算的結(jié)果保存起來,如果不加:
錯誤:requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().


(2)spark streaminging的HA

?? 在上述的updateStateByKey代碼中如果當前程序運行異常時,會丟失數(shù)據(jù)(重啟之后,找不回原來計算的數(shù)據(jù)),因為編程入口StreamingContext在代碼重新運行的時候,是重新生成的,為了使程序在異常退出的時候,在下次啟動的時候,依然可以獲得上一次的StreamingContext對象,保證計算數(shù)據(jù)不丟失,此時就需要將StreamingContext對象存儲在持久化的系統(tǒng)中。也就是說需要制作StreamingContext對象的HA。
代碼

object WC_DriverHA {
    def main(args: Array[String]): Unit = {
        //屏蔽日志
        Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
        Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN)
        Logger.getLogger("org.apache.hive").setLevel(Level.WARN)
        /**
          * StreamingContext.getOrCreate()
          * 第一個參數(shù):checkpointPath,和下面方法中的checkpointPath目錄一致
          * 第二個參數(shù):creatingFunc: () => StreamingContext:用于創(chuàng)建StreamingContext對象
          * 最終使用StreamingContext.getOrCreate()可以實現(xiàn)StreamingContext對象的HA,保證在程序重新運行的時候,之前狀態(tài)仍然可以恢復
          */

       val ssc= StreamingContext.getActiveOrCreate("C:\\z_data\\checkPoint\\checkPoint_HA",functionToCreateContext)
        ssc.start()
        ssc.awaitTermination()
    }
    def functionToCreateContext():StreamingContext={
        //1.獲取編程入口,StreamingContext
        val conf = new SparkConf().setMaster("local[2]")
            .setAppName("NetWordCount")
        //第二個參數(shù),表示批處理時長
        val ssc = new StreamingContext(conf, Seconds(2))
        ssc.checkpoint("C:\\z_data\\checkPoint\\checkPoint_HA")
        /**
          * 2.通過StreamingContext構(gòu)建第一個DStream(通過網(wǎng)絡(luò)去讀數(shù)據(jù))
          * 第一個參數(shù):主機名
          * 第二個參數(shù):端口號
          */
        val ReceiverInputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("test", 9999)

        //3.對DStream進行各種的transformation操作
        val wordDS: DStream[(String,Int)] = ReceiverInputDStream.flatMap(msg => {
            msg.split("\\s+")
        }).map(word=>(word,1))
        val updateDS: DStream[(String, Int)] = wordDS.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
            Option(values.sum + state.getOrElse(0))
        })
        //4.對于數(shù)據(jù)結(jié)果進行output操作,這里是打印輸出
        updateDS.print()
        //5.提交sparkStreaming應用程序
        ssc.start()
        ssc.awaitTermination()
        ssc
    }
}

測試
?? - 先正常運行一段時間,計算出結(jié)果
?? - 停止程序
?? - 再次啟動
?? - 驗證再次啟動的程序,是否能夠拿回停止前計算得到的結(jié)果
原理
??如果是第一次執(zhí)行,那么在在這個checkpointDriectory目錄中是不存在streamingContext對象的,所以要創(chuàng)建,第二次運行的時候,就不會在創(chuàng)建,則是從checkpointDriectory目錄中讀取進行恢復。
注意
??正常情況下,使用這種方式的HA,只能持久狀態(tài)數(shù)據(jù)到持久化的文件中,默認情況是不會持久化StreamingContext對象到CheckPointDriectory中的。


(3)對checkpoint的總結(jié):

?1)checkpoint的介紹:

??從故障中恢復checkpoint中有兩種類型
???- Metadata checkpointing:driver節(jié)點中的元數(shù)據(jù)信息
?????- Configuration:用于創(chuàng)建流式應用程序的配置
?????- DStream:定義streaming程序的DStream操作
?????- Incomplete batches:批量的job排隊但尚未完成。(程序上次運行到的位置)
???- Data checkpointing:將生成的RDD保存到可靠的存儲
?????- 計算之后生成的RDD
?????- 在receiver接收到數(shù)據(jù),轉(zhuǎn)化的RDD

?2)checkpoint的啟動時機:

?? - 從運行應用程序的driver的故障中恢復-元數(shù)據(jù),(driver的HA)
?? - 使用有狀態(tài)計算的時候啟動checkPoint:updateStateByKey或者reduceByKeyAndWindow…

?3)checkpoint的配置:

?? - 有狀態(tài)計算的時候:
?? ?ssc.checkpoint("C:\\z_data\\checkpoint")
?? - driver的HA的時候:

ssc.checkpoint("C:\\z_data\\checkpoint")
ssc =StreamingContext.getOrCreate("C:\\z_data\\checkpoint"
,functionToCreateContext)

(4)Spark Streaming 的 transform 操作

??在使用transform操作的時候介紹兩個重要的概念:
??黑名單:如果允許的操作比不允許的操作多,那么將不允許的操作加入黑名單
??白名單:如果允許的操作比不允許的操作少,那么將允許的操作加入白名單
代碼

object _1Streaming_tranform {
    def main(args: Array[String]): Unit = {
        //定義黑名單
        val black_list=List("@","#","$","%")
        Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
        Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN)
        Logger.getLogger("org.apache.hive").setLevel(Level.WARN)
        //1.獲取編程入口,StreamingContext
        val conf = new SparkConf().setMaster("local[2]").setAppName("_1Streaming_tranform")
        val ssc=new StreamingContext(conf,Seconds(2))
        //2.從對應的網(wǎng)絡(luò)端口讀取數(shù)據(jù)
        val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("test",9999)
        //2.1將黑名單廣播出去
        val bc = ssc.sparkContext.broadcast(black_list)
        //2.2設(shè)置checkpoint
        ssc.checkpoint("C:\\z_data\\checkPoint\\checkPoint_1")
        //3業(yè)務處理
        val wordDStream: DStream[String] = inputDStream.flatMap(_.split(" "))
        //transform方法表示:從DStream拿出來一個RDD,經(jīng)過transformsFunc計算之后,返回一個新的RDD
        val fileterdDStream: DStream[String] = wordDStream.transform(rdd=>{
            //過濾掉黑名單中的數(shù)據(jù)
            val blackList: List[String] = bc.value
            rdd.filter(word=>{
                !blackList.contains(word)
            })
        })
        //3.2統(tǒng)計相應的單詞數(shù)
        val resultDStream = fileterdDStream.map(msg => (msg, 1))
            .updateStateByKey((values: Seq[Int], stats: Option[Int]) => {
                Option(values.sum + stats.getOrElse(0))
            })
        //4打印output
        resultDStream.print()
        //5.開啟streaming流
        ssc.start()
        ssc.awaitTermination()
    }
}

黑名單中的數(shù)據(jù)會被過濾:
SparkStreaming的實戰(zhàn)案例


(5)Spark Streaming 的 window 操作

SparkStreaming的實戰(zhàn)案例
注意
在做window操作時:
  - 窗口覆蓋的數(shù)據(jù)流的時間長度,必須是批處理時間間隔的倍數(shù)
  - 前一個窗口到后一個窗口所經(jīng)過的時間長度,必須是批處理時間間隔的倍數(shù)。
偽代碼

 //1.獲取編程入口,StreamingContext
    val conf = new SparkConf().setMaster("local[2]").setAppName("WordCount_Window")
    val ssc=new StreamingContext(conf,Seconds(batchInvail.toLong))
    //2.從對應的網(wǎng)絡(luò)端口讀取數(shù)據(jù)
    val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream(hostname,port.toInt)
    val lineDStream: DStream[String] = inputDStream.flatMap(_.split(" "))
    val wordDStream: DStream[(String, Int)] = lineDStream.map((_,1))

    /**
      * 每隔4秒,算過去6秒的數(shù)據(jù)
      * reduceFunc:數(shù)據(jù)合并的函數(shù)
      * windowDuration:窗口的大小(過去6秒的數(shù)據(jù))
      * slideDuration:窗口滑動的時間(每隔4秒)
      */
    val resultDStream: DStream[(String, Int)] = wordDStream.reduceByKeyAndWindow((kv1:Int, kv2:Int)=>kv1+kv2,
      Seconds(batchInvail.toLong * 3),
      Seconds(batchInvail.toLong * 2))
    resultDStream.print()

    ssc.start()
    ssc.awaitTermination()

(6)Spark Streaming 的ForeachRDD 操作

概念

  • foreach: 遍歷一個分布式集合(rdd)中的每一個元素
  • foreachPartition:遍歷一個分布式集合(rdd)中的每一個分區(qū)
  • foreachRDD:遍歷一個分布式集合(DStream)中的每一個RDD
    這個算子用的好,通常程序的性能會提升很多。
    偽代碼
    //這個方法表示遍歷DStream中的每一個rdd
        windowDS.foreachRDD(rdd=>{
            if(!rdd.isEmpty()){
                rdd.mapPartitions(ptn=>{
                    if(!ptn.isEmpty){
                        ptn.foreach(msg=>{
                            //在這里做相應的操作
                        })
                    }
                })
            }
        })
向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI