溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

基于Flink的MQ-Hive實時數(shù)據(jù)集成如何實現(xiàn)字節(jié)跳動

發(fā)布時間:2021-12-10 09:16:29 來源:億速云 閱讀:153 作者:小新 欄目:大數(shù)據(jù)

這篇文章主要介紹基于Flink的MQ-Hive實時數(shù)據(jù)集成如何實現(xiàn)字節(jié)跳動,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

在數(shù)據(jù)中臺建設過程中,一個典型的數(shù)據(jù)集成場景是將 MQ (Message Queue,例如 Kafka、RocketMQ 等)的數(shù)據(jù)導入到 Hive 中,以供下游數(shù)倉建設以及指標統(tǒng)計。由于 MQ-Hive 是數(shù)倉建設第一層,因此對數(shù)據(jù)的準確性以及實時性要求比較高。


本文主要圍繞 MQ-Hive 場景,針對目前字節(jié)跳動內已有解決方案的痛點,提出基于 Flink 的實時解決方案,并介紹新方案在字節(jié)跳動內部的使用現(xiàn)狀。

已有方案及痛點


 

字節(jié)跳動內已有解決方案如下圖所示,主要分了兩個步驟:

  1. 通過 Dump 服務將 MQ 的數(shù)據(jù)寫入到 HDFS 文件

  2. 再通過 Batch ETL 將 HDFS 數(shù)據(jù)導入到 Hive 中,并添加 Hive 分區(qū)

基于Flink的MQ-Hive實時數(shù)據(jù)集成如何實現(xiàn)字節(jié)跳動  

痛點

  1. 任務鏈較長,原始數(shù)據(jù)需要經過多次轉換最終才能進入 Hive
  2. 實時性比較差,Dump Service、Batch ETL 延遲都會導致最終數(shù)據(jù)產出延遲
  3. 存儲、計算開銷大,MQ 數(shù)據(jù)重復存儲和計算
  4. 基于原生 Java 打造,數(shù)據(jù)流量持續(xù)增長后,存在單點故障和機器負載不均衡等問題
  5. 運維成本較高,架構上無法復用公司內 Hadoop/Flink/Yarn 等現(xiàn)有基礎設施
  6. 不支持異地容災

基于 Flink 實時解決方案

優(yōu)勢

針對目前公司傳統(tǒng)解決方案的痛點,我們提出基于 Flink 的實時解決方案,將 MQ 的數(shù)據(jù)實時寫入到 Hive,并支持事件時間以及 Exactly Once 語義。相比老方案,新方案優(yōu)勢如下所示:

  1. 基于流式引擎 Flink 開發(fā),支持 Exactly Once 語義
  2. 實時性更高,MQ 數(shù)據(jù)直接進入 Hive,無中間計算環(huán)節(jié)
  3. 減少中間存儲,整個流程數(shù)據(jù)只會落地一次
  4. 支撐 Yarn 部署模式,方便用戶遷移
  5. 資源管理彈性,方便擴容以及運維
  6. 支持雙機房容災

整體架構

整體架構如下圖所示,主要包括 DTS(Data Transmission Service) Source、DTS Core、DTS Sink 三大模塊,具體功能如下:

  1. DTS Source 接入不同 MQ 數(shù)據(jù)源,支持 Kafka、RocketMQ 等
  2. DTS Sink 將數(shù)據(jù)輸出到目標數(shù)據(jù)源,支持 HDFS、Hive 等
  3. DTS Core 貫穿整個數(shù)據(jù)同步流程,通過 Source 讀取源端數(shù)據(jù),經過 DTS Framework 處理,最后通過 Sink 將數(shù)據(jù)輸出到目標端。
  4. DTS Framework 集成類型系統(tǒng)、文件切分、Exactly Once、任務信息采集、事件時間、臟數(shù)據(jù)收集等核心功能
  5. 支持 Yarn 部署模式,資源調度、管理比較彈性

基于Flink的MQ-Hive實時數(shù)據(jù)集成如何實現(xiàn)字節(jié)跳動  
DTS Dump架構圖  

Exactly Once

Flink 框架通過 Checkpoint 機制,能夠提供 Exactly Once 或者 At Least Once 語義。為了實現(xiàn) MQ-Hive 全鏈路支持 Exactly-once 語義,還需要 MQ Source、Hive Sink 端支持 Exactly Once 語義。本文通過 Checkpoint + 2PC 協(xié)議實現(xiàn),具體過程如下:

  1. 數(shù)據(jù)寫入時,Source 端從上游 MQ 拉取數(shù)據(jù)并發(fā)送到 Sink 端;Sink 端將數(shù)據(jù)寫入到臨時目錄中
  2. Checkpoint Snapshot 階段,Source 端將 MQ Offset 保存到 State 中;Sink 端關閉寫入的文件句柄,并保存當前 Checkpoint ID 到 State 中;
  3. Checkpoint Complete 階段,Source 端 Commit MQ Offset;Sink 端將臨時目錄中的數(shù)據(jù)移動到正式目錄下
  4. Checkpoint Recover 階段,加載最新一次成功的 Checkpoint 目錄并恢復 State 信息,其中 Source 端將 State 中保存的 MQ Offset 作為起始位置;Sink 端恢復最新一次成功的 Checkpoint ID,并將臨時目錄的數(shù)據(jù)移動到正式目錄下

■ 實現(xiàn)優(yōu)化

在實際使用場景中,特別是大并發(fā)場景下,HDFS 寫入延遲容易有毛刺,因為個別 Task Snapshot 超時或者失敗,導致整個 Checkpoint 失敗的問題會比較明顯。因此針對 Checkpoint 失敗,提高系統(tǒng)的容錯性以及穩(wěn)定性就比較重要。

這里充分利用 Checkpoint ID 嚴格單調遞增的特性,每一次做 Checkpoint 時,當前 Checkpoint ID 一定比以前大,因此在 Checkpoint Complete 階段,可以提交小于等于當前 Checkpoint ID 的臨時數(shù)據(jù)。具體優(yōu)化策略如下:

  1. Sink 端臨時目錄為{dump_path}/{next_cp_id},這里 next_cp_id 的定義是當前最新的 cp_id + 1
  2. Checkpoint Snapshot 階段,Sink 端保存當前最新 cp_id 到 State,同時更新 next_cp_id 為 cp_id + 1
  3. Checkpoint Complete 階段,Sink 端將臨時目錄中所有小于等于當前 cp_id 的數(shù)據(jù)移動到正式目錄下
  4. Checkpoint Recover 階段,Sink 端恢復最新一次成功的 cp_id,并將臨時目錄中小于等于當前 cp_id 的數(shù)據(jù)移動到正式目錄下

類型系統(tǒng)

由于不同數(shù)據(jù)源支持的數(shù)據(jù)類型不一樣,為了解決不同數(shù)據(jù)源間的數(shù)據(jù)同步以及不同類型轉換兼容的問題,我們支持了 DTS 類型系統(tǒng),DTS 類型可細化為基礎類型和復合類型,其中復合類型支持類型嵌套,具體轉換流程如下:

  1. 在 Source 端,將源數(shù)據(jù)類型,統(tǒng)一轉成系統(tǒng)內部的 DTS 類型
  2. 在 Sink 端,將系統(tǒng)內部的 DTS 類型轉換成目標數(shù)據(jù)源類型
  3. 其中 DTS 類型系統(tǒng)支持不同類型間的相互轉換,比如 String 類型與 Date 類型的相互轉換

基于Flink的MQ-Hive實時數(shù)據(jù)集成如何實現(xiàn)字節(jié)跳動  
DTS Dump架構圖  

Rolling Policy

Sink 端是并發(fā)寫入,每個 Task 處理的流量不一樣,為了避免生成太多的小文件或者生成的文件過大,需要支持自定義文件切分策略,以控制單個文件的大小。目前支持三種文件切分策略:文件大小、文件最長未更新時間、Checkpoint。

■ 優(yōu)化策略

Hive 支持 Parquet、Orc、Text 等多種存儲格式,不同的存儲格式數(shù)據(jù)寫入過程不太一樣,具體可以分為兩大類:

  1. RowFormat:基于單條寫入,支持按照 Offset 進行 HDFS Truncate 操作,例如 Text 格式
  2. BulkFormat:基于 Block 寫入,不支持 HDFS Truncate 操作,例如 Parquet、ORC 格式

為了保障 Exactly Once 語義,并同時支持 Parquet、Orc、Text 等多種格式,在每次 Checkpoint 時,強制做文件切分,保證所有寫入的文件都是完整的,Checkpoint 恢復時不用做 Truncate 操作。

容錯處理

理想情況下流式任務會一直運行不需要重啟,但實際不可避免會遇到以下幾個場景:

  1. Flink 計算引擎升級,需要重啟任務
  2. 上游數(shù)據(jù)增加,需要調整任務并發(fā)度
  3. Task Failover

■ 并發(fā)度調整

目前 Flink 原生支持 State Rescale。具體實現(xiàn)中,在 Task 做 Checkpoint Snapshot 時,將 MQ Offset 保存到 ListState 中;Job 重啟后,Job Master 會根據(jù) Operator 并發(fā)度,將 ListState 平均分配到各個 Task 上。

■ Task Failover

由于網(wǎng)絡抖動、寫入超時等外部因素的影響,Task 不可避免會出現(xiàn)寫入失敗,如何快速、準確的做 Task Failover 就顯得比較重要。目前 Flink 原生支持多種 Task Failover 策略,本文使用 Region Failover 策略,將失敗 Task 所在 Region 的所有 Task 都重啟。

異地容災


■ 背景


大數(shù)據(jù)時代,數(shù)據(jù)的準確性和實時性顯得尤為重要。本文提供多機房部署及異地容災解決方案,當主機房因為斷網(wǎng)、斷電、地震、火災等原因暫時無法對外提供服務時,能快速將服務切換到備災機房,并同時保障 Exactly Once 語義。

■ 容災組件


整體解決方案需要多個容災組件一起配合實現(xiàn),容災組件如下圖所示,主要包括 MQ、YARN、HDFS,具體如下:

  1. MQ 需要支持多機房部署,當主機房故障時,能將 Leader 切換到備機房,以供下游消費
  2. Yarn 集群在主機房、備機房都有部署,以便 Flink Job 遷移
  3. 下游 HDFS 需要支持多機房部署,當主機房故障時,能將 Master 切換到備機房
  4. Flink Job 運行在 Yarn 上,同時任務 State Backend 保存到 HDFS,通過 HDFS 的多機房支持保障 State Backend 的多機房

 

  基于Flink的MQ-Hive實時數(shù)據(jù)集成如何實現(xiàn)字節(jié)跳動  

 

■ 容災過程

整體容災過程如下所示:

  1. 正常情況下,MQ Leader 以及 HDFS Master 部署在主機房,并將數(shù)據(jù)同步到備機房。同時 Flink Job 運行在主機房,并將任務 State 寫入到 HDFS 中,注意 State 也是多機房部署模式
  2. 災難情況下,MQ Leader 以及 HDFS Master 從主機房遷移到備災機房,同時 Flink Job 也遷移到備災機房,并通過 State 恢復災難前的 Offset 信息,以提供 Exactly Once 語義

基于Flink的MQ-Hive實時數(shù)據(jù)集成如何實現(xiàn)字節(jié)跳動  
基于Flink的MQ-Hive實時數(shù)據(jù)集成如何實現(xiàn)字節(jié)跳動  

事件時間歸檔

■ 背景

在數(shù)倉建設中,處理時間(Process Time)和事件時間(Event Time)的處理邏輯不太一樣,對于處理時間會將數(shù)據(jù)寫到當前系統(tǒng)時間所對應的時間分區(qū)下;對于事件時間,則是根據(jù)數(shù)據(jù)的生產時間將數(shù)據(jù)寫到對應時間分區(qū)下,本文也簡稱為歸檔。

在實際場景中,不可避免會遇到各種上下游故障,并在持續(xù)一段時間后恢復,如果采用 Process Time 的處理策略,則事故期間的數(shù)據(jù)會寫入到恢復后的時間分區(qū)下,最終導致分區(qū)空洞或者數(shù)據(jù)漂移的問題;如果采用歸檔的策略,會按照事件時間寫入,則沒有此類問題。

由于上游數(shù)據(jù)事件時間會存在亂序,同時 Hive 分區(qū)生成后就不應該再繼續(xù)寫入,因此實際寫入過程中不可能做到無限歸檔,只能在一定時間范圍內歸檔。歸檔的難點在于如何確定全局最小歸檔時間以及如何容忍一定的亂序。

■  全局最小歸檔時間


Source 端是并發(fā)讀取,并且一個 Task 可能同時讀取多個 MQ Partition 的數(shù)據(jù),對于 MQ 的每一個 Parititon 會保存當前分區(qū)歸檔時間,取分區(qū)中最小值作為 Task 的最小歸檔時間,最終取 Task 中最小值,作為全局最小歸檔時間。

基于Flink的MQ-Hive實時數(shù)據(jù)集成如何實現(xiàn)字節(jié)跳動  

■ 亂序處理

為了支持亂序的場景,會支持一個歸檔區(qū)間的設置,其中 Global Min Watermark 為全局最小歸檔時間,Partition Watermark 為分區(qū)當前歸檔時間,Partition Min Watermark 為分區(qū)最小歸檔時間,只有當事件時間滿足以下條件時,才會進行歸檔:

  1. 事件時間大于全局最小歸檔時間
  2. 事件時間大于分區(qū)最小歸檔時間

 
基于Flink的MQ-Hive實時數(shù)據(jù)集成如何實現(xiàn)字節(jié)跳動  

 

Hive 分區(qū)生成

■ 原理

Hive 分區(qū)生成的難點在于如何確定分區(qū)的數(shù)據(jù)是否就緒以及如何添加分區(qū)。由于 Sink 端是并發(fā)寫入,同時會有多個 Task 寫同一個分區(qū)數(shù)據(jù),因此只有當所有 Task 分區(qū)數(shù)據(jù)寫入完成,才能認為分區(qū)數(shù)據(jù)是就緒,本文解決思路如下:

  1. 在 Sink 端,對于每個 Task 保存當前最小處理時間,需要滿足單調遞增的特性
  2. 在 Checkpoint Complete 時,Task 上報最小處理時間到 JM 端
  3. JM 拿到所有 Task 的最小處理時間后,可以得到全局最小處理時間,并以此作為 Hive 分區(qū)的最小就緒時間
  4. 當最小就緒時間更新時,可判斷是否添加 Hive 分區(qū)

 
基于Flink的MQ-Hive實時數(shù)據(jù)集成如何實現(xiàn)字節(jié)跳動  

■ 動態(tài)分區(qū)

動態(tài)分區(qū)是根據(jù)上游輸入數(shù)據(jù)的值,確定數(shù)據(jù)寫到哪個分區(qū)目錄,而不是寫到固定分區(qū)目錄,例如 date={date}/hour={hour}/app={app}的場景,根據(jù)分區(qū)時間以及 app 字段的值確定最終的分區(qū)目錄,以實現(xiàn)每個小時內,相同的 app 數(shù)據(jù)在同一個分區(qū)下。

在靜態(tài)分區(qū)場景下,每個 Task 每次只會寫入一個分區(qū)文件,但在動態(tài)分區(qū)場景下,每個 Task 可能同時寫入多個分區(qū)文件。對于 Parque 格式的寫入,會先將數(shù)據(jù)寫到做本地緩存,然后批次寫入到 Hive,當 Task 同時處理的文件句柄過多時,容易出現(xiàn) OOM。為了防止單 Task OOM,會周期性對文件句柄做探活檢測,及時釋放長時間沒有寫入的文件句柄。

基于Flink的MQ-Hive實時數(shù)據(jù)集成如何實現(xiàn)字節(jié)跳動  

Messenger

Messenger 模塊用于采集 Job 運行狀態(tài)信息,以便衡量 Job 健康度以及大盤指標建設。

元信息采集

元信息采集的原理如下所示,在 Sink 端通過 Messenger 采集 Task 的核心指標,例如流量、QPS、臟數(shù)據(jù)、寫入 Latency、事件時間寫入效果等,并通過 Messenger Collector 匯總。其中臟數(shù)據(jù)需要輸出到外部存儲中,任務運行指標輸出到 Grafana,用于大盤指標展示。

基于Flink的MQ-Hive實時數(shù)據(jù)集成如何實現(xiàn)字節(jié)跳動  

 

■ 臟數(shù)據(jù)收集

數(shù)據(jù)集成場景下,不可避免會遇到臟數(shù)據(jù),例如類型配置錯誤、字段溢出、類型轉換不兼容等場景。對于流式任務來說,由于任務會一直運行,因此需要能夠實時統(tǒng)計臟數(shù)據(jù)流量,并且將臟數(shù)據(jù)保存到外部存儲中以供排查,同時在運行日志中采樣輸出。

■ 大盤監(jiān)控


大盤指標覆蓋全局指標以及單個 Job 指標,包括寫入成功流量和 QPS、寫入 Latency、寫入失敗流量和 QPS、歸檔效果統(tǒng)計等,具體如下圖所示:

基于Flink的MQ-Hive實時數(shù)據(jù)集成如何實現(xiàn)字節(jié)跳動  
基于Flink的MQ-Hive實時數(shù)據(jù)集成如何實現(xiàn)字節(jié)跳動

以上是“基于Flink的MQ-Hive實時數(shù)據(jù)集成如何實現(xiàn)字節(jié)跳動”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業(yè)資訊頻道!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。

AI