溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

ZooKeeper與Curator注冊(cè)和監(jiān)控方法

發(fā)布時(shí)間:2021-08-13 19:11:50 來(lái)源:億速云 閱讀:133 作者:chen 欄目:云計(jì)算

這篇文章主要介紹“ZooKeeper與Curator注冊(cè)和監(jiān)控方法”,在日常操作中,相信很多人在ZooKeeper與Curator注冊(cè)和監(jiān)控方法問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”ZooKeeper與Curator注冊(cè)和監(jiān)控方法”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

Curator提供了對(duì)zookeeper客戶端的封裝,并監(jiān)控連接狀態(tài)和會(huì)話session,特別是會(huì)話session過(guò)期后,curator能夠重新連接zookeeper,并且創(chuàng)建一個(gè)新的session。

對(duì)于zk的使用者來(lái)說(shuō),session的概念至關(guān)重要,如果想了解更多session的說(shuō)明,請(qǐng)?jiān)L問(wèn):

http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html

zk客戶端和zk服務(wù)器間主要可能存在下面幾種異常情況:

  1. 短暫失去連接:此時(shí)客戶端檢測(cè)到與服務(wù)端的連接已經(jīng)斷開(kāi),但是服務(wù)端維護(hù)的客戶端session尚未過(guò)期,之后客戶端和服務(wù)端重新建立了連接;當(dāng)客戶端重新連接后,由于session沒(méi)有過(guò)期,zookeeper能夠保證連接恢復(fù)后保持正常服務(wù)。

  2. 失去連接時(shí)間很長(zhǎng):此時(shí)服務(wù)器相對(duì)于客戶端的session已經(jīng)過(guò)期了,與先前session相關(guān)的watcher和ephemeral的路徑和數(shù)據(jù)都會(huì)消失;當(dāng)Curator重新創(chuàng)建了與zk的連接后,會(huì)獲取到session expired異常,Curator會(huì)銷(xiāo)毀先前的session,并且會(huì)創(chuàng)建一個(gè)新的session,需要注意的是,與之前session相關(guān)的watcher和ephemeral類型的路徑和數(shù)據(jù)在新的session中也不會(huì)存在,需要開(kāi)發(fā)者在CuratorFramework.getConnectionStateListenable().addListener()中添加狀態(tài)監(jiān)聽(tīng)事件,對(duì)ConnectionState.LOST事件進(jìn)行監(jiān)聽(tīng),當(dāng)session過(guò)期后,使得之前的session狀態(tài)得以恢復(fù)。對(duì)于ephemeral類型,在客戶端應(yīng)該保持?jǐn)?shù)據(jù)的狀態(tài),以便及時(shí)恢復(fù)。

  3. 客戶端重新啟動(dòng):不論先前的zk session是否已經(jīng)過(guò)期,都需要重新創(chuàng)建臨時(shí)節(jié)點(diǎn)、添加數(shù)據(jù)和watch事件,先前的session也會(huì)在稍后的一段時(shí)間內(nèi)過(guò)期。

  4. Zk服務(wù)器重新啟動(dòng):由于zk將session信息存放到了硬盤(pán)上,因此重啟后,先前未過(guò)期的session仍然存在,在zk服務(wù)器啟動(dòng)后,客戶端與zk服務(wù)器創(chuàng)建新的連接,并使用先前的session,與1相同。

  5. 需要注意的是,當(dāng)session過(guò)期了,在session過(guò)期期間另外的客戶端修改了zk的值,那么這個(gè)修改在客戶端重新連接到zk上時(shí),zk客戶端不會(huì)接收到這個(gè)修改的watch事件(盡管添加了watch),如果需要嚴(yán)格的watch邏輯,就需要在curator的狀態(tài)監(jiān)控中添加邏輯。

特別提示:watcher僅僅是一次性的,zookeeper通知了watcher事件后,就會(huì)將這個(gè)watcher從session中刪除,因此,如果想繼續(xù)監(jiān)控,就要添加新的watcher。

下面提供了對(duì)persistent和ephemeral兩種類型節(jié)點(diǎn)的監(jiān)控方法,其中g(shù)et方法說(shuō)明了persistent節(jié)點(diǎn)如何監(jiān)控,而register方法說(shuō)明了ephemeral類型的節(jié)點(diǎn)如何監(jiān)控。

package demo;

import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.concurrent.ConcurrentSkipListSet;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

public class CuratorTest {
    private CuratorFramework zkTools;
    private ConcurrentSkipListSet<String> watchers = new ConcurrentSkipListSet<String>();
    private static Charset charset = Charset.forName("utf-8");

    public CuratorTest() {
        zkTools = CuratorFrameworkFactory.builder()
                .connectString("192.168.0.216:3306")
                .namespace("zk/test")
                .retryPolicy(new RetryNTimes(2000, 20000))
                .build();
        zkTools.start();
    }

    public void addReconnectionWatcher(final String path, final ZookeeperWatcherType watcherType,
            final CuratorWatcher watcher) {
        synchronized (this) {
            if (!watchers.contains(watcher.toString()))// 不要添加重復(fù)的監(jiān)聽(tīng)事件
            {
                watchers.add(watcher.toString());
                System.out.println("add new watcher " + watcher);
                zkTools.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                    @Override
                    public void stateChanged(CuratorFramework client, ConnectionState newState) {
                        System.out.println(newState);
                        if (newState == ConnectionState.LOST) {// 處理session過(guò)期
                            try {
                                if (watcherType == ZookeeperWatcherType.EXITS) {
                                    zkTools.checkExists().usingWatcher(watcher).forPath(path);
                                } else if (watcherType == ZookeeperWatcherType.GET_CHILDREN) {
                                    zkTools.getChildren().usingWatcher(watcher).forPath(path);
                                } else if (watcherType == ZookeeperWatcherType.GET_DATA) {
                                    zkTools.getData().usingWatcher(watcher).forPath(path);
                                } else if (watcherType == ZookeeperWatcherType.CREATE_ON_NO_EXITS) {
                                    // ephemeral類型的節(jié)點(diǎn)session過(guò)期了,需要重新創(chuàng)建節(jié)點(diǎn),并且注冊(cè)監(jiān)聽(tīng)事件,之后監(jiān)聽(tīng)事件中,
                                    // 會(huì)處理create事件,將路徑值恢復(fù)到先前狀態(tài)
                                    Stat stat = zkTools.checkExists().usingWatcher(watcher)
                                            .forPath(path);
                                    if (stat == null) {
                                        System.err.println("to create");
                                        zkTools.create().creatingParentsIfNeeded()
                                                .withMode(CreateMode.EPHEMERAL)
                                                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
                                    }
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
            }
        }
    }

    public void create() throws Exception {
        zkTools.create()// 創(chuàng)建一個(gè)路徑
                .creatingParentsIfNeeded()// 如果指定的節(jié)點(diǎn)的父節(jié)點(diǎn)不存在,遞歸創(chuàng)建父節(jié)點(diǎn)
                .withMode(CreateMode.PERSISTENT)// 存儲(chǔ)類型(臨時(shí)的還是持久的)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)// 訪問(wèn)權(quán)限
                .forPath("zk/test");// 創(chuàng)建的路徑
    }

    public void put() throws Exception {
        // 對(duì)路徑節(jié)點(diǎn)賦值
        zkTools.setData().forPath("zk/test", "hello world".getBytes(Charset.forName("utf-8")));
    }

    public void get() throws Exception {
        String path = "zk/test";
        ZKWatch watch = new ZKWatch(path);
        byte[] buffer = zkTools.getData().usingWatcher(watch).forPath(path);
        System.out.println(new String(buffer, charset));
        // 添加session過(guò)期的監(jiān)控
        addReconnectionWatcher(path, ZookeeperWatcherType.GET_DATA, watch);
    }

    public void register() throws Exception {
        String ip = InetAddress.getLocalHost().getHostAddress();
        String registeNode = "zk/register/" + ip;// 節(jié)點(diǎn)路徑
        byte[] data = "disable".getBytes(charset);// 節(jié)點(diǎn)值
        CuratorWatcher watcher = new ZKWatchRegister(registeNode, data); // 創(chuàng)建一個(gè)register watcher
        Stat stat = zkTools.checkExists().forPath(registeNode);
        if (stat != null) {
            zkTools.delete().forPath(registeNode);
        }
        zkTools.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(registeNode, data);// 創(chuàng)建的路徑和值
        // 添加到session過(guò)期監(jiān)控事件中
        addReconnectionWatcher(registeNode, ZookeeperWatcherType.CREATE_ON_NO_EXITS, watcher);
        data = zkTools.getData().usingWatcher(watcher).forPath(registeNode);
        System.out.println("get path form zk : " + registeNode + ":" + new String(data, charset));
    }

    public static void main(String[] args) throws Exception {
        CuratorTest test = new CuratorTest();
        test.get();
        test.register();
        Thread.sleep(10000000000L);
    }


    public class ZKWatch implements CuratorWatcher {
        private final String path;

        public String getPath() {
            return path;
        }

        public ZKWatch(String path) {
            this.path = path;
        }

        @Override
        public void process(WatchedEvent event) throws Exception {
            System.out.println(event.getType());
            if (event.getType() == EventType.NodeDataChanged) {
                byte[] data = zkTools.getData().usingWatcher(this).forPath(path);
                System.out.println(path + ":" + new String(data, Charset.forName("utf-8")));
            }
        }
    }

    public class ZKWatchRegister implements CuratorWatcher {
        private final String path;
        private byte[] value;

        public String getPath() {
            return path;
        }

        public ZKWatchRegister(String path, byte[] value) {
            this.path = path;
            this.value = value;
        }

        @Override
        public void process(WatchedEvent event) throws Exception {
            System.out.println(event.getType());
            if (event.getType() == EventType.NodeDataChanged) {
                // 節(jié)點(diǎn)數(shù)據(jù)改變了,需要記錄下來(lái),以便session過(guò)期后,能夠恢復(fù)到先前的數(shù)據(jù)狀態(tài)
                byte[] data = zkTools.getData().usingWatcher(this).forPath(path);
                value = data;
                System.out.println(path + ":" + new String(data, charset));
            } else if (event.getType() == EventType.NodeDeleted) {
                // 節(jié)點(diǎn)被刪除了,需要?jiǎng)?chuàng)建新的節(jié)點(diǎn)
                System.out.println(path + ":" + path + " has been deleted.");
                Stat stat = zkTools.checkExists().usingWatcher(this).forPath(path);
                if (stat == null) {
                    zkTools.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
                }
            } else if (event.getType() == EventType.NodeCreated) {
                // 節(jié)點(diǎn)被創(chuàng)建時(shí),需要添加監(jiān)聽(tīng)事件(創(chuàng)建可能是由于session過(guò)期后,curator的狀態(tài)監(jiān)聽(tīng)部分觸發(fā)的)
                System.out.println(path + ":" + " has been created!" + "the current data is "
                        + new String(value));
                zkTools.setData().forPath(path, value);
                zkTools.getData().usingWatcher(this).forPath(path);
            }
        }
    }
    public enum ZookeeperWatcherType {
        GET_DATA, GET_CHILDREN, EXITS, CREATE_ON_NO_EXITS
    }

}

到此,關(guān)于“ZooKeeper與Curator注冊(cè)和監(jiān)控方法”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI