溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

direct Dstream是什么

發(fā)布時間:2021-12-27 10:42:56 來源:億速云 閱讀:140 作者:小新 欄目:大數(shù)據(jù)

這篇文章主要為大家展示了“direct Dstream是什么”,內(nèi)容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“direct Dstream是什么”這篇文章吧。

前言

前面,有分享過基于receiver的,實際上,看到receiver based Dstream大家就對閱讀提不起興趣了,實際上這是錯誤的,基于receiver的才是spark streaming根本,雖然direct stream才更合適。但是,我們從基于receiver可以學(xué)到很多內(nèi)容,最重要的spark streaming實現(xiàn)原理,數(shù)據(jù)本地性等。

direct dstream運行架構(gòu)圖

direct Dstream是什么

對比

對比receiver based的Dstream和direct Dstream

   a 無需啟動receiver,減少不必要的cpu占用

   b 減少了receiver接收數(shù)據(jù),寫入blockmanager,然后運行時再通過blockid,網(wǎng)絡(luò)傳輸,磁盤讀區(qū),來獲取數(shù)據(jù)這個過程。提升了效率。

   c 無需wal,進(jìn)一步減少磁盤讀寫。

   d 可以通過手動維護(hù)offset來實現(xiàn)精確的一次消費。

   e Dstream中生成的RDD,并不是blockrdd,而是kafkardd,kafkardd是和kafka分區(qū)一一對應(yīng)的,更便于我們把控并行度。

   f 數(shù)據(jù)本地性的問題,導(dǎo)致receiver存在的機(jī)器會運行過多的任務(wù),會導(dǎo)致有些executor空閑。

而kafkardd,在compute函數(shù)里,會使用simpleconsumer,根據(jù)指定的topic,分區(qū),offset范圍,去kafka讀取數(shù)據(jù)。010版本以后,又存在假如kafka和spark運行于同一集群,會有數(shù)據(jù)本性的概念。

數(shù)據(jù)本地性

spark streaming與kafka 082結(jié)合生成的rdd,數(shù)據(jù)本地性計算方式如下:

override def getPreferredLocations(thePart: Partition): Seq[String] = {
 val part = thePart.asInstanceOf[KafkaRDDPartition]
 // TODO is additional hostname resolution necessary here
 Seq(part.host)
}

spark streaming 與kafka 010結(jié)合生成的rdd,數(shù)據(jù)本地性計算方式如下:

override def getPreferredLocations(thePart: Partition): Seq[String] = {
 // The intention is best-effort consistent executor for a given topicpartition,
 // so that caching consumers can be effective.
 // TODO what about hosts specified by ip vs name
 val part = thePart.asInstanceOf[KafkaRDDPartition]
 val allExecs = executors()
 val tp = part.topicPartition
 val prefHost = preferredHosts.get(tp)
 val prefExecs = if (null == prefHost) allExecs else allExecs.filter(_.host == prefHost)
 val execs = if (prefExecs.isEmpty) allExecs else prefExecs
 if (execs.isEmpty) {
   Seq.empty
 } else {
   // execs is sorted, tp.hashCode depends only on topic and partition, so consistent index
   val index = Math.floorMod(tp.hashCode, execs.length)
   val chosen = execs(index)
   Seq(chosen.toString)
 }
}

對于 與kafka010結(jié)合的注意事項,實際上以前浪尖也翻譯過一篇文章。

必讀:Spark與kafka010整合

限速

限速,很多人使用姿勢不對,詳細(xì)的原理可以參看

Spark的PIDController源碼賞析及backpressure詳解

具體配置參數(shù)詳解,可以參考:

  1. spark.streaming.backpressure.enabled 默認(rèn)是false,設(shè)置為true,就開啟了背壓機(jī)制。

  2. spark.streaming.backpressure.initialRate 默認(rèn)沒設(shè)置,初始速率。第一次啟動的時候每個receiver接受數(shù)據(jù)的最大值。

  3. spark.streaming.receiver.maxRate 默認(rèn)值沒設(shè)置。每個接收器將接收數(shù)據(jù)的最大速率(每秒記錄數(shù))。 實際上,每個流每秒最多將消費此數(shù)量的記錄。 將此配置設(shè)置為0或負(fù)數(shù)將不會對速率進(jìn)行限制。

  4. spark.streaming.kafka.maxRatePerPartition 使用新Kafka direct API時從每個Kafka分區(qū)讀取數(shù)據(jù)的最大速率(每秒記錄數(shù))。


以上是“direct Dstream是什么”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI