溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Flink checkpoint機(jī)制是什么

發(fā)布時(shí)間:2021-12-31 14:19:13 來(lái)源:億速云 閱讀:144 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“Flink checkpoint機(jī)制是什么”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Flink checkpoint機(jī)制是什么”吧!

Checkpoint介紹

checkpoint機(jī)制是Flink可靠性的基石,可以保證Flink集群在某個(gè)算子因?yàn)槟承┰?如 異常退出)出現(xiàn)故障時(shí),能夠?qū)⒄麄€(gè)應(yīng)用流圖的狀態(tài)恢復(fù)到故障之前的某一狀態(tài),保 證應(yīng)用流圖狀態(tài)的一致性。Flink的checkpoint機(jī)制原理來(lái)自“Chandy-Lamport algorithm”算法。

每個(gè)需要checkpoint的應(yīng)用在啟動(dòng)時(shí),F(xiàn)link的JobManager為其創(chuàng)建一個(gè) CheckpointCoordinator(檢查點(diǎn)協(xié)調(diào)器),CheckpointCoordinator全權(quán)負(fù)責(zé)本應(yīng)用的快照制作。

Flink checkpoint機(jī)制是什么

1) CheckpointCoordinator(檢查點(diǎn)協(xié)調(diào)器) 周期性的向該流應(yīng)用的所有source算子發(fā)送 barrier(屏障)。

2) 當(dāng)某個(gè)source算子收到一個(gè)barrier時(shí),便暫停數(shù)據(jù)處理過(guò)程,然后將自己的當(dāng)前狀態(tài)制作成快照,并保存到指定的持久化存儲(chǔ)中,最后向CheckpointCoordinator報(bào)告自己快照制作情況,同時(shí)向自身所有下游算子廣播該barrier,恢復(fù)數(shù)據(jù)處理

3) 下游算子收到barrier之后,會(huì)暫停自己的數(shù)據(jù)處理過(guò)程,然后將自身的相關(guān)狀態(tài)制作成快照,并保存到指定的持久化存儲(chǔ)中,最后向CheckpointCoordinator報(bào)告自身快照情況,同時(shí)向自身所有下游算子廣播該barrier,恢復(fù)數(shù)據(jù)處理。

4) 每個(gè)算子按照步驟3不斷制作快照并向下游廣播,直到最后barrier傳遞到sink算子,快照制作完成。

5) 當(dāng)CheckpointCoordinator收到所有算子的報(bào)告之后,認(rèn)為該周期的快照制作成功; 否則,如果在規(guī)定的時(shí)間內(nèi)沒(méi)有收到所有算子的報(bào)告,則認(rèn)為本周期快照制作失敗。

如果一個(gè)算子有兩個(gè)輸入源,則暫時(shí)阻塞先收到barrier的輸入源,等到第二個(gè)輸入源相 同編號(hào)的barrier到來(lái)時(shí),再制作自身快照并向下游廣播該barrier。具體如下圖所示:

1) 假設(shè)算子C有A和B兩個(gè)輸入源

2) 在第i個(gè)快照周期中,由于某些原因(如處理時(shí)延、網(wǎng)絡(luò)時(shí)延等)輸入源A發(fā)出的 barrier 先到來(lái),這時(shí)算子C暫時(shí)將輸入源A的輸入通道阻塞,僅收輸入源B的數(shù)據(jù)。

3) 當(dāng)輸入源B發(fā)出的barrier到來(lái)時(shí),算子C制作自身快照并向 CheckpointCoordinator 報(bào)告自身的快照制作情況,然后將兩個(gè)barrier合并為一個(gè),向下游所有的算子廣播。

4) 當(dāng)由于某些原因出現(xiàn)故障時(shí),CheckpointCoordinator通知流圖上所有算子統(tǒng)一恢復(fù)到某個(gè)周期的checkpoint狀態(tài),然后恢復(fù)數(shù)據(jù)流處理。分布式checkpoint機(jī)制保證了數(shù)據(jù)僅被處理一次(Exactly Once)。

持久化存儲(chǔ)

MemStateBackend

該持久化存儲(chǔ)主要將快照數(shù)據(jù)保存到JobManager的內(nèi)存中,僅適合作為測(cè)試以及快照的數(shù)據(jù)量非常小時(shí)使用,并不推薦用作大規(guī)模商業(yè)部署。

MemoryStateBackend 的局限性

默認(rèn)情況下,每個(gè)狀態(tài)的大小限制為 5 MB??梢栽贛emoryStateBackend的構(gòu)造函數(shù)中增加此值。

無(wú)論配置的最大狀態(tài)大小如何,狀態(tài)都不能大于akka幀的大?。ㄕ?qǐng)參閱配置)。

聚合狀態(tài)必須適合 JobManager 內(nèi)存。

建議MemoryStateBackend 用于

本地開(kāi)發(fā)和調(diào)試。

狀態(tài)很少的作業(yè),例如僅包含一次記錄功能的作業(yè)(Map,F(xiàn)latMap,F(xiàn)ilter,…),kafka的消費(fèi)者需要很少的狀態(tài)。

FsStateBackend

該持久化存儲(chǔ)主要將快照數(shù)據(jù)保存到文件系統(tǒng)中,目前支持的文件系統(tǒng)主要是 HDFS和本地文件。如果使用HDFS,則初始化FsStateBackend時(shí),需要傳入以 “hdfs://”開(kāi)頭的路徑(即: new FsStateBackend("hdfs:///hacluster/checkpoint")), 如果使用本地文件,則需要傳入以“file://”開(kāi)頭的路徑(即:new FsStateBackend("file:///Data"))。在分布式情況下,不推薦使用本地文件。如果某 個(gè)算子在節(jié)點(diǎn)A上失敗,在節(jié)點(diǎn)B上恢復(fù),使用本地文件時(shí),在B上無(wú)法讀取節(jié)點(diǎn) A上的數(shù)據(jù),導(dǎo)致?tīng)顟B(tài)恢復(fù)失敗。

建議FsStateBackend:

具有大狀態(tài),長(zhǎng)窗口,大鍵 / 值狀態(tài)的作業(yè)。

所有高可用性設(shè)置。

RocksDBStateBackend

RocksDBStatBackend介于本地文件和HDFS之間,平時(shí)使用RocksDB的功能,將數(shù) 據(jù)持久化到本地文件中,當(dāng)制作快照時(shí),將本地?cái)?shù)據(jù)制作成快照,并持久化到 FsStateBackend中(FsStateBackend不必用戶特別指明,只需在初始化時(shí)傳入HDFS 或本地路徑即可,如new RocksDBStateBackend("hdfs:///hacluster/checkpoint")或new RocksDBStateBackend("file:///Data"))。

如果用戶使用自定義窗口(window),不推薦用戶使用RocksDBStateBackend。在自定義窗口中,狀態(tài)以ListState的形式保存在StatBackend中,如果一個(gè)key值中有多個(gè)value值,則RocksDB讀取該種ListState非常緩慢,影響性能。用戶可以根據(jù)應(yīng)用的具體情況選擇FsStateBackend+HDFS或RocksStateBackend+HDFS。

語(yǔ)法

val env = StreamExecutionEnvironment.getExecutionEnvironment()// start a checkpoint every 1000 msenv.enableCheckpointing(1000)// advanced options:// 設(shè)置checkpoint的執(zhí)行模式,最多執(zhí)行一次或者至少執(zhí)行一次env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)// 設(shè)置checkpoint的超時(shí)時(shí)間env.getCheckpointConfig.setCheckpointTimeout(60000)// 如果在只做快照過(guò)程中出現(xiàn)錯(cuò)誤,是否讓整體任務(wù)失?。簍rue是  false不是env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)//設(shè)置同一時(shí)間有多少 個(gè)checkpoint可以同時(shí)執(zhí)行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

修改State Backend的兩種方式

第一種:?jiǎn)稳蝿?wù)調(diào)整

修改當(dāng)前任務(wù)代碼

env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依賴】

第二種:全局調(diào)整

修改flink-conf.yaml

state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints

注意:state.backend的值可以是下面幾種:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

Checkpoint的高級(jí)選項(xiàng)

默認(rèn)checkpoint功能是disabled的,想要使用的時(shí)候需要先啟用checkpoint開(kāi)啟之后,默認(rèn)的checkPointMode是Exactly-once

//配置一秒鐘開(kāi)啟一個(gè)checkpointenv.enableCheckpointing(1000)//指定checkpoint的執(zhí)行模式//兩種可選://CheckpointingMode.EXACTLY_ONCE:默認(rèn)值//CheckpointingMode.AT_LEAST_ONCEenv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

一般情況下選擇CheckpointingMode.EXACTLY_ONCE,除非場(chǎng)景要求極低的延遲(幾毫秒)

注意:如果需要保證EXACTLY_ONCE,source和sink要求必須同時(shí)保證EXACTLY_ONCE
//如果程序被cancle,保留以前做的checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)默認(rèn)情況下,檢查點(diǎn)不被保留,僅用于在故障中恢復(fù)作業(yè),可以啟用外部持久化檢查點(diǎn),同時(shí)指定保留策略:ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作業(yè)取消時(shí)保留檢查點(diǎn),注意,在這種情況下,您必須在取消后手動(dòng)清理檢查點(diǎn)狀態(tài)ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:當(dāng)作業(yè)在被cancel時(shí),刪除檢查點(diǎn),檢查點(diǎn)僅在作業(yè)失敗時(shí)可用
//設(shè)置checkpoint超時(shí)時(shí)間env.getCheckpointConfig.setCheckpointTimeout(60000)//Checkpointing的超時(shí)時(shí)間,超時(shí)時(shí)間內(nèi)沒(méi)有完成則被終止
//Checkpointing最小時(shí)間間隔,用于指定上一個(gè)checkpoint完成之后//最小等多久可以觸發(fā)另一個(gè)checkpoint,當(dāng)指定這個(gè)參數(shù)時(shí),maxConcurrentCheckpoints的值為1env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
//設(shè)置同一個(gè)時(shí)間是否可以有多個(gè)checkpoint執(zhí)行env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
指定運(yùn)行中的checkpoint最多可以有多少個(gè)

env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
用于指定在checkpoint發(fā)生異常的時(shí)候,是否應(yīng)該fail該task,默認(rèn)是true,如果設(shè)置為false,則task會(huì)拒絕checkpoint然后繼續(xù)運(yùn)行

Flink的重啟策略

Flink支持不同的重啟策略,這些重啟策略控制著job失敗后如何重啟。集群可以通過(guò)默認(rèn)的重啟策略來(lái)重啟,這個(gè)默認(rèn)的重啟策略通常在未指定重啟策略的情況下使用,而如果Job提交的時(shí)候指定了重啟策略,這個(gè)重啟策略就會(huì)覆蓋掉集群的默認(rèn)重啟策略。

概覽

默認(rèn)的重啟策略是通過(guò)Flink的 flink-conf.yaml 來(lái)指定的,這個(gè)配置參數(shù) restart-strategy 定義了哪種策略會(huì)被采用。如果checkpoint未啟動(dòng),就會(huì)采用 no restart 策略,如果啟動(dòng)了checkpoint機(jī)制,但是未指定重啟策略的話,就會(huì)采用 fixed-delay 策略,重試 Integer.MAX_VALUE 次。請(qǐng)參考下面的可用重啟策略來(lái)了解哪些值是支持的。

每個(gè)重啟策略都有自己的參數(shù)來(lái)控制它的行為,這些值也可以在配置文件中設(shè)置,每個(gè)重啟策略的描述都包含著各自的配置值信息。

除了定義一個(gè)默認(rèn)的重啟策略之外,你還可以為每一個(gè)Job指定它自己的重啟策略,這個(gè)重啟策略可以在 ExecutionEnvironment 中調(diào)用 setRestartStrategy() 方法來(lái)程序化地調(diào)用,注意這種方式同樣適用于 StreamExecutionEnvironment。

下面的例子展示了如何為Job設(shè)置一個(gè)固定延遲重啟策略,一旦有失敗,系統(tǒng)就會(huì)嘗試每10秒重啟一次,重啟3次。

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // 重啟次數(shù)  Time.of(10, TimeUnit.SECONDS) // 延遲時(shí)間間隔))

固定延遲重啟策略(Fixed Delay Restart Strategy)

固定延遲重啟策略會(huì)嘗試一個(gè)給定的次數(shù)來(lái)重啟Job,如果超過(guò)了最大的重啟次數(shù),Job最終將失敗。在連續(xù)的兩次重啟嘗試之間,重啟策略會(huì)等待一個(gè)固定的時(shí)間。

重啟策略可以配置flink-conf.yaml的下面配置參數(shù)來(lái)啟用,作為默認(rèn)的重啟策略:

restart-strategy: fixed-delay

例子:

restart-strategy.fixed-delay.attempts: 3restart-strategy.fixed-delay.delay: 10 s

固定延遲重啟也可以在程序中設(shè)置:

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // 重啟次數(shù)  Time.of(10, TimeUnit.SECONDS) // 重啟時(shí)間間隔))

失敗率重啟策略

失敗率重啟策略在Job失敗后會(huì)重啟,但是超過(guò)失敗率后,Job會(huì)最終被認(rèn)定失敗。在兩個(gè)連續(xù)的重啟嘗試之間,重啟策略會(huì)等待一個(gè)固定的時(shí)間。

失敗率重啟策略可以在flink-conf.yaml中設(shè)置下面的配置參數(shù)來(lái)啟用:

restart-strategy:failure-rate

例子:

restart-strategy.failure-rate.max-failures-per-interval: 3restart-strategy.failure-rate.failure-rate-interval: 5 minrestart-strategy.failure-rate.delay: 10 s

失敗率重啟策略也可以在程序中設(shè)置:

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // 每個(gè)測(cè)量時(shí)間間隔最大失敗次數(shù)  Time.of(5, TimeUnit.MINUTES), //失敗率測(cè)量的時(shí)間間隔  Time.of(10, TimeUnit.SECONDS) // 兩次連續(xù)重啟嘗試的時(shí)間間隔))

無(wú)重啟策略

Job直接失敗,不會(huì)嘗試進(jìn)行重啟

restart-strategy: none

無(wú)重啟策略也可以在程序中設(shè)置

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())

感謝各位的閱讀,以上就是“Flink checkpoint機(jī)制是什么”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Flink checkpoint機(jī)制是什么這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI