您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關(guān)如何理解flink 1.11 中的JDBC Catalog,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
1.11.0 之前,用戶如果依賴 Flink 的 source/sink 讀寫關(guān)系型數(shù)據(jù)庫或讀取 changelog 時,必須要手動創(chuàng)建對應(yīng)的 schema。但是這樣會有一個問題,當(dāng)數(shù)據(jù)庫中的 schema 發(fā)生變化時,也需要手動更新對應(yīng)的 Flink 任務(wù)以保持類型匹配,任何不匹配都會造成運(yùn)行時報錯使作業(yè)失敗。這個操作冗余且繁瑣,體驗(yàn)極差。
實(shí)際上對于任何和 Flink 連接的外部系統(tǒng)都可能有類似的上述問題,在 1.11.0 中重點(diǎn)解決了和關(guān)系型數(shù)據(jù)庫對接的這個問題。提供了 JDBC catalog 的基礎(chǔ)接口以及 Postgres catalog 的實(shí)現(xiàn),這樣方便后續(xù)實(shí)現(xiàn)與其它類型的關(guān)系型數(shù)據(jù)庫的對接。
1.11.0 版本后,用戶使用 Flink SQL 時可以自動獲取表的 schema 而不再需要輸入 DDL。除此之外,任何 schema 不匹配的錯誤都會在編譯階段提前進(jìn)行檢查報錯,避免了之前運(yùn)行時報錯造成的作業(yè)失敗。
目前對于jdbc catalog,flink僅提供了postgres catalog,我們基于postgres的catalog講解一下如何使用flink的catalog ,
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.5</version>
</dependency>
通過JdbcCatalogUtils.createCatalog構(gòu)造PostgresCatalog時這五個參數(shù)都是必填項(xiàng),其中baseUrl要求是不能帶有數(shù)據(jù)庫名的
String catalogName = "mycatalog";
String defaultDatabase = "postgres";
String username = "postgres";
String pwd = "postgres";
String baseUrl = "jdbc:postgresql://localhost:5432/";
PostgresCatalog postgresCatalog = (PostgresCatalog) JdbcCatalogUtils.createCatalog(
catalogName,
defaultDatabase,
username,
pwd,
baseUrl);
訪問postgres 數(shù)據(jù)庫指定表名的時候完整的路徑名應(yīng)該是以下格式:
<catalog>.<db>.`<schema.table>`
其中schema默認(rèn)是public,如果是使用缺省值,public是可以省略的。比如下面的查詢語句:
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;
如果非缺省schema,則不能被省略:
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
我們PostgresCatalog將注冊到StreamTableEnvironment 的變量tEnv中,然后就可以用tEnv進(jìn)行一些操作了。
tEnv.registerCatalog(postgresCatalog.getName(), postgresCatalog);
tEnv.useCatalog(postgresCatalog.getName());
System.out.println("list databases :");
String[] databases = tEnv.listDatabases();
Stream.of(databases).forEach(System.out::println);
tEnv.useDatabase(defaultDatabase);
System.out.println("list tables :");
String[] tables = tEnv.listTables(); // 也可以使用 postgresCatalog.listTables(defaultDatabase);
Stream.of(tables).forEach(System.out::println);
System.out.println("list functions :");
String[] functions = tEnv.listFunctions();
Stream.of(functions).forEach(System.out::println);
CatalogBaseTable catalogBaseTable = postgresCatalog.getTable(new ObjectPath(
defaultDatabase,
"table1"));
TableSchema tableSchema = catalogBaseTable.getSchema();
System.out.println("tableSchema --------------------- :");
System.out.println(tableSchema);
List<Row> results = Lists.newArrayList(tEnv.sqlQuery("select * from table1")
.execute()
.collect());
results.stream().forEach(System.out::println);
tEnv.executeSql("insert into table1 values (3,'c')");
完整的代碼請參考:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/catalog/PostgresCatalogTest.java
這個類主要是對jdbc catalog一些公共的操作做了抽象.目前實(shí)現(xiàn)了實(shí)際功能的只有一個方法:getPrimaryKey,其他方式主要是對于Catalog的一些其他實(shí)現(xiàn)類做了特殊處理,比如類似create table 或者 alter table是不支持的,listView只是返回一個空列表,因?yàn)槲覀兪褂胘dbc catalog主要是來做一些DML操作。
@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
return Collections.emptyList();
}
在這里面,主要是實(shí)現(xiàn)了一些常用的操作數(shù)據(jù)庫的方法,比如getTable、listTables、listDatabases等等,其實(shí)簡單的來說就是從postgres元數(shù)據(jù)庫里查詢出來相應(yīng)的信息,然后組裝成flink的相關(guān)對象,返回給調(diào)用方。以一個簡單的方法listDatabases為例:
從元數(shù)據(jù)表pg_database中查詢所有的tablename,然后去掉內(nèi)置的數(shù)據(jù)庫,也就是template0和template1,然后封裝到一個list對象里,返回。
@Override
public List<String> listDatabases() throws CatalogException {
List<String> pgDatabases = new ArrayList<>();
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String dbName = rs.getString(1);
if (!builtinDatabases.contains(dbName)) {
pgDatabases.add(rs.getString(1));
}
}
return pgDatabases;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", getName()), e);
}
}
有不兼容的地方需要做一些轉(zhuǎn)換,比如getTable方法,有些數(shù)據(jù)類型是不匹配的,要做一些類型的匹配,如postgres里面的serial和int4都會轉(zhuǎn)成flink的int類型,具體的參考下PostgresCatalog#fromJDBCType方法。
private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
String pgType = metadata.getColumnTypeName(colIndex);
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
switch (pgType) {
case PG_BOOLEAN:
return DataTypes.BOOLEAN();
case PG_BOOLEAN_ARRAY:
return DataTypes.ARRAY(DataTypes.BOOLEAN());
case PG_BYTEA:
return DataTypes.BYTES();
.........................
看完上述內(nèi)容,你們對如何理解flink 1.11 中的JDBC Catalog有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。