您好,登錄后才能下訂單哦!
本篇文章為大家展示了Spark Streaming反壓機制是怎么樣的,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。
在默認(rèn)情況下,Spark Streaming 通過 receivers (或者是 Direct 方式) 以生產(chǎn)者生產(chǎn)數(shù)據(jù)的速率接收數(shù)據(jù)。當(dāng) batch processing time > batch interval 的時候,也就是每個批次數(shù)據(jù)處理的時間要比 Spark Streaming 批處理間隔時間長;越來越多的數(shù)據(jù)被接收,但是數(shù)據(jù)的處理速度沒有跟上,導(dǎo)致系統(tǒng)開始出現(xiàn)數(shù)據(jù)堆積,可能進一步導(dǎo)致 Executor 端出現(xiàn) OOM 問題而出現(xiàn)失敗的情況。
而在 Spark 1.5 版本之前,為了解決這個問題,對于 Receiver-based 數(shù)據(jù)接收器,我們可以通過配置 spark.streaming.receiver.maxRate
參數(shù)來限制每個 receiver 每秒最大可以接收的記錄的數(shù)據(jù);對于 Direct Approach 的數(shù)據(jù)接收,我們可以通過配置 spark.streaming.kafka.maxRatePerPartition
參數(shù)來限制每次作業(yè)中每個 Kafka 分區(qū)最多讀取的記錄條數(shù)。這種方法雖然可以通過限制接收速率,來適配當(dāng)前的處理能力,但這種方式存在以下幾個問題:
我們需要事先估計好集群的處理速度以及消息數(shù)據(jù)的產(chǎn)生速度;
這兩種方式需要人工參與,修改完相關(guān)參數(shù)之后,我們需要手動重啟 Spark Streaming 應(yīng)用程序;
如果當(dāng)前集群的處理能力高于我們配置的 maxRate,而且 producer 產(chǎn)生的數(shù)據(jù)高于 maxRate,這會導(dǎo)致集群資源利用率低下,而且也會導(dǎo)致數(shù)據(jù)不能夠及時處理。
那么有沒有可能不需要人工干預(yù),Spark Streaming 系統(tǒng)自動處理這些問題呢?當(dāng)然有了!Spark 1.5 引入了反壓(Back Pressure)機制,其通過動態(tài)收集系統(tǒng)的一些數(shù)據(jù)來自動地適配集群數(shù)據(jù)處理能力。詳細(xì)的記錄請參見 SPARK-7398 里面的說明。
在 Spark 1.5 版本之前,Spark Streaming 的體系結(jié)構(gòu)如下所示:
數(shù)據(jù)是源源不斷的通過 receiver 接收,當(dāng)數(shù)據(jù)被接收后,其將這些數(shù)據(jù)存儲在 Block Manager 中;為了不丟失數(shù)據(jù),其還將數(shù)據(jù)備份到其他的 Block Manager 中;
Receiver Tracker 收到被存儲的 Block IDs,然后其內(nèi)部會維護一個時間到這些 block IDs 的關(guān)系;
Job Generator 會每隔 batchInterval 的時間收到一個事件,其會生成一個 JobSet;
Job Scheduler 運行上面生成的 JobSet。
為了實現(xiàn)自動調(diào)節(jié)數(shù)據(jù)的傳輸速率,在原有的架構(gòu)上新增了一個名為 RateController
的組件,這個組件繼承自 StreamingListener
,其監(jiān)聽所有作業(yè)的 onBatchCompleted
事件,并且基于 processingDelay
、schedulingDelay
、當(dāng)前 Batch 處理的記錄條數(shù)以及處理完成事件來估算出一個速率;這個速率主要用于更新流每秒能夠處理的最大記錄的條數(shù)。速率估算器(RateEstimator
)可以又多種實現(xiàn),不過目前的 Spark 2.2 只實現(xiàn)了基于 PID 的速率估算器。
InputDStreams 內(nèi)部的 RateController
里面會存下計算好的最大速率,這個速率會在處理完 onBatchCompleted
事件之后將計算好的速率推送到 ReceiverSupervisorImpl
,這樣接收器就知道下一步應(yīng)該接收多少數(shù)據(jù)了。
如果用戶配置了 spark.streaming.receiver.maxRate
或 spark.streaming.kafka.maxRatePerPartition
,那么最后到底接收多少數(shù)據(jù)取決于三者的最小值。也就是說每個接收器或者每個 Kafka 分區(qū)每秒處理的數(shù)據(jù)不會超過 spark.streaming.receiver.maxRate
或 spark.streaming.kafka.maxRatePerPartition
的值。
詳細(xì)的過程如下圖所示:
在 Spark 啟用反壓機制很簡單,只需要將 spark.streaming.backpressure.enabled
設(shè)置為 true
即可,這個參數(shù)的默認(rèn)值為 false。反壓機制還涉及以下幾個參數(shù),包括文檔中沒有列出來的:
spark.streaming.backpressure.initialRate: 啟用反壓機制時每個接收器接收第一批數(shù)據(jù)的初始最大速率。默認(rèn)值沒有設(shè)置。
spark.streaming.backpressure.rateEstimator:速率估算器類,默認(rèn)值為 pid ,目前 Spark 只支持這個,大家可以根據(jù)自己的需要實現(xiàn)。
spark.streaming.backpressure.pid.proportional:用于響應(yīng)錯誤的權(quán)重(最后批次和當(dāng)前批次之間的更改)。默認(rèn)值為1,只能設(shè)置成非負(fù)值。weight for response to "error" (change between last batch and this batch)
spark.streaming.backpressure.pid.integral:錯誤積累的響應(yīng)權(quán)重,具有抑制作用(有效阻尼)。默認(rèn)值為 0.2 ,只能設(shè)置成非負(fù)值。weight for the response to the accumulation of error. This has a dampening effect.
spark.streaming.backpressure.pid.derived:對錯誤趨勢的響應(yīng)權(quán)重。 這可能會引起 batch size 的波動,可以幫助快速增加/減少容量。默認(rèn)值為0,只能設(shè)置成非負(fù)值。weight for the response to the trend in error. This can cause arbitrary/noise-induced fluctuations in batch size, but can also help react quickly to increased/reduced capacity.
spark.streaming.backpressure.pid.minRate:可以估算的最低費率是多少。默認(rèn)值為 100,只能設(shè)置成非負(fù)值。
上述內(nèi)容就是Spark Streaming反壓機制是怎么樣的,你們學(xué)到知識或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識儲備,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。