您好,登錄后才能下訂單哦!
譯者前言
近期的主要工作是在為公司的 APP 增加搜索功能。因?yàn)橐灿龅搅诵枰?a title="關(guān)系型數(shù)據(jù)庫" target="_blank" href="http://www.kemok4.com/mysql/">關(guān)系型數(shù)據(jù)庫中的數(shù)據(jù)同步 ElasticSearch 中的問題,故抽了點(diǎn)時(shí)間翻譯了這篇官方的博文。最近,在數(shù)據(jù)同步方面也有些思考。
本篇文章的重點(diǎn)不在 Logstash 的 JDBC 插件的使用方法,而是數(shù)據(jù)同步會遇到的一些細(xì)節(jié)問題如何處理。我覺得,這些設(shè)計(jì)思想是通用的,無論你使用的何種方式進(jìn)行數(shù)據(jù)同步。
翻譯正文
為了利用 ElasticSearch 強(qiáng)大的搜索能力,大部分的業(yè)務(wù)都會在關(guān)系型數(shù)據(jù)庫的基礎(chǔ)上部署 ElasticSearch。這類場景下,保持 ElasticSearch 和關(guān)系型數(shù)據(jù)庫之間的數(shù)據(jù)同步是非常必要的。
本篇博文將會介紹如何通過 Logstash 實(shí)現(xiàn)在 MySQL 和 ElasticSearch 之間數(shù)據(jù)的高效復(fù)制與同步。
注:文中演示的代碼和方法都經(jīng)過在 MySQL 中的測試,理論上適應(yīng)于所有的關(guān)系型數(shù)據(jù)庫。
本文中,組件的相關(guān)信息如下:
MySQL: 8.0.16.
Elasticsearch: 7.1.1
Logstash: 7.1.1
Java: 1.8.0_162-b12
JDBC input plugin: v4.3.13
JDBC connector: Connector/J 8.0.16
數(shù)據(jù)同步概述
本文將會通過 Logstash 的 JDBC input 插件進(jìn)行 ElasticSearch 和 MySQL 之間的數(shù)據(jù)同步。從概念上講,JDBC 插件將通過周期性的輪詢以發(fā)現(xiàn)上次迭代后的新增和更新的數(shù)據(jù)。為了正常工作,幾個(gè)條件需要滿足:
ElasticSearch 中 _id 設(shè)置必須來自 MySQL 中 id 字段。它提供了 MySQL 和 ElasticSearch 之間文檔數(shù)據(jù)的映射關(guān)系。如果一條記錄在 MySQL 更新,那么,ElasticSearch 所有關(guān)聯(lián)文檔都應(yīng)該被重寫。要說明的是,重寫 ElasticSearch 中的文檔和更新操作的效率相同。在內(nèi)部實(shí)現(xiàn)上,一個(gè)更新操作由刪除一個(gè)舊文檔和創(chuàng)建一個(gè)新文檔兩部分組成。
當(dāng) MySQL 中插入或更新一條記錄時(shí),必須包含一個(gè)字段用于保存字段的插入或更新時(shí)間。如此一來, Logstash 就可以實(shí)現(xiàn)每次請求只獲取上次輪詢后更新或插入的記錄。Logstash 每次輪詢都會保存從 MySQL 中讀取到的最新的插入或更新時(shí)間,該時(shí)間大于上次輪詢最新時(shí)間。
如果滿足了上述條件,我們就可以配置 Logstash 周期性的從 MySQL 中讀取所有最新更新或插入的記錄,然后寫入到 Elasticsearch 中。
關(guān)于 Logstash 的配置代碼,本文稍后會給出。
MySQL 設(shè)置
MySQL 庫和表的配置如下:
CREATE DATABASE es_db
USE es_db
DROP TABLE IF EXISTS es_table
CREATE TABLE es_table (
id BIGINT(20) UNSIGNED NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY unique_id (id),
client_name VARCHAR(32) NOT NULL,
modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
配置中有幾點(diǎn)需要說明,如下:
es_table,MySQL 的數(shù)據(jù)表,我們將把它的數(shù)據(jù)同步到 ElasticSearch 中;
id,記錄的唯一標(biāo)識。注意,id 定義為主鍵的同時(shí),也定義為唯一建,可以保證每個(gè) id 在表中只出現(xiàn)一次。同步 ElasticSearch 時(shí),將會轉(zhuǎn)化為文檔的 _id;
client_name,表示用戶定義用來保存數(shù)據(jù)的字段,為使博文保持簡潔,我們只定義了一個(gè)字段,更多字段也很容易加入。接下來的演示,我們會更新該字段,用以說明不僅僅新插入記錄會同步到 MySQL,更新記錄同樣會同步到 MySQL;
modification_time,用于保存記錄的更新或插入時(shí)間,它使得 Logstash 可以在每次輪詢時(shí)只請求上次輪詢后新增更新的記錄;
insertion_time,該字段用于一條記錄插入時(shí)間,主要是為演示方便,對同步而言,并非必須;
MySQL 操作
前面設(shè)置完成,我們可以通過如下命令插入記錄:
INSERT INTO es_table (id, client_name) VALUES (<id>, <client name>);`
使用如下命令更新記錄:
UPDATE es_table SET client_name = <new client name> WHERE id=<id>;
使用如下命令更新插入記錄:
INSERT INTO es_table (id, client_name) VALUES (<id>, <client_name when created>) ON DUPLICATE KEY UPDATE client_name=<client name when updated>
同步代碼
Logstash 的 pipeline 配置代碼如下,它實(shí)現(xiàn)了前面描述的功能,從 MySQL 到 ElasticSearch 的數(shù)據(jù)同步。
input {
jdbc {
jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
jdbc_user => "<my username>"
jdbc_password => "<my password>"
jdbc_paging_enabled => true
tracking_column => "unix_ts_in_secs"
use_column_value => true
tracking_column_type => "numeric"
schedule => "*/5 * * * * *",
statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time)) > :sql_last_value AND modification_time < NOW() ORDER BY modification_time desc"
}
}
filter {
mutate {
copy => { "id" => "[@metadata][_id]"}
remove_field => ["id", "@version", "unix_ts_in_secs"]
}
}
output {
# stdout { codec => "rubydebug" }
elasticsearch {
index => "rdbms_sync_idx"
document_id => "%{[%metedata][_id]}"
}
}
關(guān)于 Pipeline 配置的幾點(diǎn)說明,如下:
tracking_column
此處配置為 "unix_ts_in_secs"。它被用于追蹤最新的記錄,并被保存在 .logstash_jdbc_last_run 文件中,下一次輪詢將以這個(gè)邊界位置為準(zhǔn)進(jìn)行記錄獲取。SELECT 語句中,可通過 :sql_last_value 訪問該配置字段的值。
unix_ts_in_secs
由 SELECT 語句生成,是 "modification_time" 的 UNIX TIMESTAMP。它被前面討論的 "track_column" 引用。使用 UNIX TIMESTAMP,而非其他時(shí)間形式,可以減少復(fù)雜性,防止時(shí)區(qū)導(dǎo)致的時(shí)間不一致問題。
sql_last_value
內(nèi)建的配置參數(shù),指定每次輪詢的開始位置。在 input 配置中,可被 SELECT 語句引用。在每次輪詢開始前,從 .logstash_jdbc_last_run 中讀取,此案例中,即為 "unix_ts_in_secs" 的最近值。如此便可保證每次輪詢只獲取最新插入和更新的記錄。
schedule
通過 cron 語法指定輪詢的執(zhí)行周期,例子中,"/5 " 表示每 5 秒輪詢一次。
modification_time < NOW()
SELECT 語句查詢條件的一部分,當(dāng)前解釋不清,具體情況待下面的章節(jié)再作介紹。
filter
該配置指定將 MySQL 中的 id 復(fù)制到 metadata 字段 _id 中,用以確保 ElasticSearch 中的文檔寫入正確的 _id。而之所以使用 metadata,因?yàn)樗桥R時(shí)的,不會使文檔中產(chǎn)生新的字段。同時(shí),我們也會把不希望寫入 Elasticsearch 的字段 id 和 @version 移除。
output
在 output 輸出段的配置,我們指定了文檔應(yīng)該被輸出到 ElasticSearch,并且設(shè)置輸出文檔 _id 為 filter 段創(chuàng)建的 metadata 的 _id。如果需要調(diào)試,注釋部分的 rubydebug 可以實(shí)現(xiàn)。
SELECT 語句的正確性分析
接下來,我們將開始解釋為什么 SELECT 語句中包含 modification_time < NOW() 是非常重要的。為了解釋這個(gè)問題,我們將引入兩個(gè)反例演示說明,為什么下面介紹的兩種最直觀的方法是錯(cuò)誤的。還有,為什么 modification_time < Now() 可以克服這些問題。
直觀場景一
當(dāng) where 子句中僅僅包含 UNIX_TIMESTAMP(modification_time) > :sql_last_value,而沒有 modification < Now() 的情況下,工作是否正常。這個(gè)場景下,SELECT 語句是如下形式:
statement => "SELECT *, UNIX_TIMESTAMP(modification_time)
AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) >
:sql_last_value) ORDER BY modification_time ASC"
粗略一看,似乎沒發(fā)現(xiàn)什么問題,應(yīng)該可以正常工作。但其實(shí),這里有一些邊界情況,可能導(dǎo)致一些文檔的丟失。舉個(gè)例子,假設(shè) MySQL 每秒插入兩個(gè)文檔,Logstash 每 5 秒執(zhí)行一次。如下圖所示,時(shí)間范圍 T0 至 T10,數(shù)據(jù)記錄 R1 至 R22。
Logstash 的第一次輪詢發(fā)生在 T5 時(shí)刻,讀取記錄 R1 至 R11,即圖中青色區(qū)域。此時(shí),sql_last_value 即為 T5,這個(gè)時(shí)間是從 R11 中獲取到的。
如果,當(dāng) Logstash 完成從 MySQL 讀取數(shù)據(jù)后,同樣在 T5 時(shí)刻,又有一條記錄插入到 MySQL 中。 而下一次的輪詢只會拉取到大于 T5 的記錄,這意味著 R12 將會丟失。如圖所示,青色和灰色區(qū)域分別表示當(dāng)次和上次輪詢獲取到的記錄。
注意,這類場景下的 R12 將永遠(yuǎn)不會再被寫入到 ElasticSearch。
直觀場景二
為了解決這個(gè)問題,或許有人會想,如果把 where 子句中的大于(>)改為大于等于(>=)是否可行。SELECT 語句如下
statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) >= :sql_last_value) ORDER BY modification_time ASC"
這種方式其實(shí)也不理想。這種情況下,某些文檔可能會被兩次讀取,重復(fù)寫入到 ElasticSearch 中。雖然這不影響結(jié)果的正確性,但卻做了多余的工作。如下圖所示,Logstash 的首次輪詢和場景一相同,青色區(qū)域表示已經(jīng)讀取的記錄。
Logstash 的第二次輪詢將會讀取所有大于等于 T5 的記錄。如下圖所示,注意 R11,即紫色區(qū)域,將會被再次發(fā)送到 ElasticSearch 中。
這兩種場景的實(shí)現(xiàn)效果都不理想。場景一會導(dǎo)致數(shù)據(jù)丟失,這是無法容忍的。場景二,存在重復(fù)讀取寫入的問題,雖然對數(shù)據(jù)正確性沒有影響,但執(zhí)行了多余的 IO。
終極方案
前面的兩場方案都不可行,我們需要繼續(xù)尋找其他解決方案。其實(shí)也很簡單,通過指定 (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()),我們就可以保證每條記錄有且只發(fā)送一次。
如下圖所示,Logstash 輪詢發(fā)生在 T5 時(shí)刻。因?yàn)橹付?modification_time < NOW(),文檔只會讀取到 T4 時(shí)刻,并且 sql_last_value 的值也將會被設(shè)置為 T4。
開始下一次的輪詢,當(dāng)前時(shí)間 T10。
由于設(shè)置了 UNIX_TIMESTAMP(modification_time) > :sql_last_value,并且當(dāng)前 sql_last_value 為 T4,因此,本次的輪詢將從 T5 開始。而 modification_time < NOW() 決定了只有時(shí)間小于等于 T9 的記錄才會被讀取。最后,sql_last_value 也將被設(shè)置為 T9。
如此,MySQL 中的每個(gè)記錄就可以做到都能被精確讀取了一次,如此就可以避免每次輪詢可能導(dǎo)致的當(dāng)前時(shí)間間隔內(nèi)數(shù)據(jù)丟失或重復(fù)讀取的問題。
系統(tǒng)測試
簡單的測試可以幫助我們驗(yàn)證配置是否如我們所愿。我們可以寫入一些數(shù)據(jù)至數(shù)據(jù)庫,如下:
INSERT INTO es_table (id, client_name) VALUES (1, 'Jim Carrey');
INSERT INTO es_table (id, client_name) VALUES (2, 'Mike Myers');
INSERT INTO es_table (id, client_name) VALUES (3, 'Bryan Adams');
一旦 JDBC 輸入插件觸發(fā)執(zhí)行,將會從 MySQL 中讀取所有記錄,并寫入到 ElasticSearch 中。我們可以通過查詢語句查看 ElasticSearch 中的文檔。
`GET rdbms_sync_idx/_search`
執(zhí)行結(jié)果如下:
"hits" : {
"total" : {
"value" : 3,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "rdbms_sync_idx",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"insertion_time" : "2019-06-18T12:58:56.000Z",
"@timestamp" : "2019-06-18T13:04:27.436Z",
"modification_time" : "2019-06-18T12:58:56.000Z",
"client_name" : "Jim Carrey"
}
},
Etc …
更新 id=1 的文檔,如下:
UPDATE es_table SET client_name = 'Jimbo Kerry' WHERE id=1;
通過 _id = 1,可以實(shí)現(xiàn)文檔的正確更新。通過執(zhí)行如下命令查看文檔:
GET rdbms_sync_idx/_doc/1
結(jié)果如下:
{
"_index" : "rdbms_sync_idx",
"_type" : "_doc",
"_id" : "1",
"_version" : 2,
"_seq_no" : 3,
"_primary_term" : 1,
"found" : true,
"_source" : {
"insertion_time" : "2019-06-18T12:58:56.000Z",
"@timestamp" : "2019-06-18T13:09:30.300Z",
"modification_time" : "2019-06-18T13:09:28.000Z",
"client_name" : "Jimbo Kerry"
}
}
文檔 _version 被設(shè)置為 2,并且 modification_time 和 insertion_time 已經(jīng)不一樣了,client_name 已經(jīng)正確更新。而 @timestamp,不是我們需要關(guān)注的,它是 Logstash 默認(rèn)添加的。
更新添加 upsert 執(zhí)行語句如下:
INSERT INTO es_table (id, client_name) VALUES (4, 'Bob is new') ON DUPLICATE KEY UPDATE client_name='Bob exists already';
復(fù)制代碼和之前一樣,我們可以通過查看 ElasticSearch 中相應(yīng)文檔,便可驗(yàn)證同步的正確性。
文檔刪除
不知道你是否已經(jīng)發(fā)現(xiàn),如果一個(gè)文檔從 MySQL 中刪除,并不會同步到 ElasticSearch 。關(guān)于這個(gè)問題,列舉一些可供我們考慮的方案,如下:
MySQL 中的記錄可通過包含 is_deleted 字段用以表明該條記錄是否有效。一旦發(fā)生更新,is_deleted 也會同步更新到 ElasticSearch 中。如果通過這種方式,在執(zhí)行 MySQL 或 ElasticSearch 查詢時(shí),我們需要重寫查詢語句來過濾掉 is_deleted 為 true 的記錄。同時(shí),需要一些后臺進(jìn)程將 MySQL 和 ElasticSearch 中的這些文檔刪除。
另一個(gè)可選方案,應(yīng)用系統(tǒng)負(fù)責(zé) MySQL 和 ElasticSearch 中數(shù)據(jù)的刪除,即應(yīng)用系統(tǒng)在刪除 MySQL 中數(shù)據(jù)的同時(shí),也要負(fù)責(zé)將 ElasticSearch 中相應(yīng)的文檔刪除。
總結(jié)
本文介紹了如何通過 Logstash 進(jìn)行關(guān)系型數(shù)據(jù)庫和 ElasticSearch 之間的數(shù)據(jù)同步。文中以 MySQL 為例,但理論上,演示的方法和代碼也應(yīng)該同樣適應(yīng)于其他的關(guān)系型數(shù)據(jù)庫。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。