您好,登錄后才能下訂單哦!
這篇文章主要介紹“怎么使用springboot + rabbitmq消息確認(rèn)機(jī)制”,在日常操作中,相信很多人在怎么使用springboot + rabbitmq消息確認(rèn)機(jī)制問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對(duì)大家解答”怎么使用springboot + rabbitmq消息確認(rèn)機(jī)制”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置中需要開啟
發(fā)送端
和
消費(fèi)端
的消息確認(rèn)。
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 發(fā)送者開啟 confirm 確認(rèn)機(jī)制 spring.rabbitmq.publisher-confirms=true # 發(fā)送者開啟 return 確認(rèn)機(jī)制 spring.rabbitmq.publisher-returns=true #################################################### # 設(shè)置消費(fèi)端手動(dòng) ack spring.rabbitmq.listener.simple.acknowledge-mode=manual # 是否支持重試 spring.rabbitmq.listener.simple.retry.enabled=true
定義交換機(jī)
confirmTestExchange
和隊(duì)列
confirm_test_queue
,并將隊(duì)列綁定在交換機(jī)上。
@Configuration
public class QueueConfig {
@Bean(name = "confirmTestQueue")
public Queue confirmTestQueue() {
return new Queue("confirm_test_queue", true, false, false);
}
@Bean(name = "confirmTestExchange")
public FanoutExchange confirmTestExchange() {
return new FanoutExchange("confirmTestExchange");
}
@Bean
public Binding confirmTestFanoutExchangeAndQueue(
@Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
@Qualifier("confirmTestQueue") Queue confirmTestQueue) {
return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
}
}
在這里插入圖片描述
rabbitmq
的消息確認(rèn)分為兩部分:發(fā)送消息確認(rèn) 和 消息接收確認(rèn)。
消息只要被
rabbitmq broker
接收到就會(huì)觸發(fā)
confirmCallback
回調(diào) 。
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("消息發(fā)送異常!");
} else {
log.info("發(fā)送者爸爸已經(jīng)收到確認(rèn),correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
}
}
}
實(shí)現(xiàn)接口
ConfirmCallback
,重寫其confirm()
方法,方法內(nèi)有三個(gè)參數(shù)correlationData
、ack
、cause
。
correlationData
:對(duì)象內(nèi)部只有一個(gè)
id
屬性,用來表示當(dāng)前消息的唯一性。ack
:消息投遞到broker
的狀態(tài),true
表示成功。cause
:表示投遞失敗的原因。但消息被
broker
接收到只能表示已經(jīng)到達(dá) MQ服務(wù)器,并不能保證消息一定會(huì)被投遞到目標(biāo)
queue
里。所以接下來需要用到
returnCallback
。
如果消息未能投遞到目標(biāo)
queue
里將觸發(fā)回調(diào)
returnCallback
,一旦向
queue
投遞消息未成功,這里一般會(huì)記錄下當(dāng)前消息的詳細(xì)投遞數(shù)據(jù),方便后續(xù)做重發(fā)或者補(bǔ)償?shù)炔僮鳌?/p>
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
}
}
實(shí)現(xiàn)接口ReturnCallback
,重寫
returnedMessage()
方法,方法有五個(gè)參數(shù)message
(消息體)、replyCode
(響應(yīng)code)、replyText
(響應(yīng)內(nèi)容)、exchange
(交換機(jī))、routingKey
(隊(duì)列)。
下邊是具體的消息發(fā)送,在rabbitTemplate
中設(shè)置
Confirm
和
Return
回調(diào),我們通過setDeliveryMode()
對(duì)消息做持久化處理,為了后續(xù)測(cè)試創(chuàng)建一個(gè)
CorrelationData
對(duì)象,添加一個(gè)id
為10000000000
。
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmCallbackService confirmCallbackService;
@Autowired
private ReturnCallbackService returnCallbackService;
public void sendMessage(String exchange, String routingKey, Object msg) {
/**
* 確保消息發(fā)送失敗后可以重新返回到隊(duì)列中
* 注意:yml需要配置 publisher-returns: true
*/
rabbitTemplate.setMandatory(true);
/**
* 消費(fèi)者確認(rèn)收到消息后,手動(dòng)ack回執(zhí)回調(diào)處理
*/
rabbitTemplate.setConfirmCallback(confirmCallbackService);
/**
* 消息投遞到隊(duì)列失敗回調(diào)處理
*/
rabbitTemplate.setReturnCallback(returnCallbackService);
/**
* 發(fā)送消息
*/
rabbitTemplate.convertAndSend(exchange, routingKey, msg,
message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
},
new CorrelationData(UUID.randomUUID().toString()));
}
消費(fèi)消息有三種回執(zhí)方法,我們來分析一下每種方法的含義。
basicAck
:表示成功確認(rèn),使用此回執(zhí)方法后,消息會(huì)被rabbitmq broker
刪除。
void basicAck(long deliveryTag, boolean multiple)
deliveryTag
:表示消息投遞序號(hào),每次消費(fèi)消息或者消息重新投遞后,deliveryTag
都會(huì)增加。手動(dòng)消息確認(rèn)模式下,我們可以對(duì)指定deliveryTag
的消息進(jìn)行ack
、nack
、reject
等操作。
multiple
:是否批量確認(rèn),值為
true
則會(huì)一次性
ack
所有小于當(dāng)前消息
deliveryTag
的消息。
舉個(gè)栗子: 假設(shè)我先發(fā)送三條消息deliveryTag
分別是5、6、7,可它們都沒有被確認(rèn),當(dāng)我發(fā)第四條消息此時(shí)deliveryTag
為8,multiple
設(shè)置為 true,會(huì)將5、6、7、8的消息全部進(jìn)行確認(rèn)。
basicNack
:表示失敗確認(rèn),一般在消費(fèi)消息業(yè)務(wù)異常時(shí)用到此方法,可以將消息重新投遞入隊(duì)列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
deliveryTag
:表示消息投遞序號(hào)。
multiple
:是否批量確認(rèn)。
requeue
:值為
true
消息將重新入隊(duì)列。
basicReject
:拒絕消息,與basicNack
區(qū)別在于不能進(jìn)行批量操作,其他用法很相似。
void basicReject(long deliveryTag, boolean requeue)
deliveryTag
:表示消息投遞序號(hào)。
requeue
:值為
true
消息將重新入隊(duì)列。
這是一個(gè)非常沒技術(shù)含量的坑,但卻是非常容易犯錯(cuò)的地方。
開啟消息確認(rèn)機(jī)制,消費(fèi)消息別忘了channel.basicAck
,否則消息會(huì)一直存在,導(dǎo)致重復(fù)消費(fèi)。
在我最開始接觸消息確認(rèn)機(jī)制的時(shí)候,消費(fèi)端代碼就像下邊這樣寫的,思路很簡單:處理完業(yè)務(wù)邏輯后確認(rèn)消息,
int a = 1 / 0
發(fā)生異常后將消息重新投入隊(duì)列。
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
try {
log.info("消費(fèi)者 2 號(hào)收到:{}", msg);
int a = 1 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
但是有個(gè)問題是,業(yè)務(wù)代碼一旦出現(xiàn)
bug
99.9%的情況是不會(huì)自動(dòng)修復(fù),一條消息會(huì)被無限投遞進(jìn)隊(duì)列,消費(fèi)端無限執(zhí)行,導(dǎo)致了死循環(huán)。
本地的CPU
被瞬間打滿了,大家可以想象一下當(dāng)時(shí)在生產(chǎn)環(huán)境導(dǎo)致服務(wù)死機(jī),我是有多慌。
而且rabbitmq management
只有一條未被確認(rèn)的消息。
經(jīng)過測(cè)試分析發(fā)現(xiàn),當(dāng)消息重新投遞到消息隊(duì)列時(shí),這條消息不會(huì)回到隊(duì)列尾部,仍是在隊(duì)列頭部。
消費(fèi)者會(huì)立刻消費(fèi)這條消息,業(yè)務(wù)處理再拋出異常,消息再重新入隊(duì),如此反復(fù)進(jìn)行。導(dǎo)致消息隊(duì)列處理出現(xiàn)阻塞,導(dǎo)致正常消息也無法運(yùn)行。
而我們當(dāng)時(shí)的解決方案是,先將消息進(jìn)行應(yīng)答,此時(shí)消息隊(duì)列會(huì)刪除該條消息,同時(shí)我們?cè)俅伟l(fā)送該消息到消息隊(duì)列,異常消息就放在了消息隊(duì)列尾部,這樣既保證消息不會(huì)丟失,又保證了正常業(yè)務(wù)的進(jìn)行。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新發(fā)送消息到隊(duì)尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
JSON.toJSONBytes(msg));
但這種方法并沒有解決根本問題,錯(cuò)誤消息還是會(huì)時(shí)不時(shí)報(bào)錯(cuò),后面優(yōu)化設(shè)置了消息重試次數(shù),達(dá)到了重試上限以后,手動(dòng)確認(rèn),隊(duì)列刪除此消息,并將消息持久化入MySQL
并推送報(bào)警,進(jìn)行人工處理和定時(shí)任務(wù)做補(bǔ)償。
如何保證 MQ 的消費(fèi)是冪等性,這個(gè)需要根據(jù)具體業(yè)務(wù)而定,可以借助MySQL
、或者redis
將消息持久化,通過再消息中的唯一性屬性校驗(yàn)。
到此,關(guān)于“怎么使用springboot + rabbitmq消息確認(rèn)機(jī)制”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。