溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何進(jìn)行SpringBoot+RabbitMQ方式收發(fā)消息

發(fā)布時間:2021-09-29 17:20:21 來源:億速云 閱讀:130 作者:柒染 欄目:編程語言

本篇文章給大家分享的是有關(guān)如何進(jìn)行SpringBoot+RabbitMQ方式收發(fā)消息,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

本篇會和SpringBoot做整合,采用自動配置的方式進(jìn)行開發(fā),我們只需要聲明RabbitMQ地址就可以了,關(guān)于各種創(chuàng)建連接關(guān)閉連接的事都由Spring幫我們了~

交給Spring幫我們管理連接可以讓我們專注于業(yè)務(wù)邏輯,就像聲明式事務(wù)一樣易用,方便又高效。

祝有好收獲,先贊后看,快樂無限。

1. ?環(huán)境配置

第一節(jié)我們先來搞一下環(huán)境的配置,上一篇中我們已經(jīng)引入了自動配置的包,我們既然使用了自動配置的方式,那RabbitMQ的連接信息我們直接放在配置文件中就行了,就像我們需要用到JDBC連接的時候去配置一下DataSource一樣。

如何進(jìn)行SpringBoot+RabbitMQ方式收發(fā)消息

如圖所示,我們只需要指明一下連接的IP+端口號和用戶名密碼就行了,這里我用的是默認(rèn)的用戶名與密碼,不寫的話默認(rèn)也都是guest,端口號也是默認(rèn)5672。

主要我們需要看一下手動確認(rèn)消息的配置,需要配置成manual才是手動確認(rèn),日后還會有其他的配置項(xiàng),眼下我們配置這一個就可以了。

接下來我們要配置一個Queue,上一篇中我們往一個名叫erduo的隊(duì)列中發(fā)送消息,當(dāng)時是我們手動定義的此隊(duì)列,這里我們也需要手動配置,聲明一個Bean就可以了。

@Configuration  public class RabbitmqConfig {      @Bean      public Queue erduo() {          // 其三個參數(shù):durable exclusive autoDelete          // 一般只設(shè)置一下持久化即可          return new Queue("erduo",true);      }    }

就這么簡單聲明一下就可以了,當(dāng)然了RabbitMQ畢竟是一個獨(dú)立的組件,如果你在RabbitMQ中通過其他方式已經(jīng)創(chuàng)建過一個名叫erduo的隊(duì)列了,你這里也可以不聲明,這里起到的一個效果就是如果你沒有這個隊(duì)列,會按照你聲明的方式幫你創(chuàng)建這個隊(duì)列。

配置完環(huán)境之后,我們就可以以SpringBoot的方式來編寫生產(chǎn)者和消費(fèi)者了。

2. ?生產(chǎn)者與RabbitTemplate

和上一篇的節(jié)奏一樣,我們先來編寫生產(chǎn)者,不過這次我要引入一個新的工具:RabbitTemplate。

聽它的這個名字就知道,又是一個拿來即用的工具類,Spring家族這點(diǎn)就很舒服,什么東西都給你封裝一遍,讓你用起來更方便更順手。

RabbitTemplate實(shí)現(xiàn)了標(biāo)準(zhǔn)AmqpTemplate接口,功能大致可以分為發(fā)送消息和接受消息。

我們這里是在生產(chǎn)者中來用,主要就是使用它的發(fā)送消息功能:send和convertAndSend方法。

// 發(fā)送消息到默認(rèn)的Exchange,使用默認(rèn)的routing key  void send(Message message) throws AmqpException;  // 使用指定的routing key發(fā)送消息到默認(rèn)的exchange  void send(String routingKey, Message message) throws AmqpException;  // 使用指定的routing key發(fā)送消息到指定的exchange  void send(String exchange, String routingKey, Message message) throws AmqpException;

send方法是發(fā)送byte數(shù)組的數(shù)據(jù)的模式,這里代表消息內(nèi)容的對象是Message對象,它的構(gòu)造方法就是傳入byte數(shù)組數(shù)據(jù),所以我們需要把我們的數(shù)據(jù)轉(zhuǎn)成byte數(shù)組然后構(gòu)造成一個Message對象再進(jìn)行發(fā)送。

// Object類型,可以傳入POJO  void convertAndSend(Object message) throws AmqpException;  void convertAndSend(String routingKey, Object message) throws AmqpException;  void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

convertAndSend方法是可以傳入POJO對象作為參數(shù),底層是有一個MessageConverter幫我們自動將數(shù)據(jù)轉(zhuǎn)換成byte類型或String或序列化類型。

所以這里支持的傳入對象也只有三種:byte類型,String類型和實(shí)現(xiàn)了Serializable接口的POJO。

介紹完了,我們可以看一下代碼:

@Slf4j  @Component("rabbitProduce")  public class RabbitProduce {      @Autowired      private RabbitTemplate rabbitTemplate;      public void send() {          String message = "Hello 我是作者和耳朵,歡迎關(guān)注我。" + LocalDateTime.now().toString();          System.out.println("Message content : " + message);          // 指定消息類型          MessageProperties props = MessagePropertiesBuilder.newInstance()                  .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build();          rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props));          System.out.println("消息發(fā)送完畢。");     }      public void convertAndSend() {          User user = new User();          System.out.println("Message content : " + user);          rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user);          System.out.println("消息發(fā)送完畢。");      }  }

這里我特意寫明了兩個例子,一個用來測試send,另一個用來測試convertAndSend。

send方法里我們看下來和之前的代碼是幾乎一樣的,定義一個消息,然后直接send,但是這個構(gòu)造消息的構(gòu)造方法可能比我們想的要多一個參數(shù),我們原來說的只要把數(shù)據(jù)轉(zhuǎn)成二進(jìn)制數(shù)組放進(jìn)去即可,現(xiàn)在看來還要多放一個參數(shù)了。

MessageProperties,是的我們需要多放一個MessageProperties對象,從他的名字我們也可以看出它的功能就是附帶一些參數(shù),但是某些參數(shù)是少不了的,不帶不行。

比如我的代碼這里就是設(shè)置了一下消息的類型,消息的類型有很多種可以是二進(jìn)制類型,文本類型,或者序列化類型,JSON類型,我這里設(shè)置的就是文本類型,指定類型是必須的,也可以為我們拿到消息之后要將消息轉(zhuǎn)換成什么樣的對象提供一個參考。

convertAndSend方法就要簡單太多,這里我放了一個User對象拿來測試用,直接指定隊(duì)列然后放入這個對象即可。

Tips:User必須實(shí)現(xiàn)Serializable接口,不然的話調(diào)用此方法的時候會拋出IllegalArgumentException異常。

代碼完成之后我們就可以調(diào)用了,這里我寫一個測試類進(jìn)行調(diào)用:

@SpringBootTest  public class RabbitProduceTest {      @Autowired      private RabbitProduce rabbitProduce;      @Test      public void sendSimpleMessage() {          rabbitProduce.send();          rabbitProduce.convertAndSend();      }  }

效果如下圖~

如何進(jìn)行SpringBoot+RabbitMQ方式收發(fā)消息

同時在控制臺使用命令rabbitmqctl.bat list_queues查看隊(duì)列-erduo現(xiàn)在的情況:

如此一來,我們的生產(chǎn)者測試就算完成了,現(xiàn)在消息隊(duì)列里兩條消息了,而且消息類型肯定不一樣,一個是我們設(shè)置的文本類型,一個是自動設(shè)置的序列化類型。

3. ?消費(fèi)者與RabbitListener

既然隊(duì)列里面已經(jīng)有消息了,接下來我們就要看我們該如何通過新的方式拿到消息并消費(fèi)與確認(rèn)了。

消費(fèi)者這里我們要用到@RabbitListener來幫我們拿到指定隊(duì)列消息,它的用法很簡單也很復(fù)雜,我們可以先來說簡單的方式,直接放到方法上,指定監(jiān)聽的隊(duì)列就行了。

@Slf4j  @Component("rabbitConsumer")  public class RabbitConsumer {      @RabbitListener(queues = Producer.QUEUE_NAME)      public void onMessage(Message message, Channel channel) throws Exception {          System.out.println("Message content : " + message);          channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);          System.out.println("消息已確認(rèn)");      }  }

這段代碼就代表onMessage方法會處理erduo(Producer.QUEUE_NAME是常量字符串"erduo")隊(duì)列中的消息。

我們可以看到這個方法里面有兩個參數(shù),Message和Channel,如果用不到Channel可以不寫此參數(shù),但是Message消息一定是要的,它代表了消息本身。

我們可以想想,我們的程序從RabbitMQ之中拉回一條條消息之后,要以怎么樣的方式展示給我們呢?

沒錯,就是封裝為一個個Message對象,這里面放入了一條消息的所有信息,數(shù)據(jù)結(jié)構(gòu)是什么樣一會我一run你就能看到了。

同時這里我們使用Channel做一個消息確認(rèn)的操作,這里的DeliveryTag代表的是這個消息在隊(duì)列中的序號,這個信息存放在MessageProperties中。

4. ?SpringBoot 啟動!

編寫完生產(chǎn)者和消費(fèi)者,同時已經(jīng)運(yùn)行過生產(chǎn)者往消息隊(duì)列里面放了兩條信息,接下來我們可以直接啟動消息,查看消費(fèi)情況:

如何進(jìn)行SpringBoot+RabbitMQ方式收發(fā)消息

在我紅色框線標(biāo)記的地方可以看到,因?yàn)槲覀冇辛讼M(fèi)者所以項(xiàng)目啟動后先和RabbitMQ建立了一個連接進(jìn)行監(jiān)聽隊(duì)列。

隨后就開始消費(fèi)我們隊(duì)列中的兩條消息:

第一條信息是contentType=text/plain類型,所以直接就在控制臺上打印出了具體內(nèi)容。

第二條信息是contentType=application/x-java-serialized-object,在打印的時候只打印了一個內(nèi)存地址+字節(jié)大小。

不管怎么說,數(shù)據(jù)我們是拿到了,也就是代表我們的消費(fèi)是沒有問題的,同時也都進(jìn)行了消息確認(rèn)操作,從數(shù)據(jù)上看,整個消息可以分為兩部分:body和MessageProperties。

我們可以單獨(dú)使用一個注解拿到這個body的內(nèi)容 - @Payload

@RabbitListener(queues = Producer.QUEUE_NAME)  public void onMessage(@Payload String body, Channel channel) throws Exception {      System.out.println("Message content : " + body);  }

也可以單獨(dú)使用一個注解拿到MessageProperties的headers屬性,headers屬性在截圖里也可以看到,只不過是個空的 - @Headers。

@RabbitListener(queues = Producer.QUEUE_NAME)  public void onMessage(@Payload String body, @Headers Map<String,Object> headers) throws Exception {      System.out.println("Message content : " + body);      System.out.println("Message headers : " + headers);  }

這兩個注解都算是擴(kuò)展知識,我還是更喜歡直接拿到全部,全都要?。?!

上面我們已經(jīng)完成了消息的發(fā)送與消費(fèi),整個過程我們可以再次回想一下,一切都和我畫的這張圖上一樣的軌跡:

如何進(jìn)行SpringBoot+RabbitMQ方式收發(fā)消息

只不過我們一直沒有指定Exchage一直使用的默認(rèn)路由,希望大家好好記住這張圖。

5. ?@RabbitListener與@RabbitHandler

下面再來補(bǔ)一些知識點(diǎn),有關(guān)@RabbitListener與@RabbitHandler。

@RabbitListener上面我們已經(jīng)簡單的進(jìn)行了使用,稍微擴(kuò)展一下它其實(shí)是可以監(jiān)聽多個隊(duì)列的,就像這樣:

@RabbitListener(queues = { "queue1", "queue2" })  public void onMessage(Message message, Channel channel) throws Exception {      System.out.println("Message content : " + message);      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false)      System.out.println("消息已確認(rèn)");  }

還有一些其他的特性如綁定之類的,這里不再贅述因?yàn)樘簿幋a了一般用不上。

下面來說說這節(jié)要主要講的一個特性:@RabbitListener和@RabbitHandler的搭配使用。

前面我們沒有提到,@RabbitListener注解其實(shí)是可以注解在類上的,這個注解在類上標(biāo)志著這個類監(jiān)聽某個隊(duì)列或某些隊(duì)列。

這兩個注解的搭配使用就要讓@RabbitListener注解在類上,然后用@RabbitHandler注解在方法上,根據(jù)方法參數(shù)的不同自動識別并去消費(fèi),寫個例子給大家看一看更直觀一些。

@Slf4j  @Component("rabbitConsumer")  @RabbitListener(queues = Producer.QUEUE_NAME)  public class RabbitConsumer {     @RabbitHandler      public void onMessage(@Payload String message){          System.out.println("Message content : " + message);      }      @RabbitHandler      public void onMessage(@Payload User user) {          System.out.println("Message content : " + user);      }  }

大家可以看看這個例子,我們先用@RabbitListener監(jiān)聽erduo隊(duì)列中的消息,然后使用@RabbitHandler注解了兩個方法。

  •  第一個方法的body類型是String類型,這就代表著這個方法只能處理文本類型的消息。

  •  第二個方法的body類型是User類型,這就代表著這個方法只能處理序列化類型且為User類型的消息。

這兩個方法正好對應(yīng)著我們第二節(jié)中測試類會發(fā)送的兩種消息,所以我們往RabbitMQ中發(fā)送兩條測試消息,用來測試這段代碼,看看效果:

如何進(jìn)行SpringBoot+RabbitMQ方式收發(fā)消息

都在控制臺上如常打印了,如果@RabbitHandler注解的方法中沒有一個的類型可以和你消息的類型對的上,比如消息都是byte數(shù)組類型,這里沒有對應(yīng)的方法去接收,系統(tǒng)就會在控制臺不斷的報錯,如果你出現(xiàn)這個情況就證明你類型寫的不正確。

假設(shè)你的erduo隊(duì)列中會出現(xiàn)三種類型的消息:byte,文本和序列化,那你就必須要有對應(yīng)的處理這三種消息的方法,不然消息發(fā)過來的時候就會因?yàn)闊o法正確轉(zhuǎn)換而報錯。

而且使用了@RabbitHandler注解之后就不能再和之前一樣使用Message做接收類型。

@RabbitHandler  public void onMessage(Message message, Channel channel) throws Exception {      System.out.println("Message content : " + message);      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);      System.out.println("消息已確認(rèn)");  }

這樣寫的話會報類型轉(zhuǎn)換異常的,所以二者選其一。

同時上文我的@RabbitHandler沒有進(jìn)行消息確認(rèn),大家可以自己試一下進(jìn)行消息確認(rèn)。

6. ?消息的序列化轉(zhuǎn)換

通過上文我們已經(jīng)知道,能被自動轉(zhuǎn)換的對象只有byte[]、String、java序列化對象(實(shí)現(xiàn)了Serializable接口的對象),但是并不是所有的Java對象都會去實(shí)現(xiàn)Serializable接口,而且序列化的過程中使用的是JDK自帶的序列化方法,效率低下。

所以我們更普遍的做法是:使用Jackson先將數(shù)據(jù)轉(zhuǎn)換成JSON格式發(fā)送給RabbitMQ,再接收消息的時候再用Jackson將數(shù)據(jù)反序列化出來。

這樣做可以完美解決上面的痛點(diǎn):消息對象既不必再去實(shí)現(xiàn)Serializable接口,也有比較高的效率(Jackson序列化效率業(yè)界應(yīng)該是最好的了)。

默認(rèn)的消息轉(zhuǎn)換方案是消息轉(zhuǎn)換頂層接口-MessageConverter的一個子類:SimpleMessageConverter,我們?nèi)绻獡Q到另一個消息轉(zhuǎn)換器只需要替換掉這個轉(zhuǎn)換器就行了。

如何進(jìn)行SpringBoot+RabbitMQ方式收發(fā)消息

上圖是MessageConverter結(jié)構(gòu)樹的結(jié)構(gòu)樹,可以看到除了SimpleMessageConverter之外還有一個Jackson2JsonMessageConverter,我們只需要將它定義為Bean,就可以直接使用這個轉(zhuǎn)換器了。

@Bean      public MessageConverter jackson2JsonMessageConverter() {          return new Jackson2JsonMessageConverter(jacksonObjectMapper);      }

這樣就可以了,這里的jacksonObjectMapper可以不傳入,但是默認(rèn)的ObjectMapper方案對JDK8的時間日期序列化會不太友好,具體可以參考我的上一篇文章:從LocalDateTime序列化探討全局一致性序列化,總的來說就是定義了自己的ObjectMapper。

同時為了接下來測試方便,我又定義了一個專門測試JSON序列化的隊(duì)列:

@Bean  public Queue erduoJson() {      // 其三個參數(shù):durable exclusive autoDelete      // 一般只設(shè)置一下持久化即可      return new Queue("erduo_json",true);  }

如此之后就可以進(jìn)行測試了,先是生產(chǎn)者代碼:

public void sendObject() {          Client client = new Client();          System.out.println("Message content : " + client);          rabbitTemplate.convertAndSend(RabbitJsonConsumer.JSON_QUEUE,client);          System.out.println("消息發(fā)送完畢。");      }

我又重新定義了一個Client對象,它和之前測試使用的User對象成員變量都是一樣的,不一樣的是它沒有實(shí)現(xiàn)Serializable接口。

同時為了保留之前的測試代碼,我又新建了一個RabbitJsonConsumer,用于測試JSON序列化的相關(guān)消費(fèi)代碼,里面定義了一個靜態(tài)變量:JSON_QUEUE = "erduo_json";

所以這段代碼是將Client對象作為消息發(fā)送到"erduo_json"隊(duì)列中去,隨后我們在測試類中run一下進(jìn)行一次發(fā)送。

緊著是消費(fèi)者代碼:

@Slf4j  @Component("rabbitJsonConsumer")  @RabbitListener(queues = RabbitJsonConsumer.JSON_QUEUE)  public class RabbitJsonConsumer {      public static final String JSON_QUEUE = "erduo_json";      @RabbitHandler      public void onMessage(Client client, @Headers Map<String,Object> headers, Channel channel) throws Exception {          System.out.println("Message content : " + client);          System.out.println("Message headers : " + headers);          channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);          System.out.println("消息已確認(rèn)");      }  }

有了上文的經(jīng)驗(yàn)之后,這段代碼理解起來也是很簡單了吧,同時給出了上一節(jié)沒寫的如何在@RabbitHandler模式下進(jìn)行消息簽收。

我們直接來看看效果:

如何進(jìn)行SpringBoot+RabbitMQ方式收發(fā)消息

如何進(jìn)行SpringBoot+RabbitMQ方式收發(fā)消息

在打印的Headers里面,往后翻可以看到contentType=application/json,這個contentType是表明了消息的類型,這里正是說明我們新的消息轉(zhuǎn)換器生效了,將所有消息都轉(zhuǎn)換成了JSON類型。

后記

這兩篇講完了RabbitMQ的基本收發(fā)消息,包括手動配置和自動配置的兩種方式,這些大家仔細(xì)研讀之后應(yīng)該會對RabbitMQ收發(fā)消息沒什么疑問了~

不過我們一直以來發(fā)消息時都是使用默認(rèn)的交換機(jī),下篇將會講述一下RabbitMQ的幾種交換機(jī)類型,以及其使用方式。

以上就是如何進(jìn)行SpringBoot+RabbitMQ方式收發(fā)消息,小編相信有部分知識點(diǎn)可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI