溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Flink Exactly-Once 投遞的實(shí)現(xiàn)淺析是怎樣的

發(fā)布時(shí)間:2021-11-15 16:07:12 來源:億速云 閱讀:147 作者:柒染 欄目:大數(shù)據(jù)

這期內(nèi)容當(dāng)中小編將會給大家?guī)碛嘘P(guān)Flink Exactly-Once 投遞的實(shí)現(xiàn)淺析是怎樣的,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

本文作者:Paul Lin
文章來源:https://www.whitewood.me

隨著近來越來越多的業(yè)務(wù)遷移到 Flink 上,對 Flink 作業(yè)的準(zhǔn)確性要求也隨之進(jìn)一步提高,其中最為關(guān)鍵的是如何在不同業(yè)務(wù)場景下保證 exactly-once 的投遞語義。雖然不少實(shí)時(shí)系統(tǒng)(e.g. 實(shí)時(shí)計(jì)算/消息隊(duì)列)都宣稱支持 exactly-once,exactly-once 投遞似乎是一個(gè)已被解決的問題,但是其實(shí)它們更多是針對內(nèi)部模塊之間的信息投遞,比如 Kafka 生產(chǎn)(producer 到 Kafka broker)和消費(fèi)(broker 到 consumer)的 exactly-once。而 Flink 作為實(shí)時(shí)計(jì)算引擎,在實(shí)際場景業(yè)務(wù)會涉及到很多不同組件,由于組件特性和定位的不同,F(xiàn)link 并不是對所有組件都支持 exactly-once(見[1]),而且不同組件實(shí)現(xiàn) exactly-once 的方法也有所差異,有些實(shí)現(xiàn)或許會帶來副作用或者用法上的局限性,因此深入了解 Flink exactly-once 的實(shí)現(xiàn)機(jī)制對于設(shè)計(jì)穩(wěn)定可靠的架構(gòu)有十分重要的意義。
下文將基于 Flink 詳細(xì)分析 exactly-once 的難點(diǎn)所在以及實(shí)現(xiàn)方案,而這些結(jié)論也可以推廣到其他實(shí)時(shí)系統(tǒng),特別是流式計(jì)算系統(tǒng)。

Exactly-Once 難點(diǎn)分析

由于在分布式系統(tǒng)的進(jìn)程間協(xié)調(diào)需要通過網(wǎng)絡(luò),而網(wǎng)絡(luò)情況在很多情況下是不可預(yù)知的,通常發(fā)送消息要考慮三種情況:  正常返回、錯(cuò)誤返回和超時(shí),其中錯(cuò)誤返回又可以分為可重試錯(cuò)誤返回(e.g. 數(shù)據(jù)庫維護(hù)暫時(shí)不可用)和不可重試錯(cuò)誤返回(e.g. 認(rèn)證錯(cuò)誤),而可重試錯(cuò)誤返回和超時(shí)都會導(dǎo)致重發(fā)消息,導(dǎo)致下游可能接收到重復(fù)的消息,也就是 at-least-once 的投遞語義。  而 exactly-once 是在 at-least-once 的基礎(chǔ)之上加上了可以識別出重發(fā)數(shù)據(jù)或者將消息包裝為為冪等操作的機(jī)制。
其實(shí)消息的 exactly-once 投遞并不是一個(gè)分布式系統(tǒng)產(chǎn)生的新課題(雖然它一般特指分布式領(lǐng)域的 exactly-once),早在計(jì)算網(wǎng)絡(luò)發(fā)展初期的 TCP 協(xié)議已經(jīng)實(shí)現(xiàn)了網(wǎng)絡(luò)的可靠傳輸。  TCP 協(xié)議的 exactly-once 實(shí)現(xiàn)方式是將消息傳遞變?yōu)橛袪顟B(tài)的:  首先同步建立連接,然后發(fā)送的每個(gè)數(shù)據(jù)包加上遞增的序列號(sequence number),發(fā)送完畢后再同步釋放連接。  由于發(fā)送端和接受端都保存了狀態(tài)信息(已發(fā)送數(shù)據(jù)包的序列號/已接收數(shù)據(jù)包的序列號),它們可以知道哪些數(shù)據(jù)包是缺失或重復(fù)的。
而在分布式環(huán)境下 exactly-once 則更為復(fù)雜,最大的不同點(diǎn)在于分布式系統(tǒng)需要容忍進(jìn)程崩潰和節(jié)點(diǎn)丟失,這會帶來許多問題,比如下面常見的幾個(gè):
  • 進(jìn)程狀態(tài)需要持續(xù)化到可靠的分布式存儲,以防止節(jié)點(diǎn)丟失帶來狀態(tài)的丟失。

  • 由于發(fā)送消息是一個(gè)兩階段的操作(即發(fā)送消息和收到對方的確認(rèn)),重啟之后的進(jìn)程沒有辦法判斷崩潰前是否已經(jīng)使用當(dāng)前序列號發(fā)送過消息,因此可能會導(dǎo)致重復(fù)使用序列號的問題。

  • 被認(rèn)為崩潰的進(jìn)程有可能并沒有退出,隨后再次連上來變?yōu)?zombie 進(jìn)程繼續(xù)發(fā)送數(shù)據(jù)。

第2點(diǎn)和第3點(diǎn)其實(shí)是同一個(gè)問題,即需要區(qū)分出原本進(jìn)程和重啟后的進(jìn)程。  對此業(yè)界已經(jīng)有比較成熟的解決方案: 引入 epoch 表示進(jìn)程的不同世代并用分布式協(xié)調(diào)系統(tǒng)來負(fù)責(zé)管理。  雖然還有一些衍生的細(xì)節(jié)問題,但總體來說問題都不大。  但是第1點(diǎn)問題造成了一個(gè)比較深遠(yuǎn)的影響,即為了減低 IO 成本,狀態(tài)的保存必然是微批量(micro-batching)的而不是流式的,這會導(dǎo)致狀態(tài)的保存總是落后于流計(jì)算進(jìn)度,因而為了保證 exactly-once 流計(jì)算引擎需要實(shí)現(xiàn)事務(wù)回滾。

狀態(tài) Exactly-Once 和端到端 Exactly-Once

Flink 提供 exactly-once 的狀態(tài)(state)投遞語義,這為有狀態(tài)的(stateful)計(jì)算提供了準(zhǔn)確性保證。  其中比較容易令人混淆的一點(diǎn)是狀態(tài)投遞語義和更加常見的端到端(end to end)投遞語義,而實(shí)現(xiàn)前者是實(shí)現(xiàn)后者的前置條件。
Flink 從 0.9 版本開始提供 State API,標(biāo)志著 Flink 進(jìn)入了 Stateful Streaming 的時(shí)代。  State API 簡單來說是“不受進(jìn)程重啟影響的“數(shù)據(jù)結(jié)構(gòu),其命名規(guī)范也與常見的數(shù)據(jù)結(jié)構(gòu)一致,比如 MapState、ListState。  Flink 官方提供的算子(比如 KafkaSource)和用戶開發(fā)的算子都可以使用 State API 來保存狀態(tài)信息。  和大多數(shù)分布式系統(tǒng)一樣 Flink 采用快照的方式來將整個(gè)作業(yè)的狀態(tài)定期同步到外部存儲,也就是將 State API 保存的信息以序列化的形式存儲,作業(yè)恢復(fù)的時(shí)候只要讀取外部存儲即可將作業(yè)恢復(fù)到先前某個(gè)時(shí)間點(diǎn)的狀態(tài)。  由于從快照恢復(fù)同時(shí)會回滾數(shù)據(jù)流的處理進(jìn)度,所以 State 是天然的 exactly-once 投遞。
而端到端的一致性則需要上下游的外部系統(tǒng)配合,因?yàn)?Flink 無法將它們的狀態(tài)也保存到快照并獨(dú)立地回滾它們,否則就不叫作外部系統(tǒng)了。  通常來說 Flink 的上游是可以重復(fù)讀取或者消費(fèi)的 pull-based 持續(xù)化存儲,所以要實(shí)現(xiàn) source 端的 exactly-once 只需要回滾 source 的讀取進(jìn)度即可(e.g. Kafka 的 offset)。  而 sink 端的 exactly-once 則比較復(fù)雜,因?yàn)?sink 是 push-based 的。  所謂覆水難收,要撤回發(fā)出去的消息是并不是容易的事情,因?yàn)檫@要求下游根據(jù)消息作出的一系列反應(yīng)都是可撤回的。  這就需要用 State API 來保存已發(fā)出消息的元數(shù)據(jù),記錄哪些數(shù)據(jù)是重啟后需要回滾的。
下面將分析 Flink 是如何實(shí)現(xiàn) exactly-once Sink 的。

Exactly-Once Sink 原理

Flink 的 exactly-once sink 均基于快照機(jī)制,按照實(shí)現(xiàn)原理可以分為冪等(Idempotent) sink 和事務(wù)性(Transactional) sink 兩種。

冪等 Sink

冪等性是分布式領(lǐng)域里十分有用的特性,它意味著相同的操作執(zhí)行一次和執(zhí)行多次可以獲得相同的結(jié)果,因此 at-least-once 自然等同于 exactly-once。  如此一來,在從快照恢復(fù)的時(shí)候冪等 sink 便不需要對外部系統(tǒng)撤回已發(fā)消息,相當(dāng)于回避了外部系統(tǒng)的狀態(tài)回滾問題。  比如寫入 KV 數(shù)據(jù)庫的 sink,由于插入一行的操作是冪等的,因此 sink 可以無狀態(tài)的,在錯(cuò)誤恢復(fù)時(shí)也不需要關(guān)心外部系統(tǒng)的狀態(tài)。  從某種意義來講,上文提到的 TCP 協(xié)議也是利用了發(fā)送數(shù)據(jù)包冪等性來保證 exactly-once。
然而冪等 sink 的適用場景依賴于業(yè)務(wù)邏輯,如果下游業(yè)務(wù)本來就無法保證冪等性,這時(shí)就需要應(yīng)用事務(wù)性 sink。

事務(wù)性 Sink

事務(wù)性 sink 顧名思義類似于傳統(tǒng) DBMS 的事務(wù),將一系列(一般是一個(gè) checkpoint 內(nèi))的所有輸出包裝為一個(gè)邏輯單元,理想的情況下提供 ACID 的事務(wù)保證。  之所以說是“理想的情況下”,主要是因?yàn)?sink 依賴于目標(biāo)輸出系統(tǒng)的事務(wù)保證,而分布式系統(tǒng)對于事務(wù)的支持并不一定很完整,比如 HBase 就不支持跨行事務(wù),再比如 HDFS 等文件系統(tǒng)是不提供事務(wù)的,這種情況下 sink 只可以在客戶端的基礎(chǔ)上再包裝一層來盡最大努力地提供事務(wù)保證。
然而僅有下游系統(tǒng)本身提供的事務(wù)保證對于 exactly-once sink 來說是不夠的,因?yàn)橥粋€(gè) sink 的子任務(wù)(subtask)會有多個(gè),對于下游系統(tǒng)來說它們是處在不同會話和事務(wù)中的,并不能保證操作的原子性,因此 exactly-once sink 還需要實(shí)現(xiàn)分布式事務(wù)來達(dá)到所有 subtask 的一致 commit 或 rollback。  由于 sink 事務(wù)生命周期是與 checkpoint 一一對應(yīng)的,或者說 checkpoint 本來就是實(shí)現(xiàn)作業(yè)狀態(tài)持久化的分布式事務(wù),sink 的分布式事務(wù)也理所當(dāng)然可以通過 checkpoint 機(jī)制提供的 hook 來實(shí)現(xiàn)。
Checkpoint 提供給算子的 hook 有 CheckpointedFunction 和 CheckpointListener 兩個(gè),前者在算子進(jìn)行 checkpoint 快照時(shí)被調(diào)用,后者在 checkpoint 成功后調(diào)用。  為了簡單起見 Flink 結(jié)合上述兩個(gè)接口抽象出 exactly-once sink 的通用邏輯抽象   TwoPhaseCommitSinkFunction   接口,從命名即可看出這是對兩階段提交協(xié)議的一個(gè)實(shí)現(xiàn),其主要方法如下:
  • beginTransaction: 初始化一個(gè)事務(wù)。在有新數(shù)據(jù)到達(dá)并且當(dāng)前事務(wù)為空時(shí)調(diào)用。

  • preCommit: 預(yù)提交數(shù)據(jù),即不再寫入當(dāng)前事務(wù)并準(zhǔn)好提交當(dāng)前事務(wù)。在 sink 算子進(jìn)行快照的時(shí)候調(diào)用。

  • commit: 正式提交數(shù)據(jù),將準(zhǔn)備好的事務(wù)提交。在作業(yè)的 checkpoint 完成時(shí)調(diào)用。

  • abort: 放棄事務(wù)。在作業(yè) checkpoint 失敗的時(shí)候調(diào)用。

下面以 Bucketing File Sink 作為例子來說明如何基于異步 checkpoint 來實(shí)現(xiàn)事務(wù)性 sink。
Bucketing File Sink 是 Flink 提供的一個(gè) FileSystem Connector,用于將數(shù)據(jù)流寫到固定大小的文件里。  Bucketing File Sink 將文件分為三種狀態(tài),in-progress/pending/committed,分別表示正在寫的文件、寫完準(zhǔn)備提交的文件和已經(jīng)提交的文件。
Flink Exactly-Once 投遞的實(shí)現(xiàn)淺析是怎樣的
運(yùn)行時(shí),Bucketing File Sink 首先會打開一個(gè)臨時(shí)文件并不斷地將收到的數(shù)據(jù)寫入(相當(dāng)于事務(wù)的 beginTransaction 步驟),這時(shí)文件處于 in-progress。  直到這個(gè)文件因?yàn)榇笮〕^閾值或者一段時(shí)間內(nèi)沒有新數(shù)據(jù)寫入,這時(shí)文件關(guān)閉并變?yōu)?pending 狀態(tài)(相當(dāng)于事務(wù)的 pre-commit 步驟)。  由于 Flink checkpoint 是異步的,可能有多個(gè)并發(fā)的 checkpoint,Bucketing File Sink 會記錄 pending 文件對應(yīng)的 checkpoint epoch,當(dāng)某個(gè) epoch 的 checkpoint 完成后,Bucketing File Sink 會收到 callback 并將對應(yīng)的文件改為 committed 狀態(tài)。  這是通過原子操作重命名來完成的,因此可以保證 pre-commit 的事務(wù)要么 commit 成功要么 commit 失敗,不會出現(xiàn)其他中間狀態(tài)。
Commit 出現(xiàn)錯(cuò)誤會導(dǎo)致作業(yè)自動重啟,重啟后 Bucketing File Sink 本身已被恢復(fù)為上次 checkpoint 時(shí)的狀態(tài),不過仍需要將文件系統(tǒng)的狀態(tài)也恢復(fù)以保證一致性。  從 checkpoint 恢復(fù)后對應(yīng)的事務(wù)會再次重試 commit,它會將記錄的 pending 文件改為 committed 狀態(tài),記錄的 in-progress 文件 truncate 到 checkpoint 記錄下來的 offset,而其余未被記錄的 pending 文件和 in-progress 文件都將被刪除。
上面主要圍繞事務(wù)保證的 AC 兩點(diǎn)(Atomicity 和 Consistency),而在 I(Isolation)上 Flink exactly-once sink 也有不同的實(shí)現(xiàn)方式。  實(shí)際上由于 Flink 的流計(jì)算特性,當(dāng)前事務(wù)的未 commit 數(shù)據(jù)是一直在積累的,根據(jù)緩存未 commit 數(shù)據(jù)的地方的不同,可以將事務(wù)性 sink 分為兩種實(shí)現(xiàn)方式。
  • 在 sink 端緩存未 commit 數(shù)據(jù),等 checkpoint 完成以后將緩存的數(shù)據(jù) flush 到下游。這種方式可以提供 read-committed 的事務(wù)隔離級別,但同時(shí)由于未 commit 的數(shù)據(jù)不會發(fā)往下游(與 checkpoint 同步),sink 端緩存會帶來一定的延遲,相當(dāng)于退化為與 checkpoint 同步的 micro-batching 模式。

  • 在下游系統(tǒng)緩存未 commit 數(shù)據(jù),等 checkpoint 完成后通知下游 commit。這樣的好處是數(shù)據(jù)是流式發(fā)往下游的,不會在每次 checkpoint 完成后出現(xiàn)網(wǎng)絡(luò) IO 的高峰,并且事務(wù)隔離級別可以由下游設(shè)置,下游可以選擇低延遲弱一致性的 read-uncommitted 或高延遲強(qiáng)一致性的 read-committed。

在 Bucketing File Sink 的例子中,處于 in-progress 和 pending 狀態(tài)的文件默認(rèn)情況下都是隱藏文件(在實(shí)踐中是使用下劃線作為文件名前綴,HDFS 的 FileInputFormat 會將其過濾掉),只有 commit 成功后文件才對用戶是可見的,即提供了 read-committed 的事務(wù)隔離性。理想的情況下 exactly-once sink 都應(yīng)該使用在下游系統(tǒng)緩存未 commit 數(shù)據(jù)的方式,因?yàn)檫@最為符合流式計(jì)算的理念。最為典型的是下游系統(tǒng)本來就支持事務(wù),那么未 commit 的數(shù)據(jù)很自然地就是緩存在下游系統(tǒng)的,否則 sink 可以選擇像上例的 Bucketing File Sink 一樣在下游系統(tǒng)的用戶層面實(shí)現(xiàn)自己的事務(wù),或者 fallback 到等待數(shù)據(jù)變?yōu)?committed 再發(fā)出的 micro-batching 模式。

總結(jié)

Exactly-once 是實(shí)時(shí)系統(tǒng)最為關(guān)鍵的準(zhǔn)確性要求,也是當(dāng)前限制大部分分布式實(shí)時(shí)系統(tǒng)應(yīng)用到準(zhǔn)確性要求更高的業(yè)務(wù)場景(比如在線事務(wù)處理 OLTP)的問題之一。  目前來說流式計(jì)算的 exactly-once 在理論上已經(jīng)有了很大的突破,而 Flink 社區(qū)也在積極汲取最先進(jìn)的思想和實(shí)踐經(jīng)驗(yàn)。  隨著 Flink 在 exactly-once 上的技術(shù)愈發(fā)成熟,結(jié)合 Flink 本身的流處理特性,相信在不遠(yuǎn)的將來,除了構(gòu)造數(shù)據(jù)分析、數(shù)據(jù)管道應(yīng)用, Flink 也可以在微服務(wù)領(lǐng)域占有一席之地。

上述就是小編為大家分享的Flink Exactly-Once 投遞的實(shí)現(xiàn)淺析是怎樣的了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI