您好,登錄后才能下訂單哦!
本文涉及了Apache Zookeeper使用方法實(shí)例詳解的相關(guān)知識(shí),接下來(lái)我們就看看具體內(nèi)容。
簡(jiǎn)介
Apache Zookeeper 是由 Apache Hadoop 的 Zookeeper 子項(xiàng)目發(fā)展而來(lái),現(xiàn)在已經(jīng)成為了 Apache 的頂級(jí)項(xiàng)目。Zookeeper 為分布式系統(tǒng)提供了高效可靠且易于使用的協(xié)同服務(wù),它可以為分布式應(yīng)用提供相當(dāng)多的服務(wù),諸如統(tǒng)一命名服務(wù),配置管理,狀態(tài)同步和組服務(wù)等。 Zookeeper 接口簡(jiǎn)單,開(kāi)發(fā)人員不必過(guò)多地糾結(jié)在分布式系統(tǒng)編程難于處理的同步和一致性問(wèn)題上,你可以使用 Zookeeper 提供的現(xiàn)成(off-the-shelf)服務(wù)來(lái)實(shí)現(xiàn)分布式系統(tǒng)的配置管理,組管理,Leader 選舉等功能。
英文原文地址:http://zookeeper.apache.org/doc/current/javaExample.html
一個(gè)簡(jiǎn)單的 Zookeeper Watch 客戶端
為了介紹 Zookeeper Java API 的基本用法,本文將帶你如何一步一步實(shí)現(xiàn)一個(gè)功能簡(jiǎn)單的 Zookeeper 客戶端。該 Zookeeper 客戶端會(huì)監(jiān)視一個(gè)你指定 Zookeeper 節(jié)點(diǎn) Znode, 當(dāng)被監(jiān)視的節(jié)點(diǎn)發(fā)生變化時(shí),客戶端會(huì)啟動(dòng)或者停止某一程序。
基本要求
該客戶端具備四個(gè)基本要求:
(1)客戶端所帶參數(shù):
(2)Zookeeper 服務(wù)地址。
(3)被監(jiān)視的 Znode 節(jié)點(diǎn)名稱。
(4)可執(zhí)行程序及其所帶的參數(shù)
客戶端會(huì)獲取被監(jiān)視 Znode 節(jié)點(diǎn)的數(shù)據(jù)并啟動(dòng)你所指定的可執(zhí)行程序。如果被監(jiān)視的 Znode 節(jié)點(diǎn)發(fā)生改變,客戶端重新獲取其內(nèi)容并再次啟動(dòng)你所指定的可執(zhí)行程序。如果被監(jiān)視的 Znode 節(jié)點(diǎn)消失,客戶端會(huì)殺死可執(zhí)行程序。
程序設(shè)計(jì)
一般而言,Zookeeper 應(yīng)用程序分為兩部分,其中一部分維護(hù)與服務(wù)器端的連接,另外一部分監(jiān)視 Znode 節(jié)點(diǎn)的數(shù)據(jù)。在本程序中,Executor 類負(fù)責(zé)維護(hù) Zookeeper 連接,DataMonitor 類監(jiān)視 Zookeeper 目錄樹(shù)中的數(shù)據(jù), 同時(shí),Executor 包含了主線程和程序主要的執(zhí)行邏輯,它負(fù)責(zé)少量的用戶交互,以及與可執(zhí)行程序的交互,該可執(zhí)行程序接受你向它傳入的參數(shù),并且會(huì)根據(jù)被監(jiān)視的 Znode 節(jié)點(diǎn)的狀態(tài)變化停止或重啟。
Executor類
Executor 對(duì)象是本例程最基本的“容器”,它包括Zookeeper 對(duì)象和DataMonitor對(duì)象。
public static void main(String[] args) { if (args.length < 4) { System.err .println("USAGE: Executor hostPort znode filename program [args ...]"); System.exit(2); } String hostPort = args[0]; String znode = args[1]; String filename = args[2]; String exec[] = new String[args.length - 3]; System.arraycopy(args, 3, exec, 0, exec.length); try { new Executor(hostPort, znode, filename, exec).run(); } catch (Exception e) { e.printStackTrace(); } } public Executor(String hostPort, String znode, String filename, String exec[]) throws KeeperException, IOException { this.filename = filename; this.exec = exec; zk = new ZooKeeper(hostPort, 3000, this); dm = new DataMonitor(zk, znode, null, this); } public void run() { try { synchronized (this) { while (!dm.dead) { wait(); } } } catch (InterruptedException e) { } }
回憶一下 Executor 的任務(wù)是根據(jù) Zookeeper 中 Znode 節(jié)點(diǎn)狀態(tài)改變所觸發(fā)的事件來(lái)啟動(dòng)和停止你在命令行指定的可執(zhí)行程序, 在上面的代碼你可以看到,Executor 類在其構(gòu)造函數(shù)中實(shí)例化 Zookeeper 對(duì)象時(shí),將其自身的引用作為 Watch 參數(shù)傳遞給 Zookeeper 的構(gòu)造函數(shù),同時(shí)它也將其自身的引用作為 DataMonitorListener 參數(shù)傳遞給 DataMonitor 的構(gòu)造函數(shù)。Executor 本身實(shí)現(xiàn)了以下接口:
public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener { ...
Watcher 接口是在ZooKeeper Java API 中定義的。 ZooKeeper 用它來(lái)與“容器”(此處“容器”與上面的 Executor 類相似)進(jìn)行通信,Watcher 只支持一個(gè)方法,即process(), ZooKeeper 用該函數(shù)來(lái)處理主線程可能感興趣的事件,例如 Zookeeper 連接或會(huì)話的狀態(tài),本例中的“容器” Executor只是簡(jiǎn)單地把事件向下傳遞給 DataMonitor,具體如何處理事件是由 DataMonitor 決定的。本文只是簡(jiǎn)單地描述了如何使用 Watcher,通常情況下,Executor 或 與 Executor 類似的對(duì)象擁有 與Zookeeper 服務(wù)端的連接,但它可以將事件傳遞給其他對(duì)象,并有其它的對(duì)象處理該事件。
public void process(WatchedEvent event) { dm.process(event); }
DataMonitorListener 接口本身不是Zookeeper API 的一部分,它完全是一個(gè)自定義的接口,可以說(shuō)是專門(mén)為本程序設(shè)計(jì)的。DataMonitor 對(duì)象使用該接口和“容器”(即 Executor 類)進(jìn)行通信,DataMonitorListener 接口如下:
public interface DataMonitorListener { /** * The existence status of the node has changed. */ void exists(byte data[]); /** * The ZooKeeper session is no longer valid. * * @param rc * the ZooKeeper reason code */ void closing(int rc); }
該接口在 DataMonitor 中定義,Executor 類實(shí)現(xiàn)該接口,當(dāng) Executor.exists() 被調(diào)用的時(shí)候,Executor 決定是否啟動(dòng)或停止事先指定的應(yīng)用程序(回憶一下前文所說(shuō)的,當(dāng) Znode 消失時(shí) Zookeeper 客戶端會(huì)殺死該可執(zhí)行程序)。
當(dāng) Executor.closing() 被調(diào)用的時(shí)候,Executor 會(huì)根據(jù) Zookeeper 連接永久性地消失來(lái)決定是否關(guān)閉自己。
你或許已經(jīng)猜到,DataMonitor 對(duì)象根據(jù) Zookeeper 狀態(tài)變化來(lái)調(diào)用這些方法吧?
以下是 Executor 類中實(shí)現(xiàn) DataMonitorListener.exists() 和 DataMonitorListener.closing()的代碼:
public void exists( byte[] data ) { if (data == null) { if (child != null) { System.out.println("Killing process"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { } } child = null; } else { if (child != null) { System.out.println("Stopping child"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { e.printStackTrace(); } } try { FileOutputStream fos = new FileOutputStream(filename); fos.write(data); fos.close(); } catch (IOException e) { e.printStackTrace(); } try { System.out.println("Starting child"); child = Runtime.getRuntime().exec(exec); new StreamWriter(child.getInputStream(), System.out); new StreamWriter(child.getErrorStream(), System.err); } catch (IOException e) { e.printStackTrace(); } } } public void closing(int rc) { synchronized (this) { notifyAll(); } }
DataMonitor 類
DataMonitor 類是本程序 Zookeeper 邏輯的核心, 它差不多是異步的,并由事件驅(qū)動(dòng)的。DataMonitor 構(gòu)造函數(shù)如下:
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher, DataMonitorListener listener) { this.zk = zk; this.znode = znode; this.chainedWatcher = chainedWatcher; this.listener = listener; // Get things started by checking if the node exists. We are going // to be completely event driven zk.exists(znode, true, this, null); }
調(diào)用 ZooKeeper.exists() 檢查指定的 Znode 是否存在,并設(shè)置監(jiān)視,傳遞自身引用作為回調(diào)對(duì)象,在某種意義上,在 watch 觸發(fā)時(shí)就會(huì)引起真實(shí)的處理流程。
當(dāng) ZooKeeper.exists() 操作在服務(wù)器端完成時(shí),ZooKeeper API 會(huì)在客戶端調(diào)用 completion callback:
public void processResult(int rc, String path, Object ctx, Stat stat) { boolean exists; switch (rc) { case Code.Ok: exists = true; break; case Code.NoNode: exists = false; break; case Code.SessionExpired: case Code.NoAuth: dead = true; listener.closing(rc); return; default: // Retry errors zk.exists(znode, true, this, null); return; } byte b[] = null; if (exists) { try { b = zk.getData(znode, false, null); } catch (KeeperException e) { // We don't need to worry about recovering now. The watch // callbacks will kick off any exception handling e.printStackTrace(); } catch (InterruptedException e) { return; } } if ((b == null && b != prevData) || (b != null && !Arrays.equals(prevData, b))) { listener.exists(b); prevData = b; } }
上述代碼首先檢查 Znode 是否存在,以及其他重大的不可恢復(fù)的錯(cuò)誤。如果文件(或者Znode)存在,它將從 Znode 獲取數(shù)據(jù),如果狀態(tài)發(fā)生變化再調(diào)用 Executor 的 exists() 回調(diào)函數(shù)。注意,getData 函數(shù)本省必須要做任何的異常處理,因?yàn)楸旧砭陀斜O(jiān)視可以處理任何錯(cuò)誤:如果節(jié)點(diǎn)在調(diào)用 ZooKeeper.getData() 之前被刪除,ZooKeeper.exists() 就會(huì)觸發(fā)回調(diào)函數(shù),如果存在通信錯(cuò)誤,在連接上的監(jiān)視會(huì)在該連接重建之前觸發(fā)相應(yīng)的事件,同時(shí)引發(fā)相應(yīng)的處理。
最后,DataMonitor 處理監(jiān)視事件的代碼如下:
public void process(WatchedEvent event) { String path = event.getPath(); if (event.getType() == Event.EventType.None) { // We are are being told that the state of the // connection has changed switch (event.getState()) { case SyncConnected: // In this particular example we don't need to do anything // here - watches are automatically re-registered with // server and any watches triggered while the client was // disconnected will be delivered (in order of course) break; case Expired: // It's all over dead = true; listener.closing(KeeperException.Code.SessionExpired); break; } } else { if (path != null && path.equals(znode)) { // Something has changed on the node, let's find out zk.exists(znode, true, this, null); } } if (chainedWatcher != null) { chainedWatcher.process(event); } }
如果客戶端 Zookeeper 程序在會(huì)話失效時(shí)(Expired event)重新建立了通信信道(SyncConnected event) ,所有的會(huì)話監(jiān)視會(huì)自動(dòng)和服務(wù)器進(jìn)行重連, (Zookeeper 3.0.0以上版本會(huì)重置之前設(shè)置的監(jiān)視). 更多編程指南請(qǐng)參見(jiàn) ZooKeeper Watches 。 當(dāng) DataMonitor 獲得了指定 Znode 的事件后,它將調(diào)用 ZooKeeper.exists() 來(lái)決定究竟發(fā)生了什么。
完整的程序:
Executor.java:
/** * A simple example program to use DataMonitor to start and * stop executables based on a znode. The program watches the * specified znode and saves the data that corresponds to the * znode in the filesystem. It also starts the specified program * with the specified arguments when the znode exists and kills * the program if the znode goes away. */ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener { String znode; DataMonitor dm; ZooKeeper zk; String filename; String exec[]; Process child; public Executor(String hostPort, String znode, String filename, String exec[]) throws KeeperException, IOException { this.filename = filename; this.exec = exec; zk = new ZooKeeper(hostPort, 3000, this); dm = new DataMonitor(zk, znode, null, this); } /** * @param args */ public static void main(String[] args) { if (args.length < 4) { System.err .println("USAGE: Executor hostPort znode filename program [args ...]"); System.exit(2); } String hostPort = args[0]; String znode = args[1]; String filename = args[2]; String exec[] = new String[args.length - 3]; System.arraycopy(args, 3, exec, 0, exec.length); try { new Executor(hostPort, znode, filename, exec).run(); } catch (Exception e) { e.printStackTrace(); } } /*************************************************************************** * We do process any events ourselves, we just need to forward them on. * * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent) */ public void process(WatchedEvent event) { dm.process(event); } public void run() { try { synchronized (this) { while (!dm.dead) { wait(); } } } catch (InterruptedException e) { } } public void closing(int rc) { synchronized (this) { notifyAll(); } } static class StreamWriter extends Thread { OutputStream os; InputStream is; StreamWriter(InputStream is, OutputStream os) { this.is = is; this.os = os; start(); } public void run() { byte b[] = new byte[80]; int rc; try { while ((rc = is.read(b)) > 0) { os.write(b, 0, rc); } } catch (IOException e) { } } } public void exists(byte[] data) { if (data == null) { if (child != null) { System.out.println("Killing process"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { } } child = null; } else { if (child != null) { System.out.println("Stopping child"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { e.printStackTrace(); } } try { FileOutputStream fos = new FileOutputStream(filename); fos.write(data); fos.close(); } catch (IOException e) { e.printStackTrace(); } try { System.out.println("Starting child"); child = Runtime.getRuntime().exec(exec); new StreamWriter(child.getInputStream(), System.out); new StreamWriter(child.getErrorStream(), System.err); } catch (IOException e) { e.printStackTrace(); } } } }
DataMonitor.java:
/** * A simple class that monitors the data and existence of a ZooKeeper * node. It uses asynchronous ZooKeeper APIs. */ import java.util.Arrays; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.Stat; public class DataMonitor implements Watcher, StatCallback { ZooKeeper zk; String znode; Watcher chainedWatcher; boolean dead; DataMonitorListener listener; byte prevData[]; public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher, DataMonitorListener listener) { this.zk = zk; this.znode = znode; this.chainedWatcher = chainedWatcher; this.listener = listener; // Get things started by checking if the node exists. We are going // to be completely event driven zk.exists(znode, true, this, null); } /** * Other classes use the DataMonitor by implementing this method */ public interface DataMonitorListener { /** * The existence status of the node has changed. */ void exists(byte data[]); /** * The ZooKeeper session is no longer valid. * * @param rc * the ZooKeeper reason code */ void closing(int rc); } public void process(WatchedEvent event) { String path = event.getPath(); if (event.getType() == Event.EventType.None) { // We are are being told that the state of the // connection has changed switch (event.getState()) { case SyncConnected: // In this particular example we don't need to do anything // here - watches are automatically re-registered with // server and any watches triggered while the client was // disconnected will be delivered (in order of course) break; case Expired: // It's all over dead = true; listener.closing(KeeperException.Code.SessionExpired); break; } } else { if (path != null && path.equals(znode)) { // Something has changed on the node, let's find out zk.exists(znode, true, this, null); } } if (chainedWatcher != null) { chainedWatcher.process(event); } } public void processResult(int rc, String path, Object ctx, Stat stat) { boolean exists; switch (rc) { case Code.Ok: exists = true; break; case Code.NoNode: exists = false; break; case Code.SessionExpired: case Code.NoAuth: dead = true; listener.closing(rc); return; default: // Retry errors zk.exists(znode, true, this, null); return; } byte b[] = null; if (exists) { try { b = zk.getData(znode, false, null); } catch (KeeperException e) { // We don't need to worry about recovering now. The watch // callbacks will kick off any exception handling e.printStackTrace(); } catch (InterruptedException e) { return; } } if ((b == null && b != prevData) || (b != null && !Arrays.equals(prevData, b))) { listener.exists(b); prevData = b; } } }
總結(jié)
本文關(guān)于Apache Zookeeper使用方法實(shí)例詳解的介紹就到這里,希望對(duì)大家有所幫助。如果有什么問(wèn)題可以留言,小編會(huì)及時(shí)回復(fù)大家的,感謝大家對(duì)億速云網(wǎng)站的支持!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。