您好,登錄后才能下訂單哦!
這篇文章主要介紹“Flink集成iceberg在生產(chǎn)環(huán)境中的使用方法是什么”,在日常操作中,相信很多人在Flink集成iceberg在生產(chǎn)環(huán)境中的使用方法是什么問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”Flink集成iceberg在生產(chǎn)環(huán)境中的使用方法是什么”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!
在大數(shù)據(jù)處理領(lǐng)域,有一個(gè)非常常見(jiàn)但是很麻煩的問(wèn)題,即hdfs小文件問(wèn)題,我們也被這個(gè)問(wèn)題困擾了很久。開(kāi)始的時(shí)候我們是自己寫的一個(gè)小文件壓縮工具,定期的去合并,原理就是把待壓縮數(shù)據(jù)寫入一個(gè)新的臨時(shí)的文件夾,壓縮完,和原來(lái)的數(shù)據(jù)進(jìn)行檢驗(yàn),數(shù)據(jù)一致之后,用壓縮的數(shù)據(jù)覆蓋原來(lái)的數(shù)據(jù),但是由于無(wú)法保證事務(wù),所以出現(xiàn)了很多的問(wèn)題,比如壓縮的同時(shí)又有數(shù)據(jù)寫入了,檢驗(yàn)就會(huì)失敗,導(dǎo)致合并小文件失敗,而且無(wú)法實(shí)時(shí)的合并,只能按照分區(qū)合并一天之前的?;蛘咭粋€(gè)小時(shí)之前的,最新的數(shù)據(jù)仍然有小文件的問(wèn)題,導(dǎo)致查詢性能提高不了。
所以基于以上的一些問(wèn)題,我調(diào)研了數(shù)據(jù)湖技術(shù),由于我們的流式數(shù)據(jù)主要是flink為主,查詢引擎是presto,而hudi強(qiáng)耦合了spark,對(duì)flink的支持還不太友好,所以綜合考慮了一下,決定引入iceberg。在對(duì)iceberg進(jìn)行功能測(cè)試和簡(jiǎn)單代碼review之后,發(fā)現(xiàn)iceberg在flink這塊還有一些需要優(yōu)化和提升,不過(guò)我覺(jué)得應(yīng)該能hold的住,不完善的地方和需要優(yōu)化的地方我們自己來(lái)補(bǔ)全,所以最終引入了iceberg來(lái)解決小文件的問(wèn)題。
除此之外,對(duì)于一些其他的問(wèn)題,比如cdc數(shù)據(jù)的接入,以及根據(jù)查詢條件刪除數(shù)據(jù)等,后續(xù)也可以通過(guò)數(shù)據(jù)湖技術(shù)來(lái)解決。
我們的主要使用場(chǎng)景是使用flink將kafka的流式數(shù)據(jù)寫入到Iceberg,為了代碼的簡(jiǎn)潔以及可維護(hù)性,我們盡量將程序使用sql來(lái)編寫,示例代碼如下:
// create catalog CREATE CATALOG iceberg WITH ( 'type'='iceberg', 'catalog-type'='hive'," + 'warehouse'='hdfs://localhost/user/hive/warehouse', 'uri'='thrift://localhost:9083')// create table CREATE TABLE iceberg.tmp.iceberg_table ( id BIGINT COMMENT 'unique id', data STRING, d int) PARTITIONED BY (d)WITH ('connector'='iceberg','write.format.default'='orc')// insert into insert into iceberg.tmp.iceberg_table select * from kafka_table
提示:記得開(kāi)啟checkpoint
目前壓縮小文件是采用的一個(gè)額外批任務(wù)來(lái)進(jìn)行的,Iceberg提供了一個(gè)spark版本的action,我在做功能測(cè)試的時(shí)候發(fā)現(xiàn)了一些問(wèn)題,此外我對(duì)spark也不是非常熟悉,擔(dān)心出了問(wèn)題不好排查,所以參照spark版本的自己實(shí)現(xiàn)了一個(gè)flink版本,并修復(fù)了一些bug,進(jìn)行了一些功能的優(yōu)化。
由于我們的iceberg的元數(shù)據(jù)都是存儲(chǔ)在hive中的,所以壓縮程序的邏輯是我把hive中所有的iceberg表全部都查出來(lái),依次壓縮。壓縮沒(méi)有過(guò)濾條件,不管是分區(qū)表還是非分區(qū)表,都進(jìn)行全表的壓縮。這樣做是為了處理某些使用eventtime的flink任務(wù),如果有延遲的數(shù)據(jù)的到來(lái)。就會(huì)把數(shù)據(jù)寫入以前的分區(qū),如果不是全表壓縮只壓縮當(dāng)天分區(qū)的話,新寫入的其他天的數(shù)據(jù)就不會(huì)被壓縮。
代碼示例參考:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Actions.forTable(env, table) .rewriteDataFiles()//.maxParallelism(parallelism)//.filter(Expressions.equal("day", day))//.targetSizeInBytes(targetSizeInBytes).execute();
具體的壓縮小文件相關(guān)的信息可以參考這篇文章[Flink集成iceberg數(shù)據(jù)湖之合并小文件]。
我們的快照過(guò)期策略,我是和壓縮小文件的批處理任務(wù)寫在一起的,壓縮完小文件之后,進(jìn)行表的快照過(guò)期處理,目前保留的時(shí)間是一個(gè)小時(shí),這是因?yàn)閷?duì)于有一些比較大的表,分區(qū)比較多,而且checkpoint比較短,如果保留的快照過(guò)長(zhǎng)的話,還是會(huì)保留過(guò)多小文件,我們暫時(shí)沒(méi)有查詢歷史快照的需求,所以我將快照的保留時(shí)間設(shè)置了一個(gè)小時(shí)。
long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1); table.expireSnapshots() // .retainLast(20) .expireOlderThan(olderThanTimestamp) .commit();
寫入了數(shù)據(jù)之后,有時(shí)候我想查看一下相應(yīng)的快照下面有多少數(shù)據(jù)文件,直接查詢hdfs你不知道哪個(gè)是有用的,哪個(gè)是沒(méi)用的。所以需要有對(duì)應(yīng)的管理工具。目前flink這塊還不太成熟,我們可以使用spark3提供的工具來(lái)查看。
目前create table 這些操作我們是通過(guò)flink sql client來(lái)做的。其他相關(guān)的ddl的操作可以使用spark來(lái)做:
https://iceberg.apache.org/spark/#ddl-commands
一些相關(guān)的數(shù)據(jù)的操作,比如刪除數(shù)據(jù)等可以通過(guò)spark來(lái)實(shí)現(xiàn),presto目前只支持分區(qū)級(jí)別的刪除功能。
在使用iceberg的過(guò)程中,有時(shí)候會(huì)有這樣的情況,我提交了一個(gè)flink任務(wù),由于各種原因,我把它給停了,這個(gè)時(shí)候iceberg還沒(méi)提交相應(yīng)的快照。還有由于一些異常導(dǎo)致程序失敗,就會(huì)產(chǎn)生一些不在iceberg元數(shù)據(jù)里面的孤立的數(shù)據(jù)文件,這些文件對(duì)iceberg來(lái)說(shuō)是不可達(dá)的,也是沒(méi)用的。所以我們需要像jvm的垃圾回收一樣來(lái)清理這些文件。
目前iceberg提供了一個(gè)spark版本的action來(lái)進(jìn)行處理這些沒(méi)用的文件,我們采取的策略和壓縮小文件一樣,獲取hive中的所有的iceberg表。每隔一個(gè)小時(shí)執(zhí)行一次定時(shí)任務(wù)來(lái)刪除這些沒(méi)用的文件。
SparkSession spark = ...... Actions.forTable(spark, table) .removeOrphanFiles() //.deleteWith(...) .execute();
在程序運(yùn)行過(guò)程中出現(xiàn)了正常的數(shù)據(jù)文件被刪除的問(wèn)題,經(jīng)過(guò)調(diào)研,由于我的快照保留設(shè)置是一小時(shí),這個(gè)清理程序清理時(shí)間也是設(shè)置一個(gè)小時(shí),通過(guò)日志發(fā)現(xiàn)是這個(gè)清理程序刪除了正常的數(shù)據(jù)。查了查代碼,覺(jué)得應(yīng)該是他們?cè)O(shè)置了一樣的時(shí)間,在清理孤立文件的時(shí)候,有其他程序正在讀寫表,由于這個(gè)清理程序是沒(méi)有事務(wù)的,導(dǎo)致刪除了正常的數(shù)據(jù)。最后把這個(gè)清理程序的清理時(shí)間改成默認(rèn)的三天,沒(méi)有再出現(xiàn)刪除數(shù)據(jù)文件的問(wèn)題。當(dāng)然,為了保險(xiǎn)起見(jiàn),我們可以覆蓋原來(lái)的刪除文件的方法,改成將文件到一個(gè)備份文件夾,檢查沒(méi)有問(wèn)題之后,手工刪除。
目前我們使用的版本是prestosql 346,這個(gè)版本安裝的時(shí)候需要jdk11,presto查詢iceberg比較簡(jiǎn)單。官方提供了相應(yīng)的conncter,我們配置一下就行,
//iceberg.propertiesconnector.name=iceberg hive.metastore.uri=thrift://localhost:9083
目前查詢iceberg的批處理任務(wù),使用的flink的客戶端,首先我們啟動(dòng)一個(gè)基于yarn session 的flink集群,然后通過(guò)sql客戶端提交任務(wù)到集群。
主要的配置就是我們需要根據(jù)數(shù)據(jù)的大小設(shè)置sql任務(wù)執(zhí)行的并行度,可以通過(guò)以下參數(shù)設(shè)置。
set table.exec.resource.default-parallelism = 100;
此外我在sql客戶端的配置文件里配置了hive和iceberg相應(yīng)的catalog,這樣每次客戶端啟動(dòng)的時(shí)候就不需要建catalog了。
catalogs: # empty list - name: iceberg type: iceberg warehouse: hdfs://localhost/user/hive2/warehouse uri: thrift://localhost:9083 catalog-type: hive cache-enabled: false - name: hive type: hive hive-conf-dir: /Users/user/work/hive/conf default-database: default
目前對(duì)于定時(shí)調(diào)度中的批處理任務(wù),flink的sql客戶端還沒(méi)hive那樣做的很完善,比如執(zhí)行hive -f來(lái)執(zhí)行一個(gè)文件。而且不同的任務(wù)需要不同的資源,并行度等。所以我自己封裝了一個(gè)flinK程序,通過(guò)調(diào)用這個(gè)程序來(lái)進(jìn)行處理,讀取一個(gè)指定文件里面的sql,來(lái)提交批任務(wù)。在命令行控制任務(wù)的資源和并行度等。
/home/flink/bin/flink run -p 10 -m yarn-cluster /home/work/iceberg-scheduler.jar my.sql
批任務(wù)的查詢這塊,做了一些優(yōu)化,比如limit下推,filter下推,查詢并行度優(yōu)化等,可以大大提高查詢的速度,這些優(yōu)化都已經(jīng)推回給社區(qū)。
目前我們的所有數(shù)據(jù)都是存儲(chǔ)在hive表的,在驗(yàn)證完iceberg之后,我們決定將hive的數(shù)據(jù)遷移到iceberg,所以我寫了一個(gè)工具,可以使用hive的數(shù)據(jù),然后新建一個(gè)iceberg表,為其建立相應(yīng)的元數(shù)據(jù),但是測(cè)試的時(shí)候發(fā)現(xiàn),如果采用這種方式,就需要把寫入hive的程序停止,因?yàn)槿绻鹖ceberg和hive使用同一個(gè)數(shù)據(jù)文件,而壓縮程序會(huì)不斷地壓縮iceberg表的小文件,壓縮完之后,不會(huì)馬上刪除舊數(shù)據(jù),所以hive表就會(huì)查到雙份的數(shù)據(jù)。鑒于iceberg測(cè)試的時(shí)候還有一些不穩(wěn)定,所以我們采用雙寫的策略,原來(lái)寫入hive的程序不動(dòng),新啟動(dòng)一套程序?qū)懭雐ceberg,這樣能對(duì)iceberg表觀察一段時(shí)間。還能和原來(lái)hive中的數(shù)據(jù)進(jìn)行比對(duì),來(lái)驗(yàn)證程序的正確性。
經(jīng)過(guò)一段時(shí)間觀察,每天將近20億數(shù)據(jù)的hive表和iceberg表,一條數(shù)據(jù)也不差。所以在最終對(duì)比數(shù)據(jù)沒(méi)有問(wèn)題之后,把hive表停止寫入,使用新的iceberg表,然后把hive中的舊數(shù)據(jù)導(dǎo)入到iceberg。
到此,關(guān)于“Flink集成iceberg在生產(chǎn)環(huán)境中的使用方法是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。