您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“RocketMQ如何解決分布式事務(wù)”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“RocketMQ如何解決分布式事務(wù)”吧!
RocketMQ解決分布式事務(wù)(可靠消息最終一致性方案)
1、A系統(tǒng)發(fā)送一個(gè)prepared消息到MQ,如果這個(gè)prepared消息發(fā)送失敗那么就直接取消操作別執(zhí)行了。
2、如果這個(gè)消息發(fā)送成功了、就接著執(zhí)行本地事務(wù)(executeLocalTransaction),如果成功就告訴MQ發(fā)送確認(rèn)消息,如果失敗,就告訴MQ發(fā)送回滾消息。
3、如果發(fā)送了確認(rèn)消息、那么B系統(tǒng)會(huì)接收到確認(rèn)消息,然后執(zhí)行本地事務(wù)。
4、上面的第2步, 由于網(wǎng)絡(luò)原因發(fā)送確認(rèn)or回滾消息失敗,但是broker有輪詢機(jī)制,根據(jù)唯一id查詢本地事務(wù)狀態(tài),MQ會(huì)自動(dòng)定時(shí)輪詢所有prepared消息回調(diào)你的接口(checkLocalTransaction),問你,這個(gè)消息是不是本地事務(wù)處理失敗了,所有沒有發(fā)送確認(rèn)的消息,是繼續(xù)重試還是回滾?一版來說這里你就可以查下數(shù)據(jù)庫看之前本地事務(wù)是否執(zhí)行,如果回滾了,那么這里也回滾吧。這個(gè)就是避免可能本地事務(wù)執(zhí)行成功了,而確認(rèn)消息卻發(fā)送失敗了。
PS:此方案是不支持事務(wù)發(fā)起服務(wù)進(jìn)行回滾的,但是大部分互聯(lián)網(wǎng)應(yīng)用都不會(huì)要求事務(wù)發(fā)起方進(jìn)行回滾,如果一定要事務(wù)發(fā)起方進(jìn)行回滾應(yīng)該采用2PC、3PC、TCC等強(qiáng)一致性方案來實(shí)現(xiàn)分布式事務(wù),比如LCN。
這里通過一個(gè)實(shí)例來講一下RocketMQ實(shí)現(xiàn)分布式事務(wù)具體編碼。
場(chǎng)景: 下單場(chǎng)景,訂單服務(wù)生成訂單,當(dāng)該訂單支付成功之后,修改訂單狀態(tài)已支付,并且要通知庫存服務(wù)進(jìn)行庫存的扣減。
CREATE TABLE `yzy_order` ( `id` int(11) NOT NULL, `order_id` varchar(100) NOT NULL DEFAULT '' COMMENT '訂單id', `buy_num` int(11) DEFAULT NULL COMMENT '購買數(shù)量', `good_id` int(11) DEFAULT NULL COMMENT '商品ID', `user_id` int(11) DEFAULT NULL COMMENT '用戶ID', `pay_status` int(11) DEFAULT NULL COMMENT '支付狀態(tài),0:沒有支付,1:已經(jīng)支付', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci CREATE TABLE `yzy_repo` ( `id` int(11) NOT NULL AUTO_INCREMENT, `good_name` varchar(100) NOT NULL DEFAULT '' COMMENT '商品名稱', `num` int(11) NOT NULL DEFAULT '0' COMMENT '庫存數(shù)量', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='測(cè)試,庫存表表'
package com.transaction.order; import com.alibaba.dubbo.config.annotation.Reference; import com.transaction.repository.IRepositoryService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; import java.util.List; @Service public class OrderService { @Autowired OrderDao orderDao; public final int PAY_DONE = 1; /** * 檢查訂單是否存在并且狀態(tài)是支付完成 **/ public boolean checkOrderPaySuccess(String orderId){ List<YzyOrder> allOrders = orderDao.findAll(); return allOrders.stream() .anyMatch(order -> order.getOrderId().equals(orderId) && order.getPayStatus() == PAY_DONE); } /** * 更新訂單是為支付完成 **/ public void updatePayStatusByOrderId(String orderId){ orderDao.updatePayStatusByOrderId(orderId, PAY_DONE); } /** * 生成訂單,狀態(tài)默認(rèn)是未支付 **/ public void save(String orderId, int num, int goodId,int userId) { YzyOrder yzyOrder = new YzyOrder(); yzyOrder.setOrderId(orderId); yzyOrder.setBuyNum(num); yzyOrder.setGoodId(goodId); yzyOrder.setUserId(userId); orderDao.save(yzyOrder); } }
在終端或者瀏覽器 執(zhí)行 curl '127.0.0.1:8081/order/save?num=2&good_id=1&user_id=1001'
/** * 生成訂單接口 * @param num * @param goodId * @param userId * @return */ @GetMapping("save") public String makeOrder( @RequestParam("num") int num, @RequestParam("good_id") int goodId, @RequestParam("user_id") int userId) { orderService.save(UUID.randomUUID().toString(), num, goodId,userId); return "success"; }
OrderController:pay 發(fā)送訂單支付成功的MQ事務(wù)消息,這里注意體會(huì),并不是直接調(diào)用OrderService::updatePayStatusByOrderId 然后發(fā)送普通的MQ消息。而是先發(fā)送事務(wù)消息到MQ,然后MQ回調(diào)訂單服務(wù)的TransactionListener::executeLocalTransaction,在這里完成訂單狀態(tài)的更新,保證發(fā)送事務(wù)消息和更新訂單狀態(tài)的一致性.
@GetMapping("pay") public String pay(@RequestParam("order_id") String orderId) throws UnsupportedEncodingException, MQClientException, JsonProcessingException { transactionProducer.sendOrderPaySucessEvent(orderId); return "success"; }
@Component public class TransactionProducer implements InitializingBean { private TransactionMQProducer producer; @Autowired private OrderService orderService; @Autowired private OrderDao orderDao; @Override public void afterPropertiesSet() throws Exception { producer = new TransactionMQProducer("order-pay-group"); producer.setNamesrvAddr("mq01.stag.kk.srv:9876;mq02.stag.kk.srv:9876"); ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("transaction-thread-name-%s").build(); ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), threadFactory); producer.setExecutorService(executor); //設(shè)置發(fā)送消息的回調(diào) producer.setTransactionListener(new TransactionListener() { /** * 根據(jù)消息發(fā)送的結(jié)果 判斷是否執(zhí)行本地事務(wù) * * 回調(diào)該方法的時(shí)候說明 消息已經(jīng)成功發(fā)送到了MQ,可以把訂單狀態(tài)更新為 "支付成功" */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 根據(jù)本地事務(wù)執(zhí)行成與否判斷 事務(wù)消息是否需要commit與 rollback ObjectMapper objectMapper = new ObjectMapper(); LocalTransactionState state = LocalTransactionState.UNKNOW; try { OrderRecord record = objectMapper.readValue(msg.getBody(), OrderRecord.class); //MQ已經(jīng)收到了TransactionProducer send方法發(fā)送的事務(wù)消息,下面執(zhí)行本地的事務(wù) //本地記錄訂單信息 orderService.updatePayStatusByOrderId(record.getOrderId()); state = LocalTransactionState.COMMIT_MESSAGE; } catch (UnsupportedEncodingException e) { e.printStackTrace(); state = LocalTransactionState.ROLLBACK_MESSAGE; } catch (IOException e) { e.printStackTrace(); state = LocalTransactionState.ROLLBACK_MESSAGE; } return state; } /** * RocketMQ 回調(diào) 根據(jù)本地事務(wù)是否執(zhí)行成功 告訴broker 此消息是否投遞成功 * @return */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { ObjectMapper objectMapper = new ObjectMapper(); LocalTransactionState state = LocalTransactionState.UNKNOW; OrderRecord record = null; try { record = objectMapper.readValue(msg.getBody(), OrderRecord.class); } catch (IOException e) { e.printStackTrace(); } try { //根據(jù)是否有transaction_id對(duì)應(yīng)轉(zhuǎn)賬記錄 來判斷事務(wù)是否執(zhí)行成功 boolean isLocalSuccess = orderService.checkOrderPaySuccess(record.getOrderId()); if (isLocalSuccess) { state = LocalTransactionState.COMMIT_MESSAGE; } else { state = LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { e.printStackTrace(); } return state; } }); producer.start(); } public void sendOrderPaySucessEvent(String orderId) throws JsonProcessingException, UnsupportedEncodingException, MQClientException { ObjectMapper objectMapper = new ObjectMapper(); YzyOrder order = orderDao.findAll().stream() .filter(item->item.getOrderId().equals(orderId)) .collect(Collectors.toList()).get(0); if(order == null){ System.out.println("not found order " + orderId); } // 構(gòu)造發(fā)送的事務(wù) 消息 OrderRecord record = new OrderRecord(); record.setUserId(order.getUserId()); record.setOrderId(orderId); record.setBuyNum(order.getBuyNum()); record.setPayStatus(order.getPayStatus()); record.setGoodId(order.getGoodId()); Message message = new Message("Order-Success", "", record.getOrderId(), objectMapper.writeValueAsString(record).getBytes(RemotingHelper.DEFAULT_CHARSET)); TransactionSendResult result = producer.sendMessageInTransaction(message, null); System.out.println("發(fā)送事務(wù)消息 ,orderId = " + record.getOrderId() + " " + result.toString()); } }
需要注意的問題:
1. 扣減庫存要防止在并發(fā)的情況下被扣成負(fù)數(shù)
2. 先select后update的方式更新庫存要加分布式鎖或者數(shù)據(jù)庫樂觀鎖,update語句需要是冪等的
UPDATE t_yue SET money=$new_money WHERE id=$good_id AND money=$old_money;
3. 注意通過msgId或者orderId來進(jìn)行消費(fèi)冪等處理
@Override public int reduce(Integer buyNum, Integer goodId) { //并發(fā)的情況下,為了防止庫存被扣成負(fù)數(shù),有三種解決方案 //1. select for update (必須放到事務(wù)中) //2. 這段邏輯加上分布式鎖 //3. 數(shù)據(jù)庫加上一個(gè)version字段,樂觀鎖 while (true){ Optional<YzyRepo> repoOption = repositoryDao.findById(goodId); if (!repoOption.isPresent()) { return 0; } YzyRepo repo = repoOption.get(); //避免數(shù)據(jù)庫庫存扣減小于零 if (repo.getNum() - buyNum < 0) { return -1; } repo.setNum(repo.getNum() - buyNum); int affect = repositoryDao.updateGoodNum(repo.getNum() - buyNum, repo.getNum(), goodId); if(affect > 0){ return affect; } } }
到此,相信大家對(duì)“RocketMQ如何解決分布式事務(wù)”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。