您好,登錄后才能下訂單哦!
這篇文章主要講解了RocketMq事務(wù)消息發(fā)送代碼的過(guò)程詳解,內(nèi)容清晰明了,對(duì)此有興趣的小伙伴可以學(xué)習(xí)一下,相信大家閱讀完之后會(huì)有幫助。
一、RocketMq事務(wù)消息流程:
1、首先會(huì)向broker發(fā)送一個(gè)預(yù)請(qǐng)求消息,消費(fèi)者不可見(jiàn)
2、回調(diào)執(zhí)行本地事務(wù)(比如操作數(shù)據(jù)庫(kù))
3、事務(wù)執(zhí)行成功后,再次發(fā)送消息給broker,告訴broker事務(wù)執(zhí)行成功這個(gè)消息要提交,讓消費(fèi)者可見(jiàn)。如果本地事務(wù)執(zhí)行超時(shí),會(huì)返回一個(gè)unknow,broker會(huì)發(fā)送一個(gè)消息回查,檢查消息是否執(zhí)行成功。
二、RocketMq事務(wù)消息實(shí)例:
1、引入rocketMq相關(guān)的依賴(lài):
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>
2、創(chuàng)建一個(gè)TransactionProducer類(lèi):
public class TransactionProducer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException { //創(chuàng)建生產(chǎn)者并制定組名 TransactionMQProducer producer = new TransactionMQProducer("rocketMQ_transaction_producer_group"); //2.指定Nameserver地址 producer.setNamesrvAddr("192.168.***.***:9876"); //3、指定消息監(jiān)聽(tīng)對(duì)象用于執(zhí)行本地事務(wù)和消息回查 TransactionListener listener = new TransactionListenerImol(); producer.setTransactionListener(listener); //4、線程池 ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = newThread(r); thread.setName("client-tanscation-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); //5、啟動(dòng)producer producer.start(); //6.創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體 String topic, String tags, String keys, byte[] body Message message = new Message("Topic_transaction_demo", //主題 "Tags", //主要用于消息過(guò)濾 "Key_1", //消息唯一值 ("hello-transaction").getBytes(RemotingHelper.DEFAULT_CHARSET)); //7、發(fā)送事務(wù)消息 TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction"); producer.shutdown(); } }
3、發(fā)送事務(wù)消息還需要一個(gè)事務(wù)監(jiān)聽(tīng)對(duì)象,它實(shí)現(xiàn)TransactionListener 接口,其中有兩個(gè)方法作用分別是執(zhí)行本地事務(wù)和消息回查:
public class TransactionListenerImol implements TransactionListener { //存儲(chǔ)事務(wù)狀態(tài)信息 key:事務(wù)id value:當(dāng)前事務(wù)執(zhí)行的狀態(tài) private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); //執(zhí)行本地事務(wù) @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { //事務(wù)id String transactionId = message.getTransactionId(); //0:執(zhí)行中,狀態(tài)未知 1:執(zhí)行成功 2:執(zhí)行失敗 localTrans.put(transactionId, 0); //業(yè)務(wù)執(zhí)行,本地事務(wù),service System.out.println("hello-demo-transaction"); try { System.out.println("正在執(zhí)行本地事務(wù)---"); Thread.sleep(60000*2); System.out.println("本地事務(wù)執(zhí)行成功---"); localTrans.put(transactionId, 1); } catch (InterruptedException e) { e.printStackTrace(); localTrans.put(transactionId, 2); return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; } //消息回查 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //獲取對(duì)應(yīng)事務(wù)的狀態(tài)信息 String transactionId = messageExt.getTransactionId(); //獲取對(duì)應(yīng)事務(wù)id執(zhí)行狀態(tài) Integer status = localTrans.get(transactionId); //消息回查 System.out.println("消息回查---transactionId:" + transactionId + "狀態(tài):" + status); switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.UNKNOW; } }
看完上述內(nèi)容,是不是對(duì)RocketMq事務(wù)消息發(fā)送代碼的過(guò)程詳解有進(jìn)一步的了解,如果還想學(xué)習(xí)更多內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。