您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)如何理解數(shù)據(jù)湖技術(shù)中的Apache Hudi,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
隨著Apache Parquet和Apache ORC等存儲格式以及Presto和Apache Impala等查詢引擎的發(fā)展,Hadoop生態(tài)系統(tǒng)有潛力作為面向分鐘級延時場景的通用統(tǒng)一服務(wù)層。然而,為了實現(xiàn)這一點,這需要在HDFS中實現(xiàn)高效且低延遲的數(shù)據(jù)攝取及數(shù)據(jù)準(zhǔn)備。
為了解決這個問題,優(yōu)步開發(fā)了Hudi項目,這是一個增量處理框架,高效和低延遲地為所有業(yè)務(wù)關(guān)鍵數(shù)據(jù)鏈路提供有力支持。事實上,Uber已經(jīng)將Hudi開源。在深入的了解Hudi之前,我們首先討論一下為什么將Hadoop作為統(tǒng)一的服務(wù)層是一個不錯的想法。
Lambda架構(gòu)是一種常見的數(shù)據(jù)處理體系結(jié)構(gòu),它的數(shù)據(jù)的處理依賴流式計算層(Streaming Layer)和批處理計算層(Batch Layer)的雙重計算。每隔幾個小時,批處理過程被啟動以計算精確的業(yè)務(wù)狀態(tài),并將批量更新加載到服務(wù)層(Serving Layer)。同時,為了消除上述幾個小時的等待時間我們會在流式計算層對這個業(yè)務(wù)數(shù)據(jù)進(jìn)行實時的狀態(tài)更新。然而,這個流計算的狀態(tài)只是一個最終結(jié)果的近似值,最終需要被批處理的計算結(jié)果所覆蓋。由于兩種模式提供的狀態(tài)差異,我們需要為批處理和流處理提供不同的服務(wù)層,并在這個上面再做合并抽象,或者設(shè)計應(yīng)用一個相當(dāng)復(fù)雜的服務(wù)系統(tǒng)(如Druid),用于同時在行級更新和批量加載中提供優(yōu)異表現(xiàn)。
Lambda架構(gòu)需要雙重計算和雙重服務(wù)
對于是否需要一個額外單獨的批處理層,Kappa架構(gòu)認(rèn)為一個單獨的流式計算層足以成為數(shù)據(jù)處理的通用解決方案。廣義上,所有數(shù)據(jù)計算都可以描述為生產(chǎn)者生產(chǎn)一個數(shù)據(jù)流,而消費者不斷的逐條迭代消費這個流中的記錄,如火山模型(Volcano Iterator model)。這就意味著流式計算層可以依靠堆資源以增加并行能力的方式來對業(yè)務(wù)狀態(tài)進(jìn)行重算更新。這類系統(tǒng)可以依靠有效的檢查點(checkpoint)和大量的狀態(tài)管理來讓流式處理的結(jié)果不再只是一個近似值。這個模型被應(yīng)用于很多的數(shù)據(jù)攝取任務(wù)。盡管如此,雖然批處理層在這個模型中被去掉了,但是在服務(wù)層仍然存在兩個問題。
如今很多流式處理引擎都支持行級的數(shù)據(jù)處理,這就要求我們的服務(wù)層也需要能夠支持行級更新的能力。通常,這類系統(tǒng)并不能對分析類的查詢掃描優(yōu)化到這個地步,除非我們在內(nèi)存中緩存大量記錄(如Memsql)或者有強(qiáng)大的索引支持(如ElasticSearch)。這些系統(tǒng)為了獲得數(shù)據(jù)攝取和掃描的性能往往需要增加成本和犧牲服務(wù)的可擴(kuò)展性。出于這個原因,這類服務(wù)系統(tǒng)的數(shù)據(jù)駐留的能力往往是有限的,從時間上可能30~90天,從總量上來說幾個TB的數(shù)據(jù)就是他們的極限了。對于歷史數(shù)據(jù)的分析又會被重新定向到時延要求不那么高的HDFS上。
Kappa架構(gòu)統(tǒng)一了處理層,但服務(wù)復(fù)雜性仍然存在
對于數(shù)據(jù)攝取延時、掃描性能和計算資源和操作復(fù)雜性的權(quán)衡是無法避免的。但是如果我們的業(yè)務(wù)場景對時延的要求并不是那么的高,比如能接受10分鐘左右的延遲,在我們?nèi)绻新纷涌梢栽贖DFS上快速的進(jìn)行數(shù)據(jù)攝取和數(shù)據(jù)準(zhǔn)備的基礎(chǔ)上,服務(wù)層中的Speed Serving就不必要了。這么做可以統(tǒng)一服務(wù)層,大大降低系統(tǒng)整體的復(fù)雜度和資源消耗。
要將HDFS用作統(tǒng)一的服務(wù)層,我們不但需要使它支持存儲變更日志(或者叫日志記錄系統(tǒng)),而且需要支持根據(jù)實際業(yè)務(wù)維度來分區(qū)、壓縮、去重的業(yè)務(wù)狀態(tài)管理。這類統(tǒng)一服務(wù)層需具備如下幾個特性:
大型HDFS數(shù)據(jù)集的快速變更能力
數(shù)據(jù)存儲需要針對分析類掃描進(jìn)行優(yōu)化(列存)
有效的連接和將更新傳播到上層建模數(shù)據(jù)集的能力
被壓縮的業(yè)務(wù)狀態(tài)變更是無法避免的,即使我們以事件時間(Event time)作為業(yè)務(wù)分區(qū)字段。由于遲到數(shù)據(jù)和事件時間和處理時間(Processing time)的不一致,在數(shù)據(jù)攝取場景中我們依然需要對老的分區(qū)進(jìn)行必要的更新操作。最后就算我們把處理時間作為分區(qū)字段,依然存在一些需要進(jìn)行更新的場景,比如由于安全、審計方面的原因?qū)υ瓟?shù)據(jù)進(jìn)行校正的需求。
作為一個增量處理框架,我們的Hudi支持前面章節(jié)中所述的所有需求。一言以蔽之,Hudi是一種針對分析型業(yè)務(wù)的、掃描優(yōu)化的數(shù)據(jù)存儲抽象,它能夠使HDFS數(shù)據(jù)集在分鐘級的時延內(nèi)支持變更,也支持下游系統(tǒng)對這個數(shù)據(jù)集的增量處理。
Hudi數(shù)據(jù)集通過自定義的InputFormat
兼容當(dāng)前Hadoop生態(tài)系統(tǒng),包括Apache Hive,Apache Parquet,Presto和Apache Spark,使得終端用戶可以無縫的對接。
基于Hudi簡化的服務(wù)架構(gòu),分鐘級延時
該數(shù)據(jù)流模型通過時延和數(shù)據(jù)完整性保證兩個維度去權(quán)衡以構(gòu)建數(shù)據(jù)管道。下圖所示的是Uber Engineering如何根據(jù)這兩個維度進(jìn)行處理方式的劃分。
Uber在不同延遲和完整性級別上的用例分布
對于很少一些需要真正做到約1分鐘的延時的用例及簡單業(yè)務(wù)指標(biāo)的展示應(yīng)用,我們基于行級的流式處理。對于傳統(tǒng)的機(jī)器學(xué)習(xí)和實驗有效性分析用例,我們選擇更加擅長較重計算的批處理。對于包含復(fù)雜連接或者重要數(shù)據(jù)處理的近實時場景,我們基于Hudi以及它的增量處理原語來獲得兩全其美的結(jié)果。想要了解Uber使用Hudi的更多用例和場景,可以去他們的Githup文檔(https://uber.github.io/hudi/use_cases.html)里面看一下。
Hudi數(shù)據(jù)集的組織目錄結(jié)構(gòu)與Hive表示非常相似,一份數(shù)據(jù)集對應(yīng)這一個根目錄。數(shù)據(jù)集被打散為多個分區(qū),分區(qū)字段以文件夾形式存在,該文件夾包含該分區(qū)的所有文件。在根目錄下,每個分區(qū)都有唯一的分區(qū)路徑。每個分區(qū)記錄分布于多個文件中。每個文件都有惟一的fileId
和生成文件的commit
所標(biāo)識。如果發(fā)生更新操作時,多個文件共享相同的fileId,但會有不同的commit
。
每條記錄由記錄的key值進(jìn)行標(biāo)識并映射到一個fileId。一條記錄的key與fileId之間的映射一旦在第一個版本寫入該文件時就是永久確定的。換言之,一個fileId標(biāo)識的是一組文件,每個文件包含一組特定的記錄,不同文件之間的相同記錄通過版本號區(qū)分。
Hudi Storage由三個不同部分組成:
Metadata - 以時間軸(timeline)的形式將數(shù)據(jù)集上的各項操作元數(shù)據(jù)維護(hù)起來,以支持?jǐn)?shù)據(jù)集的瞬態(tài)視圖,這部分元數(shù)據(jù)存儲于根目錄下的元數(shù)據(jù)目錄。一共有三種類型的元數(shù)據(jù):
Commits - 一個單獨的commit包含對數(shù)據(jù)集之上一批數(shù)據(jù)的一次原子寫入操作的相關(guān)信息。我們用單調(diào)遞增的時間戳來標(biāo)識commits,標(biāo)定的是一次寫入操作的開始。
Cleans - 用于清除數(shù)據(jù)集中不再被查詢所用到的舊版本文件的后臺活動。
Compactions - 用于協(xié)調(diào)Hudi內(nèi)部的數(shù)據(jù)結(jié)構(gòu)差異的后臺活動。例如,將更新操作由基于行存的日志文件歸集到列存數(shù)據(jù)上。
Index - Hudi維護(hù)著一個索引,以支持在記錄key存在情況下,將新記錄的key快速映射到對應(yīng)的fileId。索引的實現(xiàn)是插件式的,
Bloom filter - 存儲于數(shù)據(jù)文件頁腳。默認(rèn)選項,不依賴外部系統(tǒng)實現(xiàn)。數(shù)據(jù)和索引始終保持一致。
Apache HBase - 可高效查找一小批key。在索引標(biāo)記期間,此選項可能快幾秒鐘。
Data - Hudi以兩種不同的存儲格式存儲所有攝取的數(shù)據(jù)。這塊的設(shè)計也是插件式的,用戶可選擇滿足下列條件的任意數(shù)據(jù)格式:
讀優(yōu)化的列存格式(ROFormat)。缺省值為Apache Parquet
寫優(yōu)化的行存格式(WOFormat)。缺省值為Apache Avro
Hudi存儲內(nèi)核。
Hudi對HDFS的使用模式進(jìn)行了優(yōu)化。Compaction是將數(shù)據(jù)從寫優(yōu)化格式轉(zhuǎn)換為讀優(yōu)化格式的關(guān)鍵操作。Compaction操作的基本并行單位是對一個fileID的重寫,Hudi保證所有的數(shù)據(jù)文件的大小和HDFS的塊大小對齊,這樣可以使Compaction操作的并行度、查詢的并行度和HDFS文件總數(shù)間取得平衡。Compaction操作也是插件式的,可以擴(kuò)展為合并不頻繁更新的老的數(shù)據(jù)文件已進(jìn)一步減少文件總數(shù)。
Hudi是一個Spark的第三方庫,以Spark Streaming的方式運行數(shù)據(jù)攝取作業(yè),這些作業(yè)一般建議以1~2分鐘左右的微批(micro-batch)進(jìn)行處理。當(dāng)然,在權(quán)衡自己業(yè)務(wù)在時延要求和資源層面的前提下,我們也可以用Apache Oozie或者Apache Airflow來進(jìn)行離線作業(yè)周期性調(diào)度。
在默認(rèn)配置下,Hudi使用一下寫入路徑:
Hudi從相關(guān)的分區(qū)下的parquet文件中加載BloomFilter索引,并通過傳入key值映射到對應(yīng)的文件來標(biāo)記是更新還是插入。此處的連接操作可能由于輸入數(shù)據(jù)的大小,分區(qū)的分布或者單個分區(qū)下的文件數(shù)問題導(dǎo)致數(shù)據(jù)傾斜。通過對連接字段進(jìn)行范圍分區(qū)以及新建子分區(qū)的方式處理,以避免Spark某些低版本中處理Shuffle文件時的2GB限制的問題 - https://issues.apache.org/jira/browse/SPARK-6190。
Hudi按分區(qū)對insert
進(jìn)行分組,分配一個fileId,然后對相應(yīng)的日志文件進(jìn)行append操作,知道文件大小達(dá)到HDSF塊大小。然后,新的fileId生成,重復(fù)上述過程,直到所有的數(shù)據(jù)都被插入。
一個有時間限制compaction操作會被后臺以幾分鐘為周期調(diào)度起來,生成一個compactions的優(yōu)先級列表,并壓縮一個fileId包含的所有avro文件以生成進(jìn)行當(dāng)前parquet文件的下一個版本。
Compaction操作是異步的,鎖定幾個特定的日志版本進(jìn)行壓縮,并以新的日志記錄更新到對應(yīng)fileId中。鎖維護(hù)在Zookeeper中。
Compaction操作的優(yōu)先級順序由被壓縮的日志數(shù)據(jù)大小決定,并基于一個Compaction策略可配置。每一輪壓縮迭代過程中,大文件優(yōu)先被壓縮,因為重寫parquet文件的開銷并不會根據(jù)文件的更新次數(shù)進(jìn)行分?jǐn)偂?/p>
Hudi在針對一個fileId進(jìn)行更新操作時,如果對應(yīng)的日志文件存在則append,反之,會新建日志文件。
如果數(shù)據(jù)攝取作業(yè)成功,一個commit
記錄會在Hudi的元數(shù)據(jù)時間軸中記錄,即將inflight文件重命名為commit文件,并將分區(qū)和所創(chuàng)建fileId版本的詳細(xì)信息記錄下來。
如上所述,Hudi會努力將文件大小和HDFS底層塊大小對齊。取決于一個分區(qū)下數(shù)據(jù)的總量和列存的壓縮效果,compaction操作依然能夠創(chuàng)建parquet小文件。因為對分區(qū)的插入操作會是以對現(xiàn)有小文件的更新來進(jìn)行的,所有這些小文件的問題最終會被一次次的迭代不斷修正。最終,文件大小會不斷增長直到與HDFS塊大小一致。
首先,Spark的本身的重試機(jī)制會cover一些間歇性的異常,當(dāng)然如果超過了重試次數(shù)的閾值,我們的整個作業(yè)都會失敗。下一次的迭代作業(yè)會在同一批次數(shù)據(jù)上進(jìn)行重試。以下列出兩個重要的區(qū)別:
攝取失敗可能在日志文件中生成包含部分?jǐn)?shù)據(jù)的avro塊 - 這個問題通過在commit
元數(shù)據(jù)中存儲對應(yīng)數(shù)據(jù)塊的起始偏移量和日志文件版本來解決。當(dāng)讀取日志文件時,偶爾發(fā)生的部分寫入的數(shù)據(jù)塊會被跳過,且會從正確的位置開始讀取avro文件。
Compaction過程失敗會生產(chǎn)包含部分?jǐn)?shù)據(jù)的parquet文件 - 這個問題在查詢階段被解決,通過commit
元數(shù)據(jù)進(jìn)行文件版本的過濾。查詢階段只會讀取最新的完成的compaction后的文件。這些失敗的compaction文件會在下一個compaction周期被回滾。
commit
時間軸元數(shù)據(jù)可以讓我們在同一份HDFS數(shù)據(jù)上同時享有讀取優(yōu)化的視圖和實時視圖。客戶端可以基于延遲要求和查詢性能決定使用哪種視圖。Hudi以自定義的InputFormat
和一個Hive注冊模塊來提供這兩種視圖,后者可以將這兩種視圖注冊為Hive Metastore表。這兩種輸入格式都可以識別fileId和commit
時間,可以篩選并讀取最新提交的文件。然后,Hudi會基于這些數(shù)據(jù)文件生成輸入分片供查詢使用。
InputFormat
的具體信息如下:
HoodieReadOptimizedInputFormat - 提供掃描優(yōu)化的視圖,篩選所有的日志文件并獲取最新版本的parquet壓縮文件
HoodieRealtimeInputFormat - 提供一個實時的視圖,除了會獲取最新的parquet壓縮文件之外,還提供一個RecordReader
以合并與parquet文件相關(guān)的日志文件。
這兩類InputFormat
都擴(kuò)展了MapredParquetInputFormat
和VectorizedParquetRecordReader
,因此所有針對parquet文件的優(yōu)化依然被保留。依賴于hoodie-hadoop-mr
類庫,Presto和Spark SQL可以對Hudi格式的Hive Metastore表做到開箱即用。
Hudi篩選出最新版本,在提供記錄之前將他們與日志文件合并
前面提到過,數(shù)據(jù)模型表需要在HDFS中處理和提供,才能使的HDFS算的上是一個統(tǒng)一的服務(wù)層。構(gòu)建低延時的數(shù)據(jù)模型表需要能夠鏈接HDFS數(shù)據(jù)集記性增量處理。由于Hudi在元數(shù)據(jù)中維護(hù)了每次提交的提交時間以及對應(yīng)的文件版本,使得我們可以基于起始時間戳和結(jié)束時間戳從特定的Hudi數(shù)據(jù)集中提取增量的變更數(shù)據(jù)集。
這個過程基本上與普通的查詢大致相同,只是選取特定時間范圍內(nèi)的文件版本進(jìn)行讀取而不是選最新的,提交時間會最為過濾條件被謂詞下推到文件掃描階段。這個增量結(jié)果集也收到文件自動清理的影響,如果某些時間范圍內(nèi)的文件被自動清理掉了,那自然也是不能被訪問到了。
這樣我們就可以基于watermark做雙流join和流與靜態(tài)數(shù)據(jù)的join以對存儲在HDFS中的數(shù)據(jù)模型表計算和upsert
。
基于Hudi增量計算的建模過程
關(guān)于如何理解數(shù)據(jù)湖技術(shù)中的Apache Hudi就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。