溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Spark廣播變量分析以及如何動態(tài)更新廣播變量

發(fā)布時間:2021-12-17 09:47:17 來源:億速云 閱讀:616 作者:柒染 欄目:大數(shù)據(jù)

本篇文章給大家分享的是有關(guān)Spark廣播變量分析以及如何動態(tài)更新廣播變量,小編覺得挺實用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

今天主要介紹一下基于Spark2.4版本的廣播變量。先前的版本比如Spark2.1之前的廣播變量有兩種實現(xiàn):HttpBroadcast和TorrentBroadcast,但是鑒于HttpBroadcast有各種弊端,目前已經(jīng)舍棄這種實現(xiàn),小編主要闡述TorrentBroadcast
廣播變量概述
廣播變量是一個只讀變量,通過它我們可以將一些共享數(shù)據(jù)集或者大變量緩存在Spark集群中的各個機器上而不用每個task都需要copy一個副本,后續(xù)計算可以重復(fù)使用,減少了數(shù)據(jù)傳輸時網(wǎng)絡(luò)帶寬的使用,提高效率。相比于Hadoop的分布式緩存,廣播的內(nèi)容可以跨作業(yè)共享。
廣播變量要求廣播的數(shù)據(jù)不可變、不能太大但也不能太小(一般幾十M以上)、可被序列化和反序列化、并且必須在driver端聲明廣播變量,適用于廣播多個stage公用的數(shù)據(jù),存儲級別目前是MEMORY_AND_DISK。

廣播變量存儲目前基于Spark實現(xiàn)的BlockManager分布式存儲系統(tǒng),Spark中的shuffle數(shù)據(jù)、加載HDFS數(shù)據(jù)時切分過來的block塊都存儲在BlockManager中,不是今天的討論點,這里先不做詳述了。

廣播變量的創(chuàng)建方式和獲取

//創(chuàng)建廣播變量
val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))

//獲取廣播變量
broadcastVar.value
 廣播變量實例化過程

1.首先調(diào)用val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))

2.調(diào)用BroadcastManager的newBroadcast方法
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)

3.通過廣播工廠的newBroadcast方法進行創(chuàng)建

broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())

在調(diào)用BroadcastManager的newBroadcast方法時已完成對廣播工廠的初始化(initialize方法),我們只需看BroadcastFactory的實現(xiàn)TorrentBroadcastFactory中對TorrentBroadcast的實例化過程:

new TorrentBroadcast[T](value_, id)
4.在構(gòu)建TorrentBroadcast時,將廣播的數(shù)據(jù)寫入BlockManager
1)首先會將廣播變量序列化后的對象劃分為多個block塊,存儲在driver端的BlockManager,這樣運行在driver端的task就不用創(chuàng)建廣播變量的副本了(具體可以查看TorrentBroadcast的writeBlocks方法) 
2)每個executor在獲取廣播變量時首先從本地的BlockManager獲取。獲取不到就會從driver或者其他的executor上獲取,獲取之后,會將獲取到的數(shù)據(jù)保存在自己的BlockManager中
3)塊的大小默認4M
conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024

廣播變量初始化過程

1.首先調(diào)用broadcastVar.value
2.TorrentBroadcast中l(wèi)azy變量_value進行初始化,調(diào)用readBroadcastBlock() 
3.先從緩存中讀取,對結(jié)果進行模式匹配,匹配成功的直接返回
4.讀取不到通過readBlocks()進行讀取  

從driver端或者其他的executor中讀取,將讀取的對象存儲到本地,并存于緩存中

new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)

Spark兩種廣播變量對比

正如【前言】中所說,HttpBroadcast在Spark后續(xù)的版本中已經(jīng)被廢棄,但考慮到部分公司用的Spark版本較低,面試中仍有可能問到兩種實現(xiàn)的相關(guān)問題,這里簡單介紹一下:
HttpBroadcast會在driver端的BlockManager里面存儲廣播變量對象,并且將該廣播變量序列化寫入文件中去。  所有獲取廣播數(shù)據(jù)請求都在driver端,所以存在單點故障和網(wǎng)絡(luò)IO性能問題。
TorrentBroadcast會在driver端的BlockManager里面存儲廣播變量對象,并將廣播對象分割成若干序列化block塊(默認4M),存儲于BlockManager。小的block存儲位置信息,存儲于Driver端的BlockManagerMaster。數(shù)據(jù)請求并非集中于driver端,避免了單點故障和driver端網(wǎng)絡(luò)磁盤IO過高。

TorrentBroadcast在executor端存儲一個對象的同時會將獲取的block存儲于BlockManager,并向driver端的BlockManager匯報block的存儲信息。

請求數(shù)據(jù)的時候會先獲取block的所有存儲位置信息,并且是隨機的在所有存儲了該executor的BlockManager去獲取,避免了數(shù)據(jù)請求服務(wù)集中于一點。

總之就是HttpBroadcast導(dǎo)致獲取廣播變量的請求集中于driver端,容易引起driver端單點故障,網(wǎng)絡(luò)IO過高影響性能等問題,而TorrentBroadcast獲取廣播變量的請求服務(wù)即可以請求到driver端也可以在executor,避免了上述問題,當(dāng)然這只是主要的優(yōu)化點。

動態(tài)更新廣播變量
通過上面的介紹,大家都知道廣播變量是只讀的,那么在Spark流式處理中如何進行動態(tài)更新廣播變量?

既然無法更新,那么只能動態(tài)生成,應(yīng)用場景有實時風(fēng)控中根據(jù)業(yè)務(wù)情況調(diào)整規(guī)則庫、實時日志ETL服務(wù)中獲取最新的日志格式以及字段變更等。

@volatile private var instance: Broadcast[Array[Int]] = null

//獲取廣播變量單例對象
def getInstance(sc: SparkContext, ctime: Long): Broadcast[Array[Int]] = {
 if (instance == null) {
   synchronized {
     if (instance == null) {
       instance = sc.broadcast(fetchLastestData())
     }
   }
 }
 instance
}

//加載要廣播的數(shù)據(jù),并更新廣播變量
def updateBroadCastVar(sc: SparkContext, blocking: Boolean = false): Unit = {
 if (instance != null) {
   //刪除緩存在executors上的廣播副本,并可選擇是否在刪除完成后進行block等待
   //底層可選擇是否將driver端的廣播副本也刪除
   instance.unpersist(blocking)
   
   instance = sc.broadcast(fetchLastestData())
 }
}

def fetchLastestData() = {
 //動態(tài)獲取需要更新的數(shù)據(jù)
 //這里是偽代碼
 Array(1, 2, 3)
}
val dataFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

...
...

stream.foreachRDD { rdd =>
 val current_time = dataFormat.format(new Date())
 val new_time = current_time.substring(14, 16).toLong
 //每10分鐘更新一次
 if (new_time % 10 == 0) {
   updateBroadCastVar(rdd.sparkContext, true)
 }

 rdd.foreachPartition { records =>
   instance.value
   ...
 }
}
注意:上述是給出了一個實現(xiàn)思路的偽代碼,實際生產(chǎn)中還需要進行一定的優(yōu)化。
此外,這種方式有一定的弊端,就是廣播的數(shù)據(jù)因為是周期性更新,所以存在一定的滯后性。廣播的周期不能太短,要考慮外部存儲要廣播數(shù)據(jù)的存儲系統(tǒng)的壓力。具體的還要看具體的業(yè)務(wù)場景,如果對實時性要求不是特別高的話,可以采取這種,當(dāng)然也可以參考Flink是如何實現(xiàn)動態(tài)廣播的。  

Spark流式程序中為何使用單例模式

1.廣播變量是只讀的,使用單例模式可以減少Spark流式程序中每次job生成執(zhí)行,頻繁創(chuàng)建廣播變量帶來的開銷

2.廣播變量單例模式也需要做同步處理。在FIFO調(diào)度模式下,基本不會發(fā)生并發(fā)問題。但是如果你改變了調(diào)度模式,如采用公平調(diào)度模式,同時設(shè)置Spark流式程序并行執(zhí)行的job數(shù)大于1,如設(shè)置參數(shù)spark.streaming.concurrentJobs=4,則必須加上同步代碼

3.在多個輸出流共享廣播變量的情況下,同時配置了公平調(diào)度模式,也會產(chǎn)生并發(fā)問題。建議在foreachRDD或者transform中使用局部變量進行廣播,避免在公平調(diào)度模式下不同job之間產(chǎn)生影響。

除了廣播變量,累加器也是一樣。在Spark流式組件如Spark Streaming底層,每個輸出流都會產(chǎn)生一個job,形成一個job集合提交到線程池里并發(fā)執(zhí)行。

以上就是Spark廣播變量分析以及如何動態(tài)更新廣播變量,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI