溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

ApacheFlink中如何深度解析FaultTolerance

發(fā)布時間:2021-12-28 14:39:36 來源:億速云 閱讀:140 作者:柒染 欄目:大數(shù)據(jù)

ApacheFlink中如何深度解析FaultTolerance,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。

摘要:實際問題 在流計算場景中,數(shù)據(jù)會源源不斷的流入Apache Flink系統(tǒng),每條數(shù)據(jù)進入Apache Flink系統(tǒng)都會觸發(fā)計算。那么在計算過程中如果網(wǎng)絡(luò)、機器等原因?qū)е耇ask運行失敗了,Apache Flink會如何處理呢?我們介紹了 Apache Flink 會利用State記錄計算的狀態(tài),在Failover時候Task會根據(jù)State進行恢復(fù)。                

在流計算場景中,數(shù)據(jù)會源源不斷的流入Apache Flink系統(tǒng),每條數(shù)據(jù)進入Apache Flink系統(tǒng)都會觸發(fā)計算。那么在計算過程中如果網(wǎng)絡(luò)、機器等原因?qū)е耇ask運行失敗了,Apache Flink會如何處理呢?在 《Apache Flink 漫談系列 - State》一篇中我們介紹了 Apache Flink 會利用State記錄計算的狀態(tài),在Failover時候Task會根據(jù)State進行恢復(fù)。但State的內(nèi)容是如何記錄的?Apache Flink 是如何保證 Exactly-Once 語義的呢?這就涉及到了Apache Flink的 容錯(Fault Tolerance) 機制,本篇將會為大家進行相關(guān)內(nèi)容的介紹。

容錯(Fault Tolerance) 是指容忍故障,在故障發(fā)生時能夠自動檢測出來,并使系統(tǒng)能夠自動恢復(fù)正常運行。當(dāng)出現(xiàn)某些指定的網(wǎng)絡(luò)故障、硬件故障、軟件錯誤時,系統(tǒng)仍能執(zhí)行規(guī)定的一組程序,或者說程序不會因系統(tǒng)中的故障而中止,并且執(zhí)行結(jié)果也不包含系統(tǒng)故障所引起的差錯。

我們知道MySql的binlog是一個Append Only的日志文件,Mysql的主備復(fù)制是高可用的主要方式,binlog是主備復(fù)制的核心手段(當(dāng)然mysql高可用細(xì)節(jié)很復(fù)雜也有多種不同的優(yōu)化點,如 純異步復(fù)制優(yōu)化為半同步和同步復(fù)制以保證異步復(fù)制binlog導(dǎo)致的master和slave的同步時候網(wǎng)絡(luò)壞掉,導(dǎo)致主備不一致問題等)。Mysql主備復(fù)制,是Mysql容錯機制的一部分,在容錯機制之中也包括事物控制,在傳統(tǒng)數(shù)據(jù)庫中事物可以設(shè)置不同的事物級別,以保證不同的數(shù)據(jù)質(zhì)量,級別由低到高 如下:

  • Read uncommitted - 讀未提交,就是一個事務(wù)可以讀取另一個未提交事務(wù)的數(shù)據(jù)。那么這種事物控制成本最低,但是會導(dǎo)致另一個事物讀到臟數(shù)據(jù),那么如何解決讀到臟數(shù)據(jù)的問題呢?利用Read committed 級別...

  • Read committed - 讀提交,就是一個事務(wù)要等另一個事務(wù)提交后才能讀取數(shù)據(jù)。這種級別可以解決讀臟數(shù)據(jù)的問題,那么這種級別有什么問題呢?這個級別還有一個 不能重復(fù)讀的問題,即:開啟一個讀事物T1,先讀取字段F1值是V1,這時候另一個事物T2可以UPDATA這個字段值V2,導(dǎo)致T1再次讀取字段值時候獲得V2了,同一個事物中的兩次讀取不一致了。那么如何解決不可重復(fù)讀的問題呢?利用 Repeatable read 級別...

  • Repeatable read - 重復(fù)讀,就是在開始讀取數(shù)據(jù)(事務(wù)開啟)時,不再允許修改操作。重復(fù)讀模式要有事物順序的等待,需要一定的成本達到高質(zhì)量的數(shù)據(jù)信息,那么重復(fù)讀還會有什么問題嗎?是的,重復(fù)讀級別還有一個問題就是 幻讀,幻讀產(chǎn)生的原因是INSERT,那么幻讀怎么解決呢?利用Serializable級別...

  • Serializable  - 序列化 是最高的事務(wù)隔離級別,在該級別下,事務(wù)串行化順序執(zhí)行,可以避免臟讀、不可重復(fù)讀與幻讀。但是這種事務(wù)隔離級別效率低下,比較耗數(shù)據(jù)庫性能,一般不使用。

主備復(fù)制,事物控制都是傳統(tǒng)數(shù)據(jù)庫容錯的機制。

流計算Fault Tolerance的一個很大的挑戰(zhàn)是低延遲,很多Apache Flink任務(wù)都是7 x 24小時不間斷,端到端的秒級延遲,要想在遇上網(wǎng)絡(luò)閃斷,機器壞掉等非預(yù)期的問題時候快速恢復(fù)正常,并且不影響計算結(jié)果正確性是一件極其困難的事情。同時除了流計算的低延時要求,還有計算模式上面的挑戰(zhàn),在Apache Flink中支持Exactly-Once和At-Least-Once兩種計算模式,如何做到在Failover時候不重復(fù)計算,進而精準(zhǔn)的做到Exactly-Once也是流計算Fault Tolerance要重點解決的問題。

Apache Flink的Fault Tolerance機制核心是持續(xù)創(chuàng)建分布式流數(shù)據(jù)及其狀態(tài)的快照。這些快照在系統(tǒng)遇到故障時,作為一個回退點。Apache Flink中創(chuàng)建快照的機制叫做Checkpointing,Checkpointing的理論基礎(chǔ) Stephan 在 Lightweight Asynchronous Snapshots for Distributed Dataflows 進行了細(xì)節(jié)描述,該機制源于由K. MANI CHANDY和LESLIE LAMPORT 發(fā)表的 Determining-Global-States-of-a-Distributed-System Paper,該Paper描述了在分布式系統(tǒng)如何解決全局狀態(tài)一致性問題。

在Apache Flink中以Checkpointing的機制進行容錯,Checkpointing會產(chǎn)生類似binlog一樣的、可以用來恢復(fù)任務(wù)狀態(tài)的數(shù)據(jù)文件。Apache Flink中也有類似于數(shù)據(jù)庫事物控制一樣的數(shù)據(jù)計算語義控制,比如:At-Least-Once和Exactly-Once。

上面我們說Checkpointing是Apache Flink中Fault Tolerance的核心機制,我們以Checkpointing的方式創(chuàng)建包含timer,connector,window,user-defined state 等stateful Operator的快照。在Determining-Global-States-of-a-Distributed-System的全局狀態(tài)一致性算法中重點描述了全局狀態(tài)的對齊問題,在Lightweight Asynchronous Snapshots for Distributed Dataflows中核心描述了對齊的方式,在Apache Flink中采用以在流信息中插入barrier的方式完成DAG中異步快照。 如下圖(from Lightweight Asynchronous Snapshots for Distributed Dataflows)描述了Asynchronous barrier snapshots for acyclic graphs,也是Apache Flink中采用的方式。

ApacheFlink中如何深度解析FaultTolerance

上圖描述的是一個增量計算word count的Job邏輯,核心邏輯是如下幾點:

  • barrier 由source節(jié)點發(fā)出;

  • barrier會將流上event切分到不同的checkpoint中;

  • 匯聚到當(dāng)前節(jié)點的多流的barrier要對齊;

  • barrier對齊之后會進行Checkpointing,生成snapshot;

  • 完成snapshot之后向下游發(fā)出barrier,繼續(xù)直到Sink節(jié)點;

這樣在整個流計算中以barrier方式進行Checkpointing,隨著時間的推移,整個流的計算過程中按時間順序不斷的進行Checkpointing,如下圖:

ApacheFlink中如何深度解析FaultTolerance

生成的snapshot會存儲到StateBackend中,相關(guān)State的介紹可以查閱 《Apache Flink 漫談系列 - State》。這樣在進行Failover時候,從最后一次成功的checkpoint進行恢復(fù)。

上面我們了解到整個流上面我們會隨這時間推移不斷的做Checkpointing,不斷的產(chǎn)生snapshot存儲到Statebackend中,那么多久進行一次Checkpointing?對產(chǎn)生的snapshot如何持久化的呢?帶著這些疑問,我們看看Apache Flink對于Checkpointing如何控制的?有哪些可配置的參數(shù):(這些參數(shù)都在 CheckpointCoordinator 中進行定義)

  • checkpointMode - 檢查點模式,分為 AT_LEAST_ONCE 和 EXACTLY_ONCE 兩種模式;

  • checkpointInterval - 檢查點時間間隔,單位是毫秒。

  • checkpointTimeout - 檢查點超時時間,單位毫秒。

在Apache Flink中還有一些其他配置,比如:是否將存儲到外部存儲的checkpoints數(shù)據(jù)刪除,如果不刪除,即使job被cancel掉,checkpoint信息也不會刪除,當(dāng)恢復(fù)job時候可以利用checkpoint進行狀態(tài)恢復(fù)。我們有兩種配置方式,如下:

  • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION - 當(dāng)job被cancel時候,外部存儲的checkpoints不會刪除。

  • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION - 當(dāng)job被cancel時候,外部存儲的checkpoints會被刪除。

通過上面內(nèi)容我們了解了Apache Flink中Exactly-Once和At-Least-Once只是在進行checkpointing時候的配置模式,兩種模式下進行checkpointing的原理是一致的,那么在實現(xiàn)上有什么本質(zhì)區(qū)別呢?

語義

  • At-Least-Once - 語義是流上所有數(shù)據(jù)至少被處理過一次(不要丟數(shù)據(jù))

  • Exactly-Once - 語義是流上所有數(shù)據(jù)必須被處理且只能處理一次(不丟數(shù)據(jù),且不能重復(fù))

從語義上面Exactly-Once 比 At-Least-Once對數(shù)據(jù)處理的要求更嚴(yán)格,更精準(zhǔn),那么更高的要求就意味著更高的代價,這里的代價就是 延遲。

實現(xiàn)

那在實現(xiàn)上面Apache Flink中At-Least-Once 和 Exactly-Once有什么區(qū)別呢?區(qū)別體現(xiàn)在多路輸入的時候(比如 Join),當(dāng)所有輸入的barrier沒有完全到來的時候,早到來的event在Exactly-Once模式下會進行緩存(不進行處理),而在At-Least-Once模式下即使所有輸入的barrier沒有完全到來,早到來的event也會進行處理。也就是說對于At-Least-Once模式下,對于下游節(jié)點而言,本來數(shù)據(jù)屬于checkpoint N 的數(shù)據(jù)在checkpoint N-1 里面也可能處理過了。

我以Exactly-Once為例說明Exactly-Once模式相對于At-Least-Once模式為啥會有更高的延時?如下圖:

ApacheFlink中如何深度解析FaultTolerance

上圖示意了某個節(jié)點進行Checkpointing的過程:

  • 當(dāng)Operator接收到某個上游發(fā)下來的第barrier時候開始進行barrier的對齊階段;

  • 在進行對齊期間早到的input的數(shù)據(jù)會被緩存到buffer中;

  • 當(dāng)Operator接收到上游所有barrier的時候,當(dāng)前Operator會進行Checkpointing,生成snapshot并持久化;

  • 當(dāng)完Checkpointing時候?qū)arrier廣播給下游Operator;

多路輸入的barrier沒有對齊的時候,barrier先到的輸入數(shù)據(jù)會緩存在buffer中,不進行處理,這樣對于下游而言buffer的數(shù)據(jù)越多就有更大的延遲。這個延時帶來的好處就是相鄰Checkpointing所記錄的數(shù)據(jù)(計算結(jié)果或event)沒有重復(fù)。相對At-Least-Once模式數(shù)據(jù)不會被buffer,減少延時的利好是以容忍數(shù)據(jù)重復(fù)計算為代價的。

在Apache Flink的代碼實現(xiàn)上用CheckpointBarrierHandler類處理barrier,其核心接口是:

public interface CheckpointBarrierHandler {
    ...
   
    BufferOrEvent getNextNonBlocked() throws Exception;
    ...
}

其中BufferOrEvent,可能是正常的data event,也可能是特殊的event,比如barrier event。對應(yīng)At-Least-Once和Exactly-Once有兩種不同的實現(xiàn),具體如下: 


  • Exactly-Once模式 - BarrierBuffer


    BarrierBuffer用于提供Exactly-Once一致性保證,其行為是:它將以barrier阻塞輸入直到所有的輸入都接收到基于某個檢查點的barrier,也就是上面所說的對齊。為了避免背壓輸入流,BarrierBuffer將從被阻塞的channel中持續(xù)地接收buffer并在內(nèi)部存儲它們,直到阻塞被解除。


BarrierBuffer 實現(xiàn)了CheckpointBarrierHandler的getNextNonBlocked, 該方法用于獲取待處理的下一條記錄。該方法是阻塞調(diào)用,直到獲取到下一個記錄。其中這里的記錄包括兩種,一種是來自于上游未被標(biāo)記為blocked的輸入,比如上圖中的 event(a),;另一種是,從已blocked輸入中緩沖區(qū)隊列中被釋放的記錄,比如上圖中的event(1,2,3,4)。

  • At-Least-Once模式 - BarrierTracker
    BarrierTracker會對各個輸入接收到的檢查點的barrier進行跟蹤。一旦它觀察到某個檢查點的所有barrier都已經(jīng)到達,它將會通知監(jiān)聽器檢查點已完成,以觸發(fā)相應(yīng)地回調(diào)處理。不像BarrierBuffer的處理邏輯,BarrierTracker不阻塞已經(jīng)發(fā)送了barrier的輸入,也就說明不采用對齊機制,因此本檢查點的數(shù)據(jù)會及時被處理,并且因此下一個檢查點的數(shù)據(jù)可能會在該檢查點還沒有完成時就已經(jīng)到來。這樣在恢復(fù)時只能提供At-Least-Once的語義保證。

BarrierTracker也實現(xiàn)了CheckpointBarrierHandler的getNextNonBlocked, 該方法用于獲取待處理的下一條記錄。與BarrierBuffer相比它實現(xiàn)很簡單,只是阻塞的獲取要處理的event。

如上兩個CheckpointBarrierHandler實現(xiàn)的核心區(qū)別是BarrierBuffer會維護多路輸入是否要blocked,緩存被blocked的輸入的record。所謂有得必有失,有失必有得,舍得舍得在這里也略有體現(xiàn)哈 :)。

在 《Apache Flink 漫談系列 - State》中我們有過對Apache Flink存儲到State中的內(nèi)容做過介紹,比如在connector會利用OperatorState記錄讀取位置的offset,那么一個完整的Apache Flink任務(wù)的執(zhí)行圖是一個DAG,上面我們描述了DAG中一個節(jié)點的過程,那么整體來看Checkpointing的過程是怎樣的呢?在產(chǎn)生checkpoint并分布式持久到HDFS的過程是怎樣的呢?

整體Checkpointing流程

ApacheFlink中如何深度解析FaultTolerance

上圖我們看到一個完整的Apache Flink Job進行Checkpointing的過程,JM觸發(fā)Soruce發(fā)射barriers,當(dāng)某個Operator接收到上游發(fā)下來的barrier,開始進行barrier的處理,整體根據(jù)DAG自上而下的逐個節(jié)點進行Checkpointing,并持久化到Statebackend,一直到DAG的sink節(jié)點。

Incremental Checkpointing

對于一個流計算的任務(wù),數(shù)據(jù)會源源不斷的流入,比如要進行雙流join(Apache Flink 漫談系列 - Join 篇會詳細(xì)介紹),由于兩邊的流event的到來有先后順序問題,我們必須將left和right的數(shù)據(jù)都會在state中進行存儲,Left event流入會在Right的State中進行join數(shù)據(jù),Right event流入會在Left的State中進行join數(shù)據(jù),如下圖左右兩邊的數(shù)據(jù)都會持久化到State中:
ApacheFlink中如何深度解析FaultTolerance

由于流上數(shù)據(jù)源源不斷,隨著時間的增加,每次checkpoint產(chǎn)生的snapshot的文件(RocksDB的sst文件)會變的非常龐大,增加網(wǎng)絡(luò)IO,拉長checkpoint時間,最終導(dǎo)致無法完成checkpoint,進而導(dǎo)致Apache Flink失去Failover的能力。為了解決checkpoint不斷變大的問題,Apache Flink內(nèi)部實現(xiàn)了Incremental Checkpointing,這種增量進行checkpoint的機制,會大大減少checkpoint時間,并且如果業(yè)務(wù)數(shù)據(jù)穩(wěn)定的情況下每次checkpoint的時間是相對穩(wěn)定的,根據(jù)不同的業(yè)務(wù)需求設(shè)定checkpoint的interval,穩(wěn)定快速的進行Checkpointing,保障Apache Flink任務(wù)在遇到故障時候可以順利的進行Failover。Incremental Checkpointing的優(yōu)化對于Apache Flink成百上千的任務(wù)節(jié)點帶來的利好不言而喻。

根據(jù)上面的介紹我們知道Apache Flink內(nèi)部支持Exactly-Once語義,要想達到端到端(Soruce到Sink)的Exactly-Once,需要Apache Flink外部Soruce和Sink的支持,具體如下:

  • 外部Source的容錯要求
    Apache Flink 要做到 End-to-End 的 Exactly-Once 需要外部Source的支持,比如上面我們說過 Apache Flink的Checkpointing機制會在Source節(jié)點記錄讀取的Position,那就需要外部Source提供讀取數(shù)據(jù)的Position和支持根據(jù)Position進行數(shù)據(jù)讀取。

  • 外部Sink的容錯要求
    Apache Flink 要做到 End-to-End 的 Exactly-Once相對比較困難,以Kafka作為Sink為例,當(dāng)Sink Operator節(jié)點宕機時候,根據(jù)Apache Flink 內(nèi)部Exactly-Once模式的容錯保證, 系統(tǒng)會回滾到上次成功的Checkpoint繼續(xù)寫入,但是上次成功checkpoint之后當(dāng)前checkpoint未完成之前已經(jīng)把一部分新數(shù)據(jù)寫入到kafka了. Apache Flink自上次成功的checkpoint繼續(xù)寫入kafka,就造成了kafka再次接收到一份同樣的來自Sink Operator的數(shù)據(jù),進而破壞了End-to-End 的 Exactly-Once 語義(重復(fù)寫入就變成了At-Least-Once了),如果要解決這一問題,Apache Flink 利用Two Phase Commit(兩階段提交)的方式來進行處理。本質(zhì)上是Sink Operator 需要感知整體Checkpoint的完成,并在整體Checkpoint完成時候?qū)⒂嬎憬Y(jié)果寫入Kafka。

看完上述內(nèi)容,你們掌握ApacheFlink中如何深度解析FaultTolerance的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI