溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

flink mysql數(shù)據(jù)接入的方法

發(fā)布時間:2021-06-24 10:42:35 來源:億速云 閱讀:749 作者:chen 欄目:大數(shù)據(jù)

這篇文章主要講解了“flink mysql數(shù)據(jù)接入的方法”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“flink mysql數(shù)據(jù)接入的方法”吧!

一.api方式接入

1.添加依賴

<dependency>        

            <groupId>com.alibaba.ververica</groupId>        

            <artifactId>flink-connector-mysql-cdc</artifactId>        

            <version>          1.1          .          0          </version>        

</dependency>        

2.API代碼

public                     static                     void                     main(String[] args)           throws                     Exception {        

              SourceFunction<String> sourceFunction = MySQLSource.<String>builder()        

                .hostname(          "localhost"          )        

                .port(          3306          )        

                .databaseList(          "test"          )        

                .tableList(          "test"          )        

                .deserializer(          new                     StringDebeziumDeserializationSchema())        

                .build();        

 

              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        

              env.addSource(sourceFunction)        

                .print().setParallelism(          1          );        

              env.execute();        

            }        

二.sql方式接入

1.添加jar包至lib下

flink-sql-connector-mysql-cdc_1.1.0.jar

2.mysql中創(chuàng)建表

create                     table                     test(        

id           INT          ,        

name                     varchar          (100),        

description           varchar          (100),        

weight           DECIMAL          (10,3)        

)        

3.創(chuàng)建元數(shù)據(jù)

CREATE                     TABLE                     mysql_test (        

           id           INT                     NOT                     NULL          ,        

           name                     STRING,        

           description STRING,        

           weight           DECIMAL          (10,3)        

         WITH                     (        

           'connector'                               'mysql-cdc'          ,        

           'hostname'                               'localhost'          ,        

           'port'                               '3306'          ,        

           'username'          =          'root'          ,        

           'password'          =          'root'          ,        

           'database-name'                               'test'          ,        

           'table-name'                               'test'        

);        

4.使用查詢sql

SELECT                     id,           UPPER          (          name          ), description, weight           FROM                     mysql_test;        

5.增加和刪除表字段測試

增加不影響

刪除表字段后,會出錯

[ERROR] Could not execute SQL statement. Reason:
org.apache.kafka.connect.errors.DataException: name is not a valid field name

注:mysql的版本如果是8.0,flink端鏈接會出錯

com.github.shyiko.mysql.binlog.network.AuthenticationException: Client does not support authentication protocol requested by server; consider upgrading MySQL client 

出現(xiàn)上述問題的原因是:mysql8 之前的版本中加密規(guī)則是mysql_native_password,而在mysql8之后,加密規(guī)則是caching_sha2_password 把mysql用戶登錄密碼加密規(guī)則還原成mysql_native_password

解決方案:在mysql中執(zhí)行以下命令

alter user 'root'@'%' identified with mysql_native_password by 'root';  修改認(rèn)證規(guī)則

flush privileges;  刷新權(quán)限

感謝各位的閱讀,以上就是“flink mysql數(shù)據(jù)接入的方法”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對flink mysql數(shù)據(jù)接入的方法這一問題有了更深刻的體會,具體使用情況還需要大家實(shí)踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI