溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Java搭建RabbitMq消息中間件過程是怎么樣的

發(fā)布時(shí)間:2021-10-15 17:51:07 來源:億速云 閱讀:124 作者:柒染 欄目:編程語言

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)碛嘘P(guān)Java搭建RabbitMq消息中間件過程是怎么樣的,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

前言

當(dāng)系統(tǒng)中出現(xiàn)“生產(chǎn)“和“消費(fèi)“的速度或穩(wěn)定性等因素不一致的時(shí)候,就需要消息隊(duì)列。

名詞

exchange: 交換機(jī)  routingkey: 路由key  queue:隊(duì)列

控制臺(tái)端口:15672

  exchange和queue是需要綁定在一起的,然后消息發(fā)送到exchange再由exchange通過routingkey發(fā)送到對應(yīng)的隊(duì)列中。

使用場景

1.技能訂單3分鐘自動(dòng)取消,改變狀態(tài)

2.直播開始前15分鐘提醒

3.直播狀態(tài)自動(dòng)結(jié)束

流程

  生產(chǎn)者發(fā)送消息 —> order_pre_exchange交換機(jī) —> order_per_ttl_delay_queue隊(duì)列

  —> 時(shí)間到期 —> order_delay_exchange交換機(jī) —> order_delay_process_queue隊(duì)列 —> 消費(fèi)者

第一步:在pom文件中添加

<dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-amqp</artifactId></dependency>

第二步:在application.properties文件中添加

spring.rabbitmq.host=172.xx.xx.xxxspring.rabbitmq.port=5672spring.rabbitmq.username=rabbitspring.rabbitmq.password=123456spring.rabbitmq.virtual-host=/spring.rabbitmq.connection-timeout=15000spring.rabbitmq.publisher-confirms=truespring.rabbitmq.publisher-returns=truespring.rabbitmq.template.mandatory=true

第三步:配置 OrderQueueConfig

package com.tuohang.platform.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.QueueBuilder;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * rabbitMQ的隊(duì)列設(shè)置(生產(chǎn)者發(fā)送的消息,永遠(yuǎn)是先進(jìn)入exchange,再通過路由,轉(zhuǎn)發(fā)到隊(duì)列) *  *  * @author Administrator * @version 1.0 * @Date 2018年9月18日 */@Configurationpublic class OrderQueueConfig {  /**   * 訂單緩沖交換機(jī)名稱   */  public final static String ORDER_PRE_EXCHANGE_NAME = "order_pre_exchange";  /**   * 發(fā)送到該隊(duì)列的message會(huì)在一段時(shí)間后過期進(jìn)入到order_delay_process_queue 【隊(duì)列里所有的message都有統(tǒng)一的失效時(shí)間】   */  public final static String ORDER_PRE_TTL_DELAY_QUEUE_NAME = "order_pre_ttl_delay_queue";  /**   * 訂單的交換機(jī)DLX 名字   */  final static String ORDER_DELAY_EXCHANGE_NAME = "order_delay_exchange";  /**   * 訂單message時(shí)間過期后進(jìn)入的隊(duì)列,也就是訂單實(shí)際的消費(fèi)隊(duì)列   */  public final static String ORDER_DELAY_PROCESS_QUEUE_NAME = "order_delay_process_queue";  /**   * 訂單在緩沖隊(duì)列過期時(shí)間(毫秒)30分鐘   */  public final static int ORDER_QUEUE_EXPIRATION = 1800000;  /**   * 訂單緩沖交換機(jī)   *    * @return   */  @Bean  public DirectExchange preOrderExange() {    return new DirectExchange(ORDER_PRE_EXCHANGE_NAME);  }  /**   * 創(chuàng)建order_per_ttl_delay_queue隊(duì)列,訂單消息經(jīng)過緩沖交換機(jī),會(huì)進(jìn)入該隊(duì)列   *    * @return   */  @Bean  public Queue delayQueuePerOrderTTLQueue() {    return QueueBuilder.durable(ORDER_PRE_TTL_DELAY_QUEUE_NAME)        .withArgument("x-dead-letter-exchange", ORDER_DELAY_EXCHANGE_NAME) // DLX        .withArgument("x-dead-letter-routing-key", ORDER_DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key        .withArgument("x-message-ttl", ORDER_QUEUE_EXPIRATION) // 設(shè)置訂單隊(duì)列的過期時(shí)間        .build();  }  /**   * 將order_pre_exchange綁定到order_pre_ttl_delay_queue隊(duì)列   *   * @param delayQueuePerOrderTTLQueue   * @param preOrderExange   * @return   */  @Bean  public Binding queueOrderTTLBinding(Queue delayQueuePerOrderTTLQueue, DirectExchange preOrderExange) {    return BindingBuilder.bind(delayQueuePerOrderTTLQueue).to(preOrderExange).with(ORDER_PRE_TTL_DELAY_QUEUE_NAME);  }  /**   * 創(chuàng)建訂單的DLX exchange   *   * @return   */  @Bean  public DirectExchange delayOrderExchange() {    return new DirectExchange(ORDER_DELAY_EXCHANGE_NAME);  }  /**   * 創(chuàng)建order_delay_process_queue隊(duì)列,也就是訂單實(shí)際消費(fèi)隊(duì)列   *   * @return   */  @Bean  public Queue delayProcessOrderQueue() {    return QueueBuilder.durable(ORDER_DELAY_PROCESS_QUEUE_NAME).build();  }  /**   * 將DLX綁定到實(shí)際消費(fèi)隊(duì)列   *   * @param delayProcessOrderQueue   * @param delayExchange   * @return   */  @Bean  public Binding dlxOrderBinding(Queue delayProcessOrderQueue, DirectExchange delayOrderExchange) {    return BindingBuilder.bind(delayProcessOrderQueue).to(delayOrderExchange).with(ORDER_DELAY_PROCESS_QUEUE_NAME);  }  /**   * 監(jiān)聽訂單實(shí)際消費(fèi)者隊(duì)列order_delay_process_queue   *    * @param connectionFactory   * @param processReceiver   * @return   */  @Bean  public SimpleMessageListenerContainer orderProcessContainer(ConnectionFactory connectionFactory,      OrderProcessReceiver processReceiver) {    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();    container.setConnectionFactory(connectionFactory);    container.setQueueNames(ORDER_DELAY_PROCESS_QUEUE_NAME); // 監(jiān)聽order_delay_process_queue    container.setMessageListener(new MessageListenerAdapter(processReceiver));    return container;  }}

消費(fèi)者 OrderProcessReceiver :

package com.tuohang.platform.config;import java.util.Objects;import org.apache.tools.ant.types.resources.selectors.Date;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;/** * 訂單延遲處理消費(fèi)者 *  *  * @author Administrator * @version 1.0 * @Date 2018年9月18日 */@Componentpublic class OrderProcessReceiver implements ChannelAwareMessageListener {  private static Logger logger = LoggerFactory.getLogger(OrderProcessReceiver.class);  String msg = "The failed message will auto retry after a certain delay";  @Override  public void onMessage(Message message, Channel channel) throws Exception {    try {      processMessage(message);    } catch (Exception e) {      // 如果發(fā)生了異常,則將該消息重定向到緩沖隊(duì)列,會(huì)在一定延遲之后自動(dòng)重做      channel.basicPublish(OrderQueueConfig.ORDER_PRE_EXCHANGE_NAME, OrderQueueConfig.ORDER_PRE_TTL_DELAY_QUEUE_NAME, null,          msg.getBytes());    }  }    /**   * 處理訂單消息,如果訂單未支付,取消訂單(如果當(dāng)消息內(nèi)容為FAIL_MESSAGE的話,則需要拋出異常)   *   * @param message   * @throws Exception   */  public void processMessage(Message message) throws Exception {    String realMessage = new String(message.getBody());    logger.info("Received <" + realMessage + ">");    // 取消訂單    if(!Objects.equals(realMessage, msg)) {//      SpringKit.getBean(ITestService.class).resetSexById(Long.valueOf(realMessage));      System.out.println("測試111111-----------"+new Date());      System.out.println(message);    }  }}

或者

/** * 測試 rabbit 消費(fèi)者 *  *  * @author Administrator * @version 1.0 * @Date 2018年9月25日 */@Component@RabbitListener(queues = TestQueueConfig.TEST_DELAY_PROCESS_QUEUE_NAME)public class TestProcessReceiver {  private static Logger logger = LoggerFactory.getLogger(TestProcessReceiver.class);  String msg = "The failed message will auto retry after a certain delay";  @RabbitHandler  public void onMessage(Message message, Channel channel) throws Exception {    try {      processMessage(message);      //告訴服務(wù)器收到這條消息 已經(jīng)被我消費(fèi)了 可以在隊(duì)列刪掉;否則消息服務(wù)器以為這條消息沒處理掉 后續(xù)還會(huì)在發(fā)      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);    } catch (Exception e) {      // 如果發(fā)生了異常,則將該消息重定向到緩沖隊(duì)列,會(huì)在一定延遲之后自動(dòng)重做      channel.basicPublish(TestQueueConfig.TEST_PRE_EXCHANGE_NAME, TestQueueConfig.TEST_PRE_TTL_DELAY_QUEUE_NAME, null,          msg.getBytes());    }  }    /**   * 處理訂單消息,如果訂單未支付,取消訂單(如果當(dāng)消息內(nèi)容為FAIL_MESSAGE的話,則需要拋出異常)   *   * @param message   * @throws Exception   */  public void processMessage(Message message) throws Exception {    String realMessage = new String(message.getBody());    logger.info("Received < " + realMessage + " >");    // 取消訂單    if(!Objects.equals(realMessage, msg)) {      System.out.println("測試111111-----------"+new Date());    }else {      System.out.println("rabbit else...");    }  }}

生產(chǎn)者

/**   * 測試rabbitmq   *    * @return   */  @RequestMapping(value = "/testrab")  public String testraa() {    GenericResult gr = null;    try {      String name = "test_pre_ttl_delay_queue";  long expiration = 10000;//10s 過期時(shí)間      rabbitTemplate.convertAndSend(name,String.valueOf(123456)); // 在單個(gè)消息上設(shè)置過期時(shí)間 //rabbitTemplate.convertAndSend(name,(Object)String.valueOf(123456), new ExpirationMessagePostProcessor(expiration));    } catch (ServiceException e) {      e.printStackTrace();      gr = new GenericResult(StateCode.ERROR, languageMap.get("network_error"), e.getMessage());    }        return getWrite(gr);  }

上述就是小編為大家分享的Java搭建RabbitMq消息中間件過程是怎么樣的了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI