您好,登錄后才能下訂單哦!
Spark Streaming的DStream為我們提供了一個(gè)updateStateByKey方法,它的主要功能是可以隨著時(shí)間的流逝在Spark Streaming中為每一個(gè)key維護(hù)一份state狀態(tài),通過(guò)更新函數(shù)對(duì)該key的狀態(tài)不斷更新。對(duì)每一個(gè)新的batch而言,Spark Streaming會(huì)在使用updateStateByKey的時(shí)候?yàn)橐呀?jīng)存在的key進(jìn)行state的狀態(tài)更新(對(duì)每個(gè)新出現(xiàn)的key,會(huì)同樣執(zhí)行state的更新函數(shù)操作),但是如果通過(guò)更新函數(shù)對(duì)state更新后返回none的話,此時(shí)刻key對(duì)應(yīng)的state狀態(tài)被刪除掉,需要特別說(shuō)明的是state可以是任意類型的數(shù)據(jù)結(jié)構(gòu),這就為我們的計(jì)算帶來(lái)無(wú)限的想象空間;
重點(diǎn)來(lái)了?。?!如果要不斷的更新每個(gè)key的state,就一定會(huì)涉及到狀態(tài)的保存和容錯(cuò),這個(gè)時(shí)候就需要開(kāi)啟checkpoint機(jī)制和功能,需要說(shuō)明的是checkpoint可以保存一切可以存儲(chǔ)在文件系統(tǒng)上的內(nèi)容,例如:程序未處理的數(shù)據(jù)及已經(jīng)擁有的狀態(tài)。
補(bǔ)充說(shuō)明:關(guān)于流式處理對(duì)歷史狀態(tài)進(jìn)行保存和更新具有重大實(shí)用意義,例如進(jìn)行廣告(投放廣告和運(yùn)營(yíng)廣告效果評(píng)估的價(jià)值意義,熱點(diǎn)隨時(shí)追蹤、熱力圖)
簡(jiǎn)單的來(lái)說(shuō),如果我們需要進(jìn)行wordcount,每個(gè)batchInterval都會(huì)計(jì)算出新的一批數(shù)據(jù),這批數(shù)據(jù)如何更新到以前計(jì)算的結(jié)果上?updateStateByKey就能實(shí)現(xiàn)此功能。
函數(shù)定義如下:
def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)] = ssc.withScope { updateStateByKey(updateFunc, defaultPartitioner()) }
updateStateByKey 需要傳入一個(gè)函數(shù),該函數(shù)有兩個(gè)參數(shù)Seq[V]表示最新一次reduce的值的序列,Option[s]表示的是key對(duì)應(yīng)的以前的值。返回的時(shí)一個(gè)key的最新值。
下面我們用實(shí)例演示:
package com.dt.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Created by Administrator on 2016/5/3. */ object UpdateStateByKeyDemo { def main(args: Array[String]) { val conf = new SparkConf().setAppName("UpdateStateByKeyDemo") val ssc = new StreamingContext(conf,Seconds(20)) //要使用updateStateByKey方法,必須設(shè)置Checkpoint。 ssc.checkpoint("/checkpoint/") val socketLines = ssc.socketTextStream("spark-master",9999) socketLines.flatMap(_.split(",")).map(word=>(word,1)) .updateStateByKey( (currValues:Seq[Int],preValue:Option[Int]) =>{ val currValue = currValues.sum Some(currValue + preValue.getOrElse(0)) }).print() ssc.start() ssc.awaitTermination() ssc.stop() } }
打包上傳至spark集群。
打開(kāi)nc,發(fā)送測(cè)試數(shù)據(jù)
root@spark-master:~# nc -lk 9999 hadoop,spark,scala,hive hadoop,Hbase,spark
運(yùn)行spark 程序
root@spark-master:~# /usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.streaming.UpdateStateByKeyDemo --master spark://spark-master:7077 ./spark.jar
查看運(yùn)行結(jié)果:
------------------------------------------- Time: 1462282180000 ms ------------------------------------------- (scala,1) (hive,1) (spark,2) (hadoop,2) (Hbase,1)
我們?cè)趎c中再輸入一些數(shù)據(jù)
root@spark-master:~# nc -lk 9999 hadoop,spark,scala,hive hadoop,Hbase,spark hadoop,spark,scala,hive hadoop,Hbase,spark
再次查看結(jié)果:
------------------------------------------- Time: 1462282200000 ms ------------------------------------------- (scala,2) (hive,2) (spark,4) (hadoop,4) (Hbase,2)
可見(jiàn),它將我們兩次統(tǒng)計(jì)結(jié)果合并了。
備注:
1、DT大數(shù)據(jù)夢(mèng)工廠微信公眾號(hào)DT_Spark
2、IMF晚8點(diǎn)大數(shù)據(jù)實(shí)戰(zhàn)YY直播頻道號(hào):68917580
3、新浪微博: http://www.weibo.com/ilovepains
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。