您好,登錄后才能下訂單哦!
這篇文章主要講解了“怎么掌握AQS”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“怎么掌握AQS”吧!
鎖原理 - 信號(hào)量 vs 管程
在并發(fā)編程領(lǐng)域,有兩大核心問題:互斥與同步,互斥即同一時(shí)刻只允許一個(gè)線程訪問共享資源,同步,即線程之間如何通信、協(xié)作,一般這兩大問題可以通過信號(hào)量和管程來解決。
信號(hào)量
信號(hào)量(Semaphore)是操作系統(tǒng)提供的一種進(jìn)程間常見的通信方式,主要用來協(xié)調(diào)并發(fā)程序?qū)蚕碣Y源的訪問,操作系統(tǒng)可以保證對(duì)信號(hào)量操作的原子性。它是怎么實(shí)現(xiàn)的呢。
信號(hào)量由一個(gè)共享整型變量 S 和兩個(gè)原子操作 PV 組成,S 只能通過 P 和 V 操作來改變
P 操作:即請(qǐng)求資源,意味著 S 要減 1,如果 S < 0, 則表示沒有資源了,此時(shí)線程要進(jìn)入等待隊(duì)列(同步隊(duì)列)等待
V 操作: 即釋放資源,意味著 S 要加 1, 如果 S 小于等于 0,說明等待隊(duì)列里有線程,此時(shí)就需要喚醒線程。
示意圖如下
信號(hào)量機(jī)制的引入解決了進(jìn)程同步和互斥問題,但信號(hào)量的大量同步操作分散在各個(gè)進(jìn)程中不便于管理,還有可能導(dǎo)致系統(tǒng)死鎖。如:生產(chǎn)者消費(fèi)者問題中將P、V顛倒可能死鎖(見文末參考鏈接),另外條件越多,需要的信號(hào)量就越多,需要更加謹(jǐn)慎地處理信號(hào)量之間的處理順序,否則很容易造成死鎖現(xiàn)象。
基于信號(hào)量給編程帶來的隱患,于是有了提出了對(duì)開發(fā)者更加友好的并發(fā)編程模型-管程
管程
Dijkstra 于 1971 年提出:把所有進(jìn)程對(duì)某一種臨界資源的同步操作都集中起來,構(gòu)成一個(gè)所謂的秘書進(jìn)程。凡要訪問該臨界資源的進(jìn)程,都需先報(bào)告秘書,由秘書來實(shí)現(xiàn)諸進(jìn)程對(duì)同一臨界資源的互斥使用,這種機(jī)制就是管程。
管程是一種在信號(hào)量機(jī)制上進(jìn)行改進(jìn)的并發(fā)編程模型,解決了信號(hào)量在臨界區(qū)的 PV 操作上配對(duì)的麻煩,把配對(duì)的 PV 操作集中在一起而形成的并發(fā)編程方法理論,極大降低了使用和理解成本。
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
管程由四部分組成:
管程內(nèi)部的共享變量。
管程內(nèi)部的條件變量。
管程內(nèi)部并行執(zhí)行的進(jìn)程。
對(duì)于局部與管程內(nèi)部的共享數(shù)據(jù)設(shè)置初始值的語句。
由此可見,管程就是一個(gè)對(duì)象監(jiān)視器。任何線程想要訪問該資源(共享變量),就要排隊(duì)進(jìn)入監(jiān)控范圍。進(jìn)入之后,接受檢查,不符合條件,則要繼續(xù)等待,直到被通知,然后繼續(xù)進(jìn)入監(jiān)視器。
需要注意的事,信號(hào)量和管程兩者是等價(jià)的,信號(hào)量可以實(shí)現(xiàn)管程,管程也可以實(shí)現(xiàn)信號(hào)量,只是兩者的表現(xiàn)形式不同而已,管程對(duì)開發(fā)者更加友好。
兩者的區(qū)別如下
管程為了解決信號(hào)量在臨界區(qū)的 PV 操作上的配對(duì)的麻煩,把配對(duì)的 PV 操作集中在一起,并且加入了條件變量的概念,使得在多條件下線程間的同步實(shí)現(xiàn)變得更加簡單。
怎么理解管程中的入口等待隊(duì)列,共享變量,條件變量等概念,有時(shí)候技術(shù)上的概念較難理解,我們可以借助生活中的場(chǎng)景來幫助我們理解,就以我們的就醫(yī)場(chǎng)景為例來簡單說明一下,正常的就醫(yī)流程如下:
病人去掛號(hào)后,去侯診室等待叫號(hào)
叫到自己時(shí),就可以進(jìn)入就診室就診了
就診時(shí),有兩種情況,一種是醫(yī)生很快就確定病人的病,并作出診斷,診斷完成后,就通知下一位病人進(jìn)來就診,一種是醫(yī)生無法確定病因,需要病人去做個(gè)驗(yàn)血 / CT 檢查才能確定病情,于是病人就先去驗(yàn)個(gè)血 / CT
病人驗(yàn)完血 / 做完 CT 后,重新取號(hào),等待叫號(hào)(進(jìn)入入口等待隊(duì)列)
病人等到自己的號(hào),病人又重新拿著驗(yàn)血 / CT 報(bào)告去找醫(yī)生就診
整個(gè)流程如下
那么管程是如何解決互斥和同步的呢
首先來看互斥,上文中醫(yī)生即共享資源(也即共享變量),就診室即為臨界區(qū),病人即線程,任何病人如果想要訪問臨界區(qū),必須首先獲取共享資源(即醫(yī)生),入口一次只允許一個(gè)線程經(jīng)過,在共享資源被占有的情況下,如果再有線程想占有共享資源,就需要到等待隊(duì)列去等候,等到獲取共享資源的線程釋放資源后,等待隊(duì)列中的線程就可以去競(jìng)爭共享資源了,這樣就解決了互斥問題,所以本質(zhì)上管程是通過將共享資源及其對(duì)共享資源的操作(線程安全地獲取和釋放)封裝起來來保證互斥性的。
再來看同步,同步是通過文中的條件變量及其等待隊(duì)列實(shí)現(xiàn)的,同步的實(shí)現(xiàn)分兩種情況
病人進(jìn)入就診室后,無需做驗(yàn)血 / CT 等操作,于是醫(yī)生診斷完成后,就會(huì)釋放共享資源(解鎖)去通知(notify,notifyAll)入口等待隊(duì)列的下一個(gè)病人,下一個(gè)病人聽到叫號(hào)后就能看醫(yī)生了。
如果病人進(jìn)入就診室后需要做驗(yàn)血 / CT 等操作,會(huì)去驗(yàn)血 / CT 隊(duì)列(條件隊(duì)列)排隊(duì), 同時(shí)釋放共享變量(醫(yī)生),通知入口等待隊(duì)列的其他病人(線程)去獲取共享變量(醫(yī)生),獲得許可的線程執(zhí)行完臨界區(qū)的邏輯后會(huì)喚醒條件變量等待隊(duì)列中的線程,將它放到入口等待隊(duì)列中 ,等到其獲取共享變量(醫(yī)生)時(shí),即可進(jìn)入入口(臨界區(qū))處理。
在 Java 里,鎖大多是依賴于管程來實(shí)現(xiàn)的,以大家熟悉的內(nèi)置鎖 synchronized 為例,它的實(shí)現(xiàn)原理如下。
可以看到 synchronized 鎖也是基于管程實(shí)現(xiàn)的,只不過它只有且只有一個(gè)條件變量(就是鎖對(duì)象本身)而已,這也是為什么JDK 要實(shí)現(xiàn) Lock 鎖的原因之一,Lock 支持多個(gè)條件變量。
通過這樣的類比,相信大家對(duì)管程的工作機(jī)制有了比較清晰的認(rèn)識(shí),為啥要花這么大的力氣介紹管程呢,一來管程是解決并發(fā)問題的萬能鑰匙,二來 AQS 是基于 Java 并發(fā)包中管程的一種實(shí)現(xiàn),所以理解管程對(duì)我們理解 AQS 會(huì)大有幫助,接下來我們就來看看 AQS 是如何工作的。
AQS 實(shí)現(xiàn)原理
AQS 全稱是 AbstractQueuedSynchronizer,是一個(gè)用來構(gòu)建鎖和同步器的框架,它維護(hù)了一個(gè)共享資源 state 和一個(gè) FIFO 的等待隊(duì)列(即上文中管程的入口等待隊(duì)列),底層利用了 CAS 機(jī)制來保證操作的原子性。
AQS 實(shí)現(xiàn)鎖的主要原理如下:
以實(shí)現(xiàn)獨(dú)占鎖為例(即當(dāng)前資源只能被一個(gè)線程占有),其實(shí)現(xiàn)原理如下:state 初始化 0,在多線程條件下,線程要執(zhí)行臨界區(qū)的代碼,必須首先獲取 state,某個(gè)線程獲取成功之后, state 加 1,其他線程再獲取的話由于共享資源已被占用,所以會(huì)到 FIFO 等待隊(duì)列去等待,等占有 state 的線程執(zhí)行完臨界區(qū)的代碼釋放資源( state 減 1)后,會(huì)喚醒 FIFO 中的下一個(gè)等待線程(head 中的下一個(gè)結(jié)點(diǎn))去獲取 state。
state 由于是多線程共享變量,所以必須定義成 volatile,以保證 state 的可見性, 同時(shí)雖然 volatile 能保證可見性,但不能保證原子性,所以 AQS 提供了對(duì) state 的原子操作方法,保證了線程安全。
另外 AQS 中實(shí)現(xiàn)的 FIFO 隊(duì)列(CLH 隊(duì)列)其實(shí)是雙向鏈表實(shí)現(xiàn)的,由 head, tail 節(jié)點(diǎn)表示,head 結(jié)點(diǎn)代表當(dāng)前占用的線程,其他節(jié)點(diǎn)由于暫時(shí)獲取不到鎖所以依次排隊(duì)等待鎖釋放。
所以我們不難明白 AQS 的如下定義:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // 以下為雙向鏈表的首尾結(jié)點(diǎn),代表入口等待隊(duì)列 private transient volatile Node head; private transient volatile Node tail; // 共享變量 state private volatile int state; // cas 獲取 / 釋放 state,保證線程安全地獲取鎖 protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } // ... }
AQS 源碼
剖析ReentrantLock 是我們比較常用的一種鎖,也是基于 AQS 實(shí)現(xiàn)的,所以接下來我們就來分析一下 ReentrantLock 鎖的實(shí)現(xiàn)來一探 AQS 究竟。本文將會(huì)采用圖文并茂的方式讓大家理解 AQS 的實(shí)現(xiàn)原理,大家在學(xué)習(xí)過程中,可以多類比一下上文中就診的例子,相信會(huì)有助于理解。
首先我們要知道 ReentrantLock 是獨(dú)占鎖,也有公平和非公平兩種鎖模式,什么是獨(dú)占與有共享模式,什么又是公平鎖與非公平鎖
與獨(dú)占鎖對(duì)應(yīng)的是共享鎖,這兩者有什么區(qū)別呢
獨(dú)占鎖:即其他線程只有在占有鎖的線程釋放后才能競(jìng)爭鎖,有且只有一個(gè)線程能競(jìng)爭成功(醫(yī)生只有一個(gè),一次只能看一個(gè)病人)
共享鎖:即共享資源可以被多個(gè)線程同時(shí)占有,直到共享資源被占用完畢(多個(gè)醫(yī)生,可以看多個(gè)病人),常見的有讀寫鎖 ReadWriteLock, CountdownLatch,兩者的區(qū)別如下
什么是公平鎖與非公平鎖
還是以就醫(yī)為例,所謂公平鎖即大家取號(hào)后老老實(shí)實(shí)按照先來后到的順序在侯診室依次等待叫號(hào),如果是非公平鎖呢,新來的病人(線程)很霸道,不取號(hào)排隊(duì) ,直接去搶先看病,占有醫(yī)生(不一定成功)
公平鎖與非公平鎖
本文我們將會(huì)重點(diǎn)分析獨(dú)占,非公平模式的源碼實(shí)現(xiàn),不分析共享模式與 Condition 的實(shí)現(xiàn),因?yàn)槠饰隽霜?dú)占鎖的實(shí)現(xiàn),由于原理都是相似的,再分析共享與 Condition 就不難了。
首先我們先來看下 ReentrantLock 的使用方法
// 1. 初始化可重入鎖 private ReentrantLock lock = new ReentrantLock(); public void run() { // 加鎖 lock.lock(); try { // 2. 執(zhí)行臨界區(qū)代碼 } catch (InterruptedException e) { e.printStackTrace(); } finally { // 3. 解鎖 lock.unlock(); } }
第一步是初始化可重入鎖,可以看到默認(rèn)使用的是非公平鎖機(jī)制
public ReentrantLock() { sync = new NonfairSync(); }
當(dāng)然你也可以用如下構(gòu)造方法來指定使用公平鎖:
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
畫外音: FairSync 和 NonfairSync 是 ReentrantLock 實(shí)現(xiàn)的內(nèi)部類,分別指公平和非公平模式,ReentrantLock ReentrantLock 的加鎖(lock),解鎖(unlock)在內(nèi)部具體都是調(diào)用的 FairSync,NonfairSync 的加鎖和解鎖方法。
幾個(gè)類的關(guān)系如下:
我們先來剖析下非公平鎖(NonfairSync)的實(shí)現(xiàn)方式,來看上述示例代碼的第二步:加鎖,由于默認(rèn)的是非公平鎖的加鎖,所以我們來分析下非公平鎖是如何加鎖的
可以看到 lock 方法主要有兩步
使用 CAS 來獲取 state 資源,如果成功設(shè)置 1,代表 state 資源獲取鎖成功,此時(shí)記錄下當(dāng)前占用 state 的線程 setExclusiveOwnerThread(Thread.currentThread());
如果 CAS 設(shè)置 state 為 1 失敗(代表獲取鎖失敗),則執(zhí)行 acquire(1) 方法,這個(gè)方法是 AQS 提供的方法,如下
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire 剖析
首先 調(diào)用 tryAcquire 嘗試著獲取 state,如果成功,則跳過后面的步驟。如果失敗,則執(zhí)行 acquireQueued 將線程加入 CLH 等待隊(duì)列中。
先來看下 tryAcquire 方法,這個(gè)方法是 AQS 提供的一個(gè)模板方法,最終由其 AQS 具體的實(shí)現(xiàn)類(Sync)實(shí)現(xiàn),由于執(zhí)行的是非公平鎖邏輯,執(zhí)行的代碼如下:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 如果 c 等于0,表示此時(shí)資源是空閑的(即鎖是釋放的),再用 CAS 獲取鎖 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 此條件表示之前已有線程獲得鎖,且此線程再一次獲得了鎖,獲取資源次數(shù)再加 1,這也映證了 ReentrantLock 為可重入鎖 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
此段代碼可知鎖的獲取主要分兩種情況
state 為 0 時(shí),代表鎖已經(jīng)被釋放,可以去獲取,于是使用 CAS 去重新獲取鎖資源,如果獲取成功,則代表競(jìng)爭鎖成功,使用 setExclusiveOwnerThread(current) 記錄下此時(shí)占有鎖的線程,看到這里的 CAS,大家應(yīng)該不難理解為啥當(dāng)前實(shí)現(xiàn)是非公平鎖了,因?yàn)殛?duì)列中的線程與新線程都可以 CAS 獲取鎖啊,新來的線程不需要排隊(duì)
如果 state 不為 0,代表之前已有線程占有了鎖,如果此時(shí)的線程依然是之前占有鎖的線程(current == getExclusiveOwnerThread() 為 true),代表此線程再一次占有了鎖(可重入鎖),此時(shí)更新 state,記錄下鎖被占有的次數(shù)(鎖的重入次數(shù)),這里的 setState 方法不需要使用 CAS 更新,因?yàn)榇藭r(shí)的鎖就是當(dāng)前線程占有的,其他線程沒有機(jī)會(huì)進(jìn)入這段代碼執(zhí)行。所以此時(shí)更新 state 是線程安全的。
假設(shè)當(dāng)前 state = 0,即鎖不被占用,現(xiàn)在有 T1, T2, T3 這三個(gè)線程要去競(jìng)爭鎖
假設(shè)現(xiàn)在 T1 獲取鎖成功,則兩種情況分別為 1、 T1 首次獲取鎖成功
2、 T1 再次獲取鎖成功,state 再加 1,表示鎖被重入了兩次,當(dāng)前如果 T1一直申請(qǐng)占用鎖成功,state 會(huì)一直累加
acquireQueued 剖析
如果 tryAcquire(arg) 執(zhí)行失敗,代表獲取鎖失敗,則執(zhí)行 acquireQueued 方法,將線程加入 FIFO 等待隊(duì)列
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
所以接下來我們來看看 acquireQueued 的執(zhí)行邏輯,首先會(huì)調(diào)用 addWaiter(Node.EXCLUSIVE) 將包含有當(dāng)前線程的 Node 節(jié)點(diǎn)入隊(duì), Node.EXCLUSIVE 代表此結(jié)點(diǎn)為獨(dú)占模式
再來看下 addWaiter 是如何實(shí)現(xiàn)的
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 如果尾結(jié)點(diǎn)不為空,則用 CAS 將獲取鎖失敗的線程入隊(duì) if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 如果結(jié)點(diǎn)為空,執(zhí)行 enq 方法 enq(node); return node; }
這段邏輯比較清楚,首先是獲取 FIFO 隊(duì)列的尾結(jié)點(diǎn),如果尾結(jié)點(diǎn)存在,則采用 CAS 的方式將等待線程入隊(duì),如果尾結(jié)點(diǎn)為空則執(zhí)行 enq 方法
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 尾結(jié)點(diǎn)為空,說明 FIFO 隊(duì)列未初始化,所以先初始化其頭結(jié)點(diǎn) if (compareAndSetHead(new Node())) tail = head; } else { // 尾結(jié)點(diǎn)不為空,則將等待線程入隊(duì) node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
首先判斷 tail 是否為空,如果為空說明 FIFO 隊(duì)列的 head,tail 還未構(gòu)建,此時(shí)先構(gòu)建頭結(jié)點(diǎn),構(gòu)建之后再用 CAS 的方式將此線程結(jié)點(diǎn)入隊(duì)
使用 CAS 創(chuàng)建 head 節(jié)點(diǎn)的時(shí)候只是簡單調(diào)用了 new Node() 方法,并不像其他節(jié)點(diǎn)那樣記錄 thread,這是為啥
因?yàn)?head 結(jié)點(diǎn)為虛結(jié)點(diǎn),它只代表當(dāng)前有線程占用了 state,至于占用 state 的是哪個(gè)線程,其實(shí)是調(diào)用了上文的 setExclusiveOwnerThread(current) ,即記錄在 exclusiveOwnerThread 屬性里。
執(zhí)行完 addWaiter 后,線程入隊(duì)成功,現(xiàn)在就要看最后一個(gè)最關(guān)鍵的方法 acquireQueued 了,這個(gè)方法有點(diǎn)難以理解,先不急,我們先用三個(gè)線程來模擬一下之前的代碼對(duì)應(yīng)的步驟
1、假設(shè) T1 獲取鎖成功,由于此時(shí) FIFO 未初始化,所以先創(chuàng)建 head 結(jié)點(diǎn)
2、此時(shí) T2 或 T3 再去競(jìng)爭 state 失敗,入隊(duì),如下圖:
好了,現(xiàn)在問題來了, T2,T3 入隊(duì)后怎么處理,是馬上阻塞嗎,馬上阻塞意味著線程從運(yùn)行態(tài)轉(zhuǎn)為阻塞態(tài) ,涉及到用戶態(tài)向內(nèi)核態(tài)的切換,而且喚醒后也要從內(nèi)核態(tài)轉(zhuǎn)為用戶態(tài),開銷相對(duì)比較大,所以 AQS 對(duì)這種入隊(duì)的線程采用的方式是讓它們自旋來競(jìng)爭鎖,如下圖示
不過聰明的你可能發(fā)現(xiàn)了一個(gè)問題,如果當(dāng)前鎖是獨(dú)占鎖,如果鎖一直被被 T1 占有, T2,T3 一直自旋沒太大意義,反而會(huì)占用 CPU,影響性能,所以更合適的方式是它們自旋一兩次競(jìng)爭不到鎖后識(shí)趣地阻塞以等待前置節(jié)點(diǎn)釋放鎖后再來喚醒它。
另外如果鎖在自旋過程中被中斷了,或者自旋超時(shí)了,應(yīng)該處于「取消」?fàn)顟B(tài)。
基于每個(gè) Node 可能所處的狀態(tài),AQS 為其定義了一個(gè)變量 waitStatus,根據(jù)這個(gè)變量值對(duì)相應(yīng)節(jié)點(diǎn)進(jìn)行相關(guān)的操作,我們一起來看這看這個(gè)變量都有哪些值,是時(shí)候看一個(gè) Node 結(jié)點(diǎn)的屬性定義了
static final class Node { static final Node SHARED = new Node();//標(biāo)識(shí)等待節(jié)點(diǎn)處于共享模式 static final Node EXCLUSIVE = null;//標(biāo)識(shí)等待節(jié)點(diǎn)處于獨(dú)占模式 static final int CANCELLED = 1; //由于超時(shí)或中斷,節(jié)點(diǎn)已被取消 static final int SIGNAL = -1; // 節(jié)點(diǎn)阻塞(park)必須在其前驅(qū)結(jié)點(diǎn)為 SIGNAL 的狀態(tài)下才能進(jìn)行,如果結(jié)點(diǎn)為 SIGNAL,則其釋放鎖或取消后,可以通過 unpark 喚醒下一個(gè)節(jié)點(diǎn), static final int CONDITION = -2;//表示線程在等待條件變量(先獲取鎖,加入到條件等待隊(duì)列,然后釋放鎖,等待條件變量滿足條件;只有重新獲取鎖之后才能返回) static final int PROPAGATE = -3;//表示后續(xù)結(jié)點(diǎn)會(huì)傳播喚醒的操作,共享模式下起作用 //等待狀態(tài):對(duì)于condition節(jié)點(diǎn),初始化為CONDITION;其它情況,默認(rèn)為0,通過CAS操作原子更新 volatile int waitStatus;
通過狀態(tài)的定義,我們可以猜測(cè)一下 AQS 對(duì)線程自旋的處理:如果當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)不為 head,且它的狀態(tài)為 SIGNAL,則結(jié)點(diǎn)進(jìn)入阻塞狀態(tài)。來看下代碼以映證我們的猜測(cè):
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 如果前一個(gè)節(jié)點(diǎn)是 head,則嘗試自旋獲取鎖 if (p == head && tryAcquire(arg)) { // 將 head 結(jié)點(diǎn)指向當(dāng)前節(jié)點(diǎn),原 head 結(jié)點(diǎn)出隊(duì) setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 如果前一個(gè)節(jié)點(diǎn)不是 head 或者競(jìng)爭鎖失敗,則進(jìn)入阻塞狀態(tài) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 如果線程自旋中因?yàn)楫惓5仍颢@取鎖最終失敗,則調(diào)用此方法 cancelAcquire(node); } }
先來看第一種情況,如果當(dāng)前結(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)是 head 結(jié)點(diǎn),且獲取鎖(tryAcquire)成功的處理
可以看到主要的處理就是把 head 指向當(dāng)前節(jié)點(diǎn),并且讓原 head 結(jié)點(diǎn)出隊(duì),這樣由于原 head 不可達(dá), 會(huì)被垃圾回收。
注意其中 setHead 的處理
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
將 head 設(shè)置成當(dāng)前結(jié)點(diǎn)后,要把節(jié)點(diǎn)的 thread, pre 設(shè)置成 null,因?yàn)橹胺治鲞^了,head 是虛節(jié)點(diǎn),不保存除 waitStatus(結(jié)點(diǎn)狀態(tài))的其他信息,所以這里把 thread ,pre 置為空,因?yàn)檎加墟i的線程由 exclusiveThread 記錄了,如果 head 再記錄 thread 不僅多此一舉,反而在釋放鎖的時(shí)候要多操作一遍 head 的 thread 釋放。
如果前一個(gè)節(jié)點(diǎn)不是 head 或者競(jìng)爭鎖失敗,則首先調(diào)用 shouldParkAfterFailedAcquire 方法判斷鎖是否應(yīng)該停止自旋進(jìn)入阻塞狀態(tài):
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 1. 如果前置頂點(diǎn)的狀態(tài)為 SIGNAL,表示當(dāng)前節(jié)點(diǎn)可以阻塞了 return true; if (ws > 0) { // 2. 移除取消狀態(tài)的結(jié)點(diǎn) do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 3. 如果前置節(jié)點(diǎn)的 ws 不為 0,則其設(shè)置為 SIGNAL, compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
這一段代碼有點(diǎn)繞,需要稍微動(dòng)點(diǎn)腦子,按以上步驟一步步來看
1、 首先我們要明白,根據(jù)之前 Node 類的注釋,如果前驅(qū)節(jié)點(diǎn)為 SIGNAL,則當(dāng)前節(jié)點(diǎn)可以進(jìn)入阻塞狀態(tài)。
如圖示:T2,T3 的前驅(qū)節(jié)點(diǎn)的 waitStatus 都為 SIGNAL,所以 T2,T3 此時(shí)都可以阻塞。
2、如果前驅(qū)節(jié)點(diǎn)為取消狀態(tài),則前驅(qū)節(jié)點(diǎn)需要移除,這些采用了一個(gè)更巧妙的方法,把所有當(dāng)前節(jié)點(diǎn)之前所有 waitStatus 為取消狀態(tài)的節(jié)點(diǎn)全部移除,假設(shè)有四個(gè)線程如下,T2,T3 為取消狀態(tài),則執(zhí)行邏輯后如下圖所示,T2, T3 節(jié)點(diǎn)會(huì)被 GC。
3、如果前驅(qū)節(jié)點(diǎn)小于等于 0,則需要首先將其前驅(qū)節(jié)點(diǎn)置為 SIGNAL,因?yàn)榍拔奈覀兎治鲞^,當(dāng)前節(jié)點(diǎn)進(jìn)入阻塞的一個(gè)條件是前驅(qū)節(jié)點(diǎn)必須為 SIGNAL,這樣下一次自旋后發(fā)現(xiàn)前驅(qū)節(jié)點(diǎn)為 SIGNAL,就會(huì)返回 true(即步驟 1)
shouldParkAfterFailedAcquire 返回 true 代表線程可以進(jìn)入阻塞中斷,那么下一步 parkAndCheckInterrupt 就該讓線程阻塞了
private final boolean parkAndCheckInterrupt() { // 阻塞線程 LockSupport.park(this); // 返回線程是否中斷過,并且清除中斷狀態(tài)(在獲得鎖后會(huì)補(bǔ)一次中斷) return Thread.interrupted(); }
這里的阻塞線程很容易理解,但為啥要判斷線程是否中斷過呢,因?yàn)槿绻€程在阻塞期間收到了中斷,喚醒(轉(zhuǎn)為運(yùn)行態(tài))獲取鎖后(acquireQueued 為 true)需要補(bǔ)一個(gè)中斷,如下所示:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果是因?yàn)橹袛鄦拘训木€程,獲取鎖后需要補(bǔ)一下中斷 selfInterrupt(); }
至此,獲取鎖的流程已經(jīng)分析完畢,不過還有一個(gè)疑惑我們還沒解開:前文不是說 Node 狀態(tài)為取消狀態(tài)會(huì)被取消嗎,那 Node 什么時(shí)候會(huì)被設(shè)置為取消狀態(tài)呢。
回頭看 acquireQueued
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // 省略自旋獲取鎖代碼 } finally { if (failed) // 如果線程自旋中因?yàn)楫惓5仍颢@取鎖最終失敗,則調(diào)用此方法 cancelAcquire(node); } }
看最后一個(gè) cancelAcquire 方法,如果線程自旋中因?yàn)楫惓5仍颢@取鎖最終失敗,則會(huì)調(diào)用此方法
private void cancelAcquire(Node node) { // 如果節(jié)點(diǎn)為空,直接返回 if (node == null) return; // 由于線程要被取消了,所以將 thread 線程清掉 node.thread = null; // 下面這步表示將 node 的 pre 指向之前第一個(gè)非取消狀態(tài)的結(jié)點(diǎn)(即跳過所有取消狀態(tài)的結(jié)點(diǎn)),waitStatus > 0 表示當(dāng)前結(jié)點(diǎn)狀態(tài)為取消狀態(tài) Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 獲取經(jīng)過過濾后的 pre 的 next 結(jié)點(diǎn),這一步主要用在后面的 CAS 設(shè)置 pre 的 next 節(jié)點(diǎn)上 Node predNext = pred.next; // 將當(dāng)前結(jié)點(diǎn)設(shè)置為取消狀態(tài) node.waitStatus = Node.CANCELLED; // 如果當(dāng)前取消結(jié)點(diǎn)為尾結(jié)點(diǎn),使用 CAS 則將尾結(jié)點(diǎn)設(shè)置為其前驅(qū)節(jié)點(diǎn),如果設(shè)置成功,則尾結(jié)點(diǎn)的 next 指針設(shè)置為空 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // 這一步看得有點(diǎn)繞,我們想想,如果當(dāng)前節(jié)點(diǎn)取消了,那是不是要把當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)指向當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn),但是我們之前也說了,要喚醒或阻塞結(jié)點(diǎn),須在其前驅(qū)節(jié)點(diǎn)的狀態(tài)為 SIGNAL 的條件才能操作,所以在設(shè)置 pre 的 next 節(jié)點(diǎn)時(shí)要保證 pre 結(jié)點(diǎn)的狀態(tài)為 SIGNAL,想通了這一點(diǎn)相信你不難理解以下代碼。 int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 如果 pre 為 head,或者 pre 的狀態(tài)設(shè)置 SIGNAL 失敗,則直接喚醒后繼結(jié)點(diǎn)去競(jìng)爭鎖,之前我們說過, SIGNAL 的結(jié)點(diǎn)取消(或釋放鎖)后可以喚醒后繼結(jié)點(diǎn) unparkSuccessor(node); } node.next = node; // help GC } }
這一段代碼有點(diǎn)繞,我們一個(gè)個(gè)來看,首先考慮以下情況
1、首先第一步當(dāng)前節(jié)點(diǎn)之前有取消結(jié)點(diǎn)時(shí),則邏輯如下
2、如果當(dāng)前結(jié)點(diǎn)既非頭結(jié)點(diǎn)的后繼結(jié)點(diǎn),也非尾結(jié)點(diǎn),即步驟 1 所示,則最終結(jié)果如下
這里肯定有人問,這種情況下當(dāng)前節(jié)點(diǎn)與它的前驅(qū)結(jié)點(diǎn)無法被 GC 啊,還記得我們上文分析鎖自旋時(shí)的處理嗎,再看下以下代碼
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 省略無關(guān)代碼 if (ws > 0) { // 移除取消狀態(tài)的結(jié)點(diǎn) do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } return false; }
這段代碼會(huì)將 node 的 pre 指向之前 waitStatus 為非 CANCEL 的節(jié)點(diǎn)
所以當(dāng) T4 執(zhí)行這段代碼時(shí),會(huì)變成如下情況
可以看到此時(shí)中間的兩個(gè) CANCEL 節(jié)點(diǎn)不可達(dá)了,會(huì)被 GC
3、如果當(dāng)前結(jié)點(diǎn)為 tail 結(jié)點(diǎn),則結(jié)果如下,這種情況下當(dāng)前結(jié)點(diǎn)不可達(dá),會(huì)被 GC
4、如果當(dāng)前結(jié)點(diǎn)為 head 的后繼結(jié)點(diǎn)時(shí),如下
結(jié)果中的 CANCEL 結(jié)點(diǎn)同樣會(huì)在 tail 結(jié)點(diǎn)自旋調(diào)用 shouldParkAfterFailedAcquire 后不可達(dá),如下
自此我們終于分析完了鎖的獲取流程,接下來我們來看看鎖是如何釋放的。
鎖釋放
不管是公平鎖還是非公平鎖,最終都是調(diào)的 AQS 的如下模板方法來釋放鎖
// java.util.concurrent.locks.AbstractQueuedSynchronizer public final boolean release(int arg) { // 鎖釋放是否成功 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease 方法定義在了 AQS 的子類 Sync 方法里
// java.util.concurrent.locks.ReentrantLock.Sync protected final boolean tryRelease(int releases) { int c = getState() - releases; // 只有持有鎖的線程才能釋放鎖,所以如果當(dāng)前鎖不是持有鎖的線程,則拋異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 說明線程持有的鎖全部釋放了,需要釋放 exclusiveOwnerThread 的持有線程 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
鎖釋放成功后該干嘛,顯然是喚醒之后 head 之后節(jié)點(diǎn),讓它來競(jìng)爭鎖
// java.util.concurrent.locks.AbstractQueuedSynchronizer public final boolean release(int arg) { // 鎖釋放是否成功 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 鎖釋放成功后,喚醒 head 之后的節(jié)點(diǎn),讓它來競(jìng)爭鎖 unparkSuccessor(h); return true; } return false; }
這里釋放鎖的條件為啥是 h != null && h.waitStatus != 0 呢。
如果 h == null, 這有兩種可能,一種是一個(gè)線程在競(jìng)爭鎖,現(xiàn)在它釋放了,當(dāng)然沒有所謂的喚醒后繼節(jié)點(diǎn),一種是其他線程正在運(yùn)行競(jìng)爭鎖,只是還未初始化頭節(jié)點(diǎn),既然其他線程正在運(yùn)行,也就無需執(zhí)行喚醒操作
如果 h != null 且 h.waitStatus == 0,說明 head 的后繼節(jié)點(diǎn)正在自旋競(jìng)爭鎖,也就是說線程是運(yùn)行狀態(tài)的,無需喚醒。
如果 h != null 且 h.waitStatus < 0, 此時(shí) waitStatus 值可能為 SIGNAL,或 PROPAGATE,這兩種情況說明后繼結(jié)點(diǎn)阻塞需要被喚醒
來看一下喚醒方法 unparkSuccessor:
private void unparkSuccessor(Node node) { // 獲取 head 的 waitStatus(假設(shè)其為 SIGNAL),并用 CAS 將其置為 0,為啥要做這一步呢,之前我們分析過多次,其實(shí) waitStatus = SIGNAL(< -1)或 PROPAGATE(-·3) 只是一個(gè)標(biāo)志,代表在此狀態(tài)下,后繼節(jié)點(diǎn)可以喚醒,既然正在喚醒后繼節(jié)點(diǎn),自然可以將其重置為 0,當(dāng)然如果失敗了也不影響其喚醒后繼結(jié)點(diǎn) int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 以下操作為獲取隊(duì)列第一個(gè)非取消狀態(tài)的結(jié)點(diǎn),并將其喚醒 Node s = node.next; // s 狀態(tài)為非空,或者其為取消狀態(tài),說明 s 是無效節(jié)點(diǎn),此時(shí)需要執(zhí)行 if 里的邏輯 if (s == null || s.waitStatus > 0) { s = null; // 以下操作為從尾向前獲取最后一個(gè)非取消狀態(tài)的結(jié)點(diǎn) for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
這里的尋找隊(duì)列的第一個(gè)非取消狀態(tài)的節(jié)點(diǎn)為啥要從后往前找呢,因?yàn)楣?jié)點(diǎn)入隊(duì)并不是原子操作,如下
線程自旋時(shí)時(shí)是先執(zhí)行 node.pre = pred, 然后再執(zhí)行 pred.next = node,如果 unparkSuccessor 剛好在這兩者之間執(zhí)行,此時(shí)是找不到 head 的后繼節(jié)點(diǎn)的,如下
如何利用 AQS 自定義一個(gè)互斥鎖
AQS 通過提供 state 及 FIFO 隊(duì)列的管理,為我們提供了一套通用的實(shí)現(xiàn)鎖的底層方法,基本上定義一個(gè)鎖,都是轉(zhuǎn)為在其內(nèi)部定義 AQS 的子類,調(diào)用 AQS 的底層方法來實(shí)現(xiàn)的,由于 AQS 在底層已經(jīng)為了定義好了這些獲取 state 及 FIFO 隊(duì)列的管理工作,我們要實(shí)現(xiàn)一個(gè)鎖就比較簡單了,我們可以基于 AQS 來實(shí)現(xiàn)一個(gè)非可重入的互斥鎖,如下所示
public class Mutex { private Sync sync = new Sync(); public void lock () { sync.acquire(1); } public void unlock () { sync.release(1); } private static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire (int arg) { return compareAndSetState(0, 1); } @Override protected boolean tryRelease (int arg) { setState(0); return true; } @Override protected boolean isHeldExclusively () { return getState() == 1; } } }
可以看到區(qū)區(qū)幾行代碼就實(shí)現(xiàn)了,確實(shí)很方便。
感謝各位的閱讀,以上就是“怎么掌握AQS”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)怎么掌握AQS這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。