溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RabbitMQ中怎么保證消息的可靠投遞

發(fā)布時(shí)間:2021-08-10 16:27:11 來源:億速云 閱讀:112 作者:Leah 欄目:web開發(fā)

今天就跟大家聊聊有關(guān)RabbitMQ中怎么保證消息的可靠投遞,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

RabbitMQ整合Spring Boot,我們只需要增加對應(yīng)的starter即可

<dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

基于注解

在application.yaml的配置如下

spring:   rabbitmq:     host: myhost     port: 5672     username: guest     password: guest     virtual-host: /  log:   exchange: log.exchange   info:     queue: info.log.queue     binding-key: info.log.key   error:     queue: error.log.queue     binding-key: error.log.key   all:     queue: all.log.queue     binding-key: '*.log.key'

消費(fèi)者代碼如下

@Slf4j @Component public class LogReceiverListener {      /**      * 接收info級別的日志      */     @RabbitListener(             bindings = @QueueBinding(                     value = @Queue(value = "${log.info.queue}", durable = "true"),                     exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),                     key = "${log.info.binding-key}"             )     )     public void infoLog(Message message) {         String msg = new String(message.getBody());         log.info("infoLogQueue 收到的消息為: {}", msg);     }      /**      * 接收所有的日志      */     @RabbitListener(             bindings = @QueueBinding(                     value = @Queue(value = "${log.all.queue}", durable = "true"),                     exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),                     key = "${log.all.binding-key}"             )     )     public void allLog(Message message) {         String msg = new String(message.getBody());         log.info("allLogQueue 收到的消息為: {}", msg);     } }

生產(chǎn)者如下

@RunWith(SpringRunner.class) @SpringBootTest public class MsgProducerTest {      @Autowired     private AmqpTemplate amqpTemplate;     @Value("${log.exchange}")     private String exchange;     @Value("${log.info.binding-key}")     private String routingKey;      @SneakyThrows     @Test     public void sendMsg() {         for (int i = 0; i < 5; i++) {             String message = "this is info message " + i;             amqpTemplate.convertAndSend(exchange, routingKey, message);         }          System.in.read();     } }

Spring Boot針對消息ack的方式和原生api針對消息ack的方式有點(diǎn)不同

原生api消息ack的方式

消息的確認(rèn)方式有2種

自動確認(rèn)(autoAck=true)

手動確認(rèn)(autoAck=false)

消費(fèi)者在消費(fèi)消息的時(shí)候,可以指定autoAck參數(shù)

String basicConsume(String queue, boolean autoAck, Consumer callback)

autoAck=false: RabbitMQ會等待消費(fèi)者顯示回復(fù)確認(rèn)消息后才從內(nèi)存(或者磁盤)中移出消息

autoAck=true: RabbitMQ會自動把發(fā)送出去的消息置為確認(rèn),然后從內(nèi)存(或者磁盤)中刪除,而不管消費(fèi)者是否真正的消費(fèi)了這些消息

手動確認(rèn)的方法如下,有2個(gè)參數(shù)

basicAck(long deliveryTag, boolean multiple)

deliveryTag: 用來標(biāo)識信道中投遞的消息。RabbitMQ  推送消息給Consumer時(shí),會附帶一個(gè)deliveryTag,以便Consumer可以在消息確認(rèn)時(shí)告訴RabbitMQ到底是哪條消息被確認(rèn)了。

RabbitMQ保證在每個(gè)信道中,每條消息的deliveryTag從1開始遞增

multiple=true: 消息id<=deliveryTag的消息,都會被確認(rèn)

myltiple=false: 消息id=deliveryTag的消息,都會被確認(rèn)

消息一直不確認(rèn)會發(fā)生啥?

如果隊(duì)列中的消息發(fā)送到消費(fèi)者后,消費(fèi)者不對消息進(jìn)行確認(rèn),那么消息會一直留在隊(duì)列中,直到確認(rèn)才會刪除。

如果發(fā)送到A消費(fèi)者的消息一直不確認(rèn),只有等到A消費(fèi)者與rabbitmq的連接中斷,rabbitmq才會考慮將A消費(fèi)者未確認(rèn)的消息重新投遞給另一個(gè)消費(fèi)者

Spring Boot中針對消息ack的方式

有三種方式,定義在AcknowledgeMode枚舉類中

方式解釋
NONE沒有ack,等價(jià)于原生api中的autoAck=true
MANUAL用戶需要手動發(fā)送ack或者nack
AUTO方法正常結(jié)束,spring boot 框架返回ack,發(fā)生異常spring boot框架返回nack

spring boot針對消息默認(rèn)的ack的方式為AUTO。

在實(shí)際場景中,我們一般都是手動ack。

application.yaml的配置改為如下

spring:   rabbitmq:     host: myhost     port: 5672     username: guest     password: guest     virtual-host: /     listener:       simple:         acknowledge-mode: manual # 手動ack,默認(rèn)為auto

相應(yīng)的消費(fèi)者代碼改為

@Slf4j @Component public class LogListenerManual {      /**      * 接收info級別的日志      */     @RabbitListener(             bindings = @QueueBinding(                     value = @Queue(value = "${log.info.queue}", durable = "true"),                     exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),                     key = "${log.info.binding-key}"             )     )     public void infoLog(Message message, Channel channel) throws Exception {         String msg = new String(message.getBody());         log.info("infoLogQueue 收到的消息為: {}", msg);         try {             // 這里寫各種業(yè)務(wù)邏輯             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);         } catch (Exception e) {             channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);         }     } }

我們上面用到的注解,作用如下

注解作用
RabbitListener消費(fèi)消息,可以定義在類上,方法上,當(dāng)定義在類上時(shí)需要和RabbitHandler配合使用
QueueBinding定義綁定關(guān)系
Queue定義隊(duì)列
Exchange定義交換機(jī)
RabbitHandlerRabbitListener定義在類上時(shí),需要用RabbitHandler指定處理的方法

基于JavaConfig

既然用注解這么方便,為啥還需要JavaConfig的方式呢?

JavaConfig方便自定義各種屬性,比如同時(shí)配置多個(gè)virtual host等

具體代碼看GitHub把

RabbitMQ如何保證消息的可靠投遞

一個(gè)消息往往會經(jīng)歷如下幾個(gè)階段

RabbitMQ中怎么保證消息的可靠投遞

在這里插入圖片描述

所以要保證消息的可靠投遞,只需要保證這3個(gè)階段的可靠投遞即可

生產(chǎn)階段

這個(gè)階段的可靠投遞主要靠ConfirmListener(發(fā)布者確認(rèn))和ReturnListener(失敗通知)

前面已經(jīng)介紹過了,一條消息在RabbitMQ中的流轉(zhuǎn)過程為

producer -> rabbitmq broker cluster -> exchange -> queue ->  consumer

ConfirmListener可以獲取消息是否從producer發(fā)送到broker

ReturnListener可以獲取從exchange路由不到queue的消息

我用Spring Boot Starter 的api來演示一下效果

application.yaml

spring:   rabbitmq:     host: myhost     port: 5672     username: guest     password: guest     virtual-host: /     listener:       simple:         acknowledge-mode: manual # 手動ack,默認(rèn)為auto  log:   exchange: log.exchange   info:     queue: info.log.queue     binding-key: info.log.key

發(fā)布者確認(rèn)回調(diào)

@Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {      @Autowired     private MessageSender messageSender;      @Override     public void confirm(CorrelationData correlationData, boolean ack, String cause) {         String msgId = correlationData.getId();         String msg = messageSender.dequeueUnAckMsg(msgId);         if (ack) {             System.out.println(String.format("消息 {%s} 成功發(fā)送給mq", msg));         } else {             // 可以加一些重試的邏輯             System.out.println(String.format("消息 {%s} 發(fā)送mq失敗", msg));         }     } }

失敗通知回調(diào)

@Component public class ReturnCallback implements RabbitTemplate.ReturnCallback {      @Override     public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {         String msg = new String(message.getBody());         System.out.println(String.format("消息 {%s} 不能被正確路由,routingKey為 {%s}", msg, routingKey));     } }
@Configuration public class RabbitMqConfig {      @Bean     public ConnectionFactory connectionFactory(             @Value("${spring.rabbitmq.host}") String host,             @Value("${spring.rabbitmq.port}") int port,             @Value("${spring.rabbitmq.username}") String username,             @Value("${spring.rabbitmq.password}") String password,             @Value("${spring.rabbitmq.virtual-host}") String vhost) {         CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);         connectionFactory.setPort(port);         connectionFactory.setUsername(username);         connectionFactory.setPassword(password);         connectionFactory.setVirtualHost(vhost);         connectionFactory.setPublisherConfirms(true);         connectionFactory.setPublisherReturns(true);         return connectionFactory;     }      @Bean     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,                                          ReturnCallback returnCallback, ConfirmCallback confirmCallback) {         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);         rabbitTemplate.setReturnCallback(returnCallback);         rabbitTemplate.setConfirmCallback(confirmCallback);         // 要想使 returnCallback 生效,必須設(shè)置為true         rabbitTemplate.setMandatory(true);         return rabbitTemplate;     } }

這里我對RabbitTemplate做了一下包裝,主要就是發(fā)送的時(shí)候增加消息id,并且保存消息id和消息的對應(yīng)關(guān)系,因?yàn)镽abbitTemplate.ConfirmCallback只能拿到消息id,并不能拿到消息內(nèi)容,所以需要我們自己保存這種映射關(guān)系。在一些可靠性要求比較高的系統(tǒng)中,你可以將這種映射關(guān)系存到數(shù)據(jù)庫中,成功發(fā)送刪除映射關(guān)系,失敗則一直發(fā)送

@Component public class MessageSender {      @Autowired     private RabbitTemplate rabbitTemplate;      public final Map<String, String> unAckMsgQueue = new ConcurrentHashMap<>();      public void convertAndSend(String exchange, String routingKey, String message) {         String msgId = UUID.randomUUID().toString();         CorrelationData correlationData = new CorrelationData();         correlationData.setId(msgId);         rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);         unAckMsgQueue.put(msgId, message);     }      public String dequeueUnAckMsg(String msgId) {         return unAckMsgQueue.remove(msgId);     }  }

測試代碼為

@RunWith(SpringRunner.class) @SpringBootTest public class MsgProducerTest {      @Autowired     private MessageSender messageSender;     @Value("${log.exchange}")     private String exchange;     @Value("${log.info.binding-key}")     private String routingKey;      /**      * 測試失敗通知      */     @SneakyThrows     @Test     public void sendErrorMsg() {         for (int i = 0; i < 3; i++) {             String message = "this is error message " + i;             messageSender.convertAndSend(exchange, "test", message);         }         System.in.read();     }      /**      * 測試發(fā)布者確認(rèn)      */     @SneakyThrows     @Test     public void sendInfoMsg() {         for (int i = 0; i < 3; i++) {             String message = "this is info message " + i;             messageSender.convertAndSend(exchange, routingKey, message);         }         System.in.read();     } }

先來測試失敗者通知

輸出為

消息 {this is error message 0} 不能被正確路由,routingKey為 {test} 消息 {this is error message 0} 成功發(fā)送給mq 消息 {this is error message 2} 不能被正確路由,routingKey為 {test} 消息 {this is error message 2} 成功發(fā)送給mq 消息 {this is error message 1} 不能被正確路由,routingKey為 {test} 消息 {this is error message 1} 成功發(fā)送給mq

消息都成功發(fā)送到broker,但是并沒有被路由到queue中

再來測試發(fā)布者確認(rèn)

輸出為

消息 {this is info message 0} 成功發(fā)送給mq infoLogQueue 收到的消息為: {this is info message 0} infoLogQueue 收到的消息為: {this is info message 1} 消息 {this is info message 1} 成功發(fā)送給mq infoLogQueue 收到的消息為: {this is info message 2} 消息 {this is info message 2} 成功發(fā)送給mq

看完上述內(nèi)容,你們對RabbitMQ中怎么保證消息的可靠投遞有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI