溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

原理解析 | 深入了解 Apache Flink 的網(wǎng)絡(luò)協(xié)議棧

發(fā)布時(shí)間:2020-06-15 22:45:31 來源:網(wǎng)絡(luò) 閱讀:442 作者:Ververica 欄目:大數(shù)據(jù)

作者:Nico Kruber

翻譯:曹英杰

Flink 的網(wǎng)絡(luò)協(xié)議棧是組成 flink-runtime請?zhí)砑渔溄用枋?模塊的核心組件之一,是每個(gè) Flink 作業(yè)的核心。它連接所有 TaskManager 的各個(gè)子任務(wù)(Subtask),因此,對于 Flink 作業(yè)的性能包括吞吐與延遲都至關(guān)重要。與 TaskManager 和 JobManager 之間通過基于 Akka 的 RPC 通信的控制通道不同,TaskManager 之間的網(wǎng)絡(luò)協(xié)議棧依賴于更加底層的 Netty API。

本文將首先介紹 Flink 暴露給流算子(Stream operator)的高層抽象,然后詳細(xì)介紹 Flink 網(wǎng)絡(luò)協(xié)議棧的物理實(shí)現(xiàn)和各種優(yōu)化、優(yōu)化的效果以及 Flink 在吞吐量和延遲之間的權(quán)衡。

1.邏輯視圖

Flink 的網(wǎng)絡(luò)協(xié)議棧為彼此通信的子任務(wù)提供以下邏輯視圖,例如在 A 通過 keyBy() 操作進(jìn)行數(shù)據(jù) Shuffle :

原理解析 | 深入了解 Apache Flink 的網(wǎng)絡(luò)協(xié)議棧cdn.xitu.io/2019/6/25/16b8d5ce9bfaa319?w=1080&h=859&f=png&s=393657">

這一過程建立在以下三種基本概念的基礎(chǔ)上:

▼ 子任務(wù)輸出類型(ResultPartitionType):
Pipelined(有限的或無限的):一旦產(chǎn)生數(shù)據(jù)就可以持續(xù)向下游發(fā)送有限數(shù)據(jù)流或無限數(shù)據(jù)流。
Blocking:僅在生成完整結(jié)果后向下游發(fā)送數(shù)據(jù)。

▼ 調(diào)度策略:
同時(shí)調(diào)度所有任務(wù)(Eager):同時(shí)部署作業(yè)的所有子任務(wù)(用于流作業(yè))。
上游產(chǎn)生第一條記錄部署下游(Lazy):一旦任何生產(chǎn)者生成任何輸出,就立即部署下游任務(wù)。
上游產(chǎn)生完整數(shù)據(jù)部署下游:當(dāng)任何或所有生產(chǎn)者生成完整數(shù)據(jù)后,部署下游任務(wù)。

▼ 數(shù)據(jù)傳輸:
高吞吐:Flink 不是一個(gè)一個(gè)地發(fā)送每條記錄,而是將若干記錄緩沖到其網(wǎng)絡(luò)緩沖區(qū)中并一次性發(fā)送它們。這降低了每條記錄的發(fā)送成本因此提高了吞吐量。
低延遲:當(dāng)網(wǎng)絡(luò)緩沖區(qū)超過一定的時(shí)間未被填滿時(shí)會(huì)觸發(fā)超時(shí)發(fā)送,通過減小超時(shí)時(shí)間,可以通過犧牲一定的吞吐來獲取更低的延遲。

我們將在下面深入 Flink 網(wǎng)絡(luò)協(xié)議棧的物理實(shí)現(xiàn)時(shí)看到關(guān)于吞吐延遲的優(yōu)化。對于這一部分,讓我們詳細(xì)說明輸出類型與調(diào)度策略。首先,需要知道的是子任務(wù)的輸出類型和調(diào)度策略是緊密關(guān)聯(lián)的,只有兩者的一些特定組合才是有效的。

Pipelined 結(jié)果是流式輸出,需要目標(biāo) Subtask 正在運(yùn)行以便接收數(shù)據(jù)。因此需要在上游 Task 產(chǎn)生數(shù)據(jù)之前或者產(chǎn)生第一條數(shù)據(jù)的時(shí)候調(diào)度下游目標(biāo) Task 運(yùn)行。批處理作業(yè)生成有界結(jié)果數(shù)據(jù),而流式處理作業(yè)產(chǎn)生無限結(jié)果數(shù)據(jù)。

批處理作業(yè)也可能以阻塞方式產(chǎn)生結(jié)果,具體取決于所使用的算子和連接模式。在這種情況下,必須等待上游 Task 先生成完整的結(jié)果,然后才能調(diào)度下游的接收 Task 運(yùn)行。這能夠提高批處理作業(yè)的效率并且占用更少的資源。

下表總結(jié)了 Task 輸出類型以及調(diào)度策略的有效組合:

原理解析 | 深入了解 Apache Flink 的網(wǎng)絡(luò)協(xié)議棧

注釋:
[1]目前 Flink 未使用
[2]批處理 / 流計(jì)算統(tǒng)一完成后,可能適用于流式作業(yè)

此外,對于具有多個(gè)輸入的子任務(wù),調(diào)度以兩種方式啟動(dòng):當(dāng)所有或者任何上游任務(wù)產(chǎn)生第一條數(shù)據(jù)或者產(chǎn)生完整數(shù)據(jù)時(shí)調(diào)度任務(wù)運(yùn)行。要調(diào)整批處理作業(yè)中的輸出類型和調(diào)度策略,可以參考 ExecutionConfig#setExecutionMode()——尤其是 ExecutionMode,以及 ExecutionConfig#setDefaultInputDependencyConstraint()。

2.物理數(shù)據(jù)傳輸

為了理解物理數(shù)據(jù)連接,請回想一下,在 Flink 中,不同的任務(wù)可以通過 Slotsharing group 共享相同 Slot。TaskManager 還可以提供多個(gè) Slot,以允許將同一任務(wù)的多個(gè)子任務(wù)調(diào)度到同一個(gè) TaskManager 上。

對于下圖所示的示例,我們假設(shè) 2 個(gè)并發(fā)為 4 的任務(wù)部署在 2 個(gè) TaskManager 上,每個(gè) TaskManager 有兩個(gè) Slot。TaskManager 1 執(zhí)行子任務(wù) A.1,A.2,B.1 和 B.2,TaskManager 2 執(zhí)行子任務(wù) A.3,A.4,B.3 和 B.4。在 A 和 B 之間是 Shuffle 連接類型,比如來自于 A 的 keyBy() 操作,在每個(gè) TaskManager 上會(huì)有 2x4 個(gè)邏輯連接,其中一些是本地的,另一些是遠(yuǎn)程的:

原理解析 | 深入了解 Apache Flink 的網(wǎng)絡(luò)協(xié)議棧

不同任務(wù)(遠(yuǎn)程)之間的每個(gè)網(wǎng)絡(luò)連接將在 Flink 的網(wǎng)絡(luò)堆棧中獲得自己的 TCP 通道。但是,如果同一任務(wù)的不同子任務(wù)被調(diào)度到同一個(gè) TaskManager,則它們與同一個(gè) TaskManager 的網(wǎng)絡(luò)連接將多路復(fù)用并共享同一個(gè) TCP 信道以減少資源使用。在我們的例子中,這適用于 A.1→B.3,A.1→B.4,以及 A.2→B.3 和 A.2→B.4,如下圖所示:

原理解析 | 深入了解 Apache Flink 的網(wǎng)絡(luò)協(xié)議棧

每個(gè)子任務(wù)的輸出結(jié)果稱為 ResultPartition,每個(gè) ResultPartition 被分成多個(gè)單獨(dú)的 ResultSubpartition- 每個(gè)邏輯通道一個(gè)。Flink 的網(wǎng)絡(luò)協(xié)議棧在這一點(diǎn)的處理上,不再處理單個(gè)記錄,而是將一組序列化的記錄填充到網(wǎng)絡(luò)緩沖區(qū)中進(jìn)行處理。每個(gè)子任務(wù)本地緩沖區(qū)中最多可用 Buffer 數(shù)目為(每個(gè)發(fā)送方和接收方各一個(gè)):

#channels  * buffers-per-channel + floating-buffers-per-gate

單個(gè) TaskManager 上的網(wǎng)絡(luò)層 Buffer 總數(shù)通常不需要配置。有關(guān)如何在需要時(shí)進(jìn)行配置的詳細(xì)信息,請參閱配置網(wǎng)絡(luò)緩沖區(qū)的文檔。

▼ 造成反壓(1)

每當(dāng)子任務(wù)的數(shù)據(jù)發(fā)送緩沖區(qū)耗盡時(shí)——數(shù)據(jù)駐留在 Subpartition 的緩沖區(qū)隊(duì)列中或位于更底層的基于 Netty 的網(wǎng)絡(luò)堆棧內(nèi),生產(chǎn)者就會(huì)被阻塞,無法繼續(xù)發(fā)送數(shù)據(jù),而受到反壓。接收端以類似的方式工作:Netty 收到任何數(shù)據(jù)都需要通過網(wǎng)絡(luò) Buffer 傳遞給 Flink。如果相應(yīng)子任務(wù)的網(wǎng)絡(luò)緩沖區(qū)中沒有足夠可用的網(wǎng)絡(luò) Buffer,F(xiàn)link 將停止從該通道讀取,直到 Buffer 可用。這將反壓該多路復(fù)用上的所有發(fā)送子任務(wù),因此也限制了其他接收子任務(wù)。下圖說明了過載的子任務(wù) B.4,它會(huì)導(dǎo)致多路復(fù)用的反壓,也會(huì)導(dǎo)致子任務(wù) B.3 無法接受和處理數(shù)據(jù),即使是 B.3 還有足夠的處理能力。

原理解析 | 深入了解 Apache Flink 的網(wǎng)絡(luò)協(xié)議棧

為了防止這種情況發(fā)生,F(xiàn)link 1.5 引入了自己的流量控制機(jī)制。

3.Credit-based 流量控制

Credit-based 流量控制可確保發(fā)送端已經(jīng)發(fā)送的任何數(shù)據(jù),接收端都具有足夠的能力(Buffer)來接收。新的流量控制機(jī)制基于網(wǎng)絡(luò)緩沖區(qū)的可用性,作為 Flink 之前機(jī)制的自然延伸。每個(gè)遠(yuǎn)程輸入通道(RemoteInputChannel)現(xiàn)在都有自己的一組獨(dú)占緩沖區(qū)(Exclusive buffer),而不是只有一個(gè)共享的本地緩沖池(LocalBufferPool)。與之前不同,本地緩沖池中的緩沖區(qū)稱為流動(dòng)緩沖區(qū)(Floating buffer),因?yàn)樗鼈儠?huì)在輸出通道間流動(dòng)并且可用于每個(gè)輸入通道。

數(shù)據(jù)接收方會(huì)將自身的可用 Buffer 作為 Credit 告知數(shù)據(jù)發(fā)送方(1 buffer = 1 credit)。每個(gè) Subpartition 會(huì)跟蹤下游接收端的 Credit(也就是可用于接收數(shù)據(jù)的 Buffer 數(shù)目)。只有在相應(yīng)的通道(Channel)有 Credit 的時(shí)候 Flink 才會(huì)向更底層的網(wǎng)絡(luò)協(xié)議棧發(fā)送數(shù)據(jù)(以 Buffer 為粒度),并且每發(fā)送一個(gè) Buffer 的數(shù)據(jù),相應(yīng)的通道上的 Credit 會(huì)減 1。除了發(fā)送數(shù)據(jù)本身外,數(shù)據(jù)發(fā)送端還會(huì)發(fā)送相應(yīng) Subpartition 中有多少正在排隊(duì)發(fā)送的 Buffer 數(shù)(稱之為 Backlog)給下游。數(shù)據(jù)接收端會(huì)利用這一信息(Backlog)去申請合適數(shù)量的 Floating buffer 用于接收發(fā)送端的數(shù)據(jù),這可以加快發(fā)送端堆積數(shù)據(jù)的處理。接收端會(huì)首先申請和 Backlog 數(shù)量相等的 Buffer,但可能無法申請到全部,甚至一個(gè)都申請不到,這時(shí)接收端會(huì)利用已經(jīng)申請到的 Buffer 進(jìn)行數(shù)據(jù)接收,并監(jiān)聽是否有新的 Buffer 可用。

原理解析 | 深入了解 Apache Flink 的網(wǎng)絡(luò)協(xié)議棧

Credit-based 的流控使用 Buffers-per-channel 來指定每個(gè) Channel 有多少獨(dú)占的 Buffer,使用 Floating-buffers-per-gate 來指定共享的本地緩沖池(Local buffer pool)大?。蛇x3),通過共享本地緩沖池,Credit-based 流控可以使用的 Buffer 數(shù)目可以達(dá)到與原來非 Credit-based 流控同樣的大小。這兩個(gè)參數(shù)的默認(rèn)值是被精心選取的,以保證新的 Credit-based 流控在網(wǎng)絡(luò)健康延遲正常的情況下至少可以達(dá)到與原策略相同的吞吐??梢愿鶕?jù)實(shí)際的網(wǎng)絡(luò) RRT (round-trip-time)和帶寬對這兩個(gè)參數(shù)進(jìn)行調(diào)整。

注釋3:如果沒有足夠的 Buffer 可用,則每個(gè)緩沖池將獲得全局可用 Buffer 的相同份額(±1)。

▼ 造成反壓(2)

與沒有流量控制的接收端反壓機(jī)制不同,Credit 提供了更直接的控制:如果接收端的處理速度跟不上,最終它的 Credit 會(huì)減少成 0,此時(shí)發(fā)送端就不會(huì)在向網(wǎng)絡(luò)中發(fā)送數(shù)據(jù)(數(shù)據(jù)會(huì)被序列化到 Buffer 中并緩存在發(fā)送端)。由于反壓只發(fā)生在邏輯鏈路上,因此沒必要阻斷從多路復(fù)用的 TCP 連接中讀取數(shù)據(jù),也就不會(huì)影響其他的接收者接收和處理數(shù)據(jù)。

▼ Credit-based 的優(yōu)勢與問題

由于通過 Credit-based 流控機(jī)制,多路復(fù)用中的一個(gè)信道不會(huì)由于反壓阻塞其他邏輯信道,因此整體資源利用率會(huì)增加。此外,通過完全控制正在發(fā)送的數(shù)據(jù)量,我們還能夠加快 Checkpoint alignment:如果沒有流量控制,通道需要一段時(shí)間才能填滿網(wǎng)絡(luò)協(xié)議棧的內(nèi)部緩沖區(qū)并表明接收端不再讀取數(shù)據(jù)了。在這段時(shí)間里,大量的 Buffer 不會(huì)被處理。任何 Checkpoint barrier(觸發(fā) Checkpoint 的消息)都必須在這些數(shù)據(jù) Buffer 后排隊(duì),因此必須等到所有這些數(shù)據(jù)都被處理后才能夠觸發(fā) Checkpoint(“Barrier 不會(huì)在數(shù)據(jù)之前被處理!”)。

但是,來自接收方的附加通告消息(向發(fā)送端通知 Credit)可能會(huì)產(chǎn)生一些額外的開銷,尤其是在使用 SSL 加密信道的場景中。此外,單個(gè)輸入通道( Input channel)不能使用緩沖池中的所有 Buffer,因?yàn)榇嬖跓o法共享的 Exclusive buffer。新的流控協(xié)議也有可能無法做到立即發(fā)送盡可能多的數(shù)據(jù)(如果生成數(shù)據(jù)的速度快于接收端反饋 Credit 的速度),這時(shí)則可能增長發(fā)送數(shù)據(jù)的時(shí)間。雖然這可能會(huì)影響作業(yè)的性能,但由于其所有優(yōu)點(diǎn),通常新的流量控制會(huì)表現(xiàn)得更好??赡軙?huì)通過增加單個(gè)通道的獨(dú)占 Buffer 數(shù)量,這會(huì)增大內(nèi)存開銷。然而,與先前實(shí)現(xiàn)相比,總體內(nèi)存使用可能仍然會(huì)降低,因?yàn)榈讓拥木W(wǎng)絡(luò)協(xié)議棧不再需要緩存大量數(shù)據(jù),因?yàn)槲覀兛偸强梢粤⒓磳⑵鋫鬏數(shù)?Flink(一定會(huì)有相應(yīng)的 Buffer 接收數(shù)據(jù))。

在使用新的 Credit-based 流量控制時(shí),可能還會(huì)注意到另一件事:由于我們在發(fā)送方和接收方之間緩沖較少的數(shù)據(jù),反壓可能會(huì)更早的到來。然而,這是我們所期望的,因?yàn)榫彺娓鄶?shù)據(jù)并沒有真正獲得任何好處。如果要緩存更多的數(shù)據(jù)并且保留 Credit-based 流量控制,可以考慮通過增加單個(gè)輸入共享 Buffer 的數(shù)量。

原理解析 | 深入了解 Apache Flink 的網(wǎng)絡(luò)協(xié)議棧

原理解析 | 深入了解 Apache Flink 的網(wǎng)絡(luò)協(xié)議棧

注意:如果需要關(guān)閉 Credit-based 流量控制,可以將這個(gè)配置添加到 flink-conf.yaml 中:taskmanager.network.credit-model:false。但是,此參數(shù)已過時(shí),最終將與非 Credit-based 流控制代碼一起刪除。

4.序列號與反序列化

下圖從上面的擴(kuò)展了更高級別的視圖,其中包含網(wǎng)絡(luò)協(xié)議棧及其周圍組件的更多詳細(xì)信息,從發(fā)送算子發(fā)送記錄(Record)到接收算子獲取它:

原理解析 | 深入了解 Apache Flink 的網(wǎng)絡(luò)協(xié)議棧

在生成 Record 并將其傳遞出去之后,例如通過 Collector#collect(),它被傳遞給 RecordWriter,RecordWriter 會(huì)將 Java 對象序列化為字節(jié)序列,最終存儲(chǔ)在 Buffer 中按照上面所描述的在網(wǎng)絡(luò)協(xié)議棧中進(jìn)行處理。RecordWriter 首先使用 SpanningRecordSerializer 將 Record 序列化為靈活的堆上字節(jié)數(shù)組。然后,它嘗試將這些字節(jié)寫入目標(biāo)網(wǎng)絡(luò) Channel 的 Buffer 中。我們將在下面的章節(jié)回到這一部分。

在接收方,底層網(wǎng)絡(luò)協(xié)議棧(Netty)將接收到的 Buffer 寫入相應(yīng)的輸入通道(Channel)。流任務(wù)的線程最終從這些隊(duì)列中讀取并嘗試在 RecordReader 的幫助下通過 SpillingAdaptiveSpanningRecordDeserializer 將累積的字節(jié)反序列化為 Java 對象。與序列化器類似,這個(gè)反序列化器還必須處理特殊情況,例如跨越多個(gè)網(wǎng)絡(luò) Buffer 的 Record,或者因?yàn)橛涗洷旧肀染W(wǎng)絡(luò)緩沖區(qū)大(默認(rèn)情況下為32KB,通過 taskmanager.memory.segment-size 設(shè)置)或者因?yàn)樾蛄谢?Record 時(shí),目標(biāo) Buffer 中已經(jīng)沒有足夠的剩余空間保存序列化后的字節(jié)數(shù)據(jù),在這種情況下,F(xiàn)link 將使用這些字節(jié)空間并繼續(xù)將其余字節(jié)寫入新的網(wǎng)絡(luò) Buffer 中。

4.1 將網(wǎng)絡(luò) Buffer 寫入 Netty

在上圖中,Credit-based 流控制機(jī)制實(shí)際上位于“Netty Server”(和“Netty Client”)組件內(nèi)部,RecordWriter 寫入的 Buffer 始終以空狀態(tài)(無數(shù)據(jù))添加到 Subpartition 中,然后逐漸向其中填寫序列化后的記錄。但是 Netty 在什么時(shí)候真正的獲取并發(fā)送這些 Buffer 呢?顯然,不能是 Buffer 中只要有數(shù)據(jù)就發(fā)送,因?yàn)榭缇€程(寫線程與發(fā)送線程)的數(shù)據(jù)交換與同步會(huì)造成大量的額外開銷,并且會(huì)造成緩存本身失去意義(如果是這樣的話,不如直接將將序列化后的字節(jié)發(fā)到網(wǎng)絡(luò)上而不必引入中間的 Buffer)。

在 Flink 中,有三種情況可以使 Netty 服務(wù)端使用(發(fā)送)網(wǎng)絡(luò) Buffer:

  • 寫入 Record 時(shí) Buffer 變滿,或者
  • Buffer 超時(shí)未被發(fā)送,或
  • 發(fā)送特殊消息,例如 Checkpoint barrier。

在 Buffer 滿后發(fā)送

RecordWriter 將 Record 序列化到本地的序列化緩沖區(qū)中,并將這些序列化后的字節(jié)逐漸寫入位于相應(yīng) Result subpartition 隊(duì)列中的一個(gè)或多個(gè)網(wǎng)絡(luò) Buffer中。雖然單個(gè) RecordWriter 可以處理多個(gè) Subpartition,但每個(gè) Subpartition 只會(huì)有一個(gè) RecordWriter 向其寫入數(shù)據(jù)。另一方面,Netty 服務(wù)端線程會(huì)從多個(gè) Result subpartition 中讀取并像上面所說的那樣將數(shù)據(jù)寫入適當(dāng)?shù)亩嗦窂?fù)用信道。這是一個(gè)典型的生產(chǎn)者 - 消費(fèi)者模式,網(wǎng)絡(luò)緩沖區(qū)位于生產(chǎn)者與消費(fèi)者之間,如下圖所示。在(1)序列化和(2)將數(shù)據(jù)寫入 Buffer 之后,RecordWriter 會(huì)相應(yīng)地更新緩沖區(qū)的寫入索引。一旦 Buffer 完全填滿,RecordWriter 會(huì)(3)為當(dāng)前 Record 剩余的字節(jié)或者下一個(gè) Record 從其本地緩沖池中獲取新的 Buffer,并將新的 Buffer 添加到相應(yīng) Subpartition 的隊(duì)列中。這將(4)通知 Netty服務(wù)端線程有新的數(shù)據(jù)可發(fā)送(如果 Netty 還不知道有可用的數(shù)據(jù)的話4)。每當(dāng) Netty 有能力處理這些通知時(shí),它將(5)從隊(duì)列中獲取可用 Buffer 并通過適當(dāng)?shù)?TCP 通道發(fā)送它。

原理解析 | 深入了解 Apache Flink 的網(wǎng)絡(luò)協(xié)議棧

注釋4:如果隊(duì)列中有更多已完成的 Buffer,我們可以假設(shè) Netty 已經(jīng)收到通知。

在 Buffer 超時(shí)后發(fā)送

為了支持低延遲應(yīng)用,我們不能只等到 Buffer 滿了才向下游發(fā)送數(shù)據(jù)。因?yàn)榭赡艽嬖谶@種情況,某種通信信道沒有太多數(shù)據(jù),等到 Buffer 滿了在發(fā)送會(huì)不必要地增加這些少量 Record 的處理延遲。因此,F(xiàn)link 提供了一個(gè)定期 Flush 線程(the output flusher)每隔一段時(shí)間會(huì)將任何緩存的數(shù)據(jù)全部寫出??梢酝ㄟ^ StreamExecutionEnvironment#setBufferTimeout 配置 Flush 的間隔,并作為延遲5的上限(對于低吞吐量通道)。下圖顯示了它與其他組件的交互方式:RecordWriter 如前所述序列化數(shù)據(jù)并寫入網(wǎng)絡(luò) Buffer,但同時(shí),如果 Netty 還不知道有數(shù)據(jù)可以發(fā)送,Output flusher 會(huì)(3,4)通知 Netty 服務(wù)端線程數(shù)據(jù)可讀(類似與上面的“buffer已滿”的場景)。當(dāng) Netty 處理此通知(5)時(shí),它將消費(fèi)(獲取并發(fā)送)Buffer 中的可用數(shù)據(jù)并更新 Buffer 的讀取索引。Buffer 會(huì)保留在隊(duì)列中——從 Netty 服務(wù)端對此 Buffer 的任何進(jìn)一步操作將在下次從讀取索引繼續(xù)讀取。

原理解析 | 深入了解 Apache Flink 的網(wǎng)絡(luò)協(xié)議棧

注釋5:嚴(yán)格來說,Output flusher 不提供任何保證——它只向 Netty 發(fā)送通知,而 Netty 線程會(huì)按照能力與意愿進(jìn)行處理。這也意味著如果存在反壓,則 Output flusher 是無效的。

特殊消息后發(fā)送

一些特殊的消息如果通過 RecordWriter 發(fā)送,也會(huì)觸發(fā)立即 Flush 緩存的數(shù)據(jù)。其中最重要的消息包括 Checkpoint barrier 以及 end-of-partition 事件,這些事件應(yīng)該盡快被發(fā)送,而不應(yīng)該等待 Buffer 被填滿或者 Output flusher 的下一次 Flush。

進(jìn)一步的討論

與小于 1.5 版本的 Flink 不同,請注意(a)網(wǎng)絡(luò) Buffer 現(xiàn)在會(huì)被直接放在 Subpartition 的隊(duì)列中,(b)網(wǎng)絡(luò) Buffer 不會(huì)在 Flush 之后被關(guān)閉。這給我們帶來了一些好處:

  • 同步開銷較少(Output flusher 和 RecordWriter 是相互獨(dú)立的)
  • 在高負(fù)荷情況下,Netty 是瓶頸(直接的網(wǎng)絡(luò)瓶頸或反壓),我們?nèi)匀豢梢栽谖赐瓿傻?Buffer 中填充數(shù)據(jù)
  • Netty 通知顯著減少

但是,在低負(fù)載情況下,可能會(huì)出現(xiàn) CPU 使用率和 TCP 數(shù)據(jù)包速率的增加。這是因?yàn)椋現(xiàn)link 將使用任何可用的 CPU 計(jì)算能力來嘗試維持所需的延遲。一旦負(fù)載增加,F(xiàn)link 將通過填充更多的 Buffer 進(jìn)行自我調(diào)整。由于同步開銷減少,高負(fù)載場景不會(huì)受到影響,甚至可以實(shí)現(xiàn)更高的吞吐。

4.2 BufferBuilder 和 BufferConsumer

更深入地了解 Flink 中是如何實(shí)現(xiàn)生產(chǎn)者 - 消費(fèi)者機(jī)制,需要仔細(xì)查看 Flink 1.5 中引入的 BufferBuilder 和 BufferConsumer 類。雖然讀取是以 Buffer 為粒度,但寫入它是按 Record 進(jìn)行的,因此是 Flink 中所有網(wǎng)絡(luò)通信的核心路徑。因此,我們需要在任務(wù)線程(Task thread)和 Netty 線程之間實(shí)現(xiàn)輕量級連接,這意味著盡量小的同步開銷。你可以通過查看源代碼獲取更加詳細(xì)的信息。

5. 延遲與吞吐

引入網(wǎng)絡(luò) Buffer 的目是獲得更高的資源利用率和更高的吞吐,代價(jià)是讓 Record 在 Buffer 中等待一段時(shí)間。雖然可以通過 Buffer 超時(shí)給出此等待時(shí)間的上限,但可能很想知道有關(guān)這兩個(gè)維度(延遲和吞吐)之間權(quán)衡的更多信息,顯然,無法兩者同時(shí)兼得。下圖顯示了不同的 Buffer 超時(shí)時(shí)間下的吞吐,超時(shí)時(shí)間從 0 開始(每個(gè) Record 直接 Flush)到 100 毫秒(默認(rèn)值),測試在具有 100 個(gè)節(jié)點(diǎn)每個(gè)節(jié)點(diǎn) 8 個(gè) Slot 的群集上運(yùn)行,每個(gè)節(jié)點(diǎn)運(yùn)行沒有業(yè)務(wù)邏輯的 Task 因此只用于測試網(wǎng)絡(luò)協(xié)議棧的能力。為了進(jìn)行比較,我們還測試了低延遲改進(jìn)(如上所述)之前的 Flink 1.4 版本。

原理解析 | 深入了解 Apache Flink 的網(wǎng)絡(luò)協(xié)議棧

如圖,使用 Flink 1.5+,即使是非常低的 Buffer 超時(shí)(例如1ms)(對于低延遲場景)也提供高達(dá)超時(shí)默認(rèn)參數(shù)(100ms)75% 的最大吞吐,但會(huì)緩存更少的數(shù)據(jù)。

6.結(jié)論

了解 Result partition,批處理和流式計(jì)算的不同網(wǎng)絡(luò)連接以及調(diào)度類型,Credit-Based 流量控制以及 Flink 網(wǎng)絡(luò)協(xié)議棧內(nèi)部的工作機(jī)理,有助于更好的理解網(wǎng)絡(luò)協(xié)議棧相關(guān)的參數(shù)以及作業(yè)的行為。后續(xù)我們會(huì)推出更多 Flink 網(wǎng)絡(luò)棧的相關(guān)內(nèi)容,并深入更多細(xì)節(jié),包括運(yùn)維相關(guān)的監(jiān)控指標(biāo)(Metrics),進(jìn)一步的網(wǎng)絡(luò)調(diào)優(yōu)策略以及需要避免的常見錯(cuò)誤等。

via:
https://flink.apache.org/2019/06/05/flink-network-stack.html

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI