溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

SpringBoot2 高級(jí)應(yīng)用(01):整合 RocketMQ ,實(shí)現(xiàn)請(qǐng)求異步處理

發(fā)布時(shí)間:2020-08-11 01:58:15 來源:ITPUB博客 閱讀:179 作者:知了一笑 欄目:編程語言

本文源碼: GitHub·點(diǎn)這里 || GitEE·點(diǎn)這里

一、RocketMQ

1、架構(gòu)圖片

SpringBoot2 高級(jí)應(yīng)用(01):整合 RocketMQ ,實(shí)現(xiàn)請(qǐng)求異步處理

2、角色分類

(1)、Broker

RocketMQ 的核心,接收 Producer 發(fā)過來的消息、處理 Consumer 的消費(fèi)消息請(qǐng)求、消息的持 久化存儲(chǔ)、服務(wù)端過濾功能等 。

(2)、NameServer

消息隊(duì)列中的狀態(tài)服務(wù)器,集群的各個(gè)組件通過它來了解全局的信息 。類似微服務(wù)中注冊(cè)中心的服務(wù)注冊(cè),發(fā)現(xiàn),下線,上線的概念。

熱備份:
NamServer可以部署多個(gè),相互之間獨(dú)立,其他角色同時(shí)向多個(gè)NameServer 機(jī)器上報(bào)狀態(tài)信息。

心跳機(jī)制:
NameServer 中的 Broker、 Topic等狀態(tài)信息不會(huì)持久存儲(chǔ),都是由各個(gè)角色定時(shí)上報(bào)并存儲(chǔ)到內(nèi)存中,超時(shí)不上報(bào)的話, NameServer會(huì)認(rèn)為某個(gè)機(jī)器出故障不可用。

(3)、Producer

消息的生成者,最常用的producer類就是DefaultMQProducer。

(4)、Consumer

消息的消費(fèi)者,常用Consumer類
DefaultMQPushConsumer
收到消息后自動(dòng)調(diào)用傳入的處理方法來處理,實(shí)時(shí)性高
DefaultMQPullConsumer
用戶自主控制 ,靈活性更高。

3、通信機(jī)制

(1)、Broker啟動(dòng)后需要完成一次將自己注冊(cè)至NameServer的操作;隨后每隔30s時(shí)間定時(shí)向NameServer更新Topic路由信息。

(2)、Producer發(fā)送消息時(shí)候,需要根據(jù)消息的Topic從本地緩存的獲取路由信息。如果沒有則更新路由信息會(huì)從NameServer重新拉取,同時(shí)Producer會(huì)默認(rèn)每隔30s向NameServer拉取一次路由信息。

(3)、Consumer消費(fèi)消息時(shí)候,從NameServer獲取的路由信息,并再完成客戶端的負(fù)載均衡后,監(jiān)聽指定消息隊(duì)列獲取消息并進(jìn)行消費(fèi)。

二、代碼實(shí)現(xiàn)案例

1、項(xiàng)目結(jié)構(gòu)圖

SpringBoot2 高級(jí)應(yīng)用(01):整合 RocketMQ ,實(shí)現(xiàn)請(qǐng)求異步處理

版本描述

<spring-boot.version>2.1.3.RELEASE</spring-boot.version>
<rocketmq.version>4.3.0</rocketmq.version>

2、配置文件

rocketmq:
  # 生產(chǎn)者配置
  producer:
    isOnOff: on
    # 發(fā)送同一類消息的設(shè)置為同一個(gè)group,保證唯一
    groupName: CicadaGroup
    # 服務(wù)地址
    namesrvAddr: 127.0.0.1:9876
    # 消息最大長(zhǎng)度 默認(rèn)1024*4(4M)
    maxMessageSize: 4096
    # 發(fā)送消息超時(shí)時(shí)間,默認(rèn)3000
    sendMsgTimeout: 3000
    # 發(fā)送消息失敗重試次數(shù),默認(rèn)2
    retryTimesWhenSendFailed: 2
  # 消費(fèi)者配置
  consumer:
    isOnOff: on
    # 官方建議:確保同一組中的每個(gè)消費(fèi)者訂閱相同的主題。
    groupName: CicadaGroup
    # 服務(wù)地址
    namesrvAddr: 127.0.0.1:9876
    # 接收該 Topic 下所有 Tag
    topics: CicadaTopic~*;
    consumeThreadMin: 20
    consumeThreadMax: 64
    # 設(shè)置一次消費(fèi)消息的條數(shù),默認(rèn)為1條
    consumeMessageBatchMaxSize: 1
# 配置 Group  Topic  Tag
rocket:
  group: rocketGroup
  topic: rocketTopic
  tag: rocketTag

3、生產(chǎn)者配置

/**
 * RocketMQ 生產(chǎn)者配置
 */
@Configuration
public class ProducerConfig {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ;
    @Value("${rocketmq.producer.groupName}")
    private String groupName;
    @Value("${rocketmq.producer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.producer.maxMessageSize}")
    private Integer maxMessageSize ;
    @Value("${rocketmq.producer.sendMsgTimeout}")
    private Integer sendMsgTimeout;
    @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
    private Integer retryTimesWhenSendFailed;
    @Bean
    public DefaultMQProducer getRocketMQProducer() {
        DefaultMQProducer producer;
        producer = new DefaultMQProducer(this.groupName);
        producer.setNamesrvAddr(this.namesrvAddr);
        //如果需要同一個(gè)jvm中不同的producer往不同的mq集群,發(fā)送消息,需要設(shè)置不同的instanceName
        if(this.maxMessageSize!=null){
            producer.setMaxMessageSize(this.maxMessageSize);
        }
        if(this.sendMsgTimeout!=null){
            producer.setSendMsgTimeout(this.sendMsgTimeout);
        }
        //如果發(fā)送消息失敗,設(shè)置重試次數(shù),默認(rèn)為2次
        if(this.retryTimesWhenSendFailed!=null){
            producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
        }
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        return producer;
    }
}

4、消費(fèi)者配置

/**
 * RocketMQ 消費(fèi)者配置
 */
@Configuration
public class ConsumerConfig {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerConfig.class) ;
    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.consumer.groupName}")
    private String groupName;
    @Value("${rocketmq.consumer.consumeThreadMin}")
    private int consumeThreadMin;
    @Value("${rocketmq.consumer.consumeThreadMax}")
    private int consumeThreadMax;
    @Value("${rocketmq.consumer.topics}")
    private String topics;
    @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
    private int consumeMessageBatchMaxSize;
    @Resource
    private RocketMsgListener msgListener;
    @Bean
    public DefaultMQPushConsumer getRocketMQConsumer(){
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.registerMessageListener(msgListener);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
        try {
            String[] topicTagsArr = topics.split(";");
            for (String topicTags : topicTagsArr) {
                String[] topicTag = topicTags.split("~");
                consumer.subscribe(topicTag[0],topicTag[1]);
            }
            consumer.start();
        }catch (MQClientException e){
            e.printStackTrace();
        }
        return consumer;
    }
}

5、消息監(jiān)聽配置

/**
 * 消息消費(fèi)監(jiān)聽
 */
@Component
public class RocketMsgListener implements MessageListenerConcurrently {
    private static final Logger LOG = LoggerFactory.getLogger(RocketMsgListener.class) ;
    @Resource
    private ParamConfigService paramConfigService ;
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
        if (CollectionUtils.isEmpty(list)){
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        MessageExt messageExt = list.get(0);
        LOG.info("接受到的消息為:"+new String(messageExt.getBody()));
        int reConsume = messageExt.getReconsumeTimes();
        // 消息已經(jīng)重試了3次,如果不需要再次消費(fèi),則返回成功
        if(reConsume ==3){
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        if(messageExt.getTopic().equals(paramConfigService.rocketTopic)){
            String tags = messageExt.getTags() ;
            switch (tags){
                case "rocketTag":
                    LOG.info("OpenAccount tag == >>"+tags);
                    break ;
                default:
                    LOG.info("未匹配到Tag == >>"+tags);
                    break;
            }
        }
        // 消息消費(fèi)成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

6、配置參數(shù)綁定

@Service
public class ParamConfigService {
    @Value("${rocket.group}")
    public String rocketGroup ;
    @Value("${rocket.topic}")
    public String rocketTopic ;
    @Value("${rocket.tag}")
    public String rocketTag ;
}

7、消息發(fā)送測(cè)試

@Service
public class RocketMqServiceImpl implements RocketMqService {
    @Resource
    private DefaultMQProducer defaultMQProducer;
    @Resource
    private ParamConfigService paramConfigService ;
    @Override
    public SendResult openAccountMsg(String msgInfo) {
        // 可以不使用Config中的Group
        defaultMQProducer.setProducerGroup(paramConfigService.rocketGroup);
        SendResult sendResult = null;
        try {
            Message sendMsg = new Message(paramConfigService.rocketTopic,
                                          paramConfigService.rocketTag,
                                         "open_account_key", msgInfo.getBytes());
            sendResult = defaultMQProducer.send(sendMsg);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return sendResult ;
    }
}

三、項(xiàng)目源碼

GitHub·地址
https://github.com/cicadasmile/middle-ware-parent
GitEE·地址
https://gitee.com/cicadasmile/middle-ware-parent
向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI