溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Nacos客戶端配置中心緩存動態(tài)更新的示例分析

發(fā)布時間:2022-03-31 14:45:19 來源:億速云 閱讀:253 作者:小新 欄目:開發(fā)技術(shù)

這篇文章將為大家詳細講解有關(guān)Nacos客戶端配置中心緩存動態(tài)更新的示例分析,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。


      Nacos 作為配置中心,當(dāng)應(yīng)用程序去訪問Nacos動態(tài)獲取配置源之后,會緩存到本地內(nèi)存以及磁盤中。
      由于Nacos作為動態(tài)配置中心,意味著后續(xù)配置變更之后需要讓所有相關(guān)的客戶端感知,并更新本地內(nèi)存!

      客戶端配置緩存更新

      當(dāng)客戶端拿到配置后,需要動態(tài)刷新,從而保證數(shù)據(jù)和服務(wù)器端是一致的,這個過程是如何實現(xiàn)的呢?在這一小節(jié)中我們來做一個詳細分析。

      Nacos采用長輪訓(xùn)機制來實現(xiàn)數(shù)據(jù)變更的同步,原理如下!

      Nacos客戶端配置中心緩存動態(tài)更新的示例分析

      整體工作流程如下:

      • 客戶端發(fā)起長輪訓(xùn)請求

      • 服務(wù)端收到請求以后,先比較服務(wù)端緩存中的數(shù)據(jù)是否相同,如果不通,則直接返回

      • 如果相同,則通過schedule延遲29.5s之后再執(zhí)行比較

      • 為了保證當(dāng)服務(wù)端在29.5s之內(nèi)發(fā)生數(shù)據(jù)變化能夠及時通知給客戶端,服務(wù)端采用事件訂閱的方式來監(jiān)聽服務(wù)端本地數(shù)據(jù)變化的事件,一旦收到事件,則觸發(fā)DataChangeTask的通知,并且遍歷allStubs隊列中的ClientLongPolling,把結(jié)果寫回到客戶端,就完成了一次數(shù)據(jù)的推送

      • 如果 DataChangeTask 任務(wù)完成了數(shù)據(jù)的 “推送” 之后,ClientLongPolling 中的調(diào)度任務(wù)又開始執(zhí)行了怎么辦呢?
        很簡單,只要在進行 “推送” 操作之前,先將原來等待執(zhí)行的調(diào)度任務(wù)取消掉就可以了,這樣就防止了推送操作寫完響應(yīng)數(shù)據(jù)之后,調(diào)度任務(wù)又去寫響應(yīng)數(shù)據(jù),這時肯定會報錯的。所以,在ClientLongPolling方法中,最開始的一個步驟就是刪除訂閱事件

      長輪訓(xùn)任務(wù)啟動入口

      在NacosConfigService的構(gòu)造方法中,當(dāng)這個類被實例化以后,有做一些事情

      • 初始化一個HttpAgent,這里又用到了裝飾起模式,實際工作的類是ServerHttpAgent, MetricsHttpAgent內(nèi)部也是調(diào)用了ServerHttpAgent的方法,增加了監(jiān)控統(tǒng)計的信息

      • ClientWorker, 客戶端的一個工作類,agent作為參數(shù)傳入到clientworker,可以基本猜測到里面會用到agent做一些遠程相關(guān)的事情

      public NacosConfigService(Properties properties) throws NacosException {
          ValidatorUtils.checkInitParam(properties);
          String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
          if (StringUtils.isBlank(encodeTmp)) {
              this.encode = Constants.ENCODE;
          } else {
              this.encode = encodeTmp.trim();
          }
          initNamespace(properties); //
          this.configFilterChainManager = new ConfigFilterChainManager(properties);
          //初始化網(wǎng)絡(luò)通信組件
          this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
          this.agent.start(); 
          //初始化ClientWorker
          this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
      }

      ClientWorker

      在上述初始化代碼中,我們重點需要關(guān)注ClientWorker這個類,它的構(gòu)造方法如下

      public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
              final Properties properties) {
          this.agent = agent;
          this.configFilterChainManager = configFilterChainManager; //初始化配置過濾管理器
          // Initialize the timeout parameter
          init(properties); //初始化配置
          //初始化一個定時調(diào)度的線程池,重寫了threadfactory方法
          this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
              @Override
              public Thread newThread(Runnable r) {
                  Thread t = new Thread(r);
                  t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                  t.setDaemon(true);
                  return t;
              }
          });
           //初始化一個定時調(diào)度的線程池,從里面的name名字來看,似乎和長輪訓(xùn)有關(guān)系。而這個長輪訓(xùn)應(yīng)該是和nacos服務(wù)端的長輪訓(xùn)
          this.executorService = Executors
                  .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                      @Override
                      public Thread newThread(Runnable r) {
                          Thread t = new Thread(r);
                          t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                          t.setDaemon(true);
                          return t;
                      }
                  });
          //設(shè)置定時任務(wù)的執(zhí)行頻率,并且調(diào)用checkConfigInfo這個方法,猜測是定時去檢測配置是否發(fā)生了變化
              //首次執(zhí)行延遲時間為1毫秒、延遲時間為10毫秒
          this.executor.scheduleWithFixedDelay(new Runnable() {
              @Override
              public void run() {
                  try {
                      checkConfigInfo();
                  } catch (Throwable e) {
                      LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                  }
              }
          }, 1L, 10L, TimeUnit.MILLISECONDS);
      }

      可以看到 ClientWorker 除了將 HttpAgent 維持在自己內(nèi)部,還創(chuàng)建了兩個線程池:

      第一個線程池是只擁有一個線程用來執(zhí)行定時任務(wù)的 executor,executor 每隔 10ms 就會執(zhí)行一次 checkConfigInfo() 方法,從方法名上可以知道是每 10 ms 檢查一次配置信息。

      第二個線程池是一個普通的線程池,從 ThreadFactory 的名稱可以看到這個線程池是做長輪詢的。

      checkConfigInfo

      ClientWorker構(gòu)造初始化中,啟動了一個定時任務(wù)去執(zhí)行checkConfigInfo()方法,這個方法主要是定時檢查本地配置和服務(wù)器上的配置的變更情況,這個方法定義如下.

      public void checkConfigInfo() {
          // Dispatch tasks.
          int listenerSize = cacheMap.size(); //
          // Round up the longingTaskCount.
           // 向上取整為批數(shù),監(jiān)聽的配置數(shù)量除以3000,得到一個整數(shù),代表長輪訓(xùn)任務(wù)的數(shù)量
          int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
           //currentLongingTaskCount表示當(dāng)前的長輪訓(xùn)任務(wù)數(shù)量,如果小于計算的結(jié)果,則可以繼續(xù)創(chuàng)建
          if (longingTaskCount > currentLongingTaskCount) {
              for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                  // The task list is no order.So it maybe has issues when changing.
                  executorService.execute(new LongPollingRunnable(i));
              }
              currentLongingTaskCount = longingTaskCount;
          }
      }

      這個方法主要的目的是用來檢查服務(wù)端的配置信息是否發(fā)生了變化。如果有變化,則觸發(fā)listener通知

      cacheMap: AtomicReference<Map<String, CacheData>> cacheMap 用來存儲監(jiān)聽變更的緩存集合。key是根據(jù)dataID/group/tenant(租戶) 拼接的值。Value是對應(yīng)存儲在nacos服務(wù)器上的配置文件的內(nèi)容。

      默認情況下,每個長輪訓(xùn)LongPullingRunnable任務(wù)默認處理3000個監(jiān)聽配置集。如果超過3000, 則需要啟動多個LongPollingRunnable去執(zhí)行。

      currentLongingTaskCount保存已啟動的LongPullingRunnable任務(wù)數(shù)

      executorService就是在ClientWorker構(gòu)造方法中初始化的線程池

      LongPollingRunnable.run

      LongPollingRunnable長輪訓(xùn)任務(wù)的實現(xiàn)邏輯,代碼比較長,我們分段來分析。

      第一部分主要有兩個邏輯

      • 對任務(wù)按照批次分類

      • 檢查當(dāng)前批次的緩存和本地文件的數(shù)據(jù)是否一致,如果發(fā)生了變化,則觸發(fā)監(jiān)聽。

      class LongPollingRunnable implements Runnable {
          private final int taskId; //表示當(dāng)前任務(wù)批次id
          public LongPollingRunnable(int taskId) {
              this.taskId = taskId;
          }
          @Override
          public void run() {
              List<CacheData> cacheDatas = new ArrayList<CacheData>();
              List<String> inInitializingCacheList = new ArrayList<String>();
              try {
                  // 遍歷CacheMap,把CacheMap中和當(dāng)前任務(wù)id相同的緩存,保存到cacheDatas
                  // 通過checkLocalConfig方法
                  for (CacheData cacheData : cacheMap.values()) {
                      if (cacheData.getTaskId() == taskId) {
                          cacheDatas.add(cacheData);
                          try {
                              checkLocalConfig(cacheData);
                              if (cacheData.isUseLocalConfigInfo()) { //這里表示數(shù)據(jù)有變化,需要通知監(jiān)聽器
                                  cacheData.checkListenerMd5(); //通知所有針對當(dāng)前配置設(shè)置了監(jiān)聽的監(jiān)聽器
                              }
                          } catch (Exception e) {
                              LOGGER.error("get local config info error", e);
                          }
                      }
                  }
                 //省略部分
              } catch (Throwable e) {
                  // If the rotation training task is abnormal, the next execution time of the task will be punished
                  LOGGER.error("longPolling error : ", e);
                  executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); //出現(xiàn)異常,到下一次taskPenaltyTime后重新執(zhí)行任務(wù)
              }
          }
      }

      checkLocalConfig

      檢查本地配置,這里面有三種情況

      • 如果isUseLocalConfigInfo為false,表示不使用本地配置,但是本地緩存路徑的文件是存在的,于是把isUseLocalConfigInfo設(shè)置為true,并且更新cacheData的內(nèi)容以及文件的更新時間

      • 如果isUseLocalConfigInfo為true,表示使用本地配置文件,但是本地緩存文件不存在,則設(shè)置為false,不通知監(jiān)聽器。

      • 如果isUseLocalConfigInfo為true,并且本地緩存文件也存在,但是緩存的的時間和文件的更新時間不一致,則更新cacheData中的內(nèi)容,并且isUseLocalConfigInfo設(shè)置為true。

      private void checkLocalConfig(CacheData cacheData) {
          final String dataId = cacheData.dataId;
          final String group = cacheData.group;
          final String tenant = cacheData.tenant;
          File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);
          // 沒有 -> 有
          if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
              String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
              final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
              cacheData.setUseLocalConfigInfo(true);
              cacheData.setLocalConfigInfoVersion(path.lastModified());
              cacheData.setContent(content);
              String encryptedDataKey = LocalEncryptedDataKeyProcessor
                      .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
              cacheData.setEncryptedDataKey(encryptedDataKey);
              
              LOGGER.warn(
                      "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
                      agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
              return;
          }
           // 有 -> 沒有。不通知業(yè)務(wù)監(jiān)聽器,從server拿到配置后通知。
          // If use local config info, then it doesn't notify business listener and notify after getting from server.
          if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
              cacheData.setUseLocalConfigInfo(false);
              LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),
                      dataId, group, tenant);
              return;
          }
           // 有變更
          if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path
                  .lastModified()) {
              String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
              final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
              cacheData.setUseLocalConfigInfo(true);
              cacheData.setLocalConfigInfoVersion(path.lastModified());
              cacheData.setContent(content);
              String encryptedDataKey = LocalEncryptedDataKeyProcessor
                      .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
              cacheData.setEncryptedDataKey(encryptedDataKey);
              LOGGER.warn(
                      "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
                      agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
          }
      }

      checkListenerMd5

      遍歷用戶自己添加的監(jiān)聽器,如果發(fā)現(xiàn)數(shù)據(jù)的md5值不同,則發(fā)送通知

      void checkListenerMd5() {
          for (ManagerListenerWrap wrap : listeners) {
              if (!md5.equals(wrap.lastCallMd5)) {
                  safeNotifyListener(dataId, group, content, type, md5, wrap);
              }
          }
      }

      檢查服務(wù)端配置

      在LongPollingRunnable.run中,先通過本地配置的讀取和檢查來判斷數(shù)據(jù)是否發(fā)生變化從而實現(xiàn)變化的通知

      接著,當(dāng)前的線程還需要去遠程服務(wù)器上獲得最新的數(shù)據(jù),檢查哪些數(shù)據(jù)發(fā)生了變化

      • 通過checkUpdateDataIds獲取遠程服務(wù)器上數(shù)據(jù)變更的dataid

      • 遍歷這些變化的集合,然后調(diào)用getServerConfig從遠程服務(wù)器獲得對應(yīng)的內(nèi)容

      • 更新本地的cache,設(shè)置為服務(wù)器端返回的內(nèi)容

      • 最后遍歷cacheDatas,找到變化的數(shù)據(jù)進行通知

      // check server config
      //從服務(wù)端獲取發(fā)生變化的數(shù)據(jù)的DataID列表,保存在List<String>集合中
      List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
      if (!CollectionUtils.isEmpty(changedGroupKeys)) {
          LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
      }
      //遍歷發(fā)生了變更的配置項
      for (String groupKey : changedGroupKeys) {
          String[] key = GroupKey.parseKey(groupKey);
          String dataId = key[0];
          String group = key[1];
          String tenant = null;
          if (key.length == 3) {
              tenant = key[2];
          }
          try {
              //逐項根據(jù)這些配置項獲取配置信息
              ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
              //把配置信息保存到CacheData中
              CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
              cache.setContent(response.getContent());
              cache.setEncryptedDataKey(response.getEncryptedDataKey());
              if (null != response.getConfigType()) {
                  cache.setType(response.getConfigType());
              }
              LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                          agent.getName(), dataId, group, tenant, cache.getMd5(),
                          ContentUtils.truncateContent(response.getContent()), response.getConfigType());
          } catch (NacosException ioe) {
              String message = String
                  .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                          agent.getName(), dataId, group, tenant);
              LOGGER.error(message, ioe);
          }
      }
      //再遍歷CacheData這個集合,找到發(fā)生變化的數(shù)據(jù)進行通知
      for (CacheData cacheData : cacheDatas) {
          if (!cacheData.isInitializing() || inInitializingCacheList
              .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
              cacheData.checkListenerMd5();
              cacheData.setInitializing(false);
          }
      }
      inInitializingCacheList.clear();
       //繼續(xù)傳遞當(dāng)前線程進行輪詢
      executorService.execute(this);

      checkUpdateDataIds

      這個方法主要是向服務(wù)器端發(fā)起檢查請求,判斷自己本地的配置和服務(wù)端的配置是否一致。

      • 首先從cacheDatas集合中找到isUseLocalConfigInfo為false的緩存

      • 把需要檢查的配置項,拼接成一個字符串,調(diào)用checkUpdateConfigStr進行驗證

      /**
       * 從Server獲取值變化了的DataID列表。返回的對象里只有dataId和group是有效的。 保證不返回NULL。
       */
      List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {
          StringBuilder sb = new StringBuilder();
          for (CacheData cacheData : cacheDatas) { //把需要檢查的配置項,拼接成一個字符串
              if (!cacheData.isUseLocalConfigInfo()) { //找到isUseLocalConfigInfo=false的緩存
                  sb.append(cacheData.dataId).append(WORD_SEPARATOR);
                  sb.append(cacheData.group).append(WORD_SEPARATOR);
                  if (StringUtils.isBlank(cacheData.tenant)) {
                      sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
                  } else {
                      sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                      sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
                  }
                  if (cacheData.isInitializing()) {//
                      // cacheData 首次出現(xiàn)在cacheMap中&首次check更新
                      inInitializingCacheList
                          .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
                  }
              }
          }
          boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
          return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
      }

      checkUpdateConfigStr

      從Server獲取值變化了的DataID列表。返回的對象里只有dataId和group是有效的。 保證不返回NULL。

      List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
          //拼接參數(shù)和header
          Map<String, String> params = new HashMap<String, String>(2);
          params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
          Map<String, String> headers = new HashMap<String, String>(2);
          headers.put("Long-Pulling-Timeout", "" + timeout);
          // told server do not hang me up if new initializing cacheData added in
          if (isInitializingCacheList) {
              headers.put("Long-Pulling-Timeout-No-Hangup", "true");
          }
          if (StringUtils.isBlank(probeUpdateString)) {//判斷可能發(fā)生變更的字符串是否為空,如果是,則直接返回。
              return Collections.emptyList();
          }
          try {
              // In order to prevent the server from handling the delay of the client's long task,
              // increase the client's read timeout to avoid this problem.
              // 設(shè)置readTimeoutMs,也就是本次請求等待響應(yīng)的超時時間,默認是30s
              long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
              //發(fā)起遠程調(diào)用
              HttpRestResult<String> result = agent
                      .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
                              readTimeoutMs);
              if (result.ok()) { //如果響應(yīng)成功
                  setHealthServer(true);
                  return parseUpdateDataIdResponse(result.getData()); //解析并更新數(shù)據(jù),返回的是確實發(fā)生了數(shù)據(jù)變更的字符串:tenant/group/dataid。
              } else {//如果響應(yīng)失敗
                  setHealthServer(false);
                  LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(),
                          result.getCode());
              }
          } catch (Exception e) {
              setHealthServer(false);
              LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
              throw e;
          }
          return Collections.emptyList();
      }

      客戶端緩存配置長輪訓(xùn)機制總結(jié)

      整體實現(xiàn)的核心點就一下幾個部分

      對本地緩存的配置做任務(wù)拆分,每一個批次是3000條

      針對每3000條創(chuàng)建一個線程去執(zhí)行

      先把每一個批次的緩存和本地磁盤文件中的數(shù)據(jù)進行比較,

      • 如果和本地配置不一致,則表示該緩存發(fā)生了更新,直接通知客戶端監(jiān)聽

      • 如果本地緩存和磁盤數(shù)據(jù)一致,則需要發(fā)起遠程請求檢查配置變化

      先以tenent/groupId/dataId拼接成字符串,發(fā)送到服務(wù)端進行檢查,返回發(fā)生了變更的配置

      客戶端收到變更配置列表,再逐項遍歷發(fā)送到服務(wù)端獲取配置內(nèi)容。

      服務(wù)端配置更新的推送

      分析完客戶端之后,隨著好奇心的驅(qū)使,服務(wù)端是如何處理客戶端的請求的?那么同樣,我們需要思考幾個問題

      • 服務(wù)端是如何實現(xiàn)長輪訓(xùn)機制的

      • 客戶端的超時時間為什么要設(shè)置30s

      客戶端發(fā)起的請求地址是:/v1/cs/configs/listener,于是找到這個接口進行查看,代碼如下。

      //# ConfigController.java
      @PostMapping("/listener")
      @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
      public void listener(HttpServletRequest request, HttpServletResponse response)
              throws ServletException, IOException {
          request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
          String probeModify = request.getParameter("Listening-Configs");
          if (StringUtils.isBlank(probeModify)) {
              throw new IllegalArgumentException("invalid probeModify");
          }
          probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
          Map<String, String> clientMd5Map;
          try {
              //解析客戶端傳遞過來的可能發(fā)生變化的配置項目,轉(zhuǎn)化為Map集合(key=dataId,value=md5)
              clientMd5Map = MD5Util.getClientMd5Map(probeModify);
          } catch (Throwable e) {
              throw new IllegalArgumentException("invalid probeModify");
          }
          // 開始執(zhí)行長輪訓(xùn)。
          inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
      }

      doPollingConfig

      這個方法主要是用來做長輪訓(xùn)和短輪詢的判斷

      • 如果是長輪訓(xùn),直接走addLongPollingClient方法

      • 如果是短輪詢,直接比較服務(wù)端的數(shù)據(jù),如果存在md5不一致,直接把數(shù)據(jù)返回。

      public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
              Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
          // 判斷當(dāng)前請求是否支持長輪訓(xùn)。()
          if (LongPollingService.isSupportLongPolling(request)) {
              longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
              return HttpServletResponse.SC_OK + "";
          }
          //如果是短輪詢,走下面的請求,下面的請求就是把客戶端傳過來的數(shù)據(jù)和服務(wù)端的數(shù)據(jù)逐項進行比較,保存到changeGroups中。
          // Compatible with short polling logic.
          List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
          // Compatible with short polling result.
          String oldResult = MD5Util.compareMd5OldResult(changedGroups);
          String newResult = MD5Util.compareMd5ResultString(changedGroups);
          
          String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
          if (version == null) {
              version = "2.0.0";
          }
          int versionNum = Protocol.getVersionNumber(version);
          // Before 2.0.4 version, return value is put into header.
          if (versionNum < START_LONG_POLLING_VERSION_NUM) {
              response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
              response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
          } else {
              request.setAttribute("content", newResult);
          }
          Loggers.AUTH.info("new content:" + newResult);
          // Disable cache.
          response.setHeader("Pragma", "no-cache");
          response.setDateHeader("Expires", 0);
          response.setHeader("Cache-Control", "no-cache,no-store");
          response.setStatus(HttpServletResponse.SC_OK);
          return HttpServletResponse.SC_OK + "";
      }

      addLongPollingClient

      把客戶端的請求,保存到長輪訓(xùn)的執(zhí)行引擎中。

      public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
              int probeRequestSize) {
          //獲取客戶端長輪訓(xùn)的超時時間
          String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); 
          //不允許斷開的標(biāo)記
          String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
          //應(yīng)用名稱
          String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
          //
          String tag = req.getHeader("Vipserver-Tag");
          //延期時間,默認為500ms
          int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
      
          // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
          // 提前500ms返回一個響應(yīng),避免客戶端出現(xiàn)超時
          long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
          if (isFixedPolling()) {
              timeout = Math.max(10000, getFixedPollingInterval());
              // Do nothing but set fix polling timeout.
          } else {
              long start = System.currentTimeMillis();
              //通過md5判斷客戶端請求過來的key是否有和服務(wù)器端有不一致的,如果有,則保存到changedGroups中。
              List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
              if (changedGroups.size() > 0) { //如果發(fā)現(xiàn)有變更,則直接把請求返回給客戶端
                  generateResponse(req, rsp, changedGroups);
                  LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
                          RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                          changedGroups.size());
                  return;
              } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { //如果noHangUpFlag為true,說明不需要掛起客戶端,所以直接返回。
                  LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                          RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                          changedGroups.size());
                  return;
              }
          }
          //獲取請求端的ip
          String ip = RequestUtil.getRemoteIp(req);
          // Must be called by http thread, or send response.
          //把當(dāng)前請求轉(zhuǎn)化為一個異步請求(意味著此時tomcat線程被釋放,也就是客戶端的請求,需要通過asyncContext來手動觸發(fā)返回,否則一直掛起)
          final AsyncContext asyncContext = req.startAsync();
          // AsyncContext.setTimeout() is incorrect, Control by oneself
          asyncContext.setTimeout(0L); //設(shè)置異步請求超時時間,
          //執(zhí)行長輪訓(xùn)請求
          ConfigExecutor.executeLongPolling(
                  new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
      }

      ClientLongPolling

      接下來我們來分析一下,clientLongPolling到底做了什么操作?;蛘哒f我們可以先猜測一下應(yīng)該會做什么事情

      • 這個任務(wù)要阻塞29.5s才能執(zhí)行,因為立馬執(zhí)行沒有任何意義,畢竟前面已經(jīng)執(zhí)行過一次了

      • 如果在29.5s+之內(nèi),數(shù)據(jù)發(fā)生變化,需要提前通知。需要有一種監(jiān)控機制

      基于這些猜想,我們可以看看它的實現(xiàn)過程

      從代碼粗粒度來看,它的實現(xiàn)似乎和我們的猜想一致,在run方法中,通過scheduler.schedule實現(xiàn)了一個定時任務(wù),它的delay時間正好是前面計算的29.5s。在這個任務(wù)中,會通過MD5Util.compareMd5來進行計算

      那另外一個,當(dāng)數(shù)據(jù)發(fā)生變化以后,肯定不能等到29.5s之后才通知呀,那怎么辦呢?我們發(fā)現(xiàn)有一個allSubs的東西,它似乎和發(fā)布訂閱有關(guān)系。那是不是有可能當(dāng)前的clientLongPolling訂閱了數(shù)據(jù)變化的事件呢?

      class ClientLongPolling implements Runnable {
          @Override
          public void run() {
              //構(gòu)建一個異步任務(wù),延后29.5s執(zhí)行
              asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
                  @Override
                  public void run() { //如果達到29.5s,說明這個期間沒有做任何配置修改,則自動觸發(fā)執(zhí)行
                      try {
                          getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                          // Delete subsciber's relations.
                          allSubs.remove(ClientLongPolling.this); //移除訂閱關(guān)系
                          if (isFixedPolling()) { //如果是固定間隔的長輪訓(xùn)
                              LogUtil.CLIENT_LOG
                                      .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",
                                              RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                              "polling", clientMd5Map.size(), probeRequestSize);
                              //比較變更的key
                              List<String> changedGroups = MD5Util
                                      .compareMd5((HttpServletRequest) asyncContext.getRequest(),
                                              (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
                              if (changedGroups.size() > 0) {//如果大于0,表示有變更,直接響應(yīng)
                                  sendResponse(changedGroups);
                              } else {
                                  sendResponse(null); //否則返回null
                              }
                          } else {
                              LogUtil.CLIENT_LOG
                                      .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
                                              RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                              "polling", clientMd5Map.size(), probeRequestSize);
                              sendResponse(null);
                          }
                      } catch (Throwable t) {
                          LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
                      }
                  }
              }, timeoutTime, TimeUnit.MILLISECONDS);
              allSubs.add(this);  //把當(dāng)前線程添加到訂閱事件隊列中
          }
      }

      allSubs

      allSubs是一個隊列,隊列里面放了ClientLongPolling這個對象。這個隊列似乎和配置變更有某種關(guān)聯(lián)關(guān)系。

      那么這里必須要實現(xiàn)的是,當(dāng)用戶在nacos 控制臺修改了配置之后,必須要從這個訂閱關(guān)系中取出關(guān)注的客戶端長連接,然后把變更的結(jié)果返回。于是我們?nèi)タ碙ongPollingService的構(gòu)造方法查找訂閱關(guān)系

      /**
       * 長輪詢訂閱關(guān)系
       */
      final Queue<ClientLongPolling> allSubs;
      allSubs.add(this);

      LongPollingService

      在LongPollingService的構(gòu)造方法中,使用了一個NotifyCenter訂閱了一個事件,其中不難發(fā)現(xiàn),如果這個事件的實例是LocalDataChangeEvent,也就是服務(wù)端數(shù)據(jù)發(fā)生變更的時間,就會執(zhí)行一個DataChangeTask的線程。

      public LongPollingService() {
          allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
          ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
          // Register LocalDataChangeEvent to NotifyCenter.
          NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
          //注冊LocalDataChangeEvent訂閱事件
          NotifyCenter.registerSubscriber(new Subscriber() {
              @Override
              public void onEvent(Event event) {
                  if (isFixedPolling()) {
                      // Ignore.
                  } else {
                      if (event instanceof LocalDataChangeEvent) { //如果觸發(fā)了LocalDataChangeEvent,則執(zhí)行下面的代碼
                          LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                          ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
                      }
                  }
              }
              @Override
              public Class<? extends Event> subscribeType() {
                  return LocalDataChangeEvent.class;
              }
          });
      }

      DataChangeTask

      數(shù)據(jù)變更事件線程,代碼如下

      class DataChangeTask implements Runnable {
          @Override
          public void run() {
              try {
                  ConfigCacheService.getContentBetaMd5(groupKey); //
                  //遍歷所有訂閱事件表
                  for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                      ClientLongPolling clientSub = iter.next(); //得到ClientLongPolling
                      //判斷當(dāng)前的ClientLongPolling中,請求的key是否包含當(dāng)前修改的groupKey
                      if (clientSub.clientMd5Map.containsKey(groupKey)) {
                          // If published tag is not in the beta list, then it skipped.
                          if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) { //如果是beta方式且betaIps不包含當(dāng)前客戶端ip,直接返回
                              continue;
                          }
                          // If published tag is not in the tag list, then it skipped.
                          if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {//如果配置了tag標(biāo)簽且不包含當(dāng)前客戶端的tag,直接返回
                              continue;
                          }
      					//
                          getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                          iter.remove(); // Delete subscribers' relationships. 移除當(dāng)前客戶端的訂閱關(guān)系
                          LogUtil.CLIENT_LOG
                                  .info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
                                          RequestUtil
                                                  .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
                                          "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                          clientSub.sendResponse(Arrays.asList(groupKey)); //響應(yīng)客戶端請求。
                      }
                  }
              } catch (Throwable t) {
                  LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
              }
          }
      }

      原理總結(jié)

      Nacos客戶端配置中心緩存動態(tài)更新的示例分析

      關(guān)于“Nacos客戶端配置中心緩存動態(tài)更新的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學(xué)到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

      向AI問一下細節(jié)

      免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

      AI