溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

六、flink--容錯機制

發(fā)布時間:2020-02-28 07:43:37 來源:網絡 閱讀:580 作者:隔壁小白 欄目:大數據

一、flink容錯機制

1.1 flink 的容錯概述

在使用了flink的狀態(tài)管理之后,因為此時所有的state的讀寫都只是在task本地的內存中進行,也就是state數據此時只存儲在內存中。假設當任務出現故障之后,這些在內存中的state數據也會丟失,就無法恢復了。所以需要一種機制來保障這些state數據的不丟失,這也就是容錯機制。flink通過checkpoint來實現。flink開啟了checkpoint之后,會定時將狀態(tài)數據的快照持久存儲到指定的statebackend。

1.2 checkpoint基本原理

? flink定期對整個job任務進行快照,將快照產生的備份數據保存到指定的statebacked中。當出現故障時,將job 的狀態(tài)恢復到最近的一個快照點。Flink 的容錯機制的核心部分是生成分布式數據流和operator狀態(tài)一致的快照。這些快照充當checkpoint(檢查點), 系統(tǒng)可以早發(fā)生故障時將其回滾。分布式快照是由 Chandy-Lamport 算法實現的。
? 每個checkpoint由checkpoint ID和timestamp來唯一標識,其中checkpoint ID可以是standalone(基于內存,保存在jobmanager內存中)的,也可能是基于ZK的。

1.3 使用checkpoint的先決條件

1、持續(xù)的數據源,比如消息隊列或者文件系統(tǒng)上的文件等
2、狀態(tài)數據的持久化存儲,比如采用分布式文件系統(tǒng)存儲狀態(tài)數據

1.4 程序中啟用checkpoint的配置

默認情況下,flink是禁用了checkpoint的。下面看看程序中開啟checkpoint以及相關checkpoint工作參數的配置。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

/* -------------啟用checkpoint   */
//只指定兩個checkpoint的時間間隔,單位是毫秒
env.enableCheckpointing(1000);  
//指定checkpoint時間間隔,并指定checkpoint的模式,是exactly-once(剛好一次)還是AT_LEAST_ONCE(至少一次)。大多數情況下是exactly-once(默認就是這個模式),少數情況下,如果要求超低延遲的處理情況,才會設置AT_LEAST_ONCE
env.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE) 

/* -------------設置checkpoint 模式,和上面的類似  */
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

/* -------------設置checkpoint上一個的結束點到下一個開始點之間的最短時間。
因為checkpoint觸發(fā)時,需要一定時間去完成整個checkpoint的過程,
如果checkpoint的完成時間過程,導致前后兩個checkpoint間的時間間隔過短,這是不合適的,沒有必要。
1、這里的時間間隔,指的是上一個checkpoint完成的時間點,到下一個checkpoint開始的時間點的間隔,如果過短,會導致頻繁checkpoint,影響性能。假設這個間隔為T
2、而上面設置的checkpoint時間間隔,指的是前一個checkpoint的開始時間到下一個checkpoint的開始時間。所以是始終大于1中的時間間隔的。假設這個間隔為 N

如果T小于這里設置的值,那么無論N設置多少,下一個checkpoint的開始時間必須是500ms之后。如果T大于這里設置的值,那么正常按照N設置的間隔來觸發(fā)下一個checkpoint,這里設置的間隔無關了。
*/
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

/* -------------設置checkpoint完成的超時時間   */
env.getCheckpointConfig().setCheckpointTimeout(60000);

/* -------------設置checkpoint的最大并行度   */
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

/*  開啟checkpoints的外部持久化,但是在job失敗的時候不會自動清理,需要自己手工清理state
DELETE_ON_CANCELLATION:在job canceled的時候會自動刪除外部的狀態(tài)數據,但是如果是FAILED的狀態(tài)則會保留;
RETAIN_ON_CANCELLATION:在job canceled的時候會保留狀態(tài)數據*/
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

/*  當有更近的保存點時,優(yōu)先采用savepoint來恢復成檢查點*/
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

1.5 checkpoint在flink配置文件中的配置項

在conf/flink-conf.yaml中可以配置的參數

參數 默認值 作用
state.backend (無) 用于存儲狀態(tài)的檢查點數據的后端。有三種backend,如:jobmanager(MemoryStateBackend,默認是這個),filesystem(FsStateBackend),rocksdb(RocksDBStateBackend)。
state.backend.async true 是否異步快照
state.checkpoints.dir checkpoint的目錄,例如:hdfs://192.168.1.1/checkpoint
state.backend.incremental false 是否選擇增量檢查點。即每次快照都只存儲上一個檢查點的差異數據,而不是完整的數據。可能某些后端不支持這種方式
state.checkpoints.num-retained 1 要保留的已完成檢查點的最大數量。
state.savepoints.dir 保存點默認目錄

1.6 checkpoint和savepoint的區(qū)別

1.6.1 區(qū)別

checkpoint:
1、檢查點的主要目的是在job意外失敗時提供恢復機制。
2、Checkpoint的生命周期由Flink管理,即Flink創(chuàng)建,擁有和發(fā)布Checkpoint - 無需用戶交互。
3、作為一種恢復和定期觸發(fā)的方法,Checkpoint主要的設計目標是:創(chuàng)建checkpoint,是輕量級的和盡可能快地恢復

savepoint:
1、Savepoints由用戶創(chuàng)建,擁有和刪除。
2、他們一般是有計劃的進行手動備份和恢復。而checkpoint的恢復只會發(fā)生在故障時
3、例如,在Flink版本需要更新的時候,或者更改你的流處理邏輯,更改并行性等等。
在這種情況下,我們往往需要關閉一下流,這就需要我們將流中的狀態(tài)進行存儲,后面重新部署job的時候進行會用來恢復。
4、從概念上講,Savepoints的生成和恢復成本可能更高,并且更多地關注可移植性和對前面提到的作業(yè)更改的支持

1.6.2 使用savepoint

命令用法:

flink savepoint jobid target_dir

例子:

保存狀態(tài)數據到指定目錄:
flink savepoint xxxxxxxx(哈希碼) hdfs://ronnie01:8020/data/flink/savepoint

重啟和恢復數據流(也可用于從checkpoint恢復數據流):
flink run -s hdfs://ronnie01:8020/data/flink/savepoint/savepoint-xxxxx-xxxxxxxxx -c com.ronnie.flink.stream.test.CheckPointTest flink-test.jar

-s 指定savepoint/checkpoint目錄的存儲目錄
-c 指定運行的主類的全類名

二、stateBackend分類

checkpoint數據可存儲方式有不同,flink支持三種:
MemoryStateBackend(內存狀態(tài))
FsStateBackend(文件狀態(tài))
RocksDBStateBackend(RocksDB狀態(tài))

2.1 MemoryStateBackend

1、概念
MemoryStateBackend將State作為Java對象保存(在堆內存),存儲著key/value狀態(tài)、window運算符、觸發(fā)器等的哈希表。在Checkpoint時,State Backend將對State進行快照,并將其作為checkpoint發(fā)送到JobManager機器上,JobManager將這個State數據存儲在Java堆內存。MemoryStateBackend默認使用異步快照,來避免阻塞管道。如果需要修改,可以在MemoryStateBackend的構造函數將布爾值改為false(僅用于調試)。

2、注意點
異步快照方式時,operator操作符在做快照的同時也會處理新流入的數據,默認異步方式
同步快照方式:operator操作符在做快照的時候,不會處理新流入的數據,同步快照會增加數據處理的延遲度。

3、局限性
單次狀態(tài)大小最大默認被限制為5MB,這個值可以通過構造函數來更改。
無論單次狀態(tài)大小最大被限制為多少,都不可用大過akka的frame大小。
聚合的狀態(tài)都會寫入jobmanager的內存

4、適用場景
本地開發(fā)和調試
狀態(tài)比較少的作業(yè)

2.2 FsStateBackend(生產環(huán)境最常用)

1、概念
FsStateBackend將正在運行的數據保存在TaskManager的內存中。在checkpoint時,它將State的快照寫入文件系統(tǒng)對應的目錄下的文件中。最小元數據存儲在JobManager的內存中(如果是高可用模式下,元數據存儲在checkpoint中)。FsStateBackend默認使用異步快照,來避免阻塞處理的管道。如果需要禁用,在FsStateBackend構造方法中將布爾值設為false

2、適用場景
狀態(tài)比較大, 窗口比較長, 大的 KV 狀態(tài)
需要做 HA 的場景

2.3 RocksDBStateBackend

1、概念
此種方式kv state需要由rockdb數據庫來管理,這是和內存或file backend最大的不同,即狀態(tài)數據是直接寫入到rockdb的,不像前面兩種,只有在checkpoint的時候才會將數據保存到指定的backend。RocksDBStateBackend使用RocksDB數據庫保存數據,這個數據庫保存在TaskManager的數據目錄中。注意的是:RocksDB,它是一個高性能的Key-Value數據庫。數據會放到先內存當中,在一定條件下觸發(fā)寫到磁盤文件上。
在 checkpoint時, 整個 RocksDB數據庫的數據會快照一份, 然后存到配置的文件系統(tǒng)中(一般是 hdfs)。同時, Apache Flink將一些最小的元數據存儲在 JobManager 的內存或者 Zookeeper 中(對于高可用性情況)。RocksD始終配置為執(zhí)行異步快照

2、適用場景
RocksDBStateBackend適用于非常大的狀態(tài),長窗口、大鍵值狀態(tài)的高可用作業(yè)。
RocksDBStateBackend是目前唯一可用于支持有狀態(tài)流處理應用程序的增量檢查點

2.4 使用指定的statebackend

方式1:
直接在 conf/flink-conf.yaml 中指定 state.backend 就是默認程序的backend。

jobmanager(MemoryStateBackend,默認是這個)
filesystem(FsStateBackend)
rocksdb(RocksDBStateBackend)

方式2:
在程序中指定自己想使用的backend

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 設置狀態(tài)后端
env.setStateBackend(xxx);

三種類型的實現類:
// 默認使用內存的方式存儲狀態(tài)值, 單位快照的狀態(tài)上限為10MB, 使用同步方式進行快照。單個狀態(tài)大小可以設置,單位是byte
env.setStateBackend(new MemeoryStateBackend(10*1024*1024, false));

// 使用 FsStateBackend的方式進行存儲, 并且是同步方式進行快照
env.setStateBackend(new FsStateBackend("hdfs://namenode....", false));

// 使用 RocksDBStateBackend方式存儲, 并采用增量的快照方式進行存儲。后面的true表示增量
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode....", true));
向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI