溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

怎么掌握AQS

發(fā)布時(shí)間:2021-10-26 13:45:10 來源:億速云 閱讀:128 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要講解了“怎么掌握AQS”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“怎么掌握AQS”吧!

鎖原理 - 信號(hào)量 vs  管程

在并發(fā)編程領(lǐng)域,有兩大核心問題:互斥與同步,互斥即同一時(shí)刻只允許一個(gè)線程訪問共享資源,同步,即線程之間如何通信、協(xié)作,一般這兩大問題可以通過信號(hào)量和管程來解決。

信號(hào)量

信號(hào)量(Semaphore)是操作系統(tǒng)提供的一種進(jìn)程間常見的通信方式,主要用來協(xié)調(diào)并發(fā)程序?qū)蚕碣Y源的訪問,操作系統(tǒng)可以保證對(duì)信號(hào)量操作的原子性。它是怎么實(shí)現(xiàn)的呢。

  • 信號(hào)量由一個(gè)共享整型變量 S 和兩個(gè)原子操作 PV 組成,S 只能通過 P 和 V 操作來改變

  • P 操作:即請(qǐng)求資源,意味著 S 要減 1,如果 S < 0, 則表示沒有資源了,此時(shí)線程要進(jìn)入等待隊(duì)列(同步隊(duì)列)等待

  • V 操作: 即釋放資源,意味著 S 要加 1, 如果 S 小于等于 0,說明等待隊(duì)列里有線程,此時(shí)就需要喚醒線程。

示意圖如下

怎么掌握AQS

信號(hào)量機(jī)制的引入解決了進(jìn)程同步和互斥問題,但信號(hào)量的大量同步操作分散在各個(gè)進(jìn)程中不便于管理,還有可能導(dǎo)致系統(tǒng)死鎖。如:生產(chǎn)者消費(fèi)者問題中將P、V顛倒可能死鎖(見文末參考鏈接),另外條件越多,需要的信號(hào)量就越多,需要更加謹(jǐn)慎地處理信號(hào)量之間的處理順序,否則很容易造成死鎖現(xiàn)象。

基于信號(hào)量給編程帶來的隱患,于是有了提出了對(duì)開發(fā)者更加友好的并發(fā)編程模型-管程

管程

Dijkstra 于 1971  年提出:把所有進(jìn)程對(duì)某一種臨界資源的同步操作都集中起來,構(gòu)成一個(gè)所謂的秘書進(jìn)程。凡要訪問該臨界資源的進(jìn)程,都需先報(bào)告秘書,由秘書來實(shí)現(xiàn)諸進(jìn)程對(duì)同一臨界資源的互斥使用,這種機(jī)制就是管程。

管程是一種在信號(hào)量機(jī)制上進(jìn)行改進(jìn)的并發(fā)編程模型,解決了信號(hào)量在臨界區(qū)的 PV 操作上配對(duì)的麻煩,把配對(duì)的 PV  操作集中在一起而形成的并發(fā)編程方法理論,極大降低了使用和理解成本。

  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2. 管程由四部分組成:

  3. 管程內(nèi)部的共享變量。

  4. 管程內(nèi)部的條件變量。

  5. 管程內(nèi)部并行執(zhí)行的進(jìn)程。

對(duì)于局部與管程內(nèi)部的共享數(shù)據(jù)設(shè)置初始值的語句。

由此可見,管程就是一個(gè)對(duì)象監(jiān)視器。任何線程想要訪問該資源(共享變量),就要排隊(duì)進(jìn)入監(jiān)控范圍。進(jìn)入之后,接受檢查,不符合條件,則要繼續(xù)等待,直到被通知,然后繼續(xù)進(jìn)入監(jiān)視器。

需要注意的事,信號(hào)量和管程兩者是等價(jià)的,信號(hào)量可以實(shí)現(xiàn)管程,管程也可以實(shí)現(xiàn)信號(hào)量,只是兩者的表現(xiàn)形式不同而已,管程對(duì)開發(fā)者更加友好。

兩者的區(qū)別如下

怎么掌握AQS

管程為了解決信號(hào)量在臨界區(qū)的 PV 操作上的配對(duì)的麻煩,把配對(duì)的 PV  操作集中在一起,并且加入了條件變量的概念,使得在多條件下線程間的同步實(shí)現(xiàn)變得更加簡單。

怎么理解管程中的入口等待隊(duì)列,共享變量,條件變量等概念,有時(shí)候技術(shù)上的概念較難理解,我們可以借助生活中的場(chǎng)景來幫助我們理解,就以我們的就醫(yī)場(chǎng)景為例來簡單說明一下,正常的就醫(yī)流程如下:

  1. 病人去掛號(hào)后,去侯診室等待叫號(hào)

  2. 叫到自己時(shí),就可以進(jìn)入就診室就診了

  3. 就診時(shí),有兩種情況,一種是醫(yī)生很快就確定病人的病,并作出診斷,診斷完成后,就通知下一位病人進(jìn)來就診,一種是醫(yī)生無法確定病因,需要病人去做個(gè)驗(yàn)血 / CT  檢查才能確定病情,于是病人就先去驗(yàn)個(gè)血 / CT

  4. 病人驗(yàn)完血 / 做完 CT 后,重新取號(hào),等待叫號(hào)(進(jìn)入入口等待隊(duì)列)

  5. 病人等到自己的號(hào),病人又重新拿著驗(yàn)血 / CT 報(bào)告去找醫(yī)生就診

整個(gè)流程如下

怎么掌握AQS

那么管程是如何解決互斥和同步的呢

首先來看互斥,上文中醫(yī)生即共享資源(也即共享變量),就診室即為臨界區(qū),病人即線程,任何病人如果想要訪問臨界區(qū),必須首先獲取共享資源(即醫(yī)生),入口一次只允許一個(gè)線程經(jīng)過,在共享資源被占有的情況下,如果再有線程想占有共享資源,就需要到等待隊(duì)列去等候,等到獲取共享資源的線程釋放資源后,等待隊(duì)列中的線程就可以去競(jìng)爭共享資源了,這樣就解決了互斥問題,所以本質(zhì)上管程是通過將共享資源及其對(duì)共享資源的操作(線程安全地獲取和釋放)封裝起來來保證互斥性的。

再來看同步,同步是通過文中的條件變量及其等待隊(duì)列實(shí)現(xiàn)的,同步的實(shí)現(xiàn)分兩種情況

  1. 病人進(jìn)入就診室后,無需做驗(yàn)血 / CT  等操作,于是醫(yī)生診斷完成后,就會(huì)釋放共享資源(解鎖)去通知(notify,notifyAll)入口等待隊(duì)列的下一個(gè)病人,下一個(gè)病人聽到叫號(hào)后就能看醫(yī)生了。

  2. 如果病人進(jìn)入就診室后需要做驗(yàn)血 / CT 等操作,會(huì)去驗(yàn)血 / CT 隊(duì)列(條件隊(duì)列)排隊(duì),  同時(shí)釋放共享變量(醫(yī)生),通知入口等待隊(duì)列的其他病人(線程)去獲取共享變量(醫(yī)生),獲得許可的線程執(zhí)行完臨界區(qū)的邏輯后會(huì)喚醒條件變量等待隊(duì)列中的線程,將它放到入口等待隊(duì)列中  ,等到其獲取共享變量(醫(yī)生)時(shí),即可進(jìn)入入口(臨界區(qū))處理。

在 Java 里,鎖大多是依賴于管程來實(shí)現(xiàn)的,以大家熟悉的內(nèi)置鎖 synchronized 為例,它的實(shí)現(xiàn)原理如下。

怎么掌握AQS

可以看到 synchronized 鎖也是基于管程實(shí)現(xiàn)的,只不過它只有且只有一個(gè)條件變量(就是鎖對(duì)象本身)而已,這也是為什么JDK 要實(shí)現(xiàn) Lock  鎖的原因之一,Lock 支持多個(gè)條件變量。

通過這樣的類比,相信大家對(duì)管程的工作機(jī)制有了比較清晰的認(rèn)識(shí),為啥要花這么大的力氣介紹管程呢,一來管程是解決并發(fā)問題的萬能鑰匙,二來 AQS 是基于  Java 并發(fā)包中管程的一種實(shí)現(xiàn),所以理解管程對(duì)我們理解 AQS 會(huì)大有幫助,接下來我們就來看看 AQS 是如何工作的。

AQS 實(shí)現(xiàn)原理

AQS 全稱是 AbstractQueuedSynchronizer,是一個(gè)用來構(gòu)建鎖和同步器的框架,它維護(hù)了一個(gè)共享資源 state  和一個(gè) FIFO 的等待隊(duì)列(即上文中管程的入口等待隊(duì)列),底層利用了 CAS 機(jī)制來保證操作的原子性。

AQS 實(shí)現(xiàn)鎖的主要原理如下:

怎么掌握AQS

以實(shí)現(xiàn)獨(dú)占鎖為例(即當(dāng)前資源只能被一個(gè)線程占有),其實(shí)現(xiàn)原理如下:state 初始化 0,在多線程條件下,線程要執(zhí)行臨界區(qū)的代碼,必須首先獲取  state,某個(gè)線程獲取成功之后, state 加 1,其他線程再獲取的話由于共享資源已被占用,所以會(huì)到 FIFO 等待隊(duì)列去等待,等占有 state  的線程執(zhí)行完臨界區(qū)的代碼釋放資源( state 減 1)后,會(huì)喚醒 FIFO 中的下一個(gè)等待線程(head 中的下一個(gè)結(jié)點(diǎn))去獲取 state。

state 由于是多線程共享變量,所以必須定義成 volatile,以保證 state 的可見性, 同時(shí)雖然 volatile  能保證可見性,但不能保證原子性,所以 AQS 提供了對(duì) state 的原子操作方法,保證了線程安全。

另外 AQS 中實(shí)現(xiàn)的 FIFO 隊(duì)列(CLH 隊(duì)列)其實(shí)是雙向鏈表實(shí)現(xiàn)的,由 head, tail 節(jié)點(diǎn)表示,head  結(jié)點(diǎn)代表當(dāng)前占用的線程,其他節(jié)點(diǎn)由于暫時(shí)獲取不到鎖所以依次排隊(duì)等待鎖釋放。

所以我們不難明白 AQS 的如下定義:

public abstract class AbstractQueuedSynchronizer   extends AbstractOwnableSynchronizer     implements java.io.Serializable {     // 以下為雙向鏈表的首尾結(jié)點(diǎn),代表入口等待隊(duì)列     private transient volatile Node head;     private transient volatile Node tail;     // 共享變量 state     private volatile int state;     // cas 獲取 / 釋放 state,保證線程安全地獲取鎖     protected final boolean compareAndSetState(int expect, int update) {         // See below for intrinsics setup to support this         return unsafe.compareAndSwapInt(this, stateOffset, expect, update);     }     // ...  }

AQS 源碼

剖析ReentrantLock 是我們比較常用的一種鎖,也是基于 AQS 實(shí)現(xiàn)的,所以接下來我們就來分析一下 ReentrantLock  鎖的實(shí)現(xiàn)來一探 AQS 究竟。本文將會(huì)采用圖文并茂的方式讓大家理解 AQS  的實(shí)現(xiàn)原理,大家在學(xué)習(xí)過程中,可以多類比一下上文中就診的例子,相信會(huì)有助于理解。

首先我們要知道 ReentrantLock 是獨(dú)占鎖,也有公平和非公平兩種鎖模式,什么是獨(dú)占與有共享模式,什么又是公平鎖與非公平鎖

與獨(dú)占鎖對(duì)應(yīng)的是共享鎖,這兩者有什么區(qū)別呢

獨(dú)占鎖:即其他線程只有在占有鎖的線程釋放后才能競(jìng)爭鎖,有且只有一個(gè)線程能競(jìng)爭成功(醫(yī)生只有一個(gè),一次只能看一個(gè)病人)

共享鎖:即共享資源可以被多個(gè)線程同時(shí)占有,直到共享資源被占用完畢(多個(gè)醫(yī)生,可以看多個(gè)病人),常見的有讀寫鎖 ReadWriteLock,  CountdownLatch,兩者的區(qū)別如下

怎么掌握AQS

什么是公平鎖與非公平鎖

還是以就醫(yī)為例,所謂公平鎖即大家取號(hào)后老老實(shí)實(shí)按照先來后到的順序在侯診室依次等待叫號(hào),如果是非公平鎖呢,新來的病人(線程)很霸道,不取號(hào)排隊(duì)  ,直接去搶先看病,占有醫(yī)生(不一定成功)

怎么掌握AQS

公平鎖與非公平鎖

本文我們將會(huì)重點(diǎn)分析獨(dú)占,非公平模式的源碼實(shí)現(xiàn),不分析共享模式與 Condition 的實(shí)現(xiàn),因?yàn)槠饰隽霜?dú)占鎖的實(shí)現(xiàn),由于原理都是相似的,再分析共享與  Condition 就不難了。

首先我們先來看下 ReentrantLock 的使用方法

// 1. 初始化可重入鎖 private ReentrantLock lock = new ReentrantLock(); public void run() {     // 加鎖     lock.lock();     try {         // 2. 執(zhí)行臨界區(qū)代碼     } catch (InterruptedException e) {         e.printStackTrace();     } finally {         // 3. 解鎖         lock.unlock();     } }

第一步是初始化可重入鎖,可以看到默認(rèn)使用的是非公平鎖機(jī)制

public ReentrantLock() {     sync = new NonfairSync(); }

當(dāng)然你也可以用如下構(gòu)造方法來指定使用公平鎖:

public ReentrantLock(boolean fair) {     sync = fair ? new FairSync() : new NonfairSync(); }

畫外音: FairSync 和 NonfairSync 是 ReentrantLock 實(shí)現(xiàn)的內(nèi)部類,分別指公平和非公平模式,ReentrantLock  ReentrantLock 的加鎖(lock),解鎖(unlock)在內(nèi)部具體都是調(diào)用的 FairSync,NonfairSync 的加鎖和解鎖方法。

幾個(gè)類的關(guān)系如下:

怎么掌握AQS

我們先來剖析下非公平鎖(NonfairSync)的實(shí)現(xiàn)方式,來看上述示例代碼的第二步:加鎖,由于默認(rèn)的是非公平鎖的加鎖,所以我們來分析下非公平鎖是如何加鎖的

怎么掌握AQS

可以看到 lock 方法主要有兩步

  1. 使用 CAS 來獲取 state 資源,如果成功設(shè)置 1,代表 state 資源獲取鎖成功,此時(shí)記錄下當(dāng)前占用 state 的線程  setExclusiveOwnerThread(Thread.currentThread());

  2. 如果 CAS 設(shè)置 state 為 1 失敗(代表獲取鎖失敗),則執(zhí)行 acquire(1) 方法,這個(gè)方法是 AQS 提供的方法,如下

public final void acquire(int arg) {     if (!tryAcquire(arg) &&         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))         selfInterrupt(); }

tryAcquire 剖析

首先 調(diào)用 tryAcquire 嘗試著獲取 state,如果成功,則跳過后面的步驟。如果失敗,則執(zhí)行 acquireQueued 將線程加入 CLH  等待隊(duì)列中。

先來看下 tryAcquire 方法,這個(gè)方法是 AQS 提供的一個(gè)模板方法,最終由其 AQS  具體的實(shí)現(xiàn)類(Sync)實(shí)現(xiàn),由于執(zhí)行的是非公平鎖邏輯,執(zhí)行的代碼如下:

final boolean nonfairTryAcquire(int acquires) {     final Thread current = Thread.currentThread();     int c = getState();      if (c == 0) {         // 如果 c 等于0,表示此時(shí)資源是空閑的(即鎖是釋放的),再用 CAS 獲取鎖         if (compareAndSetState(0, acquires)) {             setExclusiveOwnerThread(current);             return true;         }     }     else if (current == getExclusiveOwnerThread()) {         // 此條件表示之前已有線程獲得鎖,且此線程再一次獲得了鎖,獲取資源次數(shù)再加 1,這也映證了 ReentrantLock 為可重入鎖         int nextc = c + acquires;         if (nextc < 0) // overflow             throw new Error("Maximum lock count exceeded");         setState(nextc);         return true;     }     return false; }

此段代碼可知鎖的獲取主要分兩種情況

  1. state 為 0 時(shí),代表鎖已經(jīng)被釋放,可以去獲取,于是使用 CAS 去重新獲取鎖資源,如果獲取成功,則代表競(jìng)爭鎖成功,使用  setExclusiveOwnerThread(current) 記錄下此時(shí)占有鎖的線程,看到這里的  CAS,大家應(yīng)該不難理解為啥當(dāng)前實(shí)現(xiàn)是非公平鎖了,因?yàn)殛?duì)列中的線程與新線程都可以 CAS 獲取鎖啊,新來的線程不需要排隊(duì)

  2. 如果 state 不為 0,代表之前已有線程占有了鎖,如果此時(shí)的線程依然是之前占有鎖的線程(current ==  getExclusiveOwnerThread() 為 true),代表此線程再一次占有了鎖(可重入鎖),此時(shí)更新  state,記錄下鎖被占有的次數(shù)(鎖的重入次數(shù)),這里的 setState 方法不需要使用 CAS  更新,因?yàn)榇藭r(shí)的鎖就是當(dāng)前線程占有的,其他線程沒有機(jī)會(huì)進(jìn)入這段代碼執(zhí)行。所以此時(shí)更新 state 是線程安全的。

假設(shè)當(dāng)前 state = 0,即鎖不被占用,現(xiàn)在有 T1, T2, T3 這三個(gè)線程要去競(jìng)爭鎖

怎么掌握AQS

假設(shè)現(xiàn)在 T1 獲取鎖成功,則兩種情況分別為 1、 T1 首次獲取鎖成功

怎么掌握AQS

2、 T1 再次獲取鎖成功,state 再加 1,表示鎖被重入了兩次,當(dāng)前如果 T1一直申請(qǐng)占用鎖成功,state 會(huì)一直累加

怎么掌握AQS

acquireQueued 剖析

如果 tryAcquire(arg) 執(zhí)行失敗,代表獲取鎖失敗,則執(zhí)行 acquireQueued 方法,將線程加入 FIFO 等待隊(duì)列

public final void acquire(int arg) {     if (!tryAcquire(arg) &&         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))         selfInterrupt(); }

所以接下來我們來看看 acquireQueued 的執(zhí)行邏輯,首先會(huì)調(diào)用 addWaiter(Node.EXCLUSIVE) 將包含有當(dāng)前線程的 Node  節(jié)點(diǎn)入隊(duì), Node.EXCLUSIVE 代表此結(jié)點(diǎn)為獨(dú)占模式

再來看下 addWaiter 是如何實(shí)現(xiàn)的

private Node addWaiter(Node mode) {     Node node = new Node(Thread.currentThread(), mode);     Node pred = tail;     // 如果尾結(jié)點(diǎn)不為空,則用 CAS 將獲取鎖失敗的線程入隊(duì)     if (pred != null) {         node.prev = pred;         if (compareAndSetTail(pred, node)) {             pred.next = node;             return node;         }     }     // 如果結(jié)點(diǎn)為空,執(zhí)行 enq 方法     enq(node);     return node; }

這段邏輯比較清楚,首先是獲取 FIFO 隊(duì)列的尾結(jié)點(diǎn),如果尾結(jié)點(diǎn)存在,則采用 CAS 的方式將等待線程入隊(duì),如果尾結(jié)點(diǎn)為空則執(zhí)行 enq 方法

private Node enq(final Node node) {     for (;;) {         Node t = tail;         if (t == null) {             // 尾結(jié)點(diǎn)為空,說明 FIFO 隊(duì)列未初始化,所以先初始化其頭結(jié)點(diǎn)             if (compareAndSetHead(new Node()))                 tail = head;         } else {             // 尾結(jié)點(diǎn)不為空,則將等待線程入隊(duì)             node.prev = t;             if (compareAndSetTail(t, node)) {                 t.next = node;                 return t;             }         }     } }

首先判斷 tail 是否為空,如果為空說明 FIFO 隊(duì)列的 head,tail 還未構(gòu)建,此時(shí)先構(gòu)建頭結(jié)點(diǎn),構(gòu)建之后再用 CAS  的方式將此線程結(jié)點(diǎn)入隊(duì)

使用 CAS 創(chuàng)建 head 節(jié)點(diǎn)的時(shí)候只是簡單調(diào)用了 new Node() 方法,并不像其他節(jié)點(diǎn)那樣記錄 thread,這是為啥

因?yàn)?head 結(jié)點(diǎn)為虛結(jié)點(diǎn),它只代表當(dāng)前有線程占用了 state,至于占用 state 的是哪個(gè)線程,其實(shí)是調(diào)用了上文的  setExclusiveOwnerThread(current) ,即記錄在 exclusiveOwnerThread 屬性里。

執(zhí)行完 addWaiter 后,線程入隊(duì)成功,現(xiàn)在就要看最后一個(gè)最關(guān)鍵的方法 acquireQueued  了,這個(gè)方法有點(diǎn)難以理解,先不急,我們先用三個(gè)線程來模擬一下之前的代碼對(duì)應(yīng)的步驟

1、假設(shè) T1 獲取鎖成功,由于此時(shí) FIFO 未初始化,所以先創(chuàng)建 head 結(jié)點(diǎn)

怎么掌握AQS

2、此時(shí) T2 或 T3 再去競(jìng)爭 state 失敗,入隊(duì),如下圖:

怎么掌握AQS

好了,現(xiàn)在問題來了, T2,T3 入隊(duì)后怎么處理,是馬上阻塞嗎,馬上阻塞意味著線程從運(yùn)行態(tài)轉(zhuǎn)為阻塞態(tài)  ,涉及到用戶態(tài)向內(nèi)核態(tài)的切換,而且喚醒后也要從內(nèi)核態(tài)轉(zhuǎn)為用戶態(tài),開銷相對(duì)比較大,所以 AQS 對(duì)這種入隊(duì)的線程采用的方式是讓它們自旋來競(jìng)爭鎖,如下圖示

怎么掌握AQS

不過聰明的你可能發(fā)現(xiàn)了一個(gè)問題,如果當(dāng)前鎖是獨(dú)占鎖,如果鎖一直被被 T1 占有, T2,T3 一直自旋沒太大意義,反而會(huì)占用  CPU,影響性能,所以更合適的方式是它們自旋一兩次競(jìng)爭不到鎖后識(shí)趣地阻塞以等待前置節(jié)點(diǎn)釋放鎖后再來喚醒它。

另外如果鎖在自旋過程中被中斷了,或者自旋超時(shí)了,應(yīng)該處于「取消」?fàn)顟B(tài)。

基于每個(gè) Node 可能所處的狀態(tài),AQS 為其定義了一個(gè)變量  waitStatus,根據(jù)這個(gè)變量值對(duì)相應(yīng)節(jié)點(diǎn)進(jìn)行相關(guān)的操作,我們一起來看這看這個(gè)變量都有哪些值,是時(shí)候看一個(gè) Node 結(jié)點(diǎn)的屬性定義了

static final class Node {     static final Node SHARED = new Node();//標(biāo)識(shí)等待節(jié)點(diǎn)處于共享模式     static final Node EXCLUSIVE = null;//標(biāo)識(shí)等待節(jié)點(diǎn)處于獨(dú)占模式      static final int CANCELLED = 1; //由于超時(shí)或中斷,節(jié)點(diǎn)已被取消     static final int SIGNAL = -1;  // 節(jié)點(diǎn)阻塞(park)必須在其前驅(qū)結(jié)點(diǎn)為 SIGNAL 的狀態(tài)下才能進(jìn)行,如果結(jié)點(diǎn)為 SIGNAL,則其釋放鎖或取消后,可以通過 unpark 喚醒下一個(gè)節(jié)點(diǎn),     static final int CONDITION = -2;//表示線程在等待條件變量(先獲取鎖,加入到條件等待隊(duì)列,然后釋放鎖,等待條件變量滿足條件;只有重新獲取鎖之后才能返回)     static final int PROPAGATE = -3;//表示后續(xù)結(jié)點(diǎn)會(huì)傳播喚醒的操作,共享模式下起作用      //等待狀態(tài):對(duì)于condition節(jié)點(diǎn),初始化為CONDITION;其它情況,默認(rèn)為0,通過CAS操作原子更新     volatile int waitStatus;

通過狀態(tài)的定義,我們可以猜測(cè)一下 AQS 對(duì)線程自旋的處理:如果當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)不為 head,且它的狀態(tài)為  SIGNAL,則結(jié)點(diǎn)進(jìn)入阻塞狀態(tài)。來看下代碼以映證我們的猜測(cè):

final boolean acquireQueued(final Node node, int arg) {     boolean failed = true;     try {         boolean interrupted = false;         for (;;) {             final Node p = node.predecessor();             // 如果前一個(gè)節(jié)點(diǎn)是 head,則嘗試自旋獲取鎖             if (p == head && tryAcquire(arg)) {                 //  將 head 結(jié)點(diǎn)指向當(dāng)前節(jié)點(diǎn),原 head 結(jié)點(diǎn)出隊(duì)                 setHead(node);                 p.next = null; // help GC                 failed = false;                 return interrupted;             }             // 如果前一個(gè)節(jié)點(diǎn)不是 head 或者競(jìng)爭鎖失敗,則進(jìn)入阻塞狀態(tài)             if (shouldParkAfterFailedAcquire(p, node) &&                 parkAndCheckInterrupt())                 interrupted = true;         }     } finally {         if (failed)             // 如果線程自旋中因?yàn)楫惓5仍颢@取鎖最終失敗,則調(diào)用此方法             cancelAcquire(node);     } }

先來看第一種情況,如果當(dāng)前結(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)是 head 結(jié)點(diǎn),且獲取鎖(tryAcquire)成功的處理

怎么掌握AQS

可以看到主要的處理就是把 head 指向當(dāng)前節(jié)點(diǎn),并且讓原 head 結(jié)點(diǎn)出隊(duì),這樣由于原 head 不可達(dá), 會(huì)被垃圾回收。

注意其中 setHead 的處理

private void setHead(Node node) {     head = node;     node.thread = null;     node.prev = null; }

將 head 設(shè)置成當(dāng)前結(jié)點(diǎn)后,要把節(jié)點(diǎn)的 thread, pre 設(shè)置成 null,因?yàn)橹胺治鲞^了,head 是虛節(jié)點(diǎn),不保存除  waitStatus(結(jié)點(diǎn)狀態(tài))的其他信息,所以這里把 thread ,pre 置為空,因?yàn)檎加墟i的線程由 exclusiveThread 記錄了,如果  head 再記錄 thread 不僅多此一舉,反而在釋放鎖的時(shí)候要多操作一遍 head 的 thread 釋放。

如果前一個(gè)節(jié)點(diǎn)不是 head 或者競(jìng)爭鎖失敗,則首先調(diào)用 shouldParkAfterFailedAcquire  方法判斷鎖是否應(yīng)該停止自旋進(jìn)入阻塞狀態(tài):

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {     int ws = pred.waitStatus;              if (ws == Node.SIGNAL)        // 1. 如果前置頂點(diǎn)的狀態(tài)為 SIGNAL,表示當(dāng)前節(jié)點(diǎn)可以阻塞了         return true;     if (ws > 0) {         // 2. 移除取消狀態(tài)的結(jié)點(diǎn)         do {             node.prev = pred = pred.prev;         } while (pred.waitStatus > 0);         pred.next = node;     } else {         // 3. 如果前置節(jié)點(diǎn)的 ws 不為 0,則其設(shè)置為 SIGNAL,         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);     }     return false; }

這一段代碼有點(diǎn)繞,需要稍微動(dòng)點(diǎn)腦子,按以上步驟一步步來看

1、 首先我們要明白,根據(jù)之前 Node 類的注釋,如果前驅(qū)節(jié)點(diǎn)為 SIGNAL,則當(dāng)前節(jié)點(diǎn)可以進(jìn)入阻塞狀態(tài)。

怎么掌握AQS

如圖示:T2,T3 的前驅(qū)節(jié)點(diǎn)的 waitStatus 都為 SIGNAL,所以 T2,T3  此時(shí)都可以阻塞。

2、如果前驅(qū)節(jié)點(diǎn)為取消狀態(tài),則前驅(qū)節(jié)點(diǎn)需要移除,這些采用了一個(gè)更巧妙的方法,把所有當(dāng)前節(jié)點(diǎn)之前所有 waitStatus  為取消狀態(tài)的節(jié)點(diǎn)全部移除,假設(shè)有四個(gè)線程如下,T2,T3 為取消狀態(tài),則執(zhí)行邏輯后如下圖所示,T2, T3 節(jié)點(diǎn)會(huì)被 GC。

怎么掌握AQS

3、如果前驅(qū)節(jié)點(diǎn)小于等于 0,則需要首先將其前驅(qū)節(jié)點(diǎn)置為 SIGNAL,因?yàn)榍拔奈覀兎治鲞^,當(dāng)前節(jié)點(diǎn)進(jìn)入阻塞的一個(gè)條件是前驅(qū)節(jié)點(diǎn)必須為  SIGNAL,這樣下一次自旋后發(fā)現(xiàn)前驅(qū)節(jié)點(diǎn)為 SIGNAL,就會(huì)返回 true(即步驟 1)

shouldParkAfterFailedAcquire 返回 true 代表線程可以進(jìn)入阻塞中斷,那么下一步 parkAndCheckInterrupt  就該讓線程阻塞了

private final boolean parkAndCheckInterrupt() {     // 阻塞線程     LockSupport.park(this);     // 返回線程是否中斷過,并且清除中斷狀態(tài)(在獲得鎖后會(huì)補(bǔ)一次中斷)     return Thread.interrupted(); }

這里的阻塞線程很容易理解,但為啥要判斷線程是否中斷過呢,因?yàn)槿绻€程在阻塞期間收到了中斷,喚醒(轉(zhuǎn)為運(yùn)行態(tài))獲取鎖后(acquireQueued 為  true)需要補(bǔ)一個(gè)中斷,如下所示:

public final void acquire(int arg) {     if (!tryAcquire(arg) &&         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))         // 如果是因?yàn)橹袛鄦拘训木€程,獲取鎖后需要補(bǔ)一下中斷         selfInterrupt(); }

至此,獲取鎖的流程已經(jīng)分析完畢,不過還有一個(gè)疑惑我們還沒解開:前文不是說 Node 狀態(tài)為取消狀態(tài)會(huì)被取消嗎,那 Node  什么時(shí)候會(huì)被設(shè)置為取消狀態(tài)呢。

回頭看 acquireQueued

final boolean acquireQueued(final Node node, int arg) {     boolean failed = true;     try {         // 省略自旋獲取鎖代碼             } finally {         if (failed)             // 如果線程自旋中因?yàn)楫惓5仍颢@取鎖最終失敗,則調(diào)用此方法             cancelAcquire(node);     } }

看最后一個(gè) cancelAcquire 方法,如果線程自旋中因?yàn)楫惓5仍颢@取鎖最終失敗,則會(huì)調(diào)用此方法

private void cancelAcquire(Node node) {     // 如果節(jié)點(diǎn)為空,直接返回     if (node == null)         return;     // 由于線程要被取消了,所以將 thread 線程清掉     node.thread = null;      // 下面這步表示將 node 的 pre 指向之前第一個(gè)非取消狀態(tài)的結(jié)點(diǎn)(即跳過所有取消狀態(tài)的結(jié)點(diǎn)),waitStatus > 0 表示當(dāng)前結(jié)點(diǎn)狀態(tài)為取消狀態(tài)     Node pred = node.prev;     while (pred.waitStatus > 0)         node.prev = pred = pred.prev;      // 獲取經(jīng)過過濾后的 pre 的 next 結(jié)點(diǎn),這一步主要用在后面的 CAS 設(shè)置 pre 的 next 節(jié)點(diǎn)上     Node predNext = pred.next;     // 將當(dāng)前結(jié)點(diǎn)設(shè)置為取消狀態(tài)     node.waitStatus = Node.CANCELLED;      // 如果當(dāng)前取消結(jié)點(diǎn)為尾結(jié)點(diǎn),使用 CAS 則將尾結(jié)點(diǎn)設(shè)置為其前驅(qū)節(jié)點(diǎn),如果設(shè)置成功,則尾結(jié)點(diǎn)的 next 指針設(shè)置為空     if (node == tail && compareAndSetTail(node, pred)) {         compareAndSetNext(pred, predNext, null);     } else {     // 這一步看得有點(diǎn)繞,我們想想,如果當(dāng)前節(jié)點(diǎn)取消了,那是不是要把當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)指向當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn),但是我們之前也說了,要喚醒或阻塞結(jié)點(diǎn),須在其前驅(qū)節(jié)點(diǎn)的狀態(tài)為 SIGNAL 的條件才能操作,所以在設(shè)置 pre 的 next 節(jié)點(diǎn)時(shí)要保證 pre 結(jié)點(diǎn)的狀態(tài)為 SIGNAL,想通了這一點(diǎn)相信你不難理解以下代碼。         int ws;         if (pred != head &&             ((ws = pred.waitStatus) == Node.SIGNAL ||              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&             pred.thread != null) {             Node next = node.next;             if (next != null && next.waitStatus <= 0)                 compareAndSetNext(pred, predNext, next);         } else {         // 如果 pre 為 head,或者  pre 的狀態(tài)設(shè)置 SIGNAL 失敗,則直接喚醒后繼結(jié)點(diǎn)去競(jìng)爭鎖,之前我們說過, SIGNAL 的結(jié)點(diǎn)取消(或釋放鎖)后可以喚醒后繼結(jié)點(diǎn)             unparkSuccessor(node);         }         node.next = node; // help GC     } }

這一段代碼有點(diǎn)繞,我們一個(gè)個(gè)來看,首先考慮以下情況

1、首先第一步當(dāng)前節(jié)點(diǎn)之前有取消結(jié)點(diǎn)時(shí),則邏輯如下

怎么掌握AQS

2、如果當(dāng)前結(jié)點(diǎn)既非頭結(jié)點(diǎn)的后繼結(jié)點(diǎn),也非尾結(jié)點(diǎn),即步驟 1 所示,則最終結(jié)果如下

怎么掌握AQS

這里肯定有人問,這種情況下當(dāng)前節(jié)點(diǎn)與它的前驅(qū)結(jié)點(diǎn)無法被 GC 啊,還記得我們上文分析鎖自旋時(shí)的處理嗎,再看下以下代碼

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {     int ws = pred.waitStatus;     // 省略無關(guān)代碼     if (ws > 0) {         // 移除取消狀態(tài)的結(jié)點(diǎn)         do {             node.prev = pred = pred.prev;         } while (pred.waitStatus > 0);         pred.next = node;     }      return false; }

這段代碼會(huì)將 node 的 pre 指向之前 waitStatus 為非 CANCEL 的節(jié)點(diǎn)

所以當(dāng) T4 執(zhí)行這段代碼時(shí),會(huì)變成如下情況

怎么掌握AQS

可以看到此時(shí)中間的兩個(gè) CANCEL 節(jié)點(diǎn)不可達(dá)了,會(huì)被 GC

3、如果當(dāng)前結(jié)點(diǎn)為 tail 結(jié)點(diǎn),則結(jié)果如下,這種情況下當(dāng)前結(jié)點(diǎn)不可達(dá),會(huì)被 GC

怎么掌握AQS

4、如果當(dāng)前結(jié)點(diǎn)為 head 的后繼結(jié)點(diǎn)時(shí),如下

怎么掌握AQS

結(jié)果中的 CANCEL 結(jié)點(diǎn)同樣會(huì)在 tail 結(jié)點(diǎn)自旋調(diào)用 shouldParkAfterFailedAcquire 后不可達(dá),如下

怎么掌握AQS

自此我們終于分析完了鎖的獲取流程,接下來我們來看看鎖是如何釋放的。

鎖釋放

不管是公平鎖還是非公平鎖,最終都是調(diào)的 AQS 的如下模板方法來釋放鎖

// java.util.concurrent.locks.AbstractQueuedSynchronizer  public final boolean release(int arg) {     // 鎖釋放是否成功     if (tryRelease(arg)) {         Node h = head;         if (h != null && h.waitStatus != 0)             unparkSuccessor(h);         return true;     }     return false; }

tryRelease 方法定義在了 AQS 的子類 Sync 方法里

// java.util.concurrent.locks.ReentrantLock.Sync  protected final boolean tryRelease(int releases) {     int c = getState() - releases;     // 只有持有鎖的線程才能釋放鎖,所以如果當(dāng)前鎖不是持有鎖的線程,則拋異常     if (Thread.currentThread() != getExclusiveOwnerThread())         throw new IllegalMonitorStateException();     boolean free = false;     // 說明線程持有的鎖全部釋放了,需要釋放 exclusiveOwnerThread 的持有線程     if (c == 0) {         free = true;         setExclusiveOwnerThread(null);     }     setState(c);     return free; }

鎖釋放成功后該干嘛,顯然是喚醒之后 head 之后節(jié)點(diǎn),讓它來競(jìng)爭鎖

// java.util.concurrent.locks.AbstractQueuedSynchronizer  public final boolean release(int arg) {     // 鎖釋放是否成功     if (tryRelease(arg)) {         Node h = head;         if (h != null && h.waitStatus != 0)             // 鎖釋放成功后,喚醒 head 之后的節(jié)點(diǎn),讓它來競(jìng)爭鎖             unparkSuccessor(h);         return true;     }     return false; }

這里釋放鎖的條件為啥是 h != null && h.waitStatus != 0 呢。

如果 h == null,  這有兩種可能,一種是一個(gè)線程在競(jìng)爭鎖,現(xiàn)在它釋放了,當(dāng)然沒有所謂的喚醒后繼節(jié)點(diǎn),一種是其他線程正在運(yùn)行競(jìng)爭鎖,只是還未初始化頭節(jié)點(diǎn),既然其他線程正在運(yùn)行,也就無需執(zhí)行喚醒操作

如果 h != null 且 h.waitStatus == 0,說明 head 的后繼節(jié)點(diǎn)正在自旋競(jìng)爭鎖,也就是說線程是運(yùn)行狀態(tài)的,無需喚醒。

如果 h != null 且 h.waitStatus < 0, 此時(shí) waitStatus 值可能為 SIGNAL,或  PROPAGATE,這兩種情況說明后繼結(jié)點(diǎn)阻塞需要被喚醒

來看一下喚醒方法 unparkSuccessor:

private void unparkSuccessor(Node node) {     // 獲取 head 的 waitStatus(假設(shè)其為 SIGNAL),并用 CAS 將其置為 0,為啥要做這一步呢,之前我們分析過多次,其實(shí) waitStatus = SIGNAL(< -1)或 PROPAGATE(-&middot;3) 只是一個(gè)標(biāo)志,代表在此狀態(tài)下,后繼節(jié)點(diǎn)可以喚醒,既然正在喚醒后繼節(jié)點(diǎn),自然可以將其重置為 0,當(dāng)然如果失敗了也不影響其喚醒后繼結(jié)點(diǎn)     int ws = node.waitStatus;     if (ws < 0)         compareAndSetWaitStatus(node, ws, 0);      // 以下操作為獲取隊(duì)列第一個(gè)非取消狀態(tài)的結(jié)點(diǎn),并將其喚醒     Node s = node.next;     // s 狀態(tài)為非空,或者其為取消狀態(tài),說明 s 是無效節(jié)點(diǎn),此時(shí)需要執(zhí)行 if 里的邏輯     if (s == null || s.waitStatus > 0) {         s = null;         // 以下操作為從尾向前獲取最后一個(gè)非取消狀態(tài)的結(jié)點(diǎn)         for (Node t = tail; t != null && t != node; t = t.prev)             if (t.waitStatus <= 0)                 s = t;     }     if (s != null)         LockSupport.unpark(s.thread); }

這里的尋找隊(duì)列的第一個(gè)非取消狀態(tài)的節(jié)點(diǎn)為啥要從后往前找呢,因?yàn)楣?jié)點(diǎn)入隊(duì)并不是原子操作,如下

怎么掌握AQS

線程自旋時(shí)時(shí)是先執(zhí)行 node.pre = pred, 然后再執(zhí)行 pred.next = node,如果 unparkSuccessor  剛好在這兩者之間執(zhí)行,此時(shí)是找不到 head 的后繼節(jié)點(diǎn)的,如下

怎么掌握AQS

如何利用 AQS 自定義一個(gè)互斥鎖

AQS 通過提供 state 及 FIFO  隊(duì)列的管理,為我們提供了一套通用的實(shí)現(xiàn)鎖的底層方法,基本上定義一個(gè)鎖,都是轉(zhuǎn)為在其內(nèi)部定義 AQS 的子類,調(diào)用 AQS 的底層方法來實(shí)現(xiàn)的,由于 AQS  在底層已經(jīng)為了定義好了這些獲取 state 及 FIFO 隊(duì)列的管理工作,我們要實(shí)現(xiàn)一個(gè)鎖就比較簡單了,我們可以基于 AQS  來實(shí)現(xiàn)一個(gè)非可重入的互斥鎖,如下所示

public class Mutex  {      private Sync sync = new Sync();          public void lock () {         sync.acquire(1);     }          public void unlock () {         sync.release(1);     }      private static class Sync extends AbstractQueuedSynchronizer {         @Override         protected boolean tryAcquire (int arg) {             return compareAndSetState(0, 1);         }          @Override         protected boolean tryRelease (int arg) {             setState(0);             return true;         }          @Override         protected boolean isHeldExclusively () {             return getState() == 1;         }     } }

可以看到區(qū)區(qū)幾行代碼就實(shí)現(xiàn)了,確實(shí)很方便。

感謝各位的閱讀,以上就是“怎么掌握AQS”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)怎么掌握AQS這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

aqs
AI