溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

發(fā)布時(shí)間:2020-08-07 19:12:26 來(lái)源:網(wǎng)絡(luò) 閱讀:416 作者:Java_老男孩 欄目:編程語(yǔ)言

一、發(fā)送消息到隊(duì)列(生產(chǎn)者)

新建一個(gè)maven項(xiàng)目,在pom.xml文件加入以下依賴

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.6.5</version>
    </dependency>
</dependencies><br>

新建一個(gè)P1類

package com.rabbitMQ.test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @create 2019/11/20-11:23
 */
public class P1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //消息隊(duì)列名字
        String queueName="queue";
        //實(shí)例連接工廠
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //設(shè)置地址
        connectionFactory.setHost("192.168.128.233");
        //設(shè)置端口
        connectionFactory.setPort(5672);
        //設(shè)置用戶名
        connectionFactory.setUsername("mowen");
        //設(shè)置密碼
        connectionFactory.setPassword("123456");
        //獲取連接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //創(chuàng)建通道
        Channel channel = connection.createChannel();
        //聲明隊(duì)列。
        //參數(shù)1:隊(duì)列名
        //參數(shù)2:持久化 (true表示是,隊(duì)列將在服務(wù)器重啟時(shí)依舊存在)
        //參數(shù)3:獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列,斷開(kāi)后自動(dòng)刪除)
        //參數(shù)4:當(dāng)所有消費(fèi)者客戶端連接斷開(kāi)時(shí)是否自動(dòng)刪除隊(duì)列
        //參數(shù)5:隊(duì)列的其他參數(shù)
        channel.queueDeclare(queueName,true,false,false,null);

        for (int i = 0; i < 10; i++) {
            String msg="msg"+i;
            // 基本發(fā)布消息
            // 第一個(gè)參數(shù)為交換機(jī)名稱、
            // 第二個(gè)參數(shù)為隊(duì)列映射的路由key、
            // 第三個(gè)參數(shù)為消息的其他屬性、
            // 第四個(gè)參數(shù)為發(fā)送信息的主體
            channel.basicPublish("",queueName,null,msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}

運(yùn)行后再瀏覽器進(jìn)入RabbitMQ的控制臺(tái),切換到queue看到

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

二、獲取隊(duì)列消息(消費(fèi)者)

新建一個(gè)C1類

package com.rabbitMQ.test;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @create 2019/11/20-13:12
 */
public class C1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //消息隊(duì)列名字
        String queueName="queue";
        //實(shí)例連接工廠
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //設(shè)置地址
        connectionFactory.setHost("192.168.128.233");
        //設(shè)置端口
        connectionFactory.setPort(5672);
        //設(shè)置用戶名
        connectionFactory.setUsername("mowen");
        //設(shè)置密碼
        connectionFactory.setPassword("123456");
        //獲取連接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //創(chuàng)建通道
        Channel channel = connection.createChannel();

        // 創(chuàng)建一個(gè)消費(fèi)者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消費(fèi)收到消息的時(shí)候調(diào)用的回調(diào)
                System.out.println("C3接收到:" + new String(body));
            }
        };

        //把消費(fèi)著綁定到指定隊(duì)列
        //第一個(gè)是隊(duì)列名
        //第二個(gè)是 是否自動(dòng)確認(rèn)
        //第三個(gè)是消費(fèi)者
        channel.basicConsume(queueName,true,consumer);

    }
}

運(yùn)行后輸出為

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

消費(fèi)者一般都不會(huì)關(guān)閉,會(huì)一直等待隊(duì)列消息,可以手動(dòng)關(guān)閉程序。

channel.basicConsume(queueName,true,consumer);中的true為收到消息后自動(dòng)確認(rèn),改為false取消自動(dòng)確認(rèn)。

在handleDelivery方法最后面用

channel.basicAck(envelope.getDeliveryTag(),false);

來(lái)收到手動(dòng)確認(rèn)消息。消費(fèi)者可以有多個(gè)并且可以同時(shí)消費(fèi)一個(gè)隊(duì)列;

當(dāng)有多個(gè)消費(fèi)者同時(shí)消費(fèi)同一個(gè)隊(duì)列時(shí),收到的消息是平均分配的(消費(fèi)者沒(méi)收到之前已經(jīng)確認(rèn)每個(gè)消費(fèi)者受到的消息),

但當(dāng)其中一個(gè)消費(fèi)者性能差的話,會(huì)影響其他的消費(fèi)者,因?yàn)檫€要等它收完消息,這樣會(huì)拖累其他消費(fèi)者。

可以設(shè)置channel 的basicQos方法

//設(shè)置最多接受消息數(shù)量
// 設(shè)置了這個(gè)參數(shù)之后要吧自動(dòng)確認(rèn)關(guān)掉
channel.basicQos(1);

三、扇形(fanout)交換機(jī)

扇形交換機(jī)是基本的交換機(jī)類型,會(huì)把收到的消息以廣播的形式發(fā)送到綁定的隊(duì)列里,因?yàn)椴恍枰?jīng)過(guò)條件篩選,所以它的速度最快。

在生產(chǎn)者項(xiàng)目新建一個(gè)fanout類

package com.rabbitMQ.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @date  2019/11/20-11:23
 */
public class fanout {
    public static void main(String[] args) throws IOException, TimeoutException {
        //交換機(jī)名字
        String exchangeName="fanout";
        //交換機(jī)名字類型
        String exchangeType="fanout";
        //消息隊(duì)列名字
        String queueName1="fanout.queue1";
        String queueName2="fanout.queue2";
        String queueName3="fanout.queue3";
        //實(shí)例連接工廠
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //設(shè)置地址
        connectionFactory.setHost("192.168.128.233");
        //設(shè)置端口
        connectionFactory.setPort(5672);
        //設(shè)置用戶名
        connectionFactory.setUsername("mowen");
        //設(shè)置密碼
        connectionFactory.setPassword("123456");
        //獲取連接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //創(chuàng)建通道
        Channel channel = connection.createChannel();
        //聲明隊(duì)列。
        //參數(shù)1:隊(duì)列名
        //參數(shù)2:持久化 (true表示是,隊(duì)列將在服務(wù)器重啟時(shí)依舊存在)
        //參數(shù)3:獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列,斷開(kāi)后自動(dòng)刪除)
        //參數(shù)4:當(dāng)所有消費(fèi)者客戶端連接斷開(kāi)時(shí)是否自動(dòng)刪除隊(duì)列
        //參數(shù)5:隊(duì)列的其他參數(shù)
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        channel.queueDeclare(queueName3,true,false,false,null);

        //聲明交換機(jī)
        channel.exchangeDeclare(exchangeName,exchangeType);

        //隊(duì)列綁定到交換機(jī)
        channel.queueBind(queueName1,exchangeName,"");
        channel.queueBind(queueName2,exchangeName,"");
        channel.queueBind(queueName3,exchangeName,"");

        for (int i = 0; i < 10; i++) {
            String msg="msg"+i;
            // 基本發(fā)布消息
            // 第一個(gè)參數(shù)為交換機(jī)名稱、
            // 第二個(gè)參數(shù)為隊(duì)列映射的路由key、
            // 第三個(gè)參數(shù)為消息的其他屬性、
            // 第四個(gè)參數(shù)為發(fā)送信息的主體
            channel.basicPublish(exchangeName,"",null,msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}

運(yùn)行后在RabbitMQ網(wǎng)頁(yè)管理后臺(tái)的queue會(huì)看到

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

切換到Exchanges會(huì)看到一個(gè)

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

就是我們聲明的交換機(jī),點(diǎn)擊會(huì)看到我們綁定的隊(duì)列

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

四、直連(direct)交換機(jī)

直連交換機(jī)會(huì)帶路由功能,隊(duì)列通過(guò)routing_key與直連交換機(jī)綁定,發(fā)送消息需要指定routing_key,交換機(jī)收到消息時(shí),交換機(jī)會(huì)根據(jù)routing_key發(fā)送到指定隊(duì)列里,同樣的routing_key可以支持多個(gè)隊(duì)列。

在生產(chǎn)者項(xiàng)目新建direct類

package com.rabbitMQ.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @date  2019/11/20-11:23
 */
public class direct {
    public static void main(String[] args) throws IOException, TimeoutException {
        String exchangeName="direct";
        String exchangeType="direct";
        //消息隊(duì)列名字
        String queueName1="direct.queue1";
        String queueName2="direct.queue2";
        String queueName3="direct.queue3";
        //實(shí)例連接工廠
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //設(shè)置地址
        connectionFactory.setHost("192.168.128.233");
        //設(shè)置端口
        connectionFactory.setPort(5672);
        //設(shè)置用戶名
        connectionFactory.setUsername("mowen");
        //設(shè)置密碼
        connectionFactory.setPassword("123456");
        //獲取連接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //創(chuàng)建通道
        Channel channel = connection.createChannel();
        //聲明隊(duì)列。
        //參數(shù)1:隊(duì)列名
        //參數(shù)2:持久化 (true表示是,隊(duì)列將在服務(wù)器重啟時(shí)依舊存在)
        //參數(shù)3:獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列,斷開(kāi)后自動(dòng)刪除)
        //參數(shù)4:當(dāng)所有消費(fèi)者客戶端連接斷開(kāi)時(shí)是否自動(dòng)刪除隊(duì)列
        //參數(shù)5:隊(duì)列的其他參數(shù)
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        channel.queueDeclare(queueName3,true,false,false,null);

        //聲明交換機(jī)
        channel.exchangeDeclare(exchangeName,exchangeType);

        //隊(duì)列綁定到交換機(jī)并指定rouing_key
        channel.queueBind(queueName1,exchangeName,"key1");
        channel.queueBind(queueName2,exchangeName,"key2");
        channel.queueBind(queueName3,exchangeName,"key1");

        for (int i = 0; i < 10; i++) {
            String msg="msg"+i;
            // 基本發(fā)布消息
            // 第一個(gè)參數(shù)為交換機(jī)名稱、
            // 第二個(gè)參數(shù)為隊(duì)列映射的路由key、
            // 第三個(gè)參數(shù)為消息的其他屬性、
            // 第四個(gè)參數(shù)為發(fā)送信息的主體
            channel.basicPublish(exchangeName,"key1",null,msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}

運(yùn)行后到后臺(tái)的queue會(huì)看到

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

切換到Exchanges會(huì)看到

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

點(diǎn)擊進(jìn)去

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

五、主題(topic)交換機(jī)

主題交換機(jī)的routing_key可以有一定的規(guī)則,交換機(jī)和隊(duì)列的routing_key需要采用.#.…..的格式

每個(gè)部分用.分開(kāi)

*代表一個(gè)單詞(不是字符)

#代表任意數(shù)量(0或n個(gè))單詞

在生產(chǎn)者項(xiàng)目新進(jìn)topic類

package com.rabbitMQ.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @date  2019/11/20-11:23
 */
public class topic {
    public static void main(String[] args) throws IOException, TimeoutException {
        String exchangeName="topic";
        String exchangeType="topic";
        //消息隊(duì)列名字
        String queueName1="topic.queue1";
        String queueName2="topic.queue2";
        String queueName3="topic.queue3";
        //實(shí)例連接工廠
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //設(shè)置地址
        connectionFactory.setHost("192.168.128.233");
        //設(shè)置端口
        connectionFactory.setPort(5672);
        //設(shè)置用戶名
        connectionFactory.setUsername("mowen");
        //設(shè)置密碼
        connectionFactory.setPassword("123456");
        //獲取連接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //創(chuàng)建通道
        Channel channel = connection.createChannel();
        //聲明隊(duì)列。
        //參數(shù)1:隊(duì)列名
        //參數(shù)2:持久化 (true表示是,隊(duì)列將在服務(wù)器重啟時(shí)依舊存在)
        //參數(shù)3:獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列,斷開(kāi)后自動(dòng)刪除)
        //參數(shù)4:當(dāng)所有消費(fèi)者客戶端連接斷開(kāi)時(shí)是否自動(dòng)刪除隊(duì)列
        //參數(shù)5:隊(duì)列的其他參數(shù)
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        channel.queueDeclare(queueName3,true,false,false,null);

        //聲明交換機(jī)
        channel.exchangeDeclare(exchangeName,exchangeType);

        //隊(duì)列綁定到交換機(jī)并指定rouing_key
        channel.queueBind(queueName1,exchangeName,"com.aaa.*");
        channel.queueBind(queueName2,exchangeName,"com.*.topic");
        channel.queueBind(queueName3,exchangeName,"com.bbb.*");

        for (int i = 0; i < 10; i++) {
            String msg="msg"+i;
            // 基本發(fā)布消息
            // 第一個(gè)參數(shù)為交換機(jī)名稱、
            // 第二個(gè)參數(shù)為隊(duì)列映射的路由key、
            // 第三個(gè)參數(shù)為消息的其他屬性、
            // 第四個(gè)參數(shù)為發(fā)送信息的主體
            channel.basicPublish(exchangeName,"com.aaa.topic",null,msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}

運(yùn)行后,到后臺(tái)queue會(huì)看到

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

切換到Exchanges會(huì)看到

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

點(diǎn)擊進(jìn)入會(huì)看到

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI