What Does DataSyncer Do in Nacos?

Published: 2021-06-15 13:51:41 · Source: 億速云 · Author: Leah · Category: Big Data

This article walks through the DataSyncer class in Nacos (version 1.1.3) to show what it does and how it synchronizes ephemeral data between cluster members.

DataSyncer

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataSyncer.java

@Component
@DependsOn("serverListManager")
public class DataSyncer {

    @Autowired
    private DataStore dataStore;

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private Serializer serializer;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    private Map<String, String> taskMap = new ConcurrentHashMap<>();

    @PostConstruct
    public void init() {
        startTimedSync();
    }

    public void submit(SyncTask task, long delay) {

        // If it's a new task:
        if (task.getRetryCount() == 0) {
            Iterator<String> iterator = task.getKeys().iterator();
            while (iterator.hasNext()) {
                String key = iterator.next();
                if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
                    // associated key already exist:
                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("sync already in process, key: {}", key);
                    }
                    iterator.remove();
                }
            }
        }

        if (task.getKeys().isEmpty()) {
            // all keys are removed:
            return;
        }

        GlobalExecutor.submitDataSync(new Runnable() {
            @Override
            public void run() {

                try {
                    if (getServers() == null || getServers().isEmpty()) {
                        Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
                        return;
                    }

                    List<String> keys = task.getKeys();

                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("sync keys: {}", keys);
                    }

                    Map<String, Datum> datumMap = dataStore.batchGet(keys);

                    if (datumMap == null || datumMap.isEmpty()) {
                        // clear all flags of this task:
                        for (String key : task.getKeys()) {
                            taskMap.remove(buildKey(key, task.getTargetServer()));
                        }
                        return;
                    }

                    byte[] data = serializer.serialize(datumMap);

                    long timestamp = System.currentTimeMillis();
                    boolean success = NamingProxy.syncData(data, task.getTargetServer());
                    if (!success) {
                        SyncTask syncTask = new SyncTask();
                        syncTask.setKeys(task.getKeys());
                        syncTask.setRetryCount(task.getRetryCount() + 1);
                        syncTask.setLastExecuteTime(timestamp);
                        syncTask.setTargetServer(task.getTargetServer());
                        retrySync(syncTask);
                    } else {
                        // clear all flags of this task:
                        for (String key : task.getKeys()) {
                            taskMap.remove(buildKey(key, task.getTargetServer()));
                        }
                    }

                } catch (Exception e) {
                    Loggers.DISTRO.error("sync data failed.", e);
                }
            }
        }, delay);
    }

    public void retrySync(SyncTask syncTask) {

        Server server = new Server();
        server.setIp(syncTask.getTargetServer().split(":")[0]);
        server.setServePort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));
        if (!getServers().contains(server)) {
            // if server is no longer in healthy server list, ignore this task:
            return;
        }

        // TODO may choose other retry policy.
        submit(syncTask, partitionConfig.getSyncRetryDelay());
    }

    public void startTimedSync() {
        GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());
    }

    //......

    public List<Server> getServers() {
        return serverListManager.getHealthyServers();
    }

    public String buildKey(String key, String targetServer) {
        return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
    }
}
  • DataSyncer defines the submit, retrySync, startTimedSync and getServers methods; its init method, annotated with @PostConstruct, calls startTimedSync.

  • For a task whose retryCount is 0, submit first registers each key in taskMap via putIfAbsent; if an entry for that key and target server already exists, the key is dropped from the task because a sync for it is already in flight. If any keys remain, submit schedules a runnable through GlobalExecutor.submitDataSync that reads the datums with dataStore.batchGet, serializes them, and pushes them to the target server via NamingProxy.syncData; on success the corresponding taskMap flags are cleared, on failure a new SyncTask with an incremented retryCount is handed to retrySync.

  • retrySync rebuilds the target Server from its "ip:port" string and ignores the task if that server is no longer in the healthy server list, otherwise it resubmits the task with a delay of partitionConfig.getSyncRetryDelay(); startTimedSync schedules a TimedSync task via GlobalExecutor.schedulePartitionDataTimedSync; getServers returns the healthy servers from serverListManager.getHealthyServers(). A usage sketch for submit follows this list.
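
To make the calling side concrete, here is a minimal sketch of how a caller could build a SyncTask and hand it to DataSyncer.submit. The class SyncTaskSubmitSketch, its wiring, and the example address are hypothetical; only SyncTask's setters and DataSyncer.submit come from the code above.

import java.util.List;

// A hypothetical caller that hands changed datum keys to DataSyncer.
public class SyncTaskSubmitSketch {

    private DataSyncer dataSyncer; // assumed to be injected, e.g. by Spring

    // Push the given keys to one target server as soon as possible.
    public void syncChangedKeys(List<String> changedKeys, String targetServer) {
        SyncTask task = new SyncTask();
        task.setKeys(changedKeys);          // datum keys to synchronize
        task.setTargetServer(targetServer); // peer address, e.g. "192.168.0.2:8848" (illustrative)
        // retryCount stays at its default of 0, so submit() de-duplicates the keys
        // against taskMap before scheduling the sync runnable
        dataSyncer.submit(task, 0L);        // second argument is the delay
    }
}

Because submit records each key with putIfAbsent before scheduling, repeated submissions of the same key to the same target server collapse into a single in-flight sync until its flag is cleared.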

TimedSync

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataSyncer.java

    public class TimedSync implements Runnable {

        @Override
        public void run() {

            try {

                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("server list is: {}", getServers());
                }

                // send local timestamps to other servers:
                Map<String, String> keyChecksums = new HashMap<>(64);
                for (String key : dataStore.keys()) {
                    if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
                        continue;
                    }

                    keyChecksums.put(key, dataStore.get(key).value.getChecksum());
                }

                if (keyChecksums.isEmpty()) {
                    return;
                }

                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("sync checksums: {}", keyChecksums);
                }

                for (Server member : getServers()) {
                    if (NetUtils.localServer().equals(member.getKey())) {
                        continue;
                    }
                    NamingProxy.syncCheckSums(keyChecksums, member.getKey());
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("timed sync task failed.", e);
            }
        }
    }
  • TimedSync collects a checksum for every key in dataStore that this node is responsible for (as decided by distroMapper.responsible) and sends the resulting key-to-checksum map to every other healthy server via NamingProxy.syncCheckSums, skipping the local server itself. A hypothetical sketch of the receiving-side comparison follows.
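
The class above only shows the sending side of the checksum exchange. As a rough illustration of what a receiver could do with that map, the sketch below compares each reported checksum against the local DataStore and collects keys whose data is missing or out of date; the class and method names are invented for illustration and do not reproduce the actual nacos receiving code.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// A hypothetical receiver-side helper: compare the checksums reported by a peer's
// TimedSync against the local DataStore and collect the keys that look stale.
public class ChecksumCompareSketch {

    private DataStore dataStore; // assumed local store, same role as in DataSyncer

    public List<String> findStaleKeys(Map<String, String> remoteChecksums) {
        List<String> toUpdate = new ArrayList<>();
        for (Map.Entry<String, String> entry : remoteChecksums.entrySet()) {
            String key = entry.getKey();
            Datum local = dataStore.get(key);
            // missing locally or checksum differs: this datum needs to be pulled
            if (local == null || !entry.getValue().equals(local.value.getChecksum())) {
                toUpdate.add(key);
            }
        }
        return toUpdate;
    }
}

In the Distro design, keys that fail this comparison are subsequently pulled from the sender, which is how the periodic checksum broadcast keeps ephemeral data eventually consistent across the cluster.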

Summary

  • DataSyncer starts a timed sync job in its init method; submit de-duplicates new tasks against taskMap, pushes the serialized datums to the target server through NamingProxy.syncData, and retries failed pushes via retrySync, which only resubmits while the target is still in the healthy server list.

  • TimedSync, scheduled by startTimedSync via GlobalExecutor.schedulePartitionDataTimedSync, periodically broadcasts the checksums of locally owned keys to the other healthy servers through NamingProxy.syncCheckSums.

The above covers what DataSyncer does in Nacos.
