溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ主從同步的實例分析以及HA機制原理

發(fā)布時間:2021-09-04 11:24:09 來源:億速云 閱讀:123 作者:chen 欄目:大數(shù)據(jù)

這篇文章主要介紹“RocketMQ主從同步的實例分析以及HA機制原理”,在日常操作中,相信很多人在RocketMQ主從同步的實例分析以及HA機制原理問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”RocketMQ主從同步的實例分析以及HA機制原理”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!


HA 核心類

HA 的實現(xiàn)邏輯放在了 store 存儲模塊的ha目錄中,其核心實現(xiàn)類如下:

  1. HAService:主從同步的核心實現(xiàn)類

  2. HAService$AcceptSocketService:主服務器監(jiān)聽從服務器連接實現(xiàn)類

  3. HAService$GroupTransferService:主從同步通知類,實現(xiàn)同步復制和異步復制的功能

  4. HAService$HAClient:從服務器連接主服務實現(xiàn)類

  5. HAConnection:主服務端 HA 連接對象的封裝,當主服務器接收到從服務器發(fā)過來的消息后,會封裝成一個 HAConnection 對象,其中里面又封裝了讀 Socket 連接實現(xiàn)與 寫 Socket 連接實現(xiàn):

  • HAConnection$ReadSocketService:主服務器讀實現(xiàn)類

  • HAConnection$WriteSocketService:主服務器寫實現(xiàn)類

RocketMQ 主從同步的整體工作機制大致是:

  1. 從服務器主動建立 TCP 連接主服務器,然后每隔 5s 向主服務器發(fā)送 commitLog 文件最大偏移量拉取還未同步的消息;

  2. 主服務器開啟監(jiān)聽端口,監(jiān)聽從服務器發(fā)送過來的信息,主服務器收到從服務器發(fā)過來的偏移量進行解析,并返回查找出未同步的消息給從服務器;

  3. 客戶端收到主服務器的消息后,將這批消息寫入 commitLog 文件中,然后更新 commitLog 拉取偏移量,接著繼續(xù)向主服務拉取未同步的消息。

Slave -> Master 過程

從 HA 實現(xiàn)邏輯可看出,可大致分為兩個過程,分別是從服務器上報偏移量,以及主服務器發(fā)送未同步消息到從服務器。

從上面的實現(xiàn)類可知,從服務器向主服務器上報偏移量的邏輯在 HAClient 類中,HAClient 類是一個繼承了 ServiceThread 類,即它是一個線程服務類,在 Broker 啟動后,Broker 啟動開一條線程定時執(zhí)行從服務器上報偏移量到主服務器的任務。

org.apache.rocketmq.store.ha.HAService.HAClient#run:

public void run() {
  log.info(this.getServiceName() + " service started");

  while (!this.isStopped()) {
    try {
      // 主動連接主服務器,獲取socketChannel對象
      if (this.connectMaster()) {
        if (this.isTimeToReportOffset()) {
          // 執(zhí)行上報偏移量到主服務器
          boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
          if (!result) {
            this.closeMaster();
          }
        }
				// 每隔一秒鐘輪詢一遍
        this.selector.select(1000);

        // 處理主服務器發(fā)送過來的消息
        boolean ok = this.processReadEvent();
        if (!ok) {
          this.closeMaster();
        }
        
        // ......
        
      } else {
        this.waitForRunning(1000 * 5);
      }
    } catch (Exception e) {
      log.warn(this.getServiceName() + " service has exception. ", e);
      this.waitForRunning(1000 * 5);
    }
  }

  log.info(this.getServiceName() + " service end");
}

以上是 HAClient 線程 run 方法邏輯,主要是做了主動連接主服務器,并上報偏移量到主服務器,以及處理主服務器發(fā)送過來的消息,并不斷循環(huán)執(zhí)行以上邏輯。

org.apache.rocketmq.store.ha.HAService.HAClient#connectMaster:

private boolean connectMaster() throws ClosedChannelException {
  if (null == socketChannel) {
    String addr = this.masterAddress.get();
    if (addr != null) {
      SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
      if (socketAddress != null) {
        this.socketChannel = RemotingUtil.connect(socketAddress);
        if (this.socketChannel != null) {
          this.socketChannel.register(this.selector, SelectionKey.OP_READ);
        }
      }
    }
    this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
    this.lastWriteTimestamp = System.currentTimeMillis();
  }
  return this.socketChannel != null;
}

該方法是從服務器連接主服務器的邏輯,拿到主服務器地址并且連接上以后,會獲取一個 socketChannel 對象,接著還會記錄當前時間戳為上次寫入的時間戳,lastWriteTimestamp 的作用時用來計算主從同步時間間隔,這里需要注意一點,如果沒有配置主服務器地址,該方法會返回 false,即不會執(zhí)行主從復制。

該方法還會調(diào)用 DefaultMessageStore 的 getMaxPhyOffset() 方法獲取 commitLog 文件最大偏移量,作為本次上報的偏移量。

org.apache.rocketmq.store.ha.HAService.HAClient#reportSlaveMaxOffset:

private boolean reportSlaveMaxOffset(final long maxOffset) {
  this.reportOffset.position(0);
  this.reportOffset.limit(8);
  this.reportOffset.putLong(maxOffset);
  this.reportOffset.position(0);
  this.reportOffset.limit(8);

  for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
    try {
      this.socketChannel.write(this.reportOffset);
    } catch (IOException e) {
      log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
      return false;
    }
  }
  return !this.reportOffset.hasRemaining();
}

該方法向主服務器上報已拉取偏移量,具體做法是將 ByteBuffer 讀取位置 position 值為 0,其實跳用 flip() 方法也可以,然后調(diào)用 putLong() 方法將 maxOffset 寫入 ByteBuffer,將 limit 設置為 8,跟寫入 ByteBuffer 中的 maxOffset(long 型)大小一樣,最后采取 for 循環(huán)將 maxOffset 寫入網(wǎng)絡通道中,并調(diào)用 hasRemaining() 方法,該方法的邏輯為判斷 position 是否小于 limit,即判斷 ByteBuffer 中的字節(jié)流是否全部寫入到通道中。

Master -> Slave 過程

org.apache.rocketmq.store.ha.HAService.AcceptSocketService#run:

public void run() {
  log.info(this.getServiceName() + " service started");

  while (!this.isStopped()) {
    try {
      this.selector.select(1000);
      Set<SelectionKey> selected = this.selector.selectedKeys();

      if (selected != null) {
        for (SelectionKey k : selected) {
          if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
            SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

            if (sc != null) {
              HAService.log.info("HAService receive new connection, "
                                 + sc.socket().getRemoteSocketAddress());

              try {
                HAConnection conn = new HAConnection(HAService.this, sc);
                conn.start();
                HAService.this.addConnection(conn);
              } catch (Exception e) {
                log.error("new HAConnection exception", e);
                sc.close();
              }
            }
          } else {
            log.warn("Unexpected ops in select " + k.readyOps());
          }
        }

        selected.clear();
      }
    } catch (Exception e) {
      log.error(this.getServiceName() + " service has exception.", e);
    }
  }

  log.info(this.getServiceName() + " service end");
}

主服務器收到從服務器的拉取偏移量后,會封裝成一個 HAConnection 對象,前面也說過 HAConnection 封裝主服務端 HA 連接對象的封裝,其中有讀實現(xiàn)類和寫實現(xiàn)類,start() 方法即開啟了讀寫線程:

org.apache.rocketmq.store.ha.HAConnection#start:

public void start() {
  this.readSocketService.start();
  this.writeSocketService.start();
}

org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#processReadEvent:

private boolean processReadEvent() {
  int readSizeZeroTimes = 0;

  if (!this.byteBufferRead.hasRemaining()) {
    this.byteBufferRead.flip();
    this.processPostion = 0;
  }

  while (this.byteBufferRead.hasRemaining()) {
    try {
      int readSize = this.socketChannel.read(this.byteBufferRead);
      if (readSize > 0) {
        readSizeZeroTimes = 0;
        this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
        if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
          int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
          // 從網(wǎng)絡通道中讀取從服務器上報的偏移量
          long readOffset = this.byteBufferRead.getLong(pos - 8);
          this.processPostion = pos;

          // 同步從服務器偏移量
          HAConnection.this.slaveAckOffset = readOffset;
          if (HAConnection.this.slaveRequestOffset < 0) {
            HAConnection.this.slaveRequestOffset = readOffset;
            log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
          }

          // 這里主要是同步后需要喚醒相關(guān)消息發(fā)送線程,實現(xiàn)主從同步是異步還是同步的功能
          HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
        }
      } else if (readSize == 0) {
        if (++readSizeZeroTimes >= 3) {
          break;
        }
      } else {
        log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
        return false;
      }
    } catch (IOException e) {
      log.error("processReadEvent exception", e);
      return false;
    }
  }

  return true;
}

從以上源碼可看出,主服務器接收到從服務器上報的偏移量后,主要作了兩件事:

  1. 獲取從服務器上報的偏移量;

  2. 喚醒主從同步消費者發(fā)送消息同步返回的線程,該方法實現(xiàn)了主從同步-同步復制的功能。

org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run:

public void run() {
  HAConnection.log.info(this.getServiceName() + " service started");

  while (!this.isStopped()) {
    try {
      this.selector.select(1000);

      // 如果slaveRequestOffset=-1,說明讀線程還沒有獲取從服務器的偏移量,繼續(xù)循環(huán)等待
      if (-1 == HAConnection.this.slaveRequestOffset) {
        Thread.sleep(10);
        continue;
      }

      // 如果nextTransferFromWhere=-1,說明線程剛開始執(zhí)行數(shù)據(jù)傳輸
      if (-1 == this.nextTransferFromWhere) {
        // 如果slaveRequestOffset=0,說明從服務器是第一次上報偏移量
        if (0 == HAConnection.this.slaveRequestOffset) {
          // 獲取最后一個 commitLog 文件且還未讀取消費的偏移量
          long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
          // 求出最后一個commitLog偏移量的初始偏移量
          masterOffset =
            masterOffset
            - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
               .getMapedFileSizeCommitLog());

          if (masterOffset < 0) {
            masterOffset = 0;
          }

          // 更新 nextTransferFromWhere
          this.nextTransferFromWhere = masterOffset;
        } else {
          // 如果slaveRequestOffset!=0,則將該值賦值給nextTransferFromWhere
          this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
        }

        log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                 + "], and slave request " + HAConnection.this.slaveRequestOffset);
      }

      // 判斷上次寫事件是否已全部寫完成
      if (this.lastWriteOver) {

        // 計算是否已到發(fā)送心跳包時間
        long interval =
          HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
        // 發(fā)送心跳包,以保持長連接
        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
            .getHaSendHeartbeatInterval()) {
          // Build Header
          this.byteBufferHeader.position(0);
          this.byteBufferHeader.limit(headerSize);
          this.byteBufferHeader.putLong(this.nextTransferFromWhere);
          this.byteBufferHeader.putInt(0);
          this.byteBufferHeader.flip();
          this.lastWriteOver = this.transferData();
          if (!this.lastWriteOver)
            continue;
        }
      } else {
        this.lastWriteOver = this.transferData();
        if (!this.lastWriteOver)
          continue;
      }

      // 獲取同步消息數(shù)據(jù)
      SelectMappedBufferResult selectResult =      HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
      if (selectResult != null) {
        int size = selectResult.getSize();
        if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
          size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
        }

        long thisOffset = this.nextTransferFromWhere;
        this.nextTransferFromWhere += size;

        selectResult.getByteBuffer().limit(size);
        this.selectMappedBufferResult = selectResult;

        // Build Header
        this.byteBufferHeader.position(0);
        this.byteBufferHeader.limit(headerSize);
        this.byteBufferHeader.putLong(thisOffset);
        this.byteBufferHeader.putInt(size);
        this.byteBufferHeader.flip();

        // 傳輸消息到從服務器
        this.lastWriteOver = this.transferData();
      } else {

        HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
      }
    } catch (Exception e) {

      HAConnection.log.error(this.getServiceName() + " service has exception.", e);
      break;
    }
  }

  if (this.selectMappedBufferResult != null) {
    this.selectMappedBufferResult.release();
  }

  this.makeStop();

  readSocketService.makeStop();

  haService.removeConnection(HAConnection.this);

  SelectionKey sk = this.socketChannel.keyFor(this.selector);
  if (sk != null) {
    sk.cancel();
  }

  try {
    this.selector.close();
    this.socketChannel.close();
  } catch (IOException e) {
    HAConnection.log.error("", e);
  }

  HAConnection.log.info(this.getServiceName() + " service end");
}

讀實現(xiàn)類實現(xiàn)邏輯比較長,但主要做了以下幾件事情:

  1. 計算需要拉取的偏移量,如果從服務器第一次拉取,則從最后一個 commitLog 文件的初始偏移量開始同步;

  2. 傳輸消息到從服務器;

  3. 發(fā)送心跳包到從服務器,保持長連接。

關(guān)于第一步,我還需要詳細講解一下,因為之前有想到一個問題:

把 brokerA 的從服務器去掉,再啟動一臺新的從服務器指向brokerA 主服務器,這時的主服務器的消息是否會全量同步到從服務?

org.apache.rocketmq.store.MappedFileQueue#getMaxOffset:

public long getMaxOffset() {
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
    }
    return 0;
}

org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run:

// 求出最后一個commitLog偏移量的初始偏移量
masterOffset =
  masterOffset
            - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
               .getMapedFileSizeCommitLog());

從以上邏輯可找到答案,如果有新的從服務器同步主服務器消息,則從最后一個 commitLog 文件的初始偏移量開始同步。

回到最開始開啟 HAClient 線程上報偏移量的方法,我們發(fā)現(xiàn)里面還做了一件事:

// 處理主服務器發(fā)送過來的消息
boolean ok = this.processReadEvent();

org.apache.rocketmq.store.ha.HAService.HAClient#processReadEvent:

private boolean processReadEvent() {
  int readSizeZeroTimes = 0;
  while (this.byteBufferRead.hasRemaining()) {
    try {
      int readSize = this.socketChannel.read(this.byteBufferRead);
      if (readSize > 0) {
        lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
        readSizeZeroTimes = 0;
        // 讀取消息并寫入commitLog文件中
        boolean result = this.dispatchReadRequest();
        if (!result) {
          log.error("HAClient, dispatchReadRequest error");
          return false;
        }
      } else if (readSize == 0) {
        if (++readSizeZeroTimes >= 3) {
          break;
        }
      } else {
        // TODO ERROR
        log.info("HAClient, processReadEvent read socket < 0");
        return false;
      }
    } catch (IOException e) {
      log.info("HAClient, processReadEvent read socket exception", e);
      return false;
    }
  }

  return true;
}

該方法用于處理主服務器發(fā)送回來的消息數(shù)據(jù),這里用了 while 循環(huán)的處理,不斷地從 byteBuffer 讀取數(shù)據(jù)到緩沖區(qū)中,最后調(diào)用 dispatchReadRequest 方法將消息數(shù)據(jù)寫入 commitLog 文件中,完成主從復制最后一個步驟。

到此,關(guān)于“RocketMQ主從同步的實例分析以及HA機制原理”的學習就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI