您好,登錄后才能下訂單哦!
阻塞隊列 (BlockingQueue)是Java util.concurrent包下重要的數(shù)據(jù)結(jié)構(gòu),BlockingQueue提供了線程安全的隊列訪問方式:當(dāng)阻塞隊列進(jìn)行插入數(shù)據(jù)時,如果隊列已滿,線程將會阻塞等待直到隊列非滿;從阻塞隊列取數(shù)據(jù)時,如果隊列已空,線程將會阻塞等待直到隊列非空。并發(fā)包下很多高級同步類的實現(xiàn)都是基于BlockingQueue實現(xiàn)的。
BlockingQueue 的操作方法
BlockingQueue 具有 4 組不同的方法用于插入、移除以及對隊列中的元素進(jìn)行檢查。如果請求的操作不能得到立即執(zhí)行的話,每個方法的表現(xiàn)也不同。這些方法如下:
四組不同的行為方式解釋:
無法向一個 BlockingQueue 中插入 null。如果你試圖插入 null,BlockingQueue 將會拋出一個 NullPointerException。
可以訪問到 BlockingQueue 中的所有元素,而不僅僅是開始和結(jié)束的元素。比如說,你將一個對象放入隊列之中以等待處理,但你的應(yīng)用想要將其取消掉。那么你可以調(diào)用諸如 remove(o) 方法來將隊列之中的特定對象進(jìn)行移除。但是這么干效率并不高(譯者注:基于隊列的數(shù)據(jù)結(jié)構(gòu),獲取除開始或結(jié)束位置的其他對象的效率不會太高),因此你盡量不要用這一類的方法,除非你確實不得不那么做。
BlockingQueue 的實現(xiàn)類
BlockingQueue 是個接口,你需要使用它的實現(xiàn)之一來使用BlockingQueue,Java.util.concurrent包下具有以下 BlockingQueue 接口的實現(xiàn)類:
使用例子:
阻塞隊列的最長使用的例子就是生產(chǎn)者消費者模式,也是各種實現(xiàn)生產(chǎn)者消費者模式方式中首選的方式。使用者不用關(guān)心什么阻塞生產(chǎn),什么時候阻塞消費,使用非常方便,代碼如下:
package MyThread; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public class BlockingQueueTest { //生產(chǎn)者 public static class Producer implements Runnable{ private final BlockingQueue<Integer> blockingQueue; private volatile boolean flag; private Random random; public Producer(BlockingQueue<Integer> blockingQueue) { this.blockingQueue = blockingQueue; flag=false; random=new Random(); } public void run() { while(!flag){ int info=random.nextInt(100); try { blockingQueue.put(info); System.out.println(Thread.currentThread().getName()+" produce "+info); Thread.sleep(50); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public void shutDown(){ flag=true; } } //消費者 public static class Consumer implements Runnable{ private final BlockingQueue<Integer> blockingQueue; private volatile boolean flag; public Consumer(BlockingQueue<Integer> blockingQueue) { this.blockingQueue = blockingQueue; } public void run() { while(!flag){ int info; try { info = blockingQueue.take(); System.out.println(Thread.currentThread().getName()+" consumer "+info); Thread.sleep(50); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public void shutDown(){ flag=true; } } public static void main(String[] args){ BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>(10); Producer producer=new Producer(blockingQueue); Consumer consumer=new Consumer(blockingQueue); //創(chuàng)建5個生產(chǎn)者,5個消費者 for(int i=0;i<10;i++){ if(i<5){ new Thread(producer,"producer"+i).start(); }else{ new Thread(consumer,"consumer"+(i-5)).start(); } } try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } producer.shutDown(); consumer.shutDown(); } }
阻塞隊列原理:
其實阻塞隊列實現(xiàn)阻塞同步的方式很簡單,使用的就是是lock鎖的多條件(condition)阻塞控制。使用BlockingQueue封裝了根據(jù)條件阻塞線程的過程,而我們就不用關(guān)心繁瑣的await/signal操作了。
下面是Jdk 1.7中ArrayBlockingQueue部分代碼:
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); //創(chuàng)建數(shù)組 this.items = new Object[capacity]; //創(chuàng)建鎖和阻塞條件 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } //添加元素的方法 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); //如果隊列不滿就入隊 enqueue(e); } finally { lock.unlock(); } } //入隊的方法 private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } //移除元素的方法 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } //出隊的方法 private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x;
雙端阻塞隊列(BlockingDeque)
concurrent包下還提供雙端阻塞隊列(BlockingDeque),和BlockingQueue是類似的,只不過BlockingDeque提供從任意一端插入或者抽取元素的隊列。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。