您好,登錄后才能下訂單哦!
廢話不多說,直接上干貨?。?!
相關(guān)依賴:
<properties>
<project.build.sourceEncoding>UTF8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<spark.version>2.3.2</spark.version>
<hadoop.version>2.7.6</hadoop.version>
<scala.compat.version>2.11</scala.compat.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- sparkStreaming -->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc -->
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc_2.11</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>compile</scope>
</dependency>
</dependencies>
編程架構(gòu):
在某個節(jié)點上中啟動nc -lk 9999,然后用作數(shù)據(jù)源。編寫程序?qū)崿F(xiàn)網(wǎng)絡(luò)的wordcount。
代碼實現(xiàn):
object NetWordCount {
/**
* 編程套路:
* 1.獲取編程入口,StreamingContext
* 2.通過StreamingContext構(gòu)建第一個DStream
* 3.對DStream進行各種的transformation操作
* 4.對于數(shù)據(jù)結(jié)果進行output操作
* 5.提交sparkStreaming應用程序
*/
def main(args: Array[String]): Unit = {
//屏蔽日志
Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN)
Logger.getLogger("org.apache.hive").setLevel(Level.WARN)
//1.獲取編程入口,StreamingContext
val conf= new SparkConf().setMaster("local[2]")
.setAppName("NetWordCount")
//第二個參數(shù),表示批處理時長
val ssc=new StreamingContext(conf,Seconds(2))
/**
* 2.通過StreamingContext構(gòu)建第一個DStream(通過網(wǎng)絡(luò)去讀數(shù)據(jù))
* 第一個參數(shù):主機名
* 第二個參數(shù):端口號
*/
val ReceiverInputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("test",9999)
//3.對DStream進行各種的transformation操作
val wordDS: DStream[String] = ReceiverInputDStream.flatMap(msg => {
msg.split("\\s+")
})
val wordCountDS: DStream[(String, Int)] = wordDS.map(word=>(word,1)).reduceByKey(_+_)
//4.對于數(shù)據(jù)結(jié)果進行output操作,這里是打印輸出
wordCountDS.print()
//5.提交sparkStreaming應用程序
ssc.start()
ssc.awaitTermination()
}
}
使用nc -lk 9999在相應的節(jié)點上發(fā)出消息(每隔一個批處理時間發(fā)送一次),查看控制臺打?。?br/>batch2
batch3
結(jié)果發(fā)現(xiàn):由于現(xiàn)在的操作時無狀態(tài)的,所以每隔2s處理一次,但是每次的單詞數(shù)不會統(tǒng)計,也就是說,只會統(tǒng)計當前批處理的單詞,之前輸入的則不會統(tǒng)計。
同樣是wordCounte,這次要實現(xiàn)的效果是:到現(xiàn)在為止,統(tǒng)計過去時間段內(nèi)的所有單詞的個數(shù)。
代碼:
object UpdateStateBykeyWordCount {
/**
* 編程套路:
* 1.獲取編程入口,StreamingContext
* 2.通過StreamingContext構(gòu)建第一個DStream
* 3.對DStream進行各種的transformation操作
* 4.對于數(shù)據(jù)結(jié)果進行output操作
* 5.提交sparkStreaming應用程序
*/
def main(args: Array[String]): Unit = {
//屏蔽日志
Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN)
Logger.getLogger("org.apache.hive").setLevel(Level.WARN)
//1.獲取編程入口,StreamingContext
val conf = new SparkConf().setMaster("local[2]")
.setAppName("NetWordCount")
//第二個參數(shù),表示批處理時長
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("C:\\z_data\\checkPoint\\checkPoint_1")
/**
* 2.通過StreamingContext構(gòu)建第一個DStream(通過網(wǎng)絡(luò)去讀數(shù)據(jù))
* 第一個參數(shù):主機名
* 第二個參數(shù):端口號
*/
val ReceiverInputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("test", 9999)
//3.對DStream進行各種的transformation操作
val wordDS: DStream[(String,Int)] = ReceiverInputDStream.flatMap(msg => {
msg.split("\\s+")
}).map(word=>(word,1))
/**
* updateStateByKey是狀態(tài)更新函數(shù),
* updateFunc: (Seq[V], Option[S]) => Option[S]
* (U,C)=>C
* values:Seq[Int],state:Option[Int]==>Option[Int]
*
* @param values :新值
* @param state :狀態(tài)值
* @return
*/
val updateDS: DStream[(String, Int)] = wordDS.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
Option(values.sum + state.getOrElse(0))
})
//4.對于數(shù)據(jù)結(jié)果進行output操作,這里是打印輸出
updateDS.print()
//5.提交sparkStreaming應用程序
ssc.start()
ssc.awaitTermination()
}
}
使用 nc -kl 9999:
觀察控制臺:
batch2
batch3
發(fā)現(xiàn):兩次批處理的結(jié)果,進行了聚合,也就是所謂的有狀態(tài)的計算。
注意:ssc.checkpoint("C:\\z_data\\checkPoint\\checkPoint_1")
上面這句代碼一定要加,他會將上一次的批處理計算的結(jié)果保存起來,如果不加:
錯誤:requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
?? 在上述的updateStateByKey代碼中如果當前程序運行異常時,會丟失數(shù)據(jù)(重啟之后,找不回原來計算的數(shù)據(jù)),因為編程入口StreamingContext在代碼重新運行的時候,是重新生成的,為了使程序在異常退出的時候,在下次啟動的時候,依然可以獲得上一次的StreamingContext對象,保證計算數(shù)據(jù)不丟失,此時就需要將StreamingContext對象存儲在持久化的系統(tǒng)中。也就是說需要制作StreamingContext對象的HA。
代碼:
object WC_DriverHA {
def main(args: Array[String]): Unit = {
//屏蔽日志
Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN)
Logger.getLogger("org.apache.hive").setLevel(Level.WARN)
/**
* StreamingContext.getOrCreate()
* 第一個參數(shù):checkpointPath,和下面方法中的checkpointPath目錄一致
* 第二個參數(shù):creatingFunc: () => StreamingContext:用于創(chuàng)建StreamingContext對象
* 最終使用StreamingContext.getOrCreate()可以實現(xiàn)StreamingContext對象的HA,保證在程序重新運行的時候,之前狀態(tài)仍然可以恢復
*/
val ssc= StreamingContext.getActiveOrCreate("C:\\z_data\\checkPoint\\checkPoint_HA",functionToCreateContext)
ssc.start()
ssc.awaitTermination()
}
def functionToCreateContext():StreamingContext={
//1.獲取編程入口,StreamingContext
val conf = new SparkConf().setMaster("local[2]")
.setAppName("NetWordCount")
//第二個參數(shù),表示批處理時長
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("C:\\z_data\\checkPoint\\checkPoint_HA")
/**
* 2.通過StreamingContext構(gòu)建第一個DStream(通過網(wǎng)絡(luò)去讀數(shù)據(jù))
* 第一個參數(shù):主機名
* 第二個參數(shù):端口號
*/
val ReceiverInputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("test", 9999)
//3.對DStream進行各種的transformation操作
val wordDS: DStream[(String,Int)] = ReceiverInputDStream.flatMap(msg => {
msg.split("\\s+")
}).map(word=>(word,1))
val updateDS: DStream[(String, Int)] = wordDS.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
Option(values.sum + state.getOrElse(0))
})
//4.對于數(shù)據(jù)結(jié)果進行output操作,這里是打印輸出
updateDS.print()
//5.提交sparkStreaming應用程序
ssc.start()
ssc.awaitTermination()
ssc
}
}
測試:
?? - 先正常運行一段時間,計算出結(jié)果
?? - 停止程序
?? - 再次啟動
?? - 驗證再次啟動的程序,是否能夠拿回停止前計算得到的結(jié)果
原理:
??如果是第一次執(zhí)行,那么在在這個checkpointDriectory目錄中是不存在streamingContext對象的,所以要創(chuàng)建,第二次運行的時候,就不會在創(chuàng)建,則是從checkpointDriectory目錄中讀取進行恢復。
注意:
??正常情況下,使用這種方式的HA,只能持久狀態(tài)數(shù)據(jù)到持久化的文件中,默認情況是不會持久化StreamingContext對象到CheckPointDriectory中的。
??從故障中恢復checkpoint中有兩種類型
???- Metadata checkpointing:driver節(jié)點中的元數(shù)據(jù)信息
?????- Configuration:用于創(chuàng)建流式應用程序的配置
?????- DStream:定義streaming程序的DStream操作
?????- Incomplete batches:批量的job排隊但尚未完成。(程序上次運行到的位置)
???- Data checkpointing:將生成的RDD保存到可靠的存儲
?????- 計算之后生成的RDD
?????- 在receiver接收到數(shù)據(jù),轉(zhuǎn)化的RDD
?? - 從運行應用程序的driver的故障中恢復-元數(shù)據(jù),(driver的HA)
?? - 使用有狀態(tài)計算的時候啟動checkPoint:updateStateByKey或者reduceByKeyAndWindow…
?? - 有狀態(tài)計算的時候:
?? ?ssc.checkpoint("C:\\z_data\\checkpoint")
?? - driver的HA的時候:
ssc.checkpoint("C:\\z_data\\checkpoint")
ssc =StreamingContext.getOrCreate("C:\\z_data\\checkpoint"
,functionToCreateContext)
??在使用transform操作的時候介紹兩個重要的概念:
??黑名單:如果允許的操作比不允許的操作多,那么將不允許的操作加入黑名單
??白名單:如果允許的操作比不允許的操作少,那么將允許的操作加入白名單
代碼:
object _1Streaming_tranform {
def main(args: Array[String]): Unit = {
//定義黑名單
val black_list=List("@","#","$","%")
Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN)
Logger.getLogger("org.apache.hive").setLevel(Level.WARN)
//1.獲取編程入口,StreamingContext
val conf = new SparkConf().setMaster("local[2]").setAppName("_1Streaming_tranform")
val ssc=new StreamingContext(conf,Seconds(2))
//2.從對應的網(wǎng)絡(luò)端口讀取數(shù)據(jù)
val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("test",9999)
//2.1將黑名單廣播出去
val bc = ssc.sparkContext.broadcast(black_list)
//2.2設(shè)置checkpoint
ssc.checkpoint("C:\\z_data\\checkPoint\\checkPoint_1")
//3業(yè)務處理
val wordDStream: DStream[String] = inputDStream.flatMap(_.split(" "))
//transform方法表示:從DStream拿出來一個RDD,經(jīng)過transformsFunc計算之后,返回一個新的RDD
val fileterdDStream: DStream[String] = wordDStream.transform(rdd=>{
//過濾掉黑名單中的數(shù)據(jù)
val blackList: List[String] = bc.value
rdd.filter(word=>{
!blackList.contains(word)
})
})
//3.2統(tǒng)計相應的單詞數(shù)
val resultDStream = fileterdDStream.map(msg => (msg, 1))
.updateStateByKey((values: Seq[Int], stats: Option[Int]) => {
Option(values.sum + stats.getOrElse(0))
})
//4打印output
resultDStream.print()
//5.開啟streaming流
ssc.start()
ssc.awaitTermination()
}
}
黑名單中的數(shù)據(jù)會被過濾:
注意:
在做window操作時:
- 窗口覆蓋的數(shù)據(jù)流的時間長度,必須是批處理時間間隔的倍數(shù)
- 前一個窗口到后一個窗口所經(jīng)過的時間長度,必須是批處理時間間隔的倍數(shù)。
偽代碼:
//1.獲取編程入口,StreamingContext
val conf = new SparkConf().setMaster("local[2]").setAppName("WordCount_Window")
val ssc=new StreamingContext(conf,Seconds(batchInvail.toLong))
//2.從對應的網(wǎng)絡(luò)端口讀取數(shù)據(jù)
val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream(hostname,port.toInt)
val lineDStream: DStream[String] = inputDStream.flatMap(_.split(" "))
val wordDStream: DStream[(String, Int)] = lineDStream.map((_,1))
/**
* 每隔4秒,算過去6秒的數(shù)據(jù)
* reduceFunc:數(shù)據(jù)合并的函數(shù)
* windowDuration:窗口的大小(過去6秒的數(shù)據(jù))
* slideDuration:窗口滑動的時間(每隔4秒)
*/
val resultDStream: DStream[(String, Int)] = wordDStream.reduceByKeyAndWindow((kv1:Int, kv2:Int)=>kv1+kv2,
Seconds(batchInvail.toLong * 3),
Seconds(batchInvail.toLong * 2))
resultDStream.print()
ssc.start()
ssc.awaitTermination()
概念:
//這個方法表示遍歷DStream中的每一個rdd
windowDS.foreachRDD(rdd=>{
if(!rdd.isEmpty()){
rdd.mapPartitions(ptn=>{
if(!ptn.isEmpty){
ptn.foreach(msg=>{
//在這里做相應的操作
})
}
})
}
})
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。