溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

深入理解AbstractQueuedSynchronizer(AQS)

發(fā)布時間:2020-06-02 07:19:20 來源:網(wǎng)絡 閱讀:1188 作者:Java筆記丶 欄目:編程語言
本人免費整理了Java高級資料,涵蓋了Java、RedisMongoDB、MySQL、Zookeeper、Spring Cloud、Dubbo高并發(fā)分布式等教程,一共30G,需要自己領取。
傳送門:https://mp.weixin.qq.com/s/JzddfH-7yNudmkjT0IRL8Q

1. AQS簡介

在上一篇文章中我們對lock和AbstractQueuedSynchronizer(AQS)有了初步的認識。在同步組件的實現(xiàn)中,AQS是核心部分,同步組件的實現(xiàn)者通過使用AQS提供的模板方法實現(xiàn)同步組件語義,AQS則實現(xiàn)了對同步狀態(tài)的管理,以及對阻塞線程進行排隊,等待通知等等一些底層的實現(xiàn)處理。AQS的核心也包括了這些方面:同步隊列,獨占式鎖的獲取和釋放,共享鎖的獲取和釋放以及可中斷鎖,超時等待鎖獲取這些特性的實現(xiàn),而這些實際上則是AQS提供出來的模板方法,歸納整理如下:

獨占式鎖:

void acquire(int arg):獨占式獲取同步狀態(tài),如果獲取失敗則插入同步隊列進行等待; void acquireInterruptibly(int arg):與acquire方法相同,但在同步隊列中進行等待的時候可以檢測中斷; boolean tryAcquireNanos(int arg, long nanosTimeout):在acquireInterruptibly基礎上增加了超時等待功能,在超時時間內(nèi)沒有獲得同步狀態(tài)返回false; boolean release(int arg):釋放同步狀態(tài),該方法會喚醒在同步隊列中的下一個節(jié)點

共享式鎖:

void acquireShared(int arg):共享式獲取同步狀態(tài),與獨占式的區(qū)別在于同一時刻有多個線程獲取同步狀態(tài); void acquireSharedInterruptibly(int arg):在acquireShared方法基礎上增加了能響應中斷的功能; boolean tryAcquireSharedNanos(int arg, long nanosTimeout):在acquireSharedInterruptibly基礎上增加了超時等待的功能; boolean releaseShared(int arg):共享式釋放同步狀態(tài)

要想掌握AQS的底層實現(xiàn),其實也就是對這些模板方法的邏輯進行學習。在學習這些模板方法之前,我們得首先了解下AQS中的同步隊列是一種什么樣的數(shù)據(jù)結(jié)構(gòu),因為同步隊列是AQS對同步狀態(tài)的管理的基石。

2. 同步隊列

當共享資源被某個線程占有,其他請求該資源的線程將會阻塞,從而進入同步隊列。就數(shù)據(jù)結(jié)構(gòu)而言,隊列的實現(xiàn)方式無外乎兩者一是通過數(shù)組的形式,另外一種則是鏈表的形式。AQS中的同步隊列則是通過鏈式方式進行實現(xiàn)。接下來,很顯然我們至少會抱有這樣的疑問:1. 節(jié)點的數(shù)據(jù)結(jié)構(gòu)是什么樣的?2. 是單向還是雙向?3. 是帶頭結(jié)點的還是不帶頭節(jié)點的?我們依舊先是通過看源碼的方式。

在AQS有一個靜態(tài)內(nèi)部類Node,其中有這樣一些屬性:

volatile int waitStatus //節(jié)點狀態(tài) volatile Node prev //當前節(jié)點/線程的前驅(qū)節(jié)點 volatile Node next; //當前節(jié)點/線程的后繼節(jié)點 volatile Thread thread;//加入同步隊列的線程引用 Node nextWaiter;//等待隊列中的下一個節(jié)點

節(jié)點的狀態(tài)有以下這些:

int CANCELLED = 1//節(jié)點從同步隊列中取消 int SIGNAL = -1//后繼節(jié)點的線程處于等待狀態(tài),如果當前節(jié)點釋放同步狀態(tài)會通知后繼節(jié)點,使得后繼節(jié)點的線程能夠運行; int CONDITION = -2//當前節(jié)點進入等待隊列中 int PROPAGATE = -3//表示下一次共享式同步狀態(tài)獲取將會無條件傳播下去 int INITIAL = 0;//初始狀態(tài)

現(xiàn)在我們知道了節(jié)點的數(shù)據(jù)結(jié)構(gòu)類型,并且每個節(jié)點擁有其前驅(qū)和后繼節(jié)點,很顯然這是一個雙向隊列。同樣的我們可以用一段demo看一下。

public?class?LockDemo?{
????private?static?ReentrantLock?lock?=?new?ReentrantLock();

????public?static?void?main(String[]?args)?{
????????for?(int?i?=?0;?i?<?5;?i++)?{
????????????Thread?thread?=?new?Thread(()?->?{
????????????????lock.lock();
????????????????try?{
????????????????????Thread.sleep(10000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?finally?{
????????????????????lock.unlock();
????????????????}
????????????});
????????????thread.start();
????????}
????}
}

實例代碼中開啟了5個線程,先獲取鎖之后再睡眠10S中,實際上這里讓線程睡眠是想模擬出當線程無法獲取鎖時進入同步隊列的情況。通過debug,當Thread-4(在本例中最后一個線程)獲取鎖失敗后進入同步時,AQS時現(xiàn)在的同步隊列如圖所示:


深入理解AbstractQueuedSynchronizer(AQS)


Thread-0先獲得鎖后進行睡眠,其他線程(Thread-1,Thread-2,Thread-3,Thread-4)獲取鎖失敗進入同步隊列,同時也可以很清楚的看出來每個節(jié)點有兩個域:prev(前驅(qū))和next(后繼),并且每個節(jié)點用來保存獲取同步狀態(tài)失敗的線程引用以及等待狀態(tài)等信息。另外AQS中有兩個重要的成員變量:

private?transient?volatile?Node?head;
private?transient?volatile?Node?tail;

也就是說AQS實際上通過頭尾指針來管理同步隊列,同時實現(xiàn)包括獲取鎖失敗的線程進行入隊,釋放鎖時對同步隊列中的線程進行通知等核心方法。其示意圖如下:


深入理解AbstractQueuedSynchronizer(AQS)

通過對源碼的理解以及做實驗的方式,現(xiàn)在我們可以清楚的知道這樣幾點:

  1. 節(jié)點的數(shù)據(jù)結(jié)構(gòu),即AQS的靜態(tài)內(nèi)部類Node,節(jié)點的等待狀態(tài)等信息

  2. 同步隊列是一個雙向隊列,AQS通過持有頭尾指針管理同步隊列;

那么,節(jié)點如何進行入隊和出隊是怎樣做的了?實際上這對應著鎖的獲取和釋放兩個操作:獲取鎖失敗進行入隊操作,獲取鎖成功進行出隊操作。

3. 獨占鎖

3.1 獨占鎖的獲?。╝cquire方法)

我們繼續(xù)通過看源碼和debug的方式來看,還是以上面的demo為例,調(diào)用lock()方法是獲取獨占式鎖,獲取失敗就將當前線程加入同步隊列,成功則線程執(zhí)行。而lock()方法實際上會調(diào)用AQS的acquire()方法,源碼如下

public?final?void?acquire(int?arg)?{
????????//先看同步狀態(tài)是否獲取成功,如果成功則方法結(jié)束返回
????????//若失敗則先調(diào)用addWaiter()方法再調(diào)用acquireQueued()方法
????????if?(!tryAcquire(arg)?&&
????????????acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))
????????????selfInterrupt();
}

關鍵信息請看注釋,acquire根據(jù)當前獲得同步狀態(tài)成功與否做了兩件事情:1. 成功,則方法結(jié)束返回,2. 失敗,則先調(diào)用addWaiter()然后在調(diào)用acquireQueued()方法。

獲取同步狀態(tài)失敗,入隊操作

當線程獲取獨占式鎖失敗后就會將當前線程加入同步隊列,那么加入隊列的方式是怎樣的了?我們接下來就應該去研究一下addWaiter()和acquireQueued()。addWaiter()源碼如下:

private?Node?addWaiter(Node?mode)?{
????????//?1\.?將當前線程構(gòu)建成Node類型
????????Node?node?=?new?Node(Thread.currentThread(),?mode);
????????//?Try?the?fast?path?of?enq;?backup?to?full?enq?on?failure
????????//?2\.?當前尾節(jié)點是否為null?
????????Node?pred?=?tail;
????????if?(pred?!=?null)?{
????????????//?2.2?將當前節(jié)點尾插入的方式插入同步隊列中
????????????node.prev?=?pred;
????????????if?(compareAndSetTail(pred,?node))?{
????????????????pred.next?=?node;
????????????????return?node;
????????????}
????????}
????????//?2.1\.?當前同步隊列尾節(jié)點為null,說明當前線程是第一個加入同步隊列進行等待的線程
????????enq(node);
????????return?node;
}

分析可以看上面的注釋。程序的邏輯主要分為兩個部分:1. 當前同步隊列的尾節(jié)點為null,調(diào)用方法enq()插入;2. 當前隊列的尾節(jié)點不為null,則采用尾插入(compareAndSetTail()方法)的方式入隊。另外還會有另外一個問題:如果?if (compareAndSetTail(pred, node))為false怎么辦?會繼續(xù)執(zhí)行到enq()方法,同時很明顯compareAndSetTail是一個CAS操作,通常來說如果CAS操作失敗會繼續(xù)自旋(死循環(huán))進行重試。因此,經(jīng)過我們這樣的分析,enq()方法可能承擔兩個任務:1. 處理當前同步隊列尾節(jié)點為null時進行入隊操作;2. 如果CAS尾插入節(jié)點失敗后負責自旋進行嘗試。那么是不是真的就像我們分析的一樣了?只有源碼會告訴我們答案:),enq()源碼如下:

private?Node?enq(final?Node?node)?{
????????for?(;;)?{
????????????Node?t?=?tail;
????????????if?(t?==?null)?{?//?Must?initialize
????????????????//1\.?構(gòu)造頭結(jié)點
????????????????if?(compareAndSetHead(new?Node()))
????????????????????tail?=?head;
????????????}?else?{
????????????????//?2\.?尾插入,CAS操作失敗自旋嘗試
????????????????node.prev?=?t;
????????????????if?(compareAndSetTail(t,?node))?{
????????????????????t.next?=?node;
????????????????????return?t;
????????????????}
????????????}
????????}
}

在上面的分析中我們可以看出在第1步中會先創(chuàng)建頭結(jié)點,說明同步隊列是帶頭結(jié)點的鏈式存儲結(jié)構(gòu)。帶頭結(jié)點與不帶頭結(jié)點相比,會在入隊和出隊的操作中獲得更大的便捷性,因此同步隊列選擇了帶頭結(jié)點的鏈式存儲結(jié)構(gòu)。那么帶頭節(jié)點的隊列初始化時機是什么?自然而然是在tail為null時,即當前線程是第一次插入同步隊列。compareAndSetTail(t, node)方法會利用CAS操作設置尾節(jié)點,如果CAS操作失敗會在for (;;)for死循環(huán)中不斷嘗試,直至成功return返回為止。因此,對enq()方法可以做這樣的總結(jié):

  1. 在當前線程是第一個加入同步隊列時,調(diào)用compareAndSetHead(new Node())方法,完成鏈式隊列的頭結(jié)點的初始化;

  2. 自旋不斷嘗試CAS尾插入節(jié)點直至成功為止。

現(xiàn)在我們已經(jīng)很清楚獲取獨占式鎖失敗的線程包裝成Node然后插入同步隊列的過程了?那么緊接著會有下一個問題?在同步隊列中的節(jié)點(線程)會做什么事情了來保證自己能夠有機會獲得獨占式鎖了?帶著這樣的問題我們就來看看acquireQueued()方法,從方法名就可以很清楚,這個方法的作用就是排隊獲取鎖的過程,源碼如下:

final?boolean?acquireQueued(final?Node?node,?int?arg)?{
????????boolean?failed?=?true;
????????try?{
????????????boolean?interrupted?=?false;
????????????for?(;;)?{
????????????????//?1\.?獲得當前節(jié)點的先驅(qū)節(jié)點
????????????????final?Node?p?=?node.predecessor();
????????????????//?2\.?當前節(jié)點能否獲取獨占式鎖?????????????????
????????????????//?2.1?如果當前節(jié)點的先驅(qū)節(jié)點是頭結(jié)點并且成功獲取同步狀態(tài),即可以獲得獨占式鎖
????????????????if?(p?==?head?&&?tryAcquire(arg))?{
????????????????????//隊列頭指針用指向當前節(jié)點
????????????????????setHead(node);
????????????????????//釋放前驅(qū)節(jié)點
????????????????????p.next?=?null;?//?help?GC
????????????????????failed?=?false;
????????????????????return?interrupted;
????????????????}
????????????????//?2.2?獲取鎖失敗,線程進入等待狀態(tài)等待獲取獨占式鎖
????????????????if?(shouldParkAfterFailedAcquire(p,?node)?&&
????????????????????parkAndCheckInterrupt())
????????????????????interrupted?=?true;
????????????}
????????}?finally?{
????????????if?(failed)
????????????????cancelAcquire(node);
????????}
}

程序邏輯通過注釋已經(jīng)標出,整體來看這是一個這又是一個自旋的過程(for (;;)),代碼首先獲取當前節(jié)點的先驅(qū)節(jié)點,如果先驅(qū)節(jié)點是頭結(jié)點的并且成功獲得同步狀態(tài)的時候(if (p == head && tryAcquire(arg))),當前節(jié)點所指向的線程能夠獲取鎖。反之,獲取鎖失敗進入等待狀態(tài)。整體示意圖為下圖:


深入理解AbstractQueuedSynchronizer(AQS)

獲取鎖成功,出隊操作

獲取鎖的節(jié)點出隊的邏輯是:

//隊列頭結(jié)點引用指向當前節(jié)點
setHead(node);
//釋放前驅(qū)節(jié)點
p.next?=?null;?//?help?GC
failed?=?false;
return?interrupted;

setHead()方法為:

private?void?setHead(Node?node)?{
????????head?=?node;
????????node.thread?=?null;
????????node.prev?=?null;
}

將當前節(jié)點通過setHead()方法設置為隊列的頭結(jié)點,然后將之前的頭結(jié)點的next域設置為null并且pre域也為null,即與隊列斷開,無任何引用方便GC時能夠?qū)?nèi)存進行回收。示意圖如下:

深入理解AbstractQueuedSynchronizer(AQS)

那么當獲取鎖失敗的時候會調(diào)用shouldParkAfterFailedAcquire()方法和parkAndCheckInterrupt()方法,看看他們做了什么事情。shouldParkAfterFailedAcquire()方法源碼為:

private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{
????int?ws?=?pred.waitStatus;
????if?(ws?==?Node.SIGNAL)
????????/*
?????????*?This?node?has?already?set?status?asking?a?release
?????????*?to?signal?it,?so?it?can?safely?park.
?????????*/
????????return?true;
????if?(ws?>?0)?{
????????/*
?????????*?Predecessor?was?cancelled.?Skip?over?predecessors?and
?????????*?indicate?retry.
?????????*/
????????do?{
????????????node.prev?=?pred?=?pred.prev;
????????}?while?(pred.waitStatus?>?0);
????????pred.next?=?node;
????}?else?{
????????/*
?????????*?waitStatus?must?be?0?or?PROPAGATE.??Indicate?that?we
?????????*?need?a?signal,?but?don't?park?yet.??Caller?will?need?to
?????????*?retry?to?make?sure?it?cannot?acquire?before?parking.
?????????*/
????????compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL);
????}
????return?false;
}

shouldParkAfterFailedAcquire()方法主要邏輯是使用compareAndSetWaitStatus(pred, ws, Node.SIGNAL)使用CAS將節(jié)點狀態(tài)由INITIAL設置成SIGNAL,表示當前線程阻塞。當compareAndSetWaitStatus設置失敗則說明shouldParkAfterFailedAcquire方法返回false,然后會在acquireQueued()方法中for (;;)死循環(huán)中會繼續(xù)重試,直至compareAndSetWaitStatus設置節(jié)點狀態(tài)位為SIGNAL時shouldParkAfterFailedAcquire返回true時才會執(zhí)行方法parkAndCheckInterrupt()方法,該方法的源碼為:

private?final?boolean?parkAndCheckInterrupt()?{
????????//使得該線程阻塞
????????LockSupport.park(this);
????????return?Thread.interrupted();
}

該方法的關鍵是會調(diào)用LookSupport.park()方法(關于LookSupport會在以后的文章進行討論),該方法是用來阻塞當前線程的。因此到這里就應該清楚了,acquireQueued()在自旋過程中主要完成了兩件事情:

  1. 如果當前節(jié)點的前驅(qū)節(jié)點是頭節(jié)點,并且能夠獲得同步狀態(tài)的話,當前線程能夠獲得鎖該方法執(zhí)行結(jié)束退出;

  2. 獲取鎖失敗的話,先將節(jié)點狀態(tài)設置成SIGNAL,然后調(diào)用LookSupport.park方法使得當前線程阻塞。

經(jīng)過上面的分析,獨占式鎖的獲取過程也就是acquire()方法的執(zhí)行流程如下圖所示:

深入理解AbstractQueuedSynchronizer(AQS)

3.2 獨占鎖的釋放(release()方法)

獨占鎖的釋放就相對來說比較容易理解了,廢話不多說先來看下源碼:

public?final?boolean?release(int?arg)?{
????????if?(tryRelease(arg))?{
????????????Node?h?=?head;
????????????if?(h?!=?null?&&?h.waitStatus?!=?0)
????????????????unparkSuccessor(h);
????????????return?true;
????????}
????????return?false;
}

這段代碼邏輯就比較容易理解了,如果同步狀態(tài)釋放成功(tryRelease返回true)則會執(zhí)行if塊中的代碼,當head指向的頭結(jié)點不為null,并且該節(jié)點的狀態(tài)值不為0的話才會執(zhí)行unparkSuccessor()方法。unparkSuccessor方法源碼:

private?void?unparkSuccessor(Node?node)?{
????/*
?????*?If?status?is?negative?(i.e.,?possibly?needing?signal)?try
?????*?to?clear?in?anticipation?of?signalling.??It?is?OK?if?this
?????*?fails?or?if?status?is?changed?by?waiting?thread.
?????*/
????int?ws?=?node.waitStatus;
????if?(ws?<?0)
????????compareAndSetWaitStatus(node,?ws,?0);

????/*
?????*?Thread?to?unpark?is?held?in?successor,?which?is?normally
?????*?just?the?next?node.??But?if?cancelled?or?apparently?null,
?????*?traverse?backwards?from?tail?to?find?the?actual
?????*?non-cancelled?successor.
?????*/

????//頭節(jié)點的后繼節(jié)點
????Node?s?=?node.next;
????if?(s?==?null?||?s.waitStatus?>?0)?{
????????s?=?null;
????????for?(Node?t?=?tail;?t?!=?null?&&?t?!=?node;?t?=?t.prev)
????????????if?(t.waitStatus?<=?0)
????????????????s?=?t;
????}
????if?(s?!=?null)
????????//后繼節(jié)點不為null時喚醒該線程
????????LockSupport.unpark(s.thread);
}

源碼的關鍵信息請看注釋,首先獲取頭節(jié)點的后繼節(jié)點,當后繼節(jié)點的時候會調(diào)用LookSupport.unpark()方法,該方法會喚醒該節(jié)點的后繼節(jié)點所包裝的線程。因此,每一次鎖釋放后就會喚醒隊列中該節(jié)點的后繼節(jié)點所引用的線程,從而進一步可以佐證獲得鎖的過程是一個FIFO(先進先出)的過程。

到現(xiàn)在我們終于啃下了一塊硬骨頭了,通過學習源碼的方式非常深刻的學習到了獨占式鎖的獲取和釋放的過程以及同步隊列??梢宰鲆幌驴偨Y(jié):

  1. 線程獲取鎖失敗,線程被封裝成Node進行入隊操作,核心方法在于addWaiter()和enq(),同時enq()完成對同步隊列的頭結(jié)點初始化工作以及CAS操作失敗的重試;

  2. 線程獲取鎖是一個自旋的過程,當且僅當 當前節(jié)點的前驅(qū)節(jié)點是頭結(jié)點并且成功獲得同步狀態(tài)時,節(jié)點出隊即該節(jié)點引用的線程獲得鎖,否則,當不滿足條件時就會調(diào)用LookSupport.park()方法使得線程阻塞;

  3. 釋放鎖的時候會喚醒后繼節(jié)點;

總體來說:在獲取同步狀態(tài)時,AQS維護一個同步隊列,獲取同步狀態(tài)失敗的線程會加入到隊列中進行自旋;移除隊列(或停止自旋)的條件是前驅(qū)節(jié)點是頭結(jié)點并且成功獲得了同步狀態(tài)。在釋放同步狀態(tài)時,同步器會調(diào)用unparkSuccessor()方法喚醒后繼節(jié)點。

獨占鎖特性學習

3.3 可中斷式獲取鎖(acquireInterruptibly方法)

我們知道lock相較于synchronized有一些更方便的特性,比如能響應中斷以及超時等待等特性,現(xiàn)在我們依舊采用通過學習源碼的方式來看看能夠響應中斷是怎么實現(xiàn)的??身憫袛嗍芥i可調(diào)用方法lock.lockInterruptibly();而該方法其底層會調(diào)用AQS的acquireInterruptibly方法,源碼為:

public?final?void?acquireInterruptibly(int?arg)
????????throws?InterruptedException?{
????if?(Thread.interrupted())
????????throw?new?InterruptedException();
????if?(!tryAcquire(arg))
????????//線程獲取鎖失敗
????????doAcquireInterruptibly(arg);
}

在獲取同步狀態(tài)失敗后就會調(diào)用doAcquireInterruptibly方法:

private?void?doAcquireInterruptibly(int?arg)
????throws?InterruptedException?{
????//將節(jié)點插入到同步隊列中
????final?Node?node?=?addWaiter(Node.EXCLUSIVE);
????boolean?failed?=?true;
????try?{
????????for?(;;)?{
????????????final?Node?p?=?node.predecessor();
????????????//獲取鎖出隊
????????????if?(p?==?head?&&?tryAcquire(arg))?{
????????????????setHead(node);
????????????????p.next?=?null;?//?help?GC
????????????????failed?=?false;
????????????????return;
????????????}
????????????if?(shouldParkAfterFailedAcquire(p,?node)?&&
????????????????parkAndCheckInterrupt())
????????????????//線程中斷拋異常
????????????????throw?new?InterruptedException();
????????}
????}?finally?{
????????if?(failed)
????????????cancelAcquire(node);
????}
}

關鍵信息請看注釋,現(xiàn)在看這段代碼就很輕松了吧:),與acquire方法邏輯幾乎一致,唯一的區(qū)別是當parkAndCheckInterrupt返回true時即線程阻塞時該線程被中斷,代碼拋出被中斷異常。

3.4 超時等待式獲取鎖(tryAcquireNanos()方法)

通過調(diào)用lock.tryLock(timeout,TimeUnit)方式達到超時等待獲取鎖的效果,該方法會在三種情況下才會返回:

  1. 在超時時間內(nèi),當前線程成功獲取了鎖;

  2. 當前線程在超時時間內(nèi)被中斷;

  3. 超時時間結(jié)束,仍未獲得鎖返回false。

我們?nèi)匀煌ㄟ^采取閱讀源碼的方式來學習底層具體是怎么實現(xiàn)的,該方法會調(diào)用AQS的方法tryAcquireNanos(),源碼為:

public?final?boolean?tryAcquireNanos(int?arg,?long?nanosTimeout)
????????throws?InterruptedException?{
????if?(Thread.interrupted())
????????throw?new?InterruptedException();
????return?tryAcquire(arg)?||
????????//實現(xiàn)超時等待的效果
????????doAcquireNanos(arg,?nanosTimeout);
}

很顯然這段源碼最終是靠doAcquireNanos方法實現(xiàn)超時等待的效果,該方法源碼如下:

private?boolean?doAcquireNanos(int?arg,?long?nanosTimeout)
????????throws?InterruptedException?{
????if?(nanosTimeout?<=?0L)
????????return?false;
????//1\.?根據(jù)超時時間和當前時間計算出截止時間
????final?long?deadline?=?System.nanoTime()?+?nanosTimeout;
????final?Node?node?=?addWaiter(Node.EXCLUSIVE);
????boolean?failed?=?true;
????try?{
????????for?(;;)?{
????????????final?Node?p?=?node.predecessor();
????????????//2\.?當前線程獲得鎖出隊列
????????????if?(p?==?head?&&?tryAcquire(arg))?{
????????????????setHead(node);
????????????????p.next?=?null;?//?help?GC
????????????????failed?=?false;
????????????????return?true;
????????????}
????????????//?3.1?重新計算超時時間
????????????nanosTimeout?=?deadline?-?System.nanoTime();
????????????//?3.2?已經(jīng)超時返回false
????????????if?(nanosTimeout?<=?0L)
????????????????return?false;
????????????//?3.3?線程阻塞等待?
????????????if?(shouldParkAfterFailedAcquire(p,?node)?&&
????????????????nanosTimeout?>?spinForTimeoutThreshold)
????????????????LockSupport.parkNanos(this,?nanosTimeout);
????????????//?3.4?線程被中斷拋出被中斷異常
????????????if?(Thread.interrupted())
????????????????throw?new?InterruptedException();
????????}
????}?finally?{
????????if?(failed)
????????????cancelAcquire(node);
????}
}

程序邏輯如圖所示:

深入理解AbstractQueuedSynchronizer(AQS)

程序邏輯同獨占鎖可響應中斷式獲取基本一致,唯一的不同在于獲取鎖失敗后,對超時時間的處理上,在第1步會先計算出按照現(xiàn)在時間和超時時間計算出理論上的截止時間,比如當前時間是8h20min,超時時間是10min,那么根據(jù)deadline = System.nanoTime() + nanosTimeout計算出剛好達到超時時間時的系統(tǒng)時間就是8h 10min+10min = 8h 20min。然后根據(jù)deadline - System.nanoTime()就可以判斷是否已經(jīng)超時了,比如,當前系統(tǒng)時間是8h 30min很明顯已經(jīng)超過了理論上的系統(tǒng)時間8h 20min,deadline - System.nanoTime()計算出來就是一個負數(shù),自然而然會在3.2步中的If判斷之間返回false。如果還沒有超時即3.2步中的if判斷為true時就會繼續(xù)執(zhí)行3.3步通過LockSupport.parkNanos使得當前線程阻塞,同時在3.4步增加了對中斷的檢測,若檢測出被中斷直接拋出被中斷異常。

4. 共享鎖

4.1 共享鎖的獲取(acquireShared()方法)

在聊完AQS對獨占鎖的實現(xiàn)后,我們繼續(xù)一鼓作氣的來看看共享鎖是怎樣實現(xiàn)的?共享鎖的獲取方法為acquireShared,源碼為:

public?final?void?acquireShared(int?arg)?{
????if?(tryAcquireShared(arg)?<?0)
????????doAcquireShared(arg);
}

這段源碼的邏輯很容易理解,在該方法中會首先調(diào)用tryAcquireShared方法,tryAcquireShared返回值是一個int類型,當返回值為大于等于0的時候方法結(jié)束說明獲得成功獲取鎖,否則,表明獲取同步狀態(tài)失敗即所引用的線程獲取鎖失敗,會執(zhí)行doAcquireShared方法,該方法的源碼為:

private?void?doAcquireShared(int?arg)?{
????final?Node?node?=?addWaiter(Node.SHARED);
????boolean?failed?=?true;
????try?{
????????boolean?interrupted?=?false;
????????for?(;;)?{
????????????final?Node?p?=?node.predecessor();
????????????if?(p?==?head)?{
????????????????int?r?=?tryAcquireShared(arg);
????????????????if?(r?>=?0)?{
????????????????????//?當該節(jié)點的前驅(qū)節(jié)點是頭結(jié)點且成功獲取同步狀態(tài)
????????????????????setHeadAndPropagate(node,?r);
????????????????????p.next?=?null;?//?help?GC
????????????????????if?(interrupted)
????????????????????????selfInterrupt();
????????????????????failed?=?false;
????????????????????return;
????????????????}
????????????}
????????????if?(shouldParkAfterFailedAcquire(p,?node)?&&
????????????????parkAndCheckInterrupt())
????????????????interrupted?=?true;
????????}
????}?finally?{
????????if?(failed)
????????????cancelAcquire(node);
????}
}

現(xiàn)在來看這段代碼會不會很容易了?邏輯幾乎和獨占式鎖的獲取一模一樣,這里的自旋過程中能夠退出的條件是當前節(jié)點的前驅(qū)節(jié)點是頭結(jié)點并且tryAcquireShared(arg)返回值大于等于0即能成功獲得同步狀態(tài)

4.2 共享鎖的釋放(releaseShared()方法)

共享鎖的釋放在AQS中會調(diào)用方法releaseShared:

public?final?boolean?releaseShared(int?arg)?{
????if?(tryReleaseShared(arg))?{
????????doReleaseShared();
????????return?true;
????}
????return?false;
}

當成功釋放同步狀態(tài)之后即tryReleaseShared會繼續(xù)執(zhí)行doReleaseShared方法:

private?void?doReleaseShared()?{
????/*
?????*?Ensure?that?a?release?propagates,?even?if?there?are?other
?????*?in-progress?acquires/releases.??This?proceeds?in?the?usual
?????*?way?of?trying?to?unparkSuccessor?of?head?if?it?needs
?????*?signal.?But?if?it?does?not,?status?is?set?to?PROPAGATE?to
?????*?ensure?that?upon?release,?propagation?continues.
?????*?Additionally,?we?must?loop?in?case?a?new?node?is?added
?????*?while?we?are?doing?this.?Also,?unlike?other?uses?of
?????*?unparkSuccessor,?we?need?to?know?if?CAS?to?reset?status
?????*?fails,?if?so?rechecking.
?????*/
????for?(;;)?{
????????Node?h?=?head;
????????if?(h?!=?null?&&?h?!=?tail)?{
????????????int?ws?=?h.waitStatus;
????????????if?(ws?==?Node.SIGNAL)?{
????????????????if?(!compareAndSetWaitStatus(h,?Node.SIGNAL,?0))
????????????????????continue;????????????//?loop?to?recheck?cases
????????????????unparkSuccessor(h);
????????????}
????????????else?if?(ws?==?0?&&
?????????????????????!compareAndSetWaitStatus(h,?0,?Node.PROPAGATE))
????????????????continue;????????????????//?loop?on?failed?CAS
????????}
????????if?(h?==?head)???????????????????//?loop?if?head?changed
????????????break;
????}
}

這段方法跟獨占式鎖釋放過程有點點不同,在共享式鎖的釋放過程中,對于能夠支持多個線程同時訪問的并發(fā)組件,必須保證多個線程能夠安全的釋放同步狀態(tài),這里采用的CAS保證,當CAS操作失敗continue,在下一次循環(huán)中進行重試。

4.3 可中斷(acquireSharedInterruptibly()方法),超時等待(tryAcquireSharedNanos()方法)

關于可中斷鎖以及超時等待的特性其實現(xiàn)和獨占式鎖可中斷獲取鎖以及超時等待的實現(xiàn)幾乎一致,具體的就不再說了,如果理解了上面的內(nèi)容對這部分的理解也是水到渠成的。

通過這篇,加深了對AQS的底層實現(xiàn)更加清楚了,也對了解并發(fā)組件的實現(xiàn)原理打下了基礎,學無止境,繼續(xù)加油:);如果覺得不錯,請給贊,嘿嘿。


向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI