溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

mac環(huán)境canal+mysql+kafka的安裝及使用

發(fā)布時(shí)間:2020-08-05 05:12:00 來(lái)源:ITPUB博客 閱讀:186 作者:yleizzz 欄目:建站服務(wù)器

為了實(shí)現(xiàn) 實(shí)時(shí)同步數(shù)據(jù),在mac環(huán)境搭建了canal,mysql,kafka的一套流程

使用canal加mysql加kafka的方式傳遞數(shù)據(jù)

mysql 數(shù)據(jù)源頭

canal模仿slave沖mysql取數(shù)據(jù)。。是一個(gè)管道

kafka 將canal獲取的數(shù)據(jù)放入kafka 。然后消費(fèi)(程序獲取kafka的隊(duì)列。消費(fèi)數(shù)據(jù))

  1. mysql安裝

    這個(gè)就不詳述了。。不要安裝最新版的mysql。本人親測(cè)8.0版本和canal不一定能很好的兼容

    所以。。安裝了mysql5.7

    安裝命令(這樣安裝 安裝的是8.0版本)

    brew install mysql

   2.java安裝

  不管是canal還是kafka都是需要java環(huán)境的。。

   也不能用最新版的java,推薦使用java8也就是jdk8

   最好是從官網(wǎng)oracle下載

安裝方法:

https://blog.csdn.net/oumuv/article/details/84064169

3.canal安裝

在裝calal之前 確保mysql 及 java8 環(huán)境安裝好了

下載地址: https://github.com/alibaba/canal/releases

下載 canal.deployer-1.1.4.tar.gz

在家目錄下面新建一個(gè)canal目錄.直接解壓就行

官方文件已經(jīng)編譯好了,不需再編譯

需要添加賬戶(canal@%及canal@local)

在canal/bin目錄下有幾個(gè)腳本文件,startup.sh 啟動(dòng)服務(wù)用的,stop.sh 停止服務(wù)用的

在canal/logs目錄下放的是日志文件。

在canal/conf目錄下放的是配置文件。

實(shí)例配置文件

canal/conf/example/instance.properties

實(shí)例的配置文件。。決定了 你連接哪個(gè)實(shí)例的數(shù)據(jù)庫(kù) 可以精確到表

instance.properties中比較重要的參數(shù)

#實(shí)例

canal.instance.master.address = 127.0.0.1:3306

#db

canal.instance.dbUsername = canal

canal.instance.dbPassword = canal

canal.instance.defaultDatabaseName =test

canal.instance.connectionCharset = UTF-8

#table

# table regex

canal.instance.filter.regex = test.ttt

#mq topic

canal.mq.topic=test12345

上面這樣配置 就代表了。。通過(guò)canal連接127.0.0.1中的test庫(kù)的ttt表,放到一個(gè)叫做test12345的topic里面

驗(yàn)證canal是否和mysql連接 只需要在mysql的進(jìn)程里面查看是否有一個(gè)復(fù)制連接(因?yàn)閏anal就是模仿了一個(gè)slave)

mac環(huán)境canal+mysql+kafka的安裝及使用

還有一個(gè)全局的文件

表示了需要連接的kafka zookeeper等

canal/conf/canal.properties

列舉canal連接kafka的重要參數(shù)

canal.id= 1#如果有多個(gè)canal 這個(gè)值和集群中的canal不能沖突

canal.ip=172.17.61.113#canal的ip

canal.port= 11111

canal.zkServers=172.17.61.113:2181#zookeeper的ip:port

canal.serverMode = kafka

canal.destinations = example

#mq

canal.mq.servers = 172.17.61.113:9092#kafka的ip:port

上面有些關(guān)于kafka的參數(shù) 是 需要kafka安裝好之后才能陪得(因?yàn)橐呀?jīng)裝好了 所以先列舉出來(lái))

4.kafka安裝

kafka安裝時(shí)需要zookeeper的。。

但是一般zookeeper會(huì)集成在kafka里面 所以不需要特別單獨(dú)安裝(也可以單獨(dú)安裝)

mac環(huán)境安裝命令如下

brew install kafka

一直等安裝完成就行

運(yùn)行kafka是需要依賴于zookeeper的,所以安裝kafka的時(shí)候也會(huì)包含zookeeper。

kafka的安裝目錄:/usr/local/Cellar/kafka/2.0.0/bin

kafka的配置文件目錄:/usr/local/Cellar/kafka/2.0.0/bin

kafka服務(wù)的配置文件:/usr/local/etc/kafka/server.properties

zookeeper配置文件: /usr/local/etc/kafka/zookeeper.properties 

# server.properties中的重要配置 需要修改的參數(shù)
  1. listeners=PLAINTEXT://172.17.61.113:9092

  2. advertised.listeners=PLAINTEXT://172.17.61.113:9092

zookeeper 配置文件我在安裝的時(shí)候 沒(méi)有做任何修改

然后 通過(guò)上面配置canal.properties 就連接上kafka

kafka的基本命令(可以通過(guò)find / -name zookeeper-server-start 查找具體的位置)

首先,啟動(dòng)zookeeper:

zookeeper-server-start /usr/ local /etc/kafka/zookeeper.properties

然后,啟動(dòng)kafka

kafka- server -start /usr/local/etc/kafka/ server .properties

創(chuàng)建一個(gè)“使用單個(gè)分區(qū)”、“只有一個(gè)副本”、名為“test”的主題的topic


kafka-topics 
--create 
--zookeeper 
localhost
:2181 
--replication-factor 1 
--partitions 1 
--topic 
test
(創(chuàng)建topic,這個(gè)topic。我們?cè)赾anal中已經(jīng)有了配置)
使用如下命令創(chuàng)建topic
kafka-topics --create --zookeeper localhost:2181/kafka --replication-factor 1
--partitions 1 --topic    topic1 查看創(chuàng)建的topic,運(yùn)行l(wèi)ist topic命令:
kafka-topics --list --zookeeper 172.17.61.113:2181 生產(chǎn)消息
kafka-console-producer --broker-list 172.17.61.113:9092 --topic test
(消息數(shù)據(jù)由canal自動(dòng)發(fā)送。所以這里的命令我們只有明白就行)

消費(fèi)消息(這個(gè)地方就可以最后來(lái)查看mysql+canal+kafka是否已經(jīng)聯(lián)動(dòng)起來(lái)了)
kafka-console-consumer --bootstrap-server 172.17.61.113:9092 --topic test12345 --from-beginning

我在mysql里面的test庫(kù)的ttt表里加了一條記錄
然后反應(yīng)在kafka就是如下結(jié)果

{"data":[{"id":"13","var":"ded"}],"database":"test","es":1575448571000,"id":8,"isDdl":false,"mysqlType":{"id":"int(11)","var":"varchar(5)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"var":12},"table":"ttt","ts":1575448571758,"type":"INSERT"}

這就表示整個(gè)mysql+canal+kafka已經(jīng)成功了

接下來(lái)只要等著程序那邊去消費(fèi)這個(gè)隊(duì)列信息就好了

##############

有可能的報(bào)錯(cuò)

服務(wù)端:com.alibaba.otter.canal.parse.exception.CanalParseException: can't find start position for example
是由于你改了配置文件,導(dǎo)致meta.dat 中保存的位點(diǎn)信息和數(shù)據(jù)庫(kù)的位點(diǎn)信息不一致;導(dǎo)致canal抓取不到數(shù)據(jù)庫(kù)的動(dòng)作;
解決方法:刪除meta.dat刪除,再重啟canal,問(wèn)題解決;

詳見(jiàn)

https://www.cnblogs.com/shaozhiqi/p/11534658.html

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI