溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

zookeeper分布式鎖實(shí)現(xiàn)的方法是什么

發(fā)布時(shí)間:2021-12-23 12:00:09 來源:億速云 閱讀:131 作者:iii 欄目:云計(jì)算

本篇內(nèi)容介紹了“zookeeper分布式鎖實(shí)現(xiàn)的方法是什么”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

一。為何使用分布式鎖?
當(dāng)應(yīng)用服務(wù)器數(shù)量超過1臺(tái),對(duì)相同數(shù)據(jù)的訪問可能造成訪問沖突(特別是寫沖突)。單純使用關(guān)系數(shù)據(jù)庫(kù)比如MYSQL的應(yīng)用可以借助于事務(wù)來實(shí)現(xiàn)鎖,也可以使用版本號(hào)等實(shí)現(xiàn)樂觀鎖,最大的缺陷就是可用性降低(性能差)。對(duì)于GLEASY這種滿足大規(guī)模并發(fā)訪問請(qǐng)求的應(yīng)用來說,使用數(shù)據(jù)庫(kù)事務(wù)來實(shí)現(xiàn)數(shù)據(jù)庫(kù)就有些捉襟見肘了。另外對(duì)于一些不依賴數(shù)據(jù)庫(kù)的應(yīng)用,比如分布式文件系統(tǒng),為了保證同一文件在大量讀寫操作情況下的正確性,必須引入分布式鎖來約束對(duì)同一文件的并發(fā)操作。

二。對(duì)分布式鎖的要求
1.高性能(分布式鎖不能成為系統(tǒng)的性能瓶頸)
2.避免死鎖(拿到鎖的結(jié)點(diǎn)掛掉不會(huì)導(dǎo)致其它結(jié)點(diǎn)永遠(yuǎn)無法繼續(xù))
3.支持鎖重入

三。方案1,基于zookeeper的分布式鎖

/**
* DistributedLockUtil.java
* 分布式鎖工廠類,所有分布式請(qǐng)求都由該工廠類負(fù)責(zé)
**/
public class DistributedLockUtil {
	private static Object schemeLock = new Object();
	private static Object mutexLock = new Object();
	private static Map<String,Object> mutexLockMap = new ConcurrentHashMap();	
	private String schema;
	private Map<String,DistributedReentrantLock> cache  = new ConcurrentHashMap<String,DistributedReentrantLock>();
	
	private static Map<String,DistributedLockUtil> instances = new ConcurrentHashMap();
	public static DistributedLockUtil getInstance(String schema){
		DistributedLockUtil u = instances.get(schema);
		if(u==null){
			synchronized(schemeLock){
				u = instances.get(schema);
				if(u == null){
					u = new DistributedLockUtil(schema);
					instances.put(schema, u);
				}
			}
		}
		return u;
	}
	
	private DistributedLockUtil(String schema){
		this.schema = schema;
	}
	
	private Object getMutex(String key){
		Object mx = mutexLockMap.get(key);
		if(mx == null){
			synchronized(mutexLock){
				mx = mutexLockMap.get(key);
				if(mx==null){
					mx = new Object();
					mutexLockMap.put(key,mx);
				}
			}
		}
		return mx;
	}
	
	private DistributedReentrantLock getLock(String key){
		DistributedReentrantLock lock = cache.get(key);
		if(lock == null){
			synchronized(getMutex(key)){
				lock = cache.get(key);
				if(lock == null){
					lock = new DistributedReentrantLock(key,schema);
					cache.put(key, lock);
				}
			}
		}
		return lock;
	}
	
	public void reset(){
		for(String s : cache.keySet()){
			getLock(s).unlock();
		}
	}
	
	/**
	 * 嘗試加鎖
	 * 如果當(dāng)前線程已經(jīng)擁有該鎖的話,直接返回false,表示不用再次加鎖,此時(shí)不應(yīng)該再調(diào)用unlock進(jìn)行解鎖
	 * 
	 * @param key
	 * @return
	 * @throws InterruptedException
	 * @throws KeeperException
	 */
	public LockStat lock(String key) throws InterruptedException, KeeperException{
		if(getLock(key).isOwner()){
			return LockStat.NONEED;
		}
		getLock(key).lock();
		return LockStat.SUCCESS;
	}
	
	public void clearLock(String key) throws InterruptedException, KeeperException{
		synchronized(getMutex(key)){
			DistributedReentrantLock l = cache.get(key);
			l.clear();
			cache.remove(key);
		}
	}	
	
	public void unlock(String key,LockStat stat) throws InterruptedException, KeeperException{
		unlock(key,stat,false);
	}

	public void unlock(String key,LockStat stat,boolean keepalive) throws InterruptedException, KeeperException{
		if(stat == null) return;
		if(LockStat.SUCCESS.equals(stat)){
			DistributedReentrantLock lock =  getLock(key);
			boolean hasWaiter = lock.unlock();
			if(!hasWaiter && !keepalive){
				synchronized(getMutex(key)){
					lock.clear();
					cache.remove(key);
				}
			}
		}
	}
	
	public static enum LockStat{
		NONEED,
		SUCCESS
	}
}



/**
*DistributedReentrantLock.java
*本地線程之間鎖爭(zhēng)用,先使用虛擬機(jī)內(nèi)部鎖機(jī)制,減少結(jié)點(diǎn)間通信開銷
*/
public class DistributedReentrantLock {
	private static final Logger logger =  Logger.getLogger(DistributedReentrantLock.class);
    private ReentrantLock reentrantLock = new ReentrantLock();

    private WriteLock writeLock;
    private long timeout = 3*60*1000;
    
    private final Object mutex = new Object();
    private String dir;
    private String schema;
    
    private final ExitListener exitListener = new ExitListener(){
		@Override
		public void execute() {
			initWriteLock();
		}
	};
	
	private synchronized void initWriteLock(){
		logger.debug("初始化writeLock");
		writeLock = new WriteLock(dir,new LockListener(){

			@Override
			public void lockAcquired() {
				synchronized(mutex){
					mutex.notify();
				}
			}
			@Override
			public void lockReleased() {
			}
    		
    	},schema);
		
		if(writeLock != null && writeLock.zk != null){
			writeLock.zk.addExitListener(exitListener);
		}
		
		synchronized(mutex){
			mutex.notify();
		}
	}
	
    public DistributedReentrantLock(String dir,String schema) {	
    	this.dir = dir;
    	this.schema = schema;
    	initWriteLock();
    }

    public void lock(long timeout) throws InterruptedException, KeeperException {
        reentrantLock.lock();//多線程競(jìng)爭(zhēng)時(shí),先拿到第一層鎖
        try{
        	boolean res = writeLock.trylock();
        	if(!res){
	        	synchronized(mutex){
					mutex.wait(timeout);
				}
	        	if(writeLock == null || !writeLock.isOwner()){
	        		throw new InterruptedException("鎖超時(shí)");
	        	}
        	}
        }catch(InterruptedException e){
        	reentrantLock.unlock();
        	throw e;
        }catch(KeeperException e){
        	reentrantLock.unlock();
        	throw e;
        }
    }
    
    public void lock() throws InterruptedException, KeeperException {
    	lock(timeout);
    }

    public void destroy()  throws KeeperException {
    	writeLock.unlock();
    }
    

    public boolean unlock(){
    	if(!isOwner()) return false;
        try{
        	writeLock.unlock();
        	reentrantLock.unlock();//多線程競(jìng)爭(zhēng)時(shí),釋放最外層鎖
        }catch(RuntimeException e){
        	reentrantLock.unlock();//多線程競(jìng)爭(zhēng)時(shí),釋放最外層鎖
        	throw e;
        }
        
        return reentrantLock.hasQueuedThreads();
    }



    public boolean isOwner() {
        return reentrantLock.isHeldByCurrentThread() && writeLock.isOwner();
    }

	public void clear() {
		writeLock.clear();
	}

}



/**
*WriteLock.java
*基于zk的鎖實(shí)現(xiàn)
*一個(gè)最簡(jiǎn)單的場(chǎng)景如下:
*1.結(jié)點(diǎn)A請(qǐng)求加鎖,在特定路徑下注冊(cè)自己(會(huì)話自增結(jié)點(diǎn)),得到一個(gè)ID號(hào)1
*2.結(jié)點(diǎn)B請(qǐng)求加鎖,在特定路徑下注冊(cè)自己(會(huì)話自增結(jié)點(diǎn)),得到一個(gè)ID號(hào)2
*3.結(jié)點(diǎn)A獲取所有結(jié)點(diǎn)ID,判斷出來自己是最小結(jié)點(diǎn)號(hào),于是獲得鎖
*4.結(jié)點(diǎn)B獲取所有結(jié)點(diǎn)ID,判斷出來自己不是最小結(jié)點(diǎn),于是監(jiān)聽小于自己的最大結(jié)點(diǎn)(結(jié)點(diǎn)A)變更事件
*5.結(jié)點(diǎn)A拿到鎖,處理業(yè)務(wù),處理完,釋放鎖(刪除自己)
*6.結(jié)點(diǎn)B收到結(jié)點(diǎn)A變更事件,判斷出來自己已經(jīng)是最小結(jié)點(diǎn)號(hào),于是獲得鎖。
*/
public class WriteLock extends ZkPrimative {
   private static final Logger LOG =  Logger.getLogger(WriteLock.class);

    private final String dir;
    private String id;
    private LockNode idName;
    private String ownerId;
    private String lastChildId;
    private byte[] data = {0x12, 0x34};
    private LockListener callback;
    
    public WriteLock(String dir,String schema) {
        super(schema,true);
        this.dir = dir;
    }
    
    public WriteLock(String dir,LockListener callback,String schema) {
    	this(dir,schema);
        this.callback = callback;
    }

    public LockListener getLockListener() {
        return this.callback;
    }
    
    public void setLockListener(LockListener callback) {
        this.callback = callback;
    }

    public synchronized void unlock() throws RuntimeException {
    	if(zk == null || zk.isClosed()){
    		return;
    	}
        if (id != null) {
            try {
            	 zk.delete(id, -1);   
            } catch (InterruptedException e) {
                LOG.warn("Caught: " + e, e);
                //set that we have been interrupted.
               Thread.currentThread().interrupt();
            } catch (KeeperException.NoNodeException e) {
                // do nothing
            } catch (KeeperException e) {
                LOG.warn("Caught: " + e, e);
                throw (RuntimeException) new RuntimeException(e.getMessage()).
                    initCause(e);
            }finally {
                if (callback != null) {
                    callback.lockReleased();
                }
                id = null;
            }
        }
    }
    
    private class LockWatcher implements Watcher {
        public void process(WatchedEvent event) {
            LOG.debug("Watcher fired on path: " + event.getPath() + " state: " + 
                    event.getState() + " type " + event.getType());
            try {
                trylock();
            } catch (Exception e) {
                LOG.warn("Failed to acquire lock: " + e, e);
            }
        }
    }
    
    private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) 
        throws KeeperException, InterruptedException {
        List<String> names = zookeeper.getChildren(dir, false);
        for (String name : names) {
            if (name.startsWith(prefix)) {
                id = dir + "/" + name;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Found id created last time: " + id);
                }
                break;
            }
        }
        if (id == null) {
            id = zookeeper.create(dir + "/" + prefix, data, 
                    acl, EPHEMERAL_SEQUENTIAL);

            if (LOG.isDebugEnabled()) {
                LOG.debug("Created id: " + id);
            }
        }

    }

	public void clear() {
		if(zk == null || zk.isClosed()){
    		return;
    	}
		try {
			zk.delete(dir, -1);
		} catch (Exception e) {
			 LOG.error("clear error: " + e,e);
		} 
	}
	
    public synchronized boolean trylock() throws KeeperException, InterruptedException {
    	if(zk == null){
    		LOG.info("zk 是空");
    		return false;
    	}
        if (zk.isClosed()) {
        	LOG.info("zk 已經(jīng)關(guān)閉");
            return false;
        }
        ensurePathExists(dir);
        
        LOG.debug("id:"+id);
        do {
            if (id == null) {
                long sessionId = zk.getSessionId();
                String prefix = "x-" + sessionId + "-";
                idName = new LockNode(id);
                LOG.debug("idName:"+idName);
            }
            if (id != null) {
                List<String> names = zk.getChildren(dir, false);
                if (names.isEmpty()) {
                    LOG.warn("No children in: " + dir + " when we've just " +
                    "created one! Lets recreate it...");
                    id = null;
                } else {
                    SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();
                    for (String name : names) {
                        sortedNames.add(new LockNode(dir + "/" + name));
                    }
                    ownerId = sortedNames.first().getName();
                    LOG.debug("all:"+sortedNames);
                    SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
                    LOG.debug("less than me:"+lessThanMe);
                    if (!lessThanMe.isEmpty()) {
                    	LockNode lastChildName = lessThanMe.last();
                        lastChildId = lastChildName.getName();
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("watching less than me node: " + lastChildId);
                        }
                        Stat stat = zk.exists(lastChildId, new LockWatcher());
                        if (stat != null) {
                            return Boolean.FALSE;
                        } else {
                            LOG.warn("Could not find the" +
                            		" stats for less than me: " + lastChildName.getName());
                        }
                    } else {
                        if (isOwner()) {
                            if (callback != null) {
                                callback.lockAcquired();
                            }
                            return Boolean.TRUE;
                        }
                    }
                }
            }
        }
        while (id == null);
        return Boolean.FALSE;
    }

    public String getDir() {
        return dir;
    }

    public boolean isOwner() {
        return id != null && ownerId != null && id.equals(ownerId);
    }

    public String getId() {
       return this.id;
    }
}

使用本方案實(shí)現(xiàn)的分布式鎖,可以很好地解決鎖重入的問題,而且使用會(huì)話結(jié)點(diǎn)來避免死鎖;性能方面,根據(jù)筆者自測(cè)結(jié)果,加鎖解鎖各一次算是一個(gè)操作,本方案實(shí)現(xiàn)的分布式鎖,TPS大概為2000-3000,性能比較一般

“zookeeper分布式鎖實(shí)現(xiàn)的方法是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI