您好,登錄后才能下訂單哦!
本篇文章為大家展示了nacos中DataSyncer的作用是什么,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataSyncer.java
@Component @DependsOn("serverListManager") public class DataSyncer { @Autowired private DataStore dataStore; @Autowired private GlobalConfig partitionConfig; @Autowired private Serializer serializer; @Autowired private DistroMapper distroMapper; @Autowired private ServerListManager serverListManager; private Map<String, String> taskMap = new ConcurrentHashMap<>(); @PostConstruct public void init() { startTimedSync(); } public void submit(SyncTask task, long delay) { // If it's a new task: if (task.getRetryCount() == 0) { Iterator<String> iterator = task.getKeys().iterator(); while (iterator.hasNext()) { String key = iterator.next(); if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) { // associated key already exist: if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("sync already in process, key: {}", key); } iterator.remove(); } } } if (task.getKeys().isEmpty()) { // all keys are removed: return; } GlobalExecutor.submitDataSync(new Runnable() { @Override public void run() { try { if (getServers() == null || getServers().isEmpty()) { Loggers.SRV_LOG.warn("try to sync data but server list is empty."); return; } List<String> keys = task.getKeys(); if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("sync keys: {}", keys); } Map<String, Datum> datumMap = dataStore.batchGet(keys); if (datumMap == null || datumMap.isEmpty()) { // clear all flags of this task: for (String key : task.getKeys()) { taskMap.remove(buildKey(key, task.getTargetServer())); } return; } byte[] data = serializer.serialize(datumMap); long timestamp = System.currentTimeMillis(); boolean success = NamingProxy.syncData(data, task.getTargetServer()); if (!success) { SyncTask syncTask = new SyncTask(); syncTask.setKeys(task.getKeys()); syncTask.setRetryCount(task.getRetryCount() + 1); syncTask.setLastExecuteTime(timestamp); syncTask.setTargetServer(task.getTargetServer()); retrySync(syncTask); } else { // clear all flags of this task: for (String key : task.getKeys()) { taskMap.remove(buildKey(key, task.getTargetServer())); } } } catch (Exception e) { Loggers.DISTRO.error("sync data failed.", e); } } }, delay); } public void retrySync(SyncTask syncTask) { Server server = new Server(); server.setIp(syncTask.getTargetServer().split(":")[0]); server.setServePort(Integer.parseInt(syncTask.getTargetServer().split(":")[1])); if (!getServers().contains(server)) { // if server is no longer in healthy server list, ignore this task: return; } // TODO may choose other retry policy. submit(syncTask, partitionConfig.getSyncRetryDelay()); } public void startTimedSync() { GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync()); } //...... public List<Server> getServers() { return serverListManager.getHealthyServers(); } public String buildKey(String key, String targetServer) { return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer; } }
DataSyncer定義了submit、retrySync、startTimedSync、getServers等方法,其init方法會執(zhí)行startTimedSync
submit方法對于retryCount為0的任務(wù)會判斷taskMap是否存在該任務(wù)如果存在則移除其taskKey,之后使用GlobalExecutor.submitDataSync提交一個sync任務(wù),它主要是通過NamingProxy.syncData來同步,成功則移除,不成功則使用retrySync重試
retrySync則重新構(gòu)建server調(diào)用submit執(zhí)行;startTimedSync方法則是使用GlobalExecutor.schedulePartitionDataTimedSync提交TimedSync任務(wù);getServers則通過serverListManager.getHealthyServers()返回健康的實例
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataSyncer.java
public class TimedSync implements Runnable { @Override public void run() { try { if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("server list is: {}", getServers()); } // send local timestamps to other servers: Map<String, String> keyChecksums = new HashMap<>(64); for (String key : dataStore.keys()) { if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) { continue; } keyChecksums.put(key, dataStore.get(key).value.getChecksum()); } if (keyChecksums.isEmpty()) { return; } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("sync checksums: {}", keyChecksums); } for (Server member : getServers()) { if (NetUtils.localServer().equals(member.getKey())) { continue; } NamingProxy.syncCheckSums(keyChecksums, member.getKey()); } } catch (Exception e) { Loggers.DISTRO.error("timed sync task failed.", e); } } }
TimedSync會使用NamingProxy.syncCheckSums同步keyChecksums進(jìn)行校驗
DataSyncer定義了submit、retrySync、startTimedSync、getServers等方法,其init方法會執(zhí)行startTimedSync
submit方法對于retryCount為0的任務(wù)會判斷taskMap是否存在該任務(wù)如果存在則移除其taskKey,之后使用GlobalExecutor.submitDataSync提交一個sync任務(wù),它主要是通過NamingProxy.syncData來同步,成功則移除,不成功則使用retrySync重試
retrySync則重新構(gòu)建server調(diào)用submit執(zhí)行;startTimedSync方法則是使用GlobalExecutor.schedulePartitionDataTimedSync提交TimedSync任務(wù);getServers則通過serverListManager.getHealthyServers()返回健康的實例
上述內(nèi)容就是nacos中DataSyncer的作用是什么,你們學(xué)到知識或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識儲備,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。