您好,登錄后才能下訂單哦!
(1)zookeeper如何實現(xiàn)分布式鎖?
(2)zookeeper分布式鎖有哪些優(yōu)點?
(3)zookeeper分布式鎖有哪些缺點?
zooKeeper是一個分布式的,開放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù),它可以為分布式應(yīng)用提供一致性的服務(wù),它是Hadoop和Hbase的重要組件,同時也可以作為配置中心、注冊中心運用在微服務(wù)體系中。
本章我們將介紹zookeeper如何實現(xiàn)分布式鎖運用在分布式系統(tǒng)中。
zooKeeper操作和維護(hù)的為一個個數(shù)據(jù)節(jié)點,稱為 znode,采用類似文件系統(tǒng)的層級樹狀結(jié)構(gòu)進(jìn)行管理,如果 znode 節(jié)點包含數(shù)據(jù)則存儲為字節(jié)數(shù)組(byte array)。
而且,同一個節(jié)點多個客戶同時創(chuàng)建,本文由公從號“彤哥讀源碼”原創(chuàng),只有一個客戶端會成功,其它客戶端創(chuàng)建時將失敗。
znode 共有四種類型:
持久(無序)
持久有序
臨時(無序)
其中,持久節(jié)點如果不手動刪除會一直存在,臨時節(jié)點當(dāng)客戶端session失效就會自動刪除節(jié)點。
watcher(事件監(jiān)聽器),是zookeeper中的一個很重要的特性。
zookeeper允許用戶在指定節(jié)點上注冊一些watcher,并且在一些特定事件觸發(fā)的時候,zooKeeper服務(wù)端會將事件通知到感興趣的客戶端上去,該機制是Zookeeper實現(xiàn)分布式協(xié)調(diào)服務(wù)的重要特性。
KeeperState | EventType | 觸發(fā)條件 | 說明 | 操作 |
---|---|---|---|---|
SyncConnected(3) | None(-1) | 客戶端與服務(wù)端成功建立連接 | 此時客戶端和服務(wù)器處于連接狀態(tài) | - |
同上 | NodeCreated(1) | Watcher監(jiān)聽的對應(yīng)數(shù)據(jù)節(jié)點被創(chuàng)建 | 同上 | Create |
同上 | NodeDeleted(2) | Watcher監(jiān)聽的對應(yīng)數(shù)據(jù)節(jié)點被刪除 | 同上 | Delete/znode |
同上 | NodeDataChanged(3) | Watcher監(jiān)聽的對應(yīng)數(shù)據(jù)節(jié)點的數(shù)據(jù)內(nèi)容發(fā)生變更 | 同上 | setDate/znode |
同上 | NodeChildChanged(4) | Wather監(jiān)聽的對應(yīng)數(shù)據(jù)節(jié)點的子節(jié)點列表發(fā)生變更 | 同上 | Create/child |
Disconnected(0) | None(-1) | 客戶端與ZooKeeper服務(wù)器斷開連接 | 此時客戶端和服務(wù)器處于斷開連接狀態(tài) | - |
Expired(-112) | None(-1) | 會話超時 | 此時客戶端會話失效,通常同時也會受到SessionExpiredException異常 | - |
AuthFailed(4) | None(-1) | 通常有兩種情況,1:使用錯誤的schema進(jìn)行權(quán)限檢查 2:SASL權(quán)限檢查失敗 | 通常同時也會收到AuthFailedException異常 | - |
既然,同一個節(jié)點只能創(chuàng)建一次,那么,加鎖時檢測節(jié)點是否存在,不存在則創(chuàng)建之,存在或者創(chuàng)建失敗則監(jiān)聽這個節(jié)點的刪除事件,這樣,當(dāng)釋放鎖的時候監(jiān)聽的客戶端再次競爭去創(chuàng)建這個節(jié)點,成功的則獲取到鎖,不成功的則再次監(jiān)聽該節(jié)點。
比如,有三個客戶端client1、client2、client3同時獲取/locker/user_1這把鎖,它們將按照如下步驟運行:
(1)三者同時嘗試創(chuàng)建/locker/user_1節(jié)點;
(2)client1創(chuàng)建成功,它獲取到鎖;
(3)client2和client3創(chuàng)建失敗,它們監(jiān)聽/locker/user_1的刪除事件;
(4)client1執(zhí)行鎖內(nèi)業(yè)務(wù)邏輯;
(5)client1釋放鎖,刪除節(jié)點/locker/user_1;
(6)client2和client3都捕獲到節(jié)點/locker/user_1被刪除的事件,二者皆被喚醒;
(7)client2和client3同時去創(chuàng)建/locker/user_1節(jié)點;
(8)回到第二步,依次類推,本文由公從號“彤哥讀源碼”原創(chuàng);
不過,這種方案有個很嚴(yán)重的弊端——驚群效應(yīng)。
如果并發(fā)量很高,多個客戶端同時監(jiān)聽同一個節(jié)點,釋放鎖時同時喚醒這么多個客戶端,然后再競爭,最后還是只有一個能獲取到鎖,其它客戶端又要沉睡,這些客戶端的喚醒沒有任何意義,極大地浪費系統(tǒng)資源,那么有沒有更好的方案呢?答案是當(dāng)然有,請看方案二。
為了解決方案一中的驚群效應(yīng),我們可以使用有序子節(jié)點的形式來實現(xiàn)分布式鎖,而且為了規(guī)避客戶端獲取鎖后突然斷線的風(fēng)險,我們有必要使用臨時有序節(jié)點。
比如,有三個客戶端client1、client2、client3同時獲取/locker/user_1這把鎖,它們將按照如下步驟運行:
(1)三者同時在/locker/user_1/下面創(chuàng)建臨時有序子節(jié)點;
(2)三者皆創(chuàng)建成功,分別為/locker/user_1/0000000001、/locker/user_1/0000000003、/locker/user_1/0000000002;
(3)檢查自己創(chuàng)建的節(jié)點是不是子節(jié)點中最小的;
(4)client1發(fā)現(xiàn)自己是最小的節(jié)點,它獲取到鎖;
(5)client2和client3發(fā)現(xiàn)自己不是最小的節(jié)點,它們無法獲取到鎖;
(6)client2創(chuàng)建的節(jié)點為/locker/user_1/0000000003,它監(jiān)聽其上一個節(jié)點/locker/user_1/0000000002的刪除事件;
(7)client3創(chuàng)建的節(jié)點為/locker/user_1/0000000002,它監(jiān)聽其上一個節(jié)點/locker/user_1/0000000001的刪除事件;
(8)client1執(zhí)行鎖內(nèi)業(yè)務(wù)邏輯;
(9)client1釋放鎖,刪除節(jié)點/locker/user_1/0000000001;
(10)client3監(jiān)聽到節(jié)點/locker/user_1/0000000001的刪除事件,被喚醒;
(11)client3再次檢查自己是不是最小的節(jié)點,發(fā)現(xiàn)是,則獲取到鎖;
(12)client3執(zhí)行鎖內(nèi)業(yè)務(wù)邏輯,本文由公從號“彤哥讀源碼”原創(chuàng);
(13)client3釋放鎖,刪除節(jié)點/locker/user_1/0000000002;
(14)client2監(jiān)聽到節(jié)點/locker/user_1/0000000002的刪除事件,被喚醒;
(15)client2執(zhí)行鎖內(nèi)業(yè)務(wù)邏輯;
(16)client2釋放鎖,刪除節(jié)點/locker/user_1/0000000003;
(17)client2檢查/locker/user_1/下是否還有子節(jié)點,沒有了則刪除/locker/user_1節(jié)點;
(18)流程結(jié)束;
這種方案相對于方案一來說,每次釋放鎖時只喚醒一個客戶端,減少了線程喚醒的代價,提高了效率。
pom中引入以下jar包:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.5</version>
</dependency>
定義一個Locker接口,與上一章mysql分布式鎖使用同一個接口。
public interface Locker {
void lock(String key, Runnable command);
}
這里通過內(nèi)部類ZkLockerWatcher處理zookeeper的相關(guān)操作,需要注意以下幾點:
(1)zk連接建立完畢之前不要進(jìn)行相關(guān)操作,否則會報ConnectionLoss異常,這里通過LockSupport.park();阻塞連接線程并在監(jiān)聽線程中喚醒處理;
(2)客戶端線程與監(jiān)聽線程不是同一個線程,所以可以通過LockSupport.park();及LockSupport.unpark(thread);來處理;
(3)中間很多步驟不是原子的(坑),所以需要再次檢測,詳見代碼中注釋;
@Slf4j
@Component
public class ZkLocker implements Locker {
@Override
public void lock(String key, Runnable command) {
ZkLockerWatcher watcher = ZkLockerWatcher.conn(key);
try {
if (watcher.getLock()) {
command.run();
}
} finally {
watcher.releaseLock();
}
}
private static class ZkLockerWatcher implements Watcher {
public static final String connAddr = "127.0.0.1:2181";
public static final int timeout = 6000;
public static final String LOCKER_ROOT = "/locker";
ZooKeeper zooKeeper;
String parentLockPath;
String childLockPath;
Thread thread;
public static ZkLockerWatcher conn(String key) {
ZkLockerWatcher watcher = new ZkLockerWatcher();
try {
ZooKeeper zooKeeper = watcher.zooKeeper = new ZooKeeper(connAddr, timeout, watcher);
watcher.thread = Thread.currentThread();
// 阻塞等待連接建立完畢
LockSupport.park();
// 根節(jié)點如果不存在,就創(chuàng)建一個(并發(fā)問題,如果兩個線程同時檢測不存在,兩個同時去創(chuàng)建必須有一個會失?。? if (zooKeeper.exists(LOCKER_ROOT, false) == null) {
try {
zooKeeper.create(LOCKER_ROOT, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
// 如果節(jié)點已存在,則創(chuàng)建失敗,這里捕獲異常,并不阻擋程序正常運行
log.info("創(chuàng)建節(jié)點 {} 失敗", LOCKER_ROOT);
}
}
// 當(dāng)前加鎖的節(jié)點是否存在
watcher.parentLockPath = LOCKER_ROOT + "/" + key;
if (zooKeeper.exists(watcher.parentLockPath, false) == null) {
try {
zooKeeper.create(watcher.parentLockPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
// 如果節(jié)點已存在,則創(chuàng)建失敗,這里捕獲異常,并不阻擋程序正常運行
log.info("創(chuàng)建節(jié)點 {} 失敗", watcher.parentLockPath);
}
}
} catch (Exception e) {
log.error("conn to zk error", e);
throw new RuntimeException("conn to zk error");
}
return watcher;
}
public boolean getLock() {
try {
// 創(chuàng)建子節(jié)點,本文由公從號“彤哥讀源碼”原創(chuàng)
this.childLockPath = zooKeeper.create(parentLockPath + "/", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 檢查自己是不是最小的節(jié)點,是則獲取成功,不是則監(jiān)聽上一個節(jié)點
return getLockOrWatchLast();
} catch (Exception e) {
log.error("get lock error", e);
throw new RuntimeException("get lock error");
} finally {
// System.out.println("getLock: " + childLockPath);
}
}
public void releaseLock() {
try {
if (childLockPath != null) {
// 釋放鎖,刪除節(jié)點
zooKeeper.delete(childLockPath, -1);
}
// 最后一個釋放的刪除鎖節(jié)點
List<String> children = zooKeeper.getChildren(parentLockPath, false);
if (children.isEmpty()) {
try {
zooKeeper.delete(parentLockPath, -1);
} catch (KeeperException e) {
// 如果刪除之前又新加了一個子節(jié)點,會刪除失敗
log.info("刪除節(jié)點 {} 失敗", parentLockPath);
}
}
// 關(guān)閉zk連接
if (zooKeeper != null) {
zooKeeper.close();
}
} catch (Exception e) {
log.error("release lock error", e);
throw new RuntimeException("release lock error");
} finally {
// System.out.println("releaseLock: " + childLockPath);
}
}
private boolean getLockOrWatchLast() throws KeeperException, InterruptedException {
List<String> children = zooKeeper.getChildren(parentLockPath, false);
// 必須要排序一下,這里取出來的順序可能是亂的
Collections.sort(children);
// 如果當(dāng)前節(jié)點是第一個子節(jié)點,則獲取鎖成功
if ((parentLockPath + "/" + children.get(0)).equals(childLockPath)) {
return true;
}
// 如果不是第一個子節(jié)點,就監(jiān)聽前一個節(jié)點
String last = "";
for (String child : children) {
if ((parentLockPath + "/" + child).equals(childLockPath)) {
break;
}
last = child;
}
if (zooKeeper.exists(parentLockPath + "/" + last, true) != null) {
this.thread = Thread.currentThread();
// 阻塞當(dāng)前線程
LockSupport.park();
// 喚醒之后重新檢測自己是不是最小的節(jié)點,因為有可能上一個節(jié)點斷線了
return getLockOrWatchLast();
} else {
// 如果上一個節(jié)點不存在,說明還沒來得及監(jiān)聽就釋放了,重新檢查一次
return getLockOrWatchLast();
}
}
@Override
public void process(WatchedEvent event) {
if (this.thread != null) {
// 喚醒阻塞的線程(這是在監(jiān)聽線程,跟獲取鎖的線程不是同一個線程)
LockSupport.unpark(this.thread);
this.thread = null;
}
}
}
}
我們這里起兩批線程,一批獲取user_1這個鎖,一批獲取user_2這個鎖。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class ZkLockerTest {
@Autowired
private Locker locker;
@Test
public void testZkLocker() throws IOException {
for (int i = 0; i < 1000; i++) {
new Thread(()->{
locker.lock("user_1", ()-> {
try {
System.out.println(String.format("user_1 time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}, "Thread-"+i).start();
}
for (int i = 1000; i < 2000; i++) {
new Thread(()->{
locker.lock("user_2", ()-> {
try {
System.out.println(String.format("user_2 time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}, "Thread-"+i).start();
}
System.in.read();
}
}
運行結(jié)果:
可以看到穩(wěn)定在500ms左右打印兩個鎖的結(jié)果。
user_1 time: 1568973299578, threadName: Thread-10
user_2 time: 1568973299579, threadName: Thread-1780
user_1 time: 1568973300091, threadName: Thread-887
user_2 time: 1568973300091, threadName: Thread-1542
user_1 time: 1568973300594, threadName: Thread-882
user_2 time: 1568973300594, threadName: Thread-1539
user_2 time: 1568973301098, threadName: Thread-1592
user_1 time: 1568973301098, threadName: Thread-799
user_1 time: 1568973301601, threadName: Thread-444
user_2 time: 1568973301601, threadName: Thread-1096
user_1 time: 1568973302104, threadName: Thread-908
user_2 time: 1568973302104, threadName: Thread-1574
user_2 time: 1568973302607, threadName: Thread-1515
user_1 time: 1568973302607, threadName: Thread-80
user_1 time: 1568973303110, threadName: Thread-274
user_2 time: 1568973303110, threadName: Thread-1774
user_1 time: 1568973303615, threadName: Thread-324
user_2 time: 1568973303615, threadName: Thread-1621
上面的原生API實現(xiàn)更易于理解zookeeper實現(xiàn)分布式鎖的邏輯,但是難免保證沒有什么問題,比如不是重入鎖,不支持讀寫鎖等。
下面我們一起看看現(xiàn)有的輪子curator是怎么實現(xiàn)的。
pom文件中引入以下jar包:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
下面是互斥鎖的一種實現(xiàn)方案:
@Component
@Slf4j
public class ZkCuratorLocker implements Locker {
public static final String connAddr = "127.0.0.1:2181";
public static final int timeout = 6000;
public static final String LOCKER_ROOT = "/locker";
private CuratorFramework cf;
@PostConstruct
public void init() {
this.cf = CuratorFrameworkFactory.builder()
.connectString(connAddr)
.sessionTimeoutMs(timeout)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
cf.start();
}
@Override
public void lock(String key, Runnable command) {
String path = LOCKER_ROOT + "/" + key;
InterProcessLock lock = new InterProcessMutex(cf, path);
try {
// 本文由公從號“彤哥讀源碼”原創(chuàng)
lock.acquire();
command.run();
} catch (Exception e) {
log.error("get lock error", e);
throw new RuntimeException("get lock error", e);
} finally {
try {
lock.release();
} catch (Exception e) {
log.error("release lock error", e);
throw new RuntimeException("release lock error", e);
}
}
}
}
除了互斥鎖,curator還提供了讀寫鎖、多重鎖、信號量等實現(xiàn)方式,而且他們是可重入的鎖。
(1)zookeeper中的節(jié)點有四種類型:持久、持久有序、臨時、臨時有序;
(2)zookeeper提供了一種非常重要的特性——監(jiān)聽機制,它可以用來監(jiān)聽節(jié)點的變化;
(3)zookeeper分布式鎖是基于 臨時有序節(jié)點 + 監(jiān)聽機制 實現(xiàn)的;
(4)zookeeper分布式鎖加鎖時在鎖路徑下創(chuàng)建臨時有序節(jié)點;
(5)如果自己是第一個節(jié)點,則獲得鎖;
(6)如果自己不是第一個節(jié)點,則監(jiān)聽前一個節(jié)點,并阻塞當(dāng)前線程;
(7)當(dāng)監(jiān)聽到前一個節(jié)點的刪除事件時,喚醒當(dāng)前節(jié)點的線程,并再次檢查自己是不是第一個節(jié)點;
(8)使用臨時有序節(jié)點而不是持久有序節(jié)點是為了讓客戶端無故斷線時能夠自動釋放鎖;
zookeeper分布式鎖有哪些優(yōu)點?
答:1)zookeeper本身可以集群部署,相對于mysql的單點更可靠;
2)不會占用mysql的連接數(shù),不會增加mysql的壓力;
3)使用監(jiān)聽機制,減少線程上下文切換的次數(shù);
4)客戶端斷線能夠自動釋放鎖,非常安全;
5)有現(xiàn)有的輪子curator可以使用;
6)curator實現(xiàn)方式是可重入的,對現(xiàn)有代碼改造成本小;
zookeeper分布式鎖有哪些缺點?
答:1)加鎖會頻繁地“寫”zookeeper,增加zookeeper的壓力;
2)寫zookeeper的時候會在集群進(jìn)行同步,節(jié)點數(shù)越多,同步越慢,獲取鎖的過程越慢;
3)需要另外依賴zookeeper,而大部分服務(wù)是不會使用zookeeper的,增加了系統(tǒng)的復(fù)雜性;
4)相對于redis分布式鎖,性能要稍微略差一些;
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。