溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

第93課:SparkStreaming updateStateByKey 基本操作綜合案例實(shí)戰(zhàn)和內(nèi)幕源碼解密

發(fā)布時(shí)間:2020-06-12 05:50:46 來(lái)源:網(wǎng)絡(luò) 閱讀:2300 作者:lqding1980 欄目:大數(shù)據(jù)

   Spark Streaming的DStream為我們提供了一個(gè)updateStateByKey方法,它的主要功能是可以隨著時(shí)間的流逝在Spark Streaming中為每一個(gè)key維護(hù)一份state狀態(tài),通過(guò)更新函數(shù)對(duì)該key的狀態(tài)不斷更新。對(duì)每一個(gè)新的batch而言,Spark Streaming會(huì)在使用updateStateByKey的時(shí)候?yàn)橐呀?jīng)存在的key進(jìn)行state的狀態(tài)更新(對(duì)每個(gè)新出現(xiàn)的key,會(huì)同樣執(zhí)行state的更新函數(shù)操作),但是如果通過(guò)更新函數(shù)對(duì)state更新后返回none的話,此時(shí)刻key對(duì)應(yīng)的state狀態(tài)被刪除掉,需要特別說(shuō)明的是state可以是任意類型的數(shù)據(jù)結(jié)構(gòu),這就為我們的計(jì)算帶來(lái)無(wú)限的想象空間;

  重點(diǎn)來(lái)了?。?!如果要不斷的更新每個(gè)key的state,就一定會(huì)涉及到狀態(tài)的保存和容錯(cuò),這個(gè)時(shí)候就需要開(kāi)啟checkpoint機(jī)制和功能,需要說(shuō)明的是checkpoint可以保存一切可以存儲(chǔ)在文件系統(tǒng)上的內(nèi)容,例如:程序未處理的數(shù)據(jù)及已經(jīng)擁有的狀態(tài)。

  補(bǔ)充說(shuō)明:關(guān)于流式處理對(duì)歷史狀態(tài)進(jìn)行保存和更新具有重大實(shí)用意義,例如進(jìn)行廣告(投放廣告和運(yùn)營(yíng)廣告效果評(píng)估的價(jià)值意義,熱點(diǎn)隨時(shí)追蹤、熱力圖)

  簡(jiǎn)單的來(lái)說(shuō),如果我們需要進(jìn)行wordcount,每個(gè)batchInterval都會(huì)計(jì)算出新的一批數(shù)據(jù),這批數(shù)據(jù)如何更新到以前計(jì)算的結(jié)果上?updateStateByKey就能實(shí)現(xiàn)此功能。

函數(shù)定義如下:

def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)] = ssc.withScope {
  updateStateByKey(updateFunc, defaultPartitioner())
}

updateStateByKey 需要傳入一個(gè)函數(shù),該函數(shù)有兩個(gè)參數(shù)Seq[V]表示最新一次reduce的值的序列,Option[s]表示的是key對(duì)應(yīng)的以前的值。返回的時(shí)一個(gè)key的最新值。


下面我們用實(shí)例演示:

package com.dt.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by Administrator on 2016/5/3.
 */
object UpdateStateByKeyDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("UpdateStateByKeyDemo")
    val ssc = new StreamingContext(conf,Seconds(20))
    //要使用updateStateByKey方法,必須設(shè)置Checkpoint。
    ssc.checkpoint("/checkpoint/")
    val socketLines = ssc.socketTextStream("spark-master",9999)

    socketLines.flatMap(_.split(",")).map(word=>(word,1))
      .updateStateByKey(
        (currValues:Seq[Int],preValue:Option[Int]) =>{
       val currValue = currValues.sum
         Some(currValue + preValue.getOrElse(0))
    }).print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }
}

打包上傳至spark集群。


打開(kāi)nc,發(fā)送測(cè)試數(shù)據(jù)

root@spark-master:~# nc -lk 9999
hadoop,spark,scala,hive
hadoop,Hbase,spark

運(yùn)行spark 程序

root@spark-master:~# /usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.streaming.UpdateStateByKeyDemo  --master spark://spark-master:7077 ./spark.jar


查看運(yùn)行結(jié)果:

-------------------------------------------
Time: 1462282180000 ms
-------------------------------------------
(scala,1)
(hive,1)
(spark,2)
(hadoop,2)
(Hbase,1)


我們?cè)趎c中再輸入一些數(shù)據(jù)

root@spark-master:~# nc -lk 9999
hadoop,spark,scala,hive
hadoop,Hbase,spark
hadoop,spark,scala,hive
hadoop,Hbase,spark

再次查看結(jié)果:

-------------------------------------------
Time: 1462282200000 ms
-------------------------------------------
(scala,2)
(hive,2)
(spark,4)
(hadoop,4)
(Hbase,2)


可見(jiàn),它將我們兩次統(tǒng)計(jì)結(jié)果合并了。


備注:

1、DT大數(shù)據(jù)夢(mèng)工廠微信公眾號(hào)DT_Spark 
2、IMF晚8點(diǎn)大數(shù)據(jù)實(shí)戰(zhàn)YY直播頻道號(hào):68917580
3、新浪微博: http://www.weibo.com/ilovepains

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI