溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

RabbitMQ如何防止數(shù)據(jù)丟失

發(fā)布時(shí)間:2021-12-24 09:28:50 來(lái)源:億速云 閱讀:129 作者:小新 欄目:大數(shù)據(jù)

小編給大家分享一下RabbitMQ如何防止數(shù)據(jù)丟失,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!


 

一、分析數(shù)據(jù)丟失的原因

分析RabbitMQ消息丟失的情況,不妨先看看一條消息從生產(chǎn)者發(fā)送到消費(fèi)者消費(fèi)的過(guò)程:

RabbitMQ如何防止數(shù)據(jù)丟失  

可以看出,一條消息整個(gè)過(guò)程要經(jīng)歷兩次的網(wǎng)絡(luò)傳輸:從生產(chǎn)者發(fā)送到RabbitMQ服務(wù)器,從RabbitMQ服務(wù)器發(fā)送到消費(fèi)者。

在消費(fèi)者未消費(fèi)前存儲(chǔ)在隊(duì)列(Queue)中。

所以可以知道,有三個(gè)場(chǎng)景下是會(huì)發(fā)生消息丟失的:

  • 存儲(chǔ)在隊(duì)列中,如果隊(duì)列沒(méi)有對(duì)消息持久化,RabbitMQ服務(wù)器宕機(jī)重啟會(huì)丟失數(shù)據(jù)。
  • 生產(chǎn)者發(fā)送消息到RabbitMQ服務(wù)器過(guò)程中,RabbitMQ服務(wù)器如果宕機(jī)停止服務(wù),消息會(huì)丟失。
  • 消費(fèi)者從RabbitMQ服務(wù)器獲取隊(duì)列中存儲(chǔ)的數(shù)據(jù)消費(fèi),但是消費(fèi)者程序出錯(cuò)或者宕機(jī)而沒(méi)有正確消費(fèi),導(dǎo)致數(shù)據(jù)丟失。

針對(duì)以上三種場(chǎng)景,RabbitMQ提供了三種解決的方式,分別是消息持久化,confirm機(jī)制,ACK事務(wù)機(jī)制。

RabbitMQ如何防止數(shù)據(jù)丟失  
 

二、消息持久化

RabbitMQ是支持消息持久化的,消息持久化需要設(shè)置:Exchange為持久化和Queue持久化,這樣當(dāng)消息發(fā)送到RabbitMQ服務(wù)器時(shí),消息就會(huì)持久化。

首先看Exchange交換機(jī)的類(lèi)圖:

RabbitMQ如何防止數(shù)據(jù)丟失  

看這個(gè)類(lèi)圖其實(shí)是要說(shuō)明上一篇文章介紹的四種交換機(jī)都是AbstractExchange抽象類(lèi)的子類(lèi),所以根據(jù)java的特性,創(chuàng)建子類(lèi)的實(shí)例會(huì)先調(diào)用父類(lèi)的構(gòu)造器,父類(lèi)也就是AbstractExchange的構(gòu)造器是怎么樣的呢?

RabbitMQ如何防止數(shù)據(jù)丟失  

從上面的注釋可以看到durable參數(shù)表示是否持久化。默認(rèn)是持久化(true)。創(chuàng)建持久化的Exchange可以這樣寫(xiě):

 @Bean
    public DirectExchange rabbitmqDemoDirectExchange() {
        //Direct交換機(jī)
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
    }
 

接著是Queue隊(duì)列,我們先看看Queue的構(gòu)造器是怎么樣的:

RabbitMQ如何防止數(shù)據(jù)丟失  

也是通過(guò)durable參數(shù)設(shè)置是否持久化,默認(rèn)是true。所以創(chuàng)建時(shí)可以不指定:

 @Bean
    public Queue fanoutExchangeQueueA() {
     //只需要指定名稱(chēng),默認(rèn)是持久化的
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A);
    }
 

這就完成了消息持久化的設(shè)置,接下來(lái)啟動(dòng)項(xiàng)目,發(fā)送幾條消息,我們可以看到:

RabbitMQ如何防止數(shù)據(jù)丟失怎么證明是已經(jīng)持久化了呢,實(shí)際上可以找到對(duì)應(yīng)的文件:RabbitMQ如何防止數(shù)據(jù)丟失找到對(duì)應(yīng)磁盤(pán)中的目錄:RabbitMQ如何防止數(shù)據(jù)丟失消息持久化可以防止消息在RabbitMQ Server中不會(huì)因?yàn)殄礄C(jī)重啟而丟失。

 

三、消息確認(rèn)機(jī)制

 

3.1 confirm機(jī)制

在生產(chǎn)者發(fā)送到RabbitMQ Server時(shí)有可能因?yàn)榫W(wǎng)絡(luò)問(wèn)題導(dǎo)致投遞失敗,從而丟失數(shù)據(jù)。我們可以使用confirm模式防止數(shù)據(jù)丟失。工作流程是怎么樣的呢,看以下圖解:RabbitMQ如何防止數(shù)據(jù)丟失從上圖中可以看到是通過(guò)兩個(gè)回調(diào)函數(shù)**confirm()、returnedMessage()**進(jìn)行通知。

一條消息從生產(chǎn)者發(fā)送到RabbitMQ,首先會(huì)發(fā)送到Exchange,對(duì)應(yīng)回調(diào)函數(shù)confirm()。第二步從Exchange路由分配到Queue中,對(duì)應(yīng)回調(diào)函數(shù)則是returnedMessage()。

代碼怎么實(shí)現(xiàn)呢,請(qǐng)看演示:

首先在application.yml配置文件中加上如下配置:

spring:
  rabbitmq:
    publisher-confirms: true
#    publisher-returns: true
    template:
      mandatory: true
# publisher-confirms:設(shè)置為true時(shí)。當(dāng)消息投遞到Exchange后,會(huì)回調(diào)confirm()方法進(jìn)行通知生產(chǎn)者
# publisher-returns:設(shè)置為true時(shí)。當(dāng)消息匹配到Queue并且失敗時(shí),會(huì)通過(guò)回調(diào)returnedMessage()方法返回消息
# spring.rabbitmq.template.mandatory: 設(shè)置為true時(shí)。指定消息在沒(méi)有被隊(duì)列接收時(shí)會(huì)通過(guò)回調(diào)returnedMessage()方法退回。
 

有個(gè)小細(xì)節(jié),publisher-returns和mandatory如果都設(shè)置的話(huà),優(yōu)先級(jí)是以mandatory優(yōu)先。可以看源碼:RabbitMQ如何防止數(shù)據(jù)丟失接著我們需要定義回調(diào)方法:

@Component
public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class);

    /**
     * 監(jiān)聽(tīng)消息是否到達(dá)Exchange
     *
     * @param correlationData 包含消息的唯一標(biāo)識(shí)的對(duì)象
     * @param ack             true 標(biāo)識(shí) ack,false 標(biāo)識(shí) nack
     * @param cause           nack 投遞失敗的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.info("消息投遞成功~消息Id:{}", correlationData.getId());
        } else {
            logger.error("消息投遞失敗,Id:{},錯(cuò)誤提示:{}", correlationData.getId(), cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.info("消息沒(méi)有路由到隊(duì)列,獲得返回的消息");
        Map map = byteToObject(message.getBody(), Map.class);
        logger.info("message body: {}", map == null ? "" : map.toString());
        logger.info("replyCode: {}", replyCode);
        logger.info("replyText: {}", replyText);
        logger.info("exchange: {}", exchange);
        logger.info("routingKey: {}", exchange);
        logger.info("------------> end <------------");
    }

    @SuppressWarnings("unchecked")
    private <T> T byteToObject(byte[] bytes, Class<T> clazz) {
        T t;
        try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
             ObjectInputStream ois = new ObjectInputStream(bis)) {
            t = (T) ois.readObject();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
        return t;
    }
}
 

我這里就簡(jiǎn)單地打印回調(diào)方法返回的消息,在實(shí)際項(xiàng)目中,可以把返回的消息存儲(chǔ)到日志表中,使用定時(shí)任務(wù)進(jìn)行進(jìn)一步的處理。

我這里是使用RabbitTemplate進(jìn)行發(fā)送,所以在Service層的RabbitTemplate需要設(shè)置一下:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
 @Resource
    private RabbitmqConfirmCallback rabbitmqConfirmCallback;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        //指定 ConfirmCallback
        rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback);
        //指定 ReturnCallback
        rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback);
    }
    
    @Override
    public String sendMsg(String msg) throws Exception {
        Map<String, Object> message = getMessage(msg);
        try {
            CorrelationData correlationData = (CorrelationData) message.remove("correlationData");
            rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
    
 private Map<String, Object> getMessage(String msg) {
        String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
        CorrelationData correlationData = new CorrelationData(msgId);
        String sendTime = sdf.format(new Date());
        Map<String, Object> map = new HashMap<>();
        map.put("msgId", msgId);
        map.put("sendTime", sendTime);
        map.put("msg", msg);
        map.put("correlationData", correlationData);
        return map;
    }
}
 

大功告成!接下來(lái)我們進(jìn)行測(cè)試,發(fā)送一條消息,我們可以控制臺(tái):RabbitMQ如何防止數(shù)據(jù)丟失假設(shè)發(fā)送一條信息沒(méi)有路由匹配到隊(duì)列,可以看到如下信息:RabbitMQ如何防止數(shù)據(jù)丟失這就是confirm模式。它的作用是為了保障生產(chǎn)者投遞消息到RabbitMQ不會(huì)出現(xiàn)消息丟失。

 

3.2 事務(wù)機(jī)制(ACK)

最開(kāi)始的那張圖已經(jīng)講過(guò),消費(fèi)者從隊(duì)列中獲取到消息后,會(huì)直接確認(rèn)簽收,假設(shè)消費(fèi)者宕機(jī)或者程序出現(xiàn)異常,數(shù)據(jù)沒(méi)有正常消費(fèi),這種情況就會(huì)出現(xiàn)數(shù)據(jù)丟失。

所以關(guān)鍵在于把自動(dòng)簽收改成手動(dòng)簽收,正常消費(fèi)則返回確認(rèn)簽收,如果出現(xiàn)異常,則返回拒絕簽收重回隊(duì)列。RabbitMQ如何防止數(shù)據(jù)丟失代碼怎么實(shí)現(xiàn)呢,請(qǐng)看演示:

首先在消費(fèi)者的application.yml文件中設(shè)置事務(wù)提交為manual手動(dòng)模式:

spring:
  rabbitmq:
    listener:
      simple:
  acknowledge-mode: manual # 手動(dòng)ack模式
        concurrency: 1 # 最少消費(fèi)者數(shù)量
        max-concurrency: 10 # 最大消費(fèi)者數(shù)量
 

然后編寫(xiě)消費(fèi)者的監(jiān)聽(tīng)器:

@Component
public class RabbitDemoConsumer {

    enum Action {
        //處理成功
        SUCCESS,
        //可以重試的錯(cuò)誤,消息重回隊(duì)列
        RETRY,
        //無(wú)需重試的錯(cuò)誤,拒絕消息,并從隊(duì)列中刪除
        REJECT
    }

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
    public void process(String msg, Message message, Channel channel) {
        long tag = message.getMessageProperties().getDeliveryTag();
        Action action = Action.SUCCESS;
        try {
            System.out.println("消費(fèi)者RabbitDemoConsumer從RabbitMQ服務(wù)端消費(fèi)消息:" + msg);
            if ("bad".equals(msg)) {
                throw new IllegalArgumentException("測(cè)試:拋出可重回隊(duì)列的異常");
            }
            if ("error".equals(msg)) {
                throw new Exception("測(cè)試:拋出無(wú)需重回隊(duì)列的異常");
            }
        } catch (IllegalArgumentException e1) {
            e1.printStackTrace();
            //根據(jù)異常的類(lèi)型判斷,設(shè)置action是可重試的,還是無(wú)需重試的
            action = Action.RETRY;
        } catch (Exception e2) {
            //打印異常
            e2.printStackTrace();
            //根據(jù)異常的類(lèi)型判斷,設(shè)置action是可重試的,還是無(wú)需重試的
            action = Action.REJECT;
        } finally {
            try {
                if (action == Action.SUCCESS) {
                    //multiple 表示是否批量處理。true表示批量ack處理小于tag的所有消息。false則處理當(dāng)前消息
                    channel.basicAck(tag, false);
                } else if (action == Action.RETRY) {
                    //Nack,拒絕策略,消息重回隊(duì)列
                    channel.basicNack(tag, false, true);
                } else {
                    //Nack,拒絕策略,并且從隊(duì)列中刪除
                    channel.basicNack(tag, false, false);
                }
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
 

解釋一下上面的代碼,如果沒(méi)有異常,則手動(dòng)確認(rèn)回復(fù)RabbitMQ服務(wù)端basicAck(消費(fèi)成功)。

如果拋出某些可以重回隊(duì)列的異常,我們就回復(fù)basicNack并且設(shè)置重回隊(duì)列。

如果是拋出不可重回隊(duì)列的異常,就回復(fù)basicNack并且設(shè)置從RabbitMQ的隊(duì)列中刪除。

接下來(lái)進(jìn)行測(cè)試,發(fā)送一條普通的消息"hello":RabbitMQ如何防止數(shù)據(jù)丟失解釋一下ack返回的三個(gè)方法的意思。

①成功確認(rèn)

void basicAck(long deliveryTag, boolean multiple) throws IOException;
 

消費(fèi)者成功處理后調(diào)用此方法對(duì)消息進(jìn)行確認(rèn)。

  • deliveryTag:該消息的index
  • multiple:是否批量.。true:將一次性ack所有小于deliveryTag的消息。

②失敗確認(rèn)

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
 
  • deliveryTag:該消息的index。
  • multiple:是否批量。true:將一次性拒絕所有小于deliveryTag的消息。
  • requeue:被拒絕的是否重新入隊(duì)列。

③失敗確認(rèn)

void basicReject(long deliveryTag, boolean requeue) throws IOException;
 
  • deliveryTag:該消息的index。
  • requeue:被拒絕的是否重新入隊(duì)列。

basicNack()和basicReject()的區(qū)別在于:basicNack()可以批量拒絕,basicReject()一次只能拒接一條消息。

 

四、遇到的坑

 

4.1 啟用nack機(jī)制后,導(dǎo)致的死循環(huán)

上面的代碼我故意寫(xiě)了一個(gè)bug。測(cè)試發(fā)送一條"bad",然后會(huì)拋出重回隊(duì)列的異常。這就有個(gè)問(wèn)題:重回隊(duì)列后消費(fèi)者又消費(fèi),消費(fèi)拋出異常又重回隊(duì)列,就造成了死循環(huán)。RabbitMQ如何防止數(shù)據(jù)丟失那怎么避免這種情況呢?

既然nack會(huì)造成死循環(huán)的話(huà),我提供的一個(gè)思路是不使用basicNack(),把拋出異常的消息落庫(kù)到一張表中,記錄拋出的異常,消息體,消息Id。通過(guò)定時(shí)任務(wù)去處理

如果你有什么好的解決方案,也可以留言討論~

 

4.2 double ack

有的時(shí)候比較粗心,不小心開(kāi)啟了自動(dòng)Ack模式,又手動(dòng)回復(fù)了Ack。那就會(huì)報(bào)這個(gè)錯(cuò)誤:

消費(fèi)者RabbitDemoConsumer從RabbitMQ服務(wù)端消費(fèi)消息:java技術(shù)愛(ài)好者
2020-08-02 22:52:42.148 ERROR 4880 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2020-08-02 22:52:43.102  INFO 4880 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@f4a3a8d: tags=[{amq.ctag-8MJeQ7el_PNbVJxGOOw7Rw=rabbitmq.demo.topic}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,5), conn: Proxy@782a1679 Shared Rabbit Connection: SimpleConnection@67c5b175 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56938], acknowledgeMode=AUTO local queue size=0
 

出現(xiàn)這個(gè)錯(cuò)誤,可以檢查一下yml文件是否添加了以下配置:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 1
        max-concurrency: 10
 

如果上面這個(gè)配置已經(jīng)添加了,還是報(bào)錯(cuò),有可能你使用@Configuration配置了SimpleRabbitListenerContainerFactory,根據(jù)SpringBoot的特性,代碼優(yōu)于配置,代碼的配置覆蓋了yml的配置,并且忘記設(shè)置手動(dòng)manual模式

@Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //設(shè)置手動(dòng)ack模式
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
 

如果你還是有報(bào)錯(cuò),那可能是寫(xiě)錯(cuò)地方了,寫(xiě)在生產(chǎn)者的項(xiàng)目了。以上的配置應(yīng)該配置在消費(fèi)者的項(xiàng)目。因?yàn)閍ck模式是針對(duì)消費(fèi)者而言的。我就是寫(xiě)錯(cuò)了,寫(xiě)在生產(chǎn)者,折騰了幾個(gè)小時(shí),淚目~

 

4.3 性能問(wèn)題

其實(shí)手動(dòng)ACK相對(duì)于自動(dòng)ACK肯定是會(huì)慢很多,我在網(wǎng)上查了一些資料,性能相差大概有10倍。所以一般在實(shí)際應(yīng)用中不太建議開(kāi)手動(dòng)ACK模式。不過(guò)也不是絕對(duì)不可以開(kāi),具體情況具體分析,看并發(fā)量,還有數(shù)據(jù)的重要性等等。

所以在實(shí)際項(xiàng)目中還需要權(quán)衡一下并發(fā)量和數(shù)據(jù)的重要性,再?zèng)Q定具體的方案

 

4.4 啟用手動(dòng)ack模式,如果沒(méi)有及時(shí)回復(fù),會(huì)造成隊(duì)列異常

如果開(kāi)啟了手動(dòng)ACK模式,但是由于代碼有bug的原因,沒(méi)有回復(fù)RabbitMQ服務(wù)端,那么這條消息就會(huì)放到Unacked狀態(tài)的消息堆里,只有等到消費(fèi)者的連接斷開(kāi)才會(huì)轉(zhuǎn)到Ready消息。如果消費(fèi)者一直沒(méi)有斷開(kāi)連接,那Unacked的消息就會(huì)越來(lái)越多,占用內(nèi)存就越來(lái)越大,最后就會(huì)出現(xiàn)異常。

這個(gè)問(wèn)題,我沒(méi)法用我的電腦演示,我的電腦太卡了。

 

看完了這篇文章,相信你對(duì)“RabbitMQ如何防止數(shù)據(jù)丟失”有了一定的了解,如果想了解更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI