溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Flink CDC怎么監(jiān)聽(tīng)MySQL表

發(fā)布時(shí)間:2021-12-04 10:04:51 來(lái)源:億速云 閱讀:313 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“Flink CDC怎么監(jiān)聽(tīng)MySQL表”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Flink CDC怎么監(jiān)聽(tīng)MySQL表”吧!

// 前景提要:開(kāi)啟mysql binlog監(jiān)控。(目錄:C:\ProgramData\MySQL\MySQL Server 5.6\my.ini)ProgramData 為隱藏目錄。注意:binlog_format=ROW
// 創(chuàng)建Blink Streaming的TableEnvironmentEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);// 創(chuàng)建表,connector使用mysql-cdcbsTableEnv.executeSql("CREATE TABLE mysql_binlog " +"(id STRING, " +"times STRING, " +"temp STRING) " +"WITH " +"('connector' = 'mysql-cdc', " +" 'hostname' = '127.0.0.1', " +" 'port' = '3306', " +" 'username' = 'root', " +" 'password' = '123456', " +" 'database-name' = 'test', " +" 'table-name' = 'sersor_temp'" +")");// 打印控制臺(tái)bsTableEnv.executeSql("CREATE TABLE sink_table " +"(id STRING, " +"times STRING, " +"temp DOUBLE) " +"WITH " +"('connector' = 'print'" +")");// 將CDC數(shù)據(jù)源和下游數(shù)據(jù)表對(duì)接起來(lái)bsTableEnv.executeSql("INSERT INTO sink_table SELECT id, times, temp FROM mysql_binlog");bsTableEnv.executeSql("CREATE TABLE sink_kafka_table " +"(id STRING, " +"times STRING, " +"temp DOUBLE " +") WITH (" +" 'connector' = 'kafka'," +" 'topic' = 'test_mysql_binlog'," +" 'scan.startup.mode' = 'earliest-offset'," +" 'properties.group.id' = 'testGroup'," +" 'properties.bootstrap.servers' = 'node2:9092', " +" 'format' = 'canal-json' " +")");// 將CDC數(shù)據(jù)與 kafka表對(duì)接起來(lái)bsTableEnv.executeSql("INSERT INTO sink_kafka_table SELECT id, times, temp FROM mysql_binlog");bsTableEnv.executeSql("CREATE TABLE hTable (" +" id STRING," +" f ROW<times STRING, temp STRING>," +" PRIMARY KEY (id) NOT ENFORCED" +") WITH (" +" 'connector' = 'hbase-2.2'," +" 'table-name' = 'regional:binlog'," +" 'zookeeper.quorum' = 'node2:2181'" +")");// 將CDC數(shù)據(jù)存儲(chǔ)到 Hbase中bsTableEnv.executeSql("INSERT INTO hTable SELECT id, ROW(times, temp) FROM mysql_binlog");

-- ----------------------------
-- Table structure for sersor_temp
-- ----------------------------
DROP TABLE IF EXISTS `sersor_temp`;
CREATE TABLE `sersor_temp`  (
  `id` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
  `temp` decimal(10, 2) NOT NULL,
  `times` varchar(10) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Records of sersor_temp
-- ----------------------------
INSERT INTO `sersor_temp` VALUES ('sensor_1', 22.20, '1547718527');
INSERT INTO `sersor_temp` VALUES ('sensor_2', 25.20, '1547718214');
INSERT INTO `sersor_temp` VALUES ('sensor_3', 46.40, '1547718520');
INSERT INTO `sersor_temp` VALUES ('sensor_5', 32.62, '1547718325');
 

注意:此處 表中 temp 字段為 decimal 類型,在SQL中使用  DECIMAL 、DOUBLE 類型 存儲(chǔ)到hbase中都會(huì)出現(xiàn)亂碼問(wèn)題,遂 都換成 STRING

感謝各位的閱讀,以上就是“Flink CDC怎么監(jiān)聽(tīng)MySQL表”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Flink CDC怎么監(jiān)聽(tīng)MySQL表這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI