溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Java多線程的阻塞隊列怎么實現(xiàn)

發(fā)布時間:2022-01-06 16:38:37 來源:億速云 閱讀:149 作者:iii 欄目:云計算

這篇文章主要講解了“Java多線程的阻塞隊列怎么實現(xiàn)”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Java多線程的阻塞隊列怎么實現(xiàn)”吧!

阻塞隊列是什么?

首先了解隊列,隊列是數(shù)據(jù)先進先出的一種數(shù)據(jù)結(jié)構(gòu)。阻塞隊列,關(guān)鍵字是阻塞,先理解阻塞的含義,在阻塞隊列中,線程阻塞有這樣的兩種情況:

1.當阻塞隊列為空時,獲取隊列元素的線程將等待,直到該則塞隊列非空;2.當阻塞隊列變滿時,使用該阻塞隊列的線程會等待,直到該阻塞隊列變成非滿。

為什么要使用阻塞隊列?

在常見的情況下,生產(chǎn)者消費者模式需要用到隊列,生產(chǎn)者線程生產(chǎn)數(shù)據(jù),放進隊列,然后消費從隊列中獲取數(shù)據(jù),這個在單線程的情況下沒有問題。但是當多線程的情況下,某個特定時間下,(峰值高并發(fā))出現(xiàn)消費者速度遠大于生產(chǎn)者速度,消費者必須阻塞來等待生產(chǎn)者,以保證生產(chǎn)者能夠生產(chǎn)出新的數(shù)據(jù);當生產(chǎn)者速度遠大于消費者速度時,同樣也是一個道理。這些情況都要程序員自己控制阻塞,同時又要線程安全和運行效率。

阻塞隊列的出現(xiàn)使得程序員不需要關(guān)心這些細節(jié),比如什么時候阻塞線程,什么時候喚醒線程,這些都由阻塞隊列完成了。

阻塞隊列的主要方法

 阻塞隊列的方法,在不能立即滿足但可能在將來某一時刻滿足的情況下,按處理方式可以分為三類:

Java多線程的阻塞隊列怎么實現(xiàn)

拋出異常:拋出一個異常;

特殊值:返回一個特殊值(null或false,視情況而定)

則塞:在成功操作之前,一直阻塞線程

超時:放棄前只在最大的時間內(nèi)阻塞

工欲善其事必先利其器,學(xué)會用阻塞隊列,必須要知道它有哪些方法,怎么用,有哪些注意事項,這樣到真正使用的時候,就能少踩雷了。

首先介紹插入操作:

1.public abstract boolean add(E paramE);

 將指定元素插入此隊列中(如果立即可行且不會違反容量限制),成功時返回 true,如果當前沒有可用的空間,則拋出 IllegalStateException。

如果該元素是NULL,則會拋出NullPointerException異常。

2.public abstract boolean offer(E paramE);

將指定元素插入此隊列中(如果立即可行且不會違反容量限制),成功時返回 true,如果當前沒有可用的空間,則返回 false。

3.public abstract void put(E paramE) throws InterruptedException;

 將指定元素插入此隊列中,將等待可用的空間(如果有必要)

4.offer(E o, long timeout, TimeUnit unit)

可以設(shè)定等待的時間,如果在指定的時間內(nèi),還不能往隊列中加入BlockingQueue,則返回失敗。

獲取數(shù)據(jù)操作:

1.poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數(shù)規(guī)定的時間,取不到時返回null;

2.poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間內(nèi),隊列一旦有數(shù)據(jù)可取,則立即返回隊列中的數(shù)據(jù)。否則知道時間

超時還沒有數(shù)據(jù)可取,返回失敗。

3.take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態(tài)直到BlockingQueue有新的數(shù)據(jù)被加入; 

4.drainTo():一次性從BlockingQueue獲取所有可用的數(shù)據(jù)對象(還可以指定獲取數(shù)據(jù)的個數(shù)),通過該方法,可以提升獲取數(shù)據(jù)效率;不需要多次分批加鎖或釋放鎖。

重點方法重點介紹

首先來看put方法

public void put(E paramE) throws InterruptedException {
        checkNotNull(paramE);
        ReentrantLock localReentrantLock = this.lock;
        localReentrantLock.lockInterruptibly();
        try {
            while (this.count == this.items.length)
                this.notFull.await();
            enqueue(paramE);
            localReentrantLock.unlock();
        } finally {
            localReentrantLock.unlock();
        }
    }

一行一行來看代碼,首先進行空校驗。checkNotNull(paramE);

private static void checkNotNull(Object paramObject) {
        if (paramObject != null)
            return;
        throw new NullPointerException();
    }

這是一個私有方法,需要注意的就是如果put的參數(shù)為空,則拋出空指針異常。(這個很值得我們學(xué)習(xí),先進行空校驗,在維護的時候就很容易定位錯誤),接著 ReentrantLock localReentrantLock = this.lock;實例化鎖,這個ReentrantLock 在我之前的博客中也介紹過,可以共同探討一下。

下一行l(wèi)ocalReentrantLock.lockInterruptibly();這里特別強調(diào)一下:

lockInterruptibly()允許在等待時由其他線程的Thread.interrupt()方法來中斷等待線程而直接返回,這時是不用獲取鎖的,而會拋出一個InterruptException。而ReentrantLock.lock()方法則不允許Thread.interrupt()中斷,即使檢測到了Thread.interruptted一樣會繼續(xù)嘗試獲取鎖,失敗則繼續(xù)休眠。只是在最后獲取鎖成功之后在把當前線程置為interrupted狀態(tài)。

注意這里已經(jīng)鎖住,每次進行此操作時時候只有一個線程,回到代碼中,接著進行

while (this.count == this.items.length)
          this.notFull.await();

這里向我們說明了一個信息,當隊列滿的時候,將會等待。這里使用了private final Condition notFull;這個實例化的Condition,這個Condition用來控制隊列滿的等待。

 接著執(zhí)行了enqueue(paramE)方法,進入這個方法來繼續(xù)看

private void enqueue(E paramE) {
        Object[] arrayOfObject = this.items;
        arrayOfObject[this.putIndex] = paramE;
        if (++this.putIndex == arrayOfObject.length)
            this.putIndex = 0;
        this.count += 1;
        this.notEmpty.signal();
    }

來看第一行,Object[] arrayOfObject = this.items;這個items是在構(gòu)造器時候?qū)嵗?,final Object[] items = new Object[paramInt];將item賦值到arrayObject中

繼續(xù) arrayOfObject[this.putIndex] = paramE;將put方法傳入的參數(shù)賦值到arrayOfObject中,這里其實是items也改變了,因為java是值引用的緣故。

if (++this.putIndex == arrayOfObject.length)
            this.putIndex = 0;

如果這個偏移值+1之后等于數(shù)組的長度,那么偏移值變?yōu)?。this.count += 1;count值加1;這個count代表數(shù)組的總數(shù)。this.notEmpty.signal();喚醒被Condition notEmpty阻塞的方法,最后 localReentrantLock.unlock();解鎖(這個操作不能夠忘了)

這里不禁要問,是什么方法被阻塞了呢?帶著這個疑問來看take方法。

public E take() throws InterruptedException {
        ReentrantLock localReentrantLock = this.lock;
        localReentrantLock.lockInterruptibly();
        try {
            while (this.count == 0)
                this.notEmpty.await();
            Object localObject1 = dequeue();
            return localObject1;
        } finally {
            localReentrantLock.unlock();
        }
    }

 首先看前兩行,和put方法一樣先上鎖,使得每次持有本段代碼的時候只有一個線程。

while (this.count == 0)
   this.notEmpty.await();

當數(shù)組的數(shù)量為空時,也就是無任何數(shù)據(jù)供區(qū)出來的時候,notEmpty這個Condition就會進行阻塞,知道被notEmpty喚醒,還記得上文提到的嗎。就是在put方法中喚醒的,這里可以發(fā)現(xiàn),只要成功進行一個put操作,就會喚醒一次。

 繼續(xù)看代碼,接著執(zhí)行Object localObject1 = dequeue();獲取元素,跟進dequeue()方法繼續(xù):

private E dequeue() {
        Object[] arrayOfObject = this.items;
        Object localObject = arrayOfObject[this.takeIndex];
        arrayOfObject[this.takeIndex] = null;
        if (++this.takeIndex == arrayOfObject.length)
            this.takeIndex = 0;
        this.count -= 1;
        if (this.itrs != null)
            this.itrs.elementDequeued();
        this.notFull.signal();
        return localObject;
    }

Object[] arrayOfObject = this.items;進行值傳遞操作,takeIndex是取元素的時候的偏移值,由此可見,put和take操作的偏移量分別是由putIndex和takeIndex控制的。

Object localObject = arrayOfObject[this.takeIndex];取出在數(shù)組中的數(shù)據(jù),然后 arrayOfObject[this.takeIndex] = null;將原來位置的數(shù)據(jù)b變成null.

if (++this.takeIndex == arrayOfObject.length)
            this.takeIndex = 0;

如果當前的++takeIndex等于該數(shù)組的長度,則takeIndex賦值0,結(jié)合put方法,這兩個操作是用數(shù)組形成隊列操作。接著喚醒持有notFull這個Condition的線程。

方法就總結(jié)到這里,其實看put和take是有很多相似之處的,繼續(xù)看下一章節(jié)。

常見的阻塞隊列

首先來看這張圖,這個是阻塞隊列的繼承圖(雙端隊列,沒有列出來,沒有太大區(qū)別)

Java多線程的阻塞隊列怎么實現(xiàn)

主要有ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue,DelayQueue這個五個實現(xiàn)類。

在這五個阻塞隊列中,比較常用的是ArrayBlockingQueue,LinkedBlockingQueue,本文也會重點介紹這兩個類。

ArrayBlockingQueue

在上面的源碼分析中就是分析的ArrayBlockingQueue的源碼。數(shù)組阻塞隊列必須傳入的參數(shù)是數(shù)組大小,還可以指定是否公平性。公平性就是當隊列可用時,線程訪問隊列的順序按照它排隊時候的順序,非公平鎖則不按照這樣的順序,但是非公平隊列要比公平隊列執(zhí)行的速度快。

繼續(xù)看ArrayBlockingQueue其實是一個數(shù)組有界隊列,此隊列按照先進先出的原則維護數(shù)組中的元素順序,看源碼可知,是由兩個整形變量(上文提到的putIndex和takeIndex)分別指著頭和尾的位置。

LinkedBlockingQueue

LinkedBlockingQueue是基于鏈表的阻塞隊列,內(nèi)部維持的數(shù)據(jù)緩沖隊列是由鏈表組成的,也是按照先進先出的原則。

如果構(gòu)造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個類似無限大?。↖nteger.Max_VALUE)的容量,這樣的話,如果生產(chǎn)者的速度一旦大于消費者的速度,也許還沒有等到隊列滿阻塞產(chǎn)生,系統(tǒng)內(nèi)存就有可能已經(jīng)被消耗殆盡了。

LinkedBlockingQueue之所以能夠高效的處理并發(fā)數(shù)據(jù),是因為take()方法和put(E param)方法使用了不同的可重入鎖,分別為private final ReentrantLock putLock和private final ReentrantLock takeLock,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費者可以并行地操作隊列中的數(shù)據(jù),以此來提高整個隊列的并發(fā)性能。

兩者對比

1.ArrayBlockingQueue在put,take操作使用了同一個鎖,兩者操作不能同時進行,而LinkedBlockingQueue使用了不同的鎖,put操作和take操作可同時進行。

2.ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在于,前者在插入或刪除元素時不會產(chǎn)生或銷毀任何額外的對象實例,而后者則會生成一個額外的Node對象,這在長時間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的系統(tǒng)中,其對于GC的影響還是存在一定的區(qū)別。

 其他還有優(yōu)先級阻塞隊列:PriorityBlockingQueue,延時隊列:DelayQueue,SynchronousQueue等,因為使用頻率較低,這里就不重點介紹了,有興趣的讀者可以深入研究。

用阻塞隊列實現(xiàn)生產(chǎn)者消費者

模擬洗盤子的經(jīng)歷,洗碗工洗好一個盤子放在工作臺上,然后廚師看到工作臺上有空余的盤子,便使用盤子。寫到代碼里就是洗碗工就是一個生產(chǎn)者線程,廚師就是消費者線程,工作臺就是阻塞隊列。

public class TestBlockingQueue {
    /**
     * 生產(chǎn)和消費業(yè)務(wù)操作
     * 
     * @author tang
     *
     */
    protected class WorkDesk {

        BlockingQueue<String> desk = new LinkedBlockingQueue<String>(10);

        public void washDish() throws InterruptedException {
            desk.put("洗好一個盤子");
        }

        public String useDish() throws InterruptedException {
            return desk.take();
        }
    }

    /**
     * 生產(chǎn)者類
     * 
     * @author tang
     *
     */
    class Producer implements Runnable {

        private String producerName;
        private WorkDesk workDesk;

        public Producer(String producerName, WorkDesk workDesk) {
            this.producerName = producerName;
            this.workDesk = workDesk;
        }

        @Override
        public void run() {
            try {
                for (;;) {
                    System.out.println(producerName + "洗好一個盤子");
                    workDesk.washDish();
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 消費者類
     * 
     * @author tang
     *
     */
    class Consumer implements Runnable {
        private String consumerName;
        private WorkDesk workDesk;

        public Consumer(String consumerName, WorkDesk workDesk) {
            this.consumerName = consumerName;
            this.workDesk = workDesk;
        }

        @Override
        public void run() {
            try {
                for (;;) {
                    System.out.println(consumerName + "使用一個盤子");
                    workDesk.useDish();
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String args[]) throws InterruptedException {
        TestBlockingQueue testQueue = new TestBlockingQueue();
        WorkDesk workDesk = testQueue.new WorkDesk();

        ExecutorService service = Executors.newCachedThreadPool();
        //四個生產(chǎn)者線程
        Producer producer1 = testQueue.new Producer("生產(chǎn)者-1-", workDesk);
        Producer producer2 = testQueue.new Producer("生產(chǎn)者-2-", workDesk);
        Producer producer3 = testQueue.new Producer("生產(chǎn)者-3-", workDesk);
        Producer producer4 = testQueue.new Producer("生產(chǎn)者-4-", workDesk);
        //兩個消費者線程
        Consumer consumer1 = testQueue.new Consumer("消費者-1-", workDesk);
        Consumer consumer2 = testQueue.new Consumer("消費者-2-", workDesk);
        
        service.submit(producer1);
        service.submit(producer2);
        service.submit(producer3);
        service.submit(producer4);
        service.submit(consumer1);
        service.submit(consumer2);

    }

}

查看打印結(jié)果:

Java多線程的阻塞隊列怎么實現(xiàn)

感謝各位的閱讀,以上就是“Java多線程的阻塞隊列怎么實現(xiàn)”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對Java多線程的阻塞隊列怎么實現(xiàn)這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI