溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何理解線程安全的隊列ArrayBlockingQueue源碼

發(fā)布時間:2021-10-21 16:52:38 來源:億速云 閱讀:94 作者:iii 欄目:編程語言

本篇內(nèi)容介紹了“如何理解線程安全的隊列ArrayBlockingQueue源碼”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

一,ArrayBlockingQueue源碼分析

ArrayBlockingQueue是隊列的一種,隊列的特點(diǎn)嘛,先出先出,然而這種隊列是一種線程安全阻塞式的隊列,為什么是阻塞式隊列?我想,這正好是我寫和分析這篇文章的內(nèi)容所在。

由于本篇內(nèi)容涉及的內(nèi)容比較多,所以有些地方自己不會特地講的很詳細(xì),但是足夠自己使自己明白了,一般文章出來的時候,如果連自己讀起來都費(fèi)勁,或者有些不懂的地方,我想,這樣的文章,一種是寫作者自己遺漏了或者寫的有的時候含糊其辭了。

但是,我不會,因?yàn)椋业奈恼禄旧现饕菍懡o自己的,如果可以幫助需要的人,自己還是比較開心的,因?yàn)?,你們或許也看到了,我之前寫的文章風(fēng)格與別人不一樣,自己覺得我把當(dāng)時的想法寫出來就可以了,如果不完美也沒事,以后自己在改進(jìn)就可以了,我想這就是我與別的創(chuàng)作者不同的一點(diǎn),我也不是很刻意追求閱讀量如何如何,當(dāng)然了,如果你們關(guān)注我,或者分享我寫的內(nèi)容,我還是很感謝你的,哈哈,下面我們分析這個隊列集合的源碼了。

 

二,方法分析

 

2.0,左右可以滑動查看

 

2.1,構(gòu)造函數(shù)

//默認(rèn)必須給定隊列的容量
public ArrayBlockingQueue(int capacity) {
       this(capacity, false);
   }
//第二步
public ArrayBlockingQueue(int capacity, boolean fair) {
    //當(dāng)然了,容量不能小于等于0,因?yàn)殛犃惺怯脕硌b填元素
    //初始化容量為0,沒有意義撒
       if (capacity <= 0)
           throw new IllegalArgumentException();
    //創(chuàng)建一個容量為capacity大小的數(shù)組空間賦值給成員變量items
       this.items = new Object[capacity];
    //創(chuàng)建一個非公平鎖的實(shí)例對象
    //這里如果對ReentrantLock不了解的話,可以自己查看一下哈
    //lock鎖與synchronized關(guān)鍵字的鎖的區(qū)別還是要知道一下的
       lock = new ReentrantLock(fair);
    //下面的newCondition操作,就是為了后面的線程間通信做準(zhǔn)備的
       notEmpty = lock.newCondition();
       notFull =  lock.newCondition();
   }
 

上面的分析過程中,我們了解了如何實(shí)現(xiàn)一個鎖,以及線程間通信的內(nèi)容,這里簡單提及下,往后看,自己會對這部分進(jìn)行詳盡的說明的。

 

2.2,add()方法

public boolean add(E e) {
   //調(diào)用共用的方法add
       return super.add(e);
   }
//第二步操作
public boolean add(E e) {
   //復(fù)用offer()方法的實(shí)現(xiàn)邏輯
       if (offer(e))
           return true;
       else
   //若隊列添加失敗,說明隊列已經(jīng)滿了,不可能裝填數(shù)據(jù)元素了
   //此時拋出隊列已滿的異常即可
           throw new IllegalStateException("Queue full");
   }
//第三步操作
public boolean offer(E e) {
   //這個隊列也是不可以裝填元素為null的元素的,所以需要進(jìn)行檢查元素是否為空的邏輯校驗(yàn)
       checkNotNull(e);
   //獲取鎖實(shí)例對象
       final ReentrantLock lock = this.lock;
   //進(jìn)行加鎖操作,由于后面的大部分方法都會用到鎖,所以這里可以看出這是一個線程安全的隊列
       lock.lock();
       try {
    //隊列的容量,在創(chuàng)建的時候就已經(jīng)指定了,如果隊列的元素個數(shù)count和數(shù)組的空間相等了
    //說明隊列已經(jīng)沒有容量裝填數(shù)據(jù)元素了,此時返回false即可
           if (count == items.length)
               return false;
           else {
     //進(jìn)行入隊列操作
               enqueue(e);
               return true;
           }
       } finally {
           //釋放鎖的邏輯
           lock.unlock();
       }
   }
//第四步操作
private static void checkNotNull(Object v) {
       if (v == null)
           throw new NullPointerException();
   }
//第五步操作
private void enqueue(E x) {
       //將實(shí)例變量items賦值給臨時變量items,主要也是編程中常見的寫法
       final Object[] items = this.items;
   //將元素x裝載到隊列的末尾,此時的putIndex可以數(shù)組索引下標(biāo)的,我個人理解
       items[putIndex] = x;
   //這里為啥要加這么一句呢?我想這是因?yàn)閿?shù)組空間滿了,又要重新開始了,所以這里putIndex要置為0
       if (++putIndex == items.length)
           putIndex = 0;
   //count表示隊列的元素個數(shù),是個成員變量,入隊列之后,count加一是必須的
       count++;
   //發(fā)出一個信號通知,說明隊列不空,有元素可以從隊列進(jìn)行獲取
   //這里主要是線程間通信的,等下后面會介紹線程間通信的
       notEmpty.signal();
   }

 

線程間通信,你知道有哪種方式嗎,后面自己會單獨(dú)介紹的,后面自己慢慢會介紹的,不要著急哦

 

2.3,peek()方法

public E peek() {
   //加鎖lock.lock
       final ReentrantLock lock = this.lock;
       lock.lock();
       try {
    //根據(jù)數(shù)組的索引下標(biāo)獲取指定位置的元素,此時元素并沒有出隊列,不同于后面要分析的poll方法
           return itemAt(takeIndex);  
       } finally {
   //解鎖
           lock.unlock();
       }
   }
//第二步操作
//這是一個final關(guān)鍵字修飾的方法,fianl關(guān)鍵字修飾變量,方法,類的作用都可以去回顧一下的哈
final E itemAt(int i) {
    //根據(jù)索引下標(biāo)獲取指定位置元素,時間復(fù)雜度為o(1)
       return (E) items[i];
   }
 

final修飾方法,如果一個類不允許其子類覆蓋某個方法,即,不允許被子類重寫,則可以把這個方法聲明為final方法

 

2.4,size()方法

public int size() {
       final ReentrantLock lock = this.lock;
       lock.lock();
       try {
           //直接返回隊列元素個數(shù)的實(shí)例變量count即可,是不是很簡單
           //于此同時,這也是一個線程安全的方法
           return count;
       } finally {
           lock.unlock();
       }
   
   

2.5,contains()方法

public boolean contains(Object o) {
   //因?yàn)檫@個隊列里面不包含null元素,所以若元素o為null,則直接返回false
       if (o == null) return false;
       final Object[] items = this.items;
       final ReentrantLock lock = this.lock;
   //這是一個線程安全的方法
       lock.lock();
       try {
           if (count > 0) {
               //當(dāng)隊列的元素個數(shù)增加時,此時的putIndex值是增加的
               final int putIndex = this.putIndex;
               int i = takeIndex;
               do {
                   //若元素o,等于數(shù)組的其中一個元素,則直接返回false
                   if (o.equals(items[i]))
                       return true;
                   if (++i == items.length)
                       i = 0;
               } while (i != putIndex);
           }
           //隊列里面沒有元素,則直接返回false
           return false;
       } finally {
           lock.unlock();
       }
   }
   

2.6,take()方法

public E take() throws InterruptedException {
   //線程安全的方法
       final ReentrantLock lock = this.lock;
       lock.lockInterruptibly();
       try {
           //如果隊列的元素個數(shù)為0,則此時需要等待,所以這里是一個阻塞式隊列
           //此時這里就相當(dāng)于一條指令一直在循環(huán)判斷count的值是否不等于0
           while (count == 0)
               notEmpty.await();
           //進(jìn)行出隊列操作
           return dequeue();
       } finally {
           lock.unlock();
       }
   }
//第二步操作
private E dequeue() {
       //默認(rèn)takeIndex是從0開始的,如果出隊列了,takeIndex值就會增加一
       final Object[] items = this.items;
       E x = (E) items[takeIndex];
   //出隊列之后,元素就需要置為null了,等待gc在某個時刻進(jìn)行回收不可達(dá)對象的回收
       items[takeIndex] = null;
   //如果takeIndex等于了數(shù)組空間的大小了,說明,隊列的元素個數(shù)已經(jīng)取完了,此時需要重置takeIndex值為0
       if (++takeIndex == items.length)
           takeIndex = 0;
   //每取出一個數(shù)組元素,元素個數(shù)減一
       count--;
       if (itrs != null)
           itrs.elementDequeued();
   //發(fā)出一個信號通知,"隊列不滿,還可以put操作的信號"
       notFull.signal();
       return x;
   }
   

2.7,poll()方法

public E poll() {
     //線程安全
       final ReentrantLock lock = this.lock;
       lock.lock();
       try {
           //若隊列的元素個數(shù)為0,則隊列的元素是沒有的,就返回了null
           //否則就執(zhí)行出隊列操作,上面已經(jīng)分析過了,這里就不分析了
           return (count == 0) ? null : dequeue();
       } finally {
           lock.unlock();
       }
   }
   

2.8,clear()方法

public void clear() {
       final Object[] items = this.items;
       final ReentrantLock lock = this.lock;
       lock.lock();
       try {
           int k = count;
           //如果隊列存在元素大于0,直接執(zhí)行下面的操作
           if (k > 0) {
               //putIndex的位置,就是需要移動到的位置
               final int putIndex = this.putIndex;
               int i = takeIndex;
               do {
                   //循環(huán)將每個元素值置為null,等待gc在某個時刻觸發(fā)
                   items[i] = null;
                   if (++i == items.length)
                       i = 0;
               } while (i != putIndex);
               //這里為啥要賦值呢?思考一下
               takeIndex = putIndex;
               //元素個數(shù)置為0
               count = 0;
               if (itrs != null)
                   itrs.queueIsEmpty();
               //進(jìn)行通知,此時隊列里是可以裝填元素了
               for (; k > 0 && lock.hasWaiters(notFull); k--)
                   notFull.signal();
           }
       } finally {
           lock.unlock();
       }
   }
   

2.9,toString()方法

public String toString() {
   //線程安全的方法
       final ReentrantLock lock = this.lock;
       lock.lock();
       try {
           int k = count;
           //隊列里面的元素個數(shù)為0,則返回"[]"
           if (k == 0)
               return "[]";

           final Object[] items = this.items;
           //使用StringBuilder方法進(jìn)行拼接隊列的每一個元素
           StringBuilder sb = new StringBuilder();
           sb.append('[');
           for (int i = takeIndex; ; ) {
               Object e = items[i];
               sb.append(e == this ? "(this Collection)" : e);
               if (--k == 0)
                   return sb.append(']').toString();
               sb.append(',').append(' ');
               if (++i == items.length)
                   i = 0;
           }
       } finally {
           lock.unlock();
       }
   }
   

2.10,remainingCapacity()方法

public int remainingCapacity() {
   //獲取lock實(shí)例對象
       final ReentrantLock lock = this.lock;
       lock.lock();
       try {
    //剩余空間等于數(shù)組空間大小減去元素就是剩余空間的大小
           return items.length - count;
       } finally {
           lock.unlock();
       }
   }
   

三,總結(jié)一下

 

3.1,線程間通信

基于Condition的await()和singal()方法來實(shí)現(xiàn)

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author pc
*/
public class ConditionTest {

   public Lock lock = new ReentrantLock();
   public Condition condition = lock.newCondition();

   public static void main(String[] args) {
       ConditionTest test = new ConditionTest();
       ExecutorService executorService = Executors.newFixedThreadPool(2);
       executorService.execute(() -> test.conditionWait());
       executorService.execute(() -> test.conditionSignal());
   }

   public void conditionWait() {
       lock.lock();
       try {
           System.out.println(Thread.currentThread().getName() + "拿到鎖了");
           System.out.println(Thread.currentThread().getName() + "等待信號");
           condition.await();
           System.out.println(Thread.currentThread().getName() + "拿到信號");
       } catch (Exception e) {

       } finally {
           lock.unlock();
       }
   }

   public void conditionSignal() {
       lock.lock();
       try {
           Thread.sleep(3000);
           System.out.println(Thread.currentThread().getName() + "拿到鎖了");
           condition.signal();
           System.out.println(Thread.currentThread().getName() + "發(fā)出信號");
       } catch (Exception e) {

       } finally {
           lock.unlock();
       }
   }

}

“如何理解線程安全的隊列ArrayBlockingQueue源碼”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI