溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Spring Boot + RabbitMQ如何實(shí)現(xiàn)分布式事務(wù)

發(fā)布時(shí)間:2021-12-24 09:43:23 來源:億速云 閱讀:763 作者:小新 欄目:安全技術(shù)

小編給大家分享一下Spring Boot + RabbitMQ如何實(shí)現(xiàn)分布式事務(wù),希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

一:分布式事務(wù)解決方案

1.兩階段提交(2PC)

第一階段:事務(wù)協(xié)調(diào)器要求每個(gè)涉及到事務(wù)的數(shù)據(jù)庫預(yù)提交(precommit)此操作,并反映是否可以提交.

第二階段:事務(wù)協(xié)調(diào)器要求每個(gè)數(shù)據(jù)庫提交數(shù)據(jù)。

Spring Boot + RabbitMQ如何實(shí)現(xiàn)分布式事務(wù)

案例可參照http://blog.itpub.net/28624388/viewspace-2137095/

2.補(bǔ)償事務(wù)(TCC)

TCC 其實(shí)就是采用的補(bǔ)償機(jī)制,其核心思想是:針對(duì)每個(gè)操作,都要注冊(cè)一個(gè)與其對(duì)應(yīng)的確認(rèn)和補(bǔ)償(撤銷)操作。它分為三個(gè)階段:

Try 階段主要是對(duì)業(yè)務(wù)系統(tǒng)做檢測(cè)及資源預(yù)留

Confirm 階段主要是對(duì)業(yè)務(wù)系統(tǒng)做確認(rèn)提交,Try階段執(zhí)行成功并開始執(zhí)行 Confirm階段時(shí),默認(rèn) Confirm階段是不會(huì)出錯(cuò)的。即:只要Try成功,Confirm一定成功。

Cancel 階段主要是在業(yè)務(wù)執(zhí)行錯(cuò)誤,需要回滾的狀態(tài)下執(zhí)行的業(yè)務(wù)取消,預(yù)留資源釋放。

3.本地消息表(異步確保)

本地消息表這種實(shí)現(xiàn)方式應(yīng)該是業(yè)界使用最多的,其核心思想是將分布式事務(wù)拆分成本地事務(wù)進(jìn)行處理。

Spring Boot + RabbitMQ如何實(shí)現(xiàn)分布式事務(wù)

基本思路:

a.消息生產(chǎn)方,需要額外建一個(gè)消息表,并記錄消息發(fā)送狀態(tài)。消息表和業(yè)務(wù)數(shù)據(jù)要在一個(gè)事務(wù)里提交,也就是說他們要在一個(gè)數(shù)據(jù)庫里面。然后消息會(huì)經(jīng)過MQ發(fā)送到消息的消費(fèi)方。如果消息發(fā)送失敗,會(huì)進(jìn)行重試發(fā)送。

b.消息消費(fèi)方,需要處理這個(gè)消息,并完成自己的業(yè)務(wù)邏輯。此時(shí)如果本地事務(wù)處理成功,表明已經(jīng)處理成功了,如果處理失敗,那么就會(huì)重試執(zhí)行。如果是業(yè)務(wù)上面的失敗,可以給生產(chǎn)方發(fā)送一個(gè)業(yè)務(wù)補(bǔ)償消息,通知生產(chǎn)方進(jìn)行回滾等操作。

c.生產(chǎn)方和消費(fèi)方定時(shí)掃描本地消息表,把還沒處理完成的消息或者失敗的消息再發(fā)送一遍。如果有靠譜的自動(dòng)對(duì)賬補(bǔ)賬邏輯,這種方案還是非常實(shí)用的。

二:Spring Boot + RabbitMQ分布式事務(wù)實(shí)現(xiàn)

1.pom.xml依賴配置

<dependency>

			<groupId>org.springframework.boot</groupId>

			<artifactId>spring-boot-starter-amqp</artifactId>

		</dependency>

 2.application.yaml  rabbitmq配置

# RabbitMQ        

  rabbitmq:

    host: 112.74.105.178

    port: 5672

    username: admin

    password: admin

    virtual-host: /

    publisher-confirms: true

    publisher-returns: true

    listener:

      simple:

        acknowledge-mode: manual

3.RabbitMQConfig.java

@Configuration
public class RabbitMQConfig {
// 下單并且派單存隊(duì)列

	public static final String ORDER_DIC_QUEUE = "order_dis_queue";

	// 補(bǔ)單隊(duì)列,判斷訂單是否已經(jīng)被創(chuàng)建

	public static final String ORDER_CREATE_QUEUE = "order_create_queue";

	// 下單并且派單交換機(jī)

	private static final String ORDER_EXCHANGE_NAME = "order_exchange_name";


	@Bean

	public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {

		RabbitTemplate template = new RabbitTemplate(connectionFactory);

		template.setMessageConverter(new Jackson2JsonMessageConverter());

		return template;

	}


	@Bean

	public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {

		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

		factory.setConnectionFactory(connectionFactory);

		factory.setMessageConverter(new Jackson2JsonMessageConverter());

		return factory;

	}

	@Bean

	public Queue OrderDicQueue() {

		return new Queue(ORDER_DIC_QUEUE);

	}


	@Bean

	public Queue OrderCreateQueue() {

		return new Queue(ORDER_CREATE_QUEUE);

	}


	@Bean

	DirectExchange directOrderExchange() {

		return new DirectExchange(ORDER_EXCHANGE_NAME);

	}


	@Bean

	Binding bindingExchangeOrderDicQueue() {

		return BindingBuilder.bind(OrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");

	}


	@Bean

	Binding bindingExchangeOrderCreateQueue() {

		return BindingBuilder.bind(OrderCreateQueue()).to(directOrderExchange()).with("orderRoutingKey");

	}

}

4. 消息生產(chǎn)者

public class MsgPushInfoServiceImpl extends ServiceImpl<MsgPushInfoMapper, MsgPushInfoEntity>

		implements MsgPushInfoService, RabbitTemplate.ConfirmCallback {


	@Autowired

	private RabbitTemplate rabbitTemplate;


	public void orderAndDsipatch() {

		try {

			String orderId = "123456";

			JSONObject jsonObect = new JSONObject();

			jsonObect.put("orderId", orderId);

			String msg = jsonObect.toString();

			System.out.println("msg:" + msg);


			MessageProperties messageProperties = new MessageProperties();

	        messageProperties.setContentType("application/json");

	        messageProperties.setMessageId(orderId);

	        Message message = new Message(msg.getBytes(),messageProperties);

	        


			CorrelationData correlationData = new CorrelationData(orderId);

			rabbitTemplate.setMandatory(true);

			rabbitTemplate.setConfirmCallback(this);

			rabbitTemplate.convertAndSend("order_exchange_name", "orderRoutingKey", message, correlationData);


		} catch (Exception e) {

			e.printStackTrace();

		}


	}


	@Override

	public void confirm(CorrelationData correlationData, boolean ack, String cause) {

		String orderId = correlationData.getId();

		System.out.println("消息id:" + orderId);

		if (ack) { // 消息發(fā)送成功

			System.out.println("消息發(fā)送確認(rèn)成功");

		} else {

			// 重試機(jī)制

			System.out.println("消息發(fā)送確認(rèn)失敗:" + cause);

		}

	}

}

5.消息消費(fèi)者

@Component
public class DispatchReceiver {


	@RabbitHandler

	@RabbitListener(queues = "order_dis_queue", containerFactory = "rabbitListenerContainerFactory")

	public void process(Message message, Channel channel) {

		System.out.println("rev : " + message.getMessageProperties().getMessageId());

		try {

			System.out.println("======basicNack====="+message.getMessageProperties().getDeliveryTag());

			//業(yè)務(wù)處理成功,則刪除消息

			channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

			//業(yè)務(wù)處理失敗,則發(fā)送補(bǔ)償消息

		} catch (Exception e) {

			e.printStackTrace();

		}

	}
}

看完了這篇文章,相信你對(duì)“Spring Boot + RabbitMQ如何實(shí)現(xiàn)分布式事務(wù)”有了一定的了解,如果想了解更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI