您好,登錄后才能下訂單哦!
要解決該問題,就要用到RabbitMQ中持久化的概念,所謂持久化,就是RabbitMQ會將內(nèi)存中的數(shù)據(jù)(Exchange 交換器,Queue 隊(duì)列,Message 消息)固化到磁盤,以防異常情況發(fā)生時,數(shù)據(jù)丟失。
其中,RabblitMQ的持久化分為三個部分:
在上篇博客中,我們聲明Exchange的代碼是這樣的:
private final static String EXCHANGE_NAME = "normal-confirm-exchange";
// 創(chuàng)建一個Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
這種情況下聲明的Exchange是非持久化的,在RabbitMQ出現(xiàn)異常情況(重啟,宕機(jī))時,該Exchange會丟失,會影響后續(xù)的消息寫入該Exchange,那么如何設(shè)置Exchange為持久化的呢?答案是設(shè)置durable參數(shù)。
durable:設(shè)置是否持久化。durable設(shè)置為true表示持久化,反之是非持久化。
持久化可以將交換器存盤,在服務(wù)器重啟的時候不會丟失相關(guān)信息。
設(shè)置Exchange持久化:
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
此時調(diào)用的重載方法為:
public DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException {
return this.exchangeDeclare(exchange, (String)type, durable, false, (Map)null);
}
為了能更好的理解,我們新建個生產(chǎn)類如下:
package com.zwwhnly.springbootaction.rabbitmq.durable;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DurableProducer {
private final static String EXCHANGE_NAME = "durable-exchange";
private final static String QUEUE_NAME = "durable-queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 創(chuàng)建連接
ConnectionFactory factory = new ConnectionFactory();
// 設(shè)置 RabbitMQ 的主機(jī)名
factory.setHost("localhost");
// 創(chuàng)建一個連接
Connection connection = factory.newConnection();
// 創(chuàng)建一個通道
Channel channel = connection.createChannel();
// 創(chuàng)建一個Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 發(fā)送消息
String message = "durable exchange test";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
// 關(guān)閉頻道和連接
channel.close();
connection.close();
}
}
示例代碼中,我們新建了1個非持久化的Exchange,1個非持久化的Queue,并將它們做了綁定,此時運(yùn)行代碼,Exchange和Queue新建成功,消息‘durable exchange test’也被正確地投遞到了隊(duì)列中:
此時重啟下RabbitMQ服務(wù),會發(fā)現(xiàn)Exchange丟失了:
修改下代碼,將durable參數(shù)設(shè)置為ture:
// 創(chuàng)建一個Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
此時運(yùn)行完代碼,然后重啟下RabbitMQ服務(wù),會發(fā)現(xiàn)Exchange不再丟失:
細(xì)心的網(wǎng)友可能會發(fā)現(xiàn),雖然現(xiàn)在重啟RabbitMQ服務(wù)后,Exchange不丟失了,但是隊(duì)列和消息丟失了,那么如何解決隊(duì)列不丟失呢?答案也是設(shè)置durable參數(shù)。
durable:設(shè)置是否持久化。為true則設(shè)置隊(duì)列為持久化。
持久化的隊(duì)列會存盤,在服務(wù)器重啟的時候可以保證不丟失相關(guān)信息。
簡單修改下上面聲明Queue的代碼,將durable參數(shù)設(shè)置為true:
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
此時調(diào)用的重載方法如下:
public com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
validateQueueNameLength(queue);
return (com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Queue.Declare.Builder()).queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments).build()).getMethod();
}
運(yùn)行代碼,然后重啟RabbitMQ服務(wù),會發(fā)現(xiàn)隊(duì)列現(xiàn)在不丟失了:
雖然現(xiàn)在RabbitMQ重啟后,Exchange和Queue都不丟失了,但是存儲在Queue里的消息卻仍然會丟失,那么如何保證消息不丟失呢?答案是設(shè)置消息的投遞模式為2,即代表持久化。
修改發(fā)送消息的代碼為:
// 發(fā)送消息
String message = "durable exchange test";
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());
調(diào)用的重載方法為:
public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
this.basicPublish(exchange, routingKey, false, props, body);
}
運(yùn)行代碼,然后重啟RabbitMQ服務(wù),發(fā)現(xiàn)此時Exchange,Queue,消息都不丟失了:
至此,我們完美的解決了RabbitMQ重啟后,消息丟失的問題。
最終的代碼如下,你也可以通過文末的源碼鏈接下載本文用到的所有源碼:
package com.zwwhnly.springbootaction.rabbitmq.durable;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DurableProducer {
private final static String EXCHANGE_NAME = "durable-exchange";
private final static String QUEUE_NAME = "durable-queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 創(chuàng)建連接
ConnectionFactory factory = new ConnectionFactory();
// 設(shè)置 RabbitMQ 的主機(jī)名
factory.setHost("localhost");
// 創(chuàng)建一個連接
Connection connection = factory.newConnection();
// 創(chuàng)建一個通道
Channel channel = connection.createChannel();
// 創(chuàng)建一個Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 發(fā)送消息
String message = "durable exchange test";
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());
// 關(guān)閉頻道和連接
channel.close();
connection.close();
}
}
1)理論上可以將所有的消息都設(shè)置為持久化,但是這樣會嚴(yán)重影響RabbitMQ的性能。因?yàn)閷懭氪疟P的速度比寫入內(nèi)存的速度慢得不止一點(diǎn)點(diǎn)。對于可靠性不是那么高的消息可以不采用持久化處理以提高整體的吞吐量。在選擇是否要將消息持久化時,需要在可靠性和吞吐量之間做一個權(quán)衡。
2)將交換器、隊(duì)列、消息都設(shè)置了持久化之后仍然不能百分之百保證數(shù)據(jù)不丟失,因?yàn)楫?dāng)持久化的消息正確存入RabbitMQ之后,還需要一段時間(雖然很短,但是不可忽視)才能存入磁盤之中。如果在這段時間內(nèi)RabbitMQ服務(wù)節(jié)點(diǎn)發(fā)生了宕機(jī)、重啟等異常情況,消息還沒來得及落盤,那么這些消息將會丟失。
3)單單只設(shè)置隊(duì)列持久化,重啟之后消息會丟失;單單只設(shè)置消息的持久化,重啟之后隊(duì)列消失,繼而消息也丟失。單單設(shè)置消息持久化而不設(shè)置隊(duì)列的持久化顯得毫無意義。
Java學(xué)習(xí)、面試;文檔、視頻資源免費(fèi)獲取
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。