您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“JUC的LinkedTransferQueue怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“JUC的LinkedTransferQueue怎么使用”吧!
LinkedTransferQueue 在 jdk 1.7 被引入,是一個(gè)基于 Dual Queue 數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)的無界線程安全隊(duì)列,其作者 Doug Lea 描述 LinkedTransferQueue 從功能上來說是 ConcurrentLinkedQueue、SynchronousQueue(公平模式),以及 LinkedBlockingQueue 的超集,并且更加實(shí)用和高效。
下面的章節(jié)我們將一起來分析 LinkedTransferQueue 的設(shè)計(jì)與實(shí)現(xiàn),不過在開始之前還是需要先對(duì)兩個(gè)名詞做一下解釋,即匹配和松弛度。
我們?cè)谏弦黄榻B SynchronousQueue 隊(duì)列時(shí)已經(jīng)解釋了 匹配 的概念,這里再重復(fù)介紹一下。LinkedTransferQueue 在內(nèi)部基于隊(duì)列實(shí)現(xiàn)線程間的交互,以“生產(chǎn)者-消費(fèi)者”為例,當(dāng)生產(chǎn)者往 LinkedTransferQueue 中插入一個(gè)元素時(shí),通常情況下該生產(chǎn)者線程在插入成功之后并不會(huì)立即返回,而是等待消費(fèi)者前來消費(fèi)。當(dāng)消費(fèi)者執(zhí)行消費(fèi)時(shí)發(fā)現(xiàn)隊(duì)列上正好有生產(chǎn)者在等待,于是執(zhí)行消費(fèi)邏輯,也稱為開始執(zhí)行匹配進(jìn)程,將當(dāng)前消費(fèi)者與生產(chǎn)者匹配成一對(duì)兒紛紛出隊(duì)列。
匹配描述的是 Dual Queue 的運(yùn)行機(jī)制,而 松弛度(slack) 則是一種優(yōu)化策略。為了避免頻繁移動(dòng)隊(duì)列的 head 和 tail 指針,作者引入了松弛度的概念,以度量 head 結(jié)點(diǎn)(或 tail 結(jié)點(diǎn))與最近一個(gè)未匹配結(jié)點(diǎn)之間的距離。當(dāng)一個(gè)結(jié)點(diǎn)被匹配(或取消,或插入)時(shí),LinkedTransferQueue 并不會(huì)立即更新相應(yīng)的 head 或 tail 指針,而是當(dāng)松弛度大于指定閾值時(shí)才觸發(fā)更新。這個(gè)閾值的取值范圍一般設(shè)置在 1 到 3 之間,如果太大會(huì)降低有效結(jié)點(diǎn)命中率,增加遍歷的長(zhǎng)度,太小則會(huì)增加 CAS 的競(jìng)爭(zhēng)和開銷。
TransferQueue 接口在 JDK 1.7 被引入,用于描述一種全新的阻塞隊(duì)列。LinkedTransferQueue 實(shí)現(xiàn)自 TransferQueue 接口,并且是目前(JDK 1.8)該接口的唯一實(shí)現(xiàn)類。TransferQueue 接口繼承自 BlockingQueue 接口,由 BlockingQueue 描述的阻塞隊(duì)列在隊(duì)列為空或者已滿時(shí),相應(yīng)的出隊(duì)列線程或入隊(duì)列線程會(huì)阻塞等待,而 TransferQueue 則更進(jìn)一步。以入隊(duì)列操作為例,當(dāng)線程成功將元素添加到由 TransferQueue 描述的阻塞隊(duì)列中后,該線程通常會(huì)一直阻塞直到某個(gè)出隊(duì)列線程從隊(duì)列中取走該入隊(duì)列線程添加的元素。
TransferQueue 在 BlockingQueue 接口的基礎(chǔ)上增加了以下方法:
public interface TransferQueue<E> extends BlockingQueue<E> { void transfer(E e) throws InterruptedException; boolean tryTransfer(E e); boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException; boolean hasWaitingConsumer(); int getWaitingConsumerCount(); }
針對(duì)各方法的含義說明如下:
transfer
:生產(chǎn)者將元素直接傳遞給正在等待的消費(fèi)者,而不執(zhí)行入隊(duì)列操作,如果沒有正在等待的消費(fèi)者則無限期等待,期間支持響應(yīng)中斷。
tryTransfer
:生產(chǎn)者將元素直接傳遞給正在等待的消費(fèi)者,而不執(zhí)行入隊(duì)列操作,如果沒有正在等待的消費(fèi)者則返回 false,提供相應(yīng)的超時(shí)版本。
hasWaitingConsumer
:檢查是否存在正在等待的消費(fèi)者。
getWaitingConsumerCount
:返回當(dāng)前正在等待的消費(fèi)者數(shù)目(近似值)。
由上述接口方法釋義我們可以了解到,TransferQueue 系的隊(duì)列支持在兩個(gè)線程之間直接交換數(shù)據(jù),而無需先將數(shù)據(jù)落地存儲(chǔ)到隊(duì)列中,如果確實(shí)需要落地,則線程可以隨數(shù)據(jù)一起在隊(duì)列上等待。
LinkedTransferQueue 針對(duì) BlockingQueue 和 TransferQueue 接口中聲明的方法,在實(shí)現(xiàn)上均委托給 LinkedTransferQueue#xfer
方法執(zhí)行,該方法也是本小節(jié)將要重點(diǎn)分析的方法。
在開始分析 LinkedTransferQueue#xfer
方法的實(shí)現(xiàn)之前,我們先介紹一下 LinkedTransferQueue 的基本字段定義。LinkedTransferQueue 基于 Dual Queue 作為底層存儲(chǔ)結(jié)構(gòu),并定義了 Node 類描述 Dual Queue 上的結(jié)點(diǎn),字段 LinkedTransferQueue#head
和 LinkedTransferQueue#tail
分別指向底層隊(duì)列的頭結(jié)點(diǎn)和尾結(jié)點(diǎn)。
Node 類的字段定義如下:
static final class Node { /** 標(biāo)識(shí)當(dāng)前結(jié)點(diǎn)是一個(gè)數(shù)據(jù)結(jié)點(diǎn),還是一個(gè)請(qǐng)求結(jié)點(diǎn) */ final boolean isData; // false if this is a request node /** * 存放數(shù)據(jù),并標(biāo)識(shí)匹配狀態(tài): * - 對(duì)于請(qǐng)求結(jié)點(diǎn)初始為 null,匹配之后指向自己 * - 對(duì)于數(shù)據(jù)結(jié)點(diǎn)初始為 data,匹配之后為 null */ volatile Object item; // initially non-null if isData; CASed to match /** 后繼指針 */ volatile Node next; /** 記錄在當(dāng)前結(jié)點(diǎn)上等待的線程對(duì)象 */ volatile Thread waiter; // null until waiting // ... 省略方法定義 }
LinkedTransferQueue 中的結(jié)點(diǎn)分為 數(shù)據(jù)結(jié)點(diǎn) 和 請(qǐng)求結(jié)點(diǎn) 兩類,可以簡(jiǎn)單將數(shù)據(jù)結(jié)點(diǎn)理解為生產(chǎn)者結(jié)點(diǎn),將請(qǐng)求結(jié)點(diǎn)理解為消費(fèi)者結(jié)點(diǎn)。Node 類通過 Node#isData
字段標(biāo)記一個(gè)結(jié)點(diǎn)是數(shù)據(jù)結(jié)點(diǎn)還是請(qǐng)求結(jié)點(diǎn),并通過 Node#item
字段承載數(shù)據(jù)和標(biāo)識(shí)對(duì)應(yīng)結(jié)點(diǎn)的匹配狀態(tài)。下表展示了數(shù)據(jù)結(jié)點(diǎn)和請(qǐng)求結(jié)點(diǎn)在匹配前后,字段 Node#item
的變化:
結(jié)點(diǎn)類型 | 數(shù)據(jù)結(jié)點(diǎn) | 請(qǐng)求結(jié)點(diǎn) |
---|---|---|
匹配前 | isData = true; item != null | isData = false; item = null |
匹配后 | isData = true; item = null | isData = false; item = this |
注意:當(dāng)一個(gè)結(jié)點(diǎn)被取消后,該結(jié)點(diǎn)的 Node#item
字段同樣指向結(jié)點(diǎn)自己。
由上述表格我們可以設(shè)計(jì)一個(gè)判斷結(jié)點(diǎn)是否已經(jīng)匹配的方法,如下:
// Node#isMatched final boolean isMatched() { Object x = item; return (x == this) || ((x == null) == isData); }
如果一個(gè)結(jié)點(diǎn)的 item 字段指向自己(即 x == this
),說明該結(jié)點(diǎn)被取消,或者對(duì)于請(qǐng)求結(jié)點(diǎn)而言,該結(jié)點(diǎn)已經(jīng)被匹配,否則我們就可以繼續(xù)執(zhí)行 (x == null) == isData
進(jìn)行判斷,具體如下:
如果當(dāng)前結(jié)點(diǎn)是數(shù)據(jù)結(jié)點(diǎn)(即 isData = true
),如果該結(jié)點(diǎn)被匹配則結(jié)點(diǎn)的 item 應(yīng)該為 null,所以滿足 (x == null) == isData
。
如果當(dāng)前結(jié)點(diǎn)是請(qǐng)求結(jié)點(diǎn)(即 isData = false
),如果該結(jié)點(diǎn)被匹配則結(jié)點(diǎn)的 item 應(yīng)該不為 null,所以滿足 (x == null) == isData
。
接下來我們開始分析 LinkedTransferQueue#xfer
方法的實(shí)現(xiàn),首先來看一下方法的參數(shù)定義,如下:
private E xfer(E e, boolean haveData, int how, long nanos) { // ... 省略方法實(shí)現(xiàn) }
其中參數(shù) e 表示待添加的元素值,如果是出隊(duì)列操作,則為 null;參數(shù) haveData 用于指定當(dāng)前是入隊(duì)列操作還是出隊(duì)列操作,如果是入隊(duì)列則 haveData 為 true,否則為 false;參數(shù) how 對(duì)應(yīng)當(dāng)前的操作模式,分為:NOW、ASYNC、SYNC,以及 TIMED,如果是 TIMED 模式,則參數(shù) nanos 用于指定當(dāng)前等待的納秒值。
下面進(jìn)一步介紹一下 how 參數(shù),我們知道 LinkedTransferQueue 的隊(duì)列操作方法基本上都是直接委托給 LinkedTransferQueue#xfer
方法執(zhí)行,而參數(shù) how 則用于控制在不同調(diào)用場(chǎng)景下該方法的運(yùn)行邏輯。LinkedTransferQueue 定義了 4 個(gè) int 類型常量,分別表示不同的操作模式,如下:
private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer
針對(duì)各個(gè)模式的含義說明如下:
NOW :當(dāng)隊(duì)列中沒有匹配的結(jié)點(diǎn)時(shí)立即返回而不等待,例如當(dāng)生產(chǎn)者執(zhí)行入隊(duì)列操作時(shí),如果隊(duì)列中沒有正在等待的消費(fèi)者則立即返回。
ASYNC :當(dāng)隊(duì)列中沒有匹配的結(jié)點(diǎn)時(shí)將元素入隊(duì)列,但是當(dāng)前線程本身并不等待而是立即返回,主要用于入隊(duì)列操作。
SYNC :當(dāng)隊(duì)列中沒有匹配的結(jié)點(diǎn)時(shí)將元素入隊(duì)列,并且當(dāng)前線程會(huì)依附在對(duì)應(yīng)結(jié)點(diǎn)上無限期等待。
TIMED :當(dāng)隊(duì)列中沒有匹配的結(jié)點(diǎn)時(shí)將元素入隊(duì)列,并且當(dāng)前線程會(huì)依附在對(duì)應(yīng)結(jié)點(diǎn)上超時(shí)等待。
LinkedTransferQueue 實(shí)現(xiàn)的主要入隊(duì)列和出隊(duì)列方法在委托執(zhí)行 LinkedTransferQueue#xfer
方法時(shí)的參數(shù)值設(shè)置如下表:
方法 | e | haveData | how | nanos |
---|---|---|---|---|
LinkedTransferQueue#put | e | true | ASYNC | 0 |
LinkedTransferQueue#add | e | true | ASYNC | 0 |
LinkedTransferQueue#offer(E) | e | true | ASYNC | 0 |
LinkedTransferQueue#offer(E, long, TimeUnit) | e | true | ASYNC | 0 |
LinkedTransferQueue#take | null | false | SYNC | 0 |
LinkedTransferQueue#poll() | null | false | NOW | 0 |
LinkedTransferQueue#poll(long, TimeUnit) | null | false | TIMED | timeout |
LinkedTransferQueue#transfer | e | true | SYNC | 0 |
LinkedTransferQueue#tryTransfer(E) | e | true | NOW | 0 |
LinkedTransferQueue#tryTransfer(E, long, TimeUnit) | e | true | TIMED | timeout |
下面開始分析方法 LinkedTransferQueue#xfer
的實(shí)現(xiàn),如下:
private E xfer(E e, boolean haveData, int how, long nanos) { // 如果是入隊(duì)列操作,則不允許待添加元素值為 null if (haveData && (e == null)) { throw new NullPointerException(); } // the node to append, if needed Node s = null; retry: for (; ; ) { // restart on append race /* 1. Try to match an existing node */ // 從頭開始遍歷隊(duì)列,對(duì)第一個(gè)未匹配的結(jié)點(diǎn)執(zhí)行匹配操作 for (Node h = head, p = h; p != null; ) { // find & match first node boolean isData = p.isData; Object item = p.item; // 找到第一個(gè)未匹配且未被取消的結(jié)點(diǎn) if (item != p && (item != null) == isData) { // unmatched // 結(jié)點(diǎn)模式與本次操作模式一致,無法匹配,退出循環(huán)并進(jìn)入下一步 if (isData == haveData) { // can't match break; } // 模式互補(bǔ),執(zhí)行匹配操作,將匹配結(jié)點(diǎn) p 的 item 值修改為 e // 如果 item 為 null,則 e 為 data,如果 item 為 data,則 e 為 null if (p.casItem(item, e)) { // 匹配成功 // 如果當(dāng)前被匹配的結(jié)點(diǎn)不是 head 結(jié)點(diǎn),需要更新 head 指針,保證松弛度小于 2 for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton // 更新 head 為匹配結(jié)點(diǎn) p 的 next 結(jié)點(diǎn),如果 next 結(jié)點(diǎn)為 null 則更新為當(dāng)前匹配結(jié)點(diǎn) if (head == h && this.casHead(h, n == null ? q : n)) { // 將之前的 head 結(jié)點(diǎn)自引用,等待 GC h.forgetNext(); break; } // 如果松弛度(slack)小于 2,則退出循環(huán),否則繼續(xù)循環(huán)后移 head 指針 if ((h = head) == null || (q = h.next) == null || !q.isMatched()) { break; // unless slack < 2 } } // 喚醒在剛剛完成匹配結(jié)點(diǎn)上等待的線程 LockSupport.unpark(p.waiter); return cast(item); } } // 結(jié)點(diǎn)已被其它線程匹配,繼續(xù)往后遍歷尋找下一個(gè)可匹配結(jié)點(diǎn) Node n = p.next; p = (p != n) ? n : (h = head); // 如果 p 已經(jīng)脫離隊(duì)列,則從 head 開始尋找 } // end of for // 未找到可以匹配的結(jié)點(diǎn),將當(dāng)前結(jié)點(diǎn)添加到隊(duì)列末端 if (how != NOW) { // 上游函數(shù)不期望立即返回 if (s == null) { s = new Node(e, haveData); } /* 2. Try to append a new node */ // 將結(jié)點(diǎn) s 添加到隊(duì)列末端,如果成功則返回 s 的前驅(qū)結(jié)點(diǎn) Node pred = this.tryAppend(s, haveData); // 返回 null 說明結(jié)點(diǎn) s 入隊(duì)列失敗,重試 if (pred == null) { continue retry; // lost race vs opposite mode } // 阻塞(或自旋)等待匹配 if (how != ASYNC) { /* 3. Await match or cancellation */ return this.awaitMatch(s, pred, e, (how == TIMED), nanos); } } return e; // not waiting } }
由上述實(shí)現(xiàn)可以看出,整個(gè) LinkedTransferQueue#xfer
方法的執(zhí)行分為 3 個(gè)階段(已在代碼中標(biāo)出),針對(duì)各個(gè)階段的說明作者在文檔中已經(jīng)給出了概述,這里直接摘錄作者的原話:
Try to match an existing node;
Try to append a new node;
Await match or cancellation.
也就是說當(dāng)一個(gè)線程進(jìn)入 LinkedTransferQueue#xfer
方法時(shí),第 1 步會(huì)嘗試在隊(duì)列中尋找可以匹配的結(jié)點(diǎn),如果存在則執(zhí)行匹配操作;否則如果上游方法不期望立即返回(即不為 NOW 操作模式)則執(zhí)行第 2 步,將當(dāng)前元素添加到隊(duì)列中;如果上游方法允許當(dāng)前線程等待(即不為 ASYNC 操作模式),則進(jìn)入等待狀態(tài),也就是第 3 步。
下面我們分步驟對(duì)這 3 個(gè)階段逐一進(jìn)行分析,首先來看 步驟 1 ,作者對(duì)這一步的詳細(xì)概述摘錄如下:
Try to match an existing node
Starting at head, skip already-matched nodes until finding an unmatched node of opposite mode, if one exists, in which case matching it and returning, also if necessary updating head to one past the matched node (or the node itself if the list has no other unmatched nodes). If the CAS misses, then a loop retries advancing head by two steps until either success or the slack is at most two. By requiring that each attempt advances head by two (if applicable), we ensure that the slack does not grow without bound. Traversals also check if the initial head is now off-list, in which case they start at the new head.
If no candidates are found and the call was untimed poll/offer, (argument "how" is NOW) return.
這一步的核心邏輯在于從隊(duì)列中尋找可以匹配的結(jié)點(diǎn),并執(zhí)行匹配操作,具體執(zhí)行流程概括為:
從隊(duì)列頭部開始遍歷隊(duì)列,尋找第一個(gè)未被取消且未被匹配的結(jié)點(diǎn) p,如果存在則進(jìn)入匹配進(jìn)程;
校驗(yàn)結(jié)點(diǎn) p 的模式是否與當(dāng)前操作模式互補(bǔ),如果相同則無法匹配,需要轉(zhuǎn)而執(zhí)行步驟 2,將當(dāng)前結(jié)點(diǎn)添加到隊(duì)列末端;
否則,基于 CAS 修改結(jié)點(diǎn) p 的 item 值(如果是請(qǐng)求結(jié)點(diǎn),則更新 item 為元素值 e;如果是數(shù)據(jù)結(jié)點(diǎn),則更新 item 為 null),即執(zhí)行匹配操作;
如果匹配失敗,則說明存在其它線程先于完成了匹配操作,繼續(xù)往后尋找下一個(gè)可以匹配的結(jié)點(diǎn);
如果匹配成功,則嘗試后移 head 指針,保證 head 結(jié)點(diǎn)的松弛度小于 2,并喚醒在匹配結(jié)點(diǎn)上阻塞的線程,最后返回本次匹配結(jié)點(diǎn)的 item 值。
下面利用圖示演示上述執(zhí)行流程,其中黃色表示消費(fèi)者結(jié)點(diǎn),青色表示生產(chǎn)者結(jié)點(diǎn)(M 表示已匹配,U 表示未匹配),紅色表示當(dāng)前匹配結(jié)點(diǎn)。假設(shè)當(dāng)前操作是一個(gè)消費(fèi)者線程,則從隊(duì)列頭部開始往后尋找第一個(gè)未被取消且未被匹配的結(jié)點(diǎn),此時(shí)各指針的指向如下圖 1 所示。在執(zhí)行完幾輪循環(huán)之后,當(dāng)前線程在隊(duì)列上找到了第一個(gè)可以匹配的結(jié)點(diǎn) p,如下圖 2 所示。然后執(zhí)行匹配操作,基于 CAS 嘗試將待匹配結(jié)點(diǎn) p 的 item 值修改為 null,如下圖 3 所示。
接下來線程會(huì)進(jìn)入最內(nèi)側(cè) for 循環(huán),嘗試后移 head 指針,以保證 head 結(jié)點(diǎn)的松弛度小于 2,如果期間正好有另外一個(gè)線程更新了 head 指針的指向,此時(shí)各指針的指向如上圖 4 所示。此時(shí) head 指針與 h 指針指向不同,所以繼續(xù)執(zhí)行最內(nèi)側(cè) for 循環(huán)的第二個(gè) if 判斷,執(zhí)行完后各個(gè)指針的指向如上圖 5 所示。此時(shí)因?yàn)橹羔?q 所指向的結(jié)點(diǎn)已經(jīng)完成匹配,所以繼續(xù)進(jìn)入下一輪最內(nèi)側(cè) for 循環(huán),此時(shí)滿足最內(nèi)側(cè) for 循環(huán)的第一個(gè) if 判斷,基于 CAS 更新 head 指針,并將之前 head 結(jié)點(diǎn)的 next 指針指向自己(自引用),等待 GC 回收,如上圖 6 所示。最后喚醒在本次匹配結(jié)點(diǎn)上等待的線程,并返回。
如果上述步驟沒有找到可以匹配的結(jié)點(diǎn),則嘗試為當(dāng)前元素構(gòu)造一個(gè)新的結(jié)點(diǎn)并插入到隊(duì)列中,即執(zhí)行 步驟 2 ,作者對(duì)這一步的詳細(xì)概述摘錄如下:
Try to append a new node
Starting at current tail pointer, find the actual last node and try to append a new node (or if head was null, establish the first node). Nodes can be appended only if their predecessors are either already matched or are of the same mode. If we detect otherwise, then a new node with opposite mode must have been appended during traversal, so we must restart at phase 1. The traversal and update steps are otherwise similar to phase 1: Retrying upon CAS misses and checking for staleness. In particular, if a self-link is encountered, then we can safely jump to a node on the list by continuing the traversal at current head.
On successful append, if the call was ASYNC, return.
如果當(dāng)前操作模式為 NOW,則說明上游方法要求當(dāng)隊(duì)列中不存在可以匹配的結(jié)點(diǎn)時(shí)立即返回,則不執(zhí)行本步驟,否則執(zhí)行 LinkedTransferQueue#tryAppend
方法嘗試將當(dāng)前結(jié)點(diǎn) s 入隊(duì)列。該方法在執(zhí)行失敗的情況下會(huì)返回 null,否則返回新添加結(jié)點(diǎn) s 的前驅(qū)結(jié)點(diǎn),如果沒有前驅(qū)結(jié)點(diǎn)則返回結(jié)點(diǎn) s 自己。
方法 LinkedTransferQueue#tryAppend
的實(shí)現(xiàn)如下:
private Node tryAppend(Node s, boolean haveData) { // 嘗試將結(jié)點(diǎn) s 入隊(duì)列 for (Node t = tail, p = t; ; ) { // move p to last node and append Node n, u; // temps for reads of next & tail // 當(dāng)前隊(duì)列為空 if (p == null && (p = head) == null) { // 1 // 直接將結(jié)點(diǎn) s 設(shè)置為 head,并返回 s 結(jié)點(diǎn) if (this.casHead(null, s)) { return s; // initialize } } // 結(jié)點(diǎn) s 不能作為結(jié)點(diǎn) p 的后繼結(jié)點(diǎn),因?yàn)?nbsp;p 和 s 的模式互補(bǔ),且 p 未匹配 else if (p.cannotPrecede(haveData)) { // 2 return null; // lost race vs opposite mode } // p 已經(jīng)不是最新的尾結(jié)點(diǎn),更新 else if ((n = p.next) != null) { // 3 // not last; keep traversing p = p != t && t != (u = tail) ? (t = u) // stale tail : (p != n) ? n : null; // restart if off list } // 結(jié)點(diǎn) s 入隊(duì)列失敗,說明 p 未指向最新的尾結(jié)點(diǎn) else if (!p.casNext(null, s)) { // 4 p = p.next; // re-read on CAS failure } // 將結(jié)點(diǎn) s 入隊(duì)列成功,后移 tail 指針,保證松弛度小于 2 else { // 5 if (p != t) { // update if slack now >= 2 while ((tail != t || !this.casTail(t, s)) // 后移 tail 指針 && (t = tail) != null && (s = t.next) != null // advance and retry && (s = s.next) != null && s != t) { } } return p; } } }
這一步的核心邏輯在于將結(jié)點(diǎn) s 入隊(duì)列,并在 tail 結(jié)點(diǎn)松弛度較大時(shí)后移 tail 指針。具體執(zhí)行流程概括為:
如果隊(duì)列為空,則直接將結(jié)點(diǎn) s 入隊(duì)列,并返回結(jié)點(diǎn) s 對(duì)象;
否則,校驗(yàn)結(jié)點(diǎn) s 能否入隊(duì)列,如果前驅(qū)結(jié)點(diǎn)與結(jié)點(diǎn) s 模式互補(bǔ)且未匹配,則不能入隊(duì)列,此時(shí)直接返回 null 并退回步驟 1 開始執(zhí)行;
如果結(jié)點(diǎn) s 可以入隊(duì)列,則尋找隊(duì)列當(dāng)前真正的 tail 結(jié)點(diǎn),并將結(jié)點(diǎn) s 作為后繼結(jié)點(diǎn)入隊(duì)列;
如果入隊(duì)列失敗,則說明前驅(qū)結(jié)點(diǎn)不是最新的隊(duì)列 tail 結(jié)點(diǎn),繼續(xù)進(jìn)入下一輪循環(huán)重試;
如果入隊(duì)列成功,則判斷 tail 結(jié)點(diǎn)的松弛度是否較大,如果較大則后移 tail 指針,以降低 tail 結(jié)點(diǎn)的松弛度。
下面利用圖示演示上述執(zhí)行流程。假設(shè)當(dāng)前操作是一個(gè)生產(chǎn)者線程,期望向隊(duì)列插入一個(gè)元素值為 5 的結(jié)點(diǎn),并且隊(duì)列中存在的都是未匹配的生產(chǎn)者結(jié)點(diǎn),如下圖 1 所示。此時(shí)隊(duì)列不為空,且結(jié)點(diǎn) s 可以入隊(duì)列,此時(shí)各指針指向如下圖 2 所示。因?yàn)榻Y(jié)點(diǎn) p 的 next 結(jié)點(diǎn)不為 null,說明 p 未指向最新的 tail 結(jié)點(diǎn),需要后移 p、t 和 n 指針,直到 p 指向 tail 結(jié)點(diǎn),如下圖 3、4 和 5 所示。
接下來執(zhí)行代碼 4,基于 CAS 嘗試將 p 結(jié)點(diǎn)的 next 結(jié)點(diǎn)由 null 更新為 s,即將結(jié)點(diǎn) s 入隊(duì)列,如上圖 6 所示。如果入隊(duì)列成功,則繼續(xù)執(zhí)行代碼 5,后移 tail 指針,保證 tail 結(jié)點(diǎn)的松弛度小于 2,最后返回結(jié)點(diǎn) s 的前驅(qū)結(jié)點(diǎn),如上圖 7 和 8 所示。
最后來看 步驟 3 ,作者對(duì)這一步的詳細(xì)概述摘錄如下:
Await match or cancellation
Wait for another thread to match node; instead cancelling if the current thread was interrupted or the wait timed out. On multiprocessors, we use front-of-queue spinning: If a node appears to be the first unmatched node in the queue, it spins a bit before blocking. In either case, before blocking it tries to unsplice any nodes between the current "head" and the first unmatched node.
Front-of-queue spinning vastly improves performance of heavily contended queues. And so long as it is relatively brief and "quiet", spinning does not much impact performance of less-contended queues. During spins threads check their interrupt status and generate a thread-local random number to decide to occasionally perform a Thread.yield. While yield has underdefined specs, we assume that it might help, and will not hurt, in limiting impact of spinning on busy systems. We also use smaller (1/2) spins for nodes that are not known to be front but whose predecessors have not blocked -- these "chained" spins avoid artifacts of front-of-queue rules which otherwise lead to alternating nodes spinning vs blocking. Further, front threads that represent phase changes (from data to request node or vice versa) compared to their predecessors receive additional chained spins, reflecting longer paths typically required to unblock threads during phase changes.
如果當(dāng)前操作模式為 ASYNC,則說明上游方法要求線程在完成入隊(duì)列操作之后不阻塞等待,而是立即返回。對(duì)于其它操作模式(除 NOW 和 ASYNC 以外)則需要執(zhí)行 LinkedTransferQueue#awaitMatch
方法讓當(dāng)前線程依附在剛剛?cè)腙?duì)列的結(jié)點(diǎn)上等待。如果是 TIMED 操作模式,則執(zhí)行超時(shí)等待,否則執(zhí)行無限期等待,期間支持響應(yīng)中斷。
方法 LinkedTransferQueue#awaitMatch
實(shí)現(xiàn)如下:
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { // 如果設(shè)置超時(shí),則計(jì)算到期時(shí)間戳 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); int spins = -1; // initialized after first item and cancel checks ThreadLocalRandom randomYields = null; // bound if needed for (; ; ) { Object item = s.item; // 當(dāng)前結(jié)點(diǎn)已匹配 if (item != e) { // matched s.forgetContents(); // avoid garbage return cast(item); } // 線程被中斷,或者等待超時(shí),則取消 if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // 將結(jié)點(diǎn)的 item 指向結(jié)點(diǎn)自己,表示取消 // 移除結(jié)點(diǎn) s this.unsplice(pred, s); return e; } // 初始化自旋次數(shù) if (spins < 0) { // establish spins at/near front // 依據(jù)前驅(qū)結(jié)點(diǎn)的狀態(tài)計(jì)算當(dāng)前結(jié)點(diǎn)的自旋次數(shù) if ((spins = spinsFor(pred, s.isData)) > 0) { randomYields = ThreadLocalRandom.current(); } } // 在阻塞之前先自旋幾次 else if (spins > 0) { // spin --spins; if (randomYields.nextInt(CHAINED_SPINS) == 0) { // 隨機(jī)讓步 Thread.yield(); // occasionally yield } } // 將當(dāng)前線程對(duì)象綁定到 s 結(jié)點(diǎn)上 else if (s.waiter == null) { s.waiter = w; // request unpark then recheck } // 如果設(shè)置了超時(shí),則超時(shí)等待 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos > 0L) { LockSupport.parkNanos(this, nanos); } } // 如果未設(shè)置超時(shí),則無限期等待 else { LockSupport.park(this); } } }
可以看到在線程進(jìn)入阻塞狀態(tài)之前會(huì)先自旋幾次,這樣主要是為了提升 LinkedTransferQueue 在多核 CPU 上的性能,在入隊(duì)列和出隊(duì)列比較頻繁的場(chǎng)景下避免線程不必要的阻塞和喚醒操作。上述方法的實(shí)現(xiàn)與上一篇介紹 SynchronousQueue 中的 TransferStack#awaitFulfill
方法的執(zhí)行過程基本一致。
到此,相信大家對(duì)“JUC的LinkedTransferQueue怎么使用”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。