溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Flink 1.11 究竟有哪些易用性上的改善

發(fā)布時(shí)間:2021-12-06 09:23:12 來源:億速云 閱讀:120 作者:柒染 欄目:大數(shù)據(jù)

本篇文章給大家分享的是有關(guān)Flink 1.11 究竟有哪些易用性上的改善,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。


7月7日,F(xiàn)link 1.11.0 正式發(fā)布了,作為這個(gè)版本的 release manager 之一,我想跟大家分享一下其中的經(jīng)歷感受以及一些代表性 feature 的解讀。在進(jìn)入深度解讀前,我們先簡單了解下社區(qū)發(fā)布的一般流程,幫助大家更好的理解和參與 Flink 社區(qū)的工作。

                 

  • 首先在每個(gè)版本的規(guī)劃初期,會(huì)從志愿者中選出 1-2 名作為 Release Manager。1.11.0 版本我作為中國這邊的 Release Manager,同時(shí)還有一名來自 Ververica 的 Piotr Nowojski 作為德國方的 Release Manager,這在某種程度上也說明中國的開發(fā)者和貢獻(xiàn)度在整個(gè)社區(qū)的占比很重要。


  • 接下來會(huì)進(jìn)行這個(gè)版本的 Feature Kickoff。在一些大的方向上,社區(qū)的規(guī)劃周期可能比較久,會(huì)分階段、分步驟跨越多個(gè)版本完成,確保質(zhì)量。每個(gè)版本的側(cè)重點(diǎn)也會(huì)有所不同,比如前兩個(gè)版本側(cè)重于批處理的加強(qiáng),而這個(gè)版本更側(cè)重于流處理易用性的提升。社區(qū)規(guī)劃的 Feature 列表會(huì)在郵件列表中發(fā)起討論,以收集更多的用戶/開發(fā)者意見和反饋。


  • 一般的開發(fā)周期為 2-3 個(gè)月時(shí)間,提前會(huì)明確規(guī)劃出大概的 Feature Freeze 時(shí)間,之后進(jìn)行 Release Candidate 的發(fā)布和測試、以及 Bug Fix。一般經(jīng)過幾輪的迭代周期后會(huì)正式投票通過一個(gè)相對穩(wěn)定的 Candidate 版本,然后基于這個(gè)版本正式發(fā)布。


Flink 1.11.0 從 3 月初的功能規(guī)劃到 7 月初的正式發(fā)布,歷經(jīng)了差不多 4 個(gè)月的時(shí)間,對 Flink 的生態(tài)、易用性、生產(chǎn)可用性、穩(wěn)定性等方面都進(jìn)行了增強(qiáng)和改善,下面將一一跟大家分享。

一  綜述


Flink 1.11.0 從 Feature 凍結(jié)后發(fā)布了 4 次 Candidate 才最終通過。經(jīng)統(tǒng)計(jì),一共有 236 個(gè)貢獻(xiàn)者參與了這次版本開發(fā),解決了 1474 個(gè) Jira 問題,涉及 30 多個(gè) FLIP,提交了 2325 個(gè) Commit。

Flink 1.11 究竟有哪些易用性上的改善

Flink 1.11 究竟有哪些易用性上的改善

縱觀近五次版本發(fā)布,可以看出從 1.9.0 開始 Flink 進(jìn)入了一個(gè)快速發(fā)展階段,各個(gè)維度指標(biāo)相比之前都有了幾乎翻倍的提高。也是從 1.9.0 開始阿里巴巴內(nèi)部的 Blink 項(xiàng)目開始被開源 Flink 整合,到 1.10.0 經(jīng)過兩個(gè)大版本已經(jīng)全部整合完畢,對 Flink 從生態(tài)建設(shè)、功能性、性能和生產(chǎn)穩(wěn)定性上都有了大幅的增強(qiáng)。

Flink 1.11.0 版本的最初定位是重點(diǎn)解決易用性問題,提升用戶業(yè)務(wù)的生產(chǎn)使用體驗(yàn),整體上不做大的架構(gòu)調(diào)整和功能開發(fā),傾向于快速迭代的小版本開發(fā)。但是從上面統(tǒng)計(jì)的各個(gè)指標(biāo)來看,所謂的“小版本”在各個(gè)維度的數(shù)據(jù)也絲毫不遜色于前兩個(gè)大版本,解決問題的數(shù)量和參與的貢獻(xiàn)者人數(shù)也在持續(xù)增加,其中來自中國的貢獻(xiàn)者比例達(dá)到 62%。

下面我們會(huì)深度剖析 Flink 1.11.0 帶來了哪些讓大家期待已久的特性,從用戶直接使用的 API 層一直到執(zhí)行引擎層,我們都會(huì)選擇一些有代表性的 Feature 從不同維度解讀,更完整的 Feature 列表請大家關(guān)注發(fā)布的 Release Blog。

二  生態(tài)完善和易用性提升


這兩個(gè)維度在某種程度上是相輔相成的,很難嚴(yán)格區(qū)分開,生態(tài)兼容上的缺失常常造成使用上的不便,提升易用性的過程往往也是不斷完善相關(guān)生態(tài)的過程。在這方面用戶感知最明顯的應(yīng)該就是 Table & SQL API 層面的使用。

1  Table & SQL 支持 Change Data Capture(CDC)

CDC 被廣泛使用在復(fù)制數(shù)據(jù)、更新緩存、微服務(wù)間同步數(shù)據(jù)、審計(jì)日志等場景,很多公司都在使用開源的 CDC 工具,如 MySQL CDC。通過 Flink 支持在 Table & SQL 中接入和解析 CDC 是一個(gè)強(qiáng)需求,在過往的很多討論中都被提及過,可以幫助用戶以實(shí)時(shí)的方式處理 Changelog 流,進(jìn)一步擴(kuò)展 Flink 的應(yīng)用場景,例如把 MySQL 中的數(shù)據(jù)同步到 PG 或 ElasticSearch 中,低延時(shí)的 Temporal Join 一個(gè) Changelog 等。

除了考慮到上面的真實(shí)需求,F(xiàn)link 中定義的“Dynamic Table”概念在流上有兩種模型:Append 模式和 Update 模式。通過 Append 模式把流轉(zhuǎn)化為“Dynamic Table”在之前的版本中已經(jīng)支持,因此在 1.11.0 中進(jìn)一步支持 Update 模式也從概念層面完整的實(shí)現(xiàn)了“Dynamic Table”。

Flink 1.11 究竟有哪些易用性上的改善

為了支持解析和輸出 Changelog,如何在外部系統(tǒng)和 Flink 系統(tǒng)之間編解碼這些更新操作是首要解決的問題??紤]到 Source 和 Sink 是銜接外部系統(tǒng)的一個(gè)橋梁,因此 FLIP-95 在定義全新的 Table Source 和 Table Sink 接口時(shí)解決了這個(gè)問題。

在公開的 CDC 調(diào)研報(bào)告中,Debezium 和 Canal 是用戶中最流行使用的 CDC 工具,這兩種工具用來同步 Changelog 到其它的系統(tǒng)中,如消息隊(duì)列。據(jù)此,F(xiàn)LIP-105 首先支持了 Debezium 和 Canal 這兩種格式,而且 Kafka Source 也已經(jīng)可以支持解析上述格式并輸出更新事件,在后續(xù)的版本中會(huì)進(jìn)一步支持 Avro(Debezium) 和 Protobuf(Canal)。

CREATE TABLE my_table (  ...) WITH (  'connector'='...', -- e.g. 'kafka'  'format'='debezium-json',  'debezium-json.schema-include'='true' -- default: false (Debezium can be configured to include or exclude the message schema)  'debezium-json.ignore-parse-errors'='true' -- default: false);

2  Table & SQL 支持 JDBC Catalog

1.11.0 之前,用戶如果依賴 Flink 的 Source/Sink 讀寫關(guān)系型數(shù)據(jù)庫或讀取 Changelog 時(shí),必須要手動(dòng)創(chuàng)建對應(yīng)的 Schema。而且當(dāng)數(shù)據(jù)庫中的 Schema 發(fā)生變化時(shí),也需要手動(dòng)更新對應(yīng)的 Flink 作業(yè)以保持一致和類型匹配,任何不匹配都會(huì)造成運(yùn)行時(shí)報(bào)錯(cuò)使作業(yè)失敗。用戶經(jīng)常抱怨這個(gè)看似冗余且繁瑣的流程,體驗(yàn)極差。

實(shí)際上對于任何和 Flink 連接的外部系統(tǒng)都可能有類似的上述問題,在 1.11.0 中重點(diǎn)解決了和關(guān)系型數(shù)據(jù)庫對接的這個(gè)問題。FLIP-93 提供了 JDBC catalog 的基礎(chǔ)接口以及 Postgres catalog 的實(shí)現(xiàn),這樣方便后續(xù)實(shí)現(xiàn)與其它類型的關(guān)系型數(shù)據(jù)庫的對接。

1.11.0 版本后,用戶使用 Flink SQL 時(shí)可以自動(dòng)獲取表的 Schema 而不再需要輸入 DDL。除此之外,任何 Schema 不匹配的錯(cuò)誤都會(huì)在編譯階段提前進(jìn)行檢查報(bào)錯(cuò),避免了之前運(yùn)行時(shí)報(bào)錯(cuò)造成的作業(yè)失敗。這是提升易用性和用戶體驗(yàn)的一個(gè)典型例子。

3  Hive 實(shí)時(shí)數(shù)倉

從 1.9.0 版本開始 Flink 從生態(tài)角度致力于集成 Hive,目標(biāo)打造批流一體的 Hive 數(shù)倉。經(jīng)過前兩個(gè)版本的迭代,已經(jīng)達(dá)到了 Batch 兼容且生產(chǎn)可用,在 TPC-DS 10T Benchmark 下性能達(dá)到 Hive 3.0 的 7 倍以上。

1.11.0 在 Hive 生態(tài)中重點(diǎn)實(shí)現(xiàn)了實(shí)時(shí)數(shù)倉方案,改善了端到端流式 ETL 的用戶體驗(yàn),達(dá)到了批流一體 Hive 數(shù)倉的目標(biāo)。同時(shí)在兼容性、性能、易用性方面也進(jìn)一步進(jìn)行了加強(qiáng)。

在實(shí)時(shí)數(shù)倉的解決方案中,憑借 Flink 的流式處理優(yōu)勢做到實(shí)時(shí)讀寫 Hive:

  • Hive 寫入:FLIP-115 完善擴(kuò)展了 FileSystem Connector 的基礎(chǔ)能力和實(shí)現(xiàn),Table/SQL 層的 sink 可以支持各種格式(CSV、Json、Avro、Parquet、ORC),而且支持 Hive Table 的所有格式。

  • Partition 支持:數(shù)據(jù)導(dǎo)入 Hive 引入 Partition 提交機(jī)制來控制可見性,通過sink.partition-commit.trigger 控制 Partition 提交的時(shí)機(jī),通過 sink.partition-commit.policy.kind 選擇提交策略,支持 SUCCESS 文件和 Metastore 提交。

  • Hive 讀?。簩?shí)時(shí)化的流式讀取 Hive,通過監(jiān)控 Partition 生成增量讀取新 Partition,或者監(jiān)控文件夾內(nèi)新文件生成來增量讀取新文件。

在 Hive 可用性方面的提升:

  • FLIP-123 通過 Hive Dialect 為用戶提供語法兼容,這樣用戶無需在 Flink 和 Hive 的 CLI 之間切換,可以直接遷移 Hive 腳本到 Flink 中執(zhí)行。

  • 提供 Hive 相關(guān)依賴的內(nèi)置支持,避免用戶自己下載所需的相關(guān)依賴?,F(xiàn)在只需要單獨(dú)下載一個(gè)包,配置 HADOOP_CLASSPATH 就可以運(yùn)行。

在 Hive 性能方面,1.10.0 中已經(jīng)支持了 ORC(Hive 2+)的向量化讀取,1.11.0 中我們補(bǔ)全了所有版本的 Parquet 和 ORC 向量化支持來提升性能。

4  全新 Source API

前面也提到過,Source 和 Sink 是 Flink 對接外部系統(tǒng)的一個(gè)橋梁,對于完善生態(tài)、可用性及端到端的用戶體驗(yàn)是很重要的環(huán)節(jié)。社區(qū)早在一年前就已經(jīng)規(guī)劃了 Source 端的徹底重構(gòu),從 FLIP-27 的 ID 就可以看出是很早的一個(gè) Feature。但是由于涉及到很多復(fù)雜的內(nèi)部機(jī)制和考慮到各種 Source Connector 的實(shí)現(xiàn),設(shè)計(jì)上需要考慮的很全面。從 1.10.0 就開始做 POC 的實(shí)現(xiàn),最終趕上了 1.11.0 版本的發(fā)布。

先簡要回顧下 Source 之前的主要問題:

  • 對用戶而言,在 Flink 中改造已有的 Source 或者重新實(shí)現(xiàn)一個(gè)生產(chǎn)級的 Source Connector 不是一件容易的事情,具體體現(xiàn)在沒有公共的代碼可以復(fù)用,而且需要理解很多 Flink 內(nèi)部細(xì)節(jié)以及實(shí)現(xiàn)具體的 Event Time 分配、Watermark 產(chǎn)出、Idleness 監(jiān)測、線程模型等。

  • 批和流的場景需要實(shí)現(xiàn)不同的 Source。

  • Partitions/Splits/Shards 概念在接口中沒有顯式表達(dá),比如 Split 的發(fā)現(xiàn)邏輯和數(shù)據(jù)消費(fèi)都耦合在 Source Sunction 的實(shí)現(xiàn)中,這樣在實(shí)現(xiàn) Kafka 或 Kinesis 類型的 Source 時(shí)增加了復(fù)雜性。

  • 在 Runtime 執(zhí)行層,Checkpoint 鎖被 Source Function 搶占會(huì)帶來一系列問題,框架很難進(jìn)行優(yōu)化。

FLIP-27 在設(shè)計(jì)時(shí)充分考慮了上述的痛點(diǎn):

Flink 1.11 究竟有哪些易用性上的改善

  • 首先在 Job Manager 和 Task Manager 中分別引入兩種不同的組件 Split Enumerator 和 Source Reader,解耦 Split 發(fā)現(xiàn)和對應(yīng)的消費(fèi)處理,同時(shí)方便隨意組合不同的策略。比如現(xiàn)有的 Kafka Connector 中有多種不同的 Partition 發(fā)現(xiàn)策略和實(shí)現(xiàn)耦合在一起,在新的架構(gòu)下,我們只需要實(shí)現(xiàn)一種 Source Reader,就可以適配多種 Split Enumerator 的實(shí)現(xiàn)來對應(yīng)不同的 Partition 發(fā)現(xiàn)策略。

  • 在新架構(gòu)下實(shí)現(xiàn)的 Source Connector 可以做到批流統(tǒng)一,唯一的小區(qū)別是對批場景的有限輸入,Split Enumerator 會(huì)產(chǎn)出固定數(shù)量的 Split 集合并且每個(gè) Split 都是有限數(shù)據(jù)集;對于流場景的無限輸入,Split Enumerator 要么產(chǎn)出無限多的 Split 或者 Split 自身是無限數(shù)據(jù)集。

  • 復(fù)雜的 Timestamp Assigner 以及 Watermark Generator 透明的內(nèi)置在 Source Reader 模塊內(nèi)運(yùn)行,對用戶來說是無感知的。這樣用戶如果想實(shí)現(xiàn)新的 Source Connector,一般不再需要重復(fù)實(shí)現(xiàn)這部分功能。

目前 Flink 已有的 Source Connector 會(huì)在后續(xù)的版本中基于新架構(gòu)來重新實(shí)現(xiàn),Legacy Source 也會(huì)繼續(xù)維護(hù)幾個(gè)版本保持兼容性,用戶也可以按照 Release 文檔中的說明來嘗試體驗(yàn)新 Source 的開發(fā)。

5  PyFlink 生態(tài)

眾所周知,Python 語言在機(jī)器學(xué)習(xí)和數(shù)據(jù)分析領(lǐng)域有著廣泛的使用。Flink 從 1.9.0 版本開始發(fā)力兼容 Python 生態(tài),Python 和 Flink 合力為 PyFlink,把 Flink 的實(shí)時(shí)分布式處理能力輸出給 Python 用戶。前兩個(gè)版本 PyFlink 已經(jīng)支持了 Python Table API 和 UDF,在 1.11.0 中擴(kuò)大對 Python 生態(tài)庫 Pandas 的支持以及和 SQL DDL/Client 的集成,同時(shí) Python UDF 性能有了極大的提升。

具體來說,之前普通的 Python UDF 每次調(diào)用只能處理一條數(shù)據(jù),而且在 Java 端和 Python 端都需要序列化/反序列化,開銷很大。1.11.0 中 Flink 支持在 Table & SQL 作業(yè)中自定義和使用向量化 Python UDF,用戶只需要在 UDF 修飾中額外增加一個(gè)參數(shù) udf_type=“pandas” 即可。這樣帶來的好處是:

  • 每次調(diào)用可以處理 N 條數(shù)據(jù)。

  • 數(shù)據(jù)格式基于 Apache Arrow,大大降低了 Java、Python 進(jìn)程之間的序列化/反序列化開銷。

  • 方便 Python 用戶基于 Numpy 和 Pandas 等數(shù)據(jù)分析領(lǐng)域常用的 Python 庫,開發(fā)高性能的 Python UDF。

除此之外,1.11.0 中 PyFlink 還支持:

  • PyFlink table 和 Pandas DataFrame 之間無縫切換(FLIP-120),增強(qiáng) Pandas 生態(tài)的易用性和兼容性。

  • Table & SQL 中可以定義和使用 Python UDTF(FLINK-14500),不再必需 Java/Scala UDTF。

  • Cython 優(yōu)化 Python UDF 的性能(FLIP-121),對比 1.10.0 可以提升 30 倍。

  • Python UDF 中用戶自定義 Metric(FLIP-112),方便監(jiān)控和調(diào)試 UDF 的執(zhí)行。

上述解讀的都是側(cè)重 API 層面,用戶開發(fā)作業(yè)可以直接感知到的易用性的提升。下面我們看看執(zhí)行引擎層在 1.11.0 中都有哪些值得關(guān)注的變化。

三  生產(chǎn)可用性和穩(wěn)定性提升


1  支持 Application 模式和 Kubernetes 增強(qiáng)

1.11.0 版本前,F(xiàn)link 主要支持如下兩種模式運(yùn)行:

  • Session 模式:提前啟動(dòng)一個(gè)集群,所有作業(yè)都共享這個(gè)集群的資源運(yùn)行。優(yōu)勢是避免每個(gè)作業(yè)單獨(dú)啟動(dòng)集群帶來的額外開銷,缺點(diǎn)是隔離性稍差。如果一個(gè)作業(yè)把某個(gè) Task Manager(TM)容器搞掛,會(huì)導(dǎo)致這個(gè)容器內(nèi)的所有作業(yè)都跟著重啟。雖然每個(gè)作業(yè)有自己獨(dú)立的 Job Manager(JM)來管理,但是這些 JM 都運(yùn)行在一個(gè)進(jìn)程中,容易帶來負(fù)載上的瓶頸。

  • Per-job 模式:為了解決 Session 模式隔離性差的問題,每個(gè)作業(yè)根據(jù)資源需求啟動(dòng)獨(dú)立的集群,每個(gè)作業(yè)的 JM 也是運(yùn)行在獨(dú)立的進(jìn)程中,負(fù)載相對小很多。

以上兩種模式的共同問題是需要在客戶端執(zhí)行用戶代碼,編譯生成對應(yīng)的 Job Graph 提交到集群運(yùn)行。在這個(gè)過程需要下載相關(guān) Jar 包并上傳到集群,客戶端和網(wǎng)絡(luò)負(fù)載壓力容易成為瓶頸,尤其當(dāng)一個(gè)客戶端被多個(gè)用戶共享使用。

1.11.0 中引入了 Application 模式(FLIP-85)來解決上述問題,按照 Application 粒度來啟動(dòng)一個(gè)集群,屬于這個(gè) Application 的所有 Job 在這個(gè)集群中運(yùn)行。核心是 Job Graph 的生成以及作業(yè)的提交不在客戶端執(zhí)行,而是轉(zhuǎn)移到 JM 端執(zhí)行,這樣網(wǎng)絡(luò)下載上傳的負(fù)載也會(huì)分散到集群中,不再有上述 Client 單點(diǎn)上的瓶頸。

用戶可以通過 bin/flink run-application 來使用 Application 模式,目前 Yarn 和  Kubernetes(K8s)都已經(jīng)支持這種模式。Yarn application 會(huì)在客戶端將運(yùn)行作業(yè)需要的依賴都通過 Yarn Local Resource 傳遞到 JM。K8s Application 允許用戶構(gòu)建包含用戶 Jar 與依賴的鏡像,同時(shí)會(huì)根據(jù)作業(yè)自動(dòng)創(chuàng)建 TM,并在結(jié)束后銷毀整個(gè)集群,相比 Session 模式具有更好的隔離性。K8s 不再有嚴(yán)格意義上的 Per-Job 模式,Application 模式相當(dāng)于 Per-Job 在集群進(jìn)行提交作業(yè)的實(shí)現(xiàn)。

除了支持 Application 模式,F(xiàn)link 原生 K8s 在 1.11.0 中還完善了很多基礎(chǔ)的功能特性(FLINK-14460),以達(dá)到生產(chǎn)可用性的標(biāo)準(zhǔn)。例如 Node Selector、Label、Annotation、Toleration 等。為了更方便的與 Hadoop 集成,也支持根據(jù)環(huán)境變量自動(dòng)掛載 Hadoop 配置的功能。

2  Checkpoint & Savepoint 優(yōu)化

Checkpoint 和 Savepoint 機(jī)制一直是 Flink 保持先進(jìn)性的核心競爭力之一,社區(qū)在這個(gè)領(lǐng)域的改動(dòng)很謹(jǐn)慎,最近的幾個(gè)大版本中幾乎沒有大的功能和架構(gòu)上的調(diào)整。在用戶郵件列表中,我們經(jīng)常能看到用戶反饋和抱怨的相關(guān)問題:比如 Checkpoint 長時(shí)間做不出來失敗,Savepoint 在作業(yè)重啟后不可用等等。1.11.0 有選擇的解決了一些這方面的常見問題,提高生產(chǎn)可用性和穩(wěn)定性。

1.11.0 之前, Savepoint 中 Meta 數(shù)據(jù)和 State 數(shù)據(jù)分別保存在兩個(gè)不同的目錄中,這樣如果想遷移 State 目錄很難識(shí)別這種映射關(guān)系,也可能導(dǎo)致目錄被誤刪除,對于目錄清理也同樣有麻煩。1.11.0 把兩部分?jǐn)?shù)據(jù)整合到一個(gè)目錄下,這樣方便整體轉(zhuǎn)移和復(fù)用。另外,之前 Meta 引用 State 采用的是絕對路徑,這樣 State 目錄遷移后路徑發(fā)生變化也不可用,1.11.0 把 State 引用改成了相對路徑解決了這個(gè)問題(FLINK-5763),這樣 Savepoint 的管理維護(hù)、復(fù)用更加靈活方便。

實(shí)際生產(chǎn)環(huán)境中,用戶經(jīng)常遭遇 Checkpoint 超時(shí)失敗、長時(shí)間不能完成帶來的困擾。一旦作業(yè) failover 會(huì)造成回放大量的歷史數(shù)據(jù),作業(yè)長時(shí)間沒有進(jìn)度,端到端的延遲增加。1.11.0 從不同維度對 Checkpoint 的優(yōu)化和提速做了改進(jìn),目標(biāo)實(shí)現(xiàn)分鐘甚至秒級的輕量型 Checkpoint。

首先,增加了 Checkpoint Coordinator 通知 Task 取消 Checkpoint 的機(jī)制(FLINK-8871),這樣避免 Task 端還在執(zhí)行已經(jīng)取消的 Checkpoint 而對系統(tǒng)帶來不必要的壓力。同時(shí) Task 端放棄已經(jīng)取消的 Checkpoint,可以更快的參與執(zhí)行 Coordinator 新觸發(fā)的 Checkpoint,某種程度上也可以避免新 Checkpoint 再次執(zhí)行超時(shí)而失敗。這個(gè)優(yōu)化也對后面默認(rèn)開啟 Local Recovery 提供了便利,Task 端可以及時(shí)清理失效 Checkpoint 的資源。

其次,在反壓場景下,整個(gè)數(shù)據(jù)鏈路堆積了大量 Buffer,導(dǎo)致 Checkpoint Barrier 排在數(shù)據(jù) Buffer 后面,不能被 Task 及時(shí)處理對齊,也就導(dǎo)致了 Checkpoint 長時(shí)間不能執(zhí)行。1.11.0 中從兩個(gè)維度對這個(gè)問題進(jìn)行解決:

1)嘗試減少數(shù)據(jù)鏈路中的 Buffer 總量(FLINK-16428),這樣 Checkpoint Barrier 可以盡快被處理對齊。

  • 上游輸出端控制單個(gè) Sub Partition 堆積 Buffer 的最大閾值(Backlog),避免負(fù)載不均場景下單個(gè)鏈路上堆積大量 Buffer。

  • 在不影響網(wǎng)絡(luò)吞吐性能的情況下合理修改上下游默認(rèn)的 Buffer 配置。

  • 上下游數(shù)據(jù)傳輸?shù)幕A(chǔ)協(xié)議進(jìn)行了調(diào)整,允許單個(gè)數(shù)據(jù)鏈路可以配置 0 個(gè)獨(dú)占 Buffer 而不死鎖,這樣總的 Buffer 數(shù)量和作業(yè)并發(fā)規(guī)模解耦。根據(jù)實(shí)際需求在吞吐性能和 Checkpoint 速度兩者之間權(quán)衡,自定義 Buffer 配比。

這個(gè)優(yōu)化有一部分工作已經(jīng)在 1.11.0 中完成,剩余部分會(huì)在下個(gè)版本繼續(xù)推進(jìn)完成。

2)實(shí)現(xiàn)了全新的 Unaligned Checkpoint 機(jī)制(FLIP-76)從根本上解決了反壓場景下 Checkpoint Barrier 對齊的問題。實(shí)際上這個(gè)想法早在 1.10.0 版本之前就開始醞釀設(shè)計(jì),由于涉及到很多模塊的大改動(dòng),實(shí)現(xiàn)機(jī)制和線程模型也很復(fù)雜。我們實(shí)現(xiàn)了兩種不同方案的原型 POC 進(jìn)行了測試、性能對比,確定了最終的方案,因此直到 1.11.0 才完成了 MVP 版本,這也是 1.11.0 中執(zhí)行引擎層唯一的一個(gè)重量級 Feature。其基本思想可以概括為:

  • Checkpoint Barrier 跨數(shù)據(jù) Buffer 傳輸,不在輸入輸出隊(duì)列排隊(duì)等待處理,這樣就和算子的計(jì)算能力解耦,Barrier 在節(jié)點(diǎn)之間的傳輸只有網(wǎng)絡(luò)延時(shí),可以忽略不計(jì)。

  • 每個(gè)算子多個(gè)輸入鏈路之間不需要等待 Barrier 對齊來執(zhí)行 Checkpoint,第一個(gè)到的 Barrier 就可以提前觸發(fā) Checkpoint,這樣可以進(jìn)一步提速 Checkpoint,不會(huì)因?yàn)閭€(gè)別鏈路的延遲而影響整體。

  • 為了和之前 Aligned Checkpoint 的語義保持一致,所有未被處理的輸入輸出數(shù)據(jù) Buffer 都將作為 Channel State 在 Checkpoint 執(zhí)行時(shí)進(jìn)行快照持久化,在 Failover 時(shí)連同 Operator State 一同進(jìn)行恢復(fù)。換句話說,Aligned 機(jī)制保證的是 Barrier 前面所有數(shù)據(jù)必須被處理完,狀態(tài)實(shí)時(shí)體現(xiàn)到 Operator State 中;而 Unaligned 機(jī)制把 Barrier 前面的未處理數(shù)據(jù)所反映的 Operator State 延后到 Failover Restart 時(shí)通過 Channel State 回放進(jìn)行體現(xiàn),從狀態(tài)恢復(fù)的角度來說最終都是一致的。注意這里雖然引入了額外的 In-Flight Buffer 的持久化,但是這個(gè)過程實(shí)際是在 Checkpoint 的異步階段完成的,同步階段只是進(jìn)行了輕量級的 Buffer 引用,所以不會(huì)過多占用算子的計(jì)算時(shí)間而影響吞吐性能。


Flink 1.11 究竟有哪些易用性上的改善   
Unaligned Checkpoint 在反壓嚴(yán)重的場景下可以明顯加速 Checkpoint 的完成時(shí)間,因?yàn)樗辉僖蕾囉谡w的計(jì)算吞吐能力,而和系統(tǒng)的存儲(chǔ)性能更加相關(guān),相當(dāng)于計(jì)算和存儲(chǔ)的解耦。但是它的使用也有一定的局限性,它會(huì)增加整體 State 的大小,對存儲(chǔ) IO 帶來額外的開銷,因此在 IO 已經(jīng)是瓶頸的場景下就不太適合使用 Unaligned Checkpoint 機(jī)制。

1.11.0 中 Unaligned Checkpoint 還沒有作為默認(rèn)模式,需要用戶手動(dòng)配置來開啟,并且只在 Exactly-Once 模式下生效。但目前還不支持 Savepoint 模式,因?yàn)?Savepoint 涉及到作業(yè)的 Rescale 場景,Channel State 目前還不支持 State 拆分,在后面的版本會(huì)進(jìn)一步支持,所以 Savepoint 目前還是會(huì)使用之前的 Aligned 模式,在反壓場景下有可能需要很長時(shí)間才能完成。

四  總結(jié)


Flink 1.11.0 版本的開發(fā)過程中,我們看到越來越多來自中國的貢獻(xiàn)者參與到核心功能的開發(fā)中,見證了 Flink 在中國的生態(tài)發(fā)展越來越繁榮,比如來自騰訊公司的貢獻(xiàn)者參與了 K8s、Checkpoint 等功能開發(fā),來自字節(jié)跳動(dòng)公司的貢獻(xiàn)者參與了 Table & SQL 層以及引擎網(wǎng)絡(luò)層的一些開發(fā)。希望更多的公司能夠參與到 Flink 開源社區(qū)中,分享在不同領(lǐng)域的經(jīng)驗(yàn),使 Flink 開源技術(shù)一直保持先進(jìn)性,能夠普惠到更多的受眾。

以上就是Flink 1.11 究竟有哪些易用性上的改善,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI