您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)SpringBoot中如何使用RabbitMQ延時(shí)隊(duì)列,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。
1.什么是MQ
MQ,是一種跨進(jìn)程的通信機(jī)制,用于上下游傳遞消息。
在互聯(lián)網(wǎng)架構(gòu)中,MQ是一種非常常見(jiàn)的上下游“邏輯解耦+物理解耦”的消息通信服務(wù)。
使用了MQ之后,消息發(fā)送上游只需要依賴(lài)MQ,不用依賴(lài)其他服務(wù)。
為什么會(huì)產(chǎn)生消息列隊(duì)?
不同進(jìn)程(process)之間傳遞消息時(shí),兩個(gè)進(jìn)程之間耦合程度過(guò)高,改動(dòng)一個(gè)進(jìn)程,引發(fā)必須修改另一個(gè)進(jìn)程,為了隔離這兩個(gè)進(jìn)程,在兩進(jìn)程間抽離出一層(一個(gè)模塊),所有兩進(jìn)程之間傳遞的消息,都必須通過(guò)消息隊(duì)列來(lái)傳遞,單獨(dú)修改某一個(gè)進(jìn)程,不會(huì)影響另一個(gè); 不同進(jìn)程(process)之間傳遞消息時(shí),為了實(shí)現(xiàn)標(biāo)準(zhǔn)化,將消息的格式規(guī)范化了,并且,某一個(gè)進(jìn)程接受的消息太多,一下子無(wú)法處理完,并且也有先后順序,必須對(duì)收到的消息進(jìn)行排隊(duì),因此誕生了事實(shí)上的消息隊(duì)列;
延時(shí)列隊(duì)的使用場(chǎng)景?
訂單業(yè)務(wù):在淘寶或者京東購(gòu)買(mǎi)東西,用戶(hù)下單后未付款則30分鐘后取消訂單。 短信通知:手機(jī)用戶(hù)交完話(huà)費(fèi)后,幾分鐘之內(nèi)將會(huì)收到繳費(fèi)信息
2.什么是RabbitMQ(這里就做了一下簡(jiǎn)單介紹)
RabbitMQ是一種消息隊(duì)列 ,用于常見(jiàn)的進(jìn)程通信。支持點(diǎn)對(duì)點(diǎn),請(qǐng)求應(yīng)答和發(fā)布訂閱模式 并且提供多種語(yǔ)言的支持。常見(jiàn)的java,c#,php都支持。
常被用在異步處理,應(yīng)用解耦。流量消鋒等復(fù)雜的業(yè)務(wù)場(chǎng)景中。和java的kafka一樣都屬于消息中間件。
下載地址:
https://www.rabbitmq.com/download.html
進(jìn)入RabbitMQ官網(wǎng)
1.第一步
第二步
下載好后不要著急安裝RabbitMQ,我們這里還需要安裝Erlang
下載地址:http://www.erlang.org/download/otp_win64_17.3.exe
安裝步驟
步驟一
步驟二
步驟三
步驟四
安裝完成
現(xiàn)在安裝RabbitMQ
步驟一
步驟二
步驟三
安裝完成
啟動(dòng)RabbitMQ管理工具
開(kāi)始菜單 — 最新添加 — 展開(kāi) — 選中雙擊
輸入命令:rabbitmq-plugins enable rabbitmq_management
效果如果圖
在瀏覽器中輸入地址查看:http://127.0.0.1:15672/
出現(xiàn)次頁(yè)面代表成功,默認(rèn)用戶(hù)和密碼都是guest/ guest
若不出現(xiàn)此頁(yè)面,就是安裝失敗了,不要慌,多半問(wèn)題在系統(tǒng)用戶(hù)名必須是中文(放心有解決辦法):
Windows下安裝RabbitMQ后,按正常RabbitMQ會(huì)自動(dòng)注冊(cè)服務(wù)并自動(dòng)啟動(dòng),但是如果有的道友不注意中英文目錄就會(huì)出現(xiàn)服務(wù)啟動(dòng)后幾秒鐘自動(dòng)停止,而且反反復(fù)復(fù)。
出現(xiàn)這種情況一般都是由我們的用戶(hù)名是中文,而導(dǎo)致默認(rèn)的DB和log訪問(wèn)出現(xiàn)問(wèn)。所以我建議以后大家在使用windows操作系統(tǒng)的時(shí)候盡量用英文來(lái)命名文件或目錄,這樣會(huì)極大的減小以后安裝軟件出現(xiàn)莫名其妙的問(wèn)題的bug。
接下來(lái)我們先卸載我們的RabbitMQ,然后在我們的系統(tǒng)變量里設(shè)置一個(gè)RABBITMQ_BASE 的變量路徑為一個(gè)不含英文的路徑 比如 E:\rabbit,最后我們重新安裝RabbitMQ即可,然后就會(huì)看到RabbitMQ服務(wù)自動(dòng)注冊(cè)了,并且不會(huì)自動(dòng)停止。
SpringBoot整合RabbitMQ
1.添加依賴(lài)
pom.xml中添加 spring-boot-starter-amqp的依賴(lài)
<!-- spring-boot-starter-amqp的依賴(lài) --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
其他依賴(lài)
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency>
application.yml文件中配置rabbitmq相關(guān)內(nèi)容
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
這里我們環(huán)境就搭建起來(lái)了
2.具體編碼實(shí)現(xiàn)
配置列隊(duì)
package com.example.spring_boot_rabbitmq; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @author:zq * @date: Greated in 2019/12/19 11:46 * 配置隊(duì)列 */ @Configuration @Slf4j public class DelayRabbitConfig { /** * 延遲隊(duì)列 TTL 名稱(chēng) */ private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue"; /** * DLX,dead letter發(fā)送到的 exchange * 延時(shí)消息就是發(fā)送到該交換機(jī)的 */ public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange"; /** * routing key 名稱(chēng) * 具體消息發(fā)送在該 routingKey 的 */ public static final String ORDER_DELAY_ROUTING_KEY = "order_delay"; public static final String ORDER_QUEUE_NAME = "user.order.queue"; public static final String ORDER_EXCHANGE_NAME = "user.order.exchange"; public static final String ORDER_ROUTING_KEY = "order"; /** * 延遲隊(duì)列配置 * <p> * 1、params.put("x-message-ttl", 5 * 1000); * 第一種方式是直接設(shè)置 Queue 延遲時(shí)間 但如果直接給隊(duì)列設(shè)置過(guò)期時(shí)間,這種做法不是很靈活,(當(dāng)然二者是兼容的,默認(rèn)是時(shí)間小的優(yōu)先) * 2、rabbitTemplate.convertAndSend(book, message -> { * message.getMessageProperties().setExpiration(2 * 1000 + ""); * return message; * }); * 第二種就是每次發(fā)送消息動(dòng)態(tài)設(shè)置延遲時(shí)間,這樣我們可以靈活控制 **/ @Bean public Queue delayOrderQueue() { Map<String, Object> params = new HashMap<>(); // x-dead-letter-exchange 聲明了隊(duì)列里的死信轉(zhuǎn)發(fā)到的DLX名稱(chēng), params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME); // x-dead-letter-routing-key 聲明了這些死信在轉(zhuǎn)發(fā)時(shí)攜帶的 routing-key 名稱(chēng)。 params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY); return new Queue(ORDER_DELAY_QUEUE, true, false, false, params); } /** * 需要將一個(gè)隊(duì)列綁定到交換機(jī)上,要求該消息與一個(gè)特定的路由鍵完全匹配。 * 這是一個(gè)完整的匹配。如果一個(gè)隊(duì)列綁定到該交換機(jī)上要求路由鍵 “dog”,則只有被標(biāo)記為“dog”的消息才被轉(zhuǎn)發(fā), * 不會(huì)轉(zhuǎn)發(fā)dog.puppy,也不會(huì)轉(zhuǎn)發(fā)dog.guard,只會(huì)轉(zhuǎn)發(fā)dog。 * @return DirectExchange */ @Bean public DirectExchange orderDelayExchange() { return new DirectExchange(ORDER_DELAY_EXCHANGE); } @Bean public Binding dlxBinding() { return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY); } @Bean public Queue orderQueue() { return new Queue(ORDER_QUEUE_NAME, true); } /** * 將路由鍵和某模式進(jìn)行匹配。此時(shí)隊(duì)列需要綁定要一個(gè)模式上。 * 符號(hào)“#”匹配一個(gè)或多個(gè)詞,符號(hào)“*”匹配不多不少一個(gè)詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會(huì)匹配到“audit.irs”。 **/ @Bean public TopicExchange orderTopicExchange() { return new TopicExchange(ORDER_EXCHANGE_NAME); } @Bean public Binding orderBinding() { // TODO 如果要讓延遲隊(duì)列之間有關(guān)聯(lián),這里的 routingKey 和 綁定的交換機(jī)很關(guān)鍵 return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY); } }
創(chuàng)建一個(gè)Order實(shí)體類(lèi)
package com.example.spring_boot_rabbitmq.pojo; import lombok.Data; import java.io.Serializable; /** * @author:zq * @date: Greated in 2019/12/19 11:49 */ @Data public class Order implements Serializable { private static final long serialVersionUID = -2221214252163879885L; private String orderId; // 訂單id private Integer orderStatus; // 訂單狀態(tài) 0:未支付,1:已支付,2:訂單已取消 private String orderName; // 訂單名字 }
接收者
package com.example.spring_boot_rabbitmq; import com.example.spring_boot_rabbitmq.pojo.Order; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Date; /** * @author:zq * @date: Greated in 2019/12/19 11:53 * 接收者 */ @Component @Slf4j public class DelayReceiver { @RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME}) public void orderDelayQueue(Order order, Message message, Channel channel) { log.info("###########################################"); log.info("【orderDelayQueue 監(jiān)聽(tīng)的消息】 - 【消費(fèi)時(shí)間】 - [{}]- 【訂單內(nèi)容】 - [{}]", new Date(), order.toString()); if(order.getOrderStatus() == 0) { order.setOrderStatus(2); log.info("【該訂單未支付,取消訂單】" + order.toString()); } else if(order.getOrderStatus() == 1) { log.info("【該訂單已完成支付】"); } else if(order.getOrderStatus() == 2) { log.info("【該訂單已取消】"); } log.info("###########################################"); } }
發(fā)送者
package com.example.spring_boot_rabbitmq; import com.example.spring_boot_rabbitmq.pojo.Order; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * @author:zq * @date: Greated in 2019/12/19 11:55 * 發(fā)送者 */ @Component @Slf4j public class DelaySender { @Autowired private AmqpTemplate amqpTemplate; public void sendDelay(Order order) { log.info("【訂單生成時(shí)間】" + new Date().toString() +"【1分鐘后檢查訂單是否已經(jīng)支付】" + order.toString() ); this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE, DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY, order, message -> { // 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么這一句也可以省略,具體根據(jù)業(yè)務(wù)需要是聲明 Queue 的時(shí)候就指定好延遲時(shí)間還是在發(fā)送自己控制時(shí)間 message.getMessageProperties().setExpiration(1 * 1000 * 60 + ""); return message; }); } }
測(cè)試,訪問(wèn)http://localhost:8080/sendDelay查看日志輸出
package com.example.spring_boot_rabbitmq;import com.example.spring_boot_rabbitmq.pojo.Order;import org.springframework.web.bind.annotation.RestController;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;/** * @author:zq * @date: Greated in 2019/12/19 11:57 * 測(cè)試 */@RestControllerpublic class TestController { @Autowired private DelaySender delaySender; @GetMapping("/sendDelay") public Object sendDelay() { Order order1 = new Order(); order1.setOrderStatus(0); order1.setOrderId("123456"); order1.setOrderName("小米6"); Order order2 = new Order(); order2.setOrderStatus(1); order2.setOrderId("456789"); order2.setOrderName("小米8"); delaySender.sendDelay(order1); delaySender.sendDelay(order2); return "ok"; }}
關(guān)于SpringBoot中如何使用RabbitMQ延時(shí)隊(duì)列就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。