溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Spark Streaming編程方法是什么

發(fā)布時(shí)間:2021-12-16 13:49:18 來(lái)源:億速云 閱讀:110 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“Spark Streaming編程方法是什么”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Spark Streaming編程方法是什么”吧!

有狀態(tài)的計(jì)算

updateStateByKey

上一篇文章中介紹了常見的無(wú)狀態(tài)的轉(zhuǎn)換操作,比如在WordCount的例子中,輸出的結(jié)果只與當(dāng)前batch interval的數(shù)據(jù)有關(guān),不會(huì)依賴于上一個(gè)batch interval的計(jì)算結(jié)果。spark Streaming也提供了有狀態(tài)的操作:updateStateByKey,該算子會(huì)維護(hù)一個(gè)狀態(tài),同時(shí)進(jìn)行信息更新 。該操作會(huì)讀取上一個(gè)batch interval的計(jì)算結(jié)果,然后將其結(jié)果作用到當(dāng)前的batch interval數(shù)據(jù)統(tǒng)計(jì)中。其源碼如下:

def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S]
    ): DStream[(K, S)] = ssc.withScope {
    updateStateByKey(updateFunc, defaultPartitioner())
  }
 

該算子只能在key–value對(duì)的DStream上使用,需要接收一個(gè)狀態(tài)更新函數(shù) updateFunc作為參數(shù)。使用案例如下:

object StateWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName(StateWordCount.getClass.getSimpleName)
    val ssc = new StreamingContext(conf, Seconds(5))
    // 必須開啟checkpoint,否則會(huì)報(bào)錯(cuò)
    ssc.checkpoint("file:///e:/checkpoint")
    val lines = ssc.socketTextStream("localhost", 9999)

    // 狀態(tài)更新函數(shù)
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {

      var oldvalue = stateValue.getOrElse(0) // 獲取狀態(tài)值
      // 遍歷當(dāng)前數(shù)據(jù),并更新狀態(tài)
      for (newValue <- newValues) {
        oldvalue += newValue
      }
      // 返回最新的狀態(tài)
      Option(oldvalue)
    }

    val count = lines.flatMap(_.split(" "))
      .map(w => (w, 1))
      .updateStateByKey(updateFunc)
    count.print()
    ssc.start()
    ssc.awaitTermination()
  }

}
 

尖叫提示:上面的代碼必須要開啟checkpoint,否則會(huì)報(bào)錯(cuò):

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint() 

updateStateByKey缺點(diǎn)

運(yùn)行上面的代碼會(huì)發(fā)現(xiàn)一個(gè)現(xiàn)象:即便沒有數(shù)據(jù)源輸入,Spark也會(huì)為新的batch interval更新狀態(tài),即如果沒有數(shù)據(jù)源輸入,則會(huì)不斷地輸出之前的計(jì)算狀態(tài)結(jié)果。

updateStateByKey可以在指定的批次間隔內(nèi)返回之前的全部歷史數(shù)據(jù),包括新增的,改變的和沒有改變的。由于updateStateByKey在使用的時(shí)候一定要做checkpoint,當(dāng)數(shù)據(jù)量過(guò)大的時(shí)候,checkpoint會(huì)占據(jù)龐大的數(shù)據(jù)量,會(huì)影響性能,效率不高。 

mapwithState

mapwithState是Spark提供的另外一個(gè)有狀態(tài)的算子,該操作克服了updateStateByKey的缺點(diǎn),從Spark 1.5開始引入。源碼如下:

def mapWithState[StateType: ClassTag, MappedType: ClassTag](
      spec: StateSpec[K, V, StateType, MappedType]
    ): MapWithStateDStream[K, V, StateType, MappedType] = {
    new MapWithStateDStreamImpl[K, V, StateType, MappedType](
      self,
      spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
    )
  }

 

mapWithState只返回發(fā)生變化的key的值,對(duì)于沒有發(fā)生變化的Key,則不返回。這樣做可以只關(guān)心那些已經(jīng)發(fā)生的變化的key,對(duì)于沒有數(shù)據(jù)輸入,則不會(huì)返回那些沒有變化的key 的數(shù)據(jù)。這樣的話,即使數(shù)據(jù)量很大,checkpint也不會(huì)updateBykey那樣,占用太多的存儲(chǔ),效率比較高(生產(chǎn)環(huán)境中建議使用)。

object StatefulNetworkWordCount {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()
      .setAppName("StatefulNetworkWordCount")
      .setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("file:///e:/checkpoint")

    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    /**
      * word:當(dāng)前key的值
      * one:當(dāng)前key對(duì)應(yīng)的value值
      * state:狀態(tài)值
      */
    val mappingFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      println(s">>> batchTime = $batchTime")
      println(s">>> word      = $word")
      println(s">>> one     = $one")
      println(s">>> state     = $state")
      val output = (word, sum)
      state.update(sum) //更新當(dāng)前key的狀態(tài)值
      Some(output) //返回結(jié)果
    }
    // 通過(guò)StateSpec.function構(gòu)建StateSpec
    val spec = StateSpec.function(mappingFunc)
    val stateDstream = wordDstream.mapWithState(spec)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
   

基于時(shí)間的窗口操作

Spark Streaming提供了兩種類型的窗口操作,分別是滾動(dòng)窗口和滑動(dòng)窗口。具體分析如下: 

滾動(dòng)窗口(Tumbling Windows)

滾動(dòng)窗口的示意圖如下:滾動(dòng)窗口只需要傳入一個(gè)固定的時(shí)間間隔,滾動(dòng)窗口是不存在重疊的。

Spark Streaming編程方法是什么  

源碼如下:

/**
   * @param windowDuration:窗口的長(zhǎng)度; 必須是batch interval的整數(shù)倍.
   */
  def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration
   

滑動(dòng)窗口(Sliding Windows)

滑動(dòng)窗口的示意圖如下:滑動(dòng)窗口只需要傳入兩個(gè)參數(shù),一個(gè)為窗口的長(zhǎng)度,一個(gè)是滑動(dòng)時(shí)間間隔??梢钥闯觯夯瑒?dòng)窗口是存在重疊的。

Spark Streaming編程方法是什么  

源碼如下:

/**
   * @param windowDuration 窗口長(zhǎng)度;必須是batching interval的整數(shù)倍
   *                       
   * @param slideDuration  滑動(dòng)間隔;必須是batching interval的整數(shù)倍
   */
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
    new WindowedDStream(this, windowDuration, slideDuration)
  }
   

窗口操作

  • window(windowLength, slideInterval)

    • 解釋

      基于源DStream產(chǎn)生的窗口化的批數(shù)據(jù),計(jì)算得到一個(gè)新的Dstream

    • 源碼

        def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
        def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
          new WindowedDStream(this, windowDuration, slideDuration)
        }
  • countByWindow(windowLength, slideInterval)

    返回一個(gè)滑動(dòng)窗口的元素個(gè)數(shù)

    • 源碼

      /**
         * @param windowDuration window長(zhǎng)度,必須是batch interval的倍數(shù) 
         * @param slideDuration  滑動(dòng)的時(shí)間間隔,必須是batch interval的倍數(shù)
         * 底層調(diào)用的是reduceByWindow
         */
        def countByWindow(
            windowDuration: Duration,
            slideDuration: Duration): DStream[Long] = ssc.withScope {
          this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
        }
    • 解釋
  • reduceByWindow(func, windowLength, slideInterval)

    返回一個(gè)單元素流。利用函數(shù)func聚集滑動(dòng)時(shí)間間隔的流的元素創(chuàng)建這個(gè)單元素流。函數(shù)func必須滿足結(jié)合律,從而可以支持并行計(jì)算

    • 源碼

        def reduceByWindow(
            reduceFunc: (T, T) => T,
            windowDuration: Duration,
            slideDuration: Duration
          ): DStream[T] = ssc.withScope {
          this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
        }
    • 解釋
  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

    應(yīng)用到一個(gè)(K,V)鍵值對(duì)組成的DStream上時(shí),會(huì)返回一個(gè)由(K,V)鍵值對(duì)組成的新的DStream。每一個(gè)key的值均由給定的reduce函數(shù)(func函數(shù))進(jìn)行聚合計(jì)算。注意:在默認(rèn)情況下,這個(gè)算子利用了Spark默認(rèn)的并發(fā)任務(wù)數(shù)去分組。可以通過(guò)numTasks參數(shù)的設(shè)置來(lái)指定不同的任務(wù)數(shù)

    • 源碼

        def reduceByKeyAndWindow(
            reduceFunc: (V, V) => V,
            windowDuration: Duration,
            slideDuration: Duration
          ): DStream[(K, V)] = ssc.withScope {
          reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
        }
    • 解釋
  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

    更加高效的reduceByKeyAndWindow,每個(gè)窗口的reduce值,是基于先前窗口的reduce值進(jìn)行增量計(jì)算得到的;它會(huì)對(duì)進(jìn)入滑動(dòng)窗口的新數(shù)據(jù)進(jìn)行reduce操作,并對(duì)離開窗口的老數(shù)據(jù)進(jìn)行逆向reduce操作。但是,只能用于可逆reduce函數(shù),即那些reduce函數(shù)都有一個(gè)對(duì)應(yīng)的逆向reduce函數(shù)(以InvFunc參數(shù)傳入)注意:必須開啟 checkpointing

    • 源碼

      def reduceByKeyAndWindow(
            reduceFunc: (V, V) => V,
            invReduceFunc: (V, V) => V,
            windowDuration: Duration,
            slideDuration: Duration,
            partitioner: Partitioner,
            filterFunc: ((K, V)) => Boolean
          ): DStream[(K, V)] = ssc.withScope {

          val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
          val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
          val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
          new ReducedWindowedDStream[K, V](
            self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
            windowDuration, slideDuration, partitioner
          )
        }
    • 解釋
  • countByValueAndWindow(windowLength, slideInterval, [numTasks])

    • 解釋

      當(dāng)應(yīng)用到一個(gè)(K,V)鍵值對(duì)組成的DStream上,返回一個(gè)由(K,V)鍵值對(duì)組成的新的DStream。每個(gè)key的對(duì)應(yīng)的value值都是它們?cè)诨瑒?dòng)窗口中出現(xiàn)的頻率

    • 源碼

      def countByValueAndWindow(
            windowDuration: Duration,
            slideDuration: Duration,
            numPartitions: Int = ssc.sc.defaultParallelism)
            (implicit ord: Ordering[T] = null)
            : DStream[(T, Long)] = ssc.withScope {
          this.map((_, 1L)).reduceByKeyAndWindow(
            (x: Long, y: Long) => x + y,
            (x: Long, y: Long) => x - y,
            windowDuration,
            slideDuration,
            numPartitions,
            (x: (T, Long)) => x._2 != 0L
          )
        }
 

使用案例

val lines = ssc.socketTextStream("localhost", 9999)

    val count = lines.flatMap(_.split(" "))
      .map(w => (w, 1))
      .reduceByKeyAndWindow((w1: Int, w2: Int) => w1 + w2, Seconds(30), Seconds(10))
      .print()
//滾動(dòng)窗口

/*    lines.window(Seconds(20))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()*/
   

持久化

持久化是提升Spark應(yīng)用性能的一種方式,在第二篇|Spark core編程指南一文中講解了RDD持久化的使用方式。其實(shí),DStream也是支持持久化的,同樣是使用persist()與cache()方法,持久化通常在有狀態(tài)的算子中使用,比如窗口操作,默認(rèn)情況下,雖然沒有顯性地調(diào)用持久化方法,但是底層已經(jīng)幫用戶做了持久化操作,通過(guò)下面的源碼可以看出。

private[streaming]
class WindowedDStream[T: ClassTag](
    parent: DStream[T],
    _windowDuration: Duration,
    _slideDuration: Duration)
  extends DStream[T](parent.ssc) {
  // 省略代碼...
  // Persist parent level by default, as those RDDs are going to be obviously reused.
  parent.persist(StorageLevel.MEMORY_ONLY_SER)
}

 

注意:與RDD的持久化不同,DStream的默認(rèn)持久性級(jí)別將數(shù)據(jù)序列化在內(nèi)存中,通過(guò)下面的源碼可以看出:

/** 給定一個(gè)持計(jì)劃級(jí)別 */
  def persist(level: StorageLevel): DStream[T] = {
    if (this.isInitialized) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of a DStream after streaming context has started")
    }
    this.storageLevel = level
    this
  }

  /** 默認(rèn)的持久化級(jí)別為(MEMORY_ONLY_SER) */
  def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
  def cache(): DStream[T] = persist()
 

從上面的源碼可以看出persist()與cache()的主要區(qū)別是:

  • cache()方法底層調(diào)用的是persist()方法
  • persist()方法有兩個(gè)重載的方法
    • 無(wú)參數(shù)的persist(),默認(rèn)是內(nèi)存
    • perisist(level: StorageLevel),可以選擇與RDD持久化相同的持久化級(jí)別
 

檢查點(diǎn)Checkpoint

 

簡(jiǎn)介

流應(yīng)用程序通常是24/7運(yùn)行的,因此必須對(duì)與應(yīng)用程序邏輯無(wú)關(guān)的故障(例如系統(tǒng)故障,JVM崩潰等)具有彈性的容錯(cuò)能力。為此,Spark Streaming需要將足夠的信息checkpoint到容錯(cuò)存儲(chǔ)系統(tǒng)(比如HDFS),以便可以從故障中恢復(fù)。檢查點(diǎn)包括兩種類型:

  • 元數(shù)據(jù)檢查點(diǎn)

    元數(shù)據(jù)檢查點(diǎn)可以保證從Driver程序失敗中恢復(fù)。即如果運(yùn)行drive的節(jié)點(diǎn)失敗時(shí),可以查看最近的checkpoin數(shù)據(jù)獲取最新的狀態(tài)。典型的應(yīng)用程序元數(shù)據(jù)包括:

    • 配置 :用于創(chuàng)建流應(yīng)用程序的配置。
    • DStream操作 :定義流應(yīng)用程序的DStream操作。
    • 未完成的batch :當(dāng)前運(yùn)行batch對(duì)應(yīng)的job在隊(duì)列中排隊(duì),還沒有計(jì)算到該batch的數(shù)據(jù)。
  • 數(shù)據(jù)檢查點(diǎn)

    將生成的RDD保存到可靠的存儲(chǔ)中。在某些有狀態(tài)轉(zhuǎn)換中,需要合并多個(gè)批次中的數(shù)據(jù),所以需要開啟檢查點(diǎn)。在此類轉(zhuǎn)換中,生成的RDD依賴于先前批次的RDD,這導(dǎo)致依賴鏈的長(zhǎng)度隨時(shí)間不斷增加。為了避免恢復(fù)時(shí)間無(wú)限制的增加(與依賴鏈成比例),有狀態(tài)轉(zhuǎn)換的中間RDD定期 checkpoint到可靠的存儲(chǔ)(例如HDFS),以切斷依賴鏈,功能類似于持久化,只需要從當(dāng)前的狀態(tài)恢復(fù),而不需要重新計(jì)算整個(gè)lineage。

總而言之,從Driver程序故障中恢復(fù)時(shí),主要需要元數(shù)據(jù)檢查點(diǎn)。而如果使用有狀態(tài)轉(zhuǎn)換,則需要數(shù)據(jù)或RDD檢查點(diǎn)。

 

什么時(shí)候啟用檢查點(diǎn)

必須為具有以下類型的應(yīng)用程序啟用檢查點(diǎn):

  • 使用了有狀態(tài)轉(zhuǎn)換轉(zhuǎn)換操作

    如果在應(yīng)用程序中使用updateStateByKeyreduceByKeyAndWindow,則必須提供檢查點(diǎn)目錄以允許定期進(jìn)行RDD檢查點(diǎn)。

  • 從運(yùn)行應(yīng)用程序的Driver程序故障中恢復(fù)

    元數(shù)據(jù)檢查點(diǎn)用于恢復(fù)進(jìn)度信息。

注意,沒有前述狀態(tài)轉(zhuǎn)換的簡(jiǎn)單流應(yīng)用程序可以在不啟用檢查點(diǎn)的情況下運(yùn)行。在這種情況下,從驅(qū)動(dòng)程序故障中恢復(fù)也將是部分的(某些丟失但未處理的數(shù)據(jù)可能會(huì)丟失)。這通常是可以接受的,并且許多都以這種方式運(yùn)行Spark Streaming應(yīng)用程序。預(yù)計(jì)將來(lái)會(huì)改善對(duì)非Hadoop環(huán)境的支持。

 

如何配置檢查點(diǎn)

可以通過(guò)具有容錯(cuò)的、可靠的文件系統(tǒng)(例如HDFS,S3等)中設(shè)置目錄來(lái)啟用檢查點(diǎn),將檢查點(diǎn)信息保存到該目錄中。開啟檢查點(diǎn),需要開啟下面的兩個(gè)配置:

  • streamingContext.checkpoint():配置檢查點(diǎn)的目錄,比如HDFS路徑
  • dstream.checkpoint():檢查點(diǎn)的頻率

其中配置檢查點(diǎn)的時(shí)間間隔是可選的。如果不設(shè)置,會(huì)根據(jù)DStream的類型選擇一個(gè)默認(rèn)值。對(duì)于MapWithStateDStream,默認(rèn)的檢查點(diǎn)間隔是batch interval的10倍。對(duì)于其他的DStream,默認(rèn)的檢查點(diǎn)間隔是10S,或者是batch interval的間隔時(shí)間。需要注意的是:checkpoint的頻率必須是 batch interval的整數(shù)倍,否則會(huì)報(bào)錯(cuò)

此外,如果要使應(yīng)用程序從Driver程序故障中恢復(fù),則需要使用下面的方式創(chuàng)建StreamingContext:

def createStreamingContext (conf: SparkConf,checkpointPath: String):
StreamingContext = {
val ssc = new StreamingContext( <ConfInfo> )
// .... other code ...
ssc.checkPoint(checkpointDirectory)
ssc
}
#創(chuàng)建一個(gè)新的StreamingContext或者從最近的checkpoint獲取
val context = StreamingContext.getOrCreate(checkpointDirectory,
createStreamingContext _)
#啟動(dòng)
context.start()
context.awaitTermination()
 
  • 程序首次啟動(dòng)時(shí),它將創(chuàng)建一個(gè)新的StreamingContext,然后調(diào)用start()。
  • 失敗后重新啟動(dòng)程序時(shí),它將根據(jù)檢查點(diǎn)目錄中的檢查點(diǎn)數(shù)據(jù)重新創(chuàng)建StreamingContext。

注意:

RDD的檢查點(diǎn)需要將數(shù)據(jù)保存到可靠存儲(chǔ)上,由此帶來(lái)一些成本開銷。這可能會(huì)導(dǎo)致RDD獲得檢查點(diǎn)的那些批次的處理時(shí)間增加。因此,需要設(shè)置一個(gè)合理的檢查點(diǎn)的間隔。在batch interval較小時(shí)(例如1秒),每個(gè)batch interval都進(jìn)行檢查點(diǎn)可能會(huì)大大降低吞吐量。相反,檢查點(diǎn)時(shí)間間隔太長(zhǎng)會(huì)導(dǎo)致 lineage和任務(wù)規(guī)模增加,這可能會(huì)產(chǎn)生不利影響。對(duì)于需要RDD檢查點(diǎn)的有狀態(tài)轉(zhuǎn)換,默認(rèn)間隔為batch interval的倍數(shù),至少應(yīng)為10秒??梢允褂?**dstream.checkpoint(checkpointInterval)**進(jìn)行配置。通常,DStream的5-10個(gè)batch interval的檢查點(diǎn)間隔是一個(gè)較好的選擇。

 

檢查點(diǎn)和持久化之間的區(qū)別

  • 持久化

    • 當(dāng)我們將RDD保持在DISK_ONLY存儲(chǔ)級(jí)別時(shí),RDD將存儲(chǔ)在一個(gè)位置,該RDD的后續(xù)使用將不會(huì)重新計(jì)算lineage。
    • 在調(diào)用persist()之后,Spark會(huì)記住RDD的lineage,即使它沒有調(diào)用它。
    • 作業(yè)運(yùn)行完成后,將清除緩存并銷毀文件。
  • 檢查點(diǎn)

    • 檢查點(diǎn)將RDD存儲(chǔ)在HDFS中,將會(huì)刪除lineage血緣關(guān)系。
    • 在完成作業(yè)運(yùn)行后,與持計(jì)劃不同,不會(huì)刪除檢查點(diǎn)文件。
    • 當(dāng)checkpoint一個(gè)RDD時(shí),將導(dǎo)致雙重計(jì)算。即該操作在完成實(shí)際的計(jì)算工作之前,首先會(huì)調(diào)用持久化方法,然后再將其寫入檢查點(diǎn)目錄。
 

使用DataFrames & SQL處理流數(shù)據(jù)

在Spark Streaming應(yīng)用中,可以輕松地對(duì)流數(shù)據(jù)使用DataFrames和SQL操作。使用案例如下:

object SqlStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(SqlStreaming.getClass.getSimpleName)
      .setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))

    words.foreachRDD { rdd =>
      // 調(diào)用SparkSession單例方法,如果已經(jīng)創(chuàng)建了,則直接返回
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      val wordsDataFrame = rdd.toDF("word")
      wordsDataFrame.show()

      wordsDataFrame.createOrReplaceTempView("words")

      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      wordCountsDataFrame.show()

    }


    ssc.start()
    ssc.awaitTermination()
  }
}
/** SparkSession單例 */
object SparkSessionSingleton {

  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

感謝各位的閱讀,以上就是“Spark Streaming編程方法是什么”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Spark Streaming編程方法是什么這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI