您好,登錄后才能下訂單哦!
這篇文章主要介紹了Java并發(fā)數(shù)據(jù)結(jié)構(gòu)的基石是什么的相關(guān)知識(shí),內(nèi)容詳細(xì)易懂,操作簡(jiǎn)單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇Java并發(fā)數(shù)據(jù)結(jié)構(gòu)的基石是什么文章都會(huì)有所收獲,下面我們一起來看看吧。
Java 的線程阻塞和喚醒是通過 Unsafe 類的 park 和 unpark 方法做到的。
public class Unsafe {
...
public native void park(boolean isAbsolute, long time);
public native void unpark(Thread t);
...
}
這兩個(gè)方法都是 native 方法,它們本身是由 C 語(yǔ)言來實(shí)現(xiàn)的核心功能。park 的意思是停車,讓當(dāng)前運(yùn)行的線程 Thread.currentThread() 休眠,unpark 的意思是解除停車,喚醒指定線程。這兩個(gè)方法在底層是使用操作系統(tǒng)提供的信號(hào)量機(jī)制來實(shí)現(xiàn)的。具體實(shí)現(xiàn)過程要深究 C 代碼,這里暫時(shí)不去具體分析。park 方法的兩個(gè)參數(shù)用來控制休眠多長(zhǎng)時(shí)間,第一個(gè)參數(shù) isAbsolute 表示第二個(gè)參數(shù)是絕對(duì)時(shí)間還是相對(duì)時(shí)間,單位是毫秒。
線程從啟動(dòng)開始就會(huì)一直跑,除了操作系統(tǒng)的任務(wù)調(diào)度策略外,它只有在調(diào)用 park 的時(shí)候才會(huì)暫停運(yùn)行。鎖可以暫停線程的奧秘所在正是因?yàn)殒i在底層調(diào)用了 park 方法。
線程對(duì)象 Thread 里面有一個(gè)重要的屬性 parkBlocker,它保存當(dāng)前線程因?yàn)槭裁炊?park。就好比停車場(chǎng)上停了很多車,這些車主都是來參加一場(chǎng)拍賣會(huì)的,等拍下自己想要的物品后,就把車開走。那么這里的 parkBlocker 大約就是指這場(chǎng)「拍賣會(huì)」。它是一系列沖突線程的管理者協(xié)調(diào)者,哪個(gè)線程該休眠該喚醒都是由它來控制的。
class Thread {
...
volatile Object parkBlocker;
...
}
當(dāng)線程被 unpark 喚醒后,這個(gè)屬性會(huì)被置為 null。Unsafe.park 和 unpark 并不會(huì)幫我們?cè)O(shè)置 parkBlocker 屬性,負(fù)責(zé)管理這個(gè)屬性的工具類是 LockSupport,它對(duì) Unsafe 這兩個(gè)方法進(jìn)行了簡(jiǎn)單的包裝。
class LockSupport {
...
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, 0L);
setBlocker(t, null); // 醒來后置null
}
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
}
}
...
}
Java 的鎖數(shù)據(jù)結(jié)構(gòu)正是通過調(diào)用 LockSupport 來實(shí)現(xiàn)休眠與喚醒的。線程對(duì)象里面的 parkBlocker 字段的值就是下面我們要講的「排隊(duì)管理器」。
當(dāng)多個(gè)線程爭(zhēng)用同一把鎖時(shí),必須有排隊(duì)機(jī)制將那些沒能拿到鎖的線程串在一起。當(dāng)鎖釋放時(shí),鎖管理器就會(huì)挑選一個(gè)合適的線程來占有這個(gè)剛剛釋放的鎖。每一把鎖內(nèi)部都會(huì)有這樣一個(gè)隊(duì)列管理器,管理器里面會(huì)維護(hù)一個(gè)等待的線程隊(duì)列。ReentrantLock 里面的隊(duì)列管理器是 AbstractQueuedSynchronizer,它內(nèi)部的等待隊(duì)列是一個(gè)雙向列表結(jié)構(gòu),列表中的每個(gè)節(jié)點(diǎn)的結(jié)構(gòu)如下。
class AbstractQueuedSynchronizer {
volatile Node head; // 隊(duì)頭線程將優(yōu)先獲得鎖
volatile Node tail; // 搶鎖失敗的線程追加到隊(duì)尾
volatile int state; // 鎖計(jì)數(shù)
}
class Node {
Node prev;
Node next;
Thread thread; // 每個(gè)節(jié)點(diǎn)一個(gè)線程
// 下面這兩個(gè)特殊字段可以先不去理解
Node nextWaiter; // 請(qǐng)求的是共享鎖還是獨(dú)占鎖
int waitStatus; // 精細(xì)狀態(tài)描述字
}
加鎖不成功時(shí),當(dāng)前的線程就會(huì)把自己納入到等待鏈表的尾部,然后調(diào)用 LockSupport.park 將自己休眠。其它線程解鎖時(shí),會(huì)從鏈表的表頭取一個(gè)節(jié)點(diǎn),調(diào)用 LockSupport.unpark 喚醒它。
AbstractQueuedSynchronizer 類是一個(gè)抽象類,它是所有的鎖隊(duì)列管理器的父類,JDK 中的各種形式的鎖其內(nèi)部的隊(duì)列管理器都繼承了這個(gè)類,它是 Java 并發(fā)世界的核心基石。比如 ReentrantLock、ReadWriteLock、CountDownLatch、Semaphone、ThreadPoolExecutor 內(nèi)部的隊(duì)列管理器都是它的子類。這個(gè)抽象類暴露了一些抽象方法,每一種鎖都需要對(duì)這個(gè)管理器進(jìn)行定制。而 JDK 內(nèi)置的所有并發(fā)數(shù)據(jù)結(jié)構(gòu)都是在這些鎖的保護(hù)下完成的,它是JDK 多線程高樓大廈的地基。
鎖管理器維護(hù)的只是一個(gè)普通的雙向列表形式的隊(duì)列,這個(gè)數(shù)據(jù)結(jié)構(gòu)很簡(jiǎn)單,但是仔細(xì)維護(hù)起來卻相當(dāng)復(fù)雜,因?yàn)樗枰?xì)考慮多線程并發(fā)問題,每一行代碼都寫的無比小心。
JDK 鎖管理器的實(shí)現(xiàn)者是 Douglas S. Lea,Java 并發(fā)包幾乎全是他單槍匹馬寫出來的,在算法的世界里越是精巧的東西越是適合一個(gè)人來做。
Douglas S. Lea是紐約州立大學(xué)奧斯威戈分校計(jì)算機(jī)科學(xué)教授和現(xiàn)任計(jì)算機(jī)科學(xué)系主任,專門研究并發(fā)編程和并發(fā)數(shù)據(jù)結(jié)構(gòu)的設(shè)計(jì)。他是Java Community Process的執(zhí)行委員會(huì)成員,主持JSR 166,它為Java編程語(yǔ)言添加了并發(fā)實(shí)用程序。
后面我們將 AbstractQueuedSynchronizer 簡(jiǎn)寫成 AQS。我必須提醒各位讀者,AQS 太復(fù)雜了,如果在理解它的路上遇到了挫折,這很正常。目前市場(chǎng)上并不存在一本可以輕松理解 AQS 的書籍,能夠吃透 AQS 的人太少太少,我自己也不算。
公平鎖會(huì)確保請(qǐng)求鎖和獲得鎖的順序,如果在某個(gè)點(diǎn)鎖正處于自由狀態(tài),這時(shí)有一個(gè)線程要嘗試加鎖,公平鎖還必須查看當(dāng)前有沒有其它線程排在排隊(duì),而非公平鎖可以直接插隊(duì)。聯(lián)想一下在肯德基買漢堡時(shí)的排隊(duì)場(chǎng)景。
也許你會(huì)問,如果某個(gè)鎖處于自由狀態(tài),那它怎么會(huì)有排隊(duì)的線程呢?我們假設(shè)此刻持有鎖的線程剛剛釋放了鎖,它喚醒了等待隊(duì)列中第一個(gè)節(jié)點(diǎn)線程,這時(shí)候被喚醒的線程剛剛從 park 方法返回,接下來它就會(huì)嘗試去加鎖,那么從 park 返回到加鎖之間的狀態(tài)就是鎖的自由態(tài),這很短暫,而這短暫的時(shí)間內(nèi)還可能有其它線程也在嘗試加鎖。
其次還有一點(diǎn)需要注意,執(zhí)行了 Lock.park 方法的線程自我休眠后,并不是非要等到其它線程 unpark 了自己才會(huì)醒來,它可能隨時(shí)會(huì)以某種未知的原因醒來。我們看源碼注釋,park 返回的原因有四種
其它線程 unpark 了當(dāng)前線程
時(shí)間到了自然醒(park 有時(shí)間參數(shù))
其它線程 interrupt 了當(dāng)前線程
其它未知原因?qū)е碌摹讣傩选?/p>
文檔中沒有明確說明何種未知原因會(huì)導(dǎo)致假醒,它倒是說明了當(dāng) park 方法返回時(shí)并不意味著鎖自由了,醒過來的線程在重新嘗試獲取鎖失敗后將會(huì)再次 park 自己。所以加鎖的過程需要寫在一個(gè)循環(huán)里,在成功拿到鎖之前可能會(huì)進(jìn)行多次嘗試。
計(jì)算機(jī)世界非公平鎖的服務(wù)效率要高于公平鎖,所以 Java 默認(rèn)的鎖都使用了非公平鎖。不過現(xiàn)實(shí)世界似乎非公平鎖的效率會(huì)差一點(diǎn),比如在肯德基如果可以不停插隊(duì),你可以想象現(xiàn)場(chǎng)肯定一片混亂。為什么計(jì)算機(jī)世界和現(xiàn)實(shí)世界會(huì)有差異,大概是因?yàn)樵谟?jì)算機(jī)世界里某個(gè)線程插隊(duì)并不會(huì)導(dǎo)致其它線程抱怨。
public ReentrantLock() {
this.sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
this.sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock 的鎖是排他鎖,一個(gè)線程持有,其它線程都必須等待。而 ReadWriteLock 里面的讀鎖不是排他鎖,它允許多線程同時(shí)持有讀鎖,這是共享鎖。共享鎖和排他鎖是通過 Node 類里面的 nextWaiter 字段區(qū)分的。
class AQS {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
boolean isShared() {
return this.nextWaiter == SHARED;
}
}
那為什么這個(gè)字段沒有命名成 mode 或者 type 或者干脆直接叫 shared?這是因?yàn)?nextWaiter 在其它場(chǎng)景還有不一樣的用途,它就像 C 語(yǔ)言聯(lián)合類型的字段一樣隨機(jī)應(yīng)變,只不過 Java 語(yǔ)言沒有聯(lián)合類型。
關(guān)于條件變量,需要提出的第一個(gè)問題是為什么需要條件變量,只有鎖還不夠么?考慮下面的偽代碼,當(dāng)某個(gè)條件滿足時(shí),才去干某件事
void doSomething() {
locker.lock();
while(!condition_is_true()) { // 先看能不能搞事
locker.unlock(); // 搞不了就歇會(huì)再看看能不能搞
sleep(1);
locker.lock(); // 搞事需要加鎖,判斷能不能搞事也需要加鎖
}
justdoit(); // 搞事
locker.unlock();
}
當(dāng)條件不滿足時(shí),就循環(huán)重試(其它線程會(huì)通過加鎖來修改條件),但是需要間隔 sleep,不然 CPU 就會(huì)因?yàn)榭辙D(zhuǎn)而飆高。這里存在一個(gè)問題,那就是 sleep 多久不好控制。間隔太久,會(huì)拖慢整體效率,甚至?xí)e(cuò)過時(shí)機(jī)(條件瞬間滿足了又立即被重置了),間隔太短,又回導(dǎo)致 CPU 空轉(zhuǎn)。有了條件變量,這個(gè)問題就可以解決了
void doSomethingWithCondition() {
cond = locker.newCondition();
locker.lock();
while(!condition_is_true()) {
cond.await();
}
justdoit();
locker.unlock();
}
await() 方法會(huì)一直阻塞在 cond 條件變量上直到被另外一個(gè)線程調(diào)用了 cond.signal() 或者 cond.signalAll() 方法后才會(huì)返回,await() 阻塞時(shí)會(huì)自動(dòng)釋放當(dāng)前線程持有的鎖,await() 被喚醒后會(huì)再次嘗試持有鎖(可能又需要排隊(duì)),拿到鎖成功之后 await() 方法才能成功返回。
阻塞在條件變量上的線程可以有多個(gè),這些阻塞線程會(huì)被串聯(lián)成一個(gè)條件等待隊(duì)列。當(dāng) signalAll() 被調(diào)用時(shí),會(huì)喚醒所有的阻塞線程,讓所有的阻塞線程重新開始爭(zhēng)搶鎖。如果調(diào)用的是 signal() 只會(huì)喚醒隊(duì)列頭部的線程,這樣可以避免「驚群?jiǎn)栴}」。
await() 方法必須立即釋放鎖,否則臨界區(qū)狀態(tài)就不能被其它線程修改,condition_is_true() 返回的結(jié)果也就不會(huì)改變。 這也是為什么條件變量必須由鎖對(duì)象來創(chuàng)建,條件變量需要持有鎖對(duì)象的引用這樣才可以釋放鎖以及被 signal 喚醒后重新加鎖。創(chuàng)建條件變量的鎖必須是排他鎖,如果是共享鎖被 await() 方法釋放了并不能保證臨界區(qū)的狀態(tài)可以被其它線程來修改,可以修改臨界區(qū)狀態(tài)的只能是排他鎖。這也是為什么 ReadWriteLock.ReadLock 類的 newCondition 方法定義如下
public Condition newCondition() {
throw new UnsupportedOperationException();
}
有了條件變量,sleep 不好控制的問題就解決了。當(dāng)條件滿足時(shí),調(diào)用 signal() 或者 signalAll() 方法,阻塞的線程可以立即被喚醒,幾乎沒有任何延遲。
當(dāng)多個(gè)線程 await() 在同一個(gè)條件變量上時(shí),會(huì)形成一個(gè)條件等待隊(duì)列。同一個(gè)鎖可以創(chuàng)建多個(gè)條件變量,就會(huì)存在多個(gè)條件等待隊(duì)列。這個(gè)隊(duì)列和 AQS 的隊(duì)列結(jié)構(gòu)很接近,只不過它不是雙向隊(duì)列,而是單向隊(duì)列。隊(duì)列中的節(jié)點(diǎn)和 AQS 等待隊(duì)列的節(jié)點(diǎn)是同一個(gè)類,但是節(jié)點(diǎn)指針不是 prev 和 next,而是 nextWaiter。
class AQS {
...
class ConditionObject {
Node firstWaiter; // 指向第一個(gè)節(jié)點(diǎn)
Node lastWaiter; // 指向第二個(gè)節(jié)點(diǎn)
}
class Node {
static final int CONDITION = -2;
static final int SIGNAL = -1;
Thread thread; // 當(dāng)前等待的線程
Node nextWaiter; // 指向下一個(gè)條件等待節(jié)點(diǎn)
Node prev;
Node next;
int waitStatus; // waitStatus = CONDITION
}
...
}
ConditionObject 是 AQS 的內(nèi)部類,這個(gè)對(duì)象里會(huì)有一個(gè)隱藏的指針 this$0 指向外部的 AQS 對(duì)象,ConditionObject 可以直接訪問 AQS 對(duì)象的所有屬性和方法(加鎖解鎖)。位于條件等待隊(duì)列里的所有節(jié)點(diǎn)的 waitStatus 狀態(tài)都被標(biāo)記為 CONDITION,表示節(jié)點(diǎn)是因?yàn)闂l件變量而等待。
當(dāng)條件變量的 signal() 方法被調(diào)用時(shí),條件等待隊(duì)列的頭節(jié)點(diǎn)線程會(huì)被喚醒,該節(jié)點(diǎn)從條件等待隊(duì)列中被摘走,然后被轉(zhuǎn)移到 AQS 的等待隊(duì)列中,準(zhǔn)備排隊(duì)嘗試重新獲取鎖。這時(shí)節(jié)點(diǎn)的狀態(tài)從 CONDITION 轉(zhuǎn)為 SIGNAL,表示當(dāng)前節(jié)點(diǎn)是被條件變量喚醒轉(zhuǎn)移過來的。
class AQS {
...
boolean transferForSignal(Node node) {
// 重置節(jié)點(diǎn)狀態(tài)
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false
Node p = enq(node); // 進(jìn)入 AQS 等待隊(duì)列
int ws = p.waitStatus;
// 再修改狀態(tài)為SIGNAL
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
...
}
被轉(zhuǎn)移的節(jié)點(diǎn)的 nextWaiter 字段的含義也發(fā)生了變更,在條件隊(duì)列里它是下一個(gè)節(jié)點(diǎn)的指針,在 AQS 等待隊(duì)列里它是共享鎖還是互斥鎖的標(biāo)志。
下面我們精細(xì)分析加鎖過程,深入理解鎖邏輯控制。我必須肯定 Dough Lea 的代碼寫成下面這樣的極簡(jiǎn)形式,閱讀起來還是挺難以理解的。
class ReentrantLock {
...
public void lock() {
sync.acquire(1);
}
...
}
class Sync extends AQS {
...
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
...
}
acquire 的 if 判斷語(yǔ)句要分為三個(gè)部分,tryAcquire 方法表示當(dāng)前的線程嘗試加鎖,如果加鎖不成功就需要排隊(duì),這時(shí)候調(diào)用 addWaiter 方法,將當(dāng)前線程入隊(duì)。然后再調(diào)用 acquireQueued 方法,開始了 park 、醒來重試加鎖、加鎖不成功繼續(xù) park 的循環(huán)重試加鎖過程。直到加鎖成功 acquire 方法才會(huì)返回。
如果在循環(huán)重試加鎖過程中被其它線程打斷了,acquireQueued 方法就會(huì)返回 true。這時(shí)候線程就需要調(diào)用 selfInterrupt() 方法給當(dāng)前線程設(shè)置一個(gè)被打斷的標(biāo)識(shí)位。
// 打斷當(dāng)前線程,其實(shí)就是設(shè)置一個(gè)標(biāo)識(shí)位
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
線程如何知道自己被其它線程打斷了呢?在 park 醒來之后調(diào)用 Thread.interrupted() 就知道了,不過這個(gè)方法只能調(diào)用一次,因?yàn)樗谡{(diào)用之后就會(huì)立即 clear 打斷標(biāo)志位。這也是為什么 acquire 方法里需要調(diào)用 selfInterrupt() ,為的就是重新設(shè)置打斷標(biāo)志位。這樣上層的邏輯才可以通過 Thread.interrupted() 知道自己有沒有被打斷。
acquireQueued 和 addWaiter 方法由 AQS 類提供,tryAcquire 需要由子類自己實(shí)現(xiàn)。不同的鎖會(huì)有不同的實(shí)現(xiàn)。下面我們來看看 ReentrantLock 的公平鎖 tryAcquire 方法的實(shí)現(xiàn)
這里有個(gè) if else 分支,其中 else if 部分表示鎖的重入,當(dāng)前嘗試加鎖的線程是已經(jīng)持有了這把鎖的線程,也就是同一個(gè)線程重復(fù)加鎖,這時(shí)只需要增加計(jì)數(shù)值就行了。鎖的 state 記錄的就是加鎖計(jì)數(shù),重入一次就 +1。AQS 對(duì)象里有一個(gè) exclusiveOwnerThread 字段,記錄了當(dāng)前持有排他鎖的線程。
if(c == 0) 意味著當(dāng)前鎖是自由態(tài),計(jì)數(shù)值為零。這時(shí)就需要爭(zhēng)搶鎖,因?yàn)橥粫r(shí)間可能會(huì)有多個(gè)線程在調(diào)用 tryAcquire。爭(zhēng)搶的方式是用 CAS 操作 compareAndSetState,成功將鎖計(jì)數(shù)值從 0 改成 1 的線程將獲得這把鎖,將當(dāng)前的線程記錄到 exclusiveOwnerThread 中。
代碼里還有一個(gè) hasQueuedPredecessors() 判斷,這個(gè)判斷非常重要,它的意思是看看當(dāng)前的 AQS 等待隊(duì)列里有沒有其它線程在排隊(duì),公平鎖在加鎖之前需要 check 一下,如果有排隊(duì)的,自己就不能插隊(duì)。而非公平鎖就不需要 check,公平鎖和非公平鎖的全部的實(shí)現(xiàn)差異就在于此,就這一個(gè) check 決定了鎖的公平與否。
下面我們?cè)倏纯?addWaiter 方法的實(shí)現(xiàn),參數(shù) mode 表示是共享鎖還是排他鎖,它對(duì)應(yīng) Node.nextWaiter 屬性。
addWaiter 需要將新的節(jié)點(diǎn)添加到 AQS 等待隊(duì)列的隊(duì)尾。如果隊(duì)尾 tail 是空的意味著隊(duì)列還沒有初始化,那就需要初始化一下。AQS 隊(duì)列在初始化時(shí)需要一個(gè)冗余的頭部節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)的 thread 字段是空的。
將新節(jié)點(diǎn)添加到隊(duì)尾也是需要考慮多線程并發(fā)的,所以代碼里再一次使用了 CAS 操作 compareAndSetTail 來競(jìng)爭(zhēng)隊(duì)尾指針。沒有競(jìng)爭(zhēng)到的線程就會(huì)繼續(xù)下一輪競(jìng)爭(zhēng) for(;;) 繼續(xù)使用 CAS 操作將新節(jié)點(diǎn)往隊(duì)尾添加。
下面我們?cè)倏纯?acquireQueue 方法的代碼實(shí)現(xiàn),它會(huì)重復(fù) park、嘗試再次加鎖、加鎖失敗繼續(xù) park 的循環(huán)過程。
acquireQueue 在嘗試加鎖之前會(huì)先看看自己是不是 AQS 等待隊(duì)列的第一個(gè)節(jié)點(diǎn),如果不是它就繼續(xù)去 park。這意味著不管是公平還是非公平鎖,在這里它們都統(tǒng)一采取了公平的方案,看看隊(duì)列中是不是輪到自己了。也就是說「一朝排隊(duì),永遠(yuǎn)排隊(duì)」。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
線程在 park 返回醒來之后要立即檢測(cè)一下是否被其它線程中斷了。不過即使發(fā)生中斷了,它還會(huì)繼續(xù)嘗試獲取鎖,如果獲取不到,還會(huì)繼續(xù)睡眠,直到鎖獲取到了才將中斷狀態(tài)返回。這意味著打斷線程并不會(huì)導(dǎo)致死鎖狀態(tài)(拿不到鎖)退出。
同時(shí)我們還可以注意到鎖是可以取消的 cancelAcquire(),準(zhǔn)確地說是取消處于等待加鎖的狀態(tài),線程處于 AQS 的等待隊(duì)列中等待加鎖。那什么情況下才會(huì)拋出異常而導(dǎo)致取消加鎖呢,唯一的可能就是 tryAcquire 方法,這個(gè)方法是由子類實(shí)現(xiàn)的,子類的行為不受 AQS 控制。當(dāng)子類的 tryAcquire 方法拋出了異常,那 AQS 最好的處理方法就是取消加鎖了。cancelAcquire 會(huì)將當(dāng)前節(jié)點(diǎn)從等待隊(duì)列中移除。
解鎖的過程要簡(jiǎn)單一些,將鎖計(jì)數(shù)降為零后,喚醒等待隊(duì)列中的第一個(gè)有效節(jié)點(diǎn)。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 解鈴還須系鈴人
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
考慮到可重入鎖,需要判斷鎖計(jì)數(shù)是否降為零才可以確定鎖是否徹底被釋放。只有鎖徹底被釋放了才能喚醒后繼等待節(jié)點(diǎn)。unparkSuccessor 會(huì)跳過無效節(jié)點(diǎn)(已取消的節(jié)點(diǎn)),找到第一個(gè)有效節(jié)點(diǎn)調(diào)用 unpark() 喚醒相應(yīng)的線程。
讀寫鎖分為兩個(gè)鎖對(duì)象 ReadLock 和 WriteLock,這兩個(gè)鎖對(duì)象共享同一個(gè) AQS。AQS 的鎖計(jì)數(shù)變量 state 將分為兩個(gè)部分,前 16bit 為共享鎖 ReadLock 計(jì)數(shù),后 16bit 為互斥鎖 WriteLock 計(jì)數(shù)。互斥鎖記錄的是當(dāng)前寫鎖重入的次數(shù),共享鎖記錄的是所有當(dāng)前持有共享讀鎖的線程重入總次數(shù)。
讀寫鎖同樣也需要考慮公平鎖和非公平鎖。共享鎖和互斥鎖的公平鎖策略和 ReentrantLock 一樣,就是看看當(dāng)前還有沒有其它線程在排隊(duì),自己會(huì)乖乖排到隊(duì)尾。非公平鎖策略不一樣,它會(huì)比較偏向于給寫鎖提供更多的機(jī)會(huì)。如果當(dāng)前 AQS 隊(duì)列里有任何讀寫請(qǐng)求的線程在排隊(duì),那么寫鎖可以直接去爭(zhēng)搶,但是如果隊(duì)頭是寫鎖請(qǐng)求,那么讀鎖需要將機(jī)會(huì)讓給寫鎖,去隊(duì)尾排隊(duì)。
畢竟讀寫鎖適合讀多寫少的場(chǎng)合,對(duì)于偶爾出現(xiàn)一個(gè)寫鎖請(qǐng)求就應(yīng)該得到更高的優(yōu)先級(jí)去處理。
讀寫鎖的寫鎖加鎖在整體邏輯上和 ReentrantLock 是一樣的,不同的是 tryAcquire() 方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
寫鎖也需要考慮可重入,如果當(dāng)前 AQS 互斥鎖的持有線程正好是當(dāng)前要加鎖的線程,那么就是寫鎖在重入,重入只需要遞增鎖計(jì)數(shù)值即可。當(dāng) c!=0 也就是鎖計(jì)數(shù)不為零時(shí),既可能是因?yàn)楫?dāng)前的 AQS 有讀鎖也可能是因?yàn)橛袑戞i,判斷 w == 0 就是判斷當(dāng)前的計(jì)數(shù)是不是讀鎖帶來的。
如果計(jì)數(shù)值為零,那就開始爭(zhēng)搶鎖。取決于鎖是否公平,在爭(zhēng)搶之前調(diào)用 writerShouldBlock() 方法看看自己是否需要排隊(duì),如果不需要排隊(duì),就可以使用 CAS 操作來爭(zhēng)搶,成功將計(jì)數(shù)值從 0 設(shè)置為 1 的線程將獨(dú)占寫鎖。
讀鎖加鎖過程比寫鎖要復(fù)雜很多,它在整體流程上和寫鎖一樣,但是細(xì)節(jié)差距很大。特別是它需要為每一個(gè)線程記錄讀鎖計(jì)數(shù),這部分邏輯占據(jù)了不少代碼。
public final void acquireShared(int arg) {
// 如果嘗試加鎖不成功, 就去排隊(duì)休眠,然后循環(huán)重試
if (tryAcquireShared(arg) < 0)
// 排隊(duì)、循環(huán)重試
doAcquireShared(arg);
}
如果當(dāng)前線程已經(jīng)持有寫鎖,它還可以繼續(xù)加讀鎖,這是為了達(dá)成鎖降級(jí)必須支持的邏輯。鎖降級(jí)是指在持有寫鎖的情況下,再加讀鎖,再解寫鎖。相比于先寫解鎖再加讀鎖而言,這樣可以省去加鎖二次排隊(duì)的過程。因?yàn)殒i降級(jí)的存在,鎖計(jì)數(shù)中讀寫計(jì)數(shù)可以同時(shí)不為零。
wlock.lock();
if(whatever) {
// 降級(jí)
rlock.lock();
wlock.unlock();
doRead();
rlock.unlock();
} else {
// 不降級(jí)
doWrite()
wlock.unlock();
}
為了給每一個(gè)讀鎖線程進(jìn)行鎖計(jì)數(shù),它設(shè)置了一個(gè) ThreadLocal 變量。
private transient ThreadLocalHoldCounter readHolds;
static final class HoldCounter {
int count;
final long tid = LockSupport.getThreadId(Thread.currentThread());
}
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
但是 ThreadLocal 變量訪問起來效率不夠高,所以又設(shè)置了緩存。它會(huì)存儲(chǔ)最近一次獲取讀鎖線程的鎖計(jì)數(shù)。在線程爭(zhēng)用不是特別頻繁的情況下,直接讀取緩存會(huì)比較高效。
private transient HoldCounter cachedHoldCounter;
Dough Lea 覺得使用 cachedHoldCounter 還是不夠高效,所以又加了一層緩存記錄 firstReader,記錄第一個(gè)將讀鎖計(jì)數(shù)從 0 變成 1 的線程以及鎖計(jì)數(shù)。當(dāng)沒有線程爭(zhēng)用時(shí),直接讀取這兩個(gè)字段會(huì)更加高效。
private transient Thread firstReader;
private transient int firstReaderHoldCount;
final int getReadHoldCount() {
// 先訪問鎖全局計(jì)數(shù)的讀計(jì)數(shù)部分
if (getReadLockCount() == 0)
return 0;
// 再訪問 firstReader
Thread current = Thread.currentThread();
if (firstReader == current)
return firstReaderHoldCount;
// 再訪問最近的讀線程鎖計(jì)數(shù)
HoldCounter rh = cachedHoldCounter;
if (rh != null && rh.tid == LockSupport.getThreadId(current))
return rh.count;
// 無奈讀 ThreadLocal 吧
int count = readHolds.get().count;
if (count == 0) readHolds.remove();
return count;
}
所以我們看到為了記錄這個(gè)讀鎖計(jì)數(shù)作者煞費(fèi)苦心,那這個(gè)讀計(jì)數(shù)的作用是什么呢?那就是線程可以通過這個(gè)計(jì)數(shù)值知道自己有沒有持有這個(gè)讀寫鎖。
讀加鎖還有一個(gè)自旋的過程,所謂自旋就是第一次加鎖失敗,那就直接循環(huán)重試,不休眠,聽起來有點(diǎn)像死循環(huán)重試法。
final static int SHARED_UNIT = 65536
// 讀計(jì)數(shù)是高16位
final int fullTryAcquireShared(Thread current) {
for(;;) {
int c = getState();
// 如果有其它線程加了寫鎖,還是返回睡覺去吧
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
...
// 超出計(jì)數(shù)上限
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 拿到讀鎖了
...
return 1
}
...
// 循環(huán)重試
}
}
因?yàn)樽x鎖需要使用 CAS 操作來修改底層鎖的總讀計(jì)數(shù)值,成功的才可以獲得讀鎖,獲取讀鎖的 CAS 操作失敗只是意味著讀鎖之間存在 CAS 操作的競(jìng)爭(zhēng),并不意味著此刻鎖被別人占據(jù)了自己不能獲得。多試幾次肯定可以加鎖成功,這就是自旋的原因所在。同樣在釋放讀鎖的時(shí)候也有一個(gè) CAS 操作的循環(huán)重試過程。
protected final boolean tryReleaseShared(int unused) {
...
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) {
return nextc == 0;
}
}
...
}
關(guān)于“Java并發(fā)數(shù)據(jù)結(jié)構(gòu)的基石是什么”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對(duì)“Java并發(fā)數(shù)據(jù)結(jié)構(gòu)的基石是什么”知識(shí)都有一定的了解,大家如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。