溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Flink的Checkpoint機制是什么

發(fā)布時間:2021-12-31 10:44:50 來源:億速云 閱讀:224 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“Flink的Checkpoint機制是什么”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Flink的Checkpoint機制是什么”吧!

一、Checkpoint概念

Flink本身為了保證其高可用的特性,以及保證作用的Exactly Once的快速恢復(fù),進而提供了一套強大的Checkpoint機制。

    Checkpoint機制是Flink可靠性的基石,可以保證Flink集群在某個算子因為某些原因(如 異常退出)出現(xiàn)故障時,能夠?qū)⒄麄€應(yīng)用流圖的狀態(tài)恢復(fù)到故障之前的某一狀態(tài),保 證應(yīng)用流圖狀態(tài)的一致性。Flink的Checkpoint機制原理來自“Chandy-Lamport algorithm”算法(分布式快照)。

二、Checkpoint核心元素Barriers

    Flink分布式快照中的核心元素是 stream barriers。這些barriers將注入到數(shù)據(jù)流中,并與記錄一起作為數(shù)據(jù)流的一部分流動。barriers從不超越記錄,它們嚴(yán)格按照順序排列。barriers將數(shù)據(jù)流中的記錄分為進入當(dāng)前快照的記錄集和進入下一個快照的記錄集。每個barriers都包含快照的ID,快照的記錄已推送到快照的前面。屏障不會中斷流的流動,因此非常輕便。來自不同快照的多個barriers可以同時出現(xiàn)在流中,這意味著各種快照可能同時發(fā)生。

    stream barriers被注入到流數(shù)據(jù)源的并行數(shù)據(jù)流中??煺課的barriers被注入的位置(我們稱之為Sn)是流數(shù)據(jù)源中快照覆蓋數(shù)據(jù)的位置。例如,在Apache Kafka中,這個位置是分區(qū)中最后一條記錄的偏移量。這個位置的Sn被報告給checkpoint coordinator (Flink的JobManager)。

    這些barriers隨后會順流而下。當(dāng)中間操作符從它的所有輸入流接收到快照n的barriers時,它將快照n的barriers發(fā)送到它的所有輸出流。一旦接收操作符(流DAG的末端)從其所有輸入流接收到barrier n,它就向checkpoint coordinator 確認(rèn)快照n。在所有接收確認(rèn)了快照之后,就認(rèn)為完成了快照。

Barriers對齊

    接收多個輸入流的Operators需要在快照屏障上對齊輸入流,下面是Flink官網(wǎng)的Barrier對齊流程圖:


Flink的Checkpoint機制是什么

a.Operators一旦從傳入流中接收到快照barriers n,就無法處理該流中的任何其他記錄,直到它也從其他輸入接收到barriers n為止。否則,它將混合屬于快照n的記錄和屬于快照n + 1的記錄。

b.報告barriers n的流被暫時擱置。從這些流接收的記錄不會被處理,而是放入輸入緩沖區(qū)中。

c.一旦最后一個流接收到barriers n,Operators將發(fā)出所有未決的傳出記錄,然后自身發(fā)出barriers n屏障。

d.之后,它將恢復(fù)處理所有輸入流中的記錄,處理輸入緩沖中的記錄,然后再處理流中的記錄。

    當(dāng)一個operator接收到所有上游發(fā)送的 checkpoint n barrier 向下游發(fā)送之前,會對狀態(tài)進行一次快照,將offset state 等值保存起來,默認(rèn)情況下是保存在JobManager的內(nèi)存中,由于可能會比較大,可以存在狀態(tài)后端中,生成中建議放hdfs;

    Operators在從輸入流接收到所有快照barriers的時間點,以及向輸出流發(fā)出barriers之前,對其狀態(tài)進行快照。屆時,將對進行barriers 之前的記錄進行狀態(tài)的所有更新,并且不應(yīng)用依賴于應(yīng)用barriers 后的記錄進行的任何更新。由于快照的狀態(tài)可能很大,因此將其存儲在可配置state backend中。默認(rèn)情況下,這是JobManager的內(nèi)存,但對于生產(chǎn)用途,應(yīng)配置分布式可靠存儲(例如HDFS)。在存儲狀態(tài)之后,操作員確認(rèn)檢查點,將快照barriers發(fā)送到輸出流中,然后繼續(xù)。

三、Checkpoint設(shè)置

1.代碼中一些相關(guān)配置

    默認(rèn)checkpoint功能是disabled的,想要使用的時候需要先啟用checkpoint開啟之后,默認(rèn)的checkPointMode是Exactly-once。下面是官網(wǎng)一些默認(rèn)配置:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// start a checkpoint every 1000 msenv.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// checkpoints have to complete within one minute, or are discardedenv.getCheckpointConfig().setCheckpointTimeout(60000);
// make sure 500 ms of progress happen between checkpointsenv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// allow only one checkpoint to be in progress at the same timeenv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellationenv.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.env.getCheckpointConfig().setFailOnCheckpointingErrors(true);

1).Checkpoint默認(rèn)是是disabled,通過enableCheckpointing方法來開啟;兩種函數(shù)實現(xiàn):

    enableCheckpointing(long interval),

   enableCheckpointing(long interval, CheckpointingMode mode);

    interval用于指定checkpoint的觸發(fā)間隔(單位milliseconds);

   CheckpointingMode默認(rèn)是CheckpointingMode.EXACTLY_ONCE,也可以指定為CheckpointingMode.AT_LEAST_ONCE。

2).也可以通過setCheckpointingMode方法設(shè)置CheckpointingMode;

3).checkpointTimeout 指定checkpoint執(zhí)行的超時時間(單位milliseconds),超時沒完成就會被abort掉;

4).minPauseBetweenCheckpoints 指定checkpoint coordinator上一個checkpoint完成之后最小等多久可以出發(fā)另一個checkpoint,當(dāng)指定這個參數(shù)時,maxConcurrentCheckpoints的值為1;

5).maxConcurrentCheckpoints 指定運行中的checkpoint最多可以有多少個,當(dāng)4)指定minPauseBetweenCheckpoints后,則其就不起作用了,需要將其設(shè)置為1;

6).enableExternalizedCheckpoints 指定開啟checkpoints的外部持久化,但是在job失敗的時候不會自動清理,需要自己手工清理state;

7).ExternalizedCheckpointCleanup 指定當(dāng)job canceled的時候externalized checkpoint該如何清理,DELETE_ON_CANCELLATION的話,在job canceled的時候會自動刪除externalized state,但是如果是FAILED的狀態(tài)則會保留;RETAIN_ON_CANCELLATION則在job canceled的時候會保留externalized checkpoint state;

8).failOnCheckpointingErrors 指定在checkpoint發(fā)生異常的時候,是否應(yīng)該fail該task,默認(rèn)為true,如果設(shè)置為false,則task會拒絕checkpoint然后繼續(xù)運行;

2.flink-conf.yaml中一些相關(guān)配置

#==============================================================================# Fault tolerance and checkpointing#==============================================================================
# The backend that will be used to store operator state checkpoints if# checkpointing is enabled.## Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the# <class-name-of-factory>.## state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled# state backends.## state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Default target directory for savepoints, optional.## state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Flag to enable/disable incremental checkpoints for backends that# support incremental checkpoints (like the RocksDB state backend). ## state.backend.incremental: false

1).state.backend用于指定checkpoint state存儲后端,默認(rèn)為none,state會保存在taskmanager的內(nèi)存中,checkpoint會存儲在JobManager的內(nèi)存中;

state.backend的值可以是下面幾種:

    jobmanager(MemoryStateBackend), 

    filesystem(FsStateBackend), 

    rocksdb(RocksDBStateBackend)

2).state.backend.async用于指定backend是否使用異步snapshot(默認(rèn)為true),有些不支持async或者只支持async的state backend可能會忽略這個參數(shù);

3).state.backend.fs.memory-threshold,默認(rèn)為1024,用于指定存儲于files的state大小閾值,如果小于該值則會存儲在root checkpoint metadata file;

4).state.backend.incremental,默認(rèn)為false,用于指定是否采用增量checkpoint,有些不支持增量checkpoint的backend會忽略該配置;

5).state.backend.local-recovery,默認(rèn)為false,此選項配置此狀態(tài)后端的本地恢復(fù)。默認(rèn)情況下,本地恢復(fù)是不可用的。本地恢復(fù)目前只有鍵狀態(tài)后端可用。目前,memorystateback不支持本地恢復(fù),并忽略此選項;

6).state.checkpoints.dir,默認(rèn)為none,用于指定checkpoint的data files和meta data存儲的目錄,該目錄必須對所有參與的TaskManagers及JobManagers可見;

7).state.checkpoints.num-retained,默認(rèn)為1,用于指定保留的已完成的checkpoints個數(shù);

8).state.savepoints.dir,默認(rèn)為none,保存點的默認(rèn)目錄。用于將保存點寫入文件系統(tǒng)的狀態(tài)后端(memorystate后端、fsstate后端、rocksdbstate后端);

9).taskmanager.state.local.root-dirs,默認(rèn)為none;配置參數(shù)定義根目錄,用于存儲本地恢復(fù)的基于文件的狀態(tài)。本地恢復(fù)目前只覆蓋鍵狀態(tài)后端。目前,memorystateback不支持本地恢復(fù),并忽略此選項;

感謝各位的閱讀,以上就是“Flink的Checkpoint機制是什么”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對Flink的Checkpoint機制是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI