This article explains how to create DStreams. Many people run into questions about DStream creation in day-to-day work, so this post pulls together several simple, practical approaches in the hope of clearing up those doubts. Follow along and give them a try!
During testing, a DStream can be created with ssc.queueStream(queueOfRDDs): every RDD pushed into the queue is processed as one batch of the DStream.

Example
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

object SparkStreaming02_RDDQueue {
  def main(args: Array[String]): Unit = {
    // Create the configuration object
    val conf: SparkConf = new SparkConf().setAppName("SparkStreaming02_RDDQueue").setMaster("local[*]")
    // Create the Spark Streaming context
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
    // Create the queue that will hold the RDDs
    val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()
    // Collect data from the queue and obtain a DStream
    val queueDS: InputDStream[Int] = ssc.queueStream(rddQueue, false)
    // Process the collected data
    val resDS: DStream[(Int, Int)] = queueDS.map((_, 1)).reduceByKey(_ + _)
    // Print the result
    resDS.print()
    // Start the receiver
    ssc.start()
    // Create RDDs in a loop and push them into the queue
    for (i <- 1 to 5) {
      rddQueue.enqueue(ssc.sparkContext.makeRDD(6 to 10))
      Thread.sleep(2000)
    }
    ssc.awaitTermination()
  }
}
To use a custom data source, extend Receiver and implement the onStart and onStop methods.

The following example illustrates this.
import java.io.{BufferedReader, InputStreamReader}
import java.net.{ConnectException, Socket}
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver

/**
 * Author: Felix
 * Date: 2020/5/20
 * Desc: Create a DStream from a custom data source,
 *       simulating reading data from a given network port
 */
object SparkStreaming03_CustomerReceiver {
  def main(args: Array[String]): Unit = {
    // Create the configuration object
    val conf: SparkConf = new SparkConf().setAppName("SparkStreaming03_CustomerReceiver").setMaster("local[*]")
    // Create the Spark Streaming context
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
    // Create a DStream from the custom data source
    val myDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver("hadoop202", 9999))
    // Flatten
    val flatMapDS: DStream[String] = myDS.flatMap(_.split(" "))
    // Convert the structure for counting
    val mapDS: DStream[(String, Int)] = flatMapDS.map((_, 1))
    // Aggregate
    val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_ + _)
    // Print the output
    reduceDS.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

// The type parameter of Receiver[T] is the type of the data being read
class MyReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  private var socket: Socket = _

  // The actual logic that receives the data
  def receive(): Unit = {
    try {
      // Open the connection
      socket = new Socket(host, port)
      // Get an input stream from the connection
      val reader: BufferedReader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      // Read one line at a time and store it
      var input: String = reader.readLine()
      while (input != null) {
        store(input)
        input = reader.readLine()
      }
    } catch {
      case e: ConnectException =>
        restart(s"Error connecting to $host:$port", e)
        return
    } finally {
      onStop()
    }
  }

  override def onStart(): Unit = {
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run(): Unit = {
        receive()
      }
    }.start()
  }

  override def onStop(): Unit = {
    synchronized {
      if (socket != null) {
        socket.close()
        socket = null
      }
    }
  }
}
Requirement: read data from Kafka with SparkStreaming, perform a simple computation on it, and print the result to the console.

Add the dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
Write the code. In the 0-8 Receiver mode, offsets are maintained in ZooKeeper, so if the program stops while data keeps being produced, the consumer picks up where it left off when the program is restarted. The stored offsets can be inspected with get /consumers/bigdata/offsets/<topic name>/<partition number>.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

object Spark04_ReceiverAPI {
  def main(args: Array[String]): Unit = {
    // 1. Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("Spark04_ReceiverAPI").setMaster("local[*]")
    // 2. Create StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // 3. Read Kafka data through the Receiver API to create a DStream
    val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(
      ssc,
      "hadoop202:2181,hadoop203:2181,hadoop204:2181",
      "bigdata",
      // the value is the number of partitions (consumer threads) for the topic
      Map("mybak" -> 2))
    // 4. Compute WordCount and print
    val lineDStream: DStream[String] = kafkaDStream.map(_._2)
    val word: DStream[String] = lineDStream.flatMap(_.split(" "))
    val wordToOneDStream: DStream[(String, Int)] = word.map((_, 1))
    val wordToCountDStream: DStream[(String, Int)] = wordToOneDStream.reduceByKey(_ + _)
    wordToCountDStream.print()
    // 5. Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}
Requirement: read data from Kafka with SparkStreaming, perform a simple computation on it, and print the result to the console.

Add the dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
Write the code (automatic offset maintenance, variant 1)

Offsets are maintained in the checkpoint, but the way the StreamingContext is obtained then needs to change; as written here, this approach can lose messages.
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object Spark05_DirectAPI_Auto01 {
  def main(args: Array[String]): Unit = {
    // 1. Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("Spark05_DirectAPI_Auto01").setMaster("local[*]")
    // 2. Create StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")
    // 3. Prepare the Kafka parameters
    val kafkaParams: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
    )
    // 4. Read Kafka data through the Direct API (offsets maintained automatically) to create a DStream
    val kafkaDStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("mybak"))
    // 5. Compute WordCount and print
    kafkaDStream.map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    // 6. Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}
Write the code (automatic offset maintenance, variant 2)

Offsets are maintained in the checkpoint, and the StreamingContext is obtained with getActiveOrCreate.

Drawbacks of this approach:

the checkpoint produces too many small files

the checkpoint records the timestamp of the last batch, so on restart every batch interval that elapsed while the job was down is executed again
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object Spark06_DirectAPI_Auto02 {
  def main(args: Array[String]): Unit = {
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate(
      "D:\\dev\\workspace\\my-bak\\spark-bak\\cp",
      () => getStreamingContext)
    ssc.start()
    ssc.awaitTermination()
  }

  def getStreamingContext: StreamingContext = {
    // 1. Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("DirectAPI_Auto02").setMaster("local[*]")
    // 2. Create StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")
    // 3. Prepare the Kafka parameters
    val kafkaParams: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
    )
    // 4. Read Kafka data through the Direct API (offsets maintained automatically) to create a DStream
    val kafkaDStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("mybak"))
    // 5. Compute WordCount and print
    kafkaDStream.map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    // 6. Return the context
    ssc
  }
}
Write the code (manual offset maintenance)
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object Spark07_DirectAPI_Handler {
  def main(args: Array[String]): Unit = {
    // 1. Create SparkConf
    val conf: SparkConf = new SparkConf().setAppName("DirectAPI_Handler").setMaster("local[*]")
    // 2. Create StreamingContext
    val ssc = new StreamingContext(conf, Seconds(3))
    // 3. Create the Kafka parameters
    val kafkaParams: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
    )
    // 4. The positions where consumption stopped last time
    val fromOffsets: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long](
      TopicAndPartition("mybak", 0) -> 13L,
      TopicAndPartition("mybak", 1) -> 10L
    )
    // 5. Consume data through the Direct API with manually maintained offsets
    val kafkaDStream: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc,
      kafkaParams,
      fromOffsets,
      (m: MessageAndMetadata[String, String]) => m.message())
    // 6. An empty array that will hold the offsets of each batch
    var offsetRanges = Array.empty[OffsetRange]
    // 7. Record the offsets consumed in the current batch
    kafkaDStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      for (o <- offsetRanges) {
        println(s"${o.fromOffset}-${o.untilOffset}")
      }
    }
    // 8. Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}
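The program above only prints each batch's offset ranges; for manual maintenance to actually survive a restart, the ranges need to be written to an external, ideally transactional, store together with the batch results, and read back into fromOffsets at startup. Below is a minimal sketch of that idea. The JDBC URL, the credentials and the kafka_offsets table are illustrative assumptions, not part of the example above; any transactional store would work just as well.

import java.sql.DriverManager

import org.apache.spark.streaming.kafka.OffsetRange

// Minimal sketch: persist the consumed offset ranges to a relational table.
// The connection details and the kafka_offsets(topic, part, until_offset)
// table are hypothetical and only serve to illustrate the idea.
object OffsetStore {
  private val url = "jdbc:mysql://hadoop202:3306/spark_offsets" // hypothetical
  private val user = "root"                                     // hypothetical
  private val password = "123456"                               // hypothetical

  def save(offsetRanges: Array[OffsetRange]): Unit = {
    val conn = DriverManager.getConnection(url, user, password)
    try {
      conn.setAutoCommit(false)
      val stmt = conn.prepareStatement(
        "REPLACE INTO kafka_offsets(topic, part, until_offset) VALUES (?, ?, ?)")
      for (o <- offsetRanges) {
        stmt.setString(1, o.topic)
        stmt.setInt(2, o.partition)
        stmt.setLong(3, o.untilOffset)
        stmt.addBatch()
      }
      stmt.executeBatch()
      // Commit the offsets (and, in a real job, the batch results) in one transaction
      conn.commit()
    } finally {
      conn.close()
    }
  }
}

Inside foreachRDD one would call OffsetStore.save(offsetRanges) after the batch results have been written, and at startup query the same table to build the fromOffsets map instead of hard-coding 13L and 10L.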
Requirement: read data from Kafka with SparkStreaming, perform a simple computation on it, and print the result to the console.

Add the dependencies; to avoid conflicts with the 0-8 integration, create a new module for this demo
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
Write the code
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object Spark01_DirectAPI010 {
  def main(args: Array[String]): Unit = {
    // 1. Create SparkConf
    val conf: SparkConf = new SparkConf().setAppName("DirectAPI010").setMaster("local[*]")
    // 2. Create StreamingContext
    val ssc = new StreamingContext(conf, Seconds(3))
    // 3. Build the Kafka parameters
    val kafkaParams: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata191122",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )
    // 4. Consume Kafka data and create the stream
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaParams))
    // 5. Compute WordCount and print
    kafkaDStream.map(_.value())
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    // 6. Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}
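With the 0-10 integration, offsets can also be committed back to Kafka's __consumer_offsets system topic from the stream itself once a batch has been processed (assuming enable.auto.commit is set to false in the Kafka parameters). A minimal sketch, reusing kafkaDStream from the example above; the processing step is only indicative:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

kafkaDStream.foreachRDD { rdd =>
  // Read the offset ranges while the RDD is still the original KafkaRDD
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here, e.g. write the results to an external sink ...

  // Commit the offsets of this batch back to Kafka (asynchronous, at-least-once)
  kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

Note that commitAsync has to be called on the original input stream, not on a transformed DStream, because only the input stream implements CanCommitOffsets.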
0-8 ReceiverAPI:
1) A dedicated Executor reads the data, so reading and processing speeds are not aligned
2) Data is transferred across machines, with a WAL (write-ahead log)
3) The Executor reads data with multiple threads; to increase parallelism, several streams have to be unioned
4) Offsets are stored in ZooKeeper
0-8 DirectAPI:
1) Executors both read the data and do the computation
2) Increase the number of Executors to increase consumption parallelism
3) Offset storage
a) Checkpoint (create the StreamingContext with getActiveOrCreate)
b) Manual maintenance (in a transactional storage system)
c) Offsets must be read in the first operator applied to the stream: offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges (see the sketch below)
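A short sketch of point c), based on the 0-8 Direct stream (kafkaDStream of type InputDStream[(String, String)], as in the Spark05/Spark06 examples above): the cast only succeeds while the RDD is still the KafkaRDD produced by the direct stream, i.e. in the first operator applied to it.

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

var offsetRanges = Array.empty[OffsetRange]

// Correct: read the ranges in the first operator applied to the stream
val words = kafkaDStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.flatMap(_._2.split(" "))

// Incorrect: after flatMap the RDD is no longer a KafkaRDD, so the cast
// would throw a ClassCastException
// kafkaDStream.flatMap(_._2.split(" "))
//   .foreachRDD(rdd => rdd.asInstanceOf[HasOffsetRanges].offsetRanges)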
0-10 DirectAPI:
1) Executors both read the data and do the computation
2) Increase the number of Executors to increase consumption parallelism
3) Offset storage
a) In the __consumer_offsets system topic
b) Manual maintenance (in a transactional storage system), as sketched below
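For case b), the 0-10 API lets the starting positions be passed to the consumer strategy explicitly when the stream is created. A minimal sketch, assuming the offsets have been loaded from an external transactional store (the topic name "test" and the 13L/10L values are placeholders) and reusing ssc and kafkaParams from the Spark01_DirectAPI010 example:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Hypothetical starting offsets loaded from an external, transactional store
val fromOffsets: Map[TopicPartition, Long] = Map(
  new TopicPartition("test", 0) -> 13L,
  new TopicPartition("test", 1) -> 10L
)

// Assign the partitions and their starting positions explicitly
val kafkaDStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))

After each batch has been processed, the new until offsets are written back to the same store, exactly as in the 0-8 manual-maintenance sketch above.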