How PushReceiver in the nacos client works and how it is used

Published: 2021-06-29 14:02:03 Source: 億速云 Views: 664 Author: chen Category: Big Data

This article looks at PushReceiver in the nacos client, along with the HostReactor and DiskCache classes it relies on.

PushReceiver

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/PushReceiver.java

public class PushReceiver implements Runnable {

    private ScheduledExecutorService executorService;

    private static final int UDP_MSS = 64 * 1024;

    private DatagramSocket udpSocket;

    private HostReactor hostReactor;

    public PushReceiver(HostReactor hostReactor) {
        try {
            this.hostReactor = hostReactor;
            udpSocket = new DatagramSocket();

            executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    return thread;
                }
            });

            executorService.execute(this);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] init udp socket failed", e);
        }
    }

    @Override
    public void run() {
        while (true) {
            try {
                // byte[] is initialized with 0 full filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

                udpSocket.receive(packet);

                String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

                PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
                String ack;
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                    hostReactor.processServiceJSON(pushPacket.data);

                    // send ack to server
                    ack = "{\"type\": \"push-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
                } else if ("dump".equals(pushPacket.type)) {
                    // dump data to server
                    ack = "{\"type\": \"dump-ack\""
                        + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\""
                        + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                        + "\"}";
                } else {
                    // do nothing send ack only
                    ack = "{\"type\": \"unknown-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
                }

                udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                    ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] error while receiving push data", e);
            }
        }
    }

    public static class PushPacket {
        public String type;
        public long lastRefTime;
        public String data;
    }

    public int getUDPPort() {
        return udpSocket.getLocalPort();
    }
}
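  • PushReceiver implements the Runnable interface. Its constructor creates udpSocket (a DatagramSocket bound to an available local port) and a single-threaded ScheduledThreadPoolExecutor whose thread is a daemon, then submits itself to that executor with executorService.execute(this).

  • Its run method loops forever (while (true)) on udpSocket.receive(packet). Each received datagram is passed through IoUtils.tryDecompress, decoded as UTF-8, parsed into a PushPacket, and then handled according to pushPacket.type.

  • When pushPacket.type is dom or service, it calls hostReactor.processServiceJSON(pushPacket.data) and replies with a push-ack. When the type is dump, it serializes hostReactor.getServiceInfoMap() into the data field of a dump-ack. Any other type gets an unknown-ack. In every case the ack is sent back to the sender's address over the same udpSocket.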
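The dispatch on pushPacket.type and the shape of the ack can be sketched in isolation. This is a minimal illustration, not nacos code: the class PushAckSketch and its methods ackType and buildAck are hypothetical names, and the sketch assumes the data value has already been escaped for embedding in a JSON string (PushReceiver uses StringUtils.escapeJavaScript for the dump case).

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of nacos: mirrors the ack strings built in PushReceiver.run(). */
class PushAckSketch {

    /** Maps a push type to its ack type: push-ack, dump-ack, or unknown-ack. */
    static String ackType(String pushType) {
        if ("dom".equals(pushType) || "service".equals(pushType)) {
            return "push-ack";
        } else if ("dump".equals(pushType)) {
            return "dump-ack";
        }
        return "unknown-ack";
    }

    /** Builds the ack JSON; escapedData must already be safe to place inside a JSON string. */
    static String buildAck(String pushType, long lastRefTime, String escapedData) {
        return "{\"type\": \"" + ackType(pushType) + "\""
            + ", \"lastRefTime\":\"" + lastRefTime
            + "\", \"data\":\"" + escapedData + "\"}";
    }

    public static void main(String[] args) {
        String ack = buildAck("service", 1561000000000L, "");
        // PushReceiver sends these bytes back in a DatagramPacket addressed to packet.getSocketAddress().
        byte[] payload = ack.getBytes(StandardCharsets.UTF_8);
        System.out.println(ack + " (" + payload.length + " bytes)");
    }
}
```

Note that lastRefTime is echoed back as a quoted string rather than a JSON number, which matches the string concatenation in the listing above.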

HostReactor

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java

public class HostReactor {

    private static final long DEFAULT_DELAY = 1000L;

    private static final long UPDATE_HOLD_INTERVAL = 5000L;

    private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();

    private Map<String, ServiceInfo> serviceInfoMap;

    private Map<String, Object> updatingMap;

    private PushReceiver pushReceiver;

    private EventDispatcher eventDispatcher;

    private NamingProxy serverProxy;

    private FailoverReactor failoverReactor;

    private String cacheDir;

    private ScheduledExecutorService executor;

    public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir) {
        this(eventDispatcher, serverProxy, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
    }

    public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
                       boolean loadCacheAtStart, int pollingThreadCount) {

        executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.client.naming.updater");
                return thread;
            }
        });

        this.eventDispatcher = eventDispatcher;
        this.serverProxy = serverProxy;
        this.cacheDir = cacheDir;
        if (loadCacheAtStart) {
            this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
        } else {
            this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
        }

        this.updatingMap = new ConcurrentHashMap<String, Object>();
        this.failoverReactor = new FailoverReactor(this, cacheDir);
        this.pushReceiver = new PushReceiver(this);
    }    

    //......

    public ServiceInfo processServiceJSON(String json) {
        ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class);
        ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
        if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
            //empty or error push, just ignore
            return oldService;
        }

        boolean changed = false;

        if (oldService != null) {

            if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
                NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime()
                    + ", new-t: " + serviceInfo.getLastRefTime());
            }

            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

            Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
            for (Instance host : oldService.getHosts()) {
                oldHostMap.put(host.toInetAddr(), host);
            }

            Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
            for (Instance host : serviceInfo.getHosts()) {
                newHostMap.put(host.toInetAddr(), host);
            }

            Set<Instance> modHosts = new HashSet<Instance>();
            Set<Instance> newHosts = new HashSet<Instance>();
            Set<Instance> remvHosts = new HashSet<Instance>();

            List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
                newHostMap.entrySet());
            for (Map.Entry<String, Instance> entry : newServiceHosts) {
                Instance host = entry.getValue();
                String key = entry.getKey();
                if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(),
                    oldHostMap.get(key).toString())) {
                    modHosts.add(host);
                    continue;
                }

                if (!oldHostMap.containsKey(key)) {
                    newHosts.add(host);
                }
            }

            for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
                Instance host = entry.getValue();
                String key = entry.getKey();
                if (newHostMap.containsKey(key)) {
                    continue;
                }

                if (!newHostMap.containsKey(key)) {
                    remvHosts.add(host);
                }

            }

            if (newHosts.size() > 0) {
                changed = true;
                NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: "
                    + serviceInfo.getKey() + " -> " + JSON.toJSONString(newHosts));
            }

            if (remvHosts.size() > 0) {
                changed = true;
                NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: "
                    + serviceInfo.getKey() + " -> " + JSON.toJSONString(remvHosts));
            }

            if (modHosts.size() > 0) {
                changed = true;
                NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: "
                    + serviceInfo.getKey() + " -> " + JSON.toJSONString(modHosts));
            }

            serviceInfo.setJsonFromServer(json);

            if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
                eventDispatcher.serviceChanged(serviceInfo);
                DiskCache.write(serviceInfo, cacheDir);
            }

        } else {
            changed = true;
            NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JSON
                .toJSONString(serviceInfo.getHosts()));
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
            eventDispatcher.serviceChanged(serviceInfo);
            serviceInfo.setJsonFromServer(json);
            DiskCache.write(serviceInfo, cacheDir);
        }

        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());

        if (changed) {
            NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() +
                " -> " + JSON.toJSONString(serviceInfo.getHosts()));
        }

        return serviceInfo;
    }

    public Map<String, ServiceInfo> getServiceInfoMap() {
        return serviceInfoMap;
    }

    //......
}
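  • processServiceJSON parses the pushed JSON into a ServiceInfo and ignores it if the hosts are null or validation fails. Otherwise it stores the new serviceInfo in serviceInfoMap and compares its hosts with the previous copy, keyed by host.toInetAddr(), to find new, removed, and modified instances. If any of the three sets is non-empty (or no previous copy exists), it calls eventDispatcher.serviceChanged(serviceInfo) and DiskCache.write(serviceInfo, cacheDir).

  • The HostReactor constructor takes a loadCacheAtStart parameter (false by default). When it is true, serviceInfoMap is initialized from local files via DiskCache.read(this.cacheDir); otherwise it starts empty.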
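The host comparison at the heart of processServiceJSON can be reduced to a small map diff. The following is a simplified sketch under stated assumptions, not the nacos implementation: HostDiffSketch is a hypothetical class, map keys stand in for Instance.toInetAddr(), and map values stand in for Instance.toString() (values are assumed non-null).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch, not nacos code: classifies hosts as added, removed, or modified. */
class HostDiffSketch {

    final Set<String> added = new HashSet<>();
    final Set<String> removed = new HashSet<>();
    final Set<String> modified = new HashSet<>();

    /** Keys play the role of Instance.toInetAddr(); values play the role of Instance.toString(). */
    static HostDiffSketch diff(Map<String, String> oldHosts, Map<String, String> newHosts) {
        HostDiffSketch d = new HostDiffSketch();
        for (Map.Entry<String, String> e : newHosts.entrySet()) {
            String old = oldHosts.get(e.getKey());
            if (old == null) {
                d.added.add(e.getKey());        // address not seen before
            } else if (!old.equals(e.getValue())) {
                d.modified.add(e.getKey());     // same address, different attributes
            }
        }
        for (String key : oldHosts.keySet()) {
            if (!newHosts.containsKey(key)) {
                d.removed.add(key);             // address no longer present
            }
        }
        return d;
    }

    /** True when a listener callback and a cache write would be warranted. */
    boolean changed() {
        return !added.isEmpty() || !removed.isEmpty() || !modified.isEmpty();
    }

    public static void main(String[] args) {
        Map<String, String> oldHosts = new HashMap<>();
        oldHosts.put("10.0.0.1:8080", "weight=1.0");
        oldHosts.put("10.0.0.2:8080", "weight=1.0");
        Map<String, String> newHosts = new HashMap<>();
        newHosts.put("10.0.0.1:8080", "weight=2.0");
        newHosts.put("10.0.0.3:8080", "weight=1.0");
        HostDiffSketch d = diff(oldHosts, newHosts);
        System.out.println("added=" + d.added + " removed=" + d.removed + " modified=" + d.modified);
    }
}
```

Comparing string representations, as the original does with host.toString(), means any attribute that appears in that string counts as a modification.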

DiskCache

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/cache/DiskCache.java

public class DiskCache {

    public static void write(ServiceInfo dom, String dir) {

        try {
            makeSureCacheDirExists(dir);


            File file = new File(dir, dom.getKeyEncoded());
            if (!file.exists()) {
                // add another !file.exists() to avoid conflicted creating-new-file from multi-instances
                if (!file.createNewFile() && !file.exists()) {
                    throw new IllegalStateException("failed to create cache file");
                }
            }

            StringBuilder keyContentBuffer = new StringBuilder("");

            String json = dom.getJsonFromServer();

            if (StringUtils.isEmpty(json)) {
                json = JSON.toJSONString(dom);
            }

            keyContentBuffer.append(json);

            //Use the concurrent API to ensure the consistency.
            ConcurrentDiskUtil.writeFileContent(file, keyContentBuffer.toString(), Charset.defaultCharset().toString());

        } catch (Throwable e) {
            NAMING_LOGGER.error("[NA] failed to write cache for dom:" + dom.getName(), e);
        }
    }

    public static String getLineSeparator() {
        return System.getProperty("line.separator");
    }

    public static Map<String, ServiceInfo> read(String cacheDir) {
        Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);

        BufferedReader reader = null;
        try {
            File[] files = makeSureCacheDirExists(cacheDir).listFiles();
            if (files == null || files.length == 0) {
                return domMap;
            }

            for (File file : files) {
                if (!file.isFile()) {
                    continue;
                }

                String fileName = URLDecoder.decode(file.getName(), "UTF-8");

                if (!(fileName.endsWith(Constants.SERVICE_INFO_SPLITER + "meta") || fileName.endsWith(
                    Constants.SERVICE_INFO_SPLITER + "special-url"))) {
                    ServiceInfo dom = new ServiceInfo(fileName);
                    List<Instance> ips = new ArrayList<Instance>();
                    dom.setHosts(ips);

                    ServiceInfo newFormat = null;

                    try {
                        String dataString = ConcurrentDiskUtil.getFileContent(file,
                            Charset.defaultCharset().toString());
                        reader = new BufferedReader(new StringReader(dataString));

                        String json;
                        while ((json = reader.readLine()) != null) {
                            try {
                                if (!json.startsWith("{")) {
                                    continue;
                                }

                                newFormat = JSON.parseObject(json, ServiceInfo.class);

                                if (StringUtils.isEmpty(newFormat.getName())) {
                                    ips.add(JSON.parseObject(json, Instance.class));
                                }
                            } catch (Throwable e) {
                                NAMING_LOGGER.error("[NA] error while parsing cache file: " + json, e);
                            }
                        }
                    } catch (Exception e) {
                        NAMING_LOGGER.error("[NA] failed to read cache for dom: " + file.getName(), e);
                    } finally {
                        try {
                            if (reader != null) {
                                reader.close();
                            }
                        } catch (Exception e) {
                            //ignore
                        }
                    }
                    if (newFormat != null && !StringUtils.isEmpty(newFormat.getName()) && !CollectionUtils.isEmpty(
                        newFormat.getHosts())) {
                        domMap.put(dom.getKey(), newFormat);
                    } else if (!CollectionUtils.isEmpty(dom.getHosts())) {
                        domMap.put(dom.getKey(), dom);
                    }
                }

            }
        } catch (Throwable e) {
            NAMING_LOGGER.error("[NA] failed to read cache file", e);
        }

        return domMap;
    }

    private static File makeSureCacheDirExists(String dir) {
        File cacheDir = new File(dir);
        if (!cacheDir.exists() && !cacheDir.mkdirs()) {
            throw new IllegalStateException("failed to create cache dir: " + dir);
        }

        return cacheDir;
    }
}
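  • DiskCache.write stores a serviceInfo under the dir directory in a file named serviceInfo.getKeyEncoded(). It writes the JSON originally received from the server when available, and falls back to JSON.toJSONString(dom) otherwise. DiskCache.read walks the files in the directory, URL-decodes each file name, skips names ending in meta or special-url, parses the content into ServiceInfo objects, collects them in domMap, and returns it.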
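The one-file-per-service layout can be illustrated with a short round trip. This is a rough sketch, not the nacos implementation: DiskCacheSketch is a hypothetical class, it stores raw JSON strings instead of ServiceInfo objects, it uses plain java.nio file calls instead of ConcurrentDiskUtil, and it assumes the encoded key is simply the URL-encoded service key (read in the listing above URL-decodes file names, which suggests as much).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch, not nacos code: one cache file per service key. */
class DiskCacheSketch {

    /** Writes json to a file whose name is the URL-encoded key, creating dir if needed. */
    static void write(Path dir, String key, String json) {
        try {
            Files.createDirectories(dir);
            String fileName = URLEncoder.encode(key, "UTF-8");
            Files.write(dir.resolve(fileName), json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads every regular file in dir and maps the URL-decoded file name to its JSON content. */
    static Map<String, String> read(Path dir) {
        Map<String, String> result = new HashMap<>();
        try {
            Files.createDirectories(dir);
            try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
                for (Path file : files) {
                    if (!Files.isRegularFile(file)) {
                        continue;
                    }
                    String key = URLDecoder.decode(file.getFileName().toString(), "UTF-8");
                    String json = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
                    if (json.startsWith("{")) {   // skip content that is not a JSON object
                        result.put(key, json);
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("naming-cache-sketch");
        write(dir, "DEFAULT_GROUP@@demo-service", "{\"name\":\"demo-service\"}");
        System.out.println(read(dir));
    }
}
```

URL-encoding the key keeps characters such as @ out of file names, at the cost of needing the matching decode step when the cache is loaded.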

Summary

  • PushReceiver implements Runnable; its constructor creates the udpSocket and a ScheduledThreadPoolExecutor, then submits itself to the executor.

  • Its run method calls udpSocket.receive(packet) in a while (true) loop, parses the received data into a PushPacket, and branches on pushPacket.type.

  • For dom or service it calls hostReactor.processServiceJSON(pushPacket.data); for dump it serializes hostReactor.getServiceInfoMap() into the ack; in all cases it sends the ack back to the sender.
