您好,登錄后才能下訂單哦!
為了更好的實(shí)現(xiàn)Java操作zookeeper服務(wù)器,后來出現(xiàn)了Curator框架,非常的強(qiáng)大,目前已經(jīng)是Apache的頂級(jí)項(xiàng)目,里面提供了更多豐富的操作,例如session超時(shí)重連、主從選舉、分布式計(jì)數(shù)器、分布式鎖等等適用于各種復(fù)雜的zookeeper場景的API封裝 |
Curator框架中使用鏈?zhǔn)骄幊田L(fēng)格,易讀性更強(qiáng),使用工廠方法創(chuàng)建連接對(duì)象。 1.使用CuratorFrameworkFactory的兩個(gè)靜態(tài)工廠方法(參數(shù)不同)來實(shí)現(xiàn) 1.1 connectString:連接串 1.2 retryPolicy:重試連接策略。有四種實(shí)現(xiàn),分別是:ExponentialBackoffRetry、RetryNTimes、RetryOneTimes、RetryUntilElapsed 1.3sessionTimeoutMs:會(huì)話超時(shí)時(shí)間,默認(rèn)為60000ms 1.4connectionTimeoutMs連接超時(shí)時(shí)間,默認(rèn)為15000ms
注意對(duì)于retryPolicy策略通過一個(gè)接口來讓用戶自定義實(shí)現(xiàn) |
2.1創(chuàng)建連接
/** 重試策略: 初始時(shí)間為1s, 重試10次 */ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
/** 通過工廠創(chuàng)建連接 */ CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(ZK_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(retryPolicy) .build();
/** 開啟連接 */ cf.start(); |
2.2 新增節(jié)點(diǎn)
/** * 新增節(jié)點(diǎn):指定節(jié)點(diǎn)類型(不加withMode默認(rèn)為持久類型節(jié)點(diǎn))、路徑、數(shù)據(jù)內(nèi)容 * 1.creatingParentsIfNeeded() 遞歸創(chuàng)建父目錄 * 2.withMode() 節(jié)點(diǎn)類型(持久|臨時(shí)) * 3.forPath() 指定路徑 */ cf.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath("/super/c1", "c1內(nèi)容".getBytes()); |
2.3 刪除節(jié)點(diǎn)
/** * 刪除節(jié)點(diǎn) * 1.deletingChildrenIfNeeded() 遞歸刪除 * 2.guaranteed() 確保節(jié)點(diǎn)被刪除 * 3. withVersion(int version) //特定版本號(hào) */ cf.delete().deletingChildrenIfNeeded().forPath("/super"); |
2.4 讀取和修改數(shù)據(jù)
/** * 讀取和修改數(shù)據(jù) : getData()和setData() */ cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1", "c1內(nèi)容".getBytes()); cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2", "c2內(nèi)容".getBytes());
/** 讀取節(jié)點(diǎn)內(nèi)容 */ String c2_data = new String(cf.getData().forPath("/super/c2")); System.out.println("c2_data-->"+c2_data);
/** 修改節(jié)點(diǎn)內(nèi)容 */ cf.setData().forPath("/super/c2", "修改c2的內(nèi)容".getBytes()); String update_c2_data = new String(cf.getData().forPath("/super/c2")); System.out.println("update_c2_data-->"+update_c2_data); |
2.5 綁定回調(diào)函數(shù)
ExecutorService pool = Executors.newCachedThreadPool();
cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .inBackground(new BackgroundCallback() { @Override public void proce***esult(CuratorFramework cf, CuratorEvent event) throws Exception { System.out.println("code-->" + event.getResultCode()); System.out.println("type-->" + event.getType()); System.out.println("線程為-->" + Thread.currentThread().getName()); } }, pool).forPath("/super/c3", "c2的內(nèi)容".getBytes());
System.out.println("主線程-->" + Thread.currentThread().getName());
Thread.sleep(Integer.MAX_VALUE); |
2.6 讀取子節(jié)點(diǎn)和判斷節(jié)點(diǎn)是否存在
/** * 讀取子節(jié)點(diǎn)的方法: getChildren() * 判斷節(jié)點(diǎn)是否存在: checkExists() */ List<String> list = cf.getChildren().forPath("/super"); for (String p: list) { System.out.println(p); }
//如果為null標(biāo)識(shí)不存在 Stat stat = cf.checkExists().forPath("/super/c4"); System.out.println(stat); |
如果要使用類似Wather的監(jiān)聽功能Curator必須依賴一個(gè)jar包,Maven依賴 <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.4.2</version> </dependency> 有了這個(gè)依賴包,使用NodeCache的方式去客戶端實(shí)例中注冊(cè)一個(gè)監(jiān)聽緩存,然后實(shí)現(xiàn)對(duì)應(yīng)的監(jiān)聽方法即可,這里主要有兩種監(jiān)聽方式 NodeCacheListener:監(jiān)聽節(jié)點(diǎn)的新增、修改操作 PathChildrenCacheListener:監(jiān)聽子節(jié)點(diǎn)的新增、修改、刪除操作 |
4.1 分布式鎖
在分布式場景中,為了保證數(shù)據(jù)的一致性,經(jīng)常在程序運(yùn)行的某一個(gè)點(diǎn)需要進(jìn)行同步操作(java提供了synchronized或者Reentrantlock實(shí)現(xiàn))比如看一個(gè)小示例,這個(gè)示例出現(xiàn)分布式不同步的問題 比如:之前是在高并發(fā)下訪問一個(gè)程序,現(xiàn)在則是在高并發(fā)下訪問多個(gè)服務(wù)器節(jié)點(diǎn)(分布式)
使用Curator基于zookeeper的特性提供的分布式鎖來處理分布式場景的數(shù)據(jù)一致性,zookeeper本身的分布式是有寫問題的,之前實(shí)現(xiàn)的時(shí)候遇到過,這里強(qiáng)烈推薦使用Curator分布式鎖 public class Lock2 { /** zk地址 */ private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181"; /** session超時(shí)時(shí)間 */ private static final int SESSION_TIMEOUT = 5000; //MS static int count = 10; public static CuratorFramework createCuratorFramework(){ CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(ZK_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(new ExponentialBackoffRetry(1000, 10)) .build(); return cf; } public static void main(String[] args) throws Exception { final CountDownLatch countDown = new CountDownLatch(1); for (int i =0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { CuratorFramework cf = createCuratorFramework(); cf.start(); //鎖對(duì)象 client 鎖節(jié)點(diǎn) final InterProcessMutex lock = new InterProcessMutex(cf, "/super"); try { countDown.await(); lock.acquire(); //獲得鎖 number(); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release();//釋放鎖 } catch (Exception e) { e.printStackTrace(); } } } },"t" + i).start();; } Thread.sleep(2000); countDown.countDown(); } public static void number() { count--; System.out.println(Thread.currentThread().getName() + "-->" + count); } } |
4.2 分布式計(jì)數(shù)器功能
一說到分布式計(jì)數(shù)器,可能腦海里想到AtomicInteger(原子累加)這種經(jīng)典方式,如果針對(duì)一個(gè)JVM的場景當(dāng)然沒問題,但是現(xiàn)在是在分布式場景下,就需要利用Curator框架的DistributedAtomicInteger了 public class CuratorAtomicInteger { /** zk地址 */ private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181"; /** session超時(shí)時(shí)間 */ private static final int SESSION_TIMEOUT = 5000; //MS public static void main(String[] args) throws Exception { CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(ZK_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(new ExponentialBackoffRetry(1000, 10)) .build(); cf.start(); //使用DistributedAtomicInteger DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(cf, "/superM", new RetryNTimes(3, 1000)); //atomicInteger.increment(); atomicInteger.add(1); AtomicValue<Integer> atomicValue = atomicInteger.get(); System.out.println("atomicValue.succeeded()-->" + atomicValue.succeeded()); System.out.println("atomicValue.postValue()-->" + atomicValue.postValue()); System.out.println("atomicValue.preValue()-->" + atomicValue.preValue()); } } |
4.3 Barrier
4.3.1 DistributedDoubleBarrier
分布式Barrier 類DistributedDoubleBarrier: 它會(huì)阻塞所有節(jié)點(diǎn)上的等待進(jìn)程,直到某一個(gè)被滿足, 然后所有的節(jié)點(diǎn)同時(shí)開始,中間誰先運(yùn)行完畢,誰后運(yùn)行完畢不關(guān)心,但是最終一定是一塊退出運(yùn)行的
public class CuratorBarrier { /** zk地址 */ private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181"; /** session超時(shí)時(shí)間 */ private static final int SESSION_TIMEOUT = 5000; //MS public static void main(String[] args) throws Exception{ for (int i =0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { try { /** 實(shí)例化5個(gè)客戶端對(duì)象 */ CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(ZK_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(new ExponentialBackoffRetry(1000, 10)) .build(); cf.start(); DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/superBarrier", 5); Thread.sleep(1000 * (new Random()).nextInt(3)); System.out.println(Thread.currentThread().getName() + " 已準(zhǔn)備好!"); barrier.enter(); System.out.println("同時(shí)開始運(yùn)行..."); Thread.sleep(1000 * (new Random()).nextInt(3)); System.out.println("運(yùn)行完畢..."); barrier.leave(); System.out.println("同時(shí)退出運(yùn)行..."); } catch (Exception e) { e.printStackTrace(); } } },"t" + i).start();; } } } |
4.3.2 DistributedBarrier
分布式Barrier 類DistributedBarrier: 它會(huì)阻塞所有節(jié)點(diǎn)上的等待進(jìn)程(所有節(jié)點(diǎn)進(jìn)入待執(zhí)行狀態(tài)),直到“某一個(gè)人吹哨”說開始執(zhí)行, 然后所有的節(jié)點(diǎn)同時(shí)開始 public class CuratorBarrier2 { /** zk地址 */ private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181"; /** session超時(shí)時(shí)間 */ private static final int SESSION_TIMEOUT = 5000; //MS static DistributedBarrier barrier = null; public static void main(String[] args) throws Exception{ for (int i =0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { try { /** 實(shí)例化5個(gè)客戶端對(duì)象 */ CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(ZK_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(new ExponentialBackoffRetry(1000, 10)) .build(); cf.start(); barrier = new DistributedBarrier(cf, "/superBarrier"); System.out.println(Thread.currentThread().getName() + " 設(shè)置barrier"); barrier.setBarrier(); //設(shè)置 barrier.waitOnBarrier(); //等待 System.out.println("開始執(zhí)行程序..."); } catch (Exception e) { e.printStackTrace(); } } },"t" + i).start();; } Thread.sleep(5000); barrier.removeBarrier(); //釋放 } } |
Curator內(nèi)部實(shí)現(xiàn)的幾種重試策略: 1.ExponentialBackoffRetry:重試指定的次數(shù), 且每一次重試之間停頓的時(shí)間逐漸增加. 2.RetryNTimes:指定最大重試次數(shù)的重試策略 3.RetryOneTime:僅重試一次 4.RetryUntilElapsed:一直重試直到達(dá)到規(guī)定的時(shí)間 |
5.1 ExponentialBackoffRetry
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
參數(shù)說明 1.baseSleepTimeMs 初始sleep時(shí)間 2.maxRetries 最大重試次數(shù) 3.maxSleepMs 最大重試時(shí)間 |
5.2 RetryNTimes
RetryNTimes(int n, int sleepMsBetweenRetries) 參數(shù)說明 1.n 最大重試次數(shù) 2.sleepMsBetweenRetries 每次重試的間隔時(shí)間 |
5.3 RetryOneTime
RetryOneTime(int sleepMsBetweenRetry)
參數(shù)說明 1.sleepMsBetweenRetry為重試間隔的時(shí)間 |
5.4 RetryUntilElapsed
RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
參數(shù)說明 1.maxElapsedTimeMs 最大重試時(shí)間 2.sleepMsBetweenRetries 每次重試的間隔時(shí)間 |
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。