溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Java多線程Queue、BlockingQueue和使用BlockingQueue實現(xiàn)生產(chǎn)消費者模型方法解析

發(fā)布時間:2020-09-26 03:03:29 來源:腳本之家 閱讀:164 作者:mengwei 欄目:編程語言

Queue是什么

隊列,是一種數(shù)據(jù)結(jié)構(gòu)。除了優(yōu)先級隊列和LIFO隊列外,隊列都是以FIFO(先進先出)的方式對各個元素進行排序的。無論使用哪種排序方式,隊列的頭都是調(diào)用remove()或poll()移除元素的。在FIFO隊列中,所有新元素都插入隊列的末尾。

Queue中的方法

Queue中的方法不難理解,6個,每2對是一個也就是總共3對。看一下JDKAPI就知道了:

Java多線程Queue、BlockingQueue和使用BlockingQueue實現(xiàn)生產(chǎn)消費者模型方法解析

注意一點就好,Queue通常不允許插入Null,盡管某些實現(xiàn)(比如LinkedList)是允許的,但是也不建議。

BlockingQueue

1、BlockingQueue概述

BlockingQueue也是java.util.concurrent下的主要用來控制線程同步的工具。

BlockingQueue有四個具體的實現(xiàn)類,根據(jù)不同需求,選擇不同的實現(xiàn)類

1、ArrayBlockingQueue:一個由數(shù)組支持的有界阻塞隊列,規(guī)定大小的BlockingQueue,其構(gòu)造函數(shù)必須帶一個int參數(shù)來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的。

2、LinkedBlockingQueue:大小不定的BlockingQueue,若其構(gòu)造函數(shù)帶一個規(guī)定大小的參數(shù),生成的BlockingQueue有大小限制,若不帶大小參數(shù),所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序的。

3、PriorityBlockingQueue:類似于LinkedBlockQueue,但其所含對象的排序不是FIFO,而是依據(jù)對象的自然排序順序或者是構(gòu)造函數(shù)的Comparator決定的順序。

4、SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的。

LinkedBlockingQueue可以指定容量,也可以不指定,不指定的話,默認最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。

講BlockingQueue,因為BlockingQueue是Queue中的一個重點,并且通過BlockingQueue我們再次加深對于生產(chǎn)者/消費者模型的理解。其他的Queue都不難,通過查看JDKAPI和簡單閱讀源碼完全可以理解他們的作用。

BlockingQueue,顧名思義,阻塞隊列。BlockingQueue是在java.util.concurrent下的,因此不難理解,BlockingQueue是為了解決多線程中數(shù)據(jù)高效安全傳輸而提出的。

多線程中,很多場景都可以使用隊列實現(xiàn),比如經(jīng)典的生產(chǎn)者/消費者模型,通過隊列可以便利地實現(xiàn)兩者之間數(shù)據(jù)的共享,定義一個生產(chǎn)者線程,定義一個消費者線程,通過隊列共享數(shù)據(jù)就可以了。

當然現(xiàn)實不可能都是理想的,比如消費者消費速度比生產(chǎn)者生產(chǎn)的速度要快,那么消費者消費到一定程度上的時候,必須要暫停等待一下了(使消費者線程處于WAITING狀態(tài))。BlockingQueue的提出,就是為了解決這個問題的,他不用程序員去控制這些細節(jié),同時還要兼顧效率和線程安全。

阻塞隊列所謂的"阻塞",指的是某些情況下線程會掛起(即阻塞),一旦條件滿足,被掛起的線程又會自動喚醒。使用BlockingQueue,不需要關(guān)心什么時候需要阻塞線程,什么時候需要喚醒線程,這些內(nèi)容BlockingQueue都已經(jīng)做好了

2、BlockingQueue中的方法

BlockingQueue既然是Queue的子接口,必然有Queue中的方法,上面已經(jīng)列了??匆幌翨lockingQueue中特有的方法:

(1)voidput(Ee)throwsInterruptedException

把e添加進BlockingQueue中,如果BlockingQueue中沒有空間,則調(diào)用線程被阻塞,進入等待狀態(tài),直到BlockingQueue中有空間再繼續(xù)

(2)voidtake()throwsInterruptedException

取走BlockingQueue里面排在首位的對象,如果BlockingQueue為空,則調(diào)用線程被阻塞,進入等待狀態(tài),直到BlockingQueue有新的數(shù)據(jù)被加入

(3)intdrainTo(Collection<?superE>c,intmaxElements)

一次性取走BlockingQueue中的數(shù)據(jù)到c中,可以指定取的個數(shù)。通過該方法可以提升獲取數(shù)據(jù)效率,不需要多次分批加鎖或釋放鎖

3、ArrayBlockingQueue

基于數(shù)組的阻塞隊列,必須指定隊列大小。比較簡單。ArrayBlockingQueue中只有一個ReentrantLock對象,這意味著生產(chǎn)者和消費者無法并行運行(見下面的代碼)。另外,創(chuàng)建ArrayBlockingQueue時,可以指定ReentrantLock是否為公平鎖,默認采用非公平鎖。

/** Main lock guarding all access */
private final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

4、LinkedBlockingQueue

基于鏈表的阻塞隊列,和ArrayBlockingQueue差不多。不過LinkedBlockingQueue如果不指定隊列容量大小,會默認一個類似無限大小的容量,之所以說是類似是因為這個無限大小是Integer.MAX_VALUE,這么說就好理解ArrayBlockingQueue為什么必須要制定大小了,如果ArrayBlockingQueue不指定大小的話就用Integer.MAX_VALUE,那將造成大量的空間浪費,但是基于鏈表實現(xiàn)就不一樣的,一個一個節(jié)點連起來而已。另外,LinkedBlockingQueue生產(chǎn)者和消費者都有自己的鎖(見下面的代碼),這意味著生產(chǎn)者和消費者可以"同時"運行。

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

5、SynchronousQueue

比較特殊,一種沒有緩沖的等待隊列。什么叫做沒有緩沖區(qū),ArrayBlocking中有:

/** The queued items */
private final E[] items;

數(shù)組用以存儲隊列。LinkedBlockingQueue中有:

/**
 * Linked list node class
 */
static class Node<E> {
  /** The item, volatile to ensure barrier separating write and read */
  volatile E item;
  Node<E> next;
  Node(E x) { item = x; }
}

將隊列以鏈表形式連接。

生產(chǎn)者/消費者操作數(shù)據(jù)實際上都是通過這兩個"中介"來操作數(shù)據(jù)的,但是SynchronousQueue則是生產(chǎn)者直接把數(shù)據(jù)給消費者(消費者直接從生產(chǎn)者這里拿數(shù)據(jù)),好像又回到了沒有生產(chǎn)者/消費者模型的老辦法了。換句話說,每一個插入操作必須等待一個線程對應(yīng)的移除操作。SynchronousQueue又有兩種模式:

1、公平模式

采用公平鎖,并配合一個FIFO隊列(Queue)來管理多余的生產(chǎn)者和消費者

2、非公平模式

采用非公平鎖,并配合一個LIFO棧(Stack)來管理多余的生產(chǎn)者和消費者,這也是SynchronousQueue默認的模式

利用BlockingQueue實現(xiàn)生產(chǎn)者消費者模型

上一篇我們寫的生產(chǎn)者消費者模型有局限,局限體現(xiàn)在:

緩沖區(qū)內(nèi)只能存放一個數(shù)據(jù),實際生產(chǎn)者/消費者模型中的緩沖區(qū)內(nèi)可以存放大量生產(chǎn)者生產(chǎn)出來的數(shù)據(jù)
生產(chǎn)者和消費者處理數(shù)據(jù)的速度幾乎一樣
OK,我們就用BlockingQueue來簡單寫一個例子,并且讓生產(chǎn)者、消費者處理數(shù)據(jù)速度不同。子類選擇的是ArrayBlockingQueue,大小定為10:

public static void main(String[] args)
{
  final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(10);
  Runnable producerRunnable = new Runnable()
  {
    int i = 0;
    public void run()
    {
      while (true)
      {
        try
        {
          System.out.println("我生產(chǎn)了一個" + i++);
          bq.put(i + "");
          Thread.sleep(1000);
        } 
        catch (InterruptedException e)
        {
          e.printStackTrace();
        }
      }
    }
  };
  Runnable customerRunnable = new Runnable()
  {
    public void run()
    {
      while (true)
      {
        try
        {
          System.out.println("我消費了一個" + bq.take());
          Thread.sleep(3000);
        } 
        catch (InterruptedException e)
        {
          e.printStackTrace();
        }
      }
    }
  };
  Thread producerThread = new Thread(producerRunnable);
  Thread customerThread = new Thread(customerRunnable);
  producerThread.start();
  customerThread.start();
}

代碼的做法是讓生產(chǎn)者生產(chǎn)速度快于消費者消費速度的,看一下運行結(jié)果:

我生產(chǎn)了一個0
我消費了一個1
我生產(chǎn)了一個1
我生產(chǎn)了一個2
我消費了一個2
我生產(chǎn)了一個3
我生產(chǎn)了一個4
我生產(chǎn)了一個5
我消費了一個3
我生產(chǎn)了一個6
我生產(chǎn)了一個7
我生產(chǎn)了一個8
我消費了一個4
我生產(chǎn)了一個9
我生產(chǎn)了一個10
我生產(chǎn)了一個11
我消費了一個5
我生產(chǎn)了一個12
我生產(chǎn)了一個13
我生產(chǎn)了一個14
我消費了一個6
我生產(chǎn)了一個15
我生產(chǎn)了一個16
我消費了一個7
我生產(chǎn)了一個17
我消費了一個8
我生產(chǎn)了一個18

分兩部分來看輸出結(jié)果:

1、第1行~第23行。這塊BlockingQueue未滿,所以生產(chǎn)者隨便生產(chǎn),消費者隨便消費,基本上都是生產(chǎn)3個消費1個,消費者消費速度慢

2、第24行~第27行,從前面我們可以看出,生產(chǎn)到16,消費到6,說明到了ArrayBlockingQueue的極限10了,這時候沒辦法,生產(chǎn)者生產(chǎn)一個ArrayBlockingQueue就滿了,所以不能繼續(xù)生產(chǎn)了,只有等到消費者消費完才可以繼續(xù)生產(chǎn)。所以之后的打印內(nèi)容一定是一個生產(chǎn)者、一個消費者

這就是前面一章開頭說的"通過平衡生產(chǎn)者和消費者的處理能力來提高整體處理數(shù)據(jù)的速度",這給例子應(yīng)該體現(xiàn)得很明顯。另外,也不要擔心非單一生產(chǎn)者/消費者場景下的系統(tǒng)假死問題,緩沖區(qū)空、緩沖區(qū)滿的場景BlockingQueue都是定義了不同的Condition,所以不會喚醒自己的同類。

總結(jié)

以上就是本文關(guān)于Java多線程Queue、BlockingQueue和使用BlockingQueue實現(xiàn)生產(chǎn)消費者模型方法解析的全部內(nèi)容,希望對大家有所幫助。感興趣的朋友可以參閱本站:

淺談Java多線程的優(yōu)點及代碼示例

淺談Java多線程處理中Future的妙用(附源碼)

Java利用future及時獲取多線程運行結(jié)果

如有不足之處,歡迎留言指出。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI