您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“rabbitmq多個消費者同時接收_RabbitMQ實現(xiàn)公平派遣任務(wù)的方法”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“rabbitmq多個消費者同時接收_RabbitMQ實現(xiàn)公平派遣任務(wù)的方法”吧!
先來看一下 RabbitMQ 工作隊列和競爭消費者簡化模型:
P:(producer/ publisher):生產(chǎn)者,一個發(fā)送消息的用戶應(yīng)用程序。
C1:消費者1,一個主要用來等待接收消息的用戶應(yīng)用程序1。
C2:消費者2,一個主要用來等待接收消息的用戶應(yīng)用程序2。
(C1 和 C2 指代多個消費者(工作人員))
工作隊列(任務(wù)隊列):主要用于在多個工作人員(消費者)之間分配耗時的任務(wù),主要思想是避免必須等待某些立即執(zhí)行的資源密集型任務(wù)完成的尷尬局面發(fā)生。將任務(wù)封裝為消息并將其發(fā)送到隊列,這樣,安排的任務(wù)可以在以后完成。當(dāng)有很多消費者時,任務(wù)將在他們之間共享,一個消息只能被一個消費者獲取。
這個概念在Web應(yīng)用程序中特別有用,因為在Web應(yīng)用程序中,不可能在較短的HTTP請求窗口內(nèi)處理復(fù)雜的任務(wù)。
生產(chǎn)者發(fā)送新任務(wù)消息:
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//循環(huán)發(fā)送 50 條信息
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
for(int i = 0; i < 50; i++) {
String message = "task..." + i;
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
System.out.println(" [x]Sent:" + message);
try {
Thread.sleep(i * 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
channel.close();
connection.close();
}
}
消費者1(消費者1設(shè)置消費耗時):
假設(shè)消耗 1 秒來處理消費過程
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 設(shè)置每個消費者同時只能處理一條消息
//channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[消費者1] Received '" + msg + "'");
try{
Thread.sleep(1000); // 模擬消費者耗時
}catch (InterruptedException e){
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
}
消費者2(消費者2不設(shè)置消費耗時):
public class Worker2 {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 設(shè)置每個消費者同時只能處理一條消息
//channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[消費者2] Received '" + msg + "'");
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
}
首先啟動消費者1(Worker)和消費者2(Worker2)應(yīng)用程序,再啟動生產(chǎn)者(NewTask)。
運行結(jié)果如下:
消費者1:
消費者2:
由結(jié)果可見,兩個消費者各自消費了 25 條信息,消息各不相同,從而實現(xiàn)了任務(wù)的分發(fā)。
公平派遣任務(wù)
由結(jié)果,我們注意到,工作者1(消費者1)的處理任務(wù)比較忙碌(消費耗時),而消費者2卻有大量時間處于空閑狀態(tài)不做任何任務(wù)。而 RabbitMQ 對此一無所知,進行平均分配消息,這是因為 RabbitMQ 在消息進入隊列才進行信息調(diào)度,不會進行未確認消息數(shù),只是盲目地將消息發(fā)送給消費者。
為了解決這一點,可以使用 basicQos 方法并將入?yún)?shù)設(shè)置為 1,這樣做的意義是:RabbitMQ 一次不要給消費者一個以上的消息。換句話即是在消費者處理并確認上一條信息之前,不要向其發(fā)送新的消息,而是分派給不繁忙的消費者進行消費。
在消費者1和消費者2加入一段代碼:
// 設(shè)置每個消費者同時只能處理一條消息
channel.basicQos(1);
再次運行結(jié)果:
消費者1:
消費者2:
因此,通過設(shè)置 basicQos ,可以讓 RabbitMQ 實現(xiàn)分輕重地對任務(wù)進行分派。
到此,相信大家對“rabbitmq多個消費者同時接收_RabbitMQ實現(xiàn)公平派遣任務(wù)的方法”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。