溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

常用BlockingQueue有哪些

發(fā)布時(shí)間:2021-11-22 17:12:38 來源:億速云 閱讀:128 作者:柒染 欄目:大數(shù)據(jù)

常用BlockingQueue有哪些,相信很多沒有經(jīng)驗(yàn)的人對(duì)此束手無(wú)策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。

為什么要使用阻塞隊(duì)列

之前,介紹了一下 ThreadPoolExecutor 的各參數(shù)的含義(并發(fā)編程之線程池ThreadPoolExecutor),其中有一個(gè) BlockingQueue,它是一個(gè)阻塞隊(duì)列。那么,小伙伴們有沒有想過,為什么此處的線程池要用阻塞隊(duì)列呢?

我們知道隊(duì)列是先進(jìn)先出的。當(dāng)放入一個(gè)元素的時(shí)候,會(huì)放在隊(duì)列的末尾,取出元素的時(shí)候,會(huì)從隊(duì)頭取。那么,當(dāng)隊(duì)列為空或者隊(duì)列滿的時(shí)候怎么辦呢。

這時(shí),阻塞隊(duì)列,會(huì)自動(dòng)幫我們處理這種情況。

當(dāng)阻塞隊(duì)列為空的時(shí)候,從隊(duì)列中取元素的操作就會(huì)被阻塞。當(dāng)阻塞隊(duì)列滿的時(shí)候,往隊(duì)列中放入元素的操作就會(huì)被阻塞。

而后,一旦空隊(duì)列有數(shù)據(jù)了,或者滿隊(duì)列有空余位置時(shí),被阻塞的線程就會(huì)被自動(dòng)喚醒。

這就是阻塞隊(duì)列的好處,你不需要關(guān)心線程何時(shí)被阻塞,也不需要關(guān)心線程何時(shí)被喚醒,一切都由阻塞隊(duì)列自動(dòng)幫我們完成。我們只需要關(guān)注具體的業(yè)務(wù)邏輯就可以了。

而這種阻塞隊(duì)列經(jīng)常用在生產(chǎn)者消費(fèi)者模式中。(可參看:面試官讓我手寫一個(gè)生產(chǎn)者消費(fèi)者模式)

 

常用的阻塞隊(duì)列

那么,一般我們用到的阻塞隊(duì)列有哪些呢。下面,通過idea的類圖,列出來常用的阻塞隊(duì)列,然后一個(gè)一個(gè)講解(不懂怎么用的,可以參考這篇文章:怎么用IDEA快速查看類圖關(guān)系)。

常用BlockingQueue有哪些  

阻塞隊(duì)列中,所有常用的方法都在 BlockingQueue 接口中定義。如

插入元素的方法:put,offer,add。移除元素的方法:remove,poll,take。

它們有四種不同的處理方式,第一種是在失敗時(shí)拋出異常,第二種是在失敗時(shí)返回特殊值,第三種是一直阻塞當(dāng)前線程,最后一種是在指定時(shí)間內(nèi)阻塞,否則返回特殊值。(以上特殊值,是指在插入元素時(shí),失敗返回false,在取出元素時(shí),失敗返回null)


拋異常特殊值阻塞超時(shí)
插入add(e)offer(e)put(e)offer(e,time,unit)
移除remove()poll()take()poll(time,unit)

 

1)ArrayBlockingQueue

這是一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。首先看下它的構(gòu)造方法,有三個(gè)。

常用BlockingQueue有哪些  

第一個(gè)可以指定隊(duì)列的大小,第二個(gè)還可以指定隊(duì)列是否公平,不指定的話,默認(rèn)是非公平。它是使用 ReentrantLock 的公平鎖和非公平鎖實(shí)現(xiàn)的(后續(xù)講解AQS時(shí),會(huì)詳細(xì)說明)。

簡(jiǎn)單理解就是,ReentrantLock 內(nèi)部會(huì)維護(hù)一個(gè)有先后順序的等待隊(duì)列,假如有五個(gè)任務(wù)一起過來,都被阻塞了。如果是公平的,則等待隊(duì)列中等待最久的任務(wù)就會(huì)先進(jìn)入阻塞隊(duì)列。如果是非公平的,那么這五個(gè)線程就需要搶鎖,誰(shuí)先搶到,誰(shuí)就先進(jìn)入阻塞隊(duì)列。

第三個(gè)構(gòu)造方法,是把一個(gè)集合的元素初始化到阻塞隊(duì)列中。

另外,ArrayBlockingQueue 沒有實(shí)現(xiàn)讀寫分離,也就是說,讀和寫是不能同時(shí)進(jìn)行的。因?yàn)椋x寫時(shí)用的是同一把鎖,如下圖所示:

常用BlockingQueue有哪些  
file

2) LinkedBlockingQueue

這是一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列。它的構(gòu)造方法有三個(gè)。

常用BlockingQueue有哪些  

可以看到和 ArrayBlockingQueue 的構(gòu)造方法大同小異,不過是,LinkedBlockingQueue 可以不指定隊(duì)列的大小,默認(rèn)值是 Integer.MAX_VALUE 。

但是,最好不要這樣做,建議指定一個(gè)固定大小。因?yàn)椋绻a(chǎn)者的速度比消費(fèi)者的速度大的多的情況下,這會(huì)導(dǎo)致阻塞隊(duì)列一直膨脹,直到系統(tǒng)內(nèi)存被耗盡(此時(shí),還沒達(dá)到隊(duì)列容量的最大值)。

此外,LinkedBlockingQueue 實(shí)現(xiàn)了讀寫分離,可以實(shí)現(xiàn)數(shù)據(jù)的讀和寫互不影響,這在高并發(fā)的場(chǎng)景下,對(duì)于效率的提高無(wú)疑是非常巨大的。

常用BlockingQueue有哪些  

3) SynchronousQueue

這是一個(gè)沒有緩沖的無(wú)界隊(duì)列。什么意思,看一下它的 size 方法:

常用BlockingQueue有哪些  

總是返回 0 ,因?yàn)樗且粋€(gè)沒有容量的隊(duì)列。

當(dāng)執(zhí)行插入元素的操作時(shí),必須等待一個(gè)取出操作。也就是說,put元素的時(shí)候,必須等待 take 操作。

那么,有的同學(xué)就好奇了,這沒有容量,還叫什么隊(duì)列啊,這有什么意義呢。

我的理解是,這適用于并發(fā)任務(wù)不大,而且生產(chǎn)者和消費(fèi)者的速度相差不多的場(chǎng)景下,直接把生產(chǎn)者和消費(fèi)者對(duì)接,不用經(jīng)過隊(duì)列的入隊(duì)出隊(duì)這一系列操作。所以,效率上會(huì)高一些。

可以去查看一下 Excutors.newCachedThreadPool 方法用的就是這種隊(duì)列。

這個(gè)隊(duì)列有兩個(gè)構(gòu)造方法,用于傳入是公平還是非公平,默認(rèn)是非公平。

常用BlockingQueue有哪些  

4)PriorityBlockingQueue

這是一個(gè)支持優(yōu)先級(jí)排序的無(wú)界隊(duì)列。有四個(gè)構(gòu)造方法:

常用BlockingQueue有哪些  

可以指定初始容量大小(注意初始容量并不代表最大容量),或者不指定,默認(rèn)大小為 11。也可以傳入一個(gè)比較器,把元素按一定的規(guī)則排序,不指定比較器的話,默認(rèn)是自然順序。

PriorityBlockingQueue 是基于二叉樹最小堆實(shí)現(xiàn)的,每當(dāng)取元素的時(shí)候,就會(huì)把優(yōu)先級(jí)最高的元素取出來。我們測(cè)試一下:

public class Person {
   private int id;
   private String name;

   public int getId() {
       return id;
   }

   public void setId(int id) {
       this.id = id;
   }

   public String getName() {
       return name;
   }

   public void setName(String name) {
       this.name = name;
   }

   @Override
   public String toString() {
       return "Person{" +
               "id=" + id +
               ", name='" + name + '\'' +
               '}';
   }

   public Person(int id, String name) {
       this.id = id;
       this.name = name;
   }

   public Person() {
   }
}

public class QueueTest {
   public static void main(String[] args) throws InterruptedException {

       PriorityBlockingQueue<Person> priorityBlockingQueue = new PriorityBlockingQueue<>(1, new Comparator<Person>() {
           @Override
           public int compare(Person o1, Person o2) {
               return o1.getId() - o2.getId();
           }
       });

       Person p2 = new Person(7, "李四");
       Person p1 = new Person(9, "張三");
       Person p3 = new Person(6, "王五");
       Person p4 = new Person(2, "趙六");
       priorityBlockingQueue.add(p1);
       priorityBlockingQueue.add(p2);
       priorityBlockingQueue.add(p3);
       priorityBlockingQueue.add(p4);

//由于二叉樹最小堆實(shí)現(xiàn),用這種方式直接打印元素,不能保證有序
       System.out.println(priorityBlockingQueue);
       System.out.println(priorityBlockingQueue.take());
       System.out.println(priorityBlockingQueue);
       System.out.println(priorityBlockingQueue.take());
       System.out.println(priorityBlockingQueue);

   }
}
 

打印結(jié)果:

[Person{id=2, name='趙六'}, Person{id=6, name='王五'}, Person{id=7, name='李四'}, Person{id=9, name='張三'}]
Person{id=2, name='趙六'}
[Person{id=6, name='王五'}, Person{id=9, name='張三'}, Person{id=7, name='李四'}]
Person{id=6, name='王五'}
[Person{id=7, name='李四'}, Person{id=9, name='張三'}]
 

可以看到,第一次取出的是 id 最小值 2, 第二次取出的是 6 。

5)DelayQueue

這是一個(gè)帶有延遲時(shí)間的無(wú)界阻塞隊(duì)列。隊(duì)列中的元素,只有等延時(shí)時(shí)間到了,才能取出來。此隊(duì)列一般用于過期數(shù)據(jù)的刪除,或任務(wù)調(diào)度。以下,模擬一下定長(zhǎng)時(shí)間的數(shù)據(jù)刪除。

首先定義數(shù)據(jù)元素,需要實(shí)現(xiàn) Delayed 接口,實(shí)現(xiàn) getDelay 方法用于計(jì)算剩余時(shí)間,和 CompareTo方法用于優(yōu)先級(jí)排序。

public class DelayData implements Delayed {

   private int id;
   private String name;
   //數(shù)據(jù)到期時(shí)間
   private long endTime;
   private TimeUnit timeUnit = TimeUnit.MILLISECONDS;

   public int getId() {
       return id;
   }

   public void setId(int id) {
       this.id = id;
   }

   public String getName() {
       return name;
   }

   public void setName(String name) {
       this.name = name;
   }

   public long getEndTime() {
       return endTime;
   }

   public void setEndTime(long endTime) {
       this.endTime = endTime;
   }

   public DelayData(int id, String name, long endTime) {
       this.id = id;
       this.name = name;
       //需要把傳入的時(shí)間endTime 加上當(dāng)前系統(tǒng)時(shí)間,作為數(shù)據(jù)的到期時(shí)間
       this.endTime = endTime + System.currentTimeMillis();
   }

   public DelayData() {
   }

   @Override
   public long getDelay(TimeUnit unit) {
       return this.endTime - System.currentTimeMillis();
   }

   @Override
   public int compareTo(Delayed o) {
       return o.getDelay(this.timeUnit) - this.getDelay(this.timeUnit) < 0 ? 1: -1;
   }

}

 

模擬三條數(shù)據(jù),分別設(shè)置不同的過期時(shí)間:

public class ProcessData {
   public static void main(String[] args) throws InterruptedException {
       DelayQueue<DelayData> delayQueue = new DelayQueue<>();

       DelayData a = new DelayData(5, "A", 5000);
       DelayData b = new DelayData(8, "B", 8000);
       DelayData c = new DelayData(2, "C", 2000);

       delayQueue.add(a);
       delayQueue.add(b);
       delayQueue.add(c);

       System.out.println("開始計(jì)時(shí)時(shí)間:" + System.currentTimeMillis());
       for (int i = 0; i < 3; i++) {
           DelayData data = delayQueue.take();
           System.out.println("id:"+data.getId()+",數(shù)據(jù):"+data.getName()+"被移除,當(dāng)前時(shí)間:"+System.currentTimeMillis());
       }
   }
}
 

最后結(jié)果:

開始計(jì)時(shí)時(shí)間:1583333583216
id:2,數(shù)據(jù):C被移除,當(dāng)前時(shí)間:1583333585216
id:5,數(shù)據(jù):A被移除,當(dāng)前時(shí)間:1583333588216
id:8,數(shù)據(jù):B被移除,當(dāng)前時(shí)間:1583333591216
 

可以看到,數(shù)據(jù)是按過期時(shí)間長(zhǎng)短,按順序移除的。C的時(shí)間最短 2 秒,然后過了 3 秒 A 也過期,再過 3 秒,B 過期。

看完上述內(nèi)容,你們掌握常用BlockingQueue有哪些的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI