溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java阻塞隊(duì)列SynchronousQueue實(shí)例分析

發(fā)布時(shí)間:2021-09-03 21:24:32 來(lái)源:億速云 閱讀:135 作者:chen 欄目:開(kāi)發(fā)技術(shù)

這篇文章主要講解了“Java阻塞隊(duì)列SynchronousQueue實(shí)例分析”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Java阻塞隊(duì)列SynchronousQueue實(shí)例分析”吧!

目錄
  • 分析

其實(shí)SynchronousQueue 是一個(gè)特別有意思的阻塞隊(duì)列,就我個(gè)人理解來(lái)說(shuō),它很重要的特點(diǎn)就是沒(méi)有容量。

直接看一個(gè)例子:

package dongguabai.test.juc.test;

import java.util.concurrent.SynchronousQueue;

/**
 * @author Dongguabai
 * @description
 * @date 2021-09-01 21:52
 */
public class TestSynchronousQueue {

    public static void main(String[] args) {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        boolean add = synchronousQueue.add("1");
        System.out.println(add);
    }
}

代碼很簡(jiǎn)單,就是往 SynchronousQueue 里放了一個(gè)元素,程序卻拋異常了:

Exception in thread "main" java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(AbstractQueue.java:98)
	at dongguabai.test.juc.test.TestSynchronousQueue.main(TestSynchronousQueue.java:14)

而異常原因是隊(duì)列滿了。剛剛使用的是 SynchronousQueue#add 方法,現(xiàn)在來(lái)看看 SynchronousQueue#put 方法:

public static void main(String[] args) throws InterruptedException {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        synchronousQueue.put("1");
        System.out.println("----");
    }

看到 InterruptedException 其實(shí)就能猜出這個(gè)方法肯定會(huì)阻塞當(dāng)前線程。

通過(guò)這兩個(gè)例子,也就解釋了 SynchronousQueue 隊(duì)列是沒(méi)有容量的,也就是說(shuō)在往 SynchronousQueue 中添加元素之前,得先向 SynchronousQueue 中取出元素,這句話聽(tīng)著很別扭,那可以換個(gè)角度猜想其實(shí)現(xiàn)原理,調(diào)用取出方法的時(shí)候設(shè)置了一個(gè)“已經(jīng)有線程在等待取出”的標(biāo)識(shí),線程等待,然后添加元素的時(shí)候,先看這個(gè)標(biāo)識(shí),如果有線程在等待取出,則添加成功,反之則拋出異常或者阻塞。

分析

接下來(lái)從 SynchronousQueue#put 方法開(kāi)始進(jìn)行分析:

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

可以發(fā)現(xiàn)是調(diào)用的 Transferer#transfer 方法,這個(gè) Transferer 是在構(gòu)造 SynchronousQueue 的時(shí)候初始化的:

public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

SynchronousQueue 有兩種模式,公平與非公平,默認(rèn)是非公平,非公平使用的就是 TransferStack,是基于單向鏈表做的:

 static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;
   ...
 }

那么重點(diǎn)就是 SynchronousQueue.TransferStack#transfer 方法了,從方法名都可以看出這是用來(lái)做數(shù)據(jù)交換的,但是這個(gè)方法有好幾十行,里面各種 Node 指針搞來(lái)搞去,這個(gè)地方我覺(jué)得沒(méi)必要過(guò)于糾結(jié)細(xì)節(jié),老規(guī)矩,抓大放小,而且隊(duì)列這種,很方便進(jìn)行 Debug 調(diào)試。

再理一下思路:

  • 今天研究的是阻塞隊(duì)列,關(guān)注阻塞的話,更應(yīng)該關(guān)系的是 takeput 方法;

  • Transferer 是一個(gè)抽象類,只有一個(gè) transfer 方法,即 takeput 共用,那就肯定是基于入?yún)⑦M(jìn)行功能的區(qū)分;

  • takeput 方法底層都調(diào)用的 SynchronousQueue.TransferStack#transfer 方法;

將上面 SynchronousQueue#put 使用的例子修改一下,再加一個(gè)線程take

package dongguabai.test.juc.test;

import java.util.Date;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguabai
 * @description
 * @date 2021-09-01 21:52
 */
public class TestSynchronousQueue {

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        new Thread(()->{
            System.out.println(new Date().toLocaleString()+"::"+Thread.currentThread().getName()+"-put了數(shù)據(jù):"+"1");

            try {
                synchronousQueue.put("1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        System.out.println("----");
        new Thread(()->{
            Object take = null;
            try {
                take = synchronousQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(new Date().toLocaleString()+"::"+Thread.currentThread().getName()+"-take到了數(shù)據(jù):"+take);
        }).start();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("結(jié)束...");
    }
}

整個(gè)程序結(jié)束,并且輸出:

----
2021-9-2 0:58:55::Thread-0-put了數(shù)據(jù):1
2021-9-2 0:58:55::Thread-1-take到了數(shù)據(jù):1
結(jié)束...

也就是說(shuō)當(dāng)一個(gè)線程在 put 的時(shí)候,如果有線程 take ,那么 put 線程可以正常運(yùn)行,不會(huì)被阻塞。

基于這個(gè)例子,再結(jié)合上文的猜想,也就是說(shuō)核心點(diǎn)就是找到 put 的時(shí)候現(xiàn)在已經(jīng)有線程在 take 的標(biāo)識(shí),或者 take 的時(shí)候已經(jīng)有線程在 put,這個(gè)標(biāo)識(shí)不一定是變量,結(jié)合 AQS 的原理來(lái)看,很可能是根據(jù)鏈表中的 Node 進(jìn)行判斷。

接下來(lái)看 SynchronousQueue.put 方法:

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

它底層也是調(diào)用的 SynchronousQueue.TransferStack#transfer 方法,但是傳入?yún)?shù)是當(dāng)前 put 的元素、false 和 0。再回過(guò)頭看 SynchronousQueue.TransferStack#transfer 方法:

E transfer(E e, boolean timed, long nanos) {
            SNode s = null; // constructed/reused as needed
  					//這里的參數(shù)e就是要put的元素,顯然不為null,也就是說(shuō)是DATA模式,根據(jù)注釋,DATA模式就說(shuō)明當(dāng)前線程是producer
            int mode = (e == null) ? REQUEST : DATA;  

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        //因?yàn)榈谝淮蝡ut那么h肯定為null,這里入?yún)imed為false,所以會(huì)到這里,執(zhí)行awaitFulfill方法,根據(jù)名稱可以猜想出是一個(gè)阻塞方法
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                   ....
        }

這里首先會(huì)構(gòu)造一個(gè) SNode,然后執(zhí)行 casHead 函數(shù),其實(shí)最終棧結(jié)構(gòu)就是:

head->put_e

就是 head 會(huì)指向 put 的元素對(duì)應(yīng)的 SNode。

然后會(huì)執(zhí)行 awaitFulfill 方法:

SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)
                    return m;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel();
                        continue;
                    }
                }
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;    //自旋機(jī)制
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed)
                    LockSupport.park(this); //阻塞
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

最終還是會(huì)使用 LockSupport 進(jìn)行阻塞,等待喚醒。

已經(jīng)大致過(guò)了一遍流程了,細(xì)節(jié)方面就不再糾結(jié)了,那么假如再put 一個(gè)元素呢,其實(shí)結(jié)合源碼已經(jīng)可以分析出此時(shí)棧的結(jié)果為:

head-->put_e_1-->put_e

避免分析出錯(cuò),寫個(gè) Debug 的代碼驗(yàn)證一下:

package dongguabai.test.juc.test;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguabai
 * @description
 * @date 2021-09-02 02:15
 */
public class DebugPut2E {

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        new Thread(()-> {
            try {
                synchronousQueue.put("1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(()-> {
            try {
                synchronousQueue.put("2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

SynchronousQueue.TransferStack#awaitFulfill 方法的 LockSupport.park(this); 處打上斷點(diǎn),運(yùn)行上面的代碼,再看看現(xiàn)在的 head

Java阻塞隊(duì)列SynchronousQueue實(shí)例分析

的確與分析的一致。

也就是先進(jìn)后出。再看 take 方法:

public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

調(diào)用的 SynchronousQueue.TransferStack#transfer 方法,但是傳入?yún)?shù)是 nullfalse 和 0。

偷個(gè)懶就不分析源碼了,直接 Debug 走一遍,代碼如下:

package dongguabai.test.juc.test;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguabai
 * @description
 * @date 2021-09-02 02:24
 */
public class DebugTake {

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        new Thread(()-> {
            try {
                synchronousQueue.put("1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread-put-1").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(()-> {
            try {
                synchronousQueue.put("2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread-put-2").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(()->{
            try {
                Object take = synchronousQueue.take();
                System.out.println("======take:"+take);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread-Take").start();
    }
}

SynchronousQueue#take 方法中打上斷點(diǎn),運(yùn)行上面的代碼:

Java阻塞隊(duì)列SynchronousQueue實(shí)例分析

這里的 s 就是 head,m 就是棧頂?shù)脑?,也是最近一?put 的元素。說(shuō)白了 take 就是取的棧頂?shù)脑?,最后再匹配一下,符合條件就直接取出來(lái)。take 之后 head 為:

Java阻塞隊(duì)列SynchronousQueue實(shí)例分析

棧的結(jié)構(gòu)為:

head-->put_e

最后再把整個(gè)流程梳理一遍:

執(zhí)行 put 操作的時(shí)候,每次壓入棧頂;take 的時(shí)候就取棧頂?shù)脑?,即先進(jìn)后出;這也就實(shí)現(xiàn)了非公平;

至于公平模式,結(jié)合 TransferStack 的實(shí)現(xiàn),可以猜測(cè)實(shí)現(xiàn)就是 put 的時(shí)候放入隊(duì)列,take 的時(shí)候從隊(duì)列頭部開(kāi)始取,先進(jìn)先出。

那么這個(gè)隊(duì)列設(shè)計(jì)的優(yōu)勢(shì)使用場(chǎng)景在哪里呢?個(gè)人感覺(jué)它的優(yōu)勢(shì)就是完全不會(huì)產(chǎn)生對(duì)隊(duì)列中數(shù)據(jù)的爭(zhēng)搶,因?yàn)檎f(shuō)白了隊(duì)列是空的,從某種程度上來(lái)說(shuō)消費(fèi)速率是很快的。

至于使用場(chǎng)景,我這邊的確沒(méi)有想到比較好的使用場(chǎng)景。結(jié)合組內(nèi)同學(xué)的使用來(lái)看,他選擇使用這個(gè)隊(duì)列的原因是因?yàn)樗粫?huì)在內(nèi)存中生成任務(wù)隊(duì)列,當(dāng)服務(wù)宕機(jī)后不用擔(dān)心內(nèi)存中任務(wù)的丟失(非優(yōu)雅停機(jī)的情況)。經(jīng)過(guò)討論后發(fā)現(xiàn)即使使用了 SynchronousQueue 也無(wú)法有效的避免任務(wù)丟失,但這的確是一個(gè)思路,沒(méi)準(zhǔn)以后在其他場(chǎng)景中用得上。

感謝各位的閱讀,以上就是“Java阻塞隊(duì)列SynchronousQueue實(shí)例分析”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Java阻塞隊(duì)列SynchronousQueue實(shí)例分析這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI