溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何實現(xiàn)基于Jedis+ZK的分布式序列號生成器

發(fā)布時間:2021-10-14 14:26:55 來源:億速云 閱讀:109 作者:iii 欄目:編程語言

本篇內(nèi)容主要講解“如何實現(xiàn)基于Jedis+ZK的分布式序列號生成器”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“如何實現(xiàn)基于Jedis+ZK的分布式序列號生成器”吧!

部分源碼參考Jedis實現(xiàn)分布式鎖博客:

package com.xxx.arch.seq.utlis;

import com.xxx.arch.seq.client.redis.RedisSEQ;
import lombok.extern.slf4j.Slf4j;


/**
 * arch-seq 唯一code 獲取客戶端
 *
 * @author jdkleo
 */
@Slf4j
public class SEQUtil {

    /**
     * 生成默認(rèn)KEY的UUID規(guī)則: 日期yyMMdd 6位 + 分布式seqID 10位,總共6 + 10 = 16位
     *
     * @param
     * @return
     */
    public static long getSEQ() {
        return RedisSEQ.getSEQ();
    }

    /**
     * 生成默認(rèn)KEY連續(xù)的UUID,共total個
     *
     * @param total - 連續(xù)多少個
     * @return
     */
    public static long[] getSEQ(long total) {
        long value = RedisSEQ.getSEQ(total);
        return getValueArray(value, (int) total);
    }

    /**
     * 生成指定KEY的UUID規(guī)則: 日期yyMMdd 6位 + 分布式seqID 10位,總共6 + 10 = 16位
     *
     * @param seqName
     * @return
     */
    public static long getSEQ(String seqName) {
        return RedisSEQ.getSEQ(seqName, 1);
    }

    /**
     * 生成指定KEY連續(xù)的UUID,共total個
     *
     * @param seqName
     * @param total
     * @return
     */
    public static long[] getSEQ(String seqName, long total) {
        long value = RedisSEQ.getSEQ(seqName, total);
        return getValueArray(value, (int) total);
    }


    private static long[] getValueArray(long value, int total) {
        int n = total;
        long[] ret = new long[n];
        do {
            ret[n - 1] = value--;
        } while (--n > 0);
        return ret;
    }
}
package com.xxx.arch.seq.client.redis;

import com.xxx.arch.seq.client.tool.StreamCloseAble;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Redis版本SEQ(有序SEQ)
 *
 * @author zhangyang
 * @createDate 2019-01-22
 * @since 2.x
 */
@Slf4j
public class RedisSEQ extends StreamCloseAble {

    //默認(rèn)的REDIS SEQ初始化狀態(tài)器KEY
    private static final String _DEFAULT_SEQ_INIT_KEY = "ARCH_SEQ_REDIS_SEQ_INIT";
    //默認(rèn)的REDIS SEQ初始化狀態(tài)器VAL
    private static final String _DEFAULT_SEQ_INIT_PENDING = "pending";
    private static final String _DEFAULT_SEQ_INIT_READY = "ready";
    //SEQ初始化容器狀態(tài)
    private static volatile boolean _DEFAULT_SEQ_INIT_STATUS;

    //默認(rèn)REDIS SEQ序列號的名稱
    private static final String _DEFAULT_SEQ_NAME = "ARCH_SEQ_REDIS_SEQ";

    //本地模式自增ID槽
    private final static AtomicInteger _LOCAL_INCR = new AtomicInteger(0);

    static {
        JedisConfig.JedisConn jedisConn = null;
        try {
            jedisConn = JedisConfig.getInstance().getConn();
            //if REDIS宕機或第一次:創(chuàng)建初始化狀態(tài)成功后,初始化redis keys(該方法可以恢復(fù)上次redis宕機數(shù)據(jù))
            if (jedisConn.setnx(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_PENDING) == 1) {//搶到REDIS初始化鎖,并將其標(biāo)記為pending狀態(tài)
                try {
                    RedisSEQTimer.getInstance().removeNotUsedKeys();
                    RedisSEQTimer.getInstance().initRedisKeys();//初始化REDIS,從ZK上讀取初始數(shù)據(jù)
                    jedisConn.set(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_READY);//初始化完成,標(biāo)記為ready狀態(tài)
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                    //初始化arch.seq REDIS數(shù)據(jù)異常,有可能是ZK相關(guān)問題,也有可能是REDIS問題,請排查
                    log.error("Initialization of arch.seq REDIS data exceptions, may be ZK-related problems, may also be REDIS problems, please check redis key:{}", _DEFAULT_SEQ_INIT_KEY);
                    jedisConn.del(_DEFAULT_SEQ_INIT_KEY);
                }
            }
            //else{...} 沒搶到REDIS初始化鎖的話:不作任何處理
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            log.error("Initialization of arch.seq REDIS data exceptions, may be arch.seq's configuration is not ready");
        } finally {
            close(jedisConn);
        }
    }


    public static Long getSEQ() {
        return getSEQ(_DEFAULT_SEQ_NAME, 1);
    }

    public static Long getSEQ(long total) {
        return getSEQ(_DEFAULT_SEQ_NAME, total);
    }

    public static Long getSEQ(String seqName, long total) {
        Long result = null;
        JedisConfig.JedisConn jedisConn = null;
        try {
            //獲取redis連接
            jedisConn = JedisConfig.getInstance().getConn();
            //獲得REDIS初始化狀態(tài)不成功
            if (!tryInitReady(jedisConn)) {
                //arch.seq By REDIS版本不能正常初始化,請檢查REDIS服務(wù)。
                throw new RuntimeException("arch.seq By REDIS version cannot be initialized properly. Please check the REDIS service.");
            }
            //開啟分布式鎖
            //if (jedisConn.tryLock(seqName, 1000, 2000)) {
            try {
                String day = RedisSEQTimer.getInstance().getDayFormat();
                String incrVal = String.format("%010d", getIncrVal(jedisConn, day, seqName, total));
                result = Long.parseLong(day + incrVal);
            } catch (Exception e) {
                e.printStackTrace();
                log.warn("try lock failed,the arch.seq tool will be retry after sleep some times.");
                Thread.sleep(randTime());
                result = getSEQ(seqName, total);
            }
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
            //redis生成失敗,返回本地ID:15位納秒+1位自然數(shù)輪詢
            //在獲取【自增序列號:{},序列號分布式鎖:{}】時發(fā)生了異常,系統(tǒng)返回了本地生成的自增序列號,不影響系統(tǒng)使用,但請管理員盡快協(xié)查!
            log.error("An exception occurred while acquiring self-incremental sequence number '{}', " +
                    "sequence number distributed lock '{}',The system returns the locally generated self-incremental " +
                    "sequence number, which does not affect the use of the system, but the administrator should check " +
                    "it as soon as possible.", seqName, seqName + "_LOCK");
            result = xUUID();
        } finally {
            //切記,一定要釋放分布式鎖(注:釋放鎖的同時jedisConn會自動釋放connection,無需再次CLOSE)
            if (jedisConn != null) {
                //jedisConn.unLock(seqName);
                jedisConn.close();
            }
            if (log.isDebugEnabled()) {
                log.debug(seqName + ":" + result + ", trace:\n" + getStackTrace());
            }
        }
        return result;
        //arch.seq發(fā)生了不可預(yù)測的異常,請聯(lián)系架構(gòu)部處理!
        //throw new RuntimeException("arch.seq發(fā)生了不可預(yù)測的異常,請聯(lián)系架構(gòu)部處理!");
    }

    private static String getStackTrace() {
        StringBuilder result = new StringBuilder();
        StackTraceElement[] element = Thread.currentThread().getStackTrace();
        for (int i = 0; i < element.length; i++) {
            result.append("\t").append(element[i]).append("\n");
        }
        return result.toString();
    }

    private static long randTime() {
        return new Random().nextInt(50) + 50;
    }

    private static boolean tryInitReady(JedisConfig.JedisConn jedisConn) throws InterruptedException {
        int times = 0;
        for (; times < 3; times++) {
            if (getSEQInitReady(jedisConn)) {
                break;
            }
            Thread.sleep(100);
        }
        return times < 3;
    }

    /**
     * 獲得SEQ初始化狀態(tài)
     *
     * @param jedisConn
     * @return
     */
    private static boolean getSEQInitReady(JedisConfig.JedisConn jedisConn) {
        if (!_DEFAULT_SEQ_INIT_STATUS) {
            synchronized (RedisSEQ.class) {
                if (!_DEFAULT_SEQ_INIT_STATUS) {
                    _DEFAULT_SEQ_INIT_STATUS = _DEFAULT_SEQ_INIT_READY.equals(jedisConn.get(_DEFAULT_SEQ_INIT_KEY));
                }
            }
        }
        return _DEFAULT_SEQ_INIT_STATUS;
    }

    /**
     * 獲得REDIS自增序列號最新值,并同步更新到ZK備份數(shù)據(jù)節(jié)點守護線程中
     *
     * @param jedisConn
     * @param day
     * @param seqName
     * @param total
     * @return
     */
    private static Long getIncrVal(JedisConfig.JedisConn jedisConn, String day, String seqName, long total) {
        String key = seqName + "_" + day;
        Long incrVal = total > 1 ? jedisConn.incr(key, total) : jedisConn.incr(key);
        if (incrVal > 9999999999L) {
            throw new RuntimeException("Exceed the maximum value,sequence:" + incrVal);
        }
        //塞到要更新的ZK隊列中
        RedisSEQTimer.getInstance().push(key, incrVal);
        return incrVal;
    }

    /**
     * 單機模式生成UUID
     *
     * @return
     */
    private static Long xUUID() {
        int rand = _LOCAL_INCR.incrementAndGet() % 10;
        String result = System.nanoTime() + "" + rand;
        return Long.parseLong(result);
    }

}
package com.xxx.arch.seq.client.redis;

import com.xxx.arch.seq.client.tool.StreamCloseAble;
import com.xxx.arch.seq.client.tool.ZkClient;
import com.xxx.arch.seq.client.zk.ZkClientUtil;
import org.apache.commons.lang3.time.DateUtils;

import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;


public class RedisSEQTimer extends StreamCloseAble {
    public static final String DAY_FORMAT_PATTERN = "yyMMdd";

    public static volatile RedisSEQTimer redisSEQTimer;

    private final ConcurrentHashMap<String, Long> REDIS_INCR_MAP = new ConcurrentHashMap<>();

    private final ZkClient _ZK_CLIENT = ZkClientUtil.getZkClient();

    private final String _DEFAULT_ZK_NAMESPACE = "/ARCH_SEQ_REDIS";

    //zk節(jié)點最大值每次遞增數(shù)
    private long _REDIS_MAXVALUE_INIT = 10_000L;

    private Timer _TIMER = new Timer(true);

    //是否處于清理狀態(tài)
    private volatile boolean _CLEAN_STATUS;

    //清理key
    private static final String _REMOVE_KEY = "ARCH_SEQ_REMOVE_KEY";

    private RedisSEQTimer() {
        super();
        //啟動zk巡查服務(wù)
        _TIMER.schedule(new TimerTask() {
            @Override
            public void run() {
                checkAndConfigure();
            }
        }, new Date(), 1 * 60 * 1000);

        //每天定時清理垃圾數(shù)據(jù)
        _TIMER.schedule(new TimerTask() {
            @Override
            public void run() {
                removeNotUsedKeys();
            }
        }, getFirstTime(), 24 * 60 * 60 * 1000);
    }


    public static RedisSEQTimer getInstance() {
        if (redisSEQTimer == null) {
            synchronized (RedisSEQTimer.class) {
                if (redisSEQTimer == null) {
                    redisSEQTimer = new RedisSEQTimer();
                }
            }
        }
        return redisSEQTimer;
    }

    /**
     * 定期更新ZK節(jié)點
     */
    private synchronized void checkAndConfigure() {
        if (_CLEAN_STATUS) {
            return;
        }
        if (REDIS_INCR_MAP.isEmpty()) {
            return;
        }
        String endDay = "_" + getDayFormat();
        List<String> notTodayKeys = new ArrayList<>();
        Set<Map.Entry<String, Long>> entrySet = REDIS_INCR_MAP.entrySet();
        for (Map.Entry<String, Long> entry : entrySet) {
            //不是今天的key不作處理
            if (!entry.getKey().endsWith(endDay)) {
                notTodayKeys.add(entry.getKey());
                return;
            }
            //將最新的值寫到zk節(jié)點上 節(jié)點格式:<ARCH_SEQ前綴>/KEY_yyMMdd
            String zkNode = _DEFAULT_ZK_NAMESPACE + "/" + entry.getKey();
            if (_ZK_CLIENT.exists(zkNode)) {
                _ZK_CLIENT.writeData(zkNode, entry.getValue());
            } else {
                try {
                    _ZK_CLIENT.createPersistent(zkNode, entry.getValue());
                } catch (RuntimeException e) {
                    //not to write log ,it's will be retry in next time.
                }
            }
        }
        ;
        if (!notTodayKeys.isEmpty()) {
            for (String key : notTodayKeys) {
                REDIS_INCR_MAP.remove(key);
            }
        }
    }

    /**
     * 刪除不再使用的KEY(包含redis和zk節(jié)點)
     */
    public synchronized void removeNotUsedKeys() {
        if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) {
            return;
        }
        _CLEAN_STATUS = true;
        JedisConfig.JedisConn jedisConn = null;
        String requestId = UUID.randomUUID().toString();
        boolean tryLock = false;
        try {
            List<String> list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE);

            //保留兩天??紤]到多個機器的時間可能不一致,如果在剛過零點刪除了昨天的sequence,另一臺機器可能還需要使用它,則會出現(xiàn)id重復(fù)
            Date now = new Date();
            Date yesterday = DateUtils.addDays(now, -1);
            List<String> keepDays = Arrays.asList(getDayFormat(now), getDayFormat(yesterday));

            if (list != null && !list.isEmpty()) {
                jedisConn = JedisConfig.getInstance().getConn();
                if (tryLock = jedisConn.tryLock(_REMOVE_KEY, requestId, 2000)) {
                    JedisConfig.JedisConn finalJedisConn = jedisConn;
                    for (String node : list) {
                        String dayPart = node.substring(node.length() - DAY_FORMAT_PATTERN.length());
                        if (!keepDays.contains(dayPart)) {
                            REDIS_INCR_MAP.remove(node);
                            finalJedisConn.del(node);
                            removeZkNode(node);
                        }
                    }
                }
            }
        } finally {
            _CLEAN_STATUS = false;
            if (jedisConn != null) {
                if (tryLock) {
                    jedisConn.unLock(_REMOVE_KEY, requestId);
                }
                jedisConn.close();
            }
        }
    }

    /**
     * 移除ZK節(jié)點
     *
     * @param node
     */
    private void removeZkNode(String node) {
        String path = _DEFAULT_ZK_NAMESPACE + "/" + node;
        if (_ZK_CLIENT.exists(path)) {
            try {
                _ZK_CLIENT.delete(path);
            } catch (Exception e) {
            }
        }
    }


    /**
     * 獲得每天定時任務(wù)的執(zhí)行時間
     *
     * @return
     */
    private Date getFirstTime() {
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.HOUR_OF_DAY, 24); // 24點  可以更改時間
        calendar.set(Calendar.MINUTE, getRandNum(6, 0)); // 0-5分鐘 隨機
        calendar.set(Calendar.SECOND, getRandNum(60, 0));// 0-59秒  隨機
        return calendar.getTime();
    }

    /**
     * 獲得區(qū)間隨機整數(shù)
     *
     * @param exclude - 最大數(shù),exclude
     * @param from    - 最小數(shù),include
     * @return
     */
    private int getRandNum(int exclude, int from) {
        return new Random().nextInt(exclude) + from;
    }


    /**
     * 將某天的KEY塞到相應(yīng)隊列
     *
     * @param key - 業(yè)務(wù)KEY key_yyMMdd
     * @param val - 值
     * @return 是否成功
     */
    public synchronized void push(String key, Long val) {
        REDIS_INCR_MAP.put(key, val);
    }

    public String getDayFormat() {
        return getDayFormat(new Date());
    }

    public String getDayFormat(Date date) {
        return new SimpleDateFormat(DAY_FORMAT_PATTERN).format(date);
    }

    /**
     * 初始化redis keys
     */
    public void initRedisKeys() {
        if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) {
            return;
        }
        List<String> list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE);
        if (list != null && !list.isEmpty()) {
            Long zkVal;
            JedisConfig.JedisConn jedisConn = null;
            for (int i = 0; i < list.size(); i++) {
                zkVal = _ZK_CLIENT.readData(_DEFAULT_ZK_NAMESPACE + "/" + list.get(i));
                if (zkVal != null) {
                    String requestId = UUID.randomUUID().toString();
                    boolean tryLock = false;
                    try {
                        jedisConn = JedisConfig.getInstance().getConn();
                        //獲得鎖才更新,沒獲得鎖就放棄更新
                        if (tryLock = jedisConn.tryLock(list.get(i), requestId, 2000)) {
                            jedisConn.set(list.get(i), String.valueOf(zkVal + _REDIS_MAXVALUE_INIT));
                        }
                    } finally {
                        if (jedisConn != null) {
                            if (tryLock) {
                                jedisConn.unLock(list.get(i), requestId);
                            }
                            jedisConn.close();
                        }
                    }
                }
            }
        }
    }


}
package com.xxx.arch.seq.client.tool;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.Collections;
import java.util.List;


@Slf4j
public class ZkClient {

    private CuratorFramework client;

    public ZkClient(String serverList, int connectionTimeoutMs, int sessionTimeout) {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(serverList)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeout)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
    }


    public boolean exists(String path) {
        try {
            return client.checkExists().forPath(path) != null;
        } catch (Exception e) {
            return false;
        }
    }

    public void writeData(String path, Long value) {
        try {
            client.setData().forPath(path, value.toString().getBytes());
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    public void createPersistent(String zkNode, Long value) {
        try {
            client.create().forPath(zkNode, value.toString().getBytes());
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    public List<String> getChildren(String path) {
        try {
            return client.getChildren().forPath(path);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return Collections.emptyList();
    }

    public Long readData(String path) {
        try {
            byte[] data = client.getData().forPath(path);
            return Long.parseLong(new String(data));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return null;
    }

    public void delete(String path) {
        try {
            client.delete().forPath(path);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
}
package com.xxx.arch.seq.client.zk;

import com.xxx.arch.seq.client.tool.ZkClient;
import com.xxx.arch.seq.constant.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ZkClientUtil {

    private static final Logger logger = LoggerFactory.getLogger(ZkClientUtil.class);

    private static volatile ZkClient zkClient = null;

    public static ZkClient getZkClient() {
        if (zkClient == null) {
            synchronized (ZkClientUtil.class) {
                if (zkClient == null) {
                    initZkClient();
                }
            }
        }
        return zkClient;
    }

    private static void initZkClient() {
        try {
            String serverList = Constants.ARCH_SEQ_ZOOKEEPER_CONNECT_STRING;
            if (logger.isInfoEnabled()) {
                logger.info("zk cluster[" + serverList + "]");
            }
            if (serverList == null || serverList.trim().isEmpty()) {
                throw new RuntimeException("no \"arch.seq.zk-cluster.serverList\" config.used");
            } else {
                zkClient = new ZkClient(serverList, 15000, 60000);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

}
package com.xxx.arch.seq.client.tool;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;

/**
 * Created by zhangyang on 2016/5/31.
 */
public class StreamCloseAble {
    private static Logger logger = LoggerFactory.getLogger(StreamCloseAble.class);

    /**
     * 關(guān)閉輸入輸出流
     *
     * @param closeAbles
     */
    public static void close(Closeable... closeAbles) {
        if (closeAbles == null || closeAbles.length <= 0) {
            return;
        }
        for (Closeable closeAble : closeAbles) {
            if (closeAble != null) {
                try {
                    closeAble.close();
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }
}

到此,相信大家對“如何實現(xiàn)基于Jedis+ZK的分布式序列號生成器”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI