溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

RocketMq事務(wù)消息發(fā)送代碼的過(guò)程詳解

發(fā)布時(shí)間:2020-07-17 17:08:10 來(lái)源:億速云 閱讀:225 作者:小豬 欄目:開(kāi)發(fā)技術(shù)

這篇文章主要講解了RocketMq事務(wù)消息發(fā)送代碼的過(guò)程詳解,內(nèi)容清晰明了,對(duì)此有興趣的小伙伴可以學(xué)習(xí)一下,相信大家閱讀完之后會(huì)有幫助。

一、RocketMq事務(wù)消息流程:

1、首先會(huì)向broker發(fā)送一個(gè)預(yù)請(qǐng)求消息,消費(fèi)者不可見(jiàn)

2、回調(diào)執(zhí)行本地事務(wù)(比如操作數(shù)據(jù)庫(kù))

3、事務(wù)執(zhí)行成功后,再次發(fā)送消息給broker,告訴broker事務(wù)執(zhí)行成功這個(gè)消息要提交,讓消費(fèi)者可見(jiàn)。如果本地事務(wù)執(zhí)行超時(shí),會(huì)返回一個(gè)unknow,broker會(huì)發(fā)送一個(gè)消息回查,檢查消息是否執(zhí)行成功。

二、RocketMq事務(wù)消息實(shí)例:

1、引入rocketMq相關(guān)的依賴(lài):

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.4.0</version>
</dependency>

2、創(chuàng)建一個(gè)TransactionProducer類(lèi):

public class TransactionProducer {

  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
    //創(chuàng)建生產(chǎn)者并制定組名
    TransactionMQProducer producer = new TransactionMQProducer("rocketMQ_transaction_producer_group");
    //2.指定Nameserver地址
    producer.setNamesrvAddr("192.168.***.***:9876");
    //3、指定消息監(jiān)聽(tīng)對(duì)象用于執(zhí)行本地事務(wù)和消息回查
    TransactionListener listener = new TransactionListenerImol();
    producer.setTransactionListener(listener);
    //4、線程池
    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
      @Override
      public Thread newThread(Runnable r) {
        Thread thread = newThread(r);
        thread.setName("client-tanscation-msg-check-thread");
        return thread;
      }
    });
    producer.setExecutorService(executorService);
    //5、啟動(dòng)producer
    producer.start();

    //6.創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體 String topic, String tags, String keys, byte[] body
    Message message = new Message("Topic_transaction_demo", //主題
        "Tags", //主要用于消息過(guò)濾
        "Key_1", //消息唯一值
        ("hello-transaction").getBytes(RemotingHelper.DEFAULT_CHARSET));

    //7、發(fā)送事務(wù)消息
    TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction");

    producer.shutdown();
  }
}

3、發(fā)送事務(wù)消息還需要一個(gè)事務(wù)監(jiān)聽(tīng)對(duì)象,它實(shí)現(xiàn)TransactionListener 接口,其中有兩個(gè)方法作用分別是執(zhí)行本地事務(wù)和消息回查:

public class TransactionListenerImol implements TransactionListener {
  //存儲(chǔ)事務(wù)狀態(tài)信息 key:事務(wù)id value:當(dāng)前事務(wù)執(zhí)行的狀態(tài)
  private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
  //執(zhí)行本地事務(wù)
  @Override
  public LocalTransactionState executeLocalTransaction(Message message, Object o) {
    //事務(wù)id
    String transactionId = message.getTransactionId();
    //0:執(zhí)行中,狀態(tài)未知 1:執(zhí)行成功 2:執(zhí)行失敗
    localTrans.put(transactionId, 0);
    //業(yè)務(wù)執(zhí)行,本地事務(wù),service
    System.out.println("hello-demo-transaction");
    try {
      System.out.println("正在執(zhí)行本地事務(wù)---");
      Thread.sleep(60000*2);
      System.out.println("本地事務(wù)執(zhí)行成功---");
      localTrans.put(transactionId, 1);
    } catch (InterruptedException e) {
      e.printStackTrace();
      localTrans.put(transactionId, 2);
      return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    return LocalTransactionState.COMMIT_MESSAGE;
  }

  //消息回查
  @Override
  public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
    //獲取對(duì)應(yīng)事務(wù)的狀態(tài)信息
    String transactionId = messageExt.getTransactionId();
    //獲取對(duì)應(yīng)事務(wù)id執(zhí)行狀態(tài)
    Integer status = localTrans.get(transactionId);
    //消息回查
    System.out.println("消息回查---transactionId:" + transactionId + "狀態(tài):" + status);
    switch (status) {
      case 0:
        return LocalTransactionState.UNKNOW;
      case 1:
        return LocalTransactionState.COMMIT_MESSAGE;
      case 2:
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    return LocalTransactionState.UNKNOW;
  }
}

看完上述內(nèi)容,是不是對(duì)RocketMq事務(wù)消息發(fā)送代碼的過(guò)程詳解有進(jìn)一步的了解,如果還想學(xué)習(xí)更多內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI