溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

SynchronousQueue 1.8 源碼解析

發(fā)布時間:2020-08-04 09:57:03 來源:網(wǎng)絡(luò) 閱讀:971 作者:wx5c78c8b1dbb1b 欄目:編程語言

[TOC]

SynchronousQueue 1.8 源碼解析

一,簡介

SynchronousQueue 是一個很奇怪的隊列,感覺都不能叫隊列,因為內(nèi)部沒有數(shù)據(jù)的存儲空間,隊列不能peek,因為不存在元素,任何入隊的線程都會阻塞,直到有線程來出隊,也就是這個隊列是一組操作,入隊和出隊要一起離開,出隊也是一樣,必須等入隊,必須結(jié)伴而行;隊列支持公平和非公平的模式(指的是隊列匹配線程的順序),公平模式的數(shù)據(jù)結(jié)構(gòu)是隊列(FIFO),非公平模式使用的是棧(LIFO)。

二,UML 圖

SynchronousQueue 1.8 源碼解析

三,基本成員
abstract static class Transferer<E> {
    // 出隊入隊都是這一個方法
    abstract E transfer(E e, boolean timed, long nanos);
}

    // npu數(shù)
    static final int NCPUS = Runtime.getRuntime().availableProcessors();

    // 帶超時時間的自旋次數(shù)
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

    // 沒有超時的自旋次數(shù)
    static final int maxUntimedSpins = maxTimedSpins * 16;
TransferStack 非公平的實現(xiàn),主要成員

TransferStack 繼承 Transferer

注意:這幾個狀態(tài)很重要,因為繼承了Transferer,所以出隊和入隊都是使用的transfer方法,狀態(tài)是用來區(qū)分的,后面方法部分會詳解

        /** 0表示消費者 */
        static final int REQUEST    = 0;
        /** 1表示數(shù)據(jù)的生產(chǎn)者 */
        static final int DATA       = 1;
        /** 2 表示數(shù)據(jù)正在匹配 */
        static final int FULFILLING = 2;

        static final class SNode {
            volatile SNode next;        // 下一個節(jié)點
            volatile SNode match;       // 匹配的節(jié)點
            volatile Thread waiter;     // 等待的線程
            Object item;                // 數(shù)據(jù)
            int mode;                   // 模式 0 , 1 , 2
        }
    /** 頭結(jié)點 */
    volatile SNode head;
TransferQueue 公平實現(xiàn),主要成員

TransferQueue 繼承 Transferer

    static final class QNode {
            volatile QNode next;          // next 節(jié)點
            volatile Object item;         // 數(shù)據(jù)項
            volatile Thread waiter;       // 等待線程
            final boolean isData;         // 區(qū)分生產(chǎn)和消費
    }

        /** 頭結(jié)點 */
        transient volatile QNode head;
        /** 尾節(jié)點 */
        transient volatile QNode tail;
四,常用方法
構(gòu)造方法
    public SynchronousQueue() {
        this(false);
    }

    // 構(gòu)造方法,fair表示公平或者非公平
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
TransferStack 非公平常用方法
offer 方法
    public boolean offer(E e) {
        // e 不能為null
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }

    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
            // e 不能為null
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }
put 方法
public void put(E e) throws InterruptedException {
    // e 不能為null
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            // 支持中斷
            Thread.interrupted();
            throw new InterruptedException();

    }
poll 方法
    public E poll() {
        return transferer.transfer(null, true, 0);
    }
take 方法
public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
transfer 方法
    E transfer(E e, boolean timed, long nanos) {

            SNode s = null; // constructed/reused as needed
            //  根據(jù)所傳元素判斷為生產(chǎn)or消費
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) { // 無限循環(huán)
                SNode h = head; // 獲取頭結(jié)點
                if (h == null || h.mode == mode) {  // 頭結(jié)點為空或者當前節(jié)點狀態(tài)(0,1,2)和頭結(jié)點相同
                    if (timed && nanos <= 0) {      // can't wait 設(shè)置有時間
                        // 節(jié)點不為null并且為取消狀態(tài)
                        if (h != null && h.isCancelled())
                            // 彈出取消的節(jié)點
                            casHead(h, h.next);     // pop cancelled node
                        else
                            // 超時直接返回null
                            return null;
                     // 沒有設(shè)置超時
                    } else if (casHead(h, s = snode(s, e, h, mode))) { // 將h設(shè)為自己的next節(jié)點
                        // 空旋或者阻塞直到s結(jié)點被FulFill操作所匹配
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {                // wait was cancelled 節(jié)點被取消了
                            clean(s);
                            return null;
                        }
                        // 找到匹配的線程了
                        // h == head 可能已經(jīng)已經(jīng)被匹配
                        // h.next 等于s 不同類型
                        if ((h = head) != null && h.next == s)
                            // 彈出h 和 s
                            casHead(h, s.next);     // help s's fulfiller
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                // 未匹配
                } else if (!isFulfilling(h.mode)) { // try to fulfill // 嘗試匹配節(jié)點
                    if (h.isCancelled())            // already cancelled // 節(jié)點被取消
                        casHead(h, h.next);         // pop and retry // 修改頭結(jié)點
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            // 沒有下一個節(jié)點了,結(jié)束這次循環(huán),走最外層循環(huán)重新開始
                            if (m == null) {        // all waiters are gone // m等于null
                                casHead(s, null);   // pop fulfill node // cas 設(shè)置head
                                s = null;           // use new node next time
                                break;              // restart main loop // 結(jié)束循環(huán)
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {     // 嘗試匹配,成功
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match // 失敗,說明m背的線程匹配了,或者取消了
                                s.casNext(m, mn);   // help unlink //  修改next節(jié)點
                        }
                    }
                } else {                            // help a fulfiller 正在匹配
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone 匹配完成了
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }
awaitFulfill 方法
    SNode awaitFulfill(SNode s, boolean timed, long nanos) {

            // 計算時間
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            // 獲取當前線程
            Thread w = Thread.currentThread();
            // shouldSpin控制自旋
            // shouldSpin 用于檢測當前節(jié)點是否需要自旋
            // 如果棧為空、該節(jié)點是首節(jié)點或者該節(jié)點是匹配節(jié)點,則先采用自旋,否則阻塞
            int spins = (shouldSpin(s) ?
                    (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) { // 死循環(huán)
                if (w.isInterrupted()) // 線程被中斷
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)  // 存在匹配節(jié)點 ,返回
                    return m;
                if (timed) { // 存在超時設(shè)置
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel();
                        continue;
                    }
                }
                // 自旋;每次自旋的時候都需要檢查自身是否滿足自旋條件,滿足就 - 1,否則為0
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;
                // 設(shè)置node的線程
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                // 如果不是超時,就阻塞
                else if (!timed)
                    LockSupport.park(this);
                // 設(shè)置超時阻塞
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }
clean 方法
    void clean(SNode s) {
            s.item = null;   // forget item
            s.waiter = null; // forget thread

            // next節(jié)點
            SNode past = s.next;
            // next節(jié)點也被中斷了,直接刪除
            if (past != null && past.isCancelled())
                past = past.next;

            // Absorb cancelled nodes at head
            // 從棧頂開始找,清除取消的節(jié)點
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);

            // Unsplice embedded nodes
            // 從有效的頭節(jié)點開始p ,到s的后繼節(jié)點,繼續(xù)清除
            while (p != null && p != past) {
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);
                else
                    p = n;
            }
        }

分析transfer 方法:

  • 我們可以發(fā)現(xiàn)transfer 是通過e是空來判斷是offer方法還是poll方法的,也就是入隊者和出對者的區(qū)分。

  • 第一種情況,如果隊列為空head,或者隊列存在的head節(jié)點和自己的模式相同,首先判斷有沒有超時或者取消,有就執(zhí)行這些操作,沒有就執(zhí)行入隊操作,然后把新加入的節(jié)點加入棧頂,然后調(diào)用awaitFulfill方法阻塞線程,直到被中斷,超時或者匹配成功,為什么要阻塞了因為大家的模式都相同沒法匹配,所以只能阻塞線程,直到一個不同模式的線程匹配成功,喚醒自己,這也是awaitFulfill方法的結(jié)束流程。

  • 第二種情況,如果入隊的模式不同,通過isFulfilling方法判斷head節(jié)點有沒有在匹配,沒有就執(zhí)行匹配流程,

    首先判斷節(jié)點是否被取消了,沒有在判斷自己是不是唯一的一個節(jié)點,如果是循環(huán),重新開始流程,如果不是上面的這些情況,就可以開始匹配節(jié)點了,調(diào)用tryMatch方法,成功喚醒另一個節(jié)點,然后一起出棧,返回結(jié)果,失敗就向后推進,找下一個節(jié)點,這里可能別的線程會競爭的匹配。

  • 第三種情況,入隊的模式不同,但是head節(jié)點正在匹配,那就幫助它匹配完成,然后重新走整個循環(huán)。
TransferQueue 公平常用方法

入隊和出隊的方法是一樣的,我們主要看下transfer 方法吧。

transfer 方法
    E transfer(E e, boolean timed, long nanos) {

            QNode s = null; // constructed/reused as needed
            // 判斷是生產(chǎn)者 還是消費者
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                // 沒有初始化
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin
                // h==t 剛剛初始化 t.isData == isData,尾節(jié)點和當前節(jié)點的類型一樣
                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next; // 獲取尾節(jié)點
                    if (t != tail)      // 尾節(jié)點發(fā)生變了變化             // inconsistent read
                        continue;
                    if (tn != null) {   // 重設(shè)為節(jié)點             // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)  // 超時了       // can't wait
                        return null;
                    if (s == null) // 構(gòu)建新節(jié)點
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))       // failed to link in 有競爭
                        continue;

                    // 替換尾節(jié)點
                    advanceTail(t, s);              // swing tail and wait
                    // 自旋/阻塞 返回的是中斷取消/匹配的節(jié)點
                    Object x = awaitFulfill(s, e, timed, nanos);
                    // 中斷
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    // 匹配成功了,需要執(zhí)行出隊操作
                    if (!s.isOffList()) {
                        // not already unlinked
                        // 修改頭結(jié)點
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                            // complementary-mode
                    // 出隊從頭元素開始
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head) // 隊列發(fā)生變化重來
                        continue;                   // inconsistent read

                    Object x = m.item;
                    // isData == (x != null) 判斷模式是否相同,不相同才能匹配
                    // x == m 說明已經(jīng)被中斷或者超時了
                    // m.casItem(x, e) 匹配
                    if (isData == (x != null) ||    // m already fulfilled
                            x == m ||                   // m cancelled
                            !m.casItem(x, e)) {// lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }
                    // 匹配成功
                    // 替換頭
                    advanceHead(h, m);              // successfully fulfilled
                    // 喚醒等待線程
                    LockSupport.unpark(m.waiter); // 喚醒線程
                    return (x != null) ? (E)x : e;
                }
            }
        }

分析transfer方法:

  • 也是通過e來判斷是入隊還是出隊的,都是調(diào)用transfer方法,transfer方法可以看出兩部分,入隊和匹配。
  • 第一部分入隊,入隊的模式也是相同的,入隊是從尾節(jié)點開始,獲取尾節(jié)點,判斷尾節(jié)點有沒有發(fā)生變化,可能存在多線程的情況,發(fā)生改變就重新遍歷,沒有就判斷尾節(jié)點有沒有next節(jié)點,有就說明別的線程添加了新的節(jié)點,需要更新尾節(jié)點,然后構(gòu)造新的節(jié)點加入當前尾節(jié)點的next節(jié)點,更新尾節(jié)點,然后調(diào)用awaitFulfill阻塞當前節(jié)點,直到中斷,超時或者匹配,然后清除匹配成功的節(jié)點,調(diào)用clean方法。
  • 第二部分匹配(出隊),出隊是從頭節(jié)點開始,然后判斷模式是否不同,是否被取消,cas設(shè)置item,其實也就是數(shù)據(jù)的傳遞,如果匹配成功,喚醒等待在m的線程,這里注意把m設(shè)置成了頭結(jié)點,其實就是把m節(jié)點彈出了,因為我們匹配取得頭結(jié)點的next節(jié)點。
五,總結(jié)

SynchronousQueue 的實現(xiàn)還是很復(fù)雜的,我們可以發(fā)現(xiàn)雖然是個阻塞隊列,可是沒有使用鎖;這個隊列適合傳遞的場景,隊列沒有存儲元素的隊列,出隊和入隊必須結(jié)伴而行。

參考 《Java 并發(fā)編程的藝術(shù)》

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI