溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

ZooKeeper構(gòu)建配置服務(wù)

發(fā)布時(shí)間:2020-07-20 09:11:06 來(lái)源:網(wǎng)絡(luò) 閱讀:377 作者:zlfwmm 欄目:開發(fā)技術(shù)

ZooKeeper構(gòu)建配置服務(wù)

 * 配置服務(wù)是分布式應(yīng)用所需要的基本服務(wù)之一,它使集群中的機(jī)器可以共享配置信息中那些公共的部分。

 * 簡(jiǎn)單的說(shuō),ZooKeeper可以作為一個(gè)具有高可用性的配置存儲(chǔ)器,允許分布式應(yīng)用的參與者檢索和更新配置文件。

 * 使用ZooKeeper中的觀察機(jī)制,可以建立一個(gè)活躍的配置服務(wù),使那些感興趣的客戶端能夠獲得配置信息修改的通知。

在每個(gè)znode上存儲(chǔ)一個(gè)鍵值對(duì),ActiveKeyValueStore 提供了從zookeeper服務(wù)上寫和讀取鍵值方法。

public class ActiveKeyValueStore extends ConnectionWatcher{

	private static final Charset CHARSET =Charset.forName("GBk");
	private static final int MAX_RETRIES = 5;
	private static final long RETRY_PERIOD_SECONDS = 60;
	
	public void write(String path, String value) throws Exception{
		int retries = 0;
		while(true){
			try {
				Stat stat = zk.exists(path, false);
				if(stat == null){
					zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
				}else{
					zk.setData(path, value.getBytes(CHARSET), -1);
				}
			} catch (KeeperException.SessionExpiredException e) {
				throw e;
			}catch(KeeperException e){
				if(retries++ == MAX_RETRIES){
					throw e;
				}
				TimeUnit.SECONDS.sleep(RETRY_PERIOD_SECONDS);
			}
		}
	}
	
	public String read(String path, Watcher watcher) throws Exception{
		byte[] data  = zk.getData(path, watcher, null);
		return new String(data, CHARSET);
	}
}

與zookeeper服務(wù)創(chuàng)建連接

public class ConnectionWatcher implements Watcher{

	private static final int SESSION_TIMEOUT = 5000;
	protected ZooKeeper zk;
	private CountDownLatch connectedSignal = new CountDownLatch(1);
	
	public void connect(String hosts) throws Exception{
		//創(chuàng)建zookeeper實(shí)例的時(shí)候會(huì)啟動(dòng)一個(gè)線程連接到zookeeper服務(wù)。
		zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
		connectedSignal.await();
	}

	//當(dāng)客戶端已經(jīng)與zookeeper建立連接后,Watcher的process方法會(huì)被調(diào)用。
	public void process(WatchedEvent event) {
		if(event.getState() == KeeperState.SyncConnected){
			connectedSignal.countDown();
		}
	}

	public void close() throws Exception{
		zk.close();
	}
}

ResilientConfigUpdater類提供了管理更新配置信息方法。

public class ResilientConfigUpdater {
	
	public static final String PATH = "/config";
	
	private ActiveKeyValueStore store;
	private Random random = new Random();
	
	public ResilientConfigUpdater(String hosts) throws Exception{
		store = new ActiveKeyValueStore();
		store.connect(hosts);
	}
	
	public void run() throws Exception{
		while(true){
			String value = random.nextInt(100)+"";
			store.write(PATH, value);
			
			System.out.printf("Set %s to %s\n", PATH, value);
			TimeUnit.SECONDS.sleep(random.nextInt(10));
		}
	}
	public static void main(String[] args) throws Exception {
		while(true){
			try{
				ResilientConfigUpdater updater = new ResilientConfigUpdater("192.168.44.231");
				updater.run();
			}catch(KeeperException.SessionExpiredException e){
				//start a new session
			}catch(KeeperException e){
				e.printStackTrace();
				break;
			}
		}
	}
}

ConfigWatcher類提供了配置信息變更觀察器,在信息修改后會(huì)觸發(fā)顯示方法被調(diào)用。

public class ConfigWatcher implements Watcher{

	private ActiveKeyValueStore store;
	
	public ConfigWatcher(String hosts) throws Exception{
		store = new ActiveKeyValueStore();
		store.connect(hosts);
	}
	
	public void displayConfig() throws Exception{
		String value = store.read(ConfigUpdater.PATH, this);
		System.out.printf("Read %s as %s\n", ConfigUpdater.PATH, value);
	}
	
	public void process(WatchedEvent event) {
		// TODO Auto-generated method stub
		if(event.getType() == EventType.NodeDataChanged){
			try {
				displayConfig();
			} catch (Exception e) {
				System.out.println("Interrupted. Exiting.");
				Thread.currentThread().interrupt();
			}
		}
	}
	
	public static void main(String[] args) throws Exception {
		ConfigWatcher watcher = new ConfigWatcher("192.168.44.231");
		watcher.displayConfig();
		
		Thread.sleep(Long.MAX_VALUE);
	}
}


向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI