您好,登錄后才能下訂單哦!
Spark Streaming中的反壓機(jī)制是Spark 1.5.0推出的新特性,可以根據(jù)處理效率動(dòng)態(tài)調(diào)整攝入速率。
當(dāng)批處理時(shí)間(Batch Processing Time)大于批次間隔(Batch Interval,即 BatchDuration)時(shí),說明處理數(shù)據(jù)的速度小于數(shù)據(jù)攝入的速度,持續(xù)時(shí)間過長(zhǎng)或源頭數(shù)據(jù)暴增,容易造成數(shù)據(jù)在內(nèi)存中堆積,最終導(dǎo)致Executor OOM或任務(wù)奔潰。
在這種情況下,若是基于Kafka Receiver的數(shù)據(jù)源,可以通過設(shè)置spark.streaming.receiver.maxRate來控制最大輸入速率;若是基于Direct的數(shù)據(jù)源(如Kafka Direct Stream),則可以通過設(shè)置spark.streaming.kafka.maxRatePerPartition來控制最大輸入速率。當(dāng)然,在事先經(jīng)過壓測(cè),且流量高峰不會(huì)超過預(yù)期的情況下,設(shè)置這些參數(shù)一般沒什么問題。但最大值,不代表是最優(yōu)值,最好還能根據(jù)每個(gè)批次處理情況來動(dòng)態(tài)預(yù)估下個(gè)批次最優(yōu)速率。在Spark 1.5.0以上,就可通過背壓機(jī)制來實(shí)現(xiàn)。開啟反壓機(jī)制,即設(shè)置spark.streaming.backpressure.enabled為true,Spark Streaming會(huì)自動(dòng)根據(jù)處理能力來調(diào)整輸入速率,從而在流量高峰時(shí)仍能保證最大的吞吐和性能。
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo
for {
// 處理結(jié)束時(shí)間
processingEnd <- batchCompleted.batchInfo.processingEndTime
// 處理時(shí)間,即`processingEndTime` - `processingStartTime`
workDelay <- batchCompleted.batchInfo.processingDelay
// 在調(diào)度隊(duì)列中的等待時(shí)間,即`processingStartTime` - `submissionTime`
waitDelay <- batchCompleted.batchInfo.schedulingDelay
// 當(dāng)前批次處理的記錄數(shù)
elems <- elements.get(streamUID).map(_.numRecords)
} computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}
可以看到,接著又調(diào)用的是computeAndPublish
方法,如下:
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
// 根據(jù)處理時(shí)間、調(diào)度時(shí)間、當(dāng)前Batch記錄數(shù),預(yù)估新速率
val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
newRate.foreach { s =>
// 設(shè)置新速率
rateLimit.set(s.toLong)
// 發(fā)布新速率
publish(getLatestRate())
}
}
更深一層,具體調(diào)用的是rateEstimator.compute
方法來預(yù)估新速率,如下:
def compute(
time: Long,
elements: Long,
processingDelay: Long,
schedulingDelay: Long): Option[Double]
spark.streaming.backpressure.enabled
默認(rèn)值false,是否啟用反壓機(jī)制。
spark.streaming.backpressure.initialRate
默認(rèn)值無,初始最大接收速率。只適用于Receiver Stream,不適用于Direct Stream。類型為整數(shù),默認(rèn)直接讀取所有,在1開啟的情況下,限制第一次批處理應(yīng)該消費(fèi)的數(shù)據(jù),因?yàn)槌绦蚶鋯?dòng)隊(duì)列里面有大量積壓,防止第一次全部讀取,造成系統(tǒng)阻塞
spark.streaming.kafka.maxRatePerPartition
類型為整數(shù),默認(rèn)直接讀取所有,限制每秒每個(gè)消費(fèi)線程讀取每個(gè)kafka分區(qū)最大的數(shù)據(jù)量
注意: 只有 3 激活的時(shí)候,每次消費(fèi)的最大數(shù)據(jù)量,就是設(shè)置的數(shù)據(jù)量,如果不足這個(gè)數(shù),就有多少讀多少,如果超過這個(gè)數(shù)字,就讀取這個(gè)數(shù)字的設(shè)置的值
只有 1+3 激活的時(shí)候,每次消費(fèi)讀取的數(shù)量最大會(huì)等于3設(shè)置的值,最小是spark根據(jù)系統(tǒng)負(fù)載自動(dòng)推斷的值,消費(fèi)的數(shù)據(jù)量會(huì)在這兩個(gè)范圍之內(nèi)變化根據(jù)系統(tǒng)情況,但第一次啟動(dòng)會(huì)有多少讀多少數(shù)據(jù)。此后按 1+3 設(shè)置規(guī)則運(yùn)行
1+2+3 同時(shí)激活的時(shí)候,跟上一個(gè)消費(fèi)情況基本一樣,但第一次消費(fèi)會(huì)得到限制,因?yàn)槲覀冊(cè)O(shè)置第一次消費(fèi)的頻率了。
spark.streaming.backpressure.rateEstimator
默認(rèn)值pid,速率控制器,Spark 默認(rèn)只支持此控制器,可自定義。
spark.streaming.backpressure.pid.proportional
默認(rèn)值1.0,只能為非負(fù)值。當(dāng)前速率與最后一批速率之間的差值對(duì)總控制信號(hào)貢獻(xiàn)的權(quán)重。用默認(rèn)值即可。
spark.streaming.backpressure.pid.integral
默認(rèn)值0.2,只能為非負(fù)值。比例誤差累積對(duì)總控制信號(hào)貢獻(xiàn)的權(quán)重。用默認(rèn)值即可。
spark.streaming.backpressure.pid.derived
默認(rèn)值0.0,只能為非負(fù)值。比例誤差變化對(duì)總控制信號(hào)貢獻(xiàn)的權(quán)重。用默認(rèn)值即可。
spark.streaming.backpressure.pid.minRate
默認(rèn)值100,只能為正數(shù),最小速率。
//啟用反壓機(jī)制
conf.set("spark.streaming.backpressure.enabled","true")
//最小攝入條數(shù)控制
conf.set("spark.streaming.backpressure.pid.minRate","1")
//最大攝入條數(shù)控制
conf.set("spark.streaming.kafka.maxRatePerPartition","12")
//初始最大接收速率控制
conf.set("spark.streaming.backpressure.initialRate","10")
要保證反壓機(jī)制真正起作用前Spark 應(yīng)用程序不會(huì)崩潰,需要控制每個(gè)批次最大攝入速率。以Direct Stream為例,如Kafka Direct Stream,則可以通過spark.streaming.kafka.maxRatePerPartition參數(shù)來控制。此參數(shù)代表了 每秒每個(gè)分區(qū)最大攝入的數(shù)據(jù)條數(shù)。假設(shè)BatchDuration為10秒,spark.streaming.kafka.maxRatePerPartition為12條,kafka topic 分區(qū)數(shù)為3個(gè),則一個(gè)批(Batch)最大讀取的數(shù)據(jù)條數(shù)為360條(31210=360)。同時(shí),需要注意,該參數(shù)也代表了整個(gè)應(yīng)用生命周期中的最大速率,即使是背壓調(diào)整的最大值也不會(huì)超過該參數(shù)。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。