溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

死磕 java同步系列之zookeeper分布式鎖

發(fā)布時間:2020-07-18 02:23:28 來源:網(wǎng)絡(luò) 閱讀:225 作者:彤哥讀源碼 欄目:編程語言

問題

(1)zookeeper如何實現(xiàn)分布式鎖?

(2)zookeeper分布式鎖有哪些優(yōu)點?

(3)zookeeper分布式鎖有哪些缺點?

簡介

zooKeeper是一個分布式的,開放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù),它可以為分布式應(yīng)用提供一致性的服務(wù),它是Hadoop和Hbase的重要組件,同時也可以作為配置中心、注冊中心運用在微服務(wù)體系中。

本章我們將介紹zookeeper如何實現(xiàn)分布式鎖運用在分布式系統(tǒng)中。

基礎(chǔ)知識

什么是znode?

zooKeeper操作和維護(hù)的為一個個數(shù)據(jù)節(jié)點,稱為 znode,采用類似文件系統(tǒng)的層級樹狀結(jié)構(gòu)進(jìn)行管理,如果 znode 節(jié)點包含數(shù)據(jù)則存儲為字節(jié)數(shù)組(byte array)。

而且,同一個節(jié)點多個客戶同時創(chuàng)建,本文由公從號“彤哥讀源碼”原創(chuàng),只有一個客戶端會成功,其它客戶端創(chuàng)建時將失敗。

死磕 java同步系列之zookeeper分布式鎖

節(jié)點類型

znode 共有四種類型:

  • 持久(無序)

  • 持久有序

  • 臨時(無序)

  • 臨時有序

其中,持久節(jié)點如果不手動刪除會一直存在,臨時節(jié)點當(dāng)客戶端session失效就會自動刪除節(jié)點。

什么是watcher?

watcher(事件監(jiān)聽器),是zookeeper中的一個很重要的特性。

zookeeper允許用戶在指定節(jié)點上注冊一些watcher,并且在一些特定事件觸發(fā)的時候,zooKeeper服務(wù)端會將事件通知到感興趣的客戶端上去,該機制是Zookeeper實現(xiàn)分布式協(xié)調(diào)服務(wù)的重要特性

KeeperState EventType 觸發(fā)條件 說明 操作
SyncConnected(3) None(-1) 客戶端與服務(wù)端成功建立連接 此時客戶端和服務(wù)器處于連接狀態(tài) -
同上 NodeCreated(1) Watcher監(jiān)聽的對應(yīng)數(shù)據(jù)節(jié)點被創(chuàng)建 同上 Create
同上 NodeDeleted(2) Watcher監(jiān)聽的對應(yīng)數(shù)據(jù)節(jié)點被刪除 同上 Delete/znode
同上 NodeDataChanged(3) Watcher監(jiān)聽的對應(yīng)數(shù)據(jù)節(jié)點的數(shù)據(jù)內(nèi)容發(fā)生變更 同上 setDate/znode
同上 NodeChildChanged(4) Wather監(jiān)聽的對應(yīng)數(shù)據(jù)節(jié)點的子節(jié)點列表發(fā)生變更 同上 Create/child
Disconnected(0) None(-1) 客戶端與ZooKeeper服務(wù)器斷開連接 此時客戶端和服務(wù)器處于斷開連接狀態(tài) -
Expired(-112) None(-1) 會話超時 此時客戶端會話失效,通常同時也會受到SessionExpiredException異常 -
AuthFailed(4) None(-1) 通常有兩種情況,1:使用錯誤的schema進(jìn)行權(quán)限檢查 2:SASL權(quán)限檢查失敗 通常同時也會收到AuthFailedException異常 -

原理解析

方案一

既然,同一個節(jié)點只能創(chuàng)建一次,那么,加鎖時檢測節(jié)點是否存在,不存在則創(chuàng)建之,存在或者創(chuàng)建失敗則監(jiān)聽這個節(jié)點的刪除事件,這樣,當(dāng)釋放鎖的時候監(jiān)聽的客戶端再次競爭去創(chuàng)建這個節(jié)點,成功的則獲取到鎖,不成功的則再次監(jiān)聽該節(jié)點。

死磕 java同步系列之zookeeper分布式鎖

比如,有三個客戶端client1、client2、client3同時獲取/locker/user_1這把鎖,它們將按照如下步驟運行:

(1)三者同時嘗試創(chuàng)建/locker/user_1節(jié)點;

(2)client1創(chuàng)建成功,它獲取到鎖;

(3)client2和client3創(chuàng)建失敗,它們監(jiān)聽/locker/user_1的刪除事件;

(4)client1執(zhí)行鎖內(nèi)業(yè)務(wù)邏輯;

(5)client1釋放鎖,刪除節(jié)點/locker/user_1;

(6)client2和client3都捕獲到節(jié)點/locker/user_1被刪除的事件,二者皆被喚醒;

(7)client2和client3同時去創(chuàng)建/locker/user_1節(jié)點;

(8)回到第二步,依次類推,本文由公從號“彤哥讀源碼”原創(chuàng);

不過,這種方案有個很嚴(yán)重的弊端——驚群效應(yīng)。

如果并發(fā)量很高,多個客戶端同時監(jiān)聽同一個節(jié)點,釋放鎖時同時喚醒這么多個客戶端,然后再競爭,最后還是只有一個能獲取到鎖,其它客戶端又要沉睡,這些客戶端的喚醒沒有任何意義,極大地浪費系統(tǒng)資源,那么有沒有更好的方案呢?答案是當(dāng)然有,請看方案二。

方案二

為了解決方案一中的驚群效應(yīng),我們可以使用有序子節(jié)點的形式來實現(xiàn)分布式鎖,而且為了規(guī)避客戶端獲取鎖后突然斷線的風(fēng)險,我們有必要使用臨時有序節(jié)點。

死磕 java同步系列之zookeeper分布式鎖

比如,有三個客戶端client1、client2、client3同時獲取/locker/user_1這把鎖,它們將按照如下步驟運行:

(1)三者同時在/locker/user_1/下面創(chuàng)建臨時有序子節(jié)點;

(2)三者皆創(chuàng)建成功,分別為/locker/user_1/0000000001、/locker/user_1/0000000003、/locker/user_1/0000000002;

(3)檢查自己創(chuàng)建的節(jié)點是不是子節(jié)點中最小的;

(4)client1發(fā)現(xiàn)自己是最小的節(jié)點,它獲取到鎖;

(5)client2和client3發(fā)現(xiàn)自己不是最小的節(jié)點,它們無法獲取到鎖;

(6)client2創(chuàng)建的節(jié)點為/locker/user_1/0000000003,它監(jiān)聽其上一個節(jié)點/locker/user_1/0000000002的刪除事件;

(7)client3創(chuàng)建的節(jié)點為/locker/user_1/0000000002,它監(jiān)聽其上一個節(jié)點/locker/user_1/0000000001的刪除事件;

(8)client1執(zhí)行鎖內(nèi)業(yè)務(wù)邏輯;

(9)client1釋放鎖,刪除節(jié)點/locker/user_1/0000000001;

(10)client3監(jiān)聽到節(jié)點/locker/user_1/0000000001的刪除事件,被喚醒;

(11)client3再次檢查自己是不是最小的節(jié)點,發(fā)現(xiàn)是,則獲取到鎖;

(12)client3執(zhí)行鎖內(nèi)業(yè)務(wù)邏輯,本文由公從號“彤哥讀源碼”原創(chuàng);

(13)client3釋放鎖,刪除節(jié)點/locker/user_1/0000000002;

(14)client2監(jiān)聽到節(jié)點/locker/user_1/0000000002的刪除事件,被喚醒;

(15)client2執(zhí)行鎖內(nèi)業(yè)務(wù)邏輯;

(16)client2釋放鎖,刪除節(jié)點/locker/user_1/0000000003;

(17)client2檢查/locker/user_1/下是否還有子節(jié)點,沒有了則刪除/locker/user_1節(jié)點;

(18)流程結(jié)束;

這種方案相對于方案一來說,每次釋放鎖時只喚醒一個客戶端,減少了線程喚醒的代價,提高了效率。

zookeeper原生API實現(xiàn)

pom文件

pom中引入以下jar包:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.5</version>
</dependency>

Locker接口

定義一個Locker接口,與上一章mysql分布式鎖使用同一個接口。

public interface Locker {
    void lock(String key, Runnable command);
}

zookeeper分布式鎖實現(xiàn)

這里通過內(nèi)部類ZkLockerWatcher處理zookeeper的相關(guān)操作,需要注意以下幾點:

(1)zk連接建立完畢之前不要進(jìn)行相關(guān)操作,否則會報ConnectionLoss異常,這里通過LockSupport.park();阻塞連接線程并在監(jiān)聽線程中喚醒處理;

(2)客戶端線程與監(jiān)聽線程不是同一個線程,所以可以通過LockSupport.park();及LockSupport.unpark(thread);來處理;

(3)中間很多步驟不是原子的(坑),所以需要再次檢測,詳見代碼中注釋;

@Slf4j
@Component
public class ZkLocker implements Locker {
    @Override
    public void lock(String key, Runnable command) {
        ZkLockerWatcher watcher = ZkLockerWatcher.conn(key);
        try {
            if (watcher.getLock()) {
                command.run();
            }
        } finally {
            watcher.releaseLock();
        }
    }

    private static class ZkLockerWatcher implements Watcher {
        public static final String connAddr = "127.0.0.1:2181";
        public static final int timeout = 6000;
        public static final String LOCKER_ROOT = "/locker";

        ZooKeeper zooKeeper;
        String parentLockPath;
        String childLockPath;
        Thread thread;

        public static ZkLockerWatcher conn(String key) {
            ZkLockerWatcher watcher = new ZkLockerWatcher();
            try {
                ZooKeeper zooKeeper = watcher.zooKeeper = new ZooKeeper(connAddr, timeout, watcher);
                watcher.thread = Thread.currentThread();
                // 阻塞等待連接建立完畢
                LockSupport.park();
                // 根節(jié)點如果不存在,就創(chuàng)建一個(并發(fā)問題,如果兩個線程同時檢測不存在,兩個同時去創(chuàng)建必須有一個會失?。?                if (zooKeeper.exists(LOCKER_ROOT, false) == null) {
                    try {
                        zooKeeper.create(LOCKER_ROOT, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException e) {
                        // 如果節(jié)點已存在,則創(chuàng)建失敗,這里捕獲異常,并不阻擋程序正常運行
                        log.info("創(chuàng)建節(jié)點 {} 失敗", LOCKER_ROOT);
                    }
                }
                // 當(dāng)前加鎖的節(jié)點是否存在
                watcher.parentLockPath = LOCKER_ROOT + "/" + key;
                if (zooKeeper.exists(watcher.parentLockPath, false) == null) {
                    try {
                        zooKeeper.create(watcher.parentLockPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException e) {
                        // 如果節(jié)點已存在,則創(chuàng)建失敗,這里捕獲異常,并不阻擋程序正常運行
                        log.info("創(chuàng)建節(jié)點 {} 失敗", watcher.parentLockPath);
                    }
                }

            } catch (Exception e) {
                log.error("conn to zk error", e);
                throw new RuntimeException("conn to zk error");
            }
            return watcher;
        }

        public boolean getLock() {
            try {
                // 創(chuàng)建子節(jié)點,本文由公從號“彤哥讀源碼”原創(chuàng)
                this.childLockPath = zooKeeper.create(parentLockPath + "/", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                // 檢查自己是不是最小的節(jié)點,是則獲取成功,不是則監(jiān)聽上一個節(jié)點
                return getLockOrWatchLast();
            } catch (Exception e) {
                log.error("get lock error", e);
                throw new RuntimeException("get lock error");
            } finally {
//                System.out.println("getLock: " + childLockPath);
            }
        }

        public void releaseLock() {
            try {
                if (childLockPath != null) {
                    // 釋放鎖,刪除節(jié)點
                    zooKeeper.delete(childLockPath, -1);
                }
                // 最后一個釋放的刪除鎖節(jié)點
                List<String> children = zooKeeper.getChildren(parentLockPath, false);
                if (children.isEmpty()) {
                    try {
                        zooKeeper.delete(parentLockPath, -1);
                    } catch (KeeperException e) {
                        // 如果刪除之前又新加了一個子節(jié)點,會刪除失敗
                        log.info("刪除節(jié)點 {} 失敗", parentLockPath);
                    }
                }
                // 關(guān)閉zk連接
                if (zooKeeper != null) {
                    zooKeeper.close();
                }
            } catch (Exception e) {
                log.error("release lock error", e);
                throw new RuntimeException("release lock error");
            } finally {
//                System.out.println("releaseLock: " + childLockPath);
            }
        }

        private boolean getLockOrWatchLast() throws KeeperException, InterruptedException {
            List<String> children = zooKeeper.getChildren(parentLockPath, false);
            // 必須要排序一下,這里取出來的順序可能是亂的
            Collections.sort(children);
            // 如果當(dāng)前節(jié)點是第一個子節(jié)點,則獲取鎖成功
            if ((parentLockPath + "/" + children.get(0)).equals(childLockPath)) {
                return true;
            }

            // 如果不是第一個子節(jié)點,就監(jiān)聽前一個節(jié)點
            String last = "";
            for (String child : children) {
                if ((parentLockPath + "/" + child).equals(childLockPath)) {
                    break;
                }
                last = child;
            }

            if (zooKeeper.exists(parentLockPath + "/" + last, true) != null) {
                this.thread = Thread.currentThread();
                // 阻塞當(dāng)前線程
                LockSupport.park();
                // 喚醒之后重新檢測自己是不是最小的節(jié)點,因為有可能上一個節(jié)點斷線了
                return getLockOrWatchLast();
            } else {
                // 如果上一個節(jié)點不存在,說明還沒來得及監(jiān)聽就釋放了,重新檢查一次
                return getLockOrWatchLast();
            }
        }

        @Override
        public void process(WatchedEvent event) {
            if (this.thread != null) {
                // 喚醒阻塞的線程(這是在監(jiān)聽線程,跟獲取鎖的線程不是同一個線程)
                LockSupport.unpark(this.thread);
                this.thread = null;
            }
        }
    }
}

測試代碼

我們這里起兩批線程,一批獲取user_1這個鎖,一批獲取user_2這個鎖。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class ZkLockerTest {

    @Autowired
    private Locker locker;

    @Test
    public void testZkLocker() throws IOException {
        for (int i = 0; i < 1000; i++) {
            new Thread(()->{
                locker.lock("user_1", ()-> {
                    try {
                        System.out.println(String.format("user_1 time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }, "Thread-"+i).start();
        }
        for (int i = 1000; i < 2000; i++) {
            new Thread(()->{
                locker.lock("user_2", ()-> {
                    try {
                        System.out.println(String.format("user_2 time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }, "Thread-"+i).start();
        }

        System.in.read();
    }
}

運行結(jié)果:

可以看到穩(wěn)定在500ms左右打印兩個鎖的結(jié)果。

user_1 time: 1568973299578, threadName: Thread-10
user_2 time: 1568973299579, threadName: Thread-1780
user_1 time: 1568973300091, threadName: Thread-887
user_2 time: 1568973300091, threadName: Thread-1542
user_1 time: 1568973300594, threadName: Thread-882
user_2 time: 1568973300594, threadName: Thread-1539
user_2 time: 1568973301098, threadName: Thread-1592
user_1 time: 1568973301098, threadName: Thread-799
user_1 time: 1568973301601, threadName: Thread-444
user_2 time: 1568973301601, threadName: Thread-1096
user_1 time: 1568973302104, threadName: Thread-908
user_2 time: 1568973302104, threadName: Thread-1574
user_2 time: 1568973302607, threadName: Thread-1515
user_1 time: 1568973302607, threadName: Thread-80
user_1 time: 1568973303110, threadName: Thread-274
user_2 time: 1568973303110, threadName: Thread-1774
user_1 time: 1568973303615, threadName: Thread-324
user_2 time: 1568973303615, threadName: Thread-1621

curator實現(xiàn)

上面的原生API實現(xiàn)更易于理解zookeeper實現(xiàn)分布式鎖的邏輯,但是難免保證沒有什么問題,比如不是重入鎖,不支持讀寫鎖等。

下面我們一起看看現(xiàn)有的輪子curator是怎么實現(xiàn)的。

pom文件

pom文件中引入以下jar包:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>

代碼實現(xiàn)

下面是互斥鎖的一種實現(xiàn)方案:

@Component
@Slf4j
public class ZkCuratorLocker implements Locker {
    public static final String connAddr = "127.0.0.1:2181";
    public static final int timeout = 6000;
    public static final String LOCKER_ROOT = "/locker";

    private CuratorFramework cf;

    @PostConstruct
    public void init() {
        this.cf = CuratorFrameworkFactory.builder()
                .connectString(connAddr)
                .sessionTimeoutMs(timeout)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();

        cf.start();
    }

    @Override
    public void lock(String key, Runnable command) {
        String path = LOCKER_ROOT + "/" + key;
        InterProcessLock lock = new InterProcessMutex(cf, path);
        try {
            // 本文由公從號“彤哥讀源碼”原創(chuàng)
            lock.acquire();
            command.run();
        } catch (Exception e) {
            log.error("get lock error", e);
            throw new RuntimeException("get lock error", e);
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                log.error("release lock error", e);
                throw new RuntimeException("release lock error", e);
            }
        }
    }
}

除了互斥鎖,curator還提供了讀寫鎖、多重鎖、信號量等實現(xiàn)方式,而且他們是可重入的鎖。

總結(jié)

(1)zookeeper中的節(jié)點有四種類型:持久、持久有序、臨時、臨時有序;

(2)zookeeper提供了一種非常重要的特性——監(jiān)聽機制,它可以用來監(jiān)聽節(jié)點的變化;

(3)zookeeper分布式鎖是基于 臨時有序節(jié)點 + 監(jiān)聽機制 實現(xiàn)的;

(4)zookeeper分布式鎖加鎖時在鎖路徑下創(chuàng)建臨時有序節(jié)點;

(5)如果自己是第一個節(jié)點,則獲得鎖;

(6)如果自己不是第一個節(jié)點,則監(jiān)聽前一個節(jié)點,并阻塞當(dāng)前線程;

(7)當(dāng)監(jiān)聽到前一個節(jié)點的刪除事件時,喚醒當(dāng)前節(jié)點的線程,并再次檢查自己是不是第一個節(jié)點;

(8)使用臨時有序節(jié)點而不是持久有序節(jié)點是為了讓客戶端無故斷線時能夠自動釋放鎖;

彩蛋

zookeeper分布式鎖有哪些優(yōu)點?

答:1)zookeeper本身可以集群部署,相對于mysql的單點更可靠;

2)不會占用mysql的連接數(shù),不會增加mysql的壓力;

3)使用監(jiān)聽機制,減少線程上下文切換的次數(shù);

4)客戶端斷線能夠自動釋放鎖,非常安全;

5)有現(xiàn)有的輪子curator可以使用;

6)curator實現(xiàn)方式是可重入的,對現(xiàn)有代碼改造成本小;

zookeeper分布式鎖有哪些缺點?

答:1)加鎖會頻繁地“寫”zookeeper,增加zookeeper的壓力;

2)寫zookeeper的時候會在集群進(jìn)行同步,節(jié)點數(shù)越多,同步越慢,獲取鎖的過程越慢;

3)需要另外依賴zookeeper,而大部分服務(wù)是不會使用zookeeper的,增加了系統(tǒng)的復(fù)雜性;

4)相對于redis分布式鎖,性能要稍微略差一些;

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI