溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Docker怎么啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者

發(fā)布時(shí)間:2023-02-25 09:53:30 來(lái)源:億速云 閱讀:95 作者:iii 欄目:開(kāi)發(fā)技術(shù)

這篇文章主要講解了“Docker怎么啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Docker怎么啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者”吧!

一、Docker拉取鏡像并啟動(dòng)RabbitMQ

拉取鏡像

docker pull rabbitmq:3.8.8-management

查看鏡像

docker images rabbitmq

Docker怎么啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者

 啟動(dòng)鏡像

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.8-management

Linux虛擬機(jī)記得開(kāi)放5672端口或者關(guān)閉防火墻,在window通過(guò) 主機(jī)ip:15672 訪問(wèn)rabbitmq控制臺(tái)

Docker怎么啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者

 用戶名密碼默認(rèn)為guest

Docker怎么啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者

二、Hello World

(一)依賴導(dǎo)入

<!--指定 jdk 編譯版本-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!--rabbitmq 依賴客戶端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <!--操作文件流的一個(gè)依賴-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

(二)消息生產(chǎn)者

工作原理

Docker怎么啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者

  • Broker:接收和分發(fā)消息的應(yīng)用,RabbitMQ Server 就是 Message Broker

  • Connection:publisher/consumer 和 broker 之間的 TCP 連接

  • Channel:如果每一次訪問(wèn) RabbitMQ 都建立一個(gè) Connection,在消息量大的時(shí)候建立 TCP Connection 的開(kāi)銷將是巨大的,效率也較低。Channel 是在 connection 內(nèi)部建立的邏輯連接,如果應(yīng)用程序支持多線程,通常每個(gè) thread 創(chuàng)建單獨(dú)的 channel 進(jìn)行通訊,AMQP method 包含了 channel id 幫助客戶端和 message broker 識(shí)別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級(jí)的 Connection 極大減少了操作系統(tǒng)建立 TCP connection 的開(kāi)銷

  • Exchange:message 到達(dá) broker 的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢表中的 routing key,分發(fā)消息到 queue 中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

  • Queue:消息最終被送到這里等待 consumer 取走

我們需要先獲取連接(Connection),然后通過(guò)連接獲取信道(Channel),這里我們演示簡(jiǎn)單例子,可以直接跳過(guò)交換機(jī)(Exchange)發(fā)送隊(duì)列(Queue)

public class Producer {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        //創(chuàng)建一個(gè)連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置主機(jī)ip
        factory.setHost("182.92.234.71");
        // 設(shè)置用戶名
        factory.setUsername("guest");
        // 設(shè)置密碼
        factory.setPassword("guest");
        //channel 實(shí)現(xiàn)了自動(dòng) close 接口 自動(dòng)關(guān)閉 不需要顯示關(guān)閉
        Connection connection = factory.newConnection();
        // 獲取信道
        Channel channel = connection.createChannel();
        /*
         * 生成一個(gè)隊(duì)列
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments)
         * 1.隊(duì)列名稱
         * 2.隊(duì)列里面的消息是否持久化 默認(rèn)消息存儲(chǔ)在內(nèi)存中
         * 3.該隊(duì)列是否只供一個(gè)消費(fèi)者進(jìn)行消費(fèi) 是否進(jìn)行共享 true 可以多個(gè)消費(fèi)者消費(fèi)
         * 4.是否自動(dòng)刪除 最后一個(gè)消費(fèi)者端開(kāi)連接以后 該隊(duì)列是否自動(dòng)刪除 true 自動(dòng)刪除
         * 5.其他參數(shù)
         **/
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello rabbitmq";
        /*
         * 發(fā)送一個(gè)消息
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 1.發(fā)送到哪個(gè)交換機(jī)
         * 2.路由的key是哪個(gè)
         * 3.其他的參數(shù)信息
         * 4.發(fā)送消息的消息體
         *
         **/
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("發(fā)送成功");
    }
}

(三)消息消費(fèi)者

public class Consumer {
 
    private static final String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        //創(chuàng)建一個(gè)連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置主機(jī)ip
        factory.setHost("182.92.234.71");
        // 設(shè)置用戶名
        factory.setUsername("guest");
        // 設(shè)置密碼
        factory.setPassword("guest");
        //channel 實(shí)現(xiàn)了自動(dòng) close 接口 自動(dòng)關(guān)閉 不需要顯示關(guān)閉
        Connection connection = factory.newConnection();
        // 獲取信道
        Channel channel = connection.createChannel();
 
        // 推送的消息如何進(jìn)行消費(fèi)的回調(diào)接口
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        // 取消消費(fèi)的一個(gè)回調(diào)接口,如在消費(fèi)的時(shí)候隊(duì)列被刪除了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消費(fèi)被中斷");
        };
        /*
         * 消費(fèi)者消費(fèi)消息
         * basicConsume(String queue, boolean autoAck, 
         * DeliverCallback deliverCallback, CancelCallback cancelCallback)
         * 1.消費(fèi)哪個(gè)隊(duì)列
         * 2.消費(fèi)成功之后是否要自動(dòng)應(yīng)答 true 代表自動(dòng)應(yīng)答 false 手動(dòng)應(yīng)答
         * 3.消費(fèi)者未成功消費(fèi)的回調(diào)
         **/
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

三、實(shí)現(xiàn)輪訓(xùn)分發(fā)消息

(一)抽取工具類

可以發(fā)現(xiàn),上面獲取連接工廠,然后獲取連接,再獲取信道的步驟是一致的,我們可以抽取成一個(gè)工具類來(lái)調(diào)用,并使用單例模式-餓漢式完成信道的初始化

public class RabbitMqUtils {
 
    private static Channel channel;
 
    static {
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置ip地址
        factory.setHost("192.168.23.100");
        // 設(shè)置用戶名
        factory.setUsername("guest");
        // 設(shè)置密碼
        factory.setPassword("guest");
        try {
            // 創(chuàng)建連接
            Connection connection = factory.newConnection();
            // 獲取信道
            channel = connection.createChannel();
        } catch (Exception e) {
            System.out.println("創(chuàng)建信道失敗,錯(cuò)誤信息:" + e.getMessage());
        }
    }
 
    public static Channel getChannel() {
        return channel;
    }
}

(二)啟動(dòng)兩個(gè)工作線程

相當(dāng)于前面的消費(fèi)者,我們只需要寫(xiě)一個(gè)類,通過(guò)ideal實(shí)現(xiàn)多線程啟動(dòng)即可模擬兩個(gè)線程

public class Worker01 {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            System.out.println("接受到消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        };
        // 啟動(dòng)兩次,第一次為C1, 第二次為C2
        System.out.println("C2消費(fèi)者等待消費(fèi)消息");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback);
    }
}

(三)啟動(dòng)發(fā)送線程

public class Test01 {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 通過(guò)控制臺(tái)輸入充當(dāng)消息,使輪訓(xùn)演示更明顯
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
            System.out.println("消息發(fā)送完成:" + message);
        }
    }
}

結(jié)果 

Docker怎么啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者

四、實(shí)現(xiàn)手動(dòng)應(yīng)答

(一)消息應(yīng)答概念

消費(fèi)者完成一個(gè)任務(wù)可能需要一段時(shí)間,如果其中一個(gè)消費(fèi)者處理一個(gè)長(zhǎng)的任務(wù)并僅只完成 了部分突然它掛掉了,會(huì)發(fā)生什么情況。RabbitMQ 一旦向消費(fèi)者傳遞了一條消息,便立即將該消 息標(biāo)記為刪除。在這種情況下,突然有個(gè)消費(fèi)者掛掉了,我們將丟失正在處理的消息。以及后續(xù) 發(fā)送給該消費(fèi)這的消息,因?yàn)樗鼰o(wú)法接收到。 為了保證消息在發(fā)送過(guò)程中不丟失,rabbitmq 引入消息應(yīng)答機(jī)制,消息應(yīng)答就是: 消費(fèi)者在接 收到消息并且處理該消息之后,告訴 rabbitmq 它已經(jīng)處理了,rabbitmq 可以把該消息刪除了。

自動(dòng)應(yīng)答:消費(fèi)者發(fā)送后立即被認(rèn)為已經(jīng)傳送成功。這種模式需要在高吞吐量和數(shù)據(jù)傳輸安全性方面做權(quán),因?yàn)檫@種模式如果消息在接收到之前,消費(fèi)者那邊出現(xiàn)連接或者 channel 關(guān)閉,那么消息就丟失了。

當(dāng)然另一方面這種模式消費(fèi)者那邊可以傳遞過(guò)載的消息, 沒(méi)有對(duì)傳遞的消息數(shù)量進(jìn)行限制 , 當(dāng)然這樣有可能使得消費(fèi)者這邊由于接收太多還來(lái)不及處理的消息,導(dǎo)致這些消息的積壓,最終 使得內(nèi)存耗盡,最終這些消費(fèi)者線程被操作系統(tǒng)殺死,所以這種模式僅適用在消費(fèi)者可以高效并 以某種速率能夠處理這些消息的情況下使用 。

手動(dòng)應(yīng)答:消費(fèi)者接受到消息并順利完成業(yè)務(wù)后再調(diào)用方法進(jìn)行確認(rèn),rabbitmq 才可以把該消息刪除

(二)消息應(yīng)答的方法

  • Channel.basicAck(用于肯定確認(rèn))

  • RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了

  • Channel.basicNack(用于否定確認(rèn))

  • Channel.basicReject(用于否定確認(rèn))

  • 與 Channel.basicNack 相比少一個(gè)參數(shù)Multiple

  • multiple 的 true 和 false 代表不同意思

        true 代表批量應(yīng)答 channel 上未應(yīng)答的消息
        比如說(shuō) channel 上有傳送 tag 的消息 5,6,7,8 當(dāng)前 tag 是 8 那么此時(shí)
        5-8 的這些還未應(yīng)答的消息都會(huì)被確認(rèn)收到消息應(yīng)答
        false 同上面相比
        只會(huì)應(yīng)答 tag=8 的消息 5,6,7 這三個(gè)消息依然不會(huì)被確認(rèn)收到消息應(yīng)答

  • 不處理該消息了直接拒絕,可以將其丟棄了

Docker怎么啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者

(三)消息自動(dòng)重新入隊(duì) 

如果消費(fèi)者由于某些原因失去連接(其通道已關(guān)閉,連接已關(guān)閉或 TCP 連接丟失),導(dǎo)致消息未發(fā)送 ACK 確認(rèn),RabbitMQ 將了解到消息未完全處理,并將對(duì)其重新排隊(duì)。如果此時(shí)其他消費(fèi)者可以處理,它將很快將其重新分發(fā)給另一個(gè)消費(fèi)者。這樣,即使某個(gè)消費(fèi)者偶爾死亡,也可以確 保不會(huì)丟失任何消息。

Docker怎么啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者

(四)消息手動(dòng)應(yīng)答代碼 

1、生產(chǎn)者

public class Test01 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
            System.out.println("消息發(fā)送完成:" + message);
        }
    }
}

2、睡眠工具類模擬業(yè)務(wù)執(zhí)行

public class SleepUtils {
 
    public static void sleep(int second) {
        try {
            Thread.sleep(1000 * second);
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

3、消費(fèi)者

public class Worker01 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws Exception {
        System.out.println("C1,業(yè)務(wù)時(shí)間短");
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            SleepUtils.sleep(1);  // 模擬業(yè)務(wù)執(zhí)行1秒
            System.out.println("接受到消息:" + new String(message.getBody()));
            /*
             * 1、消息標(biāo)識(shí)
             * 2、是否啟動(dòng)批量確認(rèn),false:否。
             *    啟用批量有可能造成消息丟失,比如未消費(fèi)的消息提前被確然刪除,后面業(yè)務(wù)消費(fèi)該消息
             *    時(shí)出現(xiàn)異常會(huì)導(dǎo)致該消息的丟失
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
    }
}
 
==============================================================================
public class Worker02 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws Exception {
        System.out.println("C2,業(yè)務(wù)時(shí)間長(zhǎng)");
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            SleepUtils.sleep(15);  // 模擬業(yè)務(wù)執(zhí)行15秒
            System.out.println("接受到消息:" + new String(message.getBody()));
            /*
             * 1、消息標(biāo)識(shí)
             * 2、是否啟動(dòng)批量確認(rèn),false:否。
             *    啟用批量有可能造成消息丟失,比如未消費(fèi)的消息提前被確然刪除,后面業(yè)務(wù)消費(fèi)該消息
             *    時(shí)出現(xiàn)異常會(huì)導(dǎo)致該消息的丟失
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
    }
}

worker01業(yè)務(wù)時(shí)間短,worker02業(yè)務(wù)時(shí)間長(zhǎng),我們提前終止worker02模擬出異常,可以看到消息dd會(huì)被放回隊(duì)列由worker01接收處理。

注意:這里需要先啟動(dòng)生產(chǎn)者聲明隊(duì)列ack,不然啟動(dòng)消費(fèi)者會(huì)報(bào)錯(cuò)

Docker怎么啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者

最后一個(gè)案例我們可以看到消息輪訓(xùn)+消息自動(dòng)重新入隊(duì)+手動(dòng)應(yīng)答。

感謝各位的閱讀,以上就是“Docker怎么啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Docker怎么啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI