溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Apache Flink結(jié)合Kafka構(gòu)建端到端的Exactly-Once處理

發(fā)布時(shí)間:2020-07-16 12:54:10 來(lái)源:網(wǎng)絡(luò) 閱讀:892 作者:Ververica 欄目:大數(shù)據(jù)
作者:Piotr Nowojski
翻譯| 周凱波
阿里巴巴技術(shù)專家,四川大學(xué)碩士,2010年畢業(yè)后加入阿里搜索事業(yè)部,從事搜索離線平臺(tái)的研發(fā)工作,參與將搜索后臺(tái)數(shù)據(jù)處理架構(gòu)從MapReduce到Flink的重構(gòu)。目前在阿里計(jì)算平臺(tái)事業(yè)部,專注于基于Flink的一站式計(jì)算平臺(tái)的建設(shè)。


目錄:


1.Apache Flink應(yīng)用程序中的Exactly-Once語(yǔ)義

2.Flink應(yīng)用程序端到端的Exactly-Once語(yǔ)義

3.示例Flink應(yīng)用程序啟動(dòng)預(yù)提交階段

4.在Flink中實(shí)現(xiàn)兩階段提交Operator

5.總結(jié)


Apache Flink自2017年12月發(fā)布的1.4.0版本開始,為流計(jì)算引入了一個(gè)重要的里程碑特性:TwoPhaseCommitSinkFunction(相關(guān)的Jira)。它提取了兩階段提交協(xié)議的通用邏輯,使得通過(guò)Flink來(lái)構(gòu)建端到端的Exactly-Once程序成為可能。同時(shí)支持一些數(shù)據(jù)源(source)和輸出端(sink),包括Apache Kafka 0.11及更高版本。它提供了一個(gè)抽象層,用戶只需要實(shí)現(xiàn)少數(shù)方法就能實(shí)現(xiàn)端到端的Exactly-Once語(yǔ)義。


有關(guān)TwoPhaseCommitSinkFunction的使用詳見文檔:?TwoPhaseCommitSinkFunction?;蛘呖梢灾苯娱喿xKafka 0.11 sink的文檔:?kafka。


接下來(lái)會(huì)詳細(xì)分析這個(gè)新功能以及Flink的實(shí)現(xiàn)邏輯,分為如下幾點(diǎn)。


  • 描述Flink checkpoint機(jī)制是如何保證Flink程序結(jié)果的Exactly-Once的

  • 顯示Flink如何通過(guò)兩階段提交協(xié)議與數(shù)據(jù)源和數(shù)據(jù)輸出端交互,以提供端到端的Exactly-Once保證

  • 通過(guò)一個(gè)簡(jiǎn)單的示例,了解如何使用TwoPhaseCommitSinkFunction實(shí)現(xiàn)Exactly-Once的文件輸出


一、Apache Flink應(yīng)用程序中的Exactly-Once語(yǔ)義


當(dāng)我們說(shuō)『Exactly-Once』時(shí),指的是每個(gè)輸入的事件只影響最終結(jié)果一次。即使機(jī)器或軟件出現(xiàn)故障,既沒有重復(fù)數(shù)據(jù),也不會(huì)丟數(shù)據(jù)。


Flink很久之前就提供了Exactly-Once語(yǔ)義。在過(guò)去幾年中,我們對(duì)Flink的checkpoint機(jī)制有過(guò)深入的描述,這是Flink有能力提供Exactly-Once語(yǔ)義的核心。Flink文檔還提供了該功能的全面概述。


在繼續(xù)之前,先看下對(duì)checkpoint機(jī)制的簡(jiǎn)要介紹,這對(duì)理解后面的主題至關(guān)重要。


一次checkpoint是以下內(nèi)容的一致性快照:

  • 應(yīng)用程序的當(dāng)前狀態(tài)

  • 輸入流的位置


Flink可以配置一個(gè)固定的時(shí)間點(diǎn),定期產(chǎn)生checkpoint,將checkpoint的數(shù)據(jù)寫入持久存儲(chǔ)系統(tǒng),例如S3或HDFS。將checkpoint數(shù)據(jù)寫入持久存儲(chǔ)是異步發(fā)生的,這意味著Flink應(yīng)用程序在checkpoint過(guò)程中可以繼續(xù)處理數(shù)據(jù)。


如果發(fā)生機(jī)器或軟件故障,重新啟動(dòng)后,F(xiàn)link應(yīng)用程序?qū)淖钚碌腸heckpoint點(diǎn)恢復(fù)處理; Flink會(huì)恢復(fù)應(yīng)用程序狀態(tài),將輸入流回滾到上次checkpoint保存的位置,然后重新開始運(yùn)行。這意味著Flink可以像從未發(fā)生過(guò)故障一樣計(jì)算結(jié)果。


在Flink 1.4.0之前,Exactly-Once語(yǔ)義僅限于Flink應(yīng)用程序內(nèi)部,并沒有擴(kuò)展到Flink數(shù)據(jù)處理完后發(fā)送的大多數(shù)外部系統(tǒng)。Flink應(yīng)用程序與各種數(shù)據(jù)輸出端進(jìn)行交互,開發(fā)人員需要有能力自己維護(hù)組件的上下文來(lái)保證Exactly-Once語(yǔ)義。


為了提供端到端的Exactly-Once語(yǔ)義 – 也就是說(shuō),除了Flink應(yīng)用程序內(nèi)部,F(xiàn)link寫入的外部系統(tǒng)也需要能滿足Exactly-Once語(yǔ)義 – 這些外部系統(tǒng)必須提供提交或回滾的方法,然后通過(guò)Flink的checkpoint機(jī)制來(lái)協(xié)調(diào)。


分布式系統(tǒng)中,協(xié)調(diào)提交和回滾的常用方法是兩階段提交協(xié)議。在下一節(jié)中,我們將討論Flink的TwoPhaseCommitSinkFunction是如何利用兩階段提交協(xié)議來(lái)提供端到端的Exactly-Once語(yǔ)義。


二、Flink應(yīng)用程序端到端的Exactly-Once語(yǔ)義


我們將介紹兩階段提交協(xié)議,以及它如何在一個(gè)讀寫Kafka的Flink程序中實(shí)現(xiàn)端到端的Exactly-Once語(yǔ)義。Kafka是一個(gè)流行的消息中間件,經(jīng)常與Flink一起使用。Kafka在最近的0.11版本中添加了對(duì)事務(wù)的支持。這意味著現(xiàn)在通過(guò)Flink讀寫Kafaka,并提供端到端的Exactly-Once語(yǔ)義有了必要的支持。


Flink對(duì)端到端的Exactly-Once語(yǔ)義的支持不僅局限于Kafka,您可以將它與任何一個(gè)提供了必要的協(xié)調(diào)機(jī)制的源/輸出端一起使用。例如Pravega,來(lái)自DELL/EMC的開源流媒體存儲(chǔ)系統(tǒng),通過(guò)Flink的TwoPhaseCommitSinkFunction也能支持端到端的Exactly-Once語(yǔ)義。


Apache Flink結(jié)合Kafka構(gòu)建端到端的Exactly-Once處理


在今天討論的這個(gè)示例程序中,我們有:


– 從Kafka讀取的數(shù)據(jù)源(Flink內(nèi)置的KafkaConsumer)

– 窗口聚合

– 將數(shù)據(jù)寫回Kafka的數(shù)據(jù)輸出端(Flink內(nèi)置的KafkaProducer)


要使數(shù)據(jù)輸出端提供Exactly-Once保證,它必須將所有數(shù)據(jù)通過(guò)一個(gè)事務(wù)提交給Kafka。提交捆綁了兩個(gè)checkpoint之間的所有要寫入的數(shù)據(jù)。這可確保在發(fā)生故障時(shí)能回滾寫入的數(shù)據(jù)。但是在分布式系統(tǒng)中,通常會(huì)有多個(gè)并發(fā)運(yùn)行的寫入任務(wù)的,簡(jiǎn)單的提交或回滾是不夠的,因?yàn)樗薪M件必須在提交或回滾時(shí)“一致”才能確保一致的結(jié)果。Flink使用兩階段提交協(xié)議及預(yù)提交階段來(lái)解決這個(gè)問題。


在checkpoint開始的時(shí)候,即兩階段提交協(xié)議的“預(yù)提交”階段。當(dāng)checkpoint開始時(shí),F(xiàn)link的JobManager會(huì)將checkpoint barrier(將數(shù)據(jù)流中的記錄分為進(jìn)入當(dāng)前checkpoint與進(jìn)入下一個(gè)checkpoint)注入數(shù)據(jù)流。


brarrier在operator之間傳遞。對(duì)于每一個(gè)operator,它觸發(fā)operator的狀態(tài)快照寫入到state backend。


Apache Flink結(jié)合Kafka構(gòu)建端到端的Exactly-Once處理


數(shù)據(jù)源保存了消費(fèi)Kafka的偏移量(offset),之后將checkpoint barrier傳遞給下一個(gè)operator。


這種方式僅適用于operator具有『內(nèi)部』狀態(tài)。所謂內(nèi)部狀態(tài),是指Flink state backend保存和管理的 -例如,第二個(gè)operator中window聚合算出來(lái)的sum值。當(dāng)一個(gè)進(jìn)程有它的內(nèi)部狀態(tài)的時(shí)候,除了在checkpoint之前需要將數(shù)據(jù)變更寫入到state backend,不需要在預(yù)提交階段執(zhí)行任何其他操作。Flink負(fù)責(zé)在checkpoint成功的情況下正確提交這些寫入,或者在出現(xiàn)故障時(shí)中止這些寫入。


Apache Flink結(jié)合Kafka構(gòu)建端到端的Exactly-Once處理


三、示例Flink應(yīng)用程序啟動(dòng)預(yù)提交階段


但是,當(dāng)進(jìn)程具有『外部』狀態(tài)時(shí),需要作些額外的處理。外部狀態(tài)通常以寫入外部系統(tǒng)(如Kafka)的形式出現(xiàn)。在這種情況下,為了提供Exactly-Once保證,外部系統(tǒng)必須支持事務(wù),這樣才能和兩階段提交協(xié)議集成。


在本文示例中的數(shù)據(jù)需要寫入Kafka,因此數(shù)據(jù)輸出端(Data Sink)有外部狀態(tài)。在這種情況下,在預(yù)提交階段,除了將其狀態(tài)寫入state backend之外,數(shù)據(jù)輸出端還必須預(yù)先提交其外部事務(wù)。


Apache Flink結(jié)合Kafka構(gòu)建端到端的Exactly-Once處理

當(dāng)checkpoint barrier在所有operator都傳遞了一遍,并且觸發(fā)的checkpoint回調(diào)成功完成時(shí),預(yù)提交階段就結(jié)束了。所有觸發(fā)的狀態(tài)快照都被視為該checkpoint的一部分。checkpoint是整個(gè)應(yīng)用程序狀態(tài)的快照,包括預(yù)先提交的外部狀態(tài)。如果發(fā)生故障,我們可以回滾到上次成功完成快照的時(shí)間點(diǎn)。


下一步是通知所有operator,checkpoint已經(jīng)成功了。這是兩階段提交協(xié)議的提交階段,JobManager為應(yīng)用程序中的每個(gè)operator發(fā)出checkpoint已完成的回調(diào)。


數(shù)據(jù)源和widnow operator沒有外部狀態(tài),因此在提交階段,這些operator不必執(zhí)行任何操作。但是,數(shù)據(jù)輸出端(Data Sink)擁有外部狀態(tài),此時(shí)應(yīng)該提交外部事務(wù)。


Apache Flink結(jié)合Kafka構(gòu)建端到端的Exactly-Once處理



我們對(duì)上述知識(shí)點(diǎn)總結(jié)下:


– 一旦所有operator完成預(yù)提交,就提交一個(gè)commit。

– 如果至少有一個(gè)預(yù)提交失敗,則所有其他提交都將中止,我們將回滾到上一個(gè)成功完成的checkpoint。

– 在預(yù)提交成功之后,提交的commit需要保證最終成功 – operator和外部系統(tǒng)都需要保障這點(diǎn)。如果commit失?。ɡ?,由于間歇性網(wǎng)絡(luò)問題),整個(gè)Flink應(yīng)用程序?qū)⑹?,?yīng)用程序?qū)⒏鶕?jù)用戶的重啟策略重新啟動(dòng),還會(huì)嘗試再提交。這個(gè)過(guò)程至關(guān)重要,因?yàn)槿绻鹀ommit最終沒有成功,將會(huì)導(dǎo)致數(shù)據(jù)丟失。


因此,我們可以確定所有operator都同意checkpoint的最終結(jié)果:所有operator都同意數(shù)據(jù)已提交,或提交被中止并回滾。


四、在Flink中實(shí)現(xiàn)兩階段提交Operator


完整的實(shí)現(xiàn)兩階段提交協(xié)議可能有點(diǎn)復(fù)雜,這就是為什么Flink將它的通用邏輯提取到抽象類TwoPhaseCommitSinkFunction中的原因。


接下來(lái)基于輸出到文件的簡(jiǎn)單示例,說(shuō)明如何使用TwoPhaseCommitSinkFunction。用戶只需要實(shí)現(xiàn)四個(gè)函數(shù),就能為數(shù)據(jù)輸出端實(shí)現(xiàn)Exactly-Once語(yǔ)義:


– beginTransaction – 在事務(wù)開始前,我們?cè)谀繕?biāo)文件系統(tǒng)的臨時(shí)目錄中創(chuàng)建一個(gè)臨時(shí)文件。隨后,我們可以在處理數(shù)據(jù)時(shí)將數(shù)據(jù)寫入此文件。

– preCommit – 在預(yù)提交階段,我們刷新文件到存儲(chǔ),關(guān)閉文件,不再重新寫入。我們還將為屬于下一個(gè)checkpoint的任何后續(xù)文件寫入啟動(dòng)一個(gè)新的事務(wù)。

– commit – 在提交階段,我們將預(yù)提交階段的文件原子地移動(dòng)到真正的目標(biāo)目錄。需要注意的是,這會(huì)增加輸出數(shù)據(jù)可見性的延遲。

– abort – 在中止階段,我們刪除臨時(shí)文件。


我們知道,如果發(fā)生任何故障,F(xiàn)link會(huì)將應(yīng)用程序的狀態(tài)恢復(fù)到最新的一次checkpoint點(diǎn)。一種極端的情況是,預(yù)提交成功了,但在這次commit的通知到達(dá)operator之前發(fā)生了故障。在這種情況下,F(xiàn)link會(huì)將operator的狀態(tài)恢復(fù)到已經(jīng)預(yù)提交,但尚未真正提交的狀態(tài)。


我們需要在預(yù)提交階段保存足夠多的信息到checkpoint狀態(tài)中,以便在重啟后能正確的中止或提交事務(wù)。在這個(gè)例子中,這些信息是臨時(shí)文件和目標(biāo)目錄的路徑。


TwoPhaseCommitSinkFunction已經(jīng)把這種情況考慮在內(nèi)了,并且在從checkpoint點(diǎn)恢復(fù)狀態(tài)時(shí),會(huì)優(yōu)先發(fā)出一個(gè)commit。我們需要以冪等方式實(shí)現(xiàn)提交,一般來(lái)說(shuō),這并不難。在這個(gè)示例中,我們可以識(shí)別出這樣的情況:臨時(shí)文件不在臨時(shí)目錄中,但已經(jīng)移動(dòng)到目標(biāo)目錄了。


在TwoPhaseCommitSinkFunction中,還有一些其他邊界情況也會(huì)考慮在內(nèi),請(qǐng)參考Flink文檔了解更多信息。


總結(jié)


總結(jié)下本文涉及的一些要點(diǎn):


  • Flink的checkpoint機(jī)制是支持兩階段提交協(xié)議并提供端到端的Exactly-Once語(yǔ)義的基礎(chǔ)。

  • 這個(gè)方案的優(yōu)點(diǎn)是: Flink不像其他一些系統(tǒng)那樣,通過(guò)網(wǎng)絡(luò)傳輸存儲(chǔ)數(shù)據(jù) – 不需要像大多數(shù)批處理程序那樣將計(jì)算的每個(gè)階段寫入磁盤。

  • Flink的TwoPhaseCommitSinkFunction提取了兩階段提交協(xié)議的通用邏輯,基于此將Flink和支持事務(wù)的外部系統(tǒng)結(jié)合,構(gòu)建端到端的Exactly-Once成為可能。

  • 從Flink 1.4.0開始,Pravega和Kafka 0.11 producer都提供了Exactly-Once語(yǔ)義;Kafka在0.11版本首次引入了事務(wù),為在Flink程序中使用Kafka producer提供Exactly-Once語(yǔ)義提供了可能性。

  • Kafaka 0.11 producer的事務(wù)是在TwoPhaseCommitSinkFunction基礎(chǔ)上實(shí)現(xiàn)的,和at-least-once producer相比只增加了非常低的開銷。


這是個(gè)令人興奮的功能,期待Flink TwoPhaseCommitSinkFunction在未來(lái)支持更多的數(shù)據(jù)接收端。


向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI