溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Flink與數(shù)據(jù)庫集成方法是什么

發(fā)布時(shí)間:2021-12-22 13:34:46 來源:億速云 閱讀:180 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要介紹“Flink與數(shù)據(jù)庫集成方法是什么”,在日常操作中,相信很多人在Flink與數(shù)據(jù)庫集成方法是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Flink與數(shù)據(jù)庫集成方法是什么”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

JDBC-Connector 的重構(gòu)

JDBC Connector 在 Flink 1.11 版本發(fā)生了比較大的變化,我們先從以下幾個(gè) Feature 來具體了解一下 Flink 社區(qū)在這個(gè)版本上對 JDBC 所做的改進(jìn)。

  • FLINK-15782 :Rework JDBC Sinks[1] (重寫 JDBC Sink)


這個(gè) issue 主要為 DataStream API 新增了 JdbcSink,對于使用 DataStream 編程的用戶會更加方便地把數(shù)據(jù)寫入到 JDBC;并且規(guī)范了一些命名規(guī)則,以前命名使用的是 JDBC 加上連接器名稱,目前命名規(guī)范為 Jdbc+ 連接器名稱

  • FLINK-17537:Refactor flink-jdbc connector structure[2] (重構(gòu) flink-jdbc 連接器的結(jié)構(gòu))


這個(gè) issue 將 flink-jdbc 包名重命名為 flink-connector-jdbc,與 Flink 的其他 connector 統(tǒng)一,將所有接口和類從 org.apache.flink.java.io.jdbc(舊包)規(guī)范為新包路徑 org.apache.flink.connector.jdbc(新包),通過這種重命名用戶在對底層源代碼的閱讀上面會更加容易的理解和統(tǒng)一。

  • FLIP-95: New TableSource and TableSink interfaces[3] (新的 TableSource 和 TableSink 接口)


由于早期數(shù)據(jù)類型系統(tǒng)并不是很完善,導(dǎo)致了比較多的 Connector 在使用上會經(jīng)常報(bào)數(shù)據(jù)類型相關(guān)的異常,例如 DECIMAL 精度類型,在以往的 Flink 1.10 版本中有可能出現(xiàn)下圖問題:

Flink與數(shù)據(jù)庫集成方法是什么


基于 FLIP-95 新的 TableSource 和 TableSink 在精度支持方面做了重構(gòu),目前數(shù)據(jù)精度方面的支持已經(jīng)很完善了。

  • FLIP-122:New Connector Property Keys for New Factory[4](新的連接器參數(shù))


在 Flink 1.11 版本中,我們對 DDL 的 WITH 參數(shù)相對于 1.10 版本做了簡化,從用戶視角看上就是簡化和規(guī)范了參數(shù),如表格所示:

Old Key (Flink 1.10)    
New Key (Flink 1.11)    
connector.type    
connector.type    
connector.url    
url    
connector.table    
table-name    
connector.driver    
driver    
connector.username    
username    
connector.password    
password    
connector.read.partition.column    
scan.partition.column    
connector.read.partition.num    
scan.partition.num    
connector.read.partition.lower-bound    
scan.partition.lower-bound    
connector.read.partition.upper-bound    
scan.partition.upper-bound    
connector.read.fetch-size    
scan.fetch-size    
connector.lookup.cache.max-rows    
lookup.cache.max-rows    
connector.lookup.cache.ttl    
lookup.cache.ttl    
connector.lookup.max-retries    
lookup.max-retries    
connector.write.flush.max-rows    
sink.buffer-flush.max-rows    
connector.write.flush.interval    
sink.buffer-flush.interval    
connector.write.max-retries    
sink.max-retries    

大家可以看到表格中有 3 個(gè)標(biāo)紅的地方,這個(gè)是相對于 1.10 有發(fā)生變化比較多的地方。這次 FLIP 希望進(jìn)一步簡化連接器屬性,以便使屬性更加簡潔和可讀,并更好地與 FLIP-107 協(xié)作。如果需要了解更多的 Connector 參數(shù)可以進(jìn)一步參考官方文檔和 FLIP-122 中提到的改變,這樣有助于從舊版本遷移到新版本并了解參數(shù)的變化。

  • FLIP-87:Primary key Constraints in Table API[5] (Table API 接口中的主鍵約束問題)


Flink 1.10 存在某些 Query 無法推斷出主鍵導(dǎo)致無法進(jìn)行 Upsert 更新操作(如下圖所示錯(cuò)誤)。所以在 FLIP-87 中為 Flink SQL 引入的 Primary Key 約束。Flink 的主鍵約束遵循 SQL 標(biāo)準(zhǔn),主鍵約束分為 PRIMARY KEY NOT ENFORCED 和 PRIMARY KEY ENFORCED, ENFORCED 表示是否對數(shù)據(jù)進(jìn)行校驗(yàn)。我們常見數(shù)據(jù)庫的主鍵約束屬于 PRIMARY KEY ENFORCED,會對數(shù)據(jù)進(jìn)行校驗(yàn)。因?yàn)?Flink 并不持有數(shù)據(jù),因此 Flink 支持的主鍵模式是 PRIMARY KEY NOT ENFORCED,  這意味著 Flink 不會校驗(yàn)數(shù)據(jù),而是由用戶確保主鍵的完整性。例如 HBase 里面對應(yīng)的主鍵應(yīng)該是 RowKey,在 MySQL 中對應(yīng)的主鍵是在用戶數(shù)據(jù)庫的表中對應(yīng)的主鍵。

Flink與數(shù)據(jù)庫集成方法是什么


JDBC Catalog


目前 Flink 支持 Catalog 主要有 JDBC Catalog 和 Hive Catalog 。在關(guān)系數(shù)據(jù)庫中的表,如果要在 Flink 中使用,用戶需要手動寫表的 DDL,一旦表的 Schema 發(fā)生改變,用戶需要手動修改, 這是比較繁瑣的事情。JDBC Catalog 提供了接口用于連接到各種關(guān)系型數(shù)據(jù)庫,使得 Flink 能夠自動檢索表,不用用戶手動輸入和修改。目前 JDBC Catalog 內(nèi)置目前實(shí)現(xiàn)了 Postgres Catalog。Postgres catalog 是一個(gè) read-only (只讀)的 Catalog,只支持讀取 Postgres 表,支持的功能比較有限。下面代碼展示了目前 Postgres catalog 支持的 6 個(gè)功能:數(shù)據(jù)庫是否存在、數(shù)據(jù)庫列表、獲取數(shù)據(jù)庫、根據(jù)數(shù)據(jù)庫名獲取表列表、獲得表、表是否存在。
// The supported methods by Postgres Catalog.PostgresCatalog.databaseExists(String databaseName)PostgresCatalog.listDatabases()PostgresCatalog.getDatabase(String databaseName)PostgresCatalog.listTables(String databaseName)PostgresCatalog.getTable(ObjectPath tablePath)PostgresCatalog.tableExists(ObjectPath tablePath)

如果需要支持其他 DB (如 MySQL),需要用戶根據(jù) FLIP-93 的 JdbcCatalog 接口實(shí)現(xiàn)對應(yīng)不同的 JDBC Catalog。

JDBC Dialect


什么是 Dialect?

Dialect (方言)對各個(gè)數(shù)據(jù)庫來說,Dialect 體現(xiàn)各個(gè)數(shù)據(jù)庫的特性,比如語法、數(shù)據(jù)類型等。如果需要查看詳細(xì)的差異,可以點(diǎn)擊這里[6]查看詳細(xì)差異。下面通過對比 MySQL 和 Postgres 的一些常見場景舉例:

Dialect    
MySQL    
Postgres    
場景描述    
Grammar(語法)    
LIMIT 0,30    
WITH LIMIT 30 OFFSET 0    
分頁    
Data Type (數(shù)據(jù)類型)    
BINARY    
BYTEA,ARRAY    
字段類型    
Command (命令)    
show tables    
\dt    
查看所有表    

在數(shù)據(jù)類型上面,F(xiàn)link SQL 的數(shù)據(jù)類型目前映射規(guī)則如下:

MySQL type    
PostgreSQL type    
Flink SQL type    
TINYINT    

TINYINT    
SMALLINT    
TINYINT UNSIGNED    
SMALLINT    
INT2    
SMALLSERIAL    
SERIAL2    
SMALLINT    
INT    
MEDIUMINT    
SMALLINT    
UNSIGNED    
INTEGER    
SERIAL    
INT    
BIGINT    
INT    
UNSIGNED    
BIGINT    
BIGSERIAL    
BIGINT    
BIGINT    
UNSIGNED    

DECIMAL(20, 0)    

Flink 目前支持三種 Dialect: Derby、MySQL、PostgreSQL,Derby 主要用于測試,更多的類型映射可以點(diǎn)擊下方鏈接前往官方文檔查看。

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#data-type-mapping
如何保證 Dialect Upsert 的冪等性?

如果定義了主鍵,JDBC 寫入時(shí)是能夠保證 Upsert 語義的, 如果 DB 不支持 Upsert 語法,則會退化成 DELETE + INSERT 語義。Upsert query 是原子執(zhí)行的,可以保證冪等性。這個(gè)在官方文檔中也詳細(xì)描述了更新失敗或者存在故障時(shí)候如何做出的處理,下面的表格是不同的 DB 對應(yīng)不同的 Upsert 語法:

Database    
Upsert Grammar    
MySQL    
INSERT .. ON DUPLICATE KEY UPDATE ..    
PostgreSQL    
INSERT .. ON CONFLICT .. DO UPDATE SET ..    

如何自定義 Dialect?

目前如果要實(shí)現(xiàn)自定義 Dialect (比如 SQL Server、Oracle 等), 需要用戶自己實(shí)現(xiàn)對應(yīng) Dialect 修改源碼并重新打包 flink-connector-jdbc。社區(qū)正在討論提供一種插件化 dialect 的機(jī)制, 讓用戶可以不用修改源碼打包就能實(shí)現(xiàn)自定義 Dialect,這個(gè)機(jī)制需要把 Dialect 接口暴露給用戶。目前的 Dialect 接口不夠清晰,沒有考慮 DataStream API 的使用場景,也沒有考慮到 一些復(fù)雜的 SQL 場景,所以這個(gè)接口目前不太穩(wěn)定(后續(xù)版本會修改) 。

社區(qū)目前之所以沒有把這個(gè) API 開放給用戶,是從用戶使用的體驗(yàn)角度考慮,希望把這種頂級 API 設(shè)計(jì)得盡量穩(wěn)定、簡潔后再開放出來給用戶使用,避免用戶在后續(xù) Flink 版本的迭代中多次修改代碼。目前社區(qū)已經(jīng)有相應(yīng)的計(jì)劃去做了,大家可以留意 FLINK-16833[7]  提出的 JDBCDialect 插件化設(shè)計(jì)。

實(shí)踐 Demo


大家看完上述 Flink 1.11 在 JDBC 所做的改動后,大家可以嘗試下面這個(gè)關(guān)于商品表 CDC 同步和 ETL 的小案例,有助于理解 JDBC Catalog 和 CDC 的同步機(jī)制。

環(huán)境與版本:Flink 1.11.1、Docker、Kafka 1.11.1、MySQL Driver 5.1.48、PostgreSQL Driver 42.2.14

流程如下:

  1. Flink standalone 環(huán)境準(zhǔn)備并在提供的地址下載好對應(yīng)的安裝包和 connector jar。
  2. 測試數(shù)據(jù)準(zhǔn)備,通過拉起容器運(yùn)行已經(jīng)打包好的鏡像。其中 Kafka 中的 changelog 數(shù)據(jù)是通過 debezium connector 抓取的 MySQL orders表 的 binlog。
  3. 通過 SQL Client 編寫 SQL 作業(yè),分別創(chuàng)建 Flink 訂單表,維表,用戶表,產(chǎn)品表,并創(chuàng)建 Function UDF。從 PG Catalog 獲取結(jié)果表信息之后,把作業(yè)提交至集群執(zhí)行運(yùn)行。
  4. 測試 CDC 數(shù)據(jù)同步和維表 join,通過新增訂單、修改訂單、刪除訂單、維表數(shù)據(jù)更新等一系列操作驗(yàn)證 CDC 在 Flink 上如何運(yùn)行以及寫入結(jié)果表。

Flink與數(shù)據(jù)庫集成方法是什么


上圖為業(yè)務(wù)流程整體圖,項(xiàng)目 Demo 地址:

https://github.com/leonardBang/flink-sql-etl

問答環(huán)節(jié)


1.Flink SQL Client 上面執(zhí)行的 use default,是使用哪個(gè) catlog 呢?

答:Flink 內(nèi)部有一個(gè)內(nèi)置 Catlog,它把 meta 數(shù)據(jù)存于內(nèi)存中。在 SQL Client 上沒有顯式指定 Hive catlog 或者 jdbc catlog 時(shí)會使用內(nèi)置的 Catalog,剛剛的案例給大家演示的是 Postgres Catalog,里面有結(jié)果表。在內(nèi)置 Catlog 可以看到我們剛剛創(chuàng)建 Kafka 的表,MySQL 的維度表。

2.Flink MySQL DDL 連接 8 小時(shí)后就會自動斷開的問題是否已經(jīng)解決?

這個(gè)問題會在 1.12 版本解決此問題,目前 master 分支已經(jīng)合并,具體可以參考以下地址  ,描述了相關(guān)問題的討論和解決辦法。

3.什么是 CDC?能大概講下目前 Flink 支持的 CDC 嗎?

通過 Change Data Capture 機(jī)制(CDC)來將外部系統(tǒng)的動態(tài)數(shù)據(jù)(如 Mysql BinLog、Kafka Compacted Topic)導(dǎo)入 Flink,以及將 Flink 的 Update/Retract 流寫出到外部系統(tǒng)中是用戶一直希望的功能。

Flink 1.11 實(shí)現(xiàn)了對 CDC 數(shù)據(jù)讀取和寫出的支持。目前 Flink 可以支持 Debezium(Demo 中所用的工具) 和 Canal(阿里巴巴開源同步工具) 兩種 CDC 格式。Debezium 在國外用得比較多,Canal 在國內(nèi)用得比較多,兩者格式會有所區(qū)別,詳細(xì)可以參考官方使用文檔。

到此,關(guān)于“Flink與數(shù)據(jù)庫集成方法是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI