您好,登錄后才能下訂單哦!
這篇文章主要講解了“Flink的Checkpoint機制是什么”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Flink的Checkpoint機制是什么”吧!
一、Checkpoint概念
Flink本身為了保證其高可用的特性,以及保證作用的Exactly Once的快速恢復(fù),進而提供了一套強大的Checkpoint機制。
Checkpoint機制是Flink可靠性的基石,可以保證Flink集群在某個算子因為某些原因(如 異常退出)出現(xiàn)故障時,能夠?qū)⒄麄€應(yīng)用流圖的狀態(tài)恢復(fù)到故障之前的某一狀態(tài),保 證應(yīng)用流圖狀態(tài)的一致性。Flink的Checkpoint機制原理來自“Chandy-Lamport algorithm”算法(分布式快照)。
二、Checkpoint核心元素Barriers
Flink分布式快照中的核心元素是 stream barriers。這些barriers將注入到數(shù)據(jù)流中,并與記錄一起作為數(shù)據(jù)流的一部分流動。barriers從不超越記錄,它們嚴(yán)格按照順序排列。barriers將數(shù)據(jù)流中的記錄分為進入當(dāng)前快照的記錄集和進入下一個快照的記錄集。每個barriers都包含快照的ID,快照的記錄已推送到快照的前面。屏障不會中斷流的流動,因此非常輕便。來自不同快照的多個barriers可以同時出現(xiàn)在流中,這意味著各種快照可能同時發(fā)生。
stream barriers被注入到流數(shù)據(jù)源的并行數(shù)據(jù)流中??煺課的barriers被注入的位置(我們稱之為Sn)是流數(shù)據(jù)源中快照覆蓋數(shù)據(jù)的位置。例如,在Apache Kafka中,這個位置是分區(qū)中最后一條記錄的偏移量。這個位置的Sn被報告給checkpoint coordinator (Flink的JobManager)。
這些barriers隨后會順流而下。當(dāng)中間操作符從它的所有輸入流接收到快照n的barriers時,它將快照n的barriers發(fā)送到它的所有輸出流。一旦接收操作符(流DAG的末端)從其所有輸入流接收到barrier n,它就向checkpoint coordinator 確認(rèn)快照n。在所有接收確認(rèn)了快照之后,就認(rèn)為完成了快照。
Barriers對齊
接收多個輸入流的Operators需要在快照屏障上對齊輸入流,下面是Flink官網(wǎng)的Barrier對齊流程圖:
a.Operators一旦從傳入流中接收到快照barriers n,就無法處理該流中的任何其他記錄,直到它也從其他輸入接收到barriers n為止。否則,它將混合屬于快照n的記錄和屬于快照n + 1的記錄。
b.報告barriers n的流被暫時擱置。從這些流接收的記錄不會被處理,而是放入輸入緩沖區(qū)中。
c.一旦最后一個流接收到barriers n,Operators將發(fā)出所有未決的傳出記錄,然后自身發(fā)出barriers n屏障。
d.之后,它將恢復(fù)處理所有輸入流中的記錄,處理輸入緩沖中的記錄,然后再處理流中的記錄。
當(dāng)一個operator接收到所有上游發(fā)送的 checkpoint n barrier 向下游發(fā)送之前,會對狀態(tài)進行一次快照,將offset state 等值保存起來,默認(rèn)情況下是保存在JobManager的內(nèi)存中,由于可能會比較大,可以存在狀態(tài)后端中,生成中建議放hdfs;
Operators在從輸入流接收到所有快照barriers的時間點,以及向輸出流發(fā)出barriers之前,對其狀態(tài)進行快照。屆時,將對進行barriers 之前的記錄進行狀態(tài)的所有更新,并且不應(yīng)用依賴于應(yīng)用barriers 后的記錄進行的任何更新。由于快照的狀態(tài)可能很大,因此將其存儲在可配置state backend中。默認(rèn)情況下,這是JobManager的內(nèi)存,但對于生產(chǎn)用途,應(yīng)配置分布式可靠存儲(例如HDFS)。在存儲狀態(tài)之后,操作員確認(rèn)檢查點,將快照barriers發(fā)送到輸出流中,然后繼續(xù)。
三、Checkpoint設(shè)置
1.代碼中一些相關(guān)配置
默認(rèn)checkpoint功能是disabled的,想要使用的時候需要先啟用checkpoint開啟之后,默認(rèn)的checkPointMode是Exactly-once。下面是官網(wǎng)一些默認(rèn)配置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
1).Checkpoint默認(rèn)是是disabled,通過enableCheckpointing方法來開啟;兩種函數(shù)實現(xiàn):
enableCheckpointing(long interval),
enableCheckpointing(long interval, CheckpointingMode mode);
interval用于指定checkpoint的觸發(fā)間隔(單位milliseconds);
CheckpointingMode默認(rèn)是CheckpointingMode.EXACTLY_ONCE,也可以指定為CheckpointingMode.AT_LEAST_ONCE。
2).也可以通過setCheckpointingMode方法設(shè)置CheckpointingMode;
3).checkpointTimeout 指定checkpoint執(zhí)行的超時時間(單位milliseconds
),超時沒完成就會被abort掉;
4).minPauseBetweenCheckpoints 指定checkpoint coordinator上一個checkpoint完成之后最小等多久可以出發(fā)另一個checkpoint,當(dāng)指定這個參數(shù)時,maxConcurrentCheckpoints的值為1;
5).maxConcurrentCheckpoints 指定運行中的checkpoint最多可以有多少個,當(dāng)4)指定minPauseBetweenCheckpoints后,則其就不起作用了,需要將其設(shè)置為1;
6).enableExternalizedCheckpoints 指定開啟checkpoints的外部持久化,但是在job失敗的時候不會自動清理,需要自己手工清理state;
7).ExternalizedCheckpointCleanup 指定當(dāng)job canceled的時候externalized checkpoint該如何清理,DELETE_ON_CANCELLATION的話,在job canceled的時候會自動刪除externalized state,但是如果是FAILED的狀態(tài)則會保留;RETAIN_ON_CANCELLATION則在job canceled的時候會保留externalized checkpoint state;
8).failOnCheckpointingErrors 指定在checkpoint發(fā)生異常的時候,是否應(yīng)該fail該task,默認(rèn)為true,如果設(shè)置為false,則task會拒絕checkpoint然后繼續(xù)運行;
2.flink-conf.yaml中一些相關(guān)配置
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false
1).state.backend用于指定checkpoint state存儲后端,默認(rèn)為none,state會保存在taskmanager的內(nèi)存中,checkpoint會存儲在JobManager的內(nèi)存中;
state.backend的值可以是下面幾種:
jobmanager(MemoryStateBackend),
filesystem(FsStateBackend),
rocksdb(RocksDBStateBackend)
2).state.backend.async用于指定backend是否使用異步snapshot(默認(rèn)為true),有些不支持async或者只支持async的state backend可能會忽略這個參數(shù);
3).state.backend.fs.memory-threshold,默認(rèn)為1024,用于指定存儲于files的state大小閾值,如果小于該值則會存儲在root checkpoint metadata file;
4).state.backend.incremental,默認(rèn)為false,用于指定是否采用增量checkpoint,有些不支持增量checkpoint的backend會忽略該配置;
5).state.backend.local-recovery,默認(rèn)為false,此選項配置此狀態(tài)后端的本地恢復(fù)。默認(rèn)情況下,本地恢復(fù)是不可用的。本地恢復(fù)目前只有鍵狀態(tài)后端可用。目前,memorystateback不支持本地恢復(fù),并忽略此選項;
6).state.checkpoints.dir,默認(rèn)為none,用于指定checkpoint的data files和meta data存儲的目錄,該目錄必須對所有參與的TaskManagers及JobManagers可見;
7).state.checkpoints.num-retained,默認(rèn)為1,用于指定保留的已完成的checkpoints個數(shù);
8).state.savepoints.dir,默認(rèn)為none,保存點的默認(rèn)目錄。用于將保存點寫入文件系統(tǒng)的狀態(tài)后端(memorystate后端、fsstate后端、rocksdbstate后端);
9).taskmanager.state.local.root-dirs,默認(rèn)為none;配置參數(shù)定義根目錄,用于存儲本地恢復(fù)的基于文件的狀態(tài)。本地恢復(fù)目前只覆蓋鍵狀態(tài)后端。目前,memorystateback不支持本地恢復(fù),并忽略此選項;
感謝各位的閱讀,以上就是“Flink的Checkpoint機制是什么”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對Flink的Checkpoint機制是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。