溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何進(jìn)行Flink 1.11 Unaligned Checkpoint 解析

發(fā)布時(shí)間:2021-12-09 11:12:43 來(lái)源:億速云 閱讀:118 作者:柒染 欄目:大數(shù)據(jù)

今天就跟大家聊聊有關(guān)如何進(jìn)行Flink 1.11 Unaligned Checkpoint 解析,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

作為 Flink 最基礎(chǔ)也是最關(guān)鍵的容錯(cuò)機(jī)制,Checkpoint 快照機(jī)制很好地保證了 Flink 應(yīng)用從異常狀態(tài)恢復(fù)后的數(shù)據(jù)準(zhǔn)確性。同時(shí) Checkpoint 相關(guān)的 metrics 也是診斷 Flink 應(yīng)用健康狀態(tài)最為重要的指標(biāo),成功且耗時(shí)較短的 Checkpoint 表明作業(yè)運(yùn)行狀況良好,沒(méi)有異?;蚍磯?。  然而,由于 Checkpoint 與反壓的耦合,反壓反過(guò)來(lái)也會(huì)作用于 Checkpoint,導(dǎo)致 Checkpoint 的種種問(wèn)題。

針對(duì)于此,F(xiàn)link 在 1.11 引入 Unaligned Checkpint 來(lái)解耦 Checkpoint 機(jī)制與反壓機(jī)制,優(yōu)化高反壓情況下的 Checkpoint 表現(xiàn)。

當(dāng)前 Checkpoint 機(jī)制簡(jiǎn)述

相信不少讀者對(duì) Flink Checkpoint 基于 Chandy-Lamport 算法的分布式快照已經(jīng)比較熟悉,該節(jié)簡(jiǎn)單回顧下算法的基礎(chǔ)邏輯,熟悉算法的讀者可放心跳過(guò)。

Chandy-Lamport 算法將分布式系統(tǒng)抽象成 DAG(暫時(shí)不考慮有閉環(huán)的圖),節(jié)點(diǎn)表示進(jìn)程,邊表示兩個(gè)進(jìn)程間通信的管道。分布式快照的目的是記錄下整個(gè)系統(tǒng)的狀態(tài),即可以分為節(jié)點(diǎn)的狀態(tài)(進(jìn)程的狀態(tài))和邊的狀態(tài)(信道的狀態(tài),即傳輸中的數(shù)據(jù))。因?yàn)橄到y(tǒng)狀態(tài)是由輸入的消息序列驅(qū)動(dòng)變化的,我們可以將輸入的消息序列分為多個(gè)較短的子序列,圖的每個(gè)節(jié)點(diǎn)或邊先后處理完某個(gè)子序列后,都會(huì)進(jìn)入同一個(gè)穩(wěn)定的全局統(tǒng)狀態(tài)。利用這個(gè)特性,系統(tǒng)的進(jìn)程和信道在子序列的邊界點(diǎn)分別進(jìn)行本地快照,即使各部分的快照時(shí)間點(diǎn)不同,最終也可以組合成一個(gè)有意義的全局快照。

如何進(jìn)行Flink 1.11 Unaligned Checkpoint 解析

圖1. Checkpoint Barrier  

從實(shí)現(xiàn)上看,F(xiàn)link 通過(guò)在 DAG 數(shù)據(jù)源定時(shí)向數(shù)據(jù)流注入名為 Barrier 的特殊元素,將連續(xù)的數(shù)據(jù)流切分為多個(gè)有限序列,對(duì)應(yīng)多個(gè) Checkpoint 周期。每當(dāng)接收到 Barrier,算子進(jìn)行本地的 Checkpoint 快照,并在完成后異步上傳本地快照,同時(shí)將 Barrier 以廣播方式發(fā)送至下游。當(dāng)某個(gè) Checkpoint 的所有 Barrier 到達(dá) DAG 末端且所有算子完成快照,則標(biāo)志著全局快照的成功。

如何進(jìn)行Flink 1.11 Unaligned Checkpoint 解析

圖2. Barrier Alignment

在有多個(gè)輸入 Channel 的情況下,為了數(shù)據(jù)準(zhǔn)確性,算子會(huì)等待所有流的 Barrier 都到達(dá)之后才會(huì)開(kāi)始本地的快照,這種機(jī)制被稱為 Barrier 對(duì)齊。在對(duì)齊的過(guò)程中,算子只會(huì)繼續(xù)處理的來(lái)自未出現(xiàn) Barrier Channel 的數(shù)據(jù),而其余 Channel 的數(shù)據(jù)會(huì)被寫(xiě)入輸入隊(duì)列,直至在隊(duì)列滿后被阻塞。當(dāng)所有 Barrier 到達(dá)后,算子進(jìn)行本地快照,輸出 Barrier 到下游并恢復(fù)正常處理。

比起其他分布式快照,該算法的優(yōu)勢(shì)在于輔以 Copy-On-Write 技術(shù)的情況下不需要 “Stop The World” 影響應(yīng)用吞吐量,同時(shí)基本不用持久化處理中的數(shù)據(jù),只用保存進(jìn)程的狀態(tài)信息,大大減小了快照的大小。

Checkpoint 與反壓的耦合

目前的 Checkpoint 算法在大多數(shù)情況下運(yùn)行良好,然而當(dāng)作業(yè)出現(xiàn)反壓時(shí),阻塞式的 Barrier 對(duì)齊反而會(huì)加劇作業(yè)的反壓,甚至導(dǎo)致作業(yè)的不穩(wěn)定。

首先, Chandy-Lamport 分布式快照的結(jié)束依賴于 Marker 的流動(dòng),而反壓則會(huì)限制 Marker 的流動(dòng),導(dǎo)致快照的完成時(shí)間變長(zhǎng)甚至超時(shí)。無(wú)論是哪種情況,都會(huì)導(dǎo)致 Checkpoint 的時(shí)間點(diǎn)落后于實(shí)際數(shù)據(jù)流較多。這時(shí)作業(yè)的計(jì)算進(jìn)度是沒(méi)有被持久化的,處于一個(gè)比較脆弱的狀態(tài),如果作業(yè)出于異常被動(dòng)重啟或者被用戶主動(dòng)重啟,作業(yè)會(huì)回滾丟失一定的進(jìn)度。如果 Checkpoint 連續(xù)超時(shí)且沒(méi)有很好的監(jiān)控,回滾丟失的進(jìn)度可能高達(dá)一天以上,對(duì)于實(shí)時(shí)業(yè)務(wù)這通常是不可接受的。更糟糕的是,回滾后的作業(yè)落后的 Lag 更大,通常帶來(lái)更大的反壓,形成一個(gè)惡性循環(huán)。

其次,Barrier 對(duì)齊本身可能成為一個(gè)反壓的源頭,影響上游算子的效率,而這在某些情況下是不必要的。比如典型的情況是一個(gè)的作業(yè)讀取多個(gè) Source,分別進(jìn)行不同的聚合計(jì)算,然后將計(jì)算完的結(jié)果分別寫(xiě)入不同的 Sink。通常來(lái)說(shuō),這些不同的 Sink 會(huì)復(fù)用公共的算子以減少重復(fù)計(jì)算,但并不希望不同 Source 間相互影響。

如何進(jìn)行Flink 1.11 Unaligned Checkpoint 解析圖3. Barrier Alignment 阻塞上游 Task


假設(shè)一個(gè)作業(yè)要分別統(tǒng)計(jì) A 和 B 兩個(gè)業(yè)務(wù)線的以天為粒度指標(biāo),同時(shí)還需要統(tǒng)計(jì)所有業(yè)務(wù)線以周為單位的指標(biāo),拓?fù)淙缟蠄D所示。如果 B 業(yè)務(wù)線某天的業(yè)務(wù)量突漲,使得 Checkpoint Barrier 有延遲,那么會(huì)導(dǎo)致公用的 Window Aggregate 進(jìn)行 Barrier 對(duì)齊,進(jìn)而阻塞業(yè)務(wù) A 的 FlatMap,最終令業(yè)務(wù) A 的計(jì)算也出現(xiàn)延遲。

當(dāng)然這種情況可以通過(guò)拆分作業(yè)等方式優(yōu)化,但難免引入更多開(kāi)發(fā)維護(hù)成本,而且更重要的是這本來(lái)就符合 Flink 用戶常規(guī)的開(kāi)發(fā)思路,應(yīng)該在框架內(nèi)盡量減小出現(xiàn)用戶意料之外的行為的可能性。

Unaligned Checkpoint

為了解決這個(gè)問(wèn)題,F(xiàn)link 在 1.11 版本引入了 Unaligned Checkpoint 的特性。要理解 Unaligned Checkpoint 的原理,首先需要了解 Chandy-Lamport 論文中對(duì)于 Marker 處理規(guī)則的描述:

如何進(jìn)行Flink 1.11 Unaligned Checkpoint 解析

圖4. Chandy-Lamport Marker 處理


其中關(guān)鍵是 if q has not recorded its state,也就是接收到 Marker 時(shí)算子是否已經(jīng)進(jìn)行過(guò)本地快照。一直以來(lái) Flink 的 Aligned Checkpoint 通過(guò) Barrier 對(duì)齊,將本地快照延遲至所有 Barrier 到達(dá),因而這個(gè)條件是永真的,從而巧妙地避免了對(duì)算子輸入隊(duì)列的狀態(tài)進(jìn)行快照,但代價(jià)是比較不可控的 Checkpoint 時(shí)長(zhǎng)和吞吐量的降低。實(shí)際上這和 Chandy-Lamport 算法是有一定出入的。

舉個(gè)例子,假設(shè)我們對(duì)兩個(gè)數(shù)據(jù)流進(jìn)行 equal-join,輸出匹配上的元素。按照 Flink Aligned Checkpoint 的方式,系統(tǒng)的狀態(tài)變化如下(圖中不同顏色的元素代表屬于不同的 Checkpoint 周期):

如何進(jìn)行Flink 1.11 Unaligned Checkpoint 解析

圖5. Aligned Checkpoint 狀態(tài)變化

  • 圖 a: 輸入 Channel 1 存在 3 個(gè)元素,其中 2 在 Barrier 前面;Channel 2 存在 4 個(gè)元素,其中 2、9、7在 Barrier 前面。
  • 圖 b: 算子分別讀取 Channel 一個(gè)元素,輸出 2。隨后接收到 Channel 1 的 Barrier,停止處理 Channel 1 后續(xù)的數(shù)據(jù),只處理 Channel 2 的數(shù)據(jù)。
  • 圖 c: 算子再消費(fèi) 2 個(gè)自 Channel 2 的元素,接收到 Barrier,開(kāi)始本地快照并輸出 Barrier。

對(duì)于相同的情況,Chandy-Lamport 算法的狀態(tài)變化如下:

如何進(jìn)行Flink 1.11 Unaligned Checkpoint 解析

圖6. Chandy-Lamport 狀態(tài)變化

  • 圖 a: 同上。
  • 圖 b: 算子分別處理兩個(gè) Channel 一個(gè)元素,輸出結(jié)果 2。此后接收到 Channel 1 的 Barrier,算子開(kāi)始本地快照記錄自己的狀態(tài),并輸出 Barrier。
  • 圖 c: 算子繼續(xù)正常處理兩個(gè) Channel 的輸入,輸出 9。特別的地方是 Channel 2 后續(xù)元素會(huì)被保存下來(lái),直到 Channel 2 的 Barrier 出現(xiàn)(即 Channel 2 的 9 和 7)。保存的數(shù)據(jù)會(huì)作為 Channel 的狀態(tài)成為快照的一部分。

兩者的差異主要可以總結(jié)為兩點(diǎn):

  1. 快照的觸發(fā)是在接收到第一個(gè) Barrier 時(shí)還是在接收到最后一個(gè) Barrier 時(shí)。
  2. 是否需要阻塞已經(jīng)接收到 Barrier 的 Channel 的計(jì)算。

從這兩點(diǎn)來(lái)看,新的 Unaligned Checkpoint 將快照的觸發(fā)改為第一個(gè) Barrier 且取消阻塞 Channel 的計(jì)算,算法上與 Chandy-Lamport 基本一致,同時(shí)在實(shí)現(xiàn)細(xì)節(jié)方面結(jié)合 Flink 的定位做了幾個(gè)改進(jìn)。

首先,不同于 Chandy-Lamport 模型的只需要考慮算子輸入 Channel 的狀態(tài),F(xiàn)link 的算子有輸入和輸出兩種 Channel,在快照時(shí)兩者的狀態(tài)都需要被考慮。

其次,無(wú)論在 Chandy-Lamport 還是 Flink Aligned Checkpoint 算法中,Barrier 都必須遵循其在數(shù)據(jù)流中的位置,算子需要等待 Barrier 被實(shí)際處理才開(kāi)始快照。而 Unaligned Checkpoint 改變了這個(gè)設(shè)定,允許算子優(yōu)先攝入并優(yōu)先輸出 Barrier。如此一來(lái),第一個(gè)到達(dá) Barrier 會(huì)在算子的緩存數(shù)據(jù)隊(duì)列(包括輸入 Channel 和輸出 Channel)中往前跳躍一段距離,而被”插隊(duì)”的數(shù)據(jù)和其他輸入 Channel 在其 Barrier 之前的數(shù)據(jù)會(huì)被寫(xiě)入快照中(圖中黃色部分)。

如何進(jìn)行Flink 1.11 Unaligned Checkpoint 解析

圖7. Barrier 越過(guò)數(shù)據(jù)

這樣的主要好處是,如果本身算子的處理就是瓶頸,Chandy-Lamport 的 Barrier 仍會(huì)被阻塞,但 Unaligned Checkpoint 則可以在 Barrier 進(jìn)入輸入 Channel 就馬上開(kāi)始快照。這可以從很大程度上加快 Barrier 流經(jīng)整個(gè) DAG 的速度,從而降低 Checkpoint 整體時(shí)長(zhǎng)。

回到之前的例子,用 Unaligned Checkpoint 來(lái)實(shí)現(xiàn),狀態(tài)變化如下:

如何進(jìn)行Flink 1.11 Unaligned Checkpoint 解析圖8. Unaligned-Checkpoint 狀態(tài)變化


  • 圖 a: 輸入 Channel 1 存在 3 個(gè)元素,其中 2 在 Barrier 前面;Channel 2 存在 4 個(gè)元素,其中 2、9、7在 Barrier 前面。輸出 Channel 已存在結(jié)果數(shù)據(jù) 1。
  • 圖 b: 算子優(yōu)先處理輸入 Channel 1 的 Barrier,開(kāi)始本地快照記錄自己的狀態(tài),并將 Barrier 插到輸出 Channel 末端。
  • 圖 c: 算子繼續(xù)正常處理兩個(gè) Channel 的輸入,輸出 2、9。同時(shí)算子會(huì)將 Barrier 越過(guò)的數(shù)據(jù)(即輸入 Channel 1 的 2 和輸出 Channel 的 1)寫(xiě)入 Checkpoint,并將輸入 Channel 2 后續(xù)早于 Barrier 的數(shù)據(jù)(即 2、9、7)持續(xù)寫(xiě)入 Checkpoint。

比起 Aligned Checkpoint 中不同 Checkpoint 周期的數(shù)據(jù)以算子快照為界限分隔得很清晰,Unaligned Checkpoint 進(jìn)行快照和輸出 Barrier 時(shí),部分本屬于當(dāng)前 Checkpoint 的輸入數(shù)據(jù)還未計(jì)算(因此未反映到當(dāng)前算子狀態(tài)中),而部分屬于當(dāng)前 Checkpoint 的輸出數(shù)據(jù)卻落到 Barrier 之后(因此未反映到下游算子的狀態(tài)中)。

這也正是 Unaligned 的含義: 不同 Checkpoint 周期的數(shù)據(jù)沒(méi)有對(duì)齊,包括不同輸入 Channel 之間的不對(duì)齊,以及輸入和輸出間的不對(duì)齊。而這部分不對(duì)齊的數(shù)據(jù)會(huì)被快照記錄下來(lái),以在恢復(fù)狀態(tài)時(shí)重放。換句話說(shuō),從 Checkpoint 恢復(fù)時(shí),不對(duì)齊的數(shù)據(jù)并不能由 Source 端重放的數(shù)據(jù)計(jì)算得出,同時(shí)也沒(méi)有反映到算子狀態(tài)中,但因?yàn)樗鼈儠?huì)被 Checkpoint 恢復(fù)到對(duì)應(yīng) Channel 中,所以依然能提供只計(jì)算一次的準(zhǔn)確結(jié)果。

當(dāng)然,Unaligned Checkpoint 并不是百分百優(yōu)于 Aligned Checkpoint,它會(huì)帶來(lái)的已知問(wèn)題就有:

  1. 由于要持久化緩存數(shù)據(jù),State Size 會(huì)有比較大的增長(zhǎng),磁盤負(fù)載會(huì)加重。
  2. 隨著 State Size 增長(zhǎng),作業(yè)恢復(fù)時(shí)間可能增長(zhǎng),運(yùn)維管理難度增加。

目前看來(lái),Unaligned Checkpoint 更適合容易產(chǎn)生高反壓同時(shí)又比較重要的復(fù)雜作業(yè)。對(duì)于像數(shù)據(jù) ETL 同步等簡(jiǎn)單作業(yè),更輕量級(jí)的 Aligned Checkpoint 顯然是更好的選擇。
Flink 1.11 的 Unaligned Checkpoint 主要解決在高反壓情況下作業(yè)難以完成 Checkpoint 的問(wèn)題,同時(shí)它以磁盤資源為代價(jià),避免了 Checkpoint 可能帶來(lái)的阻塞,有利于提升 Flink 的資源利用率。

看完上述內(nèi)容,你們對(duì)如何進(jìn)行Flink 1.11 Unaligned Checkpoint 解析有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI