您好,登錄后才能下訂單哦!
RabbitMQ針對這個問題,提供了以下幾個機制來解決:
本篇博客我們先講解下生產者確認機制,剩余的機制后續(xù)單獨寫博客進行講解。
要想保證消息不丟失,首先我們得保證生產者能成功的將消息發(fā)送到RabbitMQ服務器。
但在之前的示例中,當生產者將消息發(fā)送出去之后,消息到底有沒有正確地到達服務器呢?如果不進行特殊配置,默認情況下發(fā)送消息的操作是不會返回任何消息給生產者的,也就是默認情況下生產者是不知道消息有沒有正確的到達服務器。
從basicPublish方法的返回類型我們也能知曉:
public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
this.basicPublish(exchange, routingKey, false, props, body);
}
為了更好理解,我們將之前的生產者Producer類中的channel.queueDeclare(QUEUE_NAME, false, false, false, null);
注釋:
package com.zwwhnly.springbootaction.rabbitmq.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
// 創(chuàng)建連接
ConnectionFactory factory = new ConnectionFactory();
// 設置 RabbitMQ 的主機名
factory.setHost("localhost");
// 創(chuàng)建一個連接
Connection connection = factory.newConnection();
// 創(chuàng)建一個通道
Channel channel = connection.createChannel();
// 指定一個隊列,不存在的話自動創(chuàng)建
//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 發(fā)送消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
// 關閉頻道和連接
channel.close();
connection.close();
}
}
此時運行代碼,因為隊列不存在,消息肯定沒地方存儲,但是程序卻并未出錯,也就是消息丟失了但是我們卻并不知曉。
RabblitMQ針對這個問題,提供了兩種解決方案:
RabblitMQ客戶端中與事務機制相關的方法有以下3個:
新建事務生產者類TransactionProducer,代碼如下:
package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TransactionProducer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
// 創(chuàng)建連接
ConnectionFactory factory = new ConnectionFactory();
// 設置 RabbitMQ 的主機名
factory.setHost("localhost");
// 創(chuàng)建一個連接
Connection connection = factory.newConnection();
// 創(chuàng)建一個通道
Channel channel = connection.createChannel();
// 指定一個隊列,不存在的話自動創(chuàng)建
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.txSelect();
// 發(fā)送消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
channel.txCommit();
System.out.println(" [x] Sent '" + message + "'");
// 關閉頻道和連接
channel.close();
connection.close();
}
}
運行代碼,發(fā)現隊列新增成功,消息發(fā)送成功:
稍微修改下代碼,看下異常機制的事務回滾:
try {
channel.txSelect();
// 發(fā)送消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
int result = 1 / 0;
channel.txCommit();
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e) {
e.printStackTrace();
channel.txRollback();
}
因為int result = 1 / 0;
肯定會觸發(fā)java.lang.ArithmeticException異常,所以事務會回滾,消息發(fā)送失?。?/p>
如果要發(fā)送多條消息,可以將channel.basicPublish,channel.txCommit等方法放在循環(huán)體內,如下所示:
channel.txSelect();
int loopTimes = 10;
for (int i = 0; i < loopTimes; i++) {
try {
// 發(fā)送消息
String message = "Hello World!" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
channel.txCommit();
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e) {
e.printStackTrace();
channel.txRollback();
}
}
雖然事務能夠解決消息發(fā)送方和RabbitMQ之間消息確認的問題,只有消息成功被RabbitMQ接收,事務才能提交成功,否則便可在捕獲異常之后進行事務回滾。但是使用事務機制會“吸干”RabbitMQ的性能,因此建議使用下面講到的發(fā)送方確認機制。
發(fā)送方確認機制是指生產者將信道設置成confirm(確認)模式,一旦信道進入confirm模式,所有在該信道上面發(fā)布的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到RabbitMQ服務器之后,RabbitMQ就會發(fā)送一個確認(Basic.Ack)給生產者(包含消息的唯一ID),這就使得生產者知曉消息已經正確到達了目的地了。
如果RabbitMQ因為自身內部錯誤導致消息丟失,就會發(fā)送一條nack(Basic.Nack)命令,生產者應用程序同樣可以在回調方法中處理該nack指令。
如果消息和隊列是可持久化的,那么確認消息會在消息寫入磁盤之后發(fā)出。
事務機制在一條消息發(fā)送之后會使發(fā)送端阻塞,以等待RabbitMQ的回應,之后才能繼續(xù)發(fā)送下一條消息。
相比之下,發(fā)送方確認機制最大的好處在于它是異步的,一旦發(fā)布一條消息。生產者應用程序就可以在等信道返回確認的同時繼續(xù)發(fā)送下一條消息,當消息最終得到確認后,生產者應用程序便可以通過回調方法來處理該確認消息。
新建確認生產類NormalConfirmProducer,代碼如下:
package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class NormalConfirmProducer {
private final static String EXCHANGE_NAME = "normal-confirm-exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 創(chuàng)建連接
ConnectionFactory factory = new ConnectionFactory();
// 設置 RabbitMQ 的主機名
factory.setHost("localhost");
// 創(chuàng)建一個連接
Connection connection = factory.newConnection();
// 創(chuàng)建一個通道
Channel channel = connection.createChannel();
// 創(chuàng)建一個Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
try {
channel.confirmSelect();
// 發(fā)送消息
String message = "normal confirm test";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
if (channel.waitForConfirms()) {
System.out.println("send message success");
} else {
System.out.println("send message failed");
// do something else...
}
} catch (InterruptedException e) {
e.printStackTrace();
}
// 關閉頻道和連接
channel.close();
connection.close();
}
}
channel.confirmSelect();將信道設置成confirm模式。
channel.waitForConfirms();等待發(fā)送消息的確認消息,如果發(fā)送成功,則返回ture,如果發(fā)送失敗,則返回false。
如果要發(fā)送多條消息,可以將channel.basicPublish,channel.waitForConfirms等方法放在循環(huán)體內,如下所示:
channel.confirmSelect();
int loopTimes = 10;
for (int i = 0; i < loopTimes; i++) {
try {
// 發(fā)送消息
String message = "normal confirm test" + i;
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
if (channel.waitForConfirms()) {
System.out.println("send message success");
} else {
System.out.println("send message failed");
// do something else...
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
運行結果:
send message success
send message success
send message success
send message success
send message success
send message success
send message success
send message success
send message success
send message success
如果不開啟信道的confirm模式,調用channel.waitForConfirms()會報錯:
注意事項:
1)事務機制和publisher confirm機制是互斥的,不能共存。
如果企圖將已開啟事務模式的信道再設置為publisher confirm模式,RabbitMQ會報錯:
channel.txSelect();
channel.confirmSelect();
如果企圖將已開啟publisher confirm模式的信道再設置為事務模式,RabbitMQ也會報錯:
channel.confirmSelect();
channel.txSelect();
2)事務機制和publisher confirm機制確保的是消息能夠正確地發(fā)送至RabbitMQ,這里的“發(fā)送至RabbitMQ”的含義是指消息被正確地發(fā)往至RabbitMQ的交換器,如果此交換器沒有匹配的隊列,那么消息也會丟失。所以在使用這兩種機制的時候要確保所涉及的交換器能夠有匹配的隊列。
比如上面的NormalConfirmProducer類發(fā)送的消息,發(fā)送到了交換器normal-confirm-exchange,但是該交換器并沒有綁定任何隊列,從業(yè)務角度來講,消息仍然是丟失了。
普通confirm模式是每發(fā)送一條消息后就調用channel.waitForConfirms()方法,之后等待服務端的確認,這實際上是一種串行同步等待的方式。因此相比于事務機制,性能提升的并不多。
批量confirm模式是每發(fā)送一批消息后,調用channel.waitForConfirms()方法,等待服務器的確認返回,因此相比于5.1中的普通confirm模式,性能更好。
但是不好的地方在于,如果出現返回Basic.Nack或者超時情況,生產者客戶端需要將這一批次的消息全部重發(fā),這樣會帶來明顯的重復消息數量,如果消息經常丟失,批量confirm模式的性能應該是不升反降的。
代碼示例:
package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class BatchConfirmProducer {
private final static String EXCHANGE_NAME = "batch-confirm-exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 創(chuàng)建連接
ConnectionFactory factory = new ConnectionFactory();
// 設置 RabbitMQ 的主機名
factory.setHost("localhost");
// 創(chuàng)建一個連接
Connection connection = factory.newConnection();
// 創(chuàng)建一個通道
Channel channel = connection.createChannel();
// 創(chuàng)建一個Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
int batchCount = 100;
int msgCount = 0;
BlockingQueue blockingQueue = new ArrayBlockingQueue(100);
try {
channel.confirmSelect();
while (msgCount <= batchCount) {
String message = "batch confirm test";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
// 將發(fā)送出去的消息存入緩存中,緩存可以是一個ArrayList或者BlockingQueue之類的
blockingQueue.add(message);
if (++msgCount >= batchCount) {
try {
if (channel.waitForConfirms()) {
// 將緩存中的消息清空
blockingQueue.clear();
} else {
// 將緩存中的消息重新發(fā)送
}
} catch (InterruptedException e) {
e.printStackTrace();
// 將緩存中的消息重新發(fā)送
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
// 關閉頻道和連接
channel.close();
connection.close();
}
}
異步confirm模式是在生產者客戶端添加ConfirmListener回調接口,重寫接口的handAck()和handNack()方法,分別用來處理RabblitMQ回傳的Basic.Ack和Basic.Nack。
這兩個方法都有兩個參數,第1個參數deliveryTag用來標記消息的唯一序列號,第2個參數multiple表示的是是否為多條確認,值為true代表是多個確認,值為false代表是單個確認。
示例代碼:
package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
public class AsyncConfirmProducer {
private final static String EXCHANGE_NAME = "async-confirm-exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 創(chuàng)建連接
ConnectionFactory factory = new ConnectionFactory();
// 設置 RabbitMQ 的主機名
factory.setHost("localhost");
// 創(chuàng)建一個連接
Connection connection = factory.newConnection();
// 創(chuàng)建一個通道
Channel channel = connection.createChannel();
// 創(chuàng)建一個Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
int batchCount = 100;
long msgCount = 1;
SortedSet<Long> confirmSet = new TreeSet<Long>();
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Ack,SeqNo:" + deliveryTag + ",multiple:" + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag - 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack,SeqNo:" + deliveryTag + ",multiple:" + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag - 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
// 注意這里需要添加處理消息重發(fā)的場景
}
});
// 演示發(fā)送100個消息
while (msgCount <= batchCount) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(EXCHANGE_NAME, "", null, "async confirm test".getBytes());
confirmSet.add(nextSeqNo);
msgCount = nextSeqNo;
}
// 關閉頻道和連接
channel.close();
connection.close();
}
}
運行結果:
Ack,SeqNo:1,multiple:false
Ack,SeqNo:2,multiple:false
Ack,SeqNo:3,multiple:false
Ack,SeqNo:4,multiple:false
Ack,SeqNo:5,multiple:false
Ack,SeqNo:6,multiple:false
Ack,SeqNo:7,multiple:false
Ack,SeqNo:8,multiple:false
Ack,SeqNo:9,multiple:false
Ack,SeqNo:10,multiple:false
注意:多次運行,發(fā)現每次運行的輸出結果是不一樣的,說明RabbitMQ端回傳給生產者的ack消息并不是以固定的批量大小回傳的。
到目前為止,我們了解到4種模式(事務機制,普通confirm,批量confirm,異步confirm)可以實現生產者確認,讓我們來對比下它們的性能,簡單修改下以上示例代碼中發(fā)送消息的數量,比如10000條,以下為4種模式的耗時:
發(fā)送10000條消息,事務機制耗時:2103
發(fā)送10000條消息,普通confirm機制耗時:1483
發(fā)送10000條消息,批量confirm機制耗時:281
發(fā)送10000條消息,異步confirm機制耗時:214
可以看出,事務機制最慢,普通confirm機制雖有提升但是不多,批量confirm和異步confirm性能最好,大家可以根據自己喜好自行選擇使用哪種機制,個人建議使用異步confirm機制。
Java學習、面試;文檔、視頻資源免費獲取
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。