溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

ZooKeeper同步框架怎么實現(xiàn)

發(fā)布時間:2021-12-23 12:02:03 來源:億速云 閱讀:112 作者:iii 欄目:云計算

本篇內(nèi)容主要講解“ZooKeeper同步框架怎么實現(xiàn)”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“ZooKeeper同步框架怎么實現(xiàn)”吧!

首先,定義一個同步接口,它有一個execute方法,主要負(fù)責(zé)同步任務(wù)的實現(xiàn)。

Path參數(shù)是任務(wù)節(jié)點(用戶),只有相同的節(jié)點才會同步工作。想象一下,去銀行取錢,如果每個人都有一個專屬的柜臺,那效率是明顯的。

SynchronousProcessor參數(shù)用來處理具體的業(yè)務(wù)。

Synchronous.java

package org.bigmouth.nvwa.zookeeper.concurrent;
 
 
/**
 * 同步,支持分布式
 * 
 * @author Allen Hu 
 * 2015-4-17
 */
public interface Synchronous {
 
    /**
     * 同步執(zhí)行,根據(jù)path標(biāo)識來區(qū)分同步工作。不同的path將不會同步進行。
     * 
     * @param處理結(jié)果類型
     * @param path 任務(wù)節(jié)點
     * e.g. "/project/synchronous/0000001"
     * @param processor 業(yè)務(wù)處理器
     * @return 處理結(jié)果
     */T execute(String path, SynchronousProcessorprocessor);
}

MutexLockSynchronous.java

Synchronous的實現(xiàn)類,基于普通排它鎖的方式實現(xiàn)。

package org.bigmouth.nvwa.zookeeper.concurrent;
 
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.zookeeper.common.PathUtils;
import org.bigmouth.nvwa.zookeeper.ZkClientHolder;
 
 
/**
 * 基于普通排他鎖的方式實現(xiàn)同步
 * 
 * @author Allen Hu 
 * 2015-4-17
 */
public class MutexLockSynchronous implements Synchronous {
 
    private final ZkClientHolder zkClientHolder;
     
    public MutexLockSynchronous(ZkClientHolder zkClientHolder) {
        this.zkClientHolder = zkClientHolder;
    }
 
    @Override
    publicT execute(String path, SynchronousProcessorprocessor) {
        PathUtils.validatePath(path);
        InterProcessLock lock = new InterProcessMutex(zkClientHolder.get(), path);
        try {
            lock.acquire();
            if (null != processor)
                return processor.process();
        }
        catch (Exception e) {
            if (null != processor)
                processor.exceptionCaught(e);
        }
        finally {
            try {
                lock.release();
            }
            catch (Exception e) {
            }
        }
        return null;
    }
}




SynchronousProcessor.java

任務(wù)處理器接口,實現(xiàn)它來完成具體的業(yè)務(wù)工作

package org.bigmouth.nvwa.zookeeper.concurrent;
 
 
/**
 * 同步業(yè)務(wù)處理器
 * 
 * @author Allen Hu 
 * 2015-4-17
 */
public interface SynchronousProcessor{
 
    /**
     * 處理具體的業(yè)務(wù)
     * 
     * @return
     */
    T process();
     
    /**
     * 異常捕獲
     * 
     * @param throwable
     */
    void exceptionCaught(Throwable throwable);
}

ZkClientHolder.java

當(dāng)然少不了這個了,繼承的父類可以不需要了解,就是定義了兩個抽象方法:doInit和doDestroy方法。

package org.bigmouth.nvwa.zookeeper;
 
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.bigmouth.nvwa.utils.BaseLifeCycleSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import com.google.common.base.Preconditions;
 
 
/**
 * ZooKeeper client holder
 * 
 * @author Allen Hu 
 * 2015-4-16
 */
public class ZkClientHolder extends BaseLifeCycleSupport {
 
    private static final Logger LOGGER = LoggerFactory.getLogger(ZkClientHolder.class);
     
    public static final int MAX_RETRIES = 3;
    public static final int BASE_SLEEP_TIMEMS = 3000;
 
    private CuratorFramework zkClient;
 
    private final String connectString;
    private final int sessionTimeout;
     
    public ZkClientHolder(String connectString, int sessionTimeout) {
        Preconditions.checkArgument(StringUtils.isNotBlank(connectString), "connectString cannot be blank");
        Preconditions.checkArgument(sessionTimeout >= 10000, "sessionTimeout must be greater than 10000");
        this.connectString = connectString;
        this.sessionTimeout = sessionTimeout;
    }
     
    public CuratorFramework get() {
        return zkClient;
    }
 
    @Override
    protected void doInit() {
        zkClient = CuratorFrameworkFactory.builder()
                .sessionTimeoutMs(sessionTimeout)
                .connectString(connectString)
                .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIMEMS, MAX_RETRIES))
                .build();
        zkClient.start();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Connected to ZooKepper server: {}", connectString);
        }
    }
 
    @Override
    protected void doDestroy() {
        if (null != zkClient)
            zkClient.close();
    }
}

最后來個測試類,模擬多個用戶多線程處理任務(wù)的過程,我們達到了相同用戶間同步的目的。

package org.bigmouth.nvwa.zookeeper.concurrent;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
import org.apache.curator.utils.ZKPaths;
import org.bigmouth.nvwa.zookeeper.ZkClientHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
 
/**
 * 
 * @author Allen Hu 
 * 2015-4-17
 */
public class ConcurrentTest {
     
    private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentTest.class);
    private ZkClientHolder zkClientHolder = new ZkClientHolder("172.16.3.24:2181", 60000);
    private Synchronous synchronous;
     
    public ConcurrentTest() {
        zkClientHolder.init();
        synchronous = new MutexLockSynchronous(zkClientHolder);
    }
 
    public class Service implements Runnable {
         
        private final String id;
        private final long sleepInMillis;
         
        public Service(String id, long sleepInMillis) {
            this.id = id;
            this.sleepInMillis = sleepInMillis;
        }
 
        @Override
        public void run() {
            synchronous.execute(ZKPaths.makePath("/nvwa/zookeeper/concurrent", id), new SynchronousProcessor() {
 
                @Override
                public String process() {
                    LOGGER.info(id + " star...!");
                    try {
                        Thread.sleep(sleepInMillis);
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    LOGGER.info(id + " has execution!");
                    return id;
                }
 
                @Override
                public void exceptionCaught(Throwable throwable) {
                    throwable.printStackTrace();
                }
            });
        }
    }
 
    static ExecutorService executor = Executors.newCachedThreadPool();
     
    public static void main(String[] args) {
        ConcurrentTest ct = new ConcurrentTest();
        executor.submit(ct.new Service("1", 5000)); // 1號 處理5秒
        executor.submit(ct.new Service("1", 2000)); // 1號 處理2秒
        executor.submit(ct.new Service("2", 5000)); // 2號 處理5秒
        executor.submit(ct.new Service("3", 10000)); // 3號 處理10秒
        executor.submit(ct.new Service("3", 500)); // 3號 處理0.5秒
    }
}


輸出結(jié)果,1、2、3任務(wù)并行,而相同的任務(wù)串行。如:第二個1號等第一個1號執(zhí)行完才開始。

ZooKeeper同步框架怎么實現(xiàn)

到此,相信大家對“ZooKeeper同步框架怎么實現(xiàn)”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI