溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Flink使用大狀態(tài)時(shí)的優(yōu)化是什么

發(fā)布時(shí)間:2022-01-04 15:17:06 來源:億速云 閱讀:235 作者:柒染 欄目:大數(shù)據(jù)

本篇文章為大家展示了Flink使用大狀態(tài)時(shí)的優(yōu)化是什么,內(nèi)容簡明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。

一、為什么要優(yōu)化?(優(yōu)化背景)

Flink 支持多種 StateBackend,當(dāng)狀態(tài)比較大時(shí)目前只有 RocksDBStateBackend 可供選擇。

RocksDB 是基于 LSM 樹原理實(shí)現(xiàn)的 KV 數(shù)據(jù)庫,LSM 樹讀放大問題比較嚴(yán)重,因此對(duì)磁盤性能要求比較高,強(qiáng)烈建議生產(chǎn)環(huán)境使用 SSD 作為 RocksDB 的存儲(chǔ)介質(zhì)。但是有些集群可能并沒有配置 SSD,僅僅是普通的機(jī)械硬盤,當(dāng) Flink 任務(wù)比較大,且對(duì)狀態(tài)訪問比較頻繁時(shí),機(jī)械硬盤的磁盤 IO 可能成為性能瓶頸。在這種情況下,該如何解決此瓶頸呢?

使用多塊硬盤來分擔(dān)壓力

RocksDB 使用內(nèi)存加磁盤的方式存儲(chǔ)數(shù)據(jù),當(dāng)狀態(tài)比較大時(shí),磁盤占用空間會(huì)比較大。如果對(duì) RocksDB 有頻繁的讀取請求,那么磁盤 IO 會(huì)成為 Flink 任務(wù)瓶頸。

強(qiáng)烈建議在 flink-conf.yaml 中配置 state.backend.rocksdb.localdir 參數(shù)來指定 RocksDB 在磁盤中的存儲(chǔ)目錄。當(dāng)一個(gè) TaskManager 包含 3 個(gè) slot 時(shí),那么單個(gè)服務(wù)器上的三個(gè)并行度都對(duì)磁盤造成頻繁讀寫,從而導(dǎo)致三個(gè)并行度的之間相互爭搶同一個(gè)磁盤 io,這樣必定導(dǎo)致三個(gè)并行度的吞吐量都會(huì)下降。

慶幸的是 Flink 的 state.backend.rocksdb.localdir 參數(shù)可以指定多個(gè)目錄,一般大數(shù)據(jù)服務(wù)器都會(huì)掛載很多塊硬盤,我們期望同一個(gè) TaskManager 的三個(gè) slot 使用不同的硬盤從而減少資源競爭。具體參數(shù)配置如下所示:


state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb,/data4/flink/rocksdb,/data5/flink/rocksdb,/data6/flink/rocksdb,/data7/flink/rocksdb,/data8/flink/rocksdb,/data9/flink/rocksdb,/data10/flink/rocksdb,/data11/flink/rocksdb,/data12/flink/rocksdb


注意:務(wù)必將目錄配置到多塊不同的磁盤上,不要配置單塊磁盤的多個(gè)目錄,這里配置多個(gè)目錄是為了讓多塊磁盤來分擔(dān)壓力。

如下圖所示是筆者測試過程中磁盤的 IO 使用率,可以看出三個(gè)大狀態(tài)算子的并行度分別對(duì)應(yīng)了三塊磁盤,這三塊磁盤的 IO 平均使用率都保持在 45% 左右,IO 最高使用率幾乎都是 100%,而其他磁盤的 IO 平均使用率為 10% 左右,相對(duì)低很多。由此可見使用 RocksDB 做為狀態(tài)后端且有大狀態(tài)的頻繁讀寫操作時(shí),對(duì)磁盤 IO 性能消耗確實(shí)比較大。

Flink使用大狀態(tài)時(shí)的優(yōu)化是什么

上述屬于理想情況,當(dāng)設(shè)置多個(gè) RocksDB 本地磁盤目錄時(shí),F(xiàn)link 會(huì)隨機(jī)選擇要使用的目錄,所以就可能存在三個(gè)并行度共用同一目錄的情況。

如下圖所示,其中兩個(gè)并行度共用了 sdb 磁盤,一個(gè)并行度使用 sdj 磁盤。可以看到 sdb 磁盤的 IO 平均使用率已經(jīng)達(dá)到了 91.6%,此時(shí) sdb 的磁盤 IO 肯定會(huì)成為整個(gè) Flink 任務(wù)的瓶頸,會(huì)導(dǎo)致 sdb 磁盤對(duì)應(yīng)的兩個(gè)并行度吞吐量大大降低,從而使得整個(gè) Flink 任務(wù)吞吐量降低。

Flink使用大狀態(tài)時(shí)的優(yōu)化是什么

如果服務(wù)器掛載的硬盤數(shù)量較多,一般不會(huì)出現(xiàn)該情況,但是如果任務(wù)重啟后吞吐量較低,可以檢查是否發(fā)生了多個(gè)并行度共用同一塊磁盤的情況。

Flink 可能會(huì)出現(xiàn)多個(gè)并行度共用同一塊磁盤的問題,那該如何解決呢?

二、常用的負(fù)載均衡策略

從現(xiàn)象來看,為 RocksDB 分配了 12 塊磁盤,僅僅有 3 個(gè)并行度需要使用 3 塊磁盤,但是有一定幾率 2 個(gè)并行度共用同一塊磁盤,甚至可能會(huì)有很小的幾率 3 個(gè)并行度共用同一塊磁盤。這樣我們的 Flink 任務(wù)很容易因?yàn)榇疟P IO 成為瓶頸。

上述分配磁盤的策略,實(shí)際上就是業(yè)界的負(fù)載均衡策略。通用的負(fù)載均衡策略有 hash、隨機(jī)以及輪循等策略。

Hash 策略

任務(wù)本身經(jīng)過某種 hash 策略后,將壓力分擔(dān)到多個(gè) Worker 上。對(duì)應(yīng)到上述場景,就是將多個(gè) slot 使用的 RocksDB 目錄壓力分擔(dān)到多塊磁盤上。但是 hash 可能會(huì)有沖突的情況,hash 沖突表示多個(gè)不同的 Flink 并行度,經(jīng)過 hash 后得到的 hashCode 一樣,或者 hashCode 對(duì)硬盤數(shù)量求余后被分配到同一塊硬盤。

Random 策略

隨機(jī)策略是每來一個(gè) Flink 任務(wù),生成一個(gè)隨機(jī)數(shù),將壓力隨機(jī)分配到某個(gè) Worker 上,也就是將壓力隨機(jī)分配到某塊磁盤。但是隨機(jī)數(shù)也會(huì)存在沖突的情況。

Round Robin 策略

輪循策略比較容易理解,多個(gè) Worker 輪流接收數(shù)據(jù)即可,F(xiàn)link 任務(wù)第一次申請 RocksDB 目錄時(shí)使用目錄1,第二次申請目錄時(shí)使用目錄2,依次申請即可。該策略是分配任務(wù)數(shù)最均勻的策略,如果使用該策略會(huì)保證所有硬盤分配到的任務(wù)數(shù)相差最大為 1。

最低負(fù)載策略 / Least Response Time(最短響應(yīng)時(shí)間 )策略

根據(jù) Worker 的響應(yīng)時(shí)間來分配任務(wù),響應(yīng)時(shí)間短說明負(fù)載能力強(qiáng),應(yīng)該多分配一些任務(wù)。對(duì)應(yīng)到上述場景就是檢測各個(gè)磁盤的 IO 使用率,使用率低表示磁盤 IO 比較空閑,應(yīng)該多分配任務(wù)。

指定權(quán)重策略

為每個(gè) Worker 分配不同的權(quán)重值,權(quán)重值高的任務(wù)分配更多的任務(wù),一般分配的任務(wù)數(shù)與權(quán)重值成正比。

例如 Worker0 權(quán)重值為 2,Worker1 權(quán)重為 1,則分配任務(wù)時(shí) Worker0 分配的任務(wù)數(shù)盡量分配成 Worker1 任務(wù)數(shù)的兩倍。該策略可能并不適合當(dāng)前業(yè)務(wù)場景,一般相同服務(wù)器上每個(gè)硬盤的負(fù)載能力相差不會(huì)很大,除非 RocksDB 的 local dir 既包含 SSD 也包含 HDD。

三、源碼中如何分配磁盤?

筆者線上使用 Flink 1.8.1 版本,出現(xiàn)了有些硬盤分配了多個(gè)并行度,有些硬盤一個(gè)并行度都沒有分配??梢源竽懙牟聹y一下,源碼中使用 hash 或者 random 的概率比較高,因?yàn)榇蠖鄶?shù)情況下,每個(gè)硬盤只分到一個(gè)任務(wù),小幾率分配多個(gè)任務(wù)(要解決的就是這個(gè)小幾率分配多個(gè)任務(wù)的問題)。

如果使用輪循策略,肯定會(huì)保證每個(gè)硬盤都分配一個(gè)并行度以后,才會(huì)出現(xiàn)單硬盤分配兩個(gè)任務(wù)的情況。而且輪循策略可以保證分配的硬盤是連續(xù)的。

直接看 RocksDBStateBackend 類的部分源碼:

  
    
  
  
  
/** Base paths for RocksDB directory, as initialized.這里就是我們上述設(shè)置的 12 個(gè) rocksdb local dir */private transient File[] initializedDbBasePaths;
/** The index of the next directory to be used from {@link #initializedDbBasePaths}.下一次要使用 dir 的 index,如果 nextDirectory = 2,則使用 initializedDbBasePaths 中下標(biāo)為 2 的那個(gè)目錄做為 rocksdb 的存儲(chǔ)目錄 */private transient int nextDirectory;
// lazyInitializeForJob 方法中, 通過這一行代碼決定下一次要使用 dir 的 index,// 根據(jù) initializedDbBasePaths.length 生成隨機(jī)數(shù),// 如果 initializedDbBasePaths.length = 12,生成隨機(jī)數(shù)的范圍為 0-11nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
           


分析完簡單的源碼后,我們知道了源碼中使用了 random 的策略來分配 dir,跟我們所看到的現(xiàn)象能夠匹配。隨機(jī)分配有小概率會(huì)出現(xiàn)沖突。(寫這篇文章時(shí),F(xiàn)link 最新的 master 分支代碼仍然是上述策略,尚未做任何改動(dòng))

四、使用哪種策略更合理?

(各種策略帶來的挑戰(zhàn))

random 和 hash 策略在任務(wù)數(shù)量比較大時(shí),可以保證每個(gè) Worker 承擔(dān)的任務(wù)量基本一樣,但是如果任務(wù)量比較小,例如將 20 個(gè)任務(wù)通過隨機(jī)算法分配給 10 個(gè) Worker 時(shí),就會(huì)出現(xiàn)有的 Worker 分配不到任務(wù),有的 Worker 可能分配到 3 或 4 個(gè)任務(wù)。所以 random 和 hash 策略不能解決 rocksdb 分配磁盤不均的痛點(diǎn),那輪循策略和最低負(fù)載策略呢?

輪循策略

輪循策略可以解決上述問題,解決方式如下:


// 在 RocksDBStateBackend 類中定義了private static final AtomicInteger DIR_INDEX = new AtomicInteger(0);
// nextDirectory 的分配策略變成了如下代碼,每次將 DIR_INDEX + 1,然后對(duì) dir 的總數(shù)求余nextDirectory = DIR_INDEX.getAndIncrement() % initializedDbBasePaths.length;


通過上述即可實(shí)現(xiàn)輪循策略,申請磁盤時(shí),從 0 號(hào)磁盤開始申請,每次使用下一塊磁盤即可。

■ 帶來的問題:

Java 中靜態(tài)變量屬于 JVM 級(jí)別的,每個(gè) TaskManager 屬于單獨(dú)的 JVM,所以 TaskManager 內(nèi)部保證了輪循策略。如果同一臺(tái)服務(wù)器上運(yùn)行多個(gè) TaskManager,那么多個(gè) TaskManager 都會(huì)從 index 為 0 的磁盤開始使用,所以導(dǎo)致 index 較小的磁盤會(huì)被經(jīng)常使用,而 index 較大的磁盤可能經(jīng)常不會(huì)被使用到。

■ 解決方案 1:

DIR_INDEX 初始化時(shí),不要每次初始化為 0,可以生成一個(gè)隨機(jī)數(shù),這樣可以保證不會(huì)每次使用 index 較小的磁盤,實(shí)現(xiàn)代碼如下所示:

  
    
  
  
  
// 在 RocksDBStateBackend 類中定義了private static final AtomicInteger DIR_INDEX = new AtomicInteger(new Random().nextInt(100));
           


但是上述方案不能完全解決磁盤沖突的問題,同一臺(tái)機(jī)器上 12 塊磁盤,TaskManager0 使用 index 為 0、1、2 的三塊磁盤,TaskManager1 可能使用 index 為 1、2、3 的三塊磁盤。結(jié)果就是 TaskManager 內(nèi)部來看,實(shí)現(xiàn)了輪循策略保證負(fù)載均衡,但是全局來看,負(fù)載并不均衡。

■ 解決方案 2:

為了全局負(fù)載均衡,所以多個(gè) TaskManager 之間必須通信才能做到絕對(duì)的負(fù)載均衡,可以借助第三方的存儲(chǔ)進(jìn)行通信,例如在 Zookeeper 中,為每個(gè)服務(wù)器生成一個(gè) znode,znode 命名可以是 host 或者 ip。使用 Curator 的 DistributedAtomicInteger 來維護(hù) DIR_INDEX 變量,存儲(chǔ)在當(dāng)前服務(wù)器對(duì)應(yīng)的 znode 中,無論是哪個(gè) TaskManager 申請磁盤,都可以使用 DistributedAtomicInteger 將當(dāng)前服務(wù)器對(duì)應(yīng)的 DIR_INDEX + 1,從而就可以實(shí)現(xiàn)全局的輪循策略。

DistributedAtomicInteger 的 increment 的思路:先使用 Zookeeper 的 withVersion api 進(jìn)行 +1 操作(也就是 Zookeeper 提供的 CAS api),如果成功則成功;如果失敗,則使用分布式互斥鎖進(jìn)行 +1 操作。

基于上述描述,我們得到兩種策略來實(shí)現(xiàn)輪循,AtomicInteger 只能保證 TaskManager 內(nèi)部的輪循,不能保證全局輪循。如果要基于全局輪循,需要借助 Zookeeper 或其他組件來實(shí)現(xiàn)。如果對(duì)輪循策略要求比較苛刻,可以使用基于 Zookeeper 的輪循策略,如果不想依賴外部組件則只能使用 AtomicInteger 來實(shí)現(xiàn)。

最低負(fù)載策略

思想就是 TaskManager 啟動(dòng)時(shí),監(jiān)測所有 rocksdb local dir 對(duì)應(yīng)的磁盤最近 1 分鐘或 5 分鐘的 IO 平均使用率,篩掉 IO 使用率較高的磁盤,優(yōu)先選擇 IO 平均使用率較低的磁盤,同時(shí)在 IO 平均使用率較低的磁盤中,依然要使用輪循策略來實(shí)現(xiàn)。

■ 面臨的問題
  • Flink 任務(wù)啟動(dòng)時(shí),只能拿到磁盤當(dāng)前的 IO 使用率,是一個(gè)瞬時(shí)值,會(huì)不會(huì)不靠譜?

  • Flink 任務(wù)啟動(dòng),不可能等待任務(wù)先采集 1 分鐘 IO 使用率以后,再去啟動(dòng)。

    • 不想依賴外部監(jiān)控系統(tǒng)去拿這個(gè) IO 使用率,要考慮通用性。

  • 假設(shè)已經(jīng)拿到了所有硬盤最近 1 分鐘的 IO 使用率,該如何去決策呢?

    • 對(duì)于 IO 平均使用率較低的磁盤中,依然要使用輪循策略來實(shí)現(xiàn)。

    • IO 平均使用率較低,這里的較低不好評(píng)判,相差 10% 算低,還是 20%、30%。

    • 而且不同的新任務(wù)對(duì)于磁盤的使用率要求也是不一樣的,所以評(píng)判難度較大。

■ 新思路(discussing)

啟動(dòng)階段不采集硬盤的負(fù)載壓力,使用之前的 DistributedAtomicInteger 基本就可以保證每個(gè)硬盤負(fù)載均衡。但是任務(wù)啟動(dòng)后一段時(shí)間,如果因?yàn)?Flink 任務(wù)導(dǎo)致某個(gè)磁盤 IO 的平均使用率相對(duì)其他磁盤而言非常高。我們可以選擇遷移高負(fù)載硬盤的數(shù)據(jù)到低負(fù)載硬盤。

上述內(nèi)容就是Flink使用大狀態(tài)時(shí)的優(yōu)化是什么,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI