您好,登錄后才能下訂單哦!
本篇內容主要講解“Vertica集成Apache Hudi重磅使用的方法”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Vertica集成Apache Hudi重磅使用的方法”吧!
本文演示了使用外部表集成 Vertica 和 Apache Hudi。 在演示中我們使用 Spark 上的 Apache Hudi 將數(shù)據(jù)攝取到 S3 中,并使用 Vertica 外部表訪問這些數(shù)據(jù)。
Apache Hudi 是一種變更數(shù)據(jù)捕獲 (CDC) 工具,可在不同時間線將事務記錄在表中。 Hudi 代表 Hadoop Upserts Deletes and Incrementals,是一個開源框架。 Hudi 提供 ACID 事務、可擴展的元數(shù)據(jù)處理,并統(tǒng)一流和批處理數(shù)據(jù)處理。
以下流程圖說明了該過程。 使用安裝在 Apache Spark 上的 Hudi 將數(shù)據(jù)處理到 S3,并從 Vertica 外部表中讀取 S3 中的數(shù)據(jù)更改。
Apache Spark 環(huán)境。 使用具有 1 個 Master 和 3 個 Worker 的 4 節(jié)點集群進行了測試。 按照在多節(jié)點集群上設置 Apache Spark 中的說明安裝 Spark 集群環(huán)境。 啟動 Spark 多節(jié)點集群。
Vertica 分析數(shù)據(jù)庫。 使用 Vertica Enterprise 11.0.0 進行了測試。
AWS S3 或 S3 兼容對象存儲。 使用 MinIO 作為 S3 存儲桶進行了測試。
需要以下 jar 文件。將 jar 復制到 Spark 機器上任何需要的位置,將這些 jar 文件放在 /opt/spark/jars 中。
Hadoop - hadoop-aws-2.7.3.jar
AWS - aws-java-sdk-1.7.4.jar
在 Vertica 數(shù)據(jù)庫中運行以下命令來設置訪問存儲桶的 S3 參數(shù):
SELECT SET_CONFIG_PARAMETER('AWSAuth', 'accesskey:secretkey'); SELECT SET_CONFIG_PARAMETER('AWSRegion','us-east-1'); SELECT SET_CONFIG_PARAMETER('AWSEndpoint','<S3_IP>:9000'); SELECT SET_CONFIG_PARAMETER('AWSEnableHttps','0');
endpoint可能會有所不同,具體取決于 S3 存儲桶位置選擇的 S3 對象存儲。
要將 Vertica 與 Apache Hudi 集成,首先需要將 Apache Spark 與 Apache Hudi 集成,配置 jars,以及訪問 AWS S3 的連接。 其次,將 Vertica 連接到 Apache Hudi。 然后對 S3 存儲桶執(zhí)行 Insert、Append、Update 等操作。
按照以下部分中的步驟將數(shù)據(jù)寫入 Vertica。
在 Apache Spark 上配置 Apache Hudi 和 AWS S3
配置 Vertica 和 Apache Hudi 集成
在 Apache Spark 機器中運行以下命令。
這會下載 Apache Hudi 包,配置 jar 文件,以及 AWS S3
/opt/spark/bin/spark-shell \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"\--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1
導入Hudi的讀、寫等所需的包:
import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._
使用以下命令根據(jù)需要配置 Minio 訪問密鑰、Secret key、Endpoint 和其他 S3A 算法和路徑。
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "*****") spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "*****") spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "http://XXXX.9000") spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true") sc.hadoopConfiguration.set("fs.s3a.signing-algorithm","S3SignerType")
創(chuàng)建變量來存儲 MinIO 的表名和 S3 路徑。
val tableName = “Trips” val basepath = “s3a://apachehudi/vertica/”
準備數(shù)據(jù),使用 Scala 在 Apache spark 中創(chuàng)建示例數(shù)據(jù)
val df = Seq( ("aaa","r1","d1",10,"US","20211001"), ("bbb","r2","d2",20,"Europe","20211002"), ("ccc","r3","d3",30,"India","20211003"), ("ddd","r4","d4",40,"Europe","20211004"), ("eee","r5","d5",50,"India","20211005"), ).toDF("uuid", "rider", "driver","fare","partitionpath","ts")
將數(shù)據(jù)寫入 AWS S3 并驗證此數(shù)據(jù)
df.write.format("org.apache.hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)
使用 Scala 運行以下命令以驗證是否從 S3 存儲桶中正確讀取數(shù)據(jù)。
spark.read.format("hudi").load(basePath).createOrReplaceTempView("dta") spark.sql("select _hoodie_commit_time, uuid, rider, driver, fare,ts, partitionpath from dta order by uuid").show()
在 vertica 中創(chuàng)建一個外部表,其中包含來自 S3 上 Hudi 表的數(shù)據(jù)。 我們創(chuàng)建了“旅行”表。
CREATE EXTERNAL TABLE Trips ( _hoodie_commit_time TimestampTz, uuid varchar, rider varchar, driver varchar, fare int, ts varchar, partitionpath varchar ) AS COPY FROM 's3a://apachehudi/parquet/vertica/*/*.parquet' PARQUET;
運行以下命令以驗證正在讀取外部表:
以下部分包含為查看 Vertica 中更改的數(shù)據(jù)而執(zhí)行的一些操作的示例。
在這個例子中,我們使用 Scala 在 Apache spark 中運行了以下命令并附加了一些數(shù)據(jù):
val df2 = Seq( ("fff","r6","d6",50,"India","20211005") ).toDF("uuid", "rider", "driver","fare","partitionpath","ts")
運行以下命令將此數(shù)據(jù)附加到 S3 上的 Hudi 表中:
df2.write.format("org.apache.hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath)
在這個例子中,我們更新了一條 Hudi 表的記錄。 需要導入數(shù)據(jù)以觸發(fā)并更新數(shù)據(jù):
val df3 = Seq( ("aaa","r1","d1",100,"US","20211001"), ("eee","r5","d5",500,"India","20211001") ).toDF("uuid", "rider", "driver","fare","partitionpath","ts")
運行以下命令將數(shù)據(jù)更新到 S3 上的 HUDI 表:
df3.write.format("org.apache.hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath)
以下是 spark.sql 的輸出:
以下是 Vertica 輸出:
執(zhí)行以下指向特定時間戳的 spark 命令:
val dd = spark.read .format("hudi") .option("as.of.instant", "20211007092600") .load(basePath)
使用以下命令將數(shù)據(jù)寫入 S3 中的 parquet:
dd.write.parquet("s3a://apachehudi/parquet/p2")
在此示例中,我們正在讀取截至“20211007092600”日期的 Hudi 表快照。
dd.show
通過在 parquet 文件上創(chuàng)建外部表從 Vertica 執(zhí)行命令。
到此,相信大家對“Vertica集成Apache Hudi重磅使用的方法”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續(xù)學習!
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。