溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

死磕 java同步系列之Semaphore源碼解析

發(fā)布時(shí)間:2020-07-08 23:06:12 來源:網(wǎng)絡(luò) 閱讀:264 作者:彤哥讀源碼 欄目:編程語言

問題

(1)Semaphore是什么?

(2)Semaphore具有哪些特性?

(3)Semaphore通常使用在什么場景中?

(4)Semaphore的許可次數(shù)是否可以動(dòng)態(tài)增減?

(5)Semaphore如何實(shí)現(xiàn)限流?

簡介

Semaphore,信號(hào)量,它保存了一系列的許可(permits),每次調(diào)用acquire()都將消耗一個(gè)許可,每次調(diào)用release()都將歸還一個(gè)許可。

特性

Semaphore通常用于限制同一時(shí)間對共享資源的訪問次數(shù)上,也就是常說的限流。

下面我們一起來學(xué)習(xí)Java中Semaphore是如何實(shí)現(xiàn)的。

類結(jié)構(gòu)

死磕 java同步系列之Semaphore源碼解析

Semaphore中包含了一個(gè)實(shí)現(xiàn)了AQS的同步器Sync,以及它的兩個(gè)子類FairSync和NonFairSync,這說明Semaphore也是區(qū)分公平模式和非公平模式的。

源碼分析

基于之前對于ReentrantLock和ReentrantReadWriteLock的分析,這篇文章相對來說比較簡單,之前講過的一些方法將直接略過,有興趣的可以拉到文章底部查看之前的文章。

內(nèi)部類Sync

// java.util.concurrent.Semaphore.Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
    // 構(gòu)造方法,傳入許可次數(shù),放入state中
    Sync(int permits) {
        setState(permits);
    }
    // 獲取許可次數(shù)
    final int getPermits() {
        return getState();
    }
    // 非公平模式嘗試獲取許可
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            // 看看還有幾個(gè)許可
            int available = getState();
            // 減去這次需要獲取的許可還剩下幾個(gè)許可
            int remaining = available - acquires;
            // 如果剩余許可小于0了則直接返回
            // 如果剩余許可不小于0,則嘗試原子更新state的值,成功了返回剩余許可
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    // 釋放許可
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // 看看還有幾個(gè)許可
            int current = getState();
            // 加上這次釋放的許可
            int next = current + releases;
            // 檢測溢出
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // 如果原子更新state的值成功,就說明釋放許可成功,則返回true
            if (compareAndSetState(current, next))
                return true;
        }
    }
    // 減少許可
    final void reducePermits(int reductions) {
        for (;;) {
            // 看看還有幾個(gè)許可
            int current = getState();
            // 減去將要減少的許可
            int next = current - reductions;
            // 檢測舉出
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // 原子更新state的值,成功了返回true
            if (compareAndSetState(current, next))
                return;
        }
    }
    // 銷毀許可
    final int drainPermits() {
        for (;;) {
            // 看看還有幾個(gè)許可
            int current = getState();
            // 如果為0,直接返回
            // 如果不為0,把state原子更新為0
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

通過Sync的幾個(gè)實(shí)現(xiàn)方法,我們獲取到以下幾點(diǎn)信息:

(1)許可是在構(gòu)造方法時(shí)傳入的;

(2)許可存放在狀態(tài)變量state中;

(3)嘗試獲取一個(gè)許可的時(shí)候,則state的值減1;

(4)當(dāng)state的值為0的時(shí)候,則無法再獲取許可;

(5)釋放一個(gè)許可的時(shí)候,則state的值加1;

(6)許可的個(gè)數(shù)可以動(dòng)態(tài)改變;

內(nèi)部類NonfairSync

// java.util.concurrent.Semaphore.NonfairSync
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
    // 構(gòu)造方法,調(diào)用父類的構(gòu)造方法
    NonfairSync(int permits) {
        super(permits);
    }
    // 嘗試獲取許可,調(diào)用父類的nonfairTryAcquireShared()方法
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

非公平模式下,直接調(diào)用父類的nonfairTryAcquireShared()嘗試獲取許可。

內(nèi)部類FairSync

// java.util.concurrent.Semaphore.FairSync
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
    // 構(gòu)造方法,調(diào)用父類的構(gòu)造方法
    FairSync(int permits) {
        super(permits);
    }
    // 嘗試獲取許可
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 公平模式需要檢測是否前面有排隊(duì)的
            // 如果有排隊(duì)的直接返回失敗
            if (hasQueuedPredecessors())
                return -1;
            // 沒有排隊(duì)的再嘗試更新state的值
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

公平模式下,先檢測前面是否有排隊(duì)的,如果有排隊(duì)的則獲取許可失敗,進(jìn)入隊(duì)列排隊(duì),否則嘗試原子更新state的值。

構(gòu)造方法

// 構(gòu)造方法,創(chuàng)建時(shí)要傳入許可次數(shù),默認(rèn)使用非公平模式
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
// 構(gòu)造方法,需要傳入許可次數(shù),及是否公平模式
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

創(chuàng)建Semaphore時(shí)需要傳入許可次數(shù)。

Semaphore默認(rèn)也是非公平模式,但是你可以調(diào)用第二個(gè)構(gòu)造方法聲明其為公平模式。

下面的方法在學(xué)習(xí)過前面的內(nèi)容看來都比較簡單,彤哥這里只列舉Semaphore支持的一些功能了。

以下的方法都是針對非公平模式來描述。

acquire()方法

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

獲取一個(gè)許可,默認(rèn)使用的是可中斷方式,如果嘗試獲取許可失敗,會(huì)進(jìn)入AQS的隊(duì)列中排隊(duì)。

acquireUninterruptibly()方法

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

獲取一個(gè)許可,非中斷方式,如果嘗試獲取許可失敗,會(huì)進(jìn)入AQS的隊(duì)列中排隊(duì)。

tryAcquire()方法

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

嘗試獲取一個(gè)許可,使用Sync的非公平模式嘗試獲取許可方法,不論是否獲取到許可都返回,只嘗試一次,不會(huì)進(jìn)入隊(duì)列排隊(duì)。

tryAcquire(long timeout, TimeUnit unit)方法

public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

嘗試獲取一個(gè)許可,先嘗試一次獲取許可,如果失敗則會(huì)等待timeout時(shí)間,這段時(shí)間內(nèi)都沒有獲取到許可,則返回false,否則返回true;

release()方法

public void release() {
    sync.releaseShared(1);
}

釋放一個(gè)許可,釋放一個(gè)許可時(shí)state的值會(huì)加1,并且會(huì)喚醒下一個(gè)等待獲取許可的線程。

acquire(int permits)方法

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

一次獲取多個(gè)許可,可中斷方式。

acquireUninterruptibly(int permits)方法

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

一次獲取多個(gè)許可,非中斷方式。

tryAcquire(int permits)方法

public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

一次嘗試獲取多個(gè)許可,只嘗試一次。

tryAcquire(int permits, long timeout, TimeUnit unit)方法

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

嘗試獲取多個(gè)許可,并會(huì)等待timeout時(shí)間,這段時(shí)間沒獲取到許可則返回false,否則返回true。

release(int permits)方法

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

一次釋放多個(gè)許可,state的值會(huì)相應(yīng)增加permits的數(shù)量。

availablePermits()方法

public int availablePermits() {
    return sync.getPermits();
}

獲取可用的許可次數(shù)。

drainPermits()方法

public int drainPermits() {
    return sync.drainPermits();
}

銷毀當(dāng)前可用的許可次數(shù),對于已經(jīng)獲取的許可沒有影響,會(huì)把當(dāng)前剩余的許可全部銷毀。

reducePermits(int reduction)方法

protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

減少許可的次數(shù)。

總結(jié)

(1)Semaphore,也叫信號(hào)量,通常用于控制同一時(shí)刻對共享資源的訪問上,也就是限流場景;

(2)Semaphore的內(nèi)部實(shí)現(xiàn)是基于AQS的共享鎖來實(shí)現(xiàn)的;

(3)Semaphore初始化的時(shí)候需要指定許可的次數(shù),許可的次數(shù)是存儲(chǔ)在state中;

(4)獲取一個(gè)許可時(shí),則state值減1;

(5)釋放一個(gè)許可時(shí),則state值加1;

(6)可以動(dòng)態(tài)減少n個(gè)許可;

(7)可以動(dòng)態(tài)增加n個(gè)許可嗎?

彩蛋

(1)如何動(dòng)態(tài)增加n個(gè)許可?

答:調(diào)用release(int permits)即可。我們知道釋放許可的時(shí)候state的值會(huì)相應(yīng)增加,再回頭看看釋放許可的源碼,發(fā)現(xiàn)與ReentrantLock的釋放鎖還是有點(diǎn)區(qū)別的,Semaphore釋放許可的時(shí)候并不會(huì)檢查當(dāng)前線程有沒有獲取過許可,所以可以調(diào)用釋放許可的方法動(dòng)態(tài)增加一些許可。

(2)如何實(shí)現(xiàn)限流?

答:限流,即在流量突然增大的時(shí)候,上層要能夠限制住突然的大流量對下游服務(wù)的沖擊,在分布式系統(tǒng)中限流一般做在網(wǎng)關(guān)層,當(dāng)然在個(gè)別功能中也可以自己簡單地來限流,比如秒殺場景,假如只有10個(gè)商品需要秒殺,那么,服務(wù)本身可以限制同時(shí)只進(jìn)來100個(gè)請求,其它請求全部作廢,這樣服務(wù)的壓力也不會(huì)太大。

使用Semaphore就可以直接針對這個(gè)功能來限流,以下是代碼實(shí)現(xiàn):

public class SemaphoreTest {
    public static final Semaphore SEMAPHORE = new Semaphore(100);
    public static final AtomicInteger failCount = new AtomicInteger(0);
    public static final AtomicInteger successCount = new AtomicInteger(0);

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            new Thread(()->seckill()).start();
        }
    }

    public static boolean seckill() {
        if (!SEMAPHORE.tryAcquire()) {
            System.out.println("no permits, count="+failCount.incrementAndGet());
            return false;
        }

        try {
            // 處理業(yè)務(wù)邏輯
            Thread.sleep(2000);
            System.out.println("seckill success, count="+successCount.incrementAndGet());
        } catch (InterruptedException e) {
            // todo 處理異常
            e.printStackTrace();
        } finally {
            SEMAPHORE.release();
        }
        return true;
    }
}

推薦閱讀

1、 死磕 java同步系列之開篇

2、 死磕 java魔法類之Unsafe解析

3、 死磕 java同步系列之JMM(Java Memory Model)

4、 死磕 java同步系列之volatile解析

5、 死磕 java同步系列之synchronized解析

6、 死磕 java同步系列之自己動(dòng)手寫一個(gè)鎖Lock

7、 死磕 java同步系列之AQS起篇

8、 死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖

9、 死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖

10、 死磕 java同步系列之ReentrantLock VS synchronized

11、 死磕 java同步系列之ReentrantReadWriteLock源碼解析

歡迎關(guān)注我的公眾號(hào)“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

死磕 java同步系列之Semaphore源碼解析

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI