您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“SpringBoot整合RabbitMq的方法是什么”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
高級(jí)消息隊(duì)列協(xié)議(AMQP)是面向消息中間件的平臺(tái)中立的有線協(xié)議。Spring AMQP項(xiàng)目將核心Spring概念應(yīng)用于基于AMQP的消息傳遞解決方案的開(kāi)發(fā)。Spring Boot為通過(guò)RabbitMQ與AMQP一起工作提供了一些便利,包括spring-boot-starter-amqp “Starter”。
springboot集成RabbitMQ非常簡(jiǎn)單,如果只是簡(jiǎn)單的使用配置非常少,springboot提供了spring-boot-starter-amqp項(xiàng)目對(duì)消息各種支持。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
RabbitMQ是基于AMQP協(xié)議的輕量級(jí),可靠,可擴(kuò)展,可移植的消息代理。Spring使用RabbitMQ通過(guò)AMQP協(xié)議進(jìn)行通信。
RabbitMQ配置由外部配置屬性控制 spring.rabbitmq.*。例如,您可以在以下部分聲明以下部分 application.properties:
spring.rabbitmq.host = localhost spring.rabbitmq.port = 5672 spring.rabbitmq.username = guest spring.rabbitmq.password
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ 配置類(lèi) * * @author itguang * @create @Configuration public class RabbitConfig @Bean public Queue queue(){ return new Queue("hello"); } }
rabbitTemplate是springboot 提供的默認(rèn)實(shí)現(xiàn).
package com.example.rabbitmqdemo.rabbitmq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Date; /** * 消息發(fā)送者 * * @author itguang * @create @Component public class HelloSender @Autowired private AmqpTemplate amqpTemplate; public void send(){ String context = "hello----"+LocalDateTime.now(); System.out.println("send:"+context); //往名稱(chēng)為 hello 的queue中發(fā)送消息 this.amqpTemplate.convertAndSend("hello",context); } }
package com.example.rabbitmqdemo.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息接受者 * * @author itguang * @create @Component @RabbitListener(queues = "hello") //監(jiān)聽(tīng) 名稱(chēng)為 hello 的queue public class HelloReceiver //消息處理器 @RabbitHandler public void process(String message){ System.out.println("Receiver:"+message); } }
package com.example.rabbitmqdemo; import com.example.rabbitmqdemo.rabbitmq.HelloSender; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqdemoApplicationTests @Autowired HelloSender helloSender; @Test public void contextLoads() { helloSender.send(); } }
查看控制臺(tái)輸出結(jié)果
send:hello----2018-04-21T11:29:47.739 Receiver:hello----2018-04-21T11:29:47.739
對(duì)上面的代碼進(jìn)行了小改造,接收端注冊(cè)了兩個(gè)Receiver,Receiver1和Receiver2,發(fā)送端加入?yún)?shù)計(jì)數(shù),接收端打印接收到的參數(shù),下面是測(cè)試代碼,發(fā)送一百條消息,來(lái)觀察兩個(gè)接收端的執(zhí)行效果
添加一個(gè)隊(duì)列叫 hello2
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ 配置類(lèi) * * @author itguang * @create @Configuration public class RabbitConfig @Bean public Queue queue(){ return new Queue("hello"); } @Bean public Queue queue2(){ return new Queue("hello2"); } }
給隊(duì)列 hello2 發(fā)送消息,接受一個(gè)計(jì)數(shù)參數(shù)
package com.example.rabbitmqdemo.rabbitmq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Date; /** * 消息發(fā)送者 * * @author itguang * @create @Component public class HelloSender @Autowired private AmqpTemplate amqpTemplate; public void send(){ String context = "hello----"+LocalDateTime.now(); System.out.println("send:"+context); this.amqpTemplate.convertAndSend("hello",context); } //給hello2發(fā)送消息,并接受一個(gè)計(jì)數(shù)參數(shù) public void send2(int i){ String context = i+""; System.out.println(context+"--send:"); this.amqpTemplate.convertAndSend("hello2",context); } }
兩個(gè)hello2 的接受者
@Component @RabbitListener(queues = "hello2") public class HelloReceiver1 @RabbitHandler public void process(String message){ System.out.println("Receiver1:"+message); } }
@Component @RabbitListener(queues = "hello2") public class HelloReceiver2 @RabbitHandler public void process(String message){ System.out.println("Receiver2:"+message); } }
@Test public void manyReceiver(){ for (int i=0;i<100;i++){ helloSender.send2(i); } }
查看控制臺(tái)輸出結(jié)果:
0--send: 1--send: 2--send: 3--send: 4--send: ...(省略) 58--send: 59--send: 60--send: 61--send: 62--send: 63--send: Receiver2:1 Receiver1:0 64--send: 65--send: Receiver1:2 Receiver2:3 66--send: Receiver1:4 Receiver2:5 ...(省略)
可以看到:在消息發(fā)送到63時(shí),接受者Receiver已經(jīng)收到了消息,
結(jié)論:
一個(gè)發(fā)送者,N個(gè)接受者,經(jīng)過(guò)測(cè)試會(huì)均勻的將消息發(fā)送到N個(gè)接收者中
我們可以注入兩個(gè)發(fā)送者,放在循環(huán)中,如下:
@Test public void many2many(){ for (int i=0;i<100;i++){ helloSender.send2(i); helloSender2.send2(i); } }
運(yùn)行單元測(cè)試,查看控制臺(tái)輸出:
0--send: 0--send: 1--send: 1--send: 2--send: 2--send: 3--send: 3--send: ...(省略) 22--send: 22--send: 23--send: 23--send: 24--send: 24--send: Receiver2:0 25--send: 25--send: Receiver2:1 26--send: Receiver2:2 26--send: Receiver2:3 27--send: Receiver1:0 27--send: Receiver2:4 Receiver1:1 28--send: Receiver2:5 Receiver1:2 28--send: Receiver2:6 Receiver1:3 29--send: Receiver2:7 Receiver1:4 29--send: Receiver2:8 Receiver1:5 30--send: Receiver2:9 Receiver1:6 30--send: 31--send: 31--send: 32--send: 32--send:
結(jié)論:和一對(duì)多一樣,接收端仍然會(huì)均勻接收到消息
首先我們創(chuàng)建一個(gè)實(shí)體類(lèi)對(duì)象 User,注意必須實(shí)現(xiàn) Serializable 接口.
package com.example.rabbitmqdemo.pojo; import java.io.Serializable; /** * @author itguang * @create public class User implements Serializable private String username; private String password; public User(String username, String password) { this.username = username; this.password = password; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } @Override public String toString() { return "User{" + "username='" + username + '\'' + ", password='" + password + '\'' + '}'; } }
然后在配置文件中再創(chuàng)建一個(gè)隊(duì)列,叫 object_queue
@Bean public Queue queue3(){ return new Queue("object_queue"); }
接下里就是User對(duì)象的兩個(gè)發(fā)送者ObjectSender和接受者ObjectReceiver:
@Component public class ObjectSender @Autowired AmqpTemplate amqpTemplate; public void sendUser(User user){ System.out.println("Send object:"+user.toString()); this.amqpTemplate.convertAndSend("object_queue",user); } }
@Component @RabbitListener(queues = "object_queue") public class ObjectReceiver @RabbitHandler public void objectReceiver(User user){ System.out.println("Receiver object:"+user.toString()); } }
運(yùn)行單元測(cè)試,查看控制臺(tái)輸出結(jié)果:
Send object:User{username='李增光', password='666666'} Receiver object:User{username='李增光', password='666666'}
topic 是RabbitMQ中最靈活的一種方式,可以根據(jù)routing_key自由的綁定不同的隊(duì)列
首先對(duì)topic規(guī)則配置,這里使用兩個(gè)隊(duì)列來(lái)測(cè)試
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author itguang * @create @Configuration public class TopicRabbitConfig final static String message = "topic.message"; final static String messages = "topic.messages"; //創(chuàng)建兩個(gè) Queue @Bean public Queue queueMessage(){ return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages(){ return new Queue(TopicRabbitConfig.messages); } //配置 TopicExchange,指定名稱(chēng)為 topicExchange @Bean public TopicExchange exchange(){ return new TopicExchange("topicExchange"); } //給隊(duì)列綁定 exchange 和 routing_key @Bean public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange){ return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean public Binding bingingExchangeMessages(Queue queueMessages,TopicExchange exchange){ return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
消息發(fā)送者:都是用topicExchange,并且綁定到不同的 routing_key
package com.example.rabbitmqdemo.topic; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component public class TopicSender @Autowired AmqpTemplate amqpTemplate; public void send1(){ String context = "hi, i am message 1"; System.out.println("Sender : " + context); amqpTemplate.convertAndSend("topicExchange","topic.message",context); } public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); amqpTemplate.convertAndSend("topicExchange", "topic.messages", context); } }
兩個(gè)消息接受者,分別指定不同的 queue
package com.example.rabbitmqdemo.topic; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component @RabbitListener(queues = "topic.message") public class TopicReceiver1 @RabbitHandler public void process(String message){ System.out.println("Receiver topic.message :"+ message); } }
package com.example.rabbitmqdemo.topic; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component @RabbitListener(queues = "topic.messages") public class TopicReceiver2 @RabbitHandler public void process(String message){ System.out.println("Receiver topic.messages: "+ message); } }
測(cè)試:
發(fā)送send1會(huì)匹配到topic.#和topic.message 兩個(gè)Receiver都可以收到消息,發(fā)送send2只有topic.#可以匹配所有只有Receiver2監(jiān)聽(tīng)到消息
Fanout 就是我們熟悉的廣播模式或者訂閱模式,給Fanout交換機(jī)發(fā)送消息,綁定了這個(gè)交換機(jī)的所有隊(duì)列都收到這個(gè)消息。
Fanout 相關(guān)配置:
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.security.PublicKey; /** * @author itguang * @create @Configuration public class FanOutRabbitMq //創(chuàng)建三個(gè)隊(duì)列 @Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } @Bean public Queue CMessage() { return new Queue("fanout.C"); } //創(chuàng)建exchange,指定交換策略 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } //分別給三個(gè)隊(duì)列指定exchange,這里使用了A、B、C三個(gè)隊(duì)列綁定到Fanout交換機(jī)上面,發(fā)送端的routing_key寫(xiě)任何字符都會(huì)被忽略: @Bean public Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange){ return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean public Binding bindingExchangeB(Queue BMessage,FanoutExchange fanoutExchange){ return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return
消息發(fā)送者:
這里使用了A、B、C三個(gè)隊(duì)列綁定到Fanout交換機(jī)上面,發(fā)送端的routing_key寫(xiě)任何字符都會(huì)被忽略
package com.example.rabbitmqdemo.fanout; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component public class FanoutSender @Autowired AmqpTemplate amqpTemplate; public void send(){ String context = "hi, fanout msg "; System.out.println("Sender : " + context); //這里使用了A、B、C三個(gè)隊(duì)列綁定到Fanout交換機(jī)上面,發(fā)送端的routing_key寫(xiě)任何字符都會(huì)被忽略: amqpTemplate.convertAndSend("fanoutExchange","", context); } }
三個(gè)消息接受者:
@Component @RabbitListener(queues = "fanout.A") public class FanoutReceiverA @RabbitHandler public void process(String message){ System.out.println("Receiver form fanout.A: "+message); } } @Component @RabbitListener(queues = "fanout.B") public class FanoutReceiverB @RabbitHandler public void process(String message){ System.out.println("Receiver form fanout.B: "+message); } } @Component @RabbitListener(queues = "fanout.C") public class FanoutReceiverC @RabbitHandler public void process(String message){ System.out.println("Receiver form fanout.C: "+message); } }
運(yùn)行單元測(cè)試,查看結(jié)果:
Sender : hi, fanout msg Receiver form fanout.C: hi, fanout msg Receiver form fanout.A: hi, fanout msg Receiver form fanout.B: hi, fanout msg
結(jié)果說(shuō)明,綁定到fanout交換機(jī)上面的隊(duì)列都收到了消息.
“SpringBoot整合RabbitMq的方法是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。