您好,登錄后才能下訂單哦!
Apache Hudi使用是怎么樣的,相信很多沒(méi)有經(jīng)驗(yàn)的人對(duì)此束手無(wú)策,為此本文總結(jié)了問(wèn)題出現(xiàn)的原因和解決方法,通過(guò)這篇文章希望你能解決這個(gè)問(wèn)題。]
實(shí)時(shí)分為處理的實(shí)時(shí)和數(shù)據(jù)的實(shí)時(shí) 即席分析是要求對(duì)數(shù)據(jù)實(shí)時(shí)的處理,馬上要得到對(duì)應(yīng)的結(jié)果 Flink、Spark Streaming是用來(lái)對(duì)實(shí)時(shí)數(shù)據(jù)的實(shí)時(shí)處理,數(shù)據(jù)要求實(shí)時(shí),處理也要迅速 數(shù)據(jù)不實(shí)時(shí),處理也不及時(shí)的場(chǎng)景則是我們的數(shù)倉(cāng)T+1數(shù)據(jù)
而本文探討的Apache Hudi,對(duì)應(yīng)的場(chǎng)景是數(shù)據(jù)的實(shí)時(shí),而非處理的實(shí)時(shí)。它旨在將Mysql中的時(shí)候以近實(shí)時(shí)的方式映射到大數(shù)據(jù)平臺(tái),比如Hive中。
傳統(tǒng)的離線數(shù)倉(cāng),通常數(shù)據(jù)是T+1的,不能滿(mǎn)足對(duì)當(dāng)日數(shù)據(jù)分析的需求 而流式計(jì)算一般是基于窗口,并且窗口邏輯相對(duì)比較固定。 而筆者所在的公司有一類(lèi)特殊的需求,業(yè)務(wù)分析比較熟悉現(xiàn)有事務(wù)數(shù)據(jù)庫(kù)的數(shù)據(jù)結(jié)構(gòu),并且希望有很多即席分析,這些分析包含當(dāng)日比較實(shí)時(shí)的數(shù)據(jù)。慣常他們是基于Mysql從庫(kù),直接通過(guò)Sql做相應(yīng)的分析計(jì)算。但很多時(shí)候會(huì)遇到如下障礙
數(shù)據(jù)量較大、分析邏輯較為復(fù)雜時(shí),Mysql從庫(kù)耗時(shí)較長(zhǎng)
一些跨庫(kù)的分析無(wú)法實(shí)現(xiàn)
因此,一些彌合在OLTP和OLAP之間的技術(shù)框架出現(xiàn),典型有TiDB。它能同時(shí)支持OLTP和OLAP。而諸如Apache Hudi和Apache Kudu則相當(dāng)于現(xiàn)有OLTP和OLAP技術(shù)的橋梁。他們能夠以現(xiàn)有OLTP中的數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)數(shù)據(jù),支持CRUD,同時(shí)提供跟現(xiàn)有OLAP框架的整合(如Hive,Impala),以實(shí)現(xiàn)OLAP分析
Apache Kudu,需要單獨(dú)部署集群。而Apache Hudi則不需要,它可以利用現(xiàn)有的大數(shù)據(jù)集群比如HDFS做數(shù)據(jù)文件存儲(chǔ),然后通過(guò)Hive做數(shù)據(jù)分析,相對(duì)來(lái)說(shuō)更適合資源受限的環(huán)境 ###Apache hudi簡(jiǎn)介
Hudi 提供了Hudi 表的概念,這些表支持CRUD操作。我們可以基于這個(gè)特點(diǎn),將Mysql Binlog的數(shù)據(jù)重放至Hudi表,然后基于Hive對(duì)Hudi表進(jìn)行查詢(xún)分析。數(shù)據(jù)流向架構(gòu)如下
Hudi表的數(shù)據(jù)文件,可以使用操作系統(tǒng)的文件系統(tǒng)存儲(chǔ),也可以使用HDFS這種分布式的文件系統(tǒng)存儲(chǔ)。為了后續(xù)分析性能和數(shù)據(jù)的可靠性,一般使用HDFS進(jìn)行存儲(chǔ)。以HDFS存儲(chǔ)來(lái)看,一個(gè)Hudi表的存儲(chǔ)文件分為兩類(lèi)。
包含_partition_key
相關(guān)的路徑是實(shí)際的數(shù)據(jù)文件,按分區(qū)存儲(chǔ),當(dāng)然分區(qū)的路徑key是可以指定的,我這里使用的是_partition_key
.hoodie 由于CRUD的零散性,每一次的操作都會(huì)生成一個(gè)文件,這些小文件越來(lái)越多后,會(huì)嚴(yán)重影響HDFS的性能,Hudi設(shè)計(jì)了一套文件合并機(jī)制。 .hoodie文件夾中存放了對(duì)應(yīng)的文件合并操作相關(guān)的日志文件。
Hudi真實(shí)的數(shù)據(jù)文件使用Parquet文件格式存儲(chǔ)
Hudi把隨著時(shí)間流逝,對(duì)表的一系列CRUD操作叫做Timeline。Timeline中某一次的操作,叫做Instant。Instant包含以下信息
Instant Action 記錄本次操作是一次數(shù)據(jù)提交(COMMITS),還是文件合并(COMPACTION),或者是文件清理(CLEANS)
Instant Time 本次操作發(fā)生的時(shí)間
state 操作的狀態(tài),發(fā)起(REQUESTED),進(jìn)行中(INFLIGHT),還是已完成(COMPLETED)
.hoodie文件夾中存放對(duì)應(yīng)操作的狀態(tài)記錄
hudi為了實(shí)現(xiàn)數(shù)據(jù)的CRUD,需要能夠唯一標(biāo)識(shí)一條記錄。hudi將把數(shù)據(jù)集中的唯一字段(record key ) + 數(shù)據(jù)所在分區(qū) (partitionPath) 聯(lián)合起來(lái)當(dāng)做數(shù)據(jù)的唯一鍵
基于上述基礎(chǔ)概念之上,Hudi提供了兩類(lèi)表格式COW和MOR。他們會(huì)在數(shù)據(jù)的寫(xiě)入和查詢(xún)性能上有一些不同
簡(jiǎn)稱(chēng)COW。顧名思義,他是在數(shù)據(jù)寫(xiě)入的時(shí)候,復(fù)制一份原來(lái)的拷貝,在其基礎(chǔ)上添加新數(shù)據(jù)。正在讀數(shù)據(jù)的請(qǐng)求,讀取的是是近的完整副本,這類(lèi)似Mysql 的MVCC的思想。
上圖中,每一個(gè)顏色都包含了截至到其所在時(shí)間的所有數(shù)據(jù)。老的數(shù)據(jù)副本在超過(guò)一定的個(gè)數(shù)限制后,將被刪除。這種類(lèi)型的表,沒(méi)有compact instant,因?yàn)閷?xiě)入時(shí)相當(dāng)于已經(jīng)compact了。
優(yōu)點(diǎn) 讀取時(shí),只讀取對(duì)應(yīng)分區(qū)的一個(gè)數(shù)據(jù)文件即可,較為高效
缺點(diǎn) 數(shù)據(jù)寫(xiě)入的時(shí)候,需要復(fù)制一個(gè)先前的副本再在其基礎(chǔ)上生成新的數(shù)據(jù)文件,這個(gè)過(guò)程比較耗時(shí)。且由于耗時(shí),讀請(qǐng)求讀取到的數(shù)據(jù)相對(duì)就會(huì)滯后
簡(jiǎn)稱(chēng)MOR。新插入的數(shù)據(jù)存儲(chǔ)在delta log 中。定期再將delta log合并進(jìn)行parquet數(shù)據(jù)文件。讀取數(shù)據(jù)時(shí),會(huì)將delta log跟老的數(shù)據(jù)文件做merge,得到完整的數(shù)據(jù)返回。當(dāng)然,MOR表也可以像COW表一樣,忽略delta log,只讀取最近的完整數(shù)據(jù)文件。下圖演示了MOR的兩種數(shù)據(jù)讀寫(xiě)方式
優(yōu)點(diǎn) 由于寫(xiě)入數(shù)據(jù)先寫(xiě)delta log,且delta log較小,所以寫(xiě)入成本較低
缺點(diǎn) 需要定期合并整理compact,否則碎片文件較多。讀取性能較差,因?yàn)樾枰獙elta log 和 老數(shù)據(jù)文件合并
我在github上放置了基于Hudi的封裝實(shí)現(xiàn),對(duì)應(yīng)的源碼地址為 https://github.com/wanqiufeng/hudi-learn。
binlog-consumer分支使用Spark streaming消費(fèi)kafka中的Binlog數(shù)據(jù),并寫(xiě)入Hudi表。Kafka中的binlog是通過(guò)阿里的Canal工具同步拉取的。程序入口是CanalKafkaImport2Hudi,它提供了一系列參數(shù),配置程序的執(zhí)行行為
參數(shù)名 | 含義 | 是否必填 | 默認(rèn)值 |
---|---|---|---|
--base-save-path | hudi表存放在HDFS的基礎(chǔ)路徑,比如hdfs://192.168.16.181:8020/hudi_data/ | 是 | 無(wú) |
--mapping-mysql-db-name | 指定處理的Mysql庫(kù)名 | 是 | 無(wú) |
--mapping-mysql-table-name | 指定處理的Mysql表名 | 是 | 無(wú) |
--store-table-name | 指定Hudi的表名 | 否 | 默認(rèn)會(huì)根據(jù)--mapping-mysql-db-name和--mapping-mysql-table-name自動(dòng)生成。假設(shè)--mapping-mysql-db-name 為crm,--mapping-mysql-table-name為order。那么最終的hudi表名為crm__order |
--real-save-path | 指定hudi表最終存儲(chǔ)的hdfs路徑 | 否 | 默認(rèn)根據(jù)--base-save-path和--store-table-name自動(dòng)生成,生成格式為'--base-save-path'+'/'+'--store-table-name' ,推薦默認(rèn) |
--primary-key | 指定同步的mysql表中能唯一標(biāo)識(shí)記錄的字段名 | 否 | 默認(rèn)id |
--partition-key | 指定mysql表中可以用于分區(qū)的時(shí)間字段,字段必須是timestamp 或dateime類(lèi)型 | 是 | 無(wú) |
--precombine-key | 最終用于配置hudi的hoodie.datasource.write.precombine.field | 否 | 默認(rèn)id |
--kafka-server | 指定Kafka 集群地址 | 是 | 無(wú) |
--kafka-topic | 指定消費(fèi)kafka的隊(duì)列 | 是 | 無(wú) |
--kafka-group | 指定消費(fèi)kafka的group | 否 | 默認(rèn)在存儲(chǔ)表名前加'hudi'前綴,比如'hudi_crm__order' |
--duration-seconds | 由于本程序使用Spark streaming開(kāi)發(fā),這里指定Spark streaming微批的時(shí)長(zhǎng) | 否 | 默認(rèn)10秒 |
一個(gè)使用的demo如下
/data/opt/spark-2.4.4-bin-hadoop2.6/bin/spark-submit --class com.niceshot.hudi.CanalKafkaImport2Hudi \ --name hudi__goods \ --master yarn \ --deploy-mode cluster \ --driver-memory 512m \ --executor-memory 512m \ --executor-cores 1 \ --num-executors 1 \ --queue hudi \ --conf spark.executor.memoryOverhead=2048 \ --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=\tmp\hudi-debug" \ --conf spark.core.connection.ack.wait.timeout=300 \ --conf spark.locality.wait=100 \ --conf spark.streaming.backpressure.enabled=true \ --conf spark.streaming.receiver.maxRate=500 \ --conf spark.streaming.kafka.maxRatePerPartition=200 \ --conf spark.ui.retainedJobs=10 \ --conf spark.ui.retainedStages=10 \ --conf spark.ui.retainedTasks=10 \ --conf spark.worker.ui.retainedExecutors=10 \ --conf spark.worker.ui.retainedDrivers=10 \ --conf spark.sql.ui.retainedExecutions=10 \ --conf spark.yarn.submit.waitAppCompletion=false \ --conf spark.yarn.maxAppAttempts=4 \ --conf spark.yarn.am.attemptFailuresValidityInterval=1h \ --conf spark.yarn.max.executor.failures=20 \ --conf spark.yarn.executor.failuresValidityInterval=1h \ --conf spark.task.maxFailures=8 \ /data/opt/spark-applications/hudi_canal_consumer/hudi-canal-import-1.0-SNAPSHOT-jar-with-dependencies.jar --kafka-server local:9092 --kafka-topic dt_streaming_canal_xxx --base-save-path hdfs://192.168.2.1:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name order --primary-key id --partition-key createDate --duration-seconds 1200
history_import_and_meta_sync
分支提供了將歷史數(shù)據(jù)同步至hudi表,以及將hudi表數(shù)據(jù)結(jié)構(gòu)同步至hive meta的操作
這里采用的思路是
將mysql全量數(shù)據(jù)通過(guò)注入sqoop等工具,導(dǎo)入到hive表。
然后采用分支代碼中的工具HiveImport2HudiConfig,將數(shù)據(jù)導(dǎo)入Hudi表
HiveImport2HudiConfig提供了如下一些參數(shù),用于配置程序執(zhí)行行為
參數(shù)名 | 含義 | 是否必填 | 默認(rèn)值 |
---|---|---|---|
--base-save-path | hudi表存放在HDFS的基礎(chǔ)路徑,比如hdfs://192.168.16.181:8020/hudi_data/ | 是 | 無(wú) |
--mapping-mysql-db-name | 指定處理的Mysql庫(kù)名 | 是 | 無(wú) |
--mapping-mysql-table-name | 指定處理的Mysql表名 | 是 | 無(wú) |
--store-table-name | 指定Hudi的表名 | 否 | 默認(rèn)會(huì)根據(jù)--mapping-mysql-db-name和--mapping-mysql-table-name自動(dòng)生成。假設(shè)--mapping-mysql-db-name 為crm,--mapping-mysql-table-name為order。那么最終的hudi表名為crm__order |
--real-save-path | 指定hudi表最終存儲(chǔ)的hdfs路徑 | 否 | 默認(rèn)根據(jù)--base-save-path和--store-table-name自動(dòng)生成,生成格式為'--base-save-path'+'/'+'--store-table-name' ,推薦默認(rèn) |
--primary-key | 指定同步的hive歷史表中能唯一標(biāo)識(shí)記錄的字段名 | 否 | 默認(rèn)id |
--partition-key | 指定hive歷史表中可以用于分區(qū)的時(shí)間字段,字段必須是timestamp 或dateime類(lèi)型 | 是 | 無(wú) |
--precombine-key | 最終用于配置hudi的hoodie.datasource.write.precombine.field | 否 | 默認(rèn)id |
--sync-hive-db-name | 全量歷史數(shù)據(jù)所在hive的庫(kù)名 | 是 | 無(wú) |
--sync-hive-table-name | 全量歷史數(shù)據(jù)所在hive的表名 | 是 | 無(wú) |
--hive-base-path | hive的所有數(shù)據(jù)文件存放地址,需要參看具體的hive配置 | 否 | /user/hive/warehouse |
--hive-site-path | hive-site.xml配置文件所在的地址 | 是 | 無(wú) |
--tmp-data-path | 程序執(zhí)行過(guò)程中臨時(shí)文件存放路徑。一般默認(rèn)路徑是/tmp。有可能出現(xiàn)/tmp所在磁盤(pán)太小,而導(dǎo)致歷史程序執(zhí)行失敗的情況。當(dāng)出現(xiàn)該情況時(shí),可以通過(guò)該參數(shù)自定義執(zhí)行路徑 | 否 | 默認(rèn)操作系統(tǒng)臨時(shí)目錄 |
一個(gè)程序執(zhí)行demo
nohup java -jar hudi-learn-1.0-SNAPSHOT.jar --sync-hive-db-name hudi_temp --sync-hive-table-name crm__wx_user_info --base-save-path hdfs://192.168.2.2:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name "order" --primary-key "id" --partition-key created_date --hive-site-path /etc/lib/hive/conf/hive-site.xml --tmp-data-path /data/tmp > order.log &
需要將hudi的數(shù)據(jù)結(jié)構(gòu)和分區(qū),以hive外表的形式同步至Hive meta,才能是Hive感知到hudi數(shù)據(jù),并通過(guò)sql進(jìn)行查詢(xún)分析。Hudi本身在消費(fèi)Binlog進(jìn)行存儲(chǔ)時(shí),可以順帶將相關(guān)表元數(shù)據(jù)信息同步至hive。但考慮到每條寫(xiě)入Apache Hudi表的數(shù)據(jù),都要讀寫(xiě)Hive Meta ,對(duì)Hive的性能可能影響很大。所以我單獨(dú)開(kāi)發(fā)了HiveMetaSyncConfig工具,用于同步hudi表元數(shù)據(jù)至Hive。考慮到目前程序只支持按天分區(qū),所以同步工具可以一天執(zhí)行一次即可。參數(shù)配置如下
參數(shù)名 | 含義 | 是否必填 | 默認(rèn)值 |
---|---|---|---|
--hive-db-name | 指定hudi表同步至哪個(gè)hive數(shù)據(jù)庫(kù) | 是 | 無(wú) |
--hive-table-name | 指定hudi表同步至哪個(gè)hive表 | 是 | 無(wú) |
--hive-jdbc-url | 指定hive meta的jdbc鏈接地址,例如jdbc:hive2://192.168.16.181:10000 | 是 | 無(wú) |
--hive-user-name | 指定hive meta的鏈接用戶(hù)名 | 否 | 默認(rèn)hive |
--hive-pwd | 指定hive meta的鏈接密碼 | 否 | 默認(rèn)hive |
--hudi-table-path | 指定hudi表所在hdfs的文件路徑 | 是 | 無(wú) |
--hive-site-path | 指定hive的hive-site.xml路徑 | 是 | 無(wú) |
一個(gè)程序執(zhí)行demo
java -jar hudi-learn-1.0-SNAPSHOT.jar --hive-db-name streaming --hive-table-name crm__order --hive-user-name hive --hive-pwd hive --hive-jdbc-url jdbc:hive2://192.168.16.181:10000 --hudi-table-path hdfs://192.168.16.181:8020/hudi_table/crm__order --hive-site-path /lib/hive/conf/hive-site.xml
有些hive集群的hive.input.format配置,默認(rèn)是org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,這會(huì)導(dǎo)致掛載Hudi數(shù)據(jù)的Hive外表讀取到所有Hudi的Parquet數(shù)據(jù),從而導(dǎo)致最終的讀取結(jié)果重復(fù)。需要將hive的format改為org.apache.hadoop.hive.ql.io.HiveInputFormat
,為了避免在整個(gè)集群層面上更改對(duì)其余離線Hive Sql造成不必要的影響,建議只對(duì)當(dāng)前hive session設(shè)置set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
由于binlog寫(xiě)入Hudi表的是基于Spark streaming實(shí)現(xiàn)的,這里給出了一些spark 和spark streaming層面的配置,它能使整個(gè)程序工作更穩(wěn)定
配置 | 含義 |
---|---|
spark.streaming.backpressure.enabled=true | 啟動(dòng)背壓,該配置能使Spark Streaming消費(fèi)速率,基于上一次的消費(fèi)情況,進(jìn)行調(diào)整,避免程序崩潰 |
spark.ui.retainedJobs=10 <br> spark.ui.retainedStages=10 <br> spark.ui.retainedTasks=10 <br>spark.worker.ui.retainedExecutors=10 <br>spark.worker.ui.retainedDrivers=10 <br>spark.sql.ui.retainedExecutions=10 | 默認(rèn)情況下,spark 會(huì)在driver中存儲(chǔ)一些spark 程序執(zhí)行過(guò)程中各stage和task的歷史信息,當(dāng)driver內(nèi)存過(guò)小時(shí),可能使driver崩潰,通過(guò)上述參數(shù),調(diào)節(jié)這些歷史數(shù)據(jù)存儲(chǔ)的條數(shù),從而減小對(duì)內(nèi)層使用 |
spark.yarn.maxAppAttempts=4 | 配置當(dāng)driver崩潰后,嘗試重啟的次數(shù) |
spark.yarn.am.attemptFailuresValidityInterval=1h | 假若driver執(zhí)行一周才崩潰一次,那我們更希望每次都能重啟,而上述配置在累計(jì)到重啟4次后,driver就再也不會(huì)被重啟,該配置則用于重置maxAppAttempts的時(shí)間間隔 |
spark.yarn.max.executor.failures=20 | executor執(zhí)行也可能失敗,失敗后集群會(huì)自動(dòng)分配新的executor, 該配置用于配置允許executor失敗的次數(shù),超過(guò)次數(shù)后程序會(huì)報(bào)(reason: Max number of executor failures (400) reached),并退出 |
spark.yarn.executor.failuresValidityInterval=1h | 指定executor失敗重分配次數(shù)重置的時(shí)間間隔 |
spark.task.maxFailures=8 | 允許任務(wù)執(zhí)行失敗的次數(shù) |
支持無(wú)分區(qū),或非日期分區(qū)表。目前只支持日期分區(qū)表
多數(shù)據(jù)類(lèi)型支持,目前為了程序的穩(wěn)定性,會(huì)將Mysql中的字段全部以String類(lèi)型存儲(chǔ)至Hudi
看完上述內(nèi)容,你們掌握Apache Hudi使用是怎么樣的的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。