溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何理解flink 1.11 中的JDBC Catalog

發(fā)布時間:2021-11-23 18:14:01 來源:億速云 閱讀:236 作者:柒染 欄目:大數(shù)據(jù)

今天就跟大家聊聊有關(guān)如何理解flink 1.11 中的JDBC Catalog,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

 

背景

1.11.0 之前,用戶如果依賴 Flink 的 source/sink 讀寫關(guān)系型數(shù)據(jù)庫或讀取 changelog 時,必須要手動創(chuàng)建對應(yīng)的 schema。但是這樣會有一個問題,當(dāng)數(shù)據(jù)庫中的 schema 發(fā)生變化時,也需要手動更新對應(yīng)的 Flink 任務(wù)以保持類型匹配,任何不匹配都會造成運(yùn)行時報錯使作業(yè)失敗。這個操作冗余且繁瑣,體驗(yàn)極差。

實(shí)際上對于任何和 Flink 連接的外部系統(tǒng)都可能有類似的上述問題,在 1.11.0 中重點(diǎn)解決了和關(guān)系型數(shù)據(jù)庫對接的這個問題。提供了 JDBC catalog 的基礎(chǔ)接口以及 Postgres catalog 的實(shí)現(xiàn),這樣方便后續(xù)實(shí)現(xiàn)與其它類型的關(guān)系型數(shù)據(jù)庫的對接。

1.11.0 版本后,用戶使用 Flink SQL 時可以自動獲取表的 schema 而不再需要輸入 DDL。除此之外,任何 schema 不匹配的錯誤都會在編譯階段提前進(jìn)行檢查報錯,避免了之前運(yùn)行時報錯造成的作業(yè)失敗。

 

示例

目前對于jdbc catalog,flink僅提供了postgres catalog,我們基于postgres的catalog講解一下如何使用flink的catalog ,

  • 引入pom
   <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.5</version>
        </dependency>

 
  • 新建PostgresCatalog    
    目前flink通過一個靜態(tài)類來創(chuàng)建相相應(yīng)的jdbc  catalog,對于PostgresCatalog,沒有提供public類型的構(gòu)造方法。

通過JdbcCatalogUtils.createCatalog構(gòu)造PostgresCatalog時這五個參數(shù)都是必填項(xiàng),其中baseUrl要求是不能帶有數(shù)據(jù)庫名的

  String catalogName = "mycatalog";
  String defaultDatabase = "postgres";
  String username = "postgres";
  String pwd = "postgres";
  String baseUrl = "jdbc:postgresql://localhost:5432/";

  PostgresCatalog postgresCatalog = (PostgresCatalog) JdbcCatalogUtils.createCatalog(
    catalogName,
    defaultDatabase,
    username,
    pwd,
    baseUrl);
 

訪問postgres 數(shù)據(jù)庫指定表名的時候完整的路徑名應(yīng)該是以下格式:

<catalog>.<db>.`<schema.table>`
 

其中schema默認(rèn)是public,如果是使用缺省值,public是可以省略的。比如下面的查詢語句:

SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;
 

如果非缺省schema,則不能被省略:

SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
 
  • 常見操作

我們PostgresCatalog將注冊到StreamTableEnvironment 的變量tEnv中,然后就可以用tEnv進(jìn)行一些操作了。

 tEnv.registerCatalog(postgresCatalog.getName(), postgresCatalog);
  tEnv.useCatalog(postgresCatalog.getName());
 
  1. 列出來所有的數(shù)據(jù)庫:
        System.out.println("list databases :");
  String[] databases = tEnv.listDatabases();
  Stream.of(databases).forEach(System.out::println);
 
  1. 列出來所有的table
     tEnv.useDatabase(defaultDatabase);
  System.out.println("list tables :");
  String[] tables = tEnv.listTables(); // 也可以使用  postgresCatalog.listTables(defaultDatabase);
  Stream.of(tables).forEach(System.out::println);
 
  1. 列出所有函數(shù)
        System.out.println("list functions :");
  String[] functions = tEnv.listFunctions();
  Stream.of(functions).forEach(System.out::println);
 
  1. 獲取table的schema
 CatalogBaseTable catalogBaseTable = postgresCatalog.getTable(new ObjectPath(
    defaultDatabase,
    "table1"));

  TableSchema tableSchema = catalogBaseTable.getSchema();
  System.out.println("tableSchema --------------------- :");
  System.out.println(tableSchema);
 
  1. 查詢表的數(shù)據(jù)
  List<Row> results = Lists.newArrayList(tEnv.sqlQuery("select * from table1")
                                             .execute()
                                             .collect());
  results.stream().forEach(System.out::println);
 
  1. 插入數(shù)據(jù)
tEnv.executeSql("insert into table1 values (3,'c')");
 

完整的代碼請參考:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/catalog/PostgresCatalogTest.java

 

源碼解析

 

AbstractJdbcCatalog

這個類主要是對jdbc catalog一些公共的操作做了抽象.目前實(shí)現(xiàn)了實(shí)際功能的只有一個方法:getPrimaryKey,其他方式主要是對于Catalog的一些其他實(shí)現(xiàn)類做了特殊處理,比如類似create table 或者 alter table是不支持的,listView只是返回一個空列表,因?yàn)槲覀兪褂胘dbc catalog主要是來做一些DML操作。

 @Override
 public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
  throw new UnsupportedOperationException();
 }

 @Override
 public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
  return Collections.emptyList();
 }
   

PostgresCatalog

在這里面,主要是實(shí)現(xiàn)了一些常用的操作數(shù)據(jù)庫的方法,比如getTable、listTables、listDatabases等等,其實(shí)簡單的來說就是從postgres元數(shù)據(jù)庫里查詢出來相應(yīng)的信息,然后組裝成flink的相關(guān)對象,返回給調(diào)用方。以一個簡單的方法listDatabases為例:

從元數(shù)據(jù)表pg_database中查詢所有的tablename,然后去掉內(nèi)置的數(shù)據(jù)庫,也就是template0和template1,然后封裝到一個list對象里,返回。

 @Override
 public List<String> listDatabases() throws CatalogException {
  List<String> pgDatabases = new ArrayList<>();

  try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {

   PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");

   ResultSet rs = ps.executeQuery();

   while (rs.next()) {
    String dbName = rs.getString(1);
    if (!builtinDatabases.contains(dbName)) {
     pgDatabases.add(rs.getString(1));
    }
   }

   return pgDatabases;
  } catch (Exception e) {
   throw new CatalogException(
    String.format("Failed listing database in catalog %s", getName()), e);
  }
 }
 

有不兼容的地方需要做一些轉(zhuǎn)換,比如getTable方法,有些數(shù)據(jù)類型是不匹配的,要做一些類型的匹配,如postgres里面的serial和int4都會轉(zhuǎn)成flink的int類型,具體的參考下PostgresCatalog#fromJDBCType方法。

 private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
  String pgType = metadata.getColumnTypeName(colIndex);

  int precision = metadata.getPrecision(colIndex);
  int scale = metadata.getScale(colIndex);

  switch (pgType) {
   case PG_BOOLEAN:
    return DataTypes.BOOLEAN();
   case PG_BOOLEAN_ARRAY:
    return DataTypes.ARRAY(DataTypes.BOOLEAN());
   case PG_BYTEA:
    return DataTypes.BYTES();
    .........................


看完上述內(nèi)容,你們對如何理解flink 1.11 中的JDBC Catalog有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI