溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何分析Spark Streaming的好處與坑

發(fā)布時(shí)間:2021-12-17 10:50:41 來(lái)源:億速云 閱讀:319 作者:柒染 欄目:大數(shù)據(jù)

如何分析Spark Streaming的好處與坑,相信很多沒(méi)有經(jīng)驗(yàn)的人對(duì)此束手無(wú)策,為此本文總結(jié)了問(wèn)題出現(xiàn)的原因和解決方法,通過(guò)這篇文章希望你能解決這個(gè)問(wèn)題。

前言

說(shuō)人話:其實(shí)就是講Spark Streaming 的好處與坑。好處主要從一些大的方面講,坑則是從實(shí)際場(chǎng)景中遇到的一些小細(xì)節(jié)描述。

玫瑰篇

玫瑰篇主要是說(shuō)Spark Streaming的優(yōu)勢(shì)點(diǎn)。

玫瑰之代碼復(fù)用

這主要得益于Spark的設(shè)計(jì),以及平臺(tái)的全面性。你寫的流處理的代碼可以很方便的適用于Spark平臺(tái)上的批處理,交互式處理。因?yàn)樗麄儽旧矶际腔赗DD模型的,并且Spark  Streaming的設(shè)計(jì)者也做了比較好的封裝和兼容。所以我說(shuō)RDD是個(gè)很強(qiáng)大的框,能把各種場(chǎng)景都給框住,這就是高度抽象和思考后的結(jié)果。

玫瑰之機(jī)器學(xué)習(xí)

如果你使用Spark MLlib 做模型訓(xùn)練。恭喜你,首先是很多算法已經(jīng)支持Spark Streaming,譬如k-means 就支持流式數(shù)據(jù)更新模型。  其次,你也可以在Spark Streaming中直接將離線計(jì)算好的模型load進(jìn)來(lái),然后對(duì)新進(jìn)來(lái)的數(shù)據(jù)做實(shí)時(shí)的Predict操作。

玫瑰之SQL支持

Spark Streaming 里天然就可以使用 sql/dataframe/datasets  等。而且時(shí)間窗口的使用可以極大擴(kuò)展這種使用場(chǎng)景,譬如各種系統(tǒng)預(yù)警等。類似Storm則需要額外的開(kāi)發(fā)與支持。

玫瑰之吞吐和實(shí)時(shí)的有效控制

Spark Streaming 可以很好的控制實(shí)時(shí)的程度(小時(shí),分鐘,秒)。極端情況可以設(shè)置到毫秒。

玫瑰之概述

Spark Streaming 可以很好的和Spark其他組件進(jìn)行交互,獲取其支持。同時(shí)Spark 生態(tài)圈的快速發(fā)展,亦能從中受益。

刺篇

刺篇就是描述Spark Streaming 的一些問(wèn)題,做選型前關(guān)注這些問(wèn)題可以有效的降低使用風(fēng)險(xiǎn)。

checkpoint 之刺

checkpoint  是個(gè)很好的恢復(fù)機(jī)制。但是方案比較粗暴,直接通過(guò)序列化的機(jī)制寫入到文件系統(tǒng),導(dǎo)致代碼變更和配置變更無(wú)法生效。實(shí)際場(chǎng)景是升級(jí)往往比系統(tǒng)崩潰的頻率高太多。但是升級(jí)需要能夠無(wú)縫的銜接上一次的偏移量。所以spark  streaming在無(wú)法容忍數(shù)據(jù)有丟失的情況下,你需要自己記錄偏移量,然后從上一次進(jìn)行恢復(fù)。

我們目前是重寫了相關(guān)的代碼,每次記錄偏移量,不過(guò)只有在升級(jí)的時(shí)候才會(huì)讀取自己記錄的偏移量,其他情況都是依然采用checkpoint機(jī)制。

Kafka 之刺

這個(gè)和Spark Streaming相關(guān),也不太相關(guān)。說(shuō)相關(guān)是因?yàn)镾park 對(duì)很多異常處理比較簡(jiǎn)單。很多是和Kafka配置相關(guān)的。我舉個(gè)例子:

如果消息體太大了,超過(guò) fetch.message.max.bytes=1m ,那么Spark  Streaming會(huì)直接拋出OffsetOutOfRangeException異常,然后停止服務(wù)。

對(duì)應(yīng)的錯(cuò)誤會(huì)從這行代碼拋出:

if (!iter.hasNext) { assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part)) finished = true  null.asInstanceOf[R]  }

其實(shí)就是消費(fèi)的完成后 實(shí)際的消費(fèi)數(shù)據(jù)量和預(yù)先估計(jì)的量不一致。

你在日志中看到的信息其實(shí)是這個(gè)代碼答應(yīng)出來(lái)的:

private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =

s"Ran out of messages before reaching ending offset ${part.untilOffset} "  +

s"for topic ${part.topic} partition ${part.partition} start  ${part.fromOffset}." +

" This should not happen, and indicates that messages may have been lost"

解決辦法自然是把 fetch.message.max.bytes 設(shè)置大些。

如果你使用Spark  Streaming去追數(shù)據(jù),從頭開(kāi)始消費(fèi)kafka,而Kafka因?yàn)槟撤N原因,老數(shù)據(jù)快速的被清理掉,也會(huì)引發(fā)OffsetOutOfRangeException錯(cuò)誤。并且使得Spark  Streaming程序異常的終止。

解決辦法是事先記錄kafka偏移量和時(shí)間的關(guān)系(可以隔幾秒記錄一次),然后根據(jù)時(shí)間找到一個(gè)較大的偏移量開(kāi)始消費(fèi)。

或者你根據(jù)目前Kafka新增數(shù)據(jù)的消費(fèi)速度,給smallest獲取到的偏移量再加一個(gè)較大的值,避免出現(xiàn)Spark Streaming  在fetch的時(shí)候數(shù)據(jù)不存在的情況。

Kafka partition 映射 RDD partition 之刺

Kafka的分區(qū)數(shù)決定了你的并行度(我們假設(shè)你使用Direct  Approach的模式集成)。為了獲得更大的并行度,則需要進(jìn)行一次repartition,而repartition  就意味著需要發(fā)生Shuffle,在流式計(jì)算里,可能會(huì)消耗掉我們寶貴的時(shí)間。

為了能夠避免Shuffle,并且提高Spark Streaming處理的并行度,我們重寫了  DirectKafkaInputDStream,KafkaRDD,KafkaUtils等類,實(shí)現(xiàn)了一個(gè)Kafka partition 可以映射為多個(gè)RDD  partition的功能。譬如你有M個(gè)Kafka partitions,則可映射成 M*N個(gè) RDD partitions。 其中N 為>1  的正整數(shù)。

我們期望官方能夠?qū)崿F(xiàn)將一個(gè)Kafka的partitions 映射為多個(gè)Spark  的partitions,避免發(fā)生Shuffle而導(dǎo)致多次的數(shù)據(jù)移動(dòng)。

textFileStream

其實(shí)使用textFileStream  的人應(yīng)該也不少。因?yàn)榭梢院芊奖愕谋O(jiān)控HDFS上某個(gè)文件夾下的文件,并且進(jìn)行計(jì)算。這里我們遇到的一個(gè)問(wèn)題是,如果底層比如是壓縮文件,遇到有順壞的文件,你是跳不過(guò)去的,直接會(huì)讓Spark  Streaming 異常退出。 官方并沒(méi)有提供合適的方式讓你跳過(guò)損壞的文件。

以NewHadoopRDD為例,里面有這么幾行代碼,獲取一條新的數(shù)據(jù):

override def getNext(): (K, V) = { try { finished = !reader.next(key, value) } catch { case eof: EOFException => finished = true } if (!finished) { inputMetrics.incRecordsRead(1) } (key, value) }

通過(guò)reader 獲取下一條記錄的時(shí)候,譬如是一個(gè)損壞的gzip文件,可能就會(huì)拋出異常,而這個(gè)異常是用戶catch不到的,直接讓Spark  Streaming程序掛掉了。

而在 HadoopRDD類中,對(duì)應(yīng)的實(shí)現(xiàn)如下:

override def getNext(): (K, V) = { try { finished = !reader.next(key, value) } catch { case eof: EOFException => finished = true } if (!finished) { inputMetrics.incRecordsRead(1) } (key, value) }

這里好歹做了個(gè)EOFException。然而,如果是一個(gè)壓縮文件,解壓的時(shí)候就直接產(chǎn)生錯(cuò)誤了,一般而言是  IOException,而不是EOFException了,這個(gè)時(shí)候也就歇菜了。

個(gè)人認(rèn)為應(yīng)該添加一些配置,允許用戶可以選擇如何對(duì)待這種有損壞或者無(wú)法解壓的文件。

因?yàn)楝F(xiàn)階段我們并沒(méi)有維護(hù)一個(gè)Spark的私有版本,所以是通過(guò)重寫FileInputDStream,NewHadoopRDD 等相關(guān)類來(lái)修正該問(wèn)題。

Shuffle 之刺

Shuffle (尤其是每個(gè)周期數(shù)據(jù)量很大的情況)是Spark Streaming 不可避免的疼痛,尤其是數(shù)據(jù)量極大的情況,因?yàn)镾park  Streaming對(duì)處理的時(shí)間是有限制的。我們有一個(gè)場(chǎng)景,是五分鐘一個(gè)周期,我們僅僅是做了一個(gè)repartion,耗時(shí)就達(dá)到2.1分鐘(包括到  Kafka取數(shù)據(jù))?,F(xiàn)階段Spark 的Shuffle實(shí)現(xiàn)都需要落磁盤,并且Shuffle Write 和 Shuffle Read  階段是完全分開(kāi),后者必須等到前者都完成才能開(kāi)始工作。我認(rèn)為Spark Streaming有必要單獨(dú)開(kāi)發(fā)一個(gè)更快速,完全基于內(nèi)存的Shuffle方案。

內(nèi)存之刺

在Spark Streaming中,你也會(huì)遇到在Spark中常見(jiàn)的問(wèn)題,典型如Executor Lost 相關(guān)的問(wèn)題(shuffle fetch  失敗,Task失敗重試等)。這就意味著發(fā)生了內(nèi)存不足或者數(shù)據(jù)傾斜的問(wèn)題。這個(gè)目前你需要考慮如下幾個(gè)點(diǎn)以期獲得解決方案:

相同資源下,增加partition數(shù)可以減少內(nèi)存問(wèn)題。 原因如下:通過(guò)增加partition數(shù),每個(gè)task要處理的數(shù)據(jù)少了,同一時(shí)間內(nèi),所有正在  運(yùn)行的task要處理的數(shù)量少了很多,所有Executor占用的內(nèi)存也變小了。這可以緩解數(shù)據(jù)傾斜以及內(nèi)存不足的壓力。

關(guān)注shuffle read 階段的并行數(shù)。例如reduce,group  之類的函數(shù),其實(shí)他們都有第二個(gè)參數(shù),并行度(partition數(shù)),只是大家一般都不設(shè)置。不過(guò)出了問(wèn)題再設(shè)置一下,也不錯(cuò)。

給一個(gè)Executor 核數(shù)設(shè)置的太多,也就意味著同一時(shí)刻,在該Executor  的內(nèi)存壓力會(huì)更大,GC也會(huì)更頻繁。我一般會(huì)控制在3個(gè)左右。然后通過(guò)提高Executor數(shù)量來(lái)保持資源的總量不變。

監(jiān)控之刺

Spark Streaming 的UI 上的Executors Tab缺少一個(gè)監(jiān)控,就是Worker內(nèi)存GC詳情。雖然我們可以將這些信息導(dǎo)入到  第三方監(jiān)控中,然而終究是不如在 Spark UI上展現(xiàn)更加方便。 為此我們也將該功能列入研發(fā)計(jì)劃。

看完上述內(nèi)容,你們掌握如何分析Spark Streaming的好處與坑的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI