溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么

發(fā)布時(shí)間:2021-12-07 13:51:37 來源:億速云 閱讀:122 作者:小新 欄目:系統(tǒng)運(yùn)維

這篇文章主要介紹了從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

今天我們一起從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理;整理這篇文章時(shí)候,也參照學(xué)習(xí)了部分大佬的博客,這里致謝;

在開始閱讀文章之前,有些思路我按我的理解先闡述一些,方便大家更快理清思路,不對的地方還請大家批評指正;

  1. Nacos客戶端會在在本地緩存服務(wù)端配置文件,防止服務(wù)器奔潰情況下,導(dǎo)致服務(wù)不可用;

  2. 本地緩存類在代碼中的體現(xiàn)就是我們下面提到的CacheData,我們知道對應(yīng)服務(wù)端一個(gè)配置,肯定可以同時(shí)被多個(gè)客戶端所使用,當(dāng)這個(gè)配置發(fā)生變更,如何去通知到每一個(gè)客戶端?

  3. 客戶端啟動之后,回去注冊監(jiān)視器,監(jiān)視器最終會被保存到CacheData類中CopyOnWriteArrayList

    listeners字段,那么,反過來,當(dāng)執(zhí)行監(jiān)視器回調(diào)方法時(shí),就可以找到所有客戶端
  4. 長輪詢左右主要就是刷新配置,保持服務(wù)端配置和本地緩存配置保持一致;

首先,我們來看看Nacos官網(wǎng)給出的Nacos地圖,我們可以清楚的看到,動態(tài)配置服務(wù)是 Nacos 的三大功能之一;

從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么

這里借用官網(wǎng)的描述,一起來看看Nacos 為我們帶來什么黑科技?

動態(tài)配置服務(wù)可以讓您以中心化、外部化和動態(tài)化的方式管理所有環(huán)境的應(yīng)用配置和服務(wù)配置。動態(tài)配置消除了配置變更時(shí)重新部署應(yīng)用和服務(wù)的需要,讓配置管理變得更加高效和敏捷。配置中心化管理讓實(shí)現(xiàn)無狀態(tài)服務(wù)變得更簡單,讓服務(wù)按需彈性擴(kuò)展變得更容易。

所以,有了Nacos ,可能我們以前上線打包弄錯(cuò)配置文件,改配置需要重啟服務(wù)等一系列問題,都會顯著改觀

一 動態(tài)配置

下面我將來和大家一起來了解下 Nacos 的動態(tài)配置的能力,看看 Nacos 是如何以簡單、優(yōu)雅、高效的方式管理配置,實(shí)現(xiàn)配置的動態(tài)變更的。

我們用一個(gè)簡單的例子來了解下 Nacos 的動態(tài)配置的功能。

1. 環(huán)境準(zhǔn)備

首先,我們需要搭建一個(gè)Nacos 服務(wù)端,由于官網(wǎng)的quick-start已經(jīng)對此做了詳細(xì)的解讀,我們這里就不在贅述

從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么
https://nacos.io/zh-cn/docs/quick-start.html

安裝完成之后啟動,我們就可以訪問 Nacos 的控制臺了,如下圖所示:

從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么

Nacos控制臺做了簡單的權(quán)限控制,默認(rèn)的賬號和密碼都是 nacos。

登錄進(jìn)去之后,是這樣的:

從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么

2.新建配置

接下來我們在控制臺上創(chuàng)建一個(gè)簡單的配置項(xiàng),如下圖所示:

從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么

3.導(dǎo)入配置

Nacos支持導(dǎo)入配置,可以直接將配置文件壓縮包導(dǎo)入,這里我們以人人開源的微服務(wù)項(xiàng)目為例

從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么

從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么
從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么

4.配置客戶端

下面我以自己搭建的子服務(wù)為例,一起來看看Nacos配置中心的使用

首先我們需要配置一下,大家只需關(guān)注config節(jié)點(diǎn)配置就可以,discovery節(jié)點(diǎn)可以忽略

cloud:   nacos:     discovery:       metadata:         management:           context-path: ${server.servlet.context-path}/actuator       server-addr: ${nacos-host:nacos-host}:${nacos-port:8848}       #nacos的命名空間ID,默認(rèn)是public       namespace: ${nacos-namespace:}       service: ets-web     config:       server-addr: ${spring.cloud.nacos.discovery.server-addr}       namespace: ${spring.cloud.nacos.discovery.namespace}       group: RENREN_CLOUD_GROUP       file-extension: yaml       #指定共享配置,且支持動態(tài)刷新       extension-configs:         - data-id: datasource.yaml           group: ${spring.cloud.nacos.config.group}           refresh: true         - data-id: common.yaml           group: ${spring.cloud.nacos.config.group}           refresh: true

其實(shí)extension-configs節(jié)點(diǎn)的配置信息對應(yīng)的是下面的類

從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么

接下來我們啟動服務(wù),來看看控制臺日志

 從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么

5. 修改配置信息

接下來我們在 Nacos 的控制臺上將我們的配置信息改為如下圖所示:

 從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么

修改完配置,點(diǎn)擊 “發(fā)布” 按鈕后,客戶端將會收到最新的數(shù)據(jù),如下圖所示:

從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么

至此一個(gè)簡單的動態(tài)配置管理功能已經(jīng)講完了,刪除配置和更新配置操作類似,這里不再贅述。

6.小結(jié)

通過上面的小案例,我們大概了解了Nacos動態(tài)配置的服務(wù)的使用方法,Nacos服務(wù)端將配置信息保存到其配置文件所配置的數(shù)據(jù)庫中,客戶端連接到服務(wù)端之后,根據(jù)  dataID,Group可以獲取到具體的配置信息,當(dāng)服務(wù)端的配置發(fā)生變更時(shí),客戶端會收到通知。當(dāng)客戶端拿到變更后的最新配置信息后,就可以做自己的處理了,這非常有用,所有需要使用配置的場景都可以通過  Nacos 來進(jìn)行管理。

二 配置中心原理(推還是拉)

現(xiàn)在我們了解了 Nacos 的動態(tài)配置服務(wù)的功能了,但是有一個(gè)問題我們需要弄明白,那就是 Nacos 客戶端是怎么實(shí)時(shí)獲取到 Nacos  服務(wù)端的最新數(shù)據(jù)的。

其實(shí)客戶端和服務(wù)端之間的數(shù)據(jù)交互,無外乎兩種情況:

  • 服務(wù)端推數(shù)據(jù)給客戶端

  • 客戶端從服務(wù)端拉數(shù)據(jù)

那到底是推還是拉呢,從 Nacos 客戶端通過 Listener  來接收最新數(shù)據(jù)的這個(gè)做法來看,感覺像是服務(wù)端推的數(shù)據(jù),但是不能想當(dāng)然,要想知道答案,最快最準(zhǔn)確的方法就是從源碼中去尋找。

官方示例代碼

try {     // 傳遞配置     String serverAddr = "{serverAddr}";     String dataId = "{dataId}";     String group = "{group}";     Properties properties = new Properties();     properties.put("serverAddr", serverAddr);      // 新建 configService     ConfigService configService = NacosFactory.createConfigService(properties);     String content = configService.getConfig(dataId, group, 5000);     System.out.println(content);      // 注冊監(jiān)聽器     configService.addListener(dataId, group, new Listener() {     @Override     public void receiveConfigInfo(String configInfo) {         System.out.println("recieve1:" + configInfo);     }     @Override     public Executor getExecutor() {         return null;     } }); } catch (NacosException e) {     // TODO      -generated catch block     e.printStackTrace(); }

1.實(shí)例化 ConfigService

當(dāng)我們引包結(jié)束以后,會發(fā)現(xiàn)下面三個(gè)關(guān)于Nacos的包

從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么

從我的理解來說,api包會調(diào)用client包的能力來和Nacos服務(wù)端進(jìn)行交互.那再交互時(shí)候,主要就會用到我們接下來分析的實(shí)現(xiàn)了ConfigService接口的NacosConfigService  類

現(xiàn)在我們來看下 NacosConfigService 的構(gòu)造方法,看看 ConfigService 是怎么實(shí)例化的,如下圖所示:

  1. public class NacosConfigService implements ConfigService { 

  2.      

  3.     private static final Logger LOGGER = LogUtils.logger(NacosConfigService.class); 

  4.      

  5.     private static final long POST_TIMEOUT = 3000L; 

  6.      

  7.     /** 

  8.      * http agent. 

  9.      */ 

  10.     private final HttpAgent agent; 

  11.      

  12.     /** 

  13.      * long polling.  這里是長輪詢 

  14.      */ 

  15.     private final ClientWorker worker; 

  16.      

  17.     private String namespace; 

  18.      

  19.     private final String encode; 

  20.     //省略其他代碼 


//構(gòu)造方法 ic NacosConfigService(Properties properties) throws NacosException {     ValidatorUtils.checkInitParam(properties);     String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);     if (StringUtils.isBlank(encodeTmp)) {         this.encode = Constants.ENCODE;     } else {         this.encode = encodeTmp.trim();     }     initNamespace(properties);     //對象1     this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));     this.agent.start();     //對象2     this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); }

實(shí)例化時(shí)主要是初始化了兩個(gè)對象,他們分別是:

  • HttpAgent

  • ClientWorker

HttpAgent

其中 agent 是通過裝飾器模式實(shí)現(xiàn)的,ServerHttpAgent 是實(shí)際工作的類,MetricsHttpAgent 在內(nèi)部也是調(diào)用了  ServerHttpAgent 的方法,另外加上了一些統(tǒng)計(jì)操作,所以我們只需要關(guān)心 ServerHttpAgent 的功能就可以了。

不熟悉的同學(xué),可以看菜鳥教程對裝飾器模式的解讀

agent 實(shí)際是在 ClientWorker 中發(fā)揮能力的,而 ClientWorker 也是真正的打工人,下面我們來看下 ClientWorker  類。

ClientWorker

以下是 ClientWorker 的構(gòu)造方法,如下圖所示:

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,         final Properties properties) {     this.agent = agent;     this.configFilterChainManager = configFilterChainManager;          // Initialize the timeout parameter          init(properties);     //創(chuàng)建了一個(gè)定時(shí)任務(wù)的線程池     this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {         @Override         public Thread newThread(Runnable r) {             Thread t = new Thread(r);             t.setName("com.alibaba.nacos.client.Worker." + agent.getName());             t.setDaemon(true);             return t;         }     });     //創(chuàng)建了一個(gè)保持長輪詢的線程池     this.executorService = Executors             .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {                 @Override                 public Thread newThread(Runnable r) {                     Thread t = new Thread(r);                     t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());                     t.setDaemon(true);                     return t;                 }             });          //創(chuàng)建了一個(gè)延遲任務(wù)線程池來每隔10ms來檢查配置信息的線程池     this.executor.scheduleWithFixedDelay(new Runnable() {         @Override         public void run() {             try {                 checkConfigInfo();             } catch (Throwable e) {                 LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);             }         }     }, 1L, 10L, TimeUnit.MILLISECONDS); }

可以看到 ClientWorker 除了將 HttpAgent 維持在自己內(nèi)部,還創(chuàng)建了兩個(gè)線程池:

final ScheduledExecutorService executor;      final ScheduledExecutorService executorService;
  • 第一個(gè)線程池負(fù)責(zé)與配置中心進(jìn)行數(shù)據(jù)的交互,并且啟動后延遲1ms,之后每隔10ms對配置信息進(jìn)行定時(shí)檢查

  • 第二個(gè)線程池則是負(fù)責(zé)保持一個(gè)長輪詢鏈接

接下來讓我們來看下 executor 每 10ms 執(zhí)行的方法到底做了什么工作,如下圖所示:

  1. /** 

  2.  * groupKey -> cacheData. 

  3.  */ 

  4. private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>( 

  5.         new HashMap<String, CacheData>()); 


/**   * Check config info. 檢查配置信息   */  public void checkConfigInfo() {      // 分任務(wù)(解決大數(shù)據(jù)量的傳輸問題)      int listenerSize = cacheMap.get().size();      // 向上取整為批數(shù),分批次進(jìn)行檢查      //ParamUtil.getPerTaskConfigSize() =3000      int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());      if (longingTaskCount > currentLongingTaskCount) {          for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {              // 要判斷任務(wù)是否在執(zhí)行 這塊需要好好想想。 任務(wù)列表現(xiàn)在是無序的。變化過程可能有問題              executorService.execute(new LongPollingRunnable(i));          }          currentLongingTaskCount = longingTaskCount;      }  }

這里主要是先去拿緩存中 Map

現(xiàn)在我們來看看 LongPollingRunnable 做了什么,主要分為兩部分,

  • 第一部分是檢查本地的配置信息,

  • 第二部分是獲取服務(wù)端的配置信息然后更新到本地。

1.本地檢查

首先取出與該 taskId 相關(guān)的 CacheData,然后對 CacheData 進(jìn)行檢查,包括本地配置檢查和緩存數(shù)據(jù)的 md5  檢查,本地檢查主要是做一個(gè)故障容錯(cuò),當(dāng)服務(wù)端掛掉后,Nacos 客戶端可以從本地的文件系統(tǒng)中獲取相關(guān)的配置信息,如下圖所示:

  1. public void run() { 

  2.              

  3.             List<CacheData> cacheDatas = new ArrayList<CacheData>(); 

  4.             List<String> inInitializingCacheList = new ArrayList<String>(); 

  5.             try { 

  6.                 // 

  7.                 for (CacheData cacheData : cacheMap.get().values()) { 

  8.                     if (cacheData.getTaskId() == taskId) { 

  9.                         cacheDatas.add(cacheData); 

  10.                         try { 

  11.                             //執(zhí)行檢查本地配置 

  12.                             checkLocalConfig(cacheData); 

  13.                             if (cacheData.isUseLocalConfigInfo()) { 

  14.                                 //緩存數(shù)據(jù)的md5的檢查 

  15.                                 cacheData.checkListenerMd5(); 

  16.                             } 

  17.                         } catch (Exception e) { 

  18.                             LOGGER.error("get local config info error", e); 

  19.                         } 

  20.                     } 

  21.                 } 

  22.                

  23.         } 


//檢查本地配置      private void checkLocalConfig(CacheData cacheData) {         final String dataId = cacheData.dataId;         final String group = cacheData.group;         final String tenant = cacheData.tenant;     //本地緩存文件         File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);         //不使用本地配置,但是持久化文件存在,需要讀取文件加載至內(nèi)存         if (!cacheData.isUseLocalConfigInfo() && path.exists()) {             String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);             final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);             cacheData.setUseLocalConfigInfo(true);             cacheData.setLocalConfigInfoVersion(path.lastModified());             cacheData.setContent(content);                          LOGGER.warn(                     "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",                     agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));             return;         }                 // 有 -> 沒有。不通知業(yè)務(wù)監(jiān)聽器,從server拿到配置后通知。         //使用本地配置,但是持久化文件不存在          if (cacheData.isUseLocalConfigInfo() && !path.exists()) {             cacheData.setUseLocalConfigInfo(false);             LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),                     dataId, group, tenant);             return;         }                  // 有變更         //使用本地配置,持久化文件存在,緩存跟文件最后修改時(shí)間不一致         if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path                 .lastModified()) {             String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);             final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);             cacheData.setUseLocalConfigInfo(true);             cacheData.setLocalConfigInfoVersion(path.lastModified());             cacheData.setContent(content);             LOGGER.warn(                     "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",                     agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));         }     }

本地檢查主要是通過是否使用本地配置,繼而尋找持久化緩存文件,再通過判斷文件的最后修改事件與本地緩存的版本是否一致來判斷是否由變更

通過跟蹤 checkLocalConfig 方法,可以看到 Nacos 將緩存配置信息保存在了

~/nacos/config/fixed-{address}_8848_nacos/snapshot/DEFAULT_GROUP/{dataId}

這個(gè)文件中,我們看下這個(gè)文件中保存的內(nèi)容,如下圖所示:

從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么

2.服務(wù)端檢查

然后通過 checkUpdateDataIds() 方法從服務(wù)端獲取值變化的 dataId 列表,

通過 getServerConfig 方法,根據(jù) dataId 到服務(wù)端獲取最新的配置信息,接著將最新的配置信息保存到 CacheData 中。

最后調(diào)用 CacheData 的 checkListenerMd5 方法,可以看到該方法在第一部分也被調(diào)用過,我們需要重點(diǎn)關(guān)注一下。

// 檢查服務(wù)器配置   List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);   if (!CollectionUtils.isEmpty(changedGroupKeys)) {       LOGGER.info("get changedGroupKeys:" + changedGroupKeys);   }      for (String groupKey : changedGroupKeys) {       String[] key = GroupKey.parseKey(groupKey);       String dataId = key[0];       String group = key[1];       String tenant = null;       if (key.length == 3) {           tenant = key[2];       }       try {           //從服務(wù)器端獲取相關(guān)id的最新配置           String[] ct = getServerConfig(dataId, group, tenant, 3000L);           CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));           cache.setContent(ct[0]);           if (null != ct[1]) {               cache.setType(ct[1]);           }           LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",                   agent.getName(), dataId, group, tenant, cache.getMd5(),                   ContentUtils.truncateContent(ct[0]), ct[1]);       } catch (NacosException ioe) {           String message = String                   .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",                           agent.getName(), dataId, group, tenant);           LOGGER.error(message, ioe);       }   }   for (CacheData cacheData : cacheDatas) {       if (!cacheData.isInitializing() || inInitializingCacheList               .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {           //校驗(yàn)MD5值           cacheData.checkListenerMd5();           cacheData.setInitializing(false);       }   }   inInitializingCacheList.clear();      executorService.execute(this);    catch (Throwable e) {      // If the rotation training task is abnormal, the next execution time of the task will be punished   LOGGER.error("longPolling error : ", e);   executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);

這里大家也發(fā)現(xiàn),當(dāng)客戶端從服務(wù)器拉去配置文件之后,會將配置文件在本地進(jìn)行緩存,所以,一般會優(yōu)先使用本地配置,如果本地文件不存在或者內(nèi)容為空,則再通過  HTTP GET 方法從遠(yuǎn)端拉取配置,并保存到本地緩存中

private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {       group = null2defaultGroup(group);       ParamUtils.checkKeyParam(dataId, group);       ConfigResponse cr = new ConfigResponse();              cr.setDataId(dataId);       cr.setTenant(tenant);       cr.setGroup(group);              // 優(yōu)先使用本地配置       String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);       if (content != null) {           LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),                   dataId, group, tenant, ContentUtils.truncateContent(content));           cr.setContent(content);           configFilterChainManager.doFilter(null, cr);           content = cr.getContent();           return content;       }              try {           String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);           cr.setContent(ct[0]);                      configFilterChainManager.doFilter(null, cr);           content = cr.getContent();                      return content;       } catch (NacosException ioe) {           if (NacosException.NO_RIGHT == ioe.getErrCode()) {               throw ioe;           }           LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",                   agent.getName(), dataId, group, tenant, ioe.toString());       }              LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),               dataId, group, tenant, ContentUtils.truncateContent(content));       content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);       cr.setContent(content);       configFilterChainManager.doFilter(null, cr);       content = cr.getContent();       return content;   }

2.添加 Listener

好了現(xiàn)在我們可以為 ConfigService 來添加一個(gè) Listener 了,最終是調(diào)用了 ClientWorker 的  addTenantListeners 方法,如下圖所示:

/**  * Add listeners for tenant.  *  * @param dataId    dataId of data  * @param group     group of data  * @param listeners listeners  * @throws NacosException nacos exception  */ public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)         throws NacosException {     //設(shè)置默認(rèn)組     group = null2defaultGroup(group);     String tenant = agent.getTenant();     CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);     for (Listener listener : listeners) {         cache.addListener(listener);     } }

該方法分為兩個(gè)部分,首先根據(jù) dataId,group 和tenant獲取一個(gè) CacheData 對象,然后將當(dāng)前要添加的 listener 對象添加到  CacheData 中去。

接下來,我們要重點(diǎn)關(guān)注下 CacheData 類了。

3.本地緩存CacheData

首先讓我們來看一下 CacheData 中的成員變量,如下圖所示:

private final String name;  private final ConfigFilterChainManager configFilterChainManager;  public final String dataId;  public final String group;  public final String tenant; //監(jiān)聽器  private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;  private volatile String md5;  /**  * whether use local config.  */ private volatile boolean isUseLocalConfig = false;  /**  * last modify time.  */ private volatile long localConfigLastModified;  private volatile String content;  private int taskId;  private volatile boolean isInitializing = true;  private String type;

我們可以看到,成員變量包括tenant ,dataId,group,content,taskId等,還有兩個(gè)值得我們關(guān)注的:

  • listeners

  • md5

listeners 是該 CacheData 所關(guān)聯(lián)的所有 listener,不過不是保存的原始的 Listener對象,而是包裝后的  ManagerListenerWrap 對象,該對象除了持有 Listener 對象,還持有了一個(gè) lastCallMd5  和lastContent屬性。

private static class ManagerListenerWrap {            final Listener listener;            //關(guān)注      String lastCallMd5 = CacheData.getMd5String(null);            String lastContent = null;            ManagerListenerWrap(Listener listener) {          this.listener = listener;      }            ManagerListenerWrap(Listener listener, String md5) {          this.listener = listener;          this.lastCallMd5 = md5;      }            ManagerListenerWrap(Listener listener, String md5, String lastContent) {          this.listener = listener;          this.lastCallMd5 = md5;          this.lastContent = lastContent;      }        }

另外一個(gè)屬性 md5 就是根據(jù)當(dāng)前對象的 content 計(jì)算出來的 md5 值。

4.觸發(fā)監(jiān)聽器回調(diào)

現(xiàn)在我們對 ConfigService 有了大致的了解了,現(xiàn)在剩下最后一個(gè)重要的問題還沒有答案,那就是 ConfigService 的 Listener  是在什么時(shí)候觸發(fā)回調(diào)方法 receiveConfigInfo 的。

現(xiàn)在讓我們回過頭來想一下,在 ClientWorker 中的定時(shí)任務(wù)中,啟動了一個(gè)長輪詢的任務(wù):LongPollingRunnable,該任務(wù)多次執(zhí)行了  cacheData.checkListenerMd5() 方法,那現(xiàn)在就讓我們來看下這個(gè)方法到底做了些什么,如下圖所示:

void checkListenerMd5() {     for (ManagerListenerWrap wrap : listeners) {         if (!md5.equals(wrap.lastCallMd5)) {             safeNotifyListener(dataId, group, content, type, md5, wrap);         }     } }

到這里應(yīng)該就比較清晰了,該方法會檢查 CacheData 當(dāng)前的 md5 與 CacheData 持有的所有 Listener 中保存的 md5  的值是否一致,如果不一致,就執(zhí)行一個(gè)安全的監(jiān)聽器的通知方法:safeNotifyListener,通知什么呢?我們可以大膽的猜一下,應(yīng)該是通知 Listener  的使用者,該 Listener 所關(guān)注的配置信息已經(jīng)發(fā)生改變了?,F(xiàn)在讓我們來看一下 safeNotifyListener 方法,如下圖所示:

private void safeNotifyListener(final String dataId, final String group, final String content, final String type,           final String md5, final ManagerListenerWrap listenerWrap) {       final Listener listener = listenerWrap.listener;              Runnable job = new Runnable() {           @Override           public void run() {               ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();               ClassLoader appClassLoader = listener.getClass().getClassLoader();               try {                   if (listener instanceof AbstractSharedListener) {                       AbstractSharedListener adapter = (AbstractSharedListener) listener;                       adapter.fillContext(dataId, group);                       LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);                   }                   // 執(zhí)行回調(diào)之前先將線程classloader設(shè)置為具體webapp的classloader,以免回調(diào)方法中調(diào)用spi接口是出現(xiàn)異常或錯(cuò)用(多應(yīng)用部署才會有該問題)。                   Thread.currentThread().setContextClassLoader(appClassLoader);                                      ConfigResponse cr = new ConfigResponse();                   cr.setDataId(dataId);                   cr.setGroup(group);                   cr.setContent(content);                                      //重點(diǎn)關(guān)注,在這里調(diào)用                   //重點(diǎn)關(guān)注,在這里調(diào)用                   //重點(diǎn)關(guān)注,在這里調(diào)用                                      configFilterChainManager.doFilter(null, cr);                   String contentTmp = cr.getContent();                   listener.receiveConfigInfo(contentTmp);                                                                                               // compare lastContent and content                   if (listener instanceof AbstractConfigChangeListener) {                       Map data = ConfigChangeHandler.getInstance()                               .parseChangeData(listenerWrap.lastContent, content, type);                       ConfigChangeEvent event = new ConfigChangeEvent(data);                       ((AbstractConfigChangeListener) listener).receiveConfigChange(event);                       listenerWrap.lastContent = content;                   }                                      listenerWrap.lastCallMd5 = md5;                   LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,                           listener);               } catch (NacosException ex) {                   LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",                           name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());               } catch (Throwable t) {                   LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,                           group, md5, listener, t.getCause());               } finally {                   Thread.currentThread().setContextClassLoader(myClassLoader);               }           }       };              final long startNotify = System.currentTimeMillis();       try {           if (null != listener.getExecutor()) {               listener.getExecutor().execute(job);           } else {               job.run();           }       } catch (Throwable t) {           LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,                   group, md5, listener, t.getCause());       }       final long finishNotify = System.currentTimeMillis();       LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",               name, (finishNotify - startNotify), dataId, group, md5, listener);   }

可以看到在 safeNotifyListener 方法中,重點(diǎn)關(guān)注下紅框中的三行代碼:獲取最新的配置信息,調(diào)用 Listener  的回調(diào)方法,將最新的配置信息作為參數(shù)傳入,這樣 Listener 的使用者就能接收到變更后的配置信息了,最后更新 ListenerWrap 的 md5  值。和我們猜測的一樣, Listener 的回調(diào)方法就是在該方法中觸發(fā)的。

5.Md5何時(shí)變更

那 CacheData 的 md5 值是何時(shí)發(fā)生改變的呢?我們可以回想一下,在上面的 LongPollingRunnable  所執(zhí)行的任務(wù)中,在獲取服務(wù)端發(fā)生變更的配置信息時(shí),將最新的 content 數(shù)據(jù)寫入了 CacheData 中,我們可以看下該方法如下:

public void setContent(String content) {      this.content = content;      this.md5 = getMd5String(this.content);  }

可以看到是在長輪詢的任務(wù)中,當(dāng)服務(wù)端配置信息發(fā)生變更時(shí),客戶端將最新的數(shù)據(jù)獲取下來之后,保存在了 CacheData 中,同時(shí)更新了該 CacheData  的 md5 值,所以當(dāng)下次執(zhí)行 checkListenerMd5 方法時(shí),就會發(fā)現(xiàn)當(dāng)前 listener 所持有的 md5 值已經(jīng)和 CacheData 的  md5 值不一樣了,也就意味著服務(wù)端的配置信息發(fā)生改變了,這時(shí)就需要將最新的數(shù)據(jù)通知給 Listener 的持有者。

至此配置中心的完整流程已經(jīng)分析完畢了,可以發(fā)現(xiàn),Nacos  并不是通過推的方式將服務(wù)端最新的配置信息發(fā)送給客戶端的,而是客戶端維護(hù)了一個(gè)長輪詢的任務(wù),定時(shí)去拉取發(fā)生變更的配置信息,然后將最新的數(shù)據(jù)推送給 Listener  的持有者。

6.為什么要拉?

客戶端拉取服務(wù)端的數(shù)據(jù)與服務(wù)端推送數(shù)據(jù)給客戶端相比,優(yōu)勢在哪呢,為什么 Nacos  不設(shè)計(jì)成主動推送數(shù)據(jù),而是要客戶端去拉取呢?如果用推的方式,服務(wù)端需要維持與客戶端的長連接,這樣的話需要耗費(fèi)大量的資源,并且還需要考慮連接的有效性,例如需要通過心跳來維持兩者之間的連接。而用拉取的方式,客戶端只需要通過一個(gè)無狀態(tài)的  http 請求即可獲取到服務(wù)端的數(shù)據(jù)。

三 總結(jié)

 從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么

現(xiàn)在,我們來簡單復(fù)盤一下Nacos客戶端視角下的配置中心實(shí)現(xiàn)原理

首先我們假設(shè)Nacos服務(wù)端一切正常,Nacos客戶端啟動以后

第一步是根據(jù)我們配置的服務(wù)端信息,新建 ConfigService 實(shí)例,它的實(shí)現(xiàn)就是我們文中提到的NacosConfigService;

第二步可以通過相應(yīng)的接口獲取配置和注冊配置監(jiān)聽器,

考慮到服務(wù)端故障的問題,客戶端將最新數(shù)據(jù)獲取后會保存在本地的  緩存文件中,以后會優(yōu)先從文件中獲取配置信息的值,如果獲取不到,會直接從服務(wù)器拉去,并保存到緩存中;

其實(shí)真正干活的就是ClientWorker類;客戶端是通過一個(gè)定時(shí)的長輪詢來檢查自己監(jiān)聽的配置項(xiàng)的數(shù)據(jù)的,一旦服務(wù)端的數(shù)據(jù)發(fā)生變化時(shí),會從服務(wù)端獲取到dataID的列表,

客戶端根據(jù)dataID列表從服務(wù)端獲取到最新的數(shù)據(jù),并將最新的數(shù)據(jù)保存在一個(gè) CacheData  對象中,在輪詢過程中,如果決定使用本地配置,就會比較當(dāng)前CacheData 的MD5值是否和所有監(jiān)聽者所持有的MD5值相等,如果不相等,,此時(shí)就會對該  CacheData 所綁定的 Listener 觸發(fā) receiveConfigInfo 回調(diào),來通知使用者此配置信息已經(jīng)變更;

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“從Nacos客戶端視角來看看配置中心實(shí)現(xiàn)原理是什么”這篇文章對大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI