溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

rabbitmq多個消費者同時接收_RabbitMQ實現(xiàn)公平派遣任務(wù)的方法

發(fā)布時間:2021-06-25 13:56:09 來源:億速云 閱讀:168 作者:chen 欄目:編程語言

本篇內(nèi)容主要講解“rabbitmq多個消費者同時接收_RabbitMQ實現(xiàn)公平派遣任務(wù)的方法”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“rabbitmq多個消費者同時接收_RabbitMQ實現(xiàn)公平派遣任務(wù)的方法”吧!

先來看一下 RabbitMQ 工作隊列和競爭消費者簡化模型:


P:(producer/ publisher):生產(chǎn)者,一個發(fā)送消息的用戶應(yīng)用程序。
C1:消費者1,一個主要用來等待接收消息的用戶應(yīng)用程序1。
C2:消費者2,一個主要用來等待接收消息的用戶應(yīng)用程序2。
(C1 和 C2 指代多個消費者(工作人員))
工作隊列(任務(wù)隊列):主要用于在多個工作人員(消費者)之間分配耗時的任務(wù),主要思想是避免必須等待某些立即執(zhí)行的資源密集型任務(wù)完成的尷尬局面發(fā)生。將任務(wù)封裝為消息并將其發(fā)送到隊列,這樣,安排的任務(wù)可以在以后完成。當(dāng)有很多消費者時,任務(wù)將在他們之間共享,一個消息只能被一個消費者獲取。

這個概念在Web應(yīng)用程序中特別有用,因為在Web應(yīng)用程序中,不可能在較短的HTTP請求窗口內(nèi)處理復(fù)雜的任務(wù)。

生產(chǎn)者發(fā)送新任務(wù)消息:

public class NewTask {
 
    private static final String TASK_QUEUE_NAME = "task_queue";
 
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //循環(huán)發(fā)送 50 條信息
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        for(int i = 0; i < 50; i++) {
            String message = "task..." + i;
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
            System.out.println(" [x]Sent:" + message);
            try {
                Thread.sleep(i * 2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
 
        channel.close();
        connection.close();
 
    }
}
消費者1(消費者1設(shè)置消費耗時):

假設(shè)消耗 1 秒來處理消費過程

public class Worker {
 
    private static final String TASK_QUEUE_NAME = "task_queue";
 
    public static void main(String[] args) throws IOException, TimeoutException {
 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
 
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        // 設(shè)置每個消費者同時只能處理一條消息
        //channel.basicQos(1);
 
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println("[消費者1] Received '" + msg + "'");
                try{
                    Thread.sleep(1000);  // 模擬消費者耗時
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    }
}
消費者2(消費者2不設(shè)置消費耗時):

public class Worker2 {
    private static final String TASK_QUEUE_NAME = "task_queue";
 
    public static void main(String[] args) throws IOException, TimeoutException {
 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
 
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        // 設(shè)置每個消費者同時只能處理一條消息
        //channel.basicQos(1);
        
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println("[消費者2] Received '" + msg + "'");
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    }
}
首先啟動消費者1(Worker)和消費者2(Worker2)應(yīng)用程序,再啟動生產(chǎn)者(NewTask)。

運行結(jié)果如下:

消費者1:


消費者2:


由結(jié)果可見,兩個消費者各自消費了 25 條信息,消息各不相同,從而實現(xiàn)了任務(wù)的分發(fā)。

公平派遣任務(wù)

由結(jié)果,我們注意到,工作者1(消費者1)的處理任務(wù)比較忙碌(消費耗時),而消費者2卻有大量時間處于空閑狀態(tài)不做任何任務(wù)。而 RabbitMQ 對此一無所知,進行平均分配消息,這是因為 RabbitMQ 在消息進入隊列才進行信息調(diào)度,不會進行未確認消息數(shù),只是盲目地將消息發(fā)送給消費者。

為了解決這一點,可以使用 basicQos 方法并將入?yún)?shù)設(shè)置為 1,這樣做的意義是:RabbitMQ 一次不要給消費者一個以上的消息。換句話即是在消費者處理并確認上一條信息之前,不要向其發(fā)送新的消息,而是分派給不繁忙的消費者進行消費。

在消費者1和消費者2加入一段代碼:

// 設(shè)置每個消費者同時只能處理一條消息
channel.basicQos(1);
再次運行結(jié)果:

消費者1:


消費者2:


因此,通過設(shè)置 basicQos ,可以讓 RabbitMQ 實現(xiàn)分輕重地對任務(wù)進行分派。
 

到此,相信大家對“rabbitmq多個消費者同時接收_RabbitMQ實現(xiàn)公平派遣任務(wù)的方法”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI