您好,登錄后才能下訂單哦!
[TOC]
ArrayBlockingQueue 是一個用數(shù)組實(shí)現(xiàn)的有界隊(duì)列;此隊(duì)列按照先進(jìn)先出(FIFO)的規(guī)則對元素進(jìn)行排序;默認(rèn)情況下不保證線程公平的訪問隊(duì)列,所謂公平訪問隊(duì)列是指阻塞的線程,可以按照阻塞的先后順序的訪問隊(duì)列,即先阻塞的線程先訪問隊(duì)列;非公平性是對先等待的線程是非公平的,當(dāng)隊(duì)列可用時,阻塞的線程都可以爭奪訪問隊(duì)列的資格,有可能先阻塞的線程最后才訪問;為了保證公平性,通常會降低吞吐量。
/** The queued items */
// 記錄數(shù)據(jù)的數(shù)組
final Object[] items;
/** items index for next take, poll, peek or remove */
// 索引用于 take,poll,peek,remove 等方法
int takeIndex;
/** items index for next put, offer, or add */
// 索引用于 put,offer,or add 等方法
int putIndex;
/** Number of elements in the queue */
// 總數(shù)
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
// 隊(duì)列的鎖
final ReentrantLock lock;
/** Condition for waiting takes */
// 用于讓線程等待,消費(fèi)時隊(duì)列為空
private final Condition notEmpty;
/** Condition for waiting puts */
// 用于讓線程等待,生產(chǎn)時隊(duì)列滿
private final Condition notFull;
我們看下兩個構(gòu)造,其實(shí)也就是一個,注意沒有無參構(gòu)造,初始化時必須要給出容量。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
// 初始化一個ArrayBlockingQueue
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
// 初始化一個數(shù)組
this.items = new Object[capacity];
// 初始化一個鎖
lock = new ReentrantLock(fair);
// 用來存放消費(fèi)者的阻塞線程
notEmpty = lock.newCondition();
// 用來存放生產(chǎn)者的線程
notFull = lock.newCondition();
}
可以看出add調(diào)用的是offer方法,詳情請看offer方法。
public boolean add(E e) {
// 調(diào)用父類的方法
return super.add(e);
}
// 父類 AbstractQueue 的add方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
注意:add 插入失敗會拋異常。
// offer加入元素
public boolean offer(E e) {
// 不能為null
checkNotNull(e);
// 獲取鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果數(shù)組滿了,返回false
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
// enqueue
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
// 獲取數(shù)組
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 喚醒消費(fèi)阻塞的隊(duì)列
notEmpty.signal();
}
注意:offer還有一個重載方法,帶有超時時間的插入,支持中斷offer(E e, long timeout, TimeUnit unit)。
public void put(E e) throws InterruptedException {
// 不能為null
checkNotNull(e);
// 獲取鎖
final ReentrantLock lock = this.lock;
// 支持中斷
lock.lockInterruptibly();
try {
// 等于數(shù)組的容量
while (count == items.length)
// 等待
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
注意:put和前面的offer要區(qū)別,offer方法隊(duì)列滿是返回false,put方法是讓線程等待,根據(jù)自己的場景用合適的方法。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
注意:poll也有一個重載方法,帶有超時和中斷poll(long timeout, TimeUnit unit)。
// 消費(fèi)
public E take() throws InterruptedException {
// 獲取鎖
final ReentrantLock lock = this.lock;
// 支持中斷
lock.lockInterruptibly();
try {
// 隊(duì)列為空
while (count == 0)
// 阻塞
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
注意:take和poll也是一對方法,poll隊(duì)列為空返回null,take是讓線程等待,直到喚醒。
// 獲取隊(duì)尾的元素 不刪除
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
// 統(tǒng)計(jì)個數(shù) size是準(zhǔn)確值
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
ArrayBlockingQueue 是有界的,所以我們在初始化是容量要設(shè)計(jì)好,因?yàn)樗遣豢梢詳U(kuò)容的,還有我覺得這個隊(duì)列適合一些穩(wěn)定并發(fā)量的系統(tǒng),如果并發(fā)量突然變大,導(dǎo)致隊(duì)列滿,會造成大量的線程等待,影響系統(tǒng)的響應(yīng);我們通過閱讀源碼也發(fā)現(xiàn)隊(duì)列的源碼是很輕量的,使用起來也很簡單,讓人很好理解;使用這個隊(duì)列一定要注意put,offer,take,poll這兩組方法,根據(jù)自己的業(yè)務(wù)場景選擇是直接返回(響應(yīng)速度快)還是阻塞線程。
參考:《Java 并發(fā)編程的藝術(shù)》
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。