溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Zookeeper中怎么實現(xiàn)一個分布式鎖

發(fā)布時間:2021-08-03 15:11:05 來源:億速云 閱讀:138 作者:Leah 欄目:編程語言

本篇文章為大家展示了Zookeeper中怎么實現(xiàn)一個分布式鎖,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

I. 方案設(shè)計

1. 創(chuàng)建節(jié)點方式實現(xiàn)

zk有四種節(jié)點,一個最容易想到的策略就是創(chuàng)建節(jié)點,誰創(chuàng)建成功了,就表示誰持有了這個鎖

這個思路與redissetnx有點相似,因為zk的節(jié)點創(chuàng)建,也只會有一個會話會創(chuàng)建成功,其他的則會拋已存在的異常

借助臨時節(jié)點,會話丟掉之后節(jié)點刪除,這樣可以避免持有鎖的實例異常而沒有主動釋放導致所有實例都無法持有鎖的問題

如果采用這種方案,如果我想實現(xiàn)阻塞獲取鎖的邏輯,那么其中一個方案就需要寫一個while(true)來不斷重試

while(true) {
    if (tryLock(xxx)) return true;
    else Thread.sleep(1000);
}

另外一個策略則是借助事件監(jiān)聽,當節(jié)點存在時,注冊一個節(jié)點刪除的觸發(fā)器,這樣就不需要我自己重試判斷了;充分借助zk的特性來實現(xiàn)異步回調(diào)

public void lock() {
  if (tryLock(path,  new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            synchronized (path){
                path.notify();
            }
        }
    })) {
      return true;
  }

  synchronized (path) {
      path.wait();
  }
}

那么上面這個實現(xiàn)有什么問題呢?

每次節(jié)點的變更,那么所有的都會監(jiān)聽到變動,好處是非公平鎖的支持;缺點就是剩下這些喚醒的實例中也只會有一個搶占到鎖,無意義的喚醒浪費性能

2. 臨時順序節(jié)點方式

接下來這種方案更加常見,晚上大部分的教程也是這種case,主要思路就是創(chuàng)建臨時順序節(jié)點

只有序號最小的節(jié)點,才表示搶占鎖成功;如果不是最小的節(jié)點,那么就監(jiān)聽它前面一個節(jié)點的刪除事件,前面節(jié)點刪除了,一種可能是他放棄搶鎖,一種是他釋放自己持有的鎖,不論哪種情況,對我而言,我都需要撈一下所有的節(jié)點,要么拿鎖成功;要么換一個前置節(jié)點

II.分布式鎖實現(xiàn)

接下來我們來一步步看下,基于臨時順序節(jié)點,可以怎么實現(xiàn)分布式鎖

對于zk,我們依然采用apache的提供的包 zookeeper來操作;后續(xù)提供Curator的分布式鎖實例

1. 依賴

核心依賴

<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.7.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

版本說明:

  • zk版本: 3.6.2

  • SpringBoot: 2.2.1.RELEASE

2. 簡單的分布式鎖

第一步,都是實例創(chuàng)建

public class ZkLock implements Watcher {

    private ZooKeeper zooKeeper;
    // 創(chuàng)建一個持久的節(jié)點,作為分布式鎖的根目錄
    private String root;

    public ZkLock(String root) throws IOException {
        try {
            this.root = root;
            zooKeeper = new ZooKeeper("127.0.0.1:2181", 500_000, this);
            Stat stat = zooKeeper.exists(root, false);
            if (stat == null) {
                // 不存在則創(chuàng)建
                createNode(root, true);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    
    // 簡單的封裝節(jié)點創(chuàng)建,這里只考慮持久 + 臨時順序
    private String createNode(String path, boolean persistent) throws Exception {
        return zooKeeper.create(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, persistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL_SEQUENTIAL);
    }
}

在我們的這個設(shè)計中,我們需要持有當前節(jié)點和監(jiān)聽前一個節(jié)點的變更,所以我們在ZkLock實例中,添加兩個成員

/**
 * 當前節(jié)點
 */
private String current;

/**
 * 前一個節(jié)點
 */
private String pre;

接下來就是嘗試獲取鎖的邏輯

  • current不存在,在表示沒有創(chuàng)建過,就創(chuàng)建一個臨時順序節(jié)點,并賦值current

  • current存在,則表示之前已經(jīng)創(chuàng)建過了,目前處于等待鎖釋放過程

  • 接下來根據(jù)當前節(jié)點順序是否最小,來表明是否持有鎖成功

  • 當順序不是最小時,找前面那個節(jié)點,并賦值 pre;

  • 監(jiān)聽pre的變化

/**
 * 嘗試獲取鎖,創(chuàng)建順序臨時節(jié)點,若數(shù)據(jù)最小,則表示搶占鎖成功;否則失敗
 *
 * @return
 */
public boolean tryLock() {
    try {
        String path = root + "/";
        if (current == null) {
            // 創(chuàng)建臨時順序節(jié)點
            current = createNode(path, false);
        }
        List<String> list = zooKeeper.getChildren(root, false);
        Collections.sort(list);

        if (current.equalsIgnoreCase(path + list.get(0))) {
            // 獲取鎖成功
            return true;
        } else {
            // 獲取鎖失敗,找到前一個節(jié)點
            int index = Collections.binarySearch(list, current.substring(path.length()));
            // 查詢當前節(jié)點前面的那個
            pre = path + list.get(index - 1);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return false;
}

請注意上面的實現(xiàn),這里并沒有去監(jiān)聽前一個節(jié)點的變更,在設(shè)計tryLock,因為是立馬返回成功or失敗,所以使用這個接口的,不需要注冊監(jiān)聽

我們的監(jiān)聽邏輯,放在 lock() 同步阻塞里面

  • 嘗試搶占鎖,成功則直接返回

  • 拿鎖失敗,則監(jiān)聽前一個節(jié)點的刪除事件

public boolean lock() {
    if (tryLock()) {
        return true;
    }

    try {
        // 監(jiān)聽前一個節(jié)點的刪除事件
        Stat state = zooKeeper.exists(pre, true);
        if (state != null) {
            synchronized (pre) {
                // 阻塞等待前面的節(jié)點釋放
                pre.wait();
                // 這里不直接返回true,因為前面的一個節(jié)點刪除,可能并不是因為它持有鎖并釋放鎖,如果是因為這個會話中斷導致臨時節(jié)點刪除,這個時候需要做的是換一下監(jiān)聽的 preNode
                return lock();
            }
        } else {
          // 不存在,則再次嘗試拿鎖
          return lock();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return false;
}

注意:

  • 當節(jié)點不存在時,或者事件觸發(fā)回調(diào)之后,重新調(diào)用lock(),表明我胡漢三又來競爭鎖了?

為啥不是直接返回 true? 而是需要重新競爭呢?

  • 因為前面節(jié)點的刪除,有可能是因為前面節(jié)點的會話中斷導致的;但是鎖還在另外的實例手中,這個時候我應(yīng)該做的是重新排隊

最后別忘了釋放鎖

public void unlock() {
    try {
        zooKeeper.delete(current, -1);
        current = null;
        zooKeeper.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

到此,我們的分布式鎖就完成了,接下來我們復盤下實現(xiàn)過程

  • 所有知識點來自前一篇的zk基礎(chǔ)使用(創(chuàng)建節(jié)點,刪除節(jié)點,獲取所有自己點,監(jiān)聽事件)

  • 搶鎖過程 =》 創(chuàng)建序號最小的節(jié)點

  • 若節(jié)點不是最小的,那么就監(jiān)聽前面的節(jié)點刪除事件

這個實現(xiàn),支持了鎖的重入(why? 因為鎖未釋放時,我們保存了current,當前節(jié)點存在時則直接判斷是不是最小的;而不是重新創(chuàng)建)

3. 測試

最后寫一個測試case,來看下

@SpringBootApplication
public class Application {

    private void tryLock(long time) {
        ZkLock zkLock = null;
        try {
            zkLock = new ZkLock("/lock");
            System.out.println("嘗試獲取鎖: " + Thread.currentThread() + " at: " + LocalDateTime.now());
            boolean ans = zkLock.lock();
            System.out.println("執(zhí)行業(yè)務(wù)邏輯:" + Thread.currentThread() + " at:" + LocalDateTime.now());
            Thread.sleep(time);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (zkLock != null) {
                zkLock.unlock();
            }
        }
    }

    public Application() throws IOException, InterruptedException {
        new Thread(() -> tryLock(10_000)).start();

        Thread.sleep(1000);
        // 獲取鎖到執(zhí)行鎖會有10s的間隔,因為上面的線程搶占到鎖,并持有了10s
        new Thread(() -> tryLock(1_000)).start();
        System.out.println("---------over------------");

        Scanner scanner = new Scanner(System.in);
        String ans = scanner.next();
        System.out.println("---> over --->" + ans);
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

}

輸出結(jié)果如下

Zookeeper中怎么實現(xiàn)一個分布式鎖

上述內(nèi)容就是Zookeeper中怎么實現(xiàn)一個分布式鎖,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI