溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列(死信隊(duì)列)

發(fā)布時(shí)間:2020-06-23 05:22:11 來源:網(wǎng)絡(luò) 閱讀:452 作者:Java_老男孩 欄目:編程語言

基于隊(duì)列和基于消息的TTL

TTL是time to live 的簡稱,顧名思義指的是消息的存活時(shí)間。rabbitMq可以從兩種維度設(shè)置消息過期時(shí)間,分別是隊(duì)列和消息本身。 隊(duì)列消息過期時(shí)間-Per-Queue Message TTL: 通過設(shè)置隊(duì)列的x-message-ttl參數(shù)來設(shè)置指定隊(duì)列上消息的存活時(shí)間,其值是一個(gè)非負(fù)整數(shù),單位為微秒。不同隊(duì)列的過期時(shí)間互相之間沒有影響,即使是對于同一條消息。隊(duì)列中的消息存在隊(duì)列中的時(shí)間超過過期時(shí)間則成為死信。

死信交換機(jī)DLX

隊(duì)列中的消息在以下三種情況下會(huì)變成死信 (1)消息被拒絕(basic.reject 或者 basic.nack),并且requeue=false; (2)消息的過期時(shí)間到期了; (3)隊(duì)列長度限制超過了。 當(dāng)隊(duì)列中的消息成為死信以后,如果隊(duì)列設(shè)置了DLX那么消息會(huì)被發(fā)送到DLX。通過x-dead-letter-exchange設(shè)置DLX,通過這個(gè)x-dead-letter-routing-key設(shè)置消息發(fā)送到DLX所用的routing-key,如果不設(shè)置默認(rèn)使用消息本身的routing-key.

 @Bean
  public Queue lindQueue() {
    return QueueBuilder.durable(LIND_QUEUE)
        .withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//設(shè)置死信交換機(jī)
        .withArgument("x-message-ttl", makeCallExpire)
        .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//設(shè)置死信routingKey
        .build();
  }

實(shí)現(xiàn)的過程

RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列(死信隊(duì)列)

完整的代碼

@Component
public class AmqpConfig {
  /**
   * 主要測試一個(gè)死信隊(duì)列,功能主要實(shí)現(xiàn)延時(shí)消費(fèi),原理是先把消息發(fā)到正常隊(duì)列,
   * 正常隊(duì)列有超時(shí)時(shí)間,當(dāng)達(dá)到時(shí)間后自動(dòng)發(fā)到死信隊(duì)列,然后由消費(fèi)者去消費(fèi)死信隊(duì)列里的消息.
   */
  public static final String LIND_EXCHANGE = "lind.exchange";
  public static final String LIND_DL_EXCHANGE = "lind.dl.exchange";
  public static final String LIND_QUEUE = "lind.queue";
  public static final String LIND_DEAD_QUEUE = "lind.queue.dead";

  public static final String LIND_FANOUT_EXCHANGE = "lindFanoutExchange";
  /**
   * 單位為微秒.
   */
  @Value("${tq.makecall.expire:60000}")
  private long makeCallExpire;

  /**
   * 創(chuàng)建普通交換機(jī).
   */
  @Bean
  public TopicExchange lindExchange() {
    return (TopicExchange) ExchangeBuilder.topicExchange(LIND_EXCHANGE).durable(true)
        .build();
  }

  /**
   * 創(chuàng)建死信交換機(jī).
   */
  @Bean
  public TopicExchange lindExchangeDl() {
    return (TopicExchange) ExchangeBuilder.topicExchange(LIND_DL_EXCHANGE).durable(true)
        .build();
  }

  /**
   * 創(chuàng)建普通隊(duì)列.
   */
  @Bean
  public Queue lindQueue() {
    return QueueBuilder.durable(LIND_QUEUE)
        .withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//設(shè)置死信交換機(jī)
        .withArgument("x-message-ttl", makeCallExpire)
        .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//設(shè)置死信routingKey
        .build();
  }

  /**
   * 創(chuàng)建死信隊(duì)列.
   */
  @Bean
  public Queue lindDelayQueue() {
    return QueueBuilder.durable(LIND_DEAD_QUEUE).build();
  }

  /**
   * 綁定死信隊(duì)列.
   */
  @Bean
  public Binding bindDeadBuilders() {
    return BindingBuilder.bind(lindDelayQueue())
        .to(lindExchangeDl())
        .with(LIND_DEAD_QUEUE);
  }

  /**
   * 綁定普通隊(duì)列.
   *
   * @return
   */
  @Bean
  public Binding bindBuilders() {
    return BindingBuilder.bind(lindQueue())
        .to(lindExchange())
        .with(LIND_QUEUE);
  }

  /**
   * 廣播交換機(jī).
   *
   * @return
   */
  @Bean
  public FanoutExchange fanoutExchange() {
    return new FanoutExchange(LIND_FANOUT_EXCHANGE);
  }
}

//-----------------

@Component
public class Publisher {
  @Autowired
  private RabbitTemplate rabbitTemplate;

  public void publish(String message) {
    try {
      rabbitTemplate
          .convertAndSend(AmqpConfig.LIND_EXCHANGE, AmqpConfig.LIND_DELAY_QUEUE,
              message);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

//-----------------

@Component
@Slf4j
public class Subscriber {
  @RabbitListener(queues = AmqpConfig.LIND_QUEUE)
  public void customerSign(String data) {
    try {

      log.info("從隊(duì)列拿到數(shù)據(jù) :{}", data);

    } catch (Exception ex) {
          e.printStackTrace();
    }
  }
}
向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI