溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

SpringBoot怎么整合RocketMQ事務(wù)、廣播以及順序消息

發(fā)布時間:2021-07-09 16:45:04 來源:億速云 閱讀:169 作者:chen 欄目:web開發(fā)

本篇內(nèi)容介紹了“SpringBoot怎么整合RocketMQ事務(wù)、廣播以及順序消息”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

環(huán)境:springboot2.3.9RELEASE + RocketMQ4.8.0

依賴

<dependency>   <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency>     <groupId>org.apache.rocketmq</groupId>     <artifactId>rocketmq-spring-boot-starter</artifactId>     <version>2.2.0</version> </dependency>

配置文件

server:   port: 8080 --- rocketmq:   nameServer: localhost:9876   producer:     group: demo-mq

普通消息

發(fā)送

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String message) {   rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); }

接受

@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2") @Component public class ConsumerListener implements RocketMQListener<String> {      @Override     public void onMessage(String message) {         System.out.println("接收到消息:" + message) ;     }  }

順序消息

發(fā)送

@Resource private RocketMQTemplate rocketMQTemplate ;  public void sendOrder(String topic, String message, String tags, int id) {     rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),              "order-" + id, new SendCallback() {                 @Override                 public void onSuccess(SendResult sendResult) {                     System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ;                 }                 @Override                 public void onException(Throwable e) {                     e.printStackTrace() ;                 }             }); }

這里是根據(jù)hashkey將消息發(fā)送到不同的隊列中

@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group",      selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) @Component public class ConsumerOrderListener implements RocketMQListener<String> {      @Override     public void onMessage(String message) {         System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ;     }  }

consumeMode = ConsumeMode.ORDERLY,指明了消息模式為順序模式,一個隊列,一個線程。

結(jié)果

SpringBoot怎么整合RocketMQ事務(wù)、廣播以及順序消息

當(dāng)consumeMode = ConsumeMode.CONCURRENTLY執(zhí)行結(jié)果如下:

SpringBoot怎么整合RocketMQ事務(wù)、廣播以及順序消息

集群/廣播消息模式

發(fā)送端

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String topic, String message, String tags) {     rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; }

集群消息模式

消費端

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) @Component public class ConsumerBroadListener implements RocketMQListener<String> {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }

messageModel = MessageModel.CLUSTERING

測試

啟動兩個服務(wù)分別端口是8080,8081

8080服務(wù)

SpringBoot怎么整合RocketMQ事務(wù)、廣播以及順序消息

8081服務(wù)

SpringBoot怎么整合RocketMQ事務(wù)、廣播以及順序消息

集群消息模式下,每個服務(wù)分別接收一部分消息,實現(xiàn)了負(fù)載均衡

廣播消息模式

消費端

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) @Component public class ConsumerBroadListener implements RocketMQListener<String> {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }

messageModel = MessageModel.BROADCASTING

測試

啟動兩個服務(wù)分別端口是8080,8081

8080服務(wù)

SpringBoot怎么整合RocketMQ事務(wù)、廣播以及順序消息

8081服務(wù)

SpringBoot怎么整合RocketMQ事務(wù)、廣播以及順序消息

集群消息模式下,每個服務(wù)分別都接受了同樣的消息。

事務(wù)消息

RocketMQ事務(wù)的3個狀態(tài)

TransactionStatus.CommitTransaction:提交事務(wù)消息,消費者可以消費此消息

TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除,不允許被消費。

TransactionStatus.Unknown :中間狀態(tài),它代表需要檢查消息隊列來確定狀態(tài)。

RocketMQ實現(xiàn)事務(wù)消息主要分為兩個階段:正常事務(wù)的發(fā)送及提交、事務(wù)信息的補(bǔ)償流程 整體流程為:

正常事務(wù)發(fā)送與提交階段

1、生產(chǎn)者發(fā)送一個半消息給MQServer(半消息是指消費者暫時不能消費的消息)

2、服務(wù)端響應(yīng)消息寫入結(jié)果,半消息發(fā)送成功

3、開始執(zhí)行本地事務(wù)

4、根據(jù)本地事務(wù)的執(zhí)行狀態(tài)執(zhí)行Commit或者Rollback操作

事務(wù)信息的補(bǔ)償流程

1、如果MQServer長時間沒收到本地事務(wù)的執(zhí)行狀態(tài)會向生產(chǎn)者發(fā)起一個確認(rèn)回查的操作請求

2、生產(chǎn)者收到確認(rèn)回查請求后,檢查本地事務(wù)的執(zhí)行狀態(tài)

3、根據(jù)檢查后的結(jié)果執(zhí)行Commit或者Rollback操作

補(bǔ)償階段主要是用于解決生產(chǎn)者在發(fā)送Commit或者Rollback操作時發(fā)生超時或失敗的情況。

發(fā)送端

@Resource private RocketMQTemplate rocketMQTemplate ;      public void sendTx(String topic, Long id, String tags) {     rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(             new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))).             setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(),              UUID.randomUUID().toString().replaceAll("-", "")) ; }

生產(chǎn)者對應(yīng)的監(jiān)聽器

@RocketMQTransactionListener public class ProducerTxListener implements RocketMQLocalTransactionListener {          @Resource     private BusinessService bs ;      @Override     public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {         // 這里執(zhí)行本地的事務(wù)操作,比如保存數(shù)據(jù)。         try {             // 創(chuàng)建一個日志記錄表,將這唯一的ID存入數(shù)據(jù)庫中,在下面的check方法中可以根據(jù)這個id查詢是否有數(shù)據(jù)             String id = (String) msg.getHeaders().get("BID") ;             Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ;             System.out.println("消息內(nèi)容:" + users + "\t參與數(shù)據(jù):" + arg + "\t本次事務(wù)的唯一編號:" + id) ;             bs.save(users, new UsersLog(users.getId(), id)) ;         } catch (Exception e) {             e.printStackTrace() ;             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }      @Override     public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {         // 這里檢查本地事務(wù)是否執(zhí)行成功         String id = (String) msg.getHeaders().get("BID") ;         System.out.println("執(zhí)行查詢ID為:" + id + " 的數(shù)據(jù)是否存在") ;         UsersLog usersLog = bs.queryUsersLog(id) ;         if (usersLog == null) {             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }  }

消費端

@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10") @Component public class ConsumerTxListener implements RocketMQListener<Users> {      @Override     public void onMessage(Users users) {         System.out.println("TX接收到消息:" + users) ;     }  }

Service

@Transactional public boolean save(Users users, UsersLog usersLog) {     usersRepository.save(users) ;     usersLogRepository.save(usersLog) ;     if (users.getId() == 1) {         throw new RuntimeException("數(shù)據(jù)錯誤") ;     }     return true ; }      public UsersLog queryUsersLog(String bid) {     return usersLogRepository.findByBid(bid) ; }

Controller

@GetMapping("/tx/{id}") public Object sendTx(@PathVariable("id")Long id) {     ps.sendTx("tx-topic", id, "tag10") ;     return "send transaction success" ; }

測試

調(diào)用接口后,控制臺輸出:

SpringBoot怎么整合RocketMQ事務(wù)、廣播以及順序消息

從打印日志看出來都保存完畢了后 消費端才接受到消息。

SpringBoot怎么整合RocketMQ事務(wù)、廣播以及順序消息

SpringBoot怎么整合RocketMQ事務(wù)、廣播以及順序消息

刪除數(shù)據(jù),再測試ID為1會報錯的。

SpringBoot怎么整合RocketMQ事務(wù)、廣播以及順序消息

數(shù)據(jù)庫中沒有數(shù)據(jù)。。。

是不是也不是很復(fù)雜,2個階段來處理。

完畢!!!

“SpringBoot怎么整合RocketMQ事務(wù)、廣播以及順序消息”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI