您好,登錄后才能下訂單哦!
ArrayBlockingQueue介紹
ArrayBlockingQueue是數(shù)組實現(xiàn)的線程安全的有界的阻塞隊列。
線程安全是指,ArrayBlockingQueue內(nèi)部通過“互斥鎖”保護競爭資源,實現(xiàn)了多線程對競爭資源的互斥訪問。而有界,則是指ArrayBlockingQueue對應的數(shù)組是有界限的。 阻塞隊列,是指多線程訪問競爭資源時,當競爭資源已被某線程獲取時,其它要獲取該資源的線程需要阻塞等待;而且,ArrayBlockingQueue是按 FIFO(先進先出)原則對元素進行排序,元素都是從尾部插入到隊列,從頭部開始返回。
注意:ArrayBlockingQueue不同于ConcurrentLinkedQueue,ArrayBlockingQueue是數(shù)組實現(xiàn)的,并且是有界限的;而ConcurrentLinkedQueue是鏈表實現(xiàn)的,是無界限的。
ArrayBlockingQueue原理和數(shù)據(jù)結(jié)構(gòu)
ArrayBlockingQueue的數(shù)據(jù)結(jié)構(gòu),如下圖所示:
說明:
1. ArrayBlockingQueue繼承于AbstractQueue,并且它實現(xiàn)了BlockingQueue接口。
2. ArrayBlockingQueue內(nèi)部是通過Object[]數(shù)組保存數(shù)據(jù)的,也就是說ArrayBlockingQueue本質(zhì)上是通過數(shù)組實現(xiàn)的。ArrayBlockingQueue的大小,即數(shù)組的容量是創(chuàng)建ArrayBlockingQueue時指定的。
3. ArrayBlockingQueue與ReentrantLock是組合關系,ArrayBlockingQueue中包含一個ReentrantLock對象(lock)。
ReentrantLock是可重入的互斥鎖,ArrayBlockingQueue就是根據(jù)該互斥鎖實現(xiàn)“多線程對競爭資源的互斥訪問”。而且,ReentrantLock分為公平鎖和非公平鎖,關于具體使用公平鎖還是非公平鎖,在創(chuàng)建ArrayBlockingQueue時可以指定;而且,ArrayBlockingQueue默認會使用非公平鎖。
4. ArrayBlockingQueue與Condition是組合關系,ArrayBlockingQueue中包含兩個Condition對象(notEmpty和notFull)。而且,Condition又依賴于ArrayBlockingQueue而存在,通過Condition可以實現(xiàn)對ArrayBlockingQueue的更精確的訪問 -- (01)若某線程(線程A)要取數(shù)據(jù)時,數(shù)組正好為空,則該線程會執(zhí)行notEmpty.await()進行等待;當其它某個線程(線程B)向數(shù)組中插入了數(shù)據(jù)之后,會調(diào)用notEmpty.signal()喚醒“notEmpty上的等待線程”。此時,線程A會被喚醒從而得以繼續(xù)運行。(02)若某線程(線程H)要插入數(shù)據(jù)時,數(shù)組已滿,則該線程會它執(zhí)行notFull.await()進行等待;當其它某個線程(線程I)取出數(shù)據(jù)之后,會調(diào)用notFull.signal()喚醒“notFull上的等待線程”。此時,線程H就會被喚醒從而得以繼續(xù)運行。
ArrayBlockingQueue函數(shù)列表
// 創(chuàng)建一個帶有給定的(固定)容量和默認訪問策略的 ArrayBlockingQueue。 ArrayBlockingQueue(int capacity) // 創(chuàng)建一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue。 ArrayBlockingQueue(int capacity, boolean fair) // 創(chuàng)建一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue,它最初包含給定 collection 的元素,并以 collection 迭代器的遍歷順序添加元素。 ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) // 將指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量),在成功時返回 true,如果此隊列已滿,則拋出 IllegalStateException。 boolean add(E e) // 自動移除此隊列中的所有元素。 void clear() // 如果此隊列包含指定的元素,則返回 true。 boolean contains(Object o) // 移除此隊列中所有可用的元素,并將它們添加到給定 collection 中。 int drainTo(Collection<? super E> c) // 最多從此隊列中移除給定數(shù)量的可用元素,并將這些元素添加到給定 collection 中。 int drainTo(Collection<? super E> c, int maxElements) // 返回在此隊列中的元素上按適當順序進行迭代的迭代器。 Iterator<E> iterator() // 將指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量),在成功時返回 true,如果此隊列已滿,則返回 false。 boolean offer(E e) // 將指定的元素插入此隊列的尾部,如果該隊列已滿,則在到達指定的等待時間之前等待可用的空間。 boolean offer(E e, long timeout, TimeUnit unit) // 獲取但不移除此隊列的頭;如果此隊列為空,則返回 null。 E peek() // 獲取并移除此隊列的頭,如果此隊列為空,則返回 null。 E poll() // 獲取并移除此隊列的頭部,在指定的等待時間前等待可用的元素(如果有必要)。 E poll(long timeout, TimeUnit unit) // 將指定的元素插入此隊列的尾部,如果該隊列已滿,則等待可用的空間。 void put(E e) // 返回在無阻塞的理想情況下(不存在內(nèi)存或資源約束)此隊列能接受的其他元素數(shù)量。 int remainingCapacity() // 從此隊列中移除指定元素的單個實例(如果存在)。 boolean remove(Object o) // 返回此隊列中元素的數(shù)量。 int size() // 獲取并移除此隊列的頭部,在元素變得可用之前一直等待(如果有必要)。 E take() // 返回一個按適當順序包含此隊列中所有元素的數(shù)組。 Object[] toArray() // 返回一個按適當順序包含此隊列中所有元素的數(shù)組;返回數(shù)組的運行時類型是指定數(shù)組的運行時類型。 <T> T[] toArray(T[] a) // 返回此 collection 的字符串表示形式。 String toString()
下面從ArrayBlockingQueue的創(chuàng)建,添加,取出,遍歷這幾個方面對ArrayBlockingQueue進行分析。
1. 創(chuàng)建
下面以ArrayBlockingQueue(int capacity, boolean fair)來進行說明。
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
說明:
(01) items是保存“阻塞隊列”數(shù)據(jù)的數(shù)組。它的定義如下:
final Object[] items;
(02) fair是“可重入的獨占鎖(ReentrantLock)”的類型。fair為true,表示是公平鎖;fair為false,表示是非公平鎖。
notEmpty和notFull是鎖的兩個Condition條件。它們的定義如下:
final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull;
Lock的作用是提供獨占鎖機制,來保護競爭資源;而Condition是為了更加精細的對鎖進行控制,它依賴于Lock,通過某個條件對多線程進行控制。
notEmpty表示“鎖的非空條件”。當某線程想從隊列中取數(shù)據(jù)時,而此時又沒有數(shù)據(jù),則該線程通過notEmpty.await()進行等待;當其它線程向隊列中插入了元素之后,就調(diào)用notEmpty.signal()喚醒“之前通過notEmpty.await()進入等待狀態(tài)的線程”。
同理,notFull表示“鎖的滿條件”。當某線程想向隊列中插入元素,而此時隊列已滿時,該線程等待;當其它線程從隊列中取出元素之后,就喚醒該等待的線程。
2. 添加
下面以offer(E e)為例,對ArrayBlockingQueue的添加方法進行說明。
public boolean offer(E e) { // 創(chuàng)建插入的元素是否為null,是的話拋出NullPointerException異常 checkNotNull(e); // 獲取“該阻塞隊列的獨占鎖” final ReentrantLock lock = this.lock; lock.lock(); try { // 如果隊列已滿,則返回false。 if (count == items.length) return false; else { // 如果隊列未滿,則插入e,并返回true。 insert(e); return true; } } finally { // 釋放鎖 lock.unlock(); } }
說明:offer(E e)的作用是將e插入阻塞隊列的尾部。如果隊列已滿,則返回false,表示插入失??;否則,插入元素,并返回true。
(01) count表示”隊列中的元素個數(shù)“。除此之外,隊列中還有另外兩個遍歷takeIndex和putIndex。takeIndex表示下一個被取出元素的索引,putIndex表示下一個被添加元素的索引。它們的定義如下:
// 隊列中的元素個數(shù) int takeIndex; // 下一個被取出元素的索引 int putIndex; // 下一個被添加元素的索引 int count;
(02) insert()的源碼如下:
private void insert(E x) { // 將x添加到”隊列“中 items[putIndex] = x; // 設置”下一個被取出元素的索引“ putIndex = inc(putIndex); // 將”隊列中的元素個數(shù)”+1 ++count; // 喚醒notEmpty上的等待線程 notEmpty.signal(); }
insert()在插入元素之后,會喚醒notEmpty上面的等待線程。
inc()的源碼如下:
final int inc(int i) { return (++i == items.length) ? 0 : i; }
若i+1的值等于“隊列的長度”,即添加元素之后,隊列滿;則設置“下一個被添加元素的索引”為0。
3. 取出
下面以take()為例,對ArrayBlockingQueue的取出方法進行說明。
public E take() throws InterruptedException { // 獲取“隊列的獨占鎖” final ReentrantLock lock = this.lock; // 獲取“鎖”,若當前線程是中斷狀態(tài),則拋出InterruptedException異常 lock.lockInterruptibly(); try { // 若“隊列為空”,則一直等待。 while (count == 0) notEmpty.await(); // 取出元素 return extract(); } finally { // 釋放“鎖” lock.unlock(); } }
說明:take()的作用是取出并返回隊列的頭。若隊列為空,則一直等待。
extract()的源碼如下:
private E extract() { final Object[] items = this.items; // 強制將元素轉(zhuǎn)換為“泛型E” E x = this.<E>cast(items[takeIndex]); // 將第takeIndex元素設為null,即刪除。同時,幫助GC回收。 items[takeIndex] = null; // 設置“下一個被取出元素的索引” takeIndex = inc(takeIndex); // 將“隊列中元素數(shù)量”-1 --count; // 喚醒notFull上的等待線程。 notFull.signal(); return x; }
說明:extract()在刪除元素之后,會喚醒notFull上的等待線程。
4. 遍歷
下面對ArrayBlockingQueue的遍歷方法進行說明。
public Iterator<E> iterator() { return new Itr(); }
Itr是實現(xiàn)了Iterator接口的類,它的源碼如下:
private class Itr implements Iterator<E> { // 隊列中剩余元素的個數(shù) private int remaining; // Number of elements yet to be returned // 下一次調(diào)用next()返回的元素的索引 private int nextIndex; // Index of element to be returned by next // 下一次調(diào)用next()返回的元素 private E nextItem; // Element to be returned by next call to next // 上一次調(diào)用next()返回的元素 private E lastItem; // Element returned by last call to next // 上一次調(diào)用next()返回的元素的索引 private int lastRet; // Index of last element returned, or -1 if none Itr() { // 獲取“阻塞隊列”的鎖 final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { lastRet = -1; if ((remaining = count) > 0) nextItem = itemAt(nextIndex = takeIndex); } finally { // 釋放“鎖” lock.unlock(); } } public boolean hasNext() { return remaining > 0; } public E next() { // 獲取“阻塞隊列”的鎖 final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { // 若“剩余元素<=0”,則拋出異常。 if (remaining <= 0) throw new NoSuchElementException(); lastRet = nextIndex; // 獲取第nextIndex位置的元素 E x = itemAt(nextIndex); // check for fresher value if (x == null) { x = nextItem; // we are forced to report old value lastItem = null; // but ensure remove fails } else lastItem = x; while (--remaining > 0 && // skip over nulls (nextItem = itemAt(nextIndex = inc(nextIndex))) == null) ; return x; } finally { lock.unlock(); } } public void remove() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { int i = lastRet; if (i == -1) throw new IllegalStateException(); lastRet = -1; E x = lastItem; lastItem = null; // only remove if item still at index if (x != null && x == items[i]) { boolean removingHead = (i == takeIndex); removeAt(i); if (!removingHead) nextIndex = dec(nextIndex); } } finally { lock.unlock(); } } }
ArrayBlockingQueue示例
import java.util.*; import java.util.concurrent.*; /* * ArrayBlockingQueue是“線程安全”的隊列,而LinkedList是非線程安全的。 * * 下面是“多個線程同時操作并且遍歷queue”的示例 * (01) 當queue是ArrayBlockingQueue對象時,程序能正常運行。 * (02) 當queue是LinkedList對象時,程序會產(chǎn)生ConcurrentModificationException異常。 * * */ public class ArrayBlockingQueueDemo1{ // TODO: queue是LinkedList對象時,程序會出錯。 //private static Queue<String> queue = new LinkedList<String>(); private static Queue<String> queue = new ArrayBlockingQueue<String>(20); public static void main(String[] args) { // 同時啟動兩個線程對queue進行操作! new MyThread("ta").start(); new MyThread("tb").start(); } private static void printAll() { String value; Iterator iter = queue.iterator(); while(iter.hasNext()) { value = (String)iter.next(); System.out.print(value+", "); } System.out.println(); } private static class MyThread extends Thread { MyThread(String name) { super(name); } @Override public void run() { int i = 0; while (i++ < 6) { // “線程名” + "-" + "序號" String val = Thread.currentThread().getName()+i; queue.add(val); // 通過“Iterator”遍歷queue。 printAll(); } } } }
(某一次)運行結(jié)果:
ta1, ta1, tb1, ta1, tb1, ta1, ta2, tb1, ta1, ta2, tb1, tb2, ta2, ta1, tb2, tb1, ta3, ta2, ta1, tb2, tb1, ta3, ta2, tb3, tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, tb3, ta1, ta4, tb1, tb4, ta2, ta5, tb2, tb5, ta3, ta6, tb3, ta4, tb4, ta5, tb5, ta6, tb6,
結(jié)果說明:如果將源碼中的queue改成LinkedList對象時,程序會產(chǎn)生ConcurrentModificationException異常。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。