您好,登錄后才能下訂單哦!
這篇文章給大家介紹RabbitMQ中怎么實現(xiàn)延遲隊列,內(nèi)容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
在 RabbitMQ 3.6.x 之前我們一般采用死信隊列+TTL過期時間來實現(xiàn)延遲隊列,我們這里不做過多介紹,可以參考之前文章來了解:TTL、死信隊列
在 RabbitMQ 3.6.x 開始,RabbitMQ 官方提供了延遲隊列的插件,可以下載放置到 RabbitMQ 根目錄下的 plugins 下。延遲隊列插件下載
首先我們創(chuàng)建交換機和消息隊列
import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class MQConfig { public static final String LAZY_EXCHANGE = "Ex.LazyExchange"; public static final String LAZY_QUEUE = "MQ.LazyQueue"; public static final String LAZY_KEY = "lazy.#"; @Bean public TopicExchange lazyExchange(){ //Map<String, Object> pros = new HashMap<>(); //設(shè)置交換機支持延遲消息推送 //pros.put("x-delayed-message", "topic"); TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros); exchange.setDelayed(true); return exchange; } @Bean public Queue lazyQueue(){ return new Queue(LAZY_QUEUE, true); } @Bean public Binding lazyBinding(){ return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY); }}
我們在 Exchange 的聲明中可以設(shè)置exchange.setDelayed(true)來開啟延遲隊列,也可以設(shè)置為以下內(nèi)容傳入交換機聲明的方法中,因為第一種方式的底層就是通過這種方式來實現(xiàn)的。
//Map<String, Object> pros = new HashMap<>(); //設(shè)置交換機支持延遲消息推送 //pros.put("x-delayed-message", "topic"); TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
發(fā)送消息時我們需要指定延遲推送的時間,我們這里在發(fā)送消息的方法中傳入?yún)?shù) new MessagePostProcessor() 是為了獲得 Message對象,因為需要借助 Message對象的api 來設(shè)置延遲時間。
import com.anqi.mq.config.MQConfig;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageDeliveryMode;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.Date;@Componentpublic class MQSender { @Autowired private RabbitTemplate rabbitTemplate; //confirmCallback returnCallback 代碼省略,請參照上一篇 public void sendLazy(Object message){ rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 時間戳 全局唯一 CorrelationData correlationData = new CorrelationData("12345678909"+new Date()); //發(fā)送消息時指定 header 延遲時間 rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //設(shè)置消息持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); //message.getMessageProperties().setHeader("x-delay", "6000"); message.getMessageProperties().setDelay(6000); return message; } }, correlationData); }}
我們可以觀察 setDelay(Integer i)底層代碼,也是在 header 中設(shè)置 x-delay。等同于我們手動設(shè)置 header
message.getMessageProperties().setHeader("x-delay", "6000");
/** * Set the x-delay header. * @param delay the delay. * @since 1.6 */public void setDelay(Integer delay) { if (delay == null || delay < 0) { this.headers.remove(X_DELAY); } else { this.headers.put(X_DELAY, delay); }}
消費端進行消費
import com.rabbitmq.client.Channel;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.Map;@Componentpublic class MQReceiver { @RabbitListener(queues = "MQ.LazyQueue") @RabbitHandler public void onLazyMessage(Message msg, Channel channel) throws IOException{ long deliveryTag = msg.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, true); System.out.println("lazy receive " + new String(msg.getBody())); }
測試結(jié)果
import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest@RunWith(SpringRunner.class)public class MQSenderTest { @Autowired private MQSender mqSender; @Test public void sendLazy() throws Exception { String msg = "hello spring boot"; mqSender.sendLazy(msg + ":"); }}
果然在 6 秒后收到了消息 lazy receive hello spring boot:
關(guān)于RabbitMQ中怎么實現(xiàn)延遲隊列就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。