溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Zookeeper的java實例

發(fā)布時間:2020-07-02 19:10:10 來源:網(wǎng)絡(luò) 閱讀:2615 作者:柴絲言 欄目:開發(fā)技術(shù)

還是在之前的模塊中寫這個例子:

Zookeeper的java實例

注意在pom.xml中加上Zookeeper的依賴,

Zookeeper的java實例

現(xiàn)在開始寫ZookeeperDemo.java

import org.apache.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;


public class ZookeeperDemo implements Watcher{

    Logger logger = Logger.getLogger(ZookeeperDemo.class);

    protected CountDownLatch countDownLatch = new CountDownLatch(1);

    //緩存時間
    private static final int SESSION_TIME = 2000;

    public static ZooKeeper zooKeeper = null;

    /**
     * 監(jiān)控所有被觸發(fā)的事件
     * @param watchedEvent
     */
    public void process(WatchedEvent watchedEvent) {
        logger.info("收到事件通知:" + watchedEvent.getState());
        if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
            countDownLatch.countDown();
        }
    }

    public void connect(String hosts){
        try{
            if(zooKeeper == null){
                //zk客戶端允許我們將ZK服務(wù)的所有地址進行配置
                zooKeeper = new ZooKeeper(hosts,SESSION_TIME,this);
                //使用countDownLatch的await
                countDownLatch.await();
            }

        }catch(IOException e){
            logger.error("連接創(chuàng)建失敗,發(fā)生 IOException :" + e.getMessage());
        } catch (InterruptedException e) {
            logger.error("連接創(chuàng)建失敗,發(fā)生 InterruptedException :" + e.getMessage());
        }
    }

    /**
     * 關(guān)閉連接
     */
    public void close(){
        try {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        }catch (InterruptedException e){
            logger.error("釋放連接錯誤 :"+ e.getMessage());
        }
    }
}

我們詳細解釋一下為什么要有這個類:

這個類是實現(xiàn)了Watcher接口:Watcher機制:目的是為ZK客戶端操作提供一種類似于異步獲取數(shù)據(jù)的操作。采用Watcher方式來完成對節(jié)點狀態(tài)的監(jiān)視,通過對/hotsname節(jié)點的子節(jié)點變化事件的監(jiān)聽來完成這一目標。監(jiān)聽進程是作為一個獨立的服務(wù)或者進程運行的,它覆蓋了 process 方法來實現(xiàn)應(yīng)急措施。

這里面涉及到的類:CountDownLatch:CountDownLatch是一個同步的工具類,允許一個或多個線程一直等待,直到其他線程的操作執(zhí)行完成后再執(zhí)行。在Java并發(fā)中,countdownLatch是一個常見的概念。CountDownLatch是在java1.5被引入的,跟它一起被引入的并發(fā)工具類還有CyclicBarrier、Semaphore、ConcurrentHashMap和BlockingQueue,它們都存在于java.util.concurrent包下。

CountDownLatch這個類能夠使一個線程等待其他線程完成各自的工作后再執(zhí)行。例如,應(yīng)用程序的主線程希望在負責(zé)啟動框架服務(wù)的線程已經(jīng)啟動所有的框架服務(wù)之后再執(zhí)行。CountDownLatch是通過一個計數(shù)器來實現(xiàn)的,計數(shù)器的初始值為線程的數(shù)量。每當(dāng)一個線程完成了自己的任務(wù)后,計數(shù)器的值就會減1。當(dāng)計數(shù)器值到達0時,它表示所有的線程已經(jīng)完成了任務(wù),然后在閉鎖上等待的線程就可以恢復(fù)執(zhí)行任務(wù)。

下面是本例里面用到的CountDownLatch的構(gòu)造方法和其注釋:

/**
 * Constructs a {@code CountDownLatch} initialized with the given count.
 *
 * @param count the number of times {@link #countDown} must be invoked
 *        before threads can pass through {@link #await}
 * @throws IllegalArgumentException if {@code count} is negative
 */
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

這段釋義應(yīng)該是說:以給定的count值來初始化一個countDownLatch對象。其中count是指等待的線程在count個線程執(zhí)行完成后再執(zhí)行。CountDownLatch是通過一個計數(shù)器來實現(xiàn)的,計數(shù)器的初始值為線程的數(shù)量。每當(dāng)一個線程完成了自己的任務(wù)后,計數(shù)器的值就會減1。當(dāng)計數(shù)器值到達0時,它表示所有的線程已經(jīng)完成了任務(wù),然后在閉鎖上等待的線程就可以恢復(fù)執(zhí)行任務(wù)。

那么count(計數(shù)值)實際上就是閉鎖需要等待的線程數(shù)量。

與CountDownLatch的第一次交互是主線程等待其他線程。主線程必須在啟動其他線程后立即調(diào)用CountDownLatch.await()方法。這樣主線程的操作就會在這個方法上阻塞,直到其他線程完成各自的任務(wù)。

其他N 個線程必須引用閉鎖對象,因為他們需要通知CountDownLatch對象,他們已經(jīng)完成了各自的任務(wù)。這種通知機制是通過 CountDownLatch.countDown()方法來完成的;每調(diào)用一次這個方法,在構(gòu)造函數(shù)中初始化的count值就減1。所以當(dāng)N個線程都調(diào) 用了這個方法,count的值等于0,然后主線程就能通過await()方法,恢復(fù)執(zhí)行自己的任務(wù)。

更多CountDownLatch類可以參考:http://www.importnew.com/15731.html


Zookeeper類Zookeeper 中文API 】:http://www.cnblogs.com/ggjucheng/p/3370359.html

下面這是本例用到的Zookeeper的構(gòu)造方法:第一個是主機地址,第二個是會話超時時間、第三個是監(jiān)視者。

ZooKeeper(String connectStringsessionTimeoutWatcher watcher) IOException {
    (connectStringsessionTimeoutwatcher)}


再來看第二個類:ZookeeperOperation.java。確切的說,這個名字改錯了,應(yīng)該叫ZNodeOperation。因為這就是對節(jié)點進行操作的一個類:

import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;

import org.apache.zookeeper.data.Stat;

import java.util.List;

public class ZookeeperOperation {
    Logger logger = Logger.getLogger(ZookeeperOperation.class);
    ZookeeperDemo zookeeperDemo = new ZookeeperDemo();

    /**
     * 創(chuàng)建節(jié)點
     * @param path 節(jié)點路徑
     * @param data  節(jié)點內(nèi)容
     * @return
     */
    public boolean createZNode(String path,String data){
        try {
            String zkPath = ZookeeperDemo.zooKeeper.create(path,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            logger.info("Zookeeper創(chuàng)建節(jié)點成功,節(jié)點地址:" + zkPath);
            return true;
        } catch (KeeperException e) {

            logger.error("創(chuàng)建節(jié)點失敗:" + e.getMessage() + ",path:" + path  ,e);
        } catch (InterruptedException e) {
            logger.error("創(chuàng)建節(jié)點失?。? + e.getMessage() + ",path:" + path  ,e);
        }
        return false;

    }

    /**
     * 刪除一個節(jié)點
     * @param path 節(jié)點路徑
     * @return
     */
    public boolean deteleZKNode(String path){
        try{
            ZookeeperDemo.zooKeeper.delete(path,-1);
            logger.info("Zookeeper刪除節(jié)點1成功,節(jié)點地址:" + path);
            return  true;
        }catch (InterruptedException e){
            logger.error("刪除節(jié)點失?。? + e.getMessage() + ",path:" + path,e);
        }catch (KeeperException e){
            logger.error("刪除節(jié)點失敗:" + e.getMessage() + ",path:" + path,e);
        }
        return false;
    }

    /**
     * 更新節(jié)點內(nèi)容
     * @param path 節(jié)點路徑
     * @param data 節(jié)點數(shù)據(jù)
     * @return
     */
    public boolean updateZKNodeData(String path,String data){
        try {
            Stat stat = ZookeeperDemo.zooKeeper.setData(path,data.getBytes(),-1);
            logger.info("更新節(jié)點數(shù)據(jù)成功,path:" + path+", stat:" + stat);
            return  true;
        } catch (KeeperException e) {
            logger.error("更新節(jié)點數(shù)據(jù)失?。? + e.getMessage() + ",path:" + path ,e);
        } catch (InterruptedException e) {
            logger.error("更新節(jié)點數(shù)據(jù)失?。? + e.getMessage() + ",path:" + path ,e);
        }
        return false;
    }

    /**
     * 讀取指定節(jié)點的內(nèi)容
     * @param path 指定的路徑
     * @return
     */
    public String readData(String path){
        String data=null;
        try {
            data = new String(ZookeeperDemo.zooKeeper.getData(path,false,null));
            logger.info("讀取數(shù)據(jù)成功,其中path:" + path+ ", data-content:" + data);
        } catch (KeeperException e) {
            logger.error( "讀取數(shù)據(jù)失敗,發(fā)生KeeperException! path: " + path + ", errMsg:" + e.getMessage(), e );
        } catch (InterruptedException e) {
            logger.error( "讀取數(shù)據(jù)失敗,InterruptedException! path: " + path + ", errMsg:" + e.getMessage(), e );
        }
      return data;
    }

    /**
     * 獲取某個節(jié)點下的所有節(jié)點
     * @param path 節(jié)點路徑
     * @return
     */
    public List<String> getChild(String path){
        try {
            List<String> list = ZookeeperDemo.zooKeeper.getChildren(path,false);
            if(list.isEmpty()){
                logger.info(path + "的路徑下沒有節(jié)點");
            }
            return list;
        } catch (KeeperException e) {
            logger.error( "讀取子節(jié)點數(shù)據(jù)失敗,發(fā)生KeeperException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        } catch (InterruptedException e) {
            logger.error( "讀取子節(jié)點數(shù)據(jù)失敗,InterruptedException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        }
        return null;
    }

    public boolean isExists(String path){
        try {
            Stat stat = ZookeeperDemo.zooKeeper.exists(path,false);
            return null != stat;
        } catch (KeeperException e) {
            logger.error( "讀取數(shù)據(jù)失敗,發(fā)生KeeperException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        } catch (InterruptedException e) {
            logger.error( "讀取數(shù)據(jù)失敗,發(fā)生InterruptedException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        }
        return  false;
    }
}

其中,有個creatZNode()方法中,有用到:

String zkPath = ZookeeperDemo.zooKeeper.create(path,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

這個方法是這樣的:String create(String path, byte[] data, List<ACL> acl,CreateMode createMode);

創(chuàng)建一個給定的目錄節(jié)點 path, 并給它設(shè)置數(shù)據(jù),CreateMode 標識有四種形式的目錄節(jié)點,分別是 PERSISTENT:持久化目錄節(jié)點,這個目錄節(jié)點存儲的數(shù)據(jù)不會丟失;PERSISTENT_SEQUENTIAL:順序自動編號的目錄節(jié)點,這種目錄節(jié)點會根據(jù)當(dāng)前已近存在的節(jié)點數(shù)自動加 1,然后返回給客戶端已經(jīng)成功創(chuàng)建的目錄節(jié)點名;EPHEMERAL:臨時目錄節(jié)點,一旦創(chuàng)建這個節(jié)點的客戶端與服務(wù)器端口也就是 session 超時,這種節(jié)點會被自動刪除;EPHEMERAL_SEQUENTIAL:臨時自動編號節(jié)點。

更多關(guān)于Zookeeper的中文API,參考這個博客:

http://www.cnblogs.com/ggjucheng/p/3370359.html


第三個類,ZookeeperCliTest.java就是測試啦:

import java.util.List;

public class ZookeeperCliTest {
    public static void main(String[] args){
        //定義父子類節(jié)點路徑
        String rootPath = "/ZookeeperRoot01";
        String childPath2 = rootPath+ "/child101";
        String childPath3 = rootPath+ "/child201";

        //ZookeeperOperation操作API
        ZookeeperOperation zookeeperOperation = new ZookeeperOperation();

        //連接Zookeeper服務(wù)器
        ZookeeperDemo zookeeperDemo =new ZookeeperDemo();
        zookeeperDemo.connect("127.0.0.1:2181");

        //創(chuàng)建節(jié)點
        if(zookeeperOperation.createZNode(rootPath,"<父>父節(jié)點數(shù)據(jù)")){
            System.out.println("節(jié)點 [ " +rootPath + " ],數(shù)據(jù) [ " + zookeeperOperation.readData(rootPath)+" ]");
        }

        // 創(chuàng)建子節(jié)點, 讀取 + 刪除
        if ( zookeeperOperation.createZNode( childPath2, "<父-子(1)>節(jié)點數(shù)據(jù)" ) ) {
            System.out.println( "節(jié)點[" + childPath2 + "]數(shù)據(jù)內(nèi)容[" + zookeeperOperation.readData( childPath2 ) + "]" );
            zookeeperOperation.deteleZKNode(childPath2);
            System.out.println( "節(jié)點[" + childPath2 + "]刪除值后[" + zookeeperOperation.readData( childPath2 ) + "]" );
        }

        // 創(chuàng)建子節(jié)點, 讀取 + 修改
        if ( zookeeperOperation.createZNode(childPath3, "<父-子(2)>節(jié)點數(shù)據(jù)" ) ) {
            System.out.println( "節(jié)點[" + childPath3 + "]數(shù)據(jù)內(nèi)容[" + zookeeperOperation.readData( childPath3 ) + "]" );
            zookeeperOperation.updateZKNodeData(childPath3, "<父-子(2)>節(jié)點數(shù)據(jù),更新后的數(shù)據(jù)" );
            System.out.println( "節(jié)點[" + childPath3+ "]數(shù)據(jù)內(nèi)容更新后[" + zookeeperOperation.readData( childPath3 ) + "]" );
        }

        // 獲取子節(jié)點
        List<String> childPaths = zookeeperOperation.getChild(rootPath);
        if(null != childPaths){
            System.out.println( "節(jié)點[" + rootPath + "]下的子節(jié)點數(shù)[" + childPaths.size() + "]" );
            for(String childPath : childPaths){
                System.out.println(" |--節(jié)點名[" +  childPath +  "]");
            }
        }
        // 判斷節(jié)點是否存在
        System.out.println( "檢測節(jié)點[" + rootPath + "]是否存在:" + zookeeperOperation.isExists(rootPath)  );
        System.out.println( "檢測節(jié)點[" + childPath2 + "]是否存在:" + zookeeperOperation.isExists(childPath2)  );
        System.out.println( "檢測節(jié)點[" + childPath3 + "]是否存在:" + zookeeperOperation.isExists(childPath3)  );


        zookeeperDemo.close();
    }

}

由于開啟了Logger的打印日志到控制臺,所以運行結(jié)果如下:

Zookeeper的java實例

以上是main啟動的打印信息。

Zookeeper的java實例


Zookeeper的java實例

以上的漢字就是我們在代碼中要求Logger打印到控制臺的。

若不想看到這么多信息,可以注釋掉log4j.properties.結(jié)果如下:除了System.out.println()的打印結(jié)果,Logger一個都沒有出現(xiàn)。

Zookeeper的java實例


log4j.properties的內(nèi)容如下:

#logger
log4j.rootLogger=debug,appender1
log4j.appender.appender1=org.apache.log4j.ConsoleAppender
log4j.appender.appender1.layout=org.apache.log4j.TTCCLayout


向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI