溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

死磕 java集合之SynchronousQueue源碼分析

發(fā)布時(shí)間:2020-08-17 15:28:16 來(lái)源:網(wǎng)絡(luò) 閱讀:336 作者:彤哥讀源碼 欄目:編程語(yǔ)言

問(wèn)題

(1)SynchronousQueue的實(shí)現(xiàn)方式?

(2)SynchronousQueue真的是無(wú)緩沖的嗎?

(3)SynchronousQueue在高并發(fā)情景下會(huì)有什么問(wèn)題?

簡(jiǎn)介

SynchronousQueue是java并發(fā)包下無(wú)緩沖阻塞隊(duì)列,它用來(lái)在兩個(gè)線(xiàn)程之間移交元素,但是它有個(gè)很大的問(wèn)題,你知道是什么嗎?請(qǐng)看下面的分析。

源碼分析

主要屬性

// CPU的數(shù)量
static final int NCPUS = Runtime.getRuntime().availableProcessors();
// 有超時(shí)的情況自旋多少次,當(dāng)CPU數(shù)量小于2的時(shí)候不自旋
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
// 沒(méi)有超時(shí)的情況自旋多少次
static final int maxUntimedSpins = maxTimedSpins * 16;
// 針對(duì)有超時(shí)的情況,自旋了多少次后,如果剩余時(shí)間大于1000納秒就使用帶時(shí)間的LockSupport.parkNanos()這個(gè)方法
static final long spinForTimeoutThreshold = 1000L;
// 傳輸器,即兩個(gè)線(xiàn)程交換元素使用的東西
private transient volatile Transferer<E> transferer;

通過(guò)屬性我們可以Get到兩個(gè)點(diǎn):

(1)這個(gè)阻塞隊(duì)列里面是會(huì)自旋的;

(2)它使用了一個(gè)叫做transferer的東西來(lái)交換元素;

主要內(nèi)部類(lèi)

// Transferer抽象類(lèi),主要定義了一個(gè)transfer方法用來(lái)傳輸元素
abstract static class Transferer<E> {
    abstract E transfer(E e, boolean timed, long nanos);
}
// 以棧方式實(shí)現(xiàn)的Transferer
static final class TransferStack<E> extends Transferer<E> {
    // 棧中節(jié)點(diǎn)的幾種類(lèi)型:
    // 1. 消費(fèi)者(請(qǐng)求數(shù)據(jù)的)
    static final int REQUEST    = 0;
    // 2. 生產(chǎn)者(提供數(shù)據(jù)的)
    static final int DATA       = 1;
    // 3. 二者正在撮合中
    static final int FULFILLING = 2;

    // 棧中的節(jié)點(diǎn)
    static final class SNode {
        // 下一個(gè)節(jié)點(diǎn)
        volatile SNode next;        // next node in stack
        // 匹配者
        volatile SNode match;       // the node matched to this
        // 等待著的線(xiàn)程
        volatile Thread waiter;     // to control park/unpark
        // 元素
        Object item;                // data; or null for REQUESTs
        // 模式,也就是節(jié)點(diǎn)的類(lèi)型,是消費(fèi)者,是生產(chǎn)者,還是正在撮合中
        int mode;
    }
    // 棧的頭節(jié)點(diǎn)
    volatile SNode head;
}
// 以隊(duì)列方式實(shí)現(xiàn)的Transferer
static final class TransferQueue<E> extends Transferer<E> {
    // 隊(duì)列中的節(jié)點(diǎn)
    static final class QNode {
        // 下一個(gè)節(jié)點(diǎn)
        volatile QNode next;          // next node in queue
        // 存儲(chǔ)的元素
        volatile Object item;         // CAS'ed to or from null
        // 等待著的線(xiàn)程
        volatile Thread waiter;       // to control park/unpark
        // 是否是數(shù)據(jù)節(jié)點(diǎn)
        final boolean isData;
    }

    // 隊(duì)列的頭節(jié)點(diǎn)
    transient volatile QNode head;
    // 隊(duì)列的尾節(jié)點(diǎn)
    transient volatile QNode tail;
}

(1)定義了一個(gè)抽象類(lèi)Transferer,里面定義了一個(gè)傳輸元素的方法;

(2)有兩種傳輸元素的方法,一種是棧,一種是隊(duì)列;

(3)棧的特點(diǎn)是后進(jìn)先出,隊(duì)列的特點(diǎn)是先進(jìn)行出;

(4)棧只需要保存一個(gè)頭節(jié)點(diǎn)就可以了,因?yàn)榇嫒≡囟际遣僮黝^節(jié)點(diǎn);

(5)隊(duì)列需要保存一個(gè)頭節(jié)點(diǎn)一個(gè)尾節(jié)點(diǎn),因?yàn)榇嬖夭僮魑补?jié)點(diǎn),取元素操作頭節(jié)點(diǎn);

(6)每個(gè)節(jié)點(diǎn)中保存著存儲(chǔ)的元素、等待著的線(xiàn)程,以及下一個(gè)節(jié)點(diǎn);

(7)棧和隊(duì)列兩種方式有什么不同呢?請(qǐng)看下面的分析。

主要構(gòu)造方法

public SynchronousQueue() {
    // 默認(rèn)非公平模式
    this(false);
}

public SynchronousQueue(boolean fair) {
    // 如果是公平模式就使用隊(duì)列,如果是非公平模式就使用棧
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

(1)默認(rèn)使用非公平模式,也就是棧結(jié)構(gòu);

(2)公平模式使用隊(duì)列,非公平模式使用棧;

入隊(duì)

我們這里主要介紹以棧方式實(shí)現(xiàn)的傳輸模式,以put(E e)方法為例。

public void put(E e) throws InterruptedException {
    // 元素不可為空
    if (e == null) throw new NullPointerException();
    // 直接調(diào)用傳輸器的transfer()方法
    // 三個(gè)參數(shù)分別是:傳輸?shù)脑?,是否需要超時(shí),超時(shí)的時(shí)間
    if (transferer.transfer(e, false, 0) == null) {
        // 如果傳輸失敗,直接讓線(xiàn)程中斷并拋出中斷異常
        Thread.interrupted();
        throw new InterruptedException();
    }
}

調(diào)用transferer的transfer()方法,傳入元素e,說(shuō)明是生產(chǎn)者

出隊(duì)

我們這里主要介紹以棧方式實(shí)現(xiàn)的傳輸模式,以take()方法為例。

public E take() throws InterruptedException {
    // 直接調(diào)用傳輸器的transfer()方法
    // 三個(gè)參數(shù)分別是:null,是否需要超時(shí),超時(shí)的時(shí)間
    // 第一個(gè)參數(shù)為null表示是消費(fèi)者,要取元素
    E e = transferer.transfer(null, false, 0);
    // 如果取到了元素就返回
    if (e != null)
        return e;
    // 否則讓線(xiàn)程中斷并拋出中斷異常
    Thread.interrupted();
    throw new InterruptedException();
}

調(diào)用transferer的transfer()方法,傳入null,說(shuō)明是消費(fèi)者。

transfer()方法

transfer()方法同時(shí)實(shí)現(xiàn)了取元素和放元素的功能,下面我再來(lái)看看這個(gè)transfer()方法里究竟干了什么。

// TransferStack.transfer()方法
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed
    // 根據(jù)e是否為null決定是生產(chǎn)者還是消費(fèi)者
    int mode = (e == null) ? REQUEST : DATA;
    // 自旋+CAS,熟悉的套路,熟悉的味道
    for (;;) {
        // 棧頂元素
        SNode h = head;
        // 棧頂沒(méi)有元素,或者棧頂元素跟當(dāng)前元素是一個(gè)模式的
        // 也就是都是生產(chǎn)者節(jié)點(diǎn)或者都是消費(fèi)者節(jié)點(diǎn)
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 如果有超時(shí)而且已到期
            if (timed && nanos <= 0) {      // can't wait
                // 如果頭節(jié)點(diǎn)不為空且是取消狀態(tài)
                if (h != null && h.isCancelled())
                    // 就把頭節(jié)點(diǎn)彈出,并進(jìn)入下一次循環(huán)
                    casHead(h, h.next);     // pop cancelled node
                else
                    // 否則,直接返回null(超時(shí)返回null)
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // 入棧成功(因?yàn)槭悄J较嗤?,所以只能入棧?                // 調(diào)用awaitFulfill()方法自旋+阻塞當(dāng)前入棧的線(xiàn)程并等待被匹配到
                SNode m = awaitFulfill(s, timed, nanos);
                // 如果m等于s,說(shuō)明取消了,那么就把它清除掉,并返回null
                if (m == s) {               // wait was cancelled
                    clean(s);
                    // 被取消了返回null
                    return null;
                }

                // 到這里說(shuō)明匹配到元素了
                // 因?yàn)閺腶waitFulfill()里面出來(lái)要不被取消了要不就匹配到了

                // 如果頭節(jié)點(diǎn)不為空,并且頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)是s
                // 就把頭節(jié)點(diǎn)換成s的下一個(gè)節(jié)點(diǎn)
                // 也就是把h和s都彈出了
                // 也就是把棧頂兩個(gè)元素都彈出了
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                // 根據(jù)當(dāng)前節(jié)點(diǎn)的模式判斷返回m還是s中的值
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            // 到這里說(shuō)明頭節(jié)點(diǎn)和當(dāng)前節(jié)點(diǎn)模式不一樣
            // 如果頭節(jié)點(diǎn)不是正在撮合中

            // 如果頭節(jié)點(diǎn)已經(jīng)取消了,就把它彈出棧
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                // 頭節(jié)點(diǎn)沒(méi)有在撮合中,就讓當(dāng)前節(jié)點(diǎn)先入隊(duì),再讓他們嘗試匹配
                // 且s成為了新的頭節(jié)點(diǎn),它的狀態(tài)是正在撮合中
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    // 如果m為null,說(shuō)明除了s節(jié)點(diǎn)外的節(jié)點(diǎn)都被其它線(xiàn)程先一步撮合掉了
                    // 就清空棧并跳出內(nèi)部循環(huán),到外部循環(huán)再重新入棧判斷
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    // 如果m和s嘗試撮合成功,就彈出棧頂?shù)膬蓚€(gè)元素m和s
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        // 返回撮合結(jié)果
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        // 嘗試撮合失敗,說(shuō)明m已經(jīng)先一步被其它線(xiàn)程撮合了
                        // 就協(xié)助清除它
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            // 到這里說(shuō)明當(dāng)前節(jié)點(diǎn)和頭節(jié)點(diǎn)模式不一樣
            // 且頭節(jié)點(diǎn)是正在撮合中

            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                // 如果m為null,說(shuō)明m已經(jīng)被其它線(xiàn)程先一步撮合了
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                // 協(xié)助匹配,如果m和s嘗試撮合成功,就彈出棧頂?shù)膬蓚€(gè)元素m和s
                if (m.tryMatch(h))          // help match
                    // 將棧頂?shù)膬蓚€(gè)元素彈出后,再讓s重新入棧
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    // 嘗試撮合失敗,說(shuō)明m已經(jīng)先一步被其它線(xiàn)程撮合了
                    // 就協(xié)助清除它
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

// 三個(gè)參數(shù):需要等待的節(jié)點(diǎn),是否需要超時(shí),超時(shí)時(shí)間
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    // 到期時(shí)間
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 當(dāng)前線(xiàn)程
    Thread w = Thread.currentThread();
    // 自旋次數(shù)
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 當(dāng)前線(xiàn)程中斷了,嘗試清除s
        if (w.isInterrupted())
            s.tryCancel();

        // 檢查s是否匹配到了元素m(有可能是其它線(xiàn)程的m匹配到當(dāng)前線(xiàn)程的s)
        SNode m = s.match;
        // 如果匹配到了,直接返回m
        if (m != null)
            return m;

        // 如果需要超時(shí)
        if (timed) {
            // 檢查超時(shí)時(shí)間如果小于0了,嘗試清除s
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            // 如果還有自旋次數(shù),自旋次數(shù)減一,并進(jìn)入下一次自旋
            spins = shouldSpin(s) ? (spins-1) : 0;

        // 后面的elseif都是自旋次數(shù)沒(méi)有了
        else if (s.waiter == null)
            // 如果s的waiter為null,把當(dāng)前線(xiàn)程注入進(jìn)去,并進(jìn)入下一次自旋
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            // 如果不允許超時(shí),直接阻塞,并等待被其它線(xiàn)程喚醒,喚醒后繼續(xù)自旋并查看是否匹配到了元素
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            // 如果允許超時(shí)且還有剩余時(shí)間,就阻塞相應(yīng)時(shí)間
            LockSupport.parkNanos(this, nanos);
    }
}

    // SNode里面的方向,調(diào)用者m是s的下一個(gè)節(jié)點(diǎn)
    // 這時(shí)候m節(jié)點(diǎn)的線(xiàn)程應(yīng)該是阻塞狀態(tài)的
    boolean tryMatch(SNode s) {
        // 如果m還沒(méi)有匹配者,就把s作為它的匹配者
        if (match == null &&
            UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
            Thread w = waiter;
            if (w != null) {    // waiters need at most one unpark
                waiter = null;
                // 喚醒m中的線(xiàn)程,兩者匹配完畢
                LockSupport.unpark(w);
            }
            // 匹配到了返回true
            return true;
        }
        // 可能其它線(xiàn)程先一步匹配了m,返回其是否是s
        return match == s;
    }

整個(gè)邏輯比較復(fù)雜,這里為了簡(jiǎn)單起見(jiàn),屏蔽掉多線(xiàn)程處理的細(xì)節(jié),只描述正常業(yè)務(wù)場(chǎng)景下的邏輯:

(1)如果棧中沒(méi)有元素,或者棧頂元素跟將要入棧的元素模式一樣,就入棧;

(2)入棧后自旋等待一會(huì)看有沒(méi)有其它線(xiàn)程匹配到它,自旋完了還沒(méi)匹配到元素就阻塞等待;

(3)阻塞等待被喚醒了說(shuō)明其它線(xiàn)程匹配到了當(dāng)前的元素,就返回匹配到的元素;

(4)如果兩者模式不一樣,且頭節(jié)點(diǎn)沒(méi)有在匹配中,就拿當(dāng)前節(jié)點(diǎn)跟它匹配,匹配成功了就返回匹配到的元素;

(5)如果兩者模式不一樣,且頭節(jié)點(diǎn)正在匹配中,當(dāng)前線(xiàn)程就協(xié)助去匹配,匹配完成了再讓當(dāng)前節(jié)點(diǎn)重新入棧重新匹配;

如果直接閱讀這部分代碼還是比較困難的,建議寫(xiě)個(gè)測(cè)試用例,打個(gè)斷點(diǎn)一步一步跟蹤調(diào)試。

下面是我的測(cè)試用例,可以參考下,在IDEA中可以讓斷點(diǎn)只阻塞線(xiàn)程:

public class TestSynchronousQueue {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(false);

        new Thread(()->{
            try {
                queue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        Thread.sleep(500);
        System.out.println(queue.take());
    }
}

修改斷點(diǎn)只阻塞線(xiàn)程的方法,右擊斷點(diǎn),選擇Thread:

死磕 java集合之SynchronousQueue源碼分析

交給你了

上面的源碼分析都是基于Stack的方式來(lái)分析的,那么隊(duì)列是怎么動(dòng)作的呢?很簡(jiǎn)單哦,測(cè)試用例中的false改成true就可以了,這就交給你了。

總結(jié)

(1)SynchronousQueue是java里的無(wú)緩沖隊(duì)列,用于在兩個(gè)線(xiàn)程之間直接移交元素;

(2)SynchronousQueue有兩種實(shí)現(xiàn)方式,一種是公平(隊(duì)列)方式,一種是非公平(棧)方式;

(3)棧方式中的節(jié)點(diǎn)有三種模式:生產(chǎn)者、消費(fèi)者、正在匹配中;

(4)棧方式的大致思路是如果棧頂元素跟自己一樣的模式就入棧并等待被匹配,否則就匹配,匹配到了就返回;

(5)隊(duì)列方式的大致思路是……不告訴你^^(兩者的邏輯差別還是挺大的)

彩蛋

(1)SynchronousQueue真的是無(wú)緩沖的隊(duì)列嗎?

通過(guò)源碼分析,我們可以發(fā)現(xiàn)其實(shí)SynchronousQueue內(nèi)部或者使用棧或者使用隊(duì)列來(lái)存儲(chǔ)包含線(xiàn)程和元素值的節(jié)點(diǎn),如果同一個(gè)模式的節(jié)點(diǎn)過(guò)多的話(huà),它們都會(huì)存儲(chǔ)進(jìn)來(lái),且都會(huì)阻塞著,所以,嚴(yán)格上來(lái)說(shuō),SynchronousQueue并不能算是一個(gè)無(wú)緩沖隊(duì)列。

(2)SynchronousQueue有什么缺點(diǎn)呢?

試想一下,如果有多個(gè)生產(chǎn)者,但只有一個(gè)消費(fèi)者,如果消費(fèi)者處理不過(guò)來(lái),是不是生產(chǎn)者都會(huì)阻塞起來(lái)?反之亦然。

這是一件很危險(xiǎn)的事,所以,SynchronousQueue一般用于生產(chǎn)、消費(fèi)的速度大致相當(dāng)?shù)那闆r,這樣才不會(huì)導(dǎo)致系統(tǒng)中過(guò)多的線(xiàn)程處于阻塞狀態(tài)。


歡迎關(guān)注我的公眾號(hào)“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

死磕 java集合之SynchronousQueue源碼分析

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI