溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

java實(shí)現(xiàn)便捷的統(tǒng)計(jì)訂單收益的案例

發(fā)布時(shí)間:2020-10-23 14:40:57 來源:億速云 閱讀:491 作者:小新 欄目:編程語言

這篇文章主要介紹java實(shí)現(xiàn)便捷的統(tǒng)計(jì)訂單收益的案例,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!

引言

統(tǒng)計(jì)訂單收益是做電商類型的APP老生常談的問題.常規(guī)需求大致有用戶收益日?qǐng)?bào)/月報(bào)/年報(bào).這些報(bào)表型的數(shù)據(jù)對(duì)表設(shè)計(jì)和程序設(shè)計(jì)有著不小的挑戰(zhàn).常規(guī)的聚合查詢語句的查詢時(shí)間會(huì)隨著收益表數(shù)據(jù)日漸龐大而逐漸變長(zhǎng).這時(shí)候就需要思考如何設(shè)計(jì)收益表可以更高效的查詢?怎樣的設(shè)計(jì)才可以讓統(tǒng)計(jì)收益變得簡(jiǎn)單?

需求

效果圖

具體需求

  • 收益類型分為:自購(gòu)訂單收益,分享訂單收益,分銷收益,活動(dòng)收益
  • 統(tǒng)計(jì)當(dāng)日收益,當(dāng)月收益
  • 根據(jù)篩選的時(shí)間統(tǒng)計(jì)出時(shí)間段的收益.

思考

設(shè)計(jì)思路

訂單表是肯定需要的.在寫入或者修改訂單表的時(shí)候同步寫入修改收益表.只有自購(gòu)和分享訂單會(huì)記錄到訂單表中,分銷以及活動(dòng)贈(zèng)送收益只在特殊業(yè)務(wù)中寫入收益表.再以日為維度,創(chuàng)建一張用戶收益日?qǐng)?bào)表.單行記錄寫入用戶當(dāng)天收益情況.降低查詢用戶日/月/年收益統(tǒng)計(jì)時(shí)的數(shù)據(jù)量.以單用戶為例,通過拆分用戶一個(gè)月只會(huì)產(chǎn)生最多31條數(shù)據(jù).屬于可控增長(zhǎng)速度.如果沿用收益表,因?yàn)槭找姹淼臄?shù)據(jù)量跟用戶下單的數(shù)量一一對(duì)應(yīng),如果用戶下單量多那么表會(huì)非常龐大.在前期用戶量初見增長(zhǎng)時(shí),可用此方法規(guī)避大的數(shù)據(jù)量統(tǒng)計(jì),后期如果用戶量增大導(dǎo)致日?qǐng)?bào)表數(shù)據(jù)變多可以再考慮分表.

可見問題

  • 同步收益日?qǐng)?bào)表的時(shí)機(jī)問題,因?yàn)樵居唵蔚牟僮骶秃軓?fù)雜需要同步寫入收益和計(jì)算寫入收益日?qǐng)?bào)數(shù)據(jù),代碼耦合度太高.有沒有什么方法通過收益表異構(gòu)出收益日?qǐng)?bào)表呢?
  • 雖然收益被寫入到了日?qǐng)?bào)表中,但是要滿足效果圖要求的效果,可能需要多次查詢SQL語句,有沒有辦法在不影響程序效率的情況下盡量少些一些聚合SQL呢?

實(shí)現(xiàn)

總結(jié)出上面這些問題.我開始了資料收集.最終采用canal+RocketMQ做為異構(gòu)方案.

技術(shù)棧

簡(jiǎn)單介紹下這兩款技術(shù)框架:

  • canal:主要用途是基于 MySQL 數(shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)
  • RocketMQ:一款開源的分布式消息系統(tǒng),基于高可用分布式集群技術(shù),提供低延時(shí)的、高可靠的消息發(fā)布與訂閱服務(wù)。

注:我用的aliyun的全家桶,MQ和mysql都是阿里云的,如果是自建服務(wù)器的可能有區(qū)別,我在后面盡量標(biāo)出

方案流程

  1. 在寫入或修改收益表的同時(shí)通過canal監(jiān)控mysql收益表的binlog日志.
  2. canal檢測(cè)到變更,組裝變更的JSON報(bào)文,發(fā)送RocketMQ中事先定義好的TOPIC.
  3. 程序消費(fèi)該TOPIC,異構(gòu)收益日?qǐng)?bào)表.

canal配置部分

canal的安裝請(qǐng)參考官方文檔 解壓后可得到一個(gè)canal文件夾,包含三個(gè)目錄

  • bin:存放啟動(dòng)重啟腳本
  • conf:存放核心配置文件
  • lib:存放核心jar包

我們需要重點(diǎn)關(guān)注conf文件夾里的conf/canal.properties核心配置文件以及conf/example/instance.properties單個(gè)監(jiān)控節(jié)點(diǎn)配置文件

conf/canal.properties
# tcp, kafka, RocketMQ,這里默認(rèn)是tcp讀取模式,采用RocketMQ需要將其改變?yōu)镽ocketMQ模式
canal.serverMode = RocketMQ
# 如果是aliyun的RocketMQ需要配置以下兩個(gè)KEY,ak/sk
canal.aliyun.accessKey =xxxxxxx
canal.aliyun.secretKey =xxxxxxx
# 監(jiān)控的節(jié)點(diǎn)名稱.這個(gè)默認(rèn)就是example如果有多節(jié)點(diǎn)可以逗號(hào)隔開,如下方的例子
canal.destinations = example,sign
# 如果是aliyun的RocketMQ需要修改canal.mq.accessChannel為cloud默認(rèn)為local
canal.mq.accessChannel = cloud
#MQ的地址,需要注意這里是不帶http://,但是需要帶端口號(hào)
canal.mq.servers = 
#rocketmq實(shí)例id
canal.mq.namespace =
conf/example/instance.properties
#mysql地址
canal.instance.master.address=
#以下兩個(gè)參數(shù)需要在開啟數(shù)據(jù)庫(kù)binlog日志后得到,在數(shù)據(jù)庫(kù)查詢界面輸入查詢語句`show master status`,canal.instance.master.journal.name對(duì)應(yīng)File參數(shù),canal.instance.master.position對(duì)應(yīng)Position參數(shù)
canal.instance.master.journal.name=
canal.instance.master.position=
#數(shù)據(jù)庫(kù)的賬號(hào)密碼
canal.instance.dbUsername=
canal.instance.dbPassword=
#需要監(jiān)控變動(dòng)的表
canal.instance.filter.regex=xxx.t_user_order,xxx.t_user_cash_out
#定義發(fā)送的mq生產(chǎn)組
canal.mq.producerGroup = 
#定義發(fā)送到mq的指定主題
canal.mq.topic=

注:監(jiān)控表的書寫規(guī)則格式參照監(jiān)控表書寫規(guī)則

啟動(dòng)
cd /canal/bin
./start.sh

這時(shí)候會(huì)發(fā)現(xiàn)canal目錄中多了一個(gè)log文件,進(jìn)入可以看到canal主日志文件和example節(jié)點(diǎn)啟動(dòng)日志.

canal日志中出現(xiàn)
 the canal server is running now ......
example日志中出現(xiàn)
 init table filter : ^tablename
 xxxxxxxxx , the next step is binlog dump

表示你已經(jīng)成功了一大步,canal監(jiān)控已正常運(yùn)行.

RocketMQ部分

如果用的aliyun的RocketMQ,配置代碼部分直接可參考文檔 自建的RocketMQ也可參照簡(jiǎn)單的消費(fèi)例子監(jiān)控對(duì)應(yīng)的TOPIC即可 消費(fèi)Canal發(fā)來的數(shù)據(jù),格式如下:

{
    "data":[
        {
            //單個(gè)修改后表數(shù)據(jù),如果同一時(shí)間有多個(gè)表變動(dòng)會(huì)有多個(gè)該JSON對(duì)象        }
    ],
    "database":"監(jiān)控的表所在數(shù)據(jù)庫(kù)",
    "es":表變動(dòng)時(shí)間,
    "id":canal生成的id,
    "isDdl":Boolean類型,表示是否DDL語句,
    "mysqlType":{
        表結(jié)構(gòu)
    },
    "old":如果是修改類型會(huì)填充修改前的值,
    "pkNames":[
        該表的主鍵,如"id"
    ],
    "sql":"執(zhí)行的SQL",
    "sqlType":{
        字段對(duì)應(yīng)的sqlType,一般使用mysqlType即可
    },
    "table":"監(jiān)控的表名",
    "ts":canal記錄發(fā)送時(shí)間,
    "type":"表的修改類型,入INSERT,UPDATE,DELETE"
}

MQ消費(fèi)代碼主要用了反射,映射到對(duì)應(yīng)的表

//這里的body就是Canal發(fā)來的數(shù)據(jù)
public Action process(String body) {
        boolean result = Boolean.FALSE;
        JSONObject data = JSONObject.parseObject(body);
        log.info("數(shù)據(jù)庫(kù)操作日志記錄:data:{}",data.toString());
        Class c = null;
        try {
            //這里監(jiān)控了訂單和收益表分別做訂單統(tǒng)計(jì)和收益日?qǐng)?bào)統(tǒng)計(jì)
            c = Class.forName(getClassName(data.getString("table")));
        } catch (ClassNotFoundException e) {
            log.error("error {}",e);
        }
        if (null != c) {
            JSONArray dataArray = data.getJSONArray("data");
            if (dataArray != null) {
                //把獲取到的data部分轉(zhuǎn)換為反射后的實(shí)體集合
                List list = dataArray.toJavaList(c);
                if (CollUtil.isNotEmpty(list)) {
                    //對(duì)修改和寫入操作分別進(jìn)行邏輯操作
                    String type = data.getString("type");
                    if ("UPDATE".equals(type)) {
                        result = uppHistory(list);
                    } else if ("INSERT".equals(type)) {
                        result = saveHistory(list);
                    }
                }
            }
        }
        return result ? Action.CommitMessage : Action.ReconsumeLater;
    }
    
    /**
     * @description: 獲取反射ClassName
     * @author: chenyunxuan
     */
    private String getClassName(String tableName) {
        StringBuilder sb = new StringBuilder();
        //判斷是哪張表的數(shù)據(jù)
        if (tableName.equals("t_user_income_detail")) {
            sb.append("cn.mc.core.model.order");
        } else if (tableName.equals("t_user_cash_out")) {
            sb.append("cn.mc.sync.model");
        }
        String className = StrUtil.toCamelCase(tableName).substring(1);
        return sb.append(".").append(className).toString();
    }
    
    /**
     * @description: 寫入對(duì)應(yīng)類型的統(tǒng)計(jì)表
     * @author: chenyunxuan
     */
    private <T> Boolean saveHistory(List<T> orderList) {
        boolean result = Boolean.FALSE;
        Object dataType = orderList.get(0);
        //用instanceof判斷類型進(jìn)入不同的邏輯處理代碼
        if (dataType instanceof TUserIncomeDetail) {
            result = userOrderHistoryService.saveIncomeDaily(orderList);
        } else if (dataType instanceof UserCashOut) {
            result = userCashOutHistoryService.delSaveHistoryList(orderList);
        }
        return result;
    }

saveIncomeDaily偽代碼

  public synchronized Boolean saveIncomeDaily(List orderList) {
    //循環(huán)收益明細(xì)記錄
    .......
    //通過創(chuàng)建時(shí)間和用戶id查詢收益日?qǐng)?bào)表中是否有當(dāng)日數(shù)據(jù)
    if(不存在當(dāng)日數(shù)據(jù)){
        //創(chuàng)建當(dāng)日的收益日?qǐng)?bào)表記錄
        .....
    }
    //因?yàn)椴淮嬖诋?dāng)日記錄也會(huì)立即寫入當(dāng)日的空數(shù)據(jù),所以下面的流程都是走更新流程
    //更新當(dāng)日數(shù)據(jù)
    .......
    return Boolean.TRUE;
    }

注:代碼中應(yīng)該多打一些日志,方便產(chǎn)生異常收益數(shù)據(jù)后的校對(duì)

后記

至此一個(gè)基于canal+RocketMQ的收益日?qǐng)?bào)統(tǒng)計(jì)異構(gòu)方案就完成了,下一篇會(huì)圍繞本文提到的第二個(gè)問題減少聚合SQL的產(chǎn)生展開.敬請(qǐng)關(guān)注.

以上是java實(shí)現(xiàn)便捷的統(tǒng)計(jì)訂單收益的案例的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI