您好,登錄后才能下訂單哦!
本篇文章為大家展示了基于Flink的典型ETL場(chǎng)景是怎么實(shí)現(xiàn),內(nèi)容簡明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。
下面將從數(shù)倉誕生的背景、數(shù)倉架構(gòu)、離線與實(shí)時(shí)數(shù)倉的對(duì)比著手,綜述數(shù)倉發(fā)展演進(jìn),然后分享基于 Flink 實(shí)現(xiàn)典型 ETL 場(chǎng)景的幾個(gè)方案。
我們先來回顧一下數(shù)據(jù)倉庫的概念。
數(shù)據(jù)倉庫的概念是于90年代由 Bill Inmon 提出, 當(dāng)時(shí)的背景是傳統(tǒng)的 OLTP 數(shù)據(jù)庫無法很好的支持長周期分析決策場(chǎng)景,所以數(shù)據(jù)倉庫概念的4個(gè)核心點(diǎn),我們要結(jié)合著 OLTP 數(shù)據(jù)庫當(dāng)時(shí)的狀態(tài)來對(duì)比理解。
面向主題的:數(shù)據(jù)倉庫的數(shù)據(jù)組織方式與 OLTP 面向事務(wù)處理不同。因?yàn)閿?shù)據(jù)倉庫是面向分析決策的,所以數(shù)據(jù)經(jīng)常按分析場(chǎng)景或者是分析對(duì)象等主題形式來組織。
集成的:對(duì)于數(shù)據(jù)倉庫來說,經(jīng)常需要去集合多個(gè)分散的、異構(gòu)的數(shù)據(jù)源,做一些數(shù)據(jù)清洗等 ETL 處理,整合成一塊數(shù)據(jù)倉庫,OLTP 則不需要做類似的集成操作。
相對(duì)穩(wěn)定的:OLTP 數(shù)據(jù)庫一般都是面向業(yè)務(wù)的,它主要的作用是把當(dāng)前的業(yè)務(wù)狀態(tài)精準(zhǔn)的反映出來,所以 OLTP 數(shù)據(jù)庫需要支持大量的增、刪、改的操作。但是對(duì)于數(shù)據(jù)倉庫來說,只要是入倉存下來的數(shù)據(jù),一般使用場(chǎng)景都是查詢,因此數(shù)據(jù)是相對(duì)穩(wěn)定的。
反映歷史變化:數(shù)據(jù)倉庫是反映歷史變化的數(shù)據(jù)集合,可以理解成它會(huì)將歷史的一些數(shù)據(jù)的快照存下來。而對(duì)于 OLTP 數(shù)據(jù)庫來說,只要反映當(dāng)時(shí)的最新的狀態(tài)就可以了。
以上這4個(gè)點(diǎn)是數(shù)據(jù)倉庫的一個(gè)核心的定義。我們也可以看出對(duì)于實(shí)時(shí)數(shù)據(jù)倉庫來說,傳統(tǒng)數(shù)據(jù)倉庫也就是離線數(shù)據(jù)倉庫中的一些定義會(huì)被弱化,比如說在反映歷史變化這一點(diǎn)。介紹完數(shù)據(jù)倉庫的基本概念,簡單說下數(shù)據(jù)倉庫建模這塊會(huì)用到一些經(jīng)典的建模方法,主要有范式建模、維度建模和 Data Vault。在互聯(lián)網(wǎng)大數(shù)據(jù)場(chǎng)景下,用的最多的是維度建模方法。
然后先看一下離線數(shù)倉的經(jīng)典架構(gòu)。
這個(gè)數(shù)倉架構(gòu)主要是偏向互聯(lián)網(wǎng)大數(shù)據(jù)的場(chǎng)景方案,由上圖可以看出有三個(gè)核心環(huán)節(jié)。
1.第一個(gè)環(huán)節(jié)是數(shù)據(jù)源部分,一般互聯(lián)網(wǎng)公司的數(shù)據(jù)源主要有兩類:
第1類是通過在客戶端埋點(diǎn)上報(bào),收集用戶的行為日志,以及一些后端日志的日志類型數(shù)據(jù)源。對(duì)于埋點(diǎn)行為日志來說,一般會(huì)經(jīng)過一個(gè)這樣的流程,首先數(shù)據(jù)會(huì)上報(bào)到 Nginx 然后經(jīng)過 Flume 收集,然后存儲(chǔ)到 Kafka 這樣的消息隊(duì)列,然后再由實(shí)時(shí)或者離線的一些拉取的任務(wù),拉取到我們的離線數(shù)據(jù)倉庫 HDFS。
第2類數(shù)據(jù)源是業(yè)務(wù)數(shù)據(jù)庫,而對(duì)于業(yè)務(wù)數(shù)據(jù)庫的話,一般會(huì)經(jīng)過 Canal 收集它的 binlog,然后也是收集到消息隊(duì)列中,最終再由 Camus 拉取到 HDFS。
這兩部分?jǐn)?shù)據(jù)源最終都會(huì)落地到 HDFS 中的 ODS 層,也叫貼源數(shù)據(jù)層,這層數(shù)據(jù)和原始數(shù)據(jù)源是保持一致的。
2.第二個(gè)環(huán)節(jié)是離線數(shù)據(jù)倉庫,是圖中藍(lán)色的框展示的部分??梢钥吹剿且粋€(gè)分層的結(jié)構(gòu),其中的模型設(shè)計(jì)是依據(jù)維度建模思路。
最底層是 ODS 層,這一層將數(shù)據(jù)保持無信息損失的存放在 HDFS,基本保持原始的日志數(shù)據(jù)不變。
在 ODS 層之上,一般會(huì)進(jìn)行統(tǒng)一的數(shù)據(jù)清洗、歸一,就得到了 DWD 明細(xì)數(shù)據(jù)層。這一層也包含統(tǒng)一的維度數(shù)據(jù)。
然后基于 DWD 明細(xì)數(shù)據(jù)層,我們會(huì)按照一些分析場(chǎng)景、分析實(shí)體等去組織我們的數(shù)據(jù),組織成一些分主題的匯總數(shù)據(jù)層 DWS。
在 DWS 之上,我們會(huì)面向應(yīng)用場(chǎng)景去做一些更貼近應(yīng)用的 APP 應(yīng)用數(shù)據(jù)層,這些數(shù)據(jù)應(yīng)該是高度匯總的,并且能夠直接導(dǎo)入到我們的應(yīng)用服務(wù)去使用。
在中間的離線數(shù)據(jù)倉庫的生產(chǎn)環(huán)節(jié),一般都是采用一些離線生產(chǎn)的架構(gòu)引擎,比如說 MapReduce、Hive、Spark 等等,數(shù)據(jù)一般是存在 HDFS 上。
3.經(jīng)過前兩個(gè)環(huán)節(jié)后,我們的一些應(yīng)用層的數(shù)據(jù)會(huì)存儲(chǔ)到數(shù)據(jù)服務(wù)里,比如說 HBase 、Redis、Kylin 這樣的一些 KV 的存儲(chǔ)。并且會(huì)針對(duì)存在這些數(shù)據(jù)存儲(chǔ)上的一些數(shù)據(jù),封裝對(duì)應(yīng)的服務(wù)接口,對(duì)外提供服務(wù)。在最外層我們會(huì)去產(chǎn)出一些面向業(yè)務(wù)的報(bào)表、面向分析的數(shù)據(jù)產(chǎn)品,以及會(huì)支持線上的一些業(yè)務(wù)產(chǎn)品等等。這一層的話,稱之為更貼近業(yè)務(wù)端的數(shù)據(jù)應(yīng)用部分。
以上是一個(gè)基本的離線數(shù)倉經(jīng)典架構(gòu)的介紹。
大家都了解到現(xiàn)在隨著移動(dòng)設(shè)備的普及,我們逐漸的由制造業(yè)時(shí)代過渡到了互聯(lián)網(wǎng)時(shí)代。在制造業(yè)的時(shí)代,傳統(tǒng)的數(shù)倉,主要是為了去支持以前的一些傳統(tǒng)行業(yè)的企業(yè)的業(yè)務(wù)決策者、管理者,去做一些業(yè)務(wù)決策。那個(gè)時(shí)代的業(yè)務(wù)決策周期是比較長的,同時(shí)當(dāng)時(shí)的數(shù)據(jù)量較小,Oracle、DB2 這一類數(shù)據(jù)庫就已經(jīng)足夠存了。
但隨著分布式計(jì)算技術(shù)的發(fā)展、智能化技術(shù)發(fā)展、以及整體算力的提升、互聯(lián)網(wǎng)的發(fā)展等等因素,我們現(xiàn)在在互聯(lián)網(wǎng)上收集的數(shù)據(jù)量,已經(jīng)呈指數(shù)級(jí)的增長。并且業(yè)務(wù)不再只依賴人做決策,做決策的主體很大部分已轉(zhuǎn)變?yōu)橛?jì)算機(jī)算法,比如一些智能推薦場(chǎng)景等等。所以這個(gè)時(shí)候決策的周期,就由原來的天級(jí)要求提升到秒級(jí),決策時(shí)間是非常短的。在場(chǎng)景上的話,也會(huì)面對(duì)更多的需要實(shí)時(shí)數(shù)據(jù)處理的場(chǎng)景,例如實(shí)時(shí)的個(gè)性化推薦、廣告的場(chǎng)景、甚至一些傳統(tǒng)企業(yè)已經(jīng)開始實(shí)時(shí)監(jiān)控加工的產(chǎn)品是否有質(zhì)量問題,以及金融行業(yè)重度依賴的反作弊等等。因此在這樣的一個(gè)背景下,實(shí)時(shí)數(shù)倉就必須被提出來了。
首先跟大家介紹一下實(shí)時(shí)數(shù)倉經(jīng)典架構(gòu) - Lambda 架構(gòu):
這個(gè)架構(gòu)是 Storm 的作者提出來的,其實(shí) Lambda 架構(gòu)的主要思路是在原來離線數(shù)倉架構(gòu)的基礎(chǔ)上疊加上實(shí)時(shí)數(shù)倉的部分,然后將離線的存量數(shù)據(jù)與我們 t+0 的實(shí)時(shí)的數(shù)據(jù)做一個(gè) merge,就可以產(chǎn)生數(shù)據(jù)狀態(tài)實(shí)時(shí)更新的結(jié)果。
和上述1.1離線數(shù)據(jù)倉庫架構(gòu)圖比較可以明顯的看到,實(shí)時(shí)數(shù)倉增加的部分是上圖黃色的這塊區(qū)域。我們一般會(huì)把實(shí)時(shí)數(shù)倉數(shù)據(jù)放在 Kafka 這樣的消息隊(duì)列上,也會(huì)有維度建模的一些分層,但是在匯總數(shù)據(jù)的部分,我們不會(huì)將 APP 層的一些數(shù)據(jù)放在實(shí)時(shí)數(shù)倉,而是更多的會(huì)移到數(shù)據(jù)服務(wù)側(cè)去做一些計(jì)算。
然后在實(shí)時(shí)計(jì)算的部分,我們經(jīng)常會(huì)使用 Flink、Spark-streaming 和 Storm 這樣的計(jì)算引擎,時(shí)效性上,由原來的天級(jí)、小時(shí)級(jí)可以提升到秒級(jí)、分鐘級(jí)。
大家也可以看到這個(gè)架構(gòu)圖中,中間數(shù)據(jù)倉庫環(huán)節(jié)有兩個(gè)部分,一個(gè)是離線的數(shù)據(jù)倉庫,一個(gè)是實(shí)時(shí)的數(shù)據(jù)倉庫。我們必須要運(yùn)維兩套(實(shí)時(shí)計(jì)算和離線計(jì)算)引擎,并且在代碼層面,我們也需要去實(shí)現(xiàn)實(shí)時(shí)和離線的業(yè)務(wù)代碼。然后在合并的時(shí)候,我們需要保證實(shí)施和離線的數(shù)據(jù)一致性,所以但凡我們的代碼做變更,我們也需要去做大量的這種實(shí)時(shí)離線數(shù)據(jù)的對(duì)比和校驗(yàn)。其實(shí)這對(duì)于不管是資源還是運(yùn)維成本來說都是比較高的。這是 Lamda 架構(gòu)上比較明顯和突出的一個(gè)問題。因此就產(chǎn)生了 Kappa 結(jié)構(gòu)。
Kappa 架構(gòu)的一個(gè)主要的思路就是在數(shù)倉部分移除了離線數(shù)倉,數(shù)倉的生產(chǎn)全部采用實(shí)時(shí)數(shù)倉。從上圖可以看到剛才中間的部分,離線數(shù)倉模塊已經(jīng)沒有了。
關(guān)于 Kappa 架構(gòu),熟悉實(shí)時(shí)數(shù)倉生產(chǎn)的同學(xué),可能會(huì)有一個(gè)疑問。因?yàn)槲覀兘?jīng)常會(huì)面臨業(yè)務(wù)變更,所以很多業(yè)務(wù)邏輯是需要去迭代的。之前產(chǎn)出的一些數(shù)據(jù),如果口徑變更了,就需要重算,甚至重刷歷史數(shù)據(jù)。對(duì)于實(shí)時(shí)數(shù)倉來說,怎么去解決數(shù)據(jù)重算問題?
Kappa 架構(gòu)在這一塊的思路是:首先要準(zhǔn)備好一個(gè)能夠存儲(chǔ)歷史數(shù)據(jù)的消息隊(duì)列,比如 Kafka,并且這個(gè)消息對(duì)列是可以支持你從某個(gè)歷史的節(jié)點(diǎn)重新開始消費(fèi)的。 接著需要新起一個(gè)任務(wù),從原來比較早的一個(gè)時(shí)間節(jié)點(diǎn)去消費(fèi) Kafka 上的數(shù)據(jù),然后當(dāng)這個(gè)新的任務(wù)運(yùn)行的進(jìn)度已經(jīng)能夠和現(xiàn)在的正在跑的任務(wù)齊平的時(shí)候,你就可以把現(xiàn)在任務(wù)的下游切換到新的任務(wù)上面,舊的任務(wù)就可以停掉,并且原來產(chǎn)出的結(jié)果表也可以被刪掉。
隨著我們現(xiàn)在實(shí)時(shí) OLAP 技術(shù)的一些提升,有一個(gè)新的實(shí)時(shí)架構(gòu)被提了出來,這里暫且稱為實(shí)時(shí) OLAP 變體。
這個(gè)思路是把大量的聚合、分析、計(jì)算由實(shí)時(shí) OLAP 引擎來承擔(dān)。在實(shí)時(shí)數(shù)倉計(jì)算的部分,我們不需要做的特別重,尤其是聚合相關(guān)的一些邏輯,然后這樣就可以保障我們?cè)跀?shù)據(jù)應(yīng)用層能靈活的面對(duì)各種業(yè)務(wù)分析的需求變更,整個(gè)架構(gòu)更加靈活。
最后我們來整體對(duì)比一下,實(shí)時(shí)數(shù)倉的這幾種架構(gòu)
這是整體三個(gè)關(guān)于實(shí)時(shí)數(shù)倉架構(gòu)的一個(gè)對(duì)比:
從計(jì)算引擎角度:Lamda 架構(gòu)它需要去維護(hù)批流兩套計(jì)算引擎,Kappa 架構(gòu)和實(shí)時(shí) OLAP 變體只需要維護(hù)流計(jì)算引擎就好了。
開發(fā)成本:對(duì) Lamda 架構(gòu)來說,因?yàn)樗枰S護(hù)實(shí)時(shí)離線兩套代碼,所以它的開發(fā)成本會(huì)高一些。 Kappa 架構(gòu)和實(shí)時(shí) OLAP 變體只用維護(hù)一套代碼就可以了。
分析靈活性:實(shí)時(shí) OLAP 變體是相對(duì)最靈活的。
在實(shí)時(shí) OLAP 引擎依賴上:實(shí)時(shí) OLAP 變體是強(qiáng)依賴實(shí)時(shí) OLAP 變體引擎的能力的,前兩者則不強(qiáng)依賴。
計(jì)算資源:Lamda 架構(gòu)需要批流兩套計(jì)算資源,Kappa 架構(gòu)只需要流計(jì)算資源,實(shí)時(shí) OLAP 變體需要額外的 OLAP 資源。
邏輯變更重算:Lamda 架構(gòu)是通過批處理來重算的,Kappa 架構(gòu)需要按照前面介紹的方式去重新消費(fèi)消息隊(duì)列重算,實(shí)時(shí) OLAP 變體也需要重新消費(fèi)消息隊(duì)列,并且這個(gè)數(shù)據(jù)還要重新導(dǎo)入到 OLAP 引擎里,去做計(jì)算。
然后我們來看一下傳統(tǒng)數(shù)倉和實(shí)時(shí)數(shù)倉整體的差異。
首先從時(shí)效性來看:離線數(shù)倉是支持小時(shí)級(jí)和天級(jí)的,實(shí)時(shí)數(shù)倉到秒級(jí)分鐘級(jí),所以實(shí)時(shí)數(shù)倉時(shí)效性是非常高的。
在數(shù)據(jù)存儲(chǔ)方式來看:離線數(shù)倉它需要存在HDFS和RDS上面,實(shí)時(shí)數(shù)倉一般是存在消息隊(duì)列,還有一些kv存儲(chǔ),像維度數(shù)據(jù)的話會(huì)更多的存在kv存儲(chǔ)上。
在生產(chǎn)加工過程方面,離線數(shù)倉需要依賴離線計(jì)算引擎以及離線的調(diào)度。 但對(duì)于實(shí)時(shí)數(shù)倉來說,主要是依賴實(shí)時(shí)計(jì)算引擎。
這里我們主要介紹兩大實(shí)時(shí) ETL 場(chǎng)景:維表 join 和雙流 join。
維表 join
預(yù)加載維表
熱存儲(chǔ)關(guān)聯(lián)
廣播維表
Temporal table function join
雙流 join
離線join vs. 實(shí)時(shí)join
Regular join
Interval join
Window join
方案1:
將維表全量預(yù)加載到內(nèi)存里去做關(guān)聯(lián),具體的實(shí)現(xiàn)方式就是我們定義一個(gè)類,去實(shí)現(xiàn) RichFlatMapFunction,然后在 open 函數(shù)中讀取維度數(shù)據(jù)庫,再將數(shù)據(jù)全量的加載到內(nèi)存,然后在 probe 流上使用算子 ,運(yùn)行時(shí)與內(nèi)存維度數(shù)據(jù)做關(guān)聯(lián)。
這個(gè)方案的優(yōu)點(diǎn)就是實(shí)現(xiàn)起來比較簡單,缺點(diǎn)也比較明顯,因?yàn)槲覀円衙總€(gè)維度數(shù)據(jù)都加載到內(nèi)存里面,所以它只支持少量的維度數(shù)據(jù)。同時(shí)如果我們要去更新維表的話,還需要重啟作業(yè),所以它在維度數(shù)據(jù)的更新方面代價(jià)是有點(diǎn)高的,而且會(huì)造成一段時(shí)間的延遲。對(duì)于預(yù)加載維表來說,它適用的場(chǎng)景就是小維表,變更頻率訴求不是很高,且對(duì)于變更的及時(shí)性的要求也比較低的這種場(chǎng)景。
這里定義了一個(gè) DimFlatMapFunction 來實(shí)現(xiàn) RichFlatMapFunction。其中有一個(gè) Map 類型的 dim,其實(shí)就是為了之后在讀取 DB 的維度數(shù)據(jù)以后,可以用于存放我們的維度數(shù)據(jù),然后在 open 函數(shù)里面我們需要去連接我們的 DB,進(jìn)而獲取 DB 里的數(shù)據(jù)。然后在下面代碼可以看到我們的場(chǎng)景是從一個(gè)商品表里面去取出商品的 ID、商品的名字。然后我們?cè)讷@取到 DB 里面的維度數(shù)據(jù)以后會(huì)把它存放到 dim 里面。
接下來在 flatMap 函數(shù)里面我們就會(huì)使用到 dim,我們?cè)讷@取了 probe 流的數(shù)據(jù)以后,我們會(huì)去 dim 里面比較。是否含有同樣的商品 ID 的數(shù)據(jù),如果有的話就把相關(guān)的商品名稱 append 到數(shù)據(jù)元組,然后做一個(gè)輸出。這就是一個(gè)基本的流程。
其實(shí)這是一個(gè)基本最初版的方案實(shí)現(xiàn)。但這個(gè)方案也有一個(gè)改進(jìn)的方式,就是在 open 函數(shù)里面,可以新建一個(gè)線程,定時(shí)的去加載維表。這樣就不需要人工的去重啟 job 來讓維度數(shù)據(jù)做更新,可以實(shí)現(xiàn)一個(gè)周期性的維度數(shù)據(jù)的更新。
方案2:
通過 Distributed cash 的機(jī)制去分發(fā)本地的維度文件到task manager后再加載到內(nèi)存做關(guān)聯(lián)。實(shí)現(xiàn)方式可以分為三步:
第1步是通過 env.registerCached 注冊(cè)文件。
第2步是實(shí)現(xiàn) RichFunction,在 open 函數(shù)里面通過 RuntimeContext 來獲取 cache 文件。
第3步是解析和使用這部分文件數(shù)據(jù)。
這種方式的一個(gè)優(yōu)點(diǎn)是你不需要去準(zhǔn)備或者依賴外部數(shù)據(jù)庫,缺點(diǎn)就是因?yàn)閿?shù)據(jù)也是要加載到內(nèi)存中,所以支持的維表數(shù)據(jù)量也是比較小。而且如果這個(gè)維度數(shù)據(jù)需要做更新,也需要重啟作業(yè)。 因此在正規(guī)的生產(chǎn)過程中不太建議使用這個(gè)方案,因?yàn)槠鋵?shí)從數(shù)倉角度,希望所有的數(shù)據(jù)都能夠通過 schema 化方式來管理。把數(shù)據(jù)放在文件里面去做這樣一個(gè)操作,不利于我們做整體數(shù)據(jù)的管理和規(guī)范化。所以這個(gè)方式的話,大家在做一些小的 demo 的時(shí)候,或者一些測(cè)試的時(shí)候可以去使用。
那么它適用的場(chǎng)景就是維度數(shù)據(jù)是文件形式的、數(shù)據(jù)量比較小、并且更新的頻率也比較低的一些場(chǎng)景,比如說我們讀一個(gè)靜態(tài)的碼表、配置文件等等。
維表 join 里第二類大的實(shí)現(xiàn)思路是熱存儲(chǔ)關(guān)聯(lián)。具體是我們把維度數(shù)據(jù)導(dǎo)入到像 Redis、Tair、HBase 這樣的一些熱存儲(chǔ)中,然后通過異步 IO 去查詢,并且疊加使用 Cache 機(jī)制,還可以加一些淘汰的機(jī)制,最后將維度數(shù)據(jù)緩存在內(nèi)存里,來減輕整體對(duì)熱存儲(chǔ)的訪問壓力。
如上圖展示的這樣的一個(gè)流程。在 Cache 這塊的話,比較推薦谷歌的 Guava Cache,它封裝了一些關(guān)于 Cache 的一些異步的交互,還有 Cache 淘汰的一些機(jī)制,用起來是比較方便的。
剛才的實(shí)驗(yàn)方案里面有兩個(gè)重要點(diǎn),一個(gè)就是我們需要用異步 IO 方式去訪問存儲(chǔ),這里也跟大家一起再回顧一下同步 IO 與異步 IO 的區(qū)別:
對(duì)于同步 IO 來說,發(fā)出一個(gè)請(qǐng)求以后,必須等待請(qǐng)求返回以后才能繼續(xù)去發(fā)新的 request。所以整體吞吐是比較小的。由于實(shí)時(shí)數(shù)據(jù)處理對(duì)于延遲特別關(guān)注,這種同步 IO 的方式,在很多場(chǎng)景是不太能夠接受的。
異步 IO 就是可以并行發(fā)出多個(gè)請(qǐng)求,整個(gè)吞吐是比較高的,延遲會(huì)相對(duì)低很多。如果使用異步 IO 的話,它對(duì)于外部存儲(chǔ)的吞吐量上升以后,會(huì)使得外部存儲(chǔ)有比較大的壓力,有時(shí)也會(huì)成為我們整個(gè)數(shù)據(jù)處理上延遲的瓶頸。所以引入 Cache 機(jī)制是希望通過 Cache 來去減少我們對(duì)外部存儲(chǔ)的訪問量。
剛才提到的 Cuava Cache,它的使用是非常簡單的,大家可以去嘗試使用。對(duì)于熱存儲(chǔ)關(guān)聯(lián)方案來說,它的優(yōu)點(diǎn)就是維度數(shù)據(jù)因?yàn)椴挥萌考虞d在內(nèi)存里,所以就不受限于內(nèi)存大小,維度數(shù)據(jù)量可以更多。在美團(tuán)點(diǎn)評(píng)的流量場(chǎng)景里面,我們的維度數(shù)據(jù)可以支持到 10 億量級(jí)。另一方面該方案的缺點(diǎn)也是比較明顯的,我們需要依賴熱存儲(chǔ)資源,而且維度的更新反饋到結(jié)果是有一定延遲的。 因?yàn)槲覀兪紫刃枰褦?shù)據(jù)導(dǎo)入到熱存儲(chǔ),然后同時(shí)在 Cache 過期的時(shí)間上也會(huì)有損失。
總體來說這個(gè)方法適用的場(chǎng)景是維度數(shù)據(jù)量比較大,又能夠接受維度更新有一定延遲的情況。
第三個(gè)大的思路是廣播維表,主要是利用 broadcast State 將維度數(shù)據(jù)流廣播到下游 task 做 join。
實(shí)現(xiàn)方式:
將維度數(shù)據(jù)發(fā)送到 Kafka 作為廣播原始流 S1
定義狀態(tài)描述符 MapStateDescriptor。調(diào)用 S1.broadcast(),獲得 broadCastStream S2
調(diào)用非廣播流 S3.connect(S2),得到 BroadcastConnectedStream S4
在 KeyedBroadcastProcessFunction/BroadcastProcessFunction 實(shí)現(xiàn)關(guān)聯(lián)處理邏輯,并作為參數(shù)調(diào)用 S4.process()
這個(gè)方案,它的優(yōu)點(diǎn)是維度的變更可以及時(shí)的更新到結(jié)果。然后缺點(diǎn)就是數(shù)據(jù)還是需要保存在內(nèi)存中,因?yàn)樗谴嬖?state 里的,所以支持維表數(shù)據(jù)量仍然不是很大。適用的場(chǎng)景就是我們需要時(shí)時(shí)的去感知維度的變更,且維度數(shù)據(jù)又可以轉(zhuǎn)化為實(shí)時(shí)流。
下面是一個(gè)小的 demo:
我們這里面用到的廣播流 pageStream,它其實(shí)是定義了一個(gè)頁面 ID 和頁面的名稱。對(duì)于非廣播流 probeStream,它是一個(gè) json 格式的 string,里面包含了設(shè)備 ID、頁面的 ID、還有時(shí)間戳,我們可以理解成用戶在設(shè)備上做 PV 訪問的行為記錄。
整個(gè)實(shí)現(xiàn)來看,就是遵循上述4個(gè)步驟:
第1步驟是要定義廣播的狀態(tài)描述符。
第2步驟我們這里去生成 broadCastStream。
第3步驟的話我們就需要去把兩個(gè) stream 做 connect。
第4步最主要的一個(gè)環(huán)節(jié)就是需要實(shí)現(xiàn) BroadcastProcessFunction。第1個(gè)參數(shù)是我們的 probeStream,第2個(gè)參數(shù)是廣播流的數(shù)據(jù),第3個(gè)參數(shù)就是我們的要輸出的數(shù)據(jù),可以看到主要的數(shù)據(jù)處理邏輯是在processElement里面。
在數(shù)據(jù)處理過程中,我們首先通過 context 來獲取我們的 broadcastStateDesc,然后解析 probe 流的數(shù)據(jù),最終獲取到對(duì)應(yīng)的一個(gè) pageid。接著就在我們剛才拿到了 state 里面去查詢是否有同樣的 pageid,如果能夠找到對(duì)應(yīng)的 pageid 話,就把對(duì)應(yīng)的 pagename 添加到我們整個(gè) json stream 去做輸出。
介紹完了上面的方法以后,還有一種比較重要的方法是用 Temporal table function join。首先說明一下什么是 Temporal table?它其實(shí)是一個(gè)概念:就是能夠返回持續(xù)變化表的某一時(shí)刻數(shù)據(jù)內(nèi)容的視圖,持續(xù)變化表也就是 changingtable,可以是一個(gè)實(shí)時(shí)的 changelog 的數(shù)據(jù),也可以是放在外部存儲(chǔ)上的一個(gè)物化的維表。
它的實(shí)現(xiàn)是通過 UDTF 去做 probe 流和 Temporal table 的 join,稱之 Temporal table function join。這種 join 的方式,它適用的場(chǎng)景是維度數(shù)據(jù)為 changelog 流的形式,而且我們有需要按時(shí)間版本去關(guān)聯(lián)的訴求。
首先來看一個(gè)例子,這里使用的是官網(wǎng)關(guān)于匯率和貨幣交易的一個(gè)例子。對(duì)于我們的維度數(shù)據(jù)來說,也就是剛剛提到的 changelog stream,它是 RateHistory。它反映的是不同的貨幣相對(duì)于日元來說,不同時(shí)刻的匯率。
第1個(gè)字段是時(shí)間,第2個(gè)字段是 currency 貨幣。第3個(gè)字段是相對(duì)日元的匯率,然后在我們的 probe table 來看的話,它定義的是購買不同貨幣的訂單的情況。比如說在 10:15 購買了兩歐元,該表記錄的是貨幣交易的一個(gè)情況。在這個(gè)例子里面,我們要求的是購買貨幣的總的日元交易額,如何通 Temporal table function join 來去實(shí)現(xiàn)我們這個(gè)目標(biāo)呢?
第1步首先我們要在 changelog 流上面去定義 TemporalTableFunction,這里面有兩個(gè)關(guān)鍵的參數(shù)是必要的。第1個(gè)參數(shù)就是能夠幫我們?nèi)プR(shí)別版本信息的一個(gè) time attribute,第2個(gè)參數(shù)是需要去做關(guān)聯(lián)的組件,這里的話我們選擇的是 currency。
接著的話我們?cè)?tableEnv 里面去注冊(cè) TemporalTableFunction 的名字。
然后我們來看一下我們注冊(cè)的 TemporalTableFunction,它能夠起到什么樣的效果。
比如說如果我們使用 rates 函數(shù),去獲取11:50的狀態(tài)??梢钥吹綄?duì)于美元來說,它在11:50的狀態(tài)其實(shí)落在11:49~11:56這個(gè)區(qū)間的,所以選取的是99。然后對(duì)于歐元來說,11:50的時(shí)刻是落在11:15和12:10之間的,所以我們會(huì)選取119這樣的一條數(shù)據(jù)。它其實(shí)實(shí)現(xiàn)的是我們?cè)谝粍傞_始定義的 TemporalTable 的概念,能夠獲取到 changelog 某一時(shí)刻有效數(shù)據(jù)。定義好 TemporalTableFunction 以后,我們就要需要使用這個(gè) Function,具體實(shí)現(xiàn)業(yè)務(wù)邏輯。
大家注意這里需要去指定我們具體需要用到的 join key。比如說因?yàn)閮蓚€(gè)流都是在一直持續(xù)更新的,對(duì)于我們的 order table 里面 11:00 的這一條記錄來說,關(guān)聯(lián)到的就是歐元在 10:45 這一條狀態(tài),然后它是 116,所以最后的結(jié)果就是 232。
剛才介紹的就是 Temporal table function join 的用法。
然后來整體回顧一下在維表 join 這塊,各個(gè)維度 join 的一些差異,便于我們更好的去理解各個(gè)方法適用的場(chǎng)景。
在實(shí)現(xiàn)復(fù)雜度上面的:除了熱存儲(chǔ)關(guān)聯(lián)稍微復(fù)雜一些,其它的實(shí)現(xiàn)方式基本上復(fù)雜度是比較低的。
在維表數(shù)據(jù)量上:熱存儲(chǔ)關(guān)聯(lián)和 Temporal table function join 兩種方式可以支持比較多的數(shù)據(jù)量。其它的方式因?yàn)槎家丫S表加載到內(nèi)存,所以就受限內(nèi)存的大小。
在維表更新頻率上面:因?yàn)轭A(yù)加載 DB 數(shù)據(jù)到內(nèi)存和 Distributed Cache 在重新更新維表數(shù)據(jù)的時(shí)候都需要重啟,所以它們不適合維表需要經(jīng)常變更的場(chǎng)景。而對(duì)于廣播維表和 Temporal table function join 來說,可以實(shí)時(shí)的更新維表數(shù)據(jù)并反映到結(jié)果,所以它們可以支持維表頻繁更新的場(chǎng)景。
對(duì)維表更新實(shí)時(shí)性來說:在廣播維表和 Temporal table function join,它們可以達(dá)到比較快的實(shí)時(shí)更新的效果。熱存儲(chǔ)關(guān)聯(lián)在大部分場(chǎng)景也是可以滿足業(yè)務(wù)需求的。
在維表形式上面:可以看到第1種方式主要是支持訪問 DB 存儲(chǔ)少量數(shù)據(jù)的形式,Distributed Cache 支持文件的形式,熱存儲(chǔ)關(guān)聯(lián)需要訪問 HBase 和 Tair 等等這種熱存儲(chǔ)。廣播維表和 Temporal table function join 都需要維度數(shù)據(jù)能轉(zhuǎn)化成實(shí)時(shí)流的形式。
在外部存儲(chǔ)上面:第1種方式和熱存儲(chǔ)關(guān)聯(lián)都是需要依賴外部存儲(chǔ)的。
在維表 join 這一塊,我們就先介紹這幾個(gè)基本方法。可能有的同學(xué)還有一些其他方案,之后可以反饋交流,這里主要提了一些比較常用的方案,但并不限于這些方案。
首先我們來回顧一下,批處理是怎么去處理兩個(gè)表 join的?一般批處理引擎實(shí)現(xiàn)的時(shí)候,會(huì)采用兩個(gè)思路。
一個(gè)是基于排序的 Sort-Merge join。另外一個(gè)是轉(zhuǎn)化為 Hash table 加載到內(nèi)存里做 Hash join。這兩個(gè)思路對(duì)于雙流 join 的場(chǎng)景是否還同樣適用?在雙流 join 場(chǎng)景里面要處理的對(duì)象不再是這種批數(shù)據(jù)、有限的數(shù)據(jù),而是是無窮數(shù)據(jù)集,對(duì)于無窮數(shù)據(jù)集來說,我們沒有辦法排序以后再做處理,同樣也沒有辦法把無窮數(shù)據(jù)集全部轉(zhuǎn)成 Cache 加載到內(nèi)存去做處理。 所以這兩種方式基本是不能夠適用的。同時(shí)在雙流 join 場(chǎng)景里面,我們的 join 對(duì)象是兩個(gè)流,數(shù)據(jù)也是不斷在進(jìn)入的,所以我們 join 的結(jié)果也是需要持續(xù)更新的。
那么我們應(yīng)該有什么樣的方案去解決雙流 join 的實(shí)現(xiàn)問題?Flink 的一個(gè)基本的思路是將兩個(gè)流的數(shù)據(jù)持續(xù)性的存到 state 中,然后使用。因?yàn)樾枰粩嗟娜ジ?join 的結(jié)果,之前的數(shù)據(jù)理論上如果沒有任何附加條件的話是不能丟棄的。但是從實(shí)現(xiàn)上來說 state 又不能永久的保存所有的數(shù)據(jù),所以需要通過一些方式將 join 的這種全局范圍局部化,就是說把一個(gè)無限的數(shù)據(jù)流,盡可能給它拆分切分成一段一段的有線數(shù)據(jù)集去做 join。
其實(shí)基本就是這樣一個(gè)大的思路,接下來去看一下具體的實(shí)現(xiàn)方式。
接下來我們以 inner join 為例看一下,
左流是黑色標(biāo)出來的這一條,右流是藍(lán)色標(biāo)出來的,這條兩流需要做 inner join。首先左流和右流在元素進(jìn)入以后,需要把相關(guān)的元素存儲(chǔ)到對(duì)應(yīng)的 state 上面。除了存儲(chǔ)到 state 上面以外,左流的數(shù)據(jù)元素到來以后需要去和右邊的 Right State 去做比較看能不能匹配到。同樣右邊的流元素到了以后,也需要和左邊的 Left State 去做比較看是否能夠 match,能夠 match 的話就會(huì)作為 inner join 的結(jié)果輸出。這個(gè)圖是比較粗的展示出來一個(gè) inner join 的大概細(xì)節(jié)。也是讓大家大概的體會(huì)雙流 join 的實(shí)現(xiàn)思路。
我們首先來看一下第1類雙流 join 的方式,Regular join。這種 join 方式需要去保留兩個(gè)流的狀態(tài),持續(xù)性地保留并且不會(huì)去做清除。兩邊的數(shù)據(jù)對(duì)于對(duì)方的流都是所有可見的,所以數(shù)據(jù)就需要持續(xù)性的存在state里面,那么 state 又不能存的過大,因此這個(gè)場(chǎng)景的只適合有界數(shù)據(jù)流。它的語法可以看一下,比較像離線批處理的 SQL:
在上圖頁面里面是現(xiàn)在 Flink 支持 Regular join 的一些寫法,可以看到和我們普通的 SQL 基本是一致的。
在雙流 join 里面 Flink支持的第2類 join 就是 Interval join 也叫區(qū)間 join。它是什么意思呢?就是加入了一個(gè)時(shí)間窗口的限定,要求在兩個(gè)流做 join 的時(shí)候,其中一個(gè)流必須落在另一個(gè)流的時(shí)間戳的一定時(shí)間范圍內(nèi),并且它們的 join key 相同才能夠完成 join。加入了時(shí)間窗口的限定,就使得我們可以對(duì)超出時(shí)間范圍的數(shù)據(jù)做一個(gè)清理,這樣的話就不需要去保留全量的 State。
Interval join 是同時(shí)支持 processing time 和 even time去定義時(shí)間的。如果使用的是 processing time,F(xiàn)link 內(nèi)部會(huì)使用系統(tǒng)時(shí)間去劃分窗口,并且去做相關(guān)的 state 清理。如果使用 even time 就會(huì)利用 Watermark 的機(jī)制去劃分窗口,并且做 State 清理。
Flink 的作者之前有一個(gè)內(nèi)容非常直觀的分享,這里就直接復(fù)用了他這部分的一個(gè)示例:
我們可以看到對(duì)于 Interval join 來說:它定義一個(gè)時(shí)間的下限,就可以使得我們對(duì)于在時(shí)間下限之外的數(shù)據(jù)做清理。比如在剛才的 SQL 里面,其實(shí)我們就限定了 join 條件是 ordertime 必須要大于 shiptime 減去4個(gè)小時(shí)。 對(duì)于 Shipments 流來說,如果接收到12:00 點(diǎn)的 Watermark,就意味著對(duì)于 Orders 流的數(shù)據(jù)小于 8:00 點(diǎn)之前的數(shù)據(jù)時(shí)間戳就可以去做丟棄,不再保留在 state 里面了。
同時(shí)對(duì)于 shiptime 來說,其實(shí)它也設(shè)定了一個(gè)時(shí)間的下限,就是它必須要大于 ordertime。對(duì)于 Orders 流來說如果接收到了一個(gè)10:15點(diǎn)的 Watermark, 那么 Shipments 的 state 10:15 之前的數(shù)據(jù)就可以拋棄掉。 所以 Interval join 使得我們可以對(duì)于一部分歷史的 state 去做清理。
最后來說一下雙流 join 的第3種 Window join:它的概念是將兩個(gè)流中有相同 key 和處在相同 window 里的元素去做 join。它的執(zhí)行的邏輯比較像 Inner join,必須同時(shí)滿足 join key 相同,而且在同一個(gè) window 里元素才能夠在最終結(jié)果中輸出。具體使用的方式是這樣的:
目前 Window join 只支持 Datastream 的 API,所以這里使用方式也是 Datastream 的一個(gè)形式。可以看到我們首先把兩流去做 join,然后在 where 和 equalTo 里面去定義 join key 的條件,然后在 window 中需要去指定 window 劃分的方式 WindowAssigner,最后要去定義 JoinFunction 或者是 FlatJoinFunction,來實(shí)現(xiàn)我們匹配元素的具體處理邏輯。
因?yàn)?window 其實(shí)劃分為三類,所以我們的 Window join 這里也會(huì)分為三類:
第1類 Tumbling Window join:它是按照時(shí)間區(qū)間去做劃分的 window。
可以看到這個(gè)圖里面是兩個(gè)流(綠色的流年和黃色的流)。在這個(gè)例子里我們定義的是一個(gè)兩毫秒的窗口,每一個(gè)圈是我們每個(gè)流上一個(gè)單個(gè)元素,上面的時(shí)間戳代表元素對(duì)應(yīng)的時(shí)間,所以我們可以看到它是按照兩毫秒的間隔去做劃分的,window 和 window 之間是不會(huì)重疊的。 對(duì)于第1個(gè)窗口我們可以看到綠色的流有兩個(gè)元素符合,然后黃色流也有兩個(gè)元素符合,它們會(huì)以 pair 的方式組合,最后輸入到 JoinFunction 或者是 FlatJoinFunction 里面去做具體的處理。
第2類 window 是 Sliding Window Join:這里用的是 Sliding Window。
sliding window 是首先定義一個(gè)窗口大小,然后再定義一個(gè)滑動(dòng)時(shí)間窗的大小。如果滑動(dòng)時(shí)間窗的大小小于定義的窗口大小,窗口和窗口之間會(huì)存在重疊的情況。就像這個(gè)圖里顯示出來的,紅色的窗口和黃色窗口是有重疊的,其中綠色流的0元素同時(shí)處于紅色的窗口和黃色窗口,說明一個(gè)元素是可以同時(shí)處于兩個(gè)窗口的。然后在具體的 Sliding Window Join 的時(shí)候,可以看到對(duì)于紅色的窗口來說有兩個(gè)元素,綠色0和黃色的0,它們兩個(gè)元素是符合 window join 條件的,于是它們會(huì)組成一個(gè)0,0的 pair。 然后對(duì)于黃色的窗口符合條件的是綠色的0與黃色0和1兩位數(shù),它們會(huì)去組合成0,1、0,0和1,0兩個(gè)pair,最后會(huì)進(jìn)入到我們定義的 JoinFunction 里面去做處理。
第3類是 SessionWindow join:這里面用到的 window 是 session window。
session window 是定義一個(gè)時(shí)間間隔,如果一個(gè)流在這個(gè)時(shí)間間隔內(nèi)沒有元素到達(dá)的話,那么它就會(huì)重新開一個(gè)新的窗口。在上圖里面我們可以看到窗口和窗口之間是不會(huì)重疊的。我們這里定義的Gap是1,對(duì)于第1個(gè)窗口來說,可以看到有綠色的0元素和黃色的1、2元素都是在同一個(gè)窗口內(nèi),所以它會(huì)組成在1 ,0和2,0這樣的一個(gè)pair。剩余的也是類似,符合條件的pair都會(huì)進(jìn)入到最后 JoinFunction 里面去做處理。
上述內(nèi)容就是基于Flink的典型ETL場(chǎng)景是怎么實(shí)現(xiàn),你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。