溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

什么是增量索引實(shí)現(xiàn)以及投送數(shù)據(jù)到MQ kafka

發(fā)布時(shí)間:2021-10-21 09:53:40 來源:億速云 閱讀:88 作者:柒染 欄目:大數(shù)據(jù)

今天就跟大家聊聊有關(guān)什么是增量索引實(shí)現(xiàn)以及投送數(shù)據(jù)到MQ kafka,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

實(shí)現(xiàn)增量數(shù)據(jù)索引

我們將根據(jù)binlog 的數(shù)據(jù)對(duì)象,來實(shí)現(xiàn)增量數(shù)據(jù)的處理,我們構(gòu)建廣告的增量數(shù)據(jù),其實(shí)說白了就是為了在后期能把廣告投放到索引服務(wù),實(shí)現(xiàn)增量數(shù)據(jù)到增量索引的生成。

  • 定義一個(gè)投遞增量數(shù)據(jù)的接口(接收參數(shù)為我們上一節(jié)定義的binlog日志的轉(zhuǎn)換對(duì)象)

/**
 * ISender for 投遞增量數(shù)據(jù) 方法定義接口
 *
 * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
 */
public interface ISender {

    void sender(MysqlRowData rowData);
}
  • 創(chuàng)建增量索引監(jiān)聽器

/**
 * IncrementListener for 增量數(shù)據(jù)實(shí)現(xiàn)監(jiān)聽
 *
 * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
 * @since 2019/6/27
 */
@Slf4j
@Component
public class IncrementListener implements Ilistener {

    private final AggregationListener aggregationListener;

    @Autowired
    public IncrementListener(AggregationListener aggregationListener) {
        this.aggregationListener = aggregationListener;
    }

    //根據(jù)名稱選擇要注入的投遞方式
    @Resource(name = "indexSender")
    private ISender sender;

    /**
     * 標(biāo)注為 {@link PostConstruct},
     * 即表示在服務(wù)啟動(dòng),Bean完成初始化之后,立刻初始化
     */
    @Override
    @PostConstruct
    public void register() {
        log.info("IncrementListener register db and table info.");
        Constant.table2db.forEach((tb, db) -&gt; aggregationListener.register(db, tb, this));
    }

    @Override
    public void onEvent(BinlogRowData eventData) {
        TableTemplate table = eventData.getTableTemplate();
        EventType eventType = eventData.getEventType();

        //包裝成最后需要投遞的數(shù)據(jù)
        MysqlRowData rowData = new MysqlRowData();
        rowData.setTableName(table.getTableName());
        rowData.setLevel(eventData.getTableTemplate().getLevel());
        //將EventType轉(zhuǎn)為OperationTypeEnum
        OperationTypeEnum operationType = OperationTypeEnum.convert(eventType);
        rowData.setOperationTypeEnum(operationType);

        //獲取模版中該操作對(duì)應(yīng)的字段列表
        List<string> fieldList = table.getOpTypeFieldSetMap().get(operationType);
        if (null == fieldList) {
            log.warn("{} not support for {}.", operationType, table.getTableName());
            return;
        }

        for (Map<string, string> afterMap : eventData.getAfter()) {
            Map<string, string> _afterMap = new HashMap&lt;&gt;();
            for (Map.Entry<string, string> entry : afterMap.entrySet()) {
                String colName = entry.getKey();
                String colValue = entry.getValue();

                _afterMap.put(colName, colValue);
            }

            rowData.getFieldValueMap().add(_afterMap);
        }
        sender.sender(rowData);
    }
}

開啟binlog監(jiān)聽

  • 首先來配置監(jiān)聽binlog的數(shù)據(jù)庫(kù)連接信息

adconf:
  mysql:
    host: 127.0.0.1
    port: 3306
    username: root
    password: 12345678
    binlogName: ""
    position: -1 # 從當(dāng)前位置開始監(jiān)聽

編寫配置類:

/**
 * BinlogConfig for 定義監(jiān)聽Binlog的配置信息
 *
 * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
 */
@Component
@ConfigurationProperties(prefix = "adconf.mysql")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BinlogConfig {
    private String host;
    private Integer port;
    private String username;
    private String password;
    private String binlogName;
    private Long position;
}

在我們實(shí)現(xiàn) 監(jiān)聽binlog那節(jié),我們實(shí)現(xiàn)了一個(gè)自定義client CustomBinlogClient,需要實(shí)現(xiàn)binlog的監(jiān)聽,這個(gè)監(jiān)聽的客戶端就必須是一個(gè)獨(dú)立運(yùn)行的線程,并且要在程序啟動(dòng)的時(shí)候進(jìn)行監(jiān)聽,我們來實(shí)現(xiàn)運(yùn)行當(dāng)前client的方式,這里我們會(huì)使用到一個(gè)新的Runnerorg.springframework.boot.CommandLineRunner,let's code.

@Slf4j
@Component
public class BinlogRunner implements CommandLineRunner {

    @Autowired
    private CustomBinlogClient binlogClient;

    @Override
    public void run(String... args) throws Exception {
        log.info("BinlogRunner is running...");
        binlogClient.connect();
    }
}
增量數(shù)據(jù)投遞

在binlog監(jiān)聽的過程中,我們看到針對(duì)于int, String 這類數(shù)據(jù)字段,mysql的記錄是沒有問題的,但是針對(duì)于時(shí)間類型,它被格式化成了字符串類型:Fri Jun 21 15:07:53 CST 2019。

--------Insert-----------
WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
[10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
--------Update-----------
UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[
    {before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}

對(duì)于這個(gè)時(shí)間格式,我們需要關(guān)注2點(diǎn)信息:

  • CST,這個(gè)時(shí)間格式會(huì)比我們的時(shí)間+ 8h(中國(guó)標(biāo)準(zhǔn)時(shí)間 China Standard Time UT+8:00)

  • 需要對(duì)這個(gè)日期進(jìn)行解釋處理

當(dāng)然,我們也可以通過設(shè)置mysql的日期格式來改變?cè)撔袨?,在此,我們通過編碼來解析該時(shí)間格式:

  /**
   * Thu Jun 27 08:00:00 CST 2019
   */
  public static Date parseBinlogString2Date(String dateString) {
      try {
          DateFormat dateFormat = new SimpleDateFormat(
                  "EEE MMM dd HH:mm:ss zzz yyyy",
                  Locale.US
          );
          return DateUtils.addHours(dateFormat.parse(dateString), -8);

      } catch (ParseException ex) {
          log.error("parseString2Date error:{}", dateString);
          return null;
      }
  }

因?yàn)槲覀冊(cè)诙x索引的時(shí)候,是根據(jù)表之間的層級(jí)關(guān)系(Level)來設(shè)定的,根據(jù)代碼規(guī)范,不允許出現(xiàn)Magic Number, 因此我們定義一個(gè)數(shù)據(jù)層級(jí)枚舉,來表達(dá)數(shù)據(jù)層級(jí)。

/**
 * AdDataLevel for 廣告數(shù)據(jù)層級(jí)
 *
 * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
 */
@Getter
public enum AdDataLevel {

    LEVEL2("2", "level 2"),
    LEVEL3("3", "level 3"),
    LEVEL4("4", "level 4");

    private String level;
    private String desc;

    AdDataLevel(String level, String desc) {
        this.level = level;
        this.desc = desc;
    }
}

實(shí)現(xiàn)數(shù)據(jù)投遞

因?yàn)樵隽繑?shù)據(jù)可以投遞到不同的位置以及用途,我們之前實(shí)現(xiàn)了一個(gè)投遞接口com.sxzhongf.ad.sender.ISender,接下來我們實(shí)現(xiàn)一個(gè)投遞類:

@Slf4j
@Component("indexSender")
public class IndexSender implements ISender {

    /**
     * 根據(jù)廣告級(jí)別,投遞Binlog數(shù)據(jù)
     */
    @Override
    public void sender(MysqlRowData rowData) {
        if (AdDataLevel.LEVEL2.getLevel().equals(rowData.getLevel())) {
            Level2RowData(rowData);
        } else if (AdDataLevel.LEVEL3.getLevel().equals(rowData.getLevel())) {
            Level3RowData(rowData);
        } else if (AdDataLevel.LEVEL4.getLevel().equals(rowData.getLevel())) {
            Level4RowData(rowData);
        } else {
            log.error("Binlog MysqlRowData error: {}", JSON.toJSONString(rowData));
        }
    }

    private void Level2RowData(MysqlRowData rowData) {

        if (rowData.getTableName().equals(Constant.AD_PLAN_TABLE_INFO.TABLE_NAME)) {
            List<adplantable> planTables = new ArrayList&lt;&gt;();

            for (Map<string, string> fieldValueMap : rowData.getFieldValueMap()) {
                AdPlanTable planTable = new AdPlanTable();
                //Map的第二種循環(huán)方式
                fieldValueMap.forEach((k, v) -&gt; {
                    switch (k) {
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_ID:
                            planTable.setPlanId(Long.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_USER_ID:
                            planTable.setUserId(Long.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_STATUS:
                            planTable.setPlanStatus(Integer.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_START_DATE:
                            planTable.setStartDate(CommonUtils.parseBinlogString2Date(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_END_DATE:
                            planTable.setEndDate(CommonUtils.parseBinlogString2Date(v));
                            break;
                    }
                });
                planTables.add(planTable);
            }

            //投遞推廣計(jì)劃
            planTables.forEach(p -&gt; AdLevelDataHandler.handleLevel2Index(p, rowData.getOperationTypeEnum()));
        } else if (rowData.getTableName().equals(Constant.AD_CREATIVE_TABLE_INFO.TABLE_NAME)) {
            List<adcreativetable> creativeTables = new LinkedList&lt;&gt;();

            rowData.getFieldValueMap().forEach(afterMap -&gt; {
                AdCreativeTable creativeTable = new AdCreativeTable();
                afterMap.forEach((k, v) -&gt; {
                    switch (k) {
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_CREATIVE_ID:
                            creativeTable.setAdId(Long.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_TYPE:
                            creativeTable.setType(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_MATERIAL_TYPE:
                            creativeTable.setMaterialType(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_HEIGHT:
                            creativeTable.setHeight(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_WIDTH:
                            creativeTable.setWidth(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_AUDIT_STATUS:
                            creativeTable.setAuditStatus(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_URL:
                            creativeTable.setAdUrl(v);
                            break;
                    }
                });
                creativeTables.add(creativeTable);
            });

            //投遞廣告創(chuàng)意
            creativeTables.forEach(c -&gt; AdLevelDataHandler.handleLevel2Index(c, rowData.getOperationTypeEnum()));
        }
    }

    private void Level3RowData(MysqlRowData rowData) {
       ...
    }

    /**
     * 處理4級(jí)廣告
     */
    private void Level4RowData(MysqlRowData rowData) {
        ...
    }
}

投放增量數(shù)據(jù)到MQ(kafka)

為了我們的數(shù)據(jù)投放更加靈活,方便數(shù)據(jù)統(tǒng)計(jì),分析等系統(tǒng)的需求,我們來實(shí)現(xiàn)一個(gè)投放到消息中的接口,其他服務(wù)可以訂閱當(dāng)前MQ 的TOPIC來實(shí)現(xiàn)數(shù)據(jù)訂閱。

配置文件中配置TOPIC
adconf:
  kafka:
    topic: ad-search-mysql-data

--------------------------------------
/**
 * KafkaSender for 投遞Binlog增量數(shù)據(jù)到kafka消息隊(duì)列
 *
 * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
 * @since 2019/7/1
 */
@Component(value = "kafkaSender")
public class KafkaSender implements ISender {

    @Value("${adconf.kafka.topic}")
    private String topic;

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 發(fā)送數(shù)據(jù)到kafka隊(duì)列
     */
    @Override
    public void sender(MysqlRowData rowData) {
        kafkaTemplate.send(
                topic, JSON.toJSONString(rowData)
        );
    }

    /**
     * 測(cè)試消費(fèi)kafka消息
     */
    @KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search")
    public void processMysqlRowData(ConsumerRecord<!--?, ?--> record) {
        Optional<!--?--> kafkaMsg = Optional.ofNullable(record.value());
        if (kafkaMsg.isPresent()) {
            Object message = kafkaMsg.get();
            MysqlRowData rowData = JSON.parseObject(
                    message.toString(),
                    MysqlRowData.class
            );
            System.out.println("kafka process MysqlRowData: " + JSON.toJSONString(rowData));
            //sender.sender();
        }

    }
}
```</adcreativetable></string,></adplantable></string,></string,></string,></string>

看完上述內(nèi)容,你們對(duì)什么是增量索引實(shí)現(xiàn)以及投送數(shù)據(jù)到MQ kafka有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI