溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

LinkedTransferQueue 1.8 源碼解析

發(fā)布時(shí)間:2020-07-28 03:25:11 來(lái)源:網(wǎng)絡(luò) 閱讀:954 作者:wx5c78c8b1dbb1b 欄目:編程語(yǔ)言

[TOC]

LinkedTransferQueue 1.8 源碼解析

一,簡(jiǎn)介

LinkedTransferQueue 是一個(gè)由鏈表結(jié)構(gòu)組成的wujie阻塞傳輸隊(duì)列,它是一個(gè)很多隊(duì)列的結(jié)合體(ConcurrentLinkedQueue,LinkedBlockingQueue,SynchronousQueue),在除了有基本阻塞隊(duì)列的功能(但是這個(gè)阻塞隊(duì)列沒(méi)有使用鎖)之外;隊(duì)列實(shí)現(xiàn)了TransferQueue接口重寫(xiě)了tryTransfer和transfer方法,這組方法和SynchronousQueue公平模式的隊(duì)列類(lèi)似,具有匹配的功能。

二,UML圖

LinkedTransferQueue 1.8 源碼解析

三,基本成員
    // 是否是多核
    private static final boolean MP =
            Runtime.getRuntime().availableProcessors() > 1;

     // 自旋次數(shù)
    private static final int FRONT_SPINS   = 1 << 7;

     // 前驅(qū)節(jié)點(diǎn)正在處理,當(dāng)前節(jié)點(diǎn)需要自旋的次數(shù)
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    // 容忍清除節(jié)點(diǎn)失敗次數(shù)的閾值
    static final int SWEEP_THRESHOLD = 32;

    static final class Node {
        // 表示存放數(shù)據(jù)還是獲取數(shù)據(jù)
        final boolean isData;   // false if this is a request node
        // 存放數(shù)據(jù)是item有值
        volatile Object item;   // initially non-null if isData; CASed to match
        // next節(jié)點(diǎn)
        volatile Node next;
        // 等待線程
        volatile Thread waiter;

        // 構(gòu)造
        Node(Object item, boolean isData) {
            UNSAFE.putObject(this, itemOffset, item); // relaxed write
            this.isData = isData;
        }
    }

    // 頭結(jié)點(diǎn)
    transient volatile Node head;

    // 尾節(jié)點(diǎn)
    private transient volatile Node tail;

     // xfer方法的入?yún)? 不同類(lèi)型的方法內(nèi)部調(diào)用xfer方法時(shí)入?yún)⒉煌?    private static final int NOW   = 0; // for untimed poll, tryTransfer
    private static final int ASYNC = 1; // for offer, put, add
    private static final int SYNC  = 2; // for transfer, take
    private static final int TIMED = 3; // for timed poll, tryTransfer

注意:xfer 者幾個(gè)參數(shù)很重要。

NOW: 表示的是立即,不需要等待的意思,用于poll和tryTransfer方法,poll 隊(duì)列為空返回,tryTransfer隊(duì)列沒(méi)有消費(fèi)者,直接返回,都是不等待的。

ASYNC:異步,offer, put, add等入隊(duì)方法,由于是×××隊(duì)列,所以不會(huì)阻塞。

SYNC:同步表示會(huì)阻塞,take一個(gè)元素,沒(méi)有就會(huì)阻塞,transfer傳輸,必須等待消費(fèi)者來(lái)消費(fèi)。

TIMED: 帶超時(shí)時(shí)間的now,會(huì)等待一定的時(shí)間后返回。

四,常用方法
構(gòu)造方法
    public LinkedTransferQueue() {
    }
NOW 相關(guān)方法
poll 方法
    // 隊(duì)尾彈出一個(gè)元素,沒(méi)有就返回null
    public E poll() {
        return xfer(null, false, NOW, 0);
    }
tryTransfer 方法
    // 立即轉(zhuǎn)交一個(gè)元素給消費(fèi)者,如果此時(shí)隊(duì)列沒(méi)有消費(fèi)者,那就false
    public boolean tryTransfer(E e) {
        return xfer(e, true, NOW, 0) == null;
    }
ASYNC 相關(guān)方法
offer方法
    public boolean offer(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }
put 方法
    public void put(E e) {
        xfer(e, true, ASYNC, 0);
    }
add 方法
    public boolean add(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }
SYNC 相關(guān)方法
transfer 方法
    // 轉(zhuǎn)交一個(gè)元素給消費(fèi)者,如果此時(shí)隊(duì)列沒(méi)有消費(fèi)者,那就阻塞
    public void transfer(E e) throws InterruptedException {
        if (xfer(e, true, SYNC, 0) != null) {
            // 清除方法
            Thread.interrupted(); // failure possible only due to interrupt
            throw new InterruptedException();
        }
    }
take 方法
    public E take() throws InterruptedException {
        E e = xfer(null, false, SYNC, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
TIMED 相關(guān)方法
poll(long timeout, TimeUnit unit) 和 tryTransfer(E e, long timeout, TimeUnit unit)方法
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return e;
        throw new InterruptedException();
    }

    public boolean tryTransfer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }

我們可以看見(jiàn)上面所有的方法都是調(diào)用的xfer方法,下面我們來(lái)詳解下這個(gè)方法。

核心方法 xfer
    private E xfer(E e, boolean haveData, int how, long nanos) {
        // 插入元素,
        if (haveData && (e == null))
            throw new NullPointerException();
        Node s = null;                        // the node to append, if needed

        retry:
        for (;;) {   // 死循環(huán)                          // restart on append race

            // 從頭結(jié)點(diǎn)開(kāi)始匹配
            for (Node h = head, p = h; p != null;) { // find & match first node
                boolean isData = p.isData;   // 獲取節(jié)點(diǎn)的類(lèi)型
                Object item = p.item;       // item 的值
                // 兩種情況 1.put節(jié)點(diǎn) item != null  isData 為true 2.take item = null false isData false
                // 或者節(jié)點(diǎn)已經(jīng)被匹配了
                if (item != p && (item != null) == isData) { // unmatched // 節(jié)點(diǎn)沒(méi)有被匹配過(guò)
                    if (isData == haveData)   // can't match // 類(lèi)型一致,只能執(zhí)行入隊(duì)操作
                        break;
                    if (p.casItem(item, e)) { // match  匹配,可能存在多線程競(jìng)爭(zhēng)匹配
                        for (Node q = p; q != h;) { // 不是頭節(jié)點(diǎn)了,頭結(jié)點(diǎn)發(fā)生了改變,被匹配了,自己也匹配了,
                            // 下一個(gè)節(jié)點(diǎn)
                            Node n = q.next;  // update by 2 unless singleton
                            if (head == h && casHead(h, n == null ? q : n)) {
                                // 自關(guān)聯(lián) 節(jié)點(diǎn)不要了
                                h.forgetNext();
                                break;
                            }                 // advance and retry
                            // head 已經(jīng)被更新過(guò),或者更新head失敗,需要重新判斷
                            // h = head   == null,隊(duì)列為空
                            // (q = h.next) == null 最后一個(gè)節(jié)點(diǎn)
                            // 頭接單的下一個(gè)節(jié)點(diǎn)有沒(méi)有被匹配
                            // 說(shuō)明值有頭結(jié)點(diǎn)匹配了,頭結(jié)點(diǎn)的next節(jié)點(diǎn)也匹配了,才要更新頭結(jié)點(diǎn),優(yōu)化手段
                            if ((h = head)   == null ||
                                    (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2
                        }
                        // 匹配成功
                        LockSupport.unpark(p.waiter);
                        return LinkedTransferQueue.<E>cast(item);
                    }
                }
                // 已經(jīng)匹配就往下走
                Node n = p.next;
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }

         /*   // xfer方法的入?yún)? 不同類(lèi)型的方法內(nèi)部調(diào)用xfer方法時(shí)入?yún)⒉煌?            private static final int NOW   = 0; // for untimed poll, tryTransfer
            private static final int ASYNC = 1; // for offer, put, add
            private static final int SYNC  = 2; // for transfer, take
            private static final int TIMED = 3; // for timed poll, tryTransfer*/

            // 模式不同只能入隊(duì)啦
            if (how != NOW) {                 // No matches available
                if (s == null)
                    // 創(chuàng)建一個(gè)新節(jié)點(diǎn)
                    s = new Node(e, haveData);
                // tryAppend 給tail追加節(jié)點(diǎn)
                Node pred = tryAppend(s, haveData);
                // 不能添加到這個(gè)節(jié)點(diǎn) ,重新循環(huán)
                if (pred == null)
                    continue retry;
                // lost race vs opposite mode
                // ASYNC 添加成功返回了
                // SYNC  TIMED 需要阻塞線程
                if (how != ASYNC)
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            // now 是立即返回
            return e; // not waiting
        }
    }

分析:

  • 從頭節(jié)點(diǎn)開(kāi)始匹配,判斷頭節(jié)點(diǎn)有沒(méi)有被匹配,或者頭節(jié)點(diǎn)的模式和入隊(duì)節(jié)點(diǎn)的模式是否相同。
  • 如果模式相同或者已經(jīng)被匹配了,就去走入隊(duì)或者出隊(duì)流程。
  • 如果模式不同,就可以匹配了,casItem設(shè)置item,完成數(shù)據(jù)的傳遞,然后判斷q != h,q發(fā)生變化說(shuō)明頭結(jié)點(diǎn)被別的線程匹配了,這里可能多個(gè)線程來(lái)匹配,所以頭節(jié)點(diǎn)是可能發(fā)生變化的,我們不是每一次都更新頭節(jié)點(diǎn),而是當(dāng)頭節(jié)點(diǎn)被匹配,頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)也被匹配才會(huì)更新頭節(jié)點(diǎn),這是一種優(yōu)化手段;當(dāng)我們匹配成功了,喚醒匹配的節(jié)點(diǎn)LockSupport.unpark(p.waiter),然后返回。
  • 我們?cè)賮?lái)看模式不同或者隊(duì)列為空時(shí),我們需要做的就是入隊(duì)操作,第一步判斷how 不是NOW,NOW對(duì)應(yīng)的方法是polltryTransfer ,是不會(huì)等待的,也不會(huì)入隊(duì)的,所以直接返回;接下來(lái)的幾種狀態(tài)都是要入隊(duì)的,所以創(chuàng)建一個(gè)s = new Node(e, haveData),然后調(diào)用tryAppend方法入隊(duì)追加到隊(duì)尾,返回前置節(jié)點(diǎn);此時(shí)在判斷how是ASYNC還是SYNCTIMEDASYNC不要等待所以直接返回,SYNC`TIMED是需要等待的,所以調(diào)用awaitMatch方法等待,直到匹配成功或者超時(shí)時(shí)間到了。
tryAppend 方法

入隊(duì)尾

private Node tryAppend(Node s, boolean haveData) {
        for (Node t = tail, p = t;;) {         // move p to last node and append 遍歷
            Node n, u;                        // temps for reads of next & tail
            if (p == null && (p = head) == null) { // 還沒(méi)有節(jié)點(diǎn)
                if (casHead(null, s))
                    return s;                 // initialize
            }
            // 是否符合入隊(duì)要求
            else if (p.cannotPrecede(haveData))
                return null;                  // lost race vs opposite mode
            // p.next 不為null,說(shuō)明p真正的尾節(jié)點(diǎn),p需要向后推進(jìn)
            else if ((n = p.next) != null)    // not last; keep traversing
                p = p != t && t != (u = tail) ? (t = u) : // stale tail
                        (p != n) ? n : null;      // restart if off list
            // p.next = null,說(shuō)明找到最后一個(gè)節(jié)點(diǎn)了,可以入隊(duì)了
            // 可能存在競(jìng)爭(zhēng),失敗,就繼續(xù)下一個(gè)節(jié)點(diǎn)
            else if (!p.casNext(null, s))
                p = p.next;                   // re-read on CAS failure
            else {
                // 入隊(duì)成功了
                if (p != t) {    // 說(shuō)明此時(shí)的入隊(duì)節(jié)點(diǎn)的前節(jié)點(diǎn)p和尾節(jié)點(diǎn)有距離 是否需要更新尾節(jié)點(diǎn)
                                // update if slack now >= 2
                    while ((tail != t || !casTail(t, s)) &&
                            (t = tail)   != null &&
                            (s = t.next) != null && // advance and retry
                            (s = s.next) != null && s != t);
                }
                return p;
            }
        }
    }
awaitMatch 方法
 private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        int spins = -1; // initialized after first item and cancel checks
        ThreadLocalRandom randomYields = null; // bound if needed

        for (;;) {
            Object item = s.item;
            // 被匹配過(guò)了
            if (item != e) {                  // matched
                // assert item != s;
                s.forgetContents();           // avoid garbage
                return LinkedTransferQueue.<E>cast(item);
            }
            // 被中斷  超時(shí)時(shí)間到了
            if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                    s.casItem(e, s)) {        // cancel
                unsplice(pred, s);//
                return e;
            }
            // 初始化自旋
            if (spins < 0) {
                // establish spins at/near front
                //初始化自旋次數(shù),即計(jì)算自旋次數(shù)
                if ((spins = spinsFor(pred, s.isData)) > 0)
                    randomYields = ThreadLocalRandom.current();
            }
            // 自旋遞減
            else if (spins > 0) {             // spin
                --spins;
                if (randomYields.nextInt(CHAINED_SPINS) == 0)
                    Thread.yield();           // occasionally yield
            }
            // 自旋次數(shù)到了 就會(huì)阻塞
            // 設(shè)置阻塞線程
            else if (s.waiter == null) {
                s.waiter = w;                 // request unpark then recheck
            }
            // 超時(shí)阻塞
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos > 0L)
                    LockSupport.parkNanos(this, nanos);
            }
            // 阻塞
            else {
                LockSupport.park(this);
            }
        }
    }
五,總結(jié)

LinkedTransferQueue 是很多隊(duì)列的集合體,雖然方法基本一樣,但是實(shí)現(xiàn)卻是大大的不同,我們以前的阻塞隊(duì)列幾乎都是使用鎖來(lái)控制入隊(duì)和出隊(duì)的,LinkedTransferQueue 沒(méi)有使用鎖,入隊(duì)和出隊(duì)都是使用自旋加cas實(shí)現(xiàn)的,比鎖的消耗更低,使用了很多的優(yōu)化(控制自旋次數(shù)等),性能更高;隊(duì)列是wujie的,所以使用時(shí)一定要注意內(nèi)存的問(wèn)題。

參考《Java 并發(fā)編程的藝術(shù)》

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI