溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Curator的使用

發(fā)布時(shí)間:2020-07-12 20:13:53 來源:網(wǎng)絡(luò) 閱讀:3219 作者:C_凡夫俗子 欄目:建站服務(wù)器


Curator



為了更好的實(shí)現(xiàn)Java操作zookeeper服務(wù)器,后來出現(xiàn)了Curator框架,非常的強(qiáng)大,目前已經(jīng)是Apache的頂級(jí)項(xiàng)目,里面提供了更多豐富的操作,例如session超時(shí)重連、主從選舉、分布式計(jì)數(shù)器、分布式鎖等等適用于各種復(fù)雜的zookeeper場景的API封裝




1 Curator框架使用(一)

Curator框架中使用鏈?zhǔn)骄幊田L(fēng)格,易讀性更強(qiáng),使用工廠方法創(chuàng)建連接對(duì)象。

1.使用CuratorFrameworkFactory的兩個(gè)靜態(tài)工廠方法(參數(shù)不同)來實(shí)現(xiàn)

1.1 connectString:連接串

1.2 retryPolicy:重試連接策略。有四種實(shí)現(xiàn),分別是:ExponentialBackoffRetry、RetryNTimes、RetryOneTimes、RetryUntilElapsed

1.3sessionTimeoutMs:會(huì)話超時(shí)時(shí)間,默認(rèn)為60000ms

1.4connectionTimeoutMs連接超時(shí)時(shí)間,默認(rèn)為15000ms

 

注意對(duì)于retryPolicy策略通過一個(gè)接口來讓用戶自定義實(shí)現(xiàn)




2 Curator框架使用(二)

2.1創(chuàng)建連接

/** 重試策略: 初始時(shí)間為1s, 重試10次 */

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);

 

/** 通過工廠創(chuàng)建連接 */

CuratorFramework cf = CuratorFrameworkFactory.builder()

.connectString(ZK_ADDR)

.sessionTimeoutMs(SESSION_TIMEOUT)

.retryPolicy(retryPolicy)

.build();

 

/** 開啟連接 */

cf.start();



2.2 新增節(jié)點(diǎn)

/**

 * 新增節(jié)點(diǎn):指定節(jié)點(diǎn)類型(不加withMode默認(rèn)為持久類型節(jié)點(diǎn))、路徑、數(shù)據(jù)內(nèi)容

 * 1.creatingParentsIfNeeded() 遞歸創(chuàng)建父目錄

 * 2.withMode() 節(jié)點(diǎn)類型(持久|臨時(shí))

 * 3.forPath() 指定路徑

 */

cf.create()

.creatingParentsIfNeeded()

.withMode(CreateMode.PERSISTENT)

.forPath("/super/c1", "c1內(nèi)容".getBytes());



2.3 刪除節(jié)點(diǎn)

/**

 * 刪除節(jié)點(diǎn)

 * 1.deletingChildrenIfNeeded() 遞歸刪除

 * 2.guaranteed() 確保節(jié)點(diǎn)被刪除

 * 3. withVersion(int version) //特定版本號(hào)  

 */

cf.delete().deletingChildrenIfNeeded().forPath("/super");



2.4 讀取和修改數(shù)據(jù)

/**

 * 讀取和修改數(shù)據(jù) : getData()和setData()

 */

cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1", "c1內(nèi)容".getBytes());

cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2", "c2內(nèi)容".getBytes());

 

/** 讀取節(jié)點(diǎn)內(nèi)容 */

String c2_data = new String(cf.getData().forPath("/super/c2"));

System.out.println("c2_data-->"+c2_data);

 

/** 修改節(jié)點(diǎn)內(nèi)容 */

cf.setData().forPath("/super/c2", "修改c2的內(nèi)容".getBytes());

String update_c2_data = new String(cf.getData().forPath("/super/c2"));

System.out.println("update_c2_data-->"+update_c2_data);



2.5 綁定回調(diào)函數(shù)

ExecutorService pool = Executors.newCachedThreadPool();

 

cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)

.inBackground(new BackgroundCallback() {


@Override

public void proce***esult(CuratorFramework cf, CuratorEvent event)

throws Exception {

System.out.println("code-->" + event.getResultCode());

System.out.println("type-->" + event.getType());

System.out.println("線程為-->" + Thread.currentThread().getName());

}

}, pool).forPath("/super/c3", "c2的內(nèi)容".getBytes());

 

System.out.println("主線程-->" + Thread.currentThread().getName());

 

Thread.sleep(Integer.MAX_VALUE);



2.6 讀取子節(jié)點(diǎn)和判斷節(jié)點(diǎn)是否存在

/**

 * 讀取子節(jié)點(diǎn)的方法: getChildren()

 * 判斷節(jié)點(diǎn)是否存在: checkExists()

 */

List<String> list = cf.getChildren().forPath("/super");

for (String p: list) {

System.out.println(p);

}

 

//如果為null標(biāo)識(shí)不存在

Stat stat = cf.checkExists().forPath("/super/c4");

System.out.println(stat);



3 Curator框架使用(三)

如果要使用類似Wather的監(jiān)聽功能Curator必須依賴一個(gè)jar包,Maven依賴

<dependency>

<groupId>org.apache.curator</groupId>

<artifactId>curator-recipes</artifactId>

<version>2.4.2</version>

</dependency>

有了這個(gè)依賴包,使用NodeCache的方式去客戶端實(shí)例中注冊(cè)一個(gè)監(jiān)聽緩存,然后實(shí)現(xiàn)對(duì)應(yīng)的監(jiān)聽方法即可,這里主要有兩種監(jiān)聽方式

NodeCacheListener:監(jiān)聽節(jié)點(diǎn)的新增、修改操作

PathChildrenCacheListener:監(jiān)聽子節(jié)點(diǎn)的新增、修改、刪除操作



4 Curator使用場景

4.1 分布式鎖

在分布式場景中,為了保證數(shù)據(jù)的一致性,經(jīng)常在程序運(yùn)行的某一個(gè)點(diǎn)需要進(jìn)行同步操作(java提供了synchronized或者Reentrantlock實(shí)現(xiàn))比如看一個(gè)小示例,這個(gè)示例出現(xiàn)分布式不同步的問題

比如:之前是在高并發(fā)下訪問一個(gè)程序,現(xiàn)在則是在高并發(fā)下訪問多個(gè)服務(wù)器節(jié)點(diǎn)(分布式)

 

使用Curator基于zookeeper的特性提供的分布式鎖來處理分布式場景的數(shù)據(jù)一致性,zookeeper本身的分布式是有寫問題的,之前實(shí)現(xiàn)的時(shí)候遇到過,這里強(qiáng)烈推薦使用Curator分布式鎖

public class Lock2 {
/** zk地址 */
private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";
/** session超時(shí)時(shí)間 */
private static final int SESSION_TIMEOUT = 5000; //MS
static int count = 10;
public static CuratorFramework createCuratorFramework(){
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(ZK_ADDR)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
return cf;
}
public static void main(String[] args) throws Exception {
final CountDownLatch countDown = new CountDownLatch(1);
for (int i =0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
CuratorFramework cf = createCuratorFramework();
cf.start();
//鎖對(duì)象 client 鎖節(jié)點(diǎn)
final InterProcessMutex lock = new InterProcessMutex(cf, "/super");
try {
countDown.await();
lock.acquire(); //獲得鎖
number();
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();//釋放鎖
} catch (Exception e) {
e.printStackTrace();
}
}
}
},"t" + i).start();;
}
Thread.sleep(2000);
countDown.countDown();
}
 
public static void number() {
count--;
System.out.println(Thread.currentThread().getName() + "-->" + count);
}
}




4.2 分布式計(jì)數(shù)器功能

一說到分布式計(jì)數(shù)器,可能腦海里想到AtomicInteger(原子累加)這種經(jīng)典方式,如果針對(duì)一個(gè)JVM的場景當(dāng)然沒問題,但是現(xiàn)在是在分布式場景下,就需要利用Curator框架的DistributedAtomicInteger了

public class CuratorAtomicInteger {
/** zk地址 */
private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";
/** session超時(shí)時(shí)間 */
private static final int SESSION_TIMEOUT = 5000; //MS
public static void main(String[] args) throws Exception {
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(ZK_ADDR)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
cf.start();
//使用DistributedAtomicInteger
DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(cf, "/superM", new RetryNTimes(3, 1000));
//atomicInteger.increment();
atomicInteger.add(1);
AtomicValue<Integer> atomicValue = atomicInteger.get();
System.out.println("atomicValue.succeeded()-->" + atomicValue.succeeded());
System.out.println("atomicValue.postValue()-->" + atomicValue.postValue());
System.out.println("atomicValue.preValue()-->" + atomicValue.preValue());
}
}





4.3 Barrier

4.3.1 DistributedDoubleBarrier

分布式Barrier 類DistributedDoubleBarrier: 它會(huì)阻塞所有節(jié)點(diǎn)上的等待進(jìn)程,直到某一個(gè)被滿足, 然后所有的節(jié)點(diǎn)同時(shí)開始,中間誰先運(yùn)行完畢,誰后運(yùn)行完畢不關(guān)心,但是最終一定是一塊退出運(yùn)行的

 

public class CuratorBarrier {
/** zk地址 */
private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";
/** session超時(shí)時(shí)間 */
private static final int SESSION_TIMEOUT = 5000; //MS
public static void main(String[] args) throws Exception{
for (int i =0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
/** 實(shí)例化5個(gè)客戶端對(duì)象 */
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(ZK_ADDR)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
cf.start();
DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/superBarrier", 5);
Thread.sleep(1000 * (new Random()).nextInt(3));
System.out.println(Thread.currentThread().getName() + " 已準(zhǔn)備好!");
barrier.enter();
System.out.println("同時(shí)開始運(yùn)行...");
Thread.sleep(1000 * (new Random()).nextInt(3));
System.out.println("運(yùn)行完畢...");
barrier.leave();
System.out.println("同時(shí)退出運(yùn)行...");
} catch (Exception e) {
e.printStackTrace();
}
}
},"t" + i).start();;
}
}
}





4.3.2 DistributedBarrier

分布式Barrier 類DistributedBarrier: 它會(huì)阻塞所有節(jié)點(diǎn)上的等待進(jìn)程(所有節(jié)點(diǎn)進(jìn)入待執(zhí)行狀態(tài)),直到“某一個(gè)人吹哨”說開始執(zhí)行, 然后所有的節(jié)點(diǎn)同時(shí)開始

public class CuratorBarrier2 {
/** zk地址 */
private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";
/** session超時(shí)時(shí)間 */
private static final int SESSION_TIMEOUT = 5000; //MS
static DistributedBarrier barrier = null;
public static void main(String[] args) throws Exception{
for (int i =0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
/** 實(shí)例化5個(gè)客戶端對(duì)象 */
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(ZK_ADDR)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
cf.start();
barrier = new DistributedBarrier(cf, "/superBarrier");
System.out.println(Thread.currentThread().getName() + " 設(shè)置barrier");
barrier.setBarrier(); //設(shè)置
barrier.waitOnBarrier(); //等待
System.out.println("開始執(zhí)行程序...");
} catch (Exception e) {
e.printStackTrace();
}
}
},"t" + i).start();;
}
Thread.sleep(5000);
barrier.removeBarrier(); //釋放
}
}





5 Curator重試策略

Curator內(nèi)部實(shí)現(xiàn)的幾種重試策略:

1.ExponentialBackoffRetry:重試指定的次數(shù), 且每一次重試之間停頓的時(shí)間逐漸增加.

2.RetryNTimes:指定最大重試次數(shù)的重試策略   

3.RetryOneTime:僅重試一次

4.RetryUntilElapsed:一直重試直到達(dá)到規(guī)定的時(shí)間



5.1 ExponentialBackoffRetry

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

 

參數(shù)說明

   1.baseSleepTimeMs 初始sleep時(shí)間

   2.maxRetries 最大重試次數(shù)

   3.maxSleepMs 最大重試時(shí)間 




5.2 RetryNTimes

RetryNTimes(int n, int sleepMsBetweenRetries)


參數(shù)說明

   1.n 最大重試次數(shù)

   2.sleepMsBetweenRetries 每次重試的間隔時(shí)間

 



5.3 RetryOneTime

RetryOneTime(int sleepMsBetweenRetry)

 

參數(shù)說明

   1.sleepMsBetweenRetry為重試間隔的時(shí)間

 

5.4 RetryUntilElapsed

RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)

 

參數(shù)說明

   1.maxElapsedTimeMs 最大重試時(shí)間

   2.sleepMsBetweenRetries 每次重試的間隔時(shí)間

 

 

 













向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI