溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Flink State管理的示例分析

發(fā)布時(shí)間:2021-12-31 10:29:50 來(lái)源:億速云 閱讀:137 作者:小新 欄目:大數(shù)據(jù)

小編給大家分享一下Flink State管理的示例分析,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

狀態(tài)使用場(chǎng)景
  • 去重
    比如上游的系統(tǒng)數(shù)據(jù)可能會(huì)有重復(fù),落到下游系統(tǒng)時(shí)希望把重復(fù)的數(shù)據(jù)都去掉。去重需要先了解哪些數(shù)據(jù)來(lái)過(guò),哪些數(shù)據(jù)還沒(méi)有來(lái),也就是把所有的主鍵都記錄下來(lái),當(dāng)一條數(shù)據(jù)到來(lái)后,能夠看到在主鍵當(dāng)中是否存在

  • 窗口計(jì)算
    比如統(tǒng)計(jì)每分鐘 Nginx 日志 API 被訪問(wèn)了多少次。窗口是一分鐘計(jì)算一次,在窗口觸發(fā)前,如 08:00 ~ 08:01 這個(gè)窗口,前59秒的數(shù)據(jù)來(lái)了需要先放入內(nèi)存,即需要把這個(gè)窗口之內(nèi)的數(shù)據(jù)先保留下來(lái),等到 8:01 時(shí)一分鐘后,再將整個(gè)窗口內(nèi)觸發(fā)的數(shù)據(jù)輸出。未觸發(fā)的窗口數(shù)據(jù)也是一種狀態(tài)。

  • 機(jī)器學(xué)習(xí)/深度學(xué)習(xí)
    如訓(xùn)練的模型以及當(dāng)前模型的參數(shù)也是一種狀態(tài),機(jī)器學(xué)習(xí)可能每次都用有一個(gè)數(shù)據(jù)集,需要在數(shù)據(jù)集上進(jìn)行學(xué)習(xí),對(duì)模型進(jìn)行一個(gè)反饋。

  • 訪問(wèn)歷史數(shù)據(jù)
    比如與昨天的數(shù)據(jù)進(jìn)行對(duì)比,需要訪問(wèn)一些歷史數(shù)據(jù)。如果每次從外部去讀,對(duì)資源的消耗可能比較大,所以也希望把這些歷史數(shù)據(jù)也放入狀態(tài)中做對(duì)比。

理想的狀態(tài)管理
  • 易用
    Flink 提供了豐富的數(shù)據(jù)結(jié)構(gòu)、多樣的狀態(tài)組織形式以及簡(jiǎn)潔的擴(kuò)展接口,讓狀態(tài)管理更加易用

  • 高效
    實(shí)時(shí)作業(yè)一般需要更低的延遲,一旦出現(xiàn)故障,恢復(fù)速度也需要更快;當(dāng)處理能力不夠時(shí),可以橫向擴(kuò)展,同時(shí)在處理備份時(shí),不影響作業(yè)本身處理性能;

  • 可靠
    Flink 提供了狀態(tài)持久化,包括不丟不重的語(yǔ)義以及具備自動(dòng)的容錯(cuò)能力,比如 HA,當(dāng)節(jié)點(diǎn)掛掉后會(huì)自動(dòng)拉起,不需要人工介入。

Flink狀態(tài)
Managed State & Raw State
  • 從狀態(tài)管理方式的方式來(lái)說(shuō),Managed State 由 Flink Runtime 管理,自動(dòng)存儲(chǔ),自動(dòng)恢復(fù),在內(nèi)存管理上有優(yōu)化;而 Raw State 需要用戶自己管理,需要自己序列化,F(xiàn)link 不知道 State中存入的數(shù)據(jù)是什么結(jié)構(gòu),只有用戶自己知道,需要最終序列化為可存儲(chǔ)的數(shù)據(jù)結(jié)構(gòu)。

  • 從狀態(tài)數(shù)據(jù)結(jié)構(gòu)來(lái)說(shuō),Managed State 支持已知的數(shù)據(jù)結(jié)構(gòu),如 Value、List、Map 等。而 Raw State只支持字節(jié)數(shù)組,所有狀態(tài)都要轉(zhuǎn)換為二進(jìn)制字節(jié)數(shù)組才可以。

  • 從推薦使用場(chǎng)景來(lái)說(shuō),Managed State 大多數(shù)情況下均可使用,而 Raw State 是當(dāng) Managed State 不夠用時(shí),比如需要自定義Operator時(shí),推薦使用 Raw State。

Keyed State & Operator State
  • Keyed State 只能用在 KeyedStream 的算子中,即在整個(gè)程序中沒(méi)有 keyBy 的過(guò)程就沒(méi)有辦法使用 KeyedStream。

  • Operator State 可以用于所有算子,常用于 Source.由于 Operator State 沒(méi)有 Key,并發(fā)改變時(shí)需要選擇狀態(tài)如何重新分配。其中內(nèi)置了 2 種分配方式:一種是均勻分配,另外一種是將所有 State 合并為全量 State 再分發(fā)給每個(gè)實(shí)例

  • Keyed State 通過(guò) RuntimeContext 訪問(wèn),這需要 Operator 是一個(gè)Rich Function。Operator State 需要自己實(shí)現(xiàn) CheckpointedFunction 或 ListCheckpointed 接口。在數(shù)據(jù)結(jié)構(gòu)上,Keyed State 支持的數(shù)據(jù)結(jié)構(gòu),比如 ValueState、ListState、ReducingState、AggregatingState 和 MapState;而 Operator State 支持的數(shù)據(jù)結(jié)構(gòu)相對(duì)較少,如 ListState。

常用Keyed State
  • ValueState 存儲(chǔ)單個(gè)值,比如 Wordcount,用 Word 當(dāng) Key,State 就是它的 Count。這里面的單個(gè)值可能是數(shù)值或者字符串,作為單個(gè)值,訪問(wèn)接口可能有兩種,get 和 set。在 State 上體現(xiàn)的是 update(T) / T value()。

  • MapState 的狀態(tài)數(shù)據(jù)類型是 Map,在 State 上有 put、remove等。需要注意的是在 MapState 中的 key 和 Keyed state 中的 key 不是同一個(gè)。

  • ListState 狀態(tài)數(shù)據(jù)類型是 List,訪問(wèn)接口如 add、update 等

  • ReducingState 和 AggregatingState 與 ListState 都是同一個(gè)父類,但狀態(tài)數(shù)據(jù)類型上是單個(gè)值,原因在于其中的 add 方法不是把當(dāng)前的元素追加到列表中,而是把當(dāng)前元素直接更新進(jìn)了 Reducing 的結(jié)果中。

  • AggregatingState 的區(qū)別是在訪問(wèn)接口,ReducingState 中 add(T)和 T get() 進(jìn)去和出來(lái)的元素都是同一個(gè)類型,但在 AggregatingState 輸入的 IN,輸出的是 OUT。

狀態(tài)保存和恢復(fù)

Flink 狀態(tài)保存主要依靠 Checkpoint 機(jī)制,Checkpoint 會(huì)定時(shí)制作分布式快照,對(duì)程序中的狀態(tài)進(jìn)行備份。

  • MemoryStateBackend
    Checkpoint 的存儲(chǔ),第一種是內(nèi)存存儲(chǔ),即 MemoryStateBackend,構(gòu)造方法是設(shè)置最大的StateSize,選擇是否做異步快照,這種存儲(chǔ)狀態(tài)本身存儲(chǔ)在 TaskManager 節(jié)點(diǎn)也就是執(zhí)行節(jié)點(diǎn)內(nèi)存中的,因?yàn)閮?nèi)存有容量限制,所以單個(gè) State maxStateSize 默認(rèn) 5 M,且需要注意 maxStateSize <= akka.framesize 默認(rèn) 10 M。Checkpoint 存儲(chǔ)在 JobManager 內(nèi)存中,因此總大小不超過(guò) JobManager 的內(nèi)存。推薦使用的場(chǎng)景為:本地測(cè)試、幾乎無(wú)狀態(tài)的作業(yè),比如 ETL、JobManager 不容易掛,或掛掉影響不大的情況。不推薦在生產(chǎn)場(chǎng)景使用。

  • FsStateBackend
    另一種就是在文件系統(tǒng)上的 FsStateBackend ,構(gòu)建方法是需要傳一個(gè)文件路徑和是否異步快照。State 依然在 TaskManager 內(nèi)存中,但不會(huì)像 MemoryStateBackend 有 5 M 的設(shè)置上限,Checkpoint 存儲(chǔ)在外部文件系統(tǒng)(本地或 HDFS),打破了總大小 Jobmanager 內(nèi)存的限制。容量限制上,單 TaskManager 上 State 總量不超過(guò)它的內(nèi)存,總大小不超過(guò)配置的文件系統(tǒng)容量。推薦使用的場(chǎng)景、常規(guī)使用狀態(tài)的作業(yè)、例如分鐘級(jí)窗口聚合或 join、需要開(kāi)啟HA的作業(yè)。

  • RocksDBStateBackend
    還有一種存儲(chǔ)為 RocksDBStateBackend ,RocksDB 是一個(gè) key/value 的內(nèi)存存儲(chǔ)系統(tǒng),和其他的 key/value 一樣,先將狀態(tài)放到內(nèi)存中,如果內(nèi)存快滿時(shí),則寫入到磁盤中,但需要注意 RocksDB 不支持同步的 Checkpoint,構(gòu)造方法中沒(méi)有同步快照這個(gè)選項(xiàng)。不過(guò) RocksDB 支持增量的 Checkpoint,也是目前唯一增量 Checkpoint 的 Backend,意味著并不需要把所有 sst 文件上傳到 Checkpoint 目錄,僅需要上傳新生成的 sst 文件即可。它的 Checkpoint 存儲(chǔ)在外部文件系統(tǒng)(本地或HDFS),其容量限制只要單個(gè) TaskManager 上 State 總量不超過(guò)它的內(nèi)存+磁盤,單 Key最大 2G,總大小不超過(guò)配置的文件系統(tǒng)容量即可。推薦使用的場(chǎng)景為:超大狀態(tài)的作業(yè),例如天級(jí)窗口聚合、需要開(kāi)啟 HA 的作業(yè)、最好是對(duì)狀態(tài)讀寫性能要求不高的作業(yè)。

看完了這篇文章,相信你對(duì)“Flink State管理的示例分析”有了一定的了解,如果想了解更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI