您好,登錄后才能下訂單哦!
小編給大家分享一下Spring Boot + RabbitMQ如何實(shí)現(xiàn)分布式事務(wù),希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!
一:分布式事務(wù)解決方案
1.兩階段提交(2PC)
第一階段:事務(wù)協(xié)調(diào)器要求每個(gè)涉及到事務(wù)的數(shù)據(jù)庫預(yù)提交(precommit)此操作,并反映是否可以提交.
第二階段:事務(wù)協(xié)調(diào)器要求每個(gè)數(shù)據(jù)庫提交數(shù)據(jù)。
案例可參照http://blog.itpub.net/28624388/viewspace-2137095/
2.補(bǔ)償事務(wù)(TCC)
TCC 其實(shí)就是采用的補(bǔ)償機(jī)制,其核心思想是:針對(duì)每個(gè)操作,都要注冊(cè)一個(gè)與其對(duì)應(yīng)的確認(rèn)和補(bǔ)償(撤銷)操作。它分為三個(gè)階段:
Try 階段主要是對(duì)業(yè)務(wù)系統(tǒng)做檢測(cè)及資源預(yù)留
Confirm 階段主要是對(duì)業(yè)務(wù)系統(tǒng)做確認(rèn)提交,Try階段執(zhí)行成功并開始執(zhí)行 Confirm階段時(shí),默認(rèn) Confirm階段是不會(huì)出錯(cuò)的。即:只要Try成功,Confirm一定成功。
Cancel 階段主要是在業(yè)務(wù)執(zhí)行錯(cuò)誤,需要回滾的狀態(tài)下執(zhí)行的業(yè)務(wù)取消,預(yù)留資源釋放。
3.本地消息表(異步確保)
本地消息表這種實(shí)現(xiàn)方式應(yīng)該是業(yè)界使用最多的,其核心思想是將分布式事務(wù)拆分成本地事務(wù)進(jìn)行處理。
基本思路:
a.消息生產(chǎn)方,需要額外建一個(gè)消息表,并記錄消息發(fā)送狀態(tài)。消息表和業(yè)務(wù)數(shù)據(jù)要在一個(gè)事務(wù)里提交,也就是說他們要在一個(gè)數(shù)據(jù)庫里面。然后消息會(huì)經(jīng)過MQ發(fā)送到消息的消費(fèi)方。如果消息發(fā)送失敗,會(huì)進(jìn)行重試發(fā)送。
b.消息消費(fèi)方,需要處理這個(gè)消息,并完成自己的業(yè)務(wù)邏輯。此時(shí)如果本地事務(wù)處理成功,表明已經(jīng)處理成功了,如果處理失敗,那么就會(huì)重試執(zhí)行。如果是業(yè)務(wù)上面的失敗,可以給生產(chǎn)方發(fā)送一個(gè)業(yè)務(wù)補(bǔ)償消息,通知生產(chǎn)方進(jìn)行回滾等操作。
c.生產(chǎn)方和消費(fèi)方定時(shí)掃描本地消息表,把還沒處理完成的消息或者失敗的消息再發(fā)送一遍。如果有靠譜的自動(dòng)對(duì)賬補(bǔ)賬邏輯,這種方案還是非常實(shí)用的。
二:Spring Boot + RabbitMQ分布式事務(wù)實(shí)現(xiàn)
1.pom.xml依賴配置
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.application.yaml rabbitmq配置
# RabbitMQ rabbitmq: host: 112.74.105.178 port: 5672 username: admin password: admin virtual-host: / publisher-confirms: true publisher-returns: true listener: simple: acknowledge-mode: manual
3.RabbitMQConfig.java
@Configuration public class RabbitMQConfig { // 下單并且派單存隊(duì)列 public static final String ORDER_DIC_QUEUE = "order_dis_queue"; // 補(bǔ)單隊(duì)列,判斷訂單是否已經(jīng)被創(chuàng)建 public static final String ORDER_CREATE_QUEUE = "order_create_queue"; // 下單并且派單交換機(jī) private static final String ORDER_EXCHANGE_NAME = "order_exchange_name"; @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } @Bean public Queue OrderDicQueue() { return new Queue(ORDER_DIC_QUEUE); } @Bean public Queue OrderCreateQueue() { return new Queue(ORDER_CREATE_QUEUE); } @Bean DirectExchange directOrderExchange() { return new DirectExchange(ORDER_EXCHANGE_NAME); } @Bean Binding bindingExchangeOrderDicQueue() { return BindingBuilder.bind(OrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey"); } @Bean Binding bindingExchangeOrderCreateQueue() { return BindingBuilder.bind(OrderCreateQueue()).to(directOrderExchange()).with("orderRoutingKey"); } }
4. 消息生產(chǎn)者
public class MsgPushInfoServiceImpl extends ServiceImpl<MsgPushInfoMapper, MsgPushInfoEntity> implements MsgPushInfoService, RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; public void orderAndDsipatch() { try { String orderId = "123456"; JSONObject jsonObect = new JSONObject(); jsonObect.put("orderId", orderId); String msg = jsonObect.toString(); System.out.println("msg:" + msg); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/json"); messageProperties.setMessageId(orderId); Message message = new Message(msg.getBytes(),messageProperties); CorrelationData correlationData = new CorrelationData(orderId); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(this); rabbitTemplate.convertAndSend("order_exchange_name", "orderRoutingKey", message, correlationData); } catch (Exception e) { e.printStackTrace(); } } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String orderId = correlationData.getId(); System.out.println("消息id:" + orderId); if (ack) { // 消息發(fā)送成功 System.out.println("消息發(fā)送確認(rèn)成功"); } else { // 重試機(jī)制 System.out.println("消息發(fā)送確認(rèn)失敗:" + cause); } } }
5.消息消費(fèi)者
@Component public class DispatchReceiver { @RabbitHandler @RabbitListener(queues = "order_dis_queue", containerFactory = "rabbitListenerContainerFactory") public void process(Message message, Channel channel) { System.out.println("rev : " + message.getMessageProperties().getMessageId()); try { System.out.println("======basicNack====="+message.getMessageProperties().getDeliveryTag()); //業(yè)務(wù)處理成功,則刪除消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); //業(yè)務(wù)處理失敗,則發(fā)送補(bǔ)償消息 } catch (Exception e) { e.printStackTrace(); } } }
看完了這篇文章,相信你對(duì)“Spring Boot + RabbitMQ如何實(shí)現(xiàn)分布式事務(wù)”有了一定的了解,如果想了解更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。