溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Java多線程 BlockingQueue實現(xiàn)生產(chǎn)者消費者模型詳解

發(fā)布時間:2020-10-04 01:41:14 來源:腳本之家 閱讀:373 作者:Rest探路者 欄目:編程語言

BlockingQueue

BlockingQueue、解決了多線程中,如何高效安全“傳輸”數(shù)據(jù)的問題。程序員無需關(guān)心什么時候阻塞線程,什么時候喚醒線程,該喚醒哪個線程。

方法介紹

Java多線程 BlockingQueue實現(xiàn)生產(chǎn)者消費者模型詳解

BlockingQueue是Queue的子類

void put(E e)

插入指定元素,當(dāng)BlockingQueue為滿,則線程阻塞,進入Waiting狀態(tài),直到BlockingQueue有空閑空間再繼續(xù)。
這里以ArrayBlockingQueue為例進行分析

Java多線程 BlockingQueue實現(xiàn)生產(chǎn)者消費者模型詳解

Java多線程 BlockingQueue實現(xiàn)生產(chǎn)者消費者模型詳解

void take()

隊首出隊,當(dāng)BlockingQueue為空,則線程阻塞,進入Waiting狀態(tài),直到BlockingQueue不為空再繼續(xù)。

Java多線程 BlockingQueue實現(xiàn)生產(chǎn)者消費者模型詳解

Java多線程 BlockingQueue實現(xiàn)生產(chǎn)者消費者模型詳解

int drainTo(Collection<? super E> c)

從隊列中批量取出數(shù)據(jù),并放入到另一個集合中,返回轉(zhuǎn)移數(shù)據(jù)的數(shù)量,只需一次加鎖和解鎖。

BlockingQueue的實現(xiàn)類

ArrayBlockingQueue

  /*
   * Concurrency control uses the classic two-condition algorithm
   * found in any textbook.
   */

  /** Main lock guarding all access */
  final ReentrantLock lock;

  /** Condition for waiting takes */
  private final Condition notEmpty;

  /** Condition for waiting puts */
  private final Condition notFull;

基于數(shù)組實現(xiàn)的BlockingQueue,需要指定隊列容量,可以指定是否為公平鎖;只有一個ReentrantLock,生產(chǎn)者和消費者不能異步執(zhí)行。

LinkedBlockingQueue

  /** Lock held by take, poll, etc */
  private final ReentrantLock takeLock = new ReentrantLock();

  /** Wait queue for waiting takes */
  private final Condition notEmpty = takeLock.newCondition();

  /** Lock held by put, offer, etc */
  private final ReentrantLock putLock = new ReentrantLock();

  /** Wait queue for waiting puts */
  private final Condition notFull = putLock.newCondition();

基于鏈表實現(xiàn)的BlockingQueue,可以指定隊列容量,不指定隊列容量默認為Integer.MAX_VALUE;有兩個ReentrantLock,生產(chǎn)者和消費者可以異步執(zhí)行。

BlockingQueue實現(xiàn)生產(chǎn)者消費者模型

緩沖區(qū)可以存放大量數(shù)據(jù)

生產(chǎn)者和消費者速度各不相同

public class MyThread42 {
  public static void main(String[] args)
  {
    final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(10);
    Runnable producerRunnable = new Runnable()
    {
      int i = 0;
      public void run()
      {
        while (true)
        {
          try
          {
            System.out.println("我生產(chǎn)了一個" + i++);
            bq.put(i + "");
            Thread.sleep(1000);
          }
          catch (InterruptedException e)
          {
            e.printStackTrace();
          }
        }
      }
    };
    Runnable customerRunnable = new Runnable()
    {
      public void run()
      {
        while (true)
        {
          try
          {
            System.out.println("我消費了一個" + bq.take());
            Thread.sleep(3000);
          }
          catch (InterruptedException e)
          {
            e.printStackTrace();
          }
        }
      }
    };
    Thread producerThread = new Thread(producerRunnable);
    Thread customerThread = new Thread(customerRunnable);
    producerThread.start();
    customerThread.start();
  }
}

輸出結(jié)果如下

我生產(chǎn)了一個0
我消費了一個1
我生產(chǎn)了一個1
我生產(chǎn)了一個2
我消費了一個2
我生產(chǎn)了一個3
我生產(chǎn)了一個4
我生產(chǎn)了一個5
我消費了一個3
我生產(chǎn)了一個6
我生產(chǎn)了一個7
我生產(chǎn)了一個8
我消費了一個4
我生產(chǎn)了一個9
我生產(chǎn)了一個10
我生產(chǎn)了一個11
我消費了一個5
我生產(chǎn)了一個12
我生產(chǎn)了一個13
我生產(chǎn)了一個14
我消費了一個6
我生產(chǎn)了一個15
我生產(chǎn)了一個16
我消費了一個7
我生產(chǎn)了一個17
我消費了一個8
我生產(chǎn)了一個18
我消費了一個9
我生產(chǎn)了一個19
我消費了一個10
我生產(chǎn)了一個20
我消費了一個11
我生產(chǎn)了一個21
我消費了一個12
我生產(chǎn)了一個22
我消費了一個13
我生產(chǎn)了一個23
我消費了一個14
我生產(chǎn)了一個24

······

生產(chǎn)者沒有生產(chǎn)到BlockingQueue的容量(極限是10)之前,生產(chǎn)3個,消費1個,再生產(chǎn)到BlockingQueue的容量之后,生產(chǎn)一個消費一個,因為不能超過BlockingQueue的容量。

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI