您好,登錄后才能下訂單哦!
這篇文章主要介紹Mysql數(shù)據(jù)庫如何監(jiān)聽binlog的開啟步驟,文中介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們一定要看完!
前言
我們經(jīng)常需要根據(jù)用戶對自己數(shù)據(jù)的一些操作來做一些事情.
比如如果用戶刪除了自己的賬號,我們就給他發(fā)短信罵他,去發(fā)短信求他回來.
類似于這種功能,當(dāng)然可以在業(yè)務(wù)邏輯層實(shí)現(xiàn),在收到用戶的刪除請求之后執(zhí)行這一操作,但是數(shù)據(jù)庫的binlog為我們提供了另外一種操作方法.
要監(jiān)聽binlog,需要兩步,第一步當(dāng)然是你的mysql需要開啟這一個功能,第二個是要寫程序來對日志進(jìn)行讀取.
mysql開啟binlog.
首先mysql的binlog日常是不打開的,因此我們需要:
找到mysql的配置文件my.cnf,這個因操作系統(tǒng)不一樣,位置也不一定一樣,可以自己找一下,
在其中加入以下內(nèi)容:
[mysqld] server_id = 1 log-bin = mysql-bin binlog-format = ROW
之后重啟mysql.
/ ubuntu service mysql restart // mac mysql.server restart
監(jiān)測是否開啟成功
進(jìn)入mysql命令行,執(zhí)行:
show variables like '%log_bin%' ;
如果結(jié)果如下圖,則說明成功了:
查看正在寫入的binlog狀態(tài):
代碼讀取binlog
引入依賴
我們使用開源的一些實(shí)現(xiàn),這里因為一些奇怪的原因,我選用了mysql-binlog-connector-java這個包,(官方github倉庫)[github.com/shyiko/mysq…]具體依賴如下:
<!-- https://mvnrepository.com/artifact/com.github.shyiko/mysql-binlog-connector-java --> <dependency> <groupId>com.github.shyiko</groupId> <artifactId>mysql-binlog-connector-java</artifactId> <version>0.17.0</version> </dependency>
當(dāng)然,對binlog的處理有很多開源實(shí)現(xiàn),阿里的cancl就是一個,也可以使用它.
寫個demo
根據(jù)官方倉庫中readme里面,來簡單的寫個demo.
public static void main(String[] args) { BinaryLogClient client = new BinaryLogClient("hostname", 3306, "username", "passwd"); EventDeserializer eventDeserializer = new EventDeserializer(); eventDeserializer.setCompatibilityMode( EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG, EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY ); client.setEventDeserializer(eventDeserializer); client.registerEventListener(new BinaryLogClient.EventListener() { @Override public void onEvent(Event event) { // TODO dosomething(); logger.info(event.toString()); } }); client.connect(); }
這個完全是根據(jù)官方教程里面寫的,在onEvent里面可以寫自己的業(yè)務(wù)邏輯,由于我只是測試,所以我在里面將每一個event都打印了出來.
之后我手動登錄到mysql,分別進(jìn)行了增加,修改,刪除操作,監(jiān)聽到的log如下:
00:23:13.331 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=0, eventType=ROTATE, serverId=1, headerLength=19, dataLength=28, nextPosition=0, flags=32}, data=RotateEventData{binlogFilename='mysql-bin.000001', binlogPosition=886}}
00:23:13.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468403000, eventType=FORMAT_DESCRIPTION, serverId=1, headerLength=19, dataLength=100, nextPosition=0, flags=0}, data=FormatDescriptionEventData{binlogVersion=4, serverVersion='5.7.23-0ubuntu0.16.04.1-log', headerLength=19, dataLength=95}}
00:23:23.715 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=951, flags=0}, data=null}
00:23:23.716 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1021, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:23.721 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1072, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:23.724 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1114, flags=0}, data=WriteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
[[B@546a03af, 2]
]}}
00:23:23.725 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1145, flags=0}, data=XidEventData{xid=28}}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1210, flags=0}, data=null}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1280, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:55.873 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1331, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=1381, flags=0}, data=UpdateRowsEventData{tableId=108, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
{before=[[B@6833ce2c, 1], after=[[B@725bef66, 3]}
]}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1412, flags=0}, data=XidEventData{xid=41}}
00:24:22.333 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1477, flags=0}, data=null}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1547, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1598, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1640, flags=0}, data=DeleteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
[[B@1888ff2c, 3]
]}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1671, flags=0}, data=XidEventData{xid=42}}
根據(jù)自己的業(yè)務(wù),封裝一個更好使,更定制的工具類
開始的時候打算貼代碼的,,,但是代碼越寫越多,索性傳在github上了,這里只貼部分的實(shí)現(xiàn).代碼傳送門
實(shí)現(xiàn)思路
支持對單個表的監(jiān)聽,因為我們不想真的對所有數(shù)據(jù)庫中的所有數(shù)據(jù)表進(jìn)行監(jiān)聽.
可以多線程消費(fèi).
把監(jiān)聽到的內(nèi)容轉(zhuǎn)換成我們喜聞樂見的形式(文中的數(shù)據(jù)結(jié)構(gòu)不一定很好,我沒想到更加合適的了).
所以實(shí)現(xiàn)思路大致如下:
封裝個客戶端,對外只提供獲取方法,屏蔽掉初始化的細(xì)節(jié)代碼.
提供注冊監(jiān)聽器(偽)的方法,可以注冊對某個表的監(jiān)聽(重新定義一個監(jiān)聽接口,所有注冊的監(jiān)聽器實(shí)現(xiàn)這個就好).
真正的監(jiān)聽器只有客戶端,他將此數(shù)據(jù)庫實(shí)例上的所有操作,全部監(jiān)聽到并轉(zhuǎn)換成我們想要的格式LogItem放進(jìn)阻塞隊列里面.
啟動多個線程,消費(fèi)阻塞隊列,對某一個LogItem調(diào)用對應(yīng)的數(shù)據(jù)表的監(jiān)聽器,做一些業(yè)務(wù)邏輯.
初始化代碼:
public MysqlBinLogListener(Conf conf) { BinaryLogClient client = new BinaryLogClient(conf.host, conf.port, conf.username, conf.passwd); EventDeserializer eventDeserializer = new EventDeserializer(); eventDeserializer.setCompatibilityMode( EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG, EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY ); client.setEventDeserializer(eventDeserializer); this.parseClient = client; this.queue = new ArrayBlockingQueue<>(1024); this.conf = conf; listeners = new ConcurrentHashMap<>(); dbTableCols = new ConcurrentHashMap<>(); this.consumer = Executors.newFixedThreadPool(consumerThreads); }
注冊代碼:
public void regListener(String db, String table, BinLogListener listener) throws Exception { String dbTable = getdbTable(db, table); Class.forName("com.mysql.jdbc.Driver"); // 保存當(dāng)前注冊的表的colum信息 Connection connection = DriverManager.getConnection("jdbc:mysql://" + conf.host + ":" + conf.port, conf.username, conf.passwd); Map<String, Colum> cols = getColMap(connection, db, table); dbTableCols.put(dbTable, cols); // 保存當(dāng)前注冊的listener List<BinLogListener> list = listeners.getOrDefault(dbTable, new ArrayList<>()); list.add(listener); listeners.put(dbTable, list); }
在這個步驟中,我們在注冊監(jiān)聽者的同時,獲得了該表的schema信息,并保存到map里面去,方便后續(xù)對數(shù)據(jù)進(jìn)行處理.
監(jiān)聽代碼:
@Override public void onEvent(Event event) { EventType eventType = event.getHeader().getEventType(); if (eventType == EventType.TABLE_MAP) { TableMapEventData tableData = event.getData(); String db = tableData.getDatabase(); String table = tableData.getTable(); dbTable = getdbTable(db, table); } // 只處理添加刪除更新三種操作 if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) { if (isWrite(eventType)) { WriteRowsEventData data = event.getData(); for (Serializable[] row : data.getRows()) { if (dbTableCols.containsKey(dbTable)) { LogItem e = LogItem.itemFromInsert(row, dbTableCols.get(dbTable)); e.setDbTable(dbTable); queue.add(e); } } } } }
我偷懶了,,,這里面只實(shí)現(xiàn)了對添加操作的處理,其他操作沒有寫.
消費(fèi)代碼:
public void parse() throws IOException { parseClient.registerEventListener(this); for (int i = 0; i < consumerThreads; i++) { consumer.submit(() -> { while (true) { if (queue.size() > 0) { try { LogItem item = queue.take(); String dbtable = item.getDbTable(); listeners.get(dbtable).forEach(l -> { l.onEvent(item); }); } catch (InterruptedException e) { e.printStackTrace(); } } Thread.sleep(1000); } }); } parseClient.connect(); }
消費(fèi)時,從隊列中獲取item,之后獲取對應(yīng)的一個或者多個監(jiān)聽者,分別消費(fèi)這個item.
測試代碼:
public static void main(String[] args) throws Exception { Conf conf = new Conf(); conf.host = "hostname"; conf.port = 3306; conf.username = conf.passwd = "hhsgsb"; MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(conf); mysqlBinLogListener.parseArgsAndRun(args); mysqlBinLogListener.regListener("pf", "student", item -> { System.out.println(new String((byte[])item.getAfter().get("name"))); logger.info("insert into {}, value = {}", item.getDbTable(), item.getAfter()); }); mysqlBinLogListener.regListener("pf", "teacher", item -> System.out.println("teacher ====")); mysqlBinLogListener.parse(); }
在這段很少的代碼里,注冊了兩個監(jiān)聽者,分別監(jiān)聽student和teacher表,并分別進(jìn)行打印處理,經(jīng)測試,在teacher表插入數(shù)據(jù)時,可以獨(dú)立的運(yùn)行定義的業(yè)務(wù)邏輯.
注意:這里的工具類并不能直接投入使用,因為里面有許多的異常處理沒有做,且功能僅監(jiān)聽了插入語句,可以用來做實(shí)現(xiàn)的參考.
以上是“Mysql數(shù)據(jù)庫如何監(jiān)聽binlog的開啟步驟”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。