溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java中怎么使用BlockingQueue實(shí)現(xiàn)并發(fā)

發(fā)布時(shí)間:2021-08-03 15:46:55 來(lái)源:億速云 閱讀:125 作者:Leah 欄目:編程語(yǔ)言

Java中怎么使用BlockingQueue實(shí)現(xiàn)并發(fā),相信很多沒有經(jīng)驗(yàn)的人對(duì)此束手無(wú)策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。


1 概述

阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩種附加操作的隊(duì)列。支持附加阻塞的插入和移除操作。

  • 支持阻塞的插入:當(dāng)隊(duì)列滿時(shí),插入操作會(huì)被阻塞,直到隊(duì)列不滿。

  • 支持阻塞的移除:當(dāng)隊(duì)列空時(shí),移除操作會(huì)被阻塞,直到隊(duì)列不空。

阻塞隊(duì)列不可用時(shí),操作處理方式

方法\處理方式拋出異常返回特殊值一直阻塞超時(shí)退出
插入方法add(e)offer(e)put(e)offer(e, time, unit)
移除方法remove()poll()take()poll(time, unit)
檢查方法element()peek()無(wú)無(wú)
  • 拋出異常:隊(duì)列滿時(shí),若繼續(xù)插入元素會(huì)拋出IllegalStateException;當(dāng)隊(duì)列為空時(shí),若獲取元素則會(huì)拋出NoSuchElementException異常。

  • 返回特殊值:向隊(duì)列插入元素時(shí),會(huì)返回是否插入成功true/false;獲取元素時(shí),成功則返回元素,失敗則返回null。

  • 一直阻塞:當(dāng)阻塞隊(duì)列滿時(shí),若繼續(xù)使用put新增元素時(shí)會(huì)被阻塞,直到隊(duì)列不為空或者響應(yīng)中斷退出;當(dāng)阻塞隊(duì)列為空時(shí),繼續(xù)使用take獲取元素時(shí)會(huì)被阻塞,直到隊(duì)列不為空。

  • 超時(shí)退出:當(dāng)阻塞隊(duì)列滿時(shí),使用offer(e, time, unit)新增元素會(huì)被阻塞至超時(shí)退出;當(dāng)隊(duì)列為空時(shí),使用poll(time, unit)獲取元素時(shí)會(huì)被阻塞至超時(shí)退出。

注意:

  • 阻塞隊(duì)列中不允許插入null,會(huì)拋出NPE異常。

  • 可以訪問阻塞隊(duì)列中的任意元素,調(diào)用remove(Object o)可以將隊(duì)列之中的特定對(duì)象移除,但會(huì)遍歷全部元素,并不高效。

2 阻塞隊(duì)列的實(shí)現(xiàn)

2.1 ArrayBlockingQueue

由數(shù)組構(gòu)成的有界阻塞隊(duì)列,內(nèi)部由數(shù)組final Object[] items實(shí)現(xiàn)。默認(rèn)情況下不保證線程公平的訪問隊(duì)列,所謂公平訪問隊(duì)列指阻塞的線程,可以按照阻塞的先后順序訪問隊(duì)列。

public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair);  // 使用公平鎖/非公平鎖
  notEmpty = lock.newCondition();
  notFull =  lock.newCondition();
}

隊(duì)列大小初始化后不可修改。參數(shù)fair控制內(nèi)部ReentrantLock是否采用公平鎖。

2.2 LinkedBlockingQueue

鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列。內(nèi)部結(jié)構(gòu)是單鏈表。默認(rèn)大小為Integer.MAX_VALUE,可以指定大小。

public LinkedBlockingQueue(int capacity) {
  if (capacity <= 0) throw new IllegalArgumentException();
  // 指定隊(duì)列大小
  this.capacity = capacity;
  last = head = new Node<E>(null);
}

// 單鏈表節(jié)點(diǎn)Node
static class Node<E> {
  E item;
  Node<E> next;
  Node(E x) { item = x; }
}
2.3 PriorityBlockingQueue

支持優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列。默認(rèn)情況下采取自然順序升序排列。也可以自定義compareTo()方法來(lái)指定元素的排列順序,或者初始化隊(duì)列時(shí),指定構(gòu)造參數(shù)Comparator來(lái)對(duì)元素進(jìn)行排序。同優(yōu)先級(jí)順序無(wú)法保證。

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
  if (initialCapacity < 1)
    throw new IllegalArgumentException();
  this.lock = new ReentrantLock();  // 非公平鎖
  this.notEmpty = lock.newCondition();
  this.comparator = comparator;
  this.queue = new Object[initialCapacity];
}


// offer方法部分代碼
Comparator<? super E> cmp = comparator;
if (cmp == null)
  siftUpComparable(n, e, array);
else
  siftUpUsingComparator(n, e, array, cmp);

由offer代碼可以看出,Comparator的優(yōu)先級(jí)是大于Comparable.compareTo方法的。

注意:PriorityBlockingQueue不會(huì)阻塞數(shù)據(jù)生產(chǎn)者(隊(duì)列無(wú)界),只會(huì)在沒有數(shù)據(jù)時(shí)阻塞消費(fèi)者。生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度絕對(duì)不能快于消費(fèi)者消費(fèi)數(shù)據(jù)的速度,否則將有可能耗盡堆空間。

2.4 DelayQueue

支持延時(shí)獲取元素的無(wú)界隊(duì)列。隊(duì)列使用PriorityQueue實(shí)現(xiàn)。隊(duì)列中的元素必須實(shí)現(xiàn)java.util.concurrent.Delayed接口,在創(chuàng)建元素時(shí)指定多久才能才能從隊(duì)列中取到元素。

DelayQueue非常有用,可以將DelayQueu應(yīng)用在以下應(yīng)用場(chǎng)景。

  • 緩存系統(tǒng)的設(shè)計(jì):用DelayQueue保存緩存元素的有效期,使用一個(gè)線程循環(huán)查詢DelayQueue,一旦能獲取到元素時(shí),表示緩存有限期到了。

  • 定時(shí)任務(wù)調(diào)度:使用DelayQueue保存當(dāng)天將會(huì)執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從DelayQueue中獲取到任務(wù)就開始執(zhí)行。比如TimerQueue就是使用DelayQueue實(shí)現(xiàn)的。

2.5 SynchronousQueue

不存儲(chǔ)元素的阻塞隊(duì)列。每個(gè)put操作都必須等待一個(gè)take操作,反之亦然。

// fair為true,等待線程將以FIFO的順序進(jìn)行訪問
public SynchronousQueue(boolean fair) {
  transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

將生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費(fèi)者線程。隊(duì)列本身不存儲(chǔ)任何元素,非常適合傳遞性場(chǎng)景。SynchronousQueue的吞吐量高于ArrayBlockingQueueLinkedBlockingQueue。

3 阻塞隊(duì)列的原理

利用Lock鎖的多條件(Condition)阻塞控制。下面簡(jiǎn)單分析下ArrayBlockingQueue部分代碼。

3.1 ArrayBlockingQueue屬性
/** The queued items */
// 數(shù)據(jù)元素?cái)?shù)組
final Object[] items;

/** items index for next take, poll, peek or remove */
// 下一個(gè)待獲取元素索引
int takeIndex;

/** items index for next put, offer, or add */
// 下一個(gè)待插入元素索引
int putIndex;

/** Number of elements in the queue */
// 隊(duì)列中元素個(gè)數(shù)
int count;

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
// 所有訪問的主鎖
final ReentrantLock lock;

/** Condition for waiting takes */
// 消費(fèi)者監(jiān)視器
private final Condition notEmpty;

/** Condition for waiting puts */
// 生產(chǎn)者監(jiān)視器
private final Condition notFull;


// 
public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair);
  notEmpty = lock.newCondition();
  notFull =  lock.newCondition();
}
3.2 put操作
// 在隊(duì)列尾部插入元素,若隊(duì)列已滿則等待隊(duì)列非滿。
public void put(E e) throws InterruptedException {
  // 校驗(yàn)插入元素,為空則拋出NPE
  checkNotNull(e);
  final ReentrantLock lock = this.lock;
  // 1. 嘗試獲取鎖(響應(yīng)中斷)
  lock.lockInterruptibly();
  try {
    // 2. 當(dāng)隊(duì)列滿時(shí)
    while (count == items.length)
      // 2.1 若隊(duì)列滿,則阻塞當(dāng)前線程。等待`notFull.signal()`喚醒。
      notFull.await();
    // 3. 非滿則執(zhí)行入隊(duì)操作
    enqueue(e);
  } finally {
    lock.unlock();
  }
}

// 在`putIndex`處放置當(dāng)前元素,只有獲取lock鎖后才會(huì)調(diào)用
private void enqueue(E x) {
  // assert lock.getHoldCount() == 1;
  // assert items[putIndex] == null;
  final Object[] items = this.items;
  // 在`putIndex`處放置元素
  items[putIndex] = x;
  // putIndex等于數(shù)組長(zhǎng)度時(shí),重置為0索引。
  if (++putIndex == items.length)
    putIndex = 0;
  // 數(shù)量加1
  count++;
  // 4. 喚醒一個(gè)等待線程(等待取元素的線程)
  notEmpty.signal();
}

put總體流程:

  1. 獲取lock鎖,拿到鎖后繼續(xù)執(zhí)行,否則自旋競(jìng)爭(zhēng)鎖。

  2. 判斷阻塞隊(duì)列是否滿。滿了了則調(diào)用await阻塞當(dāng)前線程。同時(shí)釋放lock鎖。

  3. 如果沒滿,則調(diào)用enqueue方法將元素put進(jìn)阻塞隊(duì)列。此時(shí)還有一種可能是:第2步中被阻塞的線程被喚醒且又拿到了lock鎖。

  4. 喚醒一個(gè)標(biāo)記為notEmpty(消費(fèi)者)的線程。

3.3 take操作
// 從頭部獲取元素,若隊(duì)列為空則等待隊(duì)列非空。
public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  // 1. 獲取鎖
  lock.lockInterruptibly();
  try {
    // 2. 當(dāng)隊(duì)列為空時(shí)
    while (count == 0)
      // 2.1 當(dāng)隊(duì)列為空時(shí),阻塞當(dāng)前線程。等待`notEmpty.signal()`喚醒。
      notEmpty.await();
    // 3. 非空則進(jìn)行入隊(duì)操作
    return dequeue();
  } finally {
    lock.unlock();
  }
}

// 從`takeIndex`位置獲取當(dāng)前元素,只有獲取到lock鎖后才會(huì)調(diào)用
private E dequeue() {
  // assert lock.getHoldCount() == 1;
  // assert items[takeIndex] != null;
  final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  // 從`takeIndex`位置獲取元素,然后清除該位置元素
  E x = (E) items[takeIndex];
  items[takeIndex] = null;
  // 
  if (++takeIndex == items.length)
    takeIndex = 0;
  // 隊(duì)列元素減1
  count--;
  if (itrs != null)
    itrs.elementDequeued();
  // 4. 喚醒一個(gè)標(biāo)記為notFull(生產(chǎn)者)的線程
  notFull.signal();
  return x;
}

take的整體流程:

  1. 獲取lock鎖,拿到鎖則執(zhí)行下一步流程;未拿到則自旋競(jìng)爭(zhēng)鎖。

  2. 當(dāng)前隊(duì)列是否為空,若為空則調(diào)用notEmpty.await阻塞當(dāng)前線程,同時(shí)釋放鎖,等待被喚醒。

  3. 若非空,則調(diào)用dequeue進(jìn)行出隊(duì)操作。此時(shí)還有一種可能:第2步中的阻塞的線程被喚醒并且又拿到了lock鎖。

  4. 喚醒一個(gè)被標(biāo)記為notFull(生產(chǎn)者)的線程。

3.4 總結(jié)
  1. puttake操作都需要先獲得鎖,沒有獲得鎖的線程無(wú)法進(jìn)行操作。

  2. 拿到鎖后,并不一定能順利執(zhí)行put/take操作,還需要判斷隊(duì)列是否可用(是否滿/空),不可用則會(huì)被阻塞,并釋放鎖。

  3. 在2中被阻塞的線程會(huì)被喚醒,但喚醒之后依然需要拿到鎖之后才能繼續(xù)向下執(zhí)行。否則,自旋拿鎖,拿到鎖后再while判斷隊(duì)列是否可用。

看完上述內(nèi)容,你們掌握J(rèn)ava中怎么使用BlockingQueue實(shí)現(xiàn)并發(fā)的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI