您好,登錄后才能下訂單哦!
本篇文章為大家展示了Flink使用大狀態(tài)時(shí)的優(yōu)化是什么,內(nèi)容簡明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。
一、為什么要優(yōu)化?(優(yōu)化背景)
Flink 支持多種 StateBackend,當(dāng)狀態(tài)比較大時(shí)目前只有 RocksDBStateBackend 可供選擇。
RocksDB 是基于 LSM 樹原理實(shí)現(xiàn)的 KV 數(shù)據(jù)庫,LSM 樹讀放大問題比較嚴(yán)重,因此對(duì)磁盤性能要求比較高,強(qiáng)烈建議生產(chǎn)環(huán)境使用 SSD 作為 RocksDB 的存儲(chǔ)介質(zhì)。但是有些集群可能并沒有配置 SSD,僅僅是普通的機(jī)械硬盤,當(dāng) Flink 任務(wù)比較大,且對(duì)狀態(tài)訪問比較頻繁時(shí),機(jī)械硬盤的磁盤 IO 可能成為性能瓶頸。在這種情況下,該如何解決此瓶頸呢?
RocksDB 使用內(nèi)存加磁盤的方式存儲(chǔ)數(shù)據(jù),當(dāng)狀態(tài)比較大時(shí),磁盤占用空間會(huì)比較大。如果對(duì) RocksDB 有頻繁的讀取請求,那么磁盤 IO 會(huì)成為 Flink 任務(wù)瓶頸。
強(qiáng)烈建議在 flink-conf.yaml 中配置 state.backend.rocksdb.localdir 參數(shù)來指定 RocksDB 在磁盤中的存儲(chǔ)目錄。當(dāng)一個(gè) TaskManager 包含 3 個(gè) slot 時(shí),那么單個(gè)服務(wù)器上的三個(gè)并行度都對(duì)磁盤造成頻繁讀寫,從而導(dǎo)致三個(gè)并行度的之間相互爭搶同一個(gè)磁盤 io,這樣必定導(dǎo)致三個(gè)并行度的吞吐量都會(huì)下降。
慶幸的是 Flink 的 state.backend.rocksdb.localdir 參數(shù)可以指定多個(gè)目錄,一般大數(shù)據(jù)服務(wù)器都會(huì)掛載很多塊硬盤,我們期望同一個(gè) TaskManager 的三個(gè) slot 使用不同的硬盤從而減少資源競爭。具體參數(shù)配置如下所示:
state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb,/data4/flink/rocksdb,/data5/flink/rocksdb,/data6/flink/rocksdb,/data7/flink/rocksdb,/data8/flink/rocksdb,/data9/flink/rocksdb,/data10/flink/rocksdb,/data11/flink/rocksdb,/data12/flink/rocksdb
注意:務(wù)必將目錄配置到多塊不同的磁盤上,不要配置單塊磁盤的多個(gè)目錄,這里配置多個(gè)目錄是為了讓多塊磁盤來分擔(dān)壓力。
如下圖所示是筆者測試過程中磁盤的 IO 使用率,可以看出三個(gè)大狀態(tài)算子的并行度分別對(duì)應(yīng)了三塊磁盤,這三塊磁盤的 IO 平均使用率都保持在 45% 左右,IO 最高使用率幾乎都是 100%,而其他磁盤的 IO 平均使用率為 10% 左右,相對(duì)低很多。由此可見使用 RocksDB 做為狀態(tài)后端且有大狀態(tài)的頻繁讀寫操作時(shí),對(duì)磁盤 IO 性能消耗確實(shí)比較大。
上述屬于理想情況,當(dāng)設(shè)置多個(gè) RocksDB 本地磁盤目錄時(shí),F(xiàn)link 會(huì)隨機(jī)選擇要使用的目錄,所以就可能存在三個(gè)并行度共用同一目錄的情況。
如下圖所示,其中兩個(gè)并行度共用了 sdb 磁盤,一個(gè)并行度使用 sdj 磁盤。可以看到 sdb 磁盤的 IO 平均使用率已經(jīng)達(dá)到了 91.6%,此時(shí) sdb 的磁盤 IO 肯定會(huì)成為整個(gè) Flink 任務(wù)的瓶頸,會(huì)導(dǎo)致 sdb 磁盤對(duì)應(yīng)的兩個(gè)并行度吞吐量大大降低,從而使得整個(gè) Flink 任務(wù)吞吐量降低。
如果服務(wù)器掛載的硬盤數(shù)量較多,一般不會(huì)出現(xiàn)該情況,但是如果任務(wù)重啟后吞吐量較低,可以檢查是否發(fā)生了多個(gè)并行度共用同一塊磁盤的情況。
Flink 可能會(huì)出現(xiàn)多個(gè)并行度共用同一塊磁盤的問題,那該如何解決呢?
二、常用的負(fù)載均衡策略
從現(xiàn)象來看,為 RocksDB 分配了 12 塊磁盤,僅僅有 3 個(gè)并行度需要使用 3 塊磁盤,但是有一定幾率 2 個(gè)并行度共用同一塊磁盤,甚至可能會(huì)有很小的幾率 3 個(gè)并行度共用同一塊磁盤。這樣我們的 Flink 任務(wù)很容易因?yàn)榇疟P IO 成為瓶頸。
上述分配磁盤的策略,實(shí)際上就是業(yè)界的負(fù)載均衡策略。通用的負(fù)載均衡策略有 hash、隨機(jī)以及輪循等策略。
任務(wù)本身經(jīng)過某種 hash 策略后,將壓力分擔(dān)到多個(gè) Worker 上。對(duì)應(yīng)到上述場景,就是將多個(gè) slot 使用的 RocksDB 目錄壓力分擔(dān)到多塊磁盤上。但是 hash 可能會(huì)有沖突的情況,hash 沖突表示多個(gè)不同的 Flink 并行度,經(jīng)過 hash 后得到的 hashCode 一樣,或者 hashCode 對(duì)硬盤數(shù)量求余后被分配到同一塊硬盤。
隨機(jī)策略是每來一個(gè) Flink 任務(wù),生成一個(gè)隨機(jī)數(shù),將壓力隨機(jī)分配到某個(gè) Worker 上,也就是將壓力隨機(jī)分配到某塊磁盤。但是隨機(jī)數(shù)也會(huì)存在沖突的情況。
輪循策略比較容易理解,多個(gè) Worker 輪流接收數(shù)據(jù)即可,F(xiàn)link 任務(wù)第一次申請 RocksDB 目錄時(shí)使用目錄1,第二次申請目錄時(shí)使用目錄2,依次申請即可。該策略是分配任務(wù)數(shù)最均勻的策略,如果使用該策略會(huì)保證所有硬盤分配到的任務(wù)數(shù)相差最大為 1。
根據(jù) Worker 的響應(yīng)時(shí)間來分配任務(wù),響應(yīng)時(shí)間短說明負(fù)載能力強(qiáng),應(yīng)該多分配一些任務(wù)。對(duì)應(yīng)到上述場景就是檢測各個(gè)磁盤的 IO 使用率,使用率低表示磁盤 IO 比較空閑,應(yīng)該多分配任務(wù)。
為每個(gè) Worker 分配不同的權(quán)重值,權(quán)重值高的任務(wù)分配更多的任務(wù),一般分配的任務(wù)數(shù)與權(quán)重值成正比。
例如 Worker0 權(quán)重值為 2,Worker1 權(quán)重為 1,則分配任務(wù)時(shí) Worker0 分配的任務(wù)數(shù)盡量分配成 Worker1 任務(wù)數(shù)的兩倍。該策略可能并不適合當(dāng)前業(yè)務(wù)場景,一般相同服務(wù)器上每個(gè)硬盤的負(fù)載能力相差不會(huì)很大,除非 RocksDB 的 local dir 既包含 SSD 也包含 HDD。
三、源碼中如何分配磁盤?
筆者線上使用 Flink 1.8.1 版本,出現(xiàn)了有些硬盤分配了多個(gè)并行度,有些硬盤一個(gè)并行度都沒有分配??梢源竽懙牟聹y一下,源碼中使用 hash 或者 random 的概率比較高,因?yàn)榇蠖鄶?shù)情況下,每個(gè)硬盤只分到一個(gè)任務(wù),小幾率分配多個(gè)任務(wù)(要解決的就是這個(gè)小幾率分配多個(gè)任務(wù)的問題)。
如果使用輪循策略,肯定會(huì)保證每個(gè)硬盤都分配一個(gè)并行度以后,才會(huì)出現(xiàn)單硬盤分配兩個(gè)任務(wù)的情況。而且輪循策略可以保證分配的硬盤是連續(xù)的。
直接看 RocksDBStateBackend 類的部分源碼:
/** Base paths for RocksDB directory, as initialized.
這里就是我們上述設(shè)置的 12 個(gè) rocksdb local dir */
private transient File[] initializedDbBasePaths;
/** The index of the next directory to be used from {@link #initializedDbBasePaths}.
下一次要使用 dir 的 index,如果 nextDirectory = 2,
則使用 initializedDbBasePaths 中下標(biāo)為 2 的那個(gè)目錄做為 rocksdb 的存儲(chǔ)目錄 */
private transient int nextDirectory;
// lazyInitializeForJob 方法中, 通過這一行代碼決定下一次要使用 dir 的 index,
// 根據(jù) initializedDbBasePaths.length 生成隨機(jī)數(shù),
// 如果 initializedDbBasePaths.length = 12,生成隨機(jī)數(shù)的范圍為 0-11
nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
分析完簡單的源碼后,我們知道了源碼中使用了 random 的策略來分配 dir,跟我們所看到的現(xiàn)象能夠匹配。隨機(jī)分配有小概率會(huì)出現(xiàn)沖突。(寫這篇文章時(shí),F(xiàn)link 最新的 master 分支代碼仍然是上述策略,尚未做任何改動(dòng))
四、使用哪種策略更合理?
(各種策略帶來的挑戰(zhàn))
random 和 hash 策略在任務(wù)數(shù)量比較大時(shí),可以保證每個(gè) Worker 承擔(dān)的任務(wù)量基本一樣,但是如果任務(wù)量比較小,例如將 20 個(gè)任務(wù)通過隨機(jī)算法分配給 10 個(gè) Worker 時(shí),就會(huì)出現(xiàn)有的 Worker 分配不到任務(wù),有的 Worker 可能分配到 3 或 4 個(gè)任務(wù)。所以 random 和 hash 策略不能解決 rocksdb 分配磁盤不均的痛點(diǎn),那輪循策略和最低負(fù)載策略呢?
輪循策略可以解決上述問題,解決方式如下:
// 在 RocksDBStateBackend 類中定義了
private static final AtomicInteger DIR_INDEX = new AtomicInteger(0);
// nextDirectory 的分配策略變成了如下代碼,每次將 DIR_INDEX + 1,然后對(duì) dir 的總數(shù)求余
nextDirectory = DIR_INDEX.getAndIncrement() % initializedDbBasePaths.length;
通過上述即可實(shí)現(xiàn)輪循策略,申請磁盤時(shí),從 0 號(hào)磁盤開始申請,每次使用下一塊磁盤即可。
■ 帶來的問題:
Java 中靜態(tài)變量屬于 JVM 級(jí)別的,每個(gè) TaskManager 屬于單獨(dú)的 JVM,所以 TaskManager 內(nèi)部保證了輪循策略。如果同一臺(tái)服務(wù)器上運(yùn)行多個(gè) TaskManager,那么多個(gè) TaskManager 都會(huì)從 index 為 0 的磁盤開始使用,所以導(dǎo)致 index 較小的磁盤會(huì)被經(jīng)常使用,而 index 較大的磁盤可能經(jīng)常不會(huì)被使用到。
■ 解決方案 1:
DIR_INDEX 初始化時(shí),不要每次初始化為 0,可以生成一個(gè)隨機(jī)數(shù),這樣可以保證不會(huì)每次使用 index 較小的磁盤,實(shí)現(xiàn)代碼如下所示:
// 在 RocksDBStateBackend 類中定義了private static final AtomicInteger DIR_INDEX = new AtomicInteger(new Random().nextInt(100));
但是上述方案不能完全解決磁盤沖突的問題,同一臺(tái)機(jī)器上 12 塊磁盤,TaskManager0 使用 index 為 0、1、2 的三塊磁盤,TaskManager1 可能使用 index 為 1、2、3 的三塊磁盤。結(jié)果就是 TaskManager 內(nèi)部來看,實(shí)現(xiàn)了輪循策略保證負(fù)載均衡,但是全局來看,負(fù)載并不均衡。
■ 解決方案 2:
為了全局負(fù)載均衡,所以多個(gè) TaskManager 之間必須通信才能做到絕對(duì)的負(fù)載均衡,可以借助第三方的存儲(chǔ)進(jìn)行通信,例如在 Zookeeper 中,為每個(gè)服務(wù)器生成一個(gè) znode,znode 命名可以是 host 或者 ip。使用 Curator 的 DistributedAtomicInteger 來維護(hù) DIR_INDEX 變量,存儲(chǔ)在當(dāng)前服務(wù)器對(duì)應(yīng)的 znode 中,無論是哪個(gè) TaskManager 申請磁盤,都可以使用 DistributedAtomicInteger 將當(dāng)前服務(wù)器對(duì)應(yīng)的 DIR_INDEX + 1,從而就可以實(shí)現(xiàn)全局的輪循策略。
DistributedAtomicInteger 的 increment 的思路:先使用 Zookeeper 的 withVersion api 進(jìn)行 +1 操作(也就是 Zookeeper 提供的 CAS api),如果成功則成功;如果失敗,則使用分布式互斥鎖進(jìn)行 +1 操作。
基于上述描述,我們得到兩種策略來實(shí)現(xiàn)輪循,AtomicInteger 只能保證 TaskManager 內(nèi)部的輪循,不能保證全局輪循。如果要基于全局輪循,需要借助 Zookeeper 或其他組件來實(shí)現(xiàn)。如果對(duì)輪循策略要求比較苛刻,可以使用基于 Zookeeper 的輪循策略,如果不想依賴外部組件則只能使用 AtomicInteger 來實(shí)現(xiàn)。
思想就是 TaskManager 啟動(dòng)時(shí),監(jiān)測所有 rocksdb local dir 對(duì)應(yīng)的磁盤最近 1 分鐘或 5 分鐘的 IO 平均使用率,篩掉 IO 使用率較高的磁盤,優(yōu)先選擇 IO 平均使用率較低的磁盤,同時(shí)在 IO 平均使用率較低的磁盤中,依然要使用輪循策略來實(shí)現(xiàn)。
Flink 任務(wù)啟動(dòng)時(shí),只能拿到磁盤當(dāng)前的 IO 使用率,是一個(gè)瞬時(shí)值,會(huì)不會(huì)不靠譜?
Flink 任務(wù)啟動(dòng),不可能等待任務(wù)先采集 1 分鐘 IO 使用率以后,再去啟動(dòng)。
不想依賴外部監(jiān)控系統(tǒng)去拿這個(gè) IO 使用率,要考慮通用性。
假設(shè)已經(jīng)拿到了所有硬盤最近 1 分鐘的 IO 使用率,該如何去決策呢?
對(duì)于 IO 平均使用率較低的磁盤中,依然要使用輪循策略來實(shí)現(xiàn)。
IO 平均使用率較低,這里的較低不好評(píng)判,相差 10% 算低,還是 20%、30%。
而且不同的新任務(wù)對(duì)于磁盤的使用率要求也是不一樣的,所以評(píng)判難度較大。
啟動(dòng)階段不采集硬盤的負(fù)載壓力,使用之前的 DistributedAtomicInteger 基本就可以保證每個(gè)硬盤負(fù)載均衡。但是任務(wù)啟動(dòng)后一段時(shí)間,如果因?yàn)?Flink 任務(wù)導(dǎo)致某個(gè)磁盤 IO 的平均使用率相對(duì)其他磁盤而言非常高。我們可以選擇遷移高負(fù)載硬盤的數(shù)據(jù)到低負(fù)載硬盤。
上述內(nèi)容就是Flink使用大狀態(tài)時(shí)的優(yōu)化是什么,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。