溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎樣分析Debezium MySQL模塊設計

發(fā)布時間:2021-10-25 09:26:50 來源:億速云 閱讀:139 作者:柒染 欄目:大數(shù)據(jù)

今天就跟大家聊聊有關怎樣分析Debezium MySQL模塊設計,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據(jù)這篇文章可以有所收獲。

注:本文不會著重分析MySQL binlog格式結構和解析過程,而在于debezium的架構設計。

Debezium is an open source distributed platform for change data capture.

這句話引用自debezium官網(wǎng),可以看到,debezium的野心還是很大的,把自己定義為一個通用的CDC平臺,事實上,也確實如此,尤其是從0.8版本以來,開發(fā)者將大量精力投入到PostgreSQL模塊的開發(fā),一方面引入SQL Server, Oracle, Db2, Cassandra等數(shù)據(jù)庫的支持,另一方面適配了Pulsar,Amazon Kineis,Google Pub/Sub等消息引擎,并且逐步重構,解耦和具體數(shù)據(jù)庫的綁定以及具體消息系統(tǒng)的依賴,向統(tǒng)一架構和云原生靠攏。

事實上,早期的Debezium是和Kafka Connect框架緊耦合的,Debezium是Kafka Connect的一個Source Plugin,并且主要適配MySQL,稍帶了MongoDB。

目前Debezium最新版是1.2,1.3已經(jīng)進入beta階段。其實MySQL模塊在0.8版本已經(jīng)基本定下來了,后面的變動只有0.9合入了DBZ-175,這是一個在線加表特性,僅作為內部實驗特性,并沒有在文檔上提及。我個人認為這個設計十分精彩,會在本文有篇幅專門討論。在后續(xù)的開發(fā)中,同為早期的MongoDB被徹底重構,但MySQL模塊還是保持原來的樣子。

Rebase MySQL connector to common framework used by the other connectors.

這個已經(jīng)在Roadmap上掛了有段兒時間了,但可以預見到,短期內還不會有什么動作。很大一部分原因是,MySQL模塊的代碼中有大量的針對MySQL和Kafka Connect缺陷的額外處理,還有像DBZ-175這種統(tǒng)一架構還不支持的特性,另外由于MySQL的廣泛使用,多年來社區(qū)發(fā)現(xiàn)和修復了大量的場景下的bug,把一個久經(jīng)驗證的模塊架構推倒是一件風險很大的事情。

后續(xù)的分析僅僅針對MySQL模塊的架構和代碼,基本上不會涉及新的統(tǒng)一架構。

Kafka Connect

上文提到,Debezium最初設計成一個Kafka Connect 的Source Plugin,目前開發(fā)者雖致力于將其與Kafka Connect解耦,但當前的代碼實現(xiàn)還未變動。下圖引自Debeizum官方文檔,可以看到一個Debezium在一個完整CDC系統(tǒng)中的位置。

怎樣分析Debezium MySQL模塊設計

Kafka Connect 為Source Plugin提供了一系列的編程接口,最主要的就是要實現(xiàn)SourceTask的poll方法,其返回List<SourceRecord>將會被以最少一次語義的方式投遞至Kafka。如果你想了解更多Kafka Connect的細節(jié),請參閱我的另一篇文章:https://www.jianshu.com/p/538b2f0a7462

public abstract class SourceTask implements Task {
    ...
    public abstract List<SourceRecord> poll() throws InterruptedException;
    ...
}

Debezium MySQL 架構

Reader體系構成了MySQL模塊中代碼的主線,我們的分析從Reader開始。

怎樣分析Debezium MySQL模塊設計

這里是Reader的整個繼承樹,我們先暫時忽略ParallelSnapshotReader,ReconcilingBinlogReader,他們是DBZ-175引入的東西。

從名字上應該可以看出,真正主要的是SnapshotReader和BinlogReader,分別實現(xiàn)了對MySQL數(shù)據(jù)的全量讀取和增量讀取,他們繼承于AbstractReader,里面封裝了共用邏輯,下圖是AbstractReader的內部設計。

怎樣分析Debezium MySQL模塊設計

可以看到,AbstractReader在實現(xiàn)時,并沒有直接將enqueue喂進來的record投遞進Kafka,而是通過一個內存阻塞隊列BlockingQueue進行了解耦,這種設計有諸多好處:

  1. 職責解耦

如上的圖中,在喂入BlockingQueue之前,要根據(jù)條件判斷是否接受該record;在向Kafka投遞record之前,判斷task的running狀態(tài)。這樣把同類的功能限定在特定的位置。

  1. 線程隔離

BlockingQueue是一個線程安全的阻塞隊列,通過BlockingQueue實現(xiàn)的生產(chǎn)者消費者模型,是可以跑在不同的線程里的,這樣避免局部的阻塞帶來的整體的干擾。如上圖中的右側,消費者會定期判斷running標志位,若running被stop信號置為了false,可以立刻停止整個task,而不會因MySQL IO阻塞延遲相應。

  1. Single與Batch的互相轉化

Enqueue record是單條的投遞record,drain_to是批量的消費records。這個用法也可以反過來,實現(xiàn)batch到single的轉化。

還剩下兩個ChainedReader和TimedBlockingReader。

  • ChainedReader顧名思義,會把幾個Reader包裝起來,串行執(zhí)行。

  • TimedBlockingReader就是簡單的sleep一段時間,它的存在是為了應對Kafka Connect rebalance的設計缺陷,在上文中我的另一篇文章中有提到。

Snapshot Stream 無縫銜接

如果你搭建過MySQL的主從同步,因該知道,建立從庫時,需要先導出全量數(shù)據(jù)(MySQL 8.0.x好像已經(jīng)有了更便捷的方法),然后記錄binlog的位置,把全量數(shù)據(jù)導入從庫后,從binlog位置繼續(xù)增量同步,已保持數(shù)據(jù)的一致性。

可能你還知道阿里開源的另一個MySQL CDC工具canal,他只負責stream過程,并沒有處理snapshot過程,這也是debezium相較于canal的一個優(yōu)勢。

對于Debezium來說,基本沿用了官方搭建從庫的這一思路,讓我們看下官方文檔描述的詳細步驟。(如果沒有額外說明,后面的討論僅針對Innodb引擎)

  1. Grabs a global read lock that blocks writes by other database clients. The snapshot itself does not prevent other clients from applying DDL which might interfere with the connector’s attempt to read the binlog position and table schemas. The global read lock is kept while the binlog position is read before released in a later step.

  2. Starts a transaction with repeatable read semantics to ensure that all subsequent reads within the transaction are done against the consistent snapshot.

  3. Reads the current binlog position.

  4. Reads the schema of the databases and tables allowed by the connector’s configuration.

  5. Releases the global read lock. This now allows other database clients to write to the database.

  6. Writes the DDL changes to the schema change topic, including all necessary DROP… and CREATE… DDL statements. This happens if applicable.

  7. Scans the database tables and generates CREATE events on the relevant table-specific Kafka topics for each row.

  8. Commits the transaction.

  9. Records the completed snapshot in the connector offsets.

Debezium把這個過程分解成了9步,看上去好像比我們想的要復雜些。

在Debezium目前版本的實現(xiàn)中,這9步是單線程串行的,其中主要的耗時就在第7步,這一步其實就是使用最樸素的方式,通過jdbc使用select * from table [where ...]來實現(xiàn)的讀取全量數(shù)據(jù),如果很多千萬級甚至更大的表,這一步的耗時是很長的。其實這一步是可以并行化的,在第1步中,已經(jīng)獲取了全局鎖,在全局鎖釋放前,是可以開多個連接,并行的實現(xiàn)全量數(shù)據(jù)的拉去,極大的提升效率。

另外snapshot整個過程如果失敗,是無法恢復的,畢竟事務已經(jīng)丟了,無法再讀取當時的快照,來保證數(shù)據(jù)的一致性。

Snapshot過程時間長和中斷不可恢復,再加上Kafka Connect 粗暴的rebalance策略,正是早期使用debezium的一大痛點。TimedBlockingReader的引入正是為了在一定程度上緩解這個問題。

ChainedReader
├── TimedBlockReader
├── SnapshotReader
└── BinlogReader

當準備一次性提交多個同步任務時,因為每次任務提交都會觸發(fā)一次rebalance,在SnapshotReader和BinlogReader前插入一個TimedBlockReader,確保同步任務提交后不會立刻執(zhí)行,等多個任務都提交完成時,集群穩(wěn)定下來,才會開始并發(fā)執(zhí)行。

特別的,snapshot和stream過程都是可選的,你也可以像canal一樣只從當前時刻開始監(jiān)聽binlog,捕獲stream數(shù)據(jù),具體配置請參考官方文檔。

Schema時間線構造

下面我們關注一下stream過程,也就是binlog解析過程。(做數(shù)據(jù)同步binlog必須設為row模式)

相信能讀到這里的大多數(shù)同學都執(zhí)行過以下命令,就是用MySQL官方的binlog工具解析binlog文件內容。仔細看,你會發(fā)現(xiàn),這里面有庫名和表名,有每個字段的值,卻沒有字段名,換句話說,binlog里不包含schema信息!

mysqlbinlog --no-defaults --base64-output=decode-rows -vvv ~/Downloads/mysql-bin.001192 | less
#190810 12:00:20 server id 206195699  end_log_pos 8624 CRC32 0x46912d80         GTID    last_committed=12       sequence_number=13      rbr_only=yes
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
SET @@SESSION.GTID_NEXT= '5358e6dc-d161-11e8-8a6c-7cd30ac4dc44:25115781'/*!*/;
# at 8624
#190810 12:00:20 server id 206195699  end_log_pos 8687 CRC32 0xe14a2f5a         Query   thread_id=576127        exec_time=0     error_code=0
SET TIMESTAMP=1565409620/*!*/;
BEGIN
/*!*/;
# at 8687
#190810 12:00:20 server id 206195699  end_log_pos 8775 CRC32 0xaf16fb7d         Table_map: `risk_control`.`log_operation` mapped to number 39286
# at 8775
#190810 12:00:20 server id 206195699  end_log_pos 9055 CRC32 0x9bdc15ae         Write_rows: table id 39286 flags: STMT_END_F
### INSERT INTO `risk_control`.`log_operation`
### SET
###   @1=8166048 /* INT meta=0 nullable=0 is_null=0 */
###   @2='7b0d526124ba40f6ac71cfe1d0d90665' /* VARSTRING(160) meta=160 nullable=0 is_null=0 */
###   @3=17 /* INT meta=0 nullable=1 is_null=0 */
###   @4='2' /* VARSTRING(64) meta=64 nullable=1 is_null=0 */
###   @5='casign_end=;方法public void com.xxx.risk.service.impl.ActivitiEventServiceImpl.updateAuditStatus(java.lang.String,java.lang.String);參數(shù){"auditStatus": 3}' /* VARSTRING(1020) meta=1020 nullable=0 is_null=0 */
###   @6='\x00\x01\x00\x16\x00\x0b\x00\x0b\x00\x05\x03\x00auditStatus' /* JSON meta=4 nullable=1 is_null=0 */
###   @7='2019-08-10 12:00:21' /* DATETIME(0) meta=0 nullable=0 is_null=0 */
###   @8='' /* VARSTRING(128) meta=128 nullable=0 is_null=0 */
###   @9='' /* VARSTRING(256) meta=256 nullable=0 is_null=0 */
# at 9055
#190810 12:00:20 server id 206195699  end_log_pos 9086 CRC32 0xbe19a1b1         Xid = 180175929
COMMIT/*!*/;

其實這種設計可以理解,作為一個高效的二進制格式,binlog里不存儲冗余度極高的列名可以很可觀的減少體積,并且,有了表名,表結構信息可以從MySQL information_schema表中拿到的,何必再存一份呢?

但是,debezium偷梁換柱,模擬從庫拉取binlog做解析,他并不是真正的從庫,是沒有information_schema表可以查的,只能從MySQL主庫查詢。但這個方式真的萬無一失嗎?

考慮下面的場景:

  • 15:00 BinlogReader正常消費

  • 15:05 Kafka Connect集群維護,暫停BinlogReader

  • 15:10 表A修改,在第3列后增加了1列(新增列不一定在尾部)

  • 15:15 Kafka Connect集群維護結束,恢復BinlogReader

這個場景中,BinlogReader在15:15恢復后,會繼續(xù)從15:05讀取并解析binlog,如果這時從MySQL讀取information_schema來獲取表A的schema信息,那么在15:05-15:10期間binlog和schema是不匹配的,也就無法解析出正確的數(shù)據(jù)。換句話說,如果debezium讀取binlog有延遲,這段時間主庫schema做了修改,那么讀取主庫information_schema的方案就會有問題了。

要解決這個問題,就要模擬information_schema機制,維護一份當前的schema快照,可這樣就夠了嗎?

回到前面提到的AbstractReader內部設計上,BinlogReader作為生產(chǎn)者,其將解析后的數(shù)據(jù)投遞到BlockingQueue中,如果在解析binlog過程中遇到了DDL語句(比如alter table add column ...),就會更新當前的schema快照。

怎樣分析Debezium MySQL模塊設計

這時,如果stop task,如上圖,BlockingQueue中還未被消費的records將被丟棄,如果包含schema修改之前解析出的record,那么下次binlog將從此處開始解析,而debezium存儲的schema快照卻已經(jīng)對應了修改后的,也會照成binlog和schema的不匹配。在這種邊緣場景下,僅保存當前schema快照的方案就行不通了。當然,后面我們還會提到這種模式同樣不能滿足很多其他場景。

事已至此,我們只好使用終極方案,把每一次的schema變更都保存下來,構造一條完整的schema時間線,確保在解析任一時刻的binlog事件時,都能找到對應版本的schema快照。

Debezium中使用DatabaseHistory來實現(xiàn)該功能,功能已經(jīng)滿足,不過實現(xiàn)的確是簡陋。MySQLDatabaseHistory會從同步任務啟動時,導出所有的create table語句(參見snapshot過程第6步),在此基礎上,追加記錄每一條DDL語句,debezium為這些DDL存儲提供了內存、文件、kafka topic實現(xiàn),其中kafka topic必須設置過期策略為永不過期。

當要恢復到任意時刻的schema快照時,從頭開始,逐條解析所有的DDL,疊加修改,直到指定時刻前最后一條DDL??梢钥吹?,這種實現(xiàn)方式的效率是比較低的,當任務持續(xù)數(shù)個月時,會累積大量的DDL(尤其是在阿里云RDS上,不知道阿里云改了什么,binlog里會產(chǎn)生海量的DDL),一次恢復可能需要數(shù)十分鐘乃至數(shù)個小時,并且若其中有一處DDL解析錯誤,會導致其后所有的快照都發(fā)生錯誤。開發(fā)者很早就意識到了這個問題,并且提出了一些改進想法,可能會在不久后有所進展。

看到這里,相信你已經(jīng)可以理解,為什么有些商業(yè)的數(shù)據(jù)同步引擎對同步過程中的schema變更有所限制,要完備的支持各種情況,著實不是一件容易的事情。

飛機上換引擎——在線加表

我們前面已經(jīng)多次提到了DBZ-175,現(xiàn)在我們我們開始討論這個精彩的設計。https://issues.redhat.com/browse/DBZ-175

注:該功能僅作為內部實驗特性,官方文檔未提及,有問題請參考JIRA討論或者閱讀調試源碼。

我們先來補充一些背景,debezium在同步數(shù)據(jù)過程中,允許通過table.whitelist和table.blacklist指定要同步的表。假設一開始,我們將table.whitelist配置為a,b兩張表,這兩張表完成了Snapshot階段,已經(jīng)穩(wěn)定的切換到Stream階段。這時,來了新的同步需求,要再同步c表,并且最好不干擾a,b兩張表的同步進度。那很自然的想法就是我再起一個新的同步任務來處理c表,長此以往,你會發(fā)現(xiàn)一個MySQL主庫上掛了很多個“從庫”,對MySQL主庫會照成一定壓力。所以,一個理想的方案便是:修改同步任務的table.whitelist后,debezium可以自動完成新增表的全量和增量同步,并且這個過程不會干擾原有的同步任務;當新老兩批同步任務進度相近時,合二為一,只使用同一個BinlogReader完成后續(xù)的stream同步。

簡單吧!怎么實現(xiàn)呢?就是我們前文暫時略過的ParallelSnapshotReader和ReconcilingBinlogReader。

首先描述一下在線加表后,整個Reader的結構。

ChainedReader
├── ParallelSnapshotReader
│   ├── OldTablesBinlogReader
│   └── ChainedReader
│       ├── NewTablesSnapshotReader
│       └── NewTablesBinlogReader
├── ReconcilingBinlogReader
└── UnifiedBinlogReader
  1. ParallelSnapshotReader就如其名字一樣,在保證不干擾OldTablesBinlogReader運行的情況下,并行的開始對新增表進行全量和增量同步;

  2. 當新增表進入stream階段后,OldTablesBinlogReader和NewTablesBinlogReader每一次拉取都會和對方作比較,當兩者的進度相差在一定時間內(默認時5分鐘)時,將兩者停止;

  3. 此時ParallelSnapshotReader退出,由ReconcilingBinlogReader將兩個BinlogReader進度同步,即將滯后者追平只領先者;

  4. 原有的兩個BinlogReader退出,新建的UnifiedBinlogReader從其位點繼續(xù)做新老所有表的stream解析,整個合并過程結束。

這段設計是我在翻閱了JIRA上數(shù)位開發(fā)者們歷經(jīng)幾年的討論記錄后,結合代碼調試整理得出的。回想當年剛畢業(yè)的我,接到這個在線加表需求時,本以為是不可能實現(xiàn)的事情,直到發(fā)現(xiàn)了這個設計,不由得是感嘆和驚喜。

這只是一個初版的實現(xiàn),在整個過程中,因為元數(shù)據(jù)的設計問題,并不支持schema的變更,可能正是由于這個原因,嚴謹?shù)拈_發(fā)者們選擇不公開這項功能,僅作為內部實驗特性。

從架構的層面梳理了debezium MySQL模塊的基本結構,希望能讓大家對代碼的整體的結構和設計理念有所理解,內部還有諸多細節(jié)等待探索。后面若有機會,我會分享一些在生產(chǎn)環(huán)境中遇到的問題和處理方案。

看完上述內容,你們對怎樣分析Debezium MySQL模塊設計有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。

AI