溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Checkpoint與state的關(guān)系以及Checkpoint的執(zhí)行機(jī)制

發(fā)布時(shí)間:2021-09-04 14:34:14 來(lái)源:億速云 閱讀:209 作者:chen 欄目:編程語(yǔ)言

這篇文章主要講解了“Checkpoint與state的關(guān)系以及Checkpoint的執(zhí)行機(jī)制”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Checkpoint與state的關(guān)系以及Checkpoint的執(zhí)行機(jī)制”吧!

大家好,今天我將跟大家分享一下 Flink 里面的 Checkpoint,共分為四個(gè)部分。首先講一下 Checkpoint 與 state 的關(guān)系,然后介紹什么是 state,第三部分介紹如何在 Flink 中使用state,第四部分則介紹 Checkpoint 的執(zhí)行機(jī)制。

Checkpoint 與 state 的關(guān)系

Checkpoint 是從 source 觸發(fā)到下游所有節(jié)點(diǎn)完成的一次全局操作。下圖可以有一個(gè)對(duì) Checkpoint 的直觀感受,紅框里面可以看到一共觸發(fā)了 569K 次 Checkpoint,然后全部都成功完成,沒(méi)有 fail 的。

state 其實(shí)就是 Checkpoint 所做的主要持久化備份的主要數(shù)據(jù),看下圖的具體數(shù)據(jù)統(tǒng)計(jì),其 state 也就 9kb 大小 。

什么是 state

我們接下來(lái)看什么是 state。先看一個(gè)非常經(jīng)典的 word count 代碼,這段代碼會(huì)去監(jiān)控本地的 9000 端口的數(shù)據(jù)并對(duì)網(wǎng)絡(luò)端口輸入進(jìn)行詞頻統(tǒng)計(jì),我們本地行動(dòng) netcat,然后在終端輸入 hello world,執(zhí)行程序會(huì)輸出什么?

答案很明顯,(hello, 1) 和 (word,1)

那么問(wèn)題來(lái)了,如果再次在終端輸入 hello world,程序會(huì)輸入什么?

答案其實(shí)也很明顯,(hello, 2) 和 (world, 2)。為什么 Flink 知道之前已經(jīng)處理過(guò)一次 hello world,這就是 state 發(fā)揮作用了,這里是被稱(chēng)為 keyed state 存儲(chǔ)了之前需要統(tǒng)計(jì)的數(shù)據(jù),所以幫助 Flink 知道 hello 和 world 分別出現(xiàn)過(guò)一次。

回顧一下剛才這段 word count 代碼。keyby 接口的調(diào)用會(huì)創(chuàng)建 keyed stream 對(duì) key 進(jìn)行劃分,這是使用 keyed state 的前提。在此之后,sum 方法會(huì)調(diào)用內(nèi)置的 StreamGroupedReduce 實(shí)現(xiàn)。

什么是 keyed state

對(duì)于 keyed state,有兩個(gè)特點(diǎn):

  • 只能應(yīng)用于 KeyedStream 的函數(shù)與操作中,例如 Keyed UDF, window state

  • keyed state 是已經(jīng)分區(qū)/劃分好的,每一個(gè) key 只能屬于某一個(gè) keyed state

對(duì)于如何理解已經(jīng)分區(qū)的概念,我們需要看一下 keyby 的語(yǔ)義,大家可以看到下圖左邊有三個(gè)并發(fā),右邊也是三個(gè)并發(fā),左邊的詞進(jìn)來(lái)之后,通過(guò) keyby 會(huì)進(jìn)行相應(yīng)的分發(fā)。例如對(duì)于 hello word,hello 這個(gè)詞通過(guò) hash 運(yùn)算永遠(yuǎn)只會(huì)到右下方并發(fā)的 task 上面去。

什么是operator state

  • 又稱(chēng)為 non-keyed state,每一個(gè) operator state 都僅與一個(gè) operator 的實(shí)例綁定。

  • 常見(jiàn)的 operator state 是 source state,例如記錄當(dāng)前 source 的 offset

再看一段使用 operator state 的 word count 代碼:

這里的fromElements會(huì)調(diào)用FromElementsFunction的類(lèi),其中就使用了類(lèi)型為 list state 的 operator state。根據(jù) state 類(lèi)型做一個(gè)分類(lèi)如下圖:

除了從這種分類(lèi)的角度,還有一種分類(lèi)的角度是從 Flink 是否直接接管:

  • Managed State:由 Flink 管理的 state,剛才舉例的所有 state 均是 managed state

  • Raw State:Flink 僅提供 stream 可以進(jìn)行存儲(chǔ)數(shù)據(jù),對(duì) Flink 而言 raw state 只是一些 bytes

在實(shí)際生產(chǎn)中,都只推薦使用 managed state,本文將圍繞該話(huà)題進(jìn)行討論。

如何在 Flink 中使用 state

下圖就前文 word count 的 sum 所使用的StreamGroupedReduce類(lèi)為例講解了如何在代碼中使用 keyed state:

下圖則對(duì) word count 示例中的FromElementsFunction類(lèi)進(jìn)行詳解并分享如何在代碼中使用 operator state:

Checkpoint 的執(zhí)行機(jī)制

在介紹 Checkpoint 的執(zhí)行機(jī)制前,我們需要了解一下 state 的存儲(chǔ),因?yàn)?state 是 Checkpoint 進(jìn)行持久化備份的主要角色。

Statebackend 的分類(lèi)

下圖闡釋了目前 Flink 內(nèi)置的三類(lèi) state backend,其中MemoryStateBackend和FsStateBackend在運(yùn)行時(shí)都是存儲(chǔ)在 java heap 中的,只有在執(zhí)行 Checkpoint 時(shí),F(xiàn)sStateBackend才會(huì)將數(shù)據(jù)以文件格式持久化到遠(yuǎn)程存儲(chǔ)上。而RocksDBStateBackend則借用了 RocksDB(內(nèi)存磁盤(pán)混合的 LSM DB)對(duì) state 進(jìn)行存儲(chǔ)。

對(duì)于HeapKeyedStateBackend,有兩種實(shí)現(xiàn):

  • 支持異步 Checkpoint(默認(rèn)):存儲(chǔ)格式 CopyOnWriteStateMap

  • 僅支持同步 Checkpoint:存儲(chǔ)格式 NestedStateMap

特別在 MemoryStateBackend 內(nèi)使用HeapKeyedStateBackend時(shí),Checkpoint 序列化數(shù)據(jù)階段默認(rèn)有最大 5 MB數(shù)據(jù)的限制

對(duì)于RocksDBKeyedStateBackend,每個(gè) state 都存儲(chǔ)在一個(gè)單獨(dú)的 column family 內(nèi),其中 keyGroup,Key 和 Namespace 進(jìn)行序列化存儲(chǔ)在 DB 作為 key。

Checkpoint 執(zhí)行機(jī)制詳解

本小節(jié)將對(duì) Checkpoint 的執(zhí)行流程逐步拆解進(jìn)行講解,下圖左側(cè)是 Checkpoint Coordinator,是整個(gè) Checkpoint 的發(fā)起者,中間是由兩個(gè) source,一個(gè) sink 組成的 Flink 作業(yè),最右側(cè)的是持久化存儲(chǔ),在大部分用戶(hù)場(chǎng)景中對(duì)應(yīng) HDFS。

  1. 第一步,Checkpoint Coordinator 向所有 source 節(jié)點(diǎn) trigger Checkpoint;。

  1. 第二步,source 節(jié)點(diǎn)向下游廣播 barrier,這個(gè) barrier 就是實(shí)現(xiàn) Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才會(huì)執(zhí)行相應(yīng)的 Checkpoint。

  1. 第三步,當(dāng) task 完成 state 備份后,會(huì)將備份數(shù)據(jù)的地址(state handle)通知給 Checkpoint coordinator。

  1. 第四步,下游的 sink 節(jié)點(diǎn)收集齊上游兩個(gè) input 的 barrier 之后,會(huì)執(zhí)行本地快照,這里特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 會(huì)全量刷數(shù)據(jù)到磁盤(pán)上(紅色大三角表示),然后 Flink 框架會(huì)從中選擇沒(méi)有上傳的文件進(jìn)行持久化備份(紫色小三角)。

  1. 同樣的,sink 節(jié)點(diǎn)在完成自己的 Checkpoint 之后,會(huì)將 state handle 返回通知 Coordinator。

  1. 最后,當(dāng) Checkpoint coordinator 收集齊所有 task 的 state handle,就認(rèn)為這一次的 Checkpoint 全局完成了,向持久化存儲(chǔ)中再備份一個(gè) Checkpoint meta 文件。

Checkpoint 的 EXACTLY_ONCE 語(yǔ)義

為了實(shí)現(xiàn) EXACTLY ONCE 語(yǔ)義,F(xiàn)link 通過(guò)一個(gè) input buffer 將在對(duì)齊階段收到的數(shù)據(jù)緩存起來(lái),等對(duì)齊完成之后再進(jìn)行處理。而對(duì)于 AT LEAST ONCE 語(yǔ)義,無(wú)需緩存收集到的數(shù)據(jù),會(huì)對(duì)后續(xù)直接處理,所以導(dǎo)致 restore 時(shí),數(shù)據(jù)可能會(huì)被多次處理。下圖是官網(wǎng)文檔里面就 Checkpoint align 的示意圖:

需要特別注意的是,F(xiàn)link 的 Checkpoint 機(jī)制只能保證 Flink 的計(jì)算過(guò)程可以做到 EXACTLY ONCE,端到端的 EXACTLY ONCE 需要 source 和 sink 支持。

Savepoint 與 Checkpoint 的區(qū)別

作業(yè)恢復(fù)時(shí),二者均可以使用,主要區(qū)別如下:

SavepointExternalized Checkpoint用戶(hù)通過(guò)命令觸發(fā),由用戶(hù)管理其創(chuàng)建與刪除Checkpoint 完成時(shí),在用戶(hù)給定的外部持久化存儲(chǔ)保存標(biāo)準(zhǔn)化格式存儲(chǔ),允許作業(yè)升級(jí)或者配置變更當(dāng)作業(yè) FAILED(或者CANCELED)時(shí),外部存儲(chǔ)的 Checkpoint 會(huì)保留下來(lái)用戶(hù)在恢復(fù)時(shí)需要提供用于恢復(fù)作業(yè)狀態(tài)的 savepoint 路徑用戶(hù)在恢復(fù)時(shí)需要提供用于恢復(fù)的作業(yè)狀態(tài)的 Checkpoint 路徑

感謝各位的閱讀,以上就是“Checkpoint與state的關(guān)系以及Checkpoint的執(zhí)行機(jī)制”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Checkpoint與state的關(guān)系以及Checkpoint的執(zhí)行機(jī)制這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI