溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

spring boot集成rabbitmq的示例分析

發(fā)布時(shí)間:2021-07-08 13:44:23 來(lái)源:億速云 閱讀:275 作者:小新 欄目:編程語(yǔ)言

這篇文章主要為大家展示了“spring boot集成rabbitmq的示例分析”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“spring boot集成rabbitmq的示例分析”這篇文章吧。

一、RabbitMQ的介紹  

RabbitMQ是消息中間件的一種,消息中間件即分布式系統(tǒng)中完成消息的發(fā)送和接收的基礎(chǔ)軟件.這些軟件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,現(xiàn)已經(jīng)轉(zhuǎn)讓給apache).

消息中間件的工作過(guò)程可以用生產(chǎn)者消費(fèi)者模型來(lái)表示.即,生產(chǎn)者不斷的向消息隊(duì)列發(fā)送信息,而消費(fèi)者從消息隊(duì)列中消費(fèi)信息.具體過(guò)程如下:

spring boot集成rabbitmq的示例分析

從上圖可看出,對(duì)于消息隊(duì)列來(lái)說(shuō),生產(chǎn)者,消息隊(duì)列,消費(fèi)者是最重要的三個(gè)概念,生產(chǎn)者發(fā)消息到消息隊(duì)列中去,消費(fèi)者監(jiān)聽(tīng)指定的消息隊(duì)列,并且當(dāng)消息隊(duì)列收到消息之后,接收消息隊(duì)列傳來(lái)的消息,并且給予相應(yīng)的處理.消息隊(duì)列常用于分布式系統(tǒng)之間互相信息的傳遞.

對(duì)于RabbitMQ來(lái)說(shuō),除了這三個(gè)基本模塊以外,還添加了一個(gè)模塊,即交換機(jī)(Exchange).它使得生產(chǎn)者和消息隊(duì)列之間產(chǎn)生了隔離,生產(chǎn)者將消息發(fā)送給交換機(jī),而交換機(jī)則根據(jù)調(diào)度策略把相應(yīng)的消息轉(zhuǎn)發(fā)給對(duì)應(yīng)的消息隊(duì)列.那么RabitMQ的工作流程如下所示:

spring boot集成rabbitmq的示例分析

緊接著說(shuō)一下交換機(jī).交換機(jī)的主要作用是接收相應(yīng)的消息并且綁定到指定的隊(duì)列.交換機(jī)有四種類(lèi)型,分別為Direct,topic,headers,Fanout.

Direct是RabbitMQ默認(rèn)的交換機(jī)模式,也是最簡(jiǎn)單的模式.即創(chuàng)建消息隊(duì)列的時(shí)候,指定一個(gè)BindingKey.當(dāng)發(fā)送者發(fā)送消息的時(shí)候,指定對(duì)應(yīng)的Key.當(dāng)Key和消息隊(duì)列的BindingKey一致的時(shí)候,消息將會(huì)被發(fā)送到該消息隊(duì)列中.

topic轉(zhuǎn)發(fā)信息主要是依據(jù)通配符,隊(duì)列和交換機(jī)的綁定主要是依據(jù)一種模式(通配符+字符串),而當(dāng)發(fā)送消息的時(shí)候,只有指定的Key和該模式相匹配的時(shí)候,消息才會(huì)被發(fā)送到該消息隊(duì)列中.

headers也是根據(jù)一個(gè)規(guī)則進(jìn)行匹配,在消息隊(duì)列和交換機(jī)綁定的時(shí)候會(huì)指定一組鍵值對(duì)規(guī)則,而發(fā)送消息的時(shí)候也會(huì)指定一組鍵值對(duì)規(guī)則,當(dāng)兩組鍵值對(duì)規(guī)則相匹配的時(shí)候,消息會(huì)被發(fā)送到匹配的消息隊(duì)列中.

Fanout是路由廣播的形式,將會(huì)把消息發(fā)給綁定它的全部隊(duì)列,即便設(shè)置了key,也會(huì)被忽略. 

概念:

  • 生產(chǎn)者 消息的產(chǎn)生方,負(fù)責(zé)將消息推送到消息隊(duì)列

  • 消費(fèi)者 消息的最終接受方,負(fù)責(zé)監(jiān)聽(tīng)隊(duì)列中的對(duì)應(yīng)消息,消費(fèi)消息

  • 隊(duì)列 消息的寄存器,負(fù)責(zé)存放生產(chǎn)者發(fā)送的消息

  • 交換機(jī) 負(fù)責(zé)根據(jù)一定規(guī)則分發(fā)生產(chǎn)者產(chǎn)生的消息

  • 綁定 完成交換機(jī)和隊(duì)列之間的綁定

模式:

1、direct

直連模式,用于實(shí)例間的任務(wù)分發(fā)

2、topic

話題模式,通過(guò)可配置的規(guī)則分發(fā)給綁定在該exchange上的隊(duì)列

3、headers

適用規(guī)則復(fù)雜的分發(fā),用headers里的參數(shù)表達(dá)規(guī)則

4、fanout

分發(fā)給所有綁定到該exchange上的隊(duì)列,忽略routing key

安裝

單機(jī)版安裝很簡(jiǎn)單,大概步驟如下:

# 安裝erlang包
 yum install erlang
# 安裝socat
 yum install socat
# 安裝rabbit 
 rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm 
# 啟動(dòng)服務(wù)
 rabbitmq-server start
# 增加管理控制功能
 rabbitmq-plugins enable rabbitmq_management
# 增加用戶:
 sudo rabbitmqctl add_user root password
 rabbitmqctl set_user_tags root administrator 
 rabbitmqctl set_permissions -p / root '.*' '.*' '.*'

集群安裝,可參考這篇文章:

     rabbitmq集群安裝

以上就是rabbitmq的介紹,下面開(kāi)始本文的正文:spring boot 集成rabbitmq ,本人在學(xué)習(xí)rabbitmq時(shí)發(fā)現(xiàn)網(wǎng)上很少有系統(tǒng)性介紹springboot和rabbitmq如何集成的,其他人總結(jié)的都片段化,所以結(jié)合個(gè)人調(diào)研過(guò)程,整理此篇文章。

二、springboot配置

廢話少說(shuō)直接上代碼:

配置參數(shù)

application.yml:

spring:
 rabbitmq:
 addresses: 192.168.1.1:5672
 username: username
 password: password
 publisher-confirms: true
 virtual-host: /

java config讀取參數(shù)

/**
 * RabbitMq配置文件讀取類(lèi)
 *
 * @author chenhf
 * @create 2017-10-23 上午9:31
 **/
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {

 @Value("${spring.rabbitmq.addresses}")
 private String addresses;
 @Value("${spring.rabbitmq.username}")
 private String username;
 @Value("${spring.rabbitmq.password}")
 private String password;
 @Value("${spring.rabbitmq.publisher-confirms}")
 private Boolean publisherConfirms;
 @Value("${spring.rabbitmq.virtual-host}")
 private String virtualHost;

 // 構(gòu)建mq實(shí)例工廠
 @Bean
 public ConnectionFactory connectionFactory(){
 CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
 connectionFactory.setAddresses(addresses);
 connectionFactory.setUsername(username);
 connectionFactory.setPassword(password);
 connectionFactory.setPublisherConfirms(publisherConfirms);
 connectionFactory.setVirtualHost(virtualHost);
 return connectionFactory;
 }

 @Bean
 public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
 return new RabbitAdmin(connectionFactory);
 }

 @Bean
 @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
 public RabbitTemplate rabbitTemplate(){
 RabbitTemplate template = new RabbitTemplate(connectionFactory());
 return template;
 }
}

三、rabbitmq生產(chǎn)者配置

主要配置了直連和話題模式,其中話題模式設(shè)置兩個(gè)隊(duì)列(queueTopicTest1、queueTopicTest2),此兩個(gè)隊(duì)列在和交換機(jī)綁定時(shí)分別設(shè)置不同的routingkey(.TEST.以及l(fā)azy.#)來(lái)驗(yàn)證匹配模式。

/**
 * 用于配置交換機(jī)和隊(duì)列對(duì)應(yīng)關(guān)系
 * 新增消息隊(duì)列應(yīng)該按照如下步驟
 * 1、增加queue bean,參見(jiàn)queueXXXX方法
 * 2、增加queue和exchange的binding
 * @author chenhf
 * @create 2017-10-23 上午10:33
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class RabbitMqExchangeConfig {
 /** logger */
 private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);

 /**
 * @Author:chenhf
 * @Description: 主題型交換機(jī)
 * @Date:下午5:49 2017/10/23
 * @param
 * @return
 */
 @Bean
 TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){
 TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode());
 rabbitAdmin.declareExchange(contractTopicExchange);
 logger.debug("完成主題型交換機(jī)bean實(shí)例化");
 return contractTopicExchange;
 }
 /**
 * 直連型交換機(jī)
 */
 @Bean
 DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {
 DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());
 rabbitAdmin.declareExchange(contractDirectExchange);
 logger.debug("完成直連型交換機(jī)bean實(shí)例化");
 return contractDirectExchange;
 }

 //在此可以定義隊(duì)列

 @Bean
 Queue queueTest(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("測(cè)試隊(duì)列實(shí)例化完成");
 return queue;
 }

 //topic 1
 @Bean
 Queue queueTopicTest1(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("話題測(cè)試隊(duì)列1實(shí)例化完成");
 return queue;
 }
 //topic 2
 @Bean
 Queue queueTopicTest2(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("話題測(cè)試隊(duì)列2實(shí)例化完成");
 return queue;
 }


 //在此處完成隊(duì)列和交換機(jī)綁定
 @Bean
 Binding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("測(cè)試隊(duì)列與直連型交換機(jī)綁定完成");
 return binding;
 }
 //topic binding1
 @Bean
 Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("測(cè)試隊(duì)列與話題交換機(jī)1綁定完成");
 return binding;
 }

 //topic binding2
 @Bean
 Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("測(cè)試隊(duì)列與話題交換機(jī)2綁定完成");
 return binding;
 }

}

在這里用到枚舉類(lèi):RabbitMqEnum

/**
 * 定義rabbitMq需要的常量
 *
 * @author chenhf
 * @create 2017-10-23 下午4:07
 **/
public class RabbitMqEnum {

 /**
 * @param
 * @Author:chenhf
 * @Description:定義數(shù)據(jù)交換方式
 * @Date:下午4:08 2017/10/23
 * @return
 */
 public enum Exchange {
 CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分發(fā)"),
 CONTRACT_TOPIC("CONTRACT_TOPIC", "消息訂閱"),
 CONTRACT_DIRECT("CONTRACT_DIRECT", "點(diǎn)對(duì)點(diǎn)");

 private String code;
 private String name;

 Exchange(String code, String name) {
 this.code = code;
 this.name = name;
 }

 public String getCode() {
 return code;
 }

 public String getName() {
 return name;
 }
 }

 /**
 * describe: 定義隊(duì)列名稱(chēng)
 * creat_user: chenhf
 * creat_date: 2017/10/31
 **/
 public enum QueueName {
 TESTQUEUE("TESTQUEUE", "測(cè)試隊(duì)列"),
 TOPICTEST1("TOPICTEST1", "topic測(cè)試隊(duì)列"),
 TOPICTEST2("TOPICTEST2", "topic測(cè)試隊(duì)列");

 private String code;
 private String name;

 QueueName(String code, String name) {
 this.code = code;
 this.name = name;
 }

 public String getCode() {
 return code;
 }

 public String getName() {
 return name;
 }

 }

 /**
 * describe: 定義routing_key
 * creat_user: chenhf
 * creat_date: 2017/10/31
 **/
 public enum QueueEnum {
 TESTQUEUE("TESTQUEUE1", "測(cè)試隊(duì)列key"),
 TESTTOPICQUEUE1("*.TEST.*", "topic測(cè)試隊(duì)列key"),
 TESTTOPICQUEUE2("lazy.#", "topic測(cè)試隊(duì)列key");


 private String code;
 private String name;

 QueueEnum(String code, String name) {
 this.code = code;
 this.name = name;
 }

 public String getCode() {
 return code;
 }

 public String getName() {
 return name;
 }
 }

}

以上完成消息生產(chǎn)者的定義,下面封裝調(diào)用接口

測(cè)試時(shí)直接調(diào)用此工具類(lèi),testUser類(lèi)需自己實(shí)現(xiàn)

rabbitMqSender.sendRabbitmqDirect("TESTQUEUE1",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.1.2",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.TEST.2",testUser);
/**
 * rabbitmq發(fā)送消息工具類(lèi)
 *
 * @author chenhf
 * @create 2017-10-26 上午11:10
 **/

@Component
public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{
 /** logger */
 private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class);

 private RabbitTemplate rabbitTemplate;

 @Autowired
 public RabbitMqSender(RabbitTemplate rabbitTemplate) {
 this.rabbitTemplate = rabbitTemplate;
 this.rabbitTemplate.setConfirmCallback(this);
 }

 @Override
 public void confirm(CorrelationData correlationData, boolean b, String s) {
 logger.info("confirm: " + correlationData.getId());
 }

 /**
 * 發(fā)送到 指定routekey的指定queue
 * @param routeKey
 * @param obj
 */
 public void sendRabbitmqDirect(String routeKey,Object obj) {
 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 logger.info("send: " + correlationData.getId());
 this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData);
 }

 /**
 * 所有發(fā)送到Topic Exchange的消息被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定Topic的Queue上
 * @param routeKey
 * @param obj
 */
 public void sendRabbitmqTopic(String routeKey,Object obj) {
 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 logger.info("send: " + correlationData.getId());
 this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData);
 }
}

四、rabbitmq消費(fèi)者配置

springboot注解方式監(jiān)聽(tīng)隊(duì)列,無(wú)法手動(dòng)指定回調(diào),所以采用了實(shí)現(xiàn)ChannelAwareMessageListener接口,重寫(xiě)onMessage來(lái)進(jìn)行手動(dòng)回調(diào),詳見(jiàn)以下代碼,詳細(xì)介紹可以在spring的官網(wǎng)上找amqp相關(guān)章節(jié)閱讀

直連消費(fèi)者

通過(guò)設(shè)置TestUser的name來(lái)測(cè)試回調(diào),分別發(fā)兩條消息,一條UserName為1,一條為2,查看控制臺(tái)中隊(duì)列中消息是否被消費(fèi)

/**
 * 消費(fèi)者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class ExampleAmqpConfiguration {
 @Bean("testQueueContainer")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TESTQUEUE");
 container.setMessageListener(exampleListener());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }


 @Bean("testQueueListener")
 public ChannelAwareMessageListener exampleListener() {
 return new ChannelAwareMessageListener() {
 @Override
 public void onMessage(Message message, Channel channel) throws Exception {
 TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
 //通過(guò)設(shè)置TestUser的name來(lái)測(cè)試回調(diào),分別發(fā)兩條消息,一條UserName為1,一條為2,查看控制臺(tái)中隊(duì)列中消息是否被消費(fèi)
 if ("2".equals(testUser.getUserName())){
  System.out.println(testUser.toString());
  channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 }

 if ("1".equals(testUser.getUserName())){
  System.out.println(testUser.toString());
  channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
 }

 }
 };
 }

}

topic消費(fèi)者1

/**
 * 消費(fèi)者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration {
 @Bean("topicTest1Container")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TOPICTEST1");
 container.setMessageListener(exampleListener1());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }


 @Bean("topicTest1Listener")
 public ChannelAwareMessageListener exampleListener1(){
 return new ChannelAwareMessageListener() {
 @Override
 public void onMessage(Message message, Channel channel) throws Exception {
 TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
 System.out.println("TOPICTEST1:"+testUser.toString());
 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

 }
 };
 }




}

topic消費(fèi)者2

/**
 * 消費(fèi)者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration2 {
 @Bean("topicTest2Container")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TOPICTEST2");
 container.setMessageListener(exampleListener());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }


 @Bean("topicTest2Listener")
 public ChannelAwareMessageListener exampleListener() {
 return new ChannelAwareMessageListener() {
 @Override
 public void

以上是“spring boot集成rabbitmq的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI