溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

DStreams上的輸出操作是怎樣的

發(fā)布時(shí)間:2022-01-14 15:10:43 來源:億速云 閱讀:138 作者:柒染 欄目:云計(jì)算

本篇文章給大家分享的是有關(guān)DStreams上的輸出操作是怎樣的,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

dstream.foreachRDD是一個(gè)強(qiáng)大的原語,發(fā)送數(shù)據(jù)到外部系統(tǒng)中。然而,明白怎樣正確地、有效地用這個(gè)原語是非常重要的。下面幾點(diǎn)介紹了如何避免一般錯(cuò)誤。

  • 經(jīng)常寫數(shù)據(jù)到外部系統(tǒng)需要建一個(gè)連接對(duì)象(例如到遠(yuǎn)程服務(wù)器的TCP連接),用它發(fā)送數(shù)據(jù)到遠(yuǎn)程系統(tǒng)。為了達(dá)到這個(gè)目的,開發(fā)人員可能不經(jīng)意的在Spark驅(qū)動(dòng)中創(chuàng)建一個(gè)連接對(duì)象,但是在Spark worker中 嘗試調(diào)用這個(gè)連接對(duì)象保存記錄到RDD中,如下:

 dstream.foreachRDD(rdd => {      val connection = createNewConnection()  // executed at the driver
      rdd.foreach(record => {
          connection.send(record) // executed at the worker
      })
  })

這是不正確的,因?yàn)檫@需要先序列化連接對(duì)象,然后將它從driver發(fā)送到worker中。這樣的連接對(duì)象在機(jī)器之間不能傳送。它可能表現(xiàn)為序列化錯(cuò)誤(連接對(duì)象不可序列化)或者初始化錯(cuò)誤(連接對(duì)象應(yīng)該 在worker中初始化)等等。正確的解決辦法是在worker中創(chuàng)建連接對(duì)象。

  • 然而,這會(huì)造成另外一個(gè)常見的錯(cuò)誤-為每一個(gè)記錄創(chuàng)建了一個(gè)連接對(duì)象。例如:

dstream.foreachRDD(rdd => {
      rdd.foreach(record => {
          val connection = createNewConnection()
          connection.send(record)
          connection.close()
      })
  })

通常,創(chuàng)建一個(gè)連接對(duì)象有資源和時(shí)間的開支。因此,為每個(gè)記錄創(chuàng)建和銷毀連接對(duì)象會(huì)導(dǎo)致非常高的開支,明顯的減少系統(tǒng)的整體吞吐量。一個(gè)更好的解決辦法是利用rdd.foreachPartition方法。 為RDD的partition創(chuàng)建一個(gè)連接對(duì)象,用這個(gè)兩件對(duì)象發(fā)送partition中的所有記錄。

dstream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
          val connection = createNewConnection()
          partitionOfRecords.foreach(record => connection.send(record))
          connection.close()
      })
  })

這就將連接對(duì)象的創(chuàng)建開銷分?jǐn)偟搅藀artition的所有記錄上了。

  • 最后,可以通過在多個(gè)RDD或者批數(shù)據(jù)間重用連接對(duì)象做更進(jìn)一步的優(yōu)化。開發(fā)者可以保有一個(gè)靜態(tài)的連接對(duì)象池,重復(fù)使用池中的對(duì)象將多批次的RDD推送到外部系統(tǒng),以進(jìn)一步節(jié)省開支。

dstream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
          // ConnectionPool is a static, lazily initialized pool of connections
          val connection = ConnectionPool.getConnection()
          partitionOfRecords.foreach(record => connection.send(record))
          ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      })
  })

需要注意的是,池中的連接對(duì)象應(yīng)該根據(jù)需要延遲創(chuàng)建,并且在空閑一段時(shí)間后自動(dòng)超時(shí)。這樣就獲取了最有效的方式發(fā)生數(shù)據(jù)到外部系統(tǒng)。

其它需要注意的地方:

  • 輸出操作通過懶執(zhí)行的方式操作DStreams,正如RDD action通過懶執(zhí)行的方式操作RDD。具體地看,RDD actions和DStreams輸出操作接收數(shù)據(jù)的處理。因此,如果你的應(yīng)用程序沒有任何輸出操作或者 用于輸出操作dstream.foreachRDD(),但是沒有任何RDD action操作在dstream.foreachRDD()里面,那么什么也不會(huì)執(zhí)行。系統(tǒng)僅僅會(huì)接收輸入,然后丟棄它們。

  • 默認(rèn)情況下,DStreams輸出操作是分時(shí)執(zhí)行的,它們按照應(yīng)用程序的定義順序按序執(zhí)行。

以上就是DStreams上的輸出操作是怎樣的,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI