您好,登錄后才能下訂單哦!
這篇文章主要介紹了基于Docker與Canal怎么實(shí)現(xiàn)MySQL實(shí)時(shí)增量數(shù)據(jù)傳輸功能的相關(guān)知識(shí),內(nèi)容詳細(xì)易懂,操作簡單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇基于Docker與Canal怎么實(shí)現(xiàn)MySQL實(shí)時(shí)增量數(shù)據(jù)傳輸功能文章都會(huì)有所收獲,下面我們一起來看看吧。
canal的介紹
canal的歷史由來
在早期的時(shí)候,阿里巴巴公司因?yàn)楹贾莺兔绹鴥蓚€(gè)地方的機(jī)房都部署了數(shù)據(jù)庫實(shí)例,但因?yàn)榭鐧C(jī)房同步數(shù)據(jù)的業(yè)務(wù)需求 ,便孕育而生出了canal,主要是基于trigger(觸發(fā)器)的方式獲取增量變更。從2010年開始,阿里巴巴公司開始逐步嘗試數(shù)據(jù)庫日志解析,獲取增量變更的數(shù)據(jù)進(jìn)行同步,由此衍生出了增量訂閱和消費(fèi)業(yè)務(wù)。
當(dāng)前的canal支持的數(shù)據(jù)源端mysql版本包括:5.1.x 、5.5.x 、5.6.x、5.7.x、8.0.x。
canal的應(yīng)用場景
目前普遍基于日志增量訂閱和消費(fèi)的業(yè)務(wù),主要包括:
基于數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)
數(shù)據(jù)庫鏡像 數(shù)據(jù)庫實(shí)時(shí)備份
索引構(gòu)建和實(shí)時(shí)維護(hù)(拆分異構(gòu)索引、倒排索引等)
業(yè)務(wù)cache刷新
帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理
canal的工作原理
在介紹canal的原理之前,我們先來了解下mysql主從復(fù)制的原理。
mysql主從復(fù)制原理
mysql master將數(shù)據(jù)變更的操作寫入二進(jìn)制日志binary log中, 其中記錄的內(nèi)容叫做二進(jìn)制日志事件binary log events,可以通過show binlog events命令進(jìn)行查看
mysql slave會(huì)將master的binary log中的binary log events拷貝到它的中繼日志relay log
mysql slave重讀并執(zhí)行relay log中的事件,將數(shù)據(jù)變更映射到它自己的數(shù)據(jù)庫表中
了解了mysql的工作原理,我們可以大致猜想到canal應(yīng)該也是采用類似的邏輯去實(shí)現(xiàn)增量數(shù)據(jù)訂閱的功能,那么接下來我們看看實(shí)際上canal的工作原理是怎樣的?
canal工作原理
canal模擬mysql slave的交互協(xié)議,偽裝自己為mysql slave,向mysql master發(fā)送dump協(xié)議
mysql master收到dump請求,開始推送binary log給slave(也就是canal)
canal解析binary log對象(數(shù)據(jù)為byte流)
基于這樣的原理與方式,便可以完成數(shù)據(jù)庫增量日志的獲取解析,提供增量數(shù)據(jù)訂閱和消費(fèi),實(shí)現(xiàn)mysql實(shí)時(shí)增量數(shù)據(jù)傳輸?shù)墓δ堋?/p>
既然canal是這樣的一個(gè)框架,又是純java語言編寫而成,那么我們接下來就開始學(xué)習(xí)怎么使用它并把它用到我們的實(shí)際工作中。
canal的docker環(huán)境準(zhǔn)備
因?yàn)槟壳叭萜骰夹g(shù)的火熱,本文通過使用docker來快速搭建開發(fā)環(huán)境,而傳統(tǒng)方式的環(huán)境搭建,在我們學(xué)會(huì)了docker容器環(huán)境搭建后,也能自行依葫蘆畫瓢搭建成功。由于本篇主要講解canal,所以關(guān)于docker的內(nèi)容不會(huì)涉及太多,主要會(huì)介紹docker的基本概念和命令使用。 如果你想和更多容器技術(shù)專家交流,可以加我微信liyingjiese,備注『加群』。群里每周都有全球各大公司的最佳實(shí)踐以及行業(yè)最新動(dòng)態(tài) 。
什么是docker
相信絕大多數(shù)人都使用過虛擬機(jī)vmware,在使用vmware進(jìn)行環(huán)境搭建的時(shí)候,只需提供了一個(gè)普通的系統(tǒng)鏡像并成功安裝,剩下的軟件環(huán)境與應(yīng)用配置還是如我們在本機(jī)操作一樣在虛擬機(jī)里也操作一遍,而且vmware占用宿主機(jī)的資源較多,容易造成宿主機(jī)卡頓,而且系統(tǒng)鏡像本身也占用過多空間。
為了便于大家快速理解docker,便與vmware做對比來做介紹,docker提供了一個(gè)開始,打包,運(yùn)行app的平臺(tái),把a(bǔ)pp(應(yīng)用)和底層infrastructure(基礎(chǔ)設(shè)施)隔離開來。docker中最主要的兩個(gè)概念就是鏡像(類似vmware的系統(tǒng)鏡像)與容器(類似vmware里安裝的系統(tǒng))。
什么是image(鏡像)
文件和meta data的集合(root filesystem)
分層的,并且每一層都可以添加改變刪除文件,成為一個(gè)新的image
不同的image可以共享相同的layer
image本身是read-only的
什么是container(容器)
通過image創(chuàng)建(copy)
在image layer之上建立一個(gè)container layer(可讀寫)
類比面向?qū)ο螅侯惡蛯?shí)例
image負(fù)責(zé)app的存儲(chǔ)和分發(fā),container負(fù)責(zé)運(yùn)行app
docker的網(wǎng)絡(luò)介紹
docker的網(wǎng)絡(luò)類型有三種:
bridge:橋接網(wǎng)絡(luò)。默認(rèn)情況下啟動(dòng)的docker容器,都是使用bridge,docker安裝時(shí)創(chuàng)建的橋接網(wǎng)絡(luò),每次docker容器重啟時(shí),會(huì)按照順序獲取對應(yīng)的ip地址,這個(gè)就導(dǎo)致重啟下,docker的ip地址就變了。
none:無指定網(wǎng)絡(luò)。使用 --network=none,docker容器就不會(huì)分配局域網(wǎng)的ip。
host:主機(jī)網(wǎng)絡(luò)。使用--network=host,此時(shí),docker容器的網(wǎng)絡(luò)會(huì)附屬在主機(jī)上,兩者是互通的。例如,在容器中運(yùn)行一個(gè)web服務(wù),監(jiān)聽8080端口,則主機(jī)的8080端口就會(huì)自動(dòng)映射到容器中。
創(chuàng)建自定義網(wǎng)絡(luò):(設(shè)置固定ip)
docker network create --subnet=172.18.0.0/16 mynetwork
查看存在的網(wǎng)絡(luò)類型docker network ls:
搭建canal環(huán)境
附上docker的下載安裝地址==> docker download 。
下載canal鏡像docker pull canal/canal-server
:
下載mysql鏡像docker pull mysql
,下載過的則如下圖:
查看已經(jīng)下載好的鏡像docker images:
接下來通過鏡像生成mysql容器與canal-server容器:
##生成mysql容器 docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e mysql_root_password=root mysql ##生成canal-server容器 docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server ## 命令介紹 --net mynetwork #使用自定義網(wǎng)絡(luò) --ip #指定分配ip
查看docker中運(yùn)行的容器docker ps:
mysql的配置修改
以上只是初步準(zhǔn)備好了基礎(chǔ)的環(huán)境,但是怎么讓canal偽裝成salve并正確獲取mysql中的binary log呢?
對于自建mysql,需要先開啟binlog寫入功能,配置binlog-format
為row模式,通過修改mysql配置文件來開啟bin_log,使用find / -name my.cnf
查找my.cnf,修改文件內(nèi)容如下:
[mysqld] log-bin=mysql-bin # 開啟binlog binlog-format=row # 選擇row模式 server_id=1 # 配置mysql replaction需要定義,不要和canal的slaveid重復(fù)
進(jìn)入mysql容器docker exec -it mysql bash。
創(chuàng)建鏈接mysql的賬號canal并授予作為mysql slave的權(quán)限,如果已有賬戶可直接grant:
mysql -uroot -proot # 創(chuàng)建賬號 create user canal identified by 'canal'; # 授予權(quán)限 grant select, replication slave, replication client on *.* to 'canal'@'%'; -- grant all privileges on *.* to 'canal'@'%' ; # 刷新并應(yīng)用 flush privileges;
數(shù)據(jù)庫重啟后,簡單測試 my.cnf 配置是否生效:
show variables like 'log_bin'; show variables like 'log_bin'; show master status;
canal-server的配置修改
進(jìn)入canal-server容器docker exec -it canal-server bash
。
編輯canal-server的配置vi canal-server/conf/example/instance.properties
:
更多配置請參考==>canal配置說明 。
重啟canal-server容器docker restart canal-server
進(jìn)入容器查看啟動(dòng)日志:
docker exec -it canal-server bash tail -100f canal-server/logs/example/example.log
至此,我們的環(huán)境工作準(zhǔn)備完成!
拉取數(shù)據(jù)并同步保存到elasticsearch
本文的elasticsearch也是基于docker環(huán)境搭建,所以讀者可執(zhí)行如下命令:
# 下載對鏡像 docker pull elasticsearch:7.1.1 docker pull mobz/elasticsearch-head:5-alpine # 創(chuàng)建容器并運(yùn)行 docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1 docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine
環(huán)境已經(jīng)準(zhǔn)備好了,現(xiàn)在就要開始我們的編碼實(shí)戰(zhàn)部分了,怎么通過應(yīng)用程序去獲取canal解析后的binlog數(shù)據(jù)。首先我們基于spring boot搭建一個(gè)canal demo應(yīng)用。結(jié)構(gòu)如下圖所示:
student.java
package com.example.canal.study.pojo; import lombok.data; import java.io.serializable; // @data 用戶生產(chǎn)getter、setter方法 @data public class student implements serializable { private string id; private string name; private int age; private string sex; private string city; }
canalconfig.java
package com.example.canal.study.common; import com.alibaba.otter.canal.client.canalconnector; import com.alibaba.otter.canal.client.canalconnectors; import org.apache.http.httphost; import org.elasticsearch.client.restclient; import org.elasticsearch.client.resthighlevelclient; import org.springframework.beans.factory.annotation.value; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import java.net.inetsocketaddress; /** * @author haha */ @configuration public class canalconfig { // @value 獲取 application.properties配置中端內(nèi)容 @value("${canal.server.ip}") private string canalip; @value("${canal.server.port}") private integer canalport; @value("${canal.destination}") private string destination; @value("${elasticsearch.server.ip}") private string elasticsearchip; @value("${elasticsearch.server.port}") private integer elasticsearchport; @value("${zookeeper.server.ip}") private string zkserverip; // 獲取簡單canal-server連接 @bean public canalconnector canalsimpleconnector() { canalconnector canalconnector = canalconnectors.newsingleconnector(new inetsocketaddress(canalip, canalport), destination, "", ""); return canalconnector; } // 通過連接zookeeper獲取canal-server連接 @bean public canalconnector canalhaconnector() { canalconnector canalconnector = canalconnectors.newclusterconnector(zkserverip, destination, "", ""); return canalconnector; } // elasticsearch 7.x客戶端 @bean public resthighlevelclient resthighlevelclient() { resthighlevelclient client = new resthighlevelclient( restclient.builder(new httphost(elasticsearchip, elasticsearchport)) ); return client; } }
canaldataparser.java
由于這個(gè)類的代碼較多,文中則摘出其中比較重要的部分,其它部分代碼可從github上獲?。?/p>
public static class twotuple<a, b> { public final a eventtype; public final b columnmap; public twotuple(a a, b b) { eventtype = a; columnmap = b; } } public static list<twotuple<eventtype, map>> printentry(list<entry> entrys) { list<twotuple<eventtype, map>> rows = new arraylist<>(); for (entry entry : entrys) { // binlog event的事件事件 long executetime = entry.getheader().getexecutetime(); // 當(dāng)前應(yīng)用獲取到該binlog鎖延遲的時(shí)間 long delaytime = system.currenttimemillis() - executetime; date date = new date(entry.getheader().getexecutetime()); simpledateformat simpledateformat = new simpledateformat("yyyy-mm-dd hh:mm:ss"); // 當(dāng)前的entry(binary log event)的條目類型屬于事務(wù) if (entry.getentrytype() == entrytype.transactionbegin || entry.getentrytype() == entrytype.transactionend) { if (entry.getentrytype() == entrytype.transactionbegin) { transactionbegin begin = null; try { begin = transactionbegin.parsefrom(entry.getstorevalue()); } catch (invalidprotocolbufferexception e) { throw new runtimeexception("parse event has an error , data:" + entry.tostring(), e); } // 打印事務(wù)頭信息,執(zhí)行的線程id,事務(wù)耗時(shí) logger.info(transaction_format, new object[]{entry.getheader().getlogfilename(), string.valueof(entry.getheader().getlogfileoffset()), string.valueof(entry.getheader().getexecutetime()), simpledateformat.format(date), entry.getheader().getgtid(), string.valueof(delaytime)}); logger.info(" begin ----> thread id: {}", begin.getthreadid()); printxainfo(begin.getpropslist()); } else if (entry.getentrytype() == entrytype.transactionend) { transactionend end = null; try { end = transactionend.parsefrom(entry.getstorevalue()); } catch (invalidprotocolbufferexception e) { throw new runtimeexception("parse event has an error , data:" + entry.tostring(), e); } // 打印事務(wù)提交信息,事務(wù)id logger.info("----------------\n"); logger.info(" end ----> transaction id: {}", end.gettransactionid()); printxainfo(end.getpropslist()); logger.info(transaction_format, new object[]{entry.getheader().getlogfilename(), string.valueof(entry.getheader().getlogfileoffset()), string.valueof(entry.getheader().getexecutetime()), simpledateformat.format(date), entry.getheader().getgtid(), string.valueof(delaytime)}); } continue; } // 當(dāng)前entry(binary log event)的條目類型屬于原始數(shù)據(jù) if (entry.getentrytype() == entrytype.rowdata) { rowchange rowchage = null; try { // 獲取儲(chǔ)存的內(nèi)容 rowchage = rowchange.parsefrom(entry.getstorevalue()); } catch (exception e) { throw new runtimeexception("parse event has an error , data:" + entry.tostring(), e); } // 獲取當(dāng)前內(nèi)容的事件類型 eventtype eventtype = rowchage.geteventtype(); logger.info(row_format, new object[]{entry.getheader().getlogfilename(), string.valueof(entry.getheader().getlogfileoffset()), entry.getheader().getschemaname(), entry.getheader().gettablename(), eventtype, string.valueof(entry.getheader().getexecutetime()), simpledateformat.format(date), entry.getheader().getgtid(), string.valueof(delaytime)}); // 事件類型是query或數(shù)據(jù)定義語言ddl直接打印sql語句,跳出繼續(xù)下一次循環(huán) if (eventtype == eventtype.query || rowchage.getisddl()) { logger.info(" sql ----> " + rowchage.getsql() + sep); continue; } printxainfo(rowchage.getpropslist()); // 循環(huán)當(dāng)前內(nèi)容條目的具體數(shù)據(jù) for (rowdata rowdata : rowchage.getrowdataslist()) { list<canalentry.column> columns; // 事件類型是delete返回刪除前的列內(nèi)容,否則返回改變后列的內(nèi)容 if (eventtype == canalentry.eventtype.delete) { columns = rowdata.getbeforecolumnslist(); } else { columns = rowdata.getaftercolumnslist(); } hashmap<string, object> map = new hashmap<>(16); // 循環(huán)把列的name與value放入map中 for (column column: columns){ map.put(column.getname(), column.getvalue()); } rows.add(new twotuple<>(eventtype, map)); } } } return rows; }
elasticutils.java
package com.example.canal.study.common; import com.alibaba.fastjson.json; import com.example.canal.study.pojo.student; import lombok.extern.slf4j.slf4j; import org.elasticsearch.client.resthighlevelclient; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.component; import org.elasticsearch.action.docwriterequest; import org.elasticsearch.action.delete.deleterequest; import org.elasticsearch.action.delete.deleteresponse; import org.elasticsearch.action.get.getrequest; import org.elasticsearch.action.get.getresponse; import org.elasticsearch.action.index.indexrequest; import org.elasticsearch.action.index.indexresponse; import org.elasticsearch.action.update.updaterequest; import org.elasticsearch.action.update.updateresponse; import org.elasticsearch.client.requestoptions; import org.elasticsearch.common.xcontent.xcontenttype; import java.io.ioexception; import java.util.map; /** * @author haha */ @slf4j @component public class elasticutils { @autowired private resthighlevelclient resthighlevelclient; /** * 新增 * @param student * @param index 索引 */ public void savees(student student, string index) { indexrequest indexrequest = new indexrequest(index) .id(student.getid()) .source(json.tojsonstring(student), xcontenttype.json) .optype(docwriterequest.optype.create); try { indexresponse response = resthighlevelclient.index(indexrequest, requestoptions.default); log.info("保存數(shù)據(jù)至elasticsearch成功:{}", response.getid()); } catch (ioexception e) { log.error("保存數(shù)據(jù)至elasticsearch失敗: {}", e); } } /** * 查看 * @param index 索引 * @param id _id * @throws ioexception */ public void getes(string index, string id) throws ioexception { getrequest getrequest = new getrequest(index, id); getresponse response = resthighlevelclient.get(getrequest, requestoptions.default); map<string, object> fields = response.getsource(); for (map.entry<string, object> entry : fields.entryset()) { system.out.println(entry.getkey() + ":" + entry.getvalue()); } } /** * 更新 * @param student * @param index 索引 * @throws ioexception */ public void updatees(student student, string index) throws ioexception { updaterequest updaterequest = new updaterequest(index, student.getid()); updaterequest.upsert(json.tojsonstring(student), xcontenttype.json); updateresponse response = resthighlevelclient.update(updaterequest, requestoptions.default); log.info("更新數(shù)據(jù)至elasticsearch成功:{}", response.getid()); } /** * 根據(jù)id刪除數(shù)據(jù) * @param index 索引 * @param id _id * @throws ioexception */ public void deletees(string index, string id) throws ioexception { deleterequest deleterequest = new deleterequest(index, id); deleteresponse response = resthighlevelclient.delete(deleterequest, requestoptions.default); log.info("刪除數(shù)據(jù)至elasticsearch成功:{}", response.getid()); } }
binlogelasticsearch.java
package com.example.canal.study.action; import com.alibaba.otter.canal.client.canalconnector; import com.alibaba.otter.canal.protocol.canalentry; import com.alibaba.otter.canal.protocol.message; import com.example.canal.study.common.canaldataparser; import com.example.canal.study.common.elasticutils; import com.example.canal.study.pojo.student; import lombok.extern.slf4j.slf4j; import org.springframework.beans.factory.annotation.autowired; import org.springframework.beans.factory.annotation.qualifier; import org.springframework.stereotype.component; import java.io.ioexception; import java.util.list; import java.util.map; /** * @author haha */ @slf4j @component public class binlogelasticsearch { @autowired private canalconnector canalsimpleconnector; @autowired private elasticutils elasticutils; //@qualifier("canalhaconnector")使用名為canalhaconnector的bean @autowired @qualifier("canalhaconnector") private canalconnector canalhaconnector; public void binlogtoelasticsearch() throws ioexception { opencanalconnector(canalhaconnector); // 輪詢拉取數(shù)據(jù) integer batchsize = 5 * 1024; while (true) { message message = canalhaconnector.getwithoutack(batchsize); // message message = canalsimpleconnector.getwithoutack(batchsize); long id = message.getid(); int size = message.getentries().size(); log.info("當(dāng)前監(jiān)控到binlog消息數(shù)量{}", size); if (id == -1 || size == 0) { try { // 等待2秒 thread.sleep(2000); } catch (interruptedexception e) { e.printstacktrace(); } } else { //1. 解析message對象 list<canalentry.entry> entries = message.getentries(); list<canaldataparser.twotuple<canalentry.eventtype, map>> rows = canaldataparser.printentry(entries); for (canaldataparser.twotuple<canalentry.eventtype, map> tuple : rows) { if(tuple.eventtype == canalentry.eventtype.insert) { student student = createstudent(tuple); // 2。將解析出的對象同步到elasticsearch中 elasticutils.savees(student, "student_index"); // 3.消息確認(rèn)已處理 // canalsimpleconnector.ack(id); canalhaconnector.ack(id); } if(tuple.eventtype == canalentry.eventtype.update){ student student = createstudent(tuple); elasticutils.updatees(student, "student_index"); // 3.消息確認(rèn)已處理 // canalsimpleconnector.ack(id); canalhaconnector.ack(id); } if(tuple.eventtype == canalentry.eventtype.delete){ elasticutils.deletees("student_index", tuple.columnmap.get("id").tostring()); canalhaconnector.ack(id); } } } } } /** * 封裝數(shù)據(jù)至student * @param tuple * @return */ private student createstudent(canaldataparser.twotuple<canalentry.eventtype, map> tuple){ student student = new student(); student.setid(tuple.columnmap.get("id").tostring()); student.setage(integer.parseint(tuple.columnmap.get("age").tostring())); student.setname(tuple.columnmap.get("name").tostring()); student.setsex(tuple.columnmap.get("sex").tostring()); student.setcity(tuple.columnmap.get("city").tostring()); return student; } /** * 打開canal連接 * * @param canalconnector */ private void opencanalconnector(canalconnector canalconnector) { //連接canalserver canalconnector.connect(); // 訂閱destination canalconnector.subscribe(); } /** * 關(guān)閉canal連接 * * @param canalconnector */ private void closecanalconnector(canalconnector canalconnector) { //關(guān)閉連接canalserver canalconnector.disconnect(); // 注銷訂閱destination canalconnector.unsubscribe(); } }
canaldemoapplication.java(spring boot啟動(dòng)類)
package com.example.canal.study; import com.example.canal.study.action.binlogelasticsearch; import org.springframework.beans.factory.annotation.autowired; import org.springframework.boot.applicationarguments; import org.springframework.boot.applicationrunner; import org.springframework.boot.springapplication; import org.springframework.boot.autoconfigure.springbootapplication; /** * @author haha */ @springbootapplication public class canaldemoapplication implements applicationrunner { @autowired private binlogelasticsearch binlogelasticsearch; public static void main(string[] args) { springapplication.run(canaldemoapplication.class, args); } // 程序啟動(dòng)則執(zhí)行run方法 @override public void run(applicationarguments args) throws exception { binlogelasticsearch.binlogtoelasticsearch(); } }
application.properties
server.port=8081 spring.application.name = canal-demo canal.server.ip = 192.168.124.5 canal.server.port = 11111 canal.destination = example zookeeper.server.ip = 192.168.124.5:2181 zookeeper.sasl.client = false elasticsearch.server.ip = 192.168.124.5 elasticsearch.server.port = 9200
canal集群高可用的搭建
通過上面的學(xué)習(xí),我們知道了單機(jī)直連方式的canala應(yīng)用。在當(dāng)今互聯(lián)網(wǎng)時(shí)代,單實(shí)例模式逐漸被集群高可用模式取代,那么canala的多實(shí)例集群方式如何搭建呢!
基于zookeeper獲取canal實(shí)例
準(zhǔn)備zookeeper的docker鏡像與容器:
docker pull zookeeper docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper docker run -d --name canal-server2 --net mynetwork --ip 172.18.0.8 -p 11113:11113 canal/canal-server
1、機(jī)器準(zhǔn)備:
運(yùn)行canal的容器ip: 172.18.0.4 , 172.18.0.8
zookeeper容器ip:172.18.0.3:2181
mysql容器ip:172.18.0.6:3306
2、按照部署和配置,在單臺(tái)機(jī)器上各自完成配置,演示時(shí)instance name為example。
3、修改canal.properties,加上zookeeper配置并修改canal端口:
canal.port=11113 canal.zkservers=172.18.0.3:2181 canal.instance.global.spring.xml = classpath:spring/default-instance.xml
4、創(chuàng)建example目錄,并修改instance.properties:
canal.instance.mysql.slaveid = 1235 #之前的canal slaveid是1234,保證slaveid不重復(fù)即可 canal.instance.master.address = 172.18.0.6:3306
注意: 兩臺(tái)機(jī)器上的instance目錄的名字需要保證完全一致,ha模式是依賴于instance name進(jìn)行管理,同時(shí)必須都選擇default-instance.xml
配置。
啟動(dòng)兩個(gè)不同容器的canal,啟動(dòng)后,可以通過tail -100f logs/example/example.log
查看啟動(dòng)日志,只會(huì)看到一臺(tái)機(jī)器上出現(xiàn)了啟動(dòng)成功的日志。
比如我這里啟動(dòng)成功的是 172.18.0.4:
查看一下zookeeper中的節(jié)點(diǎn)信息,也可以知道當(dāng)前工作的節(jié)點(diǎn)為172.18.0.4:11111:
[zk: localhost:2181(connected) 15] get /otter/canal/destinations/example/running {"active":true,"address":"172.18.0.4:11111","cid":1}
客戶端鏈接, 消費(fèi)數(shù)據(jù)
可以通過指定zookeeper地址和canal的instance name,canal client會(huì)自動(dòng)從zookeeper中的running節(jié)點(diǎn)獲取當(dāng)前服務(wù)的工作節(jié)點(diǎn),然后與其建立鏈接:
[zk: localhost:2181(connected) 0] get /otter/canal/destinations/example/running {"active":true,"address":"172.18.0.4:11111","cid":1}
對應(yīng)的客戶端編碼可以使用如下形式,上文中的canalconfig.java中的canalhaconnector就是一個(gè)ha連接:
canalconnector connector = canalconnectors.newclusterconnector("172.18.0.3:2181", "example", "", "");
鏈接成功后,canal server會(huì)記錄當(dāng)前正在工作的canal client信息,比如客戶端ip,鏈接的端口信息等(聰明的你,應(yīng)該也可以發(fā)現(xiàn),canal client也可以支持ha功能):
[zk: localhost:2181(connected) 4] get /otter/canal/destinations/example/1001/running {"active":true,"address":"192.168.124.5:59887","clientid":1001}
數(shù)據(jù)消費(fèi)成功后,canal server會(huì)在zookeeper中記錄下當(dāng)前最后一次消費(fèi)成功的binlog位點(diǎn)(下次你重啟client時(shí),會(huì)從這最后一個(gè)位點(diǎn)繼續(xù)進(jìn)行消費(fèi)):
[zk: localhost:2181(connected) 5] get /otter/canal/destinations/example/1001/cursor {"@type":"com.alibaba.otter.canal.protocol.position.logposition","identity":{"slaveid":-1,"sourceaddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalname":"binlog.000004","position":2169,"timestamp":1562672817000}}
停止正在工作的172.18.0.4的canal server:
docker exec -it canal-server bash cd canal-server/bin sh stop.sh
這時(shí)172.18.0.8會(huì)立馬啟動(dòng)example instance,提供新的數(shù)據(jù)服務(wù):
[zk: localhost:2181(connected) 19] get /otter/canal/destinations/example/running {"active":true,"address":"172.18.0.8:11111","cid":1}
與此同時(shí),客戶端也會(huì)隨著canal server的切換,通過獲取zookeeper中的最新地址,與新的canal server建立鏈接,繼續(xù)消費(fèi)數(shù)據(jù),整個(gè)過程自動(dòng)完成。
異常與總結(jié)
elasticsearch-head
無法訪問elasticsearch
es與es-head是兩個(gè)獨(dú)立的進(jìn)程,當(dāng)es-head訪問es服務(wù)時(shí),會(huì)存在一個(gè)跨域問題。所以我們需要修改es的配置文件,增加一些配置項(xiàng)來解決這個(gè)問題,如下:
[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/ [root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml # 文件末尾加上如下配置 http.cors.enabled: true http.cors.allow-origin: "*"
修改完配置文件后需重啟es服務(wù)。
elasticsearch-head查詢報(bào)406 not acceptable
解決方法:
1、進(jìn)入head安裝目錄;
2、cd _site/
3、編輯vendor.js 共有兩處
#6886行 contenttype: "application/x-www-form-urlencoded 改成 contenttype: "application/json;charset=utf-8" #7574行 var inspectdata = s.contenttype === "application/x-www-form-urlencoded" && 改成 var inspectdata = s.contenttype === "application/json;charset=utf-8" &&
使用elasticsearch-rest-high-level-client
報(bào)org.elasticsearch.action.index.indexrequest.ifseqno
#pom中除了加入依賴 <dependency> <groupid>org.elasticsearch.client</groupid> <artifactid>elasticsearch-rest-high-level-client</artifactid> <version>7.1.1</version> </dependency> #還需加入 <dependency> <groupid>org.elasticsearch</groupid> <artifactid>elasticsearch</artifactid> <version>7.1.1</version> </dependency>
相關(guān)參考: 。
為什么elasticsearch要在7.x版本不能使用type?
參考: 為什么elasticsearch要在7.x版本去掉type?
使用spring-data-elasticsearch.jar報(bào)org.elasticsearch.client.transport.nonodeavailableexception
由于本文使用的是elasticsearch7.x以上的版本,目前spring-data-elasticsearch底層采用es官方transportclient,而es官方計(jì)劃放棄transportclient,工具以es官方推薦的resthighlevelclient進(jìn)行調(diào)用請求。 可參考 resthighlevelclient api 。
設(shè)置docker容器開啟啟動(dòng)
如果創(chuàng)建時(shí)未指定 --restart=always ,可通過update 命令 docker update --restart=always [containerid]
docker for mac network host模式不生效
host模式是為了性能,但是這卻對docker的隔離性造成了破壞,導(dǎo)致安全性降低。 在性能場景下,可以用--netwokr host開啟host模式,但需要注意的是,如果你用windows或mac本地啟動(dòng)容器的話,會(huì)遇到host模式失效的問題。原因是host模式只支持linux宿主機(jī)。
參見官方文檔: 。
客戶端連接zookeeper報(bào)authenticate using sasl(unknow error)
zookeeper.jar與dokcer中的zookeeper版本不一致
zookeeper.jar使用了3.4.6之前的版本
出現(xiàn)這個(gè)錯(cuò)的意思是zookeeper作為外部應(yīng)用需要向系統(tǒng)申請資源,申請資源的時(shí)候需要通過認(rèn)證,而sasl是一種認(rèn)證方式,我們想辦法來繞過sasl認(rèn)證。避免等待,來提高效率。
在項(xiàng)目代碼中加入system.setproperty("zookeeper.sasl.client", "false");,
如果是spring boot項(xiàng)目可以在application.properties
中加入zookeeper.sasl.client=false
。
參考: increased cpu usage by unnecessary sasl checks 。
如果更換canal.client.jar中依賴的zookeeper.jar的版本
把canal的官方源碼下載到本機(jī)git clone ,然后修改client模塊下pom.xml文件中關(guān)于zookeeper的內(nèi)容,然后重新mvn install:
把自己項(xiàng)目依賴的包替換為剛剛mvn install
生產(chǎn)的包:
關(guān)于“基于Docker與Canal怎么實(shí)現(xiàn)MySQL實(shí)時(shí)增量數(shù)據(jù)傳輸功能”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對“基于Docker與Canal怎么實(shí)現(xiàn)MySQL實(shí)時(shí)增量數(shù)據(jù)傳輸功能”知識(shí)都有一定的了解,大家如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。