您好,登錄后才能下訂單哦!
小編給大家分享一下RabbitMQ如何防止數(shù)據(jù)丟失,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!
分析RabbitMQ消息丟失的情況,不妨先看看一條消息從生產(chǎn)者發(fā)送到消費(fèi)者消費(fèi)的過(guò)程:
可以看出,一條消息整個(gè)過(guò)程要經(jīng)歷兩次的網(wǎng)絡(luò)傳輸:從生產(chǎn)者發(fā)送到RabbitMQ服務(wù)器,從RabbitMQ服務(wù)器發(fā)送到消費(fèi)者。
在消費(fèi)者未消費(fèi)前存儲(chǔ)在隊(duì)列(Queue)中。
所以可以知道,有三個(gè)場(chǎng)景下是會(huì)發(fā)生消息丟失的:
針對(duì)以上三種場(chǎng)景,RabbitMQ提供了三種解決的方式,分別是消息持久化,confirm機(jī)制,ACK事務(wù)機(jī)制。
RabbitMQ是支持消息持久化的,消息持久化需要設(shè)置:Exchange為持久化和Queue持久化,這樣當(dāng)消息發(fā)送到RabbitMQ服務(wù)器時(shí),消息就會(huì)持久化。
首先看Exchange交換機(jī)的類(lèi)圖:
看這個(gè)類(lèi)圖其實(shí)是要說(shuō)明上一篇文章介紹的四種交換機(jī)都是AbstractExchange抽象類(lèi)的子類(lèi),所以根據(jù)java的特性,創(chuàng)建子類(lèi)的實(shí)例會(huì)先調(diào)用父類(lèi)的構(gòu)造器,父類(lèi)也就是AbstractExchange的構(gòu)造器是怎么樣的呢?
從上面的注釋可以看到durable參數(shù)表示是否持久化。默認(rèn)是持久化(true)。創(chuàng)建持久化的Exchange可以這樣寫(xiě):
@Bean
public DirectExchange rabbitmqDemoDirectExchange() {
//Direct交換機(jī)
return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
}
接著是Queue隊(duì)列,我們先看看Queue的構(gòu)造器是怎么樣的:
也是通過(guò)durable參數(shù)設(shè)置是否持久化,默認(rèn)是true。所以創(chuàng)建時(shí)可以不指定:
@Bean
public Queue fanoutExchangeQueueA() {
//只需要指定名稱(chēng),默認(rèn)是持久化的
return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A);
}
這就完成了消息持久化的設(shè)置,接下來(lái)啟動(dòng)項(xiàng)目,發(fā)送幾條消息,我們可以看到:
怎么證明是已經(jīng)持久化了呢,實(shí)際上可以找到對(duì)應(yīng)的文件:找到對(duì)應(yīng)磁盤(pán)中的目錄:消息持久化可以防止消息在RabbitMQ Server中不會(huì)因?yàn)殄礄C(jī)重啟而丟失。
在生產(chǎn)者發(fā)送到RabbitMQ Server時(shí)有可能因?yàn)榫W(wǎng)絡(luò)問(wèn)題導(dǎo)致投遞失敗,從而丟失數(shù)據(jù)。我們可以使用confirm模式防止數(shù)據(jù)丟失。工作流程是怎么樣的呢,看以下圖解:從上圖中可以看到是通過(guò)兩個(gè)回調(diào)函數(shù)**confirm()、returnedMessage()**進(jìn)行通知。
一條消息從生產(chǎn)者發(fā)送到RabbitMQ,首先會(huì)發(fā)送到Exchange,對(duì)應(yīng)回調(diào)函數(shù)confirm()。第二步從Exchange路由分配到Queue中,對(duì)應(yīng)回調(diào)函數(shù)則是returnedMessage()。
代碼怎么實(shí)現(xiàn)呢,請(qǐng)看演示:
首先在application.yml配置文件中加上如下配置:
spring:
rabbitmq:
publisher-confirms: true
# publisher-returns: true
template:
mandatory: true
# publisher-confirms:設(shè)置為true時(shí)。當(dāng)消息投遞到Exchange后,會(huì)回調(diào)confirm()方法進(jìn)行通知生產(chǎn)者
# publisher-returns:設(shè)置為true時(shí)。當(dāng)消息匹配到Queue并且失敗時(shí),會(huì)通過(guò)回調(diào)returnedMessage()方法返回消息
# spring.rabbitmq.template.mandatory: 設(shè)置為true時(shí)。指定消息在沒(méi)有被隊(duì)列接收時(shí)會(huì)通過(guò)回調(diào)returnedMessage()方法退回。
有個(gè)小細(xì)節(jié),publisher-returns和mandatory如果都設(shè)置的話(huà),優(yōu)先級(jí)是以mandatory優(yōu)先。可以看源碼:接著我們需要定義回調(diào)方法:
@Component
public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class);
/**
* 監(jiān)聽(tīng)消息是否到達(dá)Exchange
*
* @param correlationData 包含消息的唯一標(biāo)識(shí)的對(duì)象
* @param ack true 標(biāo)識(shí) ack,false 標(biāo)識(shí) nack
* @param cause nack 投遞失敗的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info("消息投遞成功~消息Id:{}", correlationData.getId());
} else {
logger.error("消息投遞失敗,Id:{},錯(cuò)誤提示:{}", correlationData.getId(), cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("消息沒(méi)有路由到隊(duì)列,獲得返回的消息");
Map map = byteToObject(message.getBody(), Map.class);
logger.info("message body: {}", map == null ? "" : map.toString());
logger.info("replyCode: {}", replyCode);
logger.info("replyText: {}", replyText);
logger.info("exchange: {}", exchange);
logger.info("routingKey: {}", exchange);
logger.info("------------> end <------------");
}
@SuppressWarnings("unchecked")
private <T> T byteToObject(byte[] bytes, Class<T> clazz) {
T t;
try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bis)) {
t = (T) ois.readObject();
} catch (Exception e) {
e.printStackTrace();
return null;
}
return t;
}
}
我這里就簡(jiǎn)單地打印回調(diào)方法返回的消息,在實(shí)際項(xiàng)目中,可以把返回的消息存儲(chǔ)到日志表中,使用定時(shí)任務(wù)進(jìn)行進(jìn)一步的處理。
我這里是使用RabbitTemplate進(jìn)行發(fā)送,所以在Service層的RabbitTemplate需要設(shè)置一下:
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
@Resource
private RabbitmqConfirmCallback rabbitmqConfirmCallback;
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
//指定 ConfirmCallback
rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback);
//指定 ReturnCallback
rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback);
}
@Override
public String sendMsg(String msg) throws Exception {
Map<String, Object> message = getMessage(msg);
try {
CorrelationData correlationData = (CorrelationData) message.remove("correlationData");
rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
private Map<String, Object> getMessage(String msg) {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
CorrelationData correlationData = new CorrelationData(msgId);
String sendTime = sdf.format(new Date());
Map<String, Object> map = new HashMap<>();
map.put("msgId", msgId);
map.put("sendTime", sendTime);
map.put("msg", msg);
map.put("correlationData", correlationData);
return map;
}
}
大功告成!接下來(lái)我們進(jìn)行測(cè)試,發(fā)送一條消息,我們可以控制臺(tái):假設(shè)發(fā)送一條信息沒(méi)有路由匹配到隊(duì)列,可以看到如下信息:這就是confirm模式。它的作用是為了保障生產(chǎn)者投遞消息到RabbitMQ不會(huì)出現(xiàn)消息丟失。
最開(kāi)始的那張圖已經(jīng)講過(guò),消費(fèi)者從隊(duì)列中獲取到消息后,會(huì)直接確認(rèn)簽收,假設(shè)消費(fèi)者宕機(jī)或者程序出現(xiàn)異常,數(shù)據(jù)沒(méi)有正常消費(fèi),這種情況就會(huì)出現(xiàn)數(shù)據(jù)丟失。
所以關(guān)鍵在于把自動(dòng)簽收改成手動(dòng)簽收,正常消費(fèi)則返回確認(rèn)簽收,如果出現(xiàn)異常,則返回拒絕簽收重回隊(duì)列。代碼怎么實(shí)現(xiàn)呢,請(qǐng)看演示:
首先在消費(fèi)者的application.yml文件中設(shè)置事務(wù)提交為manual手動(dòng)模式:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手動(dòng)ack模式
concurrency: 1 # 最少消費(fèi)者數(shù)量
max-concurrency: 10 # 最大消費(fèi)者數(shù)量
然后編寫(xiě)消費(fèi)者的監(jiān)聽(tīng)器:
@Component
public class RabbitDemoConsumer {
enum Action {
//處理成功
SUCCESS,
//可以重試的錯(cuò)誤,消息重回隊(duì)列
RETRY,
//無(wú)需重試的錯(cuò)誤,拒絕消息,并從隊(duì)列中刪除
REJECT
}
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
public void process(String msg, Message message, Channel channel) {
long tag = message.getMessageProperties().getDeliveryTag();
Action action = Action.SUCCESS;
try {
System.out.println("消費(fèi)者RabbitDemoConsumer從RabbitMQ服務(wù)端消費(fèi)消息:" + msg);
if ("bad".equals(msg)) {
throw new IllegalArgumentException("測(cè)試:拋出可重回隊(duì)列的異常");
}
if ("error".equals(msg)) {
throw new Exception("測(cè)試:拋出無(wú)需重回隊(duì)列的異常");
}
} catch (IllegalArgumentException e1) {
e1.printStackTrace();
//根據(jù)異常的類(lèi)型判斷,設(shè)置action是可重試的,還是無(wú)需重試的
action = Action.RETRY;
} catch (Exception e2) {
//打印異常
e2.printStackTrace();
//根據(jù)異常的類(lèi)型判斷,設(shè)置action是可重試的,還是無(wú)需重試的
action = Action.REJECT;
} finally {
try {
if (action == Action.SUCCESS) {
//multiple 表示是否批量處理。true表示批量ack處理小于tag的所有消息。false則處理當(dāng)前消息
channel.basicAck(tag, false);
} else if (action == Action.RETRY) {
//Nack,拒絕策略,消息重回隊(duì)列
channel.basicNack(tag, false, true);
} else {
//Nack,拒絕策略,并且從隊(duì)列中刪除
channel.basicNack(tag, false, false);
}
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
解釋一下上面的代碼,如果沒(méi)有異常,則手動(dòng)確認(rèn)回復(fù)RabbitMQ服務(wù)端basicAck(消費(fèi)成功)。
如果拋出某些可以重回隊(duì)列的異常,我們就回復(fù)basicNack并且設(shè)置重回隊(duì)列。
如果是拋出不可重回隊(duì)列的異常,就回復(fù)basicNack并且設(shè)置從RabbitMQ的隊(duì)列中刪除。
接下來(lái)進(jìn)行測(cè)試,發(fā)送一條普通的消息"hello":解釋一下ack返回的三個(gè)方法的意思。
①成功確認(rèn)
void basicAck(long deliveryTag, boolean multiple) throws IOException;
消費(fèi)者成功處理后調(diào)用此方法對(duì)消息進(jìn)行確認(rèn)。
②失敗確認(rèn)
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
③失敗確認(rèn)
void basicReject(long deliveryTag, boolean requeue) throws IOException;
basicNack()和basicReject()的區(qū)別在于:basicNack()可以批量拒絕,basicReject()一次只能拒接一條消息。
上面的代碼我故意寫(xiě)了一個(gè)bug。測(cè)試發(fā)送一條"bad",然后會(huì)拋出重回隊(duì)列的異常。這就有個(gè)問(wèn)題:重回隊(duì)列后消費(fèi)者又消費(fèi),消費(fèi)拋出異常又重回隊(duì)列,就造成了死循環(huán)。那怎么避免這種情況呢?
既然nack會(huì)造成死循環(huán)的話(huà),我提供的一個(gè)思路是不使用basicNack(),把拋出異常的消息落庫(kù)到一張表中,記錄拋出的異常,消息體,消息Id。通過(guò)定時(shí)任務(wù)去處理。
如果你有什么好的解決方案,也可以留言討論~
有的時(shí)候比較粗心,不小心開(kāi)啟了自動(dòng)Ack模式,又手動(dòng)回復(fù)了Ack。那就會(huì)報(bào)這個(gè)錯(cuò)誤:
消費(fèi)者RabbitDemoConsumer從RabbitMQ服務(wù)端消費(fèi)消息:java技術(shù)愛(ài)好者
2020-08-02 22:52:42.148 ERROR 4880 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2020-08-02 22:52:43.102 INFO 4880 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@f4a3a8d: tags=[{amq.ctag-8MJeQ7el_PNbVJxGOOw7Rw=rabbitmq.demo.topic}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,5), conn: Proxy@782a1679 Shared Rabbit Connection: SimpleConnection@67c5b175 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56938], acknowledgeMode=AUTO local queue size=0
出現(xiàn)這個(gè)錯(cuò)誤,可以檢查一下yml文件是否添加了以下配置:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
concurrency: 1
max-concurrency: 10
如果上面這個(gè)配置已經(jīng)添加了,還是報(bào)錯(cuò),有可能你使用@Configuration配置了SimpleRabbitListenerContainerFactory,根據(jù)SpringBoot的特性,代碼優(yōu)于配置,代碼的配置覆蓋了yml的配置,并且忘記設(shè)置手動(dòng)manual模式:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//設(shè)置手動(dòng)ack模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
如果你還是有報(bào)錯(cuò),那可能是寫(xiě)錯(cuò)地方了,寫(xiě)在生產(chǎn)者的項(xiàng)目了。以上的配置應(yīng)該配置在消費(fèi)者的項(xiàng)目。因?yàn)閍ck模式是針對(duì)消費(fèi)者而言的。我就是寫(xiě)錯(cuò)了,寫(xiě)在生產(chǎn)者,折騰了幾個(gè)小時(shí),淚目~
其實(shí)手動(dòng)ACK相對(duì)于自動(dòng)ACK肯定是會(huì)慢很多,我在網(wǎng)上查了一些資料,性能相差大概有10倍。所以一般在實(shí)際應(yīng)用中不太建議開(kāi)手動(dòng)ACK模式。不過(guò)也不是絕對(duì)不可以開(kāi),具體情況具體分析,看并發(fā)量,還有數(shù)據(jù)的重要性等等。
所以在實(shí)際項(xiàng)目中還需要權(quán)衡一下并發(fā)量和數(shù)據(jù)的重要性,再?zèng)Q定具體的方案。
如果開(kāi)啟了手動(dòng)ACK模式,但是由于代碼有bug的原因,沒(méi)有回復(fù)RabbitMQ服務(wù)端,那么這條消息就會(huì)放到Unacked狀態(tài)的消息堆里,只有等到消費(fèi)者的連接斷開(kāi)才會(huì)轉(zhuǎn)到Ready消息。如果消費(fèi)者一直沒(méi)有斷開(kāi)連接,那Unacked的消息就會(huì)越來(lái)越多,占用內(nèi)存就越來(lái)越大,最后就會(huì)出現(xiàn)異常。
這個(gè)問(wèn)題,我沒(méi)法用我的電腦演示,我的電腦太卡了。
看完了這篇文章,相信你對(duì)“RabbitMQ如何防止數(shù)據(jù)丟失”有了一定的了解,如果想了解更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。