溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

SpringBoot整合RabbitMq的方法是什么

發(fā)布時(shí)間:2022-10-18 15:59:32 來(lái)源:億速云 閱讀:120 作者:iii 欄目:編程語(yǔ)言

本篇內(nèi)容介紹了“SpringBoot整合RabbitMq的方法是什么”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

SpringBoot 整合RabbitMq 實(shí)戰(zhàn)

spring-boot-starter-amqp

高級(jí)消息隊(duì)列協(xié)議(AMQP)是面向消息中間件的平臺(tái)中立的有線協(xié)議。Spring AMQP項(xiàng)目將核心Spring概念應(yīng)用于基于AMQP的消息傳遞解決方案的開(kāi)發(fā)。Spring Boot為通過(guò)RabbitMQ與AMQP一起工作提供了一些便利,包括spring-boot-starter-amqp “Starter”。

springboot集成RabbitMQ非常簡(jiǎn)單,如果只是簡(jiǎn)單的使用配置非常少,springboot提供了spring-boot-starter-amqp項(xiàng)目對(duì)消息各種支持。

添加依賴

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

RabbitMQ是基于AMQP協(xié)議的輕量級(jí),可靠,可擴(kuò)展,可移植的消息代理。Spring使用RabbitMQ通過(guò)AMQP協(xié)議進(jìn)行通信。

屬性配置

RabbitMQ配置由外部配置屬性控制 spring.rabbitmq.*。例如,您可以在以下部分聲明以下部分 application.properties:

spring.rabbitmq.host = localhost
 spring.rabbitmq.port = 5672
 spring.rabbitmq.username = guest
 spring.rabbitmq.password

快速上手

1.隊(duì)列配置

package com.example.rabbitmqdemo.config;


import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 配置類(lèi)
 *
 * @author itguang
 * @create
@Configuration
public class RabbitConfig

    @Bean
    public Queue queue(){
        return new Queue("hello");
    }
}

2 發(fā)送者

rabbitTemplate是springboot 提供的默認(rèn)實(shí)現(xiàn).

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;

/**
 * 消息發(fā)送者
 *
 * @author itguang
 * @create

@Component
public class HelloSender


    @Autowired
    private AmqpTemplate amqpTemplate;


    public void send(){
        String context = "hello----"+LocalDateTime.now();
        System.out.println("send:"+context);
        //往名稱(chēng)為 hello 的queue中發(fā)送消息
        this.amqpTemplate.convertAndSend("hello",context);
    }

}

3 接收者

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息接受者
 *
 * @author itguang
 * @create

@Component
@RabbitListener(queues = "hello") //監(jiān)聽(tīng) 名稱(chēng)為 hello 的queue
public class HelloReceiver

    //消息處理器
    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver:"+message);

    }


}

測(cè)試

package com.example.rabbitmqdemo;

import com.example.rabbitmqdemo.rabbitmq.HelloSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqdemoApplicationTests

    @Autowired
    HelloSender helloSender;

    @Test
    public void contextLoads() {
        helloSender.send();

    }
}

查看控制臺(tái)輸出結(jié)果

send:hello----2018-04-21T11:29:47.739
Receiver:hello----2018-04-21T11:29:47.739

一對(duì)多發(fā)送:一個(gè)發(fā)送者多個(gè)接受者

對(duì)上面的代碼進(jìn)行了小改造,接收端注冊(cè)了兩個(gè)Receiver,Receiver1和Receiver2,發(fā)送端加入?yún)?shù)計(jì)數(shù),接收端打印接收到的參數(shù),下面是測(cè)試代碼,發(fā)送一百條消息,來(lái)觀察兩個(gè)接收端的執(zhí)行效果

  • 添加一個(gè)隊(duì)列叫 hello2

package com.example.rabbitmqdemo.config;


import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 配置類(lèi)
 *
 * @author itguang
 * @create
@Configuration
public class RabbitConfig



    @Bean
    public Queue queue(){
        return new Queue("hello");
    }

    @Bean
    public Queue queue2(){
        return new Queue("hello2");
    }



}
  • 給隊(duì)列 hello2 發(fā)送消息,接受一個(gè)計(jì)數(shù)參數(shù)

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;

/**
 * 消息發(fā)送者
 *
 * @author itguang
 * @create

@Component
public class HelloSender


    @Autowired
    private AmqpTemplate amqpTemplate;


    public void send(){
        String context = "hello----"+LocalDateTime.now();
        System.out.println("send:"+context);
        this.amqpTemplate.convertAndSend("hello",context);
    }

    //給hello2發(fā)送消息,并接受一個(gè)計(jì)數(shù)參數(shù)
    public void send2(int i){
        String context = i+"";
        System.out.println(context+"--send:");
        this.amqpTemplate.convertAndSend("hello2",context);
    }
}
  • 兩個(gè)hello2 的接受者

@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver1


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver1:"+message);
    }


}
@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver2

    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver2:"+message);
    }
}

測(cè)試

@Test
    public void manyReceiver(){
        for (int i=0;i<100;i++){
            helloSender.send2(i);
        }

    }

查看控制臺(tái)輸出結(jié)果:

0--send:
1--send:
2--send:
3--send:
4--send:

...(省略)

58--send:
59--send:
60--send:
61--send:
62--send:
63--send:
Receiver2:1
Receiver1:0
64--send:
65--send:
Receiver1:2
Receiver2:3
66--send:
Receiver1:4
Receiver2:5
...(省略)

可以看到:在消息發(fā)送到63時(shí),接受者Receiver已經(jīng)收到了消息,
結(jié)論:

一個(gè)發(fā)送者,N個(gè)接受者,經(jīng)過(guò)測(cè)試會(huì)均勻的將消息發(fā)送到N個(gè)接收者中

多對(duì)多: 多個(gè)發(fā)送者對(duì)多個(gè)接受者

我們可以注入兩個(gè)發(fā)送者,放在循環(huán)中,如下:

@Test
    public void many2many(){
      for (int i=0;i<100;i++){
          helloSender.send2(i);
          helloSender2.send2(i);

      }
    }

運(yùn)行單元測(cè)試,查看控制臺(tái)輸出:

0--send:
0--send:
1--send:
1--send:
2--send:
2--send:
3--send:
3--send:

...(省略)

22--send:
22--send:
23--send:
23--send:
24--send:
24--send:
Receiver2:0
25--send:
25--send:
Receiver2:1
26--send:
Receiver2:2
26--send:
Receiver2:3
27--send:
Receiver1:0
27--send:
Receiver2:4
Receiver1:1
28--send:
Receiver2:5
Receiver1:2
28--send:
Receiver2:6
Receiver1:3
29--send:
Receiver2:7
Receiver1:4
29--send:
Receiver2:8
Receiver1:5
30--send:
Receiver2:9
Receiver1:6
30--send:
31--send:
31--send:
32--send:
32--send:

結(jié)論:和一對(duì)多一樣,接收端仍然會(huì)均勻接收到消息

發(fā)送對(duì)象

首先我們創(chuàng)建一個(gè)實(shí)體類(lèi)對(duì)象 User,注意必須實(shí)現(xiàn) Serializable 接口.

package com.example.rabbitmqdemo.pojo;

import java.io.Serializable;

/**
 * @author itguang
 * @create
public class User implements Serializable


    private String username;
    private String password;

    public User(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "User{" +
                "username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}

然后在配置文件中再創(chuàng)建一個(gè)隊(duì)列,叫 object_queue

@Bean
    public Queue queue3(){
        return new Queue("object_queue");
    }

接下里就是User對(duì)象的兩個(gè)發(fā)送者ObjectSender和接受者ObjectReceiver:

@Component
public class ObjectSender

    @Autowired
    AmqpTemplate amqpTemplate;

    public void sendUser(User user){

        System.out.println("Send object:"+user.toString());
        this.amqpTemplate.convertAndSend("object_queue",user);

    }
}
@Component
@RabbitListener(queues = "object_queue")
public class ObjectReceiver

    @RabbitHandler
    public void objectReceiver(User user){

        System.out.println("Receiver object:"+user.toString());

    }
}

運(yùn)行單元測(cè)試,查看控制臺(tái)輸出結(jié)果:

Send object:User{username='李增光', password='666666'}
Receiver object:User{username='李增光', password='666666'}

Topic Exchange

topic 是RabbitMQ中最靈活的一種方式,可以根據(jù)routing_key自由的綁定不同的隊(duì)列

首先對(duì)topic規(guī)則配置,這里使用兩個(gè)隊(duì)列來(lái)測(cè)試

package com.example.rabbitmqdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author itguang
 * @create
@Configuration
public class TopicRabbitConfig


    final static String message = "topic.message";
    final static String messages = "topic.messages";


    //創(chuàng)建兩個(gè) Queue
    @Bean
    public Queue queueMessage(){
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages(){
        return new Queue(TopicRabbitConfig.messages);
    }

    //配置 TopicExchange,指定名稱(chēng)為 topicExchange
    @Bean
    public TopicExchange exchange(){
        return new TopicExchange("topicExchange");
    }

    //給隊(duì)列綁定 exchange 和 routing_key

    @Bean
    public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange){
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    public Binding bingingExchangeMessages(Queue queueMessages,TopicExchange exchange){
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }


}

消息發(fā)送者:都是用topicExchange,并且綁定到不同的 routing_key

package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
public class TopicSender

    @Autowired
    AmqpTemplate amqpTemplate;

    public void send1(){
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        amqpTemplate.convertAndSend("topicExchange","topic.message",context);
    }

    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        amqpTemplate.convertAndSend("topicExchange", "topic.messages", context);
    }
}

兩個(gè)消息接受者,分別指定不同的 queue

package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1

    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver topic.message :"+ message);

    }

}
package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2

    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver topic.messages: "+ message);

    }

}

測(cè)試:

發(fā)送send1會(huì)匹配到topic.#和topic.message 兩個(gè)Receiver都可以收到消息,發(fā)送send2只有topic.#可以匹配所有只有Receiver2監(jiān)聽(tīng)到消息

Fanout Exchange

Fanout 就是我們熟悉的廣播模式或者訂閱模式,給Fanout交換機(jī)發(fā)送消息,綁定了這個(gè)交換機(jī)的所有隊(duì)列都收到這個(gè)消息。

Fanout 相關(guān)配置:

package com.example.rabbitmqdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.security.PublicKey;

/**
 * @author itguang
 * @create
@Configuration
public class FanOutRabbitMq


    //創(chuàng)建三個(gè)隊(duì)列

    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }


    //創(chuàng)建exchange,指定交換策略

    @Bean
    public FanoutExchange fanoutExchange() {

        return new FanoutExchange("fanoutExchange");
    }


    //分別給三個(gè)隊(duì)列指定exchange,這里使用了A、B、C三個(gè)隊(duì)列綁定到Fanout交換機(jī)上面,發(fā)送端的routing_key寫(xiě)任何字符都會(huì)被忽略:

    @Bean
    public Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(AMessage).to(fanoutExchange);

    }

    @Bean
    public Binding bindingExchangeB(Queue BMessage,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(BMessage).to(fanoutExchange);

    }

    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return

消息發(fā)送者:

這里使用了A、B、C三個(gè)隊(duì)列綁定到Fanout交換機(jī)上面,發(fā)送端的routing_key寫(xiě)任何字符都會(huì)被忽略

package com.example.rabbitmqdemo.fanout;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
public class FanoutSender


    @Autowired
    AmqpTemplate amqpTemplate;


    public void send(){

        String context = "hi, fanout msg ";
        System.out.println("Sender : " + context);
        //這里使用了A、B、C三個(gè)隊(duì)列綁定到Fanout交換機(jī)上面,發(fā)送端的routing_key寫(xiě)任何字符都會(huì)被忽略:
        amqpTemplate.convertAndSend("fanoutExchange","", context);

    }

}

三個(gè)消息接受者:

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.A: "+message);

    }

}

@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.B: "+message);

    }

}

@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.C: "+message);

    }

}

運(yùn)行單元測(cè)試,查看結(jié)果:

Sender : hi, fanout msg 

Receiver form fanout.C: hi, fanout msg 
Receiver form fanout.A: hi, fanout msg 
Receiver form fanout.B: hi, fanout msg

結(jié)果說(shuō)明,綁定到fanout交換機(jī)上面的隊(duì)列都收到了消息.

“SpringBoot整合RabbitMq的方法是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI