溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

SpringBoot中怎么整合RocketMQ

發(fā)布時間:2021-06-15 11:06:16 來源:億速云 閱讀:253 作者:Leah 欄目:web開發(fā)

這篇文章給大家介紹SpringBoot中怎么整合RocketMQ,內(nèi)容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

RocketMQ機構(gòu)及概念

SpringBoot中怎么整合RocketMQ

1 消息模型(Message Model)

RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生產(chǎn)消息,Consumer  負責消費消息,Broker 負責存儲消息。Broker 在實際部署過程中對應一臺服務器,每個 Broker  可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲于不同的 Broker。Message Queue  用于存儲消息的物理地址,每個Topic中的消息地址存儲于多個 Message Queue 中。ConsumerGroup 由多個Consumer  實例構(gòu)成。

2 消息生產(chǎn)者(Producer)

負責生產(chǎn)消息,一般由業(yè)務系統(tǒng)負責生產(chǎn)消息。一個消息生產(chǎn)者會把業(yè)務應用系統(tǒng)里產(chǎn)生的消息發(fā)送到broker服務器。RocketMQ提供多種發(fā)送方式,同步發(fā)送、異步發(fā)送、順序發(fā)送、單向發(fā)送。同步和異步方式均需要Broker返回確認信息,單向發(fā)送不需要。

3 消息消費者(Consumer)

負責消費消息,一般是后臺系統(tǒng)負責異步消費。一個消息消費者會從Broker服務器拉取消息、并將其提供給應用程序。從用戶應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。

4 主題(Topic)

表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬于一個主題,是RocketMQ進行消息訂閱的基本單位。

5 代理服務器(Broker Server)

消息中轉(zhuǎn)角色,負責存儲消息、轉(zhuǎn)發(fā)消息。代理服務器在RocketMQ系統(tǒng)中負責接收從生產(chǎn)者發(fā)送來的消息并存儲、同時為消費者的拉取請求作準備。代理服務器也存儲消息相關(guān)的元數(shù)據(jù),包括消費者組、消費進度偏移和主題和隊列消息等。如下圖:

SpringBoot中怎么整合RocketMQ

6 名字服務(Name Server)

名稱服務充當路由消息的提供者。生產(chǎn)者或消費者能夠通過名字服務查找各主題相應的Broker  IP列表。多個Namesrv實例組成集群,但相互獨立,沒有信息交換。

7 拉取式消費(Pull Consumer)

Consumer消費的一種類型,應用通常主動調(diào)用Consumer的拉消息方法從Broker服務器拉消息、主動權(quán)由應用控制。一旦獲取了批量消息,應用就會啟動消費過程。

8 推動式消費(Push Consumer)

Consumer消費的一種類型,該模式下Broker收到數(shù)據(jù)后會主動推送給消費端,該消費模式一般實時性較高。

9 生產(chǎn)者組(Producer Group)

同一類Producer的集合,這類Producer發(fā)送同一類消息且發(fā)送邏輯一致。如果發(fā)送的是事務消息且原始生產(chǎn)者在發(fā)送之后崩潰,則Broker服務器會聯(lián)系同一生產(chǎn)者組的其他生產(chǎn)者實例以提交或回溯消費。

10 消費者組(Consumer Group)

同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現(xiàn)負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者實例必須訂閱完全相同的Topic。RocketMQ  支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。

11 集群消費(Clustering)

集群消費模式下,相同Consumer Group的每個Consumer實例平均分攤消息。

12 廣播消費(Broadcasting)

廣播消費模式下,相同Consumer Group的每個Consumer實例都接收全量的消息。

13 普通順序消息(Normal Ordered Message)

普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。

14 嚴格順序消息(Strictly Ordered Message)

嚴格順序消息模式下,消費者收到的所有消息均是有順序的。

15 消息(Message)

消息系統(tǒng)所傳輸信息的物理載體,生產(chǎn)和消費數(shù)據(jù)的最小單位,每條消息必須屬于一個主題。RocketMQ中每個消息擁有唯一的Message  ID,且可以攜帶具有業(yè)務標識的Key。系統(tǒng)提供了通過Message ID和Key查詢消息的功能。

16 標簽(Tag)

為消息設置的標志,用于同一主題下區(qū)分不同類型的消息。來自同一業(yè)務單元的消息,可以根據(jù)不同業(yè)務目的在同一主題下設置不同標簽。標簽能夠有效地保持代碼的清晰度和連貫性,并優(yōu)化RocketMQ提供的查詢系統(tǒng)。消費者可以根據(jù)Tag實現(xiàn)對不同子主題的不同消費邏輯,實現(xiàn)更好的擴展性。

ActiveMQ,Kafka,RocketMQ對比:

SpringBoot中怎么整合RocketMQ

RocketMQ服務

1 下載RocketMQ

SpringBoot中怎么整合RocketMQ

2 配置環(huán)境變量

SpringBoot中怎么整合RocketMQ

3 啟動Name Server

SpringBoot中怎么整合RocketMQ

4 啟動 Broker

SpringBoot中怎么整合RocketMQ

5 通過命令行發(fā)送 & 接收消息

設置環(huán)境變量:

C:\Users\MSI-NB>set NAMESRV_ADDR=localhost:9876

發(fā)送消息:

C:\Users\MSI-NB>tools org.apache.rocketmq.example.quickstart.Producer

SpringBoot中怎么整合RocketMQ

接收消息:

SpringBoot中怎么整合RocketMQ

SpringBoot整合RocketMQ入門

依賴:

<dependency>   <groupId>org.apache.rocketmq</groupId>   <artifactId>rocketmq-spring-boot-starter</artifactId>   <version>2.2.0</version> </dependency>

配置文件:

rocketmq:   nameServer: localhost:9876   producer:     group: demo-mq

生產(chǎn)者:

@Service public class ProducerService {          @Resource     private RocketMQTemplate rocketMQTemplate ;          public void send(String message) {         rocketMQTemplate.convertAndSend("test-topic", message);     }      }

消費者:

@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group") @Component public class ConsumerListener implements RocketMQListener<String> {      @Override     public void onMessage(String message) {         System.out.println("接收到消息:" + message) ;     }  }

這里的topic要和發(fā)送端設置的一致,consumerGroup可隨意。

發(fā)送接口:

@RestController @RequestMapping("/messages") public class MessageController {          @Resource     private ProducerService ps ;          @GetMapping("")     public Object send(String message) {         ps.send(message) ;         return "send success" ;     }      }

測試:

SpringBoot中怎么整合RocketMQ

SpringBoot中怎么整合RocketMQ

發(fā)送消息指定tags

發(fā)送時:

rocketMQTemplate.convertAndSend("test-topic:tag1", message);

接收時:

@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1")

selectorExpression:默認是 “*” ,這里指定與發(fā)送的一致;

這里看下源碼:

RocketMQUtil.java

SpringBoot中怎么整合RocketMQ

這里topic與tags是用冒號 ":" 分割的,tags就是取的數(shù)組的第二個。

關(guān)于SpringBoot中怎么整合RocketMQ就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI