溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RabbitMQ 可靠投遞

發(fā)布時間:2020-07-15 14:27:53 來源:網(wǎng)絡(luò) 閱讀:5652 作者:王清培 欄目:軟件技術(shù)
  • 背景
  • confirmCallback 確認模式
  • returnCallback 未投遞到 queue 退回模式
  • shovel-plugin 跨機房可靠投遞

背景

在使用 RabbitMQ 的時候,作為消息發(fā)送方希望杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 為我們提供了兩個選項用來控制消息的投遞可靠性模式。

rabbitmq 整個消息投遞的路徑為:
producer->rabbitmq broker cluster->exchange->queue->consumer

messageproducerrabbitmq broker cluster 則會返回一個 confirmCallback 。
messageexchange->queue 投遞失敗則會返回一個 returnCallback 。我們將利用這兩個 callback 控制消息的最終一致性和部分糾錯能力。

confirmCallback 確認模式

在創(chuàng)建 connectionFactory 的時候設(shè)置 PublisherConfirms(true) 選項,開啟 confirmcallback 。

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherConfirms(true);//開啟confirm模式
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
        if (!ack) {
               log.error("消息發(fā)送失敗!" + cause + data.toString());
        } else {
            log.info("消息發(fā)送成功,消息ID:" + (data != null ? data.getId() : null));
        }
    });

我們來看下 ConfirmCallback 接口。

public interface ConfirmCallback {

        /**
         * Confirmation callback.
         * @param correlationData correlation data for the callback.
         * @param ack true for ack, false for nack
         * @param cause An optional cause, for nack, when available, otherwise null.
         */
        void confirm(CorrelationData correlationData, boolean ack, String cause);

    }

重點是 CorrelationData 對象,每個發(fā)送的消息都需要配備一個 CorrelationData 相關(guān)數(shù)據(jù)對象,CorrelationData 對象內(nèi)部只有一個 id 屬性,用來表示當前消息唯一性。

發(fā)送的時候創(chuàng)建一個 CorrelationData 對象。

User user = new User();
user.setID(1010101L);
user.setUserName("plen");

rabbitTemplate.convertAndSend(exchange, routing, user,
        message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        },
new CorrelationData(user.getID().toString()));

這里將 user ID 設(shè)置為當前消息 CorrelationData id 。當然這里是純粹 demo,真實場景是需要做業(yè)務(wù)無關(guān)消息 ID 生成,同時要記錄下這個 id 用來糾錯和對賬。

消息只要被 rabbitmq broker 接收到就會執(zhí)行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才會調(diào)用 confirmCallback。

broker 接收到只能表示 message 已經(jīng)到達服務(wù)器,并不能保證消息一定會被投遞到目標 queue 里。所以需要用到接下來的 returnCallback 。

returnCallback 未投遞到queue退回模式

confrim 模式只能保證消息到達 broker,不能保證消息準確投遞到目標 queue 里。在有些業(yè)務(wù)場景下,我們需要保證消息一定要投遞到目標 queue 里,此時就需要用到 return 退回模式。

同樣創(chuàng)建 ConnectionFactory 到時候需要設(shè)置 PublisherReturns(true) 選項。

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherReturns(true);//開啟return模式
rabbitTemplate.setMandatory(true);//開啟強制委托模式

rabbitTemplate.setReturnCallback((message, replyCode, replyText,
                    exchange, routingKey) ->
    log.info(MessageFormat.format("消息發(fā)送ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)));

這樣如果未能投遞到目標 queue 里將調(diào)用 returnCallback ,可以記錄下詳細到投遞數(shù)據(jù),定期的巡檢或者自動糾錯都需要這些數(shù)據(jù)。

shovel-plugin 跨機房可靠投遞

RabbitMQ 在跨機房集成提供了一個不錯的插件 shovel 。使用 shovel-plugin 插件非常方便,shovel 可以接受機房之間的網(wǎng)絡(luò)斷開、機器下線等不穩(wěn)定因素。

這里有兩個 broker

10.211.55.3 rabbit_node1
10.211.55.4 rabbit_node2

我們希望將發(fā)送給 rabbit_node1 plen.queue 的消息傳輸?shù)?rabbit_node2 plen.queue 中。我們先開啟 __rabbit_node1shovel-plugin__。

先看下當前 RabbitMQ 版本是否安裝了 shovel-plugin,如果有的話直接開啟。

rabbitmq-plugins  list
rabbitmq-plugins  enable rabbitmq_shovel
rabbitmq-plugins  enable rabbitmq_shovel_management

然后就可以在 Admin 面板里看到這個設(shè)置選項,怎么設(shè)置這里就不介紹了。主要就是配置下 amqp 協(xié)議地址,amqp://user:password@server-name/my-vhost 。

如果配置沒有問題的話,應(yīng)該是這樣的一個狀態(tài),說明已經(jīng)順利連接到 __rabbit_node2 broker__ 。

RabbitMQ 可靠投遞
RabbitMQ 可靠投遞

我們來看下 rabbit_node1rabbit_node2Connections 面板。
__rabbit_node1(10.211.55.3):__
RabbitMQ 可靠投遞

__rabbit_node2(10.211.55.4):__
RabbitMQ 可靠投遞

RabbitMQ shovel-plugin 插件在 rabbit_node1 broker 創(chuàng)建了兩個 tcp 連接,端口 39544 連接是用來消費 plen.queue 里的消息,端口 55706 連接是用來推送消息給 rabbit_node2

我們來看下 __rabbit_node1 tcp__ 連接狀態(tài):

tcp6       0      0 10.211.55.3:5672        10.211.55.3:39544       ESTABLISHED
tcp        0      0 10.211.55.3:55706       10.211.55.4:5672        ESTABLISHED

__rabbit_node2 tcp__ 連接狀態(tài):

tcp6       0      0 10.211.55.4:5672        10.211.55.3:55706       ESTABLISHED

為了驗證 shovel-plugin 穩(wěn)定性,我們將 __rabbit_node2__ 下線。
RabbitMQ 可靠投遞

然后再發(fā)送消息,發(fā)現(xiàn)消息會現(xiàn)在 rabbit_node1 plen.queue 里待著,一旦 shovel-plugin 連接恢復(fù)將消費 rabbit_node1 plen.queue 消息,然后投遞給 __rabbit_node2 plen.queue__ 。

作者:王清培 (滬江集團資深JAVA架構(gòu)師)

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI