溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

[Spring cloud 一步步實現廣告系統] 15. 監(jiān)聽Binlog 增量索引準備

發(fā)布時間:2020-07-05 02:55:00 來源:網絡 閱讀:526 作者:zhangpan0614 欄目:編程語言
MySQL Binlog簡介
  • 什么是binlog?

一個二進制日志,用來記錄對數據發(fā)生或潛在發(fā)生更改的SQL語句,并以而進行的形式保存在磁盤中。

  • binlog 的作用?

最主要有3個用途:

  • 數據復制(主從同步)

    Mysql 的Master-Slave協議,讓Slave可以通過監(jiān)聽binlog實現數據復制,達到數據一致性目的

  • 數據恢復

    通過mysqlbinlog工具恢復數據

  • 增量備份
  • Binlog 變量

    • log_bin (Binlog 開關,使用show variables like 'log_bin';查看)

    • binlog_format (Binlog 日志格式,使用show variables like 'binlog_format';查看)

    日志格式總共有三種:

    • ROW, 僅保存記錄被修改的細節(jié),不記錄SQL語句上下文相關信息。(能清晰的記錄下每行數據的修改細節(jié),不需要記錄上下文相關信息,因此不會發(fā)生某些特定情況下的procedure、function以及trigger 的調用無法被準確復制的問題,任何情況下都可以被復制,且能加快從庫重放日志的效率,保證從庫數據的一致性)
    • STATEMENT,每一條修改數據的SQL都會被記錄。(只記錄執(zhí)行語句的細節(jié)和上下文環(huán)境,避免了記錄每一行的變化,在一些修改記錄較多的情況下,相比ROW類型能大大減少binlog的日志量,節(jié)約IO,提高性能。還可以用于實時的還原,同時主從版本可以不一樣,從服務器版本可以比主服務器版本高)
    • MIXED, 上述2種的混合使用
  • Binlog 管理

    • show master logs; 查看所有binlog的日志列表
    • show master status; 查看最后一個binlog日志編號名稱,以及最后一個事件技術的位置(position)
    • Flush logs; 刷新binlog,此刻開始產生一個新編號的binlog日志文件
    • reset master; 清空所有的binlog日志
  • Binlog 相關SQL show binlog events[in 'log_name'][from position][limit [offset,]row_count]

    [Spring cloud 一步步實現廣告系統] 15. 監(jiān)聽Binlog 增量索引準備

    [Spring cloud 一步步實現廣告系統] 15. 監(jiān)聽Binlog 增量索引準備

  • 常用的Binlog event

    • QUERY - 與數據無關的操作,begin、drop table、truncate table等等
    • TABLE_MAP - 記錄下一個操作所對應的表信息,存儲了數據庫名稱和表名稱
    • XID - 標記事務提交
    • WRITE_ROWS 插入數據,即insert操作
    • UPDATE_ROWS 更新數據,即update操作
    • DELETE_ROWS 刪除數據,即delete操作

Event包含header和data兩部分,header提供了event的創(chuàng)建時間,哪個服務器等信息,data部分提供的是針對該event的具體信息,如具體數據的修改。

Tip: binlog不會記錄數據表的列名

在接下來的實現中,我們會將自己的系統包裝成一個假的Mysql Slave,通過開源工具mysql-binlog-connector-java來實現監(jiān)聽binlog。

開源工具mysql-binlog-connector-java
  • 工具源碼:Github傳送門

  • 組件使用

    1.加依賴

    <!-- binlog 日志監(jiān)聽,解析開源工具類庫 -->
    <dependency>
      <groupId>com.github.shyiko</groupId>
      <artifactId>mysql-binlog-connector-java</artifactId>
      <version>0.18.1</version>
    </dependency>

    2.創(chuàng)建一個測試接口

    package com.sxzhongf.ad.service;
    
    import com.github.shyiko.mysql.binlog.BinaryLogClient;
    import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
    import com.github.shyiko.mysql.binlog.event.EventData;
    import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
    import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
    
    import java.io.IOException;
    
    /**
    * BinlogServiceTest for 測試Mysql binlog 監(jiān)控
    * {@code
    * Mysql8 連接提示 Client does not support authentication protocol requested by server; consider upgrading MySQL client 解決方法
    * USE mysql;
    * ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'password';
    * FLUSH PRIVILEGES;
    * }
    *
    * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
    */
    public class BinlogServiceTest {
    
      /**
       * --------Update-----------
       * UpdateRowsEventData{tableId=90, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5, 6, 7}, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
       *     {before=[11, 10, Test Bin Log, 1, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019], after=[11, 10, zhangpan test Binlog, 1, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019]}
       * ]}
       *
       * --------Insert-----------
       * WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
       *     [10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
       * ]}
       */
    
      public static void main(String[] args) throws IOException {
    
    //        //構造BinaryLogClient,填充mysql鏈接信息
          BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306,
                  "root", "12345678"
          );
    
          //設置需要讀取的Binlog的文件以及位置,否則,client會從"頭"開始讀取Binlog并監(jiān)聽
    //        client.setBinlogFilename("binlog.000035");
    //        client.setBinlogPosition();
    
          //給客戶端注冊監(jiān)聽器,實現對Binlog的監(jiān)聽和解析
          //event 就是監(jiān)聽到的Binlog變化信息,event包含header & data 兩部分
          client.registerEventListener(event -> {
              EventData data = event.getData();
              if (data instanceof UpdateRowsEventData) {
                  System.out.println("--------Update-----------");
                  System.out.println(data.toString());
              } else if (data instanceof WriteRowsEventData) {
                  System.out.println("--------Insert-----------");
                  System.out.println(data.toString());
              } else if (data instanceof DeleteRowsEventData) {
                  System.out.println("--------Delete-----------");
                  System.out.println(data.toString());
              }
          });
    
          client.connect();
      }
    }

    運行:

    八月 08, 2019 9:13:32 上午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
    信息: Connected to 127.0.0.1:3306 at binlog.000038/951 (sid:65535, cid:336)
    ...

    執(zhí)行sql update ad_user set user_status=1 where user_id=10;

[Spring cloud 一步步實現廣告系統] 15. 監(jiān)聽Binlog 增量索引準備

我們需要知道的是,我們的目的是實現對Mysql數據表的變更實現監(jiān)聽,并解析成我們想要的格式,也就是我們的java對象。根據上面我們看到的監(jiān)聽結果,我們知道了返回信息的大概內容,既然我們已經學會了簡單的使用BinaryLogClient 來監(jiān)聽binlog,接下來,我們需要定義一個監(jiān)聽器,來實現我們自己的業(yè)務內容。

因為我們只需要Event中的內容,那么我們也就只需要通過實現com.github.shyiko.mysql.binlog.BinaryLogClient.EventListener接口,來自定義一個監(jiān)聽器實現我們的業(yè)務即可。通過Event的內容,來判定是否需要處理當前event以及如何處理。

構造解析binlog的模版文件

我們監(jiān)聽binlog來構造增量數據的根本原因,是為了將我們的廣告投放系統廣告檢索系統 業(yè)務解耦,由于我們的檢索系統中沒有定義數據庫以及數據表的相關,所以,我們通過定義一份模版文件,通過解析模版文件來得到我們需要的數據庫和表信息,因為binlog的監(jiān)聽是不區(qū)分是哪個數據庫和哪個數據表信息的,我們可以通過模版來指定我們想要監(jiān)聽的部分。

{
  "database": "advertisement",
  "tableList": [
    {
      "tableName": "ad_plan",
      "level": 2,
      "insert": [
        {
          "column": "plan_id"
        },
        {
          "column": "user_id"
        },
        {
          "column": "plan_status"
        },
        {
          "column": "start_date"
        },
        {
          "column": "end_date"
        }
      ],
      "update": [
        {
          "column": "plan_id"
        },
        {
          "column": "user_id"
        },
        {
          "column": "plan_status"
        },
        {
          "column": "start_date"
        },
        {
          "column": "end_date"
        }
      ],
      "delete": [
        {
          "column": "plan_id"
        }
      ]
    },
    {
      "tableName": "ad_unit",
      "level": 3,
      "insert": [
        {
          "column": "unit_id"
        },
        {
          "column": "unit_status"
        },
        {
          "column": "position_type"
        },
        {
          "column": "plan_id"
        }
      ],
      "update": [
        {
          "column": "unit_id"
        },
        {
          "column": "unit_status"
        },
        {
          "column": "position_type"
        },
        {
          "column": "plan_id"
        }
      ],
      "delete": [
        {
          "column": "unit_id"
        }
      ]
    },
    {
      "tableName": "ad_creative",
      "level": 2,
      "insert": [
        {
          "column": "creative_id"
        },
        {
          "column": "type"
        },
        {
          "column": "material_type"
        },
        {
          "column": "height"
        },
        {
          "column": "width"
        },
        {
          "column": "audit_status"
        },
        {
          "column": "url"
        }
      ],
      "update": [
        {
          "column": "creative_id"
        },
        {
          "column": "type"
        },
        {
          "column": "material_type"
        },
        {
          "column": "height"
        },
        {
          "column": "width"
        },
        {
          "column": "audit_status"
        },
        {
          "column": "url"
        }
      ],
      "delete": [
        {
          "column": "creative_id"
        }
      ]
    },
    {
      "tableName": "relationship_creative_unit",
      "level": 3,
      "insert": [
        {
          "column": "creative_id"
        },
        {
          "column": "unit_id"
        }
      ],
      "update": [
      ],
      "delete": [
        {
          "column": "creative_id"
        },
        {
          "column": "unit_id"
        }
      ]
    },
    {
      "tableName": "ad_unit_district",
      "level": 4,
      "insert": [
        {
          "column": "unit_id"
        },
        {
          "column": "province"
        },
        {
          "column": "city"
        }
      ],
      "update": [
      ],
      "delete": [
        {
          "column": "unit_id"
        },
        {
          "column": "province"
        },
        {
          "column": "city"
        }
      ]
    },
    {
      "tableName": "ad_unit_hobby",
      "level": 4,
      "insert": [
        {
          "column": "unit_id"
        },
        {
          "column": "hobby_tag"
        }
      ],
      "update": [
      ],
      "delete": [
        {
          "column": "unit_id"
        },
        {
          "column": "hobby_tag"
        }
      ]
    },
    {
      "tableName": "ad_unit_keyword",
      "level": 4,
      "insert": [
        {
          "column": "unit_id"
        },
        {
          "column": "keyword"
        }
      ],
      "update": [
      ],
      "delete": [
        {
          "column": "unit_id"
        },
        {
          "column": "keyword"
        }
      ]
    }
  ]
}

上面的模版文件中,指定了一個數據庫為advertisement,大家可以方便添加多個監(jiān)聽庫。在數據庫下面,我們監(jiān)聽了幾個表的CUD操作以及每個操作所需要的字段信息。

  • 實現模版 —> Java Entity

    • 定義模版文件對應的實體
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class BinlogTemplate {
        //單數據庫對應
      private String database;
        //多表
      private List<JsonTable> tableList;
    }
    • 對應的json 中 table信息
    /**
    * JsonTable for 用于表示template.json中對應的表信息
    *
    * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
    */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class JsonTable {
      private String tableName;
      private Integer level;
    
      private List<Column> insert;
      private List<Column> update;
      private List<Column> delete;
    
      @Data
      @AllArgsConstructor
      @NoArgsConstructor
      public static class Column {
          private String columnName;
      }
    }
    • 讀取的對應表信息對象(最主要目的就是為了能將字段索引 映射到 字段名稱
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class TableTemplate {
      private String tableName;
      private String level;
    
        //操作類型 -> 多列
      private Map<OperationTypeEnum, List<String>> opTypeFieldSetMap = new HashMap<>();
    
      /**
       * Binlog日志中 字段索引 -> 字段名稱 的一個轉換映射
       * 因為binlog中不會顯示更新的列名是什么,它只會展示字段的索引,因此我們需要實現一次轉換
       */
      private Map<Integer, String> posMap = new HashMap<>();
    }
    • 解析模版文件到java對象
    @Data
    public class ParseCustomTemplate {
    
      private String database;
    
      /**
       * key -> TableName
       * value -> {@link TableTemplate}
       */
      private Map<String, TableTemplate> tableTemplateMap;
    
      public static ParseCustomTemplate parse(BinlogTemplate _template) {
          ParseCustomTemplate template = new ParseCustomTemplate();
          template.setDatabase(_template.getDatabase());
    
          for (JsonTable jsonTable : _template.getTableList()) {
              String name = jsonTable.getTableName();
              Integer level = jsonTable.getLevel();
    
              TableTemplate tableTemplate = new TableTemplate();
              tableTemplate.setTableName(name);
              tableTemplate.setLevel(level.toString());
              template.tableTemplateMap.put(name, tableTemplate);
    
              //遍歷操作類型對應的列信息
              Map<OperationTypeEnum, List<String>> operationTypeListMap = tableTemplate.getOpTypeFieldSetMap();
    
              for (JsonTable.Column column : jsonTable.getInsert()) {
                  getAndCreateIfNeed(
                          OperationTypeEnum.ADD,
                          operationTypeListMap,
                          ArrayList::new
                  ).add(column.getColumnName());
              }
    
              for (JsonTable.Column column : jsonTable.getUpdate()) {
                  getAndCreateIfNeed(
                          OperationTypeEnum.UPDATE,
                          operationTypeListMap,
                          ArrayList::new
                  ).add(column.getColumnName());
              }
    
              for (JsonTable.Column column : jsonTable.getDelete()) {
                  getAndCreateIfNeed(
                          OperationTypeEnum.DELETE,
                          operationTypeListMap,
                          ArrayList::new
                  ).add(column.getColumnName());
              }
          }
    
          return template;
      }
    
      /**
       * 從Map中獲取對象,如果不存在,創(chuàng)建一個
       */
      private static <T, R> R getAndCreateIfNeed(T key, Map<T, R> map, Supplier<R> factory) {
          return map.computeIfAbsent(key, k -> factory.get());
      }
    }
    • 解析 字段索引 -> 字段名稱 的一個轉換映射

    首先,我們來看一下binlog的具體日志信息:

    --------Insert-----------
    WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
    [10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
    --------Update-----------
    UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[
      {before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}
    

    可以看到,在日志中includedColumns只包含了{0, 1, 2, 3, 4, 5}位置信息,那么我們怎么能知道它具體代表的是哪個字段呢,接下來我們來實現這步映射關系,在實現之前,我們先來查詢一下數據庫中我們的表中字段所處的具體位置:

    sql> SELECT table_schema,table_name,column_name,ordinal_position FROM information_schema.COLUMNS
    WHERE TABLE_SCHEMA = 'advertisement' AND TABLE_NAME='ad_user'

    [Spring cloud 一步步實現廣告系統] 15. 監(jiān)聽Binlog 增量索引準備

    我們可以看到ordinal_position對應的是1-6,可是上面監(jiān)聽到的binlog日志索引是0-5,所以我們就可以看出來之間的對應關系。

    我們開始編碼實現,我們使用JdbcTemplate進行查詢數據庫信息:

    @Slf4j
    @Component
    public class TemplateHolder {
      private ParseCustomTemplate template;
    
      private final JdbcTemplate jdbcTemplate;
    
      private String SQL_SCHEMA = "SELECT TABLE_SCHEMA,TABLE_NAME,COLUMN_NAME,ORDINAL_POSITION FROM information_schema.COLUMNS " +
              "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?";
    
      @Autowired
      public TemplateHolder(JdbcTemplate jdbcTemplate) {
          this.jdbcTemplate = jdbcTemplate;
      }
    
      /**
       * 需要在容器加載的時候,就載入數據信息
       */
      @PostConstruct
      private void init() {
          loadJSON("template.json");
      }
    
      /**
       * 對外提供加載服務
       */
      public TableTemplate getTable(String tableName) {
          return template.getTableTemplateMap().get(tableName);
      }
    
      /**
       * 加載需要監(jiān)聽的binlog json文件
       */
      private void loadJSON(String path) {
          ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
          InputStream inputStream = classLoader.getResourceAsStream(path);
    
          try {
              BinlogTemplate binlogTemplate = JSON.parseObject(
                      inputStream,
                      Charset.defaultCharset(),
                      BinlogTemplate.class
              );
    
              this.template = ParseCustomTemplate.parse(binlogTemplate);
              loadMeta();
          } catch (IOException ex) {
              log.error((ex.getMessage()));
              throw new RuntimeException("fail to parse json file");
          }
      }
    
      /**
       * 加載元信息
       * 使用表索引到列名稱的映射關系
       */
      private void loadMeta() {
          for (Map.Entry<String, TableTemplate> entry : template.getTableTemplateMap().entrySet()) {
              TableTemplate table = entry.getValue();
    
              List<String> updateFields = table.getOpTypeFieldSetMap().get(
                      OperationTypeEnum.UPDATE
              );
              List<String> insertFields = table.getOpTypeFieldSetMap().get(
                      OperationTypeEnum.ADD
              );
              List<String> deleteFields = table.getOpTypeFieldSetMap().get(
                      OperationTypeEnum.DELETE
              );
    
              jdbcTemplate.query(SQL_SCHEMA, new Object[]{
                              template.getDatabase(), table.getTableName()
                      }, (rs, i) -> {
                          int pos = rs.getInt("ORDINAL_POSITION");
                          String colName = rs.getString("COLUMN_NAME");
    
                          if ((null != updateFields && updateFields.contains(colName))
                              || (null != insertFields && insertFields.contains(colName))
                              || (null != deleteFields && deleteFields.contains(colName))) {
                                       table.getPosMap().put(pos - 1, colName);
                          }
                          return null;
                      }
              );
          }
      }
    }
    • 監(jiān)聽binlog實現

    • 定義Event 解析所需要轉換的java對象
    @Data
    public class BinlogRowData {
    
        private TableTemplate tableTemplate;
    
        private EventType eventType;
    
        private List<Map<String, String>> before;
    
        private List<Map<String, String>> after;
    
    }
    • 定義binlog client BinaryLogClient
    /**
     * CustomBinlogClient for 自定義Binlog Client
     *
     * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
     * @since 2019/6/27
     */
    @Slf4j
    @Component
    public class CustomBinlogClient {
    
        private BinaryLogClient client;
    
        private final BinlogConfig config;
        private final AggregationListener listener;
    
        @Autowired
        public CustomBinlogClient(BinlogConfig config, AggregationListener listener) {
            this.config = config;
            this.listener = listener;
        }
    
        public void connect() {
            new Thread(() -> {
                client = new BinaryLogClient(
                        config.getHost(),
                        config.getPort(),
                        config.getUsername(),
                        config.getPassword()
                );
    
                if (!StringUtils.isEmpty(config.getBinlogName()) && !config.getPosition().equals(-1L)) {
                    client.setBinlogFilename(config.getBinlogName());
                    client.setBinlogPosition(config.getPosition());
                }
    
                try {
                    log.info("connecting to mysql start...");
                    client.connect();
                    log.info("connecting to mysql done!");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    
        public void disconnect() {
            try {
                log.info("disconnect to mysql start...");
                client.disconnect();
                log.info("disconnect to mysql done!");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    • 使用client注冊事件監(jiān)聽器com.github.shyiko.mysql.binlog.BinaryLogClient.EventListener
    /**
     * Ilistener for 為了后續(xù)擴展不同的實現
     *
     * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
     */
    public interface Ilistener {
    
        void register();
    
        void onEvent(BinlogRowData eventData);
    }
    • 監(jiān)聽Binlog, 收集mysql binlog datas
    @Slf4j
    @Component
    public class AggregationListener implements BinaryLogClient.EventListener {
    
        private String dbName;
        private String tbName;
    
        private Map<String, Ilistener> listenerMap = new HashMap<>();
    
        @Autowired
        private TemplateHolder templateHolder;
    
        private String genKey(String dbName, String tbName) {
            return dbName + ":" + tbName;
        }
    
        /**
         * 根據表實現注冊信息
         */
        public void register(String dbName, String tbName, Ilistener listener) {
            log.info("register : {}-{}", dbName, tbName);
            this.listenerMap.put(genKey(dbName, tbName), listener);
        }
    
        @Override
        public void onEvent(Event event) {
    
            EventType type = event.getHeader().getEventType();
            log.info("Event type: {}", type);
    
            //數據庫增刪改之前,肯定有一個table_map event 的binlog
            if (type == EventType.TABLE_MAP) {
                TableMapEventData data = event.getData();
                this.tbName = data.getTable();
                this.dbName = data.getDatabase();
                return;
            }
    
            //EXT_UPDATE_ROWS 是Mysql 8以上的type
            if (type != EventType.EXT_UPDATE_ROWS
                    && type != EventType.EXT_WRITE_ROWS
                    && type != EventType.EXT_DELETE_ROWS
                    ) {
                return;
            }
    
            // 檢查表名和數據庫名是否已經正確填充
            if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tbName)) {
                log.error("Meta data got error. tablename:{},database:{}", tbName, dbName);
                return;
            }
    
            //找出對應數據表敏感的監(jiān)聽器
            String key = genKey(this.dbName, this.tbName);
            Ilistener ilistener = this.listenerMap.get(key);
            if (null == ilistener) {
                log.debug("skip {}", key);
            }
    
            log.info("trigger event:{}", type.name());
    
            try {
                BinlogRowData rowData = convertEventData2BinlogRowData(event.getData());
                if (null == rowData) {
                    return;
                }
                rowData.setEventType(type);
                ilistener.onEvent(rowData);
    
            } catch (Exception e) {
                e.printStackTrace();
                log.error(e.getMessage());
            } finally {
                this.dbName = "";
                this.tbName = "";
            }
        }
    
        /**
         * 解析Binlog數據到Java實體對象的映射
         *
         * @param data binlog
         * @return java 對象
         */
        private BinlogRowData convertEventData2BinlogRowData(EventData data) {
            TableTemplate tableTemplate = templateHolder.getTable(tbName);
            if (null == tableTemplate) {
                log.warn("table {} not found.", tbName);
                return null;
            }
    
            List<Map<String, String>> afterMapList = new ArrayList<>();
    
            for (Serializable[] after : getAfterValues(data)) {
                Map<String, String> afterMap = new HashMap<>();
    
                int columnLength = after.length;
                for (int i = 0; i < columnLength; ++i) {
                    //取出當前位置對應的列名
                    String colName = tableTemplate.getPosMap().get(i);
                    //如果沒有,則說明不需要該列
                    if (null == colName) {
                        log.debug("ignore position: {}", i);
                        continue;
                    }
    
                    String colValue = after[i].toString();
                    afterMap.put(colName, colValue);
                }
    
                afterMapList.add(afterMap);
            }
    
            BinlogRowData binlogRowData = new BinlogRowData();
            binlogRowData.setAfter(afterMapList);
            binlogRowData.setTableTemplate(tableTemplate);
    
            return binlogRowData;
        }
    
        /**
         * 獲取不同事件的變更后數據
         * Add & Delete變更前數據假定為空
         */
        private List<Serializable[]> getAfterValues(EventData eventData) {
    
            if (eventData instanceof WriteRowsEventData) {
                return ((WriteRowsEventData) eventData).getRows();
            }
    
            if (eventData instanceof UpdateRowsEventData) {
                return ((UpdateRowsEventData) eventData).getRows()
                                                        .stream()
                                                        .map(Map.Entry::getValue)
                                                        .collect(Collectors.toList()
                                                        );
            }
    
            if (eventData instanceof DeleteRowsEventData) {
                return ((DeleteRowsEventData) eventData).getRows();
            }
    
            return Collections.emptyList();
        }
    }
    • 解析binlog 數據對象BinlogRowData ,用于增量索引的后續(xù)處理
    /**
     * MysqlRowData for 簡化{@link BinlogRowData} 以方便實現增量索引的實現
     *
     * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class MysqlRowData {
    
        //實現多數據的時候,需要傳遞數據庫名稱
        //private String database;
        private String tableName;
        private String level;
        private OperationTypeEnum operationTypeEnum;
        private List<Map<String, String>> fieldValueMap = new ArrayList<>();
    }

    因為我們需要將Binlog EventType轉換為我們的操作類型OperationTypeEnum,所以,我們在OperationTypeEnum中添加一個轉換方法:

    public enum OperationTypeEnum {
    ...
        public static OperationTypeEnum convert(EventType type) {
            switch (type) {
                case EXT_WRITE_ROWS:
                    return ADD;
                case EXT_UPDATE_ROWS:
                    return UPDATE;
                case EXT_DELETE_ROWS:
                    return DELETE;
                default:
                    return OTHER;
            }
        }
    }

    我們還需要定義一個表包含的各個列名稱的java類,方便我們后期對數據表的CUD操作:

    package com.sxzhongf.ad.mysql.constant;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Constant for 各個列名稱的java類,方便我們后期對數據表的CUD操作
     *
     * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
     */
    public class Constant {
    
        private static final String DATABASE_NAME = "advertisement";
    
        public static class AD_PLAN_TABLE_INFO {
    
            public static final String TABLE_NAME = "ad_plan";
    
            public static final String COLUMN_PLAN_ID = "plan_id";
            public static final String COLUMN_USER_ID = "user_id";
            public static final String COLUMN_PLAN_STATUS = "plan_status";
            public static final String COLUMN_START_DATE = "start_date";
            public static final String COLUMN_END_DATE = "end_date";
        }
    
        public static class AD_CREATIVE_TABLE_INFO {
    
            public static final String TABLE_NAME = "ad_creative";
    
            public static final String COLUMN_CREATIVE_ID = "creative_id";
            public static final String COLUMN_TYPE = "type";
            public static final String COLUMN_MATERIAL_TYPE = "material_type";
            public static final String COLUMN_HEIGHT = "height";
            public static final String COLUMN_WIDTH = "width";
            public static final String COLUMN_AUDIT_STATUS = "audit_status";
            public static final String COLUMN_URL = "url";
        }
    
        public static class AD_UNIT_TABLE_INFO {
    
            public static final String TABLE_NAME = "ad_unit";
    
            public static final String COLUMN_UNIT_ID = "unit_id";
            public static final String COLUMN_UNIT_STATUS = "unit_status";
            public static final String COLUNN_POSITION_TYPE = "position_type";
            public static final String COLUNN_PLAN_ID = "plan_id";
        }
    
        public static class RELATIONSHIP_CREATIVE_UNIT_TABLE_INFO {
    
            public static final String TABLE_NAME = "relationship_creative_unit";
    
            public static final String COLUMN_CREATIVE_ID = "creative_id";
            public static final String COLUMN_UNIT_ID = "unit_id";
        }
    
        public static class AD_UNIT_DISTRICT_TABLE_INFO {
    
            public static final String TABLE_NAME = "ad_unit_district";
    
            public static final String COLUMN_UNIT_ID = "unit_id";
            public static final String COLUMN_PROVINCE = "province";
            public static final String COLUMN_CITY = "city";
        }
    
        public static class AD_UNIT_KEYWORD_TABLE_INFO {
    
            public static final String TABLE_NAME = "ad_unit_keyword";
    
            public static final String COLUMN_UNIT_ID = "unit_id";
            public static final String COLUMN_KEYWORD = "keyword";
        }
    
        public static class AD_UNIT_HOBBY_TABLE_INFO {
    
            public static final String TABLE_NAME = "ad_unit_hobby";
    
            public static final String COLUMN_UNIT_ID = "unit_id";
            public static final String COLUMN_HOBBY_TAG = "hobby_tag";
        }
    
        //key -> 表名
        //value -> 數據庫名
        public static Map<String, String> table2db;
    
        static {
            table2db = new HashMap<>();
            table2db.put(AD_PLAN_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
            table2db.put(AD_CREATIVE_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
            table2db.put(AD_UNIT_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
            table2db.put(RELATIONSHIP_CREATIVE_UNIT_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
            table2db.put(AD_UNIT_DISTRICT_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
            table2db.put(AD_UNIT_HOBBY_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
            table2db.put(AD_UNIT_KEYWORD_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
        }
    }
    
向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI