溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

ZooKeeper Java API編程的示例分析

發(fā)布時(shí)間:2021-08-05 14:40:09 來源:億速云 閱讀:95 作者:小新 欄目:編程語(yǔ)言

這篇文章主要為大家展示了“ZooKeeper Java API編程的示例分析”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“ZooKeeper Java API編程的示例分析”這篇文章吧。

開發(fā)應(yīng)用程序的ZooKeeper Java綁定主要由兩個(gè)Java包組成:

org.apache.zookeeper

org.apache.zookeeper.data

org.apache.zookeeper包由ZooKeeper監(jiān)視的接口定義和ZooKeeper的各種回調(diào)處理程序組成。 它定義了ZooKeeper客戶端類庫(kù)的主要類以及許多ZooKeeper事件類型和狀態(tài)的靜態(tài)定義。 org.apache.zookeeper.data包定義了與數(shù)據(jù)寄存器(也稱為znode)相關(guān)的特性,例如訪問控制列表(ACL),IDs,stats等。

ZooKeeper Java API中的org.apache.zookeeper.server,org.apache.zookeeper.server.quorum和org.apache.zookeeper.server.upgrade包是服務(wù)器實(shí)現(xiàn)的一部分。 org.apache.zookeeper.client包用于查詢ZooKeeper服務(wù)器的狀態(tài)。

一 準(zhǔn)備開發(fā)環(huán)境

Apache ZooKeeper是一個(gè)復(fù)雜的軟件,因此它需要運(yùn)行許多其他類庫(kù)。 依賴庫(kù)作為jar文件在ZooKeeper發(fā)行版中附帶在lib目錄中。 核心ZooKeeper jar文件名字為zookeeper-3.4.6.jar,位于主目錄下。

要開發(fā)Java的ZooKeeper應(yīng)用程序,我們必須設(shè)置指向ZooKeeper jar的類路徑,以及ZooKeeper所依賴的所有第三方庫(kù)。在 bin 目錄下有一個(gè) zkEnv.sh文件,可以用來設(shè)置CLASSPATH。

我們需要將腳本如下設(shè)置,在命令行中執(zhí)行以下語(yǔ)句:

$ ZOOBINDIR=${ZK_HOME}/bin
$ source ${ZOOBINDIR}/zkEnv.sh

shell變量ZK_HOME被設(shè)置為安裝ZooKeeper的路徑,在我的設(shè)置中,它是/usr/share/zookeeper。 之后,CLASSPATH變量被正確設(shè)置,在我的系統(tǒng)中,如下所示:

$ echo $CLASSPATH 
/usr/share/zookeeper-3.4.6/bin/../build/classes :/usr/share/zookeeper-3.4.6/bin/../build/lib/*.jar :/usr/share/zookeeper-3.4.6/bin/../lib/slf4j-log4j12-1.6.1.jar :/usr/share/zookeeper-3.4.6/bin/../lib/slf4j-api-1.6.1.jar :/usr/share/zookeeper-3.4.6/bin/../lib/netty-3.7.0.Final.jar :/usr/share/zookeeper-3.4.6/bin/../lib/log4j-1.2.16.jar :/usr/share/zookeeper-3.4.6/bin/../lib/jline-0.9.94.jar :/usr/share/zookeeper-3.4.6/bin/../zookeeper-3.4.6.jar :/usr/share/zookeeper-3.4.6/bin/../src/java/lib/*.jar :/usr/share/zookeeper-3.4.6/bin/../conf:

在Windows操作系統(tǒng)中,需要運(yùn)行zkEnv.cmd腳本。 現(xiàn)在可以使用CLASSPATH變量來編譯和運(yùn)行使用ZooKeeper API編寫的Java程序。 可以在Uni/Linux中的主目錄的.bashrc文件中找到zkEnv.sh腳本,避免每次啟動(dòng)shell會(huì)話時(shí)都采用它。

二 第一個(gè)ZooKeeper程序

為了引入ZooKeeper Java API,讓我們從一個(gè)非常簡(jiǎn)單的程序開始,它可以連接到localhost中的ZooKeeper實(shí)例,如果連接成功,它將在ZooKeeper名稱空間的根路徑下打印znode的列表。

這個(gè)程序的代碼如下所示:

/*Our First ZooKeeper Program*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
public class HelloZooKeeper {
public static void main(String[] args) throws IOException {
String hostPort = "localhost:2181";
String zpath = "/";
List <String> zooChildren = new ArrayList<String>();
ZooKeeper zk = new ZooKeeper(hostPort, 2000, null);
if (zk != null) {
try {
zooChildren = zk.getChildren(zpath, false);
System.out.println("Znodes of '/': ");
for (String child: zooChildren) {
//print the children
System.out.println(child);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

在構(gòu)建和執(zhí)行前面的代碼片段之前,讓我們來看看它具體做了什么。代碼從導(dǎo)入語(yǔ)句開始。使用這些語(yǔ)句,我們導(dǎo)入了程序各個(gè)組件所需的包。如前所述,org.apache.zookeeper包包含客戶端與ZooKeeper服務(wù)器進(jìn)行交互所需的所有類和接口。在導(dǎo)入包之后,定義了一個(gè)名為HelloZooKeeper的類。由于我們要連接到在同一系統(tǒng)中運(yùn)行的ZooKeeper實(shí)例,在main方法中將主機(jī)和端口字符串定義為localhost:2181。代碼行zk = new ZooKeeper(hostPort, 2000, null)調(diào)用ZooKeeper構(gòu)造方法,該構(gòu)造方法嘗試連接到ZooKeeper服務(wù)器并返回一個(gè)引用。對(duì)于連接到ZooKeeper服務(wù)器實(shí)例并維護(hù)該連接的客戶端程序,需要維護(hù)一個(gè)實(shí)時(shí)會(huì)話。在此例中,構(gòu)造方法實(shí)例化的zk對(duì)象返回的引用表示這個(gè)會(huì)話。 ZooKeeper API是圍繞這個(gè)引用構(gòu)建的,每個(gè)方法調(diào)用都需要一個(gè)引用來執(zhí)行。

ZooKeeper類的構(gòu)造方法使用以下代碼創(chuàng)建ZooKeeper實(shí)例的引用:

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

使用的參數(shù)含義如下:

connectString:以逗號(hào)分隔的主機(jī):端口號(hào)列表,每個(gè)對(duì)應(yīng)一個(gè)ZooKeeper服務(wù)器。 例如,10.0.0.1:2001,10.0.0.2:2002和10.0.0.3:2003表示三個(gè)節(jié)點(diǎn)的ZooKeeper ensemble的有效的主機(jī):端口匹配對(duì)。 sessionTimeout:這是以毫秒為單位的會(huì)話超時(shí)時(shí)間。這是ZooKeeper在宣布session結(jié)束之前,沒有從客戶端獲得心跳的時(shí)間。 watcher:一個(gè)watcher對(duì)象,如果創(chuàng)建,當(dāng)狀態(tài)改變和發(fā)生節(jié)點(diǎn)事件時(shí)會(huì)收到通知。這個(gè)watcher對(duì)象需要通過一個(gè)用戶定義的類單獨(dú)創(chuàng)建,通過實(shí)現(xiàn)Watcher接口并將實(shí)例化的對(duì)象傳遞給ZooKeeper構(gòu)造方法??蛻舳藨?yīng)用程序可以收到各種類型的事件的通知,例如連接丟失、會(huì)話過期等。

ZooKeeper Java API定義了另外帶有三個(gè)參數(shù)的構(gòu)造方法,以指定更高級(jí)的操作。代碼如下:

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)

在ZooKeeper類的上面的構(gòu)造方法中,如果設(shè)置為true,boolean canBeReadOnly參數(shù)允許創(chuàng)建的客戶端在網(wǎng)絡(luò)分區(qū)的情況下進(jìn)入只讀模式。只讀模式是客戶端無法找到任何多數(shù)服務(wù)器的場(chǎng)景,但有一個(gè)可以到達(dá)的分區(qū)服務(wù)器,以只讀模式連接到它,這樣就允許對(duì)服務(wù)器的讀取請(qǐng)求,而寫入請(qǐng)求則不允許??蛻舳死^續(xù)嘗試在后臺(tái)連接到大多數(shù)服務(wù)器,同時(shí)仍然保持只讀模式。分區(qū)服務(wù)器僅僅是ZooKeeper組的一個(gè)子集,它是由于集群中的網(wǎng)絡(luò)分配而形成的。大多數(shù)服務(wù)器構(gòu)成了ensemble中的大多數(shù)quorum。

以下構(gòu)造方法顯示了兩個(gè)附加參數(shù)的定義:

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)

這個(gè)構(gòu)造方法允許ZooKeeper客戶端對(duì)象創(chuàng)建兩個(gè)額外的參數(shù):

sessionId:在客戶端重新連接到ZooKeeper服務(wù)器的情況下,可以使用特定的會(huì)話ID來引用先前連接的會(huì)話 sessionPasswd:如果指定的會(huì)話需要密碼,可以在這里指定

以下構(gòu)造方法是前兩個(gè)調(diào)用的組合:

ZooKeeper(String connectString, int sessionTimeout,Watcher watcher, long sessionId, byte[] sessionPasswd,boolean canBeReadOnly)

此構(gòu)造方法是前兩個(gè)調(diào)用的組合,允許在啟用只讀模式的情況下重新連接到指定的會(huì)話。

Note
ZooKeeper類的詳細(xì)Java API文檔可以在http://zookeeper.apache.org/doc/r3.4.6/api/index.html上查詢。

現(xiàn)在,回到我們的ZooKeeper程序。 在調(diào)用構(gòu)造方法后,如果連接成功,我們將得到ZooKeeper服務(wù)器的引用。 我們通過下面的代碼將引用傳遞給getChildren方法:

zooChildren = zk.getChildren(zpath, false)

ZooKeeper類的getChildren(String path,boolean watch)方法返回給定路徑上znode的子級(jí)列表。 我們只是迭代這個(gè)方法返回的列表,并將字符串打印到控制臺(tái)。

將程序命名為HelloZooKeeper.java,并編譯我們的程序如下:

$ javac -cp $CLASSPATH HelloZooKeeper.java

在我們運(yùn)行的程序之前,需要使用以下命令來啟動(dòng)ZooKeeper服務(wù)器實(shí)例:

$ ${ZK_HOME}/bin/zkServer.sh start

運(yùn)行程序如下:

$ java -cp $CLASSPATH HelloZooKeeper

執(zhí)行程序會(huì)在控制臺(tái)上打印日志消息,顯示所使用的ZooKeeper版本,Java版本,Java類路徑,服務(wù)器體系結(jié)構(gòu)等等。 這里顯示了這些日志消息的一部分:

ZooKeeper Java API編程的示例分析

ZooKeeper Java API生成的日志消息對(duì)調(diào)試非常有用。 它為我們提供了關(guān)于客戶端連接到ZooKeeper服務(wù)器,建立會(huì)話等后臺(tái)得信息。 上面顯示的最后三條日志消息告訴我們客戶端如何使用程序中指定的參數(shù)來啟動(dòng)連接,以及在成功連接后,服務(wù)器如何為客戶端分配會(huì)話ID。

最后,程序執(zhí)行最后在控制臺(tái)中輸出以下內(nèi)容:

ZooKeeper Java API編程的示例分析

我們可以使用ZooKeeper shell來驗(yàn)證程序的正確性:

$ $ZK_HOME/bin/zkCli.sh -server localhost

ZooKeeper Java API編程的示例分析

恭喜! 我們剛剛成功編寫了我們的第一個(gè)ZooKeeper客戶端程序。

二 實(shí)現(xiàn)Watcher接口

ZooKeeper Watcher監(jiān)視使客戶端能夠接收來自ZooKeeper服務(wù)器的通知,并在發(fā)生時(shí)處理這些事件。 ZooKeeper Java API提供了一個(gè)名為Watcher的公共接口,客戶端事件處理程序類必須實(shí)現(xiàn)該接口才能接收有關(guān)來自ZooKeeper服務(wù)器的事件通知。 以編程方式,使用這種客戶端的應(yīng)用程序通過向客戶端注冊(cè)回調(diào)(callback)對(duì)象來處理這些事件。

我們將實(shí)現(xiàn)Watcher接口,處理與znode關(guān)聯(lián)的數(shù)據(jù)更改時(shí)由ZooKeeper生成的事件。

Watcher接口在org.apache.zookeeper包中聲明如下:

public interface Watcher {
void process(WatchedEvent event);
}

為了演示znode數(shù)據(jù)監(jiān)視器(Watcher),有兩個(gè)Java類:DataWatcher和DataUpdater。 DataWatcher將一直運(yùn)行,并在/MyConfig指定znode路徑中偵聽來自ZooKeeper服務(wù)器的NodeDataChange事件。DataUpdater類將定期更新此znode路徑中的數(shù)據(jù)字段,這將生成事件,并且在接收到這些事件后,DataWatcher類將把更改后的數(shù)據(jù)打印到控制臺(tái)上。

以下是DataWatcher.java類的代碼:

import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
public class DataWatcher implements Watcher, Runnable {
private static String hostPort = "localhost:2181";
private static String zooDataPath = "/MyConfig";
byte zoo_data[] = null;
ZooKeeper zk;

public DataWatcher() {
try {
zk = new ZooKeeper(hostPort, 2000, this);
if (zk != null) {
try {
//Create the znode if it doesn't exist, with the following code:
if (zk.exists(zooDataPath, this) == null) {
zk.create(zooDataPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void printData() throws InterruptedException, KeeperException {
zoo_data = zk.getData(zooDataPath, this, null);
String zString = new String(zoo_data);
// The following code prints the current content of the znode to the console:
System.out.printf("\nCurrent Data @ ZK Path %s: %s", zooDataPath, zString);
}
@Override
public void process(WatchedEvent event) {
System.out.printf(
"\nEvent Received: %s", event.toString());
//We will process only events of type NodeDataChanged
if (event.getType() == Event.EventType.NodeDataChanged) {
try {
printData();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args)
throws InterruptedException, KeeperException {
DataWatcher dataWatcher = new DataWatcher();
dataWatcher.printData();
dataWatcher.run();
}
public void run() {
try {
synchronized (this) {
while (true) {
wait();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}

我們來看一下DataWatcher.java類的代碼來理解一個(gè)ZooKeeper監(jiān)視器的實(shí)現(xiàn)。 DataWatcher公共類實(shí)現(xiàn)Watcher接口以及Runnable接口,打算將監(jiān)視器作為線程運(yùn)行。 main方法創(chuàng)建DataWatcher類的一個(gè)實(shí)例。 在前面的代碼中,DataWatcher構(gòu)造方法嘗試連接到在本地主機(jī)上運(yùn)行的ZooKeeper實(shí)例。 如果連接成功,我們用下面的代碼檢查znode路徑/MyConfig是否存在:

if (zk.exists(zooDataPath, this) == null) {

如果znode不存在ZooKeeper命名空間中,那么exists方法調(diào)用將返回null,并且嘗試使用代碼將其創(chuàng)建為持久化znode,如下所示:

zk.create(zooDataPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

接下來是process方法,它在org.apache.ZooKeeper的Watcher接口中聲明,并由DataWatcher類使用以下代碼實(shí)現(xiàn):

public void process(WatchedEvent event) {

為了簡(jiǎn)單起見,在process方法中,打印從ZooKeeper實(shí)例接收的事件,并僅對(duì)NodeDataChanged類型的事件進(jìn)行進(jìn)一步處理,如下所示:

if (event.getType() == Event.EventType.NodeDataChanged)

當(dāng)znode路徑/MyConfig的數(shù)據(jù)字段發(fā)生任何更新或更改而收到NodeDataChanged類型的事件時(shí),調(diào)用printData方法來打印znode的當(dāng)前內(nèi)容。 在znode上執(zhí)行一個(gè)getData調(diào)用時(shí),我們?cè)俅卧O(shè)置一個(gè)監(jiān)視,這是該方法的第二個(gè)參數(shù),如下面的代碼所示:

zoo_data = zk.getData(zooDataPath, this, null);

監(jiān)視事件是發(fā)送給設(shè)置監(jiān)視的客戶端的一次性觸發(fā)器,為了不斷接收進(jìn)一步的事件通知,客戶端應(yīng)該重置監(jiān)視器。

DataUpdater.java是一個(gè)簡(jiǎn)單的類,它連接到運(yùn)行本地主機(jī)的ZooKeeper實(shí)例,并用隨機(jī)字符串更新znode路徑/MyConfig的數(shù)據(jù)字段。 在這里,我們選擇使用通用唯一標(biāo)識(shí)符(UUID)字符串更新znode,因?yàn)楹罄m(xù)的UUID生成器調(diào)用將保證生成唯一的字符串。

DataUpdater.java類代碼如下:

import java.io.IOException;
import java.util.UUID;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class DataUpdater implements Watcher {
private static String hostPort = "localhost:2181";
private static String zooDataPath = "/MyConfig";
ZooKeeper zk;
public DataUpdater() throws IOException {
try {
zk = new ZooKeeper(hostPort, 2000, this);
} catch (IOException e) {
e.printStackTrace();
}
}

// updates the znode path /MyConfig every 5 seconds with a new UUID string.
public void run() throws InterruptedException, KeeperException {
while (true) {
String uuid = UUID.randomUUID().toString();
byte zoo_data[] = uuid.getBytes();
zk.setData(zooDataPath, zoo_data, -1);
try {
Thread.sleep(5000); // Sleep for 5 secs
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

public static void main(String[] args) throws
IOException, InterruptedException, KeeperException {
DataUpdater dataUpdater = new DataUpdater();
dataUpdater.run();
}
@Override
public void process(WatchedEvent event) {
System.out.printf("\nEvent Received: %s", event.toString());
}
}

上面的代碼使ZooKeeper服務(wù)器觸發(fā)一個(gè)NodeDataChanged事件。 由于DataWatcher為此znode路徑設(shè)置了監(jiān)視,因此它會(huì)接收數(shù)據(jù)更改事件的通知。 然后它檢索更新的數(shù)據(jù),重置監(jiān)視,并在控制臺(tái)上打印數(shù)據(jù)。

使用以下命令編譯DataWatcher和DataUpdater類:

$ javac –cp $CLASSPATH DataWatcher.java
$ javac –cp $CLASSPATH DataUpdater.java

要執(zhí)行監(jiān)視器和更新程序,需要打開兩個(gè)終端窗口。 我要先運(yùn)行監(jiān)視器,因?yàn)樗鼊?chuàng)建了/MyConfig的znode(如果還未在ZooKeeper的命名空間中創(chuàng)建的話)。 運(yùn)行監(jiān)視器之前,請(qǐng)確保ZooKeeper服務(wù)器在本地主機(jī)上已經(jīng)運(yùn)行。

在其中一個(gè)終端窗口中,通過運(yùn)行以下命令來執(zhí)行watcher類:

$ java –cp $CLASSPATH DataWatcher

輸出類似于以下屏幕截圖所示的消息:
ZooKeeper Java API編程的示例分析

如前面的截圖所示,znode路徑/MyConfig是由DataWatcher類創(chuàng)建的。 它也打印znode的內(nèi)容,但沒有打印在控制臺(tái)中,因?yàn)槲覀冊(cè)趧?chuàng)建znode時(shí)沒有設(shè)置任何數(shù)據(jù)。 當(dāng)znode被創(chuàng)建時(shí),類中的監(jiān)視者收到了NodeCreated類型的事件通知,這個(gè)通知被打印在控制臺(tái)中。 DataWatcher類繼續(xù)運(yùn)行,并從ZooKeeper服務(wù)器偵聽/MyConfig節(jié)點(diǎn)上的事件。

讓我們?cè)诹硪粋€(gè)終端窗口中運(yùn)行DataUpdater類:

$ java -cp $CLASSPATH DataUpdater

將最初的ZooKeeper特定日志消息記錄到控制臺(tái)后,DataUpdater類運(yùn)行時(shí)沒有提示。 它將一個(gè)新的UUID字符串設(shè)置到ZooKeeper路徑/MyConfig的數(shù)據(jù)字段中。 因此,看到每隔5秒鐘,在下面的屏幕截圖中顯示的輸出內(nèi)容打印在運(yùn)行DataWatche的終端窗口中:

ZooKeeper Java API編程的示例分析

DataWatcher也可以使用ZooKeeper shell進(jìn)行測(cè)試。 繼續(xù)像以前一樣在終端中運(yùn)行DataWatcher類,并在另一個(gè)終端中調(diào)用ZooKeeper shell并運(yùn)行以下屏幕截圖中所示的命令:

ZooKeeper Java API編程的示例分析

在DataWatcher正在運(yùn)行的終端中,將打印以下消息:

ZooKeeper Java API編程的示例分析

三 示例——群集監(jiān)視器

通過互聯(lián)網(wǎng)提供的流行服務(wù),如電子郵件,文件服務(wù)平臺(tái),在線游戲等,都是通過跨越多個(gè)數(shù)據(jù)中心的高度可用的成百上千臺(tái)服務(wù)器來服務(wù)的,而這些服務(wù)器通常在地理位置上分開。 在這種集群中,設(shè)置了一些專用的服務(wù)器節(jié)點(diǎn)來監(jiān)視生產(chǎn)網(wǎng)絡(luò)中承載服務(wù)或應(yīng)用程序的服務(wù)器的活躍性。 在云計(jì)算環(huán)境中,也用于管理云環(huán)境的這種監(jiān)控節(jié)點(diǎn)被稱為云控制器。 這些控制器節(jié)點(diǎn)的一個(gè)重要工作是實(shí)時(shí)檢測(cè)生產(chǎn)服務(wù)器的故障,并相應(yīng)地通知管理員,并采取必要的措施,例如將故障服務(wù)器上的應(yīng)用程序故障轉(zhuǎn)移到另一個(gè)服務(wù)器,從而確保容錯(cuò)性和高可用性。

在本節(jié)中,我們將使用ZooKeeper Java客戶端API開發(fā)一個(gè)簡(jiǎn)約的分布式集群監(jiān)視器模型。 使用ZooKeeper的ephemeral znode概念來構(gòu)建這個(gè)監(jiān)視模型相當(dāng)簡(jiǎn)單和優(yōu)雅,如以下步驟所述:

每個(gè)生產(chǎn)服務(wù)器運(yùn)行一個(gè)ZooKeeper客戶端作為守護(hù)進(jìn)程。 這個(gè)過程連接到ZooKeeper服務(wù)器,并在/ZooKeeper命名空間的預(yù)定義路徑(比如/Members)下創(chuàng)建一個(gè)帶有名稱(最好是其網(wǎng)絡(luò)名稱或主機(jī)名)的ephemeral znode。云控制器節(jié)點(diǎn)運(yùn)行ZooKeeper監(jiān)視器進(jìn)程,該進(jìn)程監(jiān)視路徑/Members并監(jiān)聽NodeChildrenChanged類型的事件。 這個(gè)監(jiān)視器進(jìn)程作為服務(wù)或守護(hù)進(jìn)程運(yùn)行,并設(shè)置或重置路徑上的監(jiān)視,并且實(shí)現(xiàn)其邏輯以調(diào)用適當(dāng)?shù)哪K來為監(jiān)視事件采取必要的行動(dòng)?,F(xiàn)在,如果生產(chǎn)服務(wù)器由于硬件故障或軟件崩潰而關(guān)閉,ZooKeeper客戶端進(jìn)程就會(huì)被終止,導(dǎo)致服務(wù)器和ZooKeeper服務(wù)之間的會(huì)話被終止。 由于ephemeral znode的屬性唯一,每當(dāng)客戶端連接關(guān)閉時(shí),ZooKeeper服務(wù)會(huì)自動(dòng)刪除路徑/Members中的znode。路徑中znode的刪除引發(fā)了NodeChildrenChanged事件,因此云控制器中的觀察器進(jìn)程會(huì)收到通知。 通過調(diào)用路徑/Members中的getChildren方法,可以確定哪個(gè)服務(wù)器節(jié)點(diǎn)已經(jīng)關(guān)閉。然后,控制器節(jié)點(diǎn)可以采取適當(dāng)?shù)拇胧?,比如?zhí)行恢復(fù)邏輯以重啟另一臺(tái)服務(wù)器中的故障服務(wù)。這個(gè)邏輯可以構(gòu)建為實(shí)時(shí)工作,保證接近于零停機(jī)的時(shí)間和高度可用的服務(wù)。

為實(shí)現(xiàn)這個(gè)集群監(jiān)控模型,我們將開發(fā)兩個(gè)Java類。 ClusterMonitor類將持續(xù)運(yùn)行監(jiān)視器,以監(jiān)視ZooKeeper樹中的路徑/Members。 處理完引發(fā)事件后,我們將在控制臺(tái)中打印znode列表并重置監(jiān)視。 另一個(gè)類ClusterClient將啟動(dòng)到ZooKeeper服務(wù)器的連接,在/Members下創(chuàng)建一個(gè)ephemeral znode。

要模擬具有多個(gè)節(jié)點(diǎn)的集群,我們?cè)谕慌_(tái)計(jì)算機(jī)上啟動(dòng)多個(gè)客戶端,并使用客戶端進(jìn)程的進(jìn)程ID創(chuàng)建ephemeral znode。 通過查看進(jìn)程標(biāo)識(shí),ClusterMonitor類可以確定哪個(gè)客戶進(jìn)程已經(jīng)關(guān)閉,哪些進(jìn)程還在。 在實(shí)際情況中,客戶端進(jìn)程通常會(huì)使用當(dāng)前正在運(yùn)行的服務(wù)器的主機(jī)名創(chuàng)建ephemeral znode。 下面顯示了這兩個(gè)類的源代碼。

ClusterMonitor.java類定義如下:

import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class ClusterMonitor implements Runnable {
private static String membershipRoot = "/Members";
private final Watcher connectionWatcher;
private final Watcher childrenWatcher;
private ZooKeeper zk;
boolean alive=true;
public ClusterMonitor(String HostPort) throws IOException, InterruptedException, KeeperException {
connectionWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getType()==Watcher.Event.EventType.None && event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.printf("\nEvent Received: %s", event.toString());
}
}
};
childrenWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.printf("\nEvent Received: %s", event.toString());
if (event.getType() == Event.EventType.NodeChildrenChanged) {
try {
//Get current list of child znode, 
//reset the watch
List<String> children = zk.getChildren( membershipRoot, this);
wall("!!!Cluster Membership Change!!!");
wall("Members: " + children);
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
alive = false;
throw new RuntimeException(e);
}
}
}
};
zk = new ZooKeeper(HostPort, 2000, connectionWatcher);
// Ensure the parent znode exists
if(zk.exists(membershipRoot, false) == null) {
zk.create(membershipRoot, "ClusterMonitorRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

// Set a watch on the parent znode
List<String> children = zk.getChildren(membershipRoot, childrenWatcher);
System.err.println("Members: " + children);
}
public synchronized void close() {
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void wall (String message) {
System.out.printf("\nMESSAGE: %s", message);
}
public void run() {
try {
synchronized (this) {
while (alive) {
wait();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
this.close();
}
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
if (args.length != 1) {
System.err.println("Usage: ClusterMonitor <Host:Port>");
System.exit(0);
}
String hostPort = args[0];
new ClusterMonitor(hostPort).run();
}
}

ClusterClient.java類定義如下:

import java.io.IOException;
import java.lang.management.ManagementFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class ClusterClient implements Watcher, Runnable {
private static String membershipRoot = "/Members";
ZooKeeper zk;
public ClusterClient(String hostPort, Long pid) {
String processId = pid.toString();
try {
zk = new ZooKeeper(hostPort, 2000, this);
} catch (IOException e) {
e.printStackTrace();
}
if (zk != null) {
try {
zk.create(membershipRoot + '/' + processId, processId.getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (
KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
}
public synchronized void close() {
try {
zk.close();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public void process(WatchedEvent event) {
System.out.printf("\nEvent Received: %s", event.toString());
}

public void run() {
try {
synchronized (this) {
while (true) {
wait();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
this.close();
}
}
public static void main(String[] args) {
if (args.length != 1) {
System.err.println("Usage: ClusterClient <Host:Port>");
System.exit(0);
}
String hostPort = args[0];
//Get the process id
String name = ManagementFactory.getRuntimeMXBean().getName();
int index = name.indexOf('@');
Long processId = Long.parseLong(name.substring(0, index));
new ClusterClient(hostPort, processId).run();
}
}

使用下面命令編譯這兩個(gè)類:

$ javac -cp $CLASSPATH ClusterMonitor.java
$ javac -cp $CLASSPATH ClusterClient.java

要執(zhí)行群集監(jiān)控模型,打開兩個(gè)終端。 在其中一個(gè)終端中,運(yùn)行ClusterMonitor類。 在另一個(gè)終端中,通過在后臺(tái)運(yùn)行ClusterClient類來執(zhí)行多個(gè)實(shí)例。

在第一個(gè)終端中,執(zhí)行ClusterMonitor類:

$ java -cp $CLASSPATH ClusterMonitorlocalhost:2181

如前面的示例所示,看到來自客戶端API的調(diào)試日志消息,最后,ClusterMonitor類開始監(jiān)視事件,輸入如下內(nèi)容:
ZooKeeper Java API編程的示例分析

現(xiàn)在,執(zhí)行ClusterClient類的五個(gè)實(shí)例來模擬一個(gè)集群的五個(gè)節(jié)點(diǎn)。ClusterClient在ZooKeeper樹的/Members路徑中使用自己的進(jìn)程ID創(chuàng)建ephemeral znode:

$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[1] 4028
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[2] 4045
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[3] 4057
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[4] 4072
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[5] 4084

與此相對(duì)應(yīng),將觀察到ClusterMonitor類檢測(cè)到這些新的ClusterClient類實(shí)例,因?yàn)樗诒O(jiān)視ZooKeeper樹的/Members路徑上的事件。 這模擬了一個(gè)真正的集群中的節(jié)點(diǎn)加入事件。 可以在ClusterMonitor類的終端中看到輸出,這與下面的截圖中顯示的類似:

ZooKeeper Java API編程的示例分析

現(xiàn)在,如果殺死一個(gè)ClusterClient.java進(jìn)程,那么它與ZooKeeper服務(wù)器一起維護(hù)的會(huì)話將被終止。因此,客戶端創(chuàng)建的ephemeral znode將被刪除。刪除將觸發(fā)NodeChildrenChanged事件,該事件將被ClusterMonitor類捕獲。該模擬在集群中一個(gè)節(jié)點(diǎn)離開的場(chǎng)景。

讓我們用ID 4084終止ClusterClient進(jìn)程:

$ kill -9 4084

以下屏幕截圖顯示了ClusterMonitor類的終端中的輸出。 它列出了當(dāng)前可用的進(jìn)程及其進(jìn)程ID,這些進(jìn)程ID模擬了實(shí)時(shí)服務(wù)器:

ZooKeeper Java API編程的示例分析

上面的簡(jiǎn)單而優(yōu)雅的集群監(jiān)控模型的示例實(shí)現(xiàn)展示了ZooKeeper的真正威力。 在沒有ZooKeeper的情況下,開發(fā)這樣一個(gè)能夠?qū)崟r(shí)監(jiān)控節(jié)點(diǎn)活躍度的模型將是一項(xiàng)真正的艱巨任務(wù)。

以上是“ZooKeeper Java API編程的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI