您好,登錄后才能下訂單哦!
[TOC]
SynchronousQueue 是一個很奇怪的隊列,感覺都不能叫隊列,因為內(nèi)部沒有數(shù)據(jù)的存儲空間,隊列不能peek,因為不存在元素,任何入隊的線程都會阻塞,直到有線程來出隊,也就是這個隊列是一組操作,入隊和出隊要一起離開,出隊也是一樣,必須等入隊,必須結(jié)伴而行;隊列支持公平和非公平的模式(指的是隊列匹配線程的順序),公平模式的數(shù)據(jù)結(jié)構(gòu)是隊列(FIFO),非公平模式使用的是棧(LIFO)。
abstract static class Transferer<E> {
// 出隊入隊都是這一個方法
abstract E transfer(E e, boolean timed, long nanos);
}
// npu數(shù)
static final int NCPUS = Runtime.getRuntime().availableProcessors();
// 帶超時時間的自旋次數(shù)
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
// 沒有超時的自旋次數(shù)
static final int maxUntimedSpins = maxTimedSpins * 16;
TransferStack 繼承 Transferer
注意:這幾個狀態(tài)很重要,因為繼承了Transferer,所以出隊和入隊都是使用的transfer方法,狀態(tài)是用來區(qū)分的,后面方法部分會詳解
/** 0表示消費者 */
static final int REQUEST = 0;
/** 1表示數(shù)據(jù)的生產(chǎn)者 */
static final int DATA = 1;
/** 2 表示數(shù)據(jù)正在匹配 */
static final int FULFILLING = 2;
static final class SNode {
volatile SNode next; // 下一個節(jié)點
volatile SNode match; // 匹配的節(jié)點
volatile Thread waiter; // 等待的線程
Object item; // 數(shù)據(jù)
int mode; // 模式 0 , 1 , 2
}
/** 頭結(jié)點 */
volatile SNode head;
TransferQueue 繼承 Transferer
static final class QNode {
volatile QNode next; // next 節(jié)點
volatile Object item; // 數(shù)據(jù)項
volatile Thread waiter; // 等待線程
final boolean isData; // 區(qū)分生產(chǎn)和消費
}
/** 頭結(jié)點 */
transient volatile QNode head;
/** 尾節(jié)點 */
transient volatile QNode tail;
public SynchronousQueue() {
this(false);
}
// 構(gòu)造方法,fair表示公平或者非公平
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
public boolean offer(E e) {
// e 不能為null
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// e 不能為null
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
public void put(E e) throws InterruptedException {
// e 不能為null
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
// 支持中斷
Thread.interrupted();
throw new InterruptedException();
}
public E poll() {
return transferer.transfer(null, true, 0);
}
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
// 根據(jù)所傳元素判斷為生產(chǎn)or消費
int mode = (e == null) ? REQUEST : DATA;
for (;;) { // 無限循環(huán)
SNode h = head; // 獲取頭結(jié)點
if (h == null || h.mode == mode) { // 頭結(jié)點為空或者當前節(jié)點狀態(tài)(0,1,2)和頭結(jié)點相同
if (timed && nanos <= 0) { // can't wait 設(shè)置有時間
// 節(jié)點不為null并且為取消狀態(tài)
if (h != null && h.isCancelled())
// 彈出取消的節(jié)點
casHead(h, h.next); // pop cancelled node
else
// 超時直接返回null
return null;
// 沒有設(shè)置超時
} else if (casHead(h, s = snode(s, e, h, mode))) { // 將h設(shè)為自己的next節(jié)點
// 空旋或者阻塞直到s結(jié)點被FulFill操作所匹配
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled 節(jié)點被取消了
clean(s);
return null;
}
// 找到匹配的線程了
// h == head 可能已經(jīng)已經(jīng)被匹配
// h.next 等于s 不同類型
if ((h = head) != null && h.next == s)
// 彈出h 和 s
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
// 未匹配
} else if (!isFulfilling(h.mode)) { // try to fulfill // 嘗試匹配節(jié)點
if (h.isCancelled()) // already cancelled // 節(jié)點被取消
casHead(h, h.next); // pop and retry // 修改頭結(jié)點
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
// 沒有下一個節(jié)點了,結(jié)束這次循環(huán),走最外層循環(huán)重新開始
if (m == null) { // all waiters are gone // m等于null
casHead(s, null); // pop fulfill node // cas 設(shè)置head
s = null; // use new node next time
break; // restart main loop // 結(jié)束循環(huán)
}
SNode mn = m.next;
if (m.tryMatch(s)) { // 嘗試匹配,成功
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match // 失敗,說明m背的線程匹配了,或者取消了
s.casNext(m, mn); // help unlink // 修改next節(jié)點
}
}
} else { // help a fulfiller 正在匹配
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone 匹配完成了
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 計算時間
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 獲取當前線程
Thread w = Thread.currentThread();
// shouldSpin控制自旋
// shouldSpin 用于檢測當前節(jié)點是否需要自旋
// 如果棧為空、該節(jié)點是首節(jié)點或者該節(jié)點是匹配節(jié)點,則先采用自旋,否則阻塞
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) { // 死循環(huán)
if (w.isInterrupted()) // 線程被中斷
s.tryCancel();
SNode m = s.match;
if (m != null) // 存在匹配節(jié)點 ,返回
return m;
if (timed) { // 存在超時設(shè)置
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
// 自旋;每次自旋的時候都需要檢查自身是否滿足自旋條件,滿足就 - 1,否則為0
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
// 設(shè)置node的線程
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
// 如果不是超時,就阻塞
else if (!timed)
LockSupport.park(this);
// 設(shè)置超時阻塞
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
void clean(SNode s) {
s.item = null; // forget item
s.waiter = null; // forget thread
// next節(jié)點
SNode past = s.next;
// next節(jié)點也被中斷了,直接刪除
if (past != null && past.isCancelled())
past = past.next;
// Absorb cancelled nodes at head
// 從棧頂開始找,清除取消的節(jié)點
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);
// Unsplice embedded nodes
// 從有效的頭節(jié)點開始p ,到s的后繼節(jié)點,繼續(xù)清除
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
分析transfer 方法:
我們可以發(fā)現(xiàn)transfer 是通過e是空來判斷是offer方法還是poll方法的,也就是入隊者和出對者的區(qū)分。
第一種情況,如果隊列為空head,或者隊列存在的head節(jié)點和自己的模式相同,首先判斷有沒有超時或者取消,有就執(zhí)行這些操作,沒有就執(zhí)行入隊操作,然后把新加入的節(jié)點加入棧頂,然后調(diào)用awaitFulfill方法阻塞線程,直到被中斷,超時或者匹配成功,為什么要阻塞了因為大家的模式都相同沒法匹配,所以只能阻塞線程,直到一個不同模式的線程匹配成功,喚醒自己,這也是awaitFulfill方法的結(jié)束流程。
第二種情況,如果入隊的模式不同,通過isFulfilling方法判斷head節(jié)點有沒有在匹配,沒有就執(zhí)行匹配流程,
首先判斷節(jié)點是否被取消了,沒有在判斷自己是不是唯一的一個節(jié)點,如果是循環(huán),重新開始流程,如果不是上面的這些情況,就可以開始匹配節(jié)點了,調(diào)用tryMatch方法,成功喚醒另一個節(jié)點,然后一起出棧,返回結(jié)果,失敗就向后推進,找下一個節(jié)點,這里可能別的線程會競爭的匹配。
入隊和出隊的方法是一樣的,我們主要看下transfer 方法吧。
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
// 判斷是生產(chǎn)者 還是消費者
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
// 沒有初始化
if (t == null || h == null) // saw uninitialized value
continue; // spin
// h==t 剛剛初始化 t.isData == isData,尾節(jié)點和當前節(jié)點的類型一樣
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next; // 獲取尾節(jié)點
if (t != tail) // 尾節(jié)點發(fā)生變了變化 // inconsistent read
continue;
if (tn != null) { // 重設(shè)為節(jié)點 // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // 超時了 // can't wait
return null;
if (s == null) // 構(gòu)建新節(jié)點
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in 有競爭
continue;
// 替換尾節(jié)點
advanceTail(t, s); // swing tail and wait
// 自旋/阻塞 返回的是中斷取消/匹配的節(jié)點
Object x = awaitFulfill(s, e, timed, nanos);
// 中斷
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
// 匹配成功了,需要執(zhí)行出隊操作
if (!s.isOffList()) {
// not already unlinked
// 修改頭結(jié)點
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // complementary-mode
// 出隊從頭元素開始
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head) // 隊列發(fā)生變化重來
continue; // inconsistent read
Object x = m.item;
// isData == (x != null) 判斷模式是否相同,不相同才能匹配
// x == m 說明已經(jīng)被中斷或者超時了
// m.casItem(x, e) 匹配
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) {// lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
// 匹配成功
// 替換頭
advanceHead(h, m); // successfully fulfilled
// 喚醒等待線程
LockSupport.unpark(m.waiter); // 喚醒線程
return (x != null) ? (E)x : e;
}
}
}
分析transfer方法:
SynchronousQueue 的實現(xiàn)還是很復(fù)雜的,我們可以發(fā)現(xiàn)雖然是個阻塞隊列,可是沒有使用鎖;這個隊列適合傳遞的場景,隊列沒有存儲元素的隊列,出隊和入隊必須結(jié)伴而行。
參考 《Java 并發(fā)編程的藝術(shù)》
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。