溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

ES 譯文之如何使用 Logstash 實(shí)現(xiàn)關(guān)系型數(shù)據(jù)庫與 ElasticSearch 之間的數(shù)據(jù)同

發(fā)布時(shí)間:2020-06-26 16:17:36 來源:網(wǎng)絡(luò) 閱讀:3056 作者:kukelook 欄目:編程語言

譯者前言
近期的主要工作是在為公司的 APP 增加搜索功能。因?yàn)橐灿龅搅诵枰?a title="關(guān)系型數(shù)據(jù)庫" target="_blank" href="http://www.kemok4.com/mysql/">關(guān)系型數(shù)據(jù)庫中的數(shù)據(jù)同步 ElasticSearch 中的問題,故抽了點(diǎn)時(shí)間翻譯了這篇官方的博文。最近,在數(shù)據(jù)同步方面也有些思考。
本篇文章的重點(diǎn)不在 Logstash 的 JDBC 插件的使用方法,而是數(shù)據(jù)同步會遇到的一些細(xì)節(jié)問題如何處理。我覺得,這些設(shè)計(jì)思想是通用的,無論你使用的何種方式進(jìn)行數(shù)據(jù)同步。
翻譯正文

為了利用 ElasticSearch 強(qiáng)大的搜索能力,大部分的業(yè)務(wù)都會在關(guān)系型數(shù)據(jù)庫的基礎(chǔ)上部署 ElasticSearch。這類場景下,保持 ElasticSearch 和關(guān)系型數(shù)據(jù)庫之間的數(shù)據(jù)同步是非常必要的。
本篇博文將會介紹如何通過 Logstash 實(shí)現(xiàn)在 MySQL 和 ElasticSearch 之間數(shù)據(jù)的高效復(fù)制與同步。
注:文中演示的代碼和方法都經(jīng)過在 MySQL 中的測試,理論上適應(yīng)于所有的關(guān)系型數(shù)據(jù)庫。
本文中,組件的相關(guān)信息如下:

MySQL:  8.0.16.
Elasticsearch: 7.1.1
Logstash: 7.1.1
Java: 1.8.0_162-b12
JDBC input plugin: v4.3.13
JDBC connector: Connector/J 8.0.16

數(shù)據(jù)同步概述
本文將會通過 Logstash 的 JDBC input 插件進(jìn)行 ElasticSearch 和 MySQL 之間的數(shù)據(jù)同步。從概念上講,JDBC 插件將通過周期性的輪詢以發(fā)現(xiàn)上次迭代后的新增和更新的數(shù)據(jù)。為了正常工作,幾個(gè)條件需要滿足:
ElasticSearch 中 _id 設(shè)置必須來自 MySQL 中 id 字段。它提供了 MySQL 和 ElasticSearch 之間文檔數(shù)據(jù)的映射關(guān)系。如果一條記錄在 MySQL 更新,那么,ElasticSearch 所有關(guān)聯(lián)文檔都應(yīng)該被重寫。要說明的是,重寫 ElasticSearch 中的文檔和更新操作的效率相同。在內(nèi)部實(shí)現(xiàn)上,一個(gè)更新操作由刪除一個(gè)舊文檔和創(chuàng)建一個(gè)新文檔兩部分組成。
當(dāng) MySQL 中插入或更新一條記錄時(shí),必須包含一個(gè)字段用于保存字段的插入或更新時(shí)間。如此一來, Logstash 就可以實(shí)現(xiàn)每次請求只獲取上次輪詢后更新或插入的記錄。Logstash 每次輪詢都會保存從 MySQL 中讀取到的最新的插入或更新時(shí)間,該時(shí)間大于上次輪詢最新時(shí)間。
如果滿足了上述條件,我們就可以配置 Logstash 周期性的從 MySQL 中讀取所有最新更新或插入的記錄,然后寫入到 Elasticsearch 中。
關(guān)于 Logstash 的配置代碼,本文稍后會給出。
MySQL 設(shè)置
MySQL 庫和表的配置如下:

CREATE DATABASE es_db

USE es_db

DROP TABLE IF EXISTS es_table

CREATE TABLE es_table (
  id BIGINT(20) UNSIGNED NOT NULL,
  PRIMARY KEY (id),
  UNIQUE KEY unique_id (id),
  client_name VARCHAR(32) NOT NULL,
  modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

配置中有幾點(diǎn)需要說明,如下:

es_table,MySQL 的數(shù)據(jù)表,我們將把它的數(shù)據(jù)同步到 ElasticSearch 中;
id,記錄的唯一標(biāo)識。注意,id 定義為主鍵的同時(shí),也定義為唯一建,可以保證每個(gè) id 在表中只出現(xiàn)一次。同步 ElasticSearch 時(shí),將會轉(zhuǎn)化為文檔的 _id;
client_name,表示用戶定義用來保存數(shù)據(jù)的字段,為使博文保持簡潔,我們只定義了一個(gè)字段,更多字段也很容易加入。接下來的演示,我們會更新該字段,用以說明不僅僅新插入記錄會同步到 MySQL,更新記錄同樣會同步到 MySQL;
modification_time,用于保存記錄的更新或插入時(shí)間,它使得 Logstash 可以在每次輪詢時(shí)只請求上次輪詢后新增更新的記錄;
insertion_time,該字段用于一條記錄插入時(shí)間,主要是為演示方便,對同步而言,并非必須;

MySQL 操作
前面設(shè)置完成,我們可以通過如下命令插入記錄:

INSERT INTO es_table (id, client_name) VALUES (<id>, <client name>);`

使用如下命令更新記錄:

UPDATE es_table SET client_name = <new client name> WHERE id=<id>;

使用如下命令更新插入記錄:

INSERT INTO es_table (id, client_name) VALUES (<id>, <client_name when created>) ON DUPLICATE KEY UPDATE client_name=<client name when updated>

同步代碼
Logstash 的 pipeline 配置代碼如下,它實(shí)現(xiàn)了前面描述的功能,從 MySQL 到 ElasticSearch 的數(shù)據(jù)同步。

input {
  jdbc {
    jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
    jdbc_user => "<my username>"
    jdbc_password => "<my password>"
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *",
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time)) > :sql_last_value AND modification_time < NOW() ORDER BY modification_time desc"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}

output {
  # stdout { codec => "rubydebug" }
  elasticsearch {
    index => "rdbms_sync_idx"
    document_id => "%{[%metedata][_id]}"
  }
}

關(guān)于 Pipeline 配置的幾點(diǎn)說明,如下:

tracking_column

此處配置為 "unix_ts_in_secs"。它被用于追蹤最新的記錄,并被保存在 .logstash_jdbc_last_run 文件中,下一次輪詢將以這個(gè)邊界位置為準(zhǔn)進(jìn)行記錄獲取。SELECT 語句中,可通過 :sql_last_value 訪問該配置字段的值。

unix_ts_in_secs

由 SELECT 語句生成,是 "modification_time" 的 UNIX TIMESTAMP。它被前面討論的 "track_column" 引用。使用 UNIX TIMESTAMP,而非其他時(shí)間形式,可以減少復(fù)雜性,防止時(shí)區(qū)導(dǎo)致的時(shí)間不一致問題。

sql_last_value

內(nèi)建的配置參數(shù),指定每次輪詢的開始位置。在 input 配置中,可被 SELECT 語句引用。在每次輪詢開始前,從 .logstash_jdbc_last_run 中讀取,此案例中,即為 "unix_ts_in_secs" 的最近值。如此便可保證每次輪詢只獲取最新插入和更新的記錄。

schedule

通過 cron 語法指定輪詢的執(zhí)行周期,例子中,"/5 " 表示每 5 秒輪詢一次。

modification_time < NOW()

SELECT 語句查詢條件的一部分,當(dāng)前解釋不清,具體情況待下面的章節(jié)再作介紹。

filter

該配置指定將 MySQL 中的 id 復(fù)制到 metadata 字段 _id 中,用以確保 ElasticSearch 中的文檔寫入正確的 _id。而之所以使用 metadata,因?yàn)樗桥R時(shí)的,不會使文檔中產(chǎn)生新的字段。同時(shí),我們也會把不希望寫入 Elasticsearch 的字段 id 和 @version 移除。

output

在 output 輸出段的配置,我們指定了文檔應(yīng)該被輸出到 ElasticSearch,并且設(shè)置輸出文檔 _id 為 filter 段創(chuàng)建的 metadata 的 _id。如果需要調(diào)試,注釋部分的 rubydebug 可以實(shí)現(xiàn)。
SELECT 語句的正確性分析
接下來,我們將開始解釋為什么 SELECT 語句中包含 modification_time < NOW() 是非常重要的。為了解釋這個(gè)問題,我們將引入兩個(gè)反例演示說明,為什么下面介紹的兩種最直觀的方法是錯(cuò)誤的。還有,為什么 modification_time < Now() 可以克服這些問題。
直觀場景一
當(dāng) where 子句中僅僅包含 UNIX_TIMESTAMP(modification_time) > :sql_last_value,而沒有 modification < Now() 的情況下,工作是否正常。這個(gè)場景下,SELECT 語句是如下形式:

statement => "SELECT *, UNIX_TIMESTAMP(modification_time)
AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) >
:sql_last_value) ORDER BY modification_time ASC"

粗略一看,似乎沒發(fā)現(xiàn)什么問題,應(yīng)該可以正常工作。但其實(shí),這里有一些邊界情況,可能導(dǎo)致一些文檔的丟失。舉個(gè)例子,假設(shè) MySQL 每秒插入兩個(gè)文檔,Logstash 每 5 秒執(zhí)行一次。如下圖所示,時(shí)間范圍 T0 至 T10,數(shù)據(jù)記錄 R1 至 R22。
ES 譯文之如何使用 Logstash 實(shí)現(xiàn)關(guān)系型數(shù)據(jù)庫與 ElasticSearch 之間的數(shù)據(jù)同

Logstash 的第一次輪詢發(fā)生在 T5 時(shí)刻,讀取記錄 R1 至 R11,即圖中青色區(qū)域。此時(shí),sql_last_value 即為 T5,這個(gè)時(shí)間是從 R11 中獲取到的。
如果,當(dāng) Logstash 完成從 MySQL 讀取數(shù)據(jù)后,同樣在 T5 時(shí)刻,又有一條記錄插入到 MySQL 中。 而下一次的輪詢只會拉取到大于 T5 的記錄,這意味著 R12 將會丟失。如圖所示,青色和灰色區(qū)域分別表示當(dāng)次和上次輪詢獲取到的記錄。
ES 譯文之如何使用 Logstash 實(shí)現(xiàn)關(guān)系型數(shù)據(jù)庫與 ElasticSearch 之間的數(shù)據(jù)同

注意,這類場景下的 R12 將永遠(yuǎn)不會再被寫入到 ElasticSearch。
直觀場景二
為了解決這個(gè)問題,或許有人會想,如果把 where 子句中的大于(>)改為大于等于(>=)是否可行。SELECT 語句如下

statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) >= :sql_last_value) ORDER BY modification_time ASC"

這種方式其實(shí)也不理想。這種情況下,某些文檔可能會被兩次讀取,重復(fù)寫入到 ElasticSearch 中。雖然這不影響結(jié)果的正確性,但卻做了多余的工作。如下圖所示,Logstash 的首次輪詢和場景一相同,青色區(qū)域表示已經(jīng)讀取的記錄。
ES 譯文之如何使用 Logstash 實(shí)現(xiàn)關(guān)系型數(shù)據(jù)庫與 ElasticSearch 之間的數(shù)據(jù)同

Logstash 的第二次輪詢將會讀取所有大于等于 T5 的記錄。如下圖所示,注意 R11,即紫色區(qū)域,將會被再次發(fā)送到 ElasticSearch 中。
ES 譯文之如何使用 Logstash 實(shí)現(xiàn)關(guān)系型數(shù)據(jù)庫與 ElasticSearch 之間的數(shù)據(jù)同

這兩種場景的實(shí)現(xiàn)效果都不理想。場景一會導(dǎo)致數(shù)據(jù)丟失,這是無法容忍的。場景二,存在重復(fù)讀取寫入的問題,雖然對數(shù)據(jù)正確性沒有影響,但執(zhí)行了多余的 IO。
終極方案
前面的兩場方案都不可行,我們需要繼續(xù)尋找其他解決方案。其實(shí)也很簡單,通過指定 (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()),我們就可以保證每條記錄有且只發(fā)送一次。
如下圖所示,Logstash 輪詢發(fā)生在 T5 時(shí)刻。因?yàn)橹付?modification_time < NOW(),文檔只會讀取到 T4 時(shí)刻,并且 sql_last_value 的值也將會被設(shè)置為 T4。
ES 譯文之如何使用 Logstash 實(shí)現(xiàn)關(guān)系型數(shù)據(jù)庫與 ElasticSearch 之間的數(shù)據(jù)同

開始下一次的輪詢,當(dāng)前時(shí)間 T10。
由于設(shè)置了 UNIX_TIMESTAMP(modification_time) > :sql_last_value,并且當(dāng)前 sql_last_value 為 T4,因此,本次的輪詢將從 T5 開始。而 modification_time < NOW() 決定了只有時(shí)間小于等于 T9 的記錄才會被讀取。最后,sql_last_value 也將被設(shè)置為 T9。
ES 譯文之如何使用 Logstash 實(shí)現(xiàn)關(guān)系型數(shù)據(jù)庫與 ElasticSearch 之間的數(shù)據(jù)同

如此,MySQL 中的每個(gè)記錄就可以做到都能被精確讀取了一次,如此就可以避免每次輪詢可能導(dǎo)致的當(dāng)前時(shí)間間隔內(nèi)數(shù)據(jù)丟失或重復(fù)讀取的問題。
系統(tǒng)測試
簡單的測試可以幫助我們驗(yàn)證配置是否如我們所愿。我們可以寫入一些數(shù)據(jù)至數(shù)據(jù)庫,如下:

INSERT INTO es_table (id, client_name) VALUES (1, 'Jim Carrey');
INSERT INTO es_table (id, client_name) VALUES (2, 'Mike Myers');
INSERT INTO es_table (id, client_name) VALUES (3, 'Bryan Adams');

一旦 JDBC 輸入插件觸發(fā)執(zhí)行,將會從 MySQL 中讀取所有記錄,并寫入到 ElasticSearch 中。我們可以通過查詢語句查看 ElasticSearch 中的文檔。

`GET rdbms_sync_idx/_search`

執(zhí)行結(jié)果如下:

"hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "rdbms_sync_idx",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "insertion_time" : "2019-06-18T12:58:56.000Z",
          "@timestamp" : "2019-06-18T13:04:27.436Z",
          "modification_time" : "2019-06-18T12:58:56.000Z",
          "client_name" : "Jim Carrey"
        }
      },
Etc …

更新 id=1 的文檔,如下:

UPDATE es_table SET client_name = 'Jimbo Kerry' WHERE id=1;

通過 _id = 1,可以實(shí)現(xiàn)文檔的正確更新。通過執(zhí)行如下命令查看文檔:

GET rdbms_sync_idx/_doc/1

結(jié)果如下:

{
  "_index" : "rdbms_sync_idx",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "_seq_no" : 3,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "insertion_time" : "2019-06-18T12:58:56.000Z",
    "@timestamp" : "2019-06-18T13:09:30.300Z",
    "modification_time" : "2019-06-18T13:09:28.000Z",
    "client_name" : "Jimbo Kerry"
  }
}

文檔 _version 被設(shè)置為 2,并且 modification_time 和 insertion_time 已經(jīng)不一樣了,client_name 已經(jīng)正確更新。而 @timestamp,不是我們需要關(guān)注的,它是 Logstash 默認(rèn)添加的。
更新添加 upsert 執(zhí)行語句如下:

INSERT INTO es_table (id, client_name) VALUES (4, 'Bob is new') ON DUPLICATE KEY UPDATE client_name='Bob exists already';

復(fù)制代碼和之前一樣,我們可以通過查看 ElasticSearch 中相應(yīng)文檔,便可驗(yàn)證同步的正確性。
文檔刪除
不知道你是否已經(jīng)發(fā)現(xiàn),如果一個(gè)文檔從 MySQL 中刪除,并不會同步到 ElasticSearch 。關(guān)于這個(gè)問題,列舉一些可供我們考慮的方案,如下:
MySQL 中的記錄可通過包含 is_deleted 字段用以表明該條記錄是否有效。一旦發(fā)生更新,is_deleted 也會同步更新到 ElasticSearch 中。如果通過這種方式,在執(zhí)行 MySQL 或 ElasticSearch 查詢時(shí),我們需要重寫查詢語句來過濾掉 is_deleted 為 true 的記錄。同時(shí),需要一些后臺進(jìn)程將 MySQL 和 ElasticSearch 中的這些文檔刪除。
另一個(gè)可選方案,應(yīng)用系統(tǒng)負(fù)責(zé) MySQL 和 ElasticSearch 中數(shù)據(jù)的刪除,即應(yīng)用系統(tǒng)在刪除 MySQL 中數(shù)據(jù)的同時(shí),也要負(fù)責(zé)將 ElasticSearch 中相應(yīng)的文檔刪除。
總結(jié)
本文介紹了如何通過 Logstash 進(jìn)行關(guān)系型數(shù)據(jù)庫和 ElasticSearch 之間的數(shù)據(jù)同步。文中以 MySQL 為例,但理論上,演示的方法和代碼也應(yīng)該同樣適應(yīng)于其他的關(guān)系型數(shù)據(jù)庫。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI