溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

詳細(xì)分析Java并發(fā)集合ArrayBlockingQueue的用法

發(fā)布時(shí)間:2020-10-18 13:06:12 來源:腳本之家 閱讀:195 作者:wo883721 欄目:編程語言

在上一章中,我們介紹了阻塞隊(duì)列BlockingQueue,下面我們介紹它的常用實(shí)現(xiàn)類ArrayBlockingQueue。

一. 用數(shù)組來實(shí)現(xiàn)隊(duì)列

因?yàn)殛?duì)列這種數(shù)據(jù)結(jié)構(gòu)的特殊要求,所以它天然適合用鏈表的方式來實(shí)現(xiàn),用兩個(gè)變量分別記錄鏈表頭和鏈表尾,當(dāng)刪除或插入隊(duì)列時(shí),只要改變鏈表頭或鏈表尾就可以了,而且鏈表使用引用的方式鏈接的,所以它的容量幾乎是無限的。
那么怎么使用數(shù)組來實(shí)現(xiàn)隊(duì)列,我們需要四個(gè)變量:Object[] array來存儲(chǔ)隊(duì)列中元素,headIndex和tailIndex分別記錄隊(duì)列頭和隊(duì)列尾,count記錄隊(duì)列的個(gè)數(shù)。

  1. 因?yàn)閿?shù)組的長度是固定,所以當(dāng)count==array.length時(shí),表示隊(duì)列已經(jīng)滿了,當(dāng)count==0的時(shí)候,表示隊(duì)列是空的。
  2. 當(dāng)添加元素的時(shí)候,將array[tailIndex] = e將tailIndex位置設(shè)置成新元素,之后將tailIndex++自增,然后將count++自增。但是有兩點(diǎn)需要注意,在添加之前必須先判斷隊(duì)列是否已滿,不然會(huì)出現(xiàn)覆蓋已有元素。當(dāng)tailIndex的值等于數(shù)組最后一個(gè)位置的時(shí)候,需要將tailIndex=0,循環(huán)利用數(shù)組
  3. 當(dāng)刪除元素的時(shí)候,將先記錄下array[headIndex] 元素,之后將headIndex++自增,然后將count--自減。但是有兩點(diǎn)需要注意要注意,在刪除之前,必須先判斷隊(duì)列是否為空,不然可能會(huì)刪除已刪除的元素。

這里用了一個(gè)很巧妙的方式,我們知道當(dāng)向隊(duì)列中插入一個(gè)元素,那么就占用了數(shù)組的一個(gè)位置,當(dāng)刪除一個(gè)元素的時(shí)候,那么其實(shí)數(shù)組的這個(gè)位置就空閑出來了,表示這個(gè)位置又可以插入新元素了。

所以我們插入新元素前,必須檢查隊(duì)列是否已滿,刪除元素之前,必須檢查隊(duì)列是否為空。

二. ArrayBlockingQueue中重要成員變量

 /** 儲(chǔ)存隊(duì)列的中元素 */
  final Object[] items;

  /** 隊(duì)列頭的位置 */
  int takeIndex;

  /** 隊(duì)列尾的位置 */
  int putIndex;

  /** 當(dāng)前隊(duì)列擁有的元素個(gè)數(shù) */
  int count;

  /** 用來保證多線程操作共享變量的安全問題 */
  final ReentrantLock lock;

  /** 當(dāng)隊(duì)列為空時(shí),就會(huì)調(diào)用notEmpty的wait方法,讓當(dāng)前線程等待 */
  private final Condition notEmpty;

  /** 當(dāng)隊(duì)列為滿時(shí),就會(huì)調(diào)用notFull的wait方法,讓當(dāng)前線程等待 */
  private final Condition notFull;

就是多了lock、notEmpty、notFull變量來實(shí)現(xiàn)多線程安全和線程等待條件的,它們?nèi)齻€(gè)是怎么操作的呢?

2.1 lock的作用

因?yàn)锳rrayBlockingQueue是在多線程下操作的,所以修改items、takeIndex、putIndex和count這些成員變量時(shí),必須要考慮多線程安全問題,所以這里使用lock獨(dú)占鎖,來保證并發(fā)操作的安全。

2.2 notEmpty與notFull的作用

因?yàn)樽枞?duì)列必須實(shí)現(xiàn),當(dāng)隊(duì)列為空或隊(duì)列已滿的時(shí)候,隊(duì)列的讀取或插入操作要等待。所以我們想到了并發(fā)框架下的Condition對象,使用它來控制。

在AQS中,我們介紹了這個(gè)類的作用:

  1. await系列方法,會(huì)釋放當(dāng)前鎖,并讓當(dāng)前線程等待。
  2. signal與signalAll方法,會(huì)喚醒當(dāng)前線程。其實(shí)它并不是喚醒當(dāng)前線程,而是將在這個(gè)Condition條件上等待的線程,添加到lock鎖上的等待線程池中,所以當(dāng)鎖被釋放時(shí),會(huì)喚醒lock鎖上的等待線程池中一個(gè)線程。具體在AQS中有源碼分析。

三. 添加元素方法

3.1 add(E e)與offer(E e)方法:

  // 調(diào)用AbstractQueue父類中的方法。
  public boolean add(E e) {
    // 通過調(diào)用offer來時(shí)實(shí)現(xiàn)
    if (offer(e))
      return true;
    else
      throw new IllegalStateException("Queue full");
  }

  //向隊(duì)列末尾新添加元素。返回true表示添加成功,false表示添加失敗,不會(huì)拋出異常
  public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 使用lock來保證,多線程修改成員屬性的安全
    lock.lock();
    try {
      // 隊(duì)列已滿,添加元素失敗,返回false。
      if (count == items.length)
        return false;
      else {
        // 調(diào)用enqueue方法將元素插入隊(duì)列中
        enqueue(e);
        return true;
      }
    } finally {
      lock.unlock();
    }
  }

add方法是調(diào)用offer方法實(shí)現(xiàn)的。在offer方法中,必須先判斷隊(duì)列是否已滿,如果已滿就直接返回false,而不會(huì)阻塞當(dāng)前線程。如果不滿就調(diào)用enqueue方法將元素插入隊(duì)列中。

注意:這里使用lock.lock()是保證同一時(shí)間只有一個(gè)線程修改成員變量,防止出現(xiàn)并發(fā)操作問題。雖然它也會(huì)阻塞當(dāng)前線程,但是它并不是條件等待,只是因?yàn)殒i被其他線程持有,而ArrayBlockingQueue中方法操作時(shí)間都不長,這里相當(dāng)于不阻塞線程。

3.2 put方法

  // 向隊(duì)列末尾新添加元素,如果隊(duì)列已滿,當(dāng)前線程就等待。響應(yīng)中斷異常
  public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 使用lock來保證,多線程修改成員屬性的安全
    lock.lockInterruptibly();
    try {
      // 隊(duì)列已滿,則調(diào)用notFull.await()方法,讓當(dāng)前線程等待,直到隊(duì)列不是滿的
      while (count == items.length)
        notFull.await();
      // 調(diào)用enqueue方法將元素插入隊(duì)列中
      enqueue(e);
    } finally {
      lock.unlock();
    }
  }

與offer方法大體流程一樣,只是當(dāng)隊(duì)列已滿的時(shí)候,會(huì)調(diào)用notFull.await()方法,讓當(dāng)前線程阻塞等待,直到隊(duì)列被別的線程移除了元素,隊(duì)列不滿的時(shí)候,會(huì)喚醒這個(gè)等待線程。

3.3 offer(E e, long timeout, TimeUnit unit)方法

/**
   * 向隊(duì)列末尾新添加元素,如果隊(duì)列中沒有可用空間,當(dāng)前線程就等待,
   * 如果等待時(shí)間超過timeout了,那么返回false,表示添加失敗
   */
  public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    checkNotNull(e);
    // 計(jì)算一共最多等待的時(shí)間值nanos
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // 使用lock來保證,多線程修改成員屬性的安全
    lock.lockInterruptibly();
    try {
      // 隊(duì)列已滿
      while (count == items.length) {
        // nanos <= 0表示最大等待時(shí)間已到,那么不用再等待了,返回false,表示添加失敗。
        if (nanos <= 0)
          return false;
        // 調(diào)用notFull.awaitNanos(nanos)方法,超時(shí)nanos時(shí)間會(huì)被自動(dòng)喚醒,
        // 如果被提前喚醒,那么返回剩余的時(shí)間
        nanos = notFull.awaitNanos(nanos);
      }
      // 調(diào)用enqueue方法將元素插入隊(duì)列中
      enqueue(e);
      return true;
    } finally {
      lock.unlock();
    }
  }

與put的方法大體流程一樣,只不過是調(diào)用notFull.awaitNanos(nanos)方法,讓當(dāng)前線程等待,并設(shè)置等待時(shí)間。

四. 刪除元素方法

4.1 remove()和poll()方法:

  // 調(diào)用AbstractQueue父類中的方法。
  public E remove() {
    // 通過調(diào)用poll來時(shí)實(shí)現(xiàn)
    E x = poll();
    if (x != null)
      return x;
    else
      throw new NoSuchElementException();
  }

// 刪除隊(duì)列第一個(gè)元素(即隊(duì)列頭),并返回它。如果隊(duì)列是空的,它不會(huì)拋出異常,而是會(huì)返回null。
  public E poll() {
    final ReentrantLock lock = this.lock;
    // 使用lock來保證,多線程修改成員屬性的安全
    lock.lock();
    try {
      // 如果count == 0,列表為空,就返回null,否則調(diào)用dequeue方法,返回列表頭元素
      return (count == 0) ? null : dequeue();
    } finally {
      lock.unlock();
    }
  }

remove方法是調(diào)用poll()方法實(shí)現(xiàn)的。在 poll()方法中,如果列表為空,就返回null,否則調(diào)用dequeue方法,返回列表頭元素。

4.2 take()方法

  /**
   * 返回并移除隊(duì)列第一個(gè)元素,如果隊(duì)列是空的,就前線程就等待。響應(yīng)中斷異常
   */
  public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 使用lock來保證,多線程修改成員屬性的安全
    lock.lockInterruptibly();
    try {
      // 如果隊(duì)列為空,就調(diào)用notEmpty.await()方法,讓當(dāng)前線程等待。
      // 直到有別的線程向隊(duì)列中插入元素,那么這個(gè)線程會(huì)被喚醒。
      while (count == 0)
        notEmpty.await();
      // 調(diào)用dequeue方法,返回列表頭元素
      return dequeue();
    } finally {
      lock.unlock();
    }
  }

take()方法當(dāng)隊(duì)列為空的時(shí)候,當(dāng)前線程必須等待,直到有別的線程向隊(duì)列中插入新元素,那么這個(gè)線程會(huì)被喚醒。

4.3 poll(long timeout, TimeUnit unit)方法

  /**
   * 返回并移除隊(duì)列第一個(gè)元素,如果隊(duì)列是空的,就前線程就等待。
   * 如果等待時(shí)間超過timeout了,那么返回false,表示獲取元素失敗
   */
  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 計(jì)算一共最多等待的時(shí)間值nanos
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // 使用lock來保證,多線程修改成員屬性的安全
    lock.lockInterruptibly();
    try {
      // 隊(duì)列為空
      while (count == 0) {
        // nanos <= 0表示最大等待時(shí)間已到,那么不用再等待了,返回null。
        if (nanos <= 0)
          return null;
        // 調(diào)用notEmpty.awaitNanos(nanos)方法讓檔期線程等待,并設(shè)置超時(shí)時(shí)間。
        nanos = notEmpty.awaitNanos(nanos);
      }
      // 調(diào)用dequeue方法,返回列表頭元素
      return dequeue();
    } finally {
      lock.unlock();
    }
  }

與take()方法流程一樣,只不過調(diào)用awaitNanos(nanos)方法,讓當(dāng)前線程等待,并設(shè)置等待時(shí)間。

五. 查看元素的方法

5.1 element()與peek() 方法

  // 調(diào)用AbstractQueue父類中的方法。
  public E element() {
    E x = peek();
    if (x != null)
      return x;
    else
      throw new NoSuchElementException();
  }

  // 查看隊(duì)列頭元素
  public E peek() {
    final ReentrantLock lock = this.lock;
    // 使用lock來保證,多線程修改成員屬性的安全
    lock.lock();
    try {
      // 返回當(dāng)前隊(duì)列頭的元素
      return itemAt(takeIndex); // null when queue is empty
    } finally {
      lock.unlock();
    }
  }

六. 其他重要方法

6.1 enqueue和dequeue方法

  // 將元素x插入隊(duì)列尾
  private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null; //當(dāng)前putIndex位置元素一定是null
    final Object[] items = this.items;
    items[putIndex] = x;
    // 如果putIndex等于items.length,那么將putIndex重新設(shè)置為0
    if (++putIndex == items.length)
      putIndex = 0;
    // 隊(duì)列數(shù)量加一
    count++;
    // 因?yàn)椴迦胍粋€(gè)元素,那么當(dāng)前隊(duì)列肯定不為空,那么喚醒在notEmpty條件下等待的一個(gè)線程
    notEmpty.signal();
  }

  // 刪除隊(duì)列頭的元素,返回它
  private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;

    final Object[] items = this.items;
    // 得到當(dāng)前隊(duì)列頭的元素
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    // 將當(dāng)前隊(duì)列頭位置設(shè)置為null
    items[takeIndex] = null;
    if (++takeIndex == items.length)
      takeIndex = 0;
    // 隊(duì)列數(shù)量減一
    count--;
    if (itrs != null)
      itrs.elementDequeued();
    // 因?yàn)閯h除了一個(gè)元素,那么隊(duì)列肯定不滿了,那么喚醒在notFull條件下等待的一個(gè)線程
    notFull.signal();
    return x;
  }

這兩個(gè)方法分別代表,向隊(duì)列中插入元素和從隊(duì)列中刪除元素。而且它們要喚醒等待的線程。當(dāng)插入元素后,那么隊(duì)列一定不為空,那么就要喚醒在notEmpty條件下等待的線程。當(dāng)刪除元素后,那么隊(duì)列一定不滿,那么就要喚醒在notFull條件下等待的線程。

6.2 remove(Object o)方法

  // 刪除隊(duì)列中對象o元素,最多刪除一條
  public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    // 使用lock來保證,多線程修改成員屬性的安全
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      // 當(dāng)隊(duì)列中有值的時(shí)候,才進(jìn)行刪除。
      if (count > 0) {
        // 隊(duì)列尾下一個(gè)位置
        final int putIndex = this.putIndex;
        // 隊(duì)列頭的位置
        int i = takeIndex;
        do {
          // 當(dāng)前位置元素與被刪除元素相同
          if (o.equals(items[i])) {
            // 刪除i位置元素
            removeAt(i);
            // 返回true
            return true;
          }
          if (++i == items.length)
            i = 0;
          // 當(dāng)i==putIndex表示遍歷完所有元素
        } while (i != putIndex);
      }
      return false;
    } finally {
      lock.unlock();
    }
  }

從隊(duì)列中刪除指定對象o,那么就要遍歷隊(duì)列,刪除第一個(gè)與對象o相同的元素,如果隊(duì)列中沒有對象o元素,那么返回false刪除失敗。

這里有兩點(diǎn)需要注意:

如何遍歷隊(duì)列,就是從隊(duì)列頭遍歷到隊(duì)列尾。就要靠takeIndex和putIndex兩個(gè)變量了。

為什么Object[] items = this.items;這句代碼沒有放到同步鎖lock代碼塊內(nèi)。items是成員變量,那么多線程操作的時(shí)候,不會(huì)有并發(fā)問題么?

這個(gè)是因?yàn)閕tems是個(gè)引用變量,不是基本數(shù)據(jù)類型,而且我們對隊(duì)列的插入和刪除操作,都是針對這一個(gè)items數(shù)組,沒有改變數(shù)組的引用,所以在lock代碼中,items會(huì)得到其他線程對它最新的修改。但是如果這里將int putIndex = this.putIndex;方法lock代碼塊外面,就會(huì)產(chǎn)生問題。

removeAt(final int removeIndex)方法

  // 刪除隊(duì)列removeIndex位置的元素
  void removeAt(final int removeIndex) {
    // assert lock.getHoldCount() == 1;
    // assert items[removeIndex] != null;
    // assert removeIndex >= 0 && removeIndex < items.length;
    final Object[] items = this.items;
    // 表示刪除元素是列表頭,就容易多了,與dequeue方法流程差不多
    if (removeIndex == takeIndex) {
      // 移除removeIndex位置元素
      items[takeIndex] = null;
      // 到了數(shù)組末尾,就要轉(zhuǎn)到數(shù)組頭位置
      if (++takeIndex == items.length)
        takeIndex = 0;
      // 隊(duì)列數(shù)量減一
      count--;
      if (itrs != null)
        itrs.elementDequeued();
    } else {
      // an "interior" remove

      final int putIndex = this.putIndex;
      for (int i = removeIndex;;) {
        int next = i + 1;
        if (next == items.length)
          next = 0;
        // 還沒有到隊(duì)列尾,那么就將后一個(gè)位置元素覆蓋前一個(gè)位置的元素
        if (next != putIndex) {
          items[i] = items[next];
          i = next;
        } else {
          // 將隊(duì)列尾元素置位null
          items[i] = null;
          // 重新設(shè)置putIndex的值
          this.putIndex = i;
          break;
        }
      }
      // 隊(duì)列數(shù)量減一
      count--;
      if (itrs != null)
        itrs.removedAt(removeIndex);
    }
    // 因?yàn)閯h除了一個(gè)元素,那么隊(duì)列肯定不滿了,那么喚醒在notFull條件下等待的一個(gè)線程
    notFull.signal();
  }

在隊(duì)列中刪除指定位置的元素。需要注意的是刪除之后的數(shù)組還能保持隊(duì)列形式,分為兩種情況:

  1. 如果刪除位置是隊(duì)列頭,那么簡單,只需要將隊(duì)列頭的位置元素設(shè)置為null,將將隊(duì)列頭位置加一。
  2. 如果刪除位置不是隊(duì)列頭,那么麻煩了,這個(gè)時(shí)候,我們就要將從removeIndex位置后的元素全部左移一位,覆蓋前一個(gè)元素。最后將原來隊(duì)列尾的元素置位null。

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI