您好,登錄后才能下訂單哦!
本文小編為大家詳細介紹“springboot rabbitmq reply消息直接回復模式怎么實現(xiàn)”,內(nèi)容詳細,步驟清晰,細節(jié)處理妥當,希望這篇“springboot rabbitmq reply消息直接回復模式怎么實現(xiàn)”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。
MQ的作用包括了解耦、異步等。
通常生產(chǎn)者只負責生產(chǎn)消息,而不關(guān)心消息誰去獲取,或者消費結(jié)果如何;消費者只負責接收指定的消息進行業(yè)務(wù)處理而不關(guān)心消息從哪里來一級回復業(yè)務(wù)處理情況。但我們項目中有特殊的業(yè)務(wù)存在,我們作為消息生產(chǎn)者在生產(chǎn)消息后需要接收消費者的響應結(jié)果(說白了就是類似同步調(diào)用 請求響應的MQ使用),經(jīng)過研究,MQ的Reply模式(直接回復模式)就是為此種業(yè)務(wù)模式而產(chǎn)生。
依賴:
我這里只列出最核心的rabbitMq所需依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置:
無其余特殊配置,因為reply就是rabbitmq的一種交互方式而已
spring: rabbitmq: host: 10.50.40.116 port: 5673 username: admin password: admin
package com.leilei.demo; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author lei * @create 2022-09-19 21:44 * @desc mq配置 **/ @Configuration public class RabbitMqConfig { @Bean public Queue bizQueue() { return new Queue("bizQueue"); } @Bean public Queue replyQueue() { return new Queue("replyQueue"); } @Bean FanoutExchange bizExchange() { return new FanoutExchange("bizExchange"); } }
業(yè)務(wù)類:
@Data @NoArgsConstructor @AllArgsConstructor public class Vehicle implements Serializable { private Integer id; private String name; }
消息生產(chǎn)端需要做的事情:有生產(chǎn)消息、接受消息消費響應
(1)生產(chǎn)消息
1、生產(chǎn)消息,看業(yè)務(wù)場景選擇是否生成全局唯一自定義的消息ID
2、指定消息消費后響應的隊列(Reply)
/** * 生產(chǎn)消息 * * @param * @return void * @author lei * @date 2022-09-19 21:59:18 */ public void replySend() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setReplyTo("replyQueue"); //todo 根據(jù)業(yè)務(wù),做一個嚴謹?shù)娜治ㄒ籌D,我這里暫時用UUID String correlationId = UUID.randomUUID().toString(); // 我這里指定了唯一消息ID,看業(yè)務(wù)場景,消費者消費響應后,生產(chǎn)者端可根據(jù)消息ID做業(yè)務(wù)處理 messageProperties.setCorrelationId(correlationId); Vehicle vehicle = new Vehicle(1, "川A0001"); Message message = new Message(JSON.toJSONString(vehicle).getBytes(), messageProperties); rabbitTemplate.convertAndSend("bizExchange","",message); System.out.println("生產(chǎn)者發(fā)送消息,自定義消息ID為:" + correlationId); }
(2)接受Reply響應
消費者消費消息后會將處理結(jié)果進行發(fā)送到一個隊列,我們讀取這里隊列就可以拿到對應消息的響應結(jié)果進行業(yè)務(wù)處理了
/** * 接收消息響應 * * @param message * @return void * @author lei * @date 2022-09-19 21:59:27 */ @RabbitListener(queues = "replyQueue") public void replyResponse(Message message) { String s = new String(message.getBody()); String correlationId = message.getMessageProperties().getCorrelationId(); System.out.println("收到客戶端響應消息ID:" + correlationId); //todo 根據(jù)消息ID可判斷這是哪一個消息的響應,我們就可做業(yè)務(wù)操作 System.out.println("收到客戶端響應消息:" + s); }
消息消費端需要做的事有:接受消息然后進行業(yè)務(wù)處理、響應消息
一般來說,我們mq消費者監(jiān)聽方法不需要返回值,我們這里使用sendTo注解,則需要將要響應的消息定義為返回值,sendTo注解中指定要響應到哪個隊列
重點:
1、sendTo注解指定要相應的隊列(注意和生產(chǎn)端保持一致)
2、方法定義的返回值內(nèi)容就是要響應的消息,最終會發(fā)送到sendTo注解指定要相應的隊列
3、這種方法的缺點是消費端的主關(guān)性很高,因為sendTo指定的目標隊列可以自己瞎寫,導致生產(chǎn)者端無法正確收到消息響應,但我相信一般項目中也不會這么干
/** * 方式1 SendTo指定響應隊列 * * @param message * @return String * @author lei * @date 2022-09-19 16:17:52 */ @RabbitListener(queues ="bizQueue") @SendTo("replyQueue") public String handleEmailMessage(Message message) { try { String msg=new String(message.getBody(), StandardCharsets.UTF_8); log.info("---consumer接收到消息----{}",msg); return "客戶端響應消息:"+msg+"處理完成!"; } catch (Exception e) { log.error("處理業(yè)務(wù)消息失敗",e); } return null; }
與普通的消費者方法一樣,只需要RabbitListener注解監(jiān)聽業(yè)務(wù)隊列;但還需要根據(jù)消息獲取出ReplyTo地址,然后自己消費者方法內(nèi)部手動發(fā)送消息
1、優(yōu)點,更強烈的感受到消息請求 響應的交互性,流程看起來更清晰
2、缺點,代碼不雅
/** * 方式2 message消息獲取內(nèi)部reply rabbitmq手動發(fā)送 * * @param message * @return String * @author lei * @date 2022-09-19 16:17:52 */ @RabbitListener(queues = "bizQueue") public void handleEmailMessage2(Message message) { try { String msg = new String(message.getBody(), StandardCharsets.UTF_8); log.info("---consumer接收到消息----{}", msg); String replyTo = message.getMessageProperties().getReplyTo(); System.out.println("接收到的reply:" + replyTo); rabbitTemplate.convertAndSend(replyTo, "客戶端響應消息:" + msg + "處理完成!", x -> { x.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId()); return x; }); } catch (Exception e) { log.error("處理業(yè)務(wù)消息失敗",e); } }
這種方式與1其實是一致的,但我經(jīng)過測試,因為生產(chǎn)者消息指定了ReplyTo的地址,消費者端無需自己再次手動指定,即生產(chǎn)消息到哪里,是否響應以及響應消息發(fā)送到哪里全由生產(chǎn)端自己空,消費者只需要處理自身業(yè)務(wù)以及返回結(jié)果
/** * 方式三 方法有返回值,返回要響應的數(shù)據(jù) (reply 由生產(chǎn)者發(fā)送消息時指定,消費者不做任何處理) * * @param message * @return String * @author lei * @date 2022-09-19 23:17:47 */ @RabbitListener(queues ="bizQueue") public String handleEmailMessage3(Message message) { try { String msg=new String(message.getBody(), StandardCharsets.UTF_8); log.info("---consumer接收到消息----{}",msg); return "客戶端響應消息:"+msg+"處理完成!"; } catch (Exception e) { log.error("處理業(yè)務(wù)消息失敗",e); } return null; }
生產(chǎn)消息:
消費消息與響應:
收到的響應:
鏈路:
讀到這里,這篇“springboot rabbitmq reply消息直接回復模式怎么實現(xiàn)”文章已經(jīng)介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領(lǐng)會,如果想了解更多相關(guān)內(nèi)容的文章,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。