溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

RocketMQ如何解決分布式事務(wù)

發(fā)布時(shí)間:2021-06-22 14:53:17 來源:億速云 閱讀:175 作者:chen 欄目:大數(shù)據(jù)

本篇內(nèi)容主要講解“RocketMQ如何解決分布式事務(wù)”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“RocketMQ如何解決分布式事務(wù)”吧!

一致性如何保證:

RocketMQ解決分布式事務(wù)(可靠消息最終一致性方案)

1、A系統(tǒng)發(fā)送一個(gè)prepared消息到MQ,如果這個(gè)prepared消息發(fā)送失敗那么就直接取消操作別執(zhí)行了。

2、如果這個(gè)消息發(fā)送成功了、就接著執(zhí)行本地事務(wù)(executeLocalTransaction),如果成功就告訴MQ發(fā)送確認(rèn)消息,如果失敗,就告訴MQ發(fā)送回滾消息。

3、如果發(fā)送了確認(rèn)消息、那么B系統(tǒng)會(huì)接收到確認(rèn)消息,然后執(zhí)行本地事務(wù)。

4、上面的第2步, 由于網(wǎng)絡(luò)原因發(fā)送確認(rèn)or回滾消息失敗,但是broker有輪詢機(jī)制,根據(jù)唯一id查詢本地事務(wù)狀態(tài),MQ會(huì)自動(dòng)定時(shí)輪詢所有prepared消息回調(diào)你的接口(checkLocalTransaction),問你,這個(gè)消息是不是本地事務(wù)處理失敗了,所有沒有發(fā)送確認(rèn)的消息,是繼續(xù)重試還是回滾?一版來說這里你就可以查下數(shù)據(jù)庫看之前本地事務(wù)是否執(zhí)行,如果回滾了,那么這里也回滾吧。這個(gè)就是避免可能本地事務(wù)執(zhí)行成功了,而確認(rèn)消息卻發(fā)送失敗了。

PS:此方案是不支持事務(wù)發(fā)起服務(wù)進(jìn)行回滾的,但是大部分互聯(lián)網(wǎng)應(yīng)用都不會(huì)要求事務(wù)發(fā)起方進(jìn)行回滾,如果一定要事務(wù)發(fā)起方進(jìn)行回滾應(yīng)該采用2PC、3PC、TCC等強(qiáng)一致性方案來實(shí)現(xiàn)分布式事務(wù),比如LCN。

訂單-庫存-分布式事務(wù)

這里通過一個(gè)實(shí)例來講一下RocketMQ實(shí)現(xiàn)分布式事務(wù)具體編碼。

場(chǎng)景: 下單場(chǎng)景,訂單服務(wù)生成訂單,當(dāng)訂單支付成功之后,修改訂單狀態(tài)已支付,并且要通知庫存服務(wù)進(jìn)行庫存的扣減。

數(shù)據(jù)庫設(shè)計(jì):

CREATE TABLE `yzy_order` (
  `id` int(11) NOT NULL,
  `order_id` varchar(100) NOT NULL DEFAULT '' COMMENT '訂單id',
  `buy_num` int(11) DEFAULT NULL COMMENT '購買數(shù)量',
  `good_id` int(11) DEFAULT NULL COMMENT '商品ID',
  `user_id` int(11) DEFAULT NULL COMMENT '用戶ID',
  `pay_status` int(11) DEFAULT NULL COMMENT '支付狀態(tài),0:沒有支付,1:已經(jīng)支付',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci




CREATE TABLE `yzy_repo` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `good_name` varchar(100) NOT NULL DEFAULT '' COMMENT '商品名稱',
  `num` int(11) NOT NULL DEFAULT '0' COMMENT '庫存數(shù)量',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='測(cè)試,庫存表表'


開始實(shí)戰(zhàn)

訂單服務(wù)service的主要方法

package com.transaction.order;

import com.alibaba.dubbo.config.annotation.Reference;
import com.transaction.repository.IRepositoryService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.List;

@Service
public class OrderService {
    @Autowired
    OrderDao orderDao;

    public final int PAY_DONE = 1;

    /**
     *  檢查訂單是否存在并且狀態(tài)是支付完成
    **/
    public boolean checkOrderPaySuccess(String orderId){
        List<YzyOrder> allOrders = orderDao.findAll();
        return  allOrders.stream()
                .anyMatch(order -> order.getOrderId().equals(orderId) && order.getPayStatus() == PAY_DONE);
    }

 /**
     *  更新訂單是為支付完成
    **/
    public void updatePayStatusByOrderId(String orderId){
        orderDao.updatePayStatusByOrderId(orderId, PAY_DONE);
    }

 /**
     *  生成訂單,狀態(tài)默認(rèn)是未支付
    **/

    public void save(String orderId, int num, int goodId,int userId) {

        YzyOrder yzyOrder = new YzyOrder();
        yzyOrder.setOrderId(orderId);
        yzyOrder.setBuyNum(num);
        yzyOrder.setGoodId(goodId);
        yzyOrder.setUserId(userId);

        orderDao.save(yzyOrder);
    }
}

業(yè)務(wù)流程

1.在訂單表創(chuàng)建一個(gè)狀態(tài)是未支付的訂單

 在終端或者瀏覽器 執(zhí)行  curl '127.0.0.1:8081/order/save?num=2&good_id=1&user_id=1001' 

 /**
     * 生成訂單接口
     * @param num
     * @param goodId
     * @param userId
     * @return
     */
    @GetMapping("save")
    public String makeOrder(
            @RequestParam("num") int num,
            @RequestParam("good_id") int goodId,
            @RequestParam("user_id") int userId) {

        orderService.save(UUID.randomUUID().toString(), num, goodId,userId);
        return "success";
    }

2.用戶支付完成,通過MQ通知庫存服務(wù)扣減庫存

OrderController:pay 發(fā)送訂單支付成功的MQ事務(wù)消息,這里注意體會(huì),并不是直接調(diào)用OrderService::updatePayStatusByOrderId 然后發(fā)送普通的MQ消息。而是先發(fā)送事務(wù)消息到MQ,然后MQ回調(diào)訂單服務(wù)的TransactionListener::executeLocalTransaction,在這里完成訂單狀態(tài)的更新,保證發(fā)送事務(wù)消息和更新訂單狀態(tài)的一致性.

  @GetMapping("pay")
    public String pay(@RequestParam("order_id") String orderId)
            throws UnsupportedEncodingException, MQClientException, JsonProcessingException {
        transactionProducer.sendOrderPaySucessEvent(orderId);
        return "success";
    }

3.訂單服務(wù)端的事務(wù)消息監(jiān)聽器

@Component
public class TransactionProducer implements InitializingBean {

    private TransactionMQProducer producer;

    @Autowired
    private OrderService orderService;

    @Autowired
    private OrderDao orderDao;

    @Override
    public void afterPropertiesSet() throws Exception {
        producer = new TransactionMQProducer("order-pay-group");
        producer.setNamesrvAddr("mq01.stag.kk.srv:9876;mq02.stag.kk.srv:9876");
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("transaction-thread-name-%s").build();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), threadFactory);
        producer.setExecutorService(executor);
        //設(shè)置發(fā)送消息的回調(diào)
        producer.setTransactionListener(new TransactionListener() {
            /**
             * 根據(jù)消息發(fā)送的結(jié)果 判斷是否執(zhí)行本地事務(wù)
             *
             * 回調(diào)該方法的時(shí)候說明 消息已經(jīng)成功發(fā)送到了MQ,可以把訂單狀態(tài)更新為 "支付成功"
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 根據(jù)本地事務(wù)執(zhí)行成與否判斷 事務(wù)消息是否需要commit與 rollback
                ObjectMapper objectMapper = new ObjectMapper();
                LocalTransactionState state = LocalTransactionState.UNKNOW;
                try {
                    OrderRecord record = objectMapper.readValue(msg.getBody(), OrderRecord.class);

                    //MQ已經(jīng)收到了TransactionProducer send方法發(fā)送的事務(wù)消息,下面執(zhí)行本地的事務(wù)
                    //本地記錄訂單信息
                    orderService.updatePayStatusByOrderId(record.getOrderId());

                    state = LocalTransactionState.COMMIT_MESSAGE;
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    state = LocalTransactionState.ROLLBACK_MESSAGE;
                } catch (IOException e) {
                    e.printStackTrace();
                    state = LocalTransactionState.ROLLBACK_MESSAGE;
                }
                return state;
            }
            /**
             * RocketMQ 回調(diào) 根據(jù)本地事務(wù)是否執(zhí)行成功 告訴broker 此消息是否投遞成功
             * @return
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                ObjectMapper objectMapper = new ObjectMapper();
                LocalTransactionState state = LocalTransactionState.UNKNOW;
                OrderRecord record = null;
                try {
                    record = objectMapper.readValue(msg.getBody(), OrderRecord.class);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                try {
                    //根據(jù)是否有transaction_id對(duì)應(yīng)轉(zhuǎn)賬記錄 來判斷事務(wù)是否執(zhí)行成功
                    boolean isLocalSuccess = orderService.checkOrderPaySuccess(record.getOrderId());

                    if (isLocalSuccess) {
                        state = LocalTransactionState.COMMIT_MESSAGE;
                    } else {
                        state = LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return state;
            }
        });
        producer.start();
    }

    public void sendOrderPaySucessEvent(String orderId) throws JsonProcessingException, UnsupportedEncodingException, MQClientException {
        ObjectMapper objectMapper = new ObjectMapper();
        YzyOrder order = orderDao.findAll().stream()
                .filter(item->item.getOrderId().equals(orderId))
                .collect(Collectors.toList()).get(0);
        if(order == null){
            System.out.println("not found order " + orderId);
        }
        // 構(gòu)造發(fā)送的事務(wù) 消息
        OrderRecord record = new OrderRecord();
        record.setUserId(order.getUserId());
        record.setOrderId(orderId);
        record.setBuyNum(order.getBuyNum());
        record.setPayStatus(order.getPayStatus());
        record.setGoodId(order.getGoodId());

        Message message = new Message("Order-Success", "", record.getOrderId(),
                objectMapper.writeValueAsString(record).getBytes(RemotingHelper.DEFAULT_CHARSET));

        TransactionSendResult result = producer.sendMessageInTransaction(message, null);
        System.out.println("發(fā)送事務(wù)消息 ,orderId = " + record.getOrderId() + " " + result.toString());
    }
}

4.庫存服務(wù)扣減庫存

需要注意的問題:

1.  扣減庫存要防止在并發(fā)的情況下被扣成負(fù)數(shù)

2. 先select后update的方式更新庫存要加分布式鎖或者數(shù)據(jù)庫樂觀鎖,update語句需要是冪等的

   UPDATE t_yue SET money=$new_money WHERE id=$good_id AND money=$old_money;

3. 注意通過msgId或者orderId來進(jìn)行消費(fèi)冪等處理

 @Override
    public int reduce(Integer buyNum, Integer goodId) {

        //并發(fā)的情況下,為了防止庫存被扣成負(fù)數(shù),有三種解決方案
        //1. select for update (必須放到事務(wù)中)
        //2. 這段邏輯加上分布式鎖
        //3. 數(shù)據(jù)庫加上一個(gè)version字段,樂觀鎖

        while (true){
            Optional<YzyRepo> repoOption = repositoryDao.findById(goodId);
            if (!repoOption.isPresent()) {
                return 0;
            }

            YzyRepo repo = repoOption.get();

            //避免數(shù)據(jù)庫庫存扣減小于零
            if (repo.getNum() - buyNum < 0) {
                return -1;
            }
            repo.setNum(repo.getNum() - buyNum);

            int affect = repositoryDao.updateGoodNum(repo.getNum() - buyNum, repo.getNum(), goodId);
            if(affect > 0){
                return affect;
            }
        }
    }

到此,相信大家對(duì)“RocketMQ如何解決分布式事務(wù)”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI