您好,登錄后才能下訂單哦!
這篇文章主要介紹“MQ中怎么保證消息不被重復(fù)消費(fèi)”,在日常操作中,相信很多人在MQ中怎么保證消息不被重復(fù)消費(fèi)問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對(duì)大家解答”MQ中怎么保證消息不被重復(fù)消費(fèi)”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!
一. 重復(fù)消息
為什么會(huì)出現(xiàn)消息重復(fù)?消息重復(fù)的原因有兩個(gè):1.生產(chǎn)時(shí)消息重復(fù),2.消費(fèi)時(shí)消息重復(fù)。
1.1 生產(chǎn)時(shí)消息重復(fù)
由于生產(chǎn)者發(fā)送消息給MQ,在MQ確認(rèn)的時(shí)候出現(xiàn)了網(wǎng)絡(luò)波動(dòng),生產(chǎn)者沒有收到確認(rèn),實(shí)際上MQ已經(jīng)接收到了消息。這時(shí)候生產(chǎn)者就會(huì)重新發(fā)送一遍這條消息。
生產(chǎn)者中如果消息未被確認(rèn),或確認(rèn)失敗,我們可以使用定時(shí)任務(wù)+(redis/db)來進(jìn)行消息重試。
@Component
@Slf4J
public
class SendMessage {
@Autowired
private MessageService messageService;
@Autowired
private RabbitTemplate rabbitTemplate;
// 最大投遞次數(shù)
private static final int MAX_TRY_COUNT =
3;
/**
* 每30s拉取投遞失敗的消息, 重新投遞
*/
@Scheduled(cron =
"0/30 * * * * ?")
public void resend() {
log.info("開始執(zhí)行定時(shí)任務(wù)(重新投遞消息)");
List<MsgLog> msgLogs = messageService.selectTimeoutMsg();
msgLogs.forEach(msgLog -> {
String msgId = msgLog.getMsgId();
if (msgLog.getTryCount() >= MAX_TRY_COUNT) {
messageService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);
log.info("超過最大重試次數(shù), 消息投遞失敗, msgId: {}", msgId);
}
else {
messageService.updateTryCount(msgId, msgLog.getNextTryTime());// 投遞次數(shù)+1
CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData);// 重新投遞
log.info("第 " + (msgLog.getTryCount() +
1) +
" 次重新投遞消息");
}
});
log.info("定時(shí)任務(wù)執(zhí)行結(jié)束(重新投遞消息)");
}
}
1.2 消費(fèi)時(shí)消息重復(fù)
消費(fèi)者消費(fèi)成功后,再給MQ確認(rèn)的時(shí)候出現(xiàn)了網(wǎng)絡(luò)波動(dòng),MQ沒有接收到確認(rèn),為了保證消息被消費(fèi),MQ就會(huì)繼續(xù)給消費(fèi)者投遞之前的消息。這時(shí)候消費(fèi)者就接收到了兩條一樣的消息。
修改消費(fèi)者,模擬異常
@RabbitListener(queuesToDeclare =
@Queue(value =
"javatrip", durable =
"true"))
public void receive(String message,
@Headers Map<String,Object> headers, Channel channel) throws Exception{
System.out.println("重試"+System.currentTimeMillis());
System.out.println(message);
int i =
1 /
0;
}
配置yml重試策略
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 開啟消費(fèi)者進(jìn)行重試
max-attempts:
5 # 最大重試次數(shù)
initial-interval:
3000 # 重試時(shí)間間隔
由于重復(fù)消息是由于網(wǎng)絡(luò)原因造成的,因此不可避免重復(fù)消息。但是我們需要保證消息的冪等性。
二. 如何保證消息冪等性
讓每個(gè)消息攜帶一個(gè)全局的唯一ID,即可保證消息的冪等性,具體消費(fèi)過程為:
消費(fèi)者獲取到消息后先根據(jù)id去查詢r(jià)edis/db是否存在該消息
如果不存在,則正常消費(fèi),消費(fèi)完畢后寫入redis/db
如果存在,則證明消息被消費(fèi)過,直接丟棄。
生產(chǎn)者
@PostMapping("/send")
public void sendMessage(){
JSONObject jsonObject =
new JSONObject();
jsonObject.put("message","Java旅途");
String json = jsonObject.toJSONString();
Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID()+"").build();
amqpTemplate.convertAndSend("javatrip",message);
}
消費(fèi)者
@Component
@RabbitListener(queuesToDeclare =
@Queue(value =
"javatrip", durable =
"true"))
public class Consumer {
@RabbitHandler
public void receiveMessage(Message message) throws Exception {
Jedis jedis =
new Jedis("localhost",
6379);
String messageId = message.getMessageProperties().getMessageId();
String msg =
new String(message.getBody(),"UTF-8");
System.out.println("接收到的消息為:"+msg+"==消息id為:"+messageId);
String messageIdRedis = jedis.get("messageId");
if(messageId == messageIdRedis){
return;
}
JSONObject jsonObject = JSONObject.parseObject(msg);
String email = jsonObject.getString("message");
jedis.set("messageId",messageId);
}
}
如果需要存入db的話,可以直接將這個(gè)ID設(shè)為消息的主鍵,下次如果獲取到重復(fù)消息進(jìn)行消費(fèi)時(shí),由于數(shù)據(jù)庫主鍵的唯一性,則會(huì)直接拋出異常。
到此,關(guān)于“MQ中怎么保證消息不被重復(fù)消費(fèi)”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。