溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

apache zookeeper使用方法實(shí)例詳解

發(fā)布時(shí)間:2020-08-25 17:42:10 來(lái)源:腳本之家 閱讀:211 作者:Forhappy && Haippy 欄目:編程語(yǔ)言

本文涉及了Apache Zookeeper使用方法實(shí)例詳解的相關(guān)知識(shí),接下來(lái)我們就看看具體內(nèi)容。

簡(jiǎn)介

Apache Zookeeper 是由 Apache Hadoop 的 Zookeeper 子項(xiàng)目發(fā)展而來(lái),現(xiàn)在已經(jīng)成為了 Apache 的頂級(jí)項(xiàng)目。Zookeeper 為分布式系統(tǒng)提供了高效可靠且易于使用的協(xié)同服務(wù),它可以為分布式應(yīng)用提供相當(dāng)多的服務(wù),諸如統(tǒng)一命名服務(wù),配置管理,狀態(tài)同步和組服務(wù)等。 Zookeeper 接口簡(jiǎn)單,開(kāi)發(fā)人員不必過(guò)多地糾結(jié)在分布式系統(tǒng)編程難于處理的同步和一致性問(wèn)題上,你可以使用 Zookeeper 提供的現(xiàn)成(off-the-shelf)服務(wù)來(lái)實(shí)現(xiàn)分布式系統(tǒng)的配置管理,組管理,Leader 選舉等功能。

英文原文地址:http://zookeeper.apache.org/doc/current/javaExample.html

一個(gè)簡(jiǎn)單的 Zookeeper Watch 客戶端

為了介紹 Zookeeper Java API 的基本用法,本文將帶你如何一步一步實(shí)現(xiàn)一個(gè)功能簡(jiǎn)單的  Zookeeper 客戶端。該 Zookeeper 客戶端會(huì)監(jiān)視一個(gè)你指定 Zookeeper 節(jié)點(diǎn) Znode, 當(dāng)被監(jiān)視的節(jié)點(diǎn)發(fā)生變化時(shí),客戶端會(huì)啟動(dòng)或者停止某一程序。

基本要求

該客戶端具備四個(gè)基本要求:

(1)客戶端所帶參數(shù):

(2)Zookeeper 服務(wù)地址。

(3)被監(jiān)視的 Znode 節(jié)點(diǎn)名稱。

(4)可執(zhí)行程序及其所帶的參數(shù)

客戶端會(huì)獲取被監(jiān)視 Znode 節(jié)點(diǎn)的數(shù)據(jù)并啟動(dòng)你所指定的可執(zhí)行程序。如果被監(jiān)視的 Znode 節(jié)點(diǎn)發(fā)生改變,客戶端重新獲取其內(nèi)容并再次啟動(dòng)你所指定的可執(zhí)行程序。如果被監(jiān)視的 Znode 節(jié)點(diǎn)消失,客戶端會(huì)殺死可執(zhí)行程序。

程序設(shè)計(jì)

一般而言,Zookeeper 應(yīng)用程序分為兩部分,其中一部分維護(hù)與服務(wù)器端的連接,另外一部分監(jiān)視 Znode 節(jié)點(diǎn)的數(shù)據(jù)。在本程序中,Executor 類負(fù)責(zé)維護(hù) Zookeeper 連接,DataMonitor 類監(jiān)視 Zookeeper 目錄樹(shù)中的數(shù)據(jù), 同時(shí),Executor 包含了主線程和程序主要的執(zhí)行邏輯,它負(fù)責(zé)少量的用戶交互,以及與可執(zhí)行程序的交互,該可執(zhí)行程序接受你向它傳入的參數(shù),并且會(huì)根據(jù)被監(jiān)視的 Znode 節(jié)點(diǎn)的狀態(tài)變化停止或重啟。

Executor類

Executor 對(duì)象是本例程最基本的“容器”,它包括Zookeeper 對(duì)象和DataMonitor對(duì)象。

public static void main(String[] args) {
    if (args.length < 4) {
      System.err
          .println("USAGE: Executor hostPort znode filename program [args ...]");
      System.exit(2);
    }
    String hostPort = args[0];
    String znode = args[1];
    String filename = args[2];
    String exec[] = new String[args.length - 3];
    System.arraycopy(args, 3, exec, 0, exec.length);
    try {
      new Executor(hostPort, znode, filename, exec).run();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  public Executor(String hostPort, String znode, String filename,
      String exec[]) throws KeeperException, IOException {
    this.filename = filename;
    this.exec = exec;
    zk = new ZooKeeper(hostPort, 3000, this);
    dm = new DataMonitor(zk, znode, null, this);
  }
  public void run() {
    try {
      synchronized (this) {
        while (!dm.dead) {
          wait();
        }
      }
    } catch (InterruptedException e) {
    }
  }

回憶一下 Executor 的任務(wù)是根據(jù) Zookeeper 中 Znode 節(jié)點(diǎn)狀態(tài)改變所觸發(fā)的事件來(lái)啟動(dòng)和停止你在命令行指定的可執(zhí)行程序, 在上面的代碼你可以看到,Executor 類在其構(gòu)造函數(shù)中實(shí)例化 Zookeeper 對(duì)象時(shí),將其自身的引用作為 Watch 參數(shù)傳遞給 Zookeeper 的構(gòu)造函數(shù),同時(shí)它也將其自身的引用作為 DataMonitorListener 參數(shù)傳遞給 DataMonitor 的構(gòu)造函數(shù)。Executor 本身實(shí)現(xiàn)了以下接口:

public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener {
...

Watcher 接口是在ZooKeeper Java API 中定義的。 ZooKeeper 用它來(lái)與“容器”(此處“容器”與上面的 Executor 類相似)進(jìn)行通信,Watcher 只支持一個(gè)方法,即process(), ZooKeeper 用該函數(shù)來(lái)處理主線程可能感興趣的事件,例如 Zookeeper 連接或會(huì)話的狀態(tài),本例中的“容器” Executor只是簡(jiǎn)單地把事件向下傳遞給 DataMonitor,具體如何處理事件是由 DataMonitor 決定的。本文只是簡(jiǎn)單地描述了如何使用 Watcher,通常情況下,Executor 或 與 Executor 類似的對(duì)象擁有 與Zookeeper 服務(wù)端的連接,但它可以將事件傳遞給其他對(duì)象,并有其它的對(duì)象處理該事件。

public void process(WatchedEvent event) {
    dm.process(event);
  }

DataMonitorListener 接口本身不是Zookeeper API 的一部分,它完全是一個(gè)自定義的接口,可以說(shuō)是專門(mén)為本程序設(shè)計(jì)的。DataMonitor 對(duì)象使用該接口和“容器”(即 Executor 類)進(jìn)行通信,DataMonitorListener 接口如下:

public interface DataMonitorListener {
  /**
  * The existence status of the node has changed.
  */
  void exists(byte data[]);
  /**
  * The ZooKeeper session is no longer valid.
  * 
  * @param rc
  * the ZooKeeper reason code
  */
  void closing(int rc);
}

該接口在 DataMonitor 中定義,Executor 類實(shí)現(xiàn)該接口,當(dāng) Executor.exists() 被調(diào)用的時(shí)候,Executor 決定是否啟動(dòng)或停止事先指定的應(yīng)用程序(回憶一下前文所說(shuō)的,當(dāng) Znode 消失時(shí) Zookeeper 客戶端會(huì)殺死該可執(zhí)行程序)。

當(dāng) Executor.closing() 被調(diào)用的時(shí)候,Executor 會(huì)根據(jù) Zookeeper 連接永久性地消失來(lái)決定是否關(guān)閉自己。

你或許已經(jīng)猜到,DataMonitor 對(duì)象根據(jù) Zookeeper 狀態(tài)變化來(lái)調(diào)用這些方法吧?

以下是 Executor 類中實(shí)現(xiàn) DataMonitorListener.exists() DataMonitorListener.closing()的代碼:

public void exists( byte[] data ) {
  if (data == null) {
    if (child != null) {
      System.out.println("Killing process");
      child.destroy();
      try {
        child.waitFor();
      } catch (InterruptedException e) {
      }
    }
    child = null;
  } else {
    if (child != null) {
      System.out.println("Stopping child");
      child.destroy();
      try {
        child.waitFor();
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
    }
    try {
      FileOutputStream fos = new FileOutputStream(filename);
      fos.write(data);
      fos.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
    try {
      System.out.println("Starting child");
      child = Runtime.getRuntime().exec(exec);
      new StreamWriter(child.getInputStream(), System.out);
      new StreamWriter(child.getErrorStream(), System.err);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
public void closing(int rc) {
  synchronized (this) {
    notifyAll();
  }
}

DataMonitor 類

DataMonitor 類是本程序 Zookeeper 邏輯的核心, 它差不多是異步的,并由事件驅(qū)動(dòng)的。DataMonitor 構(gòu)造函數(shù)如下:

public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
    DataMonitorListener listener) {
  this.zk = zk;
  this.znode = znode;
  this.chainedWatcher = chainedWatcher;
  this.listener = listener;
  // Get things started by checking if the node exists. We are going
  // to be completely event driven
  zk.exists(znode, true, this, null);
}

調(diào)用 ZooKeeper.exists() 檢查指定的 Znode 是否存在,并設(shè)置監(jiān)視,傳遞自身引用作為回調(diào)對(duì)象,在某種意義上,在 watch 觸發(fā)時(shí)就會(huì)引起真實(shí)的處理流程。

當(dāng) ZooKeeper.exists() 操作在服務(wù)器端完成時(shí),ZooKeeper API 會(huì)在客戶端調(diào)用 completion callback

public void processResult(int rc, String path, Object ctx, Stat stat) {
  boolean exists;
  switch (rc) {
  case Code.Ok:
    exists = true;
    break;
  case Code.NoNode:
    exists = false;
    break;
  case Code.SessionExpired:
  case Code.NoAuth:
    dead = true;
    listener.closing(rc);
    return;
  default:
    // Retry errors
    zk.exists(znode, true, this, null);
    return;
  }
  byte b[] = null;
  if (exists) {
    try {
      b = zk.getData(znode, false, null);
    } catch (KeeperException e) {
      // We don't need to worry about recovering now. The watch
      // callbacks will kick off any exception handling
      e.printStackTrace();
    } catch (InterruptedException e) {
      return;
    }
  }   
  if ((b == null && b != prevData)
      || (b != null && !Arrays.equals(prevData, b))) {
    listener.exists(b);
    prevData = b;
  }
}

上述代碼首先檢查 Znode 是否存在,以及其他重大的不可恢復(fù)的錯(cuò)誤。如果文件(或者Znode)存在,它將從 Znode 獲取數(shù)據(jù),如果狀態(tài)發(fā)生變化再調(diào)用 Executor 的 exists() 回調(diào)函數(shù)。注意,getData 函數(shù)本省必須要做任何的異常處理,因?yàn)楸旧砭陀斜O(jiān)視可以處理任何錯(cuò)誤:如果節(jié)點(diǎn)在調(diào)用 ZooKeeper.getData() 之前被刪除,ZooKeeper.exists() 就會(huì)觸發(fā)回調(diào)函數(shù),如果存在通信錯(cuò)誤,在連接上的監(jiān)視會(huì)在該連接重建之前觸發(fā)相應(yīng)的事件,同時(shí)引發(fā)相應(yīng)的處理。

最后,DataMonitor 處理監(jiān)視事件的代碼如下:

public void process(WatchedEvent event) {
    String path = event.getPath();
    if (event.getType() == Event.EventType.None) {
      // We are are being told that the state of the
      // connection has changed
      switch (event.getState()) {
      case SyncConnected:
        // In this particular example we don't need to do anything
        // here - watches are automatically re-registered with 
        // server and any watches triggered while the client was 
        // disconnected will be delivered (in order of course)
        break;
      case Expired:
        // It's all over
        dead = true;
        listener.closing(KeeperException.Code.SessionExpired);
        break;
      }
    } else {
      if (path != null && path.equals(znode)) {
        // Something has changed on the node, let's find out
        zk.exists(znode, true, this, null);
      }
    }
    if (chainedWatcher != null) {
      chainedWatcher.process(event);
    }
  }

如果客戶端 Zookeeper 程序在會(huì)話失效時(shí)(Expired event)重新建立了通信信道(SyncConnected event) ,所有的會(huì)話監(jiān)視會(huì)自動(dòng)和服務(wù)器進(jìn)行重連, (Zookeeper 3.0.0以上版本會(huì)重置之前設(shè)置的監(jiān)視). 更多編程指南請(qǐng)參見(jiàn) ZooKeeper Watches 。 當(dāng) DataMonitor 獲得了指定 Znode 的事件后,它將調(diào)用 ZooKeeper.exists() 來(lái)決定究竟發(fā)生了什么。

完整的程序:

Executor.java:

/**
 * A simple example program to use DataMonitor to start and
 * stop executables based on a znode. The program watches the
 * specified znode and saves the data that corresponds to the
 * znode in the filesystem. It also starts the specified program
 * with the specified arguments when the znode exists and kills
 * the program if the znode goes away.
 */
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class Executor
  implements Watcher, Runnable, DataMonitor.DataMonitorListener
{
  String znode;
  DataMonitor dm;
  ZooKeeper zk;
  String filename;
  String exec[];
  Process child;
  public Executor(String hostPort, String znode, String filename,
      String exec[]) throws KeeperException, IOException {
    this.filename = filename;
    this.exec = exec;
    zk = new ZooKeeper(hostPort, 3000, this);
    dm = new DataMonitor(zk, znode, null, this);
  }
  /**
   * @param args
   */
  public static void main(String[] args) {
    if (args.length < 4) {
      System.err
          .println("USAGE: Executor hostPort znode filename program [args ...]");
      System.exit(2);
    }
    String hostPort = args[0];
    String znode = args[1];
    String filename = args[2];
    String exec[] = new String[args.length - 3];
    System.arraycopy(args, 3, exec, 0, exec.length);
    try {
      new Executor(hostPort, znode, filename, exec).run();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  /***************************************************************************
   * We do process any events ourselves, we just need to forward them on.
   *
   * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent)
   */
  public void process(WatchedEvent event) {
    dm.process(event);
  }
  public void run() {
    try {
      synchronized (this) {
        while (!dm.dead) {
          wait();
        }
      }
    } catch (InterruptedException e) {
    }
  }
  public void closing(int rc) {
    synchronized (this) {
      notifyAll();
    }
  }
  static class StreamWriter extends Thread {
    OutputStream os;
    InputStream is;
    StreamWriter(InputStream is, OutputStream os) {
      this.is = is;
      this.os = os;
      start();
    }
    public void run() {
      byte b[] = new byte[80];
      int rc;
      try {
        while ((rc = is.read(b)) > 0) {
          os.write(b, 0, rc);
        }
      } catch (IOException e) {
      }
    }
  }
  public void exists(byte[] data) {
    if (data == null) {
      if (child != null) {
        System.out.println("Killing process");
        child.destroy();
        try {
          child.waitFor();
        } catch (InterruptedException e) {
        }
      }
      child = null;
    } else {
      if (child != null) {
        System.out.println("Stopping child");
        child.destroy();
        try {
          child.waitFor();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      try {
        FileOutputStream fos = new FileOutputStream(filename);
        fos.write(data);
        fos.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
      try {
        System.out.println("Starting child");
        child = Runtime.getRuntime().exec(exec);
        new StreamWriter(child.getInputStream(), System.out);
        new StreamWriter(child.getErrorStream(), System.err);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}

DataMonitor.java:

/**
 * A simple class that monitors the data and existence of a ZooKeeper
 * node. It uses asynchronous ZooKeeper APIs.
 */
import java.util.Arrays;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
public class DataMonitor implements Watcher, StatCallback {
  ZooKeeper zk;
  String znode;
  Watcher chainedWatcher;
  boolean dead;
  DataMonitorListener listener;
  byte prevData[];
  public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
      DataMonitorListener listener) {
    this.zk = zk;
    this.znode = znode;
    this.chainedWatcher = chainedWatcher;
    this.listener = listener;
    // Get things started by checking if the node exists. We are going
    // to be completely event driven
    zk.exists(znode, true, this, null);
  }
  /**
   * Other classes use the DataMonitor by implementing this method
   */
  public interface DataMonitorListener {
    /**
     * The existence status of the node has changed.
     */
    void exists(byte data[]);
    /**
     * The ZooKeeper session is no longer valid.
     *
     * @param rc
     *        the ZooKeeper reason code
     */
    void closing(int rc);
  }
  public void process(WatchedEvent event) {
    String path = event.getPath();
    if (event.getType() == Event.EventType.None) {
      // We are are being told that the state of the
      // connection has changed
      switch (event.getState()) {
      case SyncConnected:
        // In this particular example we don't need to do anything
        // here - watches are automatically re-registered with 
        // server and any watches triggered while the client was 
        // disconnected will be delivered (in order of course)
        break;
      case Expired:
        // It's all over
        dead = true;
        listener.closing(KeeperException.Code.SessionExpired);
        break;
      }
    } else {
      if (path != null && path.equals(znode)) {
        // Something has changed on the node, let's find out
        zk.exists(znode, true, this, null);
      }
    }
    if (chainedWatcher != null) {
      chainedWatcher.process(event);
    }
  }
  public void processResult(int rc, String path, Object ctx, Stat stat) {
    boolean exists;
    switch (rc) {
    case Code.Ok:
      exists = true;
      break;
    case Code.NoNode:
      exists = false;
      break;
    case Code.SessionExpired:
    case Code.NoAuth:
      dead = true;
      listener.closing(rc);
      return;
    default:
      // Retry errors
      zk.exists(znode, true, this, null);
      return;
    }
    byte b[] = null;
    if (exists) {
      try {
        b = zk.getData(znode, false, null);
      } catch (KeeperException e) {
        // We don't need to worry about recovering now. The watch
        // callbacks will kick off any exception handling
        e.printStackTrace();
      } catch (InterruptedException e) {
        return;
      }
    }
    if ((b == null && b != prevData)
        || (b != null && !Arrays.equals(prevData, b))) {
      listener.exists(b);
      prevData = b;
    }
  }
}

總結(jié)

本文關(guān)于Apache Zookeeper使用方法實(shí)例詳解的介紹就到這里,希望對(duì)大家有所幫助。如果有什么問(wèn)題可以留言,小編會(huì)及時(shí)回復(fù)大家的,感謝大家對(duì)億速云網(wǎng)站的支持!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI