溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RabbitMQ中怎么實現(xiàn)延遲隊列

發(fā)布時間:2021-08-07 13:38:36 來源:億速云 閱讀:386 作者:Leah 欄目:編程語言

這篇文章給大家介紹RabbitMQ中怎么實現(xiàn)延遲隊列,內(nèi)容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

在 RabbitMQ 3.6.x 之前我們一般采用死信隊列+TTL過期時間來實現(xiàn)延遲隊列,我們這里不做過多介紹,可以參考之前文章來了解:TTL、死信隊列

在 RabbitMQ 3.6.x 開始,RabbitMQ 官方提供了延遲隊列的插件,可以下載放置到 RabbitMQ 根目錄下的 plugins 下。延遲隊列插件下載

首先我們創(chuàng)建交換機和消息隊列

import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class MQConfig {  public static final String LAZY_EXCHANGE = "Ex.LazyExchange";  public static final String LAZY_QUEUE = "MQ.LazyQueue";  public static final String LAZY_KEY = "lazy.#";  @Bean  public TopicExchange lazyExchange(){    //Map<String, Object> pros = new HashMap<>();    //設(shè)置交換機支持延遲消息推送    //pros.put("x-delayed-message", "topic");    TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);    exchange.setDelayed(true);    return exchange;  }  @Bean  public Queue lazyQueue(){    return new Queue(LAZY_QUEUE, true);  }  @Bean  public Binding lazyBinding(){    return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);  }}

我們在 Exchange 的聲明中可以設(shè)置exchange.setDelayed(true)來開啟延遲隊列,也可以設(shè)置為以下內(nèi)容傳入交換機聲明的方法中,因為第一種方式的底層就是通過這種方式來實現(xiàn)的。

//Map<String, Object> pros = new HashMap<>();    //設(shè)置交換機支持延遲消息推送    //pros.put("x-delayed-message", "topic");    TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);

發(fā)送消息時我們需要指定延遲推送的時間,我們這里在發(fā)送消息的方法中傳入?yún)?shù) new MessagePostProcessor() 是為了獲得 Message對象,因為需要借助 Message對象的api 來設(shè)置延遲時間。

import com.anqi.mq.config.MQConfig;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageDeliveryMode;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.Date;@Componentpublic class MQSender {  @Autowired  private RabbitTemplate rabbitTemplate;  //confirmCallback returnCallback 代碼省略,請參照上一篇   public void sendLazy(Object message){    rabbitTemplate.setMandatory(true);    rabbitTemplate.setConfirmCallback(confirmCallback);    rabbitTemplate.setReturnCallback(returnCallback);    //id + 時間戳 全局唯一    CorrelationData correlationData = new CorrelationData("12345678909"+new Date());    //發(fā)送消息時指定 header 延遲時間    rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,        new MessagePostProcessor() {      @Override      public Message postProcessMessage(Message message) throws AmqpException {        //設(shè)置消息持久化        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);        //message.getMessageProperties().setHeader("x-delay", "6000");        message.getMessageProperties().setDelay(6000);        return message;      }    }, correlationData);  }}

我們可以觀察 setDelay(Integer i)底層代碼,也是在 header 中設(shè)置 x-delay。等同于我們手動設(shè)置 header

message.getMessageProperties().setHeader("x-delay", "6000");

/** * Set the x-delay header. * @param delay the delay. * @since 1.6 */public void setDelay(Integer delay) {  if (delay == null || delay < 0) {    this.headers.remove(X_DELAY);  }  else {    this.headers.put(X_DELAY, delay);  }}

消費端進行消費

import com.rabbitmq.client.Channel;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.Map;@Componentpublic class MQReceiver {  @RabbitListener(queues = "MQ.LazyQueue")  @RabbitHandler  public void onLazyMessage(Message msg, Channel channel) throws IOException{    long deliveryTag = msg.getMessageProperties().getDeliveryTag();    channel.basicAck(deliveryTag, true);    System.out.println("lazy receive " + new String(msg.getBody()));  }

測試結(jié)果

import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest@RunWith(SpringRunner.class)public class MQSenderTest {  @Autowired  private MQSender mqSender;  @Test  public void sendLazy() throws Exception {    String msg = "hello spring boot";    mqSender.sendLazy(msg + ":");  }}

果然在 6 秒后收到了消息 lazy receive hello spring boot:

關(guān)于RabbitMQ中怎么實現(xiàn)延遲隊列就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI