溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何基于Flink+ClickHouse 構(gòu)建實時數(shù)據(jù)分析平臺

發(fā)布時間:2021-12-27 17:31:11 來源:億速云 閱讀:243 作者:柒染 欄目:大數(shù)據(jù)

今天就跟大家聊聊有關(guān)如何基于Flink+ClickHouse 構(gòu)建實時數(shù)據(jù)分析平臺,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

下面 主要介紹Flink-to-Hive 小時級場景和 Flink-to-ClickHouse 秒級場景。

一、業(yè)務(wù)場景與現(xiàn)狀分析

   

查詢的頁面分為離線查詢頁面和實時查詢頁面。今年所實現(xiàn)的改造是在實時查詢中接入了 ClickHouse 計算引擎。根據(jù)不同的業(yè)務(wù)場景,實時數(shù)據(jù)報表中會展現(xiàn)數(shù)據(jù)指標曲線圖和詳細的數(shù)據(jù)指標表。目前數(shù)據(jù)指標的采集和計算為每五分鐘一個時間窗口,當(dāng)然也存在三分鐘或一分鐘的特殊情況。數(shù)據(jù)指標數(shù)據(jù)全部從 Kafka 實時數(shù)據(jù)中導(dǎo)出,并導(dǎo)入 ClickHouse 進行計算。

如何基于Flink+ClickHouse 構(gòu)建實時數(shù)據(jù)分析平臺

 

二、Flink-to-Hive 小時級場景

1.小時級實現(xiàn)架構(gòu)圖


如下圖所示,Database 中的 Binlog 導(dǎo)出到 Kafka,同時 Log Server 數(shù)據(jù)也會上報到 Kafka。所有數(shù)據(jù)實時落地到 Kafka 之后,通過 Flink 抽取到 HDFS。下圖中 HDFS 到 Hive 之間為虛線,即 Flink 并非直接落地到 Hive,F(xiàn)link 落地到 HDFS 后,再落地到 Hive 的時間可能是小時級、半小時級甚至分鐘級,需要知道數(shù)據(jù)的 Event time 已經(jīng)到何時,再觸發(fā) alter table,add partition,add location 等,寫入其分區(qū)。

這時需要有一個程序監(jiān)控當(dāng)前 Flink 任務(wù)的數(shù)據(jù)時間已經(jīng)消費到什么時候,如9點的數(shù)據(jù),落地時需要查看 Kafka 中消費的數(shù)據(jù)是否已經(jīng)到達9點,然后在 Hive 中觸發(fā)分區(qū)寫入。

如何基于Flink+ClickHouse 構(gòu)建實時數(shù)據(jù)分析平臺


   

2.實現(xiàn)原理

主要使用了 Flink 高階版本的一個特性——StreamingFileSink。StreamingFileSink 主要有幾點功能。
  • 第一,  forBulkFormat 支持 avro、parquet 格式,即列式存儲格式。

  • 第二,  withBucketAssigner 自定義按數(shù)據(jù)時間分桶,此處會定義一個EventtimeBucket,既按數(shù)據(jù)時間進行數(shù)據(jù)落地到離線中。

  • 第三,  OnCheckPointRollingPolicy,根據(jù) CheckPoint 時間進行數(shù)據(jù)落地,在一定的 CheckPoint 時間內(nèi)數(shù)據(jù)落地并回穩(wěn)。按照 CheckPoint 落地還有其它策略,如按照數(shù)據(jù)大小。

  • 第四,  StreamingFileSink 是 Exactly-Once 語義實現(xiàn)。

 
Flink 中有兩個 Exactly-Once 語義實現(xiàn),第一個是 Kafka,第二個是 StreamingFileSink。下圖為 OnCheckPointRollingPolicy 設(shè)計的每10分鐘落地一次到HDFS文件中的 demo。

如何基于Flink+ClickHouse 構(gòu)建實時數(shù)據(jù)分析平臺

 
■ 如何實現(xiàn) Exactly-Once

下圖左側(cè)為一個簡單的二 PC 模型。Coordinator 發(fā)送一個 prepare,執(zhí)行者開始觸發(fā) ack 動作,Coordinator 收到 ack 所有消息后,所有 ack 開始觸發(fā) commit,所有執(zhí)行者進行落地,將其轉(zhuǎn)化到 Flink 的模型中,Source 收到 checkpoint barrier 流時,開始觸發(fā)一個 snapshot。

每個算子的 CheckPoint、snapshot 都完成之后,CheckPoint 會給 Job Manager 發(fā)送 notifyCheckpointComplete。下圖中二階段模型和 Flink 模型左側(cè)三條線部分是一致的。因此用 Flink 可以實現(xiàn)二階段提交協(xié)議。

如何基于Flink+ClickHouse 構(gòu)建實時數(shù)據(jù)分析平臺

 
■ 如何使用 Flink 實現(xiàn)二階段提交協(xié)議  
 
首先,StreamingFileSink 實現(xiàn)兩個接口,CheckpointedFunction 和CheckpointListener。  CheckpointedFunction 實現(xiàn) initializeState 和 snapshotState 函數(shù)。  CheckpointListener 是 notifyCheckpointComplete 的方法實現(xiàn),因此這兩個接口可以實現(xiàn)二階段提交語義。

  • initializeState 


initializeState 在任務(wù)啟動時會觸發(fā)三個動作。  第一個是 commitPendingFile。  實時數(shù)據(jù)落地到 Hdfs 上有三個狀態(tài)。  第一個狀態(tài)是 in-progress ,正在進行狀態(tài)。  第二個狀態(tài)是 pending 狀態(tài),第三個狀態(tài)是 finished 狀態(tài)。

 initializeState 在任務(wù)啟動時還會觸發(fā) restoreInProgressFile,算子實時寫入。  如果 CheckPoint 還未成功時程序出現(xiàn)問題,再次啟動時 initializeState 會 commit PendingFile,然后采用 Hadoop 2.7+ 版本的 truncate 方式重置或截斷 in-progress 文件。

  • invoke 


實時寫入數(shù)據(jù)。
  • snapshotState 


觸發(fā) CheckPoint 時會將 in-progress 文件轉(zhuǎn)化為 pending state,同時記錄數(shù)據(jù)長度(truncate 方式需要截斷長度)。  snapshotState 并非真正將數(shù)據(jù)寫入 HDFS,而是寫入 ListState。  Flink 在 Barrier 對齊狀態(tài)時內(nèi)部實現(xiàn) Exactly-Once 語義,但是實現(xiàn)外部端到端的 Exactly-Once 語義比較困難。  Flink 內(nèi)部實現(xiàn) Exactly-Once 通過 ListState,將數(shù)據(jù)全部存入 ListState,等待所有算子 CheckPoint 完成,再將 ListState 中的數(shù)據(jù)刷到 HDFS 中。

  • notifyCheckpointComplete 


notifyCheckpointComplete 會觸發(fā) pending 到 finished state 的數(shù)據(jù)寫入。  實現(xiàn)方法是 rename,Streaming 不斷向 HDFS 寫入臨時文件,所有動作結(jié)束后通過 rename 動作寫成正式文件。

如何基于Flink+ClickHouse 構(gòu)建實時數(shù)據(jù)分析平臺


   

3.跨集群多 nameservices

 
趣頭條的實時集群和離線集群是獨立的,離線集群有多套,實時集群目前有一套。通過實時集群寫入離線集群,會產(chǎn)生 HDFS nameservices 問題。在實時集群中將所有離線集群的 nameservices 用 namenode HA 的方式全部打入實時集群并不合適。那么如何在任務(wù)中通過實時集群提交到各個離線集群?

如下圖所示,在 Flink 任務(wù)的 resource 下面,在 HDFS 的 xml 中間加入 <final>。在 PropertyHong Kong 中添加 nameservices,如 stream 是實時集群的 namenode HA 配置,data 是即將寫入的離線集群的 namenode HA 配置。那么兩個集群中間的 HDFS set 不需要相互修改,直接可以在客戶端實現(xiàn)。

如何基于Flink+ClickHouse 構(gòu)建實時數(shù)據(jù)分析平臺


   

4.多用戶寫入權(quán)限

   
 
實時要寫入離線 HDFS,可能會涉及用戶權(quán)限問題。實時提交的用戶已經(jīng)定義好該用戶在所有程序中都是同一個用戶,但離線中是多用戶的,因此會造成實時和離線用戶不對等。趣頭條在 API 中添加了 withBucketUser 寫 HDFS。配置好 nameservices后,接下來只需要知道該 HDFS 路徑通過哪個用戶來寫,比如配置一個 stream 用戶寫入。

API 層級的好處是一個 Flink 程序可以指定多個不同的 HDFS 和不同的用戶。多用戶寫入的實現(xiàn)是在 Hadoop file system 中加一個 ugi.do as ,代理用戶。以上為趣頭條使用 Flink 方式進行實時數(shù)據(jù)同步到 Hive 的一些工作。其中可能會出現(xiàn)小文件問題,小文件是后臺程序進行定期 merge,如果 CheckPoint 間隔時間較短,如3分鐘一次,會出現(xiàn)大量小文件問題。

如何基于Flink+ClickHouse 構(gòu)建實時數(shù)據(jù)分析平臺

 

三、Flink-to-ClickHouse 秒級場景


 
   

1.秒級實現(xiàn)架構(gòu)圖

   
趣頭條目前有很多實時指標,平均每五分鐘或三分鐘計算一次,如果每一個實時指標用一個 Flink 任務(wù),或者一個 Flink SQL 來寫,比如消費一個 Kafka Topic,需要計算其日活、新增、流程等等當(dāng)用戶提出一個新需求時,需要改當(dāng)前的 Flink 任務(wù)或者啟動一個新的 Flink 任務(wù)消費 Topic。

因此會出現(xiàn) Flink 任務(wù)不斷修改或者不斷起新的 Flink 任務(wù)的問題。趣頭條嘗試在 Flink 后接入 ClickHouse,實現(xiàn)整體的 OLAP。下圖為秒級實現(xiàn)架構(gòu)圖。從 Kafka 到 Flink,到 Hive,到 ClickHouse 集群,對接外部 Horizon(實時報表),QE(實時 adhoc 查詢),千尋(數(shù)據(jù)分析),用戶畫像(實時圈人)。

如何基于Flink+ClickHouse 構(gòu)建實時數(shù)據(jù)分析平臺

   
   

2.Why Flink+ClickHouse

 
  • 指標實現(xiàn) sql 化描述:分析師提出的指標基本都以 SQL 進行描述。

  • 指標的上下線互不影響:一個 Flink 任務(wù)消費 Topic,如果還需要其它指標,可以保證指標的上下線互不影響。

  • 數(shù)據(jù)可回溯,方便異常排查:當(dāng)日活下降,需要回溯排查是哪些指標口徑的邏輯問題,比如是報的數(shù)據(jù)差異或是數(shù)據(jù)流 Kafka 掉了,或者是因為用戶沒有上報某個指標導(dǎo)致日活下降,而 Flink 則無法進行回溯。

  • 計算快,一個周期內(nèi)完成所有指標計算:需要在五分鐘內(nèi)將成百上千的所有維度的指標全部計算完成。

  • 支持實時流,分布式部署,運維簡單:支持 Kafka 數(shù)據(jù)實時流。


目前趣頭條 Flink 集群有 100+ 臺 32 核 128 G 3.5T SSD,日數(shù)據(jù)量 2000+ 億,日查詢量 21w+ 次,80% 查詢在 1s 內(nèi)完成。下圖為單表測試結(jié)果。ClickHouse 單表測試速度快。但受制于架構(gòu),ClickHouse 的 Join 較弱。

如何基于Flink+ClickHouse 構(gòu)建實時數(shù)據(jù)分析平臺


下圖是處理相對較為復(fù)雜的 SQL,count+group by+order by,ClickHouse 在 3.6s內(nèi)完成 26 億數(shù)據(jù)計算。

如何基于Flink+ClickHouse 構(gòu)建實時數(shù)據(jù)分析平臺

 
   

3.Why ClickHouse so Fast


 
ClickHouse 采用列式存儲 +LZ4、ZSTD 數(shù)據(jù)壓縮。其次,計算存儲結(jié)合本地化+向量化執(zhí)行。Presto 數(shù)據(jù)可能存儲在 Hadoop 集群或者 HDFS 中,實時拉取數(shù)據(jù)進行計算。而 ClickHouse 計算存儲本地化是指每一臺計算機器存在本地 SSD 盤,只需要計算自己的數(shù)據(jù),再進行節(jié)點合并。  同時,LSM merge tree+Index。  將數(shù)據(jù)寫入    ClickHouse 之后,會在后臺開始一個線程將數(shù)據(jù)進行 merge,做 Index 索引。  如建常見的 DT 索引和小時級數(shù)據(jù)索引,以提高查詢性能。  第四,SIMD+LLVM 優(yōu)化。  SIMD 是單指令多數(shù)據(jù)集。  第五,SQL 語法及 UDF 完善。  ClickHouse 對此有很大需求。  在數(shù)據(jù)分析或者維度下拽時需要更高的特性,如時間窗口的一部分功能點。
 
  • Merge Tree:如下圖所示。第一層為實時數(shù)據(jù)寫入。后臺進行每一層級數(shù)據(jù)的merge。merge 時會進行數(shù)據(jù)排序,做 Index 索引。

  • ClickHouse Connector:ClickHouse 有兩個概念,Local table 和Distributed table。一般是寫 Local table ,讀 Distributed table。ClickHouse 一般以 5~10w一個批次進行數(shù)據(jù)寫入,5s一個周期。趣頭條還實現(xiàn)了 RoundRobinClickHouseDataSource。

  • BalancedClickHouseDataSource MySQL 中配置一個 IP 和端口號就可以寫入數(shù)據(jù),而 BalancedClickHouseDataSource 需要寫 Local 表,因此必須要知道該集群有多少個 Local 表,每一個 Local 表的 IP 和端口號。如有一百臺機器,需要將一百臺機器的 IP 和端口號全部配置好,再進行寫入。BalancedClickHouseDataSource 有兩個 schedule。scheduleActualization和 scheduleConnectionsCleaning 。配置一百臺機器的 IP 和端口號,會出現(xiàn)某些機器不連接或者服務(wù)不響應(yīng)問題,scheduleActualization 會定期發(fā)現(xiàn)機器無法連接的問題,觸發(fā)下線或刪除 IP 等動作。scheduleConnectionsCleaning 會定期清理 ClickHouse 中無用的 http 請求。


如何基于Flink+ClickHouse 構(gòu)建實時數(shù)據(jù)分析平臺


 
  • RoundRobinClickHouseDataSource:趣頭條對BalancedClickHouseDataSource 進行加強的結(jié)果,實現(xiàn)了三個語義。testOnBorrow 設(shè)置為 true,嘗試 ping 看能否獲取連接。用 ClickHouse 寫入時是一個 batch,再將 testOnReturn 設(shè)置為 false,testWhileIdel 設(shè)置為true,填入官方 scheduleActualization 和 scheduleConnectionsCleaning 的功能。ClickHouse 后臺不斷進行 merge,如果 insert 過快使后臺 merge 速度變慢,跟不上 insert,出現(xiàn)報錯。因此需要盡量不斷往下寫,等寫完當(dāng)前機器,再寫下一個機器,以5s間隔進行寫入,使 merge 速度能夠盡量與 insert 速度保持一致。


如何基于Flink+ClickHouse 構(gòu)建實時數(shù)據(jù)分析平臺


   

4.Backfill


Flink 導(dǎo)入 ClickHouse,在數(shù)據(jù)查詢或展示報表時,會遇到一些問題,比如 Flink 任務(wù)出現(xiàn)故障、報錯或數(shù)據(jù)反壓等,或 ClickHouse 集群出現(xiàn)不可響應(yīng),zk 跟不上,insert 過快或集群負載等問題,這會導(dǎo)致整個任務(wù)出現(xiàn)問題。

如果流數(shù)據(jù)量突然暴增,啟動 Flink 可能出現(xiàn)一段時間內(nèi)不斷追數(shù)據(jù)的情況,需要進行調(diào)整并行度等操作幫助 Flink 追數(shù)據(jù)。但這時已經(jīng)出現(xiàn)數(shù)據(jù)積壓,若還要加大 Flink 并發(fā)度處理數(shù)據(jù),ClickHouse 限制 insert 不能過快,否則會導(dǎo)致惡性循環(huán)。因此當(dāng) Flink 故障或 ClickHouse 集群故障時,等待 ClickHouse 集群恢復(fù)后,F(xiàn)link 任務(wù)從最新數(shù)據(jù)開始消費,不再追過去一段時間的數(shù)據(jù),通過 Hive 將數(shù)據(jù)導(dǎo)入到 ClickHouse。

由于之前已經(jīng)通過 Kafka 將數(shù)據(jù)實時落地到 Hive,通過 Hive 將數(shù)據(jù)寫入 ClickHouse 中。ClickHouse 有分區(qū),只需要將上一個小時的數(shù)據(jù)刪除,導(dǎo)入 Hive 的一小時數(shù)據(jù),就可以繼續(xù)進行數(shù)據(jù)查詢操作。Backfill 提供了 Flink 任務(wù)小時級容錯以及 ClickHouse 集群小時級容錯機制。

如何基于Flink+ClickHouse 構(gòu)建實時數(shù)據(jù)分析平臺


未來發(fā)展與思考

1.Connector SQL 化

目前, Flink-to-Hive 以及 Flink-to-ClickHouse 都是趣頭條較為固化的場景,只需指定 HDFS 路徑以及用戶,其余過程都可以通過 SQL 化描述。
 
   

2.Delta lake

Flink 是流批一體計算引擎,但是沒有流批一體的存儲。趣頭條會用 HBase、Kudu、Redis 等能夠與 Flink 實時交互的 KV 存儲進行數(shù)據(jù)計算。如計算新增問題,目前趣頭條的方案是需要將 Hive 歷史用戶刷到 Redis 或 HBase 中,與 Flink 進行實時交互判斷用戶是否新增。
但因為 Hive 中的數(shù)據(jù)和 Redis 中的數(shù)據(jù)是存儲為兩份數(shù)據(jù)。其次 Binlog 抽取數(shù)據(jù)會涉及 delete 動作,Hbase,Kudu 支持數(shù)據(jù)修改,定期回到 Hive 中。帶來的問題是 HBase,Kudu 中存在數(shù)據(jù),Hive 又保存了一份數(shù)據(jù),多出一份或多份數(shù)據(jù)。如果有流批一體的存儲支持上述場景,當(dāng) Flink 任務(wù)過來,可以與離線數(shù)據(jù)進行實時交互,包括實時查詢 Hive 數(shù)據(jù)等,可以實時判斷用戶是否新增,對數(shù)據(jù)進行實時修改、更新或 delete,也能支持 Hive 的批的動作存儲。

看完上述內(nèi)容,你們對如何基于Flink+ClickHouse 構(gòu)建實時數(shù)據(jù)分析平臺有進一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI