溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

docker啟動(dòng)rabbitmq及使用的方法

發(fā)布時(shí)間:2022-08-04 16:51:41 來源:億速云 閱讀:177 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹了docker啟動(dòng)rabbitmq及使用的方法的相關(guān)知識(shí),內(nèi)容詳細(xì)易懂,操作簡(jiǎn)單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇docker啟動(dòng)rabbitmq及使用的方法文章都會(huì)有所收獲,下面我們一起來看看吧。

    搜索rabbitmq鏡像

    docker search rabbitmq:management

    docker啟動(dòng)rabbitmq及使用的方法

    下載鏡像

    docker pull rabbitmq:management

    docker啟動(dòng)rabbitmq及使用的方法

    啟動(dòng)容器

    docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

    docker啟動(dòng)rabbitmq及使用的方法

    docker啟動(dòng)rabbitmq及使用的方法

    打印容器

    docker logs rabbitmq

    docker啟動(dòng)rabbitmq及使用的方法

    docker啟動(dòng)rabbitmq及使用的方法

    訪問RabbitMQ Management

    http://localhost:15672

    賬戶密碼默認(rèn):guest

    docker啟動(dòng)rabbitmq及使用的方法

    編寫生產(chǎn)者類

    package com.xun.rabbitmqdemo.example;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Producer {
        private final static String QUEUE_NAME = "hello";
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            Connection connection = factory.newConnection();
    
            Channel channel = connection.createChannel();
            /**
             * 生成一個(gè)queue隊(duì)列
             * 1、隊(duì)列名稱 QUEUE_NAME
             * 2、隊(duì)列里面的消息是否持久化(默認(rèn)消息存儲(chǔ)在內(nèi)存中)
             * 3、該隊(duì)列是否只供一個(gè)Consumer消費(fèi) 是否共享 設(shè)置為true可以多個(gè)消費(fèi)者消費(fèi)
             * 4、是否自動(dòng)刪除 最后一個(gè)消費(fèi)者斷開連接后 該隊(duì)列是否自動(dòng)刪除
             * 5、其他參數(shù)
             */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            String message = "Hello world!";
            /**
             * 發(fā)送一個(gè)消息
             * 1、發(fā)送到哪個(gè)exchange交換機(jī)
             * 2、路由的key
             * 3、其他的參數(shù)信息
             * 4、消息體
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println(" [x] Sent '"+message+"'");
    
            channel.close();
            connection.close();
        }
    }

    運(yùn)行該方法,可以看到控制臺(tái)的打印

    docker啟動(dòng)rabbitmq及使用的方法

    name=hello的隊(duì)列收到Message

    docker啟動(dòng)rabbitmq及使用的方法

    消費(fèi)者

    package com.xun.rabbitmqdemo.example;
    
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Receiver {
        private final static String QUEUE_NAME = "hello";
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setConnectionTimeout(600000);//milliseconds
            factory.setRequestedHeartbeat(60);//seconds
            factory.setHandshakeTimeout(6000);//milliseconds
            factory.setRequestedChannelMax(5);
            factory.setNetworkRecoveryInterval(500);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            System.out.println("Waiting for messages. ");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                }
            };
            channel.basicConsume(QUEUE_NAME,true,consumer);
        }
    }

    docker啟動(dòng)rabbitmq及使用的方法

    docker啟動(dòng)rabbitmq及使用的方法

    工作隊(duì)列

    RabbitMqUtils工具類

    package com.xun.rabbitmqdemo.utils;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class RabbitMqUtils {
        public static Channel getChannel() throws Exception{
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setUsername("guest");
            factory.setPassword("guest");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            return channel;
        }
    }

    啟動(dòng)2個(gè)工作線程

    package com.xun.rabbitmqdemo.workQueue;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    
    public class Work01 {
        private static final String QUEUE_NAME = "hello";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag,delivery)->{
                String receivedMessage = new String(delivery.getBody());
                System.out.println("接收消息:"+receivedMessage);
            };
            CancelCallback cancelCallback = (consumerTag)->{
                System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
            };
            System.out.println("C1 消費(fèi)者啟動(dòng)等待消費(fèi)....");
            /**
             * 消費(fèi)者消費(fèi)消息
             * 1、消費(fèi)哪個(gè)隊(duì)列
             * 2、消費(fèi)成功后是否自動(dòng)應(yīng)答
             * 3、消費(fèi)的接口回調(diào)
             * 4、消費(fèi)未成功的接口回調(diào)
             */
            channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
        }
    }
    package com.xun.rabbitmqdemo.workQueue;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    
    public class Work02 {
        private static final String QUEUE_NAME = "hello";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag,delivery)->{
                String receivedMessage = new String(delivery.getBody());
                System.out.println("接收消息:"+receivedMessage);
            };
            CancelCallback cancelCallback = (consumerTag)->{
                System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
            };
            System.out.println("C2 消費(fèi)者啟動(dòng)等待消費(fèi)....");
            channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
        }
    }

    啟動(dòng)工作線程

    docker啟動(dòng)rabbitmq及使用的方法

    啟動(dòng)發(fā)送線程

    package com.xun.rabbitmqdemo.workQueue;
    
    import com.rabbitmq.client.Channel;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    import java.util.Scanner;
    
    public class Task01 {
        private static final String QUEUE_NAME = "hello";
        public static void main(String[] args) throws Exception{
            try(Channel channel= RabbitMqUtils.getChannel();){
                channel.queueDeclare(QUEUE_NAME,false,false,false,null);
                //從控制臺(tái)接收消息
                Scanner scanner = new Scanner(System.in);
                while(scanner.hasNext()){
                    String message = scanner.next();
                    channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                    System.out.println("發(fā)送消息完成:"+message);
                }
            }
        }
    }

    啟動(dòng)發(fā)送線程,此時(shí)發(fā)送線程等待鍵盤輸入

    docker啟動(dòng)rabbitmq及使用的方法

    發(fā)送4個(gè)消息

    docker啟動(dòng)rabbitmq及使用的方法

    docker啟動(dòng)rabbitmq及使用的方法

    docker啟動(dòng)rabbitmq及使用的方法

    可以看到2個(gè)工作線程按照順序分別接收message。

    消息應(yīng)答機(jī)制

    rabbitmq將message發(fā)送給消費(fèi)者后,就會(huì)將該消息標(biāo)記為刪除。

    但消費(fèi)者在處理message過程中宕機(jī),會(huì)導(dǎo)致消息的丟失。

    因此需要設(shè)置手動(dòng)應(yīng)答。

    生產(chǎn)者

    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    import java.util.Scanner;
    
    public class Task02 {
        private static final String TASK_QUEUE_NAME = "ack_queue";
        public static void main(String[] args) throws Exception{
            try(Channel channel = RabbitMqUtils.getChannel()){
                channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
                Scanner scanner = new Scanner(System.in);
                System.out.println("請(qǐng)輸入信息");
                while(scanner.hasNext()){
                    String message = scanner.nextLine();
                    channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
                    System.out.println("生產(chǎn)者task02發(fā)出消息"+ message);
                }
            }
        }
    }

    消費(fèi)者

    package com.xun.rabbitmqdemo.workQueue;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    import com.xun.rabbitmqdemo.utils.SleepUtils;
    
    public class Work03 {
        private static final String ACK_QUEUE_NAME = "ack_queue";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            System.out.println("Work03 等待接收消息處理時(shí)間較短");
            DeliverCallback deliverCallback = (consumerTag,delivery)->{
                String message = new String(delivery.getBody());
                SleepUtils.sleep(1);
                System.out.println("接收到消息:"+message);
                /**
                 * 1、消息的標(biāo)記tag
                 * 2、是否批量應(yīng)答
                 */
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            };
            CancelCallback cancelCallback = (consumerTag)->{
                System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
            };
            //采用手動(dòng)應(yīng)答
            boolean autoAck = false;
            channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
        }
    }
    package com.xun.rabbitmqdemo.workQueue;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    import com.xun.rabbitmqdemo.utils.SleepUtils;
    
    public class Work04 {
        private static final String ACK_QUEUE_NAME = "ack_queue";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            System.out.println("Work04 等待接收消息處理時(shí)間較長(zhǎng)");
            DeliverCallback deliverCallback = (consumerTag,delivery)->{
                String message = new String(delivery.getBody());
                SleepUtils.sleep(30);
                System.out.println("接收到消息:"+message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            };
            CancelCallback cancelCallback = (consumerTag)->{
                System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
            };
            //采用手動(dòng)應(yīng)答
            boolean autoAck = false;
            channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
        }
    }

    工具類SleepUtils

    package com.xun.rabbitmqdemo.utils;
    public class SleepUtils {
        public static void sleep(int second){
            try{
                Thread.sleep(1000*second);
            }catch (InterruptedException _ignored){
                Thread.currentThread().interrupt();
            }
        }
    }

    模擬

    docker啟動(dòng)rabbitmq及使用的方法

    docker啟動(dòng)rabbitmq及使用的方法

    docker啟動(dòng)rabbitmq及使用的方法

    work04等待30s后發(fā)出ack

    docker啟動(dòng)rabbitmq及使用的方法

    在work04處理message時(shí)手動(dòng)停止線程,可以看到message:dd被rabbitmq交給了work03

    docker啟動(dòng)rabbitmq及使用的方法

    docker啟動(dòng)rabbitmq及使用的方法

    docker啟動(dòng)rabbitmq及使用的方法

    不公平分發(fā)

    上面的輪詢分發(fā),生產(chǎn)者依次向消費(fèi)者按順序發(fā)送消息,但當(dāng)消費(fèi)者A處理速度很快,而消費(fèi)者B處理速度很慢時(shí),這種分發(fā)策略顯然是不合理的。
    不公平分發(fā):

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);

    通過此配置,當(dāng)消費(fèi)者未處理完當(dāng)前消息,rabbitmq會(huì)優(yōu)先將該message分發(fā)給空閑消費(fèi)者。

    docker啟動(dòng)rabbitmq及使用的方法

    關(guān)于“docker啟動(dòng)rabbitmq及使用的方法”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對(duì)“docker啟動(dòng)rabbitmq及使用的方法”知識(shí)都有一定的了解,大家如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

    向AI問一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI