溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

SpringBoot整合RocketMQ的方法是什么

發(fā)布時間:2023-04-10 10:32:19 來源:億速云 閱讀:152 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹了SpringBoot整合RocketMQ的方法是什么的相關(guān)知識,內(nèi)容詳細(xì)易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇SpringBoot整合RocketMQ的方法是什么文章都會有所收獲,下面我們一起來看看吧。

1. SpringBoot整合RocketMQ

在SpringBoot中集成RocketMQ,只需要簡單四步:

1.引入相關(guān)依賴

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

2.添加RocketMQ的相關(guān)配置

rocketmq:
    consumer:
        group: springboot_consumer_group
        # 一次拉取消息最大值,注意是拉取消息的最大值而非消費(fèi)最大值
        pull-batch-size: 10
    name-server: 10.5.103.6:9876
    producer:
        # 發(fā)送同一類消息的設(shè)置為同一個group,保證唯一
        group: springboot_producer_group
        # 發(fā)送消息超時時間,默認(rèn)3000
        sendMessageTimeout: 10000
        # 發(fā)送消息失敗重試次數(shù),默認(rèn)2
        retryTimesWhenSendFailed: 2
        # 異步消息重試此處,默認(rèn)2
        retryTimesWhenSendAsyncFailed: 2
        # 消息最大長度,默認(rèn)1024 * 1024 * 4(默認(rèn)4M)
        maxMessageSize: 4096
        # 壓縮消息閾值,默認(rèn)4k(1024 * 4)
        compressMessageBodyThreshold: 4096
        # 是否在內(nèi)部發(fā)送失敗時重試另一個broker,默認(rèn)false
        retryNextServer: false

3.使用提供的模板工具類RocketMQTemplate發(fā)送消息

@RestController
public class NormalProduceController {
  @Setter(onMethod_ = @Autowired)
  private RocketMQTemplate rocketmqTemplate;
  
  @GetMapping("/test")
  public SendResult test() {
    Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();
    SendResult sendResult = rocketmqTemplate.send(topic, msg);
  }
}

4.實現(xiàn)RocketMQListener接口消費(fèi)消息

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "your_topic_name", consumerGroup = "your_consumer_group_name")
public class MyConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        // 處理消息的邏輯
        System.out.println("Received message: " + message);
    }
}

以上4步即可實現(xiàn)SpringBoot與RocketMQ的整合,這部分屬于基礎(chǔ)知識,不做過多說明。

2 使用RocketMQ會遇到的問題

以下是一些在SpringBoot中使用RocketMQ時常遇到的問題,現(xiàn)在為您逐一解決。

2.1 WARN No appenders could be found for logger

啟動項目時會在日志中看到如下告警

RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.

此時我們只需要在啟動類中設(shè)置環(huán)境變量 rocketmq.client.logUseSlf4j 為 true 明確指定RocketMQ的日志框架

@SpringBootApplication
public class RocketDemoApplication {

    public static void main(String[] args) {
        /*
         * 指定使用的日志框架,否則將會告警
         * RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
         * RocketMQLog:WARN Please initialize the logger system properly.
         */
        System.setProperty("rocketmq.client.logUseSlf4j", "true");
      
        SpringApplication.run(RocketDemoApplication.class, args);
    }
}

同時還得在配置文件中調(diào)整日志級別,不然在控制臺會一直看到broker的日志信息

logging:
    level:
      RocketmqClient: ERROR
    io:
        netty: ERROR

2.2 不支持LocalDate 和 LocalDateTime

在使用Java8后經(jīng)常會使用LocalDate/LocalDateTime這兩個時間類型字段,然而RocketMQ原始配置并不支持Java時間類型,當(dāng)我們發(fā)送的實體消息中包含上述兩個字段時,消費(fèi)端在消費(fèi)時會出現(xiàn)如下所示的錯誤。

比如生產(chǎn)者的代碼如下:

@GetMapping("/test")
public void test(){
  //普通消息無返回值,只負(fù)責(zé)發(fā)送消息?不等待服務(wù)器回應(yīng)且沒有回調(diào)函數(shù)觸發(fā)。
  RocketMessage rocketMessage = RocketMessage.builder().
    id(1111L).
    message("hello,world")
    .localDate(LocalDate.now())
    .localDateTime(LocalDateTime.now())
    .build();
  rocketmqTemplate.convertAndSend(destination,rocketMessage);
}

消費(fèi)者的代碼如下:

@Component
@RocketMQMessageListener(consumerGroup = "springboot_consumer_group",topic = "consumer_topic")
public class RocketMQConsumer implements RocketMQListener<RocketMessage> {
    @Override
    public void onMessage(RocketMessage message) {
        System.out.println("消費(fèi)消息-" + message);
    }
}

消費(fèi)者開始消費(fèi)時會出現(xiàn)類型轉(zhuǎn)換異常錯誤Cannot construct instance of java.time.LocalDate,錯誤詳情如下:

SpringBoot整合RocketMQ的方法是什么

原因:RocketMQ內(nèi)置使用的轉(zhuǎn)換器是RocketMQMessageConverter,轉(zhuǎn)換Json時使用的是MappingJackson2MessageConverter,但是這個轉(zhuǎn)換器不支持時間類型。

解決辦法:需要自定義消息轉(zhuǎn)換器,將MappingJackson2MessageConverter進(jìn)行替換,并添加支持時間模塊

@Configuration
public class RocketMQEnhanceConfig {

    /**
     * 解決RocketMQ Jackson不支持Java時間類型配置
     * 源碼參考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}
     */
    @Bean
    @Primary
    public RocketMQMessageConverter enhanceRocketMQMessageConverter(){
        RocketMQMessageConverter converter = new RocketMQMessageConverter();
        CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
        List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
        for (MessageConverter messageConverter : messageConverterList) {
            if(messageConverter instanceof MappingJackson2MessageConverter){
                MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
                ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
                objectMapper.registerModules(new JavaTimeModule());
            }
        }
        return converter;
    }
}

2.3 RockeMQ環(huán)境隔離

在使用RocketMQ時,通常會在代碼中直接指定消息主題(topic),而且開發(fā)環(huán)境和測試環(huán)境可能共用一個RocketMQ環(huán)境。如果沒有進(jìn)行處理,在開發(fā)環(huán)境發(fā)送的消息就可能被測試環(huán)境的消費(fèi)者消費(fèi),測試環(huán)境發(fā)送的消息也可能被開發(fā)環(huán)境的消費(fèi)者消費(fèi),從而導(dǎo)致數(shù)據(jù)混亂的問題。

為了解決這個問題,我們可以根據(jù)不同的環(huán)境實現(xiàn)自動隔離。通過簡單配置一個選項,如dev、test、prod等不同環(huán)境,所有的消息都會被自動隔離。例如,當(dāng)發(fā)送的消息主題為consumer_topic時,可以自動在topic后面加上環(huán)境后綴,如consumer_topic_dev。

那么,我們該如何實現(xiàn)呢?

可以編寫一個配置類實現(xiàn)BeanPostProcessor,并重寫postProcessBeforeInitialization方法,在監(jiān)聽器實例初始化前修改對應(yīng)的topic。

  • BeanPostProcessor是Spring框架中的一個接口,它的作用是在Spring容器實例化、配置完bean之后,在bean初始化前后進(jìn)行一些額外的處理工作。

  • 具體來說,BeanPostProcessor接口定義了兩個方法:

    • postProcessBeforeInitialization(Object bean, String beanName): 在bean初始化之前進(jìn)行處理,可以對bean做一些修改等操作。

    • postProcessAfterInitialization(Object bean, String beanName): 在bean初始化之后進(jìn)行處理,可以進(jìn)行一些清理或者其他操作。

  • BeanPostProcessor可以在應(yīng)用程序中對Bean的創(chuàng)建和初始化過程進(jìn)行攔截和修改,對Bean的生命周期進(jìn)行干預(yù)和操作。它可以對所有的Bean類實例進(jìn)行增強(qiáng)處理,使得開發(fā)人員可以在Bean初始化前后自定義一些操作,從而實現(xiàn)自己的業(yè)務(wù)需求。比如,可以通過BeanPostProcessor來實現(xiàn)注入某些必要的屬性值、加入某一個對象等等。

實現(xiàn)方案如下:

1.在配置文件中增加相關(guān)配置

rocketmq:
	enhance:
  	# 啟動隔離,用于激活配置類EnvironmentIsolationConfig
  	# 啟動后會自動在topic上拼接激活的配置文件,達(dá)到自動隔離的效果
  	enabledIsolation: true
  	# 隔離環(huán)境名稱,拼接到topic后,topic_dev,默認(rèn)空字符串
  	environment: dev

2.新增配置類,在實例化消息監(jiān)聽者之前把topic修改掉

@Configuration
public class EnvironmentIsolationConfig implements BeanPostProcessor {
  	@Value("${rocketmq.enhance.enabledIsolation:true}")
    private boolean enabledIsolation;
    @Value("${rocketmq.enhance.environment:''}")
    private String environmentName;
  
    /**
     * 在裝載Bean之前實現(xiàn)參數(shù)修改
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if(bean instanceof DefaultRocketMQListenerContainer){

            DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
					  //拼接Topic
            if(enabledIsolation && StringUtils.hasText(environmentName)){
                container.setTopic(String.join("_", container.getTopic(),environmentName));
            }
            return container;
        }
        return bean;
    }
}

啟動項目可以看到日志中消息監(jiān)聽的隊列已經(jīng)被修改了

2023-03-23 17:04:59.726 [main] INFO  o.a.r.s.support.DefaultRocketMQListenerContainer:290 - running container: DefaultRocketMQListenerContainer{consumerGroup='springboot_consumer_group', nameServer='10.5.103.6:9876', topic='consumer_topic_dev', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING}

3. RocketMQ二次封裝

在解釋為什么要二次封裝之前先來看看RocketMQ官方文檔中推薦的最佳實踐

  • 消息發(fā)送成功或者失敗要打印消息日志,用于業(yè)務(wù)排查問題。

  • 如果消息量較少,建議在消費(fèi)入口方法打印消息,消費(fèi)耗時等,方便后續(xù)排查問題。

  • RocketMQ 無法避免消息重復(fù)(Exactly-Once),所以如果業(yè)務(wù)對消費(fèi)重復(fù)非常敏感,務(wù)必要在業(yè)務(wù)層面進(jìn)行去重處理。可以借助關(guān)系數(shù)據(jù)庫進(jìn)行去重。首先需要確定消息的唯一鍵,可以是msgId,也可以是消息內(nèi)容中的唯一標(biāo)識字段,例如訂單Id等。

上面三個步驟基本每次發(fā)送消息或者消費(fèi)消息都要實現(xiàn),屬于重復(fù)動作。

接下來討論的是在RocketMQ中發(fā)送消息時選擇何種消息類型最為合適。

在RocketMQ中有四種可選格式:

  • 發(fā)送Json對象

  • 發(fā)送轉(zhuǎn)Json后的String對象

  • 根據(jù)業(yè)務(wù)封裝對應(yīng)實體類

  • 直接使用原生MessageExt接收。

對于如何選擇消息類型,需要考慮到消費(fèi)者在不查看消息發(fā)送者的情況下,如何獲取消息的含義。因此,在這種情況下,使用第三種方式即根據(jù)業(yè)務(wù)封裝對應(yīng)實體類的方式最為合適,也是大多數(shù)開發(fā)者在發(fā)送消息時的常用方式。

有了上面兩點結(jié)論以后我們來看看為什么要對RocketMQ二次封裝。

3.1 為什么要二次封裝

按照上述最佳實踐,一個完整的消息傳遞鏈路從生產(chǎn)到消費(fèi)應(yīng)包括 準(zhǔn)備消息、發(fā)送消息、記錄消息日志、處理發(fā)送失敗、記錄接收消息日志、處理業(yè)務(wù)邏輯、異常處理和異常重試 等步驟。

雖然使用原生RocketMQ可以完成這些動作,但每個生產(chǎn)者和消費(fèi)者都需要編寫大量重復(fù)的代碼來完成相同的任務(wù),這就是需要進(jìn)行二次封裝的原因。我們希望通過二次封裝,**生產(chǎn)者只需準(zhǔn)備好消息實體并調(diào)用封裝后的工具類發(fā)送,而消費(fèi)者只需處理核心業(yè)務(wù)邏輯,其他公共邏輯會得到統(tǒng)一處理。 **

在二次封裝中,關(guān)鍵是找出框架在日常使用中所涵蓋的許多操作,以及區(qū)分哪些操作是可變的,哪些是不變的。以上述例子為例,實際上只有生產(chǎn)者的消息準(zhǔn)備和消費(fèi)者的業(yè)務(wù)處理是可變的操作,需要根據(jù)需求進(jìn)行處理,而其他步驟可以固定下來形成一個模板。

當(dāng)然,本文提到的二次封裝不是指對源代碼進(jìn)行封裝,而是針對工具的原始使用方式進(jìn)行的封裝。可以將其與Mybatis和Mybatis-plus區(qū)分開來。這兩者都能完成任務(wù),只不過Mybatis-plus更為簡單便捷。

3.2 實現(xiàn)二次封裝

實現(xiàn)二次封裝需要創(chuàng)建一個自定義的starter,這樣其他項目只需要依賴此starter即可使用封裝功能。同時,在自定義starter中還需要解決文章第二部分中提到的一些問題。

代碼結(jié)構(gòu)如下所示:

SpringBoot整合RocketMQ的方法是什么

3.2.1 消息實體類的封裝
/**
 * 消息實體,所有消息都需要繼承此類
 * 公眾號:JAVA日知錄
 */
@Data
public abstract class BaseMessage {
    /**
     * 業(yè)務(wù)鍵,用于RocketMQ控制臺查看消費(fèi)情況
     */
    protected String key;
    /**
     * 發(fā)送消息來源,用于排查問題
     */
    protected String source = "";

    /**
     * 發(fā)送時間
     */
    protected LocalDateTime sendTime = LocalDateTime.now();

    /**
     * 重試次數(shù),用于判斷重試次數(shù),超過重試次數(shù)發(fā)送異常警告
     */
    protected Integer retryTimes = 0;
}

后面所有發(fā)送的消息實體都需要繼承此實體類。

3.2.2 消息發(fā)送工具類的封裝
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class RocketMQEnhanceTemplate {
    private final RocketMQTemplate template;

    @Resource
    private RocketEnhanceProperties rocketEnhanceProperties;

    public RocketMQTemplate getTemplate() {
        return template;
    }

    /**
     * 根據(jù)系統(tǒng)上下文自動構(gòu)建隔離后的topic
     * 構(gòu)建目的地
     */
    public String buildDestination(String topic, String tag) {
        topic = reBuildTopic(topic);
        return topic + ":" + tag;
    }

    /**
     * 根據(jù)環(huán)境重新隔離topic
     * @param topic 原始topic
     */
    private String reBuildTopic(String topic) {
        if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
            return topic +"_" + rocketEnhanceProperties.getEnvironment();
        }
        return topic;
    }

    /**
     * 發(fā)送同步消息
     */
    public <T extends BaseMessage> SendResult send(String topic, String tag, T message) {
        // 注意分隔符
        return send(buildDestination(topic,tag), message);
    }


    public <T extends BaseMessage> SendResult send(String destination, T message) {
        // 設(shè)置業(yè)務(wù)鍵,此處根據(jù)公共的參數(shù)進(jìn)行處理
        // 更多的其它基礎(chǔ)業(yè)務(wù)處理...
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage);
        // 此處為了方便查看給日志轉(zhuǎn)了json,根據(jù)選擇選擇日志記錄方式,例如ELK采集
        log.info("[{}]同步消息[{}]發(fā)送結(jié)果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
        return sendResult;
    }

    /**
     * 發(fā)送延遲消息
     */
    public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
        return send(buildDestination(topic,tag), message, delayLevel);
    }

    public <T extends BaseMessage> SendResult send(String destination, T message, int delayLevel) {
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
        log.info("[{}]延遲等級[{}]消息[{}]發(fā)送結(jié)果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
        return sendResult;
    }
}

這里封裝了一個消息發(fā)送類,實現(xiàn)了日志記錄以及自動重建topic的功能(即生產(chǎn)者實現(xiàn)環(huán)境隔離),后面項目中只需要注入RocketMQEnhanceTemplate來實現(xiàn)消息的發(fā)送。

3.2.3 消費(fèi)者的封裝
@Slf4j
public abstract class EnhanceMessageHandler<T extends BaseMessage> {
    /**
     * 默認(rèn)重試次數(shù)
     */
    private static final int MAX_RETRY_TIMES = 3;

    /**
     * 延時等級
     */
    private static final int DELAY_LEVEL = EnhanceMessageConstant.FIVE_SECOND;


    @Resource
    private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;

    /**
     * 消息處理
     *
     * @param message 待處理消息
     * @throws Exception 消費(fèi)異常
     */
    protected abstract void handleMessage(T message) throws Exception;

    /**
     * 超過重試次數(shù)消息,需要啟用isRetry
     *
     * @param message 待處理消息
     */
    protected abstract void handleMaxRetriesExceeded(T message);


    /**
     * 是否需要根據(jù)業(yè)務(wù)規(guī)則過濾消息,去重邏輯可以在此處處理
     * @param message 待處理消息
     * @return true: 本次消息被過濾,false:不過濾
     */
    protected boolean filter(T message) {
        return false;
    }

    /**
     * 是否異常時重復(fù)發(fā)送
     *
     * @return true: 消息重試,false:不重試
     */
    protected abstract boolean isRetry();

    /**
     * 消費(fèi)異常時是否拋出異常
     * 返回true,則由rocketmq機(jī)制自動重試
     * false:消費(fèi)異常(如果沒有開啟重試則消息會被自動ack)
     */
    protected abstract boolean throwException();

    /**
     * 最大重試次數(shù)
     *
     * @return 最大重試次數(shù),默認(rèn)5次
     */
    protected int getMaxRetryTimes() {
        return MAX_RETRY_TIMES;
    }

    /**
     * isRetry開啟時,重新入隊延遲時間
     * @return -1:立即入隊重試
     */
    protected int getDelayLevel() {
        return DELAY_LEVEL;
    }

    /**
     * 使用模板模式構(gòu)建消息消費(fèi)框架,可自由擴(kuò)展或刪減
     */
    public void dispatchMessage(T message) {
        // 基礎(chǔ)日志記錄被父類處理了
        log.info("消費(fèi)者收到消息[{}]", JSONObject.toJSON(message));

        if (filter(message)) {
            log.info("消息id{}不滿足消費(fèi)條件,已過濾。",message.getKey());
            return;
        }
        // 超過最大重試次數(shù)時調(diào)用子類方法處理
        if (message.getRetryTimes() > getMaxRetryTimes()) {
            handleMaxRetriesExceeded(message);
            return;
        }
        try {
            long now = System.currentTimeMillis();
            handleMessage(message);
            long costTime = System.currentTimeMillis() - now;
            log.info("消息{}消費(fèi)成功,耗時[{}ms]", message.getKey(),costTime);
        } catch (Exception e) {
            log.error("消息{}消費(fèi)異常", message.getKey(),e);
            // 是捕獲異常還是拋出,由子類決定
            if (throwException()) {
                //拋出異常,由DefaultMessageListenerConcurrently類處理
                throw new RuntimeException(e);
            }
            //此時如果不開啟重試機(jī)制,則默認(rèn)ACK了
            if (isRetry()) {
                handleRetry(message);
            }
        }
    }

    protected void handleRetry(T message) {
        // 獲取子類RocketMQMessageListener注解拿到topic和tag
        RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
        if (annotation == null) {
            return;
        }
        //重新構(gòu)建消息體
        String messageSource = message.getSource();
        if(!messageSource.startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
            message.setSource(EnhanceMessageConstant.RETRY_PREFIX + messageSource);
        }
        message.setRetryTimes(message.getRetryTimes() + 1);

        SendResult sendResult;

        try {
            // 如果消息發(fā)送不成功,則再次重新發(fā)送,如果發(fā)送異常則拋出由MQ再次處理(異常時不走延遲消息)
            sendResult = rocketMQEnhanceTemplate.send(annotation.topic(), annotation.selectorExpression(), message, getDelayLevel());
        } catch (Exception ex) {
            // 此處捕獲之后,相當(dāng)于此條消息被消息完成然后重新發(fā)送新的消息
            //由生產(chǎn)者直接發(fā)送
            throw new RuntimeException(ex);
        }
        // 發(fā)送失敗的處理就是不進(jìn)行ACK,由RocketMQ重試
        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
            throw new RuntimeException("重試消息發(fā)送失敗");
        }

    }
}

使用模版設(shè)計模式定義了消息消費(fèi)的骨架,實現(xiàn)了日志打印,異常處理,異常重試等公共邏輯,消息過濾(查重)、業(yè)務(wù)處理則交由子類實現(xiàn)。

3.2.4 基礎(chǔ)配置類
@Configuration
@EnableConfigurationProperties(RocketEnhanceProperties.class)
public class RocketMQEnhanceAutoConfiguration {

    /**
     * 注入增強(qiáng)的RocketMQEnhanceTemplate
     */
    @Bean
    public RocketMQEnhanceTemplate rocketMQEnhanceTemplate(RocketMQTemplate rocketMQTemplate){
        return new RocketMQEnhanceTemplate(rocketMQTemplate);
    }

    /**
     * 解決RocketMQ Jackson不支持Java時間類型配置
     * 源碼參考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}
     */
    @Bean
    @Primary
    public RocketMQMessageConverter enhanceRocketMQMessageConverter(){
        RocketMQMessageConverter converter = new RocketMQMessageConverter();
        CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
        List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
        for (MessageConverter messageConverter : messageConverterList) {
            if(messageConverter instanceof MappingJackson2MessageConverter){
                MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
                ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
                objectMapper.registerModules(new JavaTimeModule());
            }
        }
        return converter;
    }


    /**
     * 環(huán)境隔離配置
     */
    @Bean
    @ConditionalOnProperty(name="rocketmq.enhance.enabledIsolation", havingValue="true")
    public EnvironmentIsolationConfig environmentSetup(RocketEnhanceProperties rocketEnhanceProperties){
        return new EnvironmentIsolationConfig(rocketEnhanceProperties);
    }

}
public class EnvironmentIsolationConfig implements BeanPostProcessor {
    private RocketEnhanceProperties rocketEnhanceProperties;

    public EnvironmentIsolationConfig(RocketEnhanceProperties rocketEnhanceProperties) {
        this.rocketEnhanceProperties = rocketEnhanceProperties;
    }


    /**
     * 在裝載Bean之前實現(xiàn)參數(shù)修改
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if(bean instanceof DefaultRocketMQListenerContainer){

            DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;

            if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
                container.setTopic(String.join("_", container.getTopic(),rocketEnhanceProperties.getEnvironment()));
            }
            return container;
        }
        return bean;
    }
}
@ConfigurationProperties(prefix = "rocketmq.enhance")
@Data
public class RocketEnhanceProperties {

    private boolean enabledIsolation;

    private String environment;
}

3.3 封裝后的使用

3.3.1 引入依賴
 <dependency>
   <groupId>com.jianzh6</groupId>
   <artifactId>cloud-rocket-starter</artifactId>
</dependency>
3.3.2 自定義配置
rocketmq:
	...
	enhance:
		# 啟動隔離,用于激活配置類EnvironmentIsolationConfig
  	# 啟動后會自動在topic上拼接激活的配置文件,達(dá)到自動隔離的效果
  	enabledIsolation: true
    # 隔離環(huán)境名稱,拼接到topic后,topic_dev,默認(rèn)空字符串
    environment: dev
3.3.3 發(fā)送消息
@RestController
@RequestMapping("enhance")
@Slf4j
public class EnhanceProduceController {

    //注入增強(qiáng)后的模板,可以自動實現(xiàn)環(huán)境隔離,日志記錄
    @Setter(onMethod_ = @Autowired)
    private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;

    private static final String topic = "rocket_enhance";
    private static final String tag = "member";

    /**
     * 發(fā)送實體消息
     */
    @GetMapping("/member")
    public SendResult member() {
        String key = UUID.randomUUID().toString();
        MemberMessage message = new MemberMessage();
        // 設(shè)置業(yè)務(wù)key
        message.setKey(key);
        // 設(shè)置消息來源,便于查詢
        message.setSource("MEMBER");
        // 業(yè)務(wù)消息內(nèi)容
        message.setUserName("Java日知錄");
        message.setBirthday(LocalDate.now());

        return rocketMQEnhanceTemplate.send(topic, tag, message);
    }
}

注意這里使用的是封裝后的模板工具類,一旦在配置文件中啟動環(huán)境隔離,則生產(chǎn)者的消息也自動發(fā)送到隔離后的topic中。

3.3.4 消費(fèi)者
@Slf4j
@Component
@RocketMQMessageListener(
        consumerGroup = "enhance_consumer_group",
        topic = "rocket_enhance",
        selectorExpression = "*",
        consumeThreadMax = 5 //默認(rèn)是64個線程并發(fā)消息,配置 consumeThreadMax 參數(shù)指定并發(fā)消費(fèi)線程數(shù),避免太大導(dǎo)致資源不夠
)
public class EnhanceMemberMessageListener extends EnhanceMessageHandler<MemberMessage> implements RocketMQListener<MemberMessage> {

    @Override
    protected void handleMessage(MemberMessage message) throws Exception {
        // 此時這里才是最終的業(yè)務(wù)處理,代碼只需要處理資源類關(guān)閉異常,其他的可以交給父類重試
        System.out.println("業(yè)務(wù)消息處理:"+message.getUserName());
    }

    @Override
    protected void handleMaxRetriesExceeded(MemberMessage message) {
        // 當(dāng)超過指定重試次數(shù)消息時此處方法會被調(diào)用
        // 生產(chǎn)中可以進(jìn)行回退或其他業(yè)務(wù)操作
        log.error("消息消費(fèi)失敗,請執(zhí)行后續(xù)處理");
    }


    /**
     * 是否執(zhí)行重試機(jī)制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }

    @Override
    protected boolean throwException() {
        // 是否拋出異常,false搭配retry自行處理異常
        return false;
    }
  
    @Override
    protected boolean filter() {
        // 消息過濾
        return false;
    }

    /**
     * 監(jiān)聽消費(fèi)消息,不需要執(zhí)行業(yè)務(wù)處理,委派給父類做基礎(chǔ)操作,父類做完基礎(chǔ)操作后會調(diào)用子類的實際處理類型
     */
    @Override
    public void onMessage(MemberMessage memberMessage) {
        super.dispatchMessage(memberMessage);
    }
}

為了方便消費(fèi)者對RocketMQ中的消息進(jìn)行處理,我們可以使用EnhanceMessageHandler來進(jìn)行消息的處理和邏輯的處理。

消費(fèi)者實現(xiàn)了RocketMQListener的同時,可以繼承EnhanceMessageHandler來進(jìn)行公共邏輯的處理,而核心業(yè)務(wù)邏輯需要自己實現(xiàn)handleMessage方法。 如果需要對消息進(jìn)行過濾或者去重的處理,則可以重寫父類的filter方法進(jìn)行實現(xiàn)。這樣可以更加方便地對消息進(jìn)行處理,減輕開發(fā)者的工作量。

關(guān)于“SpringBoot整合RocketMQ的方法是什么”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對“SpringBoot整合RocketMQ的方法是什么”知識都有一定的了解,大家如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI