您好,登錄后才能下訂單哦!
ZooKeeper構(gòu)建配置服務(wù)
* 配置服務(wù)是分布式應(yīng)用所需要的基本服務(wù)之一,它使集群中的機(jī)器可以共享配置信息中那些公共的部分。
* 簡(jiǎn)單的說(shuō),ZooKeeper可以作為一個(gè)具有高可用性的配置存儲(chǔ)器,允許分布式應(yīng)用的參與者檢索和更新配置文件。
* 使用ZooKeeper中的觀察機(jī)制,可以建立一個(gè)活躍的配置服務(wù),使那些感興趣的客戶端能夠獲得配置信息修改的通知。
在每個(gè)znode上存儲(chǔ)一個(gè)鍵值對(duì),ActiveKeyValueStore 提供了從zookeeper服務(wù)上寫和讀取鍵值方法。
public class ActiveKeyValueStore extends ConnectionWatcher{ private static final Charset CHARSET =Charset.forName("GBk"); private static final int MAX_RETRIES = 5; private static final long RETRY_PERIOD_SECONDS = 60; public void write(String path, String value) throws Exception{ int retries = 0; while(true){ try { Stat stat = zk.exists(path, false); if(stat == null){ zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }else{ zk.setData(path, value.getBytes(CHARSET), -1); } } catch (KeeperException.SessionExpiredException e) { throw e; }catch(KeeperException e){ if(retries++ == MAX_RETRIES){ throw e; } TimeUnit.SECONDS.sleep(RETRY_PERIOD_SECONDS); } } } public String read(String path, Watcher watcher) throws Exception{ byte[] data = zk.getData(path, watcher, null); return new String(data, CHARSET); } }
與zookeeper服務(wù)創(chuàng)建連接
public class ConnectionWatcher implements Watcher{ private static final int SESSION_TIMEOUT = 5000; protected ZooKeeper zk; private CountDownLatch connectedSignal = new CountDownLatch(1); public void connect(String hosts) throws Exception{ //創(chuàng)建zookeeper實(shí)例的時(shí)候會(huì)啟動(dòng)一個(gè)線程連接到zookeeper服務(wù)。 zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this); connectedSignal.await(); } //當(dāng)客戶端已經(jīng)與zookeeper建立連接后,Watcher的process方法會(huì)被調(diào)用。 public void process(WatchedEvent event) { if(event.getState() == KeeperState.SyncConnected){ connectedSignal.countDown(); } } public void close() throws Exception{ zk.close(); } }
ResilientConfigUpdater類提供了管理更新配置信息方法。
public class ResilientConfigUpdater { public static final String PATH = "/config"; private ActiveKeyValueStore store; private Random random = new Random(); public ResilientConfigUpdater(String hosts) throws Exception{ store = new ActiveKeyValueStore(); store.connect(hosts); } public void run() throws Exception{ while(true){ String value = random.nextInt(100)+""; store.write(PATH, value); System.out.printf("Set %s to %s\n", PATH, value); TimeUnit.SECONDS.sleep(random.nextInt(10)); } } public static void main(String[] args) throws Exception { while(true){ try{ ResilientConfigUpdater updater = new ResilientConfigUpdater("192.168.44.231"); updater.run(); }catch(KeeperException.SessionExpiredException e){ //start a new session }catch(KeeperException e){ e.printStackTrace(); break; } } } }
ConfigWatcher類提供了配置信息變更觀察器,在信息修改后會(huì)觸發(fā)顯示方法被調(diào)用。
public class ConfigWatcher implements Watcher{ private ActiveKeyValueStore store; public ConfigWatcher(String hosts) throws Exception{ store = new ActiveKeyValueStore(); store.connect(hosts); } public void displayConfig() throws Exception{ String value = store.read(ConfigUpdater.PATH, this); System.out.printf("Read %s as %s\n", ConfigUpdater.PATH, value); } public void process(WatchedEvent event) { // TODO Auto-generated method stub if(event.getType() == EventType.NodeDataChanged){ try { displayConfig(); } catch (Exception e) { System.out.println("Interrupted. Exiting."); Thread.currentThread().interrupt(); } } } public static void main(String[] args) throws Exception { ConfigWatcher watcher = new ConfigWatcher("192.168.44.231"); watcher.displayConfig(); Thread.sleep(Long.MAX_VALUE); } }
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。