溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Dstream的創(chuàng)建方法

發(fā)布時間:2021-07-16 02:13:01 來源:億速云 閱讀:393 作者:chen 欄目:大數(shù)據(jù)

這篇文章主要介紹“Dstream的創(chuàng)建方法”,在日常操作中,相信很多人在Dstream的創(chuàng)建方法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Dstream的創(chuàng)建方法”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

1. RDD隊列(了解)

測試過程中,可以通過使用ssc.queueStream(queueOfRDDs)來創(chuàng)建DStream,每一個推送到這個隊列中的RDD,都會作為一個DStream處理。

案例

object SparkStreaming02_RDDQueue {
  def main(args: Array[String]): Unit = {
    //創(chuàng)建配置文件對象
    val conf: SparkConf = new SparkConf().setAppName("SparkStreaming02_RDDQueue").setMaster("local[*]")
    //創(chuàng)建SparkStreaming上下文環(huán)境對象
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(3))
    //創(chuàng)建隊列,里面放的是RDD
    val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()
    //從隊列中采集數(shù)據(jù),獲取DS
    val queueDS: InputDStream[Int] = ssc.queueStream(rddQueue,false)
    //處理采集到的數(shù)據(jù)
    val resDS: DStream[(Int, Int)] = queueDS.map((_,1)).reduceByKey(_+_)
    //打印結(jié)果
    resDS.print()
    //啟動采集器
    ssc.start()
    //循環(huán)創(chuàng)建RDD,并將創(chuàng)建的RDD放到隊列里
    for( i <- 1 to 5){
      rddQueue.enqueue(ssc.sparkContext.makeRDD(6 to 10))
      Thread.sleep(2000)
    }
    ssc.awaitTermination()
  }
}

2. 自定義數(shù)據(jù)源(某些場景需要自定義)

需要繼承Receiver,并實現(xiàn)onStart、onStop方法來自定義數(shù)據(jù)源采集。

用一個案例來說明

/**
  * Author: Felix
  * Date: 2020/5/20
  * Desc: 通過自定義數(shù)據(jù)源方式創(chuàng)建DStream
  *     模擬從指定的網(wǎng)絡端口獲取數(shù)據(jù)
  */
object SparkStreaming03_CustomerReceiver {
  def main(args: Array[String]): Unit = {
    //創(chuàng)建配置文件對象
    val conf: SparkConf = new SparkConf().setAppName("SparkStreaming02_RDDQueue").setMaster("local[*]")

    //創(chuàng)建SparkStreaming上下文環(huán)境對象
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(3))

    //通過自定義數(shù)據(jù)源創(chuàng)建Dstream
    val myDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver("hadoop202",9999))

    //扁平化
    val flatMapDS: DStream[String] = myDS.flatMap(_.split(" "))

    //結(jié)構(gòu)轉(zhuǎn)換  進行計數(shù)
    val mapDS: DStream[(String, Int)] = flatMapDS.map((_,1))

    //聚合
    val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_+_)

    //打印輸出
    reduceDS.print

    ssc.start()

    ssc.awaitTermination()
  }
}

//Receiver[T]  泛型表示的是 讀取的數(shù)據(jù)類型
class MyReceiver(host: String,port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){
  private var socket: Socket = _

  // 真正的處理接收數(shù)據(jù)的邏輯
  def receive() {
    try {
      //創(chuàng)建連接
      socket = new Socket(host,port)
      //根據(jù)連接對象獲取輸入流
      val reader: BufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream,StandardCharsets.UTF_8))
      //定義一個變量,用于接收讀取到的一行數(shù)據(jù)
      var input:String = null
      while((input = reader.readLine())!= null){
        store(input)
      }
    } catch {
      case e: ConnectException =>
        restart(s"Error connecting to $host:$port", e)
        return
    } finally {
      onStop()
    }
  }
  override def onStart(): Unit = {
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

  override def onStop(): Unit = {
    synchronized {
      if (socket != null) {
        socket.close()
        socket = null
      }
    }
  }
}

3. kafka數(shù)據(jù)源(重要)

1. 版本選型

Dstream的創(chuàng)建方法

2. Kafka 0-8 Receive模式

  1. 需求:通過SparkStreaming從Kafka讀取數(shù)據(jù),并將讀取過來的數(shù)據(jù)做簡單計算,最終打印到控制臺。

  2. 導入依賴

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
  1. 編寫代碼 0-8Receive模式,offset維護在zk中,程序停止后,繼續(xù)生產(chǎn)數(shù)據(jù),再次啟動程序,仍然可以繼續(xù)消費。可通過get /consumers/bigdata/offsets/主題名/分區(qū)號 查看

object Spark04_ReceiverAPI {
  def main(args: Array[String]): Unit = {
    //1.創(chuàng)建SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("Spark04_ReceiverAPI").setMaster("local[*]")

    //2.創(chuàng)建StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    //3.使用ReceiverAPI讀取Kafka數(shù)據(jù)創(chuàng)建DStream
    val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,
      "hadoop202:2181,hadoop203:2181,hadoop204:2181",
      "bigdata",
      //v表示的主題的分區(qū)數(shù)
      Map("mybak" -> 2))

    //4.計算WordCount并打印        new KafkaProducer[String,String]().send(new ProducerRecord[]())
    val lineDStream: DStream[String] = kafkaDStream.map(_._2)
    val word: DStream[String] = lineDStream.flatMap(_.split(" "))
    val wordToOneDStream: DStream[(String, Int)] = word.map((_, 1))
    val wordToCountDStream: DStream[(String, Int)] = wordToOneDStream.reduceByKey(_ + _)
    wordToCountDStream.print()

    //5.開啟任務
    ssc.start()
    ssc.awaitTermination()
  }
}

3. Kafka 0-8 Direct模式

  1. 需求:通過SparkStreaming從Kafka讀取數(shù)據(jù),并將讀取過來的數(shù)據(jù)做簡單計算,最終打印到控制臺。

  2. 導入依賴

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
  1. 編寫代碼(自動維護offset1)

offset維護在checkpoint中,但是獲取StreamingContext的方式需要改變,目前這種方式會丟失消息

object Spark05_DirectAPI_Auto01 {

  def main(args: Array[String]): Unit = {

    //1.創(chuàng)建SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("Spark05_DirectAPI_Auto01").setMaster("local[*]")

    //2.創(chuàng)建StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")

    //3.準備Kafka參數(shù)
    val kafkaParams: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
    )

    //4.使用DirectAPI自動維護offset的方式讀取Kafka數(shù)據(jù)創(chuàng)建DStream
    val kafkaDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
      kafkaParams,
      Set("mybak"))

    //5.計算WordCount并打印
    kafkaDStream.map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    //6.開啟任務
    ssc.start()
    ssc.awaitTermination()
  }
}
  1. 編寫代碼(自動維護offset2)

offset維護在checkpoint中,獲取StreamingContext為getActiveOrCreate

這種方式缺點:

  • checkpoint小文件過多

  • checkpoint記錄最后一次時間戳,再次啟動的時候會把間隔時間的周期再執(zhí)行一次

object Spark06_DirectAPI_Auto02 {

  def main(args: Array[String]): Unit = {
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("D:\\dev\\workspace\\my-bak\\spark-bak\\cp", () => getStreamingContext)

    ssc.start()
    ssc.awaitTermination()
  }

  def getStreamingContext: StreamingContext = {
    //1.創(chuàng)建SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("DirectAPI_Auto01").setMaster("local[*]")
    
    //2.創(chuàng)建StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")
    
    //3.準備Kafka參數(shù)
    val kafkaParams: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
    )
    
    //4.使用DirectAPI自動維護offset的方式讀取Kafka數(shù)據(jù)創(chuàng)建DStream
    val kafkaDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
      kafkaParams,
      Set("mybak"))
    
    //5.計算WordCount并打印
    kafkaDStream.map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    
    //6.返回結(jié)果
    ssc
  }
}
  1. 編寫代碼(手動維護offset)

object Spark07_DirectAPI_Handler {

  def main(args: Array[String]): Unit = {

    //1.創(chuàng)建SparkConf
    val conf: SparkConf = new SparkConf().setAppName("DirectAPI_Handler").setMaster("local[*]")

    //2.創(chuàng)建StreamingContext
    val ssc = new StreamingContext(conf, Seconds(3))

    //3.創(chuàng)建Kafka參數(shù)
    val kafkaParams: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
    )

    //4.獲取上一次消費的位置信息
    val fromOffsets: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long](
      TopicAndPartition("mybak", 0) -> 13L,
      TopicAndPartition("mybak", 1) -> 10L
    )

    //5.使用DirectAPI手動維護offset的方式消費數(shù)據(jù)
    val kafakDStream: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc,
      kafkaParams,
      fromOffsets,
      (m: MessageAndMetadata[String, String]) => m.message())

    //6.定義空集合用于存放數(shù)據(jù)的offset
    var offsetRanges = Array.empty[OffsetRange]

    //7.將當前消費到的offset進行保存
    kafakDStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      for (o <- offsetRanges) {
        println(s"${o.fromOffset}-${o.untilOffset}")
      }
    }

    //8.開啟任務
    ssc.start()
    ssc.awaitTermination()

  }
}

4. Kafka 0-10 Direct模式

  1. 需求:通過SparkStreaming從Kafka讀取數(shù)據(jù),并將讀取過來的數(shù)據(jù)做簡單計算,最終打印到控制臺。

  2. 導入依賴,為了避免和0-8沖突,我們新建一個module演示

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

3)編寫代碼

object Spark01_DirectAPI010 {
  def main(args: Array[String]): Unit = {

    //1.創(chuàng)建SparkConf
    val conf: SparkConf = new SparkConf().setAppName("DirectAPI010").setMaster("local[*]")

    //2.創(chuàng)建StreamingContext
    val ssc = new StreamingContext(conf, Seconds(3))

    //3.構(gòu)建Kafka參數(shù)
    val kafkaParmas: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata191122",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    //4.消費Kafka數(shù)據(jù)創(chuàng)建流
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaParmas))

    //5.計算WordCount并打印
    kafkaDStream.map(_.value())
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    //6.啟動任務
    ssc.start()
    ssc.awaitTermination()

  }

}

5. 消費Kafka數(shù)據(jù)模式總結(jié)

  1. 0-8 ReceiverAPI:

  • 1)專門的Executor讀取數(shù)據(jù),速度不統(tǒng)一

  • 2)跨機器傳輸數(shù)據(jù),WAL

  • 3)Executor讀取數(shù)據(jù)通過多個線程的方式,想要增加并行度,則需要多個流union

  • 4)offset存儲在Zookeeper中

  1. 0-8 DirectAPI:

  • 1)Executor讀取數(shù)據(jù)并計算

  • 2)增加Executor個數(shù)來增加消費的并行度

  • 3)offset存儲


    • a)CheckPoint(getActiveOrCreate方式創(chuàng)建StreamingContext)


    • b)手動維護(有事務的存儲系統(tǒng))


    • c)獲取offset必須在第一個調(diào)用的算子中:offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  1. 0-10 DirectAPI:

  • 1)Executor讀取數(shù)據(jù)并計算

  • 2)增加Executor個數(shù)來增加消費的并行度

  • 3)offset存儲


    • i.a.__consumer_offsets系統(tǒng)主題中


    • ii.b.手動維護(有事務的存儲系統(tǒng))

到此,關于“Dstream的創(chuàng)建方法”的學習就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI